1use crate::error::DpdkResult;
7use crate::mbuf::Mbuf;
8
/// Settings handed to `RxQueue::new` when a receive queue is created.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// (they resemble DPDK receive-queue setup parameters) — confirm against the
/// code that actually consumes this config.
#[derive(Clone, Debug)]
pub struct RxQueueConfig {
    /// Presumably the number of descriptors in the receive ring — TODO confirm.
    pub nb_desc: u16,
    /// Presumably the NUMA socket used for the queue's memory — TODO confirm.
    pub socket_id: i32,
    /// Presumably the free-descriptor threshold of the receive ring — TODO confirm.
    pub rx_free_thresh: u16,
}

impl Default for RxQueueConfig {
    /// Returns a config with `nb_desc = 1024`, `socket_id = 0` and
    /// `rx_free_thresh = 32`.
    fn default() -> Self {
        let (nb_desc, socket_id, rx_free_thresh) = (1024, 0, 32);
        RxQueueConfig {
            nb_desc,
            socket_id,
            rx_free_thresh,
        }
    }
}
29
/// Settings handed to `TxQueue::new` when a transmit queue is created.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// (they resemble DPDK transmit-queue setup parameters) — confirm against the
/// code that actually consumes this config.
#[derive(Clone, Debug)]
pub struct TxQueueConfig {
    /// Presumably the number of descriptors in the transmit ring — TODO confirm.
    pub nb_desc: u16,
    /// Presumably the NUMA socket used for the queue's memory — TODO confirm.
    pub socket_id: i32,
    /// Presumably the free-descriptor threshold of the transmit ring — TODO confirm.
    pub tx_free_thresh: u16,
    /// Presumably the report-status (RS) threshold of the transmit ring — TODO confirm.
    pub tx_rs_thresh: u16,
}

impl Default for TxQueueConfig {
    /// Returns a config with `nb_desc = 1024`, `socket_id = 0`,
    /// `tx_free_thresh = 32` and `tx_rs_thresh = 32`.
    fn default() -> Self {
        let (nb_desc, socket_id) = (1024, 0);
        let (tx_free_thresh, tx_rs_thresh) = (32, 32);
        TxQueueConfig {
            nb_desc,
            socket_id,
            tx_free_thresh,
            tx_rs_thresh,
        }
    }
}
53
54pub struct RxQueue {
56 port_id: u16,
57 queue_id: u16,
58}
59
60impl RxQueue {
61 pub fn new(port_id: u16, queue_id: u16, _config: RxQueueConfig) -> DpdkResult<Self> {
63 Ok(Self { port_id, queue_id })
65 }
66
67 pub fn port_id(&self) -> u16 {
69 self.port_id
70 }
71
72 pub fn queue_id(&self) -> u16 {
74 self.queue_id
75 }
76
77 pub fn receive_burst(&self, max_packets: u16) -> DpdkResult<Vec<Mbuf>> {
83 let _ = max_packets;
86 Ok(Vec::new())
87 }
88
89 pub fn receive_burst_timeout(
95 &self,
96 max_packets: u16,
97 timeout_us: u64,
98 ) -> DpdkResult<Vec<Mbuf>> {
99 let start = std::time::Instant::now();
100 let timeout = std::time::Duration::from_micros(timeout_us);
101
102 loop {
103 let packets = self.receive_burst(max_packets)?;
104 if !packets.is_empty() {
105 return Ok(packets);
106 }
107
108 if start.elapsed() >= timeout {
109 return Ok(Vec::new());
110 }
111
112 std::hint::spin_loop();
114 }
115 }
116}
117
118pub struct TxQueue {
120 port_id: u16,
121 queue_id: u16,
122}
123
124impl TxQueue {
125 pub fn new(port_id: u16, queue_id: u16, _config: TxQueueConfig) -> DpdkResult<Self> {
127 Ok(Self { port_id, queue_id })
129 }
130
131 pub fn port_id(&self) -> u16 {
133 self.port_id
134 }
135
136 pub fn queue_id(&self) -> u16 {
138 self.queue_id
139 }
140
141 pub fn send_burst(&self, packets: Vec<Mbuf>) -> DpdkResult<u16> {
146 let count = packets.len() as u16;
149
150 for packet in packets {
153 let _ = packet.into_raw(); }
155
156 Ok(count)
157 }
158
159 pub fn send_burst_flush(&self, packets: Vec<Mbuf>) -> DpdkResult<u16> {
163 self.send_burst(packets)
165 }
166}
167
168pub struct QueuePair {
172 pub rx: RxQueue,
173 pub tx: TxQueue,
174}
175
176impl QueuePair {
177 pub fn new(
179 port_id: u16,
180 queue_id: u16,
181 rx_config: RxQueueConfig,
182 tx_config: TxQueueConfig,
183 ) -> DpdkResult<Self> {
184 Ok(Self {
185 rx: RxQueue::new(port_id, queue_id, rx_config)?,
186 tx: TxQueue::new(port_id, queue_id, tx_config)?,
187 })
188 }
189
190 pub fn echo(&self, max_burst: u16) -> DpdkResult<u16> {
194 let packets = self.rx.receive_burst(max_burst)?;
195 if packets.is_empty() {
196 return Ok(0);
197 }
198 self.tx.send_burst(packets)
199 }
200}
201
/// Running counters for a queue.
///
/// All fields are public and start at zero. The `add_*` methods saturate at
/// `u64::MAX` instead of overflowing, so they never panic in debug builds and
/// never wrap back to a small value in release builds.
#[derive(Debug, Default, Clone)]
pub struct QueueStats {
    /// Incremented by one on every [`QueueStats::add_packet`] call.
    pub packets: u64,
    /// Sum of the byte counts passed to [`QueueStats::add_packet`].
    pub bytes: u64,
    /// Incremented by one on every [`QueueStats::add_dropped`] call.
    pub dropped: u64,
    /// Incremented by one on every [`QueueStats::add_error`] call.
    pub errors: u64,
}

