twinkle_client/
lib.rs

1use futures_timer::Delay;
2use std::future::{pending, Future};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7mod stream_ext;
8pub use stream_ext::StreamExt;
9
10pub mod agent;
11pub mod notify;
12pub mod task;
13
14// https://stackoverflow.com/questions/74985153/implementing-drop-for-a-future-in-rust
15
16/// Trait allowing you to attach a function to a [Future] that will be called when
17/// the future is dropped.  
18pub trait OnDropFutureExt
19where
20    Self: Future + Sized,
21{
22    /// Wraps the future with an OnDropFuture that will execute the given function
23    /// when the future is dropped.  This is useful for situations where some resources need
24    /// to be cleaned up when the future goes away.  Note; the function registered with this
25    /// method will *always* run when the future is dropped which happens when a future is run
26    /// to completion, and when it isn't.
27    /// # Example
28    /// ```
29    /// use twinkle_client::OnDropFutureExt;
30    /// use std::sync::{Mutex, Arc};
31    /// async move {
32    ///     let val1 = Arc::new(Mutex::new(0));
33    ///     let val2 = val1.clone();
34    ///     let val3 = val1.clone();
35    ///     let future = async {
36    ///         println!("In the future!");
37    ///         let mut val_lock = val1.lock().unwrap();
38    ///         assert_eq!(*val_lock, 0);
39    ///         *val_lock += 1;
40    ///     }.on_drop(move ||  {
41    ///         println!("On the drop");
42    ///         let mut val_lock = val2.lock().unwrap();
43    ///         assert_eq!(*val_lock, 1);
44    ///         *val_lock += 1;
45    ///     });
46    ///     future.await;
47    ///     assert_eq!(*val3.lock().unwrap(), 2);
48    /// };
49    fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D>;
50}
51impl<F: Future> OnDropFutureExt for F {
52    fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D> {
53        OnDropFuture {
54            inner: self,
55            on_drop,
56        }
57    }
58}
59
/// Future adapter produced by [OnDropFutureExt::on_drop].
///
/// Holds the wrapped future together with the callback that is invoked when this value is
/// dropped.
pub struct OnDropFuture<F, D>
where
    F: Future,
    D: FnMut(),
{
    /// The wrapped future; polling the adapter polls this.
    inner: F,
    /// Callback invoked from the adapter's `Drop` implementation.
    on_drop: D,
}
64impl<F: Future, D: FnMut()> OnDropFuture<F, D> {
65    // See: https://doc.rust-lang.org/std/pin/#pinning-is-structural-for-field
66    fn get_mut_inner(self: Pin<&mut Self>) -> Pin<&mut F> {
67        unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
68    }
69
70    // See: https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field
71    fn get_mut_on_drop(self: Pin<&mut Self>) -> &mut D {
72        unsafe { &mut self.get_unchecked_mut().on_drop }
73    }
74}
75impl<F: Future, D: FnMut()> Future for OnDropFuture<F, D> {
76    type Output = F::Output;
77
78    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
79        self.get_mut_inner().poll(cx)
80    }
81}
82impl<F: Future, D: FnMut()> Drop for OnDropFuture<F, D> {
83    fn drop(&mut self) {
84        // See: https://doc.rust-lang.org/std/pin/#drop-implementation
85        inner_drop(unsafe { Pin::new_unchecked(self) });
86        fn inner_drop<F: Future, D: FnMut()>(this: Pin<&mut OnDropFuture<F, D>>) {
87            this.get_mut_on_drop()();
88        }
89    }
90}
91
/// Stand-in for [`Send`] that imposes no requirement on wasm targets.
///
/// On `wasm` every type, sized or not, implements this marker.
#[cfg(target_family = "wasm")]
pub trait MaybeSend {}
#[cfg(target_family = "wasm")]
impl<T: ?Sized> MaybeSend for T {}

/// Stand-in for [`Sync`] that imposes no requirement on wasm targets.
///
/// On `wasm` every type, sized or not, implements this marker.
#[cfg(target_family = "wasm")]
pub trait MaybeSync {}
#[cfg(target_family = "wasm")]
impl<T: ?Sized> MaybeSync for T {}

/// Stand-in for [`Send`]: off wasm it is implemented for exactly the types that are `Send`.
///
/// The blanket impl is `?Sized` so that unsized `Send` types (`str`, slices, trait objects)
/// satisfy the marker as well, mirroring `Send` itself.
#[cfg(not(target_family = "wasm"))]
pub trait MaybeSend: Send {}
#[cfg(not(target_family = "wasm"))]
impl<T: ?Sized + Send> MaybeSend for T {}

/// Stand-in for [`Sync`]: off wasm it is implemented for exactly the types that are `Sync`.
///
/// The blanket impl is `?Sized` so that unsized `Sync` types (`str`, slices, trait objects)
/// satisfy the marker as well, mirroring `Sync` itself.
#[cfg(not(target_family = "wasm"))]
pub trait MaybeSync: Sync {}
#[cfg(not(target_family = "wasm"))]
impl<T: ?Sized + Sync> MaybeSync for T {}
112
/// Error returned by [`timeout`] when the deadline passes before the future completes.
///
/// Implements [`std::error::Error`] so it can be boxed as `dyn Error` and propagated with
/// `?` through generic error types.
#[derive(Debug)]
pub struct TimeoutError {}

impl std::fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("operation timed out")
    }
}

// No underlying cause to report, so the default `source()` (None) is correct.
impl std::error::Error for TimeoutError {}
115
116pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, TimeoutError> {
117    tokio::select! {
118        _ = Delay::new(duration) => Err(TimeoutError{}),
119        result = future => Ok(result),
120    }
121}
122
123pub async fn sleep(duration: Duration) {
124    let _ = timeout(duration, pending::<()>()).await;
125}
126
#[cfg(test)]
mod test {
    use super::timeout;
    use std::time::Duration;

    /// A future that keeps yielding without ever finishing must be cut off by the deadline.
    #[tokio::test]
    async fn test_timeout() {
        let spin_forever = async move {
            loop {
                tokio::task::yield_now().await;
            }
        };
        let outcome = timeout(Duration::from_millis(10), spin_forever).await;
        assert!(outcome.is_err())
    }
}