//! compio_driver/poll/mod.rs

1#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
2#[allow(unused_imports)]
3pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
4#[cfg(aio)]
5use std::ptr::NonNull;
6use std::{
7    collections::{HashMap, VecDeque},
8    io,
9    num::NonZeroUsize,
10    pin::Pin,
11    sync::Arc,
12    task::Poll,
13    time::Duration,
14};
15
16use compio_log::{instrument, trace};
17use crossbeam_queue::SegQueue;
18pub(crate) use libc::{sockaddr_storage, socklen_t};
19use polling::{Event, Events, Poller};
20
21use crate::{AsyncifyPool, BufferPool, Entry, Key, ProactorBuilder, op::Interest, syscall};
22
23pub(crate) mod op;
24
/// Abstraction of operations.
pub trait OpCode {
    /// Perform the operation before submit, and return [`Decision`] to
    /// indicate whether submitting the operation to polling is required.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;

    /// Get the operation type when an event is occurred.
    ///
    /// Defaults to `None`, meaning the operation is not tied to an fd or an
    /// AIO control block (e.g. it completed instantly or runs on the
    /// blocking pool), so there is nothing for the driver to deregister.
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        None
    }

    /// Perform the operation after received corresponding
    /// event. If this operation is blocking, the return value should be
    /// [`Poll::Ready`].
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
41
/// Result of [`OpCode::pre_submit`].
#[non_exhaustive]
pub enum Decision {
    /// Instant operation, no need to submit. Carries the already-available
    /// result (e.g. a transferred byte count).
    Completed(usize),
    /// Async operation, needs to submit
    Wait(WaitArg),
    /// Blocking operation, needs to be spawned in another thread
    Blocking,
    /// AIO operation, needs to be spawned to the kernel.
    #[cfg(aio)]
    Aio(AioControl),
}
55
56impl Decision {
57    /// Decide to wait for the given fd with the given interest.
58    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
59        Self::Wait(WaitArg { fd, interest })
60    }
61
62    /// Decide to wait for the given fd to be readable.
63    pub fn wait_readable(fd: RawFd) -> Self {
64        Self::wait_for(fd, Interest::Readable)
65    }
66
67    /// Decide to wait for the given fd to be writable.
68    pub fn wait_writable(fd: RawFd) -> Self {
69        Self::wait_for(fd, Interest::Writable)
70    }
71
72    /// Decide to spawn an AIO operation. `submit` is a method like `aio_read`.
73    #[cfg(aio)]
74    pub fn aio(
75        cb: &mut libc::aiocb,
76        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
77    ) -> Self {
78        Self::Aio(AioControl {
79            aiocbp: NonNull::from(cb),
80            submit,
81        })
82    }
83}
84
/// Meta of polling operations.
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    /// The raw fd of the operation.
    pub fd: RawFd,
    /// The interest to be registered.
    pub interest: Interest,
}
93
/// Meta of AIO operations.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
    /// Pointer of the control block. The block must stay alive until the
    /// kernel reports completion or cancellation.
    pub aiocbp: NonNull<libc::aiocb>,
    /// The aio_* submit function, e.g. `aio_read`/`aio_write`.
    pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
