#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::providers::{Provider, SemverRange, WorkItem};
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self, RuntimeOptions};
use duroxide::{Client, EventKind, INITIAL_EXECUTION_ID, OrchestrationRegistry};
use std::sync::Arc;
use std::time::Duration;
mod common;
/// An instance pinned at a far-future duroxide version (99.0.0) must be
/// invisible to a runtime using the default (current-version) replay filter,
/// so it stays `Running` rather than being picked up or poisoned.
#[tokio::test]
async fn runtime_abandons_incompatible_execution() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "incompat-12", "TestOrch", semver::Version::new(99, 0, 0)).await;
    let registry = OrchestrationRegistry::builder()
        .register(
            "TestOrch",
            // Must never run: the instance is filtered out before dispatch.
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("completed".to_string()) },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        registry,
        RuntimeOptions {
            max_attempts: 3,
            dispatcher_min_poll_interval: Duration::from_millis(10),
            ..Default::default()
        },
    )
    .await;
    // Give the dispatcher ample time to (not) process the seeded instance.
    tokio::time::sleep(Duration::from_secs(2)).await;
    rt.shutdown(None).await;
    let status = Client::new(store.clone())
        .get_orchestration_status("incompat-12")
        .await
        .unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Running { .. }),
        "Incompatible instance should remain Running (invisible to filtered runtime), got: {status:?}"
    );
}
/// A freshly started orchestration is pinned at the current build version, so a
/// default-configured runtime must process it to completion.
#[tokio::test]
async fn runtime_processes_compatible_execution_normally() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    let registry = OrchestrationRegistry::builder()
        .register(
            "CompatOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("compatible-ok".to_string()) },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        registry,
        RuntimeOptions {
            dispatcher_min_poll_interval: Duration::from_millis(10),
            ..Default::default()
        },
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("compat-13", "CompatOrch", "{}")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("compat-13", Duration::from_secs(5))
        .await
        .unwrap();
    // Expect a normal completion carrying the handler's output string.
    match &status {
        duroxide::OrchestrationStatus::Completed { output, .. } => {
            assert!(
                output.contains("compatible-ok"),
                "Expected compatible-ok in output, got: {output}"
            );
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }
    rt.shutdown(None).await;
}
/// Seeds an instance pinned at a future duroxide version (99.0.0) and runs a
/// runtime with `max_attempts: 3`.
///
/// NOTE(review): despite the name mentioning "poisons", the assertion expects
/// the instance to remain `Running` — with provider-level version filtering the
/// incompatible item is never dequeued, so the max-attempts/poison path is not
/// exercised here. Confirm the test name still matches the intended behavior.
#[tokio::test]
async fn runtime_abandon_reaches_max_attempts_and_poisons() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "poison-14", "TestOrch", semver::Version::new(99, 0, 0)).await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "TestOrch",
            // Must never run: the instance is invisible to this runtime.
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("should-not-reach".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        max_attempts: 3,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    // Give the dispatcher time to poll; the seeded item should stay filtered out.
    tokio::time::sleep(Duration::from_secs(2)).await;
    rt.shutdown(None).await;
    let client = Client::new(store.clone());
    let status = client.get_orchestration_status("poison-14").await.unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Running { .. }),
        "Incompatible instance should remain Running with provider-level filtering, got: {status:?}"
    );
}
/// With a generous `max_attempts` (100), an incompatible instance must not be
/// poisoned within the first 500ms — the abandon delay paces retry attempts.
#[tokio::test]
async fn runtime_abandon_uses_short_delay() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Pin the seeded instance to a future version so this runtime never accepts it.
    common::seed_instance_with_pinned_version(&*store, "delay-15", "TestOrch", semver::Version::new(99, 0, 0)).await;
    let registry = OrchestrationRegistry::builder()
        .register(
            "TestOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("should-not-reach".to_string()) },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        registry,
        RuntimeOptions {
            max_attempts: 100,
            dispatcher_min_poll_interval: Duration::from_millis(10),
            ..Default::default()
        },
    )
    .await;
    // After only half a second the item must still be pending, not poisoned.
    tokio::time::sleep(Duration::from_millis(500)).await;
    let status = Client::new(store.clone())
        .get_orchestration_status("delay-15")
        .await
        .unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Running { .. }),
        "Item should still be Running (not immediately poisoned due to abandon delay)"
    );
    rt.shutdown(None).await;
}
/// A newly started orchestration must record the current build version on its
/// `OrchestrationStarted` history event (the "pinned" version).
#[tokio::test]
async fn execution_metadata_includes_pinned_version_on_new_orchestration() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    let registry = OrchestrationRegistry::builder()
        .register(
            "MetaOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("meta-ok".to_string()) },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        registry,
        RuntimeOptions {
            dispatcher_min_poll_interval: Duration::from_millis(10),
            ..Default::default()
        },
    )
    .await;
    let client = Client::new(store.clone());
    client.start_orchestration("meta-19", "MetaOrch", "{}").await.unwrap();
    let status = client
        .wait_for_orchestration("meta-19", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(matches!(status, duroxide::OrchestrationStatus::Completed { .. }));
    rt.shutdown(None).await;
    // Inspect persisted history: the start event must carry a parseable semver
    // equal to the version this binary was built with.
    let history = store.read("meta-19").await.unwrap();
    let started_event = history
        .iter()
        .find(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. }))
        .expect("Should have OrchestrationStarted event");
    let pinned = semver::Version::parse(&started_event.duroxide_version).expect("Should be parseable semver");
    assert_eq!(
        pinned,
        duroxide::current_build_version(),
        "Pinned version should match current build version"
    );
}
/// Legacy-compatibility: an instance enqueued *without* pinned-version metadata
/// (seeded via `provider_seed_without_pinned_version`) must still be picked up
/// and completed by a default-configured runtime.
#[tokio::test]
async fn existing_executions_without_pinned_version_remain_fetchable() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    provider_seed_without_pinned_version(&*store, "null-20", "NullOrch").await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "NullOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("null-ok".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    // Poll the instance history (up to 5s) for an OrchestrationCompleted event.
    let completed = common::wait_for_history(
        store.clone(),
        "null-20",
        |hist| {
            hist.iter()
                .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
        },
        5000,
    )
    .await;
    assert!(completed, "Instance with NULL pinned version should be processed");
    rt.shutdown(None).await;
}
/// Enqueue a `StartOrchestration` work item directly through the provider,
/// mimicking a record created before pinned duroxide versions existed — no
/// pinned-version metadata is supplied alongside the item here.
async fn provider_seed_without_pinned_version(provider: &dyn Provider, instance: &str, orchestration: &str) {
    let item = WorkItem::StartOrchestration {
        instance: instance.to_string(),
        orchestration: orchestration.to_string(),
        input: "{}".to_string(),
        // Orchestration (user-level) version string, distinct from the duroxide
        // build version this suite is concerned with.
        version: Some("1.0.0".to_string()),
        parent_instance: None,
        parent_id: None,
        execution_id: INITIAL_EXECUTION_ID,
    };
    provider.enqueue_for_orchestrator(item, None).await.unwrap();
}
/// With no explicit `supported_replay_versions`, the runtime's default range
/// must accept an orchestration pinned at the current build version.
#[tokio::test]
async fn default_supported_range_includes_current_and_older_versions() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    let registry = OrchestrationRegistry::builder()
        .register(
            "RangeOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("range-ok".to_string()) },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        registry,
        RuntimeOptions {
            dispatcher_min_poll_interval: Duration::from_millis(10),
            ..Default::default()
        },
    )
    .await;
    let client = Client::new(store.clone());
    client.start_orchestration("range-26", "RangeOrch", "{}").await.unwrap();
    let status = client
        .wait_for_orchestration("range-26", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Completed { .. }),
        "Orchestration pinned at current version should complete with default range"
    );
    rt.shutdown(None).await;
}
/// The default replay range must *exclude* instances pinned at a version newer
/// than the current build (here 99.0.0), leaving them untouched as `Running`.
#[tokio::test]
async fn default_supported_range_excludes_future_versions() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "future-27", "FutureOrch", semver::Version::new(99, 0, 0)).await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "FutureOrch",
            // Must never run: the future-pinned instance is filtered out.
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("should-not-reach".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        max_attempts: 3,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    // Allow the dispatcher time to poll before checking nothing happened.
    tokio::time::sleep(Duration::from_secs(2)).await;
    rt.shutdown(None).await;
    let client = Client::new(store.clone());
    let status = client.get_orchestration_status("future-27").await.unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Running { .. }),
        "Future-version instance should remain Running (filtered out), got: {status:?}"
    );
}
/// A custom `supported_replay_versions` range (2.0.0..=2.99.99) must exclude an
/// instance pinned at 1.0.0 — even though the default range would include it.
#[tokio::test]
async fn custom_supported_replay_versions_narrows_range() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "narrow-28", "NarrowOrch", semver::Version::new(1, 0, 0)).await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "NarrowOrch",
            // Must never run: v1.0.0 is outside the configured v2.x range.
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("should-not-reach".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        max_attempts: 3,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        // Accept only the 2.x band.
        supported_replay_versions: Some(SemverRange::new(
            semver::Version::new(2, 0, 0),
            semver::Version::new(2, 99, 99),
        )),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    // Allow the dispatcher time to poll before checking nothing happened.
    tokio::time::sleep(Duration::from_secs(2)).await;
    rt.shutdown(None).await;
    let client = Client::new(store.clone());
    let status = client.get_orchestration_status("narrow-28").await.unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Running { .. }),
        "v1.0.0 instance should remain Running with v2.x-only range, got: {status:?}"
    );
}
/// Two runtimes with disjoint version bands (1.x vs 2.x) share one store; each
/// seeded instance must complete on the runtime whose band matches its pin.
#[tokio::test]
async fn two_runtimes_different_version_ranges_route_correctly() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "v1-inst", "RoutingOrch", semver::Version::new(1, 0, 0)).await;
    common::seed_instance_with_pinned_version(&*store, "v2-inst", "RoutingOrch", semver::Version::new(2, 0, 0)).await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "RoutingOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("routed".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    // One options value per major-version band: major.0.0 ..= major.99.99.
    let options_for = |major: u64| RuntimeOptions {
        max_attempts: 50,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        supported_replay_versions: Some(SemverRange::new(
            semver::Version::new(major, 0, 0),
            semver::Version::new(major, 99, 99),
        )),
        ..Default::default()
    };
    let rt_a = runtime::Runtime::start_with_options(
        store.clone(),
        activities.clone(),
        orchestrations.clone(),
        options_for(1),
    )
    .await;
    let rt_b = runtime::Runtime::start_with_options(
        store.clone(),
        activities.clone(),
        orchestrations.clone(),
        options_for(2),
    )
    .await;
    // Each instance must reach OrchestrationCompleted within 5s.
    for (instance, msg) in [
        ("v1-inst", "v1 instance should complete on Runtime A"),
        ("v2-inst", "v2 instance should complete on Runtime B"),
    ] {
        let completed = common::wait_for_history(
            store.clone(),
            instance,
            |hist| {
                hist.iter()
                    .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
            },
            5000,
        )
        .await;
        assert!(completed, "{msg}");
    }
    rt_a.shutdown(None).await;
    rt_b.shutdown(None).await;
}
/// A parent orchestration schedules a child; after completion, the child's own
/// `OrchestrationStarted` event must be pinned at the current build version.
#[tokio::test]
async fn sub_orchestration_gets_own_pinned_version() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "ParentOrch",
            |ctx: duroxide::OrchestrationContext, _input: String| async move {
                // Parent awaits the child's result and embeds it in its output.
                let sub_result = ctx.schedule_sub_orchestration("ChildOrch", "child-input").await?;
                Ok(format!("parent-done:{sub_result}"))
            },
        )
        .register(
            "ChildOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("child-done".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("parent-24", "ParentOrch", "{}")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("parent-24", Duration::from_secs(5))
        .await
        .unwrap();
    match &status {
        duroxide::OrchestrationStatus::Completed { output, .. } => {
            assert!(
                output.contains("child-done"),
                "Parent output should include child result, got: {output}"
            );
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }
    rt.shutdown(None).await;
    // Recover the child's instance id from the parent's history.
    let parent_hist = store.read("parent-24").await.unwrap();
    let has_sub_scheduled = parent_hist
        .iter()
        .any(|e| matches!(&e.kind, EventKind::SubOrchestrationScheduled { .. }));
    assert!(has_sub_scheduled, "Parent should have scheduled a sub-orchestration");
    let child_instance = parent_hist
        .iter()
        .find_map(|e| {
            if let EventKind::SubOrchestrationScheduled { instance, .. } = &e.kind {
                Some(instance.clone())
            } else {
                None
            }
        })
        .expect("Should find SubOrchestrationScheduled event");
    // NOTE(review): `unwrap_or_default()` + the `if let` guard mean that if the
    // child's history is missing or lacks a start event, the version assertion
    // is silently skipped and the test still passes. Consider making this strict.
    let child_hist = store.read(&child_instance).await.unwrap_or_default();
    if let Some(child_started) = child_hist
        .iter()
        .find(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. }))
    {
        let child_pinned =
            semver::Version::parse(&child_started.duroxide_version).expect("Child pinned version should be parseable");
        let current = duroxide::current_build_version();
        assert_eq!(
            child_pinned, current,
            "Sub-orchestration should be pinned at current build version"
        );
    }
}
/// Verifies the runtime emits an INFO-level log declaring its capability /
/// supported version range at startup, via the shared tracing capture helper.
#[tokio::test]
async fn runtime_logs_capability_declaration_at_startup() {
    use tracing::Level;
    // Install the capture before anything logs; `_guard` keeps it active.
    let (captured, _guard) = common::tracing_capture::install_tracing_capture();
    let (store, _td) = common::create_sqlite_store_disk().await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "LogOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("ok".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        activities,
        orchestrations,
        RuntimeOptions {
            dispatcher_min_poll_interval: Duration::from_millis(10),
            ..Default::default()
        },
    )
    .await;
    // Brief window for the startup log line to be emitted.
    tokio::time::sleep(Duration::from_millis(100)).await;
    rt.shutdown(None).await;
    let events = captured.lock().unwrap();
    // Accept the declaration either in the event message or in any recorded field.
    let startup_log = events.iter().find(|e| {
        e.target.contains("duroxide::runtime")
            && e.level == Level::INFO
            && (e.message.contains("capability")
                || e.fields
                    .values()
                    .any(|v: &String| v.contains("capability") || v.contains("supported_range")))
    });
    assert!(
        startup_log.is_some(),
        "Runtime should log capability filter at startup. Captured {} events, targets: {:?}",
        events.len(),
        events.iter().map(|e| &e.target).collect::<Vec<_>>()
    );
}
/// Two runtimes with *overlapping* ranges (1.0–2.99 and 2.0–3.99) share a store;
/// a v2.5.0 instance is eligible for both and must complete on whichever claims it.
#[tokio::test]
async fn overlapping_version_ranges_both_can_process() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "overlap-17", "OverlapOrch", semver::Version::new(2, 5, 0))
        .await;
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "OverlapOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("overlap-ok".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    // Runtime A serves 1.0.0 ..= 2.99.99.
    let options_a = RuntimeOptions {
        max_attempts: 50,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        supported_replay_versions: Some(SemverRange::new(
            semver::Version::new(1, 0, 0),
            semver::Version::new(2, 99, 99),
        )),
        ..Default::default()
    };
    // Runtime B serves 2.0.0 ..= 3.99.99 — the overlap covers v2.5.0.
    let options_b = RuntimeOptions {
        max_attempts: 50,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        supported_replay_versions: Some(SemverRange::new(
            semver::Version::new(2, 0, 0),
            semver::Version::new(3, 99, 99),
        )),
        ..Default::default()
    };
    let rt_a =
        runtime::Runtime::start_with_options(store.clone(), activities.clone(), orchestrations.clone(), options_a)
            .await;
    let rt_b =
        runtime::Runtime::start_with_options(store.clone(), activities.clone(), orchestrations.clone(), options_b)
            .await;
    // The test does not care which runtime wins the claim, only that one does.
    let completed = common::wait_for_history(
        store.clone(),
        "overlap-17",
        |hist| {
            hist.iter()
                .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
        },
        5000,
    )
    .await;
    assert!(
        completed,
        "v2.5.0 instance should complete on either runtime (overlapping ranges)"
    );
    rt_a.shutdown(None).await;
    rt_b.shutdown(None).await;
}
/// A mixed backlog (three 1.x instances, two 2.x instances) against two
/// runtimes with disjoint bands: every instance completes on its matching runtime.
#[tokio::test]
async fn mixed_cluster_compatible_and_incompatible_items() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Seed three instances pinned in the 1.x band and two in the 2.x band.
    for minor in 0..3u64 {
        common::seed_instance_with_pinned_version(
            &*store,
            &format!("v1-mixed-{minor}"),
            "MixedOrch",
            semver::Version::new(1, minor, 0),
        )
        .await;
    }
    for minor in 0..2u64 {
        common::seed_instance_with_pinned_version(
            &*store,
            &format!("v2-mixed-{minor}"),
            "MixedOrch",
            semver::Version::new(2, minor, 0),
        )
        .await;
    }
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "MixedOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("mixed-ok".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    // One options value per major-version band: major.0.0 ..= major.99.99.
    let band = |major: u64| RuntimeOptions {
        max_attempts: 50,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        supported_replay_versions: Some(SemverRange::new(
            semver::Version::new(major, 0, 0),
            semver::Version::new(major, 99, 99),
        )),
        ..Default::default()
    };
    let rt_a =
        runtime::Runtime::start_with_options(store.clone(), activities.clone(), orchestrations.clone(), band(1))
            .await;
    let rt_b =
        runtime::Runtime::start_with_options(store.clone(), activities.clone(), orchestrations.clone(), band(2))
            .await;
    // Every seeded instance must complete on the runtime whose band matches it.
    let expectations = (0..3)
        .map(|i| (format!("v1-mixed-{i}"), "Runtime A"))
        .chain((0..2).map(|i| (format!("v2-mixed-{i}"), "Runtime B")));
    for (inst, rt_name) in expectations {
        let completed = common::wait_for_history(
            store.clone(),
            &inst,
            |hist| {
                hist.iter()
                    .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
            },
            5000,
        )
        .await;
        assert!(completed, "{inst} should complete on {rt_name}");
    }
    rt_a.shutdown(None).await;
    rt_b.shutdown(None).await;
}
/// Seeds an instance pinned at an arbitrary version (3.1.4) with a wide-open
/// replay range; the pinned version must round-trip through history unchanged.
#[tokio::test]
async fn pinned_version_extracted_from_orchestration_started_event() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*store, "pinned-21", "PinnedOrch", semver::Version::new(3, 1, 4)).await;
    let registry = OrchestrationRegistry::builder()
        .register(
            "PinnedOrch",
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("pinned-ok".to_string()) },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        registry,
        RuntimeOptions {
            dispatcher_min_poll_interval: Duration::from_millis(10),
            // Accept everything so the 3.1.4 pin is certainly in range.
            supported_replay_versions: Some(SemverRange::new(
                semver::Version::new(0, 0, 0),
                semver::Version::new(99, 99, 99),
            )),
            ..Default::default()
        },
    )
    .await;
    let completed = common::wait_for_history(
        store.clone(),
        "pinned-21",
        |hist| {
            hist.iter()
                .any(|e| matches!(&e.kind, EventKind::OrchestrationCompleted { .. }))
        },
        5000,
    )
    .await;
    assert!(completed, "Pinned v3.1.4 instance should complete");
    rt.shutdown(None).await;
    // The seeded pinned version must be exactly what the start event records.
    let started = store
        .read("pinned-21")
        .await
        .unwrap()
        .into_iter()
        .find(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. }))
        .expect("Should have OrchestrationStarted event");
    let pinned = semver::Version::parse(&started.duroxide_version).expect("Should parse pinned version");
    assert_eq!(
        pinned,
        semver::Version::new(3, 1, 4),
        "Pinned version should be (3,1,4), got {pinned:?}"
    );
}
/// After `continue_as_new`, a stale `ActivityCompleted` work item addressed to
/// the *previous* execution (INITIAL_EXECUTION_ID) must be discarded, not
/// appended to the new execution's history.
#[tokio::test]
async fn activity_completion_after_continue_as_new_is_discarded() {
    use duroxide::ActivityContext;
    let (store, _td) = common::create_sqlite_store_disk().await;
    // Registered but never scheduled; only its completion work item is forged below.
    let activities = ActivityRegistry::builder()
        .register("SlowActivity", |_ctx: ActivityContext, _input: String| async move {
            Ok("slow-result".to_string())
        })
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "CANOrch",
            |ctx: duroxide::OrchestrationContext, input: String| async move {
                match input.as_str() {
                    // First execution immediately continues-as-new with fresh input.
                    "start" => ctx.continue_as_new("second".to_string()).await,
                    // Second execution parks on an external signal until raised.
                    _ => {
                        let _result = ctx.schedule_wait("done_signal").await;
                        Ok("can-complete".to_string())
                    }
                }
            },
        )
        .build();
    let options = RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    let client = Client::new(store.clone());
    client.start_orchestration("can-25", "CANOrch", "start").await.unwrap();
    // Wait until the *second* execution is parked on the signal subscription.
    let subscribed = common::wait_for_subscription(store.clone(), "can-25", "done_signal", 5000).await;
    assert!(subscribed, "Second execution should subscribe to done_signal");
    // Forge a completion targeting the old (initial) execution with a bogus id.
    store
        .enqueue_for_orchestrator(
            WorkItem::ActivityCompleted {
                instance: "can-25".to_string(),
                execution_id: INITIAL_EXECUTION_ID, id: 999,
                result: "stale-result".to_string(),
            },
            None,
        )
        .await
        .unwrap();
    // Give the dispatcher a moment to (not) apply the stale completion.
    tokio::time::sleep(Duration::from_millis(200)).await;
    let history = store.read("can-25").await.unwrap();
    let has_stale = history.iter().any(|e| match &e.kind {
        EventKind::ActivityCompleted { result, .. } => result == "stale-result",
        _ => false,
    });
    assert!(
        !has_stale,
        "Stale activity completion from old execution should be discarded"
    );
    // Unblock the second execution and confirm a normal completion.
    client.raise_event("can-25", "done_signal", "go").await.unwrap();
    let status = client
        .wait_for_orchestration("can-25", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(
        matches!(status, duroxide::OrchestrationStatus::Completed { .. }),
        "Orchestration should complete normally after CAN, got: {status:?}"
    );
    rt.shutdown(None).await;
}
/// Corrupts an instance's history so replay hits a deserialization error, then
/// checks the poison/drain path terminates it as Failed — even though the
/// configured replay range (0.0.0..=99.99.99) would otherwise accept it.
#[tokio::test]
async fn wide_supported_range_drains_stuck_items_via_deserialization_error() {
    use duroxide::providers::sqlite::SqliteProvider;
    let td = tempfile::tempdir().unwrap();
    let db_path = td.path().join("drain-test.db");
    std::fs::File::create(&db_path).unwrap();
    let db_url = format!("sqlite:{}", db_path.display());
    // Two handles onto the same DB file: `sqlite` (concrete type) is kept for raw
    // SQL and the management capability; `store` (trait object) drives the runtime.
    let sqlite = SqliteProvider::new(&db_url, None).await.unwrap();
    let store: Arc<dyn Provider> = Arc::new(SqliteProvider::new(&db_url, None).await.unwrap());
    common::seed_instance_with_pinned_version(&*store, "drain-29", "DrainOrch", semver::Version::new(99, 0, 0)).await;
    // Overwrite the instance's history rows with an unknown event kind so
    // replay fails on deserialization rather than on the version filter.
    sqlx::query("UPDATE history SET event_data = '{\"kind\":\"UnknownFutureEvent\",\"data\":\"garbage\"}' WHERE instance_id = 'drain-29'")
        .execute(sqlite.get_pool())
        .await
        .expect("Failed to corrupt history");
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "DrainOrch",
            // Must never run: replay fails before the handler executes.
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("should-not-reach".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        max_attempts: 3,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        // Short lock timeout so abandoned attempts recycle quickly.
        orchestrator_lock_timeout: Duration::from_millis(500),
        supported_replay_versions: Some(SemverRange::new(
            semver::Version::new(0, 0, 0),
            semver::Version::new(99, 99, 99),
        )),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    let mgmt = sqlite.as_management_capability().expect("management capability");
    // Poll (up to 10s) until the management API reports the instance as Failed;
    // timeout maps to `false` via unwrap_or.
    let drained = tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            if let Ok(info) = mgmt.get_instance_info("drain-29").await
                && info.status == "Failed"
            {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .unwrap_or(false);
    assert!(
        drained,
        "Drain procedure should terminate the orchestration as Failed via poison path"
    );
    rt.shutdown(None).await;
}
/// Defense-in-depth: a `FilterBypassProvider` wrapper hides the version filter
/// at the provider layer, so the runtime itself must detect the incompatible
/// (v99.0.0) item, abandon it, and emit a WARN log that mentions both
/// "incompatible version" and the instance id.
#[tokio::test]
async fn runtime_logs_warning_on_incompatible_abandon() {
    use tracing::Level;
    // Install the capture before anything logs; `_guard` keeps it active.
    let (captured, _guard) = common::tracing_capture::install_tracing_capture();
    let (inner_store, _td) = common::create_sqlite_store_disk().await;
    common::seed_instance_with_pinned_version(&*inner_store, "warn-30", "WarnOrch", semver::Version::new(99, 0, 0))
        .await;
    // Wrap the real provider so the incompatible item is delivered anyway.
    let bypass_store: Arc<dyn Provider> = Arc::new(common::fault_injection::FilterBypassProvider::new(inner_store));
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "WarnOrch",
            // Must never run: the runtime abandons the item before dispatch.
            |_ctx: duroxide::OrchestrationContext, _input: String| async move { Ok("should-not-reach".to_string()) },
        )
        .build();
    let activities = ActivityRegistry::builder().build();
    let options = RuntimeOptions {
        max_attempts: 5,
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(bypass_store.clone(), activities, orchestrations, options).await;
    // Allow several abandon cycles so the warning is definitely emitted.
    tokio::time::sleep(Duration::from_secs(3)).await;
    rt.shutdown(None).await;
    let events = captured.lock().unwrap();
    // Accept the phrase either in the event message or in any recorded field.
    let warn_log = events.iter().find(|e| {
        e.target.contains("duroxide::runtime")
            && e.level == Level::WARN
            && (e.message.contains("incompatible version")
                || e.fields.values().any(|v| v.contains("incompatible version")))
    });
    assert!(
        warn_log.is_some(),
        "Runtime should log warning when abandoning incompatible item via defense-in-depth. \
        Captured {} events. WARN events: {:?}",
        events.len(),
        events
            .iter()
            .filter(|e| e.level == Level::WARN)
            .map(|e| &e.message)
            .collect::<Vec<_>>()
    );
    // The warning must identify the affected instance, in either message or fields.
    let warn = warn_log.unwrap();
    let all_fields: String = format!("{} {:?}", warn.message, warn.fields);
    assert!(
        all_fields.contains("warn-30") || warn.fields.values().any(|v| v.contains("warn-30")),
        "Warning should reference instance ID 'warn-30', got: {all_fields}"
    );
}