tinkerforge_async/
converting_high_level_callback_receiver.rs

1//! A wrapper for [`Receiver`](std::sync::mpsc::Receiver), which converts received byte vectors to structured data.
2//! This variant of [`ConvertingReceiver`](crate::converting_receiver::ConvertingReceiver) is used for high level
3//! events, for use cases such as streaming.
4
5use std::{marker::PhantomData, time::Duration};
6
7use crate::{
8    byte_converter::FromByteSlice,
9    converting_callback_receiver::{CallbackRecvError, CallbackRecvTimeoutError, CallbackTryRecvError, ConvertingCallbackReceiver},
10    low_level_traits::LowLevelRead,
11};
12
13/// A wrapper for [`Receiver`], which converts received byte vectors to structured data. This variant of
14/// [`ConvertingReceiver`](crate::converting_receiver::ConvertingReceiver) is used for high level
15/// events, for use cases such as streaming.
16///
17/// This receiver wraps a [`Receiver`] receiving raw bytes. Calling [`recv_forever`], [`recv_timeout`] or [`try_recv`]
18/// will call equivalent methods on the wrapped [`Receiver`] and then convert the received bytes
19/// to a instance of `T`.
20///
21///
22/// # Type parameters
23///
24/// * `PayloadT` - Type of the payload which will be streamed. Must be trivially copy- and constructable.
25/// * `ResultT` - Type of additional return values which don't change between streaming events of the same stream.
26/// * `T` - Type which is created from received byte vectors. Must implement [`FromByteSlice`](crate::byte_converter::FromByteSlice).
27/// Also this type has to provide low level steaming information by implementing
28/// [`LowLevelRead`](crate::low_level_traits::LowLevelRead) for `PayloadT` and `ResultT`.
29///
30/// # Errors
31///
32/// Returned errors are equivalent to those returned from methods of a [`Receiver`].
33/// If the received response can not be interpreted as the result type `T`, a `MalformedPacket`
34/// error is raised.
35///
36/// [`Receiver`]: std::sync::mpsc::Receiver
37/// [`recv_forever`]: #method.recv_forever
38/// [`recv_timeout`]: #method.recv_timeout
39/// [`try_recv`]: #method.try_recv
40pub struct ConvertingHighLevelCallbackReceiver<
41    PayloadT: Default + Copy + Clone,
42    ResultT,
43    T: FromByteSlice + LowLevelRead<PayloadT, ResultT>,
44> {
45    receiver: ConvertingCallbackReceiver<T>,
46    buf: Vec<PayloadT>,
47    currently_receiving_stream: bool,
48    message_length: usize,
49    chunk_offset: usize,
50    phantom: PhantomData<ResultT>,
51}
52
53impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>>
54    ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T>
55{
56    /// Creates a new converting high level callback receiver which wraps the given [`ConvertingCallbackReceiver`](crate::converting_callback_receiver::ConvertingCallbackReceiver).
57    pub fn new(receiver: ConvertingCallbackReceiver<T>) -> ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T> {
58        ConvertingHighLevelCallbackReceiver {
59            receiver,
60            phantom: PhantomData,
61            buf: Vec::with_capacity(0),
62            currently_receiving_stream: false,
63            message_length: 0,
64            chunk_offset: 0,
65        }
66    }
67
68    fn recv_stream_chunk(&mut self, chunk: &T) -> Option<(Vec<PayloadT>, ResultT)> {
69        if !self.currently_receiving_stream && chunk.ll_message_chunk_offset() != 0 {
70            //currently not receiving and chunk is not start of stream => out of sync
71            return None;
72        }
73
74        if self.currently_receiving_stream
75            && (chunk.ll_message_chunk_offset() != self.chunk_offset || chunk.ll_message_length() != self.message_length)
76        {
77            //currently receiving, but chunk is not next expected or has wrong length (skipped whole stream) => out of sync
78            return None;
79        }
80
81        if !self.currently_receiving_stream {
82            //initialize
83            self.currently_receiving_stream = true;
84            self.message_length = chunk.ll_message_length();
85            self.chunk_offset = 0;
86            self.buf = vec![PayloadT::default(); self.message_length];
87        }
88
89        let read_length = std::cmp::min(chunk.ll_message_chunk_data().len(), self.message_length - self.chunk_offset);
90        self.buf[self.chunk_offset..self.chunk_offset + read_length].copy_from_slice(&chunk.ll_message_chunk_data()[0..read_length]);
91        self.chunk_offset += read_length;
92
93        if self.chunk_offset >= self.message_length {
94            //stream complete
95            self.currently_receiving_stream = false;
96            return Some((self.buf.clone(), chunk.get_result()));
97        }
98        None
99    }
100
101    /// Attempts to return a pending value on this receiver without blocking. This method behaves like [`try_recv`](std::sync::mpsc::Receiver::try_recv).
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the queue was disconnected or currently empty, or if the received packet was malformed.
106    pub fn try_recv(&mut self) -> Result<(Vec<PayloadT>, ResultT), CallbackTryRecvError> {
107        loop {
108            let ll_result = self.receiver.try_recv()?;
109            let result_opt = self.recv_stream_chunk(&ll_result);
110            if let Some(tup) = result_opt {
111                return Ok(tup);
112            }
113        }
114    }
115
116    /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up. This method behaves like [`recv`](std::sync::mpsc::Receiver::recv).
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the queue was disconnected, or if the received packet was malformed.
121    pub fn recv_forever(&mut self) -> Result<(Vec<PayloadT>, ResultT), CallbackRecvError> {
122        loop {
123            let ll_result = self.receiver.recv_forever()?;
124            let result_opt = self.recv_stream_chunk(&ll_result);
125            if let Some(tup) = result_opt {
126                return Ok(tup);
127            }
128        }
129    }
130
131    /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up, or if it waits more than timeout.
132    /// This method behaves like [`recv_timeout`](std::sync::mpsc::Receiver::recv_timeout).
133    ///
134    /// # Errors
135    ///
136    /// Returns an error on one of the following conditions:
137    /// * The queue was disconnected.
138    /// * The received packet was malformed.
139    /// * Blocked longer than the configured time out.
140    pub fn recv_timeout(&mut self, timeout: Duration) -> Result<(Vec<PayloadT>, ResultT), CallbackRecvTimeoutError> {
141        loop {
142            let ll_result = self.receiver.recv_timeout(timeout)?;
143            let result_opt = self.recv_stream_chunk(&ll_result);
144            if let Some(tup) = result_opt {
145                return Ok(tup);
146            }
147        }
148    }
149
150    /* uncomment if https://github.com/rust-lang/rust/issues/46316 has landed
151        pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
152           let bytes = self.receiver.recv_deadline(deadline)?;
153            Ok(T::from_le_byte_slice(bytes))
154        }
155    */
156}
157
158impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>> Iterator
159    for ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T>
160{
161    type Item = Option<(Vec<PayloadT>, ResultT)>;
162    fn next(&mut self) -> Option<Option<(Vec<PayloadT>, ResultT)>> {
163        match self.recv_forever() {
164            Ok(result) => Some(Some(result)),
165            Err(CallbackRecvError::MalformedPacket) => Some(None),
166            Err(_e) => None,
167        }
168    }
169}
170/*
171impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>> IntoIterator for ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T> {
172    fn into_iter(self) -> Iter<T> { self.iter() }
173}
174
175impl<T: FromByteSlice> IntoIterator for ConvertingHighLevelCallbackReceiver<T> {
176    type Item = T;
177    type IntoIter = IntoIter<T>;
178
179    fn into_iter(self) -> IntoIter<T> { IntoIter { rx: self } }
180}
181*/