use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use super::iceberg::{IcebergWarehouse, TABLE_VIZ_ACTIONS, append_batch};
// Positional indices of the viz_actions columns. They follow the order in
// which `append_viz_actions` builds its batch (session_id, seq, ts, kind,
// workspace, tab, detail) and are used by `query_viz_actions` to read scans.
const COL_SESSION_ID: usize = 0;
const COL_SEQ: usize = COL_SESSION_ID + 1;
const COL_TS_MICROS: usize = COL_SEQ + 1;
const COL_KIND: usize = COL_TS_MICROS + 1;
const COL_WORKSPACE: usize = COL_KIND + 1;
const COL_TAB: usize = COL_WORKSPACE + 1;
const COL_DETAIL: usize = COL_TAB + 1;
/// One recorded visualiser action, i.e. one row of the `viz_actions` table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VizActionRow {
    /// Session the action belongs to.
    pub session_id: String,
    /// Position of the action within its session; queries order by
    /// `(session_id, seq)`.
    pub seq: i64,
    /// Event time in microseconds since the Unix epoch (stored as a UTC
    /// microsecond timestamp).
    pub ts_micros: i64,
    /// Action category (the tests use values such as `"RPC"`, `"TAB"`, `"CLICK"`).
    pub kind: String,
    /// Workspace the action was recorded in.
    pub workspace: String,
    /// Tab the action was recorded on; may be empty.
    pub tab: String,
    /// Free-form description of the action.
    pub detail: String,
}

