1use anyhow::{anyhow, Result};
2use async_trait::async_trait;
3#[cfg(target_os = "macos")]
4use std::collections::HashMap;
5use std::sync::{Arc, OnceLock};
6use std::time::Duration;
7#[cfg(target_os = "macos")]
8use std::time::Instant;
9use tokio::sync::mpsc;
10use tracing::{debug, info, warn};
11
12use super::session::MeshPeer;
13#[cfg(test)]
14use super::types::PeerPool;
15use super::types::{MeshNostrFrame, PeerId, PoolSettings};
16use crate::bluetooth_peer::{BluetoothFrame, BluetoothLink, BluetoothPeer};
17use crate::manager::{PeerClassifier, WebRTCState};
18use crate::peer::ContentStore;
19use crate::relay_bridge::SharedMeshRelayClient;
20use crate::runtime_peer::PeerDirection;
21use crate::runtime_peer::{PeerSignalPath, PeerTransport, TransportPeerRegistrar};
22
/// UUID of the htree BLE GATT service that the central scans for.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_SERVICE_UUID: &str = "f18ef5f6-b7ee-4f40-b869-10a2d4f35932";

/// UUID of the characteristic the central writes outgoing frame chunks to.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_RX_CHARACTERISTIC_UUID: &str = "0bb5f5c9-6369-4511-a84f-4d4c14d8f8d4";

/// UUID of the characteristic the central subscribes to for incoming frames.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_TX_CHARACTERISTIC_UUID: &str = "4ec9c0c2-97c6-4f46-9fd1-927d699b2f6d";

/// Maximum number of bytes sent in a single characteristic write.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub const HTREE_BLE_CHUNK_BYTES: usize = 64;

/// How long a pending link may take to deliver the remote hello frame.
const HELLO_TIMEOUT: Duration = Duration::from_secs(10);
33
/// User-facing switches for the Bluetooth transport.
#[derive(Debug, Clone, Default)]
pub struct BluetoothConfig {
    /// Whether the Bluetooth transport was requested at all.
    pub enabled: bool,
    /// Upper bound on simultaneously connected Bluetooth peers.
    pub max_peers: usize,
}

