use super::{JobState, PhaseType, StateError, StateEvent, StateEventType, StateManager};
use chrono::Utc;
use tracing::{debug, error, info};
impl StateManager {
    /// Moves the job `job_id` into `new_phase` if the transition table allows it.
    ///
    /// Entering `Completed` or `Failed` also sets `is_complete` on the job and
    /// emits an info/error log line from inside the updater.
    ///
    /// # Errors
    ///
    /// * [`StateError::NotFound`] when `get_state` yields no job for `job_id`.
    /// * [`StateError::InvalidTransition`] when `self.transitions` rejects the
    ///   move, judged both against the initial snapshot and against the state
    ///   handed to the updater.
    /// * Any error propagated by `get_state` or `update_state`.
    pub async fn transition_to_phase(
        &self,
        job_id: &str,
        new_phase: PhaseType,
    ) -> Result<(), StateError> {
        // Snapshot read: reports unknown jobs as `NotFound` and rejects
        // requests that are already invalid before entering the update path.
        let snapshot = self
            .get_state(job_id)
            .await?
            .ok_or_else(|| StateError::NotFound(job_id.to_string()))?;
        let old_phase = snapshot.phase;
        if !self.transitions.is_valid_transition(old_phase, new_phase) {
            return Err(StateError::InvalidTransition {
                from: old_phase,
                to: new_phase,
            });
        }

        self.update_state(job_id, |job| {
            // The snapshot above may be stale by the time the updater runs, so
            // validate again against the state that is actually being mutated,
            // the same way `mark_job_completed` / `mark_job_failed` do.
            let current_phase = job.phase;
            if !self
                .transitions
                .is_valid_transition(current_phase, new_phase)
            {
                return Err(StateError::InvalidTransition {
                    from: current_phase,
                    to: new_phase,
                });
            }

            job.phase = new_phase;
            // Listed exhaustively so that adding a phase forces a decision here.
            match new_phase {
                PhaseType::Completed => {
                    job.is_complete = true;
                    info!("Job {} completed successfully", job_id);
                }
                PhaseType::Failed => {
                    job.is_complete = true;
                    error!("Job {} failed", job_id);
                }
                PhaseType::Setup | PhaseType::Map | PhaseType::Reduce => {}
            }
            Ok(())
        })
        .await?;

        // `old_phase` is the phase observed by the snapshot read above.
        debug!(
            "Job {} transitioned from {:?} to {:?}",
            job_id, old_phase, new_phase
        );
        Ok(())
    }

    /// Transitions the job to [`PhaseType::Map`] via [`Self::transition_to_phase`].
    ///
    /// # Errors
    ///
    /// Same as [`Self::transition_to_phase`].
    pub async fn mark_job_started(&self, job_id: &str) -> Result<(), StateError> {
        self.transition_to_phase(job_id, PhaseType::Map).await
    }

    /// Transitions the job to [`PhaseType::Reduce`] via [`Self::transition_to_phase`].
    ///
    /// # Errors
    ///
    /// Same as [`Self::transition_to_phase`].
    pub async fn mark_reduce_started(&self, job_id: &str) -> Result<(), StateError> {
        self.transition_to_phase(job_id, PhaseType::Reduce).await
    }

    /// Marks the job as completed and records a `JobCompleted` event.
    ///
    /// The phase check happens inside the updater and is hard-coded: only jobs
    /// currently in `Map` or `Reduce` may complete. `self.transitions` is not
    /// consulted here.
    ///
    /// # Errors
    ///
    /// [`StateError::InvalidTransition`] when the job is in any other phase,
    /// plus any error propagated by `update_state`.
    pub async fn mark_job_completed(&self, job_id: &str) -> Result<(), StateError> {
        self.update_state(job_id, |state| {
            let valid_completion = matches!(state.phase, PhaseType::Map | PhaseType::Reduce);
            if !valid_completion {
                return Err(StateError::InvalidTransition {
                    from: state.phase,
                    to: PhaseType::Completed,
                });
            }
            state.phase = PhaseType::Completed;
            state.is_complete = true;
            Ok(())
        })
        .await?;

        // The event is only recorded once the update above has succeeded.
        self.log_event(StateEvent {
            timestamp: Utc::now(),
            event_type: StateEventType::JobCompleted,
            job_id: job_id.to_string(),
            details: None,
        })
        .await;
        info!("Job {} marked as completed", job_id);
        Ok(())
    }

