enfync/
result_receiver.rs

1//local shortcuts
2use crate::*;
3
4//third-party shortcuts
5
6//standard shortcuts
7use futures::Future;
8use futures::future::{FusedFuture, MaybeDone};
9use std::fmt::Debug;
10use std::pin::Pin;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14//-------------------------------------------------------------------------------------------------------------------
15
16/// Represents a result receiver for async tasks. See [`PendingResult::new()`].
17#[async_trait::async_trait]
18pub trait ResultReceiver: Debug
19{
20    type Result: Send + Sync + 'static;
21
22    /// Check if the result is ready.
23    fn done(&self) -> bool;
24
25    /// Try to get the result.
26    /// - Returns `None` if the result is not available.
27    /// - Returns `Some(Err)` if the result could not be extracted (e.g. due to a task error OR due to the result already
28    ///   having been extracted.
29    fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>;
30
31    /// Consume self to get the result.
32    /// - Returns `Err` if the result could not be extracted (e.g. due to a task error OR due to the result already
33    ///   having been extracted.
34    async fn get(self: Box<Self>) -> Result<Self::Result, ResultError>;
35}
36
37//-------------------------------------------------------------------------------------------------------------------
38
39/// Uses a oneshot to receive the result.
40#[derive(Debug)]
41pub struct OneshotResultReceiver<R: Debug>
42{
43    done_flag       : Arc<AtomicBool>,
44    result_receiver : futures::channel::oneshot::Receiver<R>,
45    result_taken    : Option<ResultError>,
46}
47
48impl<R> OneshotResultReceiver<R>
49where
50    R: Debug + Send + Sync + 'static
51{
52    pub fn new<S, F>(spawner: &S, task: F) -> Self
53    where
54        S: OneshotSpawner,
55        F: std::future::Future<Output = R> + Send + 'static,
56    {
57        let done_flag = Arc::new(AtomicBool::new(false));
58        let done_flag_clone = done_flag.clone();
59        let (result_sender, result_receiver) = futures::channel::oneshot::channel();
60        let work_task = async move {
61                let result = task.await;
62                let _ = result_sender.send(result);
63
64                // ORDERING
65                // WASM compiles `AtomicBool` as `bool`, however since WASM is fully single-threaded, the ordering
66                // guarantee here is preserved.
67                done_flag_clone.store(true, Ordering::Release);
68            };
69        spawner.spawn(work_task);
70
71        Self{ done_flag, result_receiver, result_taken: None }
72    }
73}
74
75#[async_trait::async_trait]
76impl<R> ResultReceiver for OneshotResultReceiver<R>
77where
78    R: Debug + Send + Sync + 'static,
79{
80    type Result = R;
81
82    fn done(&self) -> bool
83    {
84        self.done_flag.load(Ordering::Acquire)
85    }
86
87    fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>
88    {
89        match self.result_receiver.try_recv()
90        {
91            Ok(Some(res)) => { self.result_taken = Some(ResultError::Taken); Some(Ok(res)) },
92            Err(_)        => { self.result_taken = Some(ResultError::TaskFailure); self.result_taken.map(|e| Err(e)) },
93            Ok(None)      => self.result_taken.map(|e| Err(e)),
94        }
95    }
96
97    async fn get(self: Box<Self>) -> Result<Self::Result, ResultError>
98    {
99        if let Some(error) = self.result_taken { return Err(error); }
100        match self.result_receiver.await
101        {
102            Ok(res) => Ok(res),
103            _       => Err(ResultError::TaskFailure),
104        }
105    }
106}
107
108//-------------------------------------------------------------------------------------------------------------------
109
110/// Uses a future to receive the result.
111#[derive(Debug)]
112pub struct SimpleResultReceiver<S: SimpleSpawner<R>, R: Debug>
113{
114    future_result : MaybeDone<<S as SimpleSpawner<R>>::Future>,
115    result_taken  : Option<ResultError>,
116}
117
118impl<S, R> SimpleResultReceiver<S, R>
119where
120    S: SimpleSpawner<R>,
121    R: Debug + Send + Sync + 'static,
122{
123    pub fn new<F>(spawner: &S, task: F) -> Self
124    where
125        F: std::future::Future<Output = R> + Send + 'static,
126    {
127        let future_result = futures::future::maybe_done(spawner.spawn(task));
128
129        Self{ future_result, result_taken: None }
130    }
131}
132
133#[async_trait::async_trait]
134impl<S, R> ResultReceiver for SimpleResultReceiver<S, R>
135where
136    S: SimpleSpawner<R>,
137    <S as spawners::SimpleSpawner<R>>::Future: Unpin,
138    R: Debug + Send + Sync + 'static,
139{
140    type Result = R;
141
142    fn done(&self) -> bool
143    {
144        match &self.future_result
145        {
146            MaybeDone::Future(fut) => S::is_done(fut),
147            MaybeDone::Done(_)     => true,
148            MaybeDone::Gone        => true,
149        }
150    }
151
152    fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>
153    {
154        // poll the future once
155        let mut pinned_fut = Pin::new(&mut self.future_result);
156
157        if !pinned_fut.is_terminated()
158        {
159            let noop_waker = futures::task::noop_waker();
160            let mut ctx = futures::task::Context::from_waker(&noop_waker);
161            let _ = pinned_fut.as_mut().poll(&mut ctx);
162        }
163
164        // check output
165        match pinned_fut.take_output()
166        {
167            Some(Ok(res)) => { self.result_taken = Some(ResultError::Taken); Some(Ok(res)) }
168            Some(Err(_))  => { self.result_taken = Some(ResultError::TaskFailure); self.result_taken.map(|e| Err(e)) },
169            None          => self.result_taken.map(|e| Err(e)),
170        }
171    }
172
173    async fn get(mut self: Box<Self>) -> Result<Self::Result, ResultError>
174    {
175        if let Some(error) = self.result_taken { return Err(error); }
176        let res = match self.future_result
177        {
178            MaybeDone::Future(fut) => fut.await,
179            MaybeDone::Done(res)   => res,
180            MaybeDone::Gone        => return Err(ResultError::Taken),
181        };
182
183        res.map_err(|_| ResultError::TaskFailure)
184    }
185}
186
187//-------------------------------------------------------------------------------------------------------------------
188
/// Result receiver whose result is available from the moment it is constructed.
#[derive(Debug)]
pub struct ImmedateResultReceiver<R>
where
    R: Debug,
{
    /// The stored result.
    result: Option<R>,
}

impl<R> ImmedateResultReceiver<R>
where
    R: Debug,
{
    /// Wraps an already-computed `result`.
    pub fn new(result: R) -> Self
    {
        let result = Some(result);
        Self{ result }
    }
}
203
204#[async_trait::async_trait]
205impl<R: Debug + Send + Sync + 'static> ResultReceiver for ImmedateResultReceiver<R>
206{
207    type Result = R;
208
209    fn done(&self) -> bool
210    {
211        true
212    }
213
214    fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>
215    {
216        match self.result.take()
217        {
218            Some(res) => Some(Ok(res)),
219            None      => Some(Err(ResultError::Taken))
220        }
221    }
222
223    async fn get(mut self: Box<Self>) -> Result<Self::Result, ResultError>
224    {
225        self.try_get().unwrap_or(Err(ResultError::Taken))
226    }
227}
228
229//-------------------------------------------------------------------------------------------------------------------