nornir 0.5.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Iceberg writer/reader for the **airgap-op DAG** (`airgap_events`) — EPIC
//! AIRGAP / AIRGAP8.
//!
//! The durable twin of the lean `skidbladnir` crate's JSONL event sink
//! (`skidbladnir::events`): on an air-gapped target with no warehouse server
//! the crate appends events to `<warehouse_root>/airgap_events.jsonl`; when the
//! fat `nornir` CLI runs against a real warehouse, [`append_airgap_events`]
//! ingests those same rows into the historized Iceberg table so the bring-up DAG
//! is queryable + time-travelable, exactly like `release_events`.
//!
//! Schema: [`super::iceberg_schema::airgap_events`].

use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, ListBuilder, RecordBatch, StringArray, StringBuilder, TimestampMicrosecondArray};
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;

use super::iceberg::{IcebergWarehouse, TABLE_AIRGAP_EVENTS, append_batch};

const COL_DEPENDS_ON: usize = 8;

/// One row of the `airgap_events` table — identical fields to the lean crate's
/// `AirgapEvent`, re-declared here so the warehouse layer needn't depend on the
/// `skidbladnir` crate (the JSONL bytes are the contract).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AirgapEventRow {
    pub run_id: String,
    pub seq: i64,
    pub ts_micros: i64,
    pub product: String,
    pub op: String,
    pub phase: String,
    pub status: String,
    pub detail: String,
    /// Bring-up producers this op waited on (None = root op).
    pub depends_on: Option<Vec<String>>,
}

/// Read a `skidbladnir` JSONL event file (one JSON object per line) into rows.
/// `depends_on` becomes `None` when empty (matches the optional Iceberg column).
pub fn read_jsonl(path: &Path) -> Result<Vec<AirgapEventRow>> {
    #[derive(serde::Deserialize)]
    struct Raw {
        run_id: String,
        seq: i64,
        ts_micros: i64,
        product: String,
        op: String,
        phase: String,
        status: String,
        #[serde(default)]
        detail: String,
        #[serde(default)]
        depends_on: Vec<String>,
    }
    let text = std::fs::read_to_string(path)?;
    let mut rows = Vec::new();
    for line in text.lines().filter(|l| !l.trim().is_empty()) {
        let r: Raw = serde_json::from_str(line)?;
        rows.push(AirgapEventRow {
            run_id: r.run_id,
            seq: r.seq,
            ts_micros: r.ts_micros,
            product: r.product,
            op: r.op,
            phase: r.phase,
            status: r.status,
            detail: r.detail,
            depends_on: if r.depends_on.is_empty() { None } else { Some(r.depends_on) },
        });
    }
    rows.sort_by_key(|r| (r.run_id.clone(), r.seq));
    Ok(rows)
}

