hive_btle/platform/linux/
adapter.rs1use async_trait::async_trait;
4use bluer::{
5 adv::{Advertisement, AdvertisementHandle},
6 Adapter, Address, Session,
7};
8use std::collections::HashMap;
9use tokio::sync::{broadcast, RwLock};
10
11use crate::config::{BleConfig, DiscoveryConfig};
12use crate::error::{BleError, Result};
13use crate::platform::{
14 BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
15 DiscoveryCallback,
16};
17use crate::transport::BleConnection;
18use crate::{NodeId, HIVE_SERVICE_UUID};
19
20use super::BluerConnection;
21
22struct AdapterState {
24 connections: HashMap<NodeId, BluerConnection>,
26 address_to_node: HashMap<Address, NodeId>,
28 node_to_address: HashMap<NodeId, Address>,
30 #[allow(dead_code)]
33 discovered: HashMap<Address, DiscoveredDevice>,
34}
35
36impl Default for AdapterState {
37 fn default() -> Self {
38 Self {
39 connections: HashMap::new(),
40 address_to_node: HashMap::new(),
41 node_to_address: HashMap::new(),
42 discovered: HashMap::new(),
43 }
44 }
45}
46
47pub struct BluerAdapter {
52 #[allow(dead_code)]
54 session: Session,
55 adapter: Adapter,
57 cached_address: Option<String>,
59 cached_powered: std::sync::atomic::AtomicBool,
61 config: RwLock<Option<BleConfig>>,
63 state: RwLock<AdapterState>,
65 adv_handle: RwLock<Option<AdvertisementHandle>>,
67 discovery_callback: RwLock<Option<DiscoveryCallback>>,
69 connection_callback: RwLock<Option<ConnectionCallback>>,
71 shutdown_tx: broadcast::Sender<()>,
73}
74
75impl BluerAdapter {
76 pub async fn new() -> Result<Self> {
80 let session = Session::new().await.map_err(|e| {
81 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
82 })?;
83
84 let adapter = session
85 .default_adapter()
86 .await
87 .map_err(|_| BleError::AdapterNotAvailable)?;
88
89 let powered = adapter.is_powered().await.map_err(|e| {
91 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
92 })?;
93
94 if !powered {
95 adapter.set_powered(true).await.map_err(|e| {
97 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
98 })?;
99 }
100
101 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
103
104 let (shutdown_tx, _) = broadcast::channel(1);
105
106 Ok(Self {
107 session,
108 adapter,
109 cached_address,
110 cached_powered: std::sync::atomic::AtomicBool::new(true), config: RwLock::new(None),
112 state: RwLock::new(AdapterState::default()),
113 adv_handle: RwLock::new(None),
114 discovery_callback: RwLock::new(None),
115 connection_callback: RwLock::new(None),
116 shutdown_tx,
117 })
118 }
119
120 pub async fn with_adapter_name(name: &str) -> Result<Self> {
122 let session = Session::new().await.map_err(|e| {
123 BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
124 })?;
125
126 let adapter = session.adapter(name).map_err(|e| {
127 BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
128 })?;
129
130 let powered = adapter.is_powered().await.map_err(|e| {
131 BleError::PlatformError(format!("Failed to check adapter power: {}", e))
132 })?;
133
134 if !powered {
135 adapter.set_powered(true).await.map_err(|e| {
136 BleError::PlatformError(format!("Failed to power on adapter: {}", e))
137 })?;
138 }
139
140 let cached_address = adapter.address().await.ok().map(|a| a.to_string());
142
143 let (shutdown_tx, _) = broadcast::channel(1);
144
145 Ok(Self {
146 session,
147 adapter,
148 cached_address,
149 cached_powered: std::sync::atomic::AtomicBool::new(true),
150 config: RwLock::new(None),
151 state: RwLock::new(AdapterState::default()),
152 adv_handle: RwLock::new(None),
153 discovery_callback: RwLock::new(None),
154 connection_callback: RwLock::new(None),
155 shutdown_tx,
156 })
157 }
158
159 pub fn adapter_name(&self) -> &str {
161 self.adapter.name()
162 }
163
164 fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
166 let mut adv = Advertisement {
167 advertisement_type: bluer::adv::Type::Peripheral,
168 service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
169 local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
170 discoverable: Some(true),
171 ..Default::default()
172 };
173
174 adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
176
177 adv
178 }
179
180 #[allow(dead_code)]
183 fn parse_hive_beacon(
184 &self,
185 address: Address,
186 name: Option<String>,
187 rssi: i16,
188 service_data: &HashMap<bluer::Uuid, Vec<u8>>,
189 _manufacturer_data: &HashMap<u16, Vec<u8>>,
190 ) -> Option<DiscoveredDevice> {
191 let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
193
194 let node_id = name.as_ref().and_then(|n| {
196 if n.starts_with("HIVE-") {
197 NodeId::parse(&n[5..])
198 } else {
199 None
200 }
201 });
202
203 Some(DiscoveredDevice {
204 address: address.to_string(),
205 name,
206 rssi: rssi as i8,
207 is_hive_node: is_hive || node_id.is_some(),
208 node_id,
209 adv_data: Vec::new(), })
211 }
212
213 pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
215 let mut state = self.state.write().await;
216 state.address_to_node.insert(address, node_id.clone());
217 state.node_to_address.insert(node_id, address);
218 }
219
220 pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
222 let state = self.state.read().await;
223 state.node_to_address.get(node_id).copied()
224 }
225}
226
227#[async_trait]
228impl BleAdapter for BluerAdapter {
229 async fn init(&mut self, config: &BleConfig) -> Result<()> {
230 *self.config.write().await = Some(config.clone());
231 log::info!(
232 "BluerAdapter initialized for node {:08X}",
233 config.node_id.as_u32()
234 );
235 Ok(())
236 }
237
238 async fn start(&self) -> Result<()> {
239 let config = self.config.read().await;
240 let config = config
241 .as_ref()
242 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
243
244 self.start_advertising(&config.discovery).await?;
246
247 self.start_scan(&config.discovery).await?;
249
250 log::info!("BluerAdapter started");
251 Ok(())
252 }
253
254 async fn stop(&self) -> Result<()> {
255 self.stop_advertising().await?;
257
258 self.stop_scan().await?;
260
261 let _ = self.shutdown_tx.send(());
263
264 log::info!("BluerAdapter stopped");
265 Ok(())
266 }
267
268 fn is_powered(&self) -> bool {
269 self.cached_powered
270 .load(std::sync::atomic::Ordering::Relaxed)
271 }
272
273 fn address(&self) -> Option<String> {
274 self.cached_address.clone()
275 }
276
277 async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
278 use bluer::DiscoveryFilter;
279 use bluer::DiscoveryTransport;
280
281 let filter = DiscoveryFilter {
282 transport: DiscoveryTransport::Le,
283 duplicate_data: !config.filter_duplicates,
284 ..Default::default()
285 };
286
287 self.adapter
288 .set_discovery_filter(filter)
289 .await
290 .map_err(|e| {
291 BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
292 })?;
293
294 let discover =
296 self.adapter.discover_devices().await.map_err(|e| {
297 BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
298 })?;
299
300 let callback = self.discovery_callback.read().await.clone();
302 let adapter = self.adapter.clone();
303 let mut shutdown_rx = self.shutdown_tx.subscribe();
304
305 tokio::spawn(async move {
306 use tokio_stream::StreamExt;
307 let mut discover = std::pin::pin!(discover);
308
309 loop {
310 tokio::select! {
311 _ = shutdown_rx.recv() => {
312 log::debug!("Discovery task shutting down");
313 break;
314 }
315 event = discover.next() => {
316 match event {
317 Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
318 if let Ok(device) = adapter.device(addr) {
319 let name = device.name().await.ok().flatten();
321 let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
322
323 let discovered = DiscoveredDevice {
324 address: addr.to_string(),
325 name: name.clone(),
326 rssi: rssi as i8,
327 is_hive_node: name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false),
328 node_id: name.and_then(|n| {
329 if n.starts_with("HIVE-") {
330 NodeId::parse(&n[5..])
331 } else {
332 None
333 }
334 }),
335 adv_data: Vec::new(),
336 };
337
338 log::debug!("Discovered device: {} (HIVE: {})",
339 discovered.address, discovered.is_hive_node);
340
341 if let Some(ref cb) = callback {
342 cb(discovered);
343 }
344 }
345 }
346 Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
347 log::debug!("Device removed: {}", addr);
348 }
349 None => break,
350 _ => {}
351 }
352 }
353 }
354 }
355 });
356
357 log::info!("BLE scanning started");
358 Ok(())
359 }
360
361 async fn stop_scan(&self) -> Result<()> {
362 log::info!("BLE scanning stopped");
365 Ok(())
366 }
367
368 async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
369 let ble_config = self.config.read().await;
370 let ble_config = ble_config
371 .as_ref()
372 .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
373
374 let adv = self.build_advertisement(ble_config);
375
376 let handle =
377 self.adapter.advertise(adv).await.map_err(|e| {
378 BleError::PlatformError(format!("Failed to start advertising: {}", e))
379 })?;
380
381 *self.adv_handle.write().await = Some(handle);
382
383 log::info!(
384 "BLE advertising started for HIVE-{:08X}",
385 ble_config.node_id.as_u32()
386 );
387 Ok(())
388 }
389
390 async fn stop_advertising(&self) -> Result<()> {
391 *self.adv_handle.write().await = None;
393 log::info!("BLE advertising stopped");
394 Ok(())
395 }
396
397 fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
398 if let Ok(mut cb) = self.discovery_callback.try_write() {
401 *cb = callback;
402 }
403 }
404
405 async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
406 let address = self
408 .get_node_address(peer_id)
409 .await
410 .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
411
412 let device = self
413 .adapter
414 .device(address)
415 .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
416
417 device
419 .connect()
420 .await
421 .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
422
423 let connection = BluerConnection::new(peer_id.clone(), device).await?;
425
426 {
428 let mut state = self.state.write().await;
429 state
430 .connections
431 .insert(peer_id.clone(), connection.clone());
432 }
433
434 if let Some(ref cb) = *self.connection_callback.read().await {
436 cb(
437 peer_id.clone(),
438 ConnectionEvent::Connected {
439 mtu: connection.mtu(),
440 phy: connection.phy(),
441 },
442 );
443 }
444
445 log::info!("Connected to peer {}", peer_id);
446 Ok(Box::new(connection))
447 }
448
449 async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
450 let connection = {
451 let mut state = self.state.write().await;
452 state.connections.remove(peer_id)
453 };
454
455 if let Some(conn) = connection {
456 conn.disconnect().await?;
457
458 if let Some(ref cb) = *self.connection_callback.read().await {
460 cb(
461 peer_id.clone(),
462 ConnectionEvent::Disconnected {
463 reason: DisconnectReason::LocalRequest,
464 },
465 );
466 }
467
468 log::info!("Disconnected from peer {}", peer_id);
469 }
470
471 Ok(())
472 }
473
474 fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
475 if let Ok(state) = self.state.try_read() {
477 state
478 .connections
479 .get(peer_id)
480 .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
481 } else {
482 None
483 }
484 }
485
486 fn peer_count(&self) -> usize {
487 if let Ok(state) = self.state.try_read() {
488 state.connections.len()
489 } else {
490 0
491 }
492 }
493
494 fn connected_peers(&self) -> Vec<NodeId> {
495 if let Ok(state) = self.state.try_read() {
496 state.connections.keys().cloned().collect()
497 } else {
498 Vec::new()
499 }
500 }
501
502 fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
503 if let Ok(mut cb) = self.connection_callback.try_write() {
504 *cb = callback;
505 }
506 }
507
508 async fn register_gatt_service(&self) -> Result<()> {
509 log::warn!("GATT service registration not yet implemented");
512 Ok(())
513 }
514
515 async fn unregister_gatt_service(&self) -> Result<()> {
516 Ok(())
518 }
519
520 fn supports_coded_phy(&self) -> bool {
521 true
525 }
526
527 fn supports_extended_advertising(&self) -> bool {
528 true
530 }
531
532 fn max_mtu(&self) -> u16 {
533 517
535 }
536
537 fn max_connections(&self) -> u8 {
538 7
540 }
541}
542
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing the adapter on the default controller succeeds.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_creation() {
        let created = BluerAdapter::new().await;
        assert!(
            created.is_ok(),
            "Failed to create adapter: {:?}",
            created.err()
        );
    }

    /// `init` accepts a freshly built configuration.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_init() {
        use crate::BleConfig;

        let mut adapter = BluerAdapter::new().await.unwrap();
        let config = BleConfig::new(NodeId::new(0x12345678));
        let outcome = adapter.init(&config).await;
        assert!(outcome.is_ok());
    }
}