Skip to main content

hashtree_cli/webrtc/
bluetooth.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::BTreeSet;
4#[cfg(target_os = "macos")]
5use std::collections::HashMap;
6use std::sync::{Arc, OnceLock};
7use std::time::{Duration, Instant};
8use tokio::sync::mpsc;
9use tracing::{debug, info, warn};
10
11use crate::nostr_relay::NostrRelay;
12
13use super::bluetooth_peer::{BluetoothFrame, BluetoothLink};
14use super::peer::ContentStore;
15use super::session::MeshPeer;
16use super::signaling::{
17    ConnectionState, PeerClassifier, PeerEntry, PeerSignalPath, PeerTransport, WebRTCState,
18};
19use super::types::{MeshNostrFrame, PeerDirection, PeerId, PeerPool, PoolSettings};
20
// Identifiers and limits for the htree BLE transport. The `dead_code` allowance
// applies to every build that is not macOS, where the `macos` backend module
// (which reads the UUIDs) is compiled out.

/// UUID of the htree BLE service. The macOS central uses it as its scan filter
/// and to recognise relevant advertisements.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";
/// UUID of the "RX" characteristic, parsed by `macos::connect_peripheral`.
/// NOTE(review): presumably the characteristic the peripheral receives on — confirm.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";
/// UUID of the "TX" characteristic, parsed by `macos::connect_peripheral`.
/// NOTE(review): presumably the characteristic the peripheral transmits on — confirm.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";
/// Chunk size in bytes.
/// NOTE(review): presumably the largest payload per BLE write/notification; its
/// use is not visible in this part of the file — confirm.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_CHUNK_BYTES: usize = 180;
/// How long `receive_remote_hello` waits for the remote hello frame before failing.
const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
30
/// Configuration for the optional Bluetooth peer transport.
///
/// The derived [`Default`] describes a switched-off transport
/// (`enabled: false`, `max_peers: 0`).
#[derive(Debug, Clone, Default)]
pub struct BluetoothConfig {
    /// Master switch for the transport.
    pub enabled: bool,
    /// Maximum number of Bluetooth peers; `0` also disables the transport.
    pub max_peers: usize,
}

