Skip to main content

zero_engine_client/
poll.rs

//! Background pollers for engine endpoints that either have no
//! dedicated push channel or whose push channel needs an HTTP backstop.
//!
//! Two pollers live here:
//!
//! * [`OperatorStatePoller`] fetches `GET /operator/state`. The
//!   classifier runs on the engine host (ADR-016) and the endpoint is
//!   the CLI's only window into the current behavioral label. The
//!   poller runs at a deliberately relaxed cadence — operator state
//!   changes on human time scales, and over-polling an engine we do
//!   not own is rude.
//! * [`EngineStatePoller`] backfills the WS-fed mirror fields over
//!   HTTP while the WS stream is reconnecting or quiet.
//!
//! Request failures (including 401/404) are logged and the task keeps
//! trying; they never stop a poller. Neither `spawn` is fallible —
//! errors that can be detected up front, such as a bad base URL, are
//! expected to surface when the caller constructs the [`HttpClient`].
//! The mirror's `Stat<Snapshot>` freshness metadata is what the widget
//! uses to distinguish "never polled" from "stale" from "fresh" — see
//! `statusbar.rs` for the render side.
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use chrono::Utc;
22use parking_lot::RwLock;
23use tokio::sync::watch;
24use tokio::task::JoinHandle;
25
26use crate::http::{HttpClient, HttpError};
27use crate::stat::Source;
28use crate::state::EngineState;
29
/// Cadence of the `/operator/state` fetch. Five seconds tracks the
/// Addendum A §2 target tick for the classifier: quick enough that a
/// mislabel is corrected before the operator can notice it, relaxed
/// enough not to behave like a naive tight polling loop.
pub const POLL_INTERVAL: Duration = Duration::from_secs(5);

