use crate::error::Result;
use crate::protocol::header::Header;
use crate::protocol::message::{IgtlMessage, Message};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{debug, info, trace, warn};
/// Asynchronous OpenIGTLink client built on a Tokio [`TcpStream`].
///
/// Deprecated since 0.2.0; the deprecation note points to `ClientBuilder`
/// as the replacement.
#[deprecated(
    since = "0.2.0",
    note = "Use ClientBuilder instead: ClientBuilder::new().tcp(addr).async_mode().build().await"
)]
pub struct AsyncIgtlClient {
    /// Connected TCP stream used for both sending and receiving.
    stream: TcpStream,
    /// Flag forwarded to `IgtlMessage::decode_with_options` when receiving.
    verify_crc: bool,
}
impl AsyncIgtlClient {
pub async fn connect(addr: &str) -> Result<Self> {
info!(addr = %addr, "Connecting to OpenIGTLink server (async)");
let stream = TcpStream::connect(addr).await?;
let local_addr = stream.local_addr()?;
info!(
local_addr = %local_addr,
remote_addr = %addr,
"Connected to OpenIGTLink server (async)"
);
Ok(AsyncIgtlClient {
stream,
verify_crc: true,
})
}
pub fn set_verify_crc(&mut self, verify: bool) {
if verify != self.verify_crc {
info!(verify = verify, "CRC verification setting changed");
if !verify {
warn!("CRC verification disabled - use only in trusted environments");
}
}
self.verify_crc = verify;
}
pub fn verify_crc(&self) -> bool {
self.verify_crc
}
pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
let data = msg.encode()?;
let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
debug!(
msg_type = msg_type,
device_name = device_name,
size = data.len(),
"Sending message (async)"
);
self.stream.write_all(&data).await?;
self.stream.flush().await?;
trace!(
msg_type = msg_type,
bytes_sent = data.len(),
"Message sent successfully (async)"
);
Ok(())
}
pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
trace!("Waiting for message header (async)");
let mut header_buf = vec![0u8; Header::SIZE];
self.stream.read_exact(&mut header_buf).await?;
let header = Header::decode(&header_buf)?;
let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
debug!(
msg_type = msg_type,
device_name = device_name,
body_size = header.body_size,
version = header.version,
"Received message header (async)"
);
let mut body_buf = vec![0u8; header.body_size as usize];
self.stream.read_exact(&mut body_buf).await?;
trace!(
msg_type = msg_type,
bytes_read = body_buf.len(),
"Message body received (async)"
);
let mut full_msg = header_buf;
full_msg.extend_from_slice(&body_buf);
let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
match &result {
Ok(_) => {
debug!(
msg_type = msg_type,
device_name = device_name,
"Message decoded successfully (async)"
);
}
Err(e) => {
warn!(
msg_type = msg_type,
error = %e,
"Failed to decode message (async)"
);
}
}
result
}
pub async fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) -> Result<()> {
debug!(timeout_ms = ?timeout.map(|d| d.as_millis()), "Read timeout not directly supported in async (use tokio::time::timeout)");
Ok(())
}
pub async fn set_write_timeout(
&mut self,
timeout: Option<std::time::Duration>,
) -> Result<()> {
debug!(timeout_ms = ?timeout.map(|d| d.as_millis()), "Write timeout not directly supported in async (use tokio::time::timeout)");
Ok(())
}
pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
self.stream.set_nodelay(nodelay)?;
debug!(nodelay = nodelay, "TCP_NODELAY configured");
Ok(())
}
pub async fn nodelay(&self) -> Result<bool> {
Ok(self.stream.nodelay()?)
}
pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
Ok(self.stream.local_addr()?)
}
pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
Ok(self.stream.peer_addr()?)
}
pub fn into_split(self) -> (AsyncIgtlReader, AsyncIgtlWriter) {
let (reader, writer) = self.stream.into_split();
(
AsyncIgtlReader {
reader,
verify_crc: self.verify_crc,
},
AsyncIgtlWriter { writer },
)
}
}
/// Receive half of a split connection, as produced by
/// `AsyncIgtlClient::into_split`.
pub struct AsyncIgtlReader {
    /// Owned read half of the TCP stream.
    reader: tokio::net::tcp::OwnedReadHalf,
    /// Flag forwarded to `IgtlMessage::decode_with_options` when receiving.
    verify_crc: bool,
}
impl AsyncIgtlReader {
pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
trace!("Waiting for message header (async reader)");
let mut header_buf = vec![0u8; Header::SIZE];
self.reader.read_exact(&mut header_buf).await?;
let header = Header::decode(&header_buf)?;
let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
debug!(
msg_type = msg_type,
device_name = device_name,
body_size = header.body_size,
"Received message header (async reader)"
);
let mut body_buf = vec![0u8; header.body_size as usize];
self.reader.read_exact(&mut body_buf).await?;
trace!(
msg_type = msg_type,
bytes_read = body_buf.len(),
"Message body received (async reader)"
);
let mut full_msg = header_buf;
full_msg.extend_from_slice(&body_buf);
IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
}
}
/// Send half of a split connection, as produced by
/// `AsyncIgtlClient::into_split`.
pub struct AsyncIgtlWriter {
    /// Owned write half of the TCP stream.
    writer: tokio::net::tcp::OwnedWriteHalf,
}
impl AsyncIgtlWriter {
    /// Encodes `msg`, writes the bytes to the write half and flushes it.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding fails or the write/flush on the socket fails.
    pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
        let encoded = msg.encode()?;
        // The length is logged before and after the write, so compute it once.
        let byte_count = encoded.len();
        let type_name = msg.header.type_name.as_str().unwrap_or("UNKNOWN");

