nornir 0.4.1

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Doc-export history, historized in the Iceberg warehouse.
//!
//! Every successful `nornir docs export` / `docs book` lands one row in the
//! `doc_exports` Iceberg table (`src/warehouse/iceberg_schema.rs`), carrying the
//! full rendered bytes so any past version is recoverable by a warehouse read.
//! The *current* artifact also lands as a plain file under `<gitroot>/docs/`
//! (see [`super::layout`]) — that directory is the live, link-target copy; the
//! warehouse is the time-travelable history.
//!
//! Writes are deduplicated on `(repo, doc_name, version, format, sha256)`: a
//! re-export of byte-identical content returns the existing row instead of
//! appending a duplicate. The dedup is best-effort (a read-then-append, not an
//! atomic upsert), so two *concurrent* identical exports could each append a
//! row; readers tolerate that (same bytes, newest-first). Timestamps are always
//! stamped with [`chrono::Utc`].

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;

use crate::warehouse::iceberg::{append_batch, IcebergWarehouse, TABLE_DOC_EXPORTS};

/// One historized document export (metadata only; the bytes stay in the
/// warehouse and are streamed back via restore, not via this row).
///
/// Values are produced either when a new row is appended by
/// [`record_doc_export_async`] or when rows are read back from the
/// `doc_exports` table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DocExport {
    /// Row identifier: a random UUID (v4) string assigned when the row is
    /// first recorded.
    pub export_id: String,
    /// Caller-supplied workspace label. Not part of the dedup key.
    pub workspace: String,
    /// Repository the document belongs to; the primary listing key.
    pub repo: String,
    /// Document name (dedup key component).
    pub doc_name: String,
    /// Document version (dedup key component).
    pub version: String,
    /// Export format (dedup key component).
    pub format: String,
    /// Caller-supplied git sha. Not part of the dedup key.
    // NOTE(review): presumably the commit the document was rendered from — confirm with callers.
    pub git_sha: String,
    /// Lowercase hex SHA-256 digest of the exported bytes (dedup key component).
    pub sha256: String,
    /// Length of the exported bytes.
    pub byte_len: i64,
    /// RFC3339 UTC timestamp.
    pub generated_at: String,
}

/// Optional filter for [`list_doc_exports`].
///
/// The `Default` value leaves every field `None`, i.e. no constraint beyond
/// the `repo` argument of the listing call. Fields that are set are combined
/// with AND.
#[derive(Debug, Default, Clone)]
pub struct ExportFilter {
    /// Keep only rows whose `doc_name` equals this value exactly.
    pub doc_name: Option<String>,
    /// Keep only rows whose `version` equals this value exactly.
    pub version: Option<String>,
    /// Keep only rows whose `format` equals this value exactly.
    pub format: Option<String>,
    /// Maximum number of rows to return; applied after the newest-first sort.
    pub limit: Option<usize>,
}

/// Lowercase hex encoding of the SHA-256 digest of `bytes`.
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let digest = Sha256::digest(bytes);
    // Two hex characters per digest byte.
    let mut hex = String::with_capacity(digest.len() * 2);
    for byte in digest.iter() {
        // Writing into a `String` cannot fail, so the result is ignored.
        let _ = write!(hex, "{byte:02x}");
    }
    hex
}

/// Fetch the column called `name` from `batch` and downcast it to the
/// concrete arrow array type `T`.
///
/// The lookup is by name rather than by position, so it keeps working when
/// the batch is a projection of the table.
///
/// # Errors
///
/// Fails if `batch` has no column `name`, or if that column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("projected batch missing column `{name}`"));
    };
    match array.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected arrow type")),
    }
}

/// Render microseconds since the Unix epoch as an RFC3339 UTC string.
///
/// If `ts_micros` cannot be represented as a `DateTime<Utc>`, the current
/// time is rendered instead.
fn rfc3339(ts_micros: i64) -> String {
    let stamp = match DateTime::<Utc>::from_timestamp_micros(ts_micros) {
        Some(parsed) => parsed,
        None => Utc::now(),
    };
    stamp.to_rfc3339()
}