/// Pause after a failed `/operator/state` fetch before the poller waits
/// for its next tick. The pause is fixed on purpose — there is **no**
/// escalating backoff. An engine outage is already reported as
/// `engine:DOWN` by the WS subscriber, and letting the operator-state
/// segment decay to `ops:?` after 30 s is the honest rendering (see
/// `OPERATOR_STATE_STALE_AFTER` in the widget).
pub const POLL_BACKOFF: Duration = Duration::from_secs(5);
42
43/// Handle to a running operator-state poller.
44///
45/// Dropping the handle does **not** stop the task; callers must
46/// explicitly `.shutdown().await` for a clean exit. Matches the
47/// `WsSubscriber` handle's semantics.
48#[derive(Debug)]
49pub struct OperatorStatePoller {
50    shutdown_tx: watch::Sender<bool>,
51    task: JoinHandle<()>,
52}
53
54impl OperatorStatePoller {
55    /// Spawn a poller that writes into `state` every
56    /// [`POLL_INTERVAL`]. Returns immediately — the first fetch
57    /// happens in the background task on the first tick.
58    #[must_use]
59    pub fn spawn(http: HttpClient, state: Arc<RwLock<EngineState>>) -> Self {
60        Self::spawn_with_interval(http, state, POLL_INTERVAL)
61    }
62
63    /// Like [`Self::spawn`] with a custom interval — used by tests
64    /// that want to exercise multiple polls in a short wall-clock
65    /// window.
66    #[must_use]
67    pub fn spawn_with_interval(
68        http: HttpClient,
69        state: Arc<RwLock<EngineState>>,
70        interval: Duration,
71    ) -> Self {
72        let (shutdown_tx, shutdown_rx) = watch::channel(false);
73        let task = tokio::spawn(run_loop(http, state, interval, shutdown_rx));
74        Self { shutdown_tx, task }
75    }
76
77    /// Signal the task to exit and wait for it.
78    ///
79    /// # Errors
80    /// Returns the tokio join error verbatim on panic or cancel;
81    /// clean shutdown always returns `Ok`.
82    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
83        let _ = self.shutdown_tx.send(true);
84        self.task.await
85    }
86}
87
88async fn run_loop(
89    http: HttpClient,
90    state: Arc<RwLock<EngineState>>,
91    interval: Duration,
92    mut shutdown: watch::Receiver<bool>,
93) {
94    let mut ticker = tokio::time::interval(interval);
95    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
96
97    loop {
98        tokio::select! {
99            _ = shutdown.changed() => {
100                if *shutdown.borrow() {
101                    break;
102                }
103            }
104            _ = ticker.tick() => {
105                match http.operator_state().await {
106                    Ok(snap) => {
107                        state.write().apply_operator_state(snap, Utc::now());
108                    }
109                    Err(e) => {
110                        // 404 means the engine is older than ADR-016
111                        // and does not expose the endpoint. Log once
112                        // at info, then back off to a warn on repeat
113                        // failures.
114                        match &e {
115                            HttpError::NotFound { .. } => {
116                                tracing::debug!("operator-state endpoint not served; continuing");
117                            }
118                            _ => {
119                                tracing::warn!(err = %e, "operator-state poll failed");
120                            }
121                        }
122                        tokio::select! {
123                            () = tokio::time::sleep(POLL_BACKOFF) => {}
124                            _ = shutdown.changed() => break,
125                        }
126                    }
127                }
128            }
129        }
130    }
131
132    tracing::debug!("operator-state poller exited");
133}
134
135// ─── HTTP backfill for the core mirror fields ──────────────────────
136//
// The WS subscriber is the primary source for `status` / `positions`
// / `risk` / `regime`. The backfill poller below is a defense-in-depth
// layer: when the WS is reconnecting (or the engine temporarily stops
// emitting an event type), it keeps the mirror populated via cheap
// HTTP polls. The backfill poller also keeps the read-only
// `live_cockpit` mirror warm for the full-screen operator cockpit.
// Writes are tagged `Source::Http` so the rendered `Stat<T>` makes the
// provenance honest.
145//
146// The cadence is deliberately slow (30 s by default) because:
147//
148// * Push is the main path; this is backfill, not the source of truth.
149// * The CLI should never flood an engine it does not own.
150// * `last_heartbeat` is only bumped by WS updates (see
151//   `EngineState::apply_*`), so this poller cannot paper over a
152//   stalled feed — the status bar still goes amber → red if the bus
153//   stops, even while backfill keeps writing.
154
/// Period of one backfill sweep across every tracked endpoint. Thirty
/// seconds is gentle on an engine we do not own, yet short enough that
/// a WS dropout does not leave the operator looking at a stale mirror
/// for long. Callers that need a different cadence (mostly tests) go
/// through [`EngineStatePoller::spawn_with_interval`].
pub const BACKFILL_INTERVAL: Duration = Duration::from_secs(30);

