use std::collections::BTreeMap;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use super::iceberg::{append_batch, IcebergWarehouse, TABLE_SAGA_EVENTS};
pub use nornir_saga::event::{EventKind, SagaEvent, SessionCtx};
pub use nornir_saga::replay::{ReplayPlan, ReplayStep, StepKind};
pub use nornir_saga::sink::SagaSink;
// Positional column indices into the `saga_events` table.
// `append_one_session` builds its Arrow columns in exactly this order, and
// `query_saga_events` reads them back by these indices. Column names are never
// consulted, so this order must match the table's current schema.
const COL_SESSION_ID: usize = 0;
const COL_SEQ: usize = 1;
// Stored as an Arrow microsecond timestamp tagged "+00:00" on write.
const COL_TS_MICROS: usize = 2;
const COL_KIND: usize = 3;
const COL_ROUTE: usize = 4;
const COL_COMPONENT: usize = 5;
const COL_ACTION: usize = 6;
const COL_DETAIL: usize = 7;
const COL_STATE_JSON: usize = 8;
const COL_LEVEL: usize = 9;
const COL_SPAN_ID: usize = 10;
const COL_PARENT: usize = 11;
// Holds `SagaEvent::user`.
const COL_APP_USER: usize = 12;
const COL_APP: usize = 13;
const COL_APP_VERSION: usize = 14;
const COL_GIT_SHA: usize = 15;
// Digest from `event_sha`: written on append, only type-checked on read
// (it has no counterpart field in `SagaEvent`).
const COL_SHA: usize = 16;
/// Content hash of a saga event, rendered as hex of a SHA-256 digest.
///
/// The hashed byte stream is, in order:
/// - `session_id`;
/// - a zero byte;
/// - `seq` as 8 little-endian bytes;
/// - `kind`, `route`, `component`, `action` and `state_json`, separated by zero bytes.
///
/// No other field (timestamps, detail, level, span/parent, identity metadata)
/// influences the result.
pub fn event_sha(e: &SagaEvent) -> String {
    let mut hasher = Sha256::new();
    hasher.update(e.session_id.as_bytes());
    hasher.update([0u8]);
    // Fixed width, so no separator is needed after it.
    hasher.update(e.seq.to_le_bytes());
    let tail = [
        e.kind.as_str(),
        e.route.as_str(),
        e.component.as_str(),
        e.action.as_str(),
        e.state_json.as_str(),
    ];
    for (idx, field) in tail.iter().enumerate() {
        if idx > 0 {
            hasher.update([0u8]);
        }
        hasher.update(field.as_bytes());
    }
    format!("{:x}", hasher.finalize())
}
/// Append `events` to the `saga_events` table, one append per distinct session.
///
/// Sessions are visited in ascending `session_id` order, and each group keeps the
/// input order of its events. Each group is handed to `append_one_session`
/// separately, so an error part-way through presumably leaves the earlier sessions
/// already appended (confirm against `append_batch` commit semantics). An empty
/// slice is a no-op.
pub async fn append_saga_events(wh: &IcebergWarehouse, events: &[SagaEvent]) -> Result<()> {
    if events.is_empty() {
        return Ok(());
    }
    let mut groups: BTreeMap<&str, Vec<&SagaEvent>> = BTreeMap::new();
    events
        .iter()
        .for_each(|ev| groups.entry(ev.session_id.as_str()).or_default().push(ev));
    for group in groups.into_values() {
        append_one_session(wh, &group).await?;
    }
    Ok(())
}
/// Write one session's events to the `saga_events` table as a single record batch.
///
/// Columns are built positionally in `COL_*` order and paired with the table's
/// current Iceberg schema converted to Arrow; `RecordBatch::try_new` rejects any
/// mismatch. `seq` and `ts_micros` are stored as `i64` via `as` casts, which the
/// reader reverses bit-for-bit. An empty slice is a no-op.
async fn append_one_session(wh: &IcebergWarehouse, events: &[&SagaEvent]) -> Result<()> {
    if events.is_empty() {
        return Ok(());
    }
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_SAGA_EVENTS))
        .await?;
    let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Builds one Utf8 column by applying `field` to every event.
    let utf8 = |field: fn(&SagaEvent) -> String| -> Arc<dyn Array> {
        let values: Vec<String> = events.iter().map(|ev| field(ev)).collect();
        Arc::new(StringArray::from(values))
    };
    let seq_col: Arc<dyn Array> = Arc::new(Int64Array::from(
        events.iter().map(|ev| ev.seq as i64).collect::<Vec<i64>>(),
    ));
    let ts_col: Arc<dyn Array> = Arc::new(
        TimestampMicrosecondArray::from(
            events.iter().map(|ev| ev.ts_micros as i64).collect::<Vec<i64>>(),
        )
        .with_timezone("+00:00"),
    );

    let columns: Vec<Arc<dyn Array>> = vec![
        utf8(|ev| ev.session_id.clone()),
        seq_col,
        ts_col,
        utf8(|ev| ev.kind.as_str().to_string()),
        utf8(|ev| ev.route.clone()),
        utf8(|ev| ev.component.clone()),
        utf8(|ev| ev.action.clone()),
        utf8(|ev| ev.detail.clone()),
        utf8(|ev| ev.state_json.clone()),
        utf8(|ev| ev.level.clone()),
        utf8(|ev| ev.span_id.clone()),
        utf8(|ev| ev.parent.clone()),
        utf8(|ev| ev.user.clone()),
        utf8(|ev| ev.app.clone()),
        utf8(|ev| ev.app_version.clone()),
        utf8(|ev| ev.git_sha.clone()),
        utf8(event_sha),
    ];
    let record = RecordBatch::try_new(schema, columns)?;
    append_batch(wh.catalog(), table, record).await?;
    Ok(())
}
/// Which saga events a warehouse query should return.
#[derive(Clone, Debug)]
pub enum SagaSelector {
    /// Only events whose `session_id` equals this id exactly.
    Session(String),
    /// Every event in the table.
    All,
}
/// Read saga events from the warehouse, keeping only those chosen by `sel`.
///
/// The table is read via `skade::read_all` and rows are filtered in memory.
/// The result is ordered by `(session_id, seq)`. The sort is stable, so rows sharing
/// both keys keep their scan order.
///
/// Kinds that `EventKind::parse` does not recognise are mapped to `EventKind::Log`.
///
/// # Errors
/// Fails if the table cannot be loaded or read, or if a batch's columns are
/// missing or do not have the expected Arrow types.
pub async fn query_saga_events(
    wh: &IcebergWarehouse,
    sel: &SagaSelector,
) -> Result<Vec<SagaEvent>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_SAGA_EVENTS))
        .await?;
    let batches: Vec<RecordBatch> = skade::read_all(&table).await?;
    let mut out: Vec<SagaEvent> = Vec::new();
    for b in &batches {
        decode_selected_rows(b, sel, &mut out)?;
    }
    out.sort_by(|a, b| (&a.session_id, a.seq).cmp(&(&b.session_id, b.seq)));
    Ok(out)
}

