bluetooth_core 0.0.1

Cross-platform Bluetooth LE (btleplug wrapper) with a small C ABI.
Documentation
//! BLE session: runtime, adapter, background event pump, and peripheral ops.

use std::collections::{HashMap, VecDeque};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use btleplug::api::{
    Central, CentralEvent, Characteristic, Manager as _, Peripheral as _, ScanFilter, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral, PeripheralId};
use futures::StreamExt;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use uuid::Uuid;

use crate::error::Error;
use crate::model::{BleDevice, BleEvent, BleService};

type Result<T> = std::result::Result<T, Error>;

const CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
const OP_TIMEOUT: Duration = Duration::from_secs(15);

/// A live BLE session. Created by `bt_session_new`; freed by `bt_session_free`.
pub struct Session {
    rt: Runtime,
    adapter: Adapter,
    /// Queued events.
    events: Arc<Mutex<VecDeque<BleEvent>>>,
    /// Discovered peripherals by id.
    peripherals: Arc<Mutex<HashMap<String, Peripheral>>>,
    /// Background event-drain task.
    pump: Mutex<Option<JoinHandle<()>>>,
    /// Active notification tasks.
    notif_tasks: Mutex<Vec<JoinHandle<()>>>,
}

impl Session {
    /// Opens the first available adapter and starts pumping its events.
    pub fn new() -> Result<Session> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .map_err(|e| Error::Internal(format!("failed to start runtime: {e}")))?;

        let adapter = rt.block_on(async {
            let manager = Manager::new()
                .await
                .map_err(|e| Error::Btleplug(format!("failed to create BLE manager: {e}")))?;
            let adapters = manager
                .adapters()
                .await
                .map_err(|e| Error::Btleplug(format!("failed to list adapters: {e}")))?;
            adapters.into_iter().next().ok_or(Error::NoAdapter)
        })?;

        let events: Arc<Mutex<VecDeque<BleEvent>>> = Arc::new(Mutex::new(VecDeque::new()));
        let peripherals: Arc<Mutex<HashMap<String, Peripheral>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let pump = Self::start_pump(&rt, &adapter, events.clone(), peripherals.clone())?;

