Skip to main content

compio_driver/sys/poll/
mod.rs

1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7    collections::{HashMap, VecDeque},
8    io,
9    num::NonZeroUsize,
10    pin::Pin,
11    sync::Arc,
12    task::{Poll, Wake, Waker},
13    time::Duration,
14};
15
16use compio_log::{instrument, trace};
17use crossbeam_queue::SegQueue;
18use polling::{Event, Events, Poller};
19use smallvec::SmallVec;
20
21use crate::{
22    AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
23    key::{BorrowedKey, Key, RefExt},
24    op::Interest,
25    syscall,
26};
27
28mod extra;
29pub use extra::Extra;
30pub(crate) mod op;
31
32struct Track {
33    arg: WaitArg,
34    ready: bool,
35}
36
37impl From<WaitArg> for Track {
38    fn from(arg: WaitArg) -> Self {
39        Self { arg, ready: false }
40    }
41}
42
/// Abstraction of operations driven by the poll driver.
///
/// # Safety
///
/// If `pre_submit` returns `Decision::Wait`, `op_type` must also return
/// `Some(OpType::Fd)` with the same fds as the `WaitArg`s. Similarly, if
/// `pre_submit` returns `Decision::Aio`, `op_type` must return
/// `Some(OpType::Aio)` with the correct `aiocb` pointer.
pub unsafe trait OpCode {
    /// Perform the operation before submission, and return a [`Decision`]
    /// indicating whether the operation must be submitted to polling.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

    /// Get the operation type. It is queried when an event occurs and when the
    /// operation is cancelled.
    ///
    /// The default returns `None`: the driver then has nothing to cancel, and
    /// it ignores events delivered for this operation.
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

