#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::EventKind;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, Client, Either2, OrchestrationContext, OrchestrationRegistry};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
mod common;
#[tokio::test]
async fn sample_hello_world_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("Hello", |ctx: ActivityContext, input: String| async move {
ctx.trace_info("Hello activity started");
let greeting = format!("Hello, {input}!");
ctx.trace_info(format!("Hello activity completed -> {greeting}"));
Ok(greeting)
})
.build();
let orchestration = |ctx: OrchestrationContext, input: String| async move {
ctx.trace_info("hello_world started");
let res = ctx.schedule_activity("Hello", "Rust").await?;
ctx.trace_info(format!("hello_world result={res} "));
let res1 = ctx.schedule_activity("Hello", input).await?;
ctx.trace_info(format!("hello_world result={res1} "));
Ok(res1)
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("HelloWorld", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sample-hello-1", "HelloWorld", "World")
.await
.unwrap();
match client
.wait_for_orchestration("inst-sample-hello-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "Hello, World!"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_basic_control_flow_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("GetFlag", |_ctx: ActivityContext, _input: String| async move {
Ok("yes".to_string())
})
.register("SayYes", |_ctx: ActivityContext, _in: String| async move {
Ok("picked_yes".to_string())
})
.register("SayNo", |_ctx: ActivityContext, _in: String| async move {
Ok("picked_no".to_string())
})
.build();
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
let flag = ctx.schedule_activity("GetFlag", "").await.unwrap();
ctx.trace_info(format!("control_flow flag decided = {flag}"));
if flag == "yes" {
Ok(ctx.schedule_activity("SayYes", "").await.unwrap())
} else {
Ok(ctx.schedule_activity("SayNo", "").await.unwrap())
}
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ControlFlow", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sample-cflow-1", "ControlFlow", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-sample-cflow-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "picked_yes"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_loop_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("Append", |_ctx: ActivityContext, input: String| async move {
Ok(format!("{input}x"))
})
.build();
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
let mut acc = String::from("start");
for i in 0..3 {
acc = ctx.schedule_activity("Append", acc).await.unwrap();
ctx.trace_info(format!("loop iteration {i} completed acc={acc}"));
}
Ok(acc)
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("LoopOrchestration", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sample-loop-1", "LoopOrchestration", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-sample-loop-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "startxxx"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_error_handling_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("Fragile", |_ctx: ActivityContext, input: String| async move {
if input == "bad" {
Err("boom".to_string())
} else {
Ok("ok".to_string())
}
})
.register("Recover", |_ctx: ActivityContext, _input: String| async move {
Ok("recovered".to_string())
})
.build();
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
match ctx.schedule_activity("Fragile", "bad").await {
Ok(v) => {
ctx.trace_info(format!("fragile succeeded value={v}"));
Ok(v)
}
Err(e) => {
ctx.trace_warn(format!("fragile failed error={e}"));
let rec = ctx.schedule_activity("Recover", "").await.unwrap();
if rec != "recovered" {
ctx.trace_error(format!("unexpected recovery value={rec}"));
}
Ok(rec)
}
}
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ErrorHandling", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sample-err-1", "ErrorHandling", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-sample-err-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "recovered"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_timeout_with_timer_race_fs() {
    // Races a slow activity (500ms) against a durable timer (100ms); the timer
    // wins, so the orchestration is expected to fail with "timeout".
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder()
        .register("LongOp", |ctx: ActivityContext, _input: String| async move {
            ctx.trace_info("LongOp started");
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
            ctx.trace_info("LongOp finished");
            Ok("done".to_string())
        })
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let act = ctx.schedule_activity("LongOp", "");
        // Timer branch maps the elapsed timer to an Err so it can lose/win the
        // race with the same Result type as the activity branch.
        let t = async {
            ctx.schedule_timer(Duration::from_millis(100)).await;
            Err::<String, String>("timeout".into())
        };
        // Both branches already yield Result<String, String>, so the winner's
        // output is returned directly; the old match on the index had two
        // identical arms and was dead code.
        let (_idx, out) = ctx.select2(act, t).await.into_tuple();
        out
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("TimeoutSample", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-timeout-sample", "TimeoutSample", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-timeout-sample", std::time::Duration::from_secs(5))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Failed { details, .. } => assert_eq!(details.display_message(), "timeout"),
        runtime::OrchestrationStatus::Completed { output, .. } => panic!("expected timeout failure, got: {output}"),
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_select2_activity_vs_external_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("Sleep", |ctx: ActivityContext, _input: String| async move {
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
ctx.trace_info("Sleep activity finished");
Ok("slept".to_string())
})
.build();
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
let act = ctx.schedule_activity("Sleep", "");
let evt = async { Ok::<String, String>(ctx.schedule_wait("Go").await) };
let (idx, out) = ctx.select2(act, evt).await.into_tuple();
match idx {
0 => out.map(|s| format!("activity:{s}")),
1 => out.map(|payload| format!("event:{payload}")),
_ => unreachable!(),
}
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("Select2ActVsEvt", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let store_for_wait = store.clone();
tokio::spawn(async move {
let sfw = store_for_wait.clone();
let _ = common::wait_for_subscription(sfw.clone(), "inst-s2-mixed", "Go", 1000).await;
let client = Client::new(sfw);
let _ = client.raise_event("inst-s2-mixed", "Go", "ok").await;
});
let client = Client::new(store.clone());
client
.start_orchestration("inst-s2-mixed", "Select2ActVsEvt", "")
.await
.unwrap();
let s = match client
.wait_for_orchestration("inst-s2-mixed", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => output,
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
};
assert_eq!(s, "event:ok");
rt.shutdown(None).await;
}
#[tokio::test]
async fn dtf_legacy_gabbar_greetings_fs() {
    // Fan-out/fan-in sample: two "Greetings" activities run concurrently,
    // then their (sorted) outputs are joined into one string.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Greetings", |ctx: ActivityContext, input: String| async move {
            ctx.trace_info("Greeting activity started");
            ctx.trace_debug(format!("Original input: {input}"));
            let output = format!("Hello, {input}!");
            ctx.trace_info(format!("Greeting activity completed -> {output}"));
            Ok(output)
        })
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let a = ctx.schedule_activity("Greetings", "Gabbar");
        let b = ctx.schedule_activity("Greetings", "Samba");
        let outs = ctx.join(vec![a, b]).await;
        let mut vals: Vec<String> = outs
            .into_iter()
            .map(|o| match o {
                Ok(s) => s,
                Err(e) => panic!("activity failed: {e}"),
            })
            .collect();
        // Sort so the joined output is deterministic regardless of completion order.
        vals.sort();
        // format! borrows its arguments, so the previous .clone() calls were
        // needless allocations.
        Ok(format!("{}, {}", vals[0], vals[1]))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("Greetings", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-dtf-greetings", "Greetings", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-dtf-greetings", std::time::Duration::from_secs(5))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "Hello, Gabbar!, Hello, Samba!"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_system_activities_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder().build();
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
let now = ctx.utc_now().await?;
let guid = ctx.new_guid().await?;
let now_ms = now
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| e.to_string())?
.as_millis() as u64;
ctx.trace_info(format!("system now={now_ms}ms, guid={guid}"));
Ok(format!("n={now_ms},g={guid}"))
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("SystemActivities", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-system-acts", "SystemActivities", "")
.await
.unwrap();
let out = match client
.wait_for_orchestration("inst-system-acts", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => output,
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
};
assert!(out.contains("n=") && out.contains(",g="));
let parts: Vec<&str> = out.split([',', '=']).collect();
assert!(parts.len() >= 4);
let now_val: u64 = parts[1].parse().unwrap_or(0);
let guid_str = parts[3];
assert!(now_val > 0);
assert_eq!(guid_str.len(), 36);
assert!(guid_str.chars().filter(|c| *c != '-').all(|c| c.is_ascii_hexdigit()));
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_status_polling_fs() {
    use duroxide::OrchestrationStatus;
    // Minimal status sample: a short timer, then completion with "done".
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let acts = ActivityRegistry::builder().build();
    let timed = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_timer(Duration::from_millis(20)).await;
        Ok("done".to_string())
    };
    let orchs = OrchestrationRegistry::builder()
        .register("StatusSample", timed)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-status-sample", "StatusSample", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-status-sample", Duration::from_secs(2))
        .await
        .unwrap();
    match status {
        OrchestrationStatus::Failed { details, .. } => panic!("unexpected failure: {}", details.display_message()),
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "done"),
        _ => unreachable!(),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_sub_orchestration_basic_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("Upper", |ctx: ActivityContext, input: String| async move {
ctx.trace_info("Upper activity converting string");
let result = input.to_uppercase();
ctx.trace_info(format!("Upper activity result -> {result}"));
Ok(result)
})
.build();
let child_upper = |ctx: OrchestrationContext, input: String| async move {
let up = ctx.schedule_activity("Upper", input).await.unwrap();
Ok(up)
};
let parent = |ctx: OrchestrationContext, input: String| async move {
let r = ctx.schedule_sub_orchestration("ChildUpper", input).await.unwrap();
Ok(format!("parent:{r}"))
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ChildUpper", child_upper)
.register("Parent", parent)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sub-basic", "Parent", "hi")
.await
.unwrap();
match client
.wait_for_orchestration("inst-sub-basic", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "parent:HI"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_sub_orchestration_fanout_fs() {
    // Fan-out sample: the parent runs two "ChildSum" sub-orchestrations
    // concurrently and totals their results.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Add", |_ctx: ActivityContext, input: String| async move {
            // Input is "a,b"; missing/unparsable parts default to 0.
            let mut it = input.split(',');
            let a = it.next().unwrap_or("0").parse::<i64>().unwrap_or(0);
            let b = it.next().unwrap_or("0").parse::<i64>().unwrap_or(0);
            Ok((a + b).to_string())
        })
        .build();
    let child_sum = |ctx: OrchestrationContext, input: String| async move {
        let s = ctx.schedule_activity("Add", input).await.unwrap();
        Ok(s)
    };
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let a = ctx.schedule_sub_orchestration("ChildSum", "1,2");
        let b = ctx.schedule_sub_orchestration("ChildSum", "3,4");
        let outs = ctx.join(vec![a, b]).await;
        // Sum directly over the iterator; collecting into a mutable Vec just
        // to drain(..).sum() was a needless allocation and mutation.
        let total: i64 = outs
            .into_iter()
            .map(|o| match o {
                Ok(s) => s.parse::<i64>().unwrap(),
                Err(e) => panic!("child failed: {e}"),
            })
            .sum();
        Ok(format!("total={total}"))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ChildSum", child_sum)
        .register("ParentFan", parent)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sub-fan", "ParentFan", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sub-fan", std::time::Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "total=10"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_sub_orchestration_chained_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("AppendX", |_ctx: ActivityContext, input: String| async move {
Ok(format!("{input}x"))
})
.build();
let leaf = |ctx: OrchestrationContext, input: String| async move {
Ok(ctx.schedule_activity("AppendX", input).await.unwrap())
};
let mid = |ctx: OrchestrationContext, input: String| async move {
let r = ctx.schedule_sub_orchestration("Leaf", input).await.unwrap();
Ok(format!("{r}-mid"))
};
let root = |ctx: OrchestrationContext, input: String| async move {
let r = ctx.schedule_sub_orchestration("Mid", input).await.unwrap();
Ok(format!("root:{r}"))
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("Leaf", leaf)
.register("Mid", mid)
.register("Root", root)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client.start_orchestration("inst-sub-chain", "Root", "a").await.unwrap();
match client
.wait_for_orchestration("inst-sub-chain", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "root:ax-mid"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_detached_orchestration_scheduling_fs() {
    use duroxide::OrchestrationStatus;
    // Detached scheduling: the coordinator fires two "Chained" orchestrations
    // and completes immediately; both detached instances finish on their own.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let acts = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, input: String| async move { Ok(input) })
        .build();
    let chained = |ctx: OrchestrationContext, input: String| async move {
        ctx.schedule_timer(Duration::from_millis(5)).await;
        Ok(ctx.schedule_activity("Echo", input).await.unwrap())
    };
    let coordinator = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_orchestration("Chained", "W1", "A");
        ctx.schedule_orchestration("Chained", "W2", "B");
        Ok("scheduled".to_string())
    };
    let orchs = OrchestrationRegistry::builder()
        .register("Chained", chained)
        .register("Coordinator", coordinator)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("CoordinatorRoot", "Coordinator", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("CoordinatorRoot", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "scheduled"),
        _ => panic!("unexpected orchestration status"),
    }
    // Each detached instance should also complete with its own payload.
    for inst in ["W1", "W2"] {
        match client
            .wait_for_orchestration(inst, Duration::from_secs(5))
            .await
            .unwrap()
        {
            OrchestrationStatus::Completed { output, .. } => {
                assert!(output == "A" || output == "B");
            }
            OrchestrationStatus::Failed { details, .. } => {
                panic!("scheduled orchestration failed: {}", details.display_message())
            }
            _ => unreachable!(),
        }
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_detached_then_activity_fs() {
    use duroxide::OrchestrationStatus;
    // Detach-then-continue: the parent fires a detached child and still runs
    // its own activity; both finish independently.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let acts = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, input: String| async move { Ok(input) })
        .build();
    let child = |ctx: OrchestrationContext, input: String| async move {
        ctx.schedule_timer(Duration::from_millis(5)).await;
        Ok(format!("child-{input}"))
    };
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_orchestration("Child", "detached-child", "payload");
        let result = ctx.schedule_activity("Echo", "hello").await?;
        Ok(result)
    };
    let orchs = OrchestrationRegistry::builder()
        .register("Child", child)
        .register("Parent", parent)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("ParentInstance", "Parent", "")
        .await
        .unwrap();
    let parent_status = client
        .wait_for_orchestration("ParentInstance", Duration::from_secs(5))
        .await
        .unwrap();
    match parent_status {
        OrchestrationStatus::Failed { details, .. } => {
            panic!("parent orchestration failed: {}", details.display_message())
        }
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "hello"),
        _ => panic!("unexpected orchestration status"),
    }
    let child_status = client
        .wait_for_orchestration("detached-child", Duration::from_secs(5))
        .await
        .unwrap();
    match child_status {
        OrchestrationStatus::Failed { details, .. } => {
            panic!("child orchestration failed: {}", details.display_message())
        }
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "child-payload"),
        _ => panic!("unexpected child status"),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_continue_as_new_fs() {
    // ContinueAsNew sample: the orchestration restarts itself with an
    // incremented counter until n reaches 3, producing four executions.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder().build();
    let orch = |ctx: OrchestrationContext, input: String| async move {
        let n: u32 = input.parse().unwrap_or(0);
        // The if/else is an expression; the previous `return ...;` plus a
        // redundant `else` (clippy: needless_return/redundant_else) is gone.
        if n < 3 {
            ctx.trace_info(format!("CAN sample n={n} -> continue"));
            ctx.continue_as_new((n + 1).to_string()).await
        } else {
            Ok(format!("final:{n}"))
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder().register("CanSample", orch).build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sample-can", "CanSample", "0")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sample-can", std::time::Duration::from_secs(5))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "final:3"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    // Each ContinueAsNew starts a fresh execution: 4 in total (1 initial + 3 restarts).
    let mgmt = store.as_management_capability().expect("ProviderAdmin required");
    let execs = mgmt.list_executions("inst-sample-can").await.unwrap_or_default();
    assert_eq!(execs, vec![1, 2, 3, 4]);
    rt.shutdown(None).await;
}
/// Request payload for the typed "Add" activity: the two addends.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct AddReq {
    a: i32,
    b: i32,
}
/// Response payload for the typed "Add" activity: the computed sum.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct AddRes {
    sum: i32,
}
/// Typed external-event payload used by the "Ready" event sample.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Ack {
    ok: bool,
}
#[tokio::test]
async fn sample_typed_activity_and_orchestration_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
Ok(AddRes { sum: req.a + req.b })
})
.build();
let orchestration = |ctx: OrchestrationContext, req: AddReq| async move {
let out: AddRes = ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &req).await?;
Ok(out)
};
let orchestration_registry = OrchestrationRegistry::builder()
.register_typed::<AddReq, AddRes, _, _>("Adder", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration_typed::<AddReq>("inst-typed-add", "Adder", AddReq { a: 2, b: 3 })
.await
.unwrap();
match client
.wait_for_orchestration_typed::<AddRes>("inst-typed-add", std::time::Duration::from_secs(5))
.await
.unwrap()
{
Ok(result) => assert_eq!(result, AddRes { sum: 5 }),
Err(error) => panic!("orchestration failed: {error}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_typed_event_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder().build();
let orch = |ctx: OrchestrationContext, _in: ()| async move {
let ack: Ack = ctx.schedule_wait_typed::<Ack>("Ready").await;
Ok::<_, String>(serde_json::to_string(&ack).unwrap())
};
let orchestration_registry = OrchestrationRegistry::builder()
.register_typed::<(), String, _, _>("WaitAck", orch)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let store_for_wait = store.clone();
tokio::spawn(async move {
let sfw = store_for_wait.clone();
let _ = common::wait_for_subscription(sfw.clone(), "inst-typed-ack", "Ready", 1000).await;
let payload = serde_json::to_string(&Ack { ok: true }).unwrap();
let client = Client::new(sfw);
let _ = client.raise_event("inst-typed-ack", "Ready", payload).await;
});
let client = Client::new(store.clone());
client
.start_orchestration_typed::<()>("inst-typed-ack", "WaitAck", ())
.await
.unwrap();
match client
.wait_for_orchestration_typed::<String>("inst-typed-ack", std::time::Duration::from_secs(5))
.await
.unwrap()
{
Ok(result) => assert_eq!(result, serde_json::to_string(&Ack { ok: true }).unwrap()),
Err(error) => panic!("orchestration failed: {error}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_mixed_string_and_typed_typed_orch_fs() {
    // Mixed API sample: a typed activity and a string activity race inside a
    // typed-registered orchestration; either winner is acceptable.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Upper", |_ctx: ActivityContext, input: String| async move {
            Ok(input.to_uppercase())
        })
        .register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
            Ok(AddRes { sum: req.a + req.b })
        })
        .build();
    let orch = |ctx: OrchestrationContext, req: AddReq| async move {
        let f_typed = async {
            let res: Result<AddRes, String> = ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &req).await;
            res.map(|r| format!("sum={}", r.sum))
        };
        let f_str = async {
            let res: Result<String, String> = ctx.schedule_activity("Upper", "hello").await;
            res.map(|s| format!("up={s}"))
        };
        let (_idx, out) = ctx.select2(f_typed, f_str).await.into_tuple();
        out
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register_typed::<AddReq, String, _, _>("MixedTypedOrch", orch)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    // A single client is enough here; the previous second `Client::new` after
    // start_orchestration was a redundant shadowing construction.
    let client = Client::new(store.clone());
    client
        .start_orchestration_typed::<AddReq>("inst-mixed-typed", "MixedTypedOrch", AddReq { a: 1, b: 2 })
        .await
        .unwrap();
    let s = match client
        .wait_for_orchestration_typed::<String>("inst-mixed-typed", std::time::Duration::from_secs(5))
        .await
        .unwrap()
    {
        Ok(result) => result,
        Err(error) => panic!("orchestration failed: {error}"),
    };
    // Either branch may win the race.
    assert!(s == "sum=3" || s == "up=HELLO");
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_mixed_string_and_typed_string_orch_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("Upper", |_ctx: ActivityContext, input: String| async move {
Ok(input.to_uppercase())
})
.register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
Ok(AddRes { sum: req.a + req.b })
})
.build();
let orch = |ctx: OrchestrationContext, _in: String| async move {
let f_typed = async {
let res: Result<AddRes, String> = ctx
.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 5, b: 7 })
.await;
res.map(|r| format!("sum={}", r.sum))
};
let f_str = async {
let res: Result<String, String> = ctx.schedule_activity("Upper", "race").await;
res.map(|s| format!("up={s}"))
};
let (_idx, out) = ctx.select2(f_typed, f_str).await.into_tuple();
out
};
let orch_reg = OrchestrationRegistry::builder()
.register("MixedStringOrch", orch)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orch_reg).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-mixed-string", "MixedStringOrch", "")
.await
.unwrap();
let s = match client
.wait_for_orchestration("inst-mixed-string", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => output,
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
};
assert!(s == "sum=12" || s == "up=RACE");
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_versioning_start_latest_vs_exact_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let v1 = |_: OrchestrationContext, _in: String| async move { Ok("v1".to_string()) };
let v2 = |_: OrchestrationContext, _in: String| async move { Ok("v2".to_string()) };
let reg = OrchestrationRegistry::builder()
.register("Versioned", v1)
.register_versioned("Versioned", "2.0.0", v2)
.build();
let acts = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg.clone()).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-vers-latest", "Versioned", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-vers-latest", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "v2"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
reg.set_version_policy(
"Versioned",
duroxide::runtime::VersionPolicy::Exact(semver::Version::parse("1.0.0").unwrap()),
);
client
.start_orchestration("inst-vers-exact", "Versioned", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-vers-exact", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "v1"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_versioning_sub_orchestration_explicit_vs_policy_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let child_v1 = |_: OrchestrationContext, _in: String| async move { Ok("c1".to_string()) };
let child_v2 = |_: OrchestrationContext, _in: String| async move { Ok("c2".to_string()) };
let parent = |ctx: OrchestrationContext, _in: String| async move {
let a = ctx
.schedule_sub_orchestration_versioned("Child", Some("1.0.0".to_string()), "exp")
.await
.unwrap();
let b = ctx.schedule_sub_orchestration("Child", "pol").await.unwrap();
Ok(format!("{a}-{b}"))
};
let reg = OrchestrationRegistry::builder()
.register("ParentVers", parent)
.register("Child", child_v1)
.register_versioned("Child", "2.0.0", child_v2)
.build();
let acts = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sub-vers", "ParentVers", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-sub-vers", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "c1-c2"),
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_versioning_continue_as_new_upgrade_fs() {
    use duroxide::OrchestrationStatus;
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    // v1 immediately hands off to a fresh execution via ContinueAsNew; under
    // the default version policy the new execution runs the latest version (v2).
    let v1 = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info("v1: upgrading via ContinueAsNew (default policy)".to_string());
        // Trailing expression instead of a redundant `return ...;`.
        ctx.continue_as_new(format!("v1:{input}")).await
    };
    let v2 = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info(format!("v2: resumed with input={input}"));
        Ok(format!("upgraded:{input}"))
    };
    let reg = OrchestrationRegistry::builder()
        .register("LongRunner", v1)
        .register_versioned("LongRunner", "2.0.0", v2)
        .build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = Client::new(store.clone());
    // Pin the first execution to 1.0.0 so the upgrade path is exercised.
    client
        .start_orchestration_versioned("inst-can-upgrade", "LongRunner", "1.0.0", "state")
        .await
        .unwrap();
    // Poll the status manually — NOTE(review): presumably because ContinueAsNew
    // reports a non-terminal status while the follow-up execution spins up;
    // confirm whether wait_for_orchestration would also work here.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
    loop {
        match client.get_orchestration_status("inst-can-upgrade").await.unwrap() {
            OrchestrationStatus::Completed { output, .. } => {
                assert_eq!(output, "upgraded:v1:state");
                break;
            }
            OrchestrationStatus::Failed { details, .. } => panic!("unexpected failure: {}", details.display_message()),
            _ if std::time::Instant::now() < deadline => tokio::time::sleep(std::time::Duration::from_millis(10)).await,
            _ => panic!("timeout waiting for upgraded completion"),
        }
    }
    // Two executions should exist: the v1 run that continued, then the v2 run.
    let mgmt2 = store.as_management_capability().expect("ProviderAdmin required");
    let execs = mgmt2.list_executions("inst-can-upgrade").await.unwrap_or_default();
    assert_eq!(execs, vec![1, 2]);
    // Execution 1's history ends with the ContinuedAsNew marker...
    let e1 = mgmt2
        .read_history_with_execution_id("inst-can-upgrade", 1)
        .await
        .unwrap_or_default();
    assert!(
        e1.iter()
            .any(|e| matches!(&e.kind, duroxide::EventKind::OrchestrationContinuedAsNew { .. }))
    );
    // ...and execution 2 starts with the input v1 handed over.
    let e2 = mgmt2
        .read_history_with_execution_id("inst-can-upgrade", 2)
        .await
        .unwrap_or_default();
    assert!(
        e2.iter()
            .any(|e| matches!(&e.kind, duroxide::EventKind::OrchestrationStarted { input, .. } if input == "v1:state"))
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_cancellation_parent_cascades_to_children_fs() {
    // Verifies that cancelling a parent orchestration cascades to its running
    // sub-orchestrations: the parent fails with the caller-supplied reason and
    // each child fails with reason "parent canceled".
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    // Child blocks indefinitely on an external "Go" event, guaranteeing it is
    // still in flight when the parent is cancelled.
    let child = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_wait("Go").await;
        Ok("done".to_string())
    };
    // Parent awaits the child, so it too stays pending until cancellation.
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let _ = ctx.schedule_sub_orchestration("ChildSample", "seed").await?;
        Ok::<_, String>("parent_done".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ChildSample", child)
        .register("ParentSample", parent)
        .build();
    let activity_registry = ActivityRegistry::builder().build();
    // Fast polling keeps the test's wall-clock time low.
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sample-cancel", "ParentSample", "")
        .await
        .unwrap();
    // Give the parent a moment to schedule its child before cancelling.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    let _ = client.cancel_instance("inst-sample-cancel", "user_request").await;
    // Parent must end as Failed with an Application/Cancelled error carrying
    // the reason passed to cancel_instance.
    let ok = common::wait_for_history(
        store.clone(),
        "inst-sample-cancel",
        |hist| {
            hist.iter().rev().any(|e| {
                matches!(
                    &e.kind,
                    EventKind::OrchestrationFailed { details, .. } if matches!(
                        details,
                        duroxide::ErrorDetails::Application {
                            kind: duroxide::AppErrorKind::Cancelled { reason },
                            ..
                        } if reason == "user_request"
                    )
                )
            })
        },
        5_000,
    )
    .await;
    assert!(ok, "timeout waiting for parent cancel failure");
    // Child instances are named with the parent's id plus a "::" suffix —
    // discover them via the management capability.
    let mgmt = store.as_management_capability().expect("ProviderAdmin required");
    let children: Vec<String> = mgmt
        .list_instances()
        .await
        .unwrap_or_default()
        .into_iter()
        .filter(|i| i.starts_with("inst-sample-cancel::"))
        .collect();
    assert!(!children.is_empty());
    // Every child must record both the cancel request and the cascaded failure.
    for child in children {
        let ok_child = common::wait_for_history(
            store.clone(),
            &child,
            |hist| {
                hist.iter()
                    .any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }))
                    && hist.iter().any(|e| {
                        matches!(
                            &e.kind,
                            EventKind::OrchestrationFailed { details, .. } if matches!(
                                details,
                                duroxide::ErrorDetails::Application {
                                    kind: duroxide::AppErrorKind::Cancelled { reason },
                                    ..
                                } if reason == "parent canceled"
                            )
                        )
                    })
            },
            5_000,
        )
        .await;
        assert!(ok_child, "timeout waiting for child cancel for {child}");
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_basic_error_handling_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("ValidateInput", |_ctx: ActivityContext, input: String| async move {
if input.is_empty() {
Err("Input cannot be empty".to_string())
} else {
Ok(format!("Valid: {input}"))
}
})
.build();
let orchestration = |ctx: OrchestrationContext, input: String| async move {
ctx.trace_info("Starting validation");
let result = ctx.schedule_activity("ValidateInput", input).await?;
ctx.trace_info(format!("Validation result: {result}"));
Ok(result)
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("BasicErrorHandling", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-basic-error-1", "BasicErrorHandling", "test")
.await
.unwrap();
match client
.wait_for_orchestration("inst-basic-error-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "Valid: test");
}
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
client
.start_orchestration("inst-basic-error-2", "BasicErrorHandling", "")
.await
.unwrap();
match client
.wait_for_orchestration("inst-basic-error-2", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Failed { details, .. } => {
assert!(details.display_message().contains("Input cannot be empty"));
}
runtime::OrchestrationStatus::Completed { output, .. } => panic!("Expected failure but got success: {output}"),
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_nested_function_error_handling_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("ProcessData", |_ctx: ActivityContext, input: String| async move {
if input.contains("error") {
Err("Processing failed".to_string())
} else {
Ok(format!("Processed: {input}"))
}
})
.register("FormatOutput", |_ctx: ActivityContext, input: String| async move {
Ok(format!("Final: {input}"))
})
.build();
async fn process_and_format(ctx: &OrchestrationContext, data: &str) -> Result<String, String> {
ctx.trace_info("Starting processing");
let processed = ctx.schedule_activity("ProcessData", data.to_string()).await?;
ctx.trace_info("Starting formatting");
let formatted = ctx.schedule_activity("FormatOutput", processed).await?;
Ok(formatted)
}
let orchestration = |ctx: OrchestrationContext, input: String| async move {
ctx.trace_info("Starting orchestration");
let result = process_and_format(&ctx, &input).await?;
ctx.trace_info("Orchestration completed");
Ok(result)
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("NestedErrorHandling", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-nested-error-1", "NestedErrorHandling", "test")
.await
.unwrap();
match client
.wait_for_orchestration("inst-nested-error-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "Final: Processed: test");
}
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
client
.start_orchestration("inst-nested-error-2", "NestedErrorHandling", "error")
.await
.unwrap();
match client
.wait_for_orchestration("inst-nested-error-2", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Failed { details, .. } => {
assert!(details.display_message().contains("Processing failed"));
}
runtime::OrchestrationStatus::Completed { output, .. } => panic!("Expected failure but got success: {output}"),
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_error_recovery_fs() {
let (store, _temp_dir) = common::create_sqlite_store_disk().await;
let activity_registry = ActivityRegistry::builder()
.register("ProcessData", |_ctx: ActivityContext, input: String| async move {
if input.contains("error") {
Err("Processing failed".to_string())
} else {
Ok(format!("Processed: {input}"))
}
})
.register("LogError", |_ctx: ActivityContext, error: String| async move {
Ok(format!("Logged: {error}"))
})
.build();
let orchestration = |ctx: OrchestrationContext, input: String| async move {
ctx.trace_info("Starting orchestration");
match ctx.schedule_activity("ProcessData", input.clone()).await {
Ok(result) => {
ctx.trace_info("Processing succeeded");
Ok(result)
}
Err(e) => {
ctx.trace_info("Processing failed, logging error");
let _ = ctx.schedule_activity("LogError", e.clone()).await;
Err(format!("Failed to process '{input}': {e}"))
}
}
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ErrorRecovery", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-recovery-1", "ErrorRecovery", "test")
.await
.unwrap();
match client
.wait_for_orchestration("inst-recovery-1", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "Processed: test");
}
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
_ => panic!("unexpected orchestration status"),
}
client
.start_orchestration("inst-recovery-2", "ErrorRecovery", "error")
.await
.unwrap();
match client
.wait_for_orchestration("inst-recovery-2", std::time::Duration::from_secs(5))
.await
.unwrap()
{
runtime::OrchestrationStatus::Failed { details, .. } => {
let error_msg = details.display_message();
assert!(error_msg.contains("Failed to process 'error'"));
assert!(error_msg.contains("Processing failed"));
}
runtime::OrchestrationStatus::Completed { output, .. } => panic!("Expected failure but got success: {output}"),
_ => panic!("unexpected orchestration status"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_self_pruning_eternal_orchestration() {
    use duroxide::providers::PruneOptions;
    // An "eternal" orchestration that loops via ContinueAsNew and, on each
    // iteration, prunes its own older executions down to the most recent one.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    // Shared counters observed from the test after completion: how many times
    // PruneSelf ran, and how many executions it deleted in total.
    let prune_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
    let executions_pruned = Arc::new(std::sync::atomic::AtomicU64::new(0));
    let prune_count_clone = prune_count.clone();
    let executions_pruned_clone = executions_pruned.clone();
    let activity_registry = ActivityRegistry::builder()
        .register("ProcessBatch", |_ctx: ActivityContext, batch_num: String| async move {
            Ok(format!("Processed batch {batch_num}"))
        })
        // PruneSelf uses the runtime client available on the ActivityContext to
        // prune the *current* instance's history, keeping only the last execution.
        .register("PruneSelf", move |ctx: ActivityContext, _input: String| {
            let prune_count = prune_count_clone.clone();
            let executions_pruned = executions_pruned_clone.clone();
            async move {
                let client = ctx.get_client();
                let instance_id = ctx.instance_id().to_string();
                let result = client
                    .prune_executions(
                        &instance_id,
                        PruneOptions {
                            keep_last: Some(1),
                            ..Default::default()
                        },
                    )
                    .await
                    .map_err(|e| e.to_string())?;
                prune_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                executions_pruned.fetch_add(result.executions_deleted, std::sync::atomic::Ordering::SeqCst);
                Ok(format!("Pruned {} executions", result.executions_deleted))
            }
        })
        .build();
    // Loop state is carried across ContinueAsNew as a JSON blob.
    let orchestration = |ctx: OrchestrationContext, state_str: String| async move {
        #[derive(Serialize, Deserialize)]
        struct State {
            batch_num: u32,
            total_batches: u32,
        }
        // First iteration receives "{}" (unparsable into State), so the
        // fallback seeds batch 0 of 5.
        let state: State = serde_json::from_str(&state_str).unwrap_or(State {
            batch_num: 0,
            total_batches: 5,
        });
        let _result = ctx
            .schedule_activity("ProcessBatch", state.batch_num.to_string())
            .await?;
        // Prune older executions after each batch.
        let _prune_result = ctx.schedule_activity("PruneSelf", "".to_string()).await?;
        if state.batch_num >= state.total_batches - 1 {
            return Ok(format!("Completed {} batches", state.total_batches));
        }
        // Not done yet: hand off to a fresh execution with the next batch number.
        let next_state = State {
            batch_num: state.batch_num + 1,
            total_batches: state.total_batches,
        };
        ctx.continue_as_new(serde_json::to_string(&next_state).unwrap()).await
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("SelfPruningOrch", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-self-prune", "SelfPruningOrch", "{}")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-self-prune", std::time::Duration::from_secs(30))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert!(output.contains("Completed 5 batches"));
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    // PruneSelf ran once per execution (5 total); lower bounds allow for
    // timing of the final iteration's prune.
    let prunes = prune_count.load(std::sync::atomic::Ordering::SeqCst);
    assert!(prunes >= 4, "Should have pruned at least 4 times");
    let pruned = executions_pruned.load(std::sync::atomic::Ordering::SeqCst);
    assert!(pruned >= 3, "Should have pruned at least 3 executions total");
    // Net effect: only the final execution survives.
    let executions = client.list_executions("inst-self-prune").await.unwrap();
    assert_eq!(
        executions.len(),
        1,
        "Only final execution should remain after self-pruning"
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_config_hot_reload_persistent_events_fs() {
    // A long-lived orchestration alternates between "drain pending ConfigUpdate
    // events" phases and work cycles, asserting FIFO delivery of persistent
    // events — including one (v3) raised while a cycle is mid-flight.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder()
        .register("ApplyConfig", |_ctx: ActivityContext, config: String| async move {
            Ok(format!("applied:{config}"))
        })
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let mut log: Vec<String> = Vec::new();
        for cycle in 0..3 {
            // Drain phase: apply queued ConfigUpdate events until a 100ms
            // quiet window (the timer winning select2) ends the phase.
            loop {
                let config_wait = ctx.dequeue_event("ConfigUpdate");
                let drain_timeout = ctx.schedule_timer(Duration::from_millis(100));
                match ctx.select2(config_wait, drain_timeout).await {
                    Either2::First(config_json) => {
                        let result = ctx.schedule_activity("ApplyConfig", &config_json).await?;
                        log.push(result);
                    }
                    Either2::Second(_) => break,
                }
            }
            // Work phase for this cycle.
            ctx.schedule_timer(Duration::from_millis(1000)).await;
            let _ = ctx.schedule_activity("ApplyConfig", format!("cycle_{cycle}")).await?;
            log.push(format!("cycle:{cycle}"));
        }
        // Final drain: pick up any event that arrived mid-flight (e.g. v3).
        loop {
            let config_wait = ctx.dequeue_event("ConfigUpdate");
            let drain_timeout = ctx.schedule_timer(Duration::from_millis(100));
            match ctx.select2(config_wait, drain_timeout).await {
                Either2::First(config_json) => {
                    let result = ctx.schedule_activity("ApplyConfig", &config_json).await?;
                    log.push(result);
                }
                Either2::Second(_) => break,
            }
        }
        Ok(log.join(","))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ConfigHotReload", orchestration)
        .build();
    let options = runtime::RuntimeOptions {
        dispatcher_min_poll_interval: Duration::from_millis(10),
        ..Default::default()
    };
    let rt =
        runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-hot-reload", "ConfigHotReload", "")
        .await
        .unwrap();
    // v1 and v2 are raised up front so the first drain phase applies both.
    client
        .enqueue_event("inst-hot-reload", "ConfigUpdate", "v1")
        .await
        .unwrap();
    client
        .enqueue_event("inst-hot-reload", "ConfigUpdate", "v2")
        .await
        .unwrap();
    // Wait until the orchestration has scheduled cycle 0's work activity —
    // i.e. its first drain phase is over — before raising v3.
    assert!(
        common::wait_for_history(
            store.clone(),
            "inst-hot-reload",
            |hist| {
                hist.iter().any(|e| {
                    matches!(
                        &e.kind,
                        EventKind::ActivityScheduled { input, .. } if input == "cycle_0"
                    )
                })
            },
            15_000,
        )
        .await,
        "Timed out waiting for cycle:0 to progress past its drain phase"
    );
    client
        .enqueue_event("inst-hot-reload", "ConfigUpdate", "v3")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-hot-reload", Duration::from_secs(20))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            // The log is a comma-joined trace; verify FIFO ordering and that v3
            // (raised mid-flight) was applied only after cycle 0 completed.
            let entries: Vec<&str> = output.split(',').collect();
            let pos_v1 = entries
                .iter()
                .position(|e| *e == "applied:v1")
                .unwrap_or_else(|| panic!("v1 missing from output: {output}"));
            let pos_v2 = entries
                .iter()
                .position(|e| *e == "applied:v2")
                .unwrap_or_else(|| panic!("v2 missing from output: {output}"));
            let pos_v3 = entries
                .iter()
                .position(|e| *e == "applied:v3")
                .unwrap_or_else(|| panic!("v3 missing from output: {output}"));
            let pos_cycle0 = entries
                .iter()
                .position(|e| *e == "cycle:0")
                .unwrap_or_else(|| panic!("cycle:0 missing from output: {output}"));
            assert!(pos_v1 < pos_v2, "v1 should come before v2 (FIFO), got: {output}");
            assert!(pos_v2 < pos_v3, "v2 should come before v3 (FIFO), got: {output}");
            assert!(
                pos_v3 > pos_cycle0,
                "v3 arrived mid-flight, should be drained after cycle:0, got: {output}"
            );
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        other => panic!("unexpected status: {:?}", other),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_heterogeneous_workers_with_tags() {
    use duroxide::TagFilter;
    use duroxide::runtime::RuntimeOptions;
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    // Three pipeline stages; Render and Encode carry worker tags, Upload is untagged.
    let activity_registry = ActivityRegistry::builder()
        .register("Render", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("rendered:{input}"))
        })
        .register("Encode", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("encoded:{input}"))
        })
        .register("Upload", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("uploaded:{input}"))
        })
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let rendered = ctx.schedule_activity("Render", "frame42").with_tag("gpu").await?;
        let encoded = ctx.schedule_activity("Encode", rendered).with_tag("cpu").await?;
        let uploaded = ctx.schedule_activity("Upload", encoded).await?;
        Ok(uploaded)
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("VideoPipeline", orchestration)
        .build();
    // Single runtime that accepts untagged work as well as both tags.
    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::default_and(["gpu", "cpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, opts).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("video-1", "VideoPipeline", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("video-1", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "uploaded:encoded:rendered:frame42");
            // Recorded history must keep each activity's tag (or lack of one).
            let history = store.read("video-1").await.unwrap();
            let mut tags: Vec<Option<String>> = Vec::new();
            for event in &history {
                if let EventKind::ActivityScheduled { tag, .. } = &event.kind {
                    tags.push(tag.clone());
                }
            }
            assert_eq!(
                tags,
                vec![Some("gpu".to_string()), Some("cpu".to_string()), None],
                "History should preserve per-activity tags"
            );
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        other => panic!("unexpected status: {:?}", other),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_starvation_safe_tagged_activity() {
    use duroxide::TagFilter;
    use duroxide::runtime::RuntimeOptions;
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activity_registry = ActivityRegistry::builder()
        .register("GpuInference", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("inference:{input}"))
        })
        .register("CpuFallback", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("cpu_fallback:{input}"))
        })
        .build();
    // Race the gpu-tagged activity against a timer; when no worker claims the
    // tagged work, the timer fires first and a CPU fallback runs instead.
    let orchestration = |ctx: OrchestrationContext, input: String| async move {
        let gpu_activity = ctx.schedule_activity("GpuInference", input.clone()).with_tag("gpu");
        let timeout = ctx.schedule_timer(Duration::from_millis(500));
        match ctx.select2(gpu_activity, timeout).await {
            Either2::First(Ok(result)) => Ok(result),
            Either2::First(Err(e)) => Err(e),
            Either2::Second(()) => Ok(ctx.schedule_activity("CpuFallback", input).await?),
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("InferenceWithFallback", orchestration)
        .build();
    // Default tag filter — presumably this runtime does not pick up "gpu"-tagged
    // work, which is what forces the fallback path here (confirm semantics).
    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::default(),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, opts).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("infer-1", "InferenceWithFallback", "model-v3")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("infer-1", Duration::from_secs(10))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "cpu_fallback:model-v3");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        other => panic!("unexpected status: {:?}", other),
    }
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_dual_runtime_tag_cooperation() {
    use duroxide::TagFilter;
    use duroxide::runtime::RuntimeOptions;
    // Two runtimes share one store: rt_a serves untagged work, rt_b serves only
    // "gpu"-tagged activities. A single pipeline completes cooperatively.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    // Both runtimes need identical registries, so build them via factories.
    let make_activities = || {
        ActivityRegistry::builder()
            .register("PreProcess", |_ctx: ActivityContext, input: String| async move {
                Ok(format!("preprocessed:{input}"))
            })
            .register("GpuTrain", |_ctx: ActivityContext, input: String| async move {
                Ok(format!("trained:{input}"))
            })
            .register("SaveModel", |_ctx: ActivityContext, input: String| async move {
                Ok(format!("saved:{input}"))
            })
            .build()
    };
    let make_orchestrations = || {
        OrchestrationRegistry::builder()
            .register("MLPipeline", |ctx: OrchestrationContext, input: String| async move {
                let preprocessed = ctx.schedule_activity("PreProcess", input).await?;
                let model = ctx.schedule_activity("GpuTrain", preprocessed).with_tag("gpu").await?;
                let saved = ctx.schedule_activity("SaveModel", model).await?;
                Ok(saved)
            })
            .build()
    };
    let rt_a = runtime::Runtime::start_with_options(
        store.clone(),
        make_activities(),
        make_orchestrations(),
        RuntimeOptions {
            worker_tag_filter: TagFilter::default(),
            ..Default::default()
        },
    )
    .await;
    // rt_b: orchestration_concurrency of 0 presumably disables orchestration
    // dispatch on this runtime, leaving it a pure "gpu" activity worker —
    // NOTE(review): confirm that 0 means "disabled" rather than "unlimited".
    let rt_b = runtime::Runtime::start_with_options(
        store.clone(),
        make_activities(),
        make_orchestrations(),
        RuntimeOptions {
            orchestration_concurrency: 0,
            worker_tag_filter: TagFilter::tags(["gpu"]),
            ..Default::default()
        },
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("ml-1", "MLPipeline", "dataset-v5")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("ml-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "saved:trained:preprocessed:dataset-v5");
            // History should show only GpuTrain carried the "gpu" tag.
            let history = store.read("ml-1").await.unwrap();
            let scheduled: Vec<(&str, Option<&str>)> = history
                .iter()
                .filter_map(|e| match &e.kind {
                    EventKind::ActivityScheduled { name, tag, .. } => Some((name.as_str(), tag.as_deref())),
                    _ => None,
                })
                .collect();
            assert_eq!(
                scheduled,
                vec![("PreProcess", None), ("GpuTrain", Some("gpu")), ("SaveModel", None),],
                "History should show correct tag routing"
            );
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        other => panic!("unexpected status: {:?}", other),
    }
    rt_b.shutdown(None).await;
    rt_a.shutdown(None).await;
}
#[tokio::test]
async fn sample_kv_request_response() {
    // Request/response over orchestration KV state: a "server" orchestration
    // signals readiness via a `status` key, receives requests as raised events,
    // and publishes each reply under `response:<op_id>`.
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activities = ActivityRegistry::builder()
        .register("ProcessCommand", |_ctx: ActivityContext, input: String| async move {
            // The "work" is simply reversing the command string.
            Ok(input.chars().rev().collect::<String>())
        })
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "RequestServer",
            |ctx: OrchestrationContext, _input: String| async move {
                // Advertise readiness before accepting the first request.
                ctx.set_kv_value("status", "ready");
                // Serve exactly three requests, then shut down.
                for _ in 0..3 {
                    // Each request is a JSON payload with op_id + command.
                    let request_json = ctx.schedule_wait("request").await;
                    let request: serde_json::Value = serde_json::from_str(&request_json).unwrap();
                    let op_id = request["op_id"].as_str().unwrap().to_string();
                    let command = request["command"].as_str().unwrap().to_string();
                    ctx.set_kv_value("status", "processing");
                    // Activity failures are turned into an "error: ..." reply
                    // rather than failing the server.
                    let result = ctx
                        .schedule_activity("ProcessCommand", command)
                        .await
                        .unwrap_or_else(|e| format!("error: {e}"));
                    // Publish the reply keyed by the caller-supplied op id.
                    ctx.set_kv_value(format!("response:{op_id}"), &result);
                    ctx.set_kv_value("status", "ready");
                }
                ctx.set_kv_value("status", "shutdown");
                Ok("served 3 requests".to_string())
            },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, Default::default()).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("req-resp-server", "RequestServer", "")
        .await
        .unwrap();
    // Block until the server publishes its first status value.
    let status = client
        .wait_for_kv_value("req-resp-server", "status", Duration::from_secs(5))
        .await
        .expect("Server never became ready");
    assert_eq!(status, "ready");
    // Issue three requests and verify each reply is the reversed command.
    let requests = vec![("op-1", "hello"), ("op-2", "world"), ("op-3", "rust")];
    for (op_id, command) in &requests {
        let event_data = serde_json::json!({ "op_id": op_id, "command": command }).to_string();
        client
            .raise_event("req-resp-server", "request", &event_data)
            .await
            .unwrap();
        let response_key = format!("response:{op_id}");
        let response = client
            .wait_for_kv_value("req-resp-server", &response_key, Duration::from_secs(5))
            .await
            .unwrap_or_else(|_| panic!("Timed out waiting for response to {op_id}"));
        let expected: String = command.chars().rev().collect();
        assert_eq!(
            response, expected,
            "Response for {op_id} should be the reversed command"
        );
    }
    // After the third request the server exits its loop and completes.
    match client
        .wait_for_orchestration("req-resp-server", Duration::from_secs(5))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "served 3 requests");
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }
    // Terminal KV state persists after completion: shutdown status plus the
    // three reversed responses.
    assert_eq!(
        client.get_kv_value("req-resp-server", "status").await.unwrap(),
        Some("shutdown".to_string())
    );
    assert_eq!(
        client.get_kv_value("req-resp-server", "response:op-1").await.unwrap(),
        Some("olleh".to_string())
    );
    assert_eq!(
        client.get_kv_value("req-resp-server", "response:op-2").await.unwrap(),
        Some("dlrow".to_string())
    );
    assert_eq!(
        client.get_kv_value("req-resp-server", "response:op-3").await.unwrap(),
        Some("tsur".to_string())
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_kv_cross_orchestration_read() {
    // A Consumer orchestration reads another instance's KV state: the Producer
    // publishes `status` and `result` keys, the Consumer polls them via
    // get_kv_value_from_instance until the Producer reports "done".
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let activities = ActivityRegistry::builder()
        .register("ComputeResult", |_ctx: ActivityContext, input: String| async move {
            let n: i64 = input.parse().map_err(|e| format!("parse: {e}"))?;
            Ok((n * n).to_string())
        })
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register("Producer", |ctx: OrchestrationContext, input: String| async move {
            let n: i64 = input.parse().unwrap();
            ctx.set_kv_value("status", "computing");
            let squared = ctx.schedule_activity("ComputeResult", n.to_string()).await?;
            // Publish the result before flipping status, so readers that see
            // "done" can rely on "result" being present.
            ctx.set_kv_value("result", &squared);
            ctx.set_kv_value("status", "done");
            // Stay alive until the test acknowledges, keeping KV observable
            // while the Consumer runs.
            ctx.schedule_wait("ack").await;
            Ok(format!("produced:{squared}"))
        })
        .register(
            "Consumer",
            |ctx: OrchestrationContext, producer_id: String| async move {
                // Poll the producer's status key with a 100ms backoff timer,
                // giving up after 20 attempts (~2s of orchestration time).
                let mut attempts = 0;
                loop {
                    let status = ctx
                        .get_kv_value_from_instance(&producer_id, "status")
                        .await
                        .map_err(|e| format!("read status: {e}"))?;
                    if status.as_deref() == Some("done") {
                        break;
                    }
                    attempts += 1;
                    if attempts > 20 {
                        return Err("producer never finished".to_string());
                    }
                    ctx.schedule_timer(Duration::from_millis(100)).await;
                }
                // Status is "done", so "result" must exist (see Producer ordering).
                let result = ctx
                    .get_kv_value_from_instance(&producer_id, "result")
                    .await
                    .map_err(|e| format!("read result: {e}"))?;
                let result = result.ok_or_else(|| "result key missing".to_string())?;
                Ok(format!("consumed:{result}"))
            },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, Default::default()).await;
    let client = Client::new(store.clone());
    client.start_orchestration("producer-1", "Producer", "7").await.unwrap();
    // Wait until the producer has published 7^2 before starting the consumer.
    let result = client
        .wait_for_kv_value("producer-1", "result", Duration::from_secs(5))
        .await
        .expect("Producer never set result");
    assert_eq!(result, "49");
    client
        .start_orchestration("consumer-1", "Consumer", "producer-1")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("consumer-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "consumed:49");
        }
        other => panic!("Expected consumer Completed, got: {other:?}"),
    }
    // Release the producer from its "ack" wait so it can complete.
    client.raise_event("producer-1", "ack", "").await.unwrap();
    match client
        .wait_for_orchestration("producer-1", Duration::from_secs(5))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "produced:49");
        }
        other => panic!("Expected producer Completed, got: {other:?}"),
    }
    // KV state remains readable after the producer completes.
    assert_eq!(
        client.get_kv_value("producer-1", "result").await.unwrap(),
        Some("49".to_string())
    );
    assert_eq!(
        client.get_kv_value("producer-1", "status").await.unwrap(),
        Some("done".to_string())
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_kv_read_modify_write_counter() {
    // Read-modify-write of an orchestration-scoped KV counter across a loop of
    // activity calls; the orchestration's output is the final counter value.
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder()
        .register("ProcessBatch", |_ctx: ActivityContext, batch: String| async move {
            Ok(format!("processed:{batch}"))
        })
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "BatchProcessor",
            |ctx: OrchestrationContext, _input: String| async move {
                let batches = vec!["alpha", "beta", "gamma"];
                for batch_name in &batches {
                    // Read the current counter; a missing key defaults to "0".
                    // unwrap_or_else avoids allocating the default once the
                    // key exists (clippy::or_fun_call).
                    let processed = ctx
                        .get_kv_value("batches_processed")
                        .unwrap_or_else(|| "0".to_string());
                    let count: u32 = processed.parse().unwrap();
                    let result = ctx.schedule_activity("ProcessBatch", batch_name.to_string()).await?;
                    // Write back the incremented counter and the latest result.
                    ctx.set_kv_value("batches_processed", (count + 1).to_string());
                    ctx.set_kv_value("last_result", &result);
                }
                Ok(ctx
                    .get_kv_value("batches_processed")
                    .unwrap_or_else(|| "0".to_string()))
            },
        )
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("batch-proc", "BatchProcessor", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("batch-proc", Duration::from_secs(10))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "3", "Should have processed 3 batches");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!(
                "Batch processor failed (possible RMW nondeterminism): {}",
                details.display_message()
            );
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }
    // Final KV state: counter at 3 and the last batch's result recorded.
    assert_eq!(
        client.get_kv_value("batch-proc", "batches_processed").await.unwrap(),
        Some("3".to_string()),
    );
    assert_eq!(
        client.get_kv_value("batch-proc", "last_result").await.unwrap(),
        Some("processed:gamma".to_string()),
    );
    rt.shutdown(None).await;
}
#[tokio::test]
async fn sample_orchestration_stats() {
    // Verifies `get_orchestration_stats`: None for unknown instances, and
    // populated history/KV/queue counters after a run completes.
    let provider = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );

    // A single activity that "fetches" data for the given URL.
    let fetch = |_ctx: ActivityContext, url: String| async move { Ok(format!("data from {url}")) };
    let activity_registry = ActivityRegistry::builder().register("FetchData", fetch).build();

    // Orchestration: one fetch, then record two user KV keys.
    let pipeline = |ctx: OrchestrationContext, _: String| async move {
        let result = ctx
            .schedule_activity("FetchData", "https://api.example.com".to_string())
            .await?;
        ctx.set_kv_value("last_fetch", &result);
        ctx.set_kv_value("status", "complete");
        Ok(result)
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("DataPipeline", pipeline)
        .build();

    let handle =
        runtime::Runtime::start_with_store(provider.clone(), activity_registry, orchestration_registry).await;
    let client = Client::new(provider.clone());

    // Unknown instance: stats lookup yields None, not an error.
    assert!(client.get_orchestration_stats("missing").await.unwrap().is_none());

    client
        .start_orchestration("pipeline-1", "DataPipeline", "")
        .await
        .unwrap();
    client
        .wait_for_orchestration("pipeline-1", Duration::from_secs(5))
        .await
        .unwrap();

    let stats = client
        .get_orchestration_stats("pipeline-1")
        .await
        .unwrap()
        .expect("stats should exist after completion");
    // At least start + schedule + complete + finish events were recorded.
    assert!(stats.history_event_count >= 4);
    assert!(stats.history_size_bytes > 0);
    // Exactly the two keys written by the orchestration.
    assert_eq!(stats.kv_user_key_count, 2);
    assert!(stats.kv_total_value_bytes > 0);
    // Nothing left in the work queue once the instance is done.
    assert_eq!(stats.queue_pending_count, 0);

    handle.shutdown(None).await;
}