freertos_rust/patterns/
processor.rs

1use crate::base::*;
2use crate::mutex::*;
3use crate::prelude::v1::*;
4use crate::queue::*;
5use crate::units::*;
6
7pub type SharedClientWithReplyQueue<O> = Arc<ClientWithReplyQueue<O>>;
8pub type Client<I> = ProcessorClient<I, ()>;
9pub type ClientWithReplies<I, O> = ProcessorClient<I, SharedClientWithReplyQueue<O>>;
10
/// A message that may name the client a reply should be delivered to.
pub trait ReplyableMessage {
    /// Returns the id of the client that should receive the reply, or `None`
    /// when the message does not ask for one.
    fn reply_to_client_id(&self) -> Option<usize>;
}
14
/// A copyable message envelope: a payload plus an optional reply address.
#[derive(Clone, Copy)]
pub struct InputMessage<I: Copy + Send> {
    /// The payload carried by the message.
    val: I,
    /// Id of the client a reply should go to; `None` when no reply is requested.
    reply_to_client_id: Option<usize>,
}
23
24impl<I> InputMessage<I>
25where
26    I: Copy + Send,
27{
28    pub fn request(val: I) -> Self {
29        InputMessage {
30            val: val,
31            reply_to_client_id: None,
32        }
33    }
34
35    pub fn request_with_reply(val: I, client_id: usize) -> Self {
36        InputMessage {
37            val: val,
38            reply_to_client_id: Some(client_id),
39        }
40    }
41
42    pub fn get_val(&self) -> I {
43        self.val
44    }
45}
46
47impl<I> ReplyableMessage for InputMessage<I>
48where
49    I: Copy + Send,
50{
51    fn reply_to_client_id(&self) -> Option<usize> {
52        self.reply_to_client_id
53    }
54}
55
56pub struct Processor<I, O>
57where
58    I: ReplyableMessage + Copy + Send,
59    O: Copy + Send,
60{
61    queue: Arc<Queue<I>>,
62    inner: Arc<Mutex<ProcessorInner<O>>>,
63}
64
65impl<I, O> Processor<I, O>
66where
67    I: ReplyableMessage + Copy + Send,
68    O: Copy + Send,
69{
70    pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
71        let p = ProcessorInner {
72            clients: Vec::new(),
73            next_client_id: 1,
74        };
75        let p = Arc::new(Mutex::new(p)?);
76        let p = Processor {
77            queue: Arc::new(Queue::new(queue_size)?),
78            inner: p,
79        };
80        Ok(p)
81    }
82
83    pub fn new_client(&self) -> Result<Client<I>, FreeRtosError> {
84        let c = ProcessorClient {
85            processor_queue: Arc::downgrade(&self.queue),
86            client_reply: (),
87        };
88
89        Ok(c)
90    }
91
92    pub fn new_client_with_reply<D: DurationTicks>(
93        &self,
94        client_receive_queue_size: usize,
95        max_wait: D,
96    ) -> Result<ProcessorClient<I, SharedClientWithReplyQueue<O>>, FreeRtosError> {
97        if client_receive_queue_size == 0 {
98            return Err(FreeRtosError::InvalidQueueSize);
99        }
100
101        let client_reply = {
102            let mut processor = self.inner.lock(max_wait)?;
103
104            let c = ClientWithReplyQueue {
105                id: processor.next_client_id,
106                processor_inner: self.inner.clone(),
107                receive_queue: Queue::new(client_receive_queue_size)?,
108            };
109
110            let c = Arc::new(c);
111            processor.clients.push((c.id, Arc::downgrade(&c)));
112
113            processor.next_client_id += 1;
114
115            c
116        };
117
118        let c = ProcessorClient {
119            processor_queue: Arc::downgrade(&self.queue),
120            client_reply: client_reply,
121        };
122
123        Ok(c)
124    }
125
126    pub fn get_receive_queue(&self) -> &Queue<I> {
127        &*self.queue
128    }
129
130    pub fn reply<D: DurationTicks>(
131        &self,
132        received_message: I,
133        reply: O,
134        max_wait: D,
135    ) -> Result<bool, FreeRtosError> {
136        if let Some(client_id) = received_message.reply_to_client_id() {
137            let inner = self.inner.lock(max_wait)?;
138            if let Some(client) = inner
139                .clients
140                .iter()
141                .flat_map(|ref x| x.1.upgrade().into_iter())
142                .find(|x| x.id == client_id)
143            {
144                client
145                    .receive_queue
146                    .send(reply, max_wait)
147                    .map_err(|err| err.error())?;
148                return Ok(true);
149            }
150        }
151
152        Ok(false)
153    }
154}
155
156impl<I, O> Processor<InputMessage<I>, O>
157where
158    I: Copy + Send,
159    O: Copy + Send,
160{
161    pub fn reply_val<D: DurationTicks>(
162        &self,
163        received_message: InputMessage<I>,
164        reply: O,
165        max_wait: D,
166    ) -> Result<bool, FreeRtosError> {
167        self.reply(received_message, reply, max_wait)
168    }
169}
170
171struct ProcessorInner<O>
172where
173    O: Copy + Send,
174{
175    clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
176    next_client_id: usize,
177}
178
179impl<O> ProcessorInner<O>
180where
181    O: Copy + Send,
182{
183    fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
184        self.clients.retain(|ref x| x.0 != client.id)
185    }
186}
187
188pub struct ProcessorClient<I, C>
189where
190    I: ReplyableMessage + Copy + Send,
191{
192    processor_queue: Weak<Queue<I>>,
193    client_reply: C,
194}
195
196impl<I, O> ProcessorClient<I, O>
197where
198    I: ReplyableMessage + Copy + Send,
199{
200    pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
201        let processor_queue = self
202            .processor_queue
203            .upgrade()
204            .ok_or(FreeRtosError::ProcessorHasShutDown)?;
205        processor_queue
206            .send(message, max_wait)
207            .map_err(|err| err.error())?;
208        Ok(())
209    }
210
211    pub fn send_from_isr(
212        &self,
213        context: &mut crate::isr::InterruptContext,
214        message: I,
215    ) -> Result<(), FreeRtosError> {
216        let processor_queue = self
217            .processor_queue
218            .upgrade()
219            .ok_or(FreeRtosError::ProcessorHasShutDown)?;
220        processor_queue
221            .send_from_isr(context, message)
222            .map_err(|err| err.error())
223    }
224}
225
226impl<I> ProcessorClient<InputMessage<I>, ()>
227where
228    I: Copy + Send,
229{
230    pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
231        self.send(InputMessage::request(val), max_wait)
232    }
233
234    pub fn send_val_from_isr(
235        &self,
236        context: &mut crate::isr::InterruptContext,
237        val: I,
238    ) -> Result<(), FreeRtosError> {
239        self.send_from_isr(context, InputMessage::request(val))
240    }
241}
242
243impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>>
244where
245    I: ReplyableMessage + Copy + Send,
246    O: Copy + Send,
247{
248    pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
249        self.send(message, max_wait)?;
250        self.client_reply.receive_queue.receive(max_wait)
251    }
252
253    pub fn get_receive_queue(&self) -> &Queue<O> {
254        &self.client_reply.receive_queue
255    }
256}
257
258impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>>
259where
260    I: Copy + Send,
261    O: Copy + Send,
262{
263    pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
264        self.send(InputMessage::request(val), max_wait)
265    }
266
267    pub fn call_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<O, FreeRtosError> {
268        let reply = self.call(
269            InputMessage::request_with_reply(val, self.client_reply.id),
270            max_wait,
271        )?;
272        Ok(reply)
273    }
274}
275
276impl<I, C> Clone for ProcessorClient<I, C>
277where
278    I: ReplyableMessage + Copy + Send,
279    C: Clone,
280{
281    fn clone(&self) -> Self {
282        ProcessorClient {
283            processor_queue: self.processor_queue.clone(),
284            client_reply: self.client_reply.clone(),
285        }
286    }
287}
288
289pub struct ClientWithReplyQueue<O>
290where
291    O: Copy + Send,
292{
293    id: usize,
294    processor_inner: Arc<Mutex<ProcessorInner<O>>>,
295    receive_queue: Queue<O>,
296}
297
298impl<O> Drop for ClientWithReplyQueue<O>
299where
300    O: Copy + Send,
301{
302    fn drop(&mut self) {
303        if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
304            p.remove_client_reply(&self);
305        }
306    }
307}