governor-core 2.0.3

Core domain and application logic for cargo-governor
Documentation
//! Checkpoint store trait for workflow resumption

use async_trait::async_trait;

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;

use crate::domain::version::SemanticVersion;

/// Error type for checkpoint operations.
///
/// Returned by every fallible method of [`CheckpointStore`]. All variants
/// except [`StoreError::Io`] carry a free-form `String` that is interpolated
/// into the `Display` message after the fixed prefix.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Checkpoint not found.
    ///
    /// The payload is appended to the message; presumably it is the
    /// checkpoint ID that was looked up (not constructed anywhere in this
    /// file — confirm against the concrete stores).
    #[error("Checkpoint not found: {0}")]
    NotFound(String),

    /// Failed to save checkpoint.
    ///
    /// The in-memory store in this file also reports a failed lock
    /// acquisition in `save`, `delete` and `clear` through this variant.
    #[error("Failed to save checkpoint: {0}")]
    SaveFailed(String),

    /// Failed to load checkpoint.
    ///
    /// The in-memory store in this file also reports a failed lock
    /// acquisition in `load`, `load_by_id` and `list` through this variant.
    #[error("Failed to load checkpoint: {0}")]
    LoadFailed(String),

    /// IO error.
    ///
    /// Because of `#[from]`, a `std::io::Error` converts into this variant
    /// automatically when propagated with `?`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error, described by the contained message.
    #[error("Serialization error: {0}")]
    SerializationError(String),
}

/// Trait for checkpoint storage.
///
/// Abstracts over where [`Checkpoint`]s are kept so a workflow can persist
/// its progress and later pick it up again. Implementations must be
/// `Send + Sync`; all storage operations are `async` and report failures as
/// [`StoreError`].
#[async_trait]
pub trait CheckpointStore: Send + Sync {
    /// Save a checkpoint.
    ///
    /// The checkpoint is passed by reference, so the caller keeps ownership.
    async fn save(&self, checkpoint: &Checkpoint) -> Result<(), StoreError>;

    /// Load the most recent checkpoint.
    ///
    /// Returns `Ok(None)` rather than an error when there is nothing to load
    /// (the in-memory store in this file does so when it is empty).
    async fn load(&self) -> Result<Option<Checkpoint>, StoreError>;

    /// Load a specific checkpoint by ID.
    ///
    /// Returns `Ok(None)` when no checkpoint has the given `id` (this is what
    /// the in-memory store in this file does).
    async fn load_by_id(&self, id: &str) -> Result<Option<Checkpoint>, StoreError>;

    /// List all checkpoints as lightweight [`CheckpointInfo`] summaries.
    async fn list(&self) -> Result<Vec<CheckpointInfo>, StoreError>;

    /// Delete a checkpoint identified by `id`.
    ///
    /// NOTE(review): the in-memory store treats an unknown `id` as success;
    /// whether other stores should return [`StoreError::NotFound`] instead is
    /// not specified here.
    async fn delete(&self, id: &str) -> Result<(), StoreError>;

    /// Clear all checkpoints.
    async fn clear(&self) -> Result<(), StoreError>;

    /// Get the store name (a short identifier such as `"memory"`).
    fn name(&self) -> &str;
}

/// A checkpoint for workflow resumption.
///
/// Captures how far a workflow has progressed together with an opaque JSON
/// state blob. The type is serializable with serde so stores can persist it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Checkpoint {
    /// Unique checkpoint ID.
    ///
    /// `Checkpoint::new` generates it as `chk_` followed by a random UUID v4.
    pub id: String,
    /// ID of the workflow this checkpoint belongs to.
    pub workflow_id: String,
    /// Current step index.
    // NOTE(review): whether this is zero-based, or the next step to run
    // versus the last one finished, is not visible here — confirm with the
    // workflow runner.
    pub step_index: usize,
    /// Names of the steps that have already completed.
    pub completed_steps: Vec<String>,
    /// Workflow state as arbitrary JSON.
    // Presumably a serialized `WorkflowState`; nothing in this file enforces it.
    pub state: serde_json::Value,
    /// Target version, if one has been set (`None` on a fresh checkpoint).
    pub target_version: Option<SemanticVersion>,
    /// Timestamp (UTC); `Checkpoint::new` sets it to the creation time.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Whether the workflow was interrupted by an error.
    pub interrupted_by_error: bool,
    /// Error message if interrupted; set together with
    /// `interrupted_by_error` by `Checkpoint::with_error`.
    pub error_message: Option<String>,
}

