tinkerforge_async/
ip_connection.rs

1//! The IP Connection manages the communication between the API bindings and the Brick Daemon or a WIFI/Ethernet Extension.
2use crate::base58::Uid;
3use std::str;
4
5use crate::byte_converter::{FromByteSlice, ToBytes};
6
7pub mod async_io {
8    use std::{
9        borrow::BorrowMut,
10        fmt::Debug,
11        ops::{Deref, DerefMut},
12        sync::{
13            atomic::{AtomicBool, Ordering},
14            Arc,
15        },
16        time::Duration,
17    };
18
19    use log::{debug, error, info, warn};
20    use tokio::{
21        io::{self, AsyncReadExt, AsyncWriteExt, WriteHalf},
22        net::{TcpStream, ToSocketAddrs},
23        sync::{
24            broadcast::{self, Receiver},
25            Mutex,
26        },
27        task::AbortHandle,
28    };
29    use tokio_stream::{
30        empty,
31        wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
32        Stream, StreamExt,
33    };
34
35    use crate::{
36        base58::Uid,
37        byte_converter::{FromByteSlice, ToBytes},
38        error::TinkerforgeError,
39        ip_connection::EnumerationType,
40        ip_connection::{EnumerateResponse, PacketHeader},
41    };
42
43    #[derive(Debug, Clone)]
44    pub struct AsyncIpConnection {
45        inner: Arc<Mutex<InnerAsyncIpConnection>>,
46    }
47
48    impl AsyncIpConnection {
49        pub async fn enumerate(&mut self) -> Result<Box<dyn Stream<Item = EnumerateResponse> + Unpin + Send>, TinkerforgeError> {
50            self.inner.borrow_mut().lock().await.enumerate().await
51        }
52        pub async fn disconnect_probe(&mut self) -> Result<(), TinkerforgeError> {
53            self.inner.borrow_mut().lock().await.disconnect_probe().await
54        }
55        pub async fn get_authentication_nonce(&mut self) -> Result<[u8; 4], TinkerforgeError> {
56            self.inner.borrow_mut().lock().await.get_authentication_nonce().await
57        }
58        pub(crate) async fn set(
59            &mut self,
60            uid: Uid,
61            function_id: u8,
62            payload: &[u8],
63            timeout: Option<Duration>,
64        ) -> Result<Option<PacketData>, TinkerforgeError> {
65            self.inner.borrow_mut().lock().await.set(uid, function_id, payload, timeout).await
66        }
67        pub(crate) async fn get(
68            &mut self,
69            uid: Uid,
70            function_id: u8,
71            payload: &[u8],
72            timeout: Duration,
73        ) -> Result<PacketData, TinkerforgeError> {
74            self.inner.borrow_mut().lock().await.get(uid, function_id, payload, timeout).await
75        }
76        pub(crate) async fn callback_stream(&mut self, uid: Uid, function_id: u8) -> impl Stream<Item = PacketData> {
77            self.inner.borrow_mut().lock().await.callback_stream(uid, function_id).await
78        }
79    }
80
81    impl AsyncIpConnection {
82        pub async fn new<T: ToSocketAddrs + Debug + Clone + Send + 'static>(addr: T) -> Result<Self, TinkerforgeError> {
83            Ok(Self { inner: Arc::new(Mutex::new(InnerAsyncIpConnection::new(addr).await?)) })
84        }
85    }
86
    /// State of a single TCP connection, owned behind the mutex of `AsyncIpConnection`.
    #[derive(Debug)]
    struct InnerAsyncIpConnection {
        /// Write half of the split TCP stream; outgoing request packets are written here.
        write_stream: WriteHalf<TcpStream>,
        /// Receiver of the broadcast channel fed by the background read task.
        /// It is resubscribed for every response or callback stream; a `None` item marks the end of the connection.
        receiver: Receiver<Option<PacketData>>,
        /// Most recently issued sequence number; advanced by `next_seq`.
        seq_num: u8,
        /// Set to `false` by the background read task when it terminates.
        running: Arc<AtomicBool>,
        /// Used to abort the background read task when this value is dropped.
        abort_handle: AbortHandle,
    }
