meshcore_rs/meshcore/
tcp.rs1use crate::events::EventPayload;
2use crate::packets::FRAME_START;
3use crate::{Error, EventType, MeshCore, MeshCoreEvent};
4use tokio::sync::mpsc;
5
6impl MeshCore {
7 pub async fn tcp(host: &str, port: u16) -> crate::Result<MeshCore> {
9 use bytes::BytesMut;
10 use tokio::io::{AsyncReadExt, AsyncWriteExt};
11
12 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
13 let meshcore = MeshCore::new_with_sender(tx);
14
15 let addr = format!("{}:{}", host, port);
17 let stream = tokio::net::TcpStream::connect(&addr)
18 .await
19 .map_err(|e| Error::connection(format!("Failed to connect to {}: {}", addr, e)))?;
20
21 let (mut reader, mut writer) = tokio::io::split(stream);
22
23 let write_task = tokio::spawn(async move {
25 while let Some(data) = rx.recv().await {
26 let framed = crate::meshcore::frame_packet(&data);
27 if writer.write_all(&framed).await.is_err() {
28 break;
29 }
30 }
31 });
32
33 let msg_reader = meshcore.reader.clone();
35 let connected = meshcore.connected.clone();
36 let dispatcher = meshcore.dispatcher.clone();
37
38 let read_task = tokio::spawn(async move {
40 let mut buffer = BytesMut::with_capacity(4096);
41 let mut read_buf = [0u8; 1024];
42
43 loop {
44 match reader.read(&mut read_buf).await {
45 Ok(0) => {
46 *connected.write().await = false;
47 dispatcher
48 .emit(MeshCoreEvent::new(
49 EventType::Disconnected,
50 EventPayload::None,
51 ))
52 .await;
53 break;
54 }
55 Ok(n) => {
56 buffer.extend_from_slice(&read_buf[..n]);
57
58 while buffer.len() >= 3 {
59 if buffer[0] != FRAME_START {
60 use bytes::Buf;
61 buffer.advance(1);
62 continue;
63 }
64
65 let len = u16::from_le_bytes([buffer[1], buffer[2]]) as usize;
66 if buffer.len() < 3 + len {
67 break;
68 }
69
70 let frame = buffer[3..3 + len].to_vec();
71 use bytes::Buf;
72 buffer.advance(3 + len);
73
74 if let Err(e) = msg_reader.handle_rx(frame).await {
75 tracing::error!("Error handling message: {}", e);
76 }
77 }
78 }
79 Err(_) => {
80 *connected.write().await = false;
81 dispatcher
82 .emit(MeshCoreEvent::new(
83 EventType::Disconnected,
84 EventPayload::None,
85 ))
86 .await;
87 break;
88 }
89 }
90 }
91 });
92
93 meshcore.tasks.lock().await.push(write_task);
94 meshcore.tasks.lock().await.push(read_task);
95
96 *meshcore.connected.write().await = true;
97
98 meshcore.setup_event_handlers().await;
99
100 Ok(meshcore)
101 }
102}