impl QueueStats {
    /// Creates a stats record with every counter set to zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one packet of `bytes` bytes.
    ///
    /// Both `packets` and `bytes` saturate at `u64::MAX`.
    pub fn add_packet(&mut self, bytes: usize) {
        self.packets = self.packets.saturating_add(1);
        // `usize` is at most 64 bits wide on supported targets, so this
        // widening cast cannot lose information.
        self.bytes = self.bytes.saturating_add(bytes as u64);
    }

    /// Records one error; saturates at `u64::MAX`.
    pub fn add_error(&mut self) {
        self.errors = self.errors.saturating_add(1);
    }

    /// Records one dropped packet; saturates at `u64::MAX`.
    pub fn add_dropped(&mut self) {
        self.dropped = self.dropped.saturating_add(1);
    }
}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// A receive queue built from the default config reports the ids it was given.
    #[test]
    fn test_rx_queue_creation() {
        let created = RxQueue::new(0, 0, RxQueueConfig::default());
        assert!(created.is_ok());

        let rx = created.unwrap();
        assert_eq!((rx.port_id(), rx.queue_id()), (0, 0));
    }

    /// A transmit queue built from the default config reports the ids it was given.
    #[test]
    fn test_tx_queue_creation() {
        let created = TxQueue::new(0, 0, TxQueueConfig::default());
        assert!(created.is_ok());

        let tx = created.unwrap();
        assert_eq!((tx.port_id(), tx.queue_id()), (0, 0));
    }

    /// Recording two packets updates both the packet and the byte counters.
    #[test]
    fn test_queue_stats() {
        let mut stats = QueueStats::new();
        [100usize, 200]
            .iter()
            .for_each(|&len| stats.add_packet(len));

        assert_eq!((stats.packets, stats.bytes), (2, 300));
    }
}