use crate::error::NanonisError;
use crate::types::{TCPLogStatus, TCPLoggerData};
use crate::NanonisClient;
use byteorder::{BigEndian, ReadBytesExt};
use std::io::{Cursor, Read};
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;
pub struct TCPLoggerStream {
stream: TcpStream,
control: NanonisClient,
buffer: Vec<u8>,
}
impl TCPLoggerStream {
pub fn connect(addr: &str, stream_port: u16, control_port: u16) -> Result<Self, NanonisError> {
let socket_addr: SocketAddr = format!("{addr}:{stream_port}")
.parse()
.map_err(|_| NanonisError::InvalidAddress(addr.to_string()))?;
let stream = TcpStream::connect_timeout(&socket_addr, Duration::from_secs(5))?;
stream.set_nonblocking(false)?;
stream.set_read_timeout(Some(Duration::from_secs(30)))?;
let control = NanonisClient::new(addr, control_port)?;
Ok(Self {
stream,
control,
buffer: Vec::with_capacity(1024),
})
}
pub fn connect_timeout(
addr: &str,
stream_port: u16,
control_port: u16,
timeout: Duration,
) -> Result<Self, NanonisError> {
let socket_addr: SocketAddr = format!("{addr}:{stream_port}")
.parse()
.map_err(|_| NanonisError::InvalidAddress(addr.to_string()))?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
let control = NanonisClient::new(addr, control_port)?;
stream.set_read_timeout(Some(Duration::from_secs(30)))?;
Ok(Self {
stream,
control,
buffer: Vec::with_capacity(1024),
})
}
pub fn get_status(&mut self) -> Result<TCPLogStatus, NanonisError> {
self.control.tcplog_status_get()
}
pub fn read_frame(&mut self) -> Result<TCPLoggerData, NanonisError> {
let header_size = 18;
self.buffer.resize(header_size, 0);
self.stream
.read_exact(&mut self.buffer[..header_size])
.map_err(|e| NanonisError::Io {
source: e,
context: "Reading TCP Logger frame header".to_string(),
})?;
let mut cursor = Cursor::new(&self.buffer[..header_size]);
let num_channels = cursor.read_u32::<BigEndian>()?;
let oversampling = cursor.read_f32::<BigEndian>()?;
let counter = cursor.read_u64::<BigEndian>()?;
let state_val = cursor.read_u16::<BigEndian>()?;
let state = TCPLogStatus::try_from(state_val as i32)?;
let data_size = num_channels as usize * 4;
let total_size = header_size + data_size;
self.buffer.resize(total_size, 0);
self.stream
.read_exact(&mut self.buffer[header_size..])
.map_err(|e| NanonisError::Io {
source: e,
context: "Reading TCP Logger frame data".to_string(),
})?;
let mut cursor = Cursor::new(&self.buffer[header_size..]);
let mut data = Vec::with_capacity(num_channels as usize);
for _ in 0..num_channels {
data.push(cursor.read_f32::<BigEndian>()?);
}
Ok(TCPLoggerData {
num_channels,
oversampling,
counter,
state,
data,
})
}
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> Result<(), NanonisError> {
self.stream
.set_read_timeout(timeout)
.map_err(|e| NanonisError::Io {
source: e,
context: "Setting read timeout".to_string(),
})
}
pub fn data_available(&self) -> Result<bool, NanonisError> {
let mut buf = [0u8; 1];
match self.stream.peek(&mut buf) {
Ok(_) => Ok(true),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
Err(e) => Err(NanonisError::Io {
source: e,
context: "Checking data availability".to_string(),
}),
}
}
}
impl Drop for TCPLoggerStream {
fn drop(&mut self) {
let _ = self.control.tcplog_stop();
let _ = self.stream.shutdown(std::net::Shutdown::Both);
}
}