/// Record one export. Deduplicated by `(repo, doc_name, version, format,
/// sha256)`: byte-identical re-exports return the existing row.
pub async fn record_doc_export_async(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    git_sha: &str,
    bytes: &[u8],
) -> Result<DocExport> {
    let sha256 = sha256_hex(bytes);
    if let Some(existing) =
        find_existing(wh, repo, doc_name, version, format, &sha256).await?
    {
        return Ok(existing);
    }

    let export_id = Uuid::new_v4().to_string();
    let ts = Utc::now();
    let byte_len = bytes.len() as i64;

    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_DOC_EXPORTS))
        .await?;
    let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![export_id.clone()])),
        Arc::new(StringArray::from(vec![workspace.to_string()])),
        Arc::new(StringArray::from(vec![repo.to_string()])),
        Arc::new(StringArray::from(vec![doc_name.to_string()])),
        Arc::new(StringArray::from(vec![version.to_string()])),
        Arc::new(StringArray::from(vec![format.to_string()])),
        Arc::new(StringArray::from(vec![git_sha.to_string()])),
        Arc::new(
            TimestampMicrosecondArray::from(vec![ts.timestamp_micros()])
                .with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from(vec![sha256.clone()])),
        Arc::new(Int64Array::from(vec![byte_len])),
        Arc::new(LargeBinaryArray::from(vec![bytes])),
    ];
    let batch = RecordBatch::try_new(schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;

    Ok(DocExport {
        export_id,
        workspace: workspace.to_string(),
        repo: repo.to_string(),
        doc_name: doc_name.to_string(),
        version: version.to_string(),
        format: format.to_string(),
        git_sha: git_sha.to_string(),
        sha256,
        byte_len,
        generated_at: ts.to_rfc3339(),
    })
}

/// Blocking counterpart of [`record_doc_export_async`] for the CLI: builds
/// the async call and drives it to completion with `wh.block_on`.
pub fn record_doc_export(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    git_sha: &str,
    bytes: &[u8],
) -> Result<DocExport> {
    let pending = record_doc_export_async(
        wh, workspace, repo, doc_name, version, format, git_sha, bytes,
    );
    wh.block_on(pending)
}

/// Look up an already-recorded row matching the dedup key
/// `(repo, doc_name, version, format, sha256)`.
///
/// Returns the first matching row in scan order, or `None` if no row matches.
/// Only metadata columns are projected; the stored bytes are not read.
async fn find_existing(
    wh: &IcebergWarehouse,
    repo: &str,
    doc_name: &str,
    version: &str,
    format: &str,
    sha256: &str,
) -> Result<Option<DocExport>> {
    let catalog = wh.catalog();
    let ident = wh.table_ident(TABLE_DOC_EXPORTS);
    let table = catalog.load_table(&ident).await?;

    // One equality term per dedup-key component, AND-ed left to right.
    let by_repo = Reference::new("repo").equal_to(Datum::string(repo));
    let by_doc = Reference::new("doc_name").equal_to(Datum::string(doc_name));
    let by_version = Reference::new("version").equal_to(Datum::string(version));
    let by_format = Reference::new("format").equal_to(Datum::string(format));
    let by_hash = Reference::new("sha256").equal_to(Datum::string(sha256));
    let predicate = by_repo
        .and(by_doc)
        .and(by_version)
        .and(by_format)
        .and(by_hash);

    let projection = [
        "export_id", "workspace", "repo", "doc_name", "version", "format", "git_sha",
        "ts_micros", "sha256", "byte_len",
    ];
    let scan = table
        .scan()
        .with_filter(predicate)
        .select(projection)
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Residual check: pushdown prunes at file granularity, not per row, so
    // every returned row is re-tested against the full key.
    let is_match = |row: &DocExport| {
        row.repo == repo
            && row.doc_name == doc_name
            && row.version == version
            && row.format == format
            && row.sha256 == sha256
    };
    for batch in &batches {
        let hit = rows_from_batch(batch)?
            .into_iter()
            .find(|row| is_match(row));
        if hit.is_some() {
            return Ok(hit);
        }
    }
    Ok(None)
}

