Skip to main content

dev_stress/
lib.rs

1//! # dev-stress
2//!
3//! High-load stress testing for Rust. Concurrency, volume, saturation
4//! under pressure. Part of the `dev-*` verification suite.
5//!
6//! `dev-stress` is the answer to "does this code survive real load?"
7//! Not "is it fast" (that's `dev-bench`). Not "does it deadlock"
8//! (that's `dev-async`). Not "does it recover from failure" (that's
9//! `dev-chaos`).
10//!
11//! ## Quick example
12//!
13//! ```no_run
14//! use dev_stress::{Workload, StressRun};
15//!
16//! #[derive(Clone)]
17//! struct MyWorkload;
18//! impl Workload for MyWorkload {
19//!     fn run_once(&self) {
20//!         std::hint::black_box(40 + 2);
21//!     }
22//! }
23//!
24//! let run = StressRun::new("hot_path")
25//!     .iterations(100_000)
26//!     .threads(8);
27//!
28//! let result = run.execute(&MyWorkload);
29//! let _check = result.into_check_result(None);
30//! ```
31//!
32//! ## What's measured
33//!
34//! - **`ops_per_sec`** — total iterations divided by total wall time.
35//! - **`thread_time_cv`** — coefficient of variation across per-thread
36//!   elapsed times. High CV indicates load imbalance or contention.
37//! - **`latency_p50/p95/p99`** — per-operation latency percentiles
38//!   (when [`LatencyTracker`] is enabled).
39//!
40//! ## Features
41//!
42//! - `system-stats` (opt-in): measure peak RSS and CPU time via
43//!   `sysinfo`. See [`system`].
44
45#![cfg_attr(docsrs, feature(doc_cfg))]
46#![warn(missing_docs)]
47#![warn(rust_2018_idioms)]
48
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
52use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
53
54pub mod latency;
55pub mod soak;
56
57#[cfg(feature = "system-stats")]
58#[cfg_attr(docsrs, doc(cfg(feature = "system-stats")))]
59pub mod system;
60
61pub use latency::{LatencyStats, LatencyTracker};
62pub use soak::{SoakCheckpoint, SoakResult, SoakRun};
63
/// A unit of work that a stress run invokes repeatedly.
///
/// The `Send + Sync` supertraits exist because [`StressRun::execute`]
/// clones the workload once, wraps that clone in an `Arc`, and calls
/// [`run_once`](Workload::run_once) on the shared value from every
/// worker thread. `execute` additionally requires `Clone` for that one
/// up-front copy.
///
/// # Example
///
/// ```
/// use dev_stress::Workload;
///
/// #[derive(Clone)]
/// struct Noop;
/// impl Workload for Noop {
///     fn run_once(&self) {
///         std::hint::black_box(1 + 1);
///     }
/// }
/// ```
pub trait Workload: Send + Sync {
    /// Perform a single iteration of the workload.
    ///
    /// Receives `&self` and may be invoked from several threads at the
    /// same time, so implementations MUST tolerate concurrent calls.
    fn run_once(&self);
}
88
/// Configuration for a stress run.
///
/// Created with [`StressRun::new`] and refined through the chained
/// setters. The type is `Debug` so a configuration can be logged, and
/// `Clone` so one base configuration can be reused for several runs.
///
/// # Example
///
/// ```
/// use dev_stress::StressRun;
///
/// let run = StressRun::new("hot_path").iterations(1_000).threads(4);
/// assert_eq!(run.iterations_planned(), 1_000);
/// assert_eq!(run.threads_planned(), 4);
/// ```
#[derive(Debug, Clone)]
pub struct StressRun {
    /// Stable name of the run; copied into the resulting `StressResult`.
    name: String,
    /// Total number of iterations, summed across all threads.
    iterations: usize,
    /// Number of worker threads to spawn.
    threads: usize,
    /// Latency sampling: `None` = off; `Some(n)` = sample 1 of every `n`
    /// iterations.
    track_latency: Option<usize>,
}
106
107impl StressRun {
108    /// Begin building a stress run with a stable name.
109    pub fn new(name: impl Into<String>) -> Self {
110        Self {
111            name: name.into(),
112            iterations: 1_000,
113            threads: 1,
114            track_latency: None,
115        }
116    }
117
118    /// Total iterations across all threads.
119    pub fn iterations(mut self, n: usize) -> Self {
120        self.iterations = n;
121        self
122    }
123
124    /// Number of OS threads to run concurrently. Minimum is `1`.
125    pub fn threads(mut self, n: usize) -> Self {
126        self.threads = n.max(1);
127        self
128    }
129
130    /// Track per-operation latency, sampling 1 of every `rate` iterations.
131    ///
132    /// `rate = 1` records every iteration; `rate = 100` records 1% of
133    /// iterations. Lower rates lower memory and overhead at the cost
134    /// of percentile precision.
135    ///
136    /// # Example
137    ///
138    /// ```
139    /// use dev_stress::StressRun;
140    ///
141    /// let run = StressRun::new("hot").iterations(10_000).threads(2)
142    ///     .track_latency(10); // 10% sample rate
143    /// assert_eq!(run.iterations_planned(), 10_000);
144    /// ```
145    pub fn track_latency(mut self, rate: usize) -> Self {
146        self.track_latency = Some(rate.max(1));
147        self
148    }
149
150    /// The configured iteration count.
151    pub fn iterations_planned(&self) -> usize {
152        self.iterations
153    }
154
155    /// The configured thread count.
156    pub fn threads_planned(&self) -> usize {
157        self.threads
158    }
159
160    /// Execute the run. Returns a result with timing statistics.
161    pub fn execute<W>(&self, workload: &W) -> StressResult
162    where
163        W: Workload + Clone + 'static,
164    {
165        let per_thread = self.iterations / self.threads;
166        let leftover = self.iterations % self.threads;
167        let started = Instant::now();
168        let mut handles = Vec::with_capacity(self.threads);
169        let workload = Arc::new(workload.clone());
170
171        for t in 0..self.threads {
172            let count = per_thread + if t < leftover { 1 } else { 0 };
173            let w = workload.clone();
174            let track = self.track_latency;
175            handles.push(std::thread::spawn(move || {
176                let start = Instant::now();
177                let mut tracker = track.map(LatencyTracker::new);
178                for i in 0..count {
179                    if let Some(t) = tracker.as_mut() {
180                        t.record(i, || w.run_once());
181                    } else {
182                        w.run_once();
183                    }
184                }
185                (start.elapsed(), tracker)
186            }));
187        }
188
189        let mut thread_times = Vec::with_capacity(self.threads);
190        let mut latency_samples: Vec<Duration> = Vec::new();
191        for h in handles {
192            let (elapsed, tracker) = h.join().unwrap();
193            thread_times.push(elapsed);
194            if let Some(t) = tracker {
195                latency_samples.extend(t.into_samples());
196            }
197        }
198        let total_elapsed = started.elapsed();
199
200        StressResult {
201            name: self.name.clone(),
202            iterations: self.iterations,
203            threads: self.threads,
204            total_elapsed,
205            thread_times,
206            latency: if latency_samples.is_empty() {
207                None
208            } else {
209                Some(LatencyStats::from_samples(latency_samples))
210            },
211        }
212    }
213}
214
215/// Result of a stress run.
216///
217/// # Example
218///
219/// ```no_run
220/// use dev_stress::{StressRun, Workload};
221///
222/// #[derive(Clone)]
223/// struct Noop;
224/// impl Workload for Noop {
225///     fn run_once(&self) { std::hint::black_box(1 + 1); }
226/// }
227///
228/// let r = StressRun::new("noop").iterations(100).threads(2).execute(&Noop);
229/// assert!(r.ops_per_sec() > 0.0);
230/// ```
231#[derive(Debug, Clone)]
232pub struct StressResult {
233    /// Stable name of the run.
234    pub name: String,
235    /// Iterations actually executed.
236    pub iterations: usize,
237    /// Threads used.
238    pub threads: usize,
239    /// Wall-clock time from run start to all threads finishing.
240    pub total_elapsed: Duration,
241    /// Per-thread elapsed times. Variance here indicates contention.
242    pub thread_times: Vec<Duration>,
243    /// Per-operation latency percentiles, when [`StressRun::track_latency`]
244    /// was enabled. `None` otherwise.
245    pub latency: Option<LatencyStats>,
246}
247
248impl StressResult {
249    /// Effective throughput in operations per second.
250    pub fn ops_per_sec(&self) -> f64 {
251        if self.total_elapsed.is_zero() {
252            return 0.0;
253        }
254        self.iterations as f64 / self.total_elapsed.as_secs_f64()
255    }
256
257    /// Coefficient of variation across thread times. Higher numbers
258    /// indicate worse contention or load imbalance.
259    pub fn thread_time_cv(&self) -> f64 {
260        if self.thread_times.is_empty() {
261            return 0.0;
262        }
263        let n = self.thread_times.len() as f64;
264        let mean: f64 = self
265            .thread_times
266            .iter()
267            .map(|d| d.as_secs_f64())
268            .sum::<f64>()
269            / n;
270        if mean == 0.0 {
271            return 0.0;
272        }
273        let var = self
274            .thread_times
275            .iter()
276            .map(|d| (d.as_secs_f64() - mean).powi(2))
277            .sum::<f64>()
278            / n;
279        var.sqrt() / mean
280    }
281
282    /// Convert this result into a `CheckResult` using the legacy
283    /// behavior (90%-baseline ops/sec floor, no latency thresholds).
284    ///
285    /// `baseline_ops_per_sec` is the previously-recorded throughput.
286    /// `None` -> `Pass` with detail. Below 90% baseline -> `Fail+Warning`.
287    /// Otherwise `Pass`.
288    ///
289    /// # Example
290    ///
291    /// ```no_run
292    /// use dev_stress::{StressRun, Workload};
293    ///
294    /// #[derive(Clone)]
295    /// struct Noop;
296    /// impl Workload for Noop { fn run_once(&self) {} }
297    ///
298    /// let r = StressRun::new("noop").iterations(100).threads(1).execute(&Noop);
299    /// let _check = r.into_check_result(None);
300    /// ```
301    pub fn into_check_result(self, baseline_ops_per_sec: Option<f64>) -> CheckResult {
302        self.compare_with_options(&CompareOptions {
303            baseline_ops_per_sec,
304            ..CompareOptions::default()
305        })
306    }
307
308    /// Compare this result against a baseline using full options.
309    ///
310    /// Always returns a `CheckResult` tagged `stress`, with numeric
311    /// `Evidence` for `iterations`, `threads`, `ops_per_sec`,
312    /// `thread_time_cv`, `total_elapsed_ms`, plus latency percentiles
313    /// (when tracked) and any baseline values provided.
314    ///
315    /// # Example
316    ///
317    /// ```no_run
318    /// use dev_stress::{CompareOptions, StressRun, Workload};
319    /// use std::time::Duration;
320    ///
321    /// #[derive(Clone)]
322    /// struct Noop;
323    /// impl Workload for Noop { fn run_once(&self) {} }
324    ///
325    /// let r = StressRun::new("noop").iterations(100).threads(1).execute(&Noop);
326    /// let opts = CompareOptions {
327    ///     baseline_ops_per_sec: Some(1_000_000.0),
328    ///     ops_drop_pct_threshold: 10.0,
329    ///     baseline_p99: None,
330    ///     p99_regression_pct_threshold: 20.0,
331    /// };
332    /// let _ = r.compare_with_options(&opts);
333    /// ```
334    pub fn compare_with_options(&self, opts: &CompareOptions) -> CheckResult {
335        let name = format!("stress::{}", self.name);
336        let mut evidence = self.numeric_evidence();
337        let mut tags = vec!["stress".to_string()];
338
339        let mut regressions: Vec<String> = Vec::new();
340
341        // ops/sec drop check.
342        if let Some(baseline_ops) = opts.baseline_ops_per_sec {
343            evidence.push(Evidence::numeric("baseline_ops_per_sec", baseline_ops));
344            let floor = baseline_ops * (1.0 - opts.ops_drop_pct_threshold / 100.0);
345            if self.ops_per_sec() < floor {
346                regressions.push(format!(
347                    "ops_per_sec {:.0} < floor {:.0} ({}% drop allowed)",
348                    self.ops_per_sec(),
349                    floor,
350                    opts.ops_drop_pct_threshold
351                ));
352            }
353        }
354
355        // p99 regression check.
356        if let (Some(baseline_p99), Some(latency)) = (opts.baseline_p99, &self.latency) {
357            evidence.push(Evidence::numeric(
358                "baseline_p99_ns",
359                baseline_p99.as_nanos() as f64,
360            ));
361            let allowed =
362                baseline_p99.as_nanos() as f64 * (1.0 + opts.p99_regression_pct_threshold / 100.0);
363            if (latency.p99.as_nanos() as f64) > allowed {
364                regressions.push(format!(
365                    "p99_ns {} > allowed {:.0} ({}% regression allowed)",
366                    latency.p99.as_nanos(),
367                    allowed,
368                    opts.p99_regression_pct_threshold
369                ));
370            }
371        }
372
373        let detail = self.detail_string();
374
375        if regressions.is_empty() {
376            // No baseline OR within thresholds.
377            let mut c = CheckResult::pass(name).with_detail(detail);
378            c.tags = tags;
379            c.evidence = evidence;
380            return c;
381        }
382
383        tags.push("regression".to_string());
384        let combined_detail = format!("{} :: {}", detail, regressions.join("; "));
385        let mut c = CheckResult::fail(name, Severity::Warning).with_detail(combined_detail);
386        c.tags = tags;
387        c.evidence = evidence;
388        c
389    }
390
391    /// Build a one-check `Report` containing the comparison result.
392    ///
393    /// Sets `subject = self.name`, `producer = "dev-stress"`.
394    pub fn into_report(self, subject_version: impl Into<String>, opts: &CompareOptions) -> Report {
395        let name = self.name.clone();
396        let check = self.compare_with_options(opts);
397        let mut r = Report::new(name, subject_version).with_producer("dev-stress");
398        r.push(check);
399        r.finish();
400        r
401    }
402
403    fn numeric_evidence(&self) -> Vec<Evidence> {
404        let mut e = vec![
405            Evidence::numeric("iterations", self.iterations as f64),
406            Evidence::numeric("threads", self.threads as f64),
407            Evidence::numeric("ops_per_sec", self.ops_per_sec()),
408            Evidence::numeric("thread_time_cv", self.thread_time_cv()),
409            Evidence::numeric(
410                "total_elapsed_ms",
411                self.total_elapsed.as_secs_f64() * 1000.0,
412            ),
413        ];
414        if let Some(lat) = &self.latency {
415            e.push(Evidence::numeric(
416                "latency_p50_ns",
417                lat.p50.as_nanos() as f64,
418            ));
419            e.push(Evidence::numeric(
420                "latency_p95_ns",
421                lat.p95.as_nanos() as f64,
422            ));
423            e.push(Evidence::numeric(
424                "latency_p99_ns",
425                lat.p99.as_nanos() as f64,
426            ));
427            e.push(Evidence::numeric(
428                "latency_samples",
429                lat.samples_count as f64,
430            ));
431        }
432        e
433    }
434
435    fn detail_string(&self) -> String {
436        let lat = match &self.latency {
437            Some(l) => format!(
438                ", p50={}ns, p95={}ns, p99={}ns",
439                l.p50.as_nanos(),
440                l.p95.as_nanos(),
441                l.p99.as_nanos()
442            ),
443            None => String::new(),
444        };
445        format!(
446            "iterations={}, threads={}, total={:.3}s, ops/sec={:.0}, thread_cv={:.3}{}",
447            self.iterations,
448            self.threads,
449            self.total_elapsed.as_secs_f64(),
450            self.ops_per_sec(),
451            self.thread_time_cv(),
452            lat
453        )
454    }
455}
456
/// Options controlling how a [`StressResult`] is compared against a baseline.
///
/// Defaults: no baseline; ops/sec drop threshold 10%; p99 regression threshold 20%.
///
/// Implements `PartialEq`, so two option sets can be compared directly,
/// for example against [`CompareOptions::default`].
///
/// # Example
///
/// ```
/// use dev_stress::CompareOptions;
///
/// let opts = CompareOptions {
///     baseline_ops_per_sec: Some(900_000.0),
///     ops_drop_pct_threshold: 5.0,
///     baseline_p99: None,
///     p99_regression_pct_threshold: 25.0,
/// };
/// assert_eq!(opts.ops_drop_pct_threshold, 5.0);
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct CompareOptions {
    /// Baseline throughput (ops/sec). When `Some`, the run fails if
    /// `ops_per_sec < baseline * (1 - ops_drop_pct_threshold/100)`.
    pub baseline_ops_per_sec: Option<f64>,
    /// Maximum allowed drop, as a percent.
    pub ops_drop_pct_threshold: f64,
    /// Baseline p99 latency. When `Some` AND latency was tracked, the
    /// run fails if `p99 > baseline_p99 * (1 + p99_regression_pct_threshold/100)`.
    pub baseline_p99: Option<Duration>,
    /// Maximum allowed p99 regression, as a percent.
    pub p99_regression_pct_threshold: f64,
}

