ccd-cli 1.0.0-beta.2

Bootstrap and validate Continuous Context Development repositories
use std::fs;
use std::path::Path;

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};

use crate::db;
use crate::memory::file::write_memory_file;
use crate::memory::governance::GovernanceDecisionView;
use crate::paths::state::StateLayout;
use crate::repo::marker as repo_marker;
use crate::state::{
    compiled as compiled_state, projection_metadata,
    protected_write::{self, AppendWriteAuthority, AppendWriteOutcome, ExclusiveWriteOptions},
    session,
};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MemoryFileMutation {
    pub(crate) scope: String,
    pub(crate) path: String,
    pub(crate) next_contents: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MemoryOpPlan {
    pub(crate) target: MemoryFileMutation,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) source: Option<MemoryFileMutation>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) refresh_locality_id: Option<String>,
    #[serde(default)]
    pub(crate) authored_entry_ids: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) governance: Option<GovernanceDecisionView>,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct StagedMemoryOpView {
    pub(crate) op_id: String,
    pub(crate) outcome: AppendWriteOutcome,
    pub(crate) native_state_path: String,
    pub(crate) target_scope: String,
    pub(crate) target_path: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) source_path: Option<String>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub(crate) authored_entry_ids: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) message: Option<String>,
}

pub(crate) fn queue_required_for_autonomous(layout: &StateLayout) -> Result<bool> {
    let Some(state) = session::load_for_layout(layout)? else {
        return Ok(false);
    };
    Ok(state.lifecycle() == session::SessionLifecycle::Autonomous)
}

pub(crate) fn fingerprint<T: Serialize>(value: &T) -> Result<String> {
    let json = serde_json::to_vec(value)?;
    Ok(hex_digest(&json))
}

pub(crate) fn stable_op_id(
    prefix: &str,
    request_fingerprint: &str,
    explicit_id: Option<&str>,
) -> String {
    explicit_id
        .map(str::to_owned)
        .unwrap_or_else(|| format!("memory_op_{prefix}_{}", &request_fingerprint[..16]))
}

pub(crate) fn load_replayed_snapshot(
    layout: &StateLayout,
    op_id: &str,
    request_fingerprint: &str,
) -> Result<Option<Value>> {
    if !layout.state_db_path().exists() {
        return Ok(None);
    }

    let db = db::StateDb::open(&layout.state_db_path())?;
    let Some(record) = db::memory_ops::get_by_id(db.conn(), op_id)? else {
        return Ok(None);
    };

    if record.request_fingerprint != request_fingerprint || !record.reconciled {
        return Ok(None);
    }

    let Some(snapshot_json) = record.snapshot_json.as_deref() else {
        return Ok(None);
    };

    serde_json::from_str(snapshot_json)
        .with_context(|| format!("failed to decode queued memory-op snapshot for `{op_id}`"))
        .map(Some)
}

pub(crate) fn stage_and_reconcile(
    repo_root: &Path,
    layout: &StateLayout,
    command: &str,
    op_id: &str,
    request_fingerprint: &str,
    write_options: &ExclusiveWriteOptions,
    plan: &MemoryOpPlan,
) -> Result<StagedMemoryOpView> {
    if let Some(conflict) = protected_write::authorize_append_surface_write(
        layout,
        "memory_op_queue",
        write_options,
        AppendWriteAuthority::OwnerOnly,
    )? {
        return Ok(StagedMemoryOpView {
            op_id: op_id.to_owned(),
            outcome: conflict.outcome,
            native_state_path: layout.state_db_path().display().to_string(),
            target_scope: plan.target.scope.clone(),
            target_path: plan.target.path.clone(),
            source_path: plan.source.as_ref().map(|source| source.path.clone()),
            authored_entry_ids: plan.authored_entry_ids.clone(),
            message: Some(conflict.message),
        });
    }

    let db = db::StateDb::open(&layout.state_db_path())?;
    if let Some(existing) = db::memory_ops::get_by_id(db.conn(), op_id)? {
        if existing.request_fingerprint != request_fingerprint {
            return staged_view_from_record(
                layout,
                &existing,
                AppendWriteOutcome::DuplicateIdConflict,
                Some(format!(
                    "memory-op id `{op_id}` is already bound to a different queued memory write"
                )),
            );
        }

        if existing.reconciled {
            return staged_view_from_record(
                layout,
                &existing,
                AppendWriteOutcome::IdempotentNoop,
                None,
            );
        }

        let stored_plan: MemoryOpPlan = serde_json::from_str(&existing.plan_json)
            .with_context(|| format!("failed to decode queued memory-op plan for `{op_id}`"))?;
        let outcome = apply_plan(repo_root, layout, &stored_plan)?;
        db::memory_ops::mark_reconciled(
            db.conn(),
            op_id,
            session::now_epoch_s()?,
            append_outcome_label(outcome),
            &serde_json::to_string(&stored_plan.authored_entry_ids)?,
            &stored_plan.target.path,
            stored_plan
                .source
                .as_ref()
                .map(|source| source.path.as_str()),
        )?;
        return Ok(StagedMemoryOpView {
            op_id: op_id.to_owned(),
            outcome,
            native_state_path: layout.state_db_path().display().to_string(),
            target_scope: stored_plan.target.scope,
            target_path: stored_plan.target.path,
            source_path: stored_plan.source.map(|source| source.path),
            authored_entry_ids: stored_plan.authored_entry_ids,
            message: None,
        });
    }

    let now = session::now_epoch_s()?;
    db::memory_ops::insert_staged(
        db.conn(),
        &db::memory_ops::MemoryOpRecord {
            id: op_id.to_owned(),
            command: command.to_owned(),
            request_fingerprint: request_fingerprint.to_owned(),
            plan_json: serde_json::to_string(plan)?,
            staged_at_epoch_s: now,
            updated_at_epoch_s: now,
            actor_id: write_options.actor_id.clone(),
            session_id: write_options.session_id.clone(),
            reconciled: false,
            outcome: None,
            authored_entry_ids_json: "[]".to_owned(),
            authored_target_path: None,
            authored_source_path: None,
            snapshot_json: None,
        },
    )?;

    let outcome = apply_plan(repo_root, layout, plan)?;
    db::memory_ops::mark_reconciled(
        db.conn(),
        op_id,
        session::now_epoch_s()?,
        append_outcome_label(outcome),
        &serde_json::to_string(&plan.authored_entry_ids)?,
        &plan.target.path,
        plan.source.as_ref().map(|source| source.path.as_str()),
    )?;

    Ok(StagedMemoryOpView {
        op_id: op_id.to_owned(),
        outcome,
        native_state_path: layout.state_db_path().display().to_string(),
        target_scope: plan.target.scope.clone(),
        target_path: plan.target.path.clone(),
        source_path: plan.source.as_ref().map(|source| source.path.clone()),
        authored_entry_ids: plan.authored_entry_ids.clone(),
        message: None,
    })
}

