Skip to main content

hashtree_network/
bluetooth.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3#[cfg(target_os = "macos")]
4use std::collections::HashMap;
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7#[cfg(target_os = "macos")]
8use std::time::Instant;
9use tokio::sync::mpsc;
10use tracing::{debug, info, warn};
11
12use super::session::MeshPeer;
13#[cfg(test)]
14use super::types::PeerPool;
15use super::types::{MeshNostrFrame, PeerId, PoolSettings};
16use crate::bluetooth_peer::{BluetoothFrame, BluetoothLink, BluetoothPeer};
17use crate::manager::{PeerClassifier, WebRTCState};
18use crate::peer::ContentStore;
19use crate::relay_bridge::SharedMeshRelayClient;
20use crate::runtime_peer::PeerDirection;
21use crate::runtime_peer::{PeerSignalPath, PeerTransport, TransportPeerRegistrar};
22
/// UUID of the htree BLE GATT service; the macOS central scans for it.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";

/// UUID of the RX characteristic handed to the central link.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";

/// UUID of the TX characteristic the central subscribes to for notifications.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";

/// BLE chunk size in bytes.
///
/// Keep BLE chunks comfortably below conservative cross-platform GATT write budgets.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_CHUNK_BYTES: usize = 64;

/// How long to wait for the remote side's hello frame on a new link.
const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
33
/// Configuration for the optional Bluetooth peer transport.
///
/// The derived [`Default`] yields a disabled transport with `max_peers == 0`.
#[derive(Debug, Clone, Default)]
pub struct BluetoothConfig {
    /// Master switch for the transport.
    pub enabled: bool,
    /// Upper bound on Bluetooth peers; `0` disables the transport even when
    /// `enabled` is set.
    pub max_peers: usize,
}

