Skip to main content

hashtree_network/
bluetooth.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3#[cfg(target_os = "macos")]
4use std::collections::HashMap;
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7#[cfg(target_os = "macos")]
8use std::time::Instant;
9use tokio::sync::mpsc;
10use tracing::{debug, info, warn};
11
12use super::session::MeshPeer;
13#[cfg(test)]
14use super::types::PeerPool;
15use super::types::{MeshNostrFrame, PeerId, PoolSettings};
16use crate::bluetooth_peer::{BluetoothFrame, BluetoothLink, BluetoothPeer};
17use crate::manager::{PeerClassifier, WebRTCState};
18use crate::peer::ContentStore;
19use crate::relay_bridge::SharedMeshRelayClient;
20use crate::runtime_peer::PeerDirection;
21use crate::runtime_peer::{PeerSignalPath, PeerTransport, TransportPeerRegistrar};
22
/// UUID of the BLE service used to recognise htree peripherals while scanning.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";

/// UUID of the "RX" characteristic looked up on a connected peripheral.
// NOTE(review): presumably the characteristic the central writes to — confirm against the link impl.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";

/// UUID of the "TX" characteristic; the macOS central subscribes to its notifications.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";

/// Chunk size for BLE payloads.
///
/// Keep BLE chunks comfortably below conservative cross-platform GATT write budgets.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_CHUNK_BYTES: usize = 64;

/// Upper bound on how long a freshly attached link may take to deliver its hello frame.
const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
33
/// Configuration for the optional Bluetooth peer transport.
///
/// The derived `Default` yields a disabled transport with a zero peer budget.
#[derive(Debug, Clone, Default)]
pub struct BluetoothConfig {
    /// Master switch for the Bluetooth transport.
    pub enabled: bool,
    /// Maximum number of simultaneous Bluetooth peers; zero disables the transport.
    pub max_peers: usize,
}

