Skip to main content

cha_parser/
python.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use cha_core::{ClassInfo, FunctionInfo, ImportInfo, SourceFile, SourceModel};
5use tree_sitter::{Node, Parser};
6
7use crate::LanguageParser;
8
9pub struct PythonParser;
10
11impl LanguageParser for PythonParser {
12    fn language_name(&self) -> &str {
13        "python"
14    }
15
16    fn parse(&self, file: &SourceFile) -> Option<SourceModel> {
17        let mut parser = Parser::new();
18        parser
19            .set_language(&tree_sitter_python::LANGUAGE.into())
20            .ok()?;
21        let tree = parser.parse(&file.content, None)?;
22        let root = tree.root_node();
23        let src = file.content.as_bytes();
24
25        let mut functions = Vec::new();
26        let mut classes = Vec::new();
27        let mut imports = Vec::new();
28
29        let imports_map = crate::python_imports::build(root, src);
30        collect_top_level(
31            root,
32            src,
33            &imports_map,
34            &mut functions,
35            &mut classes,
36            &mut imports,
37        );
38
39        Some(SourceModel {
40            language: "python".into(),
41            total_lines: file.line_count(),
42            functions,
43            classes,
44            imports,
45            comments: collect_comments(root, src),
46            type_aliases: vec![], // TODO(parser): extract type aliases from 'type X = Y' / 'X = Y' declarations
47        })
48    }
49}
50
51fn push_definition(
52    node: Node,
53    src: &[u8],
54    imports_map: &crate::type_ref::ImportsMap,
55    functions: &mut Vec<FunctionInfo>,
56    classes: &mut Vec<ClassInfo>,
57) {
58    match node.kind() {
59        "function_definition" => {
60            if let Some(f) = extract_function(node, src, imports_map) {
61                functions.push(f);
62            }
63        }
64        "class_definition" => {
65            if let Some(c) = extract_class(node, src, imports_map, functions) {
66                classes.push(c);
67            }
68        }
69        _ => {}
70    }
71}
72
73fn collect_top_level(
74    node: Node,
75    src: &[u8],
76    imports_map: &crate::type_ref::ImportsMap,
77    functions: &mut Vec<FunctionInfo>,
78    classes: &mut Vec<ClassInfo>,
79    imports: &mut Vec<ImportInfo>,
80) {
81    let mut cursor = node.walk();
82    for child in node.children(&mut cursor) {
83        match child.kind() {
84            "function_definition" | "class_definition" => {
85                push_definition(child, src, imports_map, functions, classes);
86            }
87            "import_statement" => collect_import(child, src, imports),
88            "import_from_statement" => collect_import_from(child, src, imports),
89            "decorated_definition" => {
90                let mut inner = child.walk();
91                for c in child.children(&mut inner) {
92                    push_definition(c, src, imports_map, functions, classes);
93                }
94            }
95            _ => {}
96        }
97    }
98}
99
100fn extract_function(
101    node: Node,
102    src: &[u8],
103    imports_map: &crate::type_ref::ImportsMap,
104) -> Option<FunctionInfo> {
105    let name_node = node.child_by_field_name("name")?;
106    let name = node_text(name_node, src).to_string();
107    let name_col = name_node.start_position().column;
108    let name_end_col = name_node.end_position().column;
109    let start_line = node.start_position().row + 1;
110    let end_line = node.end_position().row + 1;
111    let body = node.child_by_field_name("body");
112    let params = node.child_by_field_name("parameters");
113    let (param_count, param_types) = params
114        .map(|p| extract_params(p, src, imports_map))
115        .unwrap_or((0, vec![]));
116
117    Some(FunctionInfo {
118        name,
119        start_line,
120        end_line,
121        name_col,
122        name_end_col,
123        line_count: end_line - start_line + 1,
124        complexity: count_complexity(node),
125        body_hash: body.map(hash_ast_structure),
126        is_exported: true,
127        parameter_count: param_count,
128        parameter_types: param_types,
129        chain_depth: body.map(max_chain_depth).unwrap_or(0),
130        switch_arms: body.map(count_match_arms).unwrap_or(0),
131        external_refs: body
132            .map(|b| collect_external_refs(b, src))
133            .unwrap_or_default(),
134        is_delegating: body.map(|b| check_delegating(b, src)).unwrap_or(false),
135        comment_lines: count_comment_lines(node, src),
136        referenced_fields: body.map(|b| collect_self_refs(b, src)).unwrap_or_default(),
137        null_check_fields: body
138            .map(|b| collect_none_checks(b, src))
139            .unwrap_or_default(),
140        switch_dispatch_target: body.and_then(|b| extract_match_target_py(b, src)),
141        optional_param_count: params.map(count_optional).unwrap_or(0),
142        called_functions: body.map(|b| collect_calls_py(b, src)).unwrap_or_default(),
143        cognitive_complexity: body.map(cognitive_complexity_py).unwrap_or(0),
144    })
145}
146
147fn find_method_def(child: Node) -> Option<Node> {
148    if child.kind() == "function_definition" {
149        return Some(child);
150    }
151    if child.kind() == "decorated_definition" {
152        let mut inner = child.walk();
153        return child
154            .children(&mut inner)
155            .find(|c| c.kind() == "function_definition");
156    }
157    None
158}
159
160fn extract_parent_name(node: Node, src: &[u8]) -> Option<String> {
161    node.child_by_field_name("superclasses").and_then(|sc| {
162        let mut c = sc.walk();
163        sc.children(&mut c)
164            .find(|n| n.kind() != "(" && n.kind() != ")" && n.kind() != ",")
165            .map(|n| node_text(n, src).to_string())
166    })
167}
168
/// Reports whether `name` contains one of the listener-style markers
/// (`listener`, `handler`, `callback`, `observer`). Matching is case-sensitive.
fn has_listener_name(name: &str) -> bool {
    const MARKERS: [&str; 4] = ["listener", "handler", "callback", "observer"];
    MARKERS.iter().any(|marker| name.contains(*marker))
}
175
176fn process_method(
177    func_node: Node,
178    f: &mut FunctionInfo,
179    src: &[u8],
180    field_names: &mut Vec<String>,
181) -> (bool, bool, bool, usize) {
182    let method_name = &f.name;
183    let mut has_behavior = false;
184    let mut is_override = false;
185    let mut is_notify = false;
186    if method_name == "__init__" {
187        collect_init_fields(func_node, src, field_names);
188    } else {
189        has_behavior = true;
190    }
191    let sc = func_node
192        .child_by_field_name("body")
193        .map(|b| count_self_calls(b, src))
194        .unwrap_or(0);
195    if method_name.starts_with("__") && method_name.ends_with("__") && method_name != "__init__" {
196        is_override = true;
197    }
198    if method_name.contains("notify") || method_name.contains("emit") {
199        is_notify = true;
200    }
201    f.is_exported = !method_name.starts_with('_');
202    (has_behavior, is_override, is_notify, sc)
203}
204
205struct ClassScan {
206    methods: Vec<FunctionInfo>,
207    field_names: Vec<String>,
208    delegating_count: usize,
209    has_behavior: bool,
210    override_count: usize,
211    self_call_count: usize,
212    has_notify_method: bool,
213}
214
215fn scan_class_methods(
216    body: Node,
217    src: &[u8],
218    imports_map: &crate::type_ref::ImportsMap,
219) -> ClassScan {
220    let mut s = ClassScan {
221        methods: Vec::new(),
222        field_names: Vec::new(),
223        delegating_count: 0,
224        has_behavior: false,
225        override_count: 0,
226        self_call_count: 0,
227        has_notify_method: false,
228    };
229    let mut cursor = body.walk();
230    for child in body.children(&mut cursor) {
231        let Some(func_node) = find_method_def(child) else {
232            continue;
233        };
234        let Some(mut f) = extract_function(func_node, src, imports_map) else {
235            continue;
236        };
237        if f.is_delegating {
238            s.delegating_count += 1;
239        }
240        let (behav, over, notify, sc) = process_method(func_node, &mut f, src, &mut s.field_names);
241        s.has_behavior |= behav;
242        if over {
243            s.override_count += 1;
244        }
245        if notify {
246            s.has_notify_method = true;
247        }
248        s.self_call_count += sc;
249        s.methods.push(f);
250    }
251    s
252}
253
254fn extract_class(
255    node: Node,
256    src: &[u8],
257    imports_map: &crate::type_ref::ImportsMap,
258    top_functions: &mut Vec<FunctionInfo>,
259) -> Option<ClassInfo> {
260    let name_node = node.child_by_field_name("name")?;
261    let name = node_text(name_node, src).to_string();
262    let name_col = name_node.start_position().column;
263    let name_end_col = name_node.end_position().column;
264    let start_line = node.start_position().row + 1;
265    let end_line = node.end_position().row + 1;
266    let body = node.child_by_field_name("body")?;
267    let s = scan_class_methods(body, src, imports_map);
268    let method_count = s.methods.len();
269    top_functions.extend(s.methods);
270
271    Some(ClassInfo {
272        name,
273        start_line,
274        end_line,
275        name_col,
276        name_end_col,
277        line_count: end_line - start_line + 1,
278        method_count,
279        is_exported: true,
280        delegating_method_count: s.delegating_count,
281        field_count: s.field_names.len(),
282        has_listener_field: s.field_names.iter().any(|n| has_listener_name(n)),
283        field_names: s.field_names,
284        field_types: Vec::new(),
285        has_behavior: s.has_behavior,
286        is_interface: has_only_pass_or_ellipsis(body, src),
287        parent_name: extract_parent_name(node, src),
288        override_count: s.override_count,
289        self_call_count: s.self_call_count,
290        has_notify_method: s.has_notify_method,
291    })
292}
293
294// --- imports ---
295
296fn collect_import(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
297    let line = node.start_position().row + 1;
298    let col = node.start_position().column;
299    let mut cursor = node.walk();
300    for child in node.children(&mut cursor) {
301        if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
302            let text = node_text(child, src);
303            imports.push(ImportInfo {
304                source: text.to_string(),
305                line,
306                col,
307                ..Default::default()
308            });
309        }
310    }
311}
312
313fn collect_import_from(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
314    let line = node.start_position().row + 1;
315    let col = node.start_position().column;
316    let module = node
317        .child_by_field_name("module_name")
318        .map(|n| node_text(n, src).to_string())
319        .unwrap_or_default();
320    let mut cursor = node.walk();
321    let mut has_names = false;
322    for child in node.children(&mut cursor) {
323        if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
324            let n = node_text(child, src).to_string();
325            if n != module {
326                imports.push(ImportInfo {
327                    source: format!("{module}.{n}"),
328                    line,
329                    col,
330                    ..Default::default()
331                });
332                has_names = true;
333            }
334        }
335    }
336    if !has_names {
337        imports.push(ImportInfo {
338            source: module,
339            line,
340            col,
341            ..Default::default()
342        });
343    }
344}
345
346// --- helpers ---
347
348fn node_text<'a>(node: Node, src: &'a [u8]) -> &'a str {
349    node.utf8_text(src).unwrap_or("")
350}
351
352fn count_complexity(node: Node) -> usize {
353    let mut complexity = 1usize;
354    let mut cursor = node.walk();
355    visit_all(node, &mut cursor, &mut |n| {
356        match n.kind() {
357            "if_statement"
358            | "elif_clause"
359            | "for_statement"
360            | "while_statement"
361            | "except_clause"
362            | "with_statement"
363            | "assert_statement"
364            | "conditional_expression"
365            | "boolean_operator"
366            | "list_comprehension"
367            | "set_comprehension"
368            | "dictionary_comprehension"
369            | "generator_expression" => {
370                complexity += 1;
371            }
372            "match_statement" => {} // match itself doesn't add, cases do
373            "case_clause" => {
374                complexity += 1;
375            }
376            _ => {}
377        }
378    });
379    complexity
380}
381
382fn hash_ast_structure(node: Node) -> u64 {
383    let mut hasher = DefaultHasher::new();
384    hash_node(node, &mut hasher);
385    hasher.finish()
386}
387
388fn hash_node(node: Node, hasher: &mut DefaultHasher) {
389    node.kind().hash(hasher);
390    let mut cursor = node.walk();
391    for child in node.children(&mut cursor) {
392        hash_node(child, hasher);
393    }
394}
395
396fn max_chain_depth(node: Node) -> usize {
397    let mut max = 0usize;
398    let mut cursor = node.walk();
399    visit_all(node, &mut cursor, &mut |n| {
400        if n.kind() == "attribute" {
401            let depth = chain_len(n);
402            if depth > max {
403                max = depth;
404            }
405        }
406    });
407    max
408}
409
410fn chain_len(node: Node) -> usize {
411    let mut depth = 0usize;
412    let mut current = node;
413    while current.kind() == "attribute" || current.kind() == "call" {
414        if current.kind() == "attribute" {
415            depth += 1;
416        }
417        if let Some(obj) = current.child(0) {
418            current = obj;
419        } else {
420            break;
421        }
422    }
423    depth
424}
425
426fn count_match_arms(node: Node) -> usize {
427    let mut count = 0usize;
428    let mut cursor = node.walk();
429    visit_all(node, &mut cursor, &mut |n| {
430        if n.kind() == "case_clause" {
431            count += 1;
432        }
433    });
434    count
435}
436
437fn collect_external_refs(node: Node, src: &[u8]) -> Vec<String> {
438    let mut refs = Vec::new();
439    let mut cursor = node.walk();
440    visit_all(node, &mut cursor, &mut |n| {
441        if n.kind() != "attribute" {
442            return;
443        }
444        let Some(obj) = n.child(0) else { return };
445        let text = node_text(obj, src);
446        if text != "self"
447            && !text.is_empty()
448            && text.starts_with(|c: char| c.is_lowercase())
449            && !refs.contains(&text.to_string())
450        {
451            refs.push(text.to_string());
452        }
453    });
454    refs
455}
456
457fn unwrap_single_call(body: Node) -> Option<Node> {
458    let mut c = body.walk();
459    let stmts: Vec<Node> = body
460        .children(&mut c)
461        .filter(|n| !n.is_extra() && n.kind() != "pass_statement" && n.kind() != "comment")
462        .collect();
463    if stmts.len() != 1 {
464        return None;
465    }
466    let stmt = stmts[0];
467    match stmt.kind() {
468        "return_statement" => stmt.child(1).filter(|v| v.kind() == "call"),
469        "expression_statement" => stmt.child(0).filter(|v| v.kind() == "call"),
470        _ => None,
471    }
472}
473
474fn check_delegating(body: Node, src: &[u8]) -> bool {
475    let Some(func) = unwrap_single_call(body).and_then(|c| c.child(0)) else {
476        return false;
477    };
478    let text = node_text(func, src);
479    text.contains('.') && !text.starts_with("self.")
480}
481
482fn count_comment_lines(node: Node, src: &[u8]) -> usize {
483    let mut count = 0usize;
484    let mut cursor = node.walk();
485    visit_all(node, &mut cursor, &mut |n| {
486        if n.kind() == "comment" {
487            count += 1;
488        } else if n.kind() == "string" || n.kind() == "expression_statement" {
489            // docstrings
490            let text = node_text(n, src);
491            if text.starts_with("\"\"\"") || text.starts_with("'''") {
492                count += text.lines().count();
493            }
494        }
495    });
496    count
497}
498
499fn collect_self_refs(body: Node, src: &[u8]) -> Vec<String> {
500    let mut refs = Vec::new();
501    let mut cursor = body.walk();
502    visit_all(body, &mut cursor, &mut |n| {
503        if n.kind() != "attribute" {
504            return;
505        }
506        let is_self = n.child(0).is_some_and(|o| node_text(o, src) == "self");
507        if !is_self {
508            return;
509        }
510        if let Some(attr) = n.child_by_field_name("attribute") {
511            let name = node_text(attr, src).to_string();
512            if !refs.contains(&name) {
513                refs.push(name);
514            }
515        }
516    });
517    refs
518}
519
520fn collect_none_checks(body: Node, src: &[u8]) -> Vec<String> {
521    let mut fields = Vec::new();
522    let mut cursor = body.walk();
523    visit_all(body, &mut cursor, &mut |n| {
524        if n.kind() != "comparison_operator" {
525            return;
526        }
527        let text = node_text(n, src);
528        if !text.contains("is None") && !text.contains("is not None") && !text.contains("== None") {
529            return;
530        }
531        if let Some(left) = n.child(0) {
532            let name = node_text(left, src).to_string();
533            if !fields.contains(&name) {
534                fields.push(name);
535            }
536        }
537    });
538    fields
539}
540
/// Reports whether `name` is exactly `self` or `cls`.
fn is_self_or_cls(name: &str) -> bool {
    matches!(name, "self" | "cls")
}
544
545fn param_name_and_type(child: Node, src: &[u8]) -> Option<(String, String)> {
546    match child.kind() {
547        "identifier" => {
548            let name = node_text(child, src);
549            (!is_self_or_cls(name)).then(|| (name.to_string(), "Any".to_string()))
550        }
551        "typed_parameter" | "default_parameter" | "typed_default_parameter" => {
552            let name = child
553                .child_by_field_name("name")
554                .or_else(|| child.child(0))
555                .map(|n| node_text(n, src))
556                .unwrap_or("");
557            if is_self_or_cls(name) {
558                return None;
559            }
560            let ty = child
561                .child_by_field_name("type")
562                .map(|n| node_text(n, src).to_string())
563                .unwrap_or_else(|| "Any".to_string());
564            Some((name.to_string(), ty))
565        }
566        "list_splat_pattern" | "dictionary_splat_pattern" => {
567            Some(("*".to_string(), "Any".to_string()))
568        }
569        _ => None,
570    }
571}
572
573fn extract_params(
574    params_node: Node,
575    src: &[u8],
576    imports_map: &crate::type_ref::ImportsMap,
577) -> (usize, Vec<cha_core::TypeRef>) {
578    let mut count = 0usize;
579    let mut types = Vec::new();
580    let mut cursor = params_node.walk();
581    for child in params_node.children(&mut cursor) {
582        if let Some((_name, ty)) = param_name_and_type(child, src) {
583            count += 1;
584            types.push(crate::type_ref::resolve(ty, imports_map));
585        }
586    }
587    (count, types)
588}
589
590fn count_optional(params_node: Node) -> usize {
591    let mut count = 0usize;
592    let mut cursor = params_node.walk();
593    for child in params_node.children(&mut cursor) {
594        if child.kind() == "default_parameter" || child.kind() == "typed_default_parameter" {
595            count += 1;
596        }
597    }
598    count
599}
600
601fn collect_init_fields(func_node: Node, src: &[u8], fields: &mut Vec<String>) {
602    let Some(body) = func_node.child_by_field_name("body") else {
603        return;
604    };
605    let mut cursor = body.walk();
606    visit_all(body, &mut cursor, &mut |n| {
607        if n.kind() != "assignment" {
608            return;
609        }
610        let Some(left) = n.child_by_field_name("left") else {
611            return;
612        };
613        if left.kind() != "attribute" {
614            return;
615        }
616        let is_self = left.child(0).is_some_and(|o| node_text(o, src) == "self");
617        if !is_self {
618            return;
619        }
620        if let Some(attr) = left.child_by_field_name("attribute") {
621            let name = node_text(attr, src).to_string();
622            if !fields.contains(&name) {
623                fields.push(name);
624            }
625        }
626    });
627}
628
629fn count_self_calls(body: Node, src: &[u8]) -> usize {
630    let mut count = 0;
631    let mut cursor = body.walk();
632    visit_all(body, &mut cursor, &mut |n| {
633        if n.kind() != "call" {
634            return;
635        }
636        let is_self_call = n
637            .child(0)
638            .filter(|f| f.kind() == "attribute")
639            .and_then(|f| f.child(0))
640            .is_some_and(|obj| node_text(obj, src) == "self");
641        if is_self_call {
642            count += 1;
643        }
644    });
645    count
646}
647
648fn is_stub_body(node: Node, src: &[u8]) -> bool {
649    node.child_by_field_name("body")
650        .is_none_or(|b| has_only_pass_or_ellipsis(b, src))
651}
652
653fn has_only_pass_or_ellipsis(body: Node, src: &[u8]) -> bool {
654    let mut cursor = body.walk();
655    for child in body.children(&mut cursor) {
656        let ok = match child.kind() {
657            "pass_statement" | "ellipsis" | "comment" => true,
658            "expression_statement" => child.child(0).is_none_or(|expr| {
659                let text = node_text(expr, src);
660                text == "..." || text.starts_with("\"\"\"") || text.starts_with("'''")
661            }),
662            "function_definition" => is_stub_body(child, src),
663            "decorated_definition" => {
664                let mut inner = child.walk();
665                child
666                    .children(&mut inner)
667                    .filter(|c| c.kind() == "function_definition")
668                    .all(|c| is_stub_body(c, src))
669            }
670            _ => false,
671        };
672        if !ok {
673            return false;
674        }
675    }
676    true
677}
678
679fn cognitive_complexity_py(node: tree_sitter::Node) -> usize {
680    let mut score = 0;
681    cc_walk_py(node, 0, &mut score);
682    score
683}
684
685fn cc_walk_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
686    match node.kind() {
687        "if_statement" => {
688            *score += 1 + nesting;
689            cc_children_py(node, nesting + 1, score);
690            return;
691        }
692        "for_statement" | "while_statement" => {
693            *score += 1 + nesting;
694            cc_children_py(node, nesting + 1, score);
695            return;
696        }
697        "match_statement" => {
698            *score += 1 + nesting;
699            cc_children_py(node, nesting + 1, score);
700            return;
701        }
702        "elif_clause" | "else_clause" => {
703            *score += 1;
704        }
705        "boolean_operator" => {
706            *score += 1;
707        }
708        "except_clause" => {
709            *score += 1 + nesting;
710            cc_children_py(node, nesting + 1, score);
711            return;
712        }
713        "lambda" => {
714            cc_children_py(node, nesting + 1, score);
715            return;
716        }
717        _ => {}
718    }
719    cc_children_py(node, nesting, score);
720}
721
722fn cc_children_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
723    let mut cursor = node.walk();
724    for child in node.children(&mut cursor) {
725        cc_walk_py(child, nesting, score);
726    }
727}
728
729fn extract_match_target_py(body: tree_sitter::Node, src: &[u8]) -> Option<String> {
730    let mut target = None;
731    let mut cursor = body.walk();
732    visit_all(body, &mut cursor, &mut |n| {
733        if n.kind() == "match_statement"
734            && target.is_none()
735            && let Some(subj) = n.child_by_field_name("subject")
736        {
737            target = Some(node_text(subj, src).to_string());
738        }
739    });
740    target
741}
742
743fn collect_calls_py(body: tree_sitter::Node, src: &[u8]) -> Vec<String> {
744    let mut calls = Vec::new();
745    let mut cursor = body.walk();
746    visit_all(body, &mut cursor, &mut |n| {
747        if n.kind() == "call"
748            && let Some(func) = n.child(0)
749        {
750            let name = node_text(func, src).to_string();
751            if !calls.contains(&name) {
752                calls.push(name);
753            }
754        }
755    });
756    calls
757}
758
759fn collect_comments(root: Node, src: &[u8]) -> Vec<cha_core::CommentInfo> {
760    let mut comments = Vec::new();
761    let mut cursor = root.walk();
762    visit_all(root, &mut cursor, &mut |n| {
763        if n.kind().contains("comment") {
764            comments.push(cha_core::CommentInfo {
765                text: node_text(n, src).to_string(),
766                line: n.start_position().row + 1,
767            });
768        }
769    });
770    comments
771}
772
773fn visit_all<F: FnMut(Node)>(node: Node, cursor: &mut tree_sitter::TreeCursor, f: &mut F) {
774    f(node);
775    if cursor.goto_first_child() {
776        loop {
777            let child_node = cursor.node();
778            let mut child_cursor = child_node.walk();
779            visit_all(child_node, &mut child_cursor, f);
780            if !cursor.goto_next_sibling() {
781                break;
782            }
783        }
784        cursor.goto_parent();
785    }
786}