// compio_driver/sys/poll/mod.rs — poll-based driver implementation.
1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7    collections::{HashMap, VecDeque},
8    io,
9    num::NonZeroUsize,
10    pin::Pin,
11    sync::Arc,
12    task::{Poll, Wake, Waker},
13    time::Duration,
14};
15
16use compio_log::{instrument, trace};
17use flume::{Receiver, Sender};
18use polling::{Event, Events, Poller};
19use smallvec::SmallVec;
20
21use crate::{
22    AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
23    key::{BorrowedKey, Key, RefExt},
24    op::Interest,
25    syscall,
26};
27
28mod extra;
29pub use extra::Extra;
30pub(crate) mod op;
31
/// Bookkeeping for one fd an operation is waiting on.
struct Track {
    /// The fd/interest pair that was registered for this operation.
    arg: WaitArg,
    /// Whether the corresponding event has already fired for this entry.
    ready: bool,
}
36
37impl From<WaitArg> for Track {
38    fn from(arg: WaitArg) -> Self {
39        Self { arg, ready: false }
40    }
41}
42
/// Abstraction of operations.
///
/// # Safety
///
/// If `pre_submit` returns `Decision::Wait`, `op_type` must also return
/// `Some(OpType::Fd)` with the same fds as the `WaitArg`s. Similarly, if
/// `pre_submit` returns `Decision::Aio`, `op_type` must return
/// `Some(OpType::Aio)` with the correct `aiocb` pointer.
pub unsafe trait OpCode {
    /// Perform the operation before submit, and return [`Decision`] to
    /// indicate whether submitting the operation to polling is required.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

    /// Get the operation type when an event has occurred.
    ///
    /// Defaults to `None`, meaning the operation needs no cancellation
    /// bookkeeping (e.g. instant or blocking operations).
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

    /// Perform the operation after receiving the corresponding
    /// event. If this operation is blocking, the return value should be
    /// [`Poll::Ready`].
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
66
67pub use OpCode as PollOpCode;
68
/// One item in local or more items on heap.
///
/// Most poll operations wait on a single fd, so the common case stays inline
/// and avoids a heap allocation.
type Multi<T> = SmallVec<[T; 1]>;
71
/// Result of [`OpCode::pre_submit`].
#[non_exhaustive]
pub enum Decision {
    /// Instant operation, no need to submit; carries the result length.
    Completed(usize),
    /// Async operation, needs to submit the contained fd/interest pairs.
    Wait(Multi<WaitArg>),
    /// Blocking operation, needs to be spawned in another thread
    Blocking,
    /// AIO operation, needs to be spawned to the kernel.
    #[cfg(aio)]
    Aio(AioControl),
}
85
86impl Decision {
87    /// Decide to wait for the given fd with the given interest.
88    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
89        Self::Wait(SmallVec::from_buf([WaitArg { fd, interest }]))
90    }
91
92    /// Decide to wait for many fds.
93    pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
94        Self::Wait(Multi::from_iter(args))
95    }
96
97    /// Decide to wait for the given fd to be readable.
98    pub fn wait_readable(fd: RawFd) -> Self {
99        Self::wait_for(fd, Interest::Readable)
100    }
101
102    /// Decide to wait for the given fd to be writable.
103    pub fn wait_writable(fd: RawFd) -> Self {
104        Self::wait_for(fd, Interest::Writable)
105    }
106
107    /// Decide to spawn an AIO operation. `submit` is a method like `aio_read`.
108    #[cfg(aio)]
109    pub fn aio(
110        cb: &mut libc::aiocb,
111        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
112    ) -> Self {
113        Self::Aio(AioControl {
114            aiocbp: NonNull::from(cb),
115            submit,
116        })
117    }
118}
119
/// Meta of polling operations.
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    /// The raw fd of the operation.
    pub fd: RawFd,
    /// The interest (readable/writable) to be registered with the poller.
    pub interest: Interest,
}
128
129impl WaitArg {
130    /// Create a new readable `WaitArg`.
131    pub fn readable(fd: RawFd) -> Self {
132        Self {
133            fd,
134            interest: Interest::Readable,
135        }
136    }
137
138    /// Create a new writable `WaitArg`.
139    pub fn writable(fd: RawFd) -> Self {
140        Self {
141            fd,
142            interest: Interest::Writable,
143        }
144    }
145}
146
/// Meta of AIO operations.
// NOTE(review): the `aiocbp` pointee must stay alive until the AIO request
// completes — presumably guaranteed by the submitting op; confirm.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// Pointer of the control block.
    pub aiocbp: NonNull<libc::aiocb>,
    /// The aio_* submit function.
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
156
/// Per-fd FIFO queues of operations waiting for readability / writability.
#[derive(Debug, Default)]
struct FdQueue {
    /// Keys of operations waiting for the fd to become readable.
    read_queue: VecDeque<ErasedKey>,
    /// Keys of operations waiting for the fd to become writable.
    write_queue: VecDeque<ErasedKey>,
}
162
/// A token to remove an interest from `FdQueue`.
///
/// It is returned when an interest is pushed, and can be used to remove the
/// interest later. However do be careful that the index may be invalid or does
/// not correspond to the one inserted if other interests are added or removed
/// before it (toctou).
struct RemoveToken {
    /// Index into the read or write queue at the time of the push.
    idx: usize,
    /// Which queue the index refers to.
    is_read: bool,
}
173
174impl RemoveToken {
175    fn read(idx: usize) -> Self {
176        Self { idx, is_read: true }
177    }
178
179    fn write(idx: usize) -> Self {
180        Self {
181            idx,
182            is_read: false,
183        }
184    }
185}
186
impl FdQueue {
    /// Whether neither read nor write interests remain queued.
    fn is_empty(&self) -> bool {
        self.read_queue.is_empty() && self.write_queue.is_empty()
    }

