1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use dev_report::{CheckResult, Evidence, Severity};
14
15use crate::{LatencyStats, LatencyTracker, Workload};
16
/// Configuration for a soak (long-running) stress test.
///
/// Built with [`SoakRun::new`] plus the chainable setters, then run with
/// [`SoakRun::execute`], which produces a [`SoakResult`].
#[derive(Debug, Clone)]
pub struct SoakRun {
    /// Run name; it is embedded in the resulting check name.
    name: String,
    /// How long the workload is kept running.
    total_duration: Duration,
    /// Spacing between throughput checkpoints.
    checkpoint_interval: Duration,
    /// Number of worker threads (the builder clamps this to at least 1).
    threads: usize,
    /// Value handed to `LatencyTracker::new`; `None` disables latency tracking.
    track_latency: Option<usize>,
}
37
38impl SoakRun {
39 pub fn new(name: impl Into<String>) -> Self {
41 Self {
42 name: name.into(),
43 total_duration: Duration::from_secs(60),
44 checkpoint_interval: Duration::from_secs(10),
45 threads: 1,
46 track_latency: None,
47 }
48 }
49
50 pub fn duration(mut self, d: Duration) -> Self {
52 self.total_duration = d;
53 self
54 }
55
56 pub fn checkpoint(mut self, d: Duration) -> Self {
58 self.checkpoint_interval = d;
59 self
60 }
61
62 pub fn threads(mut self, n: usize) -> Self {
64 self.threads = n.max(1);
65 self
66 }
67
68 pub fn track_latency(mut self, rate: usize) -> Self {
70 self.track_latency = Some(rate.max(1));
71 self
72 }
73
74 pub fn execute<W>(&self, workload: &W) -> SoakResult
80 where
81 W: Workload + Clone + 'static,
82 {
83 let stop = Arc::new(AtomicBool::new(false));
84 let total_iters = Arc::new(AtomicUsize::new(0));
85 let workload = Arc::new(workload.clone());
86 let started = Instant::now();
87
88 let mut handles = Vec::with_capacity(self.threads);
90 for _ in 0..self.threads {
91 let w = workload.clone();
92 let stop = stop.clone();
93 let total = total_iters.clone();
94 let track = self.track_latency;
95 handles.push(std::thread::spawn(move || {
96 let start = Instant::now();
97 let mut tracker = track.map(LatencyTracker::new);
98 let mut local_count: usize = 0;
99 while !stop.load(Ordering::Relaxed) {
100 if let Some(t) = tracker.as_mut() {
101 t.record(local_count, || w.run_once());
102 } else {
103 w.run_once();
104 }
105 local_count = local_count.wrapping_add(1);
106 if local_count % 1024 == 0 {
108 total.fetch_add(1024, Ordering::Relaxed);
109 }
110 }
111 let remainder = local_count % 1024;
113 if remainder != 0 {
114 total.fetch_add(remainder, Ordering::Relaxed);
115 }
116 (start.elapsed(), tracker)
117 }));
118 }
119
120 let mut checkpoints: Vec<SoakCheckpoint> = Vec::new();
125 let mut last_iters = 0usize;
126 let mut last_at = started;
127 let end_at = started + self.total_duration;
128 loop {
129 let now = Instant::now();
130 if now >= end_at {
131 break;
132 }
133 let next = (last_at + self.checkpoint_interval).min(end_at);
134 let sleep_for = next.saturating_duration_since(now);
135 std::thread::sleep(sleep_for);
136 let now_iters = total_iters.load(Ordering::Relaxed);
137 let window_iters = now_iters - last_iters;
138 let window_dur = next - last_at;
139 let ops_per_sec = if window_dur.is_zero() {
140 0.0
141 } else {
142 window_iters as f64 / window_dur.as_secs_f64()
143 };
144 checkpoints.push(SoakCheckpoint {
145 at_offset: next - started,
146 window_iters,
147 window_duration: window_dur,
148 ops_per_sec,
149 });
150 last_iters = now_iters;
151 last_at = next;
152 }
153 stop.store(true, Ordering::Relaxed);
154
155 let mut thread_times = Vec::with_capacity(self.threads);
156 let mut latency_samples: Vec<Duration> = Vec::new();
157 for h in handles {
158 let (elapsed, tracker) = h.join().unwrap();
159 thread_times.push(elapsed);
160 if let Some(t) = tracker {
161 latency_samples.extend(t.into_samples());
162 }
163 }
164 let total_elapsed = started.elapsed();
165 let total_iters_final = total_iters.load(Ordering::Relaxed);
166
167 SoakResult {
168 name: self.name.clone(),
169 iterations: total_iters_final,
170 threads: self.threads,
171 total_elapsed,
172 thread_times,
173 latency: if latency_samples.is_empty() {
174 None
175 } else {
176 Some(LatencyStats::from_samples(latency_samples))
177 },
178 checkpoints,
179 }
180 }
181}
182
/// One throughput sample taken during a soak run.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct SoakCheckpoint {
    /// Time since the start of the run at which this window closed.
    pub at_offset: Duration,
    /// Iterations completed by all workers during this window.
    pub window_iters: usize,
    /// Length of this window.
    pub window_duration: Duration,
    /// `window_iters / window_duration`, in iterations per second.
    pub ops_per_sec: f64,
}
195
196#[derive(Debug, Clone)]
216pub struct SoakResult {
217 pub name: String,
219 pub iterations: usize,
221 pub threads: usize,
223 pub total_elapsed: Duration,
225 pub thread_times: Vec<Duration>,
227 pub latency: Option<LatencyStats>,
229 pub checkpoints: Vec<SoakCheckpoint>,
231}
232
233impl SoakResult {
234 pub fn ops_per_sec(&self) -> f64 {
236 if self.total_elapsed.is_zero() {
237 return 0.0;
238 }
239 self.iterations as f64 / self.total_elapsed.as_secs_f64()
240 }
241
242 pub fn checkpoint_ops_cv(&self) -> f64 {
247 if self.checkpoints.len() < 2 {
248 return 0.0;
249 }
250 let n = self.checkpoints.len() as f64;
251 let mean: f64 = self.checkpoints.iter().map(|c| c.ops_per_sec).sum::<f64>() / n;
252 if mean == 0.0 {
253 return 0.0;
254 }
255 let var = self
256 .checkpoints
257 .iter()
258 .map(|c| (c.ops_per_sec - mean).powi(2))
259 .sum::<f64>()
260 / n;
261 var.sqrt() / mean
262 }
263
264 pub fn into_check_result(self, degradation_pct_threshold: f64) -> CheckResult {
277 let name = format!("stress::soak::{}", self.name);
278 let mut evidence = vec![
279 Evidence::numeric("iterations", self.iterations as f64),
280 Evidence::numeric("threads", self.threads as f64),
281 Evidence::numeric("ops_per_sec", self.ops_per_sec()),
282 Evidence::numeric(
283 "total_elapsed_ms",
284 self.total_elapsed.as_secs_f64() * 1000.0,
285 ),
286 Evidence::numeric("checkpoint_count", self.checkpoints.len() as f64),
287 Evidence::numeric("checkpoint_ops_cv", self.checkpoint_ops_cv()),
288 ];
289 if let Some(lat) = &self.latency {
290 evidence.push(Evidence::numeric(
291 "latency_p50_ns",
292 lat.p50.as_nanos() as f64,
293 ));
294 evidence.push(Evidence::numeric(
295 "latency_p95_ns",
296 lat.p95.as_nanos() as f64,
297 ));
298 evidence.push(Evidence::numeric(
299 "latency_p99_ns",
300 lat.p99.as_nanos() as f64,
301 ));
302 }
303 let tags = vec!["stress".to_string(), "soak".to_string()];
304
305 if self.checkpoints.len() < 2 {
306 let mut c = CheckResult::skip(name).with_detail(format!(
307 "fewer than 2 checkpoints (got {})",
308 self.checkpoints.len()
309 ));
310 c.tags = tags;
311 c.evidence = evidence;
312 return c;
313 }
314
315 let mid = self.checkpoints.len() / 2;
316 let first_half_mean = mean_ops(&self.checkpoints[..mid]);
317 let second_half_mean = mean_ops(&self.checkpoints[mid..]);
318 evidence.push(Evidence::numeric("first_half_ops", first_half_mean));
319 evidence.push(Evidence::numeric("second_half_ops", second_half_mean));
320
321 if first_half_mean == 0.0 {
322 let mut c = CheckResult::pass(name)
323 .with_detail("first-half throughput was zero, skipping degradation check");
324 c.tags = tags;
325 c.evidence = evidence;
326 return c;
327 }
328
329 let drop_pct = ((first_half_mean - second_half_mean) / first_half_mean) * 100.0;
330 let detail = format!(
331 "iterations={} elapsed={:.3}s ops/sec={:.0} checkpoints={} first_half_ops={:.0} second_half_ops={:.0} drop={:.2}%",
332 self.iterations,
333 self.total_elapsed.as_secs_f64(),
334 self.ops_per_sec(),
335 self.checkpoints.len(),
336 first_half_mean,
337 second_half_mean,
338 drop_pct
339 );
340
341 if drop_pct > degradation_pct_threshold {
342 let mut tags = tags;
343 tags.push("regression".to_string());
344 let mut c = CheckResult::fail(name, Severity::Warning).with_detail(detail);
345 c.tags = tags;
346 c.evidence = evidence;
347 c
348 } else {
349 let mut c = CheckResult::pass(name).with_detail(detail);
350 c.tags = tags;
351 c.evidence = evidence;
352 c
353 }
354 }
355}
356
357fn mean_ops(checkpoints: &[SoakCheckpoint]) -> f64 {
358 if checkpoints.is_empty() {
359 return 0.0;
360 }
361 checkpoints.iter().map(|c| c.ops_per_sec).sum::<f64>() / checkpoints.len() as f64
362}
363
// Wall-clock smoke tests for the soak runner. Run lengths are tens of
// milliseconds to keep the suite fast, so assertions are deliberately loose.
#[cfg(test)]
mod tests {
    use super::*;
    use dev_report::Verdict;

