1#![cfg_attr(docsrs, feature(doc_cfg))]
46#![warn(missing_docs)]
47#![warn(rust_2018_idioms)]
48
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51
52use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
53
54pub mod latency;
55pub mod soak;
56
57#[cfg(feature = "system-stats")]
58#[cfg_attr(docsrs, doc(cfg(feature = "system-stats")))]
59pub mod system;
60
61pub use latency::{LatencyStats, LatencyTracker};
62pub use soak::{SoakCheckpoint, SoakResult, SoakRun};
63
/// A repeatable unit of work driven by a stress run.
///
/// The `Send + Sync` supertraits are required because [`StressRun::execute`]
/// wraps the workload in an `Arc` and invokes it from every worker thread
/// through a shared reference.
pub trait Workload: Sync + Send {
    /// Performs one iteration of the workload.
    ///
    /// The receiver is `&self`, so any state mutated here needs interior
    /// mutability that is safe to use from several threads at once.
    fn run_once(&self);
}
88
/// Builder-style description of a stress run.
///
/// A value is created with `StressRun::new`, adjusted through the chained
/// setters, and then run with `execute`. The type is `Clone` so one
/// configuration can be reused for several runs, and `Debug` so it can be
/// logged or shown in assertion output.
#[derive(Debug, Clone)]
pub struct StressRun {
    /// Human-readable name copied into the result of every execution.
    name: String,
    /// Total number of workload invocations, summed over all threads.
    iterations: usize,
    /// Number of worker threads; the `threads` setter keeps this at least 1.
    threads: usize,
    /// `Some(rate)` when latency tracking is enabled, `None` otherwise.
    track_latency: Option<usize>,
}
106
107impl StressRun {
108 pub fn new(name: impl Into<String>) -> Self {
110 Self {
111 name: name.into(),
112 iterations: 1_000,
113 threads: 1,
114 track_latency: None,
115 }
116 }
117
118 pub fn iterations(mut self, n: usize) -> Self {
120 self.iterations = n;
121 self
122 }
123
124 pub fn threads(mut self, n: usize) -> Self {
126 self.threads = n.max(1);
127 self
128 }
129
130 pub fn track_latency(mut self, rate: usize) -> Self {
146 self.track_latency = Some(rate.max(1));
147 self
148 }
149
150 pub fn iterations_planned(&self) -> usize {
152 self.iterations
153 }
154
155 pub fn threads_planned(&self) -> usize {
157 self.threads
158 }
159
160 pub fn execute<W>(&self, workload: &W) -> StressResult
162 where
163 W: Workload + Clone + 'static,
164 {
165 let per_thread = self.iterations / self.threads;
166 let leftover = self.iterations % self.threads;
167 let started = Instant::now();
168 let mut handles = Vec::with_capacity(self.threads);
169 let workload = Arc::new(workload.clone());
170
171 for t in 0..self.threads {
172 let count = per_thread + if t < leftover { 1 } else { 0 };
173 let w = workload.clone();
174 let track = self.track_latency;
175 handles.push(std::thread::spawn(move || {
176 let start = Instant::now();
177 let mut tracker = track.map(LatencyTracker::new);
178 for i in 0..count {
179 if let Some(t) = tracker.as_mut() {
180 t.record(i, || w.run_once());
181 } else {
182 w.run_once();
183 }
184 }
185 (start.elapsed(), tracker)
186 }));
187 }
188
189 let mut thread_times = Vec::with_capacity(self.threads);
190 let mut latency_samples: Vec<Duration> = Vec::new();
191 for h in handles {
192 let (elapsed, tracker) = h.join().unwrap();
193 thread_times.push(elapsed);
194 if let Some(t) = tracker {
195 latency_samples.extend(t.into_samples());
196 }
197 }
198 let total_elapsed = started.elapsed();
199
200 StressResult {
201 name: self.name.clone(),
202 iterations: self.iterations,
203 threads: self.threads,
204 total_elapsed,
205 thread_times,
206 latency: if latency_samples.is_empty() {
207 None
208 } else {
209 Some(LatencyStats::from_samples(latency_samples))
210 },
211 }
212 }
213}
214
215#[derive(Debug, Clone)]
232pub struct StressResult {
233 pub name: String,
235 pub iterations: usize,
237 pub threads: usize,
239 pub total_elapsed: Duration,
241 pub thread_times: Vec<Duration>,
243 pub latency: Option<LatencyStats>,
246}
247
248impl StressResult {
249 pub fn ops_per_sec(&self) -> f64 {
251 if self.total_elapsed.is_zero() {
252 return 0.0;
253 }
254 self.iterations as f64 / self.total_elapsed.as_secs_f64()
255 }
256
257 pub fn thread_time_cv(&self) -> f64 {
260 if self.thread_times.is_empty() {
261 return 0.0;
262 }
263 let n = self.thread_times.len() as f64;
264 let mean: f64 = self
265 .thread_times
266 .iter()
267 .map(|d| d.as_secs_f64())
268 .sum::<f64>()
269 / n;
270 if mean == 0.0 {
271 return 0.0;
272 }
273 let var = self
274 .thread_times
275 .iter()
276 .map(|d| (d.as_secs_f64() - mean).powi(2))
277 .sum::<f64>()
278 / n;
279 var.sqrt() / mean
280 }
281
282 pub fn into_check_result(self, baseline_ops_per_sec: Option<f64>) -> CheckResult {
302 self.compare_with_options(&CompareOptions {
303 baseline_ops_per_sec,
304 ..CompareOptions::default()
305 })
306 }
307
308 pub fn compare_with_options(&self, opts: &CompareOptions) -> CheckResult {
335 let name = format!("stress::{}", self.name);
336 let mut evidence = self.numeric_evidence();
337 let mut tags = vec!["stress".to_string()];
338
339 let mut regressions: Vec<String> = Vec::new();
340
341 if let Some(baseline_ops) = opts.baseline_ops_per_sec {
343 evidence.push(Evidence::numeric("baseline_ops_per_sec", baseline_ops));
344 let floor = baseline_ops * (1.0 - opts.ops_drop_pct_threshold / 100.0);
345 if self.ops_per_sec() < floor {
346 regressions.push(format!(
347 "ops_per_sec {:.0} < floor {:.0} ({}% drop allowed)",
348 self.ops_per_sec(),
349 floor,
350 opts.ops_drop_pct_threshold
351 ));
352 }
353 }
354
355 if let (Some(baseline_p99), Some(latency)) = (opts.baseline_p99, &self.latency) {
357 evidence.push(Evidence::numeric(
358 "baseline_p99_ns",
359 baseline_p99.as_nanos() as f64,
360 ));
361 let allowed =
362 baseline_p99.as_nanos() as f64 * (1.0 + opts.p99_regression_pct_threshold / 100.0);
363 if (latency.p99.as_nanos() as f64) > allowed {
364 regressions.push(format!(
365 "p99_ns {} > allowed {:.0} ({}% regression allowed)",
366 latency.p99.as_nanos(),
367 allowed,
368 opts.p99_regression_pct_threshold
369 ));
370 }
371 }
372
373 let detail = self.detail_string();
374
375 if regressions.is_empty() {
376 let mut c = CheckResult::pass(name).with_detail(detail);
378 c.tags = tags;
379 c.evidence = evidence;
380 return c;
381 }
382
383 tags.push("regression".to_string());
384 let combined_detail = format!("{} :: {}", detail, regressions.join("; "));
385 let mut c = CheckResult::fail(name, Severity::Warning).with_detail(combined_detail);
386 c.tags = tags;
387 c.evidence = evidence;
388 c
389 }
390
391 pub fn into_report(self, subject_version: impl Into<String>, opts: &CompareOptions) -> Report {
395 let name = self.name.clone();
396 let check = self.compare_with_options(opts);
397 let mut r = Report::new(name, subject_version).with_producer("dev-stress");
398 r.push(check);
399 r.finish();
400 r
401 }
402
403 fn numeric_evidence(&self) -> Vec<Evidence> {
404 let mut e = vec![
405 Evidence::numeric("iterations", self.iterations as f64),
406 Evidence::numeric("threads", self.threads as f64),
407 Evidence::numeric("ops_per_sec", self.ops_per_sec()),
408 Evidence::numeric("thread_time_cv", self.thread_time_cv()),
409 Evidence::numeric(
410 "total_elapsed_ms",
411 self.total_elapsed.as_secs_f64() * 1000.0,
412 ),
413 ];
414 if let Some(lat) = &self.latency {
415 e.push(Evidence::numeric(
416 "latency_p50_ns",
417 lat.p50.as_nanos() as f64,
418 ));
419 e.push(Evidence::numeric(
420 "latency_p95_ns",
421 lat.p95.as_nanos() as f64,
422 ));
423 e.push(Evidence::numeric(
424 "latency_p99_ns",
425 lat.p99.as_nanos() as f64,
426 ));
427 e.push(Evidence::numeric(
428 "latency_samples",
429 lat.samples_count as f64,
430 ));
431 }
432 e
433 }
434
435 fn detail_string(&self) -> String {
436 let lat = match &self.latency {
437 Some(l) => format!(
438 ", p50={}ns, p95={}ns, p99={}ns",
439 l.p50.as_nanos(),
440 l.p95.as_nanos(),
441 l.p99.as_nanos()
442 ),
443 None => String::new(),
444 };
445 format!(
446 "iterations={}, threads={}, total={:.3}s, ops/sec={:.0}, thread_cv={:.3}{}",
447 self.iterations,
448 self.threads,
449 self.total_elapsed.as_secs_f64(),
450 self.ops_per_sec(),
451 self.thread_time_cv(),
452 lat
453 )
454 }
455}
456
/// Baselines and tolerances used when turning a stress result into a check.
///
/// A comparison is only performed for a metric whose baseline is `Some`.
#[derive(Debug, Clone, PartialEq)]
pub struct CompareOptions {
    /// Baseline throughput in operations per second; `None` skips the
    /// throughput comparison.
    pub baseline_ops_per_sec: Option<f64>,
    /// Allowed throughput drop, in percent of the baseline. The comparison
    /// floor is `baseline * (1 - ops_drop_pct_threshold / 100)`.
    pub ops_drop_pct_threshold: f64,
    /// Baseline p99 latency; `None` skips the latency comparison.
    pub baseline_p99: Option<Duration>,
    /// Allowed p99 increase, in percent of the baseline. The comparison
    /// ceiling is `baseline * (1 + p99_regression_pct_threshold / 100)`.
    pub p99_regression_pct_threshold: f64,
}

