1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::BTreeSet;
4#[cfg(target_os = "macos")]
5use std::collections::HashMap;
6use std::sync::{Arc, OnceLock};
7use std::time::{Duration, Instant};
8use tokio::sync::mpsc;
9use tracing::{debug, info, warn};
10
11use crate::nostr_relay::NostrRelay;
12
13use super::bluetooth_peer::{BluetoothFrame, BluetoothLink};
14use super::peer::ContentStore;
15use super::session::MeshPeer;
16use super::signaling::{
17 ConnectionState, PeerClassifier, PeerEntry, PeerSignalPath, PeerTransport, WebRTCState,
18};
19use super::types::{MeshNostrFrame, PeerDirection, PeerId, PeerPool, PoolSettings};
20
21#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
22pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";
23#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
24pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";
25#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
26pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";
27#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
28pub const HTREE_BLE_CHUNK_BYTES: usize = 64;
30const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
31
32#[derive(Debug, Clone)]
34pub struct BluetoothConfig {
35 pub enabled: bool,
36 pub max_peers: usize,
37}
38
39impl BluetoothConfig {
40 pub fn is_enabled(&self) -> bool {
41 self.enabled && self.max_peers > 0
42 }
43}
44
45impl Default for BluetoothConfig {
46 fn default() -> Self {
47 Self {
48 enabled: false,
49 max_peers: 0,
50 }
51 }
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum BluetoothBackendState {
56 Disabled,
57 Running,
58 Unsupported,
59}
60
61#[derive(Clone)]
62pub struct PendingBluetoothLink {
63 pub link: Arc<dyn BluetoothLink>,
64 pub direction: PeerDirection,
65 pub local_hello_sent: bool,
66 pub peer_hint: Option<String>,
67}
68
69#[async_trait]
70pub trait MobileBluetoothBridge: Send + Sync {
71 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
72}
73
74static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
75
76pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
77 MOBILE_BLUETOOTH_BRIDGE
78 .set(bridge)
79 .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
80}
81
82fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
83 MOBILE_BLUETOOTH_BRIDGE.get().cloned()
84}
85
86#[derive(Clone)]
87pub struct BluetoothPeerRegistrar {
88 state: Arc<WebRTCState>,
89 peer_classifier: PeerClassifier,
90 pools: PoolSettings,
91 max_bluetooth_peers: usize,
92}
93
94impl BluetoothPeerRegistrar {
95 pub fn new(
96 state: Arc<WebRTCState>,
97 peer_classifier: PeerClassifier,
98 pools: PoolSettings,
99 max_bluetooth_peers: usize,
100 ) -> Self {
101 Self {
102 state,
103 peer_classifier,
104 pools,
105 max_bluetooth_peers,
106 }
107 }
108
109 async fn pool_counts(&self) -> (usize, usize) {
110 let peers = self.state.peers.read().await;
111 let mut follows = 0usize;
112 let mut other = 0usize;
113 for entry in peers.values() {
114 if entry.state != ConnectionState::Connected {
115 continue;
116 }
117 match entry.pool {
118 PeerPool::Follows => follows += 1,
119 PeerPool::Other => other += 1,
120 }
121 }
122 (follows, other)
123 }
124
125 async fn bluetooth_peer_count(&self, peer_key: &str) -> usize {
126 let peers = self.state.peers.read().await;
127 peers
128 .values()
129 .filter(|entry| entry.transport == PeerTransport::Bluetooth)
130 .filter(|entry| entry.state == ConnectionState::Connected)
131 .filter(|entry| entry.peer_id.to_string() != peer_key)
132 .count()
133 }
134
135 pub async fn register_connected_peer(
136 &self,
137 peer_id: PeerId,
138 direction: PeerDirection,
139 peer: MeshPeer,
140 ) -> bool {
141 let peer_key = peer_id.to_string();
142 let pool = (self.peer_classifier)(&peer_id.pubkey);
143 let (follows, other) = self.pool_counts().await;
144 let can_accept_pool = match pool {
145 PeerPool::Follows => follows < self.pools.follows.max_connections,
146 PeerPool::Other => other < self.pools.other.max_connections,
147 };
148 if !can_accept_pool {
149 warn!(
150 "Bluetooth peer {} rejected because pool {:?} is full",
151 peer_id.short(),
152 pool
153 );
154 return false;
155 }
156
157 if self.max_bluetooth_peers == 0
158 || self.bluetooth_peer_count(&peer_key).await >= self.max_bluetooth_peers
159 {
160 warn!(
161 "Bluetooth peer {} rejected because max_bluetooth_peers={} reached",
162 peer_id.short(),
163 self.max_bluetooth_peers
164 );
165 return false;
166 }
167
168 let mut peers = self.state.peers.write().await;
169 let duplicate_keys = peers
170 .iter()
171 .filter(|(key, entry)| {
172 key.as_str() != peer_key
173 && entry.transport == PeerTransport::Bluetooth
174 && entry.peer_id.pubkey == peer_id.pubkey
175 })
176 .map(|(key, _)| key.clone())
177 .collect::<Vec<_>>();
178 let was_connected = peers
179 .get(&peer_key)
180 .map(|entry| entry.state == ConnectionState::Connected)
181 .unwrap_or(false);
182 let replaced = peers.insert(
183 peer_key,
184 PeerEntry {
185 peer_id,
186 direction,
187 state: ConnectionState::Connected,
188 last_seen: Instant::now(),
189 peer: Some(peer),
190 pool,
191 transport: PeerTransport::Bluetooth,
192 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
193 bytes_sent: 0,
194 bytes_received: 0,
195 },
196 );
197 let removed_duplicates = duplicate_keys
198 .into_iter()
199 .filter_map(|key| peers.remove(&key))
200 .collect::<Vec<_>>();
201 drop(peers);
202
203 if let Some(previous) = replaced.and_then(|entry| entry.peer) {
204 let _ = previous.close().await;
205 }
206 for duplicate in &removed_duplicates {
207 if let Some(peer) = duplicate.peer.as_ref() {
208 let _ = peer.close().await;
209 }
210 }
211
212 let removed_connected_duplicates = removed_duplicates
213 .iter()
214 .filter(|entry| entry.state == ConnectionState::Connected)
215 .count() as isize;
216 let connected_delta =
217 1isize - if was_connected { 1 } else { 0 } - removed_connected_duplicates;
218 if connected_delta > 0 {
219 self.state.connected_count.fetch_add(
220 connected_delta as usize,
221 std::sync::atomic::Ordering::Relaxed,
222 );
223 } else if connected_delta < 0 {
224 self.state.connected_count.fetch_sub(
225 (-connected_delta) as usize,
226 std::sync::atomic::Ordering::Relaxed,
227 );
228 }
229 true
230 }
231
232 pub async fn unregister_peer(&self, peer_id: &PeerId) {
233 let peer_key = peer_id.to_string();
234 let removed = self.state.peers.write().await.remove(&peer_key);
235 if let Some(entry) = removed {
236 if entry.state == ConnectionState::Connected {
237 self.state
238 .connected_count
239 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
240 }
241 if let Some(peer) = entry.peer {
242 let _ = peer.close().await;
243 }
244 }
245 }
246
247 pub async fn unregister_bluetooth_peer_if_current(
248 &self,
249 peer_id: &PeerId,
250 expected_peer: &Arc<super::BluetoothPeer>,
251 ) {
252 let peer_key = peer_id.to_string();
253 let removed = {
254 let mut peers = self.state.peers.write().await;
255 let matches_current = peers
256 .get(&peer_key)
257 .and_then(|entry| entry.peer.as_ref())
258 .and_then(|peer| match peer {
259 MeshPeer::Bluetooth(current) => Some(Arc::ptr_eq(current, expected_peer)),
260 _ => None,
261 })
262 .unwrap_or(false);
263 if matches_current {
264 peers.remove(&peer_key)
265 } else {
266 None
267 }
268 };
269 if let Some(entry) = removed {
270 if entry.state == ConnectionState::Connected {
271 self.state
272 .connected_count
273 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
274 }
275 if let Some(peer) = entry.peer {
276 let _ = peer.close().await;
277 }
278 }
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285 use crate::webrtc::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
286 use crate::webrtc::session::{MeshPeer, TestMeshPeer};
287
288 #[tokio::test]
289 async fn register_connected_peer_closes_replaced_session() {
290 let state = Arc::new(WebRTCState::new());
291 let registrar = BluetoothPeerRegistrar::new(
292 state.clone(),
293 Arc::new(|_| PeerPool::Other),
294 PoolSettings::default(),
295 2,
296 );
297
298 let peer_id = PeerId::new("peer-pub".to_string());
299 let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
300 let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
301 let first_ref = first.mock_ref().expect("mock peer").clone();
302
303 assert!(
304 registrar
305 .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
306 .await
307 );
308 assert!(
309 registrar
310 .register_connected_peer(peer_id, PeerDirection::Outbound, second)
311 .await
312 );
313
314 assert!(first_ref.is_closed());
315 }
316
317 #[tokio::test]
318 async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
319 let state = Arc::new(WebRTCState::new());
320 let registrar = BluetoothPeerRegistrar::new(
321 state.clone(),
322 Arc::new(|_| PeerPool::Other),
323 PoolSettings::default(),
324 2,
325 );
326
327 let first_peer_id = PeerId::new("peer-pub".to_string());
328 let second_peer_id = PeerId::new("peer-pub".to_string());
329 let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
330 let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
331 let first_ref = first.mock_ref().expect("mock peer").clone();
332
333 assert!(
334 registrar
335 .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
336 .await
337 );
338 assert!(
339 registrar
340 .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
341 .await
342 );
343
344 assert!(first_ref.is_closed());
345 let peers = state.peers.read().await;
346 assert!(peers.contains_key(&second_peer_id.to_string()));
347 assert_eq!(peers.len(), 1);
348 assert_eq!(
349 state
350 .connected_count
351 .load(std::sync::atomic::Ordering::Relaxed),
352 1
353 );
354 }
355
356 #[tokio::test]
357 async fn handle_pending_link_unregisters_peer_after_transport_closes() {
358 let (link_a, link_b) = MockBluetoothLink::pair();
359 let state = Arc::new(WebRTCState::new());
360 let registrar = BluetoothPeerRegistrar::new(
361 state.clone(),
362 Arc::new(|_| PeerPool::Other),
363 PoolSettings::default(),
364 2,
365 );
366 let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
367 let local_peer_id = PeerId::new("local-pub".to_string());
368 let remote_peer_id = PeerId::new("remote-pub".to_string());
369 let remote_link: Arc<dyn BluetoothLink> = link_b.clone();
370
371 send_hello(&remote_link, &remote_peer_id)
372 .await
373 .expect("send hello");
374
375 let accepted = handle_pending_link(
376 PendingBluetoothLink {
377 link: link_a.clone(),
378 direction: PeerDirection::Outbound,
379 local_hello_sent: false,
380 peer_hint: Some("mock-ble".to_string()),
381 },
382 BluetoothRuntimeContext {
383 my_peer_id: local_peer_id,
384 store: None,
385 nostr_relay: None,
386 mesh_frame_tx,
387 registrar: registrar.clone(),
388 },
389 )
390 .await
391 .expect("handle pending link");
392
393 assert!(accepted);
394 assert!(state
395 .peers
396 .read()
397 .await
398 .contains_key(&remote_peer_id.to_string()));
399
400 link_a.close().await.expect("close local transport");
401
402 tokio::time::timeout(Duration::from_secs(2), async {
403 loop {
404 if !state
405 .peers
406 .read()
407 .await
408 .contains_key(&remote_peer_id.to_string())
409 {
410 break;
411 }
412 tokio::time::sleep(Duration::from_millis(25)).await;
413 }
414 })
415 .await
416 .expect("peer should be unregistered");
417 }
418}
419
420#[derive(Clone)]
421pub struct BluetoothRuntimeContext {
422 pub my_peer_id: PeerId,
423 pub store: Option<Arc<dyn ContentStore>>,
424 pub nostr_relay: Option<Arc<NostrRelay>>,
425 pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
426 pub registrar: BluetoothPeerRegistrar,
427}
428
429pub struct BluetoothMesh {
431 config: BluetoothConfig,
432}
433
434impl BluetoothMesh {
435 pub fn new(config: BluetoothConfig) -> Self {
436 Self { config }
437 }
438
439 pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
440 if !self.config.is_enabled() {
441 info!("Bluetooth transport disabled");
442 return BluetoothBackendState::Disabled;
443 }
444
445 let mut started = false;
446
447 if let Some(bridge) = mobile_bluetooth_bridge() {
448 info!(
449 "Starting mobile Bluetooth bridge for peer {}",
450 context.my_peer_id.short()
451 );
452 match bridge.start(context.my_peer_id.to_string()).await {
453 Ok(mut rx) => {
454 info!("Mobile Bluetooth bridge is running");
455 let ctx = context.clone();
456 tokio::spawn(async move {
457 while let Some(link) = rx.recv().await {
458 if let Err(err) = handle_pending_link(link, ctx.clone()).await {
459 warn!("Mobile Bluetooth link failed: {}", err);
460 }
461 }
462 });
463 started = true;
464 }
465 Err(err) => {
466 warn!("Failed to start mobile Bluetooth bridge: {}", err);
467 }
468 }
469 }
470
471 #[cfg(target_os = "macos")]
472 {
473 if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
474 {
475 warn!("Failed to start macOS Bluetooth central: {}", err);
476 } else {
477 started = true;
478 }
479 }
480
481 if started {
482 BluetoothBackendState::Running
483 } else {
484 warn!(
485 "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
486 self.config.max_peers
487 );
488 BluetoothBackendState::Unsupported
489 }
490 }
491}
492
493async fn handle_pending_link(
494 link: PendingBluetoothLink,
495 context: BluetoothRuntimeContext,
496) -> Result<bool> {
497 if let Some(peer_hint) = link.peer_hint.as_deref() {
498 debug!("Handling pending Bluetooth link {}", peer_hint);
499 }
500 info!(
501 "Handling pending Bluetooth link direction={} local_hello_sent={}",
502 link.direction, link.local_hello_sent
503 );
504 let remote_peer_id = receive_remote_hello(&link.link).await?;
505 info!(
506 "Received Bluetooth hello from {} while attaching pending link",
507 remote_peer_id.short()
508 );
509 if !link.local_hello_sent {
510 send_hello(&link.link, &context.my_peer_id).await?;
511 info!(
512 "Sent Bluetooth hello to {} while attaching pending link",
513 remote_peer_id.short()
514 );
515 }
516
517 let bluetooth_peer = super::BluetoothPeer::new(
518 remote_peer_id.clone(),
519 link.direction,
520 link.link,
521 context.store.clone(),
522 context.nostr_relay.clone(),
523 Some(context.mesh_frame_tx.clone()),
524 Some(context.registrar.state.clone()),
525 );
526 let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
527
528 if !context
529 .registrar
530 .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
531 .await
532 {
533 warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
534 bluetooth_peer.close().await?;
535 return Ok(false);
536 } else {
537 info!("Bluetooth peer {} connected", remote_peer_id.short());
538 spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
539 }
540 Ok(true)
541}
542
543fn spawn_bluetooth_disconnect_watch(
544 peer_id: PeerId,
545 peer: Arc<super::BluetoothPeer>,
546 registrar: BluetoothPeerRegistrar,
547) {
548 tokio::spawn(async move {
549 loop {
550 if !peer.is_connected() {
551 registrar
552 .unregister_bluetooth_peer_if_current(&peer_id, &peer)
553 .await;
554 break;
555 }
556 tokio::time::sleep(Duration::from_millis(250)).await;
557 }
558 });
559}
560
561#[derive(serde::Serialize, serde::Deserialize)]
562#[serde(rename_all = "camelCase")]
563struct BluetoothHello {
564 #[serde(rename = "type")]
565 kind: String,
566 peer_id: String,
567}
568
569async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
570 let hello = BluetoothHello {
571 kind: "hello".to_string(),
572 peer_id: my_peer_id.to_string(),
573 };
574 link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
575 .await
576}
577
578async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
579 let result = tokio::time::timeout(HELLO_TIMEOUT, async {
580 loop {
581 match link.recv().await {
582 Some(BluetoothFrame::Text(text)) => {
583 if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
584 if hello.kind == "hello" {
585 return PeerId::from_string(&hello.peer_id)
586 .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
587 }
588 }
589 }
590 Some(BluetoothFrame::Binary(_)) => {}
591 None => return Err(anyhow!("bluetooth link closed before hello")),
592 }
593 }
594 })
595 .await;
596
597 match result {
598 Ok(peer_id) => peer_id,
599 Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
600 }
601}
602
603#[cfg(target_os = "macos")]
604mod macos {
605 use super::*;
606 use btleplug::api::{
607 Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
608 ScanFilter, ValueNotification, WriteType,
609 };
610 use btleplug::platform::{Manager, Peripheral};
611 use futures::StreamExt;
612 use std::collections::HashSet;
613 use tokio::sync::Mutex;
614 use uuid::Uuid;
615
616 const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);
617
618 #[derive(Clone)]
619 struct AdvertisementSnapshot {
620 last_seen: Instant,
621 peer_hint: Option<String>,
622 }
623
624 pub(super) async fn start_macos_central(
625 config: BluetoothConfig,
626 context: BluetoothRuntimeContext,
627 ) -> Result<()> {
628 struct ConnectedPeripheral {
629 peripheral: Peripheral,
630 link: Arc<dyn BluetoothLink>,
631 peer_hint: Option<String>,
632 }
633
634 let manager = Manager::new().await?;
635 let adapters = manager.adapters().await?;
636 let Some(adapter) = adapters.into_iter().next() else {
637 warn!("No Bluetooth adapters available on macOS");
638 return Ok(());
639 };
640 info!(
641 "Starting macOS Bluetooth central for peer {} (max_peers={})",
642 context.my_peer_id.short(),
643 config.max_peers
644 );
645 let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
646 let mut events = adapter.events().await?;
647 let fresh_advertisements =
648 Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
649 let fresh_advertisements_for_events = fresh_advertisements.clone();
650 tokio::spawn(async move {
651 while let Some(event) = events.next().await {
652 match event {
653 CentralEvent::ServicesAdvertisement { id, services }
654 if services.contains(&service_uuid) =>
655 {
656 let mut advertisements = fresh_advertisements_for_events.lock().await;
657 let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
658 AdvertisementSnapshot {
659 last_seen: Instant::now(),
660 peer_hint: None,
661 }
662 });
663 entry.last_seen = Instant::now();
664 }
665 CentralEvent::ServiceDataAdvertisement { id, service_data } => {
666 if let Some(data) = service_data.get(&service_uuid) {
667 let mut advertisements = fresh_advertisements_for_events.lock().await;
668 let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
669 AdvertisementSnapshot {
670 last_seen: Instant::now(),
671 peer_hint: None,
672 }
673 });
674 entry.last_seen = Instant::now();
675 if !data.is_empty() {
676 entry.peer_hint = Some(hex::encode(data));
677 }
678 }
679 }
680 CentralEvent::DeviceDisconnected(id) => {
681 fresh_advertisements_for_events
682 .lock()
683 .await
684 .remove(&id.to_string());
685 }
686 _ => {}
687 }
688 }
689 });
690
691 tokio::spawn(async move {
692 let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
693 loop {
694 if let Err(err) = adapter
695 .start_scan(ScanFilter {
696 services: vec![service_uuid],
697 })
698 .await
699 {
700 warn!("Failed to start BLE scan: {}", err);
701 tokio::time::sleep(Duration::from_secs(5)).await;
702 continue;
703 }
704 tokio::time::sleep(Duration::from_secs(2)).await;
705
706 let peripherals = match adapter.peripherals().await {
707 Ok(peripherals) => peripherals,
708 Err(err) => {
709 warn!("Failed to list BLE peripherals: {}", err);
710 tokio::time::sleep(Duration::from_secs(5)).await;
711 continue;
712 }
713 };
714
715 connected.retain(|_, connection| {
716 futures::executor::block_on(async {
717 let peripheral_connected =
718 connection.peripheral.is_connected().await.unwrap_or(false);
719 let link_open = connection.link.is_open();
720 if !peripheral_connected || !link_open {
721 if peripheral_connected {
722 let _ = connection.peripheral.disconnect().await;
723 }
724 false
725 } else {
726 true
727 }
728 })
729 });
730
731 let advertisement_snapshot = {
732 let now = Instant::now();
733 let mut seen = fresh_advertisements.lock().await;
734 seen.retain(|_, advertisement| {
735 now.saturating_duration_since(advertisement.last_seen)
736 <= FRESH_ADVERTISEMENT_WINDOW
737 });
738 seen.clone()
739 };
740
741 let mut candidates = Vec::new();
742 for peripheral in peripherals {
743 let peripheral_id = peripheral.id().to_string();
744 if connected.contains_key(&peripheral_id) {
745 continue;
746 }
747 let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
748 else {
749 debug!(
750 "Skipping cached BLE peripheral {} without a fresh advertisement",
751 peripheral_id
752 );
753 continue;
754 };
755 let properties = match peripheral.properties().await {
756 Ok(Some(properties)) => properties,
757 _ => continue,
758 };
759 if !properties.services.contains(&service_uuid) {
760 continue;
761 }
762 candidates.push((peripheral, peripheral_id, advertisement));
763 }
764
765 candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
766 let mut represented_peer_hints = connected
767 .values()
768 .filter_map(|connection| connection.peer_hint.clone())
769 .collect::<HashSet<_>>();
770
771 for (peripheral, peripheral_id, advertisement) in candidates {
772 if connected.len() >= config.max_peers {
773 break;
774 }
775 if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
776 if !represented_peer_hints.insert(peer_hint.clone()) {
777 debug!(
778 "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
779 peripheral_id,
780 peer_hint
781 );
782 continue;
783 }
784 }
785 info!(
786 "Discovered BLE peripheral {} advertising htree service{}",
787 peripheral_id,
788 advertisement
789 .peer_hint
790 .as_ref()
791 .map(|hint| format!(" (peer hint {})", hint))
792 .unwrap_or_default()
793 );
794 match connect_peripheral(peripheral.clone()).await {
795 Ok(Some(link)) => {
796 let tracked_link = link.clone();
797 let pending = PendingBluetoothLink {
798 link,
799 direction: PeerDirection::Outbound,
800 local_hello_sent: false,
801 peer_hint: advertisement
802 .peer_hint
803 .clone()
804 .or(Some(peripheral_id.clone())),
805 };
806 match handle_pending_link(pending, context.clone()).await {
807 Ok(true) => {
808 connected.insert(
809 peripheral_id.clone(),
810 ConnectedPeripheral {
811 peripheral,
812 link: tracked_link,
813 peer_hint: advertisement.peer_hint.clone(),
814 },
815 );
816 }
817 Ok(false) => {}
818 Err(err) => {
819 warn!("Failed to attach BLE peripheral: {}", err);
820 }
821 }
822 }
823 Ok(None) => {}
824 Err(err) => {
825 warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
826 }
827 }
828 }
829
830 tokio::time::sleep(Duration::from_secs(5)).await;
831 }
832 });
833
834 Ok(())
835 }
836
837 async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
838 let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
839 let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
840 let peripheral_id = peripheral.id().to_string();
841
842 let mut connect_timed_out = false;
843 if !peripheral.is_connected().await? {
844 info!("Connecting to BLE peripheral {}", peripheral_id);
845 match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
846 Ok(Ok(())) => {}
847 Ok(Err(err)) => return Err(err.into()),
848 Err(_) => {
849 warn!(
850 "BLE connect timed out for {}; probing services before giving up",
851 peripheral_id
852 );
853 connect_timed_out = true;
854 }
855 }
856 }
857 if connect_timed_out && !peripheral.is_connected().await.unwrap_or(false) {
858 warn!(
859 "BLE peripheral {} never reached a connected state after timeout; retrying later",
860 peripheral_id
861 );
862 let _ = peripheral.disconnect().await;
863 return Ok(None);
864 }
865 tokio::time::sleep(Duration::from_millis(300)).await;
866 let mut rx_char = None;
867 let mut tx_char = None;
868 for attempt in 1..=8 {
869 info!(
870 "Discovering BLE services for {} (attempt {}/8)",
871 peripheral_id, attempt
872 );
873 if let Err(err) = peripheral.discover_services().await {
874 if attempt == 8 {
875 if connect_timed_out {
876 warn!(
877 "BLE service discovery failed for {} after soft connect timeout: {}",
878 peripheral_id, err
879 );
880 let _ = peripheral.disconnect().await;
881 return Ok(None);
882 }
883 return Err(err.into());
884 }
885 warn!(
886 "BLE service discovery attempt {}/8 failed for {}: {}",
887 attempt, peripheral_id, err
888 );
889 tokio::time::sleep(Duration::from_millis(500)).await;
890 continue;
891 }
892
893 let characteristics = peripheral.characteristics();
894 if !characteristics.is_empty() {
895 let uuids = characteristics
896 .iter()
897 .map(|c| c.uuid.to_string())
898 .collect::<Vec<_>>()
899 .join(", ");
900 info!(
901 "BLE peripheral {} characteristics: {}",
902 peripheral_id, uuids
903 );
904 }
905 rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
906 tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
907 if rx_char.is_some() && tx_char.is_some() {
908 break;
909 }
910 tokio::time::sleep(Duration::from_millis(500)).await;
911 }
912 let Some(rx_char) = rx_char else {
913 warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
914 let _ = peripheral.disconnect().await;
915 return Ok(None);
916 };
917 let Some(tx_char) = tx_char else {
918 warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
919 let _ = peripheral.disconnect().await;
920 return Ok(None);
921 };
922 if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
923 warn!(
924 "BLE peripheral {} TX characteristic is missing NOTIFY",
925 peripheral_id
926 );
927 let _ = peripheral.disconnect().await;
928 return Ok(None);
929 }
930 let notifications = peripheral.notifications().await?;
931 info!("Subscribing to BLE notifications for {}", peripheral_id);
932 peripheral.subscribe(&tx_char).await?;
933
934 let initial_frames = match peripheral.read(&tx_char).await {
935 Ok(bytes) if !bytes.is_empty() => {
936 info!(
937 "Read initial BLE hello bytes from {} ({} bytes)",
938 peripheral_id,
939 bytes.len()
940 );
941 let mut decoder = FrameDecoder::new();
942 match decoder.push(&bytes) {
943 Ok(frames) => frames,
944 Err(err) => {
945 warn!(
946 "Discarding malformed initial BLE frame from {}: {}",
947 peripheral_id, err
948 );
949 Vec::new()
950 }
951 }
952 }
953 Ok(_) => Vec::new(),
954 Err(err) => {
955 warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
956 Vec::new()
957 }
958 };
959
960 info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
961 Ok(Some(MacosCentralLink::new(
962 peripheral,
963 rx_char,
964 notifications,
965 initial_frames,
966 )))
967 }
968
969 struct FrameDecoder {
970 buffer: Vec<u8>,
971 }
972
973 impl FrameDecoder {
974 fn new() -> Self {
975 Self { buffer: Vec::new() }
976 }
977
978 fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
979 self.buffer.extend_from_slice(chunk);
980 let mut frames = Vec::new();
981 loop {
982 if self.buffer.len() < 5 {
983 break;
984 }
985 let kind = self.buffer[0];
986 let len = u32::from_be_bytes([
987 self.buffer[1],
988 self.buffer[2],
989 self.buffer[3],
990 self.buffer[4],
991 ]) as usize;
992 if self.buffer.len() < 5 + len {
993 break;
994 }
995 let payload = self.buffer[5..5 + len].to_vec();
996 self.buffer.drain(..5 + len);
997 let frame = match kind {
998 1 => BluetoothFrame::Text(String::from_utf8(payload)?),
999 2 => BluetoothFrame::Binary(payload),
1000 _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
1001 };
1002 frames.push(frame);
1003 }
1004 Ok(frames)
1005 }
1006 }
1007
1008 fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
1009 match frame {
1010 BluetoothFrame::Text(text) => {
1011 let payload = text.into_bytes();
1012 let mut out = Vec::with_capacity(5 + payload.len());
1013 out.push(1);
1014 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1015 out.extend_from_slice(&payload);
1016 out
1017 }
1018 BluetoothFrame::Binary(payload) => {
1019 let mut out = Vec::with_capacity(5 + payload.len());
1020 out.push(2);
1021 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
1022 out.extend_from_slice(&payload);
1023 out
1024 }
1025 }
1026 }
1027
1028 struct MacosCentralLink {
1029 peripheral: Peripheral,
1030 rx_char: Characteristic,
1031 open: std::sync::atomic::AtomicBool,
1032 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
1033 }
1034
1035 impl MacosCentralLink {
1036 fn new(
1037 peripheral: Peripheral,
1038 rx_char: Characteristic,
1039 mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
1040 initial_frames: Vec<BluetoothFrame>,
1041 ) -> Arc<Self> {
1042 let (tx, rx) = mpsc::channel(64);
1043 let link = Arc::new(Self {
1044 peripheral,
1045 rx_char,
1046 open: std::sync::atomic::AtomicBool::new(true),
1047 rx: Mutex::new(rx),
1048 });
1049
1050 for frame in initial_frames {
1051 let _ = tx.try_send(frame);
1052 }
1053
1054 let weak = Arc::downgrade(&link);
1055 tokio::spawn(async move {
1056 let mut decoder = FrameDecoder::new();
1057 while let Some(notification) = notifications.next().await {
1058 let Ok(frames) = decoder.push(¬ification.value) else {
1059 break;
1060 };
1061 for frame in frames {
1062 if tx.send(frame).await.is_err() {
1063 break;
1064 }
1065 }
1066 }
1067 if let Some(link) = weak.upgrade() {
1068 link.open.store(false, std::sync::atomic::Ordering::Relaxed);
1069 }
1070 });
1071
1072 link
1073 }
1074 }
1075
1076 #[async_trait]
1077 impl BluetoothLink for MacosCentralLink {
1078 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
1079 let bytes = encode_frame(frame);
1080 for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
1081 self.peripheral
1082 .write(&self.rx_char, chunk, WriteType::WithResponse)
1083 .await?;
1084 }
1085 Ok(())
1086 }
1087
1088 async fn recv(&self) -> Option<BluetoothFrame> {
1089 self.rx.lock().await.recv().await
1090 }
1091
1092 fn is_open(&self) -> bool {
1093 self.open.load(std::sync::atomic::Ordering::Relaxed)
1094 }
1095
1096 async fn close(&self) -> Result<()> {
1097 self.open.store(false, std::sync::atomic::Ordering::Relaxed);
1098 let _ = self.peripheral.disconnect().await;
1099 Ok(())
1100 }
1101 }
1102}