Skip to main content

dev_stress/
lib.rs

1//! # dev-stress
2//!
3//! High-load stress testing for Rust. Concurrency, volume, saturation
4//! under pressure. Part of the `dev-*` verification suite.
5//!
6//! `dev-stress` is the answer to "does this code survive real load?"
7//! Not "is it fast" (that's `dev-bench`). Not "does it deadlock"
8//! (that's `dev-async`). Not "does it recover from failure" (that's
9//! `dev-chaos`).
10//!
11//! ## Quick example
12//!
13//! ```no_run
14//! use dev_stress::{Workload, StressRun};
15//!
16//! #[derive(Clone)]
17//! struct MyWorkload;
18//! impl Workload for MyWorkload {
19//!     fn run_once(&self) {
20//!         std::hint::black_box(40 + 2);
21//!     }
22//! }
23//!
24//! let run = StressRun::new("hot_path")
25//!     .iterations(100_000)
26//!     .threads(8);
27//!
28//! let result = run.execute(&MyWorkload);
29//! let _check = result.into_check_result(None);
30//! ```
31//!
32//! ## What's measured
33//!
34//! - **`ops_per_sec`** — total iterations divided by total wall time.
35//! - **`thread_time_cv`** — coefficient of variation across per-thread
36//!   elapsed times. High CV indicates load imbalance or contention.
//! - **`latency_p50/p95/p99`** — per-operation latency percentiles
//!   (only when latency tracking is enabled via [`StressRun::track_latency`]).
39//!
40//! ## Features
41//!
42//! - `system-stats` (opt-in): measure peak RSS and CPU time via
43//!   `sysinfo`. See the `system` module
44//!   (visible in rustdoc when the feature is enabled).
45
46#![cfg_attr(docsrs, feature(doc_cfg))]
47#![warn(missing_docs)]
48#![warn(rust_2018_idioms)]
49
50use std::sync::Arc;
51use std::time::{Duration, Instant};
52
53use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
54
55pub mod latency;
56pub mod soak;
57
58#[cfg(feature = "system-stats")]
59#[cfg_attr(docsrs, doc(cfg(feature = "system-stats")))]
60pub mod system;
61
62pub use latency::{LatencyStats, LatencyTracker};
63pub use soak::{SoakCheckpoint, SoakResult, SoakRun};
64
/// A workload that can be executed many times under stress.
///
/// Implementations MUST be safe to call concurrently from multiple
/// threads (hence the `Send + Sync` supertraits).
///
/// [`StressRun::execute`] additionally requires `Clone + 'static`. It clones
/// the workload **once per run** and shares that single copy between all
/// worker threads through an `Arc`. Per-thread cloning cost is therefore not
/// a concern, but any interior state (counters, caches, ...) is shared by
/// every thread.
///
/// # Example
///
/// ```
/// use dev_stress::Workload;
///
/// #[derive(Clone)]
/// struct Noop;
/// impl Workload for Noop {
///     fn run_once(&self) {
///         std::hint::black_box(1 + 1);
///     }
/// }
/// ```
pub trait Workload: Send + Sync {
    /// Execute one unit of work. MUST be safe to call concurrently
    /// from multiple threads.
    fn run_once(&self);
}
89
/// Configuration for a stress run.
///
/// Built with [`StressRun::new`] plus the chained setters, then executed with
/// [`StressRun::execute`]. The configuration is `Clone`, so one template can
/// be reused for several runs.
///
/// # Example
///
/// ```
/// use dev_stress::StressRun;
///
/// let run = StressRun::new("hot_path").iterations(1_000).threads(4);
/// assert_eq!(run.iterations_planned(), 1_000);
/// assert_eq!(run.threads_planned(), 4);
/// ```
#[derive(Debug, Clone)]
pub struct StressRun {
    /// Stable name reported in results and checks.
    name: String,
    /// Total iterations, split across all threads.
    iterations: usize,
    /// Worker thread count; the setter clamps it to at least 1.
    threads: usize,
    /// `None` = latency tracking off; `Some(n)` = sample 1 of every `n` iterations.
    track_latency: Option<usize>,
}
107
108impl StressRun {
109    /// Begin building a stress run with a stable name.
110    pub fn new(name: impl Into<String>) -> Self {
111        Self {
112            name: name.into(),
113            iterations: 1_000,
114            threads: 1,
115            track_latency: None,
116        }
117    }
118
119    /// Total iterations across all threads.
120    pub fn iterations(mut self, n: usize) -> Self {
121        self.iterations = n;
122        self
123    }
124
125    /// Number of OS threads to run concurrently. Minimum is `1`.
126    pub fn threads(mut self, n: usize) -> Self {
127        self.threads = n.max(1);
128        self
129    }
130
131    /// Track per-operation latency, sampling 1 of every `rate` iterations.
132    ///
133    /// `rate = 1` records every iteration; `rate = 100` records 1% of
134    /// iterations. Lower rates lower memory and overhead at the cost
135    /// of percentile precision.
136    ///
137    /// # Example
138    ///
139    /// ```
140    /// use dev_stress::StressRun;
141    ///
142    /// let run = StressRun::new("hot").iterations(10_000).threads(2)
143    ///     .track_latency(10); // 10% sample rate
144    /// assert_eq!(run.iterations_planned(), 10_000);
145    /// ```
146    pub fn track_latency(mut self, rate: usize) -> Self {
147        self.track_latency = Some(rate.max(1));
148        self
149    }
150
151    /// The configured iteration count.
152    pub fn iterations_planned(&self) -> usize {
153        self.iterations
154    }
155
156    /// The configured thread count.
157    pub fn threads_planned(&self) -> usize {
158        self.threads
159    }
160
161    /// Execute the run. Returns a result with timing statistics.
162    pub fn execute<W>(&self, workload: &W) -> StressResult
163    where
164        W: Workload + Clone + 'static,
165    {
166        let per_thread = self.iterations / self.threads;
167        let leftover = self.iterations % self.threads;
168        let started = Instant::now();
169        let mut handles = Vec::with_capacity(self.threads);
170        let workload = Arc::new(workload.clone());
171
172        for t in 0..self.threads {
173            let count = per_thread + if t < leftover { 1 } else { 0 };
174            let w = workload.clone();
175            let track = self.track_latency;
176            handles.push(std::thread::spawn(move || {
177                let start = Instant::now();
178                let mut tracker = track.map(LatencyTracker::new);
179                for i in 0..count {
180                    if let Some(t) = tracker.as_mut() {
181                        t.record(i, || w.run_once());
182                    } else {
183                        w.run_once();
184                    }
185                }
186                (start.elapsed(), tracker)
187            }));
188        }
189
190        let mut thread_times = Vec::with_capacity(self.threads);
191        let mut latency_samples: Vec<Duration> = Vec::new();
192        for h in handles {
193            let (elapsed, tracker) = h.join().unwrap();
194            thread_times.push(elapsed);
195            if let Some(t) = tracker {
196                latency_samples.extend(t.into_samples());
197            }
198        }
199        let total_elapsed = started.elapsed();
200
201        StressResult {
202            name: self.name.clone(),
203            iterations: self.iterations,
204            threads: self.threads,
205            total_elapsed,
206            thread_times,
207            latency: if latency_samples.is_empty() {
208                None
209            } else {
210                Some(LatencyStats::from_samples(latency_samples))
211            },
212        }
213    }
214}
215
216/// Result of a stress run.
217///
218/// # Example
219///
220/// ```no_run
221/// use dev_stress::{StressRun, Workload};
222///
223/// #[derive(Clone)]
224/// struct Noop;
225/// impl Workload for Noop {
226///     fn run_once(&self) { std::hint::black_box(1 + 1); }
227/// }
228///
229/// let r = StressRun::new("noop").iterations(100).threads(2).execute(&Noop);
230/// assert!(r.ops_per_sec() > 0.0);
231/// ```
232#[derive(Debug, Clone)]
233pub struct StressResult {
234    /// Stable name of the run.
235    pub name: String,
236    /// Iterations actually executed.
237    pub iterations: usize,
238    /// Threads used.
239    pub threads: usize,
240    /// Wall-clock time from run start to all threads finishing.
241    pub total_elapsed: Duration,
242    /// Per-thread elapsed times. Variance here indicates contention.
243    pub thread_times: Vec<Duration>,
244    /// Per-operation latency percentiles, when [`StressRun::track_latency`]
245    /// was enabled. `None` otherwise.
246    pub latency: Option<LatencyStats>,
247}
248
249impl StressResult {
250    /// Effective throughput in operations per second.
251    pub fn ops_per_sec(&self) -> f64 {
252        if self.total_elapsed.is_zero() {
253            return 0.0;
254        }
255        self.iterations as f64 / self.total_elapsed.as_secs_f64()
256    }
257
258    /// Coefficient of variation across thread times. Higher numbers
259    /// indicate worse contention or load imbalance.
260    pub fn thread_time_cv(&self) -> f64 {
261        if self.thread_times.is_empty() {
262            return 0.0;
263        }
264        let n = self.thread_times.len() as f64;
265        let mean: f64 = self
266            .thread_times
267            .iter()
268            .map(|d| d.as_secs_f64())
269            .sum::<f64>()
270            / n;
271        if mean == 0.0 {
272            return 0.0;
273        }
274        let var = self
275            .thread_times
276            .iter()
277            .map(|d| (d.as_secs_f64() - mean).powi(2))
278            .sum::<f64>()
279            / n;
280        var.sqrt() / mean
281    }
282
283    /// Convert this result into a `CheckResult` using the legacy
284    /// behavior (90%-baseline ops/sec floor, no latency thresholds).
285    ///
286    /// `baseline_ops_per_sec` is the previously-recorded throughput.
287    /// `None` -> `Pass` with detail. Below 90% baseline -> `Fail+Warning`.
288    /// Otherwise `Pass`.
289    ///
290    /// # Example
291    ///
292    /// ```no_run
293    /// use dev_stress::{StressRun, Workload};
294    ///
295    /// #[derive(Clone)]
296    /// struct Noop;
297    /// impl Workload for Noop { fn run_once(&self) {} }
298    ///
299    /// let r = StressRun::new("noop").iterations(100).threads(1).execute(&Noop);
300    /// let _check = r.into_check_result(None);
301    /// ```
302    pub fn into_check_result(self, baseline_ops_per_sec: Option<f64>) -> CheckResult {
303        self.compare_with_options(&CompareOptions {
304            baseline_ops_per_sec,
305            ..CompareOptions::default()
306        })
307    }
308
309    /// Compare this result against a baseline using full options.
310    ///
311    /// Always returns a `CheckResult` tagged `stress`, with numeric
312    /// `Evidence` for `iterations`, `threads`, `ops_per_sec`,
313    /// `thread_time_cv`, `total_elapsed_ms`, plus latency percentiles
314    /// (when tracked) and any baseline values provided.
315    ///
316    /// # Example
317    ///
318    /// ```no_run
319    /// use dev_stress::{CompareOptions, StressRun, Workload};
320    /// use std::time::Duration;
321    ///
322    /// #[derive(Clone)]
323    /// struct Noop;
324    /// impl Workload for Noop { fn run_once(&self) {} }
325    ///
326    /// let r = StressRun::new("noop").iterations(100).threads(1).execute(&Noop);
327    /// let opts = CompareOptions {
328    ///     baseline_ops_per_sec: Some(1_000_000.0),
329    ///     ops_drop_pct_threshold: 10.0,
330    ///     baseline_p99: None,
331    ///     p99_regression_pct_threshold: 20.0,
332    /// };
333    /// let _ = r.compare_with_options(&opts);
334    /// ```
335    pub fn compare_with_options(&self, opts: &CompareOptions) -> CheckResult {
336        let name = format!("stress::{}", self.name);
337        let mut evidence = self.numeric_evidence();
338        let mut tags = vec!["stress".to_string()];
339
340        let mut regressions: Vec<String> = Vec::new();
341
342        // ops/sec drop check.
343        if let Some(baseline_ops) = opts.baseline_ops_per_sec {
344            evidence.push(Evidence::numeric("baseline_ops_per_sec", baseline_ops));
345            let floor = baseline_ops * (1.0 - opts.ops_drop_pct_threshold / 100.0);
346            if self.ops_per_sec() < floor {
347                regressions.push(format!(
348                    "ops_per_sec {:.0} < floor {:.0} ({}% drop allowed)",
349                    self.ops_per_sec(),
350                    floor,
351                    opts.ops_drop_pct_threshold
352                ));
353            }
354        }
355
356        // p99 regression check.
357        if let (Some(baseline_p99), Some(latency)) = (opts.baseline_p99, &self.latency) {
358            evidence.push(Evidence::numeric(
359                "baseline_p99_ns",
360                baseline_p99.as_nanos() as f64,
361            ));
362            let allowed =
363                baseline_p99.as_nanos() as f64 * (1.0 + opts.p99_regression_pct_threshold / 100.0);
364            if (latency.p99.as_nanos() as f64) > allowed {
365                regressions.push(format!(
366                    "p99_ns {} > allowed {:.0} ({}% regression allowed)",
367                    latency.p99.as_nanos(),
368                    allowed,
369                    opts.p99_regression_pct_threshold
370                ));
371            }
372        }
373
374        let detail = self.detail_string();
375
376        if regressions.is_empty() {
377            // No baseline OR within thresholds.
378            let mut c = CheckResult::pass(name).with_detail(detail);
379            c.tags = tags;
380            c.evidence = evidence;
381            return c;
382        }
383
384        tags.push("regression".to_string());
385        let combined_detail = format!("{} :: {}", detail, regressions.join("; "));
386        let mut c = CheckResult::fail(name, Severity::Warning).with_detail(combined_detail);
387        c.tags = tags;
388        c.evidence = evidence;
389        c
390    }
391
392    /// Build a one-check `Report` containing the comparison result.
393    ///
394    /// Sets `subject = self.name`, `producer = "dev-stress"`.
395    pub fn into_report(self, subject_version: impl Into<String>, opts: &CompareOptions) -> Report {
396        let name = self.name.clone();
397        let check = self.compare_with_options(opts);
398        let mut r = Report::new(name, subject_version).with_producer("dev-stress");
399        r.push(check);
400        r.finish();
401        r
402    }
403
404    fn numeric_evidence(&self) -> Vec<Evidence> {
405        let mut e = vec![
406            Evidence::numeric("iterations", self.iterations as f64),
407            Evidence::numeric("threads", self.threads as f64),
408            Evidence::numeric("ops_per_sec", self.ops_per_sec()),
409            Evidence::numeric("thread_time_cv", self.thread_time_cv()),
410            Evidence::numeric(
411                "total_elapsed_ms",
412                self.total_elapsed.as_secs_f64() * 1000.0,
413            ),
414        ];
415        if let Some(lat) = &self.latency {
416            e.push(Evidence::numeric(
417                "latency_p50_ns",
418                lat.p50.as_nanos() as f64,
419            ));
420            e.push(Evidence::numeric(
421                "latency_p95_ns",
422                lat.p95.as_nanos() as f64,
423            ));
424            e.push(Evidence::numeric(
425                "latency_p99_ns",
426                lat.p99.as_nanos() as f64,
427            ));
428            e.push(Evidence::numeric(
429                "latency_samples",
430                lat.samples_count as f64,
431            ));
432        }
433        e
434    }
435
436    fn detail_string(&self) -> String {
437        let lat = match &self.latency {
438            Some(l) => format!(
439                ", p50={}ns, p95={}ns, p99={}ns",
440                l.p50.as_nanos(),
441                l.p95.as_nanos(),
442                l.p99.as_nanos()
443            ),
444            None => String::new(),
445        };
446        format!(
447            "iterations={}, threads={}, total={:.3}s, ops/sec={:.0}, thread_cv={:.3}{}",
448            self.iterations,
449            self.threads,
450            self.total_elapsed.as_secs_f64(),
451            self.ops_per_sec(),
452            self.thread_time_cv(),
453            lat
454        )
455    }
456}
457
/// Options controlling how a [`StressResult`] is compared against a baseline.
///
/// Defaults: no baseline; ops/sec drop threshold 10%; p99 regression threshold 20%.
/// Thresholds are expressed in percent, so `10.0` means 10%.
///
/// # Example
///
/// ```
/// use dev_stress::CompareOptions;
///
/// let opts = CompareOptions {
///     baseline_ops_per_sec: Some(900_000.0),
///     ops_drop_pct_threshold: 5.0,
///     baseline_p99: None,
///     p99_regression_pct_threshold: 25.0,
/// };
/// assert_eq!(opts.ops_drop_pct_threshold, 5.0);
/// assert_ne!(opts, CompareOptions::default());
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct CompareOptions {
    /// Baseline throughput (ops/sec). When `Some`, the run fails if
    /// `ops_per_sec < baseline * (1 - ops_drop_pct_threshold/100)`.
    pub baseline_ops_per_sec: Option<f64>,
    /// Maximum allowed drop, as a percent.
    pub ops_drop_pct_threshold: f64,
    /// Baseline p99 latency. When `Some` AND latency was tracked, the
    /// run fails if `p99 > baseline_p99 * (1 + p99_regression_pct_threshold/100)`.
    pub baseline_p99: Option<Duration>,
    /// Maximum allowed p99 regression, as a percent.
    pub p99_regression_pct_threshold: f64,
}

