use std::collections::{HashMap, VecDeque};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use btleplug::api::{
Central, CentralEvent, Characteristic, Manager as _, Peripheral as _, ScanFilter, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral, PeripheralId};
use futures::StreamExt;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use uuid::Uuid;
use crate::error::Error;
use crate::model::{BleDevice, BleEvent, BleService};
/// Module-local shorthand: every fallible function in this file reports [`Error`].
type Result<T> = std::result::Result<T, Error>;
/// Upper bound on a single `Session::connect` attempt.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
/// Upper bound on the other peripheral operations in this file
/// (disconnect, service discovery, read, write, subscribe, unsubscribe).
const OP_TIMEOUT: Duration = Duration::from_secs(15);
/// Blocking facade over a btleplug adapter.
///
/// The session owns its own Tokio runtime; every public method drives the
/// underlying async btleplug call to completion with `Runtime::block_on`.
/// Adapter events and characteristic notifications are collected in the
/// background and handed out one at a time through `poll_event`.
///
/// NOTE(review): because the methods call `block_on`, they presumably must not
/// be invoked from inside another Tokio runtime — confirm against callers.
pub struct Session {
/// Runtime that runs the event pump, the notification forwarders and every `block_on` call.
rt: Runtime,
/// Adapter chosen in `new` (the first one the manager lists).
adapter: Adapter,
/// Pending events, oldest first. Producers drop the oldest entry once 4096 are queued.
events: Arc<Mutex<VecDeque<BleEvent>>>,
/// Peripherals seen by the event pump, keyed by the string form of their `PeripheralId`.
peripherals: Arc<Mutex<HashMap<String, Peripheral>>>,
/// Handle of the background task started by `start_pump`; taken and aborted on drop.
pump: Mutex<Option<JoinHandle<()>>>,
/// Handles of the forwarding tasks spawned by `subscribe`; aborted on drop.
notif_tasks: Mutex<Vec<JoinHandle<()>>>,
}
impl Session {
pub fn new() -> Result<Session> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.map_err(|e| Error::Internal(format!("failed to start runtime: {e}")))?;
let adapter = rt.block_on(async {
let manager = Manager::new()
.await
.map_err(|e| Error::Btleplug(format!("failed to create BLE manager: {e}")))?;
let adapters = manager
.adapters()
.await
.map_err(|e| Error::Btleplug(format!("failed to list adapters: {e}")))?;
adapters.into_iter().next().ok_or(Error::NoAdapter)
})?;
let events: Arc<Mutex<VecDeque<BleEvent>>> = Arc::new(Mutex::new(VecDeque::new()));
let peripherals: Arc<Mutex<HashMap<String, Peripheral>>> =
Arc::new(Mutex::new(HashMap::new()));
let pump = Self::start_pump(&rt, &adapter, events.clone(), peripherals.clone())?;
Ok(Session {
rt,
adapter,
events,
peripherals,
pump: Mutex::new(Some(pump)),
notif_tasks: Mutex::new(Vec::new()),
})
}
fn start_pump(
rt: &Runtime,
adapter: &Adapter,
queue: Arc<Mutex<VecDeque<BleEvent>>>,
peripherals: Arc<Mutex<HashMap<String, Peripheral>>>,
) -> Result<JoinHandle<()>> {
let adapter = adapter.clone();
let stream = rt
.block_on(async { adapter.events().await })
.map_err(|e| Error::Btleplug(format!("failed to open event stream: {e}")))?;
Ok(rt.spawn(async move {
let mut stream = stream;
while let Some(event) = stream.next().await {
if let Some(id) = event_peripheral_id(&event) {
if let Ok(p) = adapter.peripheral(&id).await {
peripherals.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).insert(id.to_string(), p);
}
}
let translated = translate_event(&adapter, event).await;
if let Some(ev) = translated {
let mut q = queue.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
if q.len() >= 4096 {
q.pop_front();
}
q.push_back(ev);
}
}
}))
}
pub fn start_scan(&self, service_uuids: Option<&[Uuid]>) -> Result<()> {
let filter = match service_uuids {
Some(uuids) => ScanFilter {
services: uuids.to_vec(),
},
None => ScanFilter::default(),
};
self.rt
.block_on(async { self.adapter.start_scan(filter).await })
.map_err(|e| Error::Btleplug(format!("failed to start scan: {e}")))
}
pub fn stop_scan(&self) -> Result<()> {
self.rt
.block_on(async { self.adapter.stop_scan().await })
.map_err(|e| Error::Btleplug(format!("failed to stop scan: {e}")))
}
/// Removes and returns the oldest queued event, or `None` when the queue is empty.
///
/// A poisoned queue mutex is recovered rather than treated as an error.
pub fn poll_event(&self) -> Option<BleEvent> {
    let mut pending = match self.events.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    pending.pop_front()
}
/// Looks up a previously discovered peripheral by the string form of its id.
///
/// Returns a clone of the cached handle.
///
/// A poisoned cache mutex is recovered instead of panicking, matching every
/// other lock site in this file.
///
/// # Errors
///
/// Returns `Error::NotFound` if the event pump has not seen a peripheral with
/// this id.
fn peripheral(&self, id: &str) -> Result<Peripheral> {
    let known = self
        .peripherals
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let found = known.get(id).cloned();
    // Release the lock before building the error message.
    drop(known);
    found.ok_or_else(|| {
        Error::NotFound(format!(
            "unknown peripheral {id} (scan to discover it first)"
        ))
    })
}
pub fn connect(&self, id: &str) -> Result<()> {
let p = self.peripheral(id)?;
self.rt.block_on(async {
match timeout(CONNECT_TIMEOUT, p.connect()).await {
Ok(r) => r.map_err(|e| Error::Btleplug(format!("failed to connect to {id}: {e}"))),
Err(_) => Err(Error::Timeout(format!(
"connect to {id} timed out after {}s",
CONNECT_TIMEOUT.as_secs()
))),
}
})
}
pub fn disconnect(&self, id: &str) -> Result<()> {
let p = self.peripheral(id)?;
self.rt.block_on(async {
match timeout(OP_TIMEOUT, p.disconnect()).await {
Ok(r) => r.map_err(|e| Error::Btleplug(format!("failed to disconnect from {id}: {e}"))),
Err(_) => Err(Error::Timeout(format!("disconnect from {id} timed out"))),
}
})
}
pub fn discover_services(&self, id: &str) -> Result<Vec<BleService>> {
let p = self.peripheral(id)?;
self.rt.block_on(async {
let work = async {
p.discover_services()
.await
.map_err(|e| Error::Btleplug(e.to_string()))?;
Ok::<_, Error>(
p.services()
.iter()
.map(BleService::from)
.collect::<Vec<_>>(),
)
};
match timeout(OP_TIMEOUT, work).await {
Ok(r) => r,
Err(_) => Err(Error::Timeout("discover services timed out".to_string())),
}
})
}
pub fn read(&self, id: &str, char_uuid: &str) -> Result<Vec<u8>> {
let p = self.peripheral(id)?;
self.rt.block_on(async {
let ch = find_characteristic(&p, char_uuid)?;
match timeout(OP_TIMEOUT, p.read(&ch)).await {
Ok(r) => r.map_err(|e| Error::Btleplug(format!("read failed: {e}"))),
Err(_) => Err(Error::Timeout("read timed out".to_string())),
}
})
}
pub fn write(&self, id: &str, char_uuid: &str, data: &[u8], with_response: bool) -> Result<()> {
let p = self.peripheral(id)?;
let kind = if with_response {
WriteType::WithResponse
} else {
WriteType::WithoutResponse
};
self.rt.block_on(async {
let ch = find_characteristic(&p, char_uuid)?;
match timeout(OP_TIMEOUT, p.write(&ch, data, kind)).await {
Ok(r) => r.map_err(|e| Error::Btleplug(format!("write failed: {e}"))),
Err(_) => Err(Error::Timeout("write timed out".to_string())),
}
})
}
pub fn subscribe(&self, id: &str, char_uuid: &str) -> Result<()> {
let p = self.peripheral(id)?;
let queue = self.events.clone();
let dev_id = id.to_string();
let task = self.rt.block_on(async {
let ch = find_characteristic(&p, char_uuid)?;
let setup = async {
p.subscribe(&ch)
.await
.map_err(|e| Error::Btleplug(format!("subscribe failed: {e}")))?;
p.notifications()
.await
.map_err(|e| Error::Btleplug(format!("notifications stream failed: {e}")))
};
let stream = match timeout(OP_TIMEOUT, setup).await {
Ok(r) => r?,
Err(_) => return Err(Error::Timeout("subscribe timed out".to_string())),
};
Ok::<_, Error>(tokio::spawn(async move {
let mut stream = stream;
while let Some(n) = stream.next().await {
let mut q = queue.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
if q.len() >= 4096 {
q.pop_front();
}
q.push_back(BleEvent::Notification {
id: dev_id.clone(),
characteristic: n.uuid.to_string(),
value: n.value,
});
}
}))
})?;
self.notif_tasks.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).push(task);
Ok(())
}
pub fn unsubscribe(&self, id: &str, char_uuid: &str) -> Result<()> {
let p = self.peripheral(id)?;
self.rt.block_on(async {
let ch = find_characteristic(&p, char_uuid)?;
match timeout(OP_TIMEOUT, p.unsubscribe(&ch)).await {
Ok(r) => r.map_err(|e| Error::Btleplug(format!("unsubscribe failed: {e}"))),
Err(_) => Err(Error::Timeout("unsubscribe timed out".to_string())),
}
})
}
}
/// Stops the background work owned by the session.
impl Drop for Session {
    /// Aborts the event pump first, then every notification forwarder.
    ///
    /// `&mut self` gives exclusive access, so the mutexes are reached through
    /// `get_mut` rather than locked; poisoning is recovered the same way as
    /// elsewhere in this file.
    fn drop(&mut self) {
        let pump_slot = self
            .pump
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(handle) = pump_slot.take() {
            handle.abort();
        }

        let forwarders = self
            .notif_tasks
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        for handle in forwarders.drain(..) {
            handle.abort();
        }
    }
}
fn find_characteristic(p: &Peripheral, char_uuid: &str) -> Result<Characteristic> {
let target =
Uuid::from_str(char_uuid).map_err(|e| Error::InvalidUuid(format!("{char_uuid}: {e}")))?;
p.characteristics()
.into_iter()
.find(|c| c.uuid == target)
.ok_or_else(|| {
Error::NotFound(format!(
"characteristic {char_uuid} not found (discover services first)"
))
})
}
/// Converts a btleplug `CentralEvent` into the crate's `BleEvent`.
///
/// Connection changes and adapter state updates map to their own variants
/// (the state is the lowercased `Debug` form of the btleplug state). Every
/// other event is reported as a fresh device snapshot via `device_event`.
/// Every variant handled here yields `Some`.
async fn translate_event(adapter: &Adapter, event: CentralEvent) -> Option<BleEvent> {
    let translated = match event {
        CentralEvent::StateUpdate(state) => BleEvent::StateUpdate {
            state: format!("{state:?}").to_lowercase(),
        },
        CentralEvent::DeviceConnected(id) => BleEvent::Connected { id: id.to_string() },
        CentralEvent::DeviceDisconnected(id) => BleEvent::Disconnected { id: id.to_string() },
        CentralEvent::DeviceDiscovered(id)
        | CentralEvent::DeviceUpdated(id)
        | CentralEvent::DeviceServicesModified(id)
        | CentralEvent::ManufacturerDataAdvertisement { id, .. }
        | CentralEvent::ServiceDataAdvertisement { id, .. }
        | CentralEvent::ServicesAdvertisement { id, .. }
        | CentralEvent::RssiUpdate { id, .. } => device_event(adapter, id).await,
    };
    Some(translated)
}
/// Returns the id of the peripheral an event refers to.
///
/// Adapter state updates carry no peripheral and yield `None`.
fn event_peripheral_id(event: &CentralEvent) -> Option<PeripheralId> {
    let id = match event {
        CentralEvent::StateUpdate(_) => return None,
        CentralEvent::DeviceConnected(id)
        | CentralEvent::DeviceDisconnected(id)
        | CentralEvent::DeviceDiscovered(id)
        | CentralEvent::DeviceUpdated(id)
        | CentralEvent::DeviceServicesModified(id)
        | CentralEvent::RssiUpdate { id, .. }
        | CentralEvent::ManufacturerDataAdvertisement { id, .. }
        | CentralEvent::ServiceDataAdvertisement { id, .. }
        | CentralEvent::ServicesAdvertisement { id, .. } => id,
    };
    Some(id.clone())
}
/// Builds a `BleEvent::Device` snapshot for the given peripheral.
///
/// Properties and connection state are read from the adapter on a best-effort
/// basis: if the peripheral lookup fails, or a query errors, the snapshot
/// falls back to no properties and `connected = false`.
async fn device_event(adapter: &Adapter, id: PeripheralId) -> BleEvent {
    let mut props = None;
    let mut connected = false;
    if let Ok(peripheral) = adapter.peripheral(&id).await {
        props = peripheral.properties().await.ok().flatten();
        connected = peripheral.is_connected().await.unwrap_or(false);
    }
    let device = BleDevice::from_properties(id.to_string(), props, connected);
    BleEvent::Device { device }
}