1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3
4use cha_core::{ClassInfo, FunctionInfo, ImportInfo, SourceFile, SourceModel};
5use tree_sitter::{Node, Parser};
6
7use crate::LanguageParser;
8
9pub struct PythonParser;
10
11impl LanguageParser for PythonParser {
12 fn language_name(&self) -> &str {
13 "python"
14 }
15
16 fn parse(&self, file: &SourceFile) -> Option<SourceModel> {
17 let mut parser = Parser::new();
18 parser
19 .set_language(&tree_sitter_python::LANGUAGE.into())
20 .ok()?;
21 let tree = parser.parse(&file.content, None)?;
22 let root = tree.root_node();
23 let src = file.content.as_bytes();
24
25 let mut functions = Vec::new();
26 let mut classes = Vec::new();
27 let mut imports = Vec::new();
28
29 let imports_map = crate::python_imports::build(root, src);
30 collect_top_level(
31 root,
32 src,
33 &imports_map,
34 &mut functions,
35 &mut classes,
36 &mut imports,
37 );
38
39 Some(SourceModel {
40 language: "python".into(),
41 total_lines: file.line_count(),
42 functions,
43 classes,
44 imports,
45 comments: collect_comments(root, src),
46 type_aliases: vec![], })
48 }
49}
50
51fn push_definition(
52 node: Node,
53 src: &[u8],
54 imports_map: &crate::type_ref::ImportsMap,
55 functions: &mut Vec<FunctionInfo>,
56 classes: &mut Vec<ClassInfo>,
57) {
58 match node.kind() {
59 "function_definition" => {
60 if let Some(f) = extract_function(node, src, imports_map) {
61 functions.push(f);
62 }
63 }
64 "class_definition" => {
65 if let Some(c) = extract_class(node, src, imports_map, functions) {
66 classes.push(c);
67 }
68 }
69 _ => {}
70 }
71}
72
73fn collect_top_level(
74 node: Node,
75 src: &[u8],
76 imports_map: &crate::type_ref::ImportsMap,
77 functions: &mut Vec<FunctionInfo>,
78 classes: &mut Vec<ClassInfo>,
79 imports: &mut Vec<ImportInfo>,
80) {
81 let mut cursor = node.walk();
82 for child in node.children(&mut cursor) {
83 match child.kind() {
84 "function_definition" | "class_definition" => {
85 push_definition(child, src, imports_map, functions, classes);
86 }
87 "import_statement" => collect_import(child, src, imports),
88 "import_from_statement" => collect_import_from(child, src, imports),
89 "decorated_definition" => {
90 let mut inner = child.walk();
91 for c in child.children(&mut inner) {
92 push_definition(c, src, imports_map, functions, classes);
93 }
94 }
95 _ => {}
96 }
97 }
98}
99
100fn extract_function(
101 node: Node,
102 src: &[u8],
103 imports_map: &crate::type_ref::ImportsMap,
104) -> Option<FunctionInfo> {
105 let name_node = node.child_by_field_name("name")?;
106 let name = node_text(name_node, src).to_string();
107 let name_col = name_node.start_position().column;
108 let name_end_col = name_node.end_position().column;
109 let start_line = node.start_position().row + 1;
110 let end_line = node.end_position().row + 1;
111 let body = node.child_by_field_name("body");
112 let params = node.child_by_field_name("parameters");
113 let (param_count, param_types) = params
114 .map(|p| extract_params(p, src, imports_map))
115 .unwrap_or((0, vec![]));
116
117 Some(FunctionInfo {
118 name,
119 start_line,
120 end_line,
121 name_col,
122 name_end_col,
123 line_count: end_line - start_line + 1,
124 complexity: count_complexity(node),
125 body_hash: body.map(hash_ast_structure),
126 is_exported: true,
127 parameter_count: param_count,
128 parameter_types: param_types,
129 chain_depth: body.map(max_chain_depth).unwrap_or(0),
130 switch_arms: body.map(count_match_arms).unwrap_or(0),
131 external_refs: body
132 .map(|b| collect_external_refs(b, src))
133 .unwrap_or_default(),
134 is_delegating: body.map(|b| check_delegating(b, src)).unwrap_or(false),
135 comment_lines: count_comment_lines(node, src),
136 referenced_fields: body.map(|b| collect_self_refs(b, src)).unwrap_or_default(),
137 null_check_fields: body
138 .map(|b| collect_none_checks(b, src))
139 .unwrap_or_default(),
140 switch_dispatch_target: body.and_then(|b| extract_match_target_py(b, src)),
141 optional_param_count: params.map(count_optional).unwrap_or(0),
142 called_functions: body.map(|b| collect_calls_py(b, src)).unwrap_or_default(),
143 cognitive_complexity: body.map(cognitive_complexity_py).unwrap_or(0),
144 })
145}
146
147fn find_method_def(child: Node) -> Option<Node> {
148 if child.kind() == "function_definition" {
149 return Some(child);
150 }
151 if child.kind() == "decorated_definition" {
152 let mut inner = child.walk();
153 return child
154 .children(&mut inner)
155 .find(|c| c.kind() == "function_definition");
156 }
157 None
158}
159
160fn extract_parent_name(node: Node, src: &[u8]) -> Option<String> {
161 node.child_by_field_name("superclasses").and_then(|sc| {
162 let mut c = sc.walk();
163 sc.children(&mut c)
164 .find(|n| n.kind() != "(" && n.kind() != ")" && n.kind() != ",")
165 .map(|n| node_text(n, src).to_string())
166 })
167}
168
/// Reports whether `name` contains one of the listener-style substrings
/// `listener`, `handler`, `callback` or `observer` (case-sensitive).
fn has_listener_name(name: &str) -> bool {
    const MARKERS: [&str; 4] = ["listener", "handler", "callback", "observer"];
    MARKERS.iter().any(|&marker| name.contains(marker))
}
175
176fn process_method(
177 func_node: Node,
178 f: &mut FunctionInfo,
179 src: &[u8],
180 field_names: &mut Vec<String>,
181) -> (bool, bool, bool, usize) {
182 let method_name = &f.name;
183 let mut has_behavior = false;
184 let mut is_override = false;
185 let mut is_notify = false;
186 if method_name == "__init__" {
187 collect_init_fields(func_node, src, field_names);
188 } else {
189 has_behavior = true;
190 }
191 let sc = func_node
192 .child_by_field_name("body")
193 .map(|b| count_self_calls(b, src))
194 .unwrap_or(0);
195 if method_name.starts_with("__") && method_name.ends_with("__") && method_name != "__init__" {
196 is_override = true;
197 }
198 if method_name.contains("notify") || method_name.contains("emit") {
199 is_notify = true;
200 }
201 f.is_exported = !method_name.starts_with('_');
202 (has_behavior, is_override, is_notify, sc)
203}
204
205struct ClassScan {
206 methods: Vec<FunctionInfo>,
207 field_names: Vec<String>,
208 delegating_count: usize,
209 has_behavior: bool,
210 override_count: usize,
211 self_call_count: usize,
212 has_notify_method: bool,
213}
214
215fn scan_class_methods(
216 body: Node,
217 src: &[u8],
218 imports_map: &crate::type_ref::ImportsMap,
219) -> ClassScan {
220 let mut s = ClassScan {
221 methods: Vec::new(),
222 field_names: Vec::new(),
223 delegating_count: 0,
224 has_behavior: false,
225 override_count: 0,
226 self_call_count: 0,
227 has_notify_method: false,
228 };
229 let mut cursor = body.walk();
230 for child in body.children(&mut cursor) {
231 let Some(func_node) = find_method_def(child) else {
232 continue;
233 };
234 let Some(mut f) = extract_function(func_node, src, imports_map) else {
235 continue;
236 };
237 if f.is_delegating {
238 s.delegating_count += 1;
239 }
240 let (behav, over, notify, sc) = process_method(func_node, &mut f, src, &mut s.field_names);
241 s.has_behavior |= behav;
242 if over {
243 s.override_count += 1;
244 }
245 if notify {
246 s.has_notify_method = true;
247 }
248 s.self_call_count += sc;
249 s.methods.push(f);
250 }
251 s
252}
253
254fn extract_class(
255 node: Node,
256 src: &[u8],
257 imports_map: &crate::type_ref::ImportsMap,
258 top_functions: &mut Vec<FunctionInfo>,
259) -> Option<ClassInfo> {
260 let name_node = node.child_by_field_name("name")?;
261 let name = node_text(name_node, src).to_string();
262 let name_col = name_node.start_position().column;
263 let name_end_col = name_node.end_position().column;
264 let start_line = node.start_position().row + 1;
265 let end_line = node.end_position().row + 1;
266 let body = node.child_by_field_name("body")?;
267 let s = scan_class_methods(body, src, imports_map);
268 let method_count = s.methods.len();
269 top_functions.extend(s.methods);
270
271 Some(ClassInfo {
272 name,
273 start_line,
274 end_line,
275 name_col,
276 name_end_col,
277 line_count: end_line - start_line + 1,
278 method_count,
279 is_exported: true,
280 delegating_method_count: s.delegating_count,
281 field_count: s.field_names.len(),
282 has_listener_field: s.field_names.iter().any(|n| has_listener_name(n)),
283 field_names: s.field_names,
284 field_types: Vec::new(),
285 has_behavior: s.has_behavior,
286 is_interface: has_only_pass_or_ellipsis(body, src),
287 parent_name: extract_parent_name(node, src),
288 override_count: s.override_count,
289 self_call_count: s.self_call_count,
290 has_notify_method: s.has_notify_method,
291 })
292}
293
294fn collect_import(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
297 let line = node.start_position().row + 1;
298 let col = node.start_position().column;
299 let mut cursor = node.walk();
300 for child in node.children(&mut cursor) {
301 if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
302 let text = node_text(child, src);
303 imports.push(ImportInfo {
304 source: text.to_string(),
305 line,
306 col,
307 ..Default::default()
308 });
309 }
310 }
311}
312
313fn collect_import_from(node: Node, src: &[u8], imports: &mut Vec<ImportInfo>) {
314 let line = node.start_position().row + 1;
315 let col = node.start_position().column;
316 let module = node
317 .child_by_field_name("module_name")
318 .map(|n| node_text(n, src).to_string())
319 .unwrap_or_default();
320 let mut cursor = node.walk();
321 let mut has_names = false;
322 for child in node.children(&mut cursor) {
323 if child.kind() == "dotted_name" || child.kind() == "aliased_import" {
324 let n = node_text(child, src).to_string();
325 if n != module {
326 imports.push(ImportInfo {
327 source: format!("{module}.{n}"),
328 line,
329 col,
330 ..Default::default()
331 });
332 has_names = true;
333 }
334 }
335 }
336 if !has_names {
337 imports.push(ImportInfo {
338 source: module,
339 line,
340 col,
341 ..Default::default()
342 });
343 }
344}
345
346fn node_text<'a>(node: Node, src: &'a [u8]) -> &'a str {
349 node.utf8_text(src).unwrap_or("")
350}
351
352fn count_complexity(node: Node) -> usize {
353 let mut complexity = 1usize;
354 let mut cursor = node.walk();
355 visit_all(node, &mut cursor, &mut |n| {
356 match n.kind() {
357 "if_statement"
358 | "elif_clause"
359 | "for_statement"
360 | "while_statement"
361 | "except_clause"
362 | "with_statement"
363 | "assert_statement"
364 | "conditional_expression"
365 | "boolean_operator"
366 | "list_comprehension"
367 | "set_comprehension"
368 | "dictionary_comprehension"
369 | "generator_expression" => {
370 complexity += 1;
371 }
372 "match_statement" => {} "case_clause" => {
374 complexity += 1;
375 }
376 _ => {}
377 }
378 });
379 complexity
380}
381
382fn hash_ast_structure(node: Node) -> u64 {
383 let mut hasher = DefaultHasher::new();
384 hash_node(node, &mut hasher);
385 hasher.finish()
386}
387
388fn hash_node(node: Node, hasher: &mut DefaultHasher) {
389 node.kind().hash(hasher);
390 let mut cursor = node.walk();
391 for child in node.children(&mut cursor) {
392 hash_node(child, hasher);
393 }
394}
395
396fn max_chain_depth(node: Node) -> usize {
397 let mut max = 0usize;
398 let mut cursor = node.walk();
399 visit_all(node, &mut cursor, &mut |n| {
400 if n.kind() == "attribute" {
401 let depth = chain_len(n);
402 if depth > max {
403 max = depth;
404 }
405 }
406 });
407 max
408}
409
410fn chain_len(node: Node) -> usize {
411 let mut depth = 0usize;
412 let mut current = node;
413 while current.kind() == "attribute" || current.kind() == "call" {
414 if current.kind() == "attribute" {
415 depth += 1;
416 }
417 if let Some(obj) = current.child(0) {
418 current = obj;
419 } else {
420 break;
421 }
422 }
423 depth
424}
425
426fn count_match_arms(node: Node) -> usize {
427 let mut count = 0usize;
428 let mut cursor = node.walk();
429 visit_all(node, &mut cursor, &mut |n| {
430 if n.kind() == "case_clause" {
431 count += 1;
432 }
433 });
434 count
435}
436
437fn collect_external_refs(node: Node, src: &[u8]) -> Vec<String> {
438 let mut refs = Vec::new();
439 let mut cursor = node.walk();
440 visit_all(node, &mut cursor, &mut |n| {
441 if n.kind() != "attribute" {
442 return;
443 }
444 let Some(obj) = n.child(0) else { return };
445 let text = node_text(obj, src);
446 if text != "self"
447 && !text.is_empty()
448 && text.starts_with(|c: char| c.is_lowercase())
449 && !refs.contains(&text.to_string())
450 {
451 refs.push(text.to_string());
452 }
453 });
454 refs
455}
456
457fn unwrap_single_call(body: Node) -> Option<Node> {
458 let mut c = body.walk();
459 let stmts: Vec<Node> = body
460 .children(&mut c)
461 .filter(|n| !n.is_extra() && n.kind() != "pass_statement" && n.kind() != "comment")
462 .collect();
463 if stmts.len() != 1 {
464 return None;
465 }
466 let stmt = stmts[0];
467 match stmt.kind() {
468 "return_statement" => stmt.child(1).filter(|v| v.kind() == "call"),
469 "expression_statement" => stmt.child(0).filter(|v| v.kind() == "call"),
470 _ => None,
471 }
472}
473
474fn check_delegating(body: Node, src: &[u8]) -> bool {
475 let Some(func) = unwrap_single_call(body).and_then(|c| c.child(0)) else {
476 return false;
477 };
478 let text = node_text(func, src);
479 text.contains('.') && !text.starts_with("self.")
480}
481
482fn count_comment_lines(node: Node, src: &[u8]) -> usize {
483 let mut count = 0usize;
484 let mut cursor = node.walk();
485 visit_all(node, &mut cursor, &mut |n| {
486 if n.kind() == "comment" {
487 count += 1;
488 } else if n.kind() == "string" || n.kind() == "expression_statement" {
489 let text = node_text(n, src);
491 if text.starts_with("\"\"\"") || text.starts_with("'''") {
492 count += text.lines().count();
493 }
494 }
495 });
496 count
497}
498
499fn collect_self_refs(body: Node, src: &[u8]) -> Vec<String> {
500 let mut refs = Vec::new();
501 let mut cursor = body.walk();
502 visit_all(body, &mut cursor, &mut |n| {
503 if n.kind() != "attribute" {
504 return;
505 }
506 let is_self = n.child(0).is_some_and(|o| node_text(o, src) == "self");
507 if !is_self {
508 return;
509 }
510 if let Some(attr) = n.child_by_field_name("attribute") {
511 let name = node_text(attr, src).to_string();
512 if !refs.contains(&name) {
513 refs.push(name);
514 }
515 }
516 });
517 refs
518}
519
520fn collect_none_checks(body: Node, src: &[u8]) -> Vec<String> {
521 let mut fields = Vec::new();
522 let mut cursor = body.walk();
523 visit_all(body, &mut cursor, &mut |n| {
524 if n.kind() != "comparison_operator" {
525 return;
526 }
527 let text = node_text(n, src);
528 if !text.contains("is None") && !text.contains("is not None") && !text.contains("== None") {
529 return;
530 }
531 if let Some(left) = n.child(0) {
532 let name = node_text(left, src).to_string();
533 if !fields.contains(&name) {
534 fields.push(name);
535 }
536 }
537 });
538 fields
539}
540
/// Reports whether `name` is exactly `self` or `cls`.
fn is_self_or_cls(name: &str) -> bool {
    matches!(name, "self" | "cls")
}
544
545fn param_name_and_type(child: Node, src: &[u8]) -> Option<(String, String)> {
546 match child.kind() {
547 "identifier" => {
548 let name = node_text(child, src);
549 (!is_self_or_cls(name)).then(|| (name.to_string(), "Any".to_string()))
550 }
551 "typed_parameter" | "default_parameter" | "typed_default_parameter" => {
552 let name = child
553 .child_by_field_name("name")
554 .or_else(|| child.child(0))
555 .map(|n| node_text(n, src))
556 .unwrap_or("");
557 if is_self_or_cls(name) {
558 return None;
559 }
560 let ty = child
561 .child_by_field_name("type")
562 .map(|n| node_text(n, src).to_string())
563 .unwrap_or_else(|| "Any".to_string());
564 Some((name.to_string(), ty))
565 }
566 "list_splat_pattern" | "dictionary_splat_pattern" => {
567 Some(("*".to_string(), "Any".to_string()))
568 }
569 _ => None,
570 }
571}
572
573fn extract_params(
574 params_node: Node,
575 src: &[u8],
576 imports_map: &crate::type_ref::ImportsMap,
577) -> (usize, Vec<cha_core::TypeRef>) {
578 let mut count = 0usize;
579 let mut types = Vec::new();
580 let mut cursor = params_node.walk();
581 for child in params_node.children(&mut cursor) {
582 if let Some((_name, ty)) = param_name_and_type(child, src) {
583 count += 1;
584 types.push(crate::type_ref::resolve(ty, imports_map));
585 }
586 }
587 (count, types)
588}
589
590fn count_optional(params_node: Node) -> usize {
591 let mut count = 0usize;
592 let mut cursor = params_node.walk();
593 for child in params_node.children(&mut cursor) {
594 if child.kind() == "default_parameter" || child.kind() == "typed_default_parameter" {
595 count += 1;
596 }
597 }
598 count
599}
600
601fn collect_init_fields(func_node: Node, src: &[u8], fields: &mut Vec<String>) {
602 let Some(body) = func_node.child_by_field_name("body") else {
603 return;
604 };
605 let mut cursor = body.walk();
606 visit_all(body, &mut cursor, &mut |n| {
607 if n.kind() != "assignment" {
608 return;
609 }
610 let Some(left) = n.child_by_field_name("left") else {
611 return;
612 };
613 if left.kind() != "attribute" {
614 return;
615 }
616 let is_self = left.child(0).is_some_and(|o| node_text(o, src) == "self");
617 if !is_self {
618 return;
619 }
620 if let Some(attr) = left.child_by_field_name("attribute") {
621 let name = node_text(attr, src).to_string();
622 if !fields.contains(&name) {
623 fields.push(name);
624 }
625 }
626 });
627}
628
629fn count_self_calls(body: Node, src: &[u8]) -> usize {
630 let mut count = 0;
631 let mut cursor = body.walk();
632 visit_all(body, &mut cursor, &mut |n| {
633 if n.kind() != "call" {
634 return;
635 }
636 let is_self_call = n
637 .child(0)
638 .filter(|f| f.kind() == "attribute")
639 .and_then(|f| f.child(0))
640 .is_some_and(|obj| node_text(obj, src) == "self");
641 if is_self_call {
642 count += 1;
643 }
644 });
645 count
646}
647
648fn is_stub_body(node: Node, src: &[u8]) -> bool {
649 node.child_by_field_name("body")
650 .is_none_or(|b| has_only_pass_or_ellipsis(b, src))
651}
652
653fn has_only_pass_or_ellipsis(body: Node, src: &[u8]) -> bool {
654 let mut cursor = body.walk();
655 for child in body.children(&mut cursor) {
656 let ok = match child.kind() {
657 "pass_statement" | "ellipsis" | "comment" => true,
658 "expression_statement" => child.child(0).is_none_or(|expr| {
659 let text = node_text(expr, src);
660 text == "..." || text.starts_with("\"\"\"") || text.starts_with("'''")
661 }),
662 "function_definition" => is_stub_body(child, src),
663 "decorated_definition" => {
664 let mut inner = child.walk();
665 child
666 .children(&mut inner)
667 .filter(|c| c.kind() == "function_definition")
668 .all(|c| is_stub_body(c, src))
669 }
670 _ => false,
671 };
672 if !ok {
673 return false;
674 }
675 }
676 true
677}
678
679fn cognitive_complexity_py(node: tree_sitter::Node) -> usize {
680 let mut score = 0;
681 cc_walk_py(node, 0, &mut score);
682 score
683}
684
685fn cc_walk_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
686 match node.kind() {
687 "if_statement" => {
688 *score += 1 + nesting;
689 cc_children_py(node, nesting + 1, score);
690 return;
691 }
692 "for_statement" | "while_statement" => {
693 *score += 1 + nesting;
694 cc_children_py(node, nesting + 1, score);
695 return;
696 }
697 "match_statement" => {
698 *score += 1 + nesting;
699 cc_children_py(node, nesting + 1, score);
700 return;
701 }
702 "elif_clause" | "else_clause" => {
703 *score += 1;
704 }
705 "boolean_operator" => {
706 *score += 1;
707 }
708 "except_clause" => {
709 *score += 1 + nesting;
710 cc_children_py(node, nesting + 1, score);
711 return;
712 }
713 "lambda" => {
714 cc_children_py(node, nesting + 1, score);
715 return;
716 }
717 _ => {}
718 }
719 cc_children_py(node, nesting, score);
720}
721
722fn cc_children_py(node: tree_sitter::Node, nesting: usize, score: &mut usize) {
723 let mut cursor = node.walk();
724 for child in node.children(&mut cursor) {
725 cc_walk_py(child, nesting, score);
726 }
727}
728
729fn extract_match_target_py(body: tree_sitter::Node, src: &[u8]) -> Option<String> {
730 let mut target = None;
731 let mut cursor = body.walk();
732 visit_all(body, &mut cursor, &mut |n| {
733 if n.kind() == "match_statement"
734 && target.is_none()
735 && let Some(subj) = n.child_by_field_name("subject")
736 {
737 target = Some(node_text(subj, src).to_string());
738 }
739 });
740 target
741}
742
743fn collect_calls_py(body: tree_sitter::Node, src: &[u8]) -> Vec<String> {
744 let mut calls = Vec::new();
745 let mut cursor = body.walk();
746 visit_all(body, &mut cursor, &mut |n| {
747 if n.kind() == "call"
748 && let Some(func) = n.child(0)
749 {
750 let name = node_text(func, src).to_string();
751 if !calls.contains(&name) {
752 calls.push(name);
753 }
754 }
755 });
756 calls
757}
758
759fn collect_comments(root: Node, src: &[u8]) -> Vec<cha_core::CommentInfo> {
760 let mut comments = Vec::new();
761 let mut cursor = root.walk();
762 visit_all(root, &mut cursor, &mut |n| {
763 if n.kind().contains("comment") {
764 comments.push(cha_core::CommentInfo {
765 text: node_text(n, src).to_string(),
766 line: n.start_position().row + 1,
767 });
768 }
769 });
770 comments
771}
772
773fn visit_all<F: FnMut(Node)>(node: Node, cursor: &mut tree_sitter::TreeCursor, f: &mut F) {
774 f(node);
775 if cursor.goto_first_child() {
776 loop {
777 let child_node = cursor.node();
778 let mut child_cursor = child_node.walk();
779 visit_all(child_node, &mut child_cursor, f);
780 if !cursor.goto_next_sibling() {
781 break;
782 }
783 }
784 cursor.goto_parent();
785 }
786}