/// List the historized exports of `repo`, newest first, restricted by `filter`.
///
/// The `doc_name` / `version` / `format` constraints in `filter` are exact
/// matches. `filter.limit`, when set, caps the result after sorting, so the
/// newest rows are the ones kept.
pub async fn list_doc_exports_async(
    wh: &IcebergWarehouse,
    repo: &str,
    filter: &ExportFilter,
) -> Result<Vec<DocExport>> {
    let catalog = wh.catalog();
    let ident = wh.table_ident(TABLE_DOC_EXPORTS);
    let table = catalog.load_table(&ident).await?;

    // `repo` is always constrained; each filter field that is set adds one
    // more equality term, AND-ed on in doc_name, version, format order.
    let eq = |column: &str, value: &str| Reference::new(column).equal_to(Datum::string(value));
    let optional_terms = [
        ("doc_name", filter.doc_name.as_deref()),
        ("version", filter.version.as_deref()),
        ("format", filter.format.as_deref()),
    ];
    let predicate = optional_terms
        .into_iter()
        .filter_map(|(column, wanted)| wanted.map(|value| eq(column, value)))
        .fold(eq("repo", repo), |acc, term| acc.and(term));

    let projection = [
        "export_id", "workspace", "repo", "doc_name", "version", "format", "git_sha",
        "ts_micros", "sha256", "byte_len",
    ];
    let scan = table
        .scan()
        .with_filter(predicate)
        .select(projection)
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Residual filtering (file-granular pushdown may over-return).
    let accepts = |wanted: &Option<String>, actual: &str| match wanted {
        Some(value) => value.as_str() == actual,
        None => true,
    };
    let keep = |row: &DocExport| {
        row.repo == repo
            && accepts(&filter.doc_name, &row.doc_name)
            && accepts(&filter.version, &row.version)
            && accepts(&filter.format, &row.format)
    };

    // Pair each surviving row with its raw timestamp so the sort works on
    // the integer micros rather than on the rendered string.
    let mut stamped: Vec<(i64, DocExport)> = Vec::new();
    for batch in &batches {
        let micros = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        let rows = rows_from_batch(batch)?;
        stamped.extend(
            rows.into_iter()
                .enumerate()
                .filter(|(_, row)| keep(row))
                .map(|(i, row)| (micros.value(i), row)),
        );
    }

    // Stable, descending by timestamp: newest first.
    stamped.sort_by_key(|(ts, _)| std::cmp::Reverse(*ts));
    let newest_first = stamped.into_iter().map(|(_, row)| row);
    Ok(match filter.limit {
        Some(n) => newest_first.take(n).collect(),
        None => newest_first.collect(),
    })
}

/// Blocking counterpart of [`list_doc_exports_async`] for the CLI: builds the
/// async call and drives it to completion with `wh.block_on`.
pub fn list_doc_exports(
    wh: &IcebergWarehouse,
    repo: &str,
    filter: &ExportFilter,
) -> Result<Vec<DocExport>> {
    let pending = list_doc_exports_async(wh, repo, filter);
    wh.block_on(pending)
}

