1use crate::base::*;
2use crate::mutex::*;
3use crate::prelude::v1::*;
4use crate::queue::*;
5use crate::units::*;
6
/// `Arc`-shared handle to a [`ClientWithReplyQueue`] that receives replies of type `O`.
pub type SharedClientWithReplyQueue<O> = Arc<ClientWithReplyQueue<O>>;
/// Processor client that only sends messages of type `I`; its reply slot is the unit type.
pub type Client<I> = ProcessorClient<I, ()>;
/// Processor client that sends messages of type `I` and holds a shared reply queue for
/// values of type `O`.
pub type ClientWithReplies<I, O> = ProcessorClient<I, SharedClientWithReplyQueue<O>>;
10
/// A message that can name the client a reply should be routed to.
pub trait ReplyableMessage {
    /// Returns the id of the client that expects a reply, or `None` when the
    /// message does not ask for one.
    fn reply_to_client_id(&self) -> Option<usize>;
}
14
/// Envelope that pairs a payload with the id of the client awaiting a reply, if any.
#[derive(Copy, Clone)]
pub struct InputMessage<I: Copy + Send> {
    /// The wrapped payload.
    val: I,
    /// Id of the client to answer, or `None` for a fire-and-forget request.
    reply_to_client_id: Option<usize>,
}

impl<I: Copy + Send> InputMessage<I> {
    /// Wraps `val` in a message that does not ask for a reply.
    pub fn request(val: I) -> Self {
        Self {
            val,
            reply_to_client_id: None,
        }
    }

    /// Wraps `val` in a message whose reply should go to the client `client_id`.
    pub fn request_with_reply(val: I, client_id: usize) -> Self {
        Self {
            val,
            reply_to_client_id: Some(client_id),
        }
    }

