openigtlink_rust/io/
async_server.rs1use crate::error::Result;
6use crate::protocol::header::Header;
7use crate::protocol::message::{IgtlMessage, Message};
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9use tokio::net::{TcpListener, TcpStream};
10use tracing::{debug, info, trace, warn};
11
12pub struct AsyncIgtlServer {
29 listener: TcpListener,
30}
31
32impl AsyncIgtlServer {
33 pub async fn bind(addr: &str) -> Result<Self> {
55 info!(addr = %addr, "Binding OpenIGTLink server (async)");
56 let listener = TcpListener::bind(addr).await?;
57 let local_addr = listener.local_addr()?;
58 info!(
59 local_addr = %local_addr,
60 "OpenIGTLink server listening (async)"
61 );
62 Ok(AsyncIgtlServer { listener })
63 }
64
65 pub async fn accept(&self) -> Result<AsyncIgtlConnection> {
84 trace!("Waiting for client connection (async)");
85 let (stream, addr) = self.listener.accept().await?;
86 info!(
87 peer_addr = %addr,
88 "Client connected (async)"
89 );
90 Ok(AsyncIgtlConnection {
91 stream,
92 verify_crc: true,
93 })
94 }
95
96 pub fn local_addr(&self) -> Result<std::net::SocketAddr> {
98 Ok(self.listener.local_addr()?)
99 }
100}
101
102pub struct AsyncIgtlConnection {
106 stream: TcpStream,
107 verify_crc: bool,
108}
109
110impl AsyncIgtlConnection {
111 pub fn set_verify_crc(&mut self, verify: bool) {
122 if verify != self.verify_crc {
123 info!(verify = verify, "CRC verification setting changed");
124 if !verify {
125 warn!("CRC verification disabled - use only in trusted environments");
126 }
127 }
128 self.verify_crc = verify;
129 }
130
131 pub fn verify_crc(&self) -> bool {
133 self.verify_crc
134 }
135
136 pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
167 let data = msg.encode()?;
168 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
169 let device_name = msg.header.device_name.as_str().unwrap_or("UNKNOWN");
170
171 debug!(
172 msg_type = msg_type,
173 device_name = device_name,
174 size = data.len(),
175 "Sending message to client (async)"
176 );
177
178 self.stream.write_all(&data).await?;
179 self.stream.flush().await?;
180
181 trace!(
182 msg_type = msg_type,
183 bytes_sent = data.len(),
184 "Message sent successfully (async)"
185 );
186
187 Ok(())
188 }
189
190 pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
216 trace!("Waiting for message header from client (async)");
217
218 let mut header_buf = vec![0u8; Header::SIZE];
219 self.stream.read_exact(&mut header_buf).await?;
220
221 let header = Header::decode(&header_buf)?;
222
223 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
224 let device_name = header.device_name.as_str().unwrap_or("UNKNOWN");
225
226 debug!(
227 msg_type = msg_type,
228 device_name = device_name,
229 body_size = header.body_size,
230 version = header.version,
231 "Received message header from client (async)"
232 );
233
234 let mut body_buf = vec![0u8; header.body_size as usize];
235 self.stream.read_exact(&mut body_buf).await?;
236
237 trace!(
238 msg_type = msg_type,
239 bytes_read = body_buf.len(),
240 "Message body received from client (async)"
241 );
242
243 let mut full_msg = header_buf;
244 full_msg.extend_from_slice(&body_buf);
245
246 let result = IgtlMessage::decode_with_options(&full_msg, self.verify_crc);
247
248 match &result {
249 Ok(_) => {
250 debug!(
251 msg_type = msg_type,
252 device_name = device_name,
253 "Message decoded successfully (async)"
254 );
255 }
256 Err(e) => {
257 warn!(
258 msg_type = msg_type,
259 error = %e,
260 "Failed to decode message from client (async)"
261 );
262 }
263 }
264
265 result
266 }
267
268 pub async fn set_nodelay(&self, nodelay: bool) -> Result<()> {
270 self.stream.set_nodelay(nodelay)?;
271 debug!(nodelay = nodelay, "TCP_NODELAY configured (async)");
272 Ok(())
273 }
274
275 pub async fn nodelay(&self) -> Result<bool> {
277 Ok(self.stream.nodelay()?)
278 }
279
280 pub fn peer_addr(&self) -> Result<std::net::SocketAddr> {
282 Ok(self.stream.peer_addr()?)
283 }
284
285 pub fn into_split(self) -> (AsyncIgtlConnectionReader, AsyncIgtlConnectionWriter) {
289 let (reader, writer) = self.stream.into_split();
290 (
291 AsyncIgtlConnectionReader {
292 reader,
293 verify_crc: self.verify_crc,
294 },
295 AsyncIgtlConnectionWriter { writer },
296 )
297 }
298}
299
300pub struct AsyncIgtlConnectionReader {
302 reader: tokio::net::tcp::OwnedReadHalf,
303 verify_crc: bool,
304}
305
306impl AsyncIgtlConnectionReader {
307 pub async fn receive<T: Message>(&mut self) -> Result<IgtlMessage<T>> {
309 trace!("Waiting for message header (async connection reader)");
310
311 let mut header_buf = vec![0u8; Header::SIZE];
312 self.reader.read_exact(&mut header_buf).await?;
313
314 let header = Header::decode(&header_buf)?;
315
316 let msg_type = header.type_name.as_str().unwrap_or("UNKNOWN");
317
318 debug!(
319 msg_type = msg_type,
320 body_size = header.body_size,
321 "Received message header (async connection reader)"
322 );
323
324 let mut body_buf = vec![0u8; header.body_size as usize];
325 self.reader.read_exact(&mut body_buf).await?;
326
327 let mut full_msg = header_buf;
328 full_msg.extend_from_slice(&body_buf);
329
330 IgtlMessage::decode_with_options(&full_msg, self.verify_crc)
331 }
332}
333
334pub struct AsyncIgtlConnectionWriter {
336 writer: tokio::net::tcp::OwnedWriteHalf,
337}
338
339impl AsyncIgtlConnectionWriter {
340 pub async fn send<T: Message>(&mut self, msg: &IgtlMessage<T>) -> Result<()> {
342 let data = msg.encode()?;
343 let msg_type = msg.header.type_name.as_str().unwrap_or("UNKNOWN");
344
345 debug!(
346 msg_type = msg_type,
347 size = data.len(),
348 "Sending message (async connection writer)"
349 );
350
351 self.writer.write_all(&data).await?;
352 self.writer.flush().await?;
353
354 trace!(
355 msg_type = msg_type,
356 bytes_sent = data.len(),
357 "Message sent (async connection writer)"
358 );
359
360 Ok(())
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::ClientBuilder;
    use crate::protocol::types::StatusMessage;

    /// Binding to an ephemeral port on loopback succeeds.
    #[tokio::test]
    async fn test_async_server_bind() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await;
        assert!(server.is_ok());
    }

    /// The reported local address reflects the loopback interface bound to.
    #[tokio::test]
    async fn test_async_server_local_addr() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();
        assert_eq!(addr.ip().to_string(), "127.0.0.1");
    }

    /// A client and the server exchange one status message in each direction.
    #[tokio::test]
    async fn test_async_server_client_communication() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();

        // Keep the handle: a detached task would let server-side panics and
        // assertion failures go unjoined.
        let server_task = tokio::spawn(async move {
            let mut conn = server.accept().await.unwrap();

            let msg: IgtlMessage<StatusMessage> = conn.receive().await.unwrap();
            assert_eq!(msg.content.status_string, "Hello from client");

            let response = StatusMessage::ok("Hello from server");
            let response_msg = IgtlMessage::new(response, "Server").unwrap();
            conn.send(&response_msg).await.unwrap();
        });

        // No sleep is needed before connecting: the listener is already bound,
        // so the connection simply waits in the backlog until `accept` runs.
        let mut client = ClientBuilder::new()
            .tcp(addr.to_string())
            .async_mode()
            .build()
            .await
            .unwrap();

        let status = StatusMessage::ok("Hello from client");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();

        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "Hello from server");

        server_task.await.expect("server task panicked");
    }

    /// A split connection can receive on one half and echo back on the other.
    #[tokio::test]
    async fn test_async_connection_split() {
        let server = AsyncIgtlServer::bind("127.0.0.1:0").await.unwrap();
        let addr = server.local_addr().unwrap();

        // Keep the handle so failures inside the echo task are surfaced.
        let server_task = tokio::spawn(async move {
            let conn = server.accept().await.unwrap();
            let (mut reader, mut writer) = conn.into_split();

            let msg: IgtlMessage<StatusMessage> = reader.receive().await.unwrap();
            let echo = IgtlMessage::new(msg.content, "Echo").unwrap();
            writer.send(&echo).await.unwrap();
        });

        // The listener is already bound, so connecting immediately is safe.
        let mut client = ClientBuilder::new()
            .tcp(addr.to_string())
            .async_mode()
            .build()
            .await
            .unwrap();

        let status = StatusMessage::ok("Echo test");
        let msg = IgtlMessage::new(status, "Client").unwrap();
        client.send(&msg).await.unwrap();

        let response: IgtlMessage<StatusMessage> = client.receive().await.unwrap();
        assert_eq!(response.content.status_string, "Echo test");

        server_task.await.expect("server task panicked");
    }
}