1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3use std::collections::BTreeSet;
4#[cfg(target_os = "macos")]
5use std::collections::HashMap;
6use std::sync::{Arc, OnceLock};
7use std::time::{Duration, Instant};
8use tokio::sync::mpsc;
9use tracing::{debug, info, warn};
10
11use crate::nostr_relay::NostrRelay;
12
13use super::bluetooth_peer::{BluetoothFrame, BluetoothLink};
14use super::peer::ContentStore;
15use super::session::MeshPeer;
16use super::signaling::{
17 ConnectionState, PeerClassifier, PeerEntry, PeerSignalPath, PeerTransport, WebRTCState,
18};
19use super::types::{MeshNostrFrame, PeerDirection, PeerId, PeerPool, PoolSettings};
20
/// UUID of the htree BLE service; the macOS backend in this file uses it as
/// its scan filter and to recognise matching advertisements.
// Only the macOS backend in this file references this constant, hence the
// conditional `dead_code` allowance on other targets.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";

/// UUID of the characteristic the macOS central writes outgoing frame chunks to.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";

/// UUID of the characteristic the macOS central subscribes to for incoming
/// notifications.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";

/// Largest number of bytes sent in a single characteristic write; encoded
/// frames are split into chunks of at most this size.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_CHUNK_BYTES: usize = 180;

/// How long the handshake waits for the remote side's hello frame.
const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
30
/// Settings for the Bluetooth transport.
///
/// The derived [`Default`] produces a disabled transport with no peer slots
/// (`enabled: false`, `max_peers: 0`), exactly what the previous hand-written
/// implementation returned.
#[derive(Debug, Clone, Default)]
pub struct BluetoothConfig {
    /// Whether the Bluetooth transport is switched on.
    pub enabled: bool,
    /// Maximum number of simultaneously connected Bluetooth peers.
    pub max_peers: usize,
}

