openigtlink_rust/io/
async_client.rs

1//! Asynchronous OpenIGTLink client implementation
2//!
3//! Provides a non-blocking, async/await-based client for OpenIGTLink communication.
4//! Uses Tokio for async I/O operations.
5
6use crate::error::Result;
7use crate::protocol::header::Header;
8use crate::protocol::message::{IgtlMessage, Message};
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::TcpStream;
11use tracing::{debug, info, trace, warn};
12
13/// Asynchronous OpenIGTLink client
14///
15/// Uses non-blocking I/O with Tokio for high-concurrency scenarios.
16///
17/// # Examples
18///
19/// ```no_run
20/// use openigtlink_rust::io::AsyncIgtlClient;
21/// use openigtlink_rust::protocol::types::StatusMessage;
22/// use openigtlink_rust::protocol::message::IgtlMessage;
23///
24/// #[tokio::main]
25/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
26///     let mut client = AsyncIgtlClient::connect("127.0.0.1:18944").await?;
27///
28///     let status = StatusMessage::ok("Hello");
29///     let msg = IgtlMessage::new(status, "AsyncClient")?;
30///     client.send(&msg).await?;
31///
32///     Ok(())
33/// }
34/// ```
35#[deprecated(
36    since = "0.2.0",
37    note = "Use ClientBuilder instead: ClientBuilder::new().tcp(addr).async_mode().build().await"
38)]
39pub struct AsyncIgtlClient {
40    stream: TcpStream,
41    verify_crc: bool,
42}
43
44impl AsyncIgtlClient {
45    /// Connect to an OpenIGTLink server asynchronously
46    ///
47    /// # Arguments
48    ///
49    /// * `addr` - Server address (e.g., "127.0.0.1:18944")
50    ///
51    /// # Errors
52    ///
53    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Failed to connect
54    ///
55    /// # Examples
56    ///
57    /// ```no_run
58    /// use openigtlink_rust::io::AsyncIgtlClient;
59    ///
60    /// #[tokio::main]
61    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
62    ///     let client = AsyncIgtlClient::connect("127.0.0.1:18944").await?;
63    ///     Ok(())
64    /// }
65    /// ```
66    pub async fn connect(addr: &str) -> Result<Self> {
67        info!(addr = %addr, "Connecting to OpenIGTLink server (async)");
68        let stream = TcpStream::connect(addr).await?;
69        let local_addr = stream.local_addr()?;
70        info!(
71            local_addr = %local_addr,
72            remote_addr = %addr,
73            "Connected to OpenIGTLink server (async)"
74        );
75        Ok(AsyncIgtlClient {
76            stream,
77            verify_crc: true,
78        })
79    }
80
81    /// Enable or disable CRC verification for received messages
82    ///
83    /// # Arguments
84    ///
85    /// * `verify` - true to enable CRC verification (default), false to disable
86    ///
87    /// # Safety
88    ///
89    /// Disabling CRC verification should only be done in trusted environments
90    /// where data corruption is unlikely (e.g., loopback, local network).
91    pub fn set_verify_crc(&mut self, verify: bool) {
92        if verify != self.verify_crc {
93            info!(verify = verify, "CRC verification setting changed");
94            if !verify {
95                warn!("CRC verification disabled - use only in trusted environments");
96            }
97        }
98        self.verify_crc = verify;
99    }
100
101    /// Get current CRC verification setting
102    pub fn verify_crc(&self) -> bool {
103        self.verify_crc
104    }
105
106    /// Send a message to the server asynchronously
107    ///
108    /// # Arguments
109    ///
110    /// * `msg` - Message to send
111    ///
112    /// # Errors
113    ///
114    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network write failed
115    /// - [`IgtlError::BodyTooLarge`](crate::error::IgtlError::BodyTooLarge) - Message exceeds maximum size
116    ///
117    /// # Examples
118    ///
119    /// ```no_run
120    /// use openigtlink_rust::io::AsyncIgtlClient;
121    /// use openigtlink_rust::protocol::types::StatusMessage;
122    /// use openigtlink_rust::protocol::message::IgtlMessage;
123    ///
124    /// #[tokio::main]
125    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
126    ///     let mut client = AsyncIgtlClient::connect("127.0.0.1:18944").await?;
127    ///
128    ///     let status = StatusMessage::ok("Ready");
129    ///     let msg = IgtlMessage::new(status, "Client")?;
130    ///     client.send(&msg).await?;
131    ///
132    ///     Ok(())
133    /// }
134    /// ```
135    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
136        let data = msg.encode()?;
137        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
138        let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
139
140        debug!(
141            msg_type = msg_type,
142            device_name = device_name,
143            size = data.len(),
144            "Sending message (async)"
145        );
146
147        self.stream.write_all(&data).await?;
148        self.stream.flush().await?;
149
150        trace!(
151            msg_type = msg_type,
152            bytes_sent = data.len(),
153            "Message sent successfully (async)"
154        );
155
156        Ok(())
157    }
158
159    /// Receive a message from the server asynchronously
160    ///
161    /// # Errors
162    ///
163    /// - [`IgtlError::Io`](crate::error::IgtlError::Io) - Network read failed
164    /// - [`IgtlError::InvalidHeader`](crate::error::IgtlError::InvalidHeader) - Received malformed header
165    /// - [`IgtlError::CrcMismatch`](crate::error::IgtlError::CrcMismatch) - Data corruption detected
166    /// - [`IgtlError::UnknownMessageType`](crate::error::IgtlError::UnknownMessageType) - Unsupported message type
167    ///
168    /// # Examples
169    ///
170    /// ```no_run
171    /// use openigtlink_rust::io::AsyncIgtlClient;
172    /// use openigtlink_rust::protocol::types::TransformMessage;
173    /// use openigtlink_rust::protocol::message::IgtlMessage;
174    ///
175    /// #[tokio::main]
176    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
177    ///     let mut client = AsyncIgtlClient::connect("127.0.0.1:18944").await?;
178    ///     let msg: IgtlMessage<TransformMessage> = client.receive().await?;
179    ///     Ok(())
180    /// }
181    /// ```
182    pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
183        trace!("Waiting for message header (async)");
184
185        let mut header_buf = vec![0u8; Header::SIZE];
186        self.stream.read_exact(&mut header_buf).await?;
187
188        let header = Header::decode(&header_buf)?;
189
190        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
191        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
192
193        debug!(
194            msg_type = msg_type,
195            device_name = device_name,
196            body_size = header.body_size,
197            version = header.version,
198            "Received message header (async)"
199        );
200
201        let mut body_buf = vec![0u8; header.body_size as usize];
202        self.stream.read_exact(&mut body_buf).await?;
203
204        trace!(
205            msg_type = msg_type,
206            bytes_read = body_buf.len(),
207            "Message body received (async)"
208        );
209
210        let mut full_msg = header_buf;
211        full_msg.extend_from_slice(&body_buf);
212
213        let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
214
215        match &result {
216            Ok(_) => {
217                debug!(
218                    msg_type = msg_type,
219                    device_name = device_name,
220                    "Message decoded successfully (async)"
221                );
222            }
223            Err(e) => {
224                warn!(
225                    msg_type = msg_type,
226                    error = %e,
227                    "Failed to decode message (async)"
228                );
229            }
230        }
231
232        result
233    }
234
235    /// Set read timeout for the underlying TCP stream
236    pub async fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
237        // Tokio doesn't use set_read_timeout, instead use tokio::time::timeout
238        // This is a placeholder for API compatibility
239        debug!(timeout_ms = ?timeout.map(|d| d.as_millis()), "Read timeout not directly supported in async (use tokio::time::timeout)");
240        Ok(())
241    }
242
243    /// Set write timeout for the underlying TCP stream
244    pub async fn set_write_timeout(
245        &mut self,
246        timeout: Option<std::time::Duration>,
247    ) -> Result<()> {
248        // Tokio doesn't use set_write_timeout, instead use tokio::time::timeout
249        // This is a placeholder for API compatibility
250        debug!(timeout_ms = ?timeout.map(|d| d.as_millis()), "Write timeout not directly supported in async (use tokio::time::timeout)");
251        Ok(())
252    }
253
254    /// Enable or disable TCP_NODELAY (Nagle's algorithm)
255    pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
256        self.stream.set_nodelay(nodelay)?;
257        debug!(nodelay = nodelay, "TCP_NODELAY configured");
258        Ok(())
259    }
260
261    /// Get the current TCP_NODELAY setting
262    pub async fn nodelay(&self) -> Result<bool> {
263        Ok(self.stream.nodelay()?)
264    }
265
266    /// Get the local address
267    pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
268        Ok(self.stream.local_addr()?)
269    }
270
271    /// Get the remote peer address
272    pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
273        Ok(self.stream.peer_addr()?)
274    }
275
276    /// Split the client into read and write halves
277    ///
278    /// This allows concurrent reading and writing on separate tasks.
279    ///
280    /// # Examples
281    ///
282    /// ```no_run
283    /// use openigtlink_rust::io::AsyncIgtlClient;
284    ///
285    /// #[tokio::main]
286    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
287    ///     let client = AsyncIgtlClient::connect("127.0.0.1:18944").await?;
288    ///     let (reader, writer) = client.into_split();
289    ///
290    ///     // Use reader and writer in separate tasks
291    ///     Ok(())
292    /// }
293    /// ```
294    pub fn into_split(self) -> (AsyncIgtlReader, AsyncIgtlWriter) {
295        let (reader, writer) = self.stream.into_split();
296        (
297            AsyncIgtlReader {
298                reader,
299                verify_crc: self.verify_crc,
300            },
301            AsyncIgtlWriter { writer },
302        )
303    }
304}
305
306/// Read half of an async OpenIGTLink client
307pub struct AsyncIgtlReader {
308    reader: tokio::net::tcp::OwnedReadHalf,
309    verify_crc: bool,
310}
311
312impl AsyncIgtlReader {
313    /// Receive a message from the read half
314    pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
315        trace!("Waiting for message header (async reader)");
316
317        let mut header_buf = vec![0u8; Header::SIZE];
318        self.reader.read_exact(&mut header_buf).await?;
319
320        let header = Header::decode(&header_buf)?;
321
322        let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
323        let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
324
325        debug!(
326            msg_type = msg_type,
327            device_name = device_name,
328            body_size = header.body_size,
329            "Received message header (async reader)"
330        );
331
332        let mut body_buf = vec![0u8; header.body_size as usize];
333        self.reader.read_exact(&mut body_buf).await?;
334
335        trace!(
336            msg_type = msg_type,
337            bytes_read = body_buf.len(),
338            "Message body received (async reader)"
339        );
340
341        let mut full_msg = header_buf;
342        full_msg.extend_from_slice(&body_buf);
343
344        IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
345    }
346}
347
348/// Write half of an async OpenIGTLink client
349pub struct AsyncIgtlWriter {
350    writer: tokio::net::tcp::OwnedWriteHalf,
351}
352
353impl AsyncIgtlWriter {
354    /// Send a message to the write half
355    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
356        let data = msg.encode()?;
357        let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
358
359        debug!(
360            msg_type = msg_type,
361            size = data.len(),
362            "Sending message (async writer)"
363        );
364
365        self.writer.write_all(&data).await?;
366        self.writer.flush().await?;
367
368        trace!(
369            msg_type = msg_type,
370            bytes_sent = data.len(),
371            "Message sent (async writer)"
372        );
373
374        Ok(())
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::types::StatusMessage;
    use tokio::time::Duration;

    /// Connecting to a port nobody listens on must not yield a client.
    #[tokio::test]
    async fn test_async_client_connect_timeout() {
        // Try to connect to a non-existent server with timeout
        let result = tokio::time::timeout(
            Duration::from_millis(100),
            AsyncIgtlClient::connect("127.0.0.1:19999"),
        )
        .await;

        // Either the timer fired (outer Err) or the connect failed (inner Err).
        assert!(!matches!(result, Ok(Ok(_))));
    }

    /// The CRC flag defaults to enabled and follows `set_verify_crc`.
    #[tokio::test]
    async fn test_async_client_crc_setting() {
        // Bind once and keep the listener: releasing the port and re-binding
        // it later would race with the client and with other processes.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        let server = tokio::spawn(async move {
            let _ = listener.accept().await;
        });

        // The listener is already bound, so the connect can be issued right
        // away; no sleep is needed to wait for the server task.
        let mut client = AsyncIgtlClient::connect(&addr.to_string())
            .await
            .unwrap();
        assert!(client.verify_crc());

        client.set_verify_crc(false);
        assert!(!client.verify_crc());

        client.set_verify_crc(true);
        assert!(client.verify_crc());

        server.await.unwrap();
    }

    /// Round trip: client sends "Hello", server answers "World".
    #[tokio::test]
    async fn test_async_client_server_communication() {
        // Create server
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        // Spawn server task
        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let mut client = AsyncIgtlClient {
                stream,
                verify_crc: true,
            };

            // Receive message
            let msg: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
            assert_eq!(msg.content.status_string, "Hello");

            // Send response
            let response = StatusMessage::ok("World");
            let response_msg = IgtlMessage::new(response, "Server").unwrap();
            client.send(&response_msg).await.unwrap();
        });

        // Connect client (the listener is already bound, no sleep required)
        let mut client = AsyncIgtlClient::connect(&addr.to_string())
            .await
            .unwrap();

        // Send message
        let status = StatusMessage::ok("Hello");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();

        // Receive response
        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "World");

        // Join the server task so a panic inside it fails this test.
        server.await.unwrap();
    }
}