1use crate::commands::CommandHandler;
4use crate::events::*;
5#[cfg(any(feature = "serial", feature = "tcp"))]
6use crate::packets::FRAME_START;
7use crate::reader::MessageReader;
8use crate::Result;
9use futures::StreamExt;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13#[cfg(any(feature = "serial", feature = "ble", feature = "tcp"))]
14use tokio::sync::mpsc;
15use tokio::sync::{Mutex, RwLock};
16use tokio_stream::wrappers::BroadcastStream;
17
18#[cfg(feature = "ble")]
19pub mod ble;
20#[cfg(feature = "serial")]
21pub mod serial;
22#[cfg(feature = "tcp")]
23pub mod tcp;
24
/// Client handle bundling the event dispatcher, the message reader, the
/// command handler and the state cached from device events.
///
/// Every field is wrapped in an `Arc`, so event callbacks and spawned tasks
/// can hold their own references to the shared state.
pub struct MeshCore {
    /// Event dispatcher used for all subscriptions and emitted events.
    pub(crate) dispatcher: Arc<EventDispatcher>,
    /// Message reader; it is constructed with a handle to `dispatcher`.
    pub(crate) reader: Arc<MessageReader>,
    /// Command handler; callers lock the mutex to issue commands.
    commands: Arc<Mutex<CommandHandler>>,
    /// Cached contacts, keyed by the hex-encoded public key of each contact.
    contacts: Arc<RwLock<HashMap<String, Contact>>>,
    /// Payload of the most recent `SelfInfo` event, if one was received.
    self_info: Arc<RwLock<Option<SelfInfo>>>,
    /// Value carried by the most recent `CurrentTime` event, if one was received.
    // NOTE(review): presumably seconds since the Unix epoch — confirm against the protocol.
    device_time: Arc<RwLock<Option<u32>>>,
    /// Starts as `true` and is set to `false` once the contact cache has been
    /// filled (by a `Contacts` event or by `ensure_contacts`).
    // NOTE(review): nothing visible in this file sets it back to `true`.
    contacts_dirty: Arc<RwLock<bool>>,
    /// Connection flag; starts as `false` and is cleared again by `disconnect`.
    // NOTE(review): presumably set to `true` by the transport modules — confirm.
    pub(crate) connected: Arc<RwLock<bool>>,
    /// Subscription created by `start_auto_message_fetching`, if one is active.
    auto_fetch_sub: Arc<Mutex<Option<Subscription>>>,
    /// Handles of background tasks; `disconnect` drains and aborts them.
    pub(crate) tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
48
49impl MeshCore {
50 #[cfg(any(feature = "serial", feature = "ble", feature = "tcp"))]
51 pub(crate) fn new_with_sender(sender: mpsc::Sender<Vec<u8>>) -> Self {
53 let dispatcher = Arc::new(EventDispatcher::new());
54 let reader = Arc::new(MessageReader::new(dispatcher.clone()));
55
56 let commands = CommandHandler::new(sender, dispatcher.clone(), reader.clone());
57
58 Self {
59 dispatcher,
60 reader,
61 commands: Arc::new(Mutex::new(commands)),
62 contacts: Arc::new(RwLock::new(HashMap::new())),
63 self_info: Arc::new(RwLock::new(None)),
64 device_time: Arc::new(RwLock::new(None)),
65 contacts_dirty: Arc::new(RwLock::new(true)),
66 connected: Arc::new(RwLock::new(false)),
67 auto_fetch_sub: Arc::new(Mutex::new(None)),
68 tasks: Arc::new(Mutex::new(Vec::new())),
69 }
70 }
71
72 #[cfg(any(feature = "serial", feature = "ble", feature = "tcp"))]
73 pub(crate) async fn setup_event_handlers(&self) {
75 let contacts = self.contacts.clone();
76 let contacts_dirty = self.contacts_dirty.clone();
77
78 self.dispatcher
80 .subscribe(EventType::Contacts, HashMap::new(), move |event| {
81 if let EventPayload::Contacts(new_contacts) = event.payload {
82 let contacts = contacts.clone();
83 let contacts_dirty = contacts_dirty.clone();
84 tokio::spawn(async move {
85 let mut map = contacts.write().await;
86 map.clear();
87 for contact in new_contacts {
88 let key = crate::parsing::hex_encode(&contact.public_key);
89 map.insert(key, contact);
90 }
91 *contacts_dirty.write().await = false;
92 });
93 }
94 })
95 .await;
96
97 let self_info = self.self_info.clone();
98
99 self.dispatcher
101 .subscribe(EventType::SelfInfo, HashMap::new(), move |event| {
102 if let EventPayload::SelfInfo(info) = event.payload {
103 let self_info = self_info.clone();
104 tokio::spawn(async move {
105 *self_info.write().await = Some(info);
106 });
107 }
108 })
109 .await;
110
111 let device_time = self.device_time.clone();
112
113 self.dispatcher
115 .subscribe(EventType::CurrentTime, HashMap::new(), move |event| {
116 if let EventPayload::Time(t) = event.payload {
117 let device_time = device_time.clone();
118 tokio::spawn(async move {
119 *device_time.write().await = Some(t);
120 });
121 }
122 })
123 .await;
124
125 let contacts2 = self.contacts.clone();
126
127 self.dispatcher
129 .subscribe(EventType::NewContact, HashMap::new(), move |event| {
130 if let EventPayload::Contact(contact) = event.payload {
131 let contacts = contacts2.clone();
132 tokio::spawn(async move {
133 let key = crate::parsing::hex_encode(&contact.public_key);
134 contacts.write().await.insert(key, contact);
135 });
136 }
137 })
138 .await;
139 }
140
141 pub async fn is_connected(&self) -> bool {
143 *self.connected.read().await
144 }
145
146 pub fn commands(&self) -> &Arc<Mutex<CommandHandler>> {
148 &self.commands
149 }
150
151 pub async fn contacts(&self) -> HashMap<String, Contact> {
153 self.contacts.read().await.clone()
154 }
155
156 pub async fn self_info(&self) -> Option<SelfInfo> {
158 self.self_info.read().await.clone()
159 }
160
161 pub async fn device_time(&self) -> Option<u32> {
163 *self.device_time.read().await
164 }
165
166 pub async fn contacts_dirty(&self) -> bool {
168 *self.contacts_dirty.read().await
169 }
170
171 pub async fn get_contact_by_name(&self, name: &str) -> Option<Contact> {
173 let contacts = self.contacts.read().await;
174 contacts
175 .values()
176 .find(|c| c.adv_name.eq_ignore_ascii_case(name))
177 .cloned()
178 }
179
180 pub async fn get_contact_by_prefix(&self, prefix: &[u8]) -> Option<Contact> {
182 let contacts = self.contacts.read().await;
183 contacts
184 .values()
185 .find(|c| c.public_key.starts_with(prefix))
186 .cloned()
187 }
188
189 pub async fn ensure_contacts(&self) -> Result<()> {
191 if *self.contacts_dirty.read().await {
192 let contacts = self.commands.lock().await.get_contacts(0).await?;
193 let mut map = self.contacts.write().await;
194 map.clear();
195 for contact in contacts {
196 let key = crate::parsing::hex_encode(&contact.public_key);
197 map.insert(key, contact);
198 }
199 *self.contacts_dirty.write().await = false;
200 }
201 Ok(())
202 }
203
204 pub async fn subscribe<F>(
206 &self,
207 event_type: EventType,
208 filters: HashMap<String, String>,
209 callback: F,
210 ) -> Subscription
211 where
212 F: Fn(MeshCoreEvent) + Send + Sync + 'static,
213 {
214 self.dispatcher
215 .subscribe(event_type, filters, callback)
216 .await
217 }
218
219 pub async fn wait_for_event(
221 &self,
222 event_type: Option<EventType>,
223 filters: HashMap<String, String>,
224 timeout: Duration,
225 ) -> Option<MeshCoreEvent> {
226 self.dispatcher
227 .wait_for_event(event_type, filters, timeout)
228 .await
229 }
230
231 pub async fn start_auto_message_fetching(&self) {
233 let commands = self.commands.clone();
234 let dispatcher = self.dispatcher.clone();
235
236 let sub = self
237 .dispatcher
238 .subscribe(EventType::MessagesWaiting, HashMap::new(), move |_| {
239 let commands = commands.clone();
240 let _dispatcher = dispatcher.clone();
241 tokio::spawn(async move {
242 loop {
243 let result = commands.lock().await.get_msg().await;
244 match result {
245 Ok(Some(_msg)) => {
246 }
248 Ok(None) => break, Err(_) => break,
250 }
251 }
252 });
253 })
254 .await;
255
256 *self.auto_fetch_sub.lock().await = Some(sub);
257 }
258
259 pub async fn stop_auto_message_fetching(&self) {
261 if let Some(sub) = self.auto_fetch_sub.lock().await.take() {
262 sub.unsubscribe().await;
263 }
264 }
265
266 pub async fn disconnect(&self) -> Result<()> {
268 *self.connected.write().await = false;
269
270 let mut tasks = self.tasks.lock().await;
272 for task in tasks.drain(..) {
273 task.abort();
274 }
275
276 self.dispatcher
278 .emit(MeshCoreEvent::new(
279 EventType::Disconnected,
280 EventPayload::None,
281 ))
282 .await;
283
284 Ok(())
285 }
286
287 pub async fn set_default_timeout(&self, timeout: Duration) {
289 self.commands.lock().await.set_default_timeout(timeout);
290 }
291
292 pub fn dispatcher(&self) -> &Arc<EventDispatcher> {
294 &self.dispatcher
295 }
296
297 pub fn reader(&self) -> &Arc<MessageReader> {
299 &self.reader
300 }
301
302 pub fn event_stream(&self) -> impl futures::Stream<Item = MeshCoreEvent> + Unpin {
318 BroadcastStream::new(self.dispatcher.receiver())
319 .filter_map(|result| std::future::ready(result.ok()))
320 }
321
322 pub fn event_stream_filtered(
338 &self,
339 event_type: EventType,
340 ) -> impl futures::Stream<Item = MeshCoreEvent> + Unpin {
341 BroadcastStream::new(self.dispatcher.receiver()).filter_map(move |result| {
342 std::future::ready(result.ok().filter(|event| event.event_type == event_type))
343 })
344 }
345}
346
/// Wraps `data` in a transport frame.
///
/// Layout of the returned buffer:
/// 1. `FRAME_START` (one byte),
/// 2. the payload length as a 16-bit little-endian integer (two bytes),
/// 3. the payload bytes, unchanged.
///
/// The length field is only 16 bits wide, so `data` must not be longer than
/// `u16::MAX` (65535) bytes.
///
/// # Panics
/// In debug builds, panics if `data` is longer than `u16::MAX` bytes. Release
/// builds keep the previous behaviour and write the length truncated to its
/// low 16 bits.
#[cfg(any(feature = "serial", feature = "tcp"))]
pub(crate) fn frame_packet(data: &[u8]) -> Vec<u8> {
    // An over-long payload would get a wrong length header; catch it early in
    // debug builds, since the signature has no way to report an error.
    debug_assert!(
        data.len() <= usize::from(u16::MAX),
        "frame payload of {} bytes does not fit the 16-bit length field",
        data.len()
    );
    let len = data.len() as u16;

    let mut framed = Vec::with_capacity(3 + data.len());
    framed.push(FRAME_START);
    framed.extend_from_slice(&len.to_le_bytes());
    framed.extend_from_slice(data);
    framed
}
360
#[cfg(test)]
#[cfg(any(feature = "serial", feature = "tcp"))]
mod tests {
    use super::*;

