#[cfg(not(feature = "std"))]
use crate::std_prelude::*;
use crate::poller::{self, ParkHandle};
use core::cell::UnsafeCell;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use core::task::{Context, Poll, Waker};
use tarc::Arc;
pub mod integrations;
pub use integrations::null::{Null, NullImpl};
pub use integrations::Integration;
#[cfg(all(unix, feature = "std"))]
use nix::poll::*;
#[cfg(all(unix, feature = "std"))]
use std::os::fd::RawFd;
#[cfg(all(windows, feature = "std"))]
use std::os::windows::io::RawHandle;
#[cfg(all(any(unix, target_os = "wasi"), feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(any(unix, target_os = "wasi"), feature = "std"))))]
pub mod fd;
#[cfg(all(windows, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "std"))))]
pub mod handle;
#[cfg(all(windows, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(windows)))]
pub mod windows;
/// Default OS handle type used for polling: `RawFd` on unix targets with the `std` feature.
// NOTE(review): no alias is defined for `std` targets that are neither unix nor windows
// (e.g. wasi, for which the `fd` module above is enabled) — confirm this is intended.
#[cfg(all(unix, feature = "std"))]
pub type DefaultHandle = RawFd;
/// Default OS handle type used for polling: `RawHandle` on windows targets with the `std` feature.
#[cfg(all(windows, feature = "std"))]
pub type DefaultHandle = RawHandle;
/// Default handle type without the `std` feature: the uninhabited `Infallible`, so no handle
/// value can ever be constructed.
#[cfg(not(feature = "std"))]
pub type DefaultHandle = core::convert::Infallible;
/// Type-erased backend: any `Send` future that resolves to `()`.
pub type DynBackend = dyn Future<Output = ()> + Send;
/// Type-erased record of a backend container whose handle was absorbed by
/// `BackendContainer::acquire_nested`.
///
/// It lets the outer container poll and later unlock the inner one without knowing its
/// concrete backend type.
#[repr(C)]
struct NestedBackend {
/// Address of the inner `BackendContainer`, with its type erased.
owner: *const (),
/// Polls the inner container's backend once; must be called with `owner` as first argument.
poll: unsafe extern "C" fn(*const (), &mut Context),
/// Clears the inner container's lock flag; must be called with `owner`.
release: unsafe extern "C" fn(*const ()),
}
/// Owner of a pinned, boxed backend future, guarded by a simple "acquired" flag.
///
/// The backend is reached through a `BackendHandle` obtained from `acquire` or
/// `acquire_nested`; the handle clears the flag again when dropped.
#[repr(C)]
pub struct BackendContainer<B: ?Sized> {
/// Inner backend registered by `acquire_nested`, if any; taken out when the handle drops.
nest: UnsafeCell<Option<NestedBackend>>,
/// The backend future itself. Kept in an `UnsafeCell` because `acquire` hands out a mutable
/// pinned reference from `&self`.
backend: UnsafeCell<Pin<Box<B>>>,
/// `true` while a handle is outstanding; `acquire` panics if it is already set.
lock: AtomicBool,
}
// SAFETY (NOTE(review): reasoning inferred from this file — confirm): the `UnsafeCell` fields are
// only accessed while `lock` is held, and `acquire` panics unless it flips `lock` from `false`
// to `true`, so at most one thread touches them at a time. The raw pointer stored inside `nest`
// is what suppresses the automatic `Send`/`Sync` implementations.
unsafe impl<B: ?Sized + Send> Send for BackendContainer<B> {}
unsafe impl<B: ?Sized + Send> Sync for BackendContainer<B> {}
impl<B: ?Sized> BackendContainer<B> {
    /// Locks the container and returns a handle giving pinned mutable access to the backend.
    ///
    /// `wake_flags` is stored in the returned handle as-is.
    ///
    /// # Panics
    ///
    /// Panics if the container is already acquired (its lock flag is set).
    pub fn acquire(&self, wake_flags: Option<Arc<AtomicU8>>) -> BackendHandle<B> {
        let was_locked = self.lock.swap(true, Ordering::AcqRel);
        if was_locked {
            panic!("Tried to acquire backend twice!");
        }

        // SAFETY: the lock flag was just flipped from `false` to `true` by this call, so no
        // other handle referencing the backend exists until the returned handle is dropped.
        let backend = unsafe { (*self.backend.get()).as_mut() };

        BackendHandle {
            owner: self,
            backend,
            wake_flags,
        }
    }

