use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, ListBuilder, RecordBatch, StringArray, StringBuilder, TimestampMicrosecondArray};
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use super::iceberg::{IcebergWarehouse, TABLE_AIRGAP_EVENTS, append_batch};
const COL_DEPENDS_ON: usize = 8;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AirgapEventRow {
pub run_id: String,
pub seq: i64,
pub ts_micros: i64,
pub product: String,
pub op: String,
pub phase: String,
pub status: String,
pub detail: String,
pub depends_on: Option<Vec<String>>,
}
pub fn read_jsonl(path: &Path) -> Result<Vec<AirgapEventRow>> {
#[derive(serde::Deserialize)]
struct Raw {
run_id: String,
seq: i64,
ts_micros: i64,
product: String,
op: String,
phase: String,
status: String,
#[serde(default)]
detail: String,
#[serde(default)]
depends_on: Vec<String>,
}
let text = std::fs::read_to_string(path)?;
let mut rows = Vec::new();
for line in text.lines().filter(|l| !l.trim().is_empty()) {
let r: Raw = serde_json::from_str(line)?;
rows.push(AirgapEventRow {
run_id: r.run_id,
seq: r.seq,
ts_micros: r.ts_micros,
product: r.product,
op: r.op,
phase: r.phase,
status: r.status,
detail: r.detail,
depends_on: if r.depends_on.is_empty() { None } else { Some(r.depends_on) },
});
}
rows.sort_by_key(|r| (r.run_id.clone(), r.seq));
Ok(rows)
}
pub async fn append_airgap_events(wh: &IcebergWarehouse, rows: &[AirgapEventRow]) -> Result<()> {
if rows.is_empty() {
return Ok(());
}
let table = wh.catalog().load_table(&wh.table_ident(TABLE_AIRGAP_EVENTS)).await?;
let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let dep_elem = match arrow_schema.field(COL_DEPENDS_ON).data_type() {
arrow::datatypes::DataType::List(elem) | arrow::datatypes::DataType::LargeList(elem) => {
elem.clone()
}
other => return Err(anyhow!("depends_on column expected List, got {other:?}")),
};
let mut deps_b: ListBuilder<StringBuilder> =
ListBuilder::new(StringBuilder::new()).with_field(dep_elem);
for r in rows {
match &r.depends_on {
None => deps_b.append(false),
Some(v) => {
for d in v {
deps_b.values().append_value(d);
}
deps_b.append(true);
}
}
}
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(rows.iter().map(|r| r.run_id.clone()).collect::<Vec<_>>())),
Arc::new(Int64Array::from(rows.iter().map(|r| r.seq).collect::<Vec<_>>())),
Arc::new(
TimestampMicrosecondArray::from(rows.iter().map(|r| r.ts_micros).collect::<Vec<_>>())
.with_timezone("+00:00"),
),
Arc::new(StringArray::from(rows.iter().map(|r| r.product.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| r.op.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| r.phase.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| r.status.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(rows.iter().map(|r| r.detail.clone()).collect::<Vec<_>>())),
Arc::new(deps_b.finish()),
];
let batch = RecordBatch::try_new(arrow_schema, cols)?;
append_batch(wh.catalog(), table, batch).await?;
Ok(())
}
pub fn events_outcome(
path: &Path,
rows: &[AirgapEventRow],
) -> crate::cli_outcome::CommandOutcome {
use crate::cli_outcome::CommandOutcome;
if rows.is_empty() {
return CommandOutcome::fail(
"airgap events",
format!("no airgap_events at {} (run `nornir airgap start`)", path.display()),
);
}
let mut human = String::new();
for ev in rows {
let deps = ev.depends_on.as_ref().filter(|d| !d.is_empty());
human.push_str(&format!(
" [{}] {:>3} {:<14} {:<8} {:<5} {:<7} {} {}\n",
ev.run_id,
ev.seq,
ev.product,
ev.op,
ev.phase,
ev.status,
ev.detail,
match deps {
Some(d) => format!("← {}", d.join(",")),
None => String::new(),
}
));
}
let data = serde_json::json!({ "events": rows });
CommandOutcome::ok("airgap events", data, human.trim_end().to_string())
}
pub fn verify_outcome(
bundle: &Path,
manifest_root_ok: bool,
bundle_hash_ok: bool,
bad_entries: &[String],
) -> crate::cli_outcome::CommandOutcome {
use crate::cli_outcome::CommandOutcome;
let ok = manifest_root_ok && bundle_hash_ok && bad_entries.is_empty();
let data = serde_json::json!({
"bundle": bundle.display().to_string(),
"manifest_root_ok": manifest_root_ok,
"bundle_hash_ok": bundle_hash_ok,
"bad_entries": bad_entries,
"ok": ok,
});
if ok {
CommandOutcome::ok(
"airgap fetch",
data,
format!(
"✓ bundle verified: manifest+contents intact, zero network ({})",
bundle.display()
),
)
} else {
CommandOutcome::fail(
"airgap fetch",
format!(
"✗ bundle verify FAILED for {} (manifest_root_ok={manifest_root_ok}, bundle_hash_ok={bundle_hash_ok}, bad={bad_entries:?})",
bundle.display()
),
)
}
}
pub fn unfold_plan_outcome(
backend: &str,
target: &str,
lines: &[String],
) -> crate::cli_outcome::CommandOutcome {
use crate::cli_outcome::CommandOutcome;
if lines.is_empty() {
return CommandOutcome::fail(
"airgap unfold",
format!("no provisioning plan for backend `{backend}` target `{target}`"),
);
}
let data = serde_json::json!({
"backend": backend,
"target": target,
"plan": lines,
});
CommandOutcome::ok("airgap unfold", data, lines.join("\n"))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn jsonl_decodes_airgap_events() {
let dir = tempfile::tempdir().unwrap();
let f = dir.path().join("airgap_events.jsonl");
std::fs::write(
&f,
"{\"run_id\":\"r1\",\"seq\":0,\"ts_micros\":1,\"product\":\"holger\",\"op\":\"install\",\"phase\":\"end\",\"status\":\"ok\",\"detail\":\"\",\"depends_on\":[]}\n\
{\"run_id\":\"r1\",\"seq\":1,\"ts_micros\":2,\"product\":\"nornir-server\",\"op\":\"start\",\"phase\":\"end\",\"status\":\"ok\",\"detail\":\"healthy\",\"depends_on\":[\"holger\"]}\n",
)
.unwrap();
let rows = read_jsonl(&f).unwrap();
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].product, "holger");
assert_eq!(rows[0].op, "install");
assert_eq!(rows[0].depends_on, None, "root op has no deps");
assert_eq!(rows[1].product, "nornir-server");
assert_eq!(rows[1].depends_on, Some(vec!["holger".to_string()]));
}
fn ev(run: &str, seq: i64, product: &str, op: &str, deps: Option<Vec<String>>) -> AirgapEventRow {
AirgapEventRow {
run_id: run.into(),
seq,
ts_micros: seq + 1,
product: product.into(),
op: op.into(),
phase: "end".into(),
status: "ok".into(),
detail: String::new(),
depends_on: deps,
}
}
#[test]
fn events_outcome_empty_is_red_real_is_sannr() {
let p = std::path::Path::new("/tmp/airgap_events.jsonl");
let red = events_outcome(p, &[]);
assert_eq!(red.command, "airgap events");
assert!(!red.is_sannr());
assert!(red.human.contains("no airgap_events"));
let rows = vec![
ev("r1", 0, "holger", "install", None),
ev("r1", 1, "nornir-server", "start", Some(vec!["holger".into()])),
];
let o = events_outcome(p, &rows);
assert!(o.is_sannr());
let arr = o.data["events"].as_array().unwrap();
assert_eq!(arr.len(), 2);
assert_eq!(arr[1]["product"], serde_json::json!("nornir-server"));
assert!(o.human.contains("holger"), "dep edge rendered: {}", o.human);
}
#[test]
fn verify_outcome_intact_is_sannr_tampered_is_red() {
let b = std::path::Path::new("/srv/lake.tar.zst");
let ok = verify_outcome(b, true, true, &[]);
assert_eq!(ok.command, "airgap fetch");
assert!(ok.is_sannr(), "intact bundle is a true (sannr) outcome");
assert_eq!(ok.data["ok"], serde_json::json!(true));
assert_eq!(ok.data["bundle"], serde_json::json!("/srv/lake.tar.zst"));
let bad = verify_outcome(b, true, false, &["bin/holger".into()]);
assert!(!bad.is_sannr());
assert!(bad.human.contains("FAILED"));
assert!(bad.human.contains("bin/holger"));
}
#[test]
fn unfold_plan_outcome_real_plan_is_sannr_empty_is_red() {
let lines = vec![
"qemu-system-x86_64 -accel kvm".to_string(),
" -m 4096".to_string(),
];
let o = unfold_plan_outcome("kvm", "njord", &lines);
assert_eq!(o.command, "airgap unfold");
assert!(o.is_sannr());
assert_eq!(o.data["backend"], serde_json::json!("kvm"));
assert_eq!(o.data["plan"].as_array().unwrap().len(), 2);
assert!(o.human.contains("qemu-system-x86_64"));
assert!(!unfold_plan_outcome("kvm", "njord", &[]).is_sannr());
}
}