impl BluetoothConfig {
    /// Returns `true` only when the transport is switched on *and* at least
    /// one peer slot is allowed.
    pub fn is_enabled(&self) -> bool {
        self.enabled && self.max_peers > 0
    }
}
55
/// Outcome reported by [`BluetoothMesh::start`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BluetoothBackendState {
    /// The transport is switched off by configuration.
    Disabled,
    /// At least one backend was started.
    Running,
    /// The transport was requested but no backend could be started.
    Unsupported,
}
62
63#[derive(Clone)]
64pub struct PendingBluetoothLink {
65    pub link: Arc<dyn BluetoothLink>,
66    pub direction: PeerDirection,
67    pub local_hello_sent: bool,
68    pub peer_hint: Option<String>,
69}
70
71#[async_trait]
72pub trait MobileBluetoothBridge: Send + Sync {
73    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
74}
75
76static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
77
78pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
79    MOBILE_BLUETOOTH_BRIDGE
80        .set(bridge)
81        .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
82}
83
84fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
85    MOBILE_BLUETOOTH_BRIDGE.get().cloned()
86}
87
88#[derive(Clone)]
89pub struct BluetoothPeerRegistrar {
90    pub state: Arc<WebRTCState>,
91    inner: TransportPeerRegistrar<MeshPeer>,
92}
93
94impl BluetoothPeerRegistrar {
95    pub fn new(
96        state: Arc<WebRTCState>,
97        peer_classifier: PeerClassifier,
98        pools: PoolSettings,
99        max_bluetooth_peers: usize,
100    ) -> Self {
101        Self {
102            inner: TransportPeerRegistrar::new(
103                state.runtime.peers.clone(),
104                state.runtime.connected_count.clone(),
105                peer_classifier,
106                pools,
107                PeerTransport::Bluetooth,
108                PeerSignalPath::Bluetooth,
109                max_bluetooth_peers,
110            ),
111            state,
112        }
113    }
114
115    pub async fn register_connected_peer(
116        &self,
117        peer_id: PeerId,
118        direction: PeerDirection,
119        peer: MeshPeer,
120    ) -> bool {
121        let accepted = self
122            .inner
123            .register_connected_peer(peer_id.clone(), direction, peer)
124            .await;
125        if !accepted {
126            warn!(
127                "Bluetooth peer {} rejected by shared registrar",
128                peer_id.short()
129            );
130        }
131        accepted
132    }
133
134    pub async fn unregister_peer(&self, peer_id: &PeerId) {
135        self.inner.unregister_peer(peer_id).await;
136    }
137
138    pub async fn unregister_bluetooth_peer_if_current(
139        &self,
140        peer_id: &PeerId,
141        expected_peer: &Arc<BluetoothPeer>,
142    ) {
143        self.inner
144            .unregister_peer_if(peer_id, |peer| match peer {
145                MeshPeer::Bluetooth(current) => Arc::ptr_eq(current, expected_peer),
146                _ => false,
147            })
148            .await;
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
    use crate::session::{MeshPeer, TestMeshPeer};

    /// Builds fresh runtime state plus a registrar that classifies every peer
    /// as `PeerPool::Other` and allows two Bluetooth peers.
    fn registrar_fixture() -> (Arc<WebRTCState>, BluetoothPeerRegistrar) {
        let state = Arc::new(WebRTCState::new());
        let registrar = BluetoothPeerRegistrar::new(
            state.clone(),
            Arc::new(|_| PeerPool::Other),
            PoolSettings::default(),
            2,
        );
        (state, registrar)
    }

    #[tokio::test]
    async fn register_connected_peer_closes_replaced_session() {
        let (_state, registrar) = registrar_fixture();

        let peer_id = PeerId::new("peer-pub".to_string());
        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);

        let second_accepted = registrar
            .register_connected_peer(peer_id, PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());
    }

    #[tokio::test]
    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
        let (state, registrar) = registrar_fixture();

        let first_peer_id = PeerId::new("peer-pub".to_string());
        let second_peer_id = PeerId::new("peer-pub".to_string());
        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);

        let second_accepted = registrar
            .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());
        let peers = state.runtime.peers.read().await;
        assert!(peers.contains_key(&second_peer_id.to_string()));
        assert_eq!(peers.len(), 1);
        let connected_count = state
            .runtime
            .connected_count
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(connected_count, 1);
    }

    #[tokio::test]
    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let (state, registrar) = registrar_fixture();
        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
        let local_peer_id = PeerId::new("local-pub".to_string());
        let remote_peer_id = PeerId::new("remote-pub".to_string());
        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();

        // The remote side announces itself first so the handshake can complete.
        send_hello(&remote_link, &remote_peer_id)
            .await
            .expect("send hello");

        let pending = PendingBluetoothLink {
            link: link_a.clone(),
            direction: PeerDirection::Outbound,
            local_hello_sent: false,
            peer_hint: Some("mock-ble".to_string()),
        };
        let context = BluetoothRuntimeContext {
            my_peer_id: local_peer_id,
            store: None,
            nostr_relay: None,
            mesh_frame_tx,
            registrar: registrar.clone(),
        };
        let accepted = handle_pending_link(pending, context)
            .await
            .expect("handle pending link");

        assert!(accepted);
        let registered_after_attach = state
            .runtime
            .peers
            .read()
            .await
            .contains_key(&remote_peer_id.to_string());
        assert!(registered_after_attach);

        link_a.close().await.expect("close local transport");

        // Poll until the disconnect watcher removes the peer, bounded by a timeout.
        tokio::time::timeout(Duration::from_secs(2), async {
            while state
                .runtime
                .peers
                .read()
                .await
                .contains_key(&remote_peer_id.to_string())
            {
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        })
        .await
        .expect("peer should be unregistered");
    }
}
292
293#[derive(Clone)]
294pub struct BluetoothRuntimeContext {
295    pub my_peer_id: PeerId,
296    pub store: Option<Arc<dyn ContentStore>>,
297    pub nostr_relay: Option<SharedMeshRelayClient>,
298    pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
299    pub registrar: BluetoothPeerRegistrar,
300}
301
302/// Native Bluetooth backend. Nearby BLE links become transport-native mesh peers.
303pub struct BluetoothMesh {
304    config: BluetoothConfig,
305}
306
307impl BluetoothMesh {
308    pub fn new(config: BluetoothConfig) -> Self {
309        Self { config }
310    }
311
312    pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
313        if !self.config.is_enabled() {
314            info!("Bluetooth transport disabled");
315            return BluetoothBackendState::Disabled;
316        }
317
318        let mut started = false;
319
320        if let Some(bridge) = mobile_bluetooth_bridge() {
321            info!(
322                "Starting mobile Bluetooth bridge for peer {}",
323                context.my_peer_id.short()
324            );
325            match bridge.start(context.my_peer_id.to_string()).await {
326                Ok(mut rx) => {
327                    info!("Mobile Bluetooth bridge is running");
328                    let ctx = context.clone();
329                    tokio::spawn(async move {
330                        while let Some(link) = rx.recv().await {
331                            if let Err(err) = handle_pending_link(link, ctx.clone()).await {
332                                warn!("Mobile Bluetooth link failed: {}", err);
333                            }
334                        }
335                    });
336                    started = true;
337                }
338                Err(err) => {
339                    warn!("Failed to start mobile Bluetooth bridge: {}", err);
340                }
341            }
342        }
343
344        #[cfg(target_os = "macos")]
345        {
346            if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
347            {
348                warn!("Failed to start macOS Bluetooth central: {}", err);
349            } else {
350                started = true;
351            }
352        }
353
354        if started {
355            BluetoothBackendState::Running
356        } else {
357            warn!(
358                "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
359                self.config.max_peers
360            );
361            BluetoothBackendState::Unsupported
362        }
363    }
364}
365
366async fn handle_pending_link(
367    link: PendingBluetoothLink,
368    context: BluetoothRuntimeContext,
369) -> Result<bool> {
370    if let Some(peer_hint) = link.peer_hint.as_deref() {
371        debug!("Handling pending Bluetooth link {}", peer_hint);
372    }
373    info!(
374        "Handling pending Bluetooth link direction={} local_hello_sent={}",
375        link.direction, link.local_hello_sent
376    );
377    let remote_peer_id = receive_remote_hello(&link.link).await?;
378    info!(
379        "Received Bluetooth hello from {} while attaching pending link",
380        remote_peer_id.short()
381    );
382    if !link.local_hello_sent {
383        send_hello(&link.link, &context.my_peer_id).await?;
384        info!(
385            "Sent Bluetooth hello to {} while attaching pending link",
386            remote_peer_id.short()
387        );
388    }
389
390    let bluetooth_peer = BluetoothPeer::new(
391        remote_peer_id.clone(),
392        link.direction,
393        link.link,
394        context.store.clone(),
395        context.nostr_relay.clone(),
396        Some(context.mesh_frame_tx.clone()),
397        Some(context.registrar.state.clone()),
398    );
399    let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
400
401    if !context
402        .registrar
403        .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
404        .await
405    {
406        warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
407        bluetooth_peer.close().await?;
408        return Ok(false);
409    } else {
410        info!("Bluetooth peer {} connected", remote_peer_id.short());
411        spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
412    }
413    Ok(true)
414}
415
416fn spawn_bluetooth_disconnect_watch(
417    peer_id: PeerId,
418    peer: Arc<BluetoothPeer>,
419    registrar: BluetoothPeerRegistrar,
420) {
421    tokio::spawn(async move {
422        loop {
423            if !peer.is_connected() {
424                registrar
425                    .unregister_bluetooth_peer_if_current(&peer_id, &peer)
426                    .await;
427                break;
428            }
429            tokio::time::sleep(Duration::from_millis(250)).await;
430        }
431    });
432}
433
434#[derive(serde::Serialize, serde::Deserialize)]
435#[serde(rename_all = "camelCase")]
436struct BluetoothHello {
437    #[serde(rename = "type")]
438    kind: String,
439    peer_id: String,
440}
441
442async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
443    let hello = BluetoothHello {
444        kind: "hello".to_string(),
445        peer_id: my_peer_id.to_string(),
446    };
447    link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
448        .await
449}
450
451async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
452    let result = tokio::time::timeout(HELLO_TIMEOUT, async {
453        loop {
454            match link.recv().await {
455                Some(BluetoothFrame::Text(text)) => {
456                    if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
457                        if hello.kind == "hello" {
458                            return PeerId::from_string(&hello.peer_id)
459                                .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
460                        }
461                    }
462                }
463                Some(BluetoothFrame::Binary(_)) => {}
464                None => return Err(anyhow!("bluetooth link closed before hello")),
465            }
466        }
467    })
468    .await;
469
470    match result {
471        Ok(peer_id) => peer_id,
472        Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
473    }
474}
475
476#[cfg(target_os = "macos")]
477mod macos {
478    use super::*;
479    use btleplug::api::{
480        Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
481        ScanFilter, ValueNotification, WriteType,
482    };
483    use btleplug::platform::{Manager, Peripheral};
484    use futures::StreamExt;
485    use std::collections::HashSet;
486    use tokio::sync::Mutex;
487    use uuid::Uuid;
488
489    const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);
490
491    #[derive(Clone)]
492    struct AdvertisementSnapshot {
493        last_seen: Instant,
494        peer_hint: Option<String>,
495    }
496
497    pub(super) async fn start_macos_central(
498        config: BluetoothConfig,
499        context: BluetoothRuntimeContext,
500    ) -> Result<()> {
501        struct ConnectedPeripheral {
502            peripheral: Peripheral,
503            link: Arc<dyn BluetoothLink>,
504            peer_hint: Option<String>,
505        }
506
507        let manager = Manager::new().await?;
508        let adapters = manager.adapters().await?;
509        let Some(adapter) = adapters.into_iter().next() else {
510            warn!("No Bluetooth adapters available on macOS");
511            return Ok(());
512        };
513        info!(
514            "Starting macOS Bluetooth central for peer {} (max_peers={})",
515            context.my_peer_id.short(),
516            config.max_peers
517        );
518        let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
519        let mut events = adapter.events().await?;
520        let fresh_advertisements =
521            Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
522        let fresh_advertisements_for_events = fresh_advertisements.clone();
523        tokio::spawn(async move {
524            while let Some(event) = events.next().await {
525                match event {
526                    CentralEvent::ServicesAdvertisement { id, services }
527                        if services.contains(&service_uuid) =>
528                    {
529                        let mut advertisements = fresh_advertisements_for_events.lock().await;
530                        let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
531                            AdvertisementSnapshot {
532                                last_seen: Instant::now(),
533                                peer_hint: None,
534                            }
535                        });
536                        entry.last_seen = Instant::now();
537                    }
538                    CentralEvent::ServiceDataAdvertisement { id, service_data } => {
539                        if let Some(data) = service_data.get(&service_uuid) {
540                            let mut advertisements = fresh_advertisements_for_events.lock().await;
541                            let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
542                                AdvertisementSnapshot {
543                                    last_seen: Instant::now(),
544                                    peer_hint: None,
545                                }
546                            });
547                            entry.last_seen = Instant::now();
548                            if !data.is_empty() {
549                                entry.peer_hint = Some(hex::encode(data));
550                            }
551                        }
552                    }
553                    CentralEvent::DeviceDisconnected(id) => {
554                        fresh_advertisements_for_events
555                            .lock()
556                            .await
557                            .remove(&id.to_string());
558                    }
559                    _ => {}
560                }
561            }
562        });
563
564        tokio::spawn(async move {
565            let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
566            loop {
567                if let Err(err) = adapter
568                    .start_scan(ScanFilter {
569                        services: vec![service_uuid],
570                    })
571                    .await
572                {
573                    warn!("Failed to start BLE scan: {}", err);
574                    tokio::time::sleep(Duration::from_secs(5)).await;
575                    continue;
576                }
577                tokio::time::sleep(Duration::from_secs(2)).await;
578
579                let peripherals = match adapter.peripherals().await {
580                    Ok(peripherals) => peripherals,
581                    Err(err) => {
582                        warn!("Failed to list BLE peripherals: {}", err);
583                        tokio::time::sleep(Duration::from_secs(5)).await;
584                        continue;
585                    }
586                };
587
588                connected.retain(|_, connection| {
589                    futures::executor::block_on(async {
590                        let peripheral_connected =
591                            connection.peripheral.is_connected().await.unwrap_or(false);
592                        let link_open = connection.link.is_open();
593                        if !peripheral_connected || !link_open {
594                            if peripheral_connected {
595                                let _ = connection.peripheral.disconnect().await;
596                            }
597                            false
598                        } else {
599                            true
600                        }
601                    })
602                });
603
604                let advertisement_snapshot = {
605                    let now = Instant::now();
606                    let mut seen = fresh_advertisements.lock().await;
607                    seen.retain(|_, advertisement| {
608                        now.saturating_duration_since(advertisement.last_seen)
609                            <= FRESH_ADVERTISEMENT_WINDOW
610                    });
611                    seen.clone()
612                };
613
614                let mut candidates = Vec::new();
615                for peripheral in peripherals {
616                    let peripheral_id = peripheral.id().to_string();
617                    if connected.contains_key(&peripheral_id) {
618                        continue;
619                    }
620                    let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
621                    else {
622                        debug!(
623                            "Skipping cached BLE peripheral {} without a fresh advertisement",
624                            peripheral_id
625                        );
626                        continue;
627                    };
628                    let properties = match peripheral.properties().await {
629                        Ok(Some(properties)) => properties,
630                        _ => continue,
631                    };
632                    if !properties.services.contains(&service_uuid) {
633                        continue;
634                    }
635                    candidates.push((peripheral, peripheral_id, advertisement));
636                }
637
638                candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
639                let mut represented_peer_hints = connected
640                    .values()
641                    .filter_map(|connection| connection.peer_hint.clone())
642                    .collect::<HashSet<_>>();
643
644                for (peripheral, peripheral_id, advertisement) in candidates {
645                    if connected.len() >= config.max_peers {
646                        break;
647                    }
648                    if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
649                        if !represented_peer_hints.insert(peer_hint.clone()) {
650                            debug!(
651                                "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
652                                peripheral_id,
653                                peer_hint
654                            );
655                            continue;
656                        }
657                    }
658                    info!(
659                        "Discovered BLE peripheral {} advertising htree service{}",
660                        peripheral_id,
661                        advertisement
662                            .peer_hint
663                            .as_ref()
664                            .map(|hint| format!(" (peer hint {})", hint))
665                            .unwrap_or_default()
666                    );
667                    match connect_peripheral(peripheral.clone()).await {
668                        Ok(Some(link)) => {
669                            let tracked_link = link.clone();
670                            let pending = PendingBluetoothLink {
671                                link,
672                                direction: PeerDirection::Outbound,
673                                local_hello_sent: false,
674                                peer_hint: advertisement
675                                    .peer_hint
676                                    .clone()
677                                    .or(Some(peripheral_id.clone())),
678                            };
679                            match handle_pending_link(pending, context.clone()).await {
680                                Ok(true) => {
681                                    connected.insert(
682                                        peripheral_id.clone(),
683                                        ConnectedPeripheral {
684                                            peripheral,
685                                            link: tracked_link,
686                                            peer_hint: advertisement.peer_hint.clone(),
687                                        },
688                                    );
689                                }
690                                Ok(false) => {}
691                                Err(err) => {
692                                    warn!("Failed to attach BLE peripheral: {}", err);
693                                }
694                            }
695                        }
696                        Ok(None) => {}
697                        Err(err) => {
698                            warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
699                        }
700                    }
701                }
702
703                tokio::time::sleep(Duration::from_secs(5)).await;
704            }
705        });
706
707        Ok(())
708    }
709
710    async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
711        let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
712        let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
713        let peripheral_id = peripheral.id().to_string();
714
715        let mut connect_timed_out = false;
716        if !peripheral.is_connected().await? {
717            info!("Connecting to BLE peripheral {}", peripheral_id);
718            match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
719                Ok(Ok(())) => {}
720                Ok(Err(err)) => return Err(err.into()),
721                Err(_) => {
722                    warn!(
723                        "BLE connect timed out for {}; probing services before giving up",
724                        peripheral_id
725                    );
726                    connect_timed_out = true;
727                }
728            }
729        }
730        if connect_timed_out && !peripheral.is_connected().await.unwrap_or(false) {
731            warn!(
732                "BLE peripheral {} never reached a connected state after timeout; retrying later",
733                peripheral_id
734            );
735            let _ = peripheral.disconnect().await;
736            return Ok(None);
737        }
738        tokio::time::sleep(Duration::from_millis(300)).await;
739        let mut rx_char = None;
740        let mut tx_char = None;
741        for attempt in 1..=8 {
742            info!(
743                "Discovering BLE services for {} (attempt {}/8)",
744                peripheral_id, attempt
745            );
746            if let Err(err) = peripheral.discover_services().await {
747                if attempt == 8 {
748                    if connect_timed_out {
749                        warn!(
750                            "BLE service discovery failed for {} after soft connect timeout: {}",
751                            peripheral_id, err
752                        );
753                        let _ = peripheral.disconnect().await;
754                        return Ok(None);
755                    }
756                    return Err(err.into());
757                }
758                warn!(
759                    "BLE service discovery attempt {}/8 failed for {}: {}",
760                    attempt, peripheral_id, err
761                );
762                tokio::time::sleep(Duration::from_millis(500)).await;
763                continue;
764            }
765
766            let characteristics = peripheral.characteristics();
767            if !characteristics.is_empty() {
768                let uuids = characteristics
769                    .iter()
770                    .map(|c| c.uuid.to_string())
771                    .collect::<Vec<_>>()
772                    .join(", ");
773                info!(
774                    "BLE peripheral {} characteristics: {}",
775                    peripheral_id, uuids
776                );
777            }
778            rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
779            tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
780            if rx_char.is_some() && tx_char.is_some() {
781                break;
782            }
783            tokio::time::sleep(Duration::from_millis(500)).await;
784        }
785        let Some(rx_char) = rx_char else {
786            warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
787            let _ = peripheral.disconnect().await;
788            return Ok(None);
789        };
790        let Some(tx_char) = tx_char else {
791            warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
792            let _ = peripheral.disconnect().await;
793            return Ok(None);
794        };
795        if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
796            warn!(
797                "BLE peripheral {} TX characteristic is missing NOTIFY",
798                peripheral_id
799            );
800            let _ = peripheral.disconnect().await;
801            return Ok(None);
802        }
803        let notifications = peripheral.notifications().await?;
804        info!("Subscribing to BLE notifications for {}", peripheral_id);
805        peripheral.subscribe(&tx_char).await?;
806
807        let initial_frames = match peripheral.read(&tx_char).await {
808            Ok(bytes) if !bytes.is_empty() => {
809                info!(
810                    "Read initial BLE hello bytes from {} ({} bytes)",
811                    peripheral_id,
812                    bytes.len()
813                );
814                let mut decoder = FrameDecoder::new();
815                match decoder.push(&bytes) {
816                    Ok(frames) => frames,
817                    Err(err) => {
818                        warn!(
819                            "Discarding malformed initial BLE frame from {}: {}",
820                            peripheral_id, err
821                        );
822                        Vec::new()
823                    }
824                }
825            }
826            Ok(_) => Vec::new(),
827            Err(err) => {
828                warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
829                Vec::new()
830            }
831        };
832
833        info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
834        Ok(Some(MacosCentralLink::new(
835            peripheral,
836            rx_char,
837            notifications,
838            initial_frames,
839        )))
840    }
841
842    struct FrameDecoder {
843        buffer: Vec<u8>,
844    }
845
846    impl FrameDecoder {
847        fn new() -> Self {
848            Self { buffer: Vec::new() }
849        }
850
851        fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
852            self.buffer.extend_from_slice(chunk);
853            let mut frames = Vec::new();
854            loop {
855                if self.buffer.len() < 5 {
856                    break;
857                }
858                let kind = self.buffer[0];
859                let len = u32::from_be_bytes([
860                    self.buffer[1],
861                    self.buffer[2],
862                    self.buffer[3],
863                    self.buffer[4],
864                ]) as usize;
865                if self.buffer.len() < 5 + len {
866                    break;
867                }
868                let payload = self.buffer[5..5 + len].to_vec();
869                self.buffer.drain(..5 + len);
870                let frame = match kind {
871                    1 => BluetoothFrame::Text(String::from_utf8(payload)?),
872                    2 => BluetoothFrame::Binary(payload),
873                    _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
874                };
875                frames.push(frame);
876            }
877            Ok(frames)
878        }
879    }
880
881    fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
882        match frame {
883            BluetoothFrame::Text(text) => {
884                let payload = text.into_bytes();
885                let mut out = Vec::with_capacity(5 + payload.len());
886                out.push(1);
887                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
888                out.extend_from_slice(&payload);
889                out
890            }
891            BluetoothFrame::Binary(payload) => {
892                let mut out = Vec::with_capacity(5 + payload.len());
893                out.push(2);
894                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
895                out.extend_from_slice(&payload);
896                out
897            }
898        }
899    }
900
901    struct MacosCentralLink {
902        peripheral: Peripheral,
903        rx_char: Characteristic,
904        open: std::sync::atomic::AtomicBool,
905        rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
906    }
907
908    impl MacosCentralLink {
909        fn new(
910            peripheral: Peripheral,
911            rx_char: Characteristic,
912            mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
913            initial_frames: Vec<BluetoothFrame>,
914        ) -> Arc<Self> {
915            let (tx, rx) = mpsc::channel(64);
916            let link = Arc::new(Self {
917                peripheral,
918                rx_char,
919                open: std::sync::atomic::AtomicBool::new(true),
920                rx: Mutex::new(rx),
921            });
922
923            for frame in initial_frames {
924                let _ = tx.try_send(frame);
925            }
926
927            let weak = Arc::downgrade(&link);
928            tokio::spawn(async move {
929                let mut decoder = FrameDecoder::new();
930                while let Some(notification) = notifications.next().await {
931                    let Ok(frames) = decoder.push(&notification.value) else {
932                        break;
933                    };
934                    for frame in frames {
935                        if tx.send(frame).await.is_err() {
936                            break;
937                        }
938                    }
939                }
940                if let Some(link) = weak.upgrade() {
941                    link.open.store(false, std::sync::atomic::Ordering::Relaxed);
942                }
943            });
944
945            link
946        }
947    }
948
949    #[async_trait]
950    impl BluetoothLink for MacosCentralLink {
951        async fn send(&self, frame: BluetoothFrame) -> Result<()> {
952            let bytes = encode_frame(frame);
953            for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
954                self.peripheral
955                    .write(&self.rx_char, chunk, WriteType::WithResponse)
956                    .await?;
957            }
958            Ok(())
959        }
960
961        async fn recv(&self) -> Option<BluetoothFrame> {
962            self.rx.lock().await.recv().await
963        }
964
965        fn is_open(&self) -> bool {
966            self.open.load(std::sync::atomic::Ordering::Relaxed)
967        }
968
969        async fn close(&self) -> Result<()> {
970            self.open.store(false, std::sync::atomic::Ordering::Relaxed);
971            let _ = self.peripheral.disconnect().await;
972            Ok(())
973        }
974    }
975}