impl VizActionRow {
    /// Returns the owned sort key `(session_id, seq)`.
    fn key(&self) -> (String, i64) {
        let Self { session_id, seq, .. } = self;
        (session_id.to_owned(), *seq)
    }
}
/// Appends `rows` to the `viz_actions` table as a single Arrow record batch.
///
/// An empty slice is a no-op and does not touch the catalog. Columns are laid
/// out as session_id, seq, ts (microsecond timestamp tagged `+00:00`), kind,
/// workspace, tab, detail, and checked against the table's current schema.
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to
/// Arrow, the built columns do not match that schema, or the append fails.
pub async fn append_viz_actions(wh: &IcebergWarehouse, rows: &[VizActionRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_VIZ_ACTIONS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    // `from_iter_values` copies directly from the borrowed row fields into the
    // Arrow buffers, instead of first cloning every String into a temporary Vec.
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.session_id.as_str()))),
        Arc::new(Int64Array::from_iter_values(rows.iter().map(|r| r.seq))),
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.kind.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.workspace.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.tab.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.detail.as_str()))),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Synchronous wrapper around [`append_viz_actions`], driven to completion by
/// `wh.block_on`.
pub fn append_viz_actions_blocking(wh: &IcebergWarehouse, rows: &[VizActionRow]) -> Result<()> {
    let pending = append_viz_actions(wh, rows);
    wh.block_on(pending)
}
/// Selects which rows [`query_viz_actions`] returns.
#[derive(Clone, Debug)]
pub enum ActionSelector {
    /// Only rows whose `session_id` equals the wrapped id.
    Session(String),
    /// Every row in the table.
    All,
}
/// Reads the `viz_actions` rows selected by `sel`, ordered by `(session_id, seq)`.
///
/// The whole table is scanned and filtered client-side. With `limit =
/// Some(n)` only the last `n` rows of that ordering are kept. For
/// [`ActionSelector::All`] this is the tail of the `(session_id, seq)` order,
/// not necessarily the most recent rows by timestamp.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a scanned batch
/// does not have the expected column layout/types.
pub async fn query_viz_actions(
    wh: &IcebergWarehouse,
    sel: &ActionSelector,
    limit: Option<usize>,
) -> Result<Vec<VizActionRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_VIZ_ACTIONS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Resolve the selector once; `None` means "keep every row".
    let wanted_session: Option<&str> = match sel {
        ActionSelector::Session(id) => Some(id.as_str()),
        ActionSelector::All => None,
    };

    let mut out: Vec<VizActionRow> = Vec::new();
    for b in &batches {
        let session_id = col_str(b, COL_SESSION_ID)?;
        let seq = col_i64(b, COL_SEQ)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let kind = col_str(b, COL_KIND)?;
        let workspace = col_str(b, COL_WORKSPACE)?;
        let tab = col_str(b, COL_TAB)?;
        let detail = col_str(b, COL_DETAIL)?;
        for i in 0..b.num_rows() {
            // Filter on the borrowed column value before allocating the
            // seven owned Strings a row needs.
            if let Some(wanted) = wanted_session {
                if session_id.value(i) != wanted {
                    continue;
                }
            }
            out.push(VizActionRow {
                session_id: session_id.value(i).to_string(),
                seq: seq.value(i),
                ts_micros: ts.value(i),
                kind: kind.value(i).to_string(),
                workspace: workspace.value(i).to_string(),
                tab: tab.value(i).to_string(),
                detail: detail.value(i).to_string(),
            });
        }
    }

    // `key()` clones the session id. Caching it computes the key once per row
    // instead of on every comparison; like `sort_by_key`, this sort is stable.
    out.sort_by_cached_key(|r| r.key());
    if let Some(n) = limit {
        // Keep only the last `n` rows of the sorted order.
        let start = out.len().saturating_sub(n);
        out.drain(..start);
    }
    Ok(out)
}
/// Synchronous wrapper around [`query_viz_actions`], driven to completion by
/// `wh.block_on`.
pub fn query_viz_actions_blocking(
    wh: &IcebergWarehouse,
    sel: &ActionSelector,
    limit: Option<usize>,
) -> Result<Vec<VizActionRow>> {
    let pending = query_viz_actions(wh, sel, limit);
    wh.block_on(pending)
}
/// Serialises `rows` as a JSON array of objects, one per row, in slice order.
///
/// Each object has the keys `session_id`, `seq`, `ts`, `kind`, `workspace`,
/// `tab` and `detail`. `ts` is `ts_micros` rendered as RFC 3339 in UTC, or
/// `""` when chrono cannot represent the timestamp. String values are
/// escaped per RFC 8259, so the result always parses as JSON.
pub fn rows_to_json(rows: &[VizActionRow]) -> String {
    /// Returns `s` escaped for use inside a JSON string literal (quotes not included).
    fn esc(s: &str) -> String {
        let mut o = String::with_capacity(s.len() + 2);
        for c in s.chars() {
            match c {
                '"' => o.push_str("\\\""),
                '\\' => o.push_str("\\\\"),
                '\n' => o.push_str("\\n"),
                '\t' => o.push_str("\\t"),
                '\r' => o.push_str("\\r"),
                // JSON forbids raw U+0000..=U+001F inside strings, so the
                // remaining control characters need the \uXXXX form.
                c if c < '\u{20}' => o.push_str(&format!("\\u{:04x}", u32::from(c))),
                c => o.push(c),
            }
        }
        o
    }
    let mut s = String::from("[");
    for (i, r) in rows.iter().enumerate() {
        let ts_rfc = Utc
            .timestamp_micros(r.ts_micros)
            .single()
            .map(|d| d.to_rfc3339())
            .unwrap_or_default();
        s.push_str(&format!(
            "{{\"session_id\": \"{}\", \"seq\": {}, \"ts\": \"{}\", \"kind\": \"{}\", \
             \"workspace\": \"{}\", \"tab\": \"{}\", \"detail\": \"{}\"}}{}",
            esc(&r.session_id),
            r.seq,
            esc(&ts_rfc),
            esc(&r.kind),
            esc(&r.workspace),
            esc(&r.tab),
            esc(&r.detail),
            if i + 1 < rows.len() { ", " } else { "" },
        ));
    }
    s.push(']');
    s
}
/// Borrows column `idx` of `b` as a `StringArray`.
///
/// # Errors
/// Returns an error when the batch has no column `idx`, rather than panicking
/// as `RecordBatch::column` would. Also returns an error when that column is
/// not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!("viz_actions batch has no col {idx} ({} columns)", b.num_columns())
        })?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("viz_actions col {idx} is not StringArray"))
}
/// Borrows column `idx` of `b` as an `Int64Array`.
///
/// # Errors
/// Returns an error when the batch has no column `idx`, rather than panicking
/// as `RecordBatch::column` would. Also returns an error when that column is
/// not an `Int64Array`.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int64Array> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!("viz_actions batch has no col {idx} ({} columns)", b.num_columns())
        })?
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| anyhow!("viz_actions col {idx} is not Int64Array"))
}
/// Borrows column `idx` of `b` as a `TimestampMicrosecondArray`.
///
/// # Errors
/// Returns an error when the batch has no column `idx`, rather than panicking
/// as `RecordBatch::column` would. Also returns an error when that column is
/// not a `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!("viz_actions batch has no col {idx} ({} columns)", b.num_columns())
        })?
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("viz_actions col {idx} is not TimestampMicrosecondArray"))
}
/// Formats `ts_micros` (microseconds since the Unix epoch) as an RFC 3339 UTC
/// timestamp.
///
/// Returns an empty string when chrono cannot represent the instant, the same
/// fallback `rows_to_json` uses for its `ts` field. This avoids substituting
/// the current time, which would present fabricated data as real.
pub fn ts_to_rfc3339(ts_micros: i64) -> String {
    let dt: Option<DateTime<Utc>> = Utc.timestamp_micros(ts_micros).single();
    dt.map(|d| d.to_rfc3339()).unwrap_or_default()
}
// Round-trip tests against a warehouse opened on a fresh temporary directory.
#[cfg(test)]
mod tests {
    use super::*;