95
96    impl InnerAsyncIpConnection {
97        pub async fn new<T: ToSocketAddrs + Clone + Debug + Send + 'static>(addr: T) -> Result<Self, TinkerforgeError> {
98            let socket = TcpStream::connect(addr.clone()).await?;
99            Self::enable_keepalive(&socket)?;
100
101            let (mut rd, write_stream) = io::split(socket);
102            let (enum_tx, receiver) = broadcast::channel(512);
103            let running = Arc::new(AtomicBool::new(true));
104            let running_clone = running.clone();
105            let abort_handle = tokio::spawn(async move {
106                loop {
107                    let mut header_buffer = Box::new([0; PacketHeader::SIZE]);
108                    match rd.read_exact(header_buffer.deref_mut()).await {
109                        Ok(8) => {
110                            let header = PacketHeader::from_le_byte_slice(header_buffer.deref());
111                            let body_size = header.length as usize - PacketHeader::SIZE;
112                            let mut body = vec![0; body_size].into_boxed_slice();
113                            match rd.read_exact(body.deref_mut()).await {
114                                Ok(l) if l == body_size => {}
115                                Ok(l) => {
116                                    panic!("Unexpected body size: {}", l)
117                                }
118                                Err(e) => panic!("Error from socket: {}", e),
119                            }
120                            let packet_data = PacketData { header, body };
121                            debug!("Received: {packet_data:?}");
122                            if let Err(error) = enum_tx.send(Some(packet_data)) {
123                                warn!("Cannot process packet from {addr:?}: {error}");
124                                break;
125                            }
126                        }
127                        Ok(n) => {
128                            error!("Unexpected read count from {addr:?}: {}", n);
129                            if let Err(error) = enum_tx.send(None) {
130                                warn!("Cannot close connection on read error: {error}");
131                            }
132                            break;
133                        }
134                        Err(e) => {
135                            error!("Error from socket {addr:?}: {e}");
136                            if let Err(error) = enum_tx.send(None) {
137                                warn!("Cannot close connection on communication error: {error}");
138                            }
139                            break;
140                        }
141                    };
142                }
143                running_clone.store(false, Ordering::Relaxed);
144                info!("Terminated receiver thread");
145            })
146            .abort_handle();
147            Ok(Self { write_stream, abort_handle, seq_num: 1, receiver, running })
148        }
149
150        fn enable_keepalive(socket: &TcpStream) -> Result<(), TinkerforgeError> {
151            let mut ka = socket2::TcpKeepalive::new();
152            ka = ka.with_time(Duration::from_secs(20));
153            ka = ka.with_interval(Duration::from_secs(20));
154            socket2::SockRef::from(&socket).set_tcp_keepalive(&ka)?;
155            Ok(())
156        }
157        pub async fn enumerate(&mut self) -> Result<Box<dyn Stream<Item = EnumerateResponse> + Unpin + Send>, TinkerforgeError> {
158            if !self.running.as_ref().load(Ordering::Relaxed) {
159                return Ok(Box::new(empty()));
160            }
161            let request = Request::Set { uid: Uid::zero(), function_id: 254, payload: &[] };
162            let stream = BroadcastStream::new(self.receiver.resubscribe()).map_while(Self::while_some).filter_map(|p| match p {
163                Ok(p) if p.header.function_id == 253 => Some(EnumerateResponse::from_le_byte_slice(&p.body)),
164                _ => None,
165            });
166            let seq = self.next_seq();
167            self.send_packet(&request, seq, true).await?;
168            Ok(Box::new(stream))
169        }
170        pub async fn disconnect_probe(&mut self) -> Result<(), TinkerforgeError> {
171            let request = Request::Set { uid: Uid::zero(), function_id: 128, payload: &[] };
172            let seq = self.next_seq();
173            self.send_packet(&request, seq, true).await?;
174            Ok(())
175        }
176        async fn get_authentication_nonce(&mut self) -> Result<[u8; 4], TinkerforgeError> {
177            let request = Request::Set { uid: Uid::zero(), function_id: 1, payload: &[] };
178            let seq = self.next_seq();
179            let stream = BroadcastStream::new(self.receiver.resubscribe()).map_while(Self::while_some).timeout(Duration::from_secs(5));
180            self.send_packet(&request, seq, true).await?;
181            tokio::pin!(stream);
182            let option = stream.next().await;
183            info!("Paket: {option:?}");
184            if let Some(Ok(Ok(next_paket))) = option {
185                let body = next_paket.body;
186                if body.len() == 4 {
187                    let mut ret = [0; 4];
188                    ret.copy_from_slice(&body);
189                    Ok(ret)
190                } else {
191                    todo!()
192                }
193            } else {
194                todo!()
195            }
196        }
197        pub async fn set(
198            &mut self,
199            uid: Uid,
200            function_id: u8,
201            payload: &[u8],
202            timeout: Option<Duration>,
203        ) -> Result<Option<PacketData>, TinkerforgeError> {
204            let request = Request::Set { uid, function_id, payload };
205            let seq = self.next_seq();
206            if let Some(timeout) = timeout {
207                let stream = BroadcastStream::new(self.receiver.resubscribe())
208                    .map_while(Self::while_some)
209                    .filter(Self::filter_response(uid, function_id, seq))
210                    .timeout(timeout);
211                self.send_packet(&request, seq, true).await?;
212                tokio::pin!(stream);
213                if let Some(done) = stream.next().await {
214                    Ok(Some(done.map_err(|_| TinkerforgeError::NoResponseReceived)??))
215                } else {
216                    Err(TinkerforgeError::NoResponseReceived)
217                }
218            } else {
219                self.send_packet(&request, seq, false).await?;
220                Ok(None)
221            }
222        }
223
224        fn filter_response(uid: Uid, function_id: u8, seq: u8) -> impl Fn(&Result<PacketData, BroadcastStreamRecvError>) -> bool {
225            move |result| {
226                if let Ok(PacketData { header, .. }) = result {
227                    header.uid == uid && header.function_id == function_id && header.sequence_number == seq
228                } else {
229                    false
230                }
231            }
232        }
233        pub async fn get(&mut self, uid: Uid, function_id: u8, payload: &[u8], timeout: Duration) -> Result<PacketData, TinkerforgeError> {
234            let request = Request::Get { uid, function_id, payload };
235            let seq = self.next_seq();
236            let stream = BroadcastStream::new(self.receiver.resubscribe())
237                .map_while(Self::while_some)
238                .filter(Self::filter_response(uid, function_id, seq))
239                .timeout(timeout);
240            tokio::pin!(stream);
241            self.send_packet(&request, seq, true).await?;
242            Ok(stream.next().await.ok_or(TinkerforgeError::NoResponseReceived)?.map_err(|_| TinkerforgeError::NoResponseReceived)??)
243        }
244
245        fn while_some(v: Result<Option<PacketData>, BroadcastStreamRecvError>) -> Option<Result<PacketData, BroadcastStreamRecvError>> {
246            match v {
247                Ok(None) => None,
248                Ok(Some(p)) => Some(Ok(p)),
249                Err(e) => Some(Err(e)),
250            }
251        }
252        pub(crate) async fn callback_stream(&mut self, uid: Uid, function_id: u8) -> impl Stream<Item = PacketData> {
253            BroadcastStream::new(self.receiver.resubscribe())
254                .map_while(move |result| match result {
255                    Ok(Some(p)) => {
256                        let header = &p.header;
257
258                        if header.uid == uid && header.function_id == function_id {
259                            Some(Some(p))
260                        } else if header.function_id == 253 {
261                            let enum_paket = EnumerateResponse::from_le_byte_slice(p.body());
262                            if enum_paket.enumeration_type == EnumerationType::Disconnected && Some(uid) == enum_paket.uid.parse().ok() {
263                                // device is disconnected -> end stream
264                                None
265                            } else {
266                                Some(None)
267                            }
268                        } else {
269                            Some(None)
270                        }
271                    }
272                    Ok(None) => None,
273                    Err(BroadcastStreamRecvError::Lagged(count)) => {
274                        warn!("Slow receiver, skipped {count} Packets");
275                        Some(None)
276                    }
277                })
278                .filter_map(|f| f)
279        }
280        async fn send_packet(&mut self, request: &Request<'_>, seq: u8, response_expected: bool) -> Result<(), TinkerforgeError> {
281            let header = request.get_header(response_expected, seq);
282            assert!(header.length <= 72);
283            let mut result = vec![0; header.length as usize];
284            header.uid.write_to_slice(&mut result[0..4]);
285            result[4] = header.length;
286            result[5] = header.function_id;
287            result[6] = header.sequence_number << 4 | (header.response_expected as u8) << 3;
288            result[7] = header.error_code << 6;
289            let payload = request.get_payload();
290            if !payload.is_empty() {
291                result[8..].copy_from_slice(payload);
292            }
293            self.write_stream.write_all(&result[..]).await?;
294            debug!("Sent: {request:?}");
295            Ok(())
296        }
297        fn next_seq(&mut self) -> u8 {
298            self.seq_num += 1;
299            if self.seq_num > 15 {
300                self.seq_num = 1;
301            }
302            self.seq_num
303        }
304    }
305
306    impl Drop for InnerAsyncIpConnection {
307        fn drop(&mut self) {
308            self.abort_handle.abort();
309        }
310    }
311
312    #[derive(Clone, Debug)]
313    pub(crate) struct PacketData {
314        header: PacketHeader,
315        body: Box<[u8]>,
316    }
317
318    impl PacketData {
319        #[allow(dead_code)]
320        pub fn header(&self) -> PacketHeader {
321            self.header
322        }
323        pub fn body(&self) -> &[u8] {
324            &self.body
325        }
326    }
327
328    #[derive(Debug, Clone)]
329    pub(crate) enum Request<'a> {
330        Set { uid: Uid, function_id: u8, payload: &'a [u8] },
331        Get { uid: Uid, function_id: u8, payload: &'a [u8] },
332    }
333
334    impl Request<'_> {
335        fn get_header(&self, response_expected: bool, sequence_number: u8) -> PacketHeader {
336            match self {
337                Request::Set { uid, function_id, payload } => {
338                    PacketHeader::with_payload(*uid, *function_id, sequence_number, response_expected, payload.len() as u8)
339                }
340                Request::Get { uid, function_id, payload, .. } => {
341                    PacketHeader::with_payload(*uid, *function_id, sequence_number, true, payload.len() as u8)
342                }
343            }
344        }
345        fn get_payload(&self) -> &[u8] {
346            match self {
347                Request::Set { payload, .. } => payload,
348                Request::Get { payload, .. } => payload,
349            }
350        }
351    }
352}
353
354#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
355pub(crate) struct PacketHeader {
356    uid: Uid,
357    length: u8,
358    function_id: u8,
359    sequence_number: u8,
360    response_expected: bool,
361    error_code: u8,
362}
363
364impl PacketHeader {
365    pub(crate) fn with_payload(uid: Uid, function_id: u8, sequence_number: u8, response_expected: bool, payload_len: u8) -> PacketHeader {
366        PacketHeader { uid, length: PacketHeader::SIZE as u8 + payload_len, function_id, sequence_number, response_expected, error_code: 0 }
367    }
368
369    pub(crate) const SIZE: usize = 8;
370}
371
372impl FromByteSlice for PacketHeader {
373    fn from_le_byte_slice(bytes: &[u8]) -> PacketHeader {
374        PacketHeader {
375            uid: Uid::from_le_byte_slice(bytes),
376            length: bytes[4],
377            function_id: bytes[5],
378            sequence_number: (bytes[6] & 0xf0) >> 4,
379            response_expected: (bytes[6] & 0x08) != 0,
380            error_code: (bytes[7] & 0xc0) >> 6,
381        }
382    }
383
384    fn bytes_expected() -> usize {
385        8
386    }
387}
388
389impl ToBytes for PacketHeader {
390    fn to_le_byte_vec(header: PacketHeader) -> Vec<u8> {
391        let mut target = vec![0u8; 8];
392        header.uid.write_to_slice(&mut target[0..4]);
393        target[4] = header.length;
394        target[5] = header.function_id;
395        target[6] = header.sequence_number << 4 | (header.response_expected as u8) << 3;
396        target[7] = header.error_code << 6;
397        target
398    }
399
400    fn write_to_slice(self, target: &mut [u8]) {
401        self.uid.write_to_slice(&mut target[0..4]);
402        target[4] = self.length;
403        target[5] = self.function_id;
404        target[6] = self.sequence_number << 4 | (self.response_expected as u8) << 3;
405        target[7] = self.error_code << 6;
406    }
407}
408
409//const MAX_PACKET_SIZE: usize = PacketHeader::SIZE + 64 + 8; //header + payload + optional data
410
/// Type of enumeration of a device.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum EnumerationType {
    /// Device is available (enumeration triggered by the user via
    /// [`enumerate`](crate::ip_connection::async_io::AsyncIpConnection::enumerate)).
    /// This enumeration type can occur multiple times for the same device.
    Available,
    /// Device is newly connected (automatically sent by the Brick after establishing a communication connection).
    /// This indicates that the device has potentially lost its previous configuration and needs to be reconfigured.
    Connected,
    /// Device is disconnected (only possible for USB connection). In this case only uid and enumerationType are valid.
    Disconnected,
    /// Device returned an unknown enumeration type.
    Unknown,
}

