Skip to main content

dapr_durabletask/worker/
orchestration_executor.rs

1use std::sync::atomic::Ordering;
2use std::task::Poll;
3
4use futures::FutureExt;
5
6use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
7use crate::internal::from_timestamp;
8use crate::proto;
9use crate::proto::history_event::EventType;
10use crate::task::OrchestrationContext;
11use crate::task::orchestration_context::{OrchestrationContextInner, lock_inner};
12
13use super::options::WorkerOptions;
14use super::registry::OrchestratorFn;
15
/// Stateless driver that runs orchestrator functions under the durable task
/// replay model.
///
/// Each call to [`OrchestrationExecutor::execute`] goes through four phases:
/// 1. Apply old (past) events so already-completed tasks are pre-populated.
/// 2. Apply new events to deliver fresh results and external events.
/// 3. Poll the orchestrator function (skipped when the instance is suspended,
///    terminated, or already complete).
/// 4. Collect the pending actions and build the response.
///
/// The type carries no data; it only namespaces the associated functions. The
/// common traits are derived so it can be logged, copied, and default-built
/// like any other public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct OrchestrationExecutor;
24
25impl OrchestrationExecutor {
26    /// Execute an orchestrator function by replaying history and processing new events.
27    ///
28    /// Returns a `WorkflowResponse` with the actions to take.
29    pub async fn execute(
30        orchestrator_fn: &OrchestratorFn,
31        instance_id: &str,
32        old_events: Vec<proto::HistoryEvent>,
33        new_events: Vec<proto::HistoryEvent>,
34        completion_token: String,
35        options: &WorkerOptions,
36        propagated_history: Option<crate::api::PropagatedHistory>,
37    ) -> crate::api::Result<proto::WorkflowResponse> {
38        tracing::info!(
39            instance_id = %instance_id,
40            past_events = old_events.len(),
41            new_events = new_events.len(),
42            "Starting orchestration execution"
43        );
44
45        // Name and input start empty and are overwritten when ExecutionStarted is replayed.
46        let ctx = OrchestrationContext::new(
47            instance_id.to_string(),
48            String::new(),
49            None,
50            chrono::Utc::now(),
51            true,
52            options,
53            old_events.len() + new_events.len(),
54        );
55
56        // Stash the propagated history (if any) before running the function so
57        // that ctx.propagated_history() is available during user code.
58        if propagated_history.is_some() {
59            let mut inner = lock_inner(&ctx.inner);
60            inner.propagated_history = propagated_history.map(std::sync::Arc::new);
61        }
62
63        // Process all history events under a single lock acquisition.
64        //
65        // Completions applied while draining old events are tagged
66        // "during replay"; those applied from new events are tagged "new".
67        // `CompletableTask::poll` then clears the shared `is_replaying`
68        // flag the first time the orchestrator awaits a "new" completion —
69        // the replay frontier.
70        let initially_replaying = !old_events.is_empty();
71        {
72            let mut inner = lock_inner(&ctx.inner);
73            inner.is_replaying.store(true, Ordering::Release);
74            tracing::debug!(
75                instance_id = %instance_id,
76                count = old_events.len(),
77                "Replaying old events"
78            );
79            for event in &old_events {
80                Self::process_event(
81                    &mut inner,
82                    event,
83                    instance_id,
84                    options.max_identifier_length,
85                );
86            }
87
88            inner.is_replaying.store(false, Ordering::Release);
89            tracing::debug!(
90                instance_id = %instance_id,
91                count = new_events.len(),
92                "Processing new events"
93            );
94            for event in &new_events {
95                Self::process_event(
96                    &mut inner,
97                    event,
98                    instance_id,
99                    options.max_identifier_length,
100                );
101            }
102
103            // Brand-new executions run at the frontier from the first poll;
104            // re-runs start in replay and clear the flag as awaits resolve.
105            inner
106                .is_replaying
107                .store(initially_replaying, Ordering::Release);
108        }
109
110        // Start an OTel orchestration span
111        #[cfg(feature = "opentelemetry")]
112        let otel_ctx = {
113            let inner = lock_inner(&ctx.inner);
114            let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
115            let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
116            crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
117        };
118
119        let should_run = {
120            let inner = lock_inner(&ctx.inner);
121            !inner.is_suspended && !inner.is_complete
122        };
123
124        if should_run {
125            tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
126
127            // Poll the orchestrator future once — if all awaited tasks are already
128            // completed from replay, the future runs to completion in a single poll.
129            let mut future = (orchestrator_fn)(ctx.clone()).boxed();
130            let poll_result = futures::poll!(future.as_mut());
131
132            match poll_result {
133                Poll::Ready(Ok(output)) => {
134                    let mut inner = lock_inner(&ctx.inner);
135                    if inner.continue_as_new_input.is_some() {
136                        tracing::info!(
137                            instance_id = %instance_id,
138                            orchestrator = %inner.name,
139                            "Orchestration continuing as new"
140                        );
141                        #[cfg(feature = "opentelemetry")]
142                        crate::internal::otel::set_span_status_attribute(
143                            &otel_ctx,
144                            "CONTINUED_AS_NEW",
145                        );
146                        inner.is_complete = true;
147                        inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
148                    } else if !inner.is_complete {
149                        tracing::info!(
150                            instance_id = %instance_id,
151                            orchestrator = %inner.name,
152                            "Orchestration completed successfully"
153                        );
154                        #[cfg(feature = "opentelemetry")]
155                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
156                        inner.is_complete = true;
157                        inner.completion_status = Some(OrchestrationStatus::Completed);
158                        inner.completion_result = output;
159                    }
160                }
161                Poll::Ready(Err(DurableTaskError::TaskFailed {
162                    message,
163                    failure_details,
164                })) => {
165                    let mut inner = lock_inner(&ctx.inner);
166                    tracing::warn!(
167                        instance_id = %instance_id,
168                        orchestrator = %inner.name,
169                        error = %message,
170                        "Orchestration failed due to task failure"
171                    );
172                    #[cfg(feature = "opentelemetry")]
173                    {
174                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
175                        crate::internal::otel::set_span_error(&otel_ctx, &message);
176                    }
177                    inner.is_complete = true;
178                    inner.completion_status = Some(OrchestrationStatus::Failed);
179                    inner.completion_failure =
180                        Some(failure_details.unwrap_or_else(|| FailureDetails {
181                            message: message.clone(),
182                            error_type: "TaskFailed".to_string(),
183                            stack_trace: None,
184                        }));
185                }
186                Poll::Ready(Err(e)) => {
187                    let mut inner = lock_inner(&ctx.inner);
188                    tracing::error!(
189                        instance_id = %instance_id,
190                        orchestrator = %inner.name,
191                        error = %e,
192                        "Orchestration failed with error"
193                    );
194                    #[cfg(feature = "opentelemetry")]
195                    {
196                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
197                        crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
198                    }
199                    inner.is_complete = true;
200                    inner.completion_status = Some(OrchestrationStatus::Failed);
201                    inner.completion_failure = Some(FailureDetails {
202                        message: e.to_string(),
203                        error_type: "OrchestratorError".to_string(),
204                        stack_trace: None,
205                    });
206                }
207                Poll::Pending => {
208                    let inner = lock_inner(&ctx.inner);
209                    tracing::debug!(
210                        instance_id = %instance_id,
211                        orchestrator = %inner.name,
212                        pending_actions = inner.pending_actions.len(),
213                        "Orchestrator yielded, waiting for tasks"
214                    );
215                }
216            }
217        } else {
218            let inner = lock_inner(&ctx.inner);
219            tracing::debug!(
220                instance_id = %instance_id,
221                is_suspended = inner.is_suspended,
222                is_complete = inner.is_complete,
223                "Skipping orchestrator execution"
224            );
225            #[cfg(feature = "opentelemetry")]
226            if inner.is_complete {
227                crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
228            }
229        }
230
231        // End the OTel span
232        #[cfg(feature = "opentelemetry")]
233        crate::internal::otel::end_span(&otel_ctx);
234
235        let response = Self::build_response(&ctx, instance_id, completion_token);
236        tracing::debug!(
237            instance_id = %instance_id,
238            actions = response.actions.len(),
239            "Built orchestration response"
240        );
241        Ok(response)
242    }
243
244    /// Find the parent trace context from ExecutionStarted events in history.
245    #[cfg(feature = "opentelemetry")]
246    fn find_parent_trace_context<'a>(
247        old_events: &'a [proto::HistoryEvent],
248        new_events: &'a [proto::HistoryEvent],
249    ) -> Option<&'a proto::TraceContext> {
250        old_events.iter().chain(new_events.iter()).find_map(|e| {
251            if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
252                es.parent_trace_context.as_ref()
253            } else {
254                None
255            }
256        })
257    }
258
259    /// Process a single history event, updating the orchestration context state.
260    ///
261    /// Operates directly on the inner state to avoid per-event mutex acquisition.
262    /// The caller must hold the lock.
263    fn process_event(
264        inner: &mut OrchestrationContextInner,
265        event: &proto::HistoryEvent,
266        instance_id: &str,
267        max_identifier_length: usize,
268    ) {
269        let event_type = match &event.event_type {
270            Some(et) => et,
271            None => return,
272        };
273
274        let during_replay = inner.is_replaying.load(Ordering::Acquire);
275        let replay_handle = inner.is_replaying.clone();
276        // Get-or-insert a pending task at `seq`, ensuring placeholders
277        // inherit the shared replay flag.
278        let pending_task = |inner: &mut OrchestrationContextInner, seq: i32| {
279            let task = inner.pending_tasks.entry(seq).or_default();
280            task.set_replay_handle(replay_handle.clone());
281            task.clone()
282        };
283
284        match event_type {
285            EventType::WorkflowStarted(ws) => {
286                if let Some(ts) = &event.timestamp
287                    && let Some(dt) = from_timestamp(ts)
288                {
289                    inner.current_utc_datetime = dt;
290                }
291                if let Some(version) = &ws.version {
292                    for patch in &version.patches {
293                        inner.history_patches.insert(patch.clone());
294                    }
295                }
296            }
297            EventType::ExecutionStarted(e) => {
298                tracing::debug!(
299                    instance_id = %instance_id,
300                    orchestrator = %e.name,
301                    "Execution started event"
302                );
303                inner.name = std::sync::Arc::<str>::from(e.name.clone());
304                inner.input = e.input.clone();
305            }
306            EventType::TaskCompleted(e) => {
307                let seq = e.task_scheduled_id;
308                tracing::debug!(
309                    instance_id = %instance_id,
310                    task_id = seq,
311                    "Task completed"
312                );
313                let task = pending_task(inner, seq);
314                if task.is_complete() {
315                    tracing::debug!(
316                        instance_id = %instance_id,
317                        task_id = seq,
318                        "Skipping duplicate task completion"
319                    );
320                    return;
321                }
322                task.complete_with_phase(e.result.clone(), during_replay);
323            }
324            EventType::TaskFailed(e) => {
325                let seq = e.task_scheduled_id;
326                let details = e
327                    .failure_details
328                    .as_ref()
329                    .map(FailureDetails::from)
330                    .unwrap_or_else(|| FailureDetails {
331                        message: "Task failed".to_string(),
332                        error_type: "Unknown".to_string(),
333                        stack_trace: None,
334                    });
335                tracing::debug!(
336                    instance_id = %instance_id,
337                    task_id = seq,
338                    error = %details.message,
339                    "Task failed"
340                );
341                let task = pending_task(inner, seq);
342                if task.is_complete() {
343                    tracing::debug!(
344                        instance_id = %instance_id,
345                        task_id = seq,
346                        "Skipping duplicate task completion"
347                    );
348                    return;
349                }
350                task.fail_with_phase(details, during_replay);
351            }
352            EventType::TaskScheduled(_)
353            | EventType::TimerCreated(_)
354            | EventType::ChildWorkflowInstanceCreated(_) => {
355                inner.history_scheduled_count += 1;
356            }
357            EventType::TimerFired(e) => {
358                let seq = e.timer_id;
359                tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
360                let task = pending_task(inner, seq);
361                if task.is_complete() {
362                    tracing::debug!(
363                        instance_id = %instance_id,
364                        task_id = seq,
365                        "Skipping duplicate task completion"
366                    );
367                    return;
368                }
369                task.complete_with_phase(None, during_replay);
370            }
371            EventType::ChildWorkflowInstanceCompleted(e) => {
372                let seq = e.task_scheduled_id;
373                tracing::debug!(
374                    instance_id = %instance_id,
375                    task_id = seq,
376                    "Child workflow completed"
377                );
378                let task = pending_task(inner, seq);
379                if task.is_complete() {
380                    tracing::debug!(
381                        instance_id = %instance_id,
382                        task_id = seq,
383                        "Skipping duplicate task completion"
384                    );
385                    return;
386                }
387                task.complete_with_phase(e.result.clone(), during_replay);
388            }
389            EventType::ChildWorkflowInstanceFailed(e) => {
390                let seq = e.task_scheduled_id;
391                let details = e
392                    .failure_details
393                    .as_ref()
394                    .map(FailureDetails::from)
395                    .unwrap_or_else(|| FailureDetails {
396                        message: "Sub-orchestration failed".to_string(),
397                        error_type: "Unknown".to_string(),
398                        stack_trace: None,
399                    });
400                tracing::debug!(
401                    instance_id = %instance_id,
402                    task_id = seq,
403                    error = %details.message,
404                    "Child workflow failed"
405                );
406                let task = pending_task(inner, seq);
407                if task.is_complete() {
408                    tracing::debug!(
409                        instance_id = %instance_id,
410                        task_id = seq,
411                        "Skipping duplicate task completion"
412                    );
413                    return;
414                }
415                task.fail_with_phase(details, during_replay);
416            }
417            EventType::EventRaised(e) => {
418                if let Err(err) = crate::internal::validate_identifier(
419                    &e.name,
420                    "event name",
421                    max_identifier_length,
422                ) {
423                    tracing::warn!(
424                        instance_id = %instance_id,
425                        event_name = %e.name,
426                        error = %err,
427                        "Rejected event: invalid event name"
428                    );
429                    return;
430                }
431                let event_name = e.name.to_lowercase();
432                tracing::debug!(
433                    instance_id = %instance_id,
434                    event_name = %e.name,
435                    "External event raised"
436                );
437
438                if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name)
439                    && !tasks.is_empty()
440                {
441                    let task = tasks
442                        .pop_front()
443                        .expect("pending event task queue is not empty");
444                    if task.is_complete() {
445                        tracing::debug!(
446                            instance_id = %instance_id,
447                            event_name = %e.name,
448                            "Skipping duplicate task completion"
449                        );
450                        return;
451                    }
452                    task.complete_with_phase(e.input.clone(), during_replay);
453                    return;
454                }
455
456                if inner.buffered_events.len() >= inner.config.max_event_names
457                    && !inner.buffered_events.contains_key(&event_name)
458                {
459                    tracing::warn!(
460                        instance_id = %instance_id,
461                        event_name = %e.name,
462                        "Event name limit reached, discarding event"
463                    );
464                    return;
465                }
466
467                let max_events = inner.config.max_events_per_name;
468                let events = inner.buffered_events.entry(event_name).or_default();
469                if events.len() >= max_events {
470                    tracing::warn!(
471                        instance_id = %instance_id,
472                        event_name = %e.name,
473                        "Event buffer limit reached, discarding event"
474                    );
475                    return;
476                }
477                events.push_back((e.input.clone(), during_replay));
478            }
479            EventType::ExecutionSuspended(_) => {
480                tracing::info!(instance_id = %instance_id, "Orchestration suspended");
481                inner.is_suspended = true;
482            }
483            EventType::ExecutionResumed(_) => {
484                tracing::info!(instance_id = %instance_id, "Orchestration resumed");
485                inner.is_suspended = false;
486            }
487            EventType::ExecutionTerminated(e) => {
488                tracing::info!(instance_id = %instance_id, "Orchestration terminated");
489                inner.is_complete = true;
490                inner.completion_status = Some(OrchestrationStatus::Terminated);
491                inner.completion_result = e.input.clone();
492                inner.pending_actions.clear();
493            }
494            EventType::ExecutionCompleted(_)
495            | EventType::WorkflowCompleted(_)
496            | EventType::EventSent(_)
497            | EventType::ContinueAsNew(_)
498            | EventType::ExecutionStalled(_)
499            | EventType::DetachedWorkflowInstanceCreated(_) => {}
500        }
501    }
502
503    fn make_complete_action(
504        id: i32,
505        status: proto::OrchestrationStatus,
506        result: Option<String>,
507        carryover_events: Vec<proto::HistoryEvent>,
508        failure: Option<FailureDetails>,
509    ) -> proto::WorkflowAction {
510        proto::WorkflowAction {
511            id,
512            router: None,
513            workflow_action_type: Some(
514                proto::workflow_action::WorkflowActionType::CompleteWorkflow(
515                    proto::CompleteWorkflowAction {
516                        workflow_status: status as i32,
517                        result,
518                        details: None,
519                        new_version: None,
520                        carryover_events,
521                        failure_details: failure.map(|f| proto::TaskFailureDetails {
522                            error_type: f.error_type,
523                            error_message: f.message,
524                            stack_trace: f.stack_trace,
525                            inner_failure: None,
526                            is_non_retriable: false,
527                        }),
528                    },
529                ),
530            ),
531        }
532    }
533
534    fn build_response(
535        ctx: &OrchestrationContext,
536        instance_id: &str,
537        completion_token: String,
538    ) -> proto::WorkflowResponse {
539        let mut inner = lock_inner(&ctx.inner);
540
541        // Move actions out instead of cloning — the context is consumed after
542        // this response is built, so the original Vec is no longer needed.
543        let mut actions = std::mem::take(&mut inner.pending_actions);
544
545        if let Some(new_input) = inner.continue_as_new_input.take() {
546            let mut carryover_events = Vec::new();
547            if inner.save_events_on_continue {
548                for (name, events) in &inner.buffered_events {
549                    for (input, _during_replay) in events {
550                        carryover_events.push(proto::HistoryEvent {
551                            event_id: -1,
552                            timestamp: None,
553                            router: None,
554                            event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
555                                name: name.clone(),
556                                input: input.clone(),
557                            })),
558                        });
559                    }
560                }
561            }
562
563            actions.push(Self::make_complete_action(
564                actions.len() as i32,
565                proto::OrchestrationStatus::ContinuedAsNew,
566                Some(new_input),
567                carryover_events,
568                None,
569            ));
570        } else if let Some(status) = inner.completion_status {
571            match status {
572                OrchestrationStatus::Completed => {
573                    actions.push(Self::make_complete_action(
574                        actions.len() as i32,
575                        proto::OrchestrationStatus::Completed,
576                        inner.completion_result.take(),
577                        Vec::new(),
578                        None,
579                    ));
580                }
581                OrchestrationStatus::Failed => {
582                    let failure = inner.completion_failure.take();
583                    actions.push(Self::make_complete_action(
584                        actions.len() as i32,
585                        proto::OrchestrationStatus::Failed,
586                        None,
587                        Vec::new(),
588                        failure,
589                    ));
590                }
591                OrchestrationStatus::Terminated => {
592                    actions.push(Self::make_complete_action(
593                        actions.len() as i32,
594                        proto::OrchestrationStatus::Terminated,
595                        inner.completion_result.take(),
596                        Vec::new(),
597                        None,
598                    ));
599                }
600                _ => {
601                    // Other statuses (Pending, Running, etc.) don't produce completion actions
602                }
603            }
604        }
605
606        // Persist applied patches so the runtime records them in the next
607        // WorkflowStarted event, enabling correct replay of patch-gated code.
608        let version = {
609            let mut applied: Vec<String> = inner
610                .applied_patches
611                .iter()
612                .filter(|(_, v)| **v)
613                .map(|(k, _)| k.clone())
614                .collect();
615            if applied.is_empty() {
616                None
617            } else {
618                applied.sort();
619                Some(proto::WorkflowVersion {
620                    patches: applied,
621                    name: None,
622                })
623            }
624        };
625
626        proto::WorkflowResponse {
627            instance_id: instance_id.to_string(),
628            actions,
629            custom_status: inner.custom_status.take(),
630            completion_token,
631            num_events_processed: None,
632            version,
633        }
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use crate::internal::to_timestamp;
641    use crate::proto::history_event::EventType;
642
643    use std::sync::Arc;
644
645    fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
646        proto::HistoryEvent {
647            event_id: 1,
648            timestamp: Some(to_timestamp(ts)),
649            router: None,
650            event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
651                version: None,
652            })),
653        }
654    }
655
656    fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
657        proto::HistoryEvent {
658            event_id: 2,
659            timestamp: Some(to_timestamp(chrono::Utc::now())),
660            router: None,
661            event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
662                name: name.to_string(),
663                version: None,
664                input,
665                workflow_instance: None,
666                parent_instance: None,
667                scheduled_start_timestamp: None,
668                parent_trace_context: None,
669                workflow_span_id: None,
670                tags: Default::default(),
671            })),
672        }
673    }
674
675    fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
676        proto::HistoryEvent {
677            event_id,
678            timestamp: Some(to_timestamp(chrono::Utc::now())),
679            router: None,
680            event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
681                name: name.to_string(),
682                version: None,
683                input: None,
684                parent_trace_context: None,
685                task_execution_id: String::new(),
686                rerun_parent_instance_info: None,
687                history_propagation_scope: None,
688            })),
689        }
690    }
691
692    fn make_task_completed(
693        event_id: i32,
694        task_scheduled_id: i32,
695        result: Option<String>,
696    ) -> proto::HistoryEvent {
697        proto::HistoryEvent {
698            event_id,
699            timestamp: Some(to_timestamp(chrono::Utc::now())),
700            router: None,
701            event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
702                task_scheduled_id,
703                result,
704                task_execution_id: String::new(),
705                attestation: None,
706                signer_certificate: None,
707            })),
708        }
709    }
710
711    fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
712        proto::HistoryEvent {
713            event_id,
714            timestamp: Some(to_timestamp(chrono::Utc::now())),
715            router: None,
716            event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
717                task_scheduled_id,
718                failure_details: Some(proto::TaskFailureDetails {
719                    error_type: "TestError".to_string(),
720                    error_message: "test failure".to_string(),
721                    stack_trace: None,
722                    inner_failure: None,
723                    is_non_retriable: false,
724                }),
725                task_execution_id: String::new(),
726                attestation: None,
727                signer_certificate: None,
728            })),
729        }
730    }
731
732    #[tokio::test]
733    async fn test_simple_orchestrator_completes() {
734        let orch_fn: OrchestratorFn =
735            Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
736
737        let ts = chrono::Utc::now();
738        let old_events = vec![make_workflow_started(ts)];
739        let new_events = vec![make_execution_started("test_orch", None)];
740
741        let resp = OrchestrationExecutor::execute(
742            &orch_fn,
743            "inst-1",
744            old_events,
745            new_events,
746            String::new(),
747            &WorkerOptions::default(),
748            None,
749        )
750        .await
751        .unwrap();
752
753        assert_eq!(resp.instance_id, "inst-1");
754        let complete_action = resp.actions.iter().find(|a| {
755            matches!(
756                &a.workflow_action_type,
757                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
758            )
759        });
760        assert!(complete_action.is_some());
761        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
762            &complete_action.unwrap().workflow_action_type
763        {
764            assert_eq!(
765                cw.workflow_status,
766                proto::OrchestrationStatus::Completed as i32
767            );
768            assert_eq!(cw.result, Some("\"done\"".to_string()));
769        }
770    }
771
772    #[tokio::test]
773    async fn test_orchestrator_with_activity_replay() {
774        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
775            Box::pin(async move {
776                let result = ctx.call_activity("greet", "world").await?;
777                Ok(result)
778            })
779        });
780
781        let ts = chrono::Utc::now();
782        let old_events = vec![
783            make_workflow_started(ts),
784            make_execution_started("test_orch", None),
785            make_task_scheduled(3, "greet"),
786            make_task_completed(4, 0, Some("\"hello world\"".to_string())),
787        ];
788        let new_events = vec![];
789
790        let resp = OrchestrationExecutor::execute(
791            &orch_fn,
792            "inst-1",
793            old_events,
794            new_events,
795            String::new(),
796            &WorkerOptions::default(),
797            None,
798        )
799        .await
800        .unwrap();
801
802        let complete_action = resp.actions.iter().find(|a| {
803            matches!(
804                &a.workflow_action_type,
805                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
806            )
807        });
808        assert!(complete_action.is_some());
809        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
810            &complete_action.unwrap().workflow_action_type
811        {
812            assert_eq!(
813                cw.workflow_status,
814                proto::OrchestrationStatus::Completed as i32
815            );
816            assert_eq!(cw.result, Some("\"hello world\"".to_string()));
817        }
818    }
819
820    #[tokio::test]
821    async fn test_orchestrator_pending_activity() {
822        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
823            Box::pin(async move {
824                let result = ctx.call_activity("greet", "world").await?;
825                Ok(result)
826            })
827        });
828
829        let ts = chrono::Utc::now();
830        let old_events = vec![make_workflow_started(ts)];
831        let new_events = vec![make_execution_started("test_orch", None)];
832
833        let resp = OrchestrationExecutor::execute(
834            &orch_fn,
835            "inst-1",
836            old_events,
837            new_events,
838            String::new(),
839            &WorkerOptions::default(),
840            None,
841        )
842        .await
843        .unwrap();
844
845        let has_schedule = resp.actions.iter().any(|a| {
846            matches!(
847                &a.workflow_action_type,
848                Some(proto::workflow_action::WorkflowActionType::ScheduleTask(_))
849            )
850        });
851        assert!(has_schedule);
852
853        let has_complete = resp.actions.iter().any(|a| {
854            matches!(
855                &a.workflow_action_type,
856                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
857            )
858        });
859        assert!(!has_complete);
860    }
861
862    #[tokio::test]
863    async fn test_orchestrator_task_failure() {
864        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
865            Box::pin(async move {
866                let result = ctx.call_activity("greet", "world").await?;
867                Ok(result)
868            })
869        });
870
871        let ts = chrono::Utc::now();
872        let old_events = vec![
873            make_workflow_started(ts),
874            make_execution_started("test_orch", None),
875            make_task_scheduled(3, "greet"),
876            make_task_failed(4, 0),
877        ];
878        let new_events = vec![];
879
880        let resp = OrchestrationExecutor::execute(
881            &orch_fn,
882            "inst-1",
883            old_events,
884            new_events,
885            String::new(),
886            &WorkerOptions::default(),
887            None,
888        )
889        .await
890        .unwrap();
891
892        let complete_action = resp.actions.iter().find(|a| {
893            matches!(
894                &a.workflow_action_type,
895                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
896            )
897        });
898        assert!(complete_action.is_some());
899        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
900            &complete_action.unwrap().workflow_action_type
901        {
902            assert_eq!(
903                cw.workflow_status,
904                proto::OrchestrationStatus::Failed as i32
905            );
906            assert!(cw.failure_details.is_some());
907        }
908    }
909
910    #[tokio::test]
911    async fn test_suspended_orchestration_not_run() {
912        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
913
914        let ts = chrono::Utc::now();
915        let old_events = vec![make_workflow_started(ts)];
916        let new_events = vec![
917            make_execution_started("test_orch", None),
918            proto::HistoryEvent {
919                event_id: 3,
920                timestamp: Some(to_timestamp(chrono::Utc::now())),
921                router: None,
922                event_type: Some(EventType::ExecutionSuspended(
923                    proto::ExecutionSuspendedEvent {
924                        input: Some("paused".to_string()),
925                    },
926                )),
927            },
928        ];
929
930        let resp = OrchestrationExecutor::execute(
931            &orch_fn,
932            "inst-1",
933            old_events,
934            new_events,
935            String::new(),
936            &WorkerOptions::default(),
937            None,
938        )
939        .await
940        .unwrap();
941
942        assert!(resp.actions.is_empty());
943    }
944
945    #[tokio::test]
946    async fn test_terminated_orchestration_not_run() {
947        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
948
949        let ts = chrono::Utc::now();
950        let old_events = vec![make_workflow_started(ts)];
951        let new_events = vec![
952            make_execution_started("test_orch", None),
953            proto::HistoryEvent {
954                event_id: 3,
955                timestamp: Some(to_timestamp(chrono::Utc::now())),
956                router: None,
957                event_type: Some(EventType::ExecutionTerminated(
958                    proto::ExecutionTerminatedEvent {
959                        input: None,
960                        recurse: false,
961                    },
962                )),
963            },
964        ];
965
966        let resp = OrchestrationExecutor::execute(
967            &orch_fn,
968            "inst-1",
969            old_events,
970            new_events,
971            String::new(),
972            &WorkerOptions::default(),
973            None,
974        )
975        .await
976        .unwrap();
977
978        // Terminated — CompleteWorkflow with Terminated status
979        assert_eq!(resp.actions.len(), 1);
980        match &resp.actions[0].workflow_action_type {
981            Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
982                assert_eq!(
983                    cw.workflow_status,
984                    proto::OrchestrationStatus::Terminated as i32
985                );
986                assert!(cw.result.is_none());
987            }
988            other => panic!("expected CompleteWorkflow, got {other:?}"),
989        }
990    }
991
992    #[tokio::test]
993    async fn test_continue_as_new() {
994        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
995            Box::pin(async move {
996                ctx.continue_as_new("new_input", false);
997                Ok(None)
998            })
999        });
1000
1001        let ts = chrono::Utc::now();
1002        let old_events = vec![make_workflow_started(ts)];
1003        let new_events = vec![make_execution_started("test_orch", None)];
1004
1005        let resp = OrchestrationExecutor::execute(
1006            &orch_fn,
1007            "inst-1",
1008            old_events,
1009            new_events,
1010            String::new(),
1011            &WorkerOptions::default(),
1012            None,
1013        )
1014        .await
1015        .unwrap();
1016
1017        let complete_action = resp.actions.iter().find(|a| {
1018            matches!(
1019                &a.workflow_action_type,
1020                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1021            )
1022        });
1023        assert!(complete_action.is_some());
1024        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1025            &complete_action.unwrap().workflow_action_type
1026        {
1027            assert_eq!(
1028                cw.workflow_status,
1029                proto::OrchestrationStatus::ContinuedAsNew as i32
1030            );
1031            assert_eq!(cw.result, Some("\"new_input\"".to_string()));
1032        }
1033    }
1034
1035    #[tokio::test]
1036    async fn test_external_event_delivery() {
1037        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
1038            Box::pin(async move {
1039                let result = ctx.wait_for_external_event("approval").await?;
1040                Ok(result)
1041            })
1042        });
1043
1044        let ts = chrono::Utc::now();
1045        let old_events = vec![make_workflow_started(ts)];
1046        let new_events = vec![
1047            make_execution_started("test_orch", None),
1048            proto::HistoryEvent {
1049                event_id: 3,
1050                timestamp: Some(to_timestamp(chrono::Utc::now())),
1051                router: None,
1052                event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
1053                    name: "approval".to_string(),
1054                    input: Some("\"yes\"".to_string()),
1055                })),
1056            },
1057        ];
1058
1059        let resp = OrchestrationExecutor::execute(
1060            &orch_fn,
1061            "inst-1",
1062            old_events,
1063            new_events,
1064            String::new(),
1065            &WorkerOptions::default(),
1066            None,
1067        )
1068        .await
1069        .unwrap();
1070
1071        let complete_action = resp.actions.iter().find(|a| {
1072            matches!(
1073                &a.workflow_action_type,
1074                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1075            )
1076        });
1077        assert!(complete_action.is_some());
1078        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1079            &complete_action.unwrap().workflow_action_type
1080        {
1081            assert_eq!(
1082                cw.workflow_status,
1083                proto::OrchestrationStatus::Completed as i32
1084            );
1085            assert_eq!(cw.result, Some("\"yes\"".to_string()));
1086        }
1087    }
1088}