impl BluetoothConfig {
    /// Returns `true` only when the transport is switched on *and* at least one
    /// peer is allowed.
    pub fn is_enabled(&self) -> bool {
        self.enabled && self.max_peers > 0
    }
}
52
/// Outcome reported by `BluetoothMesh::start`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BluetoothBackendState {
    /// The configuration does not enable the transport; nothing was started.
    Disabled,
    /// At least one backend (mobile bridge or macOS central) was started.
    Running,
    /// The transport was requested but no backend could be started.
    Unsupported,
}
59
/// A freshly opened Bluetooth link that has not completed the hello handshake yet.
#[derive(Clone)]
pub struct PendingBluetoothLink {
    /// Transport used to exchange frames with the remote side.
    pub link: Arc<dyn BluetoothLink>,
    /// Direction recorded for the peer once it is registered.
    pub direction: PeerDirection,
    /// When `false`, `handle_pending_link` sends the local hello after it has
    /// received the remote one; when `true` it sends nothing.
    pub local_hello_sent: bool,
    /// Optional label for the link; `handle_pending_link` only uses it for debug logging.
    pub peer_hint: Option<String>,
}
67
/// Bridge to a Bluetooth implementation provided from outside this module and
/// registered through `install_mobile_bluetooth_bridge`.
#[async_trait]
pub trait MobileBluetoothBridge: Send + Sync {
    /// Starts the bridge for `local_peer_id` (the string form of the local peer id)
    /// and returns the receiving end of a channel of pending links.
    /// `BluetoothMesh::start` hands every received link to `handle_pending_link`.
    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
}
72
73static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
74
75pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
76    MOBILE_BLUETOOTH_BRIDGE
77        .set(bridge)
78        .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
79}
80
81fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
82    MOBILE_BLUETOOTH_BRIDGE.get().cloned()
83}
84
85#[derive(Clone)]
86pub struct BluetoothPeerRegistrar {
87    state: Arc<WebRTCState>,
88    peer_classifier: PeerClassifier,
89    pools: PoolSettings,
90    max_bluetooth_peers: usize,
91}
92
93impl BluetoothPeerRegistrar {
94    pub fn new(
95        state: Arc<WebRTCState>,
96        peer_classifier: PeerClassifier,
97        pools: PoolSettings,
98        max_bluetooth_peers: usize,
99    ) -> Self {
100        Self {
101            state,
102            peer_classifier,
103            pools,
104            max_bluetooth_peers,
105        }
106    }
107
108    async fn pool_counts(&self) -> (usize, usize) {
109        let peers = self.state.peers.read().await;
110        let mut follows = 0usize;
111        let mut other = 0usize;
112        for entry in peers.values() {
113            if entry.state != ConnectionState::Connected {
114                continue;
115            }
116            match entry.pool {
117                PeerPool::Follows => follows += 1,
118                PeerPool::Other => other += 1,
119            }
120        }
121        (follows, other)
122    }
123
124    async fn bluetooth_peer_count(&self, peer_key: &str) -> usize {
125        let peers = self.state.peers.read().await;
126        peers
127            .values()
128            .filter(|entry| entry.transport == PeerTransport::Bluetooth)
129            .filter(|entry| entry.state == ConnectionState::Connected)
130            .filter(|entry| entry.peer_id.to_string() != peer_key)
131            .count()
132    }
133
134    pub async fn register_connected_peer(
135        &self,
136        peer_id: PeerId,
137        direction: PeerDirection,
138        peer: MeshPeer,
139    ) -> bool {
140        let peer_key = peer_id.to_string();
141        let pool = (self.peer_classifier)(&peer_id.pubkey);
142        let (follows, other) = self.pool_counts().await;
143        let can_accept_pool = match pool {
144            PeerPool::Follows => follows < self.pools.follows.max_connections,
145            PeerPool::Other => other < self.pools.other.max_connections,
146        };
147        if !can_accept_pool {
148            warn!(
149                "Bluetooth peer {} rejected because pool {:?} is full",
150                peer_id.short(),
151                pool
152            );
153            return false;
154        }
155
156        if self.max_bluetooth_peers == 0
157            || self.bluetooth_peer_count(&peer_key).await >= self.max_bluetooth_peers
158        {
159            warn!(
160                "Bluetooth peer {} rejected because max_bluetooth_peers={} reached",
161                peer_id.short(),
162                self.max_bluetooth_peers
163            );
164            return false;
165        }
166
167        let mut peers = self.state.peers.write().await;
168        let duplicate_keys = peers
169            .iter()
170            .filter(|(key, entry)| {
171                key.as_str() != peer_key
172                    && entry.transport == PeerTransport::Bluetooth
173                    && entry.peer_id.pubkey == peer_id.pubkey
174            })
175            .map(|(key, _)| key.clone())
176            .collect::<Vec<_>>();
177        let was_connected = peers
178            .get(&peer_key)
179            .map(|entry| entry.state == ConnectionState::Connected)
180            .unwrap_or(false);
181        let replaced = peers.insert(
182            peer_key,
183            PeerEntry {
184                peer_id,
185                direction,
186                state: ConnectionState::Connected,
187                last_seen: Instant::now(),
188                peer: Some(peer),
189                pool,
190                transport: PeerTransport::Bluetooth,
191                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
192                bytes_sent: 0,
193                bytes_received: 0,
194            },
195        );
196        let removed_duplicates = duplicate_keys
197            .into_iter()
198            .filter_map(|key| peers.remove(&key))
199            .collect::<Vec<_>>();
200        drop(peers);
201
202        if let Some(previous) = replaced.and_then(|entry| entry.peer) {
203            let _ = previous.close().await;
204        }
205        for duplicate in &removed_duplicates {
206            if let Some(peer) = duplicate.peer.as_ref() {
207                let _ = peer.close().await;
208            }
209        }
210
211        let removed_connected_duplicates = removed_duplicates
212            .iter()
213            .filter(|entry| entry.state == ConnectionState::Connected)
214            .count() as isize;
215        let connected_delta =
216            1isize - if was_connected { 1 } else { 0 } - removed_connected_duplicates;
217        if connected_delta > 0 {
218            self.state.connected_count.fetch_add(
219                connected_delta as usize,
220                std::sync::atomic::Ordering::Relaxed,
221            );
222        } else if connected_delta < 0 {
223            self.state.connected_count.fetch_sub(
224                (-connected_delta) as usize,
225                std::sync::atomic::Ordering::Relaxed,
226            );
227        }
228        true
229    }
230
231    pub async fn unregister_peer(&self, peer_id: &PeerId) {
232        let peer_key = peer_id.to_string();
233        let removed = self.state.peers.write().await.remove(&peer_key);
234        if let Some(entry) = removed {
235            if entry.state == ConnectionState::Connected {
236                self.state
237                    .connected_count
238                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
239            }
240            if let Some(peer) = entry.peer {
241                let _ = peer.close().await;
242            }
243        }
244    }
245
246    pub async fn unregister_bluetooth_peer_if_current(
247        &self,
248        peer_id: &PeerId,
249        expected_peer: &Arc<super::BluetoothPeer>,
250    ) {
251        let peer_key = peer_id.to_string();
252        let removed = {
253            let mut peers = self.state.peers.write().await;
254            let matches_current = peers
255                .get(&peer_key)
256                .and_then(|entry| entry.peer.as_ref())
257                .and_then(|peer| match peer {
258                    MeshPeer::Bluetooth(current) => Some(Arc::ptr_eq(current, expected_peer)),
259                    _ => None,
260                })
261                .unwrap_or(false);
262            if matches_current {
263                peers.remove(&peer_key)
264            } else {
265                None
266            }
267        };
268        if let Some(entry) = removed {
269            if entry.state == ConnectionState::Connected {
270                self.state
271                    .connected_count
272                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
273            }
274            if let Some(peer) = entry.peer {
275                let _ = peer.close().await;
276            }
277        }
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::webrtc::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
    use crate::webrtc::session::{MeshPeer, TestMeshPeer};
    use std::sync::atomic::Ordering;

    /// Registrar over `state` that classifies every peer as `Other`, uses the
    /// default pool settings and allows two Bluetooth peers.
    fn registrar_for(state: &Arc<WebRTCState>) -> BluetoothPeerRegistrar {
        BluetoothPeerRegistrar::new(
            state.clone(),
            Arc::new(|_| PeerPool::Other),
            PoolSettings::default(),
            2,
        )
    }

    /// Mock mesh peer with no canned response.
    fn mock_peer() -> MeshPeer {
        MeshPeer::mock_for_tests(TestMeshPeer::with_response(None))
    }

    /// Registering twice under the same peer id closes the session that gets replaced.
    #[tokio::test]
    async fn register_connected_peer_closes_replaced_session() {
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);

        let peer_id = PeerId::new("peer-pub".to_string(), Some("session".to_string()));
        let original = mock_peer();
        let replacement = mock_peer();
        let original_handle = original.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, original)
            .await;
        assert!(first_accepted);
        let second_accepted = registrar
            .register_connected_peer(peer_id, PeerDirection::Outbound, replacement)
            .await;
        assert!(second_accepted);

        assert!(original_handle.is_closed());
    }

    /// A new Bluetooth session for an already known pubkey evicts the older
    /// session and leaves the connected counter at one.
    #[tokio::test]
    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);

        let older_id = PeerId::new("peer-pub".to_string(), Some("session-a".to_string()));
        let newer_id = PeerId::new("peer-pub".to_string(), Some("session-b".to_string()));
        let older = mock_peer();
        let newer = mock_peer();
        let older_handle = older.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(older_id.clone(), PeerDirection::Outbound, older)
            .await;
        assert!(first_accepted);
        let second_accepted = registrar
            .register_connected_peer(newer_id.clone(), PeerDirection::Outbound, newer)
            .await;
        assert!(second_accepted);

        assert!(older_handle.is_closed());
        let peers = state.peers.read().await;
        assert!(!peers.contains_key(&older_id.to_string()));
        assert!(peers.contains_key(&newer_id.to_string()));
        assert_eq!(state.connected_count.load(Ordering::Relaxed), 1);
    }

    /// Once the local transport closes, the disconnect watcher removes the peer
    /// from the table.
    #[tokio::test]
    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);
        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
        let local_peer_id = PeerId::new("local-pub".to_string(), Some("local-session".to_string()));
        let remote_peer_id =
            PeerId::new("remote-pub".to_string(), Some("remote-session".to_string()));
        let remote_key = remote_peer_id.to_string();
        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();

        // The remote side introduces itself before the local side handles the link.
        send_hello(&remote_link, &remote_peer_id)
            .await
            .expect("send hello");

        let pending = PendingBluetoothLink {
            link: link_a.clone(),
            direction: PeerDirection::Outbound,
            local_hello_sent: false,
            peer_hint: Some("mock-ble".to_string()),
        };
        let context = BluetoothRuntimeContext {
            my_peer_id: local_peer_id,
            store: None,
            nostr_relay: None,
            mesh_frame_tx,
            registrar: registrar.clone(),
        };
        let accepted = handle_pending_link(pending, context)
            .await
            .expect("handle pending link");

        assert!(accepted);
        assert!(state.peers.read().await.contains_key(&remote_key));

        link_a.close().await.expect("close local transport");

        // Poll until the watcher has removed the entry, giving up after two seconds.
        tokio::time::timeout(Duration::from_secs(2), async {
            while state.peers.read().await.contains_key(&remote_key) {
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        })
        .await
        .expect("peer should be unregistered");
    }
}
419
/// Everything a Bluetooth backend needs to turn a raw link into a registered mesh peer.
#[derive(Clone)]
pub struct BluetoothRuntimeContext {
    /// Local peer id; sent to the remote side in the hello frame.
    pub my_peer_id: PeerId,
    /// Optional content store handed to every `BluetoothPeer` created from a link.
    pub store: Option<Arc<dyn ContentStore>>,
    /// Optional Nostr relay handed to every `BluetoothPeer` created from a link.
    pub nostr_relay: Option<Arc<NostrRelay>>,
    /// Sender for `(PeerId, MeshNostrFrame)` pairs; a clone is handed to every
    /// `BluetoothPeer` created from a link.
    pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Registrar used to add the peer to, and later remove it from, the peer table.
    pub registrar: BluetoothPeerRegistrar,
}
428
429/// Native Bluetooth backend. Nearby BLE links become transport-native mesh peers.
430pub struct BluetoothMesh {
431    config: BluetoothConfig,
432}
433
434impl BluetoothMesh {
435    pub fn new(config: BluetoothConfig) -> Self {
436        Self { config }
437    }
438
439    pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
440        if !self.config.is_enabled() {
441            info!("Bluetooth transport disabled");
442            return BluetoothBackendState::Disabled;
443        }
444
445        let mut started = false;
446
447        if let Some(bridge) = mobile_bluetooth_bridge() {
448            info!(
449                "Starting mobile Bluetooth bridge for peer {}",
450                context.my_peer_id.short()
451            );
452            match bridge.start(context.my_peer_id.to_string()).await {
453                Ok(mut rx) => {
454                    info!("Mobile Bluetooth bridge is running");
455                    let ctx = context.clone();
456                    tokio::spawn(async move {
457                        while let Some(link) = rx.recv().await {
458                            if let Err(err) = handle_pending_link(link, ctx.clone()).await {
459                                warn!("Mobile Bluetooth link failed: {}", err);
460                            }
461                        }
462                    });
463                    started = true;
464                }
465                Err(err) => {
466                    warn!("Failed to start mobile Bluetooth bridge: {}", err);
467                }
468            }
469        }
470
471        #[cfg(target_os = "macos")]
472        {
473            if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
474            {
475                warn!("Failed to start macOS Bluetooth central: {}", err);
476            } else {
477                started = true;
478            }
479        }
480
481        if started {
482            BluetoothBackendState::Running
483        } else {
484            warn!(
485                "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
486                self.config.max_peers
487            );
488            BluetoothBackendState::Unsupported
489        }
490    }
491}
492
493async fn handle_pending_link(
494    link: PendingBluetoothLink,
495    context: BluetoothRuntimeContext,
496) -> Result<bool> {
497    if let Some(peer_hint) = link.peer_hint.as_deref() {
498        debug!("Handling pending Bluetooth link {}", peer_hint);
499    }
500    let remote_peer_id = receive_remote_hello(&link.link).await?;
501    if !link.local_hello_sent {
502        send_hello(&link.link, &context.my_peer_id).await?;
503    }
504
505    let bluetooth_peer = super::BluetoothPeer::new(
506        remote_peer_id.clone(),
507        link.direction,
508        link.link,
509        context.store.clone(),
510        context.nostr_relay.clone(),
511        Some(context.mesh_frame_tx.clone()),
512        Some(context.registrar.state.clone()),
513    );
514    let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
515
516    if !context
517        .registrar
518        .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
519        .await
520    {
521        warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
522        bluetooth_peer.close().await?;
523        return Ok(false);
524    } else {
525        info!("Bluetooth peer {} connected", remote_peer_id.short());
526        spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
527    }
528    Ok(true)
529}
530
531fn spawn_bluetooth_disconnect_watch(
532    peer_id: PeerId,
533    peer: Arc<super::BluetoothPeer>,
534    registrar: BluetoothPeerRegistrar,
535) {
536    tokio::spawn(async move {
537        loop {
538            if !peer.is_connected() {
539                registrar
540                    .unregister_bluetooth_peer_if_current(&peer_id, &peer)
541                    .await;
542                break;
543            }
544            tokio::time::sleep(Duration::from_millis(250)).await;
545        }
546    });
547}
548
/// Handshake frame exchanged as JSON text once a link is open.
///
/// With the serde attributes below the JSON keys are `type` and `peerId`.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct BluetoothHello {
    /// Frame discriminator; `"hello"` for this frame.
    #[serde(rename = "type")]
    kind: String,
    /// String form of the sender's peer id.
    peer_id: String,
}
556
557async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
558    let hello = BluetoothHello {
559        kind: "hello".to_string(),
560        peer_id: my_peer_id.to_string(),
561    };
562    link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
563        .await
564}
565
566async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
567    let result = tokio::time::timeout(HELLO_TIMEOUT, async {
568        loop {
569            match link.recv().await {
570                Some(BluetoothFrame::Text(text)) => {
571                    if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
572                        if hello.kind == "hello" {
573                            return PeerId::from_string(&hello.peer_id)
574                                .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
575                        }
576                    }
577                }
578                Some(BluetoothFrame::Binary(_)) => {}
579                None => return Err(anyhow!("bluetooth link closed before hello")),
580            }
581        }
582    })
583    .await;
584
585    match result {
586        Ok(peer_id) => peer_id,
587        Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
588    }
589}
590
591#[cfg(target_os = "macos")]
592mod macos {
593    use super::*;
594    use btleplug::api::{
595        Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
596        ScanFilter, ValueNotification, WriteType,
597    };
598    use btleplug::platform::{Manager, Peripheral};
599    use futures::StreamExt;
600    use std::collections::HashSet;
601    use tokio::sync::Mutex;
602    use uuid::Uuid;
603
    /// Advertisements older than this are dropped from the freshness map, and
    /// peripherals without a fresh advertisement are not connected to.
    const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);

    /// Most recent advertisement information recorded for one peripheral.
    #[derive(Clone)]
    struct AdvertisementSnapshot {
        /// When an htree advertisement from the peripheral was last observed.
        last_seen: Instant,
        /// Hex encoding of the non-empty service data last advertised for the
        /// htree service, if any was seen.
        peer_hint: Option<String>,
    }
