finalspark_rs/lib.rs
1//! A Rust library for recording live data from MEA (Microelectrode Array) devices.
2//!
3//! This crate provides functionality to connect to and record data from MEA devices
4//! through a WebSocket connection. It supports both single and multiple sample recordings.
5//!
6//! # Example
7//!
8//! ```rust
9//! use finalspark_rs::LiveMEA;
10//!
11//! #[tokio::main]
12//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
13//! let mea = LiveMEA::new();
14//!
15//! // Record a single sample from MEA device 1
16//! let sample = mea.record_sample(1).await?;
17//! println!("Recorded {} electrodes", sample.data.len());
18//!
19//! // Record multiple samples
20//! let samples = mea.record_n_samples(1, 3).await?;
21//! println!("Recorded {} samples", samples.len());
22//!
23//! Ok(())
24//! }
25//! ```
26
27use futures_util::{SinkExt, StreamExt};
28use serde::{Deserialize, Serialize};
29use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
30use url::Url;
31
32/// The Socket.IO URL of the MEA server.
33const MEA_SERVER_URL: &str =
34 "wss://livemeaservice2.alpvision.com/socket.io/?EIO=4&transport=websocket";
35
36/// Internal struct for parsing Socket.IO handshake messages.
37/// This is used to validate the initial connection response.
38#[derive(Deserialize)]
39struct SocketIOHandshake {}
40
41/// Represents live data recorded from MEA devices.
42///
43/// Each `LiveData` instance contains:
44/// * A timestamp string in RFC3339 format
45/// * A 2D array of electrode data where:
46/// * The outer vector contains 32 electrodes
47/// * Each inner vector contains 4096 samples per electrode
48///
49/// # Example
50///
51/// ```rust
52/// use finalspark_rs::LiveData;
53///
54/// fn process_data(data: LiveData) {
55/// println!("Timestamp: {}", data.timestamp);
56/// println!("Number of electrodes: {}", data.data.len());
57/// println!("Samples per electrode: {}", data.data[0].len());
58/// }
59/// ```
60#[derive(Serialize, Deserialize, Debug, Clone)]
61pub struct LiveData {
62 /// The timestamp of when the data was recorded (RFC3339 format)
63 pub timestamp: String,
64 /// The electrode data as a 2D array [32][4096]
65 pub data: Vec<Vec<f32>>,
66}
67
68/// Main struct for handling MEA device connections and data recording.
69///
70/// This struct provides methods to:
71/// * Record single samples from MEA devices
72/// * Record multiple samples in sequence
73/// * Validate MEA device IDs
74///
75/// # Examples
76///
77/// ```rust
78/// use finalspark_rs::LiveMEA;
79///
80/// #[tokio::main]
81/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
82/// let mea = LiveMEA::new();
83///
84/// // Record from MEA device 1
85/// let data = mea.record_sample(1).await?;
86///
87/// Ok(())
88/// }
89/// ```
90pub struct LiveMEA {}
91
92impl LiveMEA {
93 /// Creates a new instance of the MEA handler.
94 ///
95 /// # Examples
96 ///
97 /// ```rust
98 /// use finalspark_rs::LiveMEA;
99 ///
100 /// let mea = LiveMEA::new();
101 /// ```
102 pub fn new() -> Self {
103 Self {}
104 }
105
106 /// Validates that the MEA ID is within the acceptable range (1-4).
107 ///
108 /// # Arguments
109 ///
110 /// * `mea_id` - The ID of the MEA device to validate
111 ///
112 /// # Errors
113 ///
114 /// Returns an error if the MEA ID is not between 1 and 4.
115 fn validate_mea_id(mea_id: u32) -> Result<(), Box<dyn std::error::Error>> {
116 if mea_id < 1 || mea_id > 4 {
117 return Err("MEA ID must be an integer in the range 1-4".into());
118 }
119 Ok(())
120 }
121
122 /// Records a single sample of live data from the specified MEA device.
123 ///
124 /// This method:
125 /// 1. Connects to the MEA server
126 /// 2. Performs Socket.IO handshake
127 /// 3. Requests data from the specified device
128 /// 4. Processes and returns the binary data
129 ///
130 /// # Arguments
131 ///
132 /// * `mea_id` - The ID of the MEA device to record from (1-4)
133 ///
134 /// # Returns
135 ///
136 /// Returns a `Result` containing either:
137 /// * `LiveData` with the recorded sample
138 /// * An error if the recording failed
139 ///
140 /// # Examples
141 ///
142 /// ```rust
143 /// use finalspark_rs::LiveMEA;
144 ///
145 /// #[tokio::main]
146 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
147 /// let mea = LiveMEA::new();
148 /// let data = mea.record_sample(1).await?;
149 /// println!("Recorded at: {}", data.timestamp);
150 /// Ok(())
151 /// }
152 /// ```
153 pub async fn record_sample(&self, mea_id: u32) -> Result<LiveData, Box<dyn std::error::Error>> {
154 Self::validate_mea_id(mea_id)?;
155 let mea_index = mea_id - 1;
156
157 let url = Url::parse(MEA_SERVER_URL)?;
158 let (mut ws_stream, _) = connect_async(url).await?;
159
160 // Handle Socket.IO handshake
161 if let Some(msg) = ws_stream.next().await {
162 match msg? {
163 Message::Text(text) => {
164 if text.starts_with("0") {
165 let _handshake: SocketIOHandshake = serde_json::from_str(&text[1..])?;
166 ws_stream.send(Message::Text("40".to_string())).await?;
167 let mea_msg = format!("42[\"meaid\",{}]", mea_index);
168 ws_stream.send(Message::Text(mea_msg)).await?;
169 }
170 }
171 _ => return Err("Invalid handshake response".into()),
172 }
173 }
174
175 while let Some(msg) = ws_stream.next().await {
176 match msg? {
177 Message::Text(text) => {
178 if text.starts_with("2") {
179 ws_stream.send(Message::Text("3".to_string())).await?;
180 }
181 }
182 Message::Binary(buffer) => {
183 let raw = buffer
184 .chunks(4)
185 .map(|chunk| f32::from_ne_bytes(chunk.try_into().unwrap()))
186 .collect::<Vec<f32>>();
187
188 // Check if we have data for one MEA (32 electrodes) or all MEAs (128 electrodes)
189 let start_idx = if raw.len() == 32 * 4096 {
190 0 // Single MEA data
191 } else if raw.len() == 128 * 4096 {
192 (mea_index as usize) * 32 * 4096 // Full dataset
193 } else {
194 return Err(format!(
195 "Unexpected data size: got {} values, expected {} or {}",
196 raw.len(),
197 32 * 4096,
198 128 * 4096
199 )
200 .into());
201 };
202
203 let elec_data: Vec<Vec<f32>> = (0..32)
204 .map(|i| raw[start_idx + i * 4096..start_idx + (i + 1) * 4096].to_vec())
205 .collect();
206
207 let live_data = LiveData {
208 timestamp: chrono::Utc::now().to_rfc3339(),
209 data: elec_data,
210 };
211
212 let _ = ws_stream.close(None).await;
213 return Ok(live_data);
214 }
215 Message::Close(_) => return Err("Server closed connection".into()),
216 _ => continue,
217 }
218 }
219
220 Err("Connection closed without receiving data".into())
221 }
222
223 /// Records multiple samples of live data from the specified MEA device.
224 ///
225 /// # Arguments
226 ///
227 /// * `mea_id` - The ID of the MEA device to record from (1-4)
228 /// * `n` - The number of samples to record
229 ///
230 /// # Returns
231 ///
232 /// Returns a `Result` containing either:
233 /// * A vector of `LiveData` instances
234 /// * An error if the recording failed
235 ///
236 /// # Examples
237 ///
238 /// ```rust
239 /// use finalspark_rs::LiveMEA;
240 ///
241 /// #[tokio::main]
242 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
243 /// let mea = LiveMEA::new();
244 /// let samples = mea.record_n_samples(1, 3).await?;
245 /// println!("Recorded {} samples", samples.len());
246 /// Ok(())
247 /// }
248 /// ```
249 pub async fn record_n_samples(
250 &self,
251 mea_id: u32,
252 n: usize,
253 ) -> Result<Vec<LiveData>, Box<dyn std::error::Error>> {
254 Self::validate_mea_id(mea_id)?;
255 let mut data = Vec::with_capacity(n);
256 for _ in 0..n {
257 let sample = self.record_sample(mea_id).await?;
258 data.push(sample);
259 }
260 Ok(data)
261 }
262}
263
264// Add test module
265#[cfg(test)]
266mod tests {
267 use super::*;
268 use std::time::Instant;
269
270 #[test]
271 fn test_validate_mea_id() {
272 assert!(LiveMEA::validate_mea_id(1).is_ok());
273 assert!(LiveMEA::validate_mea_id(4).is_ok());
274 assert!(LiveMEA::validate_mea_id(0).is_err());
275 assert!(LiveMEA::validate_mea_id(5).is_err());
276 }
277
278 /// Helper function to validate electrode data structure
279 fn validate_electrode_data(data: &LiveData) {
280 assert_eq!(data.data.len(), 32, "Expected 32 electrodes");
281 assert_eq!(
282 data.data[0].len(),
283 4096,
284 "Expected 4096 samples per electrode"
285 );
286
287 // Validate data contains actual values
288 let sum: f32 = data
289 .data
290 .iter()
291 .flat_map(|electrode| electrode.iter())
292 .sum();
293 assert!(sum != 0.0, "Data appears to be all zeros");
294 }
295
296 #[tokio::test]
297 async fn test_single_sample_recording() {
298 let live_mea = LiveMEA::new();
299 let start = Instant::now();
300
301 let result = live_mea.record_sample(1).await;
302 assert!(
303 result.is_ok(),
304 "Failed to record sample: {:?}",
305 result.err()
306 );
307
308 let data = result.unwrap();
309 println!("✓ Recorded sample in {:?}", start.elapsed());
310 assert_eq!(data.data.len(), 32, "Expected 32 electrodes");
311 assert_eq!(
312 data.data[0].len(),
313 4096,
314 "Expected 4096 samples per electrode"
315 );
316 }
317
318 #[tokio::test]
319 async fn test_multiple_sample_recording() {
320 let live_mea = LiveMEA::new();
321 let start = Instant::now();
322
323 let result = live_mea.record_n_samples(1, 3).await;
324 assert!(
325 result.is_ok(),
326 "Failed to record samples: {:?}",
327 result.err()
328 );
329
330 let samples = result.unwrap();
331 println!(
332 "✓ Recorded {} samples in {:?}",
333 samples.len(),
334 start.elapsed()
335 );
336 assert_eq!(samples.len(), 3, "Expected 3 samples");
337
338 for sample in samples.iter() {
339 assert_eq!(sample.data.len(), 32, "Expected 32 electrodes");
340 assert_eq!(
341 sample.data[0].len(),
342 4096,
343 "Expected 4096 samples per electrode"
344 );
345 }
346 }
347
348 #[tokio::test]
349 async fn test_invalid_mea_id() {
350 let live_mea = LiveMEA::new();
351 let result = live_mea.record_sample(5).await;
352 assert!(result.is_err(), "Expected error for invalid MEA ID");
353 }
354
355 #[tokio::test]
356 async fn test_all_meas_single_sample() {
357 let live_mea = LiveMEA::new();
358
359 for mea_id in 1..=4 {
360 let start = Instant::now();
361 println!("\nTesting MEA {}", mea_id);
362
363 match live_mea.record_sample(mea_id).await {
364 Ok(data) => {
365 println!("✓ MEA {} recorded in {:?}", mea_id, start.elapsed());
366 validate_electrode_data(&data);
367 }
368 Err(e) => panic!("Failed to record from MEA {}: {:?}", mea_id, e),
369 }
370 }
371 }
372
373 #[tokio::test]
374 async fn test_all_meas_multiple_samples() {
375 let live_mea = LiveMEA::new();
376 let samples_per_mea = 2;
377
378 for mea_id in 1..=4 {
379 let start = Instant::now();
380 println!("\nTesting MEA {} with {} samples", mea_id, samples_per_mea);
381
382 match live_mea.record_n_samples(mea_id, samples_per_mea).await {
383 Ok(samples) => {
384 println!(
385 "✓ MEA {} recorded {} samples in {:?}",
386 mea_id,
387 samples.len(),
388 start.elapsed()
389 );
390
391 assert_eq!(
392 samples.len(),
393 samples_per_mea,
394 "Wrong number of samples from MEA {}",
395 mea_id
396 );
397
398 for (i, sample) in samples.iter().enumerate() {
399 println!("Validating MEA {} sample {}", mea_id, i + 1);
400 validate_electrode_data(sample);
401 }
402 }
403 Err(e) => panic!("Failed to record from MEA {}: {:?}", mea_id, e),
404 }
405 }
406 }
407
408 #[tokio::test]
409 async fn test_sequential_recordings() {
410 let live_mea = LiveMEA::new();
411 let total_start = Instant::now();
412
413 // Record one sample from each MEA in sequence
414 for iteration in 1..=3 {
415 println!("\nIteration {}", iteration);
416
417 for mea_id in 1..=4 {
418 let start = Instant::now();
419 match live_mea.record_sample(mea_id).await {
420 Ok(data) => {
421 println!("✓ MEA {} recorded in {:?}", mea_id, start.elapsed());
422 validate_electrode_data(&data);
423 }
424 Err(e) => panic!(
425 "Failed to record from MEA {} on iteration {}: {:?}",
426 mea_id, iteration, e
427 ),
428 }
429 }
430 }
431
432 println!("\nAll recordings completed in {:?}", total_start.elapsed());
433 }
434}