    /// Returns a copy of the wrapped payload.
    pub fn get_val(&self) -> I {
        self.val
    }
}
46
47impl<I> ReplyableMessage for InputMessage<I>
48where
49 I: Copy + Send,
50{
51 fn reply_to_client_id(&self) -> Option<usize> {
52 self.reply_to_client_id
53 }
54}
55
56pub struct Processor<I, O>
57where
58 I: ReplyableMessage + Copy + Send,
59 O: Copy + Send,
60{
61 queue: Arc<Queue<I>>,
62 inner: Arc<Mutex<ProcessorInner<O>>>,
63}
64
65impl<I, O> Processor<I, O>
66where
67 I: ReplyableMessage + Copy + Send,
68 O: Copy + Send,
69{
70 pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
71 let p = ProcessorInner {
72 clients: Vec::new(),
73 next_client_id: 1,
74 };
75 let p = Arc::new(Mutex::new(p)?);
76 let p = Processor {
77 queue: Arc::new(Queue::new(queue_size)?),
78 inner: p,
79 };
80 Ok(p)
81 }
82
83 pub fn new_client(&self) -> Result<Client<I>, FreeRtosError> {
84 let c = ProcessorClient {
85 processor_queue: Arc::downgrade(&self.queue),
86 client_reply: (),
87 };
88
89 Ok(c)
90 }
91
92 pub fn new_client_with_reply<D: DurationTicks>(
93 &self,
94 client_receive_queue_size: usize,
95 max_wait: D,
96 ) -> Result<ProcessorClient<I, SharedClientWithReplyQueue<O>>, FreeRtosError> {
97 if client_receive_queue_size == 0 {
98 return Err(FreeRtosError::InvalidQueueSize);
99 }
100
101 let client_reply = {
102 let mut processor = self.inner.lock(max_wait)?;
103
104 let c = ClientWithReplyQueue {
105 id: processor.next_client_id,
106 processor_inner: self.inner.clone(),
107 receive_queue: Queue::new(client_receive_queue_size)?,
108 };
109
110 let c = Arc::new(c);
111 processor.clients.push((c.id, Arc::downgrade(&c)));
112
113 processor.next_client_id += 1;
114
115 c
116 };
117
118 let c = ProcessorClient {
119 processor_queue: Arc::downgrade(&self.queue),
120 client_reply: client_reply,
121 };
122
123 Ok(c)
124 }
125
126 pub fn get_receive_queue(&self) -> &Queue<I> {
127 &*self.queue
128 }
129
130 pub fn reply<D: DurationTicks>(
131 &self,
132 received_message: I,
133 reply: O,
134 max_wait: D,
135 ) -> Result<bool, FreeRtosError> {
136 if let Some(client_id) = received_message.reply_to_client_id() {
137 let inner = self.inner.lock(max_wait)?;
138 if let Some(client) = inner
139 .clients
140 .iter()
141 .flat_map(|ref x| x.1.upgrade().into_iter())
142 .find(|x| x.id == client_id)
143 {
144 client
145 .receive_queue
146 .send(reply, max_wait)
147 .map_err(|err| err.error())?;
148 return Ok(true);
149 }
150 }
151
152 Ok(false)
153 }
154}
155
156impl<I, O> Processor<InputMessage<I>, O>
157where
158 I: Copy + Send,
159 O: Copy + Send,
160{
161 pub fn reply_val<D: DurationTicks>(
162 &self,
163 received_message: InputMessage<I>,
164 reply: O,
165 max_wait: D,
166 ) -> Result<bool, FreeRtosError> {
167 self.reply(received_message, reply, max_wait)
168 }
169}
170
171struct ProcessorInner<O>
172where
173 O: Copy + Send,
174{
175 clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
176 next_client_id: usize,
177}
178
179impl<O> ProcessorInner<O>
180where
181 O: Copy + Send,
182{
183 fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
184 self.clients.retain(|ref x| x.0 != client.id)
185 }
186}
187
188pub struct ProcessorClient<I, C>
189where
190 I: ReplyableMessage + Copy + Send,
191{
192 processor_queue: Weak<Queue<I>>,
193 client_reply: C,
194}
195
196impl<I, O> ProcessorClient<I, O>
197where
198 I: ReplyableMessage + Copy + Send,
199{
200 pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
201 let processor_queue = self
202 .processor_queue
203 .upgrade()
204 .ok_or(FreeRtosError::ProcessorHasShutDown)?;
205 processor_queue
206 .send(message, max_wait)
207 .map_err(|err| err.error())?;
208 Ok(())
209 }
210
211 pub fn send_from_isr(
212 &self,
213 context: &mut crate::isr::InterruptContext,
214 message: I,
215 ) -> Result<(), FreeRtosError> {
216 let processor_queue = self
217 .processor_queue
218 .upgrade()
219 .ok_or(FreeRtosError::ProcessorHasShutDown)?;
220 processor_queue
221 .send_from_isr(context, message)
222 .map_err(|err| err.error())
223 }
224}
225
226impl<I> ProcessorClient<InputMessage<I>, ()>
227where
228 I: Copy + Send,
229{
230 pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
231 self.send(InputMessage::request(val), max_wait)
232 }
233
234 pub fn send_val_from_isr(
235 &self,
236 context: &mut crate::isr::InterruptContext,
237 val: I,
238 ) -> Result<(), FreeRtosError> {
239 self.send_from_isr(context, InputMessage::request(val))
240 }
241}
242
243impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>>
244where
245 I: ReplyableMessage + Copy + Send,
246 O: Copy + Send,
247{
248 pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
249 self.send(message, max_wait)?;
250 self.client_reply.receive_queue.receive(max_wait)
251 }
252
253 pub fn get_receive_queue(&self) -> &Queue<O> {
254 &self.client_reply.receive_queue
255 }
256}
257
258impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>>
259where
260 I: Copy + Send,
261 O: Copy + Send,
262{
263 pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
264 self.send(InputMessage::request(val), max_wait)
265 }
266
267 pub fn call_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<O, FreeRtosError> {
268 let reply = self.call(
269 InputMessage::request_with_reply(val, self.client_reply.id),
270 max_wait,
271 )?;
272 Ok(reply)
273 }
274}
275
276impl<I, C> Clone for ProcessorClient<I, C>
277where
278 I: ReplyableMessage + Copy + Send,
279 C: Clone,
280{
281 fn clone(&self) -> Self {
282 ProcessorClient {
283 processor_queue: self.processor_queue.clone(),
284 client_reply: self.client_reply.clone(),
285 }
286 }
287}
288
289pub struct ClientWithReplyQueue<O>
290where
291 O: Copy + Send,
292{
293 id: usize,
294 processor_inner: Arc<Mutex<ProcessorInner<O>>>,
295 receive_queue: Queue<O>,
296}
297
298impl<O> Drop for ClientWithReplyQueue<O>
299where
300 O: Copy + Send,
301{
302 fn drop(&mut self) {
303 if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
304 p.remove_client_reply(&self);
305 }
306 }
307}