freertos_rust/patterns/
processor.rs

1use crate::prelude::v1::*;
2use crate::base::*;
3use crate::mutex::*;
4use crate::queue::*;
5use crate::units::*;
6
7pub type SharedClientWithReplyQueue<O> = Arc<ClientWithReplyQueue<O>>;
8pub type Client<I> = ProcessorClient<I, ()>;
9pub type ClientWithReplies<I, O> = ProcessorClient<I, SharedClientWithReplyQueue<O>>;
10
/// A message that may name the client which expects an answer to it.
pub trait ReplyableMessage {
    /// Returns the id of the client waiting for a reply, or `None` when the
    /// sender does not expect one.
    fn reply_to_client_id(&self) -> Option<usize>;
}
14
/// Envelope around a copyable payload, optionally tagged with the id of the
/// client that wants a reply.
#[derive(Copy, Clone)]
pub struct InputMessage<I: Copy> {
    /// The payload carried by the message.
    val: I,
    /// Id of the client expecting a reply; `None` for fire-and-forget messages.
    reply_to_client_id: Option<usize>,
}
20
21impl<I> InputMessage<I> where I: Copy {
22    pub fn request(val: I) -> Self {
23        InputMessage { val: val, reply_to_client_id: None }
24    }
25
26    pub fn request_with_reply(val: I, client_id: usize) -> Self {
27        InputMessage { val: val, reply_to_client_id: Some(client_id) }
28    }
29
30    pub fn get_val(&self) -> I {
31        self.val
32    }
33}
34
35impl<I> ReplyableMessage for InputMessage<I> where I: Copy {
36    fn reply_to_client_id(&self) -> Option<usize> {
37        self.reply_to_client_id
38    }
39}
40
41pub struct Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
42    queue: Arc<Queue<I>>,
43    inner: Arc<Mutex<ProcessorInner<O>>>,
44}
45
46impl<I, O> Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
47    pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
48        let p = ProcessorInner {
49            clients: Vec::new(), 
50            next_client_id: 1
51        };        
52        let p = Arc::new(Mutex::new(p)?);
53        let p = Processor {
54            queue: Arc::new(Queue::new(queue_size)?),
55            inner: p
56        };
57        Ok(p)
58    }
59
60    pub fn new_client(&self) -> Result<Client<I>, FreeRtosError> {
61        let c = ProcessorClient {
62            processor_queue: Arc::downgrade(&self.queue),
63            client_reply: ()
64        };
65
66        Ok(c)
67    }
68
69    
70    pub fn new_client_with_reply<D: DurationTicks>(&self, client_receive_queue_size: usize, max_wait: D) -> Result<ProcessorClient<I, SharedClientWithReplyQueue<O>>, FreeRtosError> {        
71        if client_receive_queue_size == 0 {
72            return Err(FreeRtosError::InvalidQueueSize);
73        }
74
75        let client_reply = {
76            let mut processor = self.inner.lock(max_wait)?;
77
78            let c = ClientWithReplyQueue {
79                id: processor.next_client_id,
80                processor_inner: self.inner.clone(),
81                receive_queue: Queue::new(client_receive_queue_size)?
82            };            
83
84            let c = Arc::new(c);
85            processor.clients.push((c.id, Arc::downgrade(&c)));
86
87            processor.next_client_id += 1;
88
89            c
90        };
91
92        let c = ProcessorClient {
93            processor_queue: Arc::downgrade(&self.queue),
94            client_reply: client_reply
95        };
96
97        Ok(c)
98    }
99
100    pub fn get_receive_queue(&self) -> &Queue<I> {
101        &*self.queue
102    }
103
104    pub fn reply<D: DurationTicks>(&self, received_message: I, reply: O, max_wait: D) -> Result<bool, FreeRtosError> {
105        if let Some(client_id) = received_message.reply_to_client_id() {            
106            let inner = self.inner.lock(max_wait)?;
107            if let Some(client) = inner.clients.iter().flat_map(|ref x| x.1.upgrade().into_iter()).find(|x| x.id == client_id) {
108                client.receive_queue.send(reply, max_wait)?;
109                return Ok(true);
110            }
111        }
112
113        Ok(false)
114    }
115}
116
117impl<I, O> Processor<InputMessage<I>, O> where I: Copy, O: Copy {
118    pub fn reply_val<D: DurationTicks>(&self, received_message: InputMessage<I>, reply: O, max_wait: D) -> Result<bool, FreeRtosError> {
119        self.reply(received_message, reply, max_wait)
120    }
121}
122
123struct ProcessorInner<O> where O: Copy {
124    clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
125    next_client_id: usize
126}
127
128impl<O> ProcessorInner<O> where O: Copy {
129    fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
130        self.clients.retain(|ref x| x.0 != client.id)
131    }
132}
133
134
135
136pub struct ProcessorClient<I, C> where I: ReplyableMessage + Copy {    
137    processor_queue: Weak<Queue<I>>,
138    client_reply: C
139}
140
141impl<I, O> ProcessorClient<I, O> where I: ReplyableMessage + Copy {
142    pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
143        let processor_queue = self.processor_queue.upgrade().ok_or(FreeRtosError::ProcessorHasShutDown)?;
144        processor_queue.send(message, max_wait)?;
145        Ok(())   
146    }
147
148    pub fn send_from_isr(&self, context: &mut crate::isr::InterruptContext, message: I) -> Result<(), FreeRtosError> {
149        let processor_queue = self.processor_queue.upgrade().ok_or(FreeRtosError::ProcessorHasShutDown)?;
150        processor_queue.send_from_isr(context, message)
151    }
152}
153
154impl<I> ProcessorClient<InputMessage<I>, ()> where I: Copy {
155    pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
156        self.send(InputMessage::request(val), max_wait)
157    }
158
159    pub fn send_val_from_isr(&self, context: &mut crate::isr::InterruptContext, val: I) -> Result<(), FreeRtosError> {
160        self.send_from_isr(context, InputMessage::request(val))
161    }
162}
163
164impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>> where I: ReplyableMessage + Copy, O: Copy {
165    pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
166        self.send(message, max_wait)?;
167        self.client_reply.receive_queue.receive(max_wait)
168    }
169
170    pub fn get_receive_queue(&self) -> &Queue<O> {
171        &self.client_reply.receive_queue
172    }
173}
174
175impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>> where I: Copy, O: Copy {
176    pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
177        self.send(InputMessage::request(val), max_wait)
178    }
179    
180    pub fn call_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<O, FreeRtosError> {
181        let reply = self.call(InputMessage::request_with_reply(val, self.client_reply.id), max_wait)?;
182        Ok(reply)
183    }
184}
185
186impl<I, C> Clone for ProcessorClient<I, C> where I: ReplyableMessage + Copy, C: Clone {
187    fn clone(&self) -> Self {
188        ProcessorClient {
189            processor_queue: self.processor_queue.clone(),
190            client_reply: self.client_reply.clone()
191        }
192    }
193}
194
195
196
197pub struct ClientWithReplyQueue<O> where O: Copy {
198    id: usize,
199    processor_inner: Arc<Mutex<ProcessorInner<O>>>,
200    receive_queue: Queue<O>
201}
202
203impl<O> Drop for ClientWithReplyQueue<O> where O: Copy {
204    fn drop(&mut self) {
205        if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
206            p.remove_client_reply(&self);
207        }
208    }
209}