use std::error::Error;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use netmap_rs::prelude::*;
const PIPE_NAME: &str = "netmap:pipe{intraproc_example_123}"; const NUM_PACKETS: usize = 5;
const PACKET_BASE_PAYLOAD: &[u8] = b"Hello from pipe sender, msg=";
fn sender_thread(
mut tx_pipe_ep: Netmap,
done_tx: mpsc::Sender<String>,
) -> Result<(), String> {
println!("[Sender] Thread started. TX Rings: {}", tx_pipe_ep.num_tx_rings());
if tx_pipe_ep.num_tx_rings() == 0 {
return Err("[Sender] No TX rings available on pipe endpoint.".to_string());
}
let mut tx_ring = tx_pipe_ep
.tx_ring(0)
.map_err(|e| format!("[Sender] Failed to get TX ring: {:?}", e))?;
for i in 0..NUM_PACKETS {
let mut payload = PACKET_BASE_PAYLOAD.to_vec();
payload.extend_from_slice(i.to_string().as_bytes());
print!("[Sender] Sending packet {} ({} bytes): {:?}...", i, payload.len(), std::str::from_utf8(&payload).unwrap_or("non-utf8"));
match tx_ring.send(&payload) {
Ok(_) => {
tx_ring.sync(); println!(" Sent.");
}
Err(e) => {
return Err(format!("[Sender] Failed to send packet {}: {:?}", i, e));
}
}
thread::sleep(Duration::from_millis(10)); }
done_tx.send("[Sender] All packets sent.".to_string()).unwrap();
Ok(())
}
fn receiver_thread(
mut rx_pipe_ep: Netmap,
done_rx: mpsc::Sender<String>,
) -> Result<(), String> {
println!("[Receiver] Thread started. RX Rings: {}", rx_pipe_ep.num_rx_rings());
if rx_pipe_ep.num_rx_rings() == 0 {
return Err("[Receiver] No RX rings available on pipe endpoint.".to_string());
}
let mut rx_ring = rx_pipe_ep
.rx_ring(0)
.map_err(|e| format!("[Receiver] Failed to get RX ring: {:?}", e))?;
let mut packets_received = 0;
let mut missed_count = 0;
const MAX_MISSES: usize = 100;
while packets_received < NUM_PACKETS && missed_count < MAX_MISSES {
rx_ring.sync(); let mut received_in_batch = 0;
while let Some(frame) = rx_ring.recv() {
if frame.is_empty() {
continue;
}
received_in_batch +=1;
let mut expected_payload = PACKET_BASE_PAYLOAD.to_vec();
expected_payload.extend_from_slice(packets_received.to_string().as_bytes());
println!(
"[Receiver] Received packet {} ({} bytes): {:?}",
packets_received, frame.len(), std::str::from_utf8(frame.payload()).unwrap_or("non-utf8")
);
assert_eq!(frame.payload(), expected_payload.as_slice(), "Packet content mismatch!");
packets_received += 1;
if packets_received == NUM_PACKETS {
break;
}
}
if received_in_batch == 0 {
missed_count += 1;
thread::sleep(Duration::from_millis(10)); } else {
missed_count = 0; }
}
if packets_received == NUM_PACKETS {
done_rx.send(format!("[Receiver] Successfully received all {} packets.", NUM_PACKETS)).unwrap();
Ok(())
} else {
Err(format!("[Receiver] Timed out. Received only {} out of {} packets.", packets_received, NUM_PACKETS))
}
}
fn main() -> Result<(), Box<dyn Error>> {
println!("Netmap Pipe Thread-to-Thread Communication Example");
println!("Using pipe name: {}", PIPE_NAME);
let pipe_ep1 = NetmapBuilder::new(PIPE_NAME)
.num_tx_rings(1)
.num_rx_rings(1)
.build()
.map_err(|e| format!("Failed to open pipe endpoint 1 (master): {:?}", e))?;
println!("Pipe endpoint 1 (master) opened. TX rings: {}, RX rings: {}", pipe_ep1.num_tx_rings(), pipe_ep1.num_rx_rings());
let pipe_ep2 = NetmapBuilder::new(PIPE_NAME)
.num_tx_rings(1)
.num_rx_rings(1)
.build()
.map_err(|e| format!("Failed to open pipe endpoint 2 (slave): {:?}", e))?;
println!("Pipe endpoint 2 (slave) opened. TX rings: {}, RX rings: {}", pipe_ep2.num_tx_rings(), pipe_ep2.num_rx_rings());
let (done_tx_s, done_tx_r) = mpsc::channel();
let (done_rx_s, done_rx_r) = mpsc::channel();
let sender_handle = thread::spawn(move || sender_thread(pipe_ep1, done_tx_s));
let receiver_handle = thread::spawn(move || receiver_thread(pipe_ep2, done_rx_s));
match sender_handle.join() {
Ok(Ok(_)) => println!("{}", done_tx_r.recv().unwrap_or_default()),
Ok(Err(e)) => eprintln!("[Main] Sender thread failed: {}", e),
Err(e) => eprintln!("[Main] Sender thread panicked: {:?}", e),
}
match receiver_handle.join() {
Ok(Ok(_)) => println!("{}", done_rx_r.recv().unwrap_or_default()),
Ok(Err(e)) => eprintln!("[Main] Receiver thread failed: {}", e),
Err(e) => eprintln!("[Main] Receiver thread panicked: {:?}", e),
}
println!("Example finished.");
Ok(())
}