winio_pollable/
lib.rs

1//! A pollable runtime based on [`compio`]. It is just a compio runtime except
2//! Linux, where there is also an eventfd if the driver is io-uring. This crate
3//! ensures that the raw fd returned by [`RawFd::as_raw_fd`] is always actively
4//! pollable.
5
6#![warn(missing_docs)]
7
8#[cfg(target_os = "linux")]
9use std::os::fd::OwnedFd;
10use std::{
11    future::Future,
12    io,
13    ops::{Deref, DerefMut},
14    time::Duration,
15};
16
17use compio::driver::{AsRawFd, RawFd};
18
19/// See [`Runtime`]
20///
21/// [`Runtime`]: compio::runtime::Runtime
22pub struct Runtime {
23    runtime: compio::runtime::Runtime,
24    #[cfg(target_os = "linux")]
25    efd: Option<OwnedFd>,
26}
27
28#[cfg(target_os = "linux")]
29impl Runtime {
30    /// Create [`Runtime`].
31    pub fn new() -> io::Result<Self> {
32        let efd = if compio::driver::DriverType::is_iouring() {
33            use rustix::event::{EventfdFlags, eventfd};
34            Some(eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?)
35        } else {
36            None
37        };
38        let mut builder = compio::driver::ProactorBuilder::new();
39        if let Some(fd) = &efd {
40            builder.register_eventfd(fd.as_raw_fd());
41        }
42        let runtime = compio::runtime::RuntimeBuilder::new()
43            .with_proactor(builder)
44            .build()?;
45        Ok(Self { runtime, efd })
46    }
47
48    /// Clear the eventfd, if possible.
49    pub(crate) fn clear(&self) -> io::Result<()> {
50        if let Some(efd) = &self.efd {
51            let mut buf = [0u8; 8];
52            rustix::io::read(efd, &mut buf)?;
53        }
54        Ok(())
55    }
56}
57
58#[cfg(not(target_os = "linux"))]
59impl Runtime {
60    /// Create [`Runtime`].
61    pub fn new() -> io::Result<Self> {
62        Ok(Self {
63            runtime: compio::runtime::Runtime::new()?,
64        })
65    }
66
67    /// Clear the eventfd, if possible.
68    pub(crate) fn clear(&self) -> io::Result<()> {
69        Ok(())
70    }
71}
72
73impl Runtime {
74    /// Block on the future till it completes. Users should enter the runtime
75    /// before calling this function, and poll the runtime themselves.
76    pub fn block_on<F: Future>(&self, future: F, poll: impl Fn(Option<Duration>)) -> F::Output {
77        let mut result = None;
78        unsafe {
79            self.runtime
80                .spawn_unchecked(async { result = Some(future.await) })
81        }
82        .detach();
83        loop {
84            self.runtime.poll_with(Some(Duration::ZERO));
85
86            let remaining_tasks = self.runtime.run();
87            if let Some(result) = result.take() {
88                break result;
89            }
90
91            let timeout = if remaining_tasks {
92                Some(Duration::ZERO)
93            } else {
94                self.runtime.current_timeout()
95            };
96
97            poll(timeout);
98
99            self.clear().ok();
100        }
101    }
102}
103
104impl Deref for Runtime {
105    type Target = compio::runtime::Runtime;
106
107    fn deref(&self) -> &Self::Target {
108        &self.runtime
109    }
110}
111
112impl DerefMut for Runtime {
113    fn deref_mut(&mut self) -> &mut Self::Target {
114        &mut self.runtime
115    }
116}
117
118impl AsRawFd for Runtime {
119    fn as_raw_fd(&self) -> RawFd {
120        #[cfg(target_os = "linux")]
121        {
122            self.efd
123                .as_ref()
124                .map(|f| f.as_raw_fd())
125                .unwrap_or_else(|| self.runtime.as_raw_fd())
126        }
127        #[cfg(not(target_os = "linux"))]
128        {
129            self.runtime.as_raw_fd()
130        }
131    }
132}