brainvision 0.0.1

Rust library and TUI for Brain Products BrainVision RDA EEG streams over TCP/IP
Documentation
use std::io::Write;
use std::net::TcpListener;
use std::thread;
use std::time::Duration;

use brainvision::prelude::*;

fn start_payload_2ch() -> Vec<u8> {
    let mut p = Vec::new();
    p.extend_from_slice(&2u32.to_le_bytes());
    p.extend_from_slice(&2000.0f64.to_le_bytes()); // 500 Hz
    p.extend_from_slice(&0.1f64.to_le_bytes());
    p.extend_from_slice(&0.1f64.to_le_bytes());
    p.extend_from_slice(b"Cz\0Fz\0\0");
    p
}

fn marker_bytes(position: u32, points: u32, channel: i32, kind: &str, desc: &str) -> Vec<u8> {
    let mut m = Vec::new();
    let size = 16 + kind.len() + 1 + desc.len() + 1;
    m.extend_from_slice(&(size as u32).to_le_bytes());
    m.extend_from_slice(&position.to_le_bytes());
    m.extend_from_slice(&points.to_le_bytes());
    m.extend_from_slice(&channel.to_le_bytes());
    m.extend_from_slice(kind.as_bytes());
    m.push(0);
    m.extend_from_slice(desc.as_bytes());
    m.push(0);
    m
}

fn data16_payload(block: u32, vals: &[i16], markers: &[Vec<u8>]) -> Vec<u8> {
    let mut p = Vec::new();
    p.extend_from_slice(&block.to_le_bytes());
    p.extend_from_slice(&((vals.len() / 2) as u32).to_le_bytes()); // points for 2ch
    p.extend_from_slice(&(markers.len() as u32).to_le_bytes());
    for v in vals {
        p.extend_from_slice(&v.to_le_bytes());
    }
    for m in markers {
        p.extend_from_slice(m);
    }
    p
}

#[test]
fn test_mock_server_capture_and_reconnect() {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let server = thread::spawn(move || {
        // Connection 1
        {
            let (mut s, _) = listener.accept().unwrap();
            let start = make_frame(GUID_START, &start_payload_2ch());
            s.write_all(&start).unwrap();
            let d1 = make_frame(
                GUID_DATA16,
                &data16_payload(1, &[100, -100, 200, -200], &[]),
            );
            s.write_all(&d1).unwrap();
            // drop connection to force reconnect
        }
        // Connection 2
        {
            let (mut s, _) = listener.accept().unwrap();
            let start = make_frame(GUID_START, &start_payload_2ch());
            s.write_all(&start).unwrap();
            let d2 = make_frame(
                GUID_DATA16,
                &data16_payload(2, &[300, -300, 400, -400], &[]),
            );
            s.write_all(&d2).unwrap();
        }
    });

    let mut dev = BrainVisionDevice::connect(&addr.ip().to_string(), addr.port()).unwrap();
    let h = dev.wait_for_start().unwrap();
    assert_eq!(h.channel_count, 2);

    // First block
    let b1 = dev.next_block().unwrap().unwrap();
    assert_eq!(b1.block, 1);

    // Force reconnect on next read
    let b2 = dev
        .next_block_resilient(3, Duration::from_millis(20))
        .unwrap()
        .unwrap();
    assert_eq!(b2.block, 2);

    server.join().unwrap();
}

#[test]
fn test_backpressure_and_marker_callback() {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    let server = thread::spawn(move || {
        let (mut s, _) = listener.accept().unwrap();
        s.write_all(&make_frame(GUID_START, &start_payload_2ch()))
            .unwrap();

        let vals: Vec<i16> = (0..40).map(|i| i as i16).collect(); // 20 points, 2 channels
        let mk = marker_bytes(5, 1, -1, "Stimulus", "S 7");
        s.write_all(&make_frame(GUID_DATA16, &data16_payload(1, &vals, &[mk])))
            .unwrap();
    });

    let cfg = DeviceConfig {
        max_scan_buffer: 4,
        backpressure_policy: BackpressurePolicy::DropNewest,
        ..DeviceConfig::default()
    };
    let mut dev =
        BrainVisionDevice::connect_with_config(&addr.ip().to_string(), addr.port(), cfg).unwrap();
    dev.wait_for_start().unwrap();

    let marker_hits = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
    let marker_hits_cb = marker_hits.clone();
    dev.set_marker_callback(move |_| {
        marker_hits_cb.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    });

    // trigger ingest
    let _ = dev.next_scan().unwrap();

    assert!(dev.stats().dropped_by_backpressure > 0);
    assert_eq!(marker_hits.load(std::sync::atomic::Ordering::SeqCst), 1);

    server.join().unwrap();
}

#[test]
fn test_export_helpers() {
    let scans = vec![
        Scan {
            data: vec![1.0, 2.0],
        },
        Scan {
            data: vec![3.0, 4.0],
        },
    ];
    let markers = vec![Marker {
        position: 1,
        points: 1,
        channel: -1,
        kind: "Stimulus".into(),
        description: "S1".into(),
    }];
    let header = HeaderInfo {
        channel_count: 2,
        sampling_interval_us: 2000.0,
        resolutions_uv: vec![0.1, 0.1],
        channel_names: vec!["Cz".into(), "Fz".into()],
    };

    let dir = std::env::temp_dir();
    let prefix = dir.join("brainvision_test_export");
    write_brainvision_triplet(&prefix, &header, &scans, &markers).unwrap();
}