nornir 0.4.21

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
//! TOTAL user-action debug log — the headless-debuggability contract for the
//! viz. Every interaction (tab switch, button click, query, RPC call, error) is
//! pushed here with a millisecond timestamp so that "when I fire it up you
//! really follow every little bug".
//!
//! **Four** sinks, all always-on and cheap:
//!   1. an in-memory **ring buffer** (last [`CAP`] entries) the app paints in a
//!      bottom overlay panel,
//!   2. a **per-entry stderr line** (`nornir-viz ACTION …`) so the operator
//!      watching the launching terminal / server log sees the trail live,
//!   3. an appended **file** at `$NORNIR_VIZ_ACTIONLOG` (default
//!      `/tmp/nornir_viz_actions.log`) — externally observable + greppable for
//!      bug-hunting, mirroring how `$NORNIR_VIZ_STATE` dumps the UI state, and
//!   4. **N5** — the durable warehouse `viz_actions` table (when a warehouse is
//!      attached). The `/tmp` file stays the fast LOCAL live-channel; the
//!      warehouse is the DURABLE, queryable sink (write both, per the
//!      persist-to-warehouse-stream-not-`/tmp` law). The viz also reads its own
//!      recent action history **back** from this table (see
//!      [`ActionLog::warehouse_recent_json`]).
//!
//! Cheap by design: a `Mutex<VecDeque>` push + a `writeln!` + a non-blocking
//! channel send to a background drain thread (which owns the warehouse and
//! batches a burst into one Iceberg snapshot — the viz frame never blocks on
//! I/O). The instrumented call sites (see `app.rs`) only push on real
//! edge-triggered events (a click, a tab change, an RPC firing), never every
//! frame, so it never floods.

use std::collections::VecDeque;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;
use std::sync::mpsc::{Sender, channel};

use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::viz_actions::{
    ActionSelector, VizActionRow, append_viz_actions_blocking, query_viz_actions, rows_to_json,
};

/// Max entries retained in the in-memory ring (older ones drop off the front).
///
/// `ActionLog::push` evicts the oldest entry once the ring already holds `CAP`
/// items, so the ring length never exceeds this value. The ring is also
/// pre-allocated with this capacity in `ActionLog::new`.
const CAP: usize = 256;

/// One timestamped action-log entry, as stored in the in-memory ring and
/// handed out (cloned) by `ActionLog::recent`.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Monotonic sequence number (since app start) — stable ordering even when
    /// two entries land in the same millisecond. The counter is incremented
    /// before it is assigned, so the first entry of a log has `seq == 1`.
    pub seq: u64,
    /// `HH:MM:SS.mmm` wall-clock stamp (local), for the overlay + file.
    pub stamp: String,
    /// Coarse category — drives the overlay color + the grep prefix.
    /// (The bracketed tag written to stderr and the file is `kind.tag()`.)
    pub kind: Kind,
    /// Free-form detail, e.g. `tab=Bench`, `Search.Query q="foo"`, the error text.
    pub detail: String,
}

/// Action category. Kept small + greppable.
///
/// Each variant maps to a fixed upper-case tag (see `Kind::tag`) that is
/// written in square brackets on every stderr and file line and stored as the
/// `kind` column of the warehouse row.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Kind {
    /// App lifecycle (launch, workspace switch, reload).
    Life,
    /// A tab became active.
    Tab,
    /// A button / control was clicked.
    Click,
    /// A user-issued query (search / lookup / call path).
    Query,
    /// An outgoing gRPC call to the server.
    Rpc,
    /// An error surfaced anywhere in the flow.
    Error,
}

impl Kind {
    pub fn tag(self) -> &'static str {
        match self {
            Kind::Life => "LIFE",
            Kind::Tab => "TAB",
            Kind::Click => "CLICK",
            Kind::Query => "QUERY",
            Kind::Rpc => "RPC",
            Kind::Error => "ERROR",
        }
    }
}

