use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use parking_lot::RwLock;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use crate::http::{HttpClient, HttpError};
use crate::stat::Source;
use crate::state::EngineState;
/// Default tick period for [`OperatorStatePoller`] (five seconds).
pub const POLL_INTERVAL: Duration = Duration::from_millis(5_000);
/// Extra pause the operator-state poller takes after a failed fetch (five seconds).
pub const POLL_BACKOFF: Duration = Duration::from_millis(5_000);
/// Handle to a spawned background task that periodically fetches the engine's
/// operator state over HTTP and writes it into the shared `EngineState`.
///
/// Stop it with [`OperatorStatePoller::shutdown`], which signals the task and waits
/// for it to finish.
#[derive(Debug)]
pub struct OperatorStatePoller {
    // Publishes `true` to ask the polling loop to exit.
    shutdown_tx: watch::Sender<bool>,
    // The spawned polling loop; awaited by `shutdown`.
    task: JoinHandle<()>,
}
impl OperatorStatePoller {
    /// Spawns the operator-state poller with the default [`POLL_INTERVAL`].
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (the loop is launched with `tokio::spawn`).
    #[must_use]
    pub fn spawn(http: HttpClient, state: Arc<RwLock<EngineState>>) -> Self {
        Self::spawn_with_interval(http, state, POLL_INTERVAL)
    }

    /// Spawns the operator-state poller with a caller-chosen tick `interval`.
    ///
    /// The returned handle owns both the shutdown channel and the task. Dropping it
    /// detaches the task; call [`OperatorStatePoller::shutdown`] for an orderly stop.
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (the loop is launched with `tokio::spawn`).
    #[must_use]
    pub fn spawn_with_interval(
        http: HttpClient,
        state: Arc<RwLock<EngineState>>,
        interval: Duration,
    ) -> Self {
        let (stop_tx, stop_rx) = watch::channel(false);
        Self {
            shutdown_tx: stop_tx,
            task: tokio::spawn(run_loop(http, state, interval, stop_rx)),
        }
    }

    /// Asks the polling loop to stop and waits for its task to finish.
    ///
    /// # Errors
    /// Returns the task's [`tokio::task::JoinError`] if it panicked or was cancelled.
    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
        let Self { shutdown_tx, task } = self;
        // A send error only means the receiver is gone, i.e. the loop has already exited.
        let _ = shutdown_tx.send(true);
        task.await
    }
}
/// Body of the task spawned by [`OperatorStatePoller`].
///
/// On every tick of `interval` it fetches the operator-state snapshot and, on success,
/// writes it into `state` stamped with the wall-clock time at which the response arrived.
/// On failure it logs the error (`debug` for a 404, `warn` otherwise) and then sleeps for
/// [`POLL_BACKOFF`] before waiting for the next tick. The first tick of a Tokio `interval`
/// fires immediately, so the first fetch happens right after spawn.
///
/// The loop exits when `true` is published on `shutdown`, or when the paired sender has
/// been dropped (e.g. the poller handle was dropped without calling `shutdown`). It also
/// exits if any change arrives during a backoff sleep. An in-flight HTTP request is not
/// cancelled; shutdown takes effect once it returns.
async fn run_loop(
    http: HttpClient,
    state: Arc<RwLock<EngineState>>,
    interval: Duration,
    mut shutdown: watch::Receiver<bool>,
) {
    let mut ticker = tokio::time::interval(interval);
    // If a fetch plus backoff overruns the interval, drop the missed ticks instead of
    // firing a burst of catch-up polls.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            changed = shutdown.changed() => {
                // `changed()` errors once the sender is dropped and would then resolve
                // immediately on every iteration; treat that as shutdown instead of
                // spinning forever in a detached task.
                if changed.is_err() || *shutdown.borrow() {
                    break;
                }
            }
            _ = ticker.tick() => {
                match http.operator_state().await {
                    Ok(snap) => {
                        // The write guard is a temporary released at the end of this
                        // statement, so it is never held across an await.
                        state.write().apply_operator_state(snap, Utc::now());
                    }
                    Err(e) => {
                        if matches!(e, HttpError::NotFound { .. }) {
                            tracing::debug!("operator-state endpoint not served; continuing");
                        } else {
                            tracing::warn!(err = %e, "operator-state poll failed");
                        }
                        // Back off, but stay responsive to shutdown (or a dropped sender).
                        tokio::select! {
                            () = tokio::time::sleep(POLL_BACKOFF) => {}
                            _ = shutdown.changed() => break,
                        }
                    }
                }
            }
        }
    }
    tracing::debug!("operator-state poller exited");
}
/// Default tick period for [`EngineStatePoller`]'s HTTP backfill (thirty seconds).
pub const BACKFILL_INTERVAL: Duration = Duration::from_millis(30_000);
/// Extra pause after a backfill pass in which any endpoint failed (five seconds).
pub const BACKFILL_BACKOFF: Duration = Duration::from_millis(5_000);
/// Handle to a spawned background task that periodically backfills the shared
/// `EngineState` from the engine's HTTP endpoints (status, positions, risk, regime,
/// live cockpit).
///
/// Stop it with [`EngineStatePoller::shutdown`], which signals the task and waits for
/// it to finish.
#[derive(Debug)]
pub struct EngineStatePoller {
    // Publishes `true` to ask the backfill loop to exit.
    shutdown_tx: watch::Sender<bool>,
    // The spawned backfill loop; awaited by `shutdown`.
    task: JoinHandle<()>,
}
impl EngineStatePoller {
    /// Spawns the backfill poller with the default [`BACKFILL_INTERVAL`].
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (the loop is launched with `tokio::spawn`).
    #[must_use]
    pub fn spawn(http: HttpClient, state: Arc<RwLock<EngineState>>) -> Self {
        Self::spawn_with_interval(http, state, BACKFILL_INTERVAL)
    }