        debug!(
            msg_type = type_name,
            size = byte_count,
            "Sending message (async writer)"
        );

        self.writer.write_all(&encoded).await?;
        self.writer.flush().await?;

        trace!(
            msg_type = type_name,
            bytes_sent = byte_count,
            "Message sent (async writer)"
        );
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::types::StatusMessage;
    use tokio::net::TcpListener;
    use tokio::time::Duration;

    /// Connecting to a port nobody listens on must fail or time out.
    #[tokio::test]
    async fn test_async_client_connect_timeout() {
        // Reserve an ephemeral port and release it again, so the address is
        // very likely closed; this avoids depending on a hard-coded port
        // being unused on the test machine.
        let probe = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);

        let result = tokio::time::timeout(
            Duration::from_millis(100),
            AsyncIgtlClient::connect(&addr.to_string()),
        )
        .await;
        // Either the timeout elapsed (outer Err) or the connect itself failed
        // (inner Err); only a successful connection is a test failure.
        assert!(
            !matches!(result, Ok(Ok(_))),
            "connecting to a closed port unexpectedly succeeded"
        );
    }

    /// The CRC flag defaults to enabled and follows `set_verify_crc`.
    #[tokio::test]
    async fn test_async_client_crc_setting() {
        // Keep the listener alive from bind to accept. Dropping it and
        // re-binding the same port in the spawned task would race with the
        // client's connect and with other processes grabbing the port.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async move {
            let _ = listener.accept().await;
        });

        // The listener is already bound, so the connect needs no sleep.
        let mut client = AsyncIgtlClient::connect(&addr.to_string())
            .await
            .unwrap();
        assert!(client.verify_crc());
        client.set_verify_crc(false);
        assert!(!client.verify_crc());
        client.set_verify_crc(true);
        assert!(client.verify_crc());
    }

    /// Round-trips a status message between a client and an in-process server.
    #[tokio::test]
    async fn test_async_client_server_communication() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            let mut client = AsyncIgtlClient {
                stream,
                verify_crc: true,
            };
            let msg: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
            assert_eq!(msg.content.status_string, "Hello");
            let response = StatusMessage::ok("World");
            let response_msg = IgtlMessage::new(response, "Server").unwrap();
            client.send(&response_msg).await.unwrap();
        });

        // The listener is already bound, so the connect needs no sleep.
        let mut client = AsyncIgtlClient::connect(&addr.to_string())
            .await
            .unwrap();
        let status = StatusMessage::ok("Hello");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();
        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "World");

        // Join the server task so a failed assertion on the server side
        // fails this test instead of being lost with the detached task.
        server.await.expect("server task panicked");
    }
}