use crate::cook::execution::mapreduce::{
agent::AgentResult,
checkpoint::{
CheckpointConfig, CheckpointId, CheckpointManager, CheckpointReason, FileCheckpointStorage,
MapReduceCheckpoint as Checkpoint, PhaseType, WorkItem, WorkItemProgress, WorkItemState,
},
coordination::MapReduceCoordinator,
types::{MapPhase, ReducePhase, SetupPhase},
};
use crate::cook::orchestrator::ExecutionEnvironment;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::info;
/// Wraps a `MapReduceCoordinator` with periodic checkpoint persistence so an
/// interrupted job can be resumed from its last saved state.
pub struct CheckpointedCoordinator {
    // Underlying coordinator; not called by this wrapper yet (hence the `_`).
    _coordinator: MapReduceCoordinator,
    // Persists checkpoints and decides (via `should_checkpoint`) when one is due.
    checkpoint_manager: Arc<CheckpointManager>,
    // In-memory snapshot of job state; `None` until initialized or resumed.
    current_checkpoint: Arc<RwLock<Option<Checkpoint>>>,
    // Timestamp of the last persisted checkpoint (feeds the interval decision).
    last_checkpoint_time: Arc<RwLock<DateTime<Utc>>>,
    // Number of items processed since the last checkpoint was saved.
    items_since_checkpoint: Arc<RwLock<usize>>,
    // Identifier of the job this coordinator is executing.
    job_id: String,
}
impl CheckpointedCoordinator {
    /// Builds a checkpointing wrapper around `coordinator`, persisting
    /// checkpoints for `job_id` under `checkpoint_storage_path` using the
    /// default `CheckpointConfig`.
    pub fn new(
        coordinator: MapReduceCoordinator,
        checkpoint_storage_path: PathBuf,
        job_id: String,
    ) -> Self {
        let config = CheckpointConfig::default();
        // NOTE(review): the second argument to FileCheckpointStorage::new is an
        // unnamed bool — confirm its meaning at the definition site.
        let storage = Box::new(FileCheckpointStorage::new(checkpoint_storage_path, true));
        let checkpoint_manager = Arc::new(CheckpointManager::new(storage, config, job_id.clone()));
        Self {
            _coordinator: coordinator,
            checkpoint_manager,
            current_checkpoint: Arc::new(RwLock::new(None)),
            last_checkpoint_time: Arc::new(RwLock::new(Utc::now())),
            items_since_checkpoint: Arc::new(RwLock::new(0)),
            job_id,
        }
    }

    /// Entry point: resumes from `checkpoint_id` when given, otherwise starts
    /// a fresh run with checkpointing enabled.
    ///
    /// Returns the map-phase agent results (the reduce phase, if any, runs for
    /// its side effects only).
    pub async fn execute_job_with_checkpoints(
        &self,
        setup: Option<SetupPhase>,
        map_phase: MapPhase,
        reduce: Option<ReducePhase>,
        env: &ExecutionEnvironment,
        checkpoint_id: Option<CheckpointId>,
    ) -> Result<Vec<AgentResult>> {
        if let Some(checkpoint_id) = checkpoint_id {
            info!("Resuming job from checkpoint {}", checkpoint_id);
            return self
                .resume_from_checkpoint(checkpoint_id, setup, map_phase, reduce, env)
                .await;
        }
        info!("Starting new job execution with checkpoint support");
        self.initialize_checkpoint_state(&map_phase).await?;
        let results = self
            .execute_with_checkpoints(setup, map_phase, reduce, env)
            .await?;
        Ok(results)
    }

    /// Seeds the in-memory checkpoint (Setup phase, no work items yet) and
    /// resets the checkpoint interval timer.
    async fn initialize_checkpoint_state(&self, map_phase: &MapPhase) -> Result<()> {
        let checkpoint = create_initial_checkpoint(&self.job_id, map_phase.config.max_parallel);
        *self.current_checkpoint.write().await = Some(checkpoint);
        *self.last_checkpoint_time.write().await = Utc::now();
        Ok(())
    }

    /// Runs setup → map → reduce in order, saving a checkpoint at the end.
    async fn execute_with_checkpoints(
        &self,
        setup: Option<SetupPhase>,
        map_phase: MapPhase,
        reduce: Option<ReducePhase>,
        env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        if let Some(setup_phase) = setup {
            self.execute_setup_with_checkpoint(setup_phase, env).await?;
        }
        let map_results = self.execute_map_with_checkpoints(map_phase, env).await?;
        if let Some(reduce_phase) = reduce {
            self.execute_reduce_with_checkpoint(reduce_phase, &map_results, env)
                .await?;
        }
        self.save_checkpoint(CheckpointReason::PhaseTransition)
            .await?;
        Ok(map_results)
    }

    /// Marks the checkpoint as being in the Setup phase and persists it.
    /// The setup work itself is not executed here (parameters are unused).
    async fn execute_setup_with_checkpoint(
        &self,
        _setup: SetupPhase,
        _env: &ExecutionEnvironment,
    ) -> Result<()> {
        info!("Executing setup phase with checkpoint support");
        if let Some(ref mut checkpoint) = *self.current_checkpoint.write().await {
            checkpoint.metadata.phase = PhaseType::Setup;
            checkpoint.execution_state.current_phase = PhaseType::Setup;
        }
        self.save_checkpoint(CheckpointReason::PhaseTransition)
            .await?;
        Ok(())
    }

    /// Loads the work items, records them as pending in the checkpoint, then
    /// processes them batch-by-batch. Checkpoints are written at the phase
    /// boundaries and periodically while batches complete.
    async fn execute_map_with_checkpoints(
        &self,
        map_phase: MapPhase,
        env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        info!("Executing map phase with checkpoint support");
        if let Some(ref mut checkpoint) = *self.current_checkpoint.write().await {
            update_checkpoint_to_map_phase(checkpoint);
        }
        let work_items = self.load_work_items(&map_phase).await?;
        let total_items = work_items.len();
        if let Some(ref mut checkpoint) = *self.current_checkpoint.write().await {
            checkpoint.metadata.total_work_items = total_items;
            checkpoint.work_item_state.pending_items = create_work_items(work_items);
        }
        self.save_checkpoint(CheckpointReason::PhaseTransition)
            .await?;
        let mut all_results = Vec::new();
        // The batch loop is identical to the resume path, so share it instead
        // of duplicating the drain/process/checkpoint logic here.
        self.continue_map_processing(&mut all_results, &map_phase, env)
            .await?;
        self.save_checkpoint(CheckpointReason::PhaseTransition)
            .await?;
        Ok(all_results)
    }

    /// Marks the checkpoint as being in the Reduce phase and persists it.
    /// The reduce work itself is not executed here (parameters are unused).
    async fn execute_reduce_with_checkpoint(
        &self,
        _reduce: ReducePhase,
        _map_results: &[AgentResult],
        _env: &ExecutionEnvironment,
    ) -> Result<()> {
        info!("Executing reduce phase with checkpoint support");
        if let Some(ref mut checkpoint) = *self.current_checkpoint.write().await {
            checkpoint.metadata.phase = PhaseType::Reduce;
            checkpoint.execution_state.current_phase = PhaseType::Reduce;
        }
        self.save_checkpoint(CheckpointReason::PhaseTransition)
            .await?;
        Ok(())
    }

    /// Loads the checkpoint via the manager and dispatches to the phase-specific
    /// resume routine based on the phase recorded in the checkpoint.
    async fn resume_from_checkpoint(
        &self,
        checkpoint_id: CheckpointId,
        setup: Option<SetupPhase>,
        map_phase: MapPhase,
        reduce: Option<ReducePhase>,
        env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        let resume_state = self
            .checkpoint_manager
            .resume_from_checkpoint(Some(checkpoint_id))
            .await
            .context("Failed to load checkpoint")?;
        *self.current_checkpoint.write().await = Some(resume_state.checkpoint.clone());
        match resume_state.checkpoint.metadata.phase {
            PhaseType::Setup => self.resume_from_setup(setup, map_phase, reduce, env).await,
            PhaseType::Map => {
                self.resume_from_map(resume_state, map_phase, reduce, env)
                    .await
            }
            PhaseType::Reduce => self.resume_from_reduce(resume_state, reduce, env).await,
            PhaseType::Complete => Ok(extract_agent_results(&resume_state.checkpoint)),
        }
    }

    /// Setup was not finished: re-run the whole pipeline from the start.
    async fn resume_from_setup(
        &self,
        setup: Option<SetupPhase>,
        map_phase: MapPhase,
        reduce: Option<ReducePhase>,
        env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        info!("Resuming from setup phase");
        self.execute_with_checkpoints(setup, map_phase, reduce, env)
            .await
    }

