push_packet/channels/
route.rs1use std::{collections::VecDeque, os::fd::BorrowedFd, sync::Arc};
4
5use crossbeam_queue::ArrayQueue;
6use nix::poll::PollFlags;
7use xdpilone::{CompletionQueue, FillQueue, RingRx, RingTx, xdp::XdpDesc};
8
9use crate::{af_xdp::OwnedUmem, cast, channels::ChannelError, events::route::RouteEvent};
10
/// Maximum number of RX descriptors pulled from the ring into the local
/// cache per `receive` call.
const CACHE_CAPACITY: u32 = 64;
/// Batch size used when moving frame addresses between the free list and
/// the kernel fill/completion rings.
const FREE_LIST_BATCH: u32 = 64;
13
/// Receiving half of an AF_XDP-backed packet channel.
pub struct Receiver {
    // RX ring that packet descriptors are read from.
    rx: RingRx,
    // Kernel fill queue; free frames are recycled into it so the driver
    // has buffers to receive into.
    fill_queue: FillQueue,
    // Shared UMEM region backing all frames.
    umem: Arc<OwnedUmem>,
    // Locally cached descriptors already pulled from `rx`.
    cache: VecDeque<XdpDesc>,
    // Shared queue of frame addresses not currently owned by the kernel.
    free_list: Arc<ArrayQueue<u64>>,
}
22
23impl Receiver {
24 pub(crate) fn new(
25 rx: RingRx,
26 fill_queue: FillQueue,
27 umem: Arc<OwnedUmem>,
28 free_list: Arc<ArrayQueue<u64>>,
29 ) -> Self {
30 Self {
31 rx,
32 fill_queue,
33 umem,
34 cache: VecDeque::with_capacity(
35 CACHE_CAPACITY
36 .try_into()
37 .expect("u32 must fit in usize for targets"),
38 ),
39 free_list,
40 }
41 }
42
43 fn replenish_fill_queue(&mut self) {
44 if self.free_list.len()
45 < FREE_LIST_BATCH
46 .try_into()
47 .expect("u32 must fit in usize for targets")
48 {
49 return;
50 }
51 {
52 let mut wf = self.fill_queue.fill(FREE_LIST_BATCH);
53 while let Some(addr) = self.free_list.pop() {
54 if !wf.insert_once(addr) {
55 self.free_list
56 .push(addr)
57 .expect("Frame count == free list cap");
58 break;
59 }
60 }
61 wf.commit();
62 }
63 if self.fill_queue.needs_wakeup() {
64 self.fill_queue.wake();
65 }
66 }
67
68 #[allow(clippy::missing_panics_doc)]
76 pub fn recv(&mut self) -> Result<RouteEvent<'_>, ChannelError> {
77 self.replenish_fill_queue();
78
79 while self.cache.is_empty() {
80 {
81 let mut reader = self.rx.receive(CACHE_CAPACITY);
82 self.cache.extend(reader.by_ref());
83 reader.release();
84 }
85 if self.cache.is_empty() {
86 let fd = unsafe { BorrowedFd::borrow_raw(self.rx.as_raw_fd()) };
88 crate::channels::poll::poll_fd(fd, PollFlags::POLLIN)?;
89 }
90 }
91
92 let XdpDesc { addr, len, .. } = self
93 .cache
94 .pop_front()
95 .expect("cache non-empty after loop and poll");
96 Ok(RouteEvent {
97 address: addr,
98 len,
99 umem: &self.umem,
100 free_list: &self.free_list,
101 })
102 }
103 pub fn try_recv(&mut self) -> Result<RouteEvent<'_>, ChannelError> {
112 self.replenish_fill_queue();
113 if self.cache.is_empty() {
114 let mut reader = self.rx.receive(CACHE_CAPACITY);
115 self.cache.extend(reader.by_ref());
116 reader.release();
117 }
118 let XdpDesc { addr, len, .. } = self.cache.pop_front().ok_or(ChannelError::Empty)?;
119
120 Ok(RouteEvent {
121 len,
122 address: addr,
123 umem: &self.umem,
124 free_list: &self.free_list,
125 })
126 }
127}
128
/// Sending half of an AF_XDP-backed packet channel.
pub struct Sender {
    // TX ring that packet descriptors are submitted to.
    tx: RingTx,
    // Kernel completion queue; finished TX frames are harvested from it.
    completion_queue: CompletionQueue,
    // Shared UMEM region backing all frames; payloads are written into it.
    umem: Arc<OwnedUmem>,
    // Shared queue of frame addresses not currently owned by the kernel.
    free_list: Arc<ArrayQueue<u64>>,
}
136
137impl Sender {
138 pub(crate) fn new(
139 tx: RingTx,
140 completion_queue: CompletionQueue,
141 umem: Arc<OwnedUmem>,
142 free_list: Arc<ArrayQueue<u64>>,
143 ) -> Self {
144 Self {
145 tx,
146 completion_queue,
147 umem,
148 free_list,
149 }
150 }
151
152 fn drain_completion_queue(&mut self) {
153 let mut rc = self.completion_queue.complete(FREE_LIST_BATCH);
154 if rc.capacity() < FREE_LIST_BATCH {
155 return;
156 }
157 while let Some(addr) = rc.read() {
158 self.free_list.push(addr).expect("free list = frame count");
159 }
160 rc.release();
161 }
162
163 #[allow(clippy::missing_panics_doc)]
169 pub fn try_send(&mut self, data: impl AsRef<[u8]>) -> Result<(), ChannelError> {
170 self.drain_completion_queue();
171 let bytes = data.as_ref();
172 let address = self.free_list.pop().ok_or(ChannelError::Empty)?;
173 unsafe {
175 self.umem
176 .write_at(cast::umem_offset_to_usize(address), bytes);
177 }
178 {
179 let mut wt = self.tx.transmit(1);
180 if !wt.insert_once(XdpDesc {
181 addr: address,
182 len: cast::packet_len_to_u32(bytes.len()),
183 options: 0,
184 }) {
185 self.free_list
186 .push(address)
187 .expect("free list cap = frame count");
188 return Err(ChannelError::Empty);
189 }
190 wt.commit();
191 }
192 if self.tx.needs_wakeup() {
193 self.tx.wake();
194 }
195 Ok(())
196 }
197
198 pub fn send(&mut self, data: impl AsRef<[u8]>) -> Result<(), ChannelError> {
203 let bytes = data.as_ref();
204
205 let address = loop {
206 self.drain_completion_queue();
207 if let Some(address) = self.free_list.pop() {
208 break address;
209 }
210 let fd = unsafe { BorrowedFd::borrow_raw(self.tx.as_raw_fd()) };
212
213 crate::channels::poll::poll_fd(fd, PollFlags::POLLIN | PollFlags::POLLOUT)?;
214 };
215
216 unsafe {
218 self.umem
219 .write_at(cast::umem_offset_to_usize(address), bytes);
220 }
221
222 loop {
223 let mut wt = self.tx.transmit(1);
224 if wt.insert_once(XdpDesc {
225 addr: address,
226 len: cast::packet_len_to_u32(bytes.len()),
227 options: 0,
228 }) {
229 wt.commit();
230 break;
231 }
232 drop(wt);
233 let fd = unsafe { BorrowedFd::borrow_raw(self.tx.as_raw_fd()) };
235 crate::channels::poll::poll_fd(fd, PollFlags::POLLIN | PollFlags::POLLOUT)?;
236 }
237
238 if self.tx.needs_wakeup() {
239 self.tx.wake();
240 }
241 Ok(())
242 }
243}
244
#[cfg(test)]
mod tests {
    use crate::channels::route::{Receiver, Sender};

    // Compile-time probe: only instantiates when `T: Send`, so each test
    // below fails to build if the channel halves stop being thread-safe.
    fn require_send<T: Send>() {}

    #[test]
    fn sender_is_send() {
        require_send::<Sender>();
    }

    #[test]
    fn receiver_is_send() {
        require_send::<Receiver>();
    }
}