openigtlink_rust/io/server.rs
1//! Synchronous OpenIGTLink server implementation
2//!
3//! Provides a simple blocking TCP server for OpenIGTLink communication.
4
5use std::io::{Read, Write};
6use std::net::{TcpListener, TcpStream};
7
8use tracing::{debug, info, trace, warn};
9
10use crate::error::Result;
11use crate::protocol::header::Header;
12use crate::protocol::message::{IgtlMessage, Message};
13use crate::protocol::AnyMessage;
14
/// Synchronous OpenIGTLink server
///
/// Uses blocking I/O with `std::net::TcpListener` for simple, synchronous server implementation.
///
/// Create one with [`IgtlServer::bind`], then call [`IgtlServer::accept`] to
/// obtain an [`IgtlConnection`] for each client that connects.
#[derive(Debug)]
pub struct IgtlServer {
    /// Bound listening socket from which client connections are accepted.
    listener: TcpListener,
}
21
22impl IgtlServer {
23 /// Bind to a local address and create a server
24 ///
25 /// # Arguments
26 ///
27 /// * `addr` - Local address to bind (e.g., "127.0.0.1:18944")
28 ///
29 /// # Errors
30 ///
31 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to bind (address in use, insufficient permissions, etc.)
32 ///
33 /// # Examples
34 ///
35 /// ```no_run
36 /// use openigtlink_rust::io::IgtlServer;
37 ///
38 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
39 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
40 /// ```
41 pub fn bind(addr: &str) -> Result<Self> {
42 info!(addr = %addr, "Binding OpenIGTLink server");
43 let listener = TcpListener::bind(addr)?;
44 let local_addr = listener.local_addr()?;
45 info!(
46 local_addr = %local_addr,
47 "OpenIGTLink server listening"
48 );
49 Ok(IgtlServer { listener })
50 }
51
52 /// Accept a new client connection
53 ///
54 /// Blocks until a client connects.
55 ///
56 /// # Errors
57 ///
58 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to accept connection
59 ///
60 /// # Examples
61 ///
62 /// ```no_run
63 /// use openigtlink_rust::io::IgtlServer;
64 ///
65 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
66 /// let connection = server.accept()?;
67 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
68 /// ```
69 pub fn accept(&self) -> Result<IgtlConnection> {
70 trace!("Waiting for client connection");
71 let (stream, addr) = self.listener.accept()?;
72 info!(
73 peer_addr = %addr,
74 "Client connected"
75 );
76 Ok(IgtlConnection {
77 stream,
78 verify_crc: true, // Default: verify CRC
79 })
80 }
81
82 /// Get the local address this server is bound to
83 pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
84 Ok(self.listener.local_addr()?)
85 }
86}
87
/// Represents an accepted client connection
///
/// Provides methods to send and receive OpenIGTLink messages over the connection.
#[derive(Debug)]
pub struct IgtlConnection {
    /// TCP stream connected to the client.
    stream: TcpStream,
    /// Whether received messages are decoded with CRC verification enabled.
    verify_crc: bool,
}
95
96impl IgtlConnection {
97 /// Enable or disable CRC verification for received messages
98 ///
99 /// # Arguments
100 ///
101 /// * `verify` - true to enable CRC verification (default), false to disable
102 ///
103 /// # Safety
104 ///
105 /// Disabling CRC verification should only be done in trusted environments
106 /// where data corruption is unlikely (e.g., loopback, local network).
107 pub fn set_verify_crc(&mut self, verify: bool) {
108 if verify != self.verify_crc {
109 info!(verify = verify, "CRC verification setting changed");
110 if !verify {
111 warn!("CRC verification disabled - use only in trusted environments");
112 }
113 }
114 self.verify_crc = verify;
115 }
116
117 /// Get current CRC verification setting
118 ///
119 /// # Returns
120 ///
121 /// true if CRC verification is enabled, false otherwise
122 pub fn verify_crc(&self) -> bool {
123 self.verify_crc
124 }
125
126 /// Send a message to the connected client
127 ///
128 /// # Arguments
129 ///
130 /// * `msg` - Message to send
131 ///
132 /// # Errors
133 ///
134 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed (connection lost, broken pipe, etc.)
135 /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
136 ///
137 /// # Examples
138 ///
139 /// ```no_run
140 /// use openigtlink_rust::io::IgtlServer;
141 /// use openigtlink_rust::protocol::types::StatusMessage;
142 /// use openigtlink_rust::protocol::message::IgtlMessage;
143 ///
144 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
145 /// let mut conn = server.accept()?;
146 ///
147 /// let status = StatusMessage::ok("Ready");
148 /// let msg = IgtlMessage::new(status, "Server")?;
149 /// conn.send(&msg)?;
150 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
151 /// ```
152 pub fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
153 let data = msg.encode()?;
154 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
155 let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
156
157 debug!(
158 msg_type = msg_type,
159 device_name = device_name,
160 size = data.len(),
161 "Sending message to client"
162 );
163
164 self.stream.write_all(&data)?;
165 self.stream.flush()?;
166
167 trace!(
168 msg_type = msg_type,
169 bytes_sent = data.len(),
170 "Message sent successfully"
171 );
172
173 Ok(())
174 }
175
176 /// Receive a message from the connected client
177 ///
178 /// Blocks until a complete message is received.
179 ///
180 /// # Errors
181 ///
182 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed (connection lost, timeout, etc.)
183 /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
184 /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
185 /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
186 /// - [`IgtlError::InvalidSize`](crate::error::IgtlError::InvalidSize) - Message size mismatch
187 ///
188 /// # Examples
189 ///
190 /// ```no_run
191 /// use openigtlink_rust::io::IgtlServer;
192 /// use openigtlink_rust::protocol::types::TransformMessage;
193 /// use openigtlink_rust::protocol::message::IgtlMessage;
194 ///
195 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
196 /// let mut conn = server.accept()?;
197 ///
198 /// let msg: IgtlMessage<TransformMessage> = conn.receive()?;
199 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
200 /// ```
201 pub fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
202 trace!("Waiting for message header from client");
203
204 // Read header (58 bytes)
205 let mut header_buf = vec![0u8; Header::SIZE];
206 self.stream.read_exact(&mut header_buf)?;
207
208 let header = Header::decode(&header_buf)?;
209
210 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
211 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
212
213 debug!(
214 msg_type = msg_type,
215 device_name = device_name,
216 body_size = header.body_size,
217 version = header.version,
218 "Received message header from client"
219 );
220
221 // Read body
222 let mut body_buf = vec![0u8; header.body_size as usize];
223 self.stream.read_exact(&mut body_buf)?;
224
225 trace!(
226 msg_type = msg_type,
227 bytes_read = body_buf.len(),
228 "Message body received from client"
229 );
230
231 // Decode full message with CRC verification setting
232 let mut full_msg = header_buf;
233 full_msg.extend_from_slice(&body_buf);
234
235 let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
236
237 match &result {
238 Ok(_) => {
239 debug!(
240 msg_type = msg_type,
241 device_name = device_name,
242 "Message decoded successfully"
243 );
244 }
245 Err(e) => {
246 warn!(
247 msg_type = msg_type,
248 error = %e,
249 "Failed to decode message from client"
250 );
251 }
252 }
253
254 result
255 }
256
257 /// Receive any message type dynamically
258 ///
259 /// This method receives a message without knowing its type in advance,
260 /// returning it as an [`AnyMessage`] enum that can be pattern matched.
261 ///
262 /// # Errors
263 ///
264 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed
265 /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Malformed header
266 /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
267 ///
268 /// # Examples
269 ///
270 /// ```no_run
271 /// use openigtlink_rust::io::IgtlServer;
272 /// use openigtlink_rust::protocol::AnyMessage;
273 ///
274 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
275 /// let mut conn = server.accept()?;
276 ///
277 /// let msg = conn.receive_any()?;
278 /// match msg {
279 /// AnyMessage::Transform(_) => println!("Received transform"),
280 /// AnyMessage::Status(_) => println!("Received status"),
281 /// _ => println!("Received other message"),
282 /// }
283 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
284 /// ```
285 pub fn receive_any(&mut self) -> Result<AnyMessage> {
286 trace!("Waiting for any message type from client");
287
288 // Read header (58 bytes)
289 let mut header_buf = vec![0u8; Header::SIZE];
290 self.stream.read_exact(&mut header_buf)?;
291
292 let header = Header::decode(&header_buf)?;
293
294 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
295 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
296
297 debug!(
298 msg_type = msg_type,
299 device_name = device_name,
300 body_size = header.body_size,
301 version = header.version,
302 "Received message header from client"
303 );
304
305 // Read body
306 let mut body_buf = vec![0u8; header.body_size as usize];
307 self.stream.read_exact(&mut body_buf)?;
308
309 trace!(
310 msg_type = msg_type,
311 bytes_read = body_buf.len(),
312 "Message body received from client"
313 );
314
315 // Decode full message with CRC verification setting
316 let mut full_msg = header_buf;
317 full_msg.extend_from_slice(&body_buf);
318
319 let result = AnyMessage::decode_with_options(&full_msg, self.verify_crc);
320
321 match &result {
322 Ok(_) => {
323 debug!(
324 msg_type = msg_type,
325 device_name = device_name,
326 "Message decoded successfully as AnyMessage"
327 );
328 }
329 Err(e) => {
330 warn!(
331 msg_type = msg_type,
332 error = %e,
333 "Failed to decode message from client"
334 );
335 }
336 }
337
338 result
339 }
340
341 /// Set read timeout for the underlying TCP stream
342 ///
343 /// # Arguments
344 ///
345 /// * `timeout` - Timeout duration (None for infinite)
346 pub fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
347 self.stream.set_read_timeout(timeout)?;
348 Ok(())
349 }
350
351 /// Set write timeout for the underlying TCP stream
352 ///
353 /// # Arguments
354 ///
355 /// * `timeout` - Timeout duration (None for infinite)
356 pub fn set_write_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
357 self.stream.set_write_timeout(timeout)?;
358 Ok(())
359 }
360
361 /// Enable or disable TCP_NODELAY (Nagle's algorithm)
362 ///
363 /// Wrapper around [`std::net::TcpStream::set_nodelay`].
364 pub fn set_nodelay(&self, nodelay: bool) -> Result<()> {
365 self.stream.set_nodelay(nodelay)?;
366 Ok(())
367 }
368
369 /// Set the size of the TCP receive buffer (SO_RCVBUF)
370 pub fn set_recv_buffer_size(&self, size: usize) -> Result<()> {
371 #[cfg(unix)]
372 {
373 use std::os::fd::AsRawFd;
374
375 let fd = self.stream.as_raw_fd();
376 let size = size as libc::c_int;
377
378 unsafe {
379 let ret = libc::setsockopt(
380 fd,
381 libc::SOL_SOCKET,
382 libc::SO_RCVBUF,
383 &size as *const _ as *const libc::c_void,
384 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
385 );
386
387 if ret != 0 {
388 return Err(std::io::Error::last_os_error().into());
389 }
390 }
391 }
392
393 #[cfg(windows)]
394 {
395 use std::os::windows::io::AsRawSocket;
396
397 // Windows Winsock constants
398 const SOL_SOCKET: libc::c_int = 0xffff;
399 const SO_RCVBUF: libc::c_int = 0x1002;
400
401 let socket = self.stream.as_raw_socket();
402 let size = size as libc::c_int;
403
404 unsafe {
405 let ret = libc::setsockopt(
406 socket as libc::SOCKET,
407 SOL_SOCKET,
408 SO_RCVBUF,
409 &size as *const _ as *const libc::c_char,
410 std::mem::size_of::<libc::c_int>() as libc::c_int,
411 );
412
413 if ret != 0 {
414 return Err(std::io::Error::last_os_error().into());
415 }
416 }
417 }
418
419 Ok(())
420 }
421
422 /// Set the size of the TCP send buffer (SO_SNDBUF)
423 pub fn set_send_buffer_size(&self, size: usize) -> Result<()> {
424 #[cfg(unix)]
425 {
426 use std::os::fd::AsRawFd;
427
428 let fd = self.stream.as_raw_fd();
429 let size = size as libc::c_int;
430
431 unsafe {
432 let ret = libc::setsockopt(
433 fd,
434 libc::SOL_SOCKET,
435 libc::SO_SNDBUF,
436 &size as *const _ as *const libc::c_void,
437 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
438 );
439
440 if ret != 0 {
441 return Err(std::io::Error::last_os_error().into());
442 }
443 }
444 }
445
446 #[cfg(windows)]
447 {
448 use std::os::windows::io::AsRawSocket;
449
450 // Windows Winsock constants
451 const SOL_SOCKET: libc::c_int = 0xffff;
452 const SO_SNDBUF: libc::c_int = 0x1001;
453
454 let socket = self.stream.as_raw_socket();
455 let size = size as libc::c_int;
456
457 unsafe {
458 let ret = libc::setsockopt(
459 socket as libc::SOCKET,
460 SOL_SOCKET,
461 SO_SNDBUF,
462 &size as *const _ as *const libc::c_char,
463 std::mem::size_of::<libc::c_int>() as libc::c_int,
464 );
465
466 if ret != 0 {
467 return Err(std::io::Error::last_os_error().into());
468 }
469 }
470 }
471
472 Ok(())
473 }
474
475 /// Get the current TCP_NODELAY setting
476 pub fn nodelay(&self) -> Result<bool> {
477 Ok(self.stream.nodelay()?)
478 }
479
480 /// Get the remote peer address
481 pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
482 Ok(self.stream.peer_addr()?)
483 }
484}
485
#[cfg(test)]
mod tests {
    // Intentionally empty: integration tests require a multi-threaded setup,
    // so no unit tests are defined in this module.
    // See examples/ for end-to-end testing
}