use std::io::Read;
use std::net::TcpStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use super::Board;
use super::wifi_shield::{
connect_wifi_shield, send_wifi_command, wifi_start_stream, wifi_stop_stream, WifiShieldConfig,
};
use crate::electrode::ElectrodeLayout;
use crate::error::{OpenBciError, Result};
use crate::packet::{decode_cyton, START_BYTE};
use crate::channel_config::GainTracker;
use crate::sample::{Sample, StreamHandle};
pub struct GanglionWifiBoard {
wifi_cfg: GanglionWifiConfig,
electrode_layout: ElectrodeLayout,
gains: GainTracker,
stream: Option<TcpStream>,
streaming: bool,
keep_alive: Arc<AtomicBool>,
}
#[derive(Debug, Clone)]
pub struct GanglionWifiConfig {
pub shield_ip: String,
pub local_port: u16,
pub http_timeout: u64,
}
impl Default for GanglionWifiConfig {
fn default() -> Self {
Self { shield_ip: String::new(), local_port: 3001, http_timeout: 10 }
}
}
impl GanglionWifiBoard {
pub fn new(cfg: GanglionWifiConfig) -> Self {
Self {
wifi_cfg: cfg,
electrode_layout: ElectrodeLayout::new(4),
gains: GainTracker::new(vec![1.0; 4]),
stream: None,
streaming: false,
keep_alive: Arc::new(AtomicBool::new(false)),
}
}
pub fn with_electrode_layout(mut self, layout: ElectrodeLayout) -> Self {
self.electrode_layout = layout;
self
}
fn shield_ip(&self) -> &str { &self.wifi_cfg.shield_ip }
fn http_timeout(&self) -> u64 { self.wifi_cfg.http_timeout }
}
impl Board for GanglionWifiBoard {
fn prepare(&mut self) -> Result<()> {
if self.stream.is_some() { return Ok(()); }
let mut shield_cfg = WifiShieldConfig {
shield_ip: self.wifi_cfg.shield_ip.clone(),
local_port: self.wifi_cfg.local_port,
http_timeout: self.wifi_cfg.http_timeout,
};
let tcp_stream = connect_wifi_shield(&mut shield_cfg)?;
self.wifi_cfg.shield_ip = shield_cfg.shield_ip;
send_wifi_command(self.shield_ip(), "~4", self.http_timeout())?;
self.stream = Some(tcp_stream);
Ok(())
}
fn start_stream(&mut self) -> Result<StreamHandle> {
if self.streaming { return Err(OpenBciError::AlreadyStreaming); }
let tcp = self.stream.as_ref().ok_or(OpenBciError::BoardNotPrepared)?;
wifi_start_stream(self.shield_ip(), self.http_timeout())?;
let mut reader = tcp.try_clone()?;
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Sample>(512);
let (stop_tx, stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
let keep_alive = self.keep_alive.clone();
let gains = self.gains.clone();
keep_alive.store(true, Ordering::Release);
std::thread::spawn(move || {
let mut buf = [0u8; 33];
'outer: loop {
if stop_rx.try_recv().is_ok() || !keep_alive.load(Ordering::Acquire) { break; }
let mut remaining = 33usize;
let mut pos = 0usize;
while remaining > 0 {
if stop_rx.try_recv().is_ok() { break 'outer; }
match reader.read(&mut buf[pos..pos + remaining]) {
Ok(n) if n > 0 => { pos += n; remaining -= n; }
_ => continue 'outer,
}
}
if buf[0] != START_BYTE { continue; }
let body: [u8; 32] = buf[1..33].try_into().unwrap();
if let Some(mut sample) = decode_cyton(&body, &gains, 4) {
sample.eeg.truncate(4);
if sample_tx.send(sample).is_err() { break; }
}
}
});
self.streaming = true;
Ok(StreamHandle { receiver: sample_rx, stop_tx: Some(stop_tx) })
}
fn stop_stream(&mut self) -> Result<()> {
if !self.streaming { return Err(OpenBciError::NotStreaming); }
self.keep_alive.store(false, Ordering::Release);
wifi_stop_stream(self.shield_ip(), self.http_timeout())?;
self.streaming = false;
Ok(())
}
fn release(&mut self) -> Result<()> {
if self.streaming { let _ = self.stop_stream(); }
self.stream = None;
Ok(())
}
fn send_command(&mut self, cmd: &str) -> Result<String> {
send_wifi_command(self.shield_ip(), cmd, self.http_timeout())?;
Ok(String::new())
}
fn electrode_layout(&self) -> &ElectrodeLayout { &self.electrode_layout }
fn set_electrode_layout(&mut self, l: ElectrodeLayout) { self.electrode_layout = l; }
fn channel_count(&self) -> usize { 4 }
fn sampling_rate(&self) -> u32 { 200 }
}