1use std::collections::{HashMap, VecDeque};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex, MutexGuard};
4
5use futures::future::BoxFuture;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
9use crate::api::{
10 DurableTaskError, FailureDetails, HistoryPropagationScope, OrchestrationStatus,
11 PropagatedHistory, RetryPolicy,
12};
13use crate::internal::{to_json, to_timestamp};
14use crate::proto;
15
16use super::completable_task::CompletableTask;
17use super::options::{ActivityOptions, SubOrchestratorOptions};
18
19pub(crate) fn lock_inner<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
20 m.lock().unwrap_or_else(|e| e.into_inner())
21}
22
23#[derive(Debug)]
24pub(crate) struct ContextConfig {
25 pub(crate) max_event_names: usize,
26 pub(crate) max_events_per_name: usize,
27 pub(crate) max_pending_tasks_per_name: usize,
28 pub(crate) max_json_payload_size: usize,
29}
30
31pub(crate) struct OrchestrationContextInner {
33 pub(crate) config: Arc<ContextConfig>,
34 pub(crate) instance_id: Arc<str>,
35 pub(crate) current_utc_datetime: chrono::DateTime<chrono::Utc>,
36 pub(crate) is_replaying: Arc<AtomicBool>,
37 pub(crate) is_complete: bool,
38 pub(crate) input: Option<String>,
39 pub(crate) name: Arc<str>,
40 pub(crate) custom_status: Option<String>,
41 pub(crate) sequence_number: i32,
42 pub(crate) pending_tasks: HashMap<i32, CompletableTask>,
43 pub(crate) pending_event_tasks: HashMap<String, VecDeque<CompletableTask>>,
44 pub(crate) buffered_events: HashMap<String, VecDeque<(Option<String>, bool)>>,
47 pub(crate) pending_actions: Vec<proto::WorkflowAction>,
48 pub(crate) completion_status: Option<OrchestrationStatus>,
49 pub(crate) completion_result: Option<String>,
50 pub(crate) completion_failure: Option<FailureDetails>,
51 pub(crate) continue_as_new_input: Option<String>,
52 pub(crate) save_events_on_continue: bool,
53 pub(crate) is_suspended: bool,
54 pub(crate) history_patches: std::collections::HashSet<String>,
56 pub(crate) applied_patches: HashMap<String, bool>,
58 pub(crate) history_scheduled_count: i32,
62 pub(crate) propagated_history: Option<Arc<PropagatedHistory>>,
65}
66
67#[derive(Clone)]
72pub struct OrchestrationContext {
73 pub(crate) inner: Arc<Mutex<OrchestrationContextInner>>,
74}
75
76impl OrchestrationContext {
77 pub(crate) fn new(
79 instance_id: String,
80 name: String,
81 input: Option<String>,
82 current_utc_datetime: chrono::DateTime<chrono::Utc>,
83 is_replaying: bool,
84 options: &crate::worker::WorkerOptions,
85 event_count_hint: usize,
86 ) -> Self {
87 let config = Arc::new(ContextConfig {
88 max_event_names: options.max_event_names,
89 max_events_per_name: options.max_events_per_name,
90 max_pending_tasks_per_name: options.max_pending_tasks_per_name,
91 max_json_payload_size: options.max_json_payload_size,
92 });
93
94 Self {
95 inner: Arc::new(Mutex::new(OrchestrationContextInner {
96 config,
97 instance_id: Arc::<str>::from(instance_id),
98 current_utc_datetime,
99 is_replaying: Arc::new(AtomicBool::new(is_replaying)),
100 is_complete: false,
101 input,
102 name: Arc::<str>::from(name),
103 custom_status: None,
104 sequence_number: 0,
105 pending_tasks: HashMap::with_capacity(event_count_hint / 2),
106 pending_event_tasks: HashMap::new(),
107 buffered_events: HashMap::new(),
108 pending_actions: Vec::with_capacity(event_count_hint / 2),
109 completion_status: None,
110 completion_result: None,
111 completion_failure: None,
112 continue_as_new_input: None,
113 save_events_on_continue: false,
114 is_suspended: false,
115 history_patches: std::collections::HashSet::new(),
116 applied_patches: HashMap::new(),
117 history_scheduled_count: 0,
118 propagated_history: None,
119 })),
120 }
121 }
122
123 pub fn instance_id(&self) -> Arc<str> {
125 lock_inner(&self.inner).instance_id.clone()
126 }
127
128 pub fn current_utc_datetime(&self) -> chrono::DateTime<chrono::Utc> {
130 lock_inner(&self.inner).current_utc_datetime
131 }
132
133 pub fn is_replaying(&self) -> bool {
135 lock_inner(&self.inner).is_replaying.load(Ordering::Acquire)
136 }
137
138 pub fn name(&self) -> Arc<str> {
140 lock_inner(&self.inner).name.clone()
141 }
142
143 pub fn input<T: DeserializeOwned>(&self) -> crate::api::Result<T> {
145 let inner = lock_inner(&self.inner);
146 crate::internal::from_json(inner.input.as_deref(), inner.config.max_json_payload_size)
147 }
148
149 pub fn propagated_history(&self) -> Option<Arc<PropagatedHistory>> {
155 lock_inner(&self.inner).propagated_history.clone()
156 }
157
158 pub fn set_custom_status(&self, status: impl Into<String>) {
160 let mut inner = lock_inner(&self.inner);
161 inner.custom_status = Some(status.into());
162 }
163
164 pub fn call_activity(&self, name: &str, input: impl Serialize) -> CompletableTask {
172 tracing::debug!(activity = %name, "Scheduling activity");
173 self.call_activity_inner(name, input, None, None)
174 }
175
176 pub fn call_activity_with_app_id(
178 &self,
179 name: &str,
180 input: impl Serialize,
181 app_id: &str,
182 ) -> CompletableTask {
183 tracing::debug!(activity = %name, app_id = %app_id, "Scheduling activity with app_id");
184 self.call_activity_inner(name, input, Some(app_id), None)
185 }
186
187 fn call_activity_inner(
188 &self,
189 name: &str,
190 input: impl Serialize,
191 app_id: Option<&str>,
192 history_propagation_scope: Option<HistoryPropagationScope>,
193 ) -> CompletableTask {
194 let input_json = match to_json(&input) {
195 Ok(json) => json,
196 Err(e) => {
197 let task = CompletableTask::new();
198 task.fail(FailureDetails {
199 message: format!("Failed to serialize activity input: {e}"),
200 error_type: "SerializationError".to_string(),
201 stack_trace: None,
202 });
203 return task;
204 }
205 };
206 self.call_activity_raw(name, input_json, app_id, history_propagation_scope)
207 }
208
209 fn call_activity_raw(
211 &self,
212 name: &str,
213 input_json: Option<String>,
214 app_id: Option<&str>,
215 history_propagation_scope: Option<HistoryPropagationScope>,
216 ) -> CompletableTask {
217 let mut inner = lock_inner(&self.inner);
218 let seq = inner.sequence_number;
219 inner.sequence_number += 1;
220
221 if let Some(existing) = inner.pending_tasks.get(&seq)
222 && existing.is_complete()
223 {
224 return existing.clone();
225 }
226
227 let task = CompletableTask::new();
228 task.set_replay_handle(inner.is_replaying.clone());
229 inner.pending_tasks.insert(seq, task.clone());
230
231 let router = app_id.map(|id| proto::TaskRouter {
232 source_app_id: String::new(),
233 target_app_id: Some(id.to_string()),
234 target_app_namespace: None,
235 });
236 let action = proto::WorkflowAction {
237 id: seq,
238 router: None,
239 workflow_action_type: Some(proto::workflow_action::WorkflowActionType::ScheduleTask(
240 proto::ScheduleTaskAction {
241 name: name.to_string(),
242 version: None,
243 input: input_json,
244 router,
245 task_execution_id: String::new(),
246 history_propagation_scope: history_propagation_scope
247 .map(|s| s.to_proto() as i32),
248 },
249 )),
250 };
251 inner.pending_actions.push(action);
252
253 task
254 }
255
256 pub fn call_activity_with_options(
261 &self,
262 name: &str,
263 input: impl Serialize,
264 options: ActivityOptions,
265 ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
266 {
267 let input_json = to_json(&input);
268 let name = name.to_string();
269 let app_id = options.app_id.clone();
270 let scope = options.history_propagation_scope;
271 let ctx = self.clone();
272
273 async move {
274 let input_json = input_json?;
275 match options.retry_policy {
276 Some(policy) => {
277 let first_attempt_time = ctx.current_utc_datetime();
278 let schedule: Arc<
279 dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
280 > = Arc::new(move |c: &OrchestrationContext| {
281 c.call_activity_raw(&name, input_json.clone(), app_id.as_deref(), scope)
282 });
283 call_with_retry(ctx, schedule, policy, first_attempt_time).await
284 }
285 None => {
286 ctx.call_activity_raw(&name, input_json, app_id.as_deref(), scope)
287 .await
288 }
289 }
290 }
291 }
292
293 pub fn call_sub_orchestrator(
295 &self,
296 name: &str,
297 input: impl Serialize,
298 instance_id: Option<&str>,
299 ) -> CompletableTask {
300 tracing::debug!(
301 sub_orchestrator = %name,
302 sub_instance_id = ?instance_id,
303 "Scheduling sub-orchestration"
304 );
305 self.call_sub_orchestrator_inner(name, input, instance_id, None, None)
306 }
307
308 pub fn call_sub_orchestrator_with_app_id(
310 &self,
311 name: &str,
312 input: impl Serialize,
313 instance_id: Option<&str>,
314 app_id: &str,
315 ) -> CompletableTask {
316 tracing::debug!(
317 sub_orchestrator = %name,
318 sub_instance_id = ?instance_id,
319 app_id = %app_id,
320 "Scheduling sub-orchestration with app_id"
321 );
322 self.call_sub_orchestrator_inner(name, input, instance_id, Some(app_id), None)
323 }
324
325 fn call_sub_orchestrator_inner(
326 &self,
327 name: &str,
328 input: impl Serialize,
329 instance_id: Option<&str>,
330 app_id: Option<&str>,
331 history_propagation_scope: Option<HistoryPropagationScope>,
332 ) -> CompletableTask {
333 let input_json = match to_json(&input) {
334 Ok(json) => json,
335 Err(e) => {
336 let task = CompletableTask::new();
337 task.fail(FailureDetails {
338 message: format!("Failed to serialize sub-orchestrator input: {e}"),
339 error_type: "SerializationError".to_string(),
340 stack_trace: None,
341 });
342 return task;
343 }
344 };
345 self.call_sub_orchestrator_raw(
346 name,
347 input_json,
348 instance_id,
349 app_id,
350 history_propagation_scope,
351 )
352 }
353
354 fn call_sub_orchestrator_raw(
356 &self,
357 name: &str,
358 input_json: Option<String>,
359 instance_id: Option<&str>,
360 app_id: Option<&str>,
361 history_propagation_scope: Option<HistoryPropagationScope>,
362 ) -> CompletableTask {
363 let mut inner = lock_inner(&self.inner);
364 let seq = inner.sequence_number;
365 inner.sequence_number += 1;
366
367 if let Some(existing) = inner.pending_tasks.get(&seq)
368 && existing.is_complete()
369 {
370 return existing.clone();
371 }
372
373 let task = CompletableTask::new();
374 task.set_replay_handle(inner.is_replaying.clone());
375 inner.pending_tasks.insert(seq, task.clone());
376
377 let sub_instance_id = instance_id
378 .map(|s| s.to_string())
379 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
380
381 let router = app_id.map(|id| proto::TaskRouter {
382 source_app_id: String::new(),
383 target_app_id: Some(id.to_string()),
384 target_app_namespace: None,
385 });
386
387 let action = proto::WorkflowAction {
388 id: seq,
389 router: None,
390 workflow_action_type: Some(
391 proto::workflow_action::WorkflowActionType::CreateChildWorkflow(
392 proto::CreateChildWorkflowAction {
393 instance_id: sub_instance_id,
394 name: name.to_string(),
395 version: None,
396 input: input_json,
397 router,
398 history_propagation_scope: history_propagation_scope
399 .map(|s| s.to_proto() as i32),
400 },
401 ),
402 ),
403 };
404 inner.pending_actions.push(action);
405
406 task
407 }
408
409 pub fn call_sub_orchestrator_with_options(
417 &self,
418 name: &str,
419 input: impl Serialize,
420 options: SubOrchestratorOptions,
421 ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
422 {
423 let input_json = to_json(&input);
424 let name = name.to_string();
425 let instance_id = options.instance_id.clone();
426 let app_id = options.app_id.clone();
427 let scope = options.history_propagation_scope;
428 let ctx = self.clone();
429
430 async move {
431 let input_json = input_json?;
432 match options.retry_policy {
433 Some(policy) => {
434 let first_attempt_time = ctx.current_utc_datetime();
435 let schedule: Arc<
436 dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
437 > = Arc::new(move |c: &OrchestrationContext| {
438 c.call_sub_orchestrator_raw(
439 &name,
440 input_json.clone(),
441 instance_id.as_deref(),
442 app_id.as_deref(),
443 scope,
444 )
445 });
446 call_with_retry(ctx, schedule, policy, first_attempt_time).await
447 }
448 None => {
449 ctx.call_sub_orchestrator_raw(
450 &name,
451 input_json,
452 instance_id.as_deref(),
453 app_id.as_deref(),
454 scope,
455 )
456 .await
457 }
458 }
459 }
460 }
461
462 pub fn create_timer(&self, delay: std::time::Duration) -> CompletableTask {
464 tracing::debug!(delay_ms = delay.as_millis() as u64, "Creating timer");
465 let mut inner = lock_inner(&self.inner);
466 let seq = inner.sequence_number;
467 inner.sequence_number += 1;
468
469 if let Some(existing) = inner.pending_tasks.get(&seq)
470 && existing.is_complete()
471 {
472 return existing.clone();
473 }
474
475 let task = CompletableTask::new();
476 task.set_replay_handle(inner.is_replaying.clone());
477 inner.pending_tasks.insert(seq, task.clone());
478
479 let fire_at = inner.current_utc_datetime
480 + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
481 let action = proto::WorkflowAction {
482 id: seq,
483 router: None,
484 workflow_action_type: Some(proto::workflow_action::WorkflowActionType::CreateTimer(
485 proto::CreateTimerAction {
486 fire_at: Some(to_timestamp(fire_at)),
487 name: None,
488 origin: None,
489 },
490 )),
491 };
492 inner.pending_actions.push(action);
493
494 task
495 }
496
497 pub fn wait_for_external_event(&self, name: &str) -> CompletableTask {
501 tracing::debug!(event_name = %name, "Waiting for external event");
502 let mut inner = lock_inner(&self.inner);
503 let event_name = name.to_lowercase();
504
505 if let Some(events) = inner.buffered_events.get_mut(&event_name)
506 && !events.is_empty()
507 {
508 let (data, during_replay) = events
509 .pop_front()
510 .expect("buffered event queue is not empty");
511 let task = CompletableTask::new();
512 task.set_replay_handle(inner.is_replaying.clone());
513 task.complete_with_phase(data, during_replay);
514 return task;
515 }
516
517 let task = CompletableTask::new();
518 task.set_replay_handle(inner.is_replaying.clone());
519 let max_pending = inner.config.max_pending_tasks_per_name;
520 let pending = inner.pending_event_tasks.entry(event_name).or_default();
521 if pending.len() >= max_pending {
522 tracing::warn!(event_name = %name, "Pending event task limit reached, discarding wait");
523 return task;
524 }
525 pending.push_back(task.clone());
526 task
527 }
528
529 pub fn continue_as_new(&self, input: impl Serialize, save_events: bool) {
531 tracing::debug!(save_events = save_events, "Continuing orchestration as new");
532 let mut inner = lock_inner(&self.inner);
533 inner.continue_as_new_input = to_json(&input).ok().flatten();
534 inner.save_events_on_continue = save_events;
535 }
536
537 pub fn is_patched(&self, patch_name: &str) -> bool {
551 let mut inner = lock_inner(&self.inner);
552
553 if let Some(&cached) = inner.applied_patches.get(patch_name) {
555 return cached;
556 }
557
558 if inner.history_patches.contains(patch_name) {
560 inner.applied_patches.insert(patch_name.to_string(), true);
561 return true;
562 }
563
564 if inner.sequence_number < inner.history_scheduled_count {
569 inner.applied_patches.insert(patch_name.to_string(), false);
570 return false;
571 }
572
573 inner.applied_patches.insert(patch_name.to_string(), true);
575 true
576 }
577}
578
579fn compute_retry_delay(
584 policy: &RetryPolicy,
585 attempt: u32,
586 first_attempt_time: chrono::DateTime<chrono::Utc>,
587 current_time: chrono::DateTime<chrono::Utc>,
588 details: &FailureDetails,
589) -> Option<std::time::Duration> {
590 if let Some(ref handle) = policy.handle
592 && !handle(details)
593 {
594 return None;
595 }
596
597 if let Some(timeout) = policy.retry_timeout {
599 let elapsed = current_time - first_attempt_time;
600 let timeout_dur = chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
601 if elapsed >= timeout_dur {
602 return None;
603 }
604 }
605
606 let first_ms = policy.first_retry_interval.as_millis() as f64;
608 let next_ms = first_ms * policy.backoff_coefficient.powi(attempt as i32);
609
610 let delay_ms = if let Some(max) = policy.max_retry_interval {
611 next_ms.min(max.as_millis() as f64)
612 } else {
613 next_ms
614 };
615
616 Some(std::time::Duration::from_millis(delay_ms as u64))
617}
618
619fn call_with_retry(
625 ctx: OrchestrationContext,
626 schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync>,
627 policy: RetryPolicy,
628 first_attempt_time: chrono::DateTime<chrono::Utc>,
629) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
630 Box::pin(async move {
631 let mut attempt = 0;
632 loop {
633 let task = schedule(&ctx);
634 match task.await {
635 Ok(v) => return Ok(v),
636 Err(DurableTaskError::TaskFailed {
637 message,
638 failure_details,
639 }) => {
640 let details = failure_details.clone().unwrap_or_else(|| FailureDetails {
641 message: message.clone(),
642 error_type: "TaskFailed".to_string(),
643 stack_trace: None,
644 });
645
646 if attempt + 1 >= policy.max_number_of_attempts {
647 tracing::debug!(
648 attempt,
649 max = policy.max_number_of_attempts,
650 "Max retry attempts reached"
651 );
652 return Err(DurableTaskError::TaskFailed {
653 message,
654 failure_details,
655 });
656 }
657
658 let current_time = ctx.current_utc_datetime();
659 let delay = match compute_retry_delay(
660 &policy,
661 attempt,
662 first_attempt_time,
663 current_time,
664 &details,
665 ) {
666 Some(d) => d,
667 None => {
668 tracing::debug!(attempt, "Retry predicate or timeout prevented retry");
669 return Err(DurableTaskError::TaskFailed {
670 message,
671 failure_details,
672 });
673 }
674 };
675
676 tracing::debug!(
677 attempt,
678 delay_ms = delay.as_millis(),
679 "Scheduling retry timer"
680 );
681 ctx.create_timer(delay).await?;
682 attempt += 1;
683 }
684 Err(e) => return Err(e),
685 }
686 }
687 })
688}
689
690#[cfg(test)]
691mod tests {
692 use super::*;
693
694 fn make_ctx() -> OrchestrationContext {
695 OrchestrationContext::new(
696 "inst-1".to_string(),
697 "my_orch".to_string(),
698 Some("\"hello\"".to_string()),
699 chrono::Utc::now(),
700 false,
701 &crate::worker::WorkerOptions::default(),
702 0,
703 )
704 }
705
706 #[test]
707 fn test_basic_accessors() {
708 let ctx = make_ctx();
709 assert_eq!(ctx.instance_id().as_ref(), "inst-1");
710 assert_eq!(ctx.name().as_ref(), "my_orch");
711 assert!(!ctx.is_replaying());
712 }
713
714 #[test]
715 fn test_input() {
716 let ctx = make_ctx();
717 let input: String = ctx.input().unwrap();
718 assert_eq!(input, "hello");
719 }
720
721 #[test]
722 fn test_set_custom_status() {
723 let ctx = make_ctx();
724 ctx.set_custom_status("processing");
725 let inner = ctx.inner.lock().unwrap();
726 assert_eq!(inner.custom_status, Some("processing".to_string()));
727 }
728
729 #[test]
730 fn test_call_activity_creates_action() {
731 let ctx = make_ctx();
732 let _task = ctx.call_activity("greet", "world");
733
734 let inner = ctx.inner.lock().unwrap();
735 assert_eq!(inner.sequence_number, 1);
736 assert_eq!(inner.pending_actions.len(), 1);
737 assert_eq!(inner.pending_actions[0].id, 0);
738 match &inner.pending_actions[0].workflow_action_type {
739 Some(proto::workflow_action::WorkflowActionType::ScheduleTask(a)) => {
740 assert_eq!(a.name, "greet");
741 assert_eq!(a.input, Some("\"world\"".to_string()));
742 }
743 _ => panic!("expected ScheduleTask action"),
744 }
745 }
746
747 #[test]
748 fn test_call_activity_replay_returns_existing() {
749 let ctx = make_ctx();
750
751 {
753 let mut inner = ctx.inner.lock().unwrap();
754 let task = CompletableTask::new();
755 task.complete(Some("42".to_string()));
756 inner.pending_tasks.insert(0, task);
757 }
758
759 let task = ctx.call_activity("greet", "world");
760 assert!(task.is_complete());
761
762 let inner = ctx.inner.lock().unwrap();
763 assert_eq!(inner.pending_actions.len(), 0);
764 }
765
766 #[test]
767 fn test_call_sub_orchestrator() {
768 let ctx = make_ctx();
769 let _task = ctx.call_sub_orchestrator("child_orch", "input", Some("child-1"));
770
771 let inner = ctx.inner.lock().unwrap();
772 assert_eq!(inner.sequence_number, 1);
773 match &inner.pending_actions[0].workflow_action_type {
774 Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
775 assert_eq!(a.name, "child_orch");
776 assert_eq!(a.instance_id, "child-1");
777 }
778 _ => panic!("expected CreateChildWorkflow action"),
779 }
780 }
781
782 #[test]
783 fn test_create_timer() {
784 let ctx = make_ctx();
785 let _task = ctx.create_timer(std::time::Duration::from_secs(60));
786
787 let inner = ctx.inner.lock().unwrap();
788 assert_eq!(inner.sequence_number, 1);
789 match &inner.pending_actions[0].workflow_action_type {
790 Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => {
791 assert!(a.fire_at.is_some());
792 }
793 _ => panic!("expected CreateTimer action"),
794 }
795 }
796
797 #[test]
798 fn test_wait_for_external_event_buffered() {
799 let ctx = make_ctx();
800
801 {
803 let mut inner = ctx.inner.lock().unwrap();
804 inner
805 .buffered_events
806 .entry("approval".to_string())
807 .or_default()
808 .push_back((Some("\"yes\"".to_string()), true));
809 }
810
811 let task = ctx.wait_for_external_event("APPROVAL"); assert!(task.is_complete());
813 }
814
815 #[test]
816 fn test_wait_for_external_event_pending() {
817 let ctx = make_ctx();
818 let task = ctx.wait_for_external_event("approval");
819 assert!(!task.is_complete());
820
821 let inner = ctx.inner.lock().unwrap();
822 assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
823 }
824
825 #[test]
826 fn test_continue_as_new() {
827 let ctx = make_ctx();
828 ctx.continue_as_new("new_input", true);
829
830 let inner = ctx.inner.lock().unwrap();
831 assert_eq!(
832 inner.continue_as_new_input,
833 Some("\"new_input\"".to_string())
834 );
835 assert!(inner.save_events_on_continue);
836 }
837
838 #[test]
839 fn test_sequence_numbers_increment() {
840 let ctx = make_ctx();
841 let _t1 = ctx.call_activity("a", ());
842 let _t2 = ctx.call_activity("b", ());
843 let _t3 = ctx.create_timer(std::time::Duration::from_secs(1));
844
845 let inner = ctx.inner.lock().unwrap();
846 assert_eq!(inner.sequence_number, 3);
847 assert_eq!(inner.pending_actions[0].id, 0);
848 assert_eq!(inner.pending_actions[1].id, 1);
849 assert_eq!(inner.pending_actions[2].id, 2);
850 }
851
852 #[test]
853 fn test_call_sub_orchestrator_with_app_id() {
854 let ctx = make_ctx();
855 let _task = ctx.call_sub_orchestrator_with_app_id(
856 "child_orch",
857 "input",
858 Some("child-1"),
859 "other-app",
860 );
861
862 let inner = ctx.inner.lock().unwrap();
863 assert_eq!(inner.sequence_number, 1);
864 match &inner.pending_actions[0].workflow_action_type {
865 Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
866 assert_eq!(a.name, "child_orch");
867 assert_eq!(a.instance_id, "child-1");
868 let router = a.router.as_ref().expect("expected router");
869 assert_eq!(router.target_app_id, Some("other-app".to_string()));
870 }
871 _ => panic!("expected CreateChildWorkflow action"),
872 }
873 }
874
875 #[test]
876 fn test_is_patched_new_execution_returns_true() {
877 let ctx = make_ctx();
879 assert!(ctx.is_patched("my-patch"));
880 }
881
882 #[test]
883 fn test_is_patched_in_history_returns_true() {
884 let ctx = make_ctx();
886 ctx.inner
887 .lock()
888 .unwrap()
889 .history_patches
890 .insert("my-patch".to_string());
891 assert!(ctx.is_patched("my-patch"));
892 }
893
894 #[test]
895 fn test_is_patched_mid_replay_returns_false() {
896 let ctx = make_ctx();
898 ctx.inner.lock().unwrap().history_scheduled_count = 2;
899 assert!(!ctx.is_patched("my-patch"));
900 }
901
902 #[test]
903 fn test_is_patched_at_frontier_after_history_returns_true() {
904 let ctx = make_ctx();
906 {
907 let mut inner = ctx.inner.lock().unwrap();
908 inner.history_scheduled_count = 1;
909 inner.sequence_number = 1;
910 }
911 assert!(ctx.is_patched("my-patch"));
912 }
913
914 #[test]
915 fn test_is_patched_caches_decision() {
916 let ctx = make_ctx();
917 assert!(ctx.is_patched("my-patch"));
919 ctx.inner.lock().unwrap().history_scheduled_count = 99;
921 assert!(ctx.is_patched("my-patch"));
922 }
923}