hive_btle/platform/linux/
connection.rs1use bluer::Device;
22use std::collections::VecDeque;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25use tokio::sync::{Mutex, RwLock};
26
27use crate::config::BlePhy;
28use crate::error::{BleError, Result};
29use crate::transport::BleConnection;
30use crate::NodeId;
31
32struct QueuedWrite {
34 service_uuid: uuid::Uuid,
36 char_uuid: uuid::Uuid,
38 data: Vec<u8>,
40 complete_tx: tokio::sync::oneshot::Sender<Result<()>>,
42}
43
44struct ConnectionState {
46 alive: bool,
48 mtu: u16,
50 phy: BlePhy,
52 rssi: Option<i8>,
54}
55
56struct WriteQueueState {
58 queue: VecDeque<QueuedWrite>,
60 write_in_progress: bool,
62}
63
64#[derive(Clone)]
70pub struct BluerConnection {
71 peer_id: NodeId,
73 device: Device,
75 state: Arc<RwLock<ConnectionState>>,
77 write_queue: Arc<Mutex<WriteQueueState>>,
79 connected_at: Instant,
81}
82
/// MTU, in bytes, recorded for a freshly constructed connection until
/// `set_mtu` / `discover_mtu` stores a different value.
const DEFAULT_BLE_MTU: u16 = 185;

