Skip to main content

hive_btle/platform/linux/
adapter.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! BlueZ adapter implementation using the `bluer` crate
17
18use async_trait::async_trait;
19use bluer::{
20    adv::{Advertisement, AdvertisementHandle},
21    gatt::local::{
22        Application, ApplicationHandle, Characteristic, CharacteristicNotify,
23        CharacteristicNotifyMethod, CharacteristicRead, CharacteristicWrite,
24        CharacteristicWriteMethod, Service,
25    },
26    Adapter, Address, Session,
27};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::{broadcast, Mutex, RwLock};
31
32use crate::config::{BleConfig, DiscoveryConfig};
33use crate::error::{BleError, Result};
34use crate::gatt::HiveCharacteristicUuids;
35use crate::platform::{
36    BleAdapter, ConnectionCallback, ConnectionEvent, DisconnectReason, DiscoveredDevice,
37    DiscoveryCallback,
38};
39use crate::transport::BleConnection;
40use crate::{NodeId, HIVE_SERVICE_UUID};
41
42use super::BluerConnection;
43
44/// Internal state for the adapter
45#[derive(Default)]
46struct AdapterState {
47    /// Active connections by node ID
48    connections: HashMap<NodeId, BluerConnection>,
49    /// Device address to node ID mapping
50    address_to_node: HashMap<Address, NodeId>,
51    /// Node ID to device address mapping
52    node_to_address: HashMap<NodeId, Address>,
53    /// Discovered devices (address -> device info)
54    /// TODO: Wire up device tracking for connection management
55    #[allow(dead_code)]
56    discovered: HashMap<Address, DiscoveredDevice>,
57}
58
/// Boxed, thread-safe handler that receives an owned byte payload.
///
/// Aliased so the `Mutex<Option<...>>` slots that store it stay readable.
type SyncCallback = Box<dyn Fn(Vec<u8>) + Sync + Send>;
61
62/// State shared between GATT characteristic callbacks
63struct GattState {
64    /// Node ID for this adapter
65    node_id: Mutex<Option<NodeId>>,
66    /// Node info data (readable)
67    node_info: Mutex<Vec<u8>>,
68    /// Sync state data (readable, notifiable)
69    sync_state: Mutex<Vec<u8>>,
70    /// Status data (readable, notifiable)
71    status: Mutex<Vec<u8>>,
72    /// Received sync data callback
73    sync_data_callback: Mutex<Option<SyncCallback>>,
74    /// Received command callback
75    command_callback: Mutex<Option<SyncCallback>>,
76    /// Per-peer MTU from GATT operations (address -> mtu)
77    /// Updated when peers perform GATT read/write operations
78    peer_mtu: Mutex<HashMap<Address, u16>>,
79}
80
81impl GattState {
82    fn new() -> Self {
83        Self {
84            node_id: Mutex::new(None),
85            node_info: Mutex::new(Vec::new()),
86            sync_state: Mutex::new(Vec::new()),
87            status: Mutex::new(Vec::new()),
88            sync_data_callback: Mutex::new(None),
89            command_callback: Mutex::new(None),
90            peer_mtu: Mutex::new(HashMap::new()),
91        }
92    }
93
94    /// Initialize state with node information
95    async fn init(&self, node_id: NodeId) {
96        *self.node_id.lock().await = Some(node_id);
97        // Initialize node_info with basic data (node_id as 4 bytes LE)
98        *self.node_info.lock().await = node_id.as_u32().to_le_bytes().to_vec();
99        // Initialize sync_state as idle (0x00)
100        *self.sync_state.lock().await = vec![0x00];
101        // Initialize status as empty
102        *self.status.lock().await = vec![0x00];
103    }
104
105    /// Update the MTU for a peer based on GATT request
106    async fn update_peer_mtu(&self, address: Address, mtu: u16) {
107        let mut peer_mtu = self.peer_mtu.lock().await;
108        let old_mtu = peer_mtu.insert(address, mtu);
109        if old_mtu != Some(mtu) {
110            log::debug!("Peer {} MTU: {} (was {:?})", address, mtu, old_mtu);
111        }
112    }
113
114    /// Get the MTU for a peer
115    async fn get_peer_mtu(&self, address: &Address) -> Option<u16> {
116        self.peer_mtu.lock().await.get(address).copied()
117    }
118}
119
120/// Linux/BlueZ BLE adapter
121///
122/// Implements the `BleAdapter` trait using the `bluer` crate for
123/// BlueZ D-Bus communication.
124pub struct BluerAdapter {
125    /// BlueZ session (kept alive for adapter lifetime)
126    #[allow(dead_code)]
127    session: Session,
128    /// BlueZ adapter
129    adapter: Adapter,
130    /// Cached adapter address (queried once at creation)
131    cached_address: Option<String>,
132    /// Cached power state (updated on power changes)
133    cached_powered: std::sync::atomic::AtomicBool,
134    /// Configuration
135    config: RwLock<Option<BleConfig>>,
136    /// Internal state
137    state: Arc<RwLock<AdapterState>>,
138    /// Advertisement handle (keeps advertisement alive)
139    adv_handle: RwLock<Option<AdvertisementHandle>>,
140    /// GATT application handle (keeps service registered)
141    gatt_handle: RwLock<Option<ApplicationHandle>>,
142    /// GATT service state for read/write callbacks
143    gatt_state: Arc<GattState>,
144    /// Discovery callback
145    discovery_callback: RwLock<Option<DiscoveryCallback>>,
146    /// Connection callback
147    connection_callback: RwLock<Option<ConnectionCallback>>,
148    /// Shutdown signal
149    shutdown_tx: broadcast::Sender<()>,
150}
151
152impl BluerAdapter {
153    /// Create a new BlueZ adapter
154    ///
155    /// This connects to the system D-Bus and gets the default Bluetooth adapter.
156    pub async fn new() -> Result<Self> {
157        let session = Session::new().await.map_err(|e| {
158            BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
159        })?;
160
161        let adapter = session
162            .default_adapter()
163            .await
164            .map_err(|_| BleError::AdapterNotAvailable)?;
165
166        // Check if adapter is powered
167        let powered = adapter.is_powered().await.map_err(|e| {
168            BleError::PlatformError(format!("Failed to check adapter power: {}", e))
169        })?;
170
171        if !powered {
172            // Try to power on the adapter
173            adapter.set_powered(true).await.map_err(|e| {
174                BleError::PlatformError(format!("Failed to power on adapter: {}", e))
175            })?;
176        }
177
178        // Disable pairing to prevent pairing prompts on remote devices
179        if let Err(e) = adapter.set_pairable(false).await {
180            log::warn!("Failed to disable pairing: {}", e);
181        }
182
183        // Cache the adapter address
184        let cached_address = adapter.address().await.ok().map(|a| a.to_string());
185
186        let (shutdown_tx, _) = broadcast::channel(1);
187
188        Ok(Self {
189            session,
190            adapter,
191            cached_address,
192            cached_powered: std::sync::atomic::AtomicBool::new(true), // We ensured it's powered above
193            config: RwLock::new(None),
194            state: Arc::new(RwLock::new(AdapterState::default())),
195            adv_handle: RwLock::new(None),
196            gatt_handle: RwLock::new(None),
197            gatt_state: Arc::new(GattState::new()),
198            discovery_callback: RwLock::new(None),
199            connection_callback: RwLock::new(None),
200            shutdown_tx,
201        })
202    }
203
204    /// Create adapter with a specific adapter name (e.g., "hci0")
205    pub async fn with_adapter_name(name: &str) -> Result<Self> {
206        let session = Session::new().await.map_err(|e| {
207            BleError::PlatformError(format!("Failed to create BlueZ session: {}", e))
208        })?;
209
210        let adapter = session.adapter(name).map_err(|e| {
211            BleError::PlatformError(format!("Failed to get adapter '{}': {}", name, e))
212        })?;
213
214        let powered = adapter.is_powered().await.map_err(|e| {
215            BleError::PlatformError(format!("Failed to check adapter power: {}", e))
216        })?;
217
218        if !powered {
219            adapter.set_powered(true).await.map_err(|e| {
220                BleError::PlatformError(format!("Failed to power on adapter: {}", e))
221            })?;
222        }
223
224        // Disable pairing to prevent pairing prompts on remote devices
225        if let Err(e) = adapter.set_pairable(false).await {
226            log::warn!("Failed to disable pairing: {}", e);
227        }
228
229        // Cache the adapter address
230        let cached_address = adapter.address().await.ok().map(|a| a.to_string());
231
232        let (shutdown_tx, _) = broadcast::channel(1);
233
234        Ok(Self {
235            session,
236            adapter,
237            cached_address,
238            cached_powered: std::sync::atomic::AtomicBool::new(true),
239            config: RwLock::new(None),
240            state: Arc::new(RwLock::new(AdapterState::default())),
241            adv_handle: RwLock::new(None),
242            gatt_handle: RwLock::new(None),
243            gatt_state: Arc::new(GattState::new()),
244            discovery_callback: RwLock::new(None),
245            connection_callback: RwLock::new(None),
246            shutdown_tx,
247        })
248    }
249
250    /// Get the adapter name (e.g., "hci0")
251    pub fn adapter_name(&self) -> &str {
252        self.adapter.name()
253    }
254
255    /// Build HIVE advertisement
256    ///
257    /// Matches Android's advertisement format for maximum compatibility:
258    /// - 16-bit HIVE service UUID alias (0xF47A)
259    /// - Service data with [nodeId:4 bytes BE][meshId:4 bytes BE]
260    /// - Device name goes in scan response (handled by BlueZ via adapter alias)
261    ///
262    /// This keeps the advertisement packet under 31 bytes:
263    /// - Flags: 3 bytes
264    /// - Service UUID (16-bit): 4 bytes
265    /// - Service data: 2 (header) + 2 (UUID) + 4 (nodeId) + 4 (meshId) = 12 bytes
266    /// - Total: 19 bytes (name in scan response)
267    fn build_advertisement(&self, config: &BleConfig) -> Advertisement {
268        use std::collections::BTreeMap;
269
270        // The 16-bit UUID 0xF47A in its 128-bit Bluetooth SIG form
271        let service_uuid_16bit =
272            uuid::Uuid::parse_str("0000F47A-0000-1000-8000-00805F9B34FB").unwrap();
273
274        // Build service data: [nodeId:4 bytes BE][meshId:4 bytes BE]
275        // meshId is an 8-char hex string like "29C916FA" -> 4 bytes
276        let mut service_data_bytes = config.node_id.as_u32().to_be_bytes().to_vec();
277
278        // Parse mesh_id as hex and append (4 bytes)
279        if let Ok(mesh_id_int) = u32::from_str_radix(&config.mesh.mesh_id, 16) {
280            service_data_bytes.extend_from_slice(&mesh_id_int.to_be_bytes());
281            log::debug!(
282                "Advertisement includes mesh_id: {} (0x{:08X})",
283                config.mesh.mesh_id,
284                mesh_id_int
285            );
286        } else {
287            // Fallback: use first 4 bytes of mesh_id string as ASCII
288            let mesh_bytes: Vec<u8> = config.mesh.mesh_id.bytes().take(4).collect();
289            service_data_bytes.extend_from_slice(&mesh_bytes);
290            log::debug!(
291                "Advertisement includes mesh_id as ASCII: {}",
292                config.mesh.mesh_id
293            );
294        }
295
296        let mut service_data = BTreeMap::new();
297        service_data.insert(service_uuid_16bit, service_data_bytes);
298
299        // Device name - include in advertisement for maximum compatibility
300        // (some scanners need the name in the main advertisement)
301        let device_name = format!("HIVE-{:08X}", config.node_id.as_u32());
302
303        Advertisement {
304            advertisement_type: bluer::adv::Type::Peripheral,
305            service_uuids: vec![service_uuid_16bit].into_iter().collect(),
306            // Include local_name - BlueZ will put it in scan response if needed
307            local_name: Some(device_name),
308            service_data,
309            // Set discoverable to allow other devices to find us
310            discoverable: Some(true),
311            ..Default::default()
312        }
313    }
314
315    /// Set the adapter's alias (used for scan response device name)
316    pub async fn set_adapter_alias(&self, alias: &str) -> Result<()> {
317        self.adapter
318            .set_alias(alias.to_string())
319            .await
320            .map_err(|e| BleError::PlatformError(format!("Failed to set adapter alias: {}", e)))
321    }
322
323    /// Parse HIVE beacon from advertising data
324    /// TODO: Use this method instead of inline parsing in discovery loop
325    #[allow(dead_code)]
326    fn parse_hive_beacon(
327        &self,
328        address: Address,
329        name: Option<String>,
330        rssi: i16,
331        service_data: &HashMap<bluer::Uuid, Vec<u8>>,
332        _manufacturer_data: &HashMap<u16, Vec<u8>>,
333    ) -> Option<DiscoveredDevice> {
334        // Check if this is a HIVE node by looking for our service UUID
335        let is_hive = service_data.contains_key(&HIVE_SERVICE_UUID);
336
337        // Try to extract node ID from name (HIVE-XXXXXXXX format)
338        let node_id = name
339            .as_ref()
340            .and_then(|n| n.strip_prefix("HIVE-"))
341            .and_then(NodeId::parse);
342
343        Some(DiscoveredDevice {
344            address: address.to_string(),
345            name,
346            rssi: rssi as i8,
347            is_hive_node: is_hive || node_id.is_some(),
348            node_id,
349            adv_data: Vec::new(), // TODO: serialize full adv data
350        })
351    }
352
353    /// Register node ID to address mapping
354    pub async fn register_node_address(&self, node_id: NodeId, address: Address) {
355        let mut state = self.state.write().await;
356        state.address_to_node.insert(address, node_id);
357        state.node_to_address.insert(node_id, address);
358    }
359
360    /// Get address for a node ID
361    pub async fn get_node_address(&self, node_id: &NodeId) -> Option<Address> {
362        let state = self.state.read().await;
363        state.node_to_address.get(node_id).copied()
364    }
365
366    /// Get a clone of a stored connection by node ID
367    ///
368    /// Returns the `BluerConnection` for a connected peer, allowing direct
369    /// GATT operations (read/write characteristics) on the remote device.
370    pub async fn get_connection(&self, node_id: &NodeId) -> Option<BluerConnection> {
371        let state = self.state.read().await;
372        state.connections.get(node_id).cloned()
373    }
374
375    /// Set callback for when sync data is received from a connected peer
376    ///
377    /// This is invoked when a remote device writes to the sync_data characteristic.
378    /// Use this to feed received documents into `HiveMesh::on_ble_data_received_anonymous`.
379    pub async fn set_sync_data_callback<F>(&self, callback: F)
380    where
381        F: Fn(Vec<u8>) + Send + Sync + 'static,
382    {
383        *self.gatt_state.sync_data_callback.lock().await = Some(Box::new(callback));
384    }
385
386    /// Clear the sync data callback
387    pub async fn clear_sync_data_callback(&self) {
388        *self.gatt_state.sync_data_callback.lock().await = None;
389    }
390
391    /// Update the sync state data that connected peers can read
392    ///
393    /// Call this with the output of `HiveMesh::tick()` or `HiveMesh::build_document()`
394    /// to make the current mesh state available to connected peers.
395    pub async fn update_sync_state(&self, data: &[u8]) {
396        *self.gatt_state.sync_state.lock().await = data.to_vec();
397    }
398
399    /// Get current sync state data
400    pub async fn get_sync_state(&self) -> Vec<u8> {
401        self.gatt_state.sync_state.lock().await.clone()
402    }
403
404    /// Get the negotiated MTU for a connected peer (by BLE address)
405    ///
406    /// Returns the MTU captured from the peer's last GATT operation.
407    /// This is populated when the peer performs read/write operations on our GATT server.
408    pub async fn get_peer_mtu(&self, address: &Address) -> Option<u16> {
409        self.gatt_state.get_peer_mtu(address).await
410    }
411
412    /// Get all known peer MTUs (for debugging/monitoring)
413    pub async fn get_all_peer_mtus(&self) -> HashMap<Address, u16> {
414        self.gatt_state.peer_mtu.lock().await.clone()
415    }
416
417    /// Get a device handle by address for direct GATT operations
418    ///
419    /// This is useful when you need to connect to a device directly.
420    pub fn get_device(&self, address: Address) -> std::result::Result<bluer::Device, bluer::Error> {
421        self.adapter.device(address)
422    }
423
424    /// Connect to a device with explicit address type
425    ///
426    /// This is needed for devices using random BLE addresses (common in BLE peripherals
427    /// like WearOS watches). The address type can be determined from the address itself:
428    /// - If first byte MSBs are 11 (0xC0+ range), it's typically a random static address
429    /// - Use `AddressType::LeRandom` for random addresses
430    /// - Use `AddressType::LePublic` for public addresses
431    pub async fn connect_device(
432        &self,
433        address: Address,
434        address_type: bluer::AddressType,
435    ) -> std::result::Result<bluer::Device, bluer::Error> {
436        self.adapter.connect_device(address, address_type).await
437    }
438
439    /// Determine if a BLE address is a random address based on its structure
440    ///
441    /// In BLE, random addresses have specific patterns in the two MSBs of the first byte:
442    /// - 11: Random static address
443    /// - 01: Resolvable private address (RPA)
444    /// - 00: Non-resolvable private address
445    ///
446    /// Public addresses don't follow this pattern and are manufacturer-assigned.
447    pub fn is_random_address(address: &Address) -> bool {
448        let bytes = address.0;
449        // The first byte of the address string (e.g., "D8" in "D8:3A:DD:F5:FD:53") is bytes[0]
450        let first_byte = bytes[0];
451        // Random static: top 2 bits = 11 (0xC0+)
452        // RPA: top 2 bits = 01 (0x40-0x7F)
453        // Non-resolvable: top 2 bits = 00 (0x00-0x3F)
454        // A simple heuristic: if MSB bits are 11, it's almost certainly random static
455        (first_byte & 0xC0) == 0xC0
456    }
457
458    /// Stop BLE discovery temporarily
459    ///
460    /// This is useful before connecting to avoid the "le-connection-abort-by-local" error
461    /// that can happen when BlueZ tries to scan and connect simultaneously.
462    pub async fn stop_discovery(&self) -> Result<()> {
463        // Note: This doesn't stop our discovery stream task, just tells the adapter to pause
464        self.adapter
465            .set_discovery_filter(bluer::DiscoveryFilter::default())
466            .await
467            .ok();
468        Ok(())
469    }
470
471    /// Resume BLE discovery
472    pub async fn resume_discovery(&self) -> Result<()> {
473        use bluer::DiscoveryFilter;
474        use bluer::DiscoveryTransport;
475
476        let filter = DiscoveryFilter {
477            transport: DiscoveryTransport::Le,
478            ..Default::default()
479        };
480        self.adapter.set_discovery_filter(filter).await.ok();
481        Ok(())
482    }
483
484    /// Remove a device from BlueZ's cache
485    ///
486    /// This can help clear stale state that causes connection failures.
487    /// Use this when repeated connection attempts fail.
488    pub async fn remove_device(&self, address: Address) -> Result<()> {
489        self.adapter
490            .remove_device(address)
491            .await
492            .map_err(|e| BleError::ConnectionFailed(format!("Failed to remove device: {}", e)))?;
493        log::debug!("Removed device {} from BlueZ cache", address);
494        Ok(())
495    }
496}
497
498#[async_trait]
499impl BleAdapter for BluerAdapter {
500    async fn init(&mut self, config: &BleConfig) -> Result<()> {
501        *self.config.write().await = Some(config.clone());
502        log::info!(
503            "BluerAdapter initialized for node {:08X}",
504            config.node_id.as_u32()
505        );
506        Ok(())
507    }
508
509    async fn start(&self) -> Result<()> {
510        let config = self.config.read().await;
511        let config = config
512            .as_ref()
513            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
514
515        // Start advertising
516        self.start_advertising(&config.discovery).await?;
517
518        // Start scanning
519        self.start_scan(&config.discovery).await?;
520
521        log::info!("BluerAdapter started");
522        Ok(())
523    }
524
525    async fn stop(&self) -> Result<()> {
526        // Stop advertising
527        self.stop_advertising().await?;
528
529        // Stop scanning
530        self.stop_scan().await?;
531
532        // Signal shutdown to background tasks
533        let _ = self.shutdown_tx.send(());
534
535        log::info!("BluerAdapter stopped");
536        Ok(())
537    }
538
539    fn is_powered(&self) -> bool {
540        self.cached_powered
541            .load(std::sync::atomic::Ordering::Relaxed)
542    }
543
544    fn address(&self) -> Option<String> {
545        self.cached_address.clone()
546    }
547
548    async fn start_scan(&self, config: &DiscoveryConfig) -> Result<()> {
549        use bluer::DiscoveryFilter;
550        use bluer::DiscoveryTransport;
551
552        let filter = DiscoveryFilter {
553            transport: DiscoveryTransport::Le,
554            duplicate_data: !config.filter_duplicates,
555            ..Default::default()
556        };
557
558        self.adapter
559            .set_discovery_filter(filter)
560            .await
561            .map_err(|e| {
562                BleError::DiscoveryFailed(format!("Failed to set discovery filter: {}", e))
563            })?;
564
565        // Start discovery
566        let discover =
567            self.adapter.discover_devices().await.map_err(|e| {
568                BleError::DiscoveryFailed(format!("Failed to start discovery: {}", e))
569            })?;
570
571        // Spawn task to handle discovered devices
572        let callback = self.discovery_callback.read().await.clone();
573        let adapter = self.adapter.clone();
574        let state = self.state.clone();
575        let mut shutdown_rx = self.shutdown_tx.subscribe();
576
577        tokio::spawn(async move {
578            use tokio_stream::StreamExt;
579            let mut discover = std::pin::pin!(discover);
580
581            loop {
582                tokio::select! {
583                    _ = shutdown_rx.recv() => {
584                        log::debug!("Discovery task shutting down");
585                        break;
586                    }
587                    event = discover.next() => {
588                        match event {
589                            Some(bluer::AdapterEvent::DeviceAdded(addr)) => {
590                                if let Ok(device) = adapter.device(addr) {
591                                    // Get device properties from advertisement data only
592                                    // IMPORTANT: Do NOT call device.uuids() as it may trigger
593                                    // service discovery which causes unwanted pairing requests
594                                    let name = device.name().await.ok().flatten();
595                                    let rssi = device.rssi().await.ok().flatten().unwrap_or(0);
596
597                                    // Get service data from advertisement (no connection needed)
598                                    let service_data = device.service_data().await.ok().flatten().unwrap_or_default();
599
600                                    // The 16-bit UUID 0xF47A expands to this 128-bit form
601                                    let service_uuid_16bit =
602                                        uuid::Uuid::parse_str("0000F47A-0000-1000-8000-00805F9B34FB").unwrap();
603
604                                    // Check if HIVE service UUID is present in advertisement
605                                    // Check both the full UUID and the 16-bit alias
606                                    let has_hive_service = service_data.contains_key(&HIVE_SERVICE_UUID)
607                                        || service_data.contains_key(&service_uuid_16bit);
608
609                                    // Check if name indicates HIVE node (fallback)
610                                    // Supports both formats:
611                                    // - New: HIVE_<MESH>-<NODE_ID> (e.g., "HIVE_WEARTAK-8DD4")
612                                    // - Legacy: HIVE-<NODE_ID> (e.g., "HIVE-12345678")
613                                    let name_indicates_hive = name.as_ref().map(|n| {
614                                        n.starts_with("HIVE_") || n.starts_with("HIVE-")
615                                    }).unwrap_or(false);
616
617                                    // HIVE node detection: prefer service UUID, fallback to name
618                                    let is_hive_node = has_hive_service || name_indicates_hive;
619
620                                    // Parse node ID from name (supports both formats)
621                                    let mut node_id = name.as_ref().and_then(|n| {
622                                        crate::config::MeshConfig::parse_device_name(n)
623                                            .map(|(_, node_id)| node_id)
624                                    });
625
626                                    // If not found in name, try to extract from service data
627                                    // Service data format: [nodeId:4 bytes BE][meshId:UTF-8]
628                                    if node_id.is_none() {
629                                        if let Some(data) = service_data.get(&service_uuid_16bit)
630                                            .or_else(|| service_data.get(&HIVE_SERVICE_UUID))
631                                        {
632                                            if data.len() >= 4 {
633                                                let id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
634                                                node_id = Some(NodeId::new(id));
635                                            }
636                                        }
637                                    }
638
639                                    let discovered = DiscoveredDevice {
640                                        address: addr.to_string(),
641                                        name: name.clone(),
642                                        rssi: rssi as i8,
643                                        is_hive_node,
644                                        node_id,
645                                        adv_data: Vec::new(),
646                                    };
647
648                                    // Register node ID → address mapping so connect() can find the peer
649                                    if let Some(nid) = node_id {
650                                        let mut s = state.write().await;
651                                        s.address_to_node.insert(addr, nid);
652                                        s.node_to_address.insert(nid, addr);
653                                    }
654
655                                    log::debug!(
656                                        "Discovered device: {} (HIVE: {}, service_uuid: {}, name: {})",
657                                        discovered.address, is_hive_node, has_hive_service, name_indicates_hive
658                                    );
659
660                                    if let Some(ref cb) = callback {
661                                        cb(discovered);
662                                    }
663                                }
664                            }
665                            Some(bluer::AdapterEvent::DeviceRemoved(addr)) => {
666                                log::debug!("Device removed: {}", addr);
667                            }
668                            None => break,
669                            _ => {}
670                        }
671                    }
672                }
673            }
674        });
675
676        log::info!("BLE scanning started");
677        Ok(())
678    }
679
680    async fn stop_scan(&self) -> Result<()> {
681        // Discovery is stopped when the stream is dropped
682        // The shutdown signal will cause the task to exit
683        log::info!("BLE scanning stopped");
684        Ok(())
685    }
686
687    async fn start_advertising(&self, _config: &DiscoveryConfig) -> Result<()> {
688        let ble_config = self.config.read().await;
689        let ble_config = ble_config
690            .as_ref()
691            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
692
693        let adv = self.build_advertisement(ble_config);
694
695        let handle =
696            self.adapter.advertise(adv).await.map_err(|e| {
697                BleError::PlatformError(format!("Failed to start advertising: {}", e))
698            })?;
699
700        *self.adv_handle.write().await = Some(handle);
701
702        log::info!(
703            "BLE advertising started for HIVE-{:08X}",
704            ble_config.node_id.as_u32()
705        );
706        Ok(())
707    }
708
709    async fn stop_advertising(&self) -> Result<()> {
710        // Drop the advertisement handle to stop advertising
711        *self.adv_handle.write().await = None;
712        log::info!("BLE advertising stopped");
713        Ok(())
714    }
715
716    fn set_discovery_callback(&mut self, callback: Option<DiscoveryCallback>) {
717        // Use blocking write since this is a sync method
718        // In practice, this should be called before start()
719        if let Ok(mut cb) = self.discovery_callback.try_write() {
720            *cb = callback;
721        }
722    }
723
724    async fn connect(&self, peer_id: &NodeId) -> Result<Box<dyn BleConnection>> {
725        // Look up the address for this node ID
726        let address = self
727            .get_node_address(peer_id)
728            .await
729            .ok_or_else(|| BleError::ConnectionFailed(format!("Unknown node ID: {}", peer_id)))?;
730
731        let device = self
732            .adapter
733            .device(address)
734            .map_err(|e| BleError::ConnectionFailed(format!("Failed to get device: {}", e)))?;
735
736        // Trust the device to prevent pairing prompts on the remote side
737        // This tells BlueZ we accept this device without requiring user confirmation
738        if let Err(e) = device.set_trusted(true).await {
739            log::warn!("Failed to set device as trusted: {}", e);
740        }
741
742        // Pause advertising during connection to avoid le-connection-abort-by-local.
743        // Single-radio BLE adapters often can't advertise and initiate connections
744        // simultaneously. We temporarily drop the advertisement handle then restart.
745        let had_advertising = self.adv_handle.read().await.is_some();
746        if had_advertising {
747            log::debug!("Pausing advertising for connection to {}", address);
748            *self.adv_handle.write().await = None;
749            // Small delay for the adapter to finish stopping
750            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
751        }
752
753        // Connect to the device
754        let connect_result = device.connect().await;
755
756        // Restart advertising if it was active
757        if had_advertising {
758            log::debug!("Resuming advertising after connection attempt");
759            let ble_config = self.config.read().await;
760            if let Some(ref cfg) = *ble_config {
761                let adv = self.build_advertisement(cfg);
762                match self.adapter.advertise(adv).await {
763                    Ok(handle) => *self.adv_handle.write().await = Some(handle),
764                    Err(e) => log::warn!("Failed to restart advertising: {}", e),
765                }
766            }
767        }
768
769        connect_result
770            .map_err(|e| BleError::ConnectionFailed(format!("Failed to connect: {}", e)))?;
771
772        // Create connection wrapper
773        let connection = BluerConnection::new(*peer_id, device).await?;
774
775        // Store connection
776        {
777            let mut state = self.state.write().await;
778            state.connections.insert(*peer_id, connection.clone());
779        }
780
781        // Notify callback
782        if let Some(ref cb) = *self.connection_callback.read().await {
783            cb(
784                *peer_id,
785                ConnectionEvent::Connected {
786                    mtu: connection.mtu(),
787                    phy: connection.phy(),
788                },
789            );
790        }
791
792        log::info!("Connected to peer {}", peer_id);
793        Ok(Box::new(connection))
794    }
795
796    async fn disconnect(&self, peer_id: &NodeId) -> Result<()> {
797        let connection = {
798            let mut state = self.state.write().await;
799            state.connections.remove(peer_id)
800        };
801
802        if let Some(conn) = connection {
803            conn.disconnect().await?;
804
805            // Notify callback
806            if let Some(ref cb) = *self.connection_callback.read().await {
807                cb(
808                    *peer_id,
809                    ConnectionEvent::Disconnected {
810                        reason: DisconnectReason::LocalRequest,
811                    },
812                );
813            }
814
815            log::info!("Disconnected from peer {}", peer_id);
816        }
817
818        Ok(())
819    }
820
821    fn get_connection(&self, peer_id: &NodeId) -> Option<Box<dyn BleConnection>> {
822        // Use try_read to avoid blocking
823        if let Ok(state) = self.state.try_read() {
824            state
825                .connections
826                .get(peer_id)
827                .map(|c| Box::new(c.clone()) as Box<dyn BleConnection>)
828        } else {
829            None
830        }
831    }
832
833    fn peer_count(&self) -> usize {
834        if let Ok(state) = self.state.try_read() {
835            state.connections.len()
836        } else {
837            0
838        }
839    }
840
841    fn connected_peers(&self) -> Vec<NodeId> {
842        if let Ok(state) = self.state.try_read() {
843            state.connections.keys().cloned().collect()
844        } else {
845            Vec::new()
846        }
847    }
848
849    fn set_connection_callback(&mut self, callback: Option<ConnectionCallback>) {
850        if let Ok(mut cb) = self.connection_callback.try_write() {
851            *cb = callback;
852        }
853    }
854
855    async fn register_gatt_service(&self) -> Result<()> {
856        // Get config to access node_id
857        let config = self.config.read().await;
858        let node_id = config
859            .as_ref()
860            .map(|c| c.node_id)
861            .ok_or_else(|| BleError::InvalidState("Adapter not initialized".to_string()))?;
862
863        // Initialize GATT state with node info
864        self.gatt_state.init(node_id).await;
865
866        // Clone Arc for use in callbacks
867        let state = self.gatt_state.clone();
868        let state_read_node = state.clone();
869        let state_read_sync = state.clone();
870        let state_read_status = state.clone();
871        let state_write_sync = state.clone();
872        let state_write_cmd = state.clone();
873
874        // Build GATT application with HIVE service
875        let app = Application {
876            services: vec![Service {
877                uuid: HIVE_SERVICE_UUID,
878                primary: true,
879                characteristics: vec![
880                    // Node Info characteristic (0001) - READ
881                    Characteristic {
882                        uuid: HiveCharacteristicUuids::node_info(),
883                        read: Some(CharacteristicRead {
884                            read: true,
885                            fun: Box::new(move |req| {
886                                let state = state_read_node.clone();
887                                Box::pin(async move {
888                                    // Track peer MTU from GATT request
889                                    state.update_peer_mtu(req.device_address, req.mtu).await;
890                                    let data = state.node_info.lock().await;
891                                    log::debug!(
892                                        "GATT read node_info from {:?}: {} bytes (MTU={})",
893                                        req.device_address,
894                                        data.len(),
895                                        req.mtu
896                                    );
897                                    Ok(data.clone())
898                                })
899                            }),
900                            ..Default::default()
901                        }),
902                        ..Default::default()
903                    },
904                    // Sync State characteristic (0002) - READ, NOTIFY
905                    Characteristic {
906                        uuid: HiveCharacteristicUuids::sync_state(),
907                        read: Some(CharacteristicRead {
908                            read: true,
909                            fun: Box::new(move |req| {
910                                let state = state_read_sync.clone();
911                                Box::pin(async move {
912                                    // Track peer MTU from GATT request
913                                    state.update_peer_mtu(req.device_address, req.mtu).await;
914                                    let data = state.sync_state.lock().await;
915                                    log::debug!(
916                                        "GATT read sync_state from {:?}: {} bytes (MTU={})",
917                                        req.device_address,
918                                        data.len(),
919                                        req.mtu
920                                    );
921                                    Ok(data.clone())
922                                })
923                            }),
924                            ..Default::default()
925                        }),
926                        notify: Some(CharacteristicNotify {
927                            notify: true,
928                            method: CharacteristicNotifyMethod::Io,
929                            ..Default::default()
930                        }),
931                        ..Default::default()
932                    },
933                    // Sync Data characteristic (0003) - WRITE, INDICATE
934                    Characteristic {
935                        uuid: HiveCharacteristicUuids::sync_data(),
936                        write: Some(CharacteristicWrite {
937                            write: true,
938                            method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
939                                let state = state_write_sync.clone();
940                                Box::pin(async move {
941                                    // Track peer MTU from GATT request
942                                    state.update_peer_mtu(req.device_address, req.mtu).await;
943                                    log::debug!(
944                                        "GATT write sync_data from {:?}: {} bytes (MTU={})",
945                                        req.device_address,
946                                        data.len(),
947                                        req.mtu
948                                    );
949                                    // Invoke callback if set
950                                    if let Some(ref cb) = *state.sync_data_callback.lock().await {
951                                        cb(data);
952                                    }
953                                    Ok(())
954                                })
955                            })),
956                            ..Default::default()
957                        }),
958                        notify: Some(CharacteristicNotify {
959                            indicate: true,
960                            method: CharacteristicNotifyMethod::Io,
961                            ..Default::default()
962                        }),
963                        ..Default::default()
964                    },
965                    // Command characteristic (0004) - WRITE
966                    Characteristic {
967                        uuid: HiveCharacteristicUuids::command(),
968                        write: Some(CharacteristicWrite {
969                            write: true,
970                            write_without_response: true,
971                            method: CharacteristicWriteMethod::Fun(Box::new(move |data, req| {
972                                let state = state_write_cmd.clone();
973                                Box::pin(async move {
974                                    // Track peer MTU from GATT request
975                                    state.update_peer_mtu(req.device_address, req.mtu).await;
976                                    log::debug!(
977                                        "GATT write command from {:?}: {} bytes (MTU={})",
978                                        req.device_address,
979                                        data.len(),
980                                        req.mtu
981                                    );
982                                    // Invoke callback if set
983                                    if let Some(ref cb) = *state.command_callback.lock().await {
984                                        cb(data);
985                                    }
986                                    Ok(())
987                                })
988                            })),
989                            ..Default::default()
990                        }),
991                        ..Default::default()
992                    },
993                    // Status characteristic (0005) - READ, NOTIFY
994                    Characteristic {
995                        uuid: HiveCharacteristicUuids::status(),
996                        read: Some(CharacteristicRead {
997                            read: true,
998                            fun: Box::new(move |req| {
999                                let state = state_read_status.clone();
1000                                Box::pin(async move {
1001                                    // Track peer MTU from GATT request
1002                                    state.update_peer_mtu(req.device_address, req.mtu).await;
1003                                    let data = state.status.lock().await;
1004                                    log::debug!(
1005                                        "GATT read status from {:?}: {} bytes (MTU={})",
1006                                        req.device_address,
1007                                        data.len(),
1008                                        req.mtu
1009                                    );
1010                                    Ok(data.clone())
1011                                })
1012                            }),
1013                            ..Default::default()
1014                        }),
1015                        notify: Some(CharacteristicNotify {
1016                            notify: true,
1017                            method: CharacteristicNotifyMethod::Io,
1018                            ..Default::default()
1019                        }),
1020                        ..Default::default()
1021                    },
1022                ],
1023                ..Default::default()
1024            }],
1025            ..Default::default()
1026        };
1027
1028        // Register the GATT application with BlueZ
1029        let handle = self
1030            .adapter
1031            .serve_gatt_application(app)
1032            .await
1033            .map_err(|e| BleError::GattError(format!("Failed to register GATT service: {}", e)))?;
1034
1035        // Store the handle to keep the service alive
1036        *self.gatt_handle.write().await = Some(handle);
1037
1038        log::info!(
1039            "GATT service registered for node {:08X} with 5 characteristics",
1040            node_id.as_u32()
1041        );
1042        Ok(())
1043    }
1044
1045    async fn unregister_gatt_service(&self) -> Result<()> {
1046        // Drop the handle to unregister the GATT application
1047        let handle = self.gatt_handle.write().await.take();
1048        if handle.is_some() {
1049            log::info!("GATT service unregistered");
1050        }
1051        Ok(())
1052    }
1053
1054    fn supports_coded_phy(&self) -> bool {
1055        // Check if LE Coded PHY is supported
1056        // This would require checking adapter capabilities
1057        // For now, assume BLE 5.0+ adapters support it
1058        true
1059    }
1060
1061    fn supports_extended_advertising(&self) -> bool {
1062        // Check if extended advertising is supported
1063        true
1064    }
1065
1066    fn max_mtu(&self) -> u16 {
1067        // BlueZ typically supports up to 517 bytes MTU
1068        517
1069    }
1070
1071    fn max_connections(&self) -> u8 {
1072        // BlueZ default is 7 connections
1073        7
1074    }
1075}
1076
#[cfg(test)]
mod tests {
    // Integration tests require actual Bluetooth hardware.
    // They are ignored by default; run them with the --ignored flag on a
    // Linux system.

    use super::*;
    use crate::BleConfig;

    /// Creating the adapter must succeed on a machine with BlueZ available.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_creation() {
        let adapter = BluerAdapter::new().await;
        assert!(
            adapter.is_ok(),
            "Failed to create adapter: {:?}",
            adapter.err()
        );
    }

    /// Initializing a freshly created adapter with a basic config must succeed.
    #[tokio::test]
    #[ignore = "Requires BlueZ and Bluetooth hardware"]
    async fn test_adapter_init() {
        // `expect` names the step that failed, so a missing adapter is not
        // mistaken for an init failure.
        let mut adapter = BluerAdapter::new()
            .await
            .expect("Failed to create adapter");
        let config = BleConfig::new(NodeId::new(0x12345678));
        let result = adapter.init(&config).await;
        assert!(
            result.is_ok(),
            "Failed to initialize adapter: {:?}",
            result.err()
        );
    }
}
1106}