/// Turn every row of a projected batch into a [`DocExport`], in batch order.
///
/// No sorting happens here. `generated_at` is rendered from the `ts_micros`
/// column. Columns are looked up by name, so the batch may be any projection
/// that contains the ten metadata columns read below.
///
/// # Errors
///
/// Fails if any of those columns is missing or has an unexpected arrow type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<DocExport>> {
    let export_id = col::<StringArray>(batch, "export_id")?;
    let workspace = col::<StringArray>(batch, "workspace")?;
    let repo = col::<StringArray>(batch, "repo")?;
    let doc_name = col::<StringArray>(batch, "doc_name")?;
    let version = col::<StringArray>(batch, "version")?;
    let format = col::<StringArray>(batch, "format")?;
    let git_sha = col::<StringArray>(batch, "git_sha")?;
    let ts_micros = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
    let sha256 = col::<StringArray>(batch, "sha256")?;
    let byte_len = col::<Int64Array>(batch, "byte_len")?;

    let rows: Vec<DocExport> = (0..batch.num_rows())
        .map(|i| DocExport {
            export_id: export_id.value(i).to_string(),
            workspace: workspace.value(i).to_string(),
            repo: repo.value(i).to_string(),
            doc_name: doc_name.value(i).to_string(),
            version: version.value(i).to_string(),
            format: format.value(i).to_string(),
            git_sha: git_sha.value(i).to_string(),
            sha256: sha256.value(i).to_string(),
            byte_len: byte_len.value(i),
            generated_at: rfc3339(ts_micros.value(i)),
        })
        .collect();
    Ok(rows)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::iceberg::IcebergWarehouse;

    /// Open a warehouse rooted in a fresh temporary directory. The `TempDir`
    /// is handed back as well so each test can keep it bound while it runs.
    fn scratch() -> (tempfile::TempDir, IcebergWarehouse) {
        let root = tempfile::tempdir().unwrap();
        let warehouse = IcebergWarehouse::open(root.path()).unwrap();
        (root, warehouse)
    }

    /// Record a `README` export for repo `demo` (workspace `ws`, git sha `s`).
    fn put(wh: &IcebergWarehouse, version: &str, format: &str, bytes: &[u8]) -> DocExport {
        record_doc_export(wh, "ws", "demo", "README", version, format, "s", bytes).unwrap()
    }

    /// List repo `demo` through `filter`.
    fn listed(wh: &IcebergWarehouse, filter: ExportFilter) -> Vec<DocExport> {
        list_doc_exports(wh, "demo", &filter).unwrap()
    }

    #[test]
    fn record_then_list() {
        let (_root, wh) = scratch();
        let recorded =
            record_doc_export(&wh, "ws", "demo", "README", "0.1.0", "pdf", "abc123", b"hello")
                .unwrap();
        assert_eq!(recorded.byte_len, 5);
        assert_eq!(recorded.format, "pdf");

        let rows = listed(&wh, ExportFilter::default());
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].doc_name, "README");
        assert_eq!(rows[0].git_sha, "abc123");
    }

    #[test]
    fn dedup_skips_identical_content() {
        let (_root, wh) = scratch();
        let first = put(&wh, "0.1.0", "pdf", b"same");
        let second = put(&wh, "0.1.0", "pdf", b"same");
        assert_eq!(
            first.export_id, second.export_id,
            "identical re-export reuses the row"
        );
        assert_eq!(listed(&wh, ExportFilter::default()).len(), 1);
    }

    #[test]
    fn different_content_makes_new_row() {
        let (_root, wh) = scratch();
        put(&wh, "0.1.0", "pdf", b"v1");
        put(&wh, "0.1.0", "pdf", b"v2");
        let rows = listed(&wh, ExportFilter::default());
        assert_eq!(rows.len(), 2, "different bytes => new row");
    }

    #[test]
    fn filter_by_format_and_limit() {
        let (_root, wh) = scratch();
        put(&wh, "0.1.0", "pdf", b"p1");
        put(&wh, "0.1.0", "html", b"h1");
        put(&wh, "0.2.0", "pdf", b"p2");

        let pdf_only = ExportFilter {
            format: Some(String::from("pdf")),
            ..ExportFilter::default()
        };
        assert_eq!(listed(&wh, pdf_only).len(), 2);

        let capped = ExportFilter {
            limit: Some(1),
            ..ExportFilter::default()
        };
        assert_eq!(listed(&wh, capped).len(), 1);
    }
}