impl Checkpoint {
    /// Build a fresh checkpoint for the given workflow progress.
    ///
    /// The ID is generated as `chk_` followed by a random UUID v4, and the
    /// timestamp is taken from the current UTC time. The checkpoint starts
    /// with no target version and no recorded error.
    #[must_use]
    pub fn new(
        workflow_id: String,
        step_index: usize,
        completed_steps: Vec<String>,
        state: serde_json::Value,
    ) -> Self {
        let id = format!("chk_{}", Uuid::new_v4().simple());
        let timestamp = chrono::Utc::now();

        Self {
            id,
            workflow_id,
            step_index,
            completed_steps,
            state,
            target_version: None,
            timestamp,
            interrupted_by_error: false,
            error_message: None,
        }
    }

    /// Borrow the checkpoint ID.
    #[must_use]
    pub fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Report whether this checkpoint can be resumed.
    ///
    /// A checkpoint that was not interrupted by an error is always resumable.
    /// One that was interrupted is resumable only while no step has been
    /// recorded as completed.
    #[must_use]
    pub const fn can_resume(&self) -> bool {
        if !self.interrupted_by_error {
            return true;
        }
        self.completed_steps.is_empty()
    }

    /// Return this checkpoint flagged as interrupted, carrying `message`
    /// as its error description.
    #[must_use]
    pub fn with_error(self, message: String) -> Self {
        Self {
            interrupted_by_error: true,
            error_message: Some(message),
            ..self
        }
    }

    /// Return this checkpoint with `version` recorded as the target version.
    #[must_use]
    pub fn with_version(self, version: SemanticVersion) -> Self {
        Self {
            target_version: Some(version),
            ..self
        }
    }
}

/// Information about a checkpoint (without full state).
///
/// A lightweight summary returned by [`CheckpointStore::list`]. It omits the
/// JSON state, the step names, the target version and the error message of
/// the full [`Checkpoint`]; the `From<Checkpoint>` impl shows the mapping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointInfo {
    /// Checkpoint ID (copied from `Checkpoint::id`).
    pub id: String,
    /// Workflow ID (copied from `Checkpoint::workflow_id`).
    pub workflow_id: String,
    /// Current step index (copied from `Checkpoint::step_index`).
    pub step_index: usize,
    /// Number of completed steps, i.e. the length of
    /// `Checkpoint::completed_steps`.
    pub completed_count: usize,
    /// Timestamp (UTC) of the checkpoint.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Whether interrupted by error (mirrors
    /// `Checkpoint::interrupted_by_error`).
    pub has_error: bool,
}

impl From<Checkpoint> for CheckpointInfo {
    fn from(checkpoint: Checkpoint) -> Self {
        Self {
            id: checkpoint.id,
            workflow_id: checkpoint.workflow_id,
            step_index: checkpoint.step_index,
            completed_count: checkpoint.completed_steps.len(),
            timestamp: checkpoint.timestamp,
            has_error: checkpoint.interrupted_by_error,
        }
    }
}

/// Workflow state stored in checkpoints.
///
/// Serializable with serde. Presumably this is what ends up in
/// `Checkpoint::state` as JSON, although nothing in this file performs
/// that conversion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowState {
    /// Workspace crates and their versions, keyed by crate name.
    pub crate_versions: HashMap<String, SemanticVersion>,
    /// Names of published crates, in the order they were marked
    /// (duplicates are not filtered).
    pub published_crates: Vec<String>,
    /// Names of crates that failed to publish, in the order they were marked
    /// (duplicates are not filtered).
    pub failed_crates: Vec<String>,
    /// Git commit hash, if one has been recorded.
    pub commit_hash: Option<String>,
    /// Git tag, if one has been recorded.
    pub tag: Option<String>,
    /// Additional data as arbitrary JSON values keyed by name.
    pub extra_data: HashMap<String, serde_json::Value>,
}

impl WorkflowState {
    /// Build an empty workflow state: no crates, no commit hash, no tag and
    /// no extra data.
    #[must_use]
    pub fn new() -> Self {
        Self {
            crate_versions: HashMap::default(),
            published_crates: Vec::default(),
            failed_crates: Vec::default(),
            commit_hash: None,
            tag: None,
            extra_data: HashMap::default(),
        }
    }

    /// Record `version` for the crate `name`, overwriting any version
    /// previously stored under that name.
    pub fn add_crate_version(&mut self, name: String, version: SemanticVersion) {
        // The displaced version, if any, is intentionally discarded.
        let _previous = self.crate_versions.insert(name, version);
    }

    /// Append `name` to the list of published crates.
    pub fn mark_published(&mut self, name: String) {
        let published = &mut self.published_crates;
        published.push(name);
    }

    /// Append `name` to the list of crates that failed to publish.
    pub fn mark_failed(&mut self, name: String) {
        let failed = &mut self.failed_crates;
        failed.push(name);
    }

    /// Return this state with `hash` recorded as the commit hash.
    #[must_use]
    pub fn with_commit_hash(self, hash: String) -> Self {
        Self {
            commit_hash: Some(hash),
            ..self
        }
    }

    /// Return this state with `tag` recorded as the git tag.
    #[must_use]
    pub fn with_tag(self, tag: String) -> Self {
        Self {
            tag: Some(tag),
            ..self
        }
    }

