1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use cha_core::{ClassInfo, FunctionInfo, ImportInfo, SourceFile, SourceModel};
5use tree_sitter::{Node, Parser};
6
7use crate::LanguageParser;
8
9pub struct PythonParser;
10
11impl LanguageParser for PythonParser {
12 fn language_name(&self) -> &str {
13 "python"
14 }
15
16 fn parse(&self, file: &SourceFile) -> Option<SourceModel> {
17 let mut parser = Parser::new();
18 parser
19 .set_language(&tree_sitter_python::LANGUAGE.into())
20 .ok()?;
21 let tree = parser.parse(&file.content, None)?;
22 let root = tree.root_node();
23 let src = file.content.as_bytes();
24
25 let mut functions = Vec::new();
26 let mut classes = Vec::new();
27 let mut imports = Vec::new();
28
29 let imports_map = crate::python_imports::build(root, src);
30 collect_top_level(
31 root,
32 src,
33 &imports_map,
34 &mut functions,
35 &mut classes,
36 &mut imports,
37 );
38
39 Some(SourceModel {
40 language: "python".into(),
41 total_lines: file.line_count(),
42 functions,
43 classes,
44 imports,
45 comments: collect_comments(root, src),
46 type_aliases: vec![], })
48 }
49}
50
51fn push_definition(
52 node: Node,
53 src: &[u8],
54 imports_map: &crate::type_ref::ImportsMap,
55 functions: &mut Vec<FunctionInfo>,
56 classes: &mut Vec<ClassInfo>,
57) {
58 match node.kind() {
59 "function_definition" => {
60 if let Some(f) = extract_function(node, src, imports_map) {
61 functions.push(f);
62 }
63 }
64 "class_definition" => {
65 if let Some(c) = extract_class(node, src, imports_map, functions) {
66 classes.push(c);
67 }
68 }
69 _ => {}
70 }
71}
72
73fn collect_top_level(
74 node: Node,
75 src: &[u8],
76 imports_map: &crate::type_ref::ImportsMap,
77 functions: &mut Vec<FunctionInfo>,
78 classes: &mut Vec<ClassInfo>,
79 imports: &mut Vec<ImportInfo>,
80) {
81 let mut cursor = node.walk();
82 for child in node.children(&mut cursor) {
83 match child.kind() {
84 "function_definition" | "class_definition" => {
85 push_definition(child, src, imports_map, functions, classes);
86 }
87 "import_statement" => collect_import(child, src, imports),
88 "import_from_statement" => collect_import_from(child, src, imports),
89 "decorated_definition" => {
90 let mut inner = child.walk();
91 for c in child.children(&mut inner) {
92 push_definition(c, src, imports_map, functions, classes);
93 }
94 }
95 _ => {}
96 }
97 }
98}
99
100fn extract_function(
101 node: Node,
102 src: &[u8],
103 imports_map: &crate::type_ref::ImportsMap,
104) -> Option<FunctionInfo> {
105 let name_node = node.child_by_field_name("name")?;
106 let name = node_text(name_node, src).to_string();
107 let name_col = name_node.start_position().column;
108 let name_end_col = name_node.end_position().column;
109 let start_line = node.start_position().row + 1;
110 let end_line = node.end_position().row + 1;
111 let body = node.child_by_field_name("body");
112 let params = node.child_by_field_name("parameters");
113 let (param_count, param_types) = params
114 .map(|p| extract_params(p, src, imports_map))
115 .unwrap_or((0, vec![]));
116
117 Some(FunctionInfo {
118 name,
119 start_line,
120 end_line,
121 name_col,
122 name_end_col,
123 line_count: end_line - start_line + 1,
124 complexity: count_complexity(node),
125 body_hash: body.map(hash_ast_structure),
126 is_exported: true,
127 parameter_count: param_count,
128 parameter_types: param_types,
129 chain_depth: body.map(max_chain_depth).unwrap_or(0),
130 switch_arms: body.map(count_match_arms).unwrap_or(0),
131 external_refs: body
132 .map(|b| collect_external_refs(b, src))
133 .unwrap_or_default(),
134 is_delegating: body.map(|b| check_delegating(b, src)).unwrap_or(false),
135 comment_lines: count_comment_lines(node, src),
136 referenced_fields: body.map(|b| collect_self_refs(b, src)).unwrap_or_default(),
137 null_check_fields: body
138 .map(|b| collect_none_checks(b, src))
139 .unwrap_or_default(),
140 switch_dispatch_target: body.and_then(|b| extract_match_target_py(b, src)),
141 optional_param_count: params.map(count_optional).unwrap_or(0),
142 called_functions: body.map(|b| collect_calls_py(b, src)).unwrap_or_default(),
143 cognitive_complexity: body.map(cognitive_complexity_py).unwrap_or(0),
144 return_type: node
145 .child_by_field_name("return_type")
146 .map(|rt| crate::type_ref::resolve(node_text(rt, src), imports_map)),
147 })
148}
149
150fn find_method_def(child: Node) -> Option<Node> {
151 if child.kind() == "function_definition" {
152 return Some(child);
153 }
154 if child.kind() == "decorated_definition" {
155 let mut inner = child.walk();
156 return child
157 .children(&mut inner)
158 .find(|c| c.kind() == "function_definition");
159 }
160 None
161}
162
163fn extract_parent_name(node: Node, src: &[u8]) -> Option<String> {
164 node.child_by_field_name("superclasses").and_then(|sc| {
165 let mut c = sc.walk();
166 sc.children(&mut c)
167 .find(|n| n.kind() != "(" && n.kind() != ")" && n.kind() != ",")
168 .map(|n| node_text(n, src).to_string())
169 })
170}
171
/// Reports whether `name` contains one of the listener-style markers
/// (`listener`, `handler`, `callback`, `observer`). The match is
/// case-sensitive.
fn has_listener_name(name: &str) -> bool {
    const MARKERS: [&str; 4] = ["listener", "handler", "callback", "observer"];
    MARKERS.iter().any(|marker| name.contains(*marker))
}
178
179fn process_method(
180 func_node: Node,
181 f: &mut FunctionInfo,
182 src: &[u8],
183 field_names: &mut Vec<String>,
184) -> (bool, bool, bool, usize) {
185 let method_name = &f.name;
186 let mut has_behavior = false;
187 let mut is_override = false;
188 let mut is_notify = false;
189 if method_name == "__init__" {
190 collect_init_fields(func_node, src, field_names);
191 } else {
192 has_behavior = true;
193 }
194 let sc = func_node
195 .child_by_field_name("body")
196 .map(|b| count_self_calls(b, src))
197 .unwrap_or(0);
198 if method_name.starts_with("__") && method_name.ends_with("__") && method_name != "__init__" {
199 is_override = true;
200 }
201 if method_name.contains("notify") || method_name.contains("emit") {
202 is_notify = true;
203 }
204 f.is_exported = !method_name.starts_with('_');
205 (has_behavior, is_override, is_notify, sc)
206}
207
208struct ClassScan {
209 methods: Vec<FunctionInfo>,
210 field_names: Vec<String>,
211 delegating_count: usize,
212 has_behavior: bool,
213 override_count: usize,
214 self_call_count: usize,
215 has_notify_method: bool,
216}
217
218fn scan_class_methods(
219 body: Node,
220 src: &[u8],
221 imports_map: &crate::type_ref::ImportsMap,
222) -> ClassScan {
223 let mut s = ClassScan {
224 methods: Vec::new(),
225 field_names: Vec::new(),
226 delegating_count: 0,
227 has_behavior: false,
228 override_count: 0,
229 self_call_count: 0,
230 has_notify_method: false,
231 };
232 let mut cursor = body.walk();
233 for child in body.children(&mut cursor) {
234 let Some(func_node) = find_method_def(child) else {
235 continue;
236 };
237 let Some(mut f) = extract_function(func_node, src, imports_map) else {
238 continue;
239 };
240 if f.is_delegating {
241 s.delegating_count += 1;
242 }
243 let (behav, over, notify, sc) = process_method(func_node, &mut f, src, &mut s.field_names);
244 s.has_behavior |= behav;
245 if over {
246 s.override_count += 1;
247 }
248 if notify {
249 s.has_notify_method = true;
250 }
251 s.self_call_count += sc;
252 s.methods.push(f);
253 }
254 s
255}
256
257fn extract_class(
258 node: Node,
259 src: &[u8],
260 imports_map: &crate::type_ref::ImportsMap,
261 top_functions: &mut Vec<FunctionInfo>,
262) -> Option<ClassInfo> {
263 let name_node = node.child_by_field_name("name")?;
264 let name = node_text(name_node, src).to_string();
265 let name_col = name_node.start_position().column;
266 let name_end_col = name_node.end_position().column;
267 let start_line = node.start_position().row + 1;
268 let end_line = node.end_position().row + 1;
269 let body = node.child_by_field_name("body")?;
270 let s = scan_class_methods(body, src, imports_map);
271 let method_count = s.methods.len();
272 top_functions.extend(s.methods);
273
274 Some(ClassInfo {
275 name,
276 start_line,
277 end_line,
278 name_col,
279 name_end_col,
280 line_count: end_line - start_line + 1,
281 method_count,
282 is_exported: true,
283 delegating_method_count: s.delegating_count,
284 field_count: s.field_names.len(),
285 has_listener_field: s.field_names.iter().any(|n| has_listener_name(n)),
286 field_names: s.field_names,
287 field_types: Vec::new(),
288 has_behavior: s.has_behavior,
289 is_interface: has_only_pass_or_ellipsis(body, src),
290 parent_name: extract_parent_name(node, src),
291 override_count: s.override_count,
292 self_call_count: s.self_call_count,
293 has_notify_method: s.has_notify_method,
294 })
295}
296
297fn collect_import(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
300 let line = node.start_position().row + 1;
301 let col = node.start_position().column;
302 let mut cursor = node.walk();
303 for child in node.children(&mut cursor) {
304 if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
305 let text = node_text(child, src);
306 imports.push(ImportInfo {
307 source: text.to_string(),
308 line,
309 col,
310 ..Default::default()
311 });
312 }
313 }
314}
315
316fn collect_import_from(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
317 let line = node.start_position().row + 1;
318 let col = node.start_position().column;
319 let module = node
320 .child_by_field_name("module_name")
321 .map(|n| node_text(n, src).to_string())
322 .unwrap_or_default();
323 let mut cursor = node.walk();
324 let mut has_names = false;
325 for child in node.children(&mut cursor) {
326 if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
327 let n = node_text(child, src).to_string();
328 if n != module {
329 imports.push(ImportInfo {
330 source: format!("{module}.{n}"),
331 line,
332 col,
333 ..Default::default()
334 });
335 has_names = true;
336 }
337 }
338 }
339 if !has_names {
340 imports.push(ImportInfo {
341 source: module,
342 line,
343 col,
344 ..Default::default()
345 });
346 }
347}
348
349fn node_text<'a>(node: Node, src: &'a [u8]) -> &'a str {
352 node.utf8_text(src).unwrap_or("")
353}
354
355fn count_complexity(node: Node) -> usize {
356 let mut complexity = 1usize;
357 let mut cursor = node.walk();
358 visit_all(node, &mut cursor, &mut |n| {
359 match n.kind() {
360 "if_statement"
361 | "elif_clause"
362 | "for_statement"
363 | "while_statement"
364 | "except_clause"
365 | "with_statement"
366 | "assert_statement"
367 | "conditional_expression"
368 | "boolean_operator"
369 | "list_comprehension"
370 | "set_comprehension"
371 | "dictionary_comprehension"
372 | "generator_expression" => {
373 complexity += 1;
374 }
375 "match_statement" => {} "case_clause" => {
377 complexity += 1;
378 }
379 _ => {}
380 }
381 });
382 complexity
383}
384
385fn hash_ast_structure(node: Node) -> u64 {
386 let mut hasher = DefaultHasher::new();
387 hash_node(node, &mut hasher);
388 hasher.finish()
389}
390
391fn hash_node(node: Node, hasher: &mut DefaultHasher) {
392 node.kind().hash(hasher);
393 let mut cursor = node.walk();
394 for child in node.children(&mut cursor) {
395 hash_node(child, hasher);
396 }
397}
398
399fn max_chain_depth(node: Node) -> usize {
400 let mut max = 0usize;
401 let mut cursor = node.walk();
402 visit_all(node, &mut cursor, &mut |n| {
403 if n.kind() == "attribute" {
404 let depth = chain_len(n);
405 if depth > max {
406 max = depth;
407 }
408 }
409 });
410 max
411}
412
413fn chain_len(node: Node) -> usize {
414 let mut depth = 0usize;
415 let mut current = node;
416 while current.kind() == "attribute" || current.kind() == "call" {
417 if current.kind() == "attribute" {
418 depth += 1;
419 }
420 if let Some(obj) = current.child(0) {
421 current = obj;
422 } else {
423 break;
424 }
425 }
426 depth
427}
428
429fn count_match_arms(node: Node) -> usize {
430 let mut count = 0usize;
431 let mut cursor = node.walk();
432 visit_all(node, &mut cursor, &mut |n| {
433 if n.kind() == "case_clause" {
434 count += 1;
435 }
436 });
437 count
438}
439
440fn collect_external_refs(node: Node, src: &[u8]) -> Vec<String> {
441 let mut refs = Vec::new();
442 let mut cursor = node.walk();
443 visit_all(node, &mut cursor, &mut |n| {
444 if n.kind() != "attribute" {
445 return;
446 }
447 let Some(obj) = n.child(0) else { return };
448 let text = node_text(obj, src);
449 if text != "self"
450 && !text.is_empty()
451 && text.starts_with(|c: char| c.is_lowercase())
452 && !refs.contains(&text.to_string())
453 {
454 refs.push(text.to_string());
455 }
456 });
457 refs
458}
459
460fn unwrap_single_call(body: Node) -> Option<Node> {
461 let mut c = body.walk();
462 let stmts: Vec<Node> = body
463 .children(&mut c)
464 .filter(|n| !n.is_extra() && n.kind() != "pass_statement" && n.kind() != "comment")
465 .collect();
466 if stmts.len() != 1 {
467 return None;
468 }
469 let stmt = stmts[0];
470 match stmt.kind() {
471 "return_statement" => stmt.child(1).filter(|v| v.kind() == "call"),
472 "expression_statement" => stmt.child(0).filter(|v| v.kind() == "call"),
473 _ => None,
474 }
475}
476
477fn check_delegating(body: Node, src: &[u8]) -> bool {
478 let Some(func) = unwrap_single_call(body).and_then(|c| c.child(0)) else {
479 return false;
480 };
481 let text = node_text(func, src);
482 text.contains('.') && !text.starts_with("self.")
483}
484
485fn count_comment_lines(node: Node, src: &[u8]) -> usize {
486 let mut count = 0usize;
487 let mut cursor = node.walk();
488 visit_all(node, &mut cursor, &mut |n| {
489 if n.kind() == "comment" {
490 count += 1;
491 } else if n.kind() == "string" || n.kind() == "expression_statement" {
492 let text = node_text(n, src);
494 if text.starts_with("\"\"\"") || text.starts_with("'''") {
495 count += text.lines().count();
496 }
497 }
498 });
499 count
500}
501
502fn collect_self_refs(body: Node, src: &[u8]) -> Vec<String> {
503 let mut refs = Vec::new();
504 let mut cursor = body.walk();
505 visit_all(body, &mut cursor, &mut |n| {
506 if n.kind() != "attribute" {
507 return;
508 }
509 let is_self = n.child(0).is_some_and(|o| node_text(o, src) == "self");
510 if !is_self {
511 return;
512 }
513 if let Some(attr) = n.child_by_field_name("attribute") {
514 let name = node_text(attr, src).to_string();
515 if !refs.contains(&name) {
516 refs.push(name);
517 }
518 }
519 });
520 refs
521}
522
523fn collect_none_checks(body: Node, src: &[u8]) -> Vec<String> {
524 let mut fields = Vec::new();
525 let mut cursor = body.walk();
526 visit_all(body, &mut cursor, &mut |n| {
527 if n.kind() != "comparison_operator" {
528 return;
529 }
530 let text = node_text(n, src);
531 if !text.contains("is None") && !text.contains("is not None") && !text.contains("== None") {
532 return;
533 }
534 if let Some(left) = n.child(0) {
535 let name = node_text(left, src).to_string();
536 if !fields.contains(&name) {
537 fields.push(name);
538 }
539 }
540 });
541 fields
542}
543
/// Reports whether `name` is exactly `self` or `cls`.
fn is_self_or_cls(name: &str) -> bool {
    matches!(name, "self" | "cls")
}
547
548fn param_name_and_type(child: Node, src: &[u8]) -> Option<(String, String)> {
549 match child.kind() {
550 "identifier" => {
551 let name = node_text(child, src);
552 (!is_self_or_cls(name)).then(|| (name.to_string(), "Any".to_string()))
553 }
554 "typed_parameter" | "default_parameter" | "typed_default_parameter" => {
555 let name = child
556 .child_by_field_name("name")
557 .or_else(|| child.child(0))
558 .map(|n| node_text(n, src))
559 .unwrap_or("");
560 if is_self_or_cls(name) {
561 return None;
562 }
563 let ty = child
564 .child_by_field_name("type")
565 .map(|n| node_text(n, src).to_string())
566 .unwrap_or_else(|| "Any".to_string());
567 Some((name.to_string(), ty))
568 }
569 "list_splat_pattern" | "dictionary_splat_pattern" => {
570 Some(("*".to_string(), "Any".to_string()))
571 }
572 _ => None,
573 }
574}
575
576fn extract_params(
577 params_node: Node,
578 src: &[u8],
579 imports_map: &crate::type_ref::ImportsMap,
580) -> (usize, Vec<cha_core::TypeRef>) {
581 let mut count = 0usize;
582 let mut types = Vec::new();
583 let mut cursor = params_node.walk();
584 for child in params_node.children(&mut cursor) {
585 if let Some((_name, ty)) = param_name_and_type(child, src) {
586 count += 1;
587 types.push(crate::type_ref::resolve(ty, imports_map));
588 }
589 }
590 (count, types)
591}
592
593fn count_optional(params_node: Node) -> usize {
594 let mut count = 0usize;
595 let mut cursor = params_node.walk();
596 for child in params_node.children(&mut cursor) {
597 if child.kind() == "default_parameter" || child.kind() == "typed_default_parameter" {
598 count += 1;
599 }
600 }
601 count
602}
603
604fn collect_init_fields(func_node: Node, src: &[u8], fields: &mut Vec<String>) {
605 let Some(body) = func_node.child_by_field_name("body") else {
606 return;
607 };
608 let mut cursor = body.walk();
609 visit_all(body, &mut cursor, &mut |n| {
610 if n.kind() != "assignment" {
611 return;
612 }
613 let Some(left) = n.child_by_field_name("left") else {
614 return;
615 };
616 if left.kind() != "attribute" {
617 return;
618 }
619 let is_self = left.child(0).is_some_and(|o| node_text(o, src) == "self");
620 if !is_self {
621 return;
622 }
623 if let Some(attr) = left.child_by_field_name("attribute") {
624 let name = node_text(attr, src).to_string();
625 if !fields.contains(&name) {
626 fields.push(name);
627 }
628 }
629 });
630}
631
632fn count_self_calls(body: Node, src: &[u8]) -> usize {
633 let mut count = 0;
634 let mut cursor = body.walk();
635 visit_all(body, &mut cursor, &mut |n| {
636 if n.kind() != "call" {
637 return;
638 }
639 let is_self_call = n
640 .child(0)
641 .filter(|f| f.kind() == "attribute")
642 .and_then(|f| f.child(0))
643 .is_some_and(|obj| node_text(obj, src) == "self");
644 if is_self_call {
645 count += 1;
646 }
647 });
648 count
649}
650
651fn is_stub_body(node: Node, src: &[u8]) -> bool {
652 node.child_by_field_name("body")
653 .is_none_or(|b| has_only_pass_or_ellipsis(b, src))
654}
655
656fn has_only_pass_or_ellipsis(body: Node, src: &[u8]) -> bool {
657 let mut cursor = body.walk();
658 for child in body.children(&mut cursor) {
659 let ok = match child.kind() {
660 "pass_statement" | "ellipsis" | "comment" => true,
661 "expression_statement" => child.child(0).is_none_or(|expr| {
662 let text = node_text(expr, src);
663 text == "..." || text.starts_with("\"\"\"") || text.starts_with("'''")
664 }),
665 "function_definition" => is_stub_body(child, src),
666 "decorated_definition" => {
667 let mut inner = child.walk();
668 child
669 .children(&mut inner)
670 .filter(|c| c.kind() == "function_definition")
671 .all(|c| is_stub_body(c, src))
672 }
673 _ => false,
674 };
675 if !ok {
676 return false;
677 }
678 }
679 true
680}
681
682fn cognitive_complexity_py(node: tree_sitter::Node) -> usize {
683 let mut score = 0;
684 cc_walk_py(node, 0, &mut score);
685 score
686}
687
688fn cc_walk_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
689 match node.kind() {
690 "if_statement" => {
691 *score += 1 + nesting;
692 cc_children_py(node, nesting + 1, score);
693 return;
694 }
695 "for_statement" | "while_statement" => {
696 *score += 1 + nesting;
697 cc_children_py(node, nesting + 1, score);
698 return;
699 }
700 "match_statement" => {
701 *score += 1 + nesting;
702 cc_children_py(node, nesting + 1, score);
703 return;
704 }
705 "elif_clause" | "else_clause" => {
706 *score += 1;
707 }
708 "boolean_operator" => {
709 *score += 1;
710 }
711 "except_clause" => {
712 *score += 1 + nesting;
713 cc_children_py(node, nesting + 1, score);
714 return;
715 }
716 "lambda" => {
717 cc_children_py(node, nesting + 1, score);
718 return;
719 }
720 _ => {}
721 }
722 cc_children_py(node, nesting, score);
723}
724
725fn cc_children_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
726 let mut cursor = node.walk();
727 for child in node.children(&mut cursor) {
728 cc_walk_py(child, nesting, score);
729 }
730}
731
732fn extract_match_target_py(body: tree_sitter::Node, src: &[u8]) -> Option<String> {
733 let mut target = None;
734 let mut cursor = body.walk();
735 visit_all(body, &mut cursor, &mut |n| {
736 if n.kind() == "match_statement"
737 && target.is_none()
738 && let Some(subj) = n.child_by_field_name("subject")
739 {
740 target = Some(node_text(subj, src).to_string());
741 }
742 });
743 target
744}
745
746fn collect_calls_py(body: tree_sitter::Node, src: &[u8]) -> Vec<String> {
747 let mut calls = Vec::new();
748 let mut cursor = body.walk();
749 visit_all(body, &mut cursor, &mut |n| {
750 if n.kind() == "call"
751 && let Some(func) = n.child(0)
752 {
753 let name = node_text(func, src).to_string();
754 if !calls.contains(&name) {
755 calls.push(name);
756 }
757 }
758 });
759 calls
760}
761
762fn collect_comments(root: Node, src: &[u8]) -> Vec<cha_core::CommentInfo> {
763 let mut comments = Vec::new();
764 let mut cursor = root.walk();
765 visit_all(root, &mut cursor, &mut |n| {
766 if n.kind().contains("comment") {
767 comments.push(cha_core::CommentInfo {
768 text: node_text(n, src).to_string(),
769 line: n.start_position().row + 1,
770 });
771 }
772 });
773 comments
774}
775
776fn visit_all<F: FnMut(Node)>(node: Node, cursor: &mut tree_sitter::TreeCursor, f: &mut F) {
777 f(node);
778 if cursor.goto_first_child() {
779 loop {
780 let child_node = cursor.node();
781 let mut child_cursor = child_node.walk();
782 visit_all(child_node, &mut child_cursor, f);
783 if !cursor.goto_next_sibling() {
784 break;
785 }
786 }
787 cursor.goto_parent();
788 }
789}