1use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
2use std::{
3 cell::Cell, cell::RefCell, cmp, collections::VecDeque, fmt, io, mem, net, rc::Rc,
4 sync::Arc,
5};
6
7#[cfg(unix)]
8use std::os::unix::net::UnixStream as OsUnixStream;
9
10use io_uring::cqueue::{self, Entry as CEntry, more};
11use io_uring::opcode::{AsyncCancel, PollAdd};
12use io_uring::squeue::{Entry as SEntry, SubmissionQueue};
13use io_uring::{IoUring, Probe, types::Fd};
14use ntex_io::Io;
15use ntex_rt::{DriverType, Notify, PollResult, Runtime, syscall};
16use ntex_service::cfg::SharedCfg;
17use socket2::{Protocol, SockAddr, Socket, Type};
18
19use super::{TcpStream, UnixStream, stream::StreamOps};
20use crate::channel::Receiver;
21
/// Receiver of io-uring completion events, registered via [`Driver::register`].
///
/// Each registered handler gets its own batch id; the `id` values passed to the
/// methods below are the operation ids the handler used with [`DriverApi`].
pub trait Handler {
    /// An operation finished. `flags` are the raw CQE flags; `result` is the CQE
    /// result, with negative values converted into an `io::Error`.
    fn completed(&mut self, id: usize, flags: u32, result: io::Result<usize>);

    /// An operation completed with `-ECANCELED`.
    fn canceled(&mut self, id: usize);

    /// Invoked after a batch of completions has been dispatched, but only if this
    /// handler received at least one `completed`/`canceled` call in that batch.
    fn tick(&mut self);

