use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, Int64Array, ListBuilder, RecordBatch, StringArray, StringBuilder, TimestampMicrosecondArray};
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;

use super::iceberg::{IcebergWarehouse, TABLE_AIRGAP_EVENTS, append_batch};
/// Zero-based index of the `depends_on` column in the Arrow schema derived from the
/// airgap events table. It must match the position of the `depends_on` column in the
/// batch that [`append_airgap_events`] builds (the ninth and last column there).
const COL_DEPENDS_ON: usize = 8;
/// One airgap event, as produced by [`read_jsonl`] and appended to the Iceberg
/// airgap-events table by [`append_airgap_events`].
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct AirgapEventRow {
/// Run the event belongs to; primary sort key in [`read_jsonl`].
pub run_id: String,
/// Ordering key within a run; secondary sort key in [`read_jsonl`].
pub seq: i64,
/// Event time in microseconds, written as a `+00:00` timestamp column.
pub ts_micros: i64,
/// Product the event concerns (e.g. `holger`).
pub product: String,
/// Operation name (e.g. `install`, `start`).
pub op: String,
/// Phase of the operation (e.g. `end`).
pub phase: String,
/// Status string (e.g. `ok`).
pub status: String,
/// Free-form detail text; [`read_jsonl`] uses an empty string when the record omits it.
pub detail: String,
/// Names this event depends on. `None` when the source record listed no
/// dependencies; written to the table as a null list entry.
pub depends_on: Option<Vec<String>>,
}
pub fn read_jsonl(path: &Path) -> Result<Vec<AirgapEventRow>> {
#[derive(serde::Deserialize)]
struct Raw {
run_id: String,
seq: i64,
ts_micros: i64,
product: String,
op: String,
phase: String,
status: String,
#[serde(default)]
detail: String,
#[serde(default)]
depends_on: Vec<String>,
}
let text = std::fs::read_to_string(path)?;
let mut rows = Vec::new();
for line in text.lines().filter(|l| !l.trim().is_empty()) {
let r: Raw = serde_json::from_str(line)?;
rows.push(AirgapEventRow {
run_id: r.run_id,
seq: r.seq,
ts_micros: r.ts_micros,
product: r.product,
op: r.op,
phase: r.phase,
status: r.status,
detail: r.detail,
depends_on: if r.depends_on.is_empty() { None } else { Some(r.depends_on) },
});
}
rows.sort_by_key(|r| (r.run_id.clone(), r.seq));
Ok(rows)
}
/// Appends `rows` to the Iceberg airgap-events table as a single Arrow batch.
///
/// The Arrow schema is derived from the table's *current* Iceberg schema. The
/// columns built here are matched to it by position, so their order must follow
/// the table: `run_id`, `seq`, `ts_micros`, `product`, `op`, `phase`, `status`,
/// `detail`, `depends_on`. An empty `rows` slice returns immediately without
/// touching the catalog.
///
/// # Errors
///
/// Returns an error if any of the following happens:
/// - the table cannot be loaded;
/// - its schema cannot be converted to Arrow;
/// - the schema has no `List` column at [`COL_DEPENDS_ON`];
/// - the built columns do not match the schema;
/// - the append itself fails.
pub async fn append_airgap_events(wh: &IcebergWarehouse, rows: &[AirgapEventRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_AIRGAP_EVENTS))
        .await
        .context("loading airgap events table")?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    // The only schema-dependent column; validate it before building the others.
    let depends_on = depends_on_column(&arrow_schema, rows)?;

    let cols: Vec<Arc<dyn Array>> = vec![
        utf8_column(rows, |r| r.run_id.as_str()),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.seq))),
        // NOTE(review): assumes the table column is a UTC timestamp with zone; a
        // zone-less timestamp column would not match this type — confirm.
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        utf8_column(rows, |r| r.product.as_str()),
        utf8_column(rows, |r| r.op.as_str()),
        utf8_column(rows, |r| r.phase.as_str()),
        utf8_column(rows, |r| r.status.as_str()),
        utf8_column(rows, |r| r.detail.as_str()),
        depends_on,
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Builds a non-null Utf8 column by projecting one string field out of every row.
/// The strings are borrowed rather than cloned.
fn utf8_column(rows: &[AirgapEventRow], field: impl Fn(&AirgapEventRow) -> &str) -> Arc<dyn Array> {
    Arc::new(StringArray::from_iter_values(rows.iter().map(field)))
}

/// Builds the `depends_on` list column. `None` becomes a null entry; `Some(v)`
/// becomes a list of `v`'s strings.
///
/// The list element field is copied from `schema` so that the built column's data
/// type equals the schema's (element name, nullability and metadata included).
/// Only `List` is accepted, because `ListBuilder` always produces `List`; a
/// `LargeList` column could never match.
fn depends_on_column(
    schema: &arrow::datatypes::Schema,
    rows: &[AirgapEventRow],
) -> Result<Arc<dyn Array>> {
    let field = schema.fields().get(COL_DEPENDS_ON).ok_or_else(|| {
        anyhow!(
            "depends_on column expected at index {COL_DEPENDS_ON}, but schema has only {} fields",
            schema.fields().len()
        )
    })?;
    let elem = match field.data_type() {
        arrow::datatypes::DataType::List(elem) => elem.clone(),
        other => return Err(anyhow!("depends_on column expected List, got {other:?}")),
    };
    let mut builder = ListBuilder::new(StringBuilder::new()).with_field(elem);
    for row in rows {
        match &row.depends_on {
            Some(deps) => {
                for dep in deps {
                    builder.values().append_value(dep);
                }
                builder.append(true);
            }
            None => builder.append(false),
        }
    }
    Ok(Arc::new(builder.finish()))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two events from one run: a root `install` with an empty dependency list,
    /// followed by a `start` that depends on it.
    const FIXTURE: &str = concat!(
        r#"{"run_id":"r1","seq":0,"ts_micros":1,"product":"holger","op":"install","phase":"end","status":"ok","detail":"","depends_on":[]}"#,
        "\n",
        r#"{"run_id":"r1","seq":1,"ts_micros":2,"product":"nornir-server","op":"start","phase":"end","status":"ok","detail":"healthy","depends_on":["holger"]}"#,
        "\n",
    );

    #[test]
    fn jsonl_decodes_airgap_events() {
        let tmp = tempfile::tempdir().unwrap();
        let jsonl = tmp.path().join("airgap_events.jsonl");
        std::fs::write(&jsonl, FIXTURE).unwrap();

        let decoded = read_jsonl(&jsonl).unwrap();
        assert_eq!(decoded.len(), 2);
        let (root, server) = (&decoded[0], &decoded[1]);

        assert_eq!(root.product, "holger");
        assert_eq!(root.op, "install");
        assert_eq!(root.depends_on, None, "root op has no deps");

        assert_eq!(server.product, "nornir-server");
        assert_eq!(server.depends_on, Some(vec!["holger".to_string()]));
    }
}