Skip to main content

meshcore_rs/meshcore/
mod.rs

1//! Main MeshCore client implementation
2
3use crate::commands::CommandHandler;
4use crate::events::*;
5#[cfg(any(feature = "serial", feature = "tcp"))]
6use crate::packets::FRAME_START;
7use crate::reader::MessageReader;
8use crate::Result;
9use futures::StreamExt;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13#[cfg(any(feature = "serial", feature = "ble", feature = "tcp"))]
14use tokio::sync::mpsc;
15use tokio::sync::{Mutex, RwLock};
16use tokio_stream::wrappers::BroadcastStream;
17
18#[cfg(feature = "ble")]
19pub mod ble;
20#[cfg(feature = "serial")]
21pub mod serial;
22#[cfg(feature = "tcp")]
23pub mod tcp;
24
25/// MeshCore client for communicating with MeshCore devices
26pub struct MeshCore {
27    /// Event dispatcher
28    pub(crate) dispatcher: Arc<EventDispatcher>,
29    /// Message reader
30    pub(crate) reader: Arc<MessageReader>,
31    /// Command handler
32    commands: Arc<Mutex<CommandHandler>>,
33    /// Contact cache
34    contacts: Arc<RwLock<HashMap<String, Contact>>>,
35    /// Self-info cache
36    self_info: Arc<RwLock<Option<SelfInfo>>>,
37    /// Device time cache
38    device_time: Arc<RwLock<Option<u32>>>,
39    /// Contacts dirty flag
40    contacts_dirty: Arc<RwLock<bool>>,
41    /// Connection state
42    pub(crate) connected: Arc<RwLock<bool>>,
43    /// Auto message fetching subscription
44    auto_fetch_sub: Arc<Mutex<Option<Subscription>>>,
45    /// Background tasks
46    pub(crate) tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
47}
48
49impl MeshCore {
50    #[cfg(any(feature = "serial", feature = "ble", feature = "tcp"))]
51    /// Create a new MeshCore client with a custom connection
52    pub(crate) fn new_with_sender(sender: mpsc::Sender<Vec<u8>>) -> Self {
53        let dispatcher = Arc::new(EventDispatcher::new());
54        let reader = Arc::new(MessageReader::new(dispatcher.clone()));
55
56        let commands = CommandHandler::new(sender, dispatcher.clone(), reader.clone());
57
58        Self {
59            dispatcher,
60            reader,
61            commands: Arc::new(Mutex::new(commands)),
62            contacts: Arc::new(RwLock::new(HashMap::new())),
63            self_info: Arc::new(RwLock::new(None)),
64            device_time: Arc::new(RwLock::new(None)),
65            contacts_dirty: Arc::new(RwLock::new(true)),
66            connected: Arc::new(RwLock::new(false)),
67            auto_fetch_sub: Arc::new(Mutex::new(None)),
68            tasks: Arc::new(Mutex::new(Vec::new())),
69        }
70    }
71
72    #[cfg(any(feature = "serial", feature = "ble", feature = "tcp"))]
73    /// Set up internal event handlers for caching
74    pub(crate) async fn setup_event_handlers(&self) {
75        let contacts = self.contacts.clone();
76        let contacts_dirty = self.contacts_dirty.clone();
77
78        // Subscribe to contacts updates
79        self.dispatcher
80            .subscribe(EventType::Contacts, HashMap::new(), move |event| {
81                if let EventPayload::Contacts(new_contacts) = event.payload {
82                    let contacts = contacts.clone();
83                    let contacts_dirty = contacts_dirty.clone();
84                    tokio::spawn(async move {
85                        let mut map = contacts.write().await;
86                        map.clear();
87                        for contact in new_contacts {
88                            let key = crate::parsing::hex_encode(&contact.public_key);
89                            map.insert(key, contact);
90                        }
91                        *contacts_dirty.write().await = false;
92                    });
93                }
94            })
95            .await;
96
97        let self_info = self.self_info.clone();
98
99        // Subscribe to self-info updates
100        self.dispatcher
101            .subscribe(EventType::SelfInfo, HashMap::new(), move |event| {
102                if let EventPayload::SelfInfo(info) = event.payload {
103                    let self_info = self_info.clone();
104                    tokio::spawn(async move {
105                        *self_info.write().await = Some(info);
106                    });
107                }
108            })
109            .await;
110
111        let device_time = self.device_time.clone();
112
113        // Subscribe to time updates
114        self.dispatcher
115            .subscribe(EventType::CurrentTime, HashMap::new(), move |event| {
116                if let EventPayload::Time(t) = event.payload {
117                    let device_time = device_time.clone();
118                    tokio::spawn(async move {
119                        *device_time.write().await = Some(t);
120                    });
121                }
122            })
123            .await;
124
125        let contacts2 = self.contacts.clone();
126
127        // Subscribe to new contacts
128        self.dispatcher
129            .subscribe(EventType::NewContact, HashMap::new(), move |event| {
130                if let EventPayload::Contact(contact) = event.payload {
131                    let contacts = contacts2.clone();
132                    tokio::spawn(async move {
133                        let key = crate::parsing::hex_encode(&contact.public_key);
134                        contacts.write().await.insert(key, contact);
135                    });
136                }
137            })
138            .await;
139    }
140
141    /// Check if connected
142    pub async fn is_connected(&self) -> bool {
143        *self.connected.read().await
144    }
145
146    /// Get the command handler
147    pub fn commands(&self) -> &Arc<Mutex<CommandHandler>> {
148        &self.commands
149    }
150
151    /// Get cached contacts
152    pub async fn contacts(&self) -> HashMap<String, Contact> {
153        self.contacts.read().await.clone()
154    }
155
156    /// Get cached self-info
157    pub async fn self_info(&self) -> Option<SelfInfo> {
158        self.self_info.read().await.clone()
159    }
160
161    /// Get cached device time
162    pub async fn device_time(&self) -> Option<u32> {
163        *self.device_time.read().await
164    }
165
166    /// Check if the contact cache is dirty
167    pub async fn contacts_dirty(&self) -> bool {
168        *self.contacts_dirty.read().await
169    }
170
171    /// Get contact by name
172    pub async fn get_contact_by_name(&self, name: &str) -> Option<Contact> {
173        let contacts = self.contacts.read().await;
174        contacts
175            .values()
176            .find(|c| c.adv_name.eq_ignore_ascii_case(name))
177            .cloned()
178    }
179
180    /// Get contact by public key prefix
181    pub async fn get_contact_by_prefix(&self, prefix: &[u8]) -> Option<Contact> {
182        let contacts = self.contacts.read().await;
183        contacts
184            .values()
185            .find(|c| c.public_key.starts_with(prefix))
186            .cloned()
187    }
188
189    /// Ensure contacts are loaded
190    pub async fn ensure_contacts(&self) -> Result<()> {
191        if *self.contacts_dirty.read().await {
192            let contacts = self.commands.lock().await.get_contacts(0).await?;
193            let mut map = self.contacts.write().await;
194            map.clear();
195            for contact in contacts {
196                let key = crate::parsing::hex_encode(&contact.public_key);
197                map.insert(key, contact);
198            }
199            *self.contacts_dirty.write().await = false;
200        }
201        Ok(())
202    }
203
204    /// Subscribe to events
205    pub async fn subscribe<F>(
206        &self,
207        event_type: EventType,
208        filters: HashMap<String, String>,
209        callback: F,
210    ) -> Subscription
211    where
212        F: Fn(MeshCoreEvent) + Send + Sync + 'static,
213    {
214        self.dispatcher
215            .subscribe(event_type, filters, callback)
216            .await
217    }
218
219    /// Wait for an event, either matching a specific [EventType] or all
220    pub async fn wait_for_event(
221        &self,
222        event_type: Option<EventType>,
223        filters: HashMap<String, String>,
224        timeout: Duration,
225    ) -> Option<MeshCoreEvent> {
226        self.dispatcher
227            .wait_for_event(event_type, filters, timeout)
228            .await
229    }
230
231    /// Start auto-fetching messages when MESSAGES_WAITING is received
232    pub async fn start_auto_message_fetching(&self) {
233        let commands = self.commands.clone();
234        let dispatcher = self.dispatcher.clone();
235
236        let sub = self
237            .dispatcher
238            .subscribe(EventType::MessagesWaiting, HashMap::new(), move |_| {
239                let commands = commands.clone();
240                let _dispatcher = dispatcher.clone();
241                tokio::spawn(async move {
242                    loop {
243                        let result = commands.lock().await.get_msg().await;
244                        match result {
245                            Ok(Some(_msg)) => {
246                                // Message already emitted by the reader
247                            }
248                            Ok(None) => break, // No more messages
249                            Err(_) => break,
250                        }
251                    }
252                });
253            })
254            .await;
255
256        *self.auto_fetch_sub.lock().await = Some(sub);
257    }
258
259    /// Stop auto-fetching messages
260    pub async fn stop_auto_message_fetching(&self) {
261        if let Some(sub) = self.auto_fetch_sub.lock().await.take() {
262            sub.unsubscribe().await;
263        }
264    }
265
266    /// Disconnect from the device
267    pub async fn disconnect(&self) -> Result<()> {
268        *self.connected.write().await = false;
269
270        // Abort all background tasks
271        let mut tasks = self.tasks.lock().await;
272        for task in tasks.drain(..) {
273            task.abort();
274        }
275
276        // Emit disconnected event
277        self.dispatcher
278            .emit(MeshCoreEvent::new(
279                EventType::Disconnected,
280                EventPayload::None,
281            ))
282            .await;
283
284        Ok(())
285    }
286
287    /// Set default timeout
288    pub async fn set_default_timeout(&self, timeout: Duration) {
289        self.commands.lock().await.set_default_timeout(timeout);
290    }
291
292    /// Get the event dispatcher
293    pub fn dispatcher(&self) -> &Arc<EventDispatcher> {
294        &self.dispatcher
295    }
296
297    /// Get the message reader
298    pub fn reader(&self) -> &Arc<MessageReader> {
299        &self.reader
300    }
301
302    /// Create a stream of all events
303    ///
304    /// Returns a stream that yields all events emitted by the device.
305    /// Use `StreamExt` methods to filter or process events.
306    ///
307    /// # Example
308    ///
309    /// ```dont_run
310    /// use futures::StreamExt;
311    ///
312    /// let mut stream = meshcore.event_stream();
313    /// while let Some(event) = stream.next().await {
314    ///     println!("Received: {:?}", event.event_type);
315    /// }
316    /// ```
317    pub fn event_stream(&self) -> impl futures::Stream<Item = MeshCoreEvent> + Unpin {
318        BroadcastStream::new(self.dispatcher.receiver())
319            .filter_map(|result| std::future::ready(result.ok()))
320    }
321
322    /// Create a filtered stream of events by type
323    ///
324    /// Returns a stream that yields only events matching the specified type.
325    ///
326    /// # Example
327    ///
328    /// ```dont_run
329    /// use futures::StreamExt;
330    /// use meshcore_rs::EventType;
331    ///
332    /// let mut stream = meshcore.event_stream_filtered(EventType::ContactMsgRecv);
333    /// while let Some(event) = stream.next().await {
334    ///     println!("Message received: {:?}", event.payload);
335    /// }
336    /// ```
337    pub fn event_stream_filtered(
338        &self,
339        event_type: EventType,
340    ) -> impl futures::Stream<Item = MeshCoreEvent> + Unpin {
341        BroadcastStream::new(self.dispatcher.receiver()).filter_map(move |result| {
342            std::future::ready(result.ok().filter(|event| event.event_type == event_type))
343        })
344    }
345}
346
347/// Frame a packet for transmission
348///
349/// Format: `[START: 0x3c][LENGTH_L][LENGTH_H][PAYLOAD]`
350#[cfg(any(feature = "serial", feature = "tcp"))]
351pub(crate) fn frame_packet(data: &[u8]) -> Vec<u8> {
352    let len = data.len() as u16;
353    let mut framed = Vec::with_capacity(3 + data.len());
354    framed.push(FRAME_START);
355    framed.push((len & 0xFF) as u8);
356    framed.push((len >> 8) as u8);
357    framed.extend_from_slice(data);
358    framed
359}
360
361#[cfg(test)]
362#[cfg(any(feature = "serial", feature = "tcp"))]
363mod tests {
364    use super::*;
365
366    #[test]
367    fn test_frame_packet() {
368        let data = vec![0x01, 0x02, 0x03];
369        let framed = frame_packet(&data);
370
371        assert_eq!(framed[0], FRAME_START);
372        assert_eq!(framed[1], 0x03); // Length low byte
373        assert_eq!(framed[2], 0x00); // Length high byte
374        assert_eq!(&framed[3..], &data);
375    }
376
377    #[test]
378    fn test_frame_packet_empty() {
379        let data: Vec<u8> = vec![];
380        let framed = frame_packet(&data);
381
382        assert_eq!(framed.len(), 3);
383        assert_eq!(framed[0], FRAME_START);
384        assert_eq!(framed[1], 0x00); // Length low byte
385        assert_eq!(framed[2], 0x00); // Length high byte
386    }
387
388    #[test]
389    fn test_frame_packet_single_byte() {
390        let data = vec![0xFF];
391        let framed = frame_packet(&data);
392
393        assert_eq!(framed.len(), 4);
394        assert_eq!(framed[0], FRAME_START);
395        assert_eq!(framed[1], 0x01);
396        assert_eq!(framed[2], 0x00);
397        assert_eq!(framed[3], 0xFF);
398    }
399
400    #[test]
401    fn test_frame_packet_256_bytes() {
402        let data = vec![0xAA; 256];
403        let framed = frame_packet(&data);
404
405        assert_eq!(framed.len(), 259);
406        assert_eq!(framed[0], FRAME_START);
407        assert_eq!(framed[1], 0x00); // 256 & 0xFF = 0
408        assert_eq!(framed[2], 0x01); // 256 >> 8 = 1
409        assert_eq!(&framed[3..], &data[..]);
410    }
411
412    #[test]
413    fn test_frame_packet_large() {
414        let data = vec![0xBB; 1000];
415        let framed = frame_packet(&data);
416
417        assert_eq!(framed.len(), 1003);
418        assert_eq!(framed[0], FRAME_START);
419        // 1000 = 0x03E8
420        assert_eq!(framed[1], 0xE8); // Low byte
421        assert_eq!(framed[2], 0x03); // High byte
422    }
423
424    #[test]
425    fn test_frame_start_constant() {
426        assert_eq!(FRAME_START, 0x3c);
427        assert_eq!(FRAME_START, b'<');
428    }
429}