        Ok(Session {
            rt,
            adapter,
            events,
            peripherals,
            pump: Mutex::new(Some(pump)),
            notif_tasks: Mutex::new(Vec::new()),
        })
    }

    /// Starts the background event-drain loop.
    fn start_pump(
        rt: &Runtime,
        adapter: &Adapter,
        queue: Arc<Mutex<VecDeque<BleEvent>>>,
        peripherals: Arc<Mutex<HashMap<String, Peripheral>>>,
    ) -> Result<JoinHandle<()>> {
        let adapter = adapter.clone();
        let stream = rt
            .block_on(async { adapter.events().await })
            .map_err(|e| Error::Btleplug(format!("failed to open event stream: {e}")))?;
        Ok(rt.spawn(async move {
            let mut stream = stream;
            while let Some(event) = stream.next().await {
                if let Some(id) = event_peripheral_id(&event) {
                    if let Ok(p) = adapter.peripheral(&id).await {
                        peripherals.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).insert(id.to_string(), p);
                    }
                }
                let translated = translate_event(&adapter, event).await;
                if let Some(ev) = translated {
                    let mut q = queue.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
                    if q.len() >= 4096 {
                        q.pop_front();
                    }
                    q.push_back(ev);
                }
            }
        }))
    }

    /// Starts scanning for peripherals.
    pub fn start_scan(&self, service_uuids: Option<&[Uuid]>) -> Result<()> {
        let filter = match service_uuids {
            Some(uuids) => ScanFilter {
                services: uuids.to_vec(),
            },
            None => ScanFilter::default(),
        };
        self.rt
            .block_on(async { self.adapter.start_scan(filter).await })
            .map_err(|e| Error::Btleplug(format!("failed to start scan: {e}")))
    }

    /// Stops scanning.
    pub fn stop_scan(&self) -> Result<()> {
        self.rt
            .block_on(async { self.adapter.stop_scan().await })
            .map_err(|e| Error::Btleplug(format!("failed to stop scan: {e}")))
    }

    /// Pops the next queued event, or `None` if the queue is empty.
    pub fn poll_event(&self) -> Option<BleEvent> {
        self.events.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).pop_front()
    }

    fn peripheral(&self, id: &str) -> Result<Peripheral> {
        self.peripherals
            .lock()
            .unwrap()
            .get(id)
            .cloned()
            .ok_or_else(|| {
                Error::NotFound(format!(
                    "unknown peripheral {id} (scan to discover it first)"
                ))
            })
    }

    /// Connects to a peripheral by id.
    pub fn connect(&self, id: &str) -> Result<()> {
        let p = self.peripheral(id)?;
        self.rt.block_on(async {
            match timeout(CONNECT_TIMEOUT, p.connect()).await {
                Ok(r) => r.map_err(|e| Error::Btleplug(format!("failed to connect to {id}: {e}"))),
                Err(_) => Err(Error::Timeout(format!(
                    "connect to {id} timed out after {}s",
                    CONNECT_TIMEOUT.as_secs()
                ))),
            }
        })
    }

    /// Disconnects from a peripheral by id.
    pub fn disconnect(&self, id: &str) -> Result<()> {
        let p = self.peripheral(id)?;
        self.rt.block_on(async {
            match timeout(OP_TIMEOUT, p.disconnect()).await {
                Ok(r) => r.map_err(|e| Error::Btleplug(format!("failed to disconnect from {id}: {e}"))),
                Err(_) => Err(Error::Timeout(format!("disconnect from {id} timed out"))),
            }
        })
    }

    /// Discovers services and characteristics on a connected peripheral.
    pub fn discover_services(&self, id: &str) -> Result<Vec<BleService>> {
        let p = self.peripheral(id)?;
        self.rt.block_on(async {
            let work = async {
                p.discover_services()
                    .await
                    .map_err(|e| Error::Btleplug(e.to_string()))?;
                Ok::<_, Error>(
                    p.services()
                        .iter()
                        .map(BleService::from)
                        .collect::<Vec<_>>(),
                )
            };
            match timeout(OP_TIMEOUT, work).await {
                Ok(r) => r,
                Err(_) => Err(Error::Timeout("discover services timed out".to_string())),
            }
        })
    }

    /// Reads a characteristic's current value.
    pub fn read(&self, id: &str, char_uuid: &str) -> Result<Vec<u8>> {
        let p = self.peripheral(id)?;
        self.rt.block_on(async {
            let ch = find_characteristic(&p, char_uuid)?;
            match timeout(OP_TIMEOUT, p.read(&ch)).await {
                Ok(r) => r.map_err(|e| Error::Btleplug(format!("read failed: {e}"))),
                Err(_) => Err(Error::Timeout("read timed out".to_string())),
            }
        })
    }

    /// Writes `data` to a characteristic. `with_response` selects the write type.
    pub fn write(&self, id: &str, char_uuid: &str, data: &[u8], with_response: bool) -> Result<()> {
        let p = self.peripheral(id)?;
        let kind = if with_response {
            WriteType::WithResponse
        } else {
            WriteType::WithoutResponse
        };
        self.rt.block_on(async {
            let ch = find_characteristic(&p, char_uuid)?;
            match timeout(OP_TIMEOUT, p.write(&ch, data, kind)).await {
                Ok(r) => r.map_err(|e| Error::Btleplug(format!("write failed: {e}"))),
                Err(_) => Err(Error::Timeout("write timed out".to_string())),
            }
        })
    }

    /// Subscribes to a characteristic; notifications arrive as queued events.
    pub fn subscribe(&self, id: &str, char_uuid: &str) -> Result<()> {
        let p = self.peripheral(id)?;
        let queue = self.events.clone();
        let dev_id = id.to_string();
        let task = self.rt.block_on(async {
            let ch = find_characteristic(&p, char_uuid)?;
            let setup = async {
                p.subscribe(&ch)
                    .await
                    .map_err(|e| Error::Btleplug(format!("subscribe failed: {e}")))?;
                p.notifications()
                    .await
                    .map_err(|e| Error::Btleplug(format!("notifications stream failed: {e}")))
            };
            let stream = match timeout(OP_TIMEOUT, setup).await {
                Ok(r) => r?,
                Err(_) => return Err(Error::Timeout("subscribe timed out".to_string())),
            };
            Ok::<_, Error>(tokio::spawn(async move {
                let mut stream = stream;
                while let Some(n) = stream.next().await {
                    let mut q = queue.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
                    if q.len() >= 4096 {
                        q.pop_front();
                    }
                    q.push_back(BleEvent::Notification {
                        id: dev_id.clone(),
                        characteristic: n.uuid.to_string(),
                        value: n.value,
                    });
                }
            }))
        })?;
        self.notif_tasks.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).push(task);
        Ok(())
    }

    /// Unsubscribes from a characteristic.
    pub fn unsubscribe(&self, id: &str, char_uuid: &str) -> Result<()> {
        let p = self.peripheral(id)?;
        self.rt.block_on(async {
            let ch = find_characteristic(&p, char_uuid)?;
            match timeout(OP_TIMEOUT, p.unsubscribe(&ch)).await {
                Ok(r) => r.map_err(|e| Error::Btleplug(format!("unsubscribe failed: {e}"))),
                Err(_) => Err(Error::Timeout("unsubscribe timed out".to_string())),
            }
        })
    }
}

