#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::Client;
use duroxide::Either2;
use duroxide::EventKind;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry};
mod common;
use std::time::Duration;
#[tokio::test]
async fn persistent_event_basic_delivery() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let data = ctx.dequeue_event("Signal").await;
Ok(format!("got:{data}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-basic", "TestOrch", "")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-pers-basic", "Signal", 3000).await,
"Subscription was never registered"
);
client
.enqueue_event("inst-pers-basic", "Signal", "hello")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-basic", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "got:hello"),
"Expected got:hello, got {:?}",
status
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_event_arrives_before_subscription() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
ctx.schedule_timer(Duration::from_millis(200)).await;
let data = ctx.dequeue_event("Signal").await;
Ok(format!("got:{data}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-before", "TestOrch", "")
.await
.unwrap();
client
.enqueue_event("inst-pers-before", "Signal", "early_bird")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-before", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "got:early_bird"),
"Persistent event should resolve even when raised before subscription, got {:?}",
status
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_event_fifo_ordering() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let d1 = ctx.dequeue_event("Data").await;
let d2 = ctx.dequeue_event("Data").await;
Ok(format!("{d1},{d2}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-fifo", "TestOrch", "")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-pers-fifo", "Data", 3000).await,
"First subscription was never registered"
);
client.enqueue_event("inst-pers-fifo", "Data", "first").await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
client.enqueue_event("inst-pers-fifo", "Data", "second").await.unwrap();
let status = client
.wait_for_orchestration("inst-pers-fifo", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "first,second"),
"FIFO ordering broken for persistent events, got {:?}",
status
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_event_survives_select_cancellation_same_batch() {
let (store, _td) = common::create_sqlite_store_disk().await;
let make_orch = || {
|ctx: OrchestrationContext, _: String| async move {
let wait1 = ctx.dequeue_event("Signal");
let timeout1 = ctx.schedule_timer(Duration::from_millis(50));
match ctx.select2(wait1, timeout1).await {
Either2::First(_) => return Ok("unexpected_phase1".to_string()),
Either2::Second(_) => {} }
let data = ctx.dequeue_event("Signal").await;
Ok(format!("got:{data}"))
}
};
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt = runtime::Runtime::start_with_options(
store.clone(),
ActivityRegistry::builder().build(),
OrchestrationRegistry::builder()
.register("TestOrch", make_orch())
.build(),
options.clone(),
)
.await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-cancel-same", "TestOrch", "")
.await
.unwrap();
assert!(
common::wait_for_history(
store.clone(),
"inst-pers-cancel-same",
|hist| {
hist.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscribed { name } if name == "Signal"))
.count()
>= 2
},
3000,
)
.await,
"Second QueueSubscribed was never registered"
);
rt.shutdown(None).await;
client
.enqueue_event("inst-pers-cancel-same", "Signal", "survived")
.await
.unwrap();
client
.enqueue_event("inst-pers-cancel-same", "Signal", "extra")
.await
.unwrap();
let rt2 = runtime::Runtime::start_with_options(
store.clone(),
ActivityRegistry::builder().build(),
OrchestrationRegistry::builder()
.register("TestOrch", make_orch())
.build(),
options,
)
.await;
let status = client
.wait_for_orchestration("inst-pers-cancel-same", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "got:survived"),
"Persistent event should survive cancelled subscription, got {:?}",
status
);
let history = store.read("inst-pers-cancel-same").await.unwrap();
let has_cancel = history
.iter()
.any(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { reason } if reason == "dropped_future"));
assert!(
has_cancel,
"Missing QueueSubscriptionCancelled(dropped_future) for first wait"
);
let arrival_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueEventDelivered { name, .. } if name == "Signal"))
.count();
assert_eq!(
arrival_count, 2,
"Both persistent events should be in history (same batch, dispatcher was stopped during enqueue)"
);
let active_sub_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscribed { name } if name == "Signal"))
.count();
let cancelled_sub_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { .. }))
.count();
assert_eq!(active_sub_count, 2, "Expected 2 total persistent subscriptions");
assert_eq!(cancelled_sub_count, 1, "Expected 1 cancelled persistent subscription");
rt2.shutdown(None).await;
}
#[tokio::test]
async fn persistent_event_survives_select_cancellation_late_extra_discarded() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let wait1 = ctx.dequeue_event("Signal");
let timeout1 = ctx.schedule_timer(Duration::from_millis(50));
match ctx.select2(wait1, timeout1).await {
Either2::First(_) => return Ok("unexpected_phase1".to_string()),
Either2::Second(_) => {} }
let data = ctx.dequeue_event("Signal").await;
Ok(format!("got:{data}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-cancel-late", "TestOrch", "")
.await
.unwrap();
assert!(
common::wait_for_history(
store.clone(),
"inst-pers-cancel-late",
|hist| {
hist.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscribed { name } if name == "Signal"))
.count()
>= 2
},
3000,
)
.await,
"Second QueueSubscribed was never registered"
);
client
.enqueue_event("inst-pers-cancel-late", "Signal", "survived")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-cancel-late", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "got:survived"),
"Persistent event should survive cancelled subscription, got {:?}",
status
);
client
.enqueue_event("inst-pers-cancel-late", "Signal", "extra")
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
let history = store.read("inst-pers-cancel-late").await.unwrap();
let has_cancel = history
.iter()
.any(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { reason } if reason == "dropped_future"));
assert!(
has_cancel,
"Missing QueueSubscriptionCancelled(dropped_future) for first wait"
);
let arrival_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueEventDelivered { name, .. } if name == "Signal"))
.count();
assert_eq!(
arrival_count, 1,
"Only 'survived' should be in history — 'extra' arrived after terminal and was discarded"
);
let active_sub_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscribed { name } if name == "Signal"))
.count();
let cancelled_sub_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { .. }))
.count();
assert_eq!(active_sub_count, 2, "Expected 2 total persistent subscriptions");
assert_eq!(cancelled_sub_count, 1, "Expected 1 cancelled persistent subscription");
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_and_positional_are_independent() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let positional = ctx.schedule_wait("Signal");
let persistent = ctx.dequeue_event("Signal");
let (pos_data, pers_data) = ctx.join2(positional, persistent).await;
Ok(format!("pos:{pos_data},pers:{pers_data}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-indep", "TestOrch", "")
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
client
.raise_event("inst-pers-indep", "Signal", "positional_data")
.await
.unwrap();
client
.enqueue_event("inst-pers-indep", "Signal", "persistent_data")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-indep", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "pos:positional_data,pers:persistent_data"),
"Persistent and positional should be independent, got {:?}",
status
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_cancelled_subscription_emits_breadcrumb() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let wait = ctx.dequeue_event("Signal");
let timeout = ctx.schedule_timer(Duration::from_millis(50));
match ctx.select2(wait, timeout).await {
Either2::First(_) => Ok("unexpected".to_string()),
Either2::Second(_) => Ok("timer_won".to_string()),
}
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-breadcrumb", "TestOrch", "")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-breadcrumb", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "timer_won"),
"Expected timer_won, got {:?}",
status
);
let history = store.read("inst-pers-breadcrumb").await.unwrap();
let has_cancel = history
.iter()
.any(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { reason } if reason == "dropped_future"));
assert!(
has_cancel,
"Missing QueueSubscriptionCancelled(dropped_future) breadcrumb"
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_outstanding_subscription_cancelled_on_completion() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let _unused = ctx.dequeue_event("NeverResolved");
ctx.schedule_timer(Duration::from_millis(50)).await;
Ok("done_early".to_string())
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-terminal", "TestOrch", "")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-terminal", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "done_early"),
"Expected done_early, got {:?}",
status
);
let history = store.read("inst-pers-terminal").await.unwrap();
let has_sub = history
.iter()
.any(|e| matches!(&e.kind, EventKind::QueueSubscribed { name } if name == "NeverResolved"));
assert!(has_sub, "Missing QueueSubscribed event");
let has_cancel = history.iter().any(|e| {
matches!(&e.kind, EventKind::QueueSubscriptionCancelled { reason }
if reason == "dropped_future" || reason == "orchestration_terminal")
});
assert!(
has_cancel,
"Missing QueueSubscriptionCancelled breadcrumb for outstanding subscription"
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn persistent_multiple_cancelled_subscriptions_only_active_receives() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let wait1 = ctx.dequeue_event("Signal");
let timeout1 = ctx.schedule_timer(Duration::from_millis(30));
match ctx.select2(wait1, timeout1).await {
Either2::First(_) => return Ok("unexpected1".to_string()),
Either2::Second(_) => {}
}
let wait2 = ctx.dequeue_event("Signal");
let timeout2 = ctx.schedule_timer(Duration::from_millis(30));
match ctx.select2(wait2, timeout2).await {
Either2::First(_) => return Ok("unexpected2".to_string()),
Either2::Second(_) => {}
}
let data = ctx.dequeue_event("Signal").await;
Ok(format!("got:{data}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-pers-multi-cancel", "TestOrch", "")
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(400)).await;
client
.enqueue_event("inst-pers-multi-cancel", "Signal", "winner")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-pers-multi-cancel", Duration::from_secs(5))
.await
.unwrap();
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == "got:winner"),
"Expected got:winner, got {:?}",
status
);
let history = store.read("inst-pers-multi-cancel").await.unwrap();
let cancel_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { reason } if reason == "dropped_future"))
.count();
assert_eq!(
cancel_count, 2,
"Expected 2 dropped_future cancellation breadcrumbs, got {cancel_count}"
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn complex_multi_turn_persistent_and_positional_with_activities() {
let (store, _td) = common::create_sqlite_store_disk().await;
let echo_activity = |_ctx: ActivityContext, input: String| async move { Ok(format!("echo:{input}")) };
let orch = |ctx: OrchestrationContext, _: String| async move {
let mut results = Vec::new();
let pos1 = ctx.schedule_wait("PosFirst").await;
results.push(format!("pos1:{pos1}"));
let act1 = ctx.schedule_activity("Echo", "step2").await?;
results.push(act1);
let pers1 = ctx.dequeue_event("PersA").await;
results.push(format!("pers1:{pers1}"));
let act2 = ctx.schedule_activity("Echo", "step3").await?;
results.push(act2);
let pos_doomed = ctx.schedule_wait("PosDoomed");
let timer = ctx.schedule_timer(Duration::from_millis(50));
match ctx.select2(pos_doomed, timer).await {
Either2::First(d) => results.push(format!("pos_doomed_unexpected:{d}")),
Either2::Second(_) => results.push("pos_cancelled".to_string()),
}
let pers2 = ctx.dequeue_event("PersB").await;
results.push(format!("pers2:{pers2}"));
let act3 = ctx.schedule_activity("Echo", "step4").await?;
results.push(act3);
let pers3 = ctx.dequeue_event("PersC").await;
results.push(format!("pers3:{pers3}"));
Ok(results.join(","))
};
let orchestration_registry = OrchestrationRegistry::builder().register("ComplexOrch", orch).build();
let activity_registry = ActivityRegistry::builder().register("Echo", echo_activity).build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-complex-multi", "ComplexOrch", "")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-complex-multi", "PosFirst", 3000).await,
"PosFirst subscription never registered"
);
client
.raise_event("inst-complex-multi", "PosFirst", "alpha")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-complex-multi", "PersA", 3000).await,
"PersA subscription never registered"
);
client
.enqueue_event("inst-complex-multi", "PersA", "beta")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-complex-multi", "PersB", 3000).await,
"PersB subscription never registered"
);
client
.enqueue_event("inst-complex-multi", "PersB", "gamma")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-complex-multi", "PersC", 3000).await,
"PersC subscription never registered"
);
client
.enqueue_event("inst-complex-multi", "PersC", "delta")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-complex-multi", Duration::from_secs(10))
.await
.unwrap();
let expected = "pos1:alpha,echo:step2,pers1:beta,echo:step3,pos_cancelled,pers2:gamma,echo:step4,pers3:delta";
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == expected),
"Expected:\n {expected}\nGot:\n {:?}",
status
);
let history = store.read("inst-complex-multi").await.unwrap();
let has_pos_cancel = history
.iter()
.any(|e| matches!(&e.kind, EventKind::ExternalSubscribedCancelled { reason } if reason == "dropped_future"));
assert!(
has_pos_cancel,
"Missing positional ExternalSubscribedCancelled(dropped_future) breadcrumb"
);
let pers_cancel_count = history
.iter()
.filter(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { .. }))
.count();
assert_eq!(
pers_cancel_count, 0,
"No persistent subscriptions should be cancelled, got {pers_cancel_count}"
);
rt.shutdown(None).await;
}
#[tokio::test]
async fn select_then_join_persistent_and_positional() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let mut results = Vec::new();
let pos_race = ctx.schedule_wait("RacePos");
let pers_race = ctx.dequeue_event("RacePers");
match ctx.select2(pos_race, pers_race).await {
Either2::First(d) => results.push(format!("select_pos:{d}")),
Either2::Second(d) => results.push(format!("select_pers:{d}")),
}
let pos_join = ctx.schedule_wait("JoinPos");
let pers_join = ctx.dequeue_event("JoinPers");
let (pos_data, pers_data) = ctx.join2(pos_join, pers_join).await;
results.push(format!("join_pos:{pos_data}"));
results.push(format!("join_pers:{pers_data}"));
Ok(results.join(","))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TestOrch", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-sel-join", "TestOrch", "")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-sel-join", "RacePers", 3000).await,
"RacePers subscription never registered"
);
client.enqueue_event("inst-sel-join", "RacePers", "fast").await.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-sel-join", "JoinPos", 3000).await,
"JoinPos subscription never registered"
);
client.raise_event("inst-sel-join", "JoinPos", "left").await.unwrap();
client
.enqueue_event("inst-sel-join", "JoinPers", "right")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-sel-join", Duration::from_secs(10))
.await
.unwrap();
let expected = "select_pers:fast,join_pos:left,join_pers:right";
assert!(
matches!(status, runtime::OrchestrationStatus::Completed { ref output, .. } if output == expected),
"Expected:\n {expected}\nGot:\n {:?}",
status
);
let history = store.read("inst-sel-join").await.unwrap();
let pos_cancel = history
.iter()
.any(|e| matches!(&e.kind, EventKind::ExternalSubscribedCancelled { reason } if reason == "dropped_future"));
assert!(
pos_cancel,
"Positional RacePos should have a dropped_future cancellation breadcrumb"
);
let pers_cancel = history
.iter()
.any(|e| matches!(&e.kind, EventKind::QueueSubscriptionCancelled { .. }));
assert!(!pers_cancel, "No persistent subscriptions should be cancelled");
rt.shutdown(None).await;
}
#[tokio::test]
async fn raise_event_typed_round_trip() {
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct Payload {
id: u32,
msg: String,
}
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let p: Payload = ctx.schedule_wait_typed("Signal").await;
Ok(format!("id={},msg={}", p.id, p.msg))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TypedWait", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-typed-wait", "TypedWait", "")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-typed-wait", "Signal", 3000).await,
"Subscription was never registered"
);
let payload = Payload {
id: 42,
msg: "hello".into(),
};
client
.raise_event_typed("inst-typed-wait", "Signal", &payload)
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-typed-wait", Duration::from_secs(5))
.await
.unwrap();
match status {
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "id=42,msg=hello");
}
other => panic!("Expected Completed, got: {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn enqueue_event_typed_round_trip() {
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
struct Command {
action: String,
value: i64,
}
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let cmd: Command = ctx.dequeue_event_typed("commands").await;
Ok(format!("{}:{}", cmd.action, cmd.value))
};
let orchestration_registry = OrchestrationRegistry::builder().register("TypedQueue", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-typed-queue", "TypedQueue", "")
.await
.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-typed-queue", "commands", 3000).await,
"Subscription was never registered"
);
let cmd = Command {
action: "increment".into(),
value: 99,
};
client
.enqueue_event_typed("inst-typed-queue", "commands", &cmd)
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-typed-queue", Duration::from_secs(5))
.await
.unwrap();
match status {
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "increment:99");
}
other => panic!("Expected Completed, got: {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn multi_queue_isolation_and_independent_fifo() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let order1 = ctx.dequeue_event("orders").await;
let order2 = ctx.dequeue_event("orders").await;
let pay1 = ctx.dequeue_event("payments").await;
let pay2 = ctx.dequeue_event("payments").await;
let notif1 = ctx.dequeue_event("notifications").await;
Ok(format!("{order1},{order2}|{pay1},{pay2}|{notif1}"))
};
let orchestration_registry = OrchestrationRegistry::builder().register("MultiQueue", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-multi-q", "MultiQueue", "")
.await
.unwrap();
client
.enqueue_event("inst-multi-q", "payments", "pay_first")
.await
.unwrap();
client
.enqueue_event("inst-multi-q", "orders", "order_first")
.await
.unwrap();
client
.enqueue_event("inst-multi-q", "notifications", "alert_1")
.await
.unwrap();
client
.enqueue_event("inst-multi-q", "orders", "order_second")
.await
.unwrap();
client
.enqueue_event("inst-multi-q", "payments", "pay_second")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-multi-q", Duration::from_secs(5))
.await
.unwrap();
match status {
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "order_first,order_second|pay_first,pay_second|alert_1");
}
other => panic!("Expected Completed, got: {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn multi_queue_staggered_delivery() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let cmd1 = ctx.dequeue_event("commands").await;
let _ = ctx.schedule_activity("Noop", "x").await?;
let stat1 = ctx.dequeue_event("status").await;
let cmd2 = ctx.dequeue_event("commands").await;
let stat2 = ctx.dequeue_event("status").await;
Ok(format!("cmd:{cmd1},{cmd2}|stat:{stat1},{stat2}"))
};
let noop_activity = |_ctx: ActivityContext, _input: String| async move { Ok("ok".to_string()) };
let orchestration_registry = OrchestrationRegistry::builder().register("StaggeredQ", orch).build();
let activity_registry = ActivityRegistry::builder().register("Noop", noop_activity).build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.start_orchestration("inst-staggered-q", "StaggeredQ", "")
.await
.unwrap();
client
.enqueue_event("inst-staggered-q", "commands", "c1")
.await
.unwrap();
client
.enqueue_event("inst-staggered-q", "commands", "c2")
.await
.unwrap();
client.enqueue_event("inst-staggered-q", "status", "s1").await.unwrap();
assert!(
common::wait_for_subscription(store.clone(), "inst-staggered-q", "status", 3000).await,
"Status subscription was never registered"
);
tokio::time::sleep(Duration::from_millis(300)).await;
client.enqueue_event("inst-staggered-q", "status", "s2").await.unwrap();
let status = client
.wait_for_orchestration("inst-staggered-q", Duration::from_secs(5))
.await
.unwrap();
match status {
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "cmd:c1,c2|stat:s1,s2");
}
other => panic!("Expected Completed, got: {other:?}"),
}
rt.shutdown(None).await;
}
#[tokio::test]
async fn events_enqueued_before_start_orchestration_are_dropped() {
let (store, _td) = common::create_sqlite_store_disk().await;
let orch = |ctx: OrchestrationContext, _: String| async move {
let event = ctx.dequeue_event("pre-start-q");
let timeout = ctx.schedule_timer(Duration::from_millis(500));
match ctx.select2(event, timeout).await {
Either2::First(data) => Ok(format!("unexpected:{data}")),
Either2::Second(()) => {
Ok("timeout_as_expected".to_string())
}
}
};
let orchestration_registry = OrchestrationRegistry::builder().register("OrphanTest", orch).build();
let activity_registry = ActivityRegistry::builder().build();
let options = runtime::RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(10),
..Default::default()
};
let rt =
runtime::Runtime::start_with_options(store.clone(), activity_registry, orchestration_registry, options).await;
let client = Client::new(store.clone());
client
.enqueue_event("inst-orphan", "pre-start-q", "should-be-dropped")
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
client
.start_orchestration("inst-orphan", "OrphanTest", "")
.await
.unwrap();
let status = client
.wait_for_orchestration("inst-orphan", Duration::from_secs(5))
.await
.unwrap();
match status {
runtime::OrchestrationStatus::Completed { output, .. } => {
assert_eq!(
output, "timeout_as_expected",
"Pre-start event should have been dropped; dequeue should have timed out"
);
}
other => panic!("Expected Completed, got: {other:?}"),
}
rt.shutdown(None).await;
}