seaslug/io/io_uring/
sq.rs

1#![allow(unsafe_code)]
2
3use std::slice::from_raw_parts_mut;
4
5use super::*;
6
7/// Sprays uring submissions.
8#[derive(Debug)]
9pub(crate) struct Sq {
10    khead: *mut AtomicU32,
11    ktail: *mut AtomicU32,
12    kring_mask: *mut u32,
13    kflags: *mut AtomicU32,
14    kdropped: *mut AtomicU32,
15    array: &'static mut [AtomicU32],
16    sqes: &'static mut [io_uring_sqe],
17    sqe_head: u32,
18    sqe_tail: u32,
19    ring_ptr: *const libc::c_void,
20    ring_mmap_sz: usize,
21    sqes_mmap_sz: usize,
22}
23
24impl Drop for Sq {
25    #[allow(unsafe_code)]
26    fn drop(&mut self) {
27        unsafe {
28            libc::munmap(
29                self.sqes.as_ptr() as *mut libc::c_void,
30                self.sqes_mmap_sz,
31            );
32        }
33        unsafe {
34            libc::munmap(
35                self.ring_ptr as *mut libc::c_void,
36                self.ring_mmap_sz,
37            );
38        }
39    }
40}
41
42impl Sq {
43    pub(crate) fn new(
44        params: &io_uring_params,
45        ring_fd: i32,
46    ) -> io::Result<Sq> {
47        let sq_ring_mmap_sz = params.sq_off.array as usize
48            + (params.sq_entries as usize
49                * std::mem::size_of::<u32>());
50
51        // TODO IORING_FEAT_SINGLE_MMAP for sq
52
53        let sq_ring_ptr = uring_mmap(
54            sq_ring_mmap_sz,
55            ring_fd,
56            IORING_OFF_SQ_RING,
57        )?;
58
59        let sqes_mmap_sz: usize = params.sq_entries
60            as usize
61            * std::mem::size_of::<io_uring_sqe>();
62
63        let sqes_ptr: *mut io_uring_sqe = uring_mmap(
64            sqes_mmap_sz,
65            ring_fd,
66            IORING_OFF_SQES,
67        )? as _;
68
69        Ok(unsafe {
70            Sq {
71                sqe_head: 0,
72                sqe_tail: 0,
73                ring_ptr: sq_ring_ptr,
74                ring_mmap_sz: sq_ring_mmap_sz,
75                sqes_mmap_sz,
76                khead: sq_ring_ptr
77                    .add(params.sq_off.head as usize)
78                    as *mut AtomicU32,
79                ktail: sq_ring_ptr
80                    .add(params.sq_off.tail as usize)
81                    as *mut AtomicU32,
82                kring_mask: sq_ring_ptr
83                    .add(params.sq_off.ring_mask as usize)
84                    as *mut u32,
85                kflags: sq_ring_ptr
86                    .add(params.sq_off.flags as usize)
87                    as *mut AtomicU32,
88                kdropped: sq_ring_ptr
89                    .add(params.sq_off.dropped as usize)
90                    as *mut AtomicU32,
91                array: from_raw_parts_mut(
92                    sq_ring_ptr
93                        .add(params.sq_off.array as usize)
94                        as _,
95                    params.sq_entries as usize,
96                ),
97                sqes: from_raw_parts_mut(
98                    sqes_ptr,
99                    params.sq_entries as usize,
100                ),
101            }
102        })
103    }
104
105    pub(crate) fn try_get_sqe(
106        &mut self,
107        ring_flags: u32,
108    ) -> Option<&mut io_uring_sqe> {
109        let next = self.sqe_tail + 1;
110
111        let head =
112            if (ring_flags & IORING_SETUP_SQPOLL) == 0 {
113                // non-polling mode
114                self.sqe_head
115            } else {
116                // polling mode
117                unsafe { &*self.khead }.load(Acquire)
118            };
119
120        if next - head <= self.sqes.len() as u32 {
121            let idx =
122                self.sqe_tail & unsafe { *self.kring_mask };
123            let sqe = &mut self.sqes[idx as usize];
124            self.sqe_tail = next;
125
126            Some(sqe)
127        } else {
128            None
129        }
130    }
131
132    // sets sq.array to point to current sq.sqe_head
133    fn flush(&mut self) -> u32 {
134        let mask: u32 = unsafe { *self.kring_mask };
135        let to_submit = self.sqe_tail - self.sqe_head;
136
137        let mut ktail =
138            unsafe { &*self.ktail }.load(Acquire);
139
140        for _ in 0..to_submit {
141            let index = ktail & mask;
142            self.array[index as usize]
143                .store(self.sqe_head & mask, Release);
144            ktail += 1;
145            self.sqe_head += 1;
146        }
147
148        let swapped =
149            unsafe { &*self.ktail }.swap(ktail, Release);
150
151        assert_eq!(swapped, ktail - to_submit);
152
153        to_submit
154    }
155
156    pub(crate) fn submit_all(
157        &mut self,
158        ring_flags: u32,
159        ring_fd: i32,
160    ) -> u64 {
161        let submitted = if ring_flags & IORING_SETUP_SQPOLL
162            == 0
163        {
164            // non-SQPOLL mode, we need to use
165            // `enter` to submit our SQEs.
166
167            // TODO for polling, keep flags at 0
168
169            let flags = IORING_ENTER_GETEVENTS;
170            let flushed = self.flush();
171            let mut to_submit = flushed;
172            while to_submit > 0 {
173                let _ = Measure::new(&M.enter_sqe);
174                let ret = enter(
175                    ring_fd,
176                    to_submit,
177                    0,
178                    flags,
179                    std::ptr::null_mut(),
180                )
181                .expect(
182                    "Failed to submit items to kernel via \
183                     io_uring. This should never fail.",
184                );
185                to_submit -= u32::try_from(ret).unwrap();
186            }
187            flushed
188        } else if unsafe { &*self.kflags }.load(Acquire)
189            & IORING_SQ_NEED_WAKEUP
190            != 0
191        {
192            // the kernel has signalled to us that the
193            // SQPOLL thread that checks the submission
194            // queue has terminated due to inactivity,
195            // and needs to be restarted.
196            let to_submit = self.sqe_tail - self.sqe_head;
197            let _ = Measure::new(&M.enter_sqe);
198            enter(
199                ring_fd,
200                to_submit,
201                0,
202                IORING_ENTER_SQ_WAKEUP,
203                std::ptr::null_mut(),
204            )
205            .expect(
206                "Failed to wake up SQPOLL io_uring \
207                 kernel thread. This should never fail.",
208            );
209            0
210        } else {
211            0
212        };
213        assert_eq!(
214            unsafe { &*self.kdropped }.load(Relaxed),
215            0
216        );
217        u64::from(submitted)
218    }
219}