ethox_io_uring/
lib.rs

// This crate is not `no_std` yet, but it might become so one day.
// Until then, stick to the minimal set of dependencies.
3extern crate alloc;
4use core::{cmp, iter, mem};
5
6use alloc::rc::Rc;
7use alloc::collections::VecDeque;
8
9use ethox::{layer, nic, wire};
10use ethox::managed::Partial;
11use io_uring::opcode::{SendMsg, RecvMsg, types::Target};
12
13mod pool;
14
15pub struct RawRing {
16    /// The ring which we use for the network interface (or UDS, or whatever fd if you go wild).
17    io_ring: io_uring::IoUring,
18    /// The packet memory allocation.
19    #[allow(dead_code)] // Keep a reference, if we need it later.
20    memory: Rc<pool::Pool>,
21    /// The fd of our socket.
22    fd: libc::c_int,
23    io_queue: Queue,
24}
25
26struct SubmitInterface<'io> {
27    inner: &'io mut io_uring::SubmissionQueue,
28    fd: libc::c_int,
29}
30
31pub struct PacketBuf {
32    inner: Partial<pool::Entry>,
33}
34
35pub struct Handle {
36    state: State,
37    info: PacketInfo,
38}
39
40/// Contains packet buffers that may be submitted to the io-uring.
41struct Queue {
42    /// ManuallyDrop since we can't allow the data to be dropped and freed while the kernel is
43    /// still working on it. Otherwise, it might get reclaimed for another object that is then
44    /// thoroughly destroyed.
45    // TODO: drop it anyways if the queue is empty on drop or by closing the uring first. Unclear
46    // question about blocking entries in the uring (of which we should not have any, but how to
47    // ensure this?): The kernel worker will remain active but does closing the uring succeed
48    // anyways? If yes then that risks memory unsafety and we can't rely on the closing to make the
49    // decision to drop the buffers here.
50    buffers: mem::ManuallyDrop<Box<[PacketData]>>,
51    /// Buffers we still haven't sent but should.
52    to_send: VecDeque<usize>,
53    /// Buffers that were completed but not yet inspected.
54    to_recv: VecDeque<usize>,
55    /// Buffers that are unused.
56    free: VecDeque<usize>,
57}
58
/// Lifecycle stage of a packet buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Not associated with any operation.
    Raw,
    /// A receive completed with a non-negative result; the data awaits inspection.
    Received,
    /// Queued through `Handle::queue`, but no send operation was submitted yet.
    Unsent,
    /// A `SendMsg` entry for this buffer was pushed to the submission queue.
    Sending,
    /// A `RecvMsg` entry for this buffer was pushed to the submission queue.
    Receiving,
}
67
68struct PacketInfo {
69    // TODO cmsg buffer for kernel generated timestamps.
70    timestamp: ethox::time::Instant,
71}
72
/// Value attached as `user_data` to a submitted entry; the queue stores the buffer index in it.
struct Tag(u64);
74
75struct PacketData {
76    handle: Handle,
77    buffer: PacketBuf,
78    io_vec: libc::iovec,
79    io_hdr: libc::msghdr,
80}
81
82impl RawRing {
83    pub fn from_fd(fd: libc::c_int) -> Result<Self, std::io::Error> {
84        let ring = io_uring::Builder::default()
85            // iopoll is incompatible with recvmsg, apparently, looking through Linux source code
86            // as of 5.5
87            // .setup_iopoll()
88            .build(32)?;
89        Ok(RawRing::from_ring(ring, fd))
90    }
91
92    pub fn from_ring(io_ring: io_uring::IoUring, fd: libc::c_int) -> Self {
93        // TODO: register buffers from the pool and socket fd.
94        let memory = Rc::new(pool::Pool::with_size_and_count(2048, 128));
95        let io_queue = Queue::with_capacity(Rc::clone(&memory), 32);
96        RawRing {
97            io_ring,
98            memory,
99            fd,
100            io_queue,
101        }
102    }
103
104    pub fn flush_and_reap(&mut self) -> std::io::Result<usize> {
105        // Drain current completion queue.
106        self.io_queue.reap(self.io_ring.completion());
107        // Enter the uring.
108        let result = self.io_ring.submit();
109        // Reap again in case something got completed.
110        self.io_queue.reap(self.io_ring.completion());
111        result
112    }
113}
114
115impl SubmitInterface<'_> {
116    fn open_slots(&self) -> usize {
117        self.inner.capacity() - self.inner.len()
118    }
119
120    /// Submit packet data, returning the number of submitted packets. Those submitted should not
121    /// be moved before completion as the msghdr will point into of them.
122    unsafe fn submit_send<'local>(
123        &mut self,
124        data: impl Iterator<Item=(&'local mut PacketData, Tag)> + ExactSizeIterator,
125    ) {
126        let mut submission = self.inner.available();
127        let remaining = submission.capacity() - submission.len();
128        assert!(data.len() <= remaining);
129
130        for (packet, Tag(tag)) in data {
131            packet.io_hdr.msg_iov = &mut packet.io_vec;
132            packet.io_hdr.msg_iovlen = 1;
133            let send = SendMsg::new(Target::Fd(self.fd), &packet.io_hdr)
134                .build()
135                .user_data(tag);
136            #[allow(unused_unsafe)]
137            match unsafe {
138                submission.push(send)
139            } {
140                Ok(()) => packet.handle.state = State::Sending,
141                // We might even declare this unreachable.
142                Err(_) => panic!("Pushed into full queue"),
143            }
144        }
145    }
146
147    /// Submit packet data, returning the number of submitted packets. Those submitted should not
148    /// be moved before completion as the msghdr will point into of them.
149    unsafe fn submit_recv<'local>(
150        &mut self,
151        data: impl Iterator<Item=(&'local mut PacketData, Tag)> + ExactSizeIterator,
152    ) {
153        let mut submission = self.inner.available();
154        let remaining = submission.capacity() - submission.len();
155        assert!(data.len() <= remaining);
156
157        for (packet, Tag(tag)) in data {
158            packet.io_hdr.msg_iov = &mut packet.io_vec;
159            packet.io_hdr.msg_iovlen = 1;
160            let send = RecvMsg::new(Target::Fd(self.fd), &mut packet.io_hdr)
161                // TODO: investigate IORING_OP_ASYNC_CANCEL and timeout cancel.
162                .flags(libc::MSG_DONTWAIT as u32)
163                .build()
164                .user_data(tag);
165            #[allow(unused_unsafe)]
166            match unsafe {
167                submission.push(send)
168            } {
169                Ok(()) => packet.handle.state = State::Receiving,
170                // We might even declare this unreachable.
171                Err(_) => panic!("Pushed into full queue"),
172            }
173        }
174    }
175}
176
177impl PacketData {
178    pub fn new(buffer: pool::Entry) -> Self {
179        let io_vec = pool::Entry::io_vec(&buffer);
180        PacketData {
181            handle: Handle {
182                state: State::Raw,
183                info: PacketInfo {
184                    timestamp: ethox::time::Instant::from_secs(0),
185                },
186            },
187            buffer: PacketBuf {
188                inner: Partial::new(buffer),
189            },
190            io_vec,
191            // SAFETY: this is a valid initialization for a msghdr
192            io_hdr: unsafe { mem::zeroed() },
193        }
194    }
195}
196
197impl SubmitInterface<'_> {
198    fn borrow(&mut self) -> SubmitInterface<'_> {
199        SubmitInterface { fd: self.fd, inner: self.inner }
200    }
201}
202
203impl Drop for RawRing {
204    fn drop(&mut self) {
205        unsafe {
206            libc::close(self.fd);
207        }
208    }
209}
210
211impl nic::Device for RawRing {
212    type Payload = PacketBuf;
213    type Handle = Handle;
214
215    fn personality(&self) -> nic::Personality {
216        nic::Personality::baseline()
217    }
218
219    fn rx(&mut self, max: usize, mut receiver: impl nic::Recv<Handle, PacketBuf>)
220        -> layer::Result<usize>
221    {
222        let (submitter, submission, completion) = self.io_ring.split();
223        let mut submit = SubmitInterface {
224            inner: submission,
225            fd: self.fd,
226        };
227
228        self.io_queue.fill(submit.borrow());
229        submitter.submit().map_err(|_| layer::Error::Illegal)?;
230        self.io_queue.reap(completion);
231
232        let mut count = 0;
233
234        for _ in 0..max {
235            let idx = match self.io_queue.pop_recv() {
236                Some(idx) => idx,
237                None => break,
238            };
239
240            let packet = self.io_queue.get_mut(idx).unwrap();
241            count += 1;
242            receiver.receive(nic::Packet {
243                handle: &mut packet.handle,
244                payload: &mut packet.buffer,
245            });
246
247            match packet.handle.state {
248                State::Unsent => {
249                    self.io_queue.push_send(idx)
250                },
251                State::Received => {
252                    packet.handle.state = State::Raw;
253                    self.io_queue.push_free(idx);
254                },
255                other => panic!("Unexpected operation {:?} associated with retransmission buffer.", other),
256            }
257        }
258
259        self.io_queue.flush(submit);
260        self.io_ring.submit().map_err(|_| layer::Error::Illegal)?;
261
262        Ok(count)
263    }
264
265    fn tx(&mut self, max: usize, mut sender: impl nic::Send<Handle, PacketBuf>)
266        -> layer::Result<usize>
267    {
268        let (_, submission, _) = self.io_ring.split();
269        let submit = SubmitInterface {
270            inner: submission,
271            fd: self.fd,
272        };
273
274        let mut count = 0;
275        let max = cmp::min(max, submit.open_slots());
276
277        for _ in 0..max {
278            let idx = match self.io_queue.pop_free() {
279                Some(idx) => idx,
280                None => break,
281            };
282
283            let packet = self.io_queue.get_mut(idx).unwrap();
284            packet.handle.state = State::Raw;
285            packet.handle.info.timestamp = ethox::time::Instant::now();
286
287            sender.send(nic::Packet {
288                handle: &mut packet.handle,
289                payload: &mut packet.buffer,
290            });
291
292            match packet.handle.state {
293                State::Unsent => {
294                    self.io_queue.push_send(idx);
295                    count += 1;
296                },
297                State::Raw => {
298                    packet.handle.state = State::Raw;
299                    self.io_queue.push_free(idx);
300                },
301                other => panic!("Unexpected operation {:?} associated with transmission buffer.", other),
302            }
303        }
304
305        self.io_queue.flush(submit);
306        self.io_ring.submit().map_err(|_| layer::Error::Illegal)?;
307
308        Ok(count)
309    }
310}
311
312impl Queue {
313    fn with_capacity(pool: Rc<pool::Pool>, capacity: usize) -> Self {
314        assert_eq!(capacity as u64 as usize, capacity, "Indexing does not survive roundtrip");
315        let entries = pool::Pool::spawn_entries(pool)
316            .take(capacity)
317            .map(PacketData::new)
318            .collect::<Vec<_>>()
319            .into_boxed_slice();
320
321        Queue {
322            buffers: mem::ManuallyDrop::new(entries),
323            to_send: VecDeque::with_capacity(capacity),
324            to_recv: VecDeque::with_capacity(capacity),
325            free: (0..capacity).collect(),
326        }
327    }
328
329    fn get_mut(&mut self, idx: usize) -> Option<&mut PacketData> {
330        self.buffers.get_mut(idx)
331    }
332
333    fn push_send(&mut self, idx: usize) {
334        self.to_send.push_back(idx);
335    }
336
337    fn pop_recv(&mut self) -> Option<usize> {
338        self.to_recv.pop_front()
339    }
340
341    fn push_free(&mut self, idx: usize) {
342        self.free.push_back(idx);
343    }
344
345    fn pop_free(&mut self) -> Option<usize> {
346        self.free.pop_front()
347    }
348
349    fn fill(&mut self, mut submit: SubmitInterface) {
350        let max = submit.open_slots();
351        for _ in 0..max {
352            let idx = match self.free.pop_front() {
353                Some(idx) => idx,
354                None => break,
355            };
356            let packet = self.buffers.get_mut(idx).unwrap();
357            packet.io_vec.iov_len = packet.buffer.inner.capacity();
358            assert_eq!(packet.handle.state, State::Raw);
359            let tag = Tag(idx as u64);
360            unsafe {
361                submit.submit_recv(iter::once((packet, tag)));
362            }
363        }
364    }
365
366    fn reap(&mut self, cq: &mut io_uring::CompletionQueue) {
367        for entry in cq.available() {
368            let idx = entry.user_data() as usize;
369            let packet = self.get_mut(idx).unwrap();
370            match packet.handle.state {
371                State::Sending => {
372                    packet.handle.state = State::Raw;
373                    self.push_free(idx);
374                    continue;
375                },
376                State::Receiving => (),
377                other => panic!("Unexpected operation {:?} associated with completed buffer.", other),
378            }
379
380            if entry.result() >= 0 {
381                packet.handle.state = State::Received;
382                packet.buffer.inner.set_len_unchecked(entry.result() as usize);
383                packet.handle.info.timestamp = ethox::time::Instant::now();
384                self.to_recv.push_back(idx);
385            } else {
386                packet.handle.state = State::Raw;
387                self.free.push_back(idx);
388                // Unhandled error.
389            }
390        }
391    }
392
393    fn flush(&mut self, mut submit: SubmitInterface) {
394        let max = submit.open_slots();
395        for _ in 0..max {
396            let idx = match self.to_send.pop_front() {
397                Some(idx) => idx,
398                None => break,
399            };
400            let packet = self.buffers.get_mut(idx).unwrap();
401            assert_eq!(packet.handle.state, State::Unsent);
402            packet.io_vec.iov_len = packet.buffer.inner.len();
403            let tag = Tag(idx as u64);
404            unsafe {
405                submit.submit_send(iter::once((packet, tag)));
406            }
407        }
408    }
409}
410
411impl nic::Handle for Handle {
412    fn queue(&mut self) -> Result<(), layer::Error> {
413        self.state = State::Unsent;
414        Ok(())
415    }
416
417    fn info(&self) -> &dyn nic::Info {
418        &self.info
419    }
420}
421
422impl nic::Info for PacketInfo {
423    fn capabilities(&self) -> nic::Capabilities {
424        nic::Capabilities::no_support()
425    }
426
427    fn timestamp(&self) -> ethox::time::Instant {
428        self.timestamp
429    }
430}
431
432impl wire::Payload for PacketBuf {
433    fn payload(&self) -> &wire::payload {
434        <Partial<_> as wire::Payload>::payload(&self.inner)
435    }
436}
437
438impl wire::PayloadMut for PacketBuf {
439    fn payload_mut(&mut self) -> &mut wire::payload {
440        <Partial<_> as wire::PayloadMut>::payload_mut(&mut self.inner)
441    }
442
443    fn resize(&mut self, length: usize) -> Result<(), wire::PayloadError> {
444        <Partial<_> as wire::PayloadMut>::resize(&mut self.inner, length)
445    }
446
447    fn reframe(&mut self, frame: wire::Reframe) -> Result<(), wire::PayloadError> {
448        <Partial<_> as wire::PayloadMut>::reframe(&mut self.inner, frame)
449    }
450}
451