use crate::provider_validation::{Event, EventKind, ExecutionMetadata};
use crate::provider_validations::ProviderFactory;
use crate::providers::WorkItem;
use std::time::Duration;
fn start_item_with_execution(instance: &str, execution_id: u64) -> WorkItem {
WorkItem::StartOrchestration {
instance: instance.to_string(),
orchestration: "TestOrch".to_string(),
input: "{}".to_string(),
version: Some("1.0.0".to_string()),
parent_instance: None,
parent_id: None,
execution_id,
}
}
pub async fn test_execution_isolation<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing multi-execution: execution isolation");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 1), None)
.await
.unwrap();
let (_item1, lock_token1, _attempt_count1) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token1,
1,
vec![
Event::with_event_id(
1,
"instance-A".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"instance-A".to_string(),
1,
None,
EventKind::ActivityScheduled {
name: "Activity1".to_string(),
input: "input1".to_string(),
session_id: None,
tag: None,
},
),
Event::with_event_id(
3,
"instance-A".to_string(),
1,
None,
EventKind::OrchestrationCompleted {
output: "result1".to_string(),
},
),
],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 2), None)
.await
.unwrap();
let (_item2, lock_token2, _attempt_count2) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token2,
2,
vec![
Event::with_event_id(
1,
"instance-A".to_string(),
2,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"instance-A".to_string(),
2,
None,
EventKind::OrchestrationCompleted {
output: "result2".to_string(),
},
),
],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let history1 = provider.read_with_execution("instance-A", 1).await.unwrap_or_default();
assert_eq!(history1.len(), 3);
assert!(matches!(&history1[0].kind, EventKind::OrchestrationStarted { .. }));
assert!(matches!(&history1[1].kind, EventKind::ActivityScheduled { .. }));
assert!(matches!(&history1[2].kind, EventKind::OrchestrationCompleted { .. }));
let history2 = provider.read_with_execution("instance-A", 2).await.unwrap_or_default();
assert_eq!(history2.len(), 2);
assert!(matches!(&history2[0].kind, EventKind::OrchestrationStarted { .. }));
assert!(matches!(&history2[1].kind, EventKind::OrchestrationCompleted { .. }));
let latest = provider.read("instance-A").await.unwrap_or_default();
assert_eq!(latest.len(), 2);
assert!(matches!(&latest[0].kind, EventKind::OrchestrationStarted { .. }));
assert!(matches!(&latest[1].kind, EventKind::OrchestrationCompleted { .. }));
tracing::info!("✓ Test passed: execution isolation verified");
}
pub async fn test_latest_execution_detection<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing multi-execution: latest execution detection");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 1), None)
.await
.unwrap();
let (_item1, lock_token1, _attempt_count1) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token1,
1,
vec![Event::with_event_id(
1,
"instance-A".to_string(),
1,
None,
EventKind::ActivityScheduled {
name: "A".to_string(),
input: "".to_string(),
session_id: None,
tag: None,
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 2), None)
.await
.unwrap();
let (_item2, lock_token2, _attempt_count2) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token2,
2,
vec![Event::with_event_id(
1,
"instance-A".to_string(),
2,
None,
EventKind::ActivityScheduled {
name: "B".to_string(),
input: "".to_string(),
session_id: None,
tag: None,
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let latest = provider.read("instance-A").await.unwrap_or_default();
assert_eq!(latest.len(), 1);
if let EventKind::ActivityScheduled { name, .. } = &latest[0].kind {
assert_eq!(name, "B");
} else {
panic!("Expected ActivityScheduled");
}
let exec1 = provider.read_with_execution("instance-A", 1).await.unwrap_or_default();
assert_eq!(exec1.len(), 1);
if let EventKind::ActivityScheduled { name, .. } = &exec1[0].kind {
assert_eq!(name, "A");
} else {
panic!("Expected ActivityScheduled");
}
tracing::info!("✓ Test passed: latest execution detection verified");
}
pub async fn test_execution_id_sequencing<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing multi-execution: execution ID sequencing");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 1), None)
.await
.unwrap();
let (item1, lock_token1, _attempt_count1) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item1.execution_id, 1);
provider
.ack_orchestration_item(
&lock_token1,
1,
vec![Event::with_event_id(
1,
"instance-A".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 2), None)
.await
.unwrap();
let (_item2, lock_token2, _attempt_count2) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token2,
2, vec![Event::with_event_id(
1,
"instance-A".to_string(),
2,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 3), None)
.await
.unwrap();
let (item3, _lock_token3, _attempt_count3) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item3.execution_id, 2, "Current execution should be 2");
tracing::info!("✓ Test passed: execution ID sequencing verified");
}
pub async fn test_continue_as_new_creates_new_execution<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing multi-execution: continue-as-new creates new execution");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", 1), None)
.await
.unwrap();
let (_item1, lock_token1, _attempt_count1) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token1,
1,
vec![Event::with_event_id(
1,
"instance-A".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
provider
.enqueue_for_orchestrator(
WorkItem::ContinueAsNew {
instance: "instance-A".to_string(),
orchestration: "TestOrch".to_string(),
input: "new-input".to_string(),
version: Some("1.0.0".to_string()),
carry_forward_events: vec![],
initial_custom_status: None,
},
None,
)
.await
.unwrap();
let (_item2, lock_token2, _attempt_count2) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token2,
2, vec![Event::with_event_id(
1,
"instance-A".to_string(),
2,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "new-input".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let result = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap();
if let Some((item, _lock_token, _attempt_count)) = result {
assert_eq!(item.execution_id, 2, "Continue-as-new should create execution 2");
}
tracing::info!("✓ Test passed: continue-as-new creates new execution verified");
}
pub async fn test_execution_history_persistence<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing multi-execution: execution history persistence");
let provider = factory.create_provider().await;
for exec_id in 1..=3 {
provider
.enqueue_for_orchestrator(start_item_with_execution("instance-A", exec_id), None)
.await
.unwrap();
let (_item, lock_token, _attempt_count) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token,
exec_id,
vec![Event::with_event_id(
1,
"instance-A".to_string(),
exec_id,
None,
EventKind::ActivityScheduled {
name: format!("Activity-{exec_id}"),
input: format!("input-{exec_id}"),
session_id: None,
tag: None,
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
}
for exec_id in 1..=3 {
let history = provider
.read_with_execution("instance-A", exec_id)
.await
.unwrap_or_default();
assert_eq!(history.len(), 1);
if let EventKind::ActivityScheduled { name, .. } = &history[0].kind {
assert_eq!(name, &format!("Activity-{exec_id}"));
} else {
panic!("Expected ActivityScheduled");
}
}
let latest = provider.read("instance-A").await.unwrap_or_default();
assert_eq!(latest.len(), 1);
if let EventKind::ActivityScheduled { name, .. } = &latest[0].kind {
assert_eq!(name, "Activity-3");
} else {
panic!("Expected ActivityScheduled");
}
tracing::info!("✓ Test passed: execution history persistence verified");
}