tinkerforge_async/
converting_callback_receiver.rs

1//! A wrapper for [`Receiver`](std::sync::mpsc::Receiver), which converts received byte vectors to structured data.
2//! This variant of [`ConvertingReceiver`](crate::converting_receiver::ConvertingReceiver) is used for events.
3
4use std::sync::mpsc::{RecvTimeoutError, TryRecvError};
5use std::{marker::PhantomData, sync::mpsc::Receiver, time::Duration};
6
7use crate::byte_converter::FromByteSlice;
8
/// Error type which is returned if a [`recv_forever`](crate::converting_callback_receiver::ConvertingCallbackReceiver::recv_forever) call fails.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum CallbackRecvError {
    /// The queue was disconnected. This usually happens if the ip connection is destroyed.
    QueueDisconnected,
    /// The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?
    MalformedPacket,
}

impl std::fmt::Display for CallbackRecvError {
    /// Writes the fixed, human-readable description of the error variant to `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            CallbackRecvError::QueueDisconnected => "The queue was disconnected. This usually happens if the ip connection is destroyed.",
            CallbackRecvError::MalformedPacket => {
                "The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?"
            }
        };
        f.write_str(message)
    }
}

// `CallbackRecvTimeoutError` and `CallbackTryRecvError` both implement `Error`; this type
// needs the same impl so it can be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for CallbackRecvError {}
29
/// Error type which is returned if a [`recv_timeout`](crate::converting_callback_receiver::ConvertingCallbackReceiver::recv_timeout) call fails.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum CallbackRecvTimeoutError {
    /// The queue was disconnected. This usually happens if the ip connection is destroyed.
    QueueDisconnected,
    /// Nothing was received before the timeout was reached.
    QueueTimeout,
    /// The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?
    MalformedPacket,
}

impl std::fmt::Display for CallbackRecvTimeoutError {
    /// Writes the fixed, human-readable description of the error variant to `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match *self {
            CallbackRecvTimeoutError::MalformedPacket => {
                "The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?"
            }
            CallbackRecvTimeoutError::QueueTimeout => {
                "The request could not be responded to before the timeout was reached."
            }
            CallbackRecvTimeoutError::QueueDisconnected => {
                "The queue was disconnected. This usually happens if the ip connection is destroyed."
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for CallbackRecvTimeoutError {}
57
/// Error type which is returned if a [`try_recv`](crate::converting_callback_receiver::ConvertingCallbackReceiver::try_recv) call fails.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum CallbackTryRecvError {
    /// The queue was disconnected. This usually happens if the ip connection is destroyed.
    QueueDisconnected,
    /// Nothing is waiting in the queue right now.
    QueueEmpty,
    /// The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?
    MalformedPacket,
}

impl std::fmt::Display for CallbackTryRecvError {
    /// Writes the fixed, human-readable description of the error variant to `f`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::QueueEmpty => "There are currently no responses available.",
            Self::QueueDisconnected => {
                "The queue was disconnected. This usually happens if the ip connection is destroyed."
            }
            Self::MalformedPacket => {
                "The received packet had an unexpected length. Maybe a function was called on a wrong brick or bricklet?"
            }
        };
        f.write_str(text)
    }
}

