1#![cfg_attr(docsrs, feature(doc_cfg))]
47#![warn(missing_docs)]
48#![warn(rust_2018_idioms)]
49
50use std::sync::Arc;
51use std::time::{Duration, Instant};
52
53use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
54
55pub mod latency;
56pub mod soak;
57
58#[cfg(feature = "system-stats")]
59#[cfg_attr(docsrs, doc(cfg(feature = "system-stats")))]
60pub mod system;
61
62pub use latency::{LatencyStats, LatencyTracker};
63pub use soak::{SoakCheckpoint, SoakResult, SoakRun};
64
/// A unit of work that a [`StressRun`] executes repeatedly.
///
/// Implementors must be `Send + Sync` because [`StressRun::execute`] shares
/// a single instance between all of its worker threads.
pub trait Workload: Send + Sync {
    /// Performs one iteration of the workload.
    ///
    /// Called once per planned iteration, possibly from several threads at
    /// the same time.
    fn run_once(&self);
}
89
/// Builder-style configuration for a stress run.
///
/// Holds the run name, the total iteration count, the worker thread count
/// and the optional latency-tracking setting.
#[derive(Debug, Clone)]
pub struct StressRun {
    /// Name of the run; copied into the result it produces.
    name: String,
    /// Total number of iterations, summed across every worker thread.
    iterations: usize,
    /// Number of worker threads; the builder never stores a value below 1.
    threads: usize,
    /// `Some(rate)` enables latency tracking, `None` disables it. The rate is
    /// handed to the latency tracker unchanged.
    track_latency: Option<usize>,
}
107
108impl StressRun {
109 pub fn new(name: impl Into<String>) -> Self {
111 Self {
112 name: name.into(),
113 iterations: 1_000,
114 threads: 1,
115 track_latency: None,
116 }
117 }
118
119 pub fn iterations(mut self, n: usize) -> Self {
121 self.iterations = n;
122 self
123 }
124
125 pub fn threads(mut self, n: usize) -> Self {
127 self.threads = n.max(1);
128 self
129 }
130
131 pub fn track_latency(mut self, rate: usize) -> Self {
147 self.track_latency = Some(rate.max(1));
148 self
149 }
150
151 pub fn iterations_planned(&self) -> usize {
153 self.iterations
154 }
155
156 pub fn threads_planned(&self) -> usize {
158 self.threads
159 }
160
161 pub fn execute<W>(&self, workload: &W) -> StressResult
163 where
164 W: Workload + Clone + 'static,
165 {
166 let per_thread = self.iterations / self.threads;
167 let leftover = self.iterations % self.threads;
168 let started = Instant::now();
169 let mut handles = Vec::with_capacity(self.threads);
170 let workload = Arc::new(workload.clone());
171
172 for t in 0..self.threads {
173 let count = per_thread + if t < leftover { 1 } else { 0 };
174 let w = workload.clone();
175 let track = self.track_latency;
176 handles.push(std::thread::spawn(move || {
177 let start = Instant::now();
178 let mut tracker = track.map(LatencyTracker::new);
179 for i in 0..count {
180 if let Some(t) = tracker.as_mut() {
181 t.record(i, || w.run_once());
182 } else {
183 w.run_once();
184 }
185 }
186 (start.elapsed(), tracker)
187 }));
188 }
189
190 let mut thread_times = Vec::with_capacity(self.threads);
191 let mut latency_samples: Vec<Duration> = Vec::new();
192 for h in handles {
193 let (elapsed, tracker) = h.join().unwrap();
194 thread_times.push(elapsed);
195 if let Some(t) = tracker {
196 latency_samples.extend(t.into_samples());
197 }
198 }
199 let total_elapsed = started.elapsed();
200
201 StressResult {
202 name: self.name.clone(),
203 iterations: self.iterations,
204 threads: self.threads,
205 total_elapsed,
206 thread_times,
207 latency: if latency_samples.is_empty() {
208 None
209 } else {
210 Some(LatencyStats::from_samples(latency_samples))
211 },
212 }
213 }
214}
215
/// Outcome of a single [`StressRun::execute`] call.
#[derive(Debug, Clone)]
pub struct StressResult {
    /// Name of the run that produced this result.
    pub name: String,
    /// Total number of iterations that were planned for the run.
    pub iterations: usize,
    /// Number of worker threads used.
    pub threads: usize,
    /// Wall-clock time from just before the workers were spawned until the
    /// last one was joined.
    pub total_elapsed: Duration,
    /// Elapsed time measured inside each worker, in spawn order.
    pub thread_times: Vec<Duration>,
    /// Latency statistics; `None` when no latency samples were collected.
    pub latency: Option<LatencyStats>,
}
248
249impl StressResult {
250 pub fn ops_per_sec(&self) -> f64 {
252 if self.total_elapsed.is_zero() {
253 return 0.0;
254 }
255 self.iterations as f64 / self.total_elapsed.as_secs_f64()
256 }
257
258 pub fn thread_time_cv(&self) -> f64 {
261 if self.thread_times.is_empty() {
262 return 0.0;
263 }
264 let n = self.thread_times.len() as f64;
265 let mean: f64 = self
266 .thread_times
267 .iter()
268 .map(|d| d.as_secs_f64())
269 .sum::<f64>()
270 / n;
271 if mean == 0.0 {
272 return 0.0;
273 }
274 let var = self
275 .thread_times
276 .iter()
277 .map(|d| (d.as_secs_f64() - mean).powi(2))
278 .sum::<f64>()
279 / n;
280 var.sqrt() / mean
281 }
282
283 pub fn into_check_result(self, baseline_ops_per_sec: Option<f64>) -> CheckResult {
303 self.compare_with_options(&CompareOptions {
304 baseline_ops_per_sec,
305 ..CompareOptions::default()
306 })
307 }
308
309 pub fn compare_with_options(&self, opts: &CompareOptions) -> CheckResult {
336 let name = format!("stress::{}", self.name);
337 let mut evidence = self.numeric_evidence();
338 let mut tags = vec!["stress".to_string()];
339
340 let mut regressions: Vec<String> = Vec::new();
341
342 if let Some(baseline_ops) = opts.baseline_ops_per_sec {
344 evidence.push(Evidence::numeric("baseline_ops_per_sec", baseline_ops));
345 let floor = baseline_ops * (1.0 - opts.ops_drop_pct_threshold / 100.0);
346 if self.ops_per_sec() < floor {
347 regressions.push(format!(
348 "ops_per_sec {:.0} < floor {:.0} ({}% drop allowed)",
349 self.ops_per_sec(),
350 floor,
351 opts.ops_drop_pct_threshold
352 ));
353 }
354 }
355
356 if let (Some(baseline_p99), Some(latency)) = (opts.baseline_p99, &self.latency) {
358 evidence.push(Evidence::numeric(
359 "baseline_p99_ns",
360 baseline_p99.as_nanos() as f64,
361 ));
362 let allowed =
363 baseline_p99.as_nanos() as f64 * (1.0 + opts.p99_regression_pct_threshold / 100.0);
364 if (latency.p99.as_nanos() as f64) > allowed {
365 regressions.push(format!(
366 "p99_ns {} > allowed {:.0} ({}% regression allowed)",
367 latency.p99.as_nanos(),
368 allowed,
369 opts.p99_regression_pct_threshold
370 ));
371 }
372 }
373
374 let detail = self.detail_string();
375
376 if regressions.is_empty() {
377 let mut c = CheckResult::pass(name).with_detail(detail);
379 c.tags = tags;
380 c.evidence = evidence;
381 return c;
382 }
383
384 tags.push("regression".to_string());
385 let combined_detail = format!("{} :: {}", detail, regressions.join("; "));
386 let mut c = CheckResult::fail(name, Severity::Warning).with_detail(combined_detail);
387 c.tags = tags;
388 c.evidence = evidence;
389 c
390 }
391
392 pub fn into_report(self, subject_version: impl Into<String>, opts: &CompareOptions) -> Report {
396 let name = self.name.clone();
397 let check = self.compare_with_options(opts);
398 let mut r = Report::new(name, subject_version).with_producer("dev-stress");
399 r.push(check);
400 r.finish();
401 r
402 }
403
404 fn numeric_evidence(&self) -> Vec<Evidence> {
405 let mut e = vec![
406 Evidence::numeric("iterations", self.iterations as f64),
407 Evidence::numeric("threads", self.threads as f64),
408 Evidence::numeric("ops_per_sec", self.ops_per_sec()),
409 Evidence::numeric("thread_time_cv", self.thread_time_cv()),
410 Evidence::numeric(
411 "total_elapsed_ms",
412 self.total_elapsed.as_secs_f64() * 1000.0,
413 ),
414 ];
415 if let Some(lat) = &self.latency {
416 e.push(Evidence::numeric(
417 "latency_p50_ns",
418 lat.p50.as_nanos() as f64,
419 ));
420 e.push(Evidence::numeric(
421 "latency_p95_ns",
422 lat.p95.as_nanos() as f64,
423 ));
424 e.push(Evidence::numeric(
425 "latency_p99_ns",
426 lat.p99.as_nanos() as f64,
427 ));
428 e.push(Evidence::numeric(
429 "latency_samples",
430 lat.samples_count as f64,
431 ));
432 }
433 e
434 }
435
436 fn detail_string(&self) -> String {
437 let lat = match &self.latency {
438 Some(l) => format!(
439 ", p50={}ns, p95={}ns, p99={}ns",
440 l.p50.as_nanos(),
441 l.p95.as_nanos(),
442 l.p99.as_nanos()
443 ),
444 None => String::new(),
445 };
446 format!(
447 "iterations={}, threads={}, total={:.3}s, ops/sec={:.0}, thread_cv={:.3}{}",
448 self.iterations,
449 self.threads,
450 self.total_elapsed.as_secs_f64(),
451 self.ops_per_sec(),
452 self.thread_time_cv(),
453 lat
454 )
455 }
456}
457
/// Baselines and tolerances used when comparing a stress result.
#[derive(Debug, Clone)]
pub struct CompareOptions {
    /// Baseline throughput in operations per second. `None` skips the
    /// throughput comparison.
    pub baseline_ops_per_sec: Option<f64>,
    /// Allowed throughput drop below the baseline, as a percentage.
    pub ops_drop_pct_threshold: f64,
    /// Baseline p99 latency. `None` skips the latency comparison.
    pub baseline_p99: Option<Duration>,
    /// Allowed p99 latency increase above the baseline, as a percentage.
    pub p99_regression_pct_threshold: f64,
}

