use crate::INITIAL_EXECUTION_ID;
use crate::provider_validation::{Event, EventKind, ExecutionMetadata, ProviderFactory, create_instance, start_item};
use crate::providers::{InstanceFilter, WorkItem};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Provider validation: exercises `delete_instance_bulk` with several filter shapes.
///
/// Seeds five completed instances (`bulk-del-filter-0` through `bulk-del-filter-4`)
/// and then checks, in order:
/// 1. an explicit ID list removes exactly the two listed instances,
/// 2. an ID list naming only unknown instances reports zero deletions,
/// 3. a default (empty) filter removes at least the three remaining instances.
///
/// # Panics
/// Panics if the provider exposes no management capability, if a bulk delete
/// call returns an error, or if any expectation above is violated.
pub async fn test_delete_instance_bulk_filter_combinations<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing delete_instance_bulk: filter combinations");

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    // Fixture: five completed instances with sequential ids.
    for idx in 0..5 {
        let id = format!("bulk-del-filter-{idx}");
        create_completed_instance(&*provider, &id).await;
    }

    // Case 1: explicit ID list.
    let first_two = InstanceFilter {
        instance_ids: Some(vec!["bulk-del-filter-0".into(), "bulk-del-filter-1".into()]),
        ..Default::default()
    };
    let by_id = admin.delete_instance_bulk(first_two).await.unwrap();
    assert_eq!(by_id.instances_deleted, 2, "Should delete 2 instances by ID");
    for removed in ["bulk-del-filter-0", "bulk-del-filter-1"] {
        assert!(admin.get_instance_info(removed).await.is_err());
    }
    assert!(admin.get_instance_info("bulk-del-filter-2").await.is_ok());

    // Case 2: IDs that were never created.
    let unknown_ids = InstanceFilter {
        instance_ids: Some(vec!["does-not-exist-1".into(), "does-not-exist-2".into()]),
        ..Default::default()
    };
    let by_unknown = admin.delete_instance_bulk(unknown_ids).await.unwrap();
    assert_eq!(
        by_unknown.instances_deleted, 0,
        "Non-existent IDs should return 0 deleted"
    );

    // Case 3: default filter. Only a lower bound is asserted on the count.
    let sweep = admin
        .delete_instance_bulk(InstanceFilter::default())
        .await
        .unwrap();
    assert!(
        sweep.instances_deleted >= 3,
        "Empty filter should delete remaining terminal instances"
    );
    for removed in ["bulk-del-filter-2", "bulk-del-filter-3", "bulk-del-filter-4"] {
        assert!(admin.get_instance_info(removed).await.is_err());
    }

    tracing::info!("✓ Test passed: delete_instance_bulk filter combinations");
}
/// Provider validation: `delete_instance_bulk` must skip non-terminal instances
/// and must honour the `limit` field of the filter.
///
/// Part one creates two completed instances and two instances via `create_instance`
/// (which the assertions below treat as still running), requests deletion of all
/// four by ID, and expects only the two completed ones to be removed.
///
/// Part two creates four more completed instances and issues three consecutive
/// deletes with `limit: Some(2)`, expecting 2, 2 and 0 deletions respectively.
///
/// # Panics
/// Panics if the provider exposes no management capability, if any provider call
/// returns an error, or if any expectation above is violated.
pub async fn test_delete_instance_bulk_safety_and_limits<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing delete_instance_bulk: safety and limits");
    let provider = factory.create_provider().await;
    // Same diagnostic as the sibling filter-combinations test, instead of a bare unwrap.
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    // --- Safety: mixed terminal / non-terminal ID list ---
    create_completed_instance(&*provider, "bulk-del-safe-completed-1").await;
    create_completed_instance(&*provider, "bulk-del-safe-completed-2").await;
    create_instance(&*provider, "bulk-del-safe-running-1").await.unwrap();
    create_instance(&*provider, "bulk-del-safe-running-2").await.unwrap();

    let result = mgmt
        .delete_instance_bulk(InstanceFilter {
            instance_ids: Some(vec![
                "bulk-del-safe-completed-1".into(),
                "bulk-del-safe-completed-2".into(),
                "bulk-del-safe-running-1".into(),
                "bulk-del-safe-running-2".into(),
            ]),
            ..Default::default()
        })
        .await
        .unwrap();
    assert_eq!(result.instances_deleted, 2, "Should only delete completed instances");
    for running in ["bulk-del-safe-running-1", "bulk-del-safe-running-2"] {
        assert!(
            mgmt.get_instance_info(running).await.is_ok(),
            "Running instance should not be deleted"
        );
    }

    // --- Limits: four deletable instances drained two at a time ---
    for i in 0..4 {
        create_completed_instance(&*provider, &format!("bulk-del-batch-{i}")).await;
    }

    // Each entry is (expected deletions, failure message) for one limited batch.
    let batches = [
        (2, "First batch should delete 2"),
        (2, "Second batch should delete 2"),
        (0, "Third batch should delete 0 (all gone)"),
    ];
    for (expected, message) in batches {
        let batch = mgmt
            .delete_instance_bulk(InstanceFilter {
                limit: Some(2),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(batch.instances_deleted, expected, "{message}");
    }

    tracing::info!("✓ Test passed: delete_instance_bulk safety and limits");
}
/// Provider validation: the `completed_before` field of the filter must restrict
/// `delete_instance_bulk` to instances completed before the given cutoff.
///
/// The cutoffs passed to the provider are wall-clock milliseconds since the UNIX
/// epoch. One timestamp is captured before the two instances are completed and
/// one after, with a 10 ms sleep on each side so the timestamps are strictly
/// separated from the completion times. The earlier cutoff must delete nothing;
/// the later cutoff must delete both instances.
///
/// # Panics
/// Panics if the provider exposes no management capability, if a bulk delete
/// call returns an error, if the system clock is unusable, or if any
/// expectation above is violated.
pub async fn test_delete_instance_bulk_completed_before_filter<F: ProviderFactory>(factory: &F) {
    /// Current wall-clock time as milliseconds since the UNIX epoch.
    fn now_millis() -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock should not be set before the UNIX epoch");
        // `as_millis` yields u128; convert with a check rather than a truncating `as` cast.
        u64::try_from(since_epoch.as_millis())
            .expect("milliseconds since the UNIX epoch should fit in u64")
    }

    tracing::info!("→ Testing delete_instance_bulk: completed_before filter");
    let provider = factory.create_provider().await;
    // Same diagnostic as the sibling filter-combinations test, instead of a bare unwrap.
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    // Both deletes target the same two instances and differ only in the cutoff.
    let filter_with_cutoff = |cutoff: u64| InstanceFilter {
        instance_ids: Some(vec!["bulk-del-time-1".into(), "bulk-del-time-2".into()]),
        completed_before: Some(cutoff),
        ..Default::default()
    };

    let before_creation = now_millis();
    tokio::time::sleep(Duration::from_millis(10)).await;
    create_completed_instance(&*provider, "bulk-del-time-1").await;
    create_completed_instance(&*provider, "bulk-del-time-2").await;
    tokio::time::sleep(Duration::from_millis(10)).await;
    let after_creation = now_millis();

    // Cutoff earlier than completion: nothing qualifies.
    let result = mgmt
        .delete_instance_bulk(filter_with_cutoff(before_creation))
        .await
        .unwrap();
    assert_eq!(
        result.instances_deleted, 0,
        "No instances should be deleted - they were completed after the cutoff"
    );
    assert!(
        mgmt.get_instance_info("bulk-del-time-1").await.is_ok(),
        "Instance 1 should still exist"
    );
    assert!(
        mgmt.get_instance_info("bulk-del-time-2").await.is_ok(),
        "Instance 2 should still exist"
    );

    // Cutoff later than completion: both qualify.
    let result = mgmt
        .delete_instance_bulk(filter_with_cutoff(after_creation))
        .await
        .unwrap();
    assert_eq!(
        result.instances_deleted, 2,
        "Both instances should be deleted - they were completed before the cutoff"
    );
    assert!(
        mgmt.get_instance_info("bulk-del-time-1").await.is_err(),
        "Instance 1 should be deleted"
    );
    assert!(
        mgmt.get_instance_info("bulk-del-time-2").await.is_err(),
        "Instance 2 should be deleted"
    );

    tracing::info!("✓ Test passed: delete_instance_bulk completed_before filter");
}
/// Provider validation: bulk-deleting root instances must also delete the
/// instances recorded as their children.
///
/// Fixture (all completed):
/// - `parent1` with one child,
/// - `parent2` with three children,
/// - one standalone instance with no children.
///
/// Only the three roots are named in the filter; the test expects
/// `instances_deleted` to be 7 (3 roots + 4 children) and every one of the
/// seven instances to be unreachable through `get_instance_info` afterwards.
///
/// # Panics
/// Panics if the provider exposes no management capability, if the bulk delete
/// call returns an error, or if any expectation above is violated.
pub async fn test_delete_instance_bulk_cascades_to_children<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing delete_instance_bulk: cascades to children");
    let provider = factory.create_provider().await;
    // Same diagnostic as the sibling filter-combinations test, instead of a bare unwrap.
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    // Tree 1: one parent, one child.
    let parent1 = "bulk-del-cascade-parent1";
    let child1 = "bulk-del-cascade-child1";
    create_completed_instance(&*provider, parent1).await;
    create_completed_instance_with_parent(&*provider, child1, parent1).await;

    // Tree 2: one parent, three children.
    let parent2 = "bulk-del-cascade-parent2";
    let child2a = "bulk-del-cascade-child2a";
    let child2b = "bulk-del-cascade-child2b";
    let child2c = "bulk-del-cascade-child2c";
    create_completed_instance(&*provider, parent2).await;
    create_completed_instance_with_parent(&*provider, child2a, parent2).await;
    create_completed_instance_with_parent(&*provider, child2b, parent2).await;
    create_completed_instance_with_parent(&*provider, child2c, parent2).await;

    // A root with no children.
    let standalone = "bulk-del-cascade-standalone";
    create_completed_instance(&*provider, standalone).await;

    // Sanity check: the whole fixture is visible before deleting anything.
    for id in [parent1, child1, parent2, child2a, child2b, child2c, standalone] {
        assert!(mgmt.get_instance_info(id).await.is_ok());
    }

    // Only roots are listed; children are expected to go with them.
    let result = mgmt
        .delete_instance_bulk(InstanceFilter {
            instance_ids: Some(vec![parent1.into(), parent2.into(), standalone.into()]),
            ..Default::default()
        })
        .await
        .unwrap();
    assert_eq!(
        result.instances_deleted, 7,
        "Should delete all roots and their children"
    );

    // Each entry is (instance id, failure message if it still exists).
    let expected_gone = [
        (parent1, "Parent1 should be deleted"),
        (child1, "Child1 should be cascade deleted"),
        (parent2, "Parent2 should be deleted"),
        (child2a, "Child2a should be cascade deleted"),
        (child2b, "Child2b should be cascade deleted"),
        (child2c, "Child2c should be cascade deleted"),
        (standalone, "Standalone should be deleted"),
    ];
    for (id, message) in expected_gone {
        assert!(mgmt.get_instance_info(id).await.is_err(), "{message}");
    }

    tracing::info!("✓ Test passed: delete_instance_bulk cascades to children");
}
/// Creates a completed instance that has no parent.
///
/// Delegates to `create_completed_instance_with_parent`, passing an empty
/// parent id, which that helper interprets as "no parent".
async fn create_completed_instance(provider: &dyn crate::providers::Provider, instance_id: &str) {
    let no_parent = "";
    create_completed_instance_with_parent(provider, instance_id, no_parent).await;
}
/// Drives one instance through the provider so that it is recorded as `Completed`.
///
/// Steps:
/// 1. enqueue a start work item for `instance_id` on the orchestrator queue,
/// 2. fetch an orchestration item to obtain a lock token,
/// 3. acknowledge it with a single `OrchestrationStarted` event and metadata
///    whose status is `"Completed"` and output is `"done"`.
///
/// `parent_id` is the parent instance id; an empty string means the instance
/// has no parent. When a parent is given, it is recorded in the work item, the
/// started event, and the execution metadata.
///
/// # Panics
/// Panics if any provider call returns an error or if no orchestration item
/// is available to fetch.
async fn create_completed_instance_with_parent(
    provider: &dyn crate::providers::Provider,
    instance_id: &str,
    parent_id: &str,
) {
    // Translate the empty-string sentinel into an Option once, up front.
    let parent_instance_id = if parent_id.is_empty() {
        None
    } else {
        Some(parent_id.to_string())
    };

    let work_item = match &parent_instance_id {
        None => start_item(instance_id),
        Some(parent) => WorkItem::StartOrchestration {
            instance: instance_id.to_string(),
            orchestration: "TestOrch".to_string(),
            input: "{}".to_string(),
            version: Some("1.0.0".to_string()),
            parent_instance: Some(parent.clone()),
            parent_id: Some(1),
            execution_id: INITIAL_EXECUTION_ID,
        },
    };
    provider.enqueue_for_orchestrator(work_item, None).await.unwrap();

    // Only the lock token is needed; the fetched item itself is not inspected.
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (_fetched_item, lock_token, _) = fetched.unwrap();

    let started_kind = EventKind::OrchestrationStarted {
        name: "TestOrch".to_string(),
        version: "1.0.0".to_string(),
        input: "{}".to_string(),
        parent_instance: parent_instance_id.clone(),
        // The parent id is 1 exactly when a parent instance is recorded.
        parent_id: parent_instance_id.as_ref().map(|_| 1),
        carry_forward_events: None,
        initial_custom_status: None,
    };
    let history = vec![Event::with_event_id(1, instance_id, 1, None, started_kind)];

    // Marking the execution as completed in the same ack is what makes it terminal.
    let completed = ExecutionMetadata {
        status: Some("Completed".to_string()),
        output: Some("done".to_string()),
        orchestration_name: Some("TestOrch".to_string()),
        orchestration_version: Some("1.0.0".to_string()),
        parent_instance_id,
        pinned_duroxide_version: None,
    };

    provider
        .ack_orchestration_item(&lock_token, 1, history, vec![], vec![], completed, vec![])
        .await
        .unwrap();
}