Skip to main content

cha_parser/
python.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use cha_core::{ClassInfo, FunctionInfo, ImportInfo, SourceFile, SourceModel};
5use tree_sitter::{Node, Parser};
6
7use crate::LanguageParser;
8
9pub struct PythonParser;
10
11impl LanguageParser for PythonParser {
12    fn language_name(&self) -> &str {
13        "python"
14    }
15
16    fn parse(&self, file: &SourceFile) -> Option<SourceModel> {
17        let mut parser = Parser::new();
18        parser
19            .set_language(&tree_sitter_python::LANGUAGE.into())
20            .ok()?;
21        let tree = parser.parse(&file.content, None)?;
22        let root = tree.root_node();
23        let src = file.content.as_bytes();
24
25        let mut functions = Vec::new();
26        let mut classes = Vec::new();
27        let mut imports = Vec::new();
28
29        let imports_map = crate::python_imports::build(root, src);
30        collect_top_level(
31            root,
32            src,
33            &imports_map,
34            &mut functions,
35            &mut classes,
36            &mut imports,
37        );
38
39        Some(SourceModel {
40            language: "python".into(),
41            total_lines: file.line_count(),
42            functions,
43            classes,
44            imports,
45            comments: collect_comments(root, src),
46            type_aliases: vec![], // TODO(parser): extract type aliases from 'type X = Y' / 'X = Y' declarations
47        })
48    }
49}
50
51fn push_definition(
52    node: Node,
53    src: &[u8],
54    imports_map: &crate::type_ref::ImportsMap,
55    functions: &mut Vec<FunctionInfo>,
56    classes: &mut Vec<ClassInfo>,
57) {
58    match node.kind() {
59        "function_definition" => {
60            if let Some(f) = extract_function(node, src, imports_map) {
61                functions.push(f);
62            }
63        }
64        "class_definition" => {
65            if let Some(c) = extract_class(node, src, imports_map, functions) {
66                classes.push(c);
67            }
68        }
69        _ => {}
70    }
71}
72
73fn collect_top_level(
74    node: Node,
75    src: &[u8],
76    imports_map: &crate::type_ref::ImportsMap,
77    functions: &mut Vec<FunctionInfo>,
78    classes: &mut Vec<ClassInfo>,
79    imports: &mut Vec<ImportInfo>,
80) {
81    let mut cursor = node.walk();
82    for child in node.children(&mut cursor) {
83        match child.kind() {
84            "function_definition" | "class_definition" => {
85                push_definition(child, src, imports_map, functions, classes);
86            }
87            "import_statement" => collect_import(child, src, imports),
88            "import_from_statement" => collect_import_from(child, src, imports),
89            "decorated_definition" => {
90                let mut inner = child.walk();
91                for c in child.children(&mut inner) {
92                    push_definition(c, src, imports_map, functions, classes);
93                }
94            }
95            _ => {}
96        }
97    }
98}
99
100fn extract_function(
101    node: Node,
102    src: &[u8],
103    imports_map: &crate::type_ref::ImportsMap,
104) -> Option<FunctionInfo> {
105    let name_node = node.child_by_field_name("name")?;
106    let name = node_text(name_node, src).to_string();
107    let name_col = name_node.start_position().column;
108    let name_end_col = name_node.end_position().column;
109    let start_line = node.start_position().row + 1;
110    let end_line = node.end_position().row + 1;
111    let body = node.child_by_field_name("body");
112    let params = node.child_by_field_name("parameters");
113    let (param_count, param_types) = params
114        .map(|p| extract_params(p, src, imports_map))
115        .unwrap_or((0, vec![]));
116
117    Some(FunctionInfo {
118        name,
119        start_line,
120        end_line,
121        name_col,
122        name_end_col,
123        line_count: end_line - start_line + 1,
124        complexity: count_complexity(node),
125        body_hash: body.map(hash_ast_structure),
126        is_exported: true,
127        parameter_count: param_count,
128        parameter_types: param_types,
129        chain_depth: body.map(max_chain_depth).unwrap_or(0),
130        switch_arms: body.map(count_match_arms).unwrap_or(0),
131        external_refs: body
132            .map(|b| collect_external_refs(b, src))
133            .unwrap_or_default(),
134        is_delegating: body.map(|b| check_delegating(b, src)).unwrap_or(false),
135        comment_lines: count_comment_lines(node, src),
136        referenced_fields: body.map(|b| collect_self_refs(b, src)).unwrap_or_default(),
137        null_check_fields: body
138            .map(|b| collect_none_checks(b, src))
139            .unwrap_or_default(),
140        switch_dispatch_target: body.and_then(|b| extract_match_target_py(b, src)),
141        optional_param_count: params.map(count_optional).unwrap_or(0),
142        called_functions: body.map(|b| collect_calls_py(b, src)).unwrap_or_default(),
143        cognitive_complexity: body.map(cognitive_complexity_py).unwrap_or(0),
144        return_type: node
145            .child_by_field_name("return_type")
146            .map(|rt| crate::type_ref::resolve(node_text(rt, src), imports_map)),
147    })
148}
149
150fn find_method_def(child: Node) -> Option<Node> {
151    if child.kind() == "function_definition" {
152        return Some(child);
153    }
154    if child.kind() == "decorated_definition" {
155        let mut inner = child.walk();
156        return child
157            .children(&mut inner)
158            .find(|c| c.kind() == "function_definition");
159    }
160    None
161}
162
163fn extract_parent_name(node: Node, src: &[u8]) -> Option<String> {
164    node.child_by_field_name("superclasses").and_then(|sc| {
165        let mut c = sc.walk();
166        sc.children(&mut c)
167            .find(|n| n.kind() != "(" && n.kind() != ")" && n.kind() != ",")
168            .map(|n| node_text(n, src).to_string())
169    })
170}
171
/// Reports whether `name` contains one of the listener-style markers
/// (`listener`, `handler`, `callback`, `observer`). The match is case-sensitive.
fn has_listener_name(name: &str) -> bool {
    const MARKERS: [&str; 4] = ["listener", "handler", "callback", "observer"];
    MARKERS.iter().any(|&marker| name.contains(marker))
}
178
179fn process_method(
180    func_node: Node,
181    f: &mut FunctionInfo,
182    src: &[u8],
183    field_names: &mut Vec<String>,
184) -> (bool, bool, bool, usize) {
185    let method_name = &f.name;
186    let mut has_behavior = false;
187    let mut is_override = false;
188    let mut is_notify = false;
189    if method_name == "__init__" {
190        collect_init_fields(func_node, src, field_names);
191    } else {
192        has_behavior = true;
193    }
194    let sc = func_node
195        .child_by_field_name("body")
196        .map(|b| count_self_calls(b, src))
197        .unwrap_or(0);
198    if method_name.starts_with("__") && method_name.ends_with("__") && method_name != "__init__" {
199        is_override = true;
200    }
201    if method_name.contains("notify") || method_name.contains("emit") {
202        is_notify = true;
203    }
204    f.is_exported = !method_name.starts_with('_');
205    (has_behavior, is_override, is_notify, sc)
206}
207
/// Aggregated result of scanning the methods of one class body.
struct ClassScan {
    /// One entry per method found in the class body (plain or decorated).
    methods: Vec<FunctionInfo>,
    /// Distinct `self.<attr>` names assigned inside `__init__`.
    field_names: Vec<String>,
    /// Number of methods flagged as delegating.
    delegating_count: usize,
    /// True when the class has at least one method other than `__init__`.
    has_behavior: bool,
    /// Number of `__dunder__` methods other than `__init__`.
    override_count: usize,
    /// Total number of calls on `self` across all method bodies.
    self_call_count: usize,
    /// True when some method name contains `notify` or `emit`.
    has_notify_method: bool,
}
217
218fn scan_class_methods(
219    body: Node,
220    src: &[u8],
221    imports_map: &crate::type_ref::ImportsMap,
222) -> ClassScan {
223    let mut s = ClassScan {
224        methods: Vec::new(),
225        field_names: Vec::new(),
226        delegating_count: 0,
227        has_behavior: false,
228        override_count: 0,
229        self_call_count: 0,
230        has_notify_method: false,
231    };
232    let mut cursor = body.walk();
233    for child in body.children(&mut cursor) {
234        let Some(func_node) = find_method_def(child) else {
235            continue;
236        };
237        let Some(mut f) = extract_function(func_node, src, imports_map) else {
238            continue;
239        };
240        if f.is_delegating {
241            s.delegating_count += 1;
242        }
243        let (behav, over, notify, sc) = process_method(func_node, &mut f, src, &mut s.field_names);
244        s.has_behavior |= behav;
245        if over {
246            s.override_count += 1;
247        }
248        if notify {
249            s.has_notify_method = true;
250        }
251        s.self_call_count += sc;
252        s.methods.push(f);
253    }
254    s
255}
256
257fn extract_class(
258    node: Node,
259    src: &[u8],
260    imports_map: &crate::type_ref::ImportsMap,
261    top_functions: &mut Vec<FunctionInfo>,
262) -> Option<ClassInfo> {
263    let name_node = node.child_by_field_name("name")?;
264    let name = node_text(name_node, src).to_string();
265    let name_col = name_node.start_position().column;
266    let name_end_col = name_node.end_position().column;
267    let start_line = node.start_position().row + 1;
268    let end_line = node.end_position().row + 1;
269    let body = node.child_by_field_name("body")?;
270    let s = scan_class_methods(body, src, imports_map);
271    let method_count = s.methods.len();
272    top_functions.extend(s.methods);
273
274    Some(ClassInfo {
275        name,
276        start_line,
277        end_line,
278        name_col,
279        name_end_col,
280        line_count: end_line - start_line + 1,
281        method_count,
282        is_exported: true,
283        delegating_method_count: s.delegating_count,
284        field_count: s.field_names.len(),
285        has_listener_field: s.field_names.iter().any(|n| has_listener_name(n)),
286        field_names: s.field_names,
287        field_types: Vec::new(),
288        has_behavior: s.has_behavior,
289        is_interface: has_only_pass_or_ellipsis(body, src),
290        parent_name: extract_parent_name(node, src),
291        override_count: s.override_count,
292        self_call_count: s.self_call_count,
293        has_notify_method: s.has_notify_method,
294    })
295}
296
297// --- imports ---
298
299fn collect_import(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
300    let line = node.start_position().row + 1;
301    let col = node.start_position().column;
302    let mut cursor = node.walk();
303    for child in node.children(&mut cursor) {
304        if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
305            let text = node_text(child, src);
306            imports.push(ImportInfo {
307                source: text.to_string(),
308                line,
309                col,
310                ..Default::default()
311            });
312        }
313    }
314}
315
316fn collect_import_from(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
317    let line = node.start_position().row + 1;
318    let col = node.start_position().column;
319    let module = node
320        .child_by_field_name("module_name")
321        .map(|n| node_text(n, src).to_string())
322        .unwrap_or_default();
323    let mut cursor = node.walk();
324    let mut has_names = false;
325    for child in node.children(&mut cursor) {
326        if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
327            let n = node_text(child, src).to_string();
328            if n != module {
329                imports.push(ImportInfo {
330                    source: format!("{module}.{n}"),
331                    line,
332                    col,
333                    ..Default::default()
334                });
335                has_names = true;
336            }
337        }
338    }
339    if !has_names {
340        imports.push(ImportInfo {
341            source: module,
342            line,
343            col,
344            ..Default::default()
345        });
346    }
347}
348
349// --- helpers ---
350
351fn node_text<'a>(node: Node, src: &'a [u8]) -> &'a str {
352    node.utf8_text(src).unwrap_or("")
353}
354
355fn count_complexity(node: Node) -> usize {
356    let mut complexity = 1usize;
357    let mut cursor = node.walk();
358    visit_all(node, &mut cursor, &mut |n| {
359        match n.kind() {
360            "if_statement"
361            | "elif_clause"
362            | "for_statement"
363            | "while_statement"
364            | "except_clause"
365            | "with_statement"
366            | "assert_statement"
367            | "conditional_expression"
368            | "boolean_operator"
369            | "list_comprehension"
370            | "set_comprehension"
371            | "dictionary_comprehension"
372            | "generator_expression" => {
373                complexity += 1;
374            }
375            "match_statement" => {} // match itself doesn't add, cases do
376            "case_clause" => {
377                complexity += 1;
378            }
379            _ => {}
380        }
381    });
382    complexity
383}
384
385fn hash_ast_structure(node: Node) -> u64 {
386    let mut hasher = DefaultHasher::new();
387    hash_node(node, &mut hasher);
388    hasher.finish()
389}
390
391fn hash_node(node: Node, hasher: &mut DefaultHasher) {
392    node.kind().hash(hasher);
393    let mut cursor = node.walk();
394    for child in node.children(&mut cursor) {
395        hash_node(child, hasher);
396    }
397}
398
399fn max_chain_depth(node: Node) -> usize {
400    let mut max = 0usize;
401    let mut cursor = node.walk();
402    visit_all(node, &mut cursor, &mut |n| {
403        if n.kind() == "attribute" {
404            let depth = chain_len(n);
405            if depth > max {
406                max = depth;
407            }
408        }
409    });
410    max
411}
412
413fn chain_len(node: Node) -> usize {
414    let mut depth = 0usize;
415    let mut current = node;
416    while current.kind() == "attribute" || current.kind() == "call" {
417        if current.kind() == "attribute" {
418            depth += 1;
419        }
420        if let Some(obj) = current.child(0) {
421            current = obj;
422        } else {
423            break;
424        }
425    }
426    depth
427}
428
429fn count_match_arms(node: Node) -> usize {
430    let mut count = 0usize;
431    let mut cursor = node.walk();
432    visit_all(node, &mut cursor, &mut |n| {
433        if n.kind() == "case_clause" {
434            count += 1;
435        }
436    });
437    count
438}
439
440fn collect_external_refs(node: Node, src: &[u8]) -> Vec<String> {
441    let mut refs = Vec::new();
442    let mut cursor = node.walk();
443    visit_all(node, &mut cursor, &mut |n| {
444        if n.kind() != "attribute" {
445            return;
446        }
447        let Some(obj) = n.child(0) else { return };
448        let text = node_text(obj, src);
449        if text != "self"
450            && !text.is_empty()
451            && text.starts_with(|c: char| c.is_lowercase())
452            && !refs.contains(&text.to_string())
453        {
454            refs.push(text.to_string());
455        }
456    });
457    refs
458}
459
460fn unwrap_single_call(body: Node) -> Option<Node> {
461    let mut c = body.walk();
462    let stmts: Vec<Node> = body
463        .children(&mut c)
464        .filter(|n| !n.is_extra() && n.kind() != "pass_statement" && n.kind() != "comment")
465        .collect();
466    if stmts.len() != 1 {
467        return None;
468    }
469    let stmt = stmts[0];
470    match stmt.kind() {
471        "return_statement" => stmt.child(1).filter(|v| v.kind() == "call"),
472        "expression_statement" => stmt.child(0).filter(|v| v.kind() == "call"),
473        _ => None,
474    }
475}
476
477fn check_delegating(body: Node, src: &[u8]) -> bool {
478    let Some(func) = unwrap_single_call(body).and_then(|c| c.child(0)) else {
479        return false;
480    };
481    let text = node_text(func, src);
482    text.contains('.') && !text.starts_with("self.")
483}
484
485fn count_comment_lines(node: Node, src: &[u8]) -> usize {
486    let mut count = 0usize;
487    let mut cursor = node.walk();
488    visit_all(node, &mut cursor, &mut |n| {
489        if n.kind() == "comment" {
490            count += 1;
491        } else if n.kind() == "string" || n.kind() == "expression_statement" {
492            // docstrings
493            let text = node_text(n, src);
494            if text.starts_with("\"\"\"") || text.starts_with("'''") {
495                count += text.lines().count();
496            }
497        }
498    });
499    count
500}
501
502fn collect_self_refs(body: Node, src: &[u8]) -> Vec<String> {
503    let mut refs = Vec::new();
504    let mut cursor = body.walk();
505    visit_all(body, &mut cursor, &mut |n| {
506        if n.kind() != "attribute" {
507            return;
508        }
509        let is_self = n.child(0).is_some_and(|o| node_text(o, src) == "self");
510        if !is_self {
511            return;
512        }
513        if let Some(attr) = n.child_by_field_name("attribute") {
514            let name = node_text(attr, src).to_string();
515            if !refs.contains(&name) {
516                refs.push(name);
517            }
518        }
519    });
520    refs
521}
522
523fn collect_none_checks(body: Node, src: &[u8]) -> Vec<String> {
524    let mut fields = Vec::new();
525    let mut cursor = body.walk();
526    visit_all(body, &mut cursor, &mut |n| {
527        if n.kind() != "comparison_operator" {
528            return;
529        }
530        let text = node_text(n, src);
531        if !text.contains("is None") && !text.contains("is not None") && !text.contains("== None") {
532            return;
533        }
534        if let Some(left) = n.child(0) {
535            let name = node_text(left, src).to_string();
536            if !fields.contains(&name) {
537                fields.push(name);
538            }
539        }
540    });
541    fields
542}
543
/// Reports whether `name` is exactly `self` or `cls`.
fn is_self_or_cls(name: &str) -> bool {
    matches!(name, "self" | "cls")
}
547
548fn param_name_and_type(child: Node, src: &[u8]) -> Option<(String, String)> {
549    match child.kind() {
550        "identifier" => {
551            let name = node_text(child, src);
552            (!is_self_or_cls(name)).then(|| (name.to_string(), "Any".to_string()))
553        }
554        "typed_parameter" | "default_parameter" | "typed_default_parameter" => {
555            let name = child
556                .child_by_field_name("name")
557                .or_else(|| child.child(0))
558                .map(|n| node_text(n, src))
559                .unwrap_or("");
560            if is_self_or_cls(name) {
561                return None;
562            }
563            let ty = child
564                .child_by_field_name("type")
565                .map(|n| node_text(n, src).to_string())
566                .unwrap_or_else(|| "Any".to_string());
567            Some((name.to_string(), ty))
568        }
569        "list_splat_pattern" | "dictionary_splat_pattern" => {
570            Some(("*".to_string(), "Any".to_string()))
571        }
572        _ => None,
573    }
574}
575
576fn extract_params(
577    params_node: Node,
578    src: &[u8],
579    imports_map: &crate::type_ref::ImportsMap,
580) -> (usize, Vec<cha_core::TypeRef>) {
581    let mut count = 0usize;
582    let mut types = Vec::new();
583    let mut cursor = params_node.walk();
584    for child in params_node.children(&mut cursor) {
585        if let Some((_name, ty)) = param_name_and_type(child, src) {
586            count += 1;
587            types.push(crate::type_ref::resolve(ty, imports_map));
588        }
589    }
590    (count, types)
591}
592
593fn count_optional(params_node: Node) -> usize {
594    let mut count = 0usize;
595    let mut cursor = params_node.walk();
596    for child in params_node.children(&mut cursor) {
597        if child.kind() == "default_parameter" || child.kind() == "typed_default_parameter" {
598            count += 1;
599        }
600    }
601    count
602}
603
604fn collect_init_fields(func_node: Node, src: &[u8], fields: &mut Vec<String>) {
605    let Some(body) = func_node.child_by_field_name("body") else {
606        return;
607    };
608    let mut cursor = body.walk();
609    visit_all(body, &mut cursor, &mut |n| {
610        if n.kind() != "assignment" {
611            return;
612        }
613        let Some(left) = n.child_by_field_name("left") else {
614            return;
615        };
616        if left.kind() != "attribute" {
617            return;
618        }
619        let is_self = left.child(0).is_some_and(|o| node_text(o, src) == "self");
620        if !is_self {
621            return;
622        }
623        if let Some(attr) = left.child_by_field_name("attribute") {
624            let name = node_text(attr, src).to_string();
625            if !fields.contains(&name) {
626                fields.push(name);
627            }
628        }
629    });
630}
631
632fn count_self_calls(body: Node, src: &[u8]) -> usize {
633    let mut count = 0;
634    let mut cursor = body.walk();
635    visit_all(body, &mut cursor, &mut |n| {
636        if n.kind() != "call" {
637            return;
638        }
639        let is_self_call = n
640            .child(0)
641            .filter(|f| f.kind() == "attribute")
642            .and_then(|f| f.child(0))
643            .is_some_and(|obj| node_text(obj, src) == "self");
644        if is_self_call {
645            count += 1;
646        }
647    });
648    count
649}
650
651fn is_stub_body(node: Node, src: &[u8]) -> bool {
652    node.child_by_field_name("body")
653        .is_none_or(|b| has_only_pass_or_ellipsis(b, src))
654}
655
656fn has_only_pass_or_ellipsis(body: Node, src: &[u8]) -> bool {
657    let mut cursor = body.walk();
658    for child in body.children(&mut cursor) {
659        let ok = match child.kind() {
660            "pass_statement" | "ellipsis" | "comment" => true,
661            "expression_statement" => child.child(0).is_none_or(|expr| {
662                let text = node_text(expr, src);
663                text == "..." || text.starts_with("\"\"\"") || text.starts_with("'''")
664            }),
665            "function_definition" => is_stub_body(child, src),
666            "decorated_definition" => {
667                let mut inner = child.walk();
668                child
669                    .children(&mut inner)
670                    .filter(|c| c.kind() == "function_definition")
671                    .all(|c| is_stub_body(c, src))
672            }
673            _ => false,
674        };
675        if !ok {
676            return false;
677        }
678    }
679    true
680}
681
682fn cognitive_complexity_py(node: tree_sitter::Node) -> usize {
683    let mut score = 0;
684    cc_walk_py(node, 0, &mut score);
685    score
686}
687
688fn cc_walk_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
689    match node.kind() {
690        "if_statement" => {
691            *score += 1 + nesting;
692            cc_children_py(node, nesting + 1, score);
693            return;
694        }
695        "for_statement" | "while_statement" => {
696            *score += 1 + nesting;
697            cc_children_py(node, nesting + 1, score);
698            return;
699        }
700        "match_statement" => {
701            *score += 1 + nesting;
702            cc_children_py(node, nesting + 1, score);
703            return;
704        }
705        "elif_clause" | "else_clause" => {
706            *score += 1;
707        }
708        "boolean_operator" => {
709            *score += 1;
710        }
711        "except_clause" => {
712            *score += 1 + nesting;
713            cc_children_py(node, nesting + 1, score);
714            return;
715        }
716        "lambda" => {
717            cc_children_py(node, nesting + 1, score);
718            return;
719        }
720        _ => {}
721    }
722    cc_children_py(node, nesting, score);
723}
724
725fn cc_children_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
726    let mut cursor = node.walk();
727    for child in node.children(&mut cursor) {
728        cc_walk_py(child, nesting, score);
729    }
730}
731
732fn extract_match_target_py(body: tree_sitter::Node, src: &[u8]) -> Option<String> {
733    let mut target = None;
734    let mut cursor = body.walk();
735    visit_all(body, &mut cursor, &mut |n| {
736        if n.kind() == "match_statement"
737            && target.is_none()
738            && let Some(subj) = n.child_by_field_name("subject")
739        {
740            target = Some(node_text(subj, src).to_string());
741        }
742    });
743    target
744}
745
746fn collect_calls_py(body: tree_sitter::Node, src: &[u8]) -> Vec<String> {
747    let mut calls = Vec::new();
748    let mut cursor = body.walk();
749    visit_all(body, &mut cursor, &mut |n| {
750        if n.kind() == "call"
751            && let Some(func) = n.child(0)
752        {
753            let name = node_text(func, src).to_string();
754            if !calls.contains(&name) {
755                calls.push(name);
756            }
757        }
758    });
759    calls
760}
761
762fn collect_comments(root: Node, src: &[u8]) -> Vec<cha_core::CommentInfo> {
763    let mut comments = Vec::new();
764    let mut cursor = root.walk();
765    visit_all(root, &mut cursor, &mut |n| {
766        if n.kind().contains("comment") {
767            comments.push(cha_core::CommentInfo {
768                text: node_text(n, src).to_string(),
769                line: n.start_position().row + 1,
770            });
771        }
772    });
773    comments
774}
775
776fn visit_all<F: FnMut(Node)>(node: Node, cursor: &mut tree_sitter::TreeCursor, f: &mut F) {
777    f(node);
778    if cursor.goto_first_child() {
779        loop {
780            let child_node = cursor.node();
781            let mut child_cursor = child_node.walk();
782            visit_all(child_node, &mut child_cursor, f);
783            if !cursor.goto_next_sibling() {
784                break;
785            }
786        }
787        cursor.goto_parent();
788    }
789}