    /// Locks this container and nests an already acquired backend (`handle`) underneath it.
    ///
    /// The wake flags are moved from `handle` into the returned handle. `handle` itself is
    /// forgotten rather than dropped, so its container stays locked; that lock is cleared
    /// through the registered release callback when the returned handle is dropped.
    ///
    /// # Panics
    ///
    /// Panics if this container is already acquired. In that case `handle` is dropped
    /// normally during unwinding.
    pub fn acquire_nested<B2: ?Sized + Future<Output = ()>>(
        &self,
        mut handle: BackendHandle<B2>,
    ) -> BackendHandle<B> {
        /// Polls the backend stored in the type-erased container behind `data`.
        unsafe extern "C" fn poll_erased<T: ?Sized + Future<Output = ()>>(
            data: *const (),
            context: &mut Context,
        ) {
            let container = &*(data as *const BackendContainer<T>);
            let inner = Pin::new_unchecked(&mut *container.backend.get());
            if inner.poll(context).is_ready() {
                panic!("Backend polled to completion!")
            }
        }

        /// Clears the lock flag of the type-erased container behind `data`.
        unsafe extern "C" fn release_erased<T: ?Sized>(data: *const ()) {
            let container = &*(data as *const BackendContainer<T>);
            container.lock.store(false, Ordering::Release);
        }

        let wake_flags = handle.wake_flags.take();
        let inner_owner = handle.owner;

        // Acquire first: if this panics, `handle` is still live and unlocks its container.
        let outer_handle = self.acquire(wake_flags);

        // From here on the inner container is unlocked via `release_erased` instead.
        core::mem::forget(handle);

        let nested = NestedBackend {
            owner: inner_owner as *const _ as *const (),
            poll: poll_erased::<B2>,
            release: release_erased::<B2>,
        };
        // SAFETY: this container's lock is held by `outer_handle`, created just above.
        unsafe {
            *self.nest.get() = Some(nested);
        }

        outer_handle
    }
}
impl BackendContainer<DynBackend> {
    /// Boxes and pins `backend`, erasing its concrete type, and wraps it in an unlocked
    /// container with no nested backend.
    pub fn new_dyn<T: Future<Output = ()> + Send + 'static>(backend: T) -> Self {
        let erased: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(backend);
        Self {
            backend: UnsafeCell::new(erased),
            nest: UnsafeCell::new(None),
            lock: Default::default(),
        }
    }
}
/// Exclusive handle to the backend of a `BackendContainer`.
///
/// Dereferences to the pinned backend and unlocks the owning container when dropped.
pub struct BackendHandle<'a, B: ?Sized> {
/// Container this handle was acquired from.
owner: &'a BackendContainer<B>,
/// Pinned mutable reference to the container's backend.
backend: Pin<&'a mut B>,
/// Optional shared flag byte, as passed to `acquire`; read and updated by `WithBackend::poll`.
wake_flags: Option<Arc<AtomicU8>>,
}
impl<'a, B: ?Sized> Drop for BackendHandle<'a, B> {
fn drop(&mut self) {
if let Some(NestedBackend { owner, release, .. }) =
unsafe { (*self.owner.nest.get()).take() }
{
unsafe { release(owner) }
}
self.owner.lock.store(false, Ordering::Release);
}
}
/// Gives shared access to the pinned backend reference held by the handle.
impl<'a, B: ?Sized> core::ops::Deref for BackendHandle<'a, B> {
type Target = Pin<&'a mut B>;
fn deref(&self) -> &Self::Target {
&self.backend
}
}
/// Gives mutable access to the pinned backend reference, e.g. for `Pin::as_mut` before polling.
impl<'a, B: ?Sized> core::ops::DerefMut for BackendHandle<'a, B> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.backend
}
}
/// Future that drives `future` together with an acquired backend.
///
/// Each poll first polls `future` and, while it is pending, polls the backend as well.
pub struct WithBackend<'a, Backend: ?Sized, Fut: ?Sized> {
/// Handle to the backend being driven; holds the container lock for the future's lifetime.
backend: BackendHandle<'a, Backend>,
/// The user future whose output this future resolves to.
future: Fut,
}
impl<'a, Backend: Future + ?Sized, Fut: Future + ?Sized> Future for WithBackend<'a, Backend, Fut> {
    type Output = Fut::Output;

