net-deck 0.20.1

Operator cyberdeck — terminal UI for the Net mesh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
//! Streaming tails — Phase 4 wiring. Replaces "render straight
//! from snapshot.log_ring" with a per-stream buffer fed by the
//! SDK's `subscribe_*` APIs. The buffer is decoupled from the
//! substrate's ring cap, so an operator who pauses on LOGS
//! keeps the records they were reading even if the runtime
//! rotates them out of `log_ring`.
//!
//! The implementation is intentionally small: a parking_lot
//! `Mutex<VecDeque>` shared between a spawned tokio task that
//! drains the SDK stream and the App's sync render path. Lock
//! hold times are short on both sides (one push per record /
//! one clone of the buffer per render), the lock is never held
//! across an await, and it is not meaningfully contended at
//! the rate of a 16Hz redraw.

use std::collections::VecDeque;
use std::sync::Arc;

use futures::StreamExt;
use net_sdk::dataforts::{BlobAdapter, BlobInventoryEntry, BlobListOptions, MeshBlobAdapter};
use net_sdk::deck::{AdminAuditRecord, DeckClient, FailureRecord, LogFilter, LogRecord};
use parking_lot::Mutex;

/// One observed nRPC call — caller, callee, method, latency,
/// status. The deck-side model is observation-only today; the
/// substrate's nRPC layer doesn't yet emit a call stream that
/// the SDK exposes, so the buffer is fed by mock injectors
/// (samples-logs) and, eventually, a real observer wire.
///
/// `#[allow(dead_code)]` because the struct is fully populated
/// only by the samples-logs injector; the default-feature
/// build sees the fields as never read.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct NrpcCall {
    /// Timestamp of the call in milliseconds (per the `_ms`
    /// suffix). NOTE(review): epoch / clock source is not
    /// visible in this file — confirm with the injector.
    pub ts_ms: u64,
    /// Identifier of the calling side. Presumably a mesh node
    /// id — TODO confirm.
    pub caller: u64,
    /// Identifier of the called side. Presumably a mesh node
    /// id — TODO confirm.
    pub callee: u64,
    /// Name of the invoked method.
    pub method: String,
    /// Observed latency in milliseconds (per the `_ms` suffix).
    pub latency_ms: u32,
    /// Outcome of the call; see [`NrpcStatus`].
    pub status: NrpcStatus,
    /// Request size in bytes. NOTE(review): whether this counts
    /// payload only or framing too is not visible here.
    pub request_bytes: u32,
    /// Response size in bytes. Same caveat as `request_bytes`.
    pub response_bytes: u32,
}

/// Outcome recorded on an [`NrpcCall`].
///
/// `#[allow(dead_code)]` for the same reason as `NrpcCall`:
/// nothing in the default-feature build constructs or reads
/// these variants.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum NrpcStatus {
    /// The call completed successfully.
    Ok,
    /// The call has been observed but has no result yet.
    /// NOTE(review): nothing in this file updates a record in
    /// place, so how an in-flight entry is later resolved is
    /// not visible here — confirm with the injector.
    InFlight,
    /// The call failed; the payload carries an error message.
    Error(String),
    /// The call did not complete in time.
    Timeout,
}

/// Capacity of the NRPC tail. Calls can be chatty (sub-ms
/// inference / mixer sub-bus updates / sensor frame requests),
/// so 5000 gives a few minutes of history at sustained traffic
/// before the ring rolls.
///
/// NOTE(review): the two statements above do not agree
/// arithmetically. 5000 entries span "a few minutes" only at
/// roughly 30 calls/s or less; at one call per millisecond the
/// ring rolls in about five seconds. Confirm the intended
/// traffic assumption.
///
/// Not referenced elsewhere in this module; presumably passed
/// as `cap` to `NrpcTail::new` by the caller — verify.
pub const NRPC_TAIL_CAP: usize = 5_000;

/// Shared, lock-protected ring of nRPC call records. Same
/// shape as `LogsTail`; render-side reads via `snapshot()` or
/// `snapshot_tail()`, injectors push via `push`.
/// `#[allow(dead_code)]` for the same default-feature reason
/// as `NrpcCall`.
///
/// `Clone` copies the `Arc`, so every clone reads and writes
/// the same underlying buffer.
#[allow(dead_code)]
#[derive(Clone)]
pub struct NrpcTail {
    /// The shared buffer: oldest record at the front, newest
    /// at the back.
    pub records: Arc<Mutex<VecDeque<NrpcCall>>>,
    /// Maximum number of records `push` keeps in `records`.
    /// Stored by value, so each clone carries its own copy.
    pub cap: usize,
}