    /// Remove the interest addressed by `token`, returning its key if the
    /// index is still in range. See [`RemoveToken`] for the staleness caveat.
    fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
        if token.is_read {
            self.read_queue.remove(token.idx)
        } else {
            self.write_queue.remove(token.idx)
        }
    }

    /// Queue `key` at the back of the queue matching `interest`.
    pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        match interest {
            Interest::Readable => {
                self.read_queue.push_back(key);
                RemoveToken::read(self.read_queue.len() - 1)
            }
            Interest::Writable => {
                self.write_queue.push_back(key);
                RemoveToken::write(self.write_queue.len() - 1)
            }
        }
    }

    /// Queue `key` at the front of the queue matching `interest`, so it is
    /// served by the next matching event.
    pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
        let is_read = match interest {
            Interest::Readable => {
                self.read_queue.push_front(key);
                true
            }
            Interest::Writable => {
                self.write_queue.push_front(key);
                false
            }
        };
        RemoveToken { idx: 0, is_read }
    }

    /// Remove every queued interest belonging to `key`.
    pub fn remove(&mut self, key: &ErasedKey) {
        self.read_queue.retain(|k| k != key);
        self.write_queue.retain(|k| k != key);
    }

    /// Build the poller [`Event`] covering the current front interests.
    // NOTE(review): when both queues are non-empty, `event.key` ends up as the
    // write queue's front key — the consumer resolves the actual op via
    // `pop_interest`, so the key only needs to identify some queued op; confirm.
    pub fn event(&self) -> Event {
        let mut event = Event::none(0);
        if let Some(key) = self.read_queue.front() {
            event.readable = true;
            event.key = key.as_raw();
        }
        if let Some(key) = self.write_queue.front() {
            event.writable = true;
            event.key = key.as_raw();
        }
        event
    }

    /// Pop the front interest matching `event`, preferring readable when the
    /// event signals both directions.
    pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
        if event.readable
            && let Some(key) = self.read_queue.pop_front()
        {
            return Some((key, Interest::Readable));
        }
        if event.writable
            && let Some(key) = self.write_queue.pop_front()
        {
            return Some((key, Interest::Writable));
        }
        None
    }
}
259
/// Represents the filter type of kqueue. `polling` crate doesn't expose such
/// API, and we need to know about it when `cancel` is called.
#[non_exhaustive]
pub enum OpType {
    /// The operation polls one or more fds.
    Fd(Multi<RawFd>),
    /// The operation submits an AIO.
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}
270
271impl OpType {
272    /// Create an [`OpType::Fd`] with one [`RawFd`].
273    pub fn fd(fd: RawFd) -> Self {
274        Self::Fd(SmallVec::from_buf([fd]))
275    }
276
277    /// Create an [`OpType::Fd`] with multiple [`RawFd`]s.
278    pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
279        Self::Fd(Multi::from_iter(fds))
280    }
281}
282
/// Low-level driver of polling.
pub(crate) struct Driver {
    /// Scratch buffer of received events, reused across `poll` calls.
    events: Events,
    /// Shared handle holding the poller; also used as the waker.
    notify: Arc<Notify>,
    /// Interest queues keyed by raw fd.
    registry: HashMap<RawFd, FdQueue>,
    /// Thread pool used for `Decision::Blocking` operations.
    pool: AsyncifyPool,
    /// Channel for entries completed off-thread (blocking ops).
    completed_tx: Sender<Entry>,
    completed_rx: Receiver<Entry>,
}
292
293impl Driver {
294    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
295        instrument!(compio_log::Level::TRACE, "new", ?builder);
296        trace!("new poll driver");
297
298        let events = if let Some(cap) = NonZeroUsize::new(builder.capacity as _) {
299            Events::with_capacity(cap)
300        } else {
301            Events::new()
302        };
303        let poll = Poller::new()?;
304        let notify = Arc::new(Notify::new(poll));
305        let (completed_tx, completed_rx) = flume::unbounded();
306
307        Ok(Self {
308            events,
309            notify,
310            registry: HashMap::new(),
311            pool: builder.create_or_get_thread_pool(),
312            completed_tx,
313            completed_rx,
314        })
315    }
316
    /// The backend type of this driver: always [`DriverType::Poll`].
    pub fn driver_type(&self) -> DriverType {
        DriverType::Poll
    }
