1use async_trait::async_trait;
19use bluer::{
20 adv::{Advertisement, AdvertisementHandle},
21 gatt::local::{
22 Application, ApplicationHandle, Characteristic, CharacteristicNotify,
23 CharacteristicNotifyMethod, CharacteristicRead, CharacteristicWrite,
24 CharacteristicWriteMethod, Service,
25 },
26 Adapter, Address, Session,
27};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::{broadcast, Mutex, RwLock};
31
32use crate::config::{BleConfig, DiscoveryConfig};
33use crate::error::{BleError, Result};
34use crate::gatt::HiveCharacteristicUuids;
35use crate::platform::{
36 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
37 DiscoveryCallback,
38};
39use crate::transport::BleConnection;
40use crate::{NodeId, HIVE_SERVICE_UUID};
41
42use super::BluerConnection;
43
44struct AdapterState {
46 connections: HashMap<NodeId, BluerConnection>,
48 address_to_node: HashMap<Address, NodeId>,
50 node_to_address: HashMap<NodeId, Address>,
52 #[allow(dead_code)]
55 discovered: HashMap<Address, DiscoveredDevice>,
56}
57
58impl Default for AdapterState {
59 fn default() -> Self {
60 Self {
61 connections: HashMap::new(),
62 address_to_node: HashMap::new(),
63 node_to_address: HashMap::new(),
64 discovered: HashMap::new(),
65 }
66 }
67}
68
69struct GattState {
71 node_id: Mutex<Option<NodeId>>,
73 node_info: Mutex<Vec<u8>>,
75 sync_state: Mutex<Vec<u8>>,
77 status: Mutex<Vec<u8>>,
79 sync_data_callback: Mutex<Option<Box<dyn Fn(Vec<u8>) + Send + Sync>>>,
81 command_callback: Mutex<Option<Box<dyn Fn(Vec<u8>) + Send + Sync>>>,
83}
84
85impl GattState {
86 fn new() -> Self {
87 Self {
88 node_id: Mutex::new(None),
89 node_info: Mutex::new(Vec::new()),
90 sync_state: Mutex::new(Vec::new()),
91 status: Mutex::new(Vec::new()),
92 sync_data_callback: Mutex::new(None),
93 command_callback: Mutex::new(None),
94 }
95 }
96
97 async fn init(&self, node_id: NodeId) {
99 *self.node_id.lock().await = Some(node_id);
100 *self.node_info.lock().await = node_id.as_u32().to_le_bytes().to_vec();
102 *self.sync_state.lock().await = vec![0x00];
104 *self.status.lock().await = vec![0x00];
106 }
107}
108
109pub struct BluerAdapter {
114 #[allow(dead_code)]
116 session: Session,
117 adapter: Adapter,
119 cached_address: Option<String>,
121 cached_powered: std::sync::atomic::AtomicBool,
123 config: RwLock<Option<BleConfig>>,
125 state: RwLock<AdapterState>,
127 adv_handle: RwLock<Option<AdvertisementHandle>>,
129 gatt_handle: RwLock<Option<ApplicationHandle>>,
131 gatt_state: Arc<GattState>,
133 discovery_callback: RwLock<Option<DiscoveryCallback>>,
135 connection_callback: RwLock<Option<ConnectionCallback>>,
137 shutdown_tx: broadcast::Sender<()>,
139}
140
141impl BluerAdapter {
142 pub async fn new() -> Result<Self> {
146 let session = Session::new().await.map_err(|e| {
147 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
148 })?;
149
150 let adapter = session
151 .default_adapter()
152 .await
153 .map_err(|_| BleError::AdapterNotAvailable)?;
154
155 let powered = adapter.is_powered().await.map_err(|e| {
157 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
158 })?;
159
160 if !powered {
161 adapter.set_powered(true).await.map_err(|e| {
163 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
164 })?;
165 }
166
167 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
169
170 let (shutdown_tx, _) = broadcast::channel(1);
171
172 Ok(Self {
173 session,
174 adapter,
175 cached_address,
176 cached_powered: std::sync::atomic::AtomicBool::new(true), config: RwLock::new(None),
178 state: RwLock::new(AdapterState::default()),
179 adv_handle: RwLock::new(None),
180 gatt_handle: RwLock::new(None),
181 gatt_state: Arc::new(GattState::new()),
182 discovery_callback: RwLock::new(None),
183 connection_callback: RwLock::new(None),
184 shutdown_tx,
185 })
186 }
187
188 pub async fn with_adapter_name(name: &str) -> Result<Self> {
190 let session = Session::new().await.map_err(|e| {
191 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
192 })?;
193
194 let adapter = session.adapter(name).map_err(|e| {
195 BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
196 })?;
197
198 let powered = adapter.is_powered().await.map_err(|e| {
199 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
200 })?;
201
202 if !powered {
203 adapter.set_powered(true).await.map_err(|e| {
204 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
205 })?;
206 }
207
208 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
210
211 let (shutdown_tx, _) = broadcast::channel(1);
212
213 Ok(Self {
214 session,
215 adapter,
216 cached_address,
217 cached_powered: std::sync::atomic::AtomicBool::new(true),
218 config: RwLock::new(None),
219 state: RwLock::new(AdapterState::default()),
220 adv_handle: RwLock::new(None),
221 gatt_handle: RwLock::new(None),
222 gatt_state: Arc::new(GattState::new()),
223 discovery_callback: RwLock::new(None),
224 connection_callback: RwLock::new(None),
225 shutdown_tx,
226 })
227 }
228
229 pub fn adapter_name(&self) -> &str {
231 self.adapter.name()
232 }
233
234 fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
236 let mut adv = Advertisement {
237 advertisement_type: bluer::adv::Type::Peripheral,
238 service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
239 local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
240 discoverable: Some(true),
241 ..Default::default()
242 };
243
244 adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
246
247 adv
248 }
249
250 #[allow(dead_code)]
253 fn parse_hive_beacon(
254 &self,
255 address: Address,
256 name: Option<String>,
257 rssi: i16,
258 service_data: &HashMap<bluer::Uuid, Vec<u8>>,
259 _manufacturer_data: &HashMap<u16, Vec<u8>>,
260 ) -> Option<DiscoveredDevice> {
261 let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
263
264 let node_id = name.as_ref().and_then(|n| {
266 if n.starts_with("HIVE-") {
267 NodeId::parse(&n[5..])
268 } else {
269 None
270 }
271 });
272
273 Some(DiscoveredDevice {
274 address: address.to_string(),
275 name,
276 rssi: rssi as i8,
277 is_hive_node: is_hive || node_id.is_some(),
278 node_id,
279 adv_data: Vec::new(), })
281 }
282
283 pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
285 let mut state = self.state.write().await;
286 state.address_to_node.insert(address, node_id.clone());
287 state.node_to_address.insert(node_id, address);
288 }
289
290 pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
292 let state = self.state.read().await;
293 state.node_to_address.get(node_id).copied()
294 }
295}
296
297#[async_trait]
298impl BleAdapter for BluerAdapter {
299 async fn init(&mut self, config: &BleConfig) -> Result<()> {
300 *self.config.write().await = Some(config.clone());
301 log::info!(
302 "BluerAdapter initialized for node {:08X}",
303 config.node_id.as_u32()
304 );
305 Ok(())
306 }
307
308 async fn start(&self) -> Result<()> {
309 let config = self.config.read().await;
310 let config = config
311 .as_ref()
312 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
313
314 self.start_advertising(&config.discovery).await?;
316
317 self.start_scan(&config.discovery).await?;
319
320 log::info!("BluerAdapter started");
321 Ok(())
322 }
323
324 async fn stop(&self) -> Result<()> {
325 self.stop_advertising().await?;
327
328 self.stop_scan().await?;
330
331 let _ = self.shutdown_tx.send(());
333
334 log::info!("BluerAdapter stopped");
335 Ok(())
336 }
337
338 fn is_powered(&self) -> bool {
339 self.cached_powered
340 .load(std::sync::atomic::Ordering::Relaxed)
341 }
342
343 fn address(&self) -> Option<String> {
344 self.cached_address.clone()
345 }
346
347 async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
348 use bluer::DiscoveryFilter;
349 use bluer::DiscoveryTransport;
350
351 let filter = DiscoveryFilter {
352 transport: DiscoveryTransport::Le,
353 duplicate_data: !config.filter_duplicates,
354 ..Default::default()
355 };
356
357 self.adapter
358 .set_discovery_filter(filter)
359 .await
360 .map_err(|e| {
361 BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
362 })?;
363
364 let discover =
366 self.adapter.discover_devices().await.map_err(|e| {
367 BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
368 })?;
369
370 let callback = self.discovery_callback.read().await.clone();
372 let adapter = self.adapter.clone();
373 let mut shutdown_rx = self.shutdown_tx.subscribe();
374
375 tokio::spawn(async move {
376 use tokio_stream::StreamExt;
377 let mut discover = std::pin::pin!(discover);
378
379 loop {
380 tokio::select! {
381 _ = shutdown_rx.recv() => {
382 log::debug!("Discovery task shutting down");
383 break;
384 }
385 event = discover.next() => {
386 match event {
387 Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
388 if let Ok(device) = adapter.device(addr) {
389 let name = device.name().await.ok().flatten();
391 let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
392
393 let service_uuids = device.uuids().await.ok().flatten().unwrap_or_default();
395
396 let has_hive_service = service_uuids.contains(&HIVE_SERVICE_UUID);
398
399 let name_indicates_hive = name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false);
401
402 let is_hive_node = has_hive_service || name_indicates_hive;
404
405 let discovered = DiscoveredDevice {
406 address: addr.to_string(),
407 name: name.clone(),
408 rssi: rssi as i8,
409 is_hive_node,
410 node_id: name.and_then(|n| {
411 n.strip_prefix("HIVE-").and_then(NodeId::parse)
412 }),
413 adv_data: Vec::new(),
414 };
415
416 log::debug!(
417 "Discovered device: {} (HIVE: {}, service_uuid: {}, name: {})",
418 discovered.address, is_hive_node, has_hive_service, name_indicates_hive
419 );
420
421 if let Some(ref cb) = callback {
422 cb(discovered);
423 }
424 }
425 }
426 Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
427 log::debug!("Device removed: {}", addr);
428 }
429 None => break,
430 _ => {}
431 }
432 }
433 }
434 }
435 });
436
437 log::info!("BLE scanning started");
438 Ok(())
439 }
440
441 async fn stop_scan(&self) -> Result<()> {
442 log::info!("BLE scanning stopped");
445 Ok(())
446 }
447
448 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
449 let ble_config = self.config.read().await;
450 let ble_config = ble_config
451 .as_ref()
452 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
453
454 let adv = self.build_advertisement(ble_config);
455
456 let handle =
457 self.adapter.advertise(adv).await.map_err(|e| {
458 BleError::PlatformError(format!("Failed to start advertising: {}", e))
459 })?;
460
461 *self.adv_handle.write().await = Some(handle);
462
463 log::info!(
464 "BLE advertising started for HIVE-{:08X}",
465 ble_config.node_id.as_u32()
466 );
467 Ok(())
468 }
469
470 async fn stop_advertising(&self) -> Result<()> {
471 *self.adv_handle.write().await = None;
473 log::info!("BLE advertising stopped");
474 Ok(())
475 }
476
477 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
478 if let Ok(mut cb) = self.discovery_callback.try_write() {
481 *cb = callback;
482 }
483 }
484
485 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
486 let address = self
488 .get_node_address(peer_id)
489 .await
490 .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
491
492 let device = self
493 .adapter
494 .device(address)
495 .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
496
497 device
499 .connect()
500 .await
501 .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
502
503 let connection = BluerConnection::new(peer_id.clone(), device).await?;
505
506 {
508 let mut state = self.state.write().await;
509 state
510 .connections
511 .insert(peer_id.clone(), connection.clone());
512 }
513
514 if let Some(ref cb) = *self.connection_callback.read().await {
516 cb(
517 peer_id.clone(),
518 ConnectionEvent::Connected {
519 mtu: connection.mtu(),
520 phy: connection.phy(),
521 },
522 );
523 }
524
525 log::info!("Connected to peer {}", peer_id);
526 Ok(Box::new(connection))
527 }
528
529 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
530 let connection = {
531 let mut state = self.state.write().await;
532 state.connections.remove(peer_id)
533 };
534
535 if let Some(conn) = connection {
536 conn.disconnect().await?;
537
538 if let Some(ref cb) = *self.connection_callback.read().await {
540 cb(
541 peer_id.clone(),
542 ConnectionEvent::Disconnected {
543 reason: DisconnectReason::LocalRequest,
544 },
545 );
546 }
547
548 log::info!("Disconnected from peer {}", peer_id);
549 }
550
551 Ok(())
552 }
553
554 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
555 if let Ok(state) = self.state.try_read() {
557 state
558 .connections
559 .get(peer_id)
560 .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
561 } else {
562 None
563 }
564 }
565
566 fn peer_count(&self) -> usize {
567 if let Ok(state) = self.state.try_read() {
568 state.connections.len()
569 } else {
570 0
571 }
572 }
573
574 fn connected_peers(&self) -> Vec<NodeId> {
575 if let Ok(state) = self.state.try_read() {
576 state.connections.keys().cloned().collect()
577 } else {
578 Vec::new()
579 }
580 }
581
582 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
583 if let Ok(mut cb) = self.connection_callback.try_write() {
584 *cb = callback;
585 }
586 }
587
588 async fn register_gatt_service(&self) -> Result<()> {
589 let config = self.config.read().await;
591 let node_id = config
592 .as_ref()
593 .map(|c| c.node_id)
594 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
595
596 self.gatt_state.init(node_id).await;
598
599 let state = self.gatt_state.clone();
601 let state_read_node = state.clone();
602 let state_read_sync = state.clone();
603 let state_read_status = state.clone();
604 let state_write_sync = state.clone();
605 let state_write_cmd = state.clone();
606
607 let app = Application {
609 services: vec![Service {
610 uuid: HIVE_SERVICE_UUID,
611 primary: true,
612 characteristics: vec![
613 Characteristic {
615 uuid: HiveCharacteristicUuids::node_info(),
616 read: Some(CharacteristicRead {
617 read: true,
618 fun: Box::new(move |req| {
619 let state = state_read_node.clone();
620 Box::pin(async move {
621 let data = state.node_info.lock().await;
622 log::debug!(
623 "GATT read node_info from {:?}: {} bytes",
624 req.device_address,
625 data.len()
626 );
627 Ok(data.clone())
628 })
629 }),
630 ..Default::default()
631 }),
632 ..Default::default()
633 },
634 Characteristic {
636 uuid: HiveCharacteristicUuids::sync_state(),
637 read: Some(CharacteristicRead {
638 read: true,
639 fun: Box::new(move |req| {
640 let state = state_read_sync.clone();
641 Box::pin(async move {
642 let data = state.sync_state.lock().await;
643 log::debug!(
644 "GATT read sync_state from {:?}: {} bytes",
645 req.device_address,
646 data.len()
647 );
648 Ok(data.clone())
649 })
650 }),
651 ..Default::default()
652 }),
653 notify: Some(CharacteristicNotify {
654 notify: true,
655 method: CharacteristicNotifyMethod::Io,
656 ..Default::default()
657 }),
658 ..Default::default()
659 },
660 Characteristic {
662 uuid: HiveCharacteristicUuids::sync_data(),
663 write: Some(CharacteristicWrite {
664 write: true,
665 method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
666 let state = state_write_sync.clone();
667 Box::pin(async move {
668 log::debug!(
669 "GATT write sync_data from {:?}: {} bytes",
670 req.device_address,
671 data.len()
672 );
673 if let Some(ref cb) = *state.sync_data_callback.lock().await {
675 cb(data);
676 }
677 Ok(())
678 })
679 })),
680 ..Default::default()
681 }),
682 notify: Some(CharacteristicNotify {
683 indicate: true,
684 method: CharacteristicNotifyMethod::Io,
685 ..Default::default()
686 }),
687 ..Default::default()
688 },
689 Characteristic {
691 uuid: HiveCharacteristicUuids::command(),
692 write: Some(CharacteristicWrite {
693 write: true,
694 write_without_response: true,
695 method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
696 let state = state_write_cmd.clone();
697 Box::pin(async move {
698 log::debug!(
699 "GATT write command from {:?}: {} bytes",
700 req.device_address,
701 data.len()
702 );
703 if let Some(ref cb) = *state.command_callback.lock().await {
705 cb(data);
706 }
707 Ok(())
708 })
709 })),
710 ..Default::default()
711 }),
712 ..Default::default()
713 },
714 Characteristic {
716 uuid: HiveCharacteristicUuids::status(),
717 read: Some(CharacteristicRead {
718 read: true,
719 fun: Box::new(move |req| {
720 let state = state_read_status.clone();
721 Box::pin(async move {
722 let data = state.status.lock().await;
723 log::debug!(
724 "GATT read status from {:?}: {} bytes",
725 req.device_address,
726 data.len()
727 );
728 Ok(data.clone())
729 })
730 }),
731 ..Default::default()
732 }),
733 notify: Some(CharacteristicNotify {
734 notify: true,
735 method: CharacteristicNotifyMethod::Io,
736 ..Default::default()
737 }),
738 ..Default::default()
739 },
740 ],
741 ..Default::default()
742 }],
743 ..Default::default()
744 };
745
746 let handle = self
748 .adapter
749 .serve_gatt_application(app)
750 .await
751 .map_err(|e| BleError::GattError(format!("Failed to register GATT service: {}", e)))?;
752
753 *self.gatt_handle.write().await = Some(handle);
755
756 log::info!(
757 "GATT service registered for node {:08X} with 5 characteristics",
758 node_id.as_u32()
759 );
760 Ok(())
761 }
762
763 async fn unregister_gatt_service(&self) -> Result<()> {
764 let handle = self.gatt_handle.write().await.take();
766 if handle.is_some() {
767 log::info!("GATT service unregistered");
768 }
769 Ok(())
770 }
771
772 fn supports_coded_phy(&self) -> bool {
773 true
777 }
778
779 fn supports_extended_advertising(&self) -> bool {
780 true
782 }
783
784 fn max_mtu(&self) -> u16 {
785 517
787 }
788
789 fn max_connections(&self) -> u8 {
790 7
792 }
793}
794
795#[cfg(test)]
796mod tests {
797 #[tokio::test]
801 #[ignore = "Requires BlueZ and Bluetooth hardware"]
802 async fn test_adapter_creation() {
803 use super::*;
804
805 let adapter = BluerAdapter::new().await;
806 assert!(
807 adapter.is_ok(),
808 "Failed to create adapter: {:?}",
809 adapter.err()
810 );
811 }
812
813 #[tokio::test]
814 #[ignore = "Requires BlueZ and Bluetooth hardware"]
815 async fn test_adapter_init() {
816 use super::*;
817 use crate::BleConfig;
818
819 let mut adapter = BluerAdapter::new().await.unwrap();
820 let config = BleConfig::new(NodeId::new(0x12345678));
821 let result = adapter.init(&config).await;
822 assert!(result.is_ok());
823 }
824}