nornir 0.4.34

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Iceberg writer/reader for the **airgap-op DAG** (`airgap_events`) — EPIC
//! AIRGAP / AIRGAP8.
//!
//! The durable twin of the lean `nornir-airgap` crate's JSONL event sink
//! (`nornir_airgap::events`): on an air-gapped target with no warehouse server
//! the crate appends events to `<warehouse_root>/airgap_events.jsonl`; when the
//! fat `nornir` CLI runs against a real warehouse, [`append_airgap_events`]
//! ingests those same rows into the historized Iceberg table so the bring-up DAG
//! is queryable + time-travelable, exactly like `release_events`.
//!
//! Schema: [`super::iceberg_schema::airgap_events`].

use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, ListBuilder, RecordBatch, StringArray, StringBuilder, TimestampMicrosecondArray};
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;

use super::iceberg::{IcebergWarehouse, TABLE_AIRGAP_EVENTS, append_batch};

/// Zero-based position of the `depends_on` list column in the table's Arrow
/// schema. `append_airgap_events` uses it to look up the list element field and
/// places the dependency array at this same position in the record batch, so it
/// must stay in sync with the column order built there.
const COL_DEPENDS_ON: usize = 8;

/// One row of the `airgap_events` table — identical fields to the lean crate's
/// `AirgapEvent`, re-declared here so the warehouse layer needn't depend on the
/// `nornir-airgap` crate (the JSONL bytes are the contract).
///
/// Field order mirrors the column order that `append_airgap_events` writes.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AirgapEventRow {
    /// Identifier of the run this event belongs to; primary sort key in
    /// `read_jsonl`.
    pub run_id: String,
    /// Sequence number of the event; secondary sort key in `read_jsonl`.
    pub seq: i64,
    /// Event time in microseconds; written as a microsecond timestamp column
    /// tagged with the `+00:00` timezone.
    pub ts_micros: i64,
    /// Product the op applies to (e.g. `"holger"` in the unit test).
    pub product: String,
    /// Operation name (e.g. `"install"`, `"start"` in the unit test).
    pub op: String,
    /// Phase marker of the op (e.g. `"end"` in the unit test).
    // NOTE(review): the full set of phase values is defined by the producer
    // crate and is not visible here.
    pub phase: String,
    /// Outcome of the op (e.g. `"ok"` in the unit test).
    pub status: String,
    /// Free-form detail text; `read_jsonl` defaults it to the empty string when
    /// the JSONL object has no `detail` key.
    pub detail: String,
    /// Bring-up producers this op waited on (None = root op). `read_jsonl` maps
    /// a missing or empty JSON array to `None`.
    pub depends_on: Option<Vec<String>>,
}

/// Read a `nornir-airgap` JSONL event file (one JSON object per line) into rows.
/// `depends_on` becomes `None` when empty (matches the optional Iceberg column).
pub fn read_jsonl(path: &Path) -> Result<Vec<AirgapEventRow>> {
    #[derive(serde::Deserialize)]
    struct Raw {
        run_id: String,
        seq: i64,
        ts_micros: i64,
        product: String,
        op: String,
        phase: String,
        status: String,
        #[serde(default)]
        detail: String,
        #[serde(default)]
        depends_on: Vec<String>,
    }
    let text = std::fs::read_to_string(path)?;
    let mut rows = Vec::new();
    for line in text.lines().filter(|l| !l.trim().is_empty()) {
        let r: Raw = serde_json::from_str(line)?;
        rows.push(AirgapEventRow {
            run_id: r.run_id,
            seq: r.seq,
            ts_micros: r.ts_micros,
            product: r.product,
            op: r.op,
            phase: r.phase,
            status: r.status,
            detail: r.detail,
            depends_on: if r.depends_on.is_empty() { None } else { Some(r.depends_on) },
        });
    }
    rows.sort_by_key(|r| (r.run_id.clone(), r.seq));
    Ok(rows)
}

