deferred_future/
thread_deferred_future.rs

1use ::futures::future::FusedFuture;
2use ::std::{cell::Cell, future::Future, pin::Pin, sync::{Arc, Mutex, PoisonError}, task::{Context, Poll, Waker}};
/// State shared between a [`ThreadDeferredFuture`] and the code that completes it.
///
/// The future hands this out wrapped in `Arc<Mutex<..>>` (see
/// `ThreadDeferredFuture::defer`); the completing side stores the value with
/// `complete`, and the future takes it out again when polled.
pub struct ThreadSharedState<T>
where
    T: Send + Sync,
{
    /// The completion value; `None` until `complete` stores one, and `None`
    /// again once the future has taken it.
    data: Option<T>,
    /// Waker of the task that last polled the future, consumed by `complete`.
    waker: Option<Waker>,
}

/// Hand-written instead of derived so that `T` itself does not have to
/// implement `Default`: both fields simply start out as `None`.
impl<T> Default for ThreadSharedState<T>
where
    T: Send + Sync,
{
    fn default() -> Self {
        Self {
            data: None,
            waker: None,
        }
    }
}
9impl<T> ThreadSharedState<T>
10where T: Send + Sync {
11    #[allow(unused)]
12    pub fn complete(&mut self, data: T) {
13        self.data.replace(data);
14        self.waker.take().map(|waker| {waker.wake()});
15    }
16}
17pub struct ThreadDeferredFuture<T>
18where T: Send + Sync {
19    is_terminated: Cell<bool>,
20    shared_state: Arc<Mutex<ThreadSharedState<T>>>
21}
22impl<T> ThreadDeferredFuture<T>
23where T: Send + Sync {
24    pub fn defer(&self) -> Arc<Mutex<ThreadSharedState<T>>> {
25        Arc::clone(&self.shared_state)
26    }
27}
28impl<T> Default for ThreadDeferredFuture<T>
29where T: Send + Sync {
30    fn default() -> Self {
31        Self {
32            is_terminated: Cell::new(false),
33            shared_state: Arc::new(Mutex::new(ThreadSharedState {
34                data: None,
35                waker: None
36            }))
37        }
38    }
39}
40impl<T> Future for ThreadDeferredFuture<T>
41where T: Send + Sync {
42    type Output = T;
43    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
44        let current_waker = context.waker();
45        let mut shared_state = self.shared_state.lock().unwrap_or_else(PoisonError::into_inner);
46        if shared_state.waker.as_ref().map_or(true, |w| !w.will_wake(current_waker)) {
47            shared_state.waker.replace(current_waker.clone());
48        }
49        if shared_state.data.is_none() {
50            self.is_terminated.set(false);
51            Poll::Pending
52        } else {
53            self.is_terminated.set(true);
54            Poll::Ready(shared_state.data.take().unwrap())
55        }
56    }
57}
58impl<T> FusedFuture for ThreadDeferredFuture<T>
59where T: Send + Sync {
60    fn is_terminated(&self) -> bool {
61        self.is_terminated.get()
62    }
63}