Skip to main content

dapr_durabletask/task/
orchestration_context.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex, MutexGuard};
4
5use futures::future::BoxFuture;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
9use crate::api::{
10    DurableTaskError, FailureDetails, HistoryPropagationScope, OrchestrationStatus,
11    PropagatedHistory, RetryPolicy,
12};
13use crate::internal::{to_json, to_timestamp};
14use crate::proto;
15
16use super::completable_task::CompletableTask;
17use super::options::{ActivityOptions, SubOrchestratorOptions};
18
/// Lock `m` and hand back its guard, even when the mutex is poisoned.
///
/// Poisoning (a previous holder panicked while holding the lock) is not
/// treated as an error here: the guard is pulled out of the `PoisonError`
/// and returned like any other.
pub(crate) fn lock_inner<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
22
/// Size limits carried by an orchestration context.
///
/// Every field is copied from the worker options when a context is created.
#[derive(Debug)]
pub(crate) struct ContextConfig {
    // NOTE(review): not read in this part of the file — presumably caps the
    // number of distinct buffered event names; confirm against the executor.
    pub(crate) max_event_names: usize,
    // NOTE(review): not read in this part of the file — presumably caps the
    // number of buffered events kept per name; confirm against the executor.
    pub(crate) max_events_per_name: usize,
    /// Maximum number of waiters `wait_for_external_event` will queue per event name.
    pub(crate) max_pending_tasks_per_name: usize,
    /// Size limit handed to `from_json` when the orchestration input is deserialised.
    pub(crate) max_json_payload_size: usize,
}
30
/// Internal state shared between the context and the orchestration executor.
///
/// Lives behind the `Arc<Mutex<_>>` held by `OrchestrationContext`; the
/// context's methods read and mutate it through `lock_inner`.
pub(crate) struct OrchestrationContextInner {
    /// Size limits copied from the worker options.
    pub(crate) config: Arc<ContextConfig>,
    /// Instance ID returned by `OrchestrationContext::instance_id`.
    pub(crate) instance_id: Arc<str>,
    /// Timestamp returned by `current_utc_datetime` and used as the base for
    /// timer fire times.
    pub(crate) current_utc_datetime: chrono::DateTime<chrono::Utc>,
    /// Replay flag; the same `Arc` is handed to every task created by the
    /// context via `set_replay_handle`.
    pub(crate) is_replaying: Arc<AtomicBool>,
    // NOTE(review): initialised to `false` and not touched in this part of the
    // file — presumably maintained by the executor.
    pub(crate) is_complete: bool,
    /// Raw JSON input, deserialised on demand by `OrchestrationContext::input`.
    pub(crate) input: Option<String>,
    /// Orchestration name returned by `OrchestrationContext::name`.
    pub(crate) name: Arc<str>,
    /// Last value passed to `set_custom_status`, if any.
    pub(crate) custom_status: Option<String>,
    /// Next sequence number to hand out. Each scheduled activity,
    /// sub-orchestration or timer takes the current value as its action id
    /// and increments it.
    pub(crate) sequence_number: i32,
    /// Tasks keyed by the sequence number they were scheduled under.
    pub(crate) pending_tasks: HashMap<i32, CompletableTask>,
    /// Waiters registered by `wait_for_external_event`, keyed by lower-cased
    /// event name, in registration order.
    pub(crate) pending_event_tasks: HashMap<String, VecDeque<CompletableTask>>,
    /// Events buffered while no waiter exists. The `bool` records whether
    /// the originating `EventRaised` event was applied during replay.
    pub(crate) buffered_events: HashMap<String, VecDeque<(Option<String>, bool)>>,
    /// Actions queued by the scheduling methods, in call order.
    pub(crate) pending_actions: Vec<proto::WorkflowAction>,
    // NOTE(review): the three completion_* fields start as `None` and are not
    // written in this part of the file — presumably set by the executor.
    pub(crate) completion_status: Option<OrchestrationStatus>,
    pub(crate) completion_result: Option<String>,
    pub(crate) completion_failure: Option<FailureDetails>,
    /// Serialised input recorded by `continue_as_new`.
    pub(crate) continue_as_new_input: Option<String>,
    /// `save_events` flag recorded by `continue_as_new`.
    pub(crate) save_events_on_continue: bool,
    // NOTE(review): starts as `false` and is not touched in this part of the
    // file — presumably maintained by the executor.
    pub(crate) is_suspended: bool,
    /// Patches recorded as applied in the orchestration history (from `WorkflowStarted` events).
    pub(crate) history_patches: std::collections::HashSet<String>,
    /// Cache of patch decisions made during the current execution.
    pub(crate) applied_patches: HashMap<String, bool>,
    /// Number of sequence-consuming scheduled actions recorded in history
    /// (TaskScheduled + TimerCreated + ChildWorkflowInstanceCreated).
    /// Used to determine whether `is_patched` is called mid-history or at the frontier.
    pub(crate) history_scheduled_count: i32,
    /// History forwarded from the parent workflow (if any). Populated from
    /// the `WorkflowRequest.propagated_history` field.
    pub(crate) propagated_history: Option<Arc<PropagatedHistory>>,
}
66
/// The orchestration context provided to orchestrator functions.
///
/// All methods are safe to call from async code. The context is cloneable
/// and thread-safe (`Send + Sync`), backed by `Arc<Mutex<>>`.
///
/// Cloning only clones the `Arc`, so every clone observes and mutates the
/// same underlying state. Methods take the lock through `lock_inner`, which
/// ignores mutex poisoning.
#[derive(Clone)]
pub struct OrchestrationContext {
    /// Shared, lock-protected state.
    pub(crate) inner: Arc<Mutex<OrchestrationContextInner>>,
}
75
76impl OrchestrationContext {
77    /// Create a new orchestration context with the given parameters.
78    pub(crate) fn new(
79        instance_id: String,
80        name: String,
81        input: Option<String>,
82        current_utc_datetime: chrono::DateTime<chrono::Utc>,
83        is_replaying: bool,
84        options: &crate::worker::WorkerOptions,
85        event_count_hint: usize,
86    ) -> Self {
87        let config = Arc::new(ContextConfig {
88            max_event_names: options.max_event_names,
89            max_events_per_name: options.max_events_per_name,
90            max_pending_tasks_per_name: options.max_pending_tasks_per_name,
91            max_json_payload_size: options.max_json_payload_size,
92        });
93
94        Self {
95            inner: Arc::new(Mutex::new(OrchestrationContextInner {
96                config,
97                instance_id: Arc::<str>::from(instance_id),
98                current_utc_datetime,
99                is_replaying: Arc::new(AtomicBool::new(is_replaying)),
100                is_complete: false,
101                input,
102                name: Arc::<str>::from(name),
103                custom_status: None,
104                sequence_number: 0,
105                pending_tasks: HashMap::with_capacity(event_count_hint / 2),
106                pending_event_tasks: HashMap::new(),
107                buffered_events: HashMap::new(),
108                pending_actions: Vec::with_capacity(event_count_hint / 2),
109                completion_status: None,
110                completion_result: None,
111                completion_failure: None,
112                continue_as_new_input: None,
113                save_events_on_continue: false,
114                is_suspended: false,
115                history_patches: std::collections::HashSet::new(),
116                applied_patches: HashMap::new(),
117                history_scheduled_count: 0,
118                propagated_history: None,
119            })),
120        }
121    }
122
123    /// Get the instance ID.
124    pub fn instance_id(&self) -> Arc<str> {
125        lock_inner(&self.inner).instance_id.clone()
126    }
127
128    /// Get the current UTC datetime (deterministic, from history events).
129    pub fn current_utc_datetime(&self) -> chrono::DateTime<chrono::Utc> {
130        lock_inner(&self.inner).current_utc_datetime
131    }
132
133    /// Check if the orchestrator is currently replaying.
134    pub fn is_replaying(&self) -> bool {
135        lock_inner(&self.inner).is_replaying.load(Ordering::Acquire)
136    }
137
138    /// Get the orchestration name.
139    pub fn name(&self) -> Arc<str> {
140        lock_inner(&self.inner).name.clone()
141    }
142
143    /// Get the orchestration input, deserialised from JSON.
144    pub fn input<T: DeserializeOwned>(&self) -> crate::api::Result<T> {
145        let inner = lock_inner(&self.inner);
146        crate::internal::from_json(inner.input.as_deref(), inner.config.max_json_payload_size)
147    }
148
149    /// Returns history forwarded from the parent workflow, if the parent
150    /// scheduled this child with a non-`None` history propagation scope.
151    ///
152    /// See [`HistoryPropagationScope`] for the parent-side trade-off between
153    /// `OwnHistory` and `Lineage`.
154    pub fn propagated_history(&self) -> Option<Arc<PropagatedHistory>> {
155        lock_inner(&self.inner).propagated_history.clone()
156    }
157
158    /// Set a custom status string.
159    pub fn set_custom_status(&self, status: impl Into<String>) {
160        let mut inner = lock_inner(&self.inner);
161        inner.custom_status = Some(status.into());
162    }
163
164    /// Schedule an activity for execution.
165    ///
166    /// Returns a [`CompletableTask`] that resolves when the activity completes.
167    ///
168    /// During replay: if the corresponding `TaskCompleted`/`TaskFailed` event
169    /// exists in history, the task will already be complete.
170    /// During new execution: creates a `ScheduleTaskAction`.
171    pub fn call_activity(&self, name: &str, input: impl Serialize) -> CompletableTask {
172        tracing::debug!(activity = %name, "Scheduling activity");
173        self.call_activity_inner(name, input, None, None)
174    }
175
176    /// Schedule an activity with an `app_id` for cross-app scenarios.
177    pub fn call_activity_with_app_id(
178        &self,
179        name: &str,
180        input: impl Serialize,
181        app_id: &str,
182    ) -> CompletableTask {
183        tracing::debug!(activity = %name, app_id = %app_id, "Scheduling activity with app_id");
184        self.call_activity_inner(name, input, Some(app_id), None)
185    }
186
187    fn call_activity_inner(
188        &self,
189        name: &str,
190        input: impl Serialize,
191        app_id: Option<&str>,
192        history_propagation_scope: Option<HistoryPropagationScope>,
193    ) -> CompletableTask {
194        let input_json = match to_json(&input) {
195            Ok(json) => json,
196            Err(e) => {
197                let task = CompletableTask::new();
198                task.fail(FailureDetails {
199                    message: format!("Failed to serialize activity input: {e}"),
200                    error_type: "SerializationError".to_string(),
201                    stack_trace: None,
202                });
203                return task;
204            }
205        };
206        self.call_activity_raw(name, input_json, app_id, history_propagation_scope)
207    }
208
209    /// Internal: schedule an activity using a pre-serialised JSON input.
210    fn call_activity_raw(
211        &self,
212        name: &str,
213        input_json: Option<String>,
214        app_id: Option<&str>,
215        history_propagation_scope: Option<HistoryPropagationScope>,
216    ) -> CompletableTask {
217        let mut inner = lock_inner(&self.inner);
218        let seq = inner.sequence_number;
219        inner.sequence_number += 1;
220
221        if let Some(existing) = inner.pending_tasks.get(&seq)
222            && existing.is_complete()
223        {
224            return existing.clone();
225        }
226
227        let task = CompletableTask::new();
228        task.set_replay_handle(inner.is_replaying.clone());
229        inner.pending_tasks.insert(seq, task.clone());
230
231        let router = app_id.map(|id| proto::TaskRouter {
232            source_app_id: String::new(),
233            target_app_id: Some(id.to_string()),
234            target_app_namespace: None,
235        });
236        let action = proto::WorkflowAction {
237            id: seq,
238            router: None,
239            workflow_action_type: Some(proto::workflow_action::WorkflowActionType::ScheduleTask(
240                proto::ScheduleTaskAction {
241                    name: name.to_string(),
242                    version: None,
243                    input: input_json,
244                    router,
245                    task_execution_id: String::new(),
246                    history_propagation_scope: history_propagation_scope
247                        .map(|s| s.to_proto() as i32),
248                },
249            )),
250        };
251        inner.pending_actions.push(action);
252
253        task
254    }
255
256    /// Schedule an activity with options (retry policy, app ID).
257    ///
258    /// Returns a future that drives the activity to completion, transparently
259    /// scheduling durable timers and re-issuing the activity on each retry.
260    pub fn call_activity_with_options(
261        &self,
262        name: &str,
263        input: impl Serialize,
264        options: ActivityOptions,
265    ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
266    {
267        let input_json = to_json(&input);
268        let name = name.to_string();
269        let app_id = options.app_id.clone();
270        let scope = options.history_propagation_scope;
271        let ctx = self.clone();
272
273        async move {
274            let input_json = input_json?;
275            match options.retry_policy {
276                Some(policy) => {
277                    let first_attempt_time = ctx.current_utc_datetime();
278                    let schedule: Arc<
279                        dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
280                    > = Arc::new(move |c: &OrchestrationContext| {
281                        c.call_activity_raw(&name, input_json.clone(), app_id.as_deref(), scope)
282                    });
283                    call_with_retry(ctx, schedule, policy, first_attempt_time).await
284                }
285                None => {
286                    ctx.call_activity_raw(&name, input_json, app_id.as_deref(), scope)
287                        .await
288                }
289            }
290        }
291    }
292
293    /// Schedule a sub-orchestration for execution.
294    pub fn call_sub_orchestrator(
295        &self,
296        name: &str,
297        input: impl Serialize,
298        instance_id: Option<&str>,
299    ) -> CompletableTask {
300        tracing::debug!(
301            sub_orchestrator = %name,
302            sub_instance_id = ?instance_id,
303            "Scheduling sub-orchestration"
304        );
305        self.call_sub_orchestrator_inner(name, input, instance_id, None, None)
306    }
307
308    /// Schedule a sub-orchestration targeting a specific Dapr app ID.
309    pub fn call_sub_orchestrator_with_app_id(
310        &self,
311        name: &str,
312        input: impl Serialize,
313        instance_id: Option<&str>,
314        app_id: &str,
315    ) -> CompletableTask {
316        tracing::debug!(
317            sub_orchestrator = %name,
318            sub_instance_id = ?instance_id,
319            app_id = %app_id,
320            "Scheduling sub-orchestration with app_id"
321        );
322        self.call_sub_orchestrator_inner(name, input, instance_id, Some(app_id), None)
323    }
324
325    fn call_sub_orchestrator_inner(
326        &self,
327        name: &str,
328        input: impl Serialize,
329        instance_id: Option<&str>,
330        app_id: Option<&str>,
331        history_propagation_scope: Option<HistoryPropagationScope>,
332    ) -> CompletableTask {
333        let input_json = match to_json(&input) {
334            Ok(json) => json,
335            Err(e) => {
336                let task = CompletableTask::new();
337                task.fail(FailureDetails {
338                    message: format!("Failed to serialize sub-orchestrator input: {e}"),
339                    error_type: "SerializationError".to_string(),
340                    stack_trace: None,
341                });
342                return task;
343            }
344        };
345        self.call_sub_orchestrator_raw(
346            name,
347            input_json,
348            instance_id,
349            app_id,
350            history_propagation_scope,
351        )
352    }
353
354    /// Internal: schedule a sub-orchestration using a pre-serialised JSON input.
355    fn call_sub_orchestrator_raw(
356        &self,
357        name: &str,
358        input_json: Option<String>,
359        instance_id: Option<&str>,
360        app_id: Option<&str>,
361        history_propagation_scope: Option<HistoryPropagationScope>,
362    ) -> CompletableTask {
363        let mut inner = lock_inner(&self.inner);
364        let seq = inner.sequence_number;
365        inner.sequence_number += 1;
366
367        if let Some(existing) = inner.pending_tasks.get(&seq)
368            && existing.is_complete()
369        {
370            return existing.clone();
371        }
372
373        let task = CompletableTask::new();
374        task.set_replay_handle(inner.is_replaying.clone());
375        inner.pending_tasks.insert(seq, task.clone());
376
377        let sub_instance_id = instance_id
378            .map(|s| s.to_string())
379            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
380
381        let router = app_id.map(|id| proto::TaskRouter {
382            source_app_id: String::new(),
383            target_app_id: Some(id.to_string()),
384            target_app_namespace: None,
385        });
386
387        let action = proto::WorkflowAction {
388            id: seq,
389            router: None,
390            workflow_action_type: Some(
391                proto::workflow_action::WorkflowActionType::CreateChildWorkflow(
392                    proto::CreateChildWorkflowAction {
393                        instance_id: sub_instance_id,
394                        name: name.to_string(),
395                        version: None,
396                        input: input_json,
397                        router,
398                        history_propagation_scope: history_propagation_scope
399                            .map(|s| s.to_proto() as i32),
400                    },
401                ),
402            ),
403        };
404        inner.pending_actions.push(action);
405
406        task
407    }
408
409    /// Schedule a sub-orchestration with options (instance ID, retry policy, app ID).
410    ///
411    /// Returns a future that drives the sub-orchestration to completion,
412    /// transparently scheduling durable timers and re-issuing the call on each retry.
413    ///
414    /// Note: when a retry policy is set and no explicit `instance_id` is given,
415    /// each retry uses a freshly generated instance ID.
416    pub fn call_sub_orchestrator_with_options(
417        &self,
418        name: &str,
419        input: impl Serialize,
420        options: SubOrchestratorOptions,
421    ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
422    {
423        let input_json = to_json(&input);
424        let name = name.to_string();
425        let instance_id = options.instance_id.clone();
426        let app_id = options.app_id.clone();
427        let scope = options.history_propagation_scope;
428        let ctx = self.clone();
429
430        async move {
431            let input_json = input_json?;
432            match options.retry_policy {
433                Some(policy) => {
434                    let first_attempt_time = ctx.current_utc_datetime();
435                    let schedule: Arc<
436                        dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
437                    > = Arc::new(move |c: &OrchestrationContext| {
438                        c.call_sub_orchestrator_raw(
439                            &name,
440                            input_json.clone(),
441                            instance_id.as_deref(),
442                            app_id.as_deref(),
443                            scope,
444                        )
445                    });
446                    call_with_retry(ctx, schedule, policy, first_attempt_time).await
447                }
448                None => {
449                    ctx.call_sub_orchestrator_raw(
450                        &name,
451                        input_json,
452                        instance_id.as_deref(),
453                        app_id.as_deref(),
454                        scope,
455                    )
456                    .await
457                }
458            }
459        }
460    }
461
462    /// Create a durable timer that fires after the specified duration.
463    pub fn create_timer(&self, delay: std::time::Duration) -> CompletableTask {
464        tracing::debug!(delay_ms = delay.as_millis() as u64, "Creating timer");
465        let mut inner = lock_inner(&self.inner);
466        let seq = inner.sequence_number;
467        inner.sequence_number += 1;
468
469        if let Some(existing) = inner.pending_tasks.get(&seq)
470            && existing.is_complete()
471        {
472            return existing.clone();
473        }
474
475        let task = CompletableTask::new();
476        task.set_replay_handle(inner.is_replaying.clone());
477        inner.pending_tasks.insert(seq, task.clone());
478
479        let fire_at = inner.current_utc_datetime
480            + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
481        let action = proto::WorkflowAction {
482            id: seq,
483            router: None,
484            workflow_action_type: Some(proto::workflow_action::WorkflowActionType::CreateTimer(
485                proto::CreateTimerAction {
486                    fire_at: Some(to_timestamp(fire_at)),
487                    name: None,
488                    origin: None,
489                },
490            )),
491        };
492        inner.pending_actions.push(action);
493
494        task
495    }
496
497    /// Wait for an external event with the given name.
498    ///
499    /// Event names are case-insensitive.
500    pub fn wait_for_external_event(&self, name: &str) -> CompletableTask {
501        tracing::debug!(event_name = %name, "Waiting for external event");
502        let mut inner = lock_inner(&self.inner);
503        let event_name = name.to_lowercase();
504
505        if let Some(events) = inner.buffered_events.get_mut(&event_name)
506            && !events.is_empty()
507        {
508            let (data, during_replay) = events
509                .pop_front()
510                .expect("buffered event queue is not empty");
511            let task = CompletableTask::new();
512            task.set_replay_handle(inner.is_replaying.clone());
513            task.complete_with_phase(data, during_replay);
514            return task;
515        }
516
517        let task = CompletableTask::new();
518        task.set_replay_handle(inner.is_replaying.clone());
519        let max_pending = inner.config.max_pending_tasks_per_name;
520        let pending = inner.pending_event_tasks.entry(event_name).or_default();
521        if pending.len() >= max_pending {
522            tracing::warn!(event_name = %name, "Pending event task limit reached, discarding wait");
523            return task;
524        }
525        pending.push_back(task.clone());
526        task
527    }
528
529    /// Continue the orchestration as new with new input.
530    pub fn continue_as_new(&self, input: impl Serialize, save_events: bool) {
531        tracing::debug!(save_events = save_events, "Continuing orchestration as new");
532        let mut inner = lock_inner(&self.inner);
533        inner.continue_as_new_input = to_json(&input).ok().flatten();
534        inner.save_events_on_continue = save_events;
535    }
536
537    /// Check whether a named patch should be applied in the current execution.
538    ///
539    /// This enables safe, deterministic code upgrades. Wrap new behaviour in
540    /// `if ctx.is_patched("my-patch")` to ensure that:
541    ///
542    /// - Replaying executions that previously ran the *unpatched* path continue
543    ///   on the unpatched path (preserving determinism).
544    /// - Executions that previously ran the *patched* path continue on the
545    ///   patched path.
546    /// - Brand-new executions (at the history frontier) always take the patched
547    ///   path.
548    ///
549    /// This matches the behaviour of the Go and Python SDKs.
550    pub fn is_patched(&self, patch_name: &str) -> bool {
551        let mut inner = lock_inner(&self.inner);
552
553        // Return the cached decision from the current execution if available.
554        if let Some(&cached) = inner.applied_patches.get(patch_name) {
555            return cached;
556        }
557
558        // If this patch was recorded as applied in the history, honour it.
559        if inner.history_patches.contains(patch_name) {
560            inner.applied_patches.insert(patch_name.to_string(), true);
561            return true;
562        }
563
564        // If the orchestrator hasn't yet consumed all scheduled actions from
565        // history, this call is mid-replay.  The previous execution did NOT
566        // apply this patch, so we must stay on the unpatched path to preserve
567        // determinism.
568        if inner.sequence_number < inner.history_scheduled_count {
569            inner.applied_patches.insert(patch_name.to_string(), false);
570            return false;
571        }
572
573        // We're at (or past) the history frontier — apply the patch.
574        inner.applied_patches.insert(patch_name.to_string(), true);
575        true
576    }
577}
578
579// ── Retry helpers ─────────────────────────────────────────────────────────────
580
581/// Compute the delay before the next retry attempt, or `None` if the retry
582/// should not proceed (timeout exceeded or predicate returned false).
583fn compute_retry_delay(
584    policy: &RetryPolicy,
585    attempt: u32,
586    first_attempt_time: chrono::DateTime<chrono::Utc>,
587    current_time: chrono::DateTime<chrono::Utc>,
588    details: &FailureDetails,
589) -> Option<std::time::Duration> {
590    // Check custom predicate.
591    if let Some(ref handle) = policy.handle
592        && !handle(details)
593    {
594        return None;
595    }
596
597    // Check overall retry timeout.
598    if let Some(timeout) = policy.retry_timeout {
599        let elapsed = current_time - first_attempt_time;
600        let timeout_dur = chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
601        if elapsed >= timeout_dur {
602            return None;
603        }
604    }
605
606    // Exponential backoff.
607    let first_ms = policy.first_retry_interval.as_millis() as f64;
608    let next_ms = first_ms * policy.backoff_coefficient.powi(attempt as i32);
609
610    let delay_ms = if let Some(max) = policy.max_retry_interval {
611        next_ms.min(max.as_millis() as f64)
612    } else {
613        next_ms
614    };
615
616    Some(std::time::Duration::from_millis(delay_ms as u64))
617}
618
619/// Drive a task to completion, retrying on failure according to `policy`.
620///
621/// `schedule` is called once per attempt and must return a fresh [`CompletableTask`].
622/// Between attempts a durable timer is created for the computed backoff delay,
623/// preserving determinism across replays.
624fn call_with_retry(
625    ctx: OrchestrationContext,
626    schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync>,
627    policy: RetryPolicy,
628    first_attempt_time: chrono::DateTime<chrono::Utc>,
629) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
630    Box::pin(async move {
631        let mut attempt = 0;
632        loop {
633            let task = schedule(&ctx);
634            match task.await {
635                Ok(v) => return Ok(v),
636                Err(DurableTaskError::TaskFailed {
637                    message,
638                    failure_details,
639                }) => {
640                    let details = failure_details.clone().unwrap_or_else(|| FailureDetails {
641                        message: message.clone(),
642                        error_type: "TaskFailed".to_string(),
643                        stack_trace: None,
644                    });
645
646                    if attempt + 1 >= policy.max_number_of_attempts {
647                        tracing::debug!(
648                            attempt,
649                            max = policy.max_number_of_attempts,
650                            "Max retry attempts reached"
651                        );
652                        return Err(DurableTaskError::TaskFailed {
653                            message,
654                            failure_details,
655                        });
656                    }
657
658                    let current_time = ctx.current_utc_datetime();
659                    let delay = match compute_retry_delay(
660                        &policy,
661                        attempt,
662                        first_attempt_time,
663                        current_time,
664                        &details,
665                    ) {
666                        Some(d) => d,
667                        None => {
668                            tracing::debug!(attempt, "Retry predicate or timeout prevented retry");
669                            return Err(DurableTaskError::TaskFailed {
670                                message,
671                                failure_details,
672                            });
673                        }
674                    };
675
676                    tracing::debug!(
677                        attempt,
678                        delay_ms = delay.as_millis(),
679                        "Scheduling retry timer"
680                    );
681                    ctx.create_timer(delay).await?;
682                    attempt += 1;
683                }
684                Err(e) => return Err(e),
685            }
686        }
687    })
688}
689
690#[cfg(test)]
691mod tests {
692    use super::*;
693
694    fn make_ctx() -> OrchestrationContext {
695        OrchestrationContext::new(
696            "inst-1".to_string(),
697            "my_orch".to_string(),
698            Some("\"hello\"".to_string()),
699            chrono::Utc::now(),
700            false,
701            &crate::worker::WorkerOptions::default(),
702            0,
703        )
704    }
705
706    #[test]
707    fn test_basic_accessors() {
708        let ctx = make_ctx();
709        assert_eq!(ctx.instance_id().as_ref(), "inst-1");
710        assert_eq!(ctx.name().as_ref(), "my_orch");
711        assert!(!ctx.is_replaying());
712    }
713
714    #[test]
715    fn test_input() {
716        let ctx = make_ctx();
717        let input: String = ctx.input().unwrap();
718        assert_eq!(input, "hello");
719    }
720
721    #[test]
722    fn test_set_custom_status() {
723        let ctx = make_ctx();
724        ctx.set_custom_status("processing");
725        let inner = ctx.inner.lock().unwrap();
726        assert_eq!(inner.custom_status, Some("processing".to_string()));
727    }
728
729    #[test]
730    fn test_call_activity_creates_action() {
731        let ctx = make_ctx();
732        let _task = ctx.call_activity("greet", "world");
733
734        let inner = ctx.inner.lock().unwrap();
735        assert_eq!(inner.sequence_number, 1);
736        assert_eq!(inner.pending_actions.len(), 1);
737        assert_eq!(inner.pending_actions[0].id, 0);
738        match &inner.pending_actions[0].workflow_action_type {
739            Some(proto::workflow_action::WorkflowActionType::ScheduleTask(a)) => {
740                assert_eq!(a.name, "greet");
741                assert_eq!(a.input, Some("\"world\"".to_string()));
742            }
743            _ => panic!("expected ScheduleTask action"),
744        }
745    }
746
747    #[test]
748    fn test_call_activity_replay_returns_existing() {
749        let ctx = make_ctx();
750
751        // Pre-populate a completed task at sequence 0 (simulating replay)
752        {
753            let mut inner = ctx.inner.lock().unwrap();
754            let task = CompletableTask::new();
755            task.complete(Some("42".to_string()));
756            inner.pending_tasks.insert(0, task);
757        }
758
759        let task = ctx.call_activity("greet", "world");
760        assert!(task.is_complete());
761
762        let inner = ctx.inner.lock().unwrap();
763        assert_eq!(inner.pending_actions.len(), 0);
764    }
765
766    #[test]
767    fn test_call_sub_orchestrator() {
768        let ctx = make_ctx();
769        let _task = ctx.call_sub_orchestrator("child_orch", "input", Some("child-1"));
770
771        let inner = ctx.inner.lock().unwrap();
772        assert_eq!(inner.sequence_number, 1);
773        match &inner.pending_actions[0].workflow_action_type {
774            Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
775                assert_eq!(a.name, "child_orch");
776                assert_eq!(a.instance_id, "child-1");
777            }
778            _ => panic!("expected CreateChildWorkflow action"),
779        }
780    }
781
782    #[test]
783    fn test_create_timer() {
784        let ctx = make_ctx();
785        let _task = ctx.create_timer(std::time::Duration::from_secs(60));
786
787        let inner = ctx.inner.lock().unwrap();
788        assert_eq!(inner.sequence_number, 1);
789        match &inner.pending_actions[0].workflow_action_type {
790            Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => {
791                assert!(a.fire_at.is_some());
792            }
793            _ => panic!("expected CreateTimer action"),
794        }
795    }
796
797    #[test]
798    fn test_wait_for_external_event_buffered() {
799        let ctx = make_ctx();
800
801        // Buffer an event
802        {
803            let mut inner = ctx.inner.lock().unwrap();
804            inner
805                .buffered_events
806                .entry("approval".to_string())
807                .or_default()
808                .push_back((Some("\"yes\"".to_string()), true));
809        }
810
811        let task = ctx.wait_for_external_event("APPROVAL"); // case-insensitive
812        assert!(task.is_complete());
813    }
814
815    #[test]
816    fn test_wait_for_external_event_pending() {
817        let ctx = make_ctx();
818        let task = ctx.wait_for_external_event("approval");
819        assert!(!task.is_complete());
820
821        let inner = ctx.inner.lock().unwrap();
822        assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
823    }
824
825    #[test]
826    fn test_continue_as_new() {
827        let ctx = make_ctx();
828        ctx.continue_as_new("new_input", true);
829
830        let inner = ctx.inner.lock().unwrap();
831        assert_eq!(
832            inner.continue_as_new_input,
833            Some("\"new_input\"".to_string())
834        );
835        assert!(inner.save_events_on_continue);
836    }
837
838    #[test]
839    fn test_sequence_numbers_increment() {
840        let ctx = make_ctx();
841        let _t1 = ctx.call_activity("a", ());
842        let _t2 = ctx.call_activity("b", ());
843        let _t3 = ctx.create_timer(std::time::Duration::from_secs(1));
844
845        let inner = ctx.inner.lock().unwrap();
846        assert_eq!(inner.sequence_number, 3);
847        assert_eq!(inner.pending_actions[0].id, 0);
848        assert_eq!(inner.pending_actions[1].id, 1);
849        assert_eq!(inner.pending_actions[2].id, 2);
850    }
851
852    #[test]
853    fn test_call_sub_orchestrator_with_app_id() {
854        let ctx = make_ctx();
855        let _task = ctx.call_sub_orchestrator_with_app_id(
856            "child_orch",
857            "input",
858            Some("child-1"),
859            "other-app",
860        );
861
862        let inner = ctx.inner.lock().unwrap();
863        assert_eq!(inner.sequence_number, 1);
864        match &inner.pending_actions[0].workflow_action_type {
865            Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
866                assert_eq!(a.name, "child_orch");
867                assert_eq!(a.instance_id, "child-1");
868                let router = a.router.as_ref().expect("expected router");
869                assert_eq!(router.target_app_id, Some("other-app".to_string()));
870            }
871            _ => panic!("expected CreateChildWorkflow action"),
872        }
873    }
874
875    #[test]
876    fn test_is_patched_new_execution_returns_true() {
877        // No history → always at the frontier → patch applies.
878        let ctx = make_ctx();
879        assert!(ctx.is_patched("my-patch"));
880    }
881
882    #[test]
883    fn test_is_patched_in_history_returns_true() {
884        // Patch recorded in history → return true.
885        let ctx = make_ctx();
886        ctx.inner
887            .lock()
888            .unwrap()
889            .history_patches
890            .insert("my-patch".to_string());
891        assert!(ctx.is_patched("my-patch"));
892    }
893
894    #[test]
895    fn test_is_patched_mid_replay_returns_false() {
896        // history_scheduled_count = 2, but seq = 0 → mid-replay, unpatched.
897        let ctx = make_ctx();
898        ctx.inner.lock().unwrap().history_scheduled_count = 2;
899        assert!(!ctx.is_patched("my-patch"));
900    }
901
902    #[test]
903    fn test_is_patched_at_frontier_after_history_returns_true() {
904        // history_scheduled_count = 1, seq = 1 → at frontier.
905        let ctx = make_ctx();
906        {
907            let mut inner = ctx.inner.lock().unwrap();
908            inner.history_scheduled_count = 1;
909            inner.sequence_number = 1;
910        }
911        assert!(ctx.is_patched("my-patch"));
912    }
913
914    #[test]
915    fn test_is_patched_caches_decision() {
916        let ctx = make_ctx();
917        // First call caches the result.
918        assert!(ctx.is_patched("my-patch"));
919        // Second call uses the cache regardless of state changes.
920        ctx.inner.lock().unwrap().history_scheduled_count = 99;
921        assert!(ctx.is_patched("my-patch"));
922    }
923}