nornir 0.4.42

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Iceberg writer/reader for the **clone/populate-outcome stream** (`clone_events`,
//! PLAN #6).
//!
//! When the server's poll loop (or a `Workspaces.Fetch` RPC, or the fat-CLI
//! `nornir workspace fetch`) clones+fetches a monitored workspace's git members,
//! per-member failures used to be *log-only* — an `eprintln!("…fetch error…")`
//! that a thin viz/CLI client could never see. It just saw missing data and no
//! reason. This module makes those outcomes **first-class, persisted, and readable
//! remotely**: one row per member attempt (clone/fetch + republish), `ok` or
//! `error`, carrying the error chain and the elapsed time.
//!
//! Write path: [`record_fetch_report`] turns a [`crate::monitor::FetchReport`] into
//! rows and appends them through the warehouse handle the *server already holds*
//! (redb is single-writer — the monitor itself can't open a second handle, so the
//! call sites that own a handle do the write). Best-effort + non-fatal: failing to
//! persist an outcome row must never abort a fetch/republish.
//!
//! Read path: [`query_clone_events`] scans the table, scopes by workspace, and
//! returns rows newest-first (Iceberg gives no scan order, so reads sort by
//! `ts_micros`). The `Viz.CloneEvents` RPC + `nornir workspace events` both read
//! through it.
//!
//! Schema: [`super::iceberg_schema::clone_events`].

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use chrono::{DateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;

use super::iceberg::{IcebergWarehouse, TABLE_CLONE_EVENTS, append_batch};

// Column indices match `iceberg_schema::clone_events` field order.
// `query_clone_events` reads scanned batches positionally through these indices,
// and `append_clone_events` builds its column vector in this same order, so any
// change to the schema's field order must be mirrored here (and in the writer).
const COL_TS_MICROS: usize = 0;
const COL_WORKSPACE: usize = 1;
const COL_MEMBER: usize = 2;
const COL_REMOTE: usize = 3;
const COL_OP: usize = 4;
const COL_STATUS: usize = 5;
const COL_DETAIL: usize = 6;
const COL_ELAPSED_MS: usize = 7;

/// Canonical op tags — the values written to the `op` column of a
/// [`CloneEventRow`].
pub mod op {
    /// A member clone/fetch attempt (today both go through `clone_or_fetch`).
    pub const CLONE_FETCH: &str = "clone-fetch";
    /// The post-fetch warehouse republish for the whole workspace (recorded by
    /// `record_republish` with member `*`).
    pub const REPUBLISH: &str = "republish";
}

/// Canonical status tags — the values written to the `status` column of a
/// [`CloneEventRow`].
pub mod status {
    /// The attempt succeeded; `detail` carries the resolved SHA or snapshot id.
    pub const OK: &str = "ok";
    /// The attempt failed; `detail` carries the error chain.
    pub const ERROR: &str = "error";
}

/// One row of the `clone_events` table — a single member's clone/fetch (or the
/// workspace republish) outcome.
///
/// Field order mirrors the table's column order (the `COL_*` indices used by the
/// reader).
///
/// `serde` derives let the server ship the exact rows to a thin viz client over
/// the `Viz.CloneEvents` RPC (serialize → JSON → deserialize), so the remote 🧬
/// nornir pane + `nornir --server workspace events` render the identical populate
/// status the embedded path reads from the warehouse.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CloneEventRow {
    /// Event time, microseconds since the Unix epoch (UTC).
    pub ts_micros: i64,
    /// The workspace this fetch belonged to.
    pub workspace: String,
    /// The member that was fetched (or `*` for a workspace-level `republish`).
    pub member: String,
    /// The git remote that was cloned/fetched (empty for a republish row).
    pub remote: String,
    /// `clone-fetch` | `republish` (see [`op`]).
    pub op: String,
    /// `ok` | `error` (see [`status`]).
    pub status: String,
    /// On error: the error chain (`{e:#}`). On ok: the resolved SHA (clone-fetch)
    /// or the snapshot id (republish). Stored as `""` (never null) so SQL readers
    /// see no surprise nulls.
    pub detail: String,
    /// Wall-clock duration of the op, milliseconds.
    pub elapsed_ms: i64,
}

/// Append a batch of `clone_events` rows (one Iceberg snapshot). Mirrors
/// [`super::release_events::append_release_events`].
///
/// An empty `rows` slice is a no-op (no table load, no snapshot).
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to Arrow,
/// the built batch does not match that schema, or the append/commit fails.
pub async fn append_clone_events(wh: &IcebergWarehouse, rows: &[CloneEventRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_CLONE_EVENTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Columns are built in schema field order (the same order as the `COL_*`
    // indices). `from_iter_values` copies each borrowed `&str` straight into the
    // Arrow buffer, so no per-row `String` clone or intermediate `Vec` is needed.
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.workspace.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.member.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.remote.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.op.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.status.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.detail.as_str()))),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.elapsed_ms))),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Turn a [`crate::monitor::FetchReport`] into `clone_events` rows, one per
/// member outcome (ok or error), in the report's outcome order.
///
/// Pure: no I/O, so both the writer and the tests use it. Every row is stamped
/// with the same instant, taken once at call time (UTC micros).
pub fn rows_from_report(report: &crate::monitor::FetchReport) -> Vec<CloneEventRow> {
    let stamped_at = Utc::now().timestamp_micros();
    let mut rows = Vec::with_capacity(report.outcomes.len());
    for outcome in &report.outcomes {
        rows.push(CloneEventRow {
            ts_micros: stamped_at,
            workspace: report.workspace.clone(),
            member: outcome.member.clone(),
            remote: outcome.remote.clone(),
            op: outcome.op.clone(),
            status: outcome.status.clone(),
            detail: outcome.detail.clone(),
            elapsed_ms: outcome.elapsed_ms,
        });
    }
    rows
}

