1use std::sync::atomic::Ordering;
2use std::task::Poll;
3
4use futures::FutureExt;
5
6use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
7use crate::internal::from_timestamp;
8use crate::proto;
9use crate::proto::history_event::EventType;
10use crate::task::OrchestrationContext;
11use crate::task::orchestration_context::{OrchestrationContextInner, lock_inner};
12
13use super::options::WorkerOptions;
14use super::registry::OrchestratorFn;
15
/// Namespace type for orchestration execution.
///
/// The struct has no fields; everything it offers is exposed as associated
/// functions, with [`OrchestrationExecutor::execute`] as the public entry point.
pub struct OrchestrationExecutor;
24
25impl OrchestrationExecutor {
26 pub async fn execute(
30 orchestrator_fn: &OrchestratorFn,
31 instance_id: &str,
32 old_events: Vec<proto::HistoryEvent>,
33 new_events: Vec<proto::HistoryEvent>,
34 completion_token: String,
35 options: &WorkerOptions,
36 propagated_history: Option<crate::api::PropagatedHistory>,
37 ) -> crate::api::Result<proto::WorkflowResponse> {
38 tracing::info!(
39 instance_id = %instance_id,
40 past_events = old_events.len(),
41 new_events = new_events.len(),
42 "Starting orchestration execution"
43 );
44
45 let ctx = OrchestrationContext::new(
47 instance_id.to_string(),
48 String::new(),
49 None,
50 chrono::Utc::now(),
51 true,
52 options,
53 old_events.len() + new_events.len(),
54 );
55
56 if propagated_history.is_some() {
59 let mut inner = lock_inner(&ctx.inner);
60 inner.propagated_history = propagated_history.map(std::sync::Arc::new);
61 }
62
63 let initially_replaying = !old_events.is_empty();
71 {
72 let mut inner = lock_inner(&ctx.inner);
73 inner.is_replaying.store(true, Ordering::Release);
74 tracing::debug!(
75 instance_id = %instance_id,
76 count = old_events.len(),
77 "Replaying old events"
78 );
79 for event in &old_events {
80 Self::process_event(
81 &mut inner,
82 event,
83 instance_id,
84 options.max_identifier_length,
85 );
86 }
87
88 inner.is_replaying.store(false, Ordering::Release);
89 tracing::debug!(
90 instance_id = %instance_id,
91 count = new_events.len(),
92 "Processing new events"
93 );
94 for event in &new_events {
95 Self::process_event(
96 &mut inner,
97 event,
98 instance_id,
99 options.max_identifier_length,
100 );
101 }
102
103 inner
106 .is_replaying
107 .store(initially_replaying, Ordering::Release);
108 }
109
110 #[cfg(feature = "opentelemetry")]
112 let otel_ctx = {
113 let inner = lock_inner(&ctx.inner);
114 let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
115 let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
116 crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
117 };
118
119 let should_run = {
120 let inner = lock_inner(&ctx.inner);
121 !inner.is_suspended && !inner.is_complete
122 };
123
124 if should_run {
125 tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
126
127 let mut future = (orchestrator_fn)(ctx.clone()).boxed();
130 let poll_result = futures::poll!(future.as_mut());
131
132 match poll_result {
133 Poll::Ready(Ok(output)) => {
134 let mut inner = lock_inner(&ctx.inner);
135 if inner.continue_as_new_input.is_some() {
136 tracing::info!(
137 instance_id = %instance_id,
138 orchestrator = %inner.name,
139 "Orchestration continuing as new"
140 );
141 #[cfg(feature = "opentelemetry")]
142 crate::internal::otel::set_span_status_attribute(
143 &otel_ctx,
144 "CONTINUED_AS_NEW",
145 );
146 inner.is_complete = true;
147 inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
148 } else if !inner.is_complete {
149 tracing::info!(
150 instance_id = %instance_id,
151 orchestrator = %inner.name,
152 "Orchestration completed successfully"
153 );
154 #[cfg(feature = "opentelemetry")]
155 crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
156 inner.is_complete = true;
157 inner.completion_status = Some(OrchestrationStatus::Completed);
158 inner.completion_result = output;
159 }
160 }
161 Poll::Ready(Err(DurableTaskError::TaskFailed {
162 message,
163 failure_details,
164 })) => {
165 let mut inner = lock_inner(&ctx.inner);
166 tracing::warn!(
167 instance_id = %instance_id,
168 orchestrator = %inner.name,
169 error = %message,
170 "Orchestration failed due to task failure"
171 );
172 #[cfg(feature = "opentelemetry")]
173 {
174 crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
175 crate::internal::otel::set_span_error(&otel_ctx, &message);
176 }
177 inner.is_complete = true;
178 inner.completion_status = Some(OrchestrationStatus::Failed);
179 inner.completion_failure =
180 Some(failure_details.unwrap_or_else(|| FailureDetails {
181 message: message.clone(),
182 error_type: "TaskFailed".to_string(),
183 stack_trace: None,
184 }));
185 }
186 Poll::Ready(Err(e)) => {
187 let mut inner = lock_inner(&ctx.inner);
188 tracing::error!(
189 instance_id = %instance_id,
190 orchestrator = %inner.name,
191 error = %e,
192 "Orchestration failed with error"
193 );
194 #[cfg(feature = "opentelemetry")]
195 {
196 crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
197 crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
198 }
199 inner.is_complete = true;
200 inner.completion_status = Some(OrchestrationStatus::Failed);
201 inner.completion_failure = Some(FailureDetails {
202 message: e.to_string(),
203 error_type: "OrchestratorError".to_string(),
204 stack_trace: None,
205 });
206 }
207 Poll::Pending => {
208 let inner = lock_inner(&ctx.inner);
209 tracing::debug!(
210 instance_id = %instance_id,
211 orchestrator = %inner.name,
212 pending_actions = inner.pending_actions.len(),
213 "Orchestrator yielded, waiting for tasks"
214 );
215 }
216 }
217 } else {
218 let inner = lock_inner(&ctx.inner);
219 tracing::debug!(
220 instance_id = %instance_id,
221 is_suspended = inner.is_suspended,
222 is_complete = inner.is_complete,
223 "Skipping orchestrator execution"
224 );
225 #[cfg(feature = "opentelemetry")]
226 if inner.is_complete {
227 crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
228 }
229 }
230
231 #[cfg(feature = "opentelemetry")]
233 crate::internal::otel::end_span(&otel_ctx);
234
235 let response = Self::build_response(&ctx, instance_id, completion_token);
236 tracing::debug!(
237 instance_id = %instance_id,
238 actions = response.actions.len(),
239 "Built orchestration response"
240 );
241 Ok(response)
242 }
243
244 #[cfg(feature = "opentelemetry")]
246 fn find_parent_trace_context<'a>(
247 old_events: &'a [proto::HistoryEvent],
248 new_events: &'a [proto::HistoryEvent],
249 ) -> Option<&'a proto::TraceContext> {
250 old_events.iter().chain(new_events.iter()).find_map(|e| {
251 if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
252 es.parent_trace_context.as_ref()
253 } else {
254 None
255 }
256 })
257 }
258
259 fn process_event(
264 inner: &mut OrchestrationContextInner,
265 event: &proto::HistoryEvent,
266 instance_id: &str,
267 max_identifier_length: usize,
268 ) {
269 let event_type = match &event.event_type {
270 Some(et) => et,
271 None => return,
272 };
273
274 let during_replay = inner.is_replaying.load(Ordering::Acquire);
275 let replay_handle = inner.is_replaying.clone();
276 let pending_task = |inner: &mut OrchestrationContextInner, seq: i32| {
279 let task = inner.pending_tasks.entry(seq).or_default();
280 task.set_replay_handle(replay_handle.clone());
281 task.clone()
282 };
283
284 match event_type {
285 EventType::WorkflowStarted(ws) => {
286 if let Some(ts) = &event.timestamp
287 && let Some(dt) = from_timestamp(ts)
288 {
289 inner.current_utc_datetime = dt;
290 }
291 if let Some(version) = &ws.version {
292 for patch in &version.patches {
293 inner.history_patches.insert(patch.clone());
294 }
295 }
296 }
297 EventType::ExecutionStarted(e) => {
298 tracing::debug!(
299 instance_id = %instance_id,
300 orchestrator = %e.name,
301 "Execution started event"
302 );
303 inner.name = std::sync::Arc::<str>::from(e.name.clone());
304 inner.input = e.input.clone();
305 }
306 EventType::TaskCompleted(e) => {
307 let seq = e.task_scheduled_id;
308 tracing::debug!(
309 instance_id = %instance_id,
310 task_id = seq,
311 "Task completed"
312 );
313 let task = pending_task(inner, seq);
314 if task.is_complete() {
315 tracing::debug!(
316 instance_id = %instance_id,
317 task_id = seq,
318 "Skipping duplicate task completion"
319 );
320 return;
321 }
322 task.complete_with_phase(e.result.clone(), during_replay);
323 }
324 EventType::TaskFailed(e) => {
325 let seq = e.task_scheduled_id;
326 let details = e
327 .failure_details
328 .as_ref()
329 .map(FailureDetails::from)
330 .unwrap_or_else(|| FailureDetails {
331 message: "Task failed".to_string(),
332 error_type: "Unknown".to_string(),
333 stack_trace: None,
334 });
335 tracing::debug!(
336 instance_id = %instance_id,
337 task_id = seq,
338 error = %details.message,
339 "Task failed"
340 );
341 let task = pending_task(inner, seq);
342 if task.is_complete() {
343 tracing::debug!(
344 instance_id = %instance_id,
345 task_id = seq,
346 "Skipping duplicate task completion"
347 );
348 return;
349 }
350 task.fail_with_phase(details, during_replay);
351 }
352 EventType::TaskScheduled(_)
353 | EventType::TimerCreated(_)
354 | EventType::ChildWorkflowInstanceCreated(_) => {
355 inner.history_scheduled_count += 1;
356 }
357 EventType::TimerFired(e) => {
358 let seq = e.timer_id;
359 tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
360 let task = pending_task(inner, seq);
361 if task.is_complete() {
362 tracing::debug!(
363 instance_id = %instance_id,
364 task_id = seq,
365 "Skipping duplicate task completion"
366 );
367 return;
368 }
369 task.complete_with_phase(None, during_replay);
370 }
371 EventType::ChildWorkflowInstanceCompleted(e) => {
372 let seq = e.task_scheduled_id;
373 tracing::debug!(
374 instance_id = %instance_id,
375 task_id = seq,
376 "Child workflow completed"
377 );
378 let task = pending_task(inner, seq);
379 if task.is_complete() {
380 tracing::debug!(
381 instance_id = %instance_id,
382 task_id = seq,
383 "Skipping duplicate task completion"
384 );
385 return;
386 }
387 task.complete_with_phase(e.result.clone(), during_replay);
388 }
389 EventType::ChildWorkflowInstanceFailed(e) => {
390 let seq = e.task_scheduled_id;
391 let details = e
392 .failure_details
393 .as_ref()
394 .map(FailureDetails::from)
395 .unwrap_or_else(|| FailureDetails {
396 message: "Sub-orchestration failed".to_string(),
397 error_type: "Unknown".to_string(),
398 stack_trace: None,
399 });
400 tracing::debug!(
401 instance_id = %instance_id,
402 task_id = seq,
403 error = %details.message,
404 "Child workflow failed"
405 );
406 let task = pending_task(inner, seq);
407 if task.is_complete() {
408 tracing::debug!(
409 instance_id = %instance_id,
410 task_id = seq,
411 "Skipping duplicate task completion"
412 );
413 return;
414 }
415 task.fail_with_phase(details, during_replay);
416 }
417 EventType::EventRaised(e) => {
418 if let Err(err) = crate::internal::validate_identifier(
419 &e.name,
420 "event name",
421 max_identifier_length,
422 ) {
423 tracing::warn!(
424 instance_id = %instance_id,
425 event_name = %e.name,
426 error = %err,
427 "Rejected event: invalid event name"
428 );
429 return;
430 }
431 let event_name = e.name.to_lowercase();
432 tracing::debug!(
433 instance_id = %instance_id,
434 event_name = %e.name,
435 "External event raised"
436 );
437
438 if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name)
439 && !tasks.is_empty()
440 {
441 let task = tasks
442 .pop_front()
443 .expect("pending event task queue is not empty");
444 if task.is_complete() {
445 tracing::debug!(
446 instance_id = %instance_id,
447 event_name = %e.name,
448 "Skipping duplicate task completion"
449 );
450 return;
451 }
452 task.complete_with_phase(e.input.clone(), during_replay);
453 return;
454 }
455
456 if inner.buffered_events.len() >= inner.config.max_event_names
457 && !inner.buffered_events.contains_key(&event_name)
458 {
459 tracing::warn!(
460 instance_id = %instance_id,
461 event_name = %e.name,
462 "Event name limit reached, discarding event"
463 );
464 return;
465 }
466
467 let max_events = inner.config.max_events_per_name;
468 let events = inner.buffered_events.entry(event_name).or_default();
469 if events.len() >= max_events {
470 tracing::warn!(
471 instance_id = %instance_id,
472 event_name = %e.name,
473 "Event buffer limit reached, discarding event"
474 );
475 return;
476 }
477 events.push_back((e.input.clone(), during_replay));
478 }
479 EventType::ExecutionSuspended(_) => {
480 tracing::info!(instance_id = %instance_id, "Orchestration suspended");
481 inner.is_suspended = true;
482 }
483 EventType::ExecutionResumed(_) => {
484 tracing::info!(instance_id = %instance_id, "Orchestration resumed");
485 inner.is_suspended = false;
486 }
487 EventType::ExecutionTerminated(e) => {
488 tracing::info!(instance_id = %instance_id, "Orchestration terminated");
489 inner.is_complete = true;
490 inner.completion_status = Some(OrchestrationStatus::Terminated);
491 inner.completion_result = e.input.clone();
492 inner.pending_actions.clear();
493 }
494 EventType::ExecutionCompleted(_)
495 | EventType::WorkflowCompleted(_)
496 | EventType::EventSent(_)
497 | EventType::ContinueAsNew(_)
498 | EventType::ExecutionStalled(_)
499 | EventType::DetachedWorkflowInstanceCreated(_) => {}
500 }
501 }
502
503 fn make_complete_action(
504 id: i32,
505 status: proto::OrchestrationStatus,
506 result: Option<String>,
507 carryover_events: Vec<proto::HistoryEvent>,
508 failure: Option<FailureDetails>,
509 ) -> proto::WorkflowAction {
510 proto::WorkflowAction {
511 id,
512 router: None,
513 workflow_action_type: Some(
514 proto::workflow_action::WorkflowActionType::CompleteWorkflow(
515 proto::CompleteWorkflowAction {
516 workflow_status: status as i32,
517 result,
518 details: None,
519 new_version: None,
520 carryover_events,
521 failure_details: failure.map(|f| proto::TaskFailureDetails {
522 error_type: f.error_type,
523 error_message: f.message,
524 stack_trace: f.stack_trace,
525 inner_failure: None,
526 is_non_retriable: false,
527 }),
528 },
529 ),
530 ),
531 }
532 }
533
534 fn build_response(
535 ctx: &OrchestrationContext,
536 instance_id: &str,
537 completion_token: String,
538 ) -> proto::WorkflowResponse {
539 let mut inner = lock_inner(&ctx.inner);
540
541 let mut actions = std::mem::take(&mut inner.pending_actions);
544
545 if let Some(new_input) = inner.continue_as_new_input.take() {
546 let mut carryover_events = Vec::new();
547 if inner.save_events_on_continue {
548 for (name, events) in &inner.buffered_events {
549 for (input, _during_replay) in events {
550 carryover_events.push(proto::HistoryEvent {
551 event_id: -1,
552 timestamp: None,
553 router: None,
554 event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
555 name: name.clone(),
556 input: input.clone(),
557 })),
558 });
559 }
560 }
561 }
562
563 actions.push(Self::make_complete_action(
564 actions.len() as i32,
565 proto::OrchestrationStatus::ContinuedAsNew,
566 Some(new_input),
567 carryover_events,
568 None,
569 ));
570 } else if let Some(status) = inner.completion_status {
571 match status {
572 OrchestrationStatus::Completed => {
573 actions.push(Self::make_complete_action(
574 actions.len() as i32,
575 proto::OrchestrationStatus::Completed,
576 inner.completion_result.take(),
577 Vec::new(),
578 None,
579 ));
580 }
581 OrchestrationStatus::Failed => {
582 let failure = inner.completion_failure.take();
583 actions.push(Self::make_complete_action(
584 actions.len() as i32,
585 proto::OrchestrationStatus::Failed,
586 None,
587 Vec::new(),
588 failure,
589 ));
590 }
591 OrchestrationStatus::Terminated => {
592 actions.push(Self::make_complete_action(
593 actions.len() as i32,
594 proto::OrchestrationStatus::Terminated,
595 inner.completion_result.take(),
596 Vec::new(),
597 None,
598 ));
599 }
600 _ => {
601 }
603 }
604 }
605
606 proto::WorkflowResponse {
607 instance_id: instance_id.to_string(),
608 actions,
609 custom_status: inner.custom_status.take(),
610 completion_token,
611 num_events_processed: None,
612 version: None,
613 }
614 }
615}
616
#[cfg(test)]
mod tests {
    //! Tests that drive `OrchestrationExecutor::execute` with hand-built
    //! histories and inspect the actions in the returned response.

