use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::Once;
use std::time::Duration;
use tracing_subscriber::EnvFilter;
mod common;
/// Guards the one-time installation of the tracing subscriber for this test binary.
static INIT_LOGGING: Once = Once::new();

/// Installs the tracing subscriber used by the tests, at most once per process.
///
/// The filter is taken from the default environment variable when it is set and
/// parses; otherwise it falls back to `info`. Output goes through the test writer.
fn init_test_logging() {
    INIT_LOGGING.call_once(|| {
        // Do not chain `.with_max_level(..)` after `.with_env_filter(..)`: both set the
        // builder's single filter, so the later call would discard the `EnvFilter`
        // (and with it any `RUST_LOG` override). The `info` default below keeps the
        // verbosity that the previous hard-coded INFO cap produced.
        let env_filter =
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
        // `try_init` errors when a global subscriber is already installed; that is
        // harmless for tests, so the result is deliberately ignored.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .with_test_writer()
            .try_init();
    });
}
/// Smallest end-to-end sample: one orchestration calls the `Hello` activity twice in sequence.
///
/// The first call uses the fixed input "Rust"; the second uses the orchestration input, and
/// that second greeting is returned as the orchestration output.
#[tokio::test]
async fn sample_hello_world_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    // Activity side: builds a greeting from its input and traces start/finish.
    let activities = ActivityRegistry::builder()
        .register("Hello", |ctx: ActivityContext, input: String| async move {
            ctx.trace_info("Hello activity started");
            let greeting = format!("Hello, {input}!");
            ctx.trace_info(format!("Hello activity completed -> {greeting}"));
            Ok(greeting)
        })
        .build();

    // Orchestration side: two sequential activity calls; errors propagate via `?`.
    let hello_world = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info("hello_world started");
        let res = ctx.schedule_activity("Hello", "Rust").await?;
        ctx.trace_info(format!("hello_world result={res} "));
        let res1 = ctx.schedule_activity("Hello", input).await?;
        ctx.trace_info(format!("hello_world result={res1} "));
        Ok(res1)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("HelloWorld", hello_world)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());

    client
        .start_orchestration("inst-sample-hello-1", "HelloWorld", "World")
        .await
        .unwrap();
    let outcome = client
        .wait_for_orchestration("inst-sample-hello-1", Duration::from_secs(60))
        .await
        .unwrap();
    match outcome {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Hello, World!")
        }
        _ => panic!("unexpected orchestration status"),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Branching sample: the orchestration chooses its second activity from the first one's result.
