Skip to main content

winio_pollable/
lib.rs

//! A pollable runtime based on [`compio`]. It is just a compio runtime, except
//! on Linux, where there is also an eventfd if the driver is io-uring. This
//! crate ensures that the raw fd returned by [`RawFd::as_raw_fd`] is always
//! actively pollable.
5
6#![warn(missing_docs)]
7
8#[cfg(target_os = "linux")]
9use std::os::fd::OwnedFd;
10use std::{
11    future::Future,
12    io,
13    ops::{Deref, DerefMut},
14    time::Duration,
15};
16
17use compio::driver::{AsRawFd, RawFd};
18
/// A pollable wrapper around the compio runtime. See [`Runtime`]
///
/// [`Runtime`]: compio::runtime::Runtime
pub struct Runtime {
    // The wrapped compio runtime; exposed to callers through `Deref`/`DerefMut`.
    runtime: compio::runtime::Runtime,
    // Linux only: the eventfd registered with the proactor. It is kept as
    // `Some` only when the driver is io-uring; when present, it is the fd
    // returned by `as_raw_fd` and the one drained by `clear`.
    #[cfg(target_os = "linux")]
    efd: Option<OwnedFd>,
}
27
28#[cfg(target_os = "linux")]
29impl Runtime {
30    /// Create [`Runtime`].
31    pub fn new() -> io::Result<Self> {
32        use rustix::event::{EventfdFlags, eventfd};
33        let efd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
34        let mut builder = compio::driver::ProactorBuilder::new();
35        builder.register_eventfd(efd.as_raw_fd());
36        let runtime = compio::runtime::RuntimeBuilder::new()
37            .with_proactor(builder)
38            .build()?;
39        let efd = if runtime.driver_type().is_iouring() {
40            Some(efd)
41        } else {
42            None
43        };
44        Ok(Self { runtime, efd })
45    }
46
47    /// Clear the eventfd, if possible.
48    pub(crate) fn clear(&self) -> io::Result<()> {
49        if let Some(efd) = &self.efd {
50            let mut buf = [0u8; 8];
51            rustix::io::read(efd, &mut buf)?;
52        }
53        Ok(())
54    }
55}
56
#[cfg(not(target_os = "linux"))]
impl Runtime {
    /// Create [`Runtime`].
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying compio runtime cannot be created.
    pub fn new() -> io::Result<Self> {
        let runtime = compio::runtime::Runtime::new()?;
        Ok(Self { runtime })
    }

    /// Clear the eventfd, if possible.
    ///
    /// There is no eventfd on this platform, so this always succeeds without
    /// doing anything.
    pub(crate) fn clear(&self) -> io::Result<()> {
        Ok(())
    }
}
71
72impl Runtime {
73    /// Poll the runtime. Returns the next timeout.
74    pub fn poll_and_run(&self) -> Option<Duration> {
75        self.runtime.poll_with(Some(Duration::ZERO));
76
77        let remaining_tasks = self.runtime.run();
78
79        if remaining_tasks {
80            Some(Duration::ZERO)
81        } else {
82            self.runtime.current_timeout()
83        }
84    }
85
86    /// Block on the future till it completes. Users should enter the runtime
87    /// before calling this function, and poll the runtime themselves.
88    pub fn block_on<F: Future>(&self, future: F, poll: impl Fn(Option<Duration>)) -> F::Output {
89        let mut result = None;
90        unsafe {
91            self.runtime
92                .spawn_unchecked(async { result = Some(future.await) })
93        }
94        .detach();
95        loop {
96            let timeout = self.poll_and_run();
97            if let Some(result) = result.take() {
98                break result;
99            }
100
101            poll(timeout);
102
103            self.clear().ok();
104        }
105    }
106}
107
108impl Deref for Runtime {
109    type Target = compio::runtime::Runtime;
110
111    fn deref(&self) -> &Self::Target {
112        &self.runtime
113    }
114}
115
116impl DerefMut for Runtime {
117    fn deref_mut(&mut self) -> &mut Self::Target {
118        &mut self.runtime
119    }
120}
121
122impl AsRawFd for Runtime {
123    fn as_raw_fd(&self) -> RawFd {
124        #[cfg(target_os = "linux")]
125        {
126            self.efd
127                .as_ref()
128                .map(|f| f.as_raw_fd())
129                .unwrap_or_else(|| self.runtime.as_raw_fd())
130        }
131        #[cfg(not(target_os = "linux"))]
132        {
133            self.runtime.as_raw_fd()
134        }
135    }
136}