1use futures_timer::Delay;
2use std::future::{pending, Future};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7mod stream_ext;
8pub use stream_ext::StreamExt;
9
10pub mod agent;
11pub mod notify;
12pub mod task;
13
14pub trait OnDropFutureExt
19where
20 Self: Future + Sized,
21{
22 fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D>;
50}
51impl<F: Future> OnDropFutureExt for F {
52 fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D> {
53 OnDropFuture {
54 inner: self,
55 on_drop,
56 }
57 }
58}
59
/// Future adapter that stores a wrapped future next to a drop callback.
///
/// The bounds are repeated on the type itself because the `Drop`
/// implementation for this type must carry exactly the same bounds.
pub struct OnDropFuture<F, D>
where
    F: Future,
    D: FnMut(),
{
    /// The wrapped future.
    inner: F,
    /// Callback stored alongside the future; invoked when the wrapper drops.
    on_drop: D,
}
64impl<F: Future, D: FnMut()> OnDropFuture<F, D> {
65 fn get_mut_inner(self: Pin<&mut Self>) -> Pin<&mut F> {
67 unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
68 }
69
70 fn get_mut_on_drop(self: Pin<&mut Self>) -> &mut D {
72 unsafe { &mut self.get_unchecked_mut().on_drop }
73 }
74}
75impl<F: Future, D: FnMut()> Future for OnDropFuture<F, D> {
76 type Output = F::Output;
77
78 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
79 self.get_mut_inner().poll(cx)
80 }
81}
82impl<F: Future, D: FnMut()> Drop for OnDropFuture<F, D> {
83 fn drop(&mut self) {
84 inner_drop(unsafe { Pin::new_unchecked(self) });
86 fn inner_drop<F: Future, D: FnMut()>(this: Pin<&mut OnDropFuture<F, D>>) {
87 this.get_mut_on_drop()();
88 }
89 }
90}
91
/// Marker bound that requires `Send` on non-wasm targets.
#[cfg(not(target_family = "wasm"))]
pub trait MaybeSend: Send {}
#[cfg(not(target_family = "wasm"))]
impl<T> MaybeSend for T where T: Send {}

/// Marker bound that places no requirement on wasm targets.
#[cfg(target_family = "wasm")]
pub trait MaybeSend {}
#[cfg(target_family = "wasm")]
impl<T> MaybeSend for T {}

/// Marker bound that requires `Sync` on non-wasm targets.
#[cfg(not(target_family = "wasm"))]
pub trait MaybeSync: Sync {}
#[cfg(not(target_family = "wasm"))]
impl<T> MaybeSync for T where T: Sync {}

/// Marker bound that places no requirement on wasm targets.
#[cfg(target_family = "wasm")]
pub trait MaybeSync {}
#[cfg(target_family = "wasm")]
impl<T> MaybeSync for T {}
112
/// Error returned by [`timeout`] when the timer fires before the supplied
/// future has produced its output.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn Error>` and similar generic error types.
#[derive(Debug)]
pub struct TimeoutError {}

impl std::fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("operation timed out")
    }
}

// No underlying cause to report, so the default `source()` (None) is correct.
impl std::error::Error for TimeoutError {}
115
116pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, TimeoutError> {
117 tokio::select! {
118 _ = Delay::new(duration) => Err(TimeoutError{}),
119 result = future => Ok(result),
120 }
121}
122
123pub async fn sleep(duration: Duration) {
124 let _ = timeout(duration, pending::<()>()).await;
125}
126
#[cfg(test)]
mod test {
    use std::time::Duration;

    /// A future that keeps yielding and never finishes must be cut off by
    /// `timeout` with an error.
    #[tokio::test]
    async fn test_timeout() {
        let spin_forever = async move {
            loop {
                tokio::task::yield_now().await;
            }
        };

        let outcome = super::timeout(Duration::from_millis(10), spin_forever).await;

        assert!(outcome.is_err())
    }
}