#[allow(dead_code)]
impl NrpcTail {
    /// Create an empty tail that retains at most `cap` records.
    ///
    /// Only `min(cap, 1024)` slots are reserved up front; the
    /// deque grows on demand beyond that, up to `cap`.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(1024)))),
            cap,
        }
    }

    /// Clone the whole ring, oldest record first. The lock is
    /// held only for the duration of the copy.
    pub fn snapshot(&self) -> Vec<NrpcCall> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Clone the most recent `n` records (or the full ring if
    /// fewer exist), oldest first. Render reads at frame rate;
    /// cloning the whole 5000-entry ring per frame is wasteful
    /// when the visible window only needs `area.height - 3`
    /// rows.
    pub fn snapshot_tail(&self, n: usize) -> Vec<NrpcCall> {
        if n == 0 {
            // Skip taking the lock when nothing is requested.
            return Vec::new();
        }
        let g = self.records.lock();
        // Saturates to 0 when the ring holds fewer than `n`.
        let start = g.len().saturating_sub(n);
        g.range(start..).cloned().collect()
    }

    /// Append a record, evicting from the front so the ring
    /// never holds more than `cap` records.
    ///
    /// A `cap` of zero retains nothing. With the previous
    /// `len == cap` check, a zero cap never evicted after the
    /// first push and the ring grew without bound.
    pub fn push(&self, call: NrpcCall) {
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` in a loop (rather than a single `==` check) also
        // trims a deque that was over-filled through the public
        // `records` field. `cap >= 1` here, so every iteration
        // removes an element and the loop terminates.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(call);
    }
}

/// Capacity of the LOGS tail. 5000 records × ~256B per record
/// is ~1.3MB — fine for an operator session and deep enough
/// that scrolling back through an incident's worth of lines
/// rarely runs out.
///
/// NOTE(review): the ~256B figure is an estimate; the real
/// footprint of `LogRecord` (including any heap-owned fields)
/// is not visible from this file.
///
/// Not referenced elsewhere in this module; presumably passed
/// as `cap` to `LogsTail::new` by the caller — verify.
pub const LOGS_TAIL_CAP: usize = 5_000;

/// Capacity of the AUDIT tail. Admin commits are sparse
/// (operator-driven, never machine-generated), so 2000 covers
/// a long session with room to spare.
///
/// Not referenced elsewhere in this module; presumably passed
/// as `cap` to `AuditTail::new` by the caller — verify.
pub const AUDIT_TAIL_CAP: usize = 2_000;

/// Capacity of the FAILURES tail. Bursty on outages but rarely
/// chatty enough in steady state to need a deep history;
/// 2000 records is comfortable for incident triage.
///
/// Not referenced elsewhere in this module; presumably passed
/// as `cap` to `FailuresTail::new` by the caller — verify.
pub const FAILURES_TAIL_CAP: usize = 2_000;

/// Shared, lock-protected ring of log records. Owned by App;
/// the streaming task holds a clone of the Arc and pushes new
/// records as they arrive.
///
/// `Clone` copies the `Arc`, so every clone reads and writes
/// the same underlying buffer.
#[derive(Clone)]
pub struct LogsTail {
    /// The shared buffer: oldest record at the front, newest
    /// at the back.
    pub records: Arc<Mutex<VecDeque<LogRecord>>>,
    /// Maximum number of records `push` keeps in `records`.
    /// Stored by value, so each clone carries its own copy.
    pub cap: usize,
}

impl LogsTail {
    /// Create an empty tail that retains at most `cap` records.
    ///
    /// Only `min(cap, 1024)` slots are reserved up front; the
    /// deque grows on demand beyond that, up to `cap`.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(1024)))),
            cap,
        }
    }

    /// Copy the current buffer contents into a flat Vec for the
    /// render path, oldest record first. We allocate per redraw
    /// rather than returning a lock guard so the lock is held
    /// for microseconds, not the full render pass — and the
    /// render functions stay sync without leaking the lock type
    /// into their signatures.
    pub fn snapshot(&self) -> Vec<LogRecord> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Append a record, evicting from the front so the buffer
    /// never holds more than `cap` records.
    ///
    /// A `cap` of zero retains nothing. With the previous
    /// `len == cap` check, a zero cap never evicted after the
    /// first push and the buffer grew without bound.
    pub fn push(&self, record: LogRecord) {
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` in a loop (rather than a single `==` check) also
        // trims a deque that was over-filled through the public
        // `records` field. `cap >= 1` here, so every iteration
        // removes an element and the loop terminates.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(record);
    }
}

