#[cfg(test)]
use crate::cook::execution::mapreduce::AgentStatus;
use crate::cook::execution::mapreduce::{AgentResult, MapReduceConfig};
use crate::cook::execution::state_pure;
use crate::cook::workflow::WorkflowStep;
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
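/// Number of checkpoint files retained per job; older versions are pruned
/// after every successful save.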
const MAX_CHECKPOINTS: usize = 3;
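/// Soft time budget for a checkpoint write, in milliseconds; slower saves
/// are logged as warnings rather than failed.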
const CHECKPOINT_TIMEOUT_MS: u64 = 100;
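/// Progress of the reduce phase, including which commands have already
/// executed, persisted in checkpoints so a resumed job can pick up where
/// reduce left off.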
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReducePhaseState {
pub started: bool,
pub completed: bool,
pub executed_commands: Vec<String>,
pub output: Option<String>,
pub error: Option<String>,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
}
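/// Location and identity of the git worktree an agent ran in, kept with
/// failure records so failed runs can be inspected or cleaned up.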
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorktreeInfo {
pub path: PathBuf,
pub name: String,
pub branch: Option<String>,
pub session_id: Option<String>,
}
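/// Record of a failed work item: attempt count, the most recent error, and
/// the worktree the failure happened in.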
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureRecord {
pub item_id: String,
pub attempts: u32,
pub last_error: String,
pub last_attempt: DateTime<Utc>,
pub worktree_info: Option<WorktreeInfo>,
}
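/// Complete, serializable snapshot of a MapReduce job: configuration, work
/// items, per-agent results, failure records, and reduce-phase progress.
/// Persisting this struct as a checkpoint is what makes a job resumable.
///
/// A minimal sketch of the intended flow (variable values are illustrative,
/// not taken from a real job):
///
/// ```ignore
/// let mut state = MapReduceJobState::new(job_id, config, work_items);
/// state.agent_template = agent_template;
/// // Fold in each agent's result as it finishes...
/// state.update_agent_result(result);
/// // ...and move on once every item has succeeded or failed.
/// if state.is_map_phase_complete() {
///     state.start_reduce_phase();
/// }
/// ```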
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MapReduceJobState {
pub job_id: String,
pub config: MapReduceConfig,
pub started_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub work_items: Vec<Value>,
pub agent_results: HashMap<String, AgentResult>,
pub completed_agents: HashSet<String>,
pub failed_agents: HashMap<String, FailureRecord>,
pub pending_items: Vec<String>,
pub checkpoint_version: u32,
#[serde(default = "default_format_version")]
pub checkpoint_format_version: u32,
pub parent_worktree: Option<String>,
pub reduce_phase_state: Option<ReducePhaseState>,
pub total_items: usize,
pub successful_count: usize,
pub failed_count: usize,
pub is_complete: bool,
pub agent_template: Vec<WorkflowStep>,
pub reduce_commands: Option<Vec<WorkflowStep>>,
#[serde(default)]
pub variables: HashMap<String, Value>,
#[serde(default)]
pub setup_output: Option<String>,
#[serde(default)]
pub setup_completed: bool,
#[serde(default)]
pub item_retry_counts: HashMap<String, u32>,
}
fn default_format_version() -> u32 {
1
}
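/// Converts to the structurally identical `state_pure` representation by
/// round-tripping through JSON; `from_pure_state` is the inverse. This
/// avoids hand-maintained field mapping but assumes the two types stay
/// serde-compatible, which is the invariant the `expect`s encode.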
fn to_pure_state(state: &MapReduceJobState) -> state_pure::MapReduceJobState {
let json = serde_json::to_string(state).expect("Failed to serialize state");
serde_json::from_str(&json).expect("Failed to deserialize to pure state")
}
fn from_pure_state(state: state_pure::MapReduceJobState) -> MapReduceJobState {
let json = serde_json::to_string(&state).expect("Failed to serialize pure state");
serde_json::from_str(&json).expect("Failed to deserialize from pure state")
}
fn serialize_state(state: &MapReduceJobState) -> Result<String> {
serde_json::to_string_pretty(state).context("Failed to serialize job state")
}
fn create_checkpoint_metadata(path: PathBuf, version: u32, size_bytes: usize) -> CheckpointInfo {
CheckpointInfo {
path,
version,
created_at: Utc::now(),
size_bytes: size_bytes as u64,
}
}
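/// Writes `data` to `temp_path`, syncs it to disk, then renames it over
/// `final_path`. Because the rename replaces the target in a single step on
/// typical filesystems, readers never observe a partially written file.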
async fn write_file_atomically(temp_path: &Path, final_path: &Path, data: &[u8]) -> Result<()> {
use tokio::io::AsyncWriteExt;
let mut file = fs::File::create(temp_path)
.await
.context("Failed to create temporary file")?;
file.write_all(data).await.context("Failed to write data")?;
file.sync_data()
.await
.context("Failed to sync data to disk")?;
drop(file);
fs::rename(temp_path, final_path)
.await
.context("Failed to rename temporary file")
}
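/// Extracts the version from a checkpoint file name, e.g.
/// `checkpoint-v3.json` yields `Some(3)`; any other name yields `None`.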
fn parse_checkpoint_version(path: &Path) -> Option<u32> {
let name = path.file_name()?.to_str()?;
if !is_checkpoint_file(name) {
return None;
}
extract_version_number(name)
}
fn is_checkpoint_file(name: &str) -> bool {
name.starts_with("checkpoint-v") && name.ends_with(".json")
}
fn extract_version_number(name: &str) -> Option<u32> {
name.strip_prefix("checkpoint-v")
.and_then(|s| s.strip_suffix(".json"))
.and_then(|s| s.parse::<u32>().ok())
}
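/// Sorts checkpoints newest first (descending by version).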
fn sort_checkpoints_by_version(checkpoints: &mut [CheckpointInfo]) {
checkpoints.sort_by(|a, b| b.version.cmp(&a.version));
}
impl MapReduceJobState {
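/// Creates a fresh job state: every work item starts pending under the id
/// `item_{index}`, and all progress counters start at zero.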
pub fn new(job_id: String, config: MapReduceConfig, work_items: Vec<Value>) -> Self {
let total_items = work_items.len();
let pending_items: Vec<String> = work_items
.iter()
.enumerate()
.map(|(i, _)| format!("item_{}", i))
.collect();
Self {
job_id,
config,
started_at: Utc::now(),
updated_at: Utc::now(),
work_items,
agent_results: HashMap::new(),
completed_agents: HashSet::new(),
failed_agents: HashMap::new(),
pending_items,
checkpoint_version: 0,
checkpoint_format_version: 1,
parent_worktree: None,
reduce_phase_state: None,
total_items,
successful_count: 0,
failed_count: 0,
is_complete: false,
agent_template: vec![],
reduce_commands: None,
variables: HashMap::new(),
setup_output: None,
setup_completed: false,
item_retry_counts: HashMap::new(),
}
}
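/// Folds one agent's result into the state. This and the following
/// transition methods delegate to the pure functions in `state_pure`,
/// converting through `to_pure_state`/`from_pure_state` at the boundary.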
pub fn update_agent_result(&mut self, result: AgentResult) {
let pure_state = to_pure_state(self);
let new_pure_state = state_pure::apply_agent_result(pure_state, result);
*self = from_pure_state(new_pure_state);
}
pub fn is_map_phase_complete(&self) -> bool {
let pure_state = to_pure_state(self);
state_pure::is_map_phase_complete(&pure_state)
}
pub fn get_retriable_items(&self, max_retries: u32) -> Vec<String> {
let pure_state = to_pure_state(self);
state_pure::get_retriable_items(&pure_state, max_retries)
}
pub fn start_reduce_phase(&mut self) {
let pure_state = to_pure_state(self);
let new_pure_state = state_pure::start_reduce_phase(pure_state);
*self = from_pure_state(new_pure_state);
}
pub fn complete_reduce_phase(&mut self, output: Option<String>) {
let pure_state = to_pure_state(self);
let new_pure_state = state_pure::complete_reduce_phase(pure_state, output);
*self = from_pure_state(new_pure_state);
}
pub fn mark_complete(&mut self) {
let pure_state = to_pure_state(self);
let new_pure_state = state_pure::mark_complete(pure_state);
*self = from_pure_state(new_pure_state);
}
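/// Resolves an `item_{index}` id back to its work item; `"item_1"` returns
/// a clone of `work_items[1]`, while malformed or out-of-range ids return
/// `None`.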
pub fn find_work_item(&self, item_id: &str) -> Option<Value> {
if let Some(idx) = item_id
.strip_prefix("item_")
.and_then(|s| s.parse::<usize>().ok())
{
if idx < self.work_items.len() {
return Some(self.work_items[idx].clone());
}
}
None
}
}
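/// Metadata describing a single checkpoint file on disk.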
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointInfo {
pub path: PathBuf,
pub version: u32,
pub created_at: DateTime<Utc>,
pub size_bytes: u64,
}
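/// Writes and reads versioned checkpoints under
/// `<base_dir>/jobs/<job_id>/checkpoint-v{N}.json`, with `metadata.json`
/// pointing at the latest version. Saves are serialized through an internal
/// write lock and performed atomically (temp file, then rename).
///
/// An illustrative sketch, assuming an async context (the base directory is
/// made up):
///
/// ```ignore
/// let manager = CheckpointManager::new(PathBuf::from("/tmp/prodigy-state"));
/// manager.save_checkpoint(&state).await?;
/// let restored = manager.load_checkpoint(&state.job_id).await?;
/// assert_eq!(restored.checkpoint_version, state.checkpoint_version);
/// ```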
pub struct CheckpointManager {
base_dir: PathBuf,
write_lock: RwLock<()>,
}
impl CheckpointManager {
pub fn new(base_dir: PathBuf) -> Self {
Self {
base_dir,
write_lock: RwLock::new(()),
}
}
fn job_dir(&self, job_id: &str) -> PathBuf {
self.base_dir.join("jobs").join(job_id)
}
pub fn jobs_dir(&self) -> PathBuf {
self.base_dir.join("jobs")
}
fn checkpoint_path(&self, job_id: &str, version: u32) -> PathBuf {
self.job_dir(job_id)
.join(format!("checkpoint-v{}.json", version))
}
fn metadata_path(&self, job_id: &str) -> PathBuf {
self.job_dir(job_id).join("metadata.json")
}
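/// Persists `state` under its current `checkpoint_version`, points
/// `metadata.json` at the new file, and prunes versions beyond
/// `MAX_CHECKPOINTS`.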
pub async fn save_checkpoint(&self, state: &MapReduceJobState) -> Result<()> {
let _lock = self.write_lock.write().await;
let start = std::time::Instant::now();
self.ensure_job_directory(&state.job_id).await?;
let json = serialize_state(state)?;
self.write_checkpoint_file(&state.job_id, state.checkpoint_version, &json)
.await?;
self.write_metadata_file(&state.job_id, state.checkpoint_version, json.len())
.await?;
self.log_checkpoint_timing(&state.job_id, state.checkpoint_version, start.elapsed());
self.cleanup_old_checkpoints(&state.job_id, MAX_CHECKPOINTS)
.await?;
Ok(())
}
async fn ensure_job_directory(&self, job_id: &str) -> Result<()> {
let job_dir = self.job_dir(job_id);
fs::create_dir_all(&job_dir)
.await
.context("Failed to create job directory")
}
async fn write_checkpoint_file(&self, job_id: &str, version: u32, json: &str) -> Result<()> {
let checkpoint_path = self.checkpoint_path(job_id, version);
let temp_path = checkpoint_path.with_extension("tmp");
write_file_atomically(&temp_path, &checkpoint_path, json.as_bytes())
.await
.context("Failed to write checkpoint file")
}
async fn write_metadata_file(
&self,
job_id: &str,
version: u32,
size_bytes: usize,
) -> Result<()> {
let checkpoint_path = self.checkpoint_path(job_id, version);
let metadata = create_checkpoint_metadata(checkpoint_path, version, size_bytes);
let metadata_json = serde_json::to_string_pretty(&metadata)?;
let metadata_path = self.metadata_path(job_id);
let temp_path = metadata_path.with_extension("tmp");
write_file_atomically(&temp_path, &metadata_path, metadata_json.as_bytes())
.await
.context("Failed to write metadata file")
}
fn log_checkpoint_timing(&self, job_id: &str, version: u32, duration: std::time::Duration) {
let duration_ms = duration.as_millis();
if duration_ms > CHECKPOINT_TIMEOUT_MS as u128 {
warn!(
"Checkpoint for job {} took {}ms (exceeds {}ms limit)",
job_id, duration_ms, CHECKPOINT_TIMEOUT_MS
);
} else {
debug!(
"Saved checkpoint v{} for job {} in {}ms",
version, job_id, duration_ms
);
}
}
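/// Loads the latest checkpoint for `job_id`, as recorded in `metadata.json`.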
pub async fn load_checkpoint(&self, job_id: &str) -> Result<MapReduceJobState> {
self.load_checkpoint_by_version(job_id, None).await
}
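/// Loads a specific checkpoint version, or the latest when `version` is
/// `None`, migrating older checkpoint formats on the way in.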
pub async fn load_checkpoint_by_version(
&self,
job_id: &str,
version: Option<u32>,
) -> Result<MapReduceJobState> {
let checkpoint_path = self.resolve_checkpoint_path(job_id, version).await?;
let state = self.load_and_migrate_checkpoint(&checkpoint_path).await?;
info!(
"Loaded checkpoint v{} for job {} (format v{})",
state.checkpoint_version, job_id, state.checkpoint_format_version
);
Ok(state)
}
async fn resolve_checkpoint_path(&self, job_id: &str, version: Option<u32>) -> Result<PathBuf> {
match version {
Some(v) => self.get_specific_checkpoint_path(job_id, v),
None => self.get_latest_checkpoint_path(job_id).await,
}
}
fn get_specific_checkpoint_path(&self, job_id: &str, version: u32) -> Result<PathBuf> {
let path = self.checkpoint_path(job_id, version);
if !path.exists() {
return Err(anyhow!(
"Checkpoint version {} not found for job {}",
version,
job_id
));
}
Ok(path)
}
async fn get_latest_checkpoint_path(&self, job_id: &str) -> Result<PathBuf> {
let metadata_path = self.metadata_path(job_id);
if !metadata_path.exists() {
return Err(anyhow!("No checkpoint found for job {}", job_id));
}
let metadata_json = fs::read_to_string(&metadata_path)
.await
.context("Failed to read checkpoint metadata")?;
let metadata: CheckpointInfo =
serde_json::from_str(&metadata_json).context("Failed to parse checkpoint metadata")?;
Ok(metadata.path)
}
async fn load_and_migrate_checkpoint(&self, checkpoint_path: &Path) -> Result<MapReduceJobState> {
let checkpoint_json = fs::read_to_string(checkpoint_path)
.await
.context("Failed to read checkpoint file")?;
let state: MapReduceJobState =
serde_json::from_str(&checkpoint_json).context("Failed to parse checkpoint data")?;
self.migrate_checkpoint(state)
}
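/// Upgrades checkpoints written in older formats. Only format v1 exists
/// today, so migration just stamps the current version; real field
/// migrations would slot in here as the format evolves.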
fn migrate_checkpoint(&self, mut state: MapReduceJobState) -> Result<MapReduceJobState> {
const CURRENT_FORMAT_VERSION: u32 = 1;
if state.checkpoint_format_version >= CURRENT_FORMAT_VERSION {
return Ok(state);
}
debug!(
"Migrating checkpoint from format v{} to v{}",
state.checkpoint_format_version, CURRENT_FORMAT_VERSION
);
state.checkpoint_format_version = CURRENT_FORMAT_VERSION;
Ok(state)
}
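/// Lists every checkpoint on disk for `job_id`, newest version first.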
pub async fn list_checkpoints(&self, job_id: &str) -> Result<Vec<CheckpointInfo>> {
let job_dir = self.job_dir(job_id);
if !job_dir.exists() {
return Ok(Vec::new());
}
let mut checkpoints = self.collect_checkpoint_files(&job_dir).await?;
sort_checkpoints_by_version(&mut checkpoints);
Ok(checkpoints)
}
async fn collect_checkpoint_files(&self, job_dir: &Path) -> Result<Vec<CheckpointInfo>> {
let mut checkpoints = Vec::new();
let mut entries = fs::read_dir(job_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if let Some(checkpoint_info) = self.try_parse_checkpoint_entry(entry).await {
checkpoints.push(checkpoint_info);
}
}
Ok(checkpoints)
}
async fn try_parse_checkpoint_entry(&self, entry: fs::DirEntry) -> Option<CheckpointInfo> {
let path = entry.path();
let version = parse_checkpoint_version(&path)?;
let metadata = fs::metadata(&path).await.ok()?;
// Prefer the file's modification time for created_at; fall back to "now"
// only if the filesystem cannot report it.
let created_at = metadata
.modified()
.ok()
.map(DateTime::<Utc>::from)
.unwrap_or_else(Utc::now);
Some(CheckpointInfo {
path,
version,
created_at,
size_bytes: metadata.len(),
})
}
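/// Removes all but the newest `keep` checkpoints; individual deletion
/// failures are logged and skipped rather than propagated.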
pub async fn cleanup_old_checkpoints(&self, job_id: &str, keep: usize) -> Result<()> {
let checkpoints = self.list_checkpoints(job_id).await?;
if checkpoints.len() <= keep {
return Ok(());
}
for checkpoint in &checkpoints[keep..] {
debug!(
"Removing old checkpoint v{} for job {}",
checkpoint.version, job_id
);
if let Err(e) = fs::remove_file(&checkpoint.path).await {
error!(
"Failed to remove old checkpoint {}: {}",
checkpoint.path.display(),
e
);
}
}
Ok(())
}
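/// Deletes the entire checkpoint directory for a job.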
pub async fn cleanup_job(&self, job_id: &str) -> Result<()> {
let job_dir = self.job_dir(job_id);
if job_dir.exists() {
fs::remove_dir_all(&job_dir)
.await
.context("Failed to remove job directory")?;
info!("Cleaned up all checkpoints for job {}", job_id);
}
Ok(())
}
pub async fn has_checkpoint(&self, job_id: &str) -> bool {
self.metadata_path(job_id).exists()
}
}
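/// Summary of an interrupted job, used when presenting resume candidates.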
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumableJob {
pub job_id: String,
pub started_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub total_items: usize,
pub completed_items: usize,
pub failed_items: usize,
pub is_complete: bool,
pub checkpoint_version: u32,
}
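/// Capability to ask which jobs can be resumed from their checkpoints.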
#[async_trait::async_trait]
pub trait Resumable: Send + Sync {
async fn can_resume(&self, job_id: &str) -> Result<bool>;
async fn list_resumable_jobs(&self) -> Result<Vec<ResumableJob>>;
}
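/// Full job lifecycle: creation, result recording, checkpoint access,
/// resumption, and cleanup. `DefaultJobStateManager` is the file-backed
/// implementation below.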
#[async_trait::async_trait]
pub trait JobStateManager: Send + Sync {
async fn create_job(
&self,
config: MapReduceConfig,
work_items: Vec<Value>,
agent_template: Vec<WorkflowStep>,
reduce_commands: Option<Vec<WorkflowStep>>,
) -> Result<String>;
async fn list_resumable_jobs(&self) -> Result<Vec<ResumableJob>>;
async fn update_agent_result(&self, job_id: &str, result: AgentResult) -> Result<()>;
async fn get_job_state(&self, job_id: &str) -> Result<MapReduceJobState>;
async fn get_job_state_from_checkpoint(
&self,
job_id: &str,
checkpoint_version: Option<u32>,
) -> Result<MapReduceJobState>;
async fn resume_job(&self, job_id: &str) -> Result<Vec<AgentResult>>;
async fn cleanup_job(&self, job_id: &str) -> Result<()>;
async fn start_reduce_phase(&self, job_id: &str) -> Result<()>;
async fn complete_reduce_phase(&self, job_id: &str, output: Option<String>) -> Result<()>;
async fn mark_job_complete(&self, job_id: &str) -> Result<()>;
}
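/// File-backed `JobStateManager` that keeps active jobs in an in-memory map
/// and mirrors every mutation to a checkpoint on disk, so persisted state
/// tracks in-memory state after each change.
///
/// An illustrative sketch, assuming an async context (config, items, and
/// result are made up):
///
/// ```ignore
/// let manager = DefaultJobStateManager::new(base_dir);
/// let job_id = manager
///     .create_job(config, work_items, agent_template, None)
///     .await?;
/// manager.update_agent_result(&job_id, result).await?;
/// // After an interruption, the job can be picked back up:
/// let prior_results = manager.resume_job(&job_id).await?;
/// ```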
pub struct DefaultJobStateManager {
pub checkpoint_manager: CheckpointManager,
active_jobs: RwLock<HashMap<String, MapReduceJobState>>,
#[allow(dead_code)]
project_root: Option<PathBuf>,
}
impl DefaultJobStateManager {
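/// Creates a manager that stores all job state under `base_dir`.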
#[allow(deprecated)]
pub fn new(base_dir: PathBuf) -> Self {
Self {
checkpoint_manager: CheckpointManager::new(base_dir),
active_jobs: RwLock::new(HashMap::new()),
project_root: None,
}
}
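/// Creates a manager whose state lives in the global storage directory for
/// this repository (resolved from `project_root`) rather than under a
/// caller-supplied path.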
#[allow(deprecated)]
pub async fn new_with_global(project_root: PathBuf) -> Result<Self> {
use crate::storage::{extract_repo_name, GlobalStorage};
let storage = GlobalStorage::new()?;
let repo_name = extract_repo_name(&project_root)?;
let global_base_dir = storage.get_state_dir(&repo_name, "mapreduce").await?;
Ok(Self {
checkpoint_manager: CheckpointManager::new(global_base_dir),
active_jobs: RwLock::new(HashMap::new()),
project_root: Some(project_root),
})
}
}
#[async_trait::async_trait]
impl JobStateManager for DefaultJobStateManager {
async fn create_job(
&self,
config: MapReduceConfig,
work_items: Vec<Value>,
agent_template: Vec<WorkflowStep>,
reduce_commands: Option<Vec<WorkflowStep>>,
) -> Result<String> {
// A bare millisecond timestamp can collide when jobs are created in quick
// succession, so a process-wide counter is appended to keep ids unique.
static JOB_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let job_id = format!(
"mapreduce-{}-{}",
Utc::now().timestamp_millis(),
JOB_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
);
let mut state = MapReduceJobState::new(job_id.clone(), config, work_items);
state.agent_template = agent_template;
state.reduce_commands = reduce_commands;
self.checkpoint_manager.save_checkpoint(&state).await?;
let mut jobs = self.active_jobs.write().await;
jobs.insert(job_id.clone(), state);
Ok(job_id)
}
async fn update_agent_result(&self, job_id: &str, result: AgentResult) -> Result<()> {
let mut jobs = self.active_jobs.write().await;
let state = jobs
.get_mut(job_id)
.ok_or_else(|| anyhow!("Job {} not found", job_id))?;
state.update_agent_result(result);
self.checkpoint_manager.save_checkpoint(state).await?;
Ok(())
}
async fn list_resumable_jobs(&self) -> Result<Vec<ResumableJob>> {
self.list_resumable_jobs_internal().await
}
async fn get_job_state(&self, job_id: &str) -> Result<MapReduceJobState> {
let jobs = self.active_jobs.read().await;
if let Some(state) = jobs.get(job_id) {
return Ok(state.clone());
}
self.checkpoint_manager.load_checkpoint(job_id).await
}
async fn get_job_state_from_checkpoint(
&self,
job_id: &str,
checkpoint_version: Option<u32>,
) -> Result<MapReduceJobState> {
self.checkpoint_manager
.load_checkpoint_by_version(job_id, checkpoint_version)
.await
}
async fn resume_job(&self, job_id: &str) -> Result<Vec<AgentResult>> {
let state = self.checkpoint_manager.load_checkpoint(job_id).await?;
let results: Vec<AgentResult> = state.agent_results.values().cloned().collect();
let mut jobs = self.active_jobs.write().await;
jobs.insert(job_id.to_string(), state);
Ok(results)
}
async fn cleanup_job(&self, job_id: &str) -> Result<()> {
let mut jobs = self.active_jobs.write().await;
jobs.remove(job_id);
self.checkpoint_manager.cleanup_job(job_id).await
}
async fn start_reduce_phase(&self, job_id: &str) -> Result<()> {
let mut jobs = self.active_jobs.write().await;
let state = jobs
.get_mut(job_id)
.ok_or_else(|| anyhow!("Job {} not found", job_id))?;
state.start_reduce_phase();
self.checkpoint_manager.save_checkpoint(state).await?;
Ok(())
}
async fn complete_reduce_phase(&self, job_id: &str, output: Option<String>) -> Result<()> {
let mut jobs = self.active_jobs.write().await;
let state = jobs
.get_mut(job_id)
.ok_or_else(|| anyhow!("Job {} not found", job_id))?;
state.complete_reduce_phase(output);
self.checkpoint_manager.save_checkpoint(state).await?;
Ok(())
}
async fn mark_job_complete(&self, job_id: &str) -> Result<()> {
let mut jobs = self.active_jobs.write().await;
let state = jobs
.get_mut(job_id)
.ok_or_else(|| anyhow!("Job {} not found", job_id))?;
state.mark_complete();
self.checkpoint_manager.save_checkpoint(state).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl Resumable for DefaultJobStateManager {
async fn can_resume(&self, job_id: &str) -> Result<bool> {
match self.checkpoint_manager.load_checkpoint(job_id).await {
Ok(state) => Ok(!state.is_complete),
Err(_) => Ok(false),
}
}
async fn list_resumable_jobs(&self) -> Result<Vec<ResumableJob>> {
self.list_resumable_jobs_internal().await
}
}
impl DefaultJobStateManager {
async fn jobs_dir_exists(jobs_dir: &Path) -> bool {
fs::metadata(jobs_dir).await.is_ok()
}
async fn is_valid_job_directory(path: &Path) -> Option<String> {
let metadata = fs::metadata(path).await.ok()?;
if !metadata.is_dir() {
return None;
}
path.file_name().and_then(|n| n.to_str()).map(String::from)
}
async fn load_job_checkpoint(
checkpoint_manager: &CheckpointManager,
job_id: &str,
) -> Option<MapReduceJobState> {
checkpoint_manager.load_checkpoint(job_id).await.ok()
}
async fn process_job_directory(
path: PathBuf,
checkpoint_manager: &CheckpointManager,
) -> Option<ResumableJob> {
let job_id = Self::is_valid_job_directory(&path).await?;
Self::try_build_resumable_job(checkpoint_manager, &job_id).await
}
async fn collect_resumable_jobs_from_dir(
jobs_dir: &Path,
checkpoint_manager: &CheckpointManager,
) -> Result<Vec<ResumableJob>> {
let mut resumable_jobs = Vec::new();
let mut entries = fs::read_dir(jobs_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if let Some(job) = Self::process_job_directory(entry.path(), checkpoint_manager).await {
resumable_jobs.push(job);
}
}
Ok(resumable_jobs)
}
async fn try_build_resumable_job(
checkpoint_manager: &CheckpointManager,
job_id: &str,
) -> Option<ResumableJob> {
let state = Self::load_job_checkpoint(checkpoint_manager, job_id).await?;
let checkpoints = checkpoint_manager
.list_checkpoints(job_id)
.await
.unwrap_or_default();
Self::build_resumable_job(job_id, state, checkpoints)
}
fn build_resumable_job(
job_id: &str,
state: MapReduceJobState,
checkpoints: Vec<CheckpointInfo>,
) -> Option<ResumableJob> {
if state.is_complete {
return None;
}
let latest_checkpoint = checkpoints
.into_iter()
.max_by_key(|c| c.version)
.map(|c| c.version)
.unwrap_or(0);
Some(ResumableJob {
job_id: job_id.to_string(),
started_at: state.started_at,
updated_at: state.updated_at,
total_items: state.total_items,
completed_items: state.successful_count,
failed_items: state.failed_count,
is_complete: false,
checkpoint_version: latest_checkpoint,
})
}
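/// Restores a job into the active set from a specific checkpoint version
/// (latest when `None`) and returns the agent results recorded so far.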
pub async fn resume_job_from_checkpoint(
&self,
job_id: &str,
checkpoint_version: Option<u32>,
) -> Result<Vec<AgentResult>> {
let state = self
.checkpoint_manager
.load_checkpoint_by_version(job_id, checkpoint_version)
.await?;
let results: Vec<AgentResult> = state.agent_results.values().cloned().collect();
let mut jobs = self.active_jobs.write().await;
jobs.insert(job_id.to_string(), state);
Ok(results)
}
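/// Scans the jobs directory and returns every incomplete job that still has
/// a loadable checkpoint.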
pub async fn list_resumable_jobs_internal(&self) -> Result<Vec<ResumableJob>> {
let jobs_dir = self.checkpoint_manager.jobs_dir();
if !Self::jobs_dir_exists(&jobs_dir).await {
return Ok(Vec::new());
}
Self::collect_resumable_jobs_from_dir(&jobs_dir, &self.checkpoint_manager).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use tempfile::TempDir;
#[tokio::test]
async fn test_checkpoint_save_and_load() {
let temp_dir = TempDir::new().unwrap();
let manager = CheckpointManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![
json!({"id": 1, "data": "test1"}),
json!({"id": 2, "data": "test2"}),
];
let mut state = MapReduceJobState::new("test-job-1".to_string(), config, work_items);
state.update_agent_result(AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("test output".to_string()),
commits: vec![],
duration: std::time::Duration::from_secs(5),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
});
manager.save_checkpoint(&state).await.unwrap();
let loaded_state = manager.load_checkpoint("test-job-1").await.unwrap();
assert_eq!(loaded_state.job_id, "test-job-1");
assert_eq!(loaded_state.total_items, 2);
assert_eq!(loaded_state.successful_count, 1);
assert_eq!(loaded_state.completed_agents.len(), 1);
assert!(loaded_state.completed_agents.contains("item_0"));
}
#[tokio::test]
async fn test_checkpoint_cleanup() {
let temp_dir = TempDir::new().unwrap();
let manager = CheckpointManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let mut state = MapReduceJobState::new("test-job-2".to_string(), config, vec![]);
for i in 0..5 {
state.checkpoint_version = i;
manager.save_checkpoint(&state).await.unwrap();
}
let checkpoints = manager.list_checkpoints("test-job-2").await.unwrap();
// Five saves with pruning after each should leave exactly MAX_CHECKPOINTS.
assert_eq!(checkpoints.len(), MAX_CHECKPOINTS);
assert_eq!(checkpoints[0].version, 4);
}
#[tokio::test]
async fn test_list_resumable_jobs() {
let temp_dir = create_unique_temp_dir("test-resumable-jobs");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![json!({"id": 1}), json!({"id": 2})];
let template = vec![];
let reduce_commands = None;
let job1_id = manager
.create_job(
config.clone(),
work_items.clone(),
template.clone(),
reduce_commands.clone(),
)
.await
.unwrap();
let job2_id = manager
.create_job(config, work_items, template, reduce_commands)
.await
.unwrap();
manager.mark_job_complete(&job2_id).await.unwrap();
// Both `JobStateManager` and `Resumable` declare `list_resumable_jobs`, so
// the call is disambiguated with fully qualified syntax.
let resumable = <DefaultJobStateManager as Resumable>::list_resumable_jobs(&manager)
.await
.unwrap();
assert_eq!(resumable.len(), 1);
assert_eq!(resumable[0].job_id, job1_id);
assert!(!resumable[0].is_complete);
}
#[tokio::test]
async fn test_job_state_manager() {
let temp_dir = TempDir::new().unwrap();
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![json!({"id": 1}), json!({"id": 2})];
let job_id = manager
.create_job(config, work_items, vec![], None)
.await
.unwrap();
let result = AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("output".to_string()),
commits: vec![],
duration: std::time::Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
};
manager.update_agent_result(&job_id, result).await.unwrap();
let state = manager.get_job_state(&job_id).await.unwrap();
assert_eq!(state.successful_count, 1);
manager.cleanup_job(&job_id).await.unwrap();
}
fn create_unique_temp_dir(prefix: &str) -> TempDir {
tempfile::Builder::new()
.prefix(&format!(
"{}-{}-",
prefix,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
))
.tempdir()
.unwrap()
}
fn create_test_config() -> MapReduceConfig {
MapReduceConfig {
input: "test.json".to_string(),
json_path: String::new(),
max_parallel: 5,
max_items: None,
offset: None,
agent_timeout_secs: Some(300),
continue_on_failure: false,
batch_size: None,
enable_checkpoints: true,
}
}
#[tokio::test]
async fn test_list_resumable_empty_no_jobs_dir() {
let temp_dir = create_unique_temp_dir("test-empty-no-dir");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_empty_dir() {
let temp_dir = create_unique_temp_dir("test-empty-dir");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
tokio::fs::create_dir_all(manager.checkpoint_manager.jobs_dir())
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_only_files() {
let temp_dir = create_unique_temp_dir("test-only-files");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
tokio::fs::create_dir_all(&jobs_dir).await.unwrap();
tokio::fs::write(jobs_dir.join("file.txt"), "test")
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_invalid_metadata() {
let temp_dir = create_unique_temp_dir("test-invalid-meta");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
tokio::fs::create_dir_all(&jobs_dir).await.unwrap();
let job_dir = jobs_dir.join("job-1");
tokio::fs::create_dir(&job_dir).await.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
// A job directory without any checkpoint cannot be resumed.
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_file_not_dir() {
let temp_dir = create_unique_temp_dir("test-file-not-dir");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
tokio::fs::create_dir_all(&jobs_dir).await.unwrap();
tokio::fs::write(jobs_dir.join("not-a-dir"), "content")
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_invalid_filename() {
let temp_dir = create_unique_temp_dir("test-invalid-filename");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
tokio::fs::create_dir_all(&jobs_dir).await.unwrap();
let job_dir = jobs_dir.join("valid-job-id");
tokio::fs::create_dir(&job_dir).await.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_invalid_checkpoint() {
let temp_dir = create_unique_temp_dir("test-invalid-checkpoint");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
let job_dir = jobs_dir.join("job-invalid");
tokio::fs::create_dir_all(&job_dir).await.unwrap();
let checkpoint_file = job_dir.join("checkpoint-0.json");
tokio::fs::write(checkpoint_file, "invalid json")
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_complete_job() {
let temp_dir = create_unique_temp_dir("test-complete-job");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![json!({"id": 1})];
let job_id = manager
.create_job(config, work_items, vec![], None)
.await
.unwrap();
manager.mark_job_complete(&job_id).await.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_empty_checkpoint_list() {
let temp_dir = create_unique_temp_dir("test-empty-checkpoints");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![json!({"id": 1})];
manager
.create_job(config, work_items, vec![], None)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].checkpoint_version, 0);
}
#[tokio::test]
async fn test_list_resumable_max_checkpoint_version() {
let temp_dir = create_unique_temp_dir("test-max-checkpoint");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![json!({"id": 1}), json!({"id": 2})];
let job_id = manager
.create_job(config.clone(), work_items.clone(), vec![], None)
.await
.unwrap();
let mut state = manager.get_job_state(&job_id).await.unwrap();
for i in 1..4 {
state.checkpoint_version = i;
manager
.checkpoint_manager
.save_checkpoint(&state)
.await
.unwrap();
}
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].checkpoint_version, 3);
}
#[tokio::test]
async fn test_list_resumable_multiple_mixed_jobs() {
let temp_dir = create_unique_temp_dir("test-mixed-jobs");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let incomplete_job = manager
.create_job(config.clone(), vec![json!({"id": 1})], vec![], None)
.await
.unwrap();
let complete_job = manager
.create_job(config.clone(), vec![json!({"id": 2})], vec![], None)
.await
.unwrap();
manager.mark_job_complete(&complete_job).await.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].job_id, incomplete_job);
}
#[tokio::test]
async fn test_list_resumable_special_chars_in_name() {
let temp_dir = create_unique_temp_dir("test-special-chars");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let _job_id = manager
.create_job(config, vec![json!({"id": 1})], vec![], None)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert!(jobs[0].job_id.contains("mapreduce"));
}
#[tokio::test]
async fn test_list_resumable_many_jobs() {
let temp_dir = create_unique_temp_dir("test-many-jobs");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
for _ in 0..50 {
manager
.create_job(config.clone(), vec![json!({"id": 1})], vec![], None)
.await
.unwrap();
}
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 50);
}
#[tokio::test]
async fn test_list_resumable_metadata_missing() {
let temp_dir = create_unique_temp_dir("test-no-metadata");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
let job_dir = jobs_dir.join("job-no-metadata");
tokio::fs::create_dir_all(&job_dir).await.unwrap();
let config = create_test_config();
let state = MapReduceJobState::new(
"job-no-metadata".to_string(),
config,
vec![json!({"id": 1})],
);
let checkpoint_json = serde_json::to_string(&state).unwrap();
tokio::fs::write(job_dir.join("checkpoint-v0.json"), checkpoint_json)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_checkpoints_but_metadata_invalid() {
let temp_dir = create_unique_temp_dir("test-invalid-metadata");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let jobs_dir = manager.checkpoint_manager.jobs_dir();
let job_dir = jobs_dir.join("job-bad-metadata");
tokio::fs::create_dir_all(&job_dir).await.unwrap();
let config = create_test_config();
let state = MapReduceJobState::new(
"job-bad-metadata".to_string(),
config,
vec![json!({"id": 1})],
);
let checkpoint_json = serde_json::to_string(&state).unwrap();
tokio::fs::write(job_dir.join("checkpoint-v0.json"), checkpoint_json)
.await
.unwrap();
tokio::fs::write(job_dir.join("metadata.json"), "bad json")
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_list_resumable_mixed_checkpoint_versions() {
let temp_dir = create_unique_temp_dir("test-mixed-versions");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let job1 = manager
.create_job(config.clone(), vec![json!({"id": 1})], vec![], None)
.await
.unwrap();
let job2 = manager
.create_job(config, vec![json!({"id": 2})], vec![], None)
.await
.unwrap();
let mut state = manager.get_job_state(&job2).await.unwrap();
state.checkpoint_version = 5;
manager
.checkpoint_manager
.save_checkpoint(&state)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 2);
let j1 = jobs.iter().find(|j| j.job_id == job1).unwrap();
let j2 = jobs.iter().find(|j| j.job_id == job2).unwrap();
assert_eq!(j1.checkpoint_version, 0);
assert_eq!(j2.checkpoint_version, 5);
}
#[tokio::test]
async fn test_list_resumable_zero_items() {
let temp_dir = create_unique_temp_dir("test-zero-items");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let _job_id = manager
.create_job(config, vec![], vec![], None)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].total_items, 0);
assert_eq!(jobs[0].completed_items, 0);
}
#[tokio::test]
async fn test_list_resumable_high_checkpoint_version() {
let temp_dir = create_unique_temp_dir("test-high-version");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let job_id = manager
.create_job(config, vec![json!({"id": 1})], vec![], None)
.await
.unwrap();
let mut state = manager.get_job_state(&job_id).await.unwrap();
state.checkpoint_version = u32::MAX - 1;
manager
.checkpoint_manager
.save_checkpoint(&state)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].checkpoint_version, u32::MAX - 1);
}
#[tokio::test]
async fn test_list_resumable_partial_failures() {
let temp_dir = create_unique_temp_dir("test-partial-failures");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let work_items = vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})];
let job_id = manager
.create_job(config, work_items, vec![], None)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_0".to_string(),
status: AgentStatus::Success,
output: Some("success".to_string()),
commits: vec![],
duration: std::time::Duration::from_secs(1),
error: None,
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
manager
.update_agent_result(
&job_id,
AgentResult {
item_id: "item_1".to_string(),
status: AgentStatus::Failed("test error".to_string()),
output: None,
commits: vec![],
duration: std::time::Duration::from_secs(1),
error: Some("test error".to_string()),
worktree_path: None,
branch_name: None,
worktree_session_id: None,
files_modified: vec![],
json_log_location: None,
cleanup_status: None,
},
)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].completed_items, 1);
assert_eq!(jobs[0].failed_items, 1);
assert_eq!(jobs[0].total_items, 3);
}
#[tokio::test]
async fn test_list_resumable_recent_vs_old_jobs() {
let temp_dir = create_unique_temp_dir("test-timestamps");
let manager = DefaultJobStateManager::new(temp_dir.path().to_path_buf());
let config = create_test_config();
let old_job = manager
.create_job(config.clone(), vec![json!({"id": 1})], vec![], None)
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let new_job = manager
.create_job(config, vec![json!({"id": 2})], vec![], None)
.await
.unwrap();
let jobs = manager.list_resumable_jobs_internal().await.unwrap();
assert_eq!(jobs.len(), 2);
let old = jobs.iter().find(|j| j.job_id == old_job).unwrap();
let new = jobs.iter().find(|j| j.job_id == new_job).unwrap();
assert!(new.started_at >= old.started_at);
}
}