#![cfg(all(unix, feature = "sys"))]
mod test_helpers {
use netmap_rs::prelude::*;
use std::time::{Duration, Instant};
pub const VALE_IF_A: &str = "vale_test_a";
pub const VALE_IF_B: &str = "vale_test_b";
pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(200);
pub fn setup_vale_interface(if_name: &str, num_rings: usize) -> Result<Netmap, Error> {
NetmapBuilder::new(if_name)
.num_tx_rings(num_rings)
.num_rx_rings(num_rings)
.build()
}
pub fn setup_vale_interfaces_pair(
num_rings: usize,
) -> Result<(Netmap, Netmap), Error> {
let nm_a = setup_vale_interface(VALE_IF_A, num_rings)?;
let nm_b = setup_vale_interface(VALE_IF_B, num_rings)?;
Ok((nm_a, nm_b))
}
pub fn send_packet_and_sync(tx_ring: &mut TxRing, payload: &[u8]) -> Result<(), Error> {
tx_ring.send(payload)?;
tx_ring.sync();
Ok(())
}
pub fn receive_packet_timeout(
rx_ring: &mut RxRing,
expected_payload: Option<&[u8]>,
timeout: Duration,
) -> Result<Option<Vec<u8>>, String> {
let start_time = Instant::now();
loop {
if start_time.elapsed() > timeout {
return Ok(None); }
rx_ring.sync(); if let Some(frame) = rx_ring.recv() {
let received_payload = frame.payload().to_vec();
if let Some(expected) = expected_payload {
if received_payload != expected {
return Err(format!(
"Payload mismatch. Expected: {:?}, Got: {:?}",
expected,
received_payload
));
}
}
return Ok(Some(received_payload));
}
std::thread::sleep(Duration::from_micros(50));
}
}
#[test]
fn test_open_host_rings_loopback() {
const LOOPBACK_HOST_IF: &str = "netmap:lo^";
match NetmapBuilder::new(LOOPBACK_HOST_IF)
.num_tx_rings(1) .num_rx_rings(1) .build()
{
Ok(nm) => {
assert!(nm.is_host_if(), "Netmap interface opened with '{}' should be identified as a host interface.", LOOPBACK_HOST_IF);
let num_tx = nm.num_tx_rings();
let num_rx = nm.num_rx_rings();
println!("Interface {}: Host TX rings = {}, Host RX rings = {}", LOOPBACK_HOST_IF, num_tx, num_rx);
assert!(num_tx > 0, "Expected at least one host TX ring on {}.", LOOPBACK_HOST_IF);
assert!(num_rx > 0, "Expected at least one host RX ring on {}.", LOOPBACK_HOST_IF);
assert!(nm.tx_ring(0).is_ok(), "Failed to get host TX ring 0 from {}.", LOOPBACK_HOST_IF);
assert!(nm.rx_ring(0).is_ok(), "Failed to get host RX ring 0 from {}.", LOOPBACK_HOST_IF);
assert!(matches!(nm.tx_ring(num_tx), Err(Error::InvalidRingIndex(_))), "Accessing out-of-bounds host TX ring did not return InvalidRingIndex.");
assert!(matches!(nm.rx_ring(num_rx), Err(Error::InvalidRingIndex(_))), "Accessing out-of-bounds host RX ring did not return InvalidRingIndex.");
}
Err(e) => {
println!("Warning: Failed to open host rings on '{}': {:?}. This test requires appropriate permissions and netmap support for loopback host stack.", LOOPBACK_HOST_IF, e);
}
}
}
}
mod netmap_tests {
use super::test_helpers::*;
use netmap_rs::prelude::*;
use std::time::Duration;
#[test]
fn test_vale_interface_availability() {
let nm_a = setup_vale_interface(VALE_IF_A, 1);
assert!(nm_a.is_ok(), "Failed to open VALE interface A ({}). Ensure VALE ports are set up for testing.", VALE_IF_A);
let nm_b = setup_vale_interface(VALE_IF_B, 1);
assert!(nm_b.is_ok(), "Failed to open VALE interface B ({}). Ensure VALE ports are set up for testing.", VALE_IF_B);
}
#[test]
fn test_original_netmap_creation_on_vale() { let nm = setup_vale_interface(VALE_IF_A, 1);
assert!(nm.is_ok(), "Failed to create Netmap instance on VALE port {}", VALE_IF_A);
}
#[test]
fn test_single_packet_vale_loopback() { let (nm_a, nm_b) =
setup_vale_interfaces_pair(1).expect("Failed to setup VALE interfaces for single packet test");
let mut tx_ring_a = nm_a.tx_ring(0).expect("Failed to get TX ring from VALE_IF_A");
let mut rx_ring_b = nm_b.rx_ring(0).expect("Failed to get RX ring from VALE_IF_B");
let packet_payload = b"hello_vale_single";
send_packet_and_sync(&mut tx_ring_a, packet_payload)
.expect("Send failed on VALE_IF_A");
match receive_packet_timeout(&mut rx_ring_b, Some(packet_payload), DEFAULT_TIMEOUT) {
Ok(Some(payload)) => assert_eq!(payload, packet_payload, "Received payload does not match"),
Ok(None) => panic!("Timeout: Did not receive packet on VALE_IF_B"),
Err(e) => panic!("Receive error: {}", e),
}
}
#[test]
fn test_batch_vale_loopback() { let (nm_a, nm_b) =
setup_vale_interfaces_pair(1).expect("Failed to setup VALE interfaces for batch test");
let mut tx_ring_a = nm_a.tx_ring(0).expect("Failed to get TX ring from VALE_IF_A");
let mut rx_ring_b = nm_b.rx_ring(0).expect("Failed to get RX ring from VALE_IF_B");
let batch_size = 8;
let packet_len = 10;
let mut reservation = tx_ring_a
.reserve_batch(batch_size)
.expect("Batch reservation failed on VALE_IF_A");
let mut sent_payloads = Vec::new();
for i in 0..batch_size {
let mut payload_data = vec![0u8; packet_len];
payload_data[0] = i as u8;
let slot = reservation
.packet(i, payload_data.len())
.expect("Failed to get packet slot in batch reservation");
slot.copy_from_slice(&payload_data);
sent_payloads.push(payload_data);
}
reservation.commit();
tx_ring_a.sync();
let mut received_frames_data = Vec::new();
let mut total_received_count = 0;
let start_time = std::time::Instant::now();
let mut frame_buffer: Vec<Frame> = (0..batch_size).map(|_| Frame::new_borrowed(&[])).collect();
while total_received_count < batch_size && start_time.elapsed() < DEFAULT_TIMEOUT * 2 { rx_ring_b.sync(); let count = rx_ring_b.recv_batch(&mut frame_buffer[total_received_count..]);
if count > 0 {
for i in 0..count {
received_frames_data.push(frame_buffer[total_received_count + i].payload().to_vec());
}
total_received_count += count;
}
if total_received_count < batch_size {
std::thread::sleep(Duration::from_micros(50)); }
}
assert_eq!(
total_received_count, batch_size,
"Did not receive the full batch. Received {} out of {}", total_received_count, batch_size
);
for i in 0..batch_size {
assert_eq!(
received_frames_data.get(i).expect("Missing received frame data"),
sent_payloads.get(i).expect("Missing sent payload data for comparison"),
"Mismatch in packet {} of the batch", i
);
}
}
#[test]
fn test_multi_ring_independent_loopback() {
let num_rings = 2;
let (nm_a, nm_b) = setup_vale_interfaces_pair(num_rings)
.expect("Failed to setup VALE interfaces for multi-ring test");
for i in 0..num_rings {
let mut tx_ring_a = nm_a.tx_ring(i).expect(&format!("Failed to get TX ring {} from VALE_IF_A", i));
let mut rx_ring_b = nm_b.rx_ring(i).expect(&format!("Failed to get RX ring {} from VALE_IF_B", i));
let payload = format!("packet_on_ring_{}", i).into_bytes();
send_packet_and_sync(&mut tx_ring_a, &payload)
.expect(&format!("Send failed on VALE_IF_A, ring {}", i));
match receive_packet_timeout(&mut rx_ring_b, Some(&payload), DEFAULT_TIMEOUT) {
Ok(Some(received_payload)) => assert_eq!(received_payload, payload, "Payload mismatch on ring {}", i),
Ok(None) => panic!("Timeout: Did not receive packet on VALE_IF_B, ring {}", i),
Err(e) => panic!("Receive error on ring {}: {}", i, e),
}
println!("Successfully sent and received packet on ring {}", i);
}
}
#[test]
fn test_tx_ring_error_packet_too_large() {
let (nm_a, _nm_b) = setup_vale_interfaces_pair(1)
.expect("Failed to setup VALE interfaces for packet_too_large test");
let mut tx_ring = nm_a.tx_ring(0).expect("Failed to get TX ring");
let max_size = tx_ring.max_payload_size();
assert!(max_size > 0, "max_payload_size returned 0 or less, cannot run test meaningfully.");
let large_payload = vec![0u8; max_size + 1];
let result = tx_ring.send(&large_payload);
match result {
Err(Error::PacketTooLarge(size)) => {
assert_eq!(size, large_payload.len(), "Error::PacketTooLarge reported incorrect size.");
}
Err(e) => panic!("Expected Error::PacketTooLarge, got {:?}", e),
Ok(_) => panic!("Send succeeded with a too-large packet, which is an error."),
}
}
#[test]
fn test_tx_ring_error_reserve_batch_insufficient_space() {
let (nm_a, _nm_b) = setup_vale_interfaces_pair(1)
.expect("Failed to setup VALE interfaces for insufficient_space test");
let mut tx_ring = nm_a.tx_ring(0).expect("Failed to get TX ring");
let num_total_slots = tx_ring.num_slots();
assert!(num_total_slots > 0, "Ring reported 0 slots.");
let result = tx_ring.reserve_batch(num_total_slots);
match result {
Err(Error::InsufficientSpace) => {
}
Err(e) => panic!("Expected Error::InsufficientSpace, got {:?}", e),
Ok(_) => panic!("Batch reservation succeeded when it should have failed due to insufficient space."),
}
}
#[test]
fn test_netmap_error_invalid_ring_index() {
let num_rings = 1;
let nm_a = setup_vale_interface(VALE_IF_A, num_rings)
.expect("Failed to setup VALE_IF_A for invalid_ring_index test");
let result_tx = nm_a.tx_ring(num_rings);
match result_tx {
Err(Error::InvalidRingIndex(idx)) => {
assert_eq!(idx, num_rings, "Error::InvalidRingIndex reported incorrect index for TX ring.");
}
Err(e) => panic!("Expected Error::InvalidRingIndex for TX ring, got {:?}", e),
Ok(_) => panic!("Getting TX ring with invalid index succeeded, which is an error."),
}
let result_rx = nm_a.rx_ring(num_rings);
match result_rx {
Err(Error::InvalidRingIndex(idx)) => {
assert_eq!(idx, num_rings, "Error::InvalidRingIndex reported incorrect index for RX ring.");
}
Err(e) => panic!("Expected Error::InvalidRingIndex for RX ring, got {:?}", e),
Ok(_) => panic!("Getting RX ring with invalid index succeeded, which is an error."),
}
}
const TEST_PIPE_NAME: &str = "netmap:pipe{integration_test_pipe}";
#[test]
fn test_pipe_open() {
match NetmapBuilder::new(TEST_PIPE_NAME).build() {
Ok(nm_pipe_master) => {
assert!(!nm_pipe_master.is_host_if(), "Pipe interface should not be identified as host interface.");
assert_eq!(nm_pipe_master.num_tx_rings(), 1, "Pipe master endpoint should have 1 TX ring by default.");
assert_eq!(nm_pipe_master.num_rx_rings(), 1, "Pipe master endpoint should have 1 RX ring by default.");
}
Err(e) => {
panic!("Failed to open first endpoint (master) of pipe '{}': {:?}", TEST_PIPE_NAME, e);
}
}
}
#[test]
fn test_pipe_intra_process_send_recv() {
let nm_master = NetmapBuilder::new(TEST_PIPE_NAME)
.num_tx_rings(1) .num_rx_rings(1)
.build()
.expect(&format!("Failed to open pipe master endpoint: {}", TEST_PIPE_NAME));
let nm_slave = NetmapBuilder::new(TEST_PIPE_NAME)
.num_tx_rings(1)
.num_rx_rings(1)
.build()
.expect(&format!("Failed to open pipe slave endpoint: {}", TEST_PIPE_NAME));
let mut master_tx_ring = nm_master.tx_ring(0).expect("Master: failed to get TX ring");
let mut master_rx_ring = nm_master.rx_ring(0).expect("Master: failed to get RX ring");
let mut slave_tx_ring = nm_slave.tx_ring(0).expect("Slave: failed to get TX ring");
let mut slave_rx_ring = nm_slave.rx_ring(0).expect("Slave: failed to get RX ring");
let payload_m_to_s = b"master_to_slave_pipe_test";
let payload_s_to_m = b"slave_to_master_pipe_test";
send_packet_and_sync(&mut master_tx_ring, payload_m_to_s)
.expect("Master: send failed");
match receive_packet_timeout(&mut slave_rx_ring, Some(payload_m_to_s), DEFAULT_TIMEOUT) {
Ok(Some(p)) => assert_eq!(p, payload_m_to_s, "Slave: payload mismatch from master"),
Ok(None) => panic!("Slave: timeout receiving from master"),
Err(e) => panic!("Slave: receive error from master: {}", e),
}
println!("Pipe: Master to Slave communication successful.");
send_packet_and_sync(&mut slave_tx_ring, payload_s_to_m)
.expect("Slave: send failed");
match receive_packet_timeout(&mut master_rx_ring, Some(payload_s_to_m), DEFAULT_TIMEOUT) {
Ok(Some(p)) => assert_eq!(p, payload_s_to_m, "Master: payload mismatch from slave"),
Ok(None) => panic!("Master: timeout receiving from slave"),
Err(e) => panic!("Master: receive error from slave: {}", e),
}
println!("Pipe: Slave to Master communication successful.");
}
}
#[cfg(feature = "tokio-async")]
mod tokio_async_tests {
use super::test_helpers::*; use netmap_rs::NetmapBuilder;
use netmap_rs::tokio_async::{TokioNetmap, AsyncNetmapRxRing, AsyncNetmapTxRing};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::time::Duration;
const ASYNC_TEST_PIPE_NAME: &str = "netmap:pipe{tokio_integration_test}";
const ASYNC_TEST_PAYLOAD: &[u8] = b"tokio_async_pipe_payload_test_data";
const ASYNC_TEST_PACKET_SIZE: usize = 64;
#[tokio::test]
#[cfg(all(unix, feature = "sys"))] async fn test_tokio_pipe_async_send_recv() {
let netmap_a = NetmapBuilder::new(ASYNC_TEST_PIPE_NAME)
.build()
.expect("Failed to open pipe endpoint A for tokio test");
let netmap_b = NetmapBuilder::new(ASYNC_TEST_PIPE_NAME)
.build()
.expect("Failed to open pipe endpoint B for tokio test");
let tokio_nm_a = TokioNetmap::new(netmap_a)
.expect("Failed to create TokioNetmap for endpoint A");
let tokio_nm_b = TokioNetmap::new(netmap_b)
.expect("Failed to create TokioNetmap for endpoint B");
let mut tx_ring_a = tokio_nm_a.tx_ring(0)
.expect("Tokio A: Failed to get async TX ring");
let mut rx_ring_b = tokio_nm_b.rx_ring(0)
.expect("Tokio B: Failed to get async RX ring");
let mut payload_to_send = ASYNC_TEST_PAYLOAD.to_vec();
payload_to_send.resize(ASYNC_TEST_PACKET_SIZE, 0);
let send_future = async {
tx_ring_a.write_all(&payload_to_send).await?;
tx_ring_a.flush().await?;
Result::<_, std::io::Error>::Ok(())
};
let mut receive_buffer = vec![0u8; ASYNC_TEST_PACKET_SIZE * 2];
let recv_future = async {
match tokio::time::timeout(DEFAULT_TIMEOUT * 5, rx_ring_b.read(&mut receive_buffer)).await {
Ok(Ok(n)) => Ok((n, receive_buffer)), Ok(Err(e)) => Err(e),
Err(_timeout_elapsed) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "Receive operation timed out")),
}
};
let (send_result, recv_result) = tokio::join!(send_future, recv_future);
send_result.expect("Sending packet failed");
match recv_result {
Ok((n, buffer)) => {
assert_eq!(n, ASYNC_TEST_PACKET_SIZE, "Received incorrect number of bytes");
assert_eq!(&buffer[..n], payload_to_send.as_slice(), "Received payload does not match sent payload");
println!("Tokio async pipe test: Successfully sent and received packet.");
}
Err(e) => {
panic!("Receiving packet failed: {}", e);
}
}
}
}
#[cfg(not(all(unix, feature = "sys")))]
mod fallback_tests {
use netmap_rs::prelude::*; #[test]
fn test_fallback_mode() {
assert!(true);
}
}