use crate::types::TimestampedSignalFrame;
use crate::NanonisError;
use nanonis_rs::TCPLoggerStream;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
pub struct BufferedTCPReader {
buffer: Arc<RwLock<VecDeque<TimestampedSignalFrame>>>,
buffering_thread: Option<JoinHandle<Result<(), NanonisError>>>,
max_buffer_size: usize,
start_time: Instant,
shutdown_signal: Arc<AtomicBool>,
num_channels: u32,
oversampling: f32,
}
impl BufferedTCPReader {
pub fn new(
host: &str,
port: u16,
buffer_size: usize,
num_channels: u32,
oversampling: f32,
) -> Result<Self, NanonisError> {
let tcp_stream = TCPLoggerStream::new(host, port)?;
let tcp_receiver = tcp_stream.spawn_background_reader();
let buffer =
Arc::new(RwLock::new(VecDeque::with_capacity(buffer_size)));
let buffer_clone = buffer.clone();
let shutdown_signal = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown_signal.clone();
let start_time = Instant::now();
let buffering_thread = thread::spawn(
move || -> Result<(), NanonisError> {
log::debug!("Started buffering thread for TCP logger data");
while !shutdown_clone.load(Ordering::Relaxed) {
match tcp_receiver.recv_timeout(Duration::from_millis(100))
{
Ok(signal_frame) => {
if signal_frame.counter == 0 {
log::debug!("Skipping metadata frame (counter=0) with signal indices");
continue;
}
let timestamped_frame = TimestampedSignalFrame::new(
signal_frame,
start_time,
);
{
let mut buffer = buffer_clone.write();
buffer.push_back(timestamped_frame);
if buffer.len() > buffer_size {
buffer.pop_front();
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
log::info!("TCP logger stream disconnected ending buffering");
break;
}
}
}
Ok(())
},
);
Ok(Self {
buffer,
buffering_thread: Some(buffering_thread),
max_buffer_size: buffer_size,
start_time,
shutdown_signal,
num_channels,
oversampling,
})
}
pub fn is_buffering(&self) -> bool {
!self.shutdown_signal.load(Ordering::Relaxed)
}
pub fn buffer_utilization(&self) -> f64 {
let buffer = self.buffer.read();
buffer.len() as f64 / self.max_buffer_size as f64
}
pub fn uptime(&self) -> Duration {
self.start_time.elapsed()
}
pub fn get_data_since(
&self,
since: Instant,
) -> Vec<TimestampedSignalFrame> {
let buffer = self.buffer.read();
buffer
.iter()
.filter(|frame| frame.timestamp >= since)
.cloned()
.collect()
}
pub fn get_data_between(
&self,
start: Instant,
end: Instant,
) -> Vec<TimestampedSignalFrame> {
let buffer = self.buffer.read();
buffer
.iter()
.filter(|frame| frame.timestamp >= start && frame.timestamp <= end)
.cloned()
.collect()
}
pub fn get_recent_data(
&self,
duration: Duration,
) -> Vec<TimestampedSignalFrame> {
let since = Instant::now() - duration;
self.get_data_since(since)
}
pub fn get_all_data(&self) -> Vec<TimestampedSignalFrame> {
let buffer = self.buffer.read();
buffer.iter().cloned().collect()
}
pub fn get_tcp_config(&self) -> (u32, f32) {
(self.num_channels, self.oversampling)
}
pub fn buffer_stats(&self) -> (usize, usize, Duration) {
let buffer = self.buffer.read();
let count = buffer.len();
let capacity = self.max_buffer_size;
let time_span = if let (Some(first), Some(last)) =
(buffer.front(), buffer.back())
{
last.timestamp.duration_since(first.timestamp)
} else {
Duration::ZERO
};
(count, capacity, time_span)
}
pub fn get_recent_frames(
&self,
count: usize,
) -> Vec<TimestampedSignalFrame> {
let buffer = self.buffer.read();
buffer.iter().rev().take(count).cloned().collect()
}
pub fn get_oldest_frames(
&self,
count: usize,
) -> Vec<TimestampedSignalFrame> {
let buffer = self.buffer.read();
buffer.iter().take(count).cloned().collect()
}
pub fn frame_count(&self) -> usize {
let buffer = self.buffer.read();
buffer.len()
}
pub fn get_frame_range(
&self,
start_idx: usize,
count: usize,
) -> Vec<TimestampedSignalFrame> {
let buffer = self.buffer.read();
buffer.iter().skip(start_idx).take(count).cloned().collect()
}
pub fn has_frames(&self, min_count: usize) -> bool {
self.frame_count() > min_count
}
pub fn clear_buffer(&self) {
let mut buffer = self.buffer.write();
buffer.clear();
log::debug!("Cleared TCP reader buffer");
}
pub fn stop(&mut self) -> Result<(), NanonisError> {
self.shutdown_signal.store(true, Ordering::Relaxed);
if let Some(handle) = self.buffering_thread.take() {
match handle.join() {
Ok(result) => result,
Err(_) => Err(NanonisError::Protocol(
"Buffering thread panicked".to_string(),
)),
}
} else {
Ok(())
}
}
}
impl Drop for BufferedTCPReader {
fn drop(&mut self) {
let _ = self.stop();
}
}