/// Central action log: a ring buffer + the file/stderr sinks + the durable
/// warehouse sink (N5). Interior-mutable (`Mutex`) so it can be shared `&self`
/// across every instrumented call site without threading `&mut` through the
/// whole app.
///
/// Only `attach_warehouse` needs `&mut self` (it sets `warehouse_root`); every
/// other method works through `&self` and the two mutexes below.
pub struct ActionLog {
    /// All mutable per-entry state (ring, counter, file handle, context,
    /// warehouse sender) behind one lock, so a `push` updates them together.
    inner: Mutex<Inner>,
    /// File sink path (surfaced in the overlay header).
    file_path: String,
    /// This launch's identity — the `viz_actions.session_id` every row carries,
    /// so the warehouse trail is groupable + readable back per session.
    /// Generated once in `new` as a random (v4) UUID string.
    session_id: String,
    /// Warehouse root the read-back queries open (set when a durable sink is
    /// attached). `None` keeps the log file/stderr-only (e.g. remote mode or a
    /// warehouse that wouldn't open).
    warehouse_root: Option<PathBuf>,
    /// Join handle for the background drain thread, so [`flush`](Self::flush)
    /// can close the channel and wait for the final snapshot to land (used by
    /// the inject-assert test; the live viz just lets it run for its lifetime).
    /// `None` until a warehouse is attached, and again after `flush` takes it.
    drain: Mutex<Option<std::thread::JoinHandle<()>>>,
}

/// Lock-protected mutable state of an [`ActionLog`]; always accessed through
/// `ActionLog::inner`.
struct Inner {
    /// In-memory ring of the most recent entries, oldest at the front.
    ring: VecDeque<Entry>,
    /// Number of entries pushed so far; also the `seq` of the latest entry.
    seq: u64,
    /// Open handle on the file sink, or `None` when the file could not be
    /// opened (the file sink is then skipped).
    file: Option<std::fs::File>,
    /// Active context stamped onto each warehouse row (cheap to update each
    /// frame from the app: the current workspace + tab).
    workspace: String,
    /// Active tab name; the second half of the context described above.
    tab: String,
    /// Non-blocking channel into the background warehouse-drain thread. `None`
    /// until/unless a warehouse is attached.
    wh_tx: Option<Sender<VizActionRow>>,
}

impl Default for ActionLog {
    fn default() -> Self {
        Self::new()
    }
}

impl ActionLog {
    /// Create the log and open its file sink.
    ///
    /// The path comes from `$NORNIR_VIZ_ACTIONLOG` (default
    /// `/tmp/nornir_viz_actions.log`). The file is truncated so each session's
    /// trail starts clean, then reopened in **append** mode. Other processes
    /// append to this same file (see [`tail_lines`](Self::tail_lines)); a plain
    /// write handle would keep writing at its own stale offset and overwrite
    /// their lines, whereas an append handle always writes at the current end.
    ///
    /// If the file cannot be opened the file sink is disabled and the ring and
    /// stderr sinks keep working. No warehouse is attached yet (see
    /// [`attach_warehouse`](Self::attach_warehouse)). The first recorded entry
    /// is a LIFE line naming the file path.
    pub fn new() -> Self {
        let file_path = std::env::var("NORNIR_VIZ_ACTIONLOG")
            .unwrap_or_else(|_| "/tmp/nornir_viz_actions.log".to_string());
        // Truncate on launch so each session's trail starts clean (matches the
        // single-snapshot semantics of $NORNIR_VIZ_STATE). `OpenOptions` rejects
        // append + truncate together, so truncate with a throwaway handle first
        // and then reopen for appending.
        let file = std::fs::File::create(&file_path)
            .and_then(|truncated| {
                drop(truncated);
                std::fs::OpenOptions::new().append(true).open(&file_path)
            })
            .ok();
        let log = Self {
            inner: Mutex::new(Inner {
                ring: VecDeque::with_capacity(CAP),
                seq: 0,
                file,
                workspace: String::new(),
                tab: String::new(),
                wh_tx: None,
            }),
            file_path,
            session_id: uuid::Uuid::new_v4().to_string(),
            warehouse_root: None,
            drain: Mutex::new(None),
        };
        log.push(Kind::Life, format!("action-log started → {}", log.file_path));
        log
    }

