ntex_net/uring/
driver.rs

1use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
2use std::{
3    cell::Cell, cell::RefCell, cmp, collections::VecDeque, fmt, io, mem, net, rc::Rc,
4    sync::Arc,
5};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixStream as OsUnixStream;
9
10use io_uring::cqueue::{self, Entry as CEntry, more};
11use io_uring::opcode::{AsyncCancel, PollAdd};
12use io_uring::squeue::{Entry as SEntry, SubmissionQueue};
13use io_uring::{IoUring, Probe, types::Fd};
14use ntex_io::Io;
15use ntex_rt::{DriverType, Notify, PollResult, Runtime, syscall};
16use ntex_service::cfg::SharedCfg;
17use socket2::{Protocol, SockAddr, Socket, Type};
18
19use super::{TcpStream, UnixStream, stream::StreamOps};
20use crate::channel::Receiver;
21
/// Callbacks through which the driver reports ring events to a registered
/// handler.
///
/// The driver calls these while it walks the completion queue. The `id`
/// passed to `completed` and `canceled` is the lower 48 bits of the
/// completion's `user_data`.
pub trait Handler {
    /// Operation is completed
    ///
    /// `flags` are the flags of the completion entry. `result` is `Ok(n)`
    /// for a non-negative completion result and `Err` built from the negated
    /// result otherwise.
    fn completed(&mut self, id: usize, flags: u32, result: io::Result<usize>);

    /// Operation is canceled
    ///
    /// Called instead of `completed` when the completion result is
    /// `-ECANCELED`.
    fn canceled(&mut self, id: usize);

    /// Driver turn is completed
    ///
    /// Called once after a completion pass, and only for handlers that
    /// received at least one completion or cancellation during that pass.
    fn tick(&mut self);

