use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use super::{Board, ConfigurableBoard};
use crate::channel_config::{ChannelConfig, GainTracker, CHANNEL_LETTERS};
use crate::electrode::ElectrodeLayout;
use crate::error::{OpenBciError, Result};
use crate::packet::{cast_24bit_to_i32, cast_16bit_to_i32};
use crate::sample::{Sample, StreamHandle, now};
/// UDP port on the board that `GaleaBoard::prepare` connects the socket to.
const GALEA_PORT: u16 = 2390;
/// Size in bytes of one EXG packet; received datagrams are split into packets of this length.
const EXG_PACKET_SIZE: usize = 96;
/// Size in bytes of the receive buffer used by the streaming reader thread.
const MAX_UDP_RECV: usize = 4096;
/// Builds the default per-channel gain table for the 24 Galea EXG channels.
///
/// Channels 8..=17 and 22..=23 default to a gain of 12.0; every other
/// channel defaults to 4.0.
fn default_galea_gains() -> Vec<f64> {
    (0..24usize)
        .map(|channel| match channel {
            8..=17 | 22..=23 => 12.0,
            _ => 4.0,
        })
        .collect()
}
/// One decoded Galea packet: the generic [`Sample`] plus the Galea-specific
/// auxiliary readings carried in the same packet.
#[derive(Debug, Clone)]
pub struct GaleaSample {
/// Generic sample holding the packet number, the 24 scaled EXG channel
/// values and the corrected timestamp.
pub base: Sample,
/// EDA reading, decoded from the packet as a little-endian `f32`
/// (volts, going by the field name — confirm against the firmware).
pub eda_volts: f32,
/// Raw battery byte from the packet (units not established here — TODO confirm).
pub battery: u8,
/// Temperature: the packet's little-endian `u16` divided by 100.
pub temperature_c: f32,
/// Red PPG reading, decoded as a little-endian `i32`.
pub ppg_red: i32,
/// Infrared PPG reading, decoded as a little-endian `i32`.
pub ppg_ir: i32,
/// Device-side timestamp: the packet's little-endian `u64` divided by 1_000_000.
pub device_timestamp: f64,
/// Scaled accelerometer axes; `None` when no IMU payload was decoded for this packet.
pub accel: Option<[f64; 3]>,
/// Scaled gyroscope axes; `None` when no IMU payload was decoded for this packet.
pub gyro: Option<[f64; 3]>,
/// Scaled magnetometer axes; `None` when no IMU payload was decoded for this packet.
pub magnetometer: Option<[f64; 3]>,
}
/// Driver state for a Galea board reached over UDP.
pub struct GaleaBoard {
/// Address of the board; when empty, `prepare` fills it in via Wi-Fi shield discovery.
ip_address: String,
/// Electrode labelling for the 24 channels.
electrode_layout: ElectrodeLayout,
/// Per-channel gain tracker used to scale raw channel counts; updated as commands are applied.
gains: GainTracker,
/// Connected UDP socket; `None` until `prepare` succeeds and again after `release`.
socket: Option<UdpSocket>,
/// `true` between a successful `start_stream` and the matching `stop_stream`.
streaming: bool,
/// Flag polled by the reader thread; `stop_stream` clears it to end the thread's loop.
keep_alive: Arc<AtomicBool>,
/// Half of the mean round-trip time measured by `calibrate_time`, in the units
/// returned by `now()` (presumably seconds, since it is logged multiplied by 1000 as ms).
half_rtt: f64,
}
impl GaleaBoard {
/// Creates an unprepared board handle for the device at `ip_address`.
///
/// No socket is opened here; the board starts with the default electrode
/// layout, the default gain table, streaming disabled and a zero half-RTT.
pub fn new(ip_address: impl Into<String>) -> Self {
    let ip_address = ip_address.into();
    let electrode_layout = Self::default_layout();
    let gains = GainTracker::new(default_galea_gains());
    let keep_alive = Arc::new(AtomicBool::new(false));
    Self {
        ip_address,
        electrode_layout,
        gains,
        socket: None,
        streaming: false,
        keep_alive,
        half_rtt: 0.0,
    }
}
/// Builder-style setter: consumes the board and returns it with
/// `layout` installed as its electrode layout.
pub fn with_electrode_layout(self, layout: ElectrodeLayout) -> Self {
    let mut board = self;
    board.electrode_layout = layout;
    board
}
/// Builds the default 24-slot electrode layout.
///
/// * channels 0..=7   -> `EMG1`..`EMG8`  (EMG)
/// * channels 8..=17  -> `EEG1`..`EEG10` (EEG)
/// * channels 18..=21 -> `AUX1`..`AUX4`  (EMG)
///
/// NOTE(review): channels 22 and 23 are not assigned here and keep whatever
/// `ElectrodeLayout::new` provides — confirm that this is intentional.
fn default_layout() -> ElectrodeLayout {
    use crate::electrode::{Electrode, SignalType};
    let mut layout = ElectrodeLayout::new(24);
    for idx in 0..22 {
        let (label, signal_type) = match idx {
            0..=7 => (format!("EMG{}", idx + 1), SignalType::Emg),
            8..=17 => (format!("EEG{}", idx - 7), SignalType::Eeg),
            _ => (format!("AUX{}", idx - 17), SignalType::Emg),
        };
        layout.set_electrode(
            idx,
            Electrode {
                label,
                signal_type,
                note: None,
            },
        );
    }
    layout
}
/// Estimates the one-way network latency to the board.
///
/// Sends the time command three times, waits for each 8-byte reply, and
/// stores half of the mean round-trip time in `self.half_rtt`.
///
/// # Errors
/// Returns `OpenBciError::BoardNotPrepared` when no socket is open, or the
/// I/O error from any failed send/receive (including a read timeout).
fn calibrate_time(&mut self) -> Result<()> {
    let socket = self.socket.as_ref().ok_or(OpenBciError::BoardNotPrepared)?;
    let mut total_rtt = 0.0f64;
    for _round in 0..3 {
        let sent_at = now();
        socket.send(b"F4444444")?;
        let mut reply = [0u8; 8];
        socket.recv(&mut reply)?;
        total_rtt += now() - sent_at;
    }
    // Three round trips, halved: divide the sum by 3 * 2.
    self.half_rtt = total_rtt / 6.0;
    log::info!("Galea estimated half-RTT: {:.3} ms", self.half_rtt * 1000.0);
    Ok(())
}
/// Decodes the 96-byte EXG packet that starts at `offset` inside the datagram `buf`.
///
/// Packet layout (offsets relative to `offset`; multi-byte fields are
/// little-endian, the 24-bit channel values are handed to `cast_24bit_to_i32`):
///
/// | bytes   | field                                             |
/// |---------|---------------------------------------------------|
/// | 0       | packet number                                     |
/// | 1..5    | EDA reading (`f32`)                               |
/// | 5..77   | 24 channels x 3 bytes                             |
/// | 77      | battery                                           |
/// | 78..80  | temperature (`u16`, divided by 100)               |
/// | 80..84  | PPG red (`i32`)                                   |
/// | 84..88  | PPG infrared (`i32`)                              |
/// | 88..96  | device timestamp (`u64`, divided by 1_000_000)    |
///
/// # Parameters
/// * `buf` – the whole received datagram (not just this packet).
/// * `offset` – start of the packet inside `buf`.
/// * `gains` – per-channel gains used to scale the raw channel counts.
/// * `half_rtt` – latency estimate subtracted from the sample timestamp.
/// * `pc_timestamp` – host time at which the datagram was received.
///
/// # IMU payload
/// For packets whose number is a multiple of 5, an 18-byte IMU payload
/// (accelerometer, gyroscope, magnetometer; three 16-bit axes each) is decoded
/// when at least 18 bytes follow the last whole packet of the datagram.
/// Previously the payload was always read from absolute offset 96, which in a
/// multi-packet datagram is the next packet's EXG data, and a datagram of
/// exactly 96 + 18 bytes was rejected by an off-by-one length check.
/// NOTE(review): assumes the IMU bytes trail the EXG packets — confirm against
/// the firmware's datagram format.
///
/// # Panics
/// Panics if `buf` is shorter than `offset + 96`.
fn decode_packet(
    buf: &[u8],
    offset: usize,
    gains: &GainTracker,
    half_rtt: f64,
    pc_timestamp: f64,
) -> GaleaSample {
    // Same value as the file-level EXG packet size; kept local so the byte
    // offsets below read against a single number.
    const PACKET_LEN: usize = 96;
    const IMU_LEN: usize = 18;
    const EEG_SCALE: f64 = 4.5 / 8_388_607.0 * 1_000_000.0;
    const ACCEL_SCALE: f64 = 8.0 / 65535.0;
    const GYRO_SCALE: f64 = 1000.0 / 65535.0;
    const MAG_SCALE_XY: f64 = 2.6 / 8191.0;
    const MAG_SCALE_Z: f64 = 5.0 / 32767.0;

    let pkt = &buf[offset..offset + PACKET_LEN];
    let packet_num = pkt[0];
    let eda_volts = f32::from_le_bytes([pkt[1], pkt[2], pkt[3], pkt[4]]);

    // 24 channels, 3 bytes each, scaled by the per-channel gain.
    let eeg: Vec<f64> = pkt[5..77]
        .chunks_exact(3)
        .enumerate()
        .map(|(ch, raw)| EEG_SCALE / gains.gain_for(ch) * cast_24bit_to_i32(raw) as f64)
        .collect();

    let battery = pkt[77];
    let temperature_c = u16::from_le_bytes([pkt[78], pkt[79]]) as f32 / 100.0;
    let ppg_red = i32::from_le_bytes([pkt[80], pkt[81], pkt[82], pkt[83]]);
    let ppg_ir = i32::from_le_bytes([pkt[84], pkt[85], pkt[86], pkt[87]]);

    let mut ts_bytes = [0u8; 8];
    ts_bytes.copy_from_slice(&pkt[88..96]);
    let device_timestamp = u64::from_le_bytes(ts_bytes) as f64 / 1_000_000.0;
    // Device time plus the host/device offset for this datagram, minus the
    // one-way latency (algebraically `pc_timestamp - half_rtt`).
    let corrected_ts = device_timestamp + (pc_timestamp - device_timestamp) - half_rtt;

    let mut base = Sample::zeroed(24);
    base.sample_num = packet_num;
    base.eeg = eeg;
    base.timestamp = corrected_ts;

    // The IMU payload, if any, sits after the last whole EXG packet.
    let imu_start = buf.len() / PACKET_LEN * PACKET_LEN;
    let imu = if packet_num % 5 == 0 {
        buf.get(imu_start..imu_start + IMU_LEN)
    } else {
        None
    };
    let (accel, gyro, magnetometer) = match imu {
        Some(imu) => {
            let axis = |at: usize, scale: f64| cast_16bit_to_i32(&imu[at..]) as f64 * scale;
            (
                Some([axis(0, ACCEL_SCALE), axis(2, ACCEL_SCALE), axis(4, ACCEL_SCALE)]),
                Some([axis(6, GYRO_SCALE), axis(8, GYRO_SCALE), axis(10, GYRO_SCALE)]),
                Some([axis(12, MAG_SCALE_XY), axis(14, MAG_SCALE_XY), axis(16, MAG_SCALE_Z)]),
            )
        }
        None => (None, None, None),
    };

    GaleaSample {
        base,
        eda_volts,
        battery,
        temperature_c,
        ppg_red,
        ppg_ir,
        device_timestamp,
        accel,
        gyro,
        magnetometer,
    }
}
}
impl Board for GaleaBoard {
fn prepare(&mut self) -> Result<()> {
if self.socket.is_some() { return Ok(()); }
if self.ip_address.is_empty() {
use super::wifi_shield::discover_wifi_shield;
self.ip_address = discover_wifi_shield(Duration::from_secs(10));
log::info!("Galea discovered at {}", self.ip_address);
}
let sock = UdpSocket::bind("0.0.0.0:0")?;
sock.connect(format!("{}:{}", self.ip_address, GALEA_PORT))?;
sock.set_read_timeout(Some(Duration::from_secs(5)))?;
self.socket = Some(sock);
self.send_command("d")?;
self.send_command("~6")?;
self.calibrate_time()?;
Ok(())
}
fn start_stream(&mut self) -> Result<StreamHandle> {
if self.streaming { return Err(OpenBciError::AlreadyStreaming); }
let sock = self.socket.as_ref().ok_or(OpenBciError::BoardNotPrepared)?;
sock.send(b"b")?;
let reader = sock.try_clone()?;
let (sample_tx, sample_rx) = std::sync::mpsc::sync_channel::<Sample>(512);
let (stop_tx, stop_rx) = std::sync::mpsc::sync_channel::<()>(1);
let keep_alive = self.keep_alive.clone();
let gains = self.gains.clone();
let half_rtt = self.half_rtt;
keep_alive.store(true, Ordering::Release);
std::thread::spawn(move || {
let mut buf = vec![0u8; MAX_UDP_RECV];
loop {
if stop_rx.try_recv().is_ok() || !keep_alive.load(Ordering::Acquire) { break; }
let n = match reader.recv(&mut buf) {
Ok(n) => n,
Err(_) => continue,
};
if n < EXG_PACKET_SIZE || (n % EXG_PACKET_SIZE != 0 && n < EXG_PACKET_SIZE + 18) {
let preview = &buf[..n.min(64)];
log::debug!("Non-packet data received: {:?}", preview);
continue;
}
let pc_ts = now();
let num_pkts = n / EXG_PACKET_SIZE;
for pkt_i in 0..num_pkts {
let offset = pkt_i * EXG_PACKET_SIZE;
let galea_sample = GaleaBoard::decode_packet(
&buf[..n], offset, &gains, half_rtt, pc_ts
);
if sample_tx.send(galea_sample.base).is_err() { return; }
}
}
});
self.streaming = true;
Ok(StreamHandle { receiver: sample_rx, stop_tx: Some(stop_tx) })
}
/// Stops the active stream.
///
/// Clears the keep-alive flag polled by the reader thread, sends the `s`
/// command on a best-effort basis (a send failure is ignored) and marks the
/// board as not streaming.
///
/// # Errors
/// `OpenBciError::NotStreaming` when no stream is active.
fn stop_stream(&mut self) -> Result<()> {
    if self.streaming {
        self.keep_alive.store(false, Ordering::Release);
        if let Some(sock) = self.socket.as_ref() {
            // Best effort: the stream is considered stopped either way.
            let _ = sock.send(b"s");
        }
        self.streaming = false;
        Ok(())
    } else {
        Err(OpenBciError::NotStreaming)
    }
}
/// Releases the board: stops an active stream (ignoring any error from
/// doing so) and drops the socket. Always returns `Ok(())`.
fn release(&mut self) -> Result<()> {
    if self.streaming {
        // Best effort; the socket is dropped regardless of the outcome.
        let _ = self.stop_stream();
    }
    drop(self.socket.take());
    Ok(())
}
fn send_command(&mut self, cmd: &str) -> Result<String> {
let sock = self.socket.as_ref().ok_or(OpenBciError::BoardNotPrepared)?;
sock.send(cmd.as_bytes())?;
if self.streaming {
return Ok(String::new());
}
self.gains.apply_command(cmd);
let mut resp_bytes = Vec::new();
let mut buf = [0u8; 1024];
loop {
match sock.recv(&mut buf) {
Ok(n) if n > 0 && n % EXG_PACKET_SIZE != 0 => {
resp_bytes.extend_from_slice(&buf[..n]);
break;
}
_ => break,
}
}
Ok(String::from_utf8_lossy(&resp_bytes).into_owned())
}
/// Returns the electrode layout currently associated with the board.
fn electrode_layout(&self) -> &ElectrodeLayout { &self.electrode_layout }
/// Replaces the electrode layout with `l`.
fn set_electrode_layout(&mut self, l: ElectrodeLayout) { self.electrode_layout = l; }
/// Number of EXG channels reported by this board (always 24).
fn channel_count(&self) -> usize { 24 }
/// Sampling rate reported by this board (always 250).
fn sampling_rate(&self) -> u32 { 250 }
}
impl ConfigurableBoard for GaleaBoard {
/// Applies `config` to the zero-based `channel` by sending the matching
/// channel-settings command to the board.
///
/// Channels 0..16 use the shared `CHANNEL_LETTERS` table; channels 16..24
/// use the Galea-specific letters `A S D G H J K L`.
///
/// The gain tracker is updated only after the command has been sent
/// successfully, so a failed send no longer leaves the tracker describing
/// settings the board never received. The explicit update is kept because
/// `send_command` may not track gains itself in every state.
///
/// # Errors
/// `OpenBciError::ChannelOutOfRange` when `channel >= 24`, or any error
/// returned by `send_command`.
fn apply_channel_config(&mut self, channel: usize, config: &ChannelConfig) -> Result<()> {
    const GALEA_EXTRA: [char; 8] = ['A', 'S', 'D', 'G', 'H', 'J', 'K', 'L'];

    if channel >= 24 {
        return Err(OpenBciError::ChannelOutOfRange(channel, 24));
    }
    let letter = match channel.checked_sub(16) {
        Some(extra) => GALEA_EXTRA[extra],
        None => CHANNEL_LETTERS[channel],
    };
    let cmd = config.to_command(letter);
    self.send_command(&cmd)?;
    self.gains.apply_command(&cmd);
    Ok(())
}
}