brainvision 0.0.1

Rust library and TUI for Brain Products BrainVision RDA EEG streams over TCP/IP
Documentation
//! Async BrainVision API (Tokio).

use std::collections::VecDeque;

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

use crate::device::Scan;
use crate::error::BrainVisionError;
use crate::protocol::decode_frame;
use crate::types::{DataBlock, HeaderInfo, RdaMessage, ENVELOPE_LEN};

/// Async BrainVision device.
pub struct AsyncBrainVisionDevice {
    stream: TcpStream,
    header: Option<HeaderInfo>,
    scan_buf: VecDeque<Scan>,
}

impl AsyncBrainVisionDevice {
    pub async fn connect(host: &str, port: u16) -> Result<Self, BrainVisionError> {
        let addr = format!("{host}:{port}");
        let stream = TcpStream::connect(&addr)
            .await
            .map_err(|e| BrainVisionError::Connection(e.to_string()))?;
        Ok(Self {
            stream,
            header: None,
            scan_buf: VecDeque::new(),
        })
    }

    async fn read_frame(&mut self) -> Result<Vec<u8>, BrainVisionError> {
        let mut env = [0u8; ENVELOPE_LEN];
        self.stream.read_exact(&mut env).await?;
        let size = u32::from_le_bytes(env[16..20].try_into().unwrap()) as usize;
        if size < ENVELOPE_LEN {
            return Err(BrainVisionError::Protocol("message size < envelope".into()));
        }
        let mut frame = Vec::with_capacity(size);
        frame.extend_from_slice(&env);
        if size > ENVELOPE_LEN {
            let mut payload = vec![0u8; size - ENVELOPE_LEN];
            self.stream.read_exact(&mut payload).await?;
            frame.extend_from_slice(&payload);
        }
        Ok(frame)
    }

    pub async fn read_message(&mut self) -> Result<RdaMessage, BrainVisionError> {
        let frame = self.read_frame().await?;
        let msg = decode_frame(&frame, self.header.as_ref())?;
        if let RdaMessage::Start(h) = &msg {
            self.header = Some(h.clone());
        }
        Ok(msg)
    }

    pub async fn wait_for_start(&mut self) -> Result<HeaderInfo, BrainVisionError> {
        loop {
            if let RdaMessage::Start(h) = self.read_message().await? {
                return Ok(h);
            }
        }
    }

    pub async fn next_block(&mut self) -> Result<Option<DataBlock>, BrainVisionError> {
        loop {
            match self.read_message().await? {
                RdaMessage::Data16(b) | RdaMessage::Data32(b) => return Ok(Some(b)),
                RdaMessage::Stop => return Ok(None),
                _ => {}
            }
        }
    }

    pub async fn next_scan(&mut self) -> Result<Option<Scan>, BrainVisionError> {
        if let Some(s) = self.scan_buf.pop_front() {
            return Ok(Some(s));
        }
        let b = match self.next_block().await? {
            Some(b) => b,
            None => return Ok(None),
        };
        let channels = self
            .header
            .as_ref()
            .map(|h| h.channel_count as usize)
            .ok_or_else(|| BrainVisionError::Protocol("no header context".into()))?;
        for chunk in b.samples_uv.chunks(channels) {
            self.scan_buf.push_back(Scan {
                data: chunk.to_vec(),
            });
        }
        Ok(self.scan_buf.pop_front())
    }
}