use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int64Array, ListArray, ListBuilder, RecordBatch, StringArray, StringBuilder,
TimestampMicrosecondArray,
};
use chrono::{DateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use uuid::Uuid;
use super::iceberg::{IcebergWarehouse, TABLE_RELEASE_EVENTS, append_batch};
// Positional indexes of the release-events columns. They are used both to look up
// fields in the table's Arrow schema and to pick columns out of scanned record
// batches, so they must match the order in which `append_release_events` builds
// its column vector.
const COL_RUN_ID: usize = 0;
const COL_SEQ: usize = 1;
const COL_TS_MICROS: usize = 2;
const COL_COMPONENT: usize = 3;
const COL_REPO: usize = 4;
const COL_OP: usize = 5;
const COL_PHASE: usize = 6;
const COL_STATUS: usize = 7;
const COL_DETAIL: usize = 8;
const COL_DEPENDS_ON: usize = 9;
const COL_ELAPSED_MS: usize = 10;
/// String constants intended for the `phase` field of a [`ReleaseEventRow`].
pub mod phase {
/// Phase label "start".
pub const START: &str = "start";
/// Phase label "end".
pub const END: &str = "end";
/// Phase label "skip".
pub const SKIP: &str = "skip";
}
/// String constants intended for the `status` field of a [`ReleaseEventRow`].
///
/// `render_topo` maps each of these to a distinct marker glyph; any other
/// status string is rendered with a neutral marker.
pub mod status {
/// Status label "ok".
pub const OK: &str = "ok";
/// Status label "fail".
pub const FAIL: &str = "fail";
/// Status label "warn".
pub const WARN: &str = "warn";
/// Status label "running".
pub const RUNNING: &str = "running";
}
/// One row of the release-events table.
///
/// The field order mirrors the `COL_*` column indexes used when rows are written
/// to and read from Arrow record batches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReleaseEventRow {
/// Identifier of the release run this event belongs to.
pub run_id: String,
/// Sequence number of the event; query results are ordered by `(run_id, seq)`.
pub seq: i64,
/// Event timestamp in microseconds (the emitter fills it from `Utc::now()`).
pub ts_micros: i64,
/// Component the event is about.
pub component: String,
/// Repository the event is about; `EventSelector::Repo` filters on this field.
pub repo: String,
/// Operation name (the tests in this file use values such as "test" and "gate").
pub op: String,
/// Phase label; see the constants in the `phase` module.
pub phase: String,
/// Status label; see the constants in the `status` module.
pub status: String,
/// Free-form detail text; `render_topo` omits it when empty.
pub detail: String,
/// Components this one depends on. `None` is stored as a null list and is
/// kept distinct from `Some(vec![])` on a round trip.
pub depends_on: Option<Vec<String>>,
/// Optional elapsed time; `render_topo` prints it with an "ms" suffix.
pub elapsed_ms: Option<i64>,
}
impl ReleaseEventRow {
    /// Returns the `(run_id, seq)` pair used to order rows: first by run, then by
    /// sequence number within the run.
    fn key(&self) -> (String, i64) {
        let Self { run_id, seq, .. } = self;
        (run_id.clone(), *seq)
    }
}
/// Builds and records release events for a single run.
///
/// Cloning the emitter clones the `Arc` around the sequence counter, so all
/// clones draw sequence numbers from the same counter.
#[derive(Clone)]
pub struct ReleaseEventEmitter {
// Run identifier stamped on every row built by this emitter.
run_id: String,
// Next sequence number to hand out; shared between clones.
seq: Arc<AtomicI64>,
}
impl ReleaseEventEmitter {
    /// Creates an emitter for `run_id` whose sequence counter starts at 0.
    pub fn new(run_id: impl Into<String>) -> Self {
        Self { run_id: run_id.into(), seq: Arc::new(AtomicI64::new(0)) }
    }

    /// Returns the run id stamped on every row this emitter builds.
    pub fn run_id(&self) -> &str {
        &self.run_id
    }

