zero_tui/app/event_ring.rs
1//! Bounded ring of `EngineEvent`s for the live-stream pane.
2//!
//! The TUI taps the `WsSubscriber`'s broadcast channel and
//! appends every received event here. The ring is a FIFO with a
//! hard capacity — once it is full, each new item evicts the
//! oldest one, so the pane always fits in a fixed-size
//! allocation even on a noisy day.
7//!
8//! A second item variant, [`RingItem::Lagged`], is pushed when
9//! the broadcast receiver reports `RecvError::Lagged`: rather
10//! than silently lose events (the conversation log is the
11//! operator's record of what the engine was saying), we drop a
12//! loud marker telling them the stream skipped N messages. This
13//! is a deliberate honesty choice — a calm empty pane after a
14//! burst of dropped events would be the worst possible failure
15//! mode for a trading terminal.
16
17use std::collections::VecDeque;
18
19use chrono::{DateTime, Utc};
20use zero_engine_client::EngineEvent;
21
/// Default ring capacity, used by [`EventRing::default`] and
/// therefore by [`EventRing::new`].
///
/// Chosen large enough to hold several minutes of mixed traffic
/// at typical engine emission rates (`heartbeat` every 5 s +
/// occasional status / positions / risk updates) while keeping
/// the allocation under a few KB.
// NOTE(review): the "few KB" figure depends on
// `size_of::<RingItem>()`, which is not visible from this file —
// confirm before relying on it.
pub const DEFAULT_CAPACITY: usize = 200;
27
/// One slot in the ring. Either a real decoded engine event or
/// a synthetic "lagged" marker recording how many events the
/// broadcast channel skipped.
#[derive(Debug, Clone)]
pub enum RingItem {
    /// A decoded engine event paired with the timestamp stored
    /// for it.
    Event(RingEntry),
    /// Marker recording that the broadcast receiver fell behind
    /// and some events never reached the ring.
    Lagged {
        /// Timestamp attached to the marker: the wall clock at
        /// push time for [`EventRing::push_lagged`], or the
        /// caller-supplied value for [`EventRing::push_lagged_at`].
        ts: DateTime<Utc>,
        /// Number of events the broadcast receiver reported as
        /// dropped.
        skipped: u64,
    },
}
41
/// A decoded engine event plus the timestamp stored alongside
/// it. Keeping the ts separate keeps render formatting
/// stateless — the pane never has to dig into the typed payload
/// to display when the event arrived.
#[derive(Debug, Clone)]
pub struct RingEntry {
    /// Timestamp recorded for the entry. [`EventRing::push_event`]
    /// uses the timestamp carried by the event when there is one
    /// and the wall clock ("now") otherwise;
    /// [`EventRing::push_event_at`] stores the caller-supplied
    /// value verbatim.
    pub ts: DateTime<Utc>,
    /// The decoded engine event itself.
    pub event: EngineEvent,
}
52
/// Bounded FIFO of ring items.
///
/// New items are appended at the back; once `cap` items are
/// held, each push first evicts from the front, so `len()` never
/// exceeds `cap`.
#[derive(Debug)]
pub struct EventRing {
    /// Retained items, oldest at the front and newest at the back.
    items: VecDeque<RingItem>,
    /// Maximum number of items retained. Zero means every push is
    /// discarded.
    cap: usize,
}
59
60impl Default for EventRing {
61 fn default() -> Self {
62 Self::with_capacity(DEFAULT_CAPACITY)
63 }
64}
65
66impl EventRing {
67 #[must_use]
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 /// Build a ring with a specific capacity. `cap == 0` is
73 /// accepted and produces a drop-everything ring (tests may
74 /// use this to confirm the bound is enforced at zero too);
75 /// production callers should use [`DEFAULT_CAPACITY`].
76 #[must_use]
77 pub fn with_capacity(cap: usize) -> Self {
78 Self {
79 items: VecDeque::with_capacity(cap),
80 cap,
81 }
82 }
83
84 /// Maximum number of items this ring retains.
85 #[must_use]
86 pub const fn capacity(&self) -> usize {
87 self.cap
88 }
89
90 /// Current number of retained items.
91 #[must_use]
92 pub fn len(&self) -> usize {
93 self.items.len()
94 }
95
96 /// `true` when no items have been recorded yet.
97 #[must_use]
98 pub fn is_empty(&self) -> bool {
99 self.items.is_empty()
100 }
101
102 /// Append a typed engine event. Drops the oldest item when
103 /// at capacity so the push is O(1) and never allocates past
104 /// `cap`.
105 pub fn push_event(&mut self, event: EngineEvent) {
106 let ts = event_timestamp(&event).unwrap_or_else(Utc::now);
107 self.push(RingItem::Event(RingEntry { ts, event }));
108 }
109
110 /// Like [`Self::push_event`] with an explicit timestamp.
111 /// Exists so snapshot tests can drive deterministic content
112 /// for variants (`Status`, `Positions`, `Risk`, `Regime`)
113 /// that do not carry a ts inside their typed payload. Never
114 /// called by the runtime event loop — production writes
115 /// always go through [`Self::push_event`] which takes the
116 /// wall clock.
117 pub fn push_event_at(&mut self, event: EngineEvent, ts: DateTime<Utc>) {
118 self.push(RingItem::Event(RingEntry { ts, event }));
119 }
120
121 /// Append a "broadcast channel lagged" marker. Reserved for
122 /// the event loop's `RecvError::Lagged` branch.
123 pub fn push_lagged(&mut self, skipped: u64) {
124 self.push(RingItem::Lagged {
125 ts: Utc::now(),
126 skipped,
127 });
128 }
129
130 /// Deterministic sibling of [`Self::push_lagged`] for tests.
131 pub fn push_lagged_at(&mut self, skipped: u64, ts: DateTime<Utc>) {
132 self.push(RingItem::Lagged { ts, skipped });
133 }
134
135 fn push(&mut self, item: RingItem) {
136 if self.cap == 0 {
137 // Zero-cap ring accepts no items; this is the
138 // "discard everything" test config. Dropping here
139 // keeps the invariant `len() <= cap` across pushes.
140 return;
141 }
142 while self.items.len() >= self.cap {
143 self.items.pop_front();
144 }
145 self.items.push_back(item);
146 }
147
148 /// Iterate newest-last (i.e. chronological order). Kept
149 /// generic so callers can chain `.rev()` for newest-first
150 /// rendering without paying for a materialized `Vec`.
151 pub fn iter(&self) -> impl DoubleEndedIterator<Item = &RingItem> {
152 self.items.iter()
153 }
154
155 /// Iterate the last `n` items in chronological order. When
156 /// `n` exceeds [`Self::len`] the iterator yields every item
157 /// — same as [`Self::iter`]. Used by the pane to render
158 /// "last N rows" without allocating.
159 pub fn tail(&self, n: usize) -> impl DoubleEndedIterator<Item = &RingItem> {
160 let start = self.items.len().saturating_sub(n);
161 self.items.iter().skip(start)
162 }
163}
164
165/// Extract the timestamp from an engine event when the variant
166/// carries one directly. Other variants fall through to "now"
167/// in [`EventRing::push_event`] — the subscriber's frame-receive
168/// time is a good enough proxy for the operator's pane.
169fn event_timestamp(evt: &EngineEvent) -> Option<DateTime<Utc>> {
170 match evt {
171 EngineEvent::Heartbeat(ts) | EngineEvent::Unknown { ts, .. } => Some(*ts),
172 EngineEvent::Status(_)
173 | EngineEvent::Positions(_)
174 | EngineEvent::Risk(_)
175 | EngineEvent::Regime(_) => None,
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// UTC instant `sec` whole seconds after the Unix epoch.
    fn utc(sec: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(sec, 0).unwrap()
    }

    /// Heartbeat event carrying the timestamp `utc(sec)`.
    fn heartbeat_at(sec: i64) -> EngineEvent {
        EngineEvent::Heartbeat(utc(sec))
    }

    /// Stored timestamp of a ring item, in epoch seconds.
    fn secs(item: &RingItem) -> i64 {
        match item {
            RingItem::Event(entry) => entry.ts.timestamp(),
            RingItem::Lagged { ts, .. } => ts.timestamp(),
        }
    }

    #[test]
    fn fresh_ring_is_empty_and_reports_capacity() {
        let r = EventRing::new();
        assert!(r.is_empty());
        assert_eq!(r.len(), 0);
        assert_eq!(r.capacity(), DEFAULT_CAPACITY);
    }

    #[test]
    fn push_increments_len_up_to_capacity() {
        let mut r = EventRing::with_capacity(4);
        for s in 0..3 {
            r.push_event(heartbeat_at(s));
        }
        assert_eq!(r.len(), 3);
        assert!(!r.is_empty());
    }

    #[test]
    fn push_beyond_capacity_drops_oldest() {
        let mut r = EventRing::with_capacity(3);
        for s in 1..=5 {
            r.push_event(heartbeat_at(s));
        }
        assert_eq!(r.len(), 3);
        let tss: Vec<i64> = r.iter().map(secs).collect();
        // Oldest two (1, 2) must have been discarded; 3..=5 remain.
        assert_eq!(tss, vec![3, 4, 5]);
    }

    #[test]
    fn zero_capacity_accepts_no_items() {
        let mut r = EventRing::with_capacity(0);
        r.push_event(heartbeat_at(1));
        r.push_lagged(42);
        assert!(r.is_empty());
        assert_eq!(r.capacity(), 0);
    }

    #[test]
    fn push_lagged_records_marker_with_count() {
        let mut r = EventRing::with_capacity(2);
        r.push_event(heartbeat_at(1));
        r.push_lagged(7);
        let items: Vec<_> = r.iter().collect();
        assert_eq!(items.len(), 2);
        assert!(matches!(items[1], RingItem::Lagged { skipped: 7, .. }));
    }

    #[test]
    fn push_lagged_at_uses_explicit_timestamp() {
        let mut r = EventRing::with_capacity(2);
        r.push_lagged_at(3, utc(50));
        let items: Vec<_> = r.iter().collect();
        assert_eq!(items.len(), 1);
        assert!(matches!(items[0], RingItem::Lagged { skipped: 3, .. }));
        assert_eq!(secs(items[0]), 50);
    }

    #[test]
    fn lagged_marker_counts_toward_capacity() {
        // Markers occupy a slot like any event, so a marker pushed
        // into a full ring evicts the oldest item.
        let mut r = EventRing::with_capacity(2);
        r.push_event(heartbeat_at(1));
        r.push_event(heartbeat_at(2));
        r.push_lagged_at(9, utc(3));
        assert_eq!(r.len(), 2);
        let tss: Vec<i64> = r.iter().map(secs).collect();
        assert_eq!(tss, vec![2, 3]);
        assert!(matches!(r.iter().next_back(), Some(RingItem::Lagged { skipped: 9, .. })));
    }

    #[test]
    fn tail_clamps_to_ring_len() {
        let mut r = EventRing::with_capacity(10);
        for s in 1..=3 {
            r.push_event(heartbeat_at(s));
        }
        // Asking for more than we have returns all three.
        assert_eq!(r.tail(100).count(), 3);
        // Asking for nothing yields nothing.
        assert_eq!(r.tail(0).count(), 0);
        // Asking for the last 2 returns the two newest.
        let last2: Vec<i64> = r.tail(2).map(secs).collect();
        assert_eq!(last2, vec![2, 3]);
    }

    #[test]
    fn push_event_keeps_timestamp_carried_by_event() {
        // Heartbeat carries its own ts, so the ring must store
        // that value rather than the wall clock.
        //
        // NOTE: the wall-clock fallback in `push_event` is only
        // reached by Status / Positions / Risk / Regime, which
        // need a full decoded payload to construct; that path is
        // not exercised in this module.
        let mut r = EventRing::with_capacity(2);
        r.push_event(heartbeat_at(42));
        assert_eq!(r.len(), 1);
        let tss: Vec<i64> = r.iter().map(secs).collect();
        assert_eq!(tss, vec![42]);
    }

    #[test]
    fn push_event_at_uses_explicit_timestamp() {
        // The explicit ts wins even when the event carries one.
        let mut r = EventRing::with_capacity(2);
        r.push_event_at(heartbeat_at(1), utc(99));
        let tss: Vec<i64> = r.iter().map(secs).collect();
        assert_eq!(tss, vec![99]);
    }
}
276}