///
/// `GetFlag` always returns "yes", so the orchestration is expected to schedule `SayYes`
/// and complete with "picked_yes".
#[tokio::test]
async fn sample_basic_control_flow_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register(
            "GetFlag",
            |_ctx: ActivityContext, _input: String| async move { Ok("yes".to_string()) },
        )
        .register("SayYes", |_ctx: ActivityContext, _in: String| async move {
            Ok("picked_yes".to_string())
        })
        .register("SayNo", |_ctx: ActivityContext, _in: String| async move {
            Ok("picked_no".to_string())
        })
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        // Propagate activity errors with `?` rather than panicking inside the
        // orchestration, so a failure reaches the `Failed` arm below with its message.
        let flag = ctx.schedule_activity("GetFlag", "").await?;
        ctx.trace_info(format!("control_flow flag decided = {flag}"));
        let picked = if flag == "yes" {
            ctx.schedule_activity("SayYes", "").await?
        } else {
            ctx.schedule_activity("SayNo", "").await?
        };
        Ok(picked)
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ControlFlow", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sample-cflow-1", "ControlFlow", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sample-cflow-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "picked_yes"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Loop sample: feeds an accumulator through the `Append` activity three times.
///
/// Each call appends one "x", so starting from "start" the expected output is "startxxx".
#[tokio::test]
async fn sample_loop_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register(
            "Append",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("{input}x")) },
        )
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let mut acc = String::from("start");
        for i in 0..3 {
            // `?` fails the orchestration with the activity's error instead of
            // panicking inside orchestration code.
            acc = ctx.schedule_activity("Append", acc).await?;
            ctx.trace_info(format!("loop iteration {i} completed acc={acc}"));
        }
        Ok(acc)
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("LoopOrchestration", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sample-loop-1", "LoopOrchestration", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sample-loop-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "startxxx"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Error-handling sample: a failing activity is caught and a recovery activity runs instead.
///
/// `Fragile` returns `Err("boom")` for the input "bad"; the orchestration matches on that
/// error, schedules `Recover`, and is expected to complete with "recovered".
#[tokio::test]
async fn sample_error_handling_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register(
            "Fragile",
            |_ctx: ActivityContext, input: String| async move {
                if input == "bad" {
                    Err("boom".to_string())
                } else {
                    Ok("ok".to_string())
                }
            },
        )
        .register(
            "Recover",
            |_ctx: ActivityContext, _input: String| async move { Ok("recovered".to_string()) },
        )
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        match ctx.schedule_activity("Fragile", "bad").await {
            Ok(v) => {
                ctx.trace_info(format!("fragile succeeded value={v}"));
                Ok(v)
            }
            Err(e) => {
                ctx.trace_warn(format!("fragile failed error={e}"));
                // If recovery itself fails, surface that as the orchestration's error
                // rather than panicking inside orchestration code.
                let rec = ctx.schedule_activity("Recover", "").await?;
                if rec != "recovered" {
                    ctx.trace_error(format!("unexpected recovery value={rec}"));
                }
                Ok(rec)
            }
        }
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ErrorHandling", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sample-err-1", "ErrorHandling", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sample-err-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "recovered"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Timeout sample: races a slow activity against a shorter timer using `select2`.
///
/// `LongOp` sleeps for 500 ms while the timer is scheduled for 100 ms; the test expects the
/// orchestration to end in `Failed` with the message "timeout".
#[tokio::test]
async fn sample_timeout_with_timer_race_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register("LongOp", |ctx: ActivityContext, _input: String| async move {
            ctx.trace_info("LongOp started");
            tokio::time::sleep(Duration::from_millis(500)).await;
            ctx.trace_info("LongOp finished");
            Ok("done".to_string())
        })
        .build();

    let timeout_sample = |ctx: OrchestrationContext, _input: String| async move {
        let long_op = ctx.schedule_activity("LongOp", "");
        let deadline = ctx.schedule_timer(Duration::from_millis(100));
        // First = the activity finished, Second = the timer fired.
        match ctx.select2(long_op, deadline).await {
            duroxide::Either2::First(Ok(s)) => Ok(s),
            duroxide::Either2::First(Err(e)) => Err(e),
            duroxide::Either2::Second(()) => Err("timeout".to_string()),
        }
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("TimeoutSample", timeout_sample)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-timeout-sample", "TimeoutSample", "")
        .await
        .unwrap();

    let outcome = client
        .wait_for_orchestration("inst-timeout-sample", Duration::from_secs(10))
        .await
        .unwrap();
    match outcome {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            panic!("expected timeout failure, got: {output}")
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            assert_eq!(details.display_message(), "timeout")
        }
        _ => panic!("unexpected orchestration status"),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// `select2` sample: races a 300 ms `Sleep` activity against the external event "Go".
///
/// A background task waits via `common::wait_for_subscription` and then raises "Go" with
/// payload "ok"; the test expects the event to win, i.e. the output "event:ok".
/// NOTE(review): the meaning/units of the `1000` passed to `wait_for_subscription` are not
/// visible here — confirm against the helper.
#[tokio::test]
async fn sample_select2_activity_vs_external_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register("Sleep", |ctx: ActivityContext, _input: String| async move {
            tokio::time::sleep(Duration::from_millis(300)).await;
            ctx.trace_info("Sleep activity finished");
            Ok("slept".to_string())
        })
        .build();

    let racer = |ctx: OrchestrationContext, _input: String| async move {
        let sleeping = ctx.schedule_activity("Sleep", "");
        let go_event = ctx.schedule_wait("Go");
        match ctx.select2(sleeping, go_event).await {
            duroxide::Either2::Second(payload) => Ok(format!("event:{payload}")),
            duroxide::Either2::First(Ok(s)) => Ok(format!("activity:{s}")),
            duroxide::Either2::First(Err(e)) => Err(e),
        }
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("Select2ActVsEvt", racer)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;

    // Background raiser: best-effort, so both results are intentionally ignored.
    let raiser_store = store.clone();
    tokio::spawn(async move {
        let _ =
            common::wait_for_subscription(raiser_store.clone(), "inst-s2-mixed", "Go", 1000).await;
        let raiser = Client::new(raiser_store);
        let _ = raiser.raise_event("inst-s2-mixed", "Go", "ok").await;
    });

    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-s2-mixed", "Select2ActVsEvt", "")
        .await
        .unwrap();
    let outcome = client
        .wait_for_orchestration("inst-s2-mixed", Duration::from_secs(10))
        .await
        .unwrap();
    let s = match outcome {
        runtime::OrchestrationStatus::Completed { output, .. } => output,
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    };
    assert_eq!(s, "event:ok");

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Fan-out/fan-in sample: two `Greetings` activities are scheduled together and joined.
///
/// The two greetings are sorted before being combined, so the expected output does not
/// depend on the order in which `join` yields them.
#[tokio::test]
async fn dtf_legacy_gabbar_greetings_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register(
            "Greetings",
            |ctx: ActivityContext, input: String| async move {
                ctx.trace_info("Greeting activity started");
                ctx.trace_debug(format!("Original input: {input}"));
                let output = format!("Hello, {input}!");
                ctx.trace_info(format!("Greeting activity completed -> {output}"));
                Ok(output)
            },
        )
        .build();
    let orchestration = |ctx: OrchestrationContext, _input: String| async move {
        let a = ctx.schedule_activity("Greetings", "Gabbar");
        let b = ctx.schedule_activity("Greetings", "Samba");
        let outs = ctx.join(vec![a, b]).await;
        // Stop at the first failed activity and return its error from the orchestration,
        // instead of panicking inside orchestration code.
        let mut vals = outs.into_iter().collect::<Result<Vec<String>, String>>()?;
        vals.sort();
        Ok(vals.join(", "))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("Greetings", orchestration)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-dtf-greetings", "Greetings", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-dtf-greetings", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Hello, Gabbar!, Hello, Samba!")
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// System-call sample: the orchestration asks the context for the current time and a GUID.
///
/// The output has the form `n=<millis>,g=<guid>`; the test checks that the millisecond value
/// is positive and that the GUID is 36 characters of hex digits and dashes.
#[tokio::test]
async fn sample_system_activities_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    // No user activities are needed for this sample.
    let activities = ActivityRegistry::builder().build();
    let system_activities = |ctx: OrchestrationContext, _input: String| async move {
        let now = ctx.utc_now().await?;
        let since_epoch = now.duration_since(std::time::UNIX_EPOCH).unwrap();
        let now_ms = since_epoch.as_millis() as u64;
        let guid = ctx.new_guid().await?;
        ctx.trace_info(format!("system now={now_ms}, guid={guid}"));
        Ok(format!("n={now_ms},g={guid}"))
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("SystemActivities", system_activities)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-system-acts", "SystemActivities", "")
        .await
        .unwrap();
    let outcome = client
        .wait_for_orchestration("inst-system-acts", Duration::from_secs(30))
        .await
        .unwrap();
    let out = match outcome {
        runtime::OrchestrationStatus::Completed { output, .. } => output,
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    };

    assert!(out.contains("n=") && out.contains(",g="));
    // Splitting on ',' and '=' yields ["n", <millis>, "g", <guid>].
    let fields: Vec<&str> = out.split(|c: char| c == ',' || c == '=').collect();
    assert!(fields.len() >= 4);
    let (millis_field, guid_field) = (fields[1], fields[3]);
    let millis: u64 = millis_field.parse().unwrap_or(0);
    assert!(millis > 0);
    assert_eq!(guid_field.len(), 36);
    assert!(guid_field
        .chars()
        .all(|c| c == '-' || c.is_ascii_hexdigit()));

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Status sample: starts an orchestration that waits on a 20 ms timer and returns "done",
/// then waits (up to 4 s) for its terminal status through the client.
#[tokio::test]
async fn sample_status_polling_fs() {
    use duroxide::OrchestrationStatus;

    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder().build();
    let status_sample = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_timer(Duration::from_millis(20)).await;
        Ok("done".to_string())
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("StatusSample", status_sample)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-status-sample", "StatusSample", "")
        .await
        .unwrap();

    let outcome = client
        .wait_for_orchestration("inst-status-sample", Duration::from_secs(4))
        .await
        .unwrap();
    match outcome {
        OrchestrationStatus::Failed { details, .. } => {
            panic!("unexpected failure: {}", details.display_message())
        }
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "done"),
        _ => unreachable!(),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Sub-orchestration sample: `Parent` awaits the child orchestration `ChildUpper`.
///
/// The child upper-cases its input through the `Upper` activity; the parent prefixes the
/// child's result with "parent:", so input "hi" is expected to yield "parent:HI".
#[tokio::test]
async fn sample_sub_orchestration_basic_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Upper", |ctx: ActivityContext, input: String| async move {
            ctx.trace_info("Upper activity converting string");
            let result = input.to_uppercase();
            ctx.trace_info(format!("Upper activity result -> {result}"));
            Ok(result)
        })
        .build();
    // Both orchestrations propagate failures with `?` so an error shows up as a
    // `Failed` status (with its message) rather than a panic in orchestration code.
    let child_upper = |ctx: OrchestrationContext, input: String| async move {
        let up = ctx.schedule_activity("Upper", input).await?;
        Ok(up)
    };
    let parent = |ctx: OrchestrationContext, input: String| async move {
        let r = ctx.schedule_sub_orchestration("ChildUpper", input).await?;
        Ok(format!("parent:{r}"))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ChildUpper", child_upper)
        .register("Parent", parent)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sub-basic", "Parent", "hi")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sub-basic", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "parent:HI"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Sub-orchestration fan-out sample: the parent starts two `ChildSum` children and joins them.
///
/// Each child adds a comma-separated pair through the `Add` activity ("1,2" and "3,4");
/// the parent sums the children's results and is expected to return "total=10".
#[tokio::test]
async fn sample_sub_orchestration_fanout_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Add", |_ctx: ActivityContext, input: String| async move {
            // Missing or unparsable operands are treated as 0.
            let mut it = input.split(',');
            let a = it.next().unwrap_or("0").parse::<i64>().unwrap_or(0);
            let b = it.next().unwrap_or("0").parse::<i64>().unwrap_or(0);
            Ok((a + b).to_string())
        })
        .build();
    let child_sum = |ctx: OrchestrationContext, input: String| async move {
        let s = ctx.schedule_activity("Add", input).await?;
        Ok(s)
    };
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        let a = ctx.schedule_sub_orchestration("ChildSum", "1,2");
        let b = ctx.schedule_sub_orchestration("ChildSum", "3,4");
        let outs = ctx.join(vec![a, b]).await;
        // Sum directly while propagating child failures and malformed results as
        // orchestration errors instead of panicking.
        let mut total: i64 = 0;
        for out in outs {
            let value = out?;
            total += value
                .parse::<i64>()
                .map_err(|e| format!("child returned a non-integer sum {value:?}: {e}"))?;
        }
        Ok(format!("total={total}"))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("ChildSum", child_sum)
        .register("ParentFan", parent)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sub-fan", "ParentFan", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sub-fan", Duration::from_secs(20))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "total=10"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Chained sub-orchestration sample: `Root` -> `Mid` -> `Leaf`, three levels deep.
///
/// `Leaf` appends "x" through the `AppendX` activity, `Mid` appends "-mid", and `Root`
/// prefixes "root:", so input "a" is expected to yield "root:ax-mid".
#[tokio::test]
async fn sample_sub_orchestration_chained_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register(
            "AppendX",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("{input}x")) },
        )
        .build();
    // Every level uses `?` so a failure anywhere in the chain travels up as an error
    // rather than panicking inside orchestration code.
    let leaf = |ctx: OrchestrationContext, input: String| async move {
        let appended = ctx.schedule_activity("AppendX", input).await?;
        Ok(appended)
    };
    let mid = |ctx: OrchestrationContext, input: String| async move {
        let r = ctx.schedule_sub_orchestration("Leaf", input).await?;
        Ok(format!("{r}-mid"))
    };
    let root = |ctx: OrchestrationContext, input: String| async move {
        let r = ctx.schedule_sub_orchestration("Mid", input).await?;
        Ok(format!("root:{r}"))
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("Leaf", leaf)
        .register("Mid", mid)
        .register("Root", root)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sub-chain", "Root", "a")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sub-chain", Duration::from_secs(20))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "root:ax-mid"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Detached-scheduling sample: a coordinator fires off two `Chained` orchestrations and
