Skip to main content

// dapr_durabletask/task/when_any.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use super::completable_task::CompletableTask;
6
7/// A future that completes when ANY task completes (success or failure).
8/// Returns the index of the first completed task.
9pub struct WhenAnyTask {
10    pub(crate) tasks: Vec<CompletableTask>,
11}
12
13impl Future for WhenAnyTask {
14    type Output = crate::api::Result<usize>;
15
16    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
17        let this = self.get_mut();
18
19        // Single pass: poll registers wakers and detects the first ready task.
20        for (i, task) in this.tasks.iter_mut().enumerate() {
21            match Pin::new(task).poll(cx) {
22                Poll::Ready(_) => return Poll::Ready(Ok(i)),
23                Poll::Pending => {}
24            }
25        }
26
27        Poll::Pending
28    }
29}
30
31/// Wait for any task to complete. Returns the index of the first completed task.
32pub fn when_any(tasks: Vec<CompletableTask>) -> WhenAnyTask {
33    WhenAnyTask { tasks }
34}
35
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::FailureDetails;
    use std::task::Waker;

    /// Polls `fut` exactly once using a waker whose wake-up does nothing.
    fn poll_once(fut: &mut WhenAnyTask) -> Poll<crate::api::Result<usize>> {
        let waker: Waker = Waker::noop().clone();
        let mut cx = Context::from_waker(&waker);
        Pin::new(fut).poll(&mut cx)
    }

    /// A task in slot 0 that is already complete is reported on the first poll.
    #[test]
    fn test_when_any_first_complete() {
        let finished = CompletableTask::new();
        let idle = CompletableTask::new();
        finished.complete(Some("first".to_string()));

        let mut race = when_any(vec![finished, idle]);
        match poll_once(&mut race) {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 0),
            other => panic!("expected Ready(Ok(0)), got {other:?}"),
        }
    }

    /// A pending task in slot 0 is skipped in favour of the completed one in slot 1.
    #[test]
    fn test_when_any_second_complete() {
        let idle = CompletableTask::new();
        let finished = CompletableTask::new();
        finished.complete(Some("second".to_string()));

        let mut race = when_any(vec![idle, finished]);
        match poll_once(&mut race) {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 1),
            other => panic!("expected Ready(Ok(1)), got {other:?}"),
        }
    }

    /// The future is pending while nothing is complete, then ready once a task completes.
    #[test]
    fn test_when_any_pending_then_ready() {
        let late = CompletableTask::new();
        let idle = CompletableTask::new();

        // Keep `late` around so it can be completed after the first poll.
        let mut race = when_any(vec![late.clone(), idle]);
        assert!(poll_once(&mut race).is_pending());

        late.complete(None);
        match poll_once(&mut race) {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 0),
            other => panic!("expected Ready(Ok(0)), got {other:?}"),
        }
    }

    /// A failed task counts as completed and its index is returned as `Ok`.
    #[test]
    fn test_when_any_failed_task_counts() {
        let idle = CompletableTask::new();
        let broken = CompletableTask::new();
        let failure = FailureDetails {
            message: "boom".to_string(),
            error_type: "Error".to_string(),
            stack_trace: None,
        };
        broken.fail(failure);

        let mut race = when_any(vec![idle, broken]);
        match poll_once(&mut race) {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 1),
            other => panic!("expected Ready(Ok(1)), got {other:?}"),
        }
    }
}