nornir 0.4.28

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! N5 — Iceberg writer + reader for the **viz action trail** (`viz_actions`).
//!
//! The viz records every user-action (tab switch, button click, query, RPC,
//! error, lifecycle) into an in-memory ring AND the `$NORNIR_VIZ_ACTIONLOG`
//! `/tmp` file. That file is a fast, greppable, externally-observable **live
//! channel** — but it is a stopgap, not durable. Per the
//! persist-to-warehouse-stream-not-`/tmp` law, this module is the **durable,
//! queryable sink**: the same action stream lands here as Iceberg rows so the
//! trail survives the launching terminal, is queryable from SQL
//! (DuckDB / PyIceberg), and can be read **back** into the viz as a recent
//! history view (across past sessions, not just this launch's ring).
//!
//! Write path: [`crate::viz::action_log::ActionLog`] owns a fire-and-forget
//! channel into a background drain thread; the thread batches incoming
//! [`VizActionRow`]s into one Iceberg snapshot per burst via
//! [`append_viz_actions`]. Best-effort + non-fatal — a logging-row failure must
//! never disturb the running viz.
//!
//! Read path: [`query_viz_actions`] scans the table, scopes by a `session_id`
//! or returns everything, and sorts by `(session_id, seq)` — Iceberg gives no
//! scan order, so `seq` is the durable ordering key.
//!
//! Schema: [`super::iceberg_schema::viz_actions`].

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;

use super::iceberg::{IcebergWarehouse, TABLE_VIZ_ACTIONS, append_batch};

// Column indices match `iceberg_schema::viz_actions` field order.
// The reader fetches columns by position (not by name), so these must stay in
// lock-step with the schema's field order.
/// `session_id` column — read back as a `StringArray`.
const COL_SESSION_ID: usize = 0;
/// `seq` column — read back as an `Int64Array`.
const COL_SEQ: usize = 1;
/// Timestamp column — read back as a `TimestampMicrosecondArray`.
const COL_TS_MICROS: usize = 2;
/// `kind` column — read back as a `StringArray`.
const COL_KIND: usize = 3;
/// `workspace` column — read back as a `StringArray`.
const COL_WORKSPACE: usize = 4;
/// `tab` column — read back as a `StringArray`.
const COL_TAB: usize = 5;
/// `detail` column — read back as a `StringArray`.
const COL_DETAIL: usize = 6;

/// One row of the `viz_actions` table — a single recorded viz user-action.
///
/// Plain owned data: every field is a `String` or an `i64`, so rows can be
/// cloned and compared freely. Field order here mirrors the positional column
/// order the writer and reader in this module use.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VizActionRow {
    /// The viz-session identity this action belongs to (groups a launch's
    /// trail). A fresh UUID is minted per `ActionLog`.
    pub session_id: String,
    /// Per-session monotonic sequence number — the stable ordering key (Iceberg
    /// scan order is unspecified, so reads sort by this). Mirrors the in-memory
    /// ring's `seq` exactly, so the file, the ring, and the warehouse agree.
    pub seq: i64,
    /// Action time, microseconds since the Unix epoch (UTC). Written to the
    /// table as a microsecond timestamp column tagged with the `+00:00` zone.
    pub ts_micros: i64,
    /// Action category tag: `LIFE | TAB | CLICK | QUERY | RPC | ERROR`.
    pub kind: String,
    /// Active workspace at push time (`""` when not applicable, never null).
    pub workspace: String,
    /// Active tab at push time (`""` when not applicable, never null).
    pub tab: String,
    /// Free-form action detail (the same text the `/tmp` file + ring carry).
    pub detail: String,
}

impl VizActionRow {
    /// Owned `(session_id, seq)` pair used as the ordering key for reads.
    ///
    /// The session id is cloned so the key does not borrow from the row, which
    /// is what key-based slice sorts require.
    fn key(&self) -> (String, i64) {
        let Self { session_id, seq, .. } = self;
        (session_id.clone(), *seq)
    }
}

