use std::sync::atomic::Ordering;
use std::task::Poll;
use futures::FutureExt;
use crate::api::{DurableTaskError, FailureDetails, OrchestrationStatus};
use crate::internal::from_timestamp;
use crate::proto;
use crate::proto::history_event::EventType;
use crate::task::OrchestrationContext;
use crate::task::orchestration_context::{OrchestrationContextInner, lock_inner};
use super::options::WorkerOptions;
use super::registry::OrchestratorFn;
pub struct OrchestrationExecutor;
impl OrchestrationExecutor {
pub async fn execute(
orchestrator_fn: &OrchestratorFn,
instance_id: &str,
old_events: Vec<proto::HistoryEvent>,
new_events: Vec<proto::HistoryEvent>,
completion_token: String,
options: &WorkerOptions,
propagated_history: Option<crate::api::PropagatedHistory>,
) -> crate::api::Result<proto::WorkflowResponse> {
tracing::info!(
instance_id = %instance_id,
past_events = old_events.len(),
new_events = new_events.len(),
"Starting orchestration execution"
);
let ctx = OrchestrationContext::new(
instance_id.to_string(),
String::new(),
None,
chrono::Utc::now(),
true,
options,
old_events.len() + new_events.len(),
);
if propagated_history.is_some() {
let mut inner = lock_inner(&ctx.inner);
inner.propagated_history = propagated_history.map(std::sync::Arc::new);
}
let initially_replaying = !old_events.is_empty();
{
let mut inner = lock_inner(&ctx.inner);
inner.is_replaying.store(true, Ordering::Release);
tracing::debug!(
instance_id = %instance_id,
count = old_events.len(),
"Replaying old events"
);
for event in &old_events {
Self::process_event(
&mut inner,
event,
instance_id,
options.max_identifier_length,
);
}
inner.is_replaying.store(false, Ordering::Release);
tracing::debug!(
instance_id = %instance_id,
count = new_events.len(),
"Processing new events"
);
for event in &new_events {
Self::process_event(
&mut inner,
event,
instance_id,
options.max_identifier_length,
);
}
inner
.is_replaying
.store(initially_replaying, Ordering::Release);
}
#[cfg(feature = "opentelemetry")]
let otel_ctx = {
let inner = lock_inner(&ctx.inner);
let parent_tc = Self::find_parent_trace_context(&old_events, &new_events);
let parent_ctx = crate::internal::otel::context_from_trace_context(parent_tc);
crate::internal::otel::start_orchestration_span(&parent_ctx, &inner.name, instance_id)
};
let should_run = {
let inner = lock_inner(&ctx.inner);
!inner.is_suspended && !inner.is_complete
};
if should_run {
tracing::debug!(instance_id = %instance_id, "Polling orchestrator function");
let mut future = (orchestrator_fn)(ctx.clone()).boxed();
let poll_result = futures::poll!(future.as_mut());
match poll_result {
Poll::Ready(Ok(output)) => {
let mut inner = lock_inner(&ctx.inner);
if inner.continue_as_new_input.is_some() {
tracing::info!(
instance_id = %instance_id,
orchestrator = %inner.name,
"Orchestration continuing as new"
);
#[cfg(feature = "opentelemetry")]
crate::internal::otel::set_span_status_attribute(
&otel_ctx,
"CONTINUED_AS_NEW",
);
inner.is_complete = true;
inner.completion_status = Some(OrchestrationStatus::ContinuedAsNew);
} else if !inner.is_complete {
tracing::info!(
instance_id = %instance_id,
orchestrator = %inner.name,
"Orchestration completed successfully"
);
#[cfg(feature = "opentelemetry")]
crate::internal::otel::set_span_status_attribute(&otel_ctx, "COMPLETED");
inner.is_complete = true;
inner.completion_status = Some(OrchestrationStatus::Completed);
inner.completion_result = output;
}
}
Poll::Ready(Err(DurableTaskError::TaskFailed {
message,
failure_details,
})) => {
let mut inner = lock_inner(&ctx.inner);
tracing::warn!(
instance_id = %instance_id,
orchestrator = %inner.name,
error = %message,
"Orchestration failed due to task failure"
);
#[cfg(feature = "opentelemetry")]
{
crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
crate::internal::otel::set_span_error(&otel_ctx, &message);
}
inner.is_complete = true;
inner.completion_status = Some(OrchestrationStatus::Failed);
inner.completion_failure =
Some(failure_details.unwrap_or_else(|| FailureDetails {
message: message.clone(),
error_type: "TaskFailed".to_string(),
stack_trace: None,
}));
}
Poll::Ready(Err(e)) => {
let mut inner = lock_inner(&ctx.inner);
tracing::error!(
instance_id = %instance_id,
orchestrator = %inner.name,
error = %e,
"Orchestration failed with error"
);
#[cfg(feature = "opentelemetry")]
{
crate::internal::otel::set_span_status_attribute(&otel_ctx, "FAILED");
crate::internal::otel::set_span_error(&otel_ctx, &e.to_string());
}
inner.is_complete = true;
inner.completion_status = Some(OrchestrationStatus::Failed);
inner.completion_failure = Some(FailureDetails {
message: e.to_string(),
error_type: "OrchestratorError".to_string(),
stack_trace: None,
});
}
Poll::Pending => {
let inner = lock_inner(&ctx.inner);
tracing::debug!(
instance_id = %instance_id,
orchestrator = %inner.name,
pending_actions = inner.pending_actions.len(),
"Orchestrator yielded, waiting for tasks"
);
}
}
} else {
let inner = lock_inner(&ctx.inner);
tracing::debug!(
instance_id = %instance_id,
is_suspended = inner.is_suspended,
is_complete = inner.is_complete,
"Skipping orchestrator execution"
);
#[cfg(feature = "opentelemetry")]
if inner.is_complete {
crate::internal::otel::set_span_status_attribute(&otel_ctx, "TERMINATED");
}
}
#[cfg(feature = "opentelemetry")]
crate::internal::otel::end_span(&otel_ctx);
let response = Self::build_response(&ctx, instance_id, completion_token);
tracing::debug!(
instance_id = %instance_id,
actions = response.actions.len(),
"Built orchestration response"
);
Ok(response)
}
#[cfg(feature = "opentelemetry")]
fn find_parent_trace_context<'a>(
old_events: &'a [proto::HistoryEvent],
new_events: &'a [proto::HistoryEvent],
) -> Option<&'a proto::TraceContext> {
old_events.iter().chain(new_events.iter()).find_map(|e| {
if let Some(EventType::ExecutionStarted(es)) = &e.event_type {
es.parent_trace_context.as_ref()
} else {
None
}
})
}
fn process_event(
inner: &mut OrchestrationContextInner,
event: &proto::HistoryEvent,
instance_id: &str,
max_identifier_length: usize,
) {
let event_type = match &event.event_type {
Some(et) => et,
None => return,
};
let during_replay = inner.is_replaying.load(Ordering::Acquire);
let replay_handle = inner.is_replaying.clone();
let pending_task = |inner: &mut OrchestrationContextInner, seq: i32| {
let task = inner.pending_tasks.entry(seq).or_default();
task.set_replay_handle(replay_handle.clone());
task.clone()
};
match event_type {
EventType::WorkflowStarted(ws) => {
if let Some(ts) = &event.timestamp
&& let Some(dt) = from_timestamp(ts)
{
inner.current_utc_datetime = dt;
}
if let Some(version) = &ws.version {
for patch in &version.patches {
inner.history_patches.insert(patch.clone());
}
}
}
EventType::ExecutionStarted(e) => {
tracing::debug!(
instance_id = %instance_id,
orchestrator = %e.name,
"Execution started event"
);
inner.name = std::sync::Arc::<str>::from(e.name.clone());
inner.input = e.input.clone();
}
EventType::TaskCompleted(e) => {
let seq = e.task_scheduled_id;
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Task completed"
);
let task = pending_task(inner, seq);
if task.is_complete() {
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Skipping duplicate task completion"
);
return;
}
task.complete_with_phase(e.result.clone(), during_replay);
}
EventType::TaskFailed(e) => {
let seq = e.task_scheduled_id;
let details = e
.failure_details
.as_ref()
.map(FailureDetails::from)
.unwrap_or_else(|| FailureDetails {
message: "Task failed".to_string(),
error_type: "Unknown".to_string(),
stack_trace: None,
});
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
error = %details.message,
"Task failed"
);
let task = pending_task(inner, seq);
if task.is_complete() {
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Skipping duplicate task completion"
);
return;
}
task.fail_with_phase(details, during_replay);
}
EventType::TaskScheduled(_)
| EventType::TimerCreated(_)
| EventType::ChildWorkflowInstanceCreated(_) => {
inner.history_scheduled_count += 1;
}
EventType::TimerFired(e) => {
let seq = e.timer_id;
tracing::debug!(instance_id = %instance_id, timer_id = seq, "Timer fired");
let task = pending_task(inner, seq);
if task.is_complete() {
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Skipping duplicate task completion"
);
return;
}
task.complete_with_phase(None, during_replay);
}
EventType::ChildWorkflowInstanceCompleted(e) => {
let seq = e.task_scheduled_id;
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Child workflow completed"
);
let task = pending_task(inner, seq);
if task.is_complete() {
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Skipping duplicate task completion"
);
return;
}
task.complete_with_phase(e.result.clone(), during_replay);
}
EventType::ChildWorkflowInstanceFailed(e) => {
let seq = e.task_scheduled_id;
let details = e
.failure_details
.as_ref()
.map(FailureDetails::from)
.unwrap_or_else(|| FailureDetails {
message: "Sub-orchestration failed".to_string(),
error_type: "Unknown".to_string(),
stack_trace: None,
});
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
error = %details.message,
"Child workflow failed"
);
let task = pending_task(inner, seq);
if task.is_complete() {
tracing::debug!(
instance_id = %instance_id,
task_id = seq,
"Skipping duplicate task completion"
);
return;
}
task.fail_with_phase(details, during_replay);
}
EventType::EventRaised(e) => {
if let Err(err) = crate::internal::validate_identifier(
&e.name,
"event name",
max_identifier_length,
) {
tracing::warn!(
instance_id = %instance_id,
event_name = %e.name,
error = %err,
"Rejected event: invalid event name"
);
return;
}
let event_name = e.name.to_lowercase();
tracing::debug!(
instance_id = %instance_id,
event_name = %e.name,
"External event raised"
);
if let Some(tasks) = inner.pending_event_tasks.get_mut(&event_name)
&& !tasks.is_empty()
{
let task = tasks
.pop_front()
.expect("pending event task queue is not empty");
if task.is_complete() {
tracing::debug!(
instance_id = %instance_id,
event_name = %e.name,
"Skipping duplicate task completion"
);
return;
}
task.complete_with_phase(e.input.clone(), during_replay);
return;
}
if inner.buffered_events.len() >= inner.config.max_event_names
&& !inner.buffered_events.contains_key(&event_name)
{
tracing::warn!(
instance_id = %instance_id,
event_name = %e.name,
"Event name limit reached, discarding event"
);
return;
}
let max_events = inner.config.max_events_per_name;
let events = inner.buffered_events.entry(event_name).or_default();
if events.len() >= max_events {
tracing::warn!(
instance_id = %instance_id,
event_name = %e.name,
"Event buffer limit reached, discarding event"
);
return;
}
events.push_back((e.input.clone(), during_replay));
}
EventType::ExecutionSuspended(_) => {
tracing::info!(instance_id = %instance_id, "Orchestration suspended");
inner.is_suspended = true;
}
EventType::ExecutionResumed(_) => {
tracing::info!(instance_id = %instance_id, "Orchestration resumed");
inner.is_suspended = false;
}
EventType::ExecutionTerminated(e) => {
tracing::info!(instance_id = %instance_id, "Orchestration terminated");
inner.is_complete = true;
inner.completion_status = Some(OrchestrationStatus::Terminated);
inner.completion_result = e.input.clone();
inner.pending_actions.clear();
}
EventType::ExecutionCompleted(_)
| EventType::WorkflowCompleted(_)
| EventType::EventSent(_)
| EventType::ContinueAsNew(_)
| EventType::ExecutionStalled(_)
| EventType::DetachedWorkflowInstanceCreated(_) => {}
}
}
fn make_complete_action(
id: i32,
status: proto::OrchestrationStatus,
result: Option<String>,
carryover_events: Vec<proto::HistoryEvent>,
failure: Option<FailureDetails>,
) -> proto::WorkflowAction {
proto::WorkflowAction {
id,
router: None,
workflow_action_type: Some(
proto::workflow_action::WorkflowActionType::CompleteWorkflow(
proto::CompleteWorkflowAction {
workflow_status: status as i32,
result,
details: None,
new_version: None,
carryover_events,
failure_details: failure.map(|f| proto::TaskFailureDetails {
error_type: f.error_type,
error_message: f.message,
stack_trace: f.stack_trace,
inner_failure: None,
is_non_retriable: false,
}),
},
),
),
}
}
fn build_response(
ctx: &OrchestrationContext,
instance_id: &str,
completion_token: String,
) -> proto::WorkflowResponse {
let mut inner = lock_inner(&ctx.inner);
let mut actions = std::mem::take(&mut inner.pending_actions);
if let Some(new_input) = inner.continue_as_new_input.take() {
let mut carryover_events = Vec::new();
if inner.save_events_on_continue {
for (name, events) in &inner.buffered_events {
for (input, _during_replay) in events {
carryover_events.push(proto::HistoryEvent {
event_id: -1,
timestamp: None,
router: None,
event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
name: name.clone(),
input: input.clone(),
})),
});
}
}
}
actions.push(Self::make_complete_action(
actions.len() as i32,
proto::OrchestrationStatus::ContinuedAsNew,
Some(new_input),
carryover_events,
None,
));
} else if let Some(status) = inner.completion_status {
match status {
OrchestrationStatus::Completed => {
actions.push(Self::make_complete_action(
actions.len() as i32,
proto::OrchestrationStatus::Completed,
inner.completion_result.take(),
Vec::new(),
None,
));
}
OrchestrationStatus::Failed => {
let failure = inner.completion_failure.take();
actions.push(Self::make_complete_action(
actions.len() as i32,
proto::OrchestrationStatus::Failed,
None,
Vec::new(),
failure,
));
}
OrchestrationStatus::Terminated => {
actions.push(Self::make_complete_action(
actions.len() as i32,
proto::OrchestrationStatus::Terminated,
inner.completion_result.take(),
Vec::new(),
None,
));
}
_ => {
}
}
}
proto::WorkflowResponse {
instance_id: instance_id.to_string(),
actions,
custom_status: inner.custom_status.take(),
completion_token,
num_events_processed: None,
version: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::internal::to_timestamp;
use crate::proto::history_event::EventType;
use std::sync::Arc;
fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: 1,
timestamp: Some(to_timestamp(ts)),
router: None,
event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
version: None,
})),
}
}
fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: 2,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
name: name.to_string(),
version: None,
input,
workflow_instance: None,
parent_instance: None,
scheduled_start_timestamp: None,
parent_trace_context: None,
workflow_span_id: None,
tags: Default::default(),
})),
}
}
fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
name: name.to_string(),
version: None,
input: None,
parent_trace_context: None,
task_execution_id: String::new(),
rerun_parent_instance_info: None,
history_propagation_scope: None,
})),
}
}
fn make_task_completed(
event_id: i32,
task_scheduled_id: i32,
result: Option<String>,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
task_scheduled_id,
result,
task_execution_id: String::new(),
attestation: None,
signer_certificate: None,
})),
}
}
fn make_task_failed(event_id: i32, task_scheduled_id: i32) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
task_scheduled_id,
failure_details: Some(proto::TaskFailureDetails {
error_type: "TestError".to_string(),
error_message: "test failure".to_string(),
stack_trace: None,
inner_failure: None,
is_non_retriable: false,
}),
task_execution_id: String::new(),
attestation: None,
signer_certificate: None,
})),
}
}
#[tokio::test]
async fn test_simple_orchestrator_completes() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
let ts = chrono::Utc::now();
let old_events = vec![make_workflow_started(ts)];
let new_events = vec![make_execution_started("test_orch", None)];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
assert_eq!(resp.instance_id, "inst-1");
let complete_action = resp.actions.iter().find(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
});
assert!(complete_action.is_some());
if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
&complete_action.unwrap().workflow_action_type
{
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"done\"".to_string()));
}
}
#[tokio::test]
async fn test_orchestrator_with_activity_replay() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("greet", "world").await?;
Ok(result)
})
});
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("test_orch", None),
make_task_scheduled(3, "greet"),
make_task_completed(4, 0, Some("\"hello world\"".to_string())),
];
let new_events = vec![];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
let complete_action = resp.actions.iter().find(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
});
assert!(complete_action.is_some());
if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
&complete_action.unwrap().workflow_action_type
{
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"hello world\"".to_string()));
}
}
#[tokio::test]
async fn test_orchestrator_pending_activity() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("greet", "world").await?;
Ok(result)
})
});
let ts = chrono::Utc::now();
let old_events = vec![make_workflow_started(ts)];
let new_events = vec![make_execution_started("test_orch", None)];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
let has_schedule = resp.actions.iter().any(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::ScheduleTask(_))
)
});
assert!(has_schedule);
let has_complete = resp.actions.iter().any(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
});
assert!(!has_complete);
}
#[tokio::test]
async fn test_orchestrator_task_failure() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("greet", "world").await?;
Ok(result)
})
});
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("test_orch", None),
make_task_scheduled(3, "greet"),
make_task_failed(4, 0),
];
let new_events = vec![];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
let complete_action = resp.actions.iter().find(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
});
assert!(complete_action.is_some());
if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
&complete_action.unwrap().workflow_action_type
{
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
assert!(cw.failure_details.is_some());
}
}
#[tokio::test]
async fn test_suspended_orchestration_not_run() {
let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
let ts = chrono::Utc::now();
let old_events = vec![make_workflow_started(ts)];
let new_events = vec![
make_execution_started("test_orch", None),
proto::HistoryEvent {
event_id: 3,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::ExecutionSuspended(
proto::ExecutionSuspendedEvent {
input: Some("paused".to_string()),
},
)),
},
];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
assert!(resp.actions.is_empty());
}
#[tokio::test]
async fn test_terminated_orchestration_not_run() {
let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
let ts = chrono::Utc::now();
let old_events = vec![make_workflow_started(ts)];
let new_events = vec![
make_execution_started("test_orch", None),
proto::HistoryEvent {
event_id: 3,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::ExecutionTerminated(
proto::ExecutionTerminatedEvent {
input: None,
recurse: false,
},
)),
},
];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
assert_eq!(resp.actions.len(), 1);
match &resp.actions[0].workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Terminated as i32
);
assert!(cw.result.is_none());
}
other => panic!("expected CompleteWorkflow, got {other:?}"),
}
}
#[tokio::test]
async fn test_continue_as_new() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.continue_as_new("new_input", false);
Ok(None)
})
});
let ts = chrono::Utc::now();
let old_events = vec![make_workflow_started(ts)];
let new_events = vec![make_execution_started("test_orch", None)];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
let complete_action = resp.actions.iter().find(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
});
assert!(complete_action.is_some());
if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
&complete_action.unwrap().workflow_action_type
{
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::ContinuedAsNew as i32
);
assert_eq!(cw.result, Some("\"new_input\"".to_string()));
}
}
#[tokio::test]
async fn test_external_event_delivery() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.wait_for_external_event("approval").await?;
Ok(result)
})
});
let ts = chrono::Utc::now();
let old_events = vec![make_workflow_started(ts)];
let new_events = vec![
make_execution_started("test_orch", None),
proto::HistoryEvent {
event_id: 3,
timestamp: Some(to_timestamp(chrono::Utc::now())),
router: None,
event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
name: "approval".to_string(),
input: Some("\"yes\"".to_string()),
})),
},
];
let resp = OrchestrationExecutor::execute(
&orch_fn,
"inst-1",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
let complete_action = resp.actions.iter().find(|a| {
matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
});
assert!(complete_action.is_some());
if let Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) =
&complete_action.unwrap().workflow_action_type
{
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"yes\"".to_string()));
}
}
}