tinkerforge_async/
converting_callback_receiver.rs1use std::sync::mpsc::{RecvTimeoutError, TryRecvError};
5use std::{marker::PhantomData, sync::mpsc::Receiver, time::Duration};
6
7use crate::byte_converter::FromByteSlice;
8
9#[derive(Copy, Clone, Debug, PartialEq)]
11pub enum CallbackRecvError {
12 QueueDisconnected,
14 MalformedPacket,
16}
17
18impl std::fmt::Display for CallbackRecvError {
19 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
20 let message = match self {
21 CallbackRecvError::QueueDisconnected => "The queue was disconnected. This usually happens if the ip connection is destroyed.",
22 CallbackRecvError::MalformedPacket => {
23 "The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?"
24 }
25 };
26 write!(f, "{}", message)
27 }
28}
29
30#[derive(Copy, Clone, Debug, PartialEq)]
32pub enum CallbackRecvTimeoutError {
33 QueueDisconnected,
35 QueueTimeout,
37 MalformedPacket,
39}
40
41impl std::fmt::Display for CallbackRecvTimeoutError {
42 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
43 let message = match &self {
44 CallbackRecvTimeoutError::QueueDisconnected => {
45 "The queue was disconnected. This usually happens if the ip connection is destroyed."
46 }
47 CallbackRecvTimeoutError::QueueTimeout => "The request could not be responded to before the timeout was reached.",
48 CallbackRecvTimeoutError::MalformedPacket => {
49 "The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?"
50 }
51 };
52 write!(f, "{}", message)
53 }
54}
55
56impl std::error::Error for CallbackRecvTimeoutError {}
57
58#[derive(Copy, Clone, Debug, PartialEq)]
60pub enum CallbackTryRecvError {
61 QueueDisconnected,
63 QueueEmpty,
65 MalformedPacket,
67}
68
69impl std::fmt::Display for CallbackTryRecvError {
70 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
71 write!(
72 f,
73 "{}",
74 match &self {
75 CallbackTryRecvError::QueueDisconnected =>
76 "The queue was disconnected. This usually happens if the ip connection is destroyed.",
77 CallbackTryRecvError::QueueEmpty => "There are currently no responses available.",
78 CallbackTryRecvError::MalformedPacket =>
79 "The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?",
80 }
81 )
82 }
83}
84
85impl std::error::Error for CallbackTryRecvError {}
86
87pub struct ConvertingCallbackReceiver<T: FromByteSlice> {
110 receiver: Receiver<Vec<u8>>,
111 phantom: PhantomData<T>,
112}
113
114impl<T: FromByteSlice> ConvertingCallbackReceiver<T> {
115 pub fn new(receiver: Receiver<Vec<u8>>) -> ConvertingCallbackReceiver<T> {
117 ConvertingCallbackReceiver { receiver, phantom: PhantomData }
118 }
119
120 pub fn try_recv(&self) -> Result<T, CallbackTryRecvError> {
126 let recv_result = self.receiver.try_recv();
127 match recv_result {
128 Ok(bytes) => {
129 if T::bytes_expected() == bytes.len() {
130 Ok(T::from_le_byte_slice(&bytes))
131 } else {
132 Err(CallbackTryRecvError::MalformedPacket)
133 }
134 }
135 Err(TryRecvError::Disconnected) => Err(CallbackTryRecvError::QueueDisconnected),
136 Err(TryRecvError::Empty) => Err(CallbackTryRecvError::QueueEmpty),
137 }
138 }
139
140 pub fn recv_forever(&self) -> Result<T, CallbackRecvError> {
146 let recv_result = self.receiver.recv();
147 match recv_result {
148 Ok(bytes) => {
149 if T::bytes_expected() == bytes.len() {
150 Ok(T::from_le_byte_slice(&bytes))
151 } else {
152 Err(CallbackRecvError::MalformedPacket)
153 }
154 }
155 Err(_) => Err(CallbackRecvError::QueueDisconnected),
156 }
157 }
158
159 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, CallbackRecvTimeoutError> {
169 let recv_result = self.receiver.recv_timeout(timeout);
170 match recv_result {
171 Ok(bytes) => {
172 if T::bytes_expected() == bytes.len() {
173 Ok(T::from_le_byte_slice(&bytes))
174 } else {
175 Err(CallbackRecvTimeoutError::MalformedPacket)
176 }
177 }
178 Err(RecvTimeoutError::Disconnected) => Err(CallbackRecvTimeoutError::QueueDisconnected),
179 Err(RecvTimeoutError::Timeout) => Err(CallbackRecvTimeoutError::QueueTimeout),
180 }
181 }
182
183 pub fn iter(&self) -> Iter<T> {
190 Iter { rx: self }
191 }
192
193 pub fn try_iter(&self) -> TryIter<T> {
194 TryIter { rx: self }
195 }
196}
197
198pub struct Iter<'a, T: 'a + FromByteSlice> {
199 rx: &'a ConvertingCallbackReceiver<T>,
200}
201
202pub struct TryIter<'a, T: 'a + FromByteSlice> {
203 rx: &'a ConvertingCallbackReceiver<T>,
204}
205
206pub struct IntoIter<T: FromByteSlice> {
207 rx: ConvertingCallbackReceiver<T>,
208}
209
210impl<'a, T: FromByteSlice> Iterator for Iter<'a, T> {
211 type Item = T;
212
213 fn next(&mut self) -> Option<T> {
214 self.rx.recv_forever().ok()
215 }
216}
217
218impl<'a, T: FromByteSlice> Iterator for TryIter<'a, T> {
219 type Item = T;
220
221 fn next(&mut self) -> Option<T> {
222 self.rx.try_recv().ok()
223 }
224}
225
226impl<'a, T: FromByteSlice> IntoIterator for &'a ConvertingCallbackReceiver<T> {
227 type Item = T;
228 type IntoIter = Iter<'a, T>;
229
230 fn into_iter(self) -> Iter<'a, T> {
231 self.iter()
232 }
233}
234
235impl<T: FromByteSlice> Iterator for IntoIter<T> {
236 type Item = T;
237 fn next(&mut self) -> Option<T> {
238 self.rx.recv_forever().ok()
239 }
240}
241
242impl<T: FromByteSlice> IntoIterator for ConvertingCallbackReceiver<T> {
243 type Item = T;
244 type IntoIter = IntoIter<T>;
245
246 fn into_iter(self) -> IntoIter<T> {
247 IntoIter { rx: self }
248 }
249}