/// Append a batch of `viz_actions` rows (one Iceberg snapshot).
///
/// An empty `rows` slice is a no-op: nothing is loaded and no snapshot is
/// written. Otherwise the table's current schema is converted to Arrow, one
/// column per field is built in schema order, and the resulting batch is handed
/// to [`append_batch`].
///
/// Columns are built by borrowing from `rows` (no per-field `String` clone and
/// no intermediate `Vec`); the values are copied once, straight into the Arrow
/// buffers.
///
/// # Errors
///
/// Returns an error if the table cannot be loaded, the Iceberg schema cannot be
/// converted to Arrow, the columns do not match that schema, or the append
/// itself fails.
pub async fn append_viz_actions(wh: &IcebergWarehouse, rows: &[VizActionRow]) -> Result<()> {
    /// Build a non-null UTF-8 column from one string field of every row.
    fn utf8_col(rows: &[VizActionRow], field: fn(&VizActionRow) -> &str) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(rows.iter().map(field)))
    }

    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_VIZ_ACTIONS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Order must match the `COL_*` indices / schema field order.
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8_col(rows, |r| r.session_id.as_str()),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.seq))),
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        utf8_col(rows, |r| r.kind.as_str()),
        utf8_col(rows, |r| r.workspace.as_str()),
        utf8_col(rows, |r| r.tab.as_str()),
        utf8_col(rows, |r| r.detail.as_str()),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Blocking counterpart of [`append_viz_actions`].
///
/// Intended for the drain thread and for tests: the append future is driven to
/// completion through the warehouse's own `block_on`, and its result is
/// returned unchanged.
pub fn append_viz_actions_blocking(wh: &IcebergWarehouse, rows: &[VizActionRow]) -> Result<()> {
    let pending = append_viz_actions(wh, rows);
    wh.block_on(pending)
}

/// Which viz actions to read.
///
/// Passed to [`query_viz_actions`] to decide which scanned rows are kept.
#[derive(Debug, Clone)]
pub enum ActionSelector {
    /// Exactly one viz session (its `session_id`). Only rows whose
    /// `session_id` equals this string are returned.
    Session(String),
    /// Everything in the table (across all sessions). No row is filtered out.
    All,
}

/// Read viz actions, scoped by `sel`, returned sorted by `(session_id, seq)`.
///
/// The whole table is scanned and rows are filtered in memory: with
/// [`ActionSelector::Session`] only that session's rows are kept, with
/// [`ActionSelector::All`] every row is kept. Rows rejected by the selector are
/// skipped before any of their strings are copied.
///
/// `limit` (when `Some(n)`) keeps only the most-recent `n` rows — the "recent
/// history" view the viz reads back:
///
/// * for a single session, "most recent" means the highest `seq` values;
/// * for `All`, "most recent" means the greatest `ts_micros` values (ties
///   resolved in `(session_id, seq)` order). Session ids carry no time order,
///   so taking the tail of the `(session_id, seq)` ordering would return the
///   sessions with the largest ids rather than the newest actions.
///
/// In both cases the returned rows are in `(session_id, seq)` order, and
/// `Some(0)` yields an empty vector.
///
/// # Errors
///
/// Returns an error if the table cannot be loaded, the scan cannot be built or
/// streamed, or a column does not have the expected Arrow type.
pub async fn query_viz_actions(
    wh: &IcebergWarehouse,
    sel: &ActionSelector,
    limit: Option<usize>,
) -> Result<Vec<VizActionRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_VIZ_ACTIONS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // `Some(id)` → keep only that session; `None` → keep everything.
    let wanted: Option<&str> = match sel {
        ActionSelector::Session(id) => Some(id.as_str()),
        ActionSelector::All => None,
    };

    let mut out: Vec<VizActionRow> = Vec::new();
    for b in &batches {
        let session_id = col_str(b, COL_SESSION_ID)?;
        let seq = col_i64(b, COL_SEQ)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let kind = col_str(b, COL_KIND)?;
        let workspace = col_str(b, COL_WORKSPACE)?;
        let tab = col_str(b, COL_TAB)?;
        let detail = col_str(b, COL_DETAIL)?;
        for i in 0..b.num_rows() {
            // Filter on the borrowed session id first so rejected rows cost no
            // allocations.
            let sid = session_id.value(i);
            if matches!(wanted, Some(w) if w != sid) {
                continue;
            }
            out.push(VizActionRow {
                session_id: sid.to_string(),
                seq: seq.value(i),
                ts_micros: ts.value(i),
                kind: kind.value(i).to_string(),
                workspace: workspace.value(i).to_string(),
                tab: tab.value(i).to_string(),
                detail: detail.value(i).to_string(),
            });
        }
    }

    // Cached keys: each row's session id is cloned once, not once per
    // comparison. The sort is stable, like the plain key sort.
    out.sort_by_cached_key(|r| r.key());

    if let Some(n) = limit {
        let excess = out.len().saturating_sub(n);
        if excess > 0 {
            let across_sessions = wanted.is_none();
            if across_sessions {
                // Rank by time; the stable sort keeps `(session_id, seq)` order
                // among equal timestamps, so the cut is deterministic.
                out.sort_by_key(|r| r.ts_micros);
            }
            out.drain(..excess);
            if across_sessions {
                // Restore the documented output order.
                out.sort_by_cached_key(|r| r.key());
            }
        }
    }
    Ok(out)
}