    /// Spawns the backfill poller with a caller-chosen tick `interval`.
    ///
    /// The returned handle owns both the shutdown channel and the task. Dropping it
    /// detaches the task; call [`EngineStatePoller::shutdown`] for an orderly stop.
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (the loop is launched with `tokio::spawn`).
    #[must_use]
    pub fn spawn_with_interval(
        http: HttpClient,
        state: Arc<RwLock<EngineState>>,
        interval: Duration,
    ) -> Self {
        let (stop_tx, stop_rx) = watch::channel(false);
        Self {
            shutdown_tx: stop_tx,
            task: tokio::spawn(backfill_loop(http, state, interval, stop_rx)),
        }
    }

    /// Asks the backfill loop to stop and waits for its task to finish.
    ///
    /// # Errors
    /// Returns the task's [`tokio::task::JoinError`] if it panicked or was cancelled.
    pub async fn shutdown(self) -> Result<(), tokio::task::JoinError> {
        let Self { shutdown_tx, task } = self;
        // A send error only means the receiver is gone, i.e. the loop has already exited.
        let _ = shutdown_tx.send(true);
        task.await
    }
}
/// Body of the task spawned by [`EngineStatePoller`].
///
/// On every tick of `interval` it runs one [`fetch_and_apply`] pass. If any endpoint in
/// that pass failed, it additionally sleeps for [`BACKFILL_BACKOFF`] before waiting for
/// the next tick. The first tick of a Tokio `interval` fires immediately, so the first
/// pass happens right after spawn.
///
/// The loop exits when `true` is published on `shutdown`, or when the paired sender has
/// been dropped (e.g. the poller handle was dropped without calling `shutdown`). It also
/// exits if any change arrives during a backoff sleep. A pass that is already running is
/// not interrupted; shutdown takes effect once it completes.
async fn backfill_loop(
    http: HttpClient,
    state: Arc<RwLock<EngineState>>,
    interval: Duration,
    mut shutdown: watch::Receiver<bool>,
) {
    let mut ticker = tokio::time::interval(interval);
    // If a pass plus backoff overruns the interval, drop the missed ticks instead of
    // firing a burst of catch-up passes.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            changed = shutdown.changed() => {
                // `changed()` errors once the sender is dropped and would then resolve
                // immediately on every iteration; treat that as shutdown instead of
                // spinning forever in a detached task.
                if changed.is_err() || *shutdown.borrow() {
                    break;
                }
            }
            _ = ticker.tick() => {
                let any_failed = fetch_and_apply(&http, &state).await;
                if any_failed {
                    // Back off, but stay responsive to shutdown (or a dropped sender).
                    tokio::select! {
                        () = tokio::time::sleep(BACKFILL_BACKOFF) => {}
                        _ = shutdown.changed() => break,
                    }
                }
            }
        }
    }
    tracing::debug!("engine-state backfill poller exited");
}
/// Runs one backfill pass and writes every successful response into `state`.
///
/// The endpoints are queried one after another in a fixed order: v2 status, positions,
/// risk, regime (with no argument) and live cockpit. Status, positions, risk and regime
/// are applied with [`Source::Http`]; the live-cockpit apply takes no source argument.
/// A failing endpoint is logged via [`log_backfill_error`] and does not stop the
/// remaining endpoints from being tried. Every applied value is stamped with the same
/// timestamp, captured before the first request is sent.
///
/// Returns `true` if at least one endpoint failed.
async fn fetch_and_apply(http: &HttpClient, state: &Arc<RwLock<EngineState>>) -> bool {
    let fetched_at = Utc::now();

    let status_ok = match http.v2_status().await {
        Ok(status) => {
            state.write().apply_status(status, fetched_at, Source::Http);
            true
        }
        Err(err) => {
            log_backfill_error("v2_status", &err);
            false
        }
    };

    let positions_ok = match http.positions().await {
        Ok(positions) => {
            state.write().apply_positions(positions, fetched_at, Source::Http);
            true
        }
        Err(err) => {
            log_backfill_error("positions", &err);
            false
        }
    };

    let risk_ok = match http.risk().await {
        Ok(risk) => {
            state.write().apply_risk(risk, fetched_at, Source::Http);
            true
        }
        Err(err) => {
            log_backfill_error("risk", &err);
            false
        }
    };

    let regime_ok = match http.regime(None).await {
        Ok(regime) => {
            state.write().apply_regime(regime, fetched_at, Source::Http);
            true
        }
        Err(err) => {
            log_backfill_error("regime", &err);
            false
        }
    };

    let cockpit_ok = match http.live_cockpit().await {
        Ok(cockpit) => {
            state.write().apply_live_cockpit(cockpit, fetched_at);
            true
        }
        Err(err) => {
            log_backfill_error("live_cockpit", &err);
            false
        }
    };

    !(status_ok && positions_ok && risk_ok && regime_ok && cockpit_ok)
}
/// Logs a failed backfill request for `endpoint`.
///
/// `NotFound` and `Unauthorized` are logged at `debug`; any other error is logged at
/// `warn` together with its `Display` text.
fn log_backfill_error(endpoint: &'static str, err: &HttpError) {
    if matches!(err, HttpError::NotFound { .. }) {
        tracing::debug!(endpoint, "backfill endpoint not served; continuing");
    } else if matches!(err, HttpError::Unauthorized) {
        tracing::debug!(endpoint, "backfill auth rejected; continuing");
    } else {
        tracing::warn!(endpoint, err = %err, "backfill poll failed");
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Re-evaluates `ready` every 20 ms until it returns `true`.
    ///
    /// Panics with the message `what` if `budget` elapses first.
    async fn wait_until(budget: Duration, what: &str, mut ready: impl FnMut() -> bool) {
        let deadline = std::time::Instant::now() + budget;
        while !ready() {
            assert!(std::time::Instant::now() <= deadline, "{what}");
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    }

    #[tokio::test]
    async fn poller_writes_snapshot_on_first_tick() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();
        let poller = OperatorStatePoller::spawn_with_interval(
            http,
            state.clone(),
            Duration::from_millis(10),
        );

        wait_until(Duration::from_secs(1), "snapshot never arrived", || {
            state.read().operator_state.is_some()
        })
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    #[tokio::test]
    async fn poller_picks_up_label_changes() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();
        let poller = OperatorStatePoller::spawn_with_interval(
            http,
            state.clone(),
            Duration::from_millis(10),
        );
        let current_label = || state.read().operator_state.as_ref().map(|s| s.value.label);

        wait_until(Duration::from_secs(1), "steady never arrived", || {
            matches!(current_label(), Some(zero_operator_state::Label::Steady))
        })
        .await;

        mock.with_overrides(|o| {
            o.operator_label = Some("tilt".to_string());
            o.operator_version += 1;
        });

        wait_until(Duration::from_secs(1), "tilt never propagated", || {
            matches!(current_label(), Some(zero_operator_state::Label::Tilt))
        })
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    #[tokio::test]
    async fn backfill_populates_all_tracked_fields() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();
        let poller =
            EngineStatePoller::spawn_with_interval(http, state.clone(), Duration::from_millis(10));

        wait_until(Duration::from_secs(2), "not all fields backfilled in time", || {
            let s = state.read();
            s.status.is_some()
                && s.positions.is_some()
                && s.risk.is_some()
                && s.regime.is_some()
                && s.live_cockpit.is_some()
        })
        .await;

        let (src_status, src_positions, src_risk, src_regime, src_cockpit, heartbeat) = {
            let s = state.read();
            (
                s.status.as_ref().unwrap().source,
                s.positions.as_ref().unwrap().source,
                s.risk.as_ref().unwrap().source,
                s.regime.as_ref().unwrap().source,
                s.live_cockpit.as_ref().unwrap().source,
                s.last_heartbeat,
            )
        };
        assert!(matches!(src_status, Source::Http));
        assert!(matches!(src_positions, Source::Http));
        assert!(matches!(src_risk, Source::Http));
        assert!(matches!(src_regime, Source::Http));
        assert!(matches!(src_cockpit, Source::Http));
        assert!(
            heartbeat.is_none(),
            "HTTP backfill must not bump last_heartbeat — that's a WS-only signal",
        );

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }

    #[tokio::test]
    async fn backfill_survives_transient_503_via_retry() {
        let mock = zero_testkit::mock_engine::MockEngine::spawn()
            .await
            .expect("mock up");
        mock.with_overrides(|o| o.transient_fail_count = 1);
        let http = HttpClient::new(mock.base_url(), None).expect("client");
        let state = EngineState::shared();
        let poller =
            EngineStatePoller::spawn_with_interval(http, state.clone(), Duration::from_millis(10));

        wait_until(
            Duration::from_secs(3),
            "status never backfilled (retry may have mis-behaved)",
            || state.read().status.is_some(),
        )
        .await;

        poller.shutdown().await.expect("clean shutdown");
        mock.shutdown().await;
    }
}