320
    /// Default per-operation extra state for this driver.
    pub fn default_extra(&self) -> Extra {
        Extra::new()
    }
324
    /// Shorthand for the inner [`Poller`] held by the notify handle.
    fn poller(&self) -> &Poller {
        &self.notify.poll
    }
328
329    fn with_events<F, R>(&mut self, f: F) -> R
330    where
331        F: FnOnce(&mut Self, &mut Events) -> R,
332    {
333        let mut events = std::mem::take(&mut self.events);
334        let res = f(self, &mut events);
335        self.events = events;
336        res
337    }
338
    /// Look up the interest queue for `fd`, if the fd is registered.
    fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
        self.registry.get_mut(&fd)
    }
342
    /// Look up the interest queue for `fd`.
    ///
    /// # Panics
    /// Panics if the fd has not been submitted to this driver.
    fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
        self.try_get_queue(fd).expect("the fd should be submitted")
    }
346
    /// Submit a new operation to the end of the queue.
    ///
    /// # Safety
    /// The input fd should be valid.
    unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
        // Destructure so `registry` and `notify` can be borrowed independently.
        let Self {
            registry, notify, ..
        } = self;
        let need_add = !registry.contains_key(&arg.fd);
        let queue = registry.entry(arg.fd).or_default();
        let token = queue.push_back_interest(key, arg.interest);
        let event = queue.event();
        let res = if need_add {
            // First interest for this fd: register it with the poller.
            // SAFETY: the events are deleted correctly.
            unsafe { notify.poll.add(arg.fd, event) }
        } else {
            // Already registered: just update the interest set.
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            notify.poll.modify(fd, event)
        };
        if res.is_err() {
            // Rollback the push if submission failed. The token is still valid
            // here because the queue has not been touched since the push.
            queue.remove_token(token);
            if queue.is_empty() {
                registry.remove(&arg.fd);
            }
        }

        res
    }
