winio_pollable/
lib.rs

1//! A pollable runtime based on [`compio`]. It is just a compio runtime except
2//! Linux, where there is also an eventfd if the driver is io-uring. This crate
3//! ensures that the raw fd returned by [`RawFd::as_raw_fd`] is always actively
4//! pollable.
5
6#![warn(missing_docs)]
7
8#[cfg(target_os = "linux")]
9use std::os::fd::OwnedFd;
10use std::{
11    future::Future,
12    io,
13    ops::{Deref, DerefMut},
14    time::Duration,
15};
16
17use compio::driver::{AsRawFd, RawFd};
18
19/// See [`Runtime`]
20///
21/// [`Runtime`]: compio::runtime::Runtime
22pub struct Runtime {
23    runtime: compio::runtime::Runtime,
24    #[cfg(target_os = "linux")]
25    efd: Option<OwnedFd>,
26}
27
28#[cfg(target_os = "linux")]
29impl Runtime {
30    /// Create [`Runtime`].
31    pub fn new() -> io::Result<Self> {
32        use rustix::event::{EventfdFlags, eventfd};
33        let efd = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
34        let mut builder = compio::driver::ProactorBuilder::new();
35        builder.register_eventfd(efd.as_raw_fd());
36        let runtime = compio::runtime::RuntimeBuilder::new()
37            .with_proactor(builder)
38            .build()?;
39        let efd = if runtime.driver_type().is_iouring() {
40            Some(efd)
41        } else {
42            None
43        };
44        Ok(Self { runtime, efd })
45    }
46
47    /// Clear the eventfd, if possible.
48    pub(crate) fn clear(&self) -> io::Result<()> {
49        if let Some(efd) = &self.efd {
50            let mut buf = [0u8; 8];
51            rustix::io::read(efd, &mut buf)?;
52        }
53        Ok(())
54    }
55}
56
57#[cfg(not(target_os = "linux"))]
58impl Runtime {
59    /// Create [`Runtime`].
60    pub fn new() -> io::Result<Self> {
61        Ok(Self {
62            runtime: compio::runtime::Runtime::new()?,
63        })
64    }
65
66    /// Clear the eventfd, if possible.
67    pub(crate) fn clear(&self) -> io::Result<()> {
68        Ok(())
69    }
70}
71
72impl Runtime {
73    /// Block on the future till it completes. Users should enter the runtime
74    /// before calling this function, and poll the runtime themselves.
75    pub fn block_on<F: Future>(&self, future: F, poll: impl Fn(Option<Duration>)) -> F::Output {
76        let mut result = None;
77        unsafe {
78            self.runtime
79                .spawn_unchecked(async { result = Some(future.await) })
80        }
81        .detach();
82        loop {
83            self.runtime.poll_with(Some(Duration::ZERO));
84
85            let remaining_tasks = self.runtime.run();
86            if let Some(result) = result.take() {
87                break result;
88            }
89
90            let timeout = if remaining_tasks {
91                Some(Duration::ZERO)
92            } else {
93                self.runtime.current_timeout()
94            };
95
96            poll(timeout);
97
98            self.clear().ok();
99        }
100    }
101}
102
103impl Deref for Runtime {
104    type Target = compio::runtime::Runtime;
105
106    fn deref(&self) -> &Self::Target {
107        &self.runtime
108    }
109}
110
111impl DerefMut for Runtime {
112    fn deref_mut(&mut self) -> &mut Self::Target {
113        &mut self.runtime
114    }
115}
116
117impl AsRawFd for Runtime {
118    fn as_raw_fd(&self) -> RawFd {
119        #[cfg(target_os = "linux")]
120        {
121            self.efd
122                .as_ref()
123                .map(|f| f.as_raw_fd())
124                .unwrap_or_else(|| self.runtime.as_raw_fd())
125        }
126        #[cfg(not(target_os = "linux"))]
127        {
128            self.runtime.as_raw_fd()
129        }
130    }
131}