enfync/
result_receiver.rs1use crate::*;
3
4use futures::Future;
8use futures::future::{FusedFuture, MaybeDone};
9use std::fmt::Debug;
10use std::pin::Pin;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13
14#[async_trait::async_trait]
18pub trait ResultReceiver: Debug
19{
20 type Result: Send + Sync + 'static;
21
22 fn done(&self) -> bool;
24
25 fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>;
30
31 async fn get(self: Box<Self>) -> Result<Self::Result, ResultError>;
35}
36
37#[derive(Debug)]
41pub struct OneshotResultReceiver<R: Debug>
42{
43 done_flag : Arc<AtomicBool>,
44 result_receiver : futures::channel::oneshot::Receiver<R>,
45 result_taken : Option<ResultError>,
46}
47
48impl<R> OneshotResultReceiver<R>
49where
50 R: Debug + Send + Sync + 'static
51{
52 pub fn new<S, F>(spawner: &S, task: F) -> Self
53 where
54 S: OneshotSpawner,
55 F: std::future::Future<Output = R> + Send + 'static,
56 {
57 let done_flag = Arc::new(AtomicBool::new(false));
58 let done_flag_clone = done_flag.clone();
59 let (result_sender, result_receiver) = futures::channel::oneshot::channel();
60 let work_task = async move {
61 let result = task.await;
62 let _ = result_sender.send(result);
63
64 done_flag_clone.store(true, Ordering::Release);
68 };
69 spawner.spawn(work_task);
70
71 Self{ done_flag, result_receiver, result_taken: None }
72 }
73}
74
75#[async_trait::async_trait]
76impl<R> ResultReceiver for OneshotResultReceiver<R>
77where
78 R: Debug + Send + Sync + 'static,
79{
80 type Result = R;
81
82 fn done(&self) -> bool
83 {
84 self.done_flag.load(Ordering::Acquire)
85 }
86
87 fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>
88 {
89 match self.result_receiver.try_recv()
90 {
91 Ok(Some(res)) => { self.result_taken = Some(ResultError::Taken); Some(Ok(res)) },
92 Err(_) => { self.result_taken = Some(ResultError::TaskFailure); self.result_taken.map(|e| Err(e)) },
93 Ok(None) => self.result_taken.map(|e| Err(e)),
94 }
95 }
96
97 async fn get(self: Box<Self>) -> Result<Self::Result, ResultError>
98 {
99 if let Some(error) = self.result_taken { return Err(error); }
100 match self.result_receiver.await
101 {
102 Ok(res) => Ok(res),
103 _ => Err(ResultError::TaskFailure),
104 }
105 }
106}
107
108#[derive(Debug)]
112pub struct SimpleResultReceiver<S: SimpleSpawner<R>, R: Debug>
113{
114 future_result : MaybeDone<<S as SimpleSpawner<R>>::Future>,
115 result_taken : Option<ResultError>,
116}
117
118impl<S, R> SimpleResultReceiver<S, R>
119where
120 S: SimpleSpawner<R>,
121 R: Debug + Send + Sync + 'static,
122{
123 pub fn new<F>(spawner: &S, task: F) -> Self
124 where
125 F: std::future::Future<Output = R> + Send + 'static,
126 {
127 let future_result = futures::future::maybe_done(spawner.spawn(task));
128
129 Self{ future_result, result_taken: None }
130 }
131}
132
133#[async_trait::async_trait]
134impl<S, R> ResultReceiver for SimpleResultReceiver<S, R>
135where
136 S: SimpleSpawner<R>,
137 <S as spawners::SimpleSpawner<R>>::Future: Unpin,
138 R: Debug + Send + Sync + 'static,
139{
140 type Result = R;
141
142 fn done(&self) -> bool
143 {
144 match &self.future_result
145 {
146 MaybeDone::Future(fut) => S::is_done(fut),
147 MaybeDone::Done(_) => true,
148 MaybeDone::Gone => true,
149 }
150 }
151
152 fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>
153 {
154 let mut pinned_fut = Pin::new(&mut self.future_result);
156
157 if !pinned_fut.is_terminated()
158 {
159 let noop_waker = futures::task::noop_waker();
160 let mut ctx = futures::task::Context::from_waker(&noop_waker);
161 let _ = pinned_fut.as_mut().poll(&mut ctx);
162 }
163
164 match pinned_fut.take_output()
166 {
167 Some(Ok(res)) => { self.result_taken = Some(ResultError::Taken); Some(Ok(res)) }
168 Some(Err(_)) => { self.result_taken = Some(ResultError::TaskFailure); self.result_taken.map(|e| Err(e)) },
169 None => self.result_taken.map(|e| Err(e)),
170 }
171 }
172
173 async fn get(mut self: Box<Self>) -> Result<Self::Result, ResultError>
174 {
175 if let Some(error) = self.result_taken { return Err(error); }
176 let res = match self.future_result
177 {
178 MaybeDone::Future(fut) => fut.await,
179 MaybeDone::Done(res) => res,
180 MaybeDone::Gone => return Err(ResultError::Taken),
181 };
182
183 res.map_err(|_| ResultError::TaskFailure)
184 }
185}
186
/// Result receiver for a value that is available from the moment of construction.
#[derive(Debug)]
pub struct ImmedateResultReceiver<R>
where
    R: Debug,
{
    /// The stored value; `None` once it has been taken.
    result: Option<R>,
}
195
196impl<R: Debug> ImmedateResultReceiver<R>
197{
198 pub fn new(result: R) -> Self
199 {
200 Self{ result: Some(result) }
201 }
202}
203
204#[async_trait::async_trait]
205impl<R: Debug + Send + Sync + 'static> ResultReceiver for ImmedateResultReceiver<R>
206{
207 type Result = R;
208
209 fn done(&self) -> bool
210 {
211 true
212 }
213
214 fn try_get(&mut self) -> Option<Result<Self::Result, ResultError>>
215 {
216 match self.result.take()
217 {
218 Some(res) => Some(Ok(res)),
219 None => Some(Err(ResultError::Taken))
220 }
221 }
222
223 async fn get(mut self: Box<Self>) -> Result<Self::Result, ResultError>
224 {
225 self.try_get().unwrap_or(Err(ResultError::Taken))
226 }
227}
228
229