use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::distributed::execution::ExecutionPlan;
use crate::distributed::partition::PartitionSet;
use crate::error::{Error, Result};
/// State saved for one execution: the plan being run, any partial result
/// attached so far, and the partitions recorded as failed.
#[derive(Debug, Clone)]
pub struct ExecutionCheckpoint {
    /// Plan this checkpoint was created for (passed to `ExecutionCheckpoint::new`).
    pub plan: ExecutionPlan,
    /// Result collected so far. `new` starts it as `None`; it is set by
    /// `with_partial_result` or by `CheckpointManager::update_checkpoint`.
    pub partial_result: Option<PartitionSet>,
    /// IDs of partitions recorded as failed. `add_failed_partition` appends
    /// without de-duplication; `CheckpointManager::update_checkpoint` replaces
    /// the whole list.
    pub failed_partitions: Vec<usize>,
    /// `Instant::now()` captured by `new`. None of the methods in this file
    /// refresh it on later updates. It is a monotonic `Instant`, not wall-clock time.
    pub checkpoint_time: Instant,
}
impl ExecutionCheckpoint {
    /// Starts a checkpoint for `plan`, timestamped with the current instant.
    ///
    /// The new checkpoint has no partial result and an empty failed-partition list.
    pub fn new(plan: ExecutionPlan) -> Self {
        let checkpoint_time = Instant::now();
        Self {
            plan,
            partial_result: None,
            failed_partitions: vec![],
            checkpoint_time,
        }
    }

    /// Builder-style setter: returns this checkpoint with `partial_result`
    /// attached, replacing any result that was already present.
    pub fn with_partial_result(self, partial_result: PartitionSet) -> Self {
        Self {
            partial_result: Some(partial_result),
            ..self
        }
    }

    /// Appends `partition_id` to the failed-partition list.
    ///
    /// Duplicate IDs are not filtered out.
    pub fn add_failed_partition(&mut self, partition_id: usize) {
        let failed = &mut self.failed_partitions;
        failed.push(partition_id);
    }
}
/// In-memory registry of [`ExecutionCheckpoint`]s keyed by execution ID.
///
/// Every operation locks a single `Mutex` around the map, so accesses through
/// one manager are serialized. `CheckpointManager::default()` yields the same
/// empty manager as `CheckpointManager::new()`.
// NOTE(review): the map is wrapped in an `Arc`, but the type is not `Clone` and
// the field is private, so code outside this module cannot obtain a second
// handle to the same map. Consider deriving `Clone` if sharing by handle is
// the intent; otherwise the `Arc` looks unnecessary.
#[derive(Debug, Default)]
pub struct CheckpointManager {
    /// Checkpoints keyed by execution ID.
    checkpoints: Arc<Mutex<HashMap<String, ExecutionCheckpoint>>>,
}
impl CheckpointManager {
pub fn new() -> Self {
Self {
checkpoints: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn create_checkpoint(
&self,
execution_id: impl Into<String>,
plan: ExecutionPlan,
) -> Result<()> {
let checkpoint = ExecutionCheckpoint::new(plan);
match self.checkpoints.lock() {
Ok(mut checkpoints) => {
checkpoints.insert(execution_id.into(), checkpoint);
Ok(())
}
Err(_) => Err(Error::DistributedProcessing(
"Failed to create checkpoint".to_string(),
)),
}
}
pub fn get_checkpoint(&self, execution_id: &str) -> Result<Option<ExecutionCheckpoint>> {
match self.checkpoints.lock() {
Ok(checkpoints) => Ok(checkpoints.get(execution_id).cloned()),
Err(_) => Err(Error::DistributedProcessing(
"Failed to get checkpoint".to_string(),
)),
}
}
pub fn update_checkpoint(
&self,
execution_id: &str,
partial_result: Option<PartitionSet>,
failed_partitions: Option<Vec<usize>>,
) -> Result<()> {
match self.checkpoints.lock() {
Ok(mut checkpoints) => {
if let Some(checkpoint) = checkpoints.get_mut(execution_id) {
if let Some(result) = partial_result {
checkpoint.partial_result = Some(result);
}
if let Some(partitions) = failed_partitions {
checkpoint.failed_partitions = partitions;
}
Ok(())
} else {
Err(Error::DistributedProcessing(format!(
"Checkpoint not found for execution ID: {}",
execution_id
)))
}
}
Err(_) => Err(Error::DistributedProcessing(
"Failed to update checkpoint".to_string(),
)),
}
}
pub fn remove_checkpoint(&self, execution_id: &str) -> Result<()> {
match self.checkpoints.lock() {
Ok(mut checkpoints) => {
checkpoints.remove(execution_id);
Ok(())
}
Err(_) => Err(Error::DistributedProcessing(
"Failed to remove checkpoint".to_string(),
)),
}
}
}