Skip to main content

hashtree_cli/webrtc/
bluetooth.rs

1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3#[cfg(target_os = "macos")]
4use futures::StreamExt;
5use std::collections::BTreeSet;
6#[cfg(target_os = "macos")]
7use std::collections::HashMap;
8use std::sync::{Arc, OnceLock};
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc;
11use tracing::{debug, info, warn};
12
13use crate::nostr_relay::NostrRelay;
14
15use super::bluetooth_peer::{BluetoothFrame, BluetoothLink};
16use super::peer::ContentStore;
17use super::session::MeshPeer;
18use super::signaling::{
19    ConnectionState, PeerClassifier, PeerEntry, PeerSignalPath, PeerTransport, WebRTCState,
20};
21use super::types::{MeshNostrFrame, PeerDirection, PeerId, PeerPool, PoolSettings};
22
23#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
24pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";
25#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
26pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";
27#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
28pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";
29#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
30pub const HTREE_BLE_CHUNK_BYTES: usize = 180;
31const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
32
33/// Configuration for the optional Bluetooth peer transport.
34#[derive(Debug, Clone)]
35pub struct BluetoothConfig {
36    pub enabled: bool,
37    pub max_peers: usize,
38}
39
40impl BluetoothConfig {
41    pub fn is_enabled(&self) -> bool {
42        self.enabled && self.max_peers > 0
43    }
44}
45
46impl Default for BluetoothConfig {
47    fn default() -> Self {
48        Self {
49            enabled: false,
50            max_peers: 0,
51        }
52    }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum BluetoothBackendState {
57    Disabled,
58    Running,
59    Unsupported,
60}
61
62#[derive(Clone)]
63pub struct PendingBluetoothLink {
64    pub link: Arc<dyn BluetoothLink>,
65    pub direction: PeerDirection,
66    pub local_hello_sent: bool,
67    pub peer_hint: Option<String>,
68}
69
70#[async_trait]
71pub trait MobileBluetoothBridge: Send + Sync {
72    async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
73}
74
75static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
76
77pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
78    MOBILE_BLUETOOTH_BRIDGE
79        .set(bridge)
80        .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
81}
82
83fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
84    MOBILE_BLUETOOTH_BRIDGE.get().cloned()
85}
86
87#[derive(Clone)]
88pub struct BluetoothPeerRegistrar {
89    state: Arc<WebRTCState>,
90    peer_classifier: PeerClassifier,
91    pools: PoolSettings,
92    max_bluetooth_peers: usize,
93}
94
95impl BluetoothPeerRegistrar {
96    pub fn new(
97        state: Arc<WebRTCState>,
98        peer_classifier: PeerClassifier,
99        pools: PoolSettings,
100        max_bluetooth_peers: usize,
101    ) -> Self {
102        Self {
103            state,
104            peer_classifier,
105            pools,
106            max_bluetooth_peers,
107        }
108    }
109
110    async fn pool_counts(&self) -> (usize, usize) {
111        let peers = self.state.peers.read().await;
112        let mut follows = 0usize;
113        let mut other = 0usize;
114        for entry in peers.values() {
115            if entry.state != ConnectionState::Connected {
116                continue;
117            }
118            match entry.pool {
119                PeerPool::Follows => follows += 1,
120                PeerPool::Other => other += 1,
121            }
122        }
123        (follows, other)
124    }
125
126    async fn bluetooth_peer_count(&self, peer_key: &str) -> usize {
127        let peers = self.state.peers.read().await;
128        peers
129            .values()
130            .filter(|entry| entry.transport == PeerTransport::Bluetooth)
131            .filter(|entry| entry.state == ConnectionState::Connected)
132            .filter(|entry| entry.peer_id.to_string() != peer_key)
133            .count()
134    }
135
136    pub async fn register_connected_peer(
137        &self,
138        peer_id: PeerId,
139        direction: PeerDirection,
140        peer: MeshPeer,
141    ) -> bool {
142        let peer_key = peer_id.to_string();
143        let pool = (self.peer_classifier)(&peer_id.pubkey);
144        let (follows, other) = self.pool_counts().await;
145        let can_accept_pool = match pool {
146            PeerPool::Follows => follows < self.pools.follows.max_connections,
147            PeerPool::Other => other < self.pools.other.max_connections,
148        };
149        if !can_accept_pool {
150            warn!(
151                "Bluetooth peer {} rejected because pool {:?} is full",
152                peer_id.short(),
153                pool
154            );
155            return false;
156        }
157
158        if self.max_bluetooth_peers == 0
159            || self.bluetooth_peer_count(&peer_key).await >= self.max_bluetooth_peers
160        {
161            warn!(
162                "Bluetooth peer {} rejected because max_bluetooth_peers={} reached",
163                peer_id.short(),
164                self.max_bluetooth_peers
165            );
166            return false;
167        }
168
169        let mut peers = self.state.peers.write().await;
170        let duplicate_keys = peers
171            .iter()
172            .filter(|(key, entry)| {
173                key.as_str() != peer_key
174                    && entry.transport == PeerTransport::Bluetooth
175                    && entry.peer_id.pubkey == peer_id.pubkey
176            })
177            .map(|(key, _)| key.clone())
178            .collect::<Vec<_>>();
179        let was_connected = peers
180            .get(&peer_key)
181            .map(|entry| entry.state == ConnectionState::Connected)
182            .unwrap_or(false);
183        let replaced = peers.insert(
184            peer_key,
185            PeerEntry {
186                peer_id,
187                direction,
188                state: ConnectionState::Connected,
189                last_seen: Instant::now(),
190                peer: Some(peer),
191                pool,
192                transport: PeerTransport::Bluetooth,
193                signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
194                bytes_sent: 0,
195                bytes_received: 0,
196            },
197        );
198        let removed_duplicates = duplicate_keys
199            .into_iter()
200            .filter_map(|key| peers.remove(&key))
201            .collect::<Vec<_>>();
202        drop(peers);
203
204        if let Some(previous) = replaced.and_then(|entry| entry.peer) {
205            let _ = previous.close().await;
206        }
207        for duplicate in &removed_duplicates {
208            if let Some(peer) = duplicate.peer.as_ref() {
209                let _ = peer.close().await;
210            }
211        }
212
213        let removed_connected_duplicates = removed_duplicates
214            .iter()
215            .filter(|entry| entry.state == ConnectionState::Connected)
216            .count() as isize;
217        let connected_delta =
218            1isize - if was_connected { 1 } else { 0 } - removed_connected_duplicates;
219        if connected_delta > 0 {
220            self.state.connected_count.fetch_add(
221                connected_delta as usize,
222                std::sync::atomic::Ordering::Relaxed,
223            );
224        } else if connected_delta < 0 {
225            self.state.connected_count.fetch_sub(
226                (-connected_delta) as usize,
227                std::sync::atomic::Ordering::Relaxed,
228            );
229        }
230        true
231    }
232
233    pub async fn unregister_peer(&self, peer_id: &PeerId) {
234        let peer_key = peer_id.to_string();
235        let removed = self.state.peers.write().await.remove(&peer_key);
236        if let Some(entry) = removed {
237            if entry.state == ConnectionState::Connected {
238                self.state
239                    .connected_count
240                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
241            }
242            if let Some(peer) = entry.peer {
243                let _ = peer.close().await;
244            }
245        }
246    }
247
248    pub async fn unregister_bluetooth_peer_if_current(
249        &self,
250        peer_id: &PeerId,
251        expected_peer: &Arc<super::BluetoothPeer>,
252    ) {
253        let peer_key = peer_id.to_string();
254        let removed = {
255            let mut peers = self.state.peers.write().await;
256            let matches_current = peers
257                .get(&peer_key)
258                .and_then(|entry| entry.peer.as_ref())
259                .and_then(|peer| match peer {
260                    MeshPeer::Bluetooth(current) => Some(Arc::ptr_eq(current, expected_peer)),
261                    _ => None,
262                })
263                .unwrap_or(false);
264            if matches_current {
265                peers.remove(&peer_key)
266            } else {
267                None
268            }
269        };
270        if let Some(entry) = removed {
271            if entry.state == ConnectionState::Connected {
272                self.state
273                    .connected_count
274                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
275            }
276            if let Some(peer) = entry.peer {
277                let _ = peer.close().await;
278            }
279        }
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use crate::webrtc::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
287    use crate::webrtc::session::{MeshPeer, TestMeshPeer};
288
289    #[tokio::test]
290    async fn register_connected_peer_closes_replaced_session() {
291        let state = Arc::new(WebRTCState::new());
292        let registrar = BluetoothPeerRegistrar::new(
293            state.clone(),
294            Arc::new(|_| PeerPool::Other),
295            PoolSettings::default(),
296            2,
297        );
298
299        let peer_id = PeerId::new("peer-pub".to_string(), Some("session".to_string()));
300        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
301        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
302        let first_ref = first.mock_ref().expect("mock peer").clone();
303
304        assert!(
305            registrar
306                .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
307                .await
308        );
309        assert!(
310            registrar
311                .register_connected_peer(peer_id, PeerDirection::Outbound, second)
312                .await
313        );
314
315        assert!(first_ref.is_closed());
316    }
317
318    #[tokio::test]
319    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
320        let state = Arc::new(WebRTCState::new());
321        let registrar = BluetoothPeerRegistrar::new(
322            state.clone(),
323            Arc::new(|_| PeerPool::Other),
324            PoolSettings::default(),
325            2,
326        );
327
328        let first_peer_id = PeerId::new("peer-pub".to_string(), Some("session-a".to_string()));
329        let second_peer_id = PeerId::new("peer-pub".to_string(), Some("session-b".to_string()));
330        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
331        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
332        let first_ref = first.mock_ref().expect("mock peer").clone();
333
334        assert!(
335            registrar
336                .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
337                .await
338        );
339        assert!(
340            registrar
341                .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
342                .await
343        );
344
345        assert!(first_ref.is_closed());
346        let peers = state.peers.read().await;
347        assert!(!peers.contains_key(&first_peer_id.to_string()));
348        assert!(peers.contains_key(&second_peer_id.to_string()));
349        assert_eq!(
350            state
351                .connected_count
352                .load(std::sync::atomic::Ordering::Relaxed),
353            1
354        );
355    }
356
357    #[tokio::test]
358    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
359        let (link_a, link_b) = MockBluetoothLink::pair();
360        let state = Arc::new(WebRTCState::new());
361        let registrar = BluetoothPeerRegistrar::new(
362            state.clone(),
363            Arc::new(|_| PeerPool::Other),
364            PoolSettings::default(),
365            2,
366        );
367        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
368        let local_peer_id = PeerId::new("local-pub".to_string(), Some("local-session".to_string()));
369        let remote_peer_id =
370            PeerId::new("remote-pub".to_string(), Some("remote-session".to_string()));
371        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();
372
373        send_hello(&remote_link, &remote_peer_id)
374            .await
375            .expect("send hello");
376
377        let accepted = handle_pending_link(
378            PendingBluetoothLink {
379                link: link_a.clone(),
380                direction: PeerDirection::Outbound,
381                local_hello_sent: false,
382                peer_hint: Some("mock-ble".to_string()),
383            },
384            BluetoothRuntimeContext {
385                my_peer_id: local_peer_id,
386                store: None,
387                nostr_relay: None,
388                mesh_frame_tx,
389                registrar: registrar.clone(),
390            },
391        )
392        .await
393        .expect("handle pending link");
394
395        assert!(accepted);
396        assert!(state
397            .peers
398            .read()
399            .await
400            .contains_key(&remote_peer_id.to_string()));
401
402        link_a.close().await.expect("close local transport");
403
404        tokio::time::timeout(Duration::from_secs(2), async {
405            loop {
406                if !state
407                    .peers
408                    .read()
409                    .await
410                    .contains_key(&remote_peer_id.to_string())
411                {
412                    break;
413                }
414                tokio::time::sleep(Duration::from_millis(25)).await;
415            }
416        })
417        .await
418        .expect("peer should be unregistered");
419    }
420}
421
422#[derive(Clone)]
423pub struct BluetoothRuntimeContext {
424    pub my_peer_id: PeerId,
425    pub store: Option<Arc<dyn ContentStore>>,
426    pub nostr_relay: Option<Arc<NostrRelay>>,
427    pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
428    pub registrar: BluetoothPeerRegistrar,
429}
430
431/// Native Bluetooth backend. Nearby BLE links become transport-native mesh peers.
432pub struct BluetoothMesh {
433    config: BluetoothConfig,
434}
435
436impl BluetoothMesh {
437    pub fn new(config: BluetoothConfig) -> Self {
438        Self { config }
439    }
440
441    pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
442        if !self.config.is_enabled() {
443            info!("Bluetooth transport disabled");
444            return BluetoothBackendState::Disabled;
445        }
446
447        let mut started = false;
448
449        if let Some(bridge) = mobile_bluetooth_bridge() {
450            info!(
451                "Starting mobile Bluetooth bridge for peer {}",
452                context.my_peer_id.short()
453            );
454            match bridge.start(context.my_peer_id.to_string()).await {
455                Ok(mut rx) => {
456                    info!("Mobile Bluetooth bridge is running");
457                    let ctx = context.clone();
458                    tokio::spawn(async move {
459                        while let Some(link) = rx.recv().await {
460                            if let Err(err) = handle_pending_link(link, ctx.clone()).await {
461                                warn!("Mobile Bluetooth link failed: {}", err);
462                            }
463                        }
464                    });
465                    started = true;
466                }
467                Err(err) => {
468                    warn!("Failed to start mobile Bluetooth bridge: {}", err);
469                }
470            }
471        }
472
473        #[cfg(target_os = "macos")]
474        {
475            if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
476            {
477                warn!("Failed to start macOS Bluetooth central: {}", err);
478            } else {
479                started = true;
480            }
481        }
482
483        if started {
484            BluetoothBackendState::Running
485        } else {
486            warn!(
487                "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
488                self.config.max_peers
489            );
490            BluetoothBackendState::Unsupported
491        }
492    }
493}
494
495async fn handle_pending_link(
496    link: PendingBluetoothLink,
497    context: BluetoothRuntimeContext,
498) -> Result<bool> {
499    if let Some(peer_hint) = link.peer_hint.as_deref() {
500        debug!("Handling pending Bluetooth link {}", peer_hint);
501    }
502    let remote_peer_id = receive_remote_hello(&link.link).await?;
503    if !link.local_hello_sent {
504        send_hello(&link.link, &context.my_peer_id).await?;
505    }
506
507    let bluetooth_peer = super::BluetoothPeer::new(
508        remote_peer_id.clone(),
509        link.direction,
510        link.link,
511        context.store.clone(),
512        context.nostr_relay.clone(),
513        Some(context.mesh_frame_tx.clone()),
514        Some(context.registrar.state.clone()),
515    );
516    let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
517
518    if !context
519        .registrar
520        .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
521        .await
522    {
523        warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
524        bluetooth_peer.close().await?;
525        return Ok(false);
526    } else {
527        info!("Bluetooth peer {} connected", remote_peer_id.short());
528        spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
529    }
530    Ok(true)
531}
532
533fn spawn_bluetooth_disconnect_watch(
534    peer_id: PeerId,
535    peer: Arc<super::BluetoothPeer>,
536    registrar: BluetoothPeerRegistrar,
537) {
538    tokio::spawn(async move {
539        loop {
540            if !peer.is_connected() {
541                registrar
542                    .unregister_bluetooth_peer_if_current(&peer_id, &peer)
543                    .await;
544                break;
545            }
546            tokio::time::sleep(Duration::from_millis(250)).await;
547        }
548    });
549}
550
551#[derive(serde::Serialize, serde::Deserialize)]
552#[serde(rename_all = "camelCase")]
553struct BluetoothHello {
554    #[serde(rename = "type")]
555    kind: String,
556    peer_id: String,
557}
558
559async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
560    let hello = BluetoothHello {
561        kind: "hello".to_string(),
562        peer_id: my_peer_id.to_string(),
563    };
564    link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
565        .await
566}
567
568async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
569    let result = tokio::time::timeout(HELLO_TIMEOUT, async {
570        loop {
571            match link.recv().await {
572                Some(BluetoothFrame::Text(text)) => {
573                    if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
574                        if hello.kind == "hello" {
575                            return PeerId::from_string(&hello.peer_id)
576                                .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
577                        }
578                    }
579                }
580                Some(BluetoothFrame::Binary(_)) => {}
581                None => return Err(anyhow!("bluetooth link closed before hello")),
582            }
583        }
584    })
585    .await;
586
587    match result {
588        Ok(peer_id) => peer_id,
589        Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
590    }
591}
592
593#[cfg(target_os = "macos")]
594mod macos {
595    use super::*;
596    use btleplug::api::{
597        Central, CharPropFlags, Characteristic, Manager as _, Peripheral as _, ScanFilter,
598        ValueNotification, WriteType,
599    };
600    use btleplug::platform::{Manager, Peripheral};
601    use tokio::sync::Mutex;
602    use uuid::Uuid;
603
604    pub(super) async fn start_macos_central(
605        config: BluetoothConfig,
606        context: BluetoothRuntimeContext,
607    ) -> Result<()> {
608        struct ConnectedPeripheral {
609            peripheral: Peripheral,
610            link: Arc<dyn BluetoothLink>,
611        }
612
613        let manager = Manager::new().await?;
614        let adapters = manager.adapters().await?;
615        let Some(adapter) = adapters.into_iter().next() else {
616            warn!("No Bluetooth adapters available on macOS");
617            return Ok(());
618        };
619        info!(
620            "Starting macOS Bluetooth central for peer {} (max_peers={})",
621            context.my_peer_id.short(),
622            config.max_peers
623        );
624
625        tokio::spawn(async move {
626            let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
627            let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
628            loop {
629                if let Err(err) = adapter.start_scan(ScanFilter::default()).await {
630                    warn!("Failed to start BLE scan: {}", err);
631                    tokio::time::sleep(Duration::from_secs(5)).await;
632                    continue;
633                }
634                tokio::time::sleep(Duration::from_secs(2)).await;
635
636                let peripherals = match adapter.peripherals().await {
637                    Ok(peripherals) => peripherals,
638                    Err(err) => {
639                        warn!("Failed to list BLE peripherals: {}", err);
640                        tokio::time::sleep(Duration::from_secs(5)).await;
641                        continue;
642                    }
643                };
644
645                connected.retain(|_, connection| {
646                    futures::executor::block_on(async {
647                        let peripheral_connected =
648                            connection.peripheral.is_connected().await.unwrap_or(false);
649                        let link_open = connection.link.is_open();
650                        if !peripheral_connected || !link_open {
651                            if peripheral_connected {
652                                let _ = connection.peripheral.disconnect().await;
653                            }
654                            false
655                        } else {
656                            true
657                        }
658                    })
659                });
660
661                for peripheral in peripherals {
662                    if connected.len() >= config.max_peers {
663                        break;
664                    }
665                    let peripheral_id = peripheral.id().to_string();
666                    if connected.contains_key(&peripheral_id) {
667                        continue;
668                    }
669                    let properties = match peripheral.properties().await {
670                        Ok(Some(properties)) => properties,
671                        _ => continue,
672                    };
673                    if !properties.services.contains(&service_uuid) {
674                        continue;
675                    }
676                    info!(
677                        "Discovered BLE peripheral {} advertising htree service",
678                        peripheral_id
679                    );
680                    if let Err(err) = adapter.stop_scan().await {
681                        debug!("Failed to stop BLE scan before connecting to {}: {}", peripheral_id, err);
682                    }
683                    match connect_peripheral(peripheral.clone()).await {
684                        Ok(Some(link)) => {
685                            let tracked_link = link.clone();
686                            let pending = PendingBluetoothLink {
687                                link,
688                                direction: PeerDirection::Outbound,
689                                local_hello_sent: false,
690                                peer_hint: Some(peripheral_id.clone()),
691                            };
692                            match handle_pending_link(pending, context.clone()).await {
693                                Ok(true) => {
694                                    connected.insert(
695                                        peripheral_id.clone(),
696                                        ConnectedPeripheral {
697                                            peripheral,
698                                            link: tracked_link,
699                                        },
700                                    );
701                                }
702                                Ok(false) => {}
703                                Err(err) => {
704                                    warn!("Failed to attach BLE peripheral: {}", err);
705                                }
706                            }
707                        }
708                        Ok(None) => {}
709                        Err(err) => {
710                            warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
711                        }
712                    }
713                }
714
715                tokio::time::sleep(Duration::from_secs(5)).await;
716            }
717        });
718
719        Ok(())
720    }
721
722    async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
723        let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
724        let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
725        let peripheral_id = peripheral.id().to_string();
726
727        if !peripheral.is_connected().await? {
728            info!("Connecting to BLE peripheral {}", peripheral_id);
729            match tokio::time::timeout(Duration::from_secs(5), peripheral.connect()).await {
730                Ok(Ok(())) => {}
731                Ok(Err(err)) => return Err(err.into()),
732                Err(_) => {
733                    let connected = peripheral.is_connected().await.unwrap_or(false);
734                    if !connected {
735                        warn!("BLE connect timed out for {} without establishing a link", peripheral_id);
736                        let _ = peripheral.disconnect().await;
737                        return Ok(None);
738                    }
739                    warn!(
740                        "BLE connect timed out for {} but the adapter reports it connected; continuing",
741                        peripheral_id
742                    );
743                }
744            }
745        }
746        let mut rx_char = None;
747        let mut tx_char = None;
748        for attempt in 1..=6 {
749            info!(
750                "Discovering BLE services for {} (attempt {}/6)",
751                peripheral_id, attempt
752            );
753            peripheral.discover_services().await?;
754
755            let characteristics = peripheral.characteristics();
756            if !characteristics.is_empty() {
757                let uuids = characteristics
758                    .iter()
759                    .map(|c| c.uuid.to_string())
760                    .collect::<Vec<_>>()
761                    .join(", ");
762                info!(
763                    "BLE peripheral {} characteristics: {}",
764                    peripheral_id, uuids
765                );
766            }
767            rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
768            tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
769            if rx_char.is_some() && tx_char.is_some() {
770                break;
771            }
772            tokio::time::sleep(Duration::from_millis(500)).await;
773        }
774        let Some(rx_char) = rx_char else {
775            warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
776            let _ = peripheral.disconnect().await;
777            return Ok(None);
778        };
779        let Some(tx_char) = tx_char else {
780            warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
781            let _ = peripheral.disconnect().await;
782            return Ok(None);
783        };
784        if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
785            warn!(
786                "BLE peripheral {} TX characteristic is missing NOTIFY",
787                peripheral_id
788            );
789            let _ = peripheral.disconnect().await;
790            return Ok(None);
791        }
792        let notifications = peripheral.notifications().await?;
793        info!("Subscribing to BLE notifications for {}", peripheral_id);
794        peripheral.subscribe(&tx_char).await?;
795
796        let initial_frames = match peripheral.read(&tx_char).await {
797            Ok(bytes) if !bytes.is_empty() => {
798                info!(
799                    "Read initial BLE hello bytes from {} ({} bytes)",
800                    peripheral_id,
801                    bytes.len()
802                );
803                let mut decoder = FrameDecoder::new();
804                match decoder.push(&bytes) {
805                    Ok(frames) => frames,
806                    Err(err) => {
807                        warn!(
808                            "Discarding malformed initial BLE frame from {}: {}",
809                            peripheral_id, err
810                        );
811                        Vec::new()
812                    }
813                }
814            }
815            Ok(_) => Vec::new(),
816            Err(err) => {
817                warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
818                Vec::new()
819            }
820        };
821
822        info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
823        Ok(Some(MacosCentralLink::new(
824            peripheral,
825            rx_char,
826            notifications,
827            initial_frames,
828        )))
829    }
830
831    struct FrameDecoder {
832        buffer: Vec<u8>,
833    }
834
835    impl FrameDecoder {
836        fn new() -> Self {
837            Self { buffer: Vec::new() }
838        }
839
840        fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
841            self.buffer.extend_from_slice(chunk);
842            let mut frames = Vec::new();
843            loop {
844                if self.buffer.len() < 5 {
845                    break;
846                }
847                let kind = self.buffer[0];
848                let len = u32::from_be_bytes([
849                    self.buffer[1],
850                    self.buffer[2],
851                    self.buffer[3],
852                    self.buffer[4],
853                ]) as usize;
854                if self.buffer.len() < 5 + len {
855                    break;
856                }
857                let payload = self.buffer[5..5 + len].to_vec();
858                self.buffer.drain(..5 + len);
859                let frame = match kind {
860                    1 => BluetoothFrame::Text(String::from_utf8(payload)?),
861                    2 => BluetoothFrame::Binary(payload),
862                    _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
863                };
864                frames.push(frame);
865            }
866            Ok(frames)
867        }
868    }
869
870    fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
871        match frame {
872            BluetoothFrame::Text(text) => {
873                let payload = text.into_bytes();
874                let mut out = Vec::with_capacity(5 + payload.len());
875                out.push(1);
876                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
877                out.extend_from_slice(&payload);
878                out
879            }
880            BluetoothFrame::Binary(payload) => {
881                let mut out = Vec::with_capacity(5 + payload.len());
882                out.push(2);
883                out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
884                out.extend_from_slice(&payload);
885                out
886            }
887        }
888    }
889
890    struct MacosCentralLink {
891        peripheral: Peripheral,
892        rx_char: Characteristic,
893        open: std::sync::atomic::AtomicBool,
894        rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
895    }
896
897    impl MacosCentralLink {
898        fn new(
899            peripheral: Peripheral,
900            rx_char: Characteristic,
901            mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
902            initial_frames: Vec<BluetoothFrame>,
903        ) -> Arc<Self> {
904            let (tx, rx) = mpsc::channel(64);
905            let link = Arc::new(Self {
906                peripheral,
907                rx_char,
908                open: std::sync::atomic::AtomicBool::new(true),
909                rx: Mutex::new(rx),
910            });
911
912            for frame in initial_frames {
913                let _ = tx.try_send(frame);
914            }
915
916            let weak = Arc::downgrade(&link);
917            tokio::spawn(async move {
918                let mut decoder = FrameDecoder::new();
919                while let Some(notification) = notifications.next().await {
920                    let Ok(frames) = decoder.push(&notification.value) else {
921                        break;
922                    };
923                    for frame in frames {
924                        if tx.send(frame).await.is_err() {
925                            break;
926                        }
927                    }
928                }
929                if let Some(link) = weak.upgrade() {
930                    link.open.store(false, std::sync::atomic::Ordering::Relaxed);
931                }
932            });
933
934            link
935        }
936    }
937
938    #[async_trait]
939    impl BluetoothLink for MacosCentralLink {
940        async fn send(&self, frame: BluetoothFrame) -> Result<()> {
941            let bytes = encode_frame(frame);
942            for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
943                self.peripheral
944                    .write(&self.rx_char, chunk, WriteType::WithResponse)
945                    .await?;
946            }
947            Ok(())
948        }
949
950        async fn recv(&self) -> Option<BluetoothFrame> {
951            self.rx.lock().await.recv().await
952        }
953
954        fn is_open(&self) -> bool {
955            self.open.load(std::sync::atomic::Ordering::Relaxed)
956        }
957
958        async fn close(&self) -> Result<()> {
959            self.open.store(false, std::sync::atomic::Ordering::Relaxed);
960            let _ = self.peripheral.disconnect().await;
961            Ok(())
962        }
963    }
964}