Skip to main content

dapr_durabletask/task/
completable_task.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Waker};
6
7use crate::api::{DurableTaskError, FailureDetails};
8
/// The result of a completed task.
///
/// Stored inside a [`CompletableTask`] once it has been resolved and handed
/// back (cloned) by [`CompletableTask::get_result`].
#[derive(Debug, Clone)]
pub enum TaskResult {
    /// Task completed successfully with an optional JSON-serialised result.
    /// `None` means the task finished without producing a value.
    Completed(Option<String>),
    /// Task failed with failure details. Awaiting such a task yields
    /// `DurableTaskError::TaskFailed` carrying these details.
    Failed(FailureDetails),
}
17
/// Mutable state shared by every clone of a [`CompletableTask`]; always
/// accessed through the task's mutex.
struct CompletableTaskInner {
    /// Outcome of the task; `None` until one of the `complete*` / `fail*`
    /// methods stores a value.
    result: Option<TaskResult>,
    /// Waker recorded by the most recent `poll` that returned `Pending`.
    /// It is taken (and woken) when a result is stored.
    waker: Option<Waker>,
    /// `true` if the result came from history replay, `false` if from a
    /// newly-arrived event. Stand-alone tasks default to `true` so they
    /// never flip the owning context's replay flag.
    completed_during_replay: bool,
    /// Shared `is_replaying` flag of the owning orchestration context, if any.
    /// `poll` stores `false` into it when it observes a result whose
    /// `completed_during_replay` is `false`.
    replay_handle: Option<Arc<AtomicBool>>,
}
28
/// A task that can be completed by the orchestration executor.
///
/// This is the primary awaitable type used by orchestrator functions.
/// During replay, tasks that already completed return their results immediately.
/// New tasks suspend execution until completed by the executor.
///
/// Cloning is cheap and yields another handle to the *same* task: all clones
/// share one result slot, so completing any clone completes them all.
#[derive(Clone)]
pub struct CompletableTask {
    /// Shared, mutex-protected state (result, waker, replay bookkeeping).
    inner: Arc<Mutex<CompletableTaskInner>>,
}
38
39impl CompletableTask {
40    pub fn new() -> Self {
41        Self {
42            inner: Arc::new(Mutex::new(CompletableTaskInner {
43                result: None,
44                waker: None,
45                completed_during_replay: true,
46                replay_handle: None,
47            })),
48        }
49    }
50
51    /// Attach the owning context's shared `is_replaying` flag. The task
52    /// clears it on resolution when its result came from a new event.
53    pub(crate) fn set_replay_handle(&self, handle: Arc<AtomicBool>) {
54        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
55        inner.replay_handle = Some(handle);
56    }
57
58    /// Complete the task with a successful result.
59    pub fn complete(&self, result: Option<String>) {
60        self.complete_with_phase(result, true);
61    }
62
63    /// Complete the task, tagging whether the value came from history replay
64    /// or from a newly-arrived event.
65    pub(crate) fn complete_with_phase(&self, result: Option<String>, during_replay: bool) {
66        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
67        inner.result = Some(TaskResult::Completed(result));
68        inner.completed_during_replay = during_replay;
69        if let Some(waker) = inner.waker.take() {
70            waker.wake();
71        }
72    }
73
74    /// Fail the task with failure details.
75    pub fn fail(&self, details: FailureDetails) {
76        self.fail_with_phase(details, true);
77    }
78
79    /// Fail the task, tagging whether the failure came from history replay
80    /// or from a newly-arrived event.
81    pub(crate) fn fail_with_phase(&self, details: FailureDetails, during_replay: bool) {
82        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
83        inner.result = Some(TaskResult::Failed(details));
84        inner.completed_during_replay = during_replay;
85        if let Some(waker) = inner.waker.take() {
86            waker.wake();
87        }
88    }
89
90    /// Check if the task is complete (success or failure).
91    pub fn is_complete(&self) -> bool {
92        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
93        inner.result.is_some()
94    }
95
96    /// Check if the task failed.
97    pub fn is_failed(&self) -> bool {
98        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
99        matches!(inner.result, Some(TaskResult::Failed(_)))
100    }
101
102    /// Get the result, if complete. Returns `None` if not yet complete.
103    pub fn get_result(&self) -> Option<TaskResult> {
104        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
105        inner.result.clone()
106    }
107}
108
109impl Default for CompletableTask {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115impl Future for CompletableTask {
116    type Output = crate::api::Result<Option<String>>;
117
118    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
119        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
120        match &inner.result {
121            Some(TaskResult::Completed(value)) => {
122                let value = value.clone();
123                if !inner.completed_during_replay
124                    && let Some(handle) = inner.replay_handle.as_ref()
125                {
126                    handle.store(false, Ordering::Release);
127                }
128                Poll::Ready(Ok(value))
129            }
130            Some(TaskResult::Failed(details)) => {
131                let details = details.clone();
132                if !inner.completed_during_replay
133                    && let Some(handle) = inner.replay_handle.as_ref()
134                {
135                    handle.store(false, Ordering::Release);
136                }
137                Poll::Ready(Err(DurableTaskError::TaskFailed {
138                    message: details.message.clone(),
139                    failure_details: Some(details),
140                }))
141            }
142            None => {
143                inner.waker = Some(cx.waker().clone());
144                Poll::Pending
145            }
146        }
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::task::{Wake, Waker};

    /// Waker that does nothing; enough for polls that never need a wake-up.
    fn noop_waker() -> Waker {
        Waker::noop().clone()
    }

    /// Waker that counts how many times it has been woken.
    struct CountingWaker(AtomicUsize);

    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Build failure details with the given message and error type.
    fn failure(message: &str, error_type: &str) -> FailureDetails {
        FailureDetails {
            message: message.to_string(),
            error_type: error_type.to_string(),
            stack_trace: None,
        }
    }

    #[test]
    fn test_new_task_is_not_complete() {
        let task = CompletableTask::new();
        assert!(!task.is_complete());
        assert!(!task.is_failed());
        assert!(task.get_result().is_none());
    }

    #[test]
    fn test_complete_task() {
        let task = CompletableTask::new();
        task.complete(Some("42".to_string()));
        assert!(task.is_complete());
        assert!(!task.is_failed());
        match task.get_result() {
            Some(TaskResult::Completed(v)) => assert_eq!(v, Some("42".to_string())),
            _ => panic!("expected Completed"),
        }
    }

    #[test]
    fn test_fail_task() {
        let task = CompletableTask::new();
        task.fail(failure("boom", "Error"));
        assert!(task.is_complete());
        assert!(task.is_failed());
        // The stored result must carry the details that were passed in.
        match task.get_result() {
            Some(TaskResult::Failed(d)) => assert_eq!(d.message, "boom"),
            _ => panic!("expected Failed"),
        }
    }

    #[test]
    fn test_poll_pending_then_ready() {
        let task = CompletableTask::new();
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut t = task.clone();
        assert!(Pin::new(&mut t).poll(&mut cx).is_pending());

        task.complete(Some("\"hello\"".to_string()));

        let mut t2 = task.clone();
        match Pin::new(&mut t2).poll(&mut cx) {
            Poll::Ready(Ok(v)) => assert_eq!(v, Some("\"hello\"".to_string())),
            other => panic!("expected Ready(Ok), got {other:?}"),
        }
    }

    #[test]
    fn test_poll_failed() {
        let task = CompletableTask::new();
        task.fail(failure("oops", "TestError"));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut t = task.clone();
        match Pin::new(&mut t).poll(&mut cx) {
            Poll::Ready(Err(DurableTaskError::TaskFailed { message, .. })) => {
                assert_eq!(message, "oops");
            }
            other => panic!("expected Ready(Err(TaskFailed)), got {other:?}"),
        }
    }

    #[test]
    fn test_clone_shares_state() {
        let task = CompletableTask::new();
        let clone = task.clone();
        task.complete(Some("shared".to_string()));
        assert!(clone.is_complete());
    }

    #[test]
    fn test_completion_wakes_registered_waker_once() {
        let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
        let waker = Waker::from(Arc::clone(&counter));
        let mut cx = Context::from_waker(&waker);

        let task = CompletableTask::new();
        let mut t = task.clone();
        assert!(Pin::new(&mut t).poll(&mut cx).is_pending());
        assert_eq!(counter.0.load(Ordering::SeqCst), 0);

        task.complete(None);
        assert_eq!(counter.0.load(Ordering::SeqCst), 1);

        // The waker slot was emptied by the first completion, so resolving
        // again must not wake anyone.
        task.complete(None);
        assert_eq!(counter.0.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_new_event_result_clears_replay_flag_on_poll() {
        let flag = Arc::new(AtomicBool::new(true));
        let task = CompletableTask::new();
        task.set_replay_handle(Arc::clone(&flag));

        task.complete_with_phase(Some("1".to_string()), false);
        // Resolution alone does not touch the flag; only polling does.
        assert!(flag.load(Ordering::Acquire));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut t = task.clone();
        assert!(Pin::new(&mut t).poll(&mut cx).is_ready());
        assert!(!flag.load(Ordering::Acquire));
    }

    #[test]
    fn test_new_event_failure_clears_replay_flag_on_poll() {
        let flag = Arc::new(AtomicBool::new(true));
        let task = CompletableTask::new();
        task.set_replay_handle(Arc::clone(&flag));
        task.fail_with_phase(failure("late", "TestError"), false);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut t = task.clone();
        assert!(Pin::new(&mut t).poll(&mut cx).is_ready());
        assert!(!flag.load(Ordering::Acquire));
    }

    #[test]
    fn test_replayed_result_keeps_replay_flag() {
        let flag = Arc::new(AtomicBool::new(true));
        let task = CompletableTask::new();
        task.set_replay_handle(Arc::clone(&flag));
        task.complete(Some("1".to_string()));

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut t = task.clone();
        assert!(Pin::new(&mut t).poll(&mut cx).is_ready());
        assert!(flag.load(Ordering::Acquire));
    }
}