use crate::error::DpdkResult;
use crate::mbuf::Mbuf;
/// Tunable parameters for a receive queue.
///
/// The field meanings below follow the usual DPDK naming; they are not
/// interpreted anywhere in this block, so treat the descriptions as
/// assumptions to confirm against the code that consumes this config.
#[derive(Debug, Clone)]
pub struct RxQueueConfig {
    /// Presumably the number of descriptors in the receive ring — confirm.
    pub nb_desc: u16,
    /// Presumably the NUMA socket used for queue memory — confirm.
    pub socket_id: i32,
    /// Presumably the free-descriptor threshold for the receive ring — confirm.
    pub rx_free_thresh: u16,
}

impl Default for RxQueueConfig {
    /// Builds the default configuration: `nb_desc = 1024`, `socket_id = 0`,
    /// `rx_free_thresh = 32`.
    fn default() -> Self {
        let nb_desc: u16 = 1024;
        let socket_id: i32 = 0;
        let rx_free_thresh: u16 = 32;
        RxQueueConfig {
            nb_desc,
            socket_id,
            rx_free_thresh,
        }
    }
}
/// Tunable parameters for a transmit queue.
///
/// The field meanings below follow the usual DPDK naming; they are not
/// interpreted anywhere in this block, so treat the descriptions as
/// assumptions to confirm against the code that consumes this config.
#[derive(Debug, Clone)]
pub struct TxQueueConfig {
    /// Presumably the number of descriptors in the transmit ring — confirm.
    pub nb_desc: u16,
    /// Presumably the NUMA socket used for queue memory — confirm.
    pub socket_id: i32,
    /// Presumably the free-descriptor threshold for the transmit ring — confirm.
    pub tx_free_thresh: u16,
    /// Presumably the report-status (RS bit) threshold — confirm.
    pub tx_rs_thresh: u16,
}

impl Default for TxQueueConfig {
    /// Builds the default configuration: `nb_desc = 1024`, `socket_id = 0`,
    /// `tx_free_thresh = 32`, `tx_rs_thresh = 32`.
    fn default() -> Self {
        let nb_desc: u16 = 1024;
        let socket_id: i32 = 0;
        let tx_free_thresh: u16 = 32;
        let tx_rs_thresh: u16 = 32;
        TxQueueConfig {
            nb_desc,
            socket_id,
            tx_free_thresh,
            tx_rs_thresh,
        }
    }
}
/// Handle for a receive queue, identified by a port id and a queue id.
pub struct RxQueue {
    port_id: u16,
    queue_id: u16,
}

impl RxQueue {
    /// Creates a receive-queue handle for `queue_id` on `port_id`.
    ///
    /// The supplied configuration is currently not used, and construction
    /// always succeeds.
    pub fn new(port_id: u16, queue_id: u16, _config: RxQueueConfig) -> DpdkResult<Self> {
        let queue = RxQueue { port_id, queue_id };
        Ok(queue)
    }

    /// Returns the port id this queue was created with.
    pub fn port_id(&self) -> u16 {
        let RxQueue { port_id, .. } = self;
        *port_id
    }

    /// Returns the queue id this queue was created with.
    pub fn queue_id(&self) -> u16 {
        let RxQueue { queue_id, .. } = self;
        *queue_id
    }

    /// Receives up to `max_packets` packets.
    ///
    /// This is currently a stub: `max_packets` is ignored and an empty
    /// vector is always returned.
    pub fn receive_burst(&self, max_packets: u16) -> DpdkResult<Vec<Mbuf>> {
        // Explicitly discard the argument until a real receive path exists.
        let _ = max_packets;
        Ok(vec![])
    }

    /// Polls [`RxQueue::receive_burst`] until it yields packets or
    /// `timeout_us` microseconds have elapsed.
    ///
    /// The queue is always polled at least once, even with a zero timeout.
    /// The loop busy-waits (no sleeping) between polls.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `receive_burst`.
    pub fn receive_burst_timeout(
        &self,
        max_packets: u16,
        timeout_us: u64,
    ) -> DpdkResult<Vec<Mbuf>> {
        use std::time::{Duration, Instant};

        let began = Instant::now();
        let budget = Duration::from_micros(timeout_us);
        loop {
            let burst = self.receive_burst(max_packets)?;
            // Return as soon as there is data; once the budget is spent the
            // (empty) burst is handed back instead.
            if !burst.is_empty() || began.elapsed() >= budget {
                return Ok(burst);
            }
            std::hint::spin_loop();
        }
    }
}
/// Handle for a transmit queue, identified by a port id and a queue id.
pub struct TxQueue {
    port_id: u16,
    queue_id: u16,
}

impl TxQueue {
    /// Creates a transmit-queue handle for `queue_id` on `port_id`.
    ///
    /// The supplied configuration is currently not used, and construction
    /// always succeeds.
    pub fn new(port_id: u16, queue_id: u16, _config: TxQueueConfig) -> DpdkResult<Self> {
        Ok(Self { port_id, queue_id })
    }

