1use std::collections::HashMap;
19
20use serde::{Deserialize, Serialize};
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33#[serde(tag = "mode", rename_all = "kebab-case")]
34pub enum ReplicationMode {
35 None,
36 Checkpointed,
37 WarmStandby { standby_providers: Vec<String> },
38}
39
40impl Default for ReplicationMode {
41 fn default() -> Self {
42 Self::None
43 }
44}
45
/// What to do with a workload (without warm standbys) once it has been evicted.
///
/// Serialized with an internal `policy` tag and kebab-case variant names
/// (`never`, `on-failure`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "policy", rename_all = "kebab-case")]
pub enum RestartPolicy {
    /// Never respawn: eviction moves the workload straight to `Failed`.
    Never,
    /// Respawn after eviction, bounded by `max_attempts`. A budget of `0`
    /// is treated as an immediate failure on eviction.
    OnFailure { max_attempts: u8 },
}
53
54impl Default for RestartPolicy {
55 fn default() -> Self {
56 Self::OnFailure { max_attempts: 3 }
57 }
58}
59
/// Lifecycle state of a tracked workload.
///
/// All timestamps are on the same clock as the `now` value passed to
/// `WorkloadStateMachine::tick`; they are compared against the `*_secs`
/// thresholds of `QuorumConfig`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkloadState {
    /// Waiting for the first heartbeat quorum; becomes `Live` as soon as a
    /// tick observes quorum.
    Provisioning {
        since: u64,
    },
    /// Considered healthy. `since` is set to the tick time at which the
    /// workload entered (or re-entered) `Live`.
    Live {
        since: u64,
    },
    /// Heartbeat quorum is missing. Returns to `Live` if quorum reappears,
    /// otherwise the workload is evicted once `t2_secs` have elapsed since
    /// `since`.
    Suspect {
        since: u64,
    },
    /// The workload was evicted at `at`. Not advanced any further by `tick`.
    Evicted {
        at: u64,
    },
    /// A respawn is in progress; resolved through the
    /// `notify_respawn_failed` / `notify_respawn_succeeded` callbacks rather
    /// than by `tick`.
    Respawning {
        /// Tick time at which respawning started.
        since: u64,
        /// Respawn attempts counted so far (starts at 1 on eviction).
        attempts_used: u8,
        /// Reason reported by the most recent failed attempt, if any.
        last_error: Option<String>,
    },
    /// Terminal state with a human-readable reason.
    Failed {
        reason: String,
    },
}
95
/// A single heartbeat of a provider as seen on one relay.
#[derive(Debug, Clone)]
pub struct HeartbeatObservation {
    /// Provider the heartbeat belongs to; matched against
    /// `DurableWorkload::provider_npub` during a tick.
    pub provider_npub: String,
    /// Relay the heartbeat was observed on. Quorum counts distinct values.
    pub relay_url: String,
    /// Not read by the state machine in this file.
    // NOTE(review): presumably the local receive time — confirm with the producer of these observations.
    pub seen_at: u64,
    /// Timestamp carried by the heartbeat event itself; this is the value
    /// the staleness check (`stale_secs`) is applied to.
    pub event_timestamp: u64,
}
109
/// Thresholds that drive heartbeat-quorum evaluation and state timeouts.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct QuorumConfig {
    /// Minimum number of distinct relays that must carry a fresh heartbeat
    /// for the provider to count as alive.
    pub m: u8,
    /// Not read by the state machine in this file.
    // NOTE(review): presumably the total number of relays watched (the "n" of m-of-n) — confirm.
    pub n: u8,
    /// A `Live` workload without quorum becomes `Suspect` once at least this
    /// much time has passed since its `Live.since`.
    pub t1_secs: u64,
    /// A `Suspect` workload without quorum is evicted once at least this
    /// much time has passed since its `Suspect.since`.
    pub t2_secs: u64,
    /// Heartbeats whose `event_timestamp` is more than this far behind the
    /// tick time are ignored.
    pub stale_secs: u64,
}
125
126impl Default for QuorumConfig {
127 fn default() -> Self {
128 Self {
129 m: 2,
130 n: 3,
131 t1_secs: 120,
132 t2_secs: 300,
133 stale_secs: 180,
134 }
135 }
136}
137
/// A workload tracked by `WorkloadStateMachine`.
#[derive(Debug, Clone)]
pub struct DurableWorkload {
    /// Key under which the state machine stores this workload.
    pub workload_id: u32,
    /// Provider whose heartbeats determine this workload's liveness.
    pub provider_npub: String,
    /// Current lifecycle state; mutated by ticks and respawn notifications.
    pub state: WorkloadState,
    /// Decides whether eviction publishes a lease revocation to standbys.
    pub replication: ReplicationMode,
    /// Decides between respawning and failing after eviction.
    pub restart_policy: RestartPolicy,
    /// Not read by the state machine in this file.
    // NOTE(review): presumably the location of persisted workload state — confirm.
    pub state_uri: Option<String>,
    /// Not read by the state machine in this file.
    pub created_at: u64,
    /// Not read by the state machine in this file.
    // NOTE(review): expiry is not enforced by `tick` — confirm it is handled elsewhere.
    pub expires_at: u64,
}
151
/// Side effects requested by the state machine; returned from
/// `WorkloadStateMachine::tick` for the caller to act on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StateMachineEvent {
    /// The workload moved from `Provisioning` or `Suspect` to `Live`.
    EnteredLive {
        workload_id: u32,
    },
    /// The workload moved from `Live` to `Suspect`.
    EnteredSuspect {
        workload_id: u32,
    },
    /// The workload was evicted; `reason` is a fixed machine-readable tag.
    Evicted {
        workload_id: u32,
        reason: &'static str,
    },
    /// Emitted right after `Evicted` for warm-standby workloads, carrying a
    /// copy of the configured standby providers.
    PublishLeaseRevocation {
        workload_id: u32,
        standby_providers: Vec<String>,
    },
    /// Emitted right after `Evicted` when the restart policy allows a
    /// respawn; `attempt` is the 1-based attempt number.
    AttemptRespawn {
        workload_id: u32,
        attempt: u8,
    },
    /// Emitted right after `Evicted` when the workload cannot be respawned.
    Failed {
        workload_id: u32,
        reason: String,
    },
}
186
/// Tracks workloads and advances their lifecycle state from heartbeat
/// observations.
pub struct WorkloadStateMachine {
    /// Quorum and timeout thresholds applied to every tracked workload.
    config: QuorumConfig,
    /// Tracked workloads keyed by `DurableWorkload::workload_id`.
    workloads: HashMap<u32, DurableWorkload>,
}
193
194impl WorkloadStateMachine {
195 pub fn new(config: QuorumConfig) -> Self {
196 Self {
197 config,
198 workloads: HashMap::new(),
199 }
200 }
201
202 pub fn track(&mut self, workload: DurableWorkload) {
203 self.workloads.insert(workload.workload_id, workload);
204 }
205
206 pub fn untrack(&mut self, workload_id: u32) {
207 self.workloads.remove(&workload_id);
208 }
209
210 pub fn state_of(&self, workload_id: u32) -> Option<&WorkloadState> {
211 self.workloads.get(&workload_id).map(|w| &w.state)
212 }
213
214 pub fn workload(&self, workload_id: u32) -> Option<&DurableWorkload> {
215 self.workloads.get(&workload_id)
216 }
217
218 pub fn tick(
221 &mut self,
222 now: u64,
223 observations: &[HeartbeatObservation],
224 ) -> Vec<StateMachineEvent> {
225 let mut events = Vec::new();
226 let cfg = self.config;
227
228 for workload in self.workloads.values_mut() {
229 let mut live_relays = std::collections::HashSet::new();
235 for obs in observations {
236 if obs.provider_npub != workload.provider_npub {
237 continue;
238 }
239 if obs.event_timestamp + cfg.stale_secs < now {
240 continue;
241 }
242 live_relays.insert(obs.relay_url.clone());
243 }
244 let quorum_alive = live_relays.len() as u8 >= cfg.m;
245
246 advance(workload, now, quorum_alive, &cfg, &mut events);
247 }
248
249 events
250 }
251
252 pub fn notify_respawn_failed(&mut self, workload_id: u32, reason: &str) {
256 let Some(workload) = self.workloads.get_mut(&workload_id) else {
257 return;
258 };
259 let WorkloadState::Respawning {
260 since: _,
261 attempts_used,
262 last_error: _,
263 } = &workload.state
264 else {
265 return;
266 };
267 let attempts_used = *attempts_used;
268
269 let max = match workload.restart_policy {
270 RestartPolicy::OnFailure { max_attempts } => max_attempts,
271 RestartPolicy::Never => 0,
272 };
273
274 if attempts_used >= max {
275 workload.state = WorkloadState::Failed {
276 reason: format!(
277 "respawn exhausted after {} attempt(s): {}",
278 attempts_used, reason
279 ),
280 };
281 } else {
282 workload.state = WorkloadState::Respawning {
285 since: workload_state_since(&workload.state).unwrap_or(0),
286 attempts_used,
287 last_error: Some(reason.to_string()),
288 };
289 }
290 }
291
292 pub fn notify_respawn_succeeded(&mut self, workload_id: u32, now: u64) {
296 if let Some(workload) = self.workloads.get_mut(&workload_id) {
297 workload.state = WorkloadState::Live { since: now };
298 }
299 }
300}
301
302fn workload_state_since(state: &WorkloadState) -> Option<u64> {
303 match state {
304 WorkloadState::Provisioning { since }
305 | WorkloadState::Live { since }
306 | WorkloadState::Suspect { since }
307 | WorkloadState::Respawning { since, .. } => Some(*since),
308 WorkloadState::Evicted { at } => Some(*at),
309 WorkloadState::Failed { .. } => None,
310 }
311}
312
313fn advance(
314 workload: &mut DurableWorkload,
315 now: u64,
316 quorum_alive: bool,
317 cfg: &QuorumConfig,
318 events: &mut Vec<StateMachineEvent>,
319) {
320 match workload.state.clone() {
321 WorkloadState::Provisioning { .. } => {
322 if quorum_alive {
323 workload.state = WorkloadState::Live { since: now };
324 events.push(StateMachineEvent::EnteredLive {
325 workload_id: workload.workload_id,
326 });
327 }
328 }
329 WorkloadState::Live { since } => {
330 if quorum_alive {
331 workload.state = WorkloadState::Live { since };
333 } else if now.saturating_sub(since) >= cfg.t1_secs {
334 workload.state = WorkloadState::Suspect { since: now };
335 events.push(StateMachineEvent::EnteredSuspect {
336 workload_id: workload.workload_id,
337 });
338 }
339 }
340 WorkloadState::Suspect { since } => {
341 if quorum_alive {
342 workload.state = WorkloadState::Live { since: now };
343 events.push(StateMachineEvent::EnteredLive {
344 workload_id: workload.workload_id,
345 });
346 } else if now.saturating_sub(since) >= cfg.t2_secs {
347 evict(workload, now, events);
348 }
349 }
350 WorkloadState::Evicted { .. }
351 | WorkloadState::Respawning { .. }
352 | WorkloadState::Failed { .. } => {
353 }
358 }
359}
360
361fn evict(workload: &mut DurableWorkload, now: u64, events: &mut Vec<StateMachineEvent>) {
362 workload.state = WorkloadState::Evicted { at: now };
363 events.push(StateMachineEvent::Evicted {
364 workload_id: workload.workload_id,
365 reason: "heartbeat-quorum-lost-past-t2",
366 });
367
368 match (&workload.replication, workload.restart_policy) {
369 (ReplicationMode::WarmStandby { standby_providers }, _) => {
370 events.push(StateMachineEvent::PublishLeaseRevocation {
376 workload_id: workload.workload_id,
377 standby_providers: standby_providers.clone(),
378 });
379 }
380 (
381 ReplicationMode::None | ReplicationMode::Checkpointed,
382 RestartPolicy::OnFailure { max_attempts },
383 ) => {
384 if max_attempts == 0 {
385 workload.state = WorkloadState::Failed {
386 reason: "OnFailure with max_attempts=0".to_string(),
387 };
388 events.push(StateMachineEvent::Failed {
389 workload_id: workload.workload_id,
390 reason: "OnFailure with max_attempts=0".to_string(),
391 });
392 } else {
393 let attempt = 1u8;
394 workload.state = WorkloadState::Respawning {
395 since: now,
396 attempts_used: attempt,
397 last_error: None,
398 };
399 events.push(StateMachineEvent::AttemptRespawn {
400 workload_id: workload.workload_id,
401 attempt,
402 });
403 }
404 }
405 (ReplicationMode::None | ReplicationMode::Checkpointed, RestartPolicy::Never) => {
406 workload.state = WorkloadState::Failed {
407 reason: "RestartPolicy::Never on eviction".to_string(),
408 };
409 events.push(StateMachineEvent::Failed {
410 workload_id: workload.workload_id,
411 reason: "RestartPolicy::Never on eviction".to_string(),
412 });
413 }
414 }
415}