impl std::error::Error for CallbackTryRecvError {}
86
87/// A wrapper for [`Receiver`], which converts received byte vectors to structured data. This variant of
88/// [`ConvertingReceiver`](crate::converting_receiver::ConvertingReceiver) is used for events.
89///
90/// This receiver wraps a [`Receiver`] receiving raw bytes. Calling [`recv_forever`], [`recv_timeout`] or [`try_recv`]
91/// will call equivalent methods on the wrapped [`Receiver`] and then convert the received bytes
92/// to a instance of `T`.
93///
94///
95/// # Type parameters
96///
97/// * `T` - Type which is created from received byte vectors. Must implement [`FromByteSlice`](crate::byte_converter::FromByteSlice)
98///
99/// # Errors
100///
101/// Returned errors are equivalent to those returned from methods of a [`Receiver`].
102/// If the received response can not be interpreted as the result type `T`, a `MalformedPacket`
103/// error is raised.
104///
105/// [`Receiver`]: std::sync::mpsc::Receiver
106/// [`recv_forever`]: #method.recv_forever
107/// [`recv_timeout`]: #method.recv_timeout
108/// [`try_recv`]: #method.try_recv
109pub struct ConvertingCallbackReceiver<T: FromByteSlice> {
110    receiver: Receiver<Vec<u8>>,
111    phantom: PhantomData<T>,
112}
113
114impl<T: FromByteSlice> ConvertingCallbackReceiver<T> {
115    /// Creates a new converting callback receiver which wraps the given [`Receiver`](std::sync::mpsc::Receiver).
116    pub fn new(receiver: Receiver<Vec<u8>>) -> ConvertingCallbackReceiver<T> {
117        ConvertingCallbackReceiver { receiver, phantom: PhantomData }
118    }
119
120    /// Attempts to return a pending value on this receiver without blocking. This method behaves like [`try_recv`](std::sync::mpsc::Receiver::try_recv).
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the queue was disconnected or currently empty, or if the received packet was malformed.
125    pub fn try_recv(&self) -> Result<T, CallbackTryRecvError> {
126        let recv_result = self.receiver.try_recv();
127        match recv_result {
128            Ok(bytes) => {
129                if T::bytes_expected() == bytes.len() {
130                    Ok(T::from_le_byte_slice(&bytes))
131                } else {
132                    Err(CallbackTryRecvError::MalformedPacket)
133                }
134            }
135            Err(TryRecvError::Disconnected) => Err(CallbackTryRecvError::QueueDisconnected),
136            Err(TryRecvError::Empty) => Err(CallbackTryRecvError::QueueEmpty),
137        }
138    }
139
140    /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up. This method behaves like [`recv`](std::sync::mpsc::Receiver::recv).
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the queue was disconnected or currently empty, or if the received packet was malformed.
145    pub fn recv_forever(&self) -> Result<T, CallbackRecvError> {
146        let recv_result = self.receiver.recv();
147        match recv_result {
148            Ok(bytes) => {
149                if T::bytes_expected() == bytes.len() {
150                    Ok(T::from_le_byte_slice(&bytes))
151                } else {
152                    Err(CallbackRecvError::MalformedPacket)
153                }
154            }
155            Err(_) => Err(CallbackRecvError::QueueDisconnected),
156        }
157    }
158
159    /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up, or if it waits more than timeout.
160    /// This method behaves like [`recv_timeout`](std::sync::mpsc::Receiver::recv_timeout).
161    ///
162    /// # Errors
163    ///
164    /// Returns an error on one of the following conditions:
165    /// * The queue was disconnected.
166    /// * The received packet was malformed.
167    /// * Blocked longer than the configured time out.
168    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, CallbackRecvTimeoutError> {
169        let recv_result = self.receiver.recv_timeout(timeout);
170        match recv_result {
171            Ok(bytes) => {
172                if T::bytes_expected() == bytes.len() {
173                    Ok(T::from_le_byte_slice(&bytes))
174                } else {
175                    Err(CallbackRecvTimeoutError::MalformedPacket)
176                }
177            }
178            Err(RecvTimeoutError::Disconnected) => Err(CallbackRecvTimeoutError::QueueDisconnected),
179            Err(RecvTimeoutError::Timeout) => Err(CallbackRecvTimeoutError::QueueTimeout),
180        }
181    }
182
183    /* uncomment if https://github.com/rust-lang/rust/issues/46316 has landed
184        pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
185           let bytes = self.receiver.recv_deadline(deadline)?;
186            Ok(T::from_le_byte_slice(bytes))
187        }
188    */
189    pub fn iter(&self) -> Iter<T> {
190        Iter { rx: self }
191    }
192
193    pub fn try_iter(&self) -> TryIter<T> {
194        TryIter { rx: self }
195    }
196}
197
198pub struct Iter<'a, T: 'a + FromByteSlice> {
199    rx: &'a ConvertingCallbackReceiver<T>,
200}
201
202pub struct TryIter<'a, T: 'a + FromByteSlice> {
203    rx: &'a ConvertingCallbackReceiver<T>,
204}
205
206pub struct IntoIter<T: FromByteSlice> {
207    rx: ConvertingCallbackReceiver<T>,
208}
209
210impl<'a, T: FromByteSlice> Iterator for Iter<'a, T> {
211    type Item = T;
212
213    fn next(&mut self) -> Option<T> {
214        self.rx.recv_forever().ok()
215    }
216}
217
218impl<'a, T: FromByteSlice> Iterator for TryIter<'a, T> {
219    type Item = T;
220
221    fn next(&mut self) -> Option<T> {
222        self.rx.try_recv().ok()
223    }
224}
225
226impl<'a, T: FromByteSlice> IntoIterator for &'a ConvertingCallbackReceiver<T> {
227    type Item = T;
228    type IntoIter = Iter<'a, T>;
229
230    fn into_iter(self) -> Iter<'a, T> {
231        self.iter()
232    }
233}
234
235impl<T: FromByteSlice> Iterator for IntoIter<T> {
236    type Item = T;
237    fn next(&mut self) -> Option<T> {
238        self.rx.recv_forever().ok()
239    }
240}
241
242impl<T: FromByteSlice> IntoIterator for ConvertingCallbackReceiver<T> {
243    type Item = T;
244    type IntoIter = IntoIter<T>;
245
246    fn into_iter(self) -> IntoIter<T> {
247        IntoIter { rx: self }
248    }
249}