impl Default for CompareOptions {
    /// No baselines set; tolerances of 10% for throughput drop and 20% for
    /// p99 latency regression.
    fn default() -> Self {
        let (ops_drop_pct_threshold, p99_regression_pct_threshold) = (10.0, 20.0);
        Self {
            baseline_ops_per_sec: None,
            baseline_p99: None,
            ops_drop_pct_threshold,
            p99_regression_pct_threshold,
        }
    }
}
499
/// Adapter that exposes a stress run through the [`Producer`] trait.
///
/// Each call to `produce` invokes the stored closure to obtain a fresh
/// [`StressResult`] and converts it into a [`Report`].
pub struct StressProducer<F>
where
    F: Fn() -> StressResult,
{
    /// Closure that performs the run and returns its result.
    run: F,
    /// Version string passed on to every report.
    subject_version: String,
    /// Comparison options applied to every result.
    opts: CompareOptions,
}
528
529impl<F> StressProducer<F>
530where
531 F: Fn() -> StressResult,
532{
533 pub fn new(run: F, subject_version: impl Into<String>, opts: CompareOptions) -> Self {
535 Self {
536 run,
537 subject_version: subject_version.into(),
538 opts,
539 }
540 }
541}
542
543impl<F> Producer for StressProducer<F>
544where
545 F: Fn() -> StressResult,
546{
547 fn produce(&self) -> Report {
548 let result = (self.run)();
549 result.into_report(self.subject_version.clone(), &self.opts)
550 }
551}
552
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal workload; `black_box` keeps the addition from being removed.
    #[derive(Clone)]
    struct Noop;
    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    /// Workload that counts its invocations. The counter sits behind an `Arc`
    /// so the clone made by `execute` still updates the caller's counter.
    #[derive(Clone)]
    struct Counting(Arc<AtomicUsize>);
    impl Workload for Counting {
        fn run_once(&self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn run_completes() {
        let run = StressRun::new("noop").iterations(1_000).threads(2);
        let r = run.execute(&Noop);
        assert_eq!(r.iterations, 1_000);
        assert_eq!(r.threads, 2);
        assert!(r.ops_per_sec() > 0.0);
    }

    #[test]
    fn no_baseline_passes() {
        let run = StressRun::new("noop").iterations(100).threads(1);
        let r = run.execute(&Noop);
        let c = r.into_check_result(None);
        assert_eq!(c.verdict, Verdict::Pass);
        assert!(c.has_tag("stress"));
    }

    #[test]
    fn check_result_has_numeric_evidence() {
        let run = StressRun::new("noop").iterations(100).threads(2);
        let r = run.execute(&Noop);
        let c = r.into_check_result(None);
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        assert!(labels.contains(&"iterations"));
        assert!(labels.contains(&"threads"));
        assert!(labels.contains(&"ops_per_sec"));
        assert!(labels.contains(&"thread_time_cv"));
        assert!(labels.contains(&"total_elapsed_ms"));
    }

    #[test]
    fn ops_drop_below_threshold_fails() {
        let run = StressRun::new("x").iterations(50).threads(1);
        let r = run.execute(&Noop);
        // A baseline 100x faster than the measurement is far outside the
        // 10% tolerance.
        let baseline = r.ops_per_sec() * 100.0;
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Fail);
        assert!(c.has_tag("regression"));
    }

    #[test]
    fn ops_within_threshold_passes() {
        let run = StressRun::new("x").iterations(50).threads(1);
        let r = run.execute(&Noop);
        // A baseline equal to the measurement is no drop at all.
        let baseline = r.ops_per_sec();
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(baseline),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        assert_eq!(c.verdict, Verdict::Pass);
    }

    #[test]
    fn latency_tracking_records_percentiles() {
        let run = StressRun::new("hot")
            .iterations(1_000)
            .threads(2)
            .track_latency(1);
        let r = run.execute(&Noop);
        let lat = r.latency.expect("latency tracked");
        assert!(lat.samples_count > 0);
        assert!(lat.p50.as_nanos() <= lat.p95.as_nanos());
        assert!(lat.p95.as_nanos() <= lat.p99.as_nanos());
    }

    #[test]
    fn p99_regression_detected() {
        let run = StressRun::new("hot")
            .iterations(200)
            .threads(2)
            .track_latency(1);
        let r = run.execute(&Noop);
        let opts = CompareOptions {
            baseline_p99: Some(Duration::from_nanos(1)),
            p99_regression_pct_threshold: 0.0,
            ..CompareOptions::default()
        };
        let c = r.compare_with_options(&opts);
        // The no-op can be measured at 1ns or less on some clocks, so the
        // regression is asserted only when the measured p99 exceeds the
        // baseline.
        if r.latency.as_ref().unwrap().p99.as_nanos() > 1 {
            assert_eq!(c.verdict, Verdict::Fail);
            assert!(c.has_tag("regression"));
        }
    }

    #[test]
    fn into_report_emits_one_check() {
        let run = StressRun::new("noop").iterations(100).threads(2);
        let r = run.execute(&Noop);
        let report = r.into_report("0.1.0", &CompareOptions::default());
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.producer.as_deref(), Some("dev-stress"));
        assert_eq!(report.subject, "noop");
    }

    #[test]
    fn stress_producer_implements_producer_trait() {
        let producer = StressProducer::new(
            || {
                StressRun::new("hot")
                    .iterations(50)
                    .threads(1)
                    .execute(&Noop)
            },
            "0.1.0",
            CompareOptions::default(),
        );
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn iterations_distribute_evenly_with_leftover() {
        // 7 iterations over 3 threads splits as 3 + 2 + 2. Counting real
        // invocations shows the remainder is neither dropped nor duplicated.
        let calls = Arc::new(AtomicUsize::new(0));
        let run = StressRun::new("x").iterations(7).threads(3);
        let r = run.execute(&Counting(Arc::clone(&calls)));
        assert_eq!(r.iterations, 7);
        assert_eq!(r.thread_times.len(), 3);
        assert_eq!(calls.load(Ordering::SeqCst), 7);
    }
}