use std::collections::HashMap;
use serde::{Deserialize, Serialize};
/// Replication strategy attached to a workload.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// stored under the `mode` key in kebab-case (`none`, `checkpointed`,
/// `warm-standby`).
///
/// On eviction, `None` and `Checkpointed` are handled identically (the
/// restart policy decides what happens next), while `WarmStandby` causes a
/// lease revocation to be published to the listed standby providers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum ReplicationMode {
    /// No replication; this is the `Default`.
    None,
    /// NOTE(review): presumably state is checkpointed (see
    /// `DurableWorkload::state_uri`) — the code visible here never treats it
    /// differently from `None`; confirm.
    Checkpointed,
    /// Standby providers are kept ready; their identifiers are forwarded in
    /// `StateMachineEvent::PublishLeaseRevocation` when the workload is evicted.
    WarmStandby { standby_providers: Vec<String> },
}
impl Default for ReplicationMode {
fn default() -> Self {
Self::None
}
}
/// What to do with a non-warm-standby workload once it has been evicted.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// stored under the `policy` key in kebab-case (`never`, `on-failure`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "policy", rename_all = "kebab-case")]
pub enum RestartPolicy {
    /// Never respawn; eviction moves the workload straight to `Failed`.
    Never,
    /// Respawn after eviction, up to `max_attempts` attempts. A value of `0`
    /// is treated as an immediate failure on eviction.
    OnFailure { max_attempts: u8 },
}
impl Default for RestartPolicy {
fn default() -> Self {
Self::OnFailure { max_attempts: 3 }
}
}
/// Lifecycle state of a tracked workload.
///
/// All timestamps use the same clock as the `now` argument passed to
/// `WorkloadStateMachine::tick`; they are compared against the `*_secs`
/// thresholds of `QuorumConfig`, so they are presumably Unix seconds
/// (NOTE(review): confirm with callers).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkloadState {
    /// Waiting for the first heartbeat quorum; becomes `Live` once quorum is seen.
    Provisioning {
        since: u64,
    },
    /// Heartbeat quorum has been observed. `since` is the time the workload
    /// entered `Live`; it is not refreshed on later ticks.
    Live {
        since: u64,
    },
    /// Quorum was lost; `since` is the time the workload entered `Suspect`.
    Suspect {
        since: u64,
    },
    /// Quorum stayed lost past the eviction threshold; `at` is the eviction time.
    Evicted {
        at: u64,
    },
    /// A respawn is in progress after eviction.
    Respawning {
        /// Time the respawn sequence started.
        since: u64,
        /// Number of respawn attempts consumed so far (starts at 1).
        attempts_used: u8,
        /// Reason reported for the most recent failed attempt, if any.
        last_error: Option<String>,
    },
    /// Terminal state; `tick` never moves a workload out of it.
    Failed {
        reason: String,
    },
}
/// One heartbeat sighting supplied to `WorkloadStateMachine::tick`.
#[derive(Debug, Clone)]
pub struct HeartbeatObservation {
    /// Identity of the provider that emitted the heartbeat; `tick` matches it
    /// against `DurableWorkload::provider_npub`.
    /// NOTE(review): the name suggests a Nostr `npub` key — confirm.
    pub provider_npub: String,
    /// Relay on which the heartbeat was seen; `tick` counts distinct relay
    /// URLs towards the quorum.
    pub relay_url: String,
    /// Not read by the code visible here.
    /// NOTE(review): presumably the local time the event was received — confirm.
    pub seen_at: u64,
    /// Timestamp carried by the heartbeat itself; `tick` ignores the
    /// observation once this is more than `stale_secs` behind `now`.
    pub event_timestamp: u64,
}
/// Thresholds that drive heartbeat-quorum evaluation and state timeouts.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct QuorumConfig {
    /// Minimum number of distinct relays that must carry a fresh heartbeat
    /// for the quorum to count as alive.
    pub m: u8,
    /// Not read by the code visible here.
    /// NOTE(review): presumably the total relay count of an m-of-n scheme — confirm.
    pub n: u8,
    /// A `Live` workload without quorum becomes `Suspect` once at least this
    /// long has passed since it entered `Live`.
    pub t1_secs: u64,
    /// A `Suspect` workload without quorum is evicted once at least this long
    /// has passed since it entered `Suspect`.
    pub t2_secs: u64,
    /// Maximum age of a heartbeat's `event_timestamp` for it to count.
    pub stale_secs: u64,
}
impl Default for QuorumConfig {
fn default() -> Self {
Self {
m: 2,
n: 3,
t1_secs: 120,
t2_secs: 300,
stale_secs: 180,
}
}
}
/// A workload tracked by `WorkloadStateMachine`, together with its current
/// lifecycle state and durability settings.
#[derive(Debug, Clone)]
pub struct DurableWorkload {
    /// Key under which the state machine stores this workload.
    pub workload_id: u32,
    /// Provider whose heartbeats keep this workload alive.
    pub provider_npub: String,
    /// Current lifecycle state; mutated by the state machine.
    pub state: WorkloadState,
    /// Decides whether eviction publishes a lease revocation to standbys.
    pub replication: ReplicationMode,
    /// Decides whether eviction leads to a respawn or to `Failed`.
    pub restart_policy: RestartPolicy,
    /// Not read by the code visible here.
    /// NOTE(review): presumably the location of checkpointed state — confirm.
    pub state_uri: Option<String>,
    /// Not read by the code visible here.
    pub created_at: u64,
    /// Not read by the code visible here; the state machine does not enforce expiry.
    pub expires_at: u64,
}
/// Side effects requested by the state machine; returned from
/// `WorkloadStateMachine::tick` for the caller to act on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StateMachineEvent {
    /// The workload moved from `Provisioning` or `Suspect` to `Live`.
    EnteredLive {
        workload_id: u32,
    },
    /// The workload moved from `Live` to `Suspect`.
    EnteredSuspect {
        workload_id: u32,
    },
    /// The workload was evicted; `reason` is a fixed machine-readable tag.
    Evicted {
        workload_id: u32,
        reason: &'static str,
    },
    /// Emitted after `Evicted` for warm-standby workloads, carrying the
    /// configured standby providers.
    PublishLeaseRevocation {
        workload_id: u32,
        standby_providers: Vec<String>,
    },
    /// Emitted after `Evicted` when the restart policy allows a respawn;
    /// `attempt` is the 1-based attempt number.
    AttemptRespawn {
        workload_id: u32,
        attempt: u8,
    },
    /// Emitted after `Evicted` when the restart policy forbids a respawn.
    Failed {
        workload_id: u32,
        reason: String,
    },
}
/// Tracks durable workloads and advances their lifecycle state from
/// heartbeat observations.
pub struct WorkloadStateMachine {
    /// Quorum and timeout thresholds applied to every tracked workload.
    config: QuorumConfig,
    /// Tracked workloads keyed by `workload_id`.
    workloads: HashMap<u32, DurableWorkload>,
}
impl WorkloadStateMachine {
    /// Creates an empty state machine that evaluates heartbeats with `config`.
    pub fn new(config: QuorumConfig) -> Self {
        Self {
            config,
            workloads: HashMap::new(),
        }
    }

