futures_scopes/
local.rs

1use std::cell::RefCell;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::mem;
5use std::ops::DerefMut;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll, Waker};
9
10use futures::future::RemoteHandle;
11use futures::stream::FuturesUnordered;
12use futures::task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, noop_waker};
13use futures::{FutureExt, StreamExt};
14use pin_project::pin_project;
15
16use crate::ScopedSpawn;
17
18#[derive(Debug)]
19struct IncomingPad<'sc, T> {
20    futures: VecDeque<LocalFutureObj<'sc, T>>,
21    waker: Option<Waker>,
22}
23
24impl<'sc, T> IncomingPad<'sc, T> {
25    fn new() -> Self {
26        Self {
27            futures: VecDeque::new(),
28            waker: None,
29        }
30    }
31
32    fn push(&mut self, fut: LocalFutureObj<'sc, T>) {
33        self.futures.push_back(fut);
34        if let Some(waker) = self.waker.take() {
35            waker.wake();
36        }
37    }
38}
39
40type IncomingPadRef<'sc, T> = Rc<RefCell<Option<IncomingPad<'sc, T>>>>;
41
/// A spawn scope that can be used to spawn non-send/sync futures of lifetime `'sc`.
///
/// Spawned futures are not processed until the scope is polled by awaiting one of the
/// futures returned by [`until`](Self::until), [`until_stalled`](Self::until_stalled) or [`until_empty`](Self::until_empty).
///
/// Spawned futures can return a result of type `T` that can be accessed via [`results`](Self::results) or [`take_results`](Self::take_results).
///
/// # Dropping
/// If the scope is dropped, all futures that were spawned on it will be dropped as well.
///
/// # Examples
///
/// ```
/// # use futures::future::FutureExt;
/// # use futures::task::LocalSpawnExt;
/// # use futures::executor::block_on;
/// use futures_scopes::local::LocalScope;
///
/// let some_value = &42;
///
/// let mut scope = LocalScope::<usize>::new();
/// let spawner = scope.spawner();
/// spawner.spawn_local_scoped(async {
///   // You can reference `some_value` here
///   *some_value
/// }).unwrap();
///
/// // Process the scope and wait until all futures have been completed
/// block_on(scope.until_empty());
///
/// // use `scope.results()` to access the results of the spawned futures
/// assert_eq!(scope.results(), &[42]);
/// ```
///
#[derive(Debug)]
pub struct LocalScope<'sc, T = ()> {
    // Futures that have been adopted by the scope and are polled by it.
    futures: FuturesUnordered<LocalFutureObj<'sc, T>>,
    // Pad shared with the spawners; holds futures that were spawned but not yet adopted.
    incoming: IncomingPadRef<'sc, T>,
    // Outputs of the futures that have completed so far.
    result: Vec<T>,
}
82
83impl<'sc, T> LocalScope<'sc, T> {
84    /// Creates a new spawn scope.
85    ///
86    /// Spawned futures can reference anything before the creation of the scope.
87    pub fn new() -> Self {
88        Self {
89            futures: FuturesUnordered::new(),
90            incoming: Rc::new(RefCell::new(Some(IncomingPad::new()))),
91            result: Vec::new(),
92        }
93    }
94
95    /// Get a cloneable spawner that can be used to spawn local futures to this scope.
96    ///
97    /// This spawner can live longer then the scope.
98    /// In case a future is spawned after the scope has been dropped, the spawner will return [`SpawnError::shutdown`].
99    pub fn spawner(&self) -> LocalScopeSpawner<'sc, T> {
100        LocalScopeSpawner {
101            scope: self.incoming.clone(),
102        }
103    }
104
105    /// Drops all spawned futures and prevents new futures from being spawned.
106    ///
107    /// In case a future is spawned after `cancel` has been called, the spawner will return [`SpawnError::shutdown`].
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// # use futures::future::FutureExt;
113    /// # use futures::task::LocalSpawnExt;
114    /// # use futures::task::SpawnError;
115    /// # use futures::executor::block_on;
116    /// use futures_scopes::local::LocalScope;
117    ///
118    /// let mut scope = LocalScope::new();
119    /// let spawner = scope.spawner();
120    /// spawner.spawn_local_scoped(async {
121    ///  // ...
122    /// }).unwrap();
123    ///
124    /// scope.cancel();
125    /// let res = spawner.spawn_local_scoped(async { () });
126    ///
127    /// assert!(matches!(res, Err(err) if err.is_shutdown()));
128    /// ```
129    pub fn cancel(&mut self) {
130        self.drain_incoming();
131        self.incoming.borrow_mut().take();
132    }
133
134    /// Returns a future that polls the scope until the given future `fut` is ready.
135    ///
136    /// The returned future will guarantee that at least some progress is made
137    /// on the futures spawned on this scope, by polling all these futures at least once.
138    ///
139    /// # Examples
140    /// ```
141    /// # use futures::future::{FutureExt, pending};
142    /// # use futures::task::LocalSpawnExt;
143    /// # use futures::executor::block_on;
144    /// # use std::cell::RefCell;
145    /// # use futures_scopes::local::LocalScope;
146    /// let counter = RefCell::new(0);
147    ///
148    /// let mut scope = LocalScope::new();
149    /// let spawner = scope.spawner();
150    /// for _ in 0..10 {
151    ///   spawner.spawn_local_scoped(async {
152    ///     *counter.borrow_mut() += 1;
153    ///   }).unwrap();
154    /// }
155    ///
156    /// // Spawning a never-ready future will not hinder .until from returning
157    /// spawner.spawn_local_scoped(pending()).unwrap();
158    ///
159    /// block_on(scope.until(async {
160    ///   // at least one future has been polled ready
161    ///   assert!(*counter.borrow() == 10);
162    /// }));
163    /// ```
164    ///
165    pub fn until<Fut: Future>(&mut self, fut: Fut) -> Until<'_, 'sc, T, Fut> {
166        Until {
167            scope: self,
168            future: fut,
169        }
170    }
171
172    /// Returns a future that polls the scope until no further progress can be made,
173    /// because all spawned futures are pending or the scope is empty.
174    ///
175    /// Guarantees that all spawned futures are polled at least once.
176    ///
177    /// # Examples
178    /// ```
179    /// # use futures::future::{FutureExt, pending};
180    /// # use futures::task::LocalSpawnExt;
181    /// # use futures::executor::block_on;
182    /// # use std::cell::RefCell;
183    /// # use futures::channel::oneshot;
184    /// # use futures_scopes::local::LocalScope;
185    /// let counter = RefCell::new(0);
186    /// let (sx, rx) = oneshot::channel();
187    ///
188    /// let mut scope = LocalScope::new();
189    /// scope.spawner().spawn_local_scoped(async {
190    ///   *counter.borrow_mut() += 1;
191    ///
192    ///    // wait until the oneshot is ready
193    ///   rx.await.unwrap();
194    ///
195    ///   *counter.borrow_mut() += 1;
196    /// }).unwrap();
197    ///
198    /// assert!(*counter.borrow() == 0);
199    ///
200    /// block_on(scope.until_stalled());
201    ///
202    /// // scope is stalled because the future is waiting for rx
203    /// assert!(*counter.borrow() == 1);
204    /// sx.send(()).unwrap();
205    ///
206    /// block_on(scope.until_stalled());
207    ///
208    /// // scope is empty because the future has finished
209    /// assert!(*counter.borrow() == 2);
210    /// ```
211    ///
212    pub fn until_stalled(&mut self) -> UntilStalled<'_, 'sc, T> {
213        UntilStalled { scope: self }
214    }
215
216    /// Returns a future that polls the scope until all spawned futures have been polled ready.
217    ///
218    /// # Examples
219    /// ```
220    /// # use futures::future::{FutureExt, pending};
221    /// # use futures::task::LocalSpawnExt;
222    /// # use futures::executor::block_on;
223    /// # use std::cell::RefCell;
224    /// # use futures_scopes::local::LocalScope;
225    /// let counter = RefCell::new(0);
226    ///
227    /// let mut scope = LocalScope::new();
228    /// let spawner = scope.spawner();
229    /// for _ in 0..10 {
230    ///   spawner.spawn_local_scoped(async {
231    ///     *counter.borrow_mut() += 1;
232    ///   }).unwrap();
233    /// }
234    ///
235    /// // Spawning a never-ready future will block .until_empty from ever returning
236    /// // spawner.spawn_local_scoped(pending()).unwrap();
237    ///
238    /// block_on(scope.until_empty());
239    ///
240    /// // all futures have been polled ready
241    /// assert!(*counter.borrow() == 10);
242    /// ```
243    ///
244    pub fn until_empty(&mut self) -> UntilEmpty<'_, 'sc, T> {
245        UntilEmpty { scope: self }
246    }
247
248    /// Returns a reference to the results of all futures that have been polled ready until now.
249    ///
250    /// Results are not ordered in any specific way.
251    ///
252    /// To take ownership of the results, use [`take_results`](Self::take_results).
253    ///
254    /// # Examples
255    ///
256    /// ```
257    /// # use futures::future::{FutureExt, pending};
258    /// # use futures::task::LocalSpawnExt;
259    /// # use futures::executor::block_on;
260    /// # use futures_scopes::local::LocalScope;
261    ///
262    /// let mut scope = LocalScope::new();
263    /// let spawner = scope.spawner();
264    /// for i in 0..5 {
265    ///     spawner.spawn_local_scoped(async move {
266    ///        i
267    ///    }).unwrap();
268    /// }
269    ///
270    /// block_on(scope.until_empty());
271    ///
272    /// assert_eq!(scope.results(), &[0, 1, 2, 3, 4]);
273    pub fn results(&self) -> &Vec<T> {
274        &self.result
275    }
276
277    /// Returns the results of all futures that have been polled ready until now.
278    /// This removes the results from the scope.
279    /// This does not hinder future results from being added to the scope.
280    ///
281    /// Results are not ordered in any specific way.
282    ///
283    /// To **not** take ownership of the results, use [`results`](Self::results).
284    ///
285    /// # Examples
286    ///
287    /// ```
288    /// # use futures::future::{FutureExt, pending};
289    /// # use futures::task::LocalSpawnExt;
290    /// # use futures::executor::block_on;
291    /// # use futures_scopes::local::LocalScope;
292    ///
293    /// let mut scope = LocalScope::new();
294    /// let spawner = scope.spawner();
295    /// for i in 0..5 {
296    ///     spawner.spawn_local_scoped(async move {
297    ///        i
298    ///    }).unwrap();
299    /// }
300    ///
301    /// block_on(scope.until_empty());
302    ///
303    /// assert_eq!(scope.take_results(), vec![0, 1, 2, 3, 4]);
304    pub fn take_results(mut self) -> Vec<T> {
305        mem::take(&mut self.result)
306    }
307
308    fn drain_incoming(&mut self) -> bool {
309        let mut incoming = self.incoming.borrow_mut();
310        if let Some(pad) = incoming.as_mut() {
311            pad.waker = None;
312            let has_incoming = !pad.futures.is_empty();
313            self.futures.extend(pad.futures.drain(..));
314            has_incoming
315        } else {
316            false
317        }
318    }
319
320    fn register_waker_on_incoming(&mut self, cx: &mut Context<'_>) {
321        if let Some(pad) = self.incoming.borrow_mut().as_mut() {
322            pad.waker = Some(cx.waker().clone());
323        }
324    }
325}
326
327impl<'sc, T> Drop for LocalScope<'sc, T> {
328    fn drop(&mut self) {
329        // Close the stream so that spawners can give a shutdown error
330        self.incoming.borrow_mut().take();
331    }
332}
333
334impl<'sc, T> Default for LocalScope<'sc, T> {
335    fn default() -> Self {
336        Self::new()
337    }
338}
339
340/// Future returned by [`until`](LocalScope::until).
341#[pin_project]
342#[derive(Debug)]
343pub struct Until<'s, 'sc, T, Fut> {
344    scope: &'s mut LocalScope<'sc, T>,
345    #[pin]
346    future: Fut,
347}
348
349impl<'s, 'sc, T, Fut: Future> Future for Until<'s, 'sc, T, Fut> {
350    type Output = Fut::Output;
351
352    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
353        let this = self.as_mut().project();
354        let scope = this.scope.deref_mut();
355
356        loop {
357            scope.drain_incoming();
358            match scope.futures.poll_next_unpin(cx) {
359                Poll::Ready(Some(result)) => {
360                    scope.result.push(result);
361                    continue;
362                }
363                Poll::Ready(None) => break,
364                Poll::Pending => break,
365            };
366        }
367
368        match this.future.poll(cx) {
369            Poll::Ready(result) => Poll::Ready(result),
370            Poll::Pending => {
371                scope.register_waker_on_incoming(cx);
372                Poll::Pending
373            }
374        }
375    }
376}
377
378/// Future returned by [`until_stalled`](LocalScope::until_stalled).
379#[derive(Debug)]
380pub struct UntilStalled<'s, 'sc, T> {
381    scope: &'s mut LocalScope<'sc, T>,
382}
383
384impl<'s, 'sc, T> Future for UntilStalled<'s, 'sc, T> {
385    type Output = ();
386
387    fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
388        let scope = self.scope.deref_mut();
389        let waker = noop_waker();
390        let mut noop_ctx = Context::from_waker(&waker);
391        let mut polled_once = false;
392        loop {
393            if !scope.drain_incoming() && polled_once {
394                return Poll::Ready(());
395            }
396
397            match scope.futures.poll_next_unpin(&mut noop_ctx) {
398                Poll::Ready(Some(result)) => {
399                    scope.result.push(result);
400                    continue;
401                }
402                Poll::Ready(None) => (),
403                Poll::Pending => (),
404            };
405            polled_once = true;
406        }
407    }
408}
409
410/// Future returned by [`until_empty`](LocalScope::until_empty).
411#[derive(Debug)]
412pub struct UntilEmpty<'s, 'sc, T> {
413    scope: &'s mut LocalScope<'sc, T>,
414}
415
416impl<'s, 'sc, T> Future for UntilEmpty<'s, 'sc, T> {
417    type Output = ();
418
419    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
420        let scope = self.scope.deref_mut();
421        loop {
422            let result = match scope.futures.poll_next_unpin(cx) {
423                Poll::Ready(Some(result)) => {
424                    scope.result.push(result);
425                    continue;
426                }
427                Poll::Ready(None) => Poll::Ready(()),
428                Poll::Pending => Poll::Pending,
429            };
430
431            if !scope.drain_incoming() {
432                if result.is_pending() {
433                    scope.register_waker_on_incoming(cx);
434                }
435                return result;
436            }
437        }
438    }
439}
440
441/// A spawner that can be obtained from [`LocalScope::spawner`].
442///
443/// This spawner may live longer then the scope.
444/// In case a future is spawned after the scope has been canceled or dropped,
445/// the spawner will return [`SpawnError::shutdown`].
446#[derive(Debug)]
447pub struct LocalScopeSpawner<'sc, T> {
448    scope: IncomingPadRef<'sc, T>,
449}
450
451impl<'sc, T> Clone for LocalScopeSpawner<'sc, T> {
452    fn clone(&self) -> Self {
453        Self {
454            scope: self.scope.clone(),
455        }
456    }
457}
458
459impl<'sc, T> LocalScopeSpawner<'sc, T> {
460    /// Spawns a task that polls the given local future.
461    ///
462    /// # Errors
463    ///
464    /// This method returns a [`Result`] that contains a [`SpawnError`] if spawning fails.
465    pub fn spawn_scoped_local_obj(&self, future: LocalFutureObj<'sc, T>) -> Result<(), SpawnError> {
466        let mut incoming = self.scope.borrow_mut();
467        if let Some(incoming) = incoming.as_mut() {
468            incoming.push(future);
469            Ok(())
470        } else {
471            Err(SpawnError::shutdown())
472        }
473    }
474
475    /// Spawns a task that polls the given local future.
476    ///
477    /// # Errors
478    ///
479    /// This method returns a [`Result`] that contains a [`SpawnError`] if spawning fails.
480    pub fn spawn_local_scoped<Fut>(&self, future: Fut) -> Result<(), SpawnError>
481    where
482        Fut: Future<Output = T> + 'sc,
483    {
484        self.spawn_scoped_local_obj(LocalFutureObj::new(Box::new(future)))
485    }
486}
487
488impl<'sc> LocalScopeSpawner<'sc, ()> {
489    /// Spawns a task that polls the given local future and returns a handle to its output.
490    ///
491    /// # Errors
492    ///
493    /// This method returns a [`Result`] that contains a [`SpawnError`] if spawning fails.
494    pub fn spawn_local_scoped_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
495    where
496        Fut: Future + 'sc,
497    {
498        let (remote, handle) = future.remote_handle();
499        let _: () = self.spawn_local_scoped(remote)?;
500        Ok(handle)
501    }
502}
503
504impl<'sc, T> ScopedSpawn<'sc, T> for LocalScopeSpawner<'sc, T> {
505    fn spawn_obj_scoped(&self, future: FutureObj<'sc, T>) -> Result<(), SpawnError> {
506        self.spawn_scoped_local_obj(future.into())
507    }
508
509    fn status_scoped(&self) -> Result<(), SpawnError> {
510        if self.scope.borrow().is_some() {
511            Ok(())
512        } else {
513            Err(SpawnError::shutdown())
514        }
515    }
516}
517
518impl<'sc> LocalSpawn for LocalScopeSpawner<'sc, ()> {
519    fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
520        self.spawn_scoped_local_obj(future)
521    }
522
523    fn status_local(&self) -> Result<(), SpawnError> {
524        self.status_scoped()
525    }
526}
527
528impl<'sc> Spawn for LocalScopeSpawner<'sc, ()> {
529    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
530        self.spawn_obj_scoped(future)
531    }
532
533    fn status(&self) -> Result<(), SpawnError> {
534        self.status_scoped()
535    }
536}