1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3
4use futures::future::BoxFuture;
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7
8use crate::api::{
9 DurableTaskError, FailureDetails, HistoryPropagationScope, OrchestrationStatus,
10 PropagatedHistory, RetryPolicy,
11};
12use crate::internal::{to_json, to_timestamp};
13use crate::proto;
14
15use super::completable_task::CompletableTask;
16use super::options::{ActivityOptions, SubOrchestratorOptions};
17
/// Mutable state shared by every clone of an [`OrchestrationContext`].
///
/// The field notes below describe only how this file reads or writes each field. Several
/// fields are never touched here and are presumably maintained by the executor elsewhere in
/// the crate (TODO confirm).
pub(crate) struct OrchestrationContextInner {
    /// Returned (cloned) by `OrchestrationContext::instance_id`.
    pub(crate) instance_id: String,
    /// Time base used when computing timer fire times and retry timeouts.
    pub(crate) current_utc_datetime: chrono::DateTime<chrono::Utc>,
    /// Exposed through `OrchestrationContext::is_replaying`.
    pub(crate) is_replaying: bool,
    /// Initialised to `false`; not read in this file.
    pub(crate) is_complete: bool,
    /// Raw input string handed to `from_json` by `get_input`.
    pub(crate) input: Option<String>,
    /// Returned (cloned) by `OrchestrationContext::name`.
    pub(crate) name: String,
    /// Last value passed to `set_custom_status`, if any.
    pub(crate) custom_status: Option<String>,
    /// Next action id; incremented each time an activity, sub-orchestration or timer is scheduled.
    pub(crate) sequence_number: i32,
    /// Tasks keyed by the sequence number they were scheduled under.
    pub(crate) pending_tasks: HashMap<i32, CompletableTask>,
    /// Tasks waiting for an external event, keyed by lower-cased event name.
    pub(crate) pending_event_tasks: HashMap<String, Vec<CompletableTask>>,
    /// Event payloads keyed by lower-cased event name; `wait_for_external_event` takes the
    /// oldest (index 0) entry first.
    pub(crate) buffered_events: HashMap<String, Vec<Option<String>>>,
    /// Actions accumulated by the scheduling methods, in scheduling order.
    pub(crate) pending_actions: Vec<proto::WorkflowAction>,
    /// Initialised to `None`; not written in this file.
    pub(crate) completion_status: Option<OrchestrationStatus>,
    /// Initialised to `None`; not written in this file.
    pub(crate) completion_result: Option<String>,
    /// Initialised to `None`; not written in this file.
    pub(crate) completion_failure: Option<FailureDetails>,
    /// Serialized input recorded by `continue_as_new`.
    pub(crate) continue_as_new_input: Option<String>,
    /// `save_events` flag recorded by `continue_as_new`.
    pub(crate) save_events_on_continue: bool,
    /// Initialised to `false`; not read in this file.
    pub(crate) is_suspended: bool,
    /// Copied from `WorkerOptions::max_event_names`; enforcement is not visible in this file.
    pub(crate) max_event_names: usize,
    /// Copied from `WorkerOptions::max_events_per_name`; enforcement is not visible in this file.
    pub(crate) max_events_per_name: usize,
    /// Upper bound on queued waiters per event name, checked by `wait_for_external_event`.
    pub(crate) max_pending_tasks_per_name: usize,
    /// Size limit passed to `from_json` when decoding the input.
    pub(crate) max_json_payload_size: usize,
    /// Patch names consulted by `is_patched`; starts empty and is populated outside this file.
    pub(crate) history_patches: std::collections::HashSet<String>,
    /// Cache of `is_patched` decisions, keyed by patch name.
    pub(crate) applied_patches: HashMap<String, bool>,
    /// Compared against `sequence_number` by `is_patched` to decide whether execution is
    /// still inside previously recorded history.
    pub(crate) history_scheduled_count: i32,
    /// Returned (cloned) by `OrchestrationContext::propagated_history`; starts as `None`.
    pub(crate) propagated_history: Option<PropagatedHistory>,
}
54
/// Cloneable handle to the state of one orchestration execution.
///
/// Cloning copies the `Arc`, so every clone reads and mutates the same
/// [`OrchestrationContextInner`].
#[derive(Clone)]
pub struct OrchestrationContext {
    /// Shared, mutex-protected state.
    pub(crate) inner: Arc<Mutex<OrchestrationContextInner>>,
}
63
64impl OrchestrationContext {
65 pub(crate) fn new(
67 instance_id: String,
68 name: String,
69 input: Option<String>,
70 current_utc_datetime: chrono::DateTime<chrono::Utc>,
71 is_replaying: bool,
72 options: &crate::worker::WorkerOptions,
73 event_count_hint: usize,
74 ) -> Self {
75 Self {
76 inner: Arc::new(Mutex::new(OrchestrationContextInner {
77 instance_id,
78 current_utc_datetime,
79 is_replaying,
80 is_complete: false,
81 input,
82 name,
83 custom_status: None,
84 sequence_number: 0,
85 pending_tasks: HashMap::with_capacity(event_count_hint / 2),
86 pending_event_tasks: HashMap::new(),
87 buffered_events: HashMap::new(),
88 pending_actions: Vec::with_capacity(event_count_hint / 2),
89 completion_status: None,
90 completion_result: None,
91 completion_failure: None,
92 continue_as_new_input: None,
93 save_events_on_continue: false,
94 is_suspended: false,
95 max_event_names: options.max_event_names,
96 max_events_per_name: options.max_events_per_name,
97 max_pending_tasks_per_name: options.max_pending_tasks_per_name,
98 max_json_payload_size: options.max_json_payload_size,
99 history_patches: std::collections::HashSet::new(),
100 applied_patches: HashMap::new(),
101 history_scheduled_count: 0,
102 propagated_history: None,
103 })),
104 }
105 }
106
107 pub fn instance_id(&self) -> String {
109 self.inner
110 .lock()
111 .unwrap_or_else(|e| e.into_inner())
112 .instance_id
113 .clone()
114 }
115
116 pub fn current_utc_datetime(&self) -> chrono::DateTime<chrono::Utc> {
118 self.inner
119 .lock()
120 .unwrap_or_else(|e| e.into_inner())
121 .current_utc_datetime
122 }
123
124 pub fn is_replaying(&self) -> bool {
126 self.inner
127 .lock()
128 .unwrap_or_else(|e| e.into_inner())
129 .is_replaying
130 }
131
132 pub fn name(&self) -> String {
134 self.inner
135 .lock()
136 .unwrap_or_else(|e| e.into_inner())
137 .name
138 .clone()
139 }
140
141 pub fn get_input<T: DeserializeOwned>(&self) -> crate::api::Result<T> {
143 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
144 crate::internal::from_json(inner.input.as_deref(), inner.max_json_payload_size)
145 }
146
147 pub fn propagated_history(&self) -> Option<PropagatedHistory> {
153 self.inner
154 .lock()
155 .unwrap_or_else(|e| e.into_inner())
156 .propagated_history
157 .clone()
158 }
159
160 pub fn set_custom_status(&self, status: impl Into<String>) {
162 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
163 inner.custom_status = Some(status.into());
164 }
165
166 pub fn call_activity(&self, name: &str, input: impl Serialize) -> CompletableTask {
174 tracing::debug!(activity = %name, "Scheduling activity");
175 self.call_activity_inner(name, input, None, None)
176 }
177
178 pub fn call_activity_with_app_id(
180 &self,
181 name: &str,
182 input: impl Serialize,
183 app_id: &str,
184 ) -> CompletableTask {
185 tracing::debug!(activity = %name, app_id = %app_id, "Scheduling activity with app_id");
186 self.call_activity_inner(name, input, Some(app_id), None)
187 }
188
189 fn call_activity_inner(
190 &self,
191 name: &str,
192 input: impl Serialize,
193 app_id: Option<&str>,
194 history_propagation_scope: Option<HistoryPropagationScope>,
195 ) -> CompletableTask {
196 let input_json = match to_json(&input) {
197 Ok(json) => json,
198 Err(e) => {
199 let task = CompletableTask::new();
200 task.fail(FailureDetails {
201 message: format!("Failed to serialize activity input: {e}"),
202 error_type: "SerializationError".to_string(),
203 stack_trace: None,
204 });
205 return task;
206 }
207 };
208 self.call_activity_raw(name, input_json, app_id, history_propagation_scope)
209 }
210
211 fn call_activity_raw(
213 &self,
214 name: &str,
215 input_json: Option<String>,
216 app_id: Option<&str>,
217 history_propagation_scope: Option<HistoryPropagationScope>,
218 ) -> CompletableTask {
219 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
220 let seq = inner.sequence_number;
221 inner.sequence_number += 1;
222
223 if let Some(existing) = inner.pending_tasks.get(&seq) {
224 if existing.is_complete() {
225 return existing.clone();
226 }
227 }
228
229 let task = CompletableTask::new();
230 inner.pending_tasks.insert(seq, task.clone());
231
232 let router = app_id.map(|id| proto::TaskRouter {
233 source_app_id: String::new(),
234 target_app_id: Some(id.to_string()),
235 target_app_namespace: None,
236 });
237 let action = proto::WorkflowAction {
238 id: seq,
239 router: None,
240 workflow_action_type: Some(proto::workflow_action::WorkflowActionType::ScheduleTask(
241 proto::ScheduleTaskAction {
242 name: name.to_string(),
243 version: None,
244 input: input_json,
245 router,
246 task_execution_id: String::new(),
247 history_propagation_scope: history_propagation_scope
248 .map(|s| s.to_proto() as i32),
249 },
250 )),
251 };
252 inner.pending_actions.push(action);
253
254 task
255 }
256
257 pub fn call_activity_with_options(
262 &self,
263 name: &str,
264 input: impl Serialize,
265 options: ActivityOptions,
266 ) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
267 let input_json = match to_json(&input) {
268 Ok(json) => json,
269 Err(e) => return Box::pin(async move { Err(e) }),
270 };
271 let name = name.to_string();
272 let app_id = options.app_id.clone();
273 let scope = options.history_propagation_scope;
274 let ctx = self.clone();
275
276 match options.retry_policy {
277 Some(policy) => {
278 let first_attempt_time = ctx.current_utc_datetime();
279 let schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync> =
280 Arc::new(move |c: &OrchestrationContext| {
281 c.call_activity_raw(&name, input_json.clone(), app_id.as_deref(), scope)
282 });
283 call_with_retry(ctx, schedule, policy, 0, first_attempt_time)
284 }
285 None => {
286 let task = self.call_activity_raw(&name, input_json, app_id.as_deref(), scope);
287 Box::pin(task)
288 }
289 }
290 }
291
292 pub fn call_sub_orchestrator(
294 &self,
295 name: &str,
296 input: impl Serialize,
297 instance_id: Option<&str>,
298 ) -> CompletableTask {
299 tracing::debug!(
300 sub_orchestrator = %name,
301 sub_instance_id = ?instance_id,
302 "Scheduling sub-orchestration"
303 );
304 self.call_sub_orchestrator_inner(name, input, instance_id, None, None)
305 }
306
307 pub fn call_sub_orchestrator_with_app_id(
309 &self,
310 name: &str,
311 input: impl Serialize,
312 instance_id: Option<&str>,
313 app_id: &str,
314 ) -> CompletableTask {
315 tracing::debug!(
316 sub_orchestrator = %name,
317 sub_instance_id = ?instance_id,
318 app_id = %app_id,
319 "Scheduling sub-orchestration with app_id"
320 );
321 self.call_sub_orchestrator_inner(name, input, instance_id, Some(app_id), None)
322 }
323
324 fn call_sub_orchestrator_inner(
325 &self,
326 name: &str,
327 input: impl Serialize,
328 instance_id: Option<&str>,
329 app_id: Option<&str>,
330 history_propagation_scope: Option<HistoryPropagationScope>,
331 ) -> CompletableTask {
332 let input_json = match to_json(&input) {
333 Ok(json) => json,
334 Err(e) => {
335 let task = CompletableTask::new();
336 task.fail(FailureDetails {
337 message: format!("Failed to serialize sub-orchestrator input: {e}"),
338 error_type: "SerializationError".to_string(),
339 stack_trace: None,
340 });
341 return task;
342 }
343 };
344 self.call_sub_orchestrator_raw(
345 name,
346 input_json,
347 instance_id,
348 app_id,
349 history_propagation_scope,
350 )
351 }
352
353 fn call_sub_orchestrator_raw(
355 &self,
356 name: &str,
357 input_json: Option<String>,
358 instance_id: Option<&str>,
359 app_id: Option<&str>,
360 history_propagation_scope: Option<HistoryPropagationScope>,
361 ) -> CompletableTask {
362 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
363 let seq = inner.sequence_number;
364 inner.sequence_number += 1;
365
366 if let Some(existing) = inner.pending_tasks.get(&seq) {
367 if existing.is_complete() {
368 return existing.clone();
369 }
370 }
371
372 let task = CompletableTask::new();
373 inner.pending_tasks.insert(seq, task.clone());
374
375 let sub_instance_id = instance_id
376 .map(|s| s.to_string())
377 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
378
379 let router = app_id.map(|id| proto::TaskRouter {
380 source_app_id: String::new(),
381 target_app_id: Some(id.to_string()),
382 target_app_namespace: None,
383 });
384
385 let action = proto::WorkflowAction {
386 id: seq,
387 router: None,
388 workflow_action_type: Some(
389 proto::workflow_action::WorkflowActionType::CreateChildWorkflow(
390 proto::CreateChildWorkflowAction {
391 instance_id: sub_instance_id,
392 name: name.to_string(),
393 version: None,
394 input: input_json,
395 router,
396 history_propagation_scope: history_propagation_scope
397 .map(|s| s.to_proto() as i32),
398 },
399 ),
400 ),
401 };
402 inner.pending_actions.push(action);
403
404 task
405 }
406
407 pub fn call_sub_orchestrator_with_options(
415 &self,
416 name: &str,
417 input: impl Serialize,
418 options: SubOrchestratorOptions,
419 ) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
420 let input_json = match to_json(&input) {
421 Ok(json) => json,
422 Err(e) => return Box::pin(async move { Err(e) }),
423 };
424 let name = name.to_string();
425 let instance_id = options.instance_id.clone();
426 let app_id = options.app_id.clone();
427 let scope = options.history_propagation_scope;
428 let ctx = self.clone();
429
430 match options.retry_policy {
431 Some(policy) => {
432 let first_attempt_time = ctx.current_utc_datetime();
433 let schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync> =
434 Arc::new(move |c: &OrchestrationContext| {
435 c.call_sub_orchestrator_raw(
436 &name,
437 input_json.clone(),
438 instance_id.as_deref(),
439 app_id.as_deref(),
440 scope,
441 )
442 });
443 call_with_retry(ctx, schedule, policy, 0, first_attempt_time)
444 }
445 None => {
446 let task = self.call_sub_orchestrator_raw(
447 &name,
448 input_json,
449 instance_id.as_deref(),
450 app_id.as_deref(),
451 scope,
452 );
453 Box::pin(task)
454 }
455 }
456 }
457
458 pub fn create_timer(&self, delay: std::time::Duration) -> CompletableTask {
460 tracing::debug!(delay_ms = delay.as_millis() as u64, "Creating timer");
461 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
462 let seq = inner.sequence_number;
463 inner.sequence_number += 1;
464
465 if let Some(existing) = inner.pending_tasks.get(&seq) {
466 if existing.is_complete() {
467 return existing.clone();
468 }
469 }
470
471 let task = CompletableTask::new();
472 inner.pending_tasks.insert(seq, task.clone());
473
474 let fire_at = inner.current_utc_datetime
475 + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
476 let action = proto::WorkflowAction {
477 id: seq,
478 router: None,
479 workflow_action_type: Some(proto::workflow_action::WorkflowActionType::CreateTimer(
480 proto::CreateTimerAction {
481 fire_at: Some(to_timestamp(fire_at)),
482 name: None,
483 origin: None,
484 },
485 )),
486 };
487 inner.pending_actions.push(action);
488
489 task
490 }
491
492 pub fn wait_for_external_event(&self, name: &str) -> CompletableTask {
496 tracing::debug!(event_name = %name, "Waiting for external event");
497 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
498 let event_name = name.to_lowercase();
499
500 if let Some(events) = inner.buffered_events.get_mut(&event_name) {
501 if !events.is_empty() {
502 let data = events.remove(0);
503 let task = CompletableTask::new();
504 task.complete(data);
505 return task;
506 }
507 }
508
509 let task = CompletableTask::new();
510 let max_pending = inner.max_pending_tasks_per_name;
511 let pending = inner.pending_event_tasks.entry(event_name).or_default();
512 if pending.len() >= max_pending {
513 tracing::warn!(event_name = %name, "Pending event task limit reached, discarding wait");
514 return task;
515 }
516 pending.push(task.clone());
517 task
518 }
519
520 pub fn continue_as_new(&self, input: impl Serialize, save_events: bool) {
522 tracing::debug!(save_events = save_events, "Continuing orchestration as new");
523 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
524 inner.continue_as_new_input = to_json(&input).ok().flatten();
525 inner.save_events_on_continue = save_events;
526 }
527
528 pub fn is_patched(&self, patch_name: &str) -> bool {
542 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
543
544 if let Some(&cached) = inner.applied_patches.get(patch_name) {
546 return cached;
547 }
548
549 if inner.history_patches.contains(patch_name) {
551 inner.applied_patches.insert(patch_name.to_string(), true);
552 return true;
553 }
554
555 if inner.sequence_number < inner.history_scheduled_count {
560 inner.applied_patches.insert(patch_name.to_string(), false);
561 return false;
562 }
563
564 inner.applied_patches.insert(patch_name.to_string(), true);
566 true
567 }
568}
569
570fn compute_retry_delay(
575 policy: &RetryPolicy,
576 attempt: u32,
577 first_attempt_time: chrono::DateTime<chrono::Utc>,
578 current_time: chrono::DateTime<chrono::Utc>,
579 details: &FailureDetails,
580) -> Option<std::time::Duration> {
581 if let Some(ref handle) = policy.handle {
583 if !handle(details) {
584 return None;
585 }
586 }
587
588 if let Some(timeout) = policy.retry_timeout {
590 let elapsed = current_time - first_attempt_time;
591 let timeout_dur = chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
592 if elapsed >= timeout_dur {
593 return None;
594 }
595 }
596
597 let first_ms = policy.first_retry_interval.as_millis() as f64;
599 let next_ms = first_ms * policy.backoff_coefficient.powi(attempt as i32);
600
601 let delay_ms = if let Some(max) = policy.max_retry_interval {
602 next_ms.min(max.as_millis() as f64)
603 } else {
604 next_ms
605 };
606
607 Some(std::time::Duration::from_millis(delay_ms as u64))
608}
609
610fn call_with_retry(
616 ctx: OrchestrationContext,
617 schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync>,
618 policy: RetryPolicy,
619 attempt: u32,
620 first_attempt_time: chrono::DateTime<chrono::Utc>,
621) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
622 Box::pin(async move {
623 let task = schedule(&ctx);
624 match task.await {
625 Ok(v) => Ok(v),
626 Err(DurableTaskError::TaskFailed {
627 message,
628 failure_details,
629 }) => {
630 let details = failure_details.clone().unwrap_or_else(|| FailureDetails {
631 message: message.clone(),
632 error_type: "TaskFailed".to_string(),
633 stack_trace: None,
634 });
635
636 if attempt + 1 >= policy.max_number_of_attempts {
637 tracing::debug!(
638 attempt,
639 max = policy.max_number_of_attempts,
640 "Max retry attempts reached"
641 );
642 return Err(DurableTaskError::TaskFailed {
643 message,
644 failure_details,
645 });
646 }
647
648 let current_time = ctx.current_utc_datetime();
649 let delay = match compute_retry_delay(
650 &policy,
651 attempt,
652 first_attempt_time,
653 current_time,
654 &details,
655 ) {
656 Some(d) => d,
657 None => {
658 tracing::debug!(attempt, "Retry predicate or timeout prevented retry");
659 return Err(DurableTaskError::TaskFailed {
660 message,
661 failure_details,
662 });
663 }
664 };
665
666 tracing::debug!(
667 attempt,
668 delay_ms = delay.as_millis(),
669 "Scheduling retry timer"
670 );
671 ctx.create_timer(delay).await?;
672
673 call_with_retry(ctx, schedule, policy, attempt + 1, first_attempt_time).await
674 }
675 Err(e) => Err(e),
676 }
677 })
678}
679
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::workflow_action::WorkflowActionType;

    /// Fixture: instance `inst-1`, orchestrator `my_orch`, JSON input `"hello"`, not
    /// replaying, default worker options, no capacity hint.
    fn make_ctx() -> OrchestrationContext {
        let options = crate::worker::WorkerOptions::default();
        let input = Some("\"hello\"".to_string());
        OrchestrationContext::new(
            "inst-1".to_string(),
            "my_orch".to_string(),
            input,
            chrono::Utc::now(),
            false,
            &options,
            0,
        )
    }

    #[test]
    fn test_basic_accessors() {
        let context = make_ctx();
        assert_eq!(context.instance_id(), "inst-1");
        assert_eq!(context.name(), "my_orch");
        assert!(!context.is_replaying());
    }

    #[test]
    fn test_get_input() {
        let context = make_ctx();
        let decoded: String = context.get_input().unwrap();
        assert_eq!(decoded, "hello");
    }

    #[test]
    fn test_set_custom_status() {
        let context = make_ctx();
        context.set_custom_status("processing");

        let state = context.inner.lock().unwrap();
        assert_eq!(state.custom_status, Some("processing".to_string()));
    }

    #[test]
    fn test_call_activity_creates_action() {
        let context = make_ctx();
        let _pending = context.call_activity("greet", "world");

        let state = context.inner.lock().unwrap();
        assert_eq!(state.sequence_number, 1);
        assert_eq!(state.pending_actions.len(), 1);
        assert_eq!(state.pending_actions[0].id, 0);
        if let Some(WorkflowActionType::ScheduleTask(scheduled)) =
            &state.pending_actions[0].workflow_action_type
        {
            assert_eq!(scheduled.name, "greet");
            assert_eq!(scheduled.input, Some("\"world\"".to_string()));
        } else {
            panic!("expected ScheduleTask action");
        }
    }

    #[test]
    fn test_call_activity_replay_returns_existing() {
        let context = make_ctx();

        // Seed a completed task under sequence number 0.
        {
            let finished = CompletableTask::new();
            finished.complete(Some("42".to_string()));
            let mut state = context.inner.lock().unwrap();
            state.pending_tasks.insert(0, finished);
        }

        let returned = context.call_activity("greet", "world");
        assert!(returned.is_complete());

        let state = context.inner.lock().unwrap();
        assert_eq!(state.pending_actions.len(), 0);
    }

    #[test]
    fn test_call_sub_orchestrator() {
        let context = make_ctx();
        let _pending = context.call_sub_orchestrator("child_orch", "input", Some("child-1"));

        let state = context.inner.lock().unwrap();
        assert_eq!(state.sequence_number, 1);
        if let Some(WorkflowActionType::CreateChildWorkflow(child)) =
            &state.pending_actions[0].workflow_action_type
        {
            assert_eq!(child.name, "child_orch");
            assert_eq!(child.instance_id, "child-1");
        } else {
            panic!("expected CreateChildWorkflow action");
        }
    }

    #[test]
    fn test_create_timer() {
        let context = make_ctx();
        let _pending = context.create_timer(std::time::Duration::from_secs(60));

        let state = context.inner.lock().unwrap();
        assert_eq!(state.sequence_number, 1);
        if let Some(WorkflowActionType::CreateTimer(timer)) =
            &state.pending_actions[0].workflow_action_type
        {
            assert!(timer.fire_at.is_some());
        } else {
            panic!("expected CreateTimer action");
        }
    }

    #[test]
    fn test_wait_for_external_event_buffered() {
        let context = make_ctx();

        // Buffer one payload under the lower-cased event name.
        {
            let mut state = context.inner.lock().unwrap();
            state
                .buffered_events
                .entry("approval".to_string())
                .or_default()
                .push(Some("\"yes\"".to_string()));
        }

        // The lookup lower-cases the name, so an upper-case request still matches.
        let received = context.wait_for_external_event("APPROVAL");
        assert!(received.is_complete());
    }

    #[test]
    fn test_wait_for_external_event_pending() {
        let context = make_ctx();
        let waiter = context.wait_for_external_event("approval");
        assert!(!waiter.is_complete());

        let state = context.inner.lock().unwrap();
        assert_eq!(state.pending_event_tasks.get("approval").unwrap().len(), 1);
    }

    #[test]
    fn test_continue_as_new() {
        let context = make_ctx();
        context.continue_as_new("new_input", true);

        let state = context.inner.lock().unwrap();
        assert_eq!(
            state.continue_as_new_input,
            Some("\"new_input\"".to_string())
        );
        assert!(state.save_events_on_continue);
    }

    #[test]
    fn test_sequence_numbers_increment() {
        let context = make_ctx();
        let _first = context.call_activity("a", ());
        let _second = context.call_activity("b", ());
        let _third = context.create_timer(std::time::Duration::from_secs(1));

        let state = context.inner.lock().unwrap();
        assert_eq!(state.sequence_number, 3);
        assert_eq!(state.pending_actions[0].id, 0);
        assert_eq!(state.pending_actions[1].id, 1);
        assert_eq!(state.pending_actions[2].id, 2);
    }

    #[test]
    fn test_call_sub_orchestrator_with_app_id() {
        let context = make_ctx();
        let _pending = context.call_sub_orchestrator_with_app_id(
            "child_orch",
            "input",
            Some("child-1"),
            "other-app",
        );

        let state = context.inner.lock().unwrap();
        assert_eq!(state.sequence_number, 1);
        if let Some(WorkflowActionType::CreateChildWorkflow(child)) =
            &state.pending_actions[0].workflow_action_type
        {
            assert_eq!(child.name, "child_orch");
            assert_eq!(child.instance_id, "child-1");
            let router = child.router.as_ref().expect("expected router");
            assert_eq!(router.target_app_id, Some("other-app".to_string()));
        } else {
            panic!("expected CreateChildWorkflow action");
        }
    }

    #[test]
    fn test_is_patched_new_execution_returns_true() {
        let context = make_ctx();
        assert!(context.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_in_history_returns_true() {
        let context = make_ctx();
        {
            let mut state = context.inner.lock().unwrap();
            state.history_patches.insert("my-patch".to_string());
        }
        assert!(context.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_mid_replay_returns_false() {
        let context = make_ctx();
        {
            let mut state = context.inner.lock().unwrap();
            state.history_scheduled_count = 2;
        }
        assert!(!context.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_at_frontier_after_history_returns_true() {
        let context = make_ctx();
        {
            let mut state = context.inner.lock().unwrap();
            state.history_scheduled_count = 1;
            state.sequence_number = 1;
        }
        assert!(context.is_patched("my-patch"));
    }

    #[test]
    fn test_is_patched_caches_decision() {
        let context = make_ctx();
        assert!(context.is_patched("my-patch"));

        // Changing the history count afterwards must not alter the cached answer.
        {
            let mut state = context.inner.lock().unwrap();
            state.history_scheduled_count = 99;
        }
        assert!(context.is_patched("my-patch"));
    }
}