use std::io::{Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use super::{Board, ConfigurableBoard};
use crate::channel_config::{ChannelConfig, GainTracker, CHANNEL_LETTERS};
use crate::electrode::ElectrodeLayout;
use crate::error::{OpenBciError, Result};
use crate::packet::{decode_cyton, START_BYTE};
use crate::sample::{Sample, StreamHandle};
pub struct CytonBoard {
port_name: String,
electrode_layout: ElectrodeLayout,
gains: GainTracker,
port: Option<Box<dyn serialport::SerialPort>>,
streaming: bool,
keep_alive: Arc<AtomicBool>,
}
impl CytonBoard {
pub fn new(port: impl Into<String>) -> Self {
Self {
port_name: port.into(),
electrode_layout: ElectrodeLayout::new(8),
gains: GainTracker::new(vec![24.0; 8]),
port: None,
streaming: false,
keep_alive: Arc::new(AtomicBool::new(false)),
}
}
pub fn with_electrode_layout(mut self, layout: ElectrodeLayout) -> Self {
self.electrode_layout = layout;
self
}
fn port_mut(&mut self) -> Result<&mut Box<dyn serialport::SerialPort>> {
self.port.as_mut().ok_or(OpenBciError::BoardNotPrepared)
}
fn wait_for_ready(port: &mut Box<dyn serialport::SerialPort>) -> Result<()> {
let mut buf = [0u8; 1];
let mut dollar_cnt = 0usize;
let mut empty_seq = 0usize;
for _ in 0..500 {
match port.read(&mut buf) {
Ok(1) => {
empty_seq = 0;
if buf[0] == b'$' {
dollar_cnt += 1;
if dollar_cnt == 3 { return Ok(()); }
} else {
dollar_cnt = 0;
}
}
_ => {
empty_seq += 1;
if empty_seq >= 5 {
return Err(OpenBciError::BoardNotReady(
"Board did not send '$$$' ready marker".into(),
));
}
}
}
}
Err(OpenBciError::Timeout)
}
fn drain_response(port: &mut Box<dyn serialport::SerialPort>) -> String {
let mut resp = Vec::new();
let mut b = [0u8; 1];
loop {
match port.read(&mut b) {
Ok(1) => resp.push(b[0]),
_ => break,
}
}
String::from_utf8_lossy(&resp).into_owned()
}
}
impl Board for CytonBoard {
fn prepare(&mut self) -> Result<()> {
if self.port.is_some() { return Ok(()); }
let mut port = serialport::new(&self.port_name, 115_200)
.timeout(Duration::from_millis(1000))
.open()?;
port.write_all(b"v")?;
Self::wait_for_ready(&mut port)?;
port.write_all(b"d")?;
let resp = Self::drain_response(&mut port);
if resp.starts_with("Failure") {
return Err(OpenBciError::BoardNotReady(
"Dongle connected but Cyton board is off or not responding".into(),
));
}
self.port = Some(port);
Ok(())
}
fn start_stream(&mut self) -> Result<StreamHandle> {
if self.streaming { return Err(OpenBciError::AlreadyStreaming); }
let port = self.port.as_mut().ok_or(OpenBciError::BoardNotPrepared)?;
port.write_all(b"b")?;
let mut reader = port.try_clone().map_err(|e| {
OpenBciError::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
})?;
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Sample>(512);
let (stop_tx, stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
let keep_alive = self.keep_alive.clone();
let gains = self.gains.clone();
keep_alive.store(true, Ordering::Release);
std::thread::spawn(move || {
let mut buf1 = [0u8; 1];
let mut buf32 = [0u8; 32];
'outer: loop {
if stop_rx.try_recv().is_ok() || !keep_alive.load(Ordering::Acquire) {
break;
}
match reader.read(&mut buf1) {
Ok(1) if buf1[0] == START_BYTE => {}
_ => continue,
}
let mut remaining = 32usize;
let mut pos = 0usize;
while remaining > 0 {
if stop_rx.try_recv().is_ok() { break 'outer; }
match reader.read(&mut buf32[pos..pos + remaining]) {
Ok(n) if n > 0 => { pos += n; remaining -= n; }
_ => continue 'outer,
}
}
if let Some(sample) = decode_cyton(&buf32, &gains, 8) {
if sample_tx.send(sample).is_err() { break; }
}
}
});
self.streaming = true;
Ok(StreamHandle { receiver: sample_rx, stop_tx: Some(stop_tx) })
}
fn stop_stream(&mut self) -> Result<()> {
if !self.streaming { return Err(OpenBciError::NotStreaming); }
self.keep_alive.store(false, Ordering::Release);
if let Some(ref mut p) = self.port {
let _ = p.write_all(b"s");
}
self.streaming = false;
Ok(())
}
fn release(&mut self) -> Result<()> {
if self.streaming { let _ = self.stop_stream(); }
self.port = None;
Ok(())
}
fn send_command(&mut self, cmd: &str) -> Result<String> {
{
let port = self.port_mut()?;
port.write_all(cmd.as_bytes())?;
}
if self.streaming {
return Ok(String::new());
}
self.gains.apply_command(cmd);
let port = self.port_mut()?;
Ok(Self::drain_response(port))
}
fn electrode_layout(&self) -> &ElectrodeLayout { &self.electrode_layout }
fn set_electrode_layout(&mut self, l: ElectrodeLayout) { self.electrode_layout = l; }
fn channel_count(&self) -> usize { 8 }
fn sampling_rate(&self) -> u32 { 250 }
}
impl ConfigurableBoard for CytonBoard {
fn apply_channel_config(&mut self, channel: usize, config: &ChannelConfig) -> Result<()> {
if channel >= 8 { return Err(OpenBciError::ChannelOutOfRange(channel, 8)); }
let cmd = config.to_command(CHANNEL_LETTERS[channel]);
self.gains.apply_command(&cmd);
let streaming = self.streaming;
{
let port = self.port_mut()?;
port.write_all(cmd.as_bytes())?;
}
if !streaming {
let port = self.port_mut()?;
Self::drain_response(port);
}
Ok(())
}
}