dapr_durabletask/task/
when_any.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use super::completable_task::CompletableTask;
6
7pub struct WhenAnyTask {
10 tasks: Vec<CompletableTask>,
11}
12
13impl WhenAnyTask {
14 pub fn new(tasks: Vec<CompletableTask>) -> Self {
15 Self { tasks }
16 }
17}
18
19impl Future for WhenAnyTask {
20 type Output = crate::api::Result<usize>;
21
22 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
23 let this = self.get_mut();
24
25 for (i, task) in this.tasks.iter_mut().enumerate() {
27 match Pin::new(task).poll(cx) {
28 Poll::Ready(_) => return Poll::Ready(Ok(i)),
29 Poll::Pending => {}
30 }
31 }
32
33 Poll::Pending
34 }
35}
36
37pub fn when_any(tasks: Vec<CompletableTask>) -> WhenAnyTask {
39 WhenAnyTask::new(tasks)
40}
41
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::FailureDetails;
    use std::task::Waker;

    /// Returns an owned copy of the standard library's no-op waker.
    fn noop_waker() -> Waker {
        let shared: &Waker = Waker::noop();
        shared.clone()
    }

    /// A task completed before the first poll is reported at index 0.
    #[test]
    fn test_when_any_first_complete() {
        let first = CompletableTask::new();
        let second = CompletableTask::new();
        first.complete(Some("first".to_string()));

        let mut race = when_any(vec![first, second]);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let outcome = Pin::new(&mut race).poll(&mut cx);
        match outcome {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 0),
            other => panic!("expected Ready(Ok(0)), got {:?}", other),
        }
    }

    /// When only the second task is complete, index 1 is reported.
    #[test]
    fn test_when_any_second_complete() {
        let first = CompletableTask::new();
        let second = CompletableTask::new();
        second.complete(Some("second".to_string()));

        let mut race = when_any(vec![first, second]);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let outcome = Pin::new(&mut race).poll(&mut cx);
        match outcome {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 1),
            other => panic!("expected Ready(Ok(1)), got {:?}", other),
        }
    }

    /// The future stays pending until a task completes, then reports it.
    #[test]
    fn test_when_any_pending_then_ready() {
        let first = CompletableTask::new();
        let second = CompletableTask::new();

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        // Keep `first` and hand the future a clone, so the task can still be
        // completed from here after the first poll.
        let mut race = when_any(vec![first.clone(), second]);

        let initial = Pin::new(&mut race).poll(&mut cx);
        assert!(matches!(initial, Poll::Pending));

        first.complete(None);
        let outcome = Pin::new(&mut race).poll(&mut cx);
        match outcome {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 0),
            other => panic!("expected Ready(Ok(0)), got {:?}", other),
        }
    }

    /// A failed task still counts as finished and yields `Ok(index)`.
    #[test]
    fn test_when_any_failed_task_counts() {
        let first = CompletableTask::new();
        let second = CompletableTask::new();
        let failure = FailureDetails {
            message: "boom".to_string(),
            error_type: "Error".to_string(),
            stack_trace: None,
        };
        second.fail(failure);

        let mut race = when_any(vec![first, second]);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let outcome = Pin::new(&mut race).poll(&mut cx);
        match outcome {
            Poll::Ready(Ok(idx)) => assert_eq!(idx, 1),
            other => panic!("expected Ready(Ok(1)), got {:?}", other),
        }
    }
}