Skip to main content

paygress/
durable_workload.rs

// Durable Workload abstraction (Unit 5 of the 12-month plan,
// docs/plans/2026-04-26-001-feat-paygress-12mo-vision-plan.md).
//
// A workload is described by a structured manifest; the provider
// tracks each workload via an explicit state machine driven by
// heartbeats observed on M-of-N Nostr relays.
//
// Single-writer invariant
// -----------------------
// A workload's state machine emits `PublishLeaseRevocation` only
// after its own local state has left `Live`. The standby cannot
// promote until it observes the revocation, so across all providers
// tracking the same workload at most one is in the `Live` state at
// any time. The proptest in `tests/durable_workload.rs` checks
// the local half of this invariant; cross-provider integration is
// out of scope here.
17
18use std::collections::HashMap;
19
20use serde::{Deserialize, Serialize};
21
22/// Replication / availability mode chosen by the consumer at spawn.
23///
24/// `None` is cheapest: one container, no checkpoint, no failover.
25/// `WarmStandby` registers a list of standby providers; on
26/// eviction the state machine emits `PublishLeaseRevocation` so the
27/// caller can hand off the lease.
28///
29/// `Checkpointed` (without warm-standby) is reserved for Unit 6 —
30/// the consumer-side SDK will respawn from the latest Blossom
31/// checkpoint on a fresh provider.
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
33#[serde(tag = "mode", rename_all = "kebab-case")]
34pub enum ReplicationMode {
35    None,
36    Checkpointed,
37    WarmStandby { standby_providers: Vec<String> },
38}
39
40impl Default for ReplicationMode {
41    fn default() -> Self {
42        Self::None
43    }
44}
45
46/// What to do when a workload exits unexpectedly.
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(tag = "policy", rename_all = "kebab-case")]
49pub enum RestartPolicy {
50    Never,
51    OnFailure { max_attempts: u8 },
52}
53
54impl Default for RestartPolicy {
55    fn default() -> Self {
56        Self::OnFailure { max_attempts: 3 }
57    }
58}
59
/// Where a workload currently sits in its lifecycle, as seen by one
/// provider.
///
/// The full Mermaid diagram lives in the 12-month plan. In short:
///
/// * `Provisioning -> Live -> Suspect -> {Live, Evicted}`
/// * on eviction, replication mode and restart policy decide whether
///   the workload stays `Evicted`, moves to `Respawning`, or ends up
///   `Failed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkloadState {
    /// Tracked but heartbeat quorum has not been observed yet.
    Provisioning {
        since: u64,
    },
    /// Heartbeat quorum observed; the workload is considered running.
    Live {
        since: u64,
    },
    /// Silent on too many relays. This is the debounce window that
    /// precedes eviction.
    Suspect {
        since: u64,
    },
    /// Heartbeats stayed absent past T2; this provider has forfeited
    /// the lease.
    Evicted {
        at: u64,
    },
    /// A local restart is in progress. Entered on eviction when the
    /// policy is `RestartPolicy::OnFailure` with a non-zero budget and
    /// replication is `None` or `Checkpointed`.
    Respawning {
        since: u64,
        attempts_used: u8,
        last_error: Option<String>,
    },
    /// Terminal: the lease is dead and will not come back on this
    /// provider.
    Failed {
        reason: String,
    },
}
95
/// One heartbeat event received from the relay pool.
///
/// Two timestamps are carried: `seen_at` is the local clock reading
/// when the event arrived (intended for `t1/t2` timing), while
/// `event_timestamp` is the creation time the heartbeat itself
/// claims. A heartbeat whose `event_timestamp` is more than
/// `stale_secs` in the past is ignored, which defeats
/// replay-on-relay.
#[derive(Debug, Clone)]
pub struct HeartbeatObservation {
    /// Provider the heartbeat belongs to.
    pub provider_npub: String,
    /// Relay the heartbeat was observed on.
    pub relay_url: String,
    /// Local receive time.
    pub seen_at: u64,
    /// Creation time claimed by the event.
    pub event_timestamp: u64,
}
109
110/// Operator-tunable timing + quorum knobs.
111#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
112pub struct QuorumConfig {
113    /// Required count of live relays (M).
114    pub m: u8,
115    /// Total relay set size (N). Used for symmetry; quorum logic
116    /// only requires that we observe `m` distinct live relays.
117    pub n: u8,
118    /// Live → Suspect after this many seconds with quorum lost.
119    pub t1_secs: u64,
120    /// Suspect → Evicted after this many seconds without recovery.
121    pub t2_secs: u64,
122    /// Heartbeats older than this on the wire don't count.
123    pub stale_secs: u64,
124}
125
126impl Default for QuorumConfig {
127    fn default() -> Self {
128        Self {
129            m: 2,
130            n: 3,
131            t1_secs: 120,
132            t2_secs: 300,
133            stale_secs: 180,
134        }
135    }
136}
137
138/// One workload as tracked by a provider.
139#[derive(Debug, Clone)]
140pub struct DurableWorkload {
141    pub workload_id: u32,
142    pub provider_npub: String,
143    pub state: WorkloadState,
144    pub replication: ReplicationMode,
145    pub restart_policy: RestartPolicy,
146    /// Optional Blossom URI of the latest checkpoint (Unit 6).
147    pub state_uri: Option<String>,
148    pub created_at: u64,
149    pub expires_at: u64,
150}
151
/// Work the state machine hands back to its controller.
///
/// The state machine performs no I/O itself. It only returns these
/// events, and the caller (typically the fourth concurrent loop of
/// `ProviderService::run`) turns them into Nostr publishes or
/// backend respawns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StateMachineEvent {
    /// The workload has just become `Live`.
    EnteredLive {
        workload_id: u32,
    },
    /// The workload has just become `Suspect`.
    EnteredSuspect {
        workload_id: u32,
    },
    /// The workload was evicted; `reason` is a fixed machine-readable
    /// tag.
    Evicted {
        workload_id: u32,
        reason: &'static str,
    },
    /// Local state is no longer `Live`. The controller should publish
    /// a `LeaseRevocation` Nostr event addressed to the listed
    /// standby providers so that exactly one of them can promote.
    PublishLeaseRevocation {
        workload_id: u32,
        standby_providers: Vec<String>,
    },
    /// The controller should try a local respawn (no warm standby,
    /// `OnFailure` policy). A failed attempt is reported back through
    /// `notify_respawn_failed`.
    AttemptRespawn {
        workload_id: u32,
        attempt: u8,
    },
    /// The workload reached the terminal `Failed` state.
    Failed {
        workload_id: u32,
        reason: String,
    },
}
186
187/// The state machine. Holds a map of tracked workloads keyed by
188/// `workload_id`. Drives transitions on each `tick`.
189pub struct WorkloadStateMachine {
190    config: QuorumConfig,
191    workloads: HashMap<u32, DurableWorkload>,
192}
193
194impl WorkloadStateMachine {
195    pub fn new(config: QuorumConfig) -> Self {
196        Self {
197            config,
198            workloads: HashMap::new(),
199        }
200    }
201
202    pub fn track(&mut self, workload: DurableWorkload) {
203        self.workloads.insert(workload.workload_id, workload);
204    }
205
206    pub fn untrack(&mut self, workload_id: u32) {
207        self.workloads.remove(&workload_id);
208    }
209
210    pub fn state_of(&self, workload_id: u32) -> Option<&WorkloadState> {
211        self.workloads.get(&workload_id).map(|w| &w.state)
212    }
213
214    pub fn workload(&self, workload_id: u32) -> Option<&DurableWorkload> {
215        self.workloads.get(&workload_id)
216    }
217
218    /// Apply observations and advance every tracked workload.
219    /// Returns the side-effects the controller must perform.
220    pub fn tick(
221        &mut self,
222        now: u64,
223        observations: &[HeartbeatObservation],
224    ) -> Vec<StateMachineEvent> {
225        let mut events = Vec::new();
226        let cfg = self.config;
227
228        for workload in self.workloads.values_mut() {
229            // Live relays = distinct relays where this provider was
230            // observed within stale_secs. We trust `event_timestamp`
231            // because the relay round-trip already authenticated it
232            // (signed event); replay-on-relay is defeated by
233            // staleness.
234            let mut live_relays = std::collections::HashSet::new();
235            for obs in observations {
236                if obs.provider_npub != workload.provider_npub {
237                    continue;
238                }
239                if obs.event_timestamp + cfg.stale_secs < now {
240                    continue;
241                }
242                live_relays.insert(obs.relay_url.clone());
243            }
244            let quorum_alive = live_relays.len() as u8 >= cfg.m;
245
246            advance(workload, now, quorum_alive, &cfg, &mut events);
247        }
248
249        events
250    }
251
252    /// Controller reports that a respawn attempt for a workload
253    /// failed. The state machine either retries (if attempts
254    /// remain) or marks the workload `Failed`.
255    pub fn notify_respawn_failed(&mut self, workload_id: u32, reason: &str) {
256        let Some(workload) = self.workloads.get_mut(&workload_id) else {
257            return;
258        };
259        let WorkloadState::Respawning {
260            since: _,
261            attempts_used,
262            last_error: _,
263        } = &workload.state
264        else {
265            return;
266        };
267        let attempts_used = *attempts_used;
268
269        let max = match workload.restart_policy {
270            RestartPolicy::OnFailure { max_attempts } => max_attempts,
271            RestartPolicy::Never => 0,
272        };
273
274        if attempts_used >= max {
275            workload.state = WorkloadState::Failed {
276                reason: format!(
277                    "respawn exhausted after {} attempt(s): {}",
278                    attempts_used, reason
279                ),
280            };
281        } else {
282            // Hold in Respawning; the controller can re-attempt on
283            // its own cadence. Record the error for diagnostics.
284            workload.state = WorkloadState::Respawning {
285                since: workload_state_since(&workload.state).unwrap_or(0),
286                attempts_used,
287                last_error: Some(reason.to_string()),
288            };
289        }
290    }
291
292    /// Controller reports that a respawn attempt succeeded. State
293    /// transitions back to Live (heartbeats from the new container
294    /// will keep it there).
295    pub fn notify_respawn_succeeded(&mut self, workload_id: u32, now: u64) {
296        if let Some(workload) = self.workloads.get_mut(&workload_id) {
297            workload.state = WorkloadState::Live { since: now };
298        }
299    }
300}
301
302fn workload_state_since(state: &WorkloadState) -> Option<u64> {
303    match state {
304        WorkloadState::Provisioning { since }
305        | WorkloadState::Live { since }
306        | WorkloadState::Suspect { since }
307        | WorkloadState::Respawning { since, .. } => Some(*since),
308        WorkloadState::Evicted { at } => Some(*at),
309        WorkloadState::Failed { .. } => None,
310    }
311}
312
313fn advance(
314    workload: &mut DurableWorkload,
315    now: u64,
316    quorum_alive: bool,
317    cfg: &QuorumConfig,
318    events: &mut Vec<StateMachineEvent>,
319) {
320    match workload.state.clone() {
321        WorkloadState::Provisioning { .. } => {
322            if quorum_alive {
323                workload.state = WorkloadState::Live { since: now };
324                events.push(StateMachineEvent::EnteredLive {
325                    workload_id: workload.workload_id,
326                });
327            }
328        }
329        WorkloadState::Live { since } => {
330            if quorum_alive {
331                // refresh
332                workload.state = WorkloadState::Live { since };
333            } else if now.saturating_sub(since) >= cfg.t1_secs {
334                workload.state = WorkloadState::Suspect { since: now };
335                events.push(StateMachineEvent::EnteredSuspect {
336                    workload_id: workload.workload_id,
337                });
338            }
339        }
340        WorkloadState::Suspect { since } => {
341            if quorum_alive {
342                workload.state = WorkloadState::Live { since: now };
343                events.push(StateMachineEvent::EnteredLive {
344                    workload_id: workload.workload_id,
345                });
346            } else if now.saturating_sub(since) >= cfg.t2_secs {
347                evict(workload, now, events);
348            }
349        }
350        WorkloadState::Evicted { .. }
351        | WorkloadState::Respawning { .. }
352        | WorkloadState::Failed { .. } => {
353            // Terminal-ish: the controller drives transitions out
354            // of these via notify_respawn_succeeded / failed. We
355            // don't auto-recover from quorum because the original
356            // container is gone.
357        }
358    }
359}
360
361fn evict(workload: &mut DurableWorkload, now: u64, events: &mut Vec<StateMachineEvent>) {
362    workload.state = WorkloadState::Evicted { at: now };
363    events.push(StateMachineEvent::Evicted {
364        workload_id: workload.workload_id,
365        reason: "heartbeat-quorum-lost-past-t2",
366    });
367
368    match (&workload.replication, workload.restart_policy) {
369        (ReplicationMode::WarmStandby { standby_providers }, _) => {
370            // Single-writer invariant: emit revocation only AFTER
371            // the local state has left Live (we just set it to
372            // Evicted above). Standby cannot promote until it
373            // observes this event; in the local state machine
374            // we stay in Evicted.
375            events.push(StateMachineEvent::PublishLeaseRevocation {
376                workload_id: workload.workload_id,
377                standby_providers: standby_providers.clone(),
378            });
379        }
380        (
381            ReplicationMode::None | ReplicationMode::Checkpointed,
382            RestartPolicy::OnFailure { max_attempts },
383        ) => {
384            if max_attempts == 0 {
385                workload.state = WorkloadState::Failed {
386                    reason: "OnFailure with max_attempts=0".to_string(),
387                };
388                events.push(StateMachineEvent::Failed {
389                    workload_id: workload.workload_id,
390                    reason: "OnFailure with max_attempts=0".to_string(),
391                });
392            } else {
393                let attempt = 1u8;
394                workload.state = WorkloadState::Respawning {
395                    since: now,
396                    attempts_used: attempt,
397                    last_error: None,
398                };
399                events.push(StateMachineEvent::AttemptRespawn {
400                    workload_id: workload.workload_id,
401                    attempt,
402                });
403            }
404        }
405        (ReplicationMode::None | ReplicationMode::Checkpointed, RestartPolicy::Never) => {
406            workload.state = WorkloadState::Failed {
407                reason: "RestartPolicy::Never on eviction".to_string(),
408            };
409            events.push(StateMachineEvent::Failed {
410                workload_id: workload.workload_id,
411                reason: "RestartPolicy::Never on eviction".to_string(),
412            });
413        }
414    }
415}