#![cfg(feature = "tokio-async")]
use std::error::Error;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use netmap_rs::NetmapBuilder;
use netmap_rs::tokio_async::{TokioNetmap, AsyncNetmapRxRing, AsyncNetmapTxRing};
const ASYNC_PIPE_NAME: &str = "netmap:pipe{tokio_async_example_789}";
const ASYNC_NUM_PACKETS: usize = 5;
const ASYNC_PACKET_SIZE: usize = 60;
async fn sender_task(mut tx_ring: AsyncNetmapTxRing) -> Result<(), Box<dyn Error + Send + Sync>> {
println!("[Async Sender] Task started.");
for i in 0..ASYNC_NUM_PACKETS {
let mut payload = format!("AsyncPacket #{}", i).into_bytes();
payload.resize(ASYNC_PACKET_SIZE, 0);
print!("[Async Sender] Sending packet #{} ({} bytes)...", i, payload.len());
tx_ring.write_all(&payload).await?;
tx_ring.flush().await?;
println!(" Sent.");
tokio::time::sleep(Duration::from_millis(50)).await; }
println!("[Async Sender] All packets sent and flushed.");
Ok(())
}
async fn receiver_task(mut rx_ring: AsyncNetmapRxRing) -> Result<(), Box<dyn Error + Send + Sync>> {
println!("[Async Receiver] Task started. Waiting for packets...");
let mut packets_received = 0;
let mut receive_buffer = vec![0u8; ASYNC_PACKET_SIZE * 2];
while packets_received < ASYNC_NUM_PACKETS {
print!("[Async Receiver] Attempting to read packet #{}...", packets_received);
match rx_ring.read(&mut receive_buffer).await {
Ok(0) => {
println!(" EOF received. Assuming sender finished.");
break;
}
Ok(n) => {
let received_payload = &receive_buffer[..n];
println!(
" Received {} bytes: '{}'",
n,
String::from_utf8_lossy(&received_payload[..std::cmp::min(n, 20)]) );
let expected_prefix = format!("AsyncPacket #{}", packets_received);
if !received_payload.starts_with(expected_prefix.as_bytes()) {
eprintln!(
"[Async Receiver] Payload mismatch! Expected prefix: '{}', Got: '{}'",
expected_prefix,
String::from_utf8_lossy(&received_payload[..std::cmp::min(n, expected_prefix.len())])
);
}
packets_received += 1;
}
Err(e) => {
eprintln!("[Async Receiver] Error reading from ring: {}", e);
return Err(Box::new(e));
}
}
}
if packets_received >= ASYNC_NUM_PACKETS {
println!("[Async Receiver] Successfully received all {} packets.", ASYNC_NUM_PACKETS);
} else {
println!("[Async Receiver] Finished. Received {} out of {} packets.", packets_received, ASYNC_NUM_PACKETS);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!(
"Tokio Async Netmap Pipe Example using '{}'",
ASYNC_PIPE_NAME
);
let netmap_a = NetmapBuilder::new(ASYNC_PIPE_NAME)
.num_tx_rings(1)
.num_rx_rings(1) .build()
.map_err(|e| format!("Failed to open pipe endpoint A (master): {:?}", e))?;
let netmap_b = NetmapBuilder::new(ASYNC_PIPE_NAME)
.num_tx_rings(1)
.num_rx_rings(1)
.build()
.map_err(|e| format!("Failed to open pipe endpoint B (slave): {:?}", e))?;
let tokio_netmap_a = TokioNetmap::new(netmap_a)?;
let tokio_netmap_b = TokioNetmap::new(netmap_b)?;
let async_tx_a = tokio_netmap_a.tx_ring(0)?;
let async_rx_b = tokio_netmap_b.rx_ring(0)?;
println!("[Main] Pipe endpoints created and wrapped for Tokio.");
let sender_handle = tokio::spawn(sender_task(async_tx_a));
let receiver_handle = tokio::spawn(receiver_task(async_rx_b));
let sender_result = sender_handle.await?;
if let Err(e) = sender_result {
eprintln!("[Main] Sender task failed: {}", e);
}
let receiver_result = receiver_handle.await?;
if let Err(e) = receiver_result {
eprintln!("[Main] Receiver task failed: {}", e);
}
println!("[Main] Example finished.");
Ok(())
}