Skip to main content

dapr_durabletask/task/
completable_task.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll, Waker};
5
6use crate::api::{DurableTaskError, FailureDetails};
7
8/// The result of a completed task.
9#[derive(Debug, Clone)]
10pub enum TaskResult {
11    /// Task completed successfully with an optional JSON-serialised result.
12    Completed(Option<String>),
13    /// Task failed with failure details.
14    Failed(FailureDetails),
15}
16
17struct CompletableTaskInner {
18    result: Option<TaskResult>,
19    waker: Option<Waker>,
20}
21
22/// A task that can be completed by the orchestration executor.
23///
24/// This is the primary awaitable type used by orchestrator functions.
25/// During replay, tasks that already completed return their results immediately.
26/// New tasks suspend execution until completed by the executor.
27#[derive(Clone)]
28pub struct CompletableTask {
29    inner: Arc<Mutex<CompletableTaskInner>>,
30}
31
32impl CompletableTask {
33    pub fn new() -> Self {
34        Self {
35            inner: Arc::new(Mutex::new(CompletableTaskInner {
36                result: None,
37                waker: None,
38            })),
39        }
40    }
41
42    /// Complete the task with a successful result.
43    pub fn complete(&self, result: Option<String>) {
44        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
45        inner.result = Some(TaskResult::Completed(result));
46        if let Some(waker) = inner.waker.take() {
47            waker.wake();
48        }
49    }
50
51    /// Fail the task with failure details.
52    pub fn fail(&self, details: FailureDetails) {
53        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
54        inner.result = Some(TaskResult::Failed(details));
55        if let Some(waker) = inner.waker.take() {
56            waker.wake();
57        }
58    }
59
60    /// Check if the task is complete (success or failure).
61    pub fn is_complete(&self) -> bool {
62        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
63        inner.result.is_some()
64    }
65
66    /// Check if the task failed.
67    pub fn is_failed(&self) -> bool {
68        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
69        matches!(inner.result, Some(TaskResult::Failed(_)))
70    }
71
72    /// Get the result, if complete. Returns `None` if not yet complete.
73    pub fn get_result(&self) -> Option<TaskResult> {
74        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
75        inner.result.clone()
76    }
77}
78
79impl Default for CompletableTask {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl Future for CompletableTask {
86    type Output = crate::api::Result<Option<String>>;
87
88    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
90        match &inner.result {
91            Some(TaskResult::Completed(value)) => Poll::Ready(Ok(value.clone())),
92            Some(TaskResult::Failed(details)) => Poll::Ready(Err(DurableTaskError::TaskFailed {
93                message: details.message.clone(),
94                failure_details: Some(details.clone()),
95            })),
96            None => {
97                inner.waker = Some(cx.waker().clone());
98                Poll::Pending
99            }
100        }
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::task::{Wake, Waker};

    /// Returns a waker that does nothing when woken, for tests that only
    /// inspect poll results.
    fn noop_waker() -> Waker {
        Waker::noop().clone()
    }

    /// Waker implementation that counts how many times it has been woken.
    struct CountingWaker {
        wakes: AtomicUsize,
    }

    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            self.wakes.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a counting waker together with the handle used to read its count.
    fn counting_waker() -> (Arc<CountingWaker>, Waker) {
        let counter = Arc::new(CountingWaker {
            wakes: AtomicUsize::new(0),
        });
        let waker = Waker::from(Arc::clone(&counter));
        (counter, waker)
    }

    #[test]
    fn test_new_task_is_not_complete() {
        let task = CompletableTask::new();
        assert!(!task.is_complete());
        assert!(!task.is_failed());
        assert!(task.get_result().is_none());
    }

    #[test]
    fn test_complete_task() {
        let task = CompletableTask::new();
        task.complete(Some("42".to_string()));
        assert!(task.is_complete());
        assert!(!task.is_failed());
        match task.get_result() {
            Some(TaskResult::Completed(v)) => assert_eq!(v, Some("42".to_string())),
            _ => panic!("expected Completed"),
        }
    }

    #[test]
    fn test_fail_task() {
        let task = CompletableTask::new();
        let details = FailureDetails {
            message: "boom".to_string(),
            error_type: "Error".to_string(),
            stack_trace: None,
        };
        task.fail(details);
        assert!(task.is_complete());
        assert!(task.is_failed());
        // The stored outcome must carry the details that were passed in.
        match task.get_result() {
            Some(TaskResult::Failed(stored)) => {
                assert_eq!(stored.message, "boom");
                assert_eq!(stored.error_type, "Error");
                assert!(stored.stack_trace.is_none());
            }
            _ => panic!("expected Failed"),
        }
    }

    #[test]
    fn test_poll_pending_then_ready() {
        let task = CompletableTask::new();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut t = task.clone();
        assert!(Pin::new(&mut t).poll(&mut cx).is_pending());

        task.complete(Some("\"hello\"".to_string()));

        let mut t2 = task.clone();
        match Pin::new(&mut t2).poll(&mut cx) {
            Poll::Ready(Ok(v)) => assert_eq!(v, Some("\"hello\"".to_string())),
            other => panic!("expected Ready(Ok), got {:?}", other),
        }
    }

    #[test]
    fn test_poll_failed() {
        let task = CompletableTask::new();
        let details = FailureDetails {
            message: "oops".to_string(),
            error_type: "TestError".to_string(),
            stack_trace: None,
        };
        task.fail(details);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut t = task.clone();
        match Pin::new(&mut t).poll(&mut cx) {
            Poll::Ready(Err(DurableTaskError::TaskFailed { message, .. })) => {
                assert_eq!(message, "oops");
            }
            other => panic!("expected Ready(Err(TaskFailed)), got {:?}", other),
        }
    }

    #[test]
    fn test_clone_shares_state() {
        let task = CompletableTask::new();
        let clone = task.clone();
        task.complete(Some("shared".to_string()));
        assert!(clone.is_complete());
    }

    #[test]
    fn test_complete_wakes_pending_poller() {
        let (counter, waker) = counting_waker();
        let mut cx = Context::from_waker(&waker);

        let task = CompletableTask::new();
        let mut pending = task.clone();
        assert!(Pin::new(&mut pending).poll(&mut cx).is_pending());
        assert_eq!(counter.wakes.load(Ordering::SeqCst), 0);

        task.complete(None);
        assert_eq!(counter.wakes.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_fail_wakes_pending_poller() {
        let (counter, waker) = counting_waker();
        let mut cx = Context::from_waker(&waker);

        let task = CompletableTask::new();
        let mut pending = task.clone();
        assert!(Pin::new(&mut pending).poll(&mut cx).is_pending());

        task.fail(FailureDetails {
            message: "late failure".to_string(),
            error_type: "TestError".to_string(),
            stack_trace: None,
        });
        assert_eq!(counter.wakes.load(Ordering::SeqCst), 1);
    }
}