/// Smallest BLE MTU, in bytes.
///
/// NOTE(review): presumably the 23-byte default ATT MTU from the Bluetooth
/// specification — confirm. `dead_code` is allowed so the constant can stay
/// defined even where nothing references it.
#[allow(dead_code)]
const MIN_BLE_MTU: u16 = 23;
91
92impl BluerConnection {
93 pub(crate) async fn new(peer_id: NodeId, device: Device) -> Result<Self> {
95 let mtu = DEFAULT_BLE_MTU;
99
100 let state = ConnectionState {
101 alive: true,
102 mtu,
103 phy: BlePhy::Le1M, rssi: None,
105 };
106
107 let write_queue = WriteQueueState {
108 queue: VecDeque::new(),
109 write_in_progress: false,
110 };
111
112 let conn = Self {
113 peer_id,
114 device,
115 state: Arc::new(RwLock::new(state)),
116 write_queue: Arc::new(Mutex::new(write_queue)),
117 connected_at: Instant::now(),
118 };
119
120 conn.update_rssi().await;
122
123 Ok(conn)
124 }
125
126 pub async fn discover_mtu(
132 &self,
133 service_uuid: uuid::Uuid,
134 char_uuid: uuid::Uuid,
135 ) -> Result<u16> {
136 let service = self
137 .find_service(service_uuid)
138 .await?
139 .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
140
141 let characteristics = service
142 .characteristics()
143 .await
144 .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
145
146 for char in characteristics {
147 if char.uuid().await.ok() == Some(char_uuid) {
148 match char.write_io().await {
150 Ok(writer) => {
151 let mtu = writer.mtu();
152 self.set_mtu(mtu as u16).await;
153 log::info!("Discovered MTU: {} bytes via {}", mtu, char_uuid);
154 return Ok(mtu as u16);
155 }
156 Err(e) => {
157 log::debug!("Could not acquire write IO for MTU discovery: {}", e);
158 }
160 }
161
162 match char.notify_io().await {
164 Ok(reader) => {
165 let mtu = reader.mtu();
166 self.set_mtu(mtu as u16).await;
167 log::info!("Discovered MTU: {} bytes via notify {}", mtu, char_uuid);
168 return Ok(mtu as u16);
169 }
170 Err(e) => {
171 log::debug!("Could not acquire notify IO for MTU discovery: {}", e);
172 }
173 }
174 }
175 }
176
177 Ok(self.mtu())
179 }
180
181 pub fn device(&self) -> &Device {
183 &self.device
184 }
185
186 pub async fn update_rssi(&self) {
188 if let Ok(Some(rssi)) = self.device.rssi().await {
189 let mut state = self.state.write().await;
190 state.rssi = Some(rssi as i8);
191 }
192 }
193
194 pub async fn set_mtu(&self, mtu: u16) {
196 let mut state = self.state.write().await;
197 state.mtu = mtu;
198 }
199
200 pub async fn set_phy(&self, phy: BlePhy) {
202 let mut state = self.state.write().await;
203 state.phy = phy;
204 }
205
206 pub async fn mark_dead(&self) {
208 let mut state = self.state.write().await;
209 state.alive = false;
210 }
211
212 pub async fn disconnect(&self) -> Result<()> {
216 self.clear_write_queue().await;
218
219 self.device
220 .disconnect()
221 .await
222 .map_err(|e| BleError::ConnectionFailed(format!("Failed to disconnect: {}", e)))?;
223 self.mark_dead().await;
224 Ok(())
225 }
226
227 pub async fn discover_services(&self) -> Result<()> {
229 let _ = self.device.services().await;
233 Ok(())
234 }
235
236 pub async fn services(&self) -> Result<Vec<bluer::gatt::remote::Service>> {
238 self.device
239 .services()
240 .await
241 .map_err(|e| BleError::GattError(format!("Failed to get services: {}", e)))
242 }
243
244 pub async fn find_service(
246 &self,
247 uuid: uuid::Uuid,
248 ) -> Result<Option<bluer::gatt::remote::Service>> {
249 let services = self.services().await?;
250 for service in services {
251 if service.uuid().await.ok() == Some(uuid) {
252 return Ok(Some(service));
253 }
254 }
255 Ok(None)
256 }
257
258 pub async fn read_characteristic(
260 &self,
261 service_uuid: uuid::Uuid,
262 char_uuid: uuid::Uuid,
263 ) -> Result<Vec<u8>> {
264 let service = self
265 .find_service(service_uuid)
266 .await?
267 .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
268
269 let characteristics = service
270 .characteristics()
271 .await
272 .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
273
274 for char in characteristics {
275 if char.uuid().await.ok() == Some(char_uuid) {
276 return char.read().await.map_err(|e| {
277 BleError::GattError(format!("Failed to read characteristic: {}", e))
278 });
279 }
280 }
281
282 Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
283 }
284
285 pub async fn write_characteristic(
291 &self,
292 service_uuid: uuid::Uuid,
293 char_uuid: uuid::Uuid,
294 value: &[u8],
295 ) -> Result<()> {
296 let service = self
297 .find_service(service_uuid)
298 .await?
299 .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
300
301 let characteristics = service
302 .characteristics()
303 .await
304 .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
305
306 for char in characteristics {
307 if char.uuid().await.ok() == Some(char_uuid) {
308 return char.write(value).await.map_err(|e| {
309 BleError::GattError(format!("Failed to write characteristic: {}", e))
310 });
311 }
312 }
313
314 Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
315 }
316
317 pub async fn write_characteristic_queued(
325 &self,
326 service_uuid: uuid::Uuid,
327 char_uuid: uuid::Uuid,
328 value: &[u8],
329 ) -> Result<()> {
330 let (tx, rx) = tokio::sync::oneshot::channel();
332
333 {
335 let mut queue_state = self.write_queue.lock().await;
336 queue_state.queue.push_back(QueuedWrite {
337 service_uuid,
338 char_uuid,
339 data: value.to_vec(),
340 complete_tx: tx,
341 });
342 log::debug!(
343 "Queued write to {} ({} bytes, queue depth: {})",
344 char_uuid,
345 value.len(),
346 queue_state.queue.len()
347 );
348 }
349
350 self.process_write_queue().await;
352
353 rx.await.map_err(|_| {
355 BleError::GattError("Write was cancelled (connection closed?)".to_string())
356 })?
357 }
358
359 async fn process_write_queue(&self) {
364 loop {
365 let queued_write = {
367 let mut queue_state = self.write_queue.lock().await;
368
369 if queue_state.write_in_progress {
371 return;
372 }
373
374 match queue_state.queue.pop_front() {
376 Some(write) => {
377 queue_state.write_in_progress = true;
378 write
379 }
380 None => return, }
382 };
383
384 let result = self
386 .write_characteristic(
387 queued_write.service_uuid,
388 queued_write.char_uuid,
389 &queued_write.data,
390 )
391 .await;
392
393 {
395 let mut queue_state = self.write_queue.lock().await;
396 queue_state.write_in_progress = false;
397 }
398
399 let _ = queued_write.complete_tx.send(result);
401
402 }
404 }
405
406 pub async fn write_queue_depth(&self) -> usize {
411 self.write_queue.lock().await.queue.len()
412 }
413
414 pub async fn write_in_progress(&self) -> bool {
416 self.write_queue.lock().await.write_in_progress
417 }
418
419 pub async fn clear_write_queue(&self) {
423 let mut queue_state = self.write_queue.lock().await;
424 let queue_len = queue_state.queue.len();
425
426 while let Some(write) = queue_state.queue.pop_front() {
428 let _ = write.complete_tx.send(Err(BleError::GattError(
429 "Write queue cleared (disconnected?)".to_string(),
430 )));
431 }
432
433 if queue_len > 0 {
434 log::debug!("Cleared {} pending writes from queue", queue_len);
435 }
436 }
437
438 pub async fn subscribe_characteristic(
440 &self,
441 service_uuid: uuid::Uuid,
442 char_uuid: uuid::Uuid,
443 ) -> Result<impl tokio_stream::Stream<Item = Vec<u8>>> {
444 let service = self
445 .find_service(service_uuid)
446 .await?
447 .ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
448
449 let characteristics = service
450 .characteristics()
451 .await
452 .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
453
454 for char in characteristics {
455 if char.uuid().await.ok() == Some(char_uuid) {
456 return char.notify().await.map_err(|e| {
457 BleError::GattError(format!("Failed to subscribe to notifications: {}", e))
458 });
459 }
460 }
461
462 Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
463 }
464}
465
466impl BleConnection for BluerConnection {
467 fn peer_id(&self) -> &NodeId {
468 &self.peer_id
469 }
470
471 fn is_alive(&self) -> bool {
472 if let Ok(state) = self.state.try_read() {
474 state.alive
475 } else {
476 true
478 }
479 }
480
481 fn mtu(&self) -> u16 {
482 if let Ok(state) = self.state.try_read() {
483 state.mtu
484 } else {
485 23 }
487 }
488
489 fn phy(&self) -> BlePhy {
490 if let Ok(state) = self.state.try_read() {
491 state.phy
492 } else {
493 BlePhy::Le1M
494 }
495 }
496
497 fn rssi(&self) -> Option<i8> {
498 if let Ok(state) = self.state.try_read() {
499 state.rssi
500 } else {
501 None
502 }
503 }
504
505 fn connected_duration(&self) -> Duration {
506 self.connected_at.elapsed()
507 }
508}
509
// Unit-test module; it declares no tests in this view of the file.
#[cfg(test)]
mod tests {}