compio_driver/poll/mod.rs

#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
#[allow(unused_imports)]
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(aio)]
use std::ptr::NonNull;
use std::{
    collections::{HashMap, VecDeque},
    io,
    num::NonZeroUsize,
    pin::Pin,
    sync::Arc,
    task::{Poll, Wake, Waker},
    time::Duration,
};

use compio_log::{instrument, trace};
use crossbeam_queue::SegQueue;
use polling::{Event, Events, Poller};

use crate::{
    AsyncifyPool, BufferPool, DriverType, Entry, Key, ProactorBuilder, op::Interest, syscall,
};

pub(crate) mod op;

/// Abstraction of operations.
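///
/// # Example
///
/// A minimal sketch (not part of this crate) of a readiness-based receive
/// operation; the `Recv` type and its `libc::recv` details are illustrative
/// assumptions:
///
/// ```ignore
/// use std::{io, pin::Pin, task::Poll};
///
/// struct Recv {
///     fd: RawFd,
///     buf: Vec<u8>,
/// }
///
/// impl OpCode for Recv {
///     fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
///         // Readiness-based op: ask the driver to poll the fd for readability.
///         Ok(Decision::wait_readable(self.fd))
///     }
///
///     fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
///         Some(OpType::Fd(self.fd))
///     }
///
///     fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
///         // Called once the fd is reported readable: try the non-blocking
///         // syscall and map `EAGAIN`/`EWOULDBLOCK` to `Poll::Pending` so the
///         // driver re-registers the interest.
///         let this = self.get_mut();
///         let res = unsafe {
///             libc::recv(this.fd, this.buf.as_mut_ptr() as *mut _, this.buf.len(), 0)
///         };
///         if res >= 0 {
///             Poll::Ready(Ok(res as usize))
///         } else {
///             let err = io::Error::last_os_error();
///             if err.kind() == io::ErrorKind::WouldBlock {
///                 Poll::Pending
///             } else {
///                 Poll::Ready(Err(err))
///             }
///         }
///     }
/// }
/// ```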
pub trait OpCode {
    /// Perform the operation before submitting, and return [`Decision`] to
    /// indicate whether submitting the operation to polling is required.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

    /// Get the operation type when an event occurs.
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

    /// Perform the operation after receiving the corresponding
    /// event. If this operation is blocking, the return value should be
    /// [`Poll::Ready`].
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}

/// Result of [`OpCode::pre_submit`].
#[non_exhaustive]
pub enum Decision {
    /// Instant operation, no need to submit.
    Completed(usize),
    /// Async operation, needs to submit.
    Wait(WaitArg),
    /// Blocking operation, needs to be spawned in another thread.
    Blocking,
    /// AIO operation, needs to be spawned to the kernel.
    #[cfg(aio)]
    Aio(AioControl),
}

impl Decision {
    /// Decide to wait for the given fd with the given interest.
    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
        Self::Wait(WaitArg { fd, interest })
    }

    /// Decide to wait for the given fd to be readable.
    pub fn wait_readable(fd: RawFd) -> Self {
        Self::wait_for(fd, Interest::Readable)
    }

    /// Decide to wait for the given fd to be writable.
    pub fn wait_writable(fd: RawFd) -> Self {
        Self::wait_for(fd, Interest::Writable)
    }

    /// Decide to spawn an AIO operation. `submit` is a function like
    /// `aio_read`.
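    ///
    /// A hedged sketch of constructing this decision; the `fd` and `buf`
    /// bindings and the `aiocb` setup are illustrative assumptions:
    ///
    /// ```ignore
    /// let mut cb: libc::aiocb = unsafe { std::mem::zeroed() };
    /// cb.aio_fildes = fd;
    /// cb.aio_buf = buf.as_mut_ptr() as *mut _;
    /// cb.aio_nbytes = buf.len();
    /// let decision = Decision::aio(&mut cb, libc::aio_read);
    /// ```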
    #[cfg(aio)]
    pub fn aio(
        cb: &mut libc::aiocb,
        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
    ) -> Self {
        Self::Aio(AioControl {
            aiocbp: NonNull::from(cb),
            submit,
        })
    }
}

/// Meta of polling operations.
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    /// The raw fd of the operation.
    pub fd: RawFd,
    /// The interest to be registered.
    pub interest: Interest,
}

/// Meta of AIO operations.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// Pointer of the control block.
    pub aiocbp: NonNull<libc::aiocb>,
    /// The aio_* submit function.
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}

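/// Per-fd queues of pending operations, keyed by `user_data`.
///
/// The front entries decide which interests are registered with the poller:
/// [`FdQueue::event`] builds the combined event from both queue fronts, and
/// [`FdQueue::pop_interest`] hands the fired readiness back to the matching
/// operation.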
#[derive(Debug, Default)]
struct FdQueue {
    read_queue: VecDeque<usize>,
    write_queue: VecDeque<usize>,
}

impl FdQueue {
    pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) {
        match interest {
            Interest::Readable => self.read_queue.push_back(user_data),
            Interest::Writable => self.write_queue.push_back(user_data),
        }
    }

    pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) {
        match interest {
            Interest::Readable => self.read_queue.push_front(user_data),
            Interest::Writable => self.write_queue.push_front(user_data),
        }
    }

    pub fn remove(&mut self, user_data: usize) {
        self.read_queue.retain(|&k| k != user_data);
        self.write_queue.retain(|&k| k != user_data);
    }

    pub fn event(&self) -> Event {
        let mut event = Event::none(0);
        if let Some(&key) = self.read_queue.front() {
            event.readable = true;
            event.key = key;
        }
        if let Some(&key) = self.write_queue.front() {
            event.writable = true;
            event.key = key;
        }
        event
    }

    pub fn pop_interest(&mut self, event: &Event) -> Option<(usize, Interest)> {
        if event.readable
            && let Some(user_data) = self.read_queue.pop_front()
        {
            return Some((user_data, Interest::Readable));
        }
        if event.writable
            && let Some(user_data) = self.write_queue.pop_front()
        {
            return Some((user_data, Interest::Writable));
        }
        None
    }
}

/// Represents the filter type of kqueue. The `polling` crate doesn't expose
/// such an API, and we need to know it when `cancel` is called.
#[non_exhaustive]
pub enum OpType {
    /// The operation polls an fd.
    Fd(RawFd),
    /// The operation submits an AIO.
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}

/// Low-level driver of polling.
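///
/// Operations are keyed by `user_data`. Readiness-based ops are queued per fd
/// in `registry` and registered with the `polling` [`Poller`]; blocking ops
/// are dispatched to the [`AsyncifyPool`] and report back through
/// `pool_completed`; AIO ops (where available) are submitted straight to the
/// kernel.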
pub(crate) struct Driver {
    events: Events,
    notify: Arc<Notify>,
    registry: HashMap<RawFd, FdQueue>,
    pool: AsyncifyPool,
    pool_completed: Arc<SegQueue<Entry>>,
}

impl Driver {
    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
        instrument!(compio_log::Level::TRACE, "new", ?builder);
        trace!("new poll driver");
        let entries = builder.capacity as usize; // for the sake of consistency, use u32 like iour
        let events = if entries == 0 {
            Events::new()
        } else {
            Events::with_capacity(NonZeroUsize::new(entries).unwrap())
        };

        let poll = Poller::new()?;
        let notify = Arc::new(Notify::new(poll));

        Ok(Self {
            events,
            notify,
            registry: HashMap::new(),
            pool: builder.create_or_get_thread_pool(),
            pool_completed: Arc::new(SegQueue::new()),
        })
    }

    pub fn driver_type(&self) -> DriverType {
        DriverType::Poll
    }

    fn poller(&self) -> &Poller {
        &self.notify.poll
    }

    pub fn create_op<T: crate::sys::OpCode + 'static>(&self, op: T) -> Key<T> {
        Key::new(self.as_raw_fd(), op)
    }

    /// # Safety
    /// The input fd should be valid.
    unsafe fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> {
        let need_add = !self.registry.contains_key(&arg.fd);
        let queue = self.registry.entry(arg.fd).or_default();
        queue.push_back_interest(user_data, arg.interest);
        let event = queue.event();
        if need_add {
            // SAFETY: the events are deleted correctly.
            unsafe { self.poller().add(arg.fd, event)? }
        } else {
            let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
            self.poller().modify(fd, event)?;
        }
        Ok(())
    }

    fn renew(
        poll: &Poller,
        registry: &mut HashMap<RawFd, FdQueue>,
        fd: BorrowedFd,
        renew_event: Event,
    ) -> io::Result<()> {
        if !renew_event.readable && !renew_event.writable {
            poll.delete(fd)?;
            registry.remove(&fd.as_raw_fd());
        } else {
            poll.modify(fd, renew_event)?;
        }
        Ok(())
    }

    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }

    pub fn cancel(&mut self, op: &mut Key<dyn crate::sys::OpCode>) {
        let op_pin = op.as_op_pin();
        match op_pin.op_type() {
            None => {}
            Some(OpType::Fd(fd)) => {
                let queue = self
                    .registry
                    .get_mut(&fd)
                    .expect("the fd should be attached");
                queue.remove(op.user_data());
                let renew_event = queue.event();
                if Self::renew(
                    &self.notify.poll,
                    &mut self.registry,
                    unsafe { BorrowedFd::borrow_raw(fd) },
                    renew_event,
                )
                .is_ok()
                {
                    self.pool_completed.push(entry_cancelled(op.user_data()));
                }
            }
            #[cfg(aio)]
            Some(OpType::Aio(aiocbp)) => {
                let aiocb = unsafe { aiocbp.as_ref() };
                let fd = aiocb.aio_fildes;
                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
            }
        }
280
281    pub fn push(&mut self, op: &mut Key<dyn crate::sys::OpCode>) -> Poll<io::Result<usize>> {
282        instrument!(compio_log::Level::TRACE, "push", ?op);
283        let user_data = op.user_data();
284        let op_pin = op.as_op_pin();
285        match op_pin.pre_submit()? {
286            Decision::Wait(arg) => {
287                // SAFETY: fd is from the OpCode.
288                unsafe {
289                    self.submit(user_data, arg)?;
290                }
291                trace!("register {:?}", arg);
292                Poll::Pending
293            }
294            Decision::Completed(res) => Poll::Ready(Ok(res)),
295            Decision::Blocking => self.push_blocking(user_data),
296            #[cfg(aio)]
297            Decision::Aio(AioControl { mut aiocbp, submit }) => {
298                let aiocb = unsafe { aiocbp.as_mut() };
299                #[cfg(freebsd)]
300                {
301                    // sigev_notify_kqueue
302                    aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
303                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
304                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
305                }
306                #[cfg(solarish)]
307                let mut notify = libc::port_notify {
308                    portnfy_port: self.as_raw_fd(),
309                    portnfy_user: user_data as _,
310                };
311                #[cfg(solarish)]
312                {
313                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
314                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
315                }
316                match syscall!(submit(aiocbp.as_ptr())) {
317                    Ok(_) => Poll::Pending,
318                    // FreeBSD:
319                    //   * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to
320                    //     blocking IO.
321                    //   * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe)
322                    //     dead entries.
323                    // Solarish:
324                    //   * EAGAIN: Allocation failed.
325                    Err(e)
326                        if matches!(
327                            e.raw_os_error(),
328                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
329                        ) =>
330                    {
331                        self.push_blocking(user_data)
332                    }
333                    Err(e) => Poll::Ready(Err(e)),
334                }
335            }
336        }
337    }
338
339    fn push_blocking(&mut self, user_data: usize) -> Poll<io::Result<usize>> {
340        let waker = self.waker();
341        let completed = self.pool_completed.clone();
342        let mut closure = move || {
343            let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
344            let op_pin = op.as_op_pin();
345            let res = match op_pin.operate() {
346                Poll::Pending => unreachable!("this operation is not non-blocking"),
347                Poll::Ready(res) => res,
348            };
349            completed.push(Entry::new(user_data, res));
350            waker.wake();
351        };
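        // `dispatch` hands the closure to the thread pool; on failure the
        // closure is returned back, so drain any completed blocking entries
        // and retry until the pool accepts it.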
        loop {
            match self.pool.dispatch(closure) {
                Ok(()) => return Poll::Pending,
                Err(e) => {
                    closure = e.0;
                    self.poll_blocking();
                }
            }
        }
    }

    fn poll_blocking(&mut self) -> bool {
        if self.pool_completed.is_empty() {
            return false;
        }
        while let Some(entry) = self.pool_completed.pop() {
            unsafe {
                entry.notify();
            }
        }
        true
    }

    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
        if self.poll_blocking() {
            return Ok(());
        }
        self.events.clear();
        self.notify.poll.wait(&mut self.events, timeout)?;
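        // With a timeout set, waking up with no events means the wait timed
        // out; surface that as `ETIMEDOUT` to the caller.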
        if self.events.is_empty() && timeout.is_some() {
            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
        }
        for event in self.events.iter() {
            let user_data = event.key;
            trace!("receive {} for {:?}", user_data, event);
            // SAFETY: user_data is promised to be valid.
            let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
            let op = op.as_op_pin();
            match op.op_type() {
                None => {
                    // On epoll, multiple events may be received even if the fd is registered as
                    // one-shot. It is safe to ignore them.
                    trace!("op {} is completed", user_data);
                }
                Some(OpType::Fd(fd)) => {
                    // If it's an FD op, the returned user_data is only for calling `op_type`. We
                    // need to pop the real user_data from the queue.
                    let queue = self
                        .registry
                        .get_mut(&fd)
                        .expect("the fd should be attached");
                    if let Some((user_data, interest)) = queue.pop_interest(&event) {
                        let mut op =
                            unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
                        let op = op.as_op_pin();
                        let res = match op.operate() {
                            Poll::Pending => {
                                // The operation should go back to the front.
                                queue.push_front_interest(user_data, interest);
                                None
                            }
                            Poll::Ready(res) => Some(res),
                        };
                        if let Some(res) = res {
                            // SAFETY: `notify` is called only once.
                            unsafe { Entry::new(user_data, res).notify() }
                        }
                    }
                    let renew_event = queue.event();
                    Self::renew(
                        &self.notify.poll,
                        &mut self.registry,
                        unsafe { BorrowedFd::borrow_raw(fd) },
                        renew_event,
                    )?;
                }
                #[cfg(aio)]
                Some(OpType::Aio(aiocbp)) => {
                    let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                    let res = match err {
                        // If the user_data is reused but the previously registered event still
                        // emits (for example, HUP in epoll; though that is impossible here
                        // because we only use AIO on FreeBSD), we'd better ignore the current
                        // one and wait for the real event.
                        libc::EINPROGRESS => {
                            trace!("op {} is not completed", user_data);
                            continue;
                        }
                        libc::ECANCELED => {
                            // Remove the aiocb from kqueue.
                            unsafe { libc::aio_return(aiocbp.as_ptr()) };
                            Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                        }
                        _ => syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize),
                    };
                    // SAFETY: `notify` is called only once.
                    unsafe { Entry::new(user_data, res).notify() }
                }
            }
        }
        Ok(())
    }

    pub fn waker(&self) -> Waker {
        Waker::from(self.notify.clone())
    }

    pub fn create_buffer_pool(
        &mut self,
        buffer_len: u16,
        buffer_size: usize,
    ) -> io::Result<BufferPool> {
        #[cfg(fusion)]
        {
            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
                buffer_len,
                buffer_size,
            )))
        }
        #[cfg(not(fusion))]
        {
            Ok(BufferPool::new(buffer_len, buffer_size))
        }
    }

    /// # Safety
    ///
    /// The caller must make sure to release the buffer pool with the correct
    /// driver.
    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
        Ok(())
    }
}

impl AsRawFd for Driver {
    fn as_raw_fd(&self) -> RawFd {
        self.poller().as_raw_fd()
    }
}

impl Drop for Driver {
    fn drop(&mut self) {
        for fd in self.registry.keys() {
            unsafe {
                let fd = BorrowedFd::borrow_raw(*fd);
                self.poller().delete(fd).ok();
            }
        }
    }
}

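/// Build the completion entry for a cancelled operation. Cancellation is
/// reported as an `ETIMEDOUT` error, the same error used for cancelled AIO
/// operations in `Driver::poll`.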
fn entry_cancelled(user_data: usize) -> Entry {
    Entry::new(
        user_data,
        Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
    )
}

/// A notify handle to the inner driver.
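///
/// Wrapped in an [`Arc`] and handed out as a [`Waker`]: waking calls
/// [`Poller::notify`], which interrupts a blocking `wait` in `Driver::poll`.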
struct Notify {
    poll: Poller,
}

impl Notify {
    fn new(poll: Poller) -> Self {
        Self { poll }
    }

    /// Notify the inner driver.
    pub fn notify(&self) -> io::Result<()> {
        self.poll.notify()
    }
}

impl Wake for Notify {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.notify().ok();
    }
}