use crate::provider_validation::{Event, EventKind, ExecutionMetadata, start_item};
use crate::provider_validations::ProviderFactory;
use crate::providers::{ScheduledActivityIdentifier, TagFilter, WorkItem};
use std::time::Duration;
pub async fn test_fetch_returns_running_state_for_active_orchestration<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: fetch returns Running for active orchestration");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-running"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Running".to_string()),
..Default::default()
};
let activity_item = WorkItem::ActivityExecute {
instance: "inst-running".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-running".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![activity_item],
vec![],
metadata,
vec![],
)
.await
.unwrap();
let result = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
match result {
Some((_, _, _)) => {
}
None => panic!("Expected to fetch work item"),
}
tracing::info!("✓ Test passed: fetch returns Running for active orchestration");
}
pub async fn test_fetch_returns_terminal_state_when_orchestration_completed<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: fetch returns Terminal for completed orchestration");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-completed"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Completed".to_string()), output: Some("done".to_string()),
..Default::default()
};
let activity_item = WorkItem::ActivityExecute {
instance: "inst-completed".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![
Event::with_event_id(
1,
"inst-completed".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"inst-completed".to_string(),
1,
None,
EventKind::OrchestrationCompleted {
output: "done".to_string(),
},
),
],
vec![activity_item],
vec![],
metadata,
vec![],
)
.await
.unwrap();
let result = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
match result {
Some((_, _, _)) => {
}
None => panic!("Expected to fetch work item"),
}
tracing::info!("✓ Test passed: fetch returns Terminal for completed orchestration");
}
pub async fn test_fetch_returns_terminal_state_when_orchestration_failed<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: fetch returns Terminal for failed orchestration");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-failed"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Failed".to_string()), ..Default::default()
};
let activity_item = WorkItem::ActivityExecute {
instance: "inst-failed".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![
Event::with_event_id(
1,
"inst-failed".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"inst-failed".to_string(),
1,
None,
EventKind::OrchestrationFailed {
details: crate::ErrorDetails::Application {
kind: crate::AppErrorKind::OrchestrationFailed,
message: "boom".to_string(),
retryable: false,
},
},
),
],
vec![activity_item],
vec![],
metadata,
vec![],
)
.await
.unwrap();
let result = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
match result {
Some((_, _, _)) => {
}
None => panic!("Expected to fetch work item"),
}
tracing::info!("✓ Test passed: fetch returns Terminal for failed orchestration");
}
pub async fn test_fetch_returns_terminal_state_when_orchestration_continued_as_new<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: fetch returns Terminal for ContinuedAsNew orchestration");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-can"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("ContinuedAsNew".to_string()), output: Some("new-input".to_string()),
..Default::default()
};
let activity_item = WorkItem::ActivityExecute {
instance: "inst-can".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![
Event::with_event_id(
1,
"inst-can".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"inst-can".to_string(),
1,
None,
EventKind::OrchestrationContinuedAsNew {
input: "new-input".to_string(),
},
),
],
vec![activity_item],
vec![],
metadata,
vec![],
)
.await
.unwrap();
let result = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
match result {
Some((_, _, _)) => {
}
None => panic!("Expected to fetch work item"),
}
tracing::info!("✓ Test passed: fetch returns Terminal for ContinuedAsNew orchestration");
}
pub async fn test_fetch_returns_missing_state_when_instance_deleted<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: fetch returns activity for missing instance");
let provider = factory.create_provider().await;
let activity_item = WorkItem::ActivityExecute {
instance: "inst-missing".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider.enqueue_for_worker(activity_item).await.unwrap();
let result = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
match result {
Some((_, _, _)) => {
}
None => panic!("Expected to fetch work item"),
}
tracing::info!("✓ Test passed: fetch returns activity for missing instance");
}
pub async fn test_renew_returns_running_when_orchestration_active<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: renew returns Running for active orchestration");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-renew-run"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Running".to_string()),
..Default::default()
};
let activity_item = WorkItem::ActivityExecute {
instance: "inst-renew-run".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-renew-run".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![activity_item],
vec![],
metadata,
vec![],
)
.await
.unwrap();
let (_, lock_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
provider
.renew_work_item_lock(&lock_token, Duration::from_secs(30))
.await
.unwrap();
tracing::info!("✓ Test passed: renew returns Running for active orchestration");
}
pub async fn test_renew_returns_terminal_when_orchestration_completed<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: renew returns Terminal for completed orchestration");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-renew-term"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Running".to_string()),
..Default::default()
};
let activity_item = WorkItem::ActivityExecute {
instance: "inst-renew-term".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-renew-term".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![activity_item],
vec![],
metadata,
vec![],
)
.await
.unwrap();
let (_, lock_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
provider
.enqueue_for_orchestrator(
WorkItem::ExternalRaised {
instance: "inst-renew-term".to_string(),
name: "Trigger".to_string(),
data: "{}".to_string(),
},
None,
)
.await
.unwrap();
let (_item2, token2, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata2 = ExecutionMetadata {
status: Some("Completed".to_string()),
output: Some("done".to_string()),
..Default::default()
};
provider
.ack_orchestration_item(
&token2,
1,
vec![Event::with_event_id(
2,
"inst-renew-term".to_string(),
1,
None,
EventKind::OrchestrationCompleted {
output: "done".to_string(),
},
)],
vec![],
vec![],
metadata2,
vec![],
)
.await
.unwrap();
provider
.renew_work_item_lock(&lock_token, Duration::from_secs(30))
.await
.unwrap();
tracing::info!("✓ Test passed: renew returns Terminal for completed orchestration");
}
pub async fn test_renew_returns_missing_when_instance_deleted<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: renew for missing instance");
let provider = factory.create_provider().await;
let activity_item = WorkItem::ActivityExecute {
instance: "inst-renew-missing".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider.enqueue_for_worker(activity_item).await.unwrap();
let (_, lock_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
provider
.renew_work_item_lock(&lock_token, Duration::from_secs(30))
.await
.unwrap();
tracing::info!("✓ Test passed: renew for missing instance");
}
pub async fn test_ack_work_item_none_deletes_without_enqueue<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: ack(None) deletes without enqueue");
let provider = factory.create_provider().await;
let activity_item = WorkItem::ActivityExecute {
instance: "inst-ack-none".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider.enqueue_for_worker(activity_item).await.unwrap();
let (_, lock_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
provider.ack_work_item(&lock_token, None).await.unwrap();
let result = provider
.fetch_work_item(Duration::from_millis(100), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
assert!(result.is_none(), "Worker queue should be empty");
let orch_result = provider
.fetch_orchestration_item(Duration::from_millis(100), Duration::ZERO, None)
.await
.unwrap();
assert!(orch_result.is_none(), "Orchestrator queue should be empty");
tracing::info!("✓ Test passed: ack(None) deletes without enqueue");
}
pub async fn test_cancelled_activities_deleted_from_worker_queue<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: cancelled_activities deletes from worker queue");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-cancel-delete"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Running".to_string()),
..Default::default()
};
let activity1 = WorkItem::ActivityExecute {
instance: "inst-cancel-delete".to_string(),
execution_id: 1,
id: 1,
name: "Activity1".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
let activity2 = WorkItem::ActivityExecute {
instance: "inst-cancel-delete".to_string(),
execution_id: 1,
id: 2,
name: "Activity2".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
let activity3 = WorkItem::ActivityExecute {
instance: "inst-cancel-delete".to_string(),
execution_id: 1,
id: 3,
name: "Activity3".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-cancel-delete".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![activity1, activity2, activity3],
vec![],
metadata,
vec![], )
.await
.unwrap();
let (_item1, token1, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.expect("Should have activity in queue");
provider.abandon_work_item(&token1, None, false).await.unwrap();
provider
.enqueue_for_orchestrator(
WorkItem::ExternalRaised {
instance: "inst-cancel-delete".to_string(),
name: "Trigger".to_string(),
data: "{}".to_string(),
},
None,
)
.await
.unwrap();
let (_item2, token2, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let cancelled = vec![
ScheduledActivityIdentifier {
instance: "inst-cancel-delete".to_string(),
execution_id: 1,
activity_id: 1,
},
ScheduledActivityIdentifier {
instance: "inst-cancel-delete".to_string(),
execution_id: 1,
activity_id: 2,
},
];
provider
.ack_orchestration_item(
&token2,
1,
vec![], vec![], vec![], ExecutionMetadata::default(),
cancelled,
)
.await
.unwrap();
let (remaining_item, _, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.expect("Should have activity 3 remaining");
match remaining_item {
WorkItem::ActivityExecute { id, .. } => {
assert_eq!(
id, 3,
"Only activity 3 should remain; activities 1 and 2 should be cancelled"
);
}
_ => panic!("Expected ActivityExecute"),
}
let no_more = provider
.fetch_work_item(Duration::from_millis(100), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
assert!(no_more.is_none(), "Should have no more activities");
tracing::info!("✓ Test passed: cancelled_activities deletes from worker queue");
}
pub async fn test_ack_work_item_fails_when_entry_deleted<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: ack_work_item fails when entry deleted (lock stolen)");
let provider = factory.create_provider().await;
let activity_item = WorkItem::ActivityExecute {
instance: "inst-ack-stolen".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider.enqueue_for_worker(activity_item).await.unwrap();
let (_, lock_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
provider.ack_work_item(&lock_token, None).await.unwrap();
let completion = WorkItem::ActivityCompleted {
instance: "inst-ack-stolen".to_string(),
execution_id: 1,
id: 1,
result: "done".to_string(),
};
let result = provider.ack_work_item(&lock_token, Some(completion)).await;
assert!(result.is_err(), "ack_work_item should fail when entry already deleted");
let err = result.unwrap_err();
assert!(
!err.is_retryable(),
"Error should be permanent (not retryable) for deleted entry: {err:?}"
);
tracing::info!("✓ Test passed: ack_work_item fails when entry deleted (lock stolen)");
}
pub async fn test_renew_fails_when_entry_deleted<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: renew fails when entry deleted (cancellation signal)");
let provider = factory.create_provider().await;
let activity_item = WorkItem::ActivityExecute {
instance: "inst-renew-stolen".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider.enqueue_for_worker(activity_item).await.unwrap();
let (_, lock_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
provider.ack_work_item(&lock_token, None).await.unwrap();
let result = provider
.renew_work_item_lock(&lock_token, Duration::from_secs(30))
.await;
assert!(result.is_err(), "renew_work_item_lock should fail when entry deleted");
tracing::info!("✓ Test passed: renew fails when entry deleted (cancellation signal)");
}
pub async fn test_cancelling_nonexistent_activities_is_idempotent<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: cancelling non-existent activities is idempotent");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-cancel-idempotent"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let cancelled = vec![
ScheduledActivityIdentifier {
instance: "inst-cancel-idempotent".to_string(),
execution_id: 1,
activity_id: 999, },
ScheduledActivityIdentifier {
instance: "inst-cancel-idempotent".to_string(),
execution_id: 1, activity_id: 1,
},
];
let result = provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-cancel-idempotent".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
..Default::default()
},
cancelled,
)
.await;
assert!(result.is_ok(), "Cancelling non-existent activities should not error");
tracing::info!("✓ Test passed: cancelling non-existent activities is idempotent");
}
pub async fn test_batch_cancellation_deletes_multiple_activities<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: batch cancellation deletes multiple activities atomically");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-batch-cancel"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Running".to_string()),
..Default::default()
};
let activities: Vec<WorkItem> = (1..=5)
.map(|i| WorkItem::ActivityExecute {
instance: "inst-batch-cancel".to_string(),
execution_id: 1,
id: i,
name: format!("Activity{i}"),
input: "{}".to_string(),
session_id: None,
tag: None,
})
.collect();
provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-batch-cancel".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
activities,
vec![],
metadata,
vec![],
)
.await
.unwrap();
provider
.enqueue_for_orchestrator(
WorkItem::ExternalRaised {
instance: "inst-batch-cancel".to_string(),
name: "Trigger".to_string(),
data: "{}".to_string(),
},
None,
)
.await
.unwrap();
let (_item2, token2, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let cancelled: Vec<ScheduledActivityIdentifier> = (1..=5)
.map(|i| ScheduledActivityIdentifier {
instance: "inst-batch-cancel".to_string(),
execution_id: 1,
activity_id: i,
})
.collect();
provider
.ack_orchestration_item(
&token2,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
cancelled,
)
.await
.unwrap();
let remaining = provider
.fetch_work_item(Duration::from_millis(100), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
assert!(
remaining.is_none(),
"All activities should be cancelled; worker queue should be empty"
);
tracing::info!("✓ Test passed: batch cancellation deletes multiple activities atomically");
}
pub async fn test_same_activity_in_worker_items_and_cancelled_is_noop<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: same activity in worker_items and cancelled_activities is no-op");
let provider = factory.create_provider().await;
provider
.enqueue_for_orchestrator(start_item("inst-schedule-then-cancel"), None)
.await
.unwrap();
let (_item, token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let metadata = ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
status: Some("Running".to_string()),
..Default::default()
};
let activity_id = 2u64; let scheduled_activity = WorkItem::ActivityExecute {
instance: "inst-schedule-then-cancel".to_string(),
execution_id: 1,
id: activity_id,
name: "DroppedActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
let cancelled_activity = ScheduledActivityIdentifier {
instance: "inst-schedule-then-cancel".to_string(),
execution_id: 1,
activity_id,
};
let normal_activity = WorkItem::ActivityExecute {
instance: "inst-schedule-then-cancel".to_string(),
execution_id: 1,
id: 3, name: "NormalActivity".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&token,
1,
vec![Event::with_event_id(
1,
"inst-schedule-then-cancel".to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![scheduled_activity, normal_activity], vec![],
metadata,
vec![cancelled_activity], )
.await
.expect("ack_orchestration_item should succeed");
let (remaining_item, remaining_token, _) = provider
.fetch_work_item(Duration::from_secs(30), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.expect("Should have the normal activity in queue");
match &remaining_item {
WorkItem::ActivityExecute { id, name, .. } => {
assert_eq!(
*id, 3,
"Only the normal activity (id=3) should remain; the scheduled-then-cancelled activity (id=2) should be gone"
);
assert_eq!(name, "NormalActivity");
}
_ => panic!("Expected ActivityExecute, got {remaining_item:?}"),
}
provider.ack_work_item(&remaining_token, None).await.unwrap();
let no_more = provider
.fetch_work_item(Duration::from_millis(100), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
assert!(
no_more.is_none(),
"Should have no more activities; the schedule-then-cancel activity should NOT be in queue"
);
tracing::info!("✓ Test passed: same activity in worker_items and cancelled_activities is no-op");
}
pub async fn test_orphan_activity_after_instance_force_deletion<F: ProviderFactory>(factory: &F) {
tracing::info!("→ Testing cancellation: orphan activity after instance force-deletion");
let provider = factory.create_provider().await;
let instance = "inst-orphan-activity";
provider
.enqueue_for_orchestrator(start_item(instance), None)
.await
.unwrap();
let (_item, orch_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let activity_item = WorkItem::ActivityExecute {
instance: instance.to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "test-input".to_string(),
session_id: None,
tag: None,
};
provider
.ack_orchestration_item(
&orch_token,
1,
vec![Event::with_event_id(
1,
instance.to_string(),
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![activity_item],
vec![],
ExecutionMetadata {
orchestration_name: Some("TestOrch".to_string()),
orchestration_version: Some("1.0.0".to_string()),
status: Some("Running".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
let mgmt = provider
.as_management_capability()
.expect("Provider should implement ProviderAdmin");
let delete_result = mgmt.delete_instance(instance, true).await.unwrap();
assert!(
delete_result.instances_deleted > 0,
"Force delete should remove the instance"
);
assert!(
mgmt.get_instance_info(instance).await.is_err(),
"Instance should no longer exist"
);
let work_item_result = provider
.fetch_work_item(Duration::from_secs(5), Duration::ZERO, None, &TagFilter::default())
.await
.unwrap();
if let Some((_item, worker_token, _)) = work_item_result {
tracing::info!("Activity survived instance deletion — executing and completing it");
let completion = WorkItem::ActivityCompleted {
instance: instance.to_string(),
execution_id: 1,
id: 1,
result: "orphan-result".to_string(),
};
provider.ack_work_item(&worker_token, Some(completion)).await.unwrap();
tracing::info!("Activity completed — completion is now orphaned in orchestrator queue");
} else {
tracing::info!("Force delete also cleaned up worker queue — activity was removed");
}
tracing::info!("✓ Test passed: orphan activity after instance force-deletion handled gracefully");
}