#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use async_trait::async_trait;
use duroxide::Client;
use duroxide::Either2;
use duroxide::EventKind;
use duroxide::providers::error::ProviderError;
use duroxide::providers::{
DispatcherCapabilityFilter, ExecutionMetadata, OrchestrationItem, Provider, ProviderAdmin,
ScheduledActivityIdentifier, SessionFetchConfig, TagFilter, WorkItem,
};
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry};
mod common;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[tokio::test]
async fn cancel_parent_down_propagates_to_child() {
    // Cancelling a parent orchestration must cascade: the parent fails with
    // the caller-supplied reason and its sub-orchestration is cancelled with
    // reason "parent canceled".
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Child parks on an external event, so it can only terminate via cancel.
    let child = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("Go").await;
        Ok("done".to_string())
    };
    // Parent is suspended awaiting the child when the cancel arrives.
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let _res = ctx.schedule_sub_orchestration("Child", "seed").await;
        Ok("parent_done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("Child", child)
        .register("Parent", parent)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    // Fast dispatcher polling keeps the cascade well inside the 5s deadlines.
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client.start_orchestration("inst-cancel-1", "Parent", "").await.unwrap();
    // Give the runtime time to start parent and child before cancelling.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    let _ = client.cancel_instance("inst-cancel-1", "by_test").await;
    // Poll the parent's history until it fails with Cancelled("by_test").
    let deadline = std::time::Instant::now() + std::time::Duration::from_millis(5000);
    loop {
        let hist = store.read("inst-cancel-1").await.unwrap_or_default();
        if hist.iter().any(|e| {
            matches!(
                &e.kind,
                EventKind::OrchestrationFailed { details, .. } if matches!(
                    details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { reason },
                        ..
                    } if reason == "by_test"
                )
            )
        }) {
            // The cancel request itself must also be persisted in history.
            assert!(
                hist.iter()
                    .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. })),
                "missing cancel requested event for parent"
            );
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("timeout waiting for parent canceled");
        }
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
    // Discover the child instance(s) via the management capability; child
    // instance ids are prefixed with "<parent-id>::".
    let mgmt = store.as_management_capability().expect("ProviderAdmin required");
    let children: Vec<String> = mgmt
        .list_instances()
        .await
        .unwrap_or_default()
        .into_iter()
        .filter(|i| i.starts_with("inst-cancel-1::"))
        .collect();
    assert!(!children.is_empty(), "expected a child instance");
    // Each child must record both the cancel request and the cancellation
    // failure with reason "parent canceled".
    for child in children {
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(5000);
        loop {
            let hist = store.read(&child).await.unwrap_or_default();
            let has_cancel = hist
                .iter()
                .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }));
            let has_failed = hist.iter().any(|e| {
                matches!(&e.kind, EventKind::OrchestrationFailed { details, .. } if matches!(
                    details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { reason },
                        ..
                    } if reason == "parent canceled"
                ))
            });
            if has_cancel && has_failed {
                break;
            }
            if std::time::Instant::now() > deadline {
                panic!("child {child} did not cancel in time");
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancel_after_completion_is_noop() {
    // Cancelling an instance after it has already completed must not append
    // a cancel-requested event or disturb the recorded completion.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let quick = |_ctx: OrchestrationContext, _input: String| async move { Ok("ok".to_string()) };
    let orchestration_registry = OrchestrationRegistry::builder().register("Quick", quick).build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-cancel-noop", "Quick", "")
        .await
        .unwrap();
    // Run the instance to completion first.
    let status = client
        .wait_for_orchestration("inst-cancel-noop", std::time::Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "ok"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    // Late cancel: the runtime should swallow it without touching history.
    let _ = client.cancel_instance("inst-cancel-noop", "late").await;
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    let hist = store.read("inst-cancel-noop").await.unwrap_or_default();
    let completion_recorded = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { output, .. } if output == "ok"));
    assert!(completion_recorded);
    let cancel_recorded = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }));
    assert!(!cancel_recorded);
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancel_child_directly_signals_parent() {
    // Cancelling a child sub-orchestration directly should surface a
    // cancellation error to the awaiting parent rather than hanging it.
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Child never finishes on its own; it can only end via cancellation.
    let child = |_ctx: OrchestrationContext, _input: String| async move {
        futures::future::pending::<Result<String, String>>().await
    };
    // Parent converts either outcome into a completed result so the child's
    // error text can be inspected from the parent's output.
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let outcome = ctx.schedule_sub_orchestration("ChildD", "x").await;
        match outcome {
            Ok(v) => Ok(format!("ok:{v}")),
            Err(e) => Ok(format!("child_err:{e}")),
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ChildD", child)
        .register("ParentD", parent)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-chdirect", "ParentD", "")
        .await
        .unwrap();
    // Give the runtime time to schedule the child, then cancel it directly.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    // NOTE(review): the child instance id embeds the parent's scheduling
    // event id (2); assumed stable for this orchestration shape — confirm.
    let child_inst = "inst-chdirect::sub::2";
    let _ = client.cancel_instance(child_inst, "by_test_child").await;
    let output = match client
        .wait_for_orchestration("inst-chdirect", std::time::Duration::from_secs(5))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => output,
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    };
    assert!(
        output.starts_with("child_err:canceled: by_test_child"),
        "unexpected parent out: {output}"
    );
    // The parent's history must record the sub-orchestration failure as a
    // cancellation carrying the original reason, tied to scheduling event 2.
    let parent_hist = store.read("inst-chdirect").await.unwrap_or_default();
    let failure_linked = parent_hist.iter().any(|e| {
        matches!(
            &e.kind,
            EventKind::SubOrchestrationFailed { details, .. }
                if e.source_event_id == Some(2) && matches!(
                    details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { reason },
                        ..
                    } if reason == "by_test_child"
                )
        )
    });
    assert!(failure_linked);
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancel_continue_as_new_second_exec() {
    // A cancel issued while the second execution (post continue-as-new) is
    // parked on an external event must fail that execution with the
    // cancellation reason.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orch = |ctx: OrchestrationContext, input: String| async move {
        if input == "start" {
            // First execution immediately rolls over into a fresh one.
            return ctx.continue_as_new("wait").await;
        }
        if input == "wait" {
            // Second execution parks on an event that never arrives.
            let _ = ctx.schedule_wait("Go").await;
            return Ok("done".to_string());
        }
        // Any other input is echoed back unchanged.
        Ok(input)
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("CanCancel", orch).build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-can-can", "CanCancel", "start")
        .await
        .unwrap();
    // Let the continue-as-new roll over before cancelling.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    let _ = client.cancel_instance("inst-can-can", "by_test_can").await;
    let status = client
        .wait_for_orchestration("inst-can-can", std::time::Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            let cancelled_with_reason = matches!(
                details,
                duroxide::ErrorDetails::Application {
                    kind: duroxide::AppErrorKind::Cancelled { reason },
                    ..
                } if reason == "by_test_can"
            );
            assert!(cancelled_with_reason);
        }
        runtime::OrchestrationStatus::Completed { output, .. } => panic!("expected cancellation, got: {output}"),
        _ => panic!("unexpected orchestration status"),
    }
    // The terminal failure event (same reason) must also reach history.
    let failure_persisted = common::wait_for_history(
        store.clone(),
        "inst-can-can",
        |hist| {
            hist.iter().rev().any(|e| {
                matches!(&e.kind, EventKind::OrchestrationFailed { details, .. } if matches!(
                    details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { reason },
                        ..
                    } if reason == "by_test_can"
                ))
            })
        },
        5000,
    )
    .await;
    assert!(failure_persisted, "timeout waiting for cancel failure");
    let hist = store.read("inst-can-can").await.unwrap_or_default();
    let cancel_recorded = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }));
    assert!(cancel_recorded);
    rt.shutdown(None).await;
}
#[tokio::test]
async fn orchestration_completes_before_activity_finishes() {
    // An orchestration that drops an activity future and returns right away
    // must complete without waiting for the activity to finish.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        // Schedule the activity and immediately discard its handle.
        drop(ctx.schedule_activity("slow", ""));
        Ok("done".to_string())
    };
    // Activity takes 200ms — far longer than the orchestration itself.
    let activity_registry = ActivityRegistry::builder()
        .register("slow", |_ctx: ActivityContext, _s: String| async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            Ok("ok".to_string())
        })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder().register("QuickDone", orch).build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-orch-done-first", "QuickDone", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-orch-done-first", std::time::Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "done"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    // Let the in-flight activity run out in the background, then confirm the
    // recorded completion is still intact.
    tokio::time::sleep(std::time::Duration::from_millis(300)).await;
    let hist = store.read("inst-orch-done-first").await.unwrap_or_default();
    let completion_recorded = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { output, .. } if output == "done"));
    assert!(completion_recorded);
    rt.shutdown(None).await;
}
#[tokio::test]
async fn orchestration_fails_before_activity_finishes() {
    // An orchestration may fail while a fire-and-forget activity is still
    // running; the failure must be recorded without waiting for the activity.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        // Schedule and discard the handle, then fail the orchestration.
        drop(ctx.schedule_activity("slow2", ""));
        Err("boom".to_string())
    };
    // Activity takes 200ms — far longer than the orchestration itself.
    let activity_registry = ActivityRegistry::builder()
        .register("slow2", |_ctx: ActivityContext, _s: String| async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            Ok("ok".to_string())
        })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder().register("QuickFail", orch).build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-orch-fail-first", "QuickFail", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-orch-fail-first", std::time::Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details: _, .. } => {}
        runtime::OrchestrationStatus::Completed { output, .. } => panic!("expected failure, got: {output}"),
        _ => panic!("unexpected orchestration status"),
    }
    // Let the background activity finish, then verify the persisted failure
    // still carries the orchestration's own error message.
    tokio::time::sleep(std::time::Duration::from_millis(300)).await;
    let hist = store.read("inst-orch-fail-first").await.unwrap_or_default();
    let failed_with_boom = hist.iter().any(|e| {
        matches!(&e.kind, EventKind::OrchestrationFailed { details, .. } if matches!(
            details,
            duroxide::ErrorDetails::Application {
                kind: duroxide::AppErrorKind::OrchestrationFailed,
                message,
                ..
            } if message == "boom"
        ))
    });
    assert!(failed_with_boom);
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancel_parent_with_multiple_children() {
    // Cascade-cancel with fan-out: a parent joined on five sub-orchestrations
    // is cancelled; the parent fails with the test reason and every child is
    // cancelled with reason "parent canceled".
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Each child parks on an external event; it can only end via cancel.
    let child = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("Go").await;
        Ok("done".to_string())
    };
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let mut futures = Vec::new();
        for i in 0..5 {
            futures.push(ctx.schedule_sub_orchestration("MultiChild", format!("input-{i}")));
        }
        // join() suspends the parent on all five children at once.
        let results = ctx.join(futures).await;
        // NOTE(review): this post-join logic only runs if the parent is
        // resumed after the children fail; under a direct parent cancel the
        // parent itself fails first, so "parent_done" is not expected here.
        for result in results {
            match result {
                Err(e) if e.contains("canceled") => {}
                _ => return Err("expected cancellation".to_string()),
            }
        }
        Ok("parent_done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("MultiChild", child)
        .register("MultiParent", parent)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    // Fast dispatcher polling keeps the cascade well inside the 5s deadlines.
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-cancel-multi", "MultiParent", "")
        .await
        .unwrap();
    // Let all five children get scheduled before cancelling the parent.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    let _ = client.cancel_instance("inst-cancel-multi", "multi_test").await;
    // Poll the parent's history until it fails with Cancelled("multi_test").
    let deadline = std::time::Instant::now() + std::time::Duration::from_millis(5000);
    loop {
        let hist = store.read("inst-cancel-multi").await.unwrap_or_default();
        if hist.iter().any(|e| {
            matches!(
                &e.kind,
                EventKind::OrchestrationFailed { details, .. } if matches!(
                    details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { reason },
                        ..
                    } if reason == "multi_test"
                )
            )
        }) {
            // The cancel request itself must also be persisted.
            assert!(
                hist.iter()
                    .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. })),
                "missing cancel requested event for parent"
            );
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("timeout waiting for parent canceled");
        }
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }
    // All five children should be discoverable under the parent-id prefix.
    let mgmt = store.as_management_capability().expect("ProviderAdmin required");
    let children: Vec<String> = mgmt
        .list_instances()
        .await
        .unwrap_or_default()
        .into_iter()
        .filter(|i| i.starts_with("inst-cancel-multi::"))
        .collect();
    assert_eq!(
        children.len(),
        5,
        "expected 5 child instances, found {}",
        children.len()
    );
    // Every child must record both the cancel request and the cancellation
    // failure with reason "parent canceled".
    for child in children {
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(5000);
        loop {
            let hist = store.read(&child).await.unwrap_or_default();
            let has_cancel = hist
                .iter()
                .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }));
            let has_failed = hist.iter().any(|e| {
                matches!(&e.kind, EventKind::OrchestrationFailed { details, .. } if matches!(
                    details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { reason },
                        ..
                    } if reason == "parent canceled"
                ))
            });
            if has_cancel && has_failed {
                break;
            }
            if std::time::Instant::now() > deadline {
                panic!("child {child} did not cancel in time");
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn activity_receives_cancellation_signal() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    // A long-running activity that listens on ctx.cancelled() must observe
    // the signal when its orchestration is cancelled.
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Shared flag the activity sets when it sees the cancellation signal.
    let saw_cancellation = Arc::new(AtomicBool::new(false));
    let saw_cancellation_clone = Arc::clone(&saw_cancellation);
    // Activity races the cancellation token against a 30s sleep; the sleep
    // arm only fires if cancellation never arrives.
    let long_activity = move |ctx: ActivityContext, _input: String| {
        let saw_cancellation = Arc::clone(&saw_cancellation_clone);
        async move {
            tokio::select! {
                _ = ctx.cancelled() => {
                    saw_cancellation.store(true, Ordering::SeqCst);
                    Ok("cancelled".to_string())
                }
                _ = tokio::time::sleep(Duration::from_secs(30)) => {
                    Ok("timeout".to_string())
                }
            }
        }
    };
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let _result = ctx.schedule_activity("LongActivity", "input").await;
        Ok("done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("LongActivityOrch", orchestration)
        .build();
    let activity_registry = ActivityRegistry::builder()
        .register("LongActivity", long_activity)
        .build();
    // Short lock timeout/renewal plus a generous grace period so cancellation
    // is forwarded to the running activity promptly.
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        worker_lock_timeout: Duration::from_secs(2),
        worker_lock_renewal_buffer: Duration::from_millis(500),
        activity_cancellation_grace_period: Duration::from_secs(5),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-activity-cancel", "LongActivityOrch", "")
        .await
        .unwrap();
    // Let the activity start before issuing the cancel.
    tokio::time::sleep(Duration::from_millis(200)).await;
    let _ = client
        .cancel_instance("inst-activity-cancel", "test_cancellation")
        .await;
    // Poll the flag until the activity reports it saw the signal.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        if saw_cancellation.load(Ordering::SeqCst) {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Activity did not receive cancellation signal within timeout");
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert!(
        saw_cancellation.load(Ordering::SeqCst),
        "Activity should have received cancellation signal"
    );
    // Best-effort wait for the orchestration's terminal cancelled event.
    // NOTE(review): this loop breaks (without failing) when the deadline
    // passes, so it never asserts the terminal state — presumably to avoid
    // flaking; confirm whether a hard assertion is intended here.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let hist = store.read("inst-activity-cancel").await.unwrap_or_default();
        let is_cancelled = hist.iter().any(|e| {
            matches!(
                &e.kind,
                EventKind::OrchestrationFailed { details, .. }
                    if matches!(
                        details,
                        duroxide::ErrorDetails::Application {
                            kind: duroxide::AppErrorKind::Cancelled { .. },
                            ..
                        }
                    )
            )
        });
        if is_cancelled {
            break;
        }
        if std::time::Instant::now() > deadline {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn select2_loser_activity_receives_cancellation_signal() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    // When select2 resolves in favor of the timer, the losing activity's
    // future is dropped; the runtime must then request cancellation of the
    // in-flight activity (reason "dropped_future") and the activity must
    // observe that signal.
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Flags set from inside the activity: did it start, did it see cancel.
    let activity_started = Arc::new(AtomicBool::new(false));
    let saw_cancellation = Arc::new(AtomicBool::new(false));
    let activity_started_clone = Arc::clone(&activity_started);
    let saw_cancellation_clone = Arc::clone(&saw_cancellation);
    // Activity flags that it started, then races cancellation vs a 30s sleep.
    let long_activity = move |ctx: ActivityContext, _input: String| {
        let activity_started = Arc::clone(&activity_started_clone);
        let saw_cancellation = Arc::clone(&saw_cancellation_clone);
        async move {
            activity_started.store(true, Ordering::SeqCst);
            tokio::select! {
                _ = ctx.cancelled() => {
                    saw_cancellation.store(true, Ordering::SeqCst);
                    Ok("cancelled".to_string())
                }
                _ = tokio::time::sleep(Duration::from_secs(30)) => {
                    Ok("timeout".to_string())
                }
            }
        }
    };
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let activity = ctx.schedule_activity("LongActivity", "input");
        let timeout = ctx.schedule_timer(Duration::from_secs(1));
        // The 1s timer must win against the 30s activity.
        match ctx.select2(activity, timeout).await {
            Either2::First(_) => panic!("activity should not win"),
            Either2::Second(_) => {}
        }
        // One more scheduling turn before completing — presumably so the
        // dropped-future cancel gets processed/persisted first; confirm.
        ctx.schedule_timer(Duration::from_millis(50)).await;
        Ok("done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("SelectLoserCancelOrch", orchestration)
        .build();
    let activity_registry = ActivityRegistry::builder()
        .register("LongActivity", long_activity)
        .build();
    // Concurrency of 1 for both dispatchers keeps the test deterministic.
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        orchestration_concurrency: 1,
        worker_concurrency: 1,
        worker_lock_timeout: Duration::from_secs(2),
        worker_lock_renewal_buffer: Duration::from_millis(500),
        activity_cancellation_grace_period: Duration::from_secs(2),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-select2-loser-cancel", "SelectLoserCancelOrch", "")
        .await
        .unwrap();
    // Wait until the activity has actually begun executing.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        if activity_started.load(Ordering::SeqCst) {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Activity did not start in time");
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // Then wait for the loser-cancellation signal to reach the activity.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        if saw_cancellation.load(Ordering::SeqCst) {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("select2 loser activity did not receive cancellation signal in time");
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert!(
        saw_cancellation.load(Ordering::SeqCst),
        "Activity should have received cancellation signal as select2 loser"
    );
    // The orchestration itself should still complete normally.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let hist = store.read("inst-select2-loser-cancel").await.unwrap_or_default();
        if hist
            .iter()
            .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
        {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("timeout waiting for orchestration to complete");
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    // Verify the persisted cancel request links back to the original
    // ActivityScheduled event via source_event_id.
    let hist = store.read("inst-select2-loser-cancel").await.unwrap_or_default();
    let scheduled_id = hist.iter().find_map(|e| {
        if let EventKind::ActivityScheduled { name, .. } = &e.kind
            && name == "LongActivity"
        {
            Some(e.event_id())
        } else {
            None
        }
    });
    let scheduled_id = scheduled_id.expect("Expected LongActivity to be scheduled");
    assert!(
        hist.iter().any(|e| {
            matches!(&e.kind, EventKind::ActivityCancelRequested { reason } if reason == "dropped_future")
                && e.source_event_id == Some(scheduled_id)
        }),
        "Expected ActivityCancelRequested(source_event_id={scheduled_id}, reason=dropped_future) in persisted history"
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn activity_result_dropped_when_orchestration_cancelled() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    // The activity completes *after* its orchestration was cancelled; its
    // result should then be discarded rather than recorded in history.
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Counts how many times the activity body actually ran to completion.
    let activity_completed_count = Arc::new(AtomicU32::new(0));
    let activity_completed_count_clone = Arc::clone(&activity_completed_count);
    // Activity ignores cancellation and always finishes after 500ms.
    let slow_activity = move |_ctx: ActivityContext, _input: String| {
        let counter = Arc::clone(&activity_completed_count_clone);
        async move {
            tokio::time::sleep(Duration::from_millis(500)).await;
            counter.fetch_add(1, Ordering::SeqCst);
            Ok("completed".to_string())
        }
    };
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let _result = ctx.schedule_activity("SlowActivity", "input").await;
        Ok("done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("SlowActivityOrch", orchestration)
        .build();
    let activity_registry = ActivityRegistry::builder()
        .register("SlowActivity", slow_activity)
        .build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        worker_lock_timeout: Duration::from_secs(2),
        worker_lock_renewal_buffer: Duration::from_millis(500),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-drop-result", "SlowActivityOrch", "")
        .await
        .unwrap();
    // Cancel mid-activity: activity needs 500ms, cancel lands at ~100ms.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _ = client
        .cancel_instance("inst-drop-result", "cancel_during_activity")
        .await;
    // Wait long enough for the activity to have finished regardless.
    tokio::time::sleep(Duration::from_secs(2)).await;
    assert!(
        activity_completed_count.load(Ordering::SeqCst) > 0,
        "Activity should have completed"
    );
    let hist = store.read("inst-drop-result").await.unwrap_or_default();
    let has_activity_completed = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::ActivityCompleted { .. }));
    // NOTE(review): this only logs when the result was dropped — there is no
    // assertion that ActivityCompleted is absent. Confirm whether the drop is
    // guaranteed (and should be asserted) or deliberately best-effort.
    if !has_activity_completed {
        tracing::info!("Activity result was successfully dropped due to cancellation");
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn activity_skipped_when_orchestration_terminal_at_fetch() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    // Two-phase test: phase 1 runs without workers so a scheduled activity
    // stays queued while its orchestration is cancelled; phase 2 starts
    // workers and expects the queued activity to be skipped because its
    // orchestration is already terminal when the work item is fetched.
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Flag set if the phase-1 activity body ever executes.
    let ran = Arc::new(AtomicBool::new(false));
    let ran_clone = Arc::clone(&ran);
    let activity = move |_ctx: ActivityContext, _input: String| {
        let ran = Arc::clone(&ran_clone);
        async move {
            ran.store(true, Ordering::SeqCst);
            Ok("done".to_string())
        }
    };
    // Orchestration schedules the activity without awaiting it, then parks
    // on an external event so it stays live until cancelled.
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let _handle = ctx.schedule_activity("A", "input");
        let _ = ctx.schedule_wait("Go").await;
        Ok("ok".to_string())
    };
    let orch_registry = OrchestrationRegistry::builder().register("O", orchestration).build();
    let act_registry = ActivityRegistry::builder().register("A", activity).build();
    // Phase 1: zero worker concurrency — activities are enqueued, never run.
    let options_no_workers = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        worker_concurrency: 0,
        ..Default::default()
    };
    let rt1 = runtime::Runtime::start_with_options(
        store.clone(),
        act_registry.clone(),
        orch_registry.clone(),
        options_no_workers,
    )
    .await;
    let client = Client::new(store.clone());
    client.start_orchestration("inst-skip", "O", "").await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    client.cancel_instance("inst-skip", "cancel").await.unwrap();
    // Wait until the orchestration is terminal (failed via cancellation).
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let hist = store.read("inst-skip").await.unwrap_or_default();
        if hist
            .iter()
            .any(|e| matches!(&e.kind, EventKind::OrchestrationFailed { .. }))
        {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Timeout waiting for orchestration to be cancelled");
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    rt1.shutdown(None).await;
    assert!(
        !ran.load(Ordering::SeqCst),
        "Activity should not have run yet - no workers in phase 1"
    );
    // Phase 2: restart with workers; the stale work item should be skipped.
    let options_with_workers = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        worker_concurrency: 2,
        worker_lock_timeout: Duration::from_secs(2),
        worker_lock_renewal_buffer: Duration::from_millis(500),
        activity_cancellation_grace_period: Duration::from_secs(1),
        ..Default::default()
    };
    // Fresh flag + registry so phase 2 observes its own activity executions.
    let ran2 = Arc::new(AtomicBool::new(false));
    let ran2_clone = Arc::clone(&ran2);
    let activity2 = move |_ctx: ActivityContext, _input: String| {
        let ran = Arc::clone(&ran2_clone);
        async move {
            ran.store(true, Ordering::SeqCst);
            Ok("done".to_string())
        }
    };
    let act_registry2 = ActivityRegistry::builder().register("A", activity2).build();
    let rt2 =
        runtime::Runtime::start_with_options(store.clone(), act_registry2, orch_registry, options_with_workers).await;
    // Give the workers ample time to drain the queue.
    tokio::time::sleep(Duration::from_secs(2)).await;
    assert!(
        !ran2.load(Ordering::SeqCst),
        "Activity should have been skipped when orchestration was terminal at fetch"
    );
    let hist = store.read("inst-skip").await.unwrap_or_default();
    let has_activity_completed = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::ActivityCompleted { .. }));
    assert!(
        !has_activity_completed,
        "Activity completion should not be recorded for skipped activity"
    );
    rt2.shutdown(None).await;
}
#[tokio::test]
async fn activity_aborted_after_cancellation_grace() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    // An activity that ignores cancellation must be aborted once the
    // configured grace period elapses, and its (late) result dropped.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let attempts = Arc::new(AtomicU32::new(0));
    let attempts_probe = Arc::clone(&attempts);
    // The activity never checks ctx.cancelled(); it just sleeps a long time.
    let stubborn_activity = move |_ctx: ActivityContext, _input: String| {
        let attempts = Arc::clone(&attempts_probe);
        async move {
            attempts.fetch_add(1, Ordering::SeqCst);
            tokio::time::sleep(Duration::from_secs(5)).await;
            Ok("done".to_string())
        }
    };
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_activity("Stubborn", "input").await;
        Ok("ok".to_string())
    };
    let orch_registry = OrchestrationRegistry::builder()
        .register("StubbornOrch", orchestration)
        .build();
    let act_registry = ActivityRegistry::builder()
        .register("Stubborn", stubborn_activity)
        .build();
    // Short grace period so the abort happens well inside the test window.
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        worker_lock_timeout: Duration::from_secs(2),
        worker_lock_renewal_buffer: Duration::from_millis(500),
        activity_cancellation_grace_period: Duration::from_millis(500),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), act_registry, orch_registry, options).await;
    let client = Client::new(store.clone());
    let started_at = std::time::Instant::now();
    client
        .start_orchestration("inst-grace", "StubbornOrch", "")
        .await
        .unwrap();
    // Cancel shortly after the activity begins its 5s sleep.
    tokio::time::sleep(Duration::from_millis(100)).await;
    client.cancel_instance("inst-grace", "cancel_now").await.unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;
    assert_eq!(attempts.load(Ordering::SeqCst), 1, "Activity should run once");
    let hist = store.read("inst-grace").await.unwrap_or_default();
    let completion_recorded = hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::ActivityCompleted { .. }));
    assert!(
        !completion_recorded,
        "Activity completion should be dropped after cancellation"
    );
    // If the abort worked, we never waited out the activity's full 5s sleep.
    assert!(
        started_at.elapsed() < Duration::from_secs(5),
        "Activity should have been aborted before full run"
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancel_nonexistent_instance_is_noop() {
    // Issuing a cancel for an instance id that was never started should
    // succeed quietly rather than returning an error.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let dummy = |_ctx: OrchestrationContext, _input: String| async move { Ok("ok".to_string()) };
    let orchestration_registry = OrchestrationRegistry::builder().register("Dummy", dummy).build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    let result = client.cancel_instance("does-not-exist", "test").await;
    assert!(result.is_ok(), "Cancelling non-existent instance should not error");
    // Give the dispatcher a moment before shutting down.
    tokio::time::sleep(Duration::from_millis(200)).await;
    rt.shutdown(None).await;
}
#[tokio::test]
async fn multiple_cancel_calls_are_idempotent() {
    // Repeated cancel requests must collapse into a single persisted
    // CancelRequested event, keeping the first reason.
    let (store, _td) = common::create_sqlite_store_disk().await;
    // The orchestration parks on an external event so cancels land while
    // it is still running.
    let waiter = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("Go").await;
        Ok("done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("Waiter", waiter).build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestration_registry,
        options,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-multi-cancel", "Waiter", "")
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
    // Fire several cancels with distinct reasons; every call should succeed.
    for i in 0..5 {
        let result = client.cancel_instance("inst-multi-cancel", format!("cancel-{i}")).await;
        assert!(result.is_ok(), "Cancel call {i} should succeed");
    }
    // Poll until the orchestration has terminated with a failure event.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let hist = store.read("inst-multi-cancel").await.unwrap_or_default();
        let failed = hist
            .iter()
            .any(|e| matches!(&e.kind, EventKind::OrchestrationFailed { .. }));
        if failed {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Timeout waiting for orchestration to be cancelled");
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // Exactly one CancelRequested event, carrying the first reason.
    let hist = store.read("inst-multi-cancel").await.unwrap_or_default();
    let cancel_events: Vec<_> = hist
        .iter()
        .filter(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }))
        .collect();
    assert_eq!(
        cancel_events.len(),
        1,
        "Should only have one CancelRequested event, got {}",
        cancel_events.len()
    );
    if let EventKind::OrchestrationCancelRequested { reason } = &cancel_events[0].kind {
        assert_eq!(reason, "cancel-0", "First cancel reason should win");
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancel_before_orchestration_starts() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Enqueue the start and the cancel back-to-back, before any runtime exists,
    // so the cancel is already queued when the orchestration first runs.
    let start_item = duroxide::providers::WorkItem::StartOrchestration {
        instance: "inst-early-cancel".to_string(),
        orchestration: "Early".to_string(),
        version: Some("1.0.0".to_string()),
        input: "input".to_string(),
        parent_instance: None,
        parent_id: None,
        execution_id: 1,
    };
    store.enqueue_for_orchestrator(start_item, None).await.unwrap();
    let cancel_item = duroxide::providers::WorkItem::CancelInstance {
        instance: "inst-early-cancel".to_string(),
        reason: "early_cancel".to_string(),
    };
    store.enqueue_for_orchestrator(cancel_item, None).await.unwrap();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("Early", |_ctx: OrchestrationContext, _input: String| async move {
            Ok("done".to_string())
        })
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    // Predicate: failed-with-cancellation event, optionally matching a specific reason.
    let cancelled_with = |e: &duroxide::Event, expected_reason: Option<&str>| match &e.kind {
        EventKind::OrchestrationFailed { details, .. } => match details {
            duroxide::ErrorDetails::Application {
                kind: duroxide::AppErrorKind::Cancelled { reason },
                ..
            } => expected_reason.map_or(true, |want| reason == want),
            _ => false,
        },
        _ => false,
    };
    // Poll history until the cancellation lands or we give up.
    let wait_until = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let history = store.read("inst-early-cancel").await.unwrap_or_default();
        if history.iter().any(|e| cancelled_with(e, None)) {
            break;
        }
        if std::time::Instant::now() > wait_until {
            panic!(
                "Timeout waiting for orchestration to be cancelled. History: {:?}",
                history.iter().map(|e| &e.kind).collect::<Vec<_>>()
            );
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    let history = store.read("inst-early-cancel").await.unwrap_or_default();
    // The recorded cancellation must carry the reason that was enqueued.
    assert!(
        history.iter().any(|e| cancelled_with(e, Some("early_cancel"))),
        "Orchestration should be cancelled with reason 'early_cancel'"
    );
    // And the orchestration must never have reached completion.
    let completed = history
        .iter()
        .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }));
    assert!(
        !completed,
        "Orchestration should NOT have completed - cancel was enqueued after start"
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn explicit_drop_activity_triggers_cancellation() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let (store, _td) = common::create_sqlite_store_disk().await;
let activity_started = Arc::new(AtomicBool::new(false));
let saw_cancellation = Arc::new(AtomicBool::new(false));
let activity_started_clone = Arc::clone(&activity_started);
let saw_cancellation_clone = Arc::clone(&saw_cancellation);
let cancellable_activity = move |ctx: ActivityContext, _input: String| {
let activity_started = Arc::clone(&activity_started_clone);
let saw_cancellation = Arc::clone(&saw_cancellation_clone);
async move {
activity_started.store(true, Ordering::SeqCst);
tokio::select! {
_ = ctx.cancelled() => {
saw_cancellation.store(true, Ordering::SeqCst);
Ok("cancelled".to_string())
}
_ = tokio::time::sleep(Duration::from_secs(30)) => {
Ok("timeout".to_string())
}
}
}
};
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
let activity_future = ctx.schedule_activity("CancellableActivity", "input");
ctx.schedule_timer(Duration::from_millis(500)).await;
drop(activity_future);
ctx.schedule_timer(Duration::from_millis(50)).await;
Ok("completed_after_drop".to_string())
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ExplicitDropActivity", orchestration)
.build();
let activity_registry = ActivityRegistry::builder()
.register("CancellableActivity", cancellable_activity)
.build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
orchestration_concurrency: 1,
worker_concurrency: 1,
worker_lock_timeout: Duration::from_secs(2),
worker_lock_renewal_buffer: Duration::from_millis(500),
activity_cancellation_grace_period: Duration::from_secs(2),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-explicit-drop-activity", "ExplicitDropActivity", "")
.await
.unwrap();
let deadline = std::time::Instant::now() + Duration::from_secs(5);
loop {
if activity_started.load(Ordering::SeqCst) {
break;
}
if std::time::Instant::now() > deadline {
panic!("Activity did not start in time");
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let deadline = std::time::Instant::now() + Duration::from_secs(10);
loop {
if saw_cancellation.load(Ordering::SeqCst) {
break;
}
if std::time::Instant::now() > deadline {
panic!("Explicitly dropped activity did not receive cancellation signal");
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
saw_cancellation.load(Ordering::SeqCst),
"Activity should have received cancellation signal after explicit drop"
);
match client
.wait_for_orchestration("inst-explicit-drop-activity", Duration::from_secs(10))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "completed_after_drop");
}
other => panic!("Expected Completed, got {:?}", other),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn activity_out_of_scope_triggers_cancellation() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let (store, _td) = common::create_sqlite_store_disk().await;
let activity_started = Arc::new(AtomicBool::new(false));
let saw_cancellation = Arc::new(AtomicBool::new(false));
let activity_started_clone = Arc::clone(&activity_started);
let saw_cancellation_clone = Arc::clone(&saw_cancellation);
let cancellable_activity = move |ctx: ActivityContext, _input: String| {
let activity_started = Arc::clone(&activity_started_clone);
let saw_cancellation = Arc::clone(&saw_cancellation_clone);
async move {
activity_started.store(true, Ordering::SeqCst);
tokio::select! {
_ = ctx.cancelled() => {
saw_cancellation.store(true, Ordering::SeqCst);
Ok("cancelled".to_string())
}
_ = tokio::time::sleep(Duration::from_secs(30)) => {
Ok("timeout".to_string())
}
}
}
};
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
{
let _unused = ctx.schedule_activity("CancellableActivity", "input");
ctx.schedule_timer(Duration::from_millis(500)).await;
}
ctx.schedule_timer(Duration::from_millis(50)).await;
Ok("completed_without_await".to_string())
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("OutOfScopeActivity", orchestration)
.build();
let activity_registry = ActivityRegistry::builder()
.register("CancellableActivity", cancellable_activity)
.build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
orchestration_concurrency: 1,
worker_concurrency: 1,
worker_lock_timeout: Duration::from_secs(2),
worker_lock_renewal_buffer: Duration::from_millis(500),
activity_cancellation_grace_period: Duration::from_secs(2),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-out-of-scope-activity", "OutOfScopeActivity", "")
.await
.unwrap();
let deadline = std::time::Instant::now() + Duration::from_secs(5);
loop {
if activity_started.load(Ordering::SeqCst) {
break;
}
if std::time::Instant::now() > deadline {
panic!("Activity did not start in time");
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let deadline = std::time::Instant::now() + Duration::from_secs(10);
loop {
if saw_cancellation.load(Ordering::SeqCst) {
break;
}
if std::time::Instant::now() > deadline {
panic!("Out-of-scope activity did not receive cancellation signal");
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
saw_cancellation.load(Ordering::SeqCst),
"Activity should have received cancellation signal when going out of scope"
);
match client
.wait_for_orchestration("inst-out-of-scope-activity", Duration::from_secs(10))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "completed_without_await");
}
other => panic!("Expected Completed, got {:?}", other),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn select2_loser_sub_orchestration_cancelled() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Child parks on an event nobody raises; it can only end via cancellation.
    let child_fn = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("NeverComes").await;
        Ok("child_completed".to_string())
    };
    // Parent races the sub-orchestration against a 100ms timer; the timer wins.
    let parent_fn = |ctx: OrchestrationContext, _input: String| async move {
        let child_handle = ctx.schedule_sub_orchestration("SlowChild", "input");
        let race_timer = ctx.schedule_timer(Duration::from_millis(100));
        match ctx.select2(child_handle, race_timer).await {
            Either2::First(result) => Ok(format!("sub_won:{}", result?)),
            Either2::Second(()) => Ok("timer_won".to_string()),
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("SlowChild", child_fn)
        .register("SelectSubOrchParent", parent_fn)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        orchestration_concurrency: 2,
        worker_concurrency: 1,
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-select2-suborg-cancel", "SelectSubOrchParent", "")
        .await
        .unwrap();
    let parent_status = client
        .wait_for_orchestration("inst-select2-suborg-cancel", Duration::from_secs(10))
        .await
        .unwrap();
    match parent_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "timer_won", "Timer should win the race");
        }
        other => panic!("Expected parent Completed, got {:?}", other),
    }
    // Recover the runtime-generated child instance id from the parent history.
    let parent_hist = store.read("inst-select2-suborg-cancel").await.unwrap();
    let child_suffix = parent_hist
        .iter()
        .find_map(|e| match &e.kind {
            EventKind::SubOrchestrationScheduled { instance, .. } => Some(instance.clone()),
            _ => None,
        })
        .expect("Child should have been scheduled");
    let full_child_id = format!("inst-select2-suborg-cancel::{child_suffix}");
    // Poll until the losing child is observed as cancelled.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        match client.get_orchestration_status(&full_child_id).await.unwrap() {
            runtime::OrchestrationStatus::Failed { details, .. } => {
                let was_cancelled = matches!(
                    &details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { .. },
                        ..
                    }
                );
                assert!(
                    was_cancelled,
                    "Child should be cancelled, got: {}",
                    details.display_message()
                );
                break;
            }
            runtime::OrchestrationStatus::Completed { output, .. } => {
                panic!("Child should NOT complete - it was a select2 loser. Got: {output}");
            }
            // Still Running (or NotFound before its first turn): keep polling.
            _ => {
                if std::time::Instant::now() > give_up_at {
                    panic!("Timeout waiting for child sub-orchestration to be cancelled");
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn select2_loser_sub_orchestration_explicit_id_cancelled() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Child parks on an event nobody raises; it can only end via cancellation.
    let child_fn = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("NeverComes").await;
        Ok("child_completed".to_string())
    };
    // Parent schedules the child under a caller-chosen instance id and races it
    // against a 100ms timer; the timer must win.
    let parent_fn = |ctx: OrchestrationContext, _input: String| async move {
        let child_handle = ctx.schedule_sub_orchestration_with_id("SlowChild", "my-explicit-child-id", "input");
        let race_timer = ctx.schedule_timer(Duration::from_millis(100));
        match ctx.select2(child_handle, race_timer).await {
            Either2::First(result) => Ok(format!("sub_won:{}", result?)),
            Either2::Second(()) => Ok("timer_won".to_string()),
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("SlowChild", child_fn)
        .register("SelectExplicitIdParent", parent_fn)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        orchestration_concurrency: 2,
        worker_concurrency: 1,
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-select2-explicit-id", "SelectExplicitIdParent", "")
        .await
        .unwrap();
    let parent_status = client
        .wait_for_orchestration("inst-select2-explicit-id", Duration::from_secs(10))
        .await
        .unwrap();
    match parent_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "timer_won", "Timer should win the race");
        }
        other => panic!("Expected parent Completed, got {:?}", other),
    }
    // With an explicit id the scheduled child instance is exactly that id.
    let parent_hist = store.read("inst-select2-explicit-id").await.unwrap();
    let child_instance = parent_hist.iter().find_map(|e| match &e.kind {
        EventKind::SubOrchestrationScheduled { instance, .. } => Some(instance.clone()),
        _ => None,
    });
    assert_eq!(
        child_instance.as_deref(),
        Some("my-explicit-child-id"),
        "Child should have been scheduled with explicit instance ID"
    );
    let full_child_id = "my-explicit-child-id";
    // Poll until the losing child is observed as cancelled.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        match client.get_orchestration_status(full_child_id).await.unwrap() {
            runtime::OrchestrationStatus::Failed { details, .. } => {
                let was_cancelled = matches!(
                    &details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { .. },
                        ..
                    }
                );
                assert!(
                    was_cancelled,
                    "Child with explicit ID should be cancelled, got: {}",
                    details.display_message()
                );
                break;
            }
            runtime::OrchestrationStatus::Completed { output, .. } => {
                panic!("Child with explicit ID should NOT complete - it was a select2 loser. Got: {output}");
            }
            // Still Running (or NotFound before its first turn): keep polling.
            _ => {
                if std::time::Instant::now() > give_up_at {
                    panic!("Timeout waiting for child sub-orchestration with explicit ID to be cancelled");
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn explicit_drop_sub_orchestration_cancelled() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Child parks on an event nobody raises; it can only end via cancellation.
    let child_fn = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("NeverComes").await;
        Ok("child_completed".to_string())
    };
    // Parent explicitly drops the sub-orchestration future right after scheduling it.
    let parent_fn = |ctx: OrchestrationContext, _input: String| async move {
        let child_handle = ctx.schedule_sub_orchestration("WaitingChild", "input");
        drop(child_handle);
        ctx.schedule_timer(Duration::from_millis(50)).await;
        Ok("parent_completed_after_drop".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("WaitingChild", child_fn)
        .register("DropSubOrchParent", parent_fn)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        orchestration_concurrency: 2,
        worker_concurrency: 1,
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-drop-suborg", "DropSubOrchParent", "")
        .await
        .unwrap();
    let parent_status = client
        .wait_for_orchestration("inst-drop-suborg", Duration::from_secs(10))
        .await
        .unwrap();
    match parent_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "parent_completed_after_drop");
        }
        other => panic!("Expected parent Completed, got {:?}", other),
    }
    // Recover the runtime-generated child instance id from the parent history.
    let parent_hist = store.read("inst-drop-suborg").await.unwrap();
    let child_suffix = parent_hist
        .iter()
        .find_map(|e| match &e.kind {
            EventKind::SubOrchestrationScheduled { instance, .. } => Some(instance.clone()),
            _ => None,
        })
        .expect("Child should have been scheduled");
    let full_child_id = format!("inst-drop-suborg::{child_suffix}");
    // Poll until the dropped child is observed as cancelled.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        match client.get_orchestration_status(&full_child_id).await.unwrap() {
            runtime::OrchestrationStatus::Failed { details, .. } => {
                let was_cancelled = matches!(
                    &details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { .. },
                        ..
                    }
                );
                assert!(
                    was_cancelled,
                    "Child should be cancelled after explicit drop, got: {}",
                    details.display_message()
                );
                break;
            }
            runtime::OrchestrationStatus::Completed { output, .. } => {
                panic!("Child should NOT complete after being dropped. Got: {output}");
            }
            // Still Running (or NotFound before its first turn): keep polling.
            _ => {
                if std::time::Instant::now() > give_up_at {
                    panic!("Timeout waiting for dropped child sub-orchestration to be cancelled");
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
    }
    rt.shutdown(None).await;
}
/// Which provider entry point a `CancelInstance` work item was enqueued through.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CancelInstanceEnqueueSource {
    /// Enqueued directly via `Provider::enqueue_for_orchestrator` (e.g. a user cancel).
    EnqueueForOrchestrator,
    /// Enqueued among the orchestrator items of `Provider::ack_orchestration_item`
    /// (e.g. cancellation of a dropped sub-orchestration future).
    AckOrchestrationItem,
}
/// One observed `CancelInstance` enqueue: the target instance, the reason
/// string it carried, and which provider entry point it came through.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecordedCancelInstance {
    /// Target orchestration instance id.
    instance: String,
    /// Cancel reason as supplied by the enqueuer.
    reason: String,
    /// Entry point that enqueued the item.
    source: CancelInstanceEnqueueSource,
}
/// Test double wrapping a real `Provider`: records every `CancelInstance`
/// enqueue it sees and can hold back orchestration-item delivery for the
/// "combo-child" instance until the test releases it via `allow_child_fetch`.
struct RecordingProvider {
    /// The real provider all calls are delegated to.
    inner: Arc<dyn Provider>,
    /// All `CancelInstance` work items observed, in order.
    cancel_instances: Mutex<Vec<RecordedCancelInstance>>,
    /// While false, fetched "combo-child" items are abandoned instead of delivered.
    allow_child_fetch: AtomicBool,
    /// Per delivered "combo-child" batch: the cancel reasons it contained.
    child_cancel_reasons_by_fetch: Mutex<Vec<Vec<String>>>,
}
impl RecordingProvider {
    /// Wraps `inner`; starts with no recorded cancels and the child-fetch gate closed.
    fn new(inner: Arc<dyn Provider>) -> Self {
        Self {
            inner,
            cancel_instances: Mutex::new(Vec::new()),
            allow_child_fetch: AtomicBool::new(false),
            child_cancel_reasons_by_fetch: Mutex::new(Vec::new()),
        }
    }
    /// Snapshot of every recorded `CancelInstance` enqueue, in observation order.
    fn cancel_instance_records(&self) -> Vec<RecordedCancelInstance> {
        let guard = self.cancel_instances.lock().expect("Mutex should not be poisoned");
        guard.clone()
    }
    /// Appends one observed `CancelInstance` enqueue to the record.
    fn record_cancel_instance(&self, instance: &str, reason: &str, source: CancelInstanceEnqueueSource) {
        let record = RecordedCancelInstance {
            instance: instance.to_string(),
            reason: reason.to_string(),
            source,
        };
        let mut guard = self.cancel_instances.lock().expect("Mutex should not be poisoned");
        guard.push(record);
    }
    /// Opens the gate so orchestration items for "combo-child" may be delivered.
    fn allow_child_fetch(&self) {
        self.allow_child_fetch.store(true, Ordering::SeqCst);
    }
    /// Snapshot of the cancel reasons seen in each delivered "combo-child" batch.
    fn child_cancel_reasons_by_fetch(&self) -> Vec<Vec<String>> {
        let guard = self
            .child_cancel_reasons_by_fetch
            .lock()
            .expect("Mutex should not be poisoned");
        guard.clone()
    }
}
#[async_trait]
impl Provider for RecordingProvider {
    // Fetch pass-through with two interceptions for the "combo-child" instance:
    // while the gate is closed, a fetched item is abandoned (redelivered after
    // ~25ms) so cancel messages can accumulate into one batch; once the gate is
    // open, the CancelInstance reasons in each delivered batch are recorded.
    async fn fetch_orchestration_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        filter: Option<&DispatcherCapabilityFilter>,
    ) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
        let result = self
            .inner
            .fetch_orchestration_item(lock_timeout, poll_timeout, filter)
            .await?;
        if let Some((item, lock_token, attempt_count)) = result {
            if item.instance == "combo-child" && !self.allow_child_fetch.load(Ordering::SeqCst) {
                // Gate closed: hand the item back with a short redelivery delay
                // and report "nothing available" to the dispatcher.
                self.inner
                    .abandon_orchestration_item(&lock_token, Some(Duration::from_millis(25)), true)
                    .await?;
                return Ok(None);
            }
            if item.instance == "combo-child" {
                // Gate open: capture the cancel reasons that arrived together in this batch.
                let reasons: Vec<String> = item
                    .messages
                    .iter()
                    .filter_map(|m| match m {
                        WorkItem::CancelInstance { reason, .. } => Some(reason.clone()),
                        _ => None,
                    })
                    .collect();
                self.child_cancel_reasons_by_fetch
                    .lock()
                    .expect("Mutex should not be poisoned")
                    .push(reasons);
            }
            return Ok(Some((item, lock_token, attempt_count)));
        }
        Ok(None)
    }
    // Plain delegation to the wrapped provider.
    async fn fetch_work_item(
        &self,
        lock_timeout: Duration,
        poll_timeout: Duration,
        session: Option<&SessionFetchConfig>,
        tag_filter: &TagFilter,
    ) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
        self.inner
            .fetch_work_item(lock_timeout, poll_timeout, session, tag_filter)
            .await
    }
    // Records any CancelInstance items the runtime enqueues while acking a turn
    // (e.g. cancellation of a dropped sub-orchestration future), then delegates.
    async fn ack_orchestration_item(
        &self,
        lock_token: &str,
        execution_id: u64,
        history_delta: Vec<duroxide::Event>,
        worker_items: Vec<WorkItem>,
        orchestrator_items: Vec<WorkItem>,
        metadata: ExecutionMetadata,
        cancelled_activities: Vec<ScheduledActivityIdentifier>,
    ) -> Result<(), ProviderError> {
        for item in &orchestrator_items {
            if let WorkItem::CancelInstance { instance, reason } = item {
                self.record_cancel_instance(instance, reason, CancelInstanceEnqueueSource::AckOrchestrationItem);
            }
        }
        self.inner
            .ack_orchestration_item(
                lock_token,
                execution_id,
                history_delta,
                worker_items,
                orchestrator_items,
                metadata,
                cancelled_activities,
            )
            .await
    }
    async fn abandon_orchestration_item(
        &self,
        lock_token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        self.inner
            .abandon_orchestration_item(lock_token, delay, ignore_attempt)
            .await
    }
    async fn ack_work_item(&self, token: &str, completion: Option<WorkItem>) -> Result<(), ProviderError> {
        self.inner.ack_work_item(token, completion).await
    }
    async fn renew_work_item_lock(&self, token: &str, extension: Duration) -> Result<(), ProviderError> {
        self.inner.renew_work_item_lock(token, extension).await
    }
    async fn abandon_work_item(
        &self,
        token: &str,
        delay: Option<Duration>,
        ignore_attempt: bool,
    ) -> Result<(), ProviderError> {
        self.inner.abandon_work_item(token, delay, ignore_attempt).await
    }
    async fn renew_session_lock(
        &self,
        owner_ids: &[&str],
        extend_for: Duration,
        idle_timeout: Duration,
    ) -> Result<usize, ProviderError> {
        self.inner.renew_session_lock(owner_ids, extend_for, idle_timeout).await
    }
    async fn cleanup_orphaned_sessions(&self, idle_timeout: Duration) -> Result<usize, ProviderError> {
        self.inner.cleanup_orphaned_sessions(idle_timeout).await
    }
    async fn renew_orchestration_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
        self.inner.renew_orchestration_item_lock(token, extend_for).await
    }
    // Records CancelInstance items enqueued directly (the user-cancel path),
    // then delegates.
    async fn enqueue_for_orchestrator(&self, item: WorkItem, delay: Option<Duration>) -> Result<(), ProviderError> {
        if let WorkItem::CancelInstance { instance, reason } = &item {
            self.record_cancel_instance(instance, reason, CancelInstanceEnqueueSource::EnqueueForOrchestrator);
        }
        self.inner.enqueue_for_orchestrator(item, delay).await
    }
    async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
        self.inner.enqueue_for_worker(item).await
    }
    async fn read(&self, instance: &str) -> Result<Vec<duroxide::Event>, ProviderError> {
        self.inner.read(instance).await
    }
    async fn read_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
    ) -> Result<Vec<duroxide::Event>, ProviderError> {
        self.inner.read_with_execution(instance, execution_id).await
    }
    async fn append_with_execution(
        &self,
        instance: &str,
        execution_id: u64,
        new_events: Vec<duroxide::Event>,
    ) -> Result<(), ProviderError> {
        self.inner
            .append_with_execution(instance, execution_id, new_events)
            .await
    }
    fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
        self.inner.as_management_capability()
    }
    async fn get_custom_status(
        &self,
        instance: &str,
        last_seen_version: u64,
    ) -> Result<Option<(Option<String>, u64)>, ProviderError> {
        self.inner.get_custom_status(instance, last_seen_version).await
    }
    async fn get_kv_value(&self, instance: &str, key: &str) -> Result<Option<String>, ProviderError> {
        self.inner.get_kv_value(instance, key).await
    }
    async fn get_kv_all_values(
        &self,
        instance: &str,
    ) -> Result<std::collections::HashMap<String, String>, ProviderError> {
        self.inner.get_kv_all_values(instance).await
    }
    async fn get_instance_stats(&self, instance: &str) -> Result<Option<duroxide::SystemStats>, ProviderError> {
        self.inner.get_instance_stats(instance).await
    }
}
#[tokio::test]
async fn user_cancel_instance_and_dropped_future_both_enqueue_cancelinstance_for_child() {
    let (inner_store, _td) = common::create_sqlite_store_disk().await;
    // Wrap the real store so CancelInstance enqueues are observable and the
    // child's work items can be held back until both cancels are in flight.
    let recording_store = Arc::new(RecordingProvider::new(inner_store));
    let store: Arc<dyn Provider> = recording_store.clone();
    let child = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_wait("NeverComes").await;
        Ok("child_completed".to_string())
    };
    // Parent drops the child's future immediately after scheduling it; the
    // assertions below expect this to produce a CancelInstance enqueue via
    // the ack path.
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let sub_orch = ctx.schedule_sub_orchestration_with_id("ComboChild", "combo-child", "input");
        drop(sub_orch);
        ctx.schedule_timer(Duration::from_millis(50)).await;
        Ok("parent_done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ComboChild", child)
        .register("ComboParent", parent)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        orchestration_concurrency: 2,
        worker_concurrency: 1,
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-combo-cancel", "ComboParent", "")
        .await
        .unwrap();
    // Second cancellation path: an explicit user cancel of the child instance.
    client.cancel_instance("combo-child", "user_cancel").await.unwrap();
    // Wait until both enqueue paths have been recorded (child fetches are still gated).
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let records = recording_store.cancel_instance_records();
        let saw_user_cancel = records.iter().any(|r| {
            r.instance == "combo-child"
                && r.reason == "user_cancel"
                && r.source == CancelInstanceEnqueueSource::EnqueueForOrchestrator
        });
        let saw_drop_cancel = records.iter().any(|r| {
            r.instance == "combo-child"
                && r.reason == "parent dropped sub-orchestration future"
                && r.source == CancelInstanceEnqueueSource::AckOrchestrationItem
        });
        if saw_user_cancel && saw_drop_cancel {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Timed out waiting for both CancelInstance enqueues. records={records:?}");
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // Open the gate so the child can finally receive its accumulated messages.
    recording_store.allow_child_fetch();
    match client
        .wait_for_orchestration("inst-combo-cancel", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "parent_done"),
        other => panic!("Expected parent Completed, got {:?}", other),
    }
    // The child must end up cancelled, never completed.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let child_status = client.get_orchestration_status("combo-child").await.unwrap();
        match child_status {
            runtime::OrchestrationStatus::Failed { details, .. } => {
                assert!(
                    matches!(
                        &details,
                        duroxide::ErrorDetails::Application {
                            kind: duroxide::AppErrorKind::Cancelled { .. },
                            ..
                        }
                    ),
                    "Child should be cancelled, got: {}",
                    details.display_message()
                );
                break;
            }
            runtime::OrchestrationStatus::Completed { output, .. } => {
                panic!("Child should NOT complete (it was cancelled). Got: {output}");
            }
            runtime::OrchestrationStatus::Running { .. } | runtime::OrchestrationStatus::NotFound => {
                if std::time::Instant::now() > deadline {
                    panic!("Timeout waiting for child to be cancelled");
                }
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        }
    }
    // Both enqueue paths must appear in the final record.
    let records = recording_store.cancel_instance_records();
    assert!(
        records.iter().any(|r| {
            r.instance == "combo-child"
                && r.reason == "user_cancel"
                && r.source == CancelInstanceEnqueueSource::EnqueueForOrchestrator
        }),
        "Expected user CancelInstance enqueue for combo-child, got records={records:?}"
    );
    assert!(
        records.iter().any(|r| {
            r.instance == "combo-child"
                && r.reason == "parent dropped sub-orchestration future"
                && r.source == CancelInstanceEnqueueSource::AckOrchestrationItem
        }),
        "Expected dropped-future CancelInstance enqueue for combo-child, got records={records:?}"
    );
    // Because delivery was gated until both cancels existed, both reasons are
    // expected in the child's very first fetched batch.
    let child_fetches = recording_store.child_cancel_reasons_by_fetch();
    assert!(
        !child_fetches.is_empty(),
        "Expected at least one fetched child turn batch, got child_fetches={child_fetches:?}"
    );
    let first_batch = &child_fetches[0];
    assert!(
        first_batch.iter().any(|r| r == "user_cancel"),
        "Expected user_cancel in first child batch, got first_batch={first_batch:?}"
    );
    assert!(
        first_batch
            .iter()
            .any(|r| r == "parent dropped sub-orchestration future"),
        "Expected dropped-future reason in first child batch, got first_batch={first_batch:?}"
    );
    // Parent history must carry a cancel-request breadcrumb linked back (via
    // source_event_id) to the SubOrchestrationScheduled event it cancels.
    let parent_hist = store.read("inst-combo-cancel").await.unwrap_or_default();
    let scheduled_id = parent_hist.iter().find_map(|e| match &e.kind {
        EventKind::SubOrchestrationScheduled { instance, .. } if instance == "combo-child" => Some(e.event_id()),
        _ => None,
    });
    let scheduled_id = scheduled_id.expect("Expected sub-orchestration to be scheduled");
    assert!(
        parent_hist.iter().any(|e| {
            matches!(&e.kind, EventKind::SubOrchestrationCancelRequested { reason } if reason == "dropped_future")
                && e.source_event_id == Some(scheduled_id)
        }),
        "Expected SubOrchestrationCancelRequested(reason=dropped_future, source_event_id={scheduled_id}) in parent history"
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sub_orchestration_out_of_scope_cancelled() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Child parks on an event nobody raises; it can only end via cancellation.
    let child_fn = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("NeverComes").await;
        Ok("child_completed".to_string())
    };
    // Parent lets the sub-orchestration future fall out of a block scope
    // without ever awaiting it.
    let parent_fn = |ctx: OrchestrationContext, _input: String| async move {
        {
            let _unused = ctx.schedule_sub_orchestration("WaitingChild", "input");
        }
        ctx.schedule_timer(Duration::from_millis(50)).await;
        Ok("parent_completed_without_await".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("WaitingChild", child_fn)
        .register("OutOfScopeSubOrchParent", parent_fn)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        orchestration_concurrency: 2,
        worker_concurrency: 1,
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-scope-suborg", "OutOfScopeSubOrchParent", "")
        .await
        .unwrap();
    let parent_status = client
        .wait_for_orchestration("inst-scope-suborg", Duration::from_secs(10))
        .await
        .unwrap();
    match parent_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "parent_completed_without_await");
        }
        other => panic!("Expected parent Completed, got {:?}", other),
    }
    // Recover the runtime-generated child instance id from the parent history.
    let parent_hist = store.read("inst-scope-suborg").await.unwrap();
    let child_suffix = parent_hist
        .iter()
        .find_map(|e| match &e.kind {
            EventKind::SubOrchestrationScheduled { instance, .. } => Some(instance.clone()),
            _ => None,
        })
        .expect("Child should have been scheduled");
    let full_child_id = format!("inst-scope-suborg::{child_suffix}");
    // Poll until the out-of-scope child is observed as cancelled.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        match client.get_orchestration_status(&full_child_id).await.unwrap() {
            runtime::OrchestrationStatus::Failed { details, .. } => {
                let was_cancelled = matches!(
                    &details,
                    duroxide::ErrorDetails::Application {
                        kind: duroxide::AppErrorKind::Cancelled { .. },
                        ..
                    }
                );
                assert!(
                    was_cancelled,
                    "Child should be cancelled when going out of scope, got: {}",
                    details.display_message()
                );
                break;
            }
            runtime::OrchestrationStatus::Completed { output, .. } => {
                panic!("Child should NOT complete when it went out of scope. Got: {output}");
            }
            // Still Running (or NotFound before its first turn): keep polling.
            _ => {
                if std::time::Instant::now() > give_up_at {
                    panic!("Timeout waiting for out-of-scope child sub-orchestration to be cancelled");
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn positional_arrival_before_sub_outstanding() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // The external event is raised while the first timer is pending, i.e.
    // before any subscription for it exists; the later subscription must not
    // be satisfied by that earlier arrival, so the fallback timer wins.
    let orch = |ctx: OrchestrationContext, _: String| async move {
        ctx.schedule_timer(Duration::from_millis(100)).await;
        let subscription = ctx.schedule_wait("MyEvent");
        let fallback = ctx.schedule_timer(Duration::from_millis(500));
        match ctx.select2(subscription, fallback).await {
            duroxide::Either2::First(_) => Ok("event-received".to_string()),
            duroxide::Either2::Second(_) => Ok("timeout".to_string()),
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-pos-arr", "TestOrch", "")
        .await
        .unwrap();
    // Raise the event immediately — before the orchestration subscribes to it.
    client.raise_event("inst-pos-arr", "MyEvent", "data").await.unwrap();
    let status = client
        .wait_for_orchestration("inst-pos-arr", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(
        matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "timeout"),
        "Expected timeout, got {:?}",
        status
    );
    // The losing subscription must leave a dropped_future cancellation breadcrumb.
    let history = store.read("inst-pos-arr").await.unwrap();
    let breadcrumb_present = history
        .iter()
        .any(|e| matches!(&e.kind, EventKind::ExternalSubscribedCancelled { reason } if reason == "dropped_future"));
    assert!(
        breadcrumb_present,
        "Missing dropped_future cancellation breadcrumb for positional subscription"
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn cancelled_subscription_does_not_steal_future_event() {
    // A subscription cancelled by a lost select2 race must not capture an
    // event raised afterwards: only the second, live subscription may see it.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orchestration = |ctx: OrchestrationContext, _: String| async move {
        // Phase 1: race a wait against a short timer; the timer must win,
        // dropping (cancelling) the first "Signal" subscription.
        let first_wait = ctx.schedule_wait("Signal");
        let first_timeout = ctx.schedule_timer(Duration::from_millis(50));
        if let Either2::First(_) = ctx.select2(first_wait, first_timeout).await {
            return Ok("unexpected_phase1".to_string());
        }
        // Phase 2: a fresh subscription should receive the later event.
        let payload = ctx.schedule_wait("Signal").await;
        Ok(format!("got:{payload}"))
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orchestration).build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client.start_orchestration("inst-no-steal", "TestOrch", "").await.unwrap();
    // Wait past the 50ms phase-1 timeout before raising the event.
    tokio::time::sleep(Duration::from_millis(200)).await;
    client.raise_event("inst-no-steal", "Signal", "hello").await.unwrap();
    let status = client
        .wait_for_orchestration("inst-no-steal", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(
        matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "got:hello"),
        "Expected got:hello, got {:?}",
        status
    );
    // History must record that the losing subscription was cancelled.
    let history = store.read("inst-no-steal").await.unwrap();
    let saw_breadcrumb = history
        .iter()
        .any(|e| matches!(&e.kind, EventKind::ExternalSubscribedCancelled { reason } if reason == "dropped_future"));
    assert!(saw_breadcrumb, "Missing dropped_future cancellation breadcrumb");
    rt.shutdown(None).await;
}
#[tokio::test]
async fn positional_matching_fifo_without_cancellation() {
    // Two sequential waits on the same event name must receive events in
    // arrival (FIFO) order when no subscription is ever cancelled.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orchestration = |ctx: OrchestrationContext, _: String| async move {
        let head = ctx.schedule_wait("Data").await;
        let tail = ctx.schedule_wait("Data").await;
        Ok(format!("{},{}", head, tail))
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orchestration).build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client.start_orchestration("inst-fifo", "TestOrch", "").await.unwrap();
    // Space the two raises out so each is processed before the next arrives.
    tokio::time::sleep(Duration::from_millis(200)).await;
    client.raise_event("inst-fifo", "Data", "first").await.unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;
    client.raise_event("inst-fifo", "Data", "second").await.unwrap();
    let status = client
        .wait_for_orchestration("inst-fifo", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(
        matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "first,second"),
        "FIFO ordering broken, got {:?}",
        status
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn duplicate_external_events_not_deduplicated() {
    // Regression guard: two events with the same name AND payload are distinct
    // deliveries — each must satisfy its own subscription, not be collapsed.
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orchestration = |ctx: OrchestrationContext, _: String| async move {
        let first = ctx.schedule_wait("Dup").await;
        let second = ctx.schedule_wait("Dup").await;
        Ok(format!("{},{}", first, second))
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orchestration).build();
    let activity_registry = ActivityRegistry::builder().build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client.start_orchestration("inst-nodup", "TestOrch", "").await.unwrap();
    // Raise the identical payload twice, spaced apart.
    tokio::time::sleep(Duration::from_millis(200)).await;
    client.raise_event("inst-nodup", "Dup", "same_payload").await.unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;
    client.raise_event("inst-nodup", "Dup", "same_payload").await.unwrap();
    let status = client
        .wait_for_orchestration("inst-nodup", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(
        matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "same_payload,same_payload"),
        "Duplicate events were deduplicated (bug regression), got {:?}",
        status
    );
    rt.shutdown(None).await;
}