rusty_tip/buffered_tcp_reader.rs
1//! Buffered TCP Reader for continuous signal data collection
2//!
3//! This module provides a BufferedTCPReader that automatically buffers TCP logger data
4//! in the background using a lightweight time-series database approach. It leverages
5//! the existing TCPLoggerStream infrastructure while providing efficient time-windowed
6//! queries for synchronized data collection during SPM experiments.
7
8use crate::types::TimestampedSignalFrame;
9use crate::NanonisError;
10use nanonis_rs::TCPLoggerStream;
11use parking_lot::RwLock;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::{mpsc, Arc};
15use std::thread::{self, JoinHandle};
16use std::time::{Duration, Instant};
17
// TODO: For 2kHz sampling, consider replacing the locked `VecDeque` with a
// lock-free ring buffer such as `crossbeam::queue::ArrayQueue`.
// (`parking_lot::RwLock`, the faster reader-writer lock, is already in use.)
21
/// Buffered TCP reader that continuously collects timestamped signal data
///
/// This component creates a background thread that reads lightweight SignalFrame data
/// from TCPLoggerStream's channel and buffers it with high-resolution timestamps in a
/// circular buffer. It provides time-windowed query methods for retrieving data before,
/// during, and after specific time periods.
///
/// # High-Frequency Performance (2kHz+)
/// **IMPORTANT**: At sampling rates above 1kHz, lock contention becomes critical:
/// - Current implementation guards the `VecDeque` with a `parking_lot::RwLock`:
///   queries take the read lock, the buffering thread takes the write lock, so a
///   slow query delays the writer
/// - For 2kHz+, consider `crossbeam::queue::ArrayQueue` (lock-free)
/// - Query methods must complete in <0.1ms to avoid data loss
///
/// # Memory Efficiency
/// Works with lightweight SignalFrame structures (just counter + data) throughout the
/// entire pipeline, avoiding the overhead of full TCPLoggerData per frame.
///
/// # Architecture
/// - TCPLoggerStream converts protocol data to SignalFrame (protocol → lightweight conversion)
/// - BufferedTCPReader adds timestamps to SignalFrame (timing layer)
/// - Thread-safe time-windowed queries while continuous collection runs in background
pub struct BufferedTCPReader {
    /// Thread-safe circular buffer of timestamped signal frames (oldest at the front,
    /// newest at the back), shared with the background buffering thread
    buffer: Arc<RwLock<VecDeque<TimestampedSignalFrame>>>,
    /// Background thread handle for buffering operations; taken (left as `None`)
    /// once `stop()` has joined the thread
    buffering_thread: Option<JoinHandle<Result<(), NanonisError>>>,
    /// Maximum number of frames to keep in circular buffer
    max_buffer_size: usize,
    /// Time when data collection started (for relative timestamps)
    start_time: Instant,
    /// Signal to shut down background thread (set to `true` by `stop()`)
    shutdown_signal: Arc<AtomicBool>,
    /// Number of channels (configuration parameter, stored as given to `new`)
    num_channels: u32,
    /// Oversampling rate (configuration parameter, stored as given to `new`)
    oversampling: f32,
}
60
61impl BufferedTCPReader {
62 /// Create a new BufferedTCPReader with automatic background data collection
63 ///
64 /// This establishes a connection to the TCP logger stream and starts a background
65 /// thread for continuous data buffering with lightweight SignalFrame structures.
66 ///
67 /// # Arguments
68 /// * `host` - TCP server host address (e.g., "127.0.0.1")
69 /// * `port` - TCP logger data stream port (typically 6590)
70 /// * `buffer_size` - Maximum number of frames to keep in circular buffer
71 /// * `num_channels` - Number of channels being recorded by TCP logger
72 /// * `oversampling` - Oversampling rate configured for TCP logger
73 ///
74 /// # Returns
75 /// A BufferedTCPReader with active background collection, ready for queries
76 ///
77 /// # Implementation Notes
78 /// - Creates TCPLoggerStream and gets its background reader channel
79 /// - Starts buffering thread that converts SignalFrame to TimestampedSignalFrame
80 /// - Implements circular buffer behavior (drops oldest when full)
81 pub fn new(
82 host: &str,
83 port: u16,
84 buffer_size: usize,
85 num_channels: u32,
86 oversampling: f32,
87 ) -> Result<Self, NanonisError> {
88 let tcp_stream = TCPLoggerStream::new(host, port)?;
89 let tcp_receiver = tcp_stream.spawn_background_reader();
90
91 let buffer =
92 Arc::new(RwLock::new(VecDeque::with_capacity(buffer_size)));
93 let buffer_clone = buffer.clone();
94
95 let shutdown_signal = Arc::new(AtomicBool::new(false));
96 let shutdown_clone = shutdown_signal.clone();
97
98 let start_time = Instant::now();
99
100 // Don't block waiting for first frame - let background thread handle it
101 // The TCP logger might not be started yet when this constructor runs
102
103 let buffering_thread = thread::spawn(
104 move || -> Result<(), NanonisError> {
105 log::debug!("Started buffering thread for TCP logger data");
106
107 while !shutdown_clone.load(Ordering::Relaxed) {
108 match tcp_receiver.recv_timeout(Duration::from_millis(100))
109 {
110 Ok(signal_frame) => {
111 // Skip the first frame (signal indices metadata)
112 if signal_frame.counter == 0 {
113 log::debug!("Skipping metadata frame (counter=0) with signal indices");
114 continue;
115 }
116
117 let timestamped_frame = TimestampedSignalFrame::new(
118 signal_frame,
119 start_time,
120 );
121
122 {
123 let mut buffer = buffer_clone.write();
124 buffer.push_back(timestamped_frame);
125
126 if buffer.len() > buffer_size {
127 buffer.pop_front();
128 }
129 }
130 }
131 Err(mpsc::RecvTimeoutError::Timeout) => {
132 continue;
133 }
134 Err(mpsc::RecvTimeoutError::Disconnected) => {
135 log::info!("TCP logger stream disconnected ending buffering");
136 break;
137 }
138 }
139 }
140 Ok(())
141 },
142 );
143
144 Ok(Self {
145 buffer,
146 buffering_thread: Some(buffering_thread),
147 max_buffer_size: buffer_size,
148 start_time,
149 shutdown_signal,
150 num_channels,
151 oversampling,
152 })
153 }
154
155 /// Check if the background buffering thread is still active
156 ///
157 /// # Returns
158 /// `true` if buffering is active, `false` if stopped or failed
159 pub fn is_buffering(&self) -> bool {
160 !self.shutdown_signal.load(Ordering::Relaxed)
161 }
162
163 /// Get current buffer utilization as a percentage
164 ///
165 /// # Returns
166 /// Value between 0.0 and 1.0 indicating how full the buffer is
167 ///
168 /// # Usage
169 /// Useful for monitoring buffer health and detecting if data collection
170 /// is faster than buffer capacity
171 pub fn buffer_utilization(&self) -> f64 {
172 let buffer = self.buffer.read();
173 buffer.len() as f64 / self.max_buffer_size as f64
174 }
175
176 /// Get the total uptime of the buffered TCP reader
177 ///
178 /// Returns the duration since the BufferedTCPReader was created and started
179 /// collecting data. This can be useful for monitoring, logging, and understanding
180 /// the data collection timespan.
181 ///
182 /// # Returns
183 /// Duration since the reader was started
184 ///
185 /// # Thread Safety
186 /// This method is very fast as it only reads the start_time field and calculates
187 /// the current duration. No locks are acquired.
188 ///
189 /// # Example
190 /// ```rust,ignore
191 /// let tcp_reader = BufferedTCPReader::new("127.0.0.1", 6590, 1000, 24, 100.0)?;
192 ///
193 /// // Later...
194 /// let uptime = tcp_reader.uptime();
195 /// println!("TCP reader has been running for {:.1}s", uptime.as_secs_f64());
196 ///
197 /// // Useful for rate calculations
198 /// let (frame_count, _, _) = tcp_reader.buffer_stats();
199 /// let avg_rate = frame_count as f64 / uptime.as_secs_f64();
200 /// println!("Average data rate: {:.1} frames/sec", avg_rate);
201 /// ```
202 pub fn uptime(&self) -> Duration {
203 self.start_time.elapsed()
204 }
205
206 /// Get all signal data since a specific timestamp
207 ///
208 /// # Arguments
209 /// * `since` - Timestamp to start collecting data from
210 ///
211 /// # Returns
212 /// Vector of timestamped signal frames from the specified time onwards
213 ///
214 /// # Thread Safety
215 /// This method acquires a lock on the buffer briefly to copy matching frames.
216 /// Lock is held for minimal time to avoid blocking the buffering thread.
217 pub fn get_data_since(
218 &self,
219 since: Instant,
220 ) -> Vec<TimestampedSignalFrame> {
221 let buffer = self.buffer.read();
222 buffer
223 .iter()
224 .filter(|frame| frame.timestamp >= since)
225 .cloned()
226 .collect()
227 }
228
229 /// Get signal data between two timestamps (time window query)
230 ///
231 /// # Arguments
232 /// * `start` - Start of time window (inclusive)
233 /// * `end` - End of time window (inclusive)
234 ///
235 /// # Returns
236 /// Vector of timestamped signal frames within the specified time window
237 ///
238 /// # Thread Safety
239 /// Minimizes lock time to avoid blocking the buffering thread.
240 ///
241 /// # Usage
242 /// This is the core method for synchronized data collection during actions.
243 /// Typically used to get data before/during/after specific operations.
244 pub fn get_data_between(
245 &self,
246 start: Instant,
247 end: Instant,
248 ) -> Vec<TimestampedSignalFrame> {
249 let buffer = self.buffer.read();
250 buffer
251 .iter()
252 .filter(|frame| frame.timestamp >= start && frame.timestamp <= end)
253 .cloned()
254 .collect()
255 }
256
257 /// Get recent signal data for a specific duration
258 ///
259 /// # Arguments
260 /// * `duration` - How far back to collect data from current time
261 ///
262 /// # Returns
263 /// Vector of timestamped signal frames from the recent past
264 ///
265 /// # Thread Safety
266 /// Delegates to get_data_since() which minimizes lock time.
267 ///
268 /// # Usage
269 /// Convenient for real-time monitoring and getting recent signal history
270 /// without needing to track specific timestamps
271 pub fn get_recent_data(
272 &self,
273 duration: Duration,
274 ) -> Vec<TimestampedSignalFrame> {
275 let since = Instant::now() - duration;
276 self.get_data_since(since)
277 }
278
279 /// Get all buffered signal data
280 ///
281 /// # Returns
282 /// Vector containing all currently buffered timestamped signal frames
283 ///
284 /// # Thread Safety
285 /// WARNING: This clones the entire buffer. For large buffers, prefer time-windowed queries.
286 /// Lock is held briefly but cloning large amounts of data may still impact performance.
287 ///
288 /// # Usage
289 /// Useful for final data collection when stopping buffering, or for
290 /// full experiment analysis
291 pub fn get_all_data(&self) -> Vec<TimestampedSignalFrame> {
292 let buffer = self.buffer.read();
293 buffer.iter().cloned().collect()
294 }
295
296 /// Get TCP logger configuration that was provided during construction
297 ///
298 /// # Returns
299 /// Tuple of (num_channels, oversampling) from the TCP logger
300 ///
301 /// # Usage
302 /// Needed when converting TimestampedSignalFrame back to TCPLoggerData
303 /// for backward compatibility
304 pub fn get_tcp_config(&self) -> (u32, f32) {
305 (self.num_channels, self.oversampling)
306 }
307
308 /// Get buffer statistics for monitoring
309 ///
310 /// # Returns
311 /// Tuple of (current_count, max_capacity, time_span_of_data)
312 ///
313 /// # Thread Safety
314 /// Very brief lock to read buffer metadata only, no cloning.
315 ///
316 /// # Usage
317 /// Useful for monitoring buffer health, detecting overruns, and
318 /// understanding the time span of collected data
319 pub fn buffer_stats(&self) -> (usize, usize, Duration) {
320 let buffer = self.buffer.read();
321 let count = buffer.len();
322 let capacity = self.max_buffer_size;
323 let time_span = if let (Some(first), Some(last)) =
324 (buffer.front(), buffer.back())
325 {
326 last.timestamp.duration_since(first.timestamp)
327 } else {
328 Duration::ZERO
329 };
330 (count, capacity, time_span)
331 }
332
333 /// Get the most recent N frames from the buffer
334 ///
335 /// Returns frames in reverse chronological order (newest first).
336 /// If fewer than `count` frames are available, returns all available frames.
337 ///
338 /// # Arguments
339 /// * `count` - Maximum number of frames to retrieve
340 ///
341 /// # Returns
342 /// Vector of timestamped signal frames, newest first
343 ///
344 /// # Example
345 /// ```rust,ignore
346 /// let recent_100 = tcp_reader.get_recent_frames(100);
347 /// ```
348 pub fn get_recent_frames(
349 &self,
350 count: usize,
351 ) -> Vec<TimestampedSignalFrame> {
352 let buffer = self.buffer.read();
353 buffer.iter().rev().take(count).cloned().collect()
354 }
355
356 /// Get the oldest N frames from the buffer
357 ///
358 /// Returns frames in chronological order (oldest first).
359 /// If fewer than `count` frames are available, returns all available frames.
360 /// Useful for FIFO processing or getting a stable baseline.
361 ///
362 /// # Arguments
363 /// * `count` - Maximum number of frames to retrieve
364 ///
365 /// # Returns
366 /// Vector of timestamped signal frames, oldest first
367 ///
368 /// # Example
369 /// ```rust,ignore
370 /// let baseline = tcp_reader.get_oldest_frames(50);
371 /// ```
372 pub fn get_oldest_frames(
373 &self,
374 count: usize,
375 ) -> Vec<TimestampedSignalFrame> {
376 let buffer = self.buffer.read();
377 buffer.iter().take(count).cloned().collect()
378 }
379
380 /// Get the current number of frames in the buffer
381 ///
382 /// Returns the total count of frames currently stored in the circular buffer.
383 /// This can be used to check buffer fill level or validate requests.
384 ///
385 /// # Returns
386 /// Number of frames currently buffered
387 ///
388 /// # Example
389 /// ```rust,ignore
390 /// let available = tcp_reader.frame_count();
391 /// if available >= 100 {
392 /// let data = tcp_reader.get_recent_frames(100);
393 /// }
394 /// ```
395 pub fn frame_count(&self) -> usize {
396 let buffer = self.buffer.read();
397 buffer.len()
398 }
399
400 /// Get frames from a specific range in the buffer
401 ///
402 /// Returns frames starting from `start_idx` (0 = oldest frame) for `count` frames.
403 /// If the range extends beyond available data, returns available frames only.
404 /// Useful for windowed analysis or specific time periods.
405 ///
406 /// # Arguments
407 /// * `start_idx` - Starting index (0 = oldest frame in buffer)
408 /// * `count` - Number of frames to retrieve from start_idx
409 ///
410 /// # Returns
411 /// Vector of timestamped signal frames in chronological order
412 ///
413 /// # Example
414 /// ```rust,ignore
415 /// // Get frames 50-149 (middle section of buffer)
416 /// let middle_data = tcp_reader.get_frame_range(50, 100);
417 /// ```
418 pub fn get_frame_range(
419 &self,
420 start_idx: usize,
421 count: usize,
422 ) -> Vec<TimestampedSignalFrame> {
423 let buffer = self.buffer.read();
424
425 buffer.iter().skip(start_idx).take(count).cloned().collect()
426 }
427
428 /// Check if the buffer has at least N frames available
429 ///
430 /// Convenience method to check data availability before requesting frames.
431 /// More efficient than getting frame_count() when you only need a threshold check.
432 ///
433 /// # Arguments
434 /// * `min_count` - Minimum number of frames required
435 ///
436 /// # Returns
437 /// True if buffer contains at least `min_count` frames
438 ///
439 /// # Example
440 /// ```rust,ignore
441 /// if tcp_reader.has_frames(100) {
442 /// let stable_data = tcp_reader.get_recent_frames(100);
443 /// } else {
444 /// println!("Not enough data yet, only {} frames", tcp_reader.frame_count());
445 /// }
446 /// ```
447 pub fn has_frames(&self, min_count: usize) -> bool {
448 self.frame_count() > min_count
449 }
450
451 /// Clear all buffered data
452 ///
453 /// This removes all frames from the buffer, effectively resetting it to an empty state.
454 /// The background thread continues to run and will start filling the buffer again.
455 /// This is useful when you want to discard old data and start fresh.
456 ///
457 /// # Example
458 /// ```rust,ignore
459 /// // Clear any stale data before starting a new measurement
460 /// tcp_reader.clear_buffer();
461 /// thread::sleep(Duration::from_millis(500)); // Wait for fresh data
462 /// let fresh_data = tcp_reader.get_recent_data(Duration::from_millis(100));
463 /// ```
464 pub fn clear_buffer(&self) {
465 let mut buffer = self.buffer.write();
466 buffer.clear();
467 log::debug!("Cleared TCP reader buffer");
468 }
469
470 /// Stop background buffering and clean up resources
471 ///
472 /// # Returns
473 /// Result indicating if cleanup was successful
474 ///
475 /// # Implementation Notes
476 /// - Sets shutdown signal to stop background thread
477 /// - Waits for thread to finish and returns any errors
478 /// - Called automatically when BufferedTCPReader is dropped
479 pub fn stop(&mut self) -> Result<(), NanonisError> {
480 self.shutdown_signal.store(true, Ordering::Relaxed);
481 if let Some(handle) = self.buffering_thread.take() {
482 match handle.join() {
483 Ok(result) => result,
484 Err(_) => Err(NanonisError::Protocol(
485 "Buffering thread panicked".to_string(),
486 )),
487 }
488 } else {
489 Ok(())
490 }
491 }
492}
493
494impl Drop for BufferedTCPReader {
495 /// Automatically stop buffering when BufferedTCPReader is dropped
496 fn drop(&mut self) {
497 let _ = self.stop();
498 }
499}