Skip to main content

zero_tui/app/
event_ring.rs

1//! Bounded ring of `EngineEvent`s for the live-stream pane.
2//!
3//! The TUI taps the `WsSubscriber`'s broadcast channel and
4//! appends every received event here. The ring is a FIFO with a
5//! hard capacity — old events drop off the back so the pane
6//! always fits in a fixed-size allocation even on a noisy day.
7//!
8//! A second item variant, [`RingItem::Lagged`], is pushed when
9//! the broadcast receiver reports `RecvError::Lagged`: rather
10//! than silently lose events (the conversation log is the
11//! operator's record of what the engine was saying), we drop a
12//! loud marker telling them the stream skipped N messages. This
13//! is a deliberate honesty choice — a calm empty pane after a
14//! burst of dropped events would be the worst possible failure
15//! mode for a trading terminal.
16
17use std::collections::VecDeque;
18
19use chrono::{DateTime, Utc};
20use zero_engine_client::EngineEvent;
21
22/// Default ring capacity. Chosen large enough to hold several
23/// minutes of mixed traffic at typical engine emission rates
24/// (`heartbeat` every 5 s + occasional status / positions /
25/// risk updates) while keeping the allocation under a few KB.
26pub const DEFAULT_CAPACITY: usize = 200;
27
28/// One slot in the ring. Either a real decoded engine event or
29/// a synthetic "lagged" marker recording how many events the
30/// broadcast channel skipped.
31#[derive(Debug, Clone)]
32pub enum RingItem {
33    Event(RingEntry),
34    Lagged {
35        ts: DateTime<Utc>,
36        /// Number of events the broadcast receiver reported as
37        /// dropped.
38        skipped: u64,
39    },
40}
41
42/// A decoded engine event plus the wall-clock timestamp we
43/// observed it at (falling back to "now" when the event does
44/// not carry one). Storing the ts separately keeps render
45/// formatting stateless — the pane never has to dig into the
46/// typed payload to display when it arrived.
47#[derive(Debug, Clone)]
48pub struct RingEntry {
49    pub ts: DateTime<Utc>,
50    pub event: EngineEvent,
51}
52
53/// Bounded FIFO of ring items.
54#[derive(Debug)]
55pub struct EventRing {
56    items: VecDeque<RingItem>,
57    cap: usize,
58}
59
60impl Default for EventRing {
61    fn default() -> Self {
62        Self::with_capacity(DEFAULT_CAPACITY)
63    }
64}
65
66impl EventRing {
67    #[must_use]
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    /// Build a ring with a specific capacity. `cap == 0` is
73    /// accepted and produces a drop-everything ring (tests may
74    /// use this to confirm the bound is enforced at zero too);
75    /// production callers should use [`DEFAULT_CAPACITY`].
76    #[must_use]
77    pub fn with_capacity(cap: usize) -> Self {
78        Self {
79            items: VecDeque::with_capacity(cap),
80            cap,
81        }
82    }
83
84    /// Maximum number of items this ring retains.
85    #[must_use]
86    pub const fn capacity(&self) -> usize {
87        self.cap
88    }
89
90    /// Current number of retained items.
91    #[must_use]
92    pub fn len(&self) -> usize {
93        self.items.len()
94    }
95
96    /// `true` when no items have been recorded yet.
97    #[must_use]
98    pub fn is_empty(&self) -> bool {
99        self.items.is_empty()
100    }
101
102    /// Append a typed engine event. Drops the oldest item when
103    /// at capacity so the push is O(1) and never allocates past
104    /// `cap`.
105    pub fn push_event(&mut self, event: EngineEvent) {
106        let ts = event_timestamp(&event).unwrap_or_else(Utc::now);
107        self.push(RingItem::Event(RingEntry { ts, event }));
108    }
109
110    /// Like [`Self::push_event`] with an explicit timestamp.
111    /// Exists so snapshot tests can drive deterministic content
112    /// for variants (`Status`, `Positions`, `Risk`, `Regime`)
113    /// that do not carry a ts inside their typed payload. Never
114    /// called by the runtime event loop — production writes
115    /// always go through [`Self::push_event`] which takes the
116    /// wall clock.
117    pub fn push_event_at(&mut self, event: EngineEvent, ts: DateTime<Utc>) {
118        self.push(RingItem::Event(RingEntry { ts, event }));
119    }
120
121    /// Append a "broadcast channel lagged" marker. Reserved for
122    /// the event loop's `RecvError::Lagged` branch.
123    pub fn push_lagged(&mut self, skipped: u64) {
124        self.push(RingItem::Lagged {
125            ts: Utc::now(),
126            skipped,
127        });
128    }
129
130    /// Deterministic sibling of [`Self::push_lagged`] for tests.
131    pub fn push_lagged_at(&mut self, skipped: u64, ts: DateTime<Utc>) {
132        self.push(RingItem::Lagged { ts, skipped });
133    }
134
135    fn push(&mut self, item: RingItem) {
136        if self.cap == 0 {
137            // Zero-cap ring accepts no items; this is the
138            // "discard everything" test config. Dropping here
139            // keeps the invariant `len() <= cap` across pushes.
140            return;
141        }
142        while self.items.len() >= self.cap {
143            self.items.pop_front();
144        }
145        self.items.push_back(item);
146    }
147
148    /// Iterate newest-last (i.e. chronological order). Kept
149    /// generic so callers can chain `.rev()` for newest-first
150    /// rendering without paying for a materialized `Vec`.
151    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &RingItem> {
152        self.items.iter()
153    }
154
155    /// Iterate the last `n` items in chronological order. When
156    /// `n` exceeds [`Self::len`] the iterator yields every item
157    /// — same as [`Self::iter`]. Used by the pane to render
158    /// "last N rows" without allocating.
159    pub fn tail(&self, n: usize) -> impl DoubleEndedIterator<Item = &RingItem> {
160        let start = self.items.len().saturating_sub(n);
161        self.items.iter().skip(start)
162    }
163}
164
165/// Extract the timestamp from an engine event when the variant
166/// carries one directly. Other variants fall through to "now"
167/// in [`EventRing::push_event`] — the subscriber's frame-receive
168/// time is a good enough proxy for the operator's pane.
169fn event_timestamp(evt: &EngineEvent) -> Option<DateTime<Utc>> {
170    match evt {
171        EngineEvent::Heartbeat(ts) | EngineEvent::Unknown { ts, .. } => Some(*ts),
172        EngineEvent::Status(_)
173        | EngineEvent::Positions(_)
174        | EngineEvent::Risk(_)
175        | EngineEvent::Regime(_) => None,
176    }
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    fn heartbeat_at(sec: i64) -> EngineEvent {
184        EngineEvent::Heartbeat(DateTime::<Utc>::from_timestamp(sec, 0).unwrap())
185    }
186
187    #[test]
188    fn fresh_ring_is_empty_and_reports_capacity() {
189        let r = EventRing::new();
190        assert!(r.is_empty());
191        assert_eq!(r.len(), 0);
192        assert_eq!(r.capacity(), DEFAULT_CAPACITY);
193    }
194
195    #[test]
196    fn push_increments_len_up_to_capacity() {
197        let mut r = EventRing::with_capacity(4);
198        for s in 0..3 {
199            r.push_event(heartbeat_at(s));
200        }
201        assert_eq!(r.len(), 3);
202        assert!(!r.is_empty());
203    }
204
205    #[test]
206    fn push_beyond_capacity_drops_oldest() {
207        let mut r = EventRing::with_capacity(3);
208        for s in 1..=5 {
209            r.push_event(heartbeat_at(s));
210        }
211        assert_eq!(r.len(), 3);
212        let tss: Vec<i64> = r
213            .iter()
214            .map(|it| match it {
215                RingItem::Event(e) => e.ts.timestamp(),
216                RingItem::Lagged { ts, .. } => ts.timestamp(),
217            })
218            .collect();
219        // Oldest two (1, 2) must have been discarded; 3..=5 remain.
220        assert_eq!(tss, vec![3, 4, 5]);
221    }
222
223    #[test]
224    fn zero_capacity_accepts_no_items() {
225        let mut r = EventRing::with_capacity(0);
226        r.push_event(heartbeat_at(1));
227        r.push_lagged(42);
228        assert!(r.is_empty());
229        assert_eq!(r.capacity(), 0);
230    }
231
232    #[test]
233    fn push_lagged_records_marker_with_count() {
234        let mut r = EventRing::with_capacity(2);
235        r.push_event(heartbeat_at(1));
236        r.push_lagged(7);
237        let items: Vec<_> = r.iter().collect();
238        assert_eq!(items.len(), 2);
239        assert!(matches!(items[1], RingItem::Lagged { skipped: 7, .. }));
240    }
241
242    #[test]
243    fn tail_clamps_to_ring_len() {
244        let mut r = EventRing::with_capacity(10);
245        for s in 1..=3 {
246            r.push_event(heartbeat_at(s));
247        }
248        // Asking for more than we have returns all three.
249        assert_eq!(r.tail(100).count(), 3);
250        // Asking for the last 2 returns the two newest.
251        let last2: Vec<i64> = r
252            .tail(2)
253            .map(|it| match it {
254                RingItem::Event(e) => e.ts.timestamp(),
255                RingItem::Lagged { ts, .. } => ts.timestamp(),
256            })
257            .collect();
258        assert_eq!(last2, vec![2, 3]);
259    }
260
261    #[test]
262    fn push_event_without_direct_ts_falls_back_to_now() {
263        // Status / positions / risk / regime don't carry a ts in
264        // the enum; the ring must still record them with a
265        // sensible timestamp. We don't assert the exact value
266        // (wall clock), just that the entry landed.
267        let mut r = EventRing::with_capacity(2);
268        r.push_event(EngineEvent::Heartbeat(Utc::now())); // trivial variant
269        // Unknown carries ts, exercised elsewhere. The real
270        // fallback path is hit by Status/Positions/Risk/Regime,
271        // which require a full decoded payload — not worth
272        // constructing here. The key invariant (push does not
273        // panic, timestamp is finite) is what we care about.
274        assert_eq!(r.len(), 1);
275    }
276}