Skip to main content

dapr_durabletask/task/
orchestration_context.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, LazyLock, Mutex, MutexGuard};
4
5use futures::future::BoxFuture;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
9use crate::api::{
10    DurableTaskError, ExternalEventResult, FailureDetails, HistoryPropagationScope,
11    OrchestrationStatus, PropagatedHistory, RetryPolicy,
12};
13use crate::internal::{to_json, to_timestamp};
14use crate::proto;
15
16use super::completable_task::CompletableTask;
17use super::options::{ActivityOptions, SubOrchestratorOptions};
18
19/// Patch gate for replay-safe external-event timers.
20const EXTERNAL_EVENT_TIMER_PATCH: &str = "dapr:external-event-timer";
21
22/// Sentinel timestamp for indefinite event waits.
23static FAR_FUTURE_TIMESTAMP: LazyLock<chrono::DateTime<chrono::Utc>> = LazyLock::new(|| {
24    chrono::NaiveDate::from_ymd_opt(9999, 12, 31)
25        .unwrap()
26        .and_hms_opt(23, 59, 59)
27        .unwrap()
28        .and_utc()
29});
30
31pub(crate) fn lock_inner<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
32    m.lock().unwrap_or_else(|e| e.into_inner())
33}
34
35#[derive(Debug)]
36pub(crate) struct ContextConfig {
37    pub(crate) max_event_names: usize,
38    pub(crate) max_events_per_name: usize,
39    pub(crate) max_pending_tasks_per_name: usize,
40    pub(crate) max_json_payload_size: usize,
41}
42
43/// Internal state shared between the context and the orchestration executor.
44pub(crate) struct OrchestrationContextInner {
45    pub(crate) config: Arc<ContextConfig>,
46    pub(crate) instance_id: Arc<str>,
47    pub(crate) current_utc_datetime: chrono::DateTime<chrono::Utc>,
48    pub(crate) is_replaying: Arc<AtomicBool>,
49    pub(crate) is_complete: bool,
50    pub(crate) input: Option<String>,
51    pub(crate) name: Arc<str>,
52    pub(crate) custom_status: Option<String>,
53    pub(crate) sequence_number: i32,
54    pub(crate) pending_tasks: HashMap<i32, CompletableTask>,
55    pub(crate) pending_event_tasks: HashMap<String, VecDeque<CompletableTask>>,
56    /// Events buffered while no waiter exists. The `bool` records whether
57    /// the originating `EventRaised` event was applied during replay.
58    pub(crate) buffered_events: HashMap<String, VecDeque<(Option<String>, bool)>>,
59    pub(crate) pending_actions: Vec<proto::WorkflowAction>,
60    pub(crate) completion_status: Option<OrchestrationStatus>,
61    pub(crate) completion_result: Option<String>,
62    pub(crate) completion_failure: Option<FailureDetails>,
63    pub(crate) continue_as_new_input: Option<String>,
64    pub(crate) save_events_on_continue: bool,
65    pub(crate) is_suspended: bool,
66    /// Patches recorded as applied in the orchestration history (from `WorkflowStarted` events).
67    pub(crate) history_patches: std::collections::HashSet<String>,
68    /// Cache of patch decisions made during the current execution.
69    pub(crate) applied_patches: HashMap<String, bool>,
70    /// Number of sequence-consuming scheduled actions recorded in history
71    /// (TaskScheduled + TimerCreated + ChildWorkflowInstanceCreated).
72    /// Used to determine whether `is_patched` is called mid-history or at the frontier.
73    pub(crate) history_scheduled_count: i32,
74    /// History forwarded from the parent workflow (if any). Populated from
75    /// the `WorkflowRequest.propagated_history` field.
76    pub(crate) propagated_history: Option<Arc<PropagatedHistory>>,
77}
78
79/// The orchestration context provided to orchestrator functions.
80///
81/// All methods are safe to call from async code. The context is cloneable
82/// and thread-safe (`Send + Sync`), backed by `Arc<Mutex<>>`.
83#[derive(Clone)]
84pub struct OrchestrationContext {
85    pub(crate) inner: Arc<Mutex<OrchestrationContextInner>>,
86}
87
88impl OrchestrationContext {
89    /// Create a new orchestration context with the given parameters.
90    pub(crate) fn new(
91        instance_id: String,
92        name: String,
93        input: Option<String>,
94        current_utc_datetime: chrono::DateTime<chrono::Utc>,
95        is_replaying: bool,
96        options: &crate::worker::WorkerOptions,
97        event_count_hint: usize,
98    ) -> Self {
99        let config = Arc::new(ContextConfig {
100            max_event_names: options.max_event_names,
101            max_events_per_name: options.max_events_per_name,
102            max_pending_tasks_per_name: options.max_pending_tasks_per_name,
103            max_json_payload_size: options.max_json_payload_size,
104        });
105
106        Self {
107            inner: Arc::new(Mutex::new(OrchestrationContextInner {
108                config,
109                instance_id: Arc::<str>::from(instance_id),
110                current_utc_datetime,
111                is_replaying: Arc::new(AtomicBool::new(is_replaying)),
112                is_complete: false,
113                input,
114                name: Arc::<str>::from(name),
115                custom_status: None,
116                sequence_number: 0,
117                pending_tasks: HashMap::with_capacity(event_count_hint / 2),
118                pending_event_tasks: HashMap::new(),
119                buffered_events: HashMap::new(),
120                pending_actions: Vec::with_capacity(event_count_hint / 2),
121                completion_status: None,
122                completion_result: None,
123                completion_failure: None,
124                continue_as_new_input: None,
125                save_events_on_continue: false,
126                is_suspended: false,
127                history_patches: std::collections::HashSet::new(),
128                applied_patches: HashMap::new(),
129                history_scheduled_count: 0,
130                propagated_history: None,
131            })),
132        }
133    }
134
135    /// Get the instance ID.
136    pub fn instance_id(&self) -> Arc<str> {
137        lock_inner(&self.inner).instance_id.clone()
138    }
139
140    /// Get the current UTC datetime (deterministic, from history events).
141    pub fn current_utc_datetime(&self) -> chrono::DateTime<chrono::Utc> {
142        lock_inner(&self.inner).current_utc_datetime
143    }
144
145    /// Check if the orchestrator is currently replaying.
146    pub fn is_replaying(&self) -> bool {
147        lock_inner(&self.inner).is_replaying.load(Ordering::Acquire)
148    }
149
150    /// Get the orchestration name.
151    pub fn name(&self) -> Arc<str> {
152        lock_inner(&self.inner).name.clone()
153    }
154
155    /// Get the orchestration input, deserialised from JSON.
156    pub fn input<T: DeserializeOwned>(&self) -> crate::api::Result<T> {
157        let inner = lock_inner(&self.inner);
158        crate::internal::from_json(inner.input.as_deref(), inner.config.max_json_payload_size)
159    }
160
161    /// Returns history forwarded from the parent workflow, if the parent
162    /// scheduled this child with a non-`None` history propagation scope.
163    ///
164    /// See [`HistoryPropagationScope`] for the parent-side trade-off between
165    /// `OwnHistory` and `Lineage`.
166    pub fn propagated_history(&self) -> Option<Arc<PropagatedHistory>> {
167        lock_inner(&self.inner).propagated_history.clone()
168    }
169
170    /// Set a custom status string.
171    pub fn set_custom_status(&self, status: impl Into<String>) {
172        let mut inner = lock_inner(&self.inner);
173        inner.custom_status = Some(status.into());
174    }
175
176    /// Schedule an activity for execution.
177    ///
178    /// Returns a [`CompletableTask`] that resolves when the activity completes.
179    ///
180    /// During replay: if the corresponding `TaskCompleted`/`TaskFailed` event
181    /// exists in history, the task will already be complete.
182    /// During new execution: creates a `ScheduleTaskAction`.
183    pub fn call_activity(&self, name: &str, input: impl Serialize) -> CompletableTask {
184        tracing::debug!(activity = %name, "Scheduling activity");
185        self.call_activity_inner(name, input, None, None)
186    }
187
188    /// Schedule an activity with an `app_id` for cross-app scenarios.
189    pub fn call_activity_with_app_id(
190        &self,
191        name: &str,
192        input: impl Serialize,
193        app_id: &str,
194    ) -> CompletableTask {
195        tracing::debug!(activity = %name, app_id = %app_id, "Scheduling activity with app_id");
196        self.call_activity_inner(name, input, Some(app_id), None)
197    }
198
199    fn call_activity_inner(
200        &self,
201        name: &str,
202        input: impl Serialize,
203        app_id: Option<&str>,
204        history_propagation_scope: Option<HistoryPropagationScope>,
205    ) -> CompletableTask {
206        let input_json = match to_json(&input) {
207            Ok(json) => json,
208            Err(e) => {
209                let task = CompletableTask::new();
210                task.fail(FailureDetails {
211                    message: format!("Failed to serialize activity input: {e}"),
212                    error_type: "SerializationError".to_string(),
213                    stack_trace: None,
214                });
215                return task;
216            }
217        };
218        self.call_activity_raw(name, input_json, app_id, history_propagation_scope)
219    }
220
221    /// Internal: schedule an activity using a pre-serialised JSON input.
222    fn call_activity_raw(
223        &self,
224        name: &str,
225        input_json: Option<String>,
226        app_id: Option<&str>,
227        history_propagation_scope: Option<HistoryPropagationScope>,
228    ) -> CompletableTask {
229        let mut inner = lock_inner(&self.inner);
230        let seq = inner.sequence_number;
231        inner.sequence_number += 1;
232
233        if let Some(existing) = inner.pending_tasks.get(&seq)
234            && existing.is_complete()
235        {
236            return existing.clone();
237        }
238
239        let task = CompletableTask::new();
240        task.set_replay_handle(inner.is_replaying.clone());
241        inner.pending_tasks.insert(seq, task.clone());
242
243        let router = app_id.map(|id| proto::TaskRouter {
244            source_app_id: String::new(),
245            target_app_id: Some(id.to_string()),
246            target_app_namespace: None,
247        });
248        let action = proto::WorkflowAction {
249            id: seq,
250            router: None,
251            workflow_action_type: Some(proto::workflow_action::WorkflowActionType::ScheduleTask(
252                proto::ScheduleTaskAction {
253                    name: name.to_string(),
254                    version: None,
255                    input: input_json,
256                    router,
257                    task_execution_id: String::new(),
258                    history_propagation_scope: history_propagation_scope
259                        .map(|s| s.to_proto() as i32),
260                },
261            )),
262        };
263        inner.pending_actions.push(action);
264
265        task
266    }
267
268    /// Schedule an activity with options (retry policy, app ID).
269    ///
270    /// Returns a future that drives the activity to completion, transparently
271    /// scheduling durable timers and re-issuing the activity on each retry.
272    pub fn call_activity_with_options(
273        &self,
274        name: &str,
275        input: impl Serialize,
276        options: ActivityOptions,
277    ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
278    {
279        let input_json = to_json(&input);
280        let name = name.to_string();
281        let app_id = options.app_id.clone();
282        let scope = options.history_propagation_scope;
283        let ctx = self.clone();
284
285        async move {
286            let input_json = input_json?;
287            match options.retry_policy {
288                Some(policy) => {
289                    let first_attempt_time = ctx.current_utc_datetime();
290                    let schedule: Arc<
291                        dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
292                    > = Arc::new(move |c: &OrchestrationContext| {
293                        c.call_activity_raw(&name, input_json.clone(), app_id.as_deref(), scope)
294                    });
295                    call_with_retry(ctx, schedule, policy, first_attempt_time).await
296                }
297                None => {
298                    ctx.call_activity_raw(&name, input_json, app_id.as_deref(), scope)
299                        .await
300                }
301            }
302        }
303    }
304
305    /// Schedule a sub-orchestration for execution.
306    pub fn call_sub_orchestrator(
307        &self,
308        name: &str,
309        input: impl Serialize,
310        instance_id: Option<&str>,
311    ) -> CompletableTask {
312        tracing::debug!(
313            sub_orchestrator = %name,
314            sub_instance_id = ?instance_id,
315            "Scheduling sub-orchestration"
316        );
317        self.call_sub_orchestrator_inner(name, input, instance_id, None, None)
318    }
319
320    /// Schedule a sub-orchestration targeting a specific Dapr app ID.
321    pub fn call_sub_orchestrator_with_app_id(
322        &self,
323        name: &str,
324        input: impl Serialize,
325        instance_id: Option<&str>,
326        app_id: &str,
327    ) -> CompletableTask {
328        tracing::debug!(
329            sub_orchestrator = %name,
330            sub_instance_id = ?instance_id,
331            app_id = %app_id,
332            "Scheduling sub-orchestration with app_id"
333        );
334        self.call_sub_orchestrator_inner(name, input, instance_id, Some(app_id), None)
335    }
336
337    fn call_sub_orchestrator_inner(
338        &self,
339        name: &str,
340        input: impl Serialize,
341        instance_id: Option<&str>,
342        app_id: Option<&str>,
343        history_propagation_scope: Option<HistoryPropagationScope>,
344    ) -> CompletableTask {
345        let input_json = match to_json(&input) {
346            Ok(json) => json,
347            Err(e) => {
348                let task = CompletableTask::new();
349                task.fail(FailureDetails {
350                    message: format!("Failed to serialize sub-orchestrator input: {e}"),
351                    error_type: "SerializationError".to_string(),
352                    stack_trace: None,
353                });
354                return task;
355            }
356        };
357        self.call_sub_orchestrator_raw(
358            name,
359            input_json,
360            instance_id,
361            app_id,
362            history_propagation_scope,
363        )
364    }
365
366    /// Internal: schedule a sub-orchestration using a pre-serialised JSON input.
367    fn call_sub_orchestrator_raw(
368        &self,
369        name: &str,
370        input_json: Option<String>,
371        instance_id: Option<&str>,
372        app_id: Option<&str>,
373        history_propagation_scope: Option<HistoryPropagationScope>,
374    ) -> CompletableTask {
375        let mut inner = lock_inner(&self.inner);
376        let seq = inner.sequence_number;
377        inner.sequence_number += 1;
378
379        if let Some(existing) = inner.pending_tasks.get(&seq)
380            && existing.is_complete()
381        {
382            return existing.clone();
383        }
384
385        let task = CompletableTask::new();
386        task.set_replay_handle(inner.is_replaying.clone());
387        inner.pending_tasks.insert(seq, task.clone());
388
389        let sub_instance_id = instance_id
390            .map(|s| s.to_string())
391            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
392
393        let router = app_id.map(|id| proto::TaskRouter {
394            source_app_id: String::new(),
395            target_app_id: Some(id.to_string()),
396            target_app_namespace: None,
397        });
398
399        let action = proto::WorkflowAction {
400            id: seq,
401            router: None,
402            workflow_action_type: Some(
403                proto::workflow_action::WorkflowActionType::CreateChildWorkflow(
404                    proto::CreateChildWorkflowAction {
405                        instance_id: sub_instance_id,
406                        name: name.to_string(),
407                        version: None,
408                        input: input_json,
409                        router,
410                        history_propagation_scope: history_propagation_scope
411                            .map(|s| s.to_proto() as i32),
412                    },
413                ),
414            ),
415        };
416        inner.pending_actions.push(action);
417
418        task
419    }
420
421    /// Schedule a sub-orchestration with options (instance ID, retry policy, app ID).
422    ///
423    /// Returns a future that drives the sub-orchestration to completion,
424    /// transparently scheduling durable timers and re-issuing the call on each retry.
425    ///
426    /// Note: when a retry policy is set and no explicit `instance_id` is given,
427    /// each retry uses a freshly generated instance ID.
428    pub fn call_sub_orchestrator_with_options(
429        &self,
430        name: &str,
431        input: impl Serialize,
432        options: SubOrchestratorOptions,
433    ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
434    {
435        let input_json = to_json(&input);
436        let name = name.to_string();
437        let instance_id = options.instance_id.clone();
438        let app_id = options.app_id.clone();
439        let scope = options.history_propagation_scope;
440        let ctx = self.clone();
441
442        async move {
443            let input_json = input_json?;
444            match options.retry_policy {
445                Some(policy) => {
446                    let first_attempt_time = ctx.current_utc_datetime();
447                    let schedule: Arc<
448                        dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
449                    > = Arc::new(move |c: &OrchestrationContext| {
450                        c.call_sub_orchestrator_raw(
451                            &name,
452                            input_json.clone(),
453                            instance_id.as_deref(),
454                            app_id.as_deref(),
455                            scope,
456                        )
457                    });
458                    call_with_retry(ctx, schedule, policy, first_attempt_time).await
459                }
460                None => {
461                    ctx.call_sub_orchestrator_raw(
462                        &name,
463                        input_json,
464                        instance_id.as_deref(),
465                        app_id.as_deref(),
466                        scope,
467                    )
468                    .await
469                }
470            }
471        }
472    }
473
474    /// Create a durable timer that fires after the specified duration.
475    pub fn create_timer(&self, delay: std::time::Duration) -> CompletableTask {
476        tracing::debug!(delay_ms = delay.as_millis() as u64, "Creating timer");
477        let mut inner = lock_inner(&self.inner);
478        let fire_at = inner.current_utc_datetime
479            + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
480        Self::create_timer_with_origin(&mut inner, fire_at, None, None)
481    }
482
483    /// Create a timer action, optionally tagging its origin.
484    fn create_timer_with_origin(
485        inner: &mut OrchestrationContextInner,
486        fire_at: chrono::DateTime<chrono::Utc>,
487        name: Option<String>,
488        origin: Option<proto::create_timer_action::Origin>,
489    ) -> CompletableTask {
490        let seq = inner.sequence_number;
491        inner.sequence_number += 1;
492
493        if let Some(existing) = inner.pending_tasks.get(&seq)
494            && existing.is_complete()
495        {
496            return existing.clone();
497        }
498
499        let task = CompletableTask::new();
500        task.set_replay_handle(inner.is_replaying.clone());
501        inner.pending_tasks.insert(seq, task.clone());
502
503        let action = proto::WorkflowAction {
504            id: seq,
505            router: None,
506            workflow_action_type: Some(proto::workflow_action::WorkflowActionType::CreateTimer(
507                proto::CreateTimerAction {
508                    fire_at: Some(to_timestamp(fire_at)),
509                    name,
510                    origin,
511                },
512            )),
513        };
514        inner.pending_actions.push(action);
515
516        task
517    }
518
519    /// Wait for an external event with the given name.
520    ///
521    /// Event names are case-insensitive.
522    ///
523    /// Patched executions also emit a far-future timer tagged with the event
524    /// name, letting the runtime track the wait.
525    pub fn wait_for_external_event(&self, name: &str) -> CompletableTask {
526        tracing::debug!(event_name = %name, "Waiting for external event");
527        let mut inner = lock_inner(&self.inner);
528        let event_name = name.to_lowercase();
529
530        // Gate timer emission for replay safety.
531        let emit_timer = Self::is_patched_inner(&mut inner, EXTERNAL_EVENT_TIMER_PATCH);
532        if emit_timer {
533            let origin = proto::create_timer_action::Origin::ExternalEvent(
534                proto::TimerOriginExternalEvent {
535                    name: name.to_string(),
536                },
537            );
538            // Use a sentinel timer; this API returns only the event task.
539            let _timer_task = Self::create_timer_with_origin(
540                &mut inner,
541                *FAR_FUTURE_TIMESTAMP,
542                None,
543                Some(origin),
544            );
545        }
546
547        if let Some(events) = inner.buffered_events.get_mut(&event_name)
548            && !events.is_empty()
549        {
550            let (data, during_replay) = events
551                .pop_front()
552                .expect("buffered event queue is not empty");
553            let task = CompletableTask::new();
554            task.set_replay_handle(inner.is_replaying.clone());
555            task.complete_with_phase(data, during_replay);
556            return task;
557        }
558
559        let task = CompletableTask::new();
560        task.set_replay_handle(inner.is_replaying.clone());
561        let max_pending = inner.config.max_pending_tasks_per_name;
562        let pending = inner.pending_event_tasks.entry(event_name).or_default();
563        if pending.len() >= max_pending {
564            tracing::warn!(event_name = %name, "Pending event task limit reached, discarding wait");
565            return task;
566        }
567        pending.push_back(task.clone());
568        task
569    }
570
571    /// Wait for an external event with a timeout.
572    ///
573    /// Returns [`ExternalEventResult::Received`] if the event arrives before
574    /// the timeout, or [`ExternalEventResult::TimedOut`] if the timeout fires
575    /// first.
576    ///
577    /// Always emits a timer tagged with the event name.
578    ///
579    /// Event names are case-insensitive.
580    pub async fn wait_for_external_event_with_timeout(
581        &self,
582        name: &str,
583        timeout: std::time::Duration,
584    ) -> crate::api::Result<ExternalEventResult> {
585        tracing::debug!(
586            event_name = %name,
587            timeout_ms = timeout.as_millis() as u64,
588            "Waiting for external event with timeout"
589        );
590
591        let (event_task, timer_task) = {
592            let mut inner = lock_inner(&self.inner);
593            let event_name = name.to_lowercase();
594
595            // Create the external-event timeout timer.
596            let fire_at = inner.current_utc_datetime
597                + chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
598            let origin = proto::create_timer_action::Origin::ExternalEvent(
599                proto::TimerOriginExternalEvent {
600                    name: name.to_string(),
601                },
602            );
603            let timer_task =
604                Self::create_timer_with_origin(&mut inner, fire_at, None, Some(origin));
605
606            // Register the event wait.
607            let event_task = if let Some(events) = inner.buffered_events.get_mut(&event_name)
608                && !events.is_empty()
609            {
610                let (data, during_replay) = events
611                    .pop_front()
612                    .expect("buffered event queue is not empty");
613                let task = CompletableTask::new();
614                task.set_replay_handle(inner.is_replaying.clone());
615                task.complete_with_phase(data, during_replay);
616                task
617            } else {
618                let task = CompletableTask::new();
619                task.set_replay_handle(inner.is_replaying.clone());
620                let max_pending = inner.config.max_pending_tasks_per_name;
621                let pending = inner.pending_event_tasks.entry(event_name).or_default();
622                if pending.len() >= max_pending {
623                    tracing::warn!(
624                        event_name = %name,
625                        "Pending event task limit reached, discarding wait"
626                    );
627                } else {
628                    pending.push_back(task.clone());
629                }
630                task
631            };
632
633            (event_task, timer_task)
634        };
635
636        // Race the event and timer (0 = event, 1 = timer).
637        let winner = super::when_any::when_any(vec![event_task.clone(), timer_task]).await?;
638        match winner {
639            0 => {
640                let payload = event_task.await?;
641                Ok(ExternalEventResult::Received(payload))
642            }
643            _ => {
644                // Timer won — remove the stale event waiter so it does not
645                // silently consume a later event with the same name.
646                let mut inner = lock_inner(&self.inner);
647                let event_name = name.to_lowercase();
648                if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name) {
649                    tasks.retain(|t| !t.ptr_eq(&event_task));
650                }
651                Ok(ExternalEventResult::TimedOut)
652            }
653        }
654    }
655
656    /// `is_patched` variant for callers that already hold the lock.
657    fn is_patched_inner(inner: &mut OrchestrationContextInner, patch_name: &str) -> bool {
658        if let Some(&cached) = inner.applied_patches.get(patch_name) {
659            return cached;
660        }
661        if inner.history_patches.contains(patch_name) {
662            inner.applied_patches.insert(patch_name.to_string(), true);
663            return true;
664        }
665        if inner.sequence_number < inner.history_scheduled_count {
666            inner.applied_patches.insert(patch_name.to_string(), false);
667            return false;
668        }
669        inner.applied_patches.insert(patch_name.to_string(), true);
670        true
671    }
672
673    /// Continue the orchestration as new with new input.
674    pub fn continue_as_new(&self, input: impl Serialize, save_events: bool) {
675        tracing::debug!(save_events = save_events, "Continuing orchestration as new");
676        let mut inner = lock_inner(&self.inner);
677        inner.continue_as_new_input = to_json(&input).ok().flatten();
678        inner.save_events_on_continue = save_events;
679    }
680
681    /// Check whether a named patch should be applied in the current execution.
682    ///
683    /// This enables safe, deterministic code upgrades. Wrap new behaviour in
684    /// `if ctx.is_patched("my-patch")` to ensure that:
685    ///
686    /// - Replaying executions that previously ran the *unpatched* path continue
687    ///   on the unpatched path (preserving determinism).
688    /// - Executions that previously ran the *patched* path continue on the
689    ///   patched path.
690    /// - Brand-new executions (at the history frontier) always take the patched
691    ///   path.
692    ///
693    /// This matches the behaviour of the Go and Python SDKs.
694    pub fn is_patched(&self, patch_name: &str) -> bool {
695        let mut inner = lock_inner(&self.inner);
696
697        // Return the cached decision from the current execution if available.
698        if let Some(&cached) = inner.applied_patches.get(patch_name) {
699            return cached;
700        }
701
702        // If this patch was recorded as applied in the history, honour it.
703        if inner.history_patches.contains(patch_name) {
704            inner.applied_patches.insert(patch_name.to_string(), true);
705            return true;
706        }
707
708        // If the orchestrator hasn't yet consumed all scheduled actions from
709        // history, this call is mid-replay.  The previous execution did NOT
710        // apply this patch, so we must stay on the unpatched path to preserve
711        // determinism.
712        if inner.sequence_number < inner.history_scheduled_count {
713            inner.applied_patches.insert(patch_name.to_string(), false);
714            return false;
715        }
716
717        // We're at (or past) the history frontier — apply the patch.
718        inner.applied_patches.insert(patch_name.to_string(), true);
719        true
720    }
721}
722
723// ── Retry helpers ─────────────────────────────────────────────────────────────
724
725/// Compute the delay before the next retry attempt, or `None` if the retry
726/// should not proceed (timeout exceeded or predicate returned false).
727fn compute_retry_delay(
728    policy: &RetryPolicy,
729    attempt: u32,
730    first_attempt_time: chrono::DateTime<chrono::Utc>,
731    current_time: chrono::DateTime<chrono::Utc>,
732    details: &FailureDetails,
733) -> Option<std::time::Duration> {
734    // Check custom predicate.
735    if let Some(ref handle) = policy.handle
736        && !handle(details)
737    {
738        return None;
739    }
740
741    // Check overall retry timeout.
742    if let Some(timeout) = policy.retry_timeout {
743        let elapsed = current_time - first_attempt_time;
744        let timeout_dur = chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
745        if elapsed >= timeout_dur {
746            return None;
747        }
748    }
749
750    // Exponential backoff.
751    let first_ms = policy.first_retry_interval.as_millis() as f64;
752    let next_ms = first_ms * policy.backoff_coefficient.powi(attempt as i32);
753
754    let delay_ms = if let Some(max) = policy.max_retry_interval {
755        next_ms.min(max.as_millis() as f64)
756    } else {
757        next_ms
758    };
759
760    Some(std::time::Duration::from_millis(delay_ms as u64))
761}
762
763/// Drive a task to completion, retrying on failure according to `policy`.
764///
765/// `schedule` is called once per attempt and must return a fresh [`CompletableTask`].
766/// Between attempts a durable timer is created for the computed backoff delay,
767/// preserving determinism across replays.
768fn call_with_retry(
769    ctx: OrchestrationContext,
770    schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync>,
771    policy: RetryPolicy,
772    first_attempt_time: chrono::DateTime<chrono::Utc>,
773) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
774    Box::pin(async move {
775        let mut attempt = 0;
776        loop {
777            let task = schedule(&ctx);
778            match task.await {
779                Ok(v) => return Ok(v),
780                Err(DurableTaskError::TaskFailed {
781                    message,
782                    failure_details,
783                }) => {
784                    let details = failure_details.clone().unwrap_or_else(|| FailureDetails {
785                        message: message.clone(),
786                        error_type: "TaskFailed".to_string(),
787                        stack_trace: None,
788                    });
789
790                    if attempt + 1 >= policy.max_number_of_attempts {
791                        tracing::debug!(
792                            attempt,
793                            max = policy.max_number_of_attempts,
794                            "Max retry attempts reached"
795                        );
796                        return Err(DurableTaskError::TaskFailed {
797                            message,
798                            failure_details,
799                        });
800                    }
801
802                    let current_time = ctx.current_utc_datetime();
803                    let delay = match compute_retry_delay(
804                        &policy,
805                        attempt,
806                        first_attempt_time,
807                        current_time,
808                        &details,
809                    ) {
810                        Some(d) => d,
811                        None => {
812                            tracing::debug!(attempt, "Retry predicate or timeout prevented retry");
813                            return Err(DurableTaskError::TaskFailed {
814                                message,
815                                failure_details,
816                            });
817                        }
818                    };
819
820                    tracing::debug!(
821                        attempt,
822                        delay_ms = delay.as_millis(),
823                        "Scheduling retry timer"
824                    );
825                    ctx.create_timer(delay).await?;
826                    attempt += 1;
827                }
828                Err(e) => return Err(e),
829            }
830        }
831    })
832}
833
// Unit tests for `OrchestrationContext`.
//
// Most tests call a context method and then inspect the shared `inner` state
// directly. Where a test mutates `inner` *before* calling a context method, it
// does so inside its own `{ ... }` scope so the `MutexGuard` is dropped first.
// The context methods appear to lock `inner` themselves, and
// `std::sync::Mutex` is not re-entrant, so holding the guard across such a call
// would deadlock.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Datelike;

    /// Build a fresh context for instance `"inst-1"` of orchestration
    /// `"my_orch"`, whose raw input is the JSON string `"hello"`.
    ///
    /// NOTE(review): the `false` argument appears to be the initial replaying
    /// flag (`test_basic_accessors` asserts `!is_replaying()`); the meaning of
    /// the trailing `0` is not visible here — confirm against
    /// `OrchestrationContext::new`.
    fn make_ctx() -> OrchestrationContext {
        OrchestrationContext::new(
            "inst-1".to_string(),
            "my_orch".to_string(),
            Some("\"hello\"".to_string()),
            chrono::Utc::now(),
            false,
            &crate::worker::WorkerOptions::default(),
            0,
        )
    }

    // Accessors echo the values supplied by `make_ctx`.
    #[test]
    fn test_basic_accessors() {
        let ctx = make_ctx();
        assert_eq!(ctx.instance_id().as_ref(), "inst-1");
        assert_eq!(ctx.name().as_ref(), "my_orch");
        assert!(!ctx.is_replaying());
    }

    // The raw input `"\"hello\""` is JSON-decoded into a plain `String`.
    #[test]
    fn test_input() {
        let ctx = make_ctx();
        let input: String = ctx.input().unwrap();
        assert_eq!(input, "hello");
    }

    // The custom status is stored verbatim (not JSON-encoded).
    #[test]
    fn test_set_custom_status() {
        let ctx = make_ctx();
        ctx.set_custom_status("processing");
        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.custom_status, Some("processing".to_string()));
    }

    // Scheduling an activity consumes sequence number 0 and queues exactly one
    // `ScheduleTask` action whose input is the JSON-encoded argument.
    #[test]
    fn test_call_activity_creates_action() {
        let ctx = make_ctx();
        let _task = ctx.call_activity("greet", "world");

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        assert_eq!(inner.pending_actions.len(), 1);
        assert_eq!(inner.pending_actions[0].id, 0);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::ScheduleTask(a)) => {
                assert_eq!(a.name, "greet");
                assert_eq!(a.input, Some("\"world\"".to_string()));
            }
            _ => panic!("expected ScheduleTask action"),
        }
    }

    // When a task is already registered under the next sequence number, the
    // existing task is returned and no new action is queued.
    #[test]
    fn test_call_activity_replay_returns_existing() {
        let ctx = make_ctx();

        // Pre-populate a completed task at sequence 0 (simulating replay)
        {
            let mut inner = ctx.inner.lock().unwrap();
            let task = CompletableTask::new();
            task.complete(Some("42".to_string()));
            inner.pending_tasks.insert(0, task);
        }

        let task = ctx.call_activity("greet", "world");
        assert!(task.is_complete());

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.pending_actions.len(), 0);
    }

    // A child orchestration queues a `CreateChildWorkflow` action carrying the
    // orchestrator name and the explicit instance id.
    #[test]
    fn test_call_sub_orchestrator() {
        let ctx = make_ctx();
        let _task = ctx.call_sub_orchestrator("child_orch", "input", Some("child-1"));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
                assert_eq!(a.name, "child_orch");
                assert_eq!(a.instance_id, "child-1");
            }
            _ => panic!("expected CreateChildWorkflow action"),
        }
    }

    // A timer consumes a sequence number and queues a `CreateTimer` action with
    // `fire_at` populated.
    #[test]
    fn test_create_timer() {
        let ctx = make_ctx();
        let _task = ctx.create_timer(std::time::Duration::from_secs(60));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => {
                assert!(a.fire_at.is_some());
            }
            _ => panic!("expected CreateTimer action"),
        }
    }

    // An event buffered under the lowercase name satisfies a wait on the
    // uppercase name immediately.
    #[test]
    fn test_wait_for_external_event_buffered() {
        let ctx = make_ctx();

        // Buffer an event
        {
            let mut inner = ctx.inner.lock().unwrap();
            inner
                .buffered_events
                .entry("approval".to_string())
                .or_default()
                .push_back((Some("\"yes\"".to_string()), true));
        }

        let task = ctx.wait_for_external_event("APPROVAL"); // case-insensitive
        assert!(task.is_complete());
    }

    // With nothing buffered, the wait stays pending and is registered in
    // `pending_event_tasks` under the key "approval".
    #[test]
    fn test_wait_for_external_event_pending() {
        let ctx = make_ctx();
        let task = ctx.wait_for_external_event("approval");
        assert!(!task.is_complete());

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
    }

    // `continue_as_new` JSON-encodes the new input and records the flag passed
    // as its second argument in `save_events_on_continue`.
    #[test]
    fn test_continue_as_new() {
        let ctx = make_ctx();
        ctx.continue_as_new("new_input", true);

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(
            inner.continue_as_new_input,
            Some("\"new_input\"".to_string())
        );
        assert!(inner.save_events_on_continue);
    }

    // Activities and timers draw from one shared sequence counter, and action
    // ids follow call order.
    #[test]
    fn test_sequence_numbers_increment() {
        let ctx = make_ctx();
        let _t1 = ctx.call_activity("a", ());
        let _t2 = ctx.call_activity("b", ());
        let _t3 = ctx.create_timer(std::time::Duration::from_secs(1));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 3);
        assert_eq!(inner.pending_actions[0].id, 0);
        assert_eq!(inner.pending_actions[1].id, 1);
        assert_eq!(inner.pending_actions[2].id, 2);
    }

    // A cross-app child orchestration additionally carries a router whose
    // `target_app_id` is the requested app.
    #[test]
    fn test_call_sub_orchestrator_with_app_id() {
        let ctx = make_ctx();
        let _task = ctx.call_sub_orchestrator_with_app_id(
            "child_orch",
            "input",
            Some("child-1"),
            "other-app",
        );

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
                assert_eq!(a.name, "child_orch");
                assert_eq!(a.instance_id, "child-1");
                let router = a.router.as_ref().expect("expected router");
                assert_eq!(router.target_app_id, Some("other-app".to_string()));
            }
            _ => panic!("expected CreateChildWorkflow action"),
        }
    }

    #[test]
    fn test_is_patched_new_execution_returns_true() {
        // No history → always at the frontier → patch applies.
        let ctx = make_ctx();
        assert!(ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_in_history_returns_true() {
        // Patch recorded in history → return true.
        let ctx = make_ctx();
        ctx.inner
            .lock()
            .unwrap()
            .history_patches
            .insert("my-patch".to_string());
        assert!(ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_mid_replay_returns_false() {
        // history_scheduled_count = 2, but seq = 0 → mid-replay, unpatched.
        let ctx = make_ctx();
        ctx.inner.lock().unwrap().history_scheduled_count = 2;
        assert!(!ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_at_frontier_after_history_returns_true() {
        // history_scheduled_count = 1, seq = 1 → at frontier.
        let ctx = make_ctx();
        {
            let mut inner = ctx.inner.lock().unwrap();
            inner.history_scheduled_count = 1;
            inner.sequence_number = 1;
        }
        assert!(ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_caches_decision() {
        let ctx = make_ctx();
        // First call caches the result.
        assert!(ctx.is_patched("my-patch"));
        // Second call uses the cache regardless of state changes.
        ctx.inner.lock().unwrap().history_scheduled_count = 99;
        assert!(ctx.is_patched("my-patch"));
    }

    /// Return the `CreateTimerAction` inside `action`, panicking (and so
    /// failing the calling test) if `action` is any other action type.
    fn extract_create_timer(action: &proto::WorkflowAction) -> &proto::CreateTimerAction {
        match &action.workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => a,
            other => panic!("expected CreateTimer action, got {other:?}"),
        }
    }

    #[test]
    fn test_create_timer_origin_none() {
        // Generic timers have no origin.
        let ctx = make_ctx();
        let _task = ctx.create_timer(std::time::Duration::from_secs(60));

        let inner = ctx.inner.lock().unwrap();
        let timer_action = extract_create_timer(&inner.pending_actions[0]);
        assert!(
            timer_action.origin.is_none(),
            "generic timer should have no origin"
        );
    }

    #[test]
    fn test_wait_for_external_event_emits_timer_new_execution() {
        // New executions emit a far-future ExternalEvent timer.
        let ctx = make_ctx();
        let _task = ctx.wait_for_external_event("approval");

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(
            inner.sequence_number, 1,
            "should have allocated a seq for the timer"
        );
        assert_eq!(
            inner.pending_actions.len(),
            1,
            "should have emitted a CreateTimerAction"
        );

        let timer_action = extract_create_timer(&inner.pending_actions[0]);
        match &timer_action.origin {
            Some(proto::create_timer_action::Origin::ExternalEvent(e)) => {
                assert_eq!(e.name, "approval");
            }
            other => panic!("expected ExternalEvent origin, got {other:?}"),
        }

        // Assert the far-future sentinel.
        // Only the year is checked, not an exact timestamp.
        let fire_at = timer_action
            .fire_at
            .as_ref()
            .expect("fire_at should be set");
        let fire_at_dt = chrono::DateTime::from_timestamp(fire_at.seconds, fire_at.nanos as u32);
        assert!(fire_at_dt.is_some());
        assert!(fire_at_dt.unwrap().year() >= 9999);
    }

    #[test]
    fn test_wait_for_external_event_no_timer_during_replay() {
        // Mid-replay without patch history keeps old behaviour: no timer.
        let ctx = make_ctx();
        ctx.inner.lock().unwrap().history_scheduled_count = 5;

        let _task = ctx.wait_for_external_event("approval");

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(
            inner.sequence_number, 0,
            "should NOT allocate a seq during replay"
        );
        assert!(
            inner.pending_actions.is_empty(),
            "should NOT emit a timer during replay"
        );
        // The wait itself is still registered even though no timer was emitted.
        assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
    }

    #[test]
    fn test_wait_for_external_event_buffered_still_emits_timer() {
        // Buffered events still emit the timer in patched executions.
        let ctx = make_ctx();
        {
            let mut inner = ctx.inner.lock().unwrap();
            inner
                .buffered_events
                .entry("approval".to_string())
                .or_default()
                .push_back((Some("\"yes\"".to_string()), true));
        }

        let task = ctx.wait_for_external_event("APPROVAL");
        assert!(
            task.is_complete(),
            "buffered event should complete immediately"
        );

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(
            inner.sequence_number, 1,
            "should have allocated a seq for the timer"
        );
        assert_eq!(inner.pending_actions.len(), 1);
        let timer_action = extract_create_timer(&inner.pending_actions[0]);
        assert!(
            matches!(
                &timer_action.origin,
                Some(proto::create_timer_action::Origin::ExternalEvent(_))
            ),
            "timer origin should be ExternalEvent"
        );
    }

    #[test]
    fn test_wait_for_external_event_with_timeout_emits_timer() {
        // Timeout waits always emit the explicit timer.
        //
        // NOTE(review): this test reproduces the timeout-wait setup by hand
        // rather than calling the public method. It therefore exercises
        // `create_timer_with_origin` plus queue registration, not the method
        // itself, and will not catch regressions in that method.
        let ctx = make_ctx();

        // Mirror the method setup without awaiting the future.
        {
            let mut inner = ctx.inner.lock().unwrap();
            let event_name = "approval".to_string();
            let fire_at = inner.current_utc_datetime + chrono::Duration::seconds(30);
            let origin = proto::create_timer_action::Origin::ExternalEvent(
                proto::TimerOriginExternalEvent {
                    name: "approval".to_string(),
                },
            );
            let _timer = OrchestrationContext::create_timer_with_origin(
                &mut inner,
                fire_at,
                None,
                Some(origin),
            );
            // Register the event wait.
            let task = CompletableTask::new();
            inner
                .pending_event_tasks
                .entry(event_name)
                .or_default()
                .push_back(task);
        }

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        let timer_action = extract_create_timer(&inner.pending_actions[0]);
        match &timer_action.origin {
            Some(proto::create_timer_action::Origin::ExternalEvent(e)) => {
                assert_eq!(e.name, "approval");
            }
            other => panic!("expected ExternalEvent origin, got {other:?}"),
        }
        // Timeout timers are not the far-future sentinel.
        let fire_at = timer_action.fire_at.as_ref().unwrap();
        let fire_at_dt =
            chrono::DateTime::from_timestamp(fire_at.seconds, fire_at.nanos as u32).unwrap();
        assert!(fire_at_dt.year() < 9999, "should not be far-future");
    }

    #[test]
    fn test_create_timer_refactor_still_works() {
        // create_timer still delegates without adding an origin.
        let ctx = make_ctx();
        let _t1 = ctx.create_timer(std::time::Duration::from_secs(10));
        let _t2 = ctx.create_timer(std::time::Duration::from_secs(20));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 2);
        assert_eq!(inner.pending_actions.len(), 2);
        assert_eq!(inner.pending_actions[0].id, 0);
        assert_eq!(inner.pending_actions[1].id, 1);

        for action in &inner.pending_actions {
            let timer = extract_create_timer(action);
            assert!(timer.fire_at.is_some());
            assert!(
                timer.origin.is_none(),
                "generic timer should have no origin"
            );
        }
    }
}