/// Persist a fetch report's per-member outcomes to `clone_events`.
///
/// Best-effort and **non-fatal**: if the append fails, the error is printed to
/// stderr and swallowed, so a warehouse hiccup never aborts an
/// otherwise-successful fetch/republish. A report with no outcomes writes nothing.
/// Called from every path that owns a warehouse handle (server sweep,
/// `Workspaces.Fetch`, the fat-CLI fetch).
pub fn record_fetch_report(wh: &IcebergWarehouse, report: &crate::monitor::FetchReport) {
    let rows = rows_from_report(report);
    if !rows.is_empty() {
        let outcome = wh.block_on(append_clone_events(wh, &rows));
        if let Err(e) = outcome {
            let dropped = rows.len();
            eprintln!(
                "   ⚠ clone_events: dropped {} populate-outcome row(s) for `{}` (non-fatal): {e:#}",
                dropped,
                report.workspace
            );
        }
    }
}

/// Record one workspace-level `republish` outcome: on ok `detail` is the snapshot
/// id, on error it is the error chain.
///
/// The row's `member` is `*` (a republish covers the whole workspace, not a
/// single member) and its `remote` is empty. Non-fatal: an append failure is
/// printed to stderr and swallowed.
pub fn record_republish(
    wh: &IcebergWarehouse,
    workspace: &str,
    status_: &str,
    detail: &str,
    elapsed_ms: i64,
) {
    let rows = [CloneEventRow {
        ts_micros: Utc::now().timestamp_micros(),
        workspace: workspace.to_owned(),
        member: String::from("*"),
        remote: String::new(),
        op: op::REPUBLISH.to_owned(),
        status: status_.to_owned(),
        detail: detail.to_owned(),
        elapsed_ms,
    }];
    let outcome = wh.block_on(append_clone_events(wh, &rows));
    if let Err(e) = outcome {
        eprintln!("   ⚠ clone_events: dropped republish row for `{workspace}` (non-fatal): {e:#}");
    }
}

/// Which clone events to read; the scope argument of [`query_clone_events`].
#[derive(Debug, Clone)]
pub enum CloneSelector {
    /// Every event whose `workspace` matches exactly.
    Workspace(String),
    /// Everything in the table, across all workspaces.
    All,
}

/// Read clone events, scoped by `sel`, returned **newest-first** (`ts_micros`
/// descending) so the recent populate status is at the top — Iceberg gives no scan
/// order, so the reader sorts.
///
/// Rows sharing a timestamp (every row built from one `FetchReport` is stamped
/// with the same instant) are ordered by `workspace`, `member`, then `op`. The
/// result therefore does not depend on the unspecified scan order.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a scanned batch lacks a
/// column or has a column of an unexpected Arrow type.
pub async fn query_clone_events(
    wh: &IcebergWarehouse,
    sel: &CloneSelector,
) -> Result<Vec<CloneEventRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_CLONE_EVENTS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // `None` keeps every row; `Some(w)` keeps only rows of workspace `w`.
    let only_workspace: Option<&str> = match sel {
        CloneSelector::Workspace(w) => Some(w.as_str()),
        CloneSelector::All => None,
    };

    let mut out: Vec<CloneEventRow> = Vec::new();
    for b in &batches {
        let ts = col_ts(b, COL_TS_MICROS)?;
        let workspace = col_str(b, COL_WORKSPACE)?;
        let member = col_str(b, COL_MEMBER)?;
        let remote = col_str(b, COL_REMOTE)?;
        let op_ = col_str(b, COL_OP)?;
        let st = col_str(b, COL_STATUS)?;
        let detail = col_str(b, COL_DETAIL)?;
        let elapsed = col_i64(b, COL_ELAPSED_MS)?;
        for i in 0..b.num_rows() {
            // Filter on the borrowed column value before copying any strings out,
            // so out-of-scope rows never allocate.
            let ws = workspace.value(i);
            if matches!(only_workspace, Some(w) if w != ws) {
                continue;
            }
            out.push(CloneEventRow {
                ts_micros: ts.value(i),
                workspace: ws.to_string(),
                member: member.value(i).to_string(),
                remote: remote.value(i).to_string(),
                op: op_.value(i).to_string(),
                status: st.value(i).to_string(),
                detail: detail.value(i).to_string(),
                elapsed_ms: elapsed.value(i),
            });
        }
    }
    out.sort_by(|a, b| {
        b.ts_micros
            .cmp(&a.ts_micros)
            .then_with(|| a.workspace.cmp(&b.workspace))
            .then_with(|| a.member.cmp(&b.member))
            .then_with(|| a.op.cmp(&b.op))
    });
    Ok(out)
}