impl BluetoothConfig {
    /// Returns `true` only when the transport is switched on and at least one
    /// peer slot is available.
    pub fn is_enabled(&self) -> bool {
        if !self.enabled {
            return false;
        }
        self.max_peers != 0
    }
}
46
/// Outcome reported by `BluetoothMesh::start`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BluetoothBackendState {
    /// The configuration does not enable the Bluetooth transport.
    Disabled,
    /// At least one backend (mobile bridge or macOS central) was started.
    Running,
    /// The transport was requested but no backend could be started.
    Unsupported,
}
53
/// A Bluetooth link that is open at the transport level but has not yet
/// completed the hello exchange performed by `handle_pending_link`.
#[derive(Clone)]
pub struct PendingBluetoothLink {
    /// Transport used to exchange frames with the remote side.
    pub link: Arc<dyn BluetoothLink>,
    /// Direction recorded for the peer once it is registered.
    pub direction: PeerDirection,
    /// When `false`, `handle_pending_link` sends the local hello itself after
    /// the remote hello has been received.
    pub local_hello_sent: bool,
    /// Optional label used only for logging; the macOS central fills it with
    /// the hex-encoded advertisement service data or the peripheral id.
    pub peer_hint: Option<String>,
}
61
/// Hook through which a host application supplies Bluetooth links.
///
/// `BluetoothMesh::start` calls `start` once with the local peer id and then
/// attaches every `PendingBluetoothLink` received on the returned channel.
#[async_trait]
pub trait MobileBluetoothBridge: Send + Sync {
    /// Starts the bridge for `local_peer_id` and returns the stream of pending links.
    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
}
66
67static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
68
69pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
70    MOBILE_BLUETOOTH_BRIDGE
71        .set(bridge)
72        .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
73}
74
75fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
76    MOBILE_BLUETOOTH_BRIDGE.get().cloned()
77}
78
/// Registers and unregisters Bluetooth peers through a shared
/// `TransportPeerRegistrar` configured for the Bluetooth transport.
#[derive(Clone)]
pub struct BluetoothPeerRegistrar {
    /// Shared runtime state; its peer map and connected counter back the registrar.
    pub state: Arc<WebRTCState>,
    /// Transport-generic registrar that performs the actual bookkeeping.
    inner: TransportPeerRegistrar<MeshPeer>,
}
84
85impl BluetoothPeerRegistrar {
86    pub fn new(
87        state: Arc<WebRTCState>,
88        peer_classifier: PeerClassifier,
89        pools: PoolSettings,
90        max_bluetooth_peers: usize,
91    ) -> Self {
92        Self {
93            inner: TransportPeerRegistrar::new(
94                state.runtime.peers.clone(),
95                state.runtime.connected_count.clone(),
96                peer_classifier,
97                pools,
98                PeerTransport::Bluetooth,
99                PeerSignalPath::Bluetooth,
100                max_bluetooth_peers,
101            ),
102            state,
103        }
104    }
105
106    pub async fn register_connected_peer(
107        &self,
108        peer_id: PeerId,
109        direction: PeerDirection,
110        peer: MeshPeer,
111    ) -> bool {
112        let accepted = self
113            .inner
114            .register_connected_peer(peer_id.clone(), direction, peer)
115            .await;
116        if !accepted {
117            warn!(
118                "Bluetooth peer {} rejected by shared registrar",
119                peer_id.short()
120            );
121        }
122        accepted
123    }
124
125    pub async fn unregister_peer(&self, peer_id: &PeerId) {
126        self.inner.unregister_peer(peer_id).await;
127    }
128
129    pub async fn unregister_bluetooth_peer_if_current(
130        &self,
131        peer_id: &PeerId,
132        expected_peer: &Arc<BluetoothPeer>,
133    ) {
134        self.inner
135            .unregister_peer_if(peer_id, |peer| match peer {
136                MeshPeer::Bluetooth(current) => Arc::ptr_eq(current, expected_peer),
137                _ => false,
138            })
139            .await;
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
    use crate::session::{MeshPeer, TestMeshPeer};

    /// Registrar over `state` that classifies every peer as `Other` and allows two peers.
    fn registrar_for(state: &Arc<WebRTCState>) -> BluetoothPeerRegistrar {
        BluetoothPeerRegistrar::new(
            state.clone(),
            Arc::new(|_| PeerPool::Other),
            PoolSettings::default(),
            2,
        )
    }

    /// Mock mesh peer configured with no canned response.
    fn mock_peer() -> MeshPeer {
        MeshPeer::mock_for_tests(TestMeshPeer::with_response(None))
    }

    #[tokio::test]
    async fn register_connected_peer_closes_replaced_session() {
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);

        let peer_id = PeerId::new("peer-pub".to_string());
        let first = mock_peer();
        let second = mock_peer();
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);
        let second_accepted = registrar
            .register_connected_peer(peer_id, PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());
    }

    #[tokio::test]
    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);

        let first_peer_id = PeerId::new("peer-pub".to_string());
        let second_peer_id = PeerId::new("peer-pub".to_string());
        let first = mock_peer();
        let second = mock_peer();
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);
        let second_accepted = registrar
            .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());
        let peers = state.runtime.peers.read().await;
        assert!(peers.contains_key(&second_peer_id.to_string()));
        assert_eq!(peers.len(), 1);
        let connected = state
            .runtime
            .connected_count
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(connected, 1);
    }

    #[tokio::test]
    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);
        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
        let local_peer_id = PeerId::new("local-pub".to_string());
        let remote_peer_id = PeerId::new("remote-pub".to_string());
        let remote_key = remote_peer_id.to_string();
        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();

        // The remote side announces itself before the local side starts attaching.
        send_hello(&remote_link, &remote_peer_id)
            .await
            .expect("send hello");

        let pending = PendingBluetoothLink {
            link: link_a.clone(),
            direction: PeerDirection::Outbound,
            local_hello_sent: false,
            peer_hint: Some("mock-ble".to_string()),
        };
        let context = BluetoothRuntimeContext {
            my_peer_id: local_peer_id,
            store: None,
            nostr_relay: None,
            mesh_frame_tx,
            registrar: registrar.clone(),
        };
        let accepted = handle_pending_link(pending, context)
            .await
            .expect("handle pending link");

        assert!(accepted);
        assert!(state.runtime.peers.read().await.contains_key(&remote_key));

        link_a.close().await.expect("close local transport");

        // Poll until the disconnect watcher has removed the peer, bounded by a timeout.
        tokio::time::timeout(Duration::from_secs(2), async {
            while state.runtime.peers.read().await.contains_key(&remote_key) {
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        })
        .await
        .expect("peer should be unregistered");
    }
}
283
/// Everything the Bluetooth backends need to turn a raw link into a registered mesh peer.
#[derive(Clone)]
pub struct BluetoothRuntimeContext {
    /// Local peer id; announced to the remote side in the hello frame.
    pub my_peer_id: PeerId,
    /// Optional content store handed to every `BluetoothPeer`.
    pub store: Option<Arc<dyn ContentStore>>,
    /// Optional relay client handed to every `BluetoothPeer`.
    pub nostr_relay: Option<SharedMeshRelayClient>,
    /// Sender handed to every `BluetoothPeer` for `(PeerId, MeshNostrFrame)` pairs.
    pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
    /// Registrar through which attached peers are registered and unregistered.
    pub registrar: BluetoothPeerRegistrar,
}
292
293/// Native Bluetooth backend. Nearby BLE links become transport-native mesh peers.
294pub struct BluetoothMesh {
295    config: BluetoothConfig,
296}
297
298impl BluetoothMesh {
299    pub fn new(config: BluetoothConfig) -> Self {
300        Self { config }
301    }
302
303    pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
304        if !self.config.is_enabled() {
305            info!("Bluetooth transport disabled");
306            return BluetoothBackendState::Disabled;
307        }
308
309        let mut started = false;
310
311        if let Some(bridge) = mobile_bluetooth_bridge() {
312            info!(
313                "Starting mobile Bluetooth bridge for peer {}",
314                context.my_peer_id.short()
315            );
316            match bridge.start(context.my_peer_id.to_string()).await {
317                Ok(mut rx) => {
318                    info!("Mobile Bluetooth bridge is running");
319                    let ctx = context.clone();
320                    tokio::spawn(async move {
321                        while let Some(link) = rx.recv().await {
322                            if let Err(err) = handle_pending_link(link, ctx.clone()).await {
323                                warn!("Mobile Bluetooth link failed: {}", err);
324                            }
325                        }
326                    });
327                    started = true;
328                }
329                Err(err) => {
330                    warn!("Failed to start mobile Bluetooth bridge: {}", err);
331                }
332            }
333        }
334
335        #[cfg(target_os = "macos")]
336        {
337            if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
338            {
339                warn!("Failed to start macOS Bluetooth central: {}", err);
340            } else {
341                started = true;
342            }
343        }
344
345        if started {
346            BluetoothBackendState::Running
347        } else {
348            warn!(
349                "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
350                self.config.max_peers
351            );
352            BluetoothBackendState::Unsupported
353        }
354    }
355}
356
357async fn handle_pending_link(
358    link: PendingBluetoothLink,
359    context: BluetoothRuntimeContext,
360) -> Result<bool> {
361    if let Some(peer_hint) = link.peer_hint.as_deref() {
362        debug!("Handling pending Bluetooth link {}", peer_hint);
363    }
364    info!(
365        "Handling pending Bluetooth link direction={} local_hello_sent={}",
366        link.direction, link.local_hello_sent
367    );
368    let remote_peer_id = receive_remote_hello(&link.link).await?;
369    info!(
370        "Received Bluetooth hello from {} while attaching pending link",
371        remote_peer_id.short()
372    );
373    if !link.local_hello_sent {
374        send_hello(&link.link, &context.my_peer_id).await?;
375        info!(
376            "Sent Bluetooth hello to {} while attaching pending link",
377            remote_peer_id.short()
378        );
379    }
380
381    let bluetooth_peer = BluetoothPeer::new(
382        remote_peer_id.clone(),
383        link.direction,
384        link.link,
385        context.store.clone(),
386        context.nostr_relay.clone(),
387        Some(context.mesh_frame_tx.clone()),
388        Some(context.registrar.state.clone()),
389    );
390    let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
391
392    if !context
393        .registrar
394        .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
395        .await
396    {
397        warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
398        bluetooth_peer.close().await?;
399        return Ok(false);
400    } else {
401        info!("Bluetooth peer {} connected", remote_peer_id.short());
402        spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
403    }
404    Ok(true)
405}
406
407fn spawn_bluetooth_disconnect_watch(
408    peer_id: PeerId,
409    peer: Arc<BluetoothPeer>,
410    registrar: BluetoothPeerRegistrar,
411) {
412    tokio::spawn(async move {
413        loop {
414            if !peer.is_connected() {
415                registrar
416                    .unregister_bluetooth_peer_if_current(&peer_id, &peer)
417                    .await;
418                break;
419            }
420            tokio::time::sleep(Duration::from_millis(250)).await;
421        }
422    });
423}
424
/// JSON hello message exchanged as a text frame when a link is attached.
///
/// With the serde attributes below it serialises with the keys `type` and `peerId`.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct BluetoothHello {
    /// Message discriminator; only the value `"hello"` is accepted by the receiver.
    #[serde(rename = "type")]
    kind: String,
    /// String form of the sender's peer id.
    peer_id: String,
}
432
433async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
434    let hello = BluetoothHello {
435        kind: "hello".to_string(),
436        peer_id: my_peer_id.to_string(),
437    };
438    link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
439        .await
440}
441
442async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
443    let result = tokio::time::timeout(HELLO_TIMEOUT, async {
444        loop {
445            match link.recv().await {
446                Some(BluetoothFrame::Text(text)) => {
447                    if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
448                        if hello.kind == "hello" {
449                            return PeerId::from_string(&hello.peer_id)
450                                .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
451                        }
452                    }
453                }
454                Some(BluetoothFrame::Binary(_)) => {}
455                None => return Err(anyhow!("bluetooth link closed before hello")),
456            }
457        }
458    })
459    .await;
460
461    match result {
462        Ok(peer_id) => peer_id,
463        Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
464    }
465}
466
467#[cfg(target_os = "macos")]
468mod macos {
469    use super::*;
470    use btleplug::api::{
471        Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
472        ScanFilter, ValueNotification, WriteType,
473    };
474    use btleplug::platform::{Manager, Peripheral};
475    use futures::StreamExt;
476    use std::collections::HashSet;
477    use tokio::sync::Mutex;
478    use uuid::Uuid;
479
    /// Advertisements older than this are dropped before candidates are selected.
    const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);

    /// Most recent advertisement information recorded for one peripheral.
    #[derive(Clone)]
    struct AdvertisementSnapshot {
        /// When a matching advertisement was last observed.
        last_seen: Instant,
        /// Hex encoding of the non-empty service data last advertised for the htree service, if any.
        peer_hint: Option<String>,
    }
