use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use chrono::{DateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use super::iceberg::{IcebergWarehouse, TABLE_CLONE_EVENTS, append_batch};
// Positional indices of the `clone_events` columns inside a scanned record batch.
// They mirror the order in which `append_clone_events` assembles its column vector
// (timestamp, workspace, member, remote, op, status, detail, elapsed).
// NOTE(review): the table schema itself is declared elsewhere — confirm it uses the same order.
const COL_TS_MICROS: usize = 0;
const COL_WORKSPACE: usize = 1;
const COL_MEMBER: usize = 2;
const COL_REMOTE: usize = 3;
const COL_OP: usize = 4;
const COL_STATUS: usize = 5;
const COL_DETAIL: usize = 6;
const COL_ELAPSED_MS: usize = 7;
/// String labels stored in the `op` field of a `CloneEventRow`.
pub mod op {
/// Label for a per-member clone/fetch operation.
pub const CLONE_FETCH: &str = "clone-fetch";
/// Label for a republish; this is the value `record_republish` writes.
pub const REPUBLISH: &str = "republish";
}
/// String labels stored in the `status` field of a `CloneEventRow`.
pub mod status {
/// The operation succeeded; `render_events` marks such rows with `✓`.
pub const OK: &str = "ok";
/// The operation failed; `render_events` marks such rows with `✗`.
pub const ERROR: &str = "error";
}
/// One row of the `clone_events` table: the outcome of a single operation.
///
/// The field order matches the column order used when appending and scanning batches.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CloneEventRow {
/// Event time in microseconds since the Unix epoch (taken from `Utc::now()` by the recorders).
pub ts_micros: i64,
/// Workspace the event belongs to; `CloneSelector::Workspace` filters on this field.
pub workspace: String,
/// Member the event is about; `record_republish` writes `"*"` here.
pub member: String,
/// Remote associated with the member; `record_republish` leaves it empty.
pub remote: String,
/// Operation label, see the `op` module.
pub op: String,
/// Outcome label, see the `status` module.
pub status: String,
/// Free-form detail text; `render_events` omits it when empty.
pub detail: String,
/// Elapsed time as supplied by the caller; rendered with an `ms` suffix.
pub elapsed_ms: i64,
}
/// Appends `rows` to the `clone_events` Iceberg table as a single Arrow record batch.
///
/// Returns `Ok(())` immediately, without loading the table, when `rows` is empty.
///
/// The columns are built positionally in the order timestamp, workspace, member, remote,
/// op, status, detail, elapsed, and are paired with the Arrow schema derived from the
/// table's current Iceberg schema. The timestamp column is tagged with the `+00:00` zone.
///
/// # Errors
///
/// Fails if the table cannot be loaded, the Iceberg schema cannot be converted to Arrow,
/// the batch does not match that schema, or the append itself fails.
pub async fn append_clone_events(wh: &IcebergWarehouse, rows: &[CloneEventRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_CLONE_EVENTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    // Every column is built straight from borrowed row data: no per-row `String` clone
    // and no intermediate `Vec` per column.
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        utf8_column(rows, |r| r.workspace.as_str()),
        utf8_column(rows, |r| r.member.as_str()),
        utf8_column(rows, |r| r.remote.as_str()),
        utf8_column(rows, |r| r.op.as_str()),
        utf8_column(rows, |r| r.status.as_str()),
        utf8_column(rows, |r| r.detail.as_str()),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.elapsed_ms))),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Builds one non-null Arrow UTF-8 column by borrowing a string field from every row.
