1use crate::prelude::v1::*;
2use crate::base::*;
3use crate::mutex::*;
4use crate::queue::*;
5use crate::units::*;
6
7pub type SharedClientWithReplyQueue<O> = Arc<ClientWithReplyQueue<O>>;
8pub type Client<I> = ProcessorClient<I, ()>;
9pub type ClientWithReplies<I, O> = ProcessorClient<I, SharedClientWithReplyQueue<O>>;
10
11pub trait ReplyableMessage {
12 fn reply_to_client_id(&self) -> Option<usize>;
13}
14
15#[derive(Copy, Clone)]
16pub struct InputMessage<I> where I: Copy {
17 val: I,
18 reply_to_client_id: Option<usize>
19}
20
21impl<I> InputMessage<I> where I: Copy {
22 pub fn request(val: I) -> Self {
23 InputMessage { val: val, reply_to_client_id: None }
24 }
25
26 pub fn request_with_reply(val: I, client_id: usize) -> Self {
27 InputMessage { val: val, reply_to_client_id: Some(client_id) }
28 }
29
30 pub fn get_val(&self) -> I {
31 self.val
32 }
33}
34
35impl<I> ReplyableMessage for InputMessage<I> where I: Copy {
36 fn reply_to_client_id(&self) -> Option<usize> {
37 self.reply_to_client_id
38 }
39}
40
41pub struct Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
42 queue: Arc<Queue<I>>,
43 inner: Arc<Mutex<ProcessorInner<O>>>,
44}
45
46impl<I, O> Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
47 pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
48 let p = ProcessorInner {
49 clients: Vec::new(),
50 next_client_id: 1
51 };
52 let p = Arc::new(Mutex::new(p)?);
53 let p = Processor {
54 queue: Arc::new(Queue::new(queue_size)?),
55 inner: p
56 };
57 Ok(p)
58 }
59
60 pub fn new_client(&self) -> Result<Client<I>, FreeRtosError> {
61 let c = ProcessorClient {
62 processor_queue: Arc::downgrade(&self.queue),
63 client_reply: ()
64 };
65
66 Ok(c)
67 }
68
69
70 pub fn new_client_with_reply<D: DurationTicks>(&self, client_receive_queue_size: usize, max_wait: D) -> Result<ProcessorClient<I, SharedClientWithReplyQueue<O>>, FreeRtosError> {
71 if client_receive_queue_size == 0 {
72 return Err(FreeRtosError::InvalidQueueSize);
73 }
74
75 let client_reply = {
76 let mut processor = self.inner.lock(max_wait)?;
77
78 let c = ClientWithReplyQueue {
79 id: processor.next_client_id,
80 processor_inner: self.inner.clone(),
81 receive_queue: Queue::new(client_receive_queue_size)?
82 };
83
84 let c = Arc::new(c);
85 processor.clients.push((c.id, Arc::downgrade(&c)));
86
87 processor.next_client_id += 1;
88
89 c
90 };
91
92 let c = ProcessorClient {
93 processor_queue: Arc::downgrade(&self.queue),
94 client_reply: client_reply
95 };
96
97 Ok(c)
98 }
99
100 pub fn get_receive_queue(&self) -> &Queue<I> {
101 &*self.queue
102 }
103
104 pub fn reply<D: DurationTicks>(&self, received_message: I, reply: O, max_wait: D) -> Result<bool, FreeRtosError> {
105 if let Some(client_id) = received_message.reply_to_client_id() {
106 let inner = self.inner.lock(max_wait)?;
107 if let Some(client) = inner.clients.iter().flat_map(|ref x| x.1.upgrade().into_iter()).find(|x| x.id == client_id) {
108 client.receive_queue.send(reply, max_wait)?;
109 return Ok(true);
110 }
111 }
112
113 Ok(false)
114 }
115}
116
117impl<I, O> Processor<InputMessage<I>, O> where I: Copy, O: Copy {
118 pub fn reply_val<D: DurationTicks>(&self, received_message: InputMessage<I>, reply: O, max_wait: D) -> Result<bool, FreeRtosError> {
119 self.reply(received_message, reply, max_wait)
120 }
121}
122
123struct ProcessorInner<O> where O: Copy {
124 clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
125 next_client_id: usize
126}
127
128impl<O> ProcessorInner<O> where O: Copy {
129 fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
130 self.clients.retain(|ref x| x.0 != client.id)
131 }
132}
133
134
135
136pub struct ProcessorClient<I, C> where I: ReplyableMessage + Copy {
137 processor_queue: Weak<Queue<I>>,
138 client_reply: C
139}
140
141impl<I, O> ProcessorClient<I, O> where I: ReplyableMessage + Copy {
142 pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
143 let processor_queue = self.processor_queue.upgrade().ok_or(FreeRtosError::ProcessorHasShutDown)?;
144 processor_queue.send(message, max_wait)?;
145 Ok(())
146 }
147
148 pub fn send_from_isr(&self, context: &mut crate::isr::InterruptContext, message: I) -> Result<(), FreeRtosError> {
149 let processor_queue = self.processor_queue.upgrade().ok_or(FreeRtosError::ProcessorHasShutDown)?;
150 processor_queue.send_from_isr(context, message)
151 }
152}
153
154impl<I> ProcessorClient<InputMessage<I>, ()> where I: Copy {
155 pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
156 self.send(InputMessage::request(val), max_wait)
157 }
158
159 pub fn send_val_from_isr(&self, context: &mut crate::isr::InterruptContext, val: I) -> Result<(), FreeRtosError> {
160 self.send_from_isr(context, InputMessage::request(val))
161 }
162}
163
164impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>> where I: ReplyableMessage + Copy, O: Copy {
165 pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
166 self.send(message, max_wait)?;
167 self.client_reply.receive_queue.receive(max_wait)
168 }
169
170 pub fn get_receive_queue(&self) -> &Queue<O> {
171 &self.client_reply.receive_queue
172 }
173}
174
175impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>> where I: Copy, O: Copy {
176 pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
177 self.send(InputMessage::request(val), max_wait)
178 }
179
180 pub fn call_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<O, FreeRtosError> {
181 let reply = self.call(InputMessage::request_with_reply(val, self.client_reply.id), max_wait)?;
182 Ok(reply)
183 }
184}
185
186impl<I, C> Clone for ProcessorClient<I, C> where I: ReplyableMessage + Copy, C: Clone {
187 fn clone(&self) -> Self {
188 ProcessorClient {
189 processor_queue: self.processor_queue.clone(),
190 client_reply: self.client_reply.clone()
191 }
192 }
193}
194
195
196
197pub struct ClientWithReplyQueue<O> where O: Copy {
198 id: usize,
199 processor_inner: Arc<Mutex<ProcessorInner<O>>>,
200 receive_queue: Queue<O>
201}
202
203impl<O> Drop for ClientWithReplyQueue<O> where O: Copy {
204 fn drop(&mut self) {
205 if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
206 p.remove_client_reply(&self);
207 }
208 }
209}