103
/// Per-fd bookkeeping: FIFO queues of operations (by `user_data` key)
/// waiting on one fd, one queue per interest direction.
#[derive(Debug, Default)]
struct FdQueue {
    // Keys of ops waiting for readability, in submission order.
    read_queue: VecDeque<usize>,
    // Keys of ops waiting for writability, in submission order.
    write_queue: VecDeque<usize>,
}
109
110impl FdQueue {
111    pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) {
112        match interest {
113            Interest::Readable => self.read_queue.push_back(user_data),
114            Interest::Writable => self.write_queue.push_back(user_data),
115        }
116    }
117
118    pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) {
119        match interest {
120            Interest::Readable => self.read_queue.push_front(user_data),
121            Interest::Writable => self.write_queue.push_front(user_data),
122        }
123    }
124
125    pub fn remove(&mut self, user_data: usize) {
126        self.read_queue.retain(|&k| k != user_data);
127        self.write_queue.retain(|&k| k != user_data);
128    }
129
130    pub fn event(&self) -> Event {
131        let mut event = Event::none(0);
132        if let Some(&key) = self.read_queue.front() {
133            event.readable = true;
134            event.key = key;
135        }
136        if let Some(&key) = self.write_queue.front() {
137            event.writable = true;
138            event.key = key;
139        }
140        event
141    }
142
143    pub fn pop_interest(&mut self, event: &Event) -> Option<(usize, Interest)> {
144        if event.readable {
145            if let Some(user_data) = self.read_queue.pop_front() {
146                return Some((user_data, Interest::Readable));
147            }
148        }
149        if event.writable {
150            if let Some(user_data) = self.write_queue.pop_front() {
151                return Some((user_data, Interest::Writable));
152            }
153        }
154        None
155    }
156}
157
/// Represents the filter type of kqueue. `polling` crate doesn't expose such
/// API, and we need to know about it when `cancel` is called.
#[non_exhaustive]
pub enum OpType {
    /// The operation polls an fd.
    Fd(RawFd),
    /// The operation submits an AIO (carries the kernel control block).
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}
168
/// Low-level driver of polling.
pub(crate) struct Driver {
    /// Reusable buffer receiving events from `poll.wait`.
    events: Events,
    /// The shared poller; also cloned into blocking closures and
    /// [`NotifyHandle`] so they can wake the driver.
    poll: Arc<Poller>,
    /// Maps each registered fd to the queues of operations waiting on it.
    registry: HashMap<RawFd, FdQueue>,
    /// Thread pool that runs `Decision::Blocking` operations.
    pool: AsyncifyPool,
    /// Completions produced off-thread (blocking pool / cancellation),
    /// drained at the start of `poll`.
    pool_completed: Arc<SegQueue<Entry>>,
}
177
impl Driver {
    /// Create a poll driver from the builder configuration. `capacity` sizes
    /// the event buffer; 0 lets `polling` choose its default.
    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
        instrument!(compio_log::Level::TRACE, "new", ?builder);
        trace!("new poll driver");
        let entries = builder.capacity as usize; // for the sake of consistency, use u32 like iour
        let events = if entries == 0 {
            Events::new()
        } else {
            Events::with_capacity(NonZeroUsize::new(entries).unwrap())
        };

        let poll = Arc::new(Poller::new()?);