    /// Polls the wrapped future, then the backend (and any nested backend) while it is pending.
    ///
    /// When wake flags are present, bit `0b10` is set before each round. After a pending
    /// round the flags are cleared, and the round is repeated if bit `0b1` had been set in
    /// the meantime (presumably by a waker — confirm against the waker implementation).
    ///
    /// # Panics
    ///
    /// Panics if the backend future completes.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // SAFETY: `this.future` is only ever re-pinned in place below, never moved out.
        let this = unsafe { self.get_unchecked_mut() };

        loop {
            if let Some(flags) = this.backend.wake_flags.as_ref() {
                flags.fetch_or(0b10, Ordering::AcqRel);
            }

            // SAFETY: `this` itself is pinned, so its `future` field never moves.
            let future = unsafe { Pin::new_unchecked(&mut this.future) };

            if let Poll::Ready(output) = future.poll(cx) {
                if let Some(flags) = this.backend.wake_flags.as_ref() {
                    flags.store(0, Ordering::Release);
                }
                return Poll::Ready(output);
            }

            // The user future is pending: give the backend a chance to make progress.
            if this.backend.as_mut().poll(cx).is_ready() {
                panic!("Backend future completed");
            }

            // SAFETY: the backend handle holds the owner's lock, so `nest` is not mutated
            // concurrently.
            let nested = unsafe { &*this.backend.owner.nest.get() };
            if let Some(NestedBackend { owner, poll, .. }) = nested {
                // SAFETY: `poll` was registered together with `owner` by `acquire_nested`.
                unsafe { poll(*owner, cx) };
            }

            // Clear the flags; go around again only if a wake-up (bit 0b1) arrived meanwhile.
            let woken = match this.backend.wake_flags.as_ref() {
                Some(flags) => flags.fetch_and(0b0, Ordering::AcqRel) & 0b1 != 0,
                None => false,
            };
            if !woken {
                return Poll::Pending;
            }
        }
    }
}
/// Everything needed to block on a backend's OS handle until it has work to do.
pub struct PollingHandle<'a, Handle = DefaultHandle> {
/// The OS handle to wait on.
pub handle: Handle,
/// Flags passed to `Pollable::poll` when parking; shared by reference, so updates made
/// through `PollingFlags::set_read`/`set_write` are visible here.
pub cur_flags: &'a PollingFlags,
/// NOTE(review): presumably the widest set of flags `cur_flags` may ever take — not read
/// anywhere in this file, confirm against the backends.
pub max_flags: PollingFlags,
/// Waker invoked by `ParkHandle::unpark`.
pub waker: Waker,
}
/// Set of readiness interests (read and/or write) stored in a single atomic byte.
///
/// The const builder methods (`new`, `all`, `read`, `write`) create values by ownership,
/// while `set_read`/`set_write` update a shared instance in place.
#[repr(transparent)]
pub struct PollingFlags {
    flags: AtomicU8,
}

/// Bit marking interest in read readiness.
const READ_POLL: u8 = 0b1;
/// Bit marking interest in write readiness.
const WRITE_POLL: u8 = 0b10;

impl PollingFlags {
    /// Wraps a raw flag byte.
    const fn from_flags(flags: u8) -> Self {
        Self {
            flags: AtomicU8::new(flags),
        }
    }