/// Blocking counterpart of [`query_viz_actions`].
///
/// Drives the query future to completion through the warehouse's own
/// `block_on` and returns its result unchanged.
pub fn query_viz_actions_blocking(
    wh: &IcebergWarehouse,
    sel: &ActionSelector,
    limit: Option<usize>,
) -> Result<Vec<VizActionRow>> {
    let pending = query_viz_actions(wh, sel, limit);
    wh.block_on(pending)
}

/// Serialize rows to a stable JSON array (one object per action). Pure string
/// assembly so the read-back history can be dropped into `state_json` without a
/// serde derive.
///
/// Each object carries, in this order: `session_id`, `seq` (a bare number),
/// `ts` (the row's `ts_micros` rendered as RFC3339 in UTC, or `""` when the
/// value cannot be represented as a date-time), `kind`, `workspace`, `tab`,
/// and `detail`. Objects are separated by `", "`; an empty slice yields `[]`.
///
/// String values are escaped per JSON's rules: `"` and `\` are backslash-
/// escaped, newline / tab / carriage return use their short forms, and every
/// other control character below U+0020 is written as `\u00XX`. Without that
/// last rule, free-form text containing e.g. a terminal escape byte would make
/// the whole document unparseable.
pub fn rows_to_json(rows: &[VizActionRow]) -> String {
    /// Escape `s` for use inside a JSON string literal (quotes not included).
    fn esc(s: &str) -> String {
        let mut o = String::with_capacity(s.len() + 2);
        for c in s.chars() {
            match c {
                '"' => o.push_str("\\\""),
                '\\' => o.push_str("\\\\"),
                '\n' => o.push_str("\\n"),
                '\t' => o.push_str("\\t"),
                '\r' => o.push_str("\\r"),
                // JSON forbids raw control characters inside strings.
                c if u32::from(c) < 0x20 => {
                    o.push_str(&format!("\\u{:04x}", u32::from(c)));
                }
                c => o.push(c),
            }
        }
        o
    }

    let objects: Vec<String> = rows
        .iter()
        .map(|r| {
            let ts_rfc = Utc
                .timestamp_micros(r.ts_micros)
                .single()
                .map(|d| d.to_rfc3339())
                .unwrap_or_default();
            format!(
                "{{\"session_id\": \"{}\", \"seq\": {}, \"ts\": \"{}\", \"kind\": \"{}\", \
                 \"workspace\": \"{}\", \"tab\": \"{}\", \"detail\": \"{}\"}}",
                esc(&r.session_id),
                r.seq,
                esc(&ts_rfc),
                esc(&r.kind),
                esc(&r.workspace),
                esc(&r.tab),
                esc(&r.detail),
            )
        })
        .collect();
    format!("[{}]", objects.join(", "))
}

// ─── column helpers ─────────────────────────────────────────────────────

/// Borrow column `idx` of `b` as a `StringArray`.
///
/// Returns an error — rather than panicking — when the batch has no column at
/// `idx` (e.g. the table schema has fewer fields than this module expects) or
/// when the column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    // Checked lookup: `RecordBatch::column` would panic on a bad index.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("viz_actions col {idx} is missing (batch has {} columns)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("viz_actions col {idx} is not StringArray"))
}

/// Borrow column `idx` of `b` as an `Int64Array`.
///
/// Returns an error — rather than panicking — when the batch has no column at
/// `idx` or when the column is not an `Int64Array`.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int64Array> {
    // Checked lookup: `RecordBatch::column` would panic on a bad index.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("viz_actions col {idx} is missing (batch has {} columns)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| anyhow!("viz_actions col {idx} is not Int64Array"))
}