    /// Restores the saved work-item state, keeps previously collected agent
    /// results, and continues processing the remaining pending items.
    ///
    /// NOTE(review): items restored as in-progress are never re-queued here —
    /// `get_next_batch` only drains `pending_items`. Confirm that `ResumeState`
    /// moves interrupted in-progress items back to pending upstream.
    async fn resume_from_map(
        &self,
        resume_state: crate::cook::execution::mapreduce::checkpoint::ResumeState,
        map_phase: MapPhase,
        reduce: Option<ReducePhase>,
        env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        info!(
            "Resuming from map phase with {} pending items",
            resume_state.work_items.pending_items.len()
        );
        restore_work_items(&self.current_checkpoint, resume_state.work_items).await;
        let mut all_results = extract_agent_results(&resume_state.checkpoint);
        self.continue_map_processing(&mut all_results, &map_phase, env)
            .await?;
        execute_reduce_if_needed(self, reduce, &all_results, env).await?;
        Ok(all_results)
    }

    /// Drains pending items in batches of at most `max_parallel`, processing
    /// each batch, folding its results into the checkpoint, and saving an
    /// interval checkpoint whenever the manager says one is due.
    async fn continue_map_processing(
        &self,
        all_results: &mut Vec<AgentResult>,
        map_phase: &MapPhase,
        env: &ExecutionEnvironment,
    ) -> Result<()> {
        while let Some(batch) = self.get_next_batch(map_phase.config.max_parallel).await {
            let batch_results = self.process_batch(batch, map_phase, env).await?;
            self.update_checkpoint_with_results(&batch_results).await?;
            all_results.extend(batch_results);
            if self.should_checkpoint().await {
                self.save_checkpoint(CheckpointReason::Interval).await?;
                // Restart the interval counter after a successful save.
                *self.items_since_checkpoint.write().await = 0;
            }
        }
        Ok(())
    }

    /// Map already finished: reuse its recorded results and run reduce if
    /// one was requested.
    async fn resume_from_reduce(
        &self,
        resume_state: crate::cook::execution::mapreduce::checkpoint::ResumeState,
        reduce: Option<ReducePhase>,
        env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        info!("Resuming from reduce phase");
        let all_results = extract_agent_results(&resume_state.checkpoint);
        execute_reduce_if_needed(self, reduce, &all_results, env).await?;
        Ok(all_results)
    }

    /// Removes up to `max_size` items from the pending queue and marks them
    /// in-progress. Returns `None` when there is no checkpoint or nothing left.
    async fn get_next_batch(&self, max_size: usize) -> Option<Vec<WorkItem>> {
        let mut checkpoint = self.current_checkpoint.write().await;
        let cp = checkpoint.as_mut()?;
        let batch = extract_batch_from_pending(&mut cp.work_item_state.pending_items, max_size)?;
        mark_items_in_progress(&mut cp.work_item_state.in_progress_items, &batch);
        Some(batch)
    }

    /// Processes one batch of work items and bumps the items-since-checkpoint
    /// counter. Currently simulated — no real agent execution happens here.
    async fn process_batch(
        &self,
        batch: Vec<WorkItem>,
        _map_phase: &MapPhase,
        _env: &ExecutionEnvironment,
    ) -> Result<Vec<AgentResult>> {
        let results = simulate_batch_processing(batch);
        *self.items_since_checkpoint.write().await += results.len();
        Ok(results)
    }

    /// Folds batch results into the checkpoint: each result for an item that
    /// was in-progress is recorded (completed/failed/still-running) and its
    /// agent result stored. Results for unknown item ids are ignored.
    async fn update_checkpoint_with_results(&self, results: &[AgentResult]) -> Result<()> {
        let mut checkpoint = self.current_checkpoint.write().await;
        let cp = match checkpoint.as_mut() {
            Some(cp) => cp,
            None => return Ok(()),
        };
        for result in results {
            if let Some(progress) = cp.work_item_state.in_progress_items.remove(&result.item_id) {
                update_work_item_status(cp, result, progress);
                cp.agent_state
                    .agent_results
                    .insert(result.item_id.clone(), result.clone());
            }
        }
        Ok(())
    }

    /// Asks the manager whether an interval checkpoint is due, given how many
    /// items were processed and when the last checkpoint was written.
    async fn should_checkpoint(&self) -> bool {
        let items = *self.items_since_checkpoint.read().await;
        let last_time = *self.last_checkpoint_time.read().await;
        self.checkpoint_manager.should_checkpoint(items, last_time)
    }

    /// Persists the current checkpoint (if any) and refreshes the
    /// last-checkpoint timestamp. A missing checkpoint is a silent no-op.
    async fn save_checkpoint(&self, reason: CheckpointReason) -> Result<()> {
        let checkpoint = self.current_checkpoint.read().await;
        if let Some(ref cp) = *checkpoint {
            let checkpoint_id = self
                .checkpoint_manager
                .create_checkpoint(cp, reason)
                .await?;
            *self.last_checkpoint_time.write().await = Utc::now();
            info!(
                "Saved checkpoint {} with {} completed items",
                checkpoint_id, cp.metadata.completed_items
            );
        }
        Ok(())
    }

