1use async_trait::async_trait;
19use bluer::{
20 adv::{Advertisement, AdvertisementHandle},
21 gatt::local::{
22 Application, ApplicationHandle, Characteristic, CharacteristicNotify,
23 CharacteristicNotifyMethod, CharacteristicRead, CharacteristicWrite,
24 CharacteristicWriteMethod, Service,
25 },
26 Adapter, Address, Session,
27};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::{broadcast, Mutex, RwLock};
31
32use crate::config::{BleConfig, DiscoveryConfig};
33use crate::error::{BleError, Result};
34use crate::gatt::HiveCharacteristicUuids;
35use crate::platform::{
36 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
37 DiscoveryCallback,
38};
39use crate::transport::BleConnection;
40use crate::{NodeId, HIVE_SERVICE_UUID};
41
42use super::BluerConnection;
43
/// Mutable bookkeeping for peer connections and address lookups.
///
/// `BluerAdapter` keeps a single instance behind its `state` lock.
#[derive(Default)]
struct AdapterState {
    /// Connections keyed by peer node ID; `connect` inserts and `disconnect` removes entries.
    connections: HashMap<NodeId, BluerConnection>,
    /// Address -> node ID lookup, written by `register_node_address`.
    address_to_node: HashMap<Address, NodeId>,
    /// Node ID -> address lookup, written by `register_node_address` and read by
    /// `get_node_address` (which `connect` uses to resolve a peer).
    node_to_address: HashMap<NodeId, Address>,
    // Nothing visible in this file reads or writes this map, hence the lint suppression.
    // NOTE(review): presumably reserved as a cache of discovery results - confirm or remove.
    #[allow(dead_code)]
    discovered: HashMap<Address, DiscoveredDevice>,
}
58
/// Boxed, thread-safe handler that receives the raw bytes of a GATT write.
///
/// Used for both the sync-data and the command callbacks stored in `GattState`.
type SyncCallback = Box<dyn Fn(Vec<u8>) + Send + Sync>;
61
/// Data shared between the adapter and the GATT characteristic handlers.
///
/// Each field has its own async mutex; the handlers registered by
/// `register_gatt_service` lock only the field they serve.
struct GattState {
    /// Local node ID; `None` until `init` runs.
    node_id: Mutex<Option<NodeId>>,
    /// Bytes returned by reads of the node-info characteristic.
    node_info: Mutex<Vec<u8>>,
    /// Bytes returned by reads of the sync-state characteristic.
    sync_state: Mutex<Vec<u8>>,
    /// Bytes returned by reads of the status characteristic.
    status: Mutex<Vec<u8>>,
    /// Handler invoked with the payload of each sync-data write, if one is installed.
    // NOTE(review): no code visible in this file installs this callback.
    sync_data_callback: Mutex<Option<SyncCallback>>,
    /// Handler invoked with the payload of each command write, if one is installed.
    // NOTE(review): no code visible in this file installs this callback.
    command_callback: Mutex<Option<SyncCallback>>,
}
77
78impl GattState {
79 fn new() -> Self {
80 Self {
81 node_id: Mutex::new(None),
82 node_info: Mutex::new(Vec::new()),
83 sync_state: Mutex::new(Vec::new()),
84 status: Mutex::new(Vec::new()),
85 sync_data_callback: Mutex::new(None),
86 command_callback: Mutex::new(None),
87 }
88 }
89
90 async fn init(&self, node_id: NodeId) {
92 *self.node_id.lock().await = Some(node_id);
93 *self.node_info.lock().await = node_id.as_u32().to_le_bytes().to_vec();
95 *self.sync_state.lock().await = vec![0x00];
97 *self.status.lock().await = vec![0x00];
99 }
100}
101
/// BLE adapter implementation backed by BlueZ through the `bluer` crate.
///
/// Construct it with `BluerAdapter::new` or `BluerAdapter::with_adapter_name`,
/// then drive it through the `BleAdapter` trait.
pub struct BluerAdapter {
    // Never read in the code visible here, hence the lint suppression.
    // NOTE(review): presumably retained so the BlueZ session outlives `adapter` - confirm.
    #[allow(dead_code)]
    session: Session,
    /// Handle to the Bluetooth controller used for scanning, advertising and GATT.
    adapter: Adapter,
    /// Controller address captured at construction; `None` if the lookup failed.
    cached_address: Option<String>,
    /// Power state reported by `is_powered`; set to `true` at construction and
    /// not updated by any code visible in this file.
    cached_powered: std::sync::atomic::AtomicBool,
    /// Configuration supplied to `init`; `None` until then.
    config: RwLock<Option<BleConfig>>,
    /// Connection table and node/address lookups.
    state: RwLock<AdapterState>,
    /// Handle of the active advertisement; clearing it is how advertising is stopped.
    adv_handle: RwLock<Option<AdvertisementHandle>>,
    /// Handle of the registered GATT application; taken out on unregistration.
    gatt_handle: RwLock<Option<ApplicationHandle>>,
    /// State shared with the GATT characteristic handlers.
    gatt_state: Arc<GattState>,
    /// Callback handed a `DiscoveredDevice` for each device found while scanning.
    discovery_callback: RwLock<Option<DiscoveryCallback>>,
    /// Callback notified of connect and disconnect events.
    connection_callback: RwLock<Option<ConnectionCallback>>,
    /// Broadcast channel used by `stop` to tell the discovery task to exit.
    shutdown_tx: broadcast::Sender<()>,
}
133
134impl BluerAdapter {
135 pub async fn new() -> Result<Self> {
139 let session = Session::new().await.map_err(|e| {
140 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
141 })?;
142
143 let adapter = session
144 .default_adapter()
145 .await
146 .map_err(|_| BleError::AdapterNotAvailable)?;
147
148 let powered = adapter.is_powered().await.map_err(|e| {
150 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
151 })?;
152
153 if !powered {
154 adapter.set_powered(true).await.map_err(|e| {
156 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
157 })?;
158 }
159
160 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
162
163 let (shutdown_tx, _) = broadcast::channel(1);
164
165 Ok(Self {
166 session,
167 adapter,
168 cached_address,
169 cached_powered: std::sync::atomic::AtomicBool::new(true), config: RwLock::new(None),
171 state: RwLock::new(AdapterState::default()),
172 adv_handle: RwLock::new(None),
173 gatt_handle: RwLock::new(None),
174 gatt_state: Arc::new(GattState::new()),
175 discovery_callback: RwLock::new(None),
176 connection_callback: RwLock::new(None),
177 shutdown_tx,
178 })
179 }
180
181 pub async fn with_adapter_name(name: &str) -> Result<Self> {
183 let session = Session::new().await.map_err(|e| {
184 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
185 })?;
186
187 let adapter = session.adapter(name).map_err(|e| {
188 BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
189 })?;
190
191 let powered = adapter.is_powered().await.map_err(|e| {
192 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
193 })?;
194
195 if !powered {
196 adapter.set_powered(true).await.map_err(|e| {
197 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
198 })?;
199 }
200
201 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
203
204 let (shutdown_tx, _) = broadcast::channel(1);
205
206 Ok(Self {
207 session,
208 adapter,
209 cached_address,
210 cached_powered: std::sync::atomic::AtomicBool::new(true),
211 config: RwLock::new(None),
212 state: RwLock::new(AdapterState::default()),
213 adv_handle: RwLock::new(None),
214 gatt_handle: RwLock::new(None),
215 gatt_state: Arc::new(GattState::new()),
216 discovery_callback: RwLock::new(None),
217 connection_callback: RwLock::new(None),
218 shutdown_tx,
219 })
220 }
221
222 pub fn adapter_name(&self) -> &str {
224 self.adapter.name()
225 }
226
227 fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
229 let mut adv = Advertisement {
230 advertisement_type: bluer::adv::Type::Peripheral,
231 service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
232 local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
233 discoverable: Some(true),
234 ..Default::default()
235 };
236
237 adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
239
240 adv
241 }
242
243 #[allow(dead_code)]
246 fn parse_hive_beacon(
247 &self,
248 address: Address,
249 name: Option<String>,
250 rssi: i16,
251 service_data: &HashMap<bluer::Uuid, Vec<u8>>,
252 _manufacturer_data: &HashMap<u16, Vec<u8>>,
253 ) -> Option<DiscoveredDevice> {
254 let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
256
257 let node_id = name
259 .as_ref()
260 .and_then(|n| n.strip_prefix("HIVE-"))
261 .and_then(NodeId::parse);
262
263 Some(DiscoveredDevice {
264 address: address.to_string(),
265 name,
266 rssi: rssi as i8,
267 is_hive_node: is_hive || node_id.is_some(),
268 node_id,
269 adv_data: Vec::new(), })
271 }
272
273 pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
275 let mut state = self.state.write().await;
276 state.address_to_node.insert(address, node_id);
277 state.node_to_address.insert(node_id, address);
278 }
279
280 pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
282 let state = self.state.read().await;
283 state.node_to_address.get(node_id).copied()
284 }
285}
286
287#[async_trait]
288impl BleAdapter for BluerAdapter {
289 async fn init(&mut self, config: &BleConfig) -> Result<()> {
290 *self.config.write().await = Some(config.clone());
291 log::info!(
292 "BluerAdapter initialized for node {:08X}",
293 config.node_id.as_u32()
294 );
295 Ok(())
296 }
297
298 async fn start(&self) -> Result<()> {
299 let config = self.config.read().await;
300 let config = config
301 .as_ref()
302 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
303
304 self.start_advertising(&config.discovery).await?;
306
307 self.start_scan(&config.discovery).await?;
309
310 log::info!("BluerAdapter started");
311 Ok(())
312 }
313
314 async fn stop(&self) -> Result<()> {
315 self.stop_advertising().await?;
317
318 self.stop_scan().await?;
320
321 let _ = self.shutdown_tx.send(());
323
324 log::info!("BluerAdapter stopped");
325 Ok(())
326 }
327
328 fn is_powered(&self) -> bool {
329 self.cached_powered
330 .load(std::sync::atomic::Ordering::Relaxed)
331 }
332
333 fn address(&self) -> Option<String> {
334 self.cached_address.clone()
335 }
336
337 async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
338 use bluer::DiscoveryFilter;
339 use bluer::DiscoveryTransport;
340
341 let filter = DiscoveryFilter {
342 transport: DiscoveryTransport::Le,
343 duplicate_data: !config.filter_duplicates,
344 ..Default::default()
345 };
346
347 self.adapter
348 .set_discovery_filter(filter)
349 .await
350 .map_err(|e| {
351 BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
352 })?;
353
354 let discover =
356 self.adapter.discover_devices().await.map_err(|e| {
357 BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
358 })?;
359
360 let callback = self.discovery_callback.read().await.clone();
362 let adapter = self.adapter.clone();
363 let mut shutdown_rx = self.shutdown_tx.subscribe();
364
365 tokio::spawn(async move {
366 use tokio_stream::StreamExt;
367 let mut discover = std::pin::pin!(discover);
368
369 loop {
370 tokio::select! {
371 _ = shutdown_rx.recv() => {
372 log::debug!("Discovery task shutting down");
373 break;
374 }
375 event = discover.next() => {
376 match event {
377 Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
378 if let Ok(device) = adapter.device(addr) {
379 let name = device.name().await.ok().flatten();
381 let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
382
383 let service_uuids = device.uuids().await.ok().flatten().unwrap_or_default();
385
386 let has_hive_service = service_uuids.contains(&HIVE_SERVICE_UUID);
388
389 let name_indicates_hive = name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false);
391
392 let is_hive_node = has_hive_service || name_indicates_hive;
394
395 let discovered = DiscoveredDevice {
396 address: addr.to_string(),
397 name: name.clone(),
398 rssi: rssi as i8,
399 is_hive_node,
400 node_id: name.and_then(|n| {
401 n.strip_prefix("HIVE-").and_then(NodeId::parse)
402 }),
403 adv_data: Vec::new(),
404 };
405
406 log::debug!(
407 "Discovered device: {} (HIVE: {}, service_uuid: {}, name: {})",
408 discovered.address, is_hive_node, has_hive_service, name_indicates_hive
409 );
410
411 if let Some(ref cb) = callback {
412 cb(discovered);
413 }
414 }
415 }
416 Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
417 log::debug!("Device removed: {}", addr);
418 }
419 None => break,
420 _ => {}
421 }
422 }
423 }
424 }
425 });
426
427 log::info!("BLE scanning started");
428 Ok(())
429 }
430
431 async fn stop_scan(&self) -> Result<()> {
432 log::info!("BLE scanning stopped");
435 Ok(())
436 }
437
438 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
439 let ble_config = self.config.read().await;
440 let ble_config = ble_config
441 .as_ref()
442 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
443
444 let adv = self.build_advertisement(ble_config);
445
446 let handle =
447 self.adapter.advertise(adv).await.map_err(|e| {
448 BleError::PlatformError(format!("Failed to start advertising: {}", e))
449 })?;
450
451 *self.adv_handle.write().await = Some(handle);
452
453 log::info!(
454 "BLE advertising started for HIVE-{:08X}",
455 ble_config.node_id.as_u32()
456 );
457 Ok(())
458 }
459
460 async fn stop_advertising(&self) -> Result<()> {
461 *self.adv_handle.write().await = None;
463 log::info!("BLE advertising stopped");
464 Ok(())
465 }
466
467 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
468 if let Ok(mut cb) = self.discovery_callback.try_write() {
471 *cb = callback;
472 }
473 }
474
475 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
476 let address = self
478 .get_node_address(peer_id)
479 .await
480 .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
481
482 let device = self
483 .adapter
484 .device(address)
485 .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
486
487 device
489 .connect()
490 .await
491 .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
492
493 let connection = BluerConnection::new(*peer_id, device).await?;
495
496 {
498 let mut state = self.state.write().await;
499 state.connections.insert(*peer_id, connection.clone());
500 }
501
502 if let Some(ref cb) = *self.connection_callback.read().await {
504 cb(
505 *peer_id,
506 ConnectionEvent::Connected {
507 mtu: connection.mtu(),
508 phy: connection.phy(),
509 },
510 );
511 }
512
513 log::info!("Connected to peer {}", peer_id);
514 Ok(Box::new(connection))
515 }
516
517 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
518 let connection = {
519 let mut state = self.state.write().await;
520 state.connections.remove(peer_id)
521 };
522
523 if let Some(conn) = connection {
524 conn.disconnect().await?;
525
526 if let Some(ref cb) = *self.connection_callback.read().await {
528 cb(
529 *peer_id,
530 ConnectionEvent::Disconnected {
531 reason: DisconnectReason::LocalRequest,
532 },
533 );
534 }
535
536 log::info!("Disconnected from peer {}", peer_id);
537 }
538
539 Ok(())
540 }
541
542 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
543 if let Ok(state) = self.state.try_read() {
545 state
546 .connections
547 .get(peer_id)
548 .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
549 } else {
550 None
551 }
552 }
553
554 fn peer_count(&self) -> usize {
555 if let Ok(state) = self.state.try_read() {
556 state.connections.len()
557 } else {
558 0
559 }
560 }
561
562 fn connected_peers(&self) -> Vec<NodeId> {
563 if let Ok(state) = self.state.try_read() {
564 state.connections.keys().cloned().collect()
565 } else {
566 Vec::new()
567 }
568 }
569
570 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
571 if let Ok(mut cb) = self.connection_callback.try_write() {
572 *cb = callback;
573 }
574 }
575
576 async fn register_gatt_service(&self) -> Result<()> {
577 let config = self.config.read().await;
579 let node_id = config
580 .as_ref()
581 .map(|c| c.node_id)
582 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
583
584 self.gatt_state.init(node_id).await;
586
587 let state = self.gatt_state.clone();
589 let state_read_node = state.clone();
590 let state_read_sync = state.clone();
591 let state_read_status = state.clone();
592 let state_write_sync = state.clone();
593 let state_write_cmd = state.clone();
594
595 let app = Application {
597 services: vec![Service {
598 uuid: HIVE_SERVICE_UUID,
599 primary: true,
600 characteristics: vec![
601 Characteristic {
603 uuid: HiveCharacteristicUuids::node_info(),
604 read: Some(CharacteristicRead {
605 read: true,
606 fun: Box::new(move |req| {
607 let state = state_read_node.clone();
608 Box::pin(async move {
609 let data = state.node_info.lock().await;
610 log::debug!(
611 "GATT read node_info from {:?}: {} bytes",
612 req.device_address,
613 data.len()
614 );
615 Ok(data.clone())
616 })
617 }),
618 ..Default::default()
619 }),
620 ..Default::default()
621 },
622 Characteristic {
624 uuid: HiveCharacteristicUuids::sync_state(),
625 read: Some(CharacteristicRead {
626 read: true,
627 fun: Box::new(move |req| {
628 let state = state_read_sync.clone();
629 Box::pin(async move {
630 let data = state.sync_state.lock().await;
631 log::debug!(
632 "GATT read sync_state from {:?}: {} bytes",
633 req.device_address,
634 data.len()
635 );
636 Ok(data.clone())
637 })
638 }),
639 ..Default::default()
640 }),
641 notify: Some(CharacteristicNotify {
642 notify: true,
643 method: CharacteristicNotifyMethod::Io,
644 ..Default::default()
645 }),
646 ..Default::default()
647 },
648 Characteristic {
650 uuid: HiveCharacteristicUuids::sync_data(),
651 write: Some(CharacteristicWrite {
652 write: true,
653 method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
654 let state = state_write_sync.clone();
655 Box::pin(async move {
656 log::debug!(
657 "GATT write sync_data from {:?}: {} bytes",
658 req.device_address,
659 data.len()
660 );
661 if let Some(ref cb) = *state.sync_data_callback.lock().await {
663 cb(data);
664 }
665 Ok(())
666 })
667 })),
668 ..Default::default()
669 }),
670 notify: Some(CharacteristicNotify {
671 indicate: true,
672 method: CharacteristicNotifyMethod::Io,
673 ..Default::default()
674 }),
675 ..Default::default()
676 },
677 Characteristic {
679 uuid: HiveCharacteristicUuids::command(),
680 write: Some(CharacteristicWrite {
681 write: true,
682 write_without_response: true,
683 method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
684 let state = state_write_cmd.clone();
685 Box::pin(async move {
686 log::debug!(
687 "GATT write command from {:?}: {} bytes",
688 req.device_address,
689 data.len()
690 );
691 if let Some(ref cb) = *state.command_callback.lock().await {
693 cb(data);
694 }
695 Ok(())
696 })
697 })),
698 ..Default::default()
699 }),
700 ..Default::default()
701 },
702 Characteristic {
704 uuid: HiveCharacteristicUuids::status(),
705 read: Some(CharacteristicRead {
706 read: true,
707 fun: Box::new(move |req| {
708 let state = state_read_status.clone();
709 Box::pin(async move {
710 let data = state.status.lock().await;
711 log::debug!(
712 "GATT read status from {:?}: {} bytes",
713 req.device_address,
714 data.len()
715 );
716 Ok(data.clone())
717 })
718 }),
719 ..Default::default()
720 }),
721 notify: Some(CharacteristicNotify {
722 notify: true,
723 method: CharacteristicNotifyMethod::Io,
724 ..Default::default()
725 }),
726 ..Default::default()
727 },
728 ],
729 ..Default::default()
730 }],
731 ..Default::default()
732 };
733
734 let handle = self
736 .adapter
737 .serve_gatt_application(app)
738 .await
739 .map_err(|e| BleError::GattError(format!("Failed to register GATT service: {}", e)))?;
740
741 *self.gatt_handle.write().await = Some(handle);
743
744 log::info!(
745 "GATT service registered for node {:08X} with 5 characteristics",
746 node_id.as_u32()
747 );
748 Ok(())
749 }
750
751 async fn unregister_gatt_service(&self) -> Result<()> {
752 let handle = self.gatt_handle.write().await.take();
754 if handle.is_some() {
755 log::info!("GATT service unregistered");
756 }
757 Ok(())
758 }
759
760 fn supports_coded_phy(&self) -> bool {
761 true
765 }
766
767 fn supports_extended_advertising(&self) -> bool {
768 true
770 }
771
772 fn max_mtu(&self) -> u16 {
773 517
775 }
776
777 fn max_connections(&self) -> u8 {
778 7
780 }
781}
782
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BleConfig;

    /// Constructing an adapter on the default controller succeeds.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_creation() {
        let created = BluerAdapter::new().await;
        assert!(
            created.is_ok(),
            "Failed to create adapter: {:?}",
            created.err()
        );
    }

    /// `init` accepts a freshly built configuration.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_init() {
        let mut adapter = BluerAdapter::new().await.unwrap();
        let config = BleConfig::new(NodeId::new(0x12345678));
        assert!(adapter.init(&config).await.is_ok());
    }
}