/// Decodes the wire value: 0, 1 and 2 map to `Available`, `Connected` and `Disconnected`;
/// every other value maps to `Unknown`.
impl From<u8> for EnumerationType {
    fn from(byte: u8) -> EnumerationType {
        // Indexed by wire value.
        const KNOWN: [EnumerationType; 3] = [EnumerationType::Available, EnumerationType::Connected, EnumerationType::Disconnected];
        KNOWN.get(usize::from(byte)).copied().unwrap_or(EnumerationType::Unknown)
    }
}
436
437/// Devices send `EnumerateResponse`s when they are connected, disconnected or when an enumeration was
438/// triggered by the user using the [`Enumerate`](crate::ip_connection::IpConnection::enumerate) method.
439#[derive(Clone, Debug)]
440pub struct EnumerateResponse {
441    /// The UID of the device.
442    pub uid: String,
443    /// UID where the device is connected to.
444    /// For a Bricklet this is the UID of the Brick or Bricklet it is connected to.
445    /// For a Brick it is the UID of the bottommost Brick in the stack.
446    /// For the bottommost Brick in a stack it is "0".
447    /// With this information it is possible to reconstruct the complete network topology.
448    pub connected_uid: String,
449    /// For Bricks: '0' - '8' (position in stack). For Bricklets: 'a' - 'd' (position on Brick).
450    pub position: char,
451    /// Major, minor and release number for hardware version.
452    pub hardware_version: [u8; 3],
453    /// Major, minor and release number for firmware version.
454    pub firmware_version: [u8; 3],
455    /// A number that represents the device.
456    /// The device identifier numbers can be found [here](https://www.tinkerforge.com/en/doc/Software/Device_Identifier.html).
457    /// There are also constants for these numbers named following this pattern:
458    ///
459    /// <device-class>.DEVICE_IDENTIFIER
460    ///
461    /// For example: MasterBrick.DEVICE_IDENTIFIER or AmbientLightBricklet.DEVICE_IDENTIFIER.
462    pub device_identifier: u16,
463    /// Type of enumeration. See [`EnumerationType`](crate::ip_connection::EnumerationType)
464    pub enumeration_type: EnumerationType,
465}
466
467impl EnumerateResponse {
468    pub fn uid_as_number(&self) {}
469}
470
471impl FromByteSlice for EnumerateResponse {
472    fn from_le_byte_slice(bytes: &[u8]) -> EnumerateResponse {
473        EnumerateResponse {
474            uid: str::from_utf8(&bytes[0..8])
475                .expect("Could not convert to string. This is a bug in the rust bindings.")
476                .replace('\u{0}', ""),
477            connected_uid: str::from_utf8(&bytes[8..16])
478                .expect("Could not convert to string. This is a bug in the rust bindings.")
479                .replace('\u{0}', ""),
480            position: bytes[16] as char,
481            hardware_version: [bytes[17], bytes[18], bytes[19]],
482            firmware_version: [bytes[20], bytes[21], bytes[22]],
483            device_identifier: u16::from_le_byte_slice(&bytes[23..25]),
484            enumeration_type: EnumerationType::from(bytes[25]),
485        }
486    }
487
488    fn bytes_expected() -> usize {
489        26
490    }
491}
492
493struct ServerNonce([u8; 4]);
494
495impl FromByteSlice for ServerNonce {
496    fn from_le_byte_slice(bytes: &[u8]) -> ServerNonce {
497        ServerNonce([bytes[0], bytes[1], bytes[2], bytes[3]])
498    }
499
500    fn bytes_expected() -> usize {
501        4
502    }
503}
504
/// Reasons why authenticating against the remote can fail.
#[derive(Debug, Copy, Clone)]
pub enum AuthenticateError {
    /// The authentication secret contained non-ASCII characters.
    SecretInvalid,
    /// The remote's server nonce could not be queried.
    CouldNotGetServerNonce,
}

impl std::fmt::Display for AuthenticateError {
    /// Writes the fixed human-readable message of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let message = match self {
            AuthenticateError::SecretInvalid => "Authentication secret contained non-ASCII characters",
            AuthenticateError::CouldNotGetServerNonce => "Could not get server nonce",
        };
        f.write_str(message)
    }
}

impl std::error::Error for AuthenticateError {}