Skip to main content

hive_btle/platform/linux/
connection.rs

1// Copyright (c) 2025-2026 (r)evolve - Revolve Team LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! BlueZ connection wrapper
17//!
18//! Provides a write queue to serialize BLE GATT writes, since BLE only allows
19//! one pending write operation per connection at a time.
20
21use bluer::Device;
22use std::collections::VecDeque;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{Mutex, RwLock};
26
27use crate::config::BlePhy;
28use crate::error::{BleError, Result};
29use crate::transport::BleConnection;
30use crate::NodeId;
31
32/// A queued write operation
33struct QueuedWrite {
34    /// Service UUID
35    service_uuid: uuid::Uuid,
36    /// Characteristic UUID
37    char_uuid: uuid::Uuid,
38    /// Data to write
39    data: Vec<u8>,
40    /// Completion notification
41    complete_tx: tokio::sync::oneshot::Sender<Result<()>>,
42}
43
44/// Internal connection state
45struct ConnectionState {
46    /// Whether the connection is alive
47    alive: bool,
48    /// Negotiated MTU
49    mtu: u16,
50    /// Current PHY
51    phy: BlePhy,
52    /// Last RSSI reading
53    rssi: Option<i8>,
54}
55
56/// Write queue state (separate from connection state for finer-grained locking)
57struct WriteQueueState {
58    /// Queue of pending writes
59    queue: VecDeque<QueuedWrite>,
60    /// Whether a write is currently in progress
61    write_in_progress: bool,
62}
63
64/// BlueZ connection wrapper
65///
66/// Wraps a `bluer::Device` with connection state tracking and write queue.
67/// BLE only allows one pending write per connection, so all writes are
68/// serialized through the write queue.
69#[derive(Clone)]
70pub struct BluerConnection {
71    /// Remote peer ID
72    peer_id: NodeId,
73    /// BlueZ device handle
74    device: Device,
75    /// Connection state
76    state: Arc<RwLock<ConnectionState>>,
77    /// Write queue state (uses Mutex for write serialization)
78    write_queue: Arc<Mutex<WriteQueueState>>,
79    /// When the connection was established
80    connected_at: Instant,
81}
82
83/// Default MTU for BLE 4.2+ devices with data length extension
84/// BlueZ typically negotiates 247-517 bytes depending on the remote device
85/// We use 185 as a conservative default (matches WearTAK's request)
86const DEFAULT_BLE_MTU: u16 = 185;
87
88/// Minimum BLE MTU (ATT_MTU_MIN per Bluetooth spec)
89#[allow(dead_code)]
90const MIN_BLE_MTU: u16 = 23;
91
92impl BluerConnection {
93    /// Create a new connection wrapper
94    pub(crate) async fn new(peer_id: NodeId, device: Device) -> Result<Self> {
95        // BlueZ negotiates MTU automatically on first ATT operation
96        // Use a reasonable default that most modern devices support
97        // The actual MTU will be confirmed on the first characteristic access
98        let mtu = DEFAULT_BLE_MTU;
99
100        let state = ConnectionState {
101            alive: true,
102            mtu,
103            phy: BlePhy::Le1M, // Default PHY
104            rssi: None,
105        };
106
107        let write_queue = WriteQueueState {
108            queue: VecDeque::new(),
109            write_in_progress: false,
110        };
111
112        let conn = Self {
113            peer_id,
114            device,
115            state: Arc::new(RwLock::new(state)),
116            write_queue: Arc::new(Mutex::new(write_queue)),
117            connected_at: Instant::now(),
118        };
119
120        // Try to get initial RSSI
121        conn.update_rssi().await;
122
123        Ok(conn)
124    }
125
126    /// Discover the actual negotiated MTU via a characteristic
127    ///
128    /// BlueZ negotiates MTU during the first GATT operation.
129    /// Call this after connecting to get the actual negotiated value.
130    /// Uses AcquireWrite which returns the negotiated MTU.
131    pub async fn discover_mtu(
132        &self,
133        service_uuid: uuid::Uuid,
134        char_uuid: uuid::Uuid,
135    ) -> Result<u16> {
136        let service = self
137            .find_service(service_uuid)
138            .await?
139            .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
140
141        let characteristics = service
142            .characteristics()
143            .await
144            .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
145
146        for char in characteristics {
147            if char.uuid().await.ok() == Some(char_uuid) {
148                // Try to acquire write IO which returns the negotiated MTU
149                match char.write_io().await {
150                    Ok(writer) => {
151                        let mtu = writer.mtu();
152                        self.set_mtu(mtu as u16).await;
153                        log::info!("Discovered MTU: {} bytes via {}", mtu, char_uuid);
154                        return Ok(mtu as u16);
155                    }
156                    Err(e) => {
157                        log::debug!("Could not acquire write IO for MTU discovery: {}", e);
158                        // Fall through to try read/notify
159                    }
160                }
161
162                // Try notify_io as fallback
163                match char.notify_io().await {
164                    Ok(reader) => {
165                        let mtu = reader.mtu();
166                        self.set_mtu(mtu as u16).await;
167                        log::info!("Discovered MTU: {} bytes via notify {}", mtu, char_uuid);
168                        return Ok(mtu as u16);
169                    }
170                    Err(e) => {
171                        log::debug!("Could not acquire notify IO for MTU discovery: {}", e);
172                    }
173                }
174            }
175        }
176
177        // Return current MTU if we couldn't discover it
178        Ok(self.mtu())
179    }
180
181    /// Get the underlying BlueZ device
182    pub fn device(&self) -> &Device {
183        &self.device
184    }
185
186    /// Update RSSI from device
187    pub async fn update_rssi(&self) {
188        if let Ok(Some(rssi)) = self.device.rssi().await {
189            let mut state = self.state.write().await;
190            state.rssi = Some(rssi as i8);
191        }
192    }
193
194    /// Update MTU
195    pub async fn set_mtu(&self, mtu: u16) {
196        let mut state = self.state.write().await;
197        state.mtu = mtu;
198    }
199
200    /// Update PHY
201    pub async fn set_phy(&self, phy: BlePhy) {
202        let mut state = self.state.write().await;
203        state.phy = phy;
204    }
205
206    /// Mark connection as dead
207    pub async fn mark_dead(&self) {
208        let mut state = self.state.write().await;
209        state.alive = false;
210    }
211
212    /// Disconnect from the device
213    ///
214    /// Clears any pending writes and disconnects the BLE connection.
215    pub async fn disconnect(&self) -> Result<()> {
216        // Clear any pending writes first
217        self.clear_write_queue().await;
218
219        self.device
220            .disconnect()
221            .await
222            .map_err(|e| BleError::ConnectionFailed(format!("Failed to disconnect: {}", e)))?;
223        self.mark_dead().await;
224        Ok(())
225    }
226
227    /// Discover GATT services
228    pub async fn discover_services(&self) -> Result<()> {
229        // Trigger service discovery
230        // In bluer, services are discovered automatically on connect
231        // but we can force a refresh
232        let _ = self.device.services().await;
233        Ok(())
234    }
235
236    /// Get GATT services
237    pub async fn services(&self) -> Result<Vec<bluer::gatt::remote::Service>> {
238        self.device
239            .services()
240            .await
241            .map_err(|e| BleError::GattError(format!("Failed to get services: {}", e)))
242    }
243
244    /// Find a service by UUID
245    pub async fn find_service(
246        &self,
247        uuid: uuid::Uuid,
248    ) -> Result<Option<bluer::gatt::remote::Service>> {
249        let services = self.services().await?;
250        for service in services {
251            if service.uuid().await.ok() == Some(uuid) {
252                return Ok(Some(service));
253            }
254        }
255        Ok(None)
256    }
257
258    /// Read a characteristic value
259    pub async fn read_characteristic(
260        &self,
261        service_uuid: uuid::Uuid,
262        char_uuid: uuid::Uuid,
263    ) -> Result<Vec<u8>> {
264        let service = self
265            .find_service(service_uuid)
266            .await?
267            .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
268
269        let characteristics = service
270            .characteristics()
271            .await
272            .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
273
274        for char in characteristics {
275            if char.uuid().await.ok() == Some(char_uuid) {
276                return char.read().await.map_err(|e| {
277                    BleError::GattError(format!("Failed to read characteristic: {}", e))
278                });
279            }
280        }
281
282        Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
283    }
284
285    /// Write a characteristic value (direct, non-queued)
286    ///
287    /// **Warning**: BLE only allows one pending write per connection. Calling this
288    /// method concurrently may cause write failures. Use `write_characteristic_queued`
289    /// for safe concurrent writes.
290    pub async fn write_characteristic(
291        &self,
292        service_uuid: uuid::Uuid,
293        char_uuid: uuid::Uuid,
294        value: &[u8],
295    ) -> Result<()> {
296        let service = self
297            .find_service(service_uuid)
298            .await?
299            .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
300
301        let characteristics = service
302            .characteristics()
303            .await
304            .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
305
306        for char in characteristics {
307            if char.uuid().await.ok() == Some(char_uuid) {
308                return char.write(value).await.map_err(|e| {
309                    BleError::GattError(format!("Failed to write characteristic: {}", e))
310                });
311            }
312        }
313
314        Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
315    }
316
317    /// Write a characteristic value with queuing
318    ///
319    /// BLE only allows one pending write per connection. This method queues writes
320    /// and processes them serially, preventing write conflicts. Safe to call
321    /// concurrently from multiple tasks.
322    ///
323    /// Returns when the write completes (or fails).
324    pub async fn write_characteristic_queued(
325        &self,
326        service_uuid: uuid::Uuid,
327        char_uuid: uuid::Uuid,
328        value: &[u8],
329    ) -> Result<()> {
330        // Create a oneshot channel for completion notification
331        let (tx, rx) = tokio::sync::oneshot::channel();
332
333        // Add to queue
334        {
335            let mut queue_state = self.write_queue.lock().await;
336            queue_state.queue.push_back(QueuedWrite {
337                service_uuid,
338                char_uuid,
339                data: value.to_vec(),
340                complete_tx: tx,
341            });
342            log::debug!(
343                "Queued write to {} ({} bytes, queue depth: {})",
344                char_uuid,
345                value.len(),
346                queue_state.queue.len()
347            );
348        }
349
350        // Try to process the queue (will only proceed if no write in progress)
351        self.process_write_queue().await;
352
353        // Wait for completion
354        rx.await.map_err(|_| {
355            BleError::GattError("Write was cancelled (connection closed?)".to_string())
356        })?
357    }
358
359    /// Process the write queue
360    ///
361    /// Processes queued writes one at a time. Only one write can be in progress
362    /// per connection (BLE limitation).
363    async fn process_write_queue(&self) {
364        loop {
365            // Get the next write from the queue
366            let queued_write = {
367                let mut queue_state = self.write_queue.lock().await;
368
369                // If a write is already in progress, exit
370                if queue_state.write_in_progress {
371                    return;
372                }
373
374                // Get next write from queue
375                match queue_state.queue.pop_front() {
376                    Some(write) => {
377                        queue_state.write_in_progress = true;
378                        write
379                    }
380                    None => return, // Queue empty
381                }
382            };
383
384            // Perform the write (outside the lock)
385            let result = self
386                .write_characteristic(
387                    queued_write.service_uuid,
388                    queued_write.char_uuid,
389                    &queued_write.data,
390                )
391                .await;
392
393            // Mark write as complete
394            {
395                let mut queue_state = self.write_queue.lock().await;
396                queue_state.write_in_progress = false;
397            }
398
399            // Notify the waiter
400            let _ = queued_write.complete_tx.send(result);
401
402            // Continue processing queue (loop will check for more items)
403        }
404    }
405
406    /// Get the current write queue depth
407    ///
408    /// Useful for monitoring backpressure. If the queue grows too large,
409    /// consider slowing down write requests.
410    pub async fn write_queue_depth(&self) -> usize {
411        self.write_queue.lock().await.queue.len()
412    }
413
414    /// Check if a write is currently in progress
415    pub async fn write_in_progress(&self) -> bool {
416        self.write_queue.lock().await.write_in_progress
417    }
418
419    /// Clear the write queue (e.g., on disconnect)
420    ///
421    /// All pending writes will receive an error.
422    pub async fn clear_write_queue(&self) {
423        let mut queue_state = self.write_queue.lock().await;
424        let queue_len = queue_state.queue.len();
425
426        // Drain and notify all waiters of cancellation
427        while let Some(write) = queue_state.queue.pop_front() {
428            let _ = write.complete_tx.send(Err(BleError::GattError(
429                "Write queue cleared (disconnected?)".to_string(),
430            )));
431        }
432
433        if queue_len > 0 {
434            log::debug!("Cleared {} pending writes from queue", queue_len);
435        }
436    }
437
438    /// Subscribe to characteristic notifications
439    pub async fn subscribe_characteristic(
440        &self,
441        service_uuid: uuid::Uuid,
442        char_uuid: uuid::Uuid,
443    ) -> Result<impl tokio_stream::Stream<Item = Vec<u8>>> {
444        let service = self
445            .find_service(service_uuid)
446            .await?
447            .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
448
449        let characteristics = service
450            .characteristics()
451            .await
452            .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
453
454        for char in characteristics {
455            if char.uuid().await.ok() == Some(char_uuid) {
456                return char.notify().await.map_err(|e| {
457                    BleError::GattError(format!("Failed to subscribe to notifications: {}", e))
458                });
459            }
460        }
461
462        Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
463    }
464}
465
466impl BleConnection for BluerConnection {
467    fn peer_id(&self) -> &NodeId {
468        &self.peer_id
469    }
470
471    fn is_alive(&self) -> bool {
472        // Try to read state without blocking
473        if let Ok(state) = self.state.try_read() {
474            state.alive
475        } else {
476            // If we can't get the lock, assume alive
477            true
478        }
479    }
480
481    fn mtu(&self) -> u16 {
482        if let Ok(state) = self.state.try_read() {
483            state.mtu
484        } else {
485            23 // Default BLE MTU
486        }
487    }
488
489    fn phy(&self) -> BlePhy {
490        if let Ok(state) = self.state.try_read() {
491            state.phy
492        } else {
493            BlePhy::Le1M
494        }
495    }
496
497    fn rssi(&self) -> Option<i8> {
498        if let Ok(state) = self.state.try_read() {
499            state.rssi
500        } else {
501            None
502        }
503    }
504
505    fn connected_duration(&self) -> Duration {
506        self.connected_at.elapsed()
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    // Integration tests require actual Bluetooth hardware
513    // and a connected device
514}