use super::helpers::*;
use async_trait::async_trait;
use duroxide::{OrchestrationContext, OrchestrationHandler};
use std::sync::Arc;
use std::time::Duration;
/// Orchestration handler used by the large fan-in test: it schedules `count`
/// sub-orchestrations and joins all of them.
struct LargeSubOrchestrationFanInHandler {
    /// Number of child sub-orchestrations to schedule and join.
    count: usize,
}
impl LargeSubOrchestrationFanInHandler {
    /// Builds a handler that fans out to `count` children and returns it
    /// wrapped in an `Arc`.
    fn new(count: usize) -> Arc<Self> {
        let handler = Self { count };
        Arc::new(handler)
    }
}
#[async_trait]
impl OrchestrationHandler for LargeSubOrchestrationFanInHandler {
    /// Schedules `self.count` "Child" sub-orchestrations (instance ids
    /// `child-0`, `child-1`, ... with the index as input), joins them, and
    /// checks that the result at each position equals that position's index.
    ///
    /// Returns the number of joined results as a string, or an error if any
    /// child failed or a result does not match its index.
    async fn invoke(&self, ctx: OrchestrationContext, _input: String) -> Result<String, String> {
        // Schedule every child up front, in index order.
        let mut children = Vec::with_capacity(self.count);
        for idx in 0..self.count {
            children.push(ctx.schedule_sub_orchestration_with_id(
                "Child",
                format!("child-{idx}"),
                idx.to_string(),
            ));
        }

        // The first failed child aborts the whole orchestration.
        let outcomes = ctx.join(children).await;
        let completed = outcomes.into_iter().collect::<Result<Vec<_>, _>>()?;

        // Results must line up with the order the children were scheduled in.
        let mismatch = completed
            .iter()
            .enumerate()
            .find(|(idx, result)| *result != &idx.to_string());
        if let Some((idx, result)) = mismatch {
            return Err(format!("result at index {idx} was {result}"));
        }

        Ok(completed.len().to_string())
    }
}
/// Replays a history whose last event is the activity completion and expects
/// the select2 handler to complete with the activity's result.
#[test]
fn select2_activity_wins() {
    let mut engine = create_engine(vec![
        started_event(1),
        activity_scheduled(2, "Task", "input"),
        timer_created(3, 1000),
        activity_completed(4, 2, "activity-result"),
    ]);
    let timeout = Duration::from_secs(60);

    let outcome = execute(&mut engine, Select2Handler::new("Task", "input", timeout));

    assert_completed(&outcome, "activity:activity-result");
}
/// Replays a history whose last event is the timer firing and expects the
/// select2 handler to complete with "timeout".
#[test]
fn select2_timer_wins() {
    let mut engine = create_engine(vec![
        started_event(1),
        activity_scheduled(2, "Task", "input"),
        timer_created(3, 1000),
        timer_fired(4, 3, 1000),
    ]);
    let timeout = Duration::from_secs(60);

    let outcome = execute(&mut engine, Select2Handler::new("Task", "input", timeout));

    assert_completed(&outcome, "timeout");
}
/// Replays a history with the activity scheduled and the timer created but
/// neither resolved; the execution is expected to report "continue".
#[test]
fn select2_both_pending() {
    let mut engine = create_engine(vec![
        started_event(1),
        activity_scheduled(2, "Task", "input"),
        timer_created(3, 1000),
    ]);
    let timeout = Duration::from_secs(60);

    let outcome = execute(&mut engine, Select2Handler::new("Task", "input", timeout));

    assert_continue(&outcome);
}
/// Both joined activities have completion events in history, so the join
/// handler is expected to complete with "result-a,result-b".
#[test]
fn join_all_complete() {
    let replayed = vec![
        started_event(1),
        activity_scheduled(2, "A", "a"),
        activity_scheduled(3, "B", "b"),
        activity_completed(4, 2, "result-a"),
        activity_completed(5, 3, "result-b"),
    ];
    let activities = vec![("A", "a"), ("B", "b")];

    let mut engine = create_engine(replayed);
    let outcome = execute(&mut engine, JoinActivitiesHandler::new(activities));

    assert_completed(&outcome, "result-a,result-b");
}
/// Only activity "A" has a completion event in history; with "B" still
/// outstanding the execution is expected to report "continue".
#[test]
fn join_partial_complete() {
    let replayed = vec![
        started_event(1),
        activity_scheduled(2, "A", "a"),
        activity_scheduled(3, "B", "b"),
        activity_completed(4, 2, "result-a"),
    ];
    let activities = vec![("A", "a"), ("B", "b")];

    let mut engine = create_engine(replayed);
    let outcome = execute(&mut engine, JoinActivitiesHandler::new(activities));

    assert_continue(&outcome);
}
/// Starts from a history containing only the start event: the join of three
/// activities cannot finish, and three pending actions are expected.
#[test]
fn join_fresh_schedule() {
    let mut engine = create_engine(vec![started_event(1)]);
    let activities = vec![("A", "a"), ("B", "b"), ("C", "c")];

    let outcome = execute(&mut engine, JoinActivitiesHandler::new(activities));

    assert_continue(&outcome);
    let pending = engine.pending_actions().len();
    assert_eq!(pending, 3, "Three activities should be pending");
}
/// Activity "A" completes but "B" has a failure event in history; the joined
/// orchestration is expected to end as failed.
#[test]
fn join_one_fails() {
    let replayed = vec![
        started_event(1),
        activity_scheduled(2, "A", "a"),
        activity_scheduled(3, "B", "b"),
        activity_completed(4, 2, "result-a"),
        activity_failed(5, 3, "B failed"),
    ];
    let activities = vec![("A", "a"), ("B", "b")];

    let mut engine = create_engine(replayed);
    let outcome = execute(&mut engine, JoinActivitiesHandler::new(activities));

    assert_failed(&outcome);
}
/// Replays a history in which 1024 sub-orchestrations were scheduled and all
/// of them completed, then expects the fan-in handler to complete with the
/// child count and leave no pending actions behind.
#[test]
fn join_large_completed_sub_orchestration_fan_in_reaches_terminal() {
    const FAN_IN_COUNT: usize = 1024;

    // Layout: event 1 is the start, events 2..=FAN_IN_COUNT+1 schedule the
    // children, and the following FAN_IN_COUNT events complete them in the
    // same order, each pointing back at its scheduling event id.
    let mut history = Vec::with_capacity(1 + FAN_IN_COUNT * 2);
    history.push(started_event(1));
    history.extend((0..FAN_IN_COUNT).map(|idx| {
        sub_orch_scheduled(
            (idx + 2) as u64,
            "Child",
            &format!("child-{idx}"),
            &idx.to_string(),
        )
    }));
    history.extend((0..FAN_IN_COUNT).map(|idx| {
        sub_orch_completed(
            (FAN_IN_COUNT + idx + 2) as u64,
            (idx + 2) as u64,
            &idx.to_string(),
        )
    }));

    let mut engine = create_engine(history);
    let outcome = execute(
        &mut engine,
        LargeSubOrchestrationFanInHandler::new(FAN_IN_COUNT),
    );

    assert_completed(&outcome, &FAN_IN_COUNT.to_string());
    assert!(
        engine.pending_actions().is_empty(),
        "fully replayed fan-in should not emit new pending work"
    );
}