mati_core/analysis/enrich_signals/
python.rs1use std::cell::RefCell;
8use std::sync::LazyLock;
9
10use anyhow::Result;
11
12use super::{comments, Signal, SignalKind, SignalTier};
13use crate::analysis::walker::Language;
14
15static PY_LANGUAGE: LazyLock<tree_sitter::Language> =
16 LazyLock::new(|| tree_sitter_python::LANGUAGE.into());
17
/// Tree-sitter query source used to locate signal sites in Python code.
///
/// Captures:
/// - `@panic`: every `raise_statement` node.
/// - `@assert`: every `assert_statement` node.
/// - `@sys_exit`: a `call` whose function is an attribute access; the `#eq?`
///   predicates restrict the object identifier (`@mod`) to `sys` and the attribute
///   identifier (`@fn`) to `exit`. `@mod` and `@fn` exist only for those predicates.
/// - `@comment`: every `comment` node.
const PY_QUERY_SRC: &str = r#"
 (raise_statement) @panic
 (assert_statement) @assert
 (call function: (attribute object: (identifier) @mod
 attribute: (identifier) @fn)
 (#eq? @mod "sys") (#eq? @fn "exit")) @sys_exit
 (comment) @comment
"#;
26
27static PY_QUERY: LazyLock<tree_sitter::Query> = LazyLock::new(|| {
28 tree_sitter::Query::new(&PY_LANGUAGE, PY_QUERY_SRC)
29 .expect("enrich_signals/python: invalid query")
30});
31
32thread_local! {
33 static PY_PARSER: RefCell<tree_sitter::Parser> = RefCell::new({
34 let mut p = tree_sitter::Parser::new();
35 p.set_language(&PY_LANGUAGE).expect("enrich_signals/python: grammar load failed");
36 p
37 });
38}
39
40pub fn extract(source: &str) -> Result<Vec<Signal>> {
41 let tree = PY_PARSER.with(|p| {
42 let mut parser = p.borrow_mut();
43 parser
44 .parse(source.as_bytes(), None)
45 .ok_or_else(|| anyhow::anyhow!("enrich_signals/python: parse returned None"))
46 })?;
47 let bytes = source.as_bytes();
48 let mut out: Vec<Signal> = Vec::new();
49 let mut cursor = tree_sitter::QueryCursor::new();
50 let cap = |n: &str| PY_QUERY.capture_index_for_name(n).unwrap_or(u32::MAX);
51 let (i_panic, i_assert, i_sys_exit, i_comment) =
52 (cap("panic"), cap("assert"), cap("sys_exit"), cap("comment"));
53
54 for m in cursor.matches(&PY_QUERY, tree.root_node(), bytes) {
55 for c in m.captures {
56 let node = c.node;
57 let line = node.start_position().row as u32 + 1;
58 let evidence = super::node_text(bytes, node);
59 if c.index == i_panic || c.index == i_sys_exit {
60 out.push(Signal {
61 file_line: line,
62 tier: SignalTier::High,
63 kind: SignalKind::Panic,
64 evidence: super::trim_evidence(&evidence),
65 });
66 } else if c.index == i_assert {
67 out.push(Signal {
68 file_line: line,
69 tier: SignalTier::High,
70 kind: SignalKind::Assert,
71 evidence: super::trim_evidence(&evidence),
72 });
73 } else if c.index == i_comment {
74 if let Some(sig) = comments::scan_comment_text(&evidence, line) {
75 out.push(sig);
76 } else if let Some(sig) =
77 comments::scan_linter_disable(&evidence, line, Language::Python)
78 {
79 out.push(sig);
80 }
81 }
82 }
83 }
84 Ok(out)
85}
86
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the extractor on `src`, panicking if extraction fails.
    fn signals_for(src: &str) -> Vec<Signal> {
        extract(src).unwrap()
    }

    /// True when at least one signal in `signals` has the given kind.
    fn has_kind(signals: &[Signal], kind: SignalKind) -> bool {
        signals.iter().any(|sig| sig.kind == kind)
    }

    #[test]
    fn detects_raise() {
        let found = signals_for("def f():\n raise ValueError('bad')\n");
        assert!(has_kind(&found, SignalKind::Panic));
    }

    #[test]
    fn detects_assert() {
        let found = signals_for("assert x > 0, 'must be positive'\n");
        assert!(has_kind(&found, SignalKind::Assert));
    }

    #[test]
    fn detects_sys_exit_call() {
        let found = signals_for("import sys\nsys.exit(1)\n");
        let exit_reported = found
            .iter()
            .any(|sig| sig.kind == SignalKind::Panic && sig.evidence.contains("sys.exit"));
        assert!(exit_reported);
    }

    #[test]
    fn detects_warning_comment_and_noqa() {
        let found = signals_for("# WARNING: don't import from .. here\nx = 1 # noqa: E501\n");

        let high_warning = found
            .iter()
            .any(|sig| sig.kind == SignalKind::WarnComment && sig.tier == SignalTier::High);
        assert!(high_warning);
        assert!(has_kind(&found, SignalKind::LinterDisable));
    }
}