1use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
2use std::{cell::Cell, cell::RefCell, cmp, collections::VecDeque, io, mem, rc::Rc, sync::Arc};
3
4use io_uring::cqueue::{self, more, Entry as CEntry};
5use io_uring::opcode::{AsyncCancel, PollAdd};
6use io_uring::squeue::{Entry as SEntry, SubmissionQueue};
7use io_uring::{types::Fd, IoUring, Probe};
8
9use crate::pool::Dispatchable;
10
11pub use io_uring;
12
/// Receives completion events for operations submitted through a [`DriverApi`].
///
/// `id` is the operation id the handler supplied to `DriverApi::submit`,
/// `DriverApi::submit_inline` or `DriverApi::cancel`, with the handler's batch
/// bits already stripped off by the driver.
pub trait Handler {
    /// Called when operation `id` completed with `-ECANCELED`.
    fn canceled(&mut self, id: usize);

    /// Called for every other completion of operation `id`.
    ///
    /// `flags` are the raw completion-queue-entry flags. `result` is `Ok(n)` for a
    /// non-negative CQE result and the matching OS error for a negative one.
    fn completed(&mut self, id: usize, flags: u32, result: io::Result<usize>);
}
20
21#[inline(always)]
22pub(crate) fn spawn_blocking(rt: &crate::Runtime, _: &Driver, f: Box<dyn Dispatchable + Send>) {
23 let _ = rt.pool.dispatch(f);
24}
25
26pub struct DriverApi {
27 batch: u64,
28 inner: Rc<DriverInner>,
29}
30
31impl DriverApi {
32 #[inline]
33 pub fn is_new(&self) -> bool {
35 self.inner.flags.get().contains(Flags::NEW)
36 }
37
38 fn submit_inner<F>(&self, f: F)
39 where
40 F: FnOnce(&mut SEntry),
41 {
42 let mut changes = self.inner.changes.borrow_mut();
43 let sq = self.inner.ring.submission();
44 if !changes.is_empty() || sq.is_full() {
45 let mut entry = Default::default();
46 f(&mut entry);
47 changes.push_back(entry);
48 } else {
49 unsafe {
50 sq.push_inline(f).expect("Queue size is checked");
51 }
52 }
53 }
54
55 #[inline]
56 pub fn submit(&self, id: u32, entry: SEntry) {
58 self.submit_inner(|en| {
59 *en = entry;
60 en.set_user_data(id as u64 | self.batch);
61 });
62 }
63
64 #[inline]
65 pub fn submit_inline<F>(&self, id: u32, f: F)
67 where
68 F: FnOnce(&mut SEntry),
69 {
70 self.submit_inner(|en| {
71 f(en);
72 en.set_user_data(id as u64 | self.batch);
73 });
74 }
75
76 #[inline]
77 pub fn cancel(&self, id: u32) {
79 self.submit_inner(|en| {
80 *en = AsyncCancel::new(id as u64 | self.batch)
81 .build()
82 .user_data(Driver::CANCEL);
83 });
84 }
85
86 pub fn is_supported(&self, opcode: u8) -> bool {
88 self.inner.probe.is_supported(opcode)
89 }
90}
91
/// io_uring-backed driver: owns the ring, the wake-up notifier and the registered
/// completion handlers.
///
/// NOTE: fields are dropped in declaration order, so reordering them changes
/// teardown order.
pub struct Driver {
    /// Raw fd of the ring, captured in `Driver::new`.
    fd: RawFd,
    /// Index that the next `register` call assigns as a handler's batch id.
    hid: Cell<u64>,
    /// Eventfd watched by a multishot poll on the ring; `NotifyHandle::notify`
    /// writes to it to produce a wake-up completion.
    notifier: Notifier,
    /// Registered handlers, indexed by batch id. Temporarily taken out (left as
    /// `None`) while `poll_completions` dispatches completions.
    handlers: Cell<Option<Box<Vec<Box<dyn Handler>>>>>,
    /// Ring state shared with every `DriverApi`.
    inner: Rc<DriverInner>,
}
100
101bitflags::bitflags! {
102 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
103 struct Flags: u8 {
104 const NEW = 0b0000_0001;
105 const NOTIFIER = 0b0000_0010;
106 }
107}
108
/// Ring state shared between the [`Driver`] and every [`DriverApi`].
///
/// NOTE: fields are dropped in declaration order.
struct DriverInner {
    /// Opcode support, registered right after the ring was built.
    probe: Probe,
    /// Driver state bits (`Flags::NEW` when a single-issuer ring was built).
    flags: Cell<Flags>,
    /// The io_uring instance.
    ring: IoUring<SEntry, CEntry>,
    /// Backlog of entries that could not go straight into the submission queue
    /// (it was full, or older entries were still waiting); flushed in FIFO order
    /// by `Driver::apply_changes`.
    changes: RefCell<VecDeque<SEntry>>,
}
115
116impl Driver {
117 const NOTIFY: u64 = u64::MAX;
118 const CANCEL: u64 = u64::MAX - 1;
119 const BATCH: u64 = 48;
120 const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
121 const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
122
123 pub(crate) fn new(capacity: u32) -> io::Result<Self> {
125 let (new, ring) = if let Ok(ring) = IoUring::builder()
127 .setup_coop_taskrun()
128 .setup_single_issuer()
129 .setup_defer_taskrun()
130 .build(capacity)
131 {
132 log::info!("New io-uring driver with single-issuer, coop-taskrun, defer-taskrun");
133 (true, ring)
134 } else if let Ok(ring) = IoUring::builder().setup_single_issuer().build(capacity) {
135 log::info!("New io-uring driver with single-issuer");
136 (true, ring)
137 } else {
138 let ring = IoUring::builder().build(capacity)?;
139 log::info!("New io-uring driver");
140 (false, ring)
141 };
142
143 let mut probe = Probe::new();
144 ring.submitter().register_probe(&mut probe)?;
145
146 let notifier = Notifier::new()?;
148 unsafe {
149 let sq = ring.submission();
150 sq.push(
151 &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
152 .multi(true)
153 .build()
154 .user_data(Self::NOTIFY),
155 )
156 .expect("the squeue sould not be full");
157 sq.sync();
158 }
159
160 let fd = ring.as_raw_fd();
161 let inner = Rc::new(DriverInner {
162 ring,
163 probe,
164 flags: Cell::new(if new { Flags::NEW } else { Flags::empty() }),
165 changes: RefCell::new(VecDeque::with_capacity(32)),
166 });
167
168 Ok(Self {
169 fd,
170 inner,
171 notifier,
172 hid: Cell::new(0),
173 handlers: Cell::new(Some(Box::new(Vec::new()))),
174 })
175 }
176
177 pub const fn tp(&self) -> crate::driver::DriverType {
179 crate::driver::DriverType::IoUring
180 }
181
182 pub fn register<F>(&self, f: F)
184 where
185 F: FnOnce(DriverApi) -> Box<dyn Handler>,
186 {
187 let id = self.hid.get();
188 let mut handlers = self.handlers.take().unwrap_or_default();
189 handlers.push(f(DriverApi {
190 batch: id << Self::BATCH,
191 inner: self.inner.clone(),
192 }));
193 self.handlers.set(Some(handlers));
194 self.hid.set(id + 1);
195 }
196
197 pub(crate) fn poll<T, F>(&self, mut run: F) -> io::Result<T>
199 where
200 F: FnMut() -> super::PollResult<T>,
201 {
202 let ring = &self.inner.ring;
203 let sq = ring.submission();
204 let mut cq = unsafe { ring.completion_shared() };
205 let submitter = ring.submitter();
206 loop {
207 self.poll_completions(&mut cq, &sq);
208
209 let more_tasks = match run() {
210 super::PollResult::Pending => false,
211 super::PollResult::HasTasks => true,
212 super::PollResult::Ready(val) => return Ok(val),
213 };
214 let more_changes = self.apply_changes(sq);
215
216 sq.sync();
219
220 let result = if more_changes || more_tasks {
221 submitter.submit()
222 } else {
223 submitter.submit_and_wait(1)
224 };
225
226 if let Err(e) = result {
227 match e.raw_os_error() {
228 Some(libc::ETIME) | Some(libc::EBUSY) | Some(libc::EAGAIN)
229 | Some(libc::EINTR) => {
230 log::info!("Ring submit interrupted, {:?}", e);
231 }
232 _ => return Err(e),
233 }
234 }
235 }
236 }
237
238 fn apply_changes(&self, sq: SubmissionQueue<'_, SEntry>) -> bool {
239 let mut changes = self.inner.changes.borrow_mut();
240 if changes.is_empty() {
241 false
242 } else {
243 let num = cmp::min(changes.len(), sq.capacity() - sq.len());
244 let (s1, s2) = changes.as_slices();
245 let s1_num = cmp::min(s1.len(), num);
246 if s1_num > 0 {
247 unsafe { sq.push_multiple(&s1[0..s1_num]) }.unwrap();
248 } else if !s2.is_empty() {
249 let s2_num = cmp::min(s2.len(), num - s1_num);
250 if s2_num > 0 {
251 unsafe { sq.push_multiple(&s2[0..s2_num]) }.unwrap();
252 }
253 }
254 changes.drain(0..num);
255
256 !changes.is_empty()
257 }
258 }
259
260 fn poll_completions(
262 &self,
263 cq: &mut cqueue::CompletionQueue<'_, CEntry>,
264 sq: &SubmissionQueue<'_, SEntry>,
265 ) {
266 cq.sync();
267
268 if !cqueue::CompletionQueue::<'_, _>::is_empty(cq) {
269 let mut handlers = self.handlers.take().unwrap();
270 for entry in cq {
271 let user_data = entry.user_data();
272 match user_data {
273 Self::CANCEL => {}
274 Self::NOTIFY => {
275 let flags = entry.flags();
276 self.notifier.clear().expect("cannot clear notifier");
277
278 if !more(flags) {
280 unsafe {
281 sq.push(
282 &PollAdd::new(
283 Fd(self.notifier.as_raw_fd()),
284 libc::POLLIN as _,
285 )
286 .multi(true)
287 .build()
288 .user_data(Self::NOTIFY),
289 )
290 }
291 .expect("the squeue sould not be full");
292 }
293 }
294 _ => {
295 let batch = ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize;
296 let user_data = (user_data & Self::DATA_MASK) as usize;
297
298 let result = entry.result();
299 if result == -libc::ECANCELED {
300 handlers[batch].canceled(user_data);
301 } else {
302 let result = if result < 0 {
303 Err(io::Error::from_raw_os_error(-result))
304 } else {
305 Ok(result as _)
306 };
307 handlers[batch].completed(user_data, entry.flags(), result);
308 }
309 }
310 }
311 }
312 self.handlers.set(Some(handlers));
313 }
314 }
315
316 pub(crate) fn handle(&self) -> NotifyHandle {
318 self.notifier.handle()
319 }
320}
321
322impl AsRawFd for Driver {
323 fn as_raw_fd(&self) -> RawFd {
324 self.fd
325 }
326}
327
/// Owns the eventfd that the driver's ring polls; writing to it produces a
/// `NOTIFY` completion.
#[derive(Debug)]
pub(crate) struct Notifier {
    /// Shared eventfd; `NotifyHandle`s hold clones of this `Arc`.
    fd: Arc<OwnedFd>,
}
332
333impl Notifier {
334 pub(crate) fn new() -> io::Result<Self> {
336 let fd = crate::syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
337 let fd = unsafe { OwnedFd::from_raw_fd(fd) };
338 Ok(Self { fd: Arc::new(fd) })
339 }
340
341 pub(crate) fn clear(&self) -> io::Result<()> {
342 loop {
343 let mut buffer = [0u64];
344 let res = crate::syscall!(libc::read(
345 self.fd.as_raw_fd(),
346 buffer.as_mut_ptr().cast(),
347 mem::size_of::<u64>()
348 ));
349 match res {
350 Ok(len) => {
351 debug_assert_eq!(len, mem::size_of::<u64>() as _);
352 break Ok(());
353 }
354 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
356 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
358 Err(e) => break Err(e),
359 }
360 }
361 }
362
363 pub(crate) fn handle(&self) -> NotifyHandle {
364 NotifyHandle::new(self.fd.clone())
365 }
366}
367
368impl AsRawFd for Notifier {
369 fn as_raw_fd(&self) -> RawFd {
370 self.fd.as_raw_fd()
371 }
372}
373
/// Cloneable handle that signals the driver's notifier eventfd.
#[derive(Clone, Debug)]
pub(crate) struct NotifyHandle {
    /// Shares ownership of the notifier's eventfd.
    fd: Arc<OwnedFd>,
}
379
380impl NotifyHandle {
381 pub(crate) fn new(fd: Arc<OwnedFd>) -> Self {
382 Self { fd }
383 }
384
385 pub(crate) fn notify(&self) -> io::Result<()> {
387 let data = 1u64;
388 crate::syscall!(libc::write(
389 self.fd.as_raw_fd(),
390 &data as *const _ as *const _,
391 std::mem::size_of::<u64>(),
392 ))?;
393 Ok(())
394 }
395}