use crate::setup::RUNTIME;
use crate::{BleAddress, BleDevice, BleError};
use btleplug::api::CentralEvent;
use btleplug::api::{
Central, Characteristic, Manager as _, Peripheral as _, ScanFilter, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::{Stream, StreamExt};
use log::debug;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::task::AbortHandle;
use tokio::time::sleep;
use uuid::Uuid;
struct Listener {
uuid: Uuid,
callback: Arc<dyn Fn(&[u8]) + Send + Sync>,
}
pub struct BleHandler {
connected: Option<Arc<Peripheral>>,
characs: Vec<Characteristic>,
devices: HashMap<BleAddress, Peripheral>,
adapter: Adapter,
notify_abort: Option<AbortHandle>,
notify_listeners: Arc<Mutex<Vec<Listener>>>,
on_disconnect: Option<Mutex<Box<dyn Fn() + Send>>>,
}
impl BleHandler {
pub async fn new() -> Result<Self, BleError> {
let manager = Manager::new().await?;
let adapters = manager.adapters().await?;
let central = adapters.into_iter().next().ok_or(BleError::NoAdapters)?;
Ok(Self {
devices: HashMap::new(),
characs: vec![],
connected: None,
adapter: central,
notify_abort: None,
notify_listeners: Arc::new(Mutex::new(vec![])),
on_disconnect: None,
})
}
pub async fn connect(
&mut self,
address: BleAddress,
service: Uuid,
characs: Vec<Uuid>,
on_disconnect: Option<impl Fn() + Send + 'static>,
) -> Result<(), BleError> {
if self.devices.len() == 0 {
self.discover(None, 1000).await?;
}
self.connect_device(address).await?;
self.connect_service(service, &characs).await?;
if let Some(cb) = on_disconnect {
self.on_disconnect = Some(Mutex::new(Box::new(cb)));
}
let rt = RUNTIME.get().ok_or(BleError::RuntimeNotInitialized)?;
let listen_handle = rt.spawn(listen_notify(
self.get_device().await?,
self.notify_listeners.clone(),
));
self.notify_abort = Some(listen_handle.abort_handle());
Ok(())
}
async fn connect_service(&mut self, service: Uuid, characs: &[Uuid]) -> Result<(), BleError> {
let device = self.get_device().await?;
device.discover_services().await?;
let services = device.services();
let s = services
.iter()
.find(|s| s.uuid == service)
.ok_or(BleError::ServiceNotFound)?;
for c in &s.characteristics {
if characs.contains(&c.uuid) {
self.characs.push(c.clone());
}
}
Ok(())
}
async fn connect_device(&mut self, address: BleAddress) -> Result<(), BleError> {
debug!("connecting to {address}",);
if let Some(dev) = self.connected.clone() {
if address == dev.address() {
return Err(BleError::AlreadyConnected.into());
}
}
let device = self
.devices
.get(&address)
.ok_or(BleError::UnknownPeripheral(address.to_string()))?;
if !device.is_connected().await? {
debug!("Connecting to device");
device.connect().await?;
debug!("Connecting done");
}
self.connected = Some(Arc::new(device.clone()));
Ok(())
}
pub async fn disconnect(&mut self) -> Result<(), BleError> {
debug!("disconnecting");
if let Some(notify) = self.notify_abort.as_ref() {
notify.abort();
self.notify_abort = None;
}
*self.notify_listeners.lock().await = vec![];
if let Some(dev) = self.connected.as_mut() {
if let Ok(true) = dev.is_connected().await {
dev.disconnect().await?;
}
self.connected = None;
}
if let Some(on_disconnect) = &self.on_disconnect {
let callback = on_disconnect.lock().await;
callback();
}
self.characs.clear();
self.devices.clear();
Ok(())
}
pub async fn discover(
&mut self,
tx: Option<mpsc::Sender<Vec<BleDevice>>>,
timeout: u64,
) -> Result<Vec<BleDevice>, BleError> {
self.adapter
.start_scan(ScanFilter {
services: vec![],
})
.await?;
self.devices.clear();
let loops = (timeout as f64 / 200.0).round() as u64;
let mut devices = vec![];
for _ in 0..loops {
sleep(Duration::from_millis(200)).await;
let discovered = self.adapter.peripherals().await?;
devices = self.add_devices(discovered).await;
if !devices.is_empty() {
if let Some(tx) = &tx {
tx.send(devices.clone())
.await
.map_err(|e| BleError::SendingDevices(e))?;
}
}
}
self.adapter.stop_scan().await?;
Ok(devices)
}
async fn add_devices(&mut self, discovered: Vec<Peripheral>) -> Vec<BleDevice> {
let mut devices = vec![];
for p in discovered {
if let Ok(dev) = BleDevice::from_peripheral(&p).await {
self.devices.insert(dev.address.clone(), p);
devices.push(dev);
}
}
devices.sort();
devices
}
pub async fn send_data(&mut self, c: Uuid, data: &[u8]) -> Result<(), BleError> {
let dev = self.get_device().await?;
let charac = self.get_charac(c)?;
dev.write(charac, &data, WriteType::WithoutResponse).await?;
Ok(())
}
pub async fn recv_data(&mut self, c: Uuid) -> Result<Vec<u8>, BleError> {
let dev = self.get_device().await?;
let charac = self.get_charac(c)?;
let data = dev.read(charac).await?;
Ok(data)
}
fn get_charac(&self, uuid: Uuid) -> Result<&Characteristic, BleError> {
let charac = self.characs.iter().find(|c| c.uuid == uuid);
charac.ok_or(BleError::CharacNotAvailable(uuid.to_string()).into())
}
async fn get_device(&mut self) -> Result<Arc<Peripheral>, BleError> {
let dev = self.connected.as_ref().ok_or(BleError::NoDeviceConnected)?;
if !dev.is_connected().await? {
self.disconnect().await?;
return Err(BleError::NoDeviceConnected.into());
} else {
return Ok(dev.clone());
}
}
pub async fn check_connected(&self) -> Result<bool, BleError> {
let mut connected = false;
if let Some(dev) = self.connected.as_ref() {
connected = dev.is_connected().await?;
}
Ok(connected)
}
pub async fn subscribe(
&mut self,
c: Uuid,
callback: impl Fn(&[u8]) + Send + Sync + 'static,
) -> Result<(), BleError> {
let dev = self.get_device().await?;
let charac = self.get_charac(c)?;
dev.subscribe(charac).await?;
self.notify_listeners.lock().await.push(Listener {
uuid: charac.uuid,
callback: Arc::new(callback),
});
Ok(())
}
pub(super) async fn get_event_stream(
&self,
) -> Result<Pin<Box<dyn Stream<Item = CentralEvent> + Send>>, BleError> {
let events = self.adapter.events().await?;
Ok(events)
}
pub async fn handle_event(&mut self, event: CentralEvent) -> Result<(), BleError> {
match event {
CentralEvent::DeviceDisconnected(_) => self.disconnect().await,
_ => Ok(()),
}
}
pub async fn connected_device(&self) -> Result<BleDevice, BleError> {
let p = self.connected.as_ref().ok_or(BleError::NoDeviceConnected)?;
let d = BleDevice::from_peripheral(&p).await?;
Ok(d)
}
}
async fn listen_notify(
dev: Arc<Peripheral>,
listeners: Arc<Mutex<Vec<Listener>>>,
) -> Result<(), BleError> {
let mut stream = dev.notifications().await?;
let rt = RUNTIME.get().ok_or(BleError::RuntimeNotInitialized)?;
while let Some(data) = stream.next().await {
for l in listeners.lock().await.iter() {
if l.uuid == data.uuid {
let data = data.value.clone();
let cb = l.callback.clone();
rt.spawn_blocking(move || cb(&data));
}
}
}
Ok(())
}