openigtlink_rust/io/
async_client.rs1use crate::error::Result;
7use crate::protocol::header::Header;
8use crate::protocol::message::{IgtlMessage, Message};
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::TcpStream;
11use tracing::{debug, info, trace, warn};
12
13#[deprecated(
36 since = "0.2.0",
37 note = "Use ClientBuilder instead: ClientBuilder::new().tcp(addr).async_mode().build().await"
38)]
39pub struct AsyncIgtlClient {
40 stream: TcpStream,
41 verify_crc: bool,
42}
43
44impl AsyncIgtlClient {
45 pub async fn connect(addr: &str) -> Result<Self> {
67 info!(addr = %addr, "Connecting to OpenIGTLink server (async)");
68 let stream = TcpStream::connect(addr).await?;
69 let local_addr = stream.local_addr()?;
70 info!(
71 local_addr = %local_addr,
72 remote_addr = %addr,
73 "Connected to OpenIGTLink server (async)"
74 );
75 Ok(AsyncIgtlClient {
76 stream,
77 verify_crc: true,
78 })
79 }
80
81 pub fn set_verify_crc(&mut self, verify: bool) {
92 if verify != self.verify_crc {
93 info!(verify = verify, "CRC verification setting changed");
94 if !verify {
95 warn!("CRC verification disabled - use only in trusted environments");
96 }
97 }
98 self.verify_crc = verify;
99 }
100
101 pub fn verify_crc(&self) -> bool {
103 self.verify_crc
104 }
105
106 pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
136 let data = msg.encode()?;
137 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
138 let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
139
140 debug!(
141 msg_type = msg_type,
142 device_name = device_name,
143 size = data.len(),
144 "Sending message (async)"
145 );
146
147 self.stream.write_all(&data).await?;
148 self.stream.flush().await?;
149
150 trace!(
151 msg_type = msg_type,
152 bytes_sent = data.len(),
153 "Message sent successfully (async)"
154 );
155
156 Ok(())
157 }
158
159 pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
183 trace!("Waiting for message header (async)");
184
185 let mut header_buf = vec![0u8; Header::SIZE];
186 self.stream.read_exact(&mut header_buf).await?;
187
188 let header = Header::decode(&header_buf)?;
189
190 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
191 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
192
193 debug!(
194 msg_type = msg_type,
195 device_name = device_name,
196 body_size = header.body_size,
197 version = header.version,
198 "Received message header (async)"
199 );
200
201 let mut body_buf = vec![0u8; header.body_size as usize];
202 self.stream.read_exact(&mut body_buf).await?;
203
204 trace!(
205 msg_type = msg_type,
206 bytes_read = body_buf.len(),
207 "Message body received (async)"
208 );
209
210 let mut full_msg = header_buf;
211 full_msg.extend_from_slice(&body_buf);
212
213 let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
214
215 match &result {
216 Ok(_) => {
217 debug!(
218 msg_type = msg_type,
219 device_name = device_name,
220 "Message decoded successfully (async)"
221 );
222 }
223 Err(e) => {
224 warn!(
225 msg_type = msg_type,
226 error = %e,
227 "Failed to decode message (async)"
228 );
229 }
230 }
231
232 result
233 }
234
235 pub async fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
237 debug!(timeout_ms = ?timeout.map(|d| d.as_millis()), "Read timeout not directly supported in async (use tokio::time::timeout)");
240 Ok(())
241 }
242
243 pub async fn set_write_timeout(
245 &mut self,
246 timeout: Option<std::time::Duration>,
247 ) -> Result<()> {
248 debug!(timeout_ms = ?timeout.map(|d| d.as_millis()), "Write timeout not directly supported in async (use tokio::time::timeout)");
251 Ok(())
252 }
253
254 pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
256 self.stream.set_nodelay(nodelay)?;
257 debug!(nodelay = nodelay, "TCP_NODELAY configured");
258 Ok(())
259 }
260
261 pub async fn nodelay(&self) -> Result<bool> {
263 Ok(self.stream.nodelay()?)
264 }
265
266 pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
268 Ok(self.stream.local_addr()?)
269 }
270
271 pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
273 Ok(self.stream.peer_addr()?)
274 }
275
276 pub fn into_split(self) -> (AsyncIgtlReader, AsyncIgtlWriter) {
295 let (reader, writer) = self.stream.into_split();
296 (
297 AsyncIgtlReader {
298 reader,
299 verify_crc: self.verify_crc,
300 },
301 AsyncIgtlWriter { writer },
302 )
303 }
304}
305
306pub struct AsyncIgtlReader {
308 reader: tokio::net::tcp::OwnedReadHalf,
309 verify_crc: bool,
310}
311
312impl AsyncIgtlReader {
313 pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
315 trace!("Waiting for message header (async reader)");
316
317 let mut header_buf = vec![0u8; Header::SIZE];
318 self.reader.read_exact(&mut header_buf).await?;
319
320 let header = Header::decode(&header_buf)?;
321
322 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
323 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
324
325 debug!(
326 msg_type = msg_type,
327 device_name = device_name,
328 body_size = header.body_size,
329 "Received message header (async reader)"
330 );
331
332 let mut body_buf = vec![0u8; header.body_size as usize];
333 self.reader.read_exact(&mut body_buf).await?;
334
335 trace!(
336 msg_type = msg_type,
337 bytes_read = body_buf.len(),
338 "Message body received (async reader)"
339 );
340
341 let mut full_msg = header_buf;
342 full_msg.extend_from_slice(&body_buf);
343
344 IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
345 }
346}
347
348pub struct AsyncIgtlWriter {
350 writer: tokio::net::tcp::OwnedWriteHalf,
351}
352
353impl AsyncIgtlWriter {
354 pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
356 let data = msg.encode()?;
357 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
358
359 debug!(
360 msg_type = msg_type,
361 size = data.len(),
362 "Sending message (async writer)"
363 );
364
365 self.writer.write_all(&data).await?;
366 self.writer.flush().await?;
367
368 trace!(
369 msg_type = msg_type,
370 bytes_sent = data.len(),
371 "Message sent (async writer)"
372 );
373
374 Ok(())
375 }
376}
377
#[cfg(test)]
// These tests deliberately exercise the deprecated `AsyncIgtlClient` API.
#[allow(deprecated)]
mod tests {
    use super::*;
    use crate::protocol::types::StatusMessage;
    use tokio::time::Duration;

