arcula 2.0.0

Arcula - MongoDB database synchronization tool
Documentation
use std::fs;
use std::path::PathBuf;

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};

use crate::approvals::{self, ApprovalMode, ApprovalRecord};
use crate::config::{get_connection_policy, MongoConfig};
use crate::core::sync::{perform_sync, SyncReport};
use crate::plans::{self, PlanStatus, SyncPlanRecord};
use crate::storage;
use crate::utils::mongodb;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OperationKind {
    Sync,
    Revert,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OperationStatus {
    Running,
    Completed,
    Failed,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationStatusEvent {
    pub at: String,
    pub status: OperationStatus,
    pub message: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationRecord {
    pub version: u8,
    pub id: String,
    pub kind: OperationKind,
    pub plan_id: Option<String>,
    pub plan_hash: Option<String>,
    pub original_operation_id: Option<String>,
    pub created_at: String,
    pub updated_at: String,
    pub status: OperationStatus,
    pub status_history: Vec<OperationStatusEvent>,
    pub approval: Option<ApprovalRecord>,
    pub sync_report: Option<SyncReport>,
    pub error: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
pub struct RevertPreview {
    pub operation_id: String,
    pub target_env: String,
    pub target_db: String,
    pub backup_path: String,
    pub target_policy_requires_human_approval: bool,
}

pub async fn run_plan(plan_id: &str, agent: bool) -> Result<OperationRecord> {
    let plan = plans::load_plan(plan_id)?;
    if matches!(plan.status, PlanStatus::Running | PlanStatus::Completed) {
        anyhow::bail!(
            "Plan '{}' is {:?} and cannot be run again. Create a new plan instead.",
            plan.id,
            plan.status
        );
    }
    let approval = ensure_plan_approval(&plan, agent)?;

    let mut operation = OperationRecord::new_sync(&plan, approval.clone());
    save_operation(&operation)?;
    plans::update_plan_status(&plan.id, PlanStatus::Running)?;

    match perform_sync(plan.config.clone()).await {
        Ok(report) => {
            operation.sync_report = Some(report);
            operation.push_status(OperationStatus::Completed, "sync completed");
            save_operation(&operation)?;
            plans::update_plan_status(&plan.id, PlanStatus::Completed)?;
            Ok(operation)
        }
        Err(err) => {
            operation.error = Some(format!("{err:#}"));
            operation.push_status(OperationStatus::Failed, "sync failed");
            save_operation(&operation)?;
            let _ = plans::update_plan_status(&plan.id, PlanStatus::Failed);
            Err(err)
        }
    }
}

pub fn preview_revert(operation_id: &str) -> Result<RevertPreview> {
    let operation = load_operation(operation_id)?;
    let report = operation
        .sync_report
        .as_ref()
        .context("Operation has no sync report and cannot be reverted")?;
    let backup_path = report
        .backup_path
        .clone()
        .context("Operation has no backup path and cannot be reverted")?;
    if !std::path::Path::new(&backup_path).exists() {
        anyhow::bail!("Backup path does not exist: {backup_path}");
    }
    let target_policy = get_connection_policy(&report.target_env);

    Ok(RevertPreview {
        operation_id: operation.id,
        target_env: report.target_env.name().to_string(),
        target_db: report.target_db.clone(),
        backup_path,
        target_policy_requires_human_approval: target_policy.human_approval_required,
    })
}

pub async fn revert_operation(operation_id: &str) -> Result<OperationRecord> {
    crate::config::check_mongodb_tools()
        .map_err(|err| anyhow::anyhow!("MongoDB tools not found: {err}"))?;

    let original = load_operation(operation_id)?;
    let report = original
        .sync_report
        .as_ref()
        .context("Operation has no sync report and cannot be reverted")?;
    let backup_path = report
        .backup_path
        .clone()
        .context("Operation has no backup path and cannot be reverted")?;
    if !std::path::Path::new(&backup_path).exists() {
        anyhow::bail!("Backup path does not exist: {backup_path}");
    }

    let target_policy = get_connection_policy(&report.target_env);
    let approval = if target_policy.human_approval_required {
        Some(approvals::create_human_presence_approval(
            &format!("revert:{}", original.id),
            &revert_hash(&original)?,
        )?)
    } else {
        Some(approvals::create_agent_policy_approval(
            &format!("revert:{}", original.id),
            &revert_hash(&original)?,
        )?)
    };

    let mut operation = OperationRecord::new_revert(&original, approval);
    save_operation(&operation)?;

    let target_config = MongoConfig::from_env(report.target_env.clone()).context(format!(
        "Failed to get configuration for {}",
        report.target_env
    ))?;
    match mongodb::restore_backup(
        &target_config,
        &report.target_db,
        std::path::Path::new(&backup_path),
    )
    .await
    {
        Ok(()) => {
            let mut revert_report = report.clone();
            revert_report.restored_from_backup = true;
            operation.sync_report = Some(revert_report);
            operation.push_status(OperationStatus::Completed, "revert completed");
            save_operation(&operation)?;
            Ok(operation)
        }
        Err(err) => {
            operation.error = Some(format!("{err:#}"));
            operation.push_status(OperationStatus::Failed, "revert failed");
            save_operation(&operation)?;
            Err(err)
        }
    }
}

pub fn load_operation(id: &str) -> Result<OperationRecord> {
    let path = operation_path(id);
    let contents = fs::read_to_string(&path)
        .with_context(|| format!("Failed to read operation {} from {}", id, path.display()))?;
    serde_json::from_str(&contents).with_context(|| format!("Failed to parse operation {id}"))
}

pub fn list_operations() -> Result<Vec<OperationRecord>> {
    let dir = storage::operations_dir();
    if !dir.exists() {
        return Ok(Vec::new());
    }

    let mut operations = Vec::new();
    for entry in fs::read_dir(&dir).with_context(|| format!("Failed to read {}", dir.display()))? {
        let entry = entry?;
        let path = entry.path().join("operation.json");
        if path.exists() {
            let contents = fs::read_to_string(&path)?;
            let operation: OperationRecord = serde_json::from_str(&contents)?;
            operations.push(operation);
        }
    }
    operations.sort_by(|a, b| b.created_at.cmp(&a.created_at));
    Ok(operations)
}

fn ensure_plan_approval(plan: &SyncPlanRecord, agent: bool) -> Result<Option<ApprovalRecord>> {
    if let Some(approval) = plans::load_approval(&plan.id)? {
        approvals::verify_approval(&approval, &plan.id, &plan.hash)?;
        if plan.requires_human_approval && approval.mode != ApprovalMode::HumanPresence {
            anyhow::bail!("Plan '{}' requires human OS approval", plan.id);
        }
        return Ok(Some(approval));
    }

    if plan.requires_human_approval {
        anyhow::bail!(
            "Plan '{}' requires human approval. Run: arcula plan approve {}",
            plan.id,
            plan.id
        );
    }

    if agent && !plan.target_policy.allow_agent_apply {
        anyhow::bail!(
            "Target policy does not allow agent execution for plan '{}'",
            plan.id
        );
    }

    let approval = approvals::create_agent_policy_approval(&plan.id, &plan.hash)?;
    plans::save_approval(&plan.id, &approval)?;
    plans::update_plan_status(&plan.id, PlanStatus::Approved)?;
    Ok(Some(approval))
}

fn save_operation(operation: &OperationRecord) -> Result<()> {
    storage::atomic_write_json(&operation_path(&operation.id), operation)
}

fn operation_dir(id: &str) -> PathBuf {
    storage::operations_dir().join(id)
}

fn operation_path(id: &str) -> PathBuf {
    operation_dir(id).join("operation.json")
}

fn revert_hash(operation: &OperationRecord) -> Result<String> {
    let bytes = serde_json::to_vec(operation)?;
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    Ok(to_hex(&hasher.finalize()))
}

fn generate_operation_id(prefix: &str) -> String {
    let timestamp = chrono::Utc::now().format("%Y%m%d%H%M%S");
    let suffix: u32 = rand::random();
    format!("{prefix}_{timestamp}_{suffix:08x}")
}

fn now() -> String {
    chrono::Utc::now().to_rfc3339()
}

impl OperationRecord {
    fn new_sync(plan: &SyncPlanRecord, approval: Option<ApprovalRecord>) -> Self {
        let created_at = now();
        let mut operation = Self {
            version: 1,
            id: generate_operation_id("op"),
            kind: OperationKind::Sync,
            plan_id: Some(plan.id.clone()),
            plan_hash: Some(plan.hash.clone()),
            original_operation_id: None,
            created_at: created_at.clone(),
            updated_at: created_at,
            status: OperationStatus::Running,
            status_history: Vec::new(),
            approval,
            sync_report: None,
            error: None,
        };
        operation.push_status(OperationStatus::Running, "sync started");
        operation
    }

    fn new_revert(original: &OperationRecord, approval: Option<ApprovalRecord>) -> Self {
        let created_at = now();
        let mut operation = Self {
            version: 1,
            id: generate_operation_id("revert"),
            kind: OperationKind::Revert,
            plan_id: original.plan_id.clone(),
            plan_hash: original.plan_hash.clone(),
            original_operation_id: Some(original.id.clone()),
            created_at: created_at.clone(),
            updated_at: created_at,
            status: OperationStatus::Running,
            status_history: Vec::new(),
            approval,
            sync_report: None,
            error: None,
        };
        operation.push_status(OperationStatus::Running, "revert started");
        operation
    }

    fn push_status(&mut self, status: OperationStatus, message: &str) {
        self.status = status.clone();
        self.updated_at = now();
        self.status_history.push(OperationStatusEvent {
            at: self.updated_at.clone(),
            status,
            message: message.to_string(),
        });
    }
}

fn to_hex(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    let mut output = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        output.push(HEX[(byte >> 4) as usize] as char);
        output.push(HEX[(byte & 0x0f) as usize] as char);
    }
    output
}