1use std::collections::{HashMap, VecDeque};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, LazyLock, Mutex, MutexGuard};
4
5use futures::future::BoxFuture;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8
9use crate::api::{
10 DurableTaskError, ExternalEventResult, FailureDetails, HistoryPropagationScope,
11 OrchestrationStatus, PropagatedHistory, RetryPolicy,
12};
13use crate::internal::{to_json, to_timestamp};
14use crate::proto;
15
16use super::completable_task::CompletableTask;
17use super::options::{ActivityOptions, SubOrchestratorOptions};
18
19const EXTERNAL_EVENT_TIMER_PATCH: &str = "dapr:external-event-timer";
21
22static FAR_FUTURE_TIMESTAMP: LazyLock<chrono::DateTime<chrono::Utc>> = LazyLock::new(|| {
24 chrono::NaiveDate::from_ymd_opt(9999, 12, 31)
25 .unwrap()
26 .and_hms_opt(23, 59, 59)
27 .unwrap()
28 .and_utc()
29});
30
31pub(crate) fn lock_inner<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
32 m.lock().unwrap_or_else(|e| e.into_inner())
33}
34
35#[derive(Debug)]
36pub(crate) struct ContextConfig {
37 pub(crate) max_event_names: usize,
38 pub(crate) max_events_per_name: usize,
39 pub(crate) max_pending_tasks_per_name: usize,
40 pub(crate) max_json_payload_size: usize,
41}
42
43pub(crate) struct OrchestrationContextInner {
45 pub(crate) config: Arc<ContextConfig>,
46 pub(crate) instance_id: Arc<str>,
47 pub(crate) current_utc_datetime: chrono::DateTime<chrono::Utc>,
48 pub(crate) is_replaying: Arc<AtomicBool>,
49 pub(crate) is_complete: bool,
50 pub(crate) input: Option<String>,
51 pub(crate) name: Arc<str>,
52 pub(crate) custom_status: Option<String>,
53 pub(crate) sequence_number: i32,
54 pub(crate) pending_tasks: HashMap<i32, CompletableTask>,
55 pub(crate) pending_event_tasks: HashMap<String, VecDeque<CompletableTask>>,
56 pub(crate) buffered_events: HashMap<String, VecDeque<(Option<String>, bool)>>,
59 pub(crate) pending_actions: Vec<proto::WorkflowAction>,
60 pub(crate) completion_status: Option<OrchestrationStatus>,
61 pub(crate) completion_result: Option<String>,
62 pub(crate) completion_failure: Option<FailureDetails>,
63 pub(crate) continue_as_new_input: Option<String>,
64 pub(crate) save_events_on_continue: bool,
65 pub(crate) is_suspended: bool,
66 pub(crate) history_patches: std::collections::HashSet<String>,
68 pub(crate) applied_patches: HashMap<String, bool>,
70 pub(crate) history_scheduled_count: i32,
74 pub(crate) propagated_history: Option<Arc<PropagatedHistory>>,
77}
78
79#[derive(Clone)]
84pub struct OrchestrationContext {
85 pub(crate) inner: Arc<Mutex<OrchestrationContextInner>>,
86}
87
88impl OrchestrationContext {
89 pub(crate) fn new(
91 instance_id: String,
92 name: String,
93 input: Option<String>,
94 current_utc_datetime: chrono::DateTime<chrono::Utc>,
95 is_replaying: bool,
96 options: &crate::worker::WorkerOptions,
97 event_count_hint: usize,
98 ) -> Self {
99 let config = Arc::new(ContextConfig {
100 max_event_names: options.max_event_names,
101 max_events_per_name: options.max_events_per_name,
102 max_pending_tasks_per_name: options.max_pending_tasks_per_name,
103 max_json_payload_size: options.max_json_payload_size,
104 });
105
106 Self {
107 inner: Arc::new(Mutex::new(OrchestrationContextInner {
108 config,
109 instance_id: Arc::<str>::from(instance_id),
110 current_utc_datetime,
111 is_replaying: Arc::new(AtomicBool::new(is_replaying)),
112 is_complete: false,
113 input,
114 name: Arc::<str>::from(name),
115 custom_status: None,
116 sequence_number: 0,
117 pending_tasks: HashMap::with_capacity(event_count_hint / 2),
118 pending_event_tasks: HashMap::new(),
119 buffered_events: HashMap::new(),
120 pending_actions: Vec::with_capacity(event_count_hint / 2),
121 completion_status: None,
122 completion_result: None,
123 completion_failure: None,
124 continue_as_new_input: None,
125 save_events_on_continue: false,
126 is_suspended: false,
127 history_patches: std::collections::HashSet::new(),
128 applied_patches: HashMap::new(),
129 history_scheduled_count: 0,
130 propagated_history: None,
131 })),
132 }
133 }
134
135 pub fn instance_id(&self) -> Arc<str> {
137 lock_inner(&self.inner).instance_id.clone()
138 }
139
140 pub fn current_utc_datetime(&self) -> chrono::DateTime<chrono::Utc> {
142 lock_inner(&self.inner).current_utc_datetime
143 }
144
145 pub fn is_replaying(&self) -> bool {
147 lock_inner(&self.inner).is_replaying.load(Ordering::Acquire)
148 }
149
150 pub fn name(&self) -> Arc<str> {
152 lock_inner(&self.inner).name.clone()
153 }
154
155 pub fn input<T: DeserializeOwned>(&self) -> crate::api::Result<T> {
157 let inner = lock_inner(&self.inner);
158 crate::internal::from_json(inner.input.as_deref(), inner.config.max_json_payload_size)
159 }
160
161 pub fn propagated_history(&self) -> Option<Arc<PropagatedHistory>> {
167 lock_inner(&self.inner).propagated_history.clone()
168 }
169
170 pub fn set_custom_status(&self, status: impl Into<String>) {
172 let mut inner = lock_inner(&self.inner);
173 inner.custom_status = Some(status.into());
174 }
175
176 pub fn call_activity(&self, name: &str, input: impl Serialize) -> CompletableTask {
184 tracing::debug!(activity = %name, "Scheduling activity");
185 self.call_activity_inner(name, input, None, None)
186 }
187
188 pub fn call_activity_with_app_id(
190 &self,
191 name: &str,
192 input: impl Serialize,
193 app_id: &str,
194 ) -> CompletableTask {
195 tracing::debug!(activity = %name, app_id = %app_id, "Scheduling activity with app_id");
196 self.call_activity_inner(name, input, Some(app_id), None)
197 }
198
199 fn call_activity_inner(
200 &self,
201 name: &str,
202 input: impl Serialize,
203 app_id: Option<&str>,
204 history_propagation_scope: Option<HistoryPropagationScope>,
205 ) -> CompletableTask {
206 let input_json = match to_json(&input) {
207 Ok(json) => json,
208 Err(e) => {
209 let task = CompletableTask::new();
210 task.fail(FailureDetails {
211 message: format!("Failed to serialize activity input: {e}"),
212 error_type: "SerializationError".to_string(),
213 stack_trace: None,
214 });
215 return task;
216 }
217 };
218 self.call_activity_raw(name, input_json, app_id, history_propagation_scope)
219 }
220
221 fn call_activity_raw(
223 &self,
224 name: &str,
225 input_json: Option<String>,
226 app_id: Option<&str>,
227 history_propagation_scope: Option<HistoryPropagationScope>,
228 ) -> CompletableTask {
229 let mut inner = lock_inner(&self.inner);
230 let seq = inner.sequence_number;
231 inner.sequence_number += 1;
232
233 if let Some(existing) = inner.pending_tasks.get(&seq)
234 && existing.is_complete()
235 {
236 return existing.clone();
237 }
238
239 let task = CompletableTask::new();
240 task.set_replay_handle(inner.is_replaying.clone());
241 inner.pending_tasks.insert(seq, task.clone());
242
243 let router = app_id.map(|id| proto::TaskRouter {
244 source_app_id: String::new(),
245 target_app_id: Some(id.to_string()),
246 target_app_namespace: None,
247 });
248 let action = proto::WorkflowAction {
249 id: seq,
250 router: None,
251 workflow_action_type: Some(proto::workflow_action::WorkflowActionType::ScheduleTask(
252 proto::ScheduleTaskAction {
253 name: name.to_string(),
254 version: None,
255 input: input_json,
256 router,
257 task_execution_id: String::new(),
258 history_propagation_scope: history_propagation_scope
259 .map(|s| s.to_proto() as i32),
260 },
261 )),
262 };
263 inner.pending_actions.push(action);
264
265 task
266 }
267
268 pub fn call_activity_with_options(
273 &self,
274 name: &str,
275 input: impl Serialize,
276 options: ActivityOptions,
277 ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
278 {
279 let input_json = to_json(&input);
280 let name = name.to_string();
281 let app_id = options.app_id.clone();
282 let scope = options.history_propagation_scope;
283 let ctx = self.clone();
284
285 async move {
286 let input_json = input_json?;
287 match options.retry_policy {
288 Some(policy) => {
289 let first_attempt_time = ctx.current_utc_datetime();
290 let schedule: Arc<
291 dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
292 > = Arc::new(move |c: &OrchestrationContext| {
293 c.call_activity_raw(&name, input_json.clone(), app_id.as_deref(), scope)
294 });
295 call_with_retry(ctx, schedule, policy, first_attempt_time).await
296 }
297 None => {
298 ctx.call_activity_raw(&name, input_json, app_id.as_deref(), scope)
299 .await
300 }
301 }
302 }
303 }
304
305 pub fn call_sub_orchestrator(
307 &self,
308 name: &str,
309 input: impl Serialize,
310 instance_id: Option<&str>,
311 ) -> CompletableTask {
312 tracing::debug!(
313 sub_orchestrator = %name,
314 sub_instance_id = ?instance_id,
315 "Scheduling sub-orchestration"
316 );
317 self.call_sub_orchestrator_inner(name, input, instance_id, None, None)
318 }
319
320 pub fn call_sub_orchestrator_with_app_id(
322 &self,
323 name: &str,
324 input: impl Serialize,
325 instance_id: Option<&str>,
326 app_id: &str,
327 ) -> CompletableTask {
328 tracing::debug!(
329 sub_orchestrator = %name,
330 sub_instance_id = ?instance_id,
331 app_id = %app_id,
332 "Scheduling sub-orchestration with app_id"
333 );
334 self.call_sub_orchestrator_inner(name, input, instance_id, Some(app_id), None)
335 }
336
337 fn call_sub_orchestrator_inner(
338 &self,
339 name: &str,
340 input: impl Serialize,
341 instance_id: Option<&str>,
342 app_id: Option<&str>,
343 history_propagation_scope: Option<HistoryPropagationScope>,
344 ) -> CompletableTask {
345 let input_json = match to_json(&input) {
346 Ok(json) => json,
347 Err(e) => {
348 let task = CompletableTask::new();
349 task.fail(FailureDetails {
350 message: format!("Failed to serialize sub-orchestrator input: {e}"),
351 error_type: "SerializationError".to_string(),
352 stack_trace: None,
353 });
354 return task;
355 }
356 };
357 self.call_sub_orchestrator_raw(
358 name,
359 input_json,
360 instance_id,
361 app_id,
362 history_propagation_scope,
363 )
364 }
365
366 fn call_sub_orchestrator_raw(
368 &self,
369 name: &str,
370 input_json: Option<String>,
371 instance_id: Option<&str>,
372 app_id: Option<&str>,
373 history_propagation_scope: Option<HistoryPropagationScope>,
374 ) -> CompletableTask {
375 let mut inner = lock_inner(&self.inner);
376 let seq = inner.sequence_number;
377 inner.sequence_number += 1;
378
379 if let Some(existing) = inner.pending_tasks.get(&seq)
380 && existing.is_complete()
381 {
382 return existing.clone();
383 }
384
385 let task = CompletableTask::new();
386 task.set_replay_handle(inner.is_replaying.clone());
387 inner.pending_tasks.insert(seq, task.clone());
388
389 let sub_instance_id = instance_id
390 .map(|s| s.to_string())
391 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
392
393 let router = app_id.map(|id| proto::TaskRouter {
394 source_app_id: String::new(),
395 target_app_id: Some(id.to_string()),
396 target_app_namespace: None,
397 });
398
399 let action = proto::WorkflowAction {
400 id: seq,
401 router: None,
402 workflow_action_type: Some(
403 proto::workflow_action::WorkflowActionType::CreateChildWorkflow(
404 proto::CreateChildWorkflowAction {
405 instance_id: sub_instance_id,
406 name: name.to_string(),
407 version: None,
408 input: input_json,
409 router,
410 history_propagation_scope: history_propagation_scope
411 .map(|s| s.to_proto() as i32),
412 },
413 ),
414 ),
415 };
416 inner.pending_actions.push(action);
417
418 task
419 }
420
421 pub fn call_sub_orchestrator_with_options(
429 &self,
430 name: &str,
431 input: impl Serialize,
432 options: SubOrchestratorOptions,
433 ) -> impl std::future::Future<Output = crate::api::Result<Option<String>>> + Send + 'static
434 {
435 let input_json = to_json(&input);
436 let name = name.to_string();
437 let instance_id = options.instance_id.clone();
438 let app_id = options.app_id.clone();
439 let scope = options.history_propagation_scope;
440 let ctx = self.clone();
441
442 async move {
443 let input_json = input_json?;
444 match options.retry_policy {
445 Some(policy) => {
446 let first_attempt_time = ctx.current_utc_datetime();
447 let schedule: Arc<
448 dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync,
449 > = Arc::new(move |c: &OrchestrationContext| {
450 c.call_sub_orchestrator_raw(
451 &name,
452 input_json.clone(),
453 instance_id.as_deref(),
454 app_id.as_deref(),
455 scope,
456 )
457 });
458 call_with_retry(ctx, schedule, policy, first_attempt_time).await
459 }
460 None => {
461 ctx.call_sub_orchestrator_raw(
462 &name,
463 input_json,
464 instance_id.as_deref(),
465 app_id.as_deref(),
466 scope,
467 )
468 .await
469 }
470 }
471 }
472 }
473
474 pub fn create_timer(&self, delay: std::time::Duration) -> CompletableTask {
476 tracing::debug!(delay_ms = delay.as_millis() as u64, "Creating timer");
477 let mut inner = lock_inner(&self.inner);
478 let fire_at = inner.current_utc_datetime
479 + chrono::Duration::from_std(delay).unwrap_or(chrono::Duration::zero());
480 Self::create_timer_with_origin(&mut inner, fire_at, None, None)
481 }
482
483 fn create_timer_with_origin(
485 inner: &mut OrchestrationContextInner,
486 fire_at: chrono::DateTime<chrono::Utc>,
487 name: Option<String>,
488 origin: Option<proto::create_timer_action::Origin>,
489 ) -> CompletableTask {
490 let seq = inner.sequence_number;
491 inner.sequence_number += 1;
492
493 if let Some(existing) = inner.pending_tasks.get(&seq)
494 && existing.is_complete()
495 {
496 return existing.clone();
497 }
498
499 let task = CompletableTask::new();
500 task.set_replay_handle(inner.is_replaying.clone());
501 inner.pending_tasks.insert(seq, task.clone());
502
503 let action = proto::WorkflowAction {
504 id: seq,
505 router: None,
506 workflow_action_type: Some(proto::workflow_action::WorkflowActionType::CreateTimer(
507 proto::CreateTimerAction {
508 fire_at: Some(to_timestamp(fire_at)),
509 name,
510 origin,
511 },
512 )),
513 };
514 inner.pending_actions.push(action);
515
516 task
517 }
518
519 pub fn wait_for_external_event(&self, name: &str) -> CompletableTask {
526 tracing::debug!(event_name = %name, "Waiting for external event");
527 let mut inner = lock_inner(&self.inner);
528 let event_name = name.to_lowercase();
529
530 let emit_timer = Self::is_patched_inner(&mut inner, EXTERNAL_EVENT_TIMER_PATCH);
532 if emit_timer {
533 let origin = proto::create_timer_action::Origin::ExternalEvent(
534 proto::TimerOriginExternalEvent {
535 name: name.to_string(),
536 },
537 );
538 let _timer_task = Self::create_timer_with_origin(
540 &mut inner,
541 *FAR_FUTURE_TIMESTAMP,
542 None,
543 Some(origin),
544 );
545 }
546
547 if let Some(events) = inner.buffered_events.get_mut(&event_name)
548 && !events.is_empty()
549 {
550 let (data, during_replay) = events
551 .pop_front()
552 .expect("buffered event queue is not empty");
553 let task = CompletableTask::new();
554 task.set_replay_handle(inner.is_replaying.clone());
555 task.complete_with_phase(data, during_replay);
556 return task;
557 }
558
559 let task = CompletableTask::new();
560 task.set_replay_handle(inner.is_replaying.clone());
561 let max_pending = inner.config.max_pending_tasks_per_name;
562 let pending = inner.pending_event_tasks.entry(event_name).or_default();
563 if pending.len() >= max_pending {
564 tracing::warn!(event_name = %name, "Pending event task limit reached, discarding wait");
565 return task;
566 }
567 pending.push_back(task.clone());
568 task
569 }
570
571 pub async fn wait_for_external_event_with_timeout(
581 &self,
582 name: &str,
583 timeout: std::time::Duration,
584 ) -> crate::api::Result<ExternalEventResult> {
585 tracing::debug!(
586 event_name = %name,
587 timeout_ms = timeout.as_millis() as u64,
588 "Waiting for external event with timeout"
589 );
590
591 let (event_task, timer_task) = {
592 let mut inner = lock_inner(&self.inner);
593 let event_name = name.to_lowercase();
594
595 let fire_at = inner.current_utc_datetime
597 + chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
598 let origin = proto::create_timer_action::Origin::ExternalEvent(
599 proto::TimerOriginExternalEvent {
600 name: name.to_string(),
601 },
602 );
603 let timer_task =
604 Self::create_timer_with_origin(&mut inner, fire_at, None, Some(origin));
605
606 let event_task = if let Some(events) = inner.buffered_events.get_mut(&event_name)
608 && !events.is_empty()
609 {
610 let (data, during_replay) = events
611 .pop_front()
612 .expect("buffered event queue is not empty");
613 let task = CompletableTask::new();
614 task.set_replay_handle(inner.is_replaying.clone());
615 task.complete_with_phase(data, during_replay);
616 task
617 } else {
618 let task = CompletableTask::new();
619 task.set_replay_handle(inner.is_replaying.clone());
620 let max_pending = inner.config.max_pending_tasks_per_name;
621 let pending = inner.pending_event_tasks.entry(event_name).or_default();
622 if pending.len() >= max_pending {
623 tracing::warn!(
624 event_name = %name,
625 "Pending event task limit reached, discarding wait"
626 );
627 } else {
628 pending.push_back(task.clone());
629 }
630 task
631 };
632
633 (event_task, timer_task)
634 };
635
636 let winner = super::when_any::when_any(vec![event_task.clone(), timer_task]).await?;
638 match winner {
639 0 => {
640 let payload = event_task.await?;
641 Ok(ExternalEventResult::Received(payload))
642 }
643 _ => {
644 let mut inner = lock_inner(&self.inner);
647 let event_name = name.to_lowercase();
648 if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name) {
649 tasks.retain(|t| !t.ptr_eq(&event_task));
650 }
651 Ok(ExternalEventResult::TimedOut)
652 }
653 }
654 }
655
656 fn is_patched_inner(inner: &mut OrchestrationContextInner, patch_name: &str) -> bool {
658 if let Some(&cached) = inner.applied_patches.get(patch_name) {
659 return cached;
660 }
661 if inner.history_patches.contains(patch_name) {
662 inner.applied_patches.insert(patch_name.to_string(), true);
663 return true;
664 }
665 if inner.sequence_number < inner.history_scheduled_count {
666 inner.applied_patches.insert(patch_name.to_string(), false);
667 return false;
668 }
669 inner.applied_patches.insert(patch_name.to_string(), true);
670 true
671 }
672
673 pub fn continue_as_new(&self, input: impl Serialize, save_events: bool) {
675 tracing::debug!(save_events = save_events, "Continuing orchestration as new");
676 let mut inner = lock_inner(&self.inner);
677 inner.continue_as_new_input = to_json(&input).ok().flatten();
678 inner.save_events_on_continue = save_events;
679 }
680
681 pub fn is_patched(&self, patch_name: &str) -> bool {
695 let mut inner = lock_inner(&self.inner);
696
697 if let Some(&cached) = inner.applied_patches.get(patch_name) {
699 return cached;
700 }
701
702 if inner.history_patches.contains(patch_name) {
704 inner.applied_patches.insert(patch_name.to_string(), true);
705 return true;
706 }
707
708 if inner.sequence_number < inner.history_scheduled_count {
713 inner.applied_patches.insert(patch_name.to_string(), false);
714 return false;
715 }
716
717 inner.applied_patches.insert(patch_name.to_string(), true);
719 true
720 }
721}
722
723fn compute_retry_delay(
728 policy: &RetryPolicy,
729 attempt: u32,
730 first_attempt_time: chrono::DateTime<chrono::Utc>,
731 current_time: chrono::DateTime<chrono::Utc>,
732 details: &FailureDetails,
733) -> Option<std::time::Duration> {
734 if let Some(ref handle) = policy.handle
736 && !handle(details)
737 {
738 return None;
739 }
740
741 if let Some(timeout) = policy.retry_timeout {
743 let elapsed = current_time - first_attempt_time;
744 let timeout_dur = chrono::Duration::from_std(timeout).unwrap_or(chrono::Duration::zero());
745 if elapsed >= timeout_dur {
746 return None;
747 }
748 }
749
750 let first_ms = policy.first_retry_interval.as_millis() as f64;
752 let next_ms = first_ms * policy.backoff_coefficient.powi(attempt as i32);
753
754 let delay_ms = if let Some(max) = policy.max_retry_interval {
755 next_ms.min(max.as_millis() as f64)
756 } else {
757 next_ms
758 };
759
760 Some(std::time::Duration::from_millis(delay_ms as u64))
761}
762
763fn call_with_retry(
769 ctx: OrchestrationContext,
770 schedule: Arc<dyn Fn(&OrchestrationContext) -> CompletableTask + Send + Sync>,
771 policy: RetryPolicy,
772 first_attempt_time: chrono::DateTime<chrono::Utc>,
773) -> BoxFuture<'static, crate::api::Result<Option<String>>> {
774 Box::pin(async move {
775 let mut attempt = 0;
776 loop {
777 let task = schedule(&ctx);
778 match task.await {
779 Ok(v) => return Ok(v),
780 Err(DurableTaskError::TaskFailed {
781 message,
782 failure_details,
783 }) => {
784 let details = failure_details.clone().unwrap_or_else(|| FailureDetails {
785 message: message.clone(),
786 error_type: "TaskFailed".to_string(),
787 stack_trace: None,
788 });
789
790 if attempt + 1 >= policy.max_number_of_attempts {
791 tracing::debug!(
792 attempt,
793 max = policy.max_number_of_attempts,
794 "Max retry attempts reached"
795 );
796 return Err(DurableTaskError::TaskFailed {
797 message,
798 failure_details,
799 });
800 }
801
802 let current_time = ctx.current_utc_datetime();
803 let delay = match compute_retry_delay(
804 &policy,
805 attempt,
806 first_attempt_time,
807 current_time,
808 &details,
809 ) {
810 Some(d) => d,
811 None => {
812 tracing::debug!(attempt, "Retry predicate or timeout prevented retry");
813 return Err(DurableTaskError::TaskFailed {
814 message,
815 failure_details,
816 });
817 }
818 };
819
820 tracing::debug!(
821 attempt,
822 delay_ms = delay.as_millis(),
823 "Scheduling retry timer"
824 );
825 ctx.create_timer(delay).await?;
826 attempt += 1;
827 }
828 Err(e) => return Err(e),
829 }
830 }
831 })
832}
833
834#[cfg(test)]
835mod tests {
836 use super::*;
837 use chrono::Datelike;
838
839 fn make_ctx() -> OrchestrationContext {
840 OrchestrationContext::new(
841 "inst-1".to_string(),
842 "my_orch".to_string(),
843 Some("\"hello\"".to_string()),
844 chrono::Utc::now(),
845 false,
846 &crate::worker::WorkerOptions::default(),
847 0,
848 )
849 }
850
851 #[test]
852 fn test_basic_accessors() {
853 let ctx = make_ctx();
854 assert_eq!(ctx.instance_id().as_ref(), "inst-1");
855 assert_eq!(ctx.name().as_ref(), "my_orch");
856 assert!(!ctx.is_replaying());
857 }
858
859 #[test]
860 fn test_input() {
861 let ctx = make_ctx();
862 let input: String = ctx.input().unwrap();
863 assert_eq!(input, "hello");
864 }
865
866 #[test]
867 fn test_set_custom_status() {
868 let ctx = make_ctx();
869 ctx.set_custom_status("processing");
870 let inner = ctx.inner.lock().unwrap();
871 assert_eq!(inner.custom_status, Some("processing".to_string()));
872 }
873
874 #[test]
875 fn test_call_activity_creates_action() {
876 let ctx = make_ctx();
877 let _task = ctx.call_activity("greet", "world");
878
879 let inner = ctx.inner.lock().unwrap();
880 assert_eq!(inner.sequence_number, 1);
881 assert_eq!(inner.pending_actions.len(), 1);
882 assert_eq!(inner.pending_actions[0].id, 0);
883 match &inner.pending_actions[0].workflow_action_type {
884 Some(proto::workflow_action::WorkflowActionType::ScheduleTask(a)) => {
885 assert_eq!(a.name, "greet");
886 assert_eq!(a.input, Some("\"world\"".to_string()));
887 }
888 _ => panic!("expected ScheduleTask action"),
889 }
890 }
891
892 #[test]
893 fn test_call_activity_replay_returns_existing() {
894 let ctx = make_ctx();
895
896 {
898 let mut inner = ctx.inner.lock().unwrap();
899 let task = CompletableTask::new();
900 task.complete(Some("42".to_string()));
901 inner.pending_tasks.insert(0, task);
902 }
903
904 let task = ctx.call_activity("greet", "world");
905 assert!(task.is_complete());
906
907 let inner = ctx.inner.lock().unwrap();
908 assert_eq!(inner.pending_actions.len(), 0);
909 }
910
911 #[test]
912 fn test_call_sub_orchestrator() {
913 let ctx = make_ctx();
914 let _task = ctx.call_sub_orchestrator("child_orch", "input", Some("child-1"));
915
916 let inner = ctx.inner.lock().unwrap();
917 assert_eq!(inner.sequence_number, 1);
918 match &inner.pending_actions[0].workflow_action_type {
919 Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
920 assert_eq!(a.name, "child_orch");
921 assert_eq!(a.instance_id, "child-1");
922 }
923 _ => panic!("expected CreateChildWorkflow action"),
924 }
925 }
926
927 #[test]
928 fn test_create_timer() {
929 let ctx = make_ctx();
930 let _task = ctx.create_timer(std::time::Duration::from_secs(60));
931
932 let inner = ctx.inner.lock().unwrap();
933 assert_eq!(inner.sequence_number, 1);
934 match &inner.pending_actions[0].workflow_action_type {
935 Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => {
936 assert!(a.fire_at.is_some());
937 }
938 _ => panic!("expected CreateTimer action"),
939 }
940 }
941
942 #[test]
943 fn test_wait_for_external_event_buffered() {
944 let ctx = make_ctx();
945
946 {
948 let mut inner = ctx.inner.lock().unwrap();
949 inner
950 .buffered_events
951 .entry("approval".to_string())
952 .or_default()
953 .push_back((Some("\"yes\"".to_string()), true));
954 }
955
956 let task = ctx.wait_for_external_event("APPROVAL"); assert!(task.is_complete());
958 }
959
960 #[test]
961 fn test_wait_for_external_event_pending() {
962 let ctx = make_ctx();
963 let task = ctx.wait_for_external_event("approval");
964 assert!(!task.is_complete());
965
966 let inner = ctx.inner.lock().unwrap();
967 assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
968 }
969
970 #[test]
971 fn test_continue_as_new() {
972 let ctx = make_ctx();
973 ctx.continue_as_new("new_input", true);
974
975 let inner = ctx.inner.lock().unwrap();
976 assert_eq!(
977 inner.continue_as_new_input,
978 Some("\"new_input\"".to_string())
979 );
980 assert!(inner.save_events_on_continue);
981 }
982
983 #[test]
984 fn test_sequence_numbers_increment() {
985 let ctx = make_ctx();
986 let _t1 = ctx.call_activity("a", ());
987 let _t2 = ctx.call_activity("b", ());
988 let _t3 = ctx.create_timer(std::time::Duration::from_secs(1));
989
990 let inner = ctx.inner.lock().unwrap();
991 assert_eq!(inner.sequence_number, 3);
992 assert_eq!(inner.pending_actions[0].id, 0);
993 assert_eq!(inner.pending_actions[1].id, 1);
994 assert_eq!(inner.pending_actions[2].id, 2);
995 }
996
997 #[test]
998 fn test_call_sub_orchestrator_with_app_id() {
999 let ctx = make_ctx();
1000 let _task = ctx.call_sub_orchestrator_with_app_id(
1001 "child_orch",
1002 "input",
1003 Some("child-1"),
1004 "other-app",
1005 );
1006
1007 let inner = ctx.inner.lock().unwrap();
1008 assert_eq!(inner.sequence_number, 1);
1009 match &inner.pending_actions[0].workflow_action_type {
1010 Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(a)) => {
1011 assert_eq!(a.name, "child_orch");
1012 assert_eq!(a.instance_id, "child-1");
1013 let router = a.router.as_ref().expect("expected router");
1014 assert_eq!(router.target_app_id, Some("other-app".to_string()));
1015 }
1016 _ => panic!("expected CreateChildWorkflow action"),
1017 }
1018 }
1019
1020 #[test]
1021 fn test_is_patched_new_execution_returns_true() {
1022 let ctx = make_ctx();
1024 assert!(ctx.is_patched("my-patch"));
1025 }
1026
1027 #[test]
1028 fn test_is_patched_in_history_returns_true() {
1029 let ctx = make_ctx();
1031 ctx.inner
1032 .lock()
1033 .unwrap()
1034 .history_patches
1035 .insert("my-patch".to_string());
1036 assert!(ctx.is_patched("my-patch"));
1037 }
1038
1039 #[test]
1040 fn test_is_patched_mid_replay_returns_false() {
1041 let ctx = make_ctx();
1043 ctx.inner.lock().unwrap().history_scheduled_count = 2;
1044 assert!(!ctx.is_patched("my-patch"));
1045 }
1046
1047 #[test]
1048 fn test_is_patched_at_frontier_after_history_returns_true() {
1049 let ctx = make_ctx();
1051 {
1052 let mut inner = ctx.inner.lock().unwrap();
1053 inner.history_scheduled_count = 1;
1054 inner.sequence_number = 1;
1055 }
1056 assert!(ctx.is_patched("my-patch"));
1057 }
1058
1059 #[test]
1060 fn test_is_patched_caches_decision() {
1061 let ctx = make_ctx();
1062 assert!(ctx.is_patched("my-patch"));
1064 ctx.inner.lock().unwrap().history_scheduled_count = 99;
1066 assert!(ctx.is_patched("my-patch"));
1067 }
1068
1069 fn extract_create_timer(action: &proto::WorkflowAction) -> &proto::CreateTimerAction {
1071 match &action.workflow_action_type {
1072 Some(proto::workflow_action::WorkflowActionType::CreateTimer(a)) => a,
1073 other => panic!("expected CreateTimer action, got {other:?}"),
1074 }
1075 }
1076
1077 #[test]
1078 fn test_create_timer_origin_none() {
1079 let ctx = make_ctx();
1081 let _task = ctx.create_timer(std::time::Duration::from_secs(60));
1082
1083 let inner = ctx.inner.lock().unwrap();
1084 let timer_action = extract_create_timer(&inner.pending_actions[0]);
1085 assert!(
1086 timer_action.origin.is_none(),
1087 "generic timer should have no origin"
1088 );
1089 }
1090
1091 #[test]
1092 fn test_wait_for_external_event_emits_timer_new_execution() {
1093 let ctx = make_ctx();
1095 let _task = ctx.wait_for_external_event("approval");
1096
1097 let inner = ctx.inner.lock().unwrap();
1098 assert_eq!(
1099 inner.sequence_number, 1,
1100 "should have allocated a seq for the timer"
1101 );
1102 assert_eq!(
1103 inner.pending_actions.len(),
1104 1,
1105 "should have emitted a CreateTimerAction"
1106 );
1107
1108 let timer_action = extract_create_timer(&inner.pending_actions[0]);
1109 match &timer_action.origin {
1110 Some(proto::create_timer_action::Origin::ExternalEvent(e)) => {
1111 assert_eq!(e.name, "approval");
1112 }
1113 other => panic!("expected ExternalEvent origin, got {other:?}"),
1114 }
1115
1116 let fire_at = timer_action
1118 .fire_at
1119 .as_ref()
1120 .expect("fire_at should be set");
1121 let fire_at_dt = chrono::DateTime::from_timestamp(fire_at.seconds, fire_at.nanos as u32);
1122 assert!(fire_at_dt.is_some());
1123 assert!(fire_at_dt.unwrap().year() >= 9999);
1124 }
1125
1126 #[test]
1127 fn test_wait_for_external_event_no_timer_during_replay() {
1128 let ctx = make_ctx();
1130 ctx.inner.lock().unwrap().history_scheduled_count = 5;
1131
1132 let _task = ctx.wait_for_external_event("approval");
1133
1134 let inner = ctx.inner.lock().unwrap();
1135 assert_eq!(
1136 inner.sequence_number, 0,
1137 "should NOT allocate a seq during replay"
1138 );
1139 assert!(
1140 inner.pending_actions.is_empty(),
1141 "should NOT emit a timer during replay"
1142 );
1143 assert_eq!(inner.pending_event_tasks.get("approval").unwrap().len(), 1);
1144 }
1145
1146 #[test]
1147 fn test_wait_for_external_event_buffered_still_emits_timer() {
1148 let ctx = make_ctx();
1150 {
1151 let mut inner = ctx.inner.lock().unwrap();
1152 inner
1153 .buffered_events
1154 .entry("approval".to_string())
1155 .or_default()
1156 .push_back((Some("\"yes\"".to_string()), true));
1157 }
1158
1159 let task = ctx.wait_for_external_event("APPROVAL");
1160 assert!(
1161 task.is_complete(),
1162 "buffered event should complete immediately"
1163 );
1164
1165 let inner = ctx.inner.lock().unwrap();
1166 assert_eq!(
1167 inner.sequence_number, 1,
1168 "should have allocated a seq for the timer"
1169 );
1170 assert_eq!(inner.pending_actions.len(), 1);
1171 let timer_action = extract_create_timer(&inner.pending_actions[0]);
1172 assert!(
1173 matches!(
1174 &timer_action.origin,
1175 Some(proto::create_timer_action::Origin::ExternalEvent(_))
1176 ),
1177 "timer origin should be ExternalEvent"
1178 );
1179 }
1180
1181 #[test]
1182 fn test_wait_for_external_event_with_timeout_emits_timer() {
1183 let ctx = make_ctx();
1185
1186 {
1188 let mut inner = ctx.inner.lock().unwrap();
1189 let event_name = "approval".to_string();
1190 let fire_at = inner.current_utc_datetime + chrono::Duration::seconds(30);
1191 let origin = proto::create_timer_action::Origin::ExternalEvent(
1192 proto::TimerOriginExternalEvent {
1193 name: "approval".to_string(),
1194 },
1195 );
1196 let _timer = OrchestrationContext::create_timer_with_origin(
1197 &mut inner,
1198 fire_at,
1199 None,
1200 Some(origin),
1201 );
1202 let task = CompletableTask::new();
1204 inner
1205 .pending_event_tasks
1206 .entry(event_name)
1207 .or_default()
1208 .push_back(task);
1209 }
1210
1211 let inner = ctx.inner.lock().unwrap();
1212 assert_eq!(inner.sequence_number, 1);
1213 let timer_action = extract_create_timer(&inner.pending_actions[0]);
1214 match &timer_action.origin {
1215 Some(proto::create_timer_action::Origin::ExternalEvent(e)) => {
1216 assert_eq!(e.name, "approval");
1217 }
1218 other => panic!("expected ExternalEvent origin, got {other:?}"),
1219 }
1220 let fire_at = timer_action.fire_at.as_ref().unwrap();
1222 let fire_at_dt =
1223 chrono::DateTime::from_timestamp(fire_at.seconds, fire_at.nanos as u32).unwrap();
1224 assert!(fire_at_dt.year() < 9999, "should not be far-future");
1225 }
1226
1227 #[test]
1228 fn test_create_timer_refactor_still_works() {
1229 let ctx = make_ctx();
1231 let _t1 = ctx.create_timer(std::time::Duration::from_secs(10));
1232 let _t2 = ctx.create_timer(std::time::Duration::from_secs(20));
1233
1234 let inner = ctx.inner.lock().unwrap();
1235 assert_eq!(inner.sequence_number, 2);
1236 assert_eq!(inner.pending_actions.len(), 2);
1237 assert_eq!(inner.pending_actions[0].id, 0);
1238 assert_eq!(inner.pending_actions[1].id, 1);
1239
1240 for action in &inner.pending_actions {
1241 let timer = extract_create_timer(action);
1242 assert!(timer.fire_at.is_some());
1243 assert!(
1244 timer.origin.is_none(),
1245 "generic timer should have no origin"
1246 );
1247 }
1248 }
1249}