1use std::sync::Arc;
19use std::time::Duration;
20
21use chrono::Utc;
22use parking_lot::RwLock;
23use tokio::sync::watch;
24use tokio::task::JoinHandle;
25
26use crate::http::{HttpClient, HttpError};
27use crate::stat::Source;
28use crate::state::EngineState;
29
/// Default interval between operator-state polls; used by
/// [`OperatorStatePoller::spawn`].
pub const POLL_INTERVAL: Duration = Duration::from_secs(5);

/// Pause taken by the operator-state poll loop after a failed request before
/// it goes back to waiting for the next tick.
pub const POLL_BACKOFF: Duration = Duration::from_secs(5);
42
43#[derive(Debug)]
49pub struct OperatorStatePoller {
50 shutdown_tx: watch::Sender<bool>,
51 task: JoinHandle<()>,
52}
53
54impl OperatorStatePoller {
55 #[must_use]
59 pub fn spawn(http: HttpClient, state: Arc<RwLock<EngineState>>) -> Self {
60 Self::spawn_with_interval(http, state, POLL_INTERVAL)
61 }
62
63 #[must_use]
67 pub fn spawn_with_interval(
68 http: HttpClient,
69 state: Arc<RwLock<EngineState>>,
70 interval: Duration,
71 ) -> Self {
72 let (shutdown_tx, shutdown_rx) = watch::channel(false);
73 let task = tokio::spawn(run_loop(http, state, interval, shutdown_rx));
74 Self { shutdown_tx, task }
75 }
76
77 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
83 let _ = self.shutdown_tx.send(true);
84 self.task.await
85 }
86}
87
88async fn run_loop(
89 http: HttpClient,
90 state: Arc<RwLock<EngineState>>,
91 interval: Duration,
92 mut shutdown: watch::Receiver<bool>,
93) {
94 let mut ticker = tokio::time::interval(interval);
95 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
96
97 loop {
98 tokio::select! {
99 _ = shutdown.changed() => {
100 if *shutdown.borrow() {
101 break;
102 }
103 }
104 _ = ticker.tick() => {
105 match http.operator_state().await {
106 Ok(snap) => {
107 state.write().apply_operator_state(snap, Utc::now());
108 }
109 Err(e) => {
110 match &e {
115 HttpError::NotFound { .. } => {
116 tracing::debug!("operator-state endpoint not served; continuing");
117 }
118 _ => {
119 tracing::warn!(err = %e, "operator-state poll failed");
120 }
121 }
122 tokio::select! {
123 () = tokio::time::sleep(POLL_BACKOFF) => {}
124 _ = shutdown.changed() => break,
125 }
126 }
127 }
128 }
129 }
130 }
131
132 tracing::debug!("operator-state poller exited");
133}
134
/// Default interval between engine-state backfill rounds; used by
/// [`EngineStatePoller::spawn`].
pub const BACKFILL_INTERVAL: Duration = Duration::from_secs(30);