376
377    /// Submit a new operation to the front of the queue.
378    ///
379    /// # Safety
380    /// The input fd should be valid.
381    unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
382        let need_add = !self.registry.contains_key(&arg.fd);
383        let queue = self.registry.entry(arg.fd).or_default();
384        queue.push_front_interest(key, arg.interest);
385        let event = queue.event();
386        if need_add {
387            // SAFETY: the events are deleted correctly.
388            unsafe { self.poller().add(arg.fd, event)? }
389        } else {
390            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
391            self.poller().modify(fd, event)?;
392        }
393        Ok(())
394    }
395
396    fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
397        if !renew_event.readable && !renew_event.writable {
398            self.poller().delete(fd)?;
399            self.registry.remove(&fd.as_raw_fd());
400        } else {
401            self.poller().modify(fd, renew_event)?;
402        }
403        Ok(())
404    }
405
    /// Remove one interest from the queue.
    ///
    /// Drops every interest registered under `key` for `fd`, removes the
    /// queue from the registry when it becomes empty, and re-registers the
    /// remaining interests (or deletes the fd) via [`Self::renew`].
    fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
        // Nothing to do if the fd was never submitted (or already cleaned up).
        let Some(queue) = self.try_get_queue(fd) else {
            return Ok(());
        };
        queue.remove(key);
        // Compute the renewal event before the queue is potentially dropped.
        let renew_event = queue.event();
        if queue.is_empty() {
            self.registry.remove(&fd);
        }
        // NOTE(review): assumes the fd is still open at removal time — TODO confirm.
        self.renew(unsafe { BorrowedFd::borrow_raw(fd) }, renew_event)
    }
418
419    /// Remove one interest from the queue, and emit a cancelled entry.
420    fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
421        self.remove_one(&key, fd)
422            .map_or(None, |_| Some(Entry::new_cancelled(key)))
423    }
424
    /// Attach an fd to the driver. The poll driver registers fds lazily at
    /// submit time, so this is a no-op.
    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }
428
    /// Cancel an in-flight operation.
    ///
    /// For fd-based ops, every fd is deregistered but only one cancelled
    /// entry is emitted. For AIO ops, the kernel is asked to cancel and the
    /// result arrives through the normal event path.
    pub fn cancel<T>(&mut self, key: Key<T>) {
        let op_type = key.borrow().pinned_op().op_type();
        match op_type {
            // Nothing registered; the op completes (or completed) on its own.
            None => {}
            Some(OpType::Fd(fds)) => {
                // Emit at most one cancelled entry even for multi-fd ops.
                let mut pushed = false;
                for fd in fds {
                    let entry = self.cancel_one(key.clone().erase(), fd);
                    if !pushed && let Some(entry) = entry {
                        _ = self.completed_tx.send(entry);
                        pushed = true;
                    }
                }
            }
            #[cfg(aio)]
            Some(OpType::Aio(aiocbp)) => {
                let aiocb = unsafe { aiocbp.as_ref() };
                let fd = aiocb.aio_fildes;
                // Best effort: if cancellation fails the op simply finishes.
                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
            }
        }
    }