    /// Invoked once for every registered handler when the driver is cleared.
    fn cleanup(&mut self);
}
35
/// Handle through which a registered [`Handler`] submits io-uring operations.
///
/// Operation ids are combined with `batch` into the SQE `user_data`, so the
/// driver can route each completion back to the handler that issued it.
pub struct DriverApi {
    // Handler id already shifted into the upper 16 bits (`id << 48`) of `user_data`.
    batch: u64,
    // Ring, probe, flags and deferred-submission queue shared with the `Driver`.
    inner: Rc<DriverInner>,
}
40
41impl DriverApi {
42 #[inline]
43 pub fn is_new(&self) -> bool {
45 self.inner.flags.get().contains(Flags::NEW)
46 }
47
48 fn submit_inner<F>(&self, f: F)
49 where
50 F: FnOnce(&mut SEntry),
51 {
52 let mut changes = self.inner.changes.borrow_mut();
53 let sq = self.inner.ring.submission();
54 if !changes.is_empty() || sq.is_full() {
55 let mut entry = Default::default();
56 f(&mut entry);
57 changes.push_back(entry);
58 } else {
59 unsafe {
60 sq.push_inline(f).expect("Queue size is checked");
61 }
62 }
63 }
64
65 #[inline]
66 pub fn submit(&self, id: u32, entry: SEntry) {
68 self.submit_inner(|en| {
69 *en = entry;
70 en.set_user_data(id as u64 | self.batch);
71 });
72 }
73
74 #[inline]
75 pub fn submit_inline<F>(&self, id: u32, f: F)
77 where
78 F: FnOnce(&mut SEntry),
79 {
80 self.submit_inner(|en| {
81 f(en);
82 en.set_user_data(id as u64 | self.batch);
83 });
84 }
85
86 #[inline]
87 pub fn cancel(&self, id: u32) {
89 self.submit_inner(|en| {
90 *en = AsyncCancel::new(id as u64 | self.batch)
91 .build()
92 .user_data(Driver::CANCEL);
93 });
94 }
95
96 pub fn is_supported(&self, opcode: u8) -> bool {
98 self.inner.probe.is_supported(opcode)
99 }
100}
101
/// io-uring based I/O driver: owns the ring, the wake-up notifier and the
/// registered completion handlers.
pub struct Driver {
    // Raw fd of the ring, cached in `Driver::new` and returned by `as_raw_fd`.
    fd: RawFd,
    // Id handed to the next handler passed to `register`.
    hid: Cell<u64>,
    // eventfd polled by the ring (tag `NOTIFY`); writing to it wakes `run`.
    notifier: Notifier,
    // Registered handlers, indexed by batch id. Taken out of the cell while
    // completions are dispatched and put back afterwards.
    // NOTE(review): presumably boxed so `take`/`set` move a single pointer
    // instead of a whole `Vec` — confirm intent.
    #[allow(clippy::box_collection)]
    handlers: Cell<Option<Box<Vec<HandlerItem>>>>,
    // State shared with every `DriverApi`.
    inner: Rc<DriverInner>,
}
111
112struct HandlerItem {
113 hnd: Box<dyn Handler>,
114 modified: bool,
115}
116
117impl HandlerItem {
118 fn tick(&mut self) {
119 if self.modified {
120 self.modified = false;
121 self.hnd.tick();
122 }
123 }
124}
125
bitflags::bitflags! {
    /// Driver-wide state bits stored in `DriverInner::flags`.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
    struct Flags: u8 {
        /// Set by `Driver::new` when the ring was built with a single-issuer
        /// setup; exposed through `DriverApi::is_new`.
        const NEW = 0b0000_0001;
        /// NOTE(review): not read or set anywhere in the code visible here.
        const NOTIFIER = 0b0000_0010;
    }
}
133
/// State shared between the [`Driver`] and every [`DriverApi`].
struct DriverInner {
    // Opcodes supported by the kernel, filled by `register_probe` in `Driver::new`.
    probe: Probe,
    // Driver state bits (see `Flags`).
    flags: Cell<Flags>,
    ring: IoUring<SEntry, CEntry>,
    // Submission entries deferred because the queue was full or older entries
    // were still waiting; moved into the ring by `Driver::apply_changes`.
    changes: RefCell<VecDeque<SEntry>>,
}
140
141impl Driver {
142 const NOTIFY: u64 = u64::MAX;
143 const CANCEL: u64 = u64::MAX - 1;
144 const BATCH: u64 = 48;
145 const BATCH_MASK: u64 = 0xFFFF_0000_0000_0000;
146 const DATA_MASK: u64 = 0x0000_FFFF_FFFF_FFFF;
147
148 pub fn new(capacity: u32) -> io::Result<Self> {
150 let (new, ring) = if let Ok(ring) = IoUring::builder()
152 .setup_coop_taskrun()
153 .setup_single_issuer()
154 .setup_defer_taskrun()
155 .build(capacity)
156 {
157 log::info!(
158 "New io-uring driver with single-issuer, coop-taskrun, defer-taskrun"
159 );
160 (true, ring)
161 } else if let Ok(ring) = IoUring::builder().setup_single_issuer().build(capacity) {
162 log::info!("New io-uring driver with single-issuer");
163 (true, ring)
164 } else {
165 let ring = IoUring::builder().build(capacity)?;
166 log::info!("New io-uring driver");
167 (false, ring)
168 };
169
170 let mut probe = Probe::new();
171 ring.submitter().register_probe(&mut probe)?;
172
173 let notifier = Notifier::new()?;
175 unsafe {
176 let sq = ring.submission();
177 sq.push(
178 &PollAdd::new(Fd(notifier.as_raw_fd()), libc::POLLIN as _)
179 .multi(true)
180 .build()
181 .user_data(Self::NOTIFY),
182 )
183 .expect("the squeue sould not be full");
184 sq.sync();
185 }
186
187 let fd = ring.as_raw_fd();
188 let inner = Rc::new(DriverInner {
189 ring,
190 probe,
191 flags: Cell::new(if new { Flags::NEW } else { Flags::empty() }),
192 changes: RefCell::new(VecDeque::with_capacity(32)),
193 });
194
195 Ok(Self {
196 fd,
197 inner,
198 notifier,
199 hid: Cell::new(0),
200 handlers: Cell::new(Some(Box::new(Vec::new()))),
201 })
202 }
203
204 pub const fn tp(&self) -> DriverType {
206 DriverType::IoUring
207 }
208
209 pub fn register<F>(&self, f: F)
211 where
212 F: FnOnce(DriverApi) -> Box<dyn Handler>,
213 {
214 let id = self.hid.get();
215 let mut handlers = self.handlers.take().unwrap_or_default();
216 handlers.push(HandlerItem {
217 hnd: f(DriverApi {
218 batch: id << Self::BATCH,
219 inner: self.inner.clone(),
220 }),
221 modified: false,
222 });
223 self.handlers.set(Some(handlers));
224 self.hid.set(id + 1);
225 }
226
227 fn apply_changes(&self, sq: SubmissionQueue<'_, SEntry>) -> bool {
228 let mut changes = self.inner.changes.borrow_mut();
229 if changes.is_empty() {
230 false
231 } else {
232 let num = cmp::min(changes.len(), sq.capacity() - sq.len());
233 let (s1, s2) = changes.as_slices();
234 let s1_num = cmp::min(s1.len(), num);
235 if s1_num > 0 {
236 unsafe { sq.push_multiple(&s1[0..s1_num]) }.unwrap();
237 } else if !s2.is_empty() {
238 let s2_num = cmp::min(s2.len(), num - s1_num);
239 if s2_num > 0 {
240 unsafe { sq.push_multiple(&s2[0..s2_num]) }.unwrap();
241 }
242 }
243 changes.drain(0..num);
244
245 !changes.is_empty()
246 }
247 }
248
249 fn poll_completions(
251 &self,
252 cq: &mut cqueue::CompletionQueue<'_, CEntry>,
253 sq: &SubmissionQueue<'_, SEntry>,
254 ) {
255 cq.sync();
256
257 if !cqueue::CompletionQueue::<'_, _>::is_empty(cq) {
258 let mut handlers = self.handlers.take().unwrap();
259 for entry in cq {
260 let user_data = entry.user_data();
261 match user_data {
262 Self::CANCEL => {}
263 Self::NOTIFY => {
264 let flags = entry.flags();
265 self.notifier.clear().expect("cannot clear notifier");
266
267 if !more(flags) {
269 unsafe {
270 sq.push(
271 &PollAdd::new(
272 Fd(self.notifier.as_raw_fd()),
273 libc::POLLIN as _,
274 )
275 .multi(true)
276 .build()
277 .user_data(Self::NOTIFY),
278 )
279 }
280 .expect("the squeue sould not be full");
281 }
282 }
283 _ => {
284 let batch =
285 ((user_data & Self::BATCH_MASK) >> Self::BATCH) as usize;
286 let user_data = (user_data & Self::DATA_MASK) as usize;
287
288 let result = entry.result();
289 if result == -libc::ECANCELED {
290 handlers[batch].modified = true;
291 handlers[batch].hnd.canceled(user_data);
292 } else {
293 let result = if result < 0 {
294 Err(io::Error::from_raw_os_error(-result))
295 } else {
296 Ok(result as _)
297 };
298 handlers[batch].modified = true;
299 handlers[batch]
300 .hnd
301 .completed(user_data, entry.flags(), result);
302 }
303 }
304 }
305 }
306 for h in handlers.iter_mut() {
307 h.tick();
308 }
309 self.handlers.set(Some(handlers));
310 }
311 }
312}
313
314impl AsRawFd for Driver {
315 fn as_raw_fd(&self) -> RawFd {
316 self.fd
317 }
318}
319
320impl crate::Reactor for Driver {
321 fn tcp_connect(&self, addr: net::SocketAddr, cfg: SharedCfg) -> Receiver<Io> {
322 let addr = SockAddr::from(addr);
323 let result = Socket::new(addr.domain(), Type::STREAM, Some(Protocol::TCP))
324 .and_then(crate::helpers::prep_socket)
325 .map(move |sock| (addr, sock));
326
327 match result {
328 Err(err) => Receiver::new(Err(err)),
329 Ok((addr, sock)) => {
330 super::connect::ConnectOps::get(self).connect(sock, addr, cfg)
331 }
332 }
333 }
334
335 fn unix_connect(&self, addr: std::path::PathBuf, cfg: SharedCfg) -> Receiver<Io> {
336 let result = SockAddr::unix(addr).and_then(|addr| {
337 Socket::new(addr.domain(), Type::STREAM, None)
338 .and_then(crate::helpers::prep_socket)
339 .map(move |sock| (addr, sock))
340 });
341
342 match result {
343 Err(err) => Receiver::new(Err(err)),
344 Ok((addr, sock)) => {
345 super::connect::ConnectOps::get(self).connect(sock, addr, cfg)
346 }
347 }
348 }
349
350 fn from_tcp_stream(&self, stream: net::TcpStream, cfg: SharedCfg) -> io::Result<Io> {
351 stream.set_nodelay(true)?;
352
353 Ok(Io::new(
354 TcpStream(
355 crate::helpers::prep_socket(Socket::from(stream))?,
356 StreamOps::get(self),
357 ),
358 cfg,
359 ))
360 }
361
362 #[cfg(unix)]
363 fn from_unix_stream(&self, stream: OsUnixStream, cfg: SharedCfg) -> io::Result<Io> {
364 Ok(Io::new(
365 UnixStream(
366 crate::helpers::prep_socket(Socket::from(stream))?,
367 StreamOps::get(self),
368 ),
369 cfg,
370 ))
371 }
372}
373
374impl ntex_rt::Driver for Driver {
375 fn run(&self, rt: &Runtime) -> io::Result<()> {
377 let ring = &self.inner.ring;
378 let sq = ring.submission();
379 let mut cq = unsafe { ring.completion_shared() };
380 let submitter = ring.submitter();
381 loop {
382 self.poll_completions(&mut cq, &sq);
383
384 let more_tasks = match rt.poll() {
385 PollResult::Pending => false,
386 PollResult::PollAgain => true,
387 PollResult::Ready => return Ok(()),
388 };
389 let more_changes = self.apply_changes(sq);
390
391 sq.sync();
394
395 let result = if more_changes || more_tasks {
396 submitter.submit()
397 } else {
398 submitter.submit_and_wait(1)
399 };
400
401 if let Err(e) = result {
402 match e.raw_os_error() {
403 Some(libc::ETIME) | Some(libc::EBUSY) | Some(libc::EAGAIN)
404 | Some(libc::EINTR) => {
405 log::info!("Ring submit interrupted, {:?}", e);
406 }
407 _ => return Err(e),
408 }
409 }
410 }
411 }
412
413 fn handle(&self) -> Box<dyn Notify> {
415 Box::new(self.notifier.handle())
416 }
417
418 fn clear(&self) {
419 for mut h in self.handlers.take().unwrap().into_iter() {
420 h.hnd.cleanup()
421 }
422 }
423}
424
425#[derive(Debug)]
426pub(crate) struct Notifier {
427 fd: Arc<OwnedFd>,
428}
429
430impl Notifier {
431 pub(crate) fn new() -> io::Result<Self> {
433 let fd = syscall!(libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
434 let fd = unsafe { OwnedFd::from_raw_fd(fd) };
435 Ok(Self { fd: Arc::new(fd) })
436 }
437
438 pub(crate) fn clear(&self) -> io::Result<()> {
439 loop {
440 let mut buffer = [0u64];
441 let res = syscall!(libc::read(
442 self.fd.as_raw_fd(),
443 buffer.as_mut_ptr().cast(),
444 mem::size_of::<u64>()
445 ));
446 match res {
447 Ok(len) => {
448 debug_assert_eq!(len, mem::size_of::<u64>() as isize);
449 break Ok(());
450 }
451 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break Ok(()),
453 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
455 Err(e) => break Err(e),
456 }
457 }
458 }
459
460 pub(crate) fn handle(&self) -> NotifyHandle {
461 NotifyHandle::new(self.fd.clone())
462 }
463}
464
465impl AsRawFd for Notifier {
466 fn as_raw_fd(&self) -> RawFd {
467 self.fd.as_raw_fd()
468 }
469}
470
471#[derive(Clone, Debug)]
472pub(crate) struct NotifyHandle {
474 fd: Arc<OwnedFd>,
475}
476
477impl NotifyHandle {
478 pub(crate) fn new(fd: Arc<OwnedFd>) -> Self {
479 Self { fd }
480 }
481}
482
483impl Notify for NotifyHandle {
484 fn notify(&self) -> io::Result<()> {
486 let data = 1u64;
487 syscall!(libc::write(
488 self.fd.as_raw_fd(),
489 &data as *const _ as *const _,
490 std::mem::size_of::<u64>(),
491 ))?;
492 Ok(())
493 }
494}
495
496impl fmt::Debug for Driver {
497 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498 f.debug_struct("Driver")
499 .field("fd", &self.fd)
500 .field("hid", &self.hid)
501 .field("nodifier", &self.notifier)
502 .finish()
503 }
504}
505
506impl fmt::Debug for DriverApi {
507 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
508 f.debug_struct("DriverApi")
509 .field("batch", &self.batch)
510 .finish()
511 }
512}