/// returns immediately without awaiting them.
///
/// The coordinator is expected to complete with "scheduled". The detached instances are
/// then awaited by the ids the test passes to `schedule_orchestration` ("W1", "W2"), and
/// each must echo the input it was scheduled with ("A" and "B" respectively).
#[tokio::test]
async fn sample_detached_orchestration_scheduling_fs() {
    init_test_logging();
    use duroxide::OrchestrationStatus;
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, input: String| async move {
            Ok(input)
        })
        .build();
    let chained = |ctx: OrchestrationContext, input: String| async move {
        ctx.schedule_timer(Duration::from_millis(5)).await;
        // Propagate an activity failure instead of panicking in orchestration code.
        let echoed = ctx.schedule_activity("Echo", input).await?;
        Ok(echoed)
    };
    let coordinator = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_orchestration("Chained", "W1", "A");
        ctx.schedule_orchestration("Chained", "W2", "B");
        Ok("scheduled".to_string())
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register("Chained", chained)
        .register("Coordinator", coordinator)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("CoordinatorRoot", "Coordinator", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("CoordinatorRoot", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "scheduled"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    // Check each detached instance against the exact input it was scheduled with; a
    // looser "A or B" check would not notice the two instances swapping payloads.
    for (inst, expected) in [("W1", "A"), ("W2", "B")] {
        match client
            .wait_for_orchestration(inst, Duration::from_secs(10))
            .await
            .unwrap()
        {
            OrchestrationStatus::Completed { output, .. } => {
                assert_eq!(output, expected, "unexpected output for instance {inst}");
            }
            OrchestrationStatus::Failed { details, .. } => {
                panic!(
                    "scheduled orchestration failed: {}",
                    details.display_message()
                )
            }
            _ => panic!("unexpected status for scheduled orchestration {inst}"),
        }
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Detached-then-activity sample: the parent schedules a detached `Child` orchestration and
/// then awaits a regular `Echo` activity.
///
/// The parent is expected to complete with "hello"; the detached instance
/// ("detached-child") is awaited separately and expected to return "child-payload".
#[tokio::test]
async fn sample_detached_then_activity_fs() {
    use duroxide::OrchestrationStatus;

    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register("Echo", |_ctx: ActivityContext, input: String| async move {
            Ok(input)
        })
        .build();

    let child = |ctx: OrchestrationContext, input: String| async move {
        ctx.schedule_timer(Duration::from_millis(5)).await;
        Ok(format!("child-{input}"))
    };
    let parent = |ctx: OrchestrationContext, _input: String| async move {
        // Fire-and-forget: the parent does not await the child.
        ctx.schedule_orchestration("Child", "detached-child", "payload");
        let result = ctx.schedule_activity("Echo", "hello").await?;
        Ok(result)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("Child", child)
        .register("Parent", parent)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("ParentInstance", "Parent", "")
        .await
        .unwrap();

    let parent_outcome = client
        .wait_for_orchestration("ParentInstance", Duration::from_secs(30))
        .await
        .unwrap();
    match parent_outcome {
        OrchestrationStatus::Failed { details, .. } => {
            panic!("parent orchestration failed: {}", details.display_message())
        }
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "hello"),
        _ => panic!("unexpected orchestration status"),
    }

    let child_outcome = client
        .wait_for_orchestration("detached-child", Duration::from_secs(30))
        .await
        .unwrap();
    match child_outcome {
        OrchestrationStatus::Failed { details, .. } => {
            panic!("child orchestration failed: {}", details.display_message())
        }
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "child-payload"),
        _ => panic!("unexpected child status"),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Continue-as-new sample: the orchestration restarts itself with an incremented counter
/// until the counter reaches 3, then returns "final:3".
///
/// After completion the test asks the store's management capability for the instance's
/// executions and expects `[1, 2, 3, 4]` (inputs "0" through "3").
#[tokio::test]
async fn sample_continue_as_new_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder().build();
    let can_sample = |ctx: OrchestrationContext, input: String| async move {
        // Unparsable input counts as 0.
        let n: u32 = input.parse().unwrap_or(0);
        if n >= 3 {
            return Ok(format!("final:{n}"));
        }
        ctx.trace_info(format!("CAN sample n={n} -> continue"));
        ctx.continue_as_new((n + 1).to_string()).await
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("CanSample", can_sample)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sample-can", "CanSample", "0")
        .await
        .unwrap();

    let outcome = client
        .wait_for_orchestration("inst-sample-can", Duration::from_secs(10))
        .await
        .unwrap();
    match outcome {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "final:3"),
        _ => panic!("unexpected orchestration status"),
    }

    // One execution per run of the orchestration function.
    let management = store
        .as_management_capability()
        .expect("Management capability should be available");
    let executions = management
        .list_executions("inst-sample-can")
        .await
        .expect("list_executions should succeed");
    assert_eq!(executions, vec![1, 2, 3, 4]);

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Request payload for the typed `Add` activity and the typed orchestration samples.
///
/// Serialized with serde, so the field names are part of the payload format.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct AddReq {
    /// First operand.
    a: i32,
    /// Second operand.
    b: i32,
}
/// Response payload produced by the typed `Add` activity.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct AddRes {
    /// `a + b` of the corresponding `AddReq`.
    sum: i32,
}
/// Payload of the "Ready" external event used by the typed-event sample.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Ack {
    /// Flag carried by the event; the sample raises it with `true`.
    ok: bool,
}
/// Typed sample: both the `Add` activity and the `Adder` orchestration take an `AddReq`
/// and return an `AddRes`, using the typed registration and client APIs end to end.
///
/// Input `{ a: 2, b: 3 }` is expected to produce `AddRes { sum: 5 }`.
#[tokio::test]
async fn sample_typed_activity_and_orchestration_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
            Ok(AddRes { sum: req.a + req.b })
        })
        .build();

    let adder = |ctx: OrchestrationContext, req: AddReq| async move {
        let out: AddRes = ctx
            .schedule_activity_typed::<AddReq, AddRes>("Add", &req)
            .await?;
        Ok(out)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register_typed::<AddReq, AddRes, _, _>("Adder", adder)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration_typed::<AddReq>("inst-typed-add", "Adder", AddReq { a: 2, b: 3 })
        .await
        .unwrap();

    // Outer `unwrap` covers the wait itself; the inner `Result` is the orchestration outcome.
    let outcome = client
        .wait_for_orchestration_typed::<AddRes>("inst-typed-add", Duration::from_secs(10))
        .await
        .unwrap();
    match outcome {
        Err(error) => panic!("orchestration failed: {error}"),
        Ok(result) => assert_eq!(result, AddRes { sum: 5 }),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Typed-event sample: the orchestration waits for a "Ready" event decoded as `Ack` and
/// returns that value re-serialized as JSON.
///
/// A background task waits via `common::wait_for_subscription` and then raises the event
/// with a JSON-encoded `Ack { ok: true }`; the output is expected to equal that same JSON.
#[tokio::test]
async fn sample_typed_event_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder().build();
    let wait_ack = |ctx: OrchestrationContext, _in: ()| async move {
        let ack: Ack = ctx.schedule_wait_typed::<Ack>("Ready").await;
        let encoded = serde_json::to_string(&ack).unwrap();
        Ok::<_, String>(encoded)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register_typed::<(), String, _, _>("WaitAck", wait_ack)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;

    // Background raiser: best-effort, so both results are intentionally ignored.
    let raiser_store = store.clone();
    tokio::spawn(async move {
        let _ = common::wait_for_subscription(raiser_store.clone(), "inst-typed-ack", "Ready", 1000)
            .await;
        let payload = serde_json::to_string(&Ack { ok: true }).unwrap();
        let raiser = Client::new(raiser_store);
        let _ = raiser.raise_event("inst-typed-ack", "Ready", payload).await;
    });

    let client = Client::new(store.clone());
    client
        .start_orchestration_typed::<()>("inst-typed-ack", "WaitAck", ())
        .await
        .unwrap();

    let outcome = client
        .wait_for_orchestration_typed::<String>("inst-typed-ack", Duration::from_secs(10))
        .await
        .unwrap();
    match outcome {
        Err(error) => panic!("orchestration failed: {error}"),
        Ok(result) => assert_eq!(result, serde_json::to_string(&Ack { ok: true }).unwrap()),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Mixed sample (typed orchestration): races a typed `Add` activity against a string
/// `Upper` activity with `select2`, inside an orchestration registered with typed I/O.
///
/// Either activity may win, so the test accepts "sum=3" or "up=HELLO".
#[tokio::test]
async fn sample_mixed_string_and_typed_typed_orch_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activity_registry = ActivityRegistry::builder()
        .register("Upper", |_ctx: ActivityContext, input: String| async move {
            Ok(input.to_uppercase())
        })
        .register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
            Ok(AddRes { sum: req.a + req.b })
        })
        .build();
    let orch = |ctx: OrchestrationContext, req: AddReq| async move {
        let f_typed = ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &req);
        let f_str = ctx.schedule_activity("Upper", "hello");
        let s = match ctx.select2(f_typed, f_str).await {
            duroxide::Either2::First(Ok(add_res)) => format!("sum={}", add_res.sum),
            duroxide::Either2::Second(Ok(upper_res)) => format!("up={upper_res}"),
            duroxide::Either2::First(Err(e)) | duroxide::Either2::Second(Err(e)) => return Err(e),
        };
        Ok::<_, String>(s)
    };
    let orchestration_registry = OrchestrationRegistry::builder()
        .register_typed::<AddReq, String, _, _>("MixedTypedOrch", orch)
        .build();
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        activity_registry,
        orchestration_registry,
    )
    .await;
    // One client serves both the start and the wait; a second one is not needed.
    let client = Client::new(store.clone());
    client
        .start_orchestration_typed::<AddReq>(
            "inst-mixed-typed",
            "MixedTypedOrch",
            AddReq { a: 1, b: 2 },
        )
        .await
        .unwrap();
    let s = match client
        .wait_for_orchestration_typed::<String>("inst-mixed-typed", Duration::from_secs(10))
        .await
        .unwrap()
    {
        Ok(result) => result,
        Err(error) => panic!("orchestration failed: {error}"),
    };
    assert!(s == "sum=3" || s == "up=HELLO");
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Mixed sample (string orchestration): the same typed-vs-string `select2` race as the typed
/// variant, but inside an orchestration registered with plain string I/O.
///
/// Either activity may win, so the test accepts "sum=12" or "up=RACE".
#[tokio::test]
async fn sample_mixed_string_and_typed_string_orch_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register("Upper", |_ctx: ActivityContext, input: String| async move {
            Ok(input.to_uppercase())
        })
        .register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
            Ok(AddRes { sum: req.a + req.b })
        })
        .build();

    let mixed = |ctx: OrchestrationContext, _in: String| async move {
        let f_typed = ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 5, b: 7 });
        let f_str = ctx.schedule_activity("Upper", "race");
        // Whichever future finishes first decides the output; errors are returned as-is.
        match ctx.select2(f_typed, f_str).await {
            duroxide::Either2::First(Err(e)) | duroxide::Either2::Second(Err(e)) => Err(e),
            duroxide::Either2::First(Ok(add_res)) => {
                Ok::<_, String>(format!("sum={}", add_res.sum))
            }
            duroxide::Either2::Second(Ok(upper_res)) => Ok(format!("up={upper_res}")),
        }
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("MixedStringOrch", mixed)
        .build();

    let host = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-mixed-string", "MixedStringOrch", "")
        .await
        .unwrap();

    let outcome = client
        .wait_for_orchestration("inst-mixed-string", Duration::from_secs(10))
        .await
        .unwrap();
    let s = match outcome {
        runtime::OrchestrationStatus::Completed { output, .. } => output,
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    };
    assert!(s == "sum=12" || s == "up=RACE");

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Versioning sample: the same orchestration name is registered twice (unversioned and
/// "2.0.0"), and the version picked for new instances is changed mid-test.
///
/// The first start is expected to return "v2"; after the policy is set to
/// `Exact(1.0.0)`, a second start is expected to return "v1".
#[tokio::test]
async fn sample_versioning_start_latest_vs_exact_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let first_version = |_: OrchestrationContext, _in: String| async move { Ok("v1".to_string()) };
    let second_version = |_: OrchestrationContext, _in: String| async move { Ok("v2".to_string()) };
    let registry = OrchestrationRegistry::builder()
        .register("Versioned", first_version)
        .register_versioned("Versioned", "2.0.0", second_version)
        .build();
    let activities = ActivityRegistry::builder().build();

    // The runtime gets a clone so the policy can still be changed through `registry` below.
    let host = runtime::Runtime::start_with_store(store.clone(), activities, registry.clone()).await;
    let client = Client::new(store.clone());

    // Phase 1: no explicit policy.
    client
        .start_orchestration("inst-vers-latest", "Versioned", "")
        .await
        .unwrap();
    let latest_outcome = client
        .wait_for_orchestration("inst-vers-latest", Duration::from_secs(10))
        .await
        .unwrap();
    match latest_outcome {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "v2"),
        _ => panic!("unexpected orchestration status"),
    }

    // Phase 2: pin new starts to exactly 1.0.0.
    let pinned = semver::Version::parse("1.0.0").unwrap();
    registry.set_version_policy("Versioned", duroxide::runtime::VersionPolicy::Exact(pinned));
    client
        .start_orchestration("inst-vers-exact", "Versioned", "")
        .await
        .unwrap();
    let exact_outcome = client
        .wait_for_orchestration("inst-vers-exact", Duration::from_secs(10))
        .await
        .unwrap();
    match exact_outcome {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "v1"),
        _ => panic!("unexpected orchestration status"),
    }

    host.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Shows a parent orchestration starting the same child name twice: once with an