    /// Returns the port id this queue was created with.
    pub fn port_id(&self) -> u16 {
        self.port_id
    }

    /// Returns the queue id this queue was created with.
    pub fn queue_id(&self) -> u16 {
        self.queue_id
    }

    /// Hands a burst of packets to the queue and returns how many were taken.
    ///
    /// Each transmitted packet is consumed with `Mbuf::into_raw` and the
    /// result is discarded.
    ///
    /// The return type is `u16`, so at most `u16::MAX` packets are taken per
    /// call. Any packets beyond that limit are not transmitted; they are
    /// dropped together with the input vector (presumably releasing their
    /// buffers via `Mbuf`'s destructor — confirm). The returned count is
    /// therefore always the exact number of packets handed over, instead of
    /// a silently truncated `len() as u16`.
    pub fn send_burst(&self, packets: Vec<Mbuf>) -> DpdkResult<u16> {
        let mut sent: u16 = 0;
        // `take` bounds the loop to u16::MAX iterations, so `sent` cannot
        // overflow.
        for packet in packets.into_iter().take(usize::from(u16::MAX)) {
            let _ = packet.into_raw();
            sent += 1;
        }
        Ok(sent)
    }

    /// Sends a burst of packets; currently identical to
    /// [`TxQueue::send_burst`], to which it delegates.
    pub fn send_burst_flush(&self, packets: Vec<Mbuf>) -> DpdkResult<u16> {
        self.send_burst(packets)
    }
}
/// A receive queue and a transmit queue that share the same port and queue id.
pub struct QueuePair {
    /// Receive side of the pair.
    pub rx: RxQueue,
    /// Transmit side of the pair.
    pub tx: TxQueue,
}

impl QueuePair {
    /// Creates both queues for `queue_id` on `port_id`.
    ///
    /// The receive queue is created first, then the transmit queue.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by either queue constructor.
    pub fn new(
        port_id: u16,
        queue_id: u16,
        rx_config: RxQueueConfig,
        tx_config: TxQueueConfig,
    ) -> DpdkResult<Self> {
        let rx = RxQueue::new(port_id, queue_id, rx_config)?;
        let tx = TxQueue::new(port_id, queue_id, tx_config)?;
        Ok(QueuePair { rx, tx })
    }

    /// Receives up to `max_burst` packets and sends them straight back out.
    ///
    /// Returns the value reported by the transmit queue, or `0` when nothing
    /// was received (in which case the transmit queue is not touched).
    ///
    /// # Errors
    ///
    /// Propagates errors from the receive or the send call.
    pub fn echo(&self, max_burst: u16) -> DpdkResult<u16> {
        let burst = self.rx.receive_burst(max_burst)?;
        if burst.is_empty() {
            Ok(0)
        } else {
            self.tx.send_burst(burst)
        }
    }
}
/// Simple per-queue counters.
#[derive(Debug, Default, Clone)]
pub struct QueueStats {
    /// Number of packets recorded via [`QueueStats::add_packet`].
    pub packets: u64,
    /// Sum of the byte sizes recorded via [`QueueStats::add_packet`].
    pub bytes: u64,
    /// Number of drops recorded via [`QueueStats::add_dropped`].
    pub dropped: u64,
    /// Number of errors recorded via [`QueueStats::add_error`].
    pub errors: u64,
}

impl QueueStats {
    /// Creates a statistics record with every counter at zero.
    pub fn new() -> Self {
        QueueStats {
            packets: 0,
            bytes: 0,
            dropped: 0,
            errors: 0,
        }
    }

    /// Records one packet of `bytes` bytes.
    pub fn add_packet(&mut self, bytes: usize) {
        let size = bytes as u64;
        self.packets += 1;
        self.bytes += size;
    }

    /// Records one error.
    pub fn add_error(&mut self) {
        let counter = &mut self.errors;
        *counter += 1;
    }

    /// Records one dropped packet.
    pub fn add_dropped(&mut self) {
        let counter = &mut self.dropped;
        *counter += 1;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A receive queue built with the default config reports the ids it was
    /// created with.
    #[test]
    fn test_rx_queue_creation() {
        let created = RxQueue::new(0, 0, RxQueueConfig::default());
        assert!(created.is_ok());
        let rx = created.unwrap();
        assert_eq!(rx.port_id(), 0);
        assert_eq!(rx.queue_id(), 0);
    }

    /// A transmit queue built with the default config reports the ids it was
    /// created with.
    #[test]
    fn test_tx_queue_creation() {
        let created = TxQueue::new(0, 0, TxQueueConfig::default());
        assert!(created.is_ok());
        let tx = created.unwrap();
        assert_eq!(tx.port_id(), 0);
        assert_eq!(tx.queue_id(), 0);
    }

    /// Packet and byte counters accumulate across calls to `add_packet`.
    #[test]
    fn test_queue_stats() {
        let mut stats = QueueStats::new();
        for size in [100usize, 200] {
            stats.add_packet(size);
        }
        assert_eq!(stats.packets, 2);
        assert_eq!(stats.bytes, 300);
    }
}