#[cfg(test)]
mod tests {
use crate::cook::execution::mapreduce::{AgentResult, AgentStatus, MapReduceConfig};
use crate::cook::execution::state::{
CheckpointManager, DefaultJobStateManager, JobStateManager, MapReduceJobState,
};
use serde_json::json;
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
/// A freshly created `MapReduceJobState` starts with every work item
/// pending, zero progress counters, and deterministic positional
/// `item_<index>` identifiers.
///
/// The body never awaits, so this runs as a plain synchronous test instead
/// of spinning up a Tokio runtime per invocation.
#[test]
fn test_job_state_creation() {
    let config = MapReduceConfig {
        agent_timeout_secs: None,
        continue_on_failure: false,
        batch_size: None,
        enable_checkpoints: true,
        input: "test.json".to_string(),
        json_path: String::new(),
        max_parallel: 5,
        max_items: Some(10),
        offset: None,
    };
    let work_items = vec![
        json!({"id": 1, "name": "item1"}),
        json!({"id": 2, "name": "item2"}),
        json!({"id": 3, "name": "item3"}),
    ];
    // `config` and `work_items` are not used again, so they can be moved
    // directly instead of cloned.
    let state = MapReduceJobState::new("test-job-123".to_string(), config, work_items);
    assert_eq!(state.job_id, "test-job-123");
    assert_eq!(state.total_items, 3);
    assert_eq!(state.pending_items.len(), 3);
    // Nothing has run yet: both outcome counters are zero and the job is open.
    assert_eq!(state.successful_count, 0);
    assert_eq!(state.failed_count, 0);
    assert!(!state.is_complete);
    // Item ids are generated positionally as `item_<index>`.
    for id in ["item_0", "item_1", "item_2"] {
        assert!(state.pending_items.contains(&id.to_string()));
    }
}
/// `find_work_item` resolves an `item_<index>` id back to the original JSON
/// work item, and returns `None` for out-of-range indices or ids that do
/// not follow the `item_N` naming scheme.
///
/// Pure in-memory lookups with no awaits, so a synchronous test suffices.
#[test]
fn test_find_work_item() {
    let config = MapReduceConfig {
        agent_timeout_secs: None,
        continue_on_failure: false,
        batch_size: None,
        enable_checkpoints: true,
        input: "test.json".to_string(),
        json_path: String::new(),
        max_parallel: 5,
        max_items: None,
        offset: None,
    };
    let work_items = vec![
        json!({"id": 1, "data": "test1"}),
        json!({"id": 2, "data": "test2"}),
        json!({"id": 3, "data": "test3"}),
    ];
    let state = MapReduceJobState::new("test-job".to_string(), config, work_items);
    // First and last items resolve to their original JSON payloads.
    let item = state.find_work_item("item_0");
    assert!(item.is_some());
    assert_eq!(item.unwrap()["id"], 1);
    let item = state.find_work_item("item_2");
    assert!(item.is_some());
    assert_eq!(item.unwrap()["id"], 3);
    // An index past the end misses.
    assert!(state.find_work_item("item_10").is_none());
    // A non-`item_N` id misses as well.
    assert!(state.find_work_item("invalid").is_none());
}
#[tokio::test]
async fn test_resume_with_partial_completion() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1}),
json!({"id": 2}),
json!({"id": 3}),
json!({"id": 4}),
];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec![],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("Error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state.successful_count, 1);
assert_eq!(state.failed_count, 1);
assert_eq!(state.completed_agents.len(), 2);
assert_eq!(state.pending_items.len(), 2); assert!(!state.is_complete);
let retriable = state.get_retriable_items(2);
assert_eq!(retriable.len(), 1); assert!(retriable.contains(&"item_1".to_string()));
}
/// `update_agent_result` moves an item from pending to completed, bumps the
/// matching success/failure counter, records failures in `failed_agents`,
/// and increments the checkpoint version once per applied result.
///
/// Pure in-memory state manipulation — no awaits — so this runs as a plain
/// synchronous test without a Tokio runtime.
#[test]
fn test_agent_result_update() {
    let config = MapReduceConfig {
        agent_timeout_secs: None,
        continue_on_failure: false,
        batch_size: None,
        enable_checkpoints: true,
        input: "test.json".to_string(),
        json_path: String::new(),
        max_parallel: 5,
        max_items: None,
        offset: None,
    };
    let work_items = vec![json!({"id": 1}), json!({"id": 2})];
    let mut state = MapReduceJobState::new("test-job".to_string(), config, work_items);
    // A successful result carrying commits and worktree metadata.
    let success_result = AgentResult {
        item_id: "item_0".to_string(),
        status: AgentStatus::Success,
        output: Some("Success output".to_string()),
        commits: vec!["abc123".to_string()],
        duration: Duration::from_secs(5),
        error: None,
        worktree_path: Some(PathBuf::from("<test-worktree-path>")),
        branch_name: Some("feature-1".to_string()),
        worktree_session_id: Some("session-1".to_string()),
        files_modified: vec!["file1.rs".to_string()],
        json_log_location: None,
        cleanup_status: None,
    };
    // The result is not used again, so it is moved rather than cloned.
    state.update_agent_result(success_result);
    assert_eq!(state.successful_count, 1);
    assert_eq!(state.failed_count, 0);
    assert!(state.completed_agents.contains("item_0"));
    assert!(!state.pending_items.contains(&"item_0".to_string()));
    assert_eq!(state.agent_results.len(), 1);
    // Each applied result bumps the checkpoint version by one.
    assert_eq!(state.checkpoint_version, 1);
    // A failed result for the remaining item.
    let failed_result = AgentResult {
        item_id: "item_1".to_string(),
        status: AgentStatus::Failed("Test error".to_string()),
        output: None,
        commits: vec![],
        duration: Duration::from_secs(2),
        error: Some("Test error".to_string()),
        worktree_path: None,
        branch_name: None,
        worktree_session_id: None,
        files_modified: vec![],
        json_log_location: None,
        cleanup_status: None,
    };
    state.update_agent_result(failed_result);
    assert_eq!(state.successful_count, 1);
    assert_eq!(state.failed_count, 1);
    // Failures still count as completed, but are also tracked in `failed_agents`.
    assert!(state.completed_agents.contains("item_1"));
    assert!(!state.pending_items.contains(&"item_1".to_string()));
    assert_eq!(state.failed_agents.len(), 1);
    assert!(state.failed_agents.contains_key("item_1"));
    assert_eq!(state.checkpoint_version, 2);
}
#[tokio::test]
async fn test_checkpoint_persistence() {
// Saving a checkpoint and loading it back by job id must round-trip the
// full job state: counters, completed agents, pending items, and version.
let temp_dir = TempDir::new().unwrap();
let checkpoint_manager = CheckpointManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1, "data": "test1"}),
json!({"id": 2, "data": "test2"}),
json!({"id": 3, "data": "test3"}),
];
let mut state = MapReduceJobState::new("checkpoint-test".to_string(), config, work_items);
// Apply one success and one failure before saving, so the checkpoint
// carries non-trivial counters.
state.update_agent_result(AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("output1".to_string()),
commits: vec!["commit1".to_string()],
duration: Duration::from_secs(3),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
});
state.update_agent_result(AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
});
checkpoint_manager.save_checkpoint(&state).await.unwrap();
// Reload by job id and verify the persisted state matches what was saved.
let loaded_state = checkpoint_manager
.load_checkpoint("checkpoint-test")
.await
.unwrap();
assert_eq!(loaded_state.job_id, state.job_id);
assert_eq!(loaded_state.total_items, state.total_items);
assert_eq!(loaded_state.successful_count, 1);
assert_eq!(loaded_state.failed_count, 1);
assert_eq!(loaded_state.completed_agents.len(), 2);
// Only item_2 was never processed, so exactly one item remains pending.
assert_eq!(loaded_state.pending_items.len(), 1);
assert_eq!(loaded_state.checkpoint_version, state.checkpoint_version);
}
#[tokio::test]
async fn test_checkpoint_cleanup() {
// Saving more checkpoints than the retention limit should prune the old
// ones: at most three are kept and the newest version is listed first.
let temp_dir = TempDir::new().unwrap();
let checkpoint_manager = CheckpointManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let mut state = MapReduceJobState::new("cleanup-test".to_string(), config, vec![]);
// Write five checkpoint versions (0..=4) for the same job.
for i in 0..5 {
state.checkpoint_version = i;
checkpoint_manager.save_checkpoint(&state).await.unwrap();
}
let checkpoints = checkpoint_manager
.list_checkpoints("cleanup-test")
.await
.unwrap();
// Retention policy: keep at most 3 checkpoints; the most recent (v4)
// should be first in the listing.
assert!(checkpoints.len() <= 3);
if !checkpoints.is_empty() {
assert_eq!(checkpoints[0].version, 4);
}
}
#[tokio::test]
async fn test_job_state_manager_lifecycle() {
// End-to-end lifecycle through the manager: create a job, record a map
// result, run the reduce phase, then clean up — after which the job's
// state is no longer retrievable.
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
let job_id = manager
.create_job(config.clone(), work_items.clone(), vec![], None)
.await
.unwrap();
// Generated job ids carry a fixed prefix.
assert!(job_id.starts_with("mapreduce-"));
let state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state.total_items, 3);
assert_eq!(state.successful_count, 0);
let result1 = AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("output".to_string()),
commits: vec![],
duration: Duration::from_secs(2),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
};
manager.update_agent_result(&job_id, result1).await.unwrap();
let updated_state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(updated_state.successful_count, 1);
assert_eq!(updated_state.completed_agents.len(), 1);
// Reduce phase: starting records it, completing marks the job done.
manager.start_reduce_phase(&job_id).await.unwrap();
let state_with_reduce = manager.get_job_state(&job_id).await.unwrap();
assert!(state_with_reduce.reduce_phase_state.is_some());
assert!(state_with_reduce.reduce_phase_state.unwrap().started);
manager
.complete_reduce_phase(&job_id, Some("reduce output".to_string()))
.await
.unwrap();
let final_state = manager.get_job_state(&job_id).await.unwrap();
assert!(final_state.is_complete);
assert!(final_state.reduce_phase_state.unwrap().completed);
// After cleanup the job state is gone and lookups fail.
manager.cleanup_job(&job_id).await.unwrap();
assert!(manager.get_job_state(&job_id).await.is_err());
}
#[tokio::test]
async fn test_resume_job_functionality() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1}),
json!({"id": 2}),
json!({"id": 3}),
json!({"id": 4}),
];
let job_id = manager
.create_job(config, work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("result1".to_string()),
commits: vec![],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let results = manager.resume_job(&job_id).await.unwrap();
assert_eq!(results.len(), 2);
let state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state.completed_agents.len(), 2);
assert_eq!(state.pending_items.len(), 2); assert_eq!(state.successful_count, 1);
assert_eq!(state.failed_count, 1);
let retriable = state.get_retriable_items(3);
assert_eq!(retriable.len(), 1); assert!(retriable.contains(&"item_1".to_string()));
}
#[tokio::test]
async fn test_atomic_checkpoint_writes() {
// After a save, no intermediate `*.tmp` files may remain in the job
// directory and the final checkpoint + metadata files must exist.
// NOTE(review): this assumes save_checkpoint stages writes to a `.tmp`
// file and renames it into place — confirm against CheckpointManager.
let temp_dir = TempDir::new().unwrap();
let checkpoint_manager = CheckpointManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let state =
MapReduceJobState::new("atomic-test".to_string(), config, vec![json!({"id": 1})]);
checkpoint_manager.save_checkpoint(&state).await.unwrap();
// Checkpoints for a job live under <root>/jobs/<job_id>/.
let job_dir = temp_dir.path().join("jobs").join("atomic-test");
// Collect any leftover staging files with a `.tmp` extension.
let temp_files: Vec<_> = std::fs::read_dir(&job_dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| e.path().extension() == Some(std::ffi::OsStr::new("tmp")))
.collect();
assert_eq!(temp_files.len(), 0, "No temp files should remain");
// The versioned checkpoint and job metadata are the durable artifacts.
assert!(job_dir.join("checkpoint-v0.json").exists());
assert!(job_dir.join("metadata.json").exists());
}
/// Walks the full phase lifecycle on in-memory state: the map phase is
/// complete once every item has a result; the reduce phase can then be
/// started and completed, which marks the whole job complete.
///
/// Pure state manipulation with no awaits, so a synchronous test suffices.
#[test]
fn test_phase_completion_tracking() {
    // Successful result differing only by item id; all other fields fixed.
    fn success(item_id: &str) -> AgentResult {
        AgentResult {
            item_id: item_id.to_string(),
            status: AgentStatus::Success,
            output: Some("output".to_string()),
            commits: vec![],
            duration: Duration::from_secs(1),
            error: None,
            worktree_path: None,
            branch_name: None,
            worktree_session_id: None,
            files_modified: vec![],
            json_log_location: None,
            cleanup_status: None,
        }
    }
    let config = MapReduceConfig {
        agent_timeout_secs: None,
        continue_on_failure: false,
        batch_size: None,
        enable_checkpoints: true,
        input: "test.json".to_string(),
        json_path: String::new(),
        max_parallel: 5,
        max_items: None,
        offset: None,
    };
    let work_items = vec![json!({"id": 1}), json!({"id": 2})];
    let mut state = MapReduceJobState::new("phase-test".to_string(), config, work_items);
    // Map phase is incomplete until every item has a result.
    assert!(!state.is_map_phase_complete());
    state.update_agent_result(success("item_0"));
    state.update_agent_result(success("item_1"));
    assert!(state.is_map_phase_complete());
    assert_eq!(state.pending_items.len(), 0);
    assert_eq!(state.completed_agents.len(), 2);
    // Starting the reduce phase records it as started but not completed.
    state.start_reduce_phase();
    let reduce = state.reduce_phase_state.as_ref().unwrap();
    assert!(reduce.started);
    assert!(!reduce.completed);
    // Completing the reduce phase stores its output and finishes the job.
    state.complete_reduce_phase(Some("reduce output".to_string()));
    let reduce = state.reduce_phase_state.as_ref().unwrap();
    assert!(reduce.completed);
    assert_eq!(reduce.output, Some("reduce output".to_string()));
    assert!(state.is_complete);
}
#[tokio::test]
async fn test_resume_with_modified_max_parallel() {
// Verifies the persisted job keeps its original max_parallel and that
// ResumeOptions can express an override.
use crate::cook::execution::mapreduce::ResumeOptions;
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1}),
json!({"id": 2}),
json!({"id": 3}),
json!({"id": 4}),
json!({"id": 5}),
];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
// The stored job state retains the max_parallel it was created with.
let initial_state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(initial_state.config.max_parallel, 5);
let resume_options = ResumeOptions {
reprocess_failed: false,
max_parallel: None,
skip_validation: false,
agent_timeout_secs: None,
};
// NOTE(review): the assertions below only restate the literals assigned
// above, and `resume_options` is never passed to a resume call — the
// "modified max_parallel" path is not actually exercised. TODO: feed the
// options into the resume API and assert their effect on the job.
assert!(!resume_options.reprocess_failed);
assert_eq!(resume_options.max_parallel, None);
assert!(!resume_options.skip_validation);
}
#[tokio::test]
async fn test_resume_with_timeout_override() {
// Intended to cover resuming with an agent-timeout override.
use crate::cook::execution::mapreduce::ResumeOptions;
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![json!({"id": 1}), json!({"id": 2})];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
// Loading the state proves the job was persisted; the value itself is
// not inspected further.
let _initial_state = manager.get_job_state(&job_id).await.unwrap();
let resume_options = ResumeOptions {
reprocess_failed: false,
max_parallel: None,
skip_validation: false,
agent_timeout_secs: None,
};
// NOTE(review): despite the test name, no timeout override is actually
// set (`agent_timeout_secs` stays None) and the options are never used
// in a resume call. TODO: wire a Some(..) timeout through the resume API
// and assert it takes effect.
assert!(!resume_options.reprocess_failed);
assert_eq!(resume_options.max_parallel, None);
}
#[tokio::test]
async fn test_resume_with_additional_retries() {
// Intended to cover resuming a job that has a failed item with extra
// retry capacity.
use crate::cook::execution::mapreduce::ResumeOptions;
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![json!({"id": 1}), json!({"id": 2})];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
// Record a failure so the job has something to retry on resume.
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Failed("Error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let _state = manager.get_job_state(&job_id).await.unwrap();
let resume_options = ResumeOptions {
reprocess_failed: false,
max_parallel: Some(2),
skip_validation: false,
agent_timeout_secs: None,
};
// NOTE(review): the assertion only restates the literal assigned above,
// and no retry-related option exists on ResumeOptions here — the
// "additional retries" behavior in the test name is not exercised. TODO:
// pass the options to a resume call and assert the retry outcome.
assert_eq!(resume_options.max_parallel, Some(2));
}
#[tokio::test]
async fn test_resume_with_force_retry_all() {
// With a retry budget of 0 a failed item is not retriable; the
// `reprocess_failed` flag on ResumeOptions is the escape hatch that
// forces failed items to be reprocessed anyway.
use crate::cook::execution::mapreduce::ResumeOptions;
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
};
let work_items = vec![json!({"id": 1}), json!({"id": 2})];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Failed("Error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state = manager.get_job_state(&job_id).await.unwrap();
// A zero retry budget leaves the failed item non-retriable.
let retriable = state.get_retriable_items(0);
assert_eq!(retriable.len(), 0);
let resume_options = ResumeOptions {
reprocess_failed: true,
max_parallel: None,
skip_validation: false,
agent_timeout_secs: None,
};
// NOTE(review): `resume_options` is never passed to a resume call, so
// the force-retry behavior itself is not verified — only that the flag
// can be set. TODO: resume with these options and assert item_0 is
// scheduled again.
assert!(resume_options.reprocess_failed);
}
#[tokio::test]
#[ignore = "Complex test that depends on internal state management logic"]
async fn test_multiple_interrupt_resume_cycles() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 2,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1}),
json!({"id": 2}),
json!({"id": 3}),
json!({"id": 4}),
json!({"id": 5}),
];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("Success 1".to_string()),
commits: vec!["commit1".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file1.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("Error 1".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Error 1".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state1 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state1.successful_count, 1);
assert_eq!(state1.failed_count, 1);
assert_eq!(state1.checkpoint_version, 2);
assert_eq!(state1.pending_items.len(), 3);
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_2".to_string(),
status: AgentStatus::Success,
output: Some("Success 2".to_string()),
commits: vec!["commit2".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file2.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Success,
output: Some("Success after retry".to_string()),
commits: vec!["commit3".to_string()],
duration: Duration::from_secs(2),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file1_fixed.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state2 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state2.successful_count, 3);
assert_eq!(state2.checkpoint_version, 4);
assert_eq!(state2.pending_items.len(), 2);
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_3".to_string(),
status: AgentStatus::Success,
output: Some("Success 3".to_string()),
commits: vec!["commit4".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file3.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_4".to_string(),
status: AgentStatus::Success,
output: Some("Success 4".to_string()),
commits: vec!["commit5".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file4.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let final_state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(final_state.successful_count, 5);
assert_eq!(final_state.checkpoint_version, 6);
assert_eq!(final_state.pending_items.len(), 0);
assert!(final_state.is_complete);
assert_eq!(final_state.agent_results.len(), 5);
assert!(final_state.agent_results.contains_key("item_0"));
assert!(final_state.agent_results.contains_key("item_1"));
assert!(final_state.agent_results.contains_key("item_2"));
assert!(final_state.agent_results.contains_key("item_3"));
assert!(final_state.agent_results.contains_key("item_4"));
assert_eq!(final_state.completed_agents.len(), 5);
assert!(final_state.failed_agents.is_empty() || final_state.failed_agents.len() <= 1);
}
#[tokio::test]
async fn test_full_mapreduce_resume_workflow_integration() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 3,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1, "name": "item1"}),
json!({"id": 2, "name": "item2"}),
json!({"id": 3, "name": "item3"}),
json!({"id": 4, "name": "item4"}),
json!({"id": 5, "name": "item5"}),
];
let job_id = manager
.create_job(config.clone(), work_items.clone(), vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("Processed item1".to_string()),
commits: vec!["commit1".to_string()],
duration: Duration::from_secs(2),
error: None,
worktree_path: Some(PathBuf::from("/worktrees/agent1")),
branch_name: Some("mapreduce-agent-1".to_string()),
worktree_session_id: Some("session-1".to_string()),
files_modified: vec!["file1.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("Network error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Network error".to_string()),
worktree_path: Some(PathBuf::from("/worktrees/agent2")),
branch_name: Some("mapreduce-agent-2".to_string()),
worktree_session_id: Some("session-2".to_string()),
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state1 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state1.successful_count, 1);
assert_eq!(state1.failed_count, 1);
assert_eq!(state1.pending_items.len(), 3);
assert_eq!(state1.checkpoint_version, 2);
let resumed_results = manager.resume_job(&job_id).await.unwrap();
assert_eq!(resumed_results.len(), 2);
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_2".to_string(),
status: AgentStatus::Success,
output: Some("Processed item3".to_string()),
commits: vec!["commit2".to_string()],
duration: Duration::from_secs(3),
error: None,
worktree_path: Some(PathBuf::from("/worktrees/agent3")),
branch_name: Some("mapreduce-agent-3".to_string()),
worktree_session_id: Some("session-3".to_string()),
files_modified: vec!["file3.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Success,
output: Some("Processed item2 on retry".to_string()),
commits: vec!["commit3".to_string()],
duration: Duration::from_secs(2),
error: None,
worktree_path: Some(PathBuf::from("/worktrees/agent4")),
branch_name: Some("mapreduce-agent-4".to_string()),
worktree_session_id: Some("session-4".to_string()),
files_modified: vec!["file2.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state2 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state2.successful_count, 3);
assert_eq!(state2.pending_items.len(), 2);
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_3".to_string(),
status: AgentStatus::Success,
output: Some("Processed item4".to_string()),
commits: vec!["commit4".to_string()],
duration: Duration::from_secs(2),
error: None,
worktree_path: Some(PathBuf::from("/worktrees/agent5")),
branch_name: Some("mapreduce-agent-5".to_string()),
worktree_session_id: Some("session-5".to_string()),
files_modified: vec!["file4.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_4".to_string(),
status: AgentStatus::Success,
output: Some("Processed item5".to_string()),
commits: vec!["commit5".to_string()],
duration: Duration::from_secs(2),
error: None,
worktree_path: Some(PathBuf::from("/worktrees/agent6")),
branch_name: Some("mapreduce-agent-6".to_string()),
worktree_session_id: Some("session-6".to_string()),
files_modified: vec!["file5.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state3 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state3.successful_count, 5);
assert_eq!(state3.pending_items.len(), 0);
assert!(state3.is_map_phase_complete());
manager.start_reduce_phase(&job_id).await.unwrap();
let state_with_reduce = manager.get_job_state(&job_id).await.unwrap();
assert!(state_with_reduce.reduce_phase_state.is_some());
assert!(
state_with_reduce
.reduce_phase_state
.as_ref()
.unwrap()
.started
);
manager
.complete_reduce_phase(&job_id, Some("Aggregated 5 items successfully".to_string()))
.await
.unwrap();
let final_state = manager.get_job_state(&job_id).await.unwrap();
assert!(final_state.is_complete);
assert_eq!(final_state.agent_results.len(), 5);
assert!(final_state.reduce_phase_state.as_ref().unwrap().completed);
assert_eq!(
final_state.reduce_phase_state.as_ref().unwrap().output,
Some("Aggregated 5 items successfully".to_string())
);
let all_commits: Vec<String> = final_state
.agent_results
.values()
.flat_map(|r| r.commits.clone())
.collect();
assert_eq!(all_commits.len(), 5);
assert!(all_commits.contains(&"commit1".to_string()));
assert!(all_commits.contains(&"commit5".to_string()));
}
#[tokio::test]
async fn test_dlq_restoration_on_resume() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 2,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1, "critical": true}),
json!({"id": 2, "critical": false}),
json!({"id": 3, "critical": true}),
];
let job_id = manager
.create_job(config.clone(), work_items.clone(), vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Failed("Critical failure".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Critical failure".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Failed("Still failing".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Still failing".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec!["commit1".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state_before = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state_before.successful_count, 1);
assert_eq!(state_before.failed_count, 2); assert!(state_before.failed_agents.contains_key("item_0"));
assert_eq!(
state_before.failed_agents.get("item_0").unwrap().attempts,
2
);
let resumed_results = manager.resume_job(&job_id).await.unwrap();
assert_eq!(resumed_results.len(), 2);
let state_after = manager.get_job_state(&job_id).await.unwrap();
assert!(state_after.failed_agents.contains_key("item_0"));
assert_eq!(state_after.failed_agents.get("item_0").unwrap().attempts, 2);
let retriable = state_after.get_retriable_items(1);
assert!(!retriable.contains(&"item_0".to_string()));
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_2".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec!["commit2".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let final_state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(final_state.successful_count, 2);
assert_eq!(final_state.failed_count, 2); assert!(final_state.is_map_phase_complete());
}
#[tokio::test]
async fn test_duplicate_prevention_on_resume() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 3,
max_items: None,
offset: None,
};
let work_items = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("First execution".to_string()),
commits: vec!["commit1".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file1.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state1 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state1.successful_count, 1);
assert!(state1.completed_agents.contains("item_0"));
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("Duplicate execution".to_string()),
commits: vec!["commit2".to_string()],
duration: Duration::from_secs(2),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec!["file2.rs".to_string()],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state2 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state2.successful_count, 2); assert_eq!(state2.completed_agents.len(), 1);
let result = state2.agent_results.get("item_0").unwrap();
assert_eq!(result.output, Some("Duplicate execution".to_string()));
assert_eq!(result.commits, vec!["commit2".to_string()]);
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec!["commit3".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_2".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec!["commit4".to_string()],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let final_state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(final_state.successful_count, 4); assert_eq!(final_state.completed_agents.len(), 3); assert!(final_state.is_map_phase_complete()); }
#[tokio::test]
#[ignore = "Complex test that depends on internal state management logic"]
async fn test_interrupt_resume_with_mixed_failures() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 3,
max_items: None,
offset: None,
};
let work_items = vec![
json!({"id": 1}),
json!({"id": 2}),
json!({"id": 3}),
json!({"id": 4}),
];
let job_id = manager
.create_job(config.clone(), work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec![],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("Error".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state1 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state1.successful_count, 1);
assert_eq!(state1.failed_count, 1);
assert_eq!(state1.pending_items.len(), 2);
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_2".to_string(),
status: AgentStatus::Failed("Error 2".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(1),
error: Some("Error 2".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Success,
output: Some("Success on retry".to_string()),
commits: vec![],
duration: Duration::from_secs(2),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let state2 = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state2.successful_count, 2);
assert_eq!(state2.pending_items.len(), 1);
let retriable = state2.get_retriable_items(2);
assert_eq!(retriable.len(), 1); assert!(retriable.contains(&"item_2".to_string()));
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_3".to_string(),
status: AgentStatus::Success,
output: Some("Success".to_string()),
commits: vec![],
duration: Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_2".to_string(),
status: AgentStatus::Success,
output: Some("Success on retry".to_string()),
commits: vec![],
duration: Duration::from_secs(2),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let final_state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(final_state.successful_count, 4);
assert!(final_state.is_complete);
assert_eq!(final_state.checkpoint_version, 6);
}
}