Skip to main content

dev_stress/
soak.rs

//! Soak testing: run a workload for a sustained duration and capture
//! per-window ops/sec, whole-run latency, and degradation across
//! checkpoint windows.
//!
//! Where [`StressRun`](crate::StressRun) is iteration-bounded, [`SoakRun`]
//! is duration-bounded. It runs for `total_duration`, recording one
//! [`SoakCheckpoint`] every `checkpoint_interval`. Comparing
//! checkpoints surfaces drift the stress aggregate would smooth over.

9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use dev_report::{CheckResult, Evidence, Severity};
14
15use crate::{LatencyStats, LatencyTracker, Workload};
16
/// Configuration for a soak run.
///
/// Built with [`SoakRun::new`] and refined through the chained setters.
/// The value is plain data, so it can be cloned to run the same
/// configuration against several workloads and printed with `{:?}` when
/// diagnosing a run.
///
/// # Example
///
/// ```
/// use dev_stress::SoakRun;
/// use std::time::Duration;
///
/// let run = SoakRun::new("steady_state")
///     .duration(Duration::from_millis(500))
///     .checkpoint(Duration::from_millis(100))
///     .threads(2);
/// ```
#[derive(Debug, Clone)]
pub struct SoakRun {
    /// Stable name of the run; copied into the resulting `SoakResult`.
    name: String,
    /// Total wall-clock time the driver keeps the workers running.
    total_duration: Duration,
    /// Wall-clock spacing between throughput checkpoints.
    checkpoint_interval: Duration,
    /// Number of worker threads to spawn (the setter keeps this >= 1).
    threads: usize,
    /// `Some(rate)` enables latency tracking at that sampling rate;
    /// `None` disables it.
    track_latency: Option<usize>,
}
37
38impl SoakRun {
39    /// Begin building a soak run with a stable name.
40    pub fn new(name: impl Into<String>) -> Self {
41        Self {
42            name: name.into(),
43            total_duration: Duration::from_secs(60),
44            checkpoint_interval: Duration::from_secs(10),
45            threads: 1,
46            track_latency: None,
47        }
48    }
49
50    /// Total wall-clock duration the soak runs for.
51    pub fn duration(mut self, d: Duration) -> Self {
52        self.total_duration = d;
53        self
54    }
55
56    /// Wall-clock interval between checkpoints.
57    pub fn checkpoint(mut self, d: Duration) -> Self {
58        self.checkpoint_interval = d;
59        self
60    }
61
62    /// Number of OS threads. Minimum is `1`.
63    pub fn threads(mut self, n: usize) -> Self {
64        self.threads = n.max(1);
65        self
66    }
67
68    /// Track per-operation latency, sampling 1 of every `rate` iterations.
69    pub fn track_latency(mut self, rate: usize) -> Self {
70        self.track_latency = Some(rate.max(1));
71        self
72    }
73
74    /// Execute the soak run.
75    ///
76    /// Returns when `total_duration` has elapsed. Threads observe a
77    /// shared `stop` flag and finish their current iteration before
78    /// joining.
79    pub fn execute<W>(&self, workload: &W) -> SoakResult
80    where
81        W: Workload + Clone + 'static,
82    {
83        let stop = Arc::new(AtomicBool::new(false));
84        let total_iters = Arc::new(AtomicUsize::new(0));
85        let workload = Arc::new(workload.clone());
86        let started = Instant::now();
87
88        // Worker threads.
89        let mut handles = Vec::with_capacity(self.threads);
90        for _ in 0..self.threads {
91            let w = workload.clone();
92            let stop = stop.clone();
93            let total = total_iters.clone();
94            let track = self.track_latency;
95            handles.push(std::thread::spawn(move || {
96                let start = Instant::now();
97                let mut tracker = track.map(LatencyTracker::new);
98                let mut local_count: usize = 0;
99                while !stop.load(Ordering::Relaxed) {
100                    if let Some(t) = tracker.as_mut() {
101                        t.record(local_count, || w.run_once());
102                    } else {
103                        w.run_once();
104                    }
105                    local_count = local_count.wrapping_add(1);
106                    // Periodic flush to the shared counter.
107                    if local_count % 1024 == 0 {
108                        total.fetch_add(1024, Ordering::Relaxed);
109                    }
110                }
111                // Flush remainder.
112                let remainder = local_count % 1024;
113                if remainder != 0 {
114                    total.fetch_add(remainder, Ordering::Relaxed);
115                }
116                (start.elapsed(), tracker)
117            }));
118        }
119
120        // Driver thread: every checkpoint_interval, snapshot the
121        // running counter and any latency stats from a separate sample
122        // pool we maintain here. Latency in checkpoints is approximate:
123        // we only know the cumulative latency at finish.
124        let mut checkpoints: Vec<SoakCheckpoint> = Vec::new();
125        let mut last_iters = 0usize;
126        let mut last_at = started;
127        let end_at = started + self.total_duration;
128        loop {
129            let now = Instant::now();
130            if now >= end_at {
131                break;
132            }
133            let next = (last_at + self.checkpoint_interval).min(end_at);
134            let sleep_for = next.saturating_duration_since(now);
135            std::thread::sleep(sleep_for);
136            let now_iters = total_iters.load(Ordering::Relaxed);
137            let window_iters = now_iters - last_iters;
138            let window_dur = next - last_at;
139            let ops_per_sec = if window_dur.is_zero() {
140                0.0
141            } else {
142                window_iters as f64 / window_dur.as_secs_f64()
143            };
144            checkpoints.push(SoakCheckpoint {
145                at_offset: next - started,
146                window_iters,
147                window_duration: window_dur,
148                ops_per_sec,
149            });
150            last_iters = now_iters;
151            last_at = next;
152        }
153        stop.store(true, Ordering::Relaxed);
154
155        let mut thread_times = Vec::with_capacity(self.threads);
156        let mut latency_samples: Vec<Duration> = Vec::new();
157        for h in handles {
158            let (elapsed, tracker) = h.join().unwrap();
159            thread_times.push(elapsed);
160            if let Some(t) = tracker {
161                latency_samples.extend(t.into_samples());
162            }
163        }
164        let total_elapsed = started.elapsed();
165        let total_iters_final = total_iters.load(Ordering::Relaxed);
166
167        SoakResult {
168            name: self.name.clone(),
169            iterations: total_iters_final,
170            threads: self.threads,
171            total_elapsed,
172            thread_times,
173            latency: if latency_samples.is_empty() {
174                None
175            } else {
176                Some(LatencyStats::from_samples(latency_samples))
177            },
178            checkpoints,
179        }
180    }
181}
182
/// Throughput snapshot for a single window of a soak run.
///
/// A [`SoakResult`] holds one of these per checkpoint, in the order they
/// were recorded.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct SoakCheckpoint {
    /// Time from the start of the run to the end of this window.
    pub at_offset: Duration,
    /// Number of iterations, summed over all threads, attributed to
    /// this window.
    pub window_iters: usize,
    /// Wall-clock length of this window.
    pub window_duration: Duration,
    /// Operations per second achieved within this window.
    pub ops_per_sec: f64,
}
195
196/// Result of a soak run.
197///
198/// # Example
199///
200/// ```no_run
201/// use dev_stress::{SoakRun, Workload};
202/// use std::time::Duration;
203///
204/// #[derive(Clone)]
205/// struct Noop;
206/// impl Workload for Noop { fn run_once(&self) {} }
207///
208/// let r = SoakRun::new("steady")
209///     .duration(Duration::from_millis(50))
210///     .checkpoint(Duration::from_millis(10))
211///     .threads(1)
212///     .execute(&Noop);
213/// assert!(!r.checkpoints.is_empty());
214/// ```
215#[derive(Debug, Clone)]
216pub struct SoakResult {
217    /// Stable name of the run.
218    pub name: String,
219    /// Total iterations across all threads.
220    pub iterations: usize,
221    /// Threads used.
222    pub threads: usize,
223    /// Wall-clock duration of the soak.
224    pub total_elapsed: Duration,
225    /// Per-thread elapsed times.
226    pub thread_times: Vec<Duration>,
227    /// Aggregate latency stats across the whole run.
228    pub latency: Option<LatencyStats>,
229    /// Per-window checkpoints captured during the soak.
230    pub checkpoints: Vec<SoakCheckpoint>,
231}
232
233impl SoakResult {
234    /// Effective throughput in operations per second across the whole soak.
235    pub fn ops_per_sec(&self) -> f64 {
236        if self.total_elapsed.is_zero() {
237            return 0.0;
238        }
239        self.iterations as f64 / self.total_elapsed.as_secs_f64()
240    }
241
242    /// Coefficient of variation of `ops_per_sec` across checkpoints.
243    ///
244    /// High values indicate the workload is degrading or fluctuating
245    /// over time; low values indicate steady state.
246    pub fn checkpoint_ops_cv(&self) -> f64 {
247        if self.checkpoints.len() < 2 {
248            return 0.0;
249        }
250        let n = self.checkpoints.len() as f64;
251        let mean: f64 = self.checkpoints.iter().map(|c| c.ops_per_sec).sum::<f64>() / n;
252        if mean == 0.0 {
253            return 0.0;
254        }
255        let var = self
256            .checkpoints
257            .iter()
258            .map(|c| (c.ops_per_sec - mean).powi(2))
259            .sum::<f64>()
260            / n;
261        var.sqrt() / mean
262    }
263
264    /// Convert this result into a `CheckResult`.
265    ///
266    /// Default verdict logic:
267    /// - No checkpoints (or only one) -> `Skip` with detail.
268    /// - `degradation_pct_threshold` exceeded between first-half and
269    ///   second-half mean ops/sec -> `Fail+Warning`.
270    /// - Otherwise `Pass`.
271    ///
272    /// Always carries the `stress` and `soak` tags plus numeric
273    /// evidence for `iterations`, `threads`, `ops_per_sec`,
274    /// `total_elapsed_ms`, `checkpoint_count`, `checkpoint_ops_cv`,
275    /// `first_half_ops`, `second_half_ops`.
276    pub fn into_check_result(self, degradation_pct_threshold: f64) -> CheckResult {
277        let name = format!("stress::soak::{}", self.name);
278        let mut evidence = vec![
279            Evidence::numeric("iterations", self.iterations as f64),
280            Evidence::numeric("threads", self.threads as f64),
281            Evidence::numeric("ops_per_sec", self.ops_per_sec()),
282            Evidence::numeric(
283                "total_elapsed_ms",
284                self.total_elapsed.as_secs_f64() * 1000.0,
285            ),
286            Evidence::numeric("checkpoint_count", self.checkpoints.len() as f64),
287            Evidence::numeric("checkpoint_ops_cv", self.checkpoint_ops_cv()),
288        ];
289        if let Some(lat) = &self.latency {
290            evidence.push(Evidence::numeric(
291                "latency_p50_ns",
292                lat.p50.as_nanos() as f64,
293            ));
294            evidence.push(Evidence::numeric(
295                "latency_p95_ns",
296                lat.p95.as_nanos() as f64,
297            ));
298            evidence.push(Evidence::numeric(
299                "latency_p99_ns",
300                lat.p99.as_nanos() as f64,
301            ));
302        }
303        let tags = vec!["stress".to_string(), "soak".to_string()];
304
305        if self.checkpoints.len() < 2 {
306            let mut c = CheckResult::skip(name).with_detail(format!(
307                "fewer than 2 checkpoints (got {})",
308                self.checkpoints.len()
309            ));
310            c.tags = tags;
311            c.evidence = evidence;
312            return c;
313        }
314
315        let mid = self.checkpoints.len() / 2;
316        let first_half_mean = mean_ops(&self.checkpoints[..mid]);
317        let second_half_mean = mean_ops(&self.checkpoints[mid..]);
318        evidence.push(Evidence::numeric("first_half_ops", first_half_mean));
319        evidence.push(Evidence::numeric("second_half_ops", second_half_mean));
320
321        if first_half_mean == 0.0 {
322            let mut c = CheckResult::pass(name)
323                .with_detail("first-half throughput was zero, skipping degradation check");
324            c.tags = tags;
325            c.evidence = evidence;
326            return c;
327        }
328
329        let drop_pct = ((first_half_mean - second_half_mean) / first_half_mean) * 100.0;
330        let detail = format!(
331            "iterations={} elapsed={:.3}s ops/sec={:.0} checkpoints={} first_half_ops={:.0} second_half_ops={:.0} drop={:.2}%",
332            self.iterations,
333            self.total_elapsed.as_secs_f64(),
334            self.ops_per_sec(),
335            self.checkpoints.len(),
336            first_half_mean,
337            second_half_mean,
338            drop_pct
339        );
340
341        if drop_pct > degradation_pct_threshold {
342            let mut tags = tags;
343            tags.push("regression".to_string());
344            let mut c = CheckResult::fail(name, Severity::Warning).with_detail(detail);
345            c.tags = tags;
346            c.evidence = evidence;
347            c
348        } else {
349            let mut c = CheckResult::pass(name).with_detail(detail);
350            c.tags = tags;
351            c.evidence = evidence;
352            c
353        }
354    }
355}
356
357fn mean_ops(checkpoints: &[SoakCheckpoint]) -> f64 {
358    if checkpoints.is_empty() {
359        return 0.0;
360    }
361    checkpoints.iter().map(|c| c.ops_per_sec).sum::<f64>() / checkpoints.len() as f64
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    /// Trivial workload so the timed tests exercise the soak machinery
    /// rather than the work itself.
    #[derive(Clone)]
    struct Noop;
    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    /// Build a `SoakResult` by hand with one 10 ms checkpoint per entry in
    /// `rates`, so verdict and CV logic can be tested without timing noise.
    fn synthetic_result(rates: &[f64]) -> SoakResult {
        let window = Duration::from_millis(10);
        let checkpoints: Vec<SoakCheckpoint> = rates
            .iter()
            .enumerate()
            .map(|(index, &ops_per_sec)| SoakCheckpoint {
                at_offset: window * (index as u32 + 1),
                window_iters: (ops_per_sec * window.as_secs_f64()) as usize,
                window_duration: window,
                ops_per_sec,
            })
            .collect();
        let iterations = checkpoints.iter().map(|cp| cp.window_iters).sum();
        let total_elapsed = window * rates.len() as u32;
        SoakResult {
            name: "synthetic".to_string(),
            iterations,
            threads: 1,
            total_elapsed,
            thread_times: vec![total_elapsed],
            latency: None,
            checkpoints,
        }
    }

    #[test]
    fn soak_runs_for_duration_and_records_checkpoints() {
        let r = SoakRun::new("steady")
            .duration(Duration::from_millis(150))
            .checkpoint(Duration::from_millis(50))
            .threads(2)
            .execute(&Noop);
        assert!(r.iterations > 0);
        assert!(!r.checkpoints.is_empty());
        assert!(r.total_elapsed >= Duration::from_millis(140));
    }

    #[test]
    fn soak_fewer_than_two_checkpoints_skips() {
        let r = SoakRun::new("brief")
            .duration(Duration::from_millis(20))
            .checkpoint(Duration::from_millis(50))
            .threads(1)
            .execute(&Noop);
        // The interval exceeds the total duration, so the driver records
        // at most one window, ending at the run deadline.
        assert!(r.checkpoints.len() < 2);
        let c = r.into_check_result(20.0);
        assert_eq!(c.verdict, Verdict::Skip);
    }

    #[test]
    fn soak_with_latency_tracking_records_percentiles() {
        let r = SoakRun::new("hot")
            .duration(Duration::from_millis(80))
            .checkpoint(Duration::from_millis(20))
            .threads(2)
            .track_latency(1)
            .execute(&Noop);
        assert!(r.latency.is_some());
    }

    #[test]
    fn soak_check_carries_tags_and_evidence() {
        let r = SoakRun::new("steady")
            .duration(Duration::from_millis(80))
            .checkpoint(Duration::from_millis(20))
            .threads(1)
            .execute(&Noop);
        let c = r.into_check_result(20.0);
        assert!(c.has_tag("stress"));
        assert!(c.has_tag("soak"));
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        assert!(labels.contains(&"checkpoint_count"));
        assert!(labels.contains(&"checkpoint_ops_cv"));
    }

    #[test]
    fn checkpoint_ops_cv_is_low_for_uniform_load() {
        let r = SoakRun::new("steady")
            .duration(Duration::from_millis(100))
            .checkpoint(Duration::from_millis(20))
            .threads(2)
            .execute(&Noop);
        // The value depends on the machine; only check it is well-formed.
        let cv = r.checkpoint_ops_cv();
        assert!(cv.is_finite());
        assert!(cv >= 0.0);
    }

    #[test]
    fn checkpoint_ops_cv_matches_hand_computed_values() {
        // Identical windows have no spread.
        assert_eq!(synthetic_result(&[1000.0; 4]).checkpoint_ops_cv(), 0.0);
        // mean = 200, population std-dev = 100 -> CV = 0.5.
        let cv = synthetic_result(&[100.0, 300.0]).checkpoint_ops_cv();
        assert!((cv - 0.5).abs() < 1e-12);
        // A single checkpoint cannot express variation.
        assert_eq!(synthetic_result(&[100.0]).checkpoint_ops_cv(), 0.0);
    }

    #[test]
    fn degradation_beyond_threshold_fails_with_regression_tag() {
        // First half averages 1000 ops/sec, second half 500: a 50% drop.
        let c = synthetic_result(&[1000.0, 1000.0, 500.0, 500.0]).into_check_result(20.0);
        assert_eq!(c.verdict, Verdict::Fail);
        assert!(c.has_tag("stress"));
        assert!(c.has_tag("soak"));
        assert!(c.has_tag("regression"));
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        assert!(labels.contains(&"first_half_ops"));
        assert!(labels.contains(&"second_half_ops"));
    }

    #[test]
    fn steady_throughput_is_not_flagged_as_regression() {
        let c = synthetic_result(&[1000.0; 4]).into_check_result(20.0);
        assert_ne!(c.verdict, Verdict::Fail);
        assert_ne!(c.verdict, Verdict::Skip);
        assert!(!c.has_tag("regression"));
    }
}