openigtlink_rust/io/
async_server.rs1use crate::error::Result;
6use crate::protocol::header::Header;
7use crate::protocol::message::{IgtlMessage, Message};
8use crate::protocol::AnyMessage;
9use tokio::io::{AsyncReadExt, AsyncWriteExt};
10use tokio::net::{TcpListener, TcpStream};
11use tracing::{debug, info, trace, warn};
12
13pub struct AsyncIgtlServer {
30 listener: TcpListener,
31}
32
33impl AsyncIgtlServer {
34 pub async fn bind(addr: &str) -> Result<Self> {
56 info!(addr = %addr, "Binding OpenIGTLink server (async)");
57 let listener = TcpListener::bind(addr).await?;
58 let local_addr = listener.local_addr()?;
59 info!(
60 local_addr = %local_addr,
61 "OpenIGTLink server listening (async)"
62 );
63 Ok(AsyncIgtlServer { listener })
64 }
65
66 pub async fn accept(&self) -> Result<AsyncIgtlConnection> {
85 trace!("Waiting for client connection (async)");
86 let (stream, addr) = self.listener.accept().await?;
87 info!(
88 peer_addr = %addr,
89 "Client connected (async)"
90 );
91 Ok(AsyncIgtlConnection {
92 stream,
93 verify_crc: true,
94 })
95 }
96
97 pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
99 Ok(self.listener.local_addr()?)
100 }
101}
102
103pub struct AsyncIgtlConnection {
107 stream: TcpStream,
108 verify_crc: bool,
109}
110
111impl AsyncIgtlConnection {
112 pub fn set_verify_crc(&mut self, verify: bool) {
123 if verify != self.verify_crc {
124 info!(verify = verify, "CRC verification setting changed");
125 if !verify {
126 warn!("CRC verification disabled - use only in trusted environments");
127 }
128 }
129 self.verify_crc = verify;
130 }
131
132 pub fn verify_crc(&self) -> bool {
134 self.verify_crc
135 }
136
137 pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
168 let data = msg.encode()?;
169 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
170 let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
171
172 debug!(
173 msg_type = msg_type,
174 device_name = device_name,
175 size = data.len(),
176 "Sending message to client (async)"
177 );
178
179 self.stream.write_all(&data).await?;
180 self.stream.flush().await?;
181
182 trace!(
183 msg_type = msg_type,
184 bytes_sent = data.len(),
185 "Message sent successfully (async)"
186 );
187
188 Ok(())
189 }
190
191 pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
217 trace!("Waiting for message header from client (async)");
218
219 let mut header_buf = vec![0u8; Header::SIZE];
220 self.stream.read_exact(&mut header_buf).await?;
221
222 let header = Header::decode(&header_buf)?;
223
224 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
225 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
226
227 debug!(
228 msg_type = msg_type,
229 device_name = device_name,
230 body_size = header.body_size,
231 version = header.version,
232 "Received message header from client (async)"
233 );
234
235 let mut body_buf = vec![0u8; header.body_size as usize];
236 self.stream.read_exact(&mut body_buf).await?;
237
238 trace!(
239 msg_type = msg_type,
240 bytes_read = body_buf.len(),
241 "Message body received from client (async)"
242 );
243
244 let mut full_msg = header_buf;
245 full_msg.extend_from_slice(&body_buf);
246
247 let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
248
249 match &result {
250 Ok(_) => {
251 debug!(
252 msg_type = msg_type,
253 device_name = device_name,
254 "Message decoded successfully (async)"
255 );
256 }
257 Err(e) => {
258 warn!(
259 msg_type = msg_type,
260 error = %e,
261 "Failed to decode message from client (async)"
262 );
263 }
264 }
265
266 result
267 }
268
269 pub async fn receive_any(&mut self) -> Result<AnyMessage> {
302 trace!("Waiting for any message type from client (async)");
303
304 let mut header_buf = vec![0u8; Header::SIZE];
305 self.stream.read_exact(&mut header_buf).await?;
306
307 let header = Header::decode(&header_buf)?;
308
309 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
310 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
311
312 debug!(
313 msg_type = msg_type,
314 device_name = device_name,
315 body_size = header.body_size,
316 version = header.version,
317 "Received message header from client (async)"
318 );
319
320 let mut body_buf = vec![0u8; header.body_size as usize];
321 self.stream.read_exact(&mut body_buf).await?;
322
323 trace!(
324 msg_type = msg_type,
325 bytes_read = body_buf.len(),
326 "Message body received from client (async)"
327 );
328
329 let mut full_msg = header_buf;
330 full_msg.extend_from_slice(&body_buf);
331
332 let result = AnyMessage::decode_with_options(&full_msg, self.verify_crc);
333
334 match &result {
335 Ok(_) => {
336 debug!(
337 msg_type = msg_type,
338 device_name = device_name,
339 "Message decoded successfully as AnyMessage (async)"
340 );
341 }
342 Err(e) => {
343 warn!(
344 msg_type = msg_type,
345 error = %e,
346 "Failed to decode message from client (async)"
347 );
348 }
349 }
350
351 result
352 }
353
354 pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
356 self.stream.set_nodelay(nodelay)?;
357 debug!(nodelay = nodelay, "TCP_NODELAY configured (async)");
358 Ok(())
359 }
360
361 pub async fn nodelay(&self) -> Result<bool> {
363 Ok(self.stream.nodelay()?)
364 }
365
366 pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
368 Ok(self.stream.peer_addr()?)
369 }
370
371 pub fn into_split(self) -> (AsyncIgtlConnectionReader, AsyncIgtlConnectionWriter) {
375 let (reader, writer) = self.stream.into_split();
376 (
377 AsyncIgtlConnectionReader {
378 reader,
379 verify_crc: self.verify_crc,
380 },
381 AsyncIgtlConnectionWriter { writer },
382 )
383 }
384}
385
386pub struct AsyncIgtlConnectionReader {
388 reader: tokio::net::tcp::OwnedReadHalf,
389 verify_crc: bool,
390}
391
392impl AsyncIgtlConnectionReader {
393 pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
395 trace!("Waiting for message header (async connection reader)");
396
397 let mut header_buf = vec![0u8; Header::SIZE];
398 self.reader.read_exact(&mut header_buf).await?;
399
400 let header = Header::decode(&header_buf)?;
401
402 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
403
404 debug!(
405 msg_type = msg_type,
406 body_size = header.body_size,
407 "Received message header (async connection reader)"
408 );
409
410 let mut body_buf = vec![0u8; header.body_size as usize];
411 self.reader.read_exact(&mut body_buf).await?;
412
413 let mut full_msg = header_buf;
414 full_msg.extend_from_slice(&body_buf);
415
416 IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
417 }
418}
419
420pub struct AsyncIgtlConnectionWriter {
422 writer: tokio::net::tcp::OwnedWriteHalf,
423}
424
425impl AsyncIgtlConnectionWriter {
426 pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
428 let data = msg.encode()?;
429 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
430
431 debug!(
432 msg_type = msg_type,
433 size = data.len(),
434 "Sending message (async connection writer)"
435 );
436
437 self.writer.write_all(&data).await?;
438 self.writer.flush().await?;
439
440 trace!(
441 msg_type = msg_type,
442 bytes_sent = data.len(),
443 "Message sent (async connection writer)"
444 );
445
446 Ok(())
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use super::*;
453 use crate::protocol::types::StatusMessage;
454 use tokio::time::Duration;
455
456 #[tokio::test]
457 async fn test_async_server_bind() {
458 let server = AsyncIgtlServer::bind("127.0.0.1:0").await;
459 assert!(server.is_ok());
460 }
461
462 #[tokio::test]
463 async fn test_async_server_local_addr() {
464 let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
465 let addr = server.local_addr().unwrap();
466 assert_eq!(addr.ip().to_string(), "127.0.0.1");
467 }
468
469 #[tokio::test]
470 async fn test_async_server_client_communication() {
471 let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
473 let addr = server.local_addr().unwrap();
474
475 tokio::spawn(async move {
477 let mut conn = server.accept().await.unwrap();
478
479 let msg: IgtlMessage<StatusMessage> = conn.receive().await.unwrap();
481 assert_eq!(msg.content.status_string, "Hello from client");
482
483 let response = StatusMessage::ok("Hello from server");
485 let response_msg = IgtlMessage::new(response, "Server").unwrap();
486 conn.send(&response_msg).await.unwrap();
487 });
488
489 tokio::time::sleep(Duration::from_millis(10)).await;
490
491 use crate::io::ClientBuilder;
493 let mut client = ClientBuilder::new()
494 .tcp(addr.to_string())
495 .async_mode()
496 .build()
497 .await
498 .unwrap();
499
500 let status = StatusMessage::ok("Hello from client");
502 let msg = IgtlMessage::new(status, "Client").unwrap();
503 client.send(&msg).await.unwrap();
504
505 let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
507 assert_eq!(response.content.status_string, "Hello from server");
508 }
509
510 #[tokio::test]
511 async fn test_async_connection_split() {
512 let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
513 let addr = server.local_addr().unwrap();
514
515 tokio::spawn(async move {
516 let conn = server.accept().await.unwrap();
517 let (mut reader, mut writer) = conn.into_split();
518
519 let msg: IgtlMessage<StatusMessage> = reader.receive().await.unwrap();
521 let echo = IgtlMessage::new(msg.content, "Echo").unwrap();
522 writer.send(&echo).await.unwrap();
523 });
524
525 tokio::time::sleep(Duration::from_millis(10)).await;
526
527 use crate::io::ClientBuilder;
528 let mut client = ClientBuilder::new()
529 .tcp(addr.to_string())
530 .async_mode()
531 .build()
532 .await
533 .unwrap();
534
535 let status = StatusMessage::ok("Echo test");
536 let msg = IgtlMessage::new(status, "Client").unwrap();
537 client.send(&msg).await.unwrap();
538
539 let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
540 assert_eq!(response.content.status_string, "Echo test");
541 }
542}