    /// A three-byte payload gets a start byte and the length `3` in
    /// little-endian order, followed by the payload.
    #[test]
    fn test_frame_packet() {
        let payload = vec![0x01, 0x02, 0x03];

        let framed = frame_packet(&payload);

        assert_eq!(framed, vec![FRAME_START, 0x03, 0x00, 0x01, 0x02, 0x03]);
    }

    /// An empty payload produces just the three-byte header with length zero.
    #[test]
    fn test_frame_packet_empty() {
        let payload: Vec<u8> = Vec::new();

        let framed = frame_packet(&payload);

        assert_eq!(framed, vec![FRAME_START, 0x00, 0x00]);
    }

    /// A one-byte payload is framed with length `1`.
    #[test]
    fn test_frame_packet_single_byte() {
        let payload = vec![0xFF];

        let framed = frame_packet(&payload);

        assert_eq!(framed, vec![FRAME_START, 0x01, 0x00, 0xFF]);
    }

    /// 256 bytes is the first length that needs the high length byte.
    #[test]
    fn test_frame_packet_256_bytes() {
        let payload = vec![0xAA; 256];
        let mut expected = vec![FRAME_START, 0x00, 0x01];
        expected.extend_from_slice(&payload);

        let framed = frame_packet(&payload);

        assert_eq!(framed, expected);
    }

    /// 1000 bytes is `0x03E8`, written low byte first.
    #[test]
    fn test_frame_packet_large() {
        let payload = vec![0xBB; 1000];

        let framed = frame_packet(&payload);

        assert_eq!(framed.len(), 1003);
        assert_eq!(&framed[..3], &[FRAME_START, 0xE8, 0x03]);
    }

    /// The start-of-frame marker is the ASCII `<` character.
    #[test]
    fn test_frame_start_constant() {
        assert_eq!(FRAME_START, b'<');
        assert_eq!(FRAME_START, 0x3c);
    }
}