#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use std::sync::Arc;
use std::time::Duration;
mod common;
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self, RuntimeOptions};
use duroxide::{Client, OrchestrationContext, OrchestrationRegistry};
/// Checks the lock-related fields of `RuntimeOptions::default()` against the values this
/// test suite expects: 5s orchestrator lock timeout, 30s worker lock timeout and a 5s
/// worker lock renewal buffer.
#[tokio::test]
async fn default_lock_timeouts() {
    let defaults = RuntimeOptions::default();

    // Expected values are named up front so each assertion reads as "actual vs expected".
    let expected_orchestrator_timeout = Duration::from_secs(5);
    let expected_worker_timeout = Duration::from_secs(30);
    let expected_worker_renewal_buffer = Duration::from_secs(5);

    assert_eq!(
        defaults.orchestrator_lock_timeout, expected_orchestrator_timeout,
        "Default orchestrator lock timeout should be 5 seconds"
    );
    assert_eq!(
        defaults.worker_lock_timeout, expected_worker_timeout,
        "Default worker lock timeout should be 30 seconds"
    );
    assert_eq!(
        defaults.worker_lock_renewal_buffer, expected_worker_renewal_buffer,
        "Default worker lock renewal buffer should be 5 seconds"
    );
}
#[tokio::test]
async fn custom_lock_timeout_configuration() {
let options = RuntimeOptions {
orchestrator_lock_timeout: Duration::from_secs(10),
worker_lock_timeout: Duration::from_secs(120),
..Default::default()
};
assert_eq!(options.orchestrator_lock_timeout, Duration::from_secs(10));
assert_eq!(options.worker_lock_timeout, Duration::from_secs(120));
let short_options = RuntimeOptions {
orchestrator_lock_timeout: Duration::from_secs(1),
worker_lock_timeout: Duration::from_secs(5),
..Default::default()
};
assert_eq!(short_options.orchestrator_lock_timeout, Duration::from_secs(1));
assert_eq!(short_options.worker_lock_timeout, Duration::from_secs(5));
}
/// Runs a single-activity orchestration on a runtime configured with 60-second orchestrator
/// and worker lock timeouts, and checks that it completes with the activity's output.
#[tokio::test]
async fn orchestration_with_custom_timeout_completes() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Orchestration: schedule one activity and return its result unchanged.
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let result = ctx.schedule_activity("TestActivity", "data").await?;
        Ok(result)
    };

    // Activity: returns immediately with its input prefixed by "processed: ".
    let acts = ActivityRegistry::builder()
        .register(
            "TestActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move { Ok(format!("processed: {input}")) },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        orchestrator_lock_timeout: Duration::from_secs(60),
        worker_lock_timeout: Duration::from_secs(60),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-custom-timeout";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "processed: data");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Runs a single-activity orchestration with both the orchestrator and worker lock timeouts
/// set to 1 second. The activity does not sleep, and the test checks that the orchestration
/// completes with the activity's output within the 5-second wait.
#[tokio::test]
async fn very_short_lock_timeout_works() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Orchestration: schedule one activity and return its result unchanged.
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let result = ctx.schedule_activity("FastActivity", "data").await?;
        Ok(result)
    };

    // Activity: returns immediately with its input prefixed by "processed: ".
    let acts = ActivityRegistry::builder()
        .register(
            "FastActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move { Ok(format!("processed: {input}")) },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        orchestrator_lock_timeout: Duration::from_secs(1),
        worker_lock_timeout: Duration::from_secs(1),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-short-timeout";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "processed: data");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Runs an activity that sleeps for 6 seconds on a runtime whose worker lock timeout is only
/// 3 seconds (renewal buffer 1 second), and checks that the orchestration still completes
/// with the activity's output within a 10-second wait.
///
/// The activity outlives the configured lock timeout, so this presumably passes only when
/// the runtime renews the worker lock while the activity is running. The renewal mechanics
/// live in the runtime and are not visible in this file.
#[tokio::test]
async fn long_running_activity_with_lock_renewal() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Orchestration: schedule one activity and return its result unchanged.
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let result = ctx.schedule_activity("LongActivity", "data").await?;
        Ok(result)
    };

    let acts = ActivityRegistry::builder()
        .register(
            "LongActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move {
                // 6s of work: twice the 3s worker lock timeout configured below.
                tokio::time::sleep(Duration::from_secs(6)).await;
                Ok(format!("completed: {input}"))
            },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        worker_lock_timeout: Duration::from_secs(3),
        worker_lock_renewal_buffer: Duration::from_secs(1),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-long-activity-renewal";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    // The 10s wait leaves headroom over the 6s activity.
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "completed: data");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Runs an activity that sleeps for 500 milliseconds on a runtime with a 3-second worker
/// lock timeout (renewal buffer 1 second), and checks that the orchestration completes with
/// the activity's output. The activity finishes well inside the lock timeout.
#[tokio::test]
async fn short_activity_no_renewal_needed() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Orchestration: schedule one activity and return its result unchanged.
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let result = ctx.schedule_activity("QuickActivity", "data").await?;
        Ok(result)
    };

    let acts = ActivityRegistry::builder()
        .register(
            "QuickActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move {
                // 500ms of work: much shorter than the 3s worker lock timeout below.
                tokio::time::sleep(Duration::from_millis(500)).await;
                Ok(format!("quick: {input}"))
            },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        worker_lock_timeout: Duration::from_secs(3),
        worker_lock_renewal_buffer: Duration::from_secs(1),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-quick-activity";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(5))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "quick: data");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Runs an activity that sleeps for 3 seconds on a runtime whose worker lock timeout is
