mfio/backend/
mod.rs

1//! Backends for `mfio`.
2//!
3//! A backend is a stateful object that can be used to resolve a future to completion, either by
4//! blocking execution, or, exposing a handle, which can then be integrated into other asynchronous
5//! runtimes through [integrations].
6
7#[cfg(not(feature = "std"))]
8use crate::std_prelude::*;
9
10use crate::poller::{self, ParkHandle};
11
12use core::cell::UnsafeCell;
13use core::future::Future;
14use core::pin::Pin;
15use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
16use core::task::{Context, Poll, Waker};
17use tarc::Arc;
18
19pub mod integrations;
20
21pub use integrations::null::{Null, NullImpl};
22pub use integrations::Integration;
23
24#[cfg(all(unix, feature = "std"))]
25use nix::poll::*;
26#[cfg(all(unix, feature = "std"))]
27use std::os::fd::RawFd;
28#[cfg(all(windows, feature = "std"))]
29use std::os::windows::io::RawHandle;
30
31#[cfg(all(any(unix, target_os = "wasi"), feature = "std"))]
32#[cfg_attr(docsrs, doc(cfg(all(any(unix, target_os = "wasi"), feature = "std"))))]
33pub mod fd;
34
35#[cfg(all(windows, feature = "std"))]
36#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "std"))))]
37pub mod handle;
38
39#[cfg(all(windows, feature = "std"))]
40#[cfg_attr(docsrs, doc(cfg(windows)))]
41pub mod windows;
42
43// TODO: rename DefaultHandle to OsHandle, and get rid of Infallible one.
44
45#[cfg(all(unix, feature = "std"))]
46pub type DefaultHandle = RawFd;
47#[cfg(all(windows, feature = "std"))]
48pub type DefaultHandle = RawHandle;
49#[cfg(not(feature = "std"))]
50pub type DefaultHandle = core::convert::Infallible;
51
52pub type DynBackend = dyn Future<Output = ()> + Send;
53
54#[repr(C)]
55struct NestedBackend {
56    owner: *const (),
57    poll: unsafe extern "C" fn(*const (), &mut Context),
58    release: unsafe extern "C" fn(*const ()),
59}
60
61/// Stores a backend.
62///
63/// This type is always stored on backends, and is acquired by users in [`IoBackend::get_backend`].
64/// A backend can only be acquired once at a time, however, it does not matter who does it.
65///
66/// Once the backend is acquired, it can be used to drive I/O to completion.
67#[repr(C)]
68pub struct BackendContainer<B: ?Sized> {
69    nest: UnsafeCell<Option<NestedBackend>>,
70    backend: UnsafeCell<Pin<Box<B>>>,
71    lock: AtomicBool,
72}
73
74unsafe impl<B: ?Sized + Send> Send for BackendContainer<B> {}
75unsafe impl<B: ?Sized + Send> Sync for BackendContainer<B> {}
76
77impl<B: ?Sized> BackendContainer<B> {
78    /// Acquire a backend.
79    ///
80    /// This function locks the backend and the returned handle keeps it locked, until the handle
81    /// gets released.
82    ///
83    /// # Panics
84    ///
85    /// Panics if the backend has already been acquired.
86    pub fn acquire(&self, wake_flags: Option<Arc<AtomicU8>>) -> BackendHandle<B> {
87        if self.lock.swap(true, Ordering::AcqRel) {
88            panic!("Tried to acquire backend twice!");
89        }
90
91        let backend = unsafe { &mut *self.backend.get() }.as_mut();
92
93        BackendHandle {
94            owner: self,
95            backend,
96            wake_flags,
97        }
98    }
99
100    /// Acquires a backend in nested mode.
101    ///
102    /// This function is useful when layered I/O backends are desirable. When polling, first, this
103    /// backend will be polled, and afterwards, the provided handle. The ordering is consistent
104    /// with the behavior of first polling the user's future, and then polling the backend. In the
105    /// end, backends will be peeled off layer by layer, until the innermost backend is reached.
106    pub fn acquire_nested<B2: ?Sized + Future<Output = ()>>(
107        &self,
108        mut handle: BackendHandle<B2>,
109    ) -> BackendHandle<B> {
110        let wake_flags = handle.wake_flags.take();
111        let owner = handle.owner;
112
113        let our_handle = self.acquire(wake_flags);
114
115        unsafe extern "C" fn poll<B: ?Sized + Future<Output = ()>>(
116            data: *const (),
117            context: &mut Context,
118        ) {
119            let data = &*(data as *const BackendContainer<B>);
120            if Pin::new_unchecked(&mut *data.backend.get())
121                .poll(context)
122                .is_ready()
123            {
124                panic!("Backend polled to completion!")
125            }
126        }
127
128        unsafe extern "C" fn release<B: ?Sized>(data: *const ()) {
129            let data = &*(data as *const BackendContainer<B>);
130            data.lock.store(false, Ordering::Release);
131        }
132
133        // We must prevent drop from being called, since we are replacing the release mechanism
134        // ourselves.
135        core::mem::forget(handle);
136
137        unsafe {
138            *self.nest.get() = Some(NestedBackend {
139                owner: owner as *const _ as *const (),
140                poll: poll::<B2>,
141                release: release::<B2>,
142            });
143        }
144
145        our_handle
146    }
147}
148
149impl BackendContainer<DynBackend> {
150    /// Creates a new [`DynBackend`] container.
151    pub fn new_dyn<T: Future<Output = ()> + Send + 'static>(backend: T) -> Self {
152        Self {
153            backend: UnsafeCell::new(Box::pin(backend) as Pin<Box<dyn Future<Output = ()> + Send>>),
154            nest: UnsafeCell::new(None),
155            lock: Default::default(),
156        }
157    }
158}
159
160/// Handle to a backend.
161///
162/// This handle can be used to drive arbitrary future to completion by attaching a backend to it.
163/// This is typically done using [`WithBackend`] that is constructed in
164/// [`IoBackendExt::with_backend`].
165///
166/// Usually, the user would want to bypass this type and use [`IoBackendExt::block_on`], or an
167/// [`Integration`] equivalent.
168pub struct BackendHandle<'a, B: ?Sized> {
169    owner: &'a BackendContainer<B>,
170    backend: Pin<&'a mut B>,
171    wake_flags: Option<Arc<AtomicU8>>,
172}
173
174impl<'a, B: ?Sized> Drop for BackendHandle<'a, B> {
175    fn drop(&mut self) {
176        // SAFETY: we are still holding the lock to this data
177        if let Some(NestedBackend { owner, release, .. }) =
178            unsafe { (*self.owner.nest.get()).take() }
179        {
180            // SAFETY: this structure is constructed only in acquire_nested. We assume it
181            // constructs the structure correctly.
182            unsafe { release(owner) }
183        }
184
185        self.owner.lock.store(false, Ordering::Release);
186    }
187}
188
189impl<'a, B: ?Sized> core::ops::Deref for BackendHandle<'a, B> {
190    type Target = Pin<&'a mut B>;
191
192    fn deref(&self) -> &Self::Target {
193        &self.backend
194    }
195}
196
197impl<'a, B: ?Sized> core::ops::DerefMut for BackendHandle<'a, B> {
198    fn deref_mut(&mut self) -> &mut Self::Target {
199        &mut self.backend
200    }
201}
202
203/// Future combined with a backend.
204///
205/// This future can be used to drive arbitrary future to completion by attaching a backend to it.
206/// Construct this type using [`IoBackendExt::with_backend`].
207///
208/// Usually, the user would want to bypass this type and use [`IoBackendExt::block_on`], or an
209/// [`Integration`] equivalent.
210pub struct WithBackend<'a, Backend: ?Sized, Fut: ?Sized> {
211    backend: BackendHandle<'a, Backend>,
212    future: Fut,
213}
214
215impl<'a, Backend: Future + ?Sized, Fut: Future + ?Sized> Future for WithBackend<'a, Backend, Fut> {
216    type Output = Fut::Output;
217
218    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
219        let this = unsafe { self.get_unchecked_mut() };
220
221        loop {
222            this.backend
223                .wake_flags
224                .as_ref()
225                .map(|v| v.fetch_or(0b10, Ordering::AcqRel));
226            let fut = unsafe { Pin::new_unchecked(&mut this.future) };
227            let backend = this.backend.as_mut();
228
229            match fut.poll(cx) {
230                Poll::Ready(v) => {
231                    if let Some(v) = this.backend.wake_flags.as_ref() {
232                        v.store(0, Ordering::Release);
233                    }
234                    break Poll::Ready(v);
235                }
236                Poll::Pending => match backend.poll(cx) {
237                    Poll::Ready(_) => panic!("Backend future completed"),
238                    Poll::Pending => {
239                        // SAFETY: we are holding the lock to the backend.
240                        if let Some(NestedBackend { owner, poll, .. }) =
241                            unsafe { &*this.backend.owner.nest.get() }
242                        {
243                            // SAFETY: this structure is constructed only in acquire_nested. We
244                            // assume it constructs the structure correctly.
245                            unsafe { poll(*owner, cx) };
246                        }
247                    }
248                },
249            }
250
251            if this
252                .backend
253                .wake_flags
254                .as_ref()
255                .map(|v| v.fetch_and(0b0, Ordering::AcqRel) & 0b1)
256                .unwrap_or(0)
257                == 0
258            {
259                break Poll::Pending;
260            }
261        }
262    }
263}
264
265/// Cooperative polling handle.
266///
267/// This handle contains a handle and necessary metadata needed to cooperatively drive mfio code to
268/// completion.
269///
270/// This handle is typically created on the [`IoBackend`] side.
271pub struct PollingHandle<'a, Handle = DefaultHandle> {
272    pub handle: Handle,
273    pub cur_flags: &'a PollingFlags,
274    pub max_flags: PollingFlags,
275    pub waker: Waker,
276}
277
278/// Represents desired object state flags to poll for.
279///
280/// Different backends may expose handles with different requirements. Some handles, when polled
281/// with incorrect flags may return an error, meaning it is crucial for the caller to pass correct
282/// flags in.
283///
284/// This is an object accessible from IoBackends that describes these flags. The object is designed
285/// so that these flags may get modified on the fly. Note that there is no encapsulation, so the
286/// caller should take great care in ensuring they do not modify these flags in breaking manner.
287/// However, doing so should not result in undefined behavior.
288#[repr(transparent)]
289pub struct PollingFlags {
290    flags: AtomicU8,
291}
292
293const READ_POLL: u8 = 0b1;
294const WRITE_POLL: u8 = 0b10;
295
296impl PollingFlags {
297    const fn from_flags(flags: u8) -> Self {
298        Self {
299            flags: AtomicU8::new(flags),
300        }
301    }
302
303    /// Builds a new `PollingFlags` with no bits set.
304    pub const fn new() -> Self {
305        Self {
306            flags: AtomicU8::new(0),
307        }
308    }
309
310    /// Builds a new `PollingFlags` with all bits set.
311    pub const fn all() -> Self {
312        Self {
313            flags: AtomicU8::new(!0),
314        }
315    }
316
317    /// Consumes and returns a new `PollingFlags` with read bit set to specified value.
318    pub const fn read(self, val: bool) -> Self {
319        // SAFETY: data layout matches perfectly
320        // We need this since AtomicU8::into_inner is not const stable yet.
321        let mut flags = unsafe { core::mem::transmute(self) };
322        if val {
323            flags |= READ_POLL;
324        } else {
325            flags &= !READ_POLL;
326        }
327        Self::from_flags(flags)
328    }
329
330    /// Consumes and returns a new `PollingFlags` with write bit set to specified value.
331    pub const fn write(self, val: bool) -> Self {
332        // SAFETY: data layout matches perfectly
333        let mut flags = unsafe { core::mem::transmute(self) };
334        if val {
335            flags |= WRITE_POLL;
336        } else {
337            flags &= !WRITE_POLL;
338        }
339        Self::from_flags(flags)
340    }
341
342    /// Updates the read bit in-place.
343    pub fn set_read(&self, val: bool) {
344        if val {
345            self.flags.fetch_or(READ_POLL, Ordering::Relaxed);
346        } else {
347            self.flags.fetch_and(!READ_POLL, Ordering::Relaxed);
348        }
349    }
350
351    /// Updates the write bit in-place.
352    pub fn set_write(&self, val: bool) {
353        if val {
354            self.flags.fetch_or(WRITE_POLL, Ordering::Relaxed);
355        } else {
356            self.flags.fetch_and(!WRITE_POLL, Ordering::Relaxed);
357        }
358    }
359
360    /// Returns the values of current read and write bits.
361    pub fn get(&self) -> (bool, bool) {
362        let bits = self.flags.load(Ordering::Relaxed);
363        (bits & READ_POLL != 0, bits & WRITE_POLL != 0)
364    }
365
366    /// Converts these flags into posix PollFlags.
367    #[cfg(all(unix, feature = "std"))]
368    pub fn to_posix(&self) -> PollFlags {
369        let mut flags = PollFlags::empty();
370        // Relaxed is okay, because flags are meant to be set only by the owner of these flags, who
371        // we are going to poll on behalf of.
372        let bits = self.flags.load(Ordering::Relaxed);
373        if bits & READ_POLL != 0 {
374            flags.set(PollFlags::POLLIN, true);
375        }
376        if bits & WRITE_POLL != 0 {
377            flags.set(PollFlags::POLLIN, true);
378        }
379        flags
380    }
381}
382
383/// Primary trait describing I/O backends.
384///
385/// This trait is implemented at the outer-most stateful object of the I/O context. A `IoBackend`
386/// has the opportunity to expose efficient ways of driving said backend to completion.
387///
388/// Users may want to call methods available on [`IoBackendExt`], instead of the ones on this
389/// trait.
390pub trait IoBackend<Handle: Pollable = DefaultHandle> {
391    type Backend: Future<Output = ()> + Send + ?Sized;
392
393    /// Gets handle to the backing event system.
394    ///
395    /// This function returns a handle and a waker. The handle is a `RawFd` on Unix systems, and a
396    /// `RawHandle` on Windows. This handle is meant to be polled/waited on by the system.
397    ///
398    /// The waker is opaque, but should unblock the handle once signaled.
399    ///
400    /// If the function returns `None`, then it can be assumed that the backend will wake any waker
401    /// up with other mechanism (such as auxiliary thread), and that the polling implementation can
402    /// simply park the thread.
403    fn polling_handle(&self) -> Option<PollingHandle>;
404
405    /// Acquires exclusive handle to IO backend.
406    ///
407    /// # Panics
408    ///
409    /// This function panics when multiple backend handles are attempted to be acquired. This
410    /// function does not return an `Option`, because such case usually indicates a bug in the
411    /// code.
412    fn get_backend(&self) -> BackendHandle<Self::Backend>;
413}
414
415/// Helpers for [`IoBackend`].
416pub trait IoBackendExt<Handle: Pollable>: IoBackend<Handle> {
417    /// Builds a composite future that also polls the backend future.
418    ///
419    /// If second tuple element is not `None`, then the caller is responsible for registering and
420    /// handling read-readiness events.
421    fn with_backend<F: Future>(
422        &self,
423        future: F,
424    ) -> (WithBackend<Self::Backend, F>, Option<PollingHandle>) {
425        (
426            WithBackend {
427                backend: self.get_backend(),
428                future,
429            },
430            self.polling_handle(),
431        )
432    }
433
434    /// Executes a future to completion.
435    ///
436    /// This function uses mfio's mini-executor that is able to resolve an arbitrary future that is
437    /// either awoken externally, or through exported handle's readiness events.
438    fn block_on<F: Future>(&self, fut: F) -> F::Output {
439        let backend = self.get_backend();
440        let polling = self.polling_handle();
441        block_on::<Handle, F, Self>(fut, backend, polling)
442    }
443}
444
445impl<T: ?Sized + IoBackend<Handle>, Handle: Pollable> IoBackendExt<Handle> for T {}
446
447/// Represents types that contain an `IoBackend`.
448pub trait LinksIoBackend {
449    type Link: IoBackend + ?Sized;
450
451    fn get_mut(&self) -> &Self::Link;
452}
453
454impl<T: IoBackend> LinksIoBackend for T {
455    type Link = Self;
456
457    fn get_mut(&self) -> &Self::Link {
458        self
459    }
460}
461
462/// `IoBackend` wrapper for references.
463pub struct RefLink<'a, T: ?Sized>(&'a T);
464
465impl<'a, T: IoBackend + ?Sized> LinksIoBackend for RefLink<'a, T> {
466    type Link = T;
467
468    fn get_mut(&self) -> &Self::Link {
469        self.0
470    }
471}
472
473pub fn block_on<H: Pollable, F: Future, B: IoBackend<H> + ?Sized>(
474    future: F,
475    backend: BackendHandle<B::Backend>,
476    polling: Option<PollingHandle>,
477) -> F::Output {
478    let fut = WithBackend { backend, future };
479
480    if let Some(handle) = polling {
481        poller::block_on_handle(fut, &handle, &handle.waker)
482    } else {
483        poller::block_on(fut)
484    }
485}
486
487impl<H: Pollable> ParkHandle for PollingHandle<'_, H> {
488    fn unpark(&self) {
489        self.waker.wake_by_ref();
490    }
491
492    fn park(&self) {
493        self.handle.poll(self.cur_flags)
494    }
495}
496
497/// A pollable handle.
498///
499/// Implementing this trait on a custom type, allows one to build custom cooperation mechanisms for
500/// different operating environments.
501pub trait Pollable {
502    fn poll(&self, flags: &PollingFlags);
503}
504
505#[cfg(any(miri, not(feature = "std")))]
506impl Pollable for DefaultHandle {
507    fn poll(&self, _: &PollingFlags) {
508        unimplemented!("Polling on requires std feature, and not be run on miri")
509    }
510}
511
512#[cfg(all(not(miri), unix, feature = "std"))]
513#[cfg_attr(docsrs, doc(cfg(all(not(miri), feature = "std"))))]
514impl Pollable for DefaultHandle {
515    fn poll(&self, flags: &PollingFlags) {
516        let fd = PollFd::new(*self, flags.to_posix());
517        let _ = poll(&mut [fd], -1);
518    }
519}
520
521#[cfg(all(not(miri), windows, feature = "std"))]
522#[cfg_attr(docsrs, doc(cfg(all(not(miri), feature = "std"))))]
523impl Pollable for DefaultHandle {
524    fn poll(&self, _: &PollingFlags) {
525        use windows_sys::Win32::System::Threading::{WaitForSingleObject, INFINITE};
526        let _ = unsafe { WaitForSingleObject(*self as _, INFINITE) };
527    }
528}