Skip to main content

meshcore_rs/meshcore/
ble.rs

1use crate::events::EventPayload;
2use crate::{Error, EventType, MeshCore, MeshCoreEvent};
3use btleplug::api::{
4    Central, CentralEvent, Characteristic, Manager as _, Peripheral as _, ScanFilter, WriteType,
5};
6use btleplug::platform::{Manager, Peripheral};
7use futures::stream::StreamExt;
8use std::time::Duration;
9use tokio::sync::mpsc;
10use tokio::sync::mpsc::Receiver;
11use uuid::Uuid;
12
13// MeshCore BLE service and characteristic UUIDs
14// These are the standard UUIDs used by MeshCore devices
15const MESHCORE_SERVICE_UUID: Uuid = Uuid::from_u128(0x6e400001_b5a3_f393_e0a9_e50e24dcca9e);
16const MESHCORE_TX_CHAR_UUID: Uuid = Uuid::from_u128(0x6e400002_b5a3_f393_e0a9_e50e24dcca9e);
17const MESHCORE_RX_CHAR_UUID: Uuid = Uuid::from_u128(0x6e400003_b5a3_f393_e0a9_e50e24dcca9e);
18
19impl MeshCore {
20    /// Find MeshCore radios on BTLE upto the time specified by `scan_duration` and return their names
21    pub async fn ble_discover(scan_duration: Duration) -> crate::Result<Vec<String>> {
22        // Get the Bluetooth adapter
23        let manager = Manager::new()
24            .await
25            .map_err(|e| Error::connection(format!("Failed to create BLE manager: {}", e)))?;
26
27        let adapters = manager
28            .adapters()
29            .await
30            .map_err(|e| Error::connection(format!("Failed to get BLE adapters: {}", e)))?;
31
32        let adapter = adapters
33            .into_iter()
34            .next()
35            .ok_or_else(|| Error::connection("No BLE adapters found"))?;
36
37        // Subscribe to adapter events
38        let mut events = adapter
39            .events()
40            .await
41            .map_err(|e| Error::connection(format!("Failed to get adapter events: {}", e)))?;
42
43        // Start scanning
44        adapter
45            .start_scan(ScanFilter {
46                services: vec![MESHCORE_SERVICE_UUID],
47            })
48            .await
49            .map_err(|e| Error::connection(format!("Failed to start BLE scan: {}", e)))?;
50
51        tracing::info!("Scanning for MeshCore devices...");
52
53        let mut discovered_meshcore_radios = Vec::new();
54
55        let _ = tokio::time::timeout(scan_duration, async {
56            while let Some(event) = events.next().await {
57                if let CentralEvent::DeviceDiscovered(id) = event {
58                    if let Ok(peripheral) = adapter.peripheral(&id).await {
59                        if let Ok(Some(props)) = peripheral.properties().await {
60                            if let Some(name) = &props.local_name {
61                                discovered_meshcore_radios.push(name.clone());
62                            }
63                        }
64                    }
65                }
66            }
67        })
68        .await;
69
70        // Stop scanning
71        let _ = adapter.stop_scan().await;
72        tracing::info!("Stopped scanning for MeshCore devices...");
73
74        Ok(discovered_meshcore_radios)
75    }
76
77    /// Connect to a Btle peripheral that is a MeshCore radio and return the [MeshCore] to use to
78    /// communicate with it
79    async fn ble_connect_peripheral(
80        peripheral: &Peripheral,
81    ) -> crate::Result<(MeshCore, Receiver<Vec<u8>>, Characteristic)> {
82        // Check if already connected, disconnect first if so
83        if peripheral.is_connected().await.unwrap_or(false) {
84            let _ = peripheral.disconnect().await;
85            tokio::time::sleep(Duration::from_millis(500)).await;
86        }
87
88        // Connect to the device with retry
89        let mut connect_attempts = 0;
90        const MAX_CONNECT_ATTEMPTS: u32 = 3;
91
92        loop {
93            connect_attempts += 1;
94            tracing::info!(
95                "Connecting to device (attempt {}/{})",
96                connect_attempts,
97                MAX_CONNECT_ATTEMPTS
98            );
99
100            match peripheral.connect().await {
101                Ok(_) => {
102                    tracing::info!("Connected to MeshCore device");
103                    break;
104                }
105                Err(e) => {
106                    tracing::warn!("Connection attempt {} failed: {}", connect_attempts, e);
107                    if connect_attempts >= MAX_CONNECT_ATTEMPTS {
108                        return Err(Error::connection(format!(
109                            "Failed to connect after {} attempts: {}",
110                            MAX_CONNECT_ATTEMPTS, e
111                        )));
112                    }
113                    // Short delay before retry
114                    tokio::time::sleep(Duration::from_millis(1000)).await;
115                }
116            }
117        }
118
119        // Discover services
120        peripheral
121            .discover_services()
122            .await
123            .map_err(|e| Error::connection(format!("Failed to discover services: {}", e)))?;
124
125        // Find the MeshCore service and characteristics
126        let services = peripheral.services();
127        let meshcore_service = services
128            .iter()
129            .find(|s| s.uuid == MESHCORE_SERVICE_UUID)
130            .ok_or_else(|| Error::connection("MeshCore service not found on device"))?;
131
132        let tx_char = meshcore_service
133            .characteristics
134            .iter()
135            .find(|c| c.uuid == MESHCORE_TX_CHAR_UUID)
136            .ok_or_else(|| Error::connection("TX characteristic not found"))?
137            .clone();
138
139        let rx_char = meshcore_service
140            .characteristics
141            .iter()
142            .find(|c| c.uuid == MESHCORE_RX_CHAR_UUID)
143            .ok_or_else(|| Error::connection("RX characteristic not found"))?
144            .clone();
145
146        // Subscribe to notifications on RX characteristic
147        peripheral
148            .subscribe(&rx_char)
149            .await
150            .map_err(|e| Error::connection(format!("Failed to subscribe to RX: {}", e)))?;
151
152        tracing::info!("Subscribed to MeshCore notifications");
153
154        let (tx, rx) = mpsc::channel::<Vec<u8>>(64);
155        Ok((MeshCore::new_with_sender(tx), rx, tx_char))
156    }
157
158    /// Given a peripheral's name or mac address (as a &str formatted thus
159    /// "{:02X}:{:02X}:{:02X}:{:02X}:{:02X}:{:02X}" using BDAddr.to_string()),
160    /// return the [Peripheral] struct
161    async fn find_peripheral(target_name_or_mac: &str) -> crate::Result<Peripheral> {
162        let manager = Manager::new()
163            .await
164            .map_err(|e| Error::connection(format!("Failed to create BLE manager: {}", e)))?;
165
166        let adapters = manager
167            .adapters()
168            .await
169            .map_err(|e| Error::connection(format!("Failed to get BLE adapters: {}", e)))?;
170
171        let adapter = adapters
172            .into_iter()
173            .next()
174            .ok_or_else(|| Error::connection("No BLE adapters found"))?;
175
176        // Subscribe to adapter events
177        let mut events = adapter
178            .events()
179            .await
180            .map_err(|e| Error::connection(format!("Failed to get adapter events: {}", e)))?;
181
182        adapter
183            .start_scan(ScanFilter {
184                services: vec![MESHCORE_SERVICE_UUID],
185            })
186            .await
187            .map_err(|e| Error::connection(format!("Failed to start BLE scan: {}", e)))?;
188
189        let target_peripheral: Option<Peripheral> = {
190            let timeout = tokio::time::timeout(Duration::from_secs(2), async {
191                while let Some(event) = events.next().await {
192                    if let CentralEvent::DeviceDiscovered(id) = event {
193                        if let Ok(peripheral) = adapter.peripheral(&id).await {
194                            if let Ok(Some(props)) = peripheral.properties().await {
195                                // return this peripheral if the name matches
196                                if props.local_name.as_deref() == Some(target_name_or_mac) {
197                                    return Some(peripheral);
198                                }
199
200                                // return this peripheral if the MAC address matches
201                                if props.address.to_string() == target_name_or_mac {
202                                    return Some(peripheral);
203                                }
204                            }
205                        }
206                    }
207                }
208                None
209            })
210            .await;
211
212            timeout.unwrap_or_else(|_| None)
213        };
214
215        adapter
216            .stop_scan()
217            .await
218            .map_err(|e| Error::connection(format!("Failed to stop BLE scan: {}", e)))?;
219
220        target_peripheral.ok_or_else(|| Error::connection("MeshCore device not found"))
221    }
222
223    /// This method connects to a MeshCore radio by BTLE device name
224    pub async fn ble_connect(name: &str) -> crate::Result<MeshCore> {
225        let peripheral = Self::find_peripheral(name).await?;
226        let (meshcore, mut rx, tx_char) = Self::ble_connect_peripheral(&peripheral).await?;
227
228        // Clone peripheral for tasks
229        let peripheral_write = peripheral.clone();
230        let peripheral_read = peripheral.clone();
231
232        // Spawn write task
233        // BLE does NOT use framing - send raw payload directly (unlike serial which uses [0x3c][len][payload])
234        let write_task = tokio::spawn(async move {
235            while let Some(data) = rx.recv().await {
236                tracing::debug!("BLE TX: {} bytes: {:02x?}", data.len(), &data);
237                // BLE has MTU limits, so we may need to chunk the data
238                for chunk in data.chunks(244) {
239                    match peripheral_write
240                        .write(&tx_char, chunk, WriteType::WithoutResponse)
241                        .await
242                    {
243                        Ok(_) => tracing::trace!("BLE TX chunk: {} bytes sent", chunk.len()),
244                        Err(e) => {
245                            tracing::error!("BLE TX error: {}", e);
246                            break;
247                        }
248                    }
249                }
250            }
251        });
252
253        // Spawn read task
254        let msg_reader = meshcore.reader.clone();
255        let connected = meshcore.connected.clone();
256        let dispatcher = meshcore.dispatcher.clone();
257
258        let read_task = tokio::spawn(async move {
259            let mut notification_stream = match peripheral_read.notifications().await {
260                Ok(stream) => stream,
261                Err(_) => {
262                    *connected.write().await = false;
263                    dispatcher
264                        .emit(MeshCoreEvent::new(
265                            EventType::Disconnected,
266                            EventPayload::None,
267                        ))
268                        .await;
269                    return;
270                }
271            };
272
273            while let Some(data) = notification_stream.next().await {
274                // BLE does NOT use framing - each notification IS a complete packet
275                // (unlike serial which uses [0x3c][len][payload])
276                let frame = data.value;
277                tracing::debug!(
278                    "BLE RX: type=0x{:02x}, len={}, data={:02x?}",
279                    frame.first().unwrap_or(&0),
280                    frame.len(),
281                    &frame
282                );
283
284                if !frame.is_empty() {
285                    if let Err(e) = msg_reader.handle_rx(frame).await {
286                        tracing::error!("Error handling BLE message: {}", e);
287                    }
288                }
289            }
290
291            // Notification stream ended - disconnected
292            *connected.write().await = false;
293            dispatcher
294                .emit(MeshCoreEvent::new(
295                    EventType::Disconnected,
296                    EventPayload::None,
297                ))
298                .await;
299        });
300
301        meshcore.tasks.lock().await.push(write_task);
302        meshcore.tasks.lock().await.push(read_task);
303
304        *meshcore.connected.write().await = true;
305
306        meshcore.setup_event_handlers().await;
307
308        Ok(meshcore)
309    }
310}