seaslug/io/io_uring/sq.rs

#![allow(unsafe_code)]

use std::slice::from_raw_parts_mut;

use super::*;
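/// Userspace view of an io_uring submission queue (SQ).
///
/// The `k*` pointers reference words inside the kernel-shared SQ ring
/// mapping (head, tail, ring mask, flags, and the dropped counter), while
/// `sqe_head`/`sqe_tail` track entries we have prepared locally but not yet
/// published to the kernel. `array` is the indirection table of SQE indices
/// the kernel reads, and `sqes` is the separately mapped SQE array itself.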
#[derive(Debug)]
pub(crate) struct Sq {
    khead: *mut AtomicU32,
    ktail: *mut AtomicU32,
    kring_mask: *mut u32,
    kflags: *mut AtomicU32,
    kdropped: *mut AtomicU32,
    array: &'static mut [AtomicU32],
    sqes: &'static mut [io_uring_sqe],
    sqe_head: u32,
    sqe_tail: u32,
    ring_ptr: *const libc::c_void,
    ring_mmap_sz: usize,
    sqes_mmap_sz: usize,
}
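// The SQ ring and the SQE array come from two separate mmap calls in
// `Sq::new`, so both regions must be unmapped on drop.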
impl Drop for Sq {
    #[allow(unsafe_code)]
    fn drop(&mut self) {
        unsafe {
            libc::munmap(
                self.sqes.as_ptr() as *mut libc::c_void,
                self.sqes_mmap_sz,
            );
        }
        unsafe {
            libc::munmap(
                self.ring_ptr as *mut libc::c_void,
                self.ring_mmap_sz,
            );
        }
    }
}
impl Sq {
    pub(crate) fn new(
        params: &io_uring_params,
        ring_fd: i32,
    ) -> io::Result<Sq> {
        let sq_ring_mmap_sz = params.sq_off.array as usize
            + (params.sq_entries as usize
                * std::mem::size_of::<u32>());

        let sq_ring_ptr = uring_mmap(
            sq_ring_mmap_sz,
            ring_fd,
            IORING_OFF_SQ_RING,
        )?;

        let sqes_mmap_sz: usize = params.sq_entries as usize
            * std::mem::size_of::<io_uring_sqe>();

        let sqes_ptr: *mut io_uring_sqe = uring_mmap(
            sqes_mmap_sz,
            ring_fd,
            IORING_OFF_SQES,
        )? as _;
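        // Each field in `params.sq_off` is a byte offset from the base of
        // the SQ ring mapping, so the pointer arithmetic below is done on
        // the raw `*const c_void` (one byte per unit). The `'static`
        // lifetimes on the slices are a simplification: they are only valid
        // while the mappings are alive, i.e. until `Drop` unmaps them.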
        Ok(unsafe {
            Sq {
                sqe_head: 0,
                sqe_tail: 0,
                ring_ptr: sq_ring_ptr,
                ring_mmap_sz: sq_ring_mmap_sz,
                sqes_mmap_sz,
                khead: sq_ring_ptr
                    .add(params.sq_off.head as usize)
                    as *mut AtomicU32,
                ktail: sq_ring_ptr
                    .add(params.sq_off.tail as usize)
                    as *mut AtomicU32,
                kring_mask: sq_ring_ptr
                    .add(params.sq_off.ring_mask as usize)
                    as *mut u32,
                kflags: sq_ring_ptr
                    .add(params.sq_off.flags as usize)
                    as *mut AtomicU32,
                kdropped: sq_ring_ptr
                    .add(params.sq_off.dropped as usize)
                    as *mut AtomicU32,
                array: from_raw_parts_mut(
                    sq_ring_ptr
                        .add(params.sq_off.array as usize)
                        as _,
                    params.sq_entries as usize,
                ),
                sqes: from_raw_parts_mut(
                    sqes_ptr,
                    params.sq_entries as usize,
                ),
            }
        })
    }
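    // `sqe_head`/`sqe_tail` are our local copies: `try_get_sqe` hands out
    // slots by advancing `sqe_tail`, and nothing becomes visible to the
    // kernel until `flush` publishes the entries and bumps the shared
    // `ktail`. Under SQPOLL the kernel consumes entries concurrently, so
    // the shared head is re-read to learn how much room is actually free.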
    pub(crate) fn try_get_sqe(
        &mut self,
        ring_flags: u32,
    ) -> Option<&mut io_uring_sqe> {
        let next = self.sqe_tail + 1;

        let head = if (ring_flags & IORING_SETUP_SQPOLL) == 0 {
            self.sqe_head
        } else {
            unsafe { &*self.khead }.load(Acquire)
        };

        if next - head <= self.sqes.len() as u32 {
            let idx = self.sqe_tail & unsafe { *self.kring_mask };
            let sqe = &mut self.sqes[idx as usize];
            self.sqe_tail = next;

            Some(sqe)
        } else {
            None
        }
    }
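    // Publish every locally prepared SQE to the kernel: write its index
    // into the shared indirection array, then advance the shared tail with
    // Release ordering so the kernel observes fully initialized entries.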
    fn flush(&mut self) -> u32 {
        let mask: u32 = unsafe { *self.kring_mask };
        let to_submit = self.sqe_tail - self.sqe_head;

        let mut ktail =
            unsafe { &*self.ktail }.load(Acquire);

        for _ in 0..to_submit {
            let index = ktail & mask;
            self.array[index as usize]
                .store(self.sqe_head & mask, Release);
            ktail += 1;
            self.sqe_head += 1;
        }

        let swapped =
            unsafe { &*self.ktail }.swap(ktail, Release);

        assert_eq!(swapped, ktail - to_submit);

        to_submit
    }
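    // Without SQPOLL we must flush and then call io_uring_enter(2)
    // ourselves, looping until the kernel has accepted every flushed entry.
    // With SQPOLL the kernel thread drains the queue on its own and only
    // needs an enter call when it has gone to sleep and set
    // IORING_SQ_NEED_WAKEUP.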
    pub(crate) fn submit_all(
        &mut self,
        ring_flags: u32,
        ring_fd: i32,
    ) -> u64 {
        let submitted = if ring_flags & IORING_SETUP_SQPOLL == 0 {
            let flags = IORING_ENTER_GETEVENTS;
            let flushed = self.flush();
            let mut to_submit = flushed;
            while to_submit > 0 {
                // Bind the timing guard to a name so it lives across the
                // enter call instead of being dropped immediately.
                let _measure = Measure::new(&M.enter_sqe);
                let ret = enter(
                    ring_fd,
                    to_submit,
                    0,
                    flags,
                    std::ptr::null_mut(),
                )
                .expect(
                    "Failed to submit items to kernel via \
                     io_uring. This should never fail.",
                );
                to_submit -= u32::try_from(ret).unwrap();
            }
            flushed
        } else if unsafe { &*self.kflags }.load(Acquire)
            & IORING_SQ_NEED_WAKEUP
            != 0
        {
            let to_submit = self.sqe_tail - self.sqe_head;
            let _measure = Measure::new(&M.enter_sqe);
            enter(
                ring_fd,
                to_submit,
                0,
                IORING_ENTER_SQ_WAKEUP,
                std::ptr::null_mut(),
            )
            .expect(
                "Failed to wake up SQPOLL io_uring \
                 kernel thread. This should never fail.",
            );
            0
        } else {
            0
        };
        // The kernel bumps the dropped counter when it encounters an
        // invalid SQE index; seeing a nonzero value would mean a bug here.
        assert_eq!(unsafe { &*self.kdropped }.load(Relaxed), 0);
        u64::from(submitted)
    }
}
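// A minimal usage sketch, assuming the surrounding module has already
// performed io_uring_setup(2) (yielding `ring_fd` and the kernel-filled
// `params`); the setup call itself is not shown in this file:
//
//     let mut sq = Sq::new(&params, ring_fd)?;
//     if let Some(sqe) = sq.try_get_sqe(params.flags) {
//         // fill in the opcode, fd, buffer address, length, user_data...
//     }
//     let submitted = sq.submit_all(params.flags, ring_fd);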