hive_btle/platform/linux/
adapter.rs

1//! BlueZ adapter implementation using the `bluer` crate
2
3use async_trait::async_trait;
4use bluer::{
5    adv::{Advertisement, AdvertisementHandle},
6    Adapter, Address, Session,
7};
8use std::collections::HashMap;
9use tokio::sync::{broadcast, RwLock};
10
11use crate::config::{BleConfig, DiscoveryConfig};
12use crate::error::{BleError, Result};
13use crate::platform::{
14    BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
15    DiscoveryCallback,
16};
17use crate::transport::BleConnection;
18use crate::{NodeId, HIVE_SERVICE_UUID};
19
20use super::BluerConnection;
21
22/// Internal state for the adapter
23struct AdapterState {
24    /// Active connections by node ID
25    connections: HashMap<NodeId, BluerConnection>,
26    /// Device address to node ID mapping
27    address_to_node: HashMap<Address, NodeId>,
28    /// Node ID to device address mapping
29    node_to_address: HashMap<NodeId, Address>,
30    /// Discovered devices (address -> device info)
31    /// TODO: Wire up device tracking for connection management
32    #[allow(dead_code)]
33    discovered: HashMap<Address, DiscoveredDevice>,
34}
35
36impl Default for AdapterState {
37    fn default() -> Self {
38        Self {
39            connections: HashMap::new(),
40            address_to_node: HashMap::new(),
41            node_to_address: HashMap::new(),
42            discovered: HashMap::new(),
43        }
44    }
45}
46
47/// Linux/BlueZ BLE adapter
48///
49/// Implements the `BleAdapter` trait using the `bluer` crate for
50/// BlueZ D-Bus communication.
51pub struct BluerAdapter {
52    /// BlueZ session (kept alive for adapter lifetime)
53    #[allow(dead_code)]
54    session: Session,
55    /// BlueZ adapter
56    adapter: Adapter,
57    /// Cached adapter address (queried once at creation)
58    cached_address: Option<String>,
59    /// Cached power state (updated on power changes)
60    cached_powered: std::sync::atomic::AtomicBool,
61    /// Configuration
62    config: RwLock<Option<BleConfig>>,
63    /// Internal state
64    state: RwLock<AdapterState>,
65    /// Advertisement handle (keeps advertisement alive)
66    adv_handle: RwLock<Option<AdvertisementHandle>>,
67    /// Discovery callback
68    discovery_callback: RwLock<Option<DiscoveryCallback>>,
69    /// Connection callback
70    connection_callback: RwLock<Option<ConnectionCallback>>,
71    /// Shutdown signal
72    shutdown_tx: broadcast::Sender<()>,
73}
74
75impl BluerAdapter {
76    /// Create a new BlueZ adapter
77    ///
78    /// This connects to the system D-Bus and gets the default Bluetooth adapter.
79    pub async fn new() -> Result<Self> {
80        let session = Session::new().await.map_err(|e| {
81            BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
82        })?;
83
84        let adapter = session
85            .default_adapter()
86            .await
87            .map_err(|_| BleError::AdapterNotAvailable)?;
88
89        // Check if adapter is powered
90        let powered = adapter.is_powered().await.map_err(|e| {
91            BleError::PlatformError(format!("Failed to check adapter power: {}", e))
92        })?;
93
94        if !powered {
95            // Try to power on the adapter
96            adapter.set_powered(true).await.map_err(|e| {
97                BleError::PlatformError(format!("Failed to power on adapter: {}", e))
98            })?;
99        }
100
101        // Cache the adapter address
102        let cached_address = adapter.address().await.ok().map(|a| a.to_string());
103
104        let (shutdown_tx, _) = broadcast::channel(1);
105
106        Ok(Self {
107            session,
108            adapter,
109            cached_address,
110            cached_powered: std::sync::atomic::AtomicBool::new(true), // We ensured it's powered above
111            config: RwLock::new(None),
112            state: RwLock::new(AdapterState::default()),
113            adv_handle: RwLock::new(None),
114            discovery_callback: RwLock::new(None),
115            connection_callback: RwLock::new(None),
116            shutdown_tx,
117        })
118    }
119
120    /// Create adapter with a specific adapter name (e.g., "hci0")
121    pub async fn with_adapter_name(name: &str) -> Result<Self> {
122        let session = Session::new().await.map_err(|e| {
123            BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
124        })?;
125
126        let adapter = session.adapter(name).map_err(|e| {
127            BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
128        })?;
129
130        let powered = adapter.is_powered().await.map_err(|e| {
131            BleError::PlatformError(format!("Failed to check adapter power: {}", e))
132        })?;
133
134        if !powered {
135            adapter.set_powered(true).await.map_err(|e| {
136                BleError::PlatformError(format!("Failed to power on adapter: {}", e))
137            })?;
138        }
139
140        // Cache the adapter address
141        let cached_address = adapter.address().await.ok().map(|a| a.to_string());
142
143        let (shutdown_tx, _) = broadcast::channel(1);
144
145        Ok(Self {
146            session,
147            adapter,
148            cached_address,
149            cached_powered: std::sync::atomic::AtomicBool::new(true),
150            config: RwLock::new(None),
151            state: RwLock::new(AdapterState::default()),
152            adv_handle: RwLock::new(None),
153            discovery_callback: RwLock::new(None),
154            connection_callback: RwLock::new(None),
155            shutdown_tx,
156        })
157    }
158
159    /// Get the adapter name (e.g., "hci0")
160    pub fn adapter_name(&self) -> &str {
161        self.adapter.name()
162    }
163
164    /// Build HIVE advertisement
165    fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
166        let mut adv = Advertisement {
167            advertisement_type: bluer::adv::Type::Peripheral,
168            service_uuids: vec![HIVE_SERVICE_UUID].into_iter().collect(),
169            local_name: Some(format!("HIVE-{:08X}", config.node_id.as_u32())),
170            discoverable: Some(true),
171            ..Default::default()
172        };
173
174        // Set TX power if supported
175        adv.tx_power = Some(config.discovery.tx_power_dbm as i16);
176
177        adv
178    }
179
180    /// Parse HIVE beacon from advertising data
181    /// TODO: Use this method instead of inline parsing in discovery loop
182    #[allow(dead_code)]
183    fn parse_hive_beacon(
184        &self,
185        address: Address,
186        name: Option<String>,
187        rssi: i16,
188        service_data: &HashMap<bluer::Uuid, Vec<u8>>,
189        _manufacturer_data: &HashMap<u16, Vec<u8>>,
190    ) -> Option<DiscoveredDevice> {
191        // Check if this is a HIVE node by looking for our service UUID
192        let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
193
194        // Try to extract node ID from name (HIVE-XXXXXXXX format)
195        let node_id = name.as_ref().and_then(|n| {
196            if n.starts_with("HIVE-") {
197                NodeId::parse(&n[5..])
198            } else {
199                None
200            }
201        });
202
203        Some(DiscoveredDevice {
204            address: address.to_string(),
205            name,
206            rssi: rssi as i8,
207            is_hive_node: is_hive || node_id.is_some(),
208            node_id,
209            adv_data: Vec::new(), // TODO: serialize full adv data
210        })
211    }
212
213    /// Register node ID to address mapping
214    pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
215        let mut state = self.state.write().await;
216        state.address_to_node.insert(address, node_id.clone());
217        state.node_to_address.insert(node_id, address);
218    }
219
220    /// Get address for a node ID
221    pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
222        let state = self.state.read().await;
223        state.node_to_address.get(node_id).copied()
224    }
225}
226
227#[async_trait]
228impl BleAdapter for BluerAdapter {
229    async fn init(&mut self, config: &BleConfig) -> Result<()> {
230        *self.config.write().await = Some(config.clone());
231        log::info!(
232            "BluerAdapter initialized for node {:08X}",
233            config.node_id.as_u32()
234        );
235        Ok(())
236    }
237
238    async fn start(&self) -> Result<()> {
239        let config = self.config.read().await;
240        let config = config
241            .as_ref()
242            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
243
244        // Start advertising
245        self.start_advertising(&config.discovery).await?;
246
247        // Start scanning
248        self.start_scan(&config.discovery).await?;
249
250        log::info!("BluerAdapter started");
251        Ok(())
252    }
253
254    async fn stop(&self) -> Result<()> {
255        // Stop advertising
256        self.stop_advertising().await?;
257
258        // Stop scanning
259        self.stop_scan().await?;
260
261        // Signal shutdown to background tasks
262        let _ = self.shutdown_tx.send(());
263
264        log::info!("BluerAdapter stopped");
265        Ok(())
266    }
267
268    fn is_powered(&self) -> bool {
269        self.cached_powered
270            .load(std::sync::atomic::Ordering::Relaxed)
271    }
272
273    fn address(&self) -> Option<String> {
274        self.cached_address.clone()
275    }
276
277    async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
278        use bluer::DiscoveryFilter;
279        use bluer::DiscoveryTransport;
280
281        let filter = DiscoveryFilter {
282            transport: DiscoveryTransport::Le,
283            duplicate_data: !config.filter_duplicates,
284            ..Default::default()
285        };
286
287        self.adapter
288            .set_discovery_filter(filter)
289            .await
290            .map_err(|e| {
291                BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
292            })?;
293
294        // Start discovery
295        let discover =
296            self.adapter.discover_devices().await.map_err(|e| {
297                BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
298            })?;
299
300        // Spawn task to handle discovered devices
301        let callback = self.discovery_callback.read().await.clone();
302        let adapter = self.adapter.clone();
303        let mut shutdown_rx = self.shutdown_tx.subscribe();
304
305        tokio::spawn(async move {
306            use tokio_stream::StreamExt;
307            let mut discover = std::pin::pin!(discover);
308
309            loop {
310                tokio::select! {
311                    _ = shutdown_rx.recv() => {
312                        log::debug!("Discovery task shutting down");
313                        break;
314                    }
315                    event = discover.next() => {
316                        match event {
317                            Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
318                                if let Ok(device) = adapter.device(addr) {
319                                    // Get device properties
320                                    let name = device.name().await.ok().flatten();
321                                    let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
322
323                                    let discovered = DiscoveredDevice {
324                                        address: addr.to_string(),
325                                        name: name.clone(),
326                                        rssi: rssi as i8,
327                                        is_hive_node: name.as_ref().map(|n| n.starts_with("HIVE-")).unwrap_or(false),
328                                        node_id: name.and_then(|n| {
329                                            if n.starts_with("HIVE-") {
330                                                NodeId::parse(&n[5..])
331                                            } else {
332                                                None
333                                            }
334                                        }),
335                                        adv_data: Vec::new(),
336                                    };
337
338                                    log::debug!("Discovered device: {} (HIVE: {})",
339                                        discovered.address, discovered.is_hive_node);
340
341                                    if let Some(ref cb) = callback {
342                                        cb(discovered);
343                                    }
344                                }
345                            }
346                            Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
347                                log::debug!("Device removed: {}", addr);
348                            }
349                            None => break,
350                            _ => {}
351                        }
352                    }
353                }
354            }
355        });
356
357        log::info!("BLE scanning started");
358        Ok(())
359    }
360
361    async fn stop_scan(&self) -> Result<()> {
362        // Discovery is stopped when the stream is dropped
363        // The shutdown signal will cause the task to exit
364        log::info!("BLE scanning stopped");
365        Ok(())
366    }
367
368    async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
369        let ble_config = self.config.read().await;
370        let ble_config = ble_config
371            .as_ref()
372            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
373
374        let adv = self.build_advertisement(ble_config);
375
376        let handle =
377            self.adapter.advertise(adv).await.map_err(|e| {
378                BleError::PlatformError(format!("Failed to start advertising: {}", e))
379            })?;
380
381        *self.adv_handle.write().await = Some(handle);
382
383        log::info!(
384            "BLE advertising started for HIVE-{:08X}",
385            ble_config.node_id.as_u32()
386        );
387        Ok(())
388    }
389
390    async fn stop_advertising(&self) -> Result<()> {
391        // Drop the advertisement handle to stop advertising
392        *self.adv_handle.write().await = None;
393        log::info!("BLE advertising stopped");
394        Ok(())
395    }
396
397    fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
398        // Use blocking write since this is a sync method
399        // In practice, this should be called before start()
400        if let Ok(mut cb) = self.discovery_callback.try_write() {
401            *cb = callback;
402        }
403    }
404
405    async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
406        // Look up the address for this node ID
407        let address = self
408            .get_node_address(peer_id)
409            .await
410            .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
411
412        let device = self
413            .adapter
414            .device(address)
415            .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
416
417        // Connect to the device
418        device
419            .connect()
420            .await
421            .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
422
423        // Create connection wrapper
424        let connection = BluerConnection::new(peer_id.clone(), device).await?;
425
426        // Store connection
427        {
428            let mut state = self.state.write().await;
429            state
430                .connections
431                .insert(peer_id.clone(), connection.clone());
432        }
433
434        // Notify callback
435        if let Some(ref cb) = *self.connection_callback.read().await {
436            cb(
437                peer_id.clone(),
438                ConnectionEvent::Connected {
439                    mtu: connection.mtu(),
440                    phy: connection.phy(),
441                },
442            );
443        }
444
445        log::info!("Connected to peer {}", peer_id);
446        Ok(Box::new(connection))
447    }
448
449    async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
450        let connection = {
451            let mut state = self.state.write().await;
452            state.connections.remove(peer_id)
453        };
454
455        if let Some(conn) = connection {
456            conn.disconnect().await?;
457
458            // Notify callback
459            if let Some(ref cb) = *self.connection_callback.read().await {
460                cb(
461                    peer_id.clone(),
462                    ConnectionEvent::Disconnected {
463                        reason: DisconnectReason::LocalRequest,
464                    },
465                );
466            }
467
468            log::info!("Disconnected from peer {}", peer_id);
469        }
470
471        Ok(())
472    }
473
474    fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
475        // Use try_read to avoid blocking
476        if let Ok(state) = self.state.try_read() {
477            state
478                .connections
479                .get(peer_id)
480                .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
481        } else {
482            None
483        }
484    }
485
486    fn peer_count(&self) -> usize {
487        if let Ok(state) = self.state.try_read() {
488            state.connections.len()
489        } else {
490            0
491        }
492    }
493
494    fn connected_peers(&self) -> Vec<NodeId> {
495        if let Ok(state) = self.state.try_read() {
496            state.connections.keys().cloned().collect()
497        } else {
498            Vec::new()
499        }
500    }
501
502    fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
503        if let Ok(mut cb) = self.connection_callback.try_write() {
504            *cb = callback;
505        }
506    }
507
508    async fn register_gatt_service(&self) -> Result<()> {
509        // TODO: Implement GATT server registration
510        // This will be done in #405 (GATT Service Definition)
511        log::warn!("GATT service registration not yet implemented");
512        Ok(())
513    }
514
515    async fn unregister_gatt_service(&self) -> Result<()> {
516        // TODO: Implement GATT server unregistration
517        Ok(())
518    }
519
520    fn supports_coded_phy(&self) -> bool {
521        // Check if LE Coded PHY is supported
522        // This would require checking adapter capabilities
523        // For now, assume BLE 5.0+ adapters support it
524        true
525    }
526
527    fn supports_extended_advertising(&self) -> bool {
528        // Check if extended advertising is supported
529        true
530    }
531
532    fn max_mtu(&self) -> u16 {
533        // BlueZ typically supports up to 517 bytes MTU
534        517
535    }
536
537    fn max_connections(&self) -> u8 {
538        // BlueZ default is 7 connections
539        7
540    }
541}
542
#[cfg(test)]
mod tests {
    // These are integration tests: they talk to a real BlueZ daemon and
    // Bluetooth controller, so they are ignored by default. Run them with
    // the --ignored flag on a Linux system.

    use super::*;
    use crate::BleConfig;

    /// Creating the default adapter succeeds.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_creation() {
        let created = BluerAdapter::new().await;
        assert!(
            created.is_ok(),
            "Failed to create adapter: {:?}",
            created.err()
        );
    }

    /// `init` accepts a freshly built configuration.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_init() {
        let mut adapter = BluerAdapter::new().await.unwrap();
        let config = BleConfig::new(NodeId::new(0x12345678));

        let outcome = adapter.init(&config).await;
        assert!(outcome.is_ok());
    }
}