#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::EventKind;
mod common;
/// Serializes an `ActivityScheduled` event carrying `tag: Some("gpu")` to JSON,
/// parses it back, and checks that the decoded event still holds the same tag.
#[test]
fn activity_scheduled_with_tag_roundtrips() {
    let original = EventKind::ActivityScheduled {
        name: String::from("Build"),
        input: String::from("{}"),
        session_id: None,
        tag: Some(String::from("gpu")),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: EventKind = serde_json::from_str(&encoded).unwrap();

    // Any other variant coming back means the round trip itself is broken.
    if let EventKind::ActivityScheduled { tag, .. } = decoded {
        assert_eq!(tag, Some("gpu".to_string()));
    } else {
        panic!("Expected ActivityScheduled");
    }
}
/// An `ActivityScheduled` event with `tag: None` must serialize to JSON that
/// does not contain a `"tag"` key.
#[test]
fn activity_scheduled_none_tag_omitted_in_json() {
    let untagged = EventKind::ActivityScheduled {
        name: String::from("Build"),
        input: String::from("{}"),
        session_id: None,
        tag: None,
    };

    let json = serde_json::to_string(&untagged).unwrap();

    // Plain substring check: none of the field values above contain `"tag"`.
    let mentions_tag = json.contains("\"tag\"");
    assert!(
        !mentions_tag,
        "tag: None should be omitted from JSON, got: {}",
        json
    );
}
/// JSON for an `ActivityScheduled` event that has no `tag` key must still
/// deserialize, with the tag coming back as `None`.
#[test]
fn activity_scheduled_missing_tag_deserializes_as_none() {
    // Payload deliberately carries only `type`, `name` and `input`.
    let tagless_json = r#"{"type":"ActivityScheduled","name":"Build","input":"{}"}"#;
    let parsed: EventKind = serde_json::from_str(tagless_json).unwrap();

    if let EventKind::ActivityScheduled { tag, .. } = parsed {
        assert_eq!(tag, None);
    } else {
        panic!("Expected ActivityScheduled");
    }
}
/// A `WorkItem::ActivityExecute` carrying `tag: Some("gpu")` must compare equal
/// to itself after a JSON serialize/deserialize round trip.
#[test]
fn work_item_activity_execute_with_tag_roundtrips() {
    use duroxide::providers::WorkItem;

    let original = WorkItem::ActivityExecute {
        instance: String::from("i"),
        execution_id: 1,
        id: 1,
        name: String::from("Build"),
        input: String::from("{}"),
        session_id: None,
        tag: Some(String::from("gpu")),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: WorkItem = serde_json::from_str(&encoded).unwrap();

    // Whole-value equality covers every field, not just the tag.
    assert_eq!(decoded, original);
}
/// A `WorkItem::ActivityExecute` with `tag: None` must serialize to JSON that
/// does not contain a `"tag"` key.
#[test]
fn work_item_activity_execute_none_tag_omitted_in_json() {
    use duroxide::providers::WorkItem;

    let untagged = WorkItem::ActivityExecute {
        instance: String::from("i"),
        execution_id: 1,
        id: 1,
        name: String::from("Build"),
        input: String::from("{}"),
        session_id: None,
        tag: None,
    };

    let json = serde_json::to_string(&untagged).unwrap();

    // Plain substring check: none of the field values above contain `"tag"`.
    let mentions_tag = json.contains("\"tag\"");
    assert!(
        !mentions_tag,
        "tag: None should be omitted from JSON, got: {}",
        json
    );
}
/// JSON for a `WorkItem::ActivityExecute` that has no `tag` key must still
/// deserialize, with the tag coming back as `None`.
#[test]
fn work_item_activity_execute_missing_tag_deserializes_as_none() {
    use duroxide::providers::WorkItem;

    // Payload carries neither `tag` nor `session_id`.
    let tagless_json = r#"{"ActivityExecute":{"instance":"i","execution_id":1,"id":1,"name":"Build","input":"{}"}}"#;
    let parsed: WorkItem = serde_json::from_str(tagless_json).unwrap();

    if let WorkItem::ActivityExecute { tag, .. } = parsed {
        assert_eq!(tag, None);
    } else {
        panic!("Expected ActivityExecute");
    }
}
/// End-to-end: on a runtime whose worker filter is `TagFilter::tags(["gpu"])`,
/// a `"gpu"`-tagged activity is expected to complete, while an untagged
/// activity raced against a 500 ms timer is expected to lose that race.
#[tokio::test]
async fn e2e_tagged_activity_runs_on_matching_worker() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, Either2, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("Compute", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("computed:{input}"))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("TagTest", |ctx: OrchestrationContext, _input: String| async move {
            // The tagged activity is awaited to completion first.
            let gpu_result = ctx.schedule_activity("Compute", "gpu-work").with_tag("gpu").await?;

            // The untagged activity only gets 500 ms before the timer wins.
            let untagged = ctx.schedule_activity("Compute", "default-work");
            let timeout = ctx.schedule_timer(Duration::from_millis(500));
            let untagged_status = match ctx.select2(untagged, timeout).await {
                Either2::Second(()) => "untagged_timed_out",
                Either2::First(Err(_)) => "untagged_error",
                Either2::First(Ok(_)) => "untagged_completed",
            };

            Ok(format!("{gpu_result}|{untagged_status}"))
        })
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::tags(["gpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client.start_orchestration("tag-e2e-1", "TagTest", "").await.unwrap();

    let status = client
        .wait_for_orchestration("tag-e2e-1", Duration::from_secs(10))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(
                output, "computed:gpu-work|untagged_timed_out",
                "GPU activity should complete, untagged should time out"
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: after an orchestration schedules an activity with
/// `.with_tag("gpu")` and completes, the history read back from the store must
/// contain an `ActivityScheduled` event whose tag is `"gpu"`.
#[tokio::test]
async fn e2e_with_tag_persists_tag_in_event_history() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("GpuTask", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("gpu:{input}"))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("GpuOrch", |ctx: OrchestrationContext, _input: String| async move {
            let result = ctx.schedule_activity("GpuTask", "frame1").with_tag("gpu").await?;
            Ok(result)
        })
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::default_and(["gpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client.start_orchestration("tag-e2e-2", "GpuOrch", "").await.unwrap();

    let status = client
        .wait_for_orchestration("tag-e2e-2", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "gpu:frame1");

            // Inspect what was actually persisted, not just the final output.
            let history = store.read("tag-e2e-2").await.unwrap();
            let has_gpu_schedule = history
                .iter()
                .any(|e| matches!(&e.kind, EventKind::ActivityScheduled { tag: Some(t), .. } if t == "gpu"));
            assert!(
                has_gpu_schedule,
                "Expected ActivityScheduled with tag='gpu' in history: {:?}",
                history.iter().map(|e| &e.kind).collect::<Vec<_>>()
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: races a `"gpu"`-tagged activity against a 500 ms timer on a
/// runtime configured with `TagFilter::default()`. The test expects the timer
/// branch to win, so the orchestration completes with
/// `"timeout:no_gpu_worker"`.
#[tokio::test]
async fn e2e_tagged_activity_starvation_times_out() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, Either2, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("GpuRender", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("rendered:{input}"))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("StarvationTest", |ctx: OrchestrationContext, _input: String| async move {
            let activity = ctx.schedule_activity("GpuRender", "frame1").with_tag("gpu");
            let timeout = ctx.schedule_timer(Duration::from_millis(500));
            match ctx.select2(activity, timeout).await {
                Either2::Second(()) => Ok("timeout:no_gpu_worker".to_string()),
                Either2::First(Err(e)) => Err(format!("activity_error:{e}")),
                Either2::First(Ok(result)) => Ok(format!("completed:{result}")),
            }
        })
        .build();

    let rt = runtime::Runtime::start_with_options(
        store.clone(),
        activities,
        orchestrations,
        RuntimeOptions {
            worker_tag_filter: TagFilter::default(),
            ..Default::default()
        },
    )
    .await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("starvation-1", "StarvationTest", "")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("starvation-1", Duration::from_secs(10))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(
                output, "timeout:no_gpu_worker",
                "Timer should win because no GPU worker exists"
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: scheduling an activity with a tag one byte longer than
/// `limits::MAX_TAG_NAME_BYTES` must leave the orchestration in the `Failed`
/// state with a message mentioning both "tag size" and "exceeds limit".
#[tokio::test]
async fn e2e_oversized_tag_fails_orchestration() {
    use duroxide::runtime::{self, RuntimeOptions, limits, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("Work", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("done:{input}"))
        })
        .build();

    // One byte past the limit ("x" is a single byte).
    let oversized_tag = "x".repeat(limits::MAX_TAG_NAME_BYTES + 1);

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "OversizedTagTest",
            move |ctx: OrchestrationContext, _input: String| {
                // Clone per invocation so the closure itself keeps owning the tag.
                let tag = oversized_tag.clone();
                async move {
                    let result = ctx.schedule_activity("Work", "data").with_tag(&tag).await?;
                    Ok(result)
                }
            },
        )
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::Any,
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("oversized-1", "OversizedTagTest", "")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("oversized-1", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            panic!("Expected failure but got completion: {output}")
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            let msg = details.display_message();
            let names_tag_size = msg.contains("tag size");
            let names_limit = msg.contains("exceeds limit");
            assert!(
                names_tag_size && names_limit,
                "Expected tag size limit error, got: {msg}"
            );
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: a tag of exactly `limits::MAX_TAG_NAME_BYTES` bytes must be
/// accepted, and the orchestration must complete with the activity's output.
#[tokio::test]
async fn e2e_tag_at_boundary_succeeds() {
    use duroxide::runtime::{self, RuntimeOptions, limits, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("Work", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("done:{input}"))
        })
        .build();

    // Exactly at the limit ("x" is a single byte).
    let boundary_tag = "x".repeat(limits::MAX_TAG_NAME_BYTES);

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "BoundaryTagTest",
            move |ctx: OrchestrationContext, _input: String| {
                // Clone per invocation so the closure itself keeps owning the tag.
                let tag = boundary_tag.clone();
                async move {
                    let result = ctx.schedule_activity("Work", "data").with_tag(&tag).await?;
                    Ok(result)
                }
            },
        )
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::Any,
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("boundary-1", "BoundaryTagTest", "")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("boundary-1", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!(
                "Expected success but orchestration failed: {}",
                details.display_message()
            )
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "done:data");
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: an activity handler scheduled with `.with_tag("gpu")` must see
/// `"gpu"` from `ActivityContext::tag()`. The handler echoes the tag (or
/// `"none"`) back as its result so the orchestration output exposes it.
#[tokio::test]
async fn e2e_activity_context_tag_visible_in_handler() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("EchoTag", |ctx: ActivityContext, _input: String| async move {
            Ok(ctx.tag().unwrap_or("none").to_string())
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("EchoTagOrch", |ctx: OrchestrationContext, _input: String| async move {
            let tag_value = ctx.schedule_activity("EchoTag", "").with_tag("gpu").await?;
            Ok(tag_value)
        })
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::default_and(["gpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("echo-tag-1", "EchoTagOrch", "")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("echo-tag-1", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "gpu", "Activity handler should see tag='gpu'");
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: an activity scheduled without `.with_tag(..)` must see no tag
/// from `ActivityContext::tag()`. The handler falls back to the literal
/// `"none"`, which is what the orchestration output is expected to be.
#[tokio::test]
async fn e2e_activity_context_tag_none_for_untagged() {
    use duroxide::runtime::{self, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("EchoTag", |ctx: ActivityContext, _input: String| async move {
            Ok(ctx.tag().unwrap_or("none").to_string())
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "EchoTagOrchDefault",
            |ctx: OrchestrationContext, _input: String| async move {
                let tag_value = ctx.schedule_activity("EchoTag", "").await?;
                Ok(tag_value)
            },
        )
        .build();

    // No explicit options: the runtime is started through `start_with_store`.
    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("echo-tag-2", "EchoTagOrchDefault", "")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("echo-tag-2", Duration::from_secs(5))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "none", "Untagged activity should have tag=None");
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: the orchestration is started with input `"0"`, calls
/// `continue_as_new("1")`, and on the next run schedules a `"gpu"`-tagged
/// activity. The final output must be that activity's result, `"gpu:iter1"`.
#[tokio::test]
async fn e2e_tag_routing_survives_continue_as_new() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("GpuWork", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("gpu:{input}"))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("CanTagOrch", |ctx: OrchestrationContext, input: String| async move {
            // Unparseable input is treated like the first iteration.
            let count: u32 = input.parse().unwrap_or(0);
            if count < 1 {
                // First pass: restart with "1" instead of doing any work.
                ctx.continue_as_new("1").await
            } else {
                let result = ctx
                    .schedule_activity("GpuWork", format!("iter{count}"))
                    .with_tag("gpu")
                    .await?;
                Ok(result)
            }
        })
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::default_and(["gpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("can-tag-1", "CanTagOrch", "0")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("can-tag-1", Duration::from_secs(10))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "gpu:iter1");
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// End-to-end: two runtimes share one store. Runtime A uses
/// `TagFilter::default()`; runtime B uses `TagFilter::tags(["gpu"])` and has
/// `orchestration_concurrency: 0`. The orchestration schedules one untagged
/// and one `"gpu"`-tagged activity.
///
/// Each runtime registers its own `"Work"` handler that prefixes the result
/// with a worker label, so the final output shows which runtime executed each
/// activity. With identical handlers on both runtimes the output would be the
/// same regardless of routing and the "no overlap" property would go untested.
#[tokio::test]
async fn e2e_two_workers_different_tags_no_overlap() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Builds a registry whose "Work" handler stamps `worker` into its result.
    let make_activities = |worker: &'static str| {
        ActivityRegistry::builder()
            .register("Work", move |_ctx: ActivityContext, input: String| async move {
                Ok(format!("{worker}:{input}"))
            })
            .build()
    };
    let make_orchestrations = || {
        OrchestrationRegistry::builder()
            .register("TwoPoolOrch", |ctx: OrchestrationContext, _input: String| async move {
                let cpu = ctx.schedule_activity("Work", "cpu").await?;
                let gpu = ctx.schedule_activity("Work", "gpu").with_tag("gpu").await?;
                Ok(format!("{cpu}|{gpu}"))
            })
            .build()
    };

    // Runtime A: default tag filter.
    let rt_a = runtime::Runtime::start_with_options(
        store.clone(),
        make_activities("default-worker"),
        make_orchestrations(),
        RuntimeOptions {
            worker_tag_filter: TagFilter::default(),
            ..Default::default()
        },
    )
    .await;

    // Runtime B: "gpu" tag filter, orchestration concurrency set to zero.
    let rt_b = runtime::Runtime::start_with_options(
        store.clone(),
        make_activities("gpu-worker"),
        make_orchestrations(),
        RuntimeOptions {
            orchestration_concurrency: 0,
            worker_tag_filter: TagFilter::tags(["gpu"]),
            ..Default::default()
        },
    )
    .await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("two-pool-1", "TwoPoolOrch", "")
        .await
        .unwrap();

    match client
        .wait_for_orchestration("two-pool-1", Duration::from_secs(10))
        .await
        .unwrap()
    {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(
                output, "default-worker:cpu|gpu-worker:gpu",
                "untagged activity must run on the default worker and the tagged one on the gpu worker"
            );
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt_b.shutdown(None).await;
    rt_a.shutdown(None).await;
}
/// End-to-end: races a `"gpu"`-tagged activity whose handler sleeps for 60 s
/// against a 200 ms timer. The test asserts that the timer branch wins and the
/// orchestration completes with `"timeout_won"`.
///
/// NOTE(review): only the orchestration output is asserted; whether the losing
/// activity is actually cancelled is not observed here.
#[tokio::test]
async fn e2e_dropped_tagged_future_triggers_cancellation() {
    use duroxide::runtime::{self, RuntimeOptions, registry::ActivityRegistry};
    use duroxide::{ActivityContext, Client, Either2, OrchestrationContext, OrchestrationRegistry, TagFilter};
    use std::time::Duration;

    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("SlowGpu", |_ctx: ActivityContext, _input: String| async move {
            // Far longer than the 200 ms timer it is raced against.
            tokio::time::sleep(Duration::from_secs(60)).await;
            Ok("should_not_reach".to_string())
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("CancelTagOrch", |ctx: OrchestrationContext, _input: String| async move {
            let gpu = ctx.schedule_activity("SlowGpu", "data").with_tag("gpu");
            let timeout = ctx.schedule_timer(Duration::from_millis(200));
            match ctx.select2(gpu, timeout).await {
                Either2::Second(()) => Ok("timeout_won".to_string()),
                Either2::First(Err(e)) => Err(e),
                Either2::First(Ok(r)) => Ok(format!("gpu_won:{r}")),
            }
        })
        .build();

    let opts = RuntimeOptions {
        worker_tag_filter: TagFilter::default_and(["gpu"]),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, opts).await;

    let client = Client::new(store.clone());
    client
        .start_orchestration("cancel-tag-1", "CancelTagOrch", "")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("cancel-tag-1", Duration::from_secs(10))
        .await
        .unwrap();
    match status {
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("orchestration failed: {}", details.display_message())
        }
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "timeout_won", "Timer should win, tagged activity cancelled");
        }
        other => panic!("unexpected status: {:?}", other),
    }

    rt.shutdown(None).await;
}
/// Round-trips each key-value `EventKind` variant (`KeyValueSet`,
/// `KeyValueCleared`, `KeyValuesCleared`) through JSON and checks that the
/// decoded value is the same variant with the same field values, including
/// `last_updated_at_ms`.
#[test]
fn kv_event_kinds_roundtrip() {
    use duroxide::EventKind;

    let cases: Vec<EventKind> = vec![
        EventKind::KeyValueSet {
            key: "my_key".to_string(),
            value: "my_value".to_string(),
            // Distinctive non-zero value: if the timestamp were lost during
            // the round trip it could not plausibly be reconstructed.
            last_updated_at_ms: 1_234,
        },
        EventKind::KeyValueCleared {
            key: "clear_me".to_string(),
        },
        EventKind::KeyValuesCleared,
    ];

    for kind in &cases {
        let json = serde_json::to_string(kind).unwrap();
        let deser: EventKind = serde_json::from_str(&json).unwrap();

        match (kind, &deser) {
            (
                EventKind::KeyValueSet {
                    key: k1,
                    value: v1,
                    last_updated_at_ms: t1,
                },
                EventKind::KeyValueSet {
                    key: k2,
                    value: v2,
                    last_updated_at_ms: t2,
                },
            ) => {
                assert_eq!(k1, k2);
                assert_eq!(v1, v2);
                assert_eq!(t1, t2, "last_updated_at_ms must survive the roundtrip, json: {json}");
            }
            (EventKind::KeyValueCleared { key: k1 }, EventKind::KeyValueCleared { key: k2 }) => {
                assert_eq!(k1, k2);
            }
            (EventKind::KeyValuesCleared, EventKind::KeyValuesCleared) => {}
            _ => panic!("roundtrip mismatch: serialized {kind:?} but got {deser:?}"),
        }
    }
}
/// Deserializes hand-written JSON for the three key-value event kinds and
/// checks both the resulting variant and the `key` / `value` payloads.
///
/// The `KeyValueSet` payload carries no `last_updated_at_ms`, so this also
/// verifies that the field is optional on input. What value it takes when
/// absent is not asserted here.
#[test]
fn kv_event_kinds_backward_compat() {
    use duroxide::EventKind;

    let json = r#"{"type":"KeyValueSet","key":"k","value":"v"}"#;
    let kind: EventKind = serde_json::from_str(json).unwrap();
    match kind {
        EventKind::KeyValueSet { key, value, .. } => {
            assert_eq!(key, "k");
            assert_eq!(value, "v");
        }
        other => panic!("expected KeyValueSet, got {other:?}"),
    }

    let json = r#"{"type":"KeyValueCleared","key":"k"}"#;
    let kind: EventKind = serde_json::from_str(json).unwrap();
    match kind {
        EventKind::KeyValueCleared { key } => assert_eq!(key, "k"),
        other => panic!("expected KeyValueCleared, got {other:?}"),
    }

    let json = r#"{"type":"KeyValuesCleared"}"#;
    let kind: EventKind = serde_json::from_str(json).unwrap();
    assert!(matches!(kind, EventKind::KeyValuesCleared));
}