    /// Starts tracking `workload`, replacing any workload already stored
    /// under the same `workload_id`.
    pub fn track(&mut self, workload: DurableWorkload) {
        self.workloads.insert(workload.workload_id, workload);
    }

    /// Stops tracking `workload_id`; unknown ids are ignored.
    pub fn untrack(&mut self, workload_id: u32) {
        self.workloads.remove(&workload_id);
    }

    /// Returns the current state of `workload_id`, or `None` if it is not tracked.
    pub fn state_of(&self, workload_id: u32) -> Option<&WorkloadState> {
        self.workloads.get(&workload_id).map(|w| &w.state)
    }

    /// Returns the tracked workload for `workload_id`, or `None` if it is not tracked.
    pub fn workload(&self, workload_id: u32) -> Option<&DurableWorkload> {
        self.workloads.get(&workload_id)
    }

    /// Evaluates the heartbeat quorum of every tracked workload at time `now`
    /// and advances its state accordingly.
    ///
    /// An observation counts for a workload when its `provider_npub` matches
    /// and its `event_timestamp` is at most `stale_secs` behind `now`. The
    /// quorum is alive when at least `m` *distinct* relay URLs carry such an
    /// observation.
    ///
    /// Returns the events produced by all transitions. Events belonging to
    /// one workload keep their relative order; the order across workloads
    /// follows `HashMap` iteration and is therefore unspecified.
    pub fn tick(
        &mut self,
        now: u64,
        observations: &[HeartbeatObservation],
    ) -> Vec<StateMachineEvent> {
        let cfg = self.config;
        // Compare in `usize`: narrowing the relay count to `u8` would wrap
        // (e.g. 256 relays -> 0) and wrongly report a lost quorum.
        let required_relays = usize::from(cfg.m);
        let mut events = Vec::new();

        for workload in self.workloads.values_mut() {
            let provider = workload.provider_npub.as_str();
            // Borrow the relay URLs instead of cloning them; the set is only
            // needed to count distinct relays.
            let live_relays: std::collections::HashSet<&str> = observations
                .iter()
                .filter(|obs| obs.provider_npub.as_str() == provider)
                // Saturate so a timestamp near `u64::MAX` cannot overflow
                // (panic in debug builds, wrap to "stale" in release builds).
                .filter(|obs| obs.event_timestamp.saturating_add(cfg.stale_secs) >= now)
                .map(|obs| obs.relay_url.as_str())
                .collect();

            let quorum_alive = live_relays.len() >= required_relays;
            advance(workload, now, quorum_alive, &cfg, &mut events);
        }

        events
    }

