#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
mod common;
use duroxide::providers::Provider;
use duroxide::runtime::{self, OrchestrationStatus, limits, registry::ActivityRegistry};
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry};
use std::sync::Arc;
use std::time::Duration;
/// A status set with `set_custom_status` must be reported on the `Completed`
/// result, together with a version of at least 1.
#[tokio::test]
async fn custom_status_set_visible_on_completion() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // The orchestration schedules nothing, so the activity registry stays empty.
    let activity_registry = ActivityRegistry::builder().build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("SetStatus", |ctx: OrchestrationContext, _input: String| async move {
            ctx.set_custom_status("step-1");
            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-set";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "SetStatus", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            output,
            custom_status: reported,
            custom_status_version: version,
        } => {
            assert_eq!(output, "done");
            assert_eq!(reported, Some(String::from("step-1")));
            assert!(version >= 1, "version should be >= 1");
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// Setting a status and then calling `reset_custom_status` must leave the
/// completed orchestration with no custom status, while the version is still
/// at least 1.
#[tokio::test]
async fn custom_status_reset_clears_to_none() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    let activity_registry = ActivityRegistry::builder().build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ResetStatus", |ctx: OrchestrationContext, _input: String| async move {
            // Set first so that the reset has something to clear.
            ctx.set_custom_status("temporary");
            ctx.reset_custom_status();
            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-reset";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "ResetStatus", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            custom_status_version: version,
            ..
        } => {
            assert_eq!(reported, None, "reset should clear to None");
            assert!(version >= 1);
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// When `set_custom_status` is called several times in a row, the completed
/// orchestration must report the value of the final call.
#[tokio::test]
async fn custom_status_last_write_wins() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    let activity_registry = ActivityRegistry::builder().build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("LastWrite", |ctx: OrchestrationContext, _input: String| async move {
            // Three back-to-back writes; only "third" should be reported.
            ctx.set_custom_status("first");
            ctx.set_custom_status("second");
            ctx.set_custom_status("third");
            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-lww";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "LastWrite", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            ..
        } => {
            assert_eq!(reported, Some(String::from("third")));
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// A status set before awaiting an activity must still be reported once the
/// orchestration completes with that activity's result.
#[tokio::test]
async fn custom_status_persists_across_turns() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // `Echo` returns its input unchanged.
    let activity_registry = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, payload: String| async move { Ok(payload) })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("MultiTurn", |ctx: OrchestrationContext, _input: String| async move {
            ctx.set_custom_status("processing");
            let echoed = ctx.schedule_activity("Echo", "hello").await?;
            Ok(echoed)
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-persist";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "MultiTurn", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            output,
            custom_status: reported,
            ..
        } => {
            assert_eq!(output, "hello");
            assert_eq!(reported, Some(String::from("processing")));
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// A status written after awaiting an activity must replace the one written
/// before it: the completed orchestration reports "step-2".
#[tokio::test]
async fn custom_status_updated_in_later_turn() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // `Echo` returns its input unchanged.
    let activity_registry = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, payload: String| async move { Ok(payload) })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("UpdateStatus", |ctx: OrchestrationContext, _input: String| async move {
            ctx.set_custom_status("step-1");
            ctx.schedule_activity("Echo", "a").await?;
            ctx.set_custom_status("step-2");
            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-update";
    let timeout = Duration::from_secs(5);
    client
        .start_orchestration(instance_id, "UpdateStatus", "")
        .await
        .unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            ..
        } => {
            assert_eq!(reported, Some(String::from("step-2")));
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// An orchestration that never touches the custom status must complete with
/// `custom_status == None` and `custom_status_version == 0`.
#[tokio::test]
async fn custom_status_none_when_not_set() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    let activity_registry = ActivityRegistry::builder().build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("NoStatus", |_ctx: OrchestrationContext, _input: String| async move {
            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-none";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "NoStatus", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            custom_status_version: version,
            ..
        } => {
            assert_eq!(reported, None);
            assert_eq!(version, 0);
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// A status set before the orchestration returns `Err` must be reported on the
/// `Failed` result.
#[tokio::test]
async fn custom_status_visible_on_failure() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    let activity_registry = ActivityRegistry::builder().build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register(
            "FailWithStatus",
            |ctx: OrchestrationContext, _input: String| async move {
                ctx.set_custom_status("about-to-fail");
                Err(String::from("boom"))
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-fail";
    let timeout = Duration::from_secs(5);
    client
        .start_orchestration(instance_id, "FailWithStatus", "")
        .await
        .unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Failed {
            custom_status: reported,
            ..
        } => {
            assert_eq!(reported, Some(String::from("about-to-fail")));
        }
        unexpected => panic!("Expected Failed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// Two `set_custom_status` calls separated by an awaited activity must leave
/// the completed orchestration with a status version of at least 2.
#[tokio::test]
async fn custom_status_version_increments() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // `Echo` returns its input unchanged.
    let activity_registry = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, payload: String| async move { Ok(payload) })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("VersionIncr", |ctx: OrchestrationContext, _input: String| async move {
            ctx.set_custom_status("v1");
            ctx.schedule_activity("Echo", "a").await?;
            ctx.set_custom_status("v2");
            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-ver";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "VersionIncr", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status_version, ..
        } => {
            assert!(
                custom_status_version >= 2,
                "Expected version >= 2, got {custom_status_version}"
            );
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// Setting a custom status one byte larger than
/// `limits::MAX_CUSTOM_STATUS_BYTES` must fail the orchestration with an error
/// whose message mentions the size limit.
#[tokio::test]
async fn custom_status_exceeding_size_limit_fails_orchestration() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );

    let activities = ActivityRegistry::builder().build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "OversizedStatus",
            |ctx: OrchestrationContext, _input: String| async move {
                let oversized = "x".repeat(limits::MAX_CUSTOM_STATUS_BYTES + 1);
                ctx.set_custom_status(oversized);
                Ok("should-not-reach".to_string())
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("cs-oversize", "OversizedStatus", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("cs-oversize", Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Failed { details, .. } => {
            // Check the user-facing message, as the precise-boundary test in this
            // file does, so the assertion does not depend on the Debug layout of
            // the error type.
            let msg = details.display_message();
            assert!(
                msg.contains("Custom status size"),
                "Expected size limit error, got: {msg}"
            );
            assert!(msg.contains("exceeds limit"), "Expected size limit error, got: {msg}");
        }
        other => panic!("Expected Failed due to size limit, got: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// A custom status of exactly `limits::MAX_CUSTOM_STATUS_BYTES` bytes must be
/// accepted: the orchestration completes and the full-length status is
/// reported on the result.
#[tokio::test]
async fn custom_status_at_size_limit_succeeds() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );

    let activities = ActivityRegistry::builder().build();
    let orchestrations = OrchestrationRegistry::builder()
        .register("ExactLimit", |ctx: OrchestrationContext, _input: String| async move {
            let exactly_at_limit = "x".repeat(limits::MAX_CUSTOM_STATUS_BYTES);
            ctx.set_custom_status(exactly_at_limit);
            Ok("done".to_string())
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());

    client.start_orchestration("cs-exact", "ExactLimit", "").await.unwrap();
    let status = client
        .wait_for_orchestration("cs-exact", Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Completed {
            output, custom_status, ..
        } => {
            assert_eq!(output, "done");
            // Completing is not enough: the status must not have been dropped or
            // truncated. Compare lengths so a failure does not print the payload.
            assert_eq!(
                custom_status.map(|s| s.len()),
                Some(limits::MAX_CUSTOM_STATUS_BYTES),
                "status exactly at the limit should be stored in full"
            );
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// The custom status must survive `continue_as_new`.
///
/// The orchestration branches on its numeric input:
/// * `0` sets "step-A" and continues as new with "1";
/// * `1` reads the stored status through the `ReadStatus` activity, expects
///   "step-A@v1", and continues as new with "2";
/// * any other value sets "step-B" and finishes.
///
/// The final result must report "step-B" at version 2.
#[tokio::test]
async fn custom_status_persists_across_continue_as_new() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // The activity queries the provider directly, so it needs its own handle.
    let provider_for_activity = store.clone();
    let activity_registry = ActivityRegistry::builder()
        .register("ReadStatus", move |_ctx: ActivityContext, instance: String| {
            let handle = provider_for_activity.clone();
            async move {
                let snapshot = handle.get_custom_status(&instance, 0).await.unwrap();
                // Render the lookup result as text so the orchestration can compare it.
                let rendered = match snapshot {
                    None => "none".to_string(),
                    Some((None, version)) => format!("null@v{version}"),
                    Some((Some(status), version)) => format!("{status}@v{version}"),
                };
                Ok(rendered)
            }
        })
        .build();

    let orchestration_registry = OrchestrationRegistry::builder()
        .register("CanStatus", |ctx: OrchestrationContext, input: String| async move {
            // Unparsable input is treated as the first execution.
            let step: u32 = input.parse().unwrap_or(0);
            match step {
                0 => {
                    ctx.set_custom_status("step-A");
                    ctx.continue_as_new("1".to_string()).await
                }
                1 => {
                    let observed = ctx
                        .schedule_activity("ReadStatus", "cs-can")
                        .await
                        .expect("ReadStatus activity failed");
                    assert_eq!(observed, "step-A@v1", "step-A should be visible in execution 2");
                    ctx.continue_as_new("2".to_string()).await
                }
                _ => {
                    ctx.set_custom_status("step-B");
                    Ok("done".to_string())
                }
            }
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-can";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "CanStatus", "0").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            output,
            custom_status: reported,
            custom_status_version: version,
        } => {
            assert_eq!(output, "done");
            assert_eq!(reported, Some(String::from("step-B")));
            assert_eq!(version, 2, "two set_custom_status calls total");
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// A status set before `continue_as_new` and reset in the following execution
/// must end up as `None` with version 2.
#[tokio::test]
async fn custom_status_reset_across_continue_as_new() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    let activity_registry = ActivityRegistry::builder().build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("CanReset", |ctx: OrchestrationContext, input: String| async move {
            // Unparsable input is treated as the first execution.
            let step: u32 = input.parse().unwrap_or(0);
            match step {
                0 => {
                    ctx.set_custom_status("foo");
                    ctx.continue_as_new("1".to_string()).await
                }
                _ => {
                    ctx.reset_custom_status();
                    Ok("done".to_string())
                }
            }
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-can-reset";
    let timeout = Duration::from_secs(5);
    client
        .start_orchestration(instance_id, "CanReset", "0")
        .await
        .unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            custom_status_version: version,
            ..
        } => {
            assert_eq!(reported, None, "reset should clear even across CAN");
            assert_eq!(version, 2, "set + reset = version 2");
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// `get_custom_status` must return `None` initially, reflect each
/// `set_custom_status` call immediately, and still return the earlier value
/// after an activity has been awaited. The final result must report "step-2"
/// at version 2.
#[tokio::test]
async fn custom_status_get_reflects_set_across_turns() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // `Echo` returns its input unchanged.
    let activity_registry = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, payload: String| async move { Ok(payload) })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("GetterTest", |ctx: OrchestrationContext, _input: String| async move {
            // Nothing has been set yet.
            assert_eq!(ctx.get_custom_status(), None, "initial should be None");

            ctx.set_custom_status("step-1");
            assert_eq!(
                ctx.get_custom_status(),
                Some(String::from("step-1")),
                "should reflect set immediately"
            );

            // The getter is checked again once the activity has been awaited.
            let _ = ctx.schedule_activity("Echo", "ping").await?;
            assert_eq!(
                ctx.get_custom_status(),
                Some(String::from("step-1")),
                "should survive replay across turns"
            );

            ctx.set_custom_status("step-2");
            assert_eq!(
                ctx.get_custom_status(),
                Some(String::from("step-2")),
                "should reflect second set"
            );

            Ok(String::from("done"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-getter";
    let timeout = Duration::from_secs(5);
    client.start_orchestration(instance_id, "GetterTest", "").await.unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            custom_status_version: version,
            ..
        } => {
            assert_eq!(reported, Some(String::from("step-2")));
            assert_eq!(version, 2);
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// `get_custom_status` must see the value carried forward across
/// `continue_as_new`, including a carried-forward reset.
///
/// The orchestration branches on its numeric input:
/// * `0` sets "from-first" and continues as new with "1";
/// * `1` expects "from-first", resets, and continues as new with "2";
/// * `2` expects `None`, sets "from-third", and continues as new with "3";
/// * any other value expects "from-third" before and after awaiting `Echo`,
///   then finishes.
///
/// The final result must report "from-third" at version 3.
#[tokio::test]
async fn custom_status_get_reflects_carry_forward_after_can() {
    let provider = duroxide::providers::sqlite::SqliteProvider::new_in_memory()
        .await
        .unwrap();
    let store = Arc::new(provider);

    // `Echo` returns its input unchanged.
    let activity_registry = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, payload: String| async move { Ok(payload) })
        .build();
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("CANGetter", |ctx: OrchestrationContext, input: String| async move {
            // Unparsable input is treated as the first iteration.
            let iter: u32 = input.parse().unwrap_or(0);
            match iter {
                0 => {
                    assert_eq!(ctx.get_custom_status(), None, "iter 0: initial should be None");
                    ctx.set_custom_status("from-first");
                    assert_eq!(
                        ctx.get_custom_status(),
                        Some(String::from("from-first")),
                        "iter 0: should reflect set"
                    );
                    ctx.continue_as_new("1").await?;
                    Ok(String::from("unreachable"))
                }
                1 => {
                    assert_eq!(
                        ctx.get_custom_status(),
                        Some(String::from("from-first")),
                        "iter 1: should carry forward from iter 0"
                    );
                    ctx.reset_custom_status();
                    assert_eq!(ctx.get_custom_status(), None, "iter 1: reset should clear");
                    ctx.continue_as_new("2").await?;
                    Ok(String::from("unreachable"))
                }
                2 => {
                    assert_eq!(
                        ctx.get_custom_status(),
                        None,
                        "iter 2: reset in iter 1 should carry forward as None"
                    );
                    ctx.set_custom_status("from-third");
                    ctx.continue_as_new("3").await?;
                    Ok(String::from("unreachable"))
                }
                _ => {
                    assert_eq!(
                        ctx.get_custom_status(),
                        Some(String::from("from-third")),
                        "iter 3: should carry forward from iter 2"
                    );
                    // Check the carried value again once the activity has been awaited.
                    let _ = ctx.schedule_activity("Echo", "ping").await?;
                    assert_eq!(
                        ctx.get_custom_status(),
                        Some(String::from("from-third")),
                        "iter 3: carried value should survive replay"
                    );
                    Ok(String::from("done"))
                }
            }
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = duroxide::Client::new(store.clone());

    let instance_id = "cs-can-getter";
    let timeout = Duration::from_secs(5);
    client
        .start_orchestration(instance_id, "CANGetter", "0")
        .await
        .unwrap();
    let outcome = client.wait_for_orchestration(instance_id, timeout).await.unwrap();

    match outcome {
        OrchestrationStatus::Completed {
            custom_status: reported,
            custom_status_version: version,
            ..
        } => {
            assert_eq!(reported, Some(String::from("from-third")));
            assert_eq!(version, 3);
        }
        unexpected => panic!("Expected Completed, got: {unexpected:?}"),
    }

    rt.shutdown(None).await;
}
/// Exercises the custom-status size limit at its exact boundary: one byte
/// below and exactly at `limits::MAX_CUSTOM_STATUS_BYTES` must complete, one
/// byte above must fail with a size-limit error message.
#[tokio::test]
async fn custom_status_size_limit_precise_boundary() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );

    // None of the orchestrations below schedules an activity, so the registry
    // is left empty.
    let activities = ActivityRegistry::builder().build();
    let orchestrations = OrchestrationRegistry::builder()
        .register("BelowLimit", |ctx: OrchestrationContext, _input: String| async move {
            let below = "x".repeat(limits::MAX_CUSTOM_STATUS_BYTES - 1);
            ctx.set_custom_status(below);
            Ok("ok".to_string())
        })
        .register("AtLimit", |ctx: OrchestrationContext, _input: String| async move {
            let exact = "x".repeat(limits::MAX_CUSTOM_STATUS_BYTES);
            ctx.set_custom_status(exact);
            Ok("ok".to_string())
        })
        .register("AboveLimit", |ctx: OrchestrationContext, _input: String| async move {
            let above = "x".repeat(limits::MAX_CUSTOM_STATUS_BYTES + 1);
            ctx.set_custom_status(above);
            Ok("should-not-reach".to_string())
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    let timeout = Duration::from_secs(5);

    // (instance id, orchestration name, label for the panic message, assertion message)
    let succeeding_cases = [
        ("cs-below", "BelowLimit", "below-limit", "one byte below limit should succeed"),
        ("cs-at", "AtLimit", "at-limit", "exactly at limit should succeed"),
    ];
    for (instance, orchestration, label, expectation) in succeeding_cases {
        client.start_orchestration(instance, orchestration, "").await.unwrap();
        let status = client.wait_for_orchestration(instance, timeout).await.unwrap();
        match status {
            OrchestrationStatus::Completed { output, .. } => {
                assert_eq!(output, "ok", "{expectation}");
            }
            other => panic!("Expected Completed for {label}, got: {other:?}"),
        }
    }

    // One byte over the limit must fail the orchestration.
    client.start_orchestration("cs-above", "AboveLimit", "").await.unwrap();
    let status = client.wait_for_orchestration("cs-above", timeout).await.unwrap();
    match status {
        OrchestrationStatus::Failed { details, .. } => {
            let msg = details.display_message();
            assert!(
                msg.contains("Custom status size") && msg.contains("exceeds limit"),
                "Expected size limit error, got: {msg}"
            );
        }
        other => panic!("Expected Failed for above-limit, got: {other:?}"),
    }

    rt.shutdown(None).await;
}