1use crate::base58::Uid;
3use std::str;
4
5use crate::byte_converter::{FromByteSlice, ToBytes};
6
7pub mod async_io {
8 use std::{
9 borrow::BorrowMut,
10 fmt::Debug,
11 ops::{Deref, DerefMut},
12 sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc,
15 },
16 time::Duration,
17 };
18
19 use log::{debug, error, info, warn};
20 use tokio::{
21 io::{self, AsyncReadExt, AsyncWriteExt, WriteHalf},
22 net::{TcpStream, ToSocketAddrs},
23 sync::{
24 broadcast::{self, Receiver},
25 Mutex,
26 },
27 task::AbortHandle,
28 };
29 use tokio_stream::{
30 empty,
31 wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
32 Stream, StreamExt,
33 };
34
35 use crate::{
36 base58::Uid,
37 byte_converter::{FromByteSlice, ToBytes},
38 error::TinkerforgeError,
39 ip_connection::EnumerationType,
40 ip_connection::{EnumerateResponse, PacketHeader},
41 };
42
43 #[derive(Debug, Clone)]
44 pub struct AsyncIpConnection {
45 inner: Arc<Mutex<InnerAsyncIpConnection>>,
46 }
47
48 impl AsyncIpConnection {
49 pub async fn enumerate(&mut self) -> Result<Box<dyn Stream<Item = EnumerateResponse> + Unpin + Send>, TinkerforgeError> {
50 self.inner.borrow_mut().lock().await.enumerate().await
51 }
52 pub async fn disconnect_probe(&mut self) -> Result<(), TinkerforgeError> {
53 self.inner.borrow_mut().lock().await.disconnect_probe().await
54 }
55 pub async fn get_authentication_nonce(&mut self) -> Result<[u8; 4], TinkerforgeError> {
56 self.inner.borrow_mut().lock().await.get_authentication_nonce().await
57 }
58 pub(crate) async fn set(
59 &mut self,
60 uid: Uid,
61 function_id: u8,
62 payload: &[u8],
63 timeout: Option<Duration>,
64 ) -> Result<Option<PacketData>, TinkerforgeError> {
65 self.inner.borrow_mut().lock().await.set(uid, function_id, payload, timeout).await
66 }
67 pub(crate) async fn get(
68 &mut self,
69 uid: Uid,
70 function_id: u8,
71 payload: &[u8],
72 timeout: Duration,
73 ) -> Result<PacketData, TinkerforgeError> {
74 self.inner.borrow_mut().lock().await.get(uid, function_id, payload, timeout).await
75 }
76 pub(crate) async fn callback_stream(&mut self, uid: Uid, function_id: u8) -> impl Stream<Item = PacketData> {
77 self.inner.borrow_mut().lock().await.callback_stream(uid, function_id).await
78 }
79 }
80
81 impl AsyncIpConnection {
82 pub async fn new<T: ToSocketAddrs + Debug + Clone + Send + 'static>(addr: T) -> Result<Self, TinkerforgeError> {
83 Ok(Self { inner: Arc::new(Mutex::new(InnerAsyncIpConnection::new(addr).await?)) })
84 }
85 }
86
87 #[derive(Debug)]
88 struct InnerAsyncIpConnection {
89 write_stream: WriteHalf<TcpStream>,
90 receiver: Receiver<Option<PacketData>>,
91 seq_num: u8,
92 running: Arc<AtomicBool>,
93 abort_handle: AbortHandle,
94 }
95
96 impl InnerAsyncIpConnection {
97 pub async fn new<T: ToSocketAddrs + Clone + Debug + Send + 'static>(addr: T) -> Result<Self, TinkerforgeError> {
98 let socket = TcpStream::connect(addr.clone()).await?;
99 Self::enable_keepalive(&socket)?;
100
101 let (mut rd, write_stream) = io::split(socket);
102 let (enum_tx, receiver) = broadcast::channel(512);
103 let running = Arc::new(AtomicBool::new(true));
104 let running_clone = running.clone();
105 let abort_handle = tokio::spawn(async move {
106 loop {
107 let mut header_buffer = Box::new([0; PacketHeader::SIZE]);
108 match rd.read_exact(header_buffer.deref_mut()).await {
109 Ok(8) => {
110 let header = PacketHeader::from_le_byte_slice(header_buffer.deref());
111 let body_size = header.length as usize - PacketHeader::SIZE;
112 let mut body = vec![0; body_size].into_boxed_slice();
113 match rd.read_exact(body.deref_mut()).await {
114 Ok(l) if l == body_size => {}
115 Ok(l) => {
116 panic!("Unexpected body size: {}", l)
117 }
118 Err(e) => panic!("Error from socket: {}", e),
119 }
120 let packet_data = PacketData { header, body };
121 debug!("Received: {packet_data:?}");
122 if let Err(error) = enum_tx.send(Some(packet_data)) {
123 warn!("Cannot process packet from {addr:?}: {error}");
124 break;
125 }
126 }
127 Ok(n) => {
128 error!("Unexpected read count from {addr:?}: {}", n);
129 if let Err(error) = enum_tx.send(None) {
130 warn!("Cannot close connection on read error: {error}");
131 }
132 break;
133 }
134 Err(e) => {
135 error!("Error from socket {addr:?}: {e}");
136 if let Err(error) = enum_tx.send(None) {
137 warn!("Cannot close connection on communication error: {error}");
138 }
139 break;
140 }
141 };
142 }
143 running_clone.store(false, Ordering::Relaxed);
144 info!("Terminated receiver thread");
145 })
146 .abort_handle();
147 Ok(Self { write_stream, abort_handle, seq_num: 1, receiver, running })
148 }
149
150 fn enable_keepalive(socket: &TcpStream) -> Result<(), TinkerforgeError> {
151 let mut ka = socket2::TcpKeepalive::new();
152 ka = ka.with_time(Duration::from_secs(20));
153 ka = ka.with_interval(Duration::from_secs(20));
154 socket2::SockRef::from(&socket).set_tcp_keepalive(&ka)?;
155 Ok(())
156 }
157 pub async fn enumerate(&mut self) -> Result<Box<dyn Stream<Item = EnumerateResponse> + Unpin + Send>, TinkerforgeError> {
158 if !self.running.as_ref().load(Ordering::Relaxed) {
159 return Ok(Box::new(empty()));
160 }
161 let request = Request::Set { uid: Uid::zero(), function_id: 254, payload: &[] };
162 let stream = BroadcastStream::new(self.receiver.resubscribe()).map_while(Self::while_some).filter_map(|p| match p {
163 Ok(p) if p.header.function_id == 253 => Some(EnumerateResponse::from_le_byte_slice(&p.body)),
164 _ => None,
165 });
166 let seq = self.next_seq();
167 self.send_packet(&request, seq, true).await?;
168 Ok(Box::new(stream))
169 }
170 pub async fn disconnect_probe(&mut self) -> Result<(), TinkerforgeError> {
171 let request = Request::Set { uid: Uid::zero(), function_id: 128, payload: &[] };
172 let seq = self.next_seq();
173 self.send_packet(&request, seq, true).await?;
174 Ok(())
175 }
176 async fn get_authentication_nonce(&mut self) -> Result<[u8; 4], TinkerforgeError> {
177 let request = Request::Set { uid: Uid::zero(), function_id: 1, payload: &[] };
178 let seq = self.next_seq();
179 let stream = BroadcastStream::new(self.receiver.resubscribe()).map_while(Self::while_some).timeout(Duration::from_secs(5));
180 self.send_packet(&request, seq, true).await?;
181 tokio::pin!(stream);
182 let option = stream.next().await;
183 info!("Paket: {option:?}");
184 if let Some(Ok(Ok(next_paket))) = option {
185 let body = next_paket.body;
186 if body.len() == 4 {
187 let mut ret = [0; 4];
188 ret.copy_from_slice(&body);
189 Ok(ret)
190 } else {
191 todo!()
192 }
193 } else {
194 todo!()
195 }
196 }
197 pub async fn set(
198 &mut self,
199 uid: Uid,
200 function_id: u8,
201 payload: &[u8],
202 timeout: Option<Duration>,
203 ) -> Result<Option<PacketData>, TinkerforgeError> {
204 let request = Request::Set { uid, function_id, payload };
205 let seq = self.next_seq();
206 if let Some(timeout) = timeout {
207 let stream = BroadcastStream::new(self.receiver.resubscribe())
208 .map_while(Self::while_some)
209 .filter(Self::filter_response(uid, function_id, seq))
210 .timeout(timeout);
211 self.send_packet(&request, seq, true).await?;
212 tokio::pin!(stream);
213 if let Some(done) = stream.next().await {
214 Ok(Some(done.map_err(|_| TinkerforgeError::NoResponseReceived)??))
215 } else {
216 Err(TinkerforgeError::NoResponseReceived)
217 }
218 } else {
219 self.send_packet(&request, seq, false).await?;
220 Ok(None)
221 }
222 }
223
224 fn filter_response(uid: Uid, function_id: u8, seq: u8) -> impl Fn(&Result<PacketData, BroadcastStreamRecvError>) -> bool {
225 move |result| {
226 if let Ok(PacketData { header, .. }) = result {
227 header.uid == uid && header.function_id == function_id && header.sequence_number == seq
228 } else {
229 false
230 }
231 }
232 }
233 pub async fn get(&mut self, uid: Uid, function_id: u8, payload: &[u8], timeout: Duration) -> Result<PacketData, TinkerforgeError> {
234 let request = Request::Get { uid, function_id, payload };
235 let seq = self.next_seq();
236 let stream = BroadcastStream::new(self.receiver.resubscribe())
237 .map_while(Self::while_some)
238 .filter(Self::filter_response(uid, function_id, seq))
239 .timeout(timeout);
240 tokio::pin!(stream);
241 self.send_packet(&request, seq, true).await?;
242 Ok(stream.next().await.ok_or(TinkerforgeError::NoResponseReceived)?.map_err(|_| TinkerforgeError::NoResponseReceived)??)
243 }
244
245 fn while_some(v: Result<Option<PacketData>, BroadcastStreamRecvError>) -> Option<Result<PacketData, BroadcastStreamRecvError>> {
246 match v {
247 Ok(None) => None,
248 Ok(Some(p)) => Some(Ok(p)),
249 Err(e) => Some(Err(e)),
250 }
251 }
252 pub(crate) async fn callback_stream(&mut self, uid: Uid, function_id: u8) -> impl Stream<Item = PacketData> {
253 BroadcastStream::new(self.receiver.resubscribe())
254 .map_while(move |result| match result {
255 Ok(Some(p)) => {
256 let header = &p.header;
257
258 if header.uid == uid && header.function_id == function_id {
259 Some(Some(p))
260 } else if header.function_id == 253 {
261 let enum_paket = EnumerateResponse::from_le_byte_slice(p.body());
262 if enum_paket.enumeration_type == EnumerationType::Disconnected && Some(uid) == enum_paket.uid.parse().ok() {
263 None
265 } else {
266 Some(None)
267 }
268 } else {
269 Some(None)
270 }
271 }
272 Ok(None) => None,
273 Err(BroadcastStreamRecvError::Lagged(count)) => {
274 warn!("Slow receiver, skipped {count} Packets");
275 Some(None)
276 }
277 })
278 .filter_map(|f| f)
279 }
280 async fn send_packet(&mut self, request: &Request<'_>, seq: u8, response_expected: bool) -> Result<(), TinkerforgeError> {
281 let header = request.get_header(response_expected, seq);
282 assert!(header.length <= 72);
283 let mut result = vec![0; header.length as usize];
284 header.uid.write_to_slice(&mut result[0..4]);
285 result[4] = header.length;
286 result[5] = header.function_id;
287 result[6] = header.sequence_number << 4 | (header.response_expected as u8) << 3;
288 result[7] = header.error_code << 6;
289 let payload = request.get_payload();
290 if !payload.is_empty() {
291 result[8..].copy_from_slice(payload);
292 }
293 self.write_stream.write_all(&result[..]).await?;
294 debug!("Sent: {request:?}");
295 Ok(())
296 }
297 fn next_seq(&mut self) -> u8 {
298 self.seq_num += 1;
299 if self.seq_num > 15 {
300 self.seq_num = 1;
301 }
302 self.seq_num
303 }
304 }
305
306 impl Drop for InnerAsyncIpConnection {
307 fn drop(&mut self) {
308 self.abort_handle.abort();
309 }
310 }
311
312 #[derive(Clone, Debug)]
313 pub(crate) struct PacketData {
314 header: PacketHeader,
315 body: Box<[u8]>,
316 }
317
318 impl PacketData {
319 #[allow(dead_code)]
320 pub fn header(&self) -> PacketHeader {
321 self.header
322 }
323 pub fn body(&self) -> &[u8] {
324 &self.body
325 }
326 }
327
328 #[derive(Debug, Clone)]
329 pub(crate) enum Request<'a> {
330 Set { uid: Uid, function_id: u8, payload: &'a [u8] },
331 Get { uid: Uid, function_id: u8, payload: &'a [u8] },
332 }
333
334 impl Request<'_> {
335 fn get_header(&self, response_expected: bool, sequence_number: u8) -> PacketHeader {
336 match self {
337 Request::Set { uid, function_id, payload } => {
338 PacketHeader::with_payload(*uid, *function_id, sequence_number, response_expected, payload.len() as u8)
339 }
340 Request::Get { uid, function_id, payload, .. } => {
341 PacketHeader::with_payload(*uid, *function_id, sequence_number, true, payload.len() as u8)
342 }
343 }
344 }
345 fn get_payload(&self) -> &[u8] {
346 match self {
347 Request::Set { payload, .. } => payload,
348 Request::Get { payload, .. } => payload,
349 }
350 }
351 }
352}
353
354#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
355pub(crate) struct PacketHeader {
356 uid: Uid,
357 length: u8,
358 function_id: u8,
359 sequence_number: u8,
360 response_expected: bool,
361 error_code: u8,
362}
363
364impl PacketHeader {
365 pub(crate) fn with_payload(uid: Uid, function_id: u8, sequence_number: u8, response_expected: bool, payload_len: u8) -> PacketHeader {
366 PacketHeader { uid, length: PacketHeader::SIZE as u8 + payload_len, function_id, sequence_number, response_expected, error_code: 0 }
367 }
368
369 pub(crate) const SIZE: usize = 8;
370}
371
372impl FromByteSlice for PacketHeader {
373 fn from_le_byte_slice(bytes: &[u8]) -> PacketHeader {
374 PacketHeader {
375 uid: Uid::from_le_byte_slice(bytes),
376 length: bytes[4],
377 function_id: bytes[5],
378 sequence_number: (bytes[6] & 0xf0) >> 4,
379 response_expected: (bytes[6] & 0x08) != 0,
380 error_code: (bytes[7] & 0xc0) >> 6,
381 }
382 }
383
384 fn bytes_expected() -> usize {
385 8
386 }
387}
388
389impl ToBytes for PacketHeader {
390 fn to_le_byte_vec(header: PacketHeader) -> Vec<u8> {
391 let mut target = vec![0u8; 8];
392 header.uid.write_to_slice(&mut target[0..4]);
393 target[4] = header.length;
394 target[5] = header.function_id;
395 target[6] = header.sequence_number << 4 | (header.response_expected as u8) << 3;
396 target[7] = header.error_code << 6;
397 target
398 }
399
400 fn write_to_slice(self, target: &mut [u8]) {
401 self.uid.write_to_slice(&mut target[0..4]);
402 target[4] = self.length;
403 target[5] = self.function_id;
404 target[6] = self.sequence_number << 4 | (self.response_expected as u8) << 3;
405 target[7] = self.error_code << 6;
406 }
407}
408
409#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
413pub enum EnumerationType {
414 Available,
417 Connected,
420 Disconnected,
422 Unknown,
424}
425
426impl From<u8> for EnumerationType {
427 fn from(byte: u8) -> EnumerationType {
428 match byte {
429 0 => EnumerationType::Available,
430 1 => EnumerationType::Connected,
431 2 => EnumerationType::Disconnected,
432 _ => EnumerationType::Unknown,
433 }
434 }
435}
436
437#[derive(Clone, Debug)]
440pub struct EnumerateResponse {
441 pub uid: String,
443 pub connected_uid: String,
449 pub position: char,
451 pub hardware_version: [u8; 3],
453 pub firmware_version: [u8; 3],
455 pub device_identifier: u16,
463 pub enumeration_type: EnumerationType,
465}
466
467impl EnumerateResponse {
468 pub fn uid_as_number(&self) {}
469}
470
471impl FromByteSlice for EnumerateResponse {
472 fn from_le_byte_slice(bytes: &[u8]) -> EnumerateResponse {
473 EnumerateResponse {
474 uid: str::from_utf8(&bytes[0..8])
475 .expect("Could not convert to string. This is a bug in the rust bindings.")
476 .replace('\u{0}', ""),
477 connected_uid: str::from_utf8(&bytes[8..16])
478 .expect("Could not convert to string. This is a bug in the rust bindings.")
479 .replace('\u{0}', ""),
480 position: bytes[16] as char,
481 hardware_version: [bytes[17], bytes[18], bytes[19]],
482 firmware_version: [bytes[20], bytes[21], bytes[22]],
483 device_identifier: u16::from_le_byte_slice(&bytes[23..25]),
484 enumeration_type: EnumerationType::from(bytes[25]),
485 }
486 }
487
488 fn bytes_expected() -> usize {
489 26
490 }
491}
492
493struct ServerNonce([u8; 4]);
494
495impl FromByteSlice for ServerNonce {
496 fn from_le_byte_slice(bytes: &[u8]) -> ServerNonce {
497 ServerNonce([bytes[0], bytes[1], bytes[2], bytes[3]])
498 }
499
500 fn bytes_expected() -> usize {
501 4
502 }
503}
504
505#[derive(Debug, Copy, Clone)]
507pub enum AuthenticateError {
508 SecretInvalid,
509 CouldNotGetServerNonce,
510}
511
512impl std::fmt::Display for AuthenticateError {
513 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
514 write!(
515 f,
516 "{}",
517 match self {
518 AuthenticateError::SecretInvalid => {
519 "Authentication secret contained non-ASCII characters"
520 }
521 AuthenticateError::CouldNotGetServerNonce => "Could not get server nonce",
522 }
523 )
524 }
525}
526
527impl std::error::Error for AuthenticateError {}