1use chrono::{DateTime, Timelike, Utc};
21use ratatui::buffer::Buffer;
22use ratatui::layout::Rect;
23use ratatui::style::{Modifier, Style};
24use ratatui::text::{Line, Span};
25use ratatui::widgets::Widget;
26use zero_engine_client::EngineEvent;
27
28use crate::app::event_ring::{EventRing, RingItem};
29use crate::theme::Theme;
30
31const HEADER_ROWS: u16 = 1;
34
35#[derive(Debug)]
36pub struct LiveStreamPane<'a> {
37 pub ring: &'a EventRing,
38 pub theme: Theme,
39}
40
41impl Widget for LiveStreamPane<'_> {
42 fn render(self, area: Rect, buf: &mut Buffer) {
43 clear(area, buf);
44 if area.height == 0 || area.width == 0 {
45 return;
46 }
47
48 let header_line = Line::from(vec![
49 Span::styled(
50 " live stream ",
51 Style::default()
52 .fg(self.theme.primary)
53 .add_modifier(Modifier::BOLD),
54 ),
55 Span::styled(
56 format!("· {} buffered", self.ring.len()),
57 Style::default().fg(self.theme.metadata),
58 ),
59 ]);
60 header_line.render(subrect(area, 0, 1), buf);
61
62 let body_rows = area.height.saturating_sub(HEADER_ROWS);
64 if body_rows == 0 {
65 return;
66 }
67
68 if self.ring.is_empty() {
69 Line::from(vec![Span::styled(
70 " (no engine events yet — waiting for the subscriber)",
71 Style::default().fg(self.theme.metadata),
72 )])
73 .render(subrect(area, HEADER_ROWS, 1), buf);
74 return;
75 }
76
77 let take = usize::from(body_rows);
81 let items: Vec<&RingItem> = self.ring.tail(take).collect();
82 for (i, item) in items.iter().enumerate() {
83 let y = u16::try_from(usize::from(HEADER_ROWS) + i).unwrap_or(u16::MAX);
84 let r = subrect(area, y, 1);
85 if r.y >= area.bottom() {
86 break;
87 }
88 Line::from(format_item(item, self.theme)).render(r, buf);
89 }
90 }
91}
92
93#[must_use]
96pub fn format_item(item: &RingItem, theme: Theme) -> Vec<Span<'static>> {
97 match item {
98 RingItem::Event(e) => format_event(e.ts, &e.event, theme),
99 RingItem::Lagged { ts, skipped } => format_lagged(*ts, *skipped, theme),
100 }
101}
102
103fn format_event(ts: DateTime<Utc>, evt: &EngineEvent, theme: Theme) -> Vec<Span<'static>> {
104 let (kind, detail, color) = kind_detail_color(evt, theme);
105 vec![
106 Span::styled(
107 format!(" {}", fmt_hms(ts)),
108 Style::default().fg(theme.metadata),
109 ),
110 Span::styled(" ", Style::default()),
111 Span::styled(format!("[{kind:<9}]"), Style::default().fg(color)),
112 Span::styled(" ", Style::default()),
113 Span::styled(detail, Style::default().fg(theme.primary)),
114 ]
115}
116
117fn format_lagged(ts: DateTime<Utc>, skipped: u64, theme: Theme) -> Vec<Span<'static>> {
118 vec![
119 Span::styled(
120 format!(" {}", fmt_hms(ts)),
121 Style::default().fg(theme.metadata),
122 ),
123 Span::styled(" ", Style::default()),
124 Span::styled(
125 "[!! lag ]",
126 Style::default()
127 .fg(theme.alert)
128 .add_modifier(Modifier::BOLD),
129 ),
130 Span::styled(" ", Style::default()),
131 Span::styled(
132 format!("broadcast channel dropped {skipped} events"),
133 Style::default().fg(theme.alert),
134 ),
135 ]
136}
137
138fn kind_detail_color(
139 evt: &EngineEvent,
140 theme: Theme,
141) -> (&'static str, String, ratatui::style::Color) {
142 match evt {
143 EngineEvent::Heartbeat(_) => ("heartbeat", "engine alive".into(), theme.muted),
144 EngineEvent::Status(s) => {
145 let regime = s.regime().unwrap_or("—");
146 let conf = s
147 .engine_confidence()
148 .map_or("—".to_string(), |v| format!("{v:.0}"));
149 let eq = s.equity().map_or("—".to_string(), |v| format!("{v:.2}"));
150 (
151 "status",
152 format!("regime={regime} conf={conf} eq={eq}"),
153 theme.primary,
154 )
155 }
156 EngineEvent::Positions(p) => {
157 let n = p.items.len();
158 (
159 "positions",
160 format!("{n} open position{}", if n == 1 { "" } else { "s" }),
161 theme.primary,
162 )
163 }
164 EngineEvent::Risk(r) => {
165 let halted = r.is_halted();
166 let dd = r
167 .drawdown_pct
168 .map_or("—".to_string(), |v| format!("{v:.2}%"));
169 let loss = r
170 .daily_loss_pct()
171 .map(|v| format!("{v:.2}%"))
172 .or_else(|| r.daily_loss_usd.map(|v| format!("${v:.2}")))
173 .unwrap_or_else(|| "—".to_string());
174 let pnl = r
175 .daily_pnl_usd
176 .map_or("—".to_string(), |v| format!("{v:+.2}"));
177 let line = format!("dd={dd} daily-loss={loss} daily-pnl={pnl}");
178 let color = if halted { theme.alert } else { theme.primary };
182 ("risk", line, color)
183 }
184 EngineEvent::Regime(r) => {
185 let name = r.regime.as_deref().unwrap_or("—");
186 let conf = r.confidence.map_or("—".to_string(), |v| format!("{v:.2}"));
187 ("regime", format!("{name} conf={conf}"), theme.caution)
188 }
189 EngineEvent::Unknown { event, .. } => ("unknown", format!("event={event}"), theme.metadata),
190 }
191}
192
193fn fmt_hms(ts: DateTime<Utc>) -> String {
194 format!("{:02}:{:02}:{:02}", ts.hour(), ts.minute(), ts.second())
195}
196
197fn clear(area: Rect, buf: &mut Buffer) {
198 for y in area.top()..area.bottom() {
199 for x in area.left()..area.right() {
200 buf[(x, y)].set_char(' ');
201 }
202 }
203}
204
205fn subrect(area: Rect, y_offset: u16, height: u16) -> Rect {
206 Rect {
207 x: area.x,
208 y: area
209 .y
210 .saturating_add(y_offset)
211 .min(area.bottom().saturating_sub(1)),
212 width: area.width,
213 height: height.min(area.height.saturating_sub(y_offset)),
214 }
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220 use chrono::TimeZone;
221 use zero_engine_client::models::{Positions, Risk};
222
223 fn theme() -> Theme {
224 Theme::phosphor()
225 }
226
227 fn ts_at(h: u32, m: u32, s: u32) -> DateTime<Utc> {
228 Utc.with_ymd_and_hms(2030, 1, 2, h, m, s).unwrap()
229 }
230
231 fn render_line(item: &RingItem) -> String {
232 format_item(item, theme())
233 .iter()
234 .map(|s| s.content.as_ref())
235 .collect()
236 }
237
238 #[test]
239 fn heartbeat_row_shows_hms_and_muted_label() {
240 let item = RingItem::Event(super::super::super::app::event_ring::RingEntry {
241 ts: ts_at(9, 30, 15),
242 event: EngineEvent::Heartbeat(ts_at(9, 30, 15)),
243 });
244 let out = render_line(&item);
245 assert!(out.contains("09:30:15"), "ts: {out}");
246 assert!(out.contains("heartbeat"), "kind: {out}");
247 assert!(out.contains("engine alive"), "detail: {out}");
248 }
249
250 #[test]
251 fn risk_row_includes_percentages() {
252 let risk = Risk {
253 drawdown_pct: Some(1.25),
254 daily_loss_usd: Some(5.0),
255 peak_equity: Some(1000.0),
256 ..Default::default()
257 };
258 let item = RingItem::Event(super::super::super::app::event_ring::RingEntry {
259 ts: ts_at(9, 30, 15),
260 event: EngineEvent::Risk(Box::new(risk)),
261 });
262 let out = render_line(&item);
263 assert!(out.contains("dd=1.25%"), "{out}");
264 assert!(out.contains("daily-loss=0.50%"), "{out}");
266 }
267
268 #[test]
269 fn positions_row_pluralizes() {
270 let p = Positions::default();
271 let item_none = RingItem::Event(super::super::super::app::event_ring::RingEntry {
272 ts: ts_at(0, 0, 0),
273 event: EngineEvent::Positions(Box::new(p)),
274 });
275 let out_none = render_line(&item_none);
276 assert!(out_none.contains("0 open positions"), "{out_none}");
277 }
278
279 #[test]
280 fn lagged_row_is_loud() {
281 let item = RingItem::Lagged {
282 ts: ts_at(12, 0, 0),
283 skipped: 42,
284 };
285 let out = render_line(&item);
286 assert!(out.contains("!! lag"), "prefix: {out}");
287 assert!(out.contains("42 events"), "count: {out}");
288 }
289
290 #[test]
291 fn unknown_event_falls_back_to_event_kind_label() {
292 let item = RingItem::Event(super::super::super::app::event_ring::RingEntry {
293 ts: ts_at(1, 2, 3),
294 event: EngineEvent::Unknown {
295 event: "scar_fired".into(),
296 ts: ts_at(1, 2, 3),
297 data: serde_json::Value::default(),
298 },
299 });
300 let out = render_line(&item);
301 assert!(out.contains("unknown"), "{out}");
302 assert!(out.contains("scar_fired"), "{out}");
303 }
304
305 #[test]
306 fn pane_renders_empty_state_when_ring_has_nothing() {
307 let ring = EventRing::new();
308 let mut buf = Buffer::empty(Rect::new(0, 0, 80, 6));
309 LiveStreamPane {
310 ring: &ring,
311 theme: theme(),
312 }
313 .render(Rect::new(0, 0, 80, 6), &mut buf);
314 let row1 = row_string(&buf, 1);
315 assert!(
316 row1.contains("no engine events yet"),
317 "honest empty state: {row1}"
318 );
319 }
320
321 #[test]
322 fn pane_renders_last_rows_only_when_ring_exceeds_height() {
323 let mut ring = EventRing::with_capacity(20);
324 for s in 0..10 {
325 ring.push_event(EngineEvent::Heartbeat(ts_at(9, 0, s)));
326 }
327 let rect = Rect::new(0, 0, 80, 4);
329 let mut buf = Buffer::empty(rect);
330 LiveStreamPane {
331 ring: &ring,
332 theme: theme(),
333 }
334 .render(rect, &mut buf);
335 let body = (1..4).map(|y| row_string(&buf, y)).collect::<Vec<_>>();
336 assert!(body[0].contains("09:00:07"), "{body:?}");
337 assert!(body[1].contains("09:00:08"), "{body:?}");
338 assert!(body[2].contains("09:00:09"), "{body:?}");
339 }
340
341 #[test]
342 fn zero_height_pane_does_not_panic() {
343 let ring = EventRing::new();
344 let rect = Rect::new(0, 0, 80, 0);
345 let mut buf = Buffer::empty(Rect::new(0, 0, 80, 2));
346 LiveStreamPane {
347 ring: &ring,
348 theme: theme(),
349 }
350 .render(rect, &mut buf);
351 }
352
353 fn row_string(buf: &Buffer, y: u16) -> String {
354 (0..buf.area.width)
355 .map(|x| buf[(x, y)].symbol())
356 .collect::<String>()
357 }
358}