openigtlink_rust/io/server.rs
1//! Synchronous OpenIGTLink server implementation
2//!
3//! Provides a simple blocking TCP server for OpenIGTLink communication.
4
5use std::io::{Read, Write};
6use std::net::{TcpListener, TcpStream};
7
8use tracing::{debug, info, trace, warn};
9
10use crate::error::Result;
11use crate::protocol::header::Header;
12use crate::protocol::message::{IgtlMessage, Message};
13
/// Synchronous OpenIGTLink server.
///
/// Wraps a blocking [`std::net::TcpListener`]. Create one with
/// [`IgtlServer::bind`] and obtain client connections with
/// [`IgtlServer::accept`].
#[derive(Debug)]
pub struct IgtlServer {
    /// Listening socket that client connections are accepted from.
    listener: TcpListener,
}
20
21impl IgtlServer {
22 /// Bind to a local address and create a server
23 ///
24 /// # Arguments
25 ///
26 /// * `addr` - Local address to bind (e.g., "127.0.0.1:18944")
27 ///
28 /// # Errors
29 ///
30 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to bind (address in use, insufficient permissions, etc.)
31 ///
32 /// # Examples
33 ///
34 /// ```no_run
35 /// use openigtlink_rust::io::IgtlServer;
36 ///
37 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
38 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
39 /// ```
40 pub fn bind(addr: &str) -> Result<Self> {
41 info!(addr = %addr, "Binding OpenIGTLink server");
42 let listener = TcpListener::bind(addr)?;
43 let local_addr = listener.local_addr()?;
44 info!(
45 local_addr = %local_addr,
46 "OpenIGTLink server listening"
47 );
48 Ok(IgtlServer { listener })
49 }
50
51 /// Accept a new client connection
52 ///
53 /// Blocks until a client connects.
54 ///
55 /// # Errors
56 ///
57 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to accept connection
58 ///
59 /// # Examples
60 ///
61 /// ```no_run
62 /// use openigtlink_rust::io::IgtlServer;
63 ///
64 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
65 /// let connection = server.accept()?;
66 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
67 /// ```
68 pub fn accept(&self) -> Result<IgtlConnection> {
69 trace!("Waiting for client connection");
70 let (stream, addr) = self.listener.accept()?;
71 info!(
72 peer_addr = %addr,
73 "Client connected"
74 );
75 Ok(IgtlConnection {
76 stream,
77 verify_crc: true, // Default: verify CRC
78 })
79 }
80
81 /// Get the local address this server is bound to
82 pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
83 Ok(self.listener.local_addr()?)
84 }
85}
86
/// Represents an accepted client connection.
///
/// Provides methods to send and receive OpenIGTLink messages over the
/// connection, plus a few socket-level tuning knobs.
#[derive(Debug)]
pub struct IgtlConnection {
    /// TCP stream connected to the client.
    stream: TcpStream,
    /// Whether received messages are decoded with CRC verification.
    verify_crc: bool,
}
94
95impl IgtlConnection {
96 /// Enable or disable CRC verification for received messages
97 ///
98 /// # Arguments
99 ///
100 /// * `verify` - true to enable CRC verification (default), false to disable
101 ///
102 /// # Safety
103 ///
104 /// Disabling CRC verification should only be done in trusted environments
105 /// where data corruption is unlikely (e.g., loopback, local network).
106 pub fn set_verify_crc(&mut self, verify: bool) {
107 if verify != self.verify_crc {
108 info!(verify = verify, "CRC verification setting changed");
109 if !verify {
110 warn!("CRC verification disabled - use only in trusted environments");
111 }
112 }
113 self.verify_crc = verify;
114 }
115
116 /// Get current CRC verification setting
117 ///
118 /// # Returns
119 ///
120 /// true if CRC verification is enabled, false otherwise
121 pub fn verify_crc(&self) -> bool {
122 self.verify_crc
123 }
124
125 /// Send a message to the connected client
126 ///
127 /// # Arguments
128 ///
129 /// * `msg` - Message to send
130 ///
131 /// # Errors
132 ///
133 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed (connection lost, broken pipe, etc.)
134 /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
135 ///
136 /// # Examples
137 ///
138 /// ```no_run
139 /// use openigtlink_rust::io::IgtlServer;
140 /// use openigtlink_rust::protocol::types::StatusMessage;
141 /// use openigtlink_rust::protocol::message::IgtlMessage;
142 ///
143 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
144 /// let mut conn = server.accept()?;
145 ///
146 /// let status = StatusMessage::ok("Ready");
147 /// let msg = IgtlMessage::new(status, "Server")?;
148 /// conn.send(&msg)?;
149 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
150 /// ```
151 pub fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
152 let data = msg.encode()?;
153 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
154 let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
155
156 debug!(
157 msg_type = msg_type,
158 device_name = device_name,
159 size = data.len(),
160 "Sending message to client"
161 );
162
163 self.stream.write_all(&data)?;
164 self.stream.flush()?;
165
166 trace!(
167 msg_type = msg_type,
168 bytes_sent = data.len(),
169 "Message sent successfully"
170 );
171
172 Ok(())
173 }
174
175 /// Receive a message from the connected client
176 ///
177 /// Blocks until a complete message is received.
178 ///
179 /// # Errors
180 ///
181 /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed (connection lost, timeout, etc.)
182 /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
183 /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
184 /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
185 /// - [`IgtlError::InvalidSize`](crate::error::IgtlError::InvalidSize) - Message size mismatch
186 ///
187 /// # Examples
188 ///
189 /// ```no_run
190 /// use openigtlink_rust::io::IgtlServer;
191 /// use openigtlink_rust::protocol::types::TransformMessage;
192 /// use openigtlink_rust::protocol::message::IgtlMessage;
193 ///
194 /// let server = IgtlServer::bind("127.0.0.1:18944")?;
195 /// let mut conn = server.accept()?;
196 ///
197 /// let msg: IgtlMessage<TransformMessage> = conn.receive()?;
198 /// # Ok::<(), openigtlink_rust::error::IgtlError>(())
199 /// ```
200 pub fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
201 trace!("Waiting for message header from client");
202
203 // Read header (58 bytes)
204 let mut header_buf = vec![0u8; Header::SIZE];
205 self.stream.read_exact(&mut header_buf)?;
206
207 let header = Header::decode(&header_buf)?;
208
209 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
210 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
211
212 debug!(
213 msg_type = msg_type,
214 device_name = device_name,
215 body_size = header.body_size,
216 version = header.version,
217 "Received message header from client"
218 );
219
220 // Read body
221 let mut body_buf = vec![0u8; header.body_size as usize];
222 self.stream.read_exact(&mut body_buf)?;
223
224 trace!(
225 msg_type = msg_type,
226 bytes_read = body_buf.len(),
227 "Message body received from client"
228 );
229
230 // Decode full message with CRC verification setting
231 let mut full_msg = header_buf;
232 full_msg.extend_from_slice(&body_buf);
233
234 let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
235
236 match &result {
237 Ok(_) => {
238 debug!(
239 msg_type = msg_type,
240 device_name = device_name,
241 "Message decoded successfully"
242 );
243 }
244 Err(e) => {
245 warn!(
246 msg_type = msg_type,
247 error = %e,
248 "Failed to decode message from client"
249 );
250 }
251 }
252
253 result
254 }
255
256 /// Set read timeout for the underlying TCP stream
257 ///
258 /// # Arguments
259 ///
260 /// * `timeout` - Timeout duration (None for infinite)
261 pub fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
262 self.stream.set_read_timeout(timeout)?;
263 Ok(())
264 }
265
266 /// Set write timeout for the underlying TCP stream
267 ///
268 /// # Arguments
269 ///
270 /// * `timeout` - Timeout duration (None for infinite)
271 pub fn set_write_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
272 self.stream.set_write_timeout(timeout)?;
273 Ok(())
274 }
275
276 /// Enable or disable TCP_NODELAY (Nagle's algorithm)
277 ///
278 /// Wrapper around [`std::net::TcpStream::set_nodelay`].
279 pub fn set_nodelay(&self, nodelay: bool) -> Result<()> {
280 self.stream.set_nodelay(nodelay)?;
281 Ok(())
282 }
283
284 /// Set the size of the TCP receive buffer (SO_RCVBUF)
285 pub fn set_recv_buffer_size(&self, size: usize) -> Result<()> {
286 #[cfg(unix)]
287 {
288 use std::os::fd::AsRawFd;
289
290 let fd = self.stream.as_raw_fd();
291 let size = size as libc::c_int;
292
293 unsafe {
294 let ret = libc::setsockopt(
295 fd,
296 libc::SOL_SOCKET,
297 libc::SO_RCVBUF,
298 &size as *const _ as *const libc::c_void,
299 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
300 );
301
302 if ret != 0 {
303 return Err(std::io::Error::last_os_error().into());
304 }
305 }
306 }
307
308 #[cfg(windows)]
309 {
310 use std::os::windows::io::AsRawSocket;
311
312 // Windows Winsock constants
313 const SOL_SOCKET: libc::c_int = 0xffff;
314 const SO_RCVBUF: libc::c_int = 0x1002;
315
316 let socket = self.stream.as_raw_socket();
317 let size = size as libc::c_int;
318
319 unsafe {
320 let ret = libc::setsockopt(
321 socket as libc::SOCKET,
322 SOL_SOCKET,
323 SO_RCVBUF,
324 &size as *const _ as *const libc::c_char,
325 std::mem::size_of::<libc::c_int>() as libc::c_int,
326 );
327
328 if ret != 0 {
329 return Err(std::io::Error::last_os_error().into());
330 }
331 }
332 }
333
334 Ok(())
335 }
336
337 /// Set the size of the TCP send buffer (SO_SNDBUF)
338 pub fn set_send_buffer_size(&self, size: usize) -> Result<()> {
339 #[cfg(unix)]
340 {
341 use std::os::fd::AsRawFd;
342
343 let fd = self.stream.as_raw_fd();
344 let size = size as libc::c_int;
345
346 unsafe {
347 let ret = libc::setsockopt(
348 fd,
349 libc::SOL_SOCKET,
350 libc::SO_SNDBUF,
351 &size as *const _ as *const libc::c_void,
352 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
353 );
354
355 if ret != 0 {
356 return Err(std::io::Error::last_os_error().into());
357 }
358 }
359 }
360
361 #[cfg(windows)]
362 {
363 use std::os::windows::io::AsRawSocket;
364
365 // Windows Winsock constants
366 const SOL_SOCKET: libc::c_int = 0xffff;
367 const SO_SNDBUF: libc::c_int = 0x1001;
368
369 let socket = self.stream.as_raw_socket();
370 let size = size as libc::c_int;
371
372 unsafe {
373 let ret = libc::setsockopt(
374 socket as libc::SOCKET,
375 SOL_SOCKET,
376 SO_SNDBUF,
377 &size as *const _ as *const libc::c_char,
378 std::mem::size_of::<libc::c_int>() as libc::c_int,
379 );
380
381 if ret != 0 {
382 return Err(std::io::Error::last_os_error().into());
383 }
384 }
385 }
386
387 Ok(())
388 }
389
390 /// Get the current TCP_NODELAY setting
391 pub fn nodelay(&self) -> Result<bool> {
392 Ok(self.stream.nodelay()?)
393 }
394
395 /// Get the remote peer address
396 pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
397 Ok(self.stream.peer_addr()?)
398 }
399}
400
#[cfg(test)]
mod tests {
    // Intentionally empty: integration tests for the server need a
    // multi-threaded setup. See `examples/` for end-to-end testing.
}