dapr_durabletask/task/
when_any.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use super::completable_task::CompletableTask;
6
7pub struct WhenAnyTask {
10 pub(crate) tasks: Vec<CompletableTask>,
11}
12
13impl Future for WhenAnyTask {
14 type Output = crate::api::Result<usize>;
15
16 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
17 let this = self.get_mut();
18
19 for (i, task) in this.tasks.iter_mut().enumerate() {
21 match Pin::new(task).poll(cx) {
22 Poll::Ready(_) => return Poll::Ready(Ok(i)),
23 Poll::Pending => {}
24 }
25 }
26
27 Poll::Pending
28 }
29}
30
31pub fn when_any(tasks: Vec<CompletableTask>) -> WhenAnyTask {
33 WhenAnyTask { tasks }
34}
35
36#[cfg(test)]
37mod tests {
38 use super::*;
39 use crate::api::FailureDetails;
40 use std::task::Waker;
41
42 fn noop_waker() -> Waker {
43 Waker::noop().clone()
44 }
45
46 #[test]
47 fn test_when_any_first_complete() {
48 let t1 = CompletableTask::new();
49 let t2 = CompletableTask::new();
50 t1.complete(Some("first".to_string()));
51
52 let waker = noop_waker();
53 let mut cx = Context::from_waker(&waker);
54 let mut fut = when_any(vec![t1, t2]);
55 match Pin::new(&mut fut).poll(&mut cx) {
56 Poll::Ready(Ok(idx)) => assert_eq!(idx, 0),
57 other => panic!("expected Ready(Ok(0)), got {other:?}"),
58 }
59 }
60
61 #[test]
62 fn test_when_any_second_complete() {
63 let t1 = CompletableTask::new();
64 let t2 = CompletableTask::new();
65 t2.complete(Some("second".to_string()));
66
67 let waker = noop_waker();
68 let mut cx = Context::from_waker(&waker);
69 let mut fut = when_any(vec![t1, t2]);
70 match Pin::new(&mut fut).poll(&mut cx) {
71 Poll::Ready(Ok(idx)) => assert_eq!(idx, 1),
72 other => panic!("expected Ready(Ok(1)), got {other:?}"),
73 }
74 }
75
76 #[test]
77 fn test_when_any_pending_then_ready() {
78 let t1 = CompletableTask::new();
79 let t2 = CompletableTask::new();
80
81 let waker = noop_waker();
82 let mut cx = Context::from_waker(&waker);
83 let mut fut = when_any(vec![t1.clone(), t2]);
84 assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
85
86 t1.complete(None);
87 match Pin::new(&mut fut).poll(&mut cx) {
88 Poll::Ready(Ok(idx)) => assert_eq!(idx, 0),
89 other => panic!("expected Ready(Ok(0)), got {other:?}"),
90 }
91 }
92
93 #[test]
94 fn test_when_any_failed_task_counts() {
95 let t1 = CompletableTask::new();
96 let t2 = CompletableTask::new();
97 t2.fail(FailureDetails {
98 message: "boom".to_string(),
99 error_type: "Error".to_string(),
100 stack_trace: None,
101 });
102
103 let waker = noop_waker();
104 let mut cx = Context::from_waker(&waker);
105 let mut fut = when_any(vec![t1, t2]);
106 match Pin::new(&mut fut).poll(&mut cx) {
107 Poll::Ready(Ok(idx)) => assert_eq!(idx, 1),
108 other => panic!("expected Ready(Ok(1)), got {other:?}"),
109 }
110 }
111}