    /// Cleanup before drop
    ///
    /// Called for every registered handler when the driver is cleared.
    fn cleanup(&mut self);
}
35
/// Handle given to a registered handler for queueing io-uring requests.
pub struct DriverApi {
    /// Handler id already shifted into the upper bits of `user_data`; it is
    /// OR-ed into the `user_data` of every entry submitted through this handle.
    batch: u64,
    /// Ring state shared with the owning `Driver`.
    inner: Rc<DriverInner>,
}
40
41impl DriverApi {
42    #[inline]
43    /// Check if kernel ver 6.1 or greater
44    pub fn is_new(&self) -> bool {
45        self.inner.flags.get().contains(Flags::NEW)
46    }
47
48    fn submit_inner<F>(&self, f: F)
49    where
50        F: FnOnce(&mut SEntry),
51    {
52        let mut changes = self.inner.changes.borrow_mut();
53        let sq = self.inner.ring.submission();
54        if !changes.is_empty() || sq.is_full() {
55            let mut entry = Default::default();
56            f(&mut entry);
57            changes.push_back(entry);
58        } else {
59            unsafe {
60                sq.push_inline(f).expect("Queue size is checked");
61            }
62        }
63    }
64
65    #[inline]
66    /// Submit request to the driver.
67    pub fn submit(&self, id: u32, entry: SEntry) {
68        self.submit_inner(|en| {
69            *en = entry;
70            en.set_user_data(id as u64 | self.batch);
71        });
72    }
73
74    #[inline]
75    /// Submit request to the driver.
76    pub fn submit_inline<F>(&self, id: u32, f: F)
77    where
78        F: FnOnce(&mut SEntry),
79    {
80        self.submit_inner(|en| {
81            f(en);
82            en.set_user_data(id as u64 | self.batch);
83        });
84    }
85
86    #[inline]
87    /// Attempt to cancel an already issued request.
88    pub fn cancel(&self, id: u32) {
89        self.submit_inner(|en| {
90            *en = AsyncCancel::new(id as u64 | self.batch)
91                .build()
92                .user_data(Driver::CANCEL);
93        });
94    }
95
96    /// Get whether a specific io-uring opcode is supported.
97    pub fn is_supported(&self, opcode: u8) -> bool {
98        self.inner.probe.is_supported(opcode)
99    }
100}
101
/// Low-level driver of io-uring.
pub struct Driver {
    /// Raw fd of the ring, captured at construction and returned by
    /// `as_raw_fd`.
    fd: RawFd,
    /// Id that the next registered handler will receive; incremented by
    /// `register`.
    hid: Cell<u64>,
    /// eventfd-backed notifier whose readiness is polled through the ring.
    notifier: Notifier,
    /// Registered handlers, indexed by handler id. The list is taken out of
    /// the cell while it is in use and put back afterwards.
    #[allow(clippy::box_collection)]
    handlers: Cell<Option<Box<Vec<HandlerItem>>>>,
    /// Ring, probe and queued entries, shared with every `DriverApi`.
    inner: Rc<DriverInner>,
}
111
112struct HandlerItem {
113    hnd: Box<dyn Handler>,
114    modified: bool,
115}
116
117impl HandlerItem {
118    fn tick(&mut self) {
119        if self.modified {
120            self.modified = false;
121            self.hnd.tick();
122        }
123    }
124}
125
bitflags::bitflags! {
    // Driver state bits kept in `DriverInner::flags`.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    struct Flags: u8 {
        // Set when the ring was created with the single-issuer setup;
        // reported through `DriverApi::is_new`.
        const NEW      = 0b0000_0001;
        // NOTE(review): not referenced anywhere in the visible code — confirm
        // whether it is still needed.
        const NOTIFIER = 0b0000_0010;
    }
}
133
/// State shared between the `Driver` and every `DriverApi` handle.
struct DriverInner {
    /// Opcode probe registered against the ring at construction.
    probe: Probe,
    /// Driver state bits (see `Flags`).
    flags: Cell<Flags>,
    /// The io-uring instance.
    ring: IoUring<SEntry, CEntry>,
    /// Entries that could not be written to the submission queue right away;
    /// flushed in FIFO order by `Driver::apply_changes`.
    changes: RefCell<VecDeque<SEntry>>,
}
140
141impl Driver {
    /// `user_data` reserved for the notifier's poll entry.
    const NOTIFY: u64 = u64::MAX;
    /// `user_data` stamped on cancel requests; their completions are ignored.
    const CANCEL: u64 = u64::MAX - 1;
    /// Bit offset of the handler id inside `user_data`.
    const BATCH: u64 = 48;
    /// Selects the handler id (upper 16 bits) of `user_data`.
    const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
    /// Selects the operation id (lower 48 bits) of `user_data`.
    const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
147
148    /// Create io-uring driver
149    pub fn new(capacity: u32) -> io::Result<Self> {
150        // Create ring
151        let (new, ring) = if let Ok(ring) = IoUring::builder()
152            .setup_coop_taskrun()
153            .setup_single_issuer()
154            .setup_defer_taskrun()
155            .build(capacity)
156        {
157            log::info!(
158                "New io-uring driver with single-issuer, coop-taskrun, defer-taskrun"
159            );
160            (true, ring)
161        } else if let Ok(ring) = IoUring::builder().setup_single_issuer().build(capacity) {
162            log::info!("New io-uring driver with single-issuer");
163            (true, ring)
164        } else {
165            let ring = IoUring::builder().build(capacity)?;
166            log::info!("New io-uring driver");
167            (false, ring)
168        };
169
170        let mut probe = Probe::new();
171        ring.submitter().register_probe(&mut probe)?;
172
173        // Remote notifier
174        let notifier = Notifier::new()?;
175        unsafe {
176            let sq = ring.submission();
177            sq.push(
178                &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
179                    .multi(true)
180                    .build()
181                    .user_data(Self::NOTIFY),
182            )
183            .expect("the squeue sould not be full");
184            sq.sync();
185        }
186
187        let fd = ring.as_raw_fd();
188        let inner = Rc::new(DriverInner {
189            ring,
190            probe,
191            flags: Cell::new(if new { Flags::NEW } else { Flags::empty() }),
192            changes: RefCell::new(VecDeque::with_capacity(32)),
193        });
194
195        Ok(Self {
196            fd,
197            inner,
198            notifier,
199            hid: Cell::new(0),
200            handlers: Cell::new(Some(Box::new(Vec::new()))),
201        })
202    }
203
204    /// Driver type
205    pub const fn tp(&self) -> DriverType {
206        DriverType::IoUring
207    }
208
209    /// Register updates handler
210    pub fn register<F>(&self, f: F)
211    where
212        F: FnOnce(DriverApi) -> Box<dyn Handler>,
213    {
214        let id = self.hid.get();
215        let mut handlers = self.handlers.take().unwrap_or_default();
216        handlers.push(HandlerItem {
217            hnd: f(DriverApi {
218                batch: id << Self::BATCH,
219                inner: self.inner.clone(),
220            }),
221            modified: false,
222        });
223        self.handlers.set(Some(handlers));
224        self.hid.set(id + 1);
225    }
226
227    fn apply_changes(&self, sq: SubmissionQueue<'_, SEntry>) -> bool {
228        let mut changes = self.inner.changes.borrow_mut();
229        if changes.is_empty() {
230            false
231        } else {
232            let num = cmp::min(changes.len(), sq.capacity() - sq.len());
233            let (s1, s2) = changes.as_slices();
234            let s1_num = cmp::min(s1.len(), num);
235            if s1_num > 0 {
236                unsafe { sq.push_multiple(&s1[0..s1_num]) }.unwrap();
237            } else if !s2.is_empty() {
238                let s2_num = cmp::min(s2.len(), num - s1_num);
239                if s2_num > 0 {
240                    unsafe { sq.push_multiple(&s2[0..s2_num]) }.unwrap();
241                }
242            }
243            changes.drain(0..num);
244
245            !changes.is_empty()
246        }
247    }
248
249    /// Handle ring completions, forward changes to specific handler
250    fn poll_completions(
251        &self,
252        cq: &mut cqueue::CompletionQueue<'_, CEntry>,
253        sq: &SubmissionQueue<'_, SEntry>,
254    ) {
255        cq.sync();
256
257        if !cqueue::CompletionQueue::<'_, _>::is_empty(cq) {
258            let mut handlers = self.handlers.take().unwrap();
259            for entry in cq {
260                let user_data = entry.user_data();
261                match user_data {
262                    Self::CANCEL => {}
263                    Self::NOTIFY => {
264                        let flags = entry.flags();
265                        self.notifier.clear().expect("cannot clear notifier");
266
267                        // re-submit notifier fd
268                        if !more(flags) {
269                            unsafe {
270                                sq.push(
271                                    &PollAdd::new(
272                                        Fd(self.notifier.as_raw_fd()),
273                                        libc::POLLIN as _,
274                                    )
275                                    .multi(true)
276                                    .build()
277                                    .user_data(Self::NOTIFY),
278                                )
279                            }
280                            .expect("the squeue sould not be full");
281                        }
282                    }
283                    _ => {
284                        let batch =
285                            ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize;
286                        let user_data = (user_data & Self::DATA_MASK) as usize;
287
288                        let result = entry.result();
289                        if result == -libc::ECANCELED {
290                            handlers[batch].modified = true;
291                            handlers[batch].hnd.canceled(user_data);
292                        } else {
293                            let result = if result < 0 {
294                                Err(io::Error::from_raw_os_error(-result))
295                            } else {
296                                Ok(result as _)
297                            };
298                            handlers[batch].modified = true;
299                            handlers[batch]
300                                .hnd
301                                .completed(user_data, entry.flags(), result);
302                        }
303                    }
304                }
305            }
306            for h in handlers.iter_mut() {
307                h.tick();
308            }
309            self.handlers.set(Some(handlers));
310        }
311    }
312}
313
314impl AsRawFd for Driver {
315    fn as_raw_fd(&self) -> RawFd {
316        self.fd
317    }
318}
319
320impl crate::Reactor for Driver {
321    fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> Receiver<Io> {
322        let addr = SockAddr::from(addr);
323        let result = Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP))
324            .and_then(crate::helpers::prep_socket)
325            .map(move |sock| (addr, sock));
326
327        match result {
328            Err(err) => Receiver::new(Err(err)),
329            Ok((addr, sock)) => {
330                super::connect::ConnectOps::get(self).connect(sock, addr, cfg)
331            }
332        }
333    }
334
335    fn unix_connect(&self, addr: std::path::PathBuf, cfg: SharedCfg) -> Receiver<Io> {
336        let result = SockAddr::unix(addr).and_then(|addr| {
337            Socket::new(addr.domain(), Type::STREAM, None)
338                .and_then(crate::helpers::prep_socket)
339                .map(move |sock| (addr, sock))
340        });
341
342        match result {
343            Err(err) => Receiver::new(Err(err)),
344            Ok((addr, sock)) => {
345                super::connect::ConnectOps::get(self).connect(sock, addr, cfg)
346            }
347        }
348    }
349
350    fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
351        stream.set_nodelay(true)?;
352
353        Ok(Io::new(
354            TcpStream(
355                crate::helpers::prep_socket(Socket::from(stream))?,
356                StreamOps::get(self),
357            ),
358            cfg,
359        ))
360    }
361
362    #[cfg(unix)]
363    fn from_unix_stream(&self, stream: OsUnixStream, cfg: SharedCfg) -> io::Result<Io> {
364        Ok(Io::new(
365            UnixStream(
366                crate::helpers::prep_socket(Socket::from(stream))?,
367                StreamOps::get(self),
368            ),
369            cfg,
370        ))
371    }
372}
373
374impl ntex_rt::Driver for Driver {
375    /// Poll the driver and handle completed operations.
376    fn run(&self, rt: &Runtime) -> io::Result<()> {
377        let ring = &self.inner.ring;
378        let sq = ring.submission();
379        let mut cq = unsafe { ring.completion_shared() };
380        let submitter = ring.submitter();
381        loop {
382            self.poll_completions(&mut cq, &sq);
383
384            let more_tasks = match rt.poll() {
385                PollResult::Pending => false,
386                PollResult::PollAgain => true,
387                PollResult::Ready => return Ok(()),
388            };
389            let more_changes = self.apply_changes(sq);
390
391            // squeue has to sync after we apply all changes
392            // otherwise ring wont see any change in submit call
393            sq.sync();
394
395            let result = if more_changes || more_tasks {
396                submitter.submit()
397            } else {
398                submitter.submit_and_wait(1)
399            };
400
401            if let Err(e) = result {
402                match e.raw_os_error() {
403                    Some(libc::ETIME) | Some(libc::EBUSY) | Some(libc::EAGAIN)
404                    | Some(libc::EINTR) => {
405                        log::info!("Ring submit interrupted, {:?}", e);
406                    }
407                    _ => return Err(e),
408                }
409            }
410        }
411    }
412
413    /// Get notification handle
414    fn handle(&self) -> Box<dyn Notify> {
415        Box::new(self.notifier.handle())
416    }
417
418    fn clear(&self) {
419        for mut h in self.handlers.take().unwrap().into_iter() {
420            h.hnd.cleanup()
421        }
422    }
423}
424
425#[derive(Debug)]
426pub(crate) struct Notifier {
427    fd: Arc<OwnedFd>,
428}
429
430impl Notifier {
431    /// Create a new notifier.
432    pub(crate) fn new() -> io::Result<Self> {
433        let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
434        let fd = unsafe { OwnedFd::from_raw_fd(fd) };
435        Ok(Self { fd: Arc::new(fd) })
436    }
437
438    pub(crate) fn clear(&self) -> io::Result<()> {
439        loop {
440            let mut buffer = [0u64];
441            let res = syscall!(libc::read(
442                self.fd.as_raw_fd(),
443                buffer.as_mut_ptr().cast(),
444                mem::size_of::<u64>()
445            ));
446            match res {
447                Ok(len) => {
448                    debug_assert_eq!(len, mem::size_of::<u64>() as isize);
449                    break Ok(());
450                }
451                // Clear the next time
452                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
453                // Just like read_exact
454                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
455                Err(e) => break Err(e),
456            }
457        }
458    }
459
460    pub(crate) fn handle(&self) -> NotifyHandle {
461        NotifyHandle::new(self.fd.clone())
462    }
463}
464
465impl AsRawFd for Notifier {
466    fn as_raw_fd(&self) -> RawFd {
467        self.fd.as_raw_fd()
468    }
469}
470
/// A notify handle to the driver.
///
/// Holds a shared reference to the notifier's fd, so clones refer to the
/// same fd.
#[derive(Clone, Debug)]
pub(crate) struct NotifyHandle {
    fd: Arc<OwnedFd>,
}

impl NotifyHandle {
    /// Wrap a shared notifier fd.
    pub(crate) fn new(fd: Arc<OwnedFd>) -> Self {
        NotifyHandle { fd }
    }
}
482
483impl Notify for NotifyHandle {
484    /// Notify the driver.
485    fn notify(&self) -> io::Result<()> {
486        let data = 1u64;
487        syscall!(libc::write(
488            self.fd.as_raw_fd(),
489            &data as *const _ as *const _,
490            std::mem::size_of::<u64>(),
491        ))?;
492        Ok(())
493    }
494}
495
496impl fmt::Debug for Driver {
497    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498        f.debug_struct("Driver")
499            .field("fd", &self.fd)
500            .field("hid", &self.hid)
501            .field("nodifier", &self.notifier)
502            .finish()
503    }
504}
505
506impl fmt::Debug for DriverApi {
507    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508        f.debug_struct("DriverApi")
509            .field("batch", &self.batch)
510            .finish()
511    }
512}