dapr_durabletask/task/
completable_task.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Waker};
6
7use crate::api::{DurableTaskError, FailureDetails};
8
9#[derive(Debug, Clone)]
11pub enum TaskResult {
12 Completed(Option<String>),
14 Failed(FailureDetails),
16}
17
18struct CompletableTaskInner {
19 result: Option<TaskResult>,
20 waker: Option<Waker>,
21 completed_during_replay: bool,
25 replay_handle: Option<Arc<AtomicBool>>,
27}
28
29#[derive(Clone)]
35pub struct CompletableTask {
36 inner: Arc<Mutex<CompletableTaskInner>>,
37}
38
39impl CompletableTask {
40 pub fn new() -> Self {
41 Self {
42 inner: Arc::new(Mutex::new(CompletableTaskInner {
43 result: None,
44 waker: None,
45 completed_during_replay: true,
46 replay_handle: None,
47 })),
48 }
49 }
50
51 pub(crate) fn set_replay_handle(&self, handle: Arc<AtomicBool>) {
54 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
55 inner.replay_handle = Some(handle);
56 }
57
58 pub fn complete(&self, result: Option<String>) {
60 self.complete_with_phase(result, true);
61 }
62
63 pub(crate) fn complete_with_phase(&self, result: Option<String>, during_replay: bool) {
66 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
67 inner.result = Some(TaskResult::Completed(result));
68 inner.completed_during_replay = during_replay;
69 if let Some(waker) = inner.waker.take() {
70 waker.wake();
71 }
72 }
73
74 pub fn fail(&self, details: FailureDetails) {
76 self.fail_with_phase(details, true);
77 }
78
79 pub(crate) fn fail_with_phase(&self, details: FailureDetails, during_replay: bool) {
82 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
83 inner.result = Some(TaskResult::Failed(details));
84 inner.completed_during_replay = during_replay;
85 if let Some(waker) = inner.waker.take() {
86 waker.wake();
87 }
88 }
89
90 pub fn is_complete(&self) -> bool {
92 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
93 inner.result.is_some()
94 }
95
96 pub fn is_failed(&self) -> bool {
98 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
99 matches!(inner.result, Some(TaskResult::Failed(_)))
100 }
101
102 pub fn get_result(&self) -> Option<TaskResult> {
104 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
105 inner.result.clone()
106 }
107}
108
109impl Default for CompletableTask {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115impl Future for CompletableTask {
116 type Output = crate::api::Result<Option<String>>;
117
118 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
120 match &inner.result {
121 Some(TaskResult::Completed(value)) => {
122 let value = value.clone();
123 if !inner.completed_during_replay
124 && let Some(handle) = inner.replay_handle.as_ref()
125 {
126 handle.store(false, Ordering::Release);
127 }
128 Poll::Ready(Ok(value))
129 }
130 Some(TaskResult::Failed(details)) => {
131 let details = details.clone();
132 if !inner.completed_during_replay
133 && let Some(handle) = inner.replay_handle.as_ref()
134 {
135 handle.store(false, Ordering::Release);
136 }
137 Poll::Ready(Err(DurableTaskError::TaskFailed {
138 message: details.message.clone(),
139 failure_details: Some(details),
140 }))
141 }
142 None => {
143 inner.waker = Some(cx.waker().clone());
144 Poll::Pending
145 }
146 }
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use std::task::Waker;

    /// Returns a waker whose `wake` does nothing, for polling by hand.
    fn noop_waker() -> Waker {
        Waker::noop().clone()
    }

    /// Builds failure details with the given message and error type and no
    /// stack trace.
    fn failure(message: &str, error_type: &str) -> FailureDetails {
        FailureDetails {
            message: message.to_string(),
            error_type: error_type.to_string(),
            stack_trace: None,
        }
    }

    /// A freshly created task reports no outcome of any kind.
    #[test]
    fn test_new_task_is_not_complete() {
        let fresh = CompletableTask::new();
        assert!(!fresh.is_complete());
        assert!(!fresh.is_failed());
        assert!(fresh.get_result().is_none());
    }

    /// Completing a task stores the payload and does not mark it failed.
    #[test]
    fn test_complete_task() {
        let task = CompletableTask::new();
        task.complete(Some("42".to_string()));

        assert!(task.is_complete());
        assert!(!task.is_failed());

        let Some(TaskResult::Completed(payload)) = task.get_result() else {
            panic!("expected Completed");
        };
        assert_eq!(payload, Some("42".to_string()));
    }

    /// Failing a task marks it both complete and failed.
    #[test]
    fn test_fail_task() {
        let task = CompletableTask::new();
        task.fail(failure("boom", "Error"));

        assert!(task.is_complete());
        assert!(task.is_failed());
    }

    /// Polling is pending before completion and ready with the payload after.
    #[test]
    fn test_poll_pending_then_ready() {
        let task = CompletableTask::new();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut before = task.clone();
        assert!(Pin::new(&mut before).poll(&mut cx).is_pending());

        task.complete(Some("\"hello\"".to_string()));

        let mut after = task.clone();
        match Pin::new(&mut after).poll(&mut cx) {
            Poll::Ready(Ok(payload)) => assert_eq!(payload, Some("\"hello\"".to_string())),
            other => panic!("expected Ready(Ok), got {other:?}"),
        }
    }

    /// Polling a failed task yields `TaskFailed` carrying the failure message.
    #[test]
    fn test_poll_failed() {
        let task = CompletableTask::new();
        task.fail(failure("oops", "TestError"));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut handle = task.clone();
        match Pin::new(&mut handle).poll(&mut cx) {
            Poll::Ready(Err(DurableTaskError::TaskFailed { message, .. })) => {
                assert_eq!(message, "oops");
            }
            other => panic!("expected Ready(Err(TaskFailed)), got {other:?}"),
        }
    }

    /// An outcome recorded through one handle is visible through its clone.
    #[test]
    fn test_clone_shares_state() {
        let original = CompletableTask::new();
        let twin = original.clone();
        original.complete(Some("shared".to_string()));
        assert!(twin.is_complete());
    }
}