1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use cha_core::{ClassInfo, FunctionInfo, ImportInfo, SourceFile, SourceModel};
5use tree_sitter::{Node, Parser};
6
7use crate::LanguageParser;
8
9pub struct PythonParser;
10
11impl LanguageParser for PythonParser {
12 fn language_name(&self) -> &str {
13 "python"
14 }
15
16 fn parse(&self, file: &SourceFile) -> Option<SourceModel> {
17 let mut parser = Parser::new();
18 parser
19 .set_language(&tree_sitter_python::LANGUAGE.into())
20 .ok()?;
21 let tree = parser.parse(&file.content, None)?;
22 let root = tree.root_node();
23 let src = file.content.as_bytes();
24
25 let mut functions = Vec::new();
26 let mut classes = Vec::new();
27 let mut imports = Vec::new();
28 let mut type_aliases = Vec::new();
29
30 let imports_map = crate::python_imports::build(root, src);
31 collect_top_level(
32 root,
33 src,
34 &imports_map,
35 &mut functions,
36 &mut classes,
37 &mut imports,
38 &mut type_aliases,
39 );
40
41 Some(SourceModel {
42 language: "python".into(),
43 total_lines: file.line_count(),
44 functions,
45 classes,
46 imports,
47 comments: collect_comments(root, src),
48 type_aliases,
49 })
50 }
51}
52
53fn push_definition(
54 node: Node,
55 src: &[u8],
56 imports_map: &crate::type_ref::ImportsMap,
57 functions: &mut Vec<FunctionInfo>,
58 classes: &mut Vec<ClassInfo>,
59) {
60 match node.kind() {
61 "function_definition" => {
62 if let Some(f) = extract_function(node, src, imports_map) {
63 functions.push(f);
64 }
65 }
66 "class_definition" => {
67 if let Some(c) = extract_class(node, src, imports_map, functions) {
68 classes.push(c);
69 }
70 }
71 _ => {}
72 }
73}
74
75fn collect_top_level(
76 node: Node,
77 src: &[u8],
78 imports_map: &crate::type_ref::ImportsMap,
79 functions: &mut Vec<FunctionInfo>,
80 classes: &mut Vec<ClassInfo>,
81 imports: &mut Vec<ImportInfo>,
82 type_aliases: &mut Vec<(String, String)>,
83) {
84 let mut cursor = node.walk();
85 for child in node.children(&mut cursor) {
86 match child.kind() {
87 "function_definition" | "class_definition" => {
88 push_definition(child, src, imports_map, functions, classes);
89 }
90 "import_statement" => collect_import(child, src, imports),
91 "import_from_statement" => collect_import_from(child, src, imports),
92 "type_alias_statement" => collect_type_alias_statement(child, src, type_aliases),
93 "expression_statement" => collect_typed_alias_assignment(child, src, type_aliases),
94 "decorated_definition" => {
95 let mut inner = child.walk();
96 for c in child.children(&mut inner) {
97 push_definition(c, src, imports_map, functions, classes);
98 }
99 }
100 _ => {}
101 }
102 }
103}
104
105fn collect_type_alias_statement(node: Node, src: &[u8], out: &mut Vec<(String, String)>) {
106 if let Some(pair) = crate::type_aliases::python_statement(node, src) {
107 out.push(pair);
108 }
109}
110
111fn collect_typed_alias_assignment(node: Node, src: &[u8], out: &mut Vec<(String, String)>) {
112 if let Some(pair) = crate::type_aliases::python_assignment(node, src) {
113 out.push(pair);
114 }
115}
116
117fn extract_function(
118 node: Node,
119 src: &[u8],
120 imports_map: &crate::type_ref::ImportsMap,
121) -> Option<FunctionInfo> {
122 let name_node = node.child_by_field_name("name")?;
123 let name = node_text(name_node, src).to_string();
124 let name_col = name_node.start_position().column;
125 let name_end_col = name_node.end_position().column;
126 let start_line = node.start_position().row + 1;
127 let end_line = node.end_position().row + 1;
128 let body = node.child_by_field_name("body");
129 let params = node.child_by_field_name("parameters");
130 let (param_count, param_types) = params
131 .map(|p| extract_params(p, src, imports_map))
132 .unwrap_or((0, vec![]));
133
134 Some(FunctionInfo {
135 name,
136 start_line,
137 end_line,
138 name_col,
139 name_end_col,
140 line_count: end_line - start_line + 1,
141 complexity: count_complexity(node),
142 body_hash: body.map(hash_ast_structure),
143 is_exported: true,
144 parameter_count: param_count,
145 parameter_types: param_types,
146 chain_depth: body.map(max_chain_depth).unwrap_or(0),
147 switch_arms: body.map(count_match_arms).unwrap_or(0),
148 external_refs: body
149 .map(|b| collect_external_refs(b, src))
150 .unwrap_or_default(),
151 is_delegating: body.map(|b| check_delegating(b, src)).unwrap_or(false),
152 comment_lines: count_comment_lines(node, src),
153 referenced_fields: body.map(|b| collect_self_refs(b, src)).unwrap_or_default(),
154 null_check_fields: body
155 .map(|b| collect_none_checks(b, src))
156 .unwrap_or_default(),
157 switch_dispatch_target: body.and_then(|b| extract_match_target_py(b, src)),
158 optional_param_count: params.map(count_optional).unwrap_or(0),
159 called_functions: body.map(|b| collect_calls_py(b, src)).unwrap_or_default(),
160 cognitive_complexity: body.map(cognitive_complexity_py).unwrap_or(0),
161 return_type: node
162 .child_by_field_name("return_type")
163 .map(|rt| crate::type_ref::resolve(node_text(rt, src), imports_map)),
164 })
165}
166
167fn find_method_def(child: Node) -> Option<Node> {
168 if child.kind() == "function_definition" {
169 return Some(child);
170 }
171 if child.kind() == "decorated_definition" {
172 let mut inner = child.walk();
173 return child
174 .children(&mut inner)
175 .find(|c| c.kind() == "function_definition");
176 }
177 None
178}
179
180fn extract_parent_name(node: Node, src: &[u8]) -> Option<String> {
181 node.child_by_field_name("superclasses").and_then(|sc| {
182 let mut c = sc.walk();
183 sc.children(&mut c)
184 .find(|n| n.kind() != "(" && n.kind() != ")" && n.kind() != ",")
185 .map(|n| node_text(n, src).to_string())
186 })
187}
188
/// Reports whether `name` looks like a listener-style field: it contains
/// `listener`, `handler`, `callback` or `observer`.
///
/// Matching is ASCII case-insensitive so that camelCase names such as
/// `eventHandler` are recognised as well as snake_case ones.
fn has_listener_name(name: &str) -> bool {
    const MARKERS: [&str; 4] = ["listener", "handler", "callback", "observer"];
    let lowered = name.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lowered.contains(*marker))
}
195
196fn process_method(
197 func_node: Node,
198 f: &mut FunctionInfo,
199 src: &[u8],
200 field_names: &mut Vec<String>,
201) -> (bool, bool, bool, usize) {
202 let method_name = &f.name;
203 let mut has_behavior = false;
204 let mut is_override = false;
205 let mut is_notify = false;
206 if method_name == "__init__" {
207 collect_init_fields(func_node, src, field_names);
208 } else {
209 has_behavior = true;
210 }
211 let sc = func_node
212 .child_by_field_name("body")
213 .map(|b| count_self_calls(b, src))
214 .unwrap_or(0);
215 if method_name.starts_with("__") && method_name.ends_with("__") && method_name != "__init__" {
216 is_override = true;
217 }
218 if method_name.contains("notify") || method_name.contains("emit") {
219 is_notify = true;
220 }
221 f.is_exported = !method_name.starts_with('_');
222 (has_behavior, is_override, is_notify, sc)
223}
224
225struct ClassScan {
226 methods: Vec<FunctionInfo>,
227 field_names: Vec<String>,
228 delegating_count: usize,
229 has_behavior: bool,
230 override_count: usize,
231 self_call_count: usize,
232 has_notify_method: bool,
233}
234
235fn scan_class_methods(
236 body: Node,
237 src: &[u8],
238 imports_map: &crate::type_ref::ImportsMap,
239) -> ClassScan {
240 let mut s = ClassScan {
241 methods: Vec::new(),
242 field_names: Vec::new(),
243 delegating_count: 0,
244 has_behavior: false,
245 override_count: 0,
246 self_call_count: 0,
247 has_notify_method: false,
248 };
249 let mut cursor = body.walk();
250 for child in body.children(&mut cursor) {
251 let Some(func_node) = find_method_def(child) else {
252 continue;
253 };
254 let Some(mut f) = extract_function(func_node, src, imports_map) else {
255 continue;
256 };
257 if f.is_delegating {
258 s.delegating_count += 1;
259 }
260 let (behav, over, notify, sc) = process_method(func_node, &mut f, src, &mut s.field_names);
261 s.has_behavior |= behav;
262 if over {
263 s.override_count += 1;
264 }
265 if notify {
266 s.has_notify_method = true;
267 }
268 s.self_call_count += sc;
269 s.methods.push(f);
270 }
271 s
272}
273
274fn extract_class(
275 node: Node,
276 src: &[u8],
277 imports_map: &crate::type_ref::ImportsMap,
278 top_functions: &mut Vec<FunctionInfo>,
279) -> Option<ClassInfo> {
280 let name_node = node.child_by_field_name("name")?;
281 let name = node_text(name_node, src).to_string();
282 let name_col = name_node.start_position().column;
283 let name_end_col = name_node.end_position().column;
284 let start_line = node.start_position().row + 1;
285 let end_line = node.end_position().row + 1;
286 let body = node.child_by_field_name("body")?;
287 let s = scan_class_methods(body, src, imports_map);
288 let method_count = s.methods.len();
289 top_functions.extend(s.methods);
290
291 Some(ClassInfo {
292 name,
293 start_line,
294 end_line,
295 name_col,
296 name_end_col,
297 line_count: end_line - start_line + 1,
298 method_count,
299 is_exported: true,
300 delegating_method_count: s.delegating_count,
301 field_count: s.field_names.len(),
302 has_listener_field: s.field_names.iter().any(|n| has_listener_name(n)),
303 field_names: s.field_names,
304 field_types: Vec::new(),
305 has_behavior: s.has_behavior,
306 is_interface: has_only_pass_or_ellipsis(body, src),
307 parent_name: extract_parent_name(node, src),
308 override_count: s.override_count,
309 self_call_count: s.self_call_count,
310 has_notify_method: s.has_notify_method,
311 })
312}
313
314fn collect_import(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
317 let line = node.start_position().row + 1;
318 let col = node.start_position().column;
319 let mut cursor = node.walk();
320 for child in node.children(&mut cursor) {
321 if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
322 let text = node_text(child, src);
323 imports.push(ImportInfo {
324 source: text.to_string(),
325 line,
326 col,
327 ..Default::default()
328 });
329 }
330 }
331}
332
333fn collect_import_from(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
334 let line = node.start_position().row + 1;
335 let col = node.start_position().column;
336 let module = node
337 .child_by_field_name("module_name")
338 .map(|n| node_text(n, src).to_string())
339 .unwrap_or_default();
340 let mut cursor = node.walk();
341 let mut has_names = false;
342 for child in node.children(&mut cursor) {
343 if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
344 let n = node_text(child, src).to_string();
345 if n != module {
346 imports.push(ImportInfo {
347 source: format!("{module}.{n}"),
348 line,
349 col,
350 ..Default::default()
351 });
352 has_names = true;
353 }
354 }
355 }
356 if !has_names {
357 imports.push(ImportInfo {
358 source: module,
359 line,
360 col,
361 ..Default::default()
362 });
363 }
364}
365
366fn node_text<'a>(node: Node, src: &'a [u8]) -> &'a str {
369 node.utf8_text(src).unwrap_or("")
370}
371
372fn count_complexity(node: Node) -> usize {
373 let mut complexity = 1usize;
374 let mut cursor = node.walk();
375 visit_all(node, &mut cursor, &mut |n| {
376 match n.kind() {
377 "if_statement"
378 | "elif_clause"
379 | "for_statement"
380 | "while_statement"
381 | "except_clause"
382 | "with_statement"
383 | "assert_statement"
384 | "conditional_expression"
385 | "boolean_operator"
386 | "list_comprehension"
387 | "set_comprehension"
388 | "dictionary_comprehension"
389 | "generator_expression" => {
390 complexity += 1;
391 }
392 "match_statement" => {} "case_clause" => {
394 complexity += 1;
395 }
396 _ => {}
397 }
398 });
399 complexity
400}
401
402fn hash_ast_structure(node: Node) -> u64 {
403 let mut hasher = DefaultHasher::new();
404 hash_node(node, &mut hasher);
405 hasher.finish()
406}
407
408fn hash_node(node: Node, hasher: &mut DefaultHasher) {
409 node.kind().hash(hasher);
410 let mut cursor = node.walk();
411 for child in node.children(&mut cursor) {
412 hash_node(child, hasher);
413 }
414}
415
416fn max_chain_depth(node: Node) -> usize {
417 let mut max = 0usize;
418 let mut cursor = node.walk();
419 visit_all(node, &mut cursor, &mut |n| {
420 if n.kind() == "attribute" {
421 let depth = chain_len(n);
422 if depth > max {
423 max = depth;
424 }
425 }
426 });
427 max
428}
429
430fn chain_len(node: Node) -> usize {
431 let mut depth = 0usize;
432 let mut current = node;
433 while current.kind() == "attribute" || current.kind() == "call" {
434 if current.kind() == "attribute" {
435 depth += 1;
436 }
437 if let Some(obj) = current.child(0) {
438 current = obj;
439 } else {
440 break;
441 }
442 }
443 depth
444}
445
446fn count_match_arms(node: Node) -> usize {
447 let mut count = 0usize;
448 let mut cursor = node.walk();
449 visit_all(node, &mut cursor, &mut |n| {
450 if n.kind() == "case_clause" {
451 count += 1;
452 }
453 });
454 count
455}
456
457fn collect_external_refs(node: Node, src: &[u8]) -> Vec<String> {
458 let mut refs = Vec::new();
459 let mut cursor = node.walk();
460 visit_all(node, &mut cursor, &mut |n| {
461 if n.kind() != "attribute" {
462 return;
463 }
464 let Some(obj) = n.child(0) else { return };
465 let text = node_text(obj, src);
466 if text != "self"
467 && !text.is_empty()
468 && text.starts_with(|c: char| c.is_lowercase())
469 && !refs.contains(&text.to_string())
470 {
471 refs.push(text.to_string());
472 }
473 });
474 refs
475}
476
477fn unwrap_single_call(body: Node) -> Option<Node> {
478 let mut c = body.walk();
479 let stmts: Vec<Node> = body
480 .children(&mut c)
481 .filter(|n| !n.is_extra() && n.kind() != "pass_statement" && n.kind() != "comment")
482 .collect();
483 if stmts.len() != 1 {
484 return None;
485 }
486 let stmt = stmts[0];
487 match stmt.kind() {
488 "return_statement" => stmt.child(1).filter(|v| v.kind() == "call"),
489 "expression_statement" => stmt.child(0).filter(|v| v.kind() == "call"),
490 _ => None,
491 }
492}
493
494fn check_delegating(body: Node, src: &[u8]) -> bool {
495 let Some(func) = unwrap_single_call(body).and_then(|c| c.child(0)) else {
496 return false;
497 };
498 let text = node_text(func, src);
499 text.contains('.') && !text.starts_with("self.")
500}
501
502fn count_comment_lines(node: Node, src: &[u8]) -> usize {
503 let mut count = 0usize;
504 let mut cursor = node.walk();
505 visit_all(node, &mut cursor, &mut |n| {
506 if n.kind() == "comment" {
507 count += 1;
508 } else if n.kind() == "string" || n.kind() == "expression_statement" {
509 let text = node_text(n, src);
511 if text.starts_with("\"\"\"") || text.starts_with("'''") {
512 count += text.lines().count();
513 }
514 }
515 });
516 count
517}
518
519fn collect_self_refs(body: Node, src: &[u8]) -> Vec<String> {
520 let mut refs = Vec::new();
521 let mut cursor = body.walk();
522 visit_all(body, &mut cursor, &mut |n| {
523 if n.kind() != "attribute" {
524 return;
525 }
526 let is_self = n.child(0).is_some_and(|o| node_text(o, src) == "self");
527 if !is_self {
528 return;
529 }
530 if let Some(attr) = n.child_by_field_name("attribute") {
531 let name = node_text(attr, src).to_string();
532 if !refs.contains(&name) {
533 refs.push(name);
534 }
535 }
536 });
537 refs
538}
539
540fn collect_none_checks(body: Node, src: &[u8]) -> Vec<String> {
541 let mut fields = Vec::new();
542 let mut cursor = body.walk();
543 visit_all(body, &mut cursor, &mut |n| {
544 if n.kind() != "comparison_operator" {
545 return;
546 }
547 let text = node_text(n, src);
548 if !text.contains("is None") && !text.contains("is not None") && !text.contains("== None") {
549 return;
550 }
551 if let Some(left) = n.child(0) {
552 let name = node_text(left, src).to_string();
553 if !fields.contains(&name) {
554 fields.push(name);
555 }
556 }
557 });
558 fields
559}
560
/// Reports whether `name` is exactly `self` or `cls`.
fn is_self_or_cls(name: &str) -> bool {
    matches!(name, "self" | "cls")
}
564
565fn param_name_and_type(child: Node, src: &[u8]) -> Option<(String, String)> {
566 match child.kind() {
567 "identifier" => {
568 let name = node_text(child, src);
569 (!is_self_or_cls(name)).then(|| (name.to_string(), "Any".to_string()))
570 }
571 "typed_parameter" | "default_parameter" | "typed_default_parameter" => {
572 let name = child
573 .child_by_field_name("name")
574 .or_else(|| child.child(0))
575 .map(|n| node_text(n, src))
576 .unwrap_or("");
577 if is_self_or_cls(name) {
578 return None;
579 }
580 let ty = child
581 .child_by_field_name("type")
582 .map(|n| node_text(n, src).to_string())
583 .unwrap_or_else(|| "Any".to_string());
584 Some((name.to_string(), ty))
585 }
586 "list_splat_pattern" | "dictionary_splat_pattern" => {
587 Some(("*".to_string(), "Any".to_string()))
588 }
589 _ => None,
590 }
591}
592
593fn extract_params(
594 params_node: Node,
595 src: &[u8],
596 imports_map: &crate::type_ref::ImportsMap,
597) -> (usize, Vec<cha_core::TypeRef>) {
598 let mut count = 0usize;
599 let mut types = Vec::new();
600 let mut cursor = params_node.walk();
601 for child in params_node.children(&mut cursor) {
602 if let Some((_name, ty)) = param_name_and_type(child, src) {
603 count += 1;
604 types.push(crate::type_ref::resolve(ty, imports_map));
605 }
606 }
607 (count, types)
608}
609
610fn count_optional(params_node: Node) -> usize {
611 let mut count = 0usize;
612 let mut cursor = params_node.walk();
613 for child in params_node.children(&mut cursor) {
614 if child.kind() == "default_parameter" || child.kind() == "typed_default_parameter" {
615 count += 1;
616 }
617 }
618 count
619}
620
621fn collect_init_fields(func_node: Node, src: &[u8], fields: &mut Vec<String>) {
622 let Some(body) = func_node.child_by_field_name("body") else {
623 return;
624 };
625 let mut cursor = body.walk();
626 visit_all(body, &mut cursor, &mut |n| {
627 if n.kind() != "assignment" {
628 return;
629 }
630 let Some(left) = n.child_by_field_name("left") else {
631 return;
632 };
633 if left.kind() != "attribute" {
634 return;
635 }
636 let is_self = left.child(0).is_some_and(|o| node_text(o, src) == "self");
637 if !is_self {
638 return;
639 }
640 if let Some(attr) = left.child_by_field_name("attribute") {
641 let name = node_text(attr, src).to_string();
642 if !fields.contains(&name) {
643 fields.push(name);
644 }
645 }
646 });
647}
648
649fn count_self_calls(body: Node, src: &[u8]) -> usize {
650 let mut count = 0;
651 let mut cursor = body.walk();
652 visit_all(body, &mut cursor, &mut |n| {
653 if n.kind() != "call" {
654 return;
655 }
656 let is_self_call = n
657 .child(0)
658 .filter(|f| f.kind() == "attribute")
659 .and_then(|f| f.child(0))
660 .is_some_and(|obj| node_text(obj, src) == "self");
661 if is_self_call {
662 count += 1;
663 }
664 });
665 count
666}
667
668fn is_stub_body(node: Node, src: &[u8]) -> bool {
669 node.child_by_field_name("body")
670 .is_none_or(|b| has_only_pass_or_ellipsis(b, src))
671}
672
673fn has_only_pass_or_ellipsis(body: Node, src: &[u8]) -> bool {
674 let mut cursor = body.walk();
675 for child in body.children(&mut cursor) {
676 let ok = match child.kind() {
677 "pass_statement" | "ellipsis" | "comment" => true,
678 "expression_statement" => child.child(0).is_none_or(|expr| {
679 let text = node_text(expr, src);
680 text == "..." || text.starts_with("\"\"\"") || text.starts_with("'''")
681 }),
682 "function_definition" => is_stub_body(child, src),
683 "decorated_definition" => {
684 let mut inner = child.walk();
685 child
686 .children(&mut inner)
687 .filter(|c| c.kind() == "function_definition")
688 .all(|c| is_stub_body(c, src))
689 }
690 _ => false,
691 };
692 if !ok {
693 return false;
694 }
695 }
696 true
697}
698
699fn cognitive_complexity_py(node: tree_sitter::Node) -> usize {
700 let mut score = 0;
701 cc_walk_py(node, 0, &mut score);
702 score
703}
704
705fn cc_walk_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
706 match node.kind() {
707 "if_statement" => {
708 *score += 1 + nesting;
709 cc_children_py(node, nesting + 1, score);
710 return;
711 }
712 "for_statement" | "while_statement" => {
713 *score += 1 + nesting;
714 cc_children_py(node, nesting + 1, score);
715 return;
716 }
717 "match_statement" => {
718 *score += 1 + nesting;
719 cc_children_py(node, nesting + 1, score);
720 return;
721 }
722 "elif_clause" | "else_clause" => {
723 *score += 1;
724 }
725 "boolean_operator" => {
726 *score += 1;
727 }
728 "except_clause" => {
729 *score += 1 + nesting;
730 cc_children_py(node, nesting + 1, score);
731 return;
732 }
733 "lambda" => {
734 cc_children_py(node, nesting + 1, score);
735 return;
736 }
737 _ => {}
738 }
739 cc_children_py(node, nesting, score);
740}
741
742fn cc_children_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
743 let mut cursor = node.walk();
744 for child in node.children(&mut cursor) {
745 cc_walk_py(child, nesting, score);
746 }
747}
748
749fn extract_match_target_py(body: tree_sitter::Node, src: &[u8]) -> Option<String> {
750 let mut target = None;
751 let mut cursor = body.walk();
752 visit_all(body, &mut cursor, &mut |n| {
753 if n.kind() == "match_statement"
754 && target.is_none()
755 && let Some(subj) = n.child_by_field_name("subject")
756 {
757 target = Some(node_text(subj, src).to_string());
758 }
759 });
760 target
761}
762
763fn collect_calls_py(body: tree_sitter::Node, src: &[u8]) -> Vec<String> {
764 let mut calls = Vec::new();
765 let mut cursor = body.walk();
766 visit_all(body, &mut cursor, &mut |n| {
767 if n.kind() == "call"
768 && let Some(func) = n.child(0)
769 {
770 let name = node_text(func, src).to_string();
771 if !calls.contains(&name) {
772 calls.push(name);
773 }
774 }
775 });
776 calls
777}
778
779fn collect_comments(root: Node, src: &[u8]) -> Vec<cha_core::CommentInfo> {
780 let mut comments = Vec::new();
781 let mut cursor = root.walk();
782 visit_all(root, &mut cursor, &mut |n| {
783 if n.kind().contains("comment") {
784 comments.push(cha_core::CommentInfo {
785 text: node_text(n, src).to_string(),
786 line: n.start_position().row + 1,
787 });
788 }
789 });
790 comments
791}
792
793fn visit_all<F: FnMut(Node)>(node: Node, cursor: &mut tree_sitter::TreeCursor, f: &mut F) {
794 f(node);
795 if cursor.goto_first_child() {
796 loop {
797 let child_node = cursor.node();
798 let mut child_cursor = child_node.walk();
799 visit_all(child_node, &mut child_cursor, f);
800 if !cursor.goto_next_sibling() {
801 break;
802 }
803 }
804 cursor.goto_parent();
805 }
806}