Skip to main content

rusty_tip/
buffered_tcp_reader.rs

1//! Buffered TCP Reader for continuous signal data collection
2//!
3//! This module provides a BufferedTCPReader that automatically buffers TCP logger data
4//! in the background using a lightweight time-series database approach. It leverages
5//! the existing TCPLoggerStream infrastructure while providing efficient time-windowed
6//! queries for synchronized data collection during SPM experiments.
7
8use crate::types::TimestampedSignalFrame;
9use crate::NanonisError;
10use nanonis_rs::TCPLoggerStream;
11use parking_lot::RwLock;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::{mpsc, Arc};
15use std::thread::{self, JoinHandle};
16use std::time::{Duration, Instant};
17
// TODO: For 2kHz sampling, consider replacing the lock-guarded VecDeque with:
// use crossbeam::queue::ArrayQueue; // Lock-free ring buffer
// (parking_lot::RwLock, the faster reader-writer lock once suggested here, is already in use.)
21
/// Buffered TCP reader that continuously collects timestamped signal data
///
/// This component creates a background thread that reads lightweight SignalFrame data
/// from TCPLoggerStream's channel and buffers it with high-resolution timestamps in a
/// circular buffer. It provides time-windowed query methods for retrieving data before,
/// during, and after specific time periods.
///
/// # High-Frequency Performance (2kHz+)
/// **IMPORTANT**: At sampling rates above 1kHz, lock contention becomes critical:
/// - Current implementation guards a `VecDeque` with `parking_lot::RwLock`: query
///   methods take the read lock, the buffering thread takes the write lock
/// - For 2kHz+, consider `crossbeam::queue::ArrayQueue` (lock-free)
/// - Query methods must complete in <0.1ms to avoid data loss
///
/// # Memory Efficiency
/// Works with lightweight SignalFrame structures (just counter + data) throughout the
/// entire pipeline, avoiding the overhead of full TCPLoggerData per frame.
///
/// # Architecture
/// - TCPLoggerStream converts protocol data to SignalFrame (protocol → lightweight conversion)
/// - BufferedTCPReader adds timestamps to SignalFrame (timing layer)
/// - Thread-safe time-windowed queries while continuous collection runs in background
pub struct BufferedTCPReader {
    /// Thread-safe circular buffer of timestamped signal frames
    /// (oldest frame at the front, newest at the back)
    buffer: Arc<RwLock<VecDeque<TimestampedSignalFrame>>>,
    /// Background thread handle for buffering operations;
    /// `None` once `stop()` has taken and joined it
    buffering_thread: Option<JoinHandle<Result<(), NanonisError>>>,
    /// Maximum number of frames to keep in circular buffer
    max_buffer_size: usize,
    /// Time when data collection started (for relative timestamps);
    /// also the reference point for `uptime()`
    start_time: Instant,
    /// Signal to shut down background thread (set to `true` by `stop()`)
    shutdown_signal: Arc<AtomicBool>,
    /// Number of channels (configuration parameter, stored as given and
    /// only handed back by `get_tcp_config()`)
    num_channels: u32,
    /// Oversampling rate (configuration parameter, stored as given and
    /// only handed back by `get_tcp_config()`)
    oversampling: f32,
}
60
61impl BufferedTCPReader {
62    /// Create a new BufferedTCPReader with automatic background data collection
63    ///
64    /// This establishes a connection to the TCP logger stream and starts a background
65    /// thread for continuous data buffering with lightweight SignalFrame structures.
66    ///
67    /// # Arguments
68    /// * `host` - TCP server host address (e.g., "127.0.0.1")
69    /// * `port` - TCP logger data stream port (typically 6590)
70    /// * `buffer_size` - Maximum number of frames to keep in circular buffer
71    /// * `num_channels` - Number of channels being recorded by TCP logger
72    /// * `oversampling` - Oversampling rate configured for TCP logger
73    ///
74    /// # Returns
75    /// A BufferedTCPReader with active background collection, ready for queries
76    ///
77    /// # Implementation Notes
78    /// - Creates TCPLoggerStream and gets its background reader channel
79    /// - Starts buffering thread that converts SignalFrame to TimestampedSignalFrame
80    /// - Implements circular buffer behavior (drops oldest when full)
81    pub fn new(
82        host: &str,
83        port: u16,
84        buffer_size: usize,
85        num_channels: u32,
86        oversampling: f32,
87    ) -> Result<Self, NanonisError> {
88        let tcp_stream = TCPLoggerStream::new(host, port)?;
89        let tcp_receiver = tcp_stream.spawn_background_reader();
90
91        let buffer =
92            Arc::new(RwLock::new(VecDeque::with_capacity(buffer_size)));
93        let buffer_clone = buffer.clone();
94
95        let shutdown_signal = Arc::new(AtomicBool::new(false));
96        let shutdown_clone = shutdown_signal.clone();
97
98        let start_time = Instant::now();
99
100        // Don't block waiting for first frame - let background thread handle it
101        // The TCP logger might not be started yet when this constructor runs
102
103        let buffering_thread = thread::spawn(
104            move || -> Result<(), NanonisError> {
105                log::debug!("Started buffering thread for TCP logger data");
106
107                while !shutdown_clone.load(Ordering::Relaxed) {
108                    match tcp_receiver.recv_timeout(Duration::from_millis(100))
109                    {
110                        Ok(signal_frame) => {
111                            // Skip the first frame (signal indices metadata)
112                            if signal_frame.counter == 0 {
113                                log::debug!("Skipping metadata frame (counter=0) with signal indices");
114                                continue;
115                            }
116
117                            let timestamped_frame = TimestampedSignalFrame::new(
118                                signal_frame,
119                                start_time,
120                            );
121
122                            {
123                                let mut buffer = buffer_clone.write();
124                                buffer.push_back(timestamped_frame);
125
126                                if buffer.len() > buffer_size {
127                                    buffer.pop_front();
128                                }
129                            }
130                        }
131                        Err(mpsc::RecvTimeoutError::Timeout) => {
132                            continue;
133                        }
134                        Err(mpsc::RecvTimeoutError::Disconnected) => {
135                            log::info!("TCP logger stream disconnected ending buffering");
136                            break;
137                        }
138                    }
139                }
140                Ok(())
141            },
142        );
143
144        Ok(Self {
145            buffer,
146            buffering_thread: Some(buffering_thread),
147            max_buffer_size: buffer_size,
148            start_time,
149            shutdown_signal,
150            num_channels,
151            oversampling,
152        })
153    }
154
155    /// Check if the background buffering thread is still active
156    ///
157    /// # Returns
158    /// `true` if buffering is active, `false` if stopped or failed
159    pub fn is_buffering(&self) -> bool {
160        !self.shutdown_signal.load(Ordering::Relaxed)
161    }
162
163    /// Get current buffer utilization as a percentage
164    ///
165    /// # Returns
166    /// Value between 0.0 and 1.0 indicating how full the buffer is
167    ///
168    /// # Usage
169    /// Useful for monitoring buffer health and detecting if data collection
170    /// is faster than buffer capacity
171    pub fn buffer_utilization(&self) -> f64 {
172        let buffer = self.buffer.read();
173        buffer.len() as f64 / self.max_buffer_size as f64
174    }
175
176    /// Get the total uptime of the buffered TCP reader
177    ///
178    /// Returns the duration since the BufferedTCPReader was created and started
179    /// collecting data. This can be useful for monitoring, logging, and understanding
180    /// the data collection timespan.
181    ///
182    /// # Returns
183    /// Duration since the reader was started
184    ///
185    /// # Thread Safety
186    /// This method is very fast as it only reads the start_time field and calculates
187    /// the current duration. No locks are acquired.
188    ///
189    /// # Example
190    /// ```rust,ignore
191    /// let tcp_reader = BufferedTCPReader::new("127.0.0.1", 6590, 1000, 24, 100.0)?;
192    ///
193    /// // Later...
194    /// let uptime = tcp_reader.uptime();
195    /// println!("TCP reader has been running for {:.1}s", uptime.as_secs_f64());
196    ///
197    /// // Useful for rate calculations
198    /// let (frame_count, _, _) = tcp_reader.buffer_stats();
199    /// let avg_rate = frame_count as f64 / uptime.as_secs_f64();
200    /// println!("Average data rate: {:.1} frames/sec", avg_rate);
201    /// ```
202    pub fn uptime(&self) -> Duration {
203        self.start_time.elapsed()
204    }
205
206    /// Get all signal data since a specific timestamp
207    ///
208    /// # Arguments
209    /// * `since` - Timestamp to start collecting data from
210    ///
211    /// # Returns
212    /// Vector of timestamped signal frames from the specified time onwards
213    ///
214    /// # Thread Safety
215    /// This method acquires a lock on the buffer briefly to copy matching frames.
216    /// Lock is held for minimal time to avoid blocking the buffering thread.
217    pub fn get_data_since(
218        &self,
219        since: Instant,
220    ) -> Vec<TimestampedSignalFrame> {
221        let buffer = self.buffer.read();
222        buffer
223            .iter()
224            .filter(|frame| frame.timestamp >= since)
225            .cloned()
226            .collect()
227    }
228
229    /// Get signal data between two timestamps (time window query)
230    ///
231    /// # Arguments
232    /// * `start` - Start of time window (inclusive)
233    /// * `end` - End of time window (inclusive)
234    ///
235    /// # Returns
236    /// Vector of timestamped signal frames within the specified time window
237    ///
238    /// # Thread Safety
239    /// Minimizes lock time to avoid blocking the buffering thread.
240    ///
241    /// # Usage
242    /// This is the core method for synchronized data collection during actions.
243    /// Typically used to get data before/during/after specific operations.
244    pub fn get_data_between(
245        &self,
246        start: Instant,
247        end: Instant,
248    ) -> Vec<TimestampedSignalFrame> {
249        let buffer = self.buffer.read();
250        buffer
251            .iter()
252            .filter(|frame| frame.timestamp >= start && frame.timestamp <= end)
253            .cloned()
254            .collect()
255    }
256
257    /// Get recent signal data for a specific duration
258    ///
259    /// # Arguments
260    /// * `duration` - How far back to collect data from current time
261    ///
262    /// # Returns
263    /// Vector of timestamped signal frames from the recent past
264    ///
265    /// # Thread Safety
266    /// Delegates to get_data_since() which minimizes lock time.
267    ///
268    /// # Usage
269    /// Convenient for real-time monitoring and getting recent signal history
270    /// without needing to track specific timestamps
271    pub fn get_recent_data(
272        &self,
273        duration: Duration,
274    ) -> Vec<TimestampedSignalFrame> {
275        let since = Instant::now() - duration;
276        self.get_data_since(since)
277    }
278
279    /// Get all buffered signal data
280    ///
281    /// # Returns
282    /// Vector containing all currently buffered timestamped signal frames
283    ///
284    /// # Thread Safety
285    /// WARNING: This clones the entire buffer. For large buffers, prefer time-windowed queries.
286    /// Lock is held briefly but cloning large amounts of data may still impact performance.
287    ///
288    /// # Usage
289    /// Useful for final data collection when stopping buffering, or for
290    /// full experiment analysis
291    pub fn get_all_data(&self) -> Vec<TimestampedSignalFrame> {
292        let buffer = self.buffer.read();
293        buffer.iter().cloned().collect()
294    }
295
296    /// Get TCP logger configuration that was provided during construction
297    ///
298    /// # Returns
299    /// Tuple of (num_channels, oversampling) from the TCP logger
300    ///
301    /// # Usage
302    /// Needed when converting TimestampedSignalFrame back to TCPLoggerData
303    /// for backward compatibility
304    pub fn get_tcp_config(&self) -> (u32, f32) {
305        (self.num_channels, self.oversampling)
306    }
307
308    /// Get buffer statistics for monitoring
309    ///
310    /// # Returns
311    /// Tuple of (current_count, max_capacity, time_span_of_data)
312    ///
313    /// # Thread Safety
314    /// Very brief lock to read buffer metadata only, no cloning.
315    ///
316    /// # Usage
317    /// Useful for monitoring buffer health, detecting overruns, and
318    /// understanding the time span of collected data
319    pub fn buffer_stats(&self) -> (usize, usize, Duration) {
320        let buffer = self.buffer.read();
321        let count = buffer.len();
322        let capacity = self.max_buffer_size;
323        let time_span = if let (Some(first), Some(last)) =
324            (buffer.front(), buffer.back())
325        {
326            last.timestamp.duration_since(first.timestamp)
327        } else {
328            Duration::ZERO
329        };
330        (count, capacity, time_span)
331    }
332
333    /// Get the most recent N frames from the buffer
334    ///
335    /// Returns frames in reverse chronological order (newest first).
336    /// If fewer than `count` frames are available, returns all available frames.
337    ///
338    /// # Arguments
339    /// * `count` - Maximum number of frames to retrieve
340    ///
341    /// # Returns
342    /// Vector of timestamped signal frames, newest first
343    ///
344    /// # Example
345    /// ```rust,ignore
346    /// let recent_100 = tcp_reader.get_recent_frames(100);
347    /// ```
348    pub fn get_recent_frames(
349        &self,
350        count: usize,
351    ) -> Vec<TimestampedSignalFrame> {
352        let buffer = self.buffer.read();
353        buffer.iter().rev().take(count).cloned().collect()
354    }
355
356    /// Get the oldest N frames from the buffer
357    ///
358    /// Returns frames in chronological order (oldest first).
359    /// If fewer than `count` frames are available, returns all available frames.
360    /// Useful for FIFO processing or getting a stable baseline.
361    ///
362    /// # Arguments
363    /// * `count` - Maximum number of frames to retrieve
364    ///
365    /// # Returns
366    /// Vector of timestamped signal frames, oldest first
367    ///
368    /// # Example
369    /// ```rust,ignore
370    /// let baseline = tcp_reader.get_oldest_frames(50);
371    /// ```
372    pub fn get_oldest_frames(
373        &self,
374        count: usize,
375    ) -> Vec<TimestampedSignalFrame> {
376        let buffer = self.buffer.read();
377        buffer.iter().take(count).cloned().collect()
378    }
379
380    /// Get the current number of frames in the buffer
381    ///
382    /// Returns the total count of frames currently stored in the circular buffer.
383    /// This can be used to check buffer fill level or validate requests.
384    ///
385    /// # Returns
386    /// Number of frames currently buffered
387    ///
388    /// # Example
389    /// ```rust,ignore
390    /// let available = tcp_reader.frame_count();
391    /// if available >= 100 {
392    ///     let data = tcp_reader.get_recent_frames(100);
393    /// }
394    /// ```
395    pub fn frame_count(&self) -> usize {
396        let buffer = self.buffer.read();
397        buffer.len()
398    }
399
400    /// Get frames from a specific range in the buffer
401    ///
402    /// Returns frames starting from `start_idx` (0 = oldest frame) for `count` frames.
403    /// If the range extends beyond available data, returns available frames only.
404    /// Useful for windowed analysis or specific time periods.
405    ///
406    /// # Arguments
407    /// * `start_idx` - Starting index (0 = oldest frame in buffer)
408    /// * `count` - Number of frames to retrieve from start_idx
409    ///
410    /// # Returns
411    /// Vector of timestamped signal frames in chronological order
412    ///
413    /// # Example
414    /// ```rust,ignore
415    /// // Get frames 50-149 (middle section of buffer)
416    /// let middle_data = tcp_reader.get_frame_range(50, 100);
417    /// ```
418    pub fn get_frame_range(
419        &self,
420        start_idx: usize,
421        count: usize,
422    ) -> Vec<TimestampedSignalFrame> {
423        let buffer = self.buffer.read();
424
425        buffer.iter().skip(start_idx).take(count).cloned().collect()
426    }
427
428    /// Check if the buffer has at least N frames available
429    ///
430    /// Convenience method to check data availability before requesting frames.
431    /// More efficient than getting frame_count() when you only need a threshold check.
432    ///
433    /// # Arguments
434    /// * `min_count` - Minimum number of frames required
435    ///
436    /// # Returns
437    /// True if buffer contains at least `min_count` frames
438    ///
439    /// # Example
440    /// ```rust,ignore
441    /// if tcp_reader.has_frames(100) {
442    ///     let stable_data = tcp_reader.get_recent_frames(100);
443    /// } else {
444    ///     println!("Not enough data yet, only {} frames", tcp_reader.frame_count());
445    /// }
446    /// ```
447    pub fn has_frames(&self, min_count: usize) -> bool {
448        self.frame_count() > min_count
449    }
450
451    /// Clear all buffered data
452    ///
453    /// This removes all frames from the buffer, effectively resetting it to an empty state.
454    /// The background thread continues to run and will start filling the buffer again.
455    /// This is useful when you want to discard old data and start fresh.
456    ///
457    /// # Example
458    /// ```rust,ignore
459    /// // Clear any stale data before starting a new measurement
460    /// tcp_reader.clear_buffer();
461    /// thread::sleep(Duration::from_millis(500)); // Wait for fresh data
462    /// let fresh_data = tcp_reader.get_recent_data(Duration::from_millis(100));
463    /// ```
464    pub fn clear_buffer(&self) {
465        let mut buffer = self.buffer.write();
466        buffer.clear();
467        log::debug!("Cleared TCP reader buffer");
468    }
469
470    /// Stop background buffering and clean up resources
471    ///
472    /// # Returns
473    /// Result indicating if cleanup was successful
474    ///
475    /// # Implementation Notes
476    /// - Sets shutdown signal to stop background thread
477    /// - Waits for thread to finish and returns any errors
478    /// - Called automatically when BufferedTCPReader is dropped
479    pub fn stop(&mut self) -> Result<(), NanonisError> {
480        self.shutdown_signal.store(true, Ordering::Relaxed);
481        if let Some(handle) = self.buffering_thread.take() {
482            match handle.join() {
483                Ok(result) => result,
484                Err(_) => Err(NanonisError::Protocol(
485                    "Buffering thread panicked".to_string(),
486                )),
487            }
488        } else {
489            Ok(())
490        }
491    }
492}
493
494impl Drop for BufferedTCPReader {
495    /// Automatically stop buffering when BufferedTCPReader is dropped
496    fn drop(&mut self) {
497        let _ = self.stop();
498    }
499}