use std::time::{Duration, SystemTime, UNIX_EPOCH};
use btleplug::api::{Central, Manager as _, Peripheral as _, ScanFilter};
use btleplug::platform::Manager;
use futures::StreamExt;
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;
fn unix_ts() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
fn hex_encode(data: &[u8]) -> String {
data.iter().map(|b| format!("{b:02x}")).collect()
}
pub struct BleConnector {
name_filter: String,
char_uuid: Uuid,
scan_duration: Duration,
topic_prefix: String,
classifier: TopicClassifier,
delta_filter: DeltaFilter,
}
impl BleConnector {
pub fn new(name_filter: impl Into<String>, char_uuid: impl AsRef<str>) -> Result<Self> {
let uuid = char_uuid.as_ref().parse::<Uuid>()
.map_err(|e| WireBandError::Ble(format!("invalid UUID: {e}")))?;
Ok(Self {
name_filter: name_filter.into(),
char_uuid: uuid,
scan_duration: Duration::from_secs(10),
topic_prefix: "ble".to_string(),
classifier: TopicClassifier::new(),
delta_filter: DeltaFilter::new(0.0),
})
}
pub fn scan_duration(mut self, d: Duration) -> Self {
self.scan_duration = d;
self
}
pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
self.topic_prefix = prefix.into();
self
}
pub fn delta_threshold(mut self, threshold: f64) -> Self {
self.delta_filter = DeltaFilter::new(threshold);
self
}
pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
&mut self.classifier
}
pub async fn run(mut self, client: WireBandClient) -> Result<()> {
let manager = Manager::new().await
.map_err(|e| WireBandError::Ble(format!("BLE manager init: {e}")))?;
let adapters = manager.adapters().await
.map_err(|e| WireBandError::Ble(format!("BLE list adapters: {e}")))?;
let adapter = adapters.into_iter().next()
.ok_or_else(|| WireBandError::Ble("no BLE adapter found".into()))?;
info!(filter = %self.name_filter, "BLE: scanning for peripheral...");
adapter.start_scan(ScanFilter::default()).await
.map_err(|e| WireBandError::Ble(format!("BLE scan start: {e}")))?;
tokio::time::sleep(self.scan_duration).await;
adapter.stop_scan().await.ok();
let peripherals = adapter.peripherals().await
.map_err(|e| WireBandError::Ble(format!("BLE list peripherals: {e}")))?;
let mut periph_opt = None;
for p in peripherals {
let props = p.properties().await.ok().flatten();
let name = props.as_ref().and_then(|pr| pr.local_name.as_deref()).unwrap_or("");
if name.contains(self.name_filter.as_str()) {
info!(device = %name, "BLE: peripheral matched");
periph_opt = Some(p);
break;
}
}
let periph = periph_opt.ok_or_else(|| WireBandError::Ble(
format!("BLE peripheral '{}' not found after scan", self.name_filter)
))?;
periph.connect().await
.map_err(|e| WireBandError::Ble(format!("BLE connect: {e}")))?;
periph.discover_services().await
.map_err(|e| WireBandError::Ble(format!("BLE discover services: {e}")))?;
let chars = periph.characteristics();
let target = chars.iter()
.find(|c| c.uuid == self.char_uuid)
.ok_or_else(|| WireBandError::Ble(
format!("characteristic {} not found on device", self.char_uuid)
))?
.clone();
periph.subscribe(&target).await
.map_err(|e| WireBandError::Ble(format!("BLE subscribe: {e}")))?;
let device_name = periph.properties().await.ok().flatten()
.and_then(|p| p.local_name)
.unwrap_or_else(|| self.name_filter.clone());
info!(device = %device_name, char = %self.char_uuid, "BLE: streaming notifications");
let mut stream = periph.notifications().await
.map_err(|e| WireBandError::Ble(format!("BLE notifications stream: {e}")))?;
while let Some(notif) = stream.next().await {
self.process(&client, &device_name, ¬if.value).await;
}
warn!(device = %device_name, "BLE: notification stream ended");
Ok(())
}
async fn process(&mut self, client: &WireBandClient, device: &str, data: &[u8]) {
let payload: serde_json::Value = std::str::from_utf8(data)
.ok()
.and_then(|s| serde_json::from_str(s).ok())
.unwrap_or_else(|| serde_json::Value::String(hex_encode(data)));
let topic = format!("{}/{}/data", self.topic_prefix, device);
if self.delta_filter.should_suppress(&topic, &payload) {
debug!(topic = %topic, "BLE: notification delta-filtered");
return;
}
let symbol = self.classifier.classify(&topic, &payload);
let encoded = frame::encode(symbol, &topic, &payload);
client.buffer_event(topic, symbol, encoded, unix_ts()).await;
debug!(device = %device, symbol = format!("{symbol:#06x}"), "BLE notification buffered");
}
}