use std::sync::Arc;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use zero_operator_state::Snapshot as OperatorSnapshot;
use crate::models::{LiveCockpit, Positions, Regime, Risk, V2Status};
use crate::stat::{Source, Stat};
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ConnectionHealth {
pub ws_connected: bool,
pub reconnect_count: u32,
pub total_attempts: u64,
pub last_reconnect_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Default, Clone)]
pub struct EngineState {
pub status: Option<Stat<V2Status>>,
pub positions: Option<Stat<Positions>>,
pub risk: Option<Stat<Risk>>,
pub regime: Option<Stat<Regime>>,
pub live_cockpit: Option<Stat<LiveCockpit>>,
pub operator_state: Option<Stat<OperatorSnapshot>>,
pub last_heartbeat: Option<DateTime<Utc>>,
pub connection: ConnectionHealth,
}
impl EngineState {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn shared() -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self::new()))
}
#[must_use]
pub fn feed_age_seconds(&self, now: DateTime<Utc>) -> Option<i64> {
self.last_heartbeat
.map(|hb| now.signed_duration_since(hb).num_seconds())
}
#[must_use]
pub fn hl_rate_snapshot(&self) -> Option<crate::models::HlRate> {
self.status.as_ref().and_then(|s| s.value.hl_rate)
}
pub fn on_ws_connected(&mut self) {
self.connection.ws_connected = true;
self.connection.reconnect_count = 0;
}
pub fn on_ws_disconnected(&mut self) {
self.connection.ws_connected = false;
}
pub fn on_reconnect_attempt(&mut self, at: DateTime<Utc>) {
self.connection.reconnect_count = self.connection.reconnect_count.saturating_add(1);
self.connection.total_attempts = self.connection.total_attempts.saturating_add(1);
self.connection.last_reconnect_at = Some(at);
}
pub fn apply_status(&mut self, status: V2Status, as_of: DateTime<Utc>, source: Source) {
self.status = Some(Stat::new(status, source).with_as_of(as_of));
if matches!(source, Source::Ws) {
self.last_heartbeat = Some(as_of);
}
}
pub fn apply_positions(&mut self, positions: Positions, as_of: DateTime<Utc>, source: Source) {
self.positions = Some(Stat::new(positions, source).with_as_of(as_of));
if matches!(source, Source::Ws) {
self.last_heartbeat = Some(as_of);
}
}
pub fn apply_risk(&mut self, risk: Risk, as_of: DateTime<Utc>, source: Source) {
self.risk = Some(Stat::new(risk, source).with_as_of(as_of));
if matches!(source, Source::Ws) {
self.last_heartbeat = Some(as_of);
}
}
pub fn apply_regime(&mut self, regime: Regime, as_of: DateTime<Utc>, source: Source) {
self.regime = Some(Stat::new(regime, source).with_as_of(as_of));
if matches!(source, Source::Ws) {
self.last_heartbeat = Some(as_of);
}
}
pub fn apply_live_cockpit(&mut self, cockpit: LiveCockpit, as_of: DateTime<Utc>) {
self.live_cockpit = Some(Stat::new(cockpit, Source::Http).with_as_of(as_of));
}
pub fn apply_heartbeat(&mut self, at: DateTime<Utc>) {
self.last_heartbeat = Some(at);
}
pub fn apply_operator_state(&mut self, snap: OperatorSnapshot, as_of: DateTime<Utc>) {
self.operator_state = Some(Stat::new(snap, Source::Http).with_as_of(as_of));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reconnect_counter_ticks_and_resets() {
let mut s = EngineState::new();
assert_eq!(s.connection.reconnect_count, 0);
assert_eq!(s.connection.total_attempts, 0);
assert!(!s.connection.ws_connected);
s.on_reconnect_attempt(Utc::now());
s.on_reconnect_attempt(Utc::now());
assert_eq!(s.connection.reconnect_count, 2);
assert_eq!(s.connection.total_attempts, 2);
assert!(!s.connection.ws_connected);
s.on_ws_connected();
assert_eq!(s.connection.reconnect_count, 0);
assert_eq!(
s.connection.total_attempts, 2,
"total_attempts must survive successful reconnect"
);
assert!(s.connection.ws_connected);
s.on_ws_disconnected();
assert!(!s.connection.ws_connected);
}
#[test]
fn heartbeat_updates_feed_age() {
let mut s = EngineState::new();
let t0 = Utc::now();
s.apply_heartbeat(t0);
let later = t0 + chrono::Duration::seconds(3);
assert_eq!(s.feed_age_seconds(later), Some(3));
}
}