611
612    pub(super) async fn start_macos_central(
613        config: BluetoothConfig,
614        context: BluetoothRuntimeContext,
615    ) -> Result<()> {
616        struct ConnectedPeripheral {
617            peripheral: Peripheral,
618            link: Arc<dyn BluetoothLink>,
619            peer_hint: Option<String>,
620        }
621
622        let manager = Manager::new().await?;
623        let adapters = manager.adapters().await?;
624        let Some(adapter) = adapters.into_iter().next() else {
625            warn!("No Bluetooth adapters available on macOS");
626            return Ok(());
627        };
628        info!(
629            "Starting macOS Bluetooth central for peer {} (max_peers={})",
630            context.my_peer_id.short(),
631            config.max_peers
632        );
633        let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
634        let mut events = adapter.events().await?;
635        let fresh_advertisements =
636            Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
637        let fresh_advertisements_for_events = fresh_advertisements.clone();
638        tokio::spawn(async move {
639            while let Some(event) = events.next().await {
640                match event {
641                    CentralEvent::ServicesAdvertisement { id, services }
642                        if services.contains(&service_uuid) =>
643                    {
644                        let mut advertisements = fresh_advertisements_for_events.lock().await;
645                        let entry = advertisements
646                            .entry(id.to_string())
647                            .or_insert_with(|| AdvertisementSnapshot {
648                                last_seen: Instant::now(),
649                                peer_hint: None,
650                            });
651                        entry.last_seen = Instant::now();
652                    }
653                    CentralEvent::ServiceDataAdvertisement { id, service_data } => {
654                        if let Some(data) = service_data.get(&service_uuid) {
655                            let mut advertisements = fresh_advertisements_for_events.lock().await;
656                            let entry = advertisements
657                                .entry(id.to_string())
658                                .or_insert_with(|| AdvertisementSnapshot {
659                                    last_seen: Instant::now(),
660                                    peer_hint: None,
661                                });
662                            entry.last_seen = Instant::now();
663                            if !data.is_empty() {
664                                entry.peer_hint = Some(hex::encode(data));
665                            }
666                        }
667                    }
668                    CentralEvent::DeviceDisconnected(id) => {
669                        fresh_advertisements_for_events
670                            .lock()
671                            .await
672                            .remove(&id.to_string());
673                    }
674                    _ => {}
675                }
676            }
677        });
678
679        tokio::spawn(async move {
680            let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
681            loop {
682                if let Err(err) = adapter
683                    .start_scan(ScanFilter {
684                        services: vec![service_uuid],
685                    })
686                    .await
687                {
688                    warn!("Failed to start BLE scan: {}", err);
689                    tokio::time::sleep(Duration::from_secs(5)).await;
690                    continue;
691                }
692                tokio::time::sleep(Duration::from_secs(2)).await;
693
694                let peripherals = match adapter.peripherals().await {
695                    Ok(peripherals) => peripherals,
696                    Err(err) => {
697                        warn!("Failed to list BLE peripherals: {}", err);
698                        tokio::time::sleep(Duration::from_secs(5)).await;
699                        continue;
700                    }
701                };
702
703                connected.retain(|_, connection| {
704                    futures::executor::block_on(async {
705                        let peripheral_connected =
706                            connection.peripheral.is_connected().await.unwrap_or(false);
707                        let link_open = connection.link.is_open();
708                        if !peripheral_connected || !link_open {
709                            if peripheral_connected {
710                                let _ = connection.peripheral.disconnect().await;
711                            }
712                            false
713                        } else {
714                            true
715                        }
716                    })
717                });
718
719                let advertisement_snapshot = {
720                    let now = Instant::now();
721                    let mut seen = fresh_advertisements.lock().await;
722                    seen.retain(|_, advertisement| {
723                        now.saturating_duration_since(advertisement.last_seen)
724                            <= FRESH_ADVERTISEMENT_WINDOW
725                    });
726                    seen.clone()
727                };
728
729                let mut candidates = Vec::new();
730                for peripheral in peripherals {
731                    let peripheral_id = peripheral.id().to_string();
732                    if connected.contains_key(&peripheral_id) {
733                        continue;
734                    }
735                    let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
736                    else {
737                        debug!(
738                            "Skipping cached BLE peripheral {} without a fresh advertisement",
739                            peripheral_id
740                        );
741                        continue;
742                    };
743                    let properties = match peripheral.properties().await {
744                        Ok(Some(properties)) => properties,
745                        _ => continue,
746                    };
747                    if !properties.services.contains(&service_uuid) {
748                        continue;
749                    }
750                    candidates.push((peripheral, peripheral_id, advertisement));
751                }
752
753                candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
754                let mut represented_peer_hints = connected
755                    .values()
756                    .filter_map(|connection| connection.peer_hint.clone())
757                    .collect::<HashSet<_>>();
758
759                for (peripheral, peripheral_id, advertisement) in candidates {
760                    if connected.len() >= config.max_peers {
761                        break;
762                    }
763                    if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
764                        if !represented_peer_hints.insert(peer_hint.clone()) {
765                            debug!(
766                                "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
767                                peripheral_id,
768                                peer_hint
769                            );
770                            continue;
771                        }
772                    }
773                    info!(
774                        "Discovered BLE peripheral {} advertising htree service{}",
775                        peripheral_id,
776                        advertisement
777                            .peer_hint
778                            .as_ref()
779                            .map(|hint| format!(" (peer hint {})", hint))
780                            .unwrap_or_default()
781                    );
782                    match connect_peripheral(peripheral.clone()).await {
783                        Ok(Some(link)) => {
784                            let tracked_link = link.clone();
785                            let pending = PendingBluetoothLink {
786                                link,
787                                direction: PeerDirection::Outbound,
788                                local_hello_sent: false,
789                                peer_hint: advertisement
790                                    .peer_hint
791                                    .clone()
792                                    .or(Some(peripheral_id.clone())),
793                            };
794                            match handle_pending_link(pending, context.clone()).await {
795                                Ok(true) => {
796                                    connected.insert(
797                                        peripheral_id.clone(),
798                                        ConnectedPeripheral {
799                                            peripheral,
800                                            link: tracked_link,
801                                            peer_hint: advertisement.peer_hint.clone(),
802                                        },
803                                    );
804                                }
805                                Ok(false) => {}
806                                Err(err) => {
807                                    warn!("Failed to attach BLE peripheral: {}", err);
808                                }
809                            }
810                        }
811                        Ok(None) => {}
812                        Err(err) => {
813                            warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
814                        }
815                    }
816                }
817
818                tokio::time::sleep(Duration::from_secs(5)).await;
819            }
820        });
821
822        Ok(())
823    }
824
825    async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
826        let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
827        let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
828        let peripheral_id = peripheral.id().to_string();
829
830        let mut connect_timed_out = false;
831        if !peripheral.is_connected().await? {
832            info!("Connecting to BLE peripheral {}", peripheral_id);
833            match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
834                Ok(Ok(())) => {}
835                Ok(Err(err)) => return Err(err.into()),
836                Err(_) => {
837                    warn!(
838                        "BLE connect timed out for {}; probing services before giving up",
839                        peripheral_id
840                    );
841                    connect_timed_out = true;
842                }
843            }
844        }
845        let mut rx_char = None;
846        let mut tx_char = None;
847        for attempt in 1..=8 {
848            info!(
849                "Discovering BLE services for {} (attempt {}/8)",
850                peripheral_id, attempt
851            );
852            if let Err(err) = peripheral.discover_services().await {
853                if attempt == 8 {
854                    if connect_timed_out {
855                        warn!(
856                            "BLE service discovery failed for {} after soft connect timeout: {}",
857                            peripheral_id, err
858                        );
859                        let _ = peripheral.disconnect().await;
860                        return Ok(None);
861                    }
862                    return Err(err.into());
863                }
864                warn!(
865                    "BLE service discovery attempt {}/8 failed for {}: {}",
866                    attempt, peripheral_id, err
867                );
868                tokio::time::sleep(Duration::from_millis(500)).await;
869                continue;
870            }
871
872            let characteristics = peripheral.characteristics();
873            if !characteristics.is_empty() {
874                let uuids = characteristics
875                    .iter()
876                    .map(|c| c.uuid.to_string())
877                    .collect::<Vec<_>>()
878                    .join(", ");
879                info!(
880                    "BLE peripheral {} characteristics: {}",
881                    peripheral_id, uuids
882                );
883            }
884            rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
885            tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
886            if rx_char.is_some() && tx_char.is_some() {
887                break;
888            }
889            tokio::time::sleep(Duration::from_millis(500)).await;
890        }
891        let Some(rx_char) = rx_char else {
892            warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
893            let _ = peripheral.disconnect().await;
894            return Ok(None);
895        };
896        let Some(tx_char) = tx_char else {
897            warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
898            let _ = peripheral.disconnect().await;
899            return Ok(None);
900        };
901        if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
902            warn!(
903                "BLE peripheral {} TX characteristic is missing NOTIFY",
904                peripheral_id
905            );
906            let _ = peripheral.disconnect().await;
907            return Ok(None);
908        }
909        let notifications = peripheral.notifications().await?;
910        info!("Subscribing to BLE notifications for {}", peripheral_id);
911        peripheral.subscribe(&tx_char).await?;
912
913        let initial_frames = match peripheral.read(&tx_char).await {
914            Ok(bytes) if !bytes.is_empty() => {
915                info!(
916                    "Read initial BLE hello bytes from {} ({} bytes)",
917                    peripheral_id,
918                    bytes.len()
919                );
920                let mut decoder = FrameDecoder::new();
921                match decoder.push(&bytes) {
922                    Ok(frames) => frames,
923                    Err(err) => {
924                        warn!(
925                            "Discarding malformed initial BLE frame from {}: {}",
926                            peripheral_id, err
927                        );
928                        Vec::new()
929                    }
930                }
931            }
932            Ok(_) => Vec::new(),
933            Err(err) => {
934                warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
935                Vec::new()
936            }
937        };
938
939        info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
940        Ok(Some(MacosCentralLink::new(
941            peripheral,
942            rx_char,
943            notifications,
944            initial_frames,
945        )))
946    }
947
948    struct FrameDecoder {
949        buffer: Vec<u8>,
950    }
951
952    impl FrameDecoder {
953        fn new() -> Self {
954            Self { buffer: Vec::new() }
955        }
956
957        fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
958            self.buffer.extend_from_slice(chunk);
959            let mut frames = Vec::new();
960            loop {
961                if self.buffer.len() < 5 {
962                    break;
963                }
964                let kind = self.buffer[0];
965                let len = u32::from_be_bytes([
966                    self.buffer[1],
967                    self.buffer[2],
968                    self.buffer[3],
969                    self.buffer[4],
970                ]) as usize;
971                if self.buffer.len() < 5 + len {
972                    break;
973                }
974                let payload = self.buffer[5..5 + len].to_vec();
975                self.buffer.drain(..5 + len);
976                let frame = match kind {
977                    1 => BluetoothFrame::Text(String::from_utf8(payload)?),
978                    2 => BluetoothFrame::Binary(payload),
979                    _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
980                };
981                frames.push(frame);
982            }
983            Ok(frames)
984        }
985    }
986
987    fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
988        match frame {
989            BluetoothFrame::Text(text) => {
990                let payload = text.into_bytes();
991                let mut out = Vec::with_capacity(5 + payload.len());
992                out.push(1);
993                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
994                out.extend_from_slice(&payload);
995                out
996            }
997            BluetoothFrame::Binary(payload) => {
998                let mut out = Vec::with_capacity(5 + payload.len());
999                out.push(2);
1000                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1001                out.extend_from_slice(&payload);
1002                out
1003            }
1004        }
1005    }
1006
1007    struct MacosCentralLink {
1008        peripheral: Peripheral,
1009        rx_char: Characteristic,
1010        open: std::sync::atomic::AtomicBool,
1011        rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
1012    }
1013
1014    impl MacosCentralLink {
1015        fn new(
1016            peripheral: Peripheral,
1017            rx_char: Characteristic,
1018            mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
1019            initial_frames: Vec<BluetoothFrame>,
1020        ) -> Arc<Self> {
1021            let (tx, rx) = mpsc::channel(64);
1022            let link = Arc::new(Self {
1023                peripheral,
1024                rx_char,
1025                open: std::sync::atomic::AtomicBool::new(true),
1026                rx: Mutex::new(rx),
1027            });
1028
1029            for frame in initial_frames {
1030                let _ = tx.try_send(frame);
1031            }
1032
1033            let weak = Arc::downgrade(&link);
1034            tokio::spawn(async move {
1035                let mut decoder = FrameDecoder::new();
1036                while let Some(notification) = notifications.next().await {
1037                    let Ok(frames) = decoder.push(&notification.value) else {
1038                        break;
1039                    };
1040                    for frame in frames {
1041                        if tx.send(frame).await.is_err() {
1042                            break;
1043                        }
1044                    }
1045                }
1046                if let Some(link) = weak.upgrade() {
1047                    link.open.store(false, std::sync::atomic::Ordering::Relaxed);
1048                }
1049            });
1050
1051            link
1052        }
1053    }
1054
1055    #[async_trait]
1056    impl BluetoothLink for MacosCentralLink {
1057        async fn send(&self, frame: BluetoothFrame) -> Result<()> {
1058            let bytes = encode_frame(frame);
1059            for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
1060                self.peripheral
1061                    .write(&self.rx_char, chunk, WriteType::WithResponse)
1062                    .await?;
1063            }
1064            Ok(())
1065        }
1066
1067        async fn recv(&self) -> Option<BluetoothFrame> {
1068            self.rx.lock().await.recv().await
1069        }
1070
1071        fn is_open(&self) -> bool {
1072            self.open.load(std::sync::atomic::Ordering::Relaxed)
1073        }
1074
1075        async fn close(&self) -> Result<()> {
1076            self.open.store(false, std::sync::atomic::Ordering::Relaxed);
1077            let _ = self.peripheral.disconnect().await;
1078            Ok(())
1079        }
1080    }
1081}