impl BluetoothConfig {
    /// Returns `true` only when the transport is switched on *and* at least
    /// one peer slot is configured.
    pub fn is_enabled(&self) -> bool {
        self.enabled && self.max_peers > 0
    }
}
52
/// Result of trying to bring up the Bluetooth transport.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BluetoothBackendState {
    /// The configuration has the transport turned off.
    Disabled,
    /// At least one backend was started.
    Running,
    /// The transport was requested but no backend could be started.
    Unsupported,
}
59
60#[derive(Clone)]
61pub struct PendingBluetoothLink {
62 pub link: Arc<dyn BluetoothLink>,
63 pub direction: PeerDirection,
64 pub local_hello_sent: bool,
65 pub peer_hint: Option<String>,
66}
67
68#[async_trait]
69pub trait MobileBluetoothBridge: Send + Sync {
70 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
71}
72
73static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
74
75pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
76 MOBILE_BLUETOOTH_BRIDGE
77 .set(bridge)
78 .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
79}
80
81fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
82 MOBILE_BLUETOOTH_BRIDGE.get().cloned()
83}
84
85#[derive(Clone)]
86pub struct BluetoothPeerRegistrar {
87 state: Arc<WebRTCState>,
88 peer_classifier: PeerClassifier,
89 pools: PoolSettings,
90 max_bluetooth_peers: usize,
91}
92
93impl BluetoothPeerRegistrar {
94 pub fn new(
95 state: Arc<WebRTCState>,
96 peer_classifier: PeerClassifier,
97 pools: PoolSettings,
98 max_bluetooth_peers: usize,
99 ) -> Self {
100 Self {
101 state,
102 peer_classifier,
103 pools,
104 max_bluetooth_peers,
105 }
106 }
107
108 async fn pool_counts(&self) -> (usize, usize) {
109 let peers = self.state.peers.read().await;
110 let mut follows = 0usize;
111 let mut other = 0usize;
112 for entry in peers.values() {
113 if entry.state != ConnectionState::Connected {
114 continue;
115 }
116 match entry.pool {
117 PeerPool::Follows => follows += 1,
118 PeerPool::Other => other += 1,
119 }
120 }
121 (follows, other)
122 }
123
124 async fn bluetooth_peer_count(&self, peer_key: &str) -> usize {
125 let peers = self.state.peers.read().await;
126 peers
127 .values()
128 .filter(|entry| entry.transport == PeerTransport::Bluetooth)
129 .filter(|entry| entry.state == ConnectionState::Connected)
130 .filter(|entry| entry.peer_id.to_string() != peer_key)
131 .count()
132 }
133
134 pub async fn register_connected_peer(
135 &self,
136 peer_id: PeerId,
137 direction: PeerDirection,
138 peer: MeshPeer,
139 ) -> bool {
140 let peer_key = peer_id.to_string();
141 let pool = (self.peer_classifier)(&peer_id.pubkey);
142 let (follows, other) = self.pool_counts().await;
143 let can_accept_pool = match pool {
144 PeerPool::Follows => follows < self.pools.follows.max_connections,
145 PeerPool::Other => other < self.pools.other.max_connections,
146 };
147 if !can_accept_pool {
148 warn!(
149 "Bluetooth peer {} rejected because pool {:?} is full",
150 peer_id.short(),
151 pool
152 );
153 return false;
154 }
155
156 if self.max_bluetooth_peers == 0
157 || self.bluetooth_peer_count(&peer_key).await >= self.max_bluetooth_peers
158 {
159 warn!(
160 "Bluetooth peer {} rejected because max_bluetooth_peers={} reached",
161 peer_id.short(),
162 self.max_bluetooth_peers
163 );
164 return false;
165 }
166
167 let mut peers = self.state.peers.write().await;
168 let duplicate_keys = peers
169 .iter()
170 .filter(|(key, entry)| {
171 key.as_str() != peer_key
172 && entry.transport == PeerTransport::Bluetooth
173 && entry.peer_id.pubkey == peer_id.pubkey
174 })
175 .map(|(key, _)| key.clone())
176 .collect::<Vec<_>>();
177 let was_connected = peers
178 .get(&peer_key)
179 .map(|entry| entry.state == ConnectionState::Connected)
180 .unwrap_or(false);
181 let replaced = peers.insert(
182 peer_key,
183 PeerEntry {
184 peer_id,
185 direction,
186 state: ConnectionState::Connected,
187 last_seen: Instant::now(),
188 peer: Some(peer),
189 pool,
190 transport: PeerTransport::Bluetooth,
191 signal_paths: BTreeSet::from([PeerSignalPath::Bluetooth]),
192 bytes_sent: 0,
193 bytes_received: 0,
194 },
195 );
196 let removed_duplicates = duplicate_keys
197 .into_iter()
198 .filter_map(|key| peers.remove(&key))
199 .collect::<Vec<_>>();
200 drop(peers);
201
202 if let Some(previous) = replaced.and_then(|entry| entry.peer) {
203 let _ = previous.close().await;
204 }
205 for duplicate in &removed_duplicates {
206 if let Some(peer) = duplicate.peer.as_ref() {
207 let _ = peer.close().await;
208 }
209 }
210
211 let removed_connected_duplicates = removed_duplicates
212 .iter()
213 .filter(|entry| entry.state == ConnectionState::Connected)
214 .count() as isize;
215 let connected_delta =
216 1isize - if was_connected { 1 } else { 0 } - removed_connected_duplicates;
217 if connected_delta > 0 {
218 self.state.connected_count.fetch_add(
219 connected_delta as usize,
220 std::sync::atomic::Ordering::Relaxed,
221 );
222 } else if connected_delta < 0 {
223 self.state.connected_count.fetch_sub(
224 (-connected_delta) as usize,
225 std::sync::atomic::Ordering::Relaxed,
226 );
227 }
228 true
229 }
230
231 pub async fn unregister_peer(&self, peer_id: &PeerId) {
232 let peer_key = peer_id.to_string();
233 let removed = self.state.peers.write().await.remove(&peer_key);
234 if let Some(entry) = removed {
235 if entry.state == ConnectionState::Connected {
236 self.state
237 .connected_count
238 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
239 }
240 if let Some(peer) = entry.peer {
241 let _ = peer.close().await;
242 }
243 }
244 }
245
246 pub async fn unregister_bluetooth_peer_if_current(
247 &self,
248 peer_id: &PeerId,
249 expected_peer: &Arc<super::BluetoothPeer>,
250 ) {
251 let peer_key = peer_id.to_string();
252 let removed = {
253 let mut peers = self.state.peers.write().await;
254 let matches_current = peers
255 .get(&peer_key)
256 .and_then(|entry| entry.peer.as_ref())
257 .and_then(|peer| match peer {
258 MeshPeer::Bluetooth(current) => Some(Arc::ptr_eq(current, expected_peer)),
259 _ => None,
260 })
261 .unwrap_or(false);
262 if matches_current {
263 peers.remove(&peer_key)
264 } else {
265 None
266 }
267 };
268 if let Some(entry) = removed {
269 if entry.state == ConnectionState::Connected {
270 self.state
271 .connected_count
272 .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
273 }
274 if let Some(peer) = entry.peer {
275 let _ = peer.close().await;
276 }
277 }
278 }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::webrtc::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
    use crate::webrtc::session::{MeshPeer, TestMeshPeer};

    /// Registrar over `state` that puts every peer into the `Other` pool,
    /// uses default pool settings and allows two Bluetooth peers.
    fn registrar_for(state: &Arc<WebRTCState>) -> BluetoothPeerRegistrar {
        BluetoothPeerRegistrar::new(
            state.clone(),
            Arc::new(|_| PeerPool::Other),
            PoolSettings::default(),
            2,
        )
    }

    /// Mock mesh peer with no canned response.
    fn mock_peer() -> MeshPeer {
        MeshPeer::mock_for_tests(TestMeshPeer::with_response(None))
    }

    /// Peer id built from a pubkey and a session label.
    fn peer_id(pubkey: &str, session: &str) -> PeerId {
        PeerId::new(pubkey.to_string(), Some(session.to_string()))
    }

    #[tokio::test]
    async fn register_connected_peer_closes_replaced_session() {
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);

        let id = peer_id("peer-pub", "session");
        let first = mock_peer();
        let second = mock_peer();
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);

        let second_accepted = registrar
            .register_connected_peer(id, PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());
    }

    #[tokio::test]
    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);

        let first_peer_id = peer_id("peer-pub", "session-a");
        let second_peer_id = peer_id("peer-pub", "session-b");
        let first = mock_peer();
        let second = mock_peer();
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);

        let second_accepted = registrar
            .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());
        let peers = state.peers.read().await;
        assert!(!peers.contains_key(&first_peer_id.to_string()));
        assert!(peers.contains_key(&second_peer_id.to_string()));
        let connected = state
            .connected_count
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(connected, 1);
    }

    #[tokio::test]
    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let state = Arc::new(WebRTCState::new());
        let registrar = registrar_for(&state);
        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
        let local_peer_id = peer_id("local-pub", "local-session");
        let remote_peer_id = peer_id("remote-pub", "remote-session");
        let remote_key = remote_peer_id.to_string();
        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();

        // The remote side announces itself before the local side starts the
        // handshake.
        send_hello(&remote_link, &remote_peer_id)
            .await
            .expect("send hello");

        let pending = PendingBluetoothLink {
            link: link_a.clone(),
            direction: PeerDirection::Outbound,
            local_hello_sent: false,
            peer_hint: Some("mock-ble".to_string()),
        };
        let context = BluetoothRuntimeContext {
            my_peer_id: local_peer_id,
            store: None,
            nostr_relay: None,
            mesh_frame_tx,
            registrar: registrar.clone(),
        };
        let accepted = handle_pending_link(pending, context)
            .await
            .expect("handle pending link");

        assert!(accepted);
        assert!(state.peers.read().await.contains_key(&remote_key));

        link_a.close().await.expect("close local transport");

        // Poll until the disconnect watcher has removed the peer.
        tokio::time::timeout(Duration::from_secs(2), async {
            loop {
                let still_registered = state.peers.read().await.contains_key(&remote_key);
                if !still_registered {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        })
        .await
        .expect("peer should be unregistered");
    }
}
419
420#[derive(Clone)]
421pub struct BluetoothRuntimeContext {
422 pub my_peer_id: PeerId,
423 pub store: Option<Arc<dyn ContentStore>>,
424 pub nostr_relay: Option<Arc<NostrRelay>>,
425 pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
426 pub registrar: BluetoothPeerRegistrar,
427}
428
429pub struct BluetoothMesh {
431 config: BluetoothConfig,
432}
433
434impl BluetoothMesh {
435 pub fn new(config: BluetoothConfig) -> Self {
436 Self { config }
437 }
438
439 pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
440 if !self.config.is_enabled() {
441 info!("Bluetooth transport disabled");
442 return BluetoothBackendState::Disabled;
443 }
444
445 let mut started = false;
446
447 if let Some(bridge) = mobile_bluetooth_bridge() {
448 info!(
449 "Starting mobile Bluetooth bridge for peer {}",
450 context.my_peer_id.short()
451 );
452 match bridge.start(context.my_peer_id.to_string()).await {
453 Ok(mut rx) => {
454 info!("Mobile Bluetooth bridge is running");
455 let ctx = context.clone();
456 tokio::spawn(async move {
457 while let Some(link) = rx.recv().await {
458 if let Err(err) = handle_pending_link(link, ctx.clone()).await {
459 warn!("Mobile Bluetooth link failed: {}", err);
460 }
461 }
462 });
463 started = true;
464 }
465 Err(err) => {
466 warn!("Failed to start mobile Bluetooth bridge: {}", err);
467 }
468 }
469 }
470
471 #[cfg(target_os = "macos")]
472 {
473 if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
474 {
475 warn!("Failed to start macOS Bluetooth central: {}", err);
476 } else {
477 started = true;
478 }
479 }
480
481 if started {
482 BluetoothBackendState::Running
483 } else {
484 warn!(
485 "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
486 self.config.max_peers
487 );
488 BluetoothBackendState::Unsupported
489 }
490 }
491}
492
493async fn handle_pending_link(
494 link: PendingBluetoothLink,
495 context: BluetoothRuntimeContext,
496) -> Result<bool> {
497 if let Some(peer_hint) = link.peer_hint.as_deref() {
498 debug!("Handling pending Bluetooth link {}", peer_hint);
499 }
500 let remote_peer_id = receive_remote_hello(&link.link).await?;
501 if !link.local_hello_sent {
502 send_hello(&link.link, &context.my_peer_id).await?;
503 }
504
505 let bluetooth_peer = super::BluetoothPeer::new(
506 remote_peer_id.clone(),
507 link.direction,
508 link.link,
509 context.store.clone(),
510 context.nostr_relay.clone(),
511 Some(context.mesh_frame_tx.clone()),
512 Some(context.registrar.state.clone()),
513 );
514 let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
515
516 if !context
517 .registrar
518 .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
519 .await
520 {
521 warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
522 bluetooth_peer.close().await?;
523 return Ok(false);
524 } else {
525 info!("Bluetooth peer {} connected", remote_peer_id.short());
526 spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
527 }
528 Ok(true)
529}
530
531fn spawn_bluetooth_disconnect_watch(
532 peer_id: PeerId,
533 peer: Arc<super::BluetoothPeer>,
534 registrar: BluetoothPeerRegistrar,
535) {
536 tokio::spawn(async move {
537 loop {
538 if !peer.is_connected() {
539 registrar
540 .unregister_bluetooth_peer_if_current(&peer_id, &peer)
541 .await;
542 break;
543 }
544 tokio::time::sleep(Duration::from_millis(250)).await;
545 }
546 });
547}
548
549#[derive(serde::Serialize, serde::Deserialize)]
550#[serde(rename_all = "camelCase")]
551struct BluetoothHello {
552 #[serde(rename = "type")]
553 kind: String,
554 peer_id: String,
555}
556
557async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
558 let hello = BluetoothHello {
559 kind: "hello".to_string(),
560 peer_id: my_peer_id.to_string(),
561 };
562 link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
563 .await
564}
565
566async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
567 let result = tokio::time::timeout(HELLO_TIMEOUT, async {
568 loop {
569 match link.recv().await {
570 Some(BluetoothFrame::Text(text)) => {
571 if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
572 if hello.kind == "hello" {
573 return PeerId::from_string(&hello.peer_id)
574 .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
575 }
576 }
577 }
578 Some(BluetoothFrame::Binary(_)) => {}
579 None => return Err(anyhow!("bluetooth link closed before hello")),
580 }
581 }
582 })
583 .await;
584
585 match result {
586 Ok(peer_id) => peer_id,
587 Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
588 }
589}
590
/// macOS backend: acts as a BLE central (via `btleplug`) that scans for
/// peripherals advertising the htree service, connects to them and hands the
/// resulting links to `handle_pending_link`.
#[cfg(target_os = "macos")]
mod macos {
    use super::*;
    use btleplug::api::{
        Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
        ScanFilter, ValueNotification, WriteType,
    };
    use btleplug::platform::{Manager, Peripheral};
    use futures::StreamExt;
    use std::collections::HashSet;
    use tokio::sync::Mutex;
    use uuid::Uuid;