    /// Builds a [`ReleaseEventRow`] without writing it anywhere.
    ///
    /// The row gets this emitter's run id, the next value of the shared sequence
    /// counter, and the current UTC time in microseconds; every other field is
    /// copied from the arguments.
    // The parameters mirror the table columns one-to-one, so the long argument
    // list is intentional (same rationale as `emit_async` and `emit`).
    #[allow(clippy::too_many_arguments)]
    pub fn row(
        &self,
        component: &str,
        repo: &str,
        op: &str,
        phase: &str,
        status: &str,
        detail: &str,
        depends_on: Option<Vec<String>>,
        elapsed_ms: Option<i64>,
    ) -> ReleaseEventRow {
        // `fetch_add` returns the previous value, so the first row gets seq 0.
        let seq = self.seq.fetch_add(1, Ordering::SeqCst);
        ReleaseEventRow {
            run_id: self.run_id.clone(),
            seq,
            ts_micros: Utc::now().timestamp_micros(),
            component: component.to_string(),
            repo: repo.to_string(),
            op: op.to_string(),
            phase: phase.to_string(),
            status: status.to_string(),
            detail: detail.to_string(),
            depends_on,
            elapsed_ms,
        }
    }

    /// Builds a row and appends it to the release-events table.
    ///
    /// Recording is best-effort: if the append fails, a warning is printed to
    /// stderr and the error is dropped, so this never fails the caller. The
    /// sequence number is consumed even when the append fails.
    // The parameters mirror the table columns one-to-one.
    #[allow(clippy::too_many_arguments)]
    pub async fn emit_async(
        &self,
        wh: &IcebergWarehouse,
        component: &str,
        repo: &str,
        op: &str,
        phase: &str,
        status: &str,
        detail: &str,
        depends_on: Option<Vec<String>>,
        elapsed_ms: Option<i64>,
    ) {
        let row = self.row(component, repo, op, phase, status, detail, depends_on, elapsed_ms);
        if let Err(e) = append_release_events(wh, std::slice::from_ref(&row)).await {
            eprintln!(
                " ⚠ release_events: dropped {op}/{phase} for `{component}` (non-fatal): {e:#}"
            );
        }
    }

    /// Synchronous wrapper around [`Self::emit_async`]: runs the future to
    /// completion through `wh.block_on`.
    // The parameters mirror the table columns one-to-one.
    #[allow(clippy::too_many_arguments)]
    pub fn emit(
        &self,
        wh: &IcebergWarehouse,
        component: &str,
        repo: &str,
        op: &str,
        phase: &str,
        status: &str,
        detail: &str,
        depends_on: Option<Vec<String>>,
        elapsed_ms: Option<i64>,
    ) {
        wh.block_on(self.emit_async(
            wh, component, repo, op, phase, status, detail, depends_on, elapsed_ms,
        ));
    }
}
/// Appends `rows` to the release-events table as a single Arrow record batch.
///
/// Does nothing (and does not touch the catalog) when `rows` is empty.
///
/// # Errors
///
/// Returns an error when the table cannot be loaded, its schema cannot be
/// converted to Arrow, the schema has no column at the `depends_on` position or
/// that column is not a list, the batch does not match the schema, or the
/// append itself fails.
pub async fn append_release_events(wh: &IcebergWarehouse, rows: &[ReleaseEventRow]) -> Result<()> {
    /// Builds a UTF-8 column by borrowing one string field from every row.
    fn utf8_column<'r>(
        rows: &'r [ReleaseEventRow],
        pick: impl Fn(&'r ReleaseEventRow) -> &'r str,
    ) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(rows.iter().map(pick)))
    }

    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_RELEASE_EVENTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Checked lookup: indexing the schema directly would panic on a table with
    // fewer columns, and callers treat a failed append as non-fatal.
    let dep_field = arrow_schema.fields().get(COL_DEPENDS_ON).ok_or_else(|| {
        anyhow!(
            "release_events schema has {} columns, depends_on expected at index {}",
            arrow_schema.fields().len(),
            COL_DEPENDS_ON
        )
    })?;
    // Reuse the schema's own element field so the built list column carries the
    // same element definition as the table schema.
    let dep_elem = match dep_field.data_type() {
        arrow::datatypes::DataType::List(elem)
        | arrow::datatypes::DataType::LargeList(elem) => elem.clone(),
        other => return Err(anyhow!("depends_on column expected List, got {other:?}")),
    };
    let mut deps_b: ListBuilder<StringBuilder> =
        ListBuilder::new(StringBuilder::new()).with_field(dep_elem);
    for r in rows {
        match &r.depends_on {
            // `None` becomes a null list slot, distinct from an empty list.
            None => deps_b.append(false),
            Some(v) => {
                for d in v {
                    deps_b.values().append_value(d);
                }
                deps_b.append(true);
            }
        }
    }

    // Column order must follow the `COL_*` indexes.
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8_column(rows, |r| r.run_id.as_str()),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.seq))),
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        utf8_column(rows, |r| r.component.as_str()),
        utf8_column(rows, |r| r.repo.as_str()),
        utf8_column(rows, |r| r.op.as_str()),
        utf8_column(rows, |r| r.phase.as_str()),
        utf8_column(rows, |r| r.status.as_str()),
        utf8_column(rows, |r| r.detail.as_str()),
        Arc::new(deps_b.finish()),
        Arc::new(Int64Array::from(rows.iter().map(|r| r.elapsed_ms).collect::<Vec<_>>())),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Filter applied by `query_release_events` to the rows it returns.