    /// Attach the durable warehouse sink (N5).
    ///
    /// Spawns a background thread named `nornir-viz-actionlog` that opens one
    /// [`IcebergWarehouse`] at `root` and then drains the action channel: it
    /// blocks for the first queued row, sweeps up whatever else is already
    /// queued (at most 256 rows per batch), and hands the batch to
    /// `append_viz_actions_blocking` in a single call. Append failures are
    /// reported on stderr and do not stop the loop.
    ///
    /// On a successful spawn the sender is installed for [`push`](Self::push),
    /// the join handle is kept for [`flush`](Self::flush), `root` is remembered
    /// for read-back, and a LIFE marker is pushed so the durable trail opens
    /// with this session's start. Best-effort throughout: if the thread cannot
    /// be spawned, or the warehouse cannot be opened inside it, a message goes
    /// to stderr and the file/stderr sinks keep working.
    ///
    /// Call once, right after [`new`](Self::new). The `/tmp` file remains the
    /// fast local live-channel; this is the durable sink (write both).
    pub fn attach_warehouse(&mut self, root: PathBuf) {
        // Upper bound on rows coalesced into a single append call.
        const MAX_BATCH: usize = 256;

        let (sender, receiver) = channel::<VizActionRow>();
        let thread_root = root.clone();
        let drain_loop = move || {
            let warehouse = match IcebergWarehouse::open(&thread_root) {
                Ok(opened) => opened,
                Err(e) => {
                    eprintln!(
                        "nornir-viz action-log: warehouse open failed; durable sink off: {e:#}"
                    );
                    return;
                }
            };
            // Block for the head of each burst, then greedily take whatever is
            // already queued so a burst of clicks becomes ONE append. The outer
            // iterator ends once every sender has been dropped.
            for head in receiver.iter() {
                let mut batch = vec![head];
                batch.extend(receiver.try_iter().take(MAX_BATCH - 1));
                if let Err(e) = append_viz_actions_blocking(&warehouse, &batch) {
                    eprintln!(
                        "nornir-viz action-log: append {} row(s) failed (non-fatal): {e:#}",
                        batch.len()
                    );
                }
            }
        };

        let spawn_result = std::thread::Builder::new()
            .name("nornir-viz-actionlog".into())
            .spawn(drain_loop);
        let handle = match spawn_result {
            Ok(joinable) => joinable,
            Err(e) => {
                eprintln!("nornir-viz action-log: drain thread spawn failed; durable sink off: {e}");
                return;
            }
        };

        if let Ok(mut inner) = self.inner.lock() {
            inner.wh_tx = Some(sender);
        }
        if let Ok(mut slot) = self.drain.lock() {
            *slot = Some(handle);
        }
        self.warehouse_root = Some(root);
        // The first row (the LIFE "started" line) was pushed before the sink
        // existed; emit another LIFE marker so the durable trail opens with
        // this session's start too.
        let marker = format!("durable sink → warehouse viz_actions (session {})", self.session_id);
        self.push(Kind::Life, marker);
    }

    /// Location of the file sink, as resolved in [`new`](Self::new) (shown in
    /// the overlay header).
    pub fn file_path(&self) -> &str {
        self.file_path.as_str()
    }

    /// The `viz_actions.session_id` generated for this launch; every warehouse
    /// row written by this log carries it.
    pub fn session_id(&self) -> &str {
        self.session_id.as_str()
    }

    /// Update the active context (workspace + tab) that subsequent warehouse
    /// rows are stamped with.
    ///
    /// Meant to be called every frame: a field is only re-allocated when its
    /// value actually changed, so the steady-state cost is the lock plus two
    /// string comparisons. Does nothing if the lock is poisoned.
    pub fn set_context(&self, workspace: &str, tab: &str) {
        let Ok(mut ctx) = self.inner.lock() else { return };
        if ctx.workspace.as_str() != workspace {
            ctx.workspace = workspace.to_owned();
        }
        if ctx.tab.as_str() != tab {
            ctx.tab = tab.to_owned();
        }
    }