impl Default for CompareOptions {
    /// No baselines; 10% ops/sec drop and 20% p99 regression thresholds.
    fn default() -> Self {
        Self {
            baseline_ops_per_sec: None,
            ops_drop_pct_threshold: 10.0,
            baseline_p99: None,
            p99_regression_pct_threshold: 20.0,
        }
    }
}
499
500/// Producer wrapper that runs a stress run and emits a single-check `Report`.
501///
502/// # Example
503///
504/// ```no_run
505/// use dev_stress::{CompareOptions, StressProducer, StressRun, Workload};
506/// use dev_report::Producer;
507///
508/// #[derive(Clone)]
509/// struct Noop;
510/// impl Workload for Noop { fn run_once(&self) {} }
511///
512/// let producer = StressProducer::new(
513///     || StressRun::new("hot").iterations(1_000).threads(2).execute(&Noop),
514///     "0.1.0",
515///     CompareOptions::default(),
516/// );
517/// let report = producer.produce();
518/// assert_eq!(report.checks.len(), 1);
519/// ```
520pub struct StressProducer<F>
521where
522    F: Fn() -> StressResult,
523{
524    run: F,
525    subject_version: String,
526    opts: CompareOptions,
527}
528
529impl<F> StressProducer<F>
530where
531    F: Fn() -> StressResult,
532{
533    /// Build a new producer.
534    pub fn new(run: F, subject_version: impl Into<String>, opts: CompareOptions) -> Self {
535        Self {
536            run,
537            subject_version: subject_version.into(),
538            opts,
539        }
540    }
541}
542
543impl<F> Producer for StressProducer<F>
544where
545    F: Fn() -> StressResult,
546{
547    fn produce(&self) -> Report {
548        let result = (self.run)();
549        result.into_report(self.subject_version.clone(), &self.opts)
550    }
551}
552
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;
    use std::thread::ThreadId;

    #[derive(Clone)]
    struct Noop;
    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    /// Counts how many iterations each worker thread executed.
    #[derive(Clone, Default)]
    struct PerThreadCounter(Arc<Mutex<HashMap<ThreadId, usize>>>);
    impl Workload for PerThreadCounter {
        fn run_once(&self) {
            let id = std::thread::current().id();
            *self.0.lock().unwrap().entry(id).or_insert(0) += 1;
        }
    }

    /// Counts total iterations across all threads.
    #[derive(Clone, Default)]
    struct TotalCounter(Arc<AtomicUsize>);
    impl Workload for TotalCounter {
        fn run_once(&self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Build a result with the given per-thread times (milliseconds) and
    /// zero total elapsed time, for exercising the derived metrics directly.
    fn result_with_thread_ms(times: &[u64]) -> StressResult {
        StressResult {
            name: "synthetic".to_string(),
            iterations: 0,
            threads: times.len(),
            total_elapsed: Duration::ZERO,
            thread_times: times.iter().map(|&ms| Duration::from_millis(ms)).collect(),
            latency: None,
        }
    }

    #[test]
    fn run_completes() {
        let run = StressRun::new("noop").iterations(1_000).threads(2);
        let r = run.execute(&Noop);
        assert_eq!(r.iterations, 1_000);
        assert_eq!(r.threads, 2);
        assert!(r.ops_per_sec() > 0.0);
    }

    #[test]
    fn no_baseline_passes() {
        let run = StressRun::new("noop").iterations(100).threads(1);
        let r = run.execute(&Noop);
        let c = r.into_check_result(None);
        assert_eq!(c.verdict, Verdict::Pass);
        assert!(c.has_tag("stress"));
    }

    #[test]
    fn check_result_has_numeric_evidence() {
        let run = StressRun::new("noop").iterations(100).threads(2);
        let r = run.execute(&Noop);
        let c = r.into_check_result(None);
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        assert!(labels.contains(&"iterations"));
        assert!(labels.contains(&"threads"));
        assert!(labels.contains(&"ops_per_sec"));
        assert!(labels.contains(&"thread_time_cv"));
        assert!(labels.contains(&"total_elapsed_ms"));
    }

    #[test]
    fn ops_drop_below_threshold_fails() {
        let run = StressRun::new("x").iterations(50).threads(1);
        let r = run.execute(&Noop);
        let baseline = r.ops_per_sec() * 100.0; // way higher than current
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Fail);
        assert!(c.has_tag("regression"));
    }

    #[test]
    fn ops_within_threshold_passes() {
        let run = StressRun::new("x").iterations(50).threads(1);
        let r = run.execute(&Noop);
        let baseline = r.ops_per_sec(); // exactly current
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Pass);
    }

    #[test]
    fn latency_tracking_records_percentiles() {
        let run = StressRun::new("hot")
            .iterations(1_000)
            .threads(2)
            .track_latency(1);
        let r = run.execute(&Noop);
        let lat = r.latency.expect("latency tracked");
        assert!(lat.samples_count > 0);
        assert!(lat.p50.as_nanos() <= lat.p95.as_nanos());
        assert!(lat.p95.as_nanos() <= lat.p99.as_nanos());
    }

    #[test]
    fn p99_regression_detected() {
        let run = StressRun::new("hot")
            .iterations(200)
            .threads(2)
            .track_latency(1);
        let r = run.execute(&Noop);
        // Baseline p99 set so far below current that any p99 above 1ns fails.
        let opts = CompareOptions {
            baseline_p99: Some(Duration::from_nanos(1)),
            p99_regression_pct_threshold: 0.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        // A very fast noop may report p99 <= 1ns; only assert when it exceeds the allowance.
        if r.latency.as_ref().unwrap().p99.as_nanos() > 1 {
            assert_eq!(c.verdict, Verdict::Fail);
            assert!(c.has_tag("regression"));
        }
    }

    #[test]
    fn into_report_emits_one_check() {
        let run = StressRun::new("noop").iterations(100).threads(2);
        let r = run.execute(&Noop);
        let report = r.into_report("0.1.0", &CompareOptions::default());
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.producer.as_deref(), Some("dev-stress"));
        assert_eq!(report.subject, "noop");
    }

    #[test]
    fn stress_producer_implements_producer_trait() {
        let producer = StressProducer::new(
            || {
                StressRun::new("hot")
                    .iterations(50)
                    .threads(1)
                    .execute(&Noop)
            },
            "0.1.0",
            CompareOptions::default(),
        );
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn iterations_distribute_evenly_with_leftover() {
        // 7 iters across 3 threads -> 3, 2, 2 (front-loaded).
        let counter = PerThreadCounter::default();
        let r = StressRun::new("x").iterations(7).threads(3).execute(&counter);
        assert_eq!(r.iterations, 7);
        assert_eq!(r.thread_times.len(), 3);
        let mut counts: Vec<usize> = counter.0.lock().unwrap().values().copied().collect();
        counts.sort_unstable();
        assert_eq!(counts, vec![2, 2, 3]);
    }

    #[test]
    fn fewer_iterations_than_threads_runs_each_iteration_once() {
        let counter = TotalCounter::default();
        let r = StressRun::new("x").iterations(2).threads(5).execute(&counter);
        assert_eq!(counter.0.load(Ordering::SeqCst), 2);
        assert_eq!(r.thread_times.len(), 5);
    }

    #[test]
    fn worker_panic_propagates_to_caller() {
        #[derive(Clone)]
        struct Boom;
        impl Workload for Boom {
            fn run_once(&self) {
                panic!("boom");
            }
        }
        let outcome = std::panic::catch_unwind(|| {
            StressRun::new("boom").iterations(2).threads(2).execute(&Boom)
        });
        assert!(outcome.is_err());
    }

    #[test]
    fn thread_time_cv_is_zero_for_uniform_times() {
        let cv = result_with_thread_ms(&[5, 5, 5]).thread_time_cv();
        assert!(cv.abs() < 1e-12, "cv = {cv}");
    }

    #[test]
    fn thread_time_cv_matches_population_formula() {
        // Times 1ms and 3ms: mean 2ms, population stddev 1ms -> CV 0.5.
        let cv = result_with_thread_ms(&[1, 3]).thread_time_cv();
        assert!((cv - 0.5).abs() < 1e-9, "cv = {cv}");
    }

    #[test]
    fn derived_metrics_are_zero_for_empty_result() {
        let r = result_with_thread_ms(&[]);
        assert_eq!(r.ops_per_sec(), 0.0);
        assert_eq!(r.thread_time_cv(), 0.0);
    }
}