Skip to main content

dpdk_stdlib/
queue.rs

1//! Queue management for DPDK
2//!
3//! DPDK uses receive (RX) and transmit (TX) queues for packet I/O.
4//! Each port can have multiple queues for parallel processing.
5
6use crate::error::DpdkResult;
7use crate::mbuf::Mbuf;
8
/// Settings used when creating a receive (RX) queue.
#[derive(Debug, Clone)]
pub struct RxQueueConfig {
    /// How many receive descriptors the queue has.
    pub nb_desc: u16,
    /// ID of the NUMA socket associated with the queue.
    pub socket_id: i32,
    /// RX free threshold (`rx_free_thresh`): lets RX mbufs be freed lazily.
    pub rx_free_thresh: u16,
}

impl Default for RxQueueConfig {
    /// Default RX settings: 1024 descriptors, NUMA socket 0, free threshold 32.
    fn default() -> Self {
        let nb_desc = 1024;
        let rx_free_thresh = 32;

        Self {
            nb_desc,
            socket_id: 0,
            rx_free_thresh,
        }
    }
}
29
/// Settings used when creating a transmit (TX) queue.
#[derive(Debug, Clone)]
pub struct TxQueueConfig {
    /// How many transmit descriptors the queue has.
    pub nb_desc: u16,
    /// ID of the NUMA socket associated with the queue.
    pub socket_id: i32,
    /// TX free threshold (`tx_free_thresh`).
    pub tx_free_thresh: u16,
    /// TX RS-bit threshold (`tx_rs_thresh`).
    pub tx_rs_thresh: u16,
}

