#[cfg(not(feature = "std"))]
use crate::std_prelude::*;

use crate::poller::{self, ParkHandle};

use core::cell::UnsafeCell;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use core::task::{Context, Poll, Waker};
use tarc::Arc;

pub mod integrations;

pub use integrations::null::{Null, NullImpl};
pub use integrations::Integration;

#[cfg(all(unix, feature = "std"))]
use nix::poll::*;
#[cfg(all(unix, feature = "std"))]
use std::os::fd::RawFd;
#[cfg(all(windows, feature = "std"))]
use std::os::windows::io::RawHandle;

#[cfg(all(any(unix, target_os = "wasi"), feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(any(unix, target_os = "wasi"), feature = "std"))))]
pub mod fd;

#[cfg(all(windows, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "std"))))]
pub mod handle;

#[cfg(all(windows, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(windows)))]
pub mod windows;

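// The default handle type used for platform polling: a raw file descriptor on
// unix, a raw handle on windows, and an uninhabited type when the `std`
// feature is disabled (no OS-level polling is possible there).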
#[cfg(all(unix, feature = "std"))]
pub type DefaultHandle = RawFd;
#[cfg(all(windows, feature = "std"))]
pub type DefaultHandle = RawHandle;
#[cfg(not(feature = "std"))]
pub type DefaultHandle = core::convert::Infallible;

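/// Type-erased backend future, as stored by [`BackendContainer::new_dyn`].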
pub type DynBackend = dyn Future<Output = ()> + Send;

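/// Type-erased entry pointing back at an outer [`BackendContainer`].
///
/// When one backend is acquired on top of another (see
/// [`BackendContainer::acquire_nested`]), this stores the outer container
/// together with functions that poll its future and release its lock.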
#[repr(C)]
struct NestedBackend {
    owner: *const (),
    poll: unsafe extern "C" fn(*const (), &mut Context),
    release: unsafe extern "C" fn(*const ()),
}

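/// Container for a backend future.
///
/// Owns a pinned backend future and hands out an exclusive [`BackendHandle`]
/// to it through [`BackendContainer::acquire`]. Acquiring the backend twice at
/// the same time panics.
///
/// ```ignore
/// // Minimal sketch: wrap a never-completing future as a type-erased backend
/// // and acquire an exclusive handle to it (no wake flags attached).
/// let container = BackendContainer::new_dyn(core::future::pending::<()>());
/// let handle = container.acquire(None);
/// ```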
#[repr(C)]
pub struct BackendContainer<B: ?Sized> {
    nest: UnsafeCell<Option<NestedBackend>>,
    backend: UnsafeCell<Pin<Box<B>>>,
    lock: AtomicBool,
}

unsafe impl<B: ?Sized + Send> Send for BackendContainer<B> {}
unsafe impl<B: ?Sized + Send> Sync for BackendContainer<B> {}

impl<B: ?Sized> BackendContainer<B> {
    pub fn acquire(&self, wake_flags: Option<Arc<AtomicU8>>) -> BackendHandle<B> {
        if self.lock.swap(true, Ordering::AcqRel) {
            panic!("Tried to acquire backend twice!");
        }

        let backend = unsafe { &mut *self.backend.get() }.as_mut();

        BackendHandle {
            owner: self,
            backend,
            wake_flags,
        }
    }

    pub fn acquire_nested<B2: ?Sized + Future<Output = ()>>(
        &self,
        mut handle: BackendHandle<B2>,
    ) -> BackendHandle<B> {
        let wake_flags = handle.wake_flags.take();
        let owner = handle.owner;

        let our_handle = self.acquire(wake_flags);

        unsafe extern "C" fn poll<B: ?Sized + Future<Output = ()>>(
            data: *const (),
            context: &mut Context,
        ) {
            let data = &*(data as *const BackendContainer<B>);
            if Pin::new_unchecked(&mut *data.backend.get())
                .poll(context)
                .is_ready()
            {
                panic!("Backend polled to completion!")
            }
        }

        unsafe extern "C" fn release<B: ?Sized>(data: *const ()) {
            let data = &*(data as *const BackendContainer<B>);
            data.lock.store(false, Ordering::Release);
        }

        // Skip the nested handle's destructor - the outer container's lock is
        // released through the `release` function stored below instead.
        core::mem::forget(handle);

        unsafe {
            *self.nest.get() = Some(NestedBackend {
                owner: owner as *const _ as *const (),
                poll: poll::<B2>,
                release: release::<B2>,
            });
        }

        our_handle
    }
}

impl BackendContainer<DynBackend> {
    pub fn new_dyn<T: Future<Output = ()> + Send + 'static>(backend: T) -> Self {
        Self {
            backend: UnsafeCell::new(Box::pin(backend) as Pin<Box<dyn Future<Output = ()> + Send>>),
            nest: UnsafeCell::new(None),
            lock: Default::default(),
        }
    }
}

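/// Exclusive handle to a backend future stored in a [`BackendContainer`].
///
/// Dereferences to the pinned backend future. Dropping the handle releases the
/// container's lock (and that of any nested container) so the backend can be
/// acquired again.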
pub struct BackendHandle<'a, B: ?Sized> {
    owner: &'a BackendContainer<B>,
    backend: Pin<&'a mut B>,
    wake_flags: Option<Arc<AtomicU8>>,
}

impl<'a, B: ?Sized> Drop for BackendHandle<'a, B> {
    fn drop(&mut self) {
        // Release any nested (outer) backend first, then clear our own lock.
        if let Some(NestedBackend { owner, release, .. }) =
            unsafe { (*self.owner.nest.get()).take() }
        {
            unsafe { release(owner) }
        }

        self.owner.lock.store(false, Ordering::Release);
    }
}

impl<'a, B: ?Sized> core::ops::Deref for BackendHandle<'a, B> {
    type Target = Pin<&'a mut B>;

    fn deref(&self) -> &Self::Target {
        &self.backend
    }
}

impl<'a, B: ?Sized> core::ops::DerefMut for BackendHandle<'a, B> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.backend
    }
}

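/// Future that drives `Fut` to completion while cooperatively polling the
/// backend future it was created with.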
pub struct WithBackend<'a, Backend: ?Sized, Fut: ?Sized> {
    backend: BackendHandle<'a, Backend>,
    future: Fut,
}

impl<'a, Backend: Future + ?Sized, Fut: Future + ?Sized> Future for WithBackend<'a, Backend, Fut> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };

        loop {
            // Mark that a poll pass is in progress (bit 1 of the wake flags).
            this.backend
                .wake_flags
                .as_ref()
                .map(|v| v.fetch_or(0b10, Ordering::AcqRel));
            let fut = unsafe { Pin::new_unchecked(&mut this.future) };
            let backend = this.backend.as_mut();

            match fut.poll(cx) {
                Poll::Ready(v) => {
                    if let Some(v) = this.backend.wake_flags.as_ref() {
                        v.store(0, Ordering::Release);
                    }
                    break Poll::Ready(v);
                }
                Poll::Pending => match backend.poll(cx) {
                    Poll::Ready(_) => panic!("Backend future completed"),
                    Poll::Pending => {
                        // If there is a nested backend, poll it as well.
                        if let Some(NestedBackend { owner, poll, .. }) =
                            unsafe { &*this.backend.owner.nest.get() }
                        {
                            unsafe { poll(*owner, cx) };
                        }
                    }
                },
            }

            // Clear the wake flags; if bit 0 was set in the meantime (a wakeup
            // was signaled while polling), run another pass instead of
            // returning Pending.
            if this
                .backend
                .wake_flags
                .as_ref()
                .map(|v| v.fetch_and(0b0, Ordering::AcqRel) & 0b1)
                .unwrap_or(0)
                == 0
            {
                break Poll::Pending;
            }
        }
    }
}

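/// Handle to a pollable resource used by [`IoBackend::polling_handle`],
/// bundling the raw OS handle, its current and maximum [`PollingFlags`], and
/// the waker used to unpark a blocked caller.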
pub struct PollingHandle<'a, Handle = DefaultHandle> {
    pub handle: Handle,
    pub cur_flags: &'a PollingFlags,
    pub max_flags: PollingFlags,
    pub waker: Waker,
}

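/// Atomic read/write interest flags passed to [`Pollable::poll`].
///
/// ```ignore
/// // Small sketch: request read interest up front, enable write interest later.
/// let flags = PollingFlags::new().read(true);
/// flags.set_write(true);
/// assert_eq!(flags.get(), (true, true));
/// ```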
#[repr(transparent)]
pub struct PollingFlags {
    flags: AtomicU8,
}

const READ_POLL: u8 = 0b1;
const WRITE_POLL: u8 = 0b10;

impl PollingFlags {
    const fn from_flags(flags: u8) -> Self {
        Self {
            flags: AtomicU8::new(flags),
        }
    }

    pub const fn new() -> Self {
        Self {
            flags: AtomicU8::new(0),
        }
    }

    pub const fn all() -> Self {
        Self {
            flags: AtomicU8::new(!0),
        }
    }

    pub const fn read(self, val: bool) -> Self {
        // SAFETY: `PollingFlags` is `repr(transparent)` over `AtomicU8`, which
        // has the same in-memory representation as `u8`.
        let mut flags: u8 = unsafe { core::mem::transmute(self) };
        if val {
            flags |= READ_POLL;
        } else {
            flags &= !READ_POLL;
        }
        Self::from_flags(flags)
    }

    pub const fn write(self, val: bool) -> Self {
        // SAFETY: same layout argument as in `read`.
        let mut flags: u8 = unsafe { core::mem::transmute(self) };
        if val {
            flags |= WRITE_POLL;
        } else {
            flags &= !WRITE_POLL;
        }
        Self::from_flags(flags)
    }

    pub fn set_read(&self, val: bool) {
        if val {
            self.flags.fetch_or(READ_POLL, Ordering::Relaxed);
        } else {
            self.flags.fetch_and(!READ_POLL, Ordering::Relaxed);
        }
    }

    pub fn set_write(&self, val: bool) {
        if val {
            self.flags.fetch_or(WRITE_POLL, Ordering::Relaxed);
        } else {
            self.flags.fetch_and(!WRITE_POLL, Ordering::Relaxed);
        }
    }

    pub fn get(&self) -> (bool, bool) {
        let bits = self.flags.load(Ordering::Relaxed);
        (bits & READ_POLL != 0, bits & WRITE_POLL != 0)
    }

    #[cfg(all(unix, feature = "std"))]
    pub fn to_posix(&self) -> PollFlags {
        let mut flags = PollFlags::empty();
        let bits = self.flags.load(Ordering::Relaxed);
        if bits & READ_POLL != 0 {
            flags.set(PollFlags::POLLIN, true);
        }
        if bits & WRITE_POLL != 0 {
            flags.set(PollFlags::POLLOUT, true);
        }
        flags
    }
}

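/// An I/O backend.
///
/// Implementors expose the future that drives their I/O processing
/// ([`IoBackend::get_backend`]) and, optionally, an OS handle that can be
/// polled while waiting for that future to make progress
/// ([`IoBackend::polling_handle`]).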
pub trait IoBackend<Handle: Pollable = DefaultHandle> {
    type Backend: Future<Output = ()> + Send + ?Sized;

    fn polling_handle(&self) -> Option<PollingHandle>;

    fn get_backend(&self) -> BackendHandle<Self::Backend>;
}

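/// Convenience extension methods for [`IoBackend`], provided through a blanket
/// implementation.
///
/// ```ignore
/// // Minimal sketch (hypothetical `backend` value implementing `IoBackend`):
/// // drive an arbitrary future to completion on top of the backend.
/// fn two<B: IoBackend>(backend: &B) -> i32 {
///     backend.block_on(async { 1 + 1 })
/// }
/// ```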
pub trait IoBackendExt<Handle: Pollable>: IoBackend<Handle> {
    fn with_backend<F: Future>(
        &self,
        future: F,
    ) -> (WithBackend<Self::Backend, F>, Option<PollingHandle>) {
        (
            WithBackend {
                backend: self.get_backend(),
                future,
            },
            self.polling_handle(),
        )
    }

    fn block_on<F: Future>(&self, fut: F) -> F::Output {
        let backend = self.get_backend();
        let polling = self.polling_handle();
        block_on::<Handle, F, Self>(fut, backend, polling)
    }
}

impl<T: ?Sized + IoBackend<Handle>, Handle: Pollable> IoBackendExt<Handle> for T {}

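/// Types that can expose a reference to an underlying [`IoBackend`].
///
/// Every `IoBackend` links to itself; [`RefLink`] links through a borrow.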
pub trait LinksIoBackend {
    type Link: IoBackend + ?Sized;

    fn get_mut(&self) -> &Self::Link;
}

impl<T: IoBackend> LinksIoBackend for T {
    type Link = Self;

    fn get_mut(&self) -> &Self::Link {
        self
    }
}

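/// [`LinksIoBackend`] implementation that links to a borrowed [`IoBackend`].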
pub struct RefLink<'a, T: ?Sized>(&'a T);

impl<'a, T: IoBackend + ?Sized> LinksIoBackend for RefLink<'a, T> {
    type Link = T;

    fn get_mut(&self) -> &Self::Link {
        self.0
    }
}

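/// Blocks the current thread on `future`, polling the backend alongside it.
///
/// If a [`PollingHandle`] is supplied, the thread parks by polling the OS
/// handle; otherwise the default blocking poller is used.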
pub fn block_on<H: Pollable, F: Future, B: IoBackend<H> + ?Sized>(
    future: F,
    backend: BackendHandle<B::Backend>,
    polling: Option<PollingHandle>,
) -> F::Output {
    let fut = WithBackend { backend, future };

    if let Some(handle) = polling {
        poller::block_on_handle(fut, &handle, &handle.waker)
    } else {
        poller::block_on(fut)
    }
}

impl<H: Pollable> ParkHandle for PollingHandle<'_, H> {
    fn unpark(&self) {
        self.waker.wake_by_ref();
    }

    fn park(&self) {
        self.handle.poll(self.cur_flags)
    }
}

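/// A handle that the current thread can block on until it signals readiness
/// for the operations requested in [`PollingFlags`].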
pub trait Pollable {
    fn poll(&self, flags: &PollingFlags);
}

#[cfg(any(miri, not(feature = "std")))]
impl Pollable for DefaultHandle {
    fn poll(&self, _: &PollingFlags) {
        unimplemented!("Polling requires the std feature and cannot be used under miri")
    }
}

#[cfg(all(not(miri), unix, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(not(miri), feature = "std"))))]
impl Pollable for DefaultHandle {
    fn poll(&self, flags: &PollingFlags) {
        let fd = PollFd::new(*self, flags.to_posix());
        let _ = poll(&mut [fd], -1);
    }
}

#[cfg(all(not(miri), windows, feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(not(miri), feature = "std"))))]
impl Pollable for DefaultHandle {
    fn poll(&self, _: &PollingFlags) {
        use windows_sys::Win32::System::Threading::{WaitForSingleObject, INFINITE};
        let _ = unsafe { WaitForSingleObject(*self as _, INFINITE) };
    }
}