    /// Records that the current respawn attempt of `workload_id` failed with `reason`.
    ///
    /// Does nothing when the workload is unknown or not in `Respawning`.
    /// When the attempts allowed by the restart policy are used up the
    /// workload becomes `Failed`; otherwise it stays in `Respawning` with
    /// `last_error` set and `attempts_used` advanced by one, so that repeated
    /// failures eventually exhaust `max_attempts`.
    ///
    /// This method emits no events and `tick` leaves `Respawning` workloads
    /// untouched, so the caller is responsible for starting the next attempt.
    /// NOTE(review): previously `attempts_used` was never advanced here, which
    /// made any `max_attempts > 1` retry forever — confirm that no caller
    /// depended on that.
    pub fn notify_respawn_failed(&mut self, workload_id: u32, reason: &str) {
        let Some(workload) = self.workloads.get_mut(&workload_id) else {
            return;
        };
        let WorkloadState::Respawning { attempts_used, .. } = &workload.state else {
            return;
        };
        let attempts_used = *attempts_used;

        let max_attempts = match workload.restart_policy {
            RestartPolicy::OnFailure { max_attempts } => max_attempts,
            RestartPolicy::Never => 0,
        };

        workload.state = if attempts_used >= max_attempts {
            WorkloadState::Failed {
                reason: format!(
                    "respawn exhausted after {} attempt(s): {}",
                    attempts_used, reason
                ),
            }
        } else {
            WorkloadState::Respawning {
                // The guard above guarantees a `Respawning` state, so the
                // helper always yields the original start time here.
                since: workload_state_since(&workload.state).unwrap_or(0),
                // Cannot overflow: `attempts_used < max_attempts <= u8::MAX`.
                attempts_used: attempts_used + 1,
                last_error: Some(reason.to_string()),
            }
        };
    }