    /// Advertisements older than this are ignored when picking candidates.
    const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);

    /// Wire tag of a text frame.
    const FRAME_KIND_TEXT: u8 = 1;
    /// Wire tag of a binary frame.
    const FRAME_KIND_BINARY: u8 = 2;
    /// Size of the frame header: one tag byte plus a 4-byte big-endian length.
    const FRAME_HEADER_BYTES: usize = 5;

    /// What is remembered about the most recent advertisement of a peripheral.
    #[derive(Clone)]
    struct AdvertisementSnapshot {
        /// When the peripheral was last seen advertising the htree service.
        last_seen: Instant,
        /// Hex-encoded service data from the advertisement, when it was
        /// non-empty.
        peer_hint: Option<String>,
    }

    /// Starts the macOS BLE central.
    ///
    /// Two background tasks are spawned:
    /// * an event task that records fresh htree advertisements per peripheral
    ///   id and forgets a peripheral when it disconnects;
    /// * a scan task that repeatedly scans, prunes dead connections, and
    ///   connects to the freshest candidates until `config.max_peers`
    ///   connections are tracked.
    ///
    /// NOTE(review): when no adapter is present this logs a warning and still
    /// returns `Ok(())`, which `BluetoothMesh::start` treats as a started
    /// backend. Confirm that this is intended.
    ///
    /// # Errors
    ///
    /// Fails if the `btleplug` manager, the adapter list or the adapter event
    /// stream cannot be obtained.
    pub(super) async fn start_macos_central(
        config: BluetoothConfig,
        context: BluetoothRuntimeContext,
    ) -> Result<()> {
        /// A peripheral whose link was accepted by `handle_pending_link`.
        struct ConnectedPeripheral {
            peripheral: Peripheral,
            link: Arc<dyn BluetoothLink>,
            peer_hint: Option<String>,
        }

        let manager = Manager::new().await?;
        let adapters = manager.adapters().await?;
        let Some(adapter) = adapters.into_iter().next() else {
            warn!("No Bluetooth adapters available on macOS");
            return Ok(());
        };
        info!(
            "Starting macOS Bluetooth central for peer {} (max_peers={})",
            context.my_peer_id.short(),
            config.max_peers
        );
        let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
        let mut events = adapter.events().await?;
        let fresh_advertisements =
            Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
        let fresh_advertisements_for_events = fresh_advertisements.clone();

        // Event task: keep the advertisement table up to date.
        tokio::spawn(async move {
            while let Some(event) = events.next().await {
                match event {
                    CentralEvent::ServicesAdvertisement { id, services }
                        if services.contains(&service_uuid) =>
                    {
                        let mut advertisements = fresh_advertisements_for_events.lock().await;
                        let entry = advertisements
                            .entry(id.to_string())
                            .or_insert_with(|| AdvertisementSnapshot {
                                last_seen: Instant::now(),
                                peer_hint: None,
                            });
                        entry.last_seen = Instant::now();
                    }
                    CentralEvent::ServiceDataAdvertisement { id, service_data } => {
                        if let Some(data) = service_data.get(&service_uuid) {
                            let mut advertisements = fresh_advertisements_for_events.lock().await;
                            let entry = advertisements
                                .entry(id.to_string())
                                .or_insert_with(|| AdvertisementSnapshot {
                                    last_seen: Instant::now(),
                                    peer_hint: None,
                                });
                            entry.last_seen = Instant::now();
                            if !data.is_empty() {
                                entry.peer_hint = Some(hex::encode(data));
                            }
                        }
                    }
                    CentralEvent::DeviceDisconnected(id) => {
                        fresh_advertisements_for_events
                            .lock()
                            .await
                            .remove(&id.to_string());
                    }
                    _ => {}
                }
            }
        });

        // Scan task: discover, connect and track peripherals.
        tokio::spawn(async move {
            let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
            loop {
                if let Err(err) = adapter
                    .start_scan(ScanFilter {
                        services: vec![service_uuid],
                    })
                    .await
                {
                    warn!("Failed to start BLE scan: {}", err);
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    continue;
                }
                tokio::time::sleep(Duration::from_secs(2)).await;

                let peripherals = match adapter.peripherals().await {
                    Ok(peripherals) => peripherals,
                    Err(err) => {
                        warn!("Failed to list BLE peripherals: {}", err);
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        continue;
                    }
                };

                // Drop connections whose peripheral or link is gone. This is
                // done with plain awaits: running a blocking executor inside
                // this task would stall a runtime worker thread.
                let mut still_connected = HashMap::with_capacity(connected.len());
                for (id, connection) in std::mem::take(&mut connected) {
                    let peripheral_connected =
                        connection.peripheral.is_connected().await.unwrap_or(false);
                    let link_open = connection.link.is_open();
                    if peripheral_connected && link_open {
                        still_connected.insert(id, connection);
                    } else if peripheral_connected {
                        // The link died but the peripheral is still attached.
                        let _ = connection.peripheral.disconnect().await;
                    }
                }
                connected = still_connected;

                // Expire old advertisements and work on a private copy so the
                // lock is not held while connecting.
                let advertisement_snapshot = {
                    let now = Instant::now();
                    let mut seen = fresh_advertisements.lock().await;
                    seen.retain(|_, advertisement| {
                        now.saturating_duration_since(advertisement.last_seen)
                            <= FRESH_ADVERTISEMENT_WINDOW
                    });
                    seen.clone()
                };

                let mut candidates = Vec::new();
                for peripheral in peripherals {
                    let peripheral_id = peripheral.id().to_string();
                    if connected.contains_key(&peripheral_id) {
                        continue;
                    }
                    let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
                    else {
                        debug!(
                            "Skipping cached BLE peripheral {} without a fresh advertisement",
                            peripheral_id
                        );
                        continue;
                    };
                    let properties = match peripheral.properties().await {
                        Ok(Some(properties)) => properties,
                        _ => continue,
                    };
                    if !properties.services.contains(&service_uuid) {
                        continue;
                    }
                    candidates.push((peripheral, peripheral_id, advertisement));
                }

                // Freshest advertisement first.
                candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
                let mut represented_peer_hints = connected
                    .values()
                    .filter_map(|connection| connection.peer_hint.clone())
                    .collect::<HashSet<_>>();

                for (peripheral, peripheral_id, advertisement) in candidates {
                    if connected.len() >= config.max_peers {
                        break;
                    }
                    if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
                        // `insert` returns false when the hint is already
                        // taken by a live link or a fresher candidate.
                        if !represented_peer_hints.insert(peer_hint.clone()) {
                            debug!(
                                "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
                                peripheral_id,
                                peer_hint
                            );
                            continue;
                        }
                    }
                    info!(
                        "Discovered BLE peripheral {} advertising htree service{}",
                        peripheral_id,
                        advertisement
                            .peer_hint
                            .as_ref()
                            .map(|hint| format!(" (peer hint {})", hint))
                            .unwrap_or_default()
                    );
                    match connect_peripheral(peripheral.clone()).await {
                        Ok(Some(link)) => {
                            let tracked_link = link.clone();
                            let pending = PendingBluetoothLink {
                                link,
                                direction: PeerDirection::Outbound,
                                local_hello_sent: false,
                                peer_hint: advertisement
                                    .peer_hint
                                    .clone()
                                    .or(Some(peripheral_id.clone())),
                            };
                            match handle_pending_link(pending, context.clone()).await {
                                Ok(true) => {
                                    connected.insert(
                                        peripheral_id.clone(),
                                        ConnectedPeripheral {
                                            peripheral,
                                            link: tracked_link,
                                            peer_hint: advertisement.peer_hint.clone(),
                                        },
                                    );
                                }
                                Ok(false) => {}
                                Err(err) => {
                                    warn!("Failed to attach BLE peripheral: {}", err);
                                }
                            }
                        }
                        Ok(None) => {}
                        Err(err) => {
                            warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
                        }
                    }
                }

                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        });

        Ok(())
    }

    /// Connects to `peripheral`, locates the htree RX/TX characteristics,
    /// subscribes to TX notifications and wraps everything in a link.
    ///
    /// Returns `Ok(None)` (after disconnecting) when the peripheral turns out
    /// to be unusable: service discovery keeps failing after a connect
    /// timeout, a characteristic is missing, or TX lacks NOTIFY.
    ///
    /// A connect that exceeds 25 seconds is treated as a soft timeout: service
    /// discovery is still attempted before giving up. Discovery is tried up to
    /// eight times with 500 ms pauses.
    ///
    /// NOTE(review): the error returns from discovery (without a prior connect
    /// timeout), `notifications()` and `subscribe()` do not disconnect the
    /// peripheral. Confirm whether it should be disconnected there too.
    ///
    /// # Errors
    ///
    /// Fails on UUID parse errors, connection errors, a final discovery error
    /// without a prior connect timeout, and notification/subscribe errors.
    async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
        let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
        let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
        let peripheral_id = peripheral.id().to_string();

        let mut connect_timed_out = false;
        if !peripheral.is_connected().await? {
            info!("Connecting to BLE peripheral {}", peripheral_id);
            match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
                Ok(Ok(())) => {}
                Ok(Err(err)) => return Err(err.into()),
                Err(_) => {
                    warn!(
                        "BLE connect timed out for {}; probing services before giving up",
                        peripheral_id
                    );
                    connect_timed_out = true;
                }
            }
        }
        let mut rx_char = None;
        let mut tx_char = None;
        for attempt in 1..=8 {
            info!(
                "Discovering BLE services for {} (attempt {}/8)",
                peripheral_id, attempt
            );
            if let Err(err) = peripheral.discover_services().await {
                if attempt == 8 {
                    if connect_timed_out {
                        warn!(
                            "BLE service discovery failed for {} after soft connect timeout: {}",
                            peripheral_id, err
                        );
                        let _ = peripheral.disconnect().await;
                        return Ok(None);
                    }
                    return Err(err.into());
                }
                warn!(
                    "BLE service discovery attempt {}/8 failed for {}: {}",
                    attempt, peripheral_id, err
                );
                tokio::time::sleep(Duration::from_millis(500)).await;
                continue;
            }

            let characteristics = peripheral.characteristics();
            if !characteristics.is_empty() {
                let uuids = characteristics
                    .iter()
                    .map(|c| c.uuid.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");
                info!(
                    "BLE peripheral {} characteristics: {}",
                    peripheral_id, uuids
                );
            }
            rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
            tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
            if rx_char.is_some() && tx_char.is_some() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
        let Some(rx_char) = rx_char else {
            warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
            let _ = peripheral.disconnect().await;
            return Ok(None);
        };
        let Some(tx_char) = tx_char else {
            warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
            let _ = peripheral.disconnect().await;
            return Ok(None);
        };
        if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
            warn!(
                "BLE peripheral {} TX characteristic is missing NOTIFY",
                peripheral_id
            );
            let _ = peripheral.disconnect().await;
            return Ok(None);
        }
        // The notification stream is obtained before subscribing.
        let notifications = peripheral.notifications().await?;
        info!("Subscribing to BLE notifications for {}", peripheral_id);
        peripheral.subscribe(&tx_char).await?;

        // Best effort: read whatever the TX characteristic currently holds and
        // decode it as the first frame(s). Failures only produce a warning.
        let initial_frames = match peripheral.read(&tx_char).await {
            Ok(bytes) if !bytes.is_empty() => {
                info!(
                    "Read initial BLE hello bytes from {} ({} bytes)",
                    peripheral_id,
                    bytes.len()
                );
                let mut decoder = FrameDecoder::new();
                match decoder.push(&bytes) {
                    Ok(frames) => frames,
                    Err(err) => {
                        warn!(
                            "Discarding malformed initial BLE frame from {}: {}",
                            peripheral_id, err
                        );
                        Vec::new()
                    }
                }
            }
            Ok(_) => Vec::new(),
            Err(err) => {
                warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
                Vec::new()
            }
        };

        info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
        Ok(Some(MacosCentralLink::new(
            peripheral,
            rx_char,
            notifications,
            initial_frames,
        )))
    }

    /// Incremental decoder for the chunked frame stream.
    ///
    /// Wire format of one frame: a tag byte (`1` = text, `2` = binary), a
    /// 4-byte big-endian payload length, then the payload.
    struct FrameDecoder {
        /// Bytes received so far that do not yet form a complete frame.
        buffer: Vec<u8>,
    }

    impl FrameDecoder {
        /// Creates a decoder with an empty buffer.
        fn new() -> Self {
            Self { buffer: Vec::new() }
        }

        /// Appends `chunk` to the buffer and returns every frame that is now
        /// complete; incomplete trailing bytes stay buffered.
        ///
        /// NOTE(review): the announced length is not bounded, so the buffer
        /// grows until the announced payload has arrived.
        ///
        /// # Errors
        ///
        /// Fails on an unknown tag byte or a text payload that is not valid
        /// UTF-8. The offending frame has already been removed from the buffer
        /// and frames decoded earlier in the same call are dropped.
        fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
            self.buffer.extend_from_slice(chunk);
            let mut frames = Vec::new();
            loop {
                if self.buffer.len() < FRAME_HEADER_BYTES {
                    break;
                }
                let kind = self.buffer[0];
                let len = u32::from_be_bytes([
                    self.buffer[1],
                    self.buffer[2],
                    self.buffer[3],
                    self.buffer[4],
                ]) as usize;
                if self.buffer.len() < FRAME_HEADER_BYTES + len {
                    break;
                }
                let payload = self.buffer[FRAME_HEADER_BYTES..FRAME_HEADER_BYTES + len].to_vec();
                self.buffer.drain(..FRAME_HEADER_BYTES + len);
                let frame = match kind {
                    FRAME_KIND_TEXT => BluetoothFrame::Text(String::from_utf8(payload)?),
                    FRAME_KIND_BINARY => BluetoothFrame::Binary(payload),
                    _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
                };
                frames.push(frame);
            }
            Ok(frames)
        }
    }

    /// Serializes `frame` into the wire format understood by [`FrameDecoder`]:
    /// tag byte, 4-byte big-endian length, payload.
    ///
    /// NOTE(review): the length is written via an `as u32` cast, so a payload
    /// larger than `u32::MAX` bytes would have its length truncated.
    fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
        let (kind, payload) = match frame {
            BluetoothFrame::Text(text) => (FRAME_KIND_TEXT, text.into_bytes()),
            BluetoothFrame::Binary(payload) => (FRAME_KIND_BINARY, payload),
        };
        let mut out = Vec::with_capacity(FRAME_HEADER_BYTES + payload.len());
        out.push(kind);
        out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
        out.extend_from_slice(&payload);
        out
    }

    /// [`BluetoothLink`] backed by a connected `btleplug` peripheral.
    struct MacosCentralLink {
        /// The connected peripheral.
        peripheral: Peripheral,
        /// Characteristic that outgoing chunks are written to.
        rx_char: Characteristic,
        /// Cleared when the link is closed or the notification pump stops.
        open: std::sync::atomic::AtomicBool,
        /// Decoded incoming frames, fed by the notification pump.
        rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
    }

    impl MacosCentralLink {
        /// Builds the link and spawns the notification pump.
        ///
        /// `initial_frames` are queued first (frames that do not fit into the
        /// 64-slot queue are silently dropped). The pump then decodes every
        /// notification into frames and forwards them. It stops when the
        /// notification stream ends, a chunk cannot be decoded, or the
        /// receiving side is gone; afterwards the link is marked as closed if
        /// it still exists.
        fn new(
            peripheral: Peripheral,
            rx_char: Characteristic,
            mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
            initial_frames: Vec<BluetoothFrame>,
        ) -> Arc<Self> {
            let (tx, rx) = mpsc::channel(64);
            let link = Arc::new(Self {
                peripheral,
                rx_char,
                open: std::sync::atomic::AtomicBool::new(true),
                rx: Mutex::new(rx),
            });

            for frame in initial_frames {
                let _ = tx.try_send(frame);
            }

            // The pump only holds a weak reference so it does not keep the
            // link alive on its own.
            let weak = Arc::downgrade(&link);
            tokio::spawn(async move {
                let mut decoder = FrameDecoder::new();
                'pump: while let Some(notification) = notifications.next().await {
                    let Ok(frames) = decoder.push(&notification.value) else {
                        break 'pump;
                    };
                    for frame in frames {
                        if tx.send(frame).await.is_err() {
                            // The receiver is gone, so stop the whole pump
                            // rather than only this batch.
                            break 'pump;
                        }
                    }
                }
                if let Some(link) = weak.upgrade() {
                    link.open.store(false, std::sync::atomic::Ordering::Relaxed);
                }
            });

            link
        }
    }

    #[async_trait]
    impl BluetoothLink for MacosCentralLink {
        /// Encodes `frame` and writes it to the RX characteristic in chunks of
        /// at most `HTREE_BLE_CHUNK_BYTES` bytes, each written with response.
        async fn send(&self, frame: BluetoothFrame) -> Result<()> {
            let bytes = encode_frame(frame);
            for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
                self.peripheral
                    .write(&self.rx_char, chunk, WriteType::WithResponse)
                    .await?;
            }
            Ok(())
        }

        /// Returns the next decoded frame, or `None` once the notification
        /// pump has stopped and the queue is drained.
        async fn recv(&self) -> Option<BluetoothFrame> {
            self.rx.lock().await.recv().await
        }

        /// Reports whether the link is still considered open.
        fn is_open(&self) -> bool {
            self.open.load(std::sync::atomic::Ordering::Relaxed)
        }

        /// Marks the link as closed and disconnects the peripheral; a failing
        /// disconnect is ignored.
        async fn close(&self) -> Result<()> {
            self.open.store(false, std::sync::atomic::Ordering::Relaxed);
            let _ = self.peripheral.disconnect().await;
            Ok(())
        }
    }
}