Skip to main content

meshcore_rs/meshcore/
tcp.rs

1use crate::events::EventPayload;
2use crate::packets::FRAME_START;
3use crate::{Error, EventType, MeshCore, MeshCoreEvent};
4use tokio::sync::mpsc;
5
6impl MeshCore {
7    /// Create a MeshCore client connected via TCP
8    pub async fn tcp(host: &str, port: u16) -> crate::Result<MeshCore> {
9        use bytes::BytesMut;
10        use tokio::io::{AsyncReadExt, AsyncWriteExt};
11
12        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
13        let meshcore = MeshCore::new_with_sender(tx);
14
15        // Connect via TCP
16        let addr = format!("{}:{}", host, port);
17        let stream = tokio::net::TcpStream::connect(&addr)
18            .await
19            .map_err(|e| Error::connection(format!("Failed to connect to {}: {}", addr, e)))?;
20
21        let (mut reader, mut writer) = tokio::io::split(stream);
22
23        // Spawn write task
24        let write_task = tokio::spawn(async move {
25            while let Some(data) = rx.recv().await {
26                let framed = crate::meshcore::frame_packet(&data);
27                if writer.write_all(&framed).await.is_err() {
28                    break;
29                }
30            }
31        });
32
33        // Spawn read task
34        let msg_reader = meshcore.reader.clone();
35        let connected = meshcore.connected.clone();
36        let dispatcher = meshcore.dispatcher.clone();
37
38        // TODO the read task should be extractable from the discovery methods I think
39        let read_task = tokio::spawn(async move {
40            let mut buffer = BytesMut::with_capacity(4096);
41            let mut read_buf = [0u8; 1024];
42
43            loop {
44                match reader.read(&mut read_buf).await {
45                    Ok(0) => {
46                        *connected.write().await = false;
47                        dispatcher
48                            .emit(MeshCoreEvent::new(
49                                EventType::Disconnected,
50                                EventPayload::None,
51                            ))
52                            .await;
53                        break;
54                    }
55                    Ok(n) => {
56                        buffer.extend_from_slice(&read_buf[..n]);
57
58                        while buffer.len() >= 3 {
59                            if buffer[0] != FRAME_START {
60                                use bytes::Buf;
61                                buffer.advance(1);
62                                continue;
63                            }
64
65                            let len = u16::from_le_bytes([buffer[1], buffer[2]]) as usize;
66                            if buffer.len() < 3 + len {
67                                break;
68                            }
69
70                            let frame = buffer[3..3 + len].to_vec();
71                            use bytes::Buf;
72                            buffer.advance(3 + len);
73
74                            if let Err(e) = msg_reader.handle_rx(frame).await {
75                                tracing::error!("Error handling message: {}", e);
76                            }
77                        }
78                    }
79                    Err(_) => {
80                        *connected.write().await = false;
81                        dispatcher
82                            .emit(MeshCoreEvent::new(
83                                EventType::Disconnected,
84                                EventPayload::None,
85                            ))
86                            .await;
87                        break;
88                    }
89                }
90            }
91        });
92
93        meshcore.tasks.lock().await.push(write_task);
94        meshcore.tasks.lock().await.push(read_task);
95
96        *meshcore.connected.write().await = true;
97
98        meshcore.setup_event_handlers().await;
99
100        Ok(meshcore)
101    }
102}