    /// Marks the job as failed and records a `JobFailed` event carrying `reason`.
    ///
    /// Any phase other than `Completed` or `Failed` may move to `Failed`; the
    /// check happens inside the updater.
    ///
    /// # Errors
    ///
    /// [`StateError::InvalidTransition`] when the job is already `Completed` or
    /// `Failed`, plus any error propagated by `update_state`.
    pub async fn mark_job_failed(&self, job_id: &str, reason: String) -> Result<(), StateError> {
        self.update_state(job_id, |state| {
            if state.phase == PhaseType::Completed || state.phase == PhaseType::Failed {
                return Err(StateError::InvalidTransition {
                    from: state.phase,
                    to: PhaseType::Failed,
                });
            }
            state.phase = PhaseType::Failed;
            state.is_complete = true;
            Ok(())
        })
        .await?;

        // `reason` is stored twice: in the typed event payload and in the
        // free-form `details` field, hence the single clone.
        self.log_event(StateEvent {
            timestamp: Utc::now(),
            event_type: StateEventType::JobFailed {
                reason: reason.clone(),
            },
            job_id: job_id.to_string(),
            details: Some(reason),
        })
        .await;
        error!("Job {} marked as failed", job_id);
        Ok(())
    }

    /// Returns the phases the transition table allows from the job's current phase.
    ///
    /// # Errors
    ///
    /// [`StateError::NotFound`] when no job exists for `job_id`, plus any error
    /// propagated by `get_state`.
    pub async fn get_valid_transitions(&self, job_id: &str) -> Result<Vec<PhaseType>, StateError> {
        let state = self
            .get_state(job_id)
            .await?
            .ok_or_else(|| StateError::NotFound(job_id.to_string()))?;
        Ok(self.transitions.get_valid_transitions(state.phase))
    }

    /// Reports whether the transition table allows moving the job to `to_phase`.
    ///
    /// This is a read-only query against the job's current phase; it does not
    /// change any state.
    ///
    /// # Errors
    ///
    /// [`StateError::NotFound`] when no job exists for `job_id`, plus any error
    /// propagated by `get_state`.
    pub async fn can_transition(
        &self,
        job_id: &str,
        to_phase: PhaseType,
    ) -> Result<bool, StateError> {
        let state = self
            .get_state(job_id)
            .await?
            .ok_or_else(|| StateError::NotFound(job_id.to_string()))?;
        Ok(self.transitions.is_valid_transition(state.phase, to_phase))
    }
}
impl JobState {
    /// Returns `true` when the job is in the `Completed` or `Failed` phase.
    pub fn is_terminal(&self) -> bool {
        match self.phase {
            PhaseType::Completed | PhaseType::Failed => true,
            PhaseType::Setup | PhaseType::Map | PhaseType::Reduce => false,
        }
    }

    /// Returns `true` when the job is not terminal and has a checkpoint.
    pub fn can_resume(&self) -> bool {
        if self.is_terminal() {
            return false;
        }
        self.checkpoint.is_some()
    }

    /// Returns `true` when the job is in the `Setup` phase.
    pub fn is_setup(&self) -> bool {
        self.phase == PhaseType::Setup
    }

    /// Returns `true` when the job is in the `Map` phase.
    pub fn is_map(&self) -> bool {
        self.phase == PhaseType::Map
    }

    /// Returns `true` when the job is in the `Reduce` phase.
    pub fn is_reduce(&self) -> bool {
        self.phase == PhaseType::Reduce
    }

