futures_scopes/local.rs
1use std::cell::RefCell;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::mem;
5use std::ops::DerefMut;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll, Waker};
9
10use futures::future::RemoteHandle;
11use futures::stream::FuturesUnordered;
12use futures::task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, noop_waker};
13use futures::{FutureExt, StreamExt};
14use pin_project::pin_project;
15
16use crate::ScopedSpawn;
17
/// Hand-over queue between the spawners and the scope.
///
/// Spawners push futures onto the pad; the scope later moves them into its
/// own set of running futures.
#[derive(Debug)]
struct IncomingPad<'sc, T> {
    /// Futures that have been spawned but not yet picked up by the scope.
    futures: VecDeque<LocalFutureObj<'sc, T>>,
    /// Waker to notify when the next future is pushed, if one is registered.
    waker: Option<Waker>,
}
23
24impl<'sc, T> IncomingPad<'sc, T> {
25 fn new() -> Self {
26 Self {
27 futures: VecDeque::new(),
28 waker: None,
29 }
30 }
31
32 fn push(&mut self, fut: LocalFutureObj<'sc, T>) {
33 self.futures.push_back(fut);
34 if let Some(waker) = self.waker.take() {
35 waker.wake();
36 }
37 }
38}
39
/// Shared, single-threaded handle to the incoming pad.
///
/// The inner `Option` is set to `None` once the scope has been canceled or
/// dropped, which is how spawners detect the shutdown.
type IncomingPadRef<'sc, T> = Rc<RefCell<Option<IncomingPad<'sc, T>>>>;
41
/// A spawn scope that can be used to spawn non-send/sync futures of lifetime `'sc`.
///
/// Spawned futures are not processed until the scope is polled by awaiting one of the
/// futures returned by [`until`](Self::until), [`until_stalled`](Self::until_stalled) or [`until_empty`](Self::until_empty).
///
/// Spawned futures can return a result of type `T` that can be accessed via [`results`](Self::results) or [`take_results`](Self::take_results).
///
/// # Dropping
/// If the scope is dropped, all futures that were spawned on it will be dropped as well.
///
/// # Examples
///
/// ```
/// # use futures::future::FutureExt;
/// # use futures::task::LocalSpawnExt;
/// # use futures::executor::block_on;
/// use futures_scopes::local::LocalScope;
///
/// let some_value = &42;
///
/// let mut scope = LocalScope::<usize>::new();
/// let spawner = scope.spawner();
/// spawner.spawn_local_scoped(async {
///   // You can reference `some_value` here
///   *some_value
/// }).unwrap();
///
/// // Process the scope and wait until all futures have been completed
/// block_on(scope.until_empty());
///
/// // use `scope.results()` to access the results of the spawned futures
/// assert_eq!(scope.results(), &[42]);
/// ```
///
#[derive(Debug)]
pub struct LocalScope<'sc, T = ()> {
    /// Futures that have been picked up by the scope and are being driven by it.
    futures: FuturesUnordered<LocalFutureObj<'sc, T>>,
    /// Pad shared with the spawners; newly spawned futures are queued here first.
    incoming: IncomingPadRef<'sc, T>,
    /// Outputs of the futures that have completed so far.
    result: Vec<T>,
}
82
83impl<'sc, T> LocalScope<'sc, T> {
84 /// Creates a new spawn scope.
85 ///
86 /// Spawned futures can reference anything before the creation of the scope.
87 pub fn new() -> Self {
88 Self {
89 futures: FuturesUnordered::new(),
90 incoming: Rc::new(RefCell::new(Some(IncomingPad::new()))),
91 result: Vec::new(),
92 }
93 }
94
95 /// Get a cloneable spawner that can be used to spawn local futures to this scope.
96 ///
97 /// This spawner can live longer then the scope.
98 /// In case a future is spawned after the scope has been dropped, the spawner will return [`SpawnError::shutdown`].
99 pub fn spawner(&self) -> LocalScopeSpawner<'sc, T> {
100 LocalScopeSpawner {
101 scope: self.incoming.clone(),
102 }
103 }
104
105 /// Drops all spawned futures and prevents new futures from being spawned.
106 ///
107 /// In case a future is spawned after `cancel` has been called, the spawner will return [`SpawnError::shutdown`].
108 ///
109 /// # Examples
110 ///
111 /// ```
112 /// # use futures::future::FutureExt;
113 /// # use futures::task::LocalSpawnExt;
114 /// # use futures::task::SpawnError;
115 /// # use futures::executor::block_on;
116 /// use futures_scopes::local::LocalScope;
117 ///
118 /// let mut scope = LocalScope::new();
119 /// let spawner = scope.spawner();
120 /// spawner.spawn_local_scoped(async {
121 /// // ...
122 /// }).unwrap();
123 ///
124 /// scope.cancel();
125 /// let res = spawner.spawn_local_scoped(async { () });
126 ///
127 /// assert!(matches!(res, Err(err) if err.is_shutdown()));
128 /// ```
129 pub fn cancel(&mut self) {
130 self.drain_incoming();
131 self.incoming.borrow_mut().take();
132 }
133
134 /// Returns a future that polls the scope until the given future `fut` is ready.
135 ///
136 /// The returned future will guarantee that at least some progress is made
137 /// on the futures spawned on this scope, by polling all these futures at least once.
138 ///
139 /// # Examples
140 /// ```
141 /// # use futures::future::{FutureExt, pending};
142 /// # use futures::task::LocalSpawnExt;
143 /// # use futures::executor::block_on;
144 /// # use std::cell::RefCell;
145 /// # use futures_scopes::local::LocalScope;
146 /// let counter = RefCell::new(0);
147 ///
148 /// let mut scope = LocalScope::new();
149 /// let spawner = scope.spawner();
150 /// for _ in 0..10 {
151 /// spawner.spawn_local_scoped(async {
152 /// *counter.borrow_mut() += 1;
153 /// }).unwrap();
154 /// }
155 ///
156 /// // Spawning a never-ready future will not hinder .until from returning
157 /// spawner.spawn_local_scoped(pending()).unwrap();
158 ///
159 /// block_on(scope.until(async {
160 /// // at least one future has been polled ready
161 /// assert!(*counter.borrow() == 10);
162 /// }));
163 /// ```
164 ///
165 pub fn until<Fut: Future>(&mut self, fut: Fut) -> Until<'_, 'sc, T, Fut> {
166 Until {
167 scope: self,
168 future: fut,
169 }
170 }
171
172 /// Returns a future that polls the scope until no further progress can be made,
173 /// because all spawned futures are pending or the scope is empty.
174 ///
175 /// Guarantees that all spawned futures are polled at least once.
176 ///
177 /// # Examples
178 /// ```
179 /// # use futures::future::{FutureExt, pending};
180 /// # use futures::task::LocalSpawnExt;
181 /// # use futures::executor::block_on;
182 /// # use std::cell::RefCell;
183 /// # use futures::channel::oneshot;
184 /// # use futures_scopes::local::LocalScope;
185 /// let counter = RefCell::new(0);
186 /// let (sx, rx) = oneshot::channel();
187 ///
188 /// let mut scope = LocalScope::new();
189 /// scope.spawner().spawn_local_scoped(async {
190 /// *counter.borrow_mut() += 1;
191 ///
192 /// // wait until the oneshot is ready
193 /// rx.await.unwrap();
194 ///
195 /// *counter.borrow_mut() += 1;
196 /// }).unwrap();
197 ///
198 /// assert!(*counter.borrow() == 0);
199 ///
200 /// block_on(scope.until_stalled());
201 ///
202 /// // scope is stalled because the future is waiting for rx
203 /// assert!(*counter.borrow() == 1);
204 /// sx.send(()).unwrap();
205 ///
206 /// block_on(scope.until_stalled());
207 ///
208 /// // scope is empty because the future has finished
209 /// assert!(*counter.borrow() == 2);
210 /// ```
211 ///
212 pub fn until_stalled(&mut self) -> UntilStalled<'_, 'sc, T> {
213 UntilStalled { scope: self }
214 }
215
216 /// Returns a future that polls the scope until all spawned futures have been polled ready.
217 ///
218 /// # Examples
219 /// ```
220 /// # use futures::future::{FutureExt, pending};
221 /// # use futures::task::LocalSpawnExt;
222 /// # use futures::executor::block_on;
223 /// # use std::cell::RefCell;
224 /// # use futures_scopes::local::LocalScope;
225 /// let counter = RefCell::new(0);
226 ///
227 /// let mut scope = LocalScope::new();
228 /// let spawner = scope.spawner();
229 /// for _ in 0..10 {
230 /// spawner.spawn_local_scoped(async {
231 /// *counter.borrow_mut() += 1;
232 /// }).unwrap();
233 /// }
234 ///
235 /// // Spawning a never-ready future will block .until_empty from ever returning
236 /// // spawner.spawn_local_scoped(pending()).unwrap();
237 ///
238 /// block_on(scope.until_empty());
239 ///
240 /// // all futures have been polled ready
241 /// assert!(*counter.borrow() == 10);
242 /// ```
243 ///
244 pub fn until_empty(&mut self) -> UntilEmpty<'_, 'sc, T> {
245 UntilEmpty { scope: self }
246 }
247
248 /// Returns a reference to the results of all futures that have been polled ready until now.
249 ///
250 /// Results are not ordered in any specific way.
251 ///
252 /// To take ownership of the results, use [`take_results`](Self::take_results).
253 ///
254 /// # Examples
255 ///
256 /// ```
257 /// # use futures::future::{FutureExt, pending};
258 /// # use futures::task::LocalSpawnExt;
259 /// # use futures::executor::block_on;
260 /// # use futures_scopes::local::LocalScope;
261 ///
262 /// let mut scope = LocalScope::new();
263 /// let spawner = scope.spawner();
264 /// for i in 0..5 {
265 /// spawner.spawn_local_scoped(async move {
266 /// i
267 /// }).unwrap();
268 /// }
269 ///
270 /// block_on(scope.until_empty());
271 ///
272 /// assert_eq!(scope.results(), &[0, 1, 2, 3, 4]);
273 pub fn results(&self) -> &Vec<T> {
274 &self.result
275 }
276
277 /// Returns the results of all futures that have been polled ready until now.
278 /// This removes the results from the scope.
279 /// This does not hinder future results from being added to the scope.
280 ///
281 /// Results are not ordered in any specific way.
282 ///
283 /// To **not** take ownership of the results, use [`results`](Self::results).
284 ///
285 /// # Examples
286 ///
287 /// ```
288 /// # use futures::future::{FutureExt, pending};
289 /// # use futures::task::LocalSpawnExt;
290 /// # use futures::executor::block_on;
291 /// # use futures_scopes::local::LocalScope;
292 ///
293 /// let mut scope = LocalScope::new();
294 /// let spawner = scope.spawner();
295 /// for i in 0..5 {
296 /// spawner.spawn_local_scoped(async move {
297 /// i
298 /// }).unwrap();
299 /// }
300 ///
301 /// block_on(scope.until_empty());
302 ///
303 /// assert_eq!(scope.take_results(), vec![0, 1, 2, 3, 4]);
304 pub fn take_results(mut self) -> Vec<T> {
305 mem::take(&mut self.result)
306 }
307
308 fn drain_incoming(&mut self) -> bool {
309 let mut incoming = self.incoming.borrow_mut();
310 if let Some(pad) = incoming.as_mut() {
311 pad.waker = None;
312 let has_incoming = !pad.futures.is_empty();
313 self.futures.extend(pad.futures.drain(..));
314 has_incoming
315 } else {
316 false
317 }
318 }
319
320 fn register_waker_on_incoming(&mut self, cx: &mut Context<'_>) {
321 if let Some(pad) = self.incoming.borrow_mut().as_mut() {
322 pad.waker = Some(cx.waker().clone());
323 }
324 }
325}
326
327impl<'sc, T> Drop for LocalScope<'sc, T> {
328 fn drop(&mut self) {
329 // Close the stream so that spawners can give a shutdown error
330 self.incoming.borrow_mut().take();
331 }
332}
333
334impl<'sc, T> Default for LocalScope<'sc, T> {
335 fn default() -> Self {
336 Self::new()
337 }
338}
339
/// Future returned by [`until`](LocalScope::until).
///
/// Resolves with the output of the wrapped future; the borrowed scope is
/// driven every time this future is polled.
#[pin_project]
#[derive(Debug)]
pub struct Until<'s, 'sc, T, Fut> {
    /// Scope whose spawned futures are driven on every poll.
    scope: &'s mut LocalScope<'sc, T>,
    /// Future whose completion ends the processing of the scope.
    #[pin]
    future: Fut,
}
348
349impl<'s, 'sc, T, Fut: Future> Future for Until<'s, 'sc, T, Fut> {
350 type Output = Fut::Output;
351
352 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
353 let this = self.as_mut().project();
354 let scope = this.scope.deref_mut();
355
356 loop {
357 scope.drain_incoming();
358 match scope.futures.poll_next_unpin(cx) {
359 Poll::Ready(Some(result)) => {
360 scope.result.push(result);
361 continue;
362 }
363 Poll::Ready(None) => break,
364 Poll::Pending => break,
365 };
366 }
367
368 match this.future.poll(cx) {
369 Poll::Ready(result) => Poll::Ready(result),
370 Poll::Pending => {
371 scope.register_waker_on_incoming(cx);
372 Poll::Pending
373 }
374 }
375 }
376}
377
/// Future returned by [`until_stalled`](LocalScope::until_stalled).
///
/// Resolves once none of the futures spawned on the scope can make progress.
#[derive(Debug)]
pub struct UntilStalled<'s, 'sc, T> {
    /// Scope whose spawned futures are driven when this future is polled.
    scope: &'s mut LocalScope<'sc, T>,
}
383
384impl<'s, 'sc, T> Future for UntilStalled<'s, 'sc, T> {
385 type Output = ();
386
387 fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
388 let scope = self.scope.deref_mut();
389 let waker = noop_waker();
390 let mut noop_ctx = Context::from_waker(&waker);
391 let mut polled_once = false;
392 loop {
393 if !scope.drain_incoming() && polled_once {
394 return Poll::Ready(());
395 }
396
397 match scope.futures.poll_next_unpin(&mut noop_ctx) {
398 Poll::Ready(Some(result)) => {
399 scope.result.push(result);
400 continue;
401 }
402 Poll::Ready(None) => (),
403 Poll::Pending => (),
404 };
405 polled_once = true;
406 }
407 }
408}
409
/// Future returned by [`until_empty`](LocalScope::until_empty).
///
/// Resolves once every future spawned on the scope has completed.
#[derive(Debug)]
pub struct UntilEmpty<'s, 'sc, T> {
    /// Scope whose spawned futures are driven when this future is polled.
    scope: &'s mut LocalScope<'sc, T>,
}
415
416impl<'s, 'sc, T> Future for UntilEmpty<'s, 'sc, T> {
417 type Output = ();
418
419 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
420 let scope = self.scope.deref_mut();
421 loop {
422 let result = match scope.futures.poll_next_unpin(cx) {
423 Poll::Ready(Some(result)) => {
424 scope.result.push(result);
425 continue;
426 }
427 Poll::Ready(None) => Poll::Ready(()),
428 Poll::Pending => Poll::Pending,
429 };
430
431 if !scope.drain_incoming() {
432 if result.is_pending() {
433 scope.register_waker_on_incoming(cx);
434 }
435 return result;
436 }
437 }
438 }
439}
440
/// A spawner that can be obtained from [`LocalScope::spawner`].
///
/// This spawner may live longer than the scope.
/// In case a future is spawned after the scope has been canceled or dropped,
/// the spawner will return [`SpawnError::shutdown`].
#[derive(Debug)]
pub struct LocalScopeSpawner<'sc, T> {
    /// Incoming pad shared with the scope; holds `None` once the scope is shut down.
    scope: IncomingPadRef<'sc, T>,
}
450
451impl<'sc, T> Clone for LocalScopeSpawner<'sc, T> {
452 fn clone(&self) -> Self {
453 Self {
454 scope: self.scope.clone(),
455 }
456 }
457}
458
459impl<'sc, T> LocalScopeSpawner<'sc, T> {
460 /// Spawns a task that polls the given local future.
461 ///
462 /// # Errors
463 ///
464 /// This method returns a [`Result`] that contains a [`SpawnError`] if spawning fails.
465 pub fn spawn_scoped_local_obj(&self, future: LocalFutureObj<'sc, T>) -> Result<(), SpawnError> {
466 let mut incoming = self.scope.borrow_mut();
467 if let Some(incoming) = incoming.as_mut() {
468 incoming.push(future);
469 Ok(())
470 } else {
471 Err(SpawnError::shutdown())
472 }
473 }
474
475 /// Spawns a task that polls the given local future.
476 ///
477 /// # Errors
478 ///
479 /// This method returns a [`Result`] that contains a [`SpawnError`] if spawning fails.
480 pub fn spawn_local_scoped<Fut>(&self, future: Fut) -> Result<(), SpawnError>
481 where
482 Fut: Future<Output = T> + 'sc,
483 {
484 self.spawn_scoped_local_obj(LocalFutureObj::new(Box::new(future)))
485 }
486}
487
488impl<'sc> LocalScopeSpawner<'sc, ()> {
489 /// Spawns a task that polls the given local future and returns a handle to its output.
490 ///
491 /// # Errors
492 ///
493 /// This method returns a [`Result`] that contains a [`SpawnError`] if spawning fails.
494 pub fn spawn_local_scoped_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
495 where
496 Fut: Future + 'sc,
497 {
498 let (remote, handle) = future.remote_handle();
499 let _: () = self.spawn_local_scoped(remote)?;
500 Ok(handle)
501 }
502}
503
504impl<'sc, T> ScopedSpawn<'sc, T> for LocalScopeSpawner<'sc, T> {
505 fn spawn_obj_scoped(&self, future: FutureObj<'sc, T>) -> Result<(), SpawnError> {
506 self.spawn_scoped_local_obj(future.into())
507 }
508
509 fn status_scoped(&self) -> Result<(), SpawnError> {
510 if self.scope.borrow().is_some() {
511 Ok(())
512 } else {
513 Err(SpawnError::shutdown())
514 }
515 }
516}
517
518impl<'sc> LocalSpawn for LocalScopeSpawner<'sc, ()> {
519 fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
520 self.spawn_scoped_local_obj(future)
521 }
522
523 fn status_local(&self) -> Result<(), SpawnError> {
524 self.status_scoped()
525 }
526}
527
528impl<'sc> Spawn for LocalScopeSpawner<'sc, ()> {
529 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
530 self.spawn_obj_scoped(future)
531 }
532
533 fn status(&self) -> Result<(), SpawnError> {
534 self.status_scoped()
535 }
536}