    /// Perform the operation after the corresponding event has been received.
    ///
    /// For a non-blocking operation, returning [`Poll::Pending`] makes the
    /// driver re-register all of its fds at the front of their queues. If this
    /// operation is blocking (run on the thread pool), the return value must be
    /// [`Poll::Ready`]; `Pending` is treated as unreachable there.
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
66
67pub use OpCode as PollOpCode;
68
69/// One item in local or more items on heap.
70type Multi<T> = SmallVec<[T; 1]>;
71
72/// Result of [`OpCode::pre_submit`].
73#[non_exhaustive]
74pub enum Decision {
75    /// Instant operation, no need to submit
76    Completed(usize),
77    /// Async operation, needs to submit
78    Wait(Multi<WaitArg>),
79    /// Blocking operation, needs to be spawned in another thread
80    Blocking,
81    /// AIO operation, needs to be spawned to the kernel.
82    #[cfg(aio)]
83    Aio(AioControl),
84}
85
86impl Decision {
87    /// Decide to wait for the given fd with the given interest.
88    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
89        Self::Wait(SmallVec::from_buf([WaitArg { fd, interest }]))
90    }
91
92    /// Decide to wait for many fds.
93    pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
94        Self::Wait(Multi::from_iter(args))
95    }
96
97    /// Decide to wait for the given fd to be readable.
98    pub fn wait_readable(fd: RawFd) -> Self {
99        Self::wait_for(fd, Interest::Readable)
100    }
101
102    /// Decide to wait for the given fd to be writable.
103    pub fn wait_writable(fd: RawFd) -> Self {
104        Self::wait_for(fd, Interest::Writable)
105    }
106
107    /// Decide to spawn an AIO operation. `submit` is a method like `aio_read`.
108    #[cfg(aio)]
109    pub fn aio(
110        cb: &mut libc::aiocb,
111        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
112    ) -> Self {
113        Self::Aio(AioControl {
114            aiocbp: NonNull::from(cb),
115            submit,
116        })
117    }
118}
119
120/// Meta of polling operations.
121#[derive(Debug, Clone, Copy)]
122pub struct WaitArg {
123    /// The raw fd of the operation.
124    pub fd: RawFd,
125    /// The interest to be registered.
126    pub interest: Interest,
127}
128
129impl WaitArg {
130    /// Create a new readable `WaitArg`.
131    pub fn readable(fd: RawFd) -> Self {
132        Self {
133            fd,
134            interest: Interest::Readable,
135        }
136    }
137
138    /// Create a new writable `WaitArg`.
139    pub fn writable(fd: RawFd) -> Self {
140        Self {
141            fd,
142            interest: Interest::Writable,
143        }
144    }
145}
146
/// Meta of AIO operations.
///
/// Built by [`Decision::aio`]. When the driver pushes it, the control block's
/// `aio_sigevent` notification fields are filled in on the supported
/// platforms, and then `submit` is called with `aiocbp`.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// Pointer of the control block.
    pub aiocbp: NonNull<libc::aiocb>,
    /// The aio_* submit function (for example `aio_read`).
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
156
157#[derive(Debug, Default)]
158struct FdQueue {
159    read_queue: VecDeque<ErasedKey>,
160    write_queue: VecDeque<ErasedKey>,
161}
162
163/// A token to remove an interest from `FdQueue`.
164///
165/// It is returned when an interest is pushed, and can be used to remove the
166/// interest later. However do be careful that the index may be invalid or does
167/// not correspond to the one inserted if other interests are added or removed
168/// before it (toctou).
169struct RemoveToken {
170    idx: usize,
171    is_read: bool,
172}
173
174impl RemoveToken {
175    fn read(idx: usize) -> Self {
176        Self { idx, is_read: true }
177    }
178
179    fn write(idx: usize) -> Self {
180        Self {
181            idx,
182            is_read: false,
183        }
184    }
185}
186
187impl FdQueue {
188    fn is_empty(&self) -> bool {
189        self.read_queue.is_empty() && self.write_queue.is_empty()
190    }
191
192    fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
193        if token.is_read {
194            self.read_queue.remove(token.idx)
195        } else {
196            self.write_queue.remove(token.idx)
197        }
198    }
199
200    pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
201        match interest {
202            Interest::Readable => {
203                self.read_queue.push_back(key);
204                RemoveToken::read(self.read_queue.len() - 1)
205            }
206            Interest::Writable => {
207                self.write_queue.push_back(key);
208                RemoveToken::write(self.write_queue.len() - 1)
209            }
210        }
211    }
212
213    pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
214        let is_read = match interest {
215            Interest::Readable => {
216                self.read_queue.push_front(key);
217                true
218            }
219            Interest::Writable => {
220                self.write_queue.push_front(key);
221                false
222            }
223        };
224        RemoveToken { idx: 0, is_read }
225    }
226
227    pub fn remove(&mut self, key: &ErasedKey) {
228        self.read_queue.retain(|k| k != key);
229        self.write_queue.retain(|k| k != key);
230    }
231
232    pub fn event(&self) -> Event {
233        let mut event = Event::none(0);
234        if let Some(key) = self.read_queue.front() {
235            event.readable = true;
236            event.key = key.as_raw();
237        }
238        if let Some(key) = self.write_queue.front() {
239            event.writable = true;
240            event.key = key.as_raw();
241        }
242        event
243    }
244
245    pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
246        if event.readable
247            && let Some(key) = self.read_queue.pop_front()
248        {
249            return Some((key, Interest::Readable));
250        }
251        if event.writable
252            && let Some(key) = self.write_queue.pop_front()
253        {
254            return Some((key, Interest::Writable));
255        }
256        None
257    }
258}
259
260/// Represents the filter type of kqueue. `polling` crate doesn't expose such
261/// API, and we need to know about it when `cancel` is called.
262#[non_exhaustive]
263pub enum OpType {
264    /// The operation polls an fd.
265    Fd(Multi<RawFd>),
266    /// The operation submits an AIO.
267    #[cfg(aio)]
268    Aio(NonNull<libc::aiocb>),
269}
270
271impl OpType {
272    /// Create an [`OpType::Fd`] with one [`RawFd`].
273    pub fn fd(fd: RawFd) -> Self {
274        Self::Fd(SmallVec::from_buf([fd]))
275    }
276
277    /// Create an [`OpType::Fd`] with multiple [`RawFd`]s.
278    pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
279        Self::Fd(Multi::from_iter(fds))
280    }
281}
282
/// Low-level driver of polling.
pub(crate) struct Driver {
    /// Reusable buffer filled by `Poller::wait` in `poll`.
    events: Events,
    /// Shared handle that owns the `Poller`; it also backs the driver's waker.
    notify: Arc<Notify>,
    /// Per-fd queues of the operations waiting on that fd.
    registry: HashMap<RawFd, FdQueue>,
    /// Thread pool that runs `Decision::Blocking` operations.
    pool: AsyncifyPool,
    /// Entries completed on the thread pool or cancelled by `cancel`; drained
    /// by `poll_blocking`.
    pool_completed: Arc<SegQueue<Entry>>,
}
291
292impl Driver {
293    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
294        instrument!(compio_log::Level::TRACE, "new", ?builder);
295        trace!("new poll driver");
296
297        let events = if let Some(cap) = NonZeroUsize::new(builder.capacity as _) {
298            Events::with_capacity(cap)
299        } else {
300            Events::new()
301        };
302        let poll = Poller::new()?;
303        let notify = Arc::new(Notify::new(poll));
304
305        Ok(Self {
306            events,
307            notify,
308            registry: HashMap::new(),
309            pool: builder.create_or_get_thread_pool(),
310            pool_completed: Arc::new(SegQueue::new()),
311        })
312    }
313
314    pub fn driver_type(&self) -> DriverType {
315        DriverType::Poll
316    }
317
318    pub fn default_extra(&self) -> Extra {
319        Extra::new()
320    }
321
322    fn poller(&self) -> &Poller {
323        &self.notify.poll
324    }
325
326    fn with_events<F, R>(&mut self, f: F) -> R
327    where
328        F: FnOnce(&mut Self, &mut Events) -> R,
329    {
330        let mut events = std::mem::take(&mut self.events);
331        let res = f(self, &mut events);
332        self.events = events;
333        res
334    }
335
336    fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
337        self.registry.get_mut(&fd)
338    }
339
340    fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
341        self.try_get_queue(fd).expect("the fd should be submitted")
342    }
343
344    /// Submit a new operation to the end of the queue.
345    ///
346    ///  # Safety
347    /// The input fd should be valid.
348    unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
349        let Self {
350            registry, notify, ..
351        } = self;
352        let need_add = !registry.contains_key(&arg.fd);
353        let queue = registry.entry(arg.fd).or_default();
354        let token = queue.push_back_interest(key, arg.interest);
355        let event = queue.event();
356        let res = if need_add {
357            // SAFETY: the events are deleted correctly.
358            unsafe { notify.poll.add(arg.fd, event) }
359        } else {
360            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
361            notify.poll.modify(fd, event)
362        };
363        if res.is_err() {
364            // Rollback the push if submission failed.
365            queue.remove_token(token);
366            if queue.is_empty() {
367                registry.remove(&arg.fd);
368            }
369        }
370
371        res
372    }
373
374    /// Submit a new operation to the front of the queue.
375    ///
376    /// # Safety
377    /// The input fd should be valid.
378    unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
379        let need_add = !self.registry.contains_key(&arg.fd);
380        let queue = self.registry.entry(arg.fd).or_default();
381        queue.push_front_interest(key, arg.interest);
382        let event = queue.event();
383        if need_add {
384            // SAFETY: the events are deleted correctly.
385            unsafe { self.poller().add(arg.fd, event)? }
386        } else {
387            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
388            self.poller().modify(fd, event)?;
389        }
390        Ok(())
391    }
392
393    fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
394        if !renew_event.readable && !renew_event.writable {
395            self.poller().delete(fd)?;
396            self.registry.remove(&fd.as_raw_fd());
397        } else {
398            self.poller().modify(fd, renew_event)?;
399        }
400        Ok(())
401    }
402
403    /// Remove one interest from the queue.
404    fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
405        let Some(queue) = self.try_get_queue(fd) else {
406            return Ok(());
407        };
408        queue.remove(key);
409        let renew_event = queue.event();
410        if queue.is_empty() {
411            self.registry.remove(&fd);
412        }
413        self.renew(unsafe { BorrowedFd::borrow_raw(fd) }, renew_event)
414    }
415
416    /// Remove one interest from the queue, and emit a cancelled entry.
417    fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
418        self.remove_one(&key, fd)
419            .map_or(None, |_| Some(Entry::new_cancelled(key)))
420    }
421
422    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
423        Ok(())
424    }
425
426    pub fn cancel<T>(&mut self, key: Key<T>) {
427        let op_type = key.borrow().pinned_op().op_type();
428        match op_type {
429            None => {}
430            Some(OpType::Fd(fds)) => {
431                let mut pushed = false;
432                for fd in fds {
433                    let entry = self.cancel_one(key.clone().erase(), fd);
434                    if !pushed && let Some(entry) = entry {
435                        self.pool_completed.push(entry);
436                        pushed = true;
437                    }
438                }
439            }
440            #[cfg(aio)]
441            Some(OpType::Aio(aiocbp)) => {
442                let aiocb = unsafe { aiocbp.as_ref() };
443                let fd = aiocb.aio_fildes;
444                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
445            }
446        }
447    }
448
449    pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
450        instrument!(compio_log::Level::TRACE, "push", ?key);
451        match { key.borrow().pinned_op().pre_submit()? } {
452            Decision::Wait(args) => {
453                key.borrow()
454                    .extra_mut()
455                    .as_poll_mut()
456                    .set_args(args.clone());
457                for arg in args.iter().copied() {
458                    // SAFETY: fd is from the OpCode.
459                    let res = unsafe { self.submit(key.clone(), arg) };
460                    // if submission fails, remove all previously submitted fds.
461                    if let Err(e) = res {
462                        args.into_iter().for_each(|arg| {
463                            // we don't care about renew errors
464                            let _ = self.remove_one(&key, arg.fd);
465                        });
466                        return Poll::Ready(Err(e));
467                    }
468                    trace!("register {:?}", arg);
469                }
470                Poll::Pending
471            }
472            Decision::Completed(res) => Poll::Ready(Ok(res)),
473            Decision::Blocking => self.push_blocking(key),
474            #[cfg(aio)]
475            Decision::Aio(AioControl { mut aiocbp, submit }) => {
476                let aiocb = unsafe { aiocbp.as_mut() };
477                let user_data = key.as_raw();
478                #[cfg(freebsd)]
479                {
480                    // sigev_notify_kqueue
481                    aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
482                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
483                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
484                }
485                #[cfg(solarish)]
486                let mut notify = libc::port_notify {
487                    portnfy_port: self.as_raw_fd(),
488                    portnfy_user: user_data as _,
489                };
490                #[cfg(solarish)]
491                {
492                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
493                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
494                }
495                match syscall!(submit(aiocbp.as_ptr())) {
496                    Ok(_) => {
497                        // Key is successfully submitted, leak it on this side.
498                        key.into_raw();
499                        Poll::Pending
500                    }
501                    // FreeBSD:
502                    //   * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to
503                    //     blocking IO.
504                    //   * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe)
505                    //     dead entries.
506                    // Solarish:
507                    //   * EAGAIN: Allocation failed.
508                    Err(e)
509                        if matches!(
510                            e.raw_os_error(),
511                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
512                        ) =>
513                    {
514                        self.push_blocking(key)
515                    }
516                    Err(e) => Poll::Ready(Err(e)),
517                }
518            }
519        }
520    }
521
522    fn push_blocking(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
523        let waker = self.waker();
524        let completed = self.pool_completed.clone();
525        // SAFETY: we're submitting into the driver, so it's safe to freeze here.
526        let mut key = unsafe { key.freeze() };
527
528        let mut closure = move || {
529            let poll = key.pinned_op().operate();
530            let res = match poll {
531                Poll::Pending => unreachable!("this operation is not non-blocking"),
532                Poll::Ready(res) => res,
533            };
534            completed.push(Entry::new(key.into_inner(), res));
535            waker.wake();
536        };
537        loop {
538            match self.pool.dispatch(closure) {
539                Ok(()) => return Poll::Pending,
540                Err(e) => {
541                    closure = e.0;
542                    self.poll_blocking();
543                }
544            }
545        }
546    }
547
548    fn poll_blocking(&mut self) -> bool {
549        if self.pool_completed.is_empty() {
550            return false;
551        }
552        while let Some(entry) = self.pool_completed.pop() {
553            entry.notify();
554        }
555        true
556    }
557
558    #[allow(clippy::blocks_in_conditions)]
559    fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
560        let queue = self.get_queue(fd);
561
562        if let Some((key, _)) = queue.pop_interest(&event)
563            && let mut op = key.borrow()
564            && op.extra_mut().as_poll_mut().handle_event(fd)
565        {
566            // Add brace here to force `Ref` drop within the scrutinee
567            match { op.pinned_op().operate() } {
568                // Submit all fd's back to the front of the queue
569                Poll::Pending => {
570                    let extra = op.extra_mut().as_poll_mut();
571                    extra.reset();
572                    // `FdQueue` may have been removed, need to submit again
573                    for t in extra.track.iter() {
574                        let res = unsafe { self.submit_front(key.clone(), t.arg) };
575                        if let Err(e) = res {
576                            // On error, remove all previously submitted fds.
577                            for t in extra.track.iter() {
578                                let _ = self.remove_one(&key, t.arg.fd);
579                            }
580                            return Err(e);
581                        }
582                    }
583                }
584                Poll::Ready(res) => {
585                    drop(op);
586                    Entry::new(key, res).notify()
587                }
588            };
589        }
590
591        let renew_event = self.get_queue(fd).event();
592        let fd = unsafe { BorrowedFd::borrow_raw(fd) };
593        self.renew(fd, renew_event)
594    }
595
596    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
597        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
598        if self.poll_blocking() {
599            return Ok(());
600        }
601        self.events.clear();
602        self.notify.poll.wait(&mut self.events, timeout)?;
603        if self.events.is_empty() && timeout.is_some() {
604            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
605        }
606        self.with_events(|this, events| {
607            for event in events.iter() {
608                trace!("receive {} for {:?}", event.key, event);
609                // SAFETY: user_data is promised to be valid.
610                let key = unsafe { BorrowedKey::from_raw(event.key) };
611                let mut op = key.borrow();
612                let op_type = op.pinned_op().op_type();
613                match op_type {
614                    None => {
615                        // On epoll, multiple event may be received even if it is registered as
616                        // one-shot. It is safe to ignore it.
617                        trace!("op {} is completed", event.key);
618                    }
619                    Some(OpType::Fd(_)) => {
620                        // FIXME: This should not happen
621                        let Some(fd) = op.extra().as_poll().next_fd() else {
622                            return Ok(());
623                        };
624                        drop(op);
625                        this.poll_one(event, fd)?;
626                    }
627                    #[cfg(aio)]
628                    Some(OpType::Aio(aiocbp)) => {
629                        drop(op);
630                        let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
631                        let res = match err {
632                            // If the user_data is reused but the previously registered event still
633                            // emits (for example, HUP in epoll; however it is impossible now
634                            // because we only use AIO on FreeBSD), we'd better ignore the current
635                            // one and wait for the real event.
636                            libc::EINPROGRESS => {
637                                trace!("op {} is not completed", key.as_raw());
638                                continue;
639                            }
640                            libc::ECANCELED => {
641                                // Remove the aiocb from kqueue.
642                                unsafe { libc::aio_return(aiocbp.as_ptr()) };
643                                Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
644                            }
645                            _ => {
646                                syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
647                            }
648                        };
649                        let key = unsafe { ErasedKey::from_raw(event.key) };
650                        Entry::new(key, res).notify()
651                    }
652                }
653            }
654
655            Ok(())
656        })
657    }
658
659    pub fn waker(&self) -> Waker {
660        Waker::from(self.notify.clone())
661    }
662
663    pub fn create_buffer_pool(
664        &mut self,
665        buffer_len: u16,
666        buffer_size: usize,
667    ) -> io::Result<BufferPool> {
668        #[cfg(fusion)]
669        {
670            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
671                buffer_len,
672                buffer_size,
673            )))
674        }
675        #[cfg(not(fusion))]
676        {
677            Ok(BufferPool::new(buffer_len, buffer_size))
678        }
679    }
680
681    /// # Safety
682    ///
683    /// caller must make sure release the buffer pool with correct driver
684    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
685        Ok(())
686    }
687}
688
689impl AsRawFd for Driver {
690    fn as_raw_fd(&self) -> RawFd {
691        self.poller().as_raw_fd()
692    }
693}
694
695impl Drop for Driver {
696    fn drop(&mut self) {
697        for fd in self.registry.keys() {
698            unsafe {
699                let fd = BorrowedFd::borrow_raw(*fd);
700                self.poller().delete(fd).ok();
701            }
702        }
703    }
704}
705
706impl Entry {
707    pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
708        Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
709    }
710}
711
712/// A notify handle to the inner driver.
713pub(crate) struct Notify {
714    poll: Poller,
715}
716
717impl Notify {
718    fn new(poll: Poller) -> Self {
719        Self { poll }
720    }
721
722    /// Notify the inner driver.
723    pub fn notify(&self) -> io::Result<()> {
724        self.poll.notify()
725    }
726}
727
728impl Wake for Notify {
729    fn wake(self: Arc<Self>) {
730        self.wake_by_ref();
731    }
732
733    fn wake_by_ref(self: &Arc<Self>) {
734        self.notify().ok();
735    }
736}