#[derive(Debug, Clone)]
pub enum EventSelector {
/// Keep only rows whose `run_id` equals the given string.
Run(String),
/// Keep only rows whose `repo` equals the given string.
Repo(String),
/// Keep every row.
All,
}
/// Reads the release-events table and returns the rows matching `sel`, ordered
/// by `(run_id, seq)`.
///
/// The whole table is scanned and the selector is applied in memory.
///
/// # Errors
///
/// Returns an error when the table cannot be loaded or scanned, or when a
/// column does not have the expected Arrow array type.
pub async fn query_release_events(
    wh: &IcebergWarehouse,
    sel: &EventSelector,
) -> Result<Vec<ReleaseEventRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_RELEASE_EVENTS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    let mut out: Vec<ReleaseEventRow> = Vec::new();
    for b in &batches {
        let run_id = col_str(b, COL_RUN_ID)?;
        let seq = col_i64(b, COL_SEQ)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let component = col_str(b, COL_COMPONENT)?;
        let repo = col_str(b, COL_REPO)?;
        let op = col_str(b, COL_OP)?;
        let phase = col_str(b, COL_PHASE)?;
        let st = col_str(b, COL_STATUS)?;
        let detail = col_str(b, COL_DETAIL)?;
        // One decoded entry per row; entries are moved out below, not cloned.
        let mut deps = col_str_list(b, COL_DEPENDS_ON)?;
        let elapsed = col_i64(b, COL_ELAPSED_MS)?;

        for i in 0..b.num_rows() {
            // Test the selector against the borrowed column values first so
            // rejected rows cost no allocations.
            let keep = match sel {
                EventSelector::Run(id) => run_id.value(i) == id.as_str(),
                EventSelector::Repo(r) => repo.value(i) == r.as_str(),
                EventSelector::All => true,
            };
            if !keep {
                continue;
            }
            out.push(ReleaseEventRow {
                run_id: run_id.value(i).to_string(),
                seq: seq.value(i),
                ts_micros: ts.value(i),
                component: component.value(i).to_string(),
                repo: repo.value(i).to_string(),
                op: op.value(i).to_string(),
                phase: phase.value(i).to_string(),
                status: st.value(i).to_string(),
                detail: detail.value(i).to_string(),
                depends_on: deps[i].take(),
                elapsed_ms: if elapsed.is_null(i) { None } else { Some(elapsed.value(i)) },
            });
        }
    }
    // Stable sort; the key (which clones `run_id`) is computed once per row
    // instead of once per comparison.
    out.sort_by_cached_key(|r| r.key());
    Ok(out)
}
/// Serializes `rows` as a JSON array with one object per line.
///
/// Each object carries `run_id`, `seq`, `ts` (RFC 3339 text, or an empty string
/// when `ts_micros` is outside the representable range), `component`, `repo`,
/// `op`, `phase`, `status`, `detail`, `depends_on` (array or `null`) and
/// `elapsed_ms` (number or `null`).
pub fn rows_to_json(rows: &[ReleaseEventRow]) -> String {
    /// Escapes `s` for use inside a JSON string literal.
    fn esc(s: &str) -> String {
        let mut o = String::with_capacity(s.len() + 2);
        for c in s.chars() {
            match c {
                '"' => o.push_str("\\\""),
                '\\' => o.push_str("\\\\"),
                '\n' => o.push_str("\\n"),
                '\t' => o.push_str("\\t"),
                '\r' => o.push_str("\\r"),
                // JSON forbids raw control characters (U+0000..=U+001F) inside
                // strings; e.g. an ANSI escape in `detail` must become \u001b.
                c if c < '\u{20}' => o.push_str(&format!("\\u{:04x}", c as u32)),
                c => o.push(c),
            }
        }
        o
    }

    let mut s = String::from("[\n");
    for (i, r) in rows.iter().enumerate() {
        let depends = match &r.depends_on {
            None => "null".to_string(),
            Some(v) => {
                let inner =
                    v.iter().map(|d| format!("\"{}\"", esc(d))).collect::<Vec<_>>().join(", ");
                format!("[{inner}]")
            }
        };
        let elapsed = r.elapsed_ms.map(|e| e.to_string()).unwrap_or_else(|| "null".to_string());
        let ts_rfc = Utc
            .timestamp_micros(r.ts_micros)
            .single()
            .map(|d| d.to_rfc3339())
            .unwrap_or_default();
        s.push_str(&format!(
            " {{\"run_id\": \"{}\", \"seq\": {}, \"ts\": \"{}\", \"component\": \"{}\", \
             \"repo\": \"{}\", \"op\": \"{}\", \"phase\": \"{}\", \"status\": \"{}\", \
             \"detail\": \"{}\", \"depends_on\": {}, \"elapsed_ms\": {}}}{}\n",
            esc(&r.run_id),
            r.seq,
            esc(&ts_rfc),
            esc(&r.component),
            esc(&r.repo),
            esc(&r.op),
            esc(&r.phase),
            esc(&r.status),
            esc(&r.detail),
            depends,
            elapsed,
            // Comma after every object except the last.
            if i + 1 < rows.len() { "," } else { "" },
        ));
    }
    s.push(']');
    s
}
/// Renders `rows` as a plain-text report, one section per run.
///
/// Runs appear in ascending `run_id` order. Each section lists, once per
/// component, the first non-empty `depends_on` list seen for it (in `seq`
/// order), followed by a timeline of all events sorted by `seq`. When `rows` is
/// empty the result is a single placeholder line.
pub fn render_topo(rows: &[ReleaseEventRow]) -> String {
    use std::collections::BTreeSet;

    // Group by run id, borrowing the ids instead of cloning them.
    let mut by_run: BTreeMap<&str, Vec<&ReleaseEventRow>> = BTreeMap::new();
    for r in rows {
        by_run.entry(r.run_id.as_str()).or_default().push(r);
    }

    let mut out = String::new();
    for (run_id, mut evs) in by_run {
        evs.sort_by_key(|r| r.seq);
        out.push_str(&format!("release run {run_id}\n"));

        // Components whose dependency line has already been printed.
        let mut announced: BTreeSet<&str> = BTreeSet::new();
        for r in &evs {
            let Some(deps) = r.depends_on.as_deref() else { continue };
            // Skip empty lists *before* marking the component, so an event
            // with `Some(vec![])` cannot hide a later non-empty list.
            if deps.is_empty() {
                continue;
            }
            if announced.insert(r.component.as_str()) {
                out.push_str(&format!(
                    " {} depends_on {}\n",
                    r.component,
                    deps.join(", ")
                ));
            }
        }

        out.push_str(" ─ timeline ─\n");
        for r in &evs {
            let mark = match r.status.as_str() {
                status::OK => "✓",
                status::FAIL => "✗",
                status::WARN => "⚠",
                status::RUNNING => "…",
                _ => "·",
            };
            let elapsed = r
                .elapsed_ms
                .map(|e| format!(" ({e}ms)"))
                .unwrap_or_default();
            let detail = if r.detail.is_empty() {
                String::new()
            } else {
                format!(" — {}", r.detail)
            };
            out.push_str(&format!(
                " [{seq:>3}] {mark} {component} {op}/{phase} {status}{elapsed}{detail}\n",
                seq = r.seq,
                component = r.component,
                op = r.op,
                phase = r.phase,
                status = r.status,
            ));
        }
        out.push('\n');
    }
    if out.is_empty() {
        out.push_str("(no release events recorded)\n");
    }
    out
}
/// Borrows column `idx` of `b` as a `StringArray`.
///
/// Returns an error naming the column index when the column has another type.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    match b.column(idx).as_any().downcast_ref::<StringArray>() {
        Some(strings) => Ok(strings),
        None => Err(anyhow!("release_events col {idx} is not StringArray")),
    }
}
/// Borrows column `idx` of `b` as an `Int64Array`.
///
/// Returns an error naming the column index when the column has another type.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int64Array> {
    let column = b.column(idx).as_any();
    if let Some(ints) = column.downcast_ref::<Int64Array>() {
        return Ok(ints);
    }
    Err(anyhow!("release_events col {idx} is not Int64Array"))
}
/// Borrows column `idx` of `b` as a `TimestampMicrosecondArray`.
///
/// Returns an error naming the column index when the column has another type.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    let Some(stamps) = b.column(idx).as_any().downcast_ref::<TimestampMicrosecondArray>() else {
        return Err(anyhow!("release_events col {idx} is not TimestampMicrosecondArray"));
    };
    Ok(stamps)
}
/// Decodes column `idx` of `b` (a list-of-strings column) into one entry per row.
///
/// A null list slot becomes `None`; a present list becomes `Some` with its
/// non-null elements in order (null elements inside a list are dropped).
/// Returns an error when the column is not a `ListArray` or a list's values are
/// not a `StringArray`.
fn col_str_list(b: &RecordBatch, idx: usize) -> Result<Vec<Option<Vec<String>>>> {
    let Some(lists) = b.column(idx).as_any().downcast_ref::<ListArray>() else {
        return Err(anyhow!("release_events col {idx} is not ListArray"));
    };
    lists
        .iter()
        .map(|slot| -> Result<Option<Vec<String>>> {
            let Some(values) = slot else {
                return Ok(None);
            };
            let names = values
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| anyhow!("release_events depends_on element not StringArray"))?;
            // `flatten` skips null elements, keeping only present strings.
            Ok(Some(names.iter().flatten().map(str::to_string).collect()))
        })
        .collect()
}
/// Formats a microsecond timestamp as an RFC 3339 string in UTC.
///
/// When `ts_micros` cannot be resolved to a single instant, the current time
/// is formatted instead.
pub fn ts_to_rfc3339(ts_micros: i64) -> String {
    let resolved: Option<DateTime<Utc>> = Utc.timestamp_micros(ts_micros).single();
    resolved.unwrap_or_else(Utc::now).to_rfc3339()
}
/// Generates a fresh run id: a random (version 4) UUID rendered as a string.
pub fn new_run_id() -> String {
    let id = Uuid::new_v4();
    id.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
// Rows built through an emitter and its clone draw from one shared counter.
#[test]
fn emitter_seq_is_monotonic_and_shared_across_clones() {
let e = ReleaseEventEmitter::new("run-1");
let r0 = e.row("a", "a", "test", phase::START, status::RUNNING, "", None, None);
let e2 = e.clone();
let r1 = e2.row("a", "a", "test", phase::END, status::OK, "", None, Some(5));
let r2 = e.row("b", "b", "gate", phase::START, status::RUNNING, "", Some(vec!["a".into()]), None);
assert_eq!((r0.seq, r1.seq, r2.seq), (0, 1, 2));
assert_eq!(r0.run_id, "run-1");
}
// End-to-end check against a warehouse in a temporary directory: append,
// query with each selector, then render the results as text and JSON.
#[test]
fn append_and_query_round_trip_with_grouping_and_order() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
// Rows are deliberately out of `seq` order to exercise the query's sorting.
let rows = vec![
ReleaseEventRow {
run_id: "runA".into(), seq: 1, ts_micros: 10, component: "znippy".into(),
repo: "znippy".into(), op: "test".into(), phase: phase::END.into(),
status: status::OK.into(), detail: "3 passed".into(), depends_on: None,
elapsed_ms: Some(120),
},
ReleaseEventRow {
run_id: "runA".into(), seq: 0, ts_micros: 5, component: "znippy".into(),
repo: "znippy".into(), op: "test".into(), phase: phase::START.into(),
status: status::RUNNING.into(), detail: String::new(), depends_on: None,
elapsed_ms: None,
},
ReleaseEventRow {
run_id: "runA".into(), seq: 2, ts_micros: 20, component: "holger".into(),
repo: "holger".into(), op: "gate".into(), phase: phase::START.into(),
status: status::RUNNING.into(), detail: String::new(),
depends_on: Some(vec!["znippy".into()]), elapsed_ms: None,
},
ReleaseEventRow {
run_id: "runB".into(), seq: 0, ts_micros: 30, component: "korp".into(),
repo: "korp".into(), op: "test".into(), phase: phase::START.into(),
status: status::RUNNING.into(), detail: String::new(), depends_on: Some(vec![]),
elapsed_ms: None,
},
];
wh.block_on(append_release_events(&wh, &rows)).unwrap();
// Selecting by run returns only that run's rows, ordered by seq.
let a = wh
.block_on(query_release_events(&wh, &EventSelector::Run("runA".into())))
.unwrap();
assert_eq!(a.len(), 3);
assert_eq!(a.iter().map(|r| r.seq).collect::<Vec<_>>(), vec![0, 1, 2]);
assert_eq!(a[0].phase, phase::START);
assert_eq!(a[1].phase, phase::END);
assert_eq!(a[1].elapsed_ms, Some(120));
assert_eq!(a[2].depends_on, Some(vec!["znippy".to_string()]));
assert_eq!(a[0].depends_on, None);
// Selecting by repo; an empty dependency list must survive as `Some(vec![])`.
let korp = wh
.block_on(query_release_events(&wh, &EventSelector::Repo("korp".into())))
.unwrap();
assert_eq!(korp.len(), 1);
assert_eq!(korp[0].run_id, "runB");
assert_eq!(korp[0].depends_on, Some(vec![]));
let all = wh.block_on(query_release_events(&wh, &EventSelector::All)).unwrap();
assert_eq!(all.len(), 4);
// Text rendering groups by run and lists dependencies.
let topo = render_topo(&all);
assert!(topo.contains("release run runA"));
assert!(topo.contains("release run runB"));
assert!(topo.contains("holger depends_on znippy"));
// JSON rendering must parse and keep row order and field values.
let json = rows_to_json(&a);
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.as_array().unwrap().len(), 3);
assert_eq!(parsed[2]["depends_on"][0], "znippy");
assert_eq!(parsed[1]["elapsed_ms"], 120);
}
}