Skip to main content

dapr_durabletask/worker/
orchestration_executor.rs

1use std::task::Poll;
2
3use futures::FutureExt;
4
5use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
6use crate::internal::from_timestamp;
7use crate::proto;
8use crate::proto::history_event::EventType;
9use crate::task::OrchestrationContext;
10use crate::task::orchestration_context::OrchestrationContextInner;
11
12use super::options::WorkerOptions;
13use super::registry::OrchestratorFn;
14
15/// Executes orchestrator functions by replaying history and processing new events.
16///
17/// The executor follows the durable task replay model:
18/// 1. Process old (past) events to pre-populate completed tasks
19/// 2. Process new events to deliver results and external events
20/// 3. Run the orchestrator function (unless suspended, terminated, or already complete)
21/// 4. Collect pending actions and build the response
22pub struct OrchestrationExecutor;
23
24impl OrchestrationExecutor {
25    /// Execute an orchestrator function by replaying history and processing new events.
26    ///
27    /// Returns a `WorkflowResponse` with the actions to take.
28    pub async fn execute(
29        orchestrator_fn: &OrchestratorFn,
30        instance_id: &str,
31        old_events: Vec<proto::HistoryEvent>,
32        new_events: Vec<proto::HistoryEvent>,
33        completion_token: String,
34        options: &WorkerOptions,
35        propagated_history: Option<crate::api::PropagatedHistory>,
36    ) -> crate::api::Result<proto::WorkflowResponse> {
37        tracing::info!(
38            instance_id = %instance_id,
39            past_events = old_events.len(),
40            new_events = new_events.len(),
41            "Starting orchestration execution"
42        );
43
44        let ctx = OrchestrationContext::new(
45            instance_id.to_string(),
46            String::new(),
47            None,
48            chrono::Utc::now(),
49            true,
50            options,
51            old_events.len() + new_events.len(),
52        );
53
54        // Stash the propagated history (if any) before running the function so
55        // that ctx.propagated_history() is available during user code.
56        if propagated_history.is_some() {
57            let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
58            inner.propagated_history = propagated_history;
59        }
60
61        // Process all history events under a single lock acquisition.
62        // Previously each event locked/unlocked individually — O(N) lock ops
63        // for N events. This reduces it to O(1).
64        {
65            let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
66            inner.is_replaying = true;
67            tracing::debug!(
68                instance_id = %instance_id,
69                count = old_events.len(),
70                "Replaying old events"
71            );
72            for event in &old_events {
73                Self::process_event(
74                    &mut inner,
75                    event,
76                    instance_id,
77                    options.max_identifier_length,
78                );
79            }
80
81            inner.is_replaying = false;
82            tracing::debug!(
83                instance_id = %instance_id,
84                count = new_events.len(),
85                "Processing new events"
86            );
87            for event in &new_events {
88                Self::process_event(
89                    &mut inner,
90                    event,
91                    instance_id,
92                    options.max_identifier_length,
93                );
94            }
95        }
96
97        // Start an OTel orchestration span
98        #[cfg(feature = "opentelemetry")]
99        let otel_ctx = {
100            let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
101            let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
102            let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
103            crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
104        };
105
106        let should_run = {
107            let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
108            !inner.is_suspended && !inner.is_complete
109        };
110
111        if should_run {
112            tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
113
114            // Poll the orchestrator future once — if all awaited tasks are already
115            // completed from replay, the future runs to completion in a single poll.
116            let mut future = (orchestrator_fn)(ctx.clone()).boxed();
117            let poll_result = futures::poll!(future.as_mut());
118
119            match poll_result {
120                Poll::Ready(Ok(output)) => {
121                    let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
122                    if inner.continue_as_new_input.is_some() {
123                        tracing::info!(
124                            instance_id = %instance_id,
125                            orchestrator = %inner.name,
126                            "Orchestration continuing as new"
127                        );
128                        #[cfg(feature = "opentelemetry")]
129                        crate::internal::otel::set_span_status_attribute(
130                            &otel_ctx,
131                            "CONTINUED_AS_NEW",
132                        );
133                        inner.is_complete = true;
134                        inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
135                    } else if !inner.is_complete {
136                        tracing::info!(
137                            instance_id = %instance_id,
138                            orchestrator = %inner.name,
139                            "Orchestration completed successfully"
140                        );
141                        #[cfg(feature = "opentelemetry")]
142                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
143                        inner.is_complete = true;
144                        inner.completion_status = Some(OrchestrationStatus::Completed);
145                        inner.completion_result = output;
146                    }
147                }
148                Poll::Ready(Err(DurableTaskError::TaskFailed {
149                    message,
150                    failure_details,
151                })) => {
152                    let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
153                    tracing::warn!(
154                        instance_id = %instance_id,
155                        orchestrator = %inner.name,
156                        error = %message,
157                        "Orchestration failed due to task failure"
158                    );
159                    #[cfg(feature = "opentelemetry")]
160                    {
161                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
162                        crate::internal::otel::set_span_error(&otel_ctx, &message);
163                    }
164                    inner.is_complete = true;
165                    inner.completion_status = Some(OrchestrationStatus::Failed);
166                    inner.completion_failure =
167                        Some(failure_details.unwrap_or_else(|| FailureDetails {
168                            message: message.clone(),
169                            error_type: "TaskFailed".to_string(),
170                            stack_trace: None,
171                        }));
172                }
173                Poll::Ready(Err(e)) => {
174                    let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
175                    tracing::error!(
176                        instance_id = %instance_id,
177                        orchestrator = %inner.name,
178                        error = %e,
179                        "Orchestration failed with error"
180                    );
181                    #[cfg(feature = "opentelemetry")]
182                    {
183                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
184                        crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
185                    }
186                    inner.is_complete = true;
187                    inner.completion_status = Some(OrchestrationStatus::Failed);
188                    inner.completion_failure = Some(FailureDetails {
189                        message: e.to_string(),
190                        error_type: "OrchestratorError".to_string(),
191                        stack_trace: None,
192                    });
193                }
194                Poll::Pending => {
195                    let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
196                    tracing::debug!(
197                        instance_id = %instance_id,
198                        orchestrator = %inner.name,
199                        pending_actions = inner.pending_actions.len(),
200                        "Orchestrator yielded, waiting for tasks"
201                    );
202                }
203            }
204        } else {
205            let inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
206            tracing::debug!(
207                instance_id = %instance_id,
208                is_suspended = inner.is_suspended,
209                is_complete = inner.is_complete,
210                "Skipping orchestrator execution"
211            );
212            #[cfg(feature = "opentelemetry")]
213            if inner.is_complete {
214                crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
215            }
216        }
217
218        // End the OTel span
219        #[cfg(feature = "opentelemetry")]
220        crate::internal::otel::end_span(&otel_ctx);
221
222        let response = Self::build_response(&ctx, instance_id, completion_token);
223        tracing::debug!(
224            instance_id = %instance_id,
225            actions = response.actions.len(),
226            "Built orchestration response"
227        );
228        Ok(response)
229    }
230
231    /// Find the parent trace context from ExecutionStarted events in history.
232    #[cfg(feature = "opentelemetry")]
233    fn find_parent_trace_context<'a>(
234        old_events: &'a [proto::HistoryEvent],
235        new_events: &'a [proto::HistoryEvent],
236    ) -> Option<&'a proto::TraceContext> {
237        old_events.iter().chain(new_events.iter()).find_map(|e| {
238            if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
239                es.parent_trace_context.as_ref()
240            } else {
241                None
242            }
243        })
244    }
245
246    /// Process a single history event, updating the orchestration context state.
247    ///
248    /// Operates directly on the inner state to avoid per-event mutex acquisition.
249    /// The caller must hold the lock.
250    fn process_event(
251        inner: &mut OrchestrationContextInner,
252        event: &proto::HistoryEvent,
253        instance_id: &str,
254        max_identifier_length: usize,
255    ) {
256        let event_type = match &event.event_type {
257            Some(et) => et,
258            None => return,
259        };
260
261        match event_type {
262            EventType::WorkflowStarted(ws) => {
263                if let Some(ts) = &event.timestamp {
264                    if let Some(dt) = from_timestamp(ts) {
265                        inner.current_utc_datetime = dt;
266                    }
267                }
268                if let Some(version) = &ws.version {
269                    for patch in &version.patches {
270                        inner.history_patches.insert(patch.clone());
271                    }
272                }
273            }
274            EventType::ExecutionStarted(e) => {
275                tracing::debug!(
276                    instance_id = %instance_id,
277                    orchestrator = %e.name,
278                    "Execution started event"
279                );
280                inner.name = e.name.clone();
281                inner.input = e.input.clone();
282            }
283            EventType::TaskCompleted(e) => {
284                let seq = e.task_scheduled_id;
285                tracing::debug!(
286                    instance_id = %instance_id,
287                    task_id = seq,
288                    "Task completed"
289                );
290                let task = inner.pending_tasks.entry(seq).or_default();
291                if task.is_complete() {
292                    tracing::debug!(
293                        instance_id = %instance_id,
294                        task_id = seq,
295                        "Skipping duplicate task completion"
296                    );
297                    return;
298                }
299                task.complete(e.result.clone());
300            }
301            EventType::TaskFailed(e) => {
302                let seq = e.task_scheduled_id;
303                let details = e
304                    .failure_details
305                    .as_ref()
306                    .map(FailureDetails::from)
307                    .unwrap_or_else(|| FailureDetails {
308                        message: "Task failed".to_string(),
309                        error_type: "Unknown".to_string(),
310                        stack_trace: None,
311                    });
312                tracing::debug!(
313                    instance_id = %instance_id,
314                    task_id = seq,
315                    error = %details.message,
316                    "Task failed"
317                );
318                let task = inner.pending_tasks.entry(seq).or_default();
319                if task.is_complete() {
320                    tracing::debug!(
321                        instance_id = %instance_id,
322                        task_id = seq,
323                        "Skipping duplicate task completion"
324                    );
325                    return;
326                }
327                task.fail(details);
328            }
329            EventType::TaskScheduled(_)
330            | EventType::TimerCreated(_)
331            | EventType::ChildWorkflowInstanceCreated(_) => {
332                inner.history_scheduled_count += 1;
333            }
334            EventType::TimerFired(e) => {
335                let seq = e.timer_id;
336                tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
337                let task = inner.pending_tasks.entry(seq).or_default();
338                if task.is_complete() {
339                    tracing::debug!(
340                        instance_id = %instance_id,
341                        task_id = seq,
342                        "Skipping duplicate task completion"
343                    );
344                    return;
345                }
346                task.complete(None);
347            }
348            EventType::ChildWorkflowInstanceCompleted(e) => {
349                let seq = e.task_scheduled_id;
350                tracing::debug!(
351                    instance_id = %instance_id,
352                    task_id = seq,
353                    "Child workflow completed"
354                );
355                let task = inner.pending_tasks.entry(seq).or_default();
356                if task.is_complete() {
357                    tracing::debug!(
358                        instance_id = %instance_id,
359                        task_id = seq,
360                        "Skipping duplicate task completion"
361                    );
362                    return;
363                }
364                task.complete(e.result.clone());
365            }
366            EventType::ChildWorkflowInstanceFailed(e) => {
367                let seq = e.task_scheduled_id;
368                let details = e
369                    .failure_details
370                    .as_ref()
371                    .map(FailureDetails::from)
372                    .unwrap_or_else(|| FailureDetails {
373                        message: "Sub-orchestration failed".to_string(),
374                        error_type: "Unknown".to_string(),
375                        stack_trace: None,
376                    });
377                tracing::debug!(
378                    instance_id = %instance_id,
379                    task_id = seq,
380                    error = %details.message,
381                    "Child workflow failed"
382                );
383                let task = inner.pending_tasks.entry(seq).or_default();
384                if task.is_complete() {
385                    tracing::debug!(
386                        instance_id = %instance_id,
387                        task_id = seq,
388                        "Skipping duplicate task completion"
389                    );
390                    return;
391                }
392                task.fail(details);
393            }
394            EventType::EventRaised(e) => {
395                if let Err(err) = crate::internal::validate_identifier(
396                    &e.name,
397                    "event name",
398                    max_identifier_length,
399                ) {
400                    tracing::warn!(
401                        instance_id = %instance_id,
402                        event_name = %e.name,
403                        error = %err,
404                        "Rejected event: invalid event name"
405                    );
406                    return;
407                }
408                let event_name = e.name.to_lowercase();
409                tracing::debug!(
410                    instance_id = %instance_id,
411                    event_name = %e.name,
412                    "External event raised"
413                );
414
415                if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name) {
416                    if !tasks.is_empty() {
417                        let task = tasks.remove(0);
418                        if task.is_complete() {
419                            tracing::debug!(
420                                instance_id = %instance_id,
421                                event_name = %e.name,
422                                "Skipping duplicate task completion"
423                            );
424                            return;
425                        }
426                        task.complete(e.input.clone());
427                        return;
428                    }
429                }
430
431                if inner.buffered_events.len() >= inner.max_event_names
432                    && !inner.buffered_events.contains_key(&event_name)
433                {
434                    tracing::warn!(
435                        instance_id = %instance_id,
436                        event_name = %e.name,
437                        "Event name limit reached, discarding event"
438                    );
439                    return;
440                }
441
442                let max_events = inner.max_events_per_name;
443                let events = inner.buffered_events.entry(event_name).or_default();
444                if events.len() >= max_events {
445                    tracing::warn!(
446                        instance_id = %instance_id,
447                        event_name = %e.name,
448                        "Event buffer limit reached, discarding event"
449                    );
450                    return;
451                }
452                events.push(e.input.clone());
453            }
454            EventType::ExecutionSuspended(_) => {
455                tracing::info!(instance_id = %instance_id, "Orchestration suspended");
456                inner.is_suspended = true;
457            }
458            EventType::ExecutionResumed(_) => {
459                tracing::info!(instance_id = %instance_id, "Orchestration resumed");
460                inner.is_suspended = false;
461            }
462            EventType::ExecutionTerminated(e) => {
463                tracing::info!(instance_id = %instance_id, "Orchestration terminated");
464                inner.is_complete = true;
465                inner.completion_status = Some(OrchestrationStatus::Terminated);
466                inner.completion_result = e.input.clone();
467                inner.pending_actions.clear();
468            }
469            EventType::ExecutionCompleted(_)
470            | EventType::WorkflowCompleted(_)
471            | EventType::EventSent(_)
472            | EventType::ContinueAsNew(_)
473            | EventType::ExecutionStalled(_)
474            | EventType::DetachedWorkflowInstanceCreated(_) => {}
475        }
476    }
477
478    fn build_response(
479        ctx: &OrchestrationContext,
480        instance_id: &str,
481        completion_token: String,
482    ) -> proto::WorkflowResponse {
483        let mut inner = ctx.inner.lock().unwrap_or_else(|e| e.into_inner());
484
485        // Move actions out instead of cloning — the context is consumed after
486        // this response is built, so the original Vec is no longer needed.
487        let mut actions = std::mem::take(&mut inner.pending_actions);
488
489        if let Some(new_input) = inner.continue_as_new_input.take() {
490            let mut carryover_events = Vec::new();
491            if inner.save_events_on_continue {
492                for (name, events) in &inner.buffered_events {
493                    for input in events {
494                        carryover_events.push(proto::HistoryEvent {
495                            event_id: -1,
496                            timestamp: None,
497                            router: None,
498                            event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
499                                name: name.clone(),
500                                input: input.clone(),
501                            })),
502                        });
503                    }
504                }
505            }
506
507            actions.push(proto::WorkflowAction {
508                id: actions.len() as i32,
509                router: None,
510                workflow_action_type: Some(
511                    proto::workflow_action::WorkflowActionType::CompleteWorkflow(
512                        proto::CompleteWorkflowAction {
513                            workflow_status: proto::OrchestrationStatus::ContinuedAsNew as i32,
514                            result: Some(new_input),
515                            details: None,
516                            new_version: None,
517                            carryover_events,
518                            failure_details: None,
519                        },
520                    ),
521                ),
522            });
523        } else if let Some(ref status) = inner.completion_status {
524            match status {
525                OrchestrationStatus::Completed => {
526                    actions.push(proto::WorkflowAction {
527                        id: actions.len() as i32,
528                        router: None,
529                        workflow_action_type: Some(
530                            proto::workflow_action::WorkflowActionType::CompleteWorkflow(
531                                proto::CompleteWorkflowAction {
532                                    workflow_status: proto::OrchestrationStatus::Completed as i32,
533                                    result: inner.completion_result.take(),
534                                    details: None,
535                                    new_version: None,
536                                    carryover_events: vec![],
537                                    failure_details: None,
538                                },
539                            ),
540                        ),
541                    });
542                }
543                OrchestrationStatus::Failed => {
544                    let failure = inner.completion_failure.take();
545                    actions.push(proto::WorkflowAction {
546                        id: actions.len() as i32,
547                        router: None,
548                        workflow_action_type: Some(
549                            proto::workflow_action::WorkflowActionType::CompleteWorkflow(
550                                proto::CompleteWorkflowAction {
551                                    workflow_status: proto::OrchestrationStatus::Failed as i32,
552                                    result: None,
553                                    details: None,
554                                    new_version: None,
555                                    carryover_events: vec![],
556                                    failure_details: failure.map(|f| proto::TaskFailureDetails {
557                                        error_type: f.error_type,
558                                        error_message: f.message,
559                                        stack_trace: f.stack_trace,
560                                        inner_failure: None,
561                                        is_non_retriable: false,
562                                    }),
563                                },
564                            ),
565                        ),
566                    });
567                }
568                OrchestrationStatus::Terminated => {
569                    actions.push(proto::WorkflowAction {
570                        id: actions.len() as i32,
571                        router: None,
572                        workflow_action_type: Some(
573                            proto::workflow_action::WorkflowActionType::CompleteWorkflow(
574                                proto::CompleteWorkflowAction {
575                                    workflow_status: proto::OrchestrationStatus::Terminated as i32,
576                                    result: inner.completion_result.take(),
577                                    details: None,
578                                    new_version: None,
579                                    carryover_events: vec![],
580                                    failure_details: None,
581                                },
582                            ),
583                        ),
584                    });
585                }
586                _ => {
587                    // Other statuses (Pending, Running, etc.) don't produce completion actions
588                }
589            }
590        }
591
592        proto::WorkflowResponse {
593            instance_id: instance_id.to_string(),
594            actions,
595            custom_status: inner.custom_status.take(),
596            completion_token,
597            num_events_processed: None,
598            version: None,
599        }
600    }
601}
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606    use crate::internal::to_timestamp;
607    use crate::proto::history_event::EventType;
608
609    use std::sync::Arc;
610
611    fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
612        proto::HistoryEvent {
613            event_id: 1,
614            timestamp: Some(to_timestamp(ts)),
615            router: None,
616            event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
617                version: None,
618            })),
619        }
620    }
621
622    fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
623        proto::HistoryEvent {
624            event_id: 2,
625            timestamp: Some(to_timestamp(chrono::Utc::now())),
626            router: None,
627            event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
628                name: name.to_string(),
629                version: None,
630                input,
631                workflow_instance: None,
632                parent_instance: None,
633                scheduled_start_timestamp: None,
634                parent_trace_context: None,
635                workflow_span_id: None,
636                tags: Default::default(),
637            })),
638        }
639    }
640
641    fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
642        proto::HistoryEvent {
643            event_id,
644            timestamp: Some(to_timestamp(chrono::Utc::now())),
645            router: None,
646            event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
647                name: name.to_string(),
648                version: None,
649                input: None,
650                parent_trace_context: None,
651                task_execution_id: String::new(),
652                rerun_parent_instance_info: None,
653                history_propagation_scope: None,
654            })),
655        }
656    }
657
658    fn make_task_completed(
659        event_id: i32,
660        task_scheduled_id: i32,
661        result: Option<String>,
662    ) -> proto::HistoryEvent {
663        proto::HistoryEvent {
664            event_id,
665            timestamp: Some(to_timestamp(chrono::Utc::now())),
666            router: None,
667            event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
668                task_scheduled_id,
669                result,
670                task_execution_id: String::new(),
671                attestation: None,
672                signer_certificate: None,
673            })),
674        }
675    }
676
677    fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
678        proto::HistoryEvent {
679            event_id,
680            timestamp: Some(to_timestamp(chrono::Utc::now())),
681            router: None,
682            event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
683                task_scheduled_id,
684                failure_details: Some(proto::TaskFailureDetails {
685                    error_type: "TestError".to_string(),
686                    error_message: "test failure".to_string(),
687                    stack_trace: None,
688                    inner_failure: None,
689                    is_non_retriable: false,
690                }),
691                task_execution_id: String::new(),
692                attestation: None,
693                signer_certificate: None,
694            })),
695        }
696    }
697
698    #[tokio::test]
699    async fn test_simple_orchestrator_completes() {
700        let orch_fn: OrchestratorFn =
701            Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
702
703        let ts = chrono::Utc::now();
704        let old_events = vec![make_workflow_started(ts)];
705        let new_events = vec![make_execution_started("test_orch", None)];
706
707        let resp = OrchestrationExecutor::execute(
708            &orch_fn,
709            "inst-1",
710            old_events,
711            new_events,
712            String::new(),
713            &WorkerOptions::default(),
714            None,
715        )
716        .await
717        .unwrap();
718
719        assert_eq!(resp.instance_id, "inst-1");
720        let complete_action = resp.actions.iter().find(|a| {
721            matches!(
722                &a.workflow_action_type,
723                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
724            )
725        });
726        assert!(complete_action.is_some());
727        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
728            &complete_action.unwrap().workflow_action_type
729        {
730            assert_eq!(
731                cw.workflow_status,
732                proto::OrchestrationStatus::Completed as i32
733            );
734            assert_eq!(cw.result, Some("\"done\"".to_string()));
735        }
736    }
737
738    #[tokio::test]
739    async fn test_orchestrator_with_activity_replay() {
740        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
741            Box::pin(async move {
742                let result = ctx.call_activity("greet", "world").await?;
743                Ok(result)
744            })
745        });
746
747        let ts = chrono::Utc::now();
748        let old_events = vec![
749            make_workflow_started(ts),
750            make_execution_started("test_orch", None),
751            make_task_scheduled(3, "greet"),
752            make_task_completed(4, 0, Some("\"hello world\"".to_string())),
753        ];
754        let new_events = vec![];
755
756        let resp = OrchestrationExecutor::execute(
757            &orch_fn,
758            "inst-1",
759            old_events,
760            new_events,
761            String::new(),
762            &WorkerOptions::default(),
763            None,
764        )
765        .await
766        .unwrap();
767
768        let complete_action = resp.actions.iter().find(|a| {
769            matches!(
770                &a.workflow_action_type,
771                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
772            )
773        });
774        assert!(complete_action.is_some());
775        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
776            &complete_action.unwrap().workflow_action_type
777        {
778            assert_eq!(
779                cw.workflow_status,
780                proto::OrchestrationStatus::Completed as i32
781            );
782            assert_eq!(cw.result, Some("\"hello world\"".to_string()));
783        }
784    }
785
786    #[tokio::test]
787    async fn test_orchestrator_pending_activity() {
788        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
789            Box::pin(async move {
790                let result = ctx.call_activity("greet", "world").await?;
791                Ok(result)
792            })
793        });
794
795        let ts = chrono::Utc::now();
796        let old_events = vec![make_workflow_started(ts)];
797        let new_events = vec![make_execution_started("test_orch", None)];
798
799        let resp = OrchestrationExecutor::execute(
800            &orch_fn,
801            "inst-1",
802            old_events,
803            new_events,
804            String::new(),
805            &WorkerOptions::default(),
806            None,
807        )
808        .await
809        .unwrap();
810
811        let has_schedule = resp.actions.iter().any(|a| {
812            matches!(
813                &a.workflow_action_type,
814                Some(proto::workflow_action::WorkflowActionType::ScheduleTask(_))
815            )
816        });
817        assert!(has_schedule);
818
819        let has_complete = resp.actions.iter().any(|a| {
820            matches!(
821                &a.workflow_action_type,
822                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
823            )
824        });
825        assert!(!has_complete);
826    }
827
828    #[tokio::test]
829    async fn test_orchestrator_task_failure() {
830        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
831            Box::pin(async move {
832                let result = ctx.call_activity("greet", "world").await?;
833                Ok(result)
834            })
835        });
836
837        let ts = chrono::Utc::now();
838        let old_events = vec![
839            make_workflow_started(ts),
840            make_execution_started("test_orch", None),
841            make_task_scheduled(3, "greet"),
842            make_task_failed(4, 0),
843        ];
844        let new_events = vec![];
845
846        let resp = OrchestrationExecutor::execute(
847            &orch_fn,
848            "inst-1",
849            old_events,
850            new_events,
851            String::new(),
852            &WorkerOptions::default(),
853            None,
854        )
855        .await
856        .unwrap();
857
858        let complete_action = resp.actions.iter().find(|a| {
859            matches!(
860                &a.workflow_action_type,
861                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
862            )
863        });
864        assert!(complete_action.is_some());
865        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
866            &complete_action.unwrap().workflow_action_type
867        {
868            assert_eq!(
869                cw.workflow_status,
870                proto::OrchestrationStatus::Failed as i32
871            );
872            assert!(cw.failure_details.is_some());
873        }
874    }
875
876    #[tokio::test]
877    async fn test_suspended_orchestration_not_run() {
878        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
879
880        let ts = chrono::Utc::now();
881        let old_events = vec![make_workflow_started(ts)];
882        let new_events = vec![
883            make_execution_started("test_orch", None),
884            proto::HistoryEvent {
885                event_id: 3,
886                timestamp: Some(to_timestamp(chrono::Utc::now())),
887                router: None,
888                event_type: Some(EventType::ExecutionSuspended(
889                    proto::ExecutionSuspendedEvent {
890                        input: Some("paused".to_string()),
891                    },
892                )),
893            },
894        ];
895
896        let resp = OrchestrationExecutor::execute(
897            &orch_fn,
898            "inst-1",
899            old_events,
900            new_events,
901            String::new(),
902            &WorkerOptions::default(),
903            None,
904        )
905        .await
906        .unwrap();
907
908        assert!(resp.actions.is_empty());
909    }
910
911    #[tokio::test]
912    async fn test_terminated_orchestration_not_run() {
913        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
914
915        let ts = chrono::Utc::now();
916        let old_events = vec![make_workflow_started(ts)];
917        let new_events = vec![
918            make_execution_started("test_orch", None),
919            proto::HistoryEvent {
920                event_id: 3,
921                timestamp: Some(to_timestamp(chrono::Utc::now())),
922                router: None,
923                event_type: Some(EventType::ExecutionTerminated(
924                    proto::ExecutionTerminatedEvent {
925                        input: None,
926                        recurse: false,
927                    },
928                )),
929            },
930        ];
931
932        let resp = OrchestrationExecutor::execute(
933            &orch_fn,
934            "inst-1",
935            old_events,
936            new_events,
937            String::new(),
938            &WorkerOptions::default(),
939            None,
940        )
941        .await
942        .unwrap();
943
944        // Terminated — CompleteWorkflow with Terminated status
945        assert_eq!(resp.actions.len(), 1);
946        match &resp.actions[0].workflow_action_type {
947            Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
948                assert_eq!(
949                    cw.workflow_status,
950                    proto::OrchestrationStatus::Terminated as i32
951                );
952                assert!(cw.result.is_none());
953            }
954            other => panic!("expected CompleteWorkflow, got {:?}", other),
955        }
956    }
957
958    #[tokio::test]
959    async fn test_continue_as_new() {
960        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
961            Box::pin(async move {
962                ctx.continue_as_new("new_input", false);
963                Ok(None)
964            })
965        });
966
967        let ts = chrono::Utc::now();
968        let old_events = vec![make_workflow_started(ts)];
969        let new_events = vec![make_execution_started("test_orch", None)];
970
971        let resp = OrchestrationExecutor::execute(
972            &orch_fn,
973            "inst-1",
974            old_events,
975            new_events,
976            String::new(),
977            &WorkerOptions::default(),
978            None,
979        )
980        .await
981        .unwrap();
982
983        let complete_action = resp.actions.iter().find(|a| {
984            matches!(
985                &a.workflow_action_type,
986                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
987            )
988        });
989        assert!(complete_action.is_some());
990        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
991            &complete_action.unwrap().workflow_action_type
992        {
993            assert_eq!(
994                cw.workflow_status,
995                proto::OrchestrationStatus::ContinuedAsNew as i32
996            );
997            assert_eq!(cw.result, Some("\"new_input\"".to_string()));
998        }
999    }
1000
1001    #[tokio::test]
1002    async fn test_external_event_delivery() {
1003        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
1004            Box::pin(async move {
1005                let result = ctx.wait_for_external_event("approval").await?;
1006                Ok(result)
1007            })
1008        });
1009
1010        let ts = chrono::Utc::now();
1011        let old_events = vec![make_workflow_started(ts)];
1012        let new_events = vec![
1013            make_execution_started("test_orch", None),
1014            proto::HistoryEvent {
1015                event_id: 3,
1016                timestamp: Some(to_timestamp(chrono::Utc::now())),
1017                router: None,
1018                event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
1019                    name: "approval".to_string(),
1020                    input: Some("\"yes\"".to_string()),
1021                })),
1022            },
1023        ];
1024
1025        let resp = OrchestrationExecutor::execute(
1026            &orch_fn,
1027            "inst-1",
1028            old_events,
1029            new_events,
1030            String::new(),
1031            &WorkerOptions::default(),
1032            None,
1033        )
1034        .await
1035        .unwrap();
1036
1037        let complete_action = resp.actions.iter().find(|a| {
1038            matches!(
1039                &a.workflow_action_type,
1040                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1041            )
1042        });
1043        assert!(complete_action.is_some());
1044        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1045            &complete_action.unwrap().workflow_action_type
1046        {
1047            assert_eq!(
1048                cw.workflow_status,
1049                proto::OrchestrationStatus::Completed as i32
1050            );
1051            assert_eq!(cw.result, Some("\"yes\"".to_string()));
1052        }
1053    }
1054}