    /// Consumes `self` and returns a copy with `bit` set (`val == true`) or cleared.
    const fn with_bit(self, bit: u8, val: bool) -> Self {
        // SAFETY: `PollingFlags` is `repr(transparent)` over `AtomicU8`, which has the same
        // in-memory representation as `u8`. `transmute` is used so this works in `const fn`.
        let raw: u8 = unsafe { core::mem::transmute::<Self, u8>(self) };
        Self::from_flags(if val { raw | bit } else { raw & !bit })
    }

    /// Atomically sets or clears `bit` on a shared instance.
    fn set_bit(&self, bit: u8, val: bool) {
        if val {
            self.flags.fetch_or(bit, Ordering::Relaxed);
        } else {
            self.flags.fetch_and(!bit, Ordering::Relaxed);
        }
    }

    /// Creates an empty flag set (neither read nor write).
    pub const fn new() -> Self {
        Self::from_flags(0)
    }

    /// Creates a flag set with every bit set, which includes both read and write.
    pub const fn all() -> Self {
        Self::from_flags(!0)
    }

    /// Returns the flags with read interest enabled or disabled.
    pub const fn read(self, val: bool) -> Self {
        self.with_bit(READ_POLL, val)
    }

    /// Returns the flags with write interest enabled or disabled.
    pub const fn write(self, val: bool) -> Self {
        self.with_bit(WRITE_POLL, val)
    }

    /// Enables or disables read interest in place.
    pub fn set_read(&self, val: bool) {
        self.set_bit(READ_POLL, val);
    }

    /// Enables or disables write interest in place.
    pub fn set_write(&self, val: bool) {
        self.set_bit(WRITE_POLL, val);
    }

    /// Returns the current `(read, write)` interest pair.
    pub fn get(&self) -> (bool, bool) {
        let bits = self.flags.load(Ordering::Relaxed);
        (bits & READ_POLL != 0, bits & WRITE_POLL != 0)
    }

    /// Converts the flags to their POSIX `poll` equivalents: read interest becomes `POLLIN`
    /// and write interest becomes `POLLOUT`.
    #[cfg(all(unix, feature = "std"))]
    pub fn to_posix(&self) -> PollFlags {
        let (read, write) = self.get();
        let mut flags = PollFlags::empty();
        flags.set(PollFlags::POLLIN, read);
        // Write readiness is `POLLOUT`; mapping it to `POLLIN` would make write-only
        // waiters block on readability instead.
        flags.set(PollFlags::POLLOUT, write);
        flags
    }
}
/// An object that owns an I/O backend future which must be driven for I/O to make progress.
pub trait IoBackend<Handle: Pollable = DefaultHandle> {
/// Type of the backend future; it resolves to `()` and must be `Send`.
type Backend: Future<Output = ()> + Send + ?Sized;
/// Returns the handle to block on while waiting for the backend, or `None` if there is none.
///
/// Note that the returned `PollingHandle` uses its default handle type (`DefaultHandle`),
/// not the trait's `Handle` parameter.
fn polling_handle(&self) -> Option<PollingHandle>;
/// Acquires exclusive access to the backend future.
fn get_backend(&self) -> BackendHandle<Self::Backend>;
}
/// Convenience methods available on every [`IoBackend`].
pub trait IoBackendExt<Handle: Pollable>: IoBackend<Handle> {
    /// Acquires the backend and pairs it with `future`.
    ///
    /// Returns the combined future together with the backend's polling handle (if any), so
    /// the caller can drive both from its own executor.
    fn with_backend<F: Future>(
        &self,
        future: F,
    ) -> (WithBackend<Self::Backend, F>, Option<PollingHandle>) {
        // The backend is acquired before the polling handle is requested.
        let backend = self.get_backend();
        let combined = WithBackend { backend, future };
        (combined, self.polling_handle())
    }

