openigtlink_rust/io/
async_server.rs

1//! Asynchronous OpenIGTLink server implementation
2//!
3//! Provides a non-blocking, async/await-based server for OpenIGTLink communication.
4
5use crate::error::Result;
6use crate::protocol::header::Header;
7use crate::protocol::message::{IgtlMessage, Message};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream};
10use tracing::{debug, info, trace, warn};
11
12/// Asynchronous OpenIGTLink server
13///
14/// Uses non-blocking I/O with Tokio for high-concurrency scenarios.
15///
16/// # Examples
17///
18/// ```no_run
19/// use openigtlink_rust::io::AsyncIgtlServer;
20///
21/// #[tokio::main]
22/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
23///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
24///     let connection = server.accept().await?;
25///     Ok(())
26/// }
27/// ```
28pub struct AsyncIgtlServer {
29    listener: TcpListener,
30}
31
32impl AsyncIgtlServer {
33    /// Bind to a local address and create a server asynchronously
34    ///
35    /// # Arguments
36    ///
37    /// * `addr` - Local address to bind (e.g., "127.0.0.1:18944")
38    ///
39    /// # Errors
40    ///
41    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to bind
42    ///
43    /// # Examples
44    ///
45    /// ```no_run
46    /// use openigtlink_rust::io::AsyncIgtlServer;
47    ///
48    /// #[tokio::main]
49    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
50    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
51    ///     Ok(())
52    /// }
53    /// ```
54    pub async fn bind(addr: &str) -> Result<Self> {
55        info!(addr = %addr, "Binding OpenIGTLink server (async)");
56        let listener = TcpListener::bind(addr).await?;
57        let local_addr = listener.local_addr()?;
58        info!(
59            local_addr = %local_addr,
60            "OpenIGTLink server listening (async)"
61        );
62        Ok(AsyncIgtlServer { listener })
63    }
64
65    /// Accept a new client connection asynchronously
66    ///
67    /// # Errors
68    ///
69    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to accept connection
70    ///
71    /// # Examples
72    ///
73    /// ```no_run
74    /// use openigtlink_rust::io::AsyncIgtlServer;
75    ///
76    /// #[tokio::main]
77    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
78    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
79    ///     let connection = server.accept().await?;
80    ///     Ok(())
81    /// }
82    /// ```
83    pub async fn accept(&self) -> Result<AsyncIgtlConnection> {
84        trace!("Waiting for client connection (async)");
85        let (stream, addr) = self.listener.accept().await?;
86        info!(
87            peer_addr = %addr,
88            "Client connected (async)"
89        );
90        Ok(AsyncIgtlConnection {
91            stream,
92            verify_crc: true,
93        })
94    }
95
96    /// Get the local address this server is bound to
97    pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
98        Ok(self.listener.local_addr()?)
99    }
100}
101
102/// Represents an accepted client connection (async)
103///
104/// Provides methods to send and receive OpenIGTLink messages asynchronously.
105pub struct AsyncIgtlConnection {
106    stream: TcpStream,
107    verify_crc: bool,
108}
109
110impl AsyncIgtlConnection {
111    /// Enable or disable CRC verification for received messages
112    ///
113    /// # Arguments
114    ///
115    /// * `verify` - true to enable CRC verification (default), false to disable
116    ///
117    /// # Safety
118    ///
119    /// Disabling CRC verification should only be done in trusted environments
120    /// where data corruption is unlikely (e.g., loopback, local network).
121    pub fn set_verify_crc(&mut self, verify: bool) {
122        if verify != self.verify_crc {
123            info!(verify = verify, "CRC verification setting changed");
124            if !verify {
125                warn!("CRC verification disabled - use only in trusted environments");
126            }
127        }
128        self.verify_crc = verify;
129    }
130
131    /// Get current CRC verification setting
132    pub fn verify_crc(&self) -> bool {
133        self.verify_crc
134    }
135
136    /// Send a message to the connected client asynchronously
137    ///
138    /// # Arguments
139    ///
140    /// * `msg` - Message to send
141    ///
142    /// # Errors
143    ///
144    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed
145    /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
146    ///
147    /// # Examples
148    ///
149    /// ```no_run
150    /// use openigtlink_rust::io::AsyncIgtlServer;
151    /// use openigtlink_rust::protocol::types::StatusMessage;
152    /// use openigtlink_rust::protocol::message::IgtlMessage;
153    ///
154    /// #[tokio::main]
155    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
156    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
157    ///     let mut conn = server.accept().await?;
158    ///
159    ///     let status = StatusMessage::ok("Ready");
160    ///     let msg = IgtlMessage::new(status, "Server")?;
161    ///     conn.send(&msg).await?;
162    ///
163    ///     Ok(())
164    /// }
165    /// ```
166    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
167        let data = msg.encode()?;
168        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
169        let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
170
171        debug!(
172            msg_type = msg_type,
173            device_name = device_name,
174            size = data.len(),
175            "Sending message to client (async)"
176        );
177
178        self.stream.write_all(&data).await?;
179        self.stream.flush().await?;
180
181        trace!(
182            msg_type = msg_type,
183            bytes_sent = data.len(),
184            "Message sent successfully (async)"
185        );
186
187        Ok(())
188    }
189
190    /// Receive a message from the connected client asynchronously
191    ///
192    /// # Errors
193    ///
194    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed
195    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
196    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
197    /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// use openigtlink_rust::io::AsyncIgtlServer;
203    /// use openigtlink_rust::protocol::types::TransformMessage;
204    /// use openigtlink_rust::protocol::message::IgtlMessage;
205    ///
206    /// #[tokio::main]
207    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
208    ///     let server = AsyncIgtlServer::bind("127.0.0.1:18944").await?;
209    ///     let mut conn = server.accept().await?;
210    ///
211    ///     let msg: IgtlMessage<TransformMessage> = conn.receive().await?;
212    ///     Ok(())
213    /// }
214    /// ```
215    pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
216        trace!("Waiting for message header from client (async)");
217
218        let mut header_buf = vec![0u8; Header::SIZE];
219        self.stream.read_exact(&mut header_buf).await?;
220
221        let header = Header::decode(&header_buf)?;
222
223        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
224        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
225
226        debug!(
227            msg_type = msg_type,
228            device_name = device_name,
229            body_size = header.body_size,
230            version = header.version,
231            "Received message header from client (async)"
232        );
233
234        let mut body_buf = vec![0u8; header.body_size as usize];
235        self.stream.read_exact(&mut body_buf).await?;
236
237        trace!(
238            msg_type = msg_type,
239            bytes_read = body_buf.len(),
240            "Message body received from client (async)"
241        );
242
243        let mut full_msg = header_buf;
244        full_msg.extend_from_slice(&body_buf);
245
246        let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
247
248        match &result {
249            Ok(_) => {
250                debug!(
251                    msg_type = msg_type,
252                    device_name = device_name,
253                    "Message decoded successfully (async)"
254                );
255            }
256            Err(e) => {
257                warn!(
258                    msg_type = msg_type,
259                    error = %e,
260                    "Failed to decode message from client (async)"
261                );
262            }
263        }
264
265        result
266    }
267
268    /// Enable or disable TCP_NODELAY (Nagle's algorithm)
269    pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
270        self.stream.set_nodelay(nodelay)?;
271        debug!(nodelay = nodelay, "TCP_NODELAY configured (async)");
272        Ok(())
273    }
274
275    /// Get the current TCP_NODELAY setting
276    pub async fn nodelay(&self) -> Result<bool> {
277        Ok(self.stream.nodelay()?)
278    }
279
280    /// Get the remote peer address
281    pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
282        Ok(self.stream.peer_addr()?)
283    }
284
285    /// Split the connection into read and write halves
286    ///
287    /// This allows concurrent reading and writing on separate tasks.
288    pub fn into_split(self) -> (AsyncIgtlConnectionReader, AsyncIgtlConnectionWriter) {
289        let (reader, writer) = self.stream.into_split();
290        (
291            AsyncIgtlConnectionReader {
292                reader,
293                verify_crc: self.verify_crc,
294            },
295            AsyncIgtlConnectionWriter { writer },
296        )
297    }
298}
299
300/// Read half of an async OpenIGTLink connection
301pub struct AsyncIgtlConnectionReader {
302    reader: tokio::net::tcp::OwnedReadHalf,
303    verify_crc: bool,
304}
305
306impl AsyncIgtlConnectionReader {
307    /// Receive a message from the read half
308    pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
309        trace!("Waiting for message header (async connection reader)");
310
311        let mut header_buf = vec![0u8; Header::SIZE];
312        self.reader.read_exact(&mut header_buf).await?;
313
314        let header = Header::decode(&header_buf)?;
315
316        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
317
318        debug!(
319            msg_type = msg_type,
320            body_size = header.body_size,
321            "Received message header (async connection reader)"
322        );
323
324        let mut body_buf = vec![0u8; header.body_size as usize];
325        self.reader.read_exact(&mut body_buf).await?;
326
327        let mut full_msg = header_buf;
328        full_msg.extend_from_slice(&body_buf);
329
330        IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
331    }
332}
333
334/// Write half of an async OpenIGTLink connection
335pub struct AsyncIgtlConnectionWriter {
336    writer: tokio::net::tcp::OwnedWriteHalf,
337}
338
339impl AsyncIgtlConnectionWriter {
340    /// Send a message to the write half
341    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
342        let data = msg.encode()?;
343        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
344
345        debug!(
346            msg_type = msg_type,
347            size = data.len(),
348            "Sending message (async connection writer)"
349        );
350
351        self.writer.write_all(&data).await?;
352        self.writer.flush().await?;
353
354        trace!(
355            msg_type = msg_type,
356            bytes_sent = data.len(),
357            "Message sent (async connection writer)"
358        );
359
360        Ok(())
361    }
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367    use crate::protocol::types::StatusMessage;
368    use tokio::time::Duration;
369
370    #[tokio::test]
371    async fn test_async_server_bind() {
372        let server = AsyncIgtlServer::bind("127.0.0.1:0").await;
373        assert!(server.is_ok());
374    }
375
376    #[tokio::test]
377    async fn test_async_server_local_addr() {
378        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
379        let addr = server.local_addr().unwrap();
380        assert_eq!(addr.ip().to_string(), "127.0.0.1");
381    }
382
383    #[tokio::test]
384    async fn test_async_server_client_communication() {
385        // Create server
386        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
387        let addr = server.local_addr().unwrap();
388
389        // Spawn server task
390        tokio::spawn(async move {
391            let mut conn = server.accept().await.unwrap();
392
393            // Receive message
394            let msg: IgtlMessage<StatusMessage> = conn.receive().await.unwrap();
395            assert_eq!(msg.content.status_string, "Hello from client");
396
397            // Send response
398            let response = StatusMessage::ok("Hello from server");
399            let response_msg = IgtlMessage::new(response, "Server").unwrap();
400            conn.send(&response_msg).await.unwrap();
401        });
402
403        tokio::time::sleep(Duration::from_millis(10)).await;
404
405        // Connect client
406        use crate::io::ClientBuilder;
407        let mut client = ClientBuilder::new()
408            .tcp(addr.to_string())
409            .async_mode()
410            .build()
411            .await
412            .unwrap();
413
414        // Send message
415        let status = StatusMessage::ok("Hello from client");
416        let msg = IgtlMessage::new(status, "Client").unwrap();
417        client.send(&msg).await.unwrap();
418
419        // Receive response
420        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
421        assert_eq!(response.content.status_string, "Hello from server");
422    }
423
424    #[tokio::test]
425    async fn test_async_connection_split() {
426        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
427        let addr = server.local_addr().unwrap();
428
429        tokio::spawn(async move {
430            let conn = server.accept().await.unwrap();
431            let (mut reader, mut writer) = conn.into_split();
432
433            // Receive and echo back
434            let msg: IgtlMessage<StatusMessage> = reader.receive().await.unwrap();
435            let echo = IgtlMessage::new(msg.content, "Echo").unwrap();
436            writer.send(&echo).await.unwrap();
437        });
438
439        tokio::time::sleep(Duration::from_millis(10)).await;
440
441        use crate::io::ClientBuilder;
442        let mut client = ClientBuilder::new()
443            .tcp(addr.to_string())
444            .async_mode()
445            .build()
446            .await
447            .unwrap();
448
449        let status = StatusMessage::ok("Echo test");
450        let msg = IgtlMessage::new(status, "Client").unwrap();
451        client.send(&msg).await.unwrap();
452
453        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
454        assert_eq!(response.content.status_string, "Echo test");
455    }
456}