Skip to main content

dapr_durabletask/worker/
orchestration_executor.rs

1use std::sync::atomic::Ordering;
2use std::task::Poll;
3
4use futures::FutureExt;
5
6use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
7use crate::internal::from_timestamp;
8use crate::proto;
9use crate::proto::history_event::EventType;
10use crate::task::OrchestrationContext;
11use crate::task::orchestration_context::{OrchestrationContextInner, lock_inner};
12
13use super::options::WorkerOptions;
14use super::registry::OrchestratorFn;
15
/// Stateless driver that runs an orchestrator function against its event history.
///
/// A single execution walks through the phases of the durable task replay model:
/// 1. Apply past (old) events so that already-completed tasks are pre-populated
/// 2. Apply new events, delivering fresh results and external events
/// 3. Poll the orchestrator function, unless the instance is suspended or already
///    complete (termination marks the instance complete)
/// 4. Gather the pending actions and assemble the response
pub struct OrchestrationExecutor;
24
25impl OrchestrationExecutor {
26    /// Execute an orchestrator function by replaying history and processing new events.
27    ///
28    /// Returns a `WorkflowResponse` with the actions to take.
29    pub async fn execute(
30        orchestrator_fn: &OrchestratorFn,
31        instance_id: &str,
32        old_events: Vec<proto::HistoryEvent>,
33        new_events: Vec<proto::HistoryEvent>,
34        completion_token: String,
35        options: &WorkerOptions,
36        propagated_history: Option<crate::api::PropagatedHistory>,
37    ) -> crate::api::Result<proto::WorkflowResponse> {
38        tracing::info!(
39            instance_id = %instance_id,
40            past_events = old_events.len(),
41            new_events = new_events.len(),
42            "Starting orchestration execution"
43        );
44
45        // Name and input start empty and are overwritten when ExecutionStarted is replayed.
46        let ctx = OrchestrationContext::new(
47            instance_id.to_string(),
48            String::new(),
49            None,
50            chrono::Utc::now(),
51            true,
52            options,
53            old_events.len() + new_events.len(),
54        );
55
56        // Stash the propagated history (if any) before running the function so
57        // that ctx.propagated_history() is available during user code.
58        if propagated_history.is_some() {
59            let mut inner = lock_inner(&ctx.inner);
60            inner.propagated_history = propagated_history.map(std::sync::Arc::new);
61        }
62
63        // Process all history events under a single lock acquisition.
64        //
65        // Completions applied while draining old events are tagged
66        // "during replay"; those applied from new events are tagged "new".
67        // `CompletableTask::poll` then clears the shared `is_replaying`
68        // flag the first time the orchestrator awaits a "new" completion —
69        // the replay frontier.
70        let initially_replaying = !old_events.is_empty();
71        {
72            let mut inner = lock_inner(&ctx.inner);
73            inner.is_replaying.store(true, Ordering::Release);
74            tracing::debug!(
75                instance_id = %instance_id,
76                count = old_events.len(),
77                "Replaying old events"
78            );
79            for event in &old_events {
80                Self::process_event(
81                    &mut inner,
82                    event,
83                    instance_id,
84                    options.max_identifier_length,
85                );
86            }
87
88            inner.is_replaying.store(false, Ordering::Release);
89            tracing::debug!(
90                instance_id = %instance_id,
91                count = new_events.len(),
92                "Processing new events"
93            );
94            for event in &new_events {
95                Self::process_event(
96                    &mut inner,
97                    event,
98                    instance_id,
99                    options.max_identifier_length,
100                );
101            }
102
103            // Brand-new executions run at the frontier from the first poll;
104            // re-runs start in replay and clear the flag as awaits resolve.
105            inner
106                .is_replaying
107                .store(initially_replaying, Ordering::Release);
108        }
109
110        // Start an OTel orchestration span
111        #[cfg(feature = "opentelemetry")]
112        let otel_ctx = {
113            let inner = lock_inner(&ctx.inner);
114            let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
115            let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
116            crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
117        };
118
119        let should_run = {
120            let inner = lock_inner(&ctx.inner);
121            !inner.is_suspended && !inner.is_complete
122        };
123
124        if should_run {
125            tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
126
127            // Poll the orchestrator future once — if all awaited tasks are already
128            // completed from replay, the future runs to completion in a single poll.
129            let mut future = (orchestrator_fn)(ctx.clone()).boxed();
130            let poll_result = futures::poll!(future.as_mut());
131
132            match poll_result {
133                Poll::Ready(Ok(output)) => {
134                    let mut inner = lock_inner(&ctx.inner);
135                    if inner.continue_as_new_input.is_some() {
136                        tracing::info!(
137                            instance_id = %instance_id,
138                            orchestrator = %inner.name,
139                            "Orchestration continuing as new"
140                        );
141                        #[cfg(feature = "opentelemetry")]
142                        crate::internal::otel::set_span_status_attribute(
143                            &otel_ctx,
144                            "CONTINUED_AS_NEW",
145                        );
146                        inner.is_complete = true;
147                        inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
148                    } else if !inner.is_complete {
149                        tracing::info!(
150                            instance_id = %instance_id,
151                            orchestrator = %inner.name,
152                            "Orchestration completed successfully"
153                        );
154                        #[cfg(feature = "opentelemetry")]
155                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
156                        inner.is_complete = true;
157                        inner.completion_status = Some(OrchestrationStatus::Completed);
158                        inner.completion_result = output;
159                    }
160                }
161                Poll::Ready(Err(DurableTaskError::TaskFailed {
162                    message,
163                    failure_details,
164                })) => {
165                    let mut inner = lock_inner(&ctx.inner);
166                    tracing::warn!(
167                        instance_id = %instance_id,
168                        orchestrator = %inner.name,
169                        error = %message,
170                        "Orchestration failed due to task failure"
171                    );
172                    #[cfg(feature = "opentelemetry")]
173                    {
174                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
175                        crate::internal::otel::set_span_error(&otel_ctx, &message);
176                    }
177                    inner.is_complete = true;
178                    inner.completion_status = Some(OrchestrationStatus::Failed);
179                    inner.completion_failure =
180                        Some(failure_details.unwrap_or_else(|| FailureDetails {
181                            message: message.clone(),
182                            error_type: "TaskFailed".to_string(),
183                            stack_trace: None,
184                        }));
185                }
186                Poll::Ready(Err(e)) => {
187                    let mut inner = lock_inner(&ctx.inner);
188                    tracing::error!(
189                        instance_id = %instance_id,
190                        orchestrator = %inner.name,
191                        error = %e,
192                        "Orchestration failed with error"
193                    );
194                    #[cfg(feature = "opentelemetry")]
195                    {
196                        crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
197                        crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
198                    }
199                    inner.is_complete = true;
200                    inner.completion_status = Some(OrchestrationStatus::Failed);
201                    inner.completion_failure = Some(FailureDetails {
202                        message: e.to_string(),
203                        error_type: "OrchestratorError".to_string(),
204                        stack_trace: None,
205                    });
206                }
207                Poll::Pending => {
208                    let inner = lock_inner(&ctx.inner);
209                    tracing::debug!(
210                        instance_id = %instance_id,
211                        orchestrator = %inner.name,
212                        pending_actions = inner.pending_actions.len(),
213                        "Orchestrator yielded, waiting for tasks"
214                    );
215                }
216            }
217        } else {
218            let inner = lock_inner(&ctx.inner);
219            tracing::debug!(
220                instance_id = %instance_id,
221                is_suspended = inner.is_suspended,
222                is_complete = inner.is_complete,
223                "Skipping orchestrator execution"
224            );
225            #[cfg(feature = "opentelemetry")]
226            if inner.is_complete {
227                crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
228            }
229        }
230
231        // End the OTel span
232        #[cfg(feature = "opentelemetry")]
233        crate::internal::otel::end_span(&otel_ctx);
234
235        let response = Self::build_response(&ctx, instance_id, completion_token);
236        tracing::debug!(
237            instance_id = %instance_id,
238            actions = response.actions.len(),
239            "Built orchestration response"
240        );
241        Ok(response)
242    }
243
244    /// Find the parent trace context from ExecutionStarted events in history.
245    #[cfg(feature = "opentelemetry")]
246    fn find_parent_trace_context<'a>(
247        old_events: &'a [proto::HistoryEvent],
248        new_events: &'a [proto::HistoryEvent],
249    ) -> Option<&'a proto::TraceContext> {
250        old_events.iter().chain(new_events.iter()).find_map(|e| {
251            if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
252                es.parent_trace_context.as_ref()
253            } else {
254                None
255            }
256        })
257    }
258
259    /// Process a single history event, updating the orchestration context state.
260    ///
261    /// Operates directly on the inner state to avoid per-event mutex acquisition.
262    /// The caller must hold the lock.
263    fn process_event(
264        inner: &mut OrchestrationContextInner,
265        event: &proto::HistoryEvent,
266        instance_id: &str,
267        max_identifier_length: usize,
268    ) {
269        let event_type = match &event.event_type {
270            Some(et) => et,
271            None => return,
272        };
273
274        let during_replay = inner.is_replaying.load(Ordering::Acquire);
275        let replay_handle = inner.is_replaying.clone();
276        // Get-or-insert a pending task at `seq`, ensuring placeholders
277        // inherit the shared replay flag.
278        let pending_task = |inner: &mut OrchestrationContextInner, seq: i32| {
279            let task = inner.pending_tasks.entry(seq).or_default();
280            task.set_replay_handle(replay_handle.clone());
281            task.clone()
282        };
283
284        match event_type {
285            EventType::WorkflowStarted(ws) => {
286                if let Some(ts) = &event.timestamp
287                    && let Some(dt) = from_timestamp(ts)
288                {
289                    inner.current_utc_datetime = dt;
290                }
291                if let Some(version) = &ws.version {
292                    for patch in &version.patches {
293                        inner.history_patches.insert(patch.clone());
294                    }
295                }
296            }
297            EventType::ExecutionStarted(e) => {
298                tracing::debug!(
299                    instance_id = %instance_id,
300                    orchestrator = %e.name,
301                    "Execution started event"
302                );
303                inner.name = std::sync::Arc::<str>::from(e.name.clone());
304                inner.input = e.input.clone();
305            }
306            EventType::TaskCompleted(e) => {
307                let seq = e.task_scheduled_id;
308                tracing::debug!(
309                    instance_id = %instance_id,
310                    task_id = seq,
311                    "Task completed"
312                );
313                let task = pending_task(inner, seq);
314                if task.is_complete() {
315                    tracing::debug!(
316                        instance_id = %instance_id,
317                        task_id = seq,
318                        "Skipping duplicate task completion"
319                    );
320                    return;
321                }
322                task.complete_with_phase(e.result.clone(), during_replay);
323            }
324            EventType::TaskFailed(e) => {
325                let seq = e.task_scheduled_id;
326                let details = e
327                    .failure_details
328                    .as_ref()
329                    .map(FailureDetails::from)
330                    .unwrap_or_else(|| FailureDetails {
331                        message: "Task failed".to_string(),
332                        error_type: "Unknown".to_string(),
333                        stack_trace: None,
334                    });
335                tracing::debug!(
336                    instance_id = %instance_id,
337                    task_id = seq,
338                    error = %details.message,
339                    "Task failed"
340                );
341                let task = pending_task(inner, seq);
342                if task.is_complete() {
343                    tracing::debug!(
344                        instance_id = %instance_id,
345                        task_id = seq,
346                        "Skipping duplicate task completion"
347                    );
348                    return;
349                }
350                task.fail_with_phase(details, during_replay);
351            }
352            EventType::TaskScheduled(_)
353            | EventType::TimerCreated(_)
354            | EventType::ChildWorkflowInstanceCreated(_) => {
355                inner.history_scheduled_count += 1;
356            }
357            EventType::TimerFired(e) => {
358                let seq = e.timer_id;
359                tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
360                let task = pending_task(inner, seq);
361                if task.is_complete() {
362                    tracing::debug!(
363                        instance_id = %instance_id,
364                        task_id = seq,
365                        "Skipping duplicate task completion"
366                    );
367                    return;
368                }
369                task.complete_with_phase(None, during_replay);
370            }
371            EventType::ChildWorkflowInstanceCompleted(e) => {
372                let seq = e.task_scheduled_id;
373                tracing::debug!(
374                    instance_id = %instance_id,
375                    task_id = seq,
376                    "Child workflow completed"
377                );
378                let task = pending_task(inner, seq);
379                if task.is_complete() {
380                    tracing::debug!(
381                        instance_id = %instance_id,
382                        task_id = seq,
383                        "Skipping duplicate task completion"
384                    );
385                    return;
386                }
387                task.complete_with_phase(e.result.clone(), during_replay);
388            }
389            EventType::ChildWorkflowInstanceFailed(e) => {
390                let seq = e.task_scheduled_id;
391                let details = e
392                    .failure_details
393                    .as_ref()
394                    .map(FailureDetails::from)
395                    .unwrap_or_else(|| FailureDetails {
396                        message: "Sub-orchestration failed".to_string(),
397                        error_type: "Unknown".to_string(),
398                        stack_trace: None,
399                    });
400                tracing::debug!(
401                    instance_id = %instance_id,
402                    task_id = seq,
403                    error = %details.message,
404                    "Child workflow failed"
405                );
406                let task = pending_task(inner, seq);
407                if task.is_complete() {
408                    tracing::debug!(
409                        instance_id = %instance_id,
410                        task_id = seq,
411                        "Skipping duplicate task completion"
412                    );
413                    return;
414                }
415                task.fail_with_phase(details, during_replay);
416            }
417            EventType::EventRaised(e) => {
418                if let Err(err) = crate::internal::validate_identifier(
419                    &e.name,
420                    "event name",
421                    max_identifier_length,
422                ) {
423                    tracing::warn!(
424                        instance_id = %instance_id,
425                        event_name = %e.name,
426                        error = %err,
427                        "Rejected event: invalid event name"
428                    );
429                    return;
430                }
431                let event_name = e.name.to_lowercase();
432                tracing::debug!(
433                    instance_id = %instance_id,
434                    event_name = %e.name,
435                    "External event raised"
436                );
437
438                if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name)
439                    && !tasks.is_empty()
440                {
441                    let task = tasks
442                        .pop_front()
443                        .expect("pending event task queue is not empty");
444                    if task.is_complete() {
445                        tracing::debug!(
446                            instance_id = %instance_id,
447                            event_name = %e.name,
448                            "Skipping duplicate task completion"
449                        );
450                        return;
451                    }
452                    task.complete_with_phase(e.input.clone(), during_replay);
453                    return;
454                }
455
456                if inner.buffered_events.len() >= inner.config.max_event_names
457                    && !inner.buffered_events.contains_key(&event_name)
458                {
459                    tracing::warn!(
460                        instance_id = %instance_id,
461                        event_name = %e.name,
462                        "Event name limit reached, discarding event"
463                    );
464                    return;
465                }
466
467                let max_events = inner.config.max_events_per_name;
468                let events = inner.buffered_events.entry(event_name).or_default();
469                if events.len() >= max_events {
470                    tracing::warn!(
471                        instance_id = %instance_id,
472                        event_name = %e.name,
473                        "Event buffer limit reached, discarding event"
474                    );
475                    return;
476                }
477                events.push_back((e.input.clone(), during_replay));
478            }
479            EventType::ExecutionSuspended(_) => {
480                tracing::info!(instance_id = %instance_id, "Orchestration suspended");
481                inner.is_suspended = true;
482            }
483            EventType::ExecutionResumed(_) => {
484                tracing::info!(instance_id = %instance_id, "Orchestration resumed");
485                inner.is_suspended = false;
486            }
487            EventType::ExecutionTerminated(e) => {
488                tracing::info!(instance_id = %instance_id, "Orchestration terminated");
489                inner.is_complete = true;
490                inner.completion_status = Some(OrchestrationStatus::Terminated);
491                inner.completion_result = e.input.clone();
492                inner.pending_actions.clear();
493            }
494            EventType::ExecutionCompleted(_)
495            | EventType::WorkflowCompleted(_)
496            | EventType::EventSent(_)
497            | EventType::ContinueAsNew(_)
498            | EventType::ExecutionStalled(_)
499            | EventType::DetachedWorkflowInstanceCreated(_) => {}
500        }
501    }
502
503    fn make_complete_action(
504        id: i32,
505        status: proto::OrchestrationStatus,
506        result: Option<String>,
507        carryover_events: Vec<proto::HistoryEvent>,
508        failure: Option<FailureDetails>,
509    ) -> proto::WorkflowAction {
510        proto::WorkflowAction {
511            id,
512            router: None,
513            workflow_action_type: Some(
514                proto::workflow_action::WorkflowActionType::CompleteWorkflow(
515                    proto::CompleteWorkflowAction {
516                        workflow_status: status as i32,
517                        result,
518                        details: None,
519                        new_version: None,
520                        carryover_events,
521                        failure_details: failure.map(|f| proto::TaskFailureDetails {
522                            error_type: f.error_type,
523                            error_message: f.message,
524                            stack_trace: f.stack_trace,
525                            inner_failure: None,
526                            is_non_retriable: false,
527                        }),
528                    },
529                ),
530            ),
531        }
532    }
533
534    fn build_response(
535        ctx: &OrchestrationContext,
536        instance_id: &str,
537        completion_token: String,
538    ) -> proto::WorkflowResponse {
539        let mut inner = lock_inner(&ctx.inner);
540
541        // Move actions out instead of cloning — the context is consumed after
542        // this response is built, so the original Vec is no longer needed.
543        let mut actions = std::mem::take(&mut inner.pending_actions);
544
545        if let Some(new_input) = inner.continue_as_new_input.take() {
546            let mut carryover_events = Vec::new();
547            if inner.save_events_on_continue {
548                for (name, events) in &inner.buffered_events {
549                    for (input, _during_replay) in events {
550                        carryover_events.push(proto::HistoryEvent {
551                            event_id: -1,
552                            timestamp: None,
553                            router: None,
554                            event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
555                                name: name.clone(),
556                                input: input.clone(),
557                            })),
558                        });
559                    }
560                }
561            }
562
563            actions.push(Self::make_complete_action(
564                actions.len() as i32,
565                proto::OrchestrationStatus::ContinuedAsNew,
566                Some(new_input),
567                carryover_events,
568                None,
569            ));
570        } else if let Some(status) = inner.completion_status {
571            match status {
572                OrchestrationStatus::Completed => {
573                    actions.push(Self::make_complete_action(
574                        actions.len() as i32,
575                        proto::OrchestrationStatus::Completed,
576                        inner.completion_result.take(),
577                        Vec::new(),
578                        None,
579                    ));
580                }
581                OrchestrationStatus::Failed => {
582                    let failure = inner.completion_failure.take();
583                    actions.push(Self::make_complete_action(
584                        actions.len() as i32,
585                        proto::OrchestrationStatus::Failed,
586                        None,
587                        Vec::new(),
588                        failure,
589                    ));
590                }
591                OrchestrationStatus::Terminated => {
592                    actions.push(Self::make_complete_action(
593                        actions.len() as i32,
594                        proto::OrchestrationStatus::Terminated,
595                        inner.completion_result.take(),
596                        Vec::new(),
597                        None,
598                    ));
599                }
600                _ => {
601                    // Other statuses (Pending, Running, etc.) don't produce completion actions
602                }
603            }
604        }
605
606        proto::WorkflowResponse {
607            instance_id: instance_id.to_string(),
608            actions,
609            custom_status: inner.custom_status.take(),
610            completion_token,
611            num_events_processed: None,
612            version: None,
613        }
614    }
615}
616
617#[cfg(test)]
618mod tests {
619    use super::*;
620    use crate::internal::to_timestamp;
621    use crate::proto::history_event::EventType;
622
623    use std::sync::Arc;
624
625    fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
626        proto::HistoryEvent {
627            event_id: 1,
628            timestamp: Some(to_timestamp(ts)),
629            router: None,
630            event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
631                version: None,
632            })),
633        }
634    }
635
636    fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
637        proto::HistoryEvent {
638            event_id: 2,
639            timestamp: Some(to_timestamp(chrono::Utc::now())),
640            router: None,
641            event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
642                name: name.to_string(),
643                version: None,
644                input,
645                workflow_instance: None,
646                parent_instance: None,
647                scheduled_start_timestamp: None,
648                parent_trace_context: None,
649                workflow_span_id: None,
650                tags: Default::default(),
651            })),
652        }
653    }
654
655    fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
656        proto::HistoryEvent {
657            event_id,
658            timestamp: Some(to_timestamp(chrono::Utc::now())),
659            router: None,
660            event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
661                name: name.to_string(),
662                version: None,
663                input: None,
664                parent_trace_context: None,
665                task_execution_id: String::new(),
666                rerun_parent_instance_info: None,
667                history_propagation_scope: None,
668            })),
669        }
670    }
671
672    fn make_task_completed(
673        event_id: i32,
674        task_scheduled_id: i32,
675        result: Option<String>,
676    ) -> proto::HistoryEvent {
677        proto::HistoryEvent {
678            event_id,
679            timestamp: Some(to_timestamp(chrono::Utc::now())),
680            router: None,
681            event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
682                task_scheduled_id,
683                result,
684                task_execution_id: String::new(),
685                attestation: None,
686                signer_certificate: None,
687            })),
688        }
689    }
690
691    fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
692        proto::HistoryEvent {
693            event_id,
694            timestamp: Some(to_timestamp(chrono::Utc::now())),
695            router: None,
696            event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
697                task_scheduled_id,
698                failure_details: Some(proto::TaskFailureDetails {
699                    error_type: "TestError".to_string(),
700                    error_message: "test failure".to_string(),
701                    stack_trace: None,
702                    inner_failure: None,
703                    is_non_retriable: false,
704                }),
705                task_execution_id: String::new(),
706                attestation: None,
707                signer_certificate: None,
708            })),
709        }
710    }
711
712    #[tokio::test]
713    async fn test_simple_orchestrator_completes() {
714        let orch_fn: OrchestratorFn =
715            Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
716
717        let ts = chrono::Utc::now();
718        let old_events = vec![make_workflow_started(ts)];
719        let new_events = vec![make_execution_started("test_orch", None)];
720
721        let resp = OrchestrationExecutor::execute(
722            &orch_fn,
723            "inst-1",
724            old_events,
725            new_events,
726            String::new(),
727            &WorkerOptions::default(),
728            None,
729        )
730        .await
731        .unwrap();
732
733        assert_eq!(resp.instance_id, "inst-1");
734        let complete_action = resp.actions.iter().find(|a| {
735            matches!(
736                &a.workflow_action_type,
737                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
738            )
739        });
740        assert!(complete_action.is_some());
741        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
742            &complete_action.unwrap().workflow_action_type
743        {
744            assert_eq!(
745                cw.workflow_status,
746                proto::OrchestrationStatus::Completed as i32
747            );
748            assert_eq!(cw.result, Some("\"done\"".to_string()));
749        }
750    }
751
752    #[tokio::test]
753    async fn test_orchestrator_with_activity_replay() {
754        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
755            Box::pin(async move {
756                let result = ctx.call_activity("greet", "world").await?;
757                Ok(result)
758            })
759        });
760
761        let ts = chrono::Utc::now();
762        let old_events = vec![
763            make_workflow_started(ts),
764            make_execution_started("test_orch", None),
765            make_task_scheduled(3, "greet"),
766            make_task_completed(4, 0, Some("\"hello world\"".to_string())),
767        ];
768        let new_events = vec![];
769
770        let resp = OrchestrationExecutor::execute(
771            &orch_fn,
772            "inst-1",
773            old_events,
774            new_events,
775            String::new(),
776            &WorkerOptions::default(),
777            None,
778        )
779        .await
780        .unwrap();
781
782        let complete_action = resp.actions.iter().find(|a| {
783            matches!(
784                &a.workflow_action_type,
785                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
786            )
787        });
788        assert!(complete_action.is_some());
789        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
790            &complete_action.unwrap().workflow_action_type
791        {
792            assert_eq!(
793                cw.workflow_status,
794                proto::OrchestrationStatus::Completed as i32
795            );
796            assert_eq!(cw.result, Some("\"hello world\"".to_string()));
797        }
798    }
799
800    #[tokio::test]
801    async fn test_orchestrator_pending_activity() {
802        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
803            Box::pin(async move {
804                let result = ctx.call_activity("greet", "world").await?;
805                Ok(result)
806            })
807        });
808
809        let ts = chrono::Utc::now();
810        let old_events = vec![make_workflow_started(ts)];
811        let new_events = vec![make_execution_started("test_orch", None)];
812
813        let resp = OrchestrationExecutor::execute(
814            &orch_fn,
815            "inst-1",
816            old_events,
817            new_events,
818            String::new(),
819            &WorkerOptions::default(),
820            None,
821        )
822        .await
823        .unwrap();
824
825        let has_schedule = resp.actions.iter().any(|a| {
826            matches!(
827                &a.workflow_action_type,
828                Some(proto::workflow_action::WorkflowActionType::ScheduleTask(_))
829            )
830        });
831        assert!(has_schedule);
832
833        let has_complete = resp.actions.iter().any(|a| {
834            matches!(
835                &a.workflow_action_type,
836                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
837            )
838        });
839        assert!(!has_complete);
840    }
841
842    #[tokio::test]
843    async fn test_orchestrator_task_failure() {
844        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
845            Box::pin(async move {
846                let result = ctx.call_activity("greet", "world").await?;
847                Ok(result)
848            })
849        });
850
851        let ts = chrono::Utc::now();
852        let old_events = vec![
853            make_workflow_started(ts),
854            make_execution_started("test_orch", None),
855            make_task_scheduled(3, "greet"),
856            make_task_failed(4, 0),
857        ];
858        let new_events = vec![];
859
860        let resp = OrchestrationExecutor::execute(
861            &orch_fn,
862            "inst-1",
863            old_events,
864            new_events,
865            String::new(),
866            &WorkerOptions::default(),
867            None,
868        )
869        .await
870        .unwrap();
871
872        let complete_action = resp.actions.iter().find(|a| {
873            matches!(
874                &a.workflow_action_type,
875                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
876            )
877        });
878        assert!(complete_action.is_some());
879        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
880            &complete_action.unwrap().workflow_action_type
881        {
882            assert_eq!(
883                cw.workflow_status,
884                proto::OrchestrationStatus::Failed as i32
885            );
886            assert!(cw.failure_details.is_some());
887        }
888    }
889
890    #[tokio::test]
891    async fn test_suspended_orchestration_not_run() {
892        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
893
894        let ts = chrono::Utc::now();
895        let old_events = vec![make_workflow_started(ts)];
896        let new_events = vec![
897            make_execution_started("test_orch", None),
898            proto::HistoryEvent {
899                event_id: 3,
900                timestamp: Some(to_timestamp(chrono::Utc::now())),
901                router: None,
902                event_type: Some(EventType::ExecutionSuspended(
903                    proto::ExecutionSuspendedEvent {
904                        input: Some("paused".to_string()),
905                    },
906                )),
907            },
908        ];
909
910        let resp = OrchestrationExecutor::execute(
911            &orch_fn,
912            "inst-1",
913            old_events,
914            new_events,
915            String::new(),
916            &WorkerOptions::default(),
917            None,
918        )
919        .await
920        .unwrap();
921
922        assert!(resp.actions.is_empty());
923    }
924
925    #[tokio::test]
926    async fn test_terminated_orchestration_not_run() {
927        let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
928
929        let ts = chrono::Utc::now();
930        let old_events = vec![make_workflow_started(ts)];
931        let new_events = vec![
932            make_execution_started("test_orch", None),
933            proto::HistoryEvent {
934                event_id: 3,
935                timestamp: Some(to_timestamp(chrono::Utc::now())),
936                router: None,
937                event_type: Some(EventType::ExecutionTerminated(
938                    proto::ExecutionTerminatedEvent {
939                        input: None,
940                        recurse: false,
941                    },
942                )),
943            },
944        ];
945
946        let resp = OrchestrationExecutor::execute(
947            &orch_fn,
948            "inst-1",
949            old_events,
950            new_events,
951            String::new(),
952            &WorkerOptions::default(),
953            None,
954        )
955        .await
956        .unwrap();
957
958        // Terminated — CompleteWorkflow with Terminated status
959        assert_eq!(resp.actions.len(), 1);
960        match &resp.actions[0].workflow_action_type {
961            Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
962                assert_eq!(
963                    cw.workflow_status,
964                    proto::OrchestrationStatus::Terminated as i32
965                );
966                assert!(cw.result.is_none());
967            }
968            other => panic!("expected CompleteWorkflow, got {other:?}"),
969        }
970    }
971
972    #[tokio::test]
973    async fn test_continue_as_new() {
974        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
975            Box::pin(async move {
976                ctx.continue_as_new("new_input", false);
977                Ok(None)
978            })
979        });
980
981        let ts = chrono::Utc::now();
982        let old_events = vec![make_workflow_started(ts)];
983        let new_events = vec![make_execution_started("test_orch", None)];
984
985        let resp = OrchestrationExecutor::execute(
986            &orch_fn,
987            "inst-1",
988            old_events,
989            new_events,
990            String::new(),
991            &WorkerOptions::default(),
992            None,
993        )
994        .await
995        .unwrap();
996
997        let complete_action = resp.actions.iter().find(|a| {
998            matches!(
999                &a.workflow_action_type,
1000                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1001            )
1002        });
1003        assert!(complete_action.is_some());
1004        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1005            &complete_action.unwrap().workflow_action_type
1006        {
1007            assert_eq!(
1008                cw.workflow_status,
1009                proto::OrchestrationStatus::ContinuedAsNew as i32
1010            );
1011            assert_eq!(cw.result, Some("\"new_input\"".to_string()));
1012        }
1013    }
1014
1015    #[tokio::test]
1016    async fn test_external_event_delivery() {
1017        let orch_fn: OrchestratorFn = Arc::new(|ctx| {
1018            Box::pin(async move {
1019                let result = ctx.wait_for_external_event("approval").await?;
1020                Ok(result)
1021            })
1022        });
1023
1024        let ts = chrono::Utc::now();
1025        let old_events = vec![make_workflow_started(ts)];
1026        let new_events = vec![
1027            make_execution_started("test_orch", None),
1028            proto::HistoryEvent {
1029                event_id: 3,
1030                timestamp: Some(to_timestamp(chrono::Utc::now())),
1031                router: None,
1032                event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
1033                    name: "approval".to_string(),
1034                    input: Some("\"yes\"".to_string()),
1035                })),
1036            },
1037        ];
1038
1039        let resp = OrchestrationExecutor::execute(
1040            &orch_fn,
1041            "inst-1",
1042            old_events,
1043            new_events,
1044            String::new(),
1045            &WorkerOptions::default(),
1046            None,
1047        )
1048        .await
1049        .unwrap();
1050
1051        let complete_action = resp.actions.iter().find(|a| {
1052            matches!(
1053                &a.workflow_action_type,
1054                Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
1055            )
1056        });
1057        assert!(complete_action.is_some());
1058        if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
1059            &complete_action.unwrap().workflow_action_type
1060        {
1061            assert_eq!(
1062                cw.workflow_status,
1063                proto::OrchestrationStatus::Completed as i32
1064            );
1065            assert_eq!(cw.result, Some("\"yes\"".to_string()));
1066        }
1067    }
1068}