use crate::INITIAL_EXECUTION_ID;
use crate::provider_validation::{Event, EventKind, ExecutionMetadata, ProviderFactory, create_instance, start_item};
use crate::providers::{TagFilter, WorkItem};
use std::time::Duration;
/// Validates that instances in a terminal state can be deleted without `force`.
///
/// Three instances are created through the fixture helpers in this file: one
/// acked with status `"Completed"`, one with status `"Failed"`, and one with
/// status `"Failed"` carrying a cancellation output. Each must be visible via
/// `get_instance_info` beforehand, must be deletable with `force = false`, and
/// must no longer be visible afterwards.
///
/// # Panics
///
/// Panics if the provider exposes no management capability, if a provider
/// call fails, or if any of the expectations above does not hold.
pub async fn test_delete_terminal_instances<F: ProviderFactory>(factory: &F) {
    // The scenario covers cancelled instances as well, so the banner lists all three.
    tracing::info!("→ Testing deletion: delete terminal instances (completed, failed, cancelled)");
    let provider = factory.create_provider().await;
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    let completed_id = "delete-term-completed";
    create_completed_instance(&*provider, completed_id).await;
    let failed_id = "delete-term-failed";
    create_failed_instance(&*provider, failed_id).await;
    let cancelled_id = "delete-term-cancelled";
    create_cancelled_instance(&*provider, cancelled_id).await;

    // Preconditions: all three fixtures must be visible before any deletion.
    assert!(
        mgmt.get_instance_info(completed_id).await.is_ok(),
        "Completed instance should exist before deletion"
    );
    assert!(
        mgmt.get_instance_info(failed_id).await.is_ok(),
        "Failed instance should exist before deletion"
    );
    assert!(
        mgmt.get_instance_info(cancelled_id).await.is_ok(),
        "Cancelled instance should exist before deletion"
    );

    // Completed instance: the full set of deletion counters is checked here.
    let result = mgmt.delete_instance(completed_id, false).await.unwrap();
    assert!(result.instances_deleted >= 1, "Completed instance should be deleted");
    assert!(result.executions_deleted >= 1, "Should delete at least 1 execution");
    assert!(result.events_deleted >= 1, "Should delete at least 1 event");
    assert!(
        mgmt.get_instance_info(completed_id).await.is_err(),
        "Completed instance should not exist after deletion"
    );

    // Failed instance.
    let result = mgmt.delete_instance(failed_id, false).await.unwrap();
    assert!(result.instances_deleted >= 1, "Failed instance should be deleted");
    assert!(
        mgmt.get_instance_info(failed_id).await.is_err(),
        "Failed instance should not exist after deletion"
    );

    // Cancelled instance.
    let result = mgmt.delete_instance(cancelled_id, false).await.unwrap();
    assert!(result.instances_deleted >= 1, "Cancelled instance should be deleted");
    assert!(
        mgmt.get_instance_info(cancelled_id).await.is_err(),
        "Cancelled instance should not exist after deletion"
    );

    tracing::info!("✓ Test passed: delete terminal instances (completed, failed, cancelled)");
}
/// Checks that a plain delete is refused for an instance created by
/// `create_instance`, and that the same delete succeeds once `force` is set.
///
/// The refusal must be a non-retryable error whose text mentions "running",
/// and the instance must stay visible until the forced delete removes it.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails unexpectedly, or when any expectation above is violated.
pub async fn test_delete_running_rejected_force_succeeds<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing deletion: running rejected, force succeeds");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();
    let target = "delete-running-force";
    create_instance(&*provider, target).await.unwrap();

    // Attempt 1: no force flag, so the provider is expected to refuse.
    let refused = admin.delete_instance(target, false).await;
    assert!(refused.is_err(), "Should reject deletion of running instance");
    let refusal = refused.unwrap_err();
    assert!(!refusal.is_retryable(), "Error should be permanent");
    let refusal_text = refusal.to_string().to_lowercase();
    assert!(
        refusal_text.contains("running"),
        "Error message should mention 'running'"
    );

    // The refused delete must not have removed anything.
    let still_present = admin.get_instance_info(target).await.is_ok();
    assert!(still_present, "Instance should still exist after rejected deletion");

    // Attempt 2: forced delete must go through and remove the instance.
    let forced = admin.delete_instance(target, true).await.unwrap();
    assert!(forced.instances_deleted >= 1, "Force delete should succeed");
    let gone = admin.get_instance_info(target).await.is_err();
    assert!(gone, "Instance should not exist after force deletion");

    tracing::info!("✓ Test passed: running rejected, force succeeds");
}
/// Checks that deleting an unknown instance ID yields a permanent error.
///
/// The error must be non-retryable and its lower-cased text must contain
/// "not found".
///
/// # Panics
///
/// Panics when the provider has no management capability or when the delete
/// call does not fail in the expected way.
pub async fn test_delete_nonexistent_instance<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing deletion: non-existent instance returns error");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let missing_id = "does-not-exist-12345";
    let outcome = admin.delete_instance(missing_id, false).await;
    assert!(outcome.is_err(), "Should return error for non-existent instance");

    let failure = outcome.unwrap_err();
    assert!(!failure.is_retryable(), "Error should be permanent");
    let failure_text = failure.to_string().to_lowercase();
    assert!(failure_text.contains("not found"), "Error should mention 'not found'");

    tracing::info!("✓ Test passed: non-existent instance returns error");
}
/// Verifies that deleting an instance also removes queued messages for it and
/// that the same instance ID can be used again afterwards.
///
/// Flow: enqueue a start item and a worker activity item, fetch and ack the
/// start item with status "Completed", delete without force, then start a new
/// orchestration under the same ID with a different name, version and input
/// and confirm the provider reports the new data rather than stale data.
///
/// Panics if the provider has no management capability, if any provider call
/// fails, or if an assertion does not hold.
pub async fn test_delete_cleans_queues_and_locks<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing deletion: cleans queues, locks, allows ID reuse");
let provider = factory.create_provider().await;
let mgmt = provider.as_management_capability().unwrap();
let instance_id = "delete-queues-locks";
// Seed the orchestrator queue with a start item for the instance.
provider
.enqueue_for_orchestrator(start_item(instance_id), None)
.await
.unwrap();
// Seed the worker queue with an activity item for the same instance. It is
// never fetched in this test, so presumably it is what the
// `queue_messages_deleted` assertion below counts (TODO confirm).
provider
.enqueue_for_worker(WorkItem::ActivityExecute {
instance: instance_id.to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
})
.await
.unwrap();
// Fetch the start item and ack it with a single OrchestrationStarted event,
// recording status "Completed" before the non-force delete below.
let (_item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token,
1,
vec![Event::with_event_id(
1,
instance_id,
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
status: Some("Completed".to_string()),
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
// Delete without force; at least one queue message must be reported as deleted.
let result = mgmt.delete_instance(instance_id, false).await.unwrap();
assert!(result.queue_messages_deleted >= 1, "Should have deleted queue messages");
// Reuse the same instance ID with a different orchestration name, version and
// input so that stale data from the deleted instance would be detectable.
let new_orch_name = "ReusedOrch";
let new_input = r#"{"reused": true}"#;
provider
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: instance_id.to_string(),
orchestration: new_orch_name.to_string(),
version: Some("2.0.0".to_string()),
input: new_input.to_string(),
parent_instance: None,
parent_id: None,
execution_id: 1,
},
None,
)
.await
.unwrap();
// The fetched item must carry the new orchestration name.
let (item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(
item.orchestration_name, new_orch_name,
"Fetched item should have new orchestration name"
);
// Ack for the recreated instance. The result is kept rather than unwrapped so
// the assertion below can fail with a descriptive message.
let result = provider
.ack_orchestration_item(
&lock_token,
1,
vec![Event::with_event_id(
1,
instance_id,
1,
None,
EventKind::OrchestrationStarted {
name: new_orch_name.to_string(),
version: "2.0.0".to_string(),
input: new_input.to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some(new_orch_name.to_string()),
orchestration_version: Some("2.0.0".to_string()),
..Default::default()
},
vec![],
)
.await;
assert!(
result.is_ok(),
"Should be able to recreate instance after deletion (locks cleared)"
);
// The management view must report the new orchestration name.
let info = mgmt.get_instance_info(instance_id).await.unwrap();
assert_eq!(
info.orchestration_name, new_orch_name,
"Recreated instance should have new orchestration name, not stale data"
);
// Cleanup with force: the second ack recorded no status, so presumably a
// non-force delete would be rejected here (TODO confirm).
mgmt.delete_instance(instance_id, true).await.unwrap();
tracing::info!("✓ Test passed: cleans queues, locks, allows ID reuse");
}
/// Validates cascade deletion across a root / children / grandchild hierarchy.
///
/// Builds four completed instances (`cascade-root` with two children, one of
/// which has a grandchild) and checks that:
///
/// * deleting a child directly is rejected, with or without `force`;
/// * deleting the grandchild directly is rejected;
/// * deleting the root succeeds, reports at least four deleted executions, and
///   removes every instance in the hierarchy.
///
/// # Panics
///
/// Panics if the provider exposes no management capability, if a provider
/// call fails unexpectedly, or if any expectation above does not hold.
pub async fn test_cascade_delete_hierarchy<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing deletion: cascade delete hierarchy");
    let provider = factory.create_provider().await;
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    let root_id = "cascade-root";
    let child1_id = "cascade-child1";
    let child2_id = "cascade-child2";
    let grandchild_id = "cascade-grandchild";

    // Hierarchy: root -> {child1, child2}, child1 -> grandchild.
    create_completed_instance_with_parent(&*provider, root_id, None).await;
    create_completed_instance_with_parent(&*provider, child1_id, Some(root_id)).await;
    create_completed_instance_with_parent(&*provider, child2_id, Some(root_id)).await;
    create_completed_instance_with_parent(&*provider, grandchild_id, Some(child1_id)).await;

    assert!(mgmt.get_instance_info(root_id).await.is_ok());
    assert!(mgmt.get_instance_info(child1_id).await.is_ok());
    assert!(mgmt.get_instance_info(child2_id).await.is_ok());
    assert!(mgmt.get_instance_info(grandchild_id).await.is_ok());

    // Direct deletion of a sub-orchestration must be rejected.
    let result = mgmt.delete_instance(child1_id, false).await;
    assert!(result.is_err(), "Should not allow direct deletion of sub-orchestration");
    let err = result.unwrap_err();
    // Lower-case the message once and report the actual error on failure.
    let err_msg = err.to_string().to_lowercase();
    assert!(
        err_msg.contains("sub-orchestration") || err_msg.contains("root") || err_msg.contains("parent"),
        "Error should mention sub-orchestration, root, or parent: {err}"
    );

    // `force` does not lift the restriction on sub-orchestrations.
    let result = mgmt.delete_instance(child1_id, true).await;
    assert!(result.is_err(), "Should not allow force deletion of sub-orchestration");

    // The same rule applies to deeper levels of the hierarchy.
    let result = mgmt.delete_instance(grandchild_id, false).await;
    assert!(
        result.is_err(),
        "Should not allow direct deletion of deeply nested sub-orchestration"
    );

    // Deleting the root must cascade through the whole hierarchy.
    let result = mgmt.delete_instance(root_id, false).await.unwrap();
    assert!(result.instances_deleted >= 1, "Root should be deleted");
    assert!(
        result.executions_deleted >= 4,
        "Should delete executions from all 4 instances"
    );

    assert!(mgmt.get_instance_info(root_id).await.is_err(), "Root should be deleted");
    assert!(
        mgmt.get_instance_info(child1_id).await.is_err(),
        "Child1 should be cascade deleted"
    );
    assert!(
        mgmt.get_instance_info(child2_id).await.is_err(),
        "Child2 should be cascade deleted"
    );
    assert!(
        mgmt.get_instance_info(grandchild_id).await.is_err(),
        "Grandchild should be cascade deleted"
    );

    tracing::info!("✓ Test passed: cascade delete hierarchy");
}
/// Guards against "zombie" instances: an ack that arrives after a force delete
/// must fail and must not bring the deleted instance back.
///
/// Flow: start an instance and ack its first turn, enqueue an external event
/// and fetch it (so a lock token is held), force-delete the instance while
/// that lock is outstanding, then attempt the ack with the held token.
///
/// Panics if the provider has no management capability, if a setup call
/// fails, or if the late ack succeeds or the instance reappears.
pub async fn test_force_delete_prevents_ack_recreation<F: ProviderFactory>(factory: &F) {
tracing::info!("→ CRITICAL TEST: force delete prevents ack recreation");
let provider = factory.create_provider().await;
let mgmt = provider.as_management_capability().unwrap();
let instance_id = "zombie-prevention";
// First turn: start the instance and ack with OrchestrationStarted. No status
// is recorded in the metadata for this ack.
provider
.enqueue_for_orchestrator(start_item(instance_id), None)
.await
.unwrap();
let (_item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&lock_token,
1,
vec![Event::with_event_id(
1,
instance_id,
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
// Second turn: enqueue an external event and fetch it, leaving `lock_token2`
// outstanding (not yet acked) when the delete happens.
provider
.enqueue_for_orchestrator(
WorkItem::ExternalRaised {
instance: instance_id.to_string(),
name: "test-event".to_string(),
data: "{}".to_string(),
},
None,
)
.await
.unwrap();
let (_item2, lock_token2, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
// Force-delete the instance while the second turn's lock is still held.
let delete_result = mgmt.delete_instance(instance_id, true).await.unwrap();
assert!(delete_result.instances_deleted >= 1, "Force delete should succeed");
// Late ack with the token obtained before the delete. The result is kept
// rather than unwrapped because failure is the expected outcome.
let ack_result = provider
.ack_orchestration_item(
&lock_token2,
1,
vec![Event::with_event_id(
2,
instance_id,
1,
None,
EventKind::ExternalEvent {
name: "test-event".to_string(),
data: "{}".to_string(),
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await;
assert!(
ack_result.is_err(),
"CRITICAL: Ack must fail after force delete to prevent zombie recreation"
);
// Independently of the ack result, the instance must not be visible again.
let info_result = mgmt.get_instance_info(instance_id).await;
assert!(
info_result.is_err(),
"CRITICAL: Instance must NOT be recreated after force delete"
);
tracing::info!("✓ CRITICAL TEST PASSED: force delete prevents ack recreation");
}
/// Creates a parentless instance via `create_completed_instance_with_parent`.
///
/// Thin wrapper that forwards `provider` and `instance_id` unchanged and
/// passes no parent.
pub(crate) async fn create_completed_instance(provider: &dyn crate::providers::Provider, instance_id: &str) {
    let no_parent: Option<&str> = None;
    create_completed_instance_with_parent(provider, instance_id, no_parent).await
}
/// Creates an instance recorded with status "Completed", optionally linked to a parent.
///
/// Enqueues a start item (an explicit `StartOrchestration` naming the parent
/// when `parent_id` is given, otherwise the shared `start_item`), fetches the
/// next orchestration item, and acks it with a single `OrchestrationStarted`
/// event plus metadata carrying status "Completed" and output "done".
///
/// Panics if any provider call fails or if no orchestration item is available.
async fn create_completed_instance_with_parent(
    provider: &dyn crate::providers::Provider,
    instance_id: &str,
    parent_id: Option<&str>,
) {
    let kickoff = match parent_id {
        Some(parent) => WorkItem::StartOrchestration {
            instance: instance_id.to_string(),
            orchestration: "TestOrch".to_string(),
            input: "{}".to_string(),
            version: Some("1.0.0".to_string()),
            parent_instance: Some(parent.to_string()),
            parent_id: Some(1),
            execution_id: INITIAL_EXECUTION_ID,
        },
        None => start_item(instance_id),
    };
    provider.enqueue_for_orchestrator(kickoff, None).await.unwrap();

    let lock_ttl = Duration::from_secs(30);
    let (_held_item, ack_token, _) = provider
        .fetch_orchestration_item(lock_ttl, Duration::ZERO, None)
        .await
        .unwrap()
        .unwrap();

    // The event and the metadata both record the parent link when one was given.
    let started = Event::with_event_id(
        1,
        instance_id,
        1,
        None,
        EventKind::OrchestrationStarted {
            name: "TestOrch".to_string(),
            version: "1.0.0".to_string(),
            input: "{}".to_string(),
            parent_instance: parent_id.map(String::from),
            parent_id: parent_id.and(Some(1)),
            carry_forward_events: None,
            initial_custom_status: None,
        },
    );
    let completed_metadata = ExecutionMetadata {
        status: Some("Completed".to_string()),
        output: Some("done".to_string()),
        orchestration_name: Some("TestOrch".to_string()),
        orchestration_version: Some("1.0.0".to_string()),
        parent_instance_id: parent_id.map(String::from),
        pinned_duroxide_version: None,
    };

    provider
        .ack_orchestration_item(&ack_token, 1, vec![started], vec![], vec![], completed_metadata, vec![])
        .await
        .unwrap();
}
/// Creates an instance recorded with status "Failed".
///
/// Enqueues the shared `start_item`, fetches the next orchestration item, and
/// acks it with a single `OrchestrationStarted` event plus metadata carrying
/// status "Failed" and the output "error: something went wrong".
///
/// Panics if any provider call fails or if no orchestration item is available.
async fn create_failed_instance(provider: &dyn crate::providers::Provider, instance_id: &str) {
    let kickoff = start_item(instance_id);
    provider.enqueue_for_orchestrator(kickoff, None).await.unwrap();

    let lock_ttl = Duration::from_secs(30);
    let (_held_item, ack_token, _) = provider
        .fetch_orchestration_item(lock_ttl, Duration::ZERO, None)
        .await
        .unwrap()
        .unwrap();

    let started = Event::with_event_id(
        1,
        instance_id,
        1,
        None,
        EventKind::OrchestrationStarted {
            name: "TestOrch".to_string(),
            version: "1.0.0".to_string(),
            input: "{}".to_string(),
            parent_instance: None,
            parent_id: None,
            carry_forward_events: None,
            initial_custom_status: None,
        },
    );
    let failed_metadata = ExecutionMetadata {
        status: Some("Failed".to_string()),
        output: Some("error: something went wrong".to_string()),
        orchestration_name: Some("TestOrch".to_string()),
        orchestration_version: Some("1.0.0".to_string()),
        ..Default::default()
    };

    provider
        .ack_orchestration_item(&ack_token, 1, vec![started], vec![], vec![], failed_metadata, vec![])
        .await
        .unwrap();
}
/// Creates an instance representing a cancelled orchestration.
///
/// Enqueues the shared `start_item`, fetches the next orchestration item, and
/// acks it with a single `OrchestrationStarted` event. The metadata records
/// status "Failed" with the output "cancelled: user requested cancellation";
/// no separate cancelled status is used here.
///
/// Panics if any provider call fails or if no orchestration item is available.
async fn create_cancelled_instance(provider: &dyn crate::providers::Provider, instance_id: &str) {
    let kickoff = start_item(instance_id);
    provider.enqueue_for_orchestrator(kickoff, None).await.unwrap();

    let lock_ttl = Duration::from_secs(30);
    let (_held_item, ack_token, _) = provider
        .fetch_orchestration_item(lock_ttl, Duration::ZERO, None)
        .await
        .unwrap()
        .unwrap();

    let started = Event::with_event_id(
        1,
        instance_id,
        1,
        None,
        EventKind::OrchestrationStarted {
            name: "TestOrch".to_string(),
            version: "1.0.0".to_string(),
            input: "{}".to_string(),
            parent_instance: None,
            parent_id: None,
            carry_forward_events: None,
            initial_custom_status: None,
        },
    );
    let cancelled_metadata = ExecutionMetadata {
        status: Some("Failed".to_string()),
        output: Some("cancelled: user requested cancellation".to_string()),
        orchestration_name: Some("TestOrch".to_string()),
        orchestration_version: Some("1.0.0".to_string()),
        ..Default::default()
    };

    provider
        .ack_orchestration_item(&ack_token, 1, vec![started], vec![], vec![], cancelled_metadata, vec![])
        .await
        .unwrap();
}
/// Exercises `list_children` on a small hierarchy.
///
/// Layout: the root has two children and the second child has one grandchild.
/// The call must return only direct children, an empty list for a leaf, and
/// an empty list (not an error) for an unknown instance ID.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails, or when a returned list differs from the expectation.
pub async fn test_list_children<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing primitive: list_children");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let root_id = "list-children-root";
    let child1_id = format!("{root_id}::sub::2");
    let child2_id = format!("{root_id}::sub::3");
    let grandchild_id = format!("{child2_id}::sub::4");

    create_completed_instance(&*provider, root_id).await;
    create_child_instance(&*provider, &child1_id, root_id).await;
    create_child_instance(&*provider, &child2_id, root_id).await;
    create_child_instance(&*provider, &grandchild_id, &child2_id).await;

    // Root: exactly the two direct children, never the grandchild.
    let of_root = admin.list_children(root_id).await.unwrap();
    assert_eq!(of_root.len(), 2, "Root should have 2 direct children");
    assert!(of_root.contains(&child1_id), "Should contain child1");
    assert!(of_root.contains(&child2_id), "Should contain child2");
    assert!(!of_root.contains(&grandchild_id), "Should NOT contain grandchild");

    // Second child: just the grandchild.
    let of_child2 = admin.list_children(&child2_id).await.unwrap();
    assert_eq!(of_child2.len(), 1, "child2 should have 1 child");
    assert!(of_child2.contains(&grandchild_id), "Should contain grandchild");

    // Leaf and unknown IDs both yield an empty list.
    let of_leaf = admin.list_children(&grandchild_id).await.unwrap();
    assert!(of_leaf.is_empty(), "Leaf should have no children");
    let of_unknown = admin.list_children("non-existent").await.unwrap();
    assert!(of_unknown.is_empty(), "Non-existent instance should return empty");

    // Cleanup: remove the hierarchy through its root.
    admin.delete_instance(root_id, true).await.unwrap();

    tracing::info!("✓ Test passed: list_children");
}
/// Exercises `get_parent_id` for a root, a child, and an unknown instance.
///
/// The root must report no parent, the child must report the root's ID, and
/// an unknown instance ID must produce an error.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails unexpectedly, or when a lookup returns an unexpected value.
pub async fn test_delete_get_parent_id<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing primitive: get_parent_id");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let root_id = "parent-id-root";
    let child_id = format!("{root_id}::sub::2");
    create_completed_instance(&*provider, root_id).await;
    create_child_instance(&*provider, &child_id, root_id).await;

    let root_parent = admin.get_parent_id(root_id).await.unwrap();
    assert!(root_parent.is_none(), "Root should have no parent");

    let child_parent = admin.get_parent_id(&child_id).await.unwrap();
    assert_eq!(
        child_parent,
        Some(root_id.to_owned()),
        "Child should have root as parent"
    );

    let unknown_lookup = admin.get_parent_id("non-existent").await;
    assert!(unknown_lookup.is_err(), "Non-existent instance should return error");

    // Cleanup: remove the hierarchy through its root.
    admin.delete_instance(root_id, true).await.unwrap();

    tracing::info!("✓ Test passed: get_parent_id");
}
/// Exercises `get_instance_tree` on a four-instance hierarchy and on a leaf.
///
/// The tree rooted at the root instance must contain all four IDs and must
/// not be root-only; the tree rooted at the grandchild must contain exactly
/// one instance and be root-only.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails, or when a tree does not have the expected contents.
pub async fn test_delete_get_instance_tree<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing composite: get_instance_tree");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let root_id = "tree-root";
    let child1_id = format!("{root_id}::sub::2");
    let child2_id = format!("{root_id}::sub::3");
    let grandchild_id = format!("{child2_id}::sub::4");

    create_completed_instance(&*provider, root_id).await;
    create_child_instance(&*provider, &child1_id, root_id).await;
    create_child_instance(&*provider, &child2_id, root_id).await;
    create_child_instance(&*provider, &grandchild_id, &child2_id).await;

    // Full tree, starting from the root.
    let full_tree = admin.get_instance_tree(root_id).await.unwrap();
    assert_eq!(full_tree.root_id, root_id, "Root ID should match");
    assert_eq!(full_tree.size(), 4, "Tree should have 4 instances");
    assert!(!full_tree.is_root_only(), "Tree should have children");
    let root_owned = root_id.to_string();
    for expected in [&root_owned, &child1_id, &child2_id, &grandchild_id] {
        assert!(full_tree.all_ids.contains(expected));
    }

    // Tree starting from the leaf.
    let single = admin.get_instance_tree(&grandchild_id).await.unwrap();
    assert_eq!(single.size(), 1, "Leaf tree should have 1 instance");
    assert!(single.is_root_only(), "Single instance tree should be root-only");

    // Cleanup: remove the hierarchy through its root.
    admin.delete_instance(root_id, true).await.unwrap();

    tracing::info!("✓ Test passed: get_instance_tree");
}
/// Exercises `delete_instances_atomic` on three completed instances.
///
/// All three instances must exist beforehand; one non-forced call must report
/// at least one deleted instance and at least three deleted executions, and
/// none of the instances may be visible afterwards.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails, or when any expectation above does not hold.
pub async fn test_delete_instances_atomic<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing primitive: delete_instances_atomic");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let ids: Vec<String> = ["atomic-del-1", "atomic-del-2", "atomic-del-3"]
        .into_iter()
        .map(String::from)
        .collect();

    for id in ids.iter() {
        create_completed_instance(&*provider, id).await;
    }
    for id in ids.iter() {
        assert!(admin.get_instance_info(id).await.is_ok());
    }

    let summary = admin.delete_instances_atomic(&ids, false).await.unwrap();
    assert!(summary.instances_deleted >= 1, "Should report deletion");
    assert!(summary.executions_deleted >= 3, "Should delete at least 3 executions");

    for id in ids.iter() {
        let lookup = admin.get_instance_info(id).await;
        assert!(lookup.is_err(), "Instance {id} should be gone");
    }

    tracing::info!("✓ Test passed: delete_instances_atomic");
}
/// Exercises the `force` flag of `delete_instances_atomic`.
///
/// The batch holds one completed instance and one instance created by
/// `create_instance`. Without `force` the call must fail and leave both
/// instances in place; with `force` it must succeed and remove both.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails unexpectedly, or when any expectation above does not hold.
pub async fn test_delete_instances_atomic_force<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ Testing primitive: delete_instances_atomic with force");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let completed_id = String::from("atomic-force-completed");
    let running_id = String::from("atomic-force-running");
    create_completed_instance(&*provider, &completed_id).await;
    create_instance(&*provider, &running_id).await.unwrap();

    let batch = vec![completed_id.clone(), running_id.clone()];

    // Without force: the whole batch is refused and nothing is removed.
    let unforced = admin.delete_instances_atomic(&batch, false).await;
    assert!(
        unforced.is_err(),
        "Should fail without force when running instance present"
    );
    assert!(admin.get_instance_info(&completed_id).await.is_ok());
    assert!(admin.get_instance_info(&running_id).await.is_ok());

    // With force: both instances are removed.
    let forced = admin.delete_instances_atomic(&batch, true).await.unwrap();
    assert!(forced.instances_deleted >= 1, "Should delete with force");
    assert!(admin.get_instance_info(&completed_id).await.is_err());
    assert!(admin.get_instance_info(&running_id).await.is_err());

    tracing::info!("✓ Test passed: delete_instances_atomic with force");
}
/// Checks that `delete_instances_atomic` refuses a batch that would orphan a child.
///
/// A root with one child is created. Deleting only the root must fail with an
/// error mentioning "child", "orphan" or "tree" and must leave both instances
/// in place. Deleting root and child together must succeed and remove both.
///
/// # Panics
///
/// Panics when the provider has no management capability, when a provider
/// call fails unexpectedly, or when any expectation above does not hold.
pub async fn test_delete_instances_atomic_orphan_detection<F: ProviderFactory>(factory: &F) {
    tracing::info!("→ CRITICAL TEST: delete_instances_atomic orphan detection");

    let provider = factory.create_provider().await;
    let admin = provider.as_management_capability().unwrap();

    let root_id = "orphan-detect-root";
    let child_id = format!("{root_id}::sub::2");
    create_completed_instance(&*provider, root_id).await;
    create_child_instance(&*provider, &child_id, root_id).await;

    assert!(admin.get_instance_info(root_id).await.is_ok());
    assert!(admin.get_instance_info(&child_id).await.is_ok());

    // Root alone: the child would be orphaned, so the call has to fail.
    let root_only = [root_id.to_string()];
    let orphaning = admin.delete_instances_atomic(&root_only, false).await;
    assert!(
        orphaning.is_err(),
        "CRITICAL: delete_instances_atomic must fail when it would create orphans"
    );
    let rejection = orphaning.unwrap_err();
    let rejection_text = rejection.to_string().to_lowercase();
    let names_the_problem = ["child", "orphan", "tree"]
        .iter()
        .any(|keyword| rejection_text.contains(keyword));
    assert!(
        names_the_problem,
        "Error should mention child/orphan/tree issue: {err}",
        err = rejection
    );

    // The failed call must not have removed anything.
    assert!(
        admin.get_instance_info(root_id).await.is_ok(),
        "Root should still exist after failed delete"
    );
    assert!(
        admin.get_instance_info(&child_id).await.is_ok(),
        "Child should still exist after failed delete"
    );

    // Root and child together: the batch covers the whole tree and must succeed.
    let whole_tree = [root_id.to_string(), child_id.clone()];
    let complete = admin.delete_instances_atomic(&whole_tree, false).await;
    assert!(
        complete.is_ok(),
        "Delete with complete tree should succeed: {result:?}",
        result = complete
    );
    assert!(admin.get_instance_info(root_id).await.is_err(), "Root should be deleted");
    assert!(
        admin.get_instance_info(&child_id).await.is_err(),
        "Child should be deleted"
    );

    tracing::info!("✓ CRITICAL TEST PASSED: delete_instances_atomic orphan detection");
}
/// Guards against a stale activity completion from a force-deleted instance
/// reaching a new instance that reuses the same ID.
///
/// Flow: start an instance that schedules an activity, fetch the activity's
/// work item (keeping its lock token), force-delete the instance, recreate it
/// under the same ID, then ack the old work item with a completion. That ack
/// must fail, and a subsequent fetch for the new instance must not contain
/// the stale completion.
///
/// Panics if the provider has no management capability, if a setup call
/// fails, or if either critical assertion does not hold.
pub async fn test_stale_activity_after_delete_recreate<F: ProviderFactory>(factory: &F) {
tracing::info!("→ CRITICAL TEST: stale activity after delete+recreate");
let provider = factory.create_provider().await;
let mgmt = provider.as_management_capability().unwrap();
let instance_id = "stale-activity-recreate";
// Step 1: start the original instance.
provider
.enqueue_for_orchestrator(start_item(instance_id), None)
.await
.unwrap();
let (_item, orch_lock, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
// Ack the first turn with OrchestrationStarted plus ActivityScheduled, and
// hand an ActivityExecute work item (activity id 2) to the provider.
provider
.ack_orchestration_item(
&orch_lock,
INITIAL_EXECUTION_ID,
vec![
Event::with_event_id(
1,
instance_id,
INITIAL_EXECUTION_ID,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
instance_id,
INITIAL_EXECUTION_ID,
Some(2),
EventKind::ActivityScheduled {
name: "OldActivity".to_string(),
input: "old-input".to_string(),
session_id: None,
tag: None,
},
),
],
vec![WorkItem::ActivityExecute {
instance: instance_id.to_string(),
execution_id: INITIAL_EXECUTION_ID,
id: 2,
name: "OldActivity".to_string(),
input: "old-input".to_string(),
session_id: None,
tag: None,
}],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
// Step 2: fetch the activity work item and keep its lock token; it is acked
// only after the instance has been deleted and recreated.
let (_work_item, old_worker_lock, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
// Step 3: force-delete the instance while the work item is still held.
let delete_result = mgmt.delete_instance(instance_id, true).await.unwrap();
assert!(delete_result.instances_deleted >= 1, "Force delete should succeed");
assert!(
mgmt.get_instance_info(instance_id).await.is_err(),
"Instance should be deleted"
);
// Step 4: recreate an instance under the same ID, this time without
// scheduling any activity.
provider
.enqueue_for_orchestrator(start_item(instance_id), None)
.await
.unwrap();
let (_item2, orch_lock2, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
provider
.ack_orchestration_item(
&orch_lock2,
INITIAL_EXECUTION_ID,
vec![Event::with_event_id(
1,
instance_id,
INITIAL_EXECUTION_ID,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
let info = mgmt.get_instance_info(instance_id).await.unwrap();
assert_eq!(info.orchestration_name, "TestOrch");
// Step 5: ack the old work item with a completion addressed to the reused ID.
// The result is kept rather than unwrapped because failure is expected.
let old_ack_result = provider
.ack_work_item(
&old_worker_lock,
Some(WorkItem::ActivityCompleted {
instance: instance_id.to_string(),
execution_id: INITIAL_EXECUTION_ID,
id: 2,
result: "stale-result-from-old-instance".to_string(),
}),
)
.await;
assert!(
old_ack_result.is_err(),
"CRITICAL: Stale activity ack must fail to prevent corruption of recreated instance"
);
// Step 6: enqueue a probe event so the new instance has a message to fetch,
// then inspect everything delivered with it for the stale completion.
provider
.enqueue_for_orchestrator(
WorkItem::ExternalRaised {
instance: instance_id.to_string(),
name: "probe-event".to_string(),
data: "{}".to_string(),
},
None,
)
.await
.unwrap();
let (item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
// The stale completion is recognised by the marker text in its result.
let has_stale_completion = item.messages.iter().any(|m| {
matches!(
m,
WorkItem::ActivityCompleted { result, .. } if result.contains("stale-result")
)
});
assert!(
!has_stale_completion,
"CRITICAL: New instance must NOT see stale completion from deleted instance. Messages: {:?}",
item.messages
);
// Best-effort cleanup: both results are deliberately ignored so that a
// cleanup failure cannot fail the test.
let _ = provider
.ack_orchestration_item(
&lock_token,
INITIAL_EXECUTION_ID,
vec![],
vec![],
vec![],
ExecutionMetadata {
status: Some("Completed".to_string()),
..Default::default()
},
vec![],
)
.await;
let _ = mgmt.delete_instance(instance_id, true).await;
tracing::info!("✓ CRITICAL TEST PASSED: stale activity after delete+recreate");
}
/// Creates a child instance recorded with status "Completed" under `parent_id`.
///
/// Registers the instance through `create_instance_with_parent`, fetches the
/// next orchestration item, and acks it with a single `OrchestrationStarted`
/// event for "ChildOrch" plus metadata naming `parent_id` as the parent
/// instance.
///
/// Panics if any provider call fails or if no orchestration item is available.
async fn create_child_instance(provider: &dyn crate::providers::Provider, instance_id: &str, parent_id: &str) {
    use crate::provider_validation::create_instance_with_parent;

    let parent_link = Some(parent_id.to_string());
    create_instance_with_parent(provider, instance_id, parent_link)
        .await
        .unwrap();

    let child_metadata = ExecutionMetadata {
        status: Some("Completed".to_string()),
        output: Some("done".to_string()),
        orchestration_name: Some("ChildOrch".to_string()),
        orchestration_version: Some("1.0.0".to_string()),
        parent_instance_id: Some(parent_id.to_string()),
        pinned_duroxide_version: None,
    };

    let lock_ttl = Duration::from_secs(5);
    let (_held_item, ack_token, _attempts) = provider
        .fetch_orchestration_item(lock_ttl, Duration::ZERO, None)
        .await
        .unwrap()
        .unwrap();

    // The event names the parent instance but carries no parent event id.
    let started = Event::with_event_id(
        1,
        instance_id,
        INITIAL_EXECUTION_ID,
        None,
        EventKind::OrchestrationStarted {
            name: "ChildOrch".to_string(),
            version: "1.0.0".to_string(),
            input: "{}".to_string(),
            parent_instance: Some(parent_id.to_string()),
            parent_id: None,
            carry_forward_events: None,
            initial_custom_status: None,
        },
    );

    provider
        .ack_orchestration_item(
            &ack_token,
            INITIAL_EXECUTION_ID,
            vec![started],
            vec![],
            vec![],
            child_metadata,
            vec![],
        )
        .await
        .unwrap();
}