sdivi-pipeline 0.2.23

Orchestration pipeline for the Structural Divergence Indexer — owns FS, clock, and atomic writes
Documentation
//! Snapshot and boundary persistence helpers — atomic write, retention, and read utilities.
//!
//! Re-exports the snapshot write and retention helpers from `sdivi-snapshot`;
//! the read utilities (listing snapshots, loading by id, loading the latest)
//! live here.
//! Boundary YAML write (atomic tempfile + rename) also lives here.

use std::io::Write as _;
use std::path::Path;

pub use sdivi_snapshot::retention::enforce_retention;
pub use sdivi_snapshot::store::{iso_to_filename_safe, write_snapshot};

use sdivi_snapshot::Snapshot;

/// Returns sorted `snapshot_*.json` entries from `dir`.
/// Lists the `snapshot_*.json` entries in `dir`, sorted by filename.
///
/// Only entries whose name starts with `snapshot_` and ends with `.json` are
/// kept. Entries the directory iterator fails to yield are skipped
/// (best-effort). Names are compared as `OsString`s.
///
/// # Errors
///
/// Returns the error from [`std::fs::read_dir`] if `dir` cannot be opened.
fn list_snapshot_entries(dir: &Path) -> std::io::Result<Vec<std::fs::DirEntry>> {
    let mut entries: Vec<_> = std::fs::read_dir(dir)?
        .filter_map(Result::ok)
        .filter(|e| {
            let name = e.file_name();
            let s = name.to_string_lossy();
            s.starts_with("snapshot_") && s.ends_with(".json")
        })
        .collect();
    // `DirEntry::file_name` allocates a new `OsString` on every call; compute the
    // key once per entry rather than O(n log n) times inside the comparator.
    entries.sort_by_cached_key(|e| e.file_name());
    Ok(entries)
}

/// Reads all snapshots from `dir` in chronological order (oldest→newest).
///
/// Files are sorted lexicographically by filename. The `snapshot_*` naming
/// scheme (`snapshot_<YYYYMMDDTHHMMSS>_<hash>.json`) ensures this matches
/// chronological order. Non-matching files and malformed JSON are skipped
/// with a warning logged to stderr.
///
/// Returns an empty `Vec` if the directory does not exist.
/// Reads all snapshots from `dir` in chronological order (oldest→newest).
///
/// Only files named `snapshot_*.json` are considered; anything else in `dir`
/// is ignored without a warning. Files are sorted lexicographically by
/// filename. The naming scheme (`snapshot_<YYYYMMDDTHHMMSS>_<hash>.json`)
/// makes this match chronological order.
///
/// A snapshot file that is not valid UTF-8, or whose JSON does not
/// deserialize as a [`Snapshot`], is skipped and reported through a
/// `tracing` warning.
///
/// Returns an empty `Vec` if the directory does not exist.
///
/// # Errors
///
/// Propagates I/O errors from listing `dir` or from reading a snapshot file,
/// except invalid-UTF-8 content, which is treated as a malformed snapshot.
pub fn read_snapshots(dir: &Path) -> std::io::Result<Vec<Snapshot>> {
    if !dir.exists() {
        return Ok(vec![]);
    }
    let entries = list_snapshot_entries(dir)?;
    let mut snapshots = Vec::with_capacity(entries.len());
    for entry in entries {
        let path = entry.path();
        let content = match std::fs::read_to_string(&path) {
            Ok(content) => content,
            // `read_to_string` reports non-UTF-8 bytes as `InvalidData`; such a
            // file can never be valid JSON, so skip it like any other malformed one.
            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
                tracing::warn!(
                    path = %path.display(),
                    error = %e,
                    "skipping malformed snapshot"
                );
                continue;
            }
            Err(e) => return Err(e),
        };
        match serde_json::from_str::<Snapshot>(&content) {
            Ok(s) => snapshots.push(s),
            Err(e) => {
                tracing::warn!(
                    path = %path.display(),
                    error = %e,
                    "skipping malformed snapshot"
                );
            }
        }
    }
    Ok(snapshots)
}

/// Returns the most recent snapshot from `dir`, or `None` if none exist.
///
/// "Most recent" is the lexicographically last `snapshot_*.json` file,
/// which matches chronological order under the M07 naming scheme.
/// Loads the newest snapshot stored in `dir`.
///
/// The newest snapshot is the `snapshot_*.json` file whose name sorts last
/// lexicographically. Under the M07 naming scheme this is also the most
/// recent one chronologically.
///
/// Returns `Ok(None)` when `dir` is missing or holds no snapshot files.
///
/// # Errors
///
/// Fails if the directory cannot be listed or the newest file cannot be read.
/// Fails with `InvalidData` if that file's contents do not deserialize as a
/// [`Snapshot`]. Unlike [`read_snapshots`], a malformed newest file is not
/// skipped.
pub fn latest_snapshot(dir: &Path) -> std::io::Result<Option<Snapshot>> {
    if !dir.exists() {
        return Ok(None);
    }
    let newest = match list_snapshot_entries(dir)?.pop() {
        Some(entry) => entry,
        None => return Ok(None),
    };
    let raw = std::fs::read_to_string(newest.path())?;
    serde_json::from_str(&raw)
        .map(Some)
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))
}

