// dapr_durabletask/task/when_any.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use super::completable_task::CompletableTask;
6
7/// A future that completes when ANY task completes (success or failure).
8/// Returns the index of the first completed task.
9pub struct WhenAnyTask {
10    tasks: Vec<CompletableTask>,
11}
12
13impl WhenAnyTask {
14    pub fn new(tasks: Vec<CompletableTask>) -> Self {
15        Self { tasks }
16    }
17}
18
19impl Future for WhenAnyTask {
20    type Output = crate::api::Result<usize>;
21
22    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
23        let this = self.get_mut();
24
25        // Single pass: poll registers wakers and detects the first ready task.
26        for (i, task) in this.tasks.iter_mut().enumerate() {
27            match Pin::new(task).poll(cx) {
28                Poll::Ready(_) => return Poll::Ready(Ok(i)),
29                Poll::Pending => {}
30            }
31        }
32
33        Poll::Pending
34    }
35}
36
37/// Wait for any task to complete. Returns the index of the first completed task.
38pub fn when_any(tasks: Vec<CompletableTask>) -> WhenAnyTask {
39    WhenAnyTask::new(tasks)
40}
41
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::FailureDetails;
    use std::task::Waker;

    /// Polls `fut` exactly once with a no-op waker.
    fn poll_once(fut: &mut WhenAnyTask) -> Poll<crate::api::Result<usize>> {
        // `Waker::noop()` already hands out a `&'static Waker`, so no clone is needed.
        let mut cx = Context::from_waker(Waker::noop());
        Pin::new(fut).poll(&mut cx)
    }

    /// Asserts that a single poll of `fut` yields `Ready(Ok(expected))`.
    #[track_caller]
    fn assert_ready_index(fut: &mut WhenAnyTask, expected: usize) {
        match poll_once(fut) {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, expected),
            other => panic!("expected Ready(Ok({expected})), got {other:?}"),
        }
    }

    #[test]
    fn test_when_any_first_complete() {
        let t1 = CompletableTask::new();
        let t2 = CompletableTask::new();
        t1.complete(Some("first".to_string()));

        let mut fut = when_any(vec![t1, t2]);
        assert_ready_index(&mut fut, 0);
    }

    #[test]
    fn test_when_any_second_complete() {
        let t1 = CompletableTask::new();
        let t2 = CompletableTask::new();
        t2.complete(Some("second".to_string()));

        let mut fut = when_any(vec![t1, t2]);
        assert_ready_index(&mut fut, 1);
    }

    #[test]
    fn test_when_any_pending_then_ready() {
        let t1 = CompletableTask::new();
        let t2 = CompletableTask::new();

        // Keep a handle to `t1` so it can be completed after the first poll.
        let mut fut = when_any(vec![t1.clone(), t2]);
        assert!(poll_once(&mut fut).is_pending());

        t1.complete(None);
        assert_ready_index(&mut fut, 0);
    }

    #[test]
    fn test_when_any_failed_task_counts() {
        let t1 = CompletableTask::new();
        let t2 = CompletableTask::new();
        t2.fail(FailureDetails {
            message: "boom".to_string(),
            error_type: "Error".to_string(),
            stack_trace: None,
        });

        // A failed task still counts as "completed" for when_any.
        let mut fut = when_any(vec![t1, t2]);
        assert_ready_index(&mut fut, 1);
    }
}