use bluer::Device;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock};
use crate::config::BlePhy;
use crate::error::{BleError, Result};
use crate::transport::BleConnection;
use crate::NodeId;
/// A single characteristic write waiting in the connection's write queue.
struct QueuedWrite {
/// UUID of the GATT service that contains the target characteristic.
service_uuid: uuid::Uuid,
/// UUID of the characteristic the payload is written to.
char_uuid: uuid::Uuid,
/// Payload bytes to write.
data: Vec<u8>,
/// One-shot channel that carries the write's outcome back to the task that queued it.
complete_tx: tokio::sync::oneshot::Sender<Result<()>>,
}
/// Mutable per-connection state, kept behind the connection's `RwLock`.
struct ConnectionState {
/// `true` from construction until `mark_dead` flips it to `false`.
alive: bool,
/// MTU currently recorded for the link; starts at `DEFAULT_BLE_MTU` and is replaced by `set_mtu`.
mtu: u16,
/// PHY currently recorded for the link; starts as `BlePhy::Le1M` and is replaced by `set_phy`.
phy: BlePhy,
/// Last RSSI stored by `update_rssi`; `None` until a reading has been obtained.
rssi: Option<i8>,
}
/// Bookkeeping for serialized characteristic writes, kept behind the connection's `Mutex`.
struct WriteQueueState {
/// Pending writes in FIFO order (pushed at the back, popped from the front).
queue: VecDeque<QueuedWrite>,
/// Set while a dequeued write is being performed, so only one drain loop runs at a time.
write_in_progress: bool,
}
/// Handle to a BLE connection backed by a `bluer` device.
///
/// Cloning is cheap: `state` and `write_queue` are `Arc`s, so every clone observes and
/// mutates the same connection state and the same write queue.
#[derive(Clone)]
pub struct BluerConnection {
/// Identifier of the remote peer.
peer_id: NodeId,
/// Underlying `bluer` device handle used for all GATT and link operations.
device: Device,
/// Shared liveness / MTU / PHY / RSSI state.
state: Arc<RwLock<ConnectionState>>,
/// Shared queue of pending characteristic writes.
write_queue: Arc<Mutex<WriteQueueState>>,
/// Instant captured when the handle was constructed; basis for `connected_duration`.
connected_at: Instant,
}
/// MTU (in bytes) assumed for a new connection until a real value is recorded.
const DEFAULT_BLE_MTU: u16 = 185;
/// Smallest MTU (in bytes) a BLE link can have: the default ATT_MTU of 23.
///
/// The `dead_code` allowance keeps the build warning-free when no code path
/// references this constant.
#[allow(dead_code)]
const MIN_BLE_MTU: u16 = 23;
impl BluerConnection {
/// Builds a connection handle around an already-obtained `device`.
///
/// The handle starts out alive, with `DEFAULT_BLE_MTU` as its MTU, the LE 1M PHY,
/// no RSSI and an empty write queue. One RSSI refresh is attempted before
/// returning; if it yields nothing the RSSI simply stays unset.
///
/// # Errors
///
/// The current implementation has no failure path and always returns `Ok`.
pub(crate) async fn new(peer_id: NodeId, device: Device) -> Result<Self> {
    let initial_state = ConnectionState {
        alive: true,
        mtu: DEFAULT_BLE_MTU,
        phy: BlePhy::Le1M,
        rssi: None,
    };
    let empty_queue = WriteQueueState {
        queue: VecDeque::new(),
        write_in_progress: false,
    };

    let connection = Self {
        peer_id,
        device,
        state: Arc::new(RwLock::new(initial_state)),
        write_queue: Arc::new(Mutex::new(empty_queue)),
        connected_at: Instant::now(),
    };

    // Seed the cached RSSI so callers see a value right away when one is available.
    connection.update_rssi().await;
    Ok(connection)
}
pub async fn discover_mtu(
&self,
service_uuid: uuid::Uuid,
char_uuid: uuid::Uuid,
) -> Result<u16> {
let service = self
.find_service(service_uuid)
.await?
.ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;
let characteristics = service
.characteristics()
.await
.map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;
for char in characteristics {
if char.uuid().await.ok() == Some(char_uuid) {
match char.write_io().await {
Ok(writer) => {
let mtu = writer.mtu();
self.set_mtu(mtu as u16).await;
log::info!("Discovered MTU: {} bytes via {}", mtu, char_uuid);
return Ok(mtu as u16);
}
Err(e) => {
log::debug!("Could not acquire write IO for MTU discovery: {}", e);
}
}
match char.notify_io().await {
Ok(reader) => {
let mtu = reader.mtu();
self.set_mtu(mtu as u16).await;
log::info!("Discovered MTU: {} bytes via notify {}", mtu, char_uuid);
return Ok(mtu as u16);
}
Err(e) => {
log::debug!("Could not acquire notify IO for MTU discovery: {}", e);
}
}
}
}
Ok(self.mtu())
}
pub fn device(&self) -> &Device {
&self.device
}
pub async fn update_rssi(&self) {
if let Ok(Some(rssi)) = self.device.rssi().await {
let mut state = self.state.write().await;
state.rssi = Some(rssi as i8);
}
}
/// Records `mtu` as the connection's current MTU.
pub async fn set_mtu(&self, mtu: u16) {
    // The write guard is a temporary and is released at the end of this statement.
    self.state.write().await.mtu = mtu;
}
/// Records `phy` as the connection's current PHY.
pub async fn set_phy(&self, phy: BlePhy) {
    // The write guard is a temporary and is released at the end of this statement.
    self.state.write().await.phy = phy;
}
/// Flags the connection as no longer alive.
///
/// Only the in-memory flag changes; the device itself is not touched.
pub async fn mark_dead(&self) {
    self.state.write().await.alive = false;
}
/// Tears the connection down.
///
/// Pending queued writes are failed first, then the device is asked to
/// disconnect, and only after that succeeds is the connection marked dead.
///
/// # Errors
///
/// Returns `BleError::ConnectionFailed` if the device-level disconnect fails; in
/// that case the queue has already been cleared but the connection is not marked dead.
pub async fn disconnect(&self) -> Result<()> {
    self.clear_write_queue().await;

    if let Err(e) = self.device.disconnect().await {
        return Err(BleError::ConnectionFailed(format!(
            "Failed to disconnect: {}",
            e
        )));
    }

    self.mark_dead().await;
    Ok(())
}
pub async fn discover_services(&self) -> Result<()> {
let _ = self.device.services().await;
Ok(())
}
/// Lists the GATT services exposed by the remote device.
///
/// # Errors
///
/// Returns `BleError::GattError` if the device cannot provide its services.
pub async fn services(&self) -> Result<Vec<bluer::gatt::remote::Service>> {
    match self.device.services().await {
        Ok(found) => Ok(found),
        Err(e) => Err(BleError::GattError(format!(
            "Failed to get services: {}",
            e
        ))),
    }
}
/// Finds the first GATT service whose UUID equals `uuid`.
///
/// Services whose UUID cannot be read are skipped. Returns `Ok(None)` when no
/// service matches.
///
/// # Errors
///
/// Propagates the error from [`Self::services`] if the service list is unavailable.
pub async fn find_service(
    &self,
    uuid: uuid::Uuid,
) -> Result<Option<bluer::gatt::remote::Service>> {
    let wanted = Some(uuid);
    for candidate in self.services().await? {
        let candidate_uuid = candidate.uuid().await.ok();
        if candidate_uuid == wanted {
            return Ok(Some(candidate));
        }
    }
    Ok(None)
}
/// Reads the current value of characteristic `char_uuid` in service `service_uuid`.
///
/// # Errors
///
/// * `BleError::ServiceNotFound` if the service is not present.
/// * `BleError::CharacteristicNotFound` if the service has no such characteristic.
/// * `BleError::GattError` if listing the characteristics or the read itself fails.
pub async fn read_characteristic(
    &self,
    service_uuid: uuid::Uuid,
    char_uuid: uuid::Uuid,
) -> Result<Vec<u8>> {
    let service = match self.find_service(service_uuid).await? {
        Some(found) => found,
        None => return Err(BleError::ServiceNotFound(service_uuid.to_string())),
    };
    let characteristics = match service.characteristics().await {
        Ok(list) => list,
        Err(e) => {
            return Err(BleError::GattError(format!(
                "Failed to get characteristics: {}",
                e
            )))
        }
    };

    // Locate the first characteristic with the requested UUID.
    let mut target = None;
    for candidate in characteristics {
        if candidate.uuid().await.ok() == Some(char_uuid) {
            target = Some(candidate);
            break;
        }
    }

    match target {
        Some(characteristic) => characteristic.read().await.map_err(|e| {
            BleError::GattError(format!("Failed to read characteristic: {}", e))
        }),
        None => Err(BleError::CharacteristicNotFound(char_uuid.to_string())),
    }
}
/// Writes `value` to characteristic `char_uuid` in service `service_uuid`.
///
/// The write is issued immediately; it does not go through the write queue.
///
/// # Errors
///
/// * `BleError::ServiceNotFound` if the service is not present.
/// * `BleError::CharacteristicNotFound` if the service has no such characteristic.
/// * `BleError::GattError` if listing the characteristics or the write itself fails.
pub async fn write_characteristic(
    &self,
    service_uuid: uuid::Uuid,
    char_uuid: uuid::Uuid,
    value: &[u8],
) -> Result<()> {
    let maybe_service = self.find_service(service_uuid).await?;
    let service =
        maybe_service.ok_or_else(|| BleError::ServiceNotFound(service_uuid.to_string()))?;

    let listing = service.characteristics().await;
    let characteristics = listing
        .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;

    for characteristic in characteristics {
        let is_target = characteristic.uuid().await.ok() == Some(char_uuid);
        if !is_target {
            continue;
        }
        return match characteristic.write(value).await {
            Ok(()) => Ok(()),
            Err(e) => Err(BleError::GattError(format!(
                "Failed to write characteristic: {}",
                e
            ))),
        };
    }

    Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
}
/// Writes `value` to a characteristic through the serialized write queue.
///
/// The request is appended to the queue, the queue is driven, and the call
/// resolves once this particular write has reported its outcome.
///
/// # Errors
///
/// * Whatever error the underlying characteristic write produced.
/// * `BleError::GattError` if the request was dropped before completing (its
///   completion channel was closed) or the queue was cleared while it was pending.
pub async fn write_characteristic_queued(
    &self,
    service_uuid: uuid::Uuid,
    char_uuid: uuid::Uuid,
    value: &[u8],
) -> Result<()> {
    let (complete_tx, complete_rx) = tokio::sync::oneshot::channel();
    let pending = QueuedWrite {
        service_uuid,
        char_uuid,
        data: value.to_vec(),
        complete_tx,
    };

    // Scope the guard so the lock is released before the queue is driven.
    {
        let mut guard = self.write_queue.lock().await;
        guard.queue.push_back(pending);
        log::debug!(
            "Queued write to {} ({} bytes, queue depth: {})",
            char_uuid,
            value.len(),
            guard.queue.len()
        );
    }

    self.process_write_queue().await;

    match complete_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(BleError::GattError(
            "Write was cancelled (connection closed?)".to_string(),
        )),
    }
}
async fn process_write_queue(&self) {
loop {
let queued_write = {
let mut queue_state = self.write_queue.lock().await;
if queue_state.write_in_progress {
return;
}
match queue_state.queue.pop_front() {
Some(write) => {
queue_state.write_in_progress = true;
write
}
None => return, }
};
let result = self
.write_characteristic(
queued_write.service_uuid,
queued_write.char_uuid,
&queued_write.data,
)
.await;
{
let mut queue_state = self.write_queue.lock().await;
queue_state.write_in_progress = false;
}
let _ = queued_write.complete_tx.send(result);
}
}
/// Number of writes still waiting in the queue (a write currently being
/// performed has already been dequeued and is not counted).
pub async fn write_queue_depth(&self) -> usize {
    let guard = self.write_queue.lock().await;
    guard.queue.len()
}
/// Whether a dequeued write is currently being performed.
pub async fn write_in_progress(&self) -> bool {
    let guard = self.write_queue.lock().await;
    guard.write_in_progress
}
/// Fails and removes every write still waiting in the queue.
///
/// Each pending request is completed with a `BleError::GattError`. A write that
/// has already been dequeued and is being performed is not affected.
pub async fn clear_write_queue(&self) {
    let mut guard = self.write_queue.lock().await;
    let dropped = guard.queue.len();

    for pending in guard.queue.drain(..) {
        // The waiter may already be gone; a failed send is deliberately ignored.
        let _ = pending.complete_tx.send(Err(BleError::GattError(
            "Write queue cleared (disconnected?)".to_string(),
        )));
    }

    if dropped > 0 {
        log::debug!("Cleared {} pending writes from queue", dropped);
    }
}
/// Subscribes to notifications of characteristic `char_uuid` in service `service_uuid`.
///
/// On success, yields a stream whose items are the notified values as byte vectors.
///
/// # Errors
///
/// * `BleError::ServiceNotFound` if the service is not present.
/// * `BleError::CharacteristicNotFound` if the service has no such characteristic.
/// * `BleError::GattError` if listing the characteristics or subscribing fails.
pub async fn subscribe_characteristic(
    &self,
    service_uuid: uuid::Uuid,
    char_uuid: uuid::Uuid,
) -> Result<impl tokio_stream::Stream<Item = Vec<u8>>> {
    let service = match self.find_service(service_uuid).await? {
        Some(found) => found,
        None => return Err(BleError::ServiceNotFound(service_uuid.to_string())),
    };
    let characteristics = service
        .characteristics()
        .await
        .map_err(|e| BleError::GattError(format!("Failed to get characteristics: {}", e)))?;

    for characteristic in characteristics {
        if characteristic.uuid().await.ok() != Some(char_uuid) {
            continue;
        }
        return characteristic.notify().await.map_err(|e| {
            BleError::GattError(format!("Failed to subscribe to notifications: {}", e))
        });
    }

    Err(BleError::CharacteristicNotFound(char_uuid.to_string()))
}
}
impl BleConnection for BluerConnection {
fn peer_id(&self) -> &NodeId {
&self.peer_id
}
fn is_alive(&self) -> bool {
if let Ok(state) = self.state.try_read() {
state.alive
} else {
true
}
}
fn mtu(&self) -> u16 {
if let Ok(state) = self.state.try_read() {
state.mtu
} else {
23 }
}
fn phy(&self) -> BlePhy {
if let Ok(state) = self.state.try_read() {
state.phy
} else {
BlePhy::Le1M
}
}
fn rssi(&self) -> Option<i8> {
if let Ok(state) = self.state.try_read() {
state.rssi
} else {
None
}
}
fn connected_duration(&self) -> Duration {
self.connected_at.elapsed()
}
}
#[cfg(test)]
mod tests {
// Placeholder: this module currently defines no unit tests.
}