#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]
use duroxide::providers::{InstanceFilter, PruneOptions};
use duroxide::runtime::registry::ActivityRegistry;
use duroxide::runtime::{self, RuntimeOptions};
use duroxide::{ActivityContext, Client, OrchestrationContext, OrchestrationRegistry};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
mod common;
fn fast_runtime_options() -> RuntimeOptions {
RuntimeOptions {
dispatcher_min_poll_interval: Duration::from_millis(50),
..Default::default()
}
}
/// Polls `get_instance_info` every 50ms until the instance reaches a terminal
/// status ("Completed" or "Failed") or `timeout` elapses.
///
/// Returns `true` on terminal, `false` on timeout. Always performs at least
/// one status check, even with a zero timeout.
async fn wait_for_terminal(client: &Client, instance_id: &str, timeout: Duration) -> bool {
    let deadline = std::time::Instant::now() + timeout;
    loop {
        let terminal = match client.get_instance_info(instance_id).await {
            Ok(info) => info.status == "Completed" || info.status == "Failed",
            Err(_) => false,
        };
        if terminal {
            return true;
        }
        if std::time::Instant::now() > deadline {
            return false;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
/// A continue-as-new chain produces 6 executions; pruning with `keep_last: 2`
/// must drop the older four and leave the newest two.
#[tokio::test]
async fn test_prune_continue_as_new_chain() {
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let client = Client::new(store.clone());

    // Chains 0 -> 1 -> ... -> 5 via continue-as-new, then completes.
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "ContinueOrch",
            |ctx: OrchestrationContext, count_str: String| async move {
                let count: u32 = count_str.parse().unwrap_or(0);
                if count >= 5 {
                    Ok(format!("Final: {count}"))
                } else {
                    ctx.continue_as_new((count + 1).to_string()).await
                }
            },
        )
        .build();

    let _rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestrations,
        fast_runtime_options(),
    )
    .await;

    client
        .start_orchestration("prune-chain", "ContinueOrch", "0")
        .await
        .unwrap();
    assert!(
        wait_for_terminal(&client, "prune-chain", Duration::from_secs(10)).await,
        "Should complete"
    );

    let before = client.list_executions("prune-chain").await.unwrap();
    assert_eq!(before.len(), 6, "Should have 6 executions");

    let options = PruneOptions {
        keep_last: Some(2),
        ..Default::default()
    };
    let result = client.prune_executions("prune-chain", options).await.unwrap();
    assert_eq!(result.instances_processed, 1);
    assert!(result.executions_deleted >= 4, "Should delete at least 4 executions");

    let after = client.list_executions("prune-chain").await.unwrap();
    assert_eq!(after.len(), 2, "Should have 2 executions remaining");
    // NOTE(review): `||` (not `&&`) presumably hedges whether execution ids are
    // 0- or 1-based; tighten once the id numbering base is confirmed.
    assert!(after.contains(&5) || after.contains(&6));
}
/// Bulk prune with an explicit instance-id filter: only the listed instances
/// are pruned; the filtered-out instance keeps its full history.
#[tokio::test]
async fn test_prune_bulk_operations() {
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let client = Client::new(store.clone());

    // Each instance chains 0 -> 1 -> 2 -> 3, yielding 4 executions.
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "ContinueOrch",
            |ctx: OrchestrationContext, count_str: String| async move {
                let count: u32 = count_str.parse().unwrap_or(0);
                if count >= 3 {
                    Ok(format!("Final: {count}"))
                } else {
                    ctx.continue_as_new((count + 1).to_string()).await
                }
            },
        )
        .build();

    let _rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestrations,
        fast_runtime_options(),
    )
    .await;

    let ids: Vec<String> = (0..3).map(|i| format!("prune-bulk-{i}")).collect();
    for id in &ids {
        client.start_orchestration(id, "ContinueOrch", "0").await.unwrap();
    }
    for (i, id) in ids.iter().enumerate() {
        assert!(
            wait_for_terminal(&client, id, Duration::from_secs(10)).await,
            "Instance {i} should complete"
        );
    }
    for (i, id) in ids.iter().enumerate() {
        let execs = client.list_executions(id).await.unwrap();
        assert_eq!(execs.len(), 4, "Instance {i} should have 4 executions");
    }

    // Prune only the first two instances, keeping one execution each.
    let filter = InstanceFilter {
        instance_ids: Some(vec!["prune-bulk-0".into(), "prune-bulk-1".into()]),
        ..Default::default()
    };
    let options = PruneOptions {
        keep_last: Some(1),
        ..Default::default()
    };
    let result = client.prune_executions_bulk(filter, options).await.unwrap();
    assert_eq!(result.instances_processed, 2);
    assert!(result.executions_deleted >= 6, "Should delete 3 from each instance");

    for (id, expected) in [("prune-bulk-0", 1), ("prune-bulk-1", 1), ("prune-bulk-2", 4)] {
        assert_eq!(client.list_executions(id).await.unwrap().len(), expected);
    }
}
/// Safety invariant: pruning must never delete an instance's *current*
/// execution — neither with `keep_last: Some(0)` nor with fully-empty
/// (no-op) prune options.
#[tokio::test]
async fn test_prune_safety() {
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let client = Client::new(store.clone());
    // Chains 0 -> 1 -> 2, producing multiple executions to prune against.
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "ContinueOrch",
            |ctx: OrchestrationContext, count_str: String| async move {
                let count: u32 = count_str.parse().unwrap_or(0);
                if count < 2 {
                    ctx.continue_as_new((count + 1).to_string()).await
                } else {
                    Ok(format!("Final: {count}"))
                }
            },
        )
        .build();
    let _rt = runtime::Runtime::start_with_options(
        store.clone(),
        ActivityRegistry::builder().build(),
        orchestrations,
        fast_runtime_options(),
    )
    .await;

    // Case 1: keep_last = 0 must still retain the current execution.
    client
        .start_orchestration("prune-safety", "ContinueOrch", "0")
        .await
        .unwrap();
    // Fix: this wait result was previously discarded — a timeout would have let
    // the test proceed against a non-terminal instance and fail confusingly later.
    assert!(
        wait_for_terminal(&client, "prune-safety", Duration::from_secs(10)).await,
        "prune-safety should reach a terminal state"
    );
    let info = client.get_instance_info("prune-safety").await.unwrap();
    let current_exec = info.current_execution_id;
    let _result = client
        .prune_executions(
            "prune-safety",
            PruneOptions {
                keep_last: Some(0),
                ..Default::default()
            },
        )
        .await
        .unwrap();
    let executions = client.list_executions("prune-safety").await.unwrap();
    assert!(
        executions.contains(&current_exec),
        "Current execution should never be deleted"
    );

    // Case 2: fully-empty options (no keep_last, no completed_before) must
    // also leave the current execution in place.
    client
        .start_orchestration("prune-safety-empty", "ContinueOrch", "0")
        .await
        .unwrap();
    // Fix: same discarded-result issue as above.
    assert!(
        wait_for_terminal(&client, "prune-safety-empty", Duration::from_secs(10)).await,
        "prune-safety-empty should reach a terminal state"
    );
    let info = client.get_instance_info("prune-safety-empty").await.unwrap();
    let current_exec_empty = info.current_execution_id;
    let _result = client
        .prune_executions(
            "prune-safety-empty",
            PruneOptions {
                keep_last: None,
                completed_before: None,
            },
        )
        .await
        .unwrap();
    let executions_after = client.list_executions("prune-safety-empty").await.unwrap();
    assert!(
        executions_after.contains(&current_exec_empty),
        "Current execution should never be deleted"
    );
}
/// Pruning an instance while its continue-as-new chain is still actively
/// running must not disturb the in-flight execution: the instance keeps
/// running and still completes with the final value.
#[tokio::test]
async fn test_prune_during_active_continue_as_new() {
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let client = Client::new(store.clone());

    // Tracks the latest execution number the activity has observed, so the
    // test can detect when the chain reaches a given execution.
    let execution_counter = Arc::new(AtomicU32::new(0));
    let execution_counter_clone = execution_counter.clone();
    let activities = ActivityRegistry::builder()
        .register("SlowActivity", move |_ctx: ActivityContext, input: String| {
            let counter = execution_counter_clone.clone();
            async move {
                let exec_num: u32 = input.parse().unwrap_or(0);
                counter.store(exec_num, Ordering::SeqCst);
                // Slow enough that the chain is still mid-flight when we prune.
                tokio::time::sleep(Duration::from_millis(100)).await;
                Ok(format!("exec-{exec_num}"))
            }
        })
        .build();
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "ActiveContinueOrch",
            |ctx: OrchestrationContext, count_str: String| async move {
                let count: u32 = count_str.parse().unwrap_or(0);
                ctx.schedule_activity("SlowActivity", count.to_string()).await?;
                if count < 5 {
                    ctx.continue_as_new((count + 1).to_string()).await
                } else {
                    Ok(format!("Final: {count}"))
                }
            },
        )
        .build();
    let _rt =
        runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, fast_runtime_options()).await;
    client
        .start_orchestration("prune-active", "ActiveContinueOrch", "0")
        .await
        .unwrap();

    // Wait for the chain to reach execution 3 so there is history to prune.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    while execution_counter.load(Ordering::SeqCst) < 3 {
        if std::time::Instant::now() > deadline {
            panic!("Orchestration never reached execution 3");
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    // Prune while the instance is still running.
    let result = client
        .prune_executions(
            "prune-active",
            PruneOptions {
                keep_last: Some(1),
                ..Default::default()
            },
        )
        .await
        .unwrap();
    // Fix: assert_eq! reports both values on failure, unlike assert!(a == b).
    assert_eq!(result.instances_processed, 1, "Should process 1 instance");
    let info = client.get_instance_info("prune-active").await.unwrap();
    // NOTE(review): mildly racy — executions 4 and 5 still take ~200ms of
    // activity sleep after the counter hits 3, so "Running" is normally
    // observed, but a very slow poll could see "Completed". Confirm if flaky.
    assert_eq!(info.status, "Running", "Instance should still be running");

    // The pruned instance must still run to completion.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        if let Ok(info) = client.get_instance_info("prune-active").await
            && info.status == "Completed"
        {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Orchestration never completed");
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    let info = client.get_instance_info("prune-active").await.unwrap();
    assert_eq!(info.status, "Completed");
    assert!(
        info.output.unwrap().contains("Final: 5"),
        "Should complete with final value"
    );
}
/// Bulk prune with no id filter must also process *running* instances:
/// old executions of a blocked continue-as-new chain are pruned, the current
/// execution survives, and the chain still completes once unblocked.
#[tokio::test]
async fn test_prune_bulk_includes_running_instances() {
    let (store, _temp_dir) = common::create_sqlite_store_disk().await;
    let client = Client::new(store.clone());

    // Records the highest execution number the activity has observed.
    let execution_counter = Arc::new(AtomicU32::new(0));
    let counter_for_activity = execution_counter.clone();
    let activities = ActivityRegistry::builder()
        .register("SignalExecution", move |_ctx: ActivityContext, input: String| {
            let counter = counter_for_activity.clone();
            async move {
                let exec_num: u32 = input.parse().unwrap_or(0);
                counter.store(exec_num, Ordering::SeqCst);
                Ok(format!("exec-{exec_num}"))
            }
        })
        .build();

    // Chains 0..=10 via continue-as-new; execution 5 blocks on an external event.
    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "LongRunningWithContinueAsNew",
            |ctx: OrchestrationContext, count_str: String| async move {
                let count: u32 = count_str.parse().unwrap_or(0);
                ctx.schedule_activity("SignalExecution", count.to_string()).await?;
                if count == 5 {
                    let _signal = ctx.schedule_wait("proceed").await;
                }
                if count < 10 {
                    ctx.continue_as_new((count + 1).to_string()).await
                } else {
                    Ok(format!("Final: {count}"))
                }
            },
        )
        .build();
    let _rt =
        runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, fast_runtime_options()).await;

    client
        .start_orchestration("bulk-prune-running", "LongRunningWithContinueAsNew", "0")
        .await
        .unwrap();

    // Wait until the chain reaches execution 5, where it blocks on "proceed".
    let deadline = std::time::Instant::now() + Duration::from_secs(15);
    while execution_counter.load(Ordering::SeqCst) < 5 {
        if std::time::Instant::now() > deadline {
            panic!("Orchestration never reached execution 5");
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    // Grace period so the runtime can persist the waiting state.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let info = client.get_instance_info("bulk-prune-running").await.unwrap();
    assert_eq!(info.status, "Running", "Should be running and waiting for event");
    let executions_before = client.list_executions("bulk-prune-running").await.unwrap();
    assert!(
        executions_before.len() >= 5,
        "Should have at least 5 executions, got {}",
        executions_before.len()
    );

    // Bulk prune with no instance-id filter: the running instance is included.
    let result = client
        .prune_executions_bulk(
            InstanceFilter {
                instance_ids: None,
                completed_before: None,
                limit: Some(100),
            },
            PruneOptions {
                keep_last: Some(2),
                ..Default::default()
            },
        )
        .await
        .unwrap();
    assert!(
        result.instances_processed >= 1,
        "Should process at least 1 instance (the running one)"
    );
    assert!(
        result.executions_deleted >= 3,
        "Should have pruned old executions, got {} deleted",
        result.executions_deleted
    );

    let info_after = client.get_instance_info("bulk-prune-running").await.unwrap();
    assert_eq!(info_after.status, "Running", "Instance should still be running");
    let executions_after = client.list_executions("bulk-prune-running").await.unwrap();
    assert!(
        executions_after.len() <= 2,
        "Should have at most 2 executions after prune, got {}",
        executions_after.len()
    );
    assert!(
        executions_after.contains(&info_after.current_execution_id),
        "Current execution must be preserved"
    );

    // Unblock execution 5 and let the chain run to completion.
    client
        .raise_event("bulk-prune-running", "proceed", "go!")
        .await
        .unwrap();
    let deadline = std::time::Instant::now() + Duration::from_secs(15);
    loop {
        let completed = match client.get_instance_info("bulk-prune-running").await {
            Ok(info) => info.status == "Completed",
            Err(_) => false,
        };
        if completed {
            break;
        }
        if std::time::Instant::now() > deadline {
            panic!("Orchestration never completed after event");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    let final_info = client.get_instance_info("bulk-prune-running").await.unwrap();
    assert_eq!(final_info.status, "Completed");
    assert!(
        final_info.output.unwrap().contains("Final: 10"),
        "Should complete with final value after event"
    );
}