use std::collections::VecDeque;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use crate::device::Scan;
use crate::error::BrainVisionError;
use crate::protocol::decode_frame;
use crate::types::{DataBlock, HeaderInfo, RdaMessage, ENVELOPE_LEN};
pub struct AsyncBrainVisionDevice {
stream: TcpStream,
header: Option<HeaderInfo>,
scan_buf: VecDeque<Scan>,
}
impl AsyncBrainVisionDevice {
pub async fn connect(host: &str, port: u16) -> Result<Self, BrainVisionError> {
let addr = format!("{host}:{port}");
let stream = TcpStream::connect(&addr)
.await
.map_err(|e| BrainVisionError::Connection(e.to_string()))?;
Ok(Self {
stream,
header: None,
scan_buf: VecDeque::new(),
})
}
async fn read_frame(&mut self) -> Result<Vec<u8>, BrainVisionError> {
let mut env = [0u8; ENVELOPE_LEN];
self.stream.read_exact(&mut env).await?;
let size = u32::from_le_bytes(env[16..20].try_into().unwrap()) as usize;
if size < ENVELOPE_LEN {
return Err(BrainVisionError::Protocol("message size < envelope".into()));
}
let mut frame = Vec::with_capacity(size);
frame.extend_from_slice(&env);
if size > ENVELOPE_LEN {
let mut payload = vec![0u8; size - ENVELOPE_LEN];
self.stream.read_exact(&mut payload).await?;
frame.extend_from_slice(&payload);
}
Ok(frame)
}
pub async fn read_message(&mut self) -> Result<RdaMessage, BrainVisionError> {
let frame = self.read_frame().await?;
let msg = decode_frame(&frame, self.header.as_ref())?;
if let RdaMessage::Start(h) = &msg {
self.header = Some(h.clone());
}
Ok(msg)
}
pub async fn wait_for_start(&mut self) -> Result<HeaderInfo, BrainVisionError> {
loop {
if let RdaMessage::Start(h) = self.read_message().await? {
return Ok(h);
}
}
}
pub async fn next_block(&mut self) -> Result<Option<DataBlock>, BrainVisionError> {
loop {
match self.read_message().await? {
RdaMessage::Data16(b) | RdaMessage::Data32(b) => return Ok(Some(b)),
RdaMessage::Stop => return Ok(None),
_ => {}
}
}
}
pub async fn next_scan(&mut self) -> Result<Option<Scan>, BrainVisionError> {
if let Some(s) = self.scan_buf.pop_front() {
return Ok(Some(s));
}
let b = match self.next_block().await? {
Some(b) => b,
None => return Ok(None),
};
let channels = self
.header
.as_ref()
.map(|h| h.channel_count as usize)
.ok_or_else(|| BrainVisionError::Protocol("no header context".into()))?;
for chunk in b.samples_uv.chunks(channels) {
self.scan_buf.push_back(Scan {
data: chunk.to_vec(),
});
}
Ok(self.scan_buf.pop_front())
}
}