awear 0.1.0

Rust client for AWEAR EEG devices over BLE using btleplug
Documentation
//! BLE scanning, connection, authentication, and data streaming for AWEAR devices.

use std::time::Duration;

use anyhow::{Context, Result};
use btleplug::api::{
    Central, Manager as _, Peripheral as _, ScanFilter, WriteType,
};
use btleplug::platform::{Adapter, Manager, Peripheral};
use futures::StreamExt;
use log::{debug, error, info};
use tokio::sync::mpsc;

use crate::parse;
use crate::protocol::*;
use crate::types::*;

/// A discovered AWEAR device ready for connection.
///
/// Instances are built by `AwearClient::scan_all` and consumed by
/// `AwearClient::connect_to`.
pub struct AwearDevice {
    /// Advertised local name; empty when the advertisement carried no name.
    pub name: String,
    /// Platform peripheral identifier rendered as a string.
    pub id: String,
    /// RSSI reported during the scan; `-127` when the platform reported none.
    pub rssi: i16,
    /// Underlying btleplug peripheral that `connect_to` connects to.
    pub(crate) peripheral: Peripheral,
    // Clone of the adapter that produced this device. It is not read by the
    // code visible in this module, hence the lint allowance.
    #[allow(dead_code)]
    pub(crate) adapter: Adapter,
}

/// Handle to a connected AWEAR device for sending commands.
///
/// Returned by `AwearClient::connect_to` together with the event receiver.
pub struct AwearHandle {
    /// Connected peripheral that commands are written to.
    peripheral: Peripheral,
    /// Characteristic matching `AWEAR_TX_CHARACTERISTIC`; target of every write.
    tx_char: btleplug::api::Characteristic,
}

impl AwearHandle {
    /// Send a raw command string to the device.
    ///
    /// The UTF-8 bytes of `cmd` are written unchanged to the TX characteristic
    /// using `WriteType::WithoutResponse`; no terminator is appended.
    ///
    /// # Errors
    /// Returns the underlying BLE error if the write fails.
    pub async fn send_command(&self, cmd: &str) -> Result<()> {
        // Borrow the bytes directly: `write` only needs a slice, so copying
        // the command into a fresh `Vec` on every call is wasted work.
        let payload = cmd.as_bytes();
        self.peripheral
            .write(&self.tx_char, payload, WriteType::WithoutResponse)
            .await?;
        debug!("Sent command: {} ({} bytes)", cmd, payload.len());
        Ok(())
    }

    /// Send START command to begin EEG streaming.
    ///
    /// # Errors
    /// Propagates any error from [`AwearHandle::send_command`].
    pub async fn start(&self) -> Result<()> {
        self.send_command("START").await
    }

    /// Send STOP command to halt EEG streaming.
    ///
    /// # Errors
    /// Propagates any error from [`AwearHandle::send_command`].
    pub async fn stop(&self) -> Result<()> {
        self.send_command("STOP").await
    }

    /// Disconnect from the device.
    ///
    /// # Errors
    /// Returns the underlying BLE error if the disconnect request fails.
    pub async fn disconnect(&self) -> Result<()> {
        self.peripheral.disconnect().await?;
        Ok(())
    }
}

/// AWEAR BLE client for scanning and connecting.
pub struct AwearClient {
    /// Settings read while scanning and connecting: scan timeout, name
    /// filter, minimum RSSI and connect timeout.
    pub config: AwearClientConfig,
}

impl AwearClient {
    pub fn new(config: AwearClientConfig) -> Self {
        Self { config }
    }

    /// Get the BLE adapter.
    async fn get_adapter() -> Result<Adapter> {
        let manager = Manager::new().await?;
        let adapters = manager.adapters().await?;
        adapters
            .into_iter()
            .next()
            .context("No BLE adapters found")
    }