/// Render a human-readable populate-status view, one line per row in the order
/// given (pass [`query_clone_events`] output for newest-first).
///
/// Each line is `mark when op member — detail (elapsed)`:
/// - `mark` is `✓` for [`status::OK`], `✗` for [`status::ERROR`], `·` otherwise.
/// - `when` is RFC3339 UTC.
/// - `op` is left-padded to 12 columns.
/// - The ` — detail` part is omitted when `detail` is empty.
///
/// Backs `nornir workspace events`.
pub fn render_events(rows: &[CloneEventRow]) -> String {
    use std::fmt::Write as _;

    if rows.is_empty() {
        return "(no clone/populate events recorded)\n".to_string();
    }
    let mut out = String::new();
    for r in rows {
        let mark = match r.status.as_str() {
            status::OK => "✓",
            status::ERROR => "✗",
            _ => "·",
        };
        let when = ts_to_rfc3339(r.ts_micros);
        // Only separate member and detail when there is a detail to show.
        let sep = if r.detail.is_empty() { "" } else { " — " };
        writeln!(
            out,
            "{mark} {when}  {op:<12} {member}{sep}{detail} ({elapsed}ms)",
            op = r.op,
            member = r.member,
            detail = r.detail,
            elapsed = r.elapsed_ms,
        )
        .expect("writing to a String cannot fail");
    }
    out
}

// ─── column helpers ─────────────────────────────────────────────────────

/// Downcast column `idx` of `b` to the concrete Arrow array type `A`.
///
/// A missing column (index past the batch width) is reported as an error rather
/// than the panic `RecordBatch::column` would raise. `type_name` names `A` in the
/// type-mismatch error.
fn col_as<'a, A: 'static>(b: &'a RecordBatch, idx: usize, type_name: &str) -> Result<&'a A> {
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("clone_events batch has no col {idx} (only {} columns)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<A>()
        .ok_or_else(|| anyhow!("clone_events col {idx} is not {type_name}"))
}

/// Column `idx` as a UTF-8 string array.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    col_as::<StringArray>(b, idx, "StringArray")
}

/// Column `idx` as a 64-bit integer array.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int64Array> {
    col_as::<Int64Array>(b, idx, "Int64Array")
}

/// Column `idx` as a microsecond-precision timestamp array.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    col_as::<TimestampMicrosecondArray>(b, idx, "TimestampMicrosecondArray")
}