    /// Stub: real work-item loading is not implemented yet; always empty.
    async fn load_work_items(&self, _map_phase: &MapPhase) -> Result<Vec<Value>> {
        Ok(vec![])
    }
}
/// Convenience constructor: wraps `coordinator` in a `CheckpointedCoordinator`
/// that stores checkpoints for `job_id` under `checkpoint_path`.
pub fn create_checkpointed_coordinator(
    coordinator: MapReduceCoordinator,
    checkpoint_path: PathBuf,
    job_id: String,
) -> CheckpointedCoordinator {
    CheckpointedCoordinator::new(coordinator, checkpoint_path, job_id)
}
/// Assembles a brand-new checkpoint for `job_id`: Setup phase, empty work-item
/// queues, and a resource budget of `max_parallel` agents.
fn create_initial_checkpoint(job_id: &str, max_parallel: usize) -> Checkpoint {
    let metadata = create_checkpoint_metadata(job_id);
    let resource_state = create_resource_state(max_parallel);
    Checkpoint {
        metadata,
        resource_state,
        execution_state: create_execution_state(),
        work_item_state: create_work_item_state(),
        agent_state: create_agent_state(),
        variable_state: create_variable_state(),
        error_state: create_error_state(),
    }
}
/// Fresh checkpoint metadata for `job_id`: Setup phase, no items counted yet.
/// The id and integrity hash are left empty for the manager to fill in.
fn create_checkpoint_metadata(
    job_id: &str,
) -> crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
    use crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata as Meta;
    Meta {
        checkpoint_id: String::new(),
        integrity_hash: String::new(),
        job_id: job_id.to_owned(),
        version: 1,
        created_at: Utc::now(),
        phase: PhaseType::Setup,
        total_work_items: 0,
        completed_items: 0,
        checkpoint_reason: CheckpointReason::Manual,
    }
}
/// Initial execution state: Setup phase starting now, no phase results yet.
fn create_execution_state() -> crate::cook::execution::mapreduce::checkpoint::ExecutionState {
    use crate::cook::execution::mapreduce::checkpoint::ExecutionState as Exec;
    Exec {
        current_phase: PhaseType::Setup,
        phase_start_time: Utc::now(),
        workflow_variables: HashMap::new(),
        setup_results: None,
        map_results: None,
        reduce_results: None,
    }
}
/// Empty work-item bookkeeping: nothing pending, in progress, done, or failed.
fn create_work_item_state() -> WorkItemState {
    WorkItemState {
        current_batch: None,
        pending_items: Vec::new(),
        in_progress_items: HashMap::new(),
        completed_items: Vec::new(),
        failed_items: Vec::new(),
    }
}
/// Initial agent bookkeeping with every tracking map empty.
fn create_agent_state() -> crate::cook::execution::mapreduce::checkpoint::AgentState {
    use crate::cook::execution::mapreduce::checkpoint::AgentState as Agents;
    Agents {
        agent_results: HashMap::new(),
        active_agents: HashMap::new(),
        agent_assignments: HashMap::new(),
        resource_allocation: HashMap::new(),
    }
}
/// Initial variable snapshot with every variable map empty.
fn create_variable_state() -> crate::cook::execution::mapreduce::checkpoint::VariableState {
    use crate::cook::execution::mapreduce::checkpoint::VariableState as Vars;
    Vars {
        item_variables: HashMap::new(),
        workflow_variables: HashMap::new(),
        captured_outputs: HashMap::new(),
        environment_variables: HashMap::new(),
    }
}
/// Initial resource accounting: `max_parallel` agents allowed, none active,
/// no worktrees created or cleaned, disk usage unknown.
fn create_resource_state(
    max_parallel: usize,
) -> crate::cook::execution::mapreduce::checkpoint::ResourceState {
    use crate::cook::execution::mapreduce::checkpoint::ResourceState as Resources;
    Resources {
        current_agents_active: 0,
        total_agents_allowed: max_parallel,
        worktrees_created: Vec::new(),
        worktrees_cleaned: Vec::new(),
        disk_usage_bytes: None,
    }
}
/// Initial error accounting: zero errors, empty DLQ, threshold not reached.
fn create_error_state() -> crate::cook::execution::mapreduce::checkpoint::ErrorState {
    use crate::cook::execution::mapreduce::checkpoint::ErrorState as Errors;
    Errors {
        error_threshold_reached: false,
        error_count: 0,
        dlq_items: Vec::new(),
        last_error: None,
    }
}
/// Wraps raw JSON values as `WorkItem`s, assigning sequential ids
/// `item_0`, `item_1`, … in input order.
fn create_work_items(items: Vec<Value>) -> Vec<WorkItem> {
    let mut work_items = Vec::with_capacity(items.len());
    for (index, data) in items.into_iter().enumerate() {
        work_items.push(WorkItem {
            id: format!("item_{}", index),
            data,
        });
    }
    work_items
}
/// Removes up to `max_size` items from the front of the pending queue.
/// Returns `None` only when the queue is already empty.
fn extract_batch_from_pending(
    pending_items: &mut Vec<WorkItem>,
    max_size: usize,
) -> Option<Vec<WorkItem>> {
    match pending_items.len() {
        0 => None,
        available => {
            let take = available.min(max_size);
            Some(pending_items.drain(..take).collect())
        }
    }
}
/// Records every item in `batch` as in-progress, all stamped with the same
/// start time.
fn mark_items_in_progress(
    in_progress_items: &mut HashMap<String, WorkItemProgress>,
    batch: &[WorkItem],
) {
    let started = Utc::now();
    in_progress_items.extend(
        batch
            .iter()
            .map(|item| (item.id.clone(), create_work_item_progress(item, &started))),
    );
}
/// Builds the in-progress record for `item`, deriving the agent id from the
/// item id and stamping both timestamps with `now`.
fn create_work_item_progress(item: &WorkItem, now: &DateTime<Utc>) -> WorkItemProgress {
    let timestamp = *now;
    WorkItemProgress {
        agent_id: format!("agent_{}", item.id),
        work_item: item.clone(),
        started_at: timestamp,
        last_update: timestamp,
    }
}
/// Produces a successful `AgentResult` for every item in the batch.
/// Simulation only — no real agents are run.
fn simulate_batch_processing(batch: Vec<WorkItem>) -> Vec<AgentResult> {
    let mut results = Vec::with_capacity(batch.len());
    for item in &batch {
        results.push(create_success_result(item));
    }
    results
}
/// Builds a synthetic successful result for `item` with a fixed 1-second
/// duration and no commits, files, or worktree details.
fn create_success_result(item: &WorkItem) -> AgentResult {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    AgentResult {
        item_id: item.id.clone(),
        status: AgentStatus::Success,
        output: Some(format!("Processed {}", item.id)),
        error: None,
        commits: Vec::new(),
        files_modified: Vec::new(),
        duration: Duration::from_secs(1),
        worktree_path: None,
        branch_name: None,
        worktree_session_id: None,
        json_log_location: None,
        cleanup_status: None,
    }
}
/// Routes a finished (or still-running) agent result to the matching
/// checkpoint bucket: completed, failed, or back into in-progress.
fn update_work_item_status(
    checkpoint: &mut Checkpoint,
    result: &AgentResult,
    progress: WorkItemProgress,
) {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    match &result.status {
        AgentStatus::Success => handle_success_status(checkpoint, result, progress),
        AgentStatus::Failed(_) | AgentStatus::Timeout => {
            handle_failure_status(checkpoint, result, progress)
        }
        AgentStatus::Pending | AgentStatus::Running | AgentStatus::Retrying(_) => {
            // Item is still active: keep tracking it as in-progress.
            checkpoint
                .work_item_state
                .in_progress_items
                .insert(result.item_id.clone(), progress);
        }
    }
}
/// Records a successful item in the completed list and bumps the metadata
/// completed-items counter.
fn handle_success_status(
    checkpoint: &mut Checkpoint,
    result: &AgentResult,
    progress: WorkItemProgress,
) {
    use crate::cook::execution::mapreduce::checkpoint::CompletedWorkItem;
    let record = CompletedWorkItem {
        work_item: progress.work_item,
        result: result.clone(),
        completed_at: Utc::now(),
    };
    checkpoint.work_item_state.completed_items.push(record);
    checkpoint.metadata.completed_items += 1;
}
/// Records a failed item (empty string when the result carries no error
/// message) and bumps the error counter. Retry count starts at zero.
fn handle_failure_status(
    checkpoint: &mut Checkpoint,
    result: &AgentResult,
    progress: WorkItemProgress,
) {
    use crate::cook::execution::mapreduce::checkpoint::FailedWorkItem;
    let error = result.error.clone().unwrap_or_default();
    let record = FailedWorkItem {
        work_item: progress.work_item,
        error,
        failed_at: Utc::now(),
        retry_count: 0,
    };
    checkpoint.work_item_state.failed_items.push(record);
    checkpoint.error_state.error_count += 1;
}
/// Clones every stored agent result out of the checkpoint. Order follows the
/// underlying map's iteration order and is not guaranteed.
fn extract_agent_results(checkpoint: &Checkpoint) -> Vec<AgentResult> {
    let mut results = Vec::new();
    for result in checkpoint.agent_state.agent_results.values() {
        results.push(result.clone());
    }
    results
}
async fn restore_work_items(
current_checkpoint: &Arc<RwLock<Option<Checkpoint>>>,
work_items: WorkItemState,
) {
if let Some(ref mut checkpoint) = *current_checkpoint.write().await {
checkpoint.work_item_state = work_items;
}
}
/// Runs the reduce phase over `results` when one was configured; otherwise
/// succeeds immediately.
async fn execute_reduce_if_needed(
    coordinator: &CheckpointedCoordinator,
    reduce: Option<ReducePhase>,
    results: &[AgentResult],
    env: &ExecutionEnvironment,
) -> Result<()> {
    match reduce {
        None => Ok(()),
        Some(reduce_phase) => {
            coordinator
                .execute_reduce_with_checkpoint(reduce_phase, results, env)
                .await
        }
    }
}
/// Sets both phase markers (metadata and execution state) to `Map`.
/// NOTE(review): unlike `update_phase_metadata`, this does not refresh
/// `phase_start_time` — confirm whether that is intentional.
fn update_checkpoint_to_map_phase(checkpoint: &mut Checkpoint) {
    checkpoint.metadata.phase = PhaseType::Map;
    checkpoint.execution_state.current_phase = PhaseType::Map;
}
/// True when enough items have been processed to warrant a checkpoint.
/// Falls back to a threshold of 10 when the config leaves the interval unset.
#[allow(dead_code)]
fn should_checkpoint_based_on_items(items_processed: usize, config: &CheckpointConfig) -> bool {
    let threshold = config.interval_items.unwrap_or(10);
    items_processed >= threshold
}
/// True when both phase markers in the checkpoint agree with `expected_phase`.
#[allow(dead_code)]
fn validate_checkpoint_state(checkpoint: &Checkpoint, expected_phase: PhaseType) -> bool {
    let metadata_matches = checkpoint.metadata.phase == expected_phase;
    let execution_matches = checkpoint.execution_state.current_phase == expected_phase;
    metadata_matches && execution_matches
}
/// Effective batch size: the configured batch size (defaulting to
/// `max_parallel`), capped at the number of remaining items.
#[allow(dead_code)]
fn calculate_batch_size(
    max_parallel: usize,
    remaining_items: usize,
    config_batch_size: Option<usize>,
) -> usize {
    let cap = config_batch_size.unwrap_or(max_parallel);
    remaining_items.min(cap)
}
/// Wraps raw values as `WorkItem`s with ids numbered from `offset`, returning
/// the items together with the input count.
#[allow(dead_code)]
fn prepare_work_items(items: Vec<Value>, offset: usize) -> (Vec<WorkItem>, usize) {
    let total = items.len();
    let mut work_items = Vec::with_capacity(total);
    for (index, data) in items.into_iter().enumerate() {
        work_items.push(WorkItem {
            id: format!("item_{}", offset + index),
            data,
        });
    }
    (work_items, total)
}
/// Moves the checkpoint into `phase`: updates both phase markers, restarts the
/// phase timer, and (when provided) records the total work-item count.
#[allow(dead_code)]
fn update_phase_metadata(
    checkpoint: &mut Checkpoint,
    phase: PhaseType,
    total_items: Option<usize>,
) {
    checkpoint.execution_state.current_phase = phase;
    checkpoint.execution_state.phase_start_time = Utc::now();
    checkpoint.metadata.phase = phase;
    if let Some(count) = total_items {
        checkpoint.metadata.total_work_items = count;
    }
}
/// Simulates processing a batch: every item succeeds with the given duration
/// and carries no commits, files, or worktree details.
#[allow(dead_code)]
fn process_work_batch(batch: Vec<WorkItem>, base_duration_secs: u64) -> Vec<AgentResult> {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    let mut results = Vec::with_capacity(batch.len());
    for item in batch {
        results.push(AgentResult {
            output: Some(format!("Processed {}", item.id)),
            item_id: item.id,
            status: AgentStatus::Success,
            error: None,
            commits: Vec::new(),
            files_modified: Vec::new(),
            duration: Duration::from_secs(base_duration_secs),
            worktree_path: None,
            branch_name: None,
            worktree_session_id: None,
            json_log_location: None,
            cleanup_status: None,
        });
    }
    results
}
/// Tallies `(successful, failed, total_duration_secs)` across the results in
/// a single pass.
#[allow(dead_code)]
fn aggregate_batch_results(all_results: &[AgentResult]) -> (usize, usize, u64) {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    let mut successful = 0usize;
    let mut failed = 0usize;
    let mut total_duration = 0u64;
    for result in all_results {
        if matches!(result.status, AgentStatus::Success) {
            successful += 1;
        } else {
            failed += 1;
        }
        total_duration += result.duration.as_secs();
    }
    (successful, failed, total_duration)
}
/// Returns the completed-items counter advanced by the number of successful
/// results in this batch.
#[allow(dead_code)]
fn update_checkpoint_progress(current_completed: usize, batch_results: &[AgentResult]) -> usize {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    let successes = batch_results
        .iter()
        .filter(|result| matches!(result.status, AgentStatus::Success))
        .count();
    current_completed + successes
}
/// Accumulates `batch_size` onto the interval counter and decides whether a
/// checkpoint is due. Returns `(should_checkpoint, next_counter)` — the
/// counter resets to zero when a checkpoint fires.
#[allow(dead_code)]
fn handle_batch_completion(
    items_since_checkpoint: usize,
    batch_size: usize,
    checkpoint_interval: usize,
) -> (bool, usize) {
    let accumulated = items_since_checkpoint + batch_size;
    if accumulated >= checkpoint_interval {
        (true, 0)
    } else {
        (false, accumulated)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[tokio::test]
async fn test_get_next_batch_empty() {
let temp_dir = tempfile::TempDir::new().unwrap();
let checkpoint_path = temp_dir.path().to_path_buf();
let job_id = "test-empty-batch";
let config = CheckpointConfig::default();
let storage = Box::new(FileCheckpointStorage::new(checkpoint_path.clone(), true));
let _checkpoint_manager =
Arc::new(CheckpointManager::new(storage, config, job_id.to_string()));
let current_checkpoint: Arc<RwLock<Option<Checkpoint>>> =
Arc::new(RwLock::new(Some(Checkpoint {
metadata: crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
checkpoint_id: String::new(),
job_id: job_id.to_string(),
version: 1,
created_at: Utc::now(),
phase: PhaseType::Map,
total_work_items: 0,
completed_items: 0,
checkpoint_reason: CheckpointReason::Manual,
integrity_hash: String::new(),
},
execution_state: crate::cook::execution::mapreduce::checkpoint::ExecutionState {
current_phase: PhaseType::Map,
phase_start_time: Utc::now(),
setup_results: None,
map_results: None,
reduce_results: None,
workflow_variables: std::collections::HashMap::new(),
},
work_item_state: WorkItemState {
pending_items: vec![],
in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
},
agent_state: crate::cook::execution::mapreduce::checkpoint::AgentState {
active_agents: std::collections::HashMap::new(),
agent_assignments: std::collections::HashMap::new(),
agent_results: std::collections::HashMap::new(),
resource_allocation: std::collections::HashMap::new(),
},
variable_state: crate::cook::execution::mapreduce::checkpoint::VariableState {
workflow_variables: std::collections::HashMap::new(),
captured_outputs: std::collections::HashMap::new(),
environment_variables: std::collections::HashMap::new(),
item_variables: std::collections::HashMap::new(),
},
resource_state: crate::cook::execution::mapreduce::checkpoint::ResourceState {
total_agents_allowed: 10,
current_agents_active: 0,
worktrees_created: vec![],
worktrees_cleaned: vec![],
disk_usage_bytes: None,
},
error_state: crate::cook::execution::mapreduce::checkpoint::ErrorState {
error_count: 0,
dlq_items: vec![],
error_threshold_reached: false,
last_error: None,
},
})));
let mut checkpoint = current_checkpoint.write().await;
if let Some(ref mut cp) = *checkpoint {
let pending_count = cp.work_item_state.pending_items.len();
assert_eq!(pending_count, 0);
}
}
#[tokio::test]
async fn test_checkpoint_state_updates() {
let work_items = vec![
WorkItem {
id: "item_0".to_string(),
data: serde_json::json!({"test": "data1"}),
},
WorkItem {
id: "item_1".to_string(),
data: serde_json::json!({"test": "data2"}),
},
];
let mut work_item_state = WorkItemState {
pending_items: work_items,
in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
};
let batch_size = 2;
let batch: Vec<WorkItem> = work_item_state.pending_items.drain(..batch_size).collect();
assert_eq!(batch.len(), 2);
assert_eq!(work_item_state.pending_items.len(), 0);
for item in &batch {
work_item_state.in_progress_items.insert(
item.id.clone(),
WorkItemProgress {
work_item: item.clone(),
agent_id: format!("agent_{}", item.id),
started_at: Utc::now(),
last_update: Utc::now(),
},
);
}
assert_eq!(work_item_state.in_progress_items.len(), 2);
}
#[tokio::test]
async fn test_checkpoint_decision_logic() {
    // Zero processed items must never trigger a checkpoint; hitting the
    // configured threshold (default 10) must.
    let config = CheckpointConfig::default();
    assert!(!should_checkpoint_based_on_items(0, &config));
    let threshold = config.interval_items.unwrap_or(10);
    assert!(should_checkpoint_based_on_items(threshold, &config));
}
#[test]
fn test_create_work_items_normal_case() {
    let items = vec![
        serde_json::json!({"id": 1, "data": "test1"}),
        serde_json::json!({"id": 2, "data": "test2"}),
        serde_json::json!({"id": 3, "data": "test3"}),
    ];
    let work_items = create_work_items(items);
    assert_eq!(work_items.len(), 3);
    // Ids are assigned sequentially in input order.
    for (index, work_item) in work_items.iter().enumerate() {
        assert_eq!(work_item.id, format!("item_{}", index));
    }
    // Payloads are carried through untouched.
    assert_eq!(work_items[0].data["id"], 1);
    assert_eq!(work_items[1].data["data"], "test2");
}
#[test]
fn test_create_work_items_empty() {
    // An empty input produces an empty output, not an error.
    let work_items = create_work_items(Vec::new());
    assert!(work_items.is_empty());
}
#[test]
fn test_create_work_items_single_item() {
    let work_items = create_work_items(vec![serde_json::json!({"test": "single"})]);
    assert_eq!(work_items.len(), 1);
    let only = &work_items[0];
    assert_eq!(only.id, "item_0");
    assert_eq!(only.data["test"], "single");
}
#[test]
fn test_create_work_items_id_formatting() {
    // Ids beyond index 9 must not be zero-padded: item_14, not item_014.
    let items: Vec<serde_json::Value> =
        (0..15).map(|i| serde_json::json!({"index": i})).collect();
    let work_items = create_work_items(items);
    assert_eq!(work_items.len(), 15);
    for &index in &[0usize, 9, 14] {
        assert_eq!(work_items[index].id, format!("item_{}", index));
    }
}
#[test]
fn test_update_checkpoint_to_map_phase() {
    // Reuse the production constructor (which starts in the Setup phase)
    // instead of duplicating the full ~55-line Checkpoint literal.
    let mut checkpoint = create_initial_checkpoint("test", 10);
    assert_eq!(checkpoint.metadata.phase, PhaseType::Setup);
    update_checkpoint_to_map_phase(&mut checkpoint);
    assert_eq!(checkpoint.metadata.phase, PhaseType::Map);
    assert_eq!(checkpoint.execution_state.current_phase, PhaseType::Map);
}
#[test]
fn test_update_checkpoint_to_map_phase_from_different_phases() {
    // Start from the canonical initial checkpoint and force it into the
    // Reduce phase, rather than duplicating the full struct literal.
    let mut checkpoint = create_initial_checkpoint("test", 10);
    checkpoint.metadata.phase = PhaseType::Reduce;
    checkpoint.execution_state.current_phase = PhaseType::Reduce;
    update_checkpoint_to_map_phase(&mut checkpoint);
    assert_eq!(checkpoint.metadata.phase, PhaseType::Map);
    assert_eq!(checkpoint.execution_state.current_phase, PhaseType::Map);
}
#[test]
fn test_update_checkpoint_preserves_other_fields() {
    let original_job_id = "test-job-123".to_string();
    let original_total_items = 42;
    // Build via the production constructor and tweak only the fields this
    // test cares about, instead of duplicating the full struct literal.
    let mut checkpoint = create_initial_checkpoint(&original_job_id, 10);
    checkpoint.metadata.total_work_items = original_total_items;
    checkpoint.metadata.completed_items = 10;
    update_checkpoint_to_map_phase(&mut checkpoint);
    // Phase markers flip to Map …
    assert_eq!(checkpoint.metadata.phase, PhaseType::Map);
    assert_eq!(checkpoint.execution_state.current_phase, PhaseType::Map);
    // … while everything else stays untouched.
    assert_eq!(checkpoint.metadata.job_id, original_job_id);
    assert_eq!(checkpoint.metadata.total_work_items, original_total_items);
    assert_eq!(checkpoint.metadata.completed_items, 10);
}
#[test]
fn test_validate_checkpoint_state() {
    // Build a Map-phase checkpoint from the production helpers instead of
    // duplicating the full struct literal.
    let mut checkpoint = create_initial_checkpoint("test", 10);
    update_checkpoint_to_map_phase(&mut checkpoint);
    assert!(
        validate_checkpoint_state(&checkpoint, PhaseType::Map),
        "Should validate when phases match"
    );
    assert!(
        !validate_checkpoint_state(&checkpoint, PhaseType::Setup),
        "Should not validate when phases don't match"
    );
    assert!(
        !validate_checkpoint_state(&checkpoint, PhaseType::Reduce),
        "Should not validate for wrong phase"
    );
}
#[test]
fn test_calculate_batch_size() {
    // (max_parallel, remaining_items, config_batch_size, expected)
    let cases = [
        (10, 25, None, 10),      // no config: fall back to max_parallel
        (10, 25, Some(5), 5),    // configured batch size wins
        (10, 3, None, 3),        // capped at remaining items
        (10, 2, Some(5), 2),     // capped at remaining even with config
        (10, 0, None, 0),        // zero items => zero batch
        (10, 100, Some(50), 50), // larger configured batch is honored
    ];
    for &(max_parallel, remaining, config, expected) in cases.iter() {
        assert_eq!(calculate_batch_size(max_parallel, remaining, config), expected);
    }
}
#[test]
fn test_prepare_work_items() {
    // Ids are derived from the starting offset; the reported total always
    // reflects the input length.
    let items: Vec<Value> = (1..=3)
        .map(|n| serde_json::json!({"data": format!("test{}", n)}))
        .collect();
    // Offset 0 yields item_0..item_2.
    let (work_items, total) = prepare_work_items(items.clone(), 0);
    assert_eq!(total, 3);
    assert_eq!(work_items.len(), 3);
    for (idx, wi) in work_items.iter().enumerate() {
        assert_eq!(wi.id, format!("item_{}", idx));
    }
    // A non-zero offset shifts every generated id.
    let (work_items, total) = prepare_work_items(items, 100);
    assert_eq!(total, 3);
    for (idx, wi) in work_items.iter().enumerate() {
        assert_eq!(wi.id, format!("item_{}", idx + 100));
    }
    // Empty input is handled gracefully.
    let (work_items, total) = prepare_work_items(vec![], 0);
    assert_eq!(total, 0);
    assert!(work_items.is_empty());
}
#[test]
fn test_update_phase_metadata() {
    use crate::cook::execution::mapreduce::checkpoint::{
        AgentState, CheckpointMetadata, ErrorState, ExecutionState, ResourceState, VariableState,
    };
    // Start in Setup with a phase_start_time 100 seconds in the past so we
    // can observe update_phase_metadata restamping it.
    let started = Utc::now()
        .checked_sub_signed(chrono::Duration::seconds(100))
        .unwrap();
    let mut checkpoint = Checkpoint {
        metadata: CheckpointMetadata {
            checkpoint_id: String::new(),
            job_id: "test".to_string(),
            version: 1,
            created_at: Utc::now(),
            phase: PhaseType::Setup,
            total_work_items: 0,
            completed_items: 0,
            checkpoint_reason: CheckpointReason::Manual,
            integrity_hash: String::new(),
        },
        execution_state: ExecutionState {
            current_phase: PhaseType::Setup,
            phase_start_time: started,
            setup_results: None,
            map_results: None,
            reduce_results: None,
            workflow_variables: HashMap::new(),
        },
        work_item_state: WorkItemState {
            pending_items: vec![],
            in_progress_items: HashMap::new(),
            completed_items: vec![],
            failed_items: vec![],
            current_batch: None,
        },
        agent_state: AgentState {
            active_agents: HashMap::new(),
            agent_assignments: HashMap::new(),
            agent_results: HashMap::new(),
            resource_allocation: HashMap::new(),
        },
        variable_state: VariableState {
            workflow_variables: HashMap::new(),
            captured_outputs: HashMap::new(),
            environment_variables: HashMap::new(),
            item_variables: HashMap::new(),
        },
        resource_state: ResourceState {
            total_agents_allowed: 10,
            current_agents_active: 0,
            worktrees_created: vec![],
            worktrees_cleaned: vec![],
            disk_usage_bytes: None,
        },
        error_state: ErrorState {
            error_count: 0,
            dlq_items: vec![],
            error_threshold_reached: false,
            last_error: None,
        },
    };
    // Transitioning to Map with a known item count updates both phase
    // views plus the total, and refreshes the phase start time.
    update_phase_metadata(&mut checkpoint, PhaseType::Map, Some(42));
    assert_eq!(checkpoint.metadata.phase, PhaseType::Map);
    assert_eq!(checkpoint.execution_state.current_phase, PhaseType::Map);
    assert_eq!(checkpoint.metadata.total_work_items, 42);
    assert!(checkpoint.execution_state.phase_start_time > started);
    // A None item count leaves the previous total untouched.
    update_phase_metadata(&mut checkpoint, PhaseType::Reduce, None);
    assert_eq!(checkpoint.metadata.phase, PhaseType::Reduce);
    assert_eq!(checkpoint.execution_state.current_phase, PhaseType::Reduce);
    assert_eq!(checkpoint.metadata.total_work_items, 42);
}
#[test]
fn test_process_work_batch() {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    // Two synthetic items processed with parallelism 2 should come back
    // successful, in input order, each with the simulated duration/output.
    let batch: Vec<WorkItem> = (1..=2)
        .map(|n| WorkItem {
            id: format!("item_{}", n),
            data: serde_json::json!({"test": format!("data{}", n)}),
        })
        .collect();
    let results = process_work_batch(batch, 2);
    assert_eq!(results.len(), 2);
    assert_eq!(results[0].item_id, "item_1");
    assert_eq!(results[1].item_id, "item_2");
    for result in &results {
        assert!(matches!(result.status, AgentStatus::Success));
        assert_eq!(result.duration.as_secs(), 2);
        assert!(result.output.is_some());
    }
    // An empty batch yields no results.
    assert!(process_work_batch(vec![], 1).is_empty());
}
#[test]
fn test_aggregate_batch_results() {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    // Builds a minimal AgentResult with the given id, status, and duration;
    // output/error are derived from the status like the original fixtures.
    fn result(id: &str, status: AgentStatus, secs: u64) -> AgentResult {
        AgentResult {
            item_id: id.to_string(),
            output: match status {
                AgentStatus::Success => Some("test".to_string()),
                _ => None,
            },
            error: match status {
                AgentStatus::Failed(ref e) => Some(e.clone()),
                _ => None,
            },
            status,
            commits: vec![],
            duration: Duration::from_secs(secs),
            worktree_path: None,
            branch_name: None,
            worktree_session_id: None,
            files_modified: vec![],
            json_log_location: None,
            cleanup_status: None,
        }
    }
    let results = vec![
        result("item_1", AgentStatus::Success, 5),
        result("item_2", AgentStatus::Failed("error".to_string()), 3),
        result("item_3", AgentStatus::Success, 4),
    ];
    // Two successes, one failure, durations summed in whole seconds.
    let (successful, failed, total_duration) = aggregate_batch_results(&results);
    assert_eq!(successful, 2);
    assert_eq!(failed, 1);
    assert_eq!(total_duration, 12);
    // An empty slice aggregates to all zeros.
    let (s, f, d) = aggregate_batch_results(&[]);
    assert_eq!(s, 0);
    assert_eq!(f, 0);
    assert_eq!(d, 0);
}
#[test]
fn test_update_checkpoint_progress() {
    use crate::cook::execution::mapreduce::agent::AgentStatus;
    // Minimal AgentResult carrying only what the progress math inspects.
    fn result(id: &str, status: AgentStatus) -> AgentResult {
        AgentResult {
            item_id: id.to_string(),
            output: match status {
                AgentStatus::Success => Some("test".to_string()),
                _ => None,
            },
            error: match status {
                AgentStatus::Failed(ref e) => Some(e.clone()),
                _ => None,
            },
            status,
            commits: vec![],
            duration: Duration::from_secs(1),
            worktree_path: None,
            branch_name: None,
            worktree_session_id: None,
            files_modified: vec![],
            json_log_location: None,
            cleanup_status: None,
        }
    }
    // Only successful results advance the completed count: 10 + 2 = 12.
    let results = vec![
        result("item_1", AgentStatus::Success),
        result("item_2", AgentStatus::Failed("error".to_string())),
        result("item_3", AgentStatus::Success),
    ];
    assert_eq!(update_checkpoint_progress(10, &results), 12);
    // A batch containing only failures leaves the count unchanged.
    let failed_results = vec![result("item_f", AgentStatus::Failed("error".to_string()))];
    assert_eq!(update_checkpoint_progress(10, &failed_results), 10);
    // An empty batch is a no-op.
    assert_eq!(update_checkpoint_progress(5, &[]), 5);
}
#[test]
fn test_handle_batch_completion() {
    // Each case is ((items_before, batch_size, interval), (should_checkpoint, counter_after)).
    let cases = [
        ((5, 3, 10), (false, 8)), // below threshold: counter accumulates
        ((7, 3, 10), (true, 0)),  // reaches threshold exactly: reset
        ((8, 5, 10), (true, 0)),  // overshoots threshold: reset
        ((5, 0, 10), (false, 5)), // empty batch: nothing changes
        ((0, 1, 1), (true, 0)),   // interval of one: checkpoint every item
    ];
    for ((before, batch, interval), expected) in cases {
        assert_eq!(handle_batch_completion(before, batch, interval), expected);
    }
}
#[test]
fn test_should_checkpoint_based_on_items_below_threshold() {
    // Five processed items sit under the configured interval of ten.
    let config = CheckpointConfig {
        interval_items: Some(10),
        ..Default::default()
    };
    let decision = should_checkpoint_based_on_items(5, &config);
    assert!(
        !decision,
        "Should not checkpoint with 5 items when threshold is 10"
    );
}
#[test]
fn test_should_checkpoint_based_on_items_at_threshold() {
    // Hitting the interval exactly must trigger a checkpoint.
    let config = CheckpointConfig {
        interval_items: Some(10),
        ..Default::default()
    };
    let decision = should_checkpoint_based_on_items(10, &config);
    assert!(decision, "Should checkpoint with 10 items when threshold is 10");
}
#[test]
fn test_should_checkpoint_based_on_items_above_threshold() {
    // Exceeding the interval must also trigger (>=, not ==).
    let config = CheckpointConfig {
        interval_items: Some(10),
        ..Default::default()
    };
    let decision = should_checkpoint_based_on_items(15, &config);
    assert!(decision, "Should checkpoint with 15 items when threshold is 10");
}
#[test]
fn test_should_checkpoint_based_on_items_zero_threshold() {
    // A zero interval means every call, even with zero items, checkpoints.
    let config = CheckpointConfig {
        interval_items: Some(0),
        ..Default::default()
    };
    for (items, msg) in [
        (0, "Should checkpoint immediately with 0 threshold"),
        (1, "Should checkpoint with any items when threshold is 0"),
    ] {
        assert!(should_checkpoint_based_on_items(items, &config), "{}", msg);
    }
}
#[test]
fn test_should_checkpoint_based_on_items_none_config() {
    // With no explicit interval the default threshold of 10 applies.
    let config = CheckpointConfig {
        interval_items: None,
        ..Default::default()
    };
    let below = should_checkpoint_based_on_items(5, &config);
    assert!(!below, "Should not checkpoint with 5 items when using default");
    let at_default = should_checkpoint_based_on_items(10, &config);
    assert!(at_default, "Should checkpoint with 10 items when using default");
}
#[test]
fn test_should_checkpoint_based_on_items_various_thresholds() {
    // Samples of (threshold, items, expected) across several intervals.
    let cases = [
        (1, 1, true, "Should checkpoint after each item with threshold 1"),
        (50, 49, false, "Should not checkpoint at 49 with threshold 50"),
        (50, 50, true, "Should checkpoint at 50 with threshold 50"),
        (100, 150, true, "Should checkpoint well above threshold"),
    ];
    for (threshold, items, expected, msg) in cases {
        let config = CheckpointConfig {
            interval_items: Some(threshold),
            ..Default::default()
        };
        assert_eq!(
            should_checkpoint_based_on_items(items, &config),
            expected,
            "{}",
            msg
        );
    }
}
#[tokio::test]
async fn test_execute_map_phase_state_updates() {
let temp_dir = tempfile::TempDir::new().unwrap();
let checkpoint_path = temp_dir.path().to_path_buf();
let job_id = "test-phase-updates".to_string();
let config = CheckpointConfig::default();
let storage = Box::new(FileCheckpointStorage::new(checkpoint_path.clone(), true));
let _checkpoint_manager = Arc::new(CheckpointManager::new(storage, config, job_id.clone()));
let current_checkpoint: Arc<RwLock<Option<Checkpoint>>> = Arc::new(RwLock::new(None));
*current_checkpoint.write().await = Some(Checkpoint {
metadata: crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
checkpoint_id: String::new(),
job_id: job_id.clone(),
version: 1,
created_at: Utc::now(),
phase: PhaseType::Setup,
total_work_items: 0,
completed_items: 0,
checkpoint_reason: CheckpointReason::Manual,
integrity_hash: String::new(),
},
execution_state: crate::cook::execution::mapreduce::checkpoint::ExecutionState {
current_phase: PhaseType::Setup,
phase_start_time: Utc::now(),
setup_results: None,
map_results: None,
reduce_results: None,
workflow_variables: std::collections::HashMap::new(),
},
work_item_state: WorkItemState {
pending_items: vec![],
in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
},
agent_state: crate::cook::execution::mapreduce::checkpoint::AgentState {
active_agents: std::collections::HashMap::new(),
agent_assignments: std::collections::HashMap::new(),
agent_results: std::collections::HashMap::new(),
resource_allocation: std::collections::HashMap::new(),
},
variable_state: crate::cook::execution::mapreduce::checkpoint::VariableState {
workflow_variables: std::collections::HashMap::new(),
captured_outputs: std::collections::HashMap::new(),
environment_variables: std::collections::HashMap::new(),
item_variables: std::collections::HashMap::new(),
},
resource_state: crate::cook::execution::mapreduce::checkpoint::ResourceState {
total_agents_allowed: 10,
current_agents_active: 0,
worktrees_created: vec![],
worktrees_cleaned: vec![],
disk_usage_bytes: None,
},
error_state: crate::cook::execution::mapreduce::checkpoint::ErrorState {
error_count: 0,
dlq_items: vec![],
error_threshold_reached: false,
last_error: None,
},
});
{
let mut checkpoint = current_checkpoint.write().await;
if let Some(ref mut cp) = *checkpoint {
cp.metadata.phase = PhaseType::Map;
cp.execution_state.current_phase = PhaseType::Map;
}
}
let checkpoint = current_checkpoint.read().await;
assert!(checkpoint.is_some());
if let Some(ref cp) = *checkpoint {
assert_eq!(cp.metadata.phase, PhaseType::Map);
assert_eq!(cp.execution_state.current_phase, PhaseType::Map);
}
}
#[tokio::test]
async fn test_work_items_loaded_to_checkpoint() {
    // Enumerated JSON payloads become WorkItems with sequential ids while
    // the original payloads are preserved verbatim.
    let work_items: Vec<WorkItem> = (1..=3)
        .map(|n| serde_json::json!({"id": n, "data": format!("test{}", n)}))
        .enumerate()
        .map(|(i, item)| WorkItem {
            id: format!("item_{}", i),
            data: item,
        })
        .collect();
    assert_eq!(work_items.len(), 3);
    for (i, item) in work_items.iter().enumerate() {
        assert_eq!(item.id, format!("item_{}", i));
    }
    assert_eq!(work_items[0].data["id"], 1);
    assert_eq!(work_items[1].data["data"], "test2");
}
#[tokio::test]
async fn test_batch_processing_loop() {
    // Five items drained in batches capped at two take three passes (2+2+1).
    let max_parallel = 2;
    let mut remaining: Vec<WorkItem> = (0..5)
        .map(|i| WorkItem {
            id: format!("item_{}", i),
            data: serde_json::json!({"test": format!("data{}", i)}),
        })
        .collect();
    let mut all_processed = Vec::new();
    let mut batch_count = 0;
    while !remaining.is_empty() {
        let take = remaining.len().min(max_parallel);
        let batch: Vec<WorkItem> = remaining.drain(..take).collect();
        assert!(batch.len() <= max_parallel);
        all_processed.extend(batch);
        batch_count += 1;
    }
    assert_eq!(all_processed.len(), 5, "Should process all 5 items");
    assert_eq!(batch_count, 3, "Should require 3 batches (2+2+1)");
    assert!(remaining.is_empty(), "Should have no remaining items");
}
#[tokio::test]
async fn test_checkpoint_time_tracking() {
    // The tracked timestamp should read back as "just now" immediately
    // after being recorded. (The original test overwrote the lock with
    // Utc::now() right after initializing it with Utc::now() — that
    // redundant second write has been removed.)
    let last_checkpoint_time = Arc::new(RwLock::new(Utc::now()));
    let last_time = *last_checkpoint_time.read().await;
    let diff = Utc::now().signed_duration_since(last_time);
    assert!(
        diff.num_seconds() < 1,
        "Checkpoint time should be very recent"
    );
}
#[tokio::test]
async fn test_batch_processing_with_checkpoint_triggering() {
    // Processing 25 items in batches of 5 with a 10-item checkpoint
    // interval should trigger exactly two checkpoints (after 10 and 20).
    let items_since_checkpoint = Arc::new(RwLock::new(0));
    let config = CheckpointConfig {
        interval_items: Some(10),
        ..Default::default()
    };
    let checkpoint_interval = config.interval_items.unwrap_or(10);
    let total_items = 25;
    let batch_size = 5;
    let mut checkpoints_saved = 0;
    for batch_num in 0..(total_items / batch_size) {
        // Take the write guard once per batch instead of re-locking for
        // the increment, the read, and the reset separately (the original
        // acquired the lock three times per iteration).
        {
            let mut count = items_since_checkpoint.write().await;
            *count += batch_size;
            if *count >= checkpoint_interval {
                checkpoints_saved += 1;
                *count = 0;
            }
        }
        if batch_num == 1 {
            assert_eq!(
                checkpoints_saved, 1,
                "Should save checkpoint after 10 items"
            );
            assert_eq!(
                *items_since_checkpoint.read().await,
                0,
                "Counter should reset after checkpoint"
            );
        }
    }
    assert_eq!(
        checkpoints_saved, 2,
        "Should have saved 2 checkpoints (at 10 and 20 items)"
    );
}
#[tokio::test]
async fn test_batch_processing_without_intermediate_checkpoints() {
    // With a 100-item interval, five batches of five (25 items total)
    // never reach the threshold, so the counter accumulates without reset.
    let items_since_checkpoint = Arc::new(RwLock::new(0));
    let config = CheckpointConfig {
        interval_items: Some(100),
        ..Default::default()
    };
    let checkpoint_interval = config.interval_items.unwrap_or(10);
    let batches = 5;
    let batch_size = 5;
    let mut checkpoints_saved = 0;
    for _ in 0..batches {
        // Single write guard per batch: increment and (possibly) reset,
        // instead of the original's separate write/read/write lock trips.
        let mut count = items_since_checkpoint.write().await;
        *count += batch_size;
        if *count >= checkpoint_interval {
            checkpoints_saved += 1;
            *count = 0;
        }
    }
    assert_eq!(
        checkpoints_saved, 0,
        "Should not save checkpoints when threshold not reached"
    );
    assert_eq!(
        *items_since_checkpoint.read().await,
        25,
        "Counter should accumulate without reset"
    );
}
#[tokio::test]
async fn test_should_checkpoint_interval_logic() {
    // Item-count-only config (no time-based interval): below the threshold
    // is a no, at or above the threshold is a yes.
    let config = CheckpointConfig {
        interval_items: Some(10),
        interval_duration: None,
        ..Default::default()
    };
    let cases = [
        (5, false, "Should not checkpoint with only 5 items processed"),
        (10, true, "Should checkpoint with 10 items processed"),
        (15, true, "Should checkpoint with 15 items processed"),
        (10, true, "Should checkpoint exactly at threshold"),
    ];
    for (items_processed, expected, msg) in cases {
        assert_eq!(
            should_checkpoint_based_on_items(items_processed, &config),
            expected,
            "{}",
            msg
        );
    }
}
#[tokio::test]
async fn test_items_counter_reset_after_checkpoint() {
    // Drive the counter through accumulate -> reset -> re-accumulate.
    let items_since_checkpoint = Arc::new(RwLock::new(0));
    *items_since_checkpoint.write().await = 15;
    assert_eq!(*items_since_checkpoint.read().await, 15);
    // Simulate the post-checkpoint reset.
    *items_since_checkpoint.write().await = 0;
    assert_eq!(
        *items_since_checkpoint.read().await,
        0,
        "Counter should be reset to 0"
    );
    // Fresh accumulation starts again from zero.
    *items_since_checkpoint.write().await = 5;
    assert_eq!(
        *items_since_checkpoint.read().await,
        5,
        "Counter should accumulate from 0"
    );
}
#[tokio::test]
async fn test_empty_work_items() {
    // Mapping an empty JSON list produces an empty WorkItem list.
    let source: Vec<serde_json::Value> = Vec::new();
    let work_items: Vec<WorkItem> = source
        .into_iter()
        .enumerate()
        .map(|(i, data)| WorkItem {
            id: format!("item_{}", i),
            data,
        })
        .collect();
    assert!(work_items.is_empty(), "Should handle empty work items");
    let total_items = work_items.len();
    assert_eq!(total_items, 0);
}
#[tokio::test]
async fn test_checkpoint_state_defensive_none_handling() {
    // A store that was never populated must read back as None without
    // panicking. (The original followed the assert with an unreachable
    // `if checkpoint.is_some() { panic!(...) }` — dead code removed.)
    let current_checkpoint: Arc<RwLock<Option<Checkpoint>>> = Arc::new(RwLock::new(None));
    let checkpoint = current_checkpoint.read().await;
    assert!(
        checkpoint.is_none(),
        "Should handle None checkpoint gracefully"
    );
}
#[tokio::test]
async fn test_get_next_batch_returns_none_when_empty() {
let current_checkpoint: Arc<RwLock<Option<Checkpoint>>> =
Arc::new(RwLock::new(Some(Checkpoint {
metadata: crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
checkpoint_id: String::new(),
job_id: "test".to_string(),
version: 1,
created_at: Utc::now(),
phase: PhaseType::Map,
total_work_items: 0,
completed_items: 0,
checkpoint_reason: CheckpointReason::Manual,
integrity_hash: String::new(),
},
execution_state: crate::cook::execution::mapreduce::checkpoint::ExecutionState {
current_phase: PhaseType::Map,
phase_start_time: Utc::now(),
setup_results: None,
map_results: None,
reduce_results: None,
workflow_variables: std::collections::HashMap::new(),
},
work_item_state: WorkItemState {
pending_items: vec![], in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
},
agent_state: crate::cook::execution::mapreduce::checkpoint::AgentState {
active_agents: std::collections::HashMap::new(),
agent_assignments: std::collections::HashMap::new(),
agent_results: std::collections::HashMap::new(),
resource_allocation: std::collections::HashMap::new(),
},
variable_state: crate::cook::execution::mapreduce::checkpoint::VariableState {
workflow_variables: std::collections::HashMap::new(),
captured_outputs: std::collections::HashMap::new(),
environment_variables: std::collections::HashMap::new(),
item_variables: std::collections::HashMap::new(),
},
resource_state: crate::cook::execution::mapreduce::checkpoint::ResourceState {
total_agents_allowed: 10,
current_agents_active: 0,
worktrees_created: vec![],
worktrees_cleaned: vec![],
disk_usage_bytes: None,
},
error_state: crate::cook::execution::mapreduce::checkpoint::ErrorState {
error_count: 0,
dlq_items: vec![],
error_threshold_reached: false,
last_error: None,
},
})));
let checkpoint = current_checkpoint.read().await;
let batch = if let Some(ref cp) = *checkpoint {
if cp.work_item_state.pending_items.is_empty() {
None
} else {
Some(cp.work_item_state.pending_items.clone())
}
} else {
None
};
assert!(batch.is_none(), "Should return None when no items remain");
}
#[tokio::test]
async fn test_checkpoint_interval_edge_cases() {
    // Each case is (interval_items, processed, expected, message), probing
    // the 0 / 1 / None boundary values of the interval configuration.
    let cases = [
        (
            Some(0),
            0,
            true,
            "Should checkpoint immediately with interval_items = 0",
        ),
        (Some(1), 1, true, "Should checkpoint after 1 item"),
        (Some(1), 0, false, "Should not checkpoint with 0 items"),
        (None, 10, true, "Should use default threshold of 10"),
    ];
    for (interval_items, processed, expected, msg) in cases {
        let config = CheckpointConfig {
            interval_items,
            ..Default::default()
        };
        assert_eq!(
            should_checkpoint_based_on_items(processed, &config),
            expected,
            "{}",
            msg
        );
    }
}
#[tokio::test]
async fn test_large_batch_processing() {
    // 1000 items drained 50 at a time should take exactly 20 batches.
    let total_items = 1000;
    let max_parallel = 50;
    let mut pending_items: Vec<WorkItem> = (0..total_items)
        .map(|i| WorkItem {
            id: format!("item_{}", i),
            data: serde_json::json!({"index": i}),
        })
        .collect();
    let mut batch_count = 0;
    let mut total_processed = 0;
    while !pending_items.is_empty() {
        let take = pending_items.len().min(max_parallel);
        let _batch: Vec<WorkItem> = pending_items.drain(..take).collect();
        total_processed += take;
        batch_count += 1;
    }
    assert_eq!(total_processed, total_items, "Should process all items");
    assert_eq!(batch_count, 20, "Should require 20 batches (50 items each)");
}
#[tokio::test]
async fn test_checkpoint_phase_transition_to_map() {
let mut checkpoint = Checkpoint {
metadata: crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
checkpoint_id: String::new(),
job_id: "test".to_string(),
version: 1,
created_at: Utc::now(),
phase: PhaseType::Setup,
total_work_items: 0,
completed_items: 0,
checkpoint_reason: CheckpointReason::Manual,
integrity_hash: String::new(),
},
execution_state: crate::cook::execution::mapreduce::checkpoint::ExecutionState {
current_phase: PhaseType::Setup,
phase_start_time: Utc::now(),
setup_results: None,
map_results: None,
reduce_results: None,
workflow_variables: std::collections::HashMap::new(),
},
work_item_state: WorkItemState {
pending_items: vec![],
in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
},
agent_state: crate::cook::execution::mapreduce::checkpoint::AgentState {
active_agents: std::collections::HashMap::new(),
agent_assignments: std::collections::HashMap::new(),
agent_results: std::collections::HashMap::new(),
resource_allocation: std::collections::HashMap::new(),
},
variable_state: crate::cook::execution::mapreduce::checkpoint::VariableState {
workflow_variables: std::collections::HashMap::new(),
captured_outputs: std::collections::HashMap::new(),
environment_variables: std::collections::HashMap::new(),
item_variables: std::collections::HashMap::new(),
},
resource_state: crate::cook::execution::mapreduce::checkpoint::ResourceState {
total_agents_allowed: 10,
current_agents_active: 0,
worktrees_created: vec![],
worktrees_cleaned: vec![],
disk_usage_bytes: None,
},
error_state: crate::cook::execution::mapreduce::checkpoint::ErrorState {
error_count: 0,
dlq_items: vec![],
error_threshold_reached: false,
last_error: None,
},
};
update_checkpoint_to_map_phase(&mut checkpoint);
assert_eq!(
checkpoint.metadata.phase,
PhaseType::Map,
"Metadata phase should be updated to Map"
);
assert_eq!(
checkpoint.execution_state.current_phase,
PhaseType::Map,
"Execution state phase should be updated to Map"
);
}
#[tokio::test]
async fn test_work_items_enumeration_and_checkpoint_update() {
let work_items_data = vec![
serde_json::json!({"id": 1, "data": "test1"}),
serde_json::json!({"id": 2, "data": "test2"}),
serde_json::json!({"id": 3, "data": "test3"}),
];
let total_items = work_items_data.len();
let work_items = create_work_items(work_items_data);
assert_eq!(work_items.len(), 3);
assert_eq!(work_items[0].id, "item_0");
assert_eq!(work_items[1].id, "item_1");
assert_eq!(work_items[2].id, "item_2");
let current_checkpoint: Arc<RwLock<Option<Checkpoint>>> =
Arc::new(RwLock::new(Some(Checkpoint {
metadata: crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
checkpoint_id: String::new(),
job_id: "test".to_string(),
version: 1,
created_at: Utc::now(),
phase: PhaseType::Map,
total_work_items: 0, completed_items: 0,
checkpoint_reason: CheckpointReason::Manual,
integrity_hash: String::new(),
},
execution_state: crate::cook::execution::mapreduce::checkpoint::ExecutionState {
current_phase: PhaseType::Map,
phase_start_time: Utc::now(),
setup_results: None,
map_results: None,
reduce_results: None,
workflow_variables: std::collections::HashMap::new(),
},
work_item_state: WorkItemState {
pending_items: vec![],
in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
},
agent_state: crate::cook::execution::mapreduce::checkpoint::AgentState {
active_agents: std::collections::HashMap::new(),
agent_assignments: std::collections::HashMap::new(),
agent_results: std::collections::HashMap::new(),
resource_allocation: std::collections::HashMap::new(),
},
variable_state: crate::cook::execution::mapreduce::checkpoint::VariableState {
workflow_variables: std::collections::HashMap::new(),
captured_outputs: std::collections::HashMap::new(),
environment_variables: std::collections::HashMap::new(),
item_variables: std::collections::HashMap::new(),
},
resource_state: crate::cook::execution::mapreduce::checkpoint::ResourceState {
total_agents_allowed: 10,
current_agents_active: 0,
worktrees_created: vec![],
worktrees_cleaned: vec![],
disk_usage_bytes: None,
},
error_state: crate::cook::execution::mapreduce::checkpoint::ErrorState {
error_count: 0,
dlq_items: vec![],
error_threshold_reached: false,
last_error: None,
},
})));
{
let mut checkpoint = current_checkpoint.write().await;
if let Some(ref mut cp) = *checkpoint {
cp.metadata.total_work_items = total_items;
cp.work_item_state.pending_items = work_items.clone();
}
}
let checkpoint = current_checkpoint.read().await;
if let Some(ref cp) = *checkpoint {
assert_eq!(cp.metadata.total_work_items, 3);
assert_eq!(cp.work_item_state.pending_items.len(), 3);
assert_eq!(cp.work_item_state.pending_items[0].id, "item_0");
}
}
#[tokio::test]
async fn test_empty_work_items_handling() {
let work_items_data: Vec<serde_json::Value> = vec![];
let total_items = work_items_data.len();
let work_items = create_work_items(work_items_data);
assert_eq!(work_items.len(), 0, "Should handle empty work items");
assert_eq!(total_items, 0, "Total items should be 0");
let current_checkpoint: Arc<RwLock<Option<Checkpoint>>> =
Arc::new(RwLock::new(Some(Checkpoint {
metadata: crate::cook::execution::mapreduce::checkpoint::CheckpointMetadata {
checkpoint_id: String::new(),
job_id: "test".to_string(),
version: 1,
created_at: Utc::now(),
phase: PhaseType::Map,
total_work_items: 0,
completed_items: 0,
checkpoint_reason: CheckpointReason::Manual,
integrity_hash: String::new(),
},
execution_state: crate::cook::execution::mapreduce::checkpoint::ExecutionState {
current_phase: PhaseType::Map,
phase_start_time: Utc::now(),
setup_results: None,
map_results: None,
reduce_results: None,
workflow_variables: std::collections::HashMap::new(),
},
work_item_state: WorkItemState {
pending_items: vec![],
in_progress_items: std::collections::HashMap::new(),
completed_items: vec![],
failed_items: vec![],
current_batch: None,
},
agent_state: crate::cook::execution::mapreduce::checkpoint::AgentState {
active_agents: std::collections::HashMap::new(),
agent_assignments: std::collections::HashMap::new(),
agent_results: std::collections::HashMap::new(),
resource_allocation: std::collections::HashMap::new(),
},
variable_state: crate::cook::execution::mapreduce::checkpoint::VariableState {
workflow_variables: std::collections::HashMap::new(),
captured_outputs: std::collections::HashMap::new(),
environment_variables: std::collections::HashMap::new(),
item_variables: std::collections::HashMap::new(),
},
resource_state: crate::cook::execution::mapreduce::checkpoint::ResourceState {
total_agents_allowed: 10,
current_agents_active: 0,
worktrees_created: vec![],
worktrees_cleaned: vec![],
disk_usage_bytes: None,
},
error_state: crate::cook::execution::mapreduce::checkpoint::ErrorState {
error_count: 0,
dlq_items: vec![],
error_threshold_reached: false,
last_error: None,
},
})));
{
let mut checkpoint = current_checkpoint.write().await;
if let Some(ref mut cp) = *checkpoint {
cp.metadata.total_work_items = total_items;
cp.work_item_state.pending_items = work_items;
}
}
let checkpoint = current_checkpoint.read().await;
if let Some(ref cp) = *checkpoint {
assert_eq!(cp.metadata.total_work_items, 0);
assert_eq!(cp.work_item_state.pending_items.len(), 0);
}
}
}