    // Appends two sessions and checks:
    // - per-session filtering and `(session_id, seq)` ordering;
    // - that `limit` keeps the tail of that ordering;
    // - `ActionSelector::All`;
    // - that `rows_to_json` output parses as JSON.
    #[test]
    fn append_and_query_round_trip_with_scope_and_order() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        // sessA is appended out of seq order (2, 0, 1) so the query's sort is
        // actually exercised.
        let sess_a = vec![
            VizActionRow {
                session_id: "sessA".into(), seq: 2, ts_micros: 30, kind: "RPC".into(),
                workspace: "knut".into(), tab: "Release".into(), detail: "Viz.Timeline".into(),
            },
            VizActionRow {
                session_id: "sessA".into(), seq: 0, ts_micros: 10, kind: "LIFE".into(),
                workspace: "knut".into(), tab: String::new(), detail: "action-log started".into(),
            },
            VizActionRow {
                session_id: "sessA".into(), seq: 1, ts_micros: 20, kind: "TAB".into(),
                workspace: "knut".into(), tab: "Release".into(), detail: "tab=Release".into(),
            },
        ];
        // A second session with a non-ASCII detail, to check it round-trips.
        let sess_b = vec![VizActionRow {
            session_id: "sessB".into(), seq: 0, ts_micros: 40, kind: "CLICK".into(),
            workspace: "korp".into(), tab: "Test".into(), detail: "↻ reload".into(),
        }];
        // Two separate appends, so the scan has to combine multiple writes.
        append_viz_actions_blocking(&wh, &sess_a).unwrap();
        append_viz_actions_blocking(&wh, &sess_b).unwrap();
        // Session filter: only sessA rows, sorted by seq.
        let a = query_viz_actions_blocking(&wh, &ActionSelector::Session("sessA".into()), None)
            .unwrap();
        assert_eq!(a.len(), 3);
        assert_eq!(a.iter().map(|r| r.seq).collect::<Vec<_>>(), vec![0, 1, 2]);
        assert_eq!(a[0].kind, "LIFE");
        assert_eq!(a[0].detail, "action-log started");
        assert_eq!(a[1].kind, "TAB");
        assert_eq!(a[1].tab, "Release");
        assert_eq!(a[2].kind, "RPC");
        assert_eq!(a[2].detail, "Viz.Timeline");
        assert_eq!(a[2].ts_micros, 30);
        // `limit` keeps the last N rows of the sorted order.
        let recent = query_viz_actions_blocking(
            &wh,
            &ActionSelector::Session("sessA".into()),
            Some(2),
        )
        .unwrap();
        assert_eq!(recent.iter().map(|r| r.seq).collect::<Vec<_>>(), vec![1, 2]);
        // `All` returns rows from both sessions.
        let all = query_viz_actions_blocking(&wh, &ActionSelector::All, None).unwrap();
        assert_eq!(all.len(), 4);
        let b = query_viz_actions_blocking(&wh, &ActionSelector::Session("sessB".into()), None)
            .unwrap();
        assert_eq!(b.len(), 1);
        assert_eq!(b[0].workspace, "korp");
        assert_eq!(b[0].detail, "↻ reload");
        // The hand-rolled JSON serialiser must produce parseable JSON.
        let json = rows_to_json(&a);
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.as_array().unwrap().len(), 3);
        assert_eq!(parsed[2]["kind"], "RPC");
        assert_eq!(parsed[1]["tab"], "Release");
    }
}