Skip to main content

nanonis_rs/
tcplogger_stream.rs

1use crate::error::NanonisError;
2use crate::signals::SignalFrame;
3use crate::tcplog::TCPLogStatus;
4use byteorder::{BigEndian, ReadBytesExt};
5use std::io::{Cursor, Read};
6use std::net::{SocketAddr, TcpStream};
7use std::sync::mpsc;
8use std::thread;
9use std::time::Duration;
10
11/// Simple TCP Logger Stream - connects to data stream only, no control
12pub struct TCPLoggerStream {
13    stream: TcpStream,
14    buffer: Vec<u8>,
15}
16
17impl TCPLoggerStream {
18    /// Connect to TCP Logger data stream only
19    ///
20    /// Creates a simple connection to the TCP data stream without any control operations.
21    /// All control (start/stop/configure) should be handled externally.
22    ///
23    /// # Arguments
24    /// * `addr` - Server address (e.g., "127.0.0.1")
25    /// * `stream_port` - TCP Logger data stream port (typically 6590)
26    ///
27    /// # Returns
28    /// Connected `TCPLoggerStream` ready to read data frames.
29    pub fn new(addr: &str, stream_port: u16) -> Result<Self, NanonisError> {
30        let socket_addr: SocketAddr = format!("{addr}:{stream_port}")
31            .parse()
32            .map_err(|_| NanonisError::Protocol(format!("Invalid address: {addr}")))?;
33
34        let stream = TcpStream::connect(socket_addr).map_err(|e| NanonisError::Io {
35            source: e,
36            context: format!("Failed to connect to TCP stream at {}", socket_addr),
37        })?;
38
39        // Set read timeout for continuous reading
40        stream
41            .set_read_timeout(Some(Duration::from_secs(30)))
42            .map_err(|e| NanonisError::Io {
43                source: e,
44                context: "Setting TCP stream read timeout".to_string(),
45            })?;
46
47        Ok(Self {
48            stream,
49            buffer: Vec::with_capacity(1024),
50        })
51    }
52
53    /// Spawn background reader thread
54    ///
55    /// Creates a background thread that continuously reads TCP Logger data frames
56    /// and sends them through a channel. The thread automatically exits when the
57    /// receiver is dropped.
58    ///
59    /// # Returns
60    /// A receiver channel for `TCPLoggerData` frames.
61    pub fn spawn_background_reader(mut self) -> mpsc::Receiver<SignalFrame> {
62        let (sender, receiver) = mpsc::channel();
63
64        thread::spawn(move || {
65            while let Ok(frame) = self.read_frame() {
66                if sender.send(frame).is_err() {
67                    break;
68                }
69            }
70        });
71
72        receiver
73    }
74
75    /// Read a single data frame from the stream
76    ///
77    /// # Returns
78    /// `TCPLoggerData` containing the frame header and signal data.
79    ///
80    /// # Frame Format
81    /// Always reads 18 bytes header first, then reads data based on num_channels.
82    pub fn read_frame(&mut self) -> Result<SignalFrame, NanonisError> {
83        // First read header to determine frame size
84        let header_size = 18;
85        self.buffer.resize(header_size, 0);
86
87        // Read header into buffer
88        self.stream
89            .read_exact(&mut self.buffer[..header_size])
90            .map_err(|e| NanonisError::Io {
91                source: e,
92                context: "Reading TCP Logger frame header".to_string(),
93            })?;
94
95        // Parse header from buffer
96        let mut cursor = Cursor::new(&self.buffer[..header_size]);
97        let num_channels = cursor.read_u32::<BigEndian>()?;
98        let _oversampling = cursor.read_f32::<BigEndian>()?;
99        let counter = cursor.read_u64::<BigEndian>()?;
100        let state_val = cursor.read_u16::<BigEndian>()?;
101        let _state = TCPLogStatus::try_from(state_val as i32)?;
102
103        // Calculate total frame size and read data portion
104        let data_size = (num_channels * 4) as usize; // 4 bytes per f32
105        self.buffer.resize(data_size, 0);
106
107        self.stream
108            .read_exact(&mut self.buffer[..data_size])
109            .map_err(|e| NanonisError::Io {
110                source: e,
111                context: "Reading TCP Logger frame data".to_string(),
112            })?;
113
114        // Parse data values from buffer
115        let mut cursor = Cursor::new(&self.buffer[..data_size]);
116        let mut data = Vec::with_capacity(num_channels as usize);
117        for _ in 0..num_channels {
118            data.push(cursor.read_f32::<BigEndian>()?);
119        }
120
121        Ok(SignalFrame { counter, data })
122    }
123}