/// Pause taken by the backfill loop after a round in which at least one fetch
/// failed, before it goes back to waiting for the next tick.
pub const BACKFILL_BACKOFF: Duration = Duration::from_secs(5);
166
167#[derive(Debug)]
174pub struct EngineStatePoller {
175 shutdown_tx: watch::Sender<bool>,
176 task: JoinHandle<()>,
177}
178
179impl EngineStatePoller {
180 #[must_use]
184 pub fn spawn(http: HttpClient, state: Arc<RwLock<EngineState>>) -> Self {
185 Self::spawn_with_interval(http, state, BACKFILL_INTERVAL)
186 }
187
188 #[must_use]
191 pub fn spawn_with_interval(
192 http: HttpClient,
193 state: Arc<RwLock<EngineState>>,
194 interval: Duration,
195 ) -> Self {
196 let (shutdown_tx, shutdown_rx) = watch::channel(false);
197 let task = tokio::spawn(backfill_loop(http, state, interval, shutdown_rx));
198 Self { shutdown_tx, task }
199 }
200
201 pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
207 let _ = self.shutdown_tx.send(true);
208 self.task.await
209 }
210}
211
212async fn backfill_loop(
213 http: HttpClient,
214 state: Arc<RwLock<EngineState>>,
215 interval: Duration,
216 mut shutdown: watch::Receiver<bool>,
217) {
218 let mut ticker = tokio::time::interval(interval);
219 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
220
221 loop {
222 tokio::select! {
223 _ = shutdown.changed() => {
224 if *shutdown.borrow() {
225 break;
226 }
227 }
228 _ = ticker.tick() => {
229 let failed = fetch_and_apply(&http, &state).await;
230 if failed {
231 tokio::select! {
232 () = tokio::time::sleep(BACKFILL_BACKOFF) => {}
233 _ = shutdown.changed() => break,
234 }
235 }
236 }
237 }
238 }
239
240 tracing::debug!("engine-state backfill poller exited");
241}
242
243async fn fetch_and_apply(http: &HttpClient, state: &Arc<RwLock<EngineState>>) -> bool {
249 let mut any_failed = false;
250 let now = Utc::now();
251
252 match http.v2_status().await {
253 Ok(s) => state.write().apply_status(s, now, Source::Http),
254 Err(e) => {
255 log_backfill_error("v2_status", &e);
256 any_failed = true;
257 }
258 }
259 match http.positions().await {
260 Ok(p) => state.write().apply_positions(p, now, Source::Http),
261 Err(e) => {
262 log_backfill_error("positions", &e);
263 any_failed = true;
264 }
265 }
266 match http.risk().await {
267 Ok(r) => state.write().apply_risk(r, now, Source::Http),
268 Err(e) => {
269 log_backfill_error("risk", &e);
270 any_failed = true;
271 }
272 }
273 match http.regime(None).await {
274 Ok(r) => state.write().apply_regime(r, now, Source::Http),
275 Err(e) => {
276 log_backfill_error("regime", &e);
277 any_failed = true;
278 }
279 }
280 match http.live_cockpit().await {
281 Ok(c) => state.write().apply_live_cockpit(c, now),
282 Err(e) => {
283 log_backfill_error("live_cockpit", &e);
284 any_failed = true;
285 }
286 }
287
288 any_failed
289}
290
291fn log_backfill_error(endpoint: &'static str, err: &HttpError) {
292 match err {
293 HttpError::NotFound { .. } => {
297 tracing::debug!(endpoint, "backfill endpoint not served; continuing");
298 }
299 HttpError::Unauthorized => {
302 tracing::debug!(endpoint, "backfill auth rejected; continuing");
303 }
304 _ => {
305 tracing::warn!(endpoint, err = %err, "backfill poll failed");
306 }
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Pause between successive condition checks in [`wait_until`].
    const RECHECK_EVERY: Duration = Duration::from_millis(20);

    /// Re-evaluates `ready` until it returns `true`, sleeping
    /// [`RECHECK_EVERY`] between checks, and panics with `timeout_msg` once
    /// `budget` has elapsed without success.
    async fn wait_until(budget: Duration, timeout_msg: &str, mut ready: impl FnMut() -> bool) {
        let deadline = std::time::Instant::now() + budget;
        while !ready() {
            assert!(std::time::Instant::now() <= deadline, "{}", timeout_msg);
            tokio::time::sleep(RECHECK_EVERY).await;
        }
    }

    /// The operator-state poller stores a snapshot shortly after start-up.
    #[tokio::test]
    async fn poller_writes_snapshot_on_first_tick() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller = OperatorStatePoller::spawn_with_interval(
            http,
            state.clone(),
            Duration::from_millis(10),
        );

        wait_until(Duration::from_secs(1), "snapshot never arrived", || {
            state.read().operator_state.is_some()
        })
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    /// A label change on the engine side shows up in the shared state on a
    /// later poll.
    #[tokio::test]
    async fn poller_picks_up_label_changes() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller = OperatorStatePoller::spawn_with_interval(
            http,
            state.clone(),
            Duration::from_millis(10),
        );

        // Wait for the mock's initial label to land first.
        wait_until(Duration::from_secs(1), "steady never arrived", || {
            matches!(
                state.read().operator_state.as_ref().map(|s| s.value.label),
                Some(zero_operator_state::Label::Steady)
            )
        })
        .await;

        mock.with_overrides(|o| {
            o.operator_label = Some("tilt".to_string());
            o.operator_version += 1;
        });

        wait_until(Duration::from_secs(1), "tilt never propagated", || {
            matches!(
                state.read().operator_state.as_ref().map(|s| s.value.label),
                Some(zero_operator_state::Label::Tilt)
            )
        })
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    /// The backfill poller fills every tracked field, tags each as HTTP
    /// sourced, and leaves the heartbeat untouched.
    #[tokio::test]
    async fn backfill_populates_all_tracked_fields() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller =
            EngineStatePoller::spawn_with_interval(http, state.clone(), Duration::from_millis(10));

        wait_until(
            Duration::from_secs(2),
            "not all fields backfilled in time",
            || {
                let s = state.read();
                s.status.is_some()
                    && s.positions.is_some()
                    && s.risk.is_some()
                    && s.regime.is_some()
                    && s.live_cockpit.is_some()
            },
        )
        .await;

        // Copy what we need out of the state so the read lock is released
        // before asserting.
        let (src_status, src_positions, src_risk, src_regime, src_cockpit, heartbeat) = {
            let s = state.read();
            (
                s.status.as_ref().unwrap().source,
                s.positions.as_ref().unwrap().source,
                s.risk.as_ref().unwrap().source,
                s.regime.as_ref().unwrap().source,
                s.live_cockpit.as_ref().unwrap().source,
                s.last_heartbeat,
            )
        };
        assert!(matches!(src_status, Source::Http));
        assert!(matches!(src_positions, Source::Http));
        assert!(matches!(src_risk, Source::Http));
        assert!(matches!(src_regime, Source::Http));
        assert!(matches!(src_cockpit, Source::Http));
        assert!(
            heartbeat.is_none(),
            "HTTP backfill must not bump last_heartbeat — that's a WS-only signal",
        );

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    /// With the mock's `transient_fail_count` override set to 1, status is
    /// still backfilled within the deadline.
    #[tokio::test]
    async fn backfill_survives_transient_503_via_retry() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        mock.with_overrides(|o| o.transient_fail_count = 1);

        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();

        let poller =
            EngineStatePoller::spawn_with_interval(http, state.clone(), Duration::from_millis(10));

        wait_until(
            Duration::from_secs(3),
            "status never backfilled (retry may have mis-behaved)",
            || state.read().status.is_some(),
        )
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }
}