    /// Record one action. Always-on; pushes to the ring + stderr + file + (when
    /// attached) the durable warehouse sink.
    ///
    /// * `kind` — category; its [`Kind::tag`] is the bracketed label on every line.
    /// * `detail` — free-form text describing the action.
    ///
    /// The sequence number is assigned under the lock, and stderr/file output
    /// is emitted while the lock is held so lines appear in `seq` order. The
    /// file line is formatted once and written with a single `write_all`, so a
    /// line is not split into several writes that could interleave with other
    /// processes writing the same file. Write errors are ignored (best-effort
    /// sink). If the lock is poisoned the action is dropped.
    pub fn push(&self, kind: Kind, detail: impl Into<String>) {
        let detail = detail.into();
        let stamp = now_stamp();
        let ts_micros = chrono::Utc::now().timestamp_micros();
        let Ok(mut g) = self.inner.lock() else { return };
        g.seq += 1;
        let seq = g.seq;
        let tag = kind.tag();
        // stderr: live trail for the launching terminal / server log.
        eprintln!("nornir-viz ACTION {} [{}] {}", stamp, tag, detail);
        // file: greppable, externally observable (mirrors $NORNIR_VIZ_STATE).
        if let Some(f) = g.file.as_mut() {
            let line = format!("{} {:>5} [{}] {}\n", stamp, seq, tag, detail);
            let _ = f.write_all(line.as_bytes());
            let _ = f.flush();
        }
        // warehouse: durable, queryable sink (N5). Non-blocking send — the frame
        // never waits on Iceberg I/O; a dropped channel just disables the sink.
        if let Some(tx) = g.wh_tx.as_ref() {
            let row = VizActionRow {
                session_id: self.session_id.clone(),
                // The row column is signed; the counter starts at 1 and grows
                // by one per push.
                seq: seq as i64,
                ts_micros,
                kind: tag.to_string(),
                workspace: g.workspace.clone(),
                tab: g.tab.clone(),
                detail: detail.clone(),
            };
            if tx.send(row).is_err() {
                // Drain thread gone (warehouse open failed) — drop the sink so we
                // stop trying.
                g.wh_tx = None;
            }
        }
        if g.ring.len() == CAP {
            g.ring.pop_front();
        }
        // Built last so `stamp` and `detail` are moved into the ring rather
        // than cloned.
        g.ring.push_back(Entry { seq, stamp, kind, detail });
    }

    /// Read this session's recent actions **back** from the durable warehouse
    /// `viz_actions` table and return them as a JSON array string.
    ///
    /// Opens the remembered warehouse root read-only, queries rows for this
    /// log's `session_id` with a limit of `n`, and serialises them with
    /// `rows_to_json`. Row ordering is whatever `query_viz_actions` yields
    /// (presumably oldest→newest — confirm against that function).
    ///
    /// Returns `"[]"` when no warehouse is attached, when the warehouse cannot
    /// be opened, or when the query fails; it never panics on those paths.
    pub fn warehouse_recent_json(&self, n: usize) -> String {
        const EMPTY: &str = "[]";
        let Some(root) = self.warehouse_root.as_ref() else {
            return EMPTY.to_string();
        };
        let Ok(wh) = IcebergWarehouse::open_read_only(root) else {
            return EMPTY.to_string();
        };
        let selector = ActionSelector::Session(self.session_id.clone());
        wh.block_on(query_viz_actions(&wh, &selector, Some(n)))
            .map(|rows| rows_to_json(&rows))
            .unwrap_or_else(|_| EMPTY.to_string())
    }

    /// Close the durable-sink channel and **block** until the background drain
    /// thread has exited.
    ///
    /// Dropping the only sender makes the drain thread's receive loop end once
    /// it has appended what was queued; joining the thread then waits for that
    /// to finish. After this returns no further action reaches the warehouse
    /// (the sink is closed), while the ring, stderr and file sinks keep
    /// working. Calling it again, or with no warehouse attached, is a no-op.
    /// Intended for the inject-assert test (deterministic, no sleeps) and a
    /// clean shutdown; the live viz never calls it.
    pub fn flush(&self) {
        // Hang up the channel first so the drain loop can terminate.
        if let Ok(mut inner) = self.inner.lock() {
            inner.wh_tx = None;
        }
        // Take the handle out in its own statement so the `drain` lock is
        // released before we block on the join.
        let pending = match self.drain.lock() {
            Ok(mut slot) => slot.take(),
            Err(_) => None,
        };
        if let Some(thread) = pending {
            let _ = thread.join();
        }
    }

    /// Clone out the newest `n` ring entries, ordered oldest→newest, for the
    /// overlay. Returns fewer when the ring holds fewer, and an empty vector
    /// if the lock is poisoned.
    pub fn recent(&self, n: usize) -> Vec<Entry> {
        match self.inner.lock() {
            Ok(guard) => {
                let first = guard.ring.len().saturating_sub(n);
                guard.ring.range(first..).cloned().collect()
            }
            Err(_) => Vec::new(),
        }
    }

