use std::time::Duration;
use anyhow::{Context, Result};
use btleplug::api::{
Central, Manager as _, Peripheral as _, ScanFilter, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::StreamExt;
use log::{debug, error, info};
use tokio::sync::mpsc;
use crate::parse;
use crate::protocol::*;
use crate::types::*;
/// A BLE peripheral found by `AwearClient::scan_all` whose advertised name
/// matched the configured prefix and whose RSSI met the configured minimum.
pub struct AwearDevice {
/// Advertised local name of the peripheral.
pub name: String,
/// String form of the platform peripheral identifier.
pub id: String,
/// Signal strength reported during the scan; `-127` when none was reported.
pub rssi: i16,
/// Underlying btleplug peripheral used to establish the connection.
pub(crate) peripheral: Peripheral,
// NOTE(review): never read in this file; presumably retained so the adapter
// stays alive alongside the peripheral — confirm before removing.
#[allow(dead_code)]
pub(crate) adapter: Adapter,
}
/// Control handle for a connected device, returned by `AwearClient::connect_to`.
///
/// Used to write commands to the device and to disconnect from it.
pub struct AwearHandle {
/// Connected peripheral that commands are written to.
peripheral: Peripheral,
/// Characteristic that outgoing commands are written to.
tx_char: btleplug::api::Characteristic,
}
impl AwearHandle {
    /// Writes `cmd` as raw UTF-8 bytes to the TX characteristic using a
    /// write-without-response.
    ///
    /// # Errors
    /// Returns the underlying BLE error, annotated with the command text, if
    /// the write fails.
    pub async fn send_command(&self, cmd: &str) -> Result<()> {
        // `write` only needs a byte slice, so borrow the command directly
        // instead of copying it into a temporary `Vec`.
        let payload = cmd.as_bytes();
        self.peripheral
            .write(&self.tx_char, payload, WriteType::WithoutResponse)
            .await
            .with_context(|| format!("Failed to send command {:?}", cmd))?;
        debug!("Sent command: {} ({} bytes)", cmd, payload.len());
        Ok(())
    }

    /// Sends the `START` command.
    ///
    /// # Errors
    /// See [`AwearHandle::send_command`].
    pub async fn start(&self) -> Result<()> {
        self.send_command("START").await
    }

    /// Sends the `STOP` command.
    ///
    /// # Errors
    /// See [`AwearHandle::send_command`].
    pub async fn stop(&self) -> Result<()> {
        self.send_command("STOP").await
    }

    /// Disconnects from the peripheral.
    ///
    /// # Errors
    /// Returns the underlying BLE error if the disconnect request fails.
    pub async fn disconnect(&self) -> Result<()> {
        self.peripheral
            .disconnect()
            .await
            .context("Failed to disconnect from device")
    }
}
/// Entry point for discovering and connecting to AWEAR devices over BLE.
pub struct AwearClient {
/// Scan and connection settings: scan timeout, connect timeout, name prefix
/// and minimum RSSI are read from here.
pub config: AwearClientConfig,
}
impl AwearClient {
pub fn new(config: AwearClientConfig) -> Self {
Self { config }
}
async fn get_adapter() -> Result<Adapter> {
let manager = Manager::new().await?;
let adapters = manager.adapters().await?;
adapters
.into_iter()
.next()
.context("No BLE adapters found")
}
pub async fn scan_all(&self) -> Result<Vec<AwearDevice>> {
let adapter = Self::get_adapter().await?;
#[cfg(target_os = "macos")]
{
use btleplug::api::CentralState;
for _ in 0..20 {
if adapter.adapter_state().await? == CentralState::PoweredOn {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
info!("Starting AWEAR device scan ({}s timeout)", self.config.scan_timeout_secs);
adapter.start_scan(ScanFilter::default()).await?;
tokio::time::sleep(Duration::from_secs(self.config.scan_timeout_secs)).await;
adapter.stop_scan().await?;
let peripherals = adapter.peripherals().await?;
let mut devices = Vec::new();
for p in peripherals {
if let Some(props) = p.properties().await? {
let name = props.local_name.unwrap_or_default();
if name.to_uppercase().contains(&self.config.name_prefix.to_uppercase()) {
let rssi = props.rssi.unwrap_or(-127);
if rssi >= self.config.min_rssi {
info!("Found AWEAR device: {} (RSSI={})", name, rssi);
devices.push(AwearDevice {
name,
id: p.id().to_string(),
rssi,
peripheral: p,
adapter: adapter.clone(),
});
}
}
}
}
devices.sort_by(|a, b| b.rssi.cmp(&a.rssi));
info!("AWEAR devices found: {}", devices.len());
Ok(devices)
}
pub async fn connect(&self) -> Result<(mpsc::Receiver<AwearEvent>, AwearHandle)> {
let devices = self.scan_all().await?;
let device = devices
.into_iter()
.next()
.context("No AWEAR devices found")?;
self.connect_to(device).await
}
pub async fn connect_to(
&self,
device: AwearDevice,
) -> Result<(mpsc::Receiver<AwearEvent>, AwearHandle)> {
let device_name = device.name.clone();
let peripheral = device.peripheral;
info!("Connecting to AWEAR device: {}", device_name);
tokio::time::timeout(
Duration::from_secs(self.config.connect_timeout_secs),
peripheral.connect(),
)
.await
.context("Connection timeout")?
.context("Connection failed")?;
info!("Connected, discovering services...");
#[cfg(target_os = "linux")]
tokio::time::sleep(Duration::from_millis(600)).await;
tokio::time::timeout(
Duration::from_secs(15),
peripheral.discover_services(),
)
.await
.context("Service discovery timeout")?
.context("Service discovery failed")?;
let chars = peripheral.characteristics();
let tx_char = chars
.iter()
.find(|c| c.uuid == AWEAR_TX_CHARACTERISTIC)
.cloned()
.context("TX characteristic not found")?;
let rx_char = chars
.iter()
.find(|c| c.uuid == AWEAR_RX_CHARACTERISTIC)
.cloned()
.context("RX characteristic not found")?;
info!("AWEAR service discovered — TX={} RX={}", tx_char.uuid, rx_char.uuid);
peripheral.subscribe(&rx_char).await?;
let (tx, rx) = mpsc::channel::<AwearEvent>(512);
let p = peripheral.clone();
let tx_clone = tx.clone();
let tx_char_clone = tx_char.clone();
let device_name_clone = device_name.clone();
tokio::spawn(async move {
if let Err(e) = notification_dispatcher(
p,
tx_clone,
tx_char_clone,
device_name_clone,
)
.await
{
error!("Notification dispatcher error: {}", e);
}
});
let _ = tx.send(AwearEvent::Connected(device_name)).await;
let handle = AwearHandle {
peripheral,
tx_char,
};
Ok((rx, handle))
}
}
async fn notification_dispatcher(
peripheral: Peripheral,
tx: mpsc::Sender<AwearEvent>,
tx_char: btleplug::api::Characteristic,
device_name: String,
) -> Result<()> {
let mut notifications = peripheral.notifications().await?;
let mut luca_expecting_data = false;
let mut luca_last_chunk_size: u16 = 0;
let mut luca_block_counter: u32 = 0;
let mut luca_eeg_buffer: Vec<u8> = Vec::new();
let mut device_ready = false;
info!("Waiting for AWEAR handshake...");
while let Some(notif) = notifications.next().await {
if notif.uuid != AWEAR_RX_CHARACTERISTIC {
continue;
}
let data = ¬if.value;
if data.is_empty() {
continue;
}
if data.last() == Some(&b'\n') {
let text = String::from_utf8_lossy(data).trim().to_string();
if text.starts_with(AWEAR_CONNECTED_PREFIX) {
let challenge = &text[AWEAR_CONNECTED_PREFIX.len()..];
info!("Received handshake: AWEAR_CONNECTED:{}", challenge);
if challenge.len() == 8 {
if let Some(reply) = compute_challenge_reply(challenge) {
info!("Sending challenge reply: {}", reply);
if let Err(e) = peripheral
.write(&tx_char, reply.as_bytes(), WriteType::WithoutResponse)
.await
{
error!("Failed to send CRPL: {}", e);
}
}
}
continue;
}
if text.starts_with(AWEAR_READY_PREFIX) {
info!("Device authenticated: {}", text);
device_ready = true;
let _ = tx.send(AwearEvent::Ready).await;
continue;
}
if text.starts_with("Battery mV:") {
if let Some(mv_str) = text.strip_prefix("Battery mV:") {
if let Ok(mv) = mv_str.trim().parse::<u32>() {
let pct = parse::battery_mv_to_percent(mv);
let _ = tx.send(AwearEvent::Battery(pct)).await;
}
}
continue;
}
if text.starts_with("RSSI DBm:") {
if let Some(rssi_str) = text.strip_prefix("RSSI DBm:") {
if let Ok(rssi) = rssi_str.trim().parse::<i8>() {
let _ = tx.send(AwearEvent::Signal(rssi)).await;
}
}
continue;
}
if !text.is_empty() {
debug!("RX text: {}", text);
let _ = tx.send(AwearEvent::Misc(text)).await;
}
continue;
}
if data.len() >= 4 && &data[..4] == LUCA_MAGIC {
if !device_ready {
continue;
}
if let Some((data_type, seq, payload_hint)) = parse::parse_luca_header(data) {
if data_type == 1 || data_type == 7 {
if luca_expecting_data && !luca_eeg_buffer.is_empty() {
if let Some(reading) =
parse::parse_luca_eeg_block(&luca_eeg_buffer, luca_block_counter)
{
let _ = tx.send(AwearEvent::Eeg(reading)).await;
}
}
luca_expecting_data = true;
luca_eeg_buffer.clear();
luca_last_chunk_size = payload_hint;
luca_block_counter = seq;
debug!("LUCA EEG header: type={} seq={} last_chunk={}", data_type, seq, payload_hint);
} else {
if luca_expecting_data && !luca_eeg_buffer.is_empty() {
if let Some(reading) =
parse::parse_luca_eeg_block(&luca_eeg_buffer, luca_block_counter)
{
let _ = tx.send(AwearEvent::Eeg(reading)).await;
}
}
luca_expecting_data = false;
debug!("LUCA info header: type={} seq={}", data_type, seq);
}
}
continue;
}
if luca_expecting_data && device_ready {
luca_eeg_buffer.extend_from_slice(data);
if luca_last_chunk_size > 0 && data.len() == luca_last_chunk_size as usize {
luca_expecting_data = false;
if let Some(reading) =
parse::parse_luca_eeg_block(&luca_eeg_buffer, luca_block_counter)
{
let _ = tx.send(AwearEvent::Eeg(reading)).await;
}
luca_eeg_buffer.clear();
}
continue;
}
if let Some(ptype) = DataPacketType::from_byte(data[0]) {
match ptype {
DataPacketType::Battery => {
if let Some(level) = parse::parse_battery(data) {
let _ = tx.send(AwearEvent::Battery(level)).await;
}
}
DataPacketType::Signal => {
if let Some(rssi) = parse::parse_signal(data) {
let _ = tx.send(AwearEvent::Signal(rssi)).await;
}
}
DataPacketType::Eeg => {
if let Some((counter, channels)) = parse::parse_legacy_eeg(data) {
let samples: Vec<i16> = channels
.iter()
.map(|&v| v.clamp(i16::MIN as i32, i16::MAX as i32) as i16)
.collect();
let _ = tx
.send(AwearEvent::Eeg(EegReading {
sequence: counter as u32,
samples,
}))
.await;
}
}
DataPacketType::Misc => {
if let Ok(text) = std::str::from_utf8(&data[1..]) {
let text = text.trim().to_string();
if !text.is_empty() {
let _ = tx.send(AwearEvent::Misc(text)).await;
}
}
}
}
}
}
info!("Device {} disconnected", device_name);
let _ = tx.send(AwearEvent::Disconnected).await;
Ok(())
}