impl Default for CompareOptions {
    /// No baselines, a 10% allowed throughput drop, and a 20% allowed p99
    /// regression.
    fn default() -> Self {
        Self {
            baseline_ops_per_sec: None,
            ops_drop_pct_threshold: 10.0,
            baseline_p99: None,
            p99_regression_pct_threshold: 20.0,
        }
    }
}
498
499pub struct StressProducer<F>
520where
521 F: Fn() -> StressResult,
522{
523 run: F,
524 subject_version: String,
525 opts: CompareOptions,
526}
527
528impl<F> StressProducer<F>
529where
530 F: Fn() -> StressResult,
531{
532 pub fn new(run: F, subject_version: impl Into<String>, opts: CompareOptions) -> Self {
534 Self {
535 run,
536 subject_version: subject_version.into(),
537 opts,
538 }
539 }
540}
541
542impl<F> Producer for StressProducer<F>
543where
544 F: Fn() -> StressResult,
545{
546 fn produce(&self) -> Report {
547 let result = (self.run)();
548 result.into_report(self.subject_version.clone(), &self.opts)
549 }
550}
551
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    /// Minimal workload: a single addition passed through `black_box`.
    #[derive(Clone)]
    struct Noop;

    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    /// Executes `Noop` without latency tracking.
    fn noop_result(name: &str, iterations: usize, threads: usize) -> StressResult {
        StressRun::new(name)
            .iterations(iterations)
            .threads(threads)
            .execute(&Noop)
    }

    /// Executes `Noop` with latency tracking at rate 1.
    fn tracked_noop_result(name: &str, iterations: usize, threads: usize) -> StressResult {
        StressRun::new(name)
            .iterations(iterations)
            .threads(threads)
            .track_latency(1)
            .execute(&Noop)
    }

    #[test]
    fn run_completes() {
        let r = noop_result("noop", 1_000, 2);
        assert_eq!(r.iterations, 1_000);
        assert_eq!(r.threads, 2);
        assert!(r.ops_per_sec() > 0.0);
    }

    #[test]
    fn no_baseline_passes() {
        let c = noop_result("noop", 100, 1).into_check_result(None);
        assert_eq!(c.verdict, Verdict::Pass);
        assert!(c.has_tag("stress"));
    }

    #[test]
    fn check_result_has_numeric_evidence() {
        let c = noop_result("noop", 100, 2).into_check_result(None);
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        let expected = [
            "iterations",
            "threads",
            "ops_per_sec",
            "thread_time_cv",
            "total_elapsed_ms",
        ];
        for label in &expected {
            assert!(labels.contains(label));
        }
    }

    #[test]
    fn ops_drop_below_threshold_fails() {
        let r = noop_result("x", 50, 1);
        // A baseline 100x the measured throughput puts the run below the floor.
        let baseline = r.ops_per_sec() * 100.0;
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Fail);
        assert!(c.has_tag("regression"));
    }

    #[test]
    fn ops_within_threshold_passes() {
        let r = noop_result("x", 50, 1);
        // The run is compared against its own throughput.
        let baseline = r.ops_per_sec();
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Pass);
    }

    #[test]
    fn latency_tracking_records_percentiles() {
        let r = tracked_noop_result("hot", 1_000, 2);
        let lat = r.latency.expect("latency tracked");
        assert!(lat.samples_count > 0);
        assert!(lat.p50.as_nanos() <= lat.p95.as_nanos());
        assert!(lat.p95.as_nanos() <= lat.p99.as_nanos());
    }

    #[test]
    fn p99_regression_detected() {
        let r = tracked_noop_result("hot", 200, 2);
        let opts = CompareOptions {
            baseline_p99: Some(Duration::from_nanos(1)),
            p99_regression_pct_threshold: 0.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        // Only assert when the measured p99 actually exceeds the 1 ns baseline.
        let observed_p99 = r.latency.as_ref().unwrap().p99.as_nanos();
        if observed_p99 > 1 {
            assert_eq!(c.verdict, Verdict::Fail);
            assert!(c.has_tag("regression"));
        }
    }

    #[test]
    fn into_report_emits_one_check() {
        let report = noop_result("noop", 100, 2).into_report("0.1.0", &CompareOptions::default());
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.producer.as_deref(), Some("dev-stress"));
        assert_eq!(report.subject, "noop");
    }

    #[test]
    fn stress_producer_implements_producer_trait() {
        let producer = StressProducer::new(
            || noop_result("hot", 50, 1),
            "0.1.0",
            CompareOptions::default(),
        );
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn iterations_distribute_evenly_with_leftover() {
        // 7 iterations over 3 threads: one thread takes the extra iteration.
        let r = noop_result("x", 7, 3);
        assert_eq!(r.iterations, 7);
        assert_eq!(r.thread_times.len(), 3);
    }
}