1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3#[cfg(target_os = "macos")]
4use futures::StreamExt;
5use std::collections::BTreeSet;
6#[cfg(target_os = "macos")]
7use std::collections::HashMap;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc;
11use tracing::{debug, info, warn};
12
13use crate::nostr_relay::NostrRelay;
14
15use super::bluetooth_peer::{BluetoothFrame, BluetoothLink};
16use super::peer::ContentStore;
17use super::session::MeshPeer;
18use super::signaling::{
19 ConnectionState, PeerClassifier, PeerEntry, PeerSignalPath, PeerTransport, WebRTCState,
20};
21use super::types::{MeshNostrFrame, PeerDirection, PeerId, PeerPool, PoolSettings};
22
23#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
24pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";
25#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
26pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";
27#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
28pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";
29#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
30pub const HTREE_BLE_CHUNK_BYTES: usize = 180;
31const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
32
33#[derive(Debug, Clone)]
35pub struct BluetoothConfig {
36 pub enabled: bool,
37 pub max_peers: usize,
38}
39
40impl BluetoothConfig {
41 pub fn is_enabled(&self) -> bool {
42 self.enabled && self.max_peers > 0
43 }
44}
45
46impl Default for BluetoothConfig {
47 fn default() -> Self {
48 Self {
49 enabled: false,
50 max_peers: 0,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum BluetoothBackendState {
57 Disabled,
58 Running,
59 Unsupported,
60}
61
62#[derive(Clone)]
63pub struct PendingBluetoothLink {
64 pub link: Arc<dyn BluetoothLink>,
65 pub direction: PeerDirection,
66 pub local_hello_sent: bool,
67 pub peer_hint: Option<String>,
68}
69
70#[async_trait]
71pub trait MobileBluetoothBridge: Send + Sync {
72 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
73}
74
75static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
76
77pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
78 MOBILE_BLUETOOTH_BRIDGE
79 .set(bridge)
80 .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
81}
82
83fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
84 MOBILE_BLUETOOTH_BRIDGE.get().cloned()
85}
86
87#[derive(Clone)]
88pub struct BluetoothPeerRegistrar {
89 state: Arc<WebRTCState>,
90 peer_classifier: PeerClassifier,
91 pools: PoolSettings,
92 max_bluetooth_peers: usize,
93}
94
95impl BluetoothPeerRegistrar {
96 pub fn new(
97 state: Arc<WebRTCState>,
98 peer_classifier: PeerClassifier,
99 pools: PoolSettings,
100 max_bluetooth_peers: usize,
101 ) -> Self {
102 Self {
103 state,
104 peer_classifier,
105 pools,
106 max_bluetooth_peers,
107 }
108 }
109
110 async fn pool_counts(&self) -> (usize, usize) {
111 let peers = self.state.peers.read().await;
112 let mut follows = 0usize;
113 let mut other = 0usize;
114 for entry in peers.values() {
115 if entry.state != ConnectionState::Connected {
116 continue;
117 }
118 match entry.pool {
119 PeerPool::Follows => follows += 1,
120 PeerPool::Other => other += 1,
121 }
122 }
123 (follows, other)
124 }
125
126 async fn bluetooth_peer_count(&self, peer_key: &str) -> usize {
127 let peers = self.state.peers.read().await;
128 peers
129 .values()
130 .filter(|entry| entry.transport == PeerTransport::Bluetooth)
131 .filter(|entry| entry.state == ConnectionState::Connected)
132 .filter(|entry| entry.peer_id.to_string() != peer_key)
133 .count()
134 }
135
136 pub async fn register_connected_peer(
137 &self,
138 peer_id: PeerId,
139 direction: PeerDirection,
140 peer: MeshPeer,
141 ) -> bool {
142 let peer_key = peer_id.to_string();
143 let pool = (self.peer_classifier)(&peer_id.pubkey);
144 let (follows, other) = self.pool_counts().await;
145 let can_accept_pool = match pool {
146 PeerPool::Follows => follows < self.pools.follows.max_connections,
147 PeerPool::Other => other < self.pools.other.max_connections,
148 };
149 if !can_accept_pool {
150 warn!(
151 "Bluetooth peer {} rejected because pool {:?} is full",
152 peer_id.short(),
153 pool
154 );
155 return false;
156 }
157
158 if self.max_bluetooth_peers == 0
159 || self.bluetooth_peer_count(&peer_key).await >= self.max_bluetooth_peers
160 {
161 warn!(
162 "Bluetooth peer {} rejected because max_bluetooth_peers={} reached",
163 peer_id.short(),
164 self.max_bluetooth_peers
165 );
166 return false;
167 }
168
169 let mut peers = self.state.peers.write().await;
170 let duplicate_keys = peers
171 .iter()
172 .filter(|(key, entry)| {
173 key.as_str() != peer_key
174 && entry.transport == PeerTransport::Bluetooth
175 && entry.peer_id.pubkey == peer_id.pubkey
176 })
177 .map(|(key, _)| key.clone())
178 .collect::<Vec<_>>();
179 let was_connected = peers
180 .get(&peer_key)
181 .map(|entry| entry.state == ConnectionState::Connected)
182 .unwrap_or(false);
183 let replaced = peers.insert(
184 peer_key,
185 PeerEntry {
186 peer_id,
187 direction,
188 state: ConnectionState::Connected,
189 last_seen: Instant::now(),
190 peer: Some(peer),
191 pool,
192 transport: PeerTransport::Bluetooth,
193 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
194 bytes_sent: 0,
195 bytes_received: 0,
196 },
197 );
198 let removed_duplicates = duplicate_keys
199 .into_iter()
200 .filter_map(|key| peers.remove(&key))
201 .collect::<Vec<_>>();
202 drop(peers);
203
204 if let Some(previous) = replaced.and_then(|entry| entry.peer) {
205 let _ = previous.close().await;
206 }
207 for duplicate in &removed_duplicates {
208 if let Some(peer) = duplicate.peer.as_ref() {
209 let _ = peer.close().await;
210 }
211 }
212
213 let removed_connected_duplicates = removed_duplicates
214 .iter()
215 .filter(|entry| entry.state == ConnectionState::Connected)
216 .count() as isize;
217 let connected_delta =
218 1isize - if was_connected { 1 } else { 0 } - removed_connected_duplicates;
219 if connected_delta > 0 {
220 self.state.connected_count.fetch_add(
221 connected_delta as usize,
222 std::sync::atomic::Ordering::Relaxed,
223 );
224 } else if connected_delta < 0 {
225 self.state.connected_count.fetch_sub(
226 (-connected_delta) as usize,
227 std::sync::atomic::Ordering::Relaxed,
228 );
229 }
230 true
231 }
232
233 pub async fn unregister_peer(&self, peer_id: &PeerId) {
234 let peer_key = peer_id.to_string();
235 let removed = self.state.peers.write().await.remove(&peer_key);
236 if let Some(entry) = removed {
237 if entry.state == ConnectionState::Connected {
238 self.state
239 .connected_count
240 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
241 }
242 if let Some(peer) = entry.peer {
243 let _ = peer.close().await;
244 }
245 }
246 }
247
248 pub async fn unregister_bluetooth_peer_if_current(
249 &self,
250 peer_id: &PeerId,
251 expected_peer: &Arc<super::BluetoothPeer>,
252 ) {
253 let peer_key = peer_id.to_string();
254 let removed = {
255 let mut peers = self.state.peers.write().await;
256 let matches_current = peers
257 .get(&peer_key)
258 .and_then(|entry| entry.peer.as_ref())
259 .and_then(|peer| match peer {
260 MeshPeer::Bluetooth(current) => Some(Arc::ptr_eq(current, expected_peer)),
261 _ => None,
262 })
263 .unwrap_or(false);
264 if matches_current {
265 peers.remove(&peer_key)
266 } else {
267 None
268 }
269 };
270 if let Some(entry) = removed {
271 if entry.state == ConnectionState::Connected {
272 self.state
273 .connected_count
274 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
275 }
276 if let Some(peer) = entry.peer {
277 let _ = peer.close().await;
278 }
279 }
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::webrtc::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
287 use crate::webrtc::session::{MeshPeer, TestMeshPeer};
288
289 #[tokio::test]
290 async fn register_connected_peer_closes_replaced_session() {
291 let state = Arc::new(WebRTCState::new());
292 let registrar = BluetoothPeerRegistrar::new(
293 state.clone(),
294 Arc::new(|_| PeerPool::Other),
295 PoolSettings::default(),
296 2,
297 );
298
299 let peer_id = PeerId::new("peer-pub".to_string(), Some("session".to_string()));
300 let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
301 let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
302 let first_ref = first.mock_ref().expect("mock peer").clone();
303
304 assert!(
305 registrar
306 .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
307 .await
308 );
309 assert!(
310 registrar
311 .register_connected_peer(peer_id, PeerDirection::Outbound, second)
312 .await
313 );
314
315 assert!(first_ref.is_closed());
316 }
317
318 #[tokio::test]
319 async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
320 let state = Arc::new(WebRTCState::new());
321 let registrar = BluetoothPeerRegistrar::new(
322 state.clone(),
323 Arc::new(|_| PeerPool::Other),
324 PoolSettings::default(),
325 2,
326 );
327
328 let first_peer_id = PeerId::new("peer-pub".to_string(), Some("session-a".to_string()));
329 let second_peer_id = PeerId::new("peer-pub".to_string(), Some("session-b".to_string()));
330 let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
331 let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
332 let first_ref = first.mock_ref().expect("mock peer").clone();
333
334 assert!(
335 registrar
336 .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
337 .await
338 );
339 assert!(
340 registrar
341 .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
342 .await
343 );
344
345 assert!(first_ref.is_closed());
346 let peers = state.peers.read().await;
347 assert!(!peers.contains_key(&first_peer_id.to_string()));
348 assert!(peers.contains_key(&second_peer_id.to_string()));
349 assert_eq!(
350 state
351 .connected_count
352 .load(std::sync::atomic::Ordering::Relaxed),
353 1
354 );
355 }
356
357 #[tokio::test]
358 async fn handle_pending_link_unregisters_peer_after_transport_closes() {
359 let (link_a, link_b) = MockBluetoothLink::pair();
360 let state = Arc::new(WebRTCState::new());
361 let registrar = BluetoothPeerRegistrar::new(
362 state.clone(),
363 Arc::new(|_| PeerPool::Other),
364 PoolSettings::default(),
365 2,
366 );
367 let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
368 let local_peer_id = PeerId::new("local-pub".to_string(), Some("local-session".to_string()));
369 let remote_peer_id =
370 PeerId::new("remote-pub".to_string(), Some("remote-session".to_string()));
371 let remote_link: Arc<dyn BluetoothLink> = link_b.clone();
372
373 send_hello(&remote_link, &remote_peer_id)
374 .await
375 .expect("send hello");
376
377 let accepted = handle_pending_link(
378 PendingBluetoothLink {
379 link: link_a.clone(),
380 direction: PeerDirection::Outbound,
381 local_hello_sent: false,
382 peer_hint: Some("mock-ble".to_string()),
383 },
384 BluetoothRuntimeContext {
385 my_peer_id: local_peer_id,
386 store: None,
387 nostr_relay: None,
388 mesh_frame_tx,
389 registrar: registrar.clone(),
390 },
391 )
392 .await
393 .expect("handle pending link");
394
395 assert!(accepted);
396 assert!(state
397 .peers
398 .read()
399 .await
400 .contains_key(&remote_peer_id.to_string()));
401
402 link_a.close().await.expect("close local transport");
403
404 tokio::time::timeout(Duration::from_secs(2), async {
405 loop {
406 if !state
407 .peers
408 .read()
409 .await
410 .contains_key(&remote_peer_id.to_string())
411 {
412 break;
413 }
414 tokio::time::sleep(Duration::from_millis(25)).await;
415 }
416 })
417 .await
418 .expect("peer should be unregistered");
419 }
420}
421
422#[derive(Clone)]
423pub struct BluetoothRuntimeContext {
424 pub my_peer_id: PeerId,
425 pub store: Option<Arc<dyn ContentStore>>,
426 pub nostr_relay: Option<Arc<NostrRelay>>,
427 pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
428 pub registrar: BluetoothPeerRegistrar,
429}
430
431pub struct BluetoothMesh {
433 config: BluetoothConfig,
434}
435
436impl BluetoothMesh {
437 pub fn new(config: BluetoothConfig) -> Self {
438 Self { config }
439 }
440
441 pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
442 if !self.config.is_enabled() {
443 info!("Bluetooth transport disabled");
444 return BluetoothBackendState::Disabled;
445 }
446
447 let mut started = false;
448
449 if let Some(bridge) = mobile_bluetooth_bridge() {
450 info!(
451 "Starting mobile Bluetooth bridge for peer {}",
452 context.my_peer_id.short()
453 );
454 match bridge.start(context.my_peer_id.to_string()).await {
455 Ok(mut rx) => {
456 info!("Mobile Bluetooth bridge is running");
457 let ctx = context.clone();
458 tokio::spawn(async move {
459 while let Some(link) = rx.recv().await {
460 if let Err(err) = handle_pending_link(link, ctx.clone()).await {
461 warn!("Mobile Bluetooth link failed: {}", err);
462 }
463 }
464 });
465 started = true;
466 }
467 Err(err) => {
468 warn!("Failed to start mobile Bluetooth bridge: {}", err);
469 }
470 }
471 }
472
473 #[cfg(target_os = "macos")]
474 {
475 if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
476 {
477 warn!("Failed to start macOS Bluetooth central: {}", err);
478 } else {
479 started = true;
480 }
481 }
482
483 if started {
484 BluetoothBackendState::Running
485 } else {
486 warn!(
487 "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
488 self.config.max_peers
489 );
490 BluetoothBackendState::Unsupported
491 }
492 }
493}
494
495async fn handle_pending_link(
496 link: PendingBluetoothLink,
497 context: BluetoothRuntimeContext,
498) -> Result<bool> {
499 if let Some(peer_hint) = link.peer_hint.as_deref() {
500 debug!("Handling pending Bluetooth link {}", peer_hint);
501 }
502 let remote_peer_id = receive_remote_hello(&link.link).await?;
503 if !link.local_hello_sent {
504 send_hello(&link.link, &context.my_peer_id).await?;
505 }
506
507 let bluetooth_peer = super::BluetoothPeer::new(
508 remote_peer_id.clone(),
509 link.direction,
510 link.link,
511 context.store.clone(),
512 context.nostr_relay.clone(),
513 Some(context.mesh_frame_tx.clone()),
514 Some(context.registrar.state.clone()),
515 );
516 let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
517
518 if !context
519 .registrar
520 .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
521 .await
522 {
523 warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
524 bluetooth_peer.close().await?;
525 return Ok(false);
526 } else {
527 info!("Bluetooth peer {} connected", remote_peer_id.short());
528 spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
529 }
530 Ok(true)
531}
532
533fn spawn_bluetooth_disconnect_watch(
534 peer_id: PeerId,
535 peer: Arc<super::BluetoothPeer>,
536 registrar: BluetoothPeerRegistrar,
537) {
538 tokio::spawn(async move {
539 loop {
540 if !peer.is_connected() {
541 registrar
542 .unregister_bluetooth_peer_if_current(&peer_id, &peer)
543 .await;
544 break;
545 }
546 tokio::time::sleep(Duration::from_millis(250)).await;
547 }
548 });
549}
550
551#[derive(serde::Serialize, serde::Deserialize)]
552#[serde(rename_all = "camelCase")]
553struct BluetoothHello {
554 #[serde(rename = "type")]
555 kind: String,
556 peer_id: String,
557}
558
559async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
560 let hello = BluetoothHello {
561 kind: "hello".to_string(),
562 peer_id: my_peer_id.to_string(),
563 };
564 link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
565 .await
566}
567
568async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
569 let result = tokio::time::timeout(HELLO_TIMEOUT, async {
570 loop {
571 match link.recv().await {
572 Some(BluetoothFrame::Text(text)) => {
573 if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
574 if hello.kind == "hello" {
575 return PeerId::from_string(&hello.peer_id)
576 .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
577 }
578 }
579 }
580 Some(BluetoothFrame::Binary(_)) => {}
581 None => return Err(anyhow!("bluetooth link closed before hello")),
582 }
583 }
584 })
585 .await;
586
587 match result {
588 Ok(peer_id) => peer_id,
589 Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
590 }
591}
592
593#[cfg(target_os = "macos")]
594mod macos {
595 use super::*;
596 use btleplug::api::{
597 Central, CharPropFlags, Characteristic, Manager as _, Peripheral as _, ScanFilter,
598 ValueNotification, WriteType,
599 };
600 use btleplug::platform::{Manager, Peripheral};
601 use tokio::sync::Mutex;
602 use uuid::Uuid;
603
604 pub(super) async fn start_macos_central(
605 config: BluetoothConfig,
606 context: BluetoothRuntimeContext,
607 ) -> Result<()> {
608 struct ConnectedPeripheral {
609 peripheral: Peripheral,
610 link: Arc<dyn BluetoothLink>,
611 }
612
613 let manager = Manager::new().await?;
614 let adapters = manager.adapters().await?;
615 let Some(adapter) = adapters.into_iter().next() else {
616 warn!("No Bluetooth adapters available on macOS");
617 return Ok(());
618 };
619 info!(
620 "Starting macOS Bluetooth central for peer {} (max_peers={})",
621 context.my_peer_id.short(),
622 config.max_peers
623 );
624
625 tokio::spawn(async move {
626 let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
627 let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
628 loop {
629 if let Err(err) = adapter.start_scan(ScanFilter::default()).await {
630 warn!("Failed to start BLE scan: {}", err);
631 tokio::time::sleep(Duration::from_secs(5)).await;
632 continue;
633 }
634 tokio::time::sleep(Duration::from_secs(2)).await;
635
636 let peripherals = match adapter.peripherals().await {
637 Ok(peripherals) => peripherals,
638 Err(err) => {
639 warn!("Failed to list BLE peripherals: {}", err);
640 tokio::time::sleep(Duration::from_secs(5)).await;
641 continue;
642 }
643 };
644
645 connected.retain(|_, connection| {
646 futures::executor::block_on(async {
647 let peripheral_connected =
648 connection.peripheral.is_connected().await.unwrap_or(false);
649 let link_open = connection.link.is_open();
650 if !peripheral_connected || !link_open {
651 if peripheral_connected {
652 let _ = connection.peripheral.disconnect().await;
653 }
654 false
655 } else {
656 true
657 }
658 })
659 });
660
661 for peripheral in peripherals {
662 if connected.len() >= config.max_peers {
663 break;
664 }
665 let peripheral_id = peripheral.id().to_string();
666 if connected.contains_key(&peripheral_id) {
667 continue;
668 }
669 let properties = match peripheral.properties().await {
670 Ok(Some(properties)) => properties,
671 _ => continue,
672 };
673 if !properties.services.contains(&service_uuid) {
674 continue;
675 }
676 info!(
677 "Discovered BLE peripheral {} advertising htree service",
678 peripheral_id
679 );
680 if let Err(err) = adapter.stop_scan().await {
681 debug!("Failed to stop BLE scan before connecting to {}: {}", peripheral_id, err);
682 }
683 match connect_peripheral(peripheral.clone()).await {
684 Ok(Some(link)) => {
685 let tracked_link = link.clone();
686 let pending = PendingBluetoothLink {
687 link,
688 direction: PeerDirection::Outbound,
689 local_hello_sent: false,
690 peer_hint: Some(peripheral_id.clone()),
691 };
692 match handle_pending_link(pending, context.clone()).await {
693 Ok(true) => {
694 connected.insert(
695 peripheral_id.clone(),
696 ConnectedPeripheral {
697 peripheral,
698 link: tracked_link,
699 },
700 );
701 }
702 Ok(false) => {}
703 Err(err) => {
704 warn!("Failed to attach BLE peripheral: {}", err);
705 }
706 }
707 }
708 Ok(None) => {}
709 Err(err) => {
710 warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
711 }
712 }
713 }
714
715 tokio::time::sleep(Duration::from_secs(5)).await;
716 }
717 });
718
719 Ok(())
720 }
721
722 async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
723 let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
724 let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
725 let peripheral_id = peripheral.id().to_string();
726
727 if !peripheral.is_connected().await? {
728 info!("Connecting to BLE peripheral {}", peripheral_id);
729 match tokio::time::timeout(Duration::from_secs(5), peripheral.connect()).await {
730 Ok(Ok(())) => {}
731 Ok(Err(err)) => return Err(err.into()),
732 Err(_) => {
733 let connected = peripheral.is_connected().await.unwrap_or(false);
734 if !connected {
735 warn!("BLE connect timed out for {} without establishing a link", peripheral_id);
736 let _ = peripheral.disconnect().await;
737 return Ok(None);
738 }
739 warn!(
740 "BLE connect timed out for {} but the adapter reports it connected; continuing",
741 peripheral_id
742 );
743 }
744 }
745 }
746 let mut rx_char = None;
747 let mut tx_char = None;
748 for attempt in 1..=6 {
749 info!(
750 "Discovering BLE services for {} (attempt {}/6)",
751 peripheral_id, attempt
752 );
753 peripheral.discover_services().await?;
754
755 let characteristics = peripheral.characteristics();
756 if !characteristics.is_empty() {
757 let uuids = characteristics
758 .iter()
759 .map(|c| c.uuid.to_string())
760 .collect::<Vec<_>>()
761 .join(", ");
762 info!(
763 "BLE peripheral {} characteristics: {}",
764 peripheral_id, uuids
765 );
766 }
767 rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
768 tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
769 if rx_char.is_some() && tx_char.is_some() {
770 break;
771 }
772 tokio::time::sleep(Duration::from_millis(500)).await;
773 }
774 let Some(rx_char) = rx_char else {
775 warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
776 let _ = peripheral.disconnect().await;
777 return Ok(None);
778 };
779 let Some(tx_char) = tx_char else {
780 warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
781 let _ = peripheral.disconnect().await;
782 return Ok(None);
783 };
784 if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
785 warn!(
786 "BLE peripheral {} TX characteristic is missing NOTIFY",
787 peripheral_id
788 );
789 let _ = peripheral.disconnect().await;
790 return Ok(None);
791 }
792 let notifications = peripheral.notifications().await?;
793 info!("Subscribing to BLE notifications for {}", peripheral_id);
794 peripheral.subscribe(&tx_char).await?;
795
796 let initial_frames = match peripheral.read(&tx_char).await {
797 Ok(bytes) if !bytes.is_empty() => {
798 info!(
799 "Read initial BLE hello bytes from {} ({} bytes)",
800 peripheral_id,
801 bytes.len()
802 );
803 let mut decoder = FrameDecoder::new();
804 match decoder.push(&bytes) {
805 Ok(frames) => frames,
806 Err(err) => {
807 warn!(
808 "Discarding malformed initial BLE frame from {}: {}",
809 peripheral_id, err
810 );
811 Vec::new()
812 }
813 }
814 }
815 Ok(_) => Vec::new(),
816 Err(err) => {
817 warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
818 Vec::new()
819 }
820 };
821
822 info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
823 Ok(Some(MacosCentralLink::new(
824 peripheral,
825 rx_char,
826 notifications,
827 initial_frames,
828 )))
829 }
830
831 struct FrameDecoder {
832 buffer: Vec<u8>,
833 }
834
835 impl FrameDecoder {
836 fn new() -> Self {
837 Self { buffer: Vec::new() }
838 }
839
840 fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
841 self.buffer.extend_from_slice(chunk);
842 let mut frames = Vec::new();
843 loop {
844 if self.buffer.len() < 5 {
845 break;
846 }
847 let kind = self.buffer[0];
848 let len = u32::from_be_bytes([
849 self.buffer[1],
850 self.buffer[2],
851 self.buffer[3],
852 self.buffer[4],
853 ]) as usize;
854 if self.buffer.len() < 5 + len {
855 break;
856 }
857 let payload = self.buffer[5..5 + len].to_vec();
858 self.buffer.drain(..5 + len);
859 let frame = match kind {
860 1 => BluetoothFrame::Text(String::from_utf8(payload)?),
861 2 => BluetoothFrame::Binary(payload),
862 _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
863 };
864 frames.push(frame);
865 }
866 Ok(frames)
867 }
868 }
869
870 fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
871 match frame {
872 BluetoothFrame::Text(text) => {
873 let payload = text.into_bytes();
874 let mut out = Vec::with_capacity(5 + payload.len());
875 out.push(1);
876 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
877 out.extend_from_slice(&payload);
878 out
879 }
880 BluetoothFrame::Binary(payload) => {
881 let mut out = Vec::with_capacity(5 + payload.len());
882 out.push(2);
883 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
884 out.extend_from_slice(&payload);
885 out
886 }
887 }
888 }
889
890 struct MacosCentralLink {
891 peripheral: Peripheral,
892 rx_char: Characteristic,
893 open: std::sync::atomic::AtomicBool,
894 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
895 }
896
897 impl MacosCentralLink {
898 fn new(
899 peripheral: Peripheral,
900 rx_char: Characteristic,
901 mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
902 initial_frames: Vec<BluetoothFrame>,
903 ) -> Arc<Self> {
904 let (tx, rx) = mpsc::channel(64);
905 let link = Arc::new(Self {
906 peripheral,
907 rx_char,
908 open: std::sync::atomic::AtomicBool::new(true),
909 rx: Mutex::new(rx),
910 });
911
912 for frame in initial_frames {
913 let _ = tx.try_send(frame);
914 }
915
916 let weak = Arc::downgrade(&link);
917 tokio::spawn(async move {
918 let mut decoder = FrameDecoder::new();
919 while let Some(notification) = notifications.next().await {
920 let Ok(frames) = decoder.push(¬ification.value) else {
921 break;
922 };
923 for frame in frames {
924 if tx.send(frame).await.is_err() {
925 break;
926 }
927 }
928 }
929 if let Some(link) = weak.upgrade() {
930 link.open.store(false, std::sync::atomic::Ordering::Relaxed);
931 }
932 });
933
934 link
935 }
936 }
937
938 #[async_trait]
939 impl BluetoothLink for MacosCentralLink {
940 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
941 let bytes = encode_frame(frame);
942 for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
943 self.peripheral
944 .write(&self.rx_char, chunk, WriteType::WithResponse)
945 .await?;
946 }
947 Ok(())
948 }
949
950 async fn recv(&self) -> Option<BluetoothFrame> {
951 self.rx.lock().await.recv().await
952 }
953
954 fn is_open(&self) -> bool {
955 self.open.load(std::sync::atomic::Ordering::Relaxed)
956 }
957
958 async fn close(&self) -> Result<()> {
959 self.open.store(false, std::sync::atomic::Ordering::Relaxed);
960 let _ = self.peripheral.disconnect().await;
961 Ok(())
962 }
963 }
964}