Skip to main content

dk_engine/parser/
python_parser.rs

1use super::LanguageParser;
2use dk_core::{CallKind, Import, RawCallEdge, Result, Span, Symbol, SymbolKind, TypeInfo, Visibility};
3use std::path::Path;
4use tree_sitter::{Node, Parser, TreeCursor};
5use uuid::Uuid;
6
/// Python parser backed by tree-sitter.
///
/// Extracts symbols, call edges, imports, and (stub) type information from
/// Python source files.
///
/// The type is a field-less unit struct, so it derives `Copy` and `Debug`
/// to be freely passed around and printed in diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct PythonParser;
12
13impl PythonParser {
14    pub fn new() -> Self {
15        Self
16    }
17
18    /// Create a configured tree-sitter parser for Python.
19    fn create_parser() -> Result<Parser> {
20        let mut parser = Parser::new();
21        parser
22            .set_language(&tree_sitter_python::LANGUAGE.into())
23            .map_err(|e| dk_core::Error::ParseError(format!("Failed to load Python grammar: {e}")))?;
24        Ok(parser)
25    }
26
27    /// Parse source bytes into a tree-sitter tree.
28    fn parse_tree(source: &[u8]) -> Result<tree_sitter::Tree> {
29        let mut parser = Self::create_parser()?;
30        parser
31            .parse(source, None)
32            .ok_or_else(|| dk_core::Error::ParseError("tree-sitter parse returned None".into()))
33    }
34
35    /// Get the text of a node as a UTF-8 string.
36    fn node_text<'a>(node: &Node, source: &'a [u8]) -> &'a str {
37        let text = &source[node.start_byte()..node.end_byte()];
38        std::str::from_utf8(text).unwrap_or("")
39    }
40
41    /// Determine visibility based on Python naming conventions.
42    /// Names starting with `_` are considered private; everything else is public.
43    fn name_visibility(name: &str) -> Visibility {
44        if name.starts_with('_') {
45            Visibility::Private
46        } else {
47            Visibility::Public
48        }
49    }
50
51    /// Extract the name from a function_definition or class_definition node.
52    fn node_name(node: &Node, source: &[u8]) -> Option<String> {
53        node.child_by_field_name("name")
54            .map(|n| Self::node_text(&n, source).to_string())
55    }
56
57    /// Extract the first line of the node's source text as the signature.
58    fn node_signature(node: &Node, source: &[u8]) -> Option<String> {
59        let text_str = Self::node_text(node, source);
60        let first_line = text_str.lines().next()?;
61        Some(first_line.trim().to_string())
62    }
63
64    /// Extract docstring from a function or class body.
65    ///
66    /// In Python, a docstring is the first statement in the body if it is an
67    /// `expression_statement` containing a `string` node.
68    fn extract_docstring(node: &Node, source: &[u8]) -> Option<String> {
69        // Look for the "body" field (block node)
70        let body = node.child_by_field_name("body")?;
71
72        // The first child of the block should be the potential docstring
73        let first_stmt = body.child(0)?;
74
75        if first_stmt.kind() == "expression_statement" {
76            let expr = first_stmt.child(0)?;
77            if expr.kind() == "string" {
78                let raw = Self::node_text(&expr, source);
79                // Strip triple-quote delimiters and clean up
80                let content = raw
81                    .strip_prefix("\"\"\"")
82                    .and_then(|s| s.strip_suffix("\"\"\""))
83                    .or_else(|| {
84                        raw.strip_prefix("'''")
85                            .and_then(|s| s.strip_suffix("'''"))
86                    })
87                    .unwrap_or(raw);
88                let trimmed = content.trim().to_string();
89                if !trimmed.is_empty() {
90                    return Some(trimmed);
91                }
92            }
93        }
94
95        None
96    }
97
98    /// Collect preceding `#` comments for a node.
99    ///
100    /// Preserves the `#` prefix so that AST merge can reconstruct valid Python.
101    /// Skips inline comments that belong to a preceding statement (e.g.
102    /// `x = 60  # 60 seconds` — the `# 60 seconds` is on the same line as
103    /// `x = 60` and should not be collected as a doc comment of the next symbol).
104    fn doc_comments(node: &Node, source: &[u8]) -> Option<String> {
105        let mut comments = Vec::new();
106        let mut sibling = node.prev_sibling();
107
108        while let Some(prev) = sibling {
109            if prev.kind() == "comment" {
110                // Skip inline comments: if this comment is on the same line
111                // as a preceding non-comment sibling, it belongs to that
112                // sibling, not to our node.
113                if let Some(before_comment) = prev.prev_sibling() {
114                    if before_comment.kind() != "comment"
115                        && before_comment.end_position().row == prev.start_position().row
116                    {
117                        break;
118                    }
119                }
120                let text = Self::node_text(&prev, source).trim().to_string();
121                comments.push(text);
122                sibling = prev.prev_sibling();
123                continue;
124            }
125            break;
126        }
127
128        if comments.is_empty() {
129            None
130        } else {
131            comments.reverse();
132            Some(comments.join("\n"))
133        }
134    }
135
136    /// Extract a symbol from a function_definition or class_definition node.
137    fn extract_symbol_from_def(
138        node: &Node,
139        source: &[u8],
140        file_path: &Path,
141    ) -> Option<Symbol> {
142        let kind = match node.kind() {
143            "function_definition" => SymbolKind::Function,
144            "class_definition" => SymbolKind::Class,
145            _ => return None,
146        };
147
148        let name = Self::node_name(node, source)?;
149        if name.is_empty() {
150            return None;
151        }
152
153        let visibility = Self::name_visibility(&name);
154        let signature = Self::node_signature(node, source);
155
156        // Try docstring first, fall back to preceding comments
157        let doc_comment = Self::extract_docstring(node, source)
158            .or_else(|| Self::doc_comments(node, source));
159
160        Some(Symbol {
161            id: Uuid::new_v4(),
162            name: name.clone(),
163            qualified_name: name,
164            kind,
165            visibility,
166            file_path: file_path.to_path_buf(),
167            span: Span {
168                start_byte: node.start_byte() as u32,
169                end_byte: node.end_byte() as u32,
170            },
171            signature,
172            doc_comment,
173            parent: None,
174            last_modified_by: None,
175            last_modified_intent: None,
176        })
177    }
178
179    /// Extract the name from a simple assignment at the top level.
180    /// e.g. `MAX_RETRIES = 3` yields "MAX_RETRIES".
181    /// Only handles simple identifier = value assignments (not tuple unpacking, etc.).
182    fn extract_assignment_name(node: &Node, source: &[u8]) -> Option<String> {
183        if node.kind() != "expression_statement" {
184            return None;
185        }
186
187        // The expression_statement should contain an assignment
188        let child = node.child(0)?;
189        if child.kind() != "assignment" {
190            return None;
191        }
192
193        // The left side should be a simple identifier
194        let left = child.child_by_field_name("left")?;
195        if left.kind() != "identifier" {
196            return None;
197        }
198
199        let name = Self::node_text(&left, source).to_string();
200        if name.is_empty() {
201            None
202        } else {
203            Some(name)
204        }
205    }
206
207    /// Find the name of the enclosing function for a given node, if any.
208    fn enclosing_function_name(node: &Node, source: &[u8]) -> String {
209        let mut current = node.parent();
210        while let Some(parent) = current {
211            if parent.kind() == "function_definition" {
212                if let Some(name_node) = parent.child_by_field_name("name") {
213                    let name = Self::node_text(&name_node, source);
214                    if !name.is_empty() {
215                        return name.to_string();
216                    }
217                }
218            }
219            current = parent.parent();
220        }
221        "<module>".to_string()
222    }
223
224    /// Extract the callee name and call kind from a call node's function field.
225    fn extract_callee_info(node: &Node, source: &[u8]) -> (String, CallKind) {
226        match node.kind() {
227            "attribute" => {
228                // e.g. obj.method — the callee is the attribute (method name)
229                if let Some(attr) = node.child_by_field_name("attribute") {
230                    let name = Self::node_text(&attr, source).to_string();
231                    return (name, CallKind::MethodCall);
232                }
233                let text = Self::node_text(node, source).to_string();
234                (text, CallKind::MethodCall)
235            }
236            "identifier" => {
237                let name = Self::node_text(node, source).to_string();
238                (name, CallKind::DirectCall)
239            }
240            _ => {
241                let text = Self::node_text(node, source).to_string();
242                (text, CallKind::DirectCall)
243            }
244        }
245    }
246
247    /// Recursively walk the tree to extract call edges.
248    fn walk_calls(cursor: &mut TreeCursor, source: &[u8], calls: &mut Vec<RawCallEdge>) {
249        let node = cursor.node();
250
251        match node.kind() {
252            "call" => {
253                // Python call node has a "function" field
254                if let Some(func_node) = node.child_by_field_name("function") {
255                    let (callee, kind) = Self::extract_callee_info(&func_node, source);
256                    if !callee.is_empty() {
257                        let caller = Self::enclosing_function_name(&node, source);
258                        calls.push(RawCallEdge {
259                            caller_name: caller,
260                            callee_name: callee,
261                            call_site: Span {
262                                start_byte: node.start_byte() as u32,
263                                end_byte: node.end_byte() as u32,
264                            },
265                            kind,
266                        });
267                    }
268                }
269            }
270            "decorator" => {
271                // A decorator is effectively a call to the decorator function.
272                // The decorator node contains the decorator expression (after @).
273                // It can be a simple identifier like `@login_required`,
274                // a call like `@app.route("/api")`, or an attribute like `@app.middleware`.
275                //
276                // For `@login_required`, the child is an identifier.
277                // For `@app.route("/api")`, the child is a call node (which walk_calls handles).
278                // For `@app.middleware`, the child is an attribute.
279                //
280                // We handle the identifier and attribute cases here; the call case
281                // is handled recursively when we descend into children.
282                let mut inner_cursor = node.walk();
283                for child in node.children(&mut inner_cursor) {
284                    match child.kind() {
285                        "identifier" => {
286                            let name = Self::node_text(&child, source).to_string();
287                            if !name.is_empty() {
288                                let caller = Self::enclosing_function_name(&node, source);
289                                calls.push(RawCallEdge {
290                                    caller_name: caller,
291                                    callee_name: name,
292                                    call_site: Span {
293                                        start_byte: node.start_byte() as u32,
294                                        end_byte: node.end_byte() as u32,
295                                    },
296                                    kind: CallKind::DirectCall,
297                                });
298                            }
299                        }
300                        "attribute" => {
301                            if let Some(attr) = child.child_by_field_name("attribute") {
302                                let name = Self::node_text(&attr, source).to_string();
303                                if !name.is_empty() {
304                                    let caller = Self::enclosing_function_name(&node, source);
305                                    calls.push(RawCallEdge {
306                                        caller_name: caller,
307                                        callee_name: name,
308                                        call_site: Span {
309                                            start_byte: node.start_byte() as u32,
310                                            end_byte: node.end_byte() as u32,
311                                        },
312                                        kind: CallKind::MethodCall,
313                                    });
314                                }
315                            }
316                        }
317                        _ => {}
318                    }
319                }
320            }
321            _ => {}
322        }
323
324        // Recurse into children
325        if cursor.goto_first_child() {
326            loop {
327                Self::walk_calls(cursor, source, calls);
328                if !cursor.goto_next_sibling() {
329                    break;
330                }
331            }
332            cursor.goto_parent();
333        }
334    }
335
336    /// Extract imports from an `import_statement` node.
337    /// e.g. `import os` or `import os, sys`
338    fn extract_import_statement(node: &Node, source: &[u8]) -> Vec<Import> {
339        let mut imports = Vec::new();
340        let mut cursor = node.walk();
341
342        for child in node.children(&mut cursor) {
343            match child.kind() {
344                "dotted_name" => {
345                    let module = Self::node_text(&child, source).to_string();
346                    if !module.is_empty() {
347                        imports.push(Import {
348                            module_path: module.clone(),
349                            imported_name: module,
350                            alias: None,
351                            is_external: true,
352                        });
353                    }
354                }
355                "aliased_import" => {
356                    let name_node = child.child_by_field_name("name");
357                    let alias_node = child.child_by_field_name("alias");
358
359                    if let Some(name_n) = name_node {
360                        let module = Self::node_text(&name_n, source).to_string();
361                        let alias = alias_node
362                            .map(|a| Self::node_text(&a, source).to_string());
363                        imports.push(Import {
364                            module_path: module.clone(),
365                            imported_name: module,
366                            alias,
367                            is_external: true,
368                        });
369                    }
370                }
371                _ => {}
372            }
373        }
374
375        imports
376    }
377
378    /// Extract imports from an `import_from_statement` node.
379    /// e.g. `from os.path import join, exists` or `from .local import helper`
380    fn extract_import_from_statement(node: &Node, source: &[u8]) -> Vec<Import> {
381        let mut imports = Vec::new();
382
383        // Get the module name. In tree-sitter-python the module is in the
384        // "module_name" field. For relative imports it includes the dots.
385        let module_path = Self::extract_from_module_path(node, source);
386        let is_external = !module_path.starts_with('.');
387
388        // Collect imported names
389        let mut cursor = node.walk();
390        for child in node.children(&mut cursor) {
391            match child.kind() {
392                "dotted_name" | "identifier" => {
393                    // Skip the module name itself (already captured)
394                    // The imported names come after the "import" keyword
395                    // In tree-sitter-python, the imported names are in the node's
396                    // named children that are not the module_name field.
397                    // We need to distinguish module from imported names.
398                }
399                "aliased_import" => {
400                    let name_node = child.child_by_field_name("name");
401                    let alias_node = child.child_by_field_name("alias");
402
403                    if let Some(name_n) = name_node {
404                        let imported_name = Self::node_text(&name_n, source).to_string();
405                        let alias = alias_node
406                            .map(|a| Self::node_text(&a, source).to_string());
407                        imports.push(Import {
408                            module_path: module_path.clone(),
409                            imported_name,
410                            alias,
411                            is_external,
412                        });
413                    }
414                }
415                "wildcard_import" => {
416                    imports.push(Import {
417                        module_path: module_path.clone(),
418                        imported_name: "*".to_string(),
419                        alias: None,
420                        is_external,
421                    });
422                }
423                _ => {}
424            }
425        }
426
427        // If we found no imports from the structured children above, parse
428        // the imported names from the node text. The tree-sitter-python grammar
429        // places imported names as direct children of import_from_statement.
430        if imports.is_empty() {
431            Self::extract_from_imported_names(node, source, &module_path, is_external, &mut imports);
432        }
433
434        imports
435    }
436
437    /// Extract the module path from a `from ... import` statement.
438    /// Handles both absolute (`from os.path`) and relative (`from .local`) imports.
439    fn extract_from_module_path(node: &Node, source: &[u8]) -> String {
440        // The module_name field contains the dotted name (may include leading dots for relative).
441        if let Some(module_node) = node.child_by_field_name("module_name") {
442            return Self::node_text(&module_node, source).to_string();
443        }
444
445        // Fallback: reconstruct from the node text between `from` and `import`.
446        let text = Self::node_text(node, source);
447        if let Some(from_idx) = text.find("from") {
448            let after_from = &text[from_idx + 4..];
449            if let Some(import_idx) = after_from.find("import") {
450                let module = after_from[..import_idx].trim();
451                return module.to_string();
452            }
453        }
454
455        String::new()
456    }
457
458    /// Extract imported names from a from-import statement by walking its children.
459    fn extract_from_imported_names(
460        node: &Node,
461        source: &[u8],
462        module_path: &str,
463        is_external: bool,
464        imports: &mut Vec<Import>,
465    ) {
466        // Walk through all children looking for imported names.
467        // In tree-sitter-python, after the module_name and "import" keyword,
468        // the imported identifiers appear as children.
469        let mut found_import_keyword = false;
470        let mut cursor = node.walk();
471
472        for child in node.children(&mut cursor) {
473            let text = Self::node_text(&child, source);
474
475            if text == "import" {
476                found_import_keyword = true;
477                continue;
478            }
479
480            if !found_import_keyword {
481                continue;
482            }
483
484            match child.kind() {
485                "dotted_name" | "identifier" => {
486                    let imported_name = text.to_string();
487                    if !imported_name.is_empty() && imported_name != "," {
488                        imports.push(Import {
489                            module_path: module_path.to_string(),
490                            imported_name,
491                            alias: None,
492                            is_external,
493                        });
494                    }
495                }
496                "aliased_import" => {
497                    let name_node = child.child_by_field_name("name");
498                    let alias_node = child.child_by_field_name("alias");
499
500                    if let Some(name_n) = name_node {
501                        let imported_name = Self::node_text(&name_n, source).to_string();
502                        let alias = alias_node
503                            .map(|a| Self::node_text(&a, source).to_string());
504                        imports.push(Import {
505                            module_path: module_path.to_string(),
506                            imported_name,
507                            alias,
508                            is_external,
509                        });
510                    }
511                }
512                "wildcard_import" => {
513                    imports.push(Import {
514                        module_path: module_path.to_string(),
515                        imported_name: "*".to_string(),
516                        alias: None,
517                        is_external,
518                    });
519                }
520                _ => {}
521            }
522        }
523    }
524}
525
526impl Default for PythonParser {
527    fn default() -> Self {
528        Self::new()
529    }
530}
531
532impl LanguageParser for PythonParser {
533    fn extensions(&self) -> &[&str] {
534        &["py"]
535    }
536
537    fn extract_symbols(&self, source: &[u8], file_path: &Path) -> Result<Vec<Symbol>> {
538        if source.is_empty() {
539            return Ok(vec![]);
540        }
541
542        let tree = Self::parse_tree(source)?;
543        let root = tree.root_node();
544        let mut symbols = Vec::new();
545        let mut cursor = root.walk();
546
547        for node in root.children(&mut cursor) {
548            match node.kind() {
549                "function_definition" | "class_definition" => {
550                    if let Some(sym) = Self::extract_symbol_from_def(&node, source, file_path) {
551                        symbols.push(sym);
552                    }
553                }
554                "decorated_definition" => {
555                    // Unwrap the decorated_definition to find the inner function or class
556                    if let Some(definition) = node.child_by_field_name("definition") {
557                        match definition.kind() {
558                            "function_definition" | "class_definition" => {
559                                if let Some(mut sym) =
560                                    Self::extract_symbol_from_def(&definition, source, file_path)
561                                {
562                                    // Use the span of the whole decorated definition
563                                    sym.span = Span {
564                                        start_byte: node.start_byte() as u32,
565                                        end_byte: node.end_byte() as u32,
566                                    };
567                                    // Include the decorator in the signature
568                                    sym.signature = Self::node_signature(&node, source);
569                                    symbols.push(sym);
570                                }
571                            }
572                            _ => {}
573                        }
574                    }
575                }
576                "expression_statement" => {
577                    // Module-level assignment
578                    if let Some(name) = Self::extract_assignment_name(&node, source) {
579                        let visibility = Self::name_visibility(&name);
580                        symbols.push(Symbol {
581                            id: Uuid::new_v4(),
582                            name: name.clone(),
583                            qualified_name: name,
584                            kind: SymbolKind::Variable,
585                            visibility,
586                            file_path: file_path.to_path_buf(),
587                            span: Span {
588                                start_byte: node.start_byte() as u32,
589                                end_byte: node.end_byte() as u32,
590                            },
591                            signature: Self::node_signature(&node, source),
592                            doc_comment: Self::doc_comments(&node, source),
593                            parent: None,
594                            last_modified_by: None,
595                            last_modified_intent: None,
596                        });
597                    }
598                }
599                _ => {}
600            }
601        }
602
603        Ok(symbols)
604    }
605
606    fn extract_calls(&self, source: &[u8], _file_path: &Path) -> Result<Vec<RawCallEdge>> {
607        if source.is_empty() {
608            return Ok(vec![]);
609        }
610
611        let tree = Self::parse_tree(source)?;
612        let root = tree.root_node();
613        let mut calls = Vec::new();
614        let mut cursor = root.walk();
615
616        Self::walk_calls(&mut cursor, source, &mut calls);
617
618        Ok(calls)
619    }
620
621    fn extract_types(&self, _source: &[u8], _file_path: &Path) -> Result<Vec<TypeInfo>> {
622        // Stub: will be enhanced later
623        Ok(vec![])
624    }
625
626    fn extract_imports(&self, source: &[u8], _file_path: &Path) -> Result<Vec<Import>> {
627        if source.is_empty() {
628            return Ok(vec![]);
629        }
630
631        let tree = Self::parse_tree(source)?;
632        let root = tree.root_node();
633        let mut imports = Vec::new();
634        let mut cursor = root.walk();
635
636        for node in root.children(&mut cursor) {
637            match node.kind() {
638                "import_statement" => {
639                    imports.extend(Self::extract_import_statement(&node, source));
640                }
641                "import_from_statement" => {
642                    imports.extend(Self::extract_import_from_statement(&node, source));
643                }
644                _ => {}
645            }
646        }
647
648        Ok(imports)
649    }
650}