1use std::{
2 collections::hash_map::DefaultHasher,
3 collections::BTreeMap,
4 ffi::OsStr,
5 fs,
6 hash::{Hash, Hasher},
7 path::Path,
8 process::{Command, Stdio},
9 thread,
10 time::{Duration, Instant},
11};
12
13use anyhow::{anyhow, Context, Result};
14use camino::Utf8PathBuf;
15use serde::Serialize;
16use tree_sitter::{Node, Parser};
17use veritas_core::{
18 config::{MutationConfig, PythonPluginConfig},
19 persist_mutation_record_artifacts, start_mutation_run,
20};
21use veritas_plugin_api::{
22 finalize_mutation_metrics, mutation_taxonomy, ArtifactKind, ArtifactStatus, BehaviorReplayCase,
23 BehaviorReplayObservation, BehaviorReplayStatus, CommandRecord, CoverageFile, CoverageReport,
24 Failure, FailureSeverity, GeneratedArtifact, LanguagePlugin, LineRange, MutationAttribution,
25 MutationRecord, MutationStatus, PluginCapability, ProjectInfo, ReproCase, RiskLevel, RunStatus,
26 SourceSpan, TargetKind, TestRunResult, VerificationPlan, VerificationQuality,
27 VerificationStrategy, VerificationTarget,
28};
29use walkdir::WalkDir;
30
31#[derive(Debug, Clone)]
32pub struct PythonPlugin {
33 config: PythonPluginConfig,
34}
35
36#[derive(Debug, Clone)]
37struct PythonFunction {
38 path: Utf8PathBuf,
39 name: String,
40 symbol: String,
41 owner: Option<String>,
42 params: Vec<String>,
43 signature: String,
44 line_range: LineRange,
45 start_byte: usize,
46 end_byte: usize,
47 calls: Vec<String>,
48}
49
50#[derive(Debug, Clone)]
51struct PythonMutationCandidate {
52 path: Utf8PathBuf,
53 function: String,
54 label: String,
55 from: String,
56 to: String,
57 start_byte: usize,
58 end_byte: usize,
59 line_range: LineRange,
60}
61
62#[derive(Debug, Clone, Serialize)]
63struct PythonSymbolGraph<'a> {
64 target_id: &'a str,
65 symbols: Vec<PythonSymbolNode<'a>>,
66}
67
68#[derive(Debug, Clone, Serialize)]
69struct PythonSymbolNode<'a> {
70 id: String,
71 path: &'a Utf8PathBuf,
72 symbol: &'a str,
73 name: &'a str,
74 owner: Option<&'a str>,
75 signature: &'a str,
76 line_range: &'a LineRange,
77 risk: RiskLevel,
78 calls: &'a [String],
79}
80
81impl PythonPlugin {
82 pub fn new(config: PythonPluginConfig) -> Self {
83 Self { config }
84 }
85}
86
87impl LanguagePlugin for PythonPlugin {
88 fn id(&self) -> &'static str {
89 "python"
90 }
91
92 fn display_name(&self) -> &'static str {
93 "Python"
94 }
95
96 fn capabilities(&self) -> Vec<PluginCapability> {
97 vec![
98 PluginCapability::TargetDiscovery,
99 PluginCapability::SymbolGraph,
100 PluginCapability::GeneratedTests,
101 PluginCapability::ExistingTests,
102 PluginCapability::MutationChecks,
103 PluginCapability::DifferentialReplay,
104 PluginCapability::RegressionPromotion,
105 PluginCapability::ResourceBudgets,
106 ]
107 }
108
109 fn detect_project(&self, root: &Path) -> Result<ProjectInfo> {
110 if !root.join("pyproject.toml").exists()
111 && !root.join("setup.py").exists()
112 && !contains_python_file(root)
113 {
114 return Err(anyhow!("Python project not found"));
115 }
116 let name = root
117 .file_name()
118 .and_then(OsStr::to_str)
119 .unwrap_or("python-project")
120 .to_string();
121 let manifests = ["pyproject.toml", "setup.py"]
122 .into_iter()
123 .filter(|path| root.join(path).exists())
124 .map(Utf8PathBuf::from)
125 .collect();
126 Ok(ProjectInfo {
127 language: "python".to_string(),
128 name,
129 root: utf8_path(root)?,
130 manifests,
131 })
132 }
133
134 fn discover_targets(&self, root: &Path) -> Result<Vec<VerificationTarget>> {
135 let mut targets = vec![VerificationTarget {
136 id: "python:project".to_string(),
137 language: "python".to_string(),
138 kind: TargetKind::Project,
139 path: Utf8PathBuf::from("."),
140 symbol: None,
141 signature: None,
142 line_range: None,
143 description: "Python project".to_string(),
144 risk: RiskLevel::Medium,
145 }];
146
147 for function in discover_functions(root)? {
148 targets.push(VerificationTarget {
149 id: format!("python:{}:{}", function.path, function.symbol),
150 language: "python".to_string(),
151 kind: TargetKind::Function,
152 path: function.path.clone(),
153 symbol: Some(function.symbol.clone()),
154 signature: Some(function.signature.clone()),
155 line_range: Some(function.line_range.clone()),
156 description: if let Some(owner) = &function.owner {
157 format!("Python method {owner}.{}", function.name)
158 } else {
159 format!("Python function {}", function.name)
160 },
161 risk: infer_risk(&function.symbol),
162 });
163 }
164
165 Ok(targets)
166 }
167
168 fn generate_tests(
169 &self,
170 target: &VerificationTarget,
171 plan: &VerificationPlan,
172 ) -> Result<Vec<GeneratedArtifact>> {
173 let root = std::env::current_dir()?;
174 let functions = discover_functions(&root).unwrap_or_default();
175 let mut artifacts = vec![symbol_graph_artifact(
176 &target.id,
177 &target.path,
178 target.symbol.as_deref(),
179 &functions,
180 )?];
181 if plan
182 .strategies
183 .contains(&VerificationStrategy::MutationChecks)
184 {
185 artifacts.push(python_mutation_manifest_artifact(
186 target,
187 target.symbol.as_deref(),
188 &functions,
189 )?);
190 }
191 if plan
192 .strategies
193 .contains(&VerificationStrategy::PropertyTests)
194 {
195 if let Some(artifact) =
196 python_hypothesis_property_artifact(target, target.symbol.as_deref(), &functions)?
197 {
198 artifacts.push(artifact);
199 }
200 }
201 Ok(artifacts)
202 }
203
204 fn run_tests(
205 &self,
206 root: &Path,
207 artifacts: &[GeneratedArtifact],
208 _plan: &VerificationPlan,
209 ) -> Result<TestRunResult> {
210 let start = Instant::now();
211 let (runner_name, args) = python_test_args(root);
212 let command = run_command(
213 root,
214 "python3",
215 args.iter().map(String::as_str),
216 self.config.command_timeout_seconds,
217 )?;
218 let status = command.status.clone();
219 let failures = if command.status == RunStatus::Failed {
220 vec![Failure {
221 id: None,
222 message: format!("python {runner_name} failed"),
223 severity: FailureSeverity::Error,
224 target_id: artifacts.first().map(|artifact| artifact.target_id.clone()),
225 artifact_id: artifacts.first().map(|artifact| artifact.id.clone()),
226 command: command_line(&command.program, &command.args),
227 stdout_excerpt: excerpt(&command.stdout),
228 stderr_excerpt: excerpt(&command.stderr),
229 repro: Some(ReproCase {
230 command: command_line(&command.program, &command.args),
231 input: None,
232 path: None,
233 }),
234 }]
235 } else {
236 Vec::new()
237 };
238 let mut commands = vec![command];
239 let mut failures = failures;
240 let mut status = status;
241 if artifacts
242 .iter()
243 .any(|artifact| artifact.kind == ArtifactKind::PropertyTest)
244 {
245 let property = run_python_property_artifacts(root, artifacts, &self.config)?;
246 if property.status == RunStatus::Failed {
247 status = RunStatus::Failed;
248 }
249 commands.extend(property.commands);
250 failures.extend(property.failures);
251 }
252 let mutation = if artifacts
253 .iter()
254 .any(|artifact| artifact.kind == ArtifactKind::MutationCheck)
255 {
256 let mutation = run_python_mutations(root, artifacts, &self.config)?;
257 if mutation.status == RunStatus::Failed {
258 status = RunStatus::Failed;
259 }
260 commands.extend(mutation.commands);
261 failures.extend(mutation.failures);
262 mutation.quality
263 } else {
264 VerificationQuality::default()
265 };
266
267 Ok(TestRunResult {
268 language: "python".to_string(),
269 status,
270 commands,
271 failures,
272 duration_ms: start.elapsed().as_millis(),
273 quality: mutation,
274 })
275 }
276
277 fn collect_coverage(&self, root: &Path) -> Result<Option<CoverageReport>> {
278 if !self.config.coverage_enabled {
279 return Ok(Some(CoverageReport {
280 tool: "python coverage".to_string(),
281 summary: "not collected: disabled by Python plugin config".to_string(),
282 files: vec![],
283 }));
284 }
285 if !python_module_available(root, "coverage") {
286 return Ok(Some(CoverageReport {
287 tool: "python coverage.py".to_string(),
288 summary: "not collected: coverage.py is not available".to_string(),
289 files: vec![],
290 }));
291 }
292
293 let veritas_dir = root.join(".veritas");
294 fs::create_dir_all(&veritas_dir)
295 .with_context(|| format!("failed to create {}", veritas_dir.display()))?;
296 let data_file = ".veritas/.coverage";
297 let json_file = ".veritas/python_coverage.json";
298 let (_, test_args) = python_test_args(root);
299 let mut run_args = vec![
300 "-m".to_string(),
301 "coverage".to_string(),
302 "run".to_string(),
303 "--data-file".to_string(),
304 data_file.to_string(),
305 ];
306 run_args.extend(test_args);
307 let run = run_command(
308 root,
309 "python3",
310 run_args.iter().map(String::as_str),
311 self.config.command_timeout_seconds,
312 )?;
313 if run.status == RunStatus::Failed {
314 return Ok(Some(CoverageReport {
315 tool: "python coverage.py".to_string(),
316 summary: format!("not collected: {}", excerpt(&run.stderr)),
317 files: vec![],
318 }));
319 }
320 let json_args = [
321 "-m",
322 "coverage",
323 "json",
324 "--data-file",
325 data_file,
326 "-o",
327 json_file,
328 ];
329 let json = run_command(
330 root,
331 "python3",
332 json_args,
333 self.config.command_timeout_seconds,
334 )?;
335 if json.status == RunStatus::Failed {
336 return Ok(Some(CoverageReport {
337 tool: "python coverage.py".to_string(),
338 summary: format!("not collected: {}", excerpt(&json.stderr)),
339 files: vec![],
340 }));
341 }
342 Ok(Some(python_coverage_report(root, &root.join(json_file))?))
343 }
344
345 fn replay_behavior(
346 &self,
347 root: &Path,
348 target: &VerificationTarget,
349 case: &BehaviorReplayCase,
350 ) -> Result<Option<BehaviorReplayObservation>> {
351 let functions = discover_functions(root)?;
352 let Some(function) = functions
353 .iter()
354 .find(|function| python_target_matches_function(&target.id, function))
355 else {
356 return Ok(None);
357 };
358 replay_python_function(root, function, case, &self.config).map(Some)
359 }
360
361 fn replay_behaviors(
362 &self,
363 root: &Path,
364 target: &VerificationTarget,
365 cases: &[BehaviorReplayCase],
366 ) -> Result<BTreeMap<String, BehaviorReplayObservation>> {
367 let functions = discover_functions(root)?;
368 let Some(function) = functions
369 .iter()
370 .find(|function| python_target_matches_function(&target.id, function))
371 else {
372 return Ok(BTreeMap::new());
373 };
374 replay_python_function_batch(root, function, cases, &self.config)
375 }
376}
377
378fn discover_functions(root: &Path) -> Result<Vec<PythonFunction>> {
379 let mut parser = Parser::new();
380 parser
381 .set_language(&tree_sitter_python::LANGUAGE.into())
382 .map_err(|error| anyhow!("failed to load tree-sitter Python grammar: {error}"))?;
383
384 let mut functions = Vec::new();
385 for entry in WalkDir::new(root)
386 .into_iter()
387 .filter_entry(|entry| !is_ignored(entry.path(), entry.file_name()))
388 {
389 let entry = entry?;
390 if !entry.file_type().is_file()
391 || entry.path().extension() != Some(OsStr::new("py"))
392 || is_python_test_file(entry.path())
393 {
394 continue;
395 }
396 let contents = fs::read_to_string(entry.path())
397 .with_context(|| format!("failed to read {}", entry.path().display()))?;
398 let tree = parser
399 .parse(&contents, None)
400 .ok_or_else(|| anyhow!("failed to parse Python source {}", entry.path().display()))?;
401 let path = relative_utf8(root, entry.path())?;
402 collect_python_functions(tree.root_node(), &contents, &path, &mut functions)?;
403 }
404 Ok(functions)
405}
406
407fn collect_python_functions(
408 node: Node<'_>,
409 source: &str,
410 path: &Utf8PathBuf,
411 functions: &mut Vec<PythonFunction>,
412) -> Result<()> {
413 if matches!(
414 node.kind(),
415 "function_definition" | "async_function_definition"
416 ) {
417 if let Some(function) = parse_python_function(node, source, path)? {
418 functions.push(function);
419 }
420 }
421
422 let mut cursor = node.walk();
423 for child in node.children(&mut cursor) {
424 collect_python_functions(child, source, path, functions)?;
425 }
426 Ok(())
427}
428
429fn parse_python_function(
430 node: Node<'_>,
431 source: &str,
432 path: &Utf8PathBuf,
433) -> Result<Option<PythonFunction>> {
434 let Some(name_node) = node.child_by_field_name("name") else {
435 return Ok(None);
436 };
437 let name = node_text(name_node, source)?.to_string();
438 if name.starts_with('_') {
439 return Ok(None);
440 }
441 let owner = python_function_owner(node, source)?;
442 let symbol = owner
443 .as_ref()
444 .map(|owner| format!("{owner}.{name}"))
445 .unwrap_or_else(|| name.clone());
446 let signature = signature_text(node, source)?.trim().to_string();
447 let params = node
448 .child_by_field_name("parameters")
449 .map(|parameters| parse_python_params(node_text(parameters, source).unwrap_or_default()))
450 .unwrap_or_default();
451 Ok(Some(PythonFunction {
452 path: path.clone(),
453 name,
454 symbol,
455 owner,
456 params,
457 signature,
458 line_range: LineRange {
459 start: node.start_position().row + 1,
460 end: node.end_position().row + 1,
461 },
462 start_byte: node.start_byte(),
463 end_byte: node.end_byte(),
464 calls: python_calls_in_function(node, source)?,
465 }))
466}
467
468fn python_function_owner(node: Node<'_>, source: &str) -> Result<Option<String>> {
469 let mut current = node.parent();
470 while let Some(parent) = current {
471 if parent.kind() == "class_definition" {
472 return Ok(parent
473 .child_by_field_name("name")
474 .map(|node| node_text(node, source).unwrap_or_default().to_string()));
475 }
476 current = parent.parent();
477 }
478 Ok(None)
479}
480
481fn parse_python_params(params: &str) -> Vec<String> {
482 params
483 .trim()
484 .trim_start_matches('(')
485 .trim_end_matches(')')
486 .split(',')
487 .filter_map(|param| {
488 let name = param
489 .split([':', '='])
490 .next()
491 .unwrap_or_default()
492 .trim()
493 .trim_start_matches('*');
494 (!name.is_empty()).then_some(name.to_string())
495 })
496 .collect()
497}
498
499fn python_calls_in_function(node: Node<'_>, source: &str) -> Result<Vec<String>> {
500 let mut calls = Vec::new();
501 collect_python_calls(node, source, &mut calls)?;
502 calls.sort();
503 calls.dedup();
504 Ok(calls)
505}
506
507fn collect_python_calls(node: Node<'_>, source: &str, calls: &mut Vec<String>) -> Result<()> {
508 if node.kind() == "call" {
509 if let Some(function) = node.child_by_field_name("function") {
510 let text = node_text(function, source)?.trim();
511 if !text.is_empty() {
512 calls.push(text.to_string());
513 }
514 }
515 }
516 let mut cursor = node.walk();
517 for child in node.children(&mut cursor) {
518 collect_python_calls(child, source, calls)?;
519 }
520 Ok(())
521}
522
523fn symbol_graph_artifact(
524 target_id: &str,
525 target_path: &Utf8PathBuf,
526 target_symbol: Option<&str>,
527 functions: &[PythonFunction],
528) -> Result<GeneratedArtifact> {
529 let symbols = functions
530 .iter()
531 .filter(|function| {
532 if let Some(symbol) = target_symbol {
533 function.symbol == symbol && function.path == *target_path
534 } else {
535 target_path.as_str() == "." || function.path == *target_path
536 }
537 })
538 .map(|function| PythonSymbolNode {
539 id: format!("python:{}:{}", function.path, function.symbol),
540 path: &function.path,
541 symbol: &function.symbol,
542 name: &function.name,
543 owner: function.owner.as_deref(),
544 signature: &function.signature,
545 line_range: &function.line_range,
546 risk: infer_risk(&function.symbol),
547 calls: &function.calls,
548 })
549 .collect::<Vec<_>>();
550 let contents = serde_json::to_string_pretty(&PythonSymbolGraph { target_id, symbols })?;
551 Ok(GeneratedArtifact {
552 id: format!("python-symbol-{}", safe_ident(target_id)),
553 language: "python".to_string(),
554 kind: ArtifactKind::SymbolGraph,
555 target_id: target_id.to_string(),
556 path: Utf8PathBuf::from(format!(
557 ".veritas/symbols/python_{}.json",
558 safe_ident(target_id)
559 )),
560 contents,
561 description: "Tree-sitter Python symbol graph for selected target".to_string(),
562 status: ArtifactStatus::Planned,
563 })
564}
565
566fn python_mutation_manifest_artifact(
567 target: &VerificationTarget,
568 target_symbol: Option<&str>,
569 functions: &[PythonFunction],
570) -> Result<GeneratedArtifact> {
571 let selected = functions
572 .iter()
573 .filter(|function| {
574 if let Some(symbol) = target_symbol {
575 function.symbol == symbol && function.path == target.path
576 } else {
577 target.path.as_str() == "." || function.path == target.path
578 }
579 })
580 .map(|function| {
581 serde_json::json!({
582 "id": format!("python:{}:{}", function.path, function.symbol),
583 "path": &function.path,
584 "symbol": &function.symbol,
585 "signature": &function.signature,
586 "line_range": &function.line_range,
587 "risk": infer_risk(&function.symbol),
588 "operators": [
589 "comparison_boundary",
590 "boolean_guard",
591 "string_normalization",
592 "exception_path",
593 "numeric_limit"
594 ],
595 })
596 })
597 .collect::<Vec<_>>();
598 let contents = serde_json::to_string_pretty(&serde_json::json!({
599 "version": 1,
600 "language": "python",
601 "mode": "mutation_manifest",
602 "target_id": target.id,
603 "status": "planned",
604 "targets": selected,
605 "next_step": "Python mutation execution uses this manifest as the stable target/operator contract for generated tests and AI repair loops.",
606 }))?;
607 Ok(GeneratedArtifact {
608 id: format!("python-mutation-{}", safe_ident(&target.id)),
609 language: "python".to_string(),
610 kind: ArtifactKind::MutationCheck,
611 target_id: target.id.clone(),
612 path: Utf8PathBuf::from(format!(
613 ".veritas/mutations/python_{}.json",
614 safe_ident(&target.id)
615 )),
616 contents,
617 description: "Tree-sitter Python mutation target/operator manifest".to_string(),
618 status: ArtifactStatus::Planned,
619 })
620}
621
622fn python_hypothesis_property_artifact(
623 target: &VerificationTarget,
624 target_symbol: Option<&str>,
625 functions: &[PythonFunction],
626) -> Result<Option<GeneratedArtifact>> {
627 let selected = functions
628 .iter()
629 .filter(|function| {
630 if let Some(symbol) = target_symbol {
631 function.symbol == symbol && function.path == target.path
632 } else {
633 target.path.as_str() == "." || function.path == target.path
634 }
635 })
636 .filter(|function| {
637 !function.params.is_empty()
638 && function.owner.is_none()
639 && function.params.len() <= 3
640 && function
641 .params
642 .iter()
643 .all(|param| !matches!(param.as_str(), "self" | "cls"))
644 })
645 .take(4)
646 .collect::<Vec<_>>();
647 if selected.is_empty() {
648 return Ok(None);
649 }
650
651 let mut contents = String::from(
652 "# Generated by veritas. Review before committing.\n\
653 # Requires: python3 -m pip install hypothesis\n\
654 \n\
655 from hypothesis import given, strategies as st\n\
656 \n\
657 def _veritas_observe(call):\n\
658 try:\n\
659 return (\"ok\", call())\n\
660 except Exception as exc:\n\
661 return (\"exception\", type(exc).__name__, str(exc))\n\
662 \n",
663 );
664 for function in selected {
665 let module = python_module_name(&function.path);
666 let name = &function.name;
667 contents.push_str(&format!("from {module} import {name}\n"));
668 let params = function.params.join(", ");
669 let strategies = function
670 .params
671 .iter()
672 .map(|param| format!("{param}={}", hypothesis_strategy_for_param(param)))
673 .collect::<Vec<_>>()
674 .join(", ");
675 contents.push_str(&format!(
676 "\n@given({strategies})\n\
677 def test_veritas_{name}_does_not_panic({params}):\n\
678 first = _veritas_observe(lambda: {name}({params}))\n\
679 second = _veritas_observe(lambda: {name}({params}))\n\
680 assert first == second # is_deterministic\n\
681 assert first[0] in (\"ok\", \"exception\") # prop_assert does_not_panic\n"
682 ));
683 }
684
685 Ok(Some(GeneratedArtifact {
686 id: format!("python-property-{}", safe_ident(&target.id)),
687 language: "python".to_string(),
688 kind: ArtifactKind::PropertyTest,
689 target_id: target.id.clone(),
690 path: Utf8PathBuf::from(format!(
691 ".veritas/properties/python_{}.py",
692 safe_ident(&target.id)
693 )),
694 contents,
695 description: "Reviewable Hypothesis property candidates for Python targets".to_string(),
696 status: ArtifactStatus::Planned,
697 }))
698}
699
700fn hypothesis_strategy_for_param(param: &str) -> String {
701 let lowered = param.to_ascii_lowercase();
702 if lowered.contains("cents")
703 || lowered.contains("amount")
704 || lowered.contains("total")
705 || lowered.contains("count")
706 || lowered.contains("limit")
707 || lowered.contains("price")
708 {
709 "st.integers(min_value=-1_000_000, max_value=1_000_000)".to_string()
710 } else if lowered.contains("enabled")
711 || lowered.starts_with("is_")
712 || lowered.starts_with("has_")
713 || lowered.starts_with("can_")
714 {
715 "st.booleans()".to_string()
716 } else {
717 "st.text(max_size=64)".to_string()
718 }
719}
720
721fn python_module_name(path: &Utf8PathBuf) -> String {
722 path.as_str()
723 .trim_end_matches(".py")
724 .replace(['/', '\\'], ".")
725 .trim_end_matches(".__init__")
726 .to_string()
727}
728
729fn run_python_property_artifacts(
730 root: &Path,
731 artifacts: &[GeneratedArtifact],
732 config: &PythonPluginConfig,
733) -> Result<TestRunResult> {
734 let start = Instant::now();
735 let property_paths = artifacts
736 .iter()
737 .filter(|artifact| artifact.kind == ArtifactKind::PropertyTest)
738 .map(|artifact| artifact.path.to_string())
739 .collect::<Vec<_>>();
740 if property_paths.is_empty() {
741 return Ok(TestRunResult {
742 language: "python".to_string(),
743 status: RunStatus::Skipped,
744 commands: vec![],
745 failures: vec![],
746 duration_ms: 0,
747 quality: VerificationQuality::default(),
748 });
749 }
750 if !python_module_available(root, "hypothesis") {
751 return Ok(TestRunResult {
752 language: "python".to_string(),
753 status: RunStatus::Skipped,
754 commands: vec![skipped_python_command(
755 root,
756 vec![
757 "-m".to_string(),
758 "pytest".to_string(),
759 "-q".to_string(),
760 ".veritas/properties".to_string(),
761 ],
762 "skipped: hypothesis is not installed",
763 )?],
764 failures: vec![],
765 duration_ms: start.elapsed().as_millis(),
766 quality: VerificationQuality::default(),
767 });
768 }
769 if !python_module_available(root, "pytest") {
770 return Ok(TestRunResult {
771 language: "python".to_string(),
772 status: RunStatus::Skipped,
773 commands: vec![skipped_python_command(
774 root,
775 vec![
776 "-m".to_string(),
777 "pytest".to_string(),
778 "-q".to_string(),
779 ".veritas/properties".to_string(),
780 ],
781 "skipped: pytest is not installed",
782 )?],
783 failures: vec![],
784 duration_ms: start.elapsed().as_millis(),
785 quality: VerificationQuality::default(),
786 });
787 }
788
789 let mut args = vec!["-m".to_string(), "pytest".to_string(), "-q".to_string()];
790 args.extend(property_paths);
791 let command = run_command(
792 root,
793 "python3",
794 args.iter().map(String::as_str),
795 config.command_timeout_seconds.min(60),
796 )?;
797 let failures = if command.status == RunStatus::Failed {
798 vec![Failure {
799 id: None,
800 message: "python hypothesis property candidates failed".to_string(),
801 severity: FailureSeverity::Warning,
802 target_id: artifacts
803 .iter()
804 .find(|artifact| artifact.kind == ArtifactKind::PropertyTest)
805 .map(|artifact| artifact.target_id.clone()),
806 artifact_id: artifacts
807 .iter()
808 .find(|artifact| artifact.kind == ArtifactKind::PropertyTest)
809 .map(|artifact| artifact.id.clone()),
810 command: command_line(&command.program, &command.args),
811 stdout_excerpt: excerpt(&command.stdout),
812 stderr_excerpt: excerpt(&command.stderr),
813 repro: Some(ReproCase {
814 command: command_line(&command.program, &command.args),
815 input: None,
816 path: Some(Utf8PathBuf::from(".veritas/properties")),
817 }),
818 }]
819 } else {
820 Vec::new()
821 };
822 let status = command.status.clone();
823 Ok(TestRunResult {
824 language: "python".to_string(),
825 status,
826 commands: vec![command],
827 failures,
828 duration_ms: start.elapsed().as_millis(),
829 quality: VerificationQuality::default(),
830 })
831}
832
833fn skipped_python_command(root: &Path, args: Vec<String>, stderr: &str) -> Result<CommandRecord> {
834 Ok(CommandRecord {
835 program: "python3".to_string(),
836 args,
837 cwd: utf8_path(root)?,
838 exit_code: None,
839 status: RunStatus::Skipped,
840 stdout: String::new(),
841 stderr: stderr.to_string(),
842 duration_ms: 0,
843 })
844}
845
846fn run_python_mutations(
847 root: &Path,
848 artifacts: &[GeneratedArtifact],
849 config: &PythonPluginConfig,
850) -> Result<TestRunResult> {
851 let start = Instant::now();
852 let functions = discover_functions(root)?;
853 let candidates = python_mutation_candidates(root, &functions, artifacts)?
854 .into_iter()
855 .filter(|candidate| python_mutation_candidate_in_shard(candidate, &config.mutation))
856 .filter(|candidate| {
857 config.mutation.report_filtered || python_mutation_candidate_allowed(candidate, config)
858 })
859 .collect::<Vec<_>>();
860 let mut quality = VerificationQuality::default();
861 quality.mutation.generated = candidates.len();
862 quality.mutation.effective_workers = 1;
863 let mut commands = Vec::new();
864 let mut failures = Vec::new();
865 let mut status = RunStatus::Passed;
866 let run_dir = start_mutation_run(root, "python")?;
867
868 for candidate in candidates.into_iter().take(8) {
869 let domain = python_mutation_domain(&candidate);
870 let operator = python_mutation_operator(&candidate.label);
871 record_mutation_generated(&mut quality.mutation.by_domain, &domain);
872 record_mutation_generated(&mut quality.mutation.by_operator, &operator);
873 if !python_mutation_candidate_allowed(&candidate, config) {
874 let mut record = python_mutation_record(
875 &candidate,
876 &domain,
877 &operator,
878 MutationStatus::Skipped,
879 None,
880 0,
881 );
882 record.skip_reason = Some("filtered by mutation config".to_string());
883 persist_mutation_record_artifacts(root, &run_dir, &mut record, None)?;
884 quality.mutation.records.push(record);
885 continue;
886 }
887 if config.mutation.dry_run {
888 quality.mutation.runnable += 1;
889 record_mutation_runnable(&mut quality.mutation.by_domain, &domain);
890 record_mutation_runnable(&mut quality.mutation.by_operator, &operator);
891 let mut record = python_mutation_record(
892 &candidate,
893 &domain,
894 &operator,
895 MutationStatus::Runnable,
896 None,
897 0,
898 );
899 persist_mutation_record_artifacts(root, &run_dir, &mut record, None)?;
900 quality.mutation.records.push(record);
901 continue;
902 }
903 quality.mutation.runnable += 1;
904 quality.mutation.executed += 1;
905 record_mutation_runnable(&mut quality.mutation.by_domain, &domain);
906 record_mutation_runnable(&mut quality.mutation.by_operator, &operator);
907 record_mutation_executed(&mut quality.mutation.by_domain, &domain);
908 record_mutation_executed(&mut quality.mutation.by_operator, &operator);
909
910 let path = root.join(&candidate.path);
911 let original = fs::read_to_string(&path)
912 .with_context(|| format!("failed to read {}", path.display()))?;
913 let mut mutated = original.clone();
914 mutated.replace_range(candidate.start_byte..candidate.end_byte, &candidate.to);
915 fs::write(&path, mutated).with_context(|| {
916 format!(
917 "failed to write Python mutation {} in {}",
918 candidate.label,
919 path.display()
920 )
921 })?;
922 let (_, args) = python_test_args(root);
923 let command = run_command(
924 root,
925 "python3",
926 args.iter().map(String::as_str),
927 config.command_timeout_seconds.min(60),
928 );
929 fs::write(&path, original)
930 .with_context(|| format!("failed to restore {}", path.display()))?;
931 let command = command?;
932 let command_text = command_line(&command.program, &command.args);
933 let mutation_status = if command.status == RunStatus::Passed {
934 quality.mutation.survived += 1;
935 record_mutation_survived(&mut quality.mutation.by_domain, &domain);
936 record_mutation_survived(&mut quality.mutation.by_operator, &operator);
937 status = RunStatus::Failed;
938 failures.push(python_mutation_failure(
939 artifacts,
940 &candidate,
941 &command,
942 &command_text,
943 ));
944 MutationStatus::Lived
945 } else if python_mutation_not_viable(&command) {
946 quality.mutation.not_viable += 1;
947 record_mutation_not_viable(&mut quality.mutation.by_domain, &domain);
948 record_mutation_not_viable(&mut quality.mutation.by_operator, &operator);
949 MutationStatus::NotViable
950 } else {
951 quality.mutation.killed += 1;
952 record_mutation_killed(&mut quality.mutation.by_domain, &domain);
953 record_mutation_killed(&mut quality.mutation.by_operator, &operator);
954 MutationStatus::Killed
955 };
956 let mut record = python_mutation_record(
957 &candidate,
958 &domain,
959 &operator,
960 mutation_status,
961 Some(&command_text),
962 command.duration_ms,
963 );
964 persist_mutation_record_artifacts(root, &run_dir, &mut record, Some(&command))?;
965 quality.mutation.records.push(record);
966 commands.push(command);
967 }
968 finalize_mutation_skips(&mut quality.mutation.by_domain);
969 finalize_mutation_skips(&mut quality.mutation.by_operator);
970 finalize_mutation_metrics(&mut quality.mutation);
971
972 Ok(TestRunResult {
973 language: "python".to_string(),
974 status,
975 commands,
976 failures,
977 duration_ms: start.elapsed().as_millis(),
978 quality,
979 })
980}
981
982fn python_mutation_candidates(
983 root: &Path,
984 functions: &[PythonFunction],
985 artifacts: &[GeneratedArtifact],
986) -> Result<Vec<PythonMutationCandidate>> {
987 let mut candidates = Vec::new();
988 for function in functions {
989 if !python_function_selected_for_mutation(function, artifacts) {
990 continue;
991 }
992 let path = root.join(&function.path);
993 let contents = fs::read_to_string(&path)
994 .with_context(|| format!("failed to read {}", path.display()))?;
995 if function_has_mutation_skip_annotation(&contents, function.start_byte, function.end_byte)
996 {
997 continue;
998 }
999 push_python_mutation(
1000 &contents,
1001 function,
1002 "<=",
1003 "<",
1004 "boundary comparison mutation",
1005 &mut candidates,
1006 );
1007 push_python_mutation(
1008 &contents,
1009 function,
1010 " or ",
1011 " and ",
1012 "permission boolean connector mutation",
1013 &mut candidates,
1014 );
1015 push_python_mutation(
1016 &contents,
1017 function,
1018 " and ",
1019 " or ",
1020 "boolean connector mutation",
1021 &mut candidates,
1022 );
1023 push_python_mutation(
1024 &contents,
1025 function,
1026 "return 0, False",
1027 "return 1, False",
1028 "default tuple mutation",
1029 &mut candidates,
1030 );
1031 for (from, to, label) in [
1032 ("FOR UPDATE", "", "database isolation_lock mutation"),
1033 ("tenant_id", "1", "database tenant_filter mutation"),
1034 (
1035 "idempotency_key",
1036 "request_id",
1037 "database idempotency mutation",
1038 ),
1039 (
1040 "asyncio.create_task",
1041 "await",
1042 "concurrency_lifecycle task_spawn mutation",
1043 ),
1044 (
1045 ".acquire()",
1046 ".locked()",
1047 "synchronization lock_mode mutation",
1048 ),
1049 ("retry(", "once(", "retry_resilience retry_attempt mutation"),
1050 ("time.time()", "0", "testability injected_clock mutation"),
1051 (
1052 "random.",
1053 "deterministic_random.",
1054 "testability injected_randomness mutation",
1055 ),
1056 (
1057 "sorted(",
1058 "list(",
1059 "brittleness equivalent_ordering mutation",
1060 ),
1061 ] {
1062 push_python_mutation(&contents, function, from, to, label, &mut candidates);
1063 }
1064 }
1065 candidates.sort_by(|left, right| {
1066 left.path
1067 .cmp(&right.path)
1068 .then(left.start_byte.cmp(&right.start_byte))
1069 .then(left.label.cmp(&right.label))
1070 });
1071 candidates.dedup_by(|left, right| {
1072 left.path == right.path
1073 && left.start_byte == right.start_byte
1074 && left.end_byte == right.end_byte
1075 && left.to == right.to
1076 });
1077 Ok(candidates)
1078}
1079
1080fn function_has_mutation_skip_annotation(source: &str, start_byte: usize, end_byte: usize) -> bool {
1081 source
1082 .get(start_byte..end_byte)
1083 .is_some_and(|body| body.contains("veritas:skip-mutation"))
1084}
1085
1086fn push_python_mutation(
1087 contents: &str,
1088 function: &PythonFunction,
1089 from: &str,
1090 to: &str,
1091 label: &str,
1092 candidates: &mut Vec<PythonMutationCandidate>,
1093) {
1094 let Some(source) = contents.get(function.start_byte..function.end_byte) else {
1095 return;
1096 };
1097 let Some(offset) = source.find(from) else {
1098 return;
1099 };
1100 let start_byte = function.start_byte + offset;
1101 candidates.push(PythonMutationCandidate {
1102 path: function.path.clone(),
1103 function: function.symbol.clone(),
1104 label: label.to_string(),
1105 from: from.to_string(),
1106 to: to.to_string(),
1107 start_byte,
1108 end_byte: start_byte + from.len(),
1109 line_range: function.line_range.clone(),
1110 });
1111}
1112
1113fn python_function_selected_for_mutation(
1114 function: &PythonFunction,
1115 artifacts: &[GeneratedArtifact],
1116) -> bool {
1117 artifacts
1118 .iter()
1119 .filter(|artifact| artifact.kind == ArtifactKind::MutationCheck)
1120 .any(|artifact| {
1121 artifact.target_id == "python:project"
1122 || artifact.target_id == format!("python:{}", function.path)
1123 || artifact.target_id == format!("python:{}:{}", function.path, function.symbol)
1124 })
1125}
1126
1127fn python_mutation_failure(
1128 artifacts: &[GeneratedArtifact],
1129 candidate: &PythonMutationCandidate,
1130 command: &CommandRecord,
1131 command_text: &str,
1132) -> Failure {
1133 Failure {
1134 id: None,
1135 message: format!(
1136 "mutation survived in Python function `{}`: {}",
1137 candidate.function, candidate.label
1138 ),
1139 severity: FailureSeverity::Warning,
1140 target_id: Some(format!("python:{}:{}", candidate.path, candidate.function)),
1141 artifact_id: artifacts
1142 .iter()
1143 .find(|artifact| artifact.kind == ArtifactKind::MutationCheck)
1144 .map(|artifact| artifact.id.clone()),
1145 command: command_text.to_string(),
1146 stdout_excerpt: excerpt(&command.stdout),
1147 stderr_excerpt: excerpt(&command.stderr),
1148 repro: Some(ReproCase {
1149 command: format!(
1150 "replace `{}` with `{}` in {} and run {}",
1151 candidate.from, candidate.to, candidate.path, command_text
1152 ),
1153 input: None,
1154 path: Some(candidate.path.clone()),
1155 }),
1156 }
1157}
1158
1159fn python_mutation_record(
1160 candidate: &PythonMutationCandidate,
1161 domain: &str,
1162 operator: &str,
1163 status: MutationStatus,
1164 command: Option<&str>,
1165 duration_ms: u128,
1166) -> MutationRecord {
1167 MutationRecord {
1168 id: python_mutation_candidate_id(candidate),
1169 language: "python".to_string(),
1170 path: candidate.path.clone(),
1171 symbol: candidate.function.clone(),
1172 operator: operator.to_string(),
1173 domain: domain.to_string(),
1174 status,
1175 from: Some(candidate.from.clone()),
1176 to: Some(candidate.to.clone()),
1177 line_range: Some(candidate.line_range.clone()),
1178 source_span: Some(SourceSpan {
1179 start_byte: candidate.start_byte,
1180 end_byte: candidate.end_byte,
1181 }),
1182 diff: Some(python_mutation_diff(candidate)),
1183 diff_path: None,
1184 outcome_path: None,
1185 command_log_path: None,
1186 stdout_log_path: None,
1187 stderr_log_path: None,
1188 risk_note: Some(mutation_taxonomy::risk_note(domain, operator).to_string()),
1189 suggested_test: Some(mutation_taxonomy::suggested_test(domain, operator).to_string()),
1190 skip_reason: mutation_skip_reason(status),
1191 selected_test_command: command.map(ToString::to_string),
1192 test_selection_hint: None,
1193 test_selection_fallback: None,
1194 brittleness_probe: domain == "brittleness",
1195 command: command.map(ToString::to_string),
1196 duration_ms,
1197 }
1198}
1199
1200fn python_mutation_candidate_id(candidate: &PythonMutationCandidate) -> String {
1201 format!(
1202 "python:{}:{}:{}:{}",
1203 candidate.path, candidate.function, candidate.start_byte, candidate.end_byte
1204 )
1205}
1206
1207fn python_mutation_diff(candidate: &PythonMutationCandidate) -> String {
1208 format!(
1209 "--- {}\n+++ {}\n@@ bytes {}..{} @@\n-{}\n+{}",
1210 candidate.path,
1211 candidate.path,
1212 candidate.start_byte,
1213 candidate.end_byte,
1214 candidate.from,
1215 candidate.to
1216 )
1217}
1218
1219fn mutation_skip_reason(status: MutationStatus) -> Option<String> {
1220 match status {
1221 MutationStatus::NotCovered => Some("no selected tests cover this mutant".to_string()),
1222 MutationStatus::Skipped => Some("mutation was skipped before execution".to_string()),
1223 MutationStatus::TimedOut => Some(
1224 "mutation test command timed out; consider filtering this operator/mutant or adding deterministic test seams for recurring hangs"
1225 .to_string(),
1226 ),
1227 MutationStatus::NotViable => Some("mutation did not compile or could not run".to_string()),
1228 _ => None,
1229 }
1230}
1231
1232fn python_mutation_domain(candidate: &PythonMutationCandidate) -> String {
1233 let value = format!("{} {}", candidate.function, candidate.label).to_ascii_lowercase();
1234 mutation_taxonomy::normalize_domain(&value).to_string()
1235}
1236
1237fn python_mutation_operator(label: &str) -> String {
1238 mutation_taxonomy::normalize_operator(label).to_string()
1239}
1240
1241fn python_mutation_candidate_allowed(
1242 candidate: &PythonMutationCandidate,
1243 config: &PythonPluginConfig,
1244) -> bool {
1245 if !config.mutation.include_paths.is_empty()
1246 && !config
1247 .mutation
1248 .include_paths
1249 .iter()
1250 .any(|pattern| text_matches(candidate.path.as_str(), pattern))
1251 {
1252 return false;
1253 }
1254 if config
1255 .mutation
1256 .exclude_paths
1257 .iter()
1258 .any(|pattern| text_matches(candidate.path.as_str(), pattern))
1259 {
1260 return false;
1261 }
1262 if !config.mutation.include_symbols.is_empty()
1263 && !config
1264 .mutation
1265 .include_symbols
1266 .iter()
1267 .any(|pattern| text_matches(&candidate.function, pattern))
1268 {
1269 return false;
1270 }
1271 if config
1272 .mutation
1273 .exclude_symbols
1274 .iter()
1275 .any(|pattern| text_matches(&candidate.function, pattern))
1276 {
1277 return false;
1278 }
1279 let target_id = python_mutation_candidate_target_id(candidate);
1280 if !config.mutation.include_target_ids.is_empty()
1281 && !config
1282 .mutation
1283 .include_target_ids
1284 .iter()
1285 .any(|pattern| text_matches(&target_id, pattern))
1286 {
1287 return false;
1288 }
1289 if config
1290 .mutation
1291 .exclude_target_ids
1292 .iter()
1293 .any(|pattern| text_matches(&target_id, pattern))
1294 {
1295 return false;
1296 }
1297 let id = python_mutation_candidate_id(candidate);
1298 if !config.mutation.include_mutant_ids.is_empty()
1299 && !config
1300 .mutation
1301 .include_mutant_ids
1302 .iter()
1303 .any(|configured| text_matches(&id, configured))
1304 {
1305 return false;
1306 }
1307 if config
1308 .mutation
1309 .exclude_mutant_ids
1310 .iter()
1311 .any(|configured| text_matches(&id, configured))
1312 {
1313 return false;
1314 }
1315 let domain = python_mutation_domain(candidate);
1316 if !config.mutation.enabled_domains.is_empty()
1317 && !config
1318 .mutation
1319 .enabled_domains
1320 .iter()
1321 .any(|enabled| taxonomy_matches(&domain, enabled))
1322 {
1323 return false;
1324 }
1325 if config
1326 .mutation
1327 .disabled_domains
1328 .iter()
1329 .any(|disabled| taxonomy_matches(&domain, disabled))
1330 {
1331 return false;
1332 }
1333 let operator = python_mutation_operator(&candidate.label);
1334 if !config.mutation.enabled_operators.is_empty()
1335 && !config
1336 .mutation
1337 .enabled_operators
1338 .iter()
1339 .any(|enabled| taxonomy_matches(&operator, enabled))
1340 {
1341 return false;
1342 }
1343 !config
1344 .mutation
1345 .disabled_operators
1346 .iter()
1347 .any(|disabled| taxonomy_matches(&operator, disabled))
1348}
1349
1350fn python_mutation_candidate_target_id(candidate: &PythonMutationCandidate) -> String {
1351 format!("python:{}:{}", candidate.path, candidate.function)
1352}
1353
1354fn python_mutation_candidate_in_shard(
1355 candidate: &PythonMutationCandidate,
1356 config: &MutationConfig,
1357) -> bool {
1358 let Some(shard_count) = config.shard_count else {
1359 return true;
1360 };
1361 let shard_index = config.shard_index.unwrap_or(0);
1362 if shard_index >= shard_count {
1363 return false;
1364 }
1365 let mut hasher = DefaultHasher::new();
1366 candidate.path.hash(&mut hasher);
1367 candidate.function.hash(&mut hasher);
1368 candidate.start_byte.hash(&mut hasher);
1369 candidate.end_byte.hash(&mut hasher);
1370 (hasher.finish() as usize % shard_count) == shard_index
1371}
1372
1373fn taxonomy_matches(operator: &str, configured: &str) -> bool {
1374 let configured = configured.replace('-', "_").to_ascii_lowercase();
1375 text_matches(operator, configured.trim())
1376}
1377
1378fn text_matches(text: &str, pattern: &str) -> bool {
1379 if let Some(exact) = pattern.strip_prefix("exact:") {
1380 return text.eq_ignore_ascii_case(exact);
1381 }
1382 if let Some(regex) = pattern.strip_prefix("regex:") {
1383 return regex::RegexBuilder::new(regex)
1384 .case_insensitive(true)
1385 .build()
1386 .is_ok_and(|regex| regex.is_match(text));
1387 }
1388 let pattern = pattern.strip_prefix("glob:").unwrap_or(pattern);
1389 let text = text.to_ascii_lowercase();
1390 let pattern = pattern.to_ascii_lowercase();
1391 if let Some(suffix) = pattern.strip_suffix('$') {
1392 return text.ends_with(suffix);
1393 }
1394 if pattern.contains('*') {
1395 let mut rest = text.as_str();
1396 for part in pattern.split('*').filter(|part| !part.is_empty()) {
1397 let Some(offset) = rest.find(part) else {
1398 return false;
1399 };
1400 rest = &rest[offset + part.len()..];
1401 }
1402 return true;
1403 }
1404 text.contains(&pattern)
1405}
1406
1407fn record_mutation_generated(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1408 metrics.entry(key.to_string()).or_default().generated += 1;
1409}
1410
1411fn record_mutation_runnable(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1412 metrics.entry(key.to_string()).or_default().runnable += 1;
1413}
1414
1415fn record_mutation_executed(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1416 metrics.entry(key.to_string()).or_default().executed += 1;
1417}
1418
1419fn record_mutation_killed(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1420 metrics.entry(key.to_string()).or_default().killed += 1;
1421}
1422
1423fn record_mutation_survived(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1424 metrics.entry(key.to_string()).or_default().survived += 1;
1425}
1426
1427fn record_mutation_not_viable(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1428 metrics.entry(key.to_string()).or_default().not_viable += 1;
1429}
1430
1431fn finalize_mutation_skips(metrics: &mut BTreeMap<String, MutationAttribution>) {
1432 for metric in metrics.values_mut() {
1433 metric.skipped = metric.generated.saturating_sub(metric.executed);
1434 }
1435}
1436
1437fn python_mutation_not_viable(command: &CommandRecord) -> bool {
1438 let output = format!("{}\n{}", command.stdout, command.stderr).to_ascii_lowercase();
1439 output.contains("syntaxerror")
1440 || output.contains("indentationerror")
1441 || output.contains("nameerror")
1442 || output.contains("typeerror")
1443}
1444
1445fn python_coverage_report(root: &Path, path: &Path) -> Result<CoverageReport> {
1446 let contents =
1447 fs::read_to_string(path).with_context(|| format!("failed to read {}", path.display()))?;
1448 let value: serde_json::Value = serde_json::from_str(&contents)
1449 .with_context(|| format!("failed to parse {}", path.display()))?;
1450 let total = value["totals"]["percent_covered_display"]
1451 .as_str()
1452 .map(ToString::to_string)
1453 .or_else(|| {
1454 value["totals"]["percent_covered"]
1455 .as_f64()
1456 .map(|percent| format!("{percent:.1}"))
1457 })
1458 .unwrap_or_else(|| "unknown".to_string());
1459 let files = value["files"]
1460 .as_object()
1461 .into_iter()
1462 .flat_map(|files| files.iter())
1463 .map(|(path, file)| CoverageFile {
1464 path: Utf8PathBuf::from(path),
1465 line_coverage_percent: file["summary"]["percent_covered"]
1466 .as_f64()
1467 .map(|percent| percent.round().clamp(0.0, 100.0) as u8),
1468 uncovered_ranges: file["missing_lines"]
1469 .as_array()
1470 .into_iter()
1471 .flatten()
1472 .filter_map(|line| line.as_u64())
1473 .map(|line| line.to_string())
1474 .collect(),
1475 })
1476 .collect();
1477 Ok(CoverageReport {
1478 tool: "python coverage.py".to_string(),
1479 summary: format!("line coverage {total}% ({})", relative_display(root, path)),
1480 files,
1481 })
1482}
1483
1484fn relative_display(root: &Path, path: &Path) -> String {
1485 path.strip_prefix(root)
1486 .unwrap_or(path)
1487 .to_string_lossy()
1488 .to_string()
1489}
1490
1491fn replay_python_function(
1492 root: &Path,
1493 function: &PythonFunction,
1494 case: &BehaviorReplayCase,
1495 config: &PythonPluginConfig,
1496) -> Result<BehaviorReplayObservation> {
1497 if function.owner.is_some() {
1498 return Ok(unsupported_python_replay(
1499 "method replay requires receiver construction",
1500 ));
1501 }
1502 if function.params.is_empty() || case.inputs.is_empty() {
1503 return Ok(unsupported_python_replay(
1504 "executable replay requires at least one supported argument with seeded inputs",
1505 ));
1506 }
1507
1508 let script = render_python_replay_script(function, case)?;
1509 let script_path = root.join(".veritas").join("tmp_python_replay.py");
1510 if let Some(parent) = script_path.parent() {
1511 fs::create_dir_all(parent)
1512 .with_context(|| format!("failed to create {}", parent.display()))?;
1513 }
1514 fs::write(&script_path, script)
1515 .with_context(|| format!("failed to write {}", script_path.display()))?;
1516 let command = run_command(
1517 root,
1518 "python3",
1519 [script_path.to_string_lossy().as_ref()],
1520 config.command_timeout_seconds.min(30),
1521 );
1522 let cleanup = fs::remove_file(&script_path)
1523 .with_context(|| format!("failed to remove {}", script_path.display()));
1524 let command = command?;
1525 cleanup?;
1526
1527 let outputs = parse_replay_marker_output(&command.stdout);
1528 let output = if outputs.is_empty() {
1529 serde_json::json!({
1530 "observations": [],
1531 "stderr": excerpt(&command.stderr),
1532 })
1533 } else {
1534 serde_json::json!({ "observations": outputs })
1535 };
1536 let status = if command.status == RunStatus::Passed && !outputs.is_empty() {
1537 BehaviorReplayStatus::Observed
1538 } else {
1539 BehaviorReplayStatus::Failed
1540 };
1541 Ok(BehaviorReplayObservation {
1542 status,
1543 output,
1544 command: Some(command_line(&command.program, &command.args)),
1545 stdout_excerpt: Some(excerpt(&command.stdout)),
1546 stderr_excerpt: Some(excerpt(&command.stderr)),
1547 duration_ms: Some(command.duration_ms),
1548 })
1549}
1550
1551fn replay_python_function_batch(
1552 root: &Path,
1553 function: &PythonFunction,
1554 cases: &[BehaviorReplayCase],
1555 config: &PythonPluginConfig,
1556) -> Result<BTreeMap<String, BehaviorReplayObservation>> {
1557 let mut observations = BTreeMap::new();
1558 if cases.is_empty() {
1559 return Ok(observations);
1560 }
1561 if function.owner.is_some() {
1562 insert_unsupported_python_replay(
1563 &mut observations,
1564 cases,
1565 "method replay requires receiver construction",
1566 );
1567 return Ok(observations);
1568 }
1569 if function.params.is_empty() {
1570 insert_unsupported_python_replay(
1571 &mut observations,
1572 cases,
1573 "executable replay requires at least one supported argument",
1574 );
1575 return Ok(observations);
1576 }
1577
1578 let runnable = cases
1579 .iter()
1580 .filter(|case| {
1581 if case.inputs.is_empty() {
1582 observations.insert(
1583 case.name.clone(),
1584 unsupported_python_replay(
1585 "executable replay requires at least one seeded input",
1586 ),
1587 );
1588 false
1589 } else {
1590 true
1591 }
1592 })
1593 .collect::<Vec<_>>();
1594 if runnable.is_empty() {
1595 return Ok(observations);
1596 }
1597
1598 let script = render_python_replay_batch_script(function, &runnable)?;
1599 let script_path = root.join(".veritas").join(format!(
1600 "tmp_python_replay_{}_{}.py",
1601 std::process::id(),
1602 safe_ident(&function.symbol)
1603 ));
1604 if let Some(parent) = script_path.parent() {
1605 fs::create_dir_all(parent)
1606 .with_context(|| format!("failed to create {}", parent.display()))?;
1607 }
1608 fs::write(&script_path, script)
1609 .with_context(|| format!("failed to write {}", script_path.display()))?;
1610 let command = run_command(
1611 root,
1612 "python3",
1613 [script_path.to_string_lossy().as_ref()],
1614 config.command_timeout_seconds.min(30),
1615 );
1616 let cleanup = fs::remove_file(&script_path)
1617 .with_context(|| format!("failed to remove {}", script_path.display()));
1618 let command = command?;
1619 cleanup?;
1620
1621 let mut by_case = parse_replay_case_marker_output(&command.stdout);
1622 for case in runnable {
1623 observations.insert(
1624 case.name.clone(),
1625 replay_observation_from_command(
1626 &command,
1627 by_case.remove(&case.name).unwrap_or_default(),
1628 ),
1629 );
1630 }
1631 Ok(observations)
1632}
1633
1634fn render_python_replay_script(
1635 function: &PythonFunction,
1636 case: &BehaviorReplayCase,
1637) -> Result<String> {
1638 let inputs = serde_json::to_string(&case.inputs)?;
1639 Ok(format!(
1640 r#"import importlib.util
1641import json
1642import pathlib
1643import traceback
1644
1645module_path = pathlib.Path({path:?})
1646spec = importlib.util.spec_from_file_location("veritas_replay_target", module_path)
1647module = importlib.util.module_from_spec(spec)
1648spec.loader.exec_module(module)
1649target = getattr(module, {name:?})
1650arity = {arity}
1651
1652def call_args(value):
1653 if arity == 1:
1654 return [value]
1655 if isinstance(value, list) and len(value) == arity:
1656 return value
1657 raise ValueError("replay input does not match target arity")
1658
1659for value in json.loads({inputs:?}):
1660 try:
1661 output = repr(target(*call_args(value)))
1662 status = "observed"
1663 except BaseException as exc:
1664 output = repr(exc)
1665 status = "exception"
1666 print("__VERITAS_REPLAY__{{}}\t{{}}\t{{}}".format(repr(value), status, output))
1667"#,
1668 path = function.path.as_str(),
1669 name = function.name,
1670 arity = function.params.len(),
1671 inputs = inputs
1672 ))
1673}
1674
1675fn render_python_replay_batch_script(
1676 function: &PythonFunction,
1677 cases: &[&BehaviorReplayCase],
1678) -> Result<String> {
1679 let cases = cases
1680 .iter()
1681 .map(|case| {
1682 serde_json::json!({
1683 "name": &case.name,
1684 "inputs": &case.inputs,
1685 })
1686 })
1687 .collect::<Vec<_>>();
1688 let cases = serde_json::to_string(&cases)?;
1689 Ok(format!(
1690 r#"import importlib.util
1691import json
1692import pathlib
1693
1694module_path = pathlib.Path({path:?})
1695spec = importlib.util.spec_from_file_location("veritas_replay_target", module_path)
1696module = importlib.util.module_from_spec(spec)
1697spec.loader.exec_module(module)
1698target = getattr(module, {name:?})
1699arity = {arity}
1700
1701def call_args(value):
1702 if arity == 1:
1703 return [value]
1704 if isinstance(value, list) and len(value) == arity:
1705 return value
1706 raise ValueError("replay input does not match target arity")
1707
1708for case in json.loads({cases:?}):
1709 for value in case["inputs"]:
1710 try:
1711 output = repr(target(*call_args(value)))
1712 status = "observed"
1713 except BaseException as exc:
1714 output = repr(exc)
1715 status = "exception"
1716 print("__VERITAS_REPLAY_CASE__{{}}\t{{}}\t{{}}\t{{}}".format(case["name"], repr(value), status, output))
1717"#,
1718 path = function.path.as_str(),
1719 name = function.name,
1720 arity = function.params.len(),
1721 cases = cases
1722 ))
1723}
1724
1725fn unsupported_python_replay(reason: &str) -> BehaviorReplayObservation {
1726 BehaviorReplayObservation {
1727 status: BehaviorReplayStatus::Unsupported,
1728 output: serde_json::json!({ "reason": reason }),
1729 command: None,
1730 stdout_excerpt: None,
1731 stderr_excerpt: None,
1732 duration_ms: None,
1733 }
1734}
1735
1736fn insert_unsupported_python_replay(
1737 observations: &mut BTreeMap<String, BehaviorReplayObservation>,
1738 cases: &[BehaviorReplayCase],
1739 reason: &str,
1740) {
1741 for case in cases {
1742 observations.insert(case.name.clone(), unsupported_python_replay(reason));
1743 }
1744}
1745
1746fn replay_observation_from_command(
1747 command: &CommandRecord,
1748 outputs: Vec<serde_json::Value>,
1749) -> BehaviorReplayObservation {
1750 let has_outputs = !outputs.is_empty();
1751 let output = if outputs.is_empty() {
1752 serde_json::json!({
1753 "observations": [],
1754 "stderr": excerpt(&command.stderr),
1755 })
1756 } else {
1757 serde_json::json!({ "observations": outputs })
1758 };
1759 let status = if command.status == RunStatus::Passed && has_outputs {
1760 BehaviorReplayStatus::Observed
1761 } else {
1762 BehaviorReplayStatus::Failed
1763 };
1764 BehaviorReplayObservation {
1765 status,
1766 output,
1767 command: Some(command_line(&command.program, &command.args)),
1768 stdout_excerpt: Some(excerpt(&command.stdout)),
1769 stderr_excerpt: Some(excerpt(&command.stderr)),
1770 duration_ms: Some(command.duration_ms),
1771 }
1772}
1773
1774fn python_target_matches_function(target_id: &str, function: &PythonFunction) -> bool {
1775 if target_id == "python:project" {
1776 return true;
1777 }
1778 let Some(rest) = target_id.strip_prefix("python:") else {
1779 return false;
1780 };
1781 if rest == function.path.as_str() {
1782 return true;
1783 }
1784 if let Some((path, symbol)) = rest.rsplit_once(':') {
1785 return path == function.path.as_str() && symbol == function.symbol;
1786 }
1787 false
1788}
1789
1790fn contains_python_file(root: &Path) -> bool {
1791 WalkDir::new(root)
1792 .max_depth(4)
1793 .into_iter()
1794 .filter_entry(|entry| !is_ignored(entry.path(), entry.file_name()))
1795 .filter_map(Result::ok)
1796 .any(|entry| {
1797 entry.file_type().is_file() && entry.path().extension() == Some(OsStr::new("py"))
1798 })
1799}
1800
1801fn is_ignored(path: &Path, name: &OsStr) -> bool {
1802 let name = name.to_string_lossy();
1803 if matches!(
1804 name.as_ref(),
1805 ".git"
1806 | ".hg"
1807 | ".svn"
1808 | ".venv"
1809 | "venv"
1810 | "__pycache__"
1811 | ".mypy_cache"
1812 | ".pytest_cache"
1813 | ".veritas"
1814 ) {
1815 return true;
1816 }
1817 path.components().any(|component| {
1818 matches!(
1819 component.as_os_str().to_string_lossy().as_ref(),
1820 ".git" | ".venv" | "venv" | "__pycache__" | ".veritas"
1821 )
1822 })
1823}
1824
1825fn is_python_test_file(path: &Path) -> bool {
1826 path.file_name()
1827 .and_then(OsStr::to_str)
1828 .is_some_and(|name| name.starts_with("test_") || name.ends_with("_test.py"))
1829 || path
1830 .components()
1831 .any(|component| component.as_os_str() == OsStr::new("tests"))
1832}
1833
1834fn python_test_args(root: &Path) -> (&'static str, Vec<String>) {
1835 python_test_args_with_availability(root, python_module_available(root, "pytest"))
1836}
1837
1838fn python_test_args_with_availability(
1839 root: &Path,
1840 pytest_available: bool,
1841) -> (&'static str, Vec<String>) {
1842 if prefers_pytest(root) && pytest_available {
1843 (
1844 "pytest",
1845 vec!["-m".to_string(), "pytest".to_string(), "-q".to_string()],
1846 )
1847 } else {
1848 (
1849 "unittest",
1850 vec![
1851 "-m".to_string(),
1852 "unittest".to_string(),
1853 "discover".to_string(),
1854 ],
1855 )
1856 }
1857}
1858
1859fn prefers_pytest(root: &Path) -> bool {
1860 root.join("pytest.ini").exists()
1861 || root.join(".pytest.ini").exists()
1862 || config_contains(root.join("pyproject.toml"), "[tool.pytest")
1863 || config_contains(root.join("setup.cfg"), "[tool:pytest]")
1864 || config_contains(root.join("tox.ini"), "[pytest]")
1865 || tests_import_pytest(root)
1866}
1867
1868fn tests_import_pytest(root: &Path) -> bool {
1869 WalkDir::new(root)
1870 .max_depth(4)
1871 .into_iter()
1872 .filter_entry(|entry| !is_ignored(entry.path(), entry.file_name()))
1873 .filter_map(Result::ok)
1874 .filter(|entry| {
1875 entry.file_type().is_file()
1876 && entry.path().extension() == Some(OsStr::new("py"))
1877 && is_python_test_file(entry.path())
1878 })
1879 .any(|entry| {
1880 fs::read_to_string(entry.path()).is_ok_and(|contents| {
1881 contents.contains("import pytest") || contents.contains("from pytest")
1882 })
1883 })
1884}
1885
1886fn config_contains(path: impl AsRef<Path>, needle: &str) -> bool {
1887 fs::read_to_string(path).is_ok_and(|contents| contents.contains(needle))
1888}
1889
1890fn python_module_available(root: &Path, module: &str) -> bool {
1891 Command::new("python3")
1892 .args(["-m", module, "--version"])
1893 .current_dir(root)
1894 .stdout(Stdio::null())
1895 .stderr(Stdio::null())
1896 .status()
1897 .is_ok_and(|status| status.success())
1898}
1899
1900fn infer_risk(name: &str) -> RiskLevel {
1901 let lowered = name.to_ascii_lowercase();
1902 let high_risk_terms = [
1903 "parse",
1904 "auth",
1905 "permission",
1906 "money",
1907 "price",
1908 "invoice",
1909 "token",
1910 "serialize",
1911 "deserialize",
1912 "refund",
1913 "discount",
1914 ];
1915 if high_risk_terms.iter().any(|term| lowered.contains(term)) {
1916 RiskLevel::High
1917 } else {
1918 RiskLevel::Medium
1919 }
1920}
1921
1922fn node_text<'a>(node: Node<'_>, source: &'a str) -> Result<&'a str> {
1923 source
1924 .get(node.byte_range())
1925 .ok_or_else(|| anyhow!("invalid tree-sitter byte range"))
1926}
1927
1928fn signature_text<'a>(node: Node<'_>, source: &'a str) -> Result<&'a str> {
1929 let start = node.start_byte();
1930 let body_start = node
1931 .child_by_field_name("body")
1932 .map(|body| body.start_byte())
1933 .unwrap_or(node.end_byte());
1934 source
1935 .get(start..body_start)
1936 .ok_or_else(|| anyhow!("invalid Python signature byte range"))
1937}
1938
1939fn run_command<I, S>(
1940 root: &Path,
1941 program: &str,
1942 args: I,
1943 timeout_seconds: u64,
1944) -> Result<CommandRecord>
1945where
1946 I: IntoIterator<Item = S>,
1947 S: Into<String>,
1948{
1949 let start = Instant::now();
1950 let args = args.into_iter().map(Into::into).collect::<Vec<String>>();
1951 let mut child = Command::new(program)
1952 .args(&args)
1953 .current_dir(root)
1954 .stdout(Stdio::piped())
1955 .stderr(Stdio::piped())
1956 .spawn()
1957 .with_context(|| format!("failed to run {}", command_line(program, &args)))?;
1958 let timeout = Duration::from_secs(timeout_seconds.max(1));
1959 loop {
1960 if child
1961 .try_wait()
1962 .with_context(|| format!("failed to poll {}", command_line(program, &args)))?
1963 .is_some()
1964 {
1965 let output = child
1966 .wait_with_output()
1967 .with_context(|| format!("failed to read {}", command_line(program, &args)))?;
1968 let status = if output.status.success() {
1969 RunStatus::Passed
1970 } else {
1971 RunStatus::Failed
1972 };
1973 return Ok(CommandRecord {
1974 program: program.to_string(),
1975 args,
1976 cwd: utf8_path(root)?,
1977 exit_code: output.status.code(),
1978 status,
1979 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
1980 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
1981 duration_ms: start.elapsed().as_millis(),
1982 });
1983 }
1984 if start.elapsed() >= timeout {
1985 let _ = child.kill();
1986 let output = child.wait_with_output().with_context(|| {
1987 format!(
1988 "failed to collect timed-out {}",
1989 command_line(program, &args)
1990 )
1991 })?;
1992 let mut stderr = String::from_utf8_lossy(&output.stderr).to_string();
1993 if !stderr.is_empty() {
1994 stderr.push('\n');
1995 }
1996 stderr.push_str(&format!("command timed out after {}s", timeout.as_secs()));
1997 return Ok(CommandRecord {
1998 program: program.to_string(),
1999 args,
2000 cwd: utf8_path(root)?,
2001 exit_code: output.status.code(),
2002 status: RunStatus::Failed,
2003 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
2004 stderr,
2005 duration_ms: start.elapsed().as_millis(),
2006 });
2007 }
2008 thread::sleep(Duration::from_millis(20));
2009 }
2010}
2011
2012fn parse_replay_marker_output(stdout: &str) -> Vec<serde_json::Value> {
2013 stdout
2014 .lines()
2015 .filter_map(|line| {
2016 let marker = line.find("__VERITAS_REPLAY__")?;
2017 let payload = &line[marker + "__VERITAS_REPLAY__".len()..];
2018 let mut parts = payload.splitn(3, '\t');
2019 let input = parts.next()?.to_string();
2020 let status = parts.next()?.to_string();
2021 let output = parts.next().unwrap_or_default().to_string();
2022 Some(serde_json::json!({
2023 "input": input,
2024 "status": status,
2025 "output": output,
2026 }))
2027 })
2028 .collect()
2029}
2030
2031fn parse_replay_case_marker_output(stdout: &str) -> BTreeMap<String, Vec<serde_json::Value>> {
2032 let mut by_case: BTreeMap<String, Vec<serde_json::Value>> = BTreeMap::new();
2033 for line in stdout.lines() {
2034 let Some(marker) = line.find("__VERITAS_REPLAY_CASE__") else {
2035 continue;
2036 };
2037 let payload = &line[marker + "__VERITAS_REPLAY_CASE__".len()..];
2038 let mut parts = payload.splitn(4, '\t');
2039 let Some(case) = parts.next() else {
2040 continue;
2041 };
2042 let Some(input) = parts.next() else {
2043 continue;
2044 };
2045 let Some(status) = parts.next() else {
2046 continue;
2047 };
2048 let output = parts.next().unwrap_or_default();
2049 by_case
2050 .entry(case.to_string())
2051 .or_default()
2052 .push(serde_json::json!({
2053 "input": input,
2054 "status": status,
2055 "output": output,
2056 }));
2057 }
2058 by_case
2059}
2060
2061fn safe_ident(value: &str) -> String {
2062 let mut out = String::new();
2063 for ch in value.chars() {
2064 if ch.is_ascii_alphanumeric() {
2065 out.push(ch.to_ascii_lowercase());
2066 } else {
2067 out.push('_');
2068 }
2069 }
2070 while out.contains("__") {
2071 out = out.replace("__", "_");
2072 }
2073 let out = out.trim_matches('_').to_string();
2074 if out.is_empty() {
2075 "target".to_string()
2076 } else {
2077 out
2078 }
2079}
2080
2081fn relative_utf8(root: &Path, path: &Path) -> Result<Utf8PathBuf> {
2082 let relative = path
2083 .strip_prefix(root)
2084 .with_context(|| format!("failed to relativize {}", path.display()))?;
2085 Utf8PathBuf::from_path_buf(relative.to_path_buf())
2086 .map_err(|path| anyhow!("path is not valid UTF-8: {}", path.display()))
2087}
2088
2089fn utf8_path(path: &Path) -> Result<Utf8PathBuf> {
2090 Utf8PathBuf::from_path_buf(path.to_path_buf())
2091 .map_err(|path| anyhow!("path is not valid UTF-8: {}", path.display()))
2092}
2093
2094fn command_line(program: &str, args: &[impl AsRef<str>]) -> String {
2095 let mut parts = vec![program.to_string()];
2096 parts.extend(args.iter().map(|arg| {
2097 let arg = arg.as_ref();
2098 if arg.contains(char::is_whitespace) {
2099 format!("{arg:?}")
2100 } else {
2101 arg.to_string()
2102 }
2103 }));
2104 parts.join(" ")
2105}
2106
2107fn excerpt(value: &str) -> String {
2108 let mut lines = value.lines().take(20).collect::<Vec<_>>().join("\n");
2109 if value.lines().count() > 20 {
2110 lines.push_str("\n...");
2111 }
2112 lines
2113}
2114
2115#[cfg(test)]
2116mod tests {
2117 use super::*;
2118 use std::path::PathBuf;
2119 use std::time::{SystemTime, UNIX_EPOCH};
2120
2121 #[test]
2122 fn pytest_preference_uses_pytest_when_available() {
2123 let root = unique_test_root("pytest-available");
2124 fs::create_dir_all(root.join("tests")).expect("create tests dir");
2125 fs::write(root.join("pyproject.toml"), "[tool.pytest.ini_options]\n").expect("pyproject");
2126
2127 let (runner, args) = python_test_args_with_availability(&root, true);
2128
2129 fs::remove_dir_all(&root).ok();
2130 assert_eq!(runner, "pytest");
2131 assert_eq!(args, ["-m", "pytest", "-q"]);
2132 }
2133
2134 #[test]
2135 fn pytest_preference_falls_back_to_unittest_when_unavailable() {
2136 let root = unique_test_root("pytest-unavailable");
2137 fs::create_dir_all(root.join("tests")).expect("create tests dir");
2138 fs::write(root.join("tests/test_invoice.py"), "import pytest\n").expect("test file");
2139
2140 let (runner, args) = python_test_args_with_availability(&root, false);
2141
2142 fs::remove_dir_all(&root).ok();
2143 assert_eq!(runner, "unittest");
2144 assert_eq!(args, ["-m", "unittest", "discover"]);
2145 }
2146
2147 #[test]
2148 fn python_mutation_skip_annotation_is_function_local() {
2149 let source =
2150 "def kept():\n return 1\n\ndef skipped():\n # veritas:skip-mutation\n return 2\n";
2151 let kept_start = source.find("def kept").unwrap();
2152 let kept_end = source.find("def skipped").unwrap();
2153 let skipped_start = kept_end;
2154 assert!(!function_has_mutation_skip_annotation(
2155 source, kept_start, kept_end
2156 ));
2157 assert!(function_has_mutation_skip_annotation(
2158 source,
2159 skipped_start,
2160 source.len()
2161 ));
2162 }
2163
2164 fn unique_test_root(name: &str) -> PathBuf {
2165 let millis = SystemTime::now()
2166 .duration_since(UNIX_EPOCH)
2167 .expect("system time")
2168 .as_millis();
2169 std::env::temp_dir().join(format!("veritas-python-{name}-{millis}"))
2170 }
2171}