use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use anyhow::{anyhow, Result};
use btleplug::api::{
Central, CentralEvent, Characteristic, Manager as _, Peripheral as _, ScanFilter, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::StreamExt;
use log::{debug, info, warn};
use prost::Message;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::parse::{parse_adc, parse_calibration, parse_diagnostics, parse_frame, parse_sensor};
use crate::protocol::*;
use crate::types::{DeviceInfo, MendiEvent, MENDI_FCC_ID};
use crate::wire;
/// A peripheral whose advertised name matched the configured prefix during a scan.
///
/// Values of this type are produced by `MendiClient::scan` and consumed by
/// `MendiClient::connect_to`.
#[derive(Clone, Debug)]
pub struct MendiDevice {
/// Local name reported in the peripheral's properties.
pub name: String,
/// String form of the peripheral identifier (`peripheral.id().to_string()`).
pub id: String,
/// Underlying btleplug peripheral handle.
pub(crate) peripheral: Peripheral,
/// Adapter the peripheral was listed by; handed on to the connection setup.
pub(crate) adapter: Adapter,
}
/// Scan settings used by `MendiClient`.
#[derive(Debug, Clone)]
pub struct MendiClientConfig {
/// Scan duration in seconds: how long `scan` listens, and the upper bound on
/// how long `connect` waits for a first matching device.
pub scan_timeout_secs: u64,
/// Only peripherals whose local name starts with this prefix are accepted.
pub name_prefix: String,
/// When `true`, the scan filter is restricted to `MENDI_SERVICE_UUID`;
/// when `false`, the default (unrestricted) scan filter is used.
pub filter_by_service_uuid: bool,
}
impl Default for MendiClientConfig {
fn default() -> Self {
Self {
scan_timeout_secs: 10,
name_prefix: MENDI_NAME_PREFIX.into(),
filter_by_service_uuid: true,
}
}
}
/// Entry point for discovering and connecting to Mendi devices over BLE.
pub struct MendiClient {
// Scan settings (timeout, name prefix, service-UUID filtering).
config: MendiClientConfig,
}
impl MendiClient {
pub fn new(config: MendiClientConfig) -> Self {
Self { config }
}
/// Returns the first Bluetooth adapter reported by the platform manager.
///
/// # Errors
/// Fails if the manager cannot be created, the adapter list cannot be
/// obtained, or the list is empty ("No Bluetooth adapter found").
async fn get_adapter() -> Result<Adapter> {
    let manager = Manager::new().await?;
    let mut available = manager.adapters().await?.into_iter();
    match available.next() {
        Some(first) => Ok(first),
        None => Err(anyhow!("No Bluetooth adapter found")),
    }
}
/// Polls the adapter state until it reports `PoweredOn`, giving up once a
/// 3 second deadline has passed or as soon as the state query fails.
///
/// The state is re-checked every 200 ms, and a final 300 ms pause always runs
/// before returning, whichever way the loop ended.
#[cfg(target_os = "macos")]
async fn wait_for_adapter(adapter: &Adapter) {
    use btleplug::api::CentralState;

    let poll_interval = Duration::from_millis(200);
    let settle_delay = Duration::from_millis(300);
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);

    loop {
        let state = match adapter.adapter_state().await {
            Ok(state) => state,
            Err(e) => {
                warn!("macOS: adapter_state() error: {e}");
                break;
            }
        };
        if matches!(state, CentralState::PoweredOn) {
            info!("macOS: adapter is PoweredOn");
            break;
        }
        if tokio::time::Instant::now() >= deadline {
            warn!("macOS: adapter still {state:?} after 3s — proceeding");
            break;
        }
        debug!("macOS: adapter state = {state:?}, waiting…");
        tokio::time::sleep(poll_interval).await;
    }

    tokio::time::sleep(settle_delay).await;
}
/// Scans for the configured duration and returns every peripheral whose local
/// name starts with the configured prefix.
///
/// The scan always runs for the full `scan_timeout_secs`; peripherals without
/// readable properties or without a local name are skipped.
///
/// # Errors
/// Fails if no adapter is available, the scan cannot be started, or the
/// adapter's peripheral list cannot be read. A failure to stop the scan is
/// ignored.
pub async fn scan(&self) -> Result<Vec<MendiDevice>> {
    let adapter = Self::get_adapter().await?;
    #[cfg(target_os = "macos")]
    Self::wait_for_adapter(&adapter).await;

    let listen_secs = self.config.scan_timeout_secs;
    info!("Scanning for Mendi devices ({} s) …", listen_secs);

    // Start from the unrestricted filter and narrow it only when requested.
    let mut scan_filter = ScanFilter::default();
    if self.config.filter_by_service_uuid {
        scan_filter.services = vec![MENDI_SERVICE_UUID];
    }

    adapter.start_scan(scan_filter).await?;
    tokio::time::sleep(Duration::from_secs(listen_secs)).await;
    let _ = adapter.stop_scan().await;

    let mut devices = Vec::new();
    for peripheral in adapter.peripherals().await? {
        let Ok(Some(props)) = peripheral.properties().await else {
            continue;
        };
        let Some(name) = props.local_name else {
            continue;
        };
        if !name.starts_with(&self.config.name_prefix) {
            continue;
        }
        let id = peripheral.id().to_string();
        info!("Found: {name} (id={id})");
        devices.push(MendiDevice {
            name,
            id,
            peripheral,
            adapter: adapter.clone(),
        });
    }

    info!("{} device(s) found", devices.len());
    Ok(devices)
}
pub async fn connect_to(
&self,
device: MendiDevice,
) -> Result<(mpsc::Receiver<MendiEvent>, MendiHandle)> {
self.setup(device.peripheral, device.name, device.adapter)
.await
}
/// Scans for the first peripheral whose local name starts with the configured
/// prefix and connects to it.
///
/// Unlike `scan`, this returns as soon as a matching device is seen rather
/// than waiting for the whole scan window. If the device reports no local
/// name when its properties are re-read, the name falls back to `"Mendi"`.
///
/// # Errors
/// Fails if no adapter is available, the scan cannot be started, no matching
/// device appears within `scan_timeout_secs`, the peripheral's properties
/// cannot be read, or the connection setup fails.
pub async fn connect(&self) -> Result<(mpsc::Receiver<MendiEvent>, MendiHandle)> {
    let adapter = Self::get_adapter().await?;
    #[cfg(target_os = "macos")]
    Self::wait_for_adapter(&adapter).await;
    info!("Scanning for Mendi device ({} s) …", self.config.scan_timeout_secs);
    let scan_filter = if self.config.filter_by_service_uuid {
        ScanFilter {
            services: vec![MENDI_SERVICE_UUID],
        }
    } else {
        ScanFilter::default()
    };
    adapter.start_scan(scan_filter).await?;

    // Hold on to the search result until the scan has been stopped, so that a
    // timeout does not return early and leave the adapter scanning.
    let search = self
        .find_first(&adapter, &self.config.name_prefix, self.config.scan_timeout_secs)
        .await;
    // Best effort: a failure to stop the scan is not fatal.
    adapter.stop_scan().await.ok();
    let peripheral = search?;

    let props = peripheral.properties().await?.unwrap_or_default();
    let name = props.local_name.unwrap_or_else(|| "Mendi".into());
    info!("Found: {name}");
    self.setup(peripheral, name, adapter).await
}
/// Connects to `peripheral`, subscribes to its Mendi characteristics and
/// spawns the background tasks that feed the returned event channel.
///
/// Steps, in order:
/// 1. `connect()` with a 10 s timeout, then `discover_services()` with a 15 s
///    timeout.
/// 2. Best-effort read of the firmware and hardware revision strings (a
///    missing characteristic, read failure or invalid UTF-8 yields `None`).
/// 3. Subscribe to the Frame, ADC, Calibration and Sensor characteristics
///    that are present; only a missing Frame characteristic is logged.
/// 4. Queue `MendiEvent::Connected`, followed by one `MendiEvent::Diagnostics`
///    if the diagnostics characteristic can be read and parsed.
/// 5. Spawn a disconnect watcher (adapter events) and a notification pump.
///    Both share an atomic flag so `MendiEvent::Disconnected` is sent at most
///    once.
///
/// Frames are sent with `try_send` and dropped when the channel (capacity
/// 256) is full; all other events wait for channel space.
///
/// # Errors
/// Fails if connecting or service discovery times out or errors, or if
/// subscribing to a present characteristic fails.
async fn setup(
    &self,
    peripheral: Peripheral,
    device_name: String,
    adapter: Adapter,
) -> Result<(mpsc::Receiver<MendiEvent>, MendiHandle)> {
    tokio::time::timeout(Duration::from_secs(10), peripheral.connect())
        .await
        .map_err(|_| anyhow!("BLE connect() timed out after 10 s"))??;
    // NOTE(review): presumably lets the Linux BLE stack settle before service
    // discovery — confirm the reason for this delay.
    #[cfg(target_os = "linux")]
    tokio::time::sleep(Duration::from_millis(600)).await;
    tokio::time::timeout(Duration::from_secs(15), peripheral.discover_services())
        .await
        .map_err(|_| anyhow!("discover_services() timed out after 15 s"))??;
    info!("Connected, services discovered: {device_name}");

    let chars: BTreeSet<Characteristic> = peripheral.characteristics();
    let find_char = |uuid: Uuid| -> Option<Characteristic> {
        chars.iter().find(|c| c.uuid == uuid).cloned()
    };

    // Revision strings are optional: any failure simply yields `None`.
    let firmware_version = if let Some(c) = find_char(FIRMWARE_REVISION) {
        peripheral
            .read(&c)
            .await
            .ok()
            .and_then(|v| String::from_utf8(v).ok())
    } else {
        None
    };
    let hardware_version = if let Some(c) = find_char(HARDWARE_REVISION) {
        peripheral
            .read(&c)
            .await
            .ok()
            .and_then(|v| String::from_utf8(v).ok())
    } else {
        None
    };
    let device_info = DeviceInfo {
        firmware_version,
        hardware_version,
        id: peripheral.id().to_string(),
        name: device_name.clone(),
        fcc_id: MENDI_FCC_ID.into(),
    };
    info!("Device: {device_info}");

    if let Some(c) = find_char(FRAME_CHARACTERISTIC) {
        peripheral.subscribe(&c).await?;
        info!("Subscribed to Frame (0xABB1)");
    } else {
        warn!("Frame characteristic (0xABB1) not found");
    }
    if let Some(c) = find_char(ADC_CHARACTERISTIC) {
        peripheral.subscribe(&c).await?;
        info!("Subscribed to ADC (0xABB4)");
    }
    if let Some(c) = find_char(CALIBRATION_CHARACTERISTIC) {
        peripheral.subscribe(&c).await?;
        info!("Subscribed to Calibration (0xABB6)");
    }
    if let Some(c) = find_char(SENSOR_CHARACTERISTIC) {
        peripheral.subscribe(&c).await?;
        info!("Subscribed to Sensor (0xABB2)");
    }

    let (tx, rx) = mpsc::channel::<MendiEvent>(256);
    let _ = tx.send(MendiEvent::Connected(device_info)).await;

    // One-off diagnostics read; failures are only logged.
    if let Some(c) = find_char(DIAGNOSTICS_CHARACTERISTIC) {
        match peripheral.read(&c).await {
            Ok(data) => {
                if let Some(diag) = parse_diagnostics(&data) {
                    info!(
                        "Diagnostics: imu_ok={}, sensor_ok={}, adc={:?}",
                        diag.imu_ok,
                        diag.sensor_ok,
                        diag.adc.as_ref().map(|a| format!("{}mV", a.voltage_mv))
                    );
                    let _ = tx.send(MendiEvent::Diagnostics(diag)).await;
                }
            }
            Err(e) => debug!("Could not read Diagnostics: {e}"),
        }
    }

    // Shared between both tasks so `Disconnected` is emitted only once.
    let disconnected = Arc::new(AtomicBool::new(false));

    // Task 1: watch adapter events for this peripheral's disconnect.
    let disconnect_tx = tx.clone();
    let peripheral_id = peripheral.id();
    let disconnected_w = Arc::clone(&disconnected);
    tokio::spawn(async move {
        match adapter.events().await {
            Ok(mut events) => {
                while let Some(event) = events.next().await {
                    if let CentralEvent::DeviceDisconnected(id) = event {
                        if id == peripheral_id {
                            info!("Disconnect watcher: device disconnected");
                            if !disconnected_w.swap(true, Ordering::SeqCst) {
                                let _ = disconnect_tx.send(MendiEvent::Disconnected).await;
                            }
                            break;
                        }
                    }
                }
            }
            Err(e) => warn!("Could not subscribe to adapter events: {e}"),
        }
    });

    // Task 2: turn characteristic notifications into events.
    let peripheral_clone = peripheral.clone();
    let disconnected_n = Arc::clone(&disconnected);
    tokio::spawn(async move {
        let mut notifications = match peripheral_clone.notifications().await {
            Ok(n) => n,
            Err(e) => {
                warn!("Could not get notifications stream: {e}");
                return;
            }
        };
        let mut count: u64 = 0;
        while let Some(notif) = notifications.next().await {
            // Nobody is listening any more: stop instead of parsing
            // notifications whose events can never be delivered.
            if tx.is_closed() {
                debug!("Event receiver dropped — stopping notification task");
                return;
            }
            let data = &notif.value;
            let uuid = notif.uuid;
            count += 1;
            // Log the first few notifications, then every 1000th.
            if count <= 5 || count.is_multiple_of(1000) {
                debug!("Notification #{count}: uuid={uuid} len={}", data.len());
            }
            if uuid == FRAME_CHARACTERISTIC {
                match parse_frame(data) {
                    Some(frame) => {
                        // Frames are never awaited: drop rather than stall the pump.
                        if tx.try_send(MendiEvent::Frame(frame)).is_err() {
                            debug!("Frame channel full, dropping frame");
                        }
                    }
                    None => {
                        debug!("Invalid frame (len={}), skipping", data.len());
                    }
                }
            } else if uuid == ADC_CHARACTERISTIC {
                if let Some(battery) = parse_adc(data) {
                    let _ = tx.send(MendiEvent::Battery(battery)).await;
                }
            } else if uuid == CALIBRATION_CHARACTERISTIC {
                if let Some(cal) = parse_calibration(data) {
                    let _ = tx.send(MendiEvent::Calibration(cal)).await;
                }
            } else if uuid == SENSOR_CHARACTERISTIC {
                if let Some(sensor) = parse_sensor(data) {
                    let _ = tx.send(MendiEvent::SensorRead(sensor)).await;
                }
            } else if uuid == IMU_CHARACTERISTIC {
                debug!("IMU notification (len={})", data.len());
            } else {
                debug!("Unknown notification from {uuid}");
            }
        }
        info!("Notification stream ended — device disconnected.");
        if !disconnected_n.swap(true, Ordering::SeqCst) {
            let _ = tx.send(MendiEvent::Disconnected).await;
        }
    });

    let handle = MendiHandle {
        peripheral,
        chars: chars.into_iter().collect(),
    };
    Ok((rx, handle))
}
/// Polls the adapter's known peripherals every 250 ms until one has a local
/// name starting with `prefix`, and returns it.
///
/// Errors while listing peripherals or reading properties are treated as
/// "nothing found yet" and polling continues.
///
/// # Errors
/// Fails with a timeout error if no match appears within `timeout_secs`.
async fn find_first(
    &self,
    adapter: &Adapter,
    prefix: &str,
    timeout_secs: u64,
) -> Result<Peripheral> {
    let poll = async {
        loop {
            let candidates = adapter.peripherals().await.unwrap_or_default();
            for candidate in candidates {
                let Ok(Some(props)) = candidate.properties().await else {
                    continue;
                };
                let name_matches = props
                    .local_name
                    .as_deref()
                    .is_some_and(|name| name.starts_with(prefix));
                if name_matches {
                    return candidate;
                }
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
    };

    match tokio::time::timeout(Duration::from_secs(timeout_secs), poll).await {
        Ok(found) => Ok(found),
        Err(_) => Err(anyhow!("Timed out scanning for Mendi after {timeout_secs} s")),
    }
}
}
/// Command handle for a connected device, returned alongside the event receiver.
pub struct MendiHandle {
// Connected btleplug peripheral that all reads/writes go through.
peripheral: Peripheral,
// Characteristics collected from the peripheral when the handle was built.
chars: Vec<Characteristic>,
}
impl MendiHandle {
/// Returns the first stored characteristic whose UUID equals `uuid`, if any.
fn find_char(&self, uuid: Uuid) -> Option<&Characteristic> {
    for candidate in &self.chars {
        if candidate.uuid == uuid {
            return Some(candidate);
        }
    }
    None
}
/// Encodes a `wire::Calibration` message from the arguments and writes it
/// (with response) to the calibration characteristic.
///
/// # Errors
/// Fails if the calibration characteristic is not present or the write fails.
pub async fn write_calibration(
    &self,
    offset_left: f32,
    offset_right: f32,
    offset_pulse: f32,
    enable_auto_cal: bool,
    low_power: bool,
) -> Result<()> {
    let Some(target) = self.find_char(CALIBRATION_CHARACTERISTIC) else {
        return Err(anyhow!("Calibration characteristic not found"));
    };
    let payload = wire::Calibration {
        offset_l: offset_left,
        offset_r: offset_right,
        offset_p: offset_pulse,
        enable: enable_auto_cal,
        low_power_mode: low_power,
    }
    .encode_to_vec();
    self.peripheral
        .write(target, &payload, WriteType::WithResponse)
        .await?;
    Ok(())
}
/// Writes (with response) a `wire::Sensor` message with `read = false`
/// carrying `address` and `data` to the sensor characteristic.
///
/// # Errors
/// Fails if the sensor characteristic is not present or the write fails.
pub async fn write_sensor_register(&self, address: u32, data: u32) -> Result<()> {
    let Some(target) = self.find_char(SENSOR_CHARACTERISTIC) else {
        return Err(anyhow!("Sensor characteristic not found"));
    };
    let request = wire::Sensor {
        read: false,
        address,
        data,
    };
    let bytes = request.encode_to_vec();
    self.peripheral
        .write(target, &bytes, WriteType::WithResponse)
        .await
        .map_err(Into::into)
}
/// Writes (with response) a `wire::Sensor` message with `read = true` for
/// `address` to the sensor characteristic.
///
/// This only sends the request; no value is returned from this call.
/// NOTE(review): the reply presumably arrives as a sensor notification — confirm.
///
/// # Errors
/// Fails if the sensor characteristic is not present or the write fails.
pub async fn read_sensor_register(&self, address: u32) -> Result<()> {
    let target = match self.find_char(SENSOR_CHARACTERISTIC) {
        Some(found) => found,
        None => return Err(anyhow!("Sensor characteristic not found")),
    };
    let bytes = wire::Sensor {
        read: true,
        address,
        data: 0,
    }
    .encode_to_vec();
    self.peripheral
        .write(target, &bytes, WriteType::WithResponse)
        .await?;
    Ok(())
}
/// Writes (with response) a `wire::Imu` message with `read = 0`, carrying
/// `address` and a copy of `data`, to the IMU characteristic.
///
/// # Errors
/// Fails if the IMU characteristic is not present or the write fails.
pub async fn write_imu_register(&self, address: u32, data: &[u8]) -> Result<()> {
    let Some(target) = self.find_char(IMU_CHARACTERISTIC) else {
        return Err(anyhow!("IMU characteristic not found"));
    };
    let request = wire::Imu {
        read: 0,
        address,
        data: Vec::from(data),
    };
    let bytes = request.encode_to_vec();
    self.peripheral
        .write(target, &bytes, WriteType::WithResponse)
        .await?;
    Ok(())
}
/// Writes (with response) a `wire::Imu` message with `read = num_bytes`,
/// the given `address` and an empty data payload to the IMU characteristic.
///
/// This only sends the request; no value is returned from this call.
///
/// # Errors
/// Fails if the IMU characteristic is not present or the write fails.
pub async fn read_imu_register(&self, address: u32, num_bytes: u32) -> Result<()> {
    let target = match self.find_char(IMU_CHARACTERISTIC) {
        Some(found) => found,
        None => return Err(anyhow!("IMU characteristic not found")),
    };
    let bytes = wire::Imu {
        read: num_bytes,
        address,
        data: Vec::new(),
    }
    .encode_to_vec();
    self.peripheral
        .write(target, &bytes, WriteType::WithResponse)
        .await
        .map_err(Into::into)
}
/// Writes (with response) a `wire::Sensor` message with `read = true`,
/// `address = 0` and `data = 0` to the sensor characteristic, then logs
/// "Sensor enabled".
///
/// NOTE(review): the payload is the same as a register read of address 0 —
/// confirm this is the intended enable command.
///
/// # Errors
/// Fails if the sensor characteristic is not present or the write fails.
pub async fn enable_sensor(&self) -> Result<()> {
    let Some(target) = self.find_char(SENSOR_CHARACTERISTIC) else {
        return Err(anyhow!("Sensor characteristic not found"));
    };
    let bytes = wire::Sensor {
        read: true,
        address: 0,
        data: 0,
    }
    .encode_to_vec();
    self.peripheral
        .write(target, &bytes, WriteType::WithResponse)
        .await?;
    info!("Sensor enabled");
    Ok(())
}
/// Writes (with response) a `wire::Sensor` message with `read = false`,
/// `address = 0` and `data = 0` to the sensor characteristic, then logs
/// "Sensor disabled".
///
/// NOTE(review): the payload is the same as writing 0 to register 0 —
/// confirm this is the intended disable command.
///
/// # Errors
/// Fails if the sensor characteristic is not present or the write fails.
pub async fn disable_sensor(&self) -> Result<()> {
    let target = match self.find_char(SENSOR_CHARACTERISTIC) {
        Some(found) => found,
        None => return Err(anyhow!("Sensor characteristic not found")),
    };
    let request = wire::Sensor {
        read: false,
        address: 0,
        data: 0,
    };
    let bytes = request.encode_to_vec();
    self.peripheral
        .write(target, &bytes, WriteType::WithResponse)
        .await?;
    info!("Sensor disabled");
    Ok(())
}
/// Reports whether the peripheral is currently connected.
///
/// A failure to query the connection state is reported as `false`.
pub async fn is_connected(&self) -> bool {
    matches!(self.peripheral.is_connected().await, Ok(true))
}
/// Asks the peripheral to disconnect.
///
/// # Errors
/// Propagates the error if the disconnect request fails.
pub async fn disconnect(&self) -> Result<()> {
    self.peripheral.disconnect().await.map_err(Into::into)
}
}