impl Default for TxQueueConfig {
    /// Default TX settings: 1024 descriptors, NUMA socket 0, both thresholds 32.
    fn default() -> Self {
        let nb_desc = 1024;
        let threshold = 32;

        Self {
            nb_desc,
            socket_id: 0,
            tx_free_thresh: threshold,
            tx_rs_thresh: threshold,
        }
    }
}
53
54/// A receive queue on a DPDK port
55pub struct RxQueue {
56    port_id: u16,
57    queue_id: u16,
58}
59
60impl RxQueue {
61    /// Create a new receive queue
62    pub fn new(port_id: u16, queue_id: u16, _config: RxQueueConfig) -> DpdkResult<Self> {
63        // In a real implementation, this would call rte_eth_rx_queue_setup
64        Ok(Self { port_id, queue_id })
65    }
66
67    /// Get the port ID this queue belongs to
68    pub fn port_id(&self) -> u16 {
69        self.port_id
70    }
71
72    /// Get the queue ID
73    pub fn queue_id(&self) -> u16 {
74        self.queue_id
75    }
76
77    /// Receive a burst of packets
78    ///
79    /// Returns a vector of received mbufs, up to `max_packets` count.
80    /// This is a non-blocking operation - returns immediately with
81    /// whatever packets are available.
82    pub fn receive_burst(&self, max_packets: u16) -> DpdkResult<Vec<Mbuf>> {
83        // In a real implementation, this would call rte_eth_rx_burst
84        // For now, return empty - this is a placeholder
85        let _ = max_packets;
86        Ok(Vec::new())
87    }
88
89    /// Poll for packets with a timeout
90    ///
91    /// This is a convenience method that polls until either:
92    /// - At least one packet is received
93    /// - The timeout expires
94    pub fn receive_burst_timeout(
95        &self,
96        max_packets: u16,
97        timeout_us: u64,
98    ) -> DpdkResult<Vec<Mbuf>> {
99        let start = std::time::Instant::now();
100        let timeout = std::time::Duration::from_micros(timeout_us);
101
102        loop {
103            let packets = self.receive_burst(max_packets)?;
104            if !packets.is_empty() {
105                return Ok(packets);
106            }
107
108            if start.elapsed() >= timeout {
109                return Ok(Vec::new());
110            }
111
112            // Brief pause before retry to avoid burning CPU
113            std::hint::spin_loop();
114        }
115    }
116}
117
118/// A transmit queue on a DPDK port
119pub struct TxQueue {
120    port_id: u16,
121    queue_id: u16,
122}
123
124impl TxQueue {
125    /// Create a new transmit queue
126    pub fn new(port_id: u16, queue_id: u16, _config: TxQueueConfig) -> DpdkResult<Self> {
127        // In a real implementation, this would call rte_eth_tx_queue_setup
128        Ok(Self { port_id, queue_id })
129    }
130
131    /// Get the port ID this queue belongs to
132    pub fn port_id(&self) -> u16 {
133        self.port_id
134    }
135
136    /// Get the queue ID
137    pub fn queue_id(&self) -> u16 {
138        self.queue_id
139    }
140
141    /// Send a burst of packets
142    ///
143    /// Returns the number of packets successfully queued for transmission.
144    /// Packets that are sent are consumed (freed) by DPDK.
145    pub fn send_burst(&self, packets: Vec<Mbuf>) -> DpdkResult<u16> {
146        // In a real implementation, this would call rte_eth_tx_burst
147        // For now, just consume the packets
148        let count = packets.len() as u16;
149
150        // Packets are consumed - need to prevent Drop from running
151        // since DPDK will free them
152        for packet in packets {
153            let _ = packet.into_raw(); // Leak intentionally - DPDK frees
154        }
155
156        Ok(count)
157    }
158
159    /// Send packets with flush
160    ///
161    /// Same as send_burst but also flushes any buffered packets
162    pub fn send_burst_flush(&self, packets: Vec<Mbuf>) -> DpdkResult<u16> {
163        // In a real implementation, this would also call rte_eth_tx_done_cleanup
164        self.send_burst(packets)
165    }
166}
167
168/// Queue pair combining RX and TX queues
169///
170/// This is a common pattern for per-core packet processing
171pub struct QueuePair {
172    pub rx: RxQueue,
173    pub tx: TxQueue,
174}
175
176impl QueuePair {
177    /// Create a new queue pair
178    pub fn new(
179        port_id: u16,
180        queue_id: u16,
181        rx_config: RxQueueConfig,
182        tx_config: TxQueueConfig,
183    ) -> DpdkResult<Self> {
184        Ok(Self {
185            rx: RxQueue::new(port_id, queue_id, rx_config)?,
186            tx: TxQueue::new(port_id, queue_id, tx_config)?,
187        })
188    }
189
190    /// Echo packets back (common test pattern)
191    ///
192    /// Receives packets and immediately sends them back out
193    pub fn echo(&self, max_burst: u16) -> DpdkResult<u16> {
194        let packets = self.rx.receive_burst(max_burst)?;
195        if packets.is_empty() {
196            return Ok(0);
197        }
198        self.tx.send_burst(packets)
199    }
200}
201
/// Running counters describing traffic on a queue.
#[derive(Debug, Default, Clone)]
pub struct QueueStats {
    /// Number of packets received/transmitted so far.
    pub packets: u64,
    /// Number of bytes received/transmitted so far.
    pub bytes: u64,
    /// Number of packets that were dropped.
    pub dropped: u64,
    /// Number of errors that occurred.
    pub errors: u64,
}

impl QueueStats {
    /// Creates a statistics record with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Records one packet of `bytes` bytes: bumps the packet counter by one
    /// and the byte counter by the packet size.
    pub fn add_packet(&mut self, bytes: usize) {
        let size = bytes as u64;
        self.packets += 1;
        self.bytes += size;
    }

    /// Records one error.
    pub fn add_error(&mut self) {
        self.errors += 1;
    }

    /// Records one dropped packet.
    pub fn add_dropped(&mut self) {
        self.dropped += 1;
    }
}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-configured RX queue can be created and reports its IDs.
    #[test]
    fn test_rx_queue_creation() {
        let created = RxQueue::new(0, 0, RxQueueConfig::default());
        assert!(created.is_ok());

        let rx = created.unwrap();
        assert_eq!(rx.port_id(), 0);
        assert_eq!(rx.queue_id(), 0);
    }

    /// A default-configured TX queue can be created and reports its IDs.
    #[test]
    fn test_tx_queue_creation() {
        let created = TxQueue::new(0, 0, TxQueueConfig::default());
        assert!(created.is_ok());

        let tx = created.unwrap();
        assert_eq!(tx.port_id(), 0);
        assert_eq!(tx.queue_id(), 0);
    }

    /// Packet and byte counters accumulate across `add_packet` calls.
    #[test]
    fn test_queue_stats() {
        let mut stats = QueueStats::new();
        for size in [100, 200] {
            stats.add_packet(size);
        }

        assert_eq!(stats.packets, 2);
        assert_eq!(stats.bytes, 300);
    }
}