/// Decode the rows of one `saga_events` batch that `sel` keeps, appending them to `out`.
///
/// All columns are type-checked up front, so a malformed batch errors before any
/// of its rows are pushed.
fn decode_selected_rows(
    b: &RecordBatch,
    sel: &SagaSelector,
    out: &mut Vec<SagaEvent>,
) -> Result<()> {
    let session_id = col_str(b, COL_SESSION_ID)?;
    let seq = col_i64(b, COL_SEQ)?;
    let ts = col_ts(b, COL_TS_MICROS)?;
    let kind = col_str(b, COL_KIND)?;
    let route = col_str(b, COL_ROUTE)?;
    let component = col_str(b, COL_COMPONENT)?;
    let action = col_str(b, COL_ACTION)?;
    let detail = col_str(b, COL_DETAIL)?;
    let state_json = col_str(b, COL_STATE_JSON)?;
    let level = col_str(b, COL_LEVEL)?;
    let span_id = col_str(b, COL_SPAN_ID)?;
    let parent = col_str(b, COL_PARENT)?;
    let app_user = col_str(b, COL_APP_USER)?;
    let app = col_str(b, COL_APP)?;
    let app_version = col_str(b, COL_APP_VERSION)?;
    let git_sha = col_str(b, COL_GIT_SHA)?;
    // The stored digest has no SagaEvent field; it is only type-checked here.
    col_str(b, COL_SHA)?;

    for i in 0..b.num_rows() {
        let sid = session_id.value(i);
        let keep = match sel {
            SagaSelector::Session(id) => sid == id.as_str(),
            SagaSelector::All => true,
        };
        // Decide before materializing the row so rejected rows allocate nothing.
        if !keep {
            continue;
        }
        out.push(SagaEvent {
            seq: seq.value(i) as u64,
            ts_micros: ts.value(i) as u64,
            session_id: sid.to_string(),
            user: app_user.value(i).to_string(),
            kind: EventKind::parse(kind.value(i)).unwrap_or(EventKind::Log),
            route: route.value(i).to_string(),
            component: component.value(i).to_string(),
            action: action.value(i).to_string(),
            detail: detail.value(i).to_string(),
            state_json: state_json.value(i).to_string(),
            level: level.value(i).to_string(),
            span_id: span_id.value(i).to_string(),
            parent: parent.value(i).to_string(),
            app: app.value(i).to_string(),
            app_version: app_version.value(i).to_string(),
            git_sha: git_sha.value(i).to_string(),
        });
    }
    Ok(())
}
/// Aggregate view of one recorded saga session.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SagaSessionSummary {
    /// Session identifier shared by all of the session's events.
    pub session_id: String,
    /// Application name reported by the session's events.
    pub app: String,
    /// User reported by the session's events.
    pub user: String,
    /// Git revision reported by the session's events.
    pub git_sha: String,
    /// Total number of events in the session.
    pub event_count: usize,
    /// Number of actionable (replayable) step events.
    pub step_count: usize,
    /// Number of error events.
    pub error_count: usize,
    /// Session start time in microseconds.
    pub started_micros: u64,
    /// Whether the session has at least one replayable step.
    pub replayable: bool,
}
/// Fold a flat list of saga events into one summary per session.
///
/// For each session:
/// - `event_count` counts all events.
/// - `step_count` counts events whose kind `is_actionable()`.
/// - `error_count` counts `EventKind::Error` events.
/// - `replayable` is true iff at least one actionable step was seen.
/// - `started_micros` is the smallest non-zero `ts_micros`, or 0 if every timestamp is 0.
/// - `app`, `user` and `git_sha` each take the first non-empty value seen, in input order.
///
/// Sessions are returned newest first (descending `started_micros`). Ties are broken
/// by ascending `session_id`.
pub fn summarize_sessions(events: &[SagaEvent]) -> Vec<SagaSessionSummary> {
    // Keyed by borrowed id: a `String` is allocated only when a session is first seen.
    let mut by_session: BTreeMap<&str, SagaSessionSummary> = BTreeMap::new();
    for e in events {
        let s = by_session
            .entry(e.session_id.as_str())
            .or_insert_with(|| SagaSessionSummary {
                session_id: e.session_id.clone(),
                app: e.app.clone(),
                user: e.user.clone(),
                git_sha: e.git_sha.clone(),
                event_count: 0,
                step_count: 0,
                error_count: 0,
                started_micros: e.ts_micros,
                replayable: false,
            });
        s.event_count += 1;
        if e.kind.is_actionable() {
            s.step_count += 1;
            s.replayable = true;
        }
        if e.kind == EventKind::Error {
            s.error_count += 1;
        }
        // A zero timestamp is treated as "unknown" and never wins the minimum.
        if e.ts_micros > 0 && (s.started_micros == 0 || e.ts_micros < s.started_micros) {
            s.started_micros = e.ts_micros;
        }
        // Backfill identity fields that earlier events left empty.
        // git_sha is backfilled like app and user.
        if s.app.is_empty() {
            s.app.clone_from(&e.app);
        }
        if s.user.is_empty() {
            s.user.clone_from(&e.user);
        }
        if s.git_sha.is_empty() {
            s.git_sha.clone_from(&e.git_sha);
        }
    }
    let mut out: Vec<SagaSessionSummary> = by_session.into_values().collect();
    out.sort_by(|a, b| {
        b.started_micros
            .cmp(&a.started_micros)
            .then_with(|| a.session_id.cmp(&b.session_id))
    });
    out
}
/// Summarize every session stored in the warehouse's `saga_events` table.
///
/// Reads all events with `SagaSelector::All` and folds them with `summarize_sessions`.
///
/// # Errors
/// Propagates any failure from `query_saga_events`.
pub async fn list_saga_sessions(wh: &IcebergWarehouse) -> Result<Vec<SagaSessionSummary>> {
    query_saga_events(wh, &SagaSelector::All)
        .await
        .map(|events| summarize_sessions(&events))
}
/// Build a replay plan for one session from its stored events.
///
/// Loads the events selected by `SagaSelector::Session(session_id)`, ordered by `seq`,
/// and passes them to `ReplayPlan::for_session`.
///
/// # Errors
/// Propagates any failure from `query_saga_events`.
pub async fn session_replay_plan(
    wh: &IcebergWarehouse,
    session_id: &str,
) -> Result<ReplayPlan> {
    let selector = SagaSelector::Session(session_id.to_owned());
    let events = query_saga_events(wh, &selector).await?;
    let plan = ReplayPlan::for_session(session_id, &events);
    Ok(plan)
}
/// `SagaSink` that persists events into an `IcebergWarehouse`.
///
/// It only borrows the warehouse, so it cannot outlive it.
pub struct IcebergSagaSink<'a> {
    wh: &'a IcebergWarehouse,
}