impl Default for CompareOptions {
    /// No baselines; 10% allowed ops/sec drop; 20% allowed p99 regression.
    fn default() -> Self {
        Self {
            baseline_ops_per_sec: None,
            ops_drop_pct_threshold: 10.0,
            baseline_p99: None,
            p99_regression_pct_threshold: 20.0,
        }
    }
}
498
499/// Producer wrapper that runs a stress run and emits a single-check `Report`.
500///
501/// # Example
502///
503/// ```no_run
504/// use dev_stress::{CompareOptions, StressProducer, StressRun, Workload};
505/// use dev_report::Producer;
506///
507/// #[derive(Clone)]
508/// struct Noop;
509/// impl Workload for Noop { fn run_once(&self) {} }
510///
511/// let producer = StressProducer::new(
512///     || StressRun::new("hot").iterations(1_000).threads(2).execute(&Noop),
513///     "0.1.0",
514///     CompareOptions::default(),
515/// );
516/// let report = producer.produce();
517/// assert_eq!(report.checks.len(), 1);
518/// ```
519pub struct StressProducer<F>
520where
521    F: Fn() -> StressResult,
522{
523    run: F,
524    subject_version: String,
525    opts: CompareOptions,
526}
527
528impl<F> StressProducer<F>
529where
530    F: Fn() -> StressResult,
531{
532    /// Build a new producer.
533    pub fn new(run: F, subject_version: impl Into<String>, opts: CompareOptions) -> Self {
534        Self {
535            run,
536            subject_version: subject_version.into(),
537            opts,
538        }
539    }
540}
541
542impl<F> Producer for StressProducer<F>
543where
544    F: Fn() -> StressResult,
545{
546    fn produce(&self) -> Report {
547        let result = (self.run)();
548        result.into_report(self.subject_version.clone(), &self.opts)
549    }
550}
551
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Workload that does trivial work the optimizer cannot remove.
    #[derive(Clone)]
    struct Noop;
    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    /// Workload that counts its invocations. Clones share one counter, so
    /// the total is visible to the test after `execute` clones the workload.
    #[derive(Clone)]
    struct Counting(Arc<AtomicUsize>);
    impl Workload for Counting {
        fn run_once(&self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn run_completes() {
        let run = StressRun::new("noop").iterations(1_000).threads(2);
        let r = run.execute(&Noop);
        assert_eq!(r.iterations, 1_000);
        assert_eq!(r.threads, 2);
        assert!(r.ops_per_sec() > 0.0);
    }

    #[test]
    fn no_baseline_passes() {
        let run = StressRun::new("noop").iterations(100).threads(1);
        let r = run.execute(&Noop);
        let c = r.into_check_result(None);
        assert_eq!(c.verdict, Verdict::Pass);
        assert!(c.has_tag("stress"));
    }

    #[test]
    fn check_result_has_numeric_evidence() {
        let run = StressRun::new("noop").iterations(100).threads(2);
        let r = run.execute(&Noop);
        let c = r.into_check_result(None);
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        assert!(labels.contains(&"iterations"));
        assert!(labels.contains(&"threads"));
        assert!(labels.contains(&"ops_per_sec"));
        assert!(labels.contains(&"thread_time_cv"));
        assert!(labels.contains(&"total_elapsed_ms"));
    }

    #[test]
    fn ops_drop_below_threshold_fails() {
        let run = StressRun::new("x").iterations(50).threads(1);
        let r = run.execute(&Noop);
        let baseline = r.ops_per_sec() * 100.0; // way higher than current
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Fail);
        assert!(c.has_tag("regression"));
    }

    #[test]
    fn ops_within_threshold_passes() {
        let run = StressRun::new("x").iterations(50).threads(1);
        let r = run.execute(&Noop);
        let baseline = r.ops_per_sec(); // exactly current
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Pass);
    }

    #[test]
    fn latency_tracking_records_percentiles() {
        let run = StressRun::new("hot")
            .iterations(1_000)
            .threads(2)
            .track_latency(1);
        let r = run.execute(&Noop);
        let lat = r.latency.expect("latency tracked");
        assert!(lat.samples_count > 0);
        assert!(lat.p50.as_nanos() <= lat.p95.as_nanos());
        assert!(lat.p95.as_nanos() <= lat.p99.as_nanos());
    }

    #[test]
    fn p99_regression_detected() {
        let run = StressRun::new("hot")
            .iterations(200)
            .threads(2)
            .track_latency(1);
        let r = run.execute(&Noop);
        // Baseline p99 of 1 ns with 0% tolerance: any measured p99 above
        // 1 ns must be reported as a regression.
        let opts = CompareOptions {
            baseline_p99: Some(Duration::from_nanos(1)),
            p99_regression_pct_threshold: 0.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        // A measured p99 of 0 or 1 ns (possible for a no-op) is within the
        // allowance, so the failure is only asserted above 1 ns.
        if r.latency.as_ref().unwrap().p99.as_nanos() > 1 {
            assert_eq!(c.verdict, Verdict::Fail);
            assert!(c.has_tag("regression"));
        }
    }

    #[test]
    fn into_report_emits_one_check() {
        let run = StressRun::new("noop").iterations(100).threads(2);
        let r = run.execute(&Noop);
        let report = r.into_report("0.1.0", &CompareOptions::default());
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.producer.as_deref(), Some("dev-stress"));
        assert_eq!(report.subject, "noop");
    }

    #[test]
    fn stress_producer_implements_producer_trait() {
        let producer = StressProducer::new(
            || {
                StressRun::new("hot")
                    .iterations(50)
                    .threads(1)
                    .execute(&Noop)
            },
            "0.1.0",
            CompareOptions::default(),
        );
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn iterations_distribute_evenly_with_leftover() {
        // 7 iters across 3 threads -> 3, 2, 2 (front-loaded).
        let calls = Arc::new(AtomicUsize::new(0));
        let run = StressRun::new("x").iterations(7).threads(3);
        let r = run.execute(&Counting(Arc::clone(&calls)));
        assert_eq!(r.iterations, 7);
        assert_eq!(r.thread_times.len(), 3);
        // Every planned iteration ran exactly once, leftover included.
        assert_eq!(calls.load(Ordering::SeqCst), 7);
    }

    #[test]
    fn thread_time_cv_matches_hand_computed_value() {
        let r = StressResult {
            name: "cv".to_string(),
            iterations: 2,
            threads: 2,
            total_elapsed: Duration::from_secs(3),
            thread_times: vec![Duration::from_secs(1), Duration::from_secs(3)],
            latency: None,
        };
        // mean = 2 s, population std-dev = 1 s -> CV = 0.5
        assert!((r.thread_time_cv() - 0.5).abs() < 1e-12);

        let empty = StressResult {
            thread_times: Vec::new(),
            ..r
        };
        assert_eq!(empty.thread_time_cv(), 0.0);
    }
}