/// Append a batch of `airgap_events` rows (one Iceberg snapshot). Mirrors
/// [`super::release_events::append_release_events`].
///
/// An empty `rows` slice is a no-op: the table is not loaded and nothing is
/// appended.
///
/// # Errors
///
/// Returns an error if the table cannot be loaded, its schema cannot be
/// converted to Arrow, the schema has no column at [`COL_DEPENDS_ON`] or that
/// column is not a list type, the record batch does not match the schema, or
/// the append itself fails.
pub async fn append_airgap_events(wh: &IcebergWarehouse, rows: &[AirgapEventRow]) -> Result<()> {
    /// Build a non-null UTF-8 column directly from borrowed strings, avoiding a
    /// per-row `String` clone and an intermediate `Vec`.
    fn utf8_column<'a>(values: impl Iterator<Item = &'a str>) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(values))
    }

    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_AIRGAP_EVENTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // `Schema::field(i)` panics on an out-of-range index; the schema comes from
    // the catalog at runtime, so report drift as an error instead.
    let dep_field = arrow_schema.fields().get(COL_DEPENDS_ON).ok_or_else(|| {
        anyhow!(
            "airgap_events schema has {} columns, expected depends_on at index {COL_DEPENDS_ON}",
            arrow_schema.fields().len()
        )
    })?;
    // Reuse the schema's own list element field so the built array's element
    // field matches the schema's.
    // NOTE(review): `LargeList` is accepted here, but `ListBuilder` produces a
    // 32-bit-offset list array, so a `LargeList` schema would presumably be
    // rejected later by `RecordBatch::try_new` — confirm whether this arm is
    // reachable.
    let dep_elem = match dep_field.data_type() {
        arrow::datatypes::DataType::List(elem) | arrow::datatypes::DataType::LargeList(elem) => {
            Arc::clone(elem)
        }
        other => return Err(anyhow!("depends_on column expected List, got {other:?}")),
    };
    let mut deps_b: ListBuilder<StringBuilder> =
        ListBuilder::new(StringBuilder::new()).with_field(dep_elem);
    for r in rows {
        match &r.depends_on {
            // Root op: null list entry.
            None => deps_b.append(false),
            Some(v) => {
                for d in v {
                    deps_b.values().append_value(d);
                }
                deps_b.append(true);
            }
        }
    }

    // Column order must match the table schema; `depends_on` lands at
    // `COL_DEPENDS_ON`.
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8_column(rows.iter().map(|r| r.run_id.as_str())),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.seq))),
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        utf8_column(rows.iter().map(|r| r.product.as_str())),
        utf8_column(rows.iter().map(|r| r.op.as_str())),
        utf8_column(rows.iter().map(|r| r.phase.as_str())),
        utf8_column(rows.iter().map(|r| r.status.as_str())),
        utf8_column(rows.iter().map(|r| r.detail.as_str())),
        Arc::new(deps_b.finish()),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// LAW 1 (inject-assert): write a two-line JSONL file in the lean crate's
    /// on-disk format, read it back through `read_jsonl`, and check that the
    /// fields decode and that `depends_on` is `None` for the root op and
    /// `Some` for the op with an edge.
    #[test]
    fn jsonl_decodes_airgap_events() {
        // Two events, each terminated by a newline.
        let contents = concat!(
            r#"{"run_id":"r1","seq":0,"ts_micros":1,"product":"holger","op":"install","phase":"end","status":"ok","detail":"","depends_on":[]}"#,
            "\n",
            r#"{"run_id":"r1","seq":1,"ts_micros":2,"product":"nornir-server","op":"start","phase":"end","status":"ok","detail":"healthy","depends_on":["holger"]}"#,
            "\n",
        );

        // Keep the temp dir guard alive until the file has been read.
        let scratch = tempfile::tempdir().unwrap();
        let events_path = scratch.path().join("airgap_events.jsonl");
        std::fs::write(&events_path, contents).unwrap();

        let rows = read_jsonl(&events_path).unwrap();
        assert_eq!(rows.len(), 2);

        let (root, edge) = (&rows[0], &rows[1]);

        assert_eq!(root.product, "holger");
        assert_eq!(root.op, "install");
        assert_eq!(root.depends_on, None, "root op has no deps");

        assert_eq!(edge.product, "nornir-server");
        assert_eq!(edge.depends_on, Some(vec!["holger".to_string()]));
    }
}