ntex_neon/driver/iour.rs

1use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
2use std::{cell::Cell, cell::RefCell, cmp, collections::VecDeque, io, mem, rc::Rc, sync::Arc};
3
4use io_uring::cqueue::{self, more, Entry as CEntry};
5use io_uring::opcode::{AsyncCancel, PollAdd};
6use io_uring::squeue::{Entry as SEntry, SubmissionQueue};
7use io_uring::{types::Fd, IoUring, Probe};
8
9use crate::pool::Dispatchable;
10
11pub use io_uring;
12
/// Receiver of io-uring completions for operations submitted through a `DriverApi`.
///
/// `id` is the per-handler operation id that the operation was submitted with.
pub trait Handler {
    /// Called for every completion whose result is not `ECANCELED`.
    ///
    /// `flags` are the raw CQE flags. `result` is `Ok(n)` for a non-negative
    /// CQE result, or the matching OS error otherwise.
    fn completed(&mut self, id: usize, flags: u32, result: io::Result<usize>);

    /// Called when an operation completes with `ECANCELED`.
    fn canceled(&mut self, id: usize);
}
20
21#[inline(always)]
22pub(crate) fn spawn_blocking(rt: &crate::Runtime, _: &Driver, f: Box<dyn Dispatchable + Send>) {
23    let _ = rt.pool.dispatch(f);
24}
25
26pub struct DriverApi {
27    batch: u64,
28    inner: Rc<DriverInner>,
29}
30
31impl DriverApi {
32    #[inline]
33    /// Check if kernel ver 6.1 or greater
34    pub fn is_new(&self) -> bool {
35        self.inner.flags.get().contains(Flags::NEW)
36    }
37
38    fn submit_inner<F>(&self, f: F)
39    where
40        F: FnOnce(&mut SEntry),
41    {
42        let mut changes = self.inner.changes.borrow_mut();
43        let sq = self.inner.ring.submission();
44        if !changes.is_empty() || sq.is_full() {
45            let mut entry = Default::default();
46            f(&mut entry);
47            changes.push_back(entry);
48        } else {
49            unsafe {
50                sq.push_inline(f).expect("Queue size is checked");
51            }
52        }
53    }
54
55    #[inline]
56    /// Submit request to the driver.
57    pub fn submit(&self, id: u32, entry: SEntry) {
58        self.submit_inner(|en| {
59            *en = entry;
60            en.set_user_data(id as u64 | self.batch);
61        });
62    }
63
64    #[inline]
65    /// Submit request to the driver.
66    pub fn submit_inline<F>(&self, id: u32, f: F)
67    where
68        F: FnOnce(&mut SEntry),
69    {
70        self.submit_inner(|en| {
71            f(en);
72            en.set_user_data(id as u64 | self.batch);
73        });
74    }
75
76    #[inline]
77    /// Attempt to cancel an already issued request.
78    pub fn cancel(&self, id: u32) {
79        self.submit_inner(|en| {
80            *en = AsyncCancel::new(id as u64 | self.batch)
81                .build()
82                .user_data(Driver::CANCEL);
83        });
84    }
85
86    /// Get whether a specific io-uring opcode is supported.
87    pub fn is_supported(&self, opcode: u8) -> bool {
88        self.inner.probe.is_supported(opcode)
89    }
90}
91
92/// Low-level driver of io-uring.
93pub struct Driver {
94    fd: RawFd,
95    hid: Cell<u64>,
96    notifier: Notifier,
97    handlers: Cell<Option<Box<Vec<Box<dyn Handler>>>>>,
98    inner: Rc<DriverInner>,
99}
100
101bitflags::bitflags! {
102    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
103    struct Flags: u8 {
104        const NEW      = 0b0000_0001;
105        const NOTIFIER = 0b0000_0010;
106    }
107}
108
109struct DriverInner {
110    probe: Probe,
111    flags: Cell<Flags>,
112    ring: IoUring<SEntry, CEntry>,
113    changes: RefCell<VecDeque<SEntry>>,
114}
115
116impl Driver {
117    const NOTIFY: u64 = u64::MAX;
118    const CANCEL: u64 = u64::MAX - 1;
119    const BATCH: u64 = 48;
120    const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
121    const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
122
123    /// Create io-uring driver
124    pub(crate) fn new(capacity: u32) -> io::Result<Self> {
125        // Create ring
126        let (new, ring) = if let Ok(ring) = IoUring::builder()
127            .setup_coop_taskrun()
128            .setup_single_issuer()
129            .setup_defer_taskrun()
130            .build(capacity)
131        {
132            log::info!("New io-uring driver with single-issuer, coop-taskrun, defer-taskrun");
133            (true, ring)
134        } else if let Ok(ring) = IoUring::builder().setup_single_issuer().build(capacity) {
135            log::info!("New io-uring driver with single-issuer");
136            (true, ring)
137        } else {
138            let ring = IoUring::builder().build(capacity)?;
139            log::info!("New io-uring driver");
140            (false, ring)
141        };
142
143        let mut probe = Probe::new();
144        ring.submitter().register_probe(&mut probe)?;
145
146        // Remote notifier
147        let notifier = Notifier::new()?;
148        unsafe {
149            let sq = ring.submission();
150            sq.push(
151                &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
152                    .multi(true)
153                    .build()
154                    .user_data(Self::NOTIFY),
155            )
156            .expect("the squeue sould not be full");
157            sq.sync();
158        }
159
160        let fd = ring.as_raw_fd();
161        let inner = Rc::new(DriverInner {
162            ring,
163            probe,
164            flags: Cell::new(if new { Flags::NEW } else { Flags::empty() }),
165            changes: RefCell::new(VecDeque::with_capacity(32)),
166        });
167
168        Ok(Self {
169            fd,
170            inner,
171            notifier,
172            hid: Cell::new(0),
173            handlers: Cell::new(Some(Box::new(Vec::new()))),
174        })
175    }
176
177    /// Driver type
178    pub const fn tp(&self) -> crate::driver::DriverType {
179        crate::driver::DriverType::IoUring
180    }
181
182    /// Register updates handler
183    pub fn register<F>(&self, f: F)
184    where
185        F: FnOnce(DriverApi) -> Box<dyn Handler>,
186    {
187        let id = self.hid.get();
188        let mut handlers = self.handlers.take().unwrap_or_default();
189        handlers.push(f(DriverApi {
190            batch: id << Self::BATCH,
191            inner: self.inner.clone(),
192        }));
193        self.handlers.set(Some(handlers));
194        self.hid.set(id + 1);
195    }
196
197    /// Poll the driver and handle completed operations.
198    pub(crate) fn poll<T, F>(&self, mut run: F) -> io::Result<T>
199    where
200        F: FnMut() -> super::PollResult<T>,
201    {
202        let ring = &self.inner.ring;
203        let sq = ring.submission();
204        let mut cq = unsafe { ring.completion_shared() };
205        let submitter = ring.submitter();
206        loop {
207            self.poll_completions(&mut cq, &sq);
208
209            let more_tasks = match run() {
210                super::PollResult::Pending => false,
211                super::PollResult::HasTasks => true,
212                super::PollResult::Ready(val) => return Ok(val),
213            };
214            let more_changes = self.apply_changes(sq);
215
216            // squeue has to sync after we apply all changes
217            // otherwise ring wont see any change in submit call
218            sq.sync();
219
220            let result = if more_changes || more_tasks {
221                submitter.submit()
222            } else {
223                submitter.submit_and_wait(1)
224            };
225
226            if let Err(e) = result {
227                match e.raw_os_error() {
228                    Some(libc::ETIME) | Some(libc::EBUSY) | Some(libc::EAGAIN)
229                    | Some(libc::EINTR) => {
230                        log::info!("Ring submit interrupted, {:?}", e);
231                    }
232                    _ => return Err(e),
233                }
234            }
235        }
236    }
237
238    fn apply_changes(&self, sq: SubmissionQueue<'_, SEntry>) -> bool {
239        let mut changes = self.inner.changes.borrow_mut();
240        if changes.is_empty() {
241            false
242        } else {
243            let num = cmp::min(changes.len(), sq.capacity() - sq.len());
244            let (s1, s2) = changes.as_slices();
245            let s1_num = cmp::min(s1.len(), num);
246            if s1_num > 0 {
247                unsafe { sq.push_multiple(&s1[0..s1_num]) }.unwrap();
248            } else if !s2.is_empty() {
249                let s2_num = cmp::min(s2.len(), num - s1_num);
250                if s2_num > 0 {
251                    unsafe { sq.push_multiple(&s2[0..s2_num]) }.unwrap();
252                }
253            }
254            changes.drain(0..num);
255
256            !changes.is_empty()
257        }
258    }
259
260    /// Handle ring completions, forward changes to specific handler
261    fn poll_completions(
262        &self,
263        cq: &mut cqueue::CompletionQueue<'_, CEntry>,
264        sq: &SubmissionQueue<'_, SEntry>,
265    ) {
266        cq.sync();
267
268        if !cqueue::CompletionQueue::<'_, _>::is_empty(cq) {
269            let mut handlers = self.handlers.take().unwrap();
270            for entry in cq {
271                let user_data = entry.user_data();
272                match user_data {
273                    Self::CANCEL => {}
274                    Self::NOTIFY => {
275                        let flags = entry.flags();
276                        self.notifier.clear().expect("cannot clear notifier");
277
278                        // re-submit notifier fd
279                        if !more(flags) {
280                            unsafe {
281                                sq.push(
282                                    &PollAdd::new(
283                                        Fd(self.notifier.as_raw_fd()),
284                                        libc::POLLIN as _,
285                                    )
286                                    .multi(true)
287                                    .build()
288                                    .user_data(Self::NOTIFY),
289                                )
290                            }
291                            .expect("the squeue sould not be full");
292                        }
293                    }
294                    _ => {
295                        let batch = ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize;
296                        let user_data = (user_data & Self::DATA_MASK) as usize;
297
298                        let result = entry.result();
299                        if result == -libc::ECANCELED {
300                            handlers[batch].canceled(user_data);
301                        } else {
302                            let result = if result < 0 {
303                                Err(io::Error::from_raw_os_error(-result))
304                            } else {
305                                Ok(result as _)
306                            };
307                            handlers[batch].completed(user_data, entry.flags(), result);
308                        }
309                    }
310                }
311            }
312            self.handlers.set(Some(handlers));
313        }
314    }
315
316    /// Get notification handle for this driver
317    pub(crate) fn handle(&self) -> NotifyHandle {
318        self.notifier.handle()
319    }
320}
321
322impl AsRawFd for Driver {
323    fn as_raw_fd(&self) -> RawFd {
324        self.fd
325    }
326}
327
/// Eventfd that the ring polls so the driver can be woken up.
///
/// The descriptor is reference-counted so that `NotifyHandle`s can share it.
#[derive(Debug)]
pub(crate) struct Notifier {
    /// Shared, owned eventfd descriptor.
    fd: Arc<OwnedFd>,
}
332
333impl Notifier {
334    /// Create a new notifier.
335    pub(crate) fn new() -> io::Result<Self> {
336        let fd = crate::syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
337        let fd = unsafe { OwnedFd::from_raw_fd(fd) };
338        Ok(Self { fd: Arc::new(fd) })
339    }
340
341    pub(crate) fn clear(&self) -> io::Result<()> {
342        loop {
343            let mut buffer = [0u64];
344            let res = crate::syscall!(libc::read(
345                self.fd.as_raw_fd(),
346                buffer.as_mut_ptr().cast(),
347                mem::size_of::<u64>()
348            ));
349            match res {
350                Ok(len) => {
351                    debug_assert_eq!(len, mem::size_of::<u64>() as _);
352                    break Ok(());
353                }
354                // Clear the next time
355                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
356                // Just like read_exact
357                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
358                Err(e) => break Err(e),
359            }
360        }
361    }
362
363    pub(crate) fn handle(&self) -> NotifyHandle {
364        NotifyHandle::new(self.fd.clone())
365    }
366}
367
368impl AsRawFd for Notifier {
369    fn as_raw_fd(&self) -> RawFd {
370        self.fd.as_raw_fd()
371    }
372}
373
/// A notify handle to the driver.
///
/// A cloneable handle that shares the driver's notifier eventfd. Writing to
/// the eventfd wakes the ring's poll on that fd.
#[derive(Clone, Debug)]
pub(crate) struct NotifyHandle {
    /// Eventfd shared with the `Notifier`.
    fd: Arc<OwnedFd>,
}
379
380impl NotifyHandle {
381    pub(crate) fn new(fd: Arc<OwnedFd>) -> Self {
382        Self { fd }
383    }
384
385    /// Notify the driver.
386    pub(crate) fn notify(&self) -> io::Result<()> {
387        let data = 1u64;
388        crate::syscall!(libc::write(
389            self.fd.as_raw_fd(),
390            &data as *const _ as *const _,
391            std::mem::size_of::<u64>(),
392        ))?;
393        Ok(())
394    }
395}