use crate::cook::execution::mapreduce::checkpoint::{
CheckpointMetadata, CheckpointReason, MapReduceCheckpoint, PhaseType, WorkItemState,
};
use chrono::Utc;
use sha2::{Digest, Sha256};
pub fn prepare_checkpoint(
state: &MapReduceCheckpoint,
reason: CheckpointReason,
) -> MapReduceCheckpoint {
let mut checkpoint = state.clone();
checkpoint.metadata.checkpoint_id = format!("cp-{}", uuid::Uuid::new_v4());
checkpoint.metadata.created_at = Utc::now();
checkpoint.metadata.checkpoint_reason = reason;
reset_in_progress_items(&mut checkpoint.work_item_state);
checkpoint.metadata.completed_items = checkpoint.work_item_state.completed_items.len();
checkpoint.metadata.integrity_hash = calculate_integrity_hash(&checkpoint);
checkpoint
}
/// Moves every in-progress work item back onto the pending queue.
///
/// After this call `in_progress_items` is empty. Each item it held has been
/// appended to the end of `pending_items`, in ascending order of its
/// `in_progress_items` key. The sort makes the resulting queue independent of
/// `HashMap` iteration order.
pub fn reset_in_progress_items(work_item_state: &mut WorkItemState) {
    let mut requeued: Vec<_> = work_item_state.in_progress_items.drain().collect();
    // `HashMap` iteration order depends on a randomly seeded hasher. Sorting by
    // key means the same state always re-queues its items in the same order.
    // Keys are unique, so an unstable sort is sufficient.
    requeued.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
    work_item_state
        .pending_items
        .extend(requeued.into_iter().map(|(_, progress)| progress.work_item));
}
/// Computes a lowercase hex SHA-256 digest summarising the checkpoint.
///
/// The digest covers, in this order: the job id, the version, the phase (in its
/// `Debug` form), the metadata's total and completed item counts, and the
/// lengths of the completed, failed and pending work-item lists. Item contents,
/// in-progress items, the other state sections and `integrity_hash` itself are
/// not hashed.
///
/// NOTE(review): the fields are fed to the hasher back-to-back with no
/// delimiter, so distinct inputs can yield the same byte stream (e.g. job id
/// `"a1"` with version `2` vs job id `"a"` with version `12`). Adding separators
/// would change every digest and break verification of any checkpoint hashed
/// with the current scheme, so the layout is intentionally left unchanged.
pub fn calculate_integrity_hash(checkpoint: &MapReduceCheckpoint) -> String {
    let meta = &checkpoint.metadata;
    let items = &checkpoint.work_item_state;

    let mut digest = Sha256::new();
    digest.update(meta.job_id.as_bytes());

    // Every remaining input is hashed through its textual representation.
    let textual_fields = [
        meta.version.to_string(),
        format!("{:?}", meta.phase),
        meta.total_work_items.to_string(),
        meta.completed_items.to_string(),
        items.completed_items.len().to_string(),
        items.failed_items.len().to_string(),
        items.pending_items.len().to_string(),
    ];
    for field in &textual_fields {
        digest.update(field.as_bytes());
    }

    format!("{:x}", digest.finalize())
}
/// Returns `true` when `checkpoint.metadata.integrity_hash` equals a freshly
/// recomputed [`calculate_integrity_hash`] of the same checkpoint.
///
/// A checkpoint whose hash was never filled in (empty string) never verifies,
/// because the recomputed digest is always a non-empty hex string.
pub fn verify_integrity(checkpoint: &MapReduceCheckpoint) -> bool {
    checkpoint.metadata.integrity_hash == calculate_integrity_hash(checkpoint)
}
/// Builds a brand-new checkpoint for `job_id` with every state collection empty.
///
/// # Arguments
/// * `job_id` - identifier of the job; copied into `metadata.job_id`.
/// * `total_items` - recorded as `metadata.total_work_items`.
/// * `phase` - used for both `metadata.phase` and `execution_state.current_phase`.
///
/// The checkpoint starts at version 1 with reason `PhaseTransition` and zero
/// completed items. No work items are queued, in progress, completed or
/// failed. `created_at` and `phase_start_time` are the same instant.
/// `integrity_hash` is left empty, so [`verify_integrity`] returns `false`
/// until a hash is computed (e.g. by [`prepare_checkpoint`]).
pub fn create_initial_checkpoint(
    job_id: &str,
    total_items: usize,
    phase: PhaseType,
) -> MapReduceCheckpoint {
    use crate::cook::execution::mapreduce::checkpoint::{
        AgentState, ErrorState, ExecutionState, ResourceState, VariableState,
    };
    use std::collections::HashMap;

    // Read the clock once so the creation time and phase start time agree
    // exactly, instead of differing by however long construction took.
    let now = Utc::now();

    MapReduceCheckpoint {
        metadata: CheckpointMetadata {
            checkpoint_id: format!("cp-{}", uuid::Uuid::new_v4()),
            job_id: job_id.to_string(),
            version: 1,
            created_at: now,
            phase,
            total_work_items: total_items,
            completed_items: 0,
            checkpoint_reason: CheckpointReason::PhaseTransition,
            // Filled in later; see the doc comment above.
            integrity_hash: String::new(),
        },
        execution_state: ExecutionState {
            current_phase: phase,
            phase_start_time: now,
            setup_results: None,
            map_results: None,
            reduce_results: None,
            workflow_variables: HashMap::new(),
        },
        work_item_state: WorkItemState {
            pending_items: Vec::new(),
            in_progress_items: HashMap::new(),
            completed_items: Vec::new(),
            failed_items: Vec::new(),
            current_batch: None,
        },
        agent_state: AgentState {
            active_agents: HashMap::new(),
            agent_assignments: HashMap::new(),
            agent_results: HashMap::new(),
            resource_allocation: HashMap::new(),
        },
        variable_state: VariableState {
            workflow_variables: HashMap::new(),
            captured_outputs: HashMap::new(),
            environment_variables: HashMap::new(),
            item_variables: HashMap::new(),
        },
        resource_state: ResourceState {
            total_agents_allowed: 0,
            current_agents_active: 0,
            worktrees_created: Vec::new(),
            worktrees_cleaned: Vec::new(),
            disk_usage_bytes: None,
        },
        error_state: ErrorState {
            error_count: 0,
            dlq_items: Vec::new(),
            error_threshold_reached: false,
            last_error: None,
        },
    }
}
/// Re-syncs `metadata.completed_items` with the number of entries currently in
/// `work_item_state.completed_items`.
pub fn update_completed_count(checkpoint: &mut MapReduceCheckpoint) {
    let completed = checkpoint.work_item_state.completed_items.len();
    checkpoint.metadata.completed_items = completed;
}
/// Switches the checkpoint to `phase` and stamps the phase start time with the
/// current UTC time.
///
/// Both `metadata.phase` and `execution_state.current_phase` are set, so the
/// two stay in agreement.
pub fn update_phase(checkpoint: &mut MapReduceCheckpoint, phase: PhaseType) {
    let started = Utc::now();
    let exec = &mut checkpoint.execution_state;
    exec.current_phase = phase;
    exec.phase_start_time = started;
    checkpoint.metadata.phase = phase;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cook::execution::mapreduce::agent::{AgentResult, AgentStatus};
    use crate::cook::execution::mapreduce::checkpoint::{
        CompletedWorkItem, WorkItem, WorkItemProgress,
    };
    use serde_json::json;
    use std::time::Duration;

    /// Fresh Map-phase checkpoint with 10 total items and empty queues.
    fn create_test_checkpoint(job_id: &str) -> MapReduceCheckpoint {
        create_initial_checkpoint(job_id, 10, PhaseType::Map)
    }

    /// Minimal work item with an empty JSON payload.
    fn work_item(id: &str) -> WorkItem {
        WorkItem {
            id: id.to_string(),
            data: json!({}),
        }
    }

    /// In-progress record for `id`, assigned to `agent-1`.
    fn in_progress(id: &str) -> WorkItemProgress {
        WorkItemProgress {
            work_item: work_item(id),
            agent_id: "agent-1".to_string(),
            started_at: Utc::now(),
            last_update: Utc::now(),
        }
    }

    /// Successful completion record for `id`.
    fn completed(id: &str) -> CompletedWorkItem {
        CompletedWorkItem {
            work_item: work_item(id),
            result: AgentResult {
                item_id: id.to_string(),
                status: AgentStatus::Success,
                output: None,
                commits: vec![],
                duration: Duration::from_secs(1),
                error: None,
                worktree_path: None,
                branch_name: None,
                worktree_session_id: None,
                files_modified: vec![],
                json_log_location: None,
                cleanup_status: None,
            },
            completed_at: Utc::now(),
        }
    }

    #[test]
    fn test_prepare_checkpoint_generates_new_id() {
        let checkpoint = create_test_checkpoint("test-job");
        let original_id = checkpoint.metadata.checkpoint_id.clone();
        let prepared = prepare_checkpoint(&checkpoint, CheckpointReason::Interval);
        assert_ne!(prepared.metadata.checkpoint_id, original_id);
        assert!(prepared.metadata.checkpoint_id.starts_with("cp-"));
    }

    #[test]
    fn test_prepare_checkpoint_sets_reason() {
        let checkpoint = create_test_checkpoint("test-job");
        let prepared = prepare_checkpoint(&checkpoint, CheckpointReason::BeforeShutdown);
        assert!(matches!(
            prepared.metadata.checkpoint_reason,
            CheckpointReason::BeforeShutdown
        ));
    }

    #[test]
    fn test_prepare_checkpoint_resets_in_progress() {
        let mut checkpoint = create_test_checkpoint("test-job");
        checkpoint
            .work_item_state
            .in_progress_items
            .insert("item-1".to_string(), in_progress("item-1"));
        let prepared = prepare_checkpoint(&checkpoint, CheckpointReason::Interval);
        assert!(prepared.work_item_state.in_progress_items.is_empty());
        assert_eq!(prepared.work_item_state.pending_items.len(), 1);
        assert_eq!(prepared.work_item_state.pending_items[0].id, "item-1");
    }

    #[test]
    fn test_prepare_checkpoint_recomputes_completed_count() {
        let mut checkpoint = create_test_checkpoint("test-job");
        checkpoint
            .work_item_state
            .completed_items
            .push(completed("item-1"));
        // The metadata count is stale until the checkpoint is prepared.
        assert_eq!(checkpoint.metadata.completed_items, 0);
        let prepared = prepare_checkpoint(&checkpoint, CheckpointReason::Interval);
        assert_eq!(prepared.metadata.completed_items, 1);
    }

    #[test]
    fn test_prepare_checkpoint_produces_verifiable_hash() {
        let checkpoint = create_test_checkpoint("test-job");
        let prepared = prepare_checkpoint(&checkpoint, CheckpointReason::Interval);
        assert!(verify_integrity(&prepared));
    }

    #[test]
    fn test_reset_in_progress_items_requeues_all() {
        let mut checkpoint = create_test_checkpoint("test-job");
        for &id in &["item-b", "item-a"] {
            checkpoint
                .work_item_state
                .in_progress_items
                .insert(id.to_string(), in_progress(id));
        }
        reset_in_progress_items(&mut checkpoint.work_item_state);
        assert!(checkpoint.work_item_state.in_progress_items.is_empty());
        // Compare as a sorted list so this test does not depend on re-queue order.
        let mut ids: Vec<&str> = checkpoint
            .work_item_state
            .pending_items
            .iter()
            .map(|w| w.id.as_str())
            .collect();
        ids.sort_unstable();
        assert_eq!(ids, vec!["item-a", "item-b"]);
    }

    #[test]
    fn test_integrity_hash_is_deterministic() {
        let checkpoint = create_test_checkpoint("test-job");
        let hash1 = calculate_integrity_hash(&checkpoint);
        let hash2 = calculate_integrity_hash(&checkpoint);
        assert_eq!(hash1, hash2);
    }

    #[test]
    fn test_integrity_hash_changes_on_state_change() {
        let mut checkpoint = create_test_checkpoint("test-job");
        let hash1 = calculate_integrity_hash(&checkpoint);
        checkpoint.metadata.completed_items += 1;
        let hash2 = calculate_integrity_hash(&checkpoint);
        assert_ne!(hash1, hash2);
    }

    #[test]
    fn test_verify_integrity_valid() {
        let mut checkpoint = create_test_checkpoint("test-job");
        checkpoint.metadata.integrity_hash = calculate_integrity_hash(&checkpoint);
        assert!(verify_integrity(&checkpoint));
    }

    #[test]
    fn test_verify_integrity_invalid() {
        let mut checkpoint = create_test_checkpoint("test-job");
        checkpoint.metadata.integrity_hash = "invalid-hash".to_string();
        assert!(!verify_integrity(&checkpoint));
    }

    #[test]
    fn test_verify_integrity_detects_tampering() {
        let checkpoint = create_test_checkpoint("test-job");
        let mut prepared = prepare_checkpoint(&checkpoint, CheckpointReason::Interval);
        // Changing the pending-item count after hashing must invalidate the hash.
        prepared.work_item_state.pending_items.push(work_item("late"));
        assert!(!verify_integrity(&prepared));
    }

    #[test]
    fn test_update_completed_count() {
        let mut checkpoint = create_test_checkpoint("test-job");
        checkpoint
            .work_item_state
            .completed_items
            .push(completed("item-1"));
        update_completed_count(&mut checkpoint);
        assert_eq!(checkpoint.metadata.completed_items, 1);
    }

    #[test]
    fn test_update_phase() {
        let mut checkpoint = create_test_checkpoint("test-job");
        update_phase(&mut checkpoint, PhaseType::Reduce);
        assert_eq!(checkpoint.metadata.phase, PhaseType::Reduce);
        assert_eq!(checkpoint.execution_state.current_phase, PhaseType::Reduce);
    }
}