/// Borrow column `idx` of `b` as a `TimestampMicrosecondArray`.
///
/// Returns an error — rather than panicking — when the batch has no column at
/// `idx` or when the column is not a `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    // Checked lookup: `RecordBatch::column` would panic on a bad index.
    let column = b.columns().get(idx).ok_or_else(|| {
        anyhow!("viz_actions col {idx} is missing (batch has {} columns)", b.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("viz_actions col {idx} is not TimestampMicrosecondArray"))
}

/// Render `ts_micros` (microseconds since the Unix epoch) as an RFC3339 string
/// in UTC. Convenience for callers/tests.
///
/// If the value cannot be resolved to a single date-time, the current time is
/// formatted instead, so the result is always a well-formed RFC3339 string.
pub fn ts_to_rfc3339(ts_micros: i64) -> String {
    let resolved: Option<DateTime<Utc>> = Utc.timestamp_micros(ts_micros).single();
    match resolved {
        Some(dt) => dt.to_rfc3339(),
        None => Utc::now().to_rfc3339(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Terse row constructor so the fixtures below read as a table.
    fn action(
        session_id: &str,
        seq: i64,
        ts_micros: i64,
        kind: &str,
        workspace: &str,
        tab: &str,
        detail: &str,
    ) -> VizActionRow {
        VizActionRow {
            session_id: session_id.into(),
            seq,
            ts_micros,
            kind: kind.into(),
            workspace: workspace.into(),
            tab: tab.into(),
            detail: detail.into(),
        }
    }

    /// The `seq` values of `rows`, in the order given.
    fn seqs(rows: &[VizActionRow]) -> Vec<i64> {
        rows.iter().map(|r| r.seq).collect()
    }

    #[test]
    fn append_and_query_round_trip_with_scope_and_order() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        // The table is partitioned by `session_id` (one viz launch = one
        // session = one partition), and the live drain only ever appends its
        // own session's rows — hence one append per session here. Session A is
        // written out of seq order on purpose, to prove that the reader sorts
        // by (session_id, seq); session B travels in a separate batch.
        let sess_a = vec![
            action("sessA", 2, 30, "RPC", "knut", "Release", "Viz.Timeline"),
            action("sessA", 0, 10, "LIFE", "knut", "", "action-log started"),
            action("sessA", 1, 20, "TAB", "knut", "Release", "tab=Release"),
        ];
        let sess_b = vec![action("sessB", 0, 40, "CLICK", "korp", "Test", "↻ reload")];
        append_viz_actions_blocking(&wh, &sess_a).unwrap();
        append_viz_actions_blocking(&wh, &sess_b).unwrap();

        let only_a = ActionSelector::Session("sessA".into());
        let only_b = ActionSelector::Session("sessB".into());

        // Scoped to session A: exactly its three rows, ordered by seq.
        let a = query_viz_actions_blocking(&wh, &only_a, None).unwrap();
        assert_eq!(a.len(), 3);
        assert_eq!(seqs(&a), vec![0, 1, 2]);
        // Cell contents survive the round trip, not merely the row count.
        assert_eq!(a[0].kind, "LIFE");
        assert_eq!(a[0].detail, "action-log started");
        assert_eq!(a[1].kind, "TAB");
        assert_eq!(a[1].tab, "Release");
        assert_eq!(a[2].kind, "RPC");
        assert_eq!(a[2].detail, "Viz.Timeline");
        assert_eq!(a[2].ts_micros, 30);

        // A limit keeps the newest N rows (the read-back history view).
        let recent = query_viz_actions_blocking(&wh, &only_a, Some(2)).unwrap();
        assert_eq!(seqs(&recent), vec![1, 2]);

        // `All` sees both sessions; session B stays isolated by its id.
        let all = query_viz_actions_blocking(&wh, &ActionSelector::All, None).unwrap();
        assert_eq!(all.len(), 4);
        let b = query_viz_actions_blocking(&wh, &only_b, None).unwrap();
        assert_eq!(b.len(), 1);
        assert_eq!(b[0].workspace, "korp");
        assert_eq!(b[0].detail, "↻ reload");

        // The JSON view parses and carries the real values.
        let json = rows_to_json(&a);
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.as_array().unwrap().len(), 3);
        assert_eq!(parsed[2]["kind"], "RPC");
        assert_eq!(parsed[1]["tab"], "Release");
    }
}