restate_sdk/context/select_any.rs
1use crate::context::DurableFuture;
2use crate::errors::TerminalError;
3
4/// A collection of durable futures that yields results as they complete,
5/// similar to [`futures::stream::FuturesUnordered`].
6///
7/// Each future is assigned a stable index when pushed, which is returned
8/// alongside the result from [`next`](Self::next) so you can correlate
9/// outputs with inputs.
10///
11/// # Example
12///
13/// ```rust,no_run
14/// # use restate_sdk::prelude::*;
15/// # use std::time::Duration;
16/// #
17/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
18/// let labels = vec!["fast", "medium", "slow"];
19/// let durations = vec![1, 2, 3];
20///
21/// let mut futures = DurableFuturesUnordered::new();
22/// for secs in &durations {
23/// futures.push(ctx.sleep(Duration::from_secs(*secs)));
24/// }
25///
26/// while let Some((index, result)) = futures.next().await? {
27/// result?;
28/// println!("{} timer done!", labels[index]);
29/// }
30/// # Ok(())
31/// # }
32/// ```
33///
34/// You can also collect into it from an iterator:
35///
36/// ```rust,no_run
37/// # use restate_sdk::prelude::*;
38/// # use std::time::Duration;
39/// #
40/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
41/// let durations = vec![1, 2, 3];
42/// let mut futures: DurableFuturesUnordered<_> = durations
43/// .iter()
44/// .map(|secs| ctx.sleep(Duration::from_secs(*secs)))
45/// .collect();
46///
47/// while let Some((index, result)) = futures.next().await? {
48/// result?;
49/// }
50/// # Ok(())
51/// # }
52/// ```
53pub struct DurableFuturesUnordered<F> {
54 futures: Vec<(usize, F)>,
55 next_index: usize,
56}
57
58impl<F> DurableFuturesUnordered<F> {
59 /// Create an empty collection.
60 pub fn new() -> Self {
61 Self {
62 futures: Vec::new(),
63 next_index: 0,
64 }
65 }
66
67 /// Add a durable future to the collection.
68 /// Returns the stable index assigned to this future.
69 pub fn push(&mut self, future: F) -> usize {
70 let index = self.next_index;
71 self.next_index += 1;
72 self.futures.push((index, future));
73 index
74 }
75
76 /// Returns `true` if there are no remaining futures.
77 pub fn is_empty(&self) -> bool {
78 self.futures.is_empty()
79 }
80
81 /// Returns the number of remaining futures.
82 pub fn len(&self) -> usize {
83 self.futures.len()
84 }
85}
86
87impl<F> Default for DurableFuturesUnordered<F> {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl<F> FromIterator<F> for DurableFuturesUnordered<F> {
94 fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
95 let futures: Vec<_> = iter.into_iter().enumerate().collect();
96 let next_index = futures.len();
97 Self {
98 futures,
99 next_index,
100 }
101 }
102}
103
104impl<F: DurableFuture> DurableFuturesUnordered<F> {
105 /// Await the next completed future.
106 ///
107 /// Returns `Ok(None)` if there are no remaining futures.
108 /// Returns `Err(TerminalError)` if the invocation was cancelled.
109 /// Returns `Ok(Some((index, output)))` with the stable index and output
110 /// of the next completed future.
111 pub async fn next(&mut self) -> Result<Option<(usize, F::Output)>, TerminalError> {
112 if self.futures.is_empty() {
113 return Ok(None);
114 }
115
116 let handles: Vec<_> = self.futures.iter().map(|(_, f)| f.handle()).collect();
117 let ctx = self.futures[0].1.inner_context();
118 let pos = ctx.select(handles).await?;
119 let (index, future) = self.futures.swap_remove(pos);
120 let output = future.await;
121 Ok(Some((index, output)))
122 }
123}