use super::dlq_reprocessor::*;
use crate::cook::execution::dlq::{
DLQFilter, DeadLetterQueue, DeadLetteredItem, ErrorType as DlqErrorType, FailureDetail,
WorktreeArtifacts,
};
use chrono::{Duration, Utc};
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
fn create_test_worktree_path(id: &str) -> PathBuf {
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let path = temp_dir.path().join(format!("worktree-{}", id));
std::mem::forget(temp_dir);
path
}
async fn create_test_dlq_with_items(
job_id: &str,
) -> anyhow::Result<(Arc<DeadLetterQueue>, TempDir)> {
let temp_dir = TempDir::new()?;
let dlq = Arc::new(
DeadLetterQueue::new(
job_id.to_string(),
temp_dir.path().to_path_buf(),
100,
30,
None,
)
.await?,
);
let item1 = DeadLetteredItem {
item_id: "test-item-1".to_string(),
item_data: json!({"id": 1, "priority": "high"}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 3,
failure_history: vec![FailureDetail {
attempt_number: 1,
timestamp: Utc::now(),
error_type: DlqErrorType::CommandFailed { exit_code: 1 },
error_message: "Test failure".to_string(),
error_context: None,
stack_trace: None,
agent_id: "agent-1".to_string(),
step_failed: "map".to_string(),
duration_ms: 100,
json_log_location: None,
}],
error_signature: "test-error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let item2 = DeadLetteredItem {
item_id: "test-item-2".to_string(),
item_data: json!({"id": 2, "priority": "low"}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 5,
failure_history: vec![],
error_signature: "test-error-2".to_string(),
worktree_artifacts: None,
reprocess_eligible: false,
manual_review_required: true,
};
dlq.add(item1).await?;
dlq.add(item2).await?;
Ok((dlq, temp_dir))
}
#[tokio::test]
async fn test_filter_evaluator_expressions() {
let item = DeadLetteredItem {
item_id: "test-1".to_string(),
item_data: json!({
"priority": "high",
"score": 10,
"name": "test item"
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 3,
failure_history: vec![],
error_signature: "test".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let test_cases = vec![
("item.priority == 'high'", true),
("item.priority == 'low'", false),
("item.priority != 'low'", true),
("item.score >= 10", true),
("item.score > 10", false),
("item.score < 20", true),
("item.failure_count >= 3", true),
("item.failure_count > 5", false),
("item.reprocess_eligible == true", true),
("item.reprocess_eligible == false", false),
("item.name contains 'test'", true),
("item.name contains 'xyz'", false),
];
for (expression, expected) in test_cases {
let evaluator = FilterEvaluator::new(expression.to_string());
assert_eq!(
evaluator.matches(&item),
expected,
"Failed for expression: {}",
expression
);
}
}
#[tokio::test]
async fn test_concurrent_reprocessing_prevention() {
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-3").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = Arc::new(DlqReprocessor::new(dlq.clone(), None, project_root.clone()));
reprocessor
.acquire_reprocessing_lock("test-job-3")
.await
.unwrap();
let result = reprocessor.acquire_reprocessing_lock("test-job-3").await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("already being reprocessed"));
reprocessor.release_reprocessing_lock("test-job-3").await;
let result = reprocessor.acquire_reprocessing_lock("test-job-3").await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_clear_processed_items() {
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-4").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = DlqReprocessor::new(dlq.clone(), None, project_root);
let count = reprocessor
.clear_processed_items("test-job-4")
.await
.unwrap();
assert_eq!(count, 1);
let remaining = dlq.list_items(DLQFilter::default()).await.unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].item_id, "test-item-1");
}
#[tokio::test]
async fn test_retry_strategy_delays() {
use std::time::Instant;
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-5").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = DlqReprocessor::new(dlq, None, project_root);
let start = Instant::now();
reprocessor
.apply_retry_delay(&RetryStrategy::Immediate, 2)
.await;
assert!(start.elapsed().as_millis() < 10);
let start = Instant::now();
reprocessor
.apply_retry_delay(&RetryStrategy::FixedDelay { delay_ms: 50 }, 2)
.await;
assert!(start.elapsed().as_millis() >= 50);
let start = Instant::now();
reprocessor
.apply_retry_delay(&RetryStrategy::ExponentialBackoff, 3)
.await;
assert!(start.elapsed().as_millis() >= 4000);
}
#[tokio::test]
async fn test_global_stats() {
let temp_dir = TempDir::new().unwrap();
let dlq_dir = temp_dir.path().join(".prodigy").join("dlq");
std::fs::create_dir_all(&dlq_dir).unwrap();
let dlq_file = dlq_dir.join("test-job-6.json");
std::fs::write(&dlq_file, "{}").unwrap();
let (_dlq, _) = create_test_dlq_with_items("test-job-6").await.unwrap();
let project_root = temp_dir.path().to_path_buf();
let test_dlq = Arc::new(
DeadLetterQueue::new(
"test-job-6".to_string(),
project_root.clone(),
100,
30,
None,
)
.await
.unwrap(),
);
let item1 = DeadLetteredItem {
item_id: "test-item-1".to_string(),
item_data: json!({"id": 1, "priority": "high"}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 3,
failure_history: vec![],
error_signature: "test-error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let item2 = DeadLetteredItem {
item_id: "test-item-2".to_string(),
item_data: json!({"id": 2, "priority": "normal"}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 1,
failure_history: vec![],
error_signature: "test-error-2".to_string(),
worktree_artifacts: None,
reprocess_eligible: false,
manual_review_required: true,
};
test_dlq.add(item1).await.unwrap();
test_dlq.add(item2).await.unwrap();
let dlq_stats = test_dlq.get_stats().await.unwrap();
assert_eq!(dlq_stats.total_items, 2);
assert_eq!(dlq_stats.eligible_for_reprocess, 1);
assert_eq!(dlq_stats.requiring_manual_review, 1);
assert!(dlq_stats.oldest_item.is_some());
assert!(dlq_stats.newest_item.is_some());
assert!(!dlq_stats.error_categories.is_empty());
}
#[tokio::test]
async fn test_advanced_filter_error_types() {
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-7").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = DlqReprocessor::new(dlq.clone(), None, project_root);
let timeout_item = DeadLetteredItem {
item_id: "timeout-item".to_string(),
item_data: json!({"id": 10}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 1,
failure_history: vec![],
error_signature: "timeout occurred during execution".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let validation_item = DeadLetteredItem {
item_id: "validation-item".to_string(),
item_data: json!({"id": 11}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 2,
failure_history: vec![],
error_signature: "validation failed for input".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let items = vec![timeout_item, validation_item];
let filter = DlqFilterAdvanced {
error_types: Some(vec![ErrorType::Timeout]),
date_range: None,
item_filter: None,
max_failure_count: None,
};
let filtered = reprocessor
.apply_advanced_filter(items.clone(), &filter)
.unwrap();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].item_id, "timeout-item");
let filter = DlqFilterAdvanced {
error_types: Some(vec![ErrorType::Validation]),
date_range: None,
item_filter: None,
max_failure_count: None,
};
let filtered = reprocessor
.apply_advanced_filter(items.clone(), &filter)
.unwrap();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].item_id, "validation-item");
}
#[tokio::test]
async fn test_advanced_filter_date_range() {
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-8").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = DlqReprocessor::new(dlq.clone(), None, project_root);
let now = Utc::now();
let old_item = DeadLetteredItem {
item_id: "old-item".to_string(),
item_data: json!({"id": 20}),
first_attempt: now - Duration::days(5),
last_attempt: now - Duration::days(5),
failure_count: 1,
failure_history: vec![],
error_signature: "old error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let recent_item = DeadLetteredItem {
item_id: "recent-item".to_string(),
item_data: json!({"id": 21}),
first_attempt: now - Duration::hours(1),
last_attempt: now - Duration::hours(1),
failure_count: 1,
failure_history: vec![],
error_signature: "recent error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let items = vec![old_item, recent_item];
let filter = DlqFilterAdvanced {
error_types: None,
date_range: Some(DateRange {
start: now - Duration::days(2),
end: now,
}),
item_filter: None,
max_failure_count: None,
};
let filtered = reprocessor.apply_advanced_filter(items, &filter).unwrap();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].item_id, "recent-item");
}
#[tokio::test]
async fn test_advanced_filter_failure_count() {
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-9").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = DlqReprocessor::new(dlq.clone(), None, project_root);
let low_failure_item = DeadLetteredItem {
item_id: "low-failure".to_string(),
item_data: json!({"id": 30}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 2,
failure_history: vec![],
error_signature: "error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let high_failure_item = DeadLetteredItem {
item_id: "high-failure".to_string(),
item_data: json!({"id": 31}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 10,
failure_history: vec![],
error_signature: "error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
let items = vec![low_failure_item, high_failure_item];
let filter = DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: None,
max_failure_count: Some(5),
};
let filtered = reprocessor.apply_advanced_filter(items, &filter).unwrap();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].item_id, "low-failure");
}
#[tokio::test]
async fn test_reprocess_items_basic() {
let (dlq, _temp_dir) = create_test_dlq_with_items("test-job-10").await.unwrap();
let project_root = PathBuf::from(".");
let reprocessor = DlqReprocessor::new(dlq.clone(), None, project_root);
let options = ReprocessOptions {
max_retries: 1,
filter: None,
parallel: 2,
timeout_per_item: 10,
strategy: RetryStrategy::Immediate,
merge_results: false,
force: true,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
assert!(result.total_items > 0);
assert_eq!(
result.total_items,
result.successful + result.failed + result.skipped
);
assert!(!result.job_id.is_empty());
}
#[tokio::test]
async fn test_integration_end_to_end_dlq_retry_with_mock_failures() {
let temp_dir = TempDir::new().unwrap();
let job_id = "test-job-integration";
let dlq = Arc::new(
DeadLetterQueue::new(
job_id.to_string(),
temp_dir.path().to_path_buf(),
100,
30,
None,
)
.await
.unwrap(),
);
let items: Vec<DeadLetteredItem> = (0..20)
.map(|i| {
let failure_type = match i % 4 {
0 => DlqErrorType::CommandFailed { exit_code: 1 },
1 => DlqErrorType::Timeout,
2 => DlqErrorType::ValidationFailed,
_ => DlqErrorType::Unknown,
};
DeadLetteredItem {
item_id: format!("item-{}", i),
item_data: json!({
"id": i,
"priority": if i % 3 == 0 { "high" } else { "normal" },
"type": format!("type-{}", i % 5),
}),
first_attempt: Utc::now() - Duration::hours(i as i64),
last_attempt: Utc::now() - Duration::minutes(i as i64 * 10),
failure_count: (i % 10) + 1,
failure_history: vec![FailureDetail {
attempt_number: 1,
timestamp: Utc::now(),
error_type: failure_type,
error_message: format!("Mock failure for item {}", i),
error_context: None,
stack_trace: None,
agent_id: format!("agent-{}", i % 3),
step_failed: if i % 2 == 0 { "map" } else { "reduce" }.to_string(),
duration_ms: 100 + i as u64 * 10,
json_log_location: None,
}],
error_signature: format!("error-sig-{}", i % 7),
worktree_artifacts: if i % 5 == 0 {
Some(WorktreeArtifacts {
worktree_path: create_test_worktree_path(&i.to_string()),
branch_name: format!("branch-{}", i),
uncommitted_changes: None,
error_logs: None,
})
} else {
None
},
reprocess_eligible: i % 2 == 0, manual_review_required: i % 5 == 0,
}
})
.collect();
for item in &items {
dlq.add(item.clone()).await.unwrap();
}
let project_root = PathBuf::from(".");
let reprocessor = Arc::new(DlqReprocessor::new(dlq.clone(), None, project_root.clone()));
let options = ReprocessOptions {
max_retries: 2,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.priority == 'high'".to_string()),
max_failure_count: None,
}),
parallel: 3,
timeout_per_item: 30,
strategy: RetryStrategy::FixedDelay { delay_ms: 100 },
merge_results: true,
force: false,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
assert!(result.total_items > 0);
assert!(result.total_items <= 20);
let options = ReprocessOptions {
max_retries: 3,
filter: None,
parallel: 5,
timeout_per_item: 60,
strategy: RetryStrategy::ExponentialBackoff,
merge_results: false,
force: true, };
let result = reprocessor.reprocess_items(options).await.unwrap();
assert!(result.total_items > 0);
let options = ReprocessOptions {
max_retries: 1,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.failure_count < 5".to_string()),
max_failure_count: None,
}),
parallel: 10, timeout_per_item: 20,
strategy: RetryStrategy::Immediate,
merge_results: true,
force: false,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
assert!(result.duration.as_secs() < 60);
let interrupt_job_id = "test-interrupt-job";
let interrupt_dlq = Arc::new(
DeadLetterQueue::new(
interrupt_job_id.to_string(),
temp_dir.path().to_path_buf(),
100,
30,
None,
)
.await
.unwrap(),
);
for i in 0..10 {
let item = DeadLetteredItem {
item_id: format!("interrupt-item-{}", i),
item_data: json!({"id": i, "test": "interrupt"}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 1,
failure_history: vec![],
error_signature: "interrupt-test".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
};
interrupt_dlq.add(item).await.unwrap();
}
let interrupt_reprocessor = Arc::new(DlqReprocessor::new(
interrupt_dlq.clone(),
None,
project_root.clone(),
));
let interrupt_reprocessor_2 = Arc::new(DlqReprocessor::new(
interrupt_dlq.clone(),
None,
project_root.clone(),
));
let options = ReprocessOptions {
max_retries: 1,
filter: None,
parallel: 2,
timeout_per_item: 10,
strategy: RetryStrategy::Immediate,
merge_results: false,
force: true,
};
let result = interrupt_reprocessor_2
.reprocess_items(options)
.await
.unwrap();
assert!(result.total_items > 0);
interrupt_reprocessor
.acquire_reprocessing_lock(interrupt_job_id)
.await
.unwrap();
let lock_result = interrupt_reprocessor
.acquire_reprocessing_lock(interrupt_job_id)
.await;
assert!(lock_result.is_err());
assert!(lock_result
.unwrap_err()
.to_string()
.contains("already being reprocessed"));
interrupt_reprocessor
.release_reprocessing_lock(interrupt_job_id)
.await;
interrupt_reprocessor
.acquire_reprocessing_lock(interrupt_job_id)
.await
.unwrap();
}
#[tokio::test]
async fn test_integration_complex_filter_scenarios() {
let temp_dir = TempDir::new().unwrap();
let job_id = "test-job-complex-filter";
let dlq = Arc::new(
DeadLetterQueue::new(
job_id.to_string(),
temp_dir.path().to_path_buf(),
100,
30,
None,
)
.await
.unwrap(),
);
let complex_items = vec![
DeadLetteredItem {
item_id: "complex-1".to_string(),
item_data: json!({
"user": {
"id": 1,
"name": "Alice",
"score": 95,
"tags": ["premium", "active"]
},
"metadata": {
"region": "US",
"tier": "gold"
}
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 2,
failure_history: vec![],
error_signature: "auth-error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
},
DeadLetteredItem {
item_id: "complex-2".to_string(),
item_data: json!({
"user": {
"id": 2,
"name": "Bob",
"score": 60,
"tags": ["free", "inactive"]
},
"metadata": {
"region": "EU",
"tier": "silver"
}
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 5,
failure_history: vec![],
error_signature: "timeout-error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
},
DeadLetteredItem {
item_id: "complex-3".to_string(),
item_data: json!({
"user": {
"id": 3,
"name": "Charlie",
"score": 75,
"tags": ["premium", "inactive"]
},
"metadata": {
"region": "US",
"tier": "bronze"
}
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 1,
failure_history: vec![],
error_signature: "validation-error".to_string(),
worktree_artifacts: None,
reprocess_eligible: false,
manual_review_required: true,
},
];
for item in &complex_items {
dlq.add(item.clone()).await.unwrap();
}
let project_root = PathBuf::from(".");
let reprocessor = Arc::new(DlqReprocessor::new(dlq.clone(), None, project_root));
let options = ReprocessOptions {
max_retries: 1,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.user.score > 70".to_string()),
max_failure_count: None,
}),
parallel: 2,
timeout_per_item: 30,
strategy: RetryStrategy::Immediate,
merge_results: true,
force: true,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
assert_eq!(result.total_items, 2);
let options = ReprocessOptions {
max_retries: 1,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.metadata.region == 'US' && item.failure_count < 3".to_string()),
max_failure_count: None,
}),
parallel: 2,
timeout_per_item: 30,
strategy: RetryStrategy::Immediate,
merge_results: true,
force: true,
};
let _result = reprocessor.reprocess_items(options).await.unwrap();
}
#[tokio::test]
async fn test_performance_large_dlq_processing() {
use std::time::Instant;
let temp_dir = TempDir::new().unwrap();
let job_id = "test-job-performance";
let dlq = Arc::new(
DeadLetterQueue::new(
job_id.to_string(),
temp_dir.path().to_path_buf(),
10000, 30,
None,
)
.await
.unwrap(),
);
let large_item_count = 1000;
for batch in 0..(large_item_count / 100) {
let batch_items: Vec<DeadLetteredItem> = (0..100)
.map(|i| {
let item_idx = batch * 100 + i;
DeadLetteredItem {
item_id: format!("perf-item-{}", item_idx),
item_data: json!({
"id": item_idx,
"data": format!("test-data-{}", item_idx),
"batch": batch,
"priority": if item_idx % 10 == 0 { "high" } else { "normal" },
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: (item_idx % 5) + 1,
failure_history: vec![],
error_signature: format!("error-{}", item_idx % 20),
worktree_artifacts: None,
reprocess_eligible: item_idx % 4 != 0, manual_review_required: false,
}
})
.collect();
for item in batch_items {
dlq.add(item).await.unwrap();
}
}
let project_root = PathBuf::from(".");
let reprocessor = Arc::new(DlqReprocessor::new(dlq.clone(), None, project_root));
let start_memory_test = Instant::now();
let options = ReprocessOptions {
max_retries: 1,
filter: None,
parallel: 20, timeout_per_item: 10,
strategy: RetryStrategy::Immediate,
merge_results: false, force: false, };
let result = reprocessor.reprocess_items(options).await.unwrap();
let memory_test_duration = start_memory_test.elapsed();
assert!(result.total_items > 0); assert!(memory_test_duration.as_secs() < 120);
let parallel_configs = vec![1, 5, 10, 20];
let mut scaling_results = Vec::new();
for parallel_count in parallel_configs {
let start = Instant::now();
let options = ReprocessOptions {
max_retries: 1,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.priority == 'high'".to_string()),
max_failure_count: None,
}), parallel: parallel_count,
timeout_per_item: 10,
strategy: RetryStrategy::Immediate,
merge_results: false,
force: true,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
let duration = start.elapsed();
scaling_results.push((parallel_count, duration.as_millis(), result.total_items));
}
let single_thread_time = scaling_results[0].1;
let multi_thread_time = scaling_results[3].1;
if single_thread_time == 0 && multi_thread_time == 0 {
return;
}
let single_thread_time = single_thread_time.max(1);
let performance_ratio = multi_thread_time as f64 / single_thread_time as f64;
assert!(
performance_ratio < 3.0,
"Parallel execution severely degraded: single={} ms, multi={} ms, ratio={:.2}",
single_thread_time,
multi_thread_time,
performance_ratio
);
let filter_start = Instant::now();
let complex_filter = "item.failure_count <= 3 && item.batch >= 5";
let options = ReprocessOptions {
max_retries: 1,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some(complex_filter.to_string()),
max_failure_count: None,
}),
parallel: 10,
timeout_per_item: 10,
strategy: RetryStrategy::Immediate,
merge_results: false,
force: true,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
let filter_duration = filter_start.elapsed();
assert!(result.total_items > 0);
assert!(filter_duration.as_secs() < 60);
let cleanup_start = Instant::now();
let cleaned = reprocessor.clear_processed_items(job_id).await.unwrap();
let cleanup_duration = cleanup_start.elapsed();
assert!(cleaned > 0);
assert!(cleanup_duration.as_secs() < 30); }
#[tokio::test]
async fn test_error_recovery_and_resilience() {
let temp_dir = TempDir::new().unwrap();
let job_id = "test-job-resilience";
let dlq = Arc::new(
DeadLetterQueue::new(
job_id.to_string(),
temp_dir.path().to_path_buf(),
100,
30,
None,
)
.await
.unwrap(),
);
let error_items = vec![
DeadLetteredItem {
item_id: "poison-pill".to_string(),
item_data: json!({
"type": "poison",
"should_fail": true,
"error_code": "FATAL"
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 100, failure_history: vec![],
error_signature: "fatal-error".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: true,
},
DeadLetteredItem {
item_id: "recoverable".to_string(),
item_data: json!({
"type": "transient",
"retry_count": 0
}),
first_attempt: Utc::now(),
last_attempt: Utc::now(),
failure_count: 2,
failure_history: vec![],
error_signature: "network-timeout".to_string(),
worktree_artifacts: None,
reprocess_eligible: true,
manual_review_required: false,
},
];
for item in &error_items {
dlq.add(item.clone()).await.unwrap();
}
let project_root = PathBuf::from(".");
let reprocessor = Arc::new(DlqReprocessor::new(dlq.clone(), None, project_root));
let options = ReprocessOptions {
max_retries: 3,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.failure_count > 50".to_string()),
max_failure_count: None,
}),
parallel: 1,
timeout_per_item: 5, strategy: RetryStrategy::ExponentialBackoff,
merge_results: true,
force: true,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
assert!(result.total_items > 0);
let options = ReprocessOptions {
max_retries: 5,
filter: Some(DlqFilterAdvanced {
error_types: None,
date_range: None,
item_filter: Some("item.error_signature contains 'timeout'".to_string()),
max_failure_count: None,
}),
parallel: 2,
timeout_per_item: 30,
strategy: RetryStrategy::FixedDelay { delay_ms: 500 },
merge_results: true,
force: false,
};
let result = reprocessor.reprocess_items(options).await.unwrap();
assert!(result.total_items > 0);
}