nanonis_rs/
tcplogger_stream.rs1use crate::error::NanonisError;
2use crate::signals::SignalFrame;
3use crate::tcplog::TCPLogStatus;
4use byteorder::{BigEndian, ReadBytesExt};
5use std::io::{Cursor, Read};
6use std::net::{SocketAddr, TcpStream};
7use std::sync::mpsc;
8use std::thread;
9use std::time::Duration;
10
/// Reader for the binary frame stream of a TCP Logger connection.
///
/// Owns the connected socket together with a scratch buffer that is reused
/// for every frame, so reading does not allocate a fresh byte buffer each time.
pub struct TCPLoggerStream {
    /// Connected socket the frames are read from.
    stream: TcpStream,
    /// Reusable byte buffer holding the header or payload currently being read.
    buffer: Vec<u8>,
}
16
17impl TCPLoggerStream {
18 pub fn new(addr: &str, stream_port: u16) -> Result<Self, NanonisError> {
30 let socket_addr: SocketAddr = format!("{addr}:{stream_port}")
31 .parse()
32 .map_err(|_| NanonisError::Protocol(format!("Invalid address: {addr}")))?;
33
34 let stream = TcpStream::connect(socket_addr).map_err(|e| NanonisError::Io {
35 source: e,
36 context: format!("Failed to connect to TCP stream at {}", socket_addr),
37 })?;
38
39 stream
41 .set_read_timeout(Some(Duration::from_secs(30)))
42 .map_err(|e| NanonisError::Io {
43 source: e,
44 context: "Setting TCP stream read timeout".to_string(),
45 })?;
46
47 Ok(Self {
48 stream,
49 buffer: Vec::with_capacity(1024),
50 })
51 }
52
53 pub fn spawn_background_reader(mut self) -> mpsc::Receiver<SignalFrame> {
62 let (sender, receiver) = mpsc::channel();
63
64 thread::spawn(move || {
65 while let Ok(frame) = self.read_frame() {
66 if sender.send(frame).is_err() {
67 break;
68 }
69 }
70 });
71
72 receiver
73 }
74
75 pub fn read_frame(&mut self) -> Result<SignalFrame, NanonisError> {
83 let header_size = 18;
85 self.buffer.resize(header_size, 0);
86
87 self.stream
89 .read_exact(&mut self.buffer[..header_size])
90 .map_err(|e| NanonisError::Io {
91 source: e,
92 context: "Reading TCP Logger frame header".to_string(),
93 })?;
94
95 let mut cursor = Cursor::new(&self.buffer[..header_size]);
97 let num_channels = cursor.read_u32::<BigEndian>()?;
98 let _oversampling = cursor.read_f32::<BigEndian>()?;
99 let counter = cursor.read_u64::<BigEndian>()?;
100 let state_val = cursor.read_u16::<BigEndian>()?;
101 let _state = TCPLogStatus::try_from(state_val as i32)?;
102
103 let data_size = (num_channels * 4) as usize; self.buffer.resize(data_size, 0);
106
107 self.stream
108 .read_exact(&mut self.buffer[..data_size])
109 .map_err(|e| NanonisError::Io {
110 source: e,
111 context: "Reading TCP Logger frame data".to_string(),
112 })?;
113
114 let mut cursor = Cursor::new(&self.buffer[..data_size]);
116 let mut data = Vec::with_capacity(num_channels as usize);
117 for _ in 0..num_channels {
118 data.push(cursor.read_f32::<BigEndian>()?);
119 }
120
121 Ok(SignalFrame { counter, data })
122 }
123}