dapr_durabletask/task/
completable_task.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll, Waker};
5
6use crate::api::{DurableTaskError, FailureDetails};
7
8#[derive(Debug, Clone)]
10pub enum TaskResult {
11 Completed(Option<String>),
13 Failed(FailureDetails),
15}
16
17struct CompletableTaskInner {
18 result: Option<TaskResult>,
19 waker: Option<Waker>,
20}
21
22#[derive(Clone)]
28pub struct CompletableTask {
29 inner: Arc<Mutex<CompletableTaskInner>>,
30}
31
32impl CompletableTask {
33 pub fn new() -> Self {
34 Self {
35 inner: Arc::new(Mutex::new(CompletableTaskInner {
36 result: None,
37 waker: None,
38 })),
39 }
40 }
41
42 pub fn complete(&self, result: Option<String>) {
44 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
45 inner.result = Some(TaskResult::Completed(result));
46 if let Some(waker) = inner.waker.take() {
47 waker.wake();
48 }
49 }
50
51 pub fn fail(&self, details: FailureDetails) {
53 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
54 inner.result = Some(TaskResult::Failed(details));
55 if let Some(waker) = inner.waker.take() {
56 waker.wake();
57 }
58 }
59
60 pub fn is_complete(&self) -> bool {
62 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
63 inner.result.is_some()
64 }
65
66 pub fn is_failed(&self) -> bool {
68 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
69 matches!(inner.result, Some(TaskResult::Failed(_)))
70 }
71
72 pub fn get_result(&self) -> Option<TaskResult> {
74 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
75 inner.result.clone()
76 }
77}
78
79impl Default for CompletableTask {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl Future for CompletableTask {
86 type Output = crate::api::Result<Option<String>>;
87
88 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
90 match &inner.result {
91 Some(TaskResult::Completed(value)) => Poll::Ready(Ok(value.clone())),
92 Some(TaskResult::Failed(details)) => Poll::Ready(Err(DurableTaskError::TaskFailed {
93 message: details.message.clone(),
94 failure_details: Some(details.clone()),
95 })),
96 None => {
97 inner.waker = Some(cx.waker().clone());
98 Poll::Pending
99 }
100 }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use std::task::Waker;
108
109 fn noop_waker() -> Waker {
110 Waker::noop().clone()
111 }
112
113 #[test]
114 fn test_new_task_is_not_complete() {
115 let task = CompletableTask::new();
116 assert!(!task.is_complete());
117 assert!(!task.is_failed());
118 assert!(task.get_result().is_none());
119 }
120
121 #[test]
122 fn test_complete_task() {
123 let task = CompletableTask::new();
124 task.complete(Some("42".to_string()));
125 assert!(task.is_complete());
126 assert!(!task.is_failed());
127 match task.get_result() {
128 Some(TaskResult::Completed(v)) => assert_eq!(v, Some("42".to_string())),
129 _ => panic!("expected Completed"),
130 }
131 }
132
133 #[test]
134 fn test_fail_task() {
135 let task = CompletableTask::new();
136 let details = FailureDetails {
137 message: "boom".to_string(),
138 error_type: "Error".to_string(),
139 stack_trace: None,
140 };
141 task.fail(details);
142 assert!(task.is_complete());
143 assert!(task.is_failed());
144 }
145
146 #[test]
147 fn test_poll_pending_then_ready() {
148 let task = CompletableTask::new();
149 let waker = noop_waker();
150 let mut cx = Context::from_waker(&waker);
151
152 let mut t = task.clone();
153 assert!(Pin::new(&mut t).poll(&mut cx).is_pending());
154
155 task.complete(Some("\"hello\"".to_string()));
156
157 let mut t2 = task.clone();
158 match Pin::new(&mut t2).poll(&mut cx) {
159 Poll::Ready(Ok(v)) => assert_eq!(v, Some("\"hello\"".to_string())),
160 other => panic!("expected Ready(Ok), got {:?}", other),
161 }
162 }
163
164 #[test]
165 fn test_poll_failed() {
166 let task = CompletableTask::new();
167 let details = FailureDetails {
168 message: "oops".to_string(),
169 error_type: "TestError".to_string(),
170 stack_trace: None,
171 };
172 task.fail(details);
173
174 let waker = noop_waker();
175 let mut cx = Context::from_waker(&waker);
176 let mut t = task.clone();
177 match Pin::new(&mut t).poll(&mut cx) {
178 Poll::Ready(Err(DurableTaskError::TaskFailed { message, .. })) => {
179 assert_eq!(message, "oops");
180 }
181 other => panic!("expected Ready(Err(TaskFailed)), got {:?}", other),
182 }
183 }
184
185 #[test]
186 fn test_clone_shares_state() {
187 let task = CompletableTask::new();
188 let clone = task.clone();
189 task.complete(Some("shared".to_string()));
190 assert!(clone.is_complete());
191 }
192}