1use std::task::Poll;
2
3use futures::FutureExt;
4
5use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
6use crate::internal::from_timestamp;
7use crate::proto;
8use crate::proto::history_event::EventType;
9use crate::task::OrchestrationContext;
10use crate::task::orchestration_context::OrchestrationContextInner;
11
12use super::options::WorkerOptions;
13use super::registry::OrchestratorFn;
14
15pub struct OrchestrationExecutor;
23
24impl OrchestrationExecutor {
25 pub async fn execute(
29 orchestrator_fn: &OrchestratorFn,
30 instance_id: &str,
31 old_events: Vec<proto::HistoryEvent>,
32 new_events: Vec<proto::HistoryEvent>,
33 completion_token: String,
34 options: &WorkerOptions,
35 propagated_history: Option<crate::api::PropagatedHistory>,
36 ) -> crate::api::Result<proto::WorkflowResponse> {
37 tracing::info!(
38 instance_id = %instance_id,
39 past_events = old_events.len(),
40 new_events = new_events.len(),
41 "Starting orchestration execution"
42 );
43
44 let ctx = OrchestrationContext::new(
45 instance_id.to_string(),
46 String::new(),
47 None,
48 chrono::Utc::now(),
49 true,
50 options,
51 old_events.len() + new_events.len(),
52 );
53
54 if propagated_history.is_some() {
57 let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
58 inner.propagated_history = propagated_history;
59 }
60
61 {
65 let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
66 inner.is_replaying = true;
67 tracing::debug!(
68 instance_id = %instance_id,
69 count = old_events.len(),
70 "Replaying old events"
71 );
72 for event in &old_events {
73 Self::process_event(
74 &mut inner,
75 event,
76 instance_id,
77 options.max_identifier_length,
78 );
79 }
80
81 inner.is_replaying = false;
82 tracing::debug!(
83 instance_id = %instance_id,
84 count = new_events.len(),
85 "Processing new events"
86 );
87 for event in &new_events {
88 Self::process_event(
89 &mut inner,
90 event,
91 instance_id,
92 options.max_identifier_length,
93 );
94 }
95 }
96
97 #[cfg(feature = "opentelemetry")]
99 let otel_ctx = {
100 let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
101 let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
102 let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
103 crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
104 };
105
106 let should_run = {
107 let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
108 !inner.is_suspended && !inner.is_complete
109 };
110
111 if should_run {
112 tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
113
114 let mut future = (orchestrator_fn)(ctx.clone()).boxed();
117 let poll_result = futures::poll!(future.as_mut());
118
119 match poll_result {
120 Poll::Ready(Ok(output)) => {
121 let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
122 if inner.continue_as_new_input.is_some() {
123 tracing::info!(
124 instance_id = %instance_id,
125 orchestrator = %inner.name,
126 "Orchestration continuing as new"
127 );
128 #[cfg(feature = "opentelemetry")]
129 crate::internal::otel::set_span_status_attribute(
130 &otel_ctx,
131 "CONTINUED_AS_NEW",
132 );
133 inner.is_complete = true;
134 inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
135 } else if !inner.is_complete {
136 tracing::info!(
137 instance_id = %instance_id,
138 orchestrator = %inner.name,
139 "Orchestration completed successfully"
140 );
141 #[cfg(feature = "opentelemetry")]
142 crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
143 inner.is_complete = true;
144 inner.completion_status = Some(OrchestrationStatus::Completed);
145 inner.completion_result = output;
146 }
147 }
148 Poll::Ready(Err(DurableTaskError::TaskFailed {
149 message,
150 failure_details,
151 })) => {
152 let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
153 tracing::warn!(
154 instance_id = %instance_id,
155 orchestrator = %inner.name,
156 error = %message,
157 "Orchestration failed due to task failure"
158 );
159 #[cfg(feature = "opentelemetry")]
160 {
161 crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
162 crate::internal::otel::set_span_error(&otel_ctx, &message);
163 }
164 inner.is_complete = true;
165 inner.completion_status = Some(OrchestrationStatus::Failed);
166 inner.completion_failure =
167 Some(failure_details.unwrap_or_else(|| FailureDetails {
168 message: message.clone(),
169 error_type: "TaskFailed".to_string(),
170 stack_trace: None,
171 }));
172 }
173 Poll::Ready(Err(e)) => {
174 let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
175 tracing::error!(
176 instance_id = %instance_id,
177 orchestrator = %inner.name,
178 error = %e,
179 "Orchestration failed with error"
180 );
181 #[cfg(feature = "opentelemetry")]
182 {
183 crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
184 crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
185 }
186 inner.is_complete = true;
187 inner.completion_status = Some(OrchestrationStatus::Failed);
188 inner.completion_failure = Some(FailureDetails {
189 message: e.to_string(),
190 error_type: "OrchestratorError".to_string(),
191 stack_trace: None,
192 });
193 }
194 Poll::Pending => {
195 let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
196 tracing::debug!(
197 instance_id = %instance_id,
198 orchestrator = %inner.name,
199 pending_actions = inner.pending_actions.len(),
200 "Orchestrator yielded, waiting for tasks"
201 );
202 }
203 }
204 } else {
205 let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
206 tracing::debug!(
207 instance_id = %instance_id,
208 is_suspended = inner.is_suspended,
209 is_complete = inner.is_complete,
210 "Skipping orchestrator execution"
211 );
212 #[cfg(feature = "opentelemetry")]
213 if inner.is_complete {
214 crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
215 }
216 }
217
218 #[cfg(feature = "opentelemetry")]
220 crate::internal::otel::end_span(&otel_ctx);
221
222 let response = Self::build_response(&ctx, instance_id, completion_token);
223 tracing::debug!(
224 instance_id = %instance_id,
225 actions = response.actions.len(),
226 "Built orchestration response"
227 );
228 Ok(response)
229 }
230
231 #[cfg(feature = "opentelemetry")]
233 fn find_parent_trace_context<'a>(
234 old_events: &'a [proto::HistoryEvent],
235 new_events: &'a [proto::HistoryEvent],
236 ) -> Option<&'a proto::TraceContext> {
237 old_events.iter().chain(new_events.iter()).find_map(|e| {
238 if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
239 es.parent_trace_context.as_ref()
240 } else {
241 None
242 }
243 })
244 }
245
246 fn process_event(
251 inner: &mut OrchestrationContextInner,
252 event: &proto::HistoryEvent,
253 instance_id: &str,
254 max_identifier_length: usize,
255 ) {
256 let event_type = match &event.event_type {
257 Some(et) => et,
258 None => return,
259 };
260
261 match event_type {
262 EventType::WorkflowStarted(ws) => {
263 if let Some(ts) = &event.timestamp {
264 if let Some(dt) = from_timestamp(ts) {
265 inner.current_utc_datetime = dt;
266 }
267 }
268 if let Some(version) = &ws.version {
269 for patch in &version.patches {
270 inner.history_patches.insert(patch.clone());
271 }
272 }
273 }
274 EventType::ExecutionStarted(e) => {
275 tracing::debug!(
276 instance_id = %instance_id,
277 orchestrator = %e.name,
278 "Execution started event"
279 );
280 inner.name = e.name.clone();
281 inner.input = e.input.clone();
282 }
283 EventType::TaskCompleted(e) => {
284 let seq = e.task_scheduled_id;
285 tracing::debug!(
286 instance_id = %instance_id,
287 task_id = seq,
288 "Task completed"
289 );
290 let task = inner.pending_tasks.entry(seq).or_default();
291 if task.is_complete() {
292 tracing::debug!(
293 instance_id = %instance_id,
294 task_id = seq,
295 "Skipping duplicate task completion"
296 );
297 return;
298 }
299 task.complete(e.result.clone());
300 }
301 EventType::TaskFailed(e) => {
302 let seq = e.task_scheduled_id;
303 let details = e
304 .failure_details
305 .as_ref()
306 .map(FailureDetails::from)
307 .unwrap_or_else(|| FailureDetails {
308 message: "Task failed".to_string(),
309 error_type: "Unknown".to_string(),
310 stack_trace: None,
311 });
312 tracing::debug!(
313 instance_id = %instance_id,
314 task_id = seq,
315 error = %details.message,
316 "Task failed"
317 );
318 let task = inner.pending_tasks.entry(seq).or_default();
319 if task.is_complete() {
320 tracing::debug!(
321 instance_id = %instance_id,
322 task_id = seq,
323 "Skipping duplicate task completion"
324 );
325 return;
326 }
327 task.fail(details);
328 }
329 EventType::TaskScheduled(_)
330 | EventType::TimerCreated(_)
331 | EventType::ChildWorkflowInstanceCreated(_) => {
332 inner.history_scheduled_count += 1;
333 }
334 EventType::TimerFired(e) => {
335 let seq = e.timer_id;
336 tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
337 let task = inner.pending_tasks.entry(seq).or_default();
338 if task.is_complete() {
339 tracing::debug!(
340 instance_id = %instance_id,
341 task_id = seq,
342 "Skipping duplicate task completion"
343 );
344 return;
345 }
346 task.complete(None);
347 }
348 EventType::ChildWorkflowInstanceCompleted(e) => {
349 let seq = e.task_scheduled_id;
350 tracing::debug!(
351 instance_id = %instance_id,
352 task_id = seq,
353 "Child workflow completed"
354 );
355 let task = inner.pending_tasks.entry(seq).or_default();
356 if task.is_complete() {
357 tracing::debug!(
358 instance_id = %instance_id,
359 task_id = seq,
360 "Skipping duplicate task completion"
361 );
362 return;
363 }
364 task.complete(e.result.clone());
365 }
366 EventType::ChildWorkflowInstanceFailed(e) => {
367 let seq = e.task_scheduled_id;
368 let details = e
369 .failure_details
370 .as_ref()
371 .map(FailureDetails::from)
372 .unwrap_or_else(|| FailureDetails {
373 message: "Sub-orchestration failed".to_string(),
374 error_type: "Unknown".to_string(),
375 stack_trace: None,
376 });
377 tracing::debug!(
378 instance_id = %instance_id,
379 task_id = seq,
380 error = %details.message,
381 "Child workflow failed"
382 );
383 let task = inner.pending_tasks.entry(seq).or_default();
384 if task.is_complete() {
385 tracing::debug!(
386 instance_id = %instance_id,
387 task_id = seq,
388 "Skipping duplicate task completion"
389 );
390 return;
391 }
392 task.fail(details);
393 }
394 EventType::EventRaised(e) => {
395 if let Err(err) = crate::internal::validate_identifier(
396 &e.name,
397 "event name",
398 max_identifier_length,
399 ) {
400 tracing::warn!(
401 instance_id = %instance_id,
402 event_name = %e.name,
403 error = %err,
404 "Rejected event: invalid event name"
405 );
406 return;
407 }
408 let event_name = e.name.to_lowercase();
409 tracing::debug!(
410 instance_id = %instance_id,
411 event_name = %e.name,
412 "External event raised"
413 );
414
415 if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name) {
416 if !tasks.is_empty() {
417 let task = tasks.remove(0);
418 if task.is_complete() {
419 tracing::debug!(
420 instance_id = %instance_id,
421 event_name = %e.name,
422 "Skipping duplicate task completion"
423 );
424 return;
425 }
426 task.complete(e.input.clone());
427 return;
428 }
429 }
430
431 if inner.buffered_events.len() >= inner.max_event_names
432 && !inner.buffered_events.contains_key(&event_name)
433 {
434 tracing::warn!(
435 instance_id = %instance_id,
436 event_name = %e.name,
437 "Event name limit reached, discarding event"
438 );
439 return;
440 }
441
442 let max_events = inner.max_events_per_name;
443 let events = inner.buffered_events.entry(event_name).or_default();
444 if events.len() >= max_events {
445 tracing::warn!(
446 instance_id = %instance_id,
447 event_name = %e.name,
448 "Event buffer limit reached, discarding event"
449 );
450 return;
451 }
452 events.push(e.input.clone());
453 }
454 EventType::ExecutionSuspended(_) => {
455 tracing::info!(instance_id = %instance_id, "Orchestration suspended");
456 inner.is_suspended = true;
457 }
458 EventType::ExecutionResumed(_) => {
459 tracing::info!(instance_id = %instance_id, "Orchestration resumed");
460 inner.is_suspended = false;
461 }
462 EventType::ExecutionTerminated(e) => {
463 tracing::info!(instance_id = %instance_id, "Orchestration terminated");
464 inner.is_complete = true;
465 inner.completion_status = Some(OrchestrationStatus::Terminated);
466 inner.completion_result = e.input.clone();
467 inner.pending_actions.clear();
468 }
469 EventType::ExecutionCompleted(_)
470 | EventType::WorkflowCompleted(_)
471 | EventType::EventSent(_)
472 | EventType::ContinueAsNew(_)
473 | EventType::ExecutionStalled(_)
474 | EventType::DetachedWorkflowInstanceCreated(_) => {}
475 }
476 }
477
478 fn build_response(
479 ctx: &OrchestrationContext,
480 instance_id: &str,
481 completion_token: String,
482 ) -> proto::WorkflowResponse {
483 let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
484
485 let mut actions = std::mem::take(&mut inner.pending_actions);
488
489 if let Some(new_input) = inner.continue_as_new_input.take() {
490 let mut carryover_events = Vec::new();
491 if inner.save_events_on_continue {
492 for (name, events) in &inner.buffered_events {
493 for input in events {
494 carryover_events.push(proto::HistoryEvent {
495 event_id: -1,
496 timestamp: None,
497 router: None,
498 event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
499 name: name.clone(),
500 input: input.clone(),
501 })),
502 });
503 }
504 }
505 }
506
507 actions.push(proto::WorkflowAction {
508 id: actions.len() as i32,
509 router: None,
510 workflow_action_type: Some(
511 proto::workflow_action::WorkflowActionType::CompleteWorkflow(
512 proto::CompleteWorkflowAction {
513 workflow_status: proto::OrchestrationStatus::ContinuedAsNew as i32,
514 result: Some(new_input),
515 details: None,
516 new_version: None,
517 carryover_events,
518 failure_details: None,
519 },
520 ),
521 ),
522 });
523 } else if let Some(ref status) = inner.completion_status {
524 match status {
525 OrchestrationStatus::Completed => {
526 actions.push(proto::WorkflowAction {
527 id: actions.len() as i32,
528 router: None,
529 workflow_action_type: Some(
530 proto::workflow_action::WorkflowActionType::CompleteWorkflow(
531 proto::CompleteWorkflowAction {
532 workflow_status: proto::OrchestrationStatus::Completed as i32,
533 result: inner.completion_result.take(),
534 details: None,
535 new_version: None,
536 carryover_events: vec![],
537 failure_details: None,
538 },
539 ),
540 ),
541 });
542 }
543 OrchestrationStatus::Failed => {
544 let failure = inner.completion_failure.take();
545 actions.push(proto::WorkflowAction {
546 id: actions.len() as i32,
547 router: None,
548 workflow_action_type: Some(
549 proto::workflow_action::WorkflowActionType::CompleteWorkflow(
550 proto::CompleteWorkflowAction {
551 workflow_status: proto::OrchestrationStatus::Failed as i32,
552 result: None,
553 details: None,
554 new_version: None,
555 carryover_events: vec![],
556 failure_details: failure.map(|f| proto::TaskFailureDetails {
557 error_type: f.error_type,
558 error_message: f.message,
559 stack_trace: f.stack_trace,
560 inner_failure: None,
561 is_non_retriable: false,
562 }),
563 },
564 ),
565 ),
566 });
567 }
568 OrchestrationStatus::Terminated => {
569 actions.push(proto::WorkflowAction {
570 id: actions.len() as i32,
571 router: None,
572 workflow_action_type: Some(
573 proto::workflow_action::WorkflowActionType::CompleteWorkflow(
574 proto::CompleteWorkflowAction {
575 workflow_status: proto::OrchestrationStatus::Terminated as i32,
576 result: inner.completion_result.take(),
577 details: None,
578 new_version: None,
579 carryover_events: vec![],
580 failure_details: None,
581 },
582 ),
583 ),
584 });
585 }
586 _ => {
587 }
589 }
590 }
591
592 proto::WorkflowResponse {
593 instance_id: instance_id.to_string(),
594 actions,
595 custom_status: inner.custom_status.take(),
596 completion_token,
597 num_events_processed: None,
598 version: None,
599 }
600 }
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606 use crate::internal::to_timestamp;
607 use crate::proto::history_event::EventType;
608
609 use std::sync::Arc;
610
611 fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
612 proto::HistoryEvent {
613 event_id: 1,
614 timestamp: Some(to_timestamp(ts)),
615 router: None,
616 event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
617 version: None,
618 })),
619 }
620 }
621
622 fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
623 proto::HistoryEvent {
624 event_id: 2,
625 timestamp: Some(to_timestamp(chrono::Utc::now())),
626 router: None,
627 event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
628 name: name.to_string(),
629 version: None,
630 input,
631 workflow_instance: None,
632 parent_instance: None,
633 scheduled_start_timestamp: None,
634 parent_trace_context: None,
635 workflow_span_id: None,
636 tags: Default::default(),
637 })),
638 }
639 }
640
641 fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
642 proto::HistoryEvent {
643 event_id,
644 timestamp: Some(to_timestamp(chrono::Utc::now())),
645 router: None,
646 event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
647 name: name.to_string(),
648 version: None,
649 input: None,
650 parent_trace_context: None,
651 task_execution_id: String::new(),
652 rerun_parent_instance_info: None,
653 history_propagation_scope: None,
654 })),
655 }
656 }
657
658 fn make_task_completed(
659 event_id: i32,
660 task_scheduled_id: i32,
661 result: Option<String>,
662 ) -> proto::HistoryEvent {
663 proto::HistoryEvent {
664 event_id,
665 timestamp: Some(to_timestamp(chrono::Utc::now())),
666 router: None,
667 event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
668 task_scheduled_id,
669 result,
670 task_execution_id: String::new(),
671 attestation: None,
672 signer_certificate: None,
673 })),
674 }
675 }
676
677 fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
678 proto::HistoryEvent {
679 event_id,
680 timestamp: Some(to_timestamp(chrono::Utc::now())),
681 router: None,
682 event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
683 task_scheduled_id,
684 failure_details: Some(proto::TaskFailureDetails {
685 error_type: "TestError".to_string(),
686 error_message: "test failure".to_string(),
687 stack_trace: None,
688 inner_failure: None,
689 is_non_retriable: false,
690 }),
691 task_execution_id: String::new(),
692 attestation: None,
693 signer_certificate: None,
694 })),
695 }
696 }
697
698 #[tokio::test]
699 async fn test_simple_orchestrator_completes() {
700 let orch_fn: OrchestratorFn =
701 Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
702
703 let ts = chrono::Utc::now();
704 let old_events = vec![make_workflow_started(ts)];
705 let new_events = vec![make_execution_started("test_orch", None)];
706
707 let resp = OrchestrationExecutor::execute(
708 &orch_fn,
709 "inst-1",
710 old_events,
711 new_events,
712 String::new(),
713 &WorkerOptions::default(),
714 None,
715 )
716 .await
717 .unwrap();
718
719 assert_eq!(resp.instance_id, "inst-1");
720 let complete_action = resp.actions.iter().find(|a| {
721 matches!(
722 &a.workflow_action_type,
723 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
724 )
725 });
726 assert!(complete_action.is_some());
727 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
728 &complete_action.unwrap().workflow_action_type
729 {
730 assert_eq!(
731 cw.workflow_status,
732 proto::OrchestrationStatus::Completed as i32
733 );
734 assert_eq!(cw.result, Some("\"done\"".to_string()));
735 }
736 }
737
738 #[tokio::test]
739 async fn test_orchestrator_with_activity_replay() {
740 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
741 Box::pin(async move {
742 let result = ctx.call_activity("greet", "world").await?;
743 Ok(result)
744 })
745 });
746
747 let ts = chrono::Utc::now();
748 let old_events = vec![
749 make_workflow_started(ts),
750 make_execution_started("test_orch", None),
751 make_task_scheduled(3, "greet"),
752 make_task_completed(4, 0, Some("\"hello world\"".to_string())),
753 ];
754 let new_events = vec![];
755
756 let resp = OrchestrationExecutor::execute(
757 &orch_fn,
758 "inst-1",
759 old_events,
760 new_events,
761 String::new(),
762 &WorkerOptions::default(),
763 None,
764 )
765 .await
766 .unwrap();
767
768 let complete_action = resp.actions.iter().find(|a| {
769 matches!(
770 &a.workflow_action_type,
771 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
772 )
773 });
774 assert!(complete_action.is_some());
775 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
776 &complete_action.unwrap().workflow_action_type
777 {
778 assert_eq!(
779 cw.workflow_status,
780 proto::OrchestrationStatus::Completed as i32
781 );
782 assert_eq!(cw.result, Some("\"hello world\"".to_string()));
783 }
784 }
785
786 #[tokio::test]
787 async fn test_orchestrator_pending_activity() {
788 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
789 Box::pin(async move {
790 let result = ctx.call_activity("greet", "world").await?;
791 Ok(result)
792 })
793 });
794
795 let ts = chrono::Utc::now();
796 let old_events = vec![make_workflow_started(ts)];
797 let new_events = vec![make_execution_started("test_orch", None)];
798
799 let resp = OrchestrationExecutor::execute(
800 &orch_fn,
801 "inst-1",
802 old_events,
803 new_events,
804 String::new(),
805 &WorkerOptions::default(),
806 None,
807 )
808 .await
809 .unwrap();
810
811 let has_schedule = resp.actions.iter().any(|a| {
812 matches!(
813 &a.workflow_action_type,
814 Some(proto::workflow_action::WorkflowActionType::ScheduleTask(_))
815 )
816 });
817 assert!(has_schedule);
818
819 let has_complete = resp.actions.iter().any(|a| {
820 matches!(
821 &a.workflow_action_type,
822 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
823 )
824 });
825 assert!(!has_complete);
826 }
827
828 #[tokio::test]
829 async fn test_orchestrator_task_failure() {
830 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
831 Box::pin(async move {
832 let result = ctx.call_activity("greet", "world").await?;
833 Ok(result)
834 })
835 });
836
837 let ts = chrono::Utc::now();
838 let old_events = vec![
839 make_workflow_started(ts),
840 make_execution_started("test_orch", None),
841 make_task_scheduled(3, "greet"),
842 make_task_failed(4, 0),
843 ];
844 let new_events = vec![];
845
846 let resp = OrchestrationExecutor::execute(
847 &orch_fn,
848 "inst-1",
849 old_events,
850 new_events,
851 String::new(),
852 &WorkerOptions::default(),
853 None,
854 )
855 .await
856 .unwrap();
857
858 let complete_action = resp.actions.iter().find(|a| {
859 matches!(
860 &a.workflow_action_type,
861 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
862 )
863 });
864 assert!(complete_action.is_some());
865 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
866 &complete_action.unwrap().workflow_action_type
867 {
868 assert_eq!(
869 cw.workflow_status,
870 proto::OrchestrationStatus::Failed as i32
871 );
872 assert!(cw.failure_details.is_some());
873 }
874 }
875
876 #[tokio::test]
877 async fn test_suspended_orchestration_not_run() {
878 let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
879
880 let ts = chrono::Utc::now();
881 let old_events = vec![make_workflow_started(ts)];
882 let new_events = vec![
883 make_execution_started("test_orch", None),
884 proto::HistoryEvent {
885 event_id: 3,
886 timestamp: Some(to_timestamp(chrono::Utc::now())),
887 router: None,
888 event_type: Some(EventType::ExecutionSuspended(
889 proto::ExecutionSuspendedEvent {
890 input: Some("paused".to_string()),
891 },
892 )),
893 },
894 ];
895
896 let resp = OrchestrationExecutor::execute(
897 &orch_fn,
898 "inst-1",
899 old_events,
900 new_events,
901 String::new(),
902 &WorkerOptions::default(),
903 None,
904 )
905 .await
906 .unwrap();
907
908 assert!(resp.actions.is_empty());
909 }
910
911 #[tokio::test]
912 async fn test_terminated_orchestration_not_run() {
913 let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
914
915 let ts = chrono::Utc::now();
916 let old_events = vec![make_workflow_started(ts)];
917 let new_events = vec![
918 make_execution_started("test_orch", None),
919 proto::HistoryEvent {
920 event_id: 3,
921 timestamp: Some(to_timestamp(chrono::Utc::now())),
922 router: None,
923 event_type: Some(EventType::ExecutionTerminated(
924 proto::ExecutionTerminatedEvent {
925 input: None,
926 recurse: false,
927 },
928 )),
929 },
930 ];
931
932 let resp = OrchestrationExecutor::execute(
933 &orch_fn,
934 "inst-1",
935 old_events,
936 new_events,
937 String::new(),
938 &WorkerOptions::default(),
939 None,
940 )
941 .await
942 .unwrap();
943
944 assert_eq!(resp.actions.len(), 1);
946 match &resp.actions[0].workflow_action_type {
947 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
948 assert_eq!(
949 cw.workflow_status,
950 proto::OrchestrationStatus::Terminated as i32
951 );
952 assert!(cw.result.is_none());
953 }
954 other => panic!("expected CompleteWorkflow, got {:?}", other),
955 }
956 }
957
958 #[tokio::test]
959 async fn test_continue_as_new() {
960 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
961 Box::pin(async move {
962 ctx.continue_as_new("new_input", false);
963 Ok(None)
964 })
965 });
966
967 let ts = chrono::Utc::now();
968 let old_events = vec![make_workflow_started(ts)];
969 let new_events = vec![make_execution_started("test_orch", None)];
970
971 let resp = OrchestrationExecutor::execute(
972 &orch_fn,
973 "inst-1",
974 old_events,
975 new_events,
976 String::new(),
977 &WorkerOptions::default(),
978 None,
979 )
980 .await
981 .unwrap();
982
983 let complete_action = resp.actions.iter().find(|a| {
984 matches!(
985 &a.workflow_action_type,
986 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
987 )
988 });
989 assert!(complete_action.is_some());
990 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
991 &complete_action.unwrap().workflow_action_type
992 {
993 assert_eq!(
994 cw.workflow_status,
995 proto::OrchestrationStatus::ContinuedAsNew as i32
996 );
997 assert_eq!(cw.result, Some("\"new_input\"".to_string()));
998 }
999 }
1000
1001 #[tokio::test]
1002 async fn test_external_event_delivery() {
1003 let orch_fn: OrchestratorFn = Arc::new(|ctx| {
1004 Box::pin(async move {
1005 let result = ctx.wait_for_external_event("approval").await?;
1006 Ok(result)
1007 })
1008 });
1009
1010 let ts = chrono::Utc::now();
1011 let old_events = vec![make_workflow_started(ts)];
1012 let new_events = vec![
1013 make_execution_started("test_orch", None),
1014 proto::HistoryEvent {
1015 event_id: 3,
1016 timestamp: Some(to_timestamp(chrono::Utc::now())),
1017 router: None,
1018 event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
1019 name: "approval".to_string(),
1020 input: Some("\"yes\"".to_string()),
1021 })),
1022 },
1023 ];
1024
1025 let resp = OrchestrationExecutor::execute(
1026 &orch_fn,
1027 "inst-1",
1028 old_events,
1029 new_events,
1030 String::new(),
1031 &WorkerOptions::default(),
1032 None,
1033 )
1034 .await
1035 .unwrap();
1036
1037 let complete_action = resp.actions.iter().find(|a| {
1038 matches!(
1039 &a.workflow_action_type,
1040 Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1041 )
1042 });
1043 assert!(complete_action.is_some());
1044 if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1045 &complete_action.unwrap().workflow_action_type
1046 {
1047 assert_eq!(
1048 cw.workflow_status,
1049 proto::OrchestrationStatus::Completed as i32
1050 );
1051 assert_eq!(cw.result, Some("\"yes\"".to_string()));
1052 }
1053 }
1054}