#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, Client, Either2, OrchestrationContext, OrchestrationRegistry};
use serde::{Deserialize, Serialize};
use std::time::Duration;
mod common;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct AddReq {
a: i32,
b: i32,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct AddRes {
sum: i32,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct MulReq {
a: i32,
b: i32,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct MulRes {
product: i32,
}
fn math_activities() -> duroxide::runtime::registry::Registry<dyn duroxide::runtime::ActivityHandler> {
ActivityRegistry::builder()
.register_typed::<AddReq, AddRes, _, _>("Add", |_ctx: ActivityContext, req| async move {
Ok(AddRes { sum: req.a + req.b })
})
.register_typed::<MulReq, MulRes, _, _>("Mul", |_ctx: ActivityContext, req| async move {
Ok(MulRes { product: req.a * req.b })
})
.register_typed::<AddReq, AddRes, _, _>("SlowAdd", |_ctx: ActivityContext, req| async move {
tokio::time::sleep(Duration::from_millis(200)).await;
Ok(AddRes { sum: req.a + req.b })
})
.register_typed::<MulReq, MulRes, _, _>("FailDiv", |_ctx: ActivityContext, req| async move {
if req.b == 0 {
Err("division by zero".to_string())
} else {
Ok(MulRes { product: req.a / req.b })
}
})
.build()
}
#[tokio::test]
async fn typed_activity_join_fan_out() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = math_activities();
let orch = |ctx: OrchestrationContext, _: String| async move {
let futures = vec![
ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 1, b: 2 }),
ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 10, b: 20 }),
ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 100, b: 200 }),
];
let results: Vec<Result<AddRes, String>> = ctx.join(futures).await;
let total: i32 = results.into_iter().map(|r| r.unwrap().sum).sum();
Ok(total.to_string())
};
let orchestrations = OrchestrationRegistry::builder().register("TypedJoin", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("tj-1", "TypedJoin", "").await.unwrap();
let status = client
.wait_for_orchestration("tj-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "333"); }
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn typed_activity_join2_heterogeneous() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = math_activities();
let orch = |ctx: OrchestrationContext, _: String| async move {
let (add_res, mul_res) = ctx
.join2(
ctx.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 5, b: 3 }),
ctx.schedule_activity_typed::<MulReq, MulRes>("Mul", &MulReq { a: 5, b: 3 }),
)
.await;
let sum = add_res?.sum;
let product = mul_res?.product;
Ok(format!("sum={sum},product={product}"))
};
let orchestrations = OrchestrationRegistry::builder().register("TypedJoin2", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("tj2-1", "TypedJoin2", "").await.unwrap();
let status = client
.wait_for_orchestration("tj2-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "sum=8,product=15");
}
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn typed_activity_select2_timeout() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = math_activities();
let orch = |ctx: OrchestrationContext, _: String| async move {
let slow = ctx.schedule_activity_typed::<AddReq, AddRes>("SlowAdd", &AddReq { a: 1, b: 2 });
let timeout = ctx.schedule_timer(Duration::from_millis(10));
match ctx.select2(slow, timeout).await {
Either2::First(result) => Ok(format!("completed:{}", result?.sum)),
Either2::Second(()) => Ok("timeout".to_string()),
}
};
let orchestrations = OrchestrationRegistry::builder().register("TypedSelect2", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("ts2-1", "TypedSelect2", "").await.unwrap();
let status = client
.wait_for_orchestration("ts2-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => {
assert!(
output == "timeout" || output.starts_with("completed:"),
"Unexpected output: {output}"
);
}
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn durable_future_map_transforms_result() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = ActivityRegistry::builder()
.register("Double", |_ctx: ActivityContext, input: String| async move {
let n: i32 = input.parse().map_err(|e: std::num::ParseIntError| e.to_string())?;
Ok((n * 2).to_string())
})
.build();
let orch = |ctx: OrchestrationContext, _: String| async move {
let result = ctx
.schedule_activity("Double", "21")
.map(|r| {
r.map(|s| {
let n: i32 = s.parse().unwrap();
(n + 1).to_string()
})
})
.await?;
Ok(result) };
let orchestrations = OrchestrationRegistry::builder().register("MapCombo", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("mc-1", "MapCombo", "").await.unwrap();
let status = client
.wait_for_orchestration("mc-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "43"),
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn typed_map_then_join() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = math_activities();
let orch = |ctx: OrchestrationContext, _: String| async move {
let f1 = ctx
.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 1, b: 2 })
.map(|r| r.map(|res| res.sum));
let f2 = ctx
.schedule_activity_typed::<AddReq, AddRes>("Add", &AddReq { a: 10, b: 20 })
.map(|r| r.map(|res| res.sum));
let results: Vec<Result<i32, String>> = ctx.join(vec![f1, f2]).await;
let total: i32 = results.into_iter().map(|r| r.unwrap()).sum();
Ok(total.to_string())
};
let orchestrations = OrchestrationRegistry::builder().register("TypedMapJoin", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("tmj-1", "TypedMapJoin", "").await.unwrap();
let status = client
.wait_for_orchestration("tmj-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "33"),
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn map_preserves_cancellation_on_drop() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = ActivityRegistry::builder()
.register("SlowWork", |_ctx: ActivityContext, _: String| async move {
tokio::time::sleep(Duration::from_secs(10)).await;
Ok("done".to_string())
})
.build();
let orch = |ctx: OrchestrationContext, _: String| async move {
let mapped = ctx
.schedule_activity("SlowWork", "")
.map(|r| r.map(|s| format!("mapped:{s}")));
let timeout = ctx.schedule_timer(Duration::from_millis(50));
match ctx.select2(mapped, timeout).await {
Either2::First(result) => Ok(format!("work:{}", result?)),
Either2::Second(()) => Ok("cancelled".to_string()),
}
};
let orchestrations = OrchestrationRegistry::builder().register("MapCancel", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("mcan-1", "MapCancel", "").await.unwrap();
let status = client
.wait_for_orchestration("mcan-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "cancelled"),
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn typed_error_propagation_through_join() {
let (store, _td) = common::create_sqlite_store_disk().await;
let activities = math_activities();
let orch = |ctx: OrchestrationContext, _: String| async move {
let futures = vec![
ctx.schedule_activity_typed::<MulReq, MulRes>("FailDiv", &MulReq { a: 10, b: 2 }),
ctx.schedule_activity_typed::<MulReq, MulRes>("FailDiv", &MulReq { a: 10, b: 0 }), ];
let results: Vec<Result<MulRes, String>> = ctx.join(futures).await;
let first = results[0].as_ref().map(|r| r.product).unwrap();
let second_err = results[1].as_ref().unwrap_err().clone();
Ok(format!("ok={first},err={second_err}"))
};
let orchestrations = OrchestrationRegistry::builder().register("TypedErrJoin", orch).build();
let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
let client = Client::new(store);
client.start_orchestration("tej-1", "TypedErrJoin", "").await.unwrap();
let status = client
.wait_for_orchestration("tej-1", Duration::from_secs(5))
.await
.unwrap();
match status {
duroxide::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "ok=5,err=division by zero");
}
other => panic!("Expected completed, got {other:?}"),
}
rt.shutdown(None).await;
}