hive_btle/platform/linux/
adapter.rs1use async_trait::async_trait;
4use bluer::{
5 adv::{Advertisement, AdvertisementHandle},
6 Adapter, Address, Session,
7};
8use std::collections::HashMap;
9use tokio::sync::{broadcast, RwLock};
10
11use crate::config::{BleConfig, DiscoveryConfig};
12use crate::error::{BleError, Result};
13use crate::platform::{
14 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
15 DiscoveryCallback,
16};
17use crate::transport::BleConnection;
18use crate::{NodeId, HIVE_SERVICE_UUID};
19
20use super::BluerConnection;
21
22struct AdapterState {
24 connections: HashMap<NodeId, BluerConnection>,
26 address_to_node: HashMap<Address, NodeId>,
28 node_to_address: HashMap<NodeId, Address>,
30 #[allow(dead_code)]
33 discovered: HashMap<Address, DiscoveredDevice>,
34}
35
36impl Default for AdapterState {
37 fn default() -> Self {
38 Self {
39 connections: HashMap::new(),
40 address_to_node: HashMap::new(),
41 node_to_address: HashMap::new(),
42 discovered: HashMap::new(),
43 }
44 }
45}
46
47pub struct BluerAdapter {
52 #[allow(dead_code)]
54 session: Session,
55 adapter: Adapter,
57 cached_address: Option<String>,
59 cached_powered: std::sync::atomic::AtomicBool,
61 config: RwLock<Option<BleConfig>>,
63 state: RwLock<AdapterState>,
65 adv_handle: RwLock<Option<AdvertisementHandle>>,
67 discovery_callback: RwLock<Option<DiscoveryCallback>>,
69 connection_callback: RwLock<Option<ConnectionCallback>>,
71 shutdown_tx: broadcast::Sender<()>,
73}
74
75impl BluerAdapter {
76 pub async fn new() -> Result<Self> {
80 let session = Session::new().await.map_err(|e| {
81 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
82 })?;
83
84 let adapter = session
85 .default_adapter()
86 .await
87 .map_err(|_| BleError::AdapterNotAvailable)?;
88
89 let powered = adapter.is_powered().await.map_err(|e| {
91 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
92 })?;
93
94 if !powered {
95 adapter.set_powered(true).await.map_err(|e| {
97 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
98 })?;
99 }
100
101 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
103
104 let (shutdown_tx, _) = broadcast::channel(1);
105
106 Ok(Self {
107 session,
108 adapter,
109 cached_address,
110 cached_powered: std::sync::atomic::AtomicBool::new(true), config: RwLock::new(None),
112 state: RwLock::new(AdapterState::default()),
113 adv_handle: RwLock::new(None),
114 discovery_callback: RwLock::new(None),
115 connection_callback: RwLock::new(None),
116 shutdown_tx,
117 })
118 }
119
120 pub async fn with_adapter_name(name: &str) -> Result<Self> {
122 let session = Session::new().await.map_err(|e| {
123 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
124 })?;
125
126 let adapter = session.adapter(name).map_err(|e| {
127 BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
128 })?;
129
130 let powered = adapter.is_powered().await.map_err(|e| {
131 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
132 })?;
133
134 if !powered {
135 adapter.set_powered(true).await.map_err(|e| {
136 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
137 })?;
138 }
139
140 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
142
143 let (shutdown_tx, _) = broadcast::channel(1);
144
145 Ok(Self {
146 session,
147 adapter,
148 cached_address,
149 cached_powered: std::sync::atomic::AtomicBool::new(true),
150 config: RwLock::new(None),
151 state: RwLock::new(AdapterState::default()),
152 adv_handle: RwLock::new(None),
153 discovery_callback: RwLock::new(None),
154 connection_callback: RwLock::new(None),
155 shutdown_tx,
156 })
157 }
158
159 pub fn adapter_name(&self) -> &str {
161 self.adapter.name()
162 }
163
164 fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
166 let mut adv = Advertisement {
167 advertisement_type: bluer::adv::Type::Peripheral,
168 service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
169 local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
170 discoverable: Some(true),
171 ..Default::default()
172 };
173
174 adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
176
177 adv
178 }
179
180 #[allow(dead_code)]
183 fn parse_hive_beacon(
184 &self,
185 address: Address,
186 name: Option<String>,
187 rssi: i16,
188 service_data: &HashMap<bluer::Uuid, Vec<u8>>,
189 _manufacturer_data: &HashMap<u16, Vec<u8>>,
190 ) -> Option<DiscoveredDevice> {
191 let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
193
194 let node_id = name.as_ref().and_then(|n| {
196 if n.starts_with("HIVE-") {
197 NodeId::parse(&n[5..])
198 } else {
199 None
200 }
201 });
202
203 Some(DiscoveredDevice {
204 address: address.to_string(),
205 name,
206 rssi: rssi as i8,
207 is_hive_node: is_hive || node_id.is_some(),
208 node_id,
209 adv_data: Vec::new(), })
211 }
212
213 pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
215 let mut state = self.state.write().await;
216 state.address_to_node.insert(address, node_id.clone());
217 state.node_to_address.insert(node_id, address);
218 }
219
220 pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
222 let state = self.state.read().await;
223 state.node_to_address.get(node_id).copied()
224 }
225}
226
227#[async_trait]
228impl BleAdapter for BluerAdapter {
229 async fn init(&mut self, config: &BleConfig) -> Result<()> {
230 *self.config.write().await = Some(config.clone());
231 log::info!(
232 "BluerAdapter initialized for node {:08X}",
233 config.node_id.as_u32()
234 );
235 Ok(())
236 }
237
238 async fn start(&self) -> Result<()> {
239 let config = self.config.read().await;
240 let config = config
241 .as_ref()
242 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
243
244 self.start_advertising(&config.discovery).await?;
246
247 self.start_scan(&config.discovery).await?;
249
250 log::info!("BluerAdapter started");
251 Ok(())
252 }
253
254 async fn stop(&self) -> Result<()> {
255 self.stop_advertising().await?;
257
258 self.stop_scan().await?;
260
261 let _ = self.shutdown_tx.send(());
263
264 log::info!("BluerAdapter stopped");
265 Ok(())
266 }
267
268 fn is_powered(&self) -> bool {
269 self.cached_powered
270 .load(std::sync::atomic::Ordering::Relaxed)
271 }
272
273 fn address(&self) -> Option<String> {
274 self.cached_address.clone()
275 }
276
277 async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
278 use bluer::DiscoveryFilter;
279 use bluer::DiscoveryTransport;
280
281 let filter = DiscoveryFilter {
282 transport: DiscoveryTransport::Le,
283 duplicate_data: !config.filter_duplicates,
284 ..Default::default()
285 };
286
287 self.adapter
288 .set_discovery_filter(filter)
289 .await
290 .map_err(|e| {
291 BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
292 })?;
293
294 let discover =
296 self.adapter.discover_devices().await.map_err(|e| {
297 BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
298 })?;
299
300 let callback = self.discovery_callback.read().await.clone();
302 let adapter = self.adapter.clone();
303 let mut shutdown_rx = self.shutdown_tx.subscribe();
304
305 tokio::spawn(async move {
306 use tokio_stream::StreamExt;
307 let mut discover = std::pin::pin!(discover);
308
309 loop {
310 tokio::select! {
311 _ = shutdown_rx.recv() => {
312 log::debug!("Discovery task shutting down");
313 break;
314 }
315 event = discover.next() => {
316 match event {
317 Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
318 if let Ok(device) = adapter.device(addr) {
319 let name = device.name().await.ok().flatten();
321 let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
322
323 let service_uuids = device.uuids().await.ok().flatten().unwrap_or_default();
325
326 let has_hive_service = service_uuids.contains(&HIVE_SERVICE_UUID);
328
329 let name_indicates_hive = name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false);
331
332 let is_hive_node = has_hive_service || name_indicates_hive;
334
335 let discovered = DiscoveredDevice {
336 address: addr.to_string(),
337 name: name.clone(),
338 rssi: rssi as i8,
339 is_hive_node,
340 node_id: name.and_then(|n| {
341 n.strip_prefix("HIVE-").and_then(NodeId::parse)
342 }),
343 adv_data: Vec::new(),
344 };
345
346 log::debug!(
347 "Discovered device: {} (HIVE: {}, service_uuid: {}, name: {})",
348 discovered.address, is_hive_node, has_hive_service, name_indicates_hive
349 );
350
351 if let Some(ref cb) = callback {
352 cb(discovered);
353 }
354 }
355 }
356 Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
357 log::debug!("Device removed: {}", addr);
358 }
359 None => break,
360 _ => {}
361 }
362 }
363 }
364 }
365 });
366
367 log::info!("BLE scanning started");
368 Ok(())
369 }
370
371 async fn stop_scan(&self) -> Result<()> {
372 log::info!("BLE scanning stopped");
375 Ok(())
376 }
377
378 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
379 let ble_config = self.config.read().await;
380 let ble_config = ble_config
381 .as_ref()
382 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
383
384 let adv = self.build_advertisement(ble_config);
385
386 let handle =
387 self.adapter.advertise(adv).await.map_err(|e| {
388 BleError::PlatformError(format!("Failed to start advertising: {}", e))
389 })?;
390
391 *self.adv_handle.write().await = Some(handle);
392
393 log::info!(
394 "BLE advertising started for HIVE-{:08X}",
395 ble_config.node_id.as_u32()
396 );
397 Ok(())
398 }
399
400 async fn stop_advertising(&self) -> Result<()> {
401 *self.adv_handle.write().await = None;
403 log::info!("BLE advertising stopped");
404 Ok(())
405 }
406
407 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
408 if let Ok(mut cb) = self.discovery_callback.try_write() {
411 *cb = callback;
412 }
413 }
414
415 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
416 let address = self
418 .get_node_address(peer_id)
419 .await
420 .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
421
422 let device = self
423 .adapter
424 .device(address)
425 .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
426
427 device
429 .connect()
430 .await
431 .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
432
433 let connection = BluerConnection::new(peer_id.clone(), device).await?;
435
436 {
438 let mut state = self.state.write().await;
439 state
440 .connections
441 .insert(peer_id.clone(), connection.clone());
442 }
443
444 if let Some(ref cb) = *self.connection_callback.read().await {
446 cb(
447 peer_id.clone(),
448 ConnectionEvent::Connected {
449 mtu: connection.mtu(),
450 phy: connection.phy(),
451 },
452 );
453 }
454
455 log::info!("Connected to peer {}", peer_id);
456 Ok(Box::new(connection))
457 }
458
459 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
460 let connection = {
461 let mut state = self.state.write().await;
462 state.connections.remove(peer_id)
463 };
464
465 if let Some(conn) = connection {
466 conn.disconnect().await?;
467
468 if let Some(ref cb) = *self.connection_callback.read().await {
470 cb(
471 peer_id.clone(),
472 ConnectionEvent::Disconnected {
473 reason: DisconnectReason::LocalRequest,
474 },
475 );
476 }
477
478 log::info!("Disconnected from peer {}", peer_id);
479 }
480
481 Ok(())
482 }
483
484 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
485 if let Ok(state) = self.state.try_read() {
487 state
488 .connections
489 .get(peer_id)
490 .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
491 } else {
492 None
493 }
494 }
495
496 fn peer_count(&self) -> usize {
497 if let Ok(state) = self.state.try_read() {
498 state.connections.len()
499 } else {
500 0
501 }
502 }
503
504 fn connected_peers(&self) -> Vec<NodeId> {
505 if let Ok(state) = self.state.try_read() {
506 state.connections.keys().cloned().collect()
507 } else {
508 Vec::new()
509 }
510 }
511
512 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
513 if let Ok(mut cb) = self.connection_callback.try_write() {
514 *cb = callback;
515 }
516 }
517
518 async fn register_gatt_service(&self) -> Result<()> {
519 log::warn!("GATT service registration not yet implemented");
522 Ok(())
523 }
524
525 async fn unregister_gatt_service(&self) -> Result<()> {
526 Ok(())
528 }
529
530 fn supports_coded_phy(&self) -> bool {
531 true
535 }
536
537 fn supports_extended_advertising(&self) -> bool {
538 true
540 }
541
542 fn max_mtu(&self) -> u16 {
543 517
545 }
546
547 fn max_connections(&self) -> u8 {
548 7
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 #[tokio::test]
559 #[ignore = "Requires BlueZ and Bluetooth hardware"]
560 async fn test_adapter_creation() {
561 use super::*;
562
563 let adapter = BluerAdapter::new().await;
564 assert!(
565 adapter.is_ok(),
566 "Failed to create adapter: {:?}",
567 adapter.err()
568 );
569 }
570
571 #[tokio::test]
572 #[ignore = "Requires BlueZ and Bluetooth hardware"]
573 async fn test_adapter_init() {
574 use super::*;
575 use crate::BleConfig;
576
577 let mut adapter = BluerAdapter::new().await.unwrap();
578 let config = BleConfig::new(NodeId::new(0x12345678));
579 let result = adapter.init(&config).await;
580 assert!(result.is_ok());
581 }
582}