pub(crate) fn persist_report_snapshot<T: Serialize>(
    layout: &StateLayout,
    op_id: &str,
    report: &T,
) -> Result<()> {
    let db = db::StateDb::open(&layout.state_db_path())?;
    db::memory_ops::update_snapshot(
        db.conn(),
        op_id,
        session::now_epoch_s()?,
        &serde_json::to_string(report)?,
    )?;
    Ok(())
}

pub(crate) fn apply_direct_plan(
    repo_root: &Path,
    layout: &StateLayout,
    plan: &MemoryOpPlan,
) -> Result<AppendWriteOutcome> {
    apply_plan(repo_root, layout, plan)
}

fn apply_plan(
    repo_root: &Path,
    layout: &StateLayout,
    plan: &MemoryOpPlan,
) -> Result<AppendWriteOutcome> {
    let target_current = read_optional_file(Path::new(&plan.target.path))?;
    let source_current = plan
        .source
        .as_ref()
        .map(|source| read_optional_file(Path::new(&source.path)))
        .transpose()?;

    let target_matches = target_current == plan.target.next_contents;
    let source_matches = match (&plan.source, &source_current) {
        (Some(source), Some(current)) => current == &source.next_contents,
        (None, None) => true,
        _ => false,
    };

    if target_matches && source_matches {
        return Ok(AppendWriteOutcome::IdempotentNoop);
    }

    write_memory_file(Path::new(&plan.target.path), &plan.target.next_contents)?;
    if let Some(source) = &plan.source {
        write_memory_file(Path::new(&source.path), &source.next_contents)?;
    }
    refresh_compiled_state_after_write(layout, repo_root, plan.refresh_locality_id.as_deref())?;
    Ok(AppendWriteOutcome::Applied)
}

fn refresh_compiled_state_after_write(
    layout: &StateLayout,
    repo_root: &Path,
    locality_id: Option<&str>,
) -> Result<()> {
    if let Some(locality_id) = locality_id {
        let compiled = compiled_state::refresh_after_write(repo_root, layout, locality_id)?;
        if let Err(error) = projection_metadata::record_for_compiled_store(layout, &compiled) {
            projection_metadata::warn_record_error(layout, &error);
        }
    } else if let Some(marker) = repo_marker::load(repo_root)? {
        let compiled = compiled_state::refresh_after_write(repo_root, layout, &marker.locality_id)?;
        if let Err(error) = projection_metadata::record_for_compiled_store(layout, &compiled) {
            projection_metadata::warn_record_error(layout, &error);
        }
    }

    Ok(())
}

fn read_optional_file(path: &Path) -> Result<String> {
    match fs::read_to_string(path) {
        Ok(contents) => Ok(contents),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(String::new()),
        Err(error) => Err(error).with_context(|| format!("failed to read {}", path.display())),
    }
}

fn staged_view_from_record(
    layout: &StateLayout,
    record: &db::memory_ops::MemoryOpRecord,
    outcome: AppendWriteOutcome,
    message: Option<String>,
) -> Result<StagedMemoryOpView> {
    let plan: MemoryOpPlan = serde_json::from_str(&record.plan_json)
        .with_context(|| format!("failed to decode queued memory-op plan for `{}`", record.id))?;
    let authored_entry_ids = serde_json::from_str(&record.authored_entry_ids_json)
        .with_context(|| format!("failed to decode authored_entry_ids for `{}`", record.id))?;

    Ok(StagedMemoryOpView {
        op_id: record.id.clone(),
        outcome,
        native_state_path: layout.state_db_path().display().to_string(),
        target_scope: plan.target.scope,
        target_path: record
            .authored_target_path
            .clone()
            .unwrap_or(plan.target.path),
        source_path: record
            .authored_source_path
            .clone()
            .or_else(|| plan.source.map(|source| source.path)),
        authored_entry_ids,
        message,
    })
}

fn append_outcome_label(outcome: AppendWriteOutcome) -> &'static str {
    match outcome {
        AppendWriteOutcome::Applied => "applied",
        AppendWriteOutcome::IdempotentNoop => "idempotent_noop",
        AppendWriteOutcome::OwnershipConflict => "ownership_conflict",
        AppendWriteOutcome::StaleSession => "stale_session",
        AppendWriteOutcome::UnsupportedMultiwriter => "unsupported_multiwriter",
        AppendWriteOutcome::DuplicateIdConflict => "duplicate_id_conflict",
    }
}

fn hex_digest(bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    format!("{:x}", hasher.finalize())
}