/// Start the background task that feeds the LOGS tail and
/// return its handle without waiting for any record.
///
/// The task subscribes with a fresh `LogFilter::new()` — the
/// filter is intentionally empty; App-side filters (level
/// threshold, substring search) are applied at render time so
/// operators can change them without re-subscribing.
///
/// Every `Ok` item is pushed onto `tail`. An `Err` item does
/// not end the task: it pauses for 50 ms and polls the same
/// stream again. The task finishes only when the stream yields
/// `None` (substrate shutdown).
pub fn spawn_logs_stream(deck: Arc<DeckClient>, tail: LogsTail) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Pause after a stream-level error. Such errors are
        // rare in practice (substrate gone); the pause keeps a
        // persistently failing stream from pegging a tokio
        // worker. If the substrate is truly gone the stream
        // ends on its own.
        let backoff = std::time::Duration::from_millis(50);
        let mut stream = deck.subscribe_logs(LogFilter::new());
        loop {
            match stream.next().await {
                Some(Ok(record)) => tail.push(record),
                Some(Err(_err)) => tokio::time::sleep(backoff).await,
                None => break,
            }
        }
    })
}

/// AUDIT tail mirror of `LogsTail`. Same locking + capacity
/// discipline; different record type. Fed by the audit stream.
///
/// `Clone` copies the `Arc`, so every clone reads and writes
/// the same underlying buffer.
#[derive(Clone)]
pub struct AuditTail {
    /// The shared buffer: oldest record at the front, newest
    /// at the back.
    pub records: Arc<Mutex<VecDeque<AdminAuditRecord>>>,
    /// Maximum number of records `push` keeps in `records`.
    /// Stored by value, so each clone carries its own copy.
    pub cap: usize,
}

