openigtlink_rust/io/
server.rs

1//! Synchronous OpenIGTLink server implementation
2//!
3//! Provides a simple blocking TCP server for OpenIGTLink communication.
4
5use std::io::{Read, Write};
6use std::net::{TcpListener, TcpStream};
7
8use tracing::{debug, info, trace, warn};
9
10use crate::error::Result;
11use crate::protocol::header::Header;
12use crate::protocol::message::{IgtlMessage, Message};
13use crate::protocol::AnyMessage;
14
15/// Synchronous OpenIGTLink server
16///
17/// Uses blocking I/O with `std::net::TcpListener` for simple, synchronous server implementation.
18pub struct IgtlServer {
19    listener: TcpListener,
20}
21
22impl IgtlServer {
23    /// Bind to a local address and create a server
24    ///
25    /// # Arguments
26    ///
27    /// * `addr` - Local address to bind (e.g., "127.0.0.1:18944")
28    ///
29    /// # Errors
30    ///
31    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to bind (address in use, insufficient permissions, etc.)
32    ///
33    /// # Examples
34    ///
35    /// ```no_run
36    /// use openigtlink_rust::io::IgtlServer;
37    ///
38    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
39    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
40    /// ```
41    pub fn bind(addr: &str) -> Result<Self> {
42        info!(addr = %addr, "Binding OpenIGTLink server");
43        let listener = TcpListener::bind(addr)?;
44        let local_addr = listener.local_addr()?;
45        info!(
46            local_addr = %local_addr,
47            "OpenIGTLink server listening"
48        );
49        Ok(IgtlServer { listener })
50    }
51
52    /// Accept a new client connection
53    ///
54    /// Blocks until a client connects.
55    ///
56    /// # Errors
57    ///
58    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to accept connection
59    ///
60    /// # Examples
61    ///
62    /// ```no_run
63    /// use openigtlink_rust::io::IgtlServer;
64    ///
65    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
66    /// let connection = server.accept()?;
67    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
68    /// ```
69    pub fn accept(&self) -> Result<IgtlConnection> {
70        trace!("Waiting for client connection");
71        let (stream, addr) = self.listener.accept()?;
72        info!(
73            peer_addr = %addr,
74            "Client connected"
75        );
76        Ok(IgtlConnection {
77            stream,
78            verify_crc: true, // Default: verify CRC
79        })
80    }
81
82    /// Get the local address this server is bound to
83    pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
84        Ok(self.listener.local_addr()?)
85    }
86}
87
88/// Represents an accepted client connection
89///
90/// Provides methods to send and receive OpenIGTLink messages over the connection.
91pub struct IgtlConnection {
92    stream: TcpStream,
93    verify_crc: bool,
94}
95
96impl IgtlConnection {
97    /// Enable or disable CRC verification for received messages
98    ///
99    /// # Arguments
100    ///
101    /// * `verify` - true to enable CRC verification (default), false to disable
102    ///
103    /// # Safety
104    ///
105    /// Disabling CRC verification should only be done in trusted environments
106    /// where data corruption is unlikely (e.g., loopback, local network).
107    pub fn set_verify_crc(&mut self, verify: bool) {
108        if verify != self.verify_crc {
109            info!(verify = verify, "CRC verification setting changed");
110            if !verify {
111                warn!("CRC verification disabled - use only in trusted environments");
112            }
113        }
114        self.verify_crc = verify;
115    }
116
117    /// Get current CRC verification setting
118    ///
119    /// # Returns
120    ///
121    /// true if CRC verification is enabled, false otherwise
122    pub fn verify_crc(&self) -> bool {
123        self.verify_crc
124    }
125
126    /// Send a message to the connected client
127    ///
128    /// # Arguments
129    ///
130    /// * `msg` - Message to send
131    ///
132    /// # Errors
133    ///
134    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed (connection lost, broken pipe, etc.)
135    /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
136    ///
137    /// # Examples
138    ///
139    /// ```no_run
140    /// use openigtlink_rust::io::IgtlServer;
141    /// use openigtlink_rust::protocol::types::StatusMessage;
142    /// use openigtlink_rust::protocol::message::IgtlMessage;
143    ///
144    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
145    /// let mut conn = server.accept()?;
146    ///
147    /// let status = StatusMessage::ok("Ready");
148    /// let msg = IgtlMessage::new(status, "Server")?;
149    /// conn.send(&msg)?;
150    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
151    /// ```
152    pub fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
153        let data = msg.encode()?;
154        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
155        let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
156
157        debug!(
158            msg_type = msg_type,
159            device_name = device_name,
160            size = data.len(),
161            "Sending message to client"
162        );
163
164        self.stream.write_all(&data)?;
165        self.stream.flush()?;
166
167        trace!(
168            msg_type = msg_type,
169            bytes_sent = data.len(),
170            "Message sent successfully"
171        );
172
173        Ok(())
174    }
175
176    /// Receive a message from the connected client
177    ///
178    /// Blocks until a complete message is received.
179    ///
180    /// # Errors
181    ///
182    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed (connection lost, timeout, etc.)
183    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
184    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
185    /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
186    /// - [`IgtlError::InvalidSize`](crate::error::IgtlError::InvalidSize) - Message size mismatch
187    ///
188    /// # Examples
189    ///
190    /// ```no_run
191    /// use openigtlink_rust::io::IgtlServer;
192    /// use openigtlink_rust::protocol::types::TransformMessage;
193    /// use openigtlink_rust::protocol::message::IgtlMessage;
194    ///
195    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
196    /// let mut conn = server.accept()?;
197    ///
198    /// let msg: IgtlMessage<TransformMessage> = conn.receive()?;
199    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
200    /// ```
201    pub fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
202        trace!("Waiting for message header from client");
203
204        // Read header (58 bytes)
205        let mut header_buf = vec![0u8; Header::SIZE];
206        self.stream.read_exact(&mut header_buf)?;
207
208        let header = Header::decode(&header_buf)?;
209
210        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
211        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
212
213        debug!(
214            msg_type = msg_type,
215            device_name = device_name,
216            body_size = header.body_size,
217            version = header.version,
218            "Received message header from client"
219        );
220
221        // Read body
222        let mut body_buf = vec![0u8; header.body_size as usize];
223        self.stream.read_exact(&mut body_buf)?;
224
225        trace!(
226            msg_type = msg_type,
227            bytes_read = body_buf.len(),
228            "Message body received from client"
229        );
230
231        // Decode full message with CRC verification setting
232        let mut full_msg = header_buf;
233        full_msg.extend_from_slice(&body_buf);
234
235        let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
236
237        match &result {
238            Ok(_) => {
239                debug!(
240                    msg_type = msg_type,
241                    device_name = device_name,
242                    "Message decoded successfully"
243                );
244            }
245            Err(e) => {
246                warn!(
247                    msg_type = msg_type,
248                    error = %e,
249                    "Failed to decode message from client"
250                );
251            }
252        }
253
254        result
255    }
256
257    /// Receive any message type dynamically
258    ///
259    /// This method receives a message without knowing its type in advance,
260    /// returning it as an [`AnyMessage`] enum that can be pattern matched.
261    ///
262    /// # Errors
263    ///
264    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed
265    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Malformed header
266    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
267    ///
268    /// # Examples
269    ///
270    /// ```no_run
271    /// use openigtlink_rust::io::IgtlServer;
272    /// use openigtlink_rust::protocol::AnyMessage;
273    ///
274    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
275    /// let mut conn = server.accept()?;
276    ///
277    /// let msg = conn.receive_any()?;
278    /// match msg {
279    ///     AnyMessage::Transform(_) => println!("Received transform"),
280    ///     AnyMessage::Status(_) => println!("Received status"),
281    ///     _ => println!("Received other message"),
282    /// }
283    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
284    /// ```
285    pub fn receive_any(&mut self) -> Result<AnyMessage> {
286        trace!("Waiting for any message type from client");
287
288        // Read header (58 bytes)
289        let mut header_buf = vec![0u8; Header::SIZE];
290        self.stream.read_exact(&mut header_buf)?;
291
292        let header = Header::decode(&header_buf)?;
293
294        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
295        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
296
297        debug!(
298            msg_type = msg_type,
299            device_name = device_name,
300            body_size = header.body_size,
301            version = header.version,
302            "Received message header from client"
303        );
304
305        // Read body
306        let mut body_buf = vec![0u8; header.body_size as usize];
307        self.stream.read_exact(&mut body_buf)?;
308
309        trace!(
310            msg_type = msg_type,
311            bytes_read = body_buf.len(),
312            "Message body received from client"
313        );
314
315        // Decode full message with CRC verification setting
316        let mut full_msg = header_buf;
317        full_msg.extend_from_slice(&body_buf);
318
319        let result = AnyMessage::decode_with_options(&full_msg, self.verify_crc);
320
321        match &result {
322            Ok(_) => {
323                debug!(
324                    msg_type = msg_type,
325                    device_name = device_name,
326                    "Message decoded successfully as AnyMessage"
327                );
328            }
329            Err(e) => {
330                warn!(
331                    msg_type = msg_type,
332                    error = %e,
333                    "Failed to decode message from client"
334                );
335            }
336        }
337
338        result
339    }
340
341    /// Set read timeout for the underlying TCP stream
342    ///
343    /// # Arguments
344    ///
345    /// * `timeout` - Timeout duration (None for infinite)
346    pub fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
347        self.stream.set_read_timeout(timeout)?;
348        Ok(())
349    }
350
351    /// Set write timeout for the underlying TCP stream
352    ///
353    /// # Arguments
354    ///
355    /// * `timeout` - Timeout duration (None for infinite)
356    pub fn set_write_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
357        self.stream.set_write_timeout(timeout)?;
358        Ok(())
359    }
360
361    /// Enable or disable TCP_NODELAY (Nagle's algorithm)
362    ///
363    /// Wrapper around [`std::net::TcpStream::set_nodelay`].
364    pub fn set_nodelay(&self, nodelay: bool) -> Result<()> {
365        self.stream.set_nodelay(nodelay)?;
366        Ok(())
367    }
368
369    /// Set the size of the TCP receive buffer (SO_RCVBUF)
370    pub fn set_recv_buffer_size(&self, size: usize) -> Result<()> {
371        #[cfg(unix)]
372        {
373            use std::os::fd::AsRawFd;
374
375            let fd = self.stream.as_raw_fd();
376            let size = size as libc::c_int;
377
378            unsafe {
379                let ret = libc::setsockopt(
380                    fd,
381                    libc::SOL_SOCKET,
382                    libc::SO_RCVBUF,
383                    &size as *const _ as *const libc::c_void,
384                    std::mem::size_of::<libc::c_int>() as libc::socklen_t,
385                );
386
387                if ret != 0 {
388                    return Err(std::io::Error::last_os_error().into());
389                }
390            }
391        }
392
393        #[cfg(windows)]
394        {
395            use std::os::windows::io::AsRawSocket;
396
397            // Windows Winsock constants
398            const SOL_SOCKET: libc::c_int = 0xffff;
399            const SO_RCVBUF: libc::c_int = 0x1002;
400
401            let socket = self.stream.as_raw_socket();
402            let size = size as libc::c_int;
403
404            unsafe {
405                let ret = libc::setsockopt(
406                    socket as libc::SOCKET,
407                    SOL_SOCKET,
408                    SO_RCVBUF,
409                    &size as *const _ as *const libc::c_char,
410                    std::mem::size_of::<libc::c_int>() as libc::c_int,
411                );
412
413                if ret != 0 {
414                    return Err(std::io::Error::last_os_error().into());
415                }
416            }
417        }
418
419        Ok(())
420    }
421
422    /// Set the size of the TCP send buffer (SO_SNDBUF)
423    pub fn set_send_buffer_size(&self, size: usize) -> Result<()> {
424        #[cfg(unix)]
425        {
426            use std::os::fd::AsRawFd;
427
428            let fd = self.stream.as_raw_fd();
429            let size = size as libc::c_int;
430
431            unsafe {
432                let ret = libc::setsockopt(
433                    fd,
434                    libc::SOL_SOCKET,
435                    libc::SO_SNDBUF,
436                    &size as *const _ as *const libc::c_void,
437                    std::mem::size_of::<libc::c_int>() as libc::socklen_t,
438                );
439
440                if ret != 0 {
441                    return Err(std::io::Error::last_os_error().into());
442                }
443            }
444        }
445
446        #[cfg(windows)]
447        {
448            use std::os::windows::io::AsRawSocket;
449
450            // Windows Winsock constants
451            const SOL_SOCKET: libc::c_int = 0xffff;
452            const SO_SNDBUF: libc::c_int = 0x1001;
453
454            let socket = self.stream.as_raw_socket();
455            let size = size as libc::c_int;
456
457            unsafe {
458                let ret = libc::setsockopt(
459                    socket as libc::SOCKET,
460                    SOL_SOCKET,
461                    SO_SNDBUF,
462                    &size as *const _ as *const libc::c_char,
463                    std::mem::size_of::<libc::c_int>() as libc::c_int,
464                );
465
466                if ret != 0 {
467                    return Err(std::io::Error::last_os_error().into());
468                }
469            }
470        }
471
472        Ok(())
473    }
474
475    /// Get the current TCP_NODELAY setting
476    pub fn nodelay(&self) -> Result<bool> {
477        Ok(self.stream.nodelay()?)
478    }
479
480    /// Get the remote peer address
481    pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
482        Ok(self.stream.peer_addr()?)
483    }
484}
485
486#[cfg(test)]
487mod tests {
488    // Integration tests require multi-threaded setup
489    // See examples/ for end-to-end testing
490}