Skip to main content

hashtree_cli/webrtc/
bluetooth.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::BTreeSet;
4#[cfg(target_os = "macos")]
5use std::collections::HashMap;
6use std::sync::{Arc, OnceLock};
7use std::time::{Duration, Instant};
8use tokio::sync::mpsc;
9use tracing::{debug, info, warn};
10
11use crate::nostr_relay::NostrRelay;
12
13use super::bluetooth_peer::{BluetoothFrame, BluetoothLink};
14use super::peer::ContentStore;
15use super::session::MeshPeer;
16use super::signaling::{
17    ConnectionState, PeerClassifier, PeerEntry, PeerSignalPath, PeerTransport, WebRTCState,
18};
19use super::types::{MeshNostrFrame, PeerDirection, PeerId, PeerPool, PoolSettings};
20
21#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
22pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";
23#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
24pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";
25#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
26pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";
27#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
28// Keep BLE chunks comfortably below conservative cross-platform GATT write budgets.
29pub const HTREE_BLE_CHUNK_BYTES: usize = 64;
30const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
31
32/// Configuration for the optional Bluetooth peer transport.
33#[derive(Debug, Clone)]
34pub struct BluetoothConfig {
35    pub enabled: bool,
36    pub max_peers: usize,
37}
38
39impl BluetoothConfig {
40    pub fn is_enabled(&self) -> bool {
41        self.enabled && self.max_peers > 0
42    }
43}
44
45impl Default for BluetoothConfig {
46    fn default() -> Self {
47        Self {
48            enabled: false,
49            max_peers: 0,
50        }
51    }
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum BluetoothBackendState {
56    Disabled,
57    Running,
58    Unsupported,
59}
60
61#[derive(Clone)]
62pub struct PendingBluetoothLink {
63    pub link: Arc<dyn BluetoothLink>,
64    pub direction: PeerDirection,
65    pub local_hello_sent: bool,
66    pub peer_hint: Option<String>,
67}
68
69#[async_trait]
70pub trait MobileBluetoothBridge: Send + Sync {
71    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
72}
73
74static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
75
76pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
77    MOBILE_BLUETOOTH_BRIDGE
78        .set(bridge)
79        .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
80}
81
82fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
83    MOBILE_BLUETOOTH_BRIDGE.get().cloned()
84}
85
86#[derive(Clone)]
87pub struct BluetoothPeerRegistrar {
88    state: Arc<WebRTCState>,
89    peer_classifier: PeerClassifier,
90    pools: PoolSettings,
91    max_bluetooth_peers: usize,
92}
93
94impl BluetoothPeerRegistrar {
95    pub fn new(
96        state: Arc<WebRTCState>,
97        peer_classifier: PeerClassifier,
98        pools: PoolSettings,
99        max_bluetooth_peers: usize,
100    ) -> Self {
101        Self {
102            state,
103            peer_classifier,
104            pools,
105            max_bluetooth_peers,
106        }
107    }
108
109    async fn pool_counts(&self) -> (usize, usize) {
110        let peers = self.state.peers.read().await;
111        let mut follows = 0usize;
112        let mut other = 0usize;
113        for entry in peers.values() {
114            if entry.state != ConnectionState::Connected {
115                continue;
116            }
117            match entry.pool {
118                PeerPool::Follows => follows += 1,
119                PeerPool::Other => other += 1,
120            }
121        }
122        (follows, other)
123    }
124
125    async fn bluetooth_peer_count(&self, peer_key: &str) -> usize {
126        let peers = self.state.peers.read().await;
127        peers
128            .values()
129            .filter(|entry| entry.transport == PeerTransport::Bluetooth)
130            .filter(|entry| entry.state == ConnectionState::Connected)
131            .filter(|entry| entry.peer_id.to_string() != peer_key)
132            .count()
133    }
134
135    pub async fn register_connected_peer(
136        &self,
137        peer_id: PeerId,
138        direction: PeerDirection,
139        peer: MeshPeer,
140    ) -> bool {
141        let peer_key = peer_id.to_string();
142        let pool = (self.peer_classifier)(&peer_id.pubkey);
143        let (follows, other) = self.pool_counts().await;
144        let can_accept_pool = match pool {
145            PeerPool::Follows => follows < self.pools.follows.max_connections,
146            PeerPool::Other => other < self.pools.other.max_connections,
147        };
148        if !can_accept_pool {
149            warn!(
150                "Bluetooth peer {} rejected because pool {:?} is full",
151                peer_id.short(),
152                pool
153            );
154            return false;
155        }
156
157        if self.max_bluetooth_peers == 0
158            || self.bluetooth_peer_count(&peer_key).await >= self.max_bluetooth_peers
159        {
160            warn!(
161                "Bluetooth peer {} rejected because max_bluetooth_peers={} reached",
162                peer_id.short(),
163                self.max_bluetooth_peers
164            );
165            return false;
166        }
167
168        let mut peers = self.state.peers.write().await;
169        let duplicate_keys = peers
170            .iter()
171            .filter(|(key, entry)| {
172                key.as_str() != peer_key
173                    && entry.transport == PeerTransport::Bluetooth
174                    && entry.peer_id.pubkey == peer_id.pubkey
175            })
176            .map(|(key, _)| key.clone())
177            .collect::<Vec<_>>();
178        let was_connected = peers
179            .get(&peer_key)
180            .map(|entry| entry.state == ConnectionState::Connected)
181            .unwrap_or(false);
182        let replaced = peers.insert(
183            peer_key,
184            PeerEntry {
185                peer_id,
186                direction,
187                state: ConnectionState::Connected,
188                last_seen: Instant::now(),
189                peer: Some(peer),
190                pool,
191                transport: PeerTransport::Bluetooth,
192                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
193                bytes_sent: 0,
194                bytes_received: 0,
195            },
196        );
197        let removed_duplicates = duplicate_keys
198            .into_iter()
199            .filter_map(|key| peers.remove(&key))
200            .collect::<Vec<_>>();
201        drop(peers);
202
203        if let Some(previous) = replaced.and_then(|entry| entry.peer) {
204            let _ = previous.close().await;
205        }
206        for duplicate in &removed_duplicates {
207            if let Some(peer) = duplicate.peer.as_ref() {
208                let _ = peer.close().await;
209            }
210        }
211
212        let removed_connected_duplicates = removed_duplicates
213            .iter()
214            .filter(|entry| entry.state == ConnectionState::Connected)
215            .count() as isize;
216        let connected_delta =
217            1isize - if was_connected { 1 } else { 0 } - removed_connected_duplicates;
218        if connected_delta > 0 {
219            self.state.connected_count.fetch_add(
220                connected_delta as usize,
221                std::sync::atomic::Ordering::Relaxed,
222            );
223        } else if connected_delta < 0 {
224            self.state.connected_count.fetch_sub(
225                (-connected_delta) as usize,
226                std::sync::atomic::Ordering::Relaxed,
227            );
228        }
229        true
230    }
231
232    pub async fn unregister_peer(&self, peer_id: &PeerId) {
233        let peer_key = peer_id.to_string();
234        let removed = self.state.peers.write().await.remove(&peer_key);
235        if let Some(entry) = removed {
236            if entry.state == ConnectionState::Connected {
237                self.state
238                    .connected_count
239                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
240            }
241            if let Some(peer) = entry.peer {
242                let _ = peer.close().await;
243            }
244        }
245    }
246
247    pub async fn unregister_bluetooth_peer_if_current(
248        &self,
249        peer_id: &PeerId,
250        expected_peer: &Arc<super::BluetoothPeer>,
251    ) {
252        let peer_key = peer_id.to_string();
253        let removed = {
254            let mut peers = self.state.peers.write().await;
255            let matches_current = peers
256                .get(&peer_key)
257                .and_then(|entry| entry.peer.as_ref())
258                .and_then(|peer| match peer {
259                    MeshPeer::Bluetooth(current) => Some(Arc::ptr_eq(current, expected_peer)),
260                    _ => None,
261                })
262                .unwrap_or(false);
263            if matches_current {
264                peers.remove(&peer_key)
265            } else {
266                None
267            }
268        };
269        if let Some(entry) = removed {
270            if entry.state == ConnectionState::Connected {
271                self.state
272                    .connected_count
273                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
274            }
275            if let Some(peer) = entry.peer {
276                let _ = peer.close().await;
277            }
278        }
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285    use crate::webrtc::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
286    use crate::webrtc::session::{MeshPeer, TestMeshPeer};
287
288    #[tokio::test]
289    async fn register_connected_peer_closes_replaced_session() {
290        let state = Arc::new(WebRTCState::new());
291        let registrar = BluetoothPeerRegistrar::new(
292            state.clone(),
293            Arc::new(|_| PeerPool::Other),
294            PoolSettings::default(),
295            2,
296        );
297
298        let peer_id = PeerId::new("peer-pub".to_string());
299        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
300        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
301        let first_ref = first.mock_ref().expect("mock peer").clone();
302
303        assert!(
304            registrar
305                .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
306                .await
307        );
308        assert!(
309            registrar
310                .register_connected_peer(peer_id, PeerDirection::Outbound, second)
311                .await
312        );
313
314        assert!(first_ref.is_closed());
315    }
316
317    #[tokio::test]
318    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
319        let state = Arc::new(WebRTCState::new());
320        let registrar = BluetoothPeerRegistrar::new(
321            state.clone(),
322            Arc::new(|_| PeerPool::Other),
323            PoolSettings::default(),
324            2,
325        );
326
327        let first_peer_id = PeerId::new("peer-pub".to_string());
328        let second_peer_id = PeerId::new("peer-pub".to_string());
329        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
330        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
331        let first_ref = first.mock_ref().expect("mock peer").clone();
332
333        assert!(
334            registrar
335                .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
336                .await
337        );
338        assert!(
339            registrar
340                .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
341                .await
342        );
343
344        assert!(first_ref.is_closed());
345        let peers = state.peers.read().await;
346        assert!(peers.contains_key(&second_peer_id.to_string()));
347        assert_eq!(peers.len(), 1);
348        assert_eq!(
349            state
350                .connected_count
351                .load(std::sync::atomic::Ordering::Relaxed),
352            1
353        );
354    }
355
356    #[tokio::test]
357    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
358        let (link_a, link_b) = MockBluetoothLink::pair();
359        let state = Arc::new(WebRTCState::new());
360        let registrar = BluetoothPeerRegistrar::new(
361            state.clone(),
362            Arc::new(|_| PeerPool::Other),
363            PoolSettings::default(),
364            2,
365        );
366        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
367        let local_peer_id = PeerId::new("local-pub".to_string());
368        let remote_peer_id = PeerId::new("remote-pub".to_string());
369        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();
370
371        send_hello(&remote_link, &remote_peer_id)
372            .await
373            .expect("send hello");
374
375        let accepted = handle_pending_link(
376            PendingBluetoothLink {
377                link: link_a.clone(),
378                direction: PeerDirection::Outbound,
379                local_hello_sent: false,
380                peer_hint: Some("mock-ble".to_string()),
381            },
382            BluetoothRuntimeContext {
383                my_peer_id: local_peer_id,
384                store: None,
385                nostr_relay: None,
386                mesh_frame_tx,
387                registrar: registrar.clone(),
388            },
389        )
390        .await
391        .expect("handle pending link");
392
393        assert!(accepted);
394        assert!(state
395            .peers
396            .read()
397            .await
398            .contains_key(&remote_peer_id.to_string()));
399
400        link_a.close().await.expect("close local transport");
401
402        tokio::time::timeout(Duration::from_secs(2), async {
403            loop {
404                if !state
405                    .peers
406                    .read()
407                    .await
408                    .contains_key(&remote_peer_id.to_string())
409                {
410                    break;
411                }
412                tokio::time::sleep(Duration::from_millis(25)).await;
413            }
414        })
415        .await
416        .expect("peer should be unregistered");
417    }
418}
419
420#[derive(Clone)]
421pub struct BluetoothRuntimeContext {
422    pub my_peer_id: PeerId,
423    pub store: Option<Arc<dyn ContentStore>>,
424    pub nostr_relay: Option<Arc<NostrRelay>>,
425    pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
426    pub registrar: BluetoothPeerRegistrar,
427}
428
429/// Native Bluetooth backend. Nearby BLE links become transport-native mesh peers.
430pub struct BluetoothMesh {
431    config: BluetoothConfig,
432}
433
434impl BluetoothMesh {
435    pub fn new(config: BluetoothConfig) -> Self {
436        Self { config }
437    }
438
439    pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
440        if !self.config.is_enabled() {
441            info!("Bluetooth transport disabled");
442            return BluetoothBackendState::Disabled;
443        }
444
445        let mut started = false;
446
447        if let Some(bridge) = mobile_bluetooth_bridge() {
448            info!(
449                "Starting mobile Bluetooth bridge for peer {}",
450                context.my_peer_id.short()
451            );
452            match bridge.start(context.my_peer_id.to_string()).await {
453                Ok(mut rx) => {
454                    info!("Mobile Bluetooth bridge is running");
455                    let ctx = context.clone();
456                    tokio::spawn(async move {
457                        while let Some(link) = rx.recv().await {
458                            if let Err(err) = handle_pending_link(link, ctx.clone()).await {
459                                warn!("Mobile Bluetooth link failed: {}", err);
460                            }
461                        }
462                    });
463                    started = true;
464                }
465                Err(err) => {
466                    warn!("Failed to start mobile Bluetooth bridge: {}", err);
467                }
468            }
469        }
470
471        #[cfg(target_os = "macos")]
472        {
473            if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
474            {
475                warn!("Failed to start macOS Bluetooth central: {}", err);
476            } else {
477                started = true;
478            }
479        }
480
481        if started {
482            BluetoothBackendState::Running
483        } else {
484            warn!(
485                "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
486                self.config.max_peers
487            );
488            BluetoothBackendState::Unsupported
489        }
490    }
491}
492
493async fn handle_pending_link(
494    link: PendingBluetoothLink,
495    context: BluetoothRuntimeContext,
496) -> Result<bool> {
497    if let Some(peer_hint) = link.peer_hint.as_deref() {
498        debug!("Handling pending Bluetooth link {}", peer_hint);
499    }
500    info!(
501        "Handling pending Bluetooth link direction={} local_hello_sent={}",
502        link.direction, link.local_hello_sent
503    );
504    let remote_peer_id = receive_remote_hello(&link.link).await?;
505    info!(
506        "Received Bluetooth hello from {} while attaching pending link",
507        remote_peer_id.short()
508    );
509    if !link.local_hello_sent {
510        send_hello(&link.link, &context.my_peer_id).await?;
511        info!(
512            "Sent Bluetooth hello to {} while attaching pending link",
513            remote_peer_id.short()
514        );
515    }
516
517    let bluetooth_peer = super::BluetoothPeer::new(
518        remote_peer_id.clone(),
519        link.direction,
520        link.link,
521        context.store.clone(),
522        context.nostr_relay.clone(),
523        Some(context.mesh_frame_tx.clone()),
524        Some(context.registrar.state.clone()),
525    );
526    let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
527
528    if !context
529        .registrar
530        .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
531        .await
532    {
533        warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
534        bluetooth_peer.close().await?;
535        return Ok(false);
536    } else {
537        info!("Bluetooth peer {} connected", remote_peer_id.short());
538        spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
539    }
540    Ok(true)
541}
542
543fn spawn_bluetooth_disconnect_watch(
544    peer_id: PeerId,
545    peer: Arc<super::BluetoothPeer>,
546    registrar: BluetoothPeerRegistrar,
547) {
548    tokio::spawn(async move {
549        loop {
550            if !peer.is_connected() {
551                registrar
552                    .unregister_bluetooth_peer_if_current(&peer_id, &peer)
553                    .await;
554                break;
555            }
556            tokio::time::sleep(Duration::from_millis(250)).await;
557        }
558    });
559}
560
561#[derive(serde::Serialize, serde::Deserialize)]
562#[serde(rename_all = "camelCase")]
563struct BluetoothHello {
564    #[serde(rename = "type")]
565    kind: String,
566    peer_id: String,
567}
568
569async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
570    let hello = BluetoothHello {
571        kind: "hello".to_string(),
572        peer_id: my_peer_id.to_string(),
573    };
574    link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
575        .await
576}
577
578async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
579    let result = tokio::time::timeout(HELLO_TIMEOUT, async {
580        loop {
581            match link.recv().await {
582                Some(BluetoothFrame::Text(text)) => {
583                    if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
584                        if hello.kind == "hello" {
585                            return PeerId::from_string(&hello.peer_id)
586                                .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
587                        }
588                    }
589                }
590                Some(BluetoothFrame::Binary(_)) => {}
591                None => return Err(anyhow!("bluetooth link closed before hello")),
592            }
593        }
594    })
595    .await;
596
597    match result {
598        Ok(peer_id) => peer_id,
599        Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
600    }
601}
602
603#[cfg(target_os = "macos")]
604mod macos {
605    use super::*;
606    use btleplug::api::{
607        Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
608        ScanFilter, ValueNotification, WriteType,
609    };
610    use btleplug::platform::{Manager, Peripheral};
611    use futures::StreamExt;
612    use std::collections::HashSet;
613    use tokio::sync::Mutex;
614    use uuid::Uuid;
615
616    const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);
617
618    #[derive(Clone)]
619    struct AdvertisementSnapshot {
620        last_seen: Instant,
621        peer_hint: Option<String>,
622    }
623
624    pub(super) async fn start_macos_central(
625        config: BluetoothConfig,
626        context: BluetoothRuntimeContext,
627    ) -> Result<()> {
628        struct ConnectedPeripheral {
629            peripheral: Peripheral,
630            link: Arc<dyn BluetoothLink>,
631            peer_hint: Option<String>,
632        }
633
634        let manager = Manager::new().await?;
635        let adapters = manager.adapters().await?;
636        let Some(adapter) = adapters.into_iter().next() else {
637            warn!("No Bluetooth adapters available on macOS");
638            return Ok(());
639        };
640        info!(
641            "Starting macOS Bluetooth central for peer {} (max_peers={})",
642            context.my_peer_id.short(),
643            config.max_peers
644        );
645        let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
646        let mut events = adapter.events().await?;
647        let fresh_advertisements =
648            Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
649        let fresh_advertisements_for_events = fresh_advertisements.clone();
650        tokio::spawn(async move {
651            while let Some(event) = events.next().await {
652                match event {
653                    CentralEvent::ServicesAdvertisement { id, services }
654                        if services.contains(&service_uuid) =>
655                    {
656                        let mut advertisements = fresh_advertisements_for_events.lock().await;
657                        let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
658                            AdvertisementSnapshot {
659                                last_seen: Instant::now(),
660                                peer_hint: None,
661                            }
662                        });
663                        entry.last_seen = Instant::now();
664                    }
665                    CentralEvent::ServiceDataAdvertisement { id, service_data } => {
666                        if let Some(data) = service_data.get(&service_uuid) {
667                            let mut advertisements = fresh_advertisements_for_events.lock().await;
668                            let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
669                                AdvertisementSnapshot {
670                                    last_seen: Instant::now(),
671                                    peer_hint: None,
672                                }
673                            });
674                            entry.last_seen = Instant::now();
675                            if !data.is_empty() {
676                                entry.peer_hint = Some(hex::encode(data));
677                            }
678                        }
679                    }
680                    CentralEvent::DeviceDisconnected(id) => {
681                        fresh_advertisements_for_events
682                            .lock()
683                            .await
684                            .remove(&id.to_string());
685                    }
686                    _ => {}
687                }
688            }
689        });
690
691        tokio::spawn(async move {
692            let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
693            loop {
694                if let Err(err) = adapter
695                    .start_scan(ScanFilter {
696                        services: vec![service_uuid],
697                    })
698                    .await
699                {
700                    warn!("Failed to start BLE scan: {}", err);
701                    tokio::time::sleep(Duration::from_secs(5)).await;
702                    continue;
703                }
704                tokio::time::sleep(Duration::from_secs(2)).await;
705
706                let peripherals = match adapter.peripherals().await {
707                    Ok(peripherals) => peripherals,
708                    Err(err) => {
709                        warn!("Failed to list BLE peripherals: {}", err);
710                        tokio::time::sleep(Duration::from_secs(5)).await;
711                        continue;
712                    }
713                };
714
715                connected.retain(|_, connection| {
716                    futures::executor::block_on(async {
717                        let peripheral_connected =
718                            connection.peripheral.is_connected().await.unwrap_or(false);
719                        let link_open = connection.link.is_open();
720                        if !peripheral_connected || !link_open {
721                            if peripheral_connected {
722                                let _ = connection.peripheral.disconnect().await;
723                            }
724                            false
725                        } else {
726                            true
727                        }
728                    })
729                });
730
731                let advertisement_snapshot = {
732                    let now = Instant::now();
733                    let mut seen = fresh_advertisements.lock().await;
734                    seen.retain(|_, advertisement| {
735                        now.saturating_duration_since(advertisement.last_seen)
736                            <= FRESH_ADVERTISEMENT_WINDOW
737                    });
738                    seen.clone()
739                };
740
741                let mut candidates = Vec::new();
742                for peripheral in peripherals {
743                    let peripheral_id = peripheral.id().to_string();
744                    if connected.contains_key(&peripheral_id) {
745                        continue;
746                    }
747                    let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
748                    else {
749                        debug!(
750                            "Skipping cached BLE peripheral {} without a fresh advertisement",
751                            peripheral_id
752                        );
753                        continue;
754                    };
755                    let properties = match peripheral.properties().await {
756                        Ok(Some(properties)) => properties,
757                        _ => continue,
758                    };
759                    if !properties.services.contains(&service_uuid) {
760                        continue;
761                    }
762                    candidates.push((peripheral, peripheral_id, advertisement));
763                }
764
765                candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
766                let mut represented_peer_hints = connected
767                    .values()
768                    .filter_map(|connection| connection.peer_hint.clone())
769                    .collect::<HashSet<_>>();
770
771                for (peripheral, peripheral_id, advertisement) in candidates {
772                    if connected.len() >= config.max_peers {
773                        break;
774                    }
775                    if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
776                        if !represented_peer_hints.insert(peer_hint.clone()) {
777                            debug!(
778                                "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
779                                peripheral_id,
780                                peer_hint
781                            );
782                            continue;
783                        }
784                    }
785                    info!(
786                        "Discovered BLE peripheral {} advertising htree service{}",
787                        peripheral_id,
788                        advertisement
789                            .peer_hint
790                            .as_ref()
791                            .map(|hint| format!(" (peer hint {})", hint))
792                            .unwrap_or_default()
793                    );
794                    match connect_peripheral(peripheral.clone()).await {
795                        Ok(Some(link)) => {
796                            let tracked_link = link.clone();
797                            let pending = PendingBluetoothLink {
798                                link,
799                                direction: PeerDirection::Outbound,
800                                local_hello_sent: false,
801                                peer_hint: advertisement
802                                    .peer_hint
803                                    .clone()
804                                    .or(Some(peripheral_id.clone())),
805                            };
806                            match handle_pending_link(pending, context.clone()).await {
807                                Ok(true) => {
808                                    connected.insert(
809                                        peripheral_id.clone(),
810                                        ConnectedPeripheral {
811                                            peripheral,
812                                            link: tracked_link,
813                                            peer_hint: advertisement.peer_hint.clone(),
814                                        },
815                                    );
816                                }
817                                Ok(false) => {}
818                                Err(err) => {
819                                    warn!("Failed to attach BLE peripheral: {}", err);
820                                }
821                            }
822                        }
823                        Ok(None) => {}
824                        Err(err) => {
825                            warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
826                        }
827                    }
828                }
829
830                tokio::time::sleep(Duration::from_secs(5)).await;
831            }
832        });
833
834        Ok(())
835    }
836
837    async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
838        let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
839        let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
840        let peripheral_id = peripheral.id().to_string();
841
842        let mut connect_timed_out = false;
843        if !peripheral.is_connected().await? {
844            info!("Connecting to BLE peripheral {}", peripheral_id);
845            match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
846                Ok(Ok(())) => {}
847                Ok(Err(err)) => return Err(err.into()),
848                Err(_) => {
849                    warn!(
850                        "BLE connect timed out for {}; probing services before giving up",
851                        peripheral_id
852                    );
853                    connect_timed_out = true;
854                }
855            }
856        }
857        if connect_timed_out && !peripheral.is_connected().await.unwrap_or(false) {
858            warn!(
859                "BLE peripheral {} never reached a connected state after timeout; retrying later",
860                peripheral_id
861            );
862            let _ = peripheral.disconnect().await;
863            return Ok(None);
864        }
865        tokio::time::sleep(Duration::from_millis(300)).await;
866        let mut rx_char = None;
867        let mut tx_char = None;
868        for attempt in 1..=8 {
869            info!(
870                "Discovering BLE services for {} (attempt {}/8)",
871                peripheral_id, attempt
872            );
873            if let Err(err) = peripheral.discover_services().await {
874                if attempt == 8 {
875                    if connect_timed_out {
876                        warn!(
877                            "BLE service discovery failed for {} after soft connect timeout: {}",
878                            peripheral_id, err
879                        );
880                        let _ = peripheral.disconnect().await;
881                        return Ok(None);
882                    }
883                    return Err(err.into());
884                }
885                warn!(
886                    "BLE service discovery attempt {}/8 failed for {}: {}",
887                    attempt, peripheral_id, err
888                );
889                tokio::time::sleep(Duration::from_millis(500)).await;
890                continue;
891            }
892
893            let characteristics = peripheral.characteristics();
894            if !characteristics.is_empty() {
895                let uuids = characteristics
896                    .iter()
897                    .map(|c| c.uuid.to_string())
898                    .collect::<Vec<_>>()
899                    .join(", ");
900                info!(
901                    "BLE peripheral {} characteristics: {}",
902                    peripheral_id, uuids
903                );
904            }
905            rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
906            tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
907            if rx_char.is_some() && tx_char.is_some() {
908                break;
909            }
910            tokio::time::sleep(Duration::from_millis(500)).await;
911        }
912        let Some(rx_char) = rx_char else {
913            warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
914            let _ = peripheral.disconnect().await;
915            return Ok(None);
916        };
917        let Some(tx_char) = tx_char else {
918            warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
919            let _ = peripheral.disconnect().await;
920            return Ok(None);
921        };
922        if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
923            warn!(
924                "BLE peripheral {} TX characteristic is missing NOTIFY",
925                peripheral_id
926            );
927            let _ = peripheral.disconnect().await;
928            return Ok(None);
929        }
930        let notifications = peripheral.notifications().await?;
931        info!("Subscribing to BLE notifications for {}", peripheral_id);
932        peripheral.subscribe(&tx_char).await?;
933
934        let initial_frames = match peripheral.read(&tx_char).await {
935            Ok(bytes) if !bytes.is_empty() => {
936                info!(
937                    "Read initial BLE hello bytes from {} ({} bytes)",
938                    peripheral_id,
939                    bytes.len()
940                );
941                let mut decoder = FrameDecoder::new();
942                match decoder.push(&bytes) {
943                    Ok(frames) => frames,
944                    Err(err) => {
945                        warn!(
946                            "Discarding malformed initial BLE frame from {}: {}",
947                            peripheral_id, err
948                        );
949                        Vec::new()
950                    }
951                }
952            }
953            Ok(_) => Vec::new(),
954            Err(err) => {
955                warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
956                Vec::new()
957            }
958        };
959
960        info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
961        Ok(Some(MacosCentralLink::new(
962            peripheral,
963            rx_char,
964            notifications,
965            initial_frames,
966        )))
967    }
968
969    struct FrameDecoder {
970        buffer: Vec<u8>,
971    }
972
973    impl FrameDecoder {
974        fn new() -> Self {
975            Self { buffer: Vec::new() }
976        }
977
978        fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
979            self.buffer.extend_from_slice(chunk);
980            let mut frames = Vec::new();
981            loop {
982                if self.buffer.len() < 5 {
983                    break;
984                }
985                let kind = self.buffer[0];
986                let len = u32::from_be_bytes([
987                    self.buffer[1],
988                    self.buffer[2],
989                    self.buffer[3],
990                    self.buffer[4],
991                ]) as usize;
992                if self.buffer.len() < 5 + len {
993                    break;
994                }
995                let payload = self.buffer[5..5 + len].to_vec();
996                self.buffer.drain(..5 + len);
997                let frame = match kind {
998                    1 => BluetoothFrame::Text(String::from_utf8(payload)?),
999                    2 => BluetoothFrame::Binary(payload),
1000                    _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
1001                };
1002                frames.push(frame);
1003            }
1004            Ok(frames)
1005        }
1006    }
1007
1008    fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
1009        match frame {
1010            BluetoothFrame::Text(text) => {
1011                let payload = text.into_bytes();
1012                let mut out = Vec::with_capacity(5 + payload.len());
1013                out.push(1);
1014                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1015                out.extend_from_slice(&payload);
1016                out
1017            }
1018            BluetoothFrame::Binary(payload) => {
1019                let mut out = Vec::with_capacity(5 + payload.len());
1020                out.push(2);
1021                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1022                out.extend_from_slice(&payload);
1023                out
1024            }
1025        }
1026    }
1027
1028    struct MacosCentralLink {
1029        peripheral: Peripheral,
1030        rx_char: Characteristic,
1031        open: std::sync::atomic::AtomicBool,
1032        rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
1033    }
1034
1035    impl MacosCentralLink {
1036        fn new(
1037            peripheral: Peripheral,
1038            rx_char: Characteristic,
1039            mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
1040            initial_frames: Vec<BluetoothFrame>,
1041        ) -> Arc<Self> {
1042            let (tx, rx) = mpsc::channel(64);
1043            let link = Arc::new(Self {
1044                peripheral,
1045                rx_char,
1046                open: std::sync::atomic::AtomicBool::new(true),
1047                rx: Mutex::new(rx),
1048            });
1049
1050            for frame in initial_frames {
1051                let _ = tx.try_send(frame);
1052            }
1053
1054            let weak = Arc::downgrade(&link);
1055            tokio::spawn(async move {
1056                let mut decoder = FrameDecoder::new();
1057                while let Some(notification) = notifications.next().await {
1058                    let Ok(frames) = decoder.push(&notification.value) else {
1059                        break;
1060                    };
1061                    for frame in frames {
1062                        if tx.send(frame).await.is_err() {
1063                            break;
1064                        }
1065                    }
1066                }
1067                if let Some(link) = weak.upgrade() {
1068                    link.open.store(false, std::sync::atomic::Ordering::Relaxed);
1069                }
1070            });
1071
1072            link
1073        }
1074    }
1075
1076    #[async_trait]
1077    impl BluetoothLink for MacosCentralLink {
1078        async fn send(&self, frame: BluetoothFrame) -> Result<()> {
1079            let bytes = encode_frame(frame);
1080            for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
1081                self.peripheral
1082                    .write(&self.rx_char, chunk, WriteType::WithResponse)
1083                    .await?;
1084            }
1085            Ok(())
1086        }
1087
1088        async fn recv(&self) -> Option<BluetoothFrame> {
1089            self.rx.lock().await.recv().await
1090        }
1091
1092        fn is_open(&self) -> bool {
1093            self.open.load(std::sync::atomic::Ordering::Relaxed)
1094        }
1095
1096        async fn close(&self) -> Result<()> {
1097            self.open.store(false, std::sync::atomic::Ordering::Relaxed);
1098            let _ = self.peripheral.disconnect().await;
1099            Ok(())
1100        }
1101    }
1102}