    /// Marks `workload_id` as `Live` since `now`, whatever its previous
    /// state was; unknown ids are ignored.
    pub fn notify_respawn_succeeded(&mut self, workload_id: u32, now: u64) {
        if let Some(workload) = self.workloads.get_mut(&workload_id) {
            workload.state = WorkloadState::Live { since: now };
        }
    }
}
/// Returns the timestamp stored in `state`: `since` for the in-progress
/// states, `at` for `Evicted`, and `None` for `Failed`, which carries no
/// timestamp.
fn workload_state_since(state: &WorkloadState) -> Option<u64> {
    let timestamp = match *state {
        WorkloadState::Failed { .. } => return None,
        WorkloadState::Evicted { at } => at,
        WorkloadState::Respawning { since, .. } => since,
        WorkloadState::Provisioning { since }
        | WorkloadState::Live { since }
        | WorkloadState::Suspect { since } => since,
    };
    Some(timestamp)
}
/// Advances a single workload by one tick, appending any resulting events
/// to `events`.
///
/// Transitions, given whether the heartbeat quorum is currently alive:
/// - `Provisioning` -> `Live` when the quorum is alive.
/// - `Live` -> `Suspect` when the quorum is lost and at least `t1_secs` have
///   passed since the workload entered `Live`.
/// - `Suspect` -> `Live` when the quorum is alive again.
/// - `Suspect` -> evicted (see `evict`) when the quorum is still lost and at
///   least `t2_secs` have passed since it entered `Suspect`.
/// - `Evicted`, `Respawning` and `Failed` are left untouched.
///
/// NOTE(review): `Live::since` is the time `Live` was entered and is never
/// refreshed, so `t1_secs` measures time since going live rather than time
/// since the quorum was last seen — confirm this is intended.
fn advance(
    workload: &mut DurableWorkload,
    now: u64,
    quorum_alive: bool,
    cfg: &QuorumConfig,
    events: &mut Vec<StateMachineEvent>,
) {
    let workload_id = workload.workload_id;

    // Match on the place directly: only the `Copy` timestamps are read, so
    // there is no need to clone the whole state (and its heap strings).
    match workload.state {
        WorkloadState::Provisioning { .. } => {
            if quorum_alive {
                workload.state = WorkloadState::Live { since: now };
                events.push(StateMachineEvent::EnteredLive { workload_id });
            }
        }
        WorkloadState::Live { since } => {
            // A live workload with quorum stays exactly as it is.
            if !quorum_alive && now.saturating_sub(since) >= cfg.t1_secs {
                workload.state = WorkloadState::Suspect { since: now };
                events.push(StateMachineEvent::EnteredSuspect { workload_id });
            }
        }
        WorkloadState::Suspect { since } => {
            if quorum_alive {
                workload.state = WorkloadState::Live { since: now };
                events.push(StateMachineEvent::EnteredLive { workload_id });
            } else if now.saturating_sub(since) >= cfg.t2_secs {
                evict(workload, now, events);
            }
        }
        // These states are only left through explicit notifications.
        WorkloadState::Evicted { .. }
        | WorkloadState::Respawning { .. }
        | WorkloadState::Failed { .. } => {}
    }
}
/// Evicts `workload` at time `now` and decides the follow-up action.
///
/// Always emits `Evicted` first. Then:
/// - warm-standby workloads stay `Evicted` and a `PublishLeaseRevocation`
///   carrying the standby providers is emitted, regardless of restart policy;
/// - otherwise `OnFailure` with a non-zero budget moves the workload to
///   `Respawning` (attempt 1) and emits `AttemptRespawn`;
/// - otherwise (`Never`, or `OnFailure` with `max_attempts == 0`) the
///   workload becomes `Failed` and a `Failed` event is emitted.
fn evict(workload: &mut DurableWorkload, now: u64, events: &mut Vec<StateMachineEvent>) {
    let workload_id = workload.workload_id;

    workload.state = WorkloadState::Evicted { at: now };
    events.push(StateMachineEvent::Evicted {
        workload_id,
        reason: "heartbeat-quorum-lost-past-t2",
    });

    // Warm standby takes precedence over the restart policy.
    if let ReplicationMode::WarmStandby { standby_providers } = &workload.replication {
        events.push(StateMachineEvent::PublishLeaseRevocation {
            workload_id,
            standby_providers: standby_providers.clone(),
        });
        return;
    }

    // Either start the first respawn attempt or pick the failure reason.
    let failure_reason = match workload.restart_policy {
        RestartPolicy::OnFailure { max_attempts: 0 } => "OnFailure with max_attempts=0",
        RestartPolicy::Never => "RestartPolicy::Never on eviction",
        RestartPolicy::OnFailure { .. } => {
            let first_attempt: u8 = 1;
            workload.state = WorkloadState::Respawning {
                since: now,
                attempts_used: first_attempt,
                last_error: None,
            };
            events.push(StateMachineEvent::AttemptRespawn {
                workload_id,
                attempt: first_attempt,
            });
            return;
        }
    };

    workload.state = WorkloadState::Failed {
        reason: failure_reason.to_string(),
    };
    events.push(StateMachineEvent::Failed {
        workload_id,
        reason: failure_reason.to_string(),
    });
}