451
452    pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
453        instrument!(compio_log::Level::TRACE, "push", ?key);
454        match { key.borrow().pinned_op().pre_submit()? } {
455            Decision::Wait(args) => {
456                key.borrow()
457                    .extra_mut()
458                    .as_poll_mut()
459                    .set_args(args.clone());
460                for arg in args.iter().copied() {
461                    // SAFETY: fd is from the OpCode.
462                    let res = unsafe { self.submit(key.clone(), arg) };
463                    // if submission fails, remove all previously submitted fds.
464                    if let Err(e) = res {
465                        args.into_iter().for_each(|arg| {
466                            // we don't care about renew errors
467                            let _ = self.remove_one(&key, arg.fd);
468                        });
469                        return Poll::Ready(Err(e));
470                    }
471                    trace!("register {:?}", arg);
472                }
473                Poll::Pending
474            }
475            Decision::Completed(res) => Poll::Ready(Ok(res)),
476            Decision::Blocking => {
477                self.push_blocking(key);
478                Poll::Pending
479            }
480            #[cfg(aio)]
481            Decision::Aio(AioControl { mut aiocbp, submit }) => {
482                let aiocb = unsafe { aiocbp.as_mut() };
483                let user_data = key.as_raw();
484                #[cfg(freebsd)]
485                {
486                    // sigev_notify_kqueue
487                    aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
488                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
489                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
490                }
491                #[cfg(solarish)]
492                let mut notify = libc::port_notify {
493                    portnfy_port: self.as_raw_fd(),
494                    portnfy_user: user_data as _,
495                };
496                #[cfg(solarish)]
497                {
498                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
499                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
500                }
501                match syscall!(submit(aiocbp.as_ptr())) {
502                    Ok(_) => {
503                        // Key is successfully submitted, leak it on this side.
504                        key.into_raw();
505                        Poll::Pending
506                    }
507                    // FreeBSD:
508                    //   * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to
509                    //     blocking IO.
510                    //   * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe)
511                    //     dead entries.
512                    // Solarish:
513                    //   * EAGAIN: Allocation failed.
514                    Err(e)
515                        if matches!(
516                            e.raw_os_error(),
517                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
518                        ) =>
519                    {
520                        self.push_blocking(key);
521                        Poll::Pending
522                    }
523                    Err(e) => Poll::Ready(Err(e)),
524                }
525            }
526        }
527    }
528
    /// Run a blocking operation on the asyncify thread pool.
    ///
    /// The finished entry is sent back over `completed_tx` and the driver is
    /// woken. If the pool refuses the job, completed entries are drained and
    /// dispatch is retried until a worker accepts it.
    fn push_blocking(&mut self, key: ErasedKey) {
        let waker = self.waker();
        let completed = self.completed_tx.clone();
        // SAFETY: we're submitting into the driver, so it's safe to freeze here.
        let mut key = unsafe { key.freeze() };

        let mut closure = move || {
            let poll = key.pinned_op().operate();
            let res = match poll {
                // A blocking op must resolve synchronously.
                Poll::Pending => unreachable!("this operation is not non-blocking"),
                Poll::Ready(res) => res,
            };
            let _ = completed.send(Entry::new(key.into_inner(), res));
            waker.wake();
        };

        // `dispatch` hands the closure back on failure; keep retrying.
        while let Err(e) = self.pool.dispatch(closure) {
            closure = e.0;
            self.poll_completed();
        }
    }
550
551    fn poll_completed(&mut self) -> bool {
552        let mut ret = false;
553        while let Ok(entry) = self.completed_rx.try_recv() {
554            entry.notify();
555            ret = true;
556        }
557        ret
558    }
559
    /// Handle a single poller event for `fd`: run the op at the front of the
    /// matching queue, then either complete it or re-queue its interests.
    #[allow(clippy::blocks_in_conditions)]
    fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
        let queue = self.get_queue(fd);

        // NOTE(review): `handle_event` presumably records this fd as ready and
        // returns whether all awaited fds are now ready — confirm. If it
        // returns false, the popped key is intentionally not operated yet.
        if let Some((key, _)) = queue.pop_interest(&event)
            && let mut op = key.borrow()
            && op.extra_mut().as_poll_mut().handle_event(fd)
        {
            // Add brace here to force `Ref` drop within the scrutinee
            match { op.pinned_op().operate() } {
                // Submit all fd's back to the front of the queue
                Poll::Pending => {
                    let extra = op.extra_mut().as_poll_mut();
                    extra.reset();
                    // `FdQueue` may have been removed, need to submit again
                    for t in extra.track.iter() {
                        let res = unsafe { self.submit_front(key.clone(), t.arg) };
                        if let Err(e) = res {
                            // On error, remove all previously submitted fds.
                            for t in extra.track.iter() {
                                let _ = self.remove_one(&key, t.arg.fd);
                            }
                            return Err(e);
                        }
                    }
                }
                Poll::Ready(res) => {
                    drop(op);
                    Entry::new(key, res).notify()
                }
            };
        }

        // Re-register (or delete) the fd according to what is still queued.
        let renew_event = self.get_queue(fd).event();
        let fd = unsafe { BorrowedFd::borrow_raw(fd) };
        self.renew(fd, renew_event)
    }