    // Trivial workload: `black_box` keeps the body from being optimized away.
    #[derive(Clone)]
    struct Noop;
    impl Workload for Noop {
        fn run_once(&self) {
            std::hint::black_box(1 + 1);
        }
    }

    // A 150 ms run with 50 ms checkpoints on two threads should do work,
    // produce checkpoints, and last roughly the configured duration
    // (140 ms is a loose lower bound).
    #[test]
    fn soak_runs_for_duration_and_records_checkpoints() {
        let r = SoakRun::new("steady")
            .duration(Duration::from_millis(150))
            .checkpoint(Duration::from_millis(50))
            .threads(2)
            .execute(&Noop);
        assert!(r.iterations > 0);
        assert!(!r.checkpoints.is_empty());
        assert!(r.total_elapsed >= Duration::from_millis(140));
    }

    // The checkpoint interval (50 ms) exceeds the run (20 ms), so normally
    // only one checkpoint is taken and the check is skipped. If the verdict
    // is not Skip, the test only requires that it is not Fail.
    #[test]
    fn soak_fewer_than_two_checkpoints_skips() {
        let r = SoakRun::new("brief")
            .duration(Duration::from_millis(20))
            .checkpoint(Duration::from_millis(50))
            .threads(1)
            .execute(&Noop);
        let c = r.into_check_result(20.0);
        if c.verdict != Verdict::Skip {
            assert_ne!(c.verdict, Verdict::Fail);
        }
    }

