use std::error::Error;
use std::os::unix::io::AsRawFd;
use std::time::Duration;
use netmap_rs::prelude::*;
use polling::{Event, Poller};
const PIPE_NAME_POLL: &str = "netmap:pipe{poll_example_456}";
const NUM_PACKETS_TO_SEND: usize = 5;
fn main() -> Result<(), Box<dyn Error>> {
println!("Netmap Polling Example using '{}'", PIPE_NAME_POLL);
let mut pipe_a = NetmapBuilder::new(PIPE_NAME_POLL)
.num_tx_rings(1)
.num_rx_rings(1) .build()
.expect("Failed to open pipe endpoint A");
let mut pipe_b = NetmapBuilder::new(PIPE_NAME_POLL)
.num_tx_rings(1) .num_rx_rings(1)
.build()
.expect("Failed to open pipe endpoint B");
let mut tx_a = pipe_a.tx_ring(0).expect("Pipe A: Failed to get TX ring");
let mut rx_b = pipe_b.rx_ring(0).expect("Pipe B: Failed to get RX ring");
let fd_b = pipe_b.as_raw_fd();
let poller = Poller::new().expect("Failed to create Poller");
poller.add(fd_b, Event::readable(0)).expect("Failed to register fd_b with Poller");
let mut packets_sent = 0;
let mut packets_received = 0;
let mut main_loop_iterations = 0;
let mut events = Vec::new();
println!("Starting event loop. Will send {} packets.", NUM_PACKETS_TO_SEND);
println!("Monitoring pipe_b's fd ({}) for readable events (packets from pipe_a).", fd_b);
loop {
main_loop_iterations += 1;
events.clear();
if packets_sent < NUM_PACKETS_TO_SEND {
let fd_a = pipe_a.as_raw_fd();
poller.add(fd_a, Event::writable(1)).expect("Failed to add fd_a for write polling");
match poller.wait(&mut events, Some(Duration::from_millis(0))) { Ok(_) => {
let mut can_write_to_a = false;
for ev in &events {
if ev.key == 1 && ev.writable { can_write_to_a = true;
break;
}
}
if can_write_to_a || tx_a.num_slots() - (tx_a.head() - tx_a.tail() + tx_a.num_slots() as u32) % tx_a.num_slots() as u32 > 1 { let mut payload = format!("Packet #{}", packets_sent).into_bytes();
payload.resize(60, 0);
match tx_a.send(&payload) {
Ok(_) => {
tx_a.sync(); println!("[Sender A] Sent packet #{} ({} bytes)", packets_sent, payload.len());
packets_sent += 1;
}
Err(Error::InsufficientSpace) => {
println!("[Sender A] TX ring full, will try later.");
}
Err(e) => {
eprintln!("[Sender A] Error sending packet: {:?}", e);
break; }
}
} else {
}
}
Err(e) if e.kind() == std::io::ErrorKind::TimedOut => { }
Err(e) => {
eprintln!("[Sender A] Polling error for fd_a: {:?}", e);
break;
}
}
poller.modify(fd_a, Event::none(1)).expect("Failed to remove interest from fd_a"); }
match poller.wait(&mut events, Some(Duration::from_millis(100))) {
Ok(_) => {
for ev in &events {
if ev.key == 0 { if ev.readable {
rx_b.sync();
let mut received_in_batch = 0;
while let Some(frame) = rx_b.recv() {
if frame.is_empty() { continue; }
received_in_batch += 1;
println!(
"[Receiver B] Received packet #{} ({} bytes): {:?}",
packets_received,
frame.len(),
String::from_utf8_lossy(&frame.payload()[..frame.payload().iter().position(|&x| x == 0).unwrap_or(frame.len())])
);
packets_received += 1;
}
if received_in_batch > 0 {
} else {
}
}
if ev.writable { }
}
}
}
Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {
}
Err(e) => {
eprintln!("[Event Loop] Polling error: {:?}", e);
break; }
}
if packets_received >= NUM_PACKETS_TO_SEND && packets_sent >= NUM_PACKETS_TO_SEND {
println!("All {} packets sent and received. Exiting.", NUM_PACKETS_TO_SEND);
break;
}
if main_loop_iterations > (NUM_PACKETS_TO_SEND * 10) + 20 && (packets_received < NUM_PACKETS_TO_SEND) {
println!("Potential stall or slow processing, exiting. Sent: {}, Received: {}", packets_sent, packets_received);
break;
}
}
poller.delete(fd_b).ok();
println!("Example finished. Total iterations: {}", main_loop_iterations);
Ok(())
}