/// Append a batch of `airgap_events` rows (one Iceberg snapshot). Mirrors
/// [`super::release_events::append_release_events`].
pub async fn append_airgap_events(wh: &IcebergWarehouse, rows: &[AirgapEventRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_AIRGAP_EVENTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    let dep_elem = match arrow_schema.field(COL_DEPENDS_ON).data_type() {
        arrow::datatypes::DataType::List(elem) | arrow::datatypes::DataType::LargeList(elem) => {
            elem.clone()
        }
        other => return Err(anyhow!("depends_on column expected List, got {other:?}")),
    };
    let mut deps_b: ListBuilder<StringBuilder> =
        ListBuilder::new(StringBuilder::new()).with_field(dep_elem);
    for r in rows {
        match &r.depends_on {
            None => deps_b.append(false),
            Some(v) => {
                for d in v {
                    deps_b.values().append_value(d);
                }
                deps_b.append(true);
            }
        }
    }

    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(rows.iter().map(|r| r.run_id.clone()).collect::<Vec<_>>())),
        Arc::new(Int64Array::from(rows.iter().map(|r| r.seq).collect::<Vec<_>>())),
        Arc::new(
            TimestampMicrosecondArray::from(rows.iter().map(|r| r.ts_micros).collect::<Vec<_>>())
                .with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from(rows.iter().map(|r| r.product.clone()).collect::<Vec<_>>())),
        Arc::new(StringArray::from(rows.iter().map(|r| r.op.clone()).collect::<Vec<_>>())),
        Arc::new(StringArray::from(rows.iter().map(|r| r.phase.clone()).collect::<Vec<_>>())),
        Arc::new(StringArray::from(rows.iter().map(|r| r.status.clone()).collect::<Vec<_>>())),
        Arc::new(StringArray::from(rows.iter().map(|r| r.detail.clone()).collect::<Vec<_>>())),
        Arc::new(deps_b.finish()),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

// ─── AUT9 CLI command outcomes ───────────────────────────────────────────────
// The airgap READ commands (`events`, `fetch`/verify, `unfold`/plan) rendered as
// the uniform `crate::cli_outcome::CommandOutcome` — the CLI twin of a Facet's
// `state_json`. Each builder takes ALREADY-fetched data (rows / a verify report /
// a rendered plan) and only SHAPES it; it never reads disk or runs a backend
// (CommandOutcome is presentation, never a transport). `ok ⟺ a real, non-empty,
// non-failed result` (RAGNARÖK: empty / tampered ⇒ RED, never silently green).
// See `.nornir/cli-command-contract.md`.

/// `airgap events` — the bring-up DAG read back as a [`CommandOutcome`]. `rows`
/// are already parsed from the JSONL sink (`read_jsonl`); this only shapes +
/// renders them. `ok ⟺ ≥1 event` (an empty/absent DAG is RED).
pub fn events_outcome(
    path: &Path,
    rows: &[AirgapEventRow],
) -> crate::cli_outcome::CommandOutcome {
    use crate::cli_outcome::CommandOutcome;
    if rows.is_empty() {
        return CommandOutcome::fail(
            "airgap events",
            format!("no airgap_events at {} (run `nornir airgap start`)", path.display()),
        );
    }
    let mut human = String::new();
    for ev in rows {
        let deps = ev.depends_on.as_ref().filter(|d| !d.is_empty());
        human.push_str(&format!(
            "  [{}] {:>3} {:<14} {:<8} {:<5} {:<7} {} {}\n",
            ev.run_id,
            ev.seq,
            ev.product,
            ev.op,
            ev.phase,
            ev.status,
            ev.detail,
            match deps {
                Some(d) => format!("{}", d.join(",")),
                None => String::new(),
            }
        ));
    }
    let data = serde_json::json!({ "events": rows });
    CommandOutcome::ok("airgap events", data, human.trim_end().to_string())
}

/// `airgap fetch` (AIRGAP3 — verify a received bundle) as a [`CommandOutcome`].
/// Takes the already-computed verify fields (from
/// `skidbladnir::fetch::verify_on_receipt`). `ok ⟺ manifest+contents intact`
/// (a tampered/failed bundle is RED, carrying the bad entries in `data`).
pub fn verify_outcome(
    bundle: &Path,
    manifest_root_ok: bool,
    bundle_hash_ok: bool,
    bad_entries: &[String],
) -> crate::cli_outcome::CommandOutcome {
    use crate::cli_outcome::CommandOutcome;
    let ok = manifest_root_ok && bundle_hash_ok && bad_entries.is_empty();
    let data = serde_json::json!({
        "bundle": bundle.display().to_string(),
        "manifest_root_ok": manifest_root_ok,
        "bundle_hash_ok": bundle_hash_ok,
        "bad_entries": bad_entries,
        "ok": ok,
    });
    if ok {
        CommandOutcome::ok(
            "airgap fetch",
            data,
            format!(
                "✓ bundle verified: manifest+contents intact, zero network ({})",
                bundle.display()
            ),
        )
    } else {
        CommandOutcome::fail(
            "airgap fetch",
            format!(
                "✗ bundle verify FAILED for {} (manifest_root_ok={manifest_root_ok}, bundle_hash_ok={bundle_hash_ok}, bad={bad_entries:?})",
                bundle.display()
            ),
        )
    }
}

/// `airgap unfold` (AIRGAP7 — the provisioning PLAN for a target) as a
/// [`CommandOutcome`]. Takes the already-rendered plan `lines` (the backend's
/// qemu argv / HCL / reset body) for `backend`/`target`. `ok ⟺ a non-empty plan`
/// (an unknown backend yields no lines ⇒ RED, handled by the caller's bail).
pub fn unfold_plan_outcome(
    backend: &str,
    target: &str,
    lines: &[String],
) -> crate::cli_outcome::CommandOutcome {
    use crate::cli_outcome::CommandOutcome;
    if lines.is_empty() {
        return CommandOutcome::fail(
            "airgap unfold",
            format!("no provisioning plan for backend `{backend}` target `{target}`"),
        );
    }
    let data = serde_json::json!({
        "backend": backend,
        "target": target,
        "plan": lines,
    });
    CommandOutcome::ok("airgap unfold", data, lines.join("\n"))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// LAW 1 (inject-assert): parse a real JSONL event file (the lean crate's
    /// on-disk format) → assert the rows decode with the right fields and
    /// `depends_on` is `None` for roots / `Some` for edges.
    #[test]
    fn jsonl_decodes_airgap_events() {
        let dir = tempfile::tempdir().unwrap();
        let f = dir.path().join("airgap_events.jsonl");
        std::fs::write(
            &f,
            "{\"run_id\":\"r1\",\"seq\":0,\"ts_micros\":1,\"product\":\"holger\",\"op\":\"install\",\"phase\":\"end\",\"status\":\"ok\",\"detail\":\"\",\"depends_on\":[]}\n\
             {\"run_id\":\"r1\",\"seq\":1,\"ts_micros\":2,\"product\":\"nornir-server\",\"op\":\"start\",\"phase\":\"end\",\"status\":\"ok\",\"detail\":\"healthy\",\"depends_on\":[\"holger\"]}\n",
        )
        .unwrap();

        let rows = read_jsonl(&f).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].product, "holger");
        assert_eq!(rows[0].op, "install");
        assert_eq!(rows[0].depends_on, None, "root op has no deps");
        assert_eq!(rows[1].product, "nornir-server");
        assert_eq!(rows[1].depends_on, Some(vec!["holger".to_string()]));
    }

    fn ev(run: &str, seq: i64, product: &str, op: &str, deps: Option<Vec<String>>) -> AirgapEventRow {
        AirgapEventRow {
            run_id: run.into(),
            seq,
            ts_micros: seq + 1,
            product: product.into(),
            op: op.into(),
            phase: "end".into(),
            status: "ok".into(),
            detail: String::new(),
            depends_on: deps,
        }
    }

    #[test]
    fn events_outcome_empty_is_red_real_is_sannr() {
        let p = std::path::Path::new("/tmp/airgap_events.jsonl");
        // RAGNARÖK: an absent/empty DAG is RED.
        let red = events_outcome(p, &[]);
        assert_eq!(red.command, "airgap events");
        assert!(!red.is_sannr());
        assert!(red.human.contains("no airgap_events"));
        // real rows ⇒ sannr, events surfaced as readable data.
        let rows = vec![
            ev("r1", 0, "holger", "install", None),
            ev("r1", 1, "nornir-server", "start", Some(vec!["holger".into()])),
        ];
        let o = events_outcome(p, &rows);
        assert!(o.is_sannr());
        let arr = o.data["events"].as_array().unwrap();
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[1]["product"], serde_json::json!("nornir-server"));
        assert!(o.human.contains("holger"), "dep edge rendered: {}", o.human);
    }

    #[test]
    fn verify_outcome_intact_is_sannr_tampered_is_red() {
        let b = std::path::Path::new("/srv/lake.tar.zst");
        let ok = verify_outcome(b, true, true, &[]);
        assert_eq!(ok.command, "airgap fetch");
        assert!(ok.is_sannr(), "intact bundle is a true (sannr) outcome");
        assert_eq!(ok.data["ok"], serde_json::json!(true));
        assert_eq!(ok.data["bundle"], serde_json::json!("/srv/lake.tar.zst"));
        // a bad entry ⇒ verify FAILS ⇒ RED.
        let bad = verify_outcome(b, true, false, &["bin/holger".into()]);
        assert!(!bad.is_sannr());
        assert!(bad.human.contains("FAILED"));
        assert!(bad.human.contains("bin/holger"));
    }

    #[test]
    fn unfold_plan_outcome_real_plan_is_sannr_empty_is_red() {
        let lines = vec![
            "qemu-system-x86_64 -accel kvm".to_string(),
            "  -m 4096".to_string(),
        ];
        let o = unfold_plan_outcome("kvm", "njord", &lines);
        assert_eq!(o.command, "airgap unfold");
        assert!(o.is_sannr());
        assert_eq!(o.data["backend"], serde_json::json!("kvm"));
        assert_eq!(o.data["plan"].as_array().unwrap().len(), 2);
        assert!(o.human.contains("qemu-system-x86_64"));
        // no plan ⇒ RED.
        assert!(!unfold_plan_outcome("kvm", "njord", &[]).is_sannr());
    }
}