openigtlink_rust/io/
server.rs

1//! Synchronous OpenIGTLink server implementation
2//!
3//! Provides a simple blocking TCP server for OpenIGTLink communication.
4
5use std::io::{Read, Write};
6use std::net::{TcpListener, TcpStream};
7
8use tracing::{debug, info, trace, warn};
9
10use crate::error::Result;
11use crate::protocol::header::Header;
12use crate::protocol::message::{IgtlMessage, Message};
13
14/// Synchronous OpenIGTLink server
15///
16/// Uses blocking I/O with `std::net::TcpListener` for simple, synchronous server implementation.
17pub struct IgtlServer {
18    listener: TcpListener,
19}
20
21impl IgtlServer {
22    /// Bind to a local address and create a server
23    ///
24    /// # Arguments
25    ///
26    /// * `addr` - Local address to bind (e.g., "127.0.0.1:18944")
27    ///
28    /// # Errors
29    ///
30    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to bind (address in use, insufficient permissions, etc.)
31    ///
32    /// # Examples
33    ///
34    /// ```no_run
35    /// use openigtlink_rust::io::IgtlServer;
36    ///
37    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
38    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
39    /// ```
40    pub fn bind(addr: &str) -> Result<Self> {
41        info!(addr = %addr, "Binding OpenIGTLink server");
42        let listener = TcpListener::bind(addr)?;
43        let local_addr = listener.local_addr()?;
44        info!(
45            local_addr = %local_addr,
46            "OpenIGTLink server listening"
47        );
48        Ok(IgtlServer { listener })
49    }
50
51    /// Accept a new client connection
52    ///
53    /// Blocks until a client connects.
54    ///
55    /// # Errors
56    ///
57    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to accept connection
58    ///
59    /// # Examples
60    ///
61    /// ```no_run
62    /// use openigtlink_rust::io::IgtlServer;
63    ///
64    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
65    /// let connection = server.accept()?;
66    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
67    /// ```
68    pub fn accept(&self) -> Result<IgtlConnection> {
69        trace!("Waiting for client connection");
70        let (stream, addr) = self.listener.accept()?;
71        info!(
72            peer_addr = %addr,
73            "Client connected"
74        );
75        Ok(IgtlConnection {
76            stream,
77            verify_crc: true, // Default: verify CRC
78        })
79    }
80
81    /// Get the local address this server is bound to
82    pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
83        Ok(self.listener.local_addr()?)
84    }
85}
86
87/// Represents an accepted client connection
88///
89/// Provides methods to send and receive OpenIGTLink messages over the connection.
90pub struct IgtlConnection {
91    stream: TcpStream,
92    verify_crc: bool,
93}
94
95impl IgtlConnection {
96    /// Enable or disable CRC verification for received messages
97    ///
98    /// # Arguments
99    ///
100    /// * `verify` - true to enable CRC verification (default), false to disable
101    ///
102    /// # Safety
103    ///
104    /// Disabling CRC verification should only be done in trusted environments
105    /// where data corruption is unlikely (e.g., loopback, local network).
106    pub fn set_verify_crc(&mut self, verify: bool) {
107        if verify != self.verify_crc {
108            info!(verify = verify, "CRC verification setting changed");
109            if !verify {
110                warn!("CRC verification disabled - use only in trusted environments");
111            }
112        }
113        self.verify_crc = verify;
114    }
115
116    /// Get current CRC verification setting
117    ///
118    /// # Returns
119    ///
120    /// true if CRC verification is enabled, false otherwise
121    pub fn verify_crc(&self) -> bool {
122        self.verify_crc
123    }
124
125    /// Send a message to the connected client
126    ///
127    /// # Arguments
128    ///
129    /// * `msg` - Message to send
130    ///
131    /// # Errors
132    ///
133    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed (connection lost, broken pipe, etc.)
134    /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
135    ///
136    /// # Examples
137    ///
138    /// ```no_run
139    /// use openigtlink_rust::io::IgtlServer;
140    /// use openigtlink_rust::protocol::types::StatusMessage;
141    /// use openigtlink_rust::protocol::message::IgtlMessage;
142    ///
143    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
144    /// let mut conn = server.accept()?;
145    ///
146    /// let status = StatusMessage::ok("Ready");
147    /// let msg = IgtlMessage::new(status, "Server")?;
148    /// conn.send(&msg)?;
149    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
150    /// ```
151    pub fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
152        let data = msg.encode()?;
153        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
154        let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
155
156        debug!(
157            msg_type = msg_type,
158            device_name = device_name,
159            size = data.len(),
160            "Sending message to client"
161        );
162
163        self.stream.write_all(&data)?;
164        self.stream.flush()?;
165
166        trace!(
167            msg_type = msg_type,
168            bytes_sent = data.len(),
169            "Message sent successfully"
170        );
171
172        Ok(())
173    }
174
175    /// Receive a message from the connected client
176    ///
177    /// Blocks until a complete message is received.
178    ///
179    /// # Errors
180    ///
181    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed (connection lost, timeout, etc.)
182    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
183    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
184    /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
185    /// - [`IgtlError::InvalidSize`](crate::error::IgtlError::InvalidSize) - Message size mismatch
186    ///
187    /// # Examples
188    ///
189    /// ```no_run
190    /// use openigtlink_rust::io::IgtlServer;
191    /// use openigtlink_rust::protocol::types::TransformMessage;
192    /// use openigtlink_rust::protocol::message::IgtlMessage;
193    ///
194    /// let server = IgtlServer::bind("127.0.0.1:18944")?;
195    /// let mut conn = server.accept()?;
196    ///
197    /// let msg: IgtlMessage<TransformMessage> = conn.receive()?;
198    /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
199    /// ```
200    pub fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
201        trace!("Waiting for message header from client");
202
203        // Read header (58 bytes)
204        let mut header_buf = vec![0u8; Header::SIZE];
205        self.stream.read_exact(&mut header_buf)?;
206
207        let header = Header::decode(&header_buf)?;
208
209        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
210        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
211
212        debug!(
213            msg_type = msg_type,
214            device_name = device_name,
215            body_size = header.body_size,
216            version = header.version,
217            "Received message header from client"
218        );
219
220        // Read body
221        let mut body_buf = vec![0u8; header.body_size as usize];
222        self.stream.read_exact(&mut body_buf)?;
223
224        trace!(
225            msg_type = msg_type,
226            bytes_read = body_buf.len(),
227            "Message body received from client"
228        );
229
230        // Decode full message with CRC verification setting
231        let mut full_msg = header_buf;
232        full_msg.extend_from_slice(&body_buf);
233
234        let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
235
236        match &result {
237            Ok(_) => {
238                debug!(
239                    msg_type = msg_type,
240                    device_name = device_name,
241                    "Message decoded successfully"
242                );
243            }
244            Err(e) => {
245                warn!(
246                    msg_type = msg_type,
247                    error = %e,
248                    "Failed to decode message from client"
249                );
250            }
251        }
252
253        result
254    }
255
256    /// Set read timeout for the underlying TCP stream
257    ///
258    /// # Arguments
259    ///
260    /// * `timeout` - Timeout duration (None for infinite)
261    pub fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
262        self.stream.set_read_timeout(timeout)?;
263        Ok(())
264    }
265
266    /// Set write timeout for the underlying TCP stream
267    ///
268    /// # Arguments
269    ///
270    /// * `timeout` - Timeout duration (None for infinite)
271    pub fn set_write_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
272        self.stream.set_write_timeout(timeout)?;
273        Ok(())
274    }
275
276    /// Enable or disable TCP_NODELAY (Nagle's algorithm)
277    ///
278    /// Wrapper around [`std::net::TcpStream::set_nodelay`].
279    pub fn set_nodelay(&self, nodelay: bool) -> Result<()> {
280        self.stream.set_nodelay(nodelay)?;
281        Ok(())
282    }
283
284    /// Set the size of the TCP receive buffer (SO_RCVBUF)
285    pub fn set_recv_buffer_size(&self, size: usize) -> Result<()> {
286        #[cfg(unix)]
287        {
288            use std::os::fd::AsRawFd;
289
290            let fd = self.stream.as_raw_fd();
291            let size = size as libc::c_int;
292
293            unsafe {
294                let ret = libc::setsockopt(
295                    fd,
296                    libc::SOL_SOCKET,
297                    libc::SO_RCVBUF,
298                    &size as *const _ as *const libc::c_void,
299                    std::mem::size_of::<libc::c_int>() as libc::socklen_t,
300                );
301
302                if ret != 0 {
303                    return Err(std::io::Error::last_os_error().into());
304                }
305            }
306        }
307
308        #[cfg(windows)]
309        {
310            use std::os::windows::io::AsRawSocket;
311
312            // Windows Winsock constants
313            const SOL_SOCKET: libc::c_int = 0xffff;
314            const SO_RCVBUF: libc::c_int = 0x1002;
315
316            let socket = self.stream.as_raw_socket();
317            let size = size as libc::c_int;
318
319            unsafe {
320                let ret = libc::setsockopt(
321                    socket as libc::SOCKET,
322                    SOL_SOCKET,
323                    SO_RCVBUF,
324                    &size as *const _ as *const libc::c_char,
325                    std::mem::size_of::<libc::c_int>() as libc::c_int,
326                );
327
328                if ret != 0 {
329                    return Err(std::io::Error::last_os_error().into());
330                }
331            }
332        }
333
334        Ok(())
335    }
336
337    /// Set the size of the TCP send buffer (SO_SNDBUF)
338    pub fn set_send_buffer_size(&self, size: usize) -> Result<()> {
339        #[cfg(unix)]
340        {
341            use std::os::fd::AsRawFd;
342
343            let fd = self.stream.as_raw_fd();
344            let size = size as libc::c_int;
345
346            unsafe {
347                let ret = libc::setsockopt(
348                    fd,
349                    libc::SOL_SOCKET,
350                    libc::SO_SNDBUF,
351                    &size as *const _ as *const libc::c_void,
352                    std::mem::size_of::<libc::c_int>() as libc::socklen_t,
353                );
354
355                if ret != 0 {
356                    return Err(std::io::Error::last_os_error().into());
357                }
358            }
359        }
360
361        #[cfg(windows)]
362        {
363            use std::os::windows::io::AsRawSocket;
364
365            // Windows Winsock constants
366            const SOL_SOCKET: libc::c_int = 0xffff;
367            const SO_SNDBUF: libc::c_int = 0x1001;
368
369            let socket = self.stream.as_raw_socket();
370            let size = size as libc::c_int;
371
372            unsafe {
373                let ret = libc::setsockopt(
374                    socket as libc::SOCKET,
375                    SOL_SOCKET,
376                    SO_SNDBUF,
377                    &size as *const _ as *const libc::c_char,
378                    std::mem::size_of::<libc::c_int>() as libc::c_int,
379                );
380
381                if ret != 0 {
382                    return Err(std::io::Error::last_os_error().into());
383                }
384            }
385        }
386
387        Ok(())
388    }
389
390    /// Get the current TCP_NODELAY setting
391    pub fn nodelay(&self) -> Result<bool> {
392        Ok(self.stream.nodelay()?)
393    }
394
395    /// Get the remote peer address
396    pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
397        Ok(self.stream.peer_addr()?)
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    // Integration tests require multi-threaded setup
404    // See examples/ for end-to-end testing
405}