axiomsync 1.0.1

Local retrieval runtime and CLI for AxiomSync.
Documentation
use std::collections::BTreeMap;

use crate::error::{AxiomError, Result};
use crate::models::{MemoryPromotionFact, MemoryPromotionResult};
use crate::uri::AxiomUri;

use super::Session;
use super::promotion::{
    plan_promotion_apply, restore_promotion_snapshots, validate_promotion_fact_semantics,
};
use super::read_path::list_existing_promotion_facts;
use super::write_path::{
    persist_promotion_candidate as persist_promotion_candidate_write_path,
    reindex_memory_uris as reindex_memory_uris_write_path,
};

type RecordDedupFallbackFn = fn(&Session, &str, &str);

pub(super) fn apply_promotion_all_or_nothing(
    session: &Session,
    checkpoint_id: &str,
    facts: &[MemoryPromotionFact],
    record_dedup_fallback: RecordDedupFallbackFn,
) -> Result<MemoryPromotionResult> {
    for fact in facts {
        validate_promotion_fact_semantics(fact)?;
    }

    let existing = list_existing_promotion_facts(session)?;
    let plan = plan_promotion_apply(&existing, facts);
    let mut snapshots = BTreeMap::<String, Option<String>>::new();
    let mut persisted_uris = Vec::<AxiomUri>::new();

    for candidate in &plan.candidates {
        let uri = match persist_promotion_candidate_write_path(
            session,
            candidate,
            Some(&mut snapshots),
        ) {
            Ok(uri) => uri,
            Err(err) => {
                restore_promotion_snapshots(session, &snapshots)?;
                return Err(err);
            }
        };
        if !persisted_uris.iter().any(|item| item == &uri) {
            persisted_uris.push(uri);
        }
    }

    if let Err(reindex_err) = reindex_memory_uris_write_path(session, &persisted_uris) {
        record_dedup_fallback(session, "promotion_reindex", &reindex_err.to_string());
        let rollback_err = restore_promotion_snapshots(session, &snapshots).err();
        let rollback_reindex_err = if rollback_err.is_none() {
            reindex_memory_uris_write_path(session, &persisted_uris).err()
        } else {
            None
        };
        let rollback_status = rollback_err
            .as_ref()
            .map_or_else(|| "ok".to_string(), |err| format!("err:{err}"));
        let rollback_reindex_status = rollback_reindex_err
            .as_ref()
            .map_or_else(|| "ok_or_skipped".to_string(), |err| format!("err:{err}"));
        return Err(AxiomError::Internal(format!(
            "promotion all_or_nothing reindex failed: {reindex_err}; rollback={rollback_status}; rollback_reindex={rollback_reindex_status}",
        )));
    }

    Ok(MemoryPromotionResult {
        session_id: session.session_id.clone(),
        checkpoint_id: checkpoint_id.to_string(),
        accepted: facts.len(),
        persisted: plan.candidates.len(),
        skipped_duplicates: plan.skipped_duplicates,
        rejected: 0,
    })
}

pub(super) fn apply_promotion_best_effort(
    session: &Session,
    checkpoint_id: &str,
    facts: &[MemoryPromotionFact],
    record_dedup_fallback: RecordDedupFallbackFn,
) -> Result<MemoryPromotionResult> {
    let mut rejected = 0usize;
    let mut valid = Vec::<MemoryPromotionFact>::new();
    for fact in facts {
        if validate_promotion_fact_semantics(fact).is_ok() {
            valid.push(fact.clone());
        } else {
            rejected = rejected.saturating_add(1);
        }
    }

    let existing = list_existing_promotion_facts(session)?;
    let plan = plan_promotion_apply(&existing, &valid);

    let mut persisted = 0usize;
    let mut persisted_uris = Vec::<AxiomUri>::new();
    let mut snapshots = BTreeMap::<String, Option<String>>::new();
    for candidate in &plan.candidates {
        match persist_promotion_candidate_write_path(session, candidate, Some(&mut snapshots)) {
            Ok(uri) => {
                if !persisted_uris.iter().any(|item| item == &uri) {
                    persisted_uris.push(uri);
                }
                persisted = persisted.saturating_add(1);
            }
            Err(_) => {
                rejected = rejected.saturating_add(1);
            }
        }
    }
    if let Err(reindex_err) = reindex_memory_uris_write_path(session, &persisted_uris) {
        record_dedup_fallback(session, "promotion_reindex", &reindex_err.to_string());
        let rollback_err = restore_promotion_snapshots(session, &snapshots).err();
        let rollback_reindex_err = if rollback_err.is_none() {
            reindex_memory_uris_write_path(session, &persisted_uris).err()
        } else {
            None
        };
        let rollback_status = rollback_err
            .as_ref()
            .map_or_else(|| "ok".to_string(), |err| format!("err:{err}"));
        let rollback_reindex_status = rollback_reindex_err
            .as_ref()
            .map_or_else(|| "ok_or_skipped".to_string(), |err| format!("err:{err}"));
        return Err(AxiomError::Internal(format!(
            "promotion best_effort reindex failed: {reindex_err}; rollback={rollback_status}; rollback_reindex={rollback_reindex_status}",
        )));
    }

    Ok(MemoryPromotionResult {
        session_id: session.session_id.clone(),
        checkpoint_id: checkpoint_id.to_string(),
        accepted: valid.len(),
        persisted,
        skipped_duplicates: plan.skipped_duplicates,
        rejected,
    })
}