openigtlink_rust/io/
async_server.rs

1//! Asynchronous OpenIGTLink server implementation
2//!
3//! Provides a non-blocking, async/await-based server for OpenIGTLink communication.
4
5use crate::error::Result;
6use crate::protocol::header::Header;
7use crate::protocol::message::{IgtlMessage, Message};
8use crate::protocol::AnyMessage;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::{TcpListener, TcpStream};
11use tracing::{debug, info, trace, warn};
12
13/// Asynchronous OpenIGTLink server
14///
15/// Uses non-blocking I/O with Tokio for high-concurrency scenarios.
16///
17/// # Examples
18///
19/// ```no_run
20/// use openigtlink_rust::io::AsyncIgtlServer;
21///
22/// #[tokio::main]
23/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
24///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
25///     let connection = server.accept().await?;
26///     Ok(())
27/// }
28/// ```
29pub struct AsyncIgtlServer {
30    listener: TcpListener,
31}
32
33impl AsyncIgtlServer {
34    /// Bind to a local address and create a server asynchronously
35    ///
36    /// # Arguments
37    ///
38    /// * `addr` - Local address to bind (e.g., "127.0.0.1:18944")
39    ///
40    /// # Errors
41    ///
42    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to bind
43    ///
44    /// # Examples
45    ///
46    /// ```no_run
47    /// use openigtlink_rust::io::AsyncIgtlServer;
48    ///
49    /// #[tokio::main]
50    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
51    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
52    ///     Ok(())
53    /// }
54    /// ```
55    pub async fn bind(addr: &str) -> Result<Self> {
56        info!(addr = %addr, "Binding OpenIGTLink server (async)");
57        let listener = TcpListener::bind(addr).await?;
58        let local_addr = listener.local_addr()?;
59        info!(
60            local_addr = %local_addr,
61            "OpenIGTLink server listening (async)"
62        );
63        Ok(AsyncIgtlServer { listener })
64    }
65
66    /// Accept a new client connection asynchronously
67    ///
68    /// # Errors
69    ///
70    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to accept connection
71    ///
72    /// # Examples
73    ///
74    /// ```no_run
75    /// use openigtlink_rust::io::AsyncIgtlServer;
76    ///
77    /// #[tokio::main]
78    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
79    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
80    ///     let connection = server.accept().await?;
81    ///     Ok(())
82    /// }
83    /// ```
84    pub async fn accept(&self) -> Result<AsyncIgtlConnection> {
85        trace!("Waiting for client connection (async)");
86        let (stream, addr) = self.listener.accept().await?;
87        info!(
88            peer_addr = %addr,
89            "Client connected (async)"
90        );
91        Ok(AsyncIgtlConnection {
92            stream,
93            verify_crc: true,
94        })
95    }
96
97    /// Get the local address this server is bound to
98    pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
99        Ok(self.listener.local_addr()?)
100    }
101}
102
103/// Represents an accepted client connection (async)
104///
105/// Provides methods to send and receive OpenIGTLink messages asynchronously.
106pub struct AsyncIgtlConnection {
107    stream: TcpStream,
108    verify_crc: bool,
109}
110
111impl AsyncIgtlConnection {
112    /// Enable or disable CRC verification for received messages
113    ///
114    /// # Arguments
115    ///
116    /// * `verify` - true to enable CRC verification (default), false to disable
117    ///
118    /// # Safety
119    ///
120    /// Disabling CRC verification should only be done in trusted environments
121    /// where data corruption is unlikely (e.g., loopback, local network).
122    pub fn set_verify_crc(&mut self, verify: bool) {
123        if verify != self.verify_crc {
124            info!(verify = verify, "CRC verification setting changed");
125            if !verify {
126                warn!("CRC verification disabled - use only in trusted environments");
127            }
128        }
129        self.verify_crc = verify;
130    }
131
132    /// Get current CRC verification setting
133    pub fn verify_crc(&self) -> bool {
134        self.verify_crc
135    }
136
137    /// Send a message to the connected client asynchronously
138    ///
139    /// # Arguments
140    ///
141    /// * `msg` - Message to send
142    ///
143    /// # Errors
144    ///
145    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed
146    /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
147    ///
148    /// # Examples
149    ///
150    /// ```no_run
151    /// use openigtlink_rust::io::AsyncIgtlServer;
152    /// use openigtlink_rust::protocol::types::StatusMessage;
153    /// use openigtlink_rust::protocol::message::IgtlMessage;
154    ///
155    /// #[tokio::main]
156    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
157    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
158    ///     let mut conn = server.accept().await?;
159    ///
160    ///     let status = StatusMessage::ok("Ready");
161    ///     let msg = IgtlMessage::new(status, "Server")?;
162    ///     conn.send(&msg).await?;
163    ///
164    ///     Ok(())
165    /// }
166    /// ```
167    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
168        let data = msg.encode()?;
169        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
170        let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
171
172        debug!(
173            msg_type = msg_type,
174            device_name = device_name,
175            size = data.len(),
176            "Sending message to client (async)"
177        );
178
179        self.stream.write_all(&data).await?;
180        self.stream.flush().await?;
181
182        trace!(
183            msg_type = msg_type,
184            bytes_sent = data.len(),
185            "Message sent successfully (async)"
186        );
187
188        Ok(())
189    }
190
191    /// Receive a message from the connected client asynchronously
192    ///
193    /// # Errors
194    ///
195    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed
196    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
197    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
198    /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
199    ///
200    /// # Examples
201    ///
202    /// ```no_run
203    /// use openigtlink_rust::io::AsyncIgtlServer;
204    /// use openigtlink_rust::protocol::types::TransformMessage;
205    /// use openigtlink_rust::protocol::message::IgtlMessage;
206    ///
207    /// #[tokio::main]
208    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
209    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
210    ///     let mut conn = server.accept().await?;
211    ///
212    ///     let msg: IgtlMessage<TransformMessage> = conn.receive().await?;
213    ///     Ok(())
214    /// }
215    /// ```
216    pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
217        trace!("Waiting for message header from client (async)");
218
219        let mut header_buf = vec![0u8; Header::SIZE];
220        self.stream.read_exact(&mut header_buf).await?;
221
222        let header = Header::decode(&header_buf)?;
223
224        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
225        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
226
227        debug!(
228            msg_type = msg_type,
229            device_name = device_name,
230            body_size = header.body_size,
231            version = header.version,
232            "Received message header from client (async)"
233        );
234
235        let mut body_buf = vec![0u8; header.body_size as usize];
236        self.stream.read_exact(&mut body_buf).await?;
237
238        trace!(
239            msg_type = msg_type,
240            bytes_read = body_buf.len(),
241            "Message body received from client (async)"
242        );
243
244        let mut full_msg = header_buf;
245        full_msg.extend_from_slice(&body_buf);
246
247        let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
248
249        match &result {
250            Ok(_) => {
251                debug!(
252                    msg_type = msg_type,
253                    device_name = device_name,
254                    "Message decoded successfully (async)"
255                );
256            }
257            Err(e) => {
258                warn!(
259                    msg_type = msg_type,
260                    error = %e,
261                    "Failed to decode message from client (async)"
262                );
263            }
264        }
265
266        result
267    }
268
269    /// Receive any message type dynamically (async)
270    ///
271    /// This method receives a message without knowing its type in advance,
272    /// returning it as an [`AnyMessage`] enum that can be pattern matched.
273    ///
274    /// # Errors
275    ///
276    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed
277    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Malformed header
278    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
279    ///
280    /// # Examples
281    ///
282    /// ```no_run
283    /// use openigtlink_rust::io::AsyncIgtlServer;
284    /// use openigtlink_rust::protocol::AnyMessage;
285    ///
286    /// #[tokio::main]
287    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
288    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
289    ///     let mut conn = server.accept().await?;
290    ///
291    ///     let msg = conn.receive_any().await?;
292    ///     match msg {
293    ///         AnyMessage::Transform(_) => println!("Received transform"),
294    ///         AnyMessage::Status(_) => println!("Received status"),
295    ///         _ => println!("Received other message"),
296    ///     }
297    ///
298    ///     Ok(())
299    /// }
300    /// ```
301    pub async fn receive_any(&mut self) -> Result<AnyMessage> {
302        trace!("Waiting for any message type from client (async)");
303
304        let mut header_buf = vec![0u8; Header::SIZE];
305        self.stream.read_exact(&mut header_buf).await?;
306
307        let header = Header::decode(&header_buf)?;
308
309        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
310        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
311
312        debug!(
313            msg_type = msg_type,
314            device_name = device_name,
315            body_size = header.body_size,
316            version = header.version,
317            "Received message header from client (async)"
318        );
319
320        let mut body_buf = vec![0u8; header.body_size as usize];
321        self.stream.read_exact(&mut body_buf).await?;
322
323        trace!(
324            msg_type = msg_type,
325            bytes_read = body_buf.len(),
326            "Message body received from client (async)"
327        );
328
329        let mut full_msg = header_buf;
330        full_msg.extend_from_slice(&body_buf);
331
332        let result = AnyMessage::decode_with_options(&full_msg, self.verify_crc);
333
334        match &result {
335            Ok(_) => {
336                debug!(
337                    msg_type = msg_type,
338                    device_name = device_name,
339                    "Message decoded successfully as AnyMessage (async)"
340                );
341            }
342            Err(e) => {
343                warn!(
344                    msg_type = msg_type,
345                    error = %e,
346                    "Failed to decode message from client (async)"
347                );
348            }
349        }
350
351        result
352    }
353
354    /// Enable or disable TCP_NODELAY (Nagle's algorithm)
355    pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
356        self.stream.set_nodelay(nodelay)?;
357        debug!(nodelay = nodelay, "TCP_NODELAY configured (async)");
358        Ok(())
359    }
360
361    /// Get the current TCP_NODELAY setting
362    pub async fn nodelay(&self) -> Result<bool> {
363        Ok(self.stream.nodelay()?)
364    }
365
366    /// Get the remote peer address
367    pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
368        Ok(self.stream.peer_addr()?)
369    }
370
371    /// Split the connection into read and write halves
372    ///
373    /// This allows concurrent reading and writing on separate tasks.
374    pub fn into_split(self) -> (AsyncIgtlConnectionReader, AsyncIgtlConnectionWriter) {
375        let (reader, writer) = self.stream.into_split();
376        (
377            AsyncIgtlConnectionReader {
378                reader,
379                verify_crc: self.verify_crc,
380            },
381            AsyncIgtlConnectionWriter { writer },
382        )
383    }
384}
385
386/// Read half of an async OpenIGTLink connection
387pub struct AsyncIgtlConnectionReader {
388    reader: tokio::net::tcp::OwnedReadHalf,
389    verify_crc: bool,
390}
391
392impl AsyncIgtlConnectionReader {
393    /// Receive a message from the read half
394    pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
395        trace!("Waiting for message header (async connection reader)");
396
397        let mut header_buf = vec![0u8; Header::SIZE];
398        self.reader.read_exact(&mut header_buf).await?;
399
400        let header = Header::decode(&header_buf)?;
401
402        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
403
404        debug!(
405            msg_type = msg_type,
406            body_size = header.body_size,
407            "Received message header (async connection reader)"
408        );
409
410        let mut body_buf = vec![0u8; header.body_size as usize];
411        self.reader.read_exact(&mut body_buf).await?;
412
413        let mut full_msg = header_buf;
414        full_msg.extend_from_slice(&body_buf);
415
416        IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
417    }
418}
419
420/// Write half of an async OpenIGTLink connection
421pub struct AsyncIgtlConnectionWriter {
422    writer: tokio::net::tcp::OwnedWriteHalf,
423}
424
425impl AsyncIgtlConnectionWriter {
426    /// Send a message to the write half
427    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
428        let data = msg.encode()?;
429        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
430
431        debug!(
432            msg_type = msg_type,
433            size = data.len(),
434            "Sending message (async connection writer)"
435        );
436
437        self.writer.write_all(&data).await?;
438        self.writer.flush().await?;
439
440        trace!(
441            msg_type = msg_type,
442            bytes_sent = data.len(),
443            "Message sent (async connection writer)"
444        );
445
446        Ok(())
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::ClientBuilder;
    use crate::protocol::types::StatusMessage;

    // The listener is bound (and therefore already listening) before any
    // client connects, so pending connections queue in the backlog and no
    // sleep is needed to "wait for the server". Every spawned server task is
    // awaited at the end of its test so a panic inside it fails the test
    // instead of being silently dropped with the detached handle.

    #[tokio::test]
    async fn test_async_server_bind() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await;
        assert!(server.is_ok());
    }

    #[tokio::test]
    async fn test_async_server_local_addr() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();
        assert_eq!(addr.ip().to_string(), "127.0.0.1");
    }

    #[tokio::test]
    async fn test_async_server_client_communication() {
        // Create server
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();

        // Spawn server task
        let server_task = tokio::spawn(async move {
            let mut conn = server.accept().await.unwrap();

            // Receive message
            let msg: IgtlMessage<StatusMessage> = conn.receive().await.unwrap();
            assert_eq!(msg.content.status_string, "Hello from client");

            // Send response
            let response = StatusMessage::ok("Hello from server");
            let response_msg = IgtlMessage::new(response, "Server").unwrap();
            conn.send(&response_msg).await.unwrap();
        });

        // Connect client
        let mut client = ClientBuilder::new()
            .tcp(addr.to_string())
            .async_mode()
            .build()
            .await
            .unwrap();

        // Send message
        let status = StatusMessage::ok("Hello from client");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();

        // Receive response
        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "Hello from server");

        // Surface any panic raised inside the server task
        server_task.await.expect("server task panicked");
    }

    #[tokio::test]
    async fn test_async_connection_split() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();

        let server_task = tokio::spawn(async move {
            let conn = server.accept().await.unwrap();
            let (mut reader, mut writer) = conn.into_split();

            // Receive and echo back
            let msg: IgtlMessage<StatusMessage> = reader.receive().await.unwrap();
            let echo = IgtlMessage::new(msg.content, "Echo").unwrap();
            writer.send(&echo).await.unwrap();
        });

        let mut client = ClientBuilder::new()
            .tcp(addr.to_string())
            .async_mode()
            .build()
            .await
            .unwrap();

        let status = StatusMessage::ok("Echo test");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();

        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "Echo test");

        // Surface any panic raised inside the server task
        server_task.await.expect("server task panicked");
    }

    #[tokio::test]
    async fn test_async_connection_verify_crc_toggle() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();

        // Keep the raw client socket alive for the duration of the test
        let _client = TcpStream::connect(addr).await.unwrap();
        let mut conn = server.accept().await.unwrap();

        // Accepted connections start with verification enabled
        assert!(conn.verify_crc());

        conn.set_verify_crc(false);
        assert!(!conn.verify_crc());

        conn.set_verify_crc(true);
        assert!(conn.verify_crc());
    }

    #[tokio::test]
    async fn test_async_receive_truncated_body_is_error() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();

        let mut client = TcpStream::connect(addr).await.unwrap();
        let mut conn = server.accept().await.unwrap();

        // Send a valid frame with its last body byte missing, then hang up
        let status = StatusMessage::ok("Truncated");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        let bytes = msg.encode().unwrap();
        client.write_all(&bytes[..bytes.len() - 1]).await.unwrap();
        drop(client);

        let received: Result<IgtlMessage<StatusMessage>> = conn.receive().await;
        assert!(received.is_err());
    }
}