1use crate::evidence::{Evidence, PerformanceMetrics};
6use apr_qa_gen::QaScenario;
7use rayon::prelude::*;
8use std::process::{Command, Stdio};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
11use std::time::Instant;
12
/// Settings that control how a batch of QA scenarios is executed.
#[derive(Clone, Debug)]
pub struct ParallelConfig {
    /// Number of worker threads requested for the rayon thread pool.
    pub num_workers: usize,
    /// Time budget for a single scenario, in milliseconds.
    pub timeout_ms: u64,
    /// Model path handed to each scenario when its command line is built.
    pub model_path: String,
    /// When `true`, scenarios that have not started yet are skipped once any
    /// scenario fails.
    pub stop_on_failure: bool,
}
25
26impl Default for ParallelConfig {
27 fn default() -> Self {
28 Self {
29 num_workers: num_cpus::get().min(4),
30 timeout_ms: 60_000,
31 model_path: "model.gguf".to_string(),
32 stop_on_failure: false,
33 }
34 }
35}
36
37#[derive(Debug)]
39pub struct ParallelResult {
40 pub evidence: Vec<Evidence>,
42 pub passed: usize,
44 pub failed: usize,
46 pub skipped: usize,
48 pub duration_ms: u64,
50 pub stopped_early: bool,
52}
53
54pub struct ParallelExecutor {
56 config: ParallelConfig,
57}
58
59impl ParallelExecutor {
60 #[must_use]
62 pub fn new(config: ParallelConfig) -> Self {
63 rayon::ThreadPoolBuilder::new()
65 .num_threads(config.num_workers)
66 .build_global()
67 .ok(); Self { config }
69 }
70
71 #[must_use]
73 pub fn execute(&self, scenarios: &[QaScenario]) -> ParallelResult {
74 let start = Instant::now();
75 let stop_flag = Arc::new(AtomicBool::new(false));
76 let passed = Arc::new(AtomicUsize::new(0));
77 let failed = Arc::new(AtomicUsize::new(0));
78 let skipped = Arc::new(AtomicUsize::new(0));
79
80 let evidence: Vec<Evidence> = scenarios
81 .par_iter()
82 .filter_map(|scenario| {
83 if self.config.stop_on_failure && stop_flag.load(Ordering::Relaxed) {
85 skipped.fetch_add(1, Ordering::Relaxed);
86 return None;
87 }
88
89 let result = self.execute_single(scenario);
90
91 if result.outcome.is_pass() {
92 passed.fetch_add(1, Ordering::Relaxed);
93 } else {
94 failed.fetch_add(1, Ordering::Relaxed);
95 if self.config.stop_on_failure {
96 stop_flag.store(true, Ordering::Relaxed);
97 }
98 }
99
100 Some(result)
101 })
102 .collect();
103
104 ParallelResult {
105 evidence,
106 passed: passed.load(Ordering::Relaxed),
107 failed: failed.load(Ordering::Relaxed),
108 skipped: skipped.load(Ordering::Relaxed),
109 duration_ms: start.elapsed().as_millis() as u64,
110 stopped_early: stop_flag.load(Ordering::Relaxed),
111 }
112 }
113
114 fn execute_single(&self, scenario: &QaScenario) -> Evidence {
116 let start = Instant::now();
117
118 let (output, exit_code, stderr) = self.subprocess_execution(scenario);
119
120 let duration = start.elapsed().as_millis() as u64;
121 let gate_id = format!("F-{}-001", scenario.mqs_category());
122
123 if exit_code != 0 {
125 return Evidence::crashed(
126 &gate_id,
127 scenario.clone(),
128 "Non-zero exit code",
129 exit_code,
130 duration,
131 )
132 .with_stderr(stderr);
133 }
134
135 if duration > self.config.timeout_ms {
137 return Evidence::timeout(&gate_id, scenario.clone(), duration);
138 }
139
140 let oracle_result = scenario.evaluate(&output);
142
143 match oracle_result {
144 apr_qa_gen::OracleResult::Corroborated { evidence: reason } => {
145 Evidence::corroborated(&gate_id, scenario.clone(), &output, duration)
146 .with_metrics(PerformanceMetrics {
147 duration_ms: duration,
148 total_tokens: Some(estimate_tokens(&output)),
149 ..Default::default()
150 })
151 .with_reason(reason)
152 }
153 apr_qa_gen::OracleResult::Falsified {
154 reason,
155 evidence: _,
156 } => Evidence::falsified(&gate_id, scenario.clone(), reason, &output, duration),
157 }
158 }
159
160 fn subprocess_execution(&self, scenario: &QaScenario) -> (String, i32, Option<String>) {
162 let cmd_str = scenario.to_command(&self.config.model_path);
163 let parts: Vec<&str> = cmd_str.split_whitespace().collect();
164
165 if parts.is_empty() {
166 return (String::new(), -1, Some("Empty command".to_string()));
167 }
168
169 let result = Command::new(parts[0])
170 .args(&parts[1..])
171 .stdout(Stdio::piped())
172 .stderr(Stdio::piped())
173 .output();
174
175 match result {
176 Ok(output) => {
177 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
178 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
179 let exit_code = output.status.code().unwrap_or(-1);
180 (
181 stdout,
182 exit_code,
183 if stderr.is_empty() {
184 None
185 } else {
186 Some(stderr)
187 },
188 )
189 }
190 Err(e) => (String::new(), -1, Some(e.to_string())),
191 }
192 }
193}
194
195impl Default for ParallelExecutor {
196 fn default() -> Self {
197 Self::new(ParallelConfig::default())
198 }
199}
200
/// Rough token estimate for `text`: one token per four bytes of UTF-8,
/// never less than one.
///
/// The result saturates at `u32::MAX` instead of wrapping for inputs whose
/// estimate does not fit in a `u32`.
fn estimate_tokens(text: &str) -> u32 {
    let approx = (text.len() / 4).max(1);
    <u32 as std::convert::TryFrom<usize>>::try_from(approx).unwrap_or(u32::MAX)
}
206
/// Builder-style setters used by the executor when assembling evidence.
trait EvidenceExt {
    /// Consumes the value and returns it with its captured stderr replaced.
    fn with_stderr(self, stderr: Option<String>) -> Self;