    /// Acquires the backend and blocks the current thread until `fut` completes.
    fn block_on<F: Future>(&self, fut: F) -> F::Output {
        // Arguments are evaluated left to right: backend first, then the polling handle.
        block_on::<Handle, F, Self>(fut, self.get_backend(), self.polling_handle())
    }
}

impl<T: ?Sized + IoBackend<Handle>, Handle: Pollable> IoBackendExt<Handle> for T {}
/// Something that can hand out a reference to an `IoBackend`.
pub trait LinksIoBackend {
/// The backend type being linked to.
type Link: IoBackend + ?Sized;
/// Returns the linked backend. Despite the name, this yields a shared reference.
fn get_mut(&self) -> &Self::Link;
}
/// Every `IoBackend` links to itself.
impl<T: IoBackend> LinksIoBackend for T {
type Link = Self;
fn get_mut(&self) -> &Self::Link {
self
}
}
/// Borrowed link to an `IoBackend`, usable where a `LinksIoBackend` is expected.
// NOTE(review): the field is private and no constructor is visible in this part of the file.
pub struct RefLink<'a, T: ?Sized>(&'a T);
impl<'a, T: IoBackend + ?Sized> LinksIoBackend for RefLink<'a, T> {
type Link = T;
/// Returns the wrapped backend reference.
fn get_mut(&self) -> &Self::Link {
self.0
}
}
/// Blocks the current thread until `future` completes, driving `backend` alongside it.
///
/// When a polling handle is supplied, the thread parks on that handle (using the handle's
/// own waker); otherwise the poller's default blocking strategy is used.
pub fn block_on<H: Pollable, F: Future, B: IoBackend<H> + ?Sized>(
    future: F,
    backend: BackendHandle<B::Backend>,
    polling: Option<PollingHandle>,
) -> F::Output {
    let fut = WithBackend { backend, future };
    match polling {
        Some(handle) => poller::block_on_handle(fut, &handle, &handle.waker),
        None => poller::block_on(fut),
    }
}
/// Lets the poller park on the backend's OS handle instead of parking the thread directly.
impl<H: Pollable> ParkHandle for PollingHandle<'_, H> {
/// Wakes the parked side by invoking the stored waker.
fn unpark(&self) {
self.waker.wake_by_ref();
}
/// Parks by polling the handle with the flags currently stored in `cur_flags`.
fn park(&self) {
self.handle.poll(self.cur_flags)
}
}
/// A handle that can be waited on for readiness.
pub trait Pollable {
/// Waits on the handle for the interests described by `flags`. The implementations in this
/// file block the calling thread and return nothing, so errors are not reported.
fn poll(&self, flags: &PollingFlags);
}
/// Placeholder used under miri or without the `std` feature: polling is not available there.
#[cfg(any(miri, not(feature = "std")))]
impl Pollable for DefaultHandle {
/// Always panics via `unimplemented!`.
fn poll(&self, _: &PollingFlags) {
unimplemented!("Polling on requires std feature, and not be run on miri")
}
}
/// Unix implementation: waits on the file descriptor with `poll`.
#[cfg(all(not(miri), unix, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(not(miri), feature = "std"))))]
impl Pollable for DefaultHandle {
/// Polls this descriptor for the events given by `flags.to_posix()`, with a timeout of `-1`
/// (for POSIX `poll`, wait indefinitely). The result, including any error, is discarded.
fn poll(&self, flags: &PollingFlags) {
let fd = PollFd::new(*self, flags.to_posix());
let _ = poll(&mut [fd], -1);
}
}
/// Windows implementation: waits on the raw handle with `WaitForSingleObject`.
#[cfg(all(not(miri), windows, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(not(miri), feature = "std"))))]
impl Pollable for DefaultHandle {
/// Waits on the handle with an `INFINITE` timeout. The polling flags are ignored and the
/// wait result is discarded.
fn poll(&self, _: &PollingFlags) {
use windows_sys::Win32::System::Threading::{WaitForSingleObject, INFINITE};
let _ = unsafe { WaitForSingleObject(*self as _, INFINITE) };
}
}