/// Pause after a sweep in which any endpoint failed. Deliberately the
/// same fixed value as [`POLL_BACKOFF`], with no escalation. When the
/// engine is down the WS-side indicator already tells the operator, so
/// slowing the backfill further would not tell them anything new.
pub const BACKFILL_BACKOFF: Duration = Duration::from_secs(5);
166
167/// Handle to a running HTTP-backfill poller for the core mirror
168/// fields.
169///
170/// Same lifecycle semantics as [`OperatorStatePoller`]: dropping
171/// the handle does **not** stop the task; call
172/// [`EngineStatePoller::shutdown`] for a clean exit.
173#[derive(Debug)]
174pub struct EngineStatePoller {
175    shutdown_tx: watch::Sender<bool>,
176    task: JoinHandle<()>,
177}
178
179impl EngineStatePoller {
180    /// Spawn a backfill poller for `status` / `positions` / `risk`
181    /// / `regime` / `live_cockpit`. Returns immediately — the first poll happens in
182    /// the background task on the first tick.
183    #[must_use]
184    pub fn spawn(http: HttpClient, state: Arc<RwLock<EngineState>>) -> Self {
185        Self::spawn_with_interval(http, state, BACKFILL_INTERVAL)
186    }
187
188    /// Like [`Self::spawn`] with a custom interval. Used by tests
189    /// to exercise multiple polls in a short wall-clock window.
190    #[must_use]
191    pub fn spawn_with_interval(
192        http: HttpClient,
193        state: Arc<RwLock<EngineState>>,
194        interval: Duration,
195    ) -> Self {
196        let (shutdown_tx, shutdown_rx) = watch::channel(false);
197        let task = tokio::spawn(backfill_loop(http, state, interval, shutdown_rx));
198        Self { shutdown_tx, task }
199    }
200
201    /// Signal the task to exit and wait for it.
202    ///
203    /// # Errors
204    /// Returns the tokio join error verbatim on panic or cancel;
205    /// clean shutdown always returns `Ok`.
206    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
207        let _ = self.shutdown_tx.send(true);
208        self.task.await
209    }
210}
211
212async fn backfill_loop(
213    http: HttpClient,
214    state: Arc<RwLock<EngineState>>,
215    interval: Duration,
216    mut shutdown: watch::Receiver<bool>,
217) {
218    let mut ticker = tokio::time::interval(interval);
219    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
220
221    loop {
222        tokio::select! {
223            _ = shutdown.changed() => {
224                if *shutdown.borrow() {
225                    break;
226                }
227            }
228            _ = ticker.tick() => {
229                let failed = fetch_and_apply(&http, &state).await;
230                if failed {
231                    tokio::select! {
232                        () = tokio::time::sleep(BACKFILL_BACKOFF) => {}
233                        _ = shutdown.changed() => break,
234                    }
235                }
236            }
237        }
238    }
239
240    tracing::debug!("engine-state backfill poller exited");
241}
242
243/// Fetch each tracked endpoint in sequence, applying successes
244/// and logging individual failures. Returns `true` if any
245/// endpoint failed so the caller can apply a short backoff; a
246/// per-call `false` would starve the mirror of the endpoints
247/// that *did* work.
248async fn fetch_and_apply(http: &HttpClient, state: &Arc<RwLock<EngineState>>) -> bool {
249    let mut any_failed = false;
250    let now = Utc::now();
251
252    match http.v2_status().await {
253        Ok(s) => state.write().apply_status(s, now, Source::Http),
254        Err(e) => {
255            log_backfill_error("v2_status", &e);
256            any_failed = true;
257        }
258    }
259    match http.positions().await {
260        Ok(p) => state.write().apply_positions(p, now, Source::Http),
261        Err(e) => {
262            log_backfill_error("positions", &e);
263            any_failed = true;
264        }
265    }
266    match http.risk().await {
267        Ok(r) => state.write().apply_risk(r, now, Source::Http),
268        Err(e) => {
269            log_backfill_error("risk", &e);
270            any_failed = true;
271        }
272    }
273    match http.regime(None).await {
274        Ok(r) => state.write().apply_regime(r, now, Source::Http),
275        Err(e) => {
276            log_backfill_error("regime", &e);
277            any_failed = true;
278        }
279    }
280    match http.live_cockpit().await {
281        Ok(c) => state.write().apply_live_cockpit(c, now),
282        Err(e) => {
283            log_backfill_error("live_cockpit", &e);
284            any_failed = true;
285        }
286    }
287
288    any_failed
289}
290
291fn log_backfill_error(endpoint: &'static str, err: &HttpError) {
292    match err {
293        // 404 means the engine doesn't expose this endpoint (older
294        // build, or locked-down surface). Log once at debug — repeat
295        // log-spam is worse than the missing data.
296        HttpError::NotFound { .. } => {
297            tracing::debug!(endpoint, "backfill endpoint not served; continuing");
298        }
299        // 401 means auth isn't wired yet; the WS reconnect loop is
300        // already shouting about it. Keep it quiet here.
301        HttpError::Unauthorized => {
302            tracing::debug!(endpoint, "backfill auth rejected; continuing");
303        }
304        _ => {
305            tracing::warn!(endpoint, err = %err, "backfill poll failed");
306        }
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    // Re-checks `ready` every 20 ms until it returns `true`, failing the
    // test with `what` once `budget` has elapsed. `ready` is a plain
    // synchronous closure, so any `parking_lot` guard it takes is dropped
    // before the sleep — clippy's await-holding-lock lint forbids
    // carrying one across a yield point.
    async fn wait_until(budget: Duration, what: &str, mut ready: impl FnMut() -> bool) {
        let deadline = std::time::Instant::now() + budget;
        while !ready() {
            assert!(std::time::Instant::now() <= deadline, "{what}");
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

    #[tokio::test]
    async fn poller_writes_snapshot_on_first_tick() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller = OperatorStatePoller::spawn_with_interval(
            http,
            state.clone(),
            Duration::from_millis(10),
        );

        // The first snapshot gets up to 1 s to land.
        wait_until(Duration::from_secs(1), "snapshot never arrived", || {
            state.read().operator_state.is_some()
        })
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    #[tokio::test]
    async fn poller_picks_up_label_changes() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller = OperatorStatePoller::spawn_with_interval(
            http,
            state.clone(),
            Duration::from_millis(10),
        );

        // The mock starts out reporting STEADY.
        wait_until(Duration::from_secs(1), "steady never arrived", || {
            matches!(
                state.read().operator_state.as_ref().map(|s| s.value.label),
                Some(zero_operator_state::Label::Steady)
            )
        })
        .await;

        // Switch the mock to TILT and expect the poller to follow.
        mock.with_overrides(|o| {
            o.operator_label = Some("tilt".to_string());
            o.operator_version += 1;
        });

        wait_until(Duration::from_secs(1), "tilt never propagated", || {
            matches!(
                state.read().operator_state.as_ref().map(|s| s.value.label),
                Some(zero_operator_state::Label::Tilt)
            )
        })
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    // ── EngineStatePoller (HTTP backfill) ──────────────────────────

    #[tokio::test]
    async fn backfill_populates_all_tracked_fields() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller =
            EngineStatePoller::spawn_with_interval(http, state.clone(), Duration::from_millis(10));

        // Two seconds of slack: a sweep is five sequential localhost
        // calls (~1 ms each), but CI schedulers are capricious.
        wait_until(
            Duration::from_secs(2),
            "not all fields backfilled in time",
            || {
                let s = state.read();
                s.status.is_some()
                    && s.positions.is_some()
                    && s.risk.is_some()
                    && s.regime.is_some()
                    && s.live_cockpit.is_some()
            },
        )
        .await;

        // Copy the provenance tags and heartbeat out under one short-lived
        // read guard, so nothing is locked across the awaits below.
        let (sources, heartbeat) = {
            let s = state.read();
            (
                [
                    ("status", s.status.as_ref().unwrap().source),
                    ("positions", s.positions.as_ref().unwrap().source),
                    ("risk", s.risk.as_ref().unwrap().source),
                    ("regime", s.regime.as_ref().unwrap().source),
                    ("live_cockpit", s.live_cockpit.as_ref().unwrap().source),
                ],
                s.last_heartbeat,
            )
        };
        // Every field arrived via the backfill path, not push.
        for (field, source) in sources {
            assert!(matches!(source, Source::Http), "{field} not tagged Http");
        }
        assert!(
            heartbeat.is_none(),
            "HTTP backfill must not bump last_heartbeat — that's a WS-only signal",
        );

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    #[tokio::test]
    async fn backfill_survives_transient_503_via_retry() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        // Arm a single 503 for the next request. The client's retry-once
        // policy should absorb it, so the mirror still fills on the
        // poller's first tick.
        mock.with_overrides(|o| o.transient_fail_count = 1);

        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller =
            EngineStatePoller::spawn_with_interval(http, state.clone(), Duration::from_millis(10));

        wait_until(
            Duration::from_secs(3),
            "status never backfilled (retry may have mis-behaved)",
            || state.read().status.is_some(),
        )
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }
}