    /// Scan for all nearby AWEAR devices.
    pub async fn scan_all(&self) -> Result<Vec<AwearDevice>> {
        let adapter = Self::get_adapter().await?;

        // On macOS, wait for adapter to power on
        #[cfg(target_os = "macos")]
        {
            use btleplug::api::CentralState;
            for _ in 0..20 {
                if adapter.adapter_state().await? == CentralState::PoweredOn {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }

        info!("Starting AWEAR device scan ({}s timeout)", self.config.scan_timeout_secs);
        adapter.start_scan(ScanFilter::default()).await?;
        tokio::time::sleep(Duration::from_secs(self.config.scan_timeout_secs)).await;
        adapter.stop_scan().await?;

        let peripherals = adapter.peripherals().await?;
        let mut devices = Vec::new();

        for p in peripherals {
            if let Some(props) = p.properties().await? {
                let name = props.local_name.unwrap_or_default();
                if name.to_uppercase().contains(&self.config.name_prefix.to_uppercase()) {
                    let rssi = props.rssi.unwrap_or(-127);
                    if rssi >= self.config.min_rssi {
                        info!("Found AWEAR device: {} (RSSI={})", name, rssi);
                        devices.push(AwearDevice {
                            name,
                            id: p.id().to_string(),
                            rssi,
                            peripheral: p,
                            adapter: adapter.clone(),
                        });
                    }
                }
            }
        }

        devices.sort_by(|a, b| b.rssi.cmp(&a.rssi));
        info!("AWEAR devices found: {}", devices.len());
        Ok(devices)
    }

    /// Scan and connect to the first discovered device.
    pub async fn connect(&self) -> Result<(mpsc::Receiver<AwearEvent>, AwearHandle)> {
        let devices = self.scan_all().await?;
        let device = devices
            .into_iter()
            .next()
            .context("No AWEAR devices found")?;
        self.connect_to(device).await
    }

    /// Connect to a specific AWEAR device.
    pub async fn connect_to(
        &self,
        device: AwearDevice,
    ) -> Result<(mpsc::Receiver<AwearEvent>, AwearHandle)> {
        let device_name = device.name.clone();
        let peripheral = device.peripheral;

        info!("Connecting to AWEAR device: {}", device_name);

        // Connect with timeout
        tokio::time::timeout(
            Duration::from_secs(self.config.connect_timeout_secs),
            peripheral.connect(),
        )
        .await
        .context("Connection timeout")?
        .context("Connection failed")?;

        info!("Connected, discovering services...");

        // Discover services
        #[cfg(target_os = "linux")]
        tokio::time::sleep(Duration::from_millis(600)).await;

        tokio::time::timeout(
            Duration::from_secs(15),
            peripheral.discover_services(),
        )
        .await
        .context("Service discovery timeout")?
        .context("Service discovery failed")?;

        // Find TX and RX characteristics
        let chars = peripheral.characteristics();
        let tx_char = chars
            .iter()
            .find(|c| c.uuid == AWEAR_TX_CHARACTERISTIC)
            .cloned()
            .context("TX characteristic not found")?;
        let rx_char = chars
            .iter()
            .find(|c| c.uuid == AWEAR_RX_CHARACTERISTIC)
            .cloned()
            .context("RX characteristic not found")?;

        info!("AWEAR service discovered — TX={} RX={}", tx_char.uuid, rx_char.uuid);

        // Subscribe to RX notifications
        peripheral.subscribe(&rx_char).await?;

        let (tx, rx) = mpsc::channel::<AwearEvent>(512);

        // Spawn notification dispatcher
        let p = peripheral.clone();
        let tx_clone = tx.clone();
        let tx_char_clone = tx_char.clone();
        let device_name_clone = device_name.clone();

        tokio::spawn(async move {
            if let Err(e) = notification_dispatcher(
                p,
                tx_clone,
                tx_char_clone,
                device_name_clone,
            )
            .await
            {
                error!("Notification dispatcher error: {}", e);
            }
        });

        let _ = tx.send(AwearEvent::Connected(device_name)).await;

        let handle = AwearHandle {
            peripheral,
            tx_char,
        };

        Ok((rx, handle))
    }
}

/// Background task that receives BLE notifications and dispatches events.
async fn notification_dispatcher(
    peripheral: Peripheral,
    tx: mpsc::Sender<AwearEvent>,
    tx_char: btleplug::api::Characteristic,
    device_name: String,
) -> Result<()> {
    let mut notifications = peripheral.notifications().await?;

    // LUCA protocol state
    let mut luca_expecting_data = false;
    let mut luca_last_chunk_size: u16 = 0;
    let mut luca_block_counter: u32 = 0;
    let mut luca_eeg_buffer: Vec<u8> = Vec::new();
    let mut device_ready = false;

    // Wait for AWEAR_CONNECTED challenge, send CRPL, wait for AWEAR_READY
    info!("Waiting for AWEAR handshake...");

    while let Some(notif) = notifications.next().await {
        if notif.uuid != AWEAR_RX_CHARACTERISTIC {
            continue;
        }
        let data = &notif.value;

        if data.is_empty() {
            continue;
        }

        // ── Text messages (newline-terminated) ──
        if data.last() == Some(&b'\n') {
            let text = String::from_utf8_lossy(data).trim().to_string();

            if text.starts_with(AWEAR_CONNECTED_PREFIX) {
                let challenge = &text[AWEAR_CONNECTED_PREFIX.len()..];
                info!("Received handshake: AWEAR_CONNECTED:{}", challenge);

                if challenge.len() == 8 {
                    if let Some(reply) = compute_challenge_reply(challenge) {
                        info!("Sending challenge reply: {}", reply);
                        if let Err(e) = peripheral
                            .write(&tx_char, reply.as_bytes(), WriteType::WithoutResponse)
                            .await
                        {
                            error!("Failed to send CRPL: {}", e);
                        }
                    }
                }
                continue;
            }

            if text.starts_with(AWEAR_READY_PREFIX) {
                info!("Device authenticated: {}", text);
                device_ready = true;
                let _ = tx.send(AwearEvent::Ready).await;
                continue;
            }

            if text.starts_with("Battery mV:") {
                if let Some(mv_str) = text.strip_prefix("Battery mV:") {
                    if let Ok(mv) = mv_str.trim().parse::<u32>() {
                        let pct = parse::battery_mv_to_percent(mv);
                        let _ = tx.send(AwearEvent::Battery(pct)).await;
                    }
                }
                continue;
            }

            if text.starts_with("RSSI DBm:") {
                if let Some(rssi_str) = text.strip_prefix("RSSI DBm:") {
                    if let Ok(rssi) = rssi_str.trim().parse::<i8>() {
                        let _ = tx.send(AwearEvent::Signal(rssi)).await;
                    }
                }
                continue;
            }

            if !text.is_empty() {
                debug!("RX text: {}", text);
                let _ = tx.send(AwearEvent::Misc(text)).await;
            }
            continue;
        }

        // ── LUCA header (36 bytes, starts with "LUCA") ──
        if data.len() >= 4 && &data[..4] == LUCA_MAGIC {
            if !device_ready {
                continue;
            }

            if let Some((data_type, seq, payload_hint)) = parse::parse_luca_header(data) {
                if data_type == 1 || data_type == 7 {
                    // Finalize any pending EEG block
                    if luca_expecting_data && !luca_eeg_buffer.is_empty() {
                        if let Some(reading) =
                            parse::parse_luca_eeg_block(&luca_eeg_buffer, luca_block_counter)
                        {
                            let _ = tx.send(AwearEvent::Eeg(reading)).await;
                        }
                    }
                    luca_expecting_data = true;
                    luca_eeg_buffer.clear();
                    luca_last_chunk_size = payload_hint;
                    luca_block_counter = seq;
                    debug!("LUCA EEG header: type={} seq={} last_chunk={}", data_type, seq, payload_hint);
                } else {
                    if luca_expecting_data && !luca_eeg_buffer.is_empty() {
                        if let Some(reading) =
                            parse::parse_luca_eeg_block(&luca_eeg_buffer, luca_block_counter)
                        {
                            let _ = tx.send(AwearEvent::Eeg(reading)).await;
                        }
                    }
                    luca_expecting_data = false;
                    debug!("LUCA info header: type={} seq={}", data_type, seq);
                }
            }
            continue;
        }

        // ── Raw EEG data (LUCA continuation) ──
        if luca_expecting_data && device_ready {
            luca_eeg_buffer.extend_from_slice(data);

            // Finalize on final chunk
            if luca_last_chunk_size > 0 && data.len() == luca_last_chunk_size as usize {
                luca_expecting_data = false;
                if let Some(reading) =
                    parse::parse_luca_eeg_block(&luca_eeg_buffer, luca_block_counter)
                {
                    let _ = tx.send(AwearEvent::Eeg(reading)).await;
                }
                luca_eeg_buffer.clear();
            }
            continue;
        }

        // ── Legacy single-byte type routing ──
        if let Some(ptype) = DataPacketType::from_byte(data[0]) {
            match ptype {
                DataPacketType::Battery => {
                    if let Some(level) = parse::parse_battery(data) {
                        let _ = tx.send(AwearEvent::Battery(level)).await;
                    }
                }
                DataPacketType::Signal => {
                    if let Some(rssi) = parse::parse_signal(data) {
                        let _ = tx.send(AwearEvent::Signal(rssi)).await;
                    }
                }
                DataPacketType::Eeg => {
                    if let Some((counter, channels)) = parse::parse_legacy_eeg(data) {
                        let samples: Vec<i16> = channels
                            .iter()
                            .map(|&v| v.clamp(i16::MIN as i32, i16::MAX as i32) as i16)
                            .collect();
                        let _ = tx
                            .send(AwearEvent::Eeg(EegReading {
                                sequence: counter as u32,
                                samples,
                            }))
                            .await;
                    }
                }
                DataPacketType::Misc => {
                    if let Ok(text) = std::str::from_utf8(&data[1..]) {
                        let text = text.trim().to_string();
                        if !text.is_empty() {
                            let _ = tx.send(AwearEvent::Misc(text)).await;
                        }
                    }
                }
            }
        }
    }

    info!("Device {} disconnected", device_name);
    let _ = tx.send(AwearEvent::Disconnected).await;
    Ok(())
}