/// 2 seconds (renewal buffer 1 second), and checks that the orchestration still completes
/// with the activity's output within a 6-second wait.
///
/// The activity outlives the configured lock timeout, so this presumably depends on the
/// runtime renewing the worker lock. The renewal mechanics are not visible in this file.
#[tokio::test]
async fn lock_renewal_short_timeout() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Orchestration: schedule one activity and return its result unchanged.
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let result = ctx.schedule_activity("MediumActivity", "data").await?;
        Ok(result)
    };

    let acts = ActivityRegistry::builder()
        .register(
            "MediumActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move {
                // 3s of work: longer than the 2s worker lock timeout configured below.
                tokio::time::sleep(Duration::from_secs(3)).await;
                Ok(format!("medium: {input}"))
            },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        worker_lock_timeout: Duration::from_secs(2),
        worker_lock_renewal_buffer: Duration::from_secs(1),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-short-timeout-renewal";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(6))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "medium: data");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Runs an activity that sleeps for 6 seconds on a runtime with a 5-second worker lock
/// timeout and a 2-second renewal buffer, and checks that the orchestration completes with
/// the activity's output within a 10-second wait.
///
/// The activity outlives the configured lock timeout, so this presumably depends on the
/// runtime renewing the worker lock. How the buffer affects renewal timing is decided by
/// the runtime and is not visible in this file.
#[tokio::test]
async fn custom_renewal_buffer() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    // Orchestration: schedule one activity and return its result unchanged.
    let orch = |ctx: OrchestrationContext, _input: String| async move {
        let result = ctx.schedule_activity("LongActivity", "data").await?;
        Ok(result)
    };

    let acts = ActivityRegistry::builder()
        .register(
            "LongActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move {
                // 6s of work: longer than the 5s worker lock timeout configured below.
                tokio::time::sleep(Duration::from_secs(6)).await;
                Ok(format!("done: {input}"))
            },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        worker_lock_timeout: Duration::from_secs(5),
        worker_lock_renewal_buffer: Duration::from_secs(2),
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-custom-buffer";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert_eq!(output, "done: data");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Schedules three 6-second activities from one orchestration on a runtime with
/// `worker_concurrency: 3`, a 3-second worker lock timeout and a 1-second renewal buffer,
/// and checks that the orchestration completes with all three results in order.
///
/// Each activity outlives the configured lock timeout, so this presumably depends on the
/// runtime renewing every activity's worker lock. The renewal mechanics are not visible in
/// this file.
#[tokio::test]
async fn concurrent_activities_with_renewal() {
    // `_temp_dir` stays bound for the whole test; presumably it owns the on-disk database
    // location (confirm against `common::create_sqlite_store_disk`).
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;

    let orch = |ctx: OrchestrationContext, _input: String| async move {
        // All three futures are created before any of them is awaited.
        // NOTE(review): whether this alone makes the activities run concurrently depends on
        // `schedule_activity` semantics (eager vs. lazy scheduling); confirm against the
        // duroxide API.
        let a1 = ctx.schedule_activity("LongActivity", "task1");
        let a2 = ctx.schedule_activity("LongActivity", "task2");
        let a3 = ctx.schedule_activity("LongActivity", "task3");
        let r1 = a1.await?;
        let r2 = a2.await?;
        let r3 = a3.await?;
        Ok(format!("{r1}, {r2}, {r3}"))
    };

    let acts = ActivityRegistry::builder()
        .register(
            "LongActivity",
            |_ctx: duroxide::ActivityContext, input: String| async move {
                // 6s of work: twice the 3s worker lock timeout configured below.
                tokio::time::sleep(Duration::from_secs(6)).await;
                Ok(format!("completed-{input}"))
            },
        )
        .build();
    let reg = OrchestrationRegistry::builder().register("TestOrch", orch).build();

    let options = RuntimeOptions {
        worker_lock_timeout: Duration::from_secs(3),
        worker_lock_renewal_buffer: Duration::from_secs(1),
        worker_concurrency: 3,
        ..Default::default()
    };
    let rt = runtime::Runtime::start_with_options(store.clone(), acts, reg, options).await;
    let client = Client::new(store.clone());

    let inst = "inst-concurrent-renewal";
    client.start_orchestration(inst, "TestOrch", "").await.unwrap();
    // The 25s wait covers even a fully sequential run of the three activities (3 x 6s = 18s).
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(25))
        .await
        .unwrap();

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            // The orchestration formats its results as "{r1}, {r2}, {r3}", so the output is
            // fully determined. Comparing the whole string checks presence and ordering of
            // all three results and prints the actual output on failure.
            assert_eq!(output, "completed-task1, completed-task2, completed-task3");
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        // Include the status in the message so an unexpected state is diagnosable.
        other => panic!("Unexpected orchestration status: {other:?}"),
    }

    rt.shutdown(None).await;
}
/// Verifies that `orchestrator_lock_renewal_buffer` defaults to 2 seconds and that a value
/// supplied through struct-update syntax is stored unchanged.
#[tokio::test]
async fn orchestrator_lock_renewal_buffer_defaults() {
    // Default value.
    let default_buffer = RuntimeOptions::default().orchestrator_lock_renewal_buffer;
    assert_eq!(
        default_buffer,
        Duration::from_secs(2),
        "Default orchestrator lock renewal buffer should be 2 seconds"
    );

    // Explicit override.
    let override_buffer = Duration::from_secs(5);
    let overridden = RuntimeOptions {
        orchestrator_lock_renewal_buffer: override_buffer,
        ..RuntimeOptions::default()
    };
    assert_eq!(overridden.orchestrator_lock_renewal_buffer, override_buffer);
}
/// End-to-end check that an orchestration turn which takes longer than the orchestrator
/// lock timeout still completes.
///
/// The runtime is configured with a 2-second orchestrator lock timeout, a 1-second renewal
/// buffer and `orchestration_concurrency: 1`. A test hook then injects a 4-second processing
/// delay for the instance under test, so processing outlasts the lock timeout. The test
/// expects the orchestration to finish with the activity's result; presumably that only
/// happens if the runtime renews the orchestrator lock during the delay (the renewal
/// mechanics are not visible in this file).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn orchestration_lock_renewal_prevents_expiration() {
    use duroxide::providers::sqlite::SqliteProvider;
    use duroxide::runtime::test_hooks;

    /// Clears the orchestration processing-delay hook when dropped, so the hook does not
    /// stay set if the test panics between setting it and the explicit release below.
    struct ProcessingDelayGuard;

    impl Drop for ProcessingDelayGuard {
        fn drop(&mut self) {
            test_hooks::clear_orch_processing_delay();
        }
    }

    // Start from a clean slate in case an earlier test left a delay configured.
    test_hooks::clear_orch_processing_delay();

    let store = Arc::new(SqliteProvider::new_in_memory().await.unwrap());
    let options = RuntimeOptions {
        orchestrator_lock_timeout: Duration::from_secs(2),
        orchestrator_lock_renewal_buffer: Duration::from_secs(1),
        orchestration_concurrency: 1,
        ..Default::default()
    };

    // Activity: returns "done" immediately, ignoring its input.
    let activities = ActivityRegistry::builder()
        .register(
            "QuickActivity",
            |_ctx: duroxide::ActivityContext, _input: String| async move { Ok("done".to_string()) },
        )
        .build();
    // Orchestration: run the activity once and wrap its result.
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "RenewalTestOrch",
            |ctx: OrchestrationContext, _input: String| async move {
                let result = ctx.schedule_activity("QuickActivity", "{}").await?;
                Ok(format!("completed: {result}"))
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    let client = Client::new(store.clone());

    let inst = "lock-renewal-e2e";

    // 4s delay for this instance only: twice the 2s orchestrator lock timeout.
    test_hooks::set_orch_processing_delay(Duration::from_secs(4), Some(inst));
    let delay_guard = ProcessingDelayGuard;

    client.start_orchestration(inst, "RenewalTestOrch", "").await.unwrap();
    let status = client
        .wait_for_orchestration(inst, Duration::from_secs(15))
        .await
        .unwrap();

    // Release the hook as soon as the orchestration has been observed, before any
    // assertion runs. The guard covers the panic paths above.
    drop(delay_guard);

    match status {
        runtime::OrchestrationStatus::Completed { output, .. } => {
            assert!(output.contains("completed"));
            assert!(output.contains("done"));
        }
        runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!(
                "Orchestration failed - lock renewal may not be working: {}",
                details.display_message()
            );
        }
        other => panic!("Unexpected status: {other:?}"),
    }

    rt.shutdown(None).await;
}