    /// Builds a short human-readable description of the current phase.
    ///
    /// The `Map` phase reports progress as the number of processed items out
    /// of `total_items`; every other phase maps to a fixed label.
    pub fn status_string(&self) -> String {
        let label = match self.phase {
            PhaseType::Map => {
                let done = self.processed_items.len();
                return format!("Mapping ({}/{} items)", done, self.total_items);
            }
            PhaseType::Setup => "Setting up",
            PhaseType::Reduce => "Reducing results",
            PhaseType::Completed => "Completed",
            PhaseType::Failed => "Failed",
        };
        label.to_owned()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cook::execution::mapreduce::state::persistence::InMemoryStateStore;
    use crate::cook::execution::mapreduce::MapReduceConfig;
    use std::sync::Arc;

    /// Builds a manager over a fresh in-memory store with one job registered
    /// under `job_id` using the default config.
    async fn manager_with_job(job_id: &str) -> StateManager {
        let store = Arc::new(InMemoryStateStore::new());
        let manager = StateManager::new(store);
        manager
            .create_job(&MapReduceConfig::default(), job_id.to_string())
            .await
            .unwrap();
        manager
    }

    /// Reads back the phase currently stored for `job_id`.
    async fn phase_of(manager: &StateManager, job_id: &str) -> PhaseType {
        manager.get_state(job_id).await.unwrap().unwrap().phase
    }

    /// Walks the happy path Setup -> Map -> Reduce -> Completed.
    #[tokio::test]
    async fn test_valid_transitions() {
        let job_id = "test-job-transitions";
        let manager = manager_with_job(job_id).await;

        manager.mark_job_started(job_id).await.unwrap();
        assert_eq!(phase_of(&manager, job_id).await, PhaseType::Map);

        manager.mark_reduce_started(job_id).await.unwrap();
        assert_eq!(phase_of(&manager, job_id).await, PhaseType::Reduce);

        manager.mark_job_completed(job_id).await.unwrap();
        let state = manager.get_state(job_id).await.unwrap().unwrap();
        assert_eq!(state.phase, PhaseType::Completed);
        assert!(state.is_complete);
    }

    /// Rejected transitions must return an error and leave the phase untouched.
    #[tokio::test]
    async fn test_invalid_transitions() {
        let job_id = "test-job-invalid";
        let manager = manager_with_job(job_id).await;

        let result = manager.transition_to_phase(job_id, PhaseType::Reduce).await;
        assert!(result.is_err());

        let result = manager.mark_job_completed(job_id).await;
        assert!(result.is_err());

        assert_eq!(phase_of(&manager, job_id).await, PhaseType::Setup);
    }

    /// Once a job is Completed or Failed, no further transition is accepted.
    #[tokio::test]
    async fn test_terminal_states() {
        let job_id = "test-job-terminal";
        let manager = manager_with_job(job_id).await;
        manager.mark_job_started(job_id).await.unwrap();
        manager.mark_job_completed(job_id).await.unwrap();

        let result = manager.transition_to_phase(job_id, PhaseType::Map).await;
        assert!(result.is_err());
        // A completed job cannot be retroactively failed either.
        let result = manager
            .mark_job_failed(job_id, "Too late".to_string())
            .await;
        assert!(result.is_err());
        assert_eq!(phase_of(&manager, job_id).await, PhaseType::Completed);

        let job_id2 = "test-job-failed";
        manager
            .create_job(&MapReduceConfig::default(), job_id2.to_string())
            .await
            .unwrap();
        manager
            .mark_job_failed(job_id2, "Test failure".to_string())
            .await
            .unwrap();
        let result = manager.transition_to_phase(job_id2, PhaseType::Map).await;
        assert!(result.is_err());
    }

    /// The advertised transitions follow the job's current phase.
    #[tokio::test]
    async fn test_get_valid_transitions() {
        let job_id = "test-job-valid-trans";
        let manager = manager_with_job(job_id).await;

        let transitions = manager.get_valid_transitions(job_id).await.unwrap();
        assert!(transitions.contains(&PhaseType::Map));
        assert!(transitions.contains(&PhaseType::Failed));
        assert_eq!(transitions.len(), 2);

        manager.mark_job_started(job_id).await.unwrap();
        let transitions = manager.get_valid_transitions(job_id).await.unwrap();
        assert!(transitions.contains(&PhaseType::Reduce));
        assert!(transitions.contains(&PhaseType::Completed));
        assert!(transitions.contains(&PhaseType::Failed));
    }

    /// Exercises the synchronous `JobState` helpers; no async runtime is needed.
    #[test]
    fn test_job_state_helpers() {
        let mut state = JobState {
            id: "test".to_string(),
            phase: PhaseType::Setup,
            checkpoint: None,
            processed_items: Default::default(),
            failed_items: Vec::new(),
            variables: Default::default(),
            created_at: Utc::now(),
            updated_at: Utc::now(),
            config: MapReduceConfig::default(),
            agent_results: Default::default(),
            is_complete: false,
            total_items: 10,
        };
        assert!(state.is_setup());
        assert!(!state.is_map());
        assert!(!state.is_reduce());
        assert!(!state.is_terminal());
        assert_eq!(state.status_string(), "Setting up");

        state.phase = PhaseType::Map;
        assert!(state.is_map());
        assert!(!state.is_terminal());
        assert_eq!(state.status_string(), "Mapping (0/10 items)");

        state.phase = PhaseType::Reduce;
        assert!(state.is_reduce());
        assert!(!state.is_terminal());
        assert_eq!(state.status_string(), "Reducing results");

        state.phase = PhaseType::Completed;
        assert!(state.is_terminal());
        assert!(!state.can_resume());
        assert_eq!(state.status_string(), "Completed");

        state.phase = PhaseType::Failed;
        assert!(state.is_terminal());
        assert!(!state.can_resume());
        assert_eq!(state.status_string(), "Failed");
    }
}