    /// Consumes the value and returns it with its reason replaced.
    fn with_reason(self, reason: String) -> Self;
}
212
213impl EvidenceExt for Evidence {
214 fn with_stderr(mut self, stderr: Option<String>) -> Self {
215 self.stderr = stderr;
216 self
217 }
218
219 fn with_reason(mut self, reason: String) -> Self {
220 self.reason = reason;
221 self
222 }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use apr_qa_gen::{Backend, Format, Modality, ModelId};

    /// Builds a CPU/GGUF scenario for the shared `test/model` id; only the
    /// modality, prompt and seed vary between tests.
    fn scenario_with(modality: Modality, prompt: String, seed: u64) -> QaScenario {
        QaScenario::new(
            ModelId::new("test", "model"),
            modality,
            Backend::Cpu,
            Format::Gguf,
            prompt,
            seed,
        )
    }

    /// The canonical single scenario used by most tests.
    fn test_scenario() -> QaScenario {
        scenario_with(Modality::Run, "2+2=".to_string(), 42)
    }

    /// `count` distinct run scenarios with increasing seeds.
    fn test_scenarios(count: usize) -> Vec<QaScenario> {
        (0..count)
            .map(|i| scenario_with(Modality::Run, format!("What is {}+{}?", i, i + 1), i as u64))
            .collect()
    }

    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();
        assert!(config.num_workers > 0);
        assert_eq!(config.timeout_ms, 60_000);
    }

    #[test]
    fn test_parallel_executor_single() {
        let executor = ParallelExecutor::default();
        let scenario = test_scenario();

        let evidence = executor.execute_single(&scenario);
        assert!(evidence.outcome.is_fail());
    }

    #[test]
    fn test_parallel_executor_batch() {
        let config = ParallelConfig {
            num_workers: 2,
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);
        let scenarios = test_scenarios(10);

        let result = executor.execute(&scenarios);

        assert_eq!(result.evidence.len(), 10);
        assert_eq!(result.passed + result.failed + result.skipped, 10);
    }

    #[test]
    fn test_parallel_executor_stop_on_failure() {
        let config = ParallelConfig {
            num_workers: 1,
            stop_on_failure: true,
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);

        let mut scenarios = test_scenarios(5);
        scenarios.insert(2, scenario_with(Modality::Run, String::new(), 99));

        let result = executor.execute(&scenarios);

        assert!(result.failed > 0);
        // Any failure with `stop_on_failure` set must raise the stop flag.
        assert!(result.stopped_early);
        // Every scenario is either run (and yields evidence) or skipped.
        assert_eq!(result.evidence.len() + result.skipped, scenarios.len());
        assert_eq!(result.passed + result.failed, result.evidence.len());
    }

    #[test]
    fn test_estimate_tokens() {
        assert_eq!(estimate_tokens(""), 1);
        assert_eq!(estimate_tokens("test"), 1);
        assert_eq!(estimate_tokens("hello world this is a test"), 6);
    }

    #[test]
    fn test_parallel_result_fields() {
        let result = ParallelResult {
            evidence: vec![],
            passed: 5,
            failed: 2,
            skipped: 1,
            duration_ms: 1000,
            stopped_early: false,
        };

        assert_eq!(result.passed, 5);
        assert_eq!(result.failed, 2);
        assert_eq!(result.skipped, 1);
        assert!(!result.stopped_early);
    }

    #[test]
    fn test_parallel_executor_default() {
        let executor = ParallelExecutor::default();
        let scenarios = test_scenarios(3);
        let result = executor.execute(&scenarios);
        assert_eq!(result.evidence.len(), 3);
    }

    #[test]
    fn test_evidence_ext_with_stderr() {
        let evidence = Evidence::corroborated("F-TEST-001", test_scenario(), "output", 100);
        let with_stderr = evidence.with_stderr(Some("error output".to_string()));
        assert_eq!(with_stderr.stderr, Some("error output".to_string()));
    }

    #[test]
    fn test_evidence_ext_with_reason() {
        let evidence = Evidence::corroborated("F-TEST-001", test_scenario(), "output", 100);
        let with_reason = evidence.with_reason("test reason".to_string());
        assert_eq!(with_reason.reason, "test reason");
    }

    #[test]
    fn test_parallel_config_clone() {
        let config = ParallelConfig::default();
        let cloned = config.clone();
        assert_eq!(cloned.num_workers, config.num_workers);
        assert_eq!(cloned.timeout_ms, config.timeout_ms);
    }

    #[test]
    fn test_parallel_config_debug() {
        let config = ParallelConfig::default();
        let debug_str = format!("{config:?}");
        assert!(debug_str.contains("ParallelConfig"));
    }

    #[test]
    fn test_parallel_result_debug() {
        let result = ParallelResult {
            evidence: vec![],
            passed: 0,
            failed: 0,
            skipped: 0,
            duration_ms: 0,
            stopped_early: false,
        };
        let debug_str = format!("{result:?}");
        assert!(debug_str.contains("ParallelResult"));
    }

    #[test]
    fn test_execute_single_failing() {
        let executor = ParallelExecutor::default();
        let scenario = scenario_with(Modality::Run, String::new(), 42);

        let evidence = executor.execute_single(&scenario);
        assert!(evidence.outcome.is_fail() || evidence.output.is_empty());
    }

    #[test]
    fn test_parallel_collect_all() {
        let config = ParallelConfig {
            num_workers: 2,
            stop_on_failure: false,
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);

        let mut scenarios = test_scenarios(5);
        scenarios.push(scenario_with(Modality::Run, String::new(), 99));

        let result = executor.execute(&scenarios);
        assert_eq!(result.evidence.len(), 6);
    }

    #[test]
    fn test_parallel_executor_with_custom_model_path() {
        let config = ParallelConfig {
            model_path: "custom/model.gguf".to_string(),
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);
        assert_eq!(executor.config.model_path, "custom/model.gguf");
    }

    #[test]
    fn test_parallel_executor_with_timeout() {
        let config = ParallelConfig {
            timeout_ms: 1000,
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);
        assert_eq!(executor.config.timeout_ms, 1000);
    }

    #[test]
    fn test_execute_single_without_binary() {
        let executor = ParallelExecutor::default();
        let scenario = scenario_with(Modality::Run, "2+2=".to_string(), 42);

        let evidence = executor.execute_single(&scenario);
        assert!(evidence.outcome.is_fail());
    }

    #[test]
    fn test_execute_single_falsified() {
        let executor = ParallelExecutor::default();
        let scenario = scenario_with(Modality::Run, String::new(), 42);

        let evidence = executor.execute_single(&scenario);
        assert!(evidence.output.is_empty() || evidence.outcome.is_fail());
    }

    #[test]
    fn test_parallel_batch_without_binary() {
        let executor = ParallelExecutor::default();
        let scenarios: Vec<_> = (0..5u64)
            .map(|i| scenario_with(Modality::Run, format!("What is 2+{}?", i), i))
            .collect();

        let result = executor.execute(&scenarios);
        assert_eq!(result.evidence.len(), 5);
        assert_eq!(result.failed, 5);
    }

    #[test]
    fn test_parallel_with_mixed_modalities() {
        let executor = ParallelExecutor::default();
        let scenarios = vec![
            scenario_with(Modality::Run, "2+2=".to_string(), 1),
            scenario_with(Modality::Chat, "Hello".to_string(), 2),
        ];

        let result = executor.execute(&scenarios);
        assert_eq!(result.evidence.len(), 2);
    }

    #[test]
    fn test_parallel_result_stopped_early() {
        let result = ParallelResult {
            evidence: vec![],
            passed: 3,
            failed: 1,
            skipped: 6,
            duration_ms: 500,
            stopped_early: true,
        };
        assert!(result.stopped_early);
        assert_eq!(result.skipped, 6);
    }

    #[test]
    fn test_parallel_empty_scenarios() {
        let executor = ParallelExecutor::default();
        let result = executor.execute(&[]);
        assert_eq!(result.evidence.len(), 0);
        assert_eq!(result.passed, 0);
        assert_eq!(result.failed, 0);
    }

    #[test]
    fn test_evidence_ext_with_stderr_none() {
        let evidence = Evidence::corroborated("F-TEST-001", test_scenario(), "output", 100);
        let with_stderr = evidence.with_stderr(None);
        assert!(with_stderr.stderr.is_none());
    }

    #[test]
    fn test_parallel_config_single_worker() {
        let config = ParallelConfig {
            num_workers: 1,
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);
        let scenarios = test_scenarios(3);
        let result = executor.execute(&scenarios);
        assert_eq!(result.evidence.len(), 3);
    }

    #[test]
    fn test_estimate_tokens_longer_text() {
        let tokens = estimate_tokens("This is a longer string.");
        assert!(tokens >= 5);
    }

    #[test]
    fn test_parallel_batch_single_scenario() {
        let executor = ParallelExecutor::default();
        let scenarios = vec![test_scenario()];
        let result = executor.execute(&scenarios);
        assert_eq!(result.evidence.len(), 1);
    }

    #[test]
    fn test_parallel_result_all_fields() {
        let evidence = Evidence::corroborated("F-TEST-001", test_scenario(), "output", 50);

        let result = ParallelResult {
            evidence: vec![evidence],
            passed: 1,
            failed: 0,
            skipped: 0,
            duration_ms: 100,
            stopped_early: false,
        };

        assert_eq!(result.evidence.len(), 1);
        assert_eq!(result.duration_ms, 100);
    }

    #[test]
    fn test_parallel_config_all_fields() {
        let config = ParallelConfig {
            num_workers: 8,
            timeout_ms: 30_000,
            model_path: "/custom/path/model.gguf".to_string(),
            stop_on_failure: true,
        };

        assert_eq!(config.num_workers, 8);
        assert_eq!(config.timeout_ms, 30_000);
        assert!(config.stop_on_failure);
        assert!(config.model_path.contains("custom"));
    }

    #[test]
    fn test_timeout_enforcement_marks_f_int_002_falsified() {
        use crate::evidence::Outcome;
        use crate::patterns::{IntegrityChecker, SpecGate};

        let timed_out = true;
        let exit_code = None;
        let has_output = false;

        let result = IntegrityChecker::check_process_termination(exit_code, timed_out, has_output);

        assert_eq!(result.gate_id, SpecGate::IntProcessTermination.id());
        assert_eq!(result.gate_id, "F-INT-002");
        assert!(!result.passed, "Timeout should mark F-INT-002 as FALSIFIED");
        assert!(
            result.description.contains("timed out"),
            "Description should mention timeout: {}",
            result.description
        );

        let evidence = Evidence::timeout(
            SpecGate::IntProcessTermination.id(),
            test_scenario(),
            61_000,
        );
        assert!(
            matches!(evidence.outcome, Outcome::Timeout),
            "Evidence should have Timeout outcome"
        );
    }

    #[test]
    fn test_timeout_config_enforcement() {
        let config = ParallelConfig {
            timeout_ms: 1,
            ..Default::default()
        };

        let executor = ParallelExecutor::new(config);
        assert_eq!(executor.config.timeout_ms, 1);

        let evidence = Evidence::timeout("F-INT-002", test_scenario(), 61_000);
        assert!(!evidence.outcome.is_pass());
    }

    #[test]
    fn test_subprocess_execution_empty_command() {
        let executor = ParallelExecutor::new(ParallelConfig::default());
        let scenario = scenario_with(Modality::Run, "test".to_string(), 42);

        let (_, exit_code, stderr) = executor.subprocess_execution(&scenario);
        assert!(exit_code != 0 || stderr.is_some());
    }

    #[test]
    fn test_evidence_with_metrics() {
        let evidence = Evidence::corroborated("F-TEST-001", test_scenario(), "output", 100);
        let with_metrics = evidence.with_metrics(PerformanceMetrics {
            duration_ms: 500,
            tokens_per_second: Some(10.0),
            ..Default::default()
        });
        assert_eq!(with_metrics.metrics.duration_ms, 500);
    }

    #[test]
    fn test_parallel_result_with_stopped_early() {
        let result = ParallelResult {
            evidence: vec![],
            passed: 2,
            failed: 1,
            skipped: 7,
            duration_ms: 100,
            stopped_early: true,
        };
        assert!(result.stopped_early);
        assert_eq!(result.skipped, 7);
    }

    #[test]
    fn test_parallel_executor_execute_with_subprocess_mode() {
        let config = ParallelConfig {
            num_workers: 1,
            model_path: "/nonexistent/path.gguf".to_string(),
            stop_on_failure: true,
            ..Default::default()
        };
        let executor = ParallelExecutor::new(config);

        let result = executor.execute(&[]);
        assert_eq!(result.evidence.len(), 0);
    }
}