use super::dlq::DeadLetterQueue;
use super::errors::MapReduceError;
use super::errors::MapReduceResult as MRResult;
use super::events::{EventLogger, MapReduceEvent};
use super::mapreduce::{AgentResult, MapPhase, MapReduceExecutor, ReducePhase};
use super::state::{JobStateManager, MapReduceJobState};
use crate::cook::orchestrator::ExecutionEnvironment;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Options controlling how an interrupted MapReduce job is resumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnhancedResumeOptions {
    /// Resume even if the job is already marked complete.
    pub force: bool,
    /// Extra retry budget granted to previously failed items.
    pub max_additional_retries: u32,
    /// Skip checkpoint integrity validation when loading state.
    pub skip_validation: bool,
    /// Resume from a specific checkpoint version instead of the latest.
    pub from_checkpoint: Option<u32>,
    /// Override for maximum parallel agents.
    /// NOTE(review): not read anywhere in this module — confirm consumers.
    pub max_parallel: Option<usize>,
    /// Force recreation of execution resources.
    /// NOTE(review): not read anywhere in this module — confirm consumers.
    pub force_recreation: bool,
    /// Include reprocess-eligible dead-letter-queue items as resumable work.
    pub include_dlq_items: bool,
    /// Validate the execution environment (working directory) before resuming.
    pub validate_environment: bool,
    /// Re-queue previously failed agents (bounded by `max_additional_retries`).
    pub reset_failed_agents: bool,
}
impl Default for EnhancedResumeOptions {
    /// Defaults favor a safe, comprehensive resume: validation on, DLQ
    /// items included, nothing forced, and a small extra retry budget.
    fn default() -> Self {
        Self {
            // Off-by-default switches.
            force: false,
            skip_validation: false,
            force_recreation: false,
            reset_failed_agents: false,
            // On-by-default coverage/safety switches.
            include_dlq_items: true,
            validate_environment: true,
            // Numeric and optional knobs.
            max_additional_retries: 2,
            from_checkpoint: None,
            max_parallel: None,
        }
    }
}
/// The phase a MapReduce job was in when it was interrupted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MapReducePhase {
    /// Initial setup commands, before map work begins.
    Setup,
    /// Parallel per-item agent execution.
    Map,
    /// Aggregation commands run after map work completes.
    Reduce,
}
/// Outcome of a single completed phase, as recorded in resume state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseResult {
    /// Which phase this result belongs to.
    pub phase: MapReducePhase,
    /// When the phase finished.
    pub completed_at: DateTime<Utc>,
    /// Whether the phase finished successfully.
    pub success: bool,
    /// Number of work items processed during the phase.
    pub items_processed: usize,
    /// Optional structured output produced by the phase.
    pub output: Option<Value>,
    /// Error message if the phase failed.
    pub error: Option<String>,
}
/// Bookkeeping about the interrupted run and prior resume attempts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeMetadata {
    /// When the original job run started.
    pub original_start_time: DateTime<Utc>,
    /// When the most recent checkpoint was written.
    pub last_checkpoint_time: DateTime<Utc>,
    /// How many times a resume has been attempted for this job.
    pub resume_attempts: u32,
    /// Human-readable reason the job was interrupted, if known.
    pub interruption_reason: Option<String>,
    /// Environment captured at checkpoint time, used for drift detection.
    pub environment_snapshot: EnvironmentSnapshot,
}
/// Snapshot of the execution environment at checkpoint time, used to
/// detect drift between the original run and a resume attempt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvironmentSnapshot {
    /// Working directory of the original run.
    pub working_directory: PathBuf,
    /// Root of the project being operated on.
    pub project_root: PathBuf,
    /// Git branch at checkpoint time, if available.
    pub git_branch: Option<String>,
    /// Git commit at checkpoint time, if available.
    pub git_commit: Option<String>,
}
/// Serializable snapshot of everything needed to resume a MapReduce job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MapReduceResumeState {
    /// Identifier of the job being resumed.
    pub job_id: String,
    /// Phase the job was in at interruption.
    pub current_phase: MapReducePhase,
    /// IDs of work items that finished successfully.
    pub completed_items: HashSet<String>,
    /// Work items that failed, with retry bookkeeping.
    pub failed_items: Vec<WorkItem>,
    /// Agent-to-work-item assignments.
    /// NOTE(review): key/value direction inferred from the name — confirm.
    pub agent_assignments: HashMap<String, String>,
    /// Results of phases completed so far, keyed by phase name.
    pub phase_results: HashMap<String, PhaseResult>,
    /// Metadata about the interruption and prior resume attempts.
    pub resume_metadata: ResumeMetadata,
}
/// A single unit of map work plus its retry history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkItem {
    /// Stable identifier for the item.
    pub id: String,
    /// Arbitrary JSON payload for the item.
    pub data: Value,
    /// Number of times this item has been retried so far.
    pub retry_count: u32,
    /// Most recent error message, if the item has failed before.
    pub last_error: Option<String>,
}
/// Outcome of a resume attempt.
#[derive(Debug, Clone)]
pub enum EnhancedResumeResult {
    /// The job had no reduce phase and all map work is already done.
    MapOnlyCompleted(MapResult),
    /// Both map and reduce phases were already complete.
    FullWorkflowCompleted(FullMapReduceResult),
    /// The job is partially complete; `progress` is a completion fraction.
    /// NOTE(review): never constructed in this module — confirm external use.
    PartialResume {
        phase: MapReducePhase,
        progress: f64,
    },
    /// Work remains: the caller should execute the returned phase(s).
    /// Fields are boxed to keep the variant's size small.
    ReadyToExecute {
        phase: MapReducePhase,
        map_phase: Option<Box<MapPhase>>,
        reduce_phase: Option<Box<ReducePhase>>,
        remaining_items: Box<Vec<Value>>,
        state: Box<MapReduceJobState>,
    },
}
/// Aggregate counters and per-agent results for the map phase.
#[derive(Debug, Clone)]
pub struct MapResult {
    /// Number of items that completed successfully.
    pub successful: usize,
    /// Number of items that failed.
    pub failed: usize,
    /// Total number of work items in the job.
    pub total: usize,
    /// Individual agent results.
    pub results: Vec<AgentResult>,
}
/// Combined result of a full map + reduce workflow.
#[derive(Debug, Clone)]
pub struct FullMapReduceResult {
    /// Map-phase summary.
    pub map_result: MapResult,
    /// Parsed reduce-phase output, if any was recorded.
    pub reduce_result: Option<Value>,
}
/// Coordinates resuming interrupted MapReduce jobs: loads and validates
/// checkpointed state, recomputes remaining work (including DLQ items),
/// and decides which phase to hand back to the executor.
pub struct MapReduceResumeManager {
    // Persistence layer for job state and checkpoints.
    state_manager: Arc<dyn JobStateManager>,
    // Sink for resume lifecycle events (best-effort logging).
    event_logger: Arc<EventLogger>,
    // Dead letter queue holding failed items for this job.
    dlq: Arc<DeadLetterQueue>,
    // Optional executor; attached later via `set_executor`.
    executor: Option<Arc<MapReduceExecutor>>,
    // Guards against two processes resuming the same job concurrently.
    lock_manager: super::resume_lock::ResumeLockManager,
}
impl MapReduceResumeManager {
/// Creates a resume manager for `job_id`.
///
/// Builds the job's dead letter queue and a file-based resume lock
/// manager rooted at the default storage directory.
///
/// # Errors
/// Fails if the DLQ cannot be created, or the storage directory /
/// lock manager cannot be initialized.
pub async fn new(
    job_id: String,
    state_manager: Arc<dyn JobStateManager>,
    event_logger: Arc<EventLogger>,
    project_root: PathBuf,
) -> anyhow::Result<Self> {
    let dlq = Arc::new(
        DeadLetterQueue::new(
            job_id,
            // Last use of `project_root` — move it instead of cloning.
            project_root,
            // 1000 = max DLQ items, 30 = retention period (presumably
            // days — TODO confirm against DeadLetterQueue::new).
            1000,
            30,
            Some(event_logger.clone()),
        )
        .await?,
    );
    let storage_dir = crate::storage::get_default_storage_dir()
        .map_err(|e| anyhow::anyhow!("Failed to get storage directory: {}", e))?;
    let lock_manager = super::resume_lock::ResumeLockManager::new(storage_dir)
        .map_err(|e| anyhow::anyhow!("Failed to create lock manager: {}", e))?;
    Ok(Self {
        state_manager,
        event_logger,
        dlq,
        // Executor is attached later via `set_executor`.
        executor: None,
        lock_manager,
    })
}
/// Attaches the executor used to run resumed phases; replaces any
/// previously attached executor.
pub fn set_executor(&mut self, executor: Arc<MapReduceExecutor>) {
    self.executor.replace(executor);
}
/// Resumes job `job_id` under `options`, holding an exclusive resume
/// lock for the duration of the call.
///
/// Loads and validates checkpointed state, computes remaining work,
/// determines the phase to resume from, logs a `JobResumed` event, and
/// dispatches to the phase-specific resume path.
///
/// # Errors
/// Returns an error if the lock cannot be acquired, the checkpoint is
/// missing or corrupted, or environment validation fails.
pub async fn resume_job(
    &self,
    job_id: &str,
    options: EnhancedResumeOptions,
    env: &ExecutionEnvironment,
) -> MRResult<EnhancedResumeResult> {
    // Guard is held until drop at the end of this function; prevents
    // concurrent resumes of the same job.
    let _lock = self.lock_manager.acquire_lock(job_id).await.map_err(|e| {
        MapReduceError::from_anyhow(anyhow::anyhow!("Failed to acquire resume lock: {}", e))
    })?;
    info!("Starting enhanced resume for job {}", job_id);
    let mut job_state = self.load_and_validate_state(job_id, &options).await?;
    self.validate_resume_conditions(&job_state, &options, env)
        .await?;
    // May also lazily initialize per-item retry counts on `job_state`.
    let remaining_items = self
        .calculate_remaining_items(&mut job_state, &options)
        .await?;
    let current_phase = self.determine_current_phase(&job_state);
    info!(
        "Resume state: phase={:?}, completed={}, remaining={}",
        current_phase,
        job_state.completed_agents.len(),
        remaining_items.len()
    );
    // Event logging is best-effort: failures are warned, not propagated.
    self.event_logger
        .log(MapReduceEvent::JobResumed {
            job_id: job_id.to_string(),
            checkpoint_version: job_state.checkpoint_version,
            pending_items: remaining_items.len(),
        })
        .await
        .unwrap_or_else(|e| warn!("Failed to log resume event: {}", e));
    match current_phase {
        MapReducePhase::Setup => {
            self.resume_from_setup(&mut job_state, remaining_items, env, &options)
                .await
        }
        MapReducePhase::Map => {
            self.resume_from_map(&mut job_state, remaining_items, env, &options)
                .await
        }
        MapReducePhase::Reduce => self.resume_from_reduce(&mut job_state, env, &options).await,
    }
}
/// Loads job state — from a specific checkpoint version when requested,
/// otherwise the latest — and optionally validates its integrity.
///
/// # Errors
/// Returns `CheckpointLoadFailed` when the state cannot be loaded, or a
/// `CheckpointCorrupted` error from integrity validation (unless
/// `options.skip_validation` is set).
async fn load_and_validate_state(
    &self,
    job_id: &str,
    options: &EnhancedResumeOptions,
) -> MRResult<MapReduceJobState> {
    // Both load paths share the same error mapping, so fetch first and
    // map once instead of duplicating the closure per branch.
    let loaded = match options.from_checkpoint {
        Some(version) => {
            self.state_manager
                .get_job_state_from_checkpoint(job_id, Some(version))
                .await
        }
        None => self.state_manager.get_job_state(job_id).await,
    };
    let state = loaded.map_err(|e| MapReduceError::CheckpointLoadFailed {
        job_id: job_id.to_string(),
        details: e.to_string(),
    })?;
    if !options.skip_validation {
        self.validate_checkpoint_integrity(&state)?;
    }
    Ok(state)
}
/// Sanity-checks a loaded checkpoint: non-empty job ID, at least one
/// work item, and processed counts that do not exceed the total.
///
/// # Errors
/// Returns `CheckpointCorrupted` describing the first failed check.
fn validate_checkpoint_integrity(&self, state: &MapReduceJobState) -> MRResult<()> {
    // All failures share the same error shape; build it in one place.
    let corrupted = |job_id: String, details: String| MapReduceError::CheckpointCorrupted {
        job_id,
        version: state.checkpoint_version,
        details,
    };
    if state.job_id.is_empty() {
        return Err(corrupted("<empty>".to_string(), "Empty job ID".to_string()));
    }
    if state.work_items.is_empty() {
        return Err(corrupted(
            state.job_id.clone(),
            "No work items found".to_string(),
        ));
    }
    let total_processed = state.completed_agents.len() + state.failed_agents.len();
    if total_processed > state.total_items {
        return Err(corrupted(
            state.job_id.clone(),
            format!(
                "Processed count {} exceeds total items {}",
                total_processed, state.total_items
            ),
        ));
    }
    Ok(())
}
/// Checks whether a resume should proceed.
///
/// A job already marked complete is a silent no-op unless `force` is
/// set; otherwise the environment is optionally validated.
async fn validate_resume_conditions(
    &self,
    state: &MapReduceJobState,
    options: &EnhancedResumeOptions,
    env: &ExecutionEnvironment,
) -> MRResult<()> {
    let already_done = state.is_complete && !options.force;
    if already_done {
        info!("Job {} is already complete, skipping resume", state.job_id);
        return Ok(());
    }
    if options.validate_environment {
        self.validate_environment_consistency(state, env)?;
    }
    Ok(())
}
/// Verifies the execution environment is usable for a resume.
/// Currently only checks that the working directory still exists;
/// the checkpointed state is not yet compared against it.
fn validate_environment_consistency(
    &self,
    _state: &MapReduceJobState,
    env: &ExecutionEnvironment,
) -> MRResult<()> {
    match env.working_dir.exists() {
        true => Ok(()),
        false => Err(MapReduceError::EnvironmentError {
            details: format!("Working directory {:?} does not exist", env.working_dir),
        }),
    }
}
/// Computes the deduplicated set of work items still to be processed.
///
/// Combines three sources: never-started (pending) items, previously
/// failed items (only when `reset_failed_agents` is set, bounded by
/// `max_additional_retries`), and reprocess-eligible DLQ items (when
/// `include_dlq_items` is set). Also lazily initializes
/// `state.item_retry_counts` from failure history when the checkpoint
/// predates retry tracking.
///
/// # Errors
/// Propagates DLQ load failures (themselves best-effort downgraded).
pub async fn calculate_remaining_items(
    &self,
    state: &mut MapReduceJobState,
    options: &EnhancedResumeOptions,
) -> MRResult<Vec<Value>> {
    use super::mapreduce::resume_collection::{
        collect_failed_items, collect_pending_items, combine_work_items,
    };
    use super::mapreduce::resume_deduplication::{count_duplicates, deduplicate_work_items};
    let pending = collect_pending_items(state);
    let failed = if options.reset_failed_agents {
        collect_failed_items(state, options.max_additional_retries)
    } else {
        Vec::new()
    };
    let dlq = if options.include_dlq_items {
        self.load_dlq_items(&state.job_id).await?
    } else {
        Vec::new()
    };
    // Capture source sizes up front so the vectors can be moved into
    // `combine_work_items` instead of being cloned just for logging.
    let (pending_len, failed_len, dlq_len) = (pending.len(), failed.len(), dlq.len());
    let combined = combine_work_items(pending, failed, dlq);
    let duplicate_count = count_duplicates(&combined);
    if duplicate_count > 0 {
        warn!(
            "Found {} duplicate work items across resume sources (pending: {}, failed: {}, dlq: {})",
            duplicate_count,
            pending_len,
            failed_len,
            dlq_len
        );
    }
    let deduped = deduplicate_work_items(combined);
    info!(
        "Resume work items: {} total, {} unique after deduplication",
        pending_len + failed_len + dlq_len,
        deduped.len()
    );
    // Older checkpoints lack retry counts; rebuild them from failed
    // agents and DLQ history so retry limits survive the resume.
    if state.item_retry_counts.is_empty() {
        use super::mapreduce::retry_tracking;
        let dlq_items = if options.include_dlq_items {
            self.load_all_dlq_items(&state.job_id).await?
        } else {
            vec![]
        };
        state.item_retry_counts =
            retry_tracking::merge_retry_counts(&state.failed_agents, &dlq_items);
        debug!(
            "Initialized retry counts for {} items from checkpoint state",
            state.item_retry_counts.len()
        );
    }
    Ok(deduped)
}
/// Loads the payloads of reprocess-eligible DLQ items.
///
/// Best-effort: a DLQ failure is downgraded to a warning and an empty
/// list so a broken DLQ never blocks a resume.
// Fix: the parameter was named `_job_id` despite being used in both log
// messages — the underscore prefix falsely signals "unused".
async fn load_dlq_items(&self, job_id: &str) -> MRResult<Vec<Value>> {
    use super::dlq::DLQFilter;
    // Only items flagged as eligible for reprocessing.
    let filter = DLQFilter {
        reprocess_eligible: Some(true),
        error_type: None,
        after: None,
        before: None,
        error_signature: None,
    };
    match self.dlq.list_items(filter).await {
        Ok(items) => {
            let values: Vec<Value> = items.into_iter().map(|item| item.item_data).collect();
            info!("Loaded {} items from DLQ for job {}", values.len(), job_id);
            Ok(values)
        }
        Err(e) => {
            warn!("Failed to load DLQ items for job {}: {}", job_id, e);
            Ok(Vec::new())
        }
    }
}
/// Loads every DLQ item (no eligibility filter) for retry-count
/// initialization.
///
/// Best-effort: a DLQ failure is downgraded to a warning and an empty
/// list so a broken DLQ never blocks a resume.
// Fix: the parameter was named `_job_id` despite being used in the
// warning message — the underscore prefix falsely signals "unused".
async fn load_all_dlq_items(
    &self,
    job_id: &str,
) -> MRResult<Vec<super::dlq::DeadLetteredItem>> {
    use super::dlq::DLQFilter;
    // Unfiltered: retry history matters even for ineligible items.
    let filter = DLQFilter {
        reprocess_eligible: None,
        error_type: None,
        after: None,
        before: None,
        error_signature: None,
    };
    match self.dlq.list_items(filter).await {
        Ok(items) => {
            debug!(
                "Loaded {} DLQ items for retry count initialization",
                items.len()
            );
            Ok(items)
        }
        Err(e) => {
            warn!("Failed to load DLQ items for job {}: {}", job_id, e);
            Ok(Vec::new())
        }
    }
}
/// Decides which phase a resume should pick up from.
///
/// Priority: a reduce phase that already started wins; otherwise
/// unfinished map work; otherwise reduce if reduce commands exist;
/// falling back to Map for a finished map-only job.
fn determine_current_phase(&self, state: &MapReduceJobState) -> MapReducePhase {
    // A started reduce phase takes priority over everything else.
    if state
        .reduce_phase_state
        .as_ref()
        .is_some_and(|s| s.started)
    {
        return MapReducePhase::Reduce;
    }
    // Map work remains.
    if state.completed_agents.len() < state.total_items {
        return MapReducePhase::Map;
    }
    // Map is done. The original additionally checked that the reduce
    // phase had not started, but that is guaranteed by the early
    // return above, so `reduce_commands.is_some()` suffices.
    if state.reduce_commands.is_some() {
        return MapReducePhase::Reduce;
    }
    MapReducePhase::Map
}
/// Resumes a job interrupted during the setup phase.
///
/// Setup holds no independently resumable state here, so this
/// delegates straight to the map-phase resume path with all items.
async fn resume_from_setup(
    &self,
    state: &mut MapReduceJobState,
    remaining_items: Vec<Value>,
    env: &ExecutionEnvironment,
    options: &EnhancedResumeOptions,
) -> MRResult<EnhancedResumeResult> {
    info!(
        "Resuming from setup phase with {} items",
        remaining_items.len()
    );
    self.resume_from_map(state, remaining_items, env, options)
        .await
}
/// Resumes (or completes) the map phase.
///
/// With no remaining items, either falls through to the reduce phase
/// (when reduce commands exist) or reports the map-only job complete
/// from stored results. Otherwise returns `ReadyToExecute` with a
/// reconstructed `MapPhase` for the caller to run.
async fn resume_from_map(
    &self,
    state: &mut MapReduceJobState,
    remaining_items: Vec<Value>,
    env: &ExecutionEnvironment,
    options: &EnhancedResumeOptions,
) -> MRResult<EnhancedResumeResult> {
    info!(
        "Resuming map phase with {} remaining items",
        remaining_items.len()
    );
    if remaining_items.is_empty() {
        if state.reduce_commands.is_some() {
            return self.resume_from_reduce(state, env, options).await;
        } else {
            // Map-only job with nothing left: summarize stored results.
            let results: Vec<AgentResult> = state.agent_results.values().cloned().collect();
            return Ok(EnhancedResumeResult::MapOnlyCompleted(MapResult {
                successful: state.successful_count,
                failed: state.failed_count,
                total: state.total_items,
                results,
            }));
        }
    }
    // Rebuild the map phase from the checkpointed config and template.
    // NOTE(review): json_path/filter/sort_by/max_items/distinct/
    // timeout_config/workflow_env are reset to defaults on resume
    // rather than restored from the original phase — confirm intended.
    let map_phase = MapPhase {
        config: state.config.clone(),
        json_path: None,
        agent_template: state.agent_template.clone(),
        filter: None,
        sort_by: None,
        max_items: None,
        distinct: None,
        timeout_config: None,
        workflow_env: std::collections::HashMap::new(),
    };
    Ok(EnhancedResumeResult::ReadyToExecute {
        phase: MapReducePhase::Map,
        map_phase: Some(Box::new(map_phase)),
        reduce_phase: state.reduce_commands.as_ref().map(|commands| {
            Box::new(ReducePhase {
                commands: commands.clone(),
                timeout_secs: None,
            })
        }),
        remaining_items: Box::new(remaining_items),
        state: Box::new(state.clone()),
    })
}
/// Resumes (or reports completion of) the reduce phase.
///
/// A completed reduce phase is reconstructed into a
/// `FullWorkflowCompleted` result from stored state; an in-progress
/// reduce phase yields `ReadyToExecute`; no reduce state at all is
/// treated as a finished map-only job.
async fn resume_from_reduce(
    &self,
    state: &mut MapReduceJobState,
    _env: &ExecutionEnvironment,
    _options: &EnhancedResumeOptions,
) -> MRResult<EnhancedResumeResult> {
    info!("Resuming from reduce phase");
    // Already finished: rebuild the final result, parsing the stored
    // reduce output as JSON when present (parse failures become None).
    if let Some(reduce_state) = &state.reduce_phase_state {
        if reduce_state.completed {
            let results: Vec<AgentResult> = state.agent_results.values().cloned().collect();
            return Ok(EnhancedResumeResult::FullWorkflowCompleted(
                FullMapReduceResult {
                    map_result: MapResult {
                        successful: state.successful_count,
                        failed: state.failed_count,
                        total: state.total_items,
                        results,
                    },
                    reduce_result: reduce_state
                        .output
                        .as_ref()
                        .and_then(|s| serde_json::from_str(s).ok()),
                },
            ));
        }
    }
    // In-progress reduce phase. The original condition
    // `is_some() && !is_none_or(|s| s.completed)` is equivalent to
    // `is_some_and(|s| !s.completed)` — simplified here.
    if state
        .reduce_phase_state
        .as_ref()
        .is_some_and(|s| !s.completed)
    {
        Ok(EnhancedResumeResult::ReadyToExecute {
            phase: MapReducePhase::Reduce,
            map_phase: None,
            reduce_phase: state.reduce_commands.as_ref().map(|commands| {
                Box::new(ReducePhase {
                    commands: commands.clone(),
                    timeout_secs: None,
                })
            }),
            remaining_items: Box::new(Vec::new()),
            state: Box::new(state.clone()),
        })
    } else {
        // No reduce state recorded: treat as a completed map-only job.
        let results: Vec<AgentResult> = state.agent_results.values().cloned().collect();
        Ok(EnhancedResumeResult::MapOnlyCompleted(MapResult {
            successful: state.successful_count,
            failed: state.failed_count,
            total: state.total_items,
            results,
        }))
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_enhanced_resume_options_default() {
        // Defaults must be safe: no forcing, validation on, DLQ included.
        let options = EnhancedResumeOptions::default();
        assert!(!options.force);
        assert_eq!(options.max_additional_retries, 2);
        assert!(!options.skip_validation);
        assert!(options.from_checkpoint.is_none());
        assert!(options.include_dlq_items);
        assert!(options.validate_environment);
    }

    #[test]
    fn test_work_item_serialization() {
        // WorkItem must round-trip through serde_json unchanged.
        let item = WorkItem {
            id: "test-item".to_string(),
            data: serde_json::json!({"value": 42}),
            retry_count: 1,
            last_error: Some("Test error".to_string()),
        };
        let json = serde_json::to_string(&item).unwrap();
        let deserialized: WorkItem = serde_json::from_str(&json).unwrap();
        assert_eq!(item.id, deserialized.id);
        assert_eq!(item.retry_count, deserialized.retry_count);
        assert_eq!(item.last_error, deserialized.last_error);
    }

    // Builds a MapReduceJobState fixture with `completed` of `total`
    // items finished and the rest listed as pending (item_3, item_4, …).
    async fn create_test_state(job_id: &str, completed: usize, total: usize) -> MapReduceJobState {
        use crate::cook::execution::mapreduce::MapReduceConfig;
        use std::collections::HashSet;
        let config = MapReduceConfig {
            input: "test.json".to_string(),
            json_path: "$.items[*]".to_string(),
            max_parallel: 5,
            agent_timeout_secs: None,
            continue_on_failure: false,
            batch_size: None,
            enable_checkpoints: true,
            max_items: None,
            offset: None,
        };
        let mut completed_agents = HashSet::new();
        let mut agent_results = HashMap::new();
        let mut work_items = Vec::new();
        for i in 0..total {
            work_items.push(serde_json::json!({"id": i}));
        }
        // One successful AgentResult per completed item.
        for i in 0..completed {
            let agent_id = format!("agent-{}", i);
            completed_agents.insert(agent_id.clone());
            agent_results.insert(
                agent_id.clone(),
                AgentResult {
                    item_id: format!("item_{}", i),
                    status: crate::cook::execution::mapreduce::AgentStatus::Success,
                    output: Some(format!("Result {}", i)),
                    commits: vec![],
                    files_modified: vec![],
                    branch_name: None,
                    worktree_session_id: None,
                    duration: std::time::Duration::from_secs(10),
                    error: None,
                    worktree_path: Some(std::path::PathBuf::from("<test-worktree-path>")),
                    json_log_location: None,
                    cleanup_status: None,
                },
            );
        }
        let pending_items: Vec<String> =
            (completed..total).map(|i| format!("item_{}", i)).collect();
        MapReduceJobState {
            job_id: job_id.to_string(),
            config,
            started_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            work_items,
            agent_results,
            completed_agents,
            failed_agents: HashMap::new(),
            pending_items,
            checkpoint_version: 1,
            checkpoint_format_version: 1,
            parent_worktree: None,
            reduce_phase_state: None,
            total_items: total,
            successful_count: completed,
            failed_count: 0,
            is_complete: false,
            agent_template: vec![],
            reduce_commands: None,
            variables: HashMap::new(),
            setup_output: None,
            setup_completed: false,
            item_retry_counts: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_calculate_remaining_items() {
        use crate::cook::execution::state::DefaultJobStateManager;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let state_manager = Arc::new(DefaultJobStateManager::new(temp_dir.path().to_path_buf()));
        let event_logger = Arc::new(crate::cook::execution::events::EventLogger::new(vec![]));
        let manager = MapReduceResumeManager::new(
            "test-job".to_string(),
            state_manager,
            event_logger,
            temp_dir.path().to_path_buf(),
        )
        .await
        .unwrap();
        // 3 of 5 done → exactly 2 pending items should come back.
        let mut state = create_test_state("test-job", 3, 5).await;
        assert_eq!(state.pending_items.len(), 2, "Should have 2 pending items");
        let options = EnhancedResumeOptions::default();
        let remaining = manager
            .calculate_remaining_items(&mut state, &options)
            .await
            .unwrap();
        assert_eq!(remaining.len(), 2, "Should have 2 remaining work items");
    }

    #[tokio::test]
    async fn test_resume_from_map_empty_items() {
        use crate::cook::execution::state::DefaultJobStateManager;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let state_manager = Arc::new(DefaultJobStateManager::new(temp_dir.path().to_path_buf()));
        let event_logger = Arc::new(crate::cook::execution::events::EventLogger::new(vec![]));
        let manager = MapReduceResumeManager::new(
            "test-job".to_string(),
            state_manager,
            event_logger,
            temp_dir.path().to_path_buf(),
        )
        .await
        .unwrap();
        // All 5 items done, no reduce commands → MapOnlyCompleted.
        let mut state = create_test_state("test-job", 5, 5).await;
        let env = ExecutionEnvironment {
            working_dir: Arc::new(std::path::PathBuf::from("/tmp")),
            project_dir: Arc::new(std::path::PathBuf::from("/tmp")),
            worktree_name: None,
            session_id: Arc::from("test-session"),
        };
        let options = EnhancedResumeOptions::default();
        let result = manager
            .resume_from_map(&mut state, vec![], &env, &options)
            .await
            .unwrap();
        match result {
            EnhancedResumeResult::MapOnlyCompleted(map_result) => {
                assert_eq!(map_result.successful, 5);
                assert_eq!(map_result.failed, 0);
                assert_eq!(map_result.total, 5);
            }
            _ => panic!("Expected MapOnlyCompleted result"),
        }
    }

    #[tokio::test]
    async fn test_resume_from_map_with_remaining() {
        use crate::cook::execution::state::DefaultJobStateManager;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let state_manager = Arc::new(DefaultJobStateManager::new(temp_dir.path().to_path_buf()));
        let event_logger = Arc::new(crate::cook::execution::events::EventLogger::new(vec![]));
        let manager = MapReduceResumeManager::new(
            "test-job".to_string(),
            state_manager,
            event_logger,
            temp_dir.path().to_path_buf(),
        )
        .await
        .unwrap();
        let mut state = create_test_state("test-job", 3, 5).await;
        let env = ExecutionEnvironment {
            working_dir: Arc::new(std::path::PathBuf::from("/tmp")),
            project_dir: Arc::new(std::path::PathBuf::from("/tmp")),
            worktree_name: None,
            session_id: Arc::from("test-session"),
        };
        let options = EnhancedResumeOptions::default();
        // Two items left → expect a ReadyToExecute map phase.
        let remaining_items = vec![serde_json::json!({"id": 3}), serde_json::json!({"id": 4})];
        let result = manager
            .resume_from_map(&mut state, remaining_items.clone(), &env, &options)
            .await
            .unwrap();
        match result {
            EnhancedResumeResult::ReadyToExecute {
                phase,
                map_phase,
                remaining_items: items,
                ..
            } => {
                assert_eq!(phase, MapReducePhase::Map);
                assert!(map_phase.is_some());
                assert_eq!(items.len(), 2);
            }
            _ => panic!("Expected ReadyToExecute result"),
        }
    }

    #[tokio::test]
    async fn test_resume_from_reduce_completed() {
        use crate::cook::execution::state::DefaultJobStateManager;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let state_manager = Arc::new(DefaultJobStateManager::new(temp_dir.path().to_path_buf()));
        let event_logger = Arc::new(crate::cook::execution::events::EventLogger::new(vec![]));
        let manager = MapReduceResumeManager::new(
            "test-job".to_string(),
            state_manager,
            event_logger,
            temp_dir.path().to_path_buf(),
        )
        .await
        .unwrap();
        // Reduce already started AND completed with stored JSON output.
        let mut state = create_test_state("test-job", 5, 5).await;
        state.reduce_phase_state = Some(crate::cook::execution::state::ReducePhaseState {
            started: true,
            completed: true,
            output: Some(r#"{"summary": "all done"}"#.to_string()),
            error: None,
            executed_commands: vec![],
            started_at: Some(chrono::Utc::now()),
            completed_at: Some(chrono::Utc::now()),
        });
        let env = ExecutionEnvironment {
            working_dir: Arc::new(std::path::PathBuf::from("/tmp")),
            project_dir: Arc::new(std::path::PathBuf::from("/tmp")),
            worktree_name: None,
            session_id: Arc::from("test-session"),
        };
        let options = EnhancedResumeOptions::default();
        let result = manager
            .resume_from_reduce(&mut state, &env, &options)
            .await
            .unwrap();
        match result {
            EnhancedResumeResult::FullWorkflowCompleted(full_result) => {
                assert_eq!(full_result.map_result.successful, 5);
                assert!(full_result.reduce_result.is_some());
            }
            _ => panic!("Expected FullWorkflowCompleted result"),
        }
    }

    #[tokio::test]
    async fn test_load_and_validate_state() {
        use crate::cook::execution::state::DefaultJobStateManager;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();
        let state_manager = Arc::new(DefaultJobStateManager::new(temp_dir.path().to_path_buf()));
        let event_logger = Arc::new(crate::cook::execution::events::EventLogger::new(vec![]));
        let manager = MapReduceResumeManager::new(
            "test-job".to_string(),
            state_manager.clone(),
            event_logger,
            temp_dir.path().to_path_buf(),
        )
        .await
        .unwrap();
        // Persist a job through the state manager, then load it back via
        // the resume manager's validated path.
        let test_state = create_test_state("test-job", 5, 10).await;
        let created_job_id = state_manager
            .create_job(
                test_state.config.clone(),
                test_state.work_items.clone(),
                test_state.agent_template.clone(),
                test_state.reduce_commands.clone(),
            )
            .await
            .unwrap();
        let loaded_from_manager = state_manager.get_job_state(&created_job_id).await.unwrap();
        assert_eq!(loaded_from_manager.job_id, created_job_id);
        assert_eq!(loaded_from_manager.total_items, 10);
        let options = EnhancedResumeOptions::default();
        let result = manager
            .load_and_validate_state(&created_job_id, &options)
            .await;
        // NOTE(review): create_job may assign its own ID; on error this
        // only asserts the IDs matched — confirm intended tolerance.
        if result.is_err() {
            assert_eq!(created_job_id, "test-job");
        } else {
            let state = result.unwrap();
            assert_eq!(state.job_id, created_job_id);
        }
    }
}