597
    /// Wait for events and dispatch them.
    ///
    /// Returns immediately if any off-thread (blocking) op already completed;
    /// otherwise waits on the poller for up to `timeout`. Waking up with no
    /// events while a timeout was set is reported as `ETIMEDOUT`.
    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
        if self.poll_completed() {
            return Ok(());
        }
        self.events.clear();
        self.notify.poll.wait(&mut self.events, timeout)?;
        if self.events.is_empty() && timeout.is_some() {
            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
        }
        self.with_events(|this, events| {
            for event in events.iter() {
                trace!("receive {} for {:?}", event.key, event);
                // SAFETY: user_data is promised to be valid.
                let key = unsafe { BorrowedKey::from_raw(event.key) };
                let mut op = key.borrow();
                let op_type = op.pinned_op().op_type();
                match op_type {
                    None => {
                        // On epoll, multiple event may be received even if it is registered as
                        // one-shot. It is safe to ignore it.
                        trace!("op {} is completed", event.key);
                    }
                    Some(OpType::Fd(_)) => {
                        // FIXME: This should not happen
                        // NOTE(review): this early return skips the remaining
                        // events in the batch — confirm that is intended.
                        let Some(fd) = op.extra().as_poll().next_fd() else {
                            return Ok(());
                        };
                        drop(op);
                        this.poll_one(event, fd)?;
                    }
                    #[cfg(aio)]
                    Some(OpType::Aio(aiocbp)) => {
                        drop(op);
                        let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                        let res = match err {
                            // If the user_data is reused but the previously registered event still
                            // emits (for example, HUP in epoll; however it is impossible now
                            // because we only use AIO on FreeBSD), we'd better ignore the current
                            // one and wait for the real event.
                            libc::EINPROGRESS => {
                                trace!("op {} is not completed", key.as_raw());
                                continue;
                            }
                            libc::ECANCELED => {
                                // Remove the aiocb from kqueue.
                                unsafe { libc::aio_return(aiocbp.as_ptr()) };
                                Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                            }
                            _ => {
                                syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
                            }
                        };
                        // Reclaim ownership of the key leaked at submission.
                        let key = unsafe { ErasedKey::from_raw(event.key) };
                        Entry::new(key, res).notify()
                    }
                }
            }

            Ok(())
        })
    }
660
    /// Create a [`Waker`] that interrupts a concurrent [`Self::poll`] wait.
    pub fn waker(&self) -> Waker {
        Waker::from(self.notify.clone())
    }
664
    /// Create a buffer pool of `buffer_len` buffers of `buffer_size` bytes.
    ///
    /// The poll driver has no kernel-registered buffers, so this is a plain
    /// userspace pool.
    pub fn create_buffer_pool(
        &mut self,
        buffer_len: u16,
        buffer_size: usize,
    ) -> io::Result<BufferPool> {
        #[cfg(fusion)]
        {
            // Fusion build: wrap the fallback pool in the unified type.
            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
                buffer_len,
                buffer_size,
            )))
        }
        #[cfg(not(fusion))]
        {
            Ok(BufferPool::new(buffer_len, buffer_size))
        }
    }
682
    /// # Safety
    ///
    /// The caller must make sure to release the buffer pool with the correct
    /// driver.
    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
        // Nothing to do: the poll driver keeps no kernel-side buffer state.
        Ok(())
    }
689}
690
impl AsRawFd for Driver {
    /// The raw fd of the underlying poller (used e.g. as the kqueue target
    /// for AIO completion notifications).
    fn as_raw_fd(&self) -> RawFd {
        self.poller().as_raw_fd()
    }
}
696
697impl Drop for Driver {
698    fn drop(&mut self) {
699        for fd in self.registry.keys() {
700            unsafe {
701                let fd = BorrowedFd::borrow_raw(*fd);
702                self.poller().delete(fd).ok();
703            }
704        }
705    }
706}
707
impl Entry {
    /// Build an entry representing a cancelled operation.
    // NOTE(review): ETIMEDOUT (not ECANCELED) is used here — it matches the
    // errno this module also emits on poll timeouts; confirm this is the
    // crate-wide convention for cancelled ops.
    pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
        Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
    }
}
713
/// A notify handle to the inner driver.
pub(crate) struct Notify {
    /// The poller shared with the driver; `notify` interrupts its `wait`.
    poll: Poller,
}
718
impl Notify {
    /// Wrap a [`Poller`] into a notify handle.
    fn new(poll: Poller) -> Self {
        Self { poll }
    }

    /// Notify the inner driver, interrupting a pending `wait`.
    pub fn notify(&self) -> io::Result<()> {
        self.poll.notify()
    }
}
729
730impl Wake for Notify {
731    fn wake(self: Arc<Self>) {
732        self.wake_by_ref();
733    }
734
735    fn wake_by_ref(self: &Arc<Self>) {
736        self.notify().ok();
737    }
738}