1use async_trait::async_trait;
4use bluer::{
5 adv::{Advertisement, AdvertisementHandle},
6 gatt::local::{
7 Application, ApplicationHandle, Characteristic, CharacteristicNotify,
8 CharacteristicNotifyMethod, CharacteristicRead, CharacteristicWrite,
9 CharacteristicWriteMethod, Service,
10 },
11 Adapter, Address, Session,
12};
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::{broadcast, Mutex, RwLock};
16
17use crate::config::{BleConfig, DiscoveryConfig};
18use crate::error::{BleError, Result};
19use crate::gatt::HiveCharacteristicUuids;
20use crate::platform::{
21 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
22 DiscoveryCallback,
23};
24use crate::transport::BleConnection;
25use crate::{NodeId, HIVE_SERVICE_UUID};
26
27use super::BluerConnection;
28
29struct AdapterState {
31 connections: HashMap<NodeId, BluerConnection>,
33 address_to_node: HashMap<Address, NodeId>,
35 node_to_address: HashMap<NodeId, Address>,
37 #[allow(dead_code)]
40 discovered: HashMap<Address, DiscoveredDevice>,
41}
42
43impl Default for AdapterState {
44 fn default() -> Self {
45 Self {
46 connections: HashMap::new(),
47 address_to_node: HashMap::new(),
48 node_to_address: HashMap::new(),
49 discovered: HashMap::new(),
50 }
51 }
52}
53
54struct GattState {
56 node_id: Mutex<Option<NodeId>>,
58 node_info: Mutex<Vec<u8>>,
60 sync_state: Mutex<Vec<u8>>,
62 status: Mutex<Vec<u8>>,
64 sync_data_callback: Mutex<Option<Box<dyn Fn(Vec<u8>) + Send + Sync>>>,
66 command_callback: Mutex<Option<Box<dyn Fn(Vec<u8>) + Send + Sync>>>,
68}
69
70impl GattState {
71 fn new() -> Self {
72 Self {
73 node_id: Mutex::new(None),
74 node_info: Mutex::new(Vec::new()),
75 sync_state: Mutex::new(Vec::new()),
76 status: Mutex::new(Vec::new()),
77 sync_data_callback: Mutex::new(None),
78 command_callback: Mutex::new(None),
79 }
80 }
81
82 async fn init(&self, node_id: NodeId) {
84 *self.node_id.lock().await = Some(node_id);
85 *self.node_info.lock().await = node_id.as_u32().to_le_bytes().to_vec();
87 *self.sync_state.lock().await = vec![0x00];
89 *self.status.lock().await = vec![0x00];
91 }
92}
93
94pub struct BluerAdapter {
99 #[allow(dead_code)]
101 session: Session,
102 adapter: Adapter,
104 cached_address: Option<String>,
106 cached_powered: std::sync::atomic::AtomicBool,
108 config: RwLock<Option<BleConfig>>,
110 state: RwLock<AdapterState>,
112 adv_handle: RwLock<Option<AdvertisementHandle>>,
114 gatt_handle: RwLock<Option<ApplicationHandle>>,
116 gatt_state: Arc<GattState>,
118 discovery_callback: RwLock<Option<DiscoveryCallback>>,
120 connection_callback: RwLock<Option<ConnectionCallback>>,
122 shutdown_tx: broadcast::Sender<()>,
124}
125
126impl BluerAdapter {
127 pub async fn new() -> Result<Self> {
131 let session = Session::new().await.map_err(|e| {
132 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
133 })?;
134
135 let adapter = session
136 .default_adapter()
137 .await
138 .map_err(|_| BleError::AdapterNotAvailable)?;
139
140 let powered = adapter.is_powered().await.map_err(|e| {
142 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
143 })?;
144
145 if !powered {
146 adapter.set_powered(true).await.map_err(|e| {
148 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
149 })?;
150 }
151
152 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
154
155 let (shutdown_tx, _) = broadcast::channel(1);
156
157 Ok(Self {
158 session,
159 adapter,
160 cached_address,
161 cached_powered: std::sync::atomic::AtomicBool::new(true), config: RwLock::new(None),
163 state: RwLock::new(AdapterState::default()),
164 adv_handle: RwLock::new(None),
165 gatt_handle: RwLock::new(None),
166 gatt_state: Arc::new(GattState::new()),
167 discovery_callback: RwLock::new(None),
168 connection_callback: RwLock::new(None),
169 shutdown_tx,
170 })
171 }
172
173 pub async fn with_adapter_name(name: &str) -> Result<Self> {
175 let session = Session::new().await.map_err(|e| {
176 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
177 })?;
178
179 let adapter = session.adapter(name).map_err(|e| {
180 BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
181 })?;
182
183 let powered = adapter.is_powered().await.map_err(|e| {
184 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
185 })?;
186
187 if !powered {
188 adapter.set_powered(true).await.map_err(|e| {
189 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
190 })?;
191 }
192
193 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
195
196 let (shutdown_tx, _) = broadcast::channel(1);
197
198 Ok(Self {
199 session,
200 adapter,
201 cached_address,
202 cached_powered: std::sync::atomic::AtomicBool::new(true),
203 config: RwLock::new(None),
204 state: RwLock::new(AdapterState::default()),
205 adv_handle: RwLock::new(None),
206 gatt_handle: RwLock::new(None),
207 gatt_state: Arc::new(GattState::new()),
208 discovery_callback: RwLock::new(None),
209 connection_callback: RwLock::new(None),
210 shutdown_tx,
211 })
212 }
213
214 pub fn adapter_name(&self) -> &str {
216 self.adapter.name()
217 }
218
219 fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
221 let mut adv = Advertisement {
222 advertisement_type: bluer::adv::Type::Peripheral,
223 service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
224 local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
225 discoverable: Some(true),
226 ..Default::default()
227 };
228
229 adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
231
232 adv
233 }
234
235 #[allow(dead_code)]
238 fn parse_hive_beacon(
239 &self,
240 address: Address,
241 name: Option<String>,
242 rssi: i16,
243 service_data: &HashMap<bluer::Uuid, Vec<u8>>,
244 _manufacturer_data: &HashMap<u16, Vec<u8>>,
245 ) -> Option<DiscoveredDevice> {
246 let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
248
249 let node_id = name.as_ref().and_then(|n| {
251 if n.starts_with("HIVE-") {
252 NodeId::parse(&n[5..])
253 } else {
254 None
255 }
256 });
257
258 Some(DiscoveredDevice {
259 address: address.to_string(),
260 name,
261 rssi: rssi as i8,
262 is_hive_node: is_hive || node_id.is_some(),
263 node_id,
264 adv_data: Vec::new(), })
266 }
267
268 pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
270 let mut state = self.state.write().await;
271 state.address_to_node.insert(address, node_id.clone());
272 state.node_to_address.insert(node_id, address);
273 }
274
275 pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
277 let state = self.state.read().await;
278 state.node_to_address.get(node_id).copied()
279 }
280}
281
282#[async_trait]
283impl BleAdapter for BluerAdapter {
284 async fn init(&mut self, config: &BleConfig) -> Result<()> {
285 *self.config.write().await = Some(config.clone());
286 log::info!(
287 "BluerAdapter initialized for node {:08X}",
288 config.node_id.as_u32()
289 );
290 Ok(())
291 }
292
293 async fn start(&self) -> Result<()> {
294 let config = self.config.read().await;
295 let config = config
296 .as_ref()
297 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
298
299 self.start_advertising(&config.discovery).await?;
301
302 self.start_scan(&config.discovery).await?;
304
305 log::info!("BluerAdapter started");
306 Ok(())
307 }
308
309 async fn stop(&self) -> Result<()> {
310 self.stop_advertising().await?;
312
313 self.stop_scan().await?;
315
316 let _ = self.shutdown_tx.send(());
318
319 log::info!("BluerAdapter stopped");
320 Ok(())
321 }
322
323 fn is_powered(&self) -> bool {
324 self.cached_powered
325 .load(std::sync::atomic::Ordering::Relaxed)
326 }
327
328 fn address(&self) -> Option<String> {
329 self.cached_address.clone()
330 }
331
332 async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
333 use bluer::DiscoveryFilter;
334 use bluer::DiscoveryTransport;
335
336 let filter = DiscoveryFilter {
337 transport: DiscoveryTransport::Le,
338 duplicate_data: !config.filter_duplicates,
339 ..Default::default()
340 };
341
342 self.adapter
343 .set_discovery_filter(filter)
344 .await
345 .map_err(|e| {
346 BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
347 })?;
348
349 let discover =
351 self.adapter.discover_devices().await.map_err(|e| {
352 BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
353 })?;
354
355 let callback = self.discovery_callback.read().await.clone();
357 let adapter = self.adapter.clone();
358 let mut shutdown_rx = self.shutdown_tx.subscribe();
359
360 tokio::spawn(async move {
361 use tokio_stream::StreamExt;
362 let mut discover = std::pin::pin!(discover);
363
364 loop {
365 tokio::select! {
366 _ = shutdown_rx.recv() => {
367 log::debug!("Discovery task shutting down");
368 break;
369 }
370 event = discover.next() => {
371 match event {
372 Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
373 if let Ok(device) = adapter.device(addr) {
374 let name = device.name().await.ok().flatten();
376 let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
377
378 let service_uuids = device.uuids().await.ok().flatten().unwrap_or_default();
380
381 let has_hive_service = service_uuids.contains(&HIVE_SERVICE_UUID);
383
384 let name_indicates_hive = name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false);
386
387 let is_hive_node = has_hive_service || name_indicates_hive;
389
390 let discovered = DiscoveredDevice {
391 address: addr.to_string(),
392 name: name.clone(),
393 rssi: rssi as i8,
394 is_hive_node,
395 node_id: name.and_then(|n| {
396 n.strip_prefix("HIVE-").and_then(NodeId::parse)
397 }),
398 adv_data: Vec::new(),
399 };
400
401 log::debug!(
402 "Discovered device: {} (HIVE: {}, service_uuid: {}, name: {})",
403 discovered.address, is_hive_node, has_hive_service, name_indicates_hive
404 );
405
406 if let Some(ref cb) = callback {
407 cb(discovered);
408 }
409 }
410 }
411 Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
412 log::debug!("Device removed: {}", addr);
413 }
414 None => break,
415 _ => {}
416 }
417 }
418 }
419 }
420 });
421
422 log::info!("BLE scanning started");
423 Ok(())
424 }
425
426 async fn stop_scan(&self) -> Result<()> {
427 log::info!("BLE scanning stopped");
430 Ok(())
431 }
432
433 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
434 let ble_config = self.config.read().await;
435 let ble_config = ble_config
436 .as_ref()
437 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
438
439 let adv = self.build_advertisement(ble_config);
440
441 let handle =
442 self.adapter.advertise(adv).await.map_err(|e| {
443 BleError::PlatformError(format!("Failed to start advertising: {}", e))
444 })?;
445
446 *self.adv_handle.write().await = Some(handle);
447
448 log::info!(
449 "BLE advertising started for HIVE-{:08X}",
450 ble_config.node_id.as_u32()
451 );
452 Ok(())
453 }
454
455 async fn stop_advertising(&self) -> Result<()> {
456 *self.adv_handle.write().await = None;
458 log::info!("BLE advertising stopped");
459 Ok(())
460 }
461
462 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
463 if let Ok(mut cb) = self.discovery_callback.try_write() {
466 *cb = callback;
467 }
468 }
469
470 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
471 let address = self
473 .get_node_address(peer_id)
474 .await
475 .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
476
477 let device = self
478 .adapter
479 .device(address)
480 .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
481
482 device
484 .connect()
485 .await
486 .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
487
488 let connection = BluerConnection::new(peer_id.clone(), device).await?;
490
491 {
493 let mut state = self.state.write().await;
494 state
495 .connections
496 .insert(peer_id.clone(), connection.clone());
497 }
498
499 if let Some(ref cb) = *self.connection_callback.read().await {
501 cb(
502 peer_id.clone(),
503 ConnectionEvent::Connected {
504 mtu: connection.mtu(),
505 phy: connection.phy(),
506 },
507 );
508 }
509
510 log::info!("Connected to peer {}", peer_id);
511 Ok(Box::new(connection))
512 }
513
514 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
515 let connection = {
516 let mut state = self.state.write().await;
517 state.connections.remove(peer_id)
518 };
519
520 if let Some(conn) = connection {
521 conn.disconnect().await?;
522
523 if let Some(ref cb) = *self.connection_callback.read().await {
525 cb(
526 peer_id.clone(),
527 ConnectionEvent::Disconnected {
528 reason: DisconnectReason::LocalRequest,
529 },
530 );
531 }
532
533 log::info!("Disconnected from peer {}", peer_id);
534 }
535
536 Ok(())
537 }
538
539 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
540 if let Ok(state) = self.state.try_read() {
542 state
543 .connections
544 .get(peer_id)
545 .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
546 } else {
547 None
548 }
549 }
550
551 fn peer_count(&self) -> usize {
552 if let Ok(state) = self.state.try_read() {
553 state.connections.len()
554 } else {
555 0
556 }
557 }
558
559 fn connected_peers(&self) -> Vec<NodeId> {
560 if let Ok(state) = self.state.try_read() {
561 state.connections.keys().cloned().collect()
562 } else {
563 Vec::new()
564 }
565 }
566
567 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
568 if let Ok(mut cb) = self.connection_callback.try_write() {
569 *cb = callback;
570 }
571 }
572
573 async fn register_gatt_service(&self) -> Result<()> {
574 let config = self.config.read().await;
576 let node_id = config
577 .as_ref()
578 .map(|c| c.node_id)
579 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
580
581 self.gatt_state.init(node_id).await;
583
584 let state = self.gatt_state.clone();
586 let state_read_node = state.clone();
587 let state_read_sync = state.clone();
588 let state_read_status = state.clone();
589 let state_write_sync = state.clone();
590 let state_write_cmd = state.clone();
591
592 let app = Application {
594 services: vec![Service {
595 uuid: HIVE_SERVICE_UUID,
596 primary: true,
597 characteristics: vec![
598 Characteristic {
600 uuid: HiveCharacteristicUuids::node_info(),
601 read: Some(CharacteristicRead {
602 read: true,
603 fun: Box::new(move |req| {
604 let state = state_read_node.clone();
605 Box::pin(async move {
606 let data = state.node_info.lock().await;
607 log::debug!(
608 "GATT read node_info from {:?}: {} bytes",
609 req.device_address,
610 data.len()
611 );
612 Ok(data.clone())
613 })
614 }),
615 ..Default::default()
616 }),
617 ..Default::default()
618 },
619 Characteristic {
621 uuid: HiveCharacteristicUuids::sync_state(),
622 read: Some(CharacteristicRead {
623 read: true,
624 fun: Box::new(move |req| {
625 let state = state_read_sync.clone();
626 Box::pin(async move {
627 let data = state.sync_state.lock().await;
628 log::debug!(
629 "GATT read sync_state from {:?}: {} bytes",
630 req.device_address,
631 data.len()
632 );
633 Ok(data.clone())
634 })
635 }),
636 ..Default::default()
637 }),
638 notify: Some(CharacteristicNotify {
639 notify: true,
640 method: CharacteristicNotifyMethod::Io,
641 ..Default::default()
642 }),
643 ..Default::default()
644 },
645 Characteristic {
647 uuid: HiveCharacteristicUuids::sync_data(),
648 write: Some(CharacteristicWrite {
649 write: true,
650 method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
651 let state = state_write_sync.clone();
652 Box::pin(async move {
653 log::debug!(
654 "GATT write sync_data from {:?}: {} bytes",
655 req.device_address,
656 data.len()
657 );
658 if let Some(ref cb) = *state.sync_data_callback.lock().await {
660 cb(data);
661 }
662 Ok(())
663 })
664 })),
665 ..Default::default()
666 }),
667 notify: Some(CharacteristicNotify {
668 indicate: true,
669 method: CharacteristicNotifyMethod::Io,
670 ..Default::default()
671 }),
672 ..Default::default()
673 },
674 Characteristic {
676 uuid: HiveCharacteristicUuids::command(),
677 write: Some(CharacteristicWrite {
678 write: true,
679 write_without_response: true,
680 method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
681 let state = state_write_cmd.clone();
682 Box::pin(async move {
683 log::debug!(
684 "GATT write command from {:?}: {} bytes",
685 req.device_address,
686 data.len()
687 );
688 if let Some(ref cb) = *state.command_callback.lock().await {
690 cb(data);
691 }
692 Ok(())
693 })
694 })),
695 ..Default::default()
696 }),
697 ..Default::default()
698 },
699 Characteristic {
701 uuid: HiveCharacteristicUuids::status(),
702 read: Some(CharacteristicRead {
703 read: true,
704 fun: Box::new(move |req| {
705 let state = state_read_status.clone();
706 Box::pin(async move {
707 let data = state.status.lock().await;
708 log::debug!(
709 "GATT read status from {:?}: {} bytes",
710 req.device_address,
711 data.len()
712 );
713 Ok(data.clone())
714 })
715 }),
716 ..Default::default()
717 }),
718 notify: Some(CharacteristicNotify {
719 notify: true,
720 method: CharacteristicNotifyMethod::Io,
721 ..Default::default()
722 }),
723 ..Default::default()
724 },
725 ],
726 ..Default::default()
727 }],
728 ..Default::default()
729 };
730
731 let handle = self
733 .adapter
734 .serve_gatt_application(app)
735 .await
736 .map_err(|e| BleError::GattError(format!("Failed to register GATT service: {}", e)))?;
737
738 *self.gatt_handle.write().await = Some(handle);
740
741 log::info!(
742 "GATT service registered for node {:08X} with 5 characteristics",
743 node_id.as_u32()
744 );
745 Ok(())
746 }
747
748 async fn unregister_gatt_service(&self) -> Result<()> {
749 let handle = self.gatt_handle.write().await.take();
751 if handle.is_some() {
752 log::info!("GATT service unregistered");
753 }
754 Ok(())
755 }
756
757 fn supports_coded_phy(&self) -> bool {
758 true
762 }
763
764 fn supports_extended_advertising(&self) -> bool {
765 true
767 }
768
769 fn max_mtu(&self) -> u16 {
770 517
772 }
773
774 fn max_connections(&self) -> u8 {
775 7
777 }
778}
779
780#[cfg(test)]
781mod tests {
782 #[tokio::test]
786 #[ignore = "Requires BlueZ and Bluetooth hardware"]
787 async fn test_adapter_creation() {
788 use super::*;
789
790 let adapter = BluerAdapter::new().await;
791 assert!(
792 adapter.is_ok(),
793 "Failed to create adapter: {:?}",
794 adapter.err()
795 );
796 }
797
798 #[tokio::test]
799 #[ignore = "Requires BlueZ and Bluetooth hardware"]
800 async fn test_adapter_init() {
801 use super::*;
802 use crate::BleConfig;
803
804 let mut adapter = BluerAdapter::new().await.unwrap();
805 let config = BleConfig::new(NodeId::new(0x12345678));
806 let result = adapter.init(&config).await;
807 assert!(result.is_ok());
808 }
809}