1use std::sync::atomic::Ordering;
2use std::task::Poll;
3
4use futures::FutureExt;
5
6use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
7use crate::internal::from_timestamp;
8use crate::proto;
9use crate::proto::history_event::EventType;
10use crate::task::OrchestrationContext;
11use crate::task::orchestration_context::{OrchestrationContextInner, lock_inner};
12
13use super::options::WorkerOptions;
14use super::registry::OrchestratorFn;
15
16pub struct OrchestrationExecutor;
24
25impl OrchestrationExecutor {
26 pub async fn execute(
30 orchestrator_fn: &OrchestratorFn,
31 instance_id: &str,
32 old_events: Vec<proto::HistoryEvent>,
33 new_events: Vec<proto::HistoryEvent>,
34 completion_token: String,
35 options: &WorkerOptions,
36 propagated_history: Option<crate::api::PropagatedHistory>,
37 ) -> crate::api::Result<proto::WorkflowResponse> {
38 tracing::info!(
39 instance_id = %instance_id,
40 past_events = old_events.len(),
41 new_events = new_events.len(),
42 "Starting orchestration execution"
43 );
44
45 let ctx = OrchestrationContext::new(
47 instance_id.to_string(),
48 String::new(),
49 None,
50 chrono::Utc::now(),
51 true,
52 options,
53 old_events.len() + new_events.len(),
54 );
55
56 if propagated_history.is_some() {
59 let mut inner = lock_inner(&ctx.inner);
60 inner.propagated_history = propagated_history.map(std::sync::Arc::new);
61 }
62
63 let initially_replaying = !old_events.is_empty();
71 {
72 let mut inner = lock_inner(&ctx.inner);
73 inner.is_replaying.store(true, Ordering::Release);
74 tracing::debug!(
75 instance_id = %instance_id,
76 count = old_events.len(),
77 "Replaying old events"
78 );
79 for event in &old_events {
80 Self::process_event(
81 &mut inner,
82 event,
83 instance_id,
84 options.max_identifier_length,
85 );
86 }
87
88 inner.is_replaying.store(false, Ordering::Release);
89 tracing::debug!(
90 instance_id = %instance_id,
91 count = new_events.len(),
92 "Processing new events"
93 );
94 for event in &new_events {
95 Self::process_event(
96 &mut inner,
97 event,
98 instance_id,
99 options.max_identifier_length,
100 );
101 }
102
103 inner
106 .is_replaying
107 .store(initially_replaying, Ordering::Release);
108 }
109
110 #[cfg(feature = "opentelemetry")]
112 let otel_ctx = {
113 let inner = lock_inner(&ctx.inner);
114 let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
115 let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
116 crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
117 };
118
119 let should_run = {
120 let inner = lock_inner(&ctx.inner);
121 !inner.is_suspended && !inner.is_complete
122 };
123
124 if should_run {
125 tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
126
127 let mut future = (orchestrator_fn)(ctx.clone()).boxed();
130 let poll_result = futures::poll!(future.as_mut());
131
132 match poll_result {
133 Poll::Ready(Ok(output)) => {
134 let mut inner = lock_inner(&ctx.inner);
135 if inner.continue_as_new_input.is_some() {
136 tracing::info!(
137 instance_id = %instance_id,
138 orchestrator = %inner.name,
139 "Orchestration continuing as new"
140 );
141 #[cfg(feature = "opentelemetry")]
142 crate::internal::otel::set_span_status_attribute(
143 &otel_ctx,
144 "CONTINUED_AS_NEW",
145 );
146 inner.is_complete = true;
147 inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
148 } else if !inner.is_complete {
149 tracing::info!(
150 instance_id = %instance_id,
151 orchestrator = %inner.name,
152 "Orchestration completed successfully"
153 );
154 #[cfg(feature = "opentelemetry")]
155 crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
156 inner.is_complete = true;
157 inner.completion_status = Some(OrchestrationStatus::Completed);
158 inner.completion_result = output;
159 }
160 }
161 Poll::Ready(Err(DurableTaskError::TaskFailed {
162 message,
163 failure_details,
164 })) => {
165 let mut inner = lock_inner(&ctx.inner);
166 tracing::warn!(
167 instance_id = %instance_id,
168 orchestrator = %inner.name,
169 error = %message,
170 "Orchestration failed due to task failure"
171 );
172 #[cfg(feature = "opentelemetry")]
173 {
174 crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
175 crate::internal::otel::set_span_error(&otel_ctx, &message);
176 }
177 inner.is_complete = true;
178 inner.completion_status = Some(OrchestrationStatus::Failed);
179 inner.completion_failure =
180 Some(failure_details.unwrap_or_else(|| FailureDetails {
181 message: message.clone(),
182 error_type: "TaskFailed".to_string(),
183 stack_trace: None,
184 }));
185 }
186 Poll::Ready(Err(e)) => {
187 let mut inner = lock_inner(&ctx.inner);
188 tracing::error!(
189 instance_id = %instance_id,
190 orchestrator = %inner.name,
191 error = %e,
192 "Orchestration failed with error"
193 );
194 #[cfg(feature = "opentelemetry")]
195 {
196 crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
197 crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
198 }
199 inner.is_complete = true;
200 inner.completion_status = Some(OrchestrationStatus::Failed);
201 inner.completion_failure = Some(FailureDetails {
202 message: e.to_string(),
203 error_type: "OrchestratorError".to_string(),
204 stack_trace: None,
205 });
206 }
207 Poll::Pending => {
208 let inner = lock_inner(&ctx.inner);
209 tracing::debug!(
210 instance_id = %instance_id,
211 orchestrator = %inner.name,
212 pending_actions = inner.pending_actions.len(),
213 "Orchestrator yielded, waiting for tasks"
214 );
215 }
216 }
217 } else {
218 let inner = lock_inner(&ctx.inner);
219 tracing::debug!(
220 instance_id = %instance_id,
221 is_suspended = inner.is_suspended,
222 is_complete = inner.is_complete,
223 "Skipping orchestrator execution"
224 );
225 #[cfg(feature = "opentelemetry")]
226 if inner.is_complete {
227 crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
228 }
229 }
230
231 #[cfg(feature = "opentelemetry")]
233 crate::internal::otel::end_span(&otel_ctx);
234
235 let response = Self::build_response(&ctx, instance_id, completion_token);
236 tracing::debug!(
237 instance_id = %instance_id,
238 actions = response.actions.len(),
239 "Built orchestration response"
240 );
241 Ok(response)
242 }
243
244 #[cfg(feature = "opentelemetry")]
246 fn find_parent_trace_context<'a>(
247 old_events: &'a [proto::HistoryEvent],
248 new_events: &'a [proto::HistoryEvent],
249 ) -> Option<&'a proto::TraceContext> {
250 old_events.iter().chain(new_events.iter()).find_map(|e| {
251 if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
252 es.parent_trace_context.as_ref()
253 } else {
254 None
255 }
256 })
257 }
258
259 fn process_event(
264 inner: &mut OrchestrationContextInner,
265 event: &proto::HistoryEvent,
266 instance_id: &str,
267 max_identifier_length: usize,
268 ) {
269 let event_type = match &event.event_type {
270 Some(et) => et,
271 None => return,
272 };
273
274 let during_replay = inner.is_replaying.load(Ordering::Acquire);
275 let replay_handle = inner.is_replaying.clone();
276 let pending_task = |inner: &mut OrchestrationContextInner, seq: i32| {
279 let task = inner.pending_tasks.entry(seq).or_default();
280 task.set_replay_handle(replay_handle.clone());
281 task.clone()
282 };
283
284 match event_type {
285 EventType::WorkflowStarted(ws) => {
286 if let Some(ts) = &event.timestamp
287 && let Some(dt) = from_timestamp(ts)
288 {
289 inner.current_utc_datetime = dt;
290 }
291 if let Some(version) = &ws.version {
292 for patch in &version.patches {
293 inner.history_patches.insert(patch.clone());
294 }
295 }
296 }
297 EventType::ExecutionStarted(e) => {
298 tracing::debug!(
299 instance_id = %instance_id,
300 orchestrator = %e.name,
301 "Execution started event"
302 );
303 inner.name = std::sync::Arc::<str>::from(e.name.clone());
304 inner.input = e.input.clone();
305 }
306 EventType::TaskCompleted(e) => {
307 let seq = e.task_scheduled_id;
308 tracing::debug!(
309 instance_id = %instance_id,
310 task_id = seq,
311 "Task completed"
312 );
313 let task = pending_task(inner, seq);
314 if task.is_complete() {
315 tracing::debug!(
316 instance_id = %instance_id,
317 task_id = seq,
318 "Skipping duplicate task completion"
319 );
320 return;
321 }
322 task.complete_with_phase(e.result.clone(), during_replay);
323 }
324 EventType::TaskFailed(e) => {
325 let seq = e.task_scheduled_id;
326 let details = e
327 .failure_details
328 .as_ref()
329 .map(FailureDetails::from)
330 .unwrap_or_else(|| FailureDetails {
331 message: "Task failed".to_string(),
332 error_type: "Unknown".to_string(),
333 stack_trace: None,
334 });
335 tracing::debug!(
336 instance_id = %instance_id,
337 task_id = seq,
338 error = %details.message,
339 "Task failed"
340 );
341 let task = pending_task(inner, seq);
342 if task.is_complete() {
343 tracing::debug!(
344 instance_id = %instance_id,
345 task_id = seq,
346 "Skipping duplicate task completion"
347 );
348 return;
349 }
350 task.fail_with_phase(details, during_replay);
351 }
352 EventType::TaskScheduled(_)
353 | EventType::TimerCreated(_)
354 | EventType::ChildWorkflowInstanceCreated(_) => {
355 inner.history_scheduled_count += 1;
356 }
357 EventType::TimerFired(e) => {
358 let seq = e.timer_id;
359 tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
360 let task = pending_task(inner, seq);
361 if task.is_complete() {
362 tracing::debug!(
363 instance_id = %instance_id,
364 task_id = seq,
365 "Skipping duplicate task completion"
366 );
367 return;
368 }
369 task.complete_with_phase(None, during_replay);
370 }
371 EventType::ChildWorkflowInstanceCompleted(e) => {
372 let seq = e.task_scheduled_id;
373 tracing::debug!(
374 instance_id = %instance_id,
375 task_id = seq,
376 "Child workflow completed"
377 );
378 let task = pending_task(inner, seq);
379 if task.is_complete() {
380 tracing::debug!(
381 instance_id = %instance_id,
382 task_id = seq,
383 "Skipping duplicate task completion"
384 );
385 return;
386 }
387 task.complete_with_phase(e.result.clone(), during_replay);
388 }
389 EventType::ChildWorkflowInstanceFailed(e) => {
390 let seq = e.task_scheduled_id;
391 let details = e
392 .failure_details
393 .as_ref()
394 .map(FailureDetails::from)
395 .unwrap_or_else(|| FailureDetails {
396 message: "Sub-orchestration failed".to_string(),
397 error_type: "Unknown".to_string(),
398 stack_trace: None,
399 });
400 tracing::debug!(
401 instance_id = %instance_id,
402 task_id = seq,
403 error = %details.message,
404 "Child workflow failed"
405 );
406 let task = pending_task(inner, seq);
407 if task.is_complete() {
408 tracing::debug!(
409 instance_id = %instance_id,
410 task_id = seq,
411 "Skipping duplicate task completion"
412 );
413 return;
414 }
415 task.fail_with_phase(details, during_replay);
416 }
417 EventType::EventRaised(e) => {
418 if let Err(err) = crate::internal::validate_identifier(
419 &e.name,
420 "event name",
421 max_identifier_length,
422 ) {
423 tracing::warn!(
424 instance_id = %instance_id,
425 event_name = %e.name,
426 error = %err,
427 "Rejected event: invalid event name"
428 );
429 return;
430 }
431 let event_name = e.name.to_lowercase();
432 tracing::debug!(
433 instance_id = %instance_id,
434 event_name = %e.name,
435 "External event raised"
436 );
437
438 if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name)
439 && !tasks.is_empty()
440 {
441 let task = tasks
442 .pop_front()
443 .expect("pending event task queue is not empty");
444 if task.is_complete() {
445 tracing::debug!(
446 instance_id = %instance_id,
447 event_name = %e.name,
448 "Skipping duplicate task completion"
449 );
450 return;
451 }
452 task.complete_with_phase(e.input.clone(), during_replay);
453 return;
454 }
455
456 if inner.buffered_events.len() >= inner.config.max_event_names
457 && !inner.buffered_events.contains_key(&event_name)
458 {
459 tracing::warn!(
460 instance_id = %instance_id,
461 event_name = %e.name,
462 "Event name limit reached, discarding event"
463 );
464 return;
465 }
466
467 let max_events = inner.config.max_events_per_name;
468 let events = inner.buffered_events.entry(event_name).or_default();
469 if events.len() >= max_events {
470 tracing::warn!(
471 instance_id = %instance_id,
472 event_name = %e.name,
473 "Event buffer limit reached, discarding event"
474 );
475 return;
476 }
477 events.push_back((e.input.clone(), during_replay));
478 }
479 EventType::ExecutionSuspended(_) => {
480 tracing::info!(instance_id = %instance_id, "Orchestration suspended");
481 inner.is_suspended = true;
482 }
483 EventType::ExecutionResumed(_) => {
484 tracing::info!(instance_id = %instance_id, "Orchestration resumed");
485 inner.is_suspended = false;
486 }
487 EventType::ExecutionTerminated(e) => {
488 tracing::info!(instance_id = %instance_id, "Orchestration terminated");
489 inner.is_complete = true;
490 inner.completion_status = Some(OrchestrationStatus::Terminated);
491 inner.completion_result = e.input.clone();
492 inner.pending_actions.clear();
493 }
494 EventType::ExecutionCompleted(_)
495 | EventType::WorkflowCompleted(_)
496 | EventType::EventSent(_)
497 | EventType::ContinueAsNew(_)
498 | EventType::ExecutionStalled(_)
499 | EventType::DetachedWorkflowInstanceCreated(_) => {}
500 }
501 }
502
503 fn make_complete_action(
504 id: i32,
505 status: proto::OrchestrationStatus,
506 result: Option<String>,
507 carryover_events: Vec<proto::HistoryEvent>,
508 failure: Option<FailureDetails>,
509 ) -> proto::WorkflowAction {
510 proto::WorkflowAction {
511 id,
512 router: None,
513 workflow_action_type: Some(
514 proto::workflow_action::WorkflowActionType::CompleteWorkflow(
515 proto::CompleteWorkflowAction {
516 workflow_status: status as i32,
517 result,
518 details: None,
519 new_version: None,
520 carryover_events,
521 failure_details: failure.map(|f| proto::TaskFailureDetails {
522 error_type: f.error_type,
523 error_message: f.message,
524 stack_trace: f.stack_trace,
525 inner_failure: None,
526 is_non_retriable: false,
527 }),
528 },
529 ),
530 ),
531 }
532 }
533
534 fn build_response(
535 ctx: &OrchestrationContext,
536 instance_id: &str,
537 completion_token: String,
538 ) -> proto::WorkflowResponse {
539 let mut inner = lock_inner(&ctx.inner);
540
541 let mut actions = std::mem::take(&mut inner.pending_actions);
544
545 if let Some(new_input) = inner.continue_as_new_input.take() {
546 let mut carryover_events = Vec::new();
547 if inner.save_events_on_continue {
548 for (name, events) in &inner.buffered_events {
549 for (input, _during_replay) in events {
550 carryover_events.push(proto::HistoryEvent {
551 event_id: -1,
552 timestamp: None,
553 router: None,
554 event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
555 name: name.clone(),
556 input: input.clone(),
557 })),
558 });
559 }
560 }
561 }
562
563 actions.push(Self::make_complete_action(
564 actions.len() as i32,
565 proto::OrchestrationStatus::ContinuedAsNew,
566 Some(new_input),
567 carryover_events,
568 None,
569 ));
570 } else if let Some(status) = inner.completion_status {
571 match status {
572 OrchestrationStatus::Completed => {
573 actions.push(Self::make_complete_action(
574 actions.len() as i32,
575 proto::OrchestrationStatus::Completed,
576 inner.completion_result.take(),
577 Vec::new(),
578 None,
579 ));
580 }
581 OrchestrationStatus::Failed => {
582 let failure = inner.completion_failure.take();
583 actions.push(Self::make_complete_action(
584 actions.len() as i32,
585 proto::OrchestrationStatus::Failed,
586 None,
587 Vec::new(),
588 failure,
589 ));
590 }
591 OrchestrationStatus::Terminated => {
592 actions.push(Self::make_complete_action(
593 actions.len() as i32,
594 proto::OrchestrationStatus::Terminated,
595 inner.completion_result.take(),
596 Vec::new(),
597 None,
598 ));
599 }
600 _ => {
601 }
603 }
604 }
605
606 let version = {
609 let mut applied: Vec<String> = inner
610 .applied_patches
611 .iter()
612 .filter(|(_, v)| **v)
613 .map(|(k, _)| k.clone())
614 .collect();
615 if applied.is_empty() {
616 None
617 } else {
618 applied.sort();
619 Some(proto::WorkflowVersion {
620 patches: applied,
621 name: None,
622 })
623 }
624 };
625
626 proto::WorkflowResponse {
627 instance_id: instance_id.to_string(),
628 actions,
629 custom_status: inner.custom_status.take(),
630 completion_token,
631 num_events_processed: None,
632 version,
633 }
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use super::*;
640 use crate::internal::to_timestamp;
641 use crate::proto::history_event::EventType;
642
643 use std::sync::Arc;
644
645 fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
646 proto::HistoryEvent {
647 event_id: 1,
648 timestamp: Some(to_timestamp(ts)),
649 router: None,
650 event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
651 version: None,
652 })),
653 }
654 }
655
656 fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
657 proto::HistoryEvent {
658 event_id: 2,
659 timestamp: Some(to_timestamp(chrono::Utc::now())),
660 router: None,
661 event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
662 name: name.to_string(),
663 version: None,
664 input,
665 workflow_instance: None,
666 parent_instance: None,
667 scheduled_start_timestamp: None,
668 parent_trace_context: None,
669 workflow_span_id: None,
670 tags: Default::default(),
671 })),
672 }
673 }
674
675 fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
676 proto::HistoryEvent {
677 event_id,
678 timestamp: Some(to_timestamp(chrono::Utc::now())),
679 router: None,
680 event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
681 name: name.to_string(),
682 version: None,
683 input: None,
684 parent_trace_context: None,
685 task_execution_id: String::new(),
686 rerun_parent_instance_info: None,
687 history_propagation_scope: None,
688 })),
689 }
690 }
691
692 fn make_task_completed(
693 event_id: i32,
694 task_scheduled_id: i32,
695 result: Option<String>,
696 ) -> proto::HistoryEvent {
697 proto::HistoryEvent {
698 event_id,
699 timestamp: Some(to_timestamp(chrono::Utc::now())),
700 router: None,
701 event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
702 task_scheduled_id,
703 result,
704 task_execution_id: String::new(),
705 attestation: None,
706 signer_certificate: None,
707 })),
708 }
709 }
710
711 fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
712 proto::HistoryEvent {
713 event_id,
714 timestamp: Some(to_timestamp(chrono::Utc::now())),
715 router: None,
716 event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
717 task_scheduled_id,
718 failure_details: Some(proto::TaskFailureDetails {
719 error_type: "TestError".to_string(),
720 error_message: "test failure".to_string(),
721 stack_trace: None,
722 inner_failure: None,
723 is_non_retriable: false,
724 }),
725 task_execution_id: String::new(),
726 attestation: None,
727 signer_certificate: None,
728 })),
729 }
730 }
731
732 #[tokio::test]
733 async fn test_simple_orchestrator_completes() {
734 let orch_fn: OrchestratorFn =
735 Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
736
737 let ts = chrono::Utc::now();
738 let old_events = vec![make_workflow_started(ts)];
739 let new_events = vec![make_execution_started("test_orch", None)];
740
741 let resp = OrchestrationExecutor::execute(
742 &orch_fn,
743 "inst-1",
744 old_events,
745 new_events,
746 String::new(),
747 &WorkerOptions::default(),
748 None,
749 )
750 .await
751 .unwrap();
752
753 assert_eq!(resp.instance_id, "inst-1");
754 let complete_action = resp.actions.iter().find(|a| {
755 matches!(
756 &a.workflow_action_type,
757 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
758 )
759 });
760 assert!(complete_action.is_some());
761 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
762 &complete_action.unwrap().workflow_action_type
763 {
764 assert_eq!(
765 cw.workflow_status,
766 proto::OrchestrationStatus::Completed as i32
767 );
768 assert_eq!(cw.result, Some("\"done\"".to_string()));
769 }
770 }
771
772 #[tokio::test]
773 async fn test_orchestrator_with_activity_replay() {
774 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
775 Box::pin(async move {
776 let result = ctx.call_activity("greet", "world").await?;
777 Ok(result)
778 })
779 });
780
781 let ts = chrono::Utc::now();
782 let old_events = vec![
783 make_workflow_started(ts),
784 make_execution_started("test_orch", None),
785 make_task_scheduled(3, "greet"),
786 make_task_completed(4, 0, Some("\"hello world\"".to_string())),
787 ];
788 let new_events = vec![];
789
790 let resp = OrchestrationExecutor::execute(
791 &orch_fn,
792 "inst-1",
793 old_events,
794 new_events,
795 String::new(),
796 &WorkerOptions::default(),
797 None,
798 )
799 .await
800 .unwrap();
801
802 let complete_action = resp.actions.iter().find(|a| {
803 matches!(
804 &a.workflow_action_type,
805 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
806 )
807 });
808 assert!(complete_action.is_some());
809 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
810 &complete_action.unwrap().workflow_action_type
811 {
812 assert_eq!(
813 cw.workflow_status,
814 proto::OrchestrationStatus::Completed as i32
815 );
816 assert_eq!(cw.result, Some("\"hello world\"".to_string()));
817 }
818 }
819
820 #[tokio::test]
821 async fn test_orchestrator_pending_activity() {
822 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
823 Box::pin(async move {
824 let result = ctx.call_activity("greet", "world").await?;
825 Ok(result)
826 })
827 });
828
829 let ts = chrono::Utc::now();
830 let old_events = vec![make_workflow_started(ts)];
831 let new_events = vec![make_execution_started("test_orch", None)];
832
833 let resp = OrchestrationExecutor::execute(
834 &orch_fn,
835 "inst-1",
836 old_events,
837 new_events,
838 String::new(),
839 &WorkerOptions::default(),
840 None,
841 )
842 .await
843 .unwrap();
844
845 let has_schedule = resp.actions.iter().any(|a| {
846 matches!(
847 &a.workflow_action_type,
848 Some(proto::workflow_action::WorkflowActionType::ScheduleTask(_))
849 )
850 });
851 assert!(has_schedule);
852
853 let has_complete = resp.actions.iter().any(|a| {
854 matches!(
855 &a.workflow_action_type,
856 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
857 )
858 });
859 assert!(!has_complete);
860 }
861
862 #[tokio::test]
863 async fn test_orchestrator_task_failure() {
864 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
865 Box::pin(async move {
866 let result = ctx.call_activity("greet", "world").await?;
867 Ok(result)
868 })
869 });
870
871 let ts = chrono::Utc::now();
872 let old_events = vec![
873 make_workflow_started(ts),
874 make_execution_started("test_orch", None),
875 make_task_scheduled(3, "greet"),
876 make_task_failed(4, 0),
877 ];
878 let new_events = vec![];
879
880 let resp = OrchestrationExecutor::execute(
881 &orch_fn,
882 "inst-1",
883 old_events,
884 new_events,
885 String::new(),
886 &WorkerOptions::default(),
887 None,
888 )
889 .await
890 .unwrap();
891
892 let complete_action = resp.actions.iter().find(|a| {
893 matches!(
894 &a.workflow_action_type,
895 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
896 )
897 });
898 assert!(complete_action.is_some());
899 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
900 &complete_action.unwrap().workflow_action_type
901 {
902 assert_eq!(
903 cw.workflow_status,
904 proto::OrchestrationStatus::Failed as i32
905 );
906 assert!(cw.failure_details.is_some());
907 }
908 }
909
910 #[tokio::test]
911 async fn test_suspended_orchestration_not_run() {
912 let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
913
914 let ts = chrono::Utc::now();
915 let old_events = vec![make_workflow_started(ts)];
916 let new_events = vec![
917 make_execution_started("test_orch", None),
918 proto::HistoryEvent {
919 event_id: 3,
920 timestamp: Some(to_timestamp(chrono::Utc::now())),
921 router: None,
922 event_type: Some(EventType::ExecutionSuspended(
923 proto::ExecutionSuspendedEvent {
924 input: Some("paused".to_string()),
925 },
926 )),
927 },
928 ];
929
930 let resp = OrchestrationExecutor::execute(
931 &orch_fn,
932 "inst-1",
933 old_events,
934 new_events,
935 String::new(),
936 &WorkerOptions::default(),
937 None,
938 )
939 .await
940 .unwrap();
941
942 assert!(resp.actions.is_empty());
943 }
944
945 #[tokio::test]
946 async fn test_terminated_orchestration_not_run() {
947 let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
948
949 let ts = chrono::Utc::now();
950 let old_events = vec![make_workflow_started(ts)];
951 let new_events = vec![
952 make_execution_started("test_orch", None),
953 proto::HistoryEvent {
954 event_id: 3,
955 timestamp: Some(to_timestamp(chrono::Utc::now())),
956 router: None,
957 event_type: Some(EventType::ExecutionTerminated(
958 proto::ExecutionTerminatedEvent {
959 input: None,
960 recurse: false,
961 },
962 )),
963 },
964 ];
965
966 let resp = OrchestrationExecutor::execute(
967 &orch_fn,
968 "inst-1",
969 old_events,
970 new_events,
971 String::new(),
972 &WorkerOptions::default(),
973 None,
974 )
975 .await
976 .unwrap();
977
978 assert_eq!(resp.actions.len(), 1);
980 match &resp.actions[0].workflow_action_type {
981 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
982 assert_eq!(
983 cw.workflow_status,
984 proto::OrchestrationStatus::Terminated as i32
985 );
986 assert!(cw.result.is_none());
987 }
988 other => panic!("expected CompleteWorkflow, got {other:?}"),
989 }
990 }
991
992 #[tokio::test]
993 async fn test_continue_as_new() {
994 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
995 Box::pin(async move {
996 ctx.continue_as_new("new_input", false);
997 Ok(None)
998 })
999 });
1000
1001 let ts = chrono::Utc::now();
1002 let old_events = vec![make_workflow_started(ts)];
1003 let new_events = vec![make_execution_started("test_orch", None)];
1004
1005 let resp = OrchestrationExecutor::execute(
1006 &orch_fn,
1007 "inst-1",
1008 old_events,
1009 new_events,
1010 String::new(),
1011 &WorkerOptions::default(),
1012 None,
1013 )
1014 .await
1015 .unwrap();
1016
1017 let complete_action = resp.actions.iter().find(|a| {
1018 matches!(
1019 &a.workflow_action_type,
1020 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1021 )
1022 });
1023 assert!(complete_action.is_some());
1024 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1025 &complete_action.unwrap().workflow_action_type
1026 {
1027 assert_eq!(
1028 cw.workflow_status,
1029 proto::OrchestrationStatus::ContinuedAsNew as i32
1030 );
1031 assert_eq!(cw.result, Some("\"new_input\"".to_string()));
1032 }
1033 }
1034
1035 #[tokio::test]
1036 async fn test_external_event_delivery() {
1037 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
1038 Box::pin(async move {
1039 let result = ctx.wait_for_external_event("approval").await?;
1040 Ok(result)
1041 })
1042 });
1043
1044 let ts = chrono::Utc::now();
1045 let old_events = vec![make_workflow_started(ts)];
1046 let new_events = vec![
1047 make_execution_started("test_orch", None),
1048 proto::HistoryEvent {
1049 event_id: 3,
1050 timestamp: Some(to_timestamp(chrono::Utc::now())),
1051 router: None,
1052 event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
1053 name: "approval".to_string(),
1054 input: Some("\"yes\"".to_string()),
1055 })),
1056 },
1057 ];
1058
1059 let resp = OrchestrationExecutor::execute(
1060 &orch_fn,
1061 "inst-1",
1062 old_events,
1063 new_events,
1064 String::new(),
1065 &WorkerOptions::default(),
1066 None,
1067 )
1068 .await
1069 .unwrap();
1070
1071 let complete_action = resp.actions.iter().find(|a| {
1072 matches!(
1073 &a.workflow_action_type,
1074 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1075 )
1076 });
1077 assert!(complete_action.is_some());
1078 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1079 &complete_action.unwrap().workflow_action_type
1080 {
1081 assert_eq!(
1082 cw.workflow_status,
1083 proto::OrchestrationStatus::Completed as i32
1084 );
1085 assert_eq!(cw.result, Some("\"yes\"".to_string()));
1086 }
1087 }
1088}