// meshcore_rs/meshcore/serial.rs
use crate::events::EventPayload;
use crate::meshcore::frame_packet;
use crate::packets::FRAME_START;
use crate::{Error, EventType, MeshCore, MeshCoreEvent};
use tokio::sync::mpsc;

7impl MeshCore {
8 pub async fn serial(port: &str, baud_rate: u32) -> crate::Result<MeshCore> {
10 use bytes::BytesMut;
11 use tokio::io::{AsyncReadExt, AsyncWriteExt};
12 use tokio_serial::SerialPortBuilderExt;
13
14 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
15 let meshcore = MeshCore::new_with_sender(tx);
16
17 let port = tokio_serial::new(port, baud_rate)
19 .open_native_async()
20 .map_err(|e| Error::connection(format!("Failed to open serial port: {}", e)))?;
21
22 let (mut reader, mut writer) = tokio::io::split(port);
23
24 let write_task = tokio::spawn(async move {
26 while let Some(data) = rx.recv().await {
27 let framed = frame_packet(&data);
28 if writer.write_all(&framed).await.is_err() {
29 break;
30 }
31 }
32 });
33
34 let msg_reader = meshcore.reader.clone();
36 let connected = meshcore.connected.clone();
37 let dispatcher = meshcore.dispatcher.clone();
38
39 let read_task = tokio::spawn(async move {
41 let mut buffer = BytesMut::with_capacity(4096);
42 let mut read_buf = [0u8; 1024];
43
44 loop {
45 match reader.read(&mut read_buf).await {
46 Ok(0) => {
47 *connected.write().await = false;
49 dispatcher
50 .emit(MeshCoreEvent::new(
51 EventType::Disconnected,
52 EventPayload::None,
53 ))
54 .await;
55 break;
56 }
57 Ok(n) => {
58 buffer.extend_from_slice(&read_buf[..n]);
59
60 while buffer.len() >= 3 {
62 if buffer[0] != FRAME_START {
63 use bytes::Buf;
64 buffer.advance(1);
65 continue;
66 }
67
68 let len = u16::from_le_bytes([buffer[1], buffer[2]]) as usize;
69 if buffer.len() < 3 + len {
70 break;
71 }
72
73 let frame = buffer[3..3 + len].to_vec();
74 use bytes::Buf;
75 buffer.advance(3 + len);
76
77 if let Err(e) = msg_reader.handle_rx(frame).await {
78 tracing::error!("Error handling message: {}", e);
79 }
80 }
81 }
82 Err(_) => {
83 *connected.write().await = false;
84 dispatcher
85 .emit(MeshCoreEvent::new(
86 EventType::Disconnected,
87 EventPayload::None,
88 ))
89 .await;
90 break;
91 }
92 }
93 }
94 });
95
96 meshcore.tasks.lock().await.push(write_task);
98 meshcore.tasks.lock().await.push(read_task);
99
100 *meshcore.connected.write().await = true;
102
103 meshcore.setup_event_handlers().await;
105
106 Ok(meshcore)
107 }
108}