        Ok(Self {
            events,
            poll,
            registry: HashMap::new(),
            pool: builder.create_or_get_thread_pool(),
            pool_completed: Arc::new(SegQueue::new()),
        })
    }

    /// Create an operation key bound to this driver's fd.
    pub fn create_op<T: crate::sys::OpCode + 'static>(&self, op: T) -> Key<T> {
        Key::new(self.as_raw_fd(), op)
    }

    /// Register `user_data` as waiting on `arg.fd` with `arg.interest`.
    ///
    /// # Safety
    /// The input fd should be valid.
    unsafe fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> {
        // First waiter on an fd requires `add`; subsequent ones only
        // `modify` the already-registered event mask.
        let need_add = !self.registry.contains_key(&arg.fd);
        let queue = self.registry.entry(arg.fd).or_default();
        queue.push_back_interest(user_data, arg.interest);
        let event = queue.event();
        if need_add {
            self.poll.add(arg.fd, event)?;
        } else {
            let fd = BorrowedFd::borrow_raw(arg.fd);
            self.poll.modify(fd, event)?;
        }
        Ok(())
    }

    /// Re-register `fd` with `renew_event`, or deregister it entirely when no
    /// interest remains (both directions empty). Takes `poll`/`registry` as
    /// parameters so callers holding disjoint borrows of `self` can use it.
    fn renew(
        poll: &Poller,
        registry: &mut HashMap<RawFd, FdQueue>,
        fd: BorrowedFd,
        renew_event: Event,
    ) -> io::Result<()> {
        if !renew_event.readable && !renew_event.writable {
            poll.delete(fd)?;
            registry.remove(&fd.as_raw_fd());
        } else {
            poll.modify(fd, renew_event)?;
        }
        Ok(())
    }

    /// No-op: fds are registered lazily on submit, not on attach.
    pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
        Ok(())
    }

    /// Cancel a pending operation.
    pub fn cancel(&mut self, op: &mut Key<dyn crate::sys::OpCode>) {
        let op_pin = op.as_op_pin();
        match op_pin.op_type() {
            // Nothing registered (instant/blocking op): nothing to undo.
            None => {}
            Some(OpType::Fd(fd)) => {
                let queue = self
                    .registry
                    .get_mut(&fd)
                    .expect("the fd should be attached");
                queue.remove(op.user_data());
                let renew_event = queue.event();
                // Only report the cancellation once the poller registration
                // was successfully updated; cancellation surfaces to the
                // caller as an ETIMEDOUT entry.
                if Self::renew(
                    &self.poll,
                    &mut self.registry,
                    unsafe { BorrowedFd::borrow_raw(fd) },
                    renew_event,
                )
                .is_ok()
                {
                    self.pool_completed.push(entry_cancelled(op.user_data()));
                }
            }
            #[cfg(aio)]
            Some(OpType::Aio(aiocbp)) => {
                let aiocb = unsafe { aiocbp.as_ref() };
                let fd = aiocb.aio_fildes;
                // Best-effort: the kernel may refuse to cancel an in-flight
                // AIO; the result is intentionally ignored.
                syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
            }
        }
    }

    /// Submit an operation; `Poll::Pending` means the completion will arrive
    /// via `poll` later.
    pub fn push(&mut self, op: &mut Key<dyn crate::sys::OpCode>) -> Poll<io::Result<usize>> {
        instrument!(compio_log::Level::TRACE, "push", ?op);
        let user_data = op.user_data();
        let op_pin = op.as_op_pin();
        match op_pin.pre_submit()? {
            Decision::Wait(arg) => {
                // SAFETY: fd is from the OpCode.
                unsafe {
                    self.submit(user_data, arg)?;
                }
                trace!("register {:?}", arg);
                Poll::Pending
            }
            Decision::Completed(res) => Poll::Ready(Ok(res)),
            Decision::Blocking => self.push_blocking(user_data),
            #[cfg(aio)]
            Decision::Aio(AioControl { mut aiocbp, submit }) => {
                let aiocb = unsafe { aiocbp.as_mut() };
                #[cfg(freebsd)]
                {
                    // sigev_notify_kqueue
                    aiocb.aio_sigevent.sigev_signo = self.poll.as_raw_fd();
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
                }
                #[cfg(solarish)]
                let mut notify = libc::port_notify {
                    portnfy_port: self.poll.as_raw_fd(),
                    portnfy_user: user_data as _,
                };
                #[cfg(solarish)]
                {
                    aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
                    aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
                }
                match syscall!(submit(aiocbp.as_ptr())) {
                    Ok(_) => Poll::Pending,
                    // FreeBSD:
                    //   * EOPNOTSUPP: It's on a filesystem without AIO support. Just fallback to
                    //     blocking IO.
                    //   * EAGAIN: The process-wide queue is full. No safe way to remove the (maybe)
                    //     dead entries.
                    // Solarish:
                    //   * EAGAIN: Allocation failed.
                    Err(e)
                        if matches!(
                            e.raw_os_error(),
                            Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
                        ) =>
                    {
                        self.push_blocking(user_data)
                    }
                    Err(e) => Poll::Ready(Err(e)),
                }
            }
        }
    }

    /// Dispatch a blocking operation to the thread pool. If the pool refuses
    /// (dispatch returns the closure back), drain completed entries and retry
    /// until a worker accepts it.
    fn push_blocking(&mut self, user_data: usize) -> Poll<io::Result<usize>> {
        let poll = self.poll.clone();
        let completed = self.pool_completed.clone();
        let mut closure = move || {
            // SAFETY-NOTE(review): reconstructs the key from the raw
            // user_data inside the worker thread; assumes the op outlives the
            // closure — confirm against `Key::new_unchecked`'s contract.
            let mut op = unsafe { Key::<dyn crate::sys::OpCode>::new_unchecked(user_data) };
            let op_pin = op.as_op_pin();
            let res = match op_pin.operate() {
                Poll::Pending => unreachable!("this operation is not non-blocking"),
                Poll::Ready(res) => res,
            };
            completed.push(Entry::new(user_data, res));
            // Wake the driver so it drains `pool_completed`.
            poll.notify().ok();
        };
        loop {
            match self.pool.dispatch(closure) {
                Ok(()) => return Poll::Pending,
                Err(e) => {
                    // Dispatch failed: take the closure back and free up
                    // completions before retrying.
                    closure = e.0;
                    self.poll_blocking();
                }
            }
        }
    }

    /// Drain completions produced by the blocking pool. Returns `true` if
    /// any entry was delivered.
    fn poll_blocking(&mut self) -> bool {
        if self.pool_completed.is_empty() {
            return false;
        }
        while let Some(entry) = self.pool_completed.pop() {
            unsafe {
                entry.notify();
            }
        }
        true
    }

    /// Wait for events and deliver completions.
    ///
    /// Pool completions are drained first; otherwise waits on the poller for
    /// at most `timeout` (`None` = indefinitely). Waking with no events while
    /// a timeout was set is reported as `ETIMEDOUT`.
    ///
    /// # Safety
    /// `event.key` values are turned back into keys via `Key::new_unchecked`,
    /// so all previously pushed, uncompleted operations must still be alive.
    pub unsafe fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
        if self.poll_blocking() {
            return Ok(());
        }
        self.events.clear();
        self.poll.wait(&mut self.events, timeout)?;
        if self.events.is_empty() && timeout.is_some() {
            return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
        }
        for event in self.events.iter() {
            let user_data = event.key;
            trace!("receive {} for {:?}", user_data, event);
            let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
            let op = op.as_op_pin();
            match op.op_type() {
                None => {
                    // On epoll, multiple event may be received even if it is registered as
                    // one-shot. It is safe to ignore it.
                    trace!("op {} is completed", user_data);
                }
                Some(OpType::Fd(fd)) => {
                    // If it's an FD op, the returned user_data is only for calling `op_type`. We
                    // need to pop the real user_data from the queue.
                    let queue = self
                        .registry
                        .get_mut(&fd)
                        .expect("the fd should be attached");
                    if let Some((user_data, interest)) = queue.pop_interest(&event) {
                        let mut op = Key::<dyn crate::sys::OpCode>::new_unchecked(user_data);
                        let op = op.as_op_pin();
                        let res = match op.operate() {
                            Poll::Pending => {
                                // The operation should go back to the front.
                                queue.push_front_interest(user_data, interest);
                                None
                            }
                            Poll::Ready(res) => Some(res),
                        };
                        if let Some(res) = res {
                            Entry::new(user_data, res).notify();
                        }
                    }
                    // Re-register (or deregister) the fd based on what is
                    // still queued.
                    let renew_event = queue.event();
                    Self::renew(
                        &self.poll,
                        &mut self.registry,
                        BorrowedFd::borrow_raw(fd),
                        renew_event,
                    )?;
                }
                #[cfg(aio)]
                Some(OpType::Aio(aiocbp)) => {
                    let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                    let res = match err {
                        // If the user_data is reused but the previously registered event still
                        // emits (for example, HUP in epoll; however it is impossible now
                        // because we only use AIO on FreeBSD), we'd better ignore the current
                        // one and wait for the real event.
                        libc::EINPROGRESS => {
                            trace!("op {} is not completed", user_data);
                            continue;
                        }
                        libc::ECANCELED => {
                            // Remove the aiocb from kqueue.
                            libc::aio_return(aiocbp.as_ptr());
                            // Cancellation is reported as ETIMEDOUT, matching
                            // `entry_cancelled`.
                            Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                        }
                        _ => syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize),
                    };
                    Entry::new(user_data, res).notify();
                }
            }
        }
        Ok(())
    }

    /// Create a handle that can wake this driver from another thread.
    pub fn handle(&self) -> NotifyHandle {
        NotifyHandle::new(self.poll.clone())
    }

    /// Create a buffer pool; the poll driver has no kernel-registered
    /// buffers, so this is a userspace fallback pool.
    pub fn create_buffer_pool(
        &mut self,
        buffer_len: u16,
        buffer_size: usize,
    ) -> io::Result<BufferPool> {
        #[cfg(fusion)]
        {
            Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
                buffer_len,
                buffer_size,
            )))
        }
        #[cfg(not(fusion))]
        {
            Ok(BufferPool::new(buffer_len, buffer_size))
        }
    }

    /// # Safety
    ///
    /// caller must make sure release the buffer pool with correct driver
    pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
        Ok(())
    }
}
470
impl AsRawFd for Driver {
    /// Expose the underlying poller's fd, used by `create_op` to bind keys
    /// to this driver.
    fn as_raw_fd(&self) -> RawFd {
        self.poll.as_raw_fd()
    }
}
476
477impl Drop for Driver {
478    fn drop(&mut self) {
479        for fd in self.registry.keys() {
480            unsafe {
481                let fd = BorrowedFd::borrow_raw(*fd);
482                self.poll.delete(fd).ok();
483            }
484        }
485    }
486}
487
488fn entry_cancelled(user_data: usize) -> Entry {
489    Entry::new(
490        user_data,
491        Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)),
492    )
493}
494
/// A notify handle to the inner driver.
pub struct NotifyHandle {
    /// Shared poller; notifying it wakes a blocked `Driver::poll`.
    poll: Arc<Poller>,
}
499
500impl NotifyHandle {
501    fn new(poll: Arc<Poller>) -> Self {
502        Self { poll }
503    }
504
505    /// Notify the inner driver.
506    pub fn notify(&self) -> io::Result<()> {
507        self.poll.notify()
508    }
509}