Skip to main content

zero_tui/widgets/
live_stream.rs

1//! Live-stream pane — a compact tail of the engine's WS push
2//! surface. Sourced from [`crate::app::event_ring::EventRing`];
3//! rendered as one row per event in chronological order (newest
4//! at the bottom so the pane reads like a log).
5//!
6//! Design choices:
7//! - Self-labeling kind column so operators can scan by color
8//!   and prefix without waiting for a full decode.
9//! - `HH:MM:SS` timestamps only. Dates are visible in the status
10//!   bar; repeating them per-row eats horizontal budget without
11//!   helping.
12//! - Honest empty state (`(no engine events yet — waiting …)`)
13//!   when the ring is empty. A blank pane would look calm even
14//!   when the subscriber is offline, which is exactly the wrong
15//!   signal to send on a trading terminal.
16//! - Broadcast-lag markers render as `!! lag · dropped N events`
17//!   in alert color; silently losing events after a burst is a
18//!   worse failure mode than a loud row.
19
20use chrono::{DateTime, Timelike, Utc};
21use ratatui::buffer::Buffer;
22use ratatui::layout::Rect;
23use ratatui::style::{Modifier, Style};
24use ratatui::text::{Line, Span};
25use ratatui::widgets::Widget;
26use zero_engine_client::EngineEvent;
27
28use crate::app::event_ring::{EventRing, RingItem};
29use crate::theme::Theme;
30
31/// Header height (`" live stream "` row) subtracted from the
32/// pane's total area before computing how many event rows fit.
33const HEADER_ROWS: u16 = 1;
34
35#[derive(Debug)]
36pub struct LiveStreamPane<'a> {
37    pub ring: &'a EventRing,
38    pub theme: Theme,
39}
40
41impl Widget for LiveStreamPane<'_> {
42    fn render(self, area: Rect, buf: &mut Buffer) {
43        clear(area, buf);
44        if area.height == 0 || area.width == 0 {
45            return;
46        }
47
48        let header_line = Line::from(vec![
49            Span::styled(
50                " live stream ",
51                Style::default()
52                    .fg(self.theme.primary)
53                    .add_modifier(Modifier::BOLD),
54            ),
55            Span::styled(
56                format!("· {} buffered", self.ring.len()),
57                Style::default().fg(self.theme.metadata),
58            ),
59        ]);
60        header_line.render(subrect(area, 0, 1), buf);
61
62        // Usable rows for event lines — pane height minus the header.
63        let body_rows = area.height.saturating_sub(HEADER_ROWS);
64        if body_rows == 0 {
65            return;
66        }
67
68        if self.ring.is_empty() {
69            Line::from(vec![Span::styled(
70                " (no engine events yet — waiting for the subscriber)",
71                Style::default().fg(self.theme.metadata),
72            )])
73            .render(subrect(area, HEADER_ROWS, 1), buf);
74            return;
75        }
76
77        // Render the last `body_rows` items in chronological
78        // order (newest at the bottom of the pane). `tail`
79        // already clamps to the ring length so we never overshoot.
80        let take = usize::from(body_rows);
81        let items: Vec<&RingItem> = self.ring.tail(take).collect();
82        for (i, item) in items.iter().enumerate() {
83            let y = u16::try_from(usize::from(HEADER_ROWS) + i).unwrap_or(u16::MAX);
84            let r = subrect(area, y, 1);
85            if r.y >= area.bottom() {
86                break;
87            }
88            Line::from(format_item(item, self.theme)).render(r, buf);
89        }
90    }
91}
92
93/// Format one ring item as a line of `Span`s. Public so tests
94/// can assert text + color without threading a fake terminal.
95#[must_use]
96pub fn format_item(item: &RingItem, theme: Theme) -> Vec<Span<'static>> {
97    match item {
98        RingItem::Event(e) => format_event(e.ts, &e.event, theme),
99        RingItem::Lagged { ts, skipped } => format_lagged(*ts, *skipped, theme),
100    }
101}
102
103fn format_event(ts: DateTime<Utc>, evt: &EngineEvent, theme: Theme) -> Vec<Span<'static>> {
104    let (kind, detail, color) = kind_detail_color(evt, theme);
105    vec![
106        Span::styled(
107            format!(" {}", fmt_hms(ts)),
108            Style::default().fg(theme.metadata),
109        ),
110        Span::styled("  ", Style::default()),
111        Span::styled(format!("[{kind:<9}]"), Style::default().fg(color)),
112        Span::styled("  ", Style::default()),
113        Span::styled(detail, Style::default().fg(theme.primary)),
114    ]
115}
116
117fn format_lagged(ts: DateTime<Utc>, skipped: u64, theme: Theme) -> Vec<Span<'static>> {
118    vec![
119        Span::styled(
120            format!(" {}", fmt_hms(ts)),
121            Style::default().fg(theme.metadata),
122        ),
123        Span::styled("  ", Style::default()),
124        Span::styled(
125            "[!! lag   ]",
126            Style::default()
127                .fg(theme.alert)
128                .add_modifier(Modifier::BOLD),
129        ),
130        Span::styled("  ", Style::default()),
131        Span::styled(
132            format!("broadcast channel dropped {skipped} events"),
133            Style::default().fg(theme.alert),
134        ),
135    ]
136}
137
138fn kind_detail_color(
139    evt: &EngineEvent,
140    theme: Theme,
141) -> (&'static str, String, ratatui::style::Color) {
142    match evt {
143        EngineEvent::Heartbeat(_) => ("heartbeat", "engine alive".into(), theme.muted),
144        EngineEvent::Status(s) => {
145            let regime = s.regime().unwrap_or("—");
146            let conf = s
147                .engine_confidence()
148                .map_or("—".to_string(), |v| format!("{v:.0}"));
149            let eq = s.equity().map_or("—".to_string(), |v| format!("{v:.2}"));
150            (
151                "status",
152                format!("regime={regime}  conf={conf}  eq={eq}"),
153                theme.primary,
154            )
155        }
156        EngineEvent::Positions(p) => {
157            let n = p.items.len();
158            (
159                "positions",
160                format!("{n} open position{}", if n == 1 { "" } else { "s" }),
161                theme.primary,
162            )
163        }
164        EngineEvent::Risk(r) => {
165            let halted = r.is_halted();
166            let dd = r
167                .drawdown_pct
168                .map_or("—".to_string(), |v| format!("{v:.2}%"));
169            let loss = r
170                .daily_loss_pct()
171                .map(|v| format!("{v:.2}%"))
172                .or_else(|| r.daily_loss_usd.map(|v| format!("${v:.2}")))
173                .unwrap_or_else(|| "—".to_string());
174            let pnl = r
175                .daily_pnl_usd
176                .map_or("—".to_string(), |v| format!("{v:+.2}"));
177            let line = format!("dd={dd}  daily-loss={loss}  daily-pnl={pnl}");
178            // Halted risk is the one WS event that absolutely must
179            // grab the operator's eye even inside a fast-scrolling
180            // pane; paint its row in alert.
181            let color = if halted { theme.alert } else { theme.primary };
182            ("risk", line, color)
183        }
184        EngineEvent::Regime(r) => {
185            let name = r.regime.as_deref().unwrap_or("—");
186            let conf = r.confidence.map_or("—".to_string(), |v| format!("{v:.2}"));
187            ("regime", format!("{name}  conf={conf}"), theme.caution)
188        }
189        EngineEvent::Unknown { event, .. } => ("unknown", format!("event={event}"), theme.metadata),
190    }
191}
192
193fn fmt_hms(ts: DateTime<Utc>) -> String {
194    format!("{:02}:{:02}:{:02}", ts.hour(), ts.minute(), ts.second())
195}
196
197fn clear(area: Rect, buf: &mut Buffer) {
198    for y in area.top()..area.bottom() {
199        for x in area.left()..area.right() {
200            buf[(x, y)].set_char(' ');
201        }
202    }
203}
204
205fn subrect(area: Rect, y_offset: u16, height: u16) -> Rect {
206    Rect {
207        x: area.x,
208        y: area
209            .y
210            .saturating_add(y_offset)
211            .min(area.bottom().saturating_sub(1)),
212        width: area.width,
213        height: height.min(area.height.saturating_sub(y_offset)),
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use chrono::TimeZone;
221    use zero_engine_client::models::{Positions, Risk};
222
223    fn theme() -> Theme {
224        Theme::phosphor()
225    }
226
227    fn ts_at(h: u32, m: u32, s: u32) -> DateTime<Utc> {
228        Utc.with_ymd_and_hms(2030, 1, 2, h, m, s).unwrap()
229    }
230
231    fn render_line(item: &RingItem) -> String {
232        format_item(item, theme())
233            .iter()
234            .map(|s| s.content.as_ref())
235            .collect()
236    }
237
238    #[test]
239    fn heartbeat_row_shows_hms_and_muted_label() {
240        let item = RingItem::Event(super::super::super::app::event_ring::RingEntry {
241            ts: ts_at(9, 30, 15),
242            event: EngineEvent::Heartbeat(ts_at(9, 30, 15)),
243        });
244        let out = render_line(&item);
245        assert!(out.contains("09:30:15"), "ts: {out}");
246        assert!(out.contains("heartbeat"), "kind: {out}");
247        assert!(out.contains("engine alive"), "detail: {out}");
248    }
249
250    #[test]
251    fn risk_row_includes_percentages() {
252        let risk = Risk {
253            drawdown_pct: Some(1.25),
254            daily_loss_usd: Some(5.0),
255            peak_equity: Some(1000.0),
256            ..Default::default()
257        };
258        let item = RingItem::Event(super::super::super::app::event_ring::RingEntry {
259            ts: ts_at(9, 30, 15),
260            event: EngineEvent::Risk(Box::new(risk)),
261        });
262        let out = render_line(&item);
263        assert!(out.contains("dd=1.25%"), "{out}");
264        // daily_loss_usd=5 / peak_equity=1000 => 0.50%
265        assert!(out.contains("daily-loss=0.50%"), "{out}");
266    }
267
268    #[test]
269    fn positions_row_pluralizes() {
270        let p = Positions::default();
271        let item_none = RingItem::Event(super::super::super::app::event_ring::RingEntry {
272            ts: ts_at(0, 0, 0),
273            event: EngineEvent::Positions(Box::new(p)),
274        });
275        let out_none = render_line(&item_none);
276        assert!(out_none.contains("0 open positions"), "{out_none}");
277    }
278
279    #[test]
280    fn lagged_row_is_loud() {
281        let item = RingItem::Lagged {
282            ts: ts_at(12, 0, 0),
283            skipped: 42,
284        };
285        let out = render_line(&item);
286        assert!(out.contains("!! lag"), "prefix: {out}");
287        assert!(out.contains("42 events"), "count: {out}");
288    }
289
290    #[test]
291    fn unknown_event_falls_back_to_event_kind_label() {
292        let item = RingItem::Event(super::super::super::app::event_ring::RingEntry {
293            ts: ts_at(1, 2, 3),
294            event: EngineEvent::Unknown {
295                event: "scar_fired".into(),
296                ts: ts_at(1, 2, 3),
297                data: serde_json::Value::default(),
298            },
299        });
300        let out = render_line(&item);
301        assert!(out.contains("unknown"), "{out}");
302        assert!(out.contains("scar_fired"), "{out}");
303    }
304
305    #[test]
306    fn pane_renders_empty_state_when_ring_has_nothing() {
307        let ring = EventRing::new();
308        let mut buf = Buffer::empty(Rect::new(0, 0, 80, 6));
309        LiveStreamPane {
310            ring: &ring,
311            theme: theme(),
312        }
313        .render(Rect::new(0, 0, 80, 6), &mut buf);
314        let row1 = row_string(&buf, 1);
315        assert!(
316            row1.contains("no engine events yet"),
317            "honest empty state: {row1}"
318        );
319    }
320
321    #[test]
322    fn pane_renders_last_rows_only_when_ring_exceeds_height() {
323        let mut ring = EventRing::with_capacity(20);
324        for s in 0..10 {
325            ring.push_event(EngineEvent::Heartbeat(ts_at(9, 0, s)));
326        }
327        // Pane with 1 header + 3 body rows should show the last 3 heartbeats.
328        let rect = Rect::new(0, 0, 80, 4);
329        let mut buf = Buffer::empty(rect);
330        LiveStreamPane {
331            ring: &ring,
332            theme: theme(),
333        }
334        .render(rect, &mut buf);
335        let body = (1..4).map(|y| row_string(&buf, y)).collect::<Vec<_>>();
336        assert!(body[0].contains("09:00:07"), "{body:?}");
337        assert!(body[1].contains("09:00:08"), "{body:?}");
338        assert!(body[2].contains("09:00:09"), "{body:?}");
339    }
340
341    #[test]
342    fn zero_height_pane_does_not_panic() {
343        let ring = EventRing::new();
344        let rect = Rect::new(0, 0, 80, 0);
345        let mut buf = Buffer::empty(Rect::new(0, 0, 80, 2));
346        LiveStreamPane {
347            ring: &ring,
348            theme: theme(),
349        }
350        .render(rect, &mut buf);
351    }
352
353    fn row_string(buf: &Buffer, y: u16) -> String {
354        (0..buf.area.width)
355            .map(|x| buf[(x, y)].symbol())
356            .collect::<String>()
357    }
358}