    /// Connecting to a port nobody is expected to listen on must either time
    /// out or fail; it must never yield a connected client.
    #[tokio::test]
    async fn test_async_client_connect_timeout() {
        let result = tokio::time::timeout(
            Duration::from_millis(100),
            AsyncIgtlClient::connect("127.0.0.1:19999"),
        )
        .await;

        assert!(
            !matches!(result, Ok(Ok(_))),
            "connect unexpectedly succeeded"
        );
    }

    /// The CRC flag defaults to enabled and follows `set_verify_crc`.
    #[tokio::test]
    async fn test_async_client_crc_setting() {
        // Keep the listener that reserved the ephemeral port alive and hand it
        // to the server task. Dropping it and re-binding the same address
        // would race with other processes grabbing the port.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        let server = tokio::spawn(async move {
            let _ = listener.accept().await;
        });

        // The listener is already bound, so the connection is queued by the
        // OS even if the server task has not reached `accept` yet.
        let mut client = AsyncIgtlClient::connect(&addr.to_string())
            .await
            .unwrap();
        assert!(client.verify_crc());

        client.set_verify_crc(false);
        assert!(!client.verify_crc());

        client.set_verify_crc(true);
        assert!(client.verify_crc());

        server.await.expect("server task panicked");
    }

    /// Round trip: the client sends "Hello", the server answers "World".
    #[tokio::test]
    async fn test_async_client_server_communication() {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .unwrap();
        let addr = listener.local_addr().unwrap();

        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let mut client = AsyncIgtlClient {
                stream,
                verify_crc: true,
            };

            let msg: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
            assert_eq!(msg.content.status_string, "Hello");

            let response = StatusMessage::ok("World");
            let response_msg = IgtlMessage::new(response, "Server").unwrap();
            client.send(&response_msg).await.unwrap();
        });

        // The listener is already bound, so no start-up delay is required.
        let mut client = AsyncIgtlClient::connect(&addr.to_string())
            .await
            .unwrap();

        let status = StatusMessage::ok("Hello");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();

        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "World");

        // Surface any panic (failed assertion/unwrap) from the server side.
        server.await.expect("server task panicked");
    }
}