    use super::*;
    use crate::internal::to_timestamp;
    use crate::proto::history_event::EventType;
    use crate::proto::workflow_action::WorkflowActionType;

    use std::sync::Arc;

    /// Wraps `event_type` in a `HistoryEvent` stamped with `timestamp`.
    fn event_at(
        event_id: i32,
        timestamp: chrono::DateTime<chrono::Utc>,
        event_type: EventType,
    ) -> proto::HistoryEvent {
        proto::HistoryEvent {
            event_id,
            timestamp: Some(to_timestamp(timestamp)),
            router: None,
            event_type: Some(event_type),
        }
    }

    /// `WorkflowStarted` event (id 1) with no version info, stamped with `ts`.
    fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
        event_at(
            1,
            ts,
            EventType::WorkflowStarted(proto::WorkflowStartedEvent { version: None }),
        )
    }

    /// `ExecutionStarted` event (id 2) for orchestrator `name` with `input`.
    fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
        event_at(
            2,
            chrono::Utc::now(),
            EventType::ExecutionStarted(proto::ExecutionStartedEvent {
                name: name.to_string(),
                version: None,
                input,
                workflow_instance: None,
                parent_instance: None,
                scheduled_start_timestamp: None,
                parent_trace_context: None,
                workflow_span_id: None,
                tags: Default::default(),
            }),
        )
    }

    /// `TaskScheduled` event for activity `name` with no input.
    fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
        event_at(
            event_id,
            chrono::Utc::now(),
            EventType::TaskScheduled(proto::TaskScheduledEvent {
                name: name.to_string(),
                version: None,
                input: None,
                parent_trace_context: None,
                task_execution_id: String::new(),
                rerun_parent_instance_info: None,
                history_propagation_scope: None,
            }),
        )
    }

    /// `TaskCompleted` event resolving task `task_scheduled_id` with `result`.
    fn make_task_completed(
        event_id: i32,
        task_scheduled_id: i32,
        result: Option<String>,
    ) -> proto::HistoryEvent {
        event_at(
            event_id,
            chrono::Utc::now(),
            EventType::TaskCompleted(proto::TaskCompletedEvent {
                task_scheduled_id,
                result,
                task_execution_id: String::new(),
                attestation: None,
                signer_certificate: None,
            }),
        )
    }

    /// `TaskFailed` event for task `task_scheduled_id` with fixed test details.
    fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
        event_at(
            event_id,
            chrono::Utc::now(),
            EventType::TaskFailed(proto::TaskFailedEvent {
                task_scheduled_id,
                failure_details: Some(proto::TaskFailureDetails {
                    error_type: "TestError".to_string(),
                    error_message: "test failure".to_string(),
                    stack_trace: None,
                    inner_failure: None,
                    is_non_retriable: false,
                }),
                task_execution_id: String::new(),
                attestation: None,
                signer_certificate: None,
            }),
        )
    }

    /// Orchestrator that awaits the `greet` activity and returns its result.
    fn greet_orchestrator() -> OrchestratorFn {
        Arc::new(|ctx| {
            Box::pin(async move {
                let result = ctx.call_activity("greet", "world").await?;
                Ok(result)
            })
        })
    }

    /// Executes `orch_fn` for instance `inst-1` with default worker options,
    /// an empty completion token and no propagated history.
    async fn run(
        orch_fn: &OrchestratorFn,
        old_events: Vec<proto::HistoryEvent>,
        new_events: Vec<proto::HistoryEvent>,
    ) -> proto::WorkflowResponse {
        OrchestrationExecutor::execute(
            orch_fn,
            "inst-1",
            old_events,
            new_events,
            String::new(),
            &WorkerOptions::default(),
            None,
        )
        .await
        .expect("execute should not fail")
    }

    /// Returns the first `CompleteWorkflow` action, panicking if there is none.
    fn expect_completion(resp: &proto::WorkflowResponse) -> &proto::CompleteWorkflowAction {
        resp.actions
            .iter()
            .find_map(|action| match &action.workflow_action_type {
                Some(WorkflowActionType::CompleteWorkflow(cw)) => Some(cw),
                _ => None,
            })
            .expect("response should contain a CompleteWorkflow action")
    }

    #[tokio::test]
    async fn test_simple_orchestrator_completes() {
        let orch_fn: OrchestratorFn =
            Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));

        let old_events = vec![make_workflow_started(chrono::Utc::now())];
        let new_events = vec![make_execution_started("test_orch", None)];

        let resp = run(&orch_fn, old_events, new_events).await;

        assert_eq!(resp.instance_id, "inst-1");
        let cw = expect_completion(&resp);
        assert_eq!(
            cw.workflow_status,
            proto::OrchestrationStatus::Completed as i32
        );
        assert_eq!(cw.result, Some("\"done\"".to_string()));
    }

    #[tokio::test]
    async fn test_orchestrator_with_activity_replay() {
        let orch_fn = greet_orchestrator();

        let old_events = vec![
            make_workflow_started(chrono::Utc::now()),
            make_execution_started("test_orch", None),
            make_task_scheduled(3, "greet"),
            make_task_completed(4, 0, Some("\"hello world\"".to_string())),
        ];

        let resp = run(&orch_fn, old_events, vec![]).await;

        let cw = expect_completion(&resp);
        assert_eq!(
            cw.workflow_status,
            proto::OrchestrationStatus::Completed as i32
        );
        assert_eq!(cw.result, Some("\"hello world\"".to_string()));
    }

    #[tokio::test]
    async fn test_orchestrator_pending_activity() {
        let orch_fn = greet_orchestrator();

        let old_events = vec![make_workflow_started(chrono::Utc::now())];
        let new_events = vec![make_execution_started("test_orch", None)];

        let resp = run(&orch_fn, old_events, new_events).await;

        // The activity has no result in history yet, so the orchestrator must
        // have scheduled it and yielded without completing.
        let has_schedule = resp.actions.iter().any(|a| {
            matches!(
                &a.workflow_action_type,
                Some(WorkflowActionType::ScheduleTask(_))
            )
        });
        assert!(has_schedule);

        let has_complete = resp.actions.iter().any(|a| {
            matches!(
                &a.workflow_action_type,
                Some(WorkflowActionType::CompleteWorkflow(_))
            )
        });
        assert!(!has_complete);
    }

    #[tokio::test]
    async fn test_orchestrator_task_failure() {
        let orch_fn = greet_orchestrator();

        let old_events = vec![
            make_workflow_started(chrono::Utc::now()),
            make_execution_started("test_orch", None),
            make_task_scheduled(3, "greet"),
            make_task_failed(4, 0),
        ];

        let resp = run(&orch_fn, old_events, vec![]).await;

        let cw = expect_completion(&resp);
        assert_eq!(
            cw.workflow_status,
            proto::OrchestrationStatus::Failed as i32
        );
        assert!(cw.failure_details.is_some());
    }

    #[tokio::test]
    async fn test_suspended_orchestration_not_run() {
        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));

        let old_events = vec![make_workflow_started(chrono::Utc::now())];
        let new_events = vec![
            make_execution_started("test_orch", None),
            event_at(
                3,
                chrono::Utc::now(),
                EventType::ExecutionSuspended(proto::ExecutionSuspendedEvent {
                    input: Some("paused".to_string()),
                }),
            ),
        ];

        let resp = run(&orch_fn, old_events, new_events).await;

        assert!(resp.actions.is_empty());
    }

    #[tokio::test]
    async fn test_terminated_orchestration_not_run() {
        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));

        let old_events = vec![make_workflow_started(chrono::Utc::now())];
        let new_events = vec![
            make_execution_started("test_orch", None),
            event_at(
                3,
                chrono::Utc::now(),
                EventType::ExecutionTerminated(proto::ExecutionTerminatedEvent {
                    input: None,
                    recurse: false,
                }),
            ),
        ];

        let resp = run(&orch_fn, old_events, new_events).await;

        // Termination yields exactly one action: the completion itself.
        assert_eq!(resp.actions.len(), 1);
        match &resp.actions[0].workflow_action_type {
            Some(WorkflowActionType::CompleteWorkflow(cw)) => {
                assert_eq!(
                    cw.workflow_status,
                    proto::OrchestrationStatus::Terminated as i32
                );
                assert!(cw.result.is_none());
            }
            other => panic!("expected CompleteWorkflow, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_continue_as_new() {
        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
            Box::pin(async move {
                ctx.continue_as_new("new_input", false);
                Ok(None)
            })
        });

        let old_events = vec![make_workflow_started(chrono::Utc::now())];
        let new_events = vec![make_execution_started("test_orch", None)];

        let resp = run(&orch_fn, old_events, new_events).await;

        let cw = expect_completion(&resp);
        assert_eq!(
            cw.workflow_status,
            proto::OrchestrationStatus::ContinuedAsNew as i32
        );
        assert_eq!(cw.result, Some("\"new_input\"".to_string()));
    }

    #[tokio::test]
    async fn test_external_event_delivery() {
        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
            Box::pin(async move {
                let result = ctx.wait_for_external_event("approval").await?;
                Ok(result)
            })
        });

        let old_events = vec![make_workflow_started(chrono::Utc::now())];
        let new_events = vec![
            make_execution_started("test_orch", None),
            event_at(
                3,
                chrono::Utc::now(),
                EventType::EventRaised(proto::EventRaisedEvent {
                    name: "approval".to_string(),
                    input: Some("\"yes\"".to_string()),
                }),
            ),
        ];

        let resp = run(&orch_fn, old_events, new_events).await;

        let cw = expect_completion(&resp);
        assert_eq!(
            cw.workflow_status,
            proto::OrchestrationStatus::Completed as i32
        );
        assert_eq!(cw.result, Some("\"yes\"".to_string()));
    }
}