nornir 0.1.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Per-repo docs warehouse: append-only history of exported documents.
//!
//! Layout under `<repo>/.nornir/warehouse/docs/`:
//!
//! ```text
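//! index.db                                  # SQLite catalog (one per repo)
//! latest/<doc>-<version>.<ext>              # rolling "latest" copy (overwritten)
//! archive/<utc-stamp>-<doc>-<version>.<ext> # immutable historical copies
//! ```
//!
//! `<utc-stamp>` is the export time in UTC formatted as
//! `YYYYMMDDTHHMMSS.sssZ` (e.g. `20240131T120000.123Z`), so archive file
//! names sort chronologically.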
//!
//! Every successful `nornir docs export` lands one row in `docs_exports`
//! plus one immutable file under `archive/`, and overwrites the matching
//! `latest/` entry. Duplicate writes (same sha256 as the last archive
//! entry for the same `(doc, version, format)` triple) skip the new
//! archive copy but still update `latest/`.
//!
//! The catalog is intentionally tiny (one SQLite file per repo) so users
//! can `sqlite3 .nornir/warehouse/docs/index.db` themselves. Mirrors the
//! Iceberg-style partitioning used by [`crate::warehouse::local`] without
//! requiring the full Arrow/Parquet stack.
//!
//! No timestamps come from outside; we always stamp with [`chrono::Utc`].

use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Mutex;

use anyhow::{Context, Result};
use chrono::Utc;
use rusqlite::{params, Connection};
use sha2::{Digest, Sha256};

use super::layout::RepoLayout;

/// One row in the per-repo docs warehouse catalog (the `docs_exports` table).
///
/// Returned by [`DocsWarehouse::record`] and [`DocsWarehouse::list`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DocExport {
    /// Catalog row id (`INTEGER PRIMARY KEY AUTOINCREMENT`); larger ids were
    /// recorded later.
    pub id: i64,
    /// Document name as passed to `record`; first part of the file names.
    pub doc_name: String,
    /// Version string as passed to `record`.
    pub version: String,
    /// Output format as passed to `record`; also used as the file extension.
    pub format: String,
    /// Length of the exported content, in bytes.
    pub bytes_len: i64,
    /// Lowercase hex SHA-256 of the exported content.
    pub sha256: String,
    /// Path of the archived copy, relative to the warehouse root
    /// (i.e. it starts with `archive/`).
    pub archive_path: String,
    /// UTC time at which this row was recorded, RFC 3339 formatted.
    pub generated_at: String,
}

/// Optional filter for [`DocsWarehouse::list`].
///
/// Each `Some` field adds an exact-match condition; `None` fields are
/// ignored, so `ExportFilter::default()` matches every row.
#[derive(Debug, Default, Clone)]
pub struct ExportFilter {
    /// Only rows whose `doc_name` equals this value.
    pub doc_name: Option<String>,
    /// Only rows whose `version` equals this value.
    pub version: Option<String>,
    /// Only rows whose `format` equals this value.
    pub format: Option<String>,
    /// Return at most this many rows (results are newest first);
    /// `None` means no limit.
    pub limit: Option<usize>,
}

/// Per-repo docs warehouse rooted at `<repo>/.nornir/warehouse/docs/`.
///
/// Holds the warehouse root directory and an open SQLite connection to
/// `<root>/index.db`. Construct with [`DocsWarehouse::open`] (or
/// [`DocsWarehouse::open_at`] for an arbitrary root, e.g. in tests).
pub struct DocsWarehouse {
    /// Warehouse root; `latest/`, `archive/` and `index.db` live directly
    /// under it.
    root: PathBuf,
    /// Catalog connection, wrapped in a `Mutex` so that the `&self` methods
    /// can use it; all catalog access goes through this lock.
    catalog: Mutex<Connection>,
}

impl DocsWarehouse {
    /// Open (or initialise) the warehouse for the given repo layout.
    pub fn open(layout: &RepoLayout) -> Result<Self> {
        Self::open_at(layout.warehouse_dir().join("docs"))
    }

    /// Open (or initialise) a warehouse at an arbitrary root directory.
    /// Exposed for tests; production callers should use [`Self::open`].
    pub fn open_at(root: impl Into<PathBuf>) -> Result<Self> {
        let root = root.into();
        fs::create_dir_all(&root)
            .with_context(|| format!("create docs warehouse {}", root.display()))?;
        fs::create_dir_all(root.join("archive"))?;
        fs::create_dir_all(root.join("latest"))?;
        let db_path = root.join("index.db");
        let conn = Connection::open(&db_path)
            .with_context(|| format!("open {}", db_path.display()))?;
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS docs_exports (
                id           INTEGER PRIMARY KEY AUTOINCREMENT,
                doc_name     TEXT NOT NULL,
                version      TEXT NOT NULL,
                format       TEXT NOT NULL,
                bytes_len    INTEGER NOT NULL,
                sha256       TEXT NOT NULL,
                archive_path TEXT NOT NULL UNIQUE,
                generated_at TEXT NOT NULL
             );
             CREATE INDEX IF NOT EXISTS idx_docs_exports_doc
                 ON docs_exports(doc_name, version, format);
             CREATE INDEX IF NOT EXISTS idx_docs_exports_time
                 ON docs_exports(generated_at);",
        )?;
        Ok(Self { root, catalog: Mutex::new(conn) })
    }

    /// Root directory of the warehouse (`.../warehouse/docs/`).
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Record one export. Always overwrites `latest/<doc>-<version>.<ext>`.
    /// Archives the bytes immutably under `archive/` unless the sha256
    /// matches the most recent archive entry for the same triple
    /// `(doc_name, version, format)`, in which case the archive write is
    /// skipped and the existing entry is returned.
    pub fn record(
        &self,
        doc_name: &str,
        version: &str,
        format: &str,
        bytes: &[u8],
    ) -> Result<DocExport> {
        let sha256 = sha256_hex(bytes);
        let now = Utc::now();
        let generated_at = now.to_rfc3339();

        // Always refresh `latest/`.
        let latest_name = format!("{doc_name}-{version}.{format}");
        let latest_path = self.root.join("latest").join(&latest_name);
        atomic_write(&latest_path, bytes)
            .with_context(|| format!("write {}", latest_path.display()))?;

        // Dedup: skip archive if last entry for this triple has same hash.
        if let Some(prev) = self.last_for(doc_name, version, format)? {
            if prev.sha256 == sha256 {
                return Ok(prev);
            }
        }

        // Archive under a sortable, conflict-resistant name.
        let stamp = now.format("%Y%m%dT%H%M%S%.3fZ").to_string();
        let archive_name = format!("{stamp}-{doc_name}-{version}.{format}");
        let archive_path = self.root.join("archive").join(&archive_name);
        atomic_write(&archive_path, bytes)
            .with_context(|| format!("write {}", archive_path.display()))?;

        let rel = archive_path
            .strip_prefix(&self.root)
            .unwrap_or(&archive_path)
            .to_string_lossy()
            .to_string();

        let id = {
            let conn = self.catalog.lock().unwrap();
            conn.execute(
                "INSERT INTO docs_exports
                   (doc_name, version, format, bytes_len, sha256, archive_path, generated_at)
                 VALUES (?, ?, ?, ?, ?, ?, ?)",
                params![
                    doc_name,
                    version,
                    format,
                    bytes.len() as i64,
                    sha256,
                    rel,
                    generated_at,
                ],
            )?;
            conn.last_insert_rowid()
        };

        Ok(DocExport {
            id,
            doc_name: doc_name.into(),
            version: version.into(),
            format: format.into(),
            bytes_len: bytes.len() as i64,
            sha256,
            archive_path: rel,
            generated_at,
        })
    }

    /// List historical exports, newest first. Apply optional filters.
    pub fn list(&self, filter: &ExportFilter) -> Result<Vec<DocExport>> {
        let conn = self.catalog.lock().unwrap();
        let mut q = String::from(
            "SELECT id, doc_name, version, format, bytes_len, sha256, archive_path, generated_at
             FROM docs_exports WHERE 1=1",
        );
        let mut args: Vec<String> = Vec::new();
        if let Some(v) = &filter.doc_name { q.push_str(" AND doc_name = ?"); args.push(v.clone()); }
        if let Some(v) = &filter.version  { q.push_str(" AND version  = ?"); args.push(v.clone()); }
        if let Some(v) = &filter.format   { q.push_str(" AND format   = ?"); args.push(v.clone()); }
        q.push_str(" ORDER BY id DESC");
        if let Some(n) = filter.limit { q.push_str(&format!(" LIMIT {n}")); }
        let mut stmt = conn.prepare(&q)?;
        let params_dyn: Vec<&dyn rusqlite::ToSql> =
            args.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
        let rows = stmt.query_map(params_dyn.as_slice(), |r| {
            Ok(DocExport {
                id: r.get(0)?,
                doc_name: r.get(1)?,
                version: r.get(2)?,
                format: r.get(3)?,
                bytes_len: r.get(4)?,
                sha256: r.get(5)?,
                archive_path: r.get(6)?,
                generated_at: r.get(7)?,
            })
        })?;
        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
    }

    fn last_for(&self, doc_name: &str, version: &str, format: &str) -> Result<Option<DocExport>> {
        let mut got = self.list(&ExportFilter {
            doc_name: Some(doc_name.into()),
            version: Some(version.into()),
            format: Some(format.into()),
            limit: Some(1),
        })?;
        Ok(got.pop())
    }
}

/// Return the SHA-256 digest of `bytes` as a lowercase hex string
/// (two hex digits per digest byte).
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let digest = {
        let mut hasher = Sha256::new();
        hasher.update(bytes);
        hasher.finalize()
    };
    digest
        .iter()
        .fold(String::with_capacity(digest.len() * 2), |mut hex, byte| {
            // Writing into a `String` cannot fail.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}

/// Write `bytes` to `path` through a temporary sibling file and a rename, so
/// readers never observe a partially written `path`.
///
/// Missing parent directories are created. The temporary file name is
/// `<file name>.<pid>-<seq>.tmp-write`, built from the *full* target file name
/// plus the process id and a per-process counter. Targets that differ only by
/// extension (e.g. `README-0.1.0.pdf` and `README-0.1.0.html`) therefore no
/// longer share a temp file, and neither do concurrent writers. The data is
/// flushed with `sync_all` before the rename. If any step fails, the
/// temporary file is removed on a best-effort basis.
///
/// # Errors
/// Returns the I/O error from creating directories, writing, syncing, or
/// renaming.
fn atomic_write(path: &Path, bytes: &[u8]) -> Result<()> {
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Per-process sequence so that two writers never pick the same temp file.
    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);

    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    let mut tmp_name = path
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_default();
    tmp_name.push(format!(
        ".{}-{}.tmp-write",
        std::process::id(),
        TMP_SEQ.fetch_add(1, Ordering::Relaxed)
    ));
    let tmp = path.with_file_name(tmp_name);

    let written = (|| -> std::io::Result<()> {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(bytes)?;
        file.sync_all()?;
        fs::rename(&tmp, path)
    })();
    if written.is_err() {
        // Best effort: don't leave a stray temp file next to the target.
        let _ = fs::remove_file(&tmp);
    }
    Ok(written?)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Read back the archived bytes that a catalog row points at.
    fn archived_bytes(root: &Path, rec: &DocExport) -> Vec<u8> {
        fs::read(root.join(&rec.archive_path)).unwrap()
    }

    #[test]
    fn record_creates_archive_and_latest() {
        let dir = tempfile::tempdir().unwrap();
        let wh = DocsWarehouse::open_at(dir.path()).unwrap();
        let rec = wh.record("README", "0.1.0", "pdf", b"hello").unwrap();
        assert_eq!(rec.bytes_len, 5);
        assert_eq!(rec.format, "pdf");
        // Well-known SHA-256 of "hello", lowercase hex.
        assert_eq!(
            rec.sha256,
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
        assert!(dir.path().join("latest/README-0.1.0.pdf").exists());
        assert!(dir.path().join(&rec.archive_path).exists());
        assert_eq!(archived_bytes(dir.path(), &rec), b"hello");
        let latest = fs::read(dir.path().join("latest/README-0.1.0.pdf")).unwrap();
        assert_eq!(latest, b"hello");
    }

    #[test]
    fn dedup_skips_identical_content() {
        let dir = tempfile::tempdir().unwrap();
        let wh = DocsWarehouse::open_at(dir.path()).unwrap();
        let a = wh.record("README", "0.1.0", "pdf", b"same").unwrap();
        let b = wh.record("README", "0.1.0", "pdf", b"same").unwrap();
        assert_eq!(a.id, b.id, "second identical record should reuse the first row");
        let all = wh.list(&ExportFilter::default()).unwrap();
        assert_eq!(all.len(), 1);
        let archived = fs::read_dir(dir.path().join("archive")).unwrap().count();
        assert_eq!(archived, 1, "dedup must not write a second archive file");
        let latest = fs::read(dir.path().join("latest/README-0.1.0.pdf")).unwrap();
        assert_eq!(latest, b"same");
    }

    #[test]
    fn different_content_makes_new_archive() {
        let dir = tempfile::tempdir().unwrap();
        let wh = DocsWarehouse::open_at(dir.path()).unwrap();
        let v1 = wh.record("README", "0.1.0", "pdf", b"v1").unwrap();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let v2 = wh.record("README", "0.1.0", "pdf", b"v2").unwrap();
        let all = wh.list(&ExportFilter::default()).unwrap();
        assert_eq!(all.len(), 2, "different bytes => new archive row");
        // Newest first. Both payloads are 2 bytes long, so compare ids and
        // archived contents rather than `bytes_len`.
        assert_eq!(all[0].id, v2.id);
        assert_eq!(all[1].id, v1.id);
        assert_eq!(archived_bytes(dir.path(), &all[0]), b"v2");
        assert_eq!(archived_bytes(dir.path(), &all[1]), b"v1");
        let latest = fs::read(dir.path().join("latest/README-0.1.0.pdf")).unwrap();
        assert_eq!(latest, b"v2");
    }

    #[test]
    fn dedup_only_compares_with_most_recent_entry() {
        let dir = tempfile::tempdir().unwrap();
        let wh = DocsWarehouse::open_at(dir.path()).unwrap();
        let first = wh.record("README", "0.1.0", "pdf", b"A").unwrap();
        std::thread::sleep(std::time::Duration::from_millis(5));
        wh.record("README", "0.1.0", "pdf", b"B").unwrap();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let third = wh.record("README", "0.1.0", "pdf", b"A").unwrap();
        assert_ne!(first.id, third.id, "A -> B -> A must archive A again");
        assert_eq!(wh.list(&ExportFilter::default()).unwrap().len(), 3);
    }

    #[test]
    fn filter_by_format_and_limit() {
        let dir = tempfile::tempdir().unwrap();
        let wh = DocsWarehouse::open_at(dir.path()).unwrap();
        wh.record("README", "0.1.0", "pdf", b"p1").unwrap();
        wh.record("README", "0.1.0", "html", b"h1").unwrap();
        wh.record("README", "0.2.0", "pdf", b"p2").unwrap();
        let pdfs = wh.list(&ExportFilter {
            format: Some("pdf".into()), ..Default::default()
        }).unwrap();
        assert_eq!(pdfs.len(), 2);
        let v010 = wh.list(&ExportFilter {
            version: Some("0.1.0".into()), ..Default::default()
        }).unwrap();
        assert_eq!(v010.len(), 2);
        let one = wh.list(&ExportFilter { limit: Some(1), ..Default::default() }).unwrap();
        assert_eq!(one.len(), 1);
        assert_eq!(
            (one[0].version.as_str(), one[0].format.as_str()),
            ("0.2.0", "pdf"),
            "limit keeps the newest row"
        );
    }
}