impl AuditTail {
    /// Create an empty tail that retains at most `cap` records.
    ///
    /// Only `min(cap, 512)` slots are reserved up front; the
    /// deque grows on demand beyond that, up to `cap`.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(512)))),
            cap,
        }
    }

    /// Clone the whole buffer for the render path, oldest
    /// record first. The lock is held only for the duration of
    /// the copy.
    pub fn snapshot(&self) -> Vec<AdminAuditRecord> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Append a record, evicting from the front so the buffer
    /// never holds more than `cap` records.
    ///
    /// A `cap` of zero retains nothing. With the previous
    /// `len == cap` check, a zero cap never evicted after the
    /// first push and the buffer grew without bound.
    pub fn push(&self, record: AdminAuditRecord) {
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` in a loop (rather than a single `==` check) also
        // trims a deque that was over-filled through the public
        // `records` field. `cap >= 1` here, so every iteration
        // removes an element and the loop terminates.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(record);
    }
}

/// Start the background task that feeds the AUDIT tail and
/// return its handle.
///
/// The audit query is unfiltered — App-side toggles (`[f]`
/// ICE-only, `[/]` search) are applied at render time so
/// operators can change them without re-subscribing.
///
/// Every `Ok` item is pushed onto `tail`. An `Err` item does
/// not end the task: it pauses for 50 ms and polls the same
/// stream again. The task finishes only when the stream yields
/// `None`.
pub fn spawn_audit_stream(deck: Arc<DeckClient>, tail: AuditTail) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Pause after a stream-level error so a persistently
        // failing stream doesn't spin a tokio worker in a tight
        // retry loop. 50ms is short enough to feel instant once
        // the stream recovers; if the substrate is gone the
        // stream ends and the loop exits.
        let backoff = std::time::Duration::from_millis(50);
        let mut stream = deck.audit().stream();
        loop {
            match stream.next().await {
                Some(Ok(record)) => tail.push(record),
                Some(Err(_err)) => tokio::time::sleep(backoff).await,
                None => break,
            }
        }
    })
}

/// FAILURES tail mirror of `LogsTail` / `AuditTail`. Holds
/// executor failure records — dispatcher rejections,
/// constraint-violation drops, etc. Fed by the failure stream.
///
/// `Clone` copies the `Arc`, so every clone reads and writes
/// the same underlying buffer.
#[derive(Clone)]
pub struct FailuresTail {
    /// The shared buffer: oldest record at the front, newest
    /// at the back.
    pub records: Arc<Mutex<VecDeque<FailureRecord>>>,
    /// Maximum number of records `push` keeps in `records`.
    /// Stored by value, so each clone carries its own copy.
    pub cap: usize,
}

impl FailuresTail {
    /// Create an empty tail that retains at most `cap` records.
    ///
    /// Only `min(cap, 512)` slots are reserved up front; the
    /// deque grows on demand beyond that, up to `cap`.
    pub fn new(cap: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(VecDeque::with_capacity(cap.min(512)))),
            cap,
        }
    }

    /// Clone the whole buffer for the render path, oldest
    /// record first. The lock is held only for the duration of
    /// the copy.
    pub fn snapshot(&self) -> Vec<FailureRecord> {
        let g = self.records.lock();
        g.iter().cloned().collect()
    }

    /// Append a record, evicting from the front so the buffer
    /// never holds more than `cap` records.
    ///
    /// A `cap` of zero retains nothing. With the previous
    /// `len == cap` check, a zero cap never evicted after the
    /// first push and the buffer grew without bound.
    pub fn push(&self, record: FailureRecord) {
        if self.cap == 0 {
            return;
        }
        let mut g = self.records.lock();
        // `>=` in a loop (rather than a single `==` check) also
        // trims a deque that was over-filled through the public
        // `records` field. `cap >= 1` here, so every iteration
        // removes an element and the loop terminates.
        while g.len() >= self.cap {
            g.pop_front();
        }
        g.push_back(record);
    }
}

/// Start the background task that feeds the FAILURES tail and
/// return its handle.
///
/// The subscription uses `since_seq = 0`, which replays
/// everything still on the substrate's ring at subscribe time;
/// the SDK dedups via per-record `seq` so reconnects don't
/// double-emit.
///
/// Every `Ok` item is pushed onto `tail`. An `Err` item does
/// not end the task: it pauses for 50 ms and polls the same
/// stream again. The task finishes only when the stream yields
/// `None`.
pub fn spawn_failures_stream(
    deck: Arc<DeckClient>,
    tail: FailuresTail,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Pause after a stream-level error so a persistently
        // failing stream doesn't spin a tokio worker in a tight
        // retry loop. 50ms is short enough to feel instant once
        // the stream recovers; if the substrate is gone the
        // stream ends and the loop exits.
        let backoff = std::time::Duration::from_millis(50);
        let mut stream = deck.subscribe_failures(0);
        loop {
            match stream.next().await {
                Some(Ok(record)) => tail.push(record),
                Some(Err(_err)) => tokio::time::sleep(backoff).await,
                None => break,
            }
        }
    })
}

/// Cap on the BLOBS inventory snapshot the deck renders. The
/// adapter's full set may exceed this; the cap bounds memory
/// and render cost at the cost of truncating to
/// most-recently-touched entries first.
///
/// `spawn_blobs_poll` uses this value twice: as the `limit`
/// passed to each adapter's `list` call and as the length the
/// merged Vec is truncated to.
///
/// NOTE(review): the poller concatenates results in adapter
/// order and truncates without sorting, so "most-recently-
/// touched first" holds only if each adapter's `list` already
/// returns entries in that order, and a full first adapter
/// crowds out all later ones — confirm.
pub const BLOBS_TAIL_CAP: usize = 5_000;

/// BLOBS inventory tail. Distinct from the log / audit /
/// failure tails because the source isn't a `Stream` —
/// `BlobAdapter::list` is a one-shot query. The spawned task
/// re-polls on a fixed cadence and overwrites the cached
/// snapshot. App reads via [`Self::snapshot`].
///
/// `Clone` copies the `Arc`, so every clone reads and writes
/// the same cached inventory.
#[derive(Clone)]
pub struct BlobsTail {
    /// The most recently stored inventory. It is replaced
    /// wholesale rather than appended to, hence a plain `Vec`
    /// instead of a ring.
    pub records: Arc<Mutex<Vec<BlobInventoryEntry>>>,
}

impl BlobsTail {
    /// Build a tail whose cached inventory starts out empty.
    pub fn new() -> Self {
        let empty: Vec<BlobInventoryEntry> = Vec::new();
        Self {
            records: Arc::new(Mutex::new(empty)),
        }
    }

    /// Return an owned copy of the cached inventory for the
    /// render path. The lock is released as soon as the copy
    /// is made and is never held across an await.
    pub fn snapshot(&self) -> Vec<BlobInventoryEntry> {
        let cached = self.records.lock();
        cached.to_vec()
    }

    /// Swap the cached inventory for the result of a fresh
    /// poll. Called by [`spawn_blobs_poll`].
    fn replace(&self, entries: Vec<BlobInventoryEntry>) {
        let mut cached = self.records.lock();
        *cached = entries;
    }
}

impl Default for BlobsTail {
    fn default() -> Self {
        Self::new()
    }
}

/// Spawn the BLOBS inventory poller.
///
/// On every tick each adapter in `adapters` is asked for up to
/// `BLOBS_TAIL_CAP` entries. The results are concatenated in
/// adapter order, truncated to `BLOBS_TAIL_CAP`, and stored via
/// [`BlobsTail::replace`]. An adapter whose `list` call fails
/// contributes nothing to that tick's snapshot.
///
/// Adapter-level errors are sent to `toast_tx` only on
/// transitions (first failure, changed message, recovery) so a
/// persistently failing adapter doesn't spam the operator. A
/// failed send (receiver gone) is ignored.
///
/// `poll_interval` is clamped to at least 1 ms:
/// `tokio::time::interval` panics on a zero period, which
/// would otherwise kill the task.
///
/// The loop never exits on its own. Dropping the returned
/// `JoinHandle` only detaches the task; call `abort()` on the
/// handle to stop polling.
pub fn spawn_blobs_poll(
    adapters: Vec<Arc<MeshBlobAdapter>>,
    tail: BlobsTail,
    poll_interval: std::time::Duration,
    toast_tx: std::sync::mpsc::Sender<String>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Guard against `Duration::ZERO`, which makes
        // `tokio::time::interval` panic.
        let min_poll_interval = std::time::Duration::from_millis(1);
        let mut ticker = tokio::time::interval(poll_interval.max(min_poll_interval));
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // The query never changes between ticks, so build it
        // once instead of once per iteration.
        let opts = BlobListOptions {
            prefix_hex: None,
            limit: BLOBS_TAIL_CAP,
        };
        // Per-adapter last error state, keyed by adapter id:
        // absent = never polled, `None` = last poll succeeded,
        // `Some(msg)` = last poll failed with `msg`. Toasts
        // fire only on transitions so a stuck adapter logs
        // once, not every tick.
        let mut last_err: std::collections::HashMap<String, Option<String>> =
            std::collections::HashMap::new();
        loop {
            ticker.tick().await;
            let mut merged: Vec<BlobInventoryEntry> = Vec::new();
            for adapter in &adapters {
                let id = adapter.adapter_id().to_string();
                match adapter.list(&opts).await {
                    Ok(entries) => {
                        merged.extend(entries);
                        // Announce recovery only if the previous
                        // poll of this adapter had failed.
                        if matches!(last_err.get(&id), Some(Some(_))) {
                            let _ = toast_tx.send(format!("BLOBS poll: adapter {id} recovered"));
                        }
                        last_err.insert(id, None);
                    }
                    Err(err) => {
                        let msg = err.to_string();
                        // Stay quiet only when the previous poll
                        // failed with exactly this message.
                        let unchanged =
                            matches!(last_err.get(&id), Some(Some(prev)) if *prev == msg);
                        if !unchanged {
                            let _ = toast_tx
                                .send(format!("BLOBS poll: adapter {id} list error: {msg}"));
                        }
                        last_err.insert(id, Some(msg));
                    }
                }
            }
            // Cap the merged set so unions across many adapters
            // don't blow past the render budget. Each adapter's
            // result is already limited per call, so this only
            // bites on the union; `truncate` is a no-op when the
            // Vec is already short enough.
            merged.truncate(BLOBS_TAIL_CAP);
            tail.replace(merged);
        }
    })
}

/// Bundle of every render-side stream the App consumes. Each
/// tail is independently cloneable (cheap `Arc` clone) so the
/// bin can hold its own handle for the spawned drain tasks while
/// still moving this struct into `App::new`. Keeps the App
/// constructor under clippy's `too_many_arguments` ceiling.
pub struct Tails {
    /// Log records; fed by `spawn_logs_stream`.
    pub logs: LogsTail,
    /// Admin audit records; fed by `spawn_audit_stream`.
    pub audit: AuditTail,
    /// Executor failure records; fed by `spawn_failures_stream`.
    pub failures: FailuresTail,
    /// Blob inventory snapshot; refreshed by `spawn_blobs_poll`.
    pub blobs: BlobsTail,
    /// Observed nRPC calls; no task in this module feeds it
    /// (records arrive through `NrpcTail::push`).
    pub nrpc: NrpcTail,
}