impl<'a> IcebergSagaSink<'a> {
    /// Create a sink that writes into `wh`.
    pub fn new(wh: &'a IcebergWarehouse) -> Self {
        IcebergSagaSink { wh }
    }
}
impl SagaSink for IcebergSagaSink<'_> {
    /// Synchronously append `events` by driving `append_saga_events` through the
    /// warehouse's `block_on`.
    // NOTE(review): presumably must not be called from inside an async runtime
    // if `block_on` blocks the current thread — confirm.
    fn write(&self, events: &[SagaEvent]) -> Result<()> {
        let pending = append_saga_events(self.wh, events);
        self.wh.block_on(pending)
    }
}
/// Downcast column `idx` of a `saga_events` batch to the concrete Arrow array `T`.
///
/// `type_name` is used only in the error message. `RecordBatch::column` panics on an
/// out-of-range index. Here a missing column is reported as an `Err`, so a batch with
/// fewer columns than expected (e.g. an older table schema) fails cleanly instead of
/// aborting the query.
fn col_as<'b, T: 'static>(b: &'b RecordBatch, idx: usize, type_name: &str) -> Result<&'b T> {
    let col = b.columns().get(idx).ok_or_else(|| {
        anyhow!(
            "saga_events col {idx} is missing (batch has {} columns)",
            b.num_columns()
        )
    })?;
    col.as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("saga_events col {idx} is not {type_name}"))
}

