#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::Either2;
use duroxide::providers::Provider;
use duroxide::providers::sqlite::SqliteProvider;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self};
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry};
use std::sync::Arc as StdArc;
use std::time::Duration;
use tempfile::TempDir;
mod common;
async fn create_sqlite_store() -> (StdArc<dyn Provider>, TempDir) {
let td = tempfile::tempdir().unwrap();
let db_path = td.path().join("test.db");
std::fs::File::create(&db_path).unwrap();
let db_url = format!("sqlite:{}", db_path.display());
let store = StdArc::new(SqliteProvider::new(&db_url, None).await.unwrap()) as StdArc<dyn Provider>;
(store, td)
}
/// Creates a SQLite-backed provider stored as `<name>.db` inside a fresh temporary directory.
///
/// Returns the provider (as a trait object), the `TempDir` guard that owns the
/// directory, and the `sqlite:` URL that was used to open the database.
///
/// The `TempDir` must outlive every use of the store; dropping it removes the
/// directory containing the database file.
///
/// # Panics
///
/// Panics if the temporary directory or database file cannot be created, or if
/// the provider fails to open the database.
async fn create_sqlite_store_named(name: &str) -> (StdArc<dyn Provider>, TempDir, String) {
    let temp_dir = tempfile::tempdir().unwrap();
    let file_path = temp_dir.path().join(format!("{name}.db"));

    // The database file is created up front, before the provider opens it.
    // NOTE(review): presumably the provider does not create a missing file itself — confirm.
    std::fs::File::create(&file_path).unwrap();

    let url = format!("sqlite:{}", file_path.display());
    let provider = SqliteProvider::new(&url, None).await.unwrap();
    let store: StdArc<dyn Provider> = StdArc::new(provider);

    (store, temp_dir, url)
}
/// A single 50 ms timer lets the orchestration complete with output `done`,
/// and completion is not observed before the timer duration has elapsed.
#[tokio::test]
async fn single_timer_fires() {
    use duroxide::runtime::OrchestrationStatus;

    const TIMER_MS: u64 = 50;

    let (store, _td) = create_sqlite_store().await;

    let orch = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_timer(Duration::from_millis(TIMER_MS)).await;
        Ok("done".to_string())
    };
    let reg = OrchestrationRegistry::builder().register("OneTimer", orch).build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = duroxide::Client::new(store.clone());

    // The clock starts before the orchestration is submitted, so the measured
    // interval fully contains the timer's lifetime.
    let start = std::time::Instant::now();
    client.start_orchestration("inst-one", "OneTimer", "").await.unwrap();
    let status = client
        .wait_for_orchestration("inst-one", Duration::from_secs(5))
        .await
        .unwrap();

    // Compare `Duration`s directly instead of casting the u128 millisecond
    // count down to u64.
    let elapsed = start.elapsed();
    assert!(
        elapsed >= Duration::from_millis(TIMER_MS),
        "Timer fired too early: expected >={TIMER_MS}ms, got {}ms",
        elapsed.as_millis()
    );

    // A single match both checks the variant and reports the actual status on failure.
    match status {
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "done"),
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt);
}
/// An orchestration that runs three timers back to back (100 ms, 50 ms, 75 ms)
/// runs to completion.
///
/// Each timer is awaited before the next one is scheduled, so the timers are
/// never pending at the same time.
///
/// NOTE(review): despite the test name, no relative firing order between
/// concurrently pending timers is asserted here.
#[tokio::test]
async fn multiple_timers_fire_in_order() {
    use duroxide::runtime::OrchestrationStatus;

    let (store, _td) = create_sqlite_store().await;

    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let t1 = ctx.schedule_timer(Duration::from_millis(100)).await;
        let t2 = ctx.schedule_timer(Duration::from_millis(50)).await;
        let t3 = ctx.schedule_timer(Duration::from_millis(75)).await;
        let results = vec![t1, t2, t3];
        Ok(format!("timers: {results:?}"))
    };
    let reg = OrchestrationRegistry::builder().register("MultiTimer", orch).build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("inst-multi", "MultiTimer", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-multi", Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Completed { output, .. } => {
            // The orchestration formats its result with this fixed prefix.
            assert!(
                output.starts_with("timers: "),
                "unexpected orchestration output: {output}"
            );
        }
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt);
}
/// A timer and an activity scheduled by the same orchestration both complete.
///
/// The orchestration creates the 50 ms timer future and the activity future
/// before awaiting either, then awaits the timer followed by the activity.
#[tokio::test]
async fn timer_with_activity() {
    use duroxide::runtime::OrchestrationStatus;

    let (store, _td) = create_sqlite_store().await;

    let orch = |ctx: OrchestrationContext, _input: String| async move {
        // Both futures are created first; the await order below is timer, then activity.
        let timer_future = ctx.schedule_timer(Duration::from_millis(50));
        let activity_future = ctx.schedule_activity("TestActivity", "input");
        let timer_result = timer_future.await;
        let activity_result = activity_future.await.unwrap();
        Ok(format!("timer: {timer_result:?}, activity: {activity_result}"))
    };
    let activity_registry = ActivityRegistry::builder()
        .register("TestActivity", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("processed: {input}"))
        })
        .build();
    let reg = OrchestrationRegistry::builder().register("TimerActivity", orch).build();
    let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, reg).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("inst-timer-activity", "TimerActivity", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-timer-activity", Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Completed { output, .. } => {
            assert!(output.contains("timer:"), "missing timer result in: {output}");
            assert!(
                output.contains("activity: processed: input"),
                "missing activity result in: {output}"
            );
        }
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt);
}
/// An orchestration waiting on a timer still completes when the runtime is
/// dropped before the timer is due and a new runtime is started on the same store.
///
/// The first runtime is dropped roughly 100 ms after the orchestration starts,
/// while the 500 ms timer is still pending. A second runtime with identical
/// registrations must then drive the instance through the timer and the
/// follow-up `PostTimer` activity.
#[tokio::test]
async fn timer_recovery_after_crash_before_fire() {
    use duroxide::runtime::OrchestrationStatus;

    const TIMER_MS: u64 = 500;
    const INSTANCE: &str = "timer-recovery-instance";

    let (store, _td, _db_url) = create_sqlite_store_named("timer_recovery").await;

    // Both runtime incarnations must register exactly the same code, so the
    // registries are produced by one factory instead of two hand-copied definitions.
    let build_registries = || {
        let orch = |ctx: OrchestrationContext, _input: String| async move {
            ctx.schedule_timer(Duration::from_millis(TIMER_MS)).await;
            let result = ctx.schedule_activity("PostTimer", "done").await?;
            Ok(result)
        };
        let activities = ActivityRegistry::builder()
            .register("PostTimer", |_ctx: ActivityContext, input: String| async move {
                Ok(format!("Timer fired, then: {input}"))
            })
            .build();
        let orchestrations = OrchestrationRegistry::builder()
            .register("TimerRecoveryTest", orch)
            .build();
        (activities, orchestrations)
    };

    // First incarnation: start the instance, then drop the runtime well before
    // the timer is due to simulate a crash.
    let (activities, orchestrations) = build_registries();
    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration(INSTANCE, "TimerRecoveryTest", "")
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    drop(rt);

    // Second incarnation on the same store must finish the instance.
    let (activities, orchestrations) = build_registries();
    let rt2 = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let status = client
        .wait_for_orchestration(INSTANCE, Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Timer fired, then: done");
        }
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt2);
}
/// An orchestration still completes when the runtime is dropped shortly after
/// its timer was due and a new runtime is started on the same store.
///
/// The first runtime is kept alive for `TIMER_MS + 50` ms — longer than the
/// 100 ms timer — before being dropped. A second runtime with identical
/// registrations must then bring the instance to completion, including the
/// follow-up `PostTimer` activity.
#[tokio::test]
async fn timer_recovery_after_crash_after_fire() {
    use duroxide::runtime::OrchestrationStatus;

    const TIMER_MS: u64 = 100;
    const INSTANCE: &str = "timer-recovery-after-instance";

    let (store, _td, _db_url) = create_sqlite_store_named("timer_recovery_after").await;

    // Both runtime incarnations must register exactly the same code, so the
    // registries are produced by one factory instead of two hand-copied definitions.
    let build_registries = || {
        let orch = |ctx: OrchestrationContext, _input: String| async move {
            ctx.schedule_timer(Duration::from_millis(TIMER_MS)).await;
            let result = ctx.schedule_activity("PostTimer", "done").await?;
            Ok(result)
        };
        let activities = ActivityRegistry::builder()
            .register("PostTimer", |_ctx: ActivityContext, input: String| async move {
                Ok(format!("Timer fired, then: {input}"))
            })
            .build();
        let orchestrations = OrchestrationRegistry::builder()
            .register("TimerRecoveryAfterTest", orch)
            .build();
        (activities, orchestrations)
    };

    // First incarnation: run past the timer's due time, then drop the runtime
    // to simulate a crash.
    let (activities, orchestrations) = build_registries();
    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration(INSTANCE, "TimerRecoveryAfterTest", "")
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(TIMER_MS + 50)).await;
    drop(rt);

    // Second incarnation on the same store must finish the instance.
    let (activities, orchestrations) = build_registries();
    let rt2 = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let status = client
        .wait_for_orchestration(INSTANCE, Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "Timer fired, then: done");
        }
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt2);
}
/// A timer scheduled with `Duration::ZERO` still resolves, letting the
/// orchestration complete with output `zero-timer-fired`.
#[tokio::test]
async fn zero_duration_timer() {
    use duroxide::runtime::OrchestrationStatus;

    let (store, _td) = create_sqlite_store().await;

    let orch = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_timer(Duration::ZERO).await;
        Ok("zero-timer-fired".to_string())
    };
    let reg = OrchestrationRegistry::builder().register("ZeroTimer", orch).build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = duroxide::Client::new(store.clone());

    client.start_orchestration("inst-zero", "ZeroTimer", "").await.unwrap();
    let status = client
        .wait_for_orchestration("inst-zero", Duration::from_secs(5))
        .await
        .unwrap();

    // A single match both checks the variant and reports the actual status on failure.
    match status {
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "zero-timer-fired"),
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt);
}
/// An orchestration with a single 100 ms timer completes with output
/// `timer-completed`.
///
/// NOTE(review): despite its name, this test never cancels the timer or the
/// instance; it only verifies the normal completion path. Either rename it or
/// extend it to issue an actual cancellation.
#[tokio::test]
async fn timer_cancellation() {
    use duroxide::runtime::OrchestrationStatus;

    let (store, _td) = create_sqlite_store().await;

    let orch = |ctx: OrchestrationContext, _input: String| async move {
        ctx.schedule_timer(Duration::from_millis(100)).await;
        Ok("timer-completed".to_string())
    };
    let reg = OrchestrationRegistry::builder().register("TimerCancel", orch).build();
    let acts = ActivityRegistry::builder().build();
    let rt = runtime::Runtime::start_with_store(store.clone(), acts, reg).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("inst-cancel", "TimerCancel", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("inst-cancel", Duration::from_secs(5))
        .await
        .unwrap();

    // A single match both checks the variant and reports the actual status on failure.
    match status {
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "timer-completed"),
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt);
}
/// An orchestration with three concurrently scheduled timers still completes
/// when the runtime is dropped before the first timer is due and a new runtime
/// is started on the same store.
///
/// The timers are scheduled for 100 ms, 150 ms and 200 ms and awaited in that
/// order; the first runtime is dropped after about 50 ms.
#[tokio::test]
async fn multiple_timers_recovery_after_crash() {
    use duroxide::runtime::OrchestrationStatus;

    const TIMER_MS: u64 = 100;
    const INSTANCE: &str = "multiple-timers-recovery-instance";

    let (store, _td, _db_url) = create_sqlite_store_named("multiple_timers_recovery").await;

    // Both runtime incarnations must register exactly the same orchestration,
    // so it is produced by one factory instead of two hand-copied definitions.
    let build_orchestrations = || {
        let orch = |ctx: OrchestrationContext, _input: String| async move {
            // All three timers are scheduled before any of them is awaited.
            let timer1 = ctx.schedule_timer(Duration::from_millis(TIMER_MS));
            let timer2 = ctx.schedule_timer(Duration::from_millis(TIMER_MS + 50));
            let timer3 = ctx.schedule_timer(Duration::from_millis(TIMER_MS + 100));
            timer1.await;
            timer2.await;
            timer3.await;
            Ok("all-timers-fired".to_string())
        };
        OrchestrationRegistry::builder()
            .register("MultipleTimersRecoveryTest", orch)
            .build()
    };

    // First incarnation: start the instance, then drop the runtime before the
    // earliest timer is due to simulate a crash.
    let rt = runtime::Runtime::start_with_store(
        store.clone(),
        ActivityRegistry::builder().build(),
        build_orchestrations(),
    )
    .await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration(INSTANCE, "MultipleTimersRecoveryTest", "")
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
    drop(rt);

    // Second incarnation on the same store must finish the instance.
    let rt2 = runtime::Runtime::start_with_store(
        store.clone(),
        ActivityRegistry::builder().build(),
        build_orchestrations(),
    )
    .await;
    let status = client
        .wait_for_orchestration(INSTANCE, Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        OrchestrationStatus::Completed { output, .. } => assert_eq!(output, "all-timers-fired"),
        other => panic!("expected Completed status, got {other:?}"),
    }

    drop(rt2);
}
/// Regression test: a timer scheduled late in an orchestration must not fire early.
///
/// The orchestration first waits on a 100 ms timer and a 2000 ms activity, then
/// races a 1 s timer against a 100 ms activity. The activity is expected to win;
/// a `timer_won` result means the 1 s timer fired too soon.
///
/// NOTE(review): presumably the earlier timer and slow activity exist so that a
/// fire time computed from a stale timestamp would already be in the past —
/// confirm against the runtime's fire-time calculation.
#[tokio::test]
async fn timer_fires_at_correct_time_after_previous_timer() {
    use duroxide::runtime::OrchestrationStatus;

    /// Sleeps for the number of milliseconds given in `input` (2000 if it does
    /// not parse as a `u64`), then reports success.
    async fn slow_activity(_ctx: ActivityContext, input: String) -> Result<String, String> {
        let millis = input.parse::<u64>().unwrap_or(2000);
        tokio::time::sleep(Duration::from_millis(millis)).await;
        Ok("activity_done".to_string())
    }

    let (store, _td) = create_sqlite_store().await;

    let test_orch = |ctx: OrchestrationContext, _input: String| async move {
        // Warm-up phase: one short timer, then a slow activity whose result is ignored.
        ctx.schedule_timer(Duration::from_millis(100)).await;
        let _ = ctx.schedule_activity("SlowActivity", "2000").await;

        // Race: the 1 s timer is scheduled first, the 100 ms activity second.
        let timer = ctx.schedule_timer(Duration::from_secs(1));
        let activity = ctx.schedule_activity("SlowActivity", "100");
        let outcome = match ctx.select2(timer, activity).await {
            Either2::First(_) => "timer_won".to_string(),
            Either2::Second(r) => r.unwrap_or_else(|e| format!("activity_failed: {e}")),
        };
        Ok(outcome)
    };

    let activities = ActivityRegistry::builder()
        .register("SlowActivity", slow_activity)
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register("TimerFireTimeTest", test_orch)
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("timer-fire-time-test", "TimerFireTimeTest", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("timer-fire-time-test", Duration::from_secs(15))
        .await
        .unwrap();

    // Shut the runtime down before asserting.
    rt.shutdown(None).await;

    match status {
        OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        OrchestrationStatus::Completed { output, .. } => {
            assert_ne!(
                output, "timer_won",
                "Timer fired early! The 1-second timer should not beat a 100ms activity. \
                 This indicates calculate_timer_fire_time bug has regressed."
            );
            assert_eq!(output, "activity_done", "Activity should have won the race");
        }
        other => panic!("Unexpected status: {other:?}"),
    }
}