Skip to main content

moonpool_sim/runner/
process.rs

1//! Process trait and reboot types for simulation testing.
2//!
3//! Processes represent the **system under test** — server nodes that can be
4//! killed and restarted (rebooted). Each process gets fresh in-memory state
5//! on every boot; persistence is only through storage.
6//!
7//! This is separate from [`Workload`](super::workload::Workload), which
8//! represents the **test driver** that survives server reboots.
9//!
10//! # Usage
11//!
12//! ```ignore
13//! use moonpool_sim::{Process, SimContext, SimulationResult};
14//!
15//! struct PaxosNode;
16//!
17//! #[async_trait(?Send)]
18//! impl Process for PaxosNode {
19//!     fn name(&self) -> &str { "paxos" }
20//!     async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()> {
21//!         let role = ctx.topology().my_tags().get("role")
22//!             .ok_or_else(|| moonpool_sim::SimulationError::InvalidState("missing role tag".into()))?;
23//!         // Run based on assigned role from tags...
24//!         Ok(())
25//!     }
26//! }
27//! ```
28
29use std::ops::Range;
30
31use async_trait::async_trait;
32
33use crate::SimulationResult;
34
35use super::context::SimContext;
36
/// A process that participates in simulation as part of the system under test.
///
/// Processes are the primary unit of server behavior. A fresh instance is created
/// from the factory on every boot (first boot and every reboot), so state only
/// persists through storage, never through in-memory fields.
///
/// The process reads its tags and index from [`SimContext`] to determine its role.
///
/// The trait is declared with `#[async_trait(?Send)]`. Implementations must carry
/// the same attribute (see the module-level example), and the futures produced
/// by `run` are not required to be `Send`. The `'static` supertrait bound means
/// an implementor may not hold non-`'static` borrows.
#[async_trait(?Send)]
pub trait Process: 'static {
    /// Name of this process type, used for reporting.
    fn name(&self) -> &str;

    /// Run the process. Called on each boot (first boot and every reboot).
    ///
    /// The [`SimContext`] has fresh providers each boot. The process should
    /// bind listeners, establish connections, and run its main loop.
    ///
    /// Returns when the process exits voluntarily, or gets cancelled on reboot.
    /// An `Err` is handed back to the caller through [`SimulationResult`]; how
    /// the runner reacts to it is decided outside this module.
    async fn run(&mut self, ctx: &SimContext) -> SimulationResult<()>;
}
57
/// How a process is taken down before it is restarted.
///
/// The variants are listed from least to most destructive.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RebootKind {
    /// Cooperative restart: fire the shutdown token, allow a grace period, then restart.
    ///
    /// The process observes its `ctx.shutdown()` token firing and gets a grace
    /// period to wind down. Send buffers keep draining during that window, so
    /// peers receive FIN. A process still running when the grace period ends is
    /// force-cancelled.
    Graceful,

    /// Abrupt kill: the task is cancelled and every connection aborts at once.
    ///
    /// Send buffers are not drained, so peers observe connection reset errors.
    /// Unsynced storage data may be lost (once storage is scoped per IP).
    Crash,

    /// Abrupt kill that additionally erases all of the process's storage.
    ///
    /// Identical to [`Crash`](RebootKind::Crash) plus deletion of all
    /// persistent storage, modelling total data loss or a brand-new node
    /// joining the cluster.
    ///
    /// **Note**: the storage wipe is not implemented yet because storage is not
    /// scoped per IP. Until then this variant behaves exactly like `Crash`.
    CrashAndWipe,
}
83
/// Settings for the built-in attrition injector, which reboots server
/// processes at random.
///
/// This is the ready-made chaos source used during the chaos phase. For a
/// bespoke fault-injection strategy, implement
/// [`FaultInjector`](super::fault_injector::FaultInjector) instead.
///
/// # Probabilities
///
/// `prob_graceful`, `prob_crash` and `prob_wipe` are relative weights rather
/// than true probabilities. They are normalized against their sum, so they do
/// not have to add up to 1.0, but every weight must be non-negative.
///
/// # Example
///
/// ```ignore
/// Attrition {
///     max_dead: 1,
///     prob_graceful: 0.3,
///     prob_crash: 0.5,
///     prob_wipe: 0.2,
///     recovery_delay_ms: None,
///     grace_period_ms: None,
/// }
/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct Attrition {
    /// Upper bound on how many processes may be dead at the same time.
    ///
    /// While this many processes are down (killed and not yet restarted), the
    /// attrition injector refrains from killing another one.
    pub max_dead: usize,

    /// Relative weight of [`RebootKind::Graceful`] reboots.
    pub prob_graceful: f64,

    /// Relative weight of [`RebootKind::Crash`] reboots.
    pub prob_crash: f64,

    /// Relative weight of [`RebootKind::CrashAndWipe`] reboots.
    pub prob_wipe: f64,

    /// Range, in milliseconds, from which the restart delay is drawn.
    ///
    /// Once a process has been killed (by a crash, or by a force-kill after its
    /// grace period), it comes back after a seeded random delay taken from this
    /// range.
    ///
    /// When `None`, `1000..10000` (1-10 seconds) is used.
    pub recovery_delay_ms: Option<Range<usize>>,

    /// Range, in milliseconds, from which the grace period of a graceful
    /// reboot is drawn.
    ///
    /// After the per-process shutdown token is cancelled, the process has a
    /// seeded random duration from this range to clean up before it is
    /// force-killed.
    ///
    /// When `None`, `2000..5000` (2-5 seconds) is used.
    pub grace_period_ms: Option<Range<usize>>,
}
141
142impl Attrition {
143    /// Choose a [`RebootKind`] based on the configured probabilities using the
144    /// given random value in `[0.0, 1.0)`.
145    pub(crate) fn choose_kind(&self, rand_val: f64) -> RebootKind {
146        let total = self.prob_graceful + self.prob_crash + self.prob_wipe;
147        if total <= 0.0 {
148            return RebootKind::Crash;
149        }
150
151        let normalized = rand_val * total;
152        if normalized < self.prob_graceful {
153            RebootKind::Graceful
154        } else if normalized < self.prob_graceful + self.prob_crash {
155            RebootKind::Crash
156        } else {
157            RebootKind::CrashAndWipe
158        }
159    }
160}