use std::sync::Arc;
use dapr_durabletask::api::DurableTaskError;
use dapr_durabletask::task::{when_all, when_any};
use dapr_durabletask::worker::{OrchestrationExecutor, OrchestratorFn, WorkerOptions};
use dapr_durabletask_proto as proto;
use dapr_durabletask_proto::history_event::EventType;
fn ts_now() -> chrono::DateTime<chrono::Utc> {
chrono::Utc::now()
}
fn to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> proto::prost_types::Timestamp {
proto::prost_types::Timestamp {
seconds: dt.timestamp(),
nanos: dt.timestamp_subsec_nanos() as i32,
}
}
fn make_workflow_started(ts: chrono::DateTime<chrono::Utc>) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: 1,
timestamp: Some(to_timestamp(ts)),
router: None,
event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
version: None,
})),
}
}
fn make_execution_started(name: &str, input: Option<String>) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: 2,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ExecutionStarted(proto::ExecutionStartedEvent {
name: name.to_string(),
version: None,
input,
workflow_instance: None,
parent_instance: None,
scheduled_start_timestamp: None,
parent_trace_context: None,
workflow_span_id: None,
tags: Default::default(),
})),
}
}
fn make_task_scheduled(event_id: i32, name: &str) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::TaskScheduled(proto::TaskScheduledEvent {
name: name.to_string(),
version: None,
input: None,
parent_trace_context: None,
task_execution_id: String::new(),
rerun_parent_instance_info: None,
history_propagation_scope: None,
})),
}
}
fn make_task_completed(
event_id: i32,
task_scheduled_id: i32,
result: Option<String>,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::TaskCompleted(proto::TaskCompletedEvent {
task_scheduled_id,
result,
task_execution_id: String::new(),
attestation: None,
signer_certificate: None,
})),
}
}
fn make_task_failed(
event_id: i32,
task_scheduled_id: i32,
error_type: &str,
message: &str,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::TaskFailed(proto::TaskFailedEvent {
task_scheduled_id,
failure_details: Some(proto::TaskFailureDetails {
error_type: error_type.to_string(),
error_message: message.to_string(),
stack_trace: None,
inner_failure: None,
is_non_retriable: false,
}),
task_execution_id: String::new(),
attestation: None,
signer_certificate: None,
})),
}
}
fn make_timer_created(
event_id: i32,
fire_at: chrono::DateTime<chrono::Utc>,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::TimerCreated(proto::TimerCreatedEvent {
fire_at: Some(to_timestamp(fire_at)),
name: None,
rerun_parent_instance_info: None,
origin: None,
})),
}
}
fn make_timer_fired(event_id: i32, timer_id: i32) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::TimerFired(proto::TimerFiredEvent {
fire_at: None,
timer_id,
})),
}
}
fn make_sub_orchestration_created(
event_id: i32,
name: &str,
instance_id: &str,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ChildWorkflowInstanceCreated(
proto::ChildWorkflowInstanceCreatedEvent {
instance_id: instance_id.to_string(),
name: name.to_string(),
version: None,
input: None,
parent_trace_context: None,
rerun_parent_instance_info: None,
history_propagation_scope: None,
},
)),
}
}
fn make_sub_orchestration_completed(
event_id: i32,
task_scheduled_id: i32,
result: Option<String>,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ChildWorkflowInstanceCompleted(
proto::ChildWorkflowInstanceCompletedEvent {
task_scheduled_id,
result,
attestation: None,
signer_certificate: None,
},
)),
}
}
fn make_sub_orchestration_failed(
event_id: i32,
task_scheduled_id: i32,
error_type: &str,
message: &str,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ChildWorkflowInstanceFailed(
proto::ChildWorkflowInstanceFailedEvent {
task_scheduled_id,
failure_details: Some(proto::TaskFailureDetails {
error_type: error_type.to_string(),
error_message: message.to_string(),
stack_trace: None,
inner_failure: None,
is_non_retriable: false,
}),
attestation: None,
signer_certificate: None,
},
)),
}
}
fn make_event_raised(name: &str, input: Option<String>) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: -1,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::EventRaised(proto::EventRaisedEvent {
name: name.to_string(),
input,
})),
}
}
fn make_suspended() -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: -1,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ExecutionSuspended(
proto::ExecutionSuspendedEvent { input: None },
)),
}
}
fn make_resumed() -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: -1,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ExecutionResumed(proto::ExecutionResumedEvent {
input: None,
})),
}
}
fn make_terminated(output: Option<String>) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: -1,
timestamp: Some(to_timestamp(ts_now())),
router: None,
event_type: Some(EventType::ExecutionTerminated(
proto::ExecutionTerminatedEvent {
input: output,
recurse: false,
},
)),
}
}
fn get_complete_action(
actions: &[proto::WorkflowAction],
) -> Option<&proto::CompleteWorkflowAction> {
actions.iter().find_map(|a| match &a.workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => Some(cw),
_ => None,
})
}
fn get_schedule_actions(actions: &[proto::WorkflowAction]) -> Vec<&proto::ScheduleTaskAction> {
actions
.iter()
.filter_map(|a| match &a.workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::ScheduleTask(st)) => Some(st),
_ => None,
})
.collect()
}
fn get_timer_actions(actions: &[proto::WorkflowAction]) -> Vec<&proto::CreateTimerAction> {
actions
.iter()
.filter_map(|a| match &a.workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::CreateTimer(ct)) => Some(ct),
_ => None,
})
.collect()
}
fn get_child_workflow_actions(
actions: &[proto::WorkflowAction],
) -> Vec<&proto::CreateChildWorkflowAction> {
actions
.iter()
.filter_map(|a| match &a.workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::CreateChildWorkflow(cw)) => Some(cw),
_ => None,
})
.collect()
}
async fn run_executor(
orch_fn: &OrchestratorFn,
old_events: Vec<proto::HistoryEvent>,
new_events: Vec<proto::HistoryEvent>,
) -> dapr_durabletask::api::Result<proto::WorkflowResponse> {
OrchestrationExecutor::execute(
orch_fn,
"test-instance",
old_events,
new_events,
String::new(),
&WorkerOptions::default(),
None,
)
.await
}
#[tokio::test]
async fn test_empty_orchestration() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
assert_eq!(resp.instance_id, "test-instance");
let cw = get_complete_action(&resp.actions).expect("should have CompleteWorkflow");
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"done\"".to_string()));
}
#[tokio::test]
async fn test_orchestration_with_input() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let input: String = ctx.input().unwrap();
Ok(Some(format!("\"got: {input}\"")))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started(
"test_orch",
Some("\"hello world\"".to_string()),
)],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"got: hello world\"".to_string()));
}
#[tokio::test]
async fn test_orchestration_with_no_output() {
let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { Ok(None) }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, None);
}
#[tokio::test]
async fn test_single_activity_scheduling() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("greet", "world").await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let schedules = get_schedule_actions(&resp.actions);
assert_eq!(schedules.len(), 1);
assert_eq!(schedules[0].name, "greet");
assert_eq!(schedules[0].input, Some("\"world\"".to_string()));
assert!(get_complete_action(&resp.actions).is_none());
}
#[tokio::test]
async fn test_single_activity_completion() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("greet", "world").await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "greet"),
make_task_completed(4, 0, Some("\"hello world\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"hello world\"".to_string()));
}
#[tokio::test]
async fn test_activity_sequence() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let a = ctx.call_activity("step_a", ()).await?;
let b = ctx.call_activity("step_b", ()).await?;
let c = ctx.call_activity("step_c", ()).await?;
Ok(c.or(b).or(a))
})
});
let resp1 = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let schedules1 = get_schedule_actions(&resp1.actions);
assert_eq!(schedules1.len(), 1);
assert_eq!(schedules1[0].name, "step_a");
assert!(get_complete_action(&resp1.actions).is_none());
let resp2 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "step_a"),
make_task_completed(4, 0, Some("\"result_a\"".to_string())),
],
vec![],
)
.await
.unwrap();
let schedules2 = get_schedule_actions(&resp2.actions);
assert_eq!(schedules2.len(), 1);
assert_eq!(schedules2[0].name, "step_b");
assert!(get_complete_action(&resp2.actions).is_none());
let resp3 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "step_a"),
make_task_completed(4, 0, Some("\"result_a\"".to_string())),
make_task_scheduled(5, "step_b"),
make_task_completed(6, 1, Some("\"result_b\"".to_string())),
],
vec![],
)
.await
.unwrap();
let schedules3 = get_schedule_actions(&resp3.actions);
assert_eq!(schedules3.len(), 1);
assert_eq!(schedules3[0].name, "step_c");
assert!(get_complete_action(&resp3.actions).is_none());
let resp4 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "step_a"),
make_task_completed(4, 0, Some("\"result_a\"".to_string())),
make_task_scheduled(5, "step_b"),
make_task_completed(6, 1, Some("\"result_b\"".to_string())),
make_task_scheduled(7, "step_c"),
make_task_completed(8, 2, Some("\"result_c\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp4.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"result_c\"".to_string()));
}
#[tokio::test]
async fn test_activity_failure_propagation() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("flaky", ()).await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "flaky"),
make_task_failed(4, 0, "ActivityError", "something went wrong"),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
let fd = cw.failure_details.as_ref().unwrap();
assert_eq!(fd.error_type, "ActivityError");
assert_eq!(fd.error_message, "something went wrong");
}
#[tokio::test]
async fn test_activity_failure_caught() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("flaky", ()).await;
match result {
Ok(v) => Ok(v),
Err(DurableTaskError::TaskFailed { message, .. }) => {
Ok(Some(format!("\"caught: {message}\"")))
}
Err(e) => Err(e),
}
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "flaky"),
make_task_failed(4, 0, "TestError", "boom"),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"caught: boom\"".to_string()));
}
#[tokio::test]
async fn test_timer_scheduling() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.create_timer(std::time::Duration::from_secs(60)).await?;
Ok(Some("\"timer done\"".to_string()))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let timers = get_timer_actions(&resp.actions);
assert_eq!(timers.len(), 1);
assert!(timers[0].fire_at.is_some());
assert!(get_complete_action(&resp.actions).is_none());
}
#[tokio::test]
async fn test_timer_completion() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.create_timer(std::time::Duration::from_secs(60)).await?;
Ok(Some("\"timer done\"".to_string()))
})
});
let fire_at = ts_now() + chrono::Duration::seconds(60);
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_timer_created(3, fire_at),
make_timer_fired(4, 0),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"timer done\"".to_string()));
}
#[tokio::test]
async fn test_sub_orchestration_scheduling() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx
.call_sub_orchestrator("child_orch", "child_input", Some("child-1"))
.await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let children = get_child_workflow_actions(&resp.actions);
assert_eq!(children.len(), 1);
assert_eq!(children[0].name, "child_orch");
assert_eq!(children[0].instance_id, "child-1");
assert_eq!(children[0].input, Some("\"child_input\"".to_string()));
assert!(get_complete_action(&resp.actions).is_none());
}
#[tokio::test]
async fn test_sub_orchestration_completion() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx
.call_sub_orchestrator("child_orch", (), Some("child-1"))
.await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_sub_orchestration_created(3, "child_orch", "child-1"),
make_sub_orchestration_completed(4, 0, Some("\"child result\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"child result\"".to_string()));
}
#[tokio::test]
async fn test_sub_orchestration_failure() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx
.call_sub_orchestrator("child_orch", (), Some("child-1"))
.await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_sub_orchestration_created(3, "child_orch", "child-1"),
make_sub_orchestration_failed(4, 0, "ChildError", "child failed"),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
let fd = cw.failure_details.as_ref().unwrap();
assert_eq!(fd.error_type, "ChildError");
assert_eq!(fd.error_message, "child failed");
}
#[tokio::test]
async fn test_external_event_received() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.wait_for_external_event("approval").await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_event_raised("approval", Some("\"approved\"".to_string())),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"approved\"".to_string()));
}
#[tokio::test]
async fn test_external_event_buffered() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.wait_for_external_event("approval").await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_event_raised("approval", Some("\"pre-buffered\"".to_string())),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"pre-buffered\"".to_string()));
}
#[tokio::test]
async fn test_external_event_case_insensitive() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.wait_for_external_event("approval").await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_event_raised("APPROVAL", Some("\"yes\"".to_string())),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"yes\"".to_string()));
}
#[tokio::test]
async fn test_multiple_external_events() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let a = ctx.wait_for_external_event("event_a").await?;
let b = ctx.wait_for_external_event("event_b").await?;
let c = ctx.wait_for_external_event("event_c").await?;
let combined = format!(
"\"{},{},{}\"",
a.as_deref().unwrap_or(""),
b.as_deref().unwrap_or(""),
c.as_deref().unwrap_or("")
);
Ok(Some(combined))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_event_raised("event_a", Some("\"A\"".to_string())),
make_event_raised("event_b", Some("\"B\"".to_string())),
make_event_raised("event_c", Some("\"C\"".to_string())),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"\"A\",\"B\",\"C\"\"".to_string()));
}
#[tokio::test]
async fn test_fan_out_scheduling() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let mut tasks = Vec::new();
for i in 0..5 {
tasks.push(ctx.call_activity("worker", i));
}
let results = when_all(tasks).await?;
Ok(Some(format!("{}", results.len())))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let schedules = get_schedule_actions(&resp.actions);
assert_eq!(schedules.len(), 5);
for s in &schedules {
assert_eq!(s.name, "worker");
}
assert!(get_complete_action(&resp.actions).is_none());
}
#[tokio::test]
async fn test_fan_out_fan_in_completion() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let mut tasks = Vec::new();
for i in 0..5 {
tasks.push(ctx.call_activity("worker", i));
}
let results = when_all(tasks).await?;
let count = results.iter().filter(|r| r.is_some()).count();
Ok(Some(format!("{count}")))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "worker"),
make_task_completed(4, 0, Some("\"r0\"".to_string())),
make_task_scheduled(5, "worker"),
make_task_completed(6, 1, Some("\"r1\"".to_string())),
make_task_scheduled(7, "worker"),
make_task_completed(8, 2, Some("\"r2\"".to_string())),
make_task_scheduled(9, "worker"),
make_task_completed(10, 3, Some("\"r3\"".to_string())),
make_task_scheduled(11, "worker"),
make_task_completed(12, 4, Some("\"r4\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("5".to_string()));
}
#[tokio::test]
async fn test_fan_out_partial_failure() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let mut tasks = Vec::new();
for i in 0..5 {
tasks.push(ctx.call_activity("worker", i));
}
let results = when_all(tasks).await?;
Ok(Some(format!("{}", results.len())))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "worker"),
make_task_completed(4, 0, Some("\"r0\"".to_string())),
make_task_scheduled(5, "worker"),
make_task_completed(6, 1, Some("\"r1\"".to_string())),
make_task_scheduled(7, "worker"),
make_task_failed(8, 2, "WorkerError", "worker 2 crashed"),
make_task_scheduled(9, "worker"),
make_task_completed(10, 3, Some("\"r3\"".to_string())),
make_task_scheduled(11, "worker"),
make_task_completed(12, 4, Some("\"r4\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
let fd = cw.failure_details.as_ref().unwrap();
assert_eq!(fd.error_message, "worker 2 crashed");
}
#[tokio::test]
async fn test_when_any_first_completes() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let t0 = ctx.call_activity("slow", ());
let t1 = ctx.call_activity("fast", ());
let t2 = ctx.call_activity("medium", ());
let winner = when_any(vec![t0, t1, t2]).await?;
Ok(Some(format!("{winner}")))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "slow"),
make_task_scheduled(4, "fast"),
make_task_completed(5, 1, Some("\"fast result\"".to_string())),
make_task_scheduled(6, "medium"),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("1".to_string()));
}
#[tokio::test]
async fn test_when_any_with_timer_timeout() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let activity_task = ctx.call_activity("slow_activity", ());
let timer_task = ctx.create_timer(std::time::Duration::from_secs(30));
let winner = when_any(vec![activity_task, timer_task]).await?;
Ok(Some(format!("{winner}")))
})
});
let fire_at = ts_now() + chrono::Duration::seconds(30);
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "slow_activity"),
make_timer_created(4, fire_at),
make_timer_fired(5, 1),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("1".to_string()));
}
#[tokio::test]
async fn test_continue_as_new() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.continue_as_new("next_iteration", false);
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::ContinuedAsNew as i32
);
assert_eq!(cw.result, Some("\"next_iteration\"".to_string()));
assert!(cw.carryover_events.is_empty());
}
#[tokio::test]
async fn test_continue_as_new_with_save_events() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.continue_as_new("next", true);
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_event_raised("pending_event", Some("\"data\"".to_string())),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::ContinuedAsNew as i32
);
assert!(!cw.carryover_events.is_empty());
let carryover = &cw.carryover_events[0];
match &carryover.event_type {
Some(EventType::EventRaised(e)) => {
assert_eq!(e.name, "pending_event");
assert_eq!(e.input, Some("\"data\"".to_string()));
}
_ => panic!("expected EventRaised carryover"),
}
}
#[tokio::test]
async fn test_suspend_prevents_execution() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { panic!("should not execute when suspended") }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None), make_suspended()],
)
.await
.unwrap();
assert!(resp.actions.is_empty());
}
#[tokio::test]
async fn test_suspend_and_resume() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { Ok(Some("\"resumed and done\"".to_string())) }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_suspended(),
make_resumed(),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"resumed and done\"".to_string()));
}
#[tokio::test]
async fn test_terminate_prevents_execution() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { panic!("should not execute when terminated") }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_terminated(None),
],
)
.await
.unwrap();
assert_eq!(resp.actions.len(), 1);
match &resp.actions[0].workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Terminated as i32
);
}
other => panic!("expected CompleteWorkflow, got {other:?}"),
}
}
#[tokio::test]
async fn test_custom_status() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.set_custom_status("step 1 of 3");
Ok(Some("\"done\"".to_string()))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
assert_eq!(resp.custom_status, Some("step 1 of 3".to_string()));
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
}
#[tokio::test]
async fn test_activity_error_handling_with_catch() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("risky_operation", ()).await;
match result {
Ok(v) => Ok(v),
Err(DurableTaskError::TaskFailed { .. }) => {
let compensate = ctx.call_activity("compensate", ()).await?;
Ok(compensate)
}
Err(e) => Err(e),
}
})
});
let resp1 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "risky_operation"),
make_task_failed(4, 0, "RiskyError", "it broke"),
],
vec![],
)
.await
.unwrap();
let schedules = get_schedule_actions(&resp1.actions);
assert_eq!(schedules.len(), 1);
assert_eq!(schedules[0].name, "compensate");
assert!(get_complete_action(&resp1.actions).is_none());
let resp2 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "risky_operation"),
make_task_failed(4, 0, "RiskyError", "it broke"),
make_task_scheduled(5, "compensate"),
make_task_completed(6, 1, Some("\"compensated\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp2.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"compensated\"".to_string()));
}
#[tokio::test]
async fn test_multi_round_replay() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let a = ctx.call_activity("activity_a", "input_a").await?;
let b = ctx
.call_activity("activity_b", a.as_deref().unwrap_or(""))
.await?;
Ok(b)
})
});
let resp1 = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let s1 = get_schedule_actions(&resp1.actions);
assert_eq!(s1.len(), 1);
assert_eq!(s1[0].name, "activity_a");
let resp2 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "activity_a"),
make_task_completed(4, 0, Some("\"result_a\"".to_string())),
],
vec![],
)
.await
.unwrap();
let s2 = get_schedule_actions(&resp2.actions);
assert_eq!(s2.len(), 1);
assert_eq!(s2[0].name, "activity_b");
assert!(get_complete_action(&resp2.actions).is_none());
let resp3 = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "activity_a"),
make_task_completed(4, 0, Some("\"result_a\"".to_string())),
make_task_scheduled(5, "activity_b"),
make_task_completed(6, 1, Some("\"final_result\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp3.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"final_result\"".to_string()));
}
#[tokio::test]
async fn test_orchestrator_context_accessors() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let name = ctx.name();
let iid = ctx.instance_id();
Ok(Some(format!("\"{name}:{iid}\"")))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("my_orch", None)],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(cw.result, Some("\"my_orch:test-instance\"".to_string()));
}
#[tokio::test]
async fn test_activity_with_new_event_completion() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("greet", ()).await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "greet"),
],
vec![make_task_completed(4, 0, Some("\"new hello\"".to_string()))],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"new hello\"".to_string()));
}
#[tokio::test]
async fn test_multiple_suspend_resume_cycles() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { Ok(Some("\"alive\"".to_string())) }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_suspended(),
make_resumed(),
make_suspended(),
make_resumed(),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
}
#[tokio::test]
async fn test_orchestrator_error_becomes_failed() {
let orch_fn: OrchestratorFn = Arc::new(|_ctx| {
Box::pin(async { Err(DurableTaskError::Other("unexpected crash".to_string())) })
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
let fd = cw.failure_details.as_ref().unwrap();
assert_eq!(fd.error_type, "OrchestratorError");
assert!(fd.error_message.contains("unexpected crash"));
}
#[tokio::test]
async fn test_timer_and_activity_sequence() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let _a = ctx.call_activity("step1", ()).await?;
ctx.create_timer(std::time::Duration::from_secs(10)).await?;
let b = ctx.call_activity("step2", ()).await?;
Ok(b)
})
});
let fire_at = ts_now() + chrono::Duration::seconds(10);
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "step1"),
make_task_completed(4, 0, Some("\"s1\"".to_string())),
make_timer_created(5, fire_at),
make_timer_fired(6, 1),
make_task_scheduled(7, "step2"),
make_task_completed(8, 2, Some("\"s2\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"s2\"".to_string()));
}
#[tokio::test]
async fn test_fan_out_fan_in_with_when_all_empty() {
let orch_fn: OrchestratorFn = Arc::new(|_ctx| {
Box::pin(async move {
let results = when_all(vec![]).await?;
Ok(Some(format!("{}", results.len())))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("0".to_string()));
}
#[tokio::test]
async fn test_event_not_yet_received() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.wait_for_external_event("approval").await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
assert!(get_complete_action(&resp.actions).is_none());
assert!(resp.actions.is_empty());
}
#[tokio::test]
async fn test_activity_with_struct_input() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
#[derive(serde::Serialize)]
struct Input {
x: i32,
y: i32,
}
let _task = ctx.call_activity("add", Input { x: 1, y: 2 });
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let schedules = get_schedule_actions(&resp.actions);
assert_eq!(schedules.len(), 1);
assert_eq!(schedules[0].name, "add");
assert_eq!(schedules[0].input, Some("{\"x\":1,\"y\":2}".to_string()));
}
#[tokio::test]
async fn test_completion_token_preserved() {
let orch_fn: OrchestratorFn =
Arc::new(|_ctx| Box::pin(async { Ok(Some("\"done\"".to_string())) }));
let resp = OrchestrationExecutor::execute(
&orch_fn,
"test-instance",
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
"my-token-123".to_string(),
&WorkerOptions::default(),
None,
)
.await
.unwrap();
assert_eq!(resp.completion_token, "my-token-123");
}
#[tokio::test]
async fn test_action_ids_are_sequential() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let _a = ctx.call_activity("a", ());
let _b = ctx.call_activity("b", ());
let _c = ctx.call_activity("c", ());
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
for (i, action) in resp.actions.iter().enumerate() {
assert_eq!(action.id, i as i32, "Action {i} should have id {i}");
}
}
#[tokio::test]
async fn test_sub_orchestration_with_auto_instance_id() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let _task = ctx.call_sub_orchestrator("child_orch", (), None);
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![make_execution_started("test_orch", None)],
)
.await
.unwrap();
let children = get_child_workflow_actions(&resp.actions);
assert_eq!(children.len(), 1);
assert_eq!(children[0].name, "child_orch");
assert!(!children[0].instance_id.is_empty());
}
#[tokio::test]
async fn test_continue_as_new_with_activity_before() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.call_activity("get_count", ()).await?;
let count: i32 = serde_json::from_str(result.as_deref().unwrap_or("0")).unwrap_or(0);
if count < 3 {
ctx.continue_as_new(count + 1, false);
}
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "get_count"),
make_task_completed(4, 0, Some("1".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::ContinuedAsNew as i32
);
assert_eq!(cw.result, Some("2".to_string()));
}
#[tokio::test]
async fn test_external_event_with_null_data() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let result = ctx.wait_for_external_event("signal").await?;
Ok(Some(format!("{}", result.is_none())))
})
});
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_event_raised("signal", None),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("true".to_string()));
}
#[tokio::test]
async fn test_custom_status_updated_mid_execution() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
ctx.set_custom_status("starting");
let _ = ctx.call_activity("step1", ()).await?;
ctx.set_custom_status("step 1 done");
Ok(Some("\"done\"".to_string()))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "step1"),
make_task_completed(4, 0, Some("\"ok\"".to_string())),
],
vec![],
)
.await
.unwrap();
assert_eq!(resp.custom_status, Some("step 1 done".to_string()));
}
#[tokio::test]
async fn test_terminate_with_output() {
let orch_fn: OrchestratorFn = Arc::new(|_ctx| Box::pin(async { panic!("should not run") }));
let resp = run_executor(
&orch_fn,
vec![make_workflow_started(ts_now())],
vec![
make_execution_started("test_orch", None),
make_terminated(Some("\"terminated reason\"".to_string())),
],
)
.await
.unwrap();
assert_eq!(resp.actions.len(), 1);
match &resp.actions[0].workflow_action_type {
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(cw)) => {
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Terminated as i32
);
assert_eq!(cw.result, Some("\"terminated reason\"".to_string()));
}
other => panic!("expected CompleteWorkflow, got {other:?}"),
}
}
#[tokio::test]
async fn test_mixed_event_types_in_replay() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let a = ctx.call_activity("fetch_data", ()).await?;
ctx.create_timer(std::time::Duration::from_secs(5)).await?;
let evt = ctx.wait_for_external_event("user_input").await?;
let combined = format!(
"\"data={},event={}\"",
a.as_deref().unwrap_or(""),
evt.as_deref().unwrap_or("")
);
Ok(Some(combined))
})
});
let fire_at = ts_now() + chrono::Duration::seconds(5);
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "fetch_data"),
make_task_completed(4, 0, Some("\"fetched\"".to_string())),
make_timer_created(5, fire_at),
make_timer_fired(6, 1),
],
vec![
make_event_raised("user_input", Some("\"clicked\"".to_string())),
],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(
cw.result,
Some("\"data=\"fetched\",event=\"clicked\"\"".to_string())
);
}
#[tokio::test]
async fn test_when_any_activity_completes_before_timer() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let activity_task = ctx.call_activity("fast_activity", ());
let timer_task = ctx.create_timer(std::time::Duration::from_secs(60));
let winner = when_any(vec![activity_task, timer_task]).await?;
Ok(Some(format!("{winner}")))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "fast_activity"),
make_task_completed(4, 0, Some("\"fast\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("0".to_string()));
}
fn make_workflow_started_with_patches(
ts: chrono::DateTime<chrono::Utc>,
patches: Vec<String>,
) -> proto::HistoryEvent {
proto::HistoryEvent {
event_id: 1,
timestamp: Some(to_timestamp(ts)),
router: None,
event_type: Some(EventType::WorkflowStarted(proto::WorkflowStartedEvent {
version: Some(proto::WorkflowVersion {
patches,
name: None,
}),
})),
}
}
#[tokio::test]
async fn test_is_patched_new_execution_applies_patch() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
if ctx.is_patched("new-feature") {
Ok(Some("patched".to_string()))
} else {
Ok(Some("unpatched".to_string()))
}
})
});
let resp = run_executor(&orch_fn, vec![], vec![]).await.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(cw.result, Some("patched".to_string()));
}
#[tokio::test]
async fn test_is_patched_mid_replay_uses_unpatched_path() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
if ctx.is_patched("new-feature") {
ctx.call_activity("new_act", ()).await?;
} else {
ctx.call_activity("old_act", ()).await?;
}
Ok(Some("done".to_string()))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "old_act"),
make_task_completed(4, 0, Some("\"ok\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("done".to_string()));
}
#[tokio::test]
async fn test_is_patched_history_patch_applies() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
if ctx.is_patched("new-feature") {
ctx.call_activity("new_act", ()).await?;
} else {
ctx.call_activity("old_act", ()).await?;
}
Ok(Some("done".to_string()))
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started_with_patches(ts_now(), vec!["new-feature".to_string()]),
make_execution_started("test_orch", None),
make_task_scheduled(3, "new_act"),
make_task_completed(4, 0, Some("\"ok\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("done".to_string()));
}
use dapr_durabletask::api::RetryPolicy;
use dapr_durabletask::task::{ActivityOptions, SubOrchestratorOptions};
use std::time::Duration;
#[tokio::test]
async fn test_retry_activity_succeeds_on_second_attempt() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let opts = ActivityOptions::new()
.with_retry_policy(RetryPolicy::new(3, Duration::from_secs(1)));
let result = ctx.call_activity_with_options("flaky", (), opts).await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "flaky"),
make_task_failed(4, 0, "IOError", "transient"),
make_timer_created(5, ts_now() + chrono::Duration::seconds(1)),
make_timer_fired(6, 1),
make_task_scheduled(7, "flaky"),
make_task_completed(8, 2, Some("\"ok\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"ok\"".to_string()));
}
#[tokio::test]
async fn test_retry_activity_fails_after_max_attempts() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let opts = ActivityOptions::new()
.with_retry_policy(RetryPolicy::new(2, Duration::from_secs(1)));
ctx.call_activity_with_options("bad", (), opts).await?;
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "bad"),
make_task_failed(4, 0, "IOError", "still broken"),
make_timer_created(5, ts_now() + chrono::Duration::seconds(1)),
make_timer_fired(6, 1),
make_task_scheduled(7, "bad"),
make_task_failed(8, 2, "IOError", "still broken"),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
}
#[tokio::test]
async fn test_retry_activity_predicate_blocks_retry() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let policy = RetryPolicy::new(5, Duration::from_secs(1))
.with_handle(|details| details.error_type != "FatalError");
let opts = ActivityOptions::new().with_retry_policy(policy);
ctx.call_activity_with_options("fatal_act", (), opts)
.await?;
Ok(None)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "fatal_act"),
make_task_failed(4, 0, "FatalError", "cannot retry"),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Failed as i32
);
let non_complete: Vec<_> = resp
.actions
.iter()
.filter(|a| {
!matches!(
&a.workflow_action_type,
Some(proto::workflow_action::WorkflowActionType::CompleteWorkflow(_))
)
})
.collect();
assert!(non_complete.is_empty(), "expected no retry timer action");
}
#[tokio::test]
async fn test_retry_activity_predicate_allows_retry() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let policy = RetryPolicy::new(3, Duration::from_secs(1))
.with_handle(|details| details.error_type == "RetryableError");
let opts = ActivityOptions::new().with_retry_policy(policy);
let result = ctx
.call_activity_with_options("retryable_act", (), opts)
.await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "retryable_act"),
make_task_failed(4, 0, "RetryableError", "try again"),
make_timer_created(5, ts_now() + chrono::Duration::seconds(1)),
make_timer_fired(6, 1),
make_task_scheduled(7, "retryable_act"),
make_task_completed(8, 2, Some("\"recovered\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"recovered\"".to_string()));
}
#[tokio::test]
async fn test_retry_sub_orchestrator_succeeds_on_second_attempt() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let opts = SubOrchestratorOptions::new()
.with_instance_id("child-1".to_string())
.with_retry_policy(RetryPolicy::new(3, Duration::from_secs(2)));
let result = ctx
.call_sub_orchestrator_with_options("child_orch", (), opts)
.await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_sub_orchestration_created(3, "child_orch", "child-1"),
make_sub_orchestration_failed(4, 0, "ChildError", "child failed"),
make_timer_created(5, ts_now() + chrono::Duration::seconds(2)),
make_timer_fired(6, 1),
make_sub_orchestration_created(7, "child_orch", "child-1"),
make_sub_orchestration_completed(8, 2, Some("\"child_ok\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"child_ok\"".to_string()));
}
#[tokio::test]
async fn test_retry_no_retry_on_success() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let opts = ActivityOptions::new()
.with_retry_policy(RetryPolicy::new(5, Duration::from_secs(1)));
let result = ctx
.call_activity_with_options("instant_ok", (), opts)
.await?;
Ok(result)
})
});
let resp = run_executor(
&orch_fn,
vec![
make_workflow_started(ts_now()),
make_execution_started("test_orch", None),
make_task_scheduled(3, "instant_ok"),
make_task_completed(4, 0, Some("\"done\"".to_string())),
],
vec![],
)
.await
.unwrap();
let cw = get_complete_action(&resp.actions).unwrap();
assert_eq!(
cw.workflow_status,
proto::OrchestrationStatus::Completed as i32
);
assert_eq!(cw.result, Some("\"done\"".to_string()));
}
use dapr_durabletask::api::{HistoryPropagationScope, PropagatedHistory};
use dapr_durabletask_proto::prost::Message as _;
fn make_propagated_history(
scope: proto::HistoryPropagationScope,
chunks: Vec<(&str, &str, &str, Vec<i32>)>,
) -> proto::PropagatedHistory {
let chunks = chunks
.into_iter()
.map(|(app, inst, name, ev_ids)| proto::PropagatedHistoryChunk {
raw_events: ev_ids
.into_iter()
.map(|id| {
proto::HistoryEvent {
event_id: id,
timestamp: None,
router: None,
event_type: None,
}
.encode_to_vec()
})
.collect(),
app_id: app.to_string(),
instance_id: inst.to_string(),
workflow_name: name.to_string(),
raw_signatures: vec![],
signing_cert_chains: vec![],
})
.collect();
proto::PropagatedHistory {
scope: scope as i32,
chunks,
}
}
#[tokio::test]
async fn test_schedule_activity_emits_history_propagation_scope() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let _ = ctx
.call_activity_with_options(
"verify",
serde_json::Value::Null,
ActivityOptions::new()
.with_history_propagation(HistoryPropagationScope::Lineage),
)
.await;
Ok(None)
})
});
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("test", None),
];
let resp = run_executor(&orch_fn, old_events, vec![]).await.unwrap();
let scheduled = get_schedule_actions(&resp.actions);
assert_eq!(scheduled.len(), 1);
assert_eq!(
scheduled[0].history_propagation_scope,
Some(proto::HistoryPropagationScope::Lineage as i32)
);
}
#[tokio::test]
async fn test_schedule_child_workflow_emits_own_history_scope() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let _ = ctx
.call_sub_orchestrator_with_options(
"child",
serde_json::Value::Null,
SubOrchestratorOptions::new()
.with_instance_id("child-1")
.with_history_propagation(HistoryPropagationScope::OwnHistory),
)
.await;
Ok(None)
})
});
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("test", None),
];
let resp = run_executor(&orch_fn, old_events, vec![]).await.unwrap();
let children = get_child_workflow_actions(&resp.actions);
assert_eq!(children.len(), 1);
assert_eq!(
children[0].history_propagation_scope,
Some(proto::HistoryPropagationScope::OwnHistory as i32)
);
}
#[tokio::test]
async fn test_no_history_propagation_scope_when_unset() {
let orch_fn: OrchestratorFn = Arc::new(|ctx| {
Box::pin(async move {
let _ = ctx.call_activity("plain", "x").await;
Ok(None)
})
});
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("test", None),
];
let resp = run_executor(&orch_fn, old_events, vec![]).await.unwrap();
let scheduled = get_schedule_actions(&resp.actions);
assert_eq!(scheduled.len(), 1);
assert_eq!(scheduled[0].history_propagation_scope, None);
}
#[tokio::test]
async fn test_propagated_history_lineage_visible_to_child_workflow() {
use std::sync::{Arc as StdArc, Mutex as StdMutex};
let captured: StdArc<StdMutex<Option<StdArc<PropagatedHistory>>>> =
StdArc::new(StdMutex::new(None));
let captured_clone = captured.clone();
let orch_fn: OrchestratorFn = Arc::new(move |ctx| {
let captured = captured_clone.clone();
Box::pin(async move {
*captured.lock().unwrap() = ctx.propagated_history();
Ok(None)
})
});
let propagated = make_propagated_history(
proto::HistoryPropagationScope::Lineage,
vec![
("grandparent-app", "gp-inst", "Grandparent", vec![1, 2]),
("parent-app", "parent-inst", "Parent", vec![3, 4, 5]),
],
);
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("child", None),
];
let _resp = OrchestrationExecutor::execute(
&orch_fn,
"child-1",
old_events,
vec![],
String::new(),
&WorkerOptions::default(),
PropagatedHistory::from_proto(propagated),
)
.await
.unwrap();
let history = captured
.lock()
.unwrap()
.clone()
.expect("propagated history present");
assert_eq!(history.scope, HistoryPropagationScope::Lineage);
assert_eq!(history.chunks.len(), 2);
assert_eq!(history.events.len(), 5);
assert_eq!(
history.app_ids(),
vec!["grandparent-app".to_string(), "parent-app".to_string()]
);
assert_eq!(
history.workflow_by_name("Grandparent").unwrap().event_count,
2
);
assert_eq!(history.events_by_workflow_name("Parent").unwrap().len(), 3);
}
#[tokio::test]
async fn test_propagated_history_own_history_drops_ancestors() {
use std::sync::{Arc as StdArc, Mutex as StdMutex};
let captured: StdArc<StdMutex<Option<StdArc<PropagatedHistory>>>> =
StdArc::new(StdMutex::new(None));
let captured_clone = captured.clone();
let orch_fn: OrchestratorFn = Arc::new(move |ctx| {
let captured = captured_clone.clone();
Box::pin(async move {
*captured.lock().unwrap() = ctx.propagated_history();
Ok(None)
})
});
let propagated = make_propagated_history(
proto::HistoryPropagationScope::OwnHistory,
vec![("parent-app", "parent-inst", "Parent", vec![10, 11])],
);
let ts = chrono::Utc::now();
let old_events = vec![
make_workflow_started(ts),
make_execution_started("child", None),
];
let _resp = OrchestrationExecutor::execute(
&orch_fn,
"child-1",
old_events,
vec![],
String::new(),
&WorkerOptions::default(),
PropagatedHistory::from_proto(propagated),
)
.await
.unwrap();
let history = captured
.lock()
.unwrap()
.clone()
.expect("propagated history present");
assert_eq!(history.scope, HistoryPropagationScope::OwnHistory);
assert_eq!(history.chunks.len(), 1);
assert_eq!(history.app_ids(), vec!["parent-app".to_string()]);
assert!(
history.workflow_by_name("Grandparent").is_err(),
"OwnHistory must drop ancestor chunks"
);
assert_eq!(history.events_by_app_id("parent-app").unwrap().len(), 2);
}
#[tokio::test]
async fn test_propagated_history_absent_returns_none() {
use std::sync::{Arc as StdArc, Mutex as StdMutex};
let initial = StdArc::new(PropagatedHistory {
scope: HistoryPropagationScope::OwnHistory,
events: vec![],
chunks: vec![],
});
let captured: StdArc<StdMutex<Option<StdArc<PropagatedHistory>>>> =
StdArc::new(StdMutex::new(Some(initial)));
let captured_clone = captured.clone();
let orch_fn: OrchestratorFn = Arc::new(move |ctx| {
let captured = captured_clone.clone();
Box::pin(async move {
*captured.lock().unwrap() = ctx.propagated_history();
Ok(None)
})
});
let ts = chrono::Utc::now();
let _ = run_executor(
&orch_fn,
vec![
make_workflow_started(ts),
make_execution_started("child", None),
],
vec![],
)
.await
.unwrap();
assert!(captured.lock().unwrap().is_none());
}