    /// Number of entries recorded this session (the live counter shown in the
    /// header). Unlike the ring it is never capped; reports `0` if the lock is
    /// poisoned.
    pub fn count(&self) -> u64 {
        match self.inner.lock() {
            Ok(guard) => guard.seq,
            Err(_) => 0,
        }
    }

    /// Tail the action-log **file** — the single shared source of truth. Unlike
    /// [`recent`](Self::recent) (this viz's own in-memory ring), this reads back
    /// the file, so it also surfaces lines appended by **other** nornir processes
    /// to the same `$NORNIR_VIZ_ACTIONLOG` — most importantly a running
    /// `nornir bench`'s `[BENCH]` progress, so the operator watches the benchmark
    /// live in the viz's action-trail overlay.
    ///
    /// * `max_bytes` — only the trailing `max_bytes` of the file are considered
    ///   (cheap even when a long bench floods the file).
    /// * `max_lines` — at most this many lines, the newest ones, are returned.
    ///
    /// Lines are returned oldest→newest. When the byte window starts inside a
    /// line, that partial line is dropped; a line that begins exactly at the
    /// window start is kept. Any I/O error yields an empty vector.
    pub fn tail_lines(&self, max_bytes: u64, max_lines: usize) -> Vec<String> {
        use std::io::{Read, Seek, SeekFrom};
        let Ok(mut f) = std::fs::File::open(&self.file_path) else { return Vec::new() };
        let len = f.metadata().map(|m| m.len()).unwrap_or(0);
        let start = len.saturating_sub(max_bytes);
        let windowed = start > 0;
        // When windowing, begin ONE byte before the window. If that probe byte
        // is a newline the window starts on a line boundary and the first
        // "line" we read is empty; otherwise it is the tail of a cut line.
        // Either way the first line is the one to discard, and a complete line
        // sitting exactly at the window start is no longer lost.
        let read_from = start.saturating_sub(1);
        if read_from > 0 && f.seek(SeekFrom::Start(read_from)).is_err() {
            return Vec::new();
        }
        let mut bytes = Vec::new();
        if f.read_to_end(&mut bytes).is_err() {
            return Vec::new();
        }
        // from_utf8_lossy: the seek may land mid-multibyte; never panic on it.
        let text = String::from_utf8_lossy(&bytes);
        let lines: Vec<&str> = text.lines().skip(usize::from(windowed)).collect();
        let keep_from = lines.len().saturating_sub(max_lines);
        lines.into_iter().skip(keep_from).map(str::to_owned).collect()
    }
}

/// Current local wall-clock time rendered as `HH:MM:SS.mmm`. Goes through
/// `chrono` (already a dependency) so the format matches the rest of the
/// codebase's time formatting.
fn now_stamp() -> String {
    let local_now = chrono::Local::now();
    let rendered = local_now.format("%H:%M:%S%.3f");
    rendered.to_string()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The ring keeps at most `CAP` entries in increasing `seq` order, while
    /// the counter keeps tracking every push.
    #[test]
    fn ring_caps_and_orders() {
        // Point the file sink at a scratch path so the test never truncates a
        // real operator's log.
        std::env::set_var("NORNIR_VIZ_ACTIONLOG", "/tmp/nornir_viz_actions_test.log");
        let log = ActionLog::new();
        let overflow = 50;
        (0..CAP + overflow).for_each(|i| log.push(Kind::Click, format!("click {i}")));

        let tail = log.recent(10);
        assert_eq!(tail.len(), 10);
        // Newest last; seq strictly increasing.
        assert!(tail.windows(2).all(|pair| pair[1].seq > pair[0].seq));
        // The ring never exceeds CAP even after CAP+50 (+1 the LIFE start) pushes.
        assert!(log.recent(usize::MAX).len() <= CAP);
        // Count tracks every push including the LIFE start line.
        assert_eq!(log.count(), CAP as u64 + overflow as u64 + 1);
    }

    /// No two categories share a tag, so a grep on `[TAG]` is unambiguous.
    #[test]
    fn kinds_have_distinct_tags() {
        let tags = [
            Kind::Life.tag(),
            Kind::Tab.tag(),
            Kind::Click.tag(),
            Kind::Query.tag(),
            Kind::Rpc.tag(),
            Kind::Error.tag(),
        ];
        for (i, tag) in tags.iter().enumerate() {
            for later in &tags[i + 1..] {
                assert_ne!(tag, later);
            }
        }
    }
}