impl Drop for Session {
    fn drop(&mut self) {
        if let Some(pump) = self.pump.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).take() {
            pump.abort();
        }
        for task in self.notif_tasks.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).drain(..) {
            task.abort();
        }
    }
}

/// Finds a characteristic by UUID on a discovered peripheral.
fn find_characteristic(p: &Peripheral, char_uuid: &str) -> Result<Characteristic> {
    let target =
        Uuid::from_str(char_uuid).map_err(|e| Error::InvalidUuid(format!("{char_uuid}: {e}")))?;
    p.characteristics()
        .into_iter()
        .find(|c| c.uuid == target)
        .ok_or_else(|| {
            Error::NotFound(format!(
                "characteristic {char_uuid} not found (discover services first)"
            ))
        })
}

/// Translates a CentralEvent into a BleEvent.
async fn translate_event(adapter: &Adapter, event: CentralEvent) -> Option<BleEvent> {
    match event {
        CentralEvent::DeviceDiscovered(id)
        | CentralEvent::DeviceUpdated(id)
        | CentralEvent::DeviceServicesModified(id)
        | CentralEvent::ManufacturerDataAdvertisement { id, .. }
        | CentralEvent::ServiceDataAdvertisement { id, .. }
        | CentralEvent::ServicesAdvertisement { id, .. }
        | CentralEvent::RssiUpdate { id, .. } => Some(device_event(adapter, id).await),
        CentralEvent::DeviceConnected(id) => Some(BleEvent::Connected { id: id.to_string() }),
        CentralEvent::DeviceDisconnected(id) => {
            Some(BleEvent::Disconnected { id: id.to_string() })
        }
        CentralEvent::StateUpdate(state) => Some(BleEvent::StateUpdate {
            state: format!("{state:?}").to_lowercase(),
        }),
    }
}

/// Extracts the peripheral id from an event, if any.
fn event_peripheral_id(event: &CentralEvent) -> Option<PeripheralId> {
    match event {
        CentralEvent::DeviceDiscovered(id)
        | CentralEvent::DeviceUpdated(id)
        | CentralEvent::DeviceServicesModified(id)
        | CentralEvent::DeviceConnected(id)
        | CentralEvent::DeviceDisconnected(id)
        | CentralEvent::ManufacturerDataAdvertisement { id, .. }
        | CentralEvent::ServiceDataAdvertisement { id, .. }
        | CentralEvent::ServicesAdvertisement { id, .. }
        | CentralEvent::RssiUpdate { id, .. } => Some(id.clone()),
        CentralEvent::StateUpdate(_) => None,
    }
}

/// Builds a Device event from the peripheral's current properties.
async fn device_event(adapter: &Adapter, id: PeripheralId) -> BleEvent {
    let id_str = id.to_string();
    let (props, connected) = match adapter.peripheral(&id).await {
        Ok(p) => {
            let props = p.properties().await.ok().flatten();
            let connected = p.is_connected().await.unwrap_or(false);
            (props, connected)
        }
        Err(_) => (None, false),
    };
    BleEvent::Device {
        device: BleDevice::from_properties(id_str, props, connected),
    }
}