    /// Return this state with `value` stored under `key` in the extra data,
    /// overwriting any value already present for that key.
    #[must_use]
    pub fn with_extra(self, key: String, value: serde_json::Value) -> Self {
        let mut updated = self;
        updated.extra_data.insert(key, value);
        updated
    }
}

/// The default workflow state is the empty state built by
/// [`WorkflowState::new`].
impl Default for WorkflowState {
    /// Delegates to [`WorkflowState::new`] so both constructors always agree.
    fn default() -> Self {
        Self::new()
    }
}

/// In-memory checkpoint store (useful for testing).
///
/// Checkpoints live in a `Vec` behind an `Arc<Mutex<..>>`. Because the
/// derived `Clone` copies the `Arc`, clones of a store share the same
/// underlying list. `Default` yields an empty store.
#[derive(Debug, Clone, Default)]
pub struct MemoryCheckpointStore {
    /// Saved checkpoints in insertion order; the last element is what
    /// `load` returns as the most recent one.
    checkpoints: std::sync::Arc<std::sync::Mutex<Vec<Checkpoint>>>,
}

#[async_trait]
impl CheckpointStore for MemoryCheckpointStore {
    async fn save(&self, checkpoint: &Checkpoint) -> Result<(), StoreError> {
        self.checkpoints
            .lock()
            .map_err(|e| StoreError::SaveFailed(format!("Failed to acquire lock: {e}")))?
            .push(checkpoint.clone());
        Ok(())
    }

    async fn load(&self) -> Result<Option<Checkpoint>, StoreError> {
        let checkpoints = self
            .checkpoints
            .lock()
            .map_err(|e| StoreError::LoadFailed(format!("Failed to acquire lock: {e}")))?;
        Ok(checkpoints.last().cloned())
    }

    async fn load_by_id(&self, id: &str) -> Result<Option<Checkpoint>, StoreError> {
        let checkpoints = self
            .checkpoints
            .lock()
            .map_err(|e| StoreError::LoadFailed(format!("Failed to acquire lock: {e}")))?;
        Ok(checkpoints.iter().find(|c| c.id == id).cloned())
    }

    async fn list(&self) -> Result<Vec<CheckpointInfo>, StoreError> {
        let checkpoints = self
            .checkpoints
            .lock()
            .map_err(|e| StoreError::LoadFailed(format!("Failed to acquire lock: {e}")))?;
        Ok(checkpoints
            .iter()
            .cloned()
            .map(CheckpointInfo::from)
            .collect())
    }

    async fn delete(&self, id: &str) -> Result<(), StoreError> {
        self.checkpoints
            .lock()
            .map_err(|e| StoreError::SaveFailed(format!("Failed to acquire lock: {e}")))?
            .retain(|c| c.id != id);
        Ok(())
    }

    async fn clear(&self) -> Result<(), StoreError> {
        self.checkpoints
            .lock()
            .map_err(|e| StoreError::SaveFailed(format!("Failed to acquire lock: {e}")))?
            .clear();
        Ok(())
    }

    fn name(&self) -> &'static str {
        "memory"
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Empty JSON object used as placeholder workflow state in the tests.
    fn empty_state() -> serde_json::Value {
        serde_json::Value::Object(serde_json::Map::default())
    }

    /// A saved checkpoint can be read back through `load`.
    #[tokio::test]
    async fn test_memory_checkpoint_store() {
        let store = MemoryCheckpointStore::default();
        let saved = Checkpoint::new("test-workflow".to_string(), 0, vec![], empty_state());

        store.save(&saved).await.unwrap();

        let latest = store.load().await.unwrap();
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().workflow_id, "test-workflow");
    }

    /// `list` reports one entry per distinct saved checkpoint.
    #[tokio::test]
    async fn test_checkpoint_list() {
        let store = MemoryCheckpointStore::default();
        let first = Checkpoint::new("workflow-1".to_string(), 0, vec![], empty_state());
        let second = Checkpoint::new(
            "workflow-2".to_string(),
            1,
            vec!["step1".to_string()],
            empty_state(),
        );

        store.save(&first).await.unwrap();
        store.save(&second).await.unwrap();

        let summaries = store.list().await.unwrap();
        assert_eq!(summaries.len(), 2);
    }

    /// The mutating helpers on `WorkflowState` update the expected fields.
    #[tokio::test]
    async fn test_workflow_state() {
        let crate_name = "crate1".to_string();
        let version = SemanticVersion::parse("1.0.0").unwrap();

        let mut state = WorkflowState::new();
        state.add_crate_version(crate_name.clone(), version);
        state.mark_published(crate_name);

        assert_eq!(state.crate_versions.len(), 1);
        assert_eq!(state.published_crates.len(), 1);
        assert_eq!(state.failed_crates.len(), 0);
    }
}