zero_engine_client/state.rs
1//! `EngineState` — a CLI-side mirror of live engine state.
2//!
3//! Populated by the WS subscriber (push) and opportunistically by
4//! HTTP polls (pull). Every field that the TUI renders is a
5//! `Stat<T>`, so staleness is visible and the renderer can refuse
6//! to display values that age out.
7//!
8//! The mirror is read by widgets via a cheap clone of its `Arc`,
9//! and it is updated only from within the subscriber's task. This
10//! keeps the lock scope small and lets ratatui's render loop
11//! acquire `read()` handles without contention.
12//!
13//! See ADR-003 (state model) and spec §3.5 ("engine is the source
14//! of truth").
15
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use parking_lot::RwLock;
20use zero_operator_state::Snapshot as OperatorSnapshot;
21
22use crate::models::{LiveCockpit, Positions, Regime, Risk, V2Status};
23use crate::stat::{Source, Stat};
24
25/// A connection health roll-up for the status bar.
26#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
27pub struct ConnectionHealth {
28 /// Whether the WS is currently up.
29 pub ws_connected: bool,
30 /// Failed-connect attempts since the last successful connect.
31 /// Drives the "reconnecting (attempt N)…" status-bar hint and
32 /// resets to zero the moment the socket is up again.
33 pub reconnect_count: u32,
34 /// Lifetime reconnect attempts. Never resets. Used by
35 /// observability tooling (and tests) that need to see that a
36 /// reconnect happened even after the subscriber recovered.
37 pub total_attempts: u64,
38 /// When the most recent reconnect attempt began. Useful for
39 /// rendering "reconnecting in 2s…" hints.
40 pub last_reconnect_at: Option<DateTime<Utc>>,
41}
42
43/// Live mirror of the engine fields the CLI renders.
44///
45/// Every field is an `Option<Stat<T>>` — `None` means "we have not
46/// seen this yet," not "the engine doesn't have one." The renderer
47/// distinguishes these states: unseen → placeholder, stale → amber
48/// staleness badge, fresh → normal render.
49#[derive(Debug, Default, Clone)]
50pub struct EngineState {
51 pub status: Option<Stat<V2Status>>,
52 pub positions: Option<Stat<Positions>>,
53 pub risk: Option<Stat<Risk>>,
54 pub regime: Option<Stat<Regime>>,
55 /// Consolidated live-readiness cockpit from `GET /live/cockpit`.
56 /// This is read-only operator state: preflight, immune,
57 /// reconciliation, certification, heartbeat, and local receipt
58 /// counts. It intentionally lives in the same mirror as status,
59 /// positions, risk, and regime so the full-screen TUI cockpit can
60 /// render without issuing network calls from the draw path.
61 pub live_cockpit: Option<Stat<LiveCockpit>>,
62 /// Operator behavioral state snapshot mirrored from the engine's
63 /// `GET /operator/state` endpoint (ADR-016). The classifier runs
64 /// on the engine host; the CLI only renders. `None` means "never
65 /// observed" — the status bar falls back to `?`.
66 pub operator_state: Option<Stat<OperatorSnapshot>>,
67 /// Most recent heartbeat timestamp from the engine's bus poller.
68 /// Drives the freshness clock — if this stops advancing, the
69 /// status bar goes amber then red per `feed` thresholds.
70 pub last_heartbeat: Option<DateTime<Utc>>,
71 pub connection: ConnectionHealth,
72}
73
74impl EngineState {
75 #[must_use]
76 pub fn new() -> Self {
77 Self::default()
78 }
79
80 /// Returns an `Arc<RwLock<Self>>` ready for sharing between the
81 /// WS subscriber task and the TUI render loop.
82 #[must_use]
83 pub fn shared() -> Arc<RwLock<Self>> {
84 Arc::new(RwLock::new(Self::new()))
85 }
86
87 /// Freshness of the feed, in seconds, measured against `now`.
88 /// Returns `None` if no heartbeat has been observed.
89 #[must_use]
90 pub fn feed_age_seconds(&self, now: DateTime<Utc>) -> Option<i64> {
91 self.last_heartbeat
92 .map(|hb| now.signed_duration_since(hb).num_seconds())
93 }
94
95 /// Hyperliquid per-minute rate the engine is reporting, if any.
96 ///
97 /// Returns `None` when `/v2/status` has not been observed yet
98 /// or when the observed payload did not carry an `hl_rate`
99 /// field (older engine). The caller renders `hl:?` in both
100 /// cases — from the operator's seat, "we haven't heard"
101 /// and "the engine is not telling" are indistinguishable, so
102 /// the same metadata-color placeholder is the honest render.
103 #[must_use]
104 pub fn hl_rate_snapshot(&self) -> Option<crate::models::HlRate> {
105 self.status.as_ref().and_then(|s| s.value.hl_rate)
106 }
107
108 /// Record that a WS connection was established. Resets the
109 /// reconnect counter to zero — the one we just made is behind us.
110 pub fn on_ws_connected(&mut self) {
111 self.connection.ws_connected = true;
112 self.connection.reconnect_count = 0;
113 }
114
115 /// Record that the WS dropped. Does not reset `reconnect_count`
116 /// — that ticks up on each attempt until one succeeds.
117 pub fn on_ws_disconnected(&mut self) {
118 self.connection.ws_connected = false;
119 }
120
121 /// Record the start of a new reconnect attempt.
122 pub fn on_reconnect_attempt(&mut self, at: DateTime<Utc>) {
123 self.connection.reconnect_count = self.connection.reconnect_count.saturating_add(1);
124 self.connection.total_attempts = self.connection.total_attempts.saturating_add(1);
125 self.connection.last_reconnect_at = Some(at);
126 }
127
128 /// Merge a status event into the mirror. `source` distinguishes
129 /// push updates (WS) from pull backfill (HTTP). The status bar
130 /// does not currently branch on `Source`, but the persisted
131 /// `Stat<T>` carries it so observability tooling + future
132 /// lints can tell which surface wrote each value.
133 ///
134 /// `last_heartbeat` is only bumped on WS updates — HTTP
135 /// backfill is opportunistic and must not paper over a stalled
136 /// bus feed.
137 pub fn apply_status(&mut self, status: V2Status, as_of: DateTime<Utc>, source: Source) {
138 self.status = Some(Stat::new(status, source).with_as_of(as_of));
139 if matches!(source, Source::Ws) {
140 self.last_heartbeat = Some(as_of);
141 }
142 }
143
144 /// Merge a positions event into the mirror. See
145 /// [`Self::apply_status`] for the source + heartbeat contract.
146 pub fn apply_positions(&mut self, positions: Positions, as_of: DateTime<Utc>, source: Source) {
147 self.positions = Some(Stat::new(positions, source).with_as_of(as_of));
148 if matches!(source, Source::Ws) {
149 self.last_heartbeat = Some(as_of);
150 }
151 }
152
153 /// Merge a risk event into the mirror. See
154 /// [`Self::apply_status`] for the source + heartbeat contract.
155 pub fn apply_risk(&mut self, risk: Risk, as_of: DateTime<Utc>, source: Source) {
156 self.risk = Some(Stat::new(risk, source).with_as_of(as_of));
157 if matches!(source, Source::Ws) {
158 self.last_heartbeat = Some(as_of);
159 }
160 }
161
162 /// Merge a regime event into the mirror. See
163 /// [`Self::apply_status`] for the source + heartbeat contract.
164 pub fn apply_regime(&mut self, regime: Regime, as_of: DateTime<Utc>, source: Source) {
165 self.regime = Some(Stat::new(regime, source).with_as_of(as_of));
166 if matches!(source, Source::Ws) {
167 self.last_heartbeat = Some(as_of);
168 }
169 }
170
171 /// Merge a live-cockpit packet fetched from the engine.
172 ///
173 /// Cockpit is an HTTP-only surface today and must not bump
174 /// `last_heartbeat`; a healthy cockpit response does not prove
175 /// the market/event feed is alive.
176 pub fn apply_live_cockpit(&mut self, cockpit: LiveCockpit, as_of: DateTime<Utc>) {
177 self.live_cockpit = Some(Stat::new(cockpit, Source::Http).with_as_of(as_of));
178 }
179
180 /// Record a heartbeat with no payload. Bumps the freshness clock
181 /// so the status-bar feed indicator stays green even during
182 /// quiet market periods.
183 pub fn apply_heartbeat(&mut self, at: DateTime<Utc>) {
184 self.last_heartbeat = Some(at);
185 }
186
187 /// Merge an operator-state snapshot fetched from the engine.
188 ///
189 /// The snapshot carries its own `as_of` but we also wrap it in a
190 /// [`Stat`] so the staleness clock stays uniform across every
191 /// mirror field. Note: this does **not** touch `last_heartbeat`
192 /// — operator-state lives on a slower poll cadence than the bus
193 /// feed, and we do not want it masking a stalled market feed.
194 pub fn apply_operator_state(&mut self, snap: OperatorSnapshot, as_of: DateTime<Utc>) {
195 self.operator_state = Some(Stat::new(snap, Source::Http).with_as_of(as_of));
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202
203 #[test]
204 fn reconnect_counter_ticks_and_resets() {
205 let mut s = EngineState::new();
206 assert_eq!(s.connection.reconnect_count, 0);
207 assert_eq!(s.connection.total_attempts, 0);
208 assert!(!s.connection.ws_connected);
209
210 s.on_reconnect_attempt(Utc::now());
211 s.on_reconnect_attempt(Utc::now());
212 assert_eq!(s.connection.reconnect_count, 2);
213 assert_eq!(s.connection.total_attempts, 2);
214 assert!(!s.connection.ws_connected);
215
216 s.on_ws_connected();
217 assert_eq!(s.connection.reconnect_count, 0);
218 assert_eq!(
219 s.connection.total_attempts, 2,
220 "total_attempts must survive successful reconnect"
221 );
222 assert!(s.connection.ws_connected);
223
224 s.on_ws_disconnected();
225 assert!(!s.connection.ws_connected);
226 }
227
228 #[test]
229 fn heartbeat_updates_feed_age() {
230 let mut s = EngineState::new();
231 let t0 = Utc::now();
232 s.apply_heartbeat(t0);
233 let later = t0 + chrono::Duration::seconds(3);
234 assert_eq!(s.feed_age_seconds(later), Some(3));
235 }
236}