Skip to main content

dapr_durabletask/task/
orchestration_context.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use futures::future::BoxFuture;
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::api::{
9    DurableTaskError, FailureDetails, HistoryPropagationScope, OrchestrationStatus,
10    PropagatedHistory, RetryPolicy,
11};
12use crate::internal::{to_json, to_timestamp};
13use crate::proto;
14
15use super::completable_task::CompletableTask;
16use super::options::{ActivityOptions, SubOrchestratorOptions};
17
/// Internal state shared between the context and the orchestration executor.
///
/// Every field is `pub(crate)`; [`OrchestrationContext`] wraps one instance in
/// an `Arc<Mutex<_>>` and reads or mutates it under that lock.
pub(crate) struct OrchestrationContextInner {
    /// Instance ID, returned by `OrchestrationContext::instance_id`.
    pub(crate) instance_id: String,
    /// Current orchestration time; also the base from which timer fire times
    /// are computed in `create_timer`.
    pub(crate) current_utc_datetime: chrono::DateTime<chrono::Utc>,
    /// Replay flag, returned by `OrchestrationContext::is_replaying`.
    pub(crate) is_replaying: bool,
    /// Completion flag; starts as `false`.
    // NOTE(review): not written in this file — presumably set by the executor; confirm.
    pub(crate) is_complete: bool,
    /// Raw JSON input, deserialised on demand by `get_input`.
    pub(crate) input: Option<String>,
    /// Orchestration name, returned by `OrchestrationContext::name`.
    pub(crate) name: String,
    /// Last value passed to `set_custom_status`, if any.
    pub(crate) custom_status: Option<String>,
    /// Next sequence number. Each scheduled activity, sub-orchestration and
    /// timer takes the current value as its action ID and increments it.
    pub(crate) sequence_number: i32,
    /// Tasks keyed by the sequence number they were scheduled under.
    pub(crate) pending_tasks: HashMap<i32, CompletableTask>,
    /// Tasks waiting for an external event, keyed by lower-cased event name.
    pub(crate) pending_event_tasks: HashMap<String, Vec<CompletableTask>>,
    /// Event payloads not yet consumed, keyed by lower-cased event name.
    /// `wait_for_external_event` takes them from the front of the list.
    pub(crate) buffered_events: HashMap<String, Vec<Option<String>>>,
    /// Actions queued by the scheduling methods during this execution.
    pub(crate) pending_actions: Vec<proto::WorkflowAction>,
    /// Completion status; starts as `None`.
    // NOTE(review): the three completion_* fields are not written in this file —
    // presumably filled in by the executor; confirm.
    pub(crate) completion_status: Option<OrchestrationStatus>,
    /// Serialised completion result; starts as `None`.
    pub(crate) completion_result: Option<String>,
    /// Failure details for a failed completion; starts as `None`.
    pub(crate) completion_failure: Option<FailureDetails>,
    /// Serialised input recorded by `continue_as_new`.
    pub(crate) continue_as_new_input: Option<String>,
    /// `save_events` flag recorded by `continue_as_new`.
    pub(crate) save_events_on_continue: bool,
    /// Suspension flag; starts as `false`.
    // NOTE(review): not read or written in this file; confirm semantics with the executor.
    pub(crate) is_suspended: bool,
    /// Copied from `WorkerOptions::max_event_names`.
    // NOTE(review): not enforced in this file — presumably a cap on distinct
    // buffered event names applied by the executor; confirm.
    pub(crate) max_event_names: usize,
    /// Copied from `WorkerOptions::max_events_per_name`.
    // NOTE(review): not enforced in this file; confirm where it is applied.
    pub(crate) max_events_per_name: usize,
    /// Upper bound on waiters registered per event name; enforced by
    /// `wait_for_external_event`.
    pub(crate) max_pending_tasks_per_name: usize,
    /// Size limit handed to `from_json` when deserialising the input.
    pub(crate) max_json_payload_size: usize,
    /// Patches recorded as applied in the orchestration history (from `WorkflowStarted` events).
    pub(crate) history_patches: std::collections::HashSet<String>,
    /// Cache of patch decisions made during the current execution.
    pub(crate) applied_patches: HashMap<String, bool>,
    /// Number of sequence-consuming scheduled actions recorded in history
    /// (TaskScheduled + TimerCreated + ChildWorkflowInstanceCreated).
    /// Used to determine whether `is_patched` is called mid-history or at the frontier.
    pub(crate) history_scheduled_count: i32,
    /// History forwarded from the parent workflow (if any). Populated from
    /// the `WorkflowRequest.propagated_history` field.
    pub(crate) propagated_history: Option<PropagatedHistory>,
}
54
/// The orchestration context provided to orchestrator functions.
///
/// All methods are safe to call from async code. The context is cloneable
/// and thread-safe (`Send + Sync`), backed by `Arc<Mutex<>>`.
///
/// Cloning is cheap: every clone shares the same underlying state, so a
/// change made through one handle is visible through all of them.
#[derive(Clone)]
pub struct OrchestrationContext {
    /// Shared state, also accessed crate-internally (the field is `pub(crate)`).
    pub(crate) inner: Arc<Mutex<OrchestrationContextInner>>,
}
63
64impl OrchestrationContext {
65    /// Create a new orchestration context with the given parameters.
66    pub(crate) fn new(
67        instance_id: String,
68        name: String,
69        input: Option<String>,
70        current_utc_datetime: chrono::DateTime<chrono::Utc>,
71        is_replaying: bool,
72        options: &crate::worker::WorkerOptions,
73        event_count_hint: usize,
74    ) -> Self {
75        Self {
76            inner: Arc::new(Mutex::new(OrchestrationContextInner {
77                instance_id,
78                current_utc_datetime,
79                is_replaying,
80                is_complete: false,
81                input,
82                name,
83                custom_status: None,
84                sequence_number: 0,
85                pending_tasks: HashMap::with_capacity(event_count_hint / 2),
86                pending_event_tasks: HashMap::new(),
87                buffered_events: HashMap::new(),
88                pending_actions: Vec::with_capacity(event_count_hint / 2),
89                completion_status: None,
90                completion_result: None,
91                completion_failure: None,
92                continue_as_new_input: None,
93                save_events_on_continue: false,
94                is_suspended: false,
95                max_event_names: options.max_event_names,
96                max_events_per_name: options.max_events_per_name,
97                max_pending_tasks_per_name: options.max_pending_tasks_per_name,
98                max_json_payload_size: options.max_json_payload_size,
99                history_patches: std::collections::HashSet::new(),
100                applied_patches: HashMap::new(),
101                history_scheduled_count: 0,
102                propagated_history: None,
103            })),
104        }
105    }
106
107    /// Get the instance ID.
108    pub fn instance_id(&self) -> String {
109        self.inner
110            .lock()
111            .unwrap_or_else(|e| e.into_inner())
112            .instance_id
113            .clone()
114    }
115
116    /// Get the current UTC datetime (deterministic, from history events).
117    pub fn current_utc_datetime(&self) -> chrono::DateTime<chrono::Utc> {
118        self.inner
119            .lock()
120            .unwrap_or_else(|e| e.into_inner())
121            .current_utc_datetime
122    }
123
124    /// Check if the orchestrator is currently replaying.
125    pub fn is_replaying(&self) -> bool {
126        self.inner
127            .lock()
128            .unwrap_or_else(|e| e.into_inner())
129            .is_replaying
130    }
131
132    /// Get the orchestration name.
133    pub fn name(&self) -> String {
134        self.inner
135            .lock()
136            .unwrap_or_else(|e| e.into_inner())
137            .name
138            .clone()
139    }
140
141    /// Get the orchestration input, deserialised from JSON.
142    pub fn get_input<T: DeserializeOwned>(&self) -> crate::api::Result<T> {
143        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
144        crate::internal::from_json(inner.input.as_deref(), inner.max_json_payload_size)
145    }
146
147    /// Returns history forwarded from the parent workflow, if the parent
148    /// scheduled this child with a non-`None` history propagation scope.
149    ///
150    /// See [`HistoryPropagationScope`] for the parent-side trade-off between
151    /// `OwnHistory` and `Lineage`.
152    pub fn propagated_history(&self) -> Option<PropagatedHistory> {
153        self.inner
154            .lock()
155            .unwrap_or_else(|e| e.into_inner())
156            .propagated_history
157            .clone()
158    }
159
160    /// Set a custom status string.
161    pub fn set_custom_status(&self, status: impl Into<String>) {
162        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
163        inner.custom_status = Some(status.into());
164    }
165
166    /// Schedule an activity for execution.
167    ///
168    /// Returns a [`CompletableTask`] that resolves when the activity completes.
169    ///
170    /// During replay: if the corresponding `TaskCompleted`/`TaskFailed` event
171    /// exists in history, the task will already be complete.
172    /// During new execution: creates a `ScheduleTaskAction`.
173    pub fn call_activity(&self, name: &str, input: impl Serialize) -> CompletableTask {
174        tracing::debug!(activity = %name, "Scheduling activity");
175        self.call_activity_inner(name, input, None, None)
176    }
177
178    /// Schedule an activity with an `app_id` for cross-app scenarios.
179    pub fn call_activity_with_app_id(
180        &self,
181        name: &str,
182        input: impl Serialize,
183        app_id: &str,
184    ) -> CompletableTask {
185        tracing::debug!(activity = %name, app_id = %app_id, "Scheduling activity with app_id");
186        self.call_activity_inner(name, input, Some(app_id), None)
187    }
188
189    fn call_activity_inner(
190        &self,
191        name: &str,
192        input: impl Serialize,
193        app_id: Option<&str>,
194        history_propagation_scope: Option<HistoryPropagationScope>,
195    ) -> CompletableTask {
196        let input_json = match to_json(&input) {
197            Ok(json) => json,
198            Err(e) => {
199                let task = CompletableTask::new();
200                task.fail(FailureDetails {
201                    message: format!("Failed to serialize activity input: {e}"),
202                    error_type: "SerializationError".to_string(),
203                    stack_trace: None,
204                });
205                return task;
206            }
207        };
208        self.call_activity_raw(name, input_json, app_id, history_propagation_scope)
209    }
210
211    /// Internal: schedule an activity using a pre-serialised JSON input.
212    fn call_activity_raw(
213        &self,
214        name: &str,
215        input_json: Option<String>,
216        app_id: Option<&str>,
217        history_propagation_scope: Option<HistoryPropagationScope>,
218    ) -> CompletableTask {
219        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
220        let seq = inner.sequence_number;
221        inner.sequence_number += 1;
222
223        if let Some(existing) = inner.pending_tasks.get(&seq) {
224            if existing.is_complete() {
225                return existing.clone();
226            }
227        }
228
229        let task = CompletableTask::new();
230        inner.pending_tasks.insert(seq, task.clone());
231
232        let router = app_id.map(|id| proto::TaskRouter {
233            source_app_id: String::new(),
234            target_app_id: Some(id.to_string()),
235            target_app_namespace: None,
236        });
237        let action = proto::WorkflowAction {
238            id: seq,
239            router: None,
240            workflow_action_type: Some(proto::workflow_action::WorkflowActionType::ScheduleTask(
241                proto::ScheduleTaskAction {
242                    name: name.to_string(),
243                    version: None,
244                    input: input_json,
245                    router,
246                    task_execution_id: String::new(),
247                    history_propagation_scope: history_propagation_scope
248                        .map(|s| s.to_proto() as i32),
249                },
250            )),
251        };
252        inner.pending_actions.push(action);
253
254        task
255    }
256
257    /// Schedule an activity with options (retry policy, app ID).
258    ///
259    /// Returns a future that drives the activity to completion, transparently
260    /// scheduling durable timers and re-issuing the activity on each retry.
261    pub fn call_activity_with_options(
262        &self,
263        name: &str,
264        input: impl Serialize,
265        options: ActivityOptions,
266    ) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
267        let input_json = match to_json(&input) {
268            Ok(json) => json,
269            Err(e) => return Box::pin(async move { Err(e) }),
270        };
271        let name = name.to_string();
272        let app_id = options.app_id.clone();
273        let scope = options.history_propagation_scope;
274        let ctx = self.clone();
275
276        match options.retry_policy {
277            Some(policy) => {
278                let first_attempt_time = ctx.current_utc_datetime();
279                let schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync> =
280                    Arc::new(move |c: &OrchestrationContext| {
281                        c.call_activity_raw(&name, input_json.clone(), app_id.as_deref(), scope)
282                    });
283                call_with_retry(ctx, schedule, policy, 0, first_attempt_time)
284            }
285            None => {
286                let task = self.call_activity_raw(&name, input_json, app_id.as_deref(), scope);
287                Box::pin(task)
288            }
289        }
290    }
291
292    /// Schedule a sub-orchestration for execution.
293    pub fn call_sub_orchestrator(
294        &self,
295        name: &str,
296        input: impl Serialize,
297        instance_id: Option<&str>,
298    ) -> CompletableTask {
299        tracing::debug!(
300            sub_orchestrator = %name,
301            sub_instance_id = ?instance_id,
302            "Scheduling sub-orchestration"
303        );
304        self.call_sub_orchestrator_inner(name, input, instance_id, None, None)
305    }
306
307    /// Schedule a sub-orchestration targeting a specific Dapr app ID.
308    pub fn call_sub_orchestrator_with_app_id(
309        &self,
310        name: &str,
311        input: impl Serialize,
312        instance_id: Option<&str>,
313        app_id: &str,
314    ) -> CompletableTask {
315        tracing::debug!(
316            sub_orchestrator = %name,
317            sub_instance_id = ?instance_id,
318            app_id = %app_id,
319            "Scheduling sub-orchestration with app_id"
320        );
321        self.call_sub_orchestrator_inner(name, input, instance_id, Some(app_id), None)
322    }
323
324    fn call_sub_orchestrator_inner(
325        &self,
326        name: &str,
327        input: impl Serialize,
328        instance_id: Option<&str>,
329        app_id: Option<&str>,
330        history_propagation_scope: Option<HistoryPropagationScope>,
331    ) -> CompletableTask {
332        let input_json = match to_json(&input) {
333            Ok(json) => json,
334            Err(e) => {
335                let task = CompletableTask::new();
336                task.fail(FailureDetails {
337                    message: format!("Failed to serialize sub-orchestrator input: {e}"),
338                    error_type: "SerializationError".to_string(),
339                    stack_trace: None,
340                });
341                return task;
342            }
343        };
344        self.call_sub_orchestrator_raw(
345            name,
346            input_json,
347            instance_id,
348            app_id,
349            history_propagation_scope,
350        )
351    }
352
353    /// Internal: schedule a sub-orchestration using a pre-serialised JSON input.
354    fn call_sub_orchestrator_raw(
355        &self,
356        name: &str,
357        input_json: Option<String>,
358        instance_id: Option<&str>,
359        app_id: Option<&str>,
360        history_propagation_scope: Option<HistoryPropagationScope>,
361    ) -> CompletableTask {
362        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
363        let seq = inner.sequence_number;
364        inner.sequence_number += 1;
365
366        if let Some(existing) = inner.pending_tasks.get(&seq) {
367            if existing.is_complete() {
368                return existing.clone();
369            }
370        }
371
372        let task = CompletableTask::new();
373        inner.pending_tasks.insert(seq, task.clone());
374
375        let sub_instance_id = instance_id
376            .map(|s| s.to_string())
377            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
378
379        let router = app_id.map(|id| proto::TaskRouter {
380            source_app_id: String::new(),
381            target_app_id: Some(id.to_string()),
382            target_app_namespace: None,
383        });
384
385        let action = proto::WorkflowAction {
386            id: seq,
387            router: None,
388            workflow_action_type: Some(
389                proto::workflow_action::WorkflowActionType::CreateChildWorkflow(
390                    proto::CreateChildWorkflowAction {
391                        instance_id: sub_instance_id,
392                        name: name.to_string(),
393                        version: None,
394                        input: input_json,
395                        router,
396                        history_propagation_scope: history_propagation_scope
397                            .map(|s| s.to_proto() as i32),
398                    },
399                ),
400            ),
401        };
402        inner.pending_actions.push(action);
403
404        task
405    }
406
407    /// Schedule a sub-orchestration with options (instance ID, retry policy, app ID).
408    ///
409    /// Returns a future that drives the sub-orchestration to completion,
410    /// transparently scheduling durable timers and re-issuing the call on each retry.
411    ///
412    /// Note: when a retry policy is set and no explicit `instance_id` is given,
413    /// each retry uses a freshly generated instance ID.
414    pub fn call_sub_orchestrator_with_options(
415        &self,
416        name: &str,
417        input: impl Serialize,
418        options: SubOrchestratorOptions,
419    ) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
420        let input_json = match to_json(&input) {
421            Ok(json) => json,
422            Err(e) => return Box::pin(async move { Err(e) }),
423        };
424        let name = name.to_string();
425        let instance_id = options.instance_id.clone();
426        let app_id = options.app_id.clone();
427        let scope = options.history_propagation_scope;
428        let ctx = self.clone();
429
430        match options.retry_policy {
431            Some(policy) => {
432                let first_attempt_time = ctx.current_utc_datetime();
433                let schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync> =
434                    Arc::new(move |c: &OrchestrationContext| {
435                        c.call_sub_orchestrator_raw(
436                            &name,
437                            input_json.clone(),
438                            instance_id.as_deref(),
439                            app_id.as_deref(),
440                            scope,
441                        )
442                    });
443                call_with_retry(ctx, schedule, policy, 0, first_attempt_time)
444            }
445            None => {
446                let task = self.call_sub_orchestrator_raw(
447                    &name,
448                    input_json,
449                    instance_id.as_deref(),
450                    app_id.as_deref(),
451                    scope,
452                );
453                Box::pin(task)
454            }
455        }
456    }
457
458    /// Create a durable timer that fires after the specified duration.
459    pub fn create_timer(&self, delay: std::time::Duration) -> CompletableTask {
460        tracing::debug!(delay_ms = delay.as_millis() as u64, "Creating timer");
461        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
462        let seq = inner.sequence_number;
463        inner.sequence_number += 1;
464
465        if let Some(existing) = inner.pending_tasks.get(&seq) {
466            if existing.is_complete() {
467                return existing.clone();
468            }
469        }
470
471        let task = CompletableTask::new();
472        inner.pending_tasks.insert(seq, task.clone());
473
474        let fire_at = inner.current_utc_datetime
475            + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
476        let action = proto::WorkflowAction {
477            id: seq,
478            router: None,
479            workflow_action_type: Some(proto::workflow_action::WorkflowActionType::CreateTimer(
480                proto::CreateTimerAction {
481                    fire_at: Some(to_timestamp(fire_at)),
482                    name: None,
483                    origin: None,
484                },
485            )),
486        };
487        inner.pending_actions.push(action);
488
489        task
490    }
491
492    /// Wait for an external event with the given name.
493    ///
494    /// Event names are case-insensitive.
495    pub fn wait_for_external_event(&self, name: &str) -> CompletableTask {
496        tracing::debug!(event_name = %name, "Waiting for external event");
497        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
498        let event_name = name.to_lowercase();
499
500        if let Some(events) = inner.buffered_events.get_mut(&event_name) {
501            if !events.is_empty() {
502                let data = events.remove(0);
503                let task = CompletableTask::new();
504                task.complete(data);
505                return task;
506            }
507        }
508
509        let task = CompletableTask::new();
510        let max_pending = inner.max_pending_tasks_per_name;
511        let pending = inner.pending_event_tasks.entry(event_name).or_default();
512        if pending.len() >= max_pending {
513            tracing::warn!(event_name = %name, "Pending event task limit reached, discarding wait");
514            return task;
515        }
516        pending.push(task.clone());
517        task
518    }
519
520    /// Continue the orchestration as new with new input.
521    pub fn continue_as_new(&self, input: impl Serialize, save_events: bool) {
522        tracing::debug!(save_events = save_events, "Continuing orchestration as new");
523        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
524        inner.continue_as_new_input = to_json(&input).ok().flatten();
525        inner.save_events_on_continue = save_events;
526    }
527
528    /// Check whether a named patch should be applied in the current execution.
529    ///
530    /// This enables safe, deterministic code upgrades. Wrap new behaviour in
531    /// `if ctx.is_patched("my-patch")` to ensure that:
532    ///
533    /// - Replaying executions that previously ran the *unpatched* path continue
534    ///   on the unpatched path (preserving determinism).
535    /// - Executions that previously ran the *patched* path continue on the
536    ///   patched path.
537    /// - Brand-new executions (at the history frontier) always take the patched
538    ///   path.
539    ///
540    /// This matches the behaviour of the Go and Python SDKs.
541    pub fn is_patched(&self, patch_name: &str) -> bool {
542        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
543
544        // Return the cached decision from the current execution if available.
545        if let Some(&cached) = inner.applied_patches.get(patch_name) {
546            return cached;
547        }
548
549        // If this patch was recorded as applied in the history, honour it.
550        if inner.history_patches.contains(patch_name) {
551            inner.applied_patches.insert(patch_name.to_string(), true);
552            return true;
553        }
554
555        // If the orchestrator hasn't yet consumed all scheduled actions from
556        // history, this call is mid-replay.  The previous execution did NOT
557        // apply this patch, so we must stay on the unpatched path to preserve
558        // determinism.
559        if inner.sequence_number < inner.history_scheduled_count {
560            inner.applied_patches.insert(patch_name.to_string(), false);
561            return false;
562        }
563
564        // We're at (or past) the history frontier — apply the patch.
565        inner.applied_patches.insert(patch_name.to_string(), true);
566        true
567    }
568}
569
570// ── Retry helpers ─────────────────────────────────────────────────────────────
571
572/// Compute the delay before the next retry attempt, or `None` if the retry
573/// should not proceed (timeout exceeded or predicate returned false).
574fn compute_retry_delay(
575    policy: &RetryPolicy,
576    attempt: u32,
577    first_attempt_time: chrono::DateTime<chrono::Utc>,
578    current_time: chrono::DateTime<chrono::Utc>,
579    details: &FailureDetails,
580) -> Option<std::time::Duration> {
581    // Check custom predicate.
582    if let Some(ref handle) = policy.handle {
583        if !handle(details) {
584            return None;
585        }
586    }
587
588    // Check overall retry timeout.
589    if let Some(timeout) = policy.retry_timeout {
590        let elapsed = current_time - first_attempt_time;
591        let timeout_dur = chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
592        if elapsed >= timeout_dur {
593            return None;
594        }
595    }
596
597    // Exponential backoff.
598    let first_ms = policy.first_retry_interval.as_millis() as f64;
599    let next_ms = first_ms * policy.backoff_coefficient.powi(attempt as i32);
600
601    let delay_ms = if let Some(max) = policy.max_retry_interval {
602        next_ms.min(max.as_millis() as f64)
603    } else {
604        next_ms
605    };
606
607    Some(std::time::Duration::from_millis(delay_ms as u64))
608}
609
610/// Drive a task to completion, retrying on failure according to `policy`.
611///
612/// `schedule` is called once per attempt and must return a fresh [`CompletableTask`].
613/// Between attempts a durable timer is created for the computed backoff delay,
614/// preserving determinism across replays.
615fn call_with_retry(
616    ctx: OrchestrationContext,
617    schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync>,
618    policy: RetryPolicy,
619    attempt: u32,
620    first_attempt_time: chrono::DateTime<chrono::Utc>,
621) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
622    Box::pin(async move {
623        let task = schedule(&ctx);
624        match task.await {
625            Ok(v) => Ok(v),
626            Err(DurableTaskError::TaskFailed {
627                message,
628                failure_details,
629            }) => {
630                let details = failure_details.clone().unwrap_or_else(|| FailureDetails {
631                    message: message.clone(),
632                    error_type: "TaskFailed".to_string(),
633                    stack_trace: None,
634                });
635
636                if attempt + 1 >= policy.max_number_of_attempts {
637                    tracing::debug!(
638                        attempt,
639                        max = policy.max_number_of_attempts,
640                        "Max retry attempts reached"
641                    );
642                    return Err(DurableTaskError::TaskFailed {
643                        message,
644                        failure_details,
645                    });
646                }
647
648                let current_time = ctx.current_utc_datetime();
649                let delay = match compute_retry_delay(
650                    &policy,
651                    attempt,
652                    first_attempt_time,
653                    current_time,
654                    &details,
655                ) {
656                    Some(d) => d,
657                    None => {
658                        tracing::debug!(attempt, "Retry predicate or timeout prevented retry");
659                        return Err(DurableTaskError::TaskFailed {
660                            message,
661                            failure_details,
662                        });
663                    }
664                };
665
666                tracing::debug!(
667                    attempt,
668                    delay_ms = delay.as_millis(),
669                    "Scheduling retry timer"
670                );
671                ctx.create_timer(delay).await?;
672
673                call_with_retry(ctx, schedule, policy, attempt + 1, first_attempt_time).await
674            }
675            Err(e) => Err(e),
676        }
677    })
678}
679
/// Unit tests for `OrchestrationContext`. They drive the public methods and
/// then inspect (or pre-seed) the shared inner state directly.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a non-replaying context named `my_orch` with instance ID
    /// `inst-1`, JSON input `"hello"`, default worker options and no
    /// capacity hint.
    fn make_ctx() -> OrchestrationContext {
        OrchestrationContext::new(
            "inst-1".to_string(),
            "my_orch".to_string(),
            Some("\"hello\"".to_string()),
            chrono::Utc::now(),
            false,
            &crate::worker::WorkerOptions::default(),
            0,
        )
    }

    /// The simple getters return the values passed to the constructor.
    #[test]
    fn test_basic_accessors() {
        let ctx = make_ctx();
        assert_eq!(ctx.instance_id(), "inst-1");
        assert_eq!(ctx.name(), "my_orch");
        assert!(!ctx.is_replaying());
    }

    /// The JSON input is deserialised into the requested type.
    #[test]
    fn test_get_input() {
        let ctx = make_ctx();
        let input: String = ctx.get_input().unwrap();
        assert_eq!(input, "hello");
    }

    /// `set_custom_status` stores the status on the inner state.
    #[test]
    fn test_set_custom_status() {
        let ctx = make_ctx();
        ctx.set_custom_status("processing");
        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.custom_status, Some("processing".to_string()));
    }

    /// A first activity call consumes sequence number 0 and queues a
    /// `ScheduleTask` action carrying the name and JSON-encoded input.
    #[test]
    fn test_call_activity_creates_action() {
        let ctx = make_ctx();
        let _task = ctx.call_activity("greet", "world");

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        assert_eq!(inner.pending_actions.len(), 1);
        assert_eq!(inner.pending_actions[0].id, 0);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::ScheduleTask(a)) => {
                assert_eq!(a.name, "greet");
                assert_eq!(a.input, Some("\"world\"".to_string()));
            }
            _ => panic!("expected ScheduleTask action"),
        }
    }

    /// A completed task already registered under the call's sequence number
    /// is returned as-is and no new action is queued.
    #[test]
    fn test_call_activity_replay_returns_existing() {
        let ctx = make_ctx();

        // Pre-populate a completed task at sequence 0 (simulating replay)
        {
            let mut inner = ctx.inner.lock().unwrap();
            let task = CompletableTask::new();
            task.complete(Some("42".to_string()));
            inner.pending_tasks.insert(0, task);
        }

        let task = ctx.call_activity("greet", "world");
        assert!(task.is_complete());

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.pending_actions.len(), 0);
    }

    /// A sub-orchestration call queues a `CreateChildWorkflow` action with the
    /// requested name and explicit instance ID.
    #[test]
    fn test_call_sub_orchestrator() {
        let ctx = make_ctx();
        let _task = ctx.call_sub_orchestrator("child_orch", "input", Some("child-1"));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
                assert_eq!(a.name, "child_orch");
                assert_eq!(a.instance_id, "child-1");
            }
            _ => panic!("expected CreateChildWorkflow action"),
        }
    }

    /// A timer call queues a `CreateTimer` action with a fire time set.
    #[test]
    fn test_create_timer() {
        let ctx = make_ctx();
        let _task = ctx.create_timer(std::time::Duration::from_secs(60));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => {
                assert!(a.fire_at.is_some());
            }
            _ => panic!("expected CreateTimer action"),
        }
    }

    /// A buffered event completes the wait immediately, regardless of the
    /// case used for the event name.
    #[test]
    fn test_wait_for_external_event_buffered() {
        let ctx = make_ctx();

        // Buffer an event
        {
            let mut inner = ctx.inner.lock().unwrap();
            inner
                .buffered_events
                .entry("approval".to_string())
                .or_default()
                .push(Some("\"yes\"".to_string()));
        }

        let task = ctx.wait_for_external_event("APPROVAL"); // case-insensitive
        assert!(task.is_complete());
    }

    /// With nothing buffered, the wait is registered as a pending waiter.
    #[test]
    fn test_wait_for_external_event_pending() {
        let ctx = make_ctx();
        let task = ctx.wait_for_external_event("approval");
        assert!(!task.is_complete());

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
    }

    /// `continue_as_new` records the JSON-encoded input and the save flag.
    #[test]
    fn test_continue_as_new() {
        let ctx = make_ctx();
        ctx.continue_as_new("new_input", true);

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(
            inner.continue_as_new_input,
            Some("\"new_input\"".to_string())
        );
        assert!(inner.save_events_on_continue);
    }

    /// Activities and timers share one sequence counter; action IDs follow
    /// call order.
    #[test]
    fn test_sequence_numbers_increment() {
        let ctx = make_ctx();
        let _t1 = ctx.call_activity("a", ());
        let _t2 = ctx.call_activity("b", ());
        let _t3 = ctx.create_timer(std::time::Duration::from_secs(1));

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 3);
        assert_eq!(inner.pending_actions[0].id, 0);
        assert_eq!(inner.pending_actions[1].id, 1);
        assert_eq!(inner.pending_actions[2].id, 2);
    }

    /// The app ID ends up as `target_app_id` on the child action's router.
    #[test]
    fn test_call_sub_orchestrator_with_app_id() {
        let ctx = make_ctx();
        let _task = ctx.call_sub_orchestrator_with_app_id(
            "child_orch",
            "input",
            Some("child-1"),
            "other-app",
        );

        let inner = ctx.inner.lock().unwrap();
        assert_eq!(inner.sequence_number, 1);
        match &inner.pending_actions[0].workflow_action_type {
            Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
                assert_eq!(a.name, "child_orch");
                assert_eq!(a.instance_id, "child-1");
                let router = a.router.as_ref().expect("expected router");
                assert_eq!(router.target_app_id, Some("other-app".to_string()));
            }
            _ => panic!("expected CreateChildWorkflow action"),
        }
    }

    #[test]
    fn test_is_patched_new_execution_returns_true() {
        // No history → always at the frontier → patch applies.
        let ctx = make_ctx();
        assert!(ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_in_history_returns_true() {
        // Patch recorded in history → return true.
        let ctx = make_ctx();
        ctx.inner
            .lock()
            .unwrap()
            .history_patches
            .insert("my-patch".to_string());
        assert!(ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_mid_replay_returns_false() {
        // history_scheduled_count = 2, but seq = 0 → mid-replay, unpatched.
        let ctx = make_ctx();
        ctx.inner.lock().unwrap().history_scheduled_count = 2;
        assert!(!ctx.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_at_frontier_after_history_returns_true() {
        // history_scheduled_count = 1, seq = 1 → at frontier.
        let ctx = make_ctx();
        {
            let mut inner = ctx.inner.lock().unwrap();
            inner.history_scheduled_count = 1;
            inner.sequence_number = 1;
        }
        assert!(ctx.is_patched("my-patch"));
    }

    /// Once decided, a patch verdict is served from the cache even if the
    /// state that produced it changes afterwards.
    #[test]
    fn test_is_patched_caches_decision() {
        let ctx = make_ctx();
        // First call caches the result.
        assert!(ctx.is_patched("my-patch"));
        // Second call uses the cache regardless of state changes.
        ctx.inner.lock().unwrap().history_scheduled_count = 99;
        assert!(ctx.is_patched("my-patch"));
    }
}
913}