/// Column `idx` as a Utf8 `StringArray`.
fn col_str(b: &RecordBatch, idx: usize) -> Result<&StringArray> {
    col_as(b, idx, "StringArray")
}

/// Column `idx` as an `Int64Array`.
fn col_i64(b: &RecordBatch, idx: usize) -> Result<&Int64Array> {
    col_as(b, idx, "Int64Array")
}

/// Column `idx` as a microsecond-resolution `TimestampMicrosecondArray`.
fn col_ts(b: &RecordBatch, idx: usize) -> Result<&TimestampMicrosecondArray> {
    col_as(b, idx, "TimestampMicrosecondArray")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fixture event of `kind` in `session` with component `comp`.
    ///
    /// The event is stamped with a fixed `SessionCtx` (app "njord"), sequence `seq`
    /// and `ts_micros = 1000 + seq`.
    fn ev(session: &str, seq: u64, kind: EventKind, comp: &str) -> SagaEvent {
        let ctx = SessionCtx::new(session, "rickard", "njord", "0.9.0", "abc123");
        let mut e = match kind {
            EventKind::Nav => SagaEvent::nav("/home", comp, "open", r#"{"v":1}"#),
            EventKind::Click => SagaEvent::click("/home", comp, "submit", r#"{"v":2}"#),
            EventKind::Error => SagaEvent::error(comp, "boom"),
            EventKind::Log => SagaEvent::log("info", "noise"),
            EventKind::State => SagaEvent::state(comp, r#"{"v":3}"#),
        }
        .in_session(&ctx);
        e.seq = seq;
        e.ts_micros = 1000 + seq;
        e
    }

    #[test]
    fn append_query_round_trips_and_scopes() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let sess_a = vec![
            ev("sA", 0, EventKind::Nav, "Dashboard"),
            ev("sA", 1, EventKind::Log, "log"),
            ev("sA", 2, EventKind::Click, "Submit"),
        ];
        let sess_b = vec![
            ev("sB", 3, EventKind::Nav, "Map"),
            ev("sB", 4, EventKind::Error, "Map"),
        ];
        wh.block_on(append_saga_events(&wh, &sess_a)).unwrap();
        wh.block_on(append_saga_events(&wh, &sess_b)).unwrap();
        let a = wh
            .block_on(query_saga_events(&wh, &SagaSelector::Session("sA".into())))
            .unwrap();
        assert_eq!(a.len(), 3);
        // The count alone would not catch a selector that leaks rows from sB.
        assert!(a.iter().all(|e| e.session_id == "sA"), "selector scopes to sA only");
        assert!(a.windows(2).all(|w| w[0].seq < w[1].seq), "sorted by seq");
        let click = a.iter().find(|e| e.component == "Submit").unwrap();
        assert_eq!(click.kind, EventKind::Click);
        assert_eq!(click.state_json, r#"{"v":2}"#, "state round-trips exactly");
        assert_eq!(click.app, "njord");
        let all = wh.block_on(query_saga_events(&wh, &SagaSelector::All)).unwrap();
        assert_eq!(all.len(), 5);
        assert_eq!(all.iter().filter(|e| e.session_id == "sB").count(), 2);
    }

    #[test]
    fn summarize_and_replay_plan_from_warehouse() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let events = vec![
            ev("sA", 0, EventKind::Nav, "Dashboard"),
            ev("sA", 1, EventKind::Log, "noise"),
            ev("sA", 2, EventKind::Click, "Submit"),
            ev("sB", 3, EventKind::Error, "Map"),
        ];
        wh.block_on(append_saga_events(&wh, &events)).unwrap();
        let sessions = wh.block_on(list_saga_sessions(&wh)).unwrap();
        assert_eq!(sessions.len(), 2);
        let a = sessions.iter().find(|s| s.session_id == "sA").unwrap();
        assert_eq!(a.event_count, 3);
        assert_eq!(a.step_count, 2, "nav + click are replayable steps");
        assert!(a.replayable);
        assert_eq!(a.app, "njord", "identity fields carried into the summary");
        let b = sessions.iter().find(|s| s.session_id == "sB").unwrap();
        assert_eq!(b.step_count, 0, "an error-only session has no steps");
        assert_eq!(b.error_count, 1);
        assert!(!b.replayable);
        let plan = wh.block_on(session_replay_plan(&wh, "sA")).unwrap();
        assert_eq!(plan.steps.len(), 2);
        assert_eq!(plan.steps[0].kind, StepKind::Navigate);
        assert_eq!(plan.steps[1].kind, StepKind::Activate);
    }

    #[test]
    fn iceberg_sink_writes_via_trait() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let sink = IcebergSagaSink::new(&wh);
        sink.write(&[ev("sS", 0, EventKind::Nav, "Home")]).unwrap();
        let back = wh
            .block_on(query_saga_events(&wh, &SagaSelector::Session("sS".into())))
            .unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].route, "/home");
    }

    #[test]
    fn event_sha_is_stable_and_payload_sensitive() {
        let a = ev("sA", 0, EventKind::Nav, "Dashboard");
        let same = ev("sA", 0, EventKind::Nav, "Dashboard");
        assert_eq!(event_sha(&a), event_sha(&same), "same payload → same sha");
        let other = ev("sA", 0, EventKind::Click, "Dashboard");
        assert_ne!(event_sha(&a), event_sha(&other), "different kind → different sha");
        let later = ev("sA", 1, EventKind::Nav, "Dashboard");
        assert_ne!(event_sha(&a), event_sha(&later), "different seq → different sha");
    }
}