use openigtlink_rust::error::Result;
use openigtlink_rust::io::AsyncIgtlServer;
use openigtlink_rust::protocol::message::IgtlMessage;
use openigtlink_rust::protocol::types::{CapabilityMessage, StatusMessage};
use openigtlink_rust::protocol::AnyMessage;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{interval, Duration};
static CLIENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[tokio::main]
async fn main() {
if let Err(e) = run_server().await {
eprintln!("[ERROR] Server failed: {}", e);
std::process::exit(1);
}
}
async fn run_server() -> Result<()> {
let addr = "127.0.0.1:18944";
let server = AsyncIgtlServer::bind(addr).await?;
println!("=== OpenIGTLink Async Server (AnyMessage) ===\n");
println!("[INFO] Configuration:");
println!(" Address: {}", addr);
println!(" Runtime: Tokio (async/await)");
println!(" Concurrency: Unlimited clients");
println!(" I/O Mode: Non-blocking");
println!(" Message Handling: Dynamic (AnyMessage)\n");
let active_clients = Arc::new(AtomicUsize::new(0));
let stats_clients = active_clients.clone();
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(10));
loop {
ticker.tick().await;
let count = stats_clients.load(Ordering::Relaxed);
if count > 0 {
println!("[STATS] Active clients: {}", count);
}
}
});
println!("[INFO] Server ready, waiting for connections...\n");
loop {
let mut conn = server.accept().await?;
let client_id = CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed) + 1;
let clients = active_clients.clone();
println!("[CONNECT] Client #{} connected", client_id);
clients.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
let result = handle_client(&mut conn, client_id).await;
clients.fetch_sub(1, Ordering::Relaxed);
match result {
Ok(_) => println!("[DISCONNECT] Client #{} disconnected gracefully", client_id),
Err(e) => eprintln!("[ERROR] Client #{} error: {}", client_id, e),
}
});
}
}
async fn handle_client(
conn: &mut openigtlink_rust::io::AsyncIgtlConnection,
client_id: usize,
) -> Result<()> {
println!(" [#{}] Handler started", client_id);
let mut message_count = 0;
loop {
match conn.receive_any().await {
Ok(msg) => {
message_count += 1;
let msg_type = msg.message_type();
let device_name = msg.device_name().unwrap_or("Unknown");
println!(
" [#{}] Message #{}: {} from '{}'",
client_id, message_count, msg_type, device_name
);
match msg {
AnyMessage::Transform(transform_msg) => {
println!(
" Matrix (first row): [{:.2}, {:.2}, {:.2}, {:.2}]",
transform_msg.content.matrix[0][0],
transform_msg.content.matrix[0][1],
transform_msg.content.matrix[0][2],
transform_msg.content.matrix[0][3]
);
let status = StatusMessage::ok(&format!(
"Transform #{} processed by client #{}",
message_count, client_id
));
let response = IgtlMessage::new(status, "AsyncServer")?;
conn.send(&response).await?;
println!(" [#{}] Sent STATUS acknowledgment", client_id);
}
AnyMessage::Status(status_msg) => {
println!(
" Code: {}, Message: '{}'",
status_msg.content.code, status_msg.content.status_string
);
let capability = CapabilityMessage::new(vec![
"TRANSFORM".to_string(),
"STATUS".to_string(),
"CAPABILITY".to_string(),
"IMAGE".to_string(),
"POSITION".to_string(),
]);
let response = IgtlMessage::new(capability, "AsyncServer")?;
conn.send(&response).await?;
println!(" [#{}] Sent CAPABILITY response", client_id);
}
AnyMessage::Capability(cap_msg) => {
println!(
" Client capabilities ({} types):",
cap_msg.content.types.len()
);
for (i, typ) in cap_msg.content.types.iter().enumerate() {
println!(" {}. {}", i + 1, typ);
}
println!(" [#{}] Session completed", client_id);
break;
}
AnyMessage::Image(img_msg) => {
println!(
" Image: {}x{}x{}, {} bytes",
img_msg.content.size[0],
img_msg.content.size[1],
img_msg.content.size[2],
img_msg.content.data.len()
);
let status = StatusMessage::ok("Image received");
let response = IgtlMessage::new(status, "AsyncServer")?;
conn.send(&response).await?;
println!(" [#{}] Sent STATUS acknowledgment", client_id);
}
AnyMessage::Position(pos_msg) => {
println!(
" Position: ({:.2}, {:.2}, {:.2})",
pos_msg.content.position[0],
pos_msg.content.position[1],
pos_msg.content.position[2]
);
let status = StatusMessage::ok("Position received");
let response = IgtlMessage::new(status, "AsyncServer")?;
conn.send(&response).await?;
println!(" [#{}] Sent STATUS acknowledgment", client_id);
}
_ => {
println!(" (Generic handler)");
let status = StatusMessage::ok(&format!("{} received", msg_type));
let response = IgtlMessage::new(status, "AsyncServer")?;
conn.send(&response).await?;
println!(" [#{}] Sent STATUS acknowledgment", client_id);
}
}
}
Err(e) => {
eprintln!(" [#{}] Connection closed or error: {}", client_id, e);
break;
}
}
}
println!(
" [#{}] Processed {} messages total",
client_id, message_count
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_server_bind() {
let result = AsyncIgtlServer::bind("127.0.0.1:0").await;
assert!(result.is_ok());
}
#[test]
fn test_client_counter() {
let count1 = CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed);
let count2 = CLIENT_COUNTER.fetch_add(1, Ordering::Relaxed);
assert!(count2 > count1);
}
}