1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3#[cfg(target_os = "macos")]
4use std::collections::HashMap;
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7#[cfg(target_os = "macos")]
8use std::time::Instant;
9use tokio::sync::mpsc;
10use tracing::{debug, info, warn};
11
12use super::session::MeshPeer;
13#[cfg(test)]
14use super::types::PeerPool;
15use super::types::{MeshNostrFrame, PeerId, PoolSettings};
16use crate::bluetooth_peer::{BluetoothFrame, BluetoothLink, BluetoothPeer};
17use crate::manager::{PeerClassifier, WebRTCState};
18use crate::peer::ContentStore;
19use crate::relay_bridge::SharedMeshRelayClient;
20use crate::runtime_peer::PeerDirection;
21use crate::runtime_peer::{PeerSignalPath, PeerTransport, TransportPeerRegistrar};
22
23#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
24pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";
25#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
26pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";
27#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
28pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";
29#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
30pub const HTREE_BLE_CHUNK_BYTES: usize = 64;
32const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
33
34#[derive(Debug, Clone)]
36pub struct BluetoothConfig {
37 pub enabled: bool,
38 pub max_peers: usize,
39}
40
41impl BluetoothConfig {
42 pub fn is_enabled(&self) -> bool {
43 self.enabled && self.max_peers > 0
44 }
45}
46
47impl Default for BluetoothConfig {
48 fn default() -> Self {
49 Self {
50 enabled: false,
51 max_peers: 0,
52 }
53 }
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum BluetoothBackendState {
58 Disabled,
59 Running,
60 Unsupported,
61}
62
63#[derive(Clone)]
64pub struct PendingBluetoothLink {
65 pub link: Arc<dyn BluetoothLink>,
66 pub direction: PeerDirection,
67 pub local_hello_sent: bool,
68 pub peer_hint: Option<String>,
69}
70
71#[async_trait]
72pub trait MobileBluetoothBridge: Send + Sync {
73 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
74}
75
76static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
77
78pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
79 MOBILE_BLUETOOTH_BRIDGE
80 .set(bridge)
81 .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
82}
83
84fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
85 MOBILE_BLUETOOTH_BRIDGE.get().cloned()
86}
87
88#[derive(Clone)]
89pub struct BluetoothPeerRegistrar {
90 pub state: Arc<WebRTCState>,
91 inner: TransportPeerRegistrar<MeshPeer>,
92}
93
94impl BluetoothPeerRegistrar {
95 pub fn new(
96 state: Arc<WebRTCState>,
97 peer_classifier: PeerClassifier,
98 pools: PoolSettings,
99 max_bluetooth_peers: usize,
100 ) -> Self {
101 Self {
102 inner: TransportPeerRegistrar::new(
103 state.runtime.peers.clone(),
104 state.runtime.connected_count.clone(),
105 peer_classifier,
106 pools,
107 PeerTransport::Bluetooth,
108 PeerSignalPath::Bluetooth,
109 max_bluetooth_peers,
110 ),
111 state,
112 }
113 }
114
115 pub async fn register_connected_peer(
116 &self,
117 peer_id: PeerId,
118 direction: PeerDirection,
119 peer: MeshPeer,
120 ) -> bool {
121 let accepted = self
122 .inner
123 .register_connected_peer(peer_id.clone(), direction, peer)
124 .await;
125 if !accepted {
126 warn!(
127 "Bluetooth peer {} rejected by shared registrar",
128 peer_id.short()
129 );
130 }
131 accepted
132 }
133
134 pub async fn unregister_peer(&self, peer_id: &PeerId) {
135 self.inner.unregister_peer(peer_id).await;
136 }
137
138 pub async fn unregister_bluetooth_peer_if_current(
139 &self,
140 peer_id: &PeerId,
141 expected_peer: &Arc<BluetoothPeer>,
142 ) {
143 self.inner
144 .unregister_peer_if(peer_id, |peer| match peer {
145 MeshPeer::Bluetooth(current) => Arc::ptr_eq(current, expected_peer),
146 _ => false,
147 })
148 .await;
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155 use crate::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
156 use crate::session::{MeshPeer, TestMeshPeer};
157
158 #[tokio::test]
159 async fn register_connected_peer_closes_replaced_session() {
160 let state = Arc::new(WebRTCState::new());
161 let registrar = BluetoothPeerRegistrar::new(
162 state.clone(),
163 Arc::new(|_| PeerPool::Other),
164 PoolSettings::default(),
165 2,
166 );
167
168 let peer_id = PeerId::new("peer-pub".to_string());
169 let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
170 let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
171 let first_ref = first.mock_ref().expect("mock peer").clone();
172
173 assert!(
174 registrar
175 .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
176 .await
177 );
178 assert!(
179 registrar
180 .register_connected_peer(peer_id, PeerDirection::Outbound, second)
181 .await
182 );
183
184 assert!(first_ref.is_closed());
185 }
186
187 #[tokio::test]
188 async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
189 let state = Arc::new(WebRTCState::new());
190 let registrar = BluetoothPeerRegistrar::new(
191 state.clone(),
192 Arc::new(|_| PeerPool::Other),
193 PoolSettings::default(),
194 2,
195 );
196
197 let first_peer_id = PeerId::new("peer-pub".to_string());
198 let second_peer_id = PeerId::new("peer-pub".to_string());
199 let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
200 let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
201 let first_ref = first.mock_ref().expect("mock peer").clone();
202
203 assert!(
204 registrar
205 .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
206 .await
207 );
208 assert!(
209 registrar
210 .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
211 .await
212 );
213
214 assert!(first_ref.is_closed());
215 let peers = state.runtime.peers.read().await;
216 assert!(peers.contains_key(&second_peer_id.to_string()));
217 assert_eq!(peers.len(), 1);
218 assert_eq!(
219 state
220 .runtime
221 .connected_count
222 .load(std::sync::atomic::Ordering::Relaxed),
223 1
224 );
225 }
226
227 #[tokio::test]
228 async fn handle_pending_link_unregisters_peer_after_transport_closes() {
229 let (link_a, link_b) = MockBluetoothLink::pair();
230 let state = Arc::new(WebRTCState::new());
231 let registrar = BluetoothPeerRegistrar::new(
232 state.clone(),
233 Arc::new(|_| PeerPool::Other),
234 PoolSettings::default(),
235 2,
236 );
237 let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
238 let local_peer_id = PeerId::new("local-pub".to_string());
239 let remote_peer_id = PeerId::new("remote-pub".to_string());
240 let remote_link: Arc<dyn BluetoothLink> = link_b.clone();
241
242 send_hello(&remote_link, &remote_peer_id)
243 .await
244 .expect("send hello");
245
246 let accepted = handle_pending_link(
247 PendingBluetoothLink {
248 link: link_a.clone(),
249 direction: PeerDirection::Outbound,
250 local_hello_sent: false,
251 peer_hint: Some("mock-ble".to_string()),
252 },
253 BluetoothRuntimeContext {
254 my_peer_id: local_peer_id,
255 store: None,
256 nostr_relay: None,
257 mesh_frame_tx,
258 registrar: registrar.clone(),
259 },
260 )
261 .await
262 .expect("handle pending link");
263
264 assert!(accepted);
265 assert!(state
266 .runtime
267 .peers
268 .read()
269 .await
270 .contains_key(&remote_peer_id.to_string()));
271
272 link_a.close().await.expect("close local transport");
273
274 tokio::time::timeout(Duration::from_secs(2), async {
275 loop {
276 if !state
277 .runtime
278 .peers
279 .read()
280 .await
281 .contains_key(&remote_peer_id.to_string())
282 {
283 break;
284 }
285 tokio::time::sleep(Duration::from_millis(25)).await;
286 }
287 })
288 .await
289 .expect("peer should be unregistered");
290 }
291}
292
293#[derive(Clone)]
294pub struct BluetoothRuntimeContext {
295 pub my_peer_id: PeerId,
296 pub store: Option<Arc<dyn ContentStore>>,
297 pub nostr_relay: Option<SharedMeshRelayClient>,
298 pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
299 pub registrar: BluetoothPeerRegistrar,
300}
301
302pub struct BluetoothMesh {
304 config: BluetoothConfig,
305}
306
307impl BluetoothMesh {
308 pub fn new(config: BluetoothConfig) -> Self {
309 Self { config }
310 }
311
312 pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
313 if !self.config.is_enabled() {
314 info!("Bluetooth transport disabled");
315 return BluetoothBackendState::Disabled;
316 }
317
318 let mut started = false;
319
320 if let Some(bridge) = mobile_bluetooth_bridge() {
321 info!(
322 "Starting mobile Bluetooth bridge for peer {}",
323 context.my_peer_id.short()
324 );
325 match bridge.start(context.my_peer_id.to_string()).await {
326 Ok(mut rx) => {
327 info!("Mobile Bluetooth bridge is running");
328 let ctx = context.clone();
329 tokio::spawn(async move {
330 while let Some(link) = rx.recv().await {
331 if let Err(err) = handle_pending_link(link, ctx.clone()).await {
332 warn!("Mobile Bluetooth link failed: {}", err);
333 }
334 }
335 });
336 started = true;
337 }
338 Err(err) => {
339 warn!("Failed to start mobile Bluetooth bridge: {}", err);
340 }
341 }
342 }
343
344 #[cfg(target_os = "macos")]
345 {
346 if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
347 {
348 warn!("Failed to start macOS Bluetooth central: {}", err);
349 } else {
350 started = true;
351 }
352 }
353
354 if started {
355 BluetoothBackendState::Running
356 } else {
357 warn!(
358 "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
359 self.config.max_peers
360 );
361 BluetoothBackendState::Unsupported
362 }
363 }
364}
365
366async fn handle_pending_link(
367 link: PendingBluetoothLink,
368 context: BluetoothRuntimeContext,
369) -> Result<bool> {
370 if let Some(peer_hint) = link.peer_hint.as_deref() {
371 debug!("Handling pending Bluetooth link {}", peer_hint);
372 }
373 info!(
374 "Handling pending Bluetooth link direction={} local_hello_sent={}",
375 link.direction, link.local_hello_sent
376 );
377 let remote_peer_id = receive_remote_hello(&link.link).await?;
378 info!(
379 "Received Bluetooth hello from {} while attaching pending link",
380 remote_peer_id.short()
381 );
382 if !link.local_hello_sent {
383 send_hello(&link.link, &context.my_peer_id).await?;
384 info!(
385 "Sent Bluetooth hello to {} while attaching pending link",
386 remote_peer_id.short()
387 );
388 }
389
390 let bluetooth_peer = BluetoothPeer::new(
391 remote_peer_id.clone(),
392 link.direction,
393 link.link,
394 context.store.clone(),
395 context.nostr_relay.clone(),
396 Some(context.mesh_frame_tx.clone()),
397 Some(context.registrar.state.clone()),
398 );
399 let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
400
401 if !context
402 .registrar
403 .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
404 .await
405 {
406 warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
407 bluetooth_peer.close().await?;
408 return Ok(false);
409 } else {
410 info!("Bluetooth peer {} connected", remote_peer_id.short());
411 spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
412 }
413 Ok(true)
414}
415
416fn spawn_bluetooth_disconnect_watch(
417 peer_id: PeerId,
418 peer: Arc<BluetoothPeer>,
419 registrar: BluetoothPeerRegistrar,
420) {
421 tokio::spawn(async move {
422 loop {
423 if !peer.is_connected() {
424 registrar
425 .unregister_bluetooth_peer_if_current(&peer_id, &peer)
426 .await;
427 break;
428 }
429 tokio::time::sleep(Duration::from_millis(250)).await;
430 }
431 });
432}
433
434#[derive(serde::Serialize, serde::Deserialize)]
435#[serde(rename_all = "camelCase")]
436struct BluetoothHello {
437 #[serde(rename = "type")]
438 kind: String,
439 peer_id: String,
440}
441
442async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
443 let hello = BluetoothHello {
444 kind: "hello".to_string(),
445 peer_id: my_peer_id.to_string(),
446 };
447 link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
448 .await
449}
450
451async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
452 let result = tokio::time::timeout(HELLO_TIMEOUT, async {
453 loop {
454 match link.recv().await {
455 Some(BluetoothFrame::Text(text)) => {
456 if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
457 if hello.kind == "hello" {
458 return PeerId::from_string(&hello.peer_id)
459 .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
460 }
461 }
462 }
463 Some(BluetoothFrame::Binary(_)) => {}
464 None => return Err(anyhow!("bluetooth link closed before hello")),
465 }
466 }
467 })
468 .await;
469
470 match result {
471 Ok(peer_id) => peer_id,
472 Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
473 }
474}
475
476#[cfg(target_os = "macos")]
477mod macos {
478 use super::*;
479 use btleplug::api::{
480 Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
481 ScanFilter, ValueNotification, WriteType,
482 };
483 use btleplug::platform::{Manager, Peripheral};
484 use futures::StreamExt;
485 use std::collections::HashSet;
486 use tokio::sync::Mutex;
487 use uuid::Uuid;
488
489 const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);
490
491 #[derive(Clone)]
492 struct AdvertisementSnapshot {
493 last_seen: Instant,
494 peer_hint: Option<String>,
495 }
496
497 pub(super) async fn start_macos_central(
498 config: BluetoothConfig,
499 context: BluetoothRuntimeContext,
500 ) -> Result<()> {
501 struct ConnectedPeripheral {
502 peripheral: Peripheral,
503 link: Arc<dyn BluetoothLink>,
504 peer_hint: Option<String>,
505 }
506
507 let manager = Manager::new().await?;
508 let adapters = manager.adapters().await?;
509 let Some(adapter) = adapters.into_iter().next() else {
510 warn!("No Bluetooth adapters available on macOS");
511 return Ok(());
512 };
513 info!(
514 "Starting macOS Bluetooth central for peer {} (max_peers={})",
515 context.my_peer_id.short(),
516 config.max_peers
517 );
518 let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
519 let mut events = adapter.events().await?;
520 let fresh_advertisements =
521 Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
522 let fresh_advertisements_for_events = fresh_advertisements.clone();
523 tokio::spawn(async move {
524 while let Some(event) = events.next().await {
525 match event {
526 CentralEvent::ServicesAdvertisement { id, services }
527 if services.contains(&service_uuid) =>
528 {
529 let mut advertisements = fresh_advertisements_for_events.lock().await;
530 let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
531 AdvertisementSnapshot {
532 last_seen: Instant::now(),
533 peer_hint: None,
534 }
535 });
536 entry.last_seen = Instant::now();
537 }
538 CentralEvent::ServiceDataAdvertisement { id, service_data } => {
539 if let Some(data) = service_data.get(&service_uuid) {
540 let mut advertisements = fresh_advertisements_for_events.lock().await;
541 let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
542 AdvertisementSnapshot {
543 last_seen: Instant::now(),
544 peer_hint: None,
545 }
546 });
547 entry.last_seen = Instant::now();
548 if !data.is_empty() {
549 entry.peer_hint = Some(hex::encode(data));
550 }
551 }
552 }
553 CentralEvent::DeviceDisconnected(id) => {
554 fresh_advertisements_for_events
555 .lock()
556 .await
557 .remove(&id.to_string());
558 }
559 _ => {}
560 }
561 }
562 });
563
564 tokio::spawn(async move {
565 let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
566 loop {
567 if let Err(err) = adapter
568 .start_scan(ScanFilter {
569 services: vec![service_uuid],
570 })
571 .await
572 {
573 warn!("Failed to start BLE scan: {}", err);
574 tokio::time::sleep(Duration::from_secs(5)).await;
575 continue;
576 }
577 tokio::time::sleep(Duration::from_secs(2)).await;
578
579 let peripherals = match adapter.peripherals().await {
580 Ok(peripherals) => peripherals,
581 Err(err) => {
582 warn!("Failed to list BLE peripherals: {}", err);
583 tokio::time::sleep(Duration::from_secs(5)).await;
584 continue;
585 }
586 };
587
588 connected.retain(|_, connection| {
589 futures::executor::block_on(async {
590 let peripheral_connected =
591 connection.peripheral.is_connected().await.unwrap_or(false);
592 let link_open = connection.link.is_open();
593 if !peripheral_connected || !link_open {
594 if peripheral_connected {
595 let _ = connection.peripheral.disconnect().await;
596 }
597 false
598 } else {
599 true
600 }
601 })
602 });
603
604 let advertisement_snapshot = {
605 let now = Instant::now();
606 let mut seen = fresh_advertisements.lock().await;
607 seen.retain(|_, advertisement| {
608 now.saturating_duration_since(advertisement.last_seen)
609 <= FRESH_ADVERTISEMENT_WINDOW
610 });
611 seen.clone()
612 };
613
614 let mut candidates = Vec::new();
615 for peripheral in peripherals {
616 let peripheral_id = peripheral.id().to_string();
617 if connected.contains_key(&peripheral_id) {
618 continue;
619 }
620 let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
621 else {
622 debug!(
623 "Skipping cached BLE peripheral {} without a fresh advertisement",
624 peripheral_id
625 );
626 continue;
627 };
628 let properties = match peripheral.properties().await {
629 Ok(Some(properties)) => properties,
630 _ => continue,
631 };
632 if !properties.services.contains(&service_uuid) {
633 continue;
634 }
635 candidates.push((peripheral, peripheral_id, advertisement));
636 }
637
638 candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
639 let mut represented_peer_hints = connected
640 .values()
641 .filter_map(|connection| connection.peer_hint.clone())
642 .collect::<HashSet<_>>();
643
644 for (peripheral, peripheral_id, advertisement) in candidates {
645 if connected.len() >= config.max_peers {
646 break;
647 }
648 if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
649 if !represented_peer_hints.insert(peer_hint.clone()) {
650 debug!(
651 "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
652 peripheral_id,
653 peer_hint
654 );
655 continue;
656 }
657 }
658 info!(
659 "Discovered BLE peripheral {} advertising htree service{}",
660 peripheral_id,
661 advertisement
662 .peer_hint
663 .as_ref()
664 .map(|hint| format!(" (peer hint {})", hint))
665 .unwrap_or_default()
666 );
667 match connect_peripheral(peripheral.clone()).await {
668 Ok(Some(link)) => {
669 let tracked_link = link.clone();
670 let pending = PendingBluetoothLink {
671 link,
672 direction: PeerDirection::Outbound,
673 local_hello_sent: false,
674 peer_hint: advertisement
675 .peer_hint
676 .clone()
677 .or(Some(peripheral_id.clone())),
678 };
679 match handle_pending_link(pending, context.clone()).await {
680 Ok(true) => {
681 connected.insert(
682 peripheral_id.clone(),
683 ConnectedPeripheral {
684 peripheral,
685 link: tracked_link,
686 peer_hint: advertisement.peer_hint.clone(),
687 },
688 );
689 }
690 Ok(false) => {}
691 Err(err) => {
692 warn!("Failed to attach BLE peripheral: {}", err);
693 }
694 }
695 }
696 Ok(None) => {}
697 Err(err) => {
698 warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
699 }
700 }
701 }
702
703 tokio::time::sleep(Duration::from_secs(5)).await;
704 }
705 });
706
707 Ok(())
708 }
709
710 async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
711 let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
712 let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
713 let peripheral_id = peripheral.id().to_string();
714
715 let mut connect_timed_out = false;
716 if !peripheral.is_connected().await? {
717 info!("Connecting to BLE peripheral {}", peripheral_id);
718 match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
719 Ok(Ok(())) => {}
720 Ok(Err(err)) => return Err(err.into()),
721 Err(_) => {
722 warn!(
723 "BLE connect timed out for {}; probing services before giving up",
724 peripheral_id
725 );
726 connect_timed_out = true;
727 }
728 }
729 }
730 if connect_timed_out && !peripheral.is_connected().await.unwrap_or(false) {
731 warn!(
732 "BLE peripheral {} never reached a connected state after timeout; retrying later",
733 peripheral_id
734 );
735 let _ = peripheral.disconnect().await;
736 return Ok(None);
737 }
738 tokio::time::sleep(Duration::from_millis(300)).await;
739 let mut rx_char = None;
740 let mut tx_char = None;
741 for attempt in 1..=8 {
742 info!(
743 "Discovering BLE services for {} (attempt {}/8)",
744 peripheral_id, attempt
745 );
746 if let Err(err) = peripheral.discover_services().await {
747 if attempt == 8 {
748 if connect_timed_out {
749 warn!(
750 "BLE service discovery failed for {} after soft connect timeout: {}",
751 peripheral_id, err
752 );
753 let _ = peripheral.disconnect().await;
754 return Ok(None);
755 }
756 return Err(err.into());
757 }
758 warn!(
759 "BLE service discovery attempt {}/8 failed for {}: {}",
760 attempt, peripheral_id, err
761 );
762 tokio::time::sleep(Duration::from_millis(500)).await;
763 continue;
764 }
765
766 let characteristics = peripheral.characteristics();
767 if !characteristics.is_empty() {
768 let uuids = characteristics
769 .iter()
770 .map(|c| c.uuid.to_string())
771 .collect::<Vec<_>>()
772 .join(", ");
773 info!(
774 "BLE peripheral {} characteristics: {}",
775 peripheral_id, uuids
776 );
777 }
778 rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
779 tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
780 if rx_char.is_some() && tx_char.is_some() {
781 break;
782 }
783 tokio::time::sleep(Duration::from_millis(500)).await;
784 }
785 let Some(rx_char) = rx_char else {
786 warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
787 let _ = peripheral.disconnect().await;
788 return Ok(None);
789 };
790 let Some(tx_char) = tx_char else {
791 warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
792 let _ = peripheral.disconnect().await;
793 return Ok(None);
794 };
795 if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
796 warn!(
797 "BLE peripheral {} TX characteristic is missing NOTIFY",
798 peripheral_id
799 );
800 let _ = peripheral.disconnect().await;
801 return Ok(None);
802 }
803 let notifications = peripheral.notifications().await?;
804 info!("Subscribing to BLE notifications for {}", peripheral_id);
805 peripheral.subscribe(&tx_char).await?;
806
807 let initial_frames = match peripheral.read(&tx_char).await {
808 Ok(bytes) if !bytes.is_empty() => {
809 info!(
810 "Read initial BLE hello bytes from {} ({} bytes)",
811 peripheral_id,
812 bytes.len()
813 );
814 let mut decoder = FrameDecoder::new();
815 match decoder.push(&bytes) {
816 Ok(frames) => frames,
817 Err(err) => {
818 warn!(
819 "Discarding malformed initial BLE frame from {}: {}",
820 peripheral_id, err
821 );
822 Vec::new()
823 }
824 }
825 }
826 Ok(_) => Vec::new(),
827 Err(err) => {
828 warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
829 Vec::new()
830 }
831 };
832
833 info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
834 Ok(Some(MacosCentralLink::new(
835 peripheral,
836 rx_char,
837 notifications,
838 initial_frames,
839 )))
840 }
841
842 struct FrameDecoder {
843 buffer: Vec<u8>,
844 }
845
846 impl FrameDecoder {
847 fn new() -> Self {
848 Self { buffer: Vec::new() }
849 }
850
851 fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
852 self.buffer.extend_from_slice(chunk);
853 let mut frames = Vec::new();
854 loop {
855 if self.buffer.len() < 5 {
856 break;
857 }
858 let kind = self.buffer[0];
859 let len = u32::from_be_bytes([
860 self.buffer[1],
861 self.buffer[2],
862 self.buffer[3],
863 self.buffer[4],
864 ]) as usize;
865 if self.buffer.len() < 5 + len {
866 break;
867 }
868 let payload = self.buffer[5..5 + len].to_vec();
869 self.buffer.drain(..5 + len);
870 let frame = match kind {
871 1 => BluetoothFrame::Text(String::from_utf8(payload)?),
872 2 => BluetoothFrame::Binary(payload),
873 _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
874 };
875 frames.push(frame);
876 }
877 Ok(frames)
878 }
879 }
880
881 fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
882 match frame {
883 BluetoothFrame::Text(text) => {
884 let payload = text.into_bytes();
885 let mut out = Vec::with_capacity(5 + payload.len());
886 out.push(1);
887 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
888 out.extend_from_slice(&payload);
889 out
890 }
891 BluetoothFrame::Binary(payload) => {
892 let mut out = Vec::with_capacity(5 + payload.len());
893 out.push(2);
894 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
895 out.extend_from_slice(&payload);
896 out
897 }
898 }
899 }
900
901 struct MacosCentralLink {
902 peripheral: Peripheral,
903 rx_char: Characteristic,
904 open: std::sync::atomic::AtomicBool,
905 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
906 }
907
908 impl MacosCentralLink {
909 fn new(
910 peripheral: Peripheral,
911 rx_char: Characteristic,
912 mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
913 initial_frames: Vec<BluetoothFrame>,
914 ) -> Arc<Self> {
915 let (tx, rx) = mpsc::channel(64);
916 let link = Arc::new(Self {
917 peripheral,
918 rx_char,
919 open: std::sync::atomic::AtomicBool::new(true),
920 rx: Mutex::new(rx),
921 });
922
923 for frame in initial_frames {
924 let _ = tx.try_send(frame);
925 }
926
927 let weak = Arc::downgrade(&link);
928 tokio::spawn(async move {
929 let mut decoder = FrameDecoder::new();
930 while let Some(notification) = notifications.next().await {
931 let Ok(frames) = decoder.push(¬ification.value) else {
932 break;
933 };
934 for frame in frames {
935 if tx.send(frame).await.is_err() {
936 break;
937 }
938 }
939 }
940 if let Some(link) = weak.upgrade() {
941 link.open.store(false, std::sync::atomic::Ordering::Relaxed);
942 }
943 });
944
945 link
946 }
947 }
948
949 #[async_trait]
950 impl BluetoothLink for MacosCentralLink {
951 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
952 let bytes = encode_frame(frame);
953 for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
954 self.peripheral
955 .write(&self.rx_char, chunk, WriteType::WithResponse)
956 .await?;
957 }
958 Ok(())
959 }
960
961 async fn recv(&self) -> Option<BluetoothFrame> {
962 self.rx.lock().await.recv().await
963 }
964
965 fn is_open(&self) -> bool {
966 self.open.load(std::sync::atomic::Ordering::Relaxed)
967 }
968
969 async fn close(&self) -> Result<()> {
970 self.open.store(false, std::sync::atomic::Ordering::Relaxed);
971 let _ = self.peripheral.disconnect().await;
972 Ok(())
973 }
974 }
975}