hive_btle/platform/linux/
adapter.rs

1//! BlueZ adapter implementation using the `bluer` crate
2
3use async_trait::async_trait;
4use bluer::{
5    adv::{Advertisement, AdvertisementHandle},
6    Adapter, Address, Session,
7};
8use std::collections::HashMap;
9use tokio::sync::{broadcast, RwLock};
10
11use crate::config::{BleConfig, DiscoveryConfig};
12use crate::error::{BleError, Result};
13use crate::platform::{
14    BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
15    DiscoveryCallback,
16};
17use crate::transport::BleConnection;
18use crate::{NodeId, HIVE_SERVICE_UUID};
19
20use super::BluerConnection;
21
22/// Internal state for the adapter
23struct AdapterState {
24    /// Active connections by node ID
25    connections: HashMap<NodeId, BluerConnection>,
26    /// Device address to node ID mapping
27    address_to_node: HashMap<Address, NodeId>,
28    /// Node ID to device address mapping
29    node_to_address: HashMap<NodeId, Address>,
30    /// Discovered devices (address -> device info)
31    /// TODO: Wire up device tracking for connection management
32    #[allow(dead_code)]
33    discovered: HashMap<Address, DiscoveredDevice>,
34}
35
36impl Default for AdapterState {
37    fn default() -> Self {
38        Self {
39            connections: HashMap::new(),
40            address_to_node: HashMap::new(),
41            node_to_address: HashMap::new(),
42            discovered: HashMap::new(),
43        }
44    }
45}
46
47/// Linux/BlueZ BLE adapter
48///
49/// Implements the `BleAdapter` trait using the `bluer` crate for
50/// BlueZ D-Bus communication.
51pub struct BluerAdapter {
52    /// BlueZ session (kept alive for adapter lifetime)
53    #[allow(dead_code)]
54    session: Session,
55    /// BlueZ adapter
56    adapter: Adapter,
57    /// Cached adapter address (queried once at creation)
58    cached_address: Option<String>,
59    /// Cached power state (updated on power changes)
60    cached_powered: std::sync::atomic::AtomicBool,
61    /// Configuration
62    config: RwLock<Option<BleConfig>>,
63    /// Internal state
64    state: RwLock<AdapterState>,
65    /// Advertisement handle (keeps advertisement alive)
66    adv_handle: RwLock<Option<AdvertisementHandle>>,
67    /// Discovery callback
68    discovery_callback: RwLock<Option<DiscoveryCallback>>,
69    /// Connection callback
70    connection_callback: RwLock<Option<ConnectionCallback>>,
71    /// Shutdown signal
72    shutdown_tx: broadcast::Sender<()>,
73}
74
75impl BluerAdapter {
76    /// Create a new BlueZ adapter
77    ///
78    /// This connects to the system D-Bus and gets the default Bluetooth adapter.
79    pub async fn new() -> Result<Self> {
80        let session = Session::new().await.map_err(|e| {
81            BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
82        })?;
83
84        let adapter = session
85            .default_adapter()
86            .await
87            .map_err(|_| BleError::AdapterNotAvailable)?;
88
89        // Check if adapter is powered
90        let powered = adapter.is_powered().await.map_err(|e| {
91            BleError::PlatformError(format!("Failed to check adapter power: {}", e))
92        })?;
93
94        if !powered {
95            // Try to power on the adapter
96            adapter.set_powered(true).await.map_err(|e| {
97                BleError::PlatformError(format!("Failed to power on adapter: {}", e))
98            })?;
99        }
100
101        // Cache the adapter address
102        let cached_address = adapter.address().await.ok().map(|a| a.to_string());
103
104        let (shutdown_tx, _) = broadcast::channel(1);
105
106        Ok(Self {
107            session,
108            adapter,
109            cached_address,
110            cached_powered: std::sync::atomic::AtomicBool::new(true), // We ensured it's powered above
111            config: RwLock::new(None),
112            state: RwLock::new(AdapterState::default()),
113            adv_handle: RwLock::new(None),
114            discovery_callback: RwLock::new(None),
115            connection_callback: RwLock::new(None),
116            shutdown_tx,
117        })
118    }
119
120    /// Create adapter with a specific adapter name (e.g., "hci0")
121    pub async fn with_adapter_name(name: &str) -> Result<Self> {
122        let session = Session::new().await.map_err(|e| {
123            BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
124        })?;
125
126        let adapter = session.adapter(name).map_err(|e| {
127            BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
128        })?;
129
130        let powered = adapter.is_powered().await.map_err(|e| {
131            BleError::PlatformError(format!("Failed to check adapter power: {}", e))
132        })?;
133
134        if !powered {
135            adapter.set_powered(true).await.map_err(|e| {
136                BleError::PlatformError(format!("Failed to power on adapter: {}", e))
137            })?;
138        }
139
140        // Cache the adapter address
141        let cached_address = adapter.address().await.ok().map(|a| a.to_string());
142
143        let (shutdown_tx, _) = broadcast::channel(1);
144
145        Ok(Self {
146            session,
147            adapter,
148            cached_address,
149            cached_powered: std::sync::atomic::AtomicBool::new(true),
150            config: RwLock::new(None),
151            state: RwLock::new(AdapterState::default()),
152            adv_handle: RwLock::new(None),
153            discovery_callback: RwLock::new(None),
154            connection_callback: RwLock::new(None),
155            shutdown_tx,
156        })
157    }
158
159    /// Get the adapter name (e.g., "hci0")
160    pub fn adapter_name(&self) -> &str {
161        self.adapter.name()
162    }
163
164    /// Build HIVE advertisement
165    fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
166        let mut adv = Advertisement {
167            advertisement_type: bluer::adv::Type::Peripheral,
168            service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
169            local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
170            discoverable: Some(true),
171            ..Default::default()
172        };
173
174        // Set TX power if supported
175        adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
176
177        adv
178    }
179
180    /// Parse HIVE beacon from advertising data
181    /// TODO: Use this method instead of inline parsing in discovery loop
182    #[allow(dead_code)]
183    fn parse_hive_beacon(
184        &self,
185        address: Address,
186        name: Option<String>,
187        rssi: i16,
188        service_data: &HashMap<bluer::Uuid, Vec<u8>>,
189        _manufacturer_data: &HashMap<u16, Vec<u8>>,
190    ) -> Option<DiscoveredDevice> {
191        // Check if this is a HIVE node by looking for our service UUID
192        let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
193
194        // Try to extract node ID from name (HIVE-XXXXXXXX format)
195        let node_id = name.as_ref().and_then(|n| {
196            if n.starts_with("HIVE-") {
197                NodeId::parse(&n[5..])
198            } else {
199                None
200            }
201        });
202
203        Some(DiscoveredDevice {
204            address: address.to_string(),
205            name,
206            rssi: rssi as i8,
207            is_hive_node: is_hive || node_id.is_some(),
208            node_id,
209            adv_data: Vec::new(), // TODO: serialize full adv data
210        })
211    }
212
213    /// Register node ID to address mapping
214    pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
215        let mut state = self.state.write().await;
216        state.address_to_node.insert(address, node_id.clone());
217        state.node_to_address.insert(node_id, address);
218    }
219
220    /// Get address for a node ID
221    pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
222        let state = self.state.read().await;
223        state.node_to_address.get(node_id).copied()
224    }
225}
226
227#[async_trait]
228impl BleAdapter for BluerAdapter {
229    async fn init(&mut self, config: &BleConfig) -> Result<()> {
230        *self.config.write().await = Some(config.clone());
231        log::info!(
232            "BluerAdapter initialized for node {:08X}",
233            config.node_id.as_u32()
234        );
235        Ok(())
236    }
237
238    async fn start(&self) -> Result<()> {
239        let config = self.config.read().await;
240        let config = config
241            .as_ref()
242            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
243
244        // Start advertising
245        self.start_advertising(&config.discovery).await?;
246
247        // Start scanning
248        self.start_scan(&config.discovery).await?;
249
250        log::info!("BluerAdapter started");
251        Ok(())
252    }
253
254    async fn stop(&self) -> Result<()> {
255        // Stop advertising
256        self.stop_advertising().await?;
257
258        // Stop scanning
259        self.stop_scan().await?;
260
261        // Signal shutdown to background tasks
262        let _ = self.shutdown_tx.send(());
263
264        log::info!("BluerAdapter stopped");
265        Ok(())
266    }
267
268    fn is_powered(&self) -> bool {
269        self.cached_powered
270            .load(std::sync::atomic::Ordering::Relaxed)
271    }
272
273    fn address(&self) -> Option<String> {
274        self.cached_address.clone()
275    }
276
277    async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
278        use bluer::DiscoveryFilter;
279        use bluer::DiscoveryTransport;
280
281        let filter = DiscoveryFilter {
282            transport: DiscoveryTransport::Le,
283            duplicate_data: !config.filter_duplicates,
284            ..Default::default()
285        };
286
287        self.adapter
288            .set_discovery_filter(filter)
289            .await
290            .map_err(|e| {
291                BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
292            })?;
293
294        // Start discovery
295        let discover =
296            self.adapter.discover_devices().await.map_err(|e| {
297                BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
298            })?;
299
300        // Spawn task to handle discovered devices
301        let callback = self.discovery_callback.read().await.clone();
302        let adapter = self.adapter.clone();
303        let mut shutdown_rx = self.shutdown_tx.subscribe();
304
305        tokio::spawn(async move {
306            use tokio_stream::StreamExt;
307            let mut discover = std::pin::pin!(discover);
308
309            loop {
310                tokio::select! {
311                    _ = shutdown_rx.recv() => {
312                        log::debug!("Discovery task shutting down");
313                        break;
314                    }
315                    event = discover.next() => {
316                        match event {
317                            Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
318                                if let Ok(device) = adapter.device(addr) {
319                                    // Get device properties
320                                    let name = device.name().await.ok().flatten();
321                                    let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
322
323                                    // Get service UUIDs from the device
324                                    let service_uuids = device.uuids().await.ok().flatten().unwrap_or_default();
325
326                                    // Check if HIVE service UUID is present
327                                    let has_hive_service = service_uuids.contains(&HIVE_SERVICE_UUID);
328
329                                    // Check if name indicates HIVE node (fallback)
330                                    let name_indicates_hive = name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false);
331
332                                    // HIVE node detection: prefer service UUID, fallback to name
333                                    let is_hive_node = has_hive_service || name_indicates_hive;
334
335                                    let discovered = DiscoveredDevice {
336                                        address: addr.to_string(),
337                                        name: name.clone(),
338                                        rssi: rssi as i8,
339                                        is_hive_node,
340                                        node_id: name.and_then(|n| {
341                                            n.strip_prefix("HIVE-").and_then(NodeId::parse)
342                                        }),
343                                        adv_data: Vec::new(),
344                                    };
345
346                                    log::debug!(
347                                        "Discovered device: {} (HIVE: {}, service_uuid: {}, name: {})",
348                                        discovered.address, is_hive_node, has_hive_service, name_indicates_hive
349                                    );
350
351                                    if let Some(ref cb) = callback {
352                                        cb(discovered);
353                                    }
354                                }
355                            }
356                            Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
357                                log::debug!("Device removed: {}", addr);
358                            }
359                            None => break,
360                            _ => {}
361                        }
362                    }
363                }
364            }
365        });
366
367        log::info!("BLE scanning started");
368        Ok(())
369    }
370
371    async fn stop_scan(&self) -> Result<()> {
372        // Discovery is stopped when the stream is dropped
373        // The shutdown signal will cause the task to exit
374        log::info!("BLE scanning stopped");
375        Ok(())
376    }
377
378    async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
379        let ble_config = self.config.read().await;
380        let ble_config = ble_config
381            .as_ref()
382            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
383
384        let adv = self.build_advertisement(ble_config);
385
386        let handle =
387            self.adapter.advertise(adv).await.map_err(|e| {
388                BleError::PlatformError(format!("Failed to start advertising: {}", e))
389            })?;
390
391        *self.adv_handle.write().await = Some(handle);
392
393        log::info!(
394            "BLE advertising started for HIVE-{:08X}",
395            ble_config.node_id.as_u32()
396        );
397        Ok(())
398    }
399
400    async fn stop_advertising(&self) -> Result<()> {
401        // Drop the advertisement handle to stop advertising
402        *self.adv_handle.write().await = None;
403        log::info!("BLE advertising stopped");
404        Ok(())
405    }
406
407    fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
408        // Use blocking write since this is a sync method
409        // In practice, this should be called before start()
410        if let Ok(mut cb) = self.discovery_callback.try_write() {
411            *cb = callback;
412        }
413    }
414
415    async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
416        // Look up the address for this node ID
417        let address = self
418            .get_node_address(peer_id)
419            .await
420            .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
421
422        let device = self
423            .adapter
424            .device(address)
425            .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
426
427        // Connect to the device
428        device
429            .connect()
430            .await
431            .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
432
433        // Create connection wrapper
434        let connection = BluerConnection::new(peer_id.clone(), device).await?;
435
436        // Store connection
437        {
438            let mut state = self.state.write().await;
439            state
440                .connections
441                .insert(peer_id.clone(), connection.clone());
442        }
443
444        // Notify callback
445        if let Some(ref cb) = *self.connection_callback.read().await {
446            cb(
447                peer_id.clone(),
448                ConnectionEvent::Connected {
449                    mtu: connection.mtu(),
450                    phy: connection.phy(),
451                },
452            );
453        }
454
455        log::info!("Connected to peer {}", peer_id);
456        Ok(Box::new(connection))
457    }
458
459    async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
460        let connection = {
461            let mut state = self.state.write().await;
462            state.connections.remove(peer_id)
463        };
464
465        if let Some(conn) = connection {
466            conn.disconnect().await?;
467
468            // Notify callback
469            if let Some(ref cb) = *self.connection_callback.read().await {
470                cb(
471                    peer_id.clone(),
472                    ConnectionEvent::Disconnected {
473                        reason: DisconnectReason::LocalRequest,
474                    },
475                );
476            }
477
478            log::info!("Disconnected from peer {}", peer_id);
479        }
480
481        Ok(())
482    }
483
484    fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
485        // Use try_read to avoid blocking
486        if let Ok(state) = self.state.try_read() {
487            state
488                .connections
489                .get(peer_id)
490                .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
491        } else {
492            None
493        }
494    }
495
496    fn peer_count(&self) -> usize {
497        if let Ok(state) = self.state.try_read() {
498            state.connections.len()
499        } else {
500            0
501        }
502    }
503
504    fn connected_peers(&self) -> Vec<NodeId> {
505        if let Ok(state) = self.state.try_read() {
506            state.connections.keys().cloned().collect()
507        } else {
508            Vec::new()
509        }
510    }
511
512    fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
513        if let Ok(mut cb) = self.connection_callback.try_write() {
514            *cb = callback;
515        }
516    }
517
518    async fn register_gatt_service(&self) -> Result<()> {
519        // TODO: Implement GATT server registration
520        // This will be done in #405 (GATT Service Definition)
521        log::warn!("GATT service registration not yet implemented");
522        Ok(())
523    }
524
525    async fn unregister_gatt_service(&self) -> Result<()> {
526        // TODO: Implement GATT server unregistration
527        Ok(())
528    }
529
530    fn supports_coded_phy(&self) -> bool {
531        // Check if LE Coded PHY is supported
532        // This would require checking adapter capabilities
533        // For now, assume BLE 5.0+ adapters support it
534        true
535    }
536
537    fn supports_extended_advertising(&self) -> bool {
538        // Check if extended advertising is supported
539        true
540    }
541
542    fn max_mtu(&self) -> u16 {
543        // BlueZ typically supports up to 517 bytes MTU
544        517
545    }
546
547    fn max_connections(&self) -> u8 {
548        // BlueZ default is 7 connections
549        7
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    // Integration tests require actual Bluetooth hardware
556    // They should be run with --ignored flag on a Linux system
557
558    #[tokio::test]
559    #[ignore = "Requires BlueZ and Bluetooth hardware"]
560    async fn test_adapter_creation() {
561        use super::*;
562
563        let adapter = BluerAdapter::new().await;
564        assert!(
565            adapter.is_ok(),
566            "Failed to create adapter: {:?}",
567            adapter.err()
568        );
569    }
570
571    #[tokio::test]
572    #[ignore = "Requires BlueZ and Bluetooth hardware"]
573    async fn test_adapter_init() {
574        use super::*;
575        use crate::BleConfig;
576
577        let mut adapter = BluerAdapter::new().await.unwrap();
578        let config = BleConfig::new(NodeId::new(0x12345678));
579        let result = adapter.init(&config).await;
580        assert!(result.is_ok());
581    }
582}