Skip to main content

zero_engine_client/
state.rs

1//! `EngineState` — a CLI-side mirror of live engine state.
2//!
3//! Populated by the WS subscriber (push) and opportunistically by
4//! HTTP polls (pull). Every field that the TUI renders is a
5//! `Stat<T>`, so staleness is visible and the renderer can refuse
6//! to display values that age out.
7//!
8//! The mirror is read by widgets via a cheap clone of its `Arc`,
9//! and it is updated only from within the subscriber's task. This
10//! keeps the lock scope small and lets ratatui's render loop
11//! acquire `read()` handles without contention.
12//!
13//! See ADR-003 (state model) and spec §3.5 ("engine is the source
14//! of truth").
15
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use parking_lot::RwLock;
20use zero_operator_state::Snapshot as OperatorSnapshot;
21
22use crate::models::{LiveCockpit, Positions, Regime, Risk, V2Status};
23use crate::stat::{Source, Stat};
24
25/// A connection health roll-up for the status bar.
26#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27pub struct ConnectionHealth {
28    /// Whether the WS is currently up.
29    pub ws_connected: bool,
30    /// Failed-connect attempts since the last successful connect.
31    /// Drives the "reconnecting (attempt N)…" status-bar hint and
32    /// resets to zero the moment the socket is up again.
33    pub reconnect_count: u32,
34    /// Lifetime reconnect attempts. Never resets. Used by
35    /// observability tooling (and tests) that need to see that a
36    /// reconnect happened even after the subscriber recovered.
37    pub total_attempts: u64,
38    /// When the most recent reconnect attempt began. Useful for
39    /// rendering "reconnecting in 2s…" hints.
40    pub last_reconnect_at: Option<DateTime<Utc>>,
41}
42
43/// Live mirror of the engine fields the CLI renders.
44///
45/// Every field is an `Option<Stat<T>>` — `None` means "we have not
46/// seen this yet," not "the engine doesn't have one." The renderer
47/// distinguishes these states: unseen → placeholder, stale → amber
48/// staleness badge, fresh → normal render.
49#[derive(Debug, Default, Clone)]
50pub struct EngineState {
51    pub status: Option<Stat<V2Status>>,
52    pub positions: Option<Stat<Positions>>,
53    pub risk: Option<Stat<Risk>>,
54    pub regime: Option<Stat<Regime>>,
55    /// Consolidated live-readiness cockpit from `GET /live/cockpit`.
56    /// This is read-only operator state: preflight, immune,
57    /// reconciliation, certification, heartbeat, and local receipt
58    /// counts. It intentionally lives in the same mirror as status,
59    /// positions, risk, and regime so the full-screen TUI cockpit can
60    /// render without issuing network calls from the draw path.
61    pub live_cockpit: Option<Stat<LiveCockpit>>,
62    /// Operator behavioral state snapshot mirrored from the engine's
63    /// `GET /operator/state` endpoint (ADR-016). The classifier runs
64    /// on the engine host; the CLI only renders. `None` means "never
65    /// observed" — the status bar falls back to `?`.
66    pub operator_state: Option<Stat<OperatorSnapshot>>,
67    /// Most recent heartbeat timestamp from the engine's bus poller.
68    /// Drives the freshness clock — if this stops advancing, the
69    /// status bar goes amber then red per `feed` thresholds.
70    pub last_heartbeat: Option<DateTime<Utc>>,
71    pub connection: ConnectionHealth,
72}
73
74impl EngineState {
75    #[must_use]
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Returns an `Arc<RwLock<Self>>` ready for sharing between the
81    /// WS subscriber task and the TUI render loop.
82    #[must_use]
83    pub fn shared() -> Arc<RwLock<Self>> {
84        Arc::new(RwLock::new(Self::new()))
85    }
86
87    /// Freshness of the feed, in seconds, measured against `now`.
88    /// Returns `None` if no heartbeat has been observed.
89    #[must_use]
90    pub fn feed_age_seconds(&self, now: DateTime<Utc>) -> Option<i64> {
91        self.last_heartbeat
92            .map(|hb| now.signed_duration_since(hb).num_seconds())
93    }
94
95    /// Hyperliquid per-minute rate the engine is reporting, if any.
96    ///
97    /// Returns `None` when `/v2/status` has not been observed yet
98    /// or when the observed payload did not carry an `hl_rate`
99    /// field (older engine). The caller renders `hl:?` in both
100    /// cases — from the operator's seat, "we haven't heard"
101    /// and "the engine is not telling" are indistinguishable, so
102    /// the same metadata-color placeholder is the honest render.
103    #[must_use]
104    pub fn hl_rate_snapshot(&self) -> Option<crate::models::HlRate> {
105        self.status.as_ref().and_then(|s| s.value.hl_rate)
106    }
107
108    /// Record that a WS connection was established. Resets the
109    /// reconnect counter to zero — the one we just made is behind us.
110    pub fn on_ws_connected(&mut self) {
111        self.connection.ws_connected = true;
112        self.connection.reconnect_count = 0;
113    }
114
115    /// Record that the WS dropped. Does not reset `reconnect_count`
116    /// — that ticks up on each attempt until one succeeds.
117    pub fn on_ws_disconnected(&mut self) {
118        self.connection.ws_connected = false;
119    }
120
121    /// Record the start of a new reconnect attempt.
122    pub fn on_reconnect_attempt(&mut self, at: DateTime<Utc>) {
123        self.connection.reconnect_count = self.connection.reconnect_count.saturating_add(1);
124        self.connection.total_attempts = self.connection.total_attempts.saturating_add(1);
125        self.connection.last_reconnect_at = Some(at);
126    }
127
128    /// Merge a status event into the mirror. `source` distinguishes
129    /// push updates (WS) from pull backfill (HTTP). The status bar
130    /// does not currently branch on `Source`, but the persisted
131    /// `Stat<T>` carries it so observability tooling + future
132    /// lints can tell which surface wrote each value.
133    ///
134    /// `last_heartbeat` is only bumped on WS updates — HTTP
135    /// backfill is opportunistic and must not paper over a stalled
136    /// bus feed.
137    pub fn apply_status(&mut self, status: V2Status, as_of: DateTime<Utc>, source: Source) {
138        self.status = Some(Stat::new(status, source).with_as_of(as_of));
139        if matches!(source, Source::Ws) {
140            self.last_heartbeat = Some(as_of);
141        }
142    }
143
144    /// Merge a positions event into the mirror. See
145    /// [`Self::apply_status`] for the source + heartbeat contract.
146    pub fn apply_positions(&mut self, positions: Positions, as_of: DateTime<Utc>, source: Source) {
147        self.positions = Some(Stat::new(positions, source).with_as_of(as_of));
148        if matches!(source, Source::Ws) {
149            self.last_heartbeat = Some(as_of);
150        }
151    }
152
153    /// Merge a risk event into the mirror. See
154    /// [`Self::apply_status`] for the source + heartbeat contract.
155    pub fn apply_risk(&mut self, risk: Risk, as_of: DateTime<Utc>, source: Source) {
156        self.risk = Some(Stat::new(risk, source).with_as_of(as_of));
157        if matches!(source, Source::Ws) {
158            self.last_heartbeat = Some(as_of);
159        }
160    }
161
162    /// Merge a regime event into the mirror. See
163    /// [`Self::apply_status`] for the source + heartbeat contract.
164    pub fn apply_regime(&mut self, regime: Regime, as_of: DateTime<Utc>, source: Source) {
165        self.regime = Some(Stat::new(regime, source).with_as_of(as_of));
166        if matches!(source, Source::Ws) {
167            self.last_heartbeat = Some(as_of);
168        }
169    }
170
171    /// Merge a live-cockpit packet fetched from the engine.
172    ///
173    /// Cockpit is an HTTP-only surface today and must not bump
174    /// `last_heartbeat`; a healthy cockpit response does not prove
175    /// the market/event feed is alive.
176    pub fn apply_live_cockpit(&mut self, cockpit: LiveCockpit, as_of: DateTime<Utc>) {
177        self.live_cockpit = Some(Stat::new(cockpit, Source::Http).with_as_of(as_of));
178    }
179
180    /// Record a heartbeat with no payload. Bumps the freshness clock
181    /// so the status-bar feed indicator stays green even during
182    /// quiet market periods.
183    pub fn apply_heartbeat(&mut self, at: DateTime<Utc>) {
184        self.last_heartbeat = Some(at);
185    }
186
187    /// Merge an operator-state snapshot fetched from the engine.
188    ///
189    /// The snapshot carries its own `as_of` but we also wrap it in a
190    /// [`Stat`] so the staleness clock stays uniform across every
191    /// mirror field. Note: this does **not** touch `last_heartbeat`
192    /// — operator-state lives on a slower poll cadence than the bus
193    /// feed, and we do not want it masking a stalled market feed.
194    pub fn apply_operator_state(&mut self, snap: OperatorSnapshot, as_of: DateTime<Utc>) {
195        self.operator_state = Some(Stat::new(snap, Source::Http).with_as_of(as_of));
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202
203    #[test]
204    fn reconnect_counter_ticks_and_resets() {
205        let mut s = EngineState::new();
206        assert_eq!(s.connection.reconnect_count, 0);
207        assert_eq!(s.connection.total_attempts, 0);
208        assert!(!s.connection.ws_connected);
209
210        s.on_reconnect_attempt(Utc::now());
211        s.on_reconnect_attempt(Utc::now());
212        assert_eq!(s.connection.reconnect_count, 2);
213        assert_eq!(s.connection.total_attempts, 2);
214        assert!(!s.connection.ws_connected);
215
216        s.on_ws_connected();
217        assert_eq!(s.connection.reconnect_count, 0);
218        assert_eq!(
219            s.connection.total_attempts, 2,
220            "total_attempts must survive successful reconnect"
221        );
222        assert!(s.connection.ws_connected);
223
224        s.on_ws_disconnected();
225        assert!(!s.connection.ws_connected);
226    }
227
228    #[test]
229    fn heartbeat_updates_feed_age() {
230        let mut s = EngineState::new();
231        let t0 = Utc::now();
232        s.apply_heartbeat(t0);
233        let later = t0 + chrono::Duration::seconds(3);
234        assert_eq!(s.feed_age_seconds(later), Some(3));
235    }
236}