/// explicit version (`"1.0.0"`) and once through the registry's version policy.
///
/// `"Child"` is registered via plain `register` and via `register_versioned` at
/// `2.0.0`. The parent joins both child outputs with `-`, and the test expects
/// `"c1-c2"`.
#[tokio::test]
async fn sample_versioning_sub_orchestration_explicit_vs_policy_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let child_v1 = |_: OrchestrationContext, _in: String| async move { Ok("c1".to_string()) };
    let child_v2 = |_: OrchestrationContext, _in: String| async move { Ok("c2".to_string()) };
    let parent = |ctx: OrchestrationContext, _in: String| async move {
        // Propagate child failures with `?` instead of unwrapping: a failing child
        // then makes the parent return `Err`, which the `Failed` arm below reports
        // with its message, rather than panicking inside the orchestration body.
        let a = ctx
            .schedule_sub_orchestration_versioned("Child", Some("1.0.0".to_string()), "exp")
            .await?;
        let b = ctx.schedule_sub_orchestration("Child", "pol").await?;
        Ok::<_, String>(format!("{a}-{b}"))
    };
    let reg = OrchestrationRegistry::builder()
        .register("ParentVers", parent)
        .register("Child", child_v1)
        .register_versioned("Child", "2.0.0", child_v2)
        .build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-sub-vers", "ParentVers", "")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("inst-sub-vers", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "c1-c2"),
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        _ => panic!("unexpected orchestration status"),
    }
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Shows an instance started on version `1.0.0` that calls `continue_as_new` and
/// is expected to resume on the `2.0.0` implementation.
///
/// After completion the test checks that:
/// * the final output is `"upgraded:v1:state"`,
/// * the instance has exactly executions `[1, 2]`,
/// * execution 1 contains an `OrchestrationContinuedAsNew` event, and
/// * execution 2 contains an `OrchestrationStarted` event whose input is `"v1:state"`.
#[tokio::test]
async fn sample_versioning_continue_as_new_upgrade_fs() {
    init_test_logging();
    use duroxide::OrchestrationStatus;
    let (store, container) = common::create_cosmos_store().await;
    let v1 = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info("v1: upgrading via ContinueAsNew (default policy)".to_string());
        ctx.continue_as_new(format!("v1:{input}")).await
    };
    let v2 = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info(format!("v2: resumed with input={input}"));
        Ok(format!("upgraded:{input}"))
    };
    let reg = OrchestrationRegistry::builder()
        .register("LongRunner", v1)
        .register_versioned("LongRunner", "2.0.0", v2)
        .build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration_versioned("inst-can-upgrade", "LongRunner", "1.0.0", "state")
        .await
        .unwrap();

    // Poll the status until the upgraded execution completes or the deadline passes.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        match client.get_orchestration_status("inst-can-upgrade").await {
            Ok(OrchestrationStatus::Completed { output, .. }) => {
                assert_eq!(output, "upgraded:v1:state");
                break;
            }
            Ok(OrchestrationStatus::Failed { details, .. }) => {
                panic!("unexpected failure: {}", details.display_message())
            }
            Ok(_) if std::time::Instant::now() < deadline => {
                tokio::time::sleep(Duration::from_millis(10)).await
            }
            // Still not terminal and the deadline has passed: a genuine timeout.
            Ok(_) => panic!("timeout waiting for upgraded completion"),
            // A client error is not a timeout; report it as what it is so the
            // failure message points at the real cause.
            Err(e) => panic!("get_orchestration_status failed: {e:?}"),
        }
    }

    // Inspect the stored executions through the management capability.
    let admin = store
        .as_management_capability()
        .expect("Management capability should be available");
    let execs = admin
        .list_executions("inst-can-upgrade")
        .await
        .expect("list_executions should succeed");
    assert_eq!(execs, vec![1, 2]);
    let e1 = store
        .read_with_execution("inst-can-upgrade", 1)
        .await
        .expect("read_with_execution should succeed");
    assert!(e1.iter().any(|e| matches!(
        &e.kind,
        duroxide::EventKind::OrchestrationContinuedAsNew { .. }
    )));
    let e2 = store
        .read_with_execution("inst-can-upgrade", 2)
        .await
        .expect("read_with_execution should succeed");
    assert!(e2.iter().any(
        |e| matches!(&e.kind, duroxide::EventKind::OrchestrationStarted { input, .. } if input == "v1:state")
    ));
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Checks that cancelling a parent orchestration also cancels its sub-orchestration.
///
/// The parent starts `ChildSample`, which waits on the external event `"Go"` that
/// this test never raises. The test then cancels the parent with reason
/// `"user_request"` and expects:
/// * the parent history to contain `OrchestrationFailed` with
///   `AppErrorKind::Cancelled { reason: "user_request" }`, and
/// * every instance whose id starts with `"inst-sample-cancel::"` to contain both
///   `OrchestrationCancelRequested` and `OrchestrationFailed` with
///   `AppErrorKind::Cancelled { reason: "parent canceled" }`.
#[tokio::test]
async fn sample_cancellation_parent_cascades_to_children_fs() {
init_test_logging();
use duroxide::EventKind;
let (store, container) = common::create_cosmos_store().await;
// Child blocks on an event named "Go"; nothing in this test raises it.
let child = |ctx: OrchestrationContext, _input: String| async move {
let _ = ctx.schedule_wait("Go").await;
Ok("done".to_string())
};
// Parent awaits the child, so it is still running when the cancel is issued.
let parent = |ctx: OrchestrationContext, _input: String| async move {
let _ = ctx
.schedule_sub_orchestration("ChildSample", "seed")
.await?;
Ok::<_, String>("parent_done".to_string())
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ChildSample", child)
.register("ParentSample", parent)
.build();
let activity_registry = ActivityRegistry::builder().build();
// Override only the dispatcher's minimum poll interval; everything else is default.
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt = runtime::Runtime::start_with_options(
store.clone(),
activity_registry,
orchestration_registry,
options,
)
.await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sample-cancel", "ParentSample", "")
.await
.unwrap();
// Pause before cancelling — presumably to let the parent schedule its child first.
tokio::time::sleep(Duration::from_millis(200)).await;
// The cancel result is deliberately discarded; the history checks below are the
// actual verification that the cancellation took effect.
let _ = client
.cancel_instance("inst-sample-cancel", "user_request")
.await;
// Wait (up to the 5_000 passed below, presumably milliseconds) for the parent's
// cancellation failure to appear in its history.
let ok = common::wait_for_history(
store.clone(),
"inst-sample-cancel",
|hist| {
hist.iter().rev().any(|e| {
matches!(
&e.kind,
EventKind::OrchestrationFailed { details, .. } if matches!(
details,
duroxide::ErrorDetails::Application {
kind: duroxide::AppErrorKind::Cancelled { reason },
..
} if reason == "user_request"
)
)
})
},
5_000,
)
.await;
assert!(ok, "timeout waiting for parent cancel failure");
let admin = store
.as_management_capability()
.expect("Management capability should be available");
// Poll the instance list for up to 5 seconds until at least one instance id with
// the "inst-sample-cancel::" prefix shows up.
let mut children = Vec::new();
let child_deadline = std::time::Instant::now() + Duration::from_secs(5);
while std::time::Instant::now() < child_deadline {
children = admin
.list_instances()
.await
.expect("list_instances should succeed")
.into_iter()
.filter(|i| i.starts_with("inst-sample-cancel::"))
.collect();
if !children.is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(
!children.is_empty(),
"expected child instance(s) to exist after cancellation"
);
// Each child must show both the cancel request and the resulting failure.
for child in children {
let ok_child = common::wait_for_history(
store.clone(),
&child,
|hist| {
hist.iter()
.any(|e| matches!(&e.kind, EventKind::OrchestrationCancelRequested { .. }))
&& hist.iter().any(|e| {
matches!(
&e.kind,
EventKind::OrchestrationFailed { details, .. } if matches!(
details,
duroxide::ErrorDetails::Application {
kind: duroxide::AppErrorKind::Cancelled { reason },
..
} if reason == "parent canceled"
)
)
})
},
5_000,
)
.await;
assert!(ok_child, "timeout waiting for child cancel for {child}");
}
rt.shutdown(None).await;
common::cleanup_container(&container).await;
}
/// Demonstrates `?`-based error propagation from an activity to the
/// orchestration result.
///
/// `ValidateInput` rejects empty input. The orchestration runs once with
/// `"test"` (expected output `"Valid: test"`) and once with `""` (expected to
/// fail with a message containing `"Input cannot be empty"`).
#[tokio::test]
async fn sample_basic_error_handling_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register(
            "ValidateInput",
            |_ctx: ActivityContext, input: String| async move {
                if input.is_empty() {
                    return Err("Input cannot be empty".to_string());
                }
                Ok(format!("Valid: {input}"))
            },
        )
        .build();

    // The activity error bubbles out of the orchestration through `?`.
    let validate = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info("Starting validation");
        let verdict = ctx.schedule_activity("ValidateInput", input).await?;
        ctx.trace_info(format!("Validation result: {verdict}"));
        Ok(verdict)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("BasicErrorHandling", validate)
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());

    // Happy path: non-empty input.
    client
        .start_orchestration("inst-basic-error-1", "BasicErrorHandling", "test")
        .await
        .unwrap();
    let success_status = client
        .wait_for_orchestration("inst-basic-error-1", Duration::from_secs(10))
        .await
        .unwrap();
    match success_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Valid: test");
        }
        _ => panic!("unexpected orchestration status"),
    }

    // Failure path: empty input must surface the activity's error text.
    client
        .start_orchestration("inst-basic-error-2", "BasicErrorHandling", "")
        .await
        .unwrap();
    let failure_status = client
        .wait_for_orchestration("inst-basic-error-2", Duration::from_secs(10))
        .await
        .unwrap();
    match failure_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            panic!("Expected failure but got success: {output}")
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            assert!(details.display_message().contains("Input cannot be empty"));
        }
        _ => panic!("unexpected orchestration status"),
    }

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Demonstrates error propagation through a helper `async fn` that an
/// orchestration calls with a borrowed context.
///
/// `ProcessData` fails when its input contains `"error"`; `FormatOutput` always
/// succeeds. Input `"test"` is expected to yield `"Final: Processed: test"`, and
/// input `"error"` is expected to fail with a message containing
/// `"Processing failed"`.
#[tokio::test]
async fn sample_nested_function_error_handling_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register(
            "ProcessData",
            |_ctx: ActivityContext, input: String| async move {
                if input.contains("error") {
                    return Err("Processing failed".to_string());
                }
                Ok(format!("Processed: {input}"))
            },
        )
        .register(
            "FormatOutput",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("Final: {input}")) },
        )
        .build();

    /// Runs `ProcessData` then `FormatOutput`, returning the first error hit.
    async fn process_and_format(ctx: &OrchestrationContext, data: &str) -> Result<String, String> {
        ctx.trace_info("Starting processing");
        let stage_one = ctx
            .schedule_activity("ProcessData", data.to_string())
            .await?;
        ctx.trace_info("Starting formatting");
        let stage_two = ctx.schedule_activity("FormatOutput", stage_one).await?;
        Ok(stage_two)
    }

    let pipeline = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info("Starting orchestration");
        let final_text = process_and_format(&ctx, &input).await?;
        ctx.trace_info("Orchestration completed");
        Ok(final_text)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("NestedErrorHandling", pipeline)
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());

    // Both stages succeed.
    client
        .start_orchestration("inst-nested-error-1", "NestedErrorHandling", "test")
        .await
        .unwrap();
    let success_status = client
        .wait_for_orchestration("inst-nested-error-1", Duration::from_secs(10))
        .await
        .unwrap();
    match success_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Final: Processed: test");
        }
        _ => panic!("unexpected orchestration status"),
    }

    // First stage fails and the error reaches the orchestration status.
    client
        .start_orchestration("inst-nested-error-2", "NestedErrorHandling", "error")
        .await
        .unwrap();
    let failure_status = client
        .wait_for_orchestration("inst-nested-error-2", Duration::from_secs(10))
        .await
        .unwrap();
    match failure_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            panic!("Expected failure but got success: {output}")
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            assert!(details.display_message().contains("Processing failed"));
        }
        _ => panic!("unexpected orchestration status"),
    }

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Demonstrates handling an activity error inside the orchestration: on failure
/// the orchestration schedules `LogError` and then returns its own wrapped error.
///
/// Input `"test"` is expected to complete with `"Processed: test"`. Input
/// `"error"` is expected to fail with a message containing both
/// `"Failed to process 'error'"` and `"Processing failed"`.
#[tokio::test]
async fn sample_error_recovery_fs() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register(
            "ProcessData",
            |_ctx: ActivityContext, input: String| async move {
                if input.contains("error") {
                    return Err("Processing failed".to_string());
                }
                Ok(format!("Processed: {input}"))
            },
        )
        .register(
            "LogError",
            |_ctx: ActivityContext, error: String| async move { Ok(format!("Logged: {error}")) },
        )
        .build();

    let recover = |ctx: OrchestrationContext, input: String| async move {
        ctx.trace_info("Starting orchestration");
        let attempt = ctx.schedule_activity("ProcessData", input.clone()).await;
        let cause = match attempt {
            Ok(processed) => {
                ctx.trace_info("Processing succeeded");
                return Ok(processed);
            }
            Err(cause) => cause,
        };
        // Failure path: record the error via an activity (its own result is
        // ignored), then fail with a message that wraps the original cause.
        ctx.trace_info("Processing failed, logging error");
        let _ = ctx.schedule_activity("LogError", cause.clone()).await;
        Err(format!("Failed to process '{input}': {cause}"))
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("ErrorRecovery", recover)
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());

    // Success path.
    client
        .start_orchestration("inst-recovery-1", "ErrorRecovery", "test")
        .await
        .unwrap();
    let success_status = client
        .wait_for_orchestration("inst-recovery-1", Duration::from_secs(10))
        .await
        .unwrap();
    match success_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Processed: test");
        }
        _ => panic!("unexpected orchestration status"),
    }

    // Failure path: the wrapped message must mention both the input and the cause.
    client
        .start_orchestration("inst-recovery-2", "ErrorRecovery", "error")
        .await
        .unwrap();
    let failure_status = client
        .wait_for_orchestration("inst-recovery-2", Duration::from_secs(10))
        .await
        .unwrap();
    match failure_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            panic!("Expected failure but got success: {output}")
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            let message = details.display_message();
            assert!(message.contains("Failed to process 'error'"));
            assert!(message.contains("Processing failed"));
        }
        _ => panic!("unexpected orchestration status"),
    }

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Demonstrates an orchestration that prunes its own old executions from inside
/// an activity while looping via `continue_as_new`.
///
/// Each execution processes one batch, calls `PruneSelf` (which asks the client
/// obtained from the activity context to keep only the last execution), and then
/// either finishes or continues as new with the next batch number. The initial
/// input `"{}"` does not deserialize into `State`, so the fallback of batch 0 of 5
/// is used. The test expects at least 4 prune calls, at least 3 deleted
/// executions in total, and exactly one execution remaining at the end.
#[tokio::test]
async fn sample_self_pruning_eternal_orchestration() {
    use duroxide::providers::PruneOptions;
    use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    // Counters shared between the test body and the `PruneSelf` activity.
    let prune_count = Arc::new(AtomicU32::new(0));
    let executions_pruned = Arc::new(AtomicU64::new(0));
    let calls_for_activity = Arc::clone(&prune_count);
    let deleted_for_activity = Arc::clone(&executions_pruned);

    let activities = ActivityRegistry::builder()
        .register(
            "ProcessBatch",
            |_ctx: ActivityContext, batch_num: String| async move {
                Ok(format!("Processed batch {batch_num}"))
            },
        )
        .register("PruneSelf", move |ctx: ActivityContext, _input: String| {
            // Fresh handles per invocation so the returned future owns its counters.
            let calls = Arc::clone(&calls_for_activity);
            let deleted = Arc::clone(&deleted_for_activity);
            async move {
                let instance_id = ctx.instance_id().to_string();
                let options = PruneOptions {
                    keep_last: Some(1),
                    ..Default::default()
                };
                let outcome = ctx
                    .get_client()
                    .prune_executions(&instance_id, options)
                    .await
                    .map_err(|e| e.to_string())?;
                calls.fetch_add(1, Ordering::SeqCst);
                deleted.fetch_add(outcome.executions_deleted, Ordering::SeqCst);
                Ok(format!("Pruned {} executions", outcome.executions_deleted))
            }
        })
        .build();

    let self_pruning = |ctx: OrchestrationContext, state_str: String| async move {
        #[derive(Serialize, Deserialize)]
        struct State {
            batch_num: u32,
            total_batches: u32,
        }

        // Unparseable input falls back to the first of five batches.
        let current: State = serde_json::from_str(&state_str).unwrap_or_else(|_| State {
            batch_num: 0,
            total_batches: 5,
        });

        let _batch_output = ctx
            .schedule_activity("ProcessBatch", current.batch_num.to_string())
            .await?;
        let _prune_output = ctx.schedule_activity("PruneSelf", "".to_string()).await?;

        let is_final_batch = current.batch_num >= current.total_batches - 1;
        if is_final_batch {
            return Ok(format!("Completed {} batches", current.total_batches));
        }

        let upcoming = State {
            batch_num: current.batch_num + 1,
            total_batches: current.total_batches,
        };
        let upcoming_json = serde_json::to_string(&upcoming).unwrap();
        ctx.continue_as_new(upcoming_json).await
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("SelfPruningOrch", self_pruning)
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("inst-self-prune", "SelfPruningOrch", "{}")
        .await
        .unwrap();

    let final_status = client
        .wait_for_orchestration("inst-self-prune", Duration::from_secs(90))
        .await
        .unwrap();
    match final_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert!(output.contains("Completed 5 batches"));
        }
        _ => panic!("unexpected orchestration status"),
    }

    let observed_calls = prune_count.load(Ordering::SeqCst);
    assert!(observed_calls >= 4, "Should have pruned at least 4 times");
    let observed_deleted = executions_pruned.load(Ordering::SeqCst);
    assert!(
        observed_deleted >= 3,
        "Should have pruned at least 3 executions total"
    );

    let remaining = client.list_executions("inst-self-prune").await.unwrap();
    assert_eq!(
        remaining.len(),
        1,
        "Only final execution should remain after self-pruning"
    );

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Exercises queued ("persistent") events: an orchestration drains
/// `"ConfigUpdate"` events between work cycles and applies each one via the
/// `ApplyConfig` activity.
///
/// The test enqueues `v1` and `v2` right after start, waits until the `cycle_0`
/// activity has been scheduled, then enqueues `v3`. It asserts that the
/// comma-joined output lists `applied:v1` before `applied:v2` before
/// `applied:v3`, and that `applied:v3` appears after `cycle:0`.
#[tokio::test]
async fn sample_config_hot_reload_persistent_events_cdb() {
init_test_logging();
let (store, container) = common::create_cosmos_store().await;
let activity_registry = ActivityRegistry::builder()
.register(
"ApplyConfig",
|_ctx: ActivityContext, config: String| async move { Ok(format!("applied:{config}")) },
)
.build();
let orchestration = |ctx: OrchestrationContext, _input: String| async move {
// Ordered record of everything applied; joined with "," as the final output.
let mut log: Vec<String> = Vec::new();
for cycle in 0..3 {
// Drain phase: keep applying queued config updates until a 100 ms timer wins
// the race against the next dequeue.
loop {
let config_wait = ctx.dequeue_event("ConfigUpdate");
let drain_timeout = ctx.schedule_timer(Duration::from_millis(100));
match ctx.select2(config_wait, drain_timeout).await {
duroxide::Either2::First(config_json) => {
let result = ctx.schedule_activity("ApplyConfig", &config_json).await?;
log.push(result);
}
duroxide::Either2::Second(_) => break,
}
}
// Work phase: a 1 s timer followed by an activity call tagged with the cycle
// number; its result is discarded and "cycle:N" is logged instead.
ctx.schedule_timer(Duration::from_millis(1000)).await;
let _ = ctx
.schedule_activity("ApplyConfig", format!("cycle_{cycle}"))
.await?;
log.push(format!("cycle:{cycle}"));
}
// Final drain after the last cycle, same logic as the per-cycle drain above.
loop {
let config_wait = ctx.dequeue_event("ConfigUpdate");
let drain_timeout = ctx.schedule_timer(Duration::from_millis(100));
match ctx.select2(config_wait, drain_timeout).await {
duroxide::Either2::First(config_json) => {
let result = ctx.schedule_activity("ApplyConfig", &config_json).await?;
log.push(result);
}
duroxide::Either2::Second(_) => break,
}
}
Ok(log.join(","))
};
let orchestration_registry = OrchestrationRegistry::builder()
.register("ConfigHotReload", orchestration)
.build();
// Override only the dispatcher's minimum poll interval; everything else is default.
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(50),
..Default::default()
};
let rt = runtime::Runtime::start_with_options(
store.clone(),
activity_registry,
orchestration_registry,
options,
)
.await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-hot-reload", "ConfigHotReload", "")
.await
.unwrap();
// Two updates enqueued up front, in the order v1 then v2.
client
.enqueue_event("inst-hot-reload", "ConfigUpdate", "v1")
.await
.unwrap();
client
.enqueue_event("inst-hot-reload", "ConfigUpdate", "v2")
.await
.unwrap();
// Wait until the history shows the "cycle_0" activity scheduled, i.e. the first
// drain loop has already exited, before sending the third update.
assert!(
common::wait_for_history(
store.clone(),
"inst-hot-reload",
|hist| {
hist.iter().any(|e| {
matches!(
&e.kind,
duroxide::EventKind::ActivityScheduled { input, .. } if input == "cycle_0"
)
})
},
15_000,
)
.await,
"Timed out waiting for cycle:0 to progress past its drain phase"
);
client
.enqueue_event("inst-hot-reload", "ConfigUpdate", "v3")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-hot-reload", Duration::from_secs(20))
.await
.unwrap();
match status {
runtime::OrchestrationStatus::Completed { output, .. } => {
// Locate each expected entry in the comma-separated log; a missing entry
// panics with the full output for diagnosis.
let entries: Vec<&str> = output.split(',').collect();
let pos_v1 = entries
.iter()
.position(|e| *e == "applied:v1")
.unwrap_or_else(|| panic!("v1 missing from output: {output}"));
let pos_v2 = entries
.iter()
.position(|e| *e == "applied:v2")
.unwrap_or_else(|| panic!("v2 missing from output: {output}"));
let pos_v3 = entries
.iter()
.position(|e| *e == "applied:v3")
.unwrap_or_else(|| panic!("v3 missing from output: {output}"));
let pos_cycle0 = entries
.iter()
.position(|e| *e == "cycle:0")
.unwrap_or_else(|| panic!("cycle:0 missing from output: {output}"));
assert!(
pos_v1 < pos_v2,
"v1 should come before v2 (FIFO), got: {output}"
);
assert!(
pos_v2 < pos_v3,
"v2 should come before v3 (FIFO), got: {output}"
);
assert!(
pos_v3 > pos_cycle0,
"v3 arrived mid-flight, should be drained after cycle:0, got: {output}"
);
}
runtime::OrchestrationStatus::Failed { details, .. } => {
panic!("orchestration failed: {}", details.display_message())
}
other => panic!("unexpected status: {:?}", other),
}
rt.shutdown(None).await;
common::cleanup_container(&container).await;
}
/// Demonstrates per-activity tags on a single runtime whose worker filter is
/// built with `TagFilter::default_and(["gpu", "cpu"])`.
///
/// The pipeline schedules `Render` tagged `"gpu"`, `Encode` tagged `"cpu"` and
/// `Upload` with no tag. The test expects the chained output
/// `"uploaded:encoded:rendered:frame42"` and that the history's
/// `ActivityScheduled` events carry the tags `gpu`, `cpu`, and none, in that order.
#[tokio::test]
async fn sample_heterogeneous_workers_with_tags() {
    use duroxide::runtime::RuntimeOptions;
    use duroxide::TagFilter;

    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register(
            "Render",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("rendered:{input}")) },
        )
        .register(
            "Encode",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("encoded:{input}")) },
        )
        .register(
            "Upload",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("uploaded:{input}")) },
        )
        .build();

    // Each stage feeds its output into the next; only the first two are tagged.
    let video_pipeline = |ctx: OrchestrationContext, _input: String| async move {
        let frame = ctx
            .schedule_activity("Render", "frame42")
            .with_tag("gpu")
            .await?;
        let stream = ctx
            .schedule_activity("Encode", frame)
            .with_tag("cpu")
            .await?;
        let receipt = ctx.schedule_activity("Upload", stream).await?;
        Ok(receipt)
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("VideoPipeline", video_pipeline)
        .build();

    let worker_options = RuntimeOptions {
        worker_tag_filter: TagFilter::default_and(["gpu", "cpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        activities,
        orchestrations,
        worker_options,
    )
    .await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("video-1", "VideoPipeline", "")
        .await
        .unwrap();
    let final_status = client
        .wait_for_orchestration("video-1", Duration::from_secs(30))
        .await
        .unwrap();
    match final_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "uploaded:encoded:rendered:frame42");

            // Collect the tag recorded on every scheduled activity, in history order.
            let history = store.read("video-1").await.unwrap();
            let recorded_tags: Vec<Option<String>> = history
                .iter()
                .filter_map(|event| match &event.kind {
                    duroxide::EventKind::ActivityScheduled { tag, .. } => Some(tag.clone()),
                    _ => None,
                })
                .collect();
            assert_eq!(
                recorded_tags,
                vec![Some("gpu".to_string()), Some("cpu".to_string()), None],
                "History should preserve per-activity tags"
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Demonstrates guarding a tagged activity with a timer so the orchestration can
/// fall back when the tagged work does not finish in time.
///
/// The runtime is started with `TagFilter::default()`, and the orchestration
/// races the `"gpu"`-tagged `GpuInference` activity against a 500 ms timer. The
/// test expects the timer to win and the output to come from `CpuFallback`:
/// `"cpu_fallback:model-v3"`.
#[tokio::test]
async fn sample_starvation_safe_tagged_activity() {
    use duroxide::runtime::RuntimeOptions;
    use duroxide::TagFilter;

    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register(
            "GpuInference",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("inference:{input}")) },
        )
        .register(
            "CpuFallback",
            |_ctx: ActivityContext, input: String| async move {
                Ok(format!("cpu_fallback:{input}"))
            },
        )
        .build();

    let inference = |ctx: OrchestrationContext, input: String| async move {
        let tagged_attempt = ctx
            .schedule_activity("GpuInference", input.clone())
            .with_tag("gpu");
        let deadline = ctx.schedule_timer(Duration::from_millis(500));
        match ctx.select2(tagged_attempt, deadline).await {
            // The tagged activity finished first: pass its result through unchanged.
            duroxide::Either2::First(gpu_outcome) => gpu_outcome,
            // The timer fired first: run the untagged fallback instead.
            duroxide::Either2::Second(()) => {
                let fallback = ctx.schedule_activity("CpuFallback", input).await?;
                Ok(fallback)
            }
        }
    };
    let orchestrations = OrchestrationRegistry::builder()
        .register("InferenceWithFallback", inference)
        .build();

    let worker_options = RuntimeOptions {
        worker_tag_filter: TagFilter::default(),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        activities,
        orchestrations,
        worker_options,
    )
    .await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("infer-1", "InferenceWithFallback", "model-v3")
        .await
        .unwrap();
    let final_status = client
        .wait_for_orchestration("infer-1", Duration::from_secs(30))
        .await
        .unwrap();
    match final_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "cpu_fallback:model-v3");
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Demonstrates two runtimes sharing one store and splitting activity work by tag.
///
/// Runtime A uses `TagFilter::default()`. Runtime B is configured with
/// `orchestration_concurrency: 0` and `TagFilter::tags(["gpu"])`. The pipeline
/// runs `PreProcess` (untagged), `GpuTrain` (tagged `"gpu"`) and `SaveModel`
/// (untagged). The test expects `"saved:trained:preprocessed:dataset-v5"` and a
/// history whose scheduled activities carry exactly those names and tags in order.
#[tokio::test]
async fn sample_dual_runtime_tag_cooperation() {
    use duroxide::runtime::RuntimeOptions;
    use duroxide::TagFilter;

    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    // Both runtimes need their own registry instances, hence the factories.
    let build_activities = || {
        ActivityRegistry::builder()
            .register(
                "PreProcess",
                |_ctx: ActivityContext, input: String| async move {
                    Ok(format!("preprocessed:{input}"))
                },
            )
            .register(
                "GpuTrain",
                |_ctx: ActivityContext, input: String| async move { Ok(format!("trained:{input}")) },
            )
            .register(
                "SaveModel",
                |_ctx: ActivityContext, input: String| async move { Ok(format!("saved:{input}")) },
            )
            .build()
    };
    let build_orchestrations = || {
        OrchestrationRegistry::builder()
            .register(
                "MLPipeline",
                |ctx: OrchestrationContext, input: String| async move {
                    let cleaned = ctx.schedule_activity("PreProcess", input).await?;
                    let trained = ctx
                        .schedule_activity("GpuTrain", cleaned)
                        .with_tag("gpu")
                        .await?;
                    let stored = ctx.schedule_activity("SaveModel", trained).await?;
                    Ok(stored)
                },
            )
            .build()
    };

    let general_options = RuntimeOptions {
        worker_tag_filter: TagFilter::default(),
        ..Default::default()
    };
    let rt_a = runtime::Runtime::start_with_options(
        store.clone(),
        build_activities(),
        build_orchestrations(),
        general_options,
    )
    .await;

    let gpu_only_options = RuntimeOptions {
        orchestration_concurrency: 0,
        worker_tag_filter: TagFilter::tags(["gpu"]),
        ..Default::default()
    };
    let rt_b = runtime::Runtime::start_with_options(
        store.clone(),
        build_activities(),
        build_orchestrations(),
        gpu_only_options,
    )
    .await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("ml-1", "MLPipeline", "dataset-v5")
        .await
        .unwrap();
    let final_status = client
        .wait_for_orchestration("ml-1", Duration::from_secs(30))
        .await
        .unwrap();
    match final_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "saved:trained:preprocessed:dataset-v5");

            // Pair every scheduled activity's name with its recorded tag.
            let history = store.read("ml-1").await.unwrap();
            let routing: Vec<(&str, Option<&str>)> = history
                .iter()
                .filter_map(|event| match &event.kind {
                    duroxide::EventKind::ActivityScheduled { name, tag, .. } => {
                        Some((name.as_str(), tag.as_deref()))
                    }
                    _ => None,
                })
                .collect();
            assert_eq!(
                routing,
                vec![
                    ("PreProcess", None),
                    ("GpuTrain", Some("gpu")),
                    ("SaveModel", None),
                ],
                "History should show correct tag routing"
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt_b.shutdown(None).await;
    rt_a.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Same scenario as the dual-runtime tag sample, but each runtime gets its own
/// store handle created for the same container.
///
/// Runtime A (on `store_a`) uses `TagFilter::default()`. Runtime B (on `store_b`)
/// is configured with `orchestration_concurrency: 0` and `TagFilter::tags(["gpu"])`.
/// The test expects `"saved:trained:preprocessed:dataset-v7"` and a history,
/// read through `store_a`, that lists `PreProcess` (no tag), `GpuTrain`
/// (`"gpu"`) and `SaveModel` (no tag) in order.
#[tokio::test]
async fn sample_dual_provider_tag_cooperation() {
    use duroxide::runtime::RuntimeOptions;
    use duroxide::TagFilter;

    init_test_logging();
    let (store_a, container) = common::create_cosmos_store().await;
    let store_b = common::create_cosmos_store_for_container(&container).await;

    // Factories, because each runtime takes ownership of its registries.
    let build_activities = || {
        ActivityRegistry::builder()
            .register(
                "PreProcess",
                |_ctx: ActivityContext, input: String| async move {
                    Ok(format!("preprocessed:{input}"))
                },
            )
            .register(
                "GpuTrain",
                |_ctx: ActivityContext, input: String| async move { Ok(format!("trained:{input}")) },
            )
            .register(
                "SaveModel",
                |_ctx: ActivityContext, input: String| async move { Ok(format!("saved:{input}")) },
            )
            .build()
    };
    let build_orchestrations = || {
        OrchestrationRegistry::builder()
            .register(
                "MLPipeline",
                |ctx: OrchestrationContext, input: String| async move {
                    let cleaned = ctx.schedule_activity("PreProcess", input).await?;
                    let trained = ctx
                        .schedule_activity("GpuTrain", cleaned)
                        .with_tag("gpu")
                        .await?;
                    let stored = ctx.schedule_activity("SaveModel", trained).await?;
                    Ok(stored)
                },
            )
            .build()
    };

    let general_options = RuntimeOptions {
        worker_tag_filter: TagFilter::default(),
        ..Default::default()
    };
    let rt_a = runtime::Runtime::start_with_options(
        store_a.clone(),
        build_activities(),
        build_orchestrations(),
        general_options,
    )
    .await;

    let gpu_only_options = RuntimeOptions {
        orchestration_concurrency: 0,
        worker_tag_filter: TagFilter::tags(["gpu"]),
        ..Default::default()
    };
    let rt_b = runtime::Runtime::start_with_options(
        store_b.clone(),
        build_activities(),
        build_orchestrations(),
        gpu_only_options,
    )
    .await;

    let client = Client::new(store_a.clone());
    client
        .start_orchestration("ml-dual-1", "MLPipeline", "dataset-v7")
        .await
        .unwrap();
    let final_status = client
        .wait_for_orchestration("ml-dual-1", Duration::from_secs(30))
        .await
        .unwrap();
    match final_status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "saved:trained:preprocessed:dataset-v7");

            // Pair every scheduled activity's name with its recorded tag.
            let history = store_a.read("ml-dual-1").await.unwrap();
            let routing: Vec<(&str, Option<&str>)> = history
                .iter()
                .filter_map(|event| match &event.kind {
                    duroxide::EventKind::ActivityScheduled { name, tag, .. } => {
                        Some((name.as_str(), tag.as_deref()))
                    }
                    _ => None,
                })
                .collect();
            assert_eq!(
                routing,
                vec![
                    ("PreProcess", None),
                    ("GpuTrain", Some("gpu")),
                    ("SaveModel", None),
                ],
                "History should show correct tag routing across independent providers"
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt_b.shutdown(None).await;
    rt_a.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Demonstrates a request/response protocol built from external events and the
/// per-instance key/value store.
///
/// The `RequestServer` orchestration publishes a `"status"` key, handles exactly
/// three `"request"` events (each a JSON object with `op_id` and `command`),
/// runs `ProcessCommand` (string reversal) on the command, and writes the reply
/// under `"response:<op_id>"`. The test sends three requests one at a time,
/// waiting for each reply, and finally checks the stored status and responses.
#[tokio::test]
async fn sample_kv_request_response() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;

    let activities = ActivityRegistry::builder()
        .register(
            "ProcessCommand",
            |_ctx: ActivityContext, input: String| async move {
                Ok(input.chars().rev().collect::<String>())
            },
        )
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "RequestServer",
            |ctx: OrchestrationContext, _input: String| async move {
                ctx.set_kv_value("status", "ready");
                for _ in 0..3 {
                    let raw_request = ctx.schedule_wait("request").await;
                    let parsed: serde_json::Value = serde_json::from_str(&raw_request).unwrap();
                    let op_id = parsed["op_id"].as_str().unwrap().to_string();
                    let command = parsed["command"].as_str().unwrap().to_string();

                    ctx.set_kv_value("status", "processing");
                    // An activity failure is turned into an "error: ..." reply
                    // rather than failing the whole server.
                    let reply = match ctx.schedule_activity("ProcessCommand", command).await {
                        Ok(reversed) => reversed,
                        Err(e) => format!("error: {e}"),
                    };
                    ctx.set_kv_value(format!("response:{op_id}"), &reply);
                    ctx.set_kv_value("status", "ready");
                }
                ctx.set_kv_value("status", "shutdown");
                Ok("served 3 requests".to_string())
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        activities,
        orchestrations,
        Default::default(),
    )
    .await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("req-resp-server", "RequestServer", "")
        .await
        .unwrap();

    // The server announces readiness through the "status" key.
    let initial_status = client
        .wait_for_kv_value("req-resp-server", "status", Duration::from_secs(20))
        .await
        .expect("Server never became ready");
    assert_eq!(initial_status, "ready");

    // Send each request and wait for its reply before sending the next one.
    let requests = [("op-1", "hello"), ("op-2", "world"), ("op-3", "rust")];
    for (op_id, command) in &requests {
        let payload = serde_json::json!({ "op_id": op_id, "command": command }).to_string();
        client
            .raise_event("req-resp-server", "request", &payload)
            .await
            .unwrap();

        let reply_key = format!("response:{op_id}");
        let reply = client
            .wait_for_kv_value("req-resp-server", &reply_key, Duration::from_secs(20))
            .await
            .unwrap_or_else(|_| panic!("Timed out waiting for response to {op_id}"));
        let reversed: String = command.chars().rev().collect();
        assert_eq!(
            reply, reversed,
            "Response for {op_id} should be the reversed command"
        );
    }

    let final_status = client
        .wait_for_orchestration("req-resp-server", Duration::from_secs(20))
        .await
        .unwrap();
    match final_status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "served 3 requests");
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }

    // After completion the stored keys must still be readable with their last values.
    let expected_kv = [
        ("status", "shutdown"),
        ("response:op-1", "olleh"),
        ("response:op-2", "dlrow"),
        ("response:op-3", "tsur"),
    ];
    for (key, value) in expected_kv {
        let stored = client
            .get_kv_value("req-resp-server", key)
            .await
            .unwrap();
        assert_eq!(stored, Some(value.to_string()));
    }

    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Sample: one orchestration reading another instance's KV store.
///
/// `Producer` squares its input via the `ComputeResult` activity, publishes the
/// value under `result`, sets `status` to `done`, and then waits for an `ack`
/// event. `Consumer` polls the producer's `status` key through
/// `get_kv_value_from_instance` and, once it reads `done`, returns the
/// producer's `result`.
#[tokio::test]
async fn sample_kv_cross_orchestration_read() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activities = ActivityRegistry::builder()
        .register(
            "ComputeResult",
            |_ctx: ActivityContext, input: String| async move {
                let n: i64 = input.parse().map_err(|e| format!("parse: {e}"))?;
                Ok((n * n).to_string())
            },
        )
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "Producer",
            |ctx: OrchestrationContext, input: String| async move {
                // Return bad input as an orchestration error (as the activity
                // does) instead of panicking inside orchestration code.
                let n: i64 = input
                    .parse()
                    .map_err(|e| format!("parse producer input: {e}"))?;
                ctx.set_kv_value("status", "computing");
                let squared = ctx
                    .schedule_activity("ComputeResult", n.to_string())
                    .await?;
                ctx.set_kv_value("result", &squared);
                ctx.set_kv_value("status", "done");
                // Stay alive until the test raises `ack`; the test only does so
                // after the consumer has finished reading.
                ctx.schedule_wait("ack").await;
                Ok(format!("produced:{squared}"))
            },
        )
        .register(
            "Consumer",
            |ctx: OrchestrationContext, producer_id: String| async move {
                // Poll the producer's `status` key, sleeping 200 ms between
                // reads, and give up once more than 40 reads have missed.
                let mut attempts = 0;
                loop {
                    let status = ctx
                        .get_kv_value_from_instance(&producer_id, "status")
                        .await
                        .map_err(|e| format!("read status: {e}"))?;
                    if status.as_deref() == Some("done") {
                        break;
                    }
                    attempts += 1;
                    if attempts > 40 {
                        return Err("producer never finished".to_string());
                    }
                    ctx.schedule_timer(Duration::from_millis(200)).await;
                }
                let result = ctx
                    .get_kv_value_from_instance(&producer_id, "result")
                    .await
                    .map_err(|e| format!("read result: {e}"))?;
                let result = result.ok_or_else(|| "result key missing".to_string())?;
                Ok(format!("consumed:{result}"))
            },
        )
        .build();
    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        activities,
        orchestrations,
        Default::default(),
    )
    .await;
    let client = Client::new(store.clone());

    // Start the producer and wait until it has published 7 * 7.
    client
        .start_orchestration("producer-1", "Producer", "7")
        .await
        .unwrap();
    let result = client
        .wait_for_kv_value("producer-1", "result", Duration::from_secs(20))
        .await
        .expect("Producer never set result");
    assert_eq!(result, "49");

    // The consumer receives the producer's instance id as its input.
    client
        .start_orchestration("consumer-1", "Consumer", "producer-1")
        .await
        .unwrap();
    match client
        .wait_for_orchestration("consumer-1", Duration::from_secs(30))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "consumed:49");
        }
        other => panic!("Expected consumer Completed, got: {other:?}"),
    }

    // Release the producer now that the consumer is done.
    client.raise_event("producer-1", "ack", "").await.unwrap();
    match client
        .wait_for_orchestration("producer-1", Duration::from_secs(20))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "produced:49");
        }
        other => panic!("Expected producer Completed, got: {other:?}"),
    }

    // The producer's keys are still readable after it has completed.
    assert_eq!(
        client.get_kv_value("producer-1", "result").await.unwrap(),
        Some("49".to_string())
    );
    assert_eq!(
        client.get_kv_value("producer-1", "status").await.unwrap(),
        Some("done".to_string())
    );
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Sample: a read-modify-write counter kept in the orchestration's KV store.
///
/// `BatchProcessor` handles three batches. For each one it reads
/// `batches_processed` (treating a missing key as `0`), runs the
/// `ProcessBatch` activity, then writes the incremented counter and the
/// activity output under `last_result`. The test expects the counter to end
/// at `3` and `last_result` to hold the output for the last batch.
#[tokio::test]
async fn sample_kv_read_modify_write_counter() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activities = ActivityRegistry::builder()
        .register(
            "ProcessBatch",
            |_ctx: ActivityContext, batch: String| async move { Ok(format!("processed:{batch}")) },
        )
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "BatchProcessor",
            |ctx: OrchestrationContext, _input: String| async move {
                // Fixed list of batches; an array avoids a needless heap allocation.
                let batches = ["alpha", "beta", "gamma"];
                for batch_name in &batches {
                    // Read the counter before the activity and write it afterwards.
                    // The default string is built lazily, only when the key is absent.
                    let processed = ctx
                        .get_kv_value("batches_processed")
                        .unwrap_or_else(|| "0".to_string());
                    // A non-numeric counter becomes an orchestration error, which
                    // the test reports through its `Failed` arm.
                    let count: u32 = processed
                        .parse()
                        .map_err(|e| format!("parse batches_processed: {e}"))?;
                    let result = ctx
                        .schedule_activity("ProcessBatch", batch_name.to_string())
                        .await?;
                    ctx.set_kv_value("batches_processed", (count + 1).to_string());
                    ctx.set_kv_value("last_result", &result);
                }
                Ok(ctx
                    .get_kv_value("batches_processed")
                    .unwrap_or_else(|| "0".to_string()))
            },
        )
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());
    client
        .start_orchestration("batch-proc", "BatchProcessor", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("batch-proc", Duration::from_secs(20))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "3", "Should have processed 3 batches");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!(
                "Batch processor failed (possible RMW nondeterminism): {}",
                details.display_message()
            );
        }
        other => panic!("Expected Completed, got: {other:?}"),
    }

    // Both keys must reflect the last loop iteration.
    assert_eq!(
        client
            .get_kv_value("batch-proc", "batches_processed")
            .await
            .unwrap(),
        Some("3".to_string()),
    );
    assert_eq!(
        client
            .get_kv_value("batch-proc", "last_result")
            .await
            .unwrap(),
        Some("processed:gamma".to_string()),
    );
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}
/// Sample: inspecting per-instance statistics through the client.
///
/// Checks that `get_orchestration_stats` yields `None` for an instance id that
/// was never started, then runs `DataPipeline` (one activity call followed by
/// two KV writes) and checks the history, KV and queue counters reported for
/// the finished instance.
#[tokio::test]
async fn sample_orchestration_stats() {
    init_test_logging();
    let (store, container) = common::create_cosmos_store().await;
    let activities = ActivityRegistry::builder()
        .register(
            "FetchData",
            |_ctx: ActivityContext, url: String| async move { Ok(format!("data from {url}")) },
        )
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "DataPipeline",
            |ctx: OrchestrationContext, _: String| async move {
                let result = ctx
                    .schedule_activity("FetchData", "https://api.example.com".to_string())
                    .await?;
                ctx.set_kv_value("last_fetch", &result);
                ctx.set_kv_value("status", "complete");
                Ok(result)
            },
        )
        .build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = Client::new(store.clone());

    // An id that was never started has no stats.
    assert!(client
        .get_orchestration_stats("missing")
        .await
        .unwrap()
        .is_none());

    client
        .start_orchestration("pipeline-1", "DataPipeline", "")
        .await
        .unwrap();
    // Check the terminal status instead of discarding it, so a failed run is
    // reported as such rather than as a confusing stats mismatch below.
    match client
        .wait_for_orchestration("pipeline-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "data from https://api.example.com");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        other => panic!("unexpected status: {:?}", other),
    }

    let stats = client
        .get_orchestration_stats("pipeline-1")
        .await
        .unwrap()
        .expect("stats should exist after completion");
    assert!(stats.history_event_count >= 4);
    assert!(stats.history_size_bytes > 0);
    // The orchestration wrote exactly two keys: `last_fetch` and `status`.
    assert_eq!(stats.kv_user_key_count, 2);
    assert!(stats.kv_total_value_bytes > 0);
    assert_eq!(stats.queue_pending_count, 0);
    rt.shutdown(None).await;
    common::cleanup_container(&container).await;
}