    // With latency tracking enabled at rate 1, the result carries latency stats.
    #[test]
    fn soak_with_latency_tracking_records_percentiles() {
        let r = SoakRun::new("hot")
            .duration(Duration::from_millis(80))
            .checkpoint(Duration::from_millis(20))
            .threads(2)
            .track_latency(1)
            .execute(&Noop);
        assert!(r.latency.is_some());
    }

    // The converted check keeps the standard tags and checkpoint evidence.
    #[test]
    fn soak_check_carries_tags_and_evidence() {
        let r = SoakRun::new("steady")
            .duration(Duration::from_millis(80))
            .checkpoint(Duration::from_millis(20))
            .threads(1)
            .execute(&Noop);
        let c = r.into_check_result(20.0);
        assert!(c.has_tag("stress"));
        assert!(c.has_tag("soak"));
        let labels: Vec<&str> = c.evidence.iter().map(|e| e.label.as_str()).collect();
        assert!(labels.contains(&"checkpoint_count"));
        assert!(labels.contains(&"checkpoint_ops_cv"));
    }

    // Only checks that the CV is computed and non-negative; the exact value
    // depends on scheduling noise, so no upper bound is asserted.
    #[test]
    fn checkpoint_ops_cv_is_low_for_uniform_load() {
        let r = SoakRun::new("steady")
            .duration(Duration::from_millis(100))
            .checkpoint(Duration::from_millis(20))
            .threads(2)
            .execute(&Noop);
        assert!(r.checkpoint_ops_cv() >= 0.0);
    }
}