Skip to main content

push_packet/channels/
route.rs

1//! Communication primitives for routing packets to userspace
2
3use std::{collections::VecDeque, os::fd::BorrowedFd, sync::Arc};
4
5use crossbeam_queue::ArrayQueue;
6use nix::poll::PollFlags;
7use xdpilone::{CompletionQueue, FillQueue, RingRx, RingTx, xdp::XdpDesc};
8
9use crate::{af_xdp::OwnedUmem, cast, channels::ChannelError, events::route::RouteEvent};
10
/// Upper bound on the number of RX descriptors pulled off the ring in one
/// batch; also used as the initial capacity of the receiver's descriptor cache.
const CACHE_CAPACITY: u32 = 64;

/// Batch size for recycling frames: the fill queue is only topped up once the
/// free list holds at least this many addresses, and this is also the maximum
/// number of entries requested from the fill and completion queues per call.
const FREE_LIST_BATCH: u32 = 64;
13
14/// Receiver for an `AF_XDP` socket.
15pub struct Receiver {
16    rx: RingRx,
17    fill_queue: FillQueue,
18    umem: Arc<OwnedUmem>,
19    cache: VecDeque<XdpDesc>,
20    free_list: Arc<ArrayQueue<u64>>,
21}
22
23impl Receiver {
24    pub(crate) fn new(
25        rx: RingRx,
26        fill_queue: FillQueue,
27        umem: Arc<OwnedUmem>,
28        free_list: Arc<ArrayQueue<u64>>,
29    ) -> Self {
30        Self {
31            rx,
32            fill_queue,
33            umem,
34            cache: VecDeque::with_capacity(
35                CACHE_CAPACITY
36                    .try_into()
37                    .expect("u32 must fit in usize for targets"),
38            ),
39            free_list,
40        }
41    }
42
43    fn replenish_fill_queue(&mut self) {
44        if self.free_list.len()
45            < FREE_LIST_BATCH
46                .try_into()
47                .expect("u32 must fit in usize for targets")
48        {
49            return;
50        }
51        {
52            let mut wf = self.fill_queue.fill(FREE_LIST_BATCH);
53            while let Some(addr) = self.free_list.pop() {
54                if !wf.insert_once(addr) {
55                    self.free_list
56                        .push(addr)
57                        .expect("Frame count == free list cap");
58                    break;
59                }
60            }
61            wf.commit();
62        }
63        if self.fill_queue.needs_wakeup() {
64            self.fill_queue.wake();
65        }
66    }
67
68    /// Blocks until a packet is available.
69    /// This returns a [`RouteEvent`], which references the underlying memory. It should quickly be
70    /// dropped or converted into an [`crate::events::route::OwnedRouteEvent`] to avoid frame
71    /// starvation.
72    ///
73    /// # Errors
74    /// Returns [`ChannelError::Disconnected`] if the channel is closed.
75    #[allow(clippy::missing_panics_doc)]
76    pub fn recv(&mut self) -> Result<RouteEvent<'_>, ChannelError> {
77        self.replenish_fill_queue();
78
79        while self.cache.is_empty() {
80            {
81                let mut reader = self.rx.receive(CACHE_CAPACITY);
82                self.cache.extend(reader.by_ref());
83                reader.release();
84            }
85            if self.cache.is_empty() {
86                // Safety: the fd is owned by self.rx, outlives this borrow
87                let fd = unsafe { BorrowedFd::borrow_raw(self.rx.as_raw_fd()) };
88                crate::channels::poll::poll_fd(fd, PollFlags::POLLIN)?;
89            }
90        }
91
92        let XdpDesc { addr, len, .. } = self
93            .cache
94            .pop_front()
95            .expect("cache non-empty after loop and poll");
96        Ok(RouteEvent {
97            address: addr,
98            len,
99            umem: &self.umem,
100            free_list: &self.free_list,
101        })
102    }
103    /// Attempts to receive a packet.
104    /// This returns a [`RouteEvent`], which references the underlying memory. It should quickly be
105    /// dropped or converted into an [`crate::events::route::OwnedRouteEvent`] to avoid frame
106    /// starvation.
107    ///
108    /// # Errors
109    /// Returns [`ChannelError::Disconnected`] if the channel is closed.
110    /// Returns [`ChannelError::Empty`] if there are no packets available.
111    pub fn try_recv(&mut self) -> Result<RouteEvent<'_>, ChannelError> {
112        self.replenish_fill_queue();
113        if self.cache.is_empty() {
114            let mut reader = self.rx.receive(CACHE_CAPACITY);
115            self.cache.extend(reader.by_ref());
116            reader.release();
117        }
118        let XdpDesc { addr, len, .. } = self.cache.pop_front().ok_or(ChannelError::Empty)?;
119
120        Ok(RouteEvent {
121            len,
122            address: addr,
123            umem: &self.umem,
124            free_list: &self.free_list,
125        })
126    }
127}
128
129/// Sender for an `AF_XDP` socket
130pub struct Sender {
131    tx: RingTx,
132    completion_queue: CompletionQueue,
133    umem: Arc<OwnedUmem>,
134    free_list: Arc<ArrayQueue<u64>>,
135}
136
137impl Sender {
138    pub(crate) fn new(
139        tx: RingTx,
140        completion_queue: CompletionQueue,
141        umem: Arc<OwnedUmem>,
142        free_list: Arc<ArrayQueue<u64>>,
143    ) -> Self {
144        Self {
145            tx,
146            completion_queue,
147            umem,
148            free_list,
149        }
150    }
151
152    fn drain_completion_queue(&mut self) {
153        let mut rc = self.completion_queue.complete(FREE_LIST_BATCH);
154        if rc.capacity() < FREE_LIST_BATCH {
155            return;
156        }
157        while let Some(addr) = rc.read() {
158            self.free_list.push(addr).expect("free list = frame count");
159        }
160        rc.release();
161    }
162
163    /// Attempts to send a packet.
164    ///
165    /// # Errors
166    /// Returns [`ChannelError::Disconnected`] if the channel is closed.
167    /// Returns [`ChannelError::Poll`] for unexpected poll errors.
168    #[allow(clippy::missing_panics_doc)]
169    pub fn try_send(&mut self, data: impl AsRef<[u8]>) -> Result<(), ChannelError> {
170        self.drain_completion_queue();
171        let bytes = data.as_ref();
172        let address = self.free_list.pop().ok_or(ChannelError::Empty)?;
173        // Safety: The address is from free_list, not accessible to the kernel.
174        unsafe {
175            self.umem
176                .write_at(cast::umem_offset_to_usize(address), bytes);
177        }
178        {
179            let mut wt = self.tx.transmit(1);
180            if !wt.insert_once(XdpDesc {
181                addr: address,
182                len: cast::packet_len_to_u32(bytes.len()),
183                options: 0,
184            }) {
185                self.free_list
186                    .push(address)
187                    .expect("free list cap = frame count");
188                return Err(ChannelError::Empty);
189            }
190            wt.commit();
191        }
192        if self.tx.needs_wakeup() {
193            self.tx.wake();
194        }
195        Ok(())
196    }
197
198    /// Blocks until the packet is sent.
199    ///
200    /// # Errors
201    /// Returns [`ChannelError::Disconnected`] if the channel is closed.
202    pub fn send(&mut self, data: impl AsRef<[u8]>) -> Result<(), ChannelError> {
203        let bytes = data.as_ref();
204
205        let address = loop {
206            self.drain_completion_queue();
207            if let Some(address) = self.free_list.pop() {
208                break address;
209            }
210            // Safety: the fd is owned by self.tx, outlives this borrow
211            let fd = unsafe { BorrowedFd::borrow_raw(self.tx.as_raw_fd()) };
212
213            crate::channels::poll::poll_fd(fd, PollFlags::POLLIN | PollFlags::POLLOUT)?;
214        };
215
216        // Safety: The address came from free_list, and not accessible to the kernel
217        unsafe {
218            self.umem
219                .write_at(cast::umem_offset_to_usize(address), bytes);
220        }
221
222        loop {
223            let mut wt = self.tx.transmit(1);
224            if wt.insert_once(XdpDesc {
225                addr: address,
226                len: cast::packet_len_to_u32(bytes.len()),
227                options: 0,
228            }) {
229                wt.commit();
230                break;
231            }
232            drop(wt);
233            // Safety: the fd is owned by self.tx, outlives this borrow
234            let fd = unsafe { BorrowedFd::borrow_raw(self.tx.as_raw_fd()) };
235            crate::channels::poll::poll_fd(fd, PollFlags::POLLIN | PollFlags::POLLOUT)?;
236        }
237
238        if self.tx.needs_wakeup() {
239            self.tx.wake();
240        }
241        Ok(())
242    }
243}
244
#[cfg(test)]
mod tests {
    use crate::channels::route::{Receiver, Sender};

    /// Compiles only for types that implement `Send`; calling it is the
    /// whole assertion.
    fn require_send<T>()
    where
        T: Send,
    {
    }

    /// `Sender` must be movable to another thread.
    #[test]
    fn sender_is_send() {
        require_send::<Sender>();
    }

    /// `Receiver` must be movable to another thread.
    #[test]
    fn receiver_is_send() {
        require_send::<Receiver>();
    }
}