/// Returns the snapshot identified by `id` from `dir`.
///
/// `id` is matched as the filename stem (without `.json` extension).
/// For example, `"snapshot_20260429T123456_abc12345"` matches the file
/// `snapshot_20260429T123456_abc12345.json` in `dir`.
/// Returns the snapshot identified by `id` from `dir`.
///
/// `id` is matched as the filename stem; a trailing `.json` extension is also
/// accepted. For example, both `"snapshot_20260429T123456_abc12345"` and
/// `"snapshot_20260429T123456_abc12345.json"` resolve to the file
/// `snapshot_20260429T123456_abc12345.json` in `dir`.
///
/// # Errors
///
/// - `InvalidInput` if `id` is not a single plain file name. Examples are an id
///   containing a path separator or `..`, or an absolute path. This keeps the
///   lookup confined to `dir`.
/// - The underlying I/O error kind if the file cannot be read (`NotFound`
///   when it does not exist).
/// - `InvalidData` if the file's contents do not deserialize as a [`Snapshot`].
pub fn read_snapshot_by_id(dir: &Path, id: &str) -> std::io::Result<Snapshot> {
    let filename = if id.ends_with(".json") {
        id.to_string()
    } else {
        format!("{id}.json")
    };

    // `Path::join` discards `dir` for absolute inputs and follows `..`, so only
    // accept ids that form exactly one normal path component.
    let mut components = Path::new(&filename).components();
    let is_plain_name = matches!(
        (components.next(), components.next()),
        (Some(std::path::Component::Normal(_)), None)
    );
    if !is_plain_name {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("invalid snapshot id '{id}': must be a plain file name"),
        ));
    }

    let path = dir.join(&filename);
    let content = std::fs::read_to_string(&path).map_err(|e| {
        // Only claim "not found" when that is actually the failure (not e.g. a
        // permission error).
        let reason = if e.kind() == std::io::ErrorKind::NotFound {
            "not found"
        } else {
            "could not be read"
        };
        std::io::Error::new(e.kind(), format!("snapshot '{}' {reason}: {e}", id))
    })?;
    serde_json::from_str(&content)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}

/// Writes `spec` atomically to `path` (tempfile in parent dir + rename).
///
/// If `path` already exists and contains YAML comments (`#`), a warning is
/// printed to stderr — comment loss on ratify is documented behaviour (KDD-6).
///
/// Creates parent directories if they do not exist.
/// Prints a stderr warning if the YAML file at `path` appears to contain comments.
///
/// This check is best-effort. A missing, unreadable, or non-UTF-8 file is
/// not inspected, so the diagnostic can never stop the spec from being written.
fn warn_if_comments_will_be_lost(path: &Path) {
    if let Ok(existing) = std::fs::read_to_string(path) {
        // Heuristic: a full-line comment, or an inline comment preceded by
        // whitespace. Quoted values containing " #" can trigger a spurious
        // warning, which is harmless.
        let has_comment = existing.lines().any(|l| {
            let trimmed = l.trim_start();
            trimmed.starts_with('#') || trimmed.contains(" #") || trimmed.contains("\t#")
        });
        if has_comment {
            eprintln!(
                "sdivi: warning: '{}' contains YAML comments — comments will be lost \
                 after ratify (see docs/migrating-from-the-python-poc.md)",
                path.display()
            );
        }
    }
}

/// Writes `spec` atomically to `path` (tempfile in parent dir + rename).
///
/// If `path` already exists and contains YAML comments (`#`), a warning is
/// printed to stderr. Comment loss on ratify is documented behaviour (KDD-6).
///
/// Creates parent directories if they do not exist. If `path` already exists,
/// its permissions are carried over to the new file. The data is synced to
/// disk before the rename, so a crash cannot leave a truncated spec in place.
///
/// # Errors
///
/// Returns any I/O error from creating the parent directory, writing or
/// syncing the temporary file, copying permissions, or renaming it over `path`.
pub fn write_boundary_spec(spec: &sdivi_config::BoundarySpec, path: &Path) -> std::io::Result<()> {
    warn_if_comments_will_be_lost(path);

    let yaml = spec.to_yaml();
    // `Path::parent` yields `Some("")` for a bare filename and `None` for a root;
    // both mean "current directory" here.
    let parent = match path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    std::fs::create_dir_all(parent)?;

    let mut tmp = tempfile::NamedTempFile::new_in(parent)?;
    tmp.write_all(yaml.as_bytes())?;
    // The tempfile is created owner-only. Copy the existing file's permissions
    // so the rename does not silently change them.
    if let Ok(meta) = std::fs::metadata(path) {
        tmp.as_file().set_permissions(meta.permissions())?;
    }
    // Make the contents durable before the rename publishes them.
    tmp.as_file().sync_all()?;
    tmp.persist(path).map_err(|e| e.error)?;

    tracing::debug!(path = %path.display(), "boundary spec written");
    Ok(())
}