487
488    pub(super) async fn start_macos_central(
489        config: BluetoothConfig,
490        context: BluetoothRuntimeContext,
491    ) -> Result<()> {
492        struct ConnectedPeripheral {
493            peripheral: Peripheral,
494            link: Arc<dyn BluetoothLink>,
495            peer_hint: Option<String>,
496        }
497
498        let manager = Manager::new().await?;
499        let adapters = manager.adapters().await?;
500        let Some(adapter) = adapters.into_iter().next() else {
501            warn!("No Bluetooth adapters available on macOS");
502            return Ok(());
503        };
504        info!(
505            "Starting macOS Bluetooth central for peer {} (max_peers={})",
506            context.my_peer_id.short(),
507            config.max_peers
508        );
509        let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
510        let mut events = adapter.events().await?;
511        let fresh_advertisements =
512            Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
513        let fresh_advertisements_for_events = fresh_advertisements.clone();
514        tokio::spawn(async move {
515            while let Some(event) = events.next().await {
516                match event {
517                    CentralEvent::ServicesAdvertisement { id, services }
518                        if services.contains(&service_uuid) =>
519                    {
520                        let mut advertisements = fresh_advertisements_for_events.lock().await;
521                        let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
522                            AdvertisementSnapshot {
523                                last_seen: Instant::now(),
524                                peer_hint: None,
525                            }
526                        });
527                        entry.last_seen = Instant::now();
528                    }
529                    CentralEvent::ServiceDataAdvertisement { id, service_data } => {
530                        if let Some(data) = service_data.get(&service_uuid) {
531                            let mut advertisements = fresh_advertisements_for_events.lock().await;
532                            let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
533                                AdvertisementSnapshot {
534                                    last_seen: Instant::now(),
535                                    peer_hint: None,
536                                }
537                            });
538                            entry.last_seen = Instant::now();
539                            if !data.is_empty() {
540                                entry.peer_hint = Some(hex::encode(data));
541                            }
542                        }
543                    }
544                    CentralEvent::DeviceDisconnected(id) => {
545                        fresh_advertisements_for_events
546                            .lock()
547                            .await
548                            .remove(&id.to_string());
549                    }
550                    _ => {}
551                }
552            }
553        });
554
555        tokio::spawn(async move {
556            let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
557            loop {
558                if let Err(err) = adapter
559                    .start_scan(ScanFilter {
560                        services: vec![service_uuid],
561                    })
562                    .await
563                {
564                    warn!("Failed to start BLE scan: {}", err);
565                    tokio::time::sleep(Duration::from_secs(5)).await;
566                    continue;
567                }
568                tokio::time::sleep(Duration::from_secs(2)).await;
569
570                let peripherals = match adapter.peripherals().await {
571                    Ok(peripherals) => peripherals,
572                    Err(err) => {
573                        warn!("Failed to list BLE peripherals: {}", err);
574                        tokio::time::sleep(Duration::from_secs(5)).await;
575                        continue;
576                    }
577                };
578
579                connected.retain(|_, connection| {
580                    futures::executor::block_on(async {
581                        let peripheral_connected =
582                            connection.peripheral.is_connected().await.unwrap_or(false);
583                        let link_open = connection.link.is_open();
584                        if !peripheral_connected || !link_open {
585                            if peripheral_connected {
586                                let _ = connection.peripheral.disconnect().await;
587                            }
588                            false
589                        } else {
590                            true
591                        }
592                    })
593                });
594
595                let advertisement_snapshot = {
596                    let now = Instant::now();
597                    let mut seen = fresh_advertisements.lock().await;
598                    seen.retain(|_, advertisement| {
599                        now.saturating_duration_since(advertisement.last_seen)
600                            <= FRESH_ADVERTISEMENT_WINDOW
601                    });
602                    seen.clone()
603                };
604
605                let mut candidates = Vec::new();
606                for peripheral in peripherals {
607                    let peripheral_id = peripheral.id().to_string();
608                    if connected.contains_key(&peripheral_id) {
609                        continue;
610                    }
611                    let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
612                    else {
613                        debug!(
614                            "Skipping cached BLE peripheral {} without a fresh advertisement",
615                            peripheral_id
616                        );
617                        continue;
618                    };
619                    let properties = match peripheral.properties().await {
620                        Ok(Some(properties)) => properties,
621                        _ => continue,
622                    };
623                    if !properties.services.contains(&service_uuid) {
624                        continue;
625                    }
626                    candidates.push((peripheral, peripheral_id, advertisement));
627                }
628
629                candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
630                let mut represented_peer_hints = connected
631                    .values()
632                    .filter_map(|connection| connection.peer_hint.clone())
633                    .collect::<HashSet<_>>();
634
635                for (peripheral, peripheral_id, advertisement) in candidates {
636                    if connected.len() >= config.max_peers {
637                        break;
638                    }
639                    if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
640                        if !represented_peer_hints.insert(peer_hint.clone()) {
641                            debug!(
642                                "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
643                                peripheral_id,
644                                peer_hint
645                            );
646                            continue;
647                        }
648                    }
649                    info!(
650                        "Discovered BLE peripheral {} advertising htree service{}",
651                        peripheral_id,
652                        advertisement
653                            .peer_hint
654                            .as_ref()
655                            .map(|hint| format!(" (peer hint {})", hint))
656                            .unwrap_or_default()
657                    );
658                    match connect_peripheral(peripheral.clone()).await {
659                        Ok(Some(link)) => {
660                            let tracked_link = link.clone();
661                            let pending = PendingBluetoothLink {
662                                link,
663                                direction: PeerDirection::Outbound,
664                                local_hello_sent: false,
665                                peer_hint: advertisement
666                                    .peer_hint
667                                    .clone()
668                                    .or(Some(peripheral_id.clone())),
669                            };
670                            match handle_pending_link(pending, context.clone()).await {
671                                Ok(true) => {
672                                    connected.insert(
673                                        peripheral_id.clone(),
674                                        ConnectedPeripheral {
675                                            peripheral,
676                                            link: tracked_link,
677                                            peer_hint: advertisement.peer_hint.clone(),
678                                        },
679                                    );
680                                }
681                                Ok(false) => {}
682                                Err(err) => {
683                                    warn!("Failed to attach BLE peripheral: {}", err);
684                                }
685                            }
686                        }
687                        Ok(None) => {}
688                        Err(err) => {
689                            warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
690                        }
691                    }
692                }
693
694                tokio::time::sleep(Duration::from_secs(5)).await;
695            }
696        });
697
698        Ok(())
699    }
700
701    async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
702        let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
703        let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
704        let peripheral_id = peripheral.id().to_string();
705
706        let mut connect_timed_out = false;
707        if !peripheral.is_connected().await? {
708            info!("Connecting to BLE peripheral {}", peripheral_id);
709            match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
710                Ok(Ok(())) => {}
711                Ok(Err(err)) => return Err(err.into()),
712                Err(_) => {
713                    warn!(
714                        "BLE connect timed out for {}; probing services before giving up",
715                        peripheral_id
716                    );
717                    connect_timed_out = true;
718                }
719            }
720        }
721        if connect_timed_out && !peripheral.is_connected().await.unwrap_or(false) {
722            warn!(
723                "BLE peripheral {} never reached a connected state after timeout; retrying later",
724                peripheral_id
725            );
726            let _ = peripheral.disconnect().await;
727            return Ok(None);
728        }
729        tokio::time::sleep(Duration::from_millis(300)).await;
730        let mut rx_char = None;
731        let mut tx_char = None;
732        for attempt in 1..=8 {
733            info!(
734                "Discovering BLE services for {} (attempt {}/8)",
735                peripheral_id, attempt
736            );
737            if let Err(err) = peripheral.discover_services().await {
738                if attempt == 8 {
739                    if connect_timed_out {
740                        warn!(
741                            "BLE service discovery failed for {} after soft connect timeout: {}",
742                            peripheral_id, err
743                        );
744                        let _ = peripheral.disconnect().await;
745                        return Ok(None);
746                    }
747                    return Err(err.into());
748                }
749                warn!(
750                    "BLE service discovery attempt {}/8 failed for {}: {}",
751                    attempt, peripheral_id, err
752                );
753                tokio::time::sleep(Duration::from_millis(500)).await;
754                continue;
755            }
756
757            let characteristics = peripheral.characteristics();
758            if !characteristics.is_empty() {
759                let uuids = characteristics
760                    .iter()
761                    .map(|c| c.uuid.to_string())
762                    .collect::<Vec<_>>()
763                    .join(", ");
764                info!(
765                    "BLE peripheral {} characteristics: {}",
766                    peripheral_id, uuids
767                );
768            }
769            rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
770            tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
771            if rx_char.is_some() && tx_char.is_some() {
772                break;
773            }
774            tokio::time::sleep(Duration::from_millis(500)).await;
775        }
776        let Some(rx_char) = rx_char else {
777            warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
778            let _ = peripheral.disconnect().await;
779            return Ok(None);
780        };
781        let Some(tx_char) = tx_char else {
782            warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
783            let _ = peripheral.disconnect().await;
784            return Ok(None);
785        };
786        if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
787            warn!(
788                "BLE peripheral {} TX characteristic is missing NOTIFY",
789                peripheral_id
790            );
791            let _ = peripheral.disconnect().await;
792            return Ok(None);
793        }
794        let notifications = peripheral.notifications().await?;
795        info!("Subscribing to BLE notifications for {}", peripheral_id);
796        peripheral.subscribe(&tx_char).await?;
797
798        let initial_frames = match peripheral.read(&tx_char).await {
799            Ok(bytes) if !bytes.is_empty() => {
800                info!(
801                    "Read initial BLE hello bytes from {} ({} bytes)",
802                    peripheral_id,
803                    bytes.len()
804                );
805                let mut decoder = FrameDecoder::new();
806                match decoder.push(&bytes) {
807                    Ok(frames) => frames,
808                    Err(err) => {
809                        warn!(
810                            "Discarding malformed initial BLE frame from {}: {}",
811                            peripheral_id, err
812                        );
813                        Vec::new()
814                    }
815                }
816            }
817            Ok(_) => Vec::new(),
818            Err(err) => {
819                warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
820                Vec::new()
821            }
822        };
823
824        info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
825        Ok(Some(MacosCentralLink::new(
826            peripheral,
827            rx_char,
828            notifications,
829            initial_frames,
830        )))
831    }
832
833    struct FrameDecoder {
834        buffer: Vec<u8>,
835    }
836
837    impl FrameDecoder {
838        fn new() -> Self {
839            Self { buffer: Vec::new() }
840        }
841
842        fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
843            self.buffer.extend_from_slice(chunk);
844            let mut frames = Vec::new();
845            loop {
846                if self.buffer.len() < 5 {
847                    break;
848                }
849                let kind = self.buffer[0];
850                let len = u32::from_be_bytes([
851                    self.buffer[1],
852                    self.buffer[2],
853                    self.buffer[3],
854                    self.buffer[4],
855                ]) as usize;
856                if self.buffer.len() < 5 + len {
857                    break;
858                }
859                let payload = self.buffer[5..5 + len].to_vec();
860                self.buffer.drain(..5 + len);
861                let frame = match kind {
862                    1 => BluetoothFrame::Text(String::from_utf8(payload)?),
863                    2 => BluetoothFrame::Binary(payload),
864                    _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
865                };
866                frames.push(frame);
867            }
868            Ok(frames)
869        }
870    }
871
872    fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
873        match frame {
874            BluetoothFrame::Text(text) => {
875                let payload = text.into_bytes();
876                let mut out = Vec::with_capacity(5 + payload.len());
877                out.push(1);
878                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
879                out.extend_from_slice(&payload);
880                out
881            }
882            BluetoothFrame::Binary(payload) => {
883                let mut out = Vec::with_capacity(5 + payload.len());
884                out.push(2);
885                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
886                out.extend_from_slice(&payload);
887                out
888            }
889        }
890    }
891
892    struct MacosCentralLink {
893        peripheral: Peripheral,
894        rx_char: Characteristic,
895        open: std::sync::atomic::AtomicBool,
896        rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
897    }
898
899    impl MacosCentralLink {
900        fn new(
901            peripheral: Peripheral,
902            rx_char: Characteristic,
903            mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
904            initial_frames: Vec<BluetoothFrame>,
905        ) -> Arc<Self> {
906            let (tx, rx) = mpsc::channel(64);
907            let link = Arc::new(Self {
908                peripheral,
909                rx_char,
910                open: std::sync::atomic::AtomicBool::new(true),
911                rx: Mutex::new(rx),
912            });
913
914            for frame in initial_frames {
915                let _ = tx.try_send(frame);
916            }
917
918            let weak = Arc::downgrade(&link);
919            tokio::spawn(async move {
920                let mut decoder = FrameDecoder::new();
921                while let Some(notification) = notifications.next().await {
922                    let Ok(frames) = decoder.push(&notification.value) else {
923                        break;
924                    };
925                    for frame in frames {
926                        if tx.send(frame).await.is_err() {
927                            break;
928                        }
929                    }
930                }
931                if let Some(link) = weak.upgrade() {
932                    link.open.store(false, std::sync::atomic::Ordering::Relaxed);
933                }
934            });
935
936            link
937        }
938    }
939
940    #[async_trait]
941    impl BluetoothLink for MacosCentralLink {
942        async fn send(&self, frame: BluetoothFrame) -> Result<()> {
943            let bytes = encode_frame(frame);
944            for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
945                self.peripheral
946                    .write(&self.rx_char, chunk, WriteType::WithResponse)
947                    .await?;
948            }
949            Ok(())
950        }
951
952        async fn recv(&self) -> Option<BluetoothFrame> {
953            self.rx.lock().await.recv().await
954        }
955
956        fn is_open(&self) -> bool {
957            self.open.load(std::sync::atomic::Ordering::Relaxed)
958        }
959
960        async fn close(&self) -> Result<()> {
961            self.open.store(false, std::sync::atomic::Ordering::Relaxed);
962            let _ = self.peripheral.disconnect().await;
963            Ok(())
964        }
965    }
966}