/// Format `ts_micros` as RFC3339 (UTC). Convenience for renderers/tests.
///
/// A value outside chrono's representable range is rendered as the raw
/// `<micros>µs`. Substituting the current time instead would make a corrupt row
/// look like a fresh event.
pub fn ts_to_rfc3339(ts_micros: i64) -> String {
    let parsed: Option<DateTime<Utc>> = Utc.timestamp_micros(ts_micros).single();
    match parsed {
        Some(dt) => dt.to_rfc3339(),
        None => format!("{ts_micros}µs"),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitor::{FetchReport, MemberOutcome};

    /// LAW 1 (inject-assert): feed a FetchReport with one OK member and one FAILED
    /// member → assert exactly two rows persist with the right
    /// (member, op, status, detail) and read back newest-first, scoped by
    /// workspace.
    #[test]
    fn fetch_report_failure_persists_and_reads_back() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        let report = FetchReport {
            workspace: "nordisk".into(),
            fetched: 2,
            changed: vec!["facett".into()],
            errors: vec![(
                "korp".into(),
                "clone-fetch https://github.com/nordisk/korp: Couldn't obtain Username".into(),
            )],
            outcomes: vec![
                MemberOutcome {
                    member: "facett".into(),
                    remote: "git@github.com:nordisk/facett.git".into(),
                    op: op::CLONE_FETCH.into(),
                    status: status::OK.into(),
                    detail: "abc123".into(),
                    elapsed_ms: 42,
                },
                MemberOutcome {
                    member: "korp".into(),
                    remote: "https://github.com/nordisk/korp".into(),
                    op: op::CLONE_FETCH.into(),
                    status: status::ERROR.into(),
                    detail: "clone-fetch https://github.com/nordisk/korp: Couldn't obtain Username"
                        .into(),
                    elapsed_ms: 7,
                },
            ],
        };

        // Write through the (non-fatal) recorder — the same call the server uses.
        record_fetch_report(&wh, &report);

        // Read back, scoped to the workspace, newest-first.
        let rows = wh
            .block_on(query_clone_events(&wh, &CloneSelector::Workspace("nordisk".into())))
            .unwrap();
        assert_eq!(rows.len(), 2, "one row per fetched member");

        // The FAILED member is first-class: find it and assert the error detail
        // round-tripped exactly.
        let failed: Vec<_> = rows.iter().filter(|r| r.status == status::ERROR).collect();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].member, "korp");
        assert_eq!(failed[0].op, "clone-fetch");
        assert_eq!(failed[0].remote, "https://github.com/nordisk/korp");
        assert!(
            failed[0].detail.contains("Couldn't obtain Username"),
            "the error chain is readable, got: {}",
            failed[0].detail
        );
        assert_eq!(failed[0].elapsed_ms, 7);

        // The OK member round-trips with its SHA in `detail`.
        let ok: Vec<_> = rows.iter().filter(|r| r.status == status::OK).collect();
        assert_eq!(ok.len(), 1);
        assert_eq!(ok[0].member, "facett");
        assert_eq!(ok[0].detail, "abc123");

        // A different workspace's scope sees nothing.
        let other = wh
            .block_on(query_clone_events(&wh, &CloneSelector::Workspace("holger".into())))
            .unwrap();
        assert!(other.is_empty(), "workspace scope isolates events");

        // The human render names the failed member + its error.
        let txt = render_events(&rows);
        assert!(txt.contains("korp"));
        assert!(txt.contains("Couldn't obtain Username"));
        assert!(txt.contains('✗'), "failure marker present");
        assert!(txt.contains('✓'), "success marker present");
        assert!(txt.contains("korp — clone-fetch"), "member and detail are separated");

        // JSON round-trips for the RPC contract.
        let json = serde_json::to_string(&rows).unwrap();
        let back: Vec<CloneEventRow> = serde_json::from_str(&json).unwrap();
        assert_eq!(back, rows);
    }

    /// A republish outcome is recordable as its own workspace-level row.
    #[test]
    fn republish_outcome_records() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        record_republish(&wh, "nordisk", status::OK, "snapshot-99", 1234);
        let rows = wh.block_on(query_clone_events(&wh, &CloneSelector::All)).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].op, "republish");
        assert_eq!(rows[0].member, "*");
        assert_eq!(rows[0].detail, "snapshot-99");
        assert_eq!(rows[0].elapsed_ms, 1234);
    }

    /// The rendered line layout is exact: mark, RFC3339 time, padded op, member,
    /// optional ` — detail`, elapsed.
    #[test]
    fn render_events_line_format() {
        fn row(member: &str, op_: &str, st: &str, detail: &str, elapsed_ms: i64) -> CloneEventRow {
            CloneEventRow {
                ts_micros: 0,
                workspace: "w".into(),
                member: member.into(),
                remote: String::new(),
                op: op_.into(),
                status: st.into(),
                detail: detail.into(),
                elapsed_ms,
            }
        }
        let rows = [
            row("korp", op::CLONE_FETCH, status::ERROR, "boom", 7),
            row("*", op::REPUBLISH, status::OK, "", 1),
            row("x", op::REPUBLISH, "weird", "", 0),
        ];
        assert_eq!(
            render_events(&rows),
            "✗ 1970-01-01T00:00:00+00:00  clone-fetch  korp — boom (7ms)\n\
             ✓ 1970-01-01T00:00:00+00:00  republish    * (1ms)\n\
             · 1970-01-01T00:00:00+00:00  republish    x (0ms)\n"
        );
        assert_eq!(render_events(&[]), "(no clone/populate events recorded)\n");
    }

    /// Out-of-range timestamps render as raw micros, not as "now".
    #[test]
    fn ts_to_rfc3339_handles_out_of_range() {
        assert_eq!(ts_to_rfc3339(0), "1970-01-01T00:00:00+00:00");
        assert_eq!(ts_to_rfc3339(i64::MIN), format!("{}µs", i64::MIN));
    }
}