fn utf8_column<'a>(
    rows: &'a [CloneEventRow],
    field: impl Fn(&'a CloneEventRow) -> &'a str,
) -> Arc<dyn Array> {
    Arc::new(StringArray::from_iter_values(rows.iter().map(field)))
}
/// Converts a fetch report into one [`CloneEventRow`] per member outcome.
///
/// All rows of a report share the same timestamp (the moment this function is called)
/// and the report's workspace; the remaining fields are copied from each outcome.
pub fn rows_from_report(report: &crate::monitor::FetchReport) -> Vec<CloneEventRow> {
    // Sampled once so every row of the same report carries an identical timestamp.
    let stamped_at = Utc::now().timestamp_micros();
    let mut rows = Vec::with_capacity(report.outcomes.len());
    for outcome in &report.outcomes {
        rows.push(CloneEventRow {
            ts_micros: stamped_at,
            workspace: report.workspace.clone(),
            member: outcome.member.clone(),
            remote: outcome.remote.clone(),
            op: outcome.op.clone(),
            status: outcome.status.clone(),
            detail: outcome.detail.clone(),
            elapsed_ms: outcome.elapsed_ms,
        });
    }
    rows
}
/// Best-effort, blocking persistence of a fetch report's outcomes.
///
/// Does nothing when the report has no outcomes. A failed append is not propagated:
/// it is reported on stderr together with the number of dropped rows.
pub fn record_fetch_report(wh: &IcebergWarehouse, report: &crate::monitor::FetchReport) {
    let events = rows_from_report(report);
    if events.is_empty() {
        return;
    }
    match wh.block_on(append_clone_events(wh, &events)) {
        Ok(()) => {}
        // Recording is non-fatal by design: warn and carry on.
        Err(e) => eprintln!(
            " ⚠ clone_events: dropped {} populate-outcome row(s) for `{}` (non-fatal): {e:#}",
            events.len(),
            report.workspace
        ),
    }
}
/// Best-effort, blocking persistence of a single republish outcome.
///
/// The row is stamped with the current time, uses `"*"` as the member, an empty remote
/// and [`op::REPUBLISH`] as the operation. A failed append is not propagated: it is
/// reported on stderr.
pub fn record_republish(
    wh: &IcebergWarehouse,
    workspace: &str,
    status_: &str,
    detail: &str,
    elapsed_ms: i64,
) {
    let event = [CloneEventRow {
        ts_micros: Utc::now().timestamp_micros(),
        workspace: workspace.to_owned(),
        member: String::from("*"),
        remote: String::new(),
        op: op::REPUBLISH.to_owned(),
        status: status_.to_owned(),
        detail: detail.to_owned(),
        elapsed_ms,
    }];
    let outcome = wh.block_on(append_clone_events(wh, &event));
    // Recording is non-fatal by design: warn and carry on.
    if let Err(e) = outcome {
        eprintln!(" ⚠ clone_events: dropped republish row for `{workspace}` (non-fatal): {e:#}");
    }
}
/// Which events `query_clone_events` should return.
#[derive(Debug, Clone)]
pub enum CloneSelector {
/// Only events whose `workspace` field equals the given name exactly.
Workspace(String),
/// Every event in the table.
All,
}
/// Reads the `clone_events` table and returns the rows selected by `sel`, newest first.
///
/// The whole table is scanned and the selector is applied client-side. Rows with equal
/// timestamps keep their scan order (the sort is stable).
///
/// # Errors
///
/// Fails if the table cannot be loaded or scanned, or if a column is missing or does not
/// have the expected Arrow type.
pub async fn query_clone_events(
    wh: &IcebergWarehouse,
    sel: &CloneSelector,
) -> Result<Vec<CloneEventRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_CLONE_EVENTS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Resolve the selector once instead of re-matching it for every row.
    let wanted: Option<&str> = match sel {
        CloneSelector::Workspace(w) => Some(w.as_str()),
        CloneSelector::All => None,
    };

    let mut out: Vec<CloneEventRow> = Vec::new();
    for b in &batches {
        let ts = col_ts(b, COL_TS_MICROS)?;
        let workspace = col_str(b, COL_WORKSPACE)?;
        let member = col_str(b, COL_MEMBER)?;
        let remote = col_str(b, COL_REMOTE)?;
        let op_ = col_str(b, COL_OP)?;
        let st = col_str(b, COL_STATUS)?;
        let detail = col_str(b, COL_DETAIL)?;
        let elapsed = col_i64(b, COL_ELAPSED_MS)?;
        // NOTE(review): values are read without a null check — assumes every column is
        // non-nullable in the table schema; confirm against the schema definition.
        out.extend(
            (0..b.num_rows())
                // Filter on the borrowed workspace value first, so rows that are
                // discarded never pay for the owned `String` fields.
                .filter(|&i| wanted.map_or(true, |w| workspace.value(i) == w))
                .map(|i| CloneEventRow {
                    ts_micros: ts.value(i),
                    workspace: workspace.value(i).to_string(),
                    member: member.value(i).to_string(),
                    remote: remote.value(i).to_string(),
                    op: op_.value(i).to_string(),
                    status: st.value(i).to_string(),
                    detail: detail.value(i).to_string(),
                    elapsed_ms: elapsed.value(i),
                }),
        );
    }
    // Stable, descending by timestamp.
    out.sort_by_key(|row| std::cmp::Reverse(row.ts_micros));
    Ok(out)
}
/// Renders events as a plain-text listing, one line per row, in the order given.
///
/// Each line has the shape `<mark> <time> <op> <member>[ — <detail>] (<elapsed>ms)`:
/// the mark is `✓` for [`status::OK`], `✗` for [`status::ERROR`] and `·` for any other
/// status; the time is produced by [`ts_to_rfc3339`]; `op` is left-aligned and padded to
/// 12 characters; the detail part is omitted when the detail is empty.
///
/// An empty slice renders as a single placeholder line.
pub fn render_events(rows: &[CloneEventRow]) -> String {
    // Function-scope import so `writeln!` can target the output `String` directly.
    use std::fmt::Write as _;

    if rows.is_empty() {
        return "(no clone/populate events recorded)\n".to_string();
    }
    let mut out = String::new();
    for r in rows {
        let mark = match r.status.as_str() {
            status::OK => "✓",
            status::ERROR => "✗",
            _ => "·",
        };
        let when = ts_to_rfc3339(r.ts_micros);
        // Borrow the separator and detail instead of formatting a temporary `String`.
        let (sep, detail) = if r.detail.is_empty() {
            ("", "")
        } else {
            (" — ", r.detail.as_str())
        };
        // Writing into a `String` cannot fail, so the `fmt::Result` is deliberately ignored.
        let _ = writeln!(
            out,
            "{mark} {when} {op:<12} {member}{sep}{detail} ({elapsed}ms)",
            op = r.op,
            member = r.member,
            elapsed = r.elapsed_ms,
        );
    }
    out
}
/// Borrows column `idx` of `b` as a [`StringArray`].
///
/// # Errors
///
/// Returns an error (instead of panicking) when the batch has no column at `idx`, and
/// when that column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    b.columns()
        .get(idx)
        // Checked lookup: `RecordBatch::column` would panic on an out-of-range index.
        .ok_or_else(|| {
            anyhow!(
                "clone_events col {idx} is missing (batch has {n} columns)",
                n = b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("clone_events col {idx} is not StringArray"))
}
/// Borrows column `idx` of `b` as an [`Int64Array`].
///
/// # Errors
///
/// Returns an error (instead of panicking) when the batch has no column at `idx`, and
/// when that column is not an `Int64Array`.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int64Array> {
    b.columns()
        .get(idx)
        // Checked lookup: `RecordBatch::column` would panic on an out-of-range index.
        .ok_or_else(|| {
            anyhow!(
                "clone_events col {idx} is missing (batch has {n} columns)",
                n = b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| anyhow!("clone_events col {idx} is not Int64Array"))
}
/// Borrows column `idx` of `b` as a [`TimestampMicrosecondArray`].
///
/// # Errors
///
/// Returns an error (instead of panicking) when the batch has no column at `idx`, and
/// when that column is not a `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    b.columns()
        .get(idx)
        // Checked lookup: `RecordBatch::column` would panic on an out-of-range index.
        .ok_or_else(|| {
            anyhow!(
                "clone_events col {idx} is missing (batch has {n} columns)",
                n = b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("clone_events col {idx} is not TimestampMicrosecondArray"))
}
/// Formats a microsecond Unix timestamp as an RFC 3339 string in UTC.
///
/// NOTE(review): a value that chrono cannot represent is rendered as the *current* time
/// rather than reported — confirm this silent fallback is intended.
pub fn ts_to_rfc3339(ts_micros: i64) -> String {
    let instant: DateTime<Utc> = match Utc.timestamp_micros(ts_micros).single() {
        Some(valid) => valid,
        None => Utc::now(),
    };
    instant.to_rfc3339()
}
// Round-trip tests: events are recorded into a warehouse opened on a temporary
// directory and then read back through `query_clone_events`.
#[cfg(test)]
mod tests {
use super::*;
use crate::monitor::{FetchReport, MemberOutcome};
// A report with one successful and one failed member outcome must be stored as two
// rows, be readable per workspace, render the failure, and survive a JSON round-trip.
#[test]
fn fetch_report_failure_persists_and_reads_back() {
// `dir` is kept alive for the whole test so the warehouse directory is not removed early.
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let report = FetchReport {
workspace: "nordisk".into(),
fetched: 2,
changed: vec!["facett".into()],
errors: vec![(
"korp".into(),
"clone-fetch https://github.com/nordisk/korp: Couldn't obtain Username".into(),
)],
outcomes: vec![
MemberOutcome {
member: "facett".into(),
remote: "git@github.com:nordisk/facett.git".into(),
op: op::CLONE_FETCH.into(),
status: status::OK.into(),
detail: "abc123".into(),
elapsed_ms: 42,
},
MemberOutcome {
member: "korp".into(),
remote: "https://github.com/nordisk/korp".into(),
op: op::CLONE_FETCH.into(),
status: status::ERROR.into(),
detail: "clone-fetch https://github.com/nordisk/korp: Couldn't obtain Username"
.into(),
elapsed_ms: 7,
},
],
};
record_fetch_report(&wh, &report);
// Read back only the rows of the workspace that was just recorded.
let rows = wh
.block_on(query_clone_events(&wh, &CloneSelector::Workspace("nordisk".into())))
.unwrap();
assert_eq!(rows.len(), 2, "one row per fetched member");
// The failed outcome keeps its member, op, remote, detail and elapsed time.
let failed: Vec<_> = rows.iter().filter(|r| r.status == status::ERROR).collect();
assert_eq!(failed.len(), 1);
assert_eq!(failed[0].member, "korp");
assert_eq!(failed[0].op, "clone-fetch");
assert_eq!(failed[0].remote, "https://github.com/nordisk/korp");
assert!(
failed[0].detail.contains("Couldn't obtain Username"),
"the error chain is readable, got: {}",
failed[0].detail
);
assert_eq!(failed[0].elapsed_ms, 7);
// The successful outcome is stored alongside it.
let ok: Vec<_> = rows.iter().filter(|r| r.status == status::OK).collect();
assert_eq!(ok.len(), 1);
assert_eq!(ok[0].member, "facett");
assert_eq!(ok[0].detail, "abc123");
// A different workspace name must not see these rows.
let other = wh
.block_on(query_clone_events(&wh, &CloneSelector::Workspace("holger".into())))
.unwrap();
assert!(other.is_empty(), "workspace scope isolates events");
// The text rendering surfaces the failing member, its detail and the failure mark.
let txt = render_events(&rows);
assert!(txt.contains("korp"));
assert!(txt.contains("Couldn't obtain Username"));
assert!(txt.contains('✗'), "failure marker present");
// Rows serialize to JSON and deserialize back to equal values.
let json = serde_json::to_string(&rows).unwrap();
let back: Vec<CloneEventRow> = serde_json::from_str(&json).unwrap();
assert_eq!(back, rows);
}
// A republish outcome is stored as a single row with member `*` and op `republish`.
#[test]
fn republish_outcome_records() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
record_republish(&wh, "nordisk", status::OK, "snapshot-99", 1234);
let rows = wh.block_on(query_clone_events(&wh, &CloneSelector::All)).unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].op, "republish");
assert_eq!(rows[0].member, "*");
assert_eq!(rows[0].detail, "snapshot-99");
assert_eq!(rows[0].elapsed_ms, 1234);
}
}