impl BluetoothConfig {
    /// Returns `true` only when the transport is switched on *and* at least
    /// one peer slot is available.
    pub fn is_enabled(&self) -> bool {
        let has_capacity = self.max_peers != 0;
        self.enabled && has_capacity
    }
}
46
/// Outcome reported by [`BluetoothMesh::start`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BluetoothBackendState {
    /// The configuration does not enable the Bluetooth transport.
    Disabled,
    /// At least one Bluetooth backend was started.
    Running,
    /// The transport was requested but no backend could be started.
    Unsupported,
}
53
54#[derive(Clone)]
55pub struct PendingBluetoothLink {
56 pub link: Arc<dyn BluetoothLink>,
57 pub direction: PeerDirection,
58 pub local_hello_sent: bool,
59 pub peer_hint: Option<String>,
60}
61
62#[async_trait]
63pub trait MobileBluetoothBridge: Send + Sync {
64 async fn start(&self, local_peer_id: String) -> Result<mpsc::Receiver<PendingBluetoothLink>>;
65}
66
67static MOBILE_BLUETOOTH_BRIDGE: OnceLock<Arc<dyn MobileBluetoothBridge>> = OnceLock::new();
68
69pub fn install_mobile_bluetooth_bridge(bridge: Arc<dyn MobileBluetoothBridge>) -> Result<()> {
70 MOBILE_BLUETOOTH_BRIDGE
71 .set(bridge)
72 .map_err(|_| anyhow!("mobile bluetooth bridge already installed"))
73}
74
75fn mobile_bluetooth_bridge() -> Option<Arc<dyn MobileBluetoothBridge>> {
76 MOBILE_BLUETOOTH_BRIDGE.get().cloned()
77}
78
79#[derive(Clone)]
80pub struct BluetoothPeerRegistrar {
81 pub state: Arc<WebRTCState>,
82 inner: TransportPeerRegistrar<MeshPeer>,
83}
84
85impl BluetoothPeerRegistrar {
86 pub fn new(
87 state: Arc<WebRTCState>,
88 peer_classifier: PeerClassifier,
89 pools: PoolSettings,
90 max_bluetooth_peers: usize,
91 ) -> Self {
92 Self {
93 inner: TransportPeerRegistrar::new(
94 state.runtime.peers.clone(),
95 state.runtime.connected_count.clone(),
96 peer_classifier,
97 pools,
98 PeerTransport::Bluetooth,
99 PeerSignalPath::Bluetooth,
100 max_bluetooth_peers,
101 ),
102 state,
103 }
104 }
105
106 pub async fn register_connected_peer(
107 &self,
108 peer_id: PeerId,
109 direction: PeerDirection,
110 peer: MeshPeer,
111 ) -> bool {
112 let accepted = self
113 .inner
114 .register_connected_peer(peer_id.clone(), direction, peer)
115 .await;
116 if !accepted {
117 warn!(
118 "Bluetooth peer {} rejected by shared registrar",
119 peer_id.short()
120 );
121 }
122 accepted
123 }
124
125 pub async fn unregister_peer(&self, peer_id: &PeerId) {
126 self.inner.unregister_peer(peer_id).await;
127 }
128
129 pub async fn unregister_bluetooth_peer_if_current(
130 &self,
131 peer_id: &PeerId,
132 expected_peer: &Arc<BluetoothPeer>,
133 ) {
134 self.inner
135 .unregister_peer_if(peer_id, |peer| match peer {
136 MeshPeer::Bluetooth(current) => Arc::ptr_eq(current, expected_peer),
137 _ => false,
138 })
139 .await;
140 }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bluetooth_peer::{BluetoothLink, MockBluetoothLink};
    use crate::session::{MeshPeer, TestMeshPeer};

    /// Creates fresh runtime state plus a registrar with room for two peers
    /// that classifies every peer into `PeerPool::Other`.
    fn registrar_with_two_slots() -> (Arc<WebRTCState>, BluetoothPeerRegistrar) {
        let state = Arc::new(WebRTCState::new());
        let registrar = BluetoothPeerRegistrar::new(
            state.clone(),
            Arc::new(|_| PeerPool::Other),
            PoolSettings::default(),
            2,
        );
        (state, registrar)
    }

    /// Reports whether `peer_id` is present in the runtime peer table.
    async fn is_registered(state: &WebRTCState, peer_id: &PeerId) -> bool {
        state
            .runtime
            .peers
            .read()
            .await
            .contains_key(&peer_id.to_string())
    }

    #[tokio::test]
    async fn register_connected_peer_closes_replaced_session() {
        let (_state, registrar) = registrar_with_two_slots();

        let peer_id = PeerId::new("peer-pub".to_string());
        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);

        let second_accepted = registrar
            .register_connected_peer(peer_id, PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        // Registering a second session under the same id closes the first.
        assert!(first_ref.is_closed());
    }

    #[tokio::test]
    async fn register_connected_peer_replaces_existing_bluetooth_session_for_same_pubkey() {
        let (state, registrar) = registrar_with_two_slots();

        let first_peer_id = PeerId::new("peer-pub".to_string());
        let second_peer_id = PeerId::new("peer-pub".to_string());
        let first = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let second = MeshPeer::mock_for_tests(TestMeshPeer::with_response(None));
        let first_ref = first.mock_ref().expect("mock peer").clone();

        let first_accepted = registrar
            .register_connected_peer(first_peer_id.clone(), PeerDirection::Outbound, first)
            .await;
        assert!(first_accepted);

        let second_accepted = registrar
            .register_connected_peer(second_peer_id.clone(), PeerDirection::Outbound, second)
            .await;
        assert!(second_accepted);

        assert!(first_ref.is_closed());

        // Exactly one entry and one counted connection remain.
        let peers = state.runtime.peers.read().await;
        assert!(peers.contains_key(&second_peer_id.to_string()));
        assert_eq!(peers.len(), 1);
        let connected = state
            .runtime
            .connected_count
            .load(std::sync::atomic::Ordering::Relaxed);
        assert_eq!(connected, 1);
    }

    #[tokio::test]
    async fn handle_pending_link_unregisters_peer_after_transport_closes() {
        let (link_a, link_b) = MockBluetoothLink::pair();
        let (state, registrar) = registrar_with_two_slots();
        let (mesh_frame_tx, _mesh_frame_rx) = mpsc::channel(4);
        let local_peer_id = PeerId::new("local-pub".to_string());
        let remote_peer_id = PeerId::new("remote-pub".to_string());
        let remote_link: Arc<dyn BluetoothLink> = link_b.clone();

        // The remote side announces itself before the local side attaches.
        send_hello(&remote_link, &remote_peer_id)
            .await
            .expect("send hello");

        let pending = PendingBluetoothLink {
            link: link_a.clone(),
            direction: PeerDirection::Outbound,
            local_hello_sent: false,
            peer_hint: Some("mock-ble".to_string()),
        };
        let context = BluetoothRuntimeContext {
            my_peer_id: local_peer_id,
            store: None,
            nostr_relay: None,
            mesh_frame_tx,
            registrar: registrar.clone(),
        };

        let accepted = handle_pending_link(pending, context)
            .await
            .expect("handle pending link");

        assert!(accepted);
        assert!(is_registered(&state, &remote_peer_id).await);

        link_a.close().await.expect("close local transport");

        // The disconnect watcher polls, so wait (bounded) for the removal.
        tokio::time::timeout(Duration::from_secs(2), async {
            while is_registered(&state, &remote_peer_id).await {
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        })
        .await
        .expect("peer should be unregistered");
    }
}
283
284#[derive(Clone)]
285pub struct BluetoothRuntimeContext {
286 pub my_peer_id: PeerId,
287 pub store: Option<Arc<dyn ContentStore>>,
288 pub nostr_relay: Option<SharedMeshRelayClient>,
289 pub mesh_frame_tx: mpsc::Sender<(PeerId, MeshNostrFrame)>,
290 pub registrar: BluetoothPeerRegistrar,
291}
292
293pub struct BluetoothMesh {
295 config: BluetoothConfig,
296}
297
298impl BluetoothMesh {
299 pub fn new(config: BluetoothConfig) -> Self {
300 Self { config }
301 }
302
303 pub async fn start(&self, context: BluetoothRuntimeContext) -> BluetoothBackendState {
304 if !self.config.is_enabled() {
305 info!("Bluetooth transport disabled");
306 return BluetoothBackendState::Disabled;
307 }
308
309 let mut started = false;
310
311 if let Some(bridge) = mobile_bluetooth_bridge() {
312 info!(
313 "Starting mobile Bluetooth bridge for peer {}",
314 context.my_peer_id.short()
315 );
316 match bridge.start(context.my_peer_id.to_string()).await {
317 Ok(mut rx) => {
318 info!("Mobile Bluetooth bridge is running");
319 let ctx = context.clone();
320 tokio::spawn(async move {
321 while let Some(link) = rx.recv().await {
322 if let Err(err) = handle_pending_link(link, ctx.clone()).await {
323 warn!("Mobile Bluetooth link failed: {}", err);
324 }
325 }
326 });
327 started = true;
328 }
329 Err(err) => {
330 warn!("Failed to start mobile Bluetooth bridge: {}", err);
331 }
332 }
333 }
334
335 #[cfg(target_os = "macos")]
336 {
337 if let Err(err) = macos::start_macos_central(self.config.clone(), context.clone()).await
338 {
339 warn!("Failed to start macOS Bluetooth central: {}", err);
340 } else {
341 started = true;
342 }
343 }
344
345 if started {
346 BluetoothBackendState::Running
347 } else {
348 warn!(
349 "Bluetooth transport requested (max_peers={}) but no native backend is available for this build",
350 self.config.max_peers
351 );
352 BluetoothBackendState::Unsupported
353 }
354 }
355}
356
357async fn handle_pending_link(
358 link: PendingBluetoothLink,
359 context: BluetoothRuntimeContext,
360) -> Result<bool> {
361 if let Some(peer_hint) = link.peer_hint.as_deref() {
362 debug!("Handling pending Bluetooth link {}", peer_hint);
363 }
364 info!(
365 "Handling pending Bluetooth link direction={} local_hello_sent={}",
366 link.direction, link.local_hello_sent
367 );
368 let remote_peer_id = receive_remote_hello(&link.link).await?;
369 info!(
370 "Received Bluetooth hello from {} while attaching pending link",
371 remote_peer_id.short()
372 );
373 if !link.local_hello_sent {
374 send_hello(&link.link, &context.my_peer_id).await?;
375 info!(
376 "Sent Bluetooth hello to {} while attaching pending link",
377 remote_peer_id.short()
378 );
379 }
380
381 let bluetooth_peer = BluetoothPeer::new(
382 remote_peer_id.clone(),
383 link.direction,
384 link.link,
385 context.store.clone(),
386 context.nostr_relay.clone(),
387 Some(context.mesh_frame_tx.clone()),
388 Some(context.registrar.state.clone()),
389 );
390 let peer = MeshPeer::Bluetooth(bluetooth_peer.clone());
391
392 if !context
393 .registrar
394 .register_connected_peer(remote_peer_id.clone(), link.direction, peer)
395 .await
396 {
397 warn!("Rejecting Bluetooth peer {}", remote_peer_id.short());
398 bluetooth_peer.close().await?;
399 return Ok(false);
400 } else {
401 info!("Bluetooth peer {} connected", remote_peer_id.short());
402 spawn_bluetooth_disconnect_watch(remote_peer_id, bluetooth_peer, context.registrar.clone());
403 }
404 Ok(true)
405}
406
407fn spawn_bluetooth_disconnect_watch(
408 peer_id: PeerId,
409 peer: Arc<BluetoothPeer>,
410 registrar: BluetoothPeerRegistrar,
411) {
412 tokio::spawn(async move {
413 loop {
414 if !peer.is_connected() {
415 registrar
416 .unregister_bluetooth_peer_if_current(&peer_id, &peer)
417 .await;
418 break;
419 }
420 tokio::time::sleep(Duration::from_millis(250)).await;
421 }
422 });
423}
424
425#[derive(serde::Serialize, serde::Deserialize)]
426#[serde(rename_all = "camelCase")]
427struct BluetoothHello {
428 #[serde(rename = "type")]
429 kind: String,
430 peer_id: String,
431}
432
433async fn send_hello(link: &Arc<dyn BluetoothLink>, my_peer_id: &PeerId) -> Result<()> {
434 let hello = BluetoothHello {
435 kind: "hello".to_string(),
436 peer_id: my_peer_id.to_string(),
437 };
438 link.send(BluetoothFrame::Text(serde_json::to_string(&hello)?))
439 .await
440}
441
442async fn receive_remote_hello(link: &Arc<dyn BluetoothLink>) -> Result<PeerId> {
443 let result = tokio::time::timeout(HELLO_TIMEOUT, async {
444 loop {
445 match link.recv().await {
446 Some(BluetoothFrame::Text(text)) => {
447 if let Ok(hello) = serde_json::from_str::<BluetoothHello>(&text) {
448 if hello.kind == "hello" {
449 return PeerId::from_string(&hello.peer_id)
450 .ok_or_else(|| anyhow!("invalid peer id in Bluetooth hello"));
451 }
452 }
453 }
454 Some(BluetoothFrame::Binary(_)) => {}
455 None => return Err(anyhow!("bluetooth link closed before hello")),
456 }
457 }
458 })
459 .await;
460
461 match result {
462 Ok(peer_id) => peer_id,
463 Err(_) => Err(anyhow!("timed out waiting for Bluetooth hello")),
464 }
465}
466
467#[cfg(target_os = "macos")]
468mod macos {
469 use super::*;
470 use btleplug::api::{
471 Central, CentralEvent, CharPropFlags, Characteristic, Manager as _, Peripheral as _,
472 ScanFilter, ValueNotification, WriteType,
473 };
474 use btleplug::platform::{Manager, Peripheral};
475 use futures::StreamExt;
476 use std::collections::HashSet;
477 use tokio::sync::Mutex;
478 use uuid::Uuid;
479
480 const FRESH_ADVERTISEMENT_WINDOW: Duration = Duration::from_secs(10);
481
482 #[derive(Clone)]
483 struct AdvertisementSnapshot {
484 last_seen: Instant,
485 peer_hint: Option<String>,
486 }
487
488 pub(super) async fn start_macos_central(
489 config: BluetoothConfig,
490 context: BluetoothRuntimeContext,
491 ) -> Result<()> {
492 struct ConnectedPeripheral {
493 peripheral: Peripheral,
494 link: Arc<dyn BluetoothLink>,
495 peer_hint: Option<String>,
496 }
497
498 let manager = Manager::new().await?;
499 let adapters = manager.adapters().await?;
500 let Some(adapter) = adapters.into_iter().next() else {
501 warn!("No Bluetooth adapters available on macOS");
502 return Ok(());
503 };
504 info!(
505 "Starting macOS Bluetooth central for peer {} (max_peers={})",
506 context.my_peer_id.short(),
507 config.max_peers
508 );
509 let service_uuid = Uuid::parse_str(HTREE_BLE_SERVICE_UUID).expect("valid UUID");
510 let mut events = adapter.events().await?;
511 let fresh_advertisements =
512 Arc::new(Mutex::new(HashMap::<String, AdvertisementSnapshot>::new()));
513 let fresh_advertisements_for_events = fresh_advertisements.clone();
514 tokio::spawn(async move {
515 while let Some(event) = events.next().await {
516 match event {
517 CentralEvent::ServicesAdvertisement { id, services }
518 if services.contains(&service_uuid) =>
519 {
520 let mut advertisements = fresh_advertisements_for_events.lock().await;
521 let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
522 AdvertisementSnapshot {
523 last_seen: Instant::now(),
524 peer_hint: None,
525 }
526 });
527 entry.last_seen = Instant::now();
528 }
529 CentralEvent::ServiceDataAdvertisement { id, service_data } => {
530 if let Some(data) = service_data.get(&service_uuid) {
531 let mut advertisements = fresh_advertisements_for_events.lock().await;
532 let entry = advertisements.entry(id.to_string()).or_insert_with(|| {
533 AdvertisementSnapshot {
534 last_seen: Instant::now(),
535 peer_hint: None,
536 }
537 });
538 entry.last_seen = Instant::now();
539 if !data.is_empty() {
540 entry.peer_hint = Some(hex::encode(data));
541 }
542 }
543 }
544 CentralEvent::DeviceDisconnected(id) => {
545 fresh_advertisements_for_events
546 .lock()
547 .await
548 .remove(&id.to_string());
549 }
550 _ => {}
551 }
552 }
553 });
554
555 tokio::spawn(async move {
556 let mut connected: HashMap<String, ConnectedPeripheral> = HashMap::new();
557 loop {
558 if let Err(err) = adapter
559 .start_scan(ScanFilter {
560 services: vec![service_uuid],
561 })
562 .await
563 {
564 warn!("Failed to start BLE scan: {}", err);
565 tokio::time::sleep(Duration::from_secs(5)).await;
566 continue;
567 }
568 tokio::time::sleep(Duration::from_secs(2)).await;
569
570 let peripherals = match adapter.peripherals().await {
571 Ok(peripherals) => peripherals,
572 Err(err) => {
573 warn!("Failed to list BLE peripherals: {}", err);
574 tokio::time::sleep(Duration::from_secs(5)).await;
575 continue;
576 }
577 };
578
579 connected.retain(|_, connection| {
580 futures::executor::block_on(async {
581 let peripheral_connected =
582 connection.peripheral.is_connected().await.unwrap_or(false);
583 let link_open = connection.link.is_open();
584 if !peripheral_connected || !link_open {
585 if peripheral_connected {
586 let _ = connection.peripheral.disconnect().await;
587 }
588 false
589 } else {
590 true
591 }
592 })
593 });
594
595 let advertisement_snapshot = {
596 let now = Instant::now();
597 let mut seen = fresh_advertisements.lock().await;
598 seen.retain(|_, advertisement| {
599 now.saturating_duration_since(advertisement.last_seen)
600 <= FRESH_ADVERTISEMENT_WINDOW
601 });
602 seen.clone()
603 };
604
605 let mut candidates = Vec::new();
606 for peripheral in peripherals {
607 let peripheral_id = peripheral.id().to_string();
608 if connected.contains_key(&peripheral_id) {
609 continue;
610 }
611 let Some(advertisement) = advertisement_snapshot.get(&peripheral_id).cloned()
612 else {
613 debug!(
614 "Skipping cached BLE peripheral {} without a fresh advertisement",
615 peripheral_id
616 );
617 continue;
618 };
619 let properties = match peripheral.properties().await {
620 Ok(Some(properties)) => properties,
621 _ => continue,
622 };
623 if !properties.services.contains(&service_uuid) {
624 continue;
625 }
626 candidates.push((peripheral, peripheral_id, advertisement));
627 }
628
629 candidates.sort_by(|left, right| right.2.last_seen.cmp(&left.2.last_seen));
630 let mut represented_peer_hints = connected
631 .values()
632 .filter_map(|connection| connection.peer_hint.clone())
633 .collect::<HashSet<_>>();
634
635 for (peripheral, peripheral_id, advertisement) in candidates {
636 if connected.len() >= config.max_peers {
637 break;
638 }
639 if let Some(peer_hint) = advertisement.peer_hint.as_ref() {
640 if !represented_peer_hints.insert(peer_hint.clone()) {
641 debug!(
642 "Skipping stale BLE peripheral {} because peer hint {} is already represented by a fresher advertisement or live link",
643 peripheral_id,
644 peer_hint
645 );
646 continue;
647 }
648 }
649 info!(
650 "Discovered BLE peripheral {} advertising htree service{}",
651 peripheral_id,
652 advertisement
653 .peer_hint
654 .as_ref()
655 .map(|hint| format!(" (peer hint {})", hint))
656 .unwrap_or_default()
657 );
658 match connect_peripheral(peripheral.clone()).await {
659 Ok(Some(link)) => {
660 let tracked_link = link.clone();
661 let pending = PendingBluetoothLink {
662 link,
663 direction: PeerDirection::Outbound,
664 local_hello_sent: false,
665 peer_hint: advertisement
666 .peer_hint
667 .clone()
668 .or(Some(peripheral_id.clone())),
669 };
670 match handle_pending_link(pending, context.clone()).await {
671 Ok(true) => {
672 connected.insert(
673 peripheral_id.clone(),
674 ConnectedPeripheral {
675 peripheral,
676 link: tracked_link,
677 peer_hint: advertisement.peer_hint.clone(),
678 },
679 );
680 }
681 Ok(false) => {}
682 Err(err) => {
683 warn!("Failed to attach BLE peripheral: {}", err);
684 }
685 }
686 }
687 Ok(None) => {}
688 Err(err) => {
689 warn!("Skipping BLE peripheral {}: {}", peripheral_id, err);
690 }
691 }
692 }
693
694 tokio::time::sleep(Duration::from_secs(5)).await;
695 }
696 });
697
698 Ok(())
699 }
700
701 async fn connect_peripheral(peripheral: Peripheral) -> Result<Option<Arc<dyn BluetoothLink>>> {
702 let rx_uuid = Uuid::parse_str(HTREE_BLE_RX_CHARACTERISTIC_UUID)?;
703 let tx_uuid = Uuid::parse_str(HTREE_BLE_TX_CHARACTERISTIC_UUID)?;
704 let peripheral_id = peripheral.id().to_string();
705
706 let mut connect_timed_out = false;
707 if !peripheral.is_connected().await? {
708 info!("Connecting to BLE peripheral {}", peripheral_id);
709 match tokio::time::timeout(Duration::from_secs(25), peripheral.connect()).await {
710 Ok(Ok(())) => {}
711 Ok(Err(err)) => return Err(err.into()),
712 Err(_) => {
713 warn!(
714 "BLE connect timed out for {}; probing services before giving up",
715 peripheral_id
716 );
717 connect_timed_out = true;
718 }
719 }
720 }
721 if connect_timed_out && !peripheral.is_connected().await.unwrap_or(false) {
722 warn!(
723 "BLE peripheral {} never reached a connected state after timeout; retrying later",
724 peripheral_id
725 );
726 let _ = peripheral.disconnect().await;
727 return Ok(None);
728 }
729 tokio::time::sleep(Duration::from_millis(300)).await;
730 let mut rx_char = None;
731 let mut tx_char = None;
732 for attempt in 1..=8 {
733 info!(
734 "Discovering BLE services for {} (attempt {}/8)",
735 peripheral_id, attempt
736 );
737 if let Err(err) = peripheral.discover_services().await {
738 if attempt == 8 {
739 if connect_timed_out {
740 warn!(
741 "BLE service discovery failed for {} after soft connect timeout: {}",
742 peripheral_id, err
743 );
744 let _ = peripheral.disconnect().await;
745 return Ok(None);
746 }
747 return Err(err.into());
748 }
749 warn!(
750 "BLE service discovery attempt {}/8 failed for {}: {}",
751 attempt, peripheral_id, err
752 );
753 tokio::time::sleep(Duration::from_millis(500)).await;
754 continue;
755 }
756
757 let characteristics = peripheral.characteristics();
758 if !characteristics.is_empty() {
759 let uuids = characteristics
760 .iter()
761 .map(|c| c.uuid.to_string())
762 .collect::<Vec<_>>()
763 .join(", ");
764 info!(
765 "BLE peripheral {} characteristics: {}",
766 peripheral_id, uuids
767 );
768 }
769 rx_char = characteristics.iter().find(|c| c.uuid == rx_uuid).cloned();
770 tx_char = characteristics.iter().find(|c| c.uuid == tx_uuid).cloned();
771 if rx_char.is_some() && tx_char.is_some() {
772 break;
773 }
774 tokio::time::sleep(Duration::from_millis(500)).await;
775 }
776 let Some(rx_char) = rx_char else {
777 warn!("BLE peripheral {} missing RX characteristic", peripheral_id);
778 let _ = peripheral.disconnect().await;
779 return Ok(None);
780 };
781 let Some(tx_char) = tx_char else {
782 warn!("BLE peripheral {} missing TX characteristic", peripheral_id);
783 let _ = peripheral.disconnect().await;
784 return Ok(None);
785 };
786 if !tx_char.properties.contains(CharPropFlags::NOTIFY) {
787 warn!(
788 "BLE peripheral {} TX characteristic is missing NOTIFY",
789 peripheral_id
790 );
791 let _ = peripheral.disconnect().await;
792 return Ok(None);
793 }
794 let notifications = peripheral.notifications().await?;
795 info!("Subscribing to BLE notifications for {}", peripheral_id);
796 peripheral.subscribe(&tx_char).await?;
797
798 let initial_frames = match peripheral.read(&tx_char).await {
799 Ok(bytes) if !bytes.is_empty() => {
800 info!(
801 "Read initial BLE hello bytes from {} ({} bytes)",
802 peripheral_id,
803 bytes.len()
804 );
805 let mut decoder = FrameDecoder::new();
806 match decoder.push(&bytes) {
807 Ok(frames) => frames,
808 Err(err) => {
809 warn!(
810 "Discarding malformed initial BLE frame from {}: {}",
811 peripheral_id, err
812 );
813 Vec::new()
814 }
815 }
816 }
817 Ok(_) => Vec::new(),
818 Err(err) => {
819 warn!("Initial BLE read failed for {}: {}", peripheral_id, err);
820 Vec::new()
821 }
822 };
823
824 info!("BLE peripheral {} ready for mesh traffic", peripheral_id);
825 Ok(Some(MacosCentralLink::new(
826 peripheral,
827 rx_char,
828 notifications,
829 initial_frames,
830 )))
831 }
832
833 struct FrameDecoder {
834 buffer: Vec<u8>,
835 }
836
837 impl FrameDecoder {
838 fn new() -> Self {
839 Self { buffer: Vec::new() }
840 }
841
842 fn push(&mut self, chunk: &[u8]) -> Result<Vec<BluetoothFrame>> {
843 self.buffer.extend_from_slice(chunk);
844 let mut frames = Vec::new();
845 loop {
846 if self.buffer.len() < 5 {
847 break;
848 }
849 let kind = self.buffer[0];
850 let len = u32::from_be_bytes([
851 self.buffer[1],
852 self.buffer[2],
853 self.buffer[3],
854 self.buffer[4],
855 ]) as usize;
856 if self.buffer.len() < 5 + len {
857 break;
858 }
859 let payload = self.buffer[5..5 + len].to_vec();
860 self.buffer.drain(..5 + len);
861 let frame = match kind {
862 1 => BluetoothFrame::Text(String::from_utf8(payload)?),
863 2 => BluetoothFrame::Binary(payload),
864 _ => return Err(anyhow!("unknown bluetooth frame kind {}", kind)),
865 };
866 frames.push(frame);
867 }
868 Ok(frames)
869 }
870 }
871
872 fn encode_frame(frame: BluetoothFrame) -> Vec<u8> {
873 match frame {
874 BluetoothFrame::Text(text) => {
875 let payload = text.into_bytes();
876 let mut out = Vec::with_capacity(5 + payload.len());
877 out.push(1);
878 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
879 out.extend_from_slice(&payload);
880 out
881 }
882 BluetoothFrame::Binary(payload) => {
883 let mut out = Vec::with_capacity(5 + payload.len());
884 out.push(2);
885 out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
886 out.extend_from_slice(&payload);
887 out
888 }
889 }
890 }
891
892 struct MacosCentralLink {
893 peripheral: Peripheral,
894 rx_char: Characteristic,
895 open: std::sync::atomic::AtomicBool,
896 rx: Mutex<mpsc::Receiver<BluetoothFrame>>,
897 }
898
899 impl MacosCentralLink {
900 fn new(
901 peripheral: Peripheral,
902 rx_char: Characteristic,
903 mut notifications: impl futures::Stream<Item = ValueNotification> + Send + Unpin + 'static,
904 initial_frames: Vec<BluetoothFrame>,
905 ) -> Arc<Self> {
906 let (tx, rx) = mpsc::channel(64);
907 let link = Arc::new(Self {
908 peripheral,
909 rx_char,
910 open: std::sync::atomic::AtomicBool::new(true),
911 rx: Mutex::new(rx),
912 });
913
914 for frame in initial_frames {
915 let _ = tx.try_send(frame);
916 }
917
918 let weak = Arc::downgrade(&link);
919 tokio::spawn(async move {
920 let mut decoder = FrameDecoder::new();
921 while let Some(notification) = notifications.next().await {
922 let Ok(frames) = decoder.push(¬ification.value) else {
923 break;
924 };
925 for frame in frames {
926 if tx.send(frame).await.is_err() {
927 break;
928 }
929 }
930 }
931 if let Some(link) = weak.upgrade() {
932 link.open.store(false, std::sync::atomic::Ordering::Relaxed);
933 }
934 });
935
936 link
937 }
938 }
939
940 #[async_trait]
941 impl BluetoothLink for MacosCentralLink {
942 async fn send(&self, frame: BluetoothFrame) -> Result<()> {
943 let bytes = encode_frame(frame);
944 for chunk in bytes.chunks(HTREE_BLE_CHUNK_BYTES) {
945 self.peripheral
946 .write(&self.rx_char, chunk, WriteType::WithResponse)
947 .await?;
948 }
949 Ok(())
950 }
951
952 async fn recv(&self) -> Option<BluetoothFrame> {
953 self.rx.lock().await.recv().await
954 }
955
956 fn is_open(&self) -> bool {
957 self.open.load(std::sync::atomic::Ordering::Relaxed)
958 }
959
960 async fn close(&self) -> Result<()> {
961 self.open.store(false, std::sync::atomic::Ordering::Relaxed);
962 let _ = self.peripheral.disconnect().await;
963 Ok(())
964 }
965 }
966}