Skip to main content

veritas_python/
lib.rs

1use std::{
2    collections::hash_map::DefaultHasher,
3    collections::BTreeMap,
4    ffi::OsStr,
5    fs,
6    hash::{Hash, Hasher},
7    path::Path,
8    process::{Command, Stdio},
9    thread,
10    time::{Duration, Instant},
11};
12
13use anyhow::{anyhow, Context, Result};
14use camino::Utf8PathBuf;
15use serde::Serialize;
16use tree_sitter::{Node, Parser};
17use veritas_core::{
18    config::{MutationConfig, PythonPluginConfig},
19    persist_mutation_record_artifacts, start_mutation_run,
20};
21use veritas_plugin_api::{
22    finalize_mutation_metrics, mutation_taxonomy, ArtifactKind, ArtifactStatus, BehaviorReplayCase,
23    BehaviorReplayObservation, BehaviorReplayStatus, CommandRecord, CoverageFile, CoverageReport,
24    Failure, FailureSeverity, GeneratedArtifact, LanguagePlugin, LineRange, MutationAttribution,
25    MutationRecord, MutationStatus, PluginCapability, ProjectInfo, ReproCase, RiskLevel, RunStatus,
26    SourceSpan, TargetKind, TestRunResult, VerificationPlan, VerificationQuality,
27    VerificationStrategy, VerificationTarget,
28};
29use walkdir::WalkDir;
30
31#[derive(Debug, Clone)]
32pub struct PythonPlugin {
33    config: PythonPluginConfig,
34}
35
36#[derive(Debug, Clone)]
37struct PythonFunction {
38    path: Utf8PathBuf,
39    name: String,
40    symbol: String,
41    owner: Option<String>,
42    params: Vec<String>,
43    signature: String,
44    line_range: LineRange,
45    start_byte: usize,
46    end_byte: usize,
47    calls: Vec<String>,
48}
49
50#[derive(Debug, Clone)]
51struct PythonMutationCandidate {
52    path: Utf8PathBuf,
53    function: String,
54    label: String,
55    from: String,
56    to: String,
57    start_byte: usize,
58    end_byte: usize,
59    line_range: LineRange,
60}
61
62#[derive(Debug, Clone, Serialize)]
63struct PythonSymbolGraph<'a> {
64    target_id: &'a str,
65    symbols: Vec<PythonSymbolNode<'a>>,
66}
67
68#[derive(Debug, Clone, Serialize)]
69struct PythonSymbolNode<'a> {
70    id: String,
71    path: &'a Utf8PathBuf,
72    symbol: &'a str,
73    name: &'a str,
74    owner: Option<&'a str>,
75    signature: &'a str,
76    line_range: &'a LineRange,
77    risk: RiskLevel,
78    calls: &'a [String],
79}
80
81impl PythonPlugin {
82    pub fn new(config: PythonPluginConfig) -> Self {
83        Self { config }
84    }
85}
86
87impl LanguagePlugin for PythonPlugin {
88    fn id(&self) -> &'static str {
89        "python"
90    }
91
92    fn display_name(&self) -> &'static str {
93        "Python"
94    }
95
96    fn capabilities(&self) -> Vec<PluginCapability> {
97        vec![
98            PluginCapability::TargetDiscovery,
99            PluginCapability::SymbolGraph,
100            PluginCapability::GeneratedTests,
101            PluginCapability::ExistingTests,
102            PluginCapability::MutationChecks,
103            PluginCapability::DifferentialReplay,
104            PluginCapability::RegressionPromotion,
105            PluginCapability::ResourceBudgets,
106        ]
107    }
108
109    fn detect_project(&self, root: &Path) -> Result<ProjectInfo> {
110        if !root.join("pyproject.toml").exists()
111            && !root.join("setup.py").exists()
112            && !contains_python_file(root)
113        {
114            return Err(anyhow!("Python project not found"));
115        }
116        let name = root
117            .file_name()
118            .and_then(OsStr::to_str)
119            .unwrap_or("python-project")
120            .to_string();
121        let manifests = ["pyproject.toml", "setup.py"]
122            .into_iter()
123            .filter(|path| root.join(path).exists())
124            .map(Utf8PathBuf::from)
125            .collect();
126        Ok(ProjectInfo {
127            language: "python".to_string(),
128            name,
129            root: utf8_path(root)?,
130            manifests,
131        })
132    }
133
134    fn discover_targets(&self, root: &Path) -> Result<Vec<VerificationTarget>> {
135        let mut targets = vec![VerificationTarget {
136            id: "python:project".to_string(),
137            language: "python".to_string(),
138            kind: TargetKind::Project,
139            path: Utf8PathBuf::from("."),
140            symbol: None,
141            signature: None,
142            line_range: None,
143            description: "Python project".to_string(),
144            risk: RiskLevel::Medium,
145        }];
146
147        for function in discover_functions(root)? {
148            targets.push(VerificationTarget {
149                id: format!("python:{}:{}", function.path, function.symbol),
150                language: "python".to_string(),
151                kind: TargetKind::Function,
152                path: function.path.clone(),
153                symbol: Some(function.symbol.clone()),
154                signature: Some(function.signature.clone()),
155                line_range: Some(function.line_range.clone()),
156                description: if let Some(owner) = &function.owner {
157                    format!("Python method {owner}.{}", function.name)
158                } else {
159                    format!("Python function {}", function.name)
160                },
161                risk: infer_risk(&function.symbol),
162            });
163        }
164
165        Ok(targets)
166    }
167
168    fn generate_tests(
169        &self,
170        target: &VerificationTarget,
171        plan: &VerificationPlan,
172    ) -> Result<Vec<GeneratedArtifact>> {
173        let root = std::env::current_dir()?;
174        let functions = discover_functions(&root).unwrap_or_default();
175        let mut artifacts = vec![symbol_graph_artifact(
176            &target.id,
177            &target.path,
178            target.symbol.as_deref(),
179            &functions,
180        )?];
181        if plan
182            .strategies
183            .contains(&VerificationStrategy::MutationChecks)
184        {
185            artifacts.push(python_mutation_manifest_artifact(
186                target,
187                target.symbol.as_deref(),
188                &functions,
189            )?);
190        }
191        if plan
192            .strategies
193            .contains(&VerificationStrategy::PropertyTests)
194        {
195            if let Some(artifact) =
196                python_hypothesis_property_artifact(target, target.symbol.as_deref(), &functions)?
197            {
198                artifacts.push(artifact);
199            }
200        }
201        Ok(artifacts)
202    }
203
204    fn run_tests(
205        &self,
206        root: &Path,
207        artifacts: &[GeneratedArtifact],
208        _plan: &VerificationPlan,
209    ) -> Result<TestRunResult> {
210        let start = Instant::now();
211        let (runner_name, args) = python_test_args(root);
212        let command = run_command(
213            root,
214            "python3",
215            args.iter().map(String::as_str),
216            self.config.command_timeout_seconds,
217        )?;
218        let status = command.status.clone();
219        let failures = if command.status == RunStatus::Failed {
220            vec![Failure {
221                id: None,
222                message: format!("python {runner_name} failed"),
223                severity: FailureSeverity::Error,
224                target_id: artifacts.first().map(|artifact| artifact.target_id.clone()),
225                artifact_id: artifacts.first().map(|artifact| artifact.id.clone()),
226                command: command_line(&command.program, &command.args),
227                stdout_excerpt: excerpt(&command.stdout),
228                stderr_excerpt: excerpt(&command.stderr),
229                repro: Some(ReproCase {
230                    command: command_line(&command.program, &command.args),
231                    input: None,
232                    path: None,
233                }),
234            }]
235        } else {
236            Vec::new()
237        };
238        let mut commands = vec![command];
239        let mut failures = failures;
240        let mut status = status;
241        if artifacts
242            .iter()
243            .any(|artifact| artifact.kind == ArtifactKind::PropertyTest)
244        {
245            let property = run_python_property_artifacts(root, artifacts, &self.config)?;
246            if property.status == RunStatus::Failed {
247                status = RunStatus::Failed;
248            }
249            commands.extend(property.commands);
250            failures.extend(property.failures);
251        }
252        let mutation = if artifacts
253            .iter()
254            .any(|artifact| artifact.kind == ArtifactKind::MutationCheck)
255        {
256            let mutation = run_python_mutations(root, artifacts, &self.config)?;
257            if mutation.status == RunStatus::Failed {
258                status = RunStatus::Failed;
259            }
260            commands.extend(mutation.commands);
261            failures.extend(mutation.failures);
262            mutation.quality
263        } else {
264            VerificationQuality::default()
265        };
266
267        Ok(TestRunResult {
268            language: "python".to_string(),
269            status,
270            commands,
271            failures,
272            duration_ms: start.elapsed().as_millis(),
273            quality: mutation,
274        })
275    }
276
277    fn collect_coverage(&self, root: &Path) -> Result<Option<CoverageReport>> {
278        if !self.config.coverage_enabled {
279            return Ok(Some(CoverageReport {
280                tool: "python coverage".to_string(),
281                summary: "not collected: disabled by Python plugin config".to_string(),
282                files: vec![],
283            }));
284        }
285        if !python_module_available(root, "coverage") {
286            return Ok(Some(CoverageReport {
287                tool: "python coverage.py".to_string(),
288                summary: "not collected: coverage.py is not available".to_string(),
289                files: vec![],
290            }));
291        }
292
293        let veritas_dir = root.join(".veritas");
294        fs::create_dir_all(&veritas_dir)
295            .with_context(|| format!("failed to create {}", veritas_dir.display()))?;
296        let data_file = ".veritas/.coverage";
297        let json_file = ".veritas/python_coverage.json";
298        let (_, test_args) = python_test_args(root);
299        let mut run_args = vec![
300            "-m".to_string(),
301            "coverage".to_string(),
302            "run".to_string(),
303            "--data-file".to_string(),
304            data_file.to_string(),
305        ];
306        run_args.extend(test_args);
307        let run = run_command(
308            root,
309            "python3",
310            run_args.iter().map(String::as_str),
311            self.config.command_timeout_seconds,
312        )?;
313        if run.status == RunStatus::Failed {
314            return Ok(Some(CoverageReport {
315                tool: "python coverage.py".to_string(),
316                summary: format!("not collected: {}", excerpt(&run.stderr)),
317                files: vec![],
318            }));
319        }
320        let json_args = [
321            "-m",
322            "coverage",
323            "json",
324            "--data-file",
325            data_file,
326            "-o",
327            json_file,
328        ];
329        let json = run_command(
330            root,
331            "python3",
332            json_args,
333            self.config.command_timeout_seconds,
334        )?;
335        if json.status == RunStatus::Failed {
336            return Ok(Some(CoverageReport {
337                tool: "python coverage.py".to_string(),
338                summary: format!("not collected: {}", excerpt(&json.stderr)),
339                files: vec![],
340            }));
341        }
342        Ok(Some(python_coverage_report(root, &root.join(json_file))?))
343    }
344
345    fn replay_behavior(
346        &self,
347        root: &Path,
348        target: &VerificationTarget,
349        case: &BehaviorReplayCase,
350    ) -> Result<Option<BehaviorReplayObservation>> {
351        let functions = discover_functions(root)?;
352        let Some(function) = functions
353            .iter()
354            .find(|function| python_target_matches_function(&target.id, function))
355        else {
356            return Ok(None);
357        };
358        replay_python_function(root, function, case, &self.config).map(Some)
359    }
360
361    fn replay_behaviors(
362        &self,
363        root: &Path,
364        target: &VerificationTarget,
365        cases: &[BehaviorReplayCase],
366    ) -> Result<BTreeMap<String, BehaviorReplayObservation>> {
367        let functions = discover_functions(root)?;
368        let Some(function) = functions
369            .iter()
370            .find(|function| python_target_matches_function(&target.id, function))
371        else {
372            return Ok(BTreeMap::new());
373        };
374        replay_python_function_batch(root, function, cases, &self.config)
375    }
376}
377
378fn discover_functions(root: &Path) -> Result<Vec<PythonFunction>> {
379    let mut parser = Parser::new();
380    parser
381        .set_language(&tree_sitter_python::LANGUAGE.into())
382        .map_err(|error| anyhow!("failed to load tree-sitter Python grammar: {error}"))?;
383
384    let mut functions = Vec::new();
385    for entry in WalkDir::new(root)
386        .into_iter()
387        .filter_entry(|entry| !is_ignored(entry.path(), entry.file_name()))
388    {
389        let entry = entry?;
390        if !entry.file_type().is_file()
391            || entry.path().extension() != Some(OsStr::new("py"))
392            || is_python_test_file(entry.path())
393        {
394            continue;
395        }
396        let contents = fs::read_to_string(entry.path())
397            .with_context(|| format!("failed to read {}", entry.path().display()))?;
398        let tree = parser
399            .parse(&contents, None)
400            .ok_or_else(|| anyhow!("failed to parse Python source {}", entry.path().display()))?;
401        let path = relative_utf8(root, entry.path())?;
402        collect_python_functions(tree.root_node(), &contents, &path, &mut functions)?;
403    }
404    Ok(functions)
405}
406
407fn collect_python_functions(
408    node: Node<'_>,
409    source: &str,
410    path: &Utf8PathBuf,
411    functions: &mut Vec<PythonFunction>,
412) -> Result<()> {
413    if matches!(
414        node.kind(),
415        "function_definition" | "async_function_definition"
416    ) {
417        if let Some(function) = parse_python_function(node, source, path)? {
418            functions.push(function);
419        }
420    }
421
422    let mut cursor = node.walk();
423    for child in node.children(&mut cursor) {
424        collect_python_functions(child, source, path, functions)?;
425    }
426    Ok(())
427}
428
429fn parse_python_function(
430    node: Node<'_>,
431    source: &str,
432    path: &Utf8PathBuf,
433) -> Result<Option<PythonFunction>> {
434    let Some(name_node) = node.child_by_field_name("name") else {
435        return Ok(None);
436    };
437    let name = node_text(name_node, source)?.to_string();
438    if name.starts_with('_') {
439        return Ok(None);
440    }
441    let owner = python_function_owner(node, source)?;
442    let symbol = owner
443        .as_ref()
444        .map(|owner| format!("{owner}.{name}"))
445        .unwrap_or_else(|| name.clone());
446    let signature = signature_text(node, source)?.trim().to_string();
447    let params = node
448        .child_by_field_name("parameters")
449        .map(|parameters| parse_python_params(node_text(parameters, source).unwrap_or_default()))
450        .unwrap_or_default();
451    Ok(Some(PythonFunction {
452        path: path.clone(),
453        name,
454        symbol,
455        owner,
456        params,
457        signature,
458        line_range: LineRange {
459            start: node.start_position().row + 1,
460            end: node.end_position().row + 1,
461        },
462        start_byte: node.start_byte(),
463        end_byte: node.end_byte(),
464        calls: python_calls_in_function(node, source)?,
465    }))
466}
467
468fn python_function_owner(node: Node<'_>, source: &str) -> Result<Option<String>> {
469    let mut current = node.parent();
470    while let Some(parent) = current {
471        if parent.kind() == "class_definition" {
472            return Ok(parent
473                .child_by_field_name("name")
474                .map(|node| node_text(node, source).unwrap_or_default().to_string()));
475        }
476        current = parent.parent();
477    }
478    Ok(None)
479}
480
481fn parse_python_params(params: &str) -> Vec<String> {
482    params
483        .trim()
484        .trim_start_matches('(')
485        .trim_end_matches(')')
486        .split(',')
487        .filter_map(|param| {
488            let name = param
489                .split([':', '='])
490                .next()
491                .unwrap_or_default()
492                .trim()
493                .trim_start_matches('*');
494            (!name.is_empty()).then_some(name.to_string())
495        })
496        .collect()
497}
498
499fn python_calls_in_function(node: Node<'_>, source: &str) -> Result<Vec<String>> {
500    let mut calls = Vec::new();
501    collect_python_calls(node, source, &mut calls)?;
502    calls.sort();
503    calls.dedup();
504    Ok(calls)
505}
506
507fn collect_python_calls(node: Node<'_>, source: &str, calls: &mut Vec<String>) -> Result<()> {
508    if node.kind() == "call" {
509        if let Some(function) = node.child_by_field_name("function") {
510            let text = node_text(function, source)?.trim();
511            if !text.is_empty() {
512                calls.push(text.to_string());
513            }
514        }
515    }
516    let mut cursor = node.walk();
517    for child in node.children(&mut cursor) {
518        collect_python_calls(child, source, calls)?;
519    }
520    Ok(())
521}
522
523fn symbol_graph_artifact(
524    target_id: &str,
525    target_path: &Utf8PathBuf,
526    target_symbol: Option<&str>,
527    functions: &[PythonFunction],
528) -> Result<GeneratedArtifact> {
529    let symbols = functions
530        .iter()
531        .filter(|function| {
532            if let Some(symbol) = target_symbol {
533                function.symbol == symbol && function.path == *target_path
534            } else {
535                target_path.as_str() == "." || function.path == *target_path
536            }
537        })
538        .map(|function| PythonSymbolNode {
539            id: format!("python:{}:{}", function.path, function.symbol),
540            path: &function.path,
541            symbol: &function.symbol,
542            name: &function.name,
543            owner: function.owner.as_deref(),
544            signature: &function.signature,
545            line_range: &function.line_range,
546            risk: infer_risk(&function.symbol),
547            calls: &function.calls,
548        })
549        .collect::<Vec<_>>();
550    let contents = serde_json::to_string_pretty(&PythonSymbolGraph { target_id, symbols })?;
551    Ok(GeneratedArtifact {
552        id: format!("python-symbol-{}", safe_ident(target_id)),
553        language: "python".to_string(),
554        kind: ArtifactKind::SymbolGraph,
555        target_id: target_id.to_string(),
556        path: Utf8PathBuf::from(format!(
557            ".veritas/symbols/python_{}.json",
558            safe_ident(target_id)
559        )),
560        contents,
561        description: "Tree-sitter Python symbol graph for selected target".to_string(),
562        status: ArtifactStatus::Planned,
563    })
564}
565
566fn python_mutation_manifest_artifact(
567    target: &VerificationTarget,
568    target_symbol: Option<&str>,
569    functions: &[PythonFunction],
570) -> Result<GeneratedArtifact> {
571    let selected = functions
572        .iter()
573        .filter(|function| {
574            if let Some(symbol) = target_symbol {
575                function.symbol == symbol && function.path == target.path
576            } else {
577                target.path.as_str() == "." || function.path == target.path
578            }
579        })
580        .map(|function| {
581            serde_json::json!({
582                "id": format!("python:{}:{}", function.path, function.symbol),
583                "path": &function.path,
584                "symbol": &function.symbol,
585                "signature": &function.signature,
586                "line_range": &function.line_range,
587                "risk": infer_risk(&function.symbol),
588                "operators": [
589                    "comparison_boundary",
590                    "boolean_guard",
591                    "string_normalization",
592                    "exception_path",
593                    "numeric_limit"
594                ],
595            })
596        })
597        .collect::<Vec<_>>();
598    let contents = serde_json::to_string_pretty(&serde_json::json!({
599        "version": 1,
600        "language": "python",
601        "mode": "mutation_manifest",
602        "target_id": target.id,
603        "status": "planned",
604        "targets": selected,
605        "next_step": "Python mutation execution uses this manifest as the stable target/operator contract for generated tests and AI repair loops.",
606    }))?;
607    Ok(GeneratedArtifact {
608        id: format!("python-mutation-{}", safe_ident(&target.id)),
609        language: "python".to_string(),
610        kind: ArtifactKind::MutationCheck,
611        target_id: target.id.clone(),
612        path: Utf8PathBuf::from(format!(
613            ".veritas/mutations/python_{}.json",
614            safe_ident(&target.id)
615        )),
616        contents,
617        description: "Tree-sitter Python mutation target/operator manifest".to_string(),
618        status: ArtifactStatus::Planned,
619    })
620}
621
622fn python_hypothesis_property_artifact(
623    target: &VerificationTarget,
624    target_symbol: Option<&str>,
625    functions: &[PythonFunction],
626) -> Result<Option<GeneratedArtifact>> {
627    let selected = functions
628        .iter()
629        .filter(|function| {
630            if let Some(symbol) = target_symbol {
631                function.symbol == symbol && function.path == target.path
632            } else {
633                target.path.as_str() == "." || function.path == target.path
634            }
635        })
636        .filter(|function| {
637            !function.params.is_empty()
638                && function.owner.is_none()
639                && function.params.len() <= 3
640                && function
641                    .params
642                    .iter()
643                    .all(|param| !matches!(param.as_str(), "self" | "cls"))
644        })
645        .take(4)
646        .collect::<Vec<_>>();
647    if selected.is_empty() {
648        return Ok(None);
649    }
650
651    let mut contents = String::from(
652        "# Generated by veritas. Review before committing.\n\
653         # Requires: python3 -m pip install hypothesis\n\
654         \n\
655         from hypothesis import given, strategies as st\n\
656         \n\
657         def _veritas_observe(call):\n\
658             try:\n\
659                 return (\"ok\", call())\n\
660             except Exception as exc:\n\
661                 return (\"exception\", type(exc).__name__, str(exc))\n\
662         \n",
663    );
664    for function in selected {
665        let module = python_module_name(&function.path);
666        let name = &function.name;
667        contents.push_str(&format!("from {module} import {name}\n"));
668        let params = function.params.join(", ");
669        let strategies = function
670            .params
671            .iter()
672            .map(|param| format!("{param}={}", hypothesis_strategy_for_param(param)))
673            .collect::<Vec<_>>()
674            .join(", ");
675        contents.push_str(&format!(
676            "\n@given({strategies})\n\
677             def test_veritas_{name}_does_not_panic({params}):\n\
678                 first = _veritas_observe(lambda: {name}({params}))\n\
679                 second = _veritas_observe(lambda: {name}({params}))\n\
680                 assert first == second  # is_deterministic\n\
681                 assert first[0] in (\"ok\", \"exception\")  # prop_assert does_not_panic\n"
682        ));
683    }
684
685    Ok(Some(GeneratedArtifact {
686        id: format!("python-property-{}", safe_ident(&target.id)),
687        language: "python".to_string(),
688        kind: ArtifactKind::PropertyTest,
689        target_id: target.id.clone(),
690        path: Utf8PathBuf::from(format!(
691            ".veritas/properties/python_{}.py",
692            safe_ident(&target.id)
693        )),
694        contents,
695        description: "Reviewable Hypothesis property candidates for Python targets".to_string(),
696        status: ArtifactStatus::Planned,
697    }))
698}
699
700fn hypothesis_strategy_for_param(param: &str) -> String {
701    let lowered = param.to_ascii_lowercase();
702    if lowered.contains("cents")
703        || lowered.contains("amount")
704        || lowered.contains("total")
705        || lowered.contains("count")
706        || lowered.contains("limit")
707        || lowered.contains("price")
708    {
709        "st.integers(min_value=-1_000_000, max_value=1_000_000)".to_string()
710    } else if lowered.contains("enabled")
711        || lowered.starts_with("is_")
712        || lowered.starts_with("has_")
713        || lowered.starts_with("can_")
714    {
715        "st.booleans()".to_string()
716    } else {
717        "st.text(max_size=64)".to_string()
718    }
719}
720
721fn python_module_name(path: &Utf8PathBuf) -> String {
722    path.as_str()
723        .trim_end_matches(".py")
724        .replace(['/', '\\'], ".")
725        .trim_end_matches(".__init__")
726        .to_string()
727}
728
729fn run_python_property_artifacts(
730    root: &Path,
731    artifacts: &[GeneratedArtifact],
732    config: &PythonPluginConfig,
733) -> Result<TestRunResult> {
734    let start = Instant::now();
735    let property_paths = artifacts
736        .iter()
737        .filter(|artifact| artifact.kind == ArtifactKind::PropertyTest)
738        .map(|artifact| artifact.path.to_string())
739        .collect::<Vec<_>>();
740    if property_paths.is_empty() {
741        return Ok(TestRunResult {
742            language: "python".to_string(),
743            status: RunStatus::Skipped,
744            commands: vec![],
745            failures: vec![],
746            duration_ms: 0,
747            quality: VerificationQuality::default(),
748        });
749    }
750    if !python_module_available(root, "hypothesis") {
751        return Ok(TestRunResult {
752            language: "python".to_string(),
753            status: RunStatus::Skipped,
754            commands: vec![skipped_python_command(
755                root,
756                vec![
757                    "-m".to_string(),
758                    "pytest".to_string(),
759                    "-q".to_string(),
760                    ".veritas/properties".to_string(),
761                ],
762                "skipped: hypothesis is not installed",
763            )?],
764            failures: vec![],
765            duration_ms: start.elapsed().as_millis(),
766            quality: VerificationQuality::default(),
767        });
768    }
769    if !python_module_available(root, "pytest") {
770        return Ok(TestRunResult {
771            language: "python".to_string(),
772            status: RunStatus::Skipped,
773            commands: vec![skipped_python_command(
774                root,
775                vec![
776                    "-m".to_string(),
777                    "pytest".to_string(),
778                    "-q".to_string(),
779                    ".veritas/properties".to_string(),
780                ],
781                "skipped: pytest is not installed",
782            )?],
783            failures: vec![],
784            duration_ms: start.elapsed().as_millis(),
785            quality: VerificationQuality::default(),
786        });
787    }
788
789    let mut args = vec!["-m".to_string(), "pytest".to_string(), "-q".to_string()];
790    args.extend(property_paths);
791    let command = run_command(
792        root,
793        "python3",
794        args.iter().map(String::as_str),
795        config.command_timeout_seconds.min(60),
796    )?;
797    let failures = if command.status == RunStatus::Failed {
798        vec![Failure {
799            id: None,
800            message: "python hypothesis property candidates failed".to_string(),
801            severity: FailureSeverity::Warning,
802            target_id: artifacts
803                .iter()
804                .find(|artifact| artifact.kind == ArtifactKind::PropertyTest)
805                .map(|artifact| artifact.target_id.clone()),
806            artifact_id: artifacts
807                .iter()
808                .find(|artifact| artifact.kind == ArtifactKind::PropertyTest)
809                .map(|artifact| artifact.id.clone()),
810            command: command_line(&command.program, &command.args),
811            stdout_excerpt: excerpt(&command.stdout),
812            stderr_excerpt: excerpt(&command.stderr),
813            repro: Some(ReproCase {
814                command: command_line(&command.program, &command.args),
815                input: None,
816                path: Some(Utf8PathBuf::from(".veritas/properties")),
817            }),
818        }]
819    } else {
820        Vec::new()
821    };
822    let status = command.status.clone();
823    Ok(TestRunResult {
824        language: "python".to_string(),
825        status,
826        commands: vec![command],
827        failures,
828        duration_ms: start.elapsed().as_millis(),
829        quality: VerificationQuality::default(),
830    })
831}
832
833fn skipped_python_command(root: &Path, args: Vec<String>, stderr: &str) -> Result<CommandRecord> {
834    Ok(CommandRecord {
835        program: "python3".to_string(),
836        args,
837        cwd: utf8_path(root)?,
838        exit_code: None,
839        status: RunStatus::Skipped,
840        stdout: String::new(),
841        stderr: stderr.to_string(),
842        duration_ms: 0,
843    })
844}
845
846fn run_python_mutations(
847    root: &Path,
848    artifacts: &[GeneratedArtifact],
849    config: &PythonPluginConfig,
850) -> Result<TestRunResult> {
851    let start = Instant::now();
852    let functions = discover_functions(root)?;
853    let candidates = python_mutation_candidates(root, &functions, artifacts)?
854        .into_iter()
855        .filter(|candidate| python_mutation_candidate_in_shard(candidate, &config.mutation))
856        .filter(|candidate| {
857            config.mutation.report_filtered || python_mutation_candidate_allowed(candidate, config)
858        })
859        .collect::<Vec<_>>();
860    let mut quality = VerificationQuality::default();
861    quality.mutation.generated = candidates.len();
862    quality.mutation.effective_workers = 1;
863    let mut commands = Vec::new();
864    let mut failures = Vec::new();
865    let mut status = RunStatus::Passed;
866    let run_dir = start_mutation_run(root, "python")?;
867
868    for candidate in candidates.into_iter().take(8) {
869        let domain = python_mutation_domain(&candidate);
870        let operator = python_mutation_operator(&candidate.label);
871        record_mutation_generated(&mut quality.mutation.by_domain, &domain);
872        record_mutation_generated(&mut quality.mutation.by_operator, &operator);
873        if !python_mutation_candidate_allowed(&candidate, config) {
874            let mut record = python_mutation_record(
875                &candidate,
876                &domain,
877                &operator,
878                MutationStatus::Skipped,
879                None,
880                0,
881            );
882            record.skip_reason = Some("filtered by mutation config".to_string());
883            persist_mutation_record_artifacts(root, &run_dir, &mut record, None)?;
884            quality.mutation.records.push(record);
885            continue;
886        }
887        if config.mutation.dry_run {
888            quality.mutation.runnable += 1;
889            record_mutation_runnable(&mut quality.mutation.by_domain, &domain);
890            record_mutation_runnable(&mut quality.mutation.by_operator, &operator);
891            let mut record = python_mutation_record(
892                &candidate,
893                &domain,
894                &operator,
895                MutationStatus::Runnable,
896                None,
897                0,
898            );
899            persist_mutation_record_artifacts(root, &run_dir, &mut record, None)?;
900            quality.mutation.records.push(record);
901            continue;
902        }
903        quality.mutation.runnable += 1;
904        quality.mutation.executed += 1;
905        record_mutation_runnable(&mut quality.mutation.by_domain, &domain);
906        record_mutation_runnable(&mut quality.mutation.by_operator, &operator);
907        record_mutation_executed(&mut quality.mutation.by_domain, &domain);
908        record_mutation_executed(&mut quality.mutation.by_operator, &operator);
909
910        let path = root.join(&candidate.path);
911        let original = fs::read_to_string(&path)
912            .with_context(|| format!("failed to read {}", path.display()))?;
913        let mut mutated = original.clone();
914        mutated.replace_range(candidate.start_byte..candidate.end_byte, &candidate.to);
915        fs::write(&path, mutated).with_context(|| {
916            format!(
917                "failed to write Python mutation {} in {}",
918                candidate.label,
919                path.display()
920            )
921        })?;
922        let (_, args) = python_test_args(root);
923        let command = run_command(
924            root,
925            "python3",
926            args.iter().map(String::as_str),
927            config.command_timeout_seconds.min(60),
928        );
929        fs::write(&path, original)
930            .with_context(|| format!("failed to restore {}", path.display()))?;
931        let command = command?;
932        let command_text = command_line(&command.program, &command.args);
933        let mutation_status = if command.status == RunStatus::Passed {
934            quality.mutation.survived += 1;
935            record_mutation_survived(&mut quality.mutation.by_domain, &domain);
936            record_mutation_survived(&mut quality.mutation.by_operator, &operator);
937            status = RunStatus::Failed;
938            failures.push(python_mutation_failure(
939                artifacts,
940                &candidate,
941                &command,
942                &command_text,
943            ));
944            MutationStatus::Lived
945        } else if python_mutation_not_viable(&command) {
946            quality.mutation.not_viable += 1;
947            record_mutation_not_viable(&mut quality.mutation.by_domain, &domain);
948            record_mutation_not_viable(&mut quality.mutation.by_operator, &operator);
949            MutationStatus::NotViable
950        } else {
951            quality.mutation.killed += 1;
952            record_mutation_killed(&mut quality.mutation.by_domain, &domain);
953            record_mutation_killed(&mut quality.mutation.by_operator, &operator);
954            MutationStatus::Killed
955        };
956        let mut record = python_mutation_record(
957            &candidate,
958            &domain,
959            &operator,
960            mutation_status,
961            Some(&command_text),
962            command.duration_ms,
963        );
964        persist_mutation_record_artifacts(root, &run_dir, &mut record, Some(&command))?;
965        quality.mutation.records.push(record);
966        commands.push(command);
967    }
968    finalize_mutation_skips(&mut quality.mutation.by_domain);
969    finalize_mutation_skips(&mut quality.mutation.by_operator);
970    finalize_mutation_metrics(&mut quality.mutation);
971
972    Ok(TestRunResult {
973        language: "python".to_string(),
974        status,
975        commands,
976        failures,
977        duration_ms: start.elapsed().as_millis(),
978        quality,
979    })
980}
981
982fn python_mutation_candidates(
983    root: &Path,
984    functions: &[PythonFunction],
985    artifacts: &[GeneratedArtifact],
986) -> Result<Vec<PythonMutationCandidate>> {
987    let mut candidates = Vec::new();
988    for function in functions {
989        if !python_function_selected_for_mutation(function, artifacts) {
990            continue;
991        }
992        let path = root.join(&function.path);
993        let contents = fs::read_to_string(&path)
994            .with_context(|| format!("failed to read {}", path.display()))?;
995        if function_has_mutation_skip_annotation(&contents, function.start_byte, function.end_byte)
996        {
997            continue;
998        }
999        push_python_mutation(
1000            &contents,
1001            function,
1002            "<=",
1003            "<",
1004            "boundary comparison mutation",
1005            &mut candidates,
1006        );
1007        push_python_mutation(
1008            &contents,
1009            function,
1010            " or ",
1011            " and ",
1012            "permission boolean connector mutation",
1013            &mut candidates,
1014        );
1015        push_python_mutation(
1016            &contents,
1017            function,
1018            " and ",
1019            " or ",
1020            "boolean connector mutation",
1021            &mut candidates,
1022        );
1023        push_python_mutation(
1024            &contents,
1025            function,
1026            "return 0, False",
1027            "return 1, False",
1028            "default tuple mutation",
1029            &mut candidates,
1030        );
1031        for (from, to, label) in [
1032            ("FOR UPDATE", "", "database isolation_lock mutation"),
1033            ("tenant_id", "1", "database tenant_filter mutation"),
1034            (
1035                "idempotency_key",
1036                "request_id",
1037                "database idempotency mutation",
1038            ),
1039            (
1040                "asyncio.create_task",
1041                "await",
1042                "concurrency_lifecycle task_spawn mutation",
1043            ),
1044            (
1045                ".acquire()",
1046                ".locked()",
1047                "synchronization lock_mode mutation",
1048            ),
1049            ("retry(", "once(", "retry_resilience retry_attempt mutation"),
1050            ("time.time()", "0", "testability injected_clock mutation"),
1051            (
1052                "random.",
1053                "deterministic_random.",
1054                "testability injected_randomness mutation",
1055            ),
1056            (
1057                "sorted(",
1058                "list(",
1059                "brittleness equivalent_ordering mutation",
1060            ),
1061        ] {
1062            push_python_mutation(&contents, function, from, to, label, &mut candidates);
1063        }
1064    }
1065    candidates.sort_by(|left, right| {
1066        left.path
1067            .cmp(&right.path)
1068            .then(left.start_byte.cmp(&right.start_byte))
1069            .then(left.label.cmp(&right.label))
1070    });
1071    candidates.dedup_by(|left, right| {
1072        left.path == right.path
1073            && left.start_byte == right.start_byte
1074            && left.end_byte == right.end_byte
1075            && left.to == right.to
1076    });
1077    Ok(candidates)
1078}
1079
1080fn function_has_mutation_skip_annotation(source: &str, start_byte: usize, end_byte: usize) -> bool {
1081    source
1082        .get(start_byte..end_byte)
1083        .is_some_and(|body| body.contains("veritas:skip-mutation"))
1084}
1085
1086fn push_python_mutation(
1087    contents: &str,
1088    function: &PythonFunction,
1089    from: &str,
1090    to: &str,
1091    label: &str,
1092    candidates: &mut Vec<PythonMutationCandidate>,
1093) {
1094    let Some(source) = contents.get(function.start_byte..function.end_byte) else {
1095        return;
1096    };
1097    let Some(offset) = source.find(from) else {
1098        return;
1099    };
1100    let start_byte = function.start_byte + offset;
1101    candidates.push(PythonMutationCandidate {
1102        path: function.path.clone(),
1103        function: function.symbol.clone(),
1104        label: label.to_string(),
1105        from: from.to_string(),
1106        to: to.to_string(),
1107        start_byte,
1108        end_byte: start_byte + from.len(),
1109        line_range: function.line_range.clone(),
1110    });
1111}
1112
1113fn python_function_selected_for_mutation(
1114    function: &PythonFunction,
1115    artifacts: &[GeneratedArtifact],
1116) -> bool {
1117    artifacts
1118        .iter()
1119        .filter(|artifact| artifact.kind == ArtifactKind::MutationCheck)
1120        .any(|artifact| {
1121            artifact.target_id == "python:project"
1122                || artifact.target_id == format!("python:{}", function.path)
1123                || artifact.target_id == format!("python:{}:{}", function.path, function.symbol)
1124        })
1125}
1126
1127fn python_mutation_failure(
1128    artifacts: &[GeneratedArtifact],
1129    candidate: &PythonMutationCandidate,
1130    command: &CommandRecord,
1131    command_text: &str,
1132) -> Failure {
1133    Failure {
1134        id: None,
1135        message: format!(
1136            "mutation survived in Python function `{}`: {}",
1137            candidate.function, candidate.label
1138        ),
1139        severity: FailureSeverity::Warning,
1140        target_id: Some(format!("python:{}:{}", candidate.path, candidate.function)),
1141        artifact_id: artifacts
1142            .iter()
1143            .find(|artifact| artifact.kind == ArtifactKind::MutationCheck)
1144            .map(|artifact| artifact.id.clone()),
1145        command: command_text.to_string(),
1146        stdout_excerpt: excerpt(&command.stdout),
1147        stderr_excerpt: excerpt(&command.stderr),
1148        repro: Some(ReproCase {
1149            command: format!(
1150                "replace `{}` with `{}` in {} and run {}",
1151                candidate.from, candidate.to, candidate.path, command_text
1152            ),
1153            input: None,
1154            path: Some(candidate.path.clone()),
1155        }),
1156    }
1157}
1158
1159fn python_mutation_record(
1160    candidate: &PythonMutationCandidate,
1161    domain: &str,
1162    operator: &str,
1163    status: MutationStatus,
1164    command: Option<&str>,
1165    duration_ms: u128,
1166) -> MutationRecord {
1167    MutationRecord {
1168        id: python_mutation_candidate_id(candidate),
1169        language: "python".to_string(),
1170        path: candidate.path.clone(),
1171        symbol: candidate.function.clone(),
1172        operator: operator.to_string(),
1173        domain: domain.to_string(),
1174        status,
1175        from: Some(candidate.from.clone()),
1176        to: Some(candidate.to.clone()),
1177        line_range: Some(candidate.line_range.clone()),
1178        source_span: Some(SourceSpan {
1179            start_byte: candidate.start_byte,
1180            end_byte: candidate.end_byte,
1181        }),
1182        diff: Some(python_mutation_diff(candidate)),
1183        diff_path: None,
1184        outcome_path: None,
1185        command_log_path: None,
1186        stdout_log_path: None,
1187        stderr_log_path: None,
1188        risk_note: Some(mutation_taxonomy::risk_note(domain, operator).to_string()),
1189        suggested_test: Some(mutation_taxonomy::suggested_test(domain, operator).to_string()),
1190        skip_reason: mutation_skip_reason(status),
1191        selected_test_command: command.map(ToString::to_string),
1192        test_selection_hint: None,
1193        test_selection_fallback: None,
1194        brittleness_probe: domain == "brittleness",
1195        command: command.map(ToString::to_string),
1196        duration_ms,
1197    }
1198}
1199
1200fn python_mutation_candidate_id(candidate: &PythonMutationCandidate) -> String {
1201    format!(
1202        "python:{}:{}:{}:{}",
1203        candidate.path, candidate.function, candidate.start_byte, candidate.end_byte
1204    )
1205}
1206
1207fn python_mutation_diff(candidate: &PythonMutationCandidate) -> String {
1208    format!(
1209        "--- {}\n+++ {}\n@@ bytes {}..{} @@\n-{}\n+{}",
1210        candidate.path,
1211        candidate.path,
1212        candidate.start_byte,
1213        candidate.end_byte,
1214        candidate.from,
1215        candidate.to
1216    )
1217}
1218
1219fn mutation_skip_reason(status: MutationStatus) -> Option<String> {
1220    match status {
1221        MutationStatus::NotCovered => Some("no selected tests cover this mutant".to_string()),
1222        MutationStatus::Skipped => Some("mutation was skipped before execution".to_string()),
1223        MutationStatus::TimedOut => Some(
1224            "mutation test command timed out; consider filtering this operator/mutant or adding deterministic test seams for recurring hangs"
1225                .to_string(),
1226        ),
1227        MutationStatus::NotViable => Some("mutation did not compile or could not run".to_string()),
1228        _ => None,
1229    }
1230}
1231
1232fn python_mutation_domain(candidate: &PythonMutationCandidate) -> String {
1233    let value = format!("{} {}", candidate.function, candidate.label).to_ascii_lowercase();
1234    mutation_taxonomy::normalize_domain(&value).to_string()
1235}
1236
1237fn python_mutation_operator(label: &str) -> String {
1238    mutation_taxonomy::normalize_operator(label).to_string()
1239}
1240
1241fn python_mutation_candidate_allowed(
1242    candidate: &PythonMutationCandidate,
1243    config: &PythonPluginConfig,
1244) -> bool {
1245    if !config.mutation.include_paths.is_empty()
1246        && !config
1247            .mutation
1248            .include_paths
1249            .iter()
1250            .any(|pattern| text_matches(candidate.path.as_str(), pattern))
1251    {
1252        return false;
1253    }
1254    if config
1255        .mutation
1256        .exclude_paths
1257        .iter()
1258        .any(|pattern| text_matches(candidate.path.as_str(), pattern))
1259    {
1260        return false;
1261    }
1262    if !config.mutation.include_symbols.is_empty()
1263        && !config
1264            .mutation
1265            .include_symbols
1266            .iter()
1267            .any(|pattern| text_matches(&candidate.function, pattern))
1268    {
1269        return false;
1270    }
1271    if config
1272        .mutation
1273        .exclude_symbols
1274        .iter()
1275        .any(|pattern| text_matches(&candidate.function, pattern))
1276    {
1277        return false;
1278    }
1279    let target_id = python_mutation_candidate_target_id(candidate);
1280    if !config.mutation.include_target_ids.is_empty()
1281        && !config
1282            .mutation
1283            .include_target_ids
1284            .iter()
1285            .any(|pattern| text_matches(&target_id, pattern))
1286    {
1287        return false;
1288    }
1289    if config
1290        .mutation
1291        .exclude_target_ids
1292        .iter()
1293        .any(|pattern| text_matches(&target_id, pattern))
1294    {
1295        return false;
1296    }
1297    let id = python_mutation_candidate_id(candidate);
1298    if !config.mutation.include_mutant_ids.is_empty()
1299        && !config
1300            .mutation
1301            .include_mutant_ids
1302            .iter()
1303            .any(|configured| text_matches(&id, configured))
1304    {
1305        return false;
1306    }
1307    if config
1308        .mutation
1309        .exclude_mutant_ids
1310        .iter()
1311        .any(|configured| text_matches(&id, configured))
1312    {
1313        return false;
1314    }
1315    let domain = python_mutation_domain(candidate);
1316    if !config.mutation.enabled_domains.is_empty()
1317        && !config
1318            .mutation
1319            .enabled_domains
1320            .iter()
1321            .any(|enabled| taxonomy_matches(&domain, enabled))
1322    {
1323        return false;
1324    }
1325    if config
1326        .mutation
1327        .disabled_domains
1328        .iter()
1329        .any(|disabled| taxonomy_matches(&domain, disabled))
1330    {
1331        return false;
1332    }
1333    let operator = python_mutation_operator(&candidate.label);
1334    if !config.mutation.enabled_operators.is_empty()
1335        && !config
1336            .mutation
1337            .enabled_operators
1338            .iter()
1339            .any(|enabled| taxonomy_matches(&operator, enabled))
1340    {
1341        return false;
1342    }
1343    !config
1344        .mutation
1345        .disabled_operators
1346        .iter()
1347        .any(|disabled| taxonomy_matches(&operator, disabled))
1348}
1349
1350fn python_mutation_candidate_target_id(candidate: &PythonMutationCandidate) -> String {
1351    format!("python:{}:{}", candidate.path, candidate.function)
1352}
1353
1354fn python_mutation_candidate_in_shard(
1355    candidate: &PythonMutationCandidate,
1356    config: &MutationConfig,
1357) -> bool {
1358    let Some(shard_count) = config.shard_count else {
1359        return true;
1360    };
1361    let shard_index = config.shard_index.unwrap_or(0);
1362    if shard_index >= shard_count {
1363        return false;
1364    }
1365    let mut hasher = DefaultHasher::new();
1366    candidate.path.hash(&mut hasher);
1367    candidate.function.hash(&mut hasher);
1368    candidate.start_byte.hash(&mut hasher);
1369    candidate.end_byte.hash(&mut hasher);
1370    (hasher.finish() as usize % shard_count) == shard_index
1371}
1372
1373fn taxonomy_matches(operator: &str, configured: &str) -> bool {
1374    let configured = configured.replace('-', "_").to_ascii_lowercase();
1375    text_matches(operator, configured.trim())
1376}
1377
1378fn text_matches(text: &str, pattern: &str) -> bool {
1379    if let Some(exact) = pattern.strip_prefix("exact:") {
1380        return text.eq_ignore_ascii_case(exact);
1381    }
1382    if let Some(regex) = pattern.strip_prefix("regex:") {
1383        return regex::RegexBuilder::new(regex)
1384            .case_insensitive(true)
1385            .build()
1386            .is_ok_and(|regex| regex.is_match(text));
1387    }
1388    let pattern = pattern.strip_prefix("glob:").unwrap_or(pattern);
1389    let text = text.to_ascii_lowercase();
1390    let pattern = pattern.to_ascii_lowercase();
1391    if let Some(suffix) = pattern.strip_suffix('$') {
1392        return text.ends_with(suffix);
1393    }
1394    if pattern.contains('*') {
1395        let mut rest = text.as_str();
1396        for part in pattern.split('*').filter(|part| !part.is_empty()) {
1397            let Some(offset) = rest.find(part) else {
1398                return false;
1399            };
1400            rest = &rest[offset + part.len()..];
1401        }
1402        return true;
1403    }
1404    text.contains(&pattern)
1405}
1406
1407fn record_mutation_generated(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1408    metrics.entry(key.to_string()).or_default().generated += 1;
1409}
1410
1411fn record_mutation_runnable(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1412    metrics.entry(key.to_string()).or_default().runnable += 1;
1413}
1414
1415fn record_mutation_executed(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1416    metrics.entry(key.to_string()).or_default().executed += 1;
1417}
1418
1419fn record_mutation_killed(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1420    metrics.entry(key.to_string()).or_default().killed += 1;
1421}
1422
1423fn record_mutation_survived(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1424    metrics.entry(key.to_string()).or_default().survived += 1;
1425}
1426
1427fn record_mutation_not_viable(metrics: &mut BTreeMap<String, MutationAttribution>, key: &str) {
1428    metrics.entry(key.to_string()).or_default().not_viable += 1;
1429}
1430
1431fn finalize_mutation_skips(metrics: &mut BTreeMap<String, MutationAttribution>) {
1432    for metric in metrics.values_mut() {
1433        metric.skipped = metric.generated.saturating_sub(metric.executed);
1434    }
1435}
1436
1437fn python_mutation_not_viable(command: &CommandRecord) -> bool {
1438    let output = format!("{}\n{}", command.stdout, command.stderr).to_ascii_lowercase();
1439    output.contains("syntaxerror")
1440        || output.contains("indentationerror")
1441        || output.contains("nameerror")
1442        || output.contains("typeerror")
1443}
1444
1445fn python_coverage_report(root: &Path, path: &Path) -> Result<CoverageReport> {
1446    let contents =
1447        fs::read_to_string(path).with_context(|| format!("failed to read {}", path.display()))?;
1448    let value: serde_json::Value = serde_json::from_str(&contents)
1449        .with_context(|| format!("failed to parse {}", path.display()))?;
1450    let total = value["totals"]["percent_covered_display"]
1451        .as_str()
1452        .map(ToString::to_string)
1453        .or_else(|| {
1454            value["totals"]["percent_covered"]
1455                .as_f64()
1456                .map(|percent| format!("{percent:.1}"))
1457        })
1458        .unwrap_or_else(|| "unknown".to_string());
1459    let files = value["files"]
1460        .as_object()
1461        .into_iter()
1462        .flat_map(|files| files.iter())
1463        .map(|(path, file)| CoverageFile {
1464            path: Utf8PathBuf::from(path),
1465            line_coverage_percent: file["summary"]["percent_covered"]
1466                .as_f64()
1467                .map(|percent| percent.round().clamp(0.0, 100.0) as u8),
1468            uncovered_ranges: file["missing_lines"]
1469                .as_array()
1470                .into_iter()
1471                .flatten()
1472                .filter_map(|line| line.as_u64())
1473                .map(|line| line.to_string())
1474                .collect(),
1475        })
1476        .collect();
1477    Ok(CoverageReport {
1478        tool: "python coverage.py".to_string(),
1479        summary: format!("line coverage {total}% ({})", relative_display(root, path)),
1480        files,
1481    })
1482}
1483
1484fn relative_display(root: &Path, path: &Path) -> String {
1485    path.strip_prefix(root)
1486        .unwrap_or(path)
1487        .to_string_lossy()
1488        .to_string()
1489}
1490
1491fn replay_python_function(
1492    root: &Path,
1493    function: &PythonFunction,
1494    case: &BehaviorReplayCase,
1495    config: &PythonPluginConfig,
1496) -> Result<BehaviorReplayObservation> {
1497    if function.owner.is_some() {
1498        return Ok(unsupported_python_replay(
1499            "method replay requires receiver construction",
1500        ));
1501    }
1502    if function.params.is_empty() || case.inputs.is_empty() {
1503        return Ok(unsupported_python_replay(
1504            "executable replay requires at least one supported argument with seeded inputs",
1505        ));
1506    }
1507
1508    let script = render_python_replay_script(function, case)?;
1509    let script_path = root.join(".veritas").join("tmp_python_replay.py");
1510    if let Some(parent) = script_path.parent() {
1511        fs::create_dir_all(parent)
1512            .with_context(|| format!("failed to create {}", parent.display()))?;
1513    }
1514    fs::write(&script_path, script)
1515        .with_context(|| format!("failed to write {}", script_path.display()))?;
1516    let command = run_command(
1517        root,
1518        "python3",
1519        [script_path.to_string_lossy().as_ref()],
1520        config.command_timeout_seconds.min(30),
1521    );
1522    let cleanup = fs::remove_file(&script_path)
1523        .with_context(|| format!("failed to remove {}", script_path.display()));
1524    let command = command?;
1525    cleanup?;
1526
1527    let outputs = parse_replay_marker_output(&command.stdout);
1528    let output = if outputs.is_empty() {
1529        serde_json::json!({
1530            "observations": [],
1531            "stderr": excerpt(&command.stderr),
1532        })
1533    } else {
1534        serde_json::json!({ "observations": outputs })
1535    };
1536    let status = if command.status == RunStatus::Passed && !outputs.is_empty() {
1537        BehaviorReplayStatus::Observed
1538    } else {
1539        BehaviorReplayStatus::Failed
1540    };
1541    Ok(BehaviorReplayObservation {
1542        status,
1543        output,
1544        command: Some(command_line(&command.program, &command.args)),
1545        stdout_excerpt: Some(excerpt(&command.stdout)),
1546        stderr_excerpt: Some(excerpt(&command.stderr)),
1547        duration_ms: Some(command.duration_ms),
1548    })
1549}
1550
1551fn replay_python_function_batch(
1552    root: &Path,
1553    function: &PythonFunction,
1554    cases: &[BehaviorReplayCase],
1555    config: &PythonPluginConfig,
1556) -> Result<BTreeMap<String, BehaviorReplayObservation>> {
1557    let mut observations = BTreeMap::new();
1558    if cases.is_empty() {
1559        return Ok(observations);
1560    }
1561    if function.owner.is_some() {
1562        insert_unsupported_python_replay(
1563            &mut observations,
1564            cases,
1565            "method replay requires receiver construction",
1566        );
1567        return Ok(observations);
1568    }
1569    if function.params.is_empty() {
1570        insert_unsupported_python_replay(
1571            &mut observations,
1572            cases,
1573            "executable replay requires at least one supported argument",
1574        );
1575        return Ok(observations);
1576    }
1577
1578    let runnable = cases
1579        .iter()
1580        .filter(|case| {
1581            if case.inputs.is_empty() {
1582                observations.insert(
1583                    case.name.clone(),
1584                    unsupported_python_replay(
1585                        "executable replay requires at least one seeded input",
1586                    ),
1587                );
1588                false
1589            } else {
1590                true
1591            }
1592        })
1593        .collect::<Vec<_>>();
1594    if runnable.is_empty() {
1595        return Ok(observations);
1596    }
1597
1598    let script = render_python_replay_batch_script(function, &runnable)?;
1599    let script_path = root.join(".veritas").join(format!(
1600        "tmp_python_replay_{}_{}.py",
1601        std::process::id(),
1602        safe_ident(&function.symbol)
1603    ));
1604    if let Some(parent) = script_path.parent() {
1605        fs::create_dir_all(parent)
1606            .with_context(|| format!("failed to create {}", parent.display()))?;
1607    }
1608    fs::write(&script_path, script)
1609        .with_context(|| format!("failed to write {}", script_path.display()))?;
1610    let command = run_command(
1611        root,
1612        "python3",
1613        [script_path.to_string_lossy().as_ref()],
1614        config.command_timeout_seconds.min(30),
1615    );
1616    let cleanup = fs::remove_file(&script_path)
1617        .with_context(|| format!("failed to remove {}", script_path.display()));
1618    let command = command?;
1619    cleanup?;
1620
1621    let mut by_case = parse_replay_case_marker_output(&command.stdout);
1622    for case in runnable {
1623        observations.insert(
1624            case.name.clone(),
1625            replay_observation_from_command(
1626                &command,
1627                by_case.remove(&case.name).unwrap_or_default(),
1628            ),
1629        );
1630    }
1631    Ok(observations)
1632}
1633
1634fn render_python_replay_script(
1635    function: &PythonFunction,
1636    case: &BehaviorReplayCase,
1637) -> Result<String> {
1638    let inputs = serde_json::to_string(&case.inputs)?;
1639    Ok(format!(
1640        r#"import importlib.util
1641import json
1642import pathlib
1643import traceback
1644
1645module_path = pathlib.Path({path:?})
1646spec = importlib.util.spec_from_file_location("veritas_replay_target", module_path)
1647module = importlib.util.module_from_spec(spec)
1648spec.loader.exec_module(module)
1649target = getattr(module, {name:?})
1650arity = {arity}
1651
1652def call_args(value):
1653    if arity == 1:
1654        return [value]
1655    if isinstance(value, list) and len(value) == arity:
1656        return value
1657    raise ValueError("replay input does not match target arity")
1658
1659for value in json.loads({inputs:?}):
1660    try:
1661        output = repr(target(*call_args(value)))
1662        status = "observed"
1663    except BaseException as exc:
1664        output = repr(exc)
1665        status = "exception"
1666    print("__VERITAS_REPLAY__{{}}\t{{}}\t{{}}".format(repr(value), status, output))
1667"#,
1668        path = function.path.as_str(),
1669        name = function.name,
1670        arity = function.params.len(),
1671        inputs = inputs
1672    ))
1673}
1674
1675fn render_python_replay_batch_script(
1676    function: &PythonFunction,
1677    cases: &[&BehaviorReplayCase],
1678) -> Result<String> {
1679    let cases = cases
1680        .iter()
1681        .map(|case| {
1682            serde_json::json!({
1683                "name": &case.name,
1684                "inputs": &case.inputs,
1685            })
1686        })
1687        .collect::<Vec<_>>();
1688    let cases = serde_json::to_string(&cases)?;
1689    Ok(format!(
1690        r#"import importlib.util
1691import json
1692import pathlib
1693
1694module_path = pathlib.Path({path:?})
1695spec = importlib.util.spec_from_file_location("veritas_replay_target", module_path)
1696module = importlib.util.module_from_spec(spec)
1697spec.loader.exec_module(module)
1698target = getattr(module, {name:?})
1699arity = {arity}
1700
1701def call_args(value):
1702    if arity == 1:
1703        return [value]
1704    if isinstance(value, list) and len(value) == arity:
1705        return value
1706    raise ValueError("replay input does not match target arity")
1707
1708for case in json.loads({cases:?}):
1709    for value in case["inputs"]:
1710        try:
1711            output = repr(target(*call_args(value)))
1712            status = "observed"
1713        except BaseException as exc:
1714            output = repr(exc)
1715            status = "exception"
1716        print("__VERITAS_REPLAY_CASE__{{}}\t{{}}\t{{}}\t{{}}".format(case["name"], repr(value), status, output))
1717"#,
1718        path = function.path.as_str(),
1719        name = function.name,
1720        arity = function.params.len(),
1721        cases = cases
1722    ))
1723}
1724
1725fn unsupported_python_replay(reason: &str) -> BehaviorReplayObservation {
1726    BehaviorReplayObservation {
1727        status: BehaviorReplayStatus::Unsupported,
1728        output: serde_json::json!({ "reason": reason }),
1729        command: None,
1730        stdout_excerpt: None,
1731        stderr_excerpt: None,
1732        duration_ms: None,
1733    }
1734}
1735
1736fn insert_unsupported_python_replay(
1737    observations: &mut BTreeMap<String, BehaviorReplayObservation>,
1738    cases: &[BehaviorReplayCase],
1739    reason: &str,
1740) {
1741    for case in cases {
1742        observations.insert(case.name.clone(), unsupported_python_replay(reason));
1743    }
1744}
1745
1746fn replay_observation_from_command(
1747    command: &CommandRecord,
1748    outputs: Vec<serde_json::Value>,
1749) -> BehaviorReplayObservation {
1750    let has_outputs = !outputs.is_empty();
1751    let output = if outputs.is_empty() {
1752        serde_json::json!({
1753            "observations": [],
1754            "stderr": excerpt(&command.stderr),
1755        })
1756    } else {
1757        serde_json::json!({ "observations": outputs })
1758    };
1759    let status = if command.status == RunStatus::Passed && has_outputs {
1760        BehaviorReplayStatus::Observed
1761    } else {
1762        BehaviorReplayStatus::Failed
1763    };
1764    BehaviorReplayObservation {
1765        status,
1766        output,
1767        command: Some(command_line(&command.program, &command.args)),
1768        stdout_excerpt: Some(excerpt(&command.stdout)),
1769        stderr_excerpt: Some(excerpt(&command.stderr)),
1770        duration_ms: Some(command.duration_ms),
1771    }
1772}
1773
1774fn python_target_matches_function(target_id: &str, function: &PythonFunction) -> bool {
1775    if target_id == "python:project" {
1776        return true;
1777    }
1778    let Some(rest) = target_id.strip_prefix("python:") else {
1779        return false;
1780    };
1781    if rest == function.path.as_str() {
1782        return true;
1783    }
1784    if let Some((path, symbol)) = rest.rsplit_once(':') {
1785        return path == function.path.as_str() && symbol == function.symbol;
1786    }
1787    false
1788}
1789
1790fn contains_python_file(root: &Path) -> bool {
1791    WalkDir::new(root)
1792        .max_depth(4)
1793        .into_iter()
1794        .filter_entry(|entry| !is_ignored(entry.path(), entry.file_name()))
1795        .filter_map(Result::ok)
1796        .any(|entry| {
1797            entry.file_type().is_file() && entry.path().extension() == Some(OsStr::new("py"))
1798        })
1799}
1800
1801fn is_ignored(path: &Path, name: &OsStr) -> bool {
1802    let name = name.to_string_lossy();
1803    if matches!(
1804        name.as_ref(),
1805        ".git"
1806            | ".hg"
1807            | ".svn"
1808            | ".venv"
1809            | "venv"
1810            | "__pycache__"
1811            | ".mypy_cache"
1812            | ".pytest_cache"
1813            | ".veritas"
1814    ) {
1815        return true;
1816    }
1817    path.components().any(|component| {
1818        matches!(
1819            component.as_os_str().to_string_lossy().as_ref(),
1820            ".git" | ".venv" | "venv" | "__pycache__" | ".veritas"
1821        )
1822    })
1823}
1824
1825fn is_python_test_file(path: &Path) -> bool {
1826    path.file_name()
1827        .and_then(OsStr::to_str)
1828        .is_some_and(|name| name.starts_with("test_") || name.ends_with("_test.py"))
1829        || path
1830            .components()
1831            .any(|component| component.as_os_str() == OsStr::new("tests"))
1832}
1833
1834fn python_test_args(root: &Path) -> (&'static str, Vec<String>) {
1835    python_test_args_with_availability(root, python_module_available(root, "pytest"))
1836}
1837
1838fn python_test_args_with_availability(
1839    root: &Path,
1840    pytest_available: bool,
1841) -> (&'static str, Vec<String>) {
1842    if prefers_pytest(root) && pytest_available {
1843        (
1844            "pytest",
1845            vec!["-m".to_string(), "pytest".to_string(), "-q".to_string()],
1846        )
1847    } else {
1848        (
1849            "unittest",
1850            vec![
1851                "-m".to_string(),
1852                "unittest".to_string(),
1853                "discover".to_string(),
1854            ],
1855        )
1856    }
1857}
1858
1859fn prefers_pytest(root: &Path) -> bool {
1860    root.join("pytest.ini").exists()
1861        || root.join(".pytest.ini").exists()
1862        || config_contains(root.join("pyproject.toml"), "[tool.pytest")
1863        || config_contains(root.join("setup.cfg"), "[tool:pytest]")
1864        || config_contains(root.join("tox.ini"), "[pytest]")
1865        || tests_import_pytest(root)
1866}
1867
1868fn tests_import_pytest(root: &Path) -> bool {
1869    WalkDir::new(root)
1870        .max_depth(4)
1871        .into_iter()
1872        .filter_entry(|entry| !is_ignored(entry.path(), entry.file_name()))
1873        .filter_map(Result::ok)
1874        .filter(|entry| {
1875            entry.file_type().is_file()
1876                && entry.path().extension() == Some(OsStr::new("py"))
1877                && is_python_test_file(entry.path())
1878        })
1879        .any(|entry| {
1880            fs::read_to_string(entry.path()).is_ok_and(|contents| {
1881                contents.contains("import pytest") || contents.contains("from pytest")
1882            })
1883        })
1884}
1885
1886fn config_contains(path: impl AsRef<Path>, needle: &str) -> bool {
1887    fs::read_to_string(path).is_ok_and(|contents| contents.contains(needle))
1888}
1889
1890fn python_module_available(root: &Path, module: &str) -> bool {
1891    Command::new("python3")
1892        .args(["-m", module, "--version"])
1893        .current_dir(root)
1894        .stdout(Stdio::null())
1895        .stderr(Stdio::null())
1896        .status()
1897        .is_ok_and(|status| status.success())
1898}
1899
1900fn infer_risk(name: &str) -> RiskLevel {
1901    let lowered = name.to_ascii_lowercase();
1902    let high_risk_terms = [
1903        "parse",
1904        "auth",
1905        "permission",
1906        "money",
1907        "price",
1908        "invoice",
1909        "token",
1910        "serialize",
1911        "deserialize",
1912        "refund",
1913        "discount",
1914    ];
1915    if high_risk_terms.iter().any(|term| lowered.contains(term)) {
1916        RiskLevel::High
1917    } else {
1918        RiskLevel::Medium
1919    }
1920}
1921
1922fn node_text<'a>(node: Node<'_>, source: &'a str) -> Result<&'a str> {
1923    source
1924        .get(node.byte_range())
1925        .ok_or_else(|| anyhow!("invalid tree-sitter byte range"))
1926}
1927
1928fn signature_text<'a>(node: Node<'_>, source: &'a str) -> Result<&'a str> {
1929    let start = node.start_byte();
1930    let body_start = node
1931        .child_by_field_name("body")
1932        .map(|body| body.start_byte())
1933        .unwrap_or(node.end_byte());
1934    source
1935        .get(start..body_start)
1936        .ok_or_else(|| anyhow!("invalid Python signature byte range"))
1937}
1938
1939fn run_command<I, S>(
1940    root: &Path,
1941    program: &str,
1942    args: I,
1943    timeout_seconds: u64,
1944) -> Result<CommandRecord>
1945where
1946    I: IntoIterator<Item = S>,
1947    S: Into<String>,
1948{
1949    let start = Instant::now();
1950    let args = args.into_iter().map(Into::into).collect::<Vec<String>>();
1951    let mut child = Command::new(program)
1952        .args(&args)
1953        .current_dir(root)
1954        .stdout(Stdio::piped())
1955        .stderr(Stdio::piped())
1956        .spawn()
1957        .with_context(|| format!("failed to run {}", command_line(program, &args)))?;
1958    let timeout = Duration::from_secs(timeout_seconds.max(1));
1959    loop {
1960        if child
1961            .try_wait()
1962            .with_context(|| format!("failed to poll {}", command_line(program, &args)))?
1963            .is_some()
1964        {
1965            let output = child
1966                .wait_with_output()
1967                .with_context(|| format!("failed to read {}", command_line(program, &args)))?;
1968            let status = if output.status.success() {
1969                RunStatus::Passed
1970            } else {
1971                RunStatus::Failed
1972            };
1973            return Ok(CommandRecord {
1974                program: program.to_string(),
1975                args,
1976                cwd: utf8_path(root)?,
1977                exit_code: output.status.code(),
1978                status,
1979                stdout: String::from_utf8_lossy(&output.stdout).to_string(),
1980                stderr: String::from_utf8_lossy(&output.stderr).to_string(),
1981                duration_ms: start.elapsed().as_millis(),
1982            });
1983        }
1984        if start.elapsed() >= timeout {
1985            let _ = child.kill();
1986            let output = child.wait_with_output().with_context(|| {
1987                format!(
1988                    "failed to collect timed-out {}",
1989                    command_line(program, &args)
1990                )
1991            })?;
1992            let mut stderr = String::from_utf8_lossy(&output.stderr).to_string();
1993            if !stderr.is_empty() {
1994                stderr.push('\n');
1995            }
1996            stderr.push_str(&format!("command timed out after {}s", timeout.as_secs()));
1997            return Ok(CommandRecord {
1998                program: program.to_string(),
1999                args,
2000                cwd: utf8_path(root)?,
2001                exit_code: output.status.code(),
2002                status: RunStatus::Failed,
2003                stdout: String::from_utf8_lossy(&output.stdout).to_string(),
2004                stderr,
2005                duration_ms: start.elapsed().as_millis(),
2006            });
2007        }
2008        thread::sleep(Duration::from_millis(20));
2009    }
2010}
2011
2012fn parse_replay_marker_output(stdout: &str) -> Vec<serde_json::Value> {
2013    stdout
2014        .lines()
2015        .filter_map(|line| {
2016            let marker = line.find("__VERITAS_REPLAY__")?;
2017            let payload = &line[marker + "__VERITAS_REPLAY__".len()..];
2018            let mut parts = payload.splitn(3, '\t');
2019            let input = parts.next()?.to_string();
2020            let status = parts.next()?.to_string();
2021            let output = parts.next().unwrap_or_default().to_string();
2022            Some(serde_json::json!({
2023                "input": input,
2024                "status": status,
2025                "output": output,
2026            }))
2027        })
2028        .collect()
2029}
2030
2031fn parse_replay_case_marker_output(stdout: &str) -> BTreeMap<String, Vec<serde_json::Value>> {
2032    let mut by_case: BTreeMap<String, Vec<serde_json::Value>> = BTreeMap::new();
2033    for line in stdout.lines() {
2034        let Some(marker) = line.find("__VERITAS_REPLAY_CASE__") else {
2035            continue;
2036        };
2037        let payload = &line[marker + "__VERITAS_REPLAY_CASE__".len()..];
2038        let mut parts = payload.splitn(4, '\t');
2039        let Some(case) = parts.next() else {
2040            continue;
2041        };
2042        let Some(input) = parts.next() else {
2043            continue;
2044        };
2045        let Some(status) = parts.next() else {
2046            continue;
2047        };
2048        let output = parts.next().unwrap_or_default();
2049        by_case
2050            .entry(case.to_string())
2051            .or_default()
2052            .push(serde_json::json!({
2053                "input": input,
2054                "status": status,
2055                "output": output,
2056            }));
2057    }
2058    by_case
2059}
2060
2061fn safe_ident(value: &str) -> String {
2062    let mut out = String::new();
2063    for ch in value.chars() {
2064        if ch.is_ascii_alphanumeric() {
2065            out.push(ch.to_ascii_lowercase());
2066        } else {
2067            out.push('_');
2068        }
2069    }
2070    while out.contains("__") {
2071        out = out.replace("__", "_");
2072    }
2073    let out = out.trim_matches('_').to_string();
2074    if out.is_empty() {
2075        "target".to_string()
2076    } else {
2077        out
2078    }
2079}
2080
2081fn relative_utf8(root: &Path, path: &Path) -> Result<Utf8PathBuf> {
2082    let relative = path
2083        .strip_prefix(root)
2084        .with_context(|| format!("failed to relativize {}", path.display()))?;
2085    Utf8PathBuf::from_path_buf(relative.to_path_buf())
2086        .map_err(|path| anyhow!("path is not valid UTF-8: {}", path.display()))
2087}
2088
2089fn utf8_path(path: &Path) -> Result<Utf8PathBuf> {
2090    Utf8PathBuf::from_path_buf(path.to_path_buf())
2091        .map_err(|path| anyhow!("path is not valid UTF-8: {}", path.display()))
2092}
2093
2094fn command_line(program: &str, args: &[impl AsRef<str>]) -> String {
2095    let mut parts = vec![program.to_string()];
2096    parts.extend(args.iter().map(|arg| {
2097        let arg = arg.as_ref();
2098        if arg.contains(char::is_whitespace) {
2099            format!("{arg:?}")
2100        } else {
2101            arg.to_string()
2102        }
2103    }));
2104    parts.join(" ")
2105}
2106
2107fn excerpt(value: &str) -> String {
2108    let mut lines = value.lines().take(20).collect::<Vec<_>>().join("\n");
2109    if value.lines().count() > 20 {
2110        lines.push_str("\n...");
2111    }
2112    lines
2113}
2114
2115#[cfg(test)]
2116mod tests {
2117    use super::*;
2118    use std::path::PathBuf;
2119    use std::time::{SystemTime, UNIX_EPOCH};
2120
2121    #[test]
2122    fn pytest_preference_uses_pytest_when_available() {
2123        let root = unique_test_root("pytest-available");
2124        fs::create_dir_all(root.join("tests")).expect("create tests dir");
2125        fs::write(root.join("pyproject.toml"), "[tool.pytest.ini_options]\n").expect("pyproject");
2126
2127        let (runner, args) = python_test_args_with_availability(&root, true);
2128
2129        fs::remove_dir_all(&root).ok();
2130        assert_eq!(runner, "pytest");
2131        assert_eq!(args, ["-m", "pytest", "-q"]);
2132    }
2133
2134    #[test]
2135    fn pytest_preference_falls_back_to_unittest_when_unavailable() {
2136        let root = unique_test_root("pytest-unavailable");
2137        fs::create_dir_all(root.join("tests")).expect("create tests dir");
2138        fs::write(root.join("tests/test_invoice.py"), "import pytest\n").expect("test file");
2139
2140        let (runner, args) = python_test_args_with_availability(&root, false);
2141
2142        fs::remove_dir_all(&root).ok();
2143        assert_eq!(runner, "unittest");
2144        assert_eq!(args, ["-m", "unittest", "discover"]);
2145    }
2146
2147    #[test]
2148    fn python_mutation_skip_annotation_is_function_local() {
2149        let source =
2150            "def kept():\n    return 1\n\ndef skipped():\n    # veritas:skip-mutation\n    return 2\n";
2151        let kept_start = source.find("def kept").unwrap();
2152        let kept_end = source.find("def skipped").unwrap();
2153        let skipped_start = kept_end;
2154        assert!(!function_has_mutation_skip_annotation(
2155            source, kept_start, kept_end
2156        ));
2157        assert!(function_has_mutation_skip_annotation(
2158            source,
2159            skipped_start,
2160            source.len()
2161        ));
2162    }
2163
2164    fn unique_test_root(name: &str) -> PathBuf {
2165        let millis = SystemTime::now()
2166            .duration_since(UNIX_EPOCH)
2167            .expect("system time")
2168            .as_millis();
2169        std::env::temp_dir().join(format!("veritas-python-{name}-{millis}"))
2170    }
2171}