use std::collections::VecDeque;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;
use std::sync::mpsc::{Sender, channel};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::viz_actions::{
ActionSelector, VizActionRow, append_viz_actions_blocking, query_viz_actions, rows_to_json,
};
const CAP: usize = 256;
#[derive(Clone, Debug)]
pub struct Entry {
pub seq: u64,
pub stamp: String,
pub kind: Kind,
pub detail: String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Kind {
Life,
Tab,
Click,
Query,
Rpc,
Error,
}
impl Kind {
pub fn tag(self) -> &'static str {
match self {
Kind::Life => "LIFE",
Kind::Tab => "TAB",
Kind::Click => "CLICK",
Kind::Query => "QUERY",
Kind::Rpc => "RPC",
Kind::Error => "ERROR",
}
}
}
pub struct ActionLog {
inner: Mutex<Inner>,
file_path: String,
session_id: String,
warehouse_root: Option<PathBuf>,
drain: Mutex<Option<std::thread::JoinHandle<()>>>,
}
struct Inner {
ring: VecDeque<Entry>,
seq: u64,
file: Option<std::fs::File>,
workspace: String,
tab: String,
wh_tx: Option<Sender<VizActionRow>>,
}
impl Default for ActionLog {
fn default() -> Self {
Self::new()
}
}
impl ActionLog {
pub fn new() -> Self {
let file_path = std::env::var("NORNIR_VIZ_ACTIONLOG")
.unwrap_or_else(|_| "/tmp/nornir_viz_actions.log".to_string());
let file = std::fs::File::create(&file_path).ok();
let log = Self {
inner: Mutex::new(Inner {
ring: VecDeque::with_capacity(CAP),
seq: 0,
file,
workspace: String::new(),
tab: String::new(),
wh_tx: None,
}),
file_path,
session_id: uuid::Uuid::new_v4().to_string(),
warehouse_root: None,
drain: Mutex::new(None),
};
log.push(Kind::Life, format!("action-log started → {}", log.file_path));
log
}
pub fn attach_warehouse(&mut self, root: PathBuf) {
let (tx, rx) = channel::<VizActionRow>();
let wh_root = root.clone();
let spawned = std::thread::Builder::new()
.name("nornir-viz-actionlog".into())
.spawn(move || {
let wh = match IcebergWarehouse::open(&wh_root) {
Ok(w) => w,
Err(e) => {
eprintln!(
"nornir-viz action-log: warehouse open failed; durable sink off: {e:#}"
);
return;
}
};
while let Ok(first) = rx.recv() {
let mut batch = vec![first];
while let Ok(more) = rx.try_recv() {
batch.push(more);
if batch.len() >= 256 {
break;
}
}
if let Err(e) = append_viz_actions_blocking(&wh, &batch) {
eprintln!(
"nornir-viz action-log: append {} row(s) failed (non-fatal): {e:#}",
batch.len()
);
}
}
});
match spawned {
Ok(handle) => {
if let Ok(mut g) = self.inner.lock() {
g.wh_tx = Some(tx);
}
if let Ok(mut d) = self.drain.lock() {
*d = Some(handle);
}
self.warehouse_root = Some(root);
self.push(
Kind::Life,
format!("durable sink → warehouse viz_actions (session {})", self.session_id),
);
}
Err(e) => {
eprintln!("nornir-viz action-log: drain thread spawn failed; durable sink off: {e}");
}
}
}
pub fn file_path(&self) -> &str {
&self.file_path
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn set_context(&self, workspace: &str, tab: &str) {
if let Ok(mut g) = self.inner.lock() {
if g.workspace != workspace {
g.workspace = workspace.to_string();
}
if g.tab != tab {
g.tab = tab.to_string();
}
}
}
pub fn push(&self, kind: Kind, detail: impl Into<String>) {
let detail = detail.into();
let stamp = now_stamp();
let ts_micros = chrono::Utc::now().timestamp_micros();
let Ok(mut g) = self.inner.lock() else { return };
g.seq += 1;
let seq = g.seq;
let entry = Entry { seq, stamp: stamp.clone(), kind, detail: detail.clone() };
eprintln!("nornir-viz ACTION {} [{}] {}", stamp, kind.tag(), detail);
if let Some(f) = g.file.as_mut() {
let _ = writeln!(f, "{} {:>5} [{}] {}", stamp, seq, kind.tag(), detail);
let _ = f.flush();
}
if let Some(tx) = g.wh_tx.as_ref() {
let row = VizActionRow {
session_id: self.session_id.clone(),
seq: seq as i64,
ts_micros,
kind: kind.tag().to_string(),
workspace: g.workspace.clone(),
tab: g.tab.clone(),
detail: detail.clone(),
};
if tx.send(row).is_err() {
g.wh_tx = None;
}
}
if g.ring.len() == CAP {
g.ring.pop_front();
}
g.ring.push_back(entry);
}
pub fn warehouse_recent_json(&self, n: usize) -> String {
let Some(root) = self.warehouse_root.as_ref() else { return "[]".to_string() };
let wh = match IcebergWarehouse::open_read_only(root) {
Ok(w) => w,
Err(_) => return "[]".to_string(),
};
let sel = ActionSelector::Session(self.session_id.clone());
match wh.block_on(query_viz_actions(&wh, &sel, Some(n))) {
Ok(rows) => rows_to_json(&rows),
Err(_) => "[]".to_string(),
}
}
pub fn flush(&self) {
if let Ok(mut g) = self.inner.lock() {
g.wh_tx = None;
}
let handle = self.drain.lock().ok().and_then(|mut d| d.take());
if let Some(h) = handle {
let _ = h.join();
}
}
pub fn recent(&self, n: usize) -> Vec<Entry> {
let Ok(g) = self.inner.lock() else { return Vec::new() };
let len = g.ring.len();
let start = len.saturating_sub(n);
g.ring.iter().skip(start).cloned().collect()
}
pub fn count(&self) -> u64 {
self.inner.lock().map(|g| g.seq).unwrap_or(0)
}
pub fn tail_lines(&self, max_bytes: u64, max_lines: usize) -> Vec<String> {
use std::io::{Read, Seek, SeekFrom};
let Ok(mut f) = std::fs::File::open(&self.file_path) else { return Vec::new() };
let len = f.metadata().map(|m| m.len()).unwrap_or(0);
let start = len.saturating_sub(max_bytes);
if start > 0 && f.seek(SeekFrom::Start(start)).is_err() {
return Vec::new();
}
let mut bytes = Vec::new();
if f.read_to_end(&mut bytes).is_err() {
return Vec::new();
}
let text = String::from_utf8_lossy(&bytes);
let mut lines: Vec<&str> = text.lines().collect();
if start > 0 && !lines.is_empty() {
lines.remove(0);
}
let n = lines.len();
lines[n.saturating_sub(max_lines)..]
.iter()
.map(|s| s.to_string())
.collect()
}
}
fn now_stamp() -> String {
chrono::Local::now().format("%H:%M:%S%.3f").to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ring_caps_and_orders() {
std::env::set_var("NORNIR_VIZ_ACTIONLOG", "/tmp/nornir_viz_actions_test.log");
let log = ActionLog::new();
for i in 0..(CAP + 50) {
log.push(Kind::Click, format!("click {i}"));
}
let recent = log.recent(10);
assert_eq!(recent.len(), 10);
for w in recent.windows(2) {
assert!(w[1].seq > w[0].seq);
}
assert!(log.recent(usize::MAX).len() <= CAP);
assert_eq!(log.count(), CAP as u64 + 50 + 1);
}
#[test]
fn kinds_have_distinct_tags() {
let tags = [
Kind::Life.tag(),
Kind::Tab.tag(),
Kind::Click.tag(),
Kind::Query.tag(),
Kind::Rpc.tag(),
Kind::Error.tag(),
];
let mut uniq = tags.to_vec();
uniq.sort_unstable();
uniq.dedup();
assert_eq!(uniq.len(), tags.len());
}
}