use crate::cook::execution::interpolation::{InterpolationContext, InterpolationEngine};
use crate::cook::execution::mapreduce::*;
use crate::cook::orchestrator::ExecutionEnvironment;
use crate::cook::workflow::{CaptureOutput, CommandType};
use serde_json::json;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
#[test]
fn test_agent_status_serialization() {
let statuses = vec![
AgentStatus::Pending,
AgentStatus::Running,
AgentStatus::Success,
AgentStatus::Failed("error message".to_string()),
AgentStatus::Timeout,
AgentStatus::Retrying(2),
];
for status in statuses {
let json = serde_json::to_string(&status).unwrap();
let deserialized: AgentStatus = serde_json::from_str(&json).unwrap();
match (&status, &deserialized) {
(AgentStatus::Pending, AgentStatus::Pending) => {}
(AgentStatus::Running, AgentStatus::Running) => {}
(AgentStatus::Success, AgentStatus::Success) => {}
(AgentStatus::Failed(a), AgentStatus::Failed(b)) if a == b => {}
(AgentStatus::Timeout, AgentStatus::Timeout) => {}
(AgentStatus::Retrying(a), AgentStatus::Retrying(b)) if a == b => {}
_ => panic!("Deserialization mismatch"),
}
}
}
#[test]
fn test_resume_options_defaults() {
let options = ResumeOptions::default();
assert!(!options.reprocess_failed);
assert_eq!(options.max_parallel, None);
assert!(!options.skip_validation);
}
#[test]
fn test_resume_result_serialization() {
let result = ResumeResult {
job_id: "test-job-123".to_string(),
resumed_from_version: 5,
total_items: 100,
already_completed: 75,
remaining_items: 25,
final_results: vec![AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("output".to_string()),
commits: vec!["abc123".to_string()],
duration: Duration::from_secs(10),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
}],
};
let json = serde_json::to_string(&result).unwrap();
let deserialized: ResumeResult = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.job_id, result.job_id);
assert_eq!(
deserialized.resumed_from_version,
result.resumed_from_version
);
assert_eq!(deserialized.total_items, result.total_items);
assert_eq!(deserialized.already_completed, result.already_completed);
assert_eq!(deserialized.remaining_items, result.remaining_items);
assert_eq!(deserialized.final_results.len(), result.final_results.len());
}
#[test]
fn test_mapreduce_config_defaults() {
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: "$.items[*]".to_string(),
max_parallel: 5,
max_items: None,
offset: None,
};
assert_eq!(config.max_parallel, 5);
}
#[test]
fn test_agent_context_creation() {
let env = ExecutionEnvironment {
working_dir: Arc::new(PathBuf::from("/test/worktree")),
project_dir: Arc::new(PathBuf::from("/test/project")),
worktree_name: Some(Arc::from("test-worktree")),
session_id: Arc::from("test-session"),
};
let context = AgentContext::new(
"item-1".to_string(),
PathBuf::from("/test/worktree"),
"test-worktree".to_string(),
env,
);
assert_eq!(context.item_id, "item-1");
assert_eq!(context.worktree_path, PathBuf::from("/test/worktree"));
assert_eq!(context.worktree_name, "test-worktree");
assert_eq!(context.retry_count, 0);
assert!(context.shell_output.is_none());
assert!(context.variables.is_empty());
}
#[test]
fn test_agent_context_update_with_output() {
let env = ExecutionEnvironment {
working_dir: Arc::new(PathBuf::from("/test/worktree")),
project_dir: Arc::new(PathBuf::from("/test/project")),
worktree_name: Some(Arc::from("test-worktree")),
session_id: Arc::from("test-session"),
};
let mut context = AgentContext::new(
"item-1".to_string(),
PathBuf::from("/test/worktree"),
"test-worktree".to_string(),
env,
);
context.update_with_output(Some("test output".to_string()));
assert_eq!(context.shell_output, Some("test output".to_string()));
assert_eq!(
context.variables.get("shell.output"),
Some(&"test output".to_string())
);
assert_eq!(
context.variables.get("shell.last_output"),
Some(&"test output".to_string())
);
}
#[test]
fn test_agent_context_to_interpolation_context() {
let env = ExecutionEnvironment {
working_dir: Arc::new(PathBuf::from("/test/worktree")),
project_dir: Arc::new(PathBuf::from("/test/project")),
worktree_name: Some(Arc::from("test-worktree")),
session_id: Arc::from("test-session"),
};
let mut context = AgentContext::new(
"item-1".to_string(),
PathBuf::from("/test/worktree"),
"test-worktree".to_string(),
env,
);
context
.variables
.insert("key1".to_string(), "value1".to_string());
context.shell_output = Some("shell output".to_string());
context
.captured_outputs
.insert("capture1".to_string(), "captured".to_string());
context
.iteration_vars
.insert("iter1".to_string(), "iteration".to_string());
let interp_context = context.to_interpolation_context();
let mut engine = InterpolationEngine::new(false);
assert_eq!(
engine.interpolate("${key1}", &interp_context).unwrap(),
"value1"
);
assert_eq!(
engine.interpolate("${capture1}", &interp_context).unwrap(),
"captured"
);
assert_eq!(
engine.interpolate("${iter1}", &interp_context).unwrap(),
"iteration"
);
assert_eq!(
engine
.interpolate("${shell.output}", &interp_context)
.unwrap(),
"shell output"
);
}
#[test]
fn test_agent_result_serialization() {
let result = AgentResult {
item_id: "test_item".to_string(),
status: AgentStatus::Success,
output: Some("test output".to_string()),
commits: vec!["abc123".to_string(), "def456".to_string()],
duration: Duration::from_secs(10),
error: None,
worktree_path: Some(PathBuf::from("<test-worktree-path>")),
branch_name: Some("prodigy-agent-123-test_item".to_string()),
worktree_session_id: Some("prodigy-session-123".to_string()),
files_modified: vec!["src/main.rs".to_string()],
json_log_location: None,
cleanup_status: None,
};
let json = serde_json::to_string(&result).unwrap();
let deserialized: AgentResult = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.item_id, result.item_id);
assert_eq!(deserialized.output, result.output);
assert_eq!(deserialized.commits, result.commits);
assert_eq!(deserialized.duration, result.duration);
assert_eq!(deserialized.error, result.error);
assert_eq!(deserialized.worktree_path, result.worktree_path);
}
#[test]
fn test_map_phase_configuration() {
use crate::cook::workflow::WorkflowStep;
let map_phase = MapPhase {
config: MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "items.json".to_string(),
json_path: "$.items[*]".to_string(),
max_parallel: 20,
max_items: None,
offset: None,
},
json_path: Some("$.items[*]".to_string()),
agent_template: vec![WorkflowStep {
name: None,
claude: Some("/fix-issue ${item.description}".to_string()),
shell: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
auto_commit: false,
commit_config: None,
output_file: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_failure: None,
retry: None,
on_success: None,
on_exit_code: HashMap::new(),
commit_required: true,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
}],
filter: Some("severity == 'high'".to_string()),
sort_by: Some("priority".to_string()),
max_items: None,
distinct: None,
timeout_config: None,
workflow_env: HashMap::new(),
};
assert_eq!(map_phase.config.max_parallel, 20);
assert_eq!(map_phase.agent_template.len(), 1);
assert_eq!(map_phase.filter, Some("severity == 'high'".to_string()));
assert_eq!(map_phase.sort_by, Some("priority".to_string()));
}
#[test]
fn test_reduce_phase_configuration() {
use crate::cook::workflow::WorkflowStep;
let reduce_phase = ReducePhase {
commands: vec![
WorkflowStep {
name: None,
claude: Some("/summarize-results".to_string()),
shell: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_failure: None,
retry: None,
on_success: None,
on_exit_code: HashMap::new(),
commit_required: false,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
},
WorkflowStep {
name: None,
claude: None,
shell: Some("git merge --no-ff agent-*".to_string()),
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_failure: None,
retry: None,
on_success: None,
on_exit_code: HashMap::new(),
commit_required: true,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
},
],
timeout_secs: None,
};
assert_eq!(reduce_phase.commands.len(), 2);
assert!(reduce_phase.commands[0].claude.is_some());
assert!(reduce_phase.commands[1].shell.is_some());
}
#[test]
fn test_reduce_phase_variable_substitution() {
let env = ExecutionEnvironment {
working_dir: Arc::new(PathBuf::from("/test/worktree")),
project_dir: Arc::new(PathBuf::from("/test/project")),
worktree_name: Some(Arc::from("test-worktree")),
session_id: Arc::from("test-session"),
};
let mut reduce_context = AgentContext::new(
"reduce".to_string(),
PathBuf::from("/test/worktree"),
"test-worktree".to_string(),
env,
);
reduce_context
.variables
.insert("map.successful".to_string(), "3".to_string());
reduce_context
.variables
.insert("map.failed".to_string(), "1".to_string());
reduce_context
.variables
.insert("map.total".to_string(), "4".to_string());
let interp_context = reduce_context.to_interpolation_context();
assert_eq!(
interp_context
.variables
.get("map.successful")
.and_then(|v| v.as_str()),
Some("3")
);
assert_eq!(
interp_context
.variables
.get("map.failed")
.and_then(|v| v.as_str()),
Some("1")
);
assert_eq!(
interp_context
.variables
.get("map.total")
.and_then(|v| v.as_str()),
Some("4")
);
}
#[test]
fn test_reduce_phase_complex_variable_substitution() {
let env = ExecutionEnvironment {
working_dir: Arc::new(PathBuf::from("/test/worktree")),
project_dir: Arc::new(PathBuf::from("/test/project")),
worktree_name: Some(Arc::from("test-worktree")),
session_id: Arc::from("test-session"),
};
let mut reduce_context = AgentContext::new(
"reduce".to_string(),
PathBuf::from("/test/worktree"),
"test-worktree".to_string(),
env,
);
reduce_context
.variables
.insert("map.successful".to_string(), "5".to_string());
reduce_context
.variables
.insert("map.failed".to_string(), "2".to_string());
reduce_context
.variables
.insert("map.total".to_string(), "7".to_string());
reduce_context.captured_outputs.insert(
"claude.output".to_string(),
"Debt reduction analysis: 30% improvement".to_string(),
);
reduce_context
.variables
.insert("result.0.item_id".to_string(), "debt-item-1".to_string());
reduce_context
.variables
.insert("result.0.status".to_string(), "success".to_string());
reduce_context
.variables
.insert("result.1.item_id".to_string(), "debt-item-2".to_string());
reduce_context
.variables
.insert("result.1.status".to_string(), "failed".to_string());
let interp_context = reduce_context.to_interpolation_context();
assert_eq!(
interp_context
.variables
.get("map.successful")
.and_then(|v| v.as_str()),
Some("5")
);
assert_eq!(
interp_context
.variables
.get("claude.output")
.and_then(|v| v.as_str()),
Some("Debt reduction analysis: 30% improvement")
);
assert_eq!(
interp_context
.variables
.get("result.0.item_id")
.and_then(|v| v.as_str()),
Some("debt-item-1")
);
}
#[test]
fn test_reduce_context_has_map_variables() {
let env = ExecutionEnvironment {
working_dir: Arc::new(PathBuf::from("/test/worktree")),
project_dir: Arc::new(PathBuf::from("/test/project")),
worktree_name: Some(Arc::from("test-worktree")),
session_id: Arc::from("test-session"),
};
let mut reduce_context = AgentContext::new(
"reduce".to_string(),
(*env.working_dir).clone(),
"test-worktree".to_string(),
env.clone(),
);
let successful_count = 3;
let failed_count = 1;
let total = 4;
reduce_context
.variables
.insert("map.successful".to_string(), successful_count.to_string());
reduce_context
.variables
.insert("map.failed".to_string(), failed_count.to_string());
reduce_context
.variables
.insert("map.total".to_string(), total.to_string());
let interp = reduce_context.to_interpolation_context();
assert!(interp.variables.contains_key("map.successful"));
assert!(interp.variables.contains_key("map.failed"));
assert!(interp.variables.contains_key("map.total"));
let _test_command = "echo 'Processed ${map.successful} of ${map.total} items'";
assert_eq!(
interp
.variables
.get("map.successful")
.and_then(|v| v.as_str()),
Some("3")
);
}
#[test]
fn test_custom_capture_output_variables() {
use crate::cook::workflow::CaptureOutput;
assert!(!CaptureOutput::Disabled.is_enabled());
assert!(CaptureOutput::Default.is_enabled());
assert!(CaptureOutput::Variable("my_output".to_string()).is_enabled());
let claude_cmd = CommandType::Claude("test".to_string());
let shell_cmd = CommandType::Shell("echo test".to_string());
assert_eq!(CaptureOutput::Disabled.get_variable_name(&claude_cmd), None);
assert_eq!(
CaptureOutput::Default.get_variable_name(&claude_cmd),
Some("claude.output".to_string())
);
assert_eq!(
CaptureOutput::Default.get_variable_name(&shell_cmd),
Some("shell.output".to_string())
);
assert_eq!(
CaptureOutput::Variable("custom_var".to_string()).get_variable_name(&claude_cmd),
Some("custom_var".to_string())
);
assert_eq!(
CaptureOutput::Variable("my.special.output".to_string()).get_variable_name(&shell_cmd),
Some("my.special.output".to_string())
);
}
#[test]
fn test_interpolation_with_work_item() {
let mut engine = InterpolationEngine::new(false);
let mut context = InterpolationContext::new();
let item = json!({
"id": 123,
"description": "Fix memory leak in parser",
"priority": "high",
"location": {
"file": "src/parser.rs",
"line": 45
}
});
context.set("item", item);
let tests = vec![
("Task ${item.id}", "Task 123"),
("Fix: ${item.description}", "Fix: Fix memory leak in parser"),
("Priority: ${item.priority}", "Priority: high"),
(
"File: ${item.location.file}:${item.location.line}",
"File: src/parser.rs:45",
),
];
for (template, expected) in tests {
let result = engine.interpolate(template, &context).unwrap();
assert_eq!(result, expected, "Failed for template: {}", template);
}
}
#[test]
fn test_interpolation_with_map_results() {
let mut engine = InterpolationEngine::new(false);
let mut context = InterpolationContext::new();
context.set(
"map",
json!({
"successful": 8,
"failed": 2,
"total": 10
}),
);
let template =
"Processed ${map.total} items: ${map.successful} successful, ${map.failed} failed";
let result = engine.interpolate(template, &context).unwrap();
assert_eq!(result, "Processed 10 items: 8 successful, 2 failed");
}
#[test]
fn test_interpolation_with_shell_output() {
let mut engine = InterpolationEngine::new(false);
let mut context = InterpolationContext::new();
context.set(
"shell",
json!({
"output": "All tests passed",
"last_output": "Coverage: 85%"
}),
);
let template = "Previous output: ${shell.output}. ${shell.last_output}";
let result = engine.interpolate(template, &context).unwrap();
assert_eq!(result, "Previous output: All tests passed. Coverage: 85%");
}
#[test]
fn test_interpolation_with_defaults() {
let mut engine = InterpolationEngine::new(false);
let context = InterpolationContext::new();
let tests = vec![
("Timeout: ${timeout:-600}s", "Timeout: 600s"),
("Workers: ${workers:-10}", "Workers: 10"),
("Mode: ${mode:-parallel}", "Mode: parallel"),
];
for (template, expected) in tests {
let result = engine.interpolate(template, &context).unwrap();
assert_eq!(result, expected, "Failed for template: {}", template);
}
}
#[test]
fn test_interpolation_context_hierarchy() {
let mut engine = InterpolationEngine::new(false);
let mut parent = InterpolationContext::new();
parent.set("global_setting", json!("production"));
parent.set("max_workers", json!(20));
let mut child = parent.child();
child.set("local_setting", json!("debug"));
child.set("max_workers", json!(5));
let tests = vec![
("Mode: ${global_setting}", "Mode: production"),
("Debug: ${local_setting}", "Debug: debug"),
("Workers: ${max_workers}", "Workers: 5"), ];
for (template, expected) in tests {
let result = engine.interpolate(template, &child).unwrap();
assert_eq!(result, expected, "Failed for template: {}", template);
}
}
#[test]
fn test_interpolation_with_arrays() {
let mut engine = InterpolationEngine::new(false);
let mut context = InterpolationContext::new();
let results = json!([
{"id": "item1", "status": "success"},
{"id": "item2", "status": "failed"},
{"id": "item3", "status": "success"}
]);
context.set("results", results);
let tests = vec![
("First: ${results[0].id}", "First: item1"),
(
"Second status: ${results[1].status}",
"Second status: failed",
),
("Third: ${results[2].id}", "Third: item3"),
];
for (template, expected) in tests {
let result = engine.interpolate(template, &context).unwrap();
assert_eq!(result, expected, "Failed for template: {}", template);
}
}
#[test]
fn test_interpolation_strict_mode() {
let mut engine = InterpolationEngine::new(true); let context = InterpolationContext::new();
let result = engine.interpolate("Value: ${undefined}", &context);
assert!(result.is_err());
let result = engine.interpolate("Value: ${undefined:-default}", &context);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "Value: default");
}
#[cfg(test)]
mod command_type_tests {
use super::*;
use crate::cook::workflow::{CaptureOutput, WorkflowStep};
#[test]
fn test_collect_command_types_single_claude() {
let step = WorkflowStep {
claude: Some("/test-command".to_string()),
name: None,
shell: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
on_failure: None,
retry: None,
on_success: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_exit_code: HashMap::new(),
commit_required: true,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
};
let commands = crate::cook::execution::mapreduce::command::collect_command_types(&step);
assert_eq!(commands.len(), 1);
matches!(commands[0], crate::cook::workflow::CommandType::Claude(_));
}
#[test]
fn test_collect_command_types_single_shell() {
let step = WorkflowStep {
shell: Some("echo test".to_string()),
name: None,
claude: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
on_failure: None,
retry: None,
on_success: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_exit_code: HashMap::new(),
commit_required: true,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
};
let commands = crate::cook::execution::mapreduce::command::collect_command_types(&step);
assert_eq!(commands.len(), 1);
matches!(commands[0], crate::cook::workflow::CommandType::Shell(_));
}
#[test]
fn test_collect_command_types_multiple() {
let step = WorkflowStep {
claude: Some("/test-command".to_string()),
shell: Some("echo test".to_string()),
name: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
on_failure: None,
retry: None,
on_success: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_exit_code: HashMap::new(),
commit_required: true,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
};
let commands = crate::cook::execution::mapreduce::command::collect_command_types(&step);
assert_eq!(commands.len(), 2);
}
#[test]
fn test_collect_command_types_legacy_with_slash() {
let step = WorkflowStep {
name: Some("/legacy-command".to_string()),
claude: None,
shell: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
on_failure: None,
retry: None,
on_success: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_exit_code: HashMap::new(),
commit_required: true,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
};
let commands = crate::cook::execution::mapreduce::command::collect_command_types(&step);
assert_eq!(commands.len(), 1);
if let CommandType::Legacy(cmd) = &commands[0] {
assert_eq!(cmd, "/legacy-command");
} else {
panic!("Expected Legacy command type");
}
}
#[test]
fn test_collect_command_types_legacy_without_slash() {
let step = WorkflowStep {
name: Some("legacy-command".to_string()),
claude: None,
shell: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Disabled,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
on_failure: None,
retry: None,
on_success: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_exit_code: HashMap::new(),
commit_required: true,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
};
let commands = crate::cook::execution::mapreduce::command::collect_command_types(&step);
assert_eq!(commands.len(), 1);
if let CommandType::Legacy(cmd) = &commands[0] {
assert_eq!(cmd, "/legacy-command");
} else {
panic!("Expected Legacy command type");
}
}
#[test]
fn test_validate_command_count_empty() {
let commands = vec![];
let result = crate::cook::execution::mapreduce::command::validate_command_count(&commands);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("No command specified"));
}
#[test]
fn test_validate_command_count_single() {
let commands = vec![CommandType::Shell("echo test".to_string())];
let result = crate::cook::execution::mapreduce::command::validate_command_count(&commands);
assert!(result.is_ok());
}
#[test]
fn test_validate_command_count_multiple() {
let commands = vec![
CommandType::Shell("echo test".to_string()),
CommandType::Claude("/test".to_string()),
];
let result = crate::cook::execution::mapreduce::command::validate_command_count(&commands);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Multiple command types specified"));
}
}
#[cfg(test)]
mod merge_history_tests {
use crate::cook::commit_tracker::TrackedCommit;
use chrono::Utc;
use std::path::PathBuf;
#[test]
fn test_merge_preserves_commit_structure() {
let timestamp = Utc::now();
let commit = TrackedCommit {
hash: "abc123def456".to_string(),
message: "feat: add new feature".to_string(),
author: "Test Author".to_string(),
timestamp,
files_changed: vec![PathBuf::from("src/main.rs"), PathBuf::from("src/lib.rs")],
insertions: 50,
deletions: 10,
step_name: "map-agent-1".to_string(),
agent_id: Some("agent-001".to_string()),
};
let json = serde_json::to_string(&commit).unwrap();
let deserialized: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized["hash"], "abc123def456");
assert_eq!(deserialized["message"], "feat: add new feature");
assert_eq!(deserialized["author"], "Test Author");
assert_eq!(deserialized["insertions"], 50);
assert_eq!(deserialized["deletions"], 10);
assert_eq!(deserialized["step_name"], "map-agent-1");
assert_eq!(deserialized["agent_id"], "agent-001");
assert!(deserialized["files_changed"].is_array());
assert_eq!(deserialized["files_changed"].as_array().unwrap().len(), 2);
}
#[test]
fn test_multiple_commits_history_format() {
let timestamp = Utc::now();
let commits = vec![
TrackedCommit {
hash: "commit1".to_string(),
message: "First commit".to_string(),
author: "Author 1".to_string(),
timestamp,
files_changed: vec![PathBuf::from("file1.rs")],
insertions: 10,
deletions: 5,
step_name: "step1".to_string(),
agent_id: Some("agent-1".to_string()),
},
TrackedCommit {
hash: "commit2".to_string(),
message: "Second commit".to_string(),
author: "Author 2".to_string(),
timestamp: timestamp + chrono::Duration::minutes(5),
files_changed: vec![PathBuf::from("file2.rs")],
insertions: 20,
deletions: 3,
step_name: "step2".to_string(),
agent_id: Some("agent-2".to_string()),
},
TrackedCommit {
hash: "commit3".to_string(),
message: "Third commit".to_string(),
author: "Author 3".to_string(),
timestamp: timestamp + chrono::Duration::minutes(10),
files_changed: vec![PathBuf::from("file3.rs")],
insertions: 15,
deletions: 8,
step_name: "step3".to_string(),
agent_id: None,
},
];
for (i, commit) in commits.iter().enumerate() {
assert_eq!(commit.hash, format!("commit{}", i + 1));
assert!(commit
.message
.contains(&["First", "Second", "Third"][i].to_string()));
}
let json = serde_json::to_string(&commits).unwrap();
let parsed: Vec<serde_json::Value> = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.len(), 3);
assert!(parsed[0]["agent_id"].is_string());
assert!(parsed[1]["agent_id"].is_string());
assert!(parsed[2]["agent_id"].is_null());
}
#[test]
fn test_worktree_merge_scenario() {
let base_time = Utc::now();
let agent_commits = vec![
TrackedCommit {
hash: "agent1hash".to_string(),
message: "feat: implement feature A".to_string(),
author: "Agent".to_string(),
timestamp: base_time,
files_changed: vec![PathBuf::from("feature_a.rs")],
insertions: 100,
deletions: 0,
step_name: "map-phase".to_string(),
agent_id: Some("worker-1".to_string()),
},
TrackedCommit {
hash: "agent2hash".to_string(),
message: "test: add tests for feature A".to_string(),
author: "Agent".to_string(),
timestamp: base_time + chrono::Duration::minutes(1),
files_changed: vec![PathBuf::from("tests/feature_a_test.rs")],
insertions: 50,
deletions: 0,
step_name: "map-phase".to_string(),
agent_id: Some("worker-1".to_string()),
},
];
for commit in &agent_commits {
assert!(!commit.hash.is_empty());
assert!(!commit.message.is_empty());
assert!(commit.agent_id.is_some());
assert_eq!(commit.step_name, "map-phase");
}
let mut full_history = Vec::new();
full_history.push(TrackedCommit {
hash: "parenthash".to_string(),
message: "Previous work on main branch".to_string(),
author: "Developer".to_string(),
timestamp: base_time - chrono::Duration::hours(1),
files_changed: vec![PathBuf::from("main.rs")],
insertions: 30,
deletions: 10,
step_name: "initial".to_string(),
agent_id: None,
});
full_history.extend(agent_commits);
assert_eq!(full_history.len(), 3);
assert!(full_history[0].agent_id.is_none()); assert!(full_history[1].agent_id.is_some()); assert!(full_history[2].agent_id.is_some()); }
}
#[cfg(test)]
pub mod test_mocks {
use super::*;
use crate::cook::execution::events::MapReduceEvent;
use crate::worktree::WorktreeSession;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex as StdMutex};
pub struct MockWorktreeManager {
pub sessions: Arc<StdMutex<Vec<WorktreeSession>>>,
pub fail_on_create: bool,
pub fail_on_cleanup: bool,
pub create_delay_ms: u64,
}
impl Default for MockWorktreeManager {
fn default() -> Self {
Self::new()
}
}
impl MockWorktreeManager {
pub fn new() -> Self {
Self {
sessions: Arc::new(StdMutex::new(Vec::new())),
fail_on_create: false,
fail_on_cleanup: false,
create_delay_ms: 0,
}
}
pub async fn create_worktree(&self, branch_name: &str) -> anyhow::Result<WorktreeSession> {
if self.create_delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(self.create_delay_ms)).await;
}
if self.fail_on_create {
anyhow::bail!("Mock worktree creation failed");
}
let session = WorktreeSession {
name: format!("mock-worktree-{}", branch_name),
path: PathBuf::from(format!("/mock/worktree/{}", branch_name)),
branch: branch_name.to_string(),
created_at: chrono::Utc::now(),
};
self.sessions.lock().unwrap().push(session.clone());
Ok(session)
}
pub async fn cleanup_worktree(&self, _name: &str) -> anyhow::Result<()> {
if self.fail_on_cleanup {
anyhow::bail!("Mock worktree cleanup failed");
}
Ok(())
}
}
pub struct MockCommandExecutor {
pub responses: Arc<StdMutex<VecDeque<(bool, String)>>>,
pub executed_commands: Arc<StdMutex<Vec<String>>>,
pub fail_on_command: Option<String>,
pub execution_delay_ms: u64,
}
impl Default for MockCommandExecutor {
fn default() -> Self {
Self::new()
}
}
impl MockCommandExecutor {
pub fn new() -> Self {
Self {
responses: Arc::new(StdMutex::new(VecDeque::new())),
executed_commands: Arc::new(StdMutex::new(Vec::new())),
fail_on_command: None,
execution_delay_ms: 0,
}
}
pub fn add_response(&self, success: bool, output: String) {
self.responses.lock().unwrap().push_back((success, output));
}
}
pub struct MockEventLogger {
pub events: Arc<StdMutex<Vec<MapReduceEvent>>>,
pub fail_on_log: bool,
}
impl Default for MockEventLogger {
fn default() -> Self {
Self::new()
}
}
impl MockEventLogger {
pub fn new() -> Self {
Self {
events: Arc::new(StdMutex::new(Vec::new())),
fail_on_log: false,
}
}
pub fn get_events(&self) -> Vec<MapReduceEvent> {
self.events.lock().unwrap().clone()
}
pub async fn log_event(&self, event: MapReduceEvent) -> anyhow::Result<()> {
if self.fail_on_log {
anyhow::bail!("Mock event logging failed");
}
self.events.lock().unwrap().push(event);
Ok(())
}
}
}
#[cfg(test)]
mod concurrency_tests {
use super::test_mocks::*;
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Semaphore;
#[tokio::test]
async fn test_parallel_agent_execution_limits() {
let concurrent_count = Arc::new(AtomicUsize::new(0));
let max_concurrent = Arc::new(AtomicUsize::new(0));
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: "$.items[*]".to_string(),
max_parallel: 3,
max_items: Some(10),
offset: None,
};
let semaphore = Arc::new(Semaphore::new(config.max_parallel));
let mut handles = vec![];
for i in 0..10 {
let sem = semaphore.clone();
let concurrent = concurrent_count.clone();
let max_con = max_concurrent.clone();
let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
loop {
let max = max_con.load(Ordering::SeqCst);
if current <= max {
break;
}
if max_con
.compare_exchange(max, current, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
break;
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
concurrent.fetch_sub(1, Ordering::SeqCst);
format!("Agent {} completed", i)
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let max_observed = max_concurrent.load(Ordering::SeqCst);
assert!(
max_observed <= 3,
"Max concurrent agents ({}) exceeded limit of 3",
max_observed
);
}
#[tokio::test]
async fn test_agent_timeout_handling() {
let _config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "test.json".to_string(),
json_path: "$.items[*]".to_string(),
max_parallel: 2,
max_items: None,
offset: None,
};
let start = std::time::Instant::now();
let result = tokio::time::timeout(Duration::from_secs(1), async {
tokio::time::sleep(Duration::from_secs(5)).await;
"Should timeout"
})
.await;
assert!(result.is_err(), "Agent should have timed out");
assert!(
start.elapsed() < Duration::from_secs(2),
"Timeout took too long"
);
}
#[tokio::test]
async fn test_concurrent_worktree_creation() {
let manager = Arc::new(MockWorktreeManager::new());
let mut handles = vec![];
for i in 0..5 {
let mgr = manager.clone();
let handle =
tokio::spawn(async move { mgr.create_worktree(&format!("agent-{}", i)).await });
handles.push(handle);
}
let mut results = vec![];
for handle in handles {
results.push(handle.await.unwrap());
}
assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 5);
let sessions = manager.sessions.lock().unwrap();
assert_eq!(sessions.len(), 5);
}
#[tokio::test]
async fn test_agent_race_conditions() {
let results = Arc::new(Mutex::new(Vec::<AgentResult>::new()));
let mut handles = vec![];
for i in 0..20 {
let res = results.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(i as u64 % 10)).await;
let result = AgentResult {
item_id: format!("item-{}", i),
status: AgentStatus::Success,
output: Some(format!("Output {}", i)),
commits: vec![],
duration: Duration::from_millis(i as u64),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
};
res.lock().await.push(result);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let final_results = results.lock().await;
assert_eq!(final_results.len(), 20);
let mut ids: Vec<String> = final_results.iter().map(|r| r.item_id.clone()).collect();
ids.sort();
ids.dedup();
assert_eq!(ids.len(), 20);
}
}
#[cfg(test)]
mod dlq_tests {
use super::*;
use crate::cook::execution::dlq::{ErrorType, FailureDetail};
use chrono::Utc;
use serde_json::Value;
use std::collections::HashMap;
#[test]
fn test_dlq_failure_detail_serialization() {
let failure_detail = FailureDetail {
attempt_number: 2,
timestamp: Utc::now(),
error_type: ErrorType::CommandFailed { exit_code: 1 },
error_message: "Command execution failed".to_string(),
error_context: None,
stack_trace: Some("Stack trace here".to_string()),
agent_id: "agent-123".to_string(),
step_failed: "map-phase".to_string(),
duration_ms: 1500,
json_log_location: None,
};
let json = serde_json::to_value(&failure_detail).unwrap();
let deserialized: FailureDetail = serde_json::from_value(json).unwrap();
assert_eq!(deserialized.error_message, failure_detail.error_message);
assert_eq!(deserialized.attempt_number, 2);
}
#[test]
fn test_dlq_error_type_categorization() {
let error_types = vec![
ErrorType::Timeout,
ErrorType::CommandFailed { exit_code: 1 },
ErrorType::WorktreeError,
ErrorType::ValidationFailed,
ErrorType::Unknown,
];
for err_type in error_types {
let detail = FailureDetail {
attempt_number: 1,
timestamp: Utc::now(),
error_type: err_type.clone(),
error_message: "Test message".to_string(),
error_context: None,
stack_trace: None,
agent_id: "test-agent".to_string(),
step_failed: "test-step".to_string(),
duration_ms: 1000,
json_log_location: None,
};
let json = serde_json::to_value(&detail).unwrap();
assert!(json.is_object());
}
}
#[tokio::test]
async fn test_dlq_failed_item_tracking() {
let mut failed_items: HashMap<String, Value> = HashMap::new();
for i in 0..5 {
let item_id = format!("item-{}", i);
let item_data = json!({
"id": item_id.clone(),
"failure": {
"error_type": if i % 2 == 0 { "Timeout" } else { "CommandExecution" },
"message": format!("Failed item {}", i),
"retry_count": i,
}
});
failed_items.insert(item_id, item_data);
}
assert_eq!(failed_items.len(), 5);
failed_items.remove("item-0");
failed_items.remove("item-2");
failed_items.remove("item-4");
assert_eq!(failed_items.len(), 2);
assert!(failed_items.contains_key("item-1"));
assert!(failed_items.contains_key("item-3"));
}
#[tokio::test]
async fn test_dlq_persistence_simulation() {
use std::fs;
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let dlq_path = temp_dir.path().join("dlq");
fs::create_dir_all(&dlq_path).unwrap();
let item_data = json!({
"item_id": "test-item",
"error": "Test failure",
"retry_count": 1
});
let item_file = dlq_path.join("test-item.json");
fs::write(&item_file, item_data.to_string()).unwrap();
let content = fs::read_to_string(&item_file).unwrap();
let parsed: Value = serde_json::from_str(&content).unwrap();
assert_eq!(parsed["item_id"], "test-item");
assert_eq!(parsed["retry_count"], 1);
}
}
#[cfg(test)]
mod recovery_tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
#[tokio::test]
async fn test_sigint_handling_simulation() {
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
let handle = tokio::spawn(async move {
for i in 0..100 {
if shutdown_clone.load(Ordering::Relaxed) {
return Ok::<_, String>(format!("Interrupted at item {}", i));
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
Ok("Completed all items".to_string())
});
tokio::time::sleep(Duration::from_millis(50)).await;
shutdown.store(true, Ordering::Relaxed);
let result = handle.await.unwrap().unwrap();
assert!(result.contains("Interrupted"));
}
#[tokio::test]
async fn test_checkpoint_data_integrity() {
use serde_json::Value;
let checkpoint_data = json!({
"job_id": "test-job-123",
"work_items": [
{"id": 1, "data": "item1"},
{"id": 2, "data": "item2"}
],
"completed_agents": ["agent-1", "agent-2"],
"checkpoint_version": 1
});
let serialized = serde_json::to_string(&checkpoint_data).unwrap();
let deserialized: Value = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized["job_id"], "test-job-123");
assert_eq!(deserialized["work_items"].as_array().unwrap().len(), 2);
assert_eq!(
deserialized["completed_agents"].as_array().unwrap().len(),
2
);
assert_eq!(deserialized["checkpoint_version"], 1);
}
#[tokio::test]
#[allow(clippy::unnecessary_to_owned)]
async fn test_recovery_with_partial_results() {
let mut completed_items = std::collections::HashSet::new();
completed_items.insert("item-1".to_string());
completed_items.insert("item-3".to_string());
completed_items.insert("item-5".to_string());
let total_items = ["item-1", "item-2", "item-3", "item-4", "item-5"];
let remaining: Vec<_> = total_items
.iter()
.filter(|item| !completed_items.contains(&item.to_string()))
.collect();
assert_eq!(remaining.len(), 2);
assert!(remaining.contains(&&"item-2"));
assert!(remaining.contains(&&"item-4"));
}
}
#[cfg(test)]
mod additional_coverage_tests {
use super::*;
use crate::cook::workflow::WorkflowStep;
#[test]
fn test_setup_phase_edge_cases() {
let setup_phase = SetupPhase {
commands: vec![WorkflowStep {
shell: Some("echo 'setup'".to_string()),
claude: None,
name: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Variable("setup_output".to_string()),
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
timeout: Some(30),
working_dir: None,
env: HashMap::new(),
on_failure: None,
retry: None,
on_success: None,
on_exit_code: HashMap::new(),
commit_required: false,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
}],
timeout: Some(60),
capture_outputs: HashMap::from([(
"setup_output".to_string(),
crate::cook::execution::variable_capture::CaptureConfig::Simple(0),
)]),
};
let serialized = serde_json::to_string(&setup_phase).unwrap();
let deserialized: SetupPhase = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.commands.len(), 1);
assert_eq!(deserialized.timeout, Some(60));
assert_eq!(deserialized.capture_outputs.len(), 1);
}
#[test]
fn test_map_phase_with_all_options() {
let map_phase = MapPhase {
config: MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "data.json".to_string(),
json_path: "$.items[*]".to_string(),
max_parallel: 50,
max_items: Some(100),
offset: Some(10),
},
agent_template: vec![WorkflowStep {
claude: Some("/process ${item}".to_string()),
shell: None,
name: None,
test: None,
foreach: None,
write_file: None,
command: None,
handler: None,
capture_output: CaptureOutput::Default,
capture: None,
capture_format: None,
capture_streams: Default::default(),
output_file: None,
timeout: None,
working_dir: None,
env: HashMap::new(),
on_failure: None,
retry: None,
on_success: None,
on_exit_code: HashMap::new(),
commit_required: false,
auto_commit: false,
commit_config: None,
validate: None,
step_validate: None,
skip_validation: false,
validation_timeout: None,
ignore_validation_failure: false,
when: None,
}],
json_path: Some("$.items[*]".to_string()),
filter: Some("item.priority == 'high'".to_string()),
sort_by: Some("item.created_at DESC".to_string()),
max_items: Some(100),
distinct: Some("item.id".to_string()),
timeout_config: None,
workflow_env: HashMap::new(),
};
assert_eq!(map_phase.config.max_parallel, 50);
assert_eq!(map_phase.config.max_items, Some(100));
assert_eq!(map_phase.config.offset, Some(10));
assert!(map_phase.filter.is_some());
assert!(map_phase.sort_by.is_some());
assert!(map_phase.distinct.is_some());
}
#[test]
fn test_resume_options_edge_cases() {
let options = vec![
ResumeOptions::default(),
ResumeOptions {
reprocess_failed: true,
max_parallel: Some(5),
skip_validation: true,
agent_timeout_secs: Some(300),
},
ResumeOptions {
reprocess_failed: false,
max_parallel: None,
skip_validation: false,
agent_timeout_secs: None,
},
];
for opt in options {
let serialized = serde_json::to_string(&opt).unwrap();
let deserialized: ResumeOptions = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.reprocess_failed, opt.reprocess_failed);
assert_eq!(deserialized.max_parallel, opt.max_parallel);
assert_eq!(deserialized.skip_validation, opt.skip_validation);
assert_eq!(deserialized.agent_timeout_secs, opt.agent_timeout_secs);
}
}
#[test]
fn test_agent_result_edge_cases() {
let minimal_result = AgentResult {
item_id: "minimal".to_string(),
status: AgentStatus::Pending,
output: None,
commits: vec![],
duration: Duration::from_secs(0),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
};
assert_eq!(minimal_result.item_id, "minimal");
assert!(matches!(minimal_result.status, AgentStatus::Pending));
assert!(minimal_result.output.is_none());
assert!(minimal_result.commits.is_empty());
let error_result = AgentResult {
item_id: "error-item".to_string(),
status: AgentStatus::Failed("Test failure".to_string()),
output: None,
commits: vec![],
duration: Duration::from_secs(5),
error: Some("Detailed error message".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
};
assert!(matches!(error_result.status, AgentStatus::Failed(_)));
assert!(error_result.error.is_some());
let serialized = serde_json::to_string(&error_result).unwrap();
let deserialized: AgentResult = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.item_id, error_result.item_id);
assert_eq!(deserialized.error, error_result.error);
}
#[test]
fn test_large_agent_result_collection() {
let mut results = Vec::new();
for i in 0..50 {
results.push(AgentResult {
item_id: format!("item-{}", i),
status: if i % 5 == 0 {
AgentStatus::Failed(format!("Failed item {}", i))
} else if i % 3 == 0 {
AgentStatus::Retrying(1)
} else {
AgentStatus::Success
},
output: Some(format!("Output for item {}", i)),
commits: if i % 2 == 0 {
vec![format!("commit-{}", i)]
} else {
vec![]
},
duration: Duration::from_secs(i as u64),
error: if i % 5 == 0 {
Some(format!("Error on item {}", i))
} else {
None
},
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
});
}
assert_eq!(results.len(), 50);
let success_count = results
.iter()
.filter(|r| matches!(r.status, AgentStatus::Success))
.count();
let failed_count = results
.iter()
.filter(|r| matches!(r.status, AgentStatus::Failed(_)))
.count();
let retrying_count = results
.iter()
.filter(|r| matches!(r.status, AgentStatus::Retrying(_)))
.count();
assert!(success_count > 0);
assert!(failed_count > 0);
assert!(retrying_count > 0);
let serialized = serde_json::to_string(&results).unwrap();
let deserialized: Vec<AgentResult> = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.len(), 50);
}
}
#[cfg(test)]
mod integration_tests {
use super::*;
use serde_json::Value;
use tempfile::TempDir;
#[tokio::test]
async fn test_end_to_end_mapreduce_flow() {
let temp_dir = TempDir::new().unwrap();
let work_dir = temp_dir.path();
let input_file = work_dir.join("test_items.json");
let items = json!({
"items": [
{"id": 1, "name": "task1", "priority": "high"},
{"id": 2, "name": "task2", "priority": "low"},
{"id": 3, "name": "task3", "priority": "high"},
]
});
std::fs::write(&input_file, items.to_string()).unwrap();
let map_config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: input_file.to_string_lossy().to_string(),
json_path: "$.items[*]".to_string(),
max_parallel: 2,
max_items: None,
offset: None,
};
assert_eq!(map_config.max_parallel, 2);
let json_content = std::fs::read_to_string(&input_file).unwrap();
let parsed: Value = serde_json::from_str(&json_content).unwrap();
if let Some(items_array) = parsed.get("items").and_then(|v| v.as_array()) {
assert_eq!(items_array.len(), 3);
assert_eq!(items_array[0]["priority"], "high");
}
}
#[tokio::test]
async fn test_filter_and_sort_operations() {
let items = [
json!({"id": 1, "priority": 3, "status": "active"}),
json!({"id": 2, "priority": 1, "status": "inactive"}),
json!({"id": 3, "priority": 2, "status": "active"}),
json!({"id": 4, "priority": 1, "status": "active"}),
];
let filtered: Vec<_> = items
.iter()
.filter(|item| item["status"] == "active")
.cloned()
.collect();
assert_eq!(filtered.len(), 3);
let mut sorted = filtered.clone();
sorted.sort_by_key(|item| item["priority"].as_u64().unwrap_or(0));
assert_eq!(sorted[0]["id"], 4);
assert_eq!(sorted[1]["id"], 3);
assert_eq!(sorted[2]["id"], 1);
}
#[tokio::test]
async fn test_offset_and_limit_processing() {
let items: Vec<Value> = (0..100)
.map(|i| json!({"id": i, "value": format!("item_{}", i)}))
.collect();
let config = MapReduceConfig {
agent_timeout_secs: None,
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
input: "dummy".to_string(),
json_path: "$[*]".to_string(),
max_parallel: 5,
max_items: Some(20),
offset: Some(10),
};
let start = config.offset.unwrap_or(0);
let processed: Vec<_> = items
.iter()
.skip(start)
.take(config.max_items.unwrap_or(items.len()))
.collect();
assert_eq!(processed.len(), 20);
assert_eq!(processed[0]["id"], 10);
assert_eq!(processed[19]["id"], 29);
}
}