1#![cfg_attr(docsrs, feature(doc_cfg))]
47#![warn(missing_docs)]
48#![warn(rust_2018_idioms)]
49
50use std::sync::Arc;
51use std::time::{Duration, Instant};
52
53use dev_report::{CheckResult, Evidence, Producer, Report, Severity};
54
55pub mod latency;
56pub mod soak;
57
58#[cfg(feature = "system-stats")]
59#[cfg_attr(docsrs, doc(cfg(feature = "system-stats")))]
60pub mod system;
61
62pub use latency::{LatencyStats, LatencyTracker};
63pub use soak::{SoakCheckpoint, SoakResult, SoakRun};
64
/// A unit of work that a stress run invokes over and over.
///
/// Implementors must be `Send + Sync`: [`StressRun::execute`] places one
/// instance behind an `Arc` and calls [`Workload::run_once`] on it from every
/// worker thread, so any state the workload mutates needs its own
/// synchronisation.
pub trait Workload: Send + Sync {
    /// Performs a single iteration of the workload.
    ///
    /// Takes `&self` because the same instance is shared by all worker threads.
    fn run_once(&self);
}
89
/// Builder-style plan for a stress run.
///
/// Start from [`StressRun::new`], adjust the plan with the chaining methods and
/// hand a [`Workload`] to [`StressRun::execute`] to obtain a [`StressResult`].
///
/// The type is `Clone` so that one configured plan can serve as the base for
/// several variants (the chaining methods consume `self`), and `Debug` so that
/// it can be logged like the other public types of this module.
#[derive(Debug, Clone)]
pub struct StressRun {
    /// Label copied into the result of the run.
    name: String,
    /// Total number of iterations, summed over all threads.
    iterations: usize,
    /// Number of worker threads; the builder never stores `0`.
    threads: usize,
    /// Value handed to `LatencyTracker::new` when latency tracking is enabled;
    /// `None` disables tracking.
    track_latency: Option<usize>,
    /// Per-thread pacing target in operations per second; `None` means the
    /// workers run unthrottled.
    target_ops_per_sec_per_thread: Option<f64>,
}
109
110impl StressRun {
111 pub fn new(name: impl Into<String>) -> Self {
113 Self {
114 name: name.into(),
115 iterations: 1_000,
116 threads: 1,
117 track_latency: None,
118 target_ops_per_sec_per_thread: None,
119 }
120 }
121
122 pub fn iterations(mut self, n: usize) -> Self {
124 self.iterations = n;
125 self
126 }
127
128 pub fn threads(mut self, n: usize) -> Self {
130 self.threads = n.max(1);
131 self
132 }
133
134 pub fn track_latency(mut self, rate: usize) -> Self {
150 self.track_latency = Some(rate.max(1));
151 self
152 }
153
154 pub fn target_ops_per_sec(mut self, total_rate: f64) -> Self {
175 if total_rate <= 0.0 {
176 self.target_ops_per_sec_per_thread = None;
177 } else {
178 let per_thread = total_rate / (self.threads.max(1) as f64);
179 self.target_ops_per_sec_per_thread = Some(per_thread);
180 }
181 self
182 }
183
184 pub fn iterations_planned(&self) -> usize {
186 self.iterations
187 }
188
189 pub fn threads_planned(&self) -> usize {
191 self.threads
192 }
193
194 pub fn target_ops_per_sec_per_thread(&self) -> Option<f64> {
196 self.target_ops_per_sec_per_thread
197 }
198
199 pub fn execute<W>(&self, workload: &W) -> StressResult
201 where
202 W: Workload + Clone + 'static,
203 {
204 let per_thread = self.iterations / self.threads;
205 let leftover = self.iterations % self.threads;
206 let started = Instant::now();
207 let mut handles = Vec::with_capacity(self.threads);
208 let workload = Arc::new(workload.clone());
209
210 for t in 0..self.threads {
211 let count = per_thread + if t < leftover { 1 } else { 0 };
212 let w = workload.clone();
213 let track = self.track_latency;
214 let target_rate = self.target_ops_per_sec_per_thread;
215 handles.push(std::thread::spawn(move || {
216 let start = Instant::now();
217 let mut tracker = track.map(LatencyTracker::new);
218 let interval_s = target_rate.map(|r| 1.0 / r);
220 for i in 0..count {
221 if let (Some(interval), idx) = (interval_s, i) {
222 let target = start + Duration::from_secs_f64(interval * idx as f64);
223 let now = Instant::now();
224 if target > now {
225 std::thread::sleep(target - now);
226 }
227 }
228 if let Some(t) = tracker.as_mut() {
229 t.record(i, || w.run_once());
230 } else {
231 w.run_once();
232 }
233 }
234 (start.elapsed(), tracker)
235 }));
236 }
237
238 let mut thread_times = Vec::with_capacity(self.threads);
239 let mut latency_samples: Vec<Duration> = Vec::new();
240 for h in handles {
241 let (elapsed, tracker) = h.join().unwrap();
242 thread_times.push(elapsed);
243 if let Some(t) = tracker {
244 latency_samples.extend(t.into_samples());
245 }
246 }
247 let total_elapsed = started.elapsed();
248
249 StressResult {
250 name: self.name.clone(),
251 iterations: self.iterations,
252 threads: self.threads,
253 total_elapsed,
254 thread_times,
255 latency: if latency_samples.is_empty() {
256 None
257 } else {
258 Some(LatencyStats::from_samples(latency_samples))
259 },
260 }
261 }
262}
263
264#[derive(Debug, Clone)]
281pub struct StressResult {
282 pub name: String,
284 pub iterations: usize,
286 pub threads: usize,
288 pub total_elapsed: Duration,
290 pub thread_times: Vec<Duration>,
292 pub latency: Option<LatencyStats>,
295}
296
297impl StressResult {
298 pub fn ops_per_sec(&self) -> f64 {
300 if self.total_elapsed.is_zero() {
301 return 0.0;
302 }
303 self.iterations as f64 / self.total_elapsed.as_secs_f64()
304 }
305
306 pub fn thread_time_cv(&self) -> f64 {
309 if self.thread_times.is_empty() {
310 return 0.0;
311 }
312 let n = self.thread_times.len() as f64;
313 let mean: f64 = self
314 .thread_times
315 .iter()
316 .map(|d| d.as_secs_f64())
317 .sum::<f64>()
318 / n;
319 if mean == 0.0 {
320 return 0.0;
321 }
322 let var = self
323 .thread_times
324 .iter()
325 .map(|d| (d.as_secs_f64() - mean).powi(2))
326 .sum::<f64>()
327 / n;
328 var.sqrt() / mean
329 }
330
331 pub fn into_check_result(self, baseline_ops_per_sec: Option<f64>) -> CheckResult {
351 self.compare_with_options(&CompareOptions {
352 baseline_ops_per_sec,
353 ..CompareOptions::default()
354 })
355 }
356
357 pub fn compare_with_options(&self, opts: &CompareOptions) -> CheckResult {
384 let name = format!("stress::{}", self.name);
385 let mut evidence = self.numeric_evidence();
386 let mut tags = vec!["stress".to_string()];
387
388 let mut regressions: Vec<String> = Vec::new();
389
390 if let Some(baseline_ops) = opts.baseline_ops_per_sec {
392 evidence.push(Evidence::numeric("baseline_ops_per_sec", baseline_ops));
393 let floor = baseline_ops * (1.0 - opts.ops_drop_pct_threshold / 100.0);
394 if self.ops_per_sec() < floor {
395 regressions.push(format!(
396 "ops_per_sec {:.0} < floor {:.0} ({}% drop allowed)",
397 self.ops_per_sec(),
398 floor,
399 opts.ops_drop_pct_threshold
400 ));
401 }
402 }
403
404 if let (Some(baseline_p99), Some(latency)) = (opts.baseline_p99, &self.latency) {
406 evidence.push(Evidence::numeric(
407 "baseline_p99_ns",
408 baseline_p99.as_nanos() as f64,
409 ));
410 let allowed =
411 baseline_p99.as_nanos() as f64 * (1.0 + opts.p99_regression_pct_threshold / 100.0);
412 if (latency.p99.as_nanos() as f64) > allowed {
413 regressions.push(format!(
414 "p99_ns {} > allowed {:.0} ({}% regression allowed)",
415 latency.p99.as_nanos(),
416 allowed,
417 opts.p99_regression_pct_threshold
418 ));
419 }
420 }
421
422 let detail = self.detail_string();
423
424 if regressions.is_empty() {
425 let mut c = CheckResult::pass(name).with_detail(detail);
427 c.tags = tags;
428 c.evidence = evidence;
429 return c;
430 }
431
432 tags.push("regression".to_string());
433 let combined_detail = format!("{} :: {}", detail, regressions.join("; "));
434 let mut c = CheckResult::fail(name, Severity::Warning).with_detail(combined_detail);
435 c.tags = tags;
436 c.evidence = evidence;
437 c
438 }
439
440 pub fn into_report(self, subject_version: impl Into<String>, opts: &CompareOptions) -> Report {
444 let name = self.name.clone();
445 let check = self.compare_with_options(opts);
446 let mut r = Report::new(name, subject_version).with_producer("dev-stress");
447 r.push(check);
448 r.finish();
449 r
450 }
451
452 fn numeric_evidence(&self) -> Vec<Evidence> {
453 let mut e = vec![
454 Evidence::numeric("iterations", self.iterations as f64),
455 Evidence::numeric("threads", self.threads as f64),
456 Evidence::numeric("ops_per_sec", self.ops_per_sec()),
457 Evidence::numeric("thread_time_cv", self.thread_time_cv()),
458 Evidence::numeric(
459 "total_elapsed_ms",
460 self.total_elapsed.as_secs_f64() * 1000.0,
461 ),
462 ];
463 if let Some(lat) = &self.latency {
464 e.push(Evidence::numeric(
465 "latency_p50_ns",
466 lat.p50.as_nanos() as f64,
467 ));
468 e.push(Evidence::numeric(
469 "latency_p95_ns",
470 lat.p95.as_nanos() as f64,
471 ));
472 e.push(Evidence::numeric(
473 "latency_p99_ns",
474 lat.p99.as_nanos() as f64,
475 ));
476 e.push(Evidence::numeric(
477 "latency_samples",
478 lat.samples_count as f64,
479 ));
480 }
481 e
482 }
483
484 fn detail_string(&self) -> String {
485 let lat = match &self.latency {
486 Some(l) => format!(
487 ", p50={}ns, p95={}ns, p99={}ns",
488 l.p50.as_nanos(),
489 l.p95.as_nanos(),
490 l.p99.as_nanos()
491 ),
492 None => String::new(),
493 };
494 format!(
495 "iterations={}, threads={}, total={:.3}s, ops/sec={:.0}, thread_cv={:.3}{}",
496 self.iterations,
497 self.threads,
498 self.total_elapsed.as_secs_f64(),
499 self.ops_per_sec(),
500 self.thread_time_cv(),
501 lat
502 )
503 }
504}
505
/// Baselines and tolerances used when turning a [`StressResult`] into a check.
///
/// A baseline left at `None` switches the corresponding comparison off.
#[derive(Debug, Clone)]
pub struct CompareOptions {
    /// Reference throughput in operations per second.
    pub baseline_ops_per_sec: Option<f64>,
    /// Largest tolerated throughput drop below the baseline, in percent.
    pub ops_drop_pct_threshold: f64,
    /// Reference p99 latency.
    pub baseline_p99: Option<Duration>,
    /// Largest tolerated p99 increase above the baseline, in percent.
    pub p99_regression_pct_threshold: f64,
}
536
537impl Default for CompareOptions {
538 fn default() -> Self {
539 Self {
540 baseline_ops_per_sec: None,
541 ops_drop_pct_threshold: 10.0,
542 baseline_p99: None,
543 p99_regression_pct_threshold: 20.0,
544 }
545 }
546}
547
548pub struct StressProducer<F>
569where
570 F: Fn() -> StressResult,
571{
572 run: F,
573 subject_version: String,
574 opts: CompareOptions,
575}
576
577impl<F> StressProducer<F>
578where
579 F: Fn() -> StressResult,
580{
581 pub fn new(run: F, subject_version: impl Into<String>, opts: CompareOptions) -> Self {
583 Self {
584 run,
585 subject_version: subject_version.into(),
586 opts,
587 }
588 }
589}
590
591impl<F> Producer for StressProducer<F>
592where
593 F: Fn() -> StressResult,
594{
595 fn produce(&self) -> Report {
596 let result = (self.run)();
597 result.into_report(self.subject_version.clone(), &self.opts)
598 }
599}
600
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    /// Trivial workload; `black_box` keeps the addition from being optimised
    /// away entirely.
    #[derive(Clone)]
    struct Noop;

    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    #[test]
    fn run_completes() {
        let result = StressRun::new("noop")
            .iterations(1_000)
            .threads(2)
            .execute(&Noop);
        assert_eq!(result.iterations, 1_000);
        assert_eq!(result.threads, 2);
        assert!(result.ops_per_sec() > 0.0);
    }

    #[test]
    fn no_baseline_passes() {
        let result = StressRun::new("noop")
            .iterations(100)
            .threads(1)
            .execute(&Noop);
        let check = result.into_check_result(None);
        assert_eq!(check.verdict, Verdict::Pass);
        assert!(check.has_tag("stress"));
    }

    #[test]
    fn check_result_has_numeric_evidence() {
        let result = StressRun::new("noop")
            .iterations(100)
            .threads(2)
            .execute(&Noop);
        let check = result.into_check_result(None);
        let labels: Vec<&str> = check
            .evidence
            .iter()
            .map(|item| item.label.as_str())
            .collect();
        assert!(labels.contains(&"iterations"));
        assert!(labels.contains(&"threads"));
        assert!(labels.contains(&"ops_per_sec"));
        assert!(labels.contains(&"thread_time_cv"));
        assert!(labels.contains(&"total_elapsed_ms"));
    }

    #[test]
    fn ops_drop_below_threshold_fails() {
        let result = StressRun::new("x").iterations(50).threads(1).execute(&Noop);
        // A baseline a hundred times the measured throughput is far outside
        // the 10 % tolerance.
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(result.ops_per_sec() * 100.0),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let check = result.compare_with_options(&opts);
        assert_eq!(check.verdict, Verdict::Fail);
        assert!(check.has_tag("regression"));
    }

    #[test]
    fn ops_within_threshold_passes() {
        let result = StressRun::new("x").iterations(50).threads(1).execute(&Noop);
        // Comparing the run against its own throughput can never regress.
        let opts = CompareOptions {
            baseline_ops_per_sec: Some(result.ops_per_sec()),
            ops_drop_pct_threshold: 10.0,
            ..CompareOptions::default()
        };
        let check = result.compare_with_options(&opts);
        assert_eq!(check.verdict, Verdict::Pass);
    }

    #[test]
    fn latency_tracking_records_percentiles() {
        let result = StressRun::new("hot")
            .iterations(1_000)
            .threads(2)
            .track_latency(1)
            .execute(&Noop);
        let stats = result.latency.expect("latency tracked");
        assert!(stats.samples_count > 0);
        assert!(stats.p50.as_nanos() <= stats.p95.as_nanos());
        assert!(stats.p95.as_nanos() <= stats.p99.as_nanos());
    }

    #[test]
    fn p99_regression_detected() {
        let result = StressRun::new("hot")
            .iterations(200)
            .threads(2)
            .track_latency(1)
            .execute(&Noop);
        let opts = CompareOptions {
            baseline_p99: Some(Duration::from_nanos(1)),
            p99_regression_pct_threshold: 0.0,
            ..CompareOptions::default()
        };
        let check = result.compare_with_options(&opts);
        // Only asserted when the measured p99 really exceeds the 1 ns baseline;
        // presumably this guards against clocks too coarse to time a no-op.
        let measured_p99 = result.latency.as_ref().unwrap().p99.as_nanos();
        if measured_p99 > 1 {
            assert_eq!(check.verdict, Verdict::Fail);
            assert!(check.has_tag("regression"));
        }
    }

    #[test]
    fn into_report_emits_one_check() {
        let result = StressRun::new("noop")
            .iterations(100)
            .threads(2)
            .execute(&Noop);
        let report = result.into_report("0.1.0", &CompareOptions::default());
        assert_eq!(report.checks.len(), 1);
        assert_eq!(report.producer.as_deref(), Some("dev-stress"));
        assert_eq!(report.subject, "noop");
    }

    #[test]
    fn stress_producer_implements_producer_trait() {
        let run_once = || {
            StressRun::new("hot")
                .iterations(50)
                .threads(1)
                .execute(&Noop)
        };
        let producer = StressProducer::new(run_once, "0.1.0", CompareOptions::default());
        let report = producer.produce();
        assert_eq!(report.checks.len(), 1);
    }

    #[test]
    fn iterations_distribute_evenly_with_leftover() {
        // 7 iterations do not divide evenly over 3 threads.
        let result = StressRun::new("x").iterations(7).threads(3).execute(&Noop);
        assert_eq!(result.iterations, 7);
        assert_eq!(result.thread_times.len(), 3);
    }

    #[test]
    fn target_ops_per_sec_divides_across_threads() {
        let plan = StressRun::new("x")
            .iterations(100)
            .threads(4)
            .target_ops_per_sec(1000.0);
        assert_eq!(plan.target_ops_per_sec_per_thread(), Some(250.0));
    }

    #[test]
    fn target_ops_per_sec_zero_or_negative_disables() {
        for rate in [0.0, -5.0] {
            let plan = StressRun::new("x")
                .iterations(10)
                .threads(2)
                .target_ops_per_sec(rate);
            assert_eq!(plan.target_ops_per_sec_per_thread(), None);
        }
    }

    #[test]
    fn rate_limited_run_takes_at_least_target_duration() {
        let begin = Instant::now();
        // At 100 ops/s the 50 iterations are scheduled 10 ms apart, so the last
        // one is due about 490 ms in; the 100 ms bound is deliberately loose.
        let _ = StressRun::new("x")
            .iterations(50)
            .threads(1)
            .target_ops_per_sec(100.0)
            .execute(&Noop);
        let elapsed = begin.elapsed();
        assert!(elapsed >= Duration::from_millis(100));
    }
}