1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::time::{Duration, Instant};
10use tracing::instrument;
11
12use crate::{InvariantCheck, SimulationResult};
13
14use super::orchestrator::{IterationManager, MetricsCollector, WorkloadOrchestrator};
15use super::report::{SimulationMetrics, SimulationReport};
16use super::topology::{Workload, WorkloadTopology};
17
/// Strategy that decides how long a simulation keeps iterating.
///
/// The builder stores one of these and hands it to the iteration manager
/// when the simulation is run.
#[derive(Clone, Debug)]
pub enum IterationControl {
    /// Run for the given number of iterations.
    FixedCount(usize),
    /// Keep iterating within the given time budget.
    TimeLimit(Duration),
    /// Iterate until every "sometimes" assertion has been reached; the payload
    /// is a safety limit (presumably an upper bound on iterations — confirm
    /// against the iteration manager).
    UntilAllSometimesReached(usize),
}
30
31pub(crate) type WorkloadFn = Box<
33 dyn Fn(
34 crate::SimRandomProvider,
35 crate::SimNetworkProvider,
36 crate::SimTimeProvider,
37 crate::TokioTaskProvider,
38 WorkloadTopology,
39 ) -> Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>,
40>;
41
42pub struct SimulationBuilder {
44 iteration_control: IterationControl,
45 workloads: Vec<Workload>,
46 seeds: Vec<u64>,
47 next_ip: u32, use_random_config: bool,
49 invariants: Vec<InvariantCheck>,
50}
51
52impl Default for SimulationBuilder {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl SimulationBuilder {
59 pub fn new() -> Self {
61 Self {
62 iteration_control: IterationControl::FixedCount(1),
63 workloads: Vec::new(),
64 seeds: Vec::new(),
65 next_ip: 1, use_random_config: false,
67 invariants: Vec::new(),
68 }
69 }
70
71 pub fn register_workload<S, F, Fut>(mut self, name: S, workload: F) -> Self
77 where
78 S: Into<String>,
79 F: Fn(
80 crate::SimRandomProvider,
81 crate::SimNetworkProvider,
82 crate::SimTimeProvider,
83 crate::TokioTaskProvider,
84 WorkloadTopology,
85 ) -> Fut
86 + 'static,
87 Fut: Future<Output = SimulationResult<SimulationMetrics>> + 'static,
88 {
89 let ip_address = format!("10.0.0.{}", self.next_ip);
91 self.next_ip += 1;
92
93 let boxed_workload = Box::new(
94 move |random_provider, provider, time_provider, task_provider, topology| {
95 let fut = workload(
96 random_provider,
97 provider,
98 time_provider,
99 task_provider,
100 topology,
101 );
102 Box::pin(fut) as Pin<Box<dyn Future<Output = SimulationResult<SimulationMetrics>>>>
103 },
104 );
105
106 self.workloads.push(Workload {
107 name: name.into(),
108 ip_address,
109 workload: boxed_workload,
110 });
111 self
112 }
113
114 pub fn set_iterations(mut self, iterations: usize) -> Self {
116 self.iteration_control = IterationControl::FixedCount(iterations);
117 self
118 }
119
120 pub fn set_iteration_control(mut self, control: IterationControl) -> Self {
122 self.iteration_control = control;
123 self
124 }
125
126 pub fn set_time_limit(mut self, duration: Duration) -> Self {
128 self.iteration_control = IterationControl::TimeLimit(duration);
129 self
130 }
131
132 pub fn run_until_all_sometimes_reached(mut self, safety_limit: usize) -> Self {
134 self.iteration_control = IterationControl::UntilAllSometimesReached(safety_limit);
135 self
136 }
137
138 pub fn set_debug_seeds(mut self, seeds: Vec<u64>) -> Self {
158 self.seeds = seeds;
159 self
160 }
161
162 pub fn use_random_config(mut self) -> Self {
164 self.use_random_config = true;
165 self
166 }
167
168 pub fn with_invariants(mut self, invariants: Vec<InvariantCheck>) -> Self {
192 self.invariants = invariants;
193 self
194 }
195
196 #[instrument(skip_all)]
197 pub async fn run(self) -> SimulationReport {
199 if self.workloads.is_empty() {
200 return SimulationReport {
201 iterations: 0,
202 successful_runs: 0,
203 failed_runs: 0,
204 metrics: SimulationMetrics::default(),
205 individual_metrics: Vec::new(),
206 seeds_used: Vec::new(),
207 seeds_failing: Vec::new(),
208 assertion_results: HashMap::new(),
209 assertion_violations: Vec::new(),
210 };
211 }
212
213 let mut iteration_manager =
215 IterationManager::new(self.iteration_control.clone(), self.seeds.clone());
216 let mut metrics_collector = MetricsCollector::new();
217
218 while iteration_manager.should_continue() {
219 let seed = iteration_manager.next_iteration();
220 let iteration_count = iteration_manager.current_iteration();
221
222 crate::sim::reset_sim_rng();
224 crate::sim::set_sim_seed(seed);
225
226 crate::chaos::buggify_init(0.5, 0.25);
229
230 let network_config = if self.use_random_config {
232 crate::NetworkConfiguration::random_for_seed()
233 } else {
234 crate::NetworkConfiguration::default()
235 };
236
237 let sim = crate::sim::SimWorld::new_with_network_config_and_seed(network_config, seed);
239 let provider = sim.network_provider();
240
241 let start_time = Instant::now();
242
243 let shutdown_signal = tokio_util::sync::CancellationToken::new();
245
246 let orchestration_result = WorkloadOrchestrator::orchestrate_workloads(
248 &self.workloads,
249 seed,
250 provider,
251 sim,
252 shutdown_signal,
253 iteration_count,
254 &self.invariants,
255 )
256 .await;
257
258 let (all_results, sim_metrics) = match orchestration_result {
259 Ok((results, metrics)) => (results, metrics),
260 Err((faulty_seeds_from_deadlock, failed_count)) => {
261 metrics_collector.add_faulty_seeds(faulty_seeds_from_deadlock);
263 metrics_collector.add_failed_runs(failed_count);
264
265 let assertion_results = crate::chaos::get_assertion_results();
267 let assertion_violations = crate::chaos::validate_assertion_contracts();
268 crate::chaos::buggify_reset();
269
270 return metrics_collector.generate_report(
271 iteration_count,
272 iteration_manager.seeds_used().to_vec(),
273 assertion_results,
274 assertion_violations,
275 );
276 }
277 };
278
279 let wall_time = start_time.elapsed();
280
281 metrics_collector.record_iteration(seed, wall_time, all_results, sim_metrics);
283
284 crate::chaos::buggify_reset();
286 }
287
288 let iteration_count = iteration_manager.current_iteration();
292 let (successful_runs, failed_runs) = metrics_collector.current_stats();
293 tracing::info!(
294 "📊 Simulation completed: {}/{} iterations successful",
295 successful_runs,
296 iteration_count
297 );
298 tracing::info!("🌱 Seeds used: {:?}", iteration_manager.seeds_used());
299 if failed_runs > 0 {
300 tracing::warn!(
301 "⚠️ {} iterations failed - check logs above for failing seeds",
302 failed_runs
303 );
304 }
305
306 let assertion_results = crate::chaos::get_assertion_results();
308 let assertion_violations = crate::chaos::validate_assertion_contracts();
309
310 crate::chaos::buggify_reset();
312
313 metrics_collector.generate_report(
314 iteration_count,
315 iteration_manager.seeds_used().to_vec(),
316 assertion_results,
317 assertion_violations,
318 )
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use super::*;
325 use crate::RandomProvider;
326
/// A single always-succeeding workload run with three explicit seeds must
/// report three successful iterations and echo the seeds back.
#[test]
fn test_simulation_builder_basic() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .build_local(Default::default())
        .expect("Failed to build local runtime");

    let report = rt.block_on(async move {
        let simulation = SimulationBuilder::new()
            .register_workload(
                "test_workload",
                |random, _provider, _time_provider, _task_provider, _topology| async move {
                    Ok(SimulationMetrics {
                        simulated_time: Duration::from_millis(random.random_range(0..100)),
                        events_processed: random.random_range(0..10),
                        ..Default::default()
                    })
                },
            )
            .set_iterations(3)
            .set_debug_seeds(vec![1, 2, 3]);

        simulation.run().await
    });

    // Run counters.
    assert_eq!(report.iterations, 3);
    assert_eq!(report.successful_runs, 3);
    assert_eq!(report.failed_runs, 0);
    assert_eq!(report.success_rate(), 100.0);

    // The explicit seeds are reported in the order they were supplied.
    assert_eq!(report.seeds_used, vec![1, 2, 3]);
}
361
/// A workload that fails whenever its first random draw is even: with the
/// chosen seeds the test expects half of the four iterations to fail, and
/// the averages to reflect the successful runs' fixed metrics.
#[test]
fn test_simulation_builder_with_failures() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .build_local(Default::default())
        .expect("Failed to build local runtime");

    let report = rt.block_on(async move {
        let simulation = SimulationBuilder::new()
            .register_workload(
                "failing_workload",
                |random, _provider, _time_provider, _task_provider, _topology| async move {
                    let random_num = random.random_range(0..100);
                    // Even draw -> failure, odd draw -> success with fixed metrics.
                    match random_num % 2 {
                        0 => Err(crate::SimulationError::InvalidState(
                            "Test failure".to_string(),
                        )),
                        _ => Ok(SimulationMetrics {
                            simulated_time: Duration::from_millis(100),
                            events_processed: 5,
                            ..Default::default()
                        }),
                    }
                },
            )
            .set_iterations(4)
            .set_debug_seeds(vec![1, 2, 5, 6]);

        simulation.run().await
    });

    assert_eq!(report.iterations, 4);
    assert_eq!(report.successful_runs, 2);
    assert_eq!(report.failed_runs, 2);
    assert_eq!(report.success_rate(), 50.0);

    // Every successful run reports the same fixed metrics, so the averages
    // are expected to equal those values.
    assert_eq!(report.average_simulated_time(), Duration::from_millis(100));
    assert_eq!(report.average_events_processed(), 5.0);
}
405
/// The `Display` output of a report must contain the iteration count and the
/// formatted success rate.
///
/// This is a plain synchronous test: it only builds a report value and
/// formats it, so no async runtime is needed.
#[test]
fn test_simulation_report_display() {
    let metrics = SimulationMetrics {
        simulated_time: Duration::from_millis(200),
        events_processed: 10,
        ..Default::default()
    };

    // NOTE(review): `seeds_failing` is non-empty although `failed_runs` is 0;
    // presumably this only exercises the failing-seed display path — confirm.
    let report = SimulationReport {
        iterations: 2,
        successful_runs: 2,
        failed_runs: 0,
        metrics,
        individual_metrics: vec![],
        seeds_used: vec![1, 2],
        seeds_failing: vec![42],
        assertion_results: HashMap::new(),
        assertion_violations: Vec::new(),
    };

    let display = report.to_string();
    assert!(display.contains("Iterations: 2"));
    assert!(display.contains("Success Rate: 100.00%"));
}
430
/// Runs a workload with fixed metrics for two seeded iterations on the
/// builder's default settings and checks the counters and average simulated time.
#[test]
fn test_simulation_builder_with_network_config() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .build_local(Default::default())
        .expect("Failed to build local runtime");

    let report = rt.block_on(async move {
        let simulation = SimulationBuilder::new()
            .register_workload(
                "network_test",
                // The first argument is the random provider; this workload ignores it.
                |_random, _provider, _time_provider, _task_provider, _topology| async move {
                    Ok(SimulationMetrics {
                        simulated_time: Duration::from_millis(50),
                        events_processed: 10,
                        ..Default::default()
                    })
                },
            )
            .set_iterations(2)
            .set_debug_seeds(vec![42, 43]);

        simulation.run().await
    });

    assert_eq!(report.iterations, 2);
    assert_eq!(report.successful_runs, 2);
    assert_eq!(report.failed_runs, 0);
    assert_eq!(report.success_rate(), 100.0);

    // Lower bound only: the exact average is not pinned by this test.
    assert!(report.average_simulated_time() >= Duration::from_millis(50));
}
466
/// Two always-succeeding workloads registered on the same builder must both
/// complete in each of the two seeded iterations.
#[test]
fn test_multiple_workloads() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_io()
        .enable_time()
        .build_local(Default::default())
        .expect("Failed to build local runtime");

    let report = rt.block_on(async move {
        let simulation = SimulationBuilder::new()
            .register_workload(
                "workload1",
                |random, _provider, _time_provider, _task_provider, _topology| async move {
                    Ok(SimulationMetrics {
                        simulated_time: Duration::from_millis(random.random_range(0..50)),
                        events_processed: random.random_range(0..5),
                        ..Default::default()
                    })
                },
            )
            .register_workload(
                "workload2",
                |random, _provider, _time_provider, _task_provider, _topology| async move {
                    Ok(SimulationMetrics {
                        simulated_time: Duration::from_millis(random.random_range(0..50)),
                        events_processed: random.random_range(0..5),
                        ..Default::default()
                    })
                },
            )
            .set_iterations(2)
            .set_debug_seeds(vec![10, 20]);

        simulation.run().await
    });

    assert_eq!(report.successful_runs, 2);
    assert_eq!(report.failed_runs, 0);
    assert_eq!(report.success_rate(), 100.0);
}
507}