tinkerforge_async/converting_high_level_callback_receiver.rs
1//! A wrapper for [`Receiver`](std::sync::mpsc::Receiver), which converts received byte vectors to structured data.
2//! This variant of [`ConvertingReceiver`](crate::converting_receiver::ConvertingReceiver) is used for high level
3//! events, for use cases such as streaming.
4
5use std::{marker::PhantomData, time::Duration};
6
7use crate::{
8 byte_converter::FromByteSlice,
9 converting_callback_receiver::{CallbackRecvError, CallbackRecvTimeoutError, CallbackTryRecvError, ConvertingCallbackReceiver},
10 low_level_traits::LowLevelRead,
11};
12
13/// A wrapper for [`Receiver`], which converts received byte vectors to structured data. This variant of
14/// [`ConvertingReceiver`](crate::converting_receiver::ConvertingReceiver) is used for high level
15/// events, for use cases such as streaming.
16///
17/// This receiver wraps a [`Receiver`] receiving raw bytes. Calling [`recv_forever`], [`recv_timeout`] or [`try_recv`]
18/// will call equivalent methods on the wrapped [`Receiver`] and then convert the received bytes
19/// to a instance of `T`.
20///
21///
22/// # Type parameters
23///
24/// * `PayloadT` - Type of the payload which will be streamed. Must be trivially copy- and constructable.
25/// * `ResultT` - Type of additional return values which don't change between streaming events of the same stream.
26/// * `T` - Type which is created from received byte vectors. Must implement [`FromByteSlice`](crate::byte_converter::FromByteSlice).
27/// Also this type has to provide low level steaming information by implementing
28/// [`LowLevelRead`](crate::low_level_traits::LowLevelRead) for `PayloadT` and `ResultT`.
29///
30/// # Errors
31///
32/// Returned errors are equivalent to those returned from methods of a [`Receiver`].
33/// If the received response can not be interpreted as the result type `T`, a `MalformedPacket`
34/// error is raised.
35///
36/// [`Receiver`]: std::sync::mpsc::Receiver
37/// [`recv_forever`]: #method.recv_forever
38/// [`recv_timeout`]: #method.recv_timeout
39/// [`try_recv`]: #method.try_recv
40pub struct ConvertingHighLevelCallbackReceiver<
41 PayloadT: Default + Copy + Clone,
42 ResultT,
43 T: FromByteSlice + LowLevelRead<PayloadT, ResultT>,
44> {
45 receiver: ConvertingCallbackReceiver<T>,
46 buf: Vec<PayloadT>,
47 currently_receiving_stream: bool,
48 message_length: usize,
49 chunk_offset: usize,
50 phantom: PhantomData<ResultT>,
51}
52
53impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>>
54 ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T>
55{
56 /// Creates a new converting high level callback receiver which wraps the given [`ConvertingCallbackReceiver`](crate::converting_callback_receiver::ConvertingCallbackReceiver).
57 pub fn new(receiver: ConvertingCallbackReceiver<T>) -> ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T> {
58 ConvertingHighLevelCallbackReceiver {
59 receiver,
60 phantom: PhantomData,
61 buf: Vec::with_capacity(0),
62 currently_receiving_stream: false,
63 message_length: 0,
64 chunk_offset: 0,
65 }
66 }
67
68 fn recv_stream_chunk(&mut self, chunk: &T) -> Option<(Vec<PayloadT>, ResultT)> {
69 if !self.currently_receiving_stream && chunk.ll_message_chunk_offset() != 0 {
70 //currently not receiving and chunk is not start of stream => out of sync
71 return None;
72 }
73
74 if self.currently_receiving_stream
75 && (chunk.ll_message_chunk_offset() != self.chunk_offset || chunk.ll_message_length() != self.message_length)
76 {
77 //currently receiving, but chunk is not next expected or has wrong length (skipped whole stream) => out of sync
78 return None;
79 }
80
81 if !self.currently_receiving_stream {
82 //initialize
83 self.currently_receiving_stream = true;
84 self.message_length = chunk.ll_message_length();
85 self.chunk_offset = 0;
86 self.buf = vec![PayloadT::default(); self.message_length];
87 }
88
89 let read_length = std::cmp::min(chunk.ll_message_chunk_data().len(), self.message_length - self.chunk_offset);
90 self.buf[self.chunk_offset..self.chunk_offset + read_length].copy_from_slice(&chunk.ll_message_chunk_data()[0..read_length]);
91 self.chunk_offset += read_length;
92
93 if self.chunk_offset >= self.message_length {
94 //stream complete
95 self.currently_receiving_stream = false;
96 return Some((self.buf.clone(), chunk.get_result()));
97 }
98 None
99 }
100
101 /// Attempts to return a pending value on this receiver without blocking. This method behaves like [`try_recv`](std::sync::mpsc::Receiver::try_recv).
102 ///
103 /// # Errors
104 ///
105 /// Returns an error if the queue was disconnected or currently empty, or if the received packet was malformed.
106 pub fn try_recv(&mut self) -> Result<(Vec<PayloadT>, ResultT), CallbackTryRecvError> {
107 loop {
108 let ll_result = self.receiver.try_recv()?;
109 let result_opt = self.recv_stream_chunk(&ll_result);
110 if let Some(tup) = result_opt {
111 return Ok(tup);
112 }
113 }
114 }
115
116 /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up. This method behaves like [`recv`](std::sync::mpsc::Receiver::recv).
117 ///
118 /// # Errors
119 ///
120 /// Returns an error if the queue was disconnected, or if the received packet was malformed.
121 pub fn recv_forever(&mut self) -> Result<(Vec<PayloadT>, ResultT), CallbackRecvError> {
122 loop {
123 let ll_result = self.receiver.recv_forever()?;
124 let result_opt = self.recv_stream_chunk(&ll_result);
125 if let Some(tup) = result_opt {
126 return Ok(tup);
127 }
128 }
129 }
130
131 /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up, or if it waits more than timeout.
132 /// This method behaves like [`recv_timeout`](std::sync::mpsc::Receiver::recv_timeout).
133 ///
134 /// # Errors
135 ///
136 /// Returns an error on one of the following conditions:
137 /// * The queue was disconnected.
138 /// * The received packet was malformed.
139 /// * Blocked longer than the configured time out.
140 pub fn recv_timeout(&mut self, timeout: Duration) -> Result<(Vec<PayloadT>, ResultT), CallbackRecvTimeoutError> {
141 loop {
142 let ll_result = self.receiver.recv_timeout(timeout)?;
143 let result_opt = self.recv_stream_chunk(&ll_result);
144 if let Some(tup) = result_opt {
145 return Ok(tup);
146 }
147 }
148 }
149
150 /* uncomment if https://github.com/rust-lang/rust/issues/46316 has landed
151 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
152 let bytes = self.receiver.recv_deadline(deadline)?;
153 Ok(T::from_le_byte_slice(bytes))
154 }
155 */
156}
157
158impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>> Iterator
159 for ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T>
160{
161 type Item = Option<(Vec<PayloadT>, ResultT)>;
162 fn next(&mut self) -> Option<Option<(Vec<PayloadT>, ResultT)>> {
163 match self.recv_forever() {
164 Ok(result) => Some(Some(result)),
165 Err(CallbackRecvError::MalformedPacket) => Some(None),
166 Err(_e) => None,
167 }
168 }
169}
170/*
171impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>> IntoIterator for ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T> {
172 fn into_iter(self) -> Iter<T> { self.iter() }
173}
174
175impl<T: FromByteSlice> IntoIterator for ConvertingHighLevelCallbackReceiver<T> {
176 type Item = T;
177 type IntoIter = IntoIter<T>;
178
179 fn into_iter(self) -> IntoIter<T> { IntoIter { rx: self } }
180}
181*/