dprint_core/
async_runtime.rs

1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::Context;
4use std::task::Poll;
5
6use futures::Future;
7use tokio::runtime::Handle;
8use tokio::runtime::RuntimeFlavor;
9
10pub use futures::FutureExt;
11
12pub type LocalBoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;
13
14pub use async_trait::async_trait;
15pub use futures::future;
16
17// The below is lifted from https://github.com/denoland/deno_core/blob/80c7f5551a907def8253af8c6d4e9ded49ee638d/core/task.rs#L48
18// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
19
20/// Equivalent to [`tokio::task::JoinHandle`].
21#[repr(transparent)]
22pub struct JoinHandle<R> {
23  handle: tokio::task::JoinHandle<MaskResultAsSend<R>>,
24  _r: PhantomData<R>,
25}
26
27impl<R> JoinHandle<R> {
28  /// Equivalent to [`tokio::task::JoinHandle::abort`].
29  pub fn abort(&self) {
30    self.handle.abort()
31  }
32}
33
34impl<R> Future for JoinHandle<R> {
35  type Output = Result<R, tokio::task::JoinError>;
36
37  fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
38    // SAFETY: We are sure that handle is valid here
39    unsafe {
40      let me: &mut Self = Pin::into_inner_unchecked(self);
41      let handle = Pin::new_unchecked(&mut me.handle);
42      match handle.poll(cx) {
43        Poll::Pending => Poll::Pending,
44        Poll::Ready(Ok(r)) => Poll::Ready(Ok(r.into_inner())),
45        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
46      }
47    }
48  }
49}
50
51/// Equivalent to [`tokio::task::spawn`], but does not require the future to be [`Send`]. Must only be
52/// used on a [`RuntimeFlavor::CurrentThread`] executor, though this is only checked when running with
53/// debug assertions.
54#[inline(always)]
55pub fn spawn<F: Future<Output = R> + 'static, R: 'static>(f: F) -> JoinHandle<R> {
56  debug_assert!(Handle::current().runtime_flavor() == RuntimeFlavor::CurrentThread);
57  // SAFETY: we know this is a current-thread executor
58  let future = unsafe { MaskFutureAsSend::new(f) };
59  JoinHandle {
60    handle: tokio::task::spawn(future),
61    _r: Default::default(),
62  }
63}
64
65/// Equivalent to [`tokio::task::spawn_blocking`]. Currently a thin wrapper around the tokio API, but this
66/// may change in the future.
67#[inline(always)]
68pub fn spawn_blocking<F: (FnOnce() -> R) + Send + 'static, R: Send + 'static>(f: F) -> JoinHandle<R> {
69  let handle = tokio::task::spawn_blocking(|| MaskResultAsSend { result: f() });
70  JoinHandle {
71    handle,
72    _r: Default::default(),
73  }
74}
75
/// Wrapper that carries a task result and is unconditionally marked [`Send`].
#[repr(transparent)]
#[doc(hidden)]
pub struct MaskResultAsSend<R> {
  // The wrapped value, handed back by `into_inner`.
  result: R,
}

// SAFETY: the `Send` bound is only faked while tokio is running on a
// current-thread executor.
unsafe impl<R> Send for MaskResultAsSend<R> {}

impl<R> MaskResultAsSend<R> {
  /// Consumes the wrapper and returns the value it holds.
  #[inline(always)]
  pub fn into_inner(self) -> R {
    let Self { result } = self;
    result
  }
}
91
/// Wrapper around a future that is unconditionally marked [`Send`].
#[repr(transparent)]
pub struct MaskFutureAsSend<F> {
  // The wrapped future.
  future: F,
}

impl<F> MaskFutureAsSend<F> {
  /// Wraps a non-`Send` future so that it can be passed where a `Send`
  /// future is required. This is the trick that allows `tokio::spawn()`
  /// (which demands `Send` futures) to be used on a current thread runtime.
  ///
  /// # Safety
  ///
  /// The caller must guarantee that the future is really only used on the
  /// thread it was created on, i.e. always use Tokio's current thread
  /// runtime flavor.
  #[inline(always)]
  pub unsafe fn new(future: F) -> Self {
    MaskFutureAsSend { future }
  }
}

// SAFETY: this is deliberately cheating - the struct is NOT genuinely Send,
// but it has to be marked Send so that Tokio's `spawn()` accepts it.
unsafe impl<F> Send for MaskFutureAsSend<F> {}
115
116impl<F: Future> Future for MaskFutureAsSend<F> {
117  type Output = MaskResultAsSend<F::Output>;
118
119  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<MaskResultAsSend<F::Output>> {
120    // SAFETY: We are sure that future is valid here
121    unsafe {
122      let me: &mut MaskFutureAsSend<F> = Pin::into_inner_unchecked(self);
123      let future = Pin::new_unchecked(&mut me.future);
124      match future.poll(cx) {
125        Poll::Pending => Poll::Pending,
126        Poll::Ready(result) => Poll::Ready(MaskResultAsSend { result }),
127      }
128    }
129  }
130}
131
/// A drop guard that runs a finalizer when dropped.
///
/// This is useful in scenarios when you need to ensure cleanup happens
/// after awaiting a future.
///
/// The guard must be bound to a named variable (for example `let _guard = ...`)
/// to stay alive; an unbound guard is dropped right away, which runs the
/// finalizer immediately. The `#[must_use]` attribute makes the compiler warn
/// when the guard is discarded as a bare statement.
#[must_use = "the finalizer runs as soon as the guard is dropped; bind the guard to a variable to keep it alive"]
pub struct DropGuardAction<T: FnOnce()> {
  // `Some` while the guard is armed; `None` once the finalizer has been
  // run or forgotten.
  finalizer: Option<T>,
}

impl<T: FnOnce()> Drop for DropGuardAction<T> {
  /// Runs the finalizer if the guard is still armed.
  fn drop(&mut self) {
    // `take` moves the closure out so the `FnOnce` can be called by value.
    if let Some(finalizer) = self.finalizer.take() {
      finalizer();
    }
  }
}

impl<T: FnOnce()> DropGuardAction<T> {
  /// Creates an armed guard that calls `finalizer` when it is dropped.
  pub fn new(finalizer: T) -> Self {
    Self { finalizer: Some(finalizer) }
  }

  /// Forget about running the finalizer on this drop guard.
  ///
  /// The finalizer is dropped without being called. Calling this more than
  /// once has no further effect.
  pub fn forget(&mut self) {
    self.finalizer = None;
  }
}