use anyhow::{Context, Result};
use streaming_iterator::StreamingIterator;
use tree_sitter::{Parser, Query, QueryCursor};
use crate::models::{Language, SearchResult, Span, SymbolKind};
pub fn parse(path: &str, source: &str) -> Result<Vec<SearchResult>> {
let mut parser = Parser::new();
let language = tree_sitter_python::LANGUAGE;
parser
.set_language(&language.into())
.context("Failed to set Python language")?;
let tree = parser
.parse(source, None)
.context("Failed to parse Python source")?;
let root_node = tree.root_node();
let mut symbols = Vec::new();
symbols.extend(extract_functions(source, &root_node, &language.into())?);
symbols.extend(extract_classes(source, &root_node, &language.into())?);
symbols.extend(extract_methods(source, &root_node, &language.into())?);
symbols.extend(extract_constants(source, &root_node, &language.into())?);
symbols.extend(extract_global_variables(source, &root_node, &language.into())?);
symbols.extend(extract_local_variables(source, &root_node, &language.into())?);
symbols.extend(extract_lambdas(source, &root_node, &language.into())?);
for symbol in &mut symbols {
symbol.path = path.to_string();
symbol.lang = Language::Python;
}
Ok(symbols)
}
/// Collects every `function_definition` node as a [`SymbolKind::Function`].
///
/// # Errors
///
/// Fails if the tree-sitter query cannot be compiled for `language`.
fn extract_functions(
    source: &str,
    root: &tree_sitter::Node,
    language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
    const FUNCTION_QUERY: &str = r#"
(function_definition
name: (identifier) @name) @function
"#;

    let function_query =
        Query::new(language, FUNCTION_QUERY).context("Failed to create function query")?;

    extract_symbols(source, root, &function_query, SymbolKind::Function, None)
}
/// Collects every `class_definition` node as a [`SymbolKind::Class`].
///
/// # Errors
///
/// Fails if the tree-sitter query cannot be compiled for `language`.
fn extract_classes(
    source: &str,
    root: &tree_sitter::Node,
    language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
    const CLASS_QUERY: &str = r#"
(class_definition
name: (identifier) @name) @class
"#;

    let class_query =
        Query::new(language, CLASS_QUERY).context("Failed to create class query")?;

    extract_symbols(source, root, &class_query, SymbolKind::Class, None)
}
fn extract_methods(
source: &str,
root: &tree_sitter::Node,
language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
let query_str = r#"
(class_definition
name: (identifier) @class_name
body: (block
(function_definition
name: (identifier) @method_name))) @class
(class_definition
name: (identifier) @class_name
body: (block
(decorated_definition
(function_definition
name: (identifier) @method_name)))) @class
"#;
let query = Query::new(language, query_str)
.context("Failed to create method query")?;
let mut cursor = QueryCursor::new();
let mut matches = cursor.matches(&query, *root, source.as_bytes());
let mut symbols = Vec::new();
while let Some(match_) = matches.next() {
let mut class_name = None;
let mut method_name = None;
let mut method_node = None;
for capture in match_.captures {
let capture_name: &str = &query.capture_names()[capture.index as usize];
match capture_name {
"class_name" => {
class_name = Some(capture.node.utf8_text(source.as_bytes()).unwrap_or("").to_string());
}
"method_name" => {
method_name = Some(capture.node.utf8_text(source.as_bytes()).unwrap_or("").to_string());
let mut current = capture.node;
while let Some(parent) = current.parent() {
if parent.kind() == "function_definition" {
method_node = Some(parent);
break;
}
current = parent;
}
}
_ => {}
}
}
if let (Some(class_name), Some(method_name), Some(node)) = (class_name, method_name, method_node) {
let scope = format!("class {}", class_name);
let span = node_to_span(&node);
let preview = extract_preview(source, &span);
symbols.push(SearchResult::new(
String::new(),
Language::Python,
SymbolKind::Method,
Some(method_name),
span,
Some(scope),
preview,
));
}
}
Ok(symbols)
}
fn extract_constants(
source: &str,
root: &tree_sitter::Node,
language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
let query_str = r#"
(module
(expression_statement
(assignment
left: (identifier) @name))) @const
"#;
let query = Query::new(language, query_str)
.context("Failed to create constant query")?;
let mut cursor = QueryCursor::new();
let mut matches = cursor.matches(&query, *root, source.as_bytes());
let mut symbols = Vec::new();
while let Some(match_) = matches.next() {
let mut name = None;
let mut const_node = None;
for capture in match_.captures {
let capture_name: &str = &query.capture_names()[capture.index as usize];
if capture_name == "name" {
let name_text = capture.node.utf8_text(source.as_bytes()).unwrap_or("");
if name_text.chars().all(|c| c.is_uppercase() || c == '_' || c.is_numeric()) {
name = Some(name_text.to_string());
let mut current = capture.node;
while let Some(parent) = current.parent() {
if parent.kind() == "assignment" {
const_node = Some(parent);
break;
}
current = parent;
}
}
}
}
if let (Some(name), Some(node)) = (name, const_node) {
let span = node_to_span(&node);
let preview = extract_preview(source, &span);
symbols.push(SearchResult::new(
String::new(),
Language::Python,
SymbolKind::Constant,
Some(name),
span,
None,
preview,
));
}
}
Ok(symbols)
}
fn extract_global_variables(
source: &str,
root: &tree_sitter::Node,
language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
let query_str = r#"
(module
(expression_statement
(assignment
left: (identifier) @name))) @var
"#;
let query = Query::new(language, query_str)
.context("Failed to create global variable query")?;
let mut cursor = QueryCursor::new();
let mut matches = cursor.matches(&query, *root, source.as_bytes());
let mut symbols = Vec::new();
while let Some(match_) = matches.next() {
let mut name = None;
let mut var_node = None;
for capture in match_.captures {
let capture_name: &str = &query.capture_names()[capture.index as usize];
if capture_name == "name" {
let name_text = capture.node.utf8_text(source.as_bytes()).unwrap_or("");
if !name_text.chars().all(|c| c.is_uppercase() || c == '_' || c.is_numeric()) {
name = Some(name_text.to_string());
let mut current = capture.node;
while let Some(parent) = current.parent() {
if parent.kind() == "assignment" {
var_node = Some(parent);
break;
}
current = parent;
}
}
}
}
if let (Some(name), Some(node)) = (name, var_node) {
let span = node_to_span(&node);
let preview = extract_preview(source, &span);
symbols.push(SearchResult::new(
String::new(),
Language::Python,
SymbolKind::Variable,
Some(name),
span,
None,
preview,
));
}
}
Ok(symbols)
}
fn extract_local_variables(
source: &str,
root: &tree_sitter::Node,
language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
let query_str = r#"
(assignment
left: (identifier) @name) @assignment
"#;
let query = Query::new(language, query_str)
.context("Failed to create local variable query")?;
let mut cursor = QueryCursor::new();
let mut matches = cursor.matches(&query, *root, source.as_bytes());
let mut symbols = Vec::new();
while let Some(match_) = matches.next() {
let mut name = None;
let mut assignment_node = None;
for capture in match_.captures {
let capture_name: &str = &query.capture_names()[capture.index as usize];
match capture_name {
"name" => {
let name_text = capture.node.utf8_text(source.as_bytes()).unwrap_or("");
if !name_text.chars().all(|c| c.is_uppercase() || c == '_' || c.is_numeric()) {
name = Some(name_text.to_string());
}
}
"assignment" => {
assignment_node = Some(capture.node);
}
_ => {}
}
}
if let (Some(name), Some(node)) = (name, assignment_node) {
let mut is_in_function = false;
let mut current = node;
while let Some(parent) = current.parent() {
if parent.kind() == "function_definition" {
is_in_function = true;
break;
}
if parent.kind() == "module" {
break;
}
current = parent;
}
if is_in_function {
let span = node_to_span(&node);
let preview = extract_preview(source, &span);
symbols.push(SearchResult::new(
String::new(),
Language::Python,
SymbolKind::Variable,
Some(name),
span,
None, preview,
));
}
}
}
Ok(symbols)
}
/// Collects assignments whose right-hand side is a `lambda` as
/// [`SymbolKind::Function`] results named after the assignment target.
///
/// # Errors
///
/// Fails if the tree-sitter query cannot be compiled for `language`.
fn extract_lambdas(
    source: &str,
    root: &tree_sitter::Node,
    language: &tree_sitter::Language,
) -> Result<Vec<SearchResult>> {
    const LAMBDA_QUERY: &str = r#"
(assignment
left: (identifier) @name
right: (lambda)) @lambda
"#;

    let lambda_query =
        Query::new(language, LAMBDA_QUERY).context("Failed to create lambda query")?;

    extract_symbols(source, root, &lambda_query, SymbolKind::Function, None)
}
/// Runs `query` over `root` and turns each match into a [`SearchResult`].
///
/// The capture called `name` supplies the symbol name. Any other capture
/// supplies the node used for the span and preview; if a match has several,
/// the last one wins. Matches missing either piece are skipped. Every result
/// gets the given `kind` and `scope`, and an empty path.
fn extract_symbols(
    source: &str,
    root: &tree_sitter::Node,
    query: &Query,
    kind: SymbolKind,
    scope: Option<String>,
) -> Result<Vec<SearchResult>> {
    let bytes = source.as_bytes();
    let mut cursor = QueryCursor::new();
    let mut matches = cursor.matches(query, *root, bytes);
    let mut results = Vec::new();

    while let Some(found) = matches.next() {
        let mut symbol_name = None;
        let mut definition = None;

        for capture in found.captures.iter() {
            let label: &str = &query.capture_names()[capture.index as usize];
            match label {
                "name" => {
                    symbol_name =
                        Some(capture.node.utf8_text(bytes).unwrap_or("").to_string());
                }
                _ => definition = Some(capture.node),
            }
        }

        if let Some((name, node)) = symbol_name.zip(definition) {
            let span = node_to_span(&node);
            let preview = extract_preview(source, &span);
            results.push(SearchResult::new(
                String::new(),
                Language::Python,
                kind.clone(),
                Some(name),
                span,
                scope.clone(),
                preview,
            ));
        }
    }

    Ok(results)
}
/// Converts a node's start and end positions into a [`Span`].
///
/// Rows are shifted by one so that lines are 1-based; columns are passed
/// through unchanged.
fn node_to_span(node: &tree_sitter::Node) -> Span {
    let tree_sitter::Point {
        row: start_row,
        column: start_column,
    } = node.start_position();
    let tree_sitter::Point {
        row: end_row,
        column: end_column,
    } = node.end_position();

    Span::new(start_row + 1, start_column, end_row + 1, end_column)
}
/// Returns up to seven lines of `source`, starting at the span's 1-based
/// `start_line`, joined with `\n`.
///
/// Lines are streamed rather than collected, so no per-call vector of every
/// source line is built. A `start_line` of zero or one past the end of the
/// source yields a shorter or empty preview instead of panicking.
fn extract_preview(source: &str, span: &Span) -> String {
    const PREVIEW_LINES: usize = 7;

    // `start_line` is 1-based; saturate so a zero value cannot underflow.
    let first_line = span.start_line.saturating_sub(1) as usize;

    source
        .lines()
        .skip(first_line)
        .take(PREVIEW_LINES)
        .collect::<Vec<_>>()
        .join("\n")
}
use crate::models::ImportType;
use crate::parsers::{DependencyExtractor, ImportInfo};
/// [`DependencyExtractor`] implementation for Python source files.
pub struct PythonDependencyExtractor;

impl DependencyExtractor for PythonDependencyExtractor {
    /// Parses `source` as Python and returns its `import ...` statements
    /// followed by its `from ... import ...` statements.
    ///
    /// # Errors
    ///
    /// Fails if the Python grammar cannot be installed on the parser, if the
    /// parser produces no tree, or if either import extractor fails.
    fn extract_dependencies(source: &str) -> Result<Vec<ImportInfo>> {
        let mut parser = Parser::new();
        parser
            .set_language(&tree_sitter_python::LANGUAGE.into())
            .context("Failed to set Python language")?;

        let tree = parser
            .parse(source, None)
            .context("Failed to parse Python source")?;
        let root = tree.root_node();

        let mut imports = extract_import_statements(source, &root)?;
        imports.extend(extract_from_imports(source, &root)?);
        Ok(imports)
    }
}
/// Collects plain `import ...` statements, one [`ImportInfo`] per imported
/// module.
///
/// Both `import pkg.mod` and `import pkg.mod as alias` are recognised; for
/// the aliased form the recorded path is the module path, not the alias.
/// `imported_symbols` is always `None` for this statement form.
///
/// # Errors
///
/// Fails if the tree-sitter query cannot be compiled.
fn extract_import_statements(
    source: &str,
    root: &tree_sitter::Node,
) -> Result<Vec<ImportInfo>> {
    let language = tree_sitter_python::LANGUAGE;
    // The second pattern covers `import x as y`, where the grammar wraps the
    // dotted name in an `aliased_import` node.
    let query_str = r#"
(import_statement
name: (dotted_name) @import_path) @import
(import_statement
name: (aliased_import
name: (dotted_name) @import_path)) @import
"#;
    let query = Query::new(&language.into(), query_str)
        .context("Failed to create import statement query")?;

    let bytes = source.as_bytes();
    let mut cursor = QueryCursor::new();
    let mut matches = cursor.matches(&query, *root, bytes);
    let mut imports = Vec::new();

    while let Some(match_) = matches.next() {
        let mut import_path = None;
        let mut import_node = None;

        for capture in match_.captures {
            let capture_name: &str = &query.capture_names()[capture.index as usize];
            match capture_name {
                "import_path" => {
                    import_path = Some(capture.node.utf8_text(bytes).unwrap_or("").to_string());
                }
                "import" => {
                    import_node = Some(capture.node);
                }
                _ => {}
            }
        }

        if let (Some(path), Some(node)) = (import_path, import_node) {
            let import_type = classify_python_import(&path);
            // tree-sitter rows are 0-based; reported line numbers are 1-based.
            let line_number = node.start_position().row + 1;
            imports.push(ImportInfo {
                imported_path: path,
                import_type,
                line_number,
                imported_symbols: None,
            });
        }
    }

    Ok(imports)
}
/// Collects `from <module> import ...` statements, for both absolute modules
/// and relative imports.
///
/// The module text (including any leading dots) becomes `imported_path`, and
/// the names gathered from the statement node become `imported_symbols`.
///
/// # Errors
///
/// Fails if the tree-sitter query cannot be compiled.
fn extract_from_imports(
    source: &str,
    root: &tree_sitter::Node,
) -> Result<Vec<ImportInfo>> {
    const FROM_IMPORT_QUERY: &str = r#"
(import_from_statement
module_name: (dotted_name) @module_path) @import
(import_from_statement
module_name: (relative_import) @module_path) @import
"#;

    let query = Query::new(&tree_sitter_python::LANGUAGE.into(), FROM_IMPORT_QUERY)
        .context("Failed to create from-import query")?;
    let bytes = source.as_bytes();

    let mut cursor = QueryCursor::new();
    let mut matches = cursor.matches(&query, *root, bytes);
    let mut collected = Vec::new();

    while let Some(found) = matches.next() {
        let mut module = None;
        let mut statement = None;

        for capture in found.captures.iter() {
            let label: &str = &query.capture_names()[capture.index as usize];
            if label == "module_path" {
                module = Some(capture.node.utf8_text(bytes).unwrap_or("").to_string());
            } else if label == "import" {
                statement = Some(capture.node);
            }
        }

        if let Some((path, node)) = module.zip(statement) {
            let import_type = classify_python_import(&path);
            // tree-sitter rows are 0-based; reported line numbers are 1-based.
            let line_number = node.start_position().row + 1;
            let imported_symbols = extract_imported_symbols(source, &node);
            collected.push(ImportInfo {
                imported_path: path,
                import_type,
                line_number,
                imported_symbols,
            });
        }
    }

    Ok(collected)
}
/// Returns the names imported by a `from ... import ...` statement node, or
/// `None` when no names are found.
///
/// For each `dotted_name` or `aliased_import` child, the text of its first
/// `identifier`/`dotted_name` grandchild is recorded (for an aliased import
/// that is the original name, not the alias).
///
/// The statement's `module_name` child is skipped. It can itself be a
/// `dotted_name`, and without the skip `from json import loads` would report
/// `json` as an imported symbol.
fn extract_imported_symbols(source: &str, import_node: &tree_sitter::Node) -> Option<Vec<String>> {
    let bytes = source.as_bytes();
    let module_id = import_node
        .child_by_field_name("module_name")
        .map(|module| module.id());

    let mut symbols = Vec::new();
    let mut cursor = import_node.walk();

    for child in import_node.children(&mut cursor) {
        if Some(child.id()) == module_id {
            continue;
        }
        if !matches!(child.kind(), "aliased_import" | "dotted_name") {
            continue;
        }

        let mut child_cursor = child.walk();
        // Take the first name-like grandchild whose text is valid UTF-8.
        let first_name = child
            .children(&mut child_cursor)
            .filter(|grandchild| matches!(grandchild.kind(), "identifier" | "dotted_name"))
            .find_map(|grandchild| grandchild.utf8_text(bytes).ok());

        if let Some(text) = first_name {
            symbols.push(text.to_string());
        }
    }

    if symbols.is_empty() {
        None
    } else {
        Some(symbols)
    }
}
/// Looks up the package name declared in the directory `root`.
///
/// Sources are tried in order (`pyproject.toml`, then `setup.py`, then
/// `setup.cfg`) and the first one that yields a name wins. Returns `None`
/// when none of them does.
pub fn find_python_package_name(root: &std::path::Path) -> Option<String> {
    find_pyproject_package(root)
        .or_else(|| find_setup_py_package(root))
        .or_else(|| find_setup_cfg_package(root))
}
/// Reads the lowercased `name` value from the `[project]` table of
/// `root/pyproject.toml`.
///
/// This is a line-based scan, not a TOML parser. Only a key that is exactly
/// `name` is accepted, so keys such as `namespace` that merely start with
/// those letters are ignored. The value is the text between the first pair of
/// double quotes or, failing that, the first pair of single quotes.
///
/// Returns `None` if the file cannot be read or no such entry exists.
fn find_pyproject_package(root: &std::path::Path) -> Option<String> {
    let content = std::fs::read_to_string(root.join("pyproject.toml")).ok()?;
    let mut in_project_section = false;

    for line in content.lines() {
        let trimmed = line.trim();

        // Any table header switches the section on or off.
        if trimmed.starts_with('[') {
            in_project_section = trimmed == "[project]";
            continue;
        }
        if !in_project_section {
            continue;
        }

        let (key, value) = match trimmed.split_once('=') {
            Some((key, value)) => (key.trim(), value.trim()),
            None => continue,
        };
        if key != "name" {
            continue;
        }

        for quote in ['"', '\''] {
            if let Some(start) = value.find(quote) {
                if let Some(len) = value[start + 1..].find(quote) {
                    return Some(value[start + 1..start + 1 + len].to_lowercase());
                }
            }
        }
    }

    None
}
/// Reads the lowercased package name from a `name=<string literal>` argument
/// in `root/setup.py`.
///
/// This is a line-based scan, not a Python parser. A line is accepted only
/// when it is not a `#` comment, `name` is a standalone identifier, and it is
/// assigned a quoted literal. Lines such as `hostname = "x"`,
/// `name == "x"` or `name=NAME, version="1.0"` are therefore ignored rather
/// than producing a bogus name.
///
/// Returns `None` if the file cannot be read or no such argument exists.
fn find_setup_py_package(root: &std::path::Path) -> Option<String> {
    let content = std::fs::read_to_string(root.join("setup.py")).ok()?;

    content
        .lines()
        .map(str::trim)
        .filter(|line| !line.starts_with('#'))
        .find_map(setup_py_name_literal)
}

/// Extracts the lowercased string literal assigned to a standalone `name`
/// identifier on one line of `setup.py`, if there is one.
fn setup_py_name_literal(line: &str) -> Option<String> {
    for (pos, keyword) in line.match_indices("name") {
        // Reject identifiers that merely end in "name" (`hostname`, `package_name`).
        let preceded_by_ident = line[..pos]
            .chars()
            .next_back()
            .map_or(false, |c| c.is_alphanumeric() || c == '_');
        if preceded_by_ident {
            continue;
        }

        // Require `=` straight after the identifier, and not the `==` operator.
        let rest = line[pos + keyword.len()..].trim_start();
        let value = match rest.strip_prefix('=') {
            Some(after_equals) if !after_equals.starts_with('=') => after_equals.trim_start(),
            _ => continue,
        };

        // The value must be a quoted literal; read up to the matching quote.
        let mut chars = value.chars();
        let quote = match chars.next() {
            Some(q @ ('"' | '\'')) => q,
            _ => continue,
        };
        let body = chars.as_str();
        if let Some(len) = body.find(quote) {
            return Some(body[..len].to_lowercase());
        }
    }

    None
}
/// Reads the lowercased `name` value from the `[metadata]` section of
/// `root/setup.cfg`.
///
/// This is a line-based scan. Only a key that is exactly `name` is accepted,
/// so keys such as `name_suffix` are ignored. Both `=` and `:` are accepted
/// as the key/value delimiter, and an entry with an empty value is skipped
/// rather than returned as an empty package name.
///
/// Returns `None` if the file cannot be read or no such entry exists.
fn find_setup_cfg_package(root: &std::path::Path) -> Option<String> {
    let content = std::fs::read_to_string(root.join("setup.cfg")).ok()?;
    let mut in_metadata_section = false;

    for line in content.lines() {
        let trimmed = line.trim();

        // Any section header switches the section on or off.
        if trimmed.starts_with('[') {
            in_metadata_section = trimmed == "[metadata]";
            continue;
        }
        if !in_metadata_section {
            continue;
        }

        if let Some(delimiter) = trimmed.find(|c: char| c == '=' || c == ':') {
            let key = trimmed[..delimiter].trim();
            let value = trimmed[delimiter + 1..].trim();
            if key == "name" && !value.is_empty() {
                return Some(value.to_lowercase());
            }
        }
    }

    None
}
/// Classifies `import_path`, optionally treating one top-level package as
/// internal.
///
/// The result is [`ImportType::Internal`] when the first dotted component
/// equals `package_prefix` or the path is relative (starts with `.`),
/// [`ImportType::Stdlib`] when the first component is a known standard
/// library module, and [`ImportType::External`] otherwise.
pub fn reclassify_python_import(
    import_path: &str,
    package_prefix: Option<&str>,
) -> ImportType {
    let belongs_to_package = package_prefix.map_or(false, |prefix| {
        import_path.split('.').next().unwrap_or(import_path) == prefix
    });

    if belongs_to_package || import_path.starts_with('.') {
        ImportType::Internal
    } else if is_python_stdlib(import_path) {
        ImportType::Stdlib
    } else {
        ImportType::External
    }
}
/// Reports whether the first dotted component of `path` is one of the
/// standard-library module names listed below.
///
/// The list is a fixed subset, so modules missing from it are reported as
/// non-stdlib. A path with an empty first component (for example a relative
/// import) is never stdlib.
fn is_python_stdlib(path: &str) -> bool {
    const STDLIB_MODULES: &[&str] = &[
        "os", "sys", "io", "re", "json", "csv", "xml", "html", "http", "urllib",
        "collections", "itertools", "functools", "operator", "pathlib", "glob",
        "tempfile", "shutil", "pickle", "shelve", "sqlite3", "zlib", "gzip",
        "time", "datetime", "calendar", "logging", "argparse", "configparser",
        "typing", "dataclasses", "enum", "abc", "contextlib", "weakref",
        "threading", "multiprocessing", "subprocess", "queue", "asyncio",
        "socket", "email", "base64", "hashlib", "hmac", "secrets", "uuid",
        "math", "random", "statistics", "decimal", "fractions",
        "unittest", "doctest", "pdb", "trace", "timeit",
    ];

    path.split('.')
        .next()
        .map_or(false, |head| STDLIB_MODULES.iter().any(|module| *module == head))
}
/// Classifies `import_path` without any knowledge of the project's packages.
///
/// Relative imports (leading `.`) are [`ImportType::Internal`]; a path whose
/// first dotted component is in the standard-library list below is
/// [`ImportType::Stdlib`]; everything else is [`ImportType::External`].
fn classify_python_import(import_path: &str) -> ImportType {
    const STDLIB_MODULES: &[&str] = &[
        "os", "sys", "io", "re", "json", "csv", "xml", "html", "http", "urllib",
        "collections", "itertools", "functools", "operator", "pathlib", "glob",
        "tempfile", "shutil", "pickle", "shelve", "sqlite3", "zlib", "gzip",
        "time", "datetime", "calendar", "logging", "argparse", "configparser",
        "typing", "dataclasses", "enum", "abc", "contextlib", "weakref",
        "threading", "multiprocessing", "subprocess", "queue", "asyncio",
        "socket", "email", "base64", "hashlib", "hmac", "secrets", "uuid",
        "math", "random", "statistics", "decimal", "fractions",
        "unittest", "doctest", "pdb", "trace", "timeit",
    ];

    if import_path.starts_with('.') {
        return ImportType::Internal;
    }

    let head = import_path.split('.').next().unwrap_or("");
    match STDLIB_MODULES.iter().any(|module| *module == head) {
        true => ImportType::Stdlib,
        false => ImportType::External,
    }
}
/// A Python package discovered from a config file (`pyproject.toml`,
/// `setup.py` or `setup.cfg`) under the index root.
#[derive(Debug, Clone)]
pub struct PythonPackage {
/// Package name as returned by `find_python_package_name` (lowercased).
pub name: String,
/// Directory containing the config file, relative to the index root when it
/// lies under that root; otherwise the directory path as found.
pub project_root: String,
/// Directory containing the config file, as yielded by the directory walk.
pub abs_project_root: std::path::PathBuf,
}
/// Walks `index_root` (honouring `.gitignore`, not following symlinks) and
/// returns every `pyproject.toml`, `setup.py` and `setup.cfg` found.
///
/// Config files located inside a `venv`, `.venv`, `site-packages`, `dist`,
/// `build` or `__pycache__` directory are skipped. The check inspects only
/// the path components below `index_root`, so an index root that itself
/// lives under a directory called e.g. `build` is still scanned, and the
/// platform's path separator does not matter.
///
/// # Errors
///
/// Fails if the directory walk reports an error for any entry.
pub fn find_all_python_configs(index_root: &std::path::Path) -> Result<Vec<std::path::PathBuf>> {
    use ignore::WalkBuilder;

    const CONFIG_FILE_NAMES: [&str; 3] = ["pyproject.toml", "setup.py", "setup.cfg"];
    const SKIPPED_DIRS: [&str; 6] = [
        "venv",
        ".venv",
        "site-packages",
        "dist",
        "build",
        "__pycache__",
    ];

    let mut config_files = Vec::new();
    let walker = WalkBuilder::new(index_root)
        .follow_links(false)
        .git_ignore(true)
        .build();

    for entry in walker {
        let entry = entry?;
        let path = entry.path();
        if !path.is_file() {
            continue;
        }

        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        if !CONFIG_FILE_NAMES.contains(&filename) {
            continue;
        }

        // Judge only the directories between the index root and the file.
        let relative = path.strip_prefix(index_root).unwrap_or(path);
        let in_skipped_dir = relative.parent().map_or(false, |dir| {
            dir.components().any(|component| {
                component
                    .as_os_str()
                    .to_str()
                    .map_or(false, |name| SKIPPED_DIRS.contains(&name))
            })
        });
        if in_skipped_dir {
            log::trace!("Skipping Python config in vendor/build directory: {:?}", path);
            continue;
        }

        config_files.push(path.to_path_buf());
    }

    log::debug!("Found {} Python config files", config_files.len());
    Ok(config_files)
}
/// Discovers every Python package under `index_root`.
///
/// Each directory that holds a Python config file is asked for its package
/// name, and directories that yield one become a [`PythonPackage`]. A
/// directory is processed once even when it contains several config files
/// (for example both `pyproject.toml` and `setup.cfg`), so it cannot
/// contribute duplicate entries.
///
/// # Errors
///
/// Fails if the config-file walk fails or a config path has no parent
/// directory.
pub fn parse_all_python_packages(index_root: &std::path::Path) -> Result<Vec<PythonPackage>> {
    let config_files = find_all_python_configs(index_root)?;
    if config_files.is_empty() {
        log::debug!("No Python config files found in {:?}", index_root);
        return Ok(Vec::new());
    }

    let mut packages = Vec::new();
    let mut seen_roots = std::collections::HashSet::new();
    let config_count = config_files.len();

    for config_path in &config_files {
        let project_root = config_path
            .parent()
            .ok_or_else(|| anyhow::anyhow!("Config file has no parent directory"))?;

        // The name lookup inspects all config files in the directory, so a
        // second config file in the same directory would repeat the result.
        if !seen_roots.insert(project_root) {
            continue;
        }

        if let Some(package_name) = find_python_package_name(project_root) {
            let relative_project_root = project_root
                .strip_prefix(index_root)
                .unwrap_or(project_root)
                .to_string_lossy()
                .to_string();
            log::debug!(
                "Found Python package '{}' at {:?}",
                package_name,
                relative_project_root
            );
            packages.push(PythonPackage {
                name: package_name,
                project_root: relative_project_root,
                abs_project_root: project_root.to_path_buf(),
            });
        }
    }

    log::info!(
        "Loaded {} Python packages from {} config files",
        packages.len(),
        config_count
    );
    Ok(packages)
}
/// Resolves a Python import to a file path.
///
/// Relative imports (leading `.`) are resolved against `current_file_path`.
/// Otherwise the first package in `packages` whose name equals the import's
/// first dotted component is used, and two candidates are considered:
/// `<module>.py` and `<module>/__init__.py`. The first candidate that exists
/// as a file under the package's `abs_project_root` is returned. When neither
/// exists the `<module>.py` form is returned, so a matching package always
/// produces a path.
///
/// The returned path is prefixed with the package's `project_root`. An empty
/// `project_root` adds no prefix, so the result never starts with `/`.
///
/// Returns `None` when no package matches, or when a relative import cannot
/// be resolved.
pub fn resolve_python_import_to_path(
    import_path: &str,
    packages: &[PythonPackage],
    current_file_path: Option<&str>,
) -> Option<String> {
    if import_path.starts_with('.') {
        return resolve_relative_python_import(import_path, current_file_path);
    }

    let first_component = import_path.split('.').next()?;
    let package = packages.iter().find(|p| p.name == first_component)?;

    let module_path = import_path.replace('.', "/");
    let suffixes = [
        format!("{}.py", module_path),
        format!("{}/__init__.py", module_path),
    ];
    let with_project_root = |suffix: &str| {
        if package.project_root.is_empty() {
            suffix.to_string()
        } else {
            format!("{}/{}", package.project_root, suffix)
        }
    };

    for suffix in &suffixes {
        let candidate = with_project_root(suffix);
        log::trace!("Checking Python module path: {}", candidate);
        if package.abs_project_root.join(suffix).is_file() {
            return Some(candidate);
        }
    }

    // Nothing on disk matched; fall back to the plain module file.
    Some(with_project_root(&suffixes[0]))
}
/// Resolves a relative import (`.mod`, `..pkg.mod`, `.`) against the file
/// that contains it.
///
/// One leading dot refers to the directory of `current_file_path`, and each
/// extra dot moves one directory up. A bare run of dots resolves to that
/// directory's `__init__.py`; otherwise the dotted module path becomes
/// `<dir>/<module>.py`. No filesystem check is made.
///
/// When the target directory is empty (the importing file sits at the root of
/// a relative path) no directory prefix is added, so the result never starts
/// with `/`.
///
/// Returns `None` when `current_file_path` is absent, `import_path` has no
/// leading dot, or the dots climb above the available path.
fn resolve_relative_python_import(
    import_path: &str,
    current_file_path: Option<&str>,
) -> Option<String> {
    let current_file = current_file_path?;

    let dots = import_path.chars().take_while(|&c| c == '.').count();
    if dots == 0 {
        return None;
    }

    // The first dot is the current directory; every further dot is one level up.
    let mut target_dir = std::path::Path::new(current_file).parent()?;
    for _ in 1..dots {
        target_dir = target_dir.parent()?;
    }

    let dir = target_dir.to_string_lossy();
    let in_target_dir = |relative: &str| {
        if dir.is_empty() {
            relative.to_string()
        } else {
            format!("{}/{}", dir, relative)
        }
    };

    let module_path = import_path.trim_start_matches('.');
    if module_path.is_empty() {
        return Some(in_target_dir("__init__.py"));
    }

    let candidate = in_target_dir(&format!("{}.py", module_path.replace('.', "/")));
    log::trace!("Checking relative Python import: {}", candidate);
    Some(candidate)
}
/// Unit tests for symbol extraction, package discovery, import resolution and
/// dependency extraction.
#[cfg(test)]
mod tests {
    use super::*;

    /// A single top-level function yields exactly one `Function` symbol.
    #[test]
    fn test_parse_function() {
        let source = r#"
def hello_world():
print("Hello, world!")
return True
"#;
        let symbols = parse("test.py", source).unwrap();
        assert_eq!(symbols.len(), 1);
        assert_eq!(symbols[0].symbol.as_deref(), Some("hello_world"));
        assert!(matches!(symbols[0].kind, SymbolKind::Function));
    }

    /// An `async def` is reported as a `Function` symbol.
    #[test]
    fn test_parse_async_function() {
        let source = r#"
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
"#;
        let symbols = parse("test.py", source).unwrap();
        assert_eq!(symbols.len(), 1);
        assert_eq!(symbols[0].symbol.as_deref(), Some("fetch_data"));
        assert!(matches!(symbols[0].kind, SymbolKind::Function));
    }

    /// A class definition yields one `Class` symbol.
    #[test]
    fn test_parse_class() {
        let source = r#"
class User:
def __init__(self, name, age):
self.name = name
self.age = age
"#;
        let symbols = parse("test.py", source).unwrap();
        let class_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Class))
            .collect();
        assert_eq!(class_symbols.len(), 1);
        assert_eq!(class_symbols[0].symbol.as_deref(), Some("User"));
    }

    /// Plain and decorated functions in a class body are reported as methods.
    #[test]
    fn test_parse_class_with_methods() {
        let source = r#"
class Calculator:
def add(self, a, b):
return a + b
def subtract(self, a, b):
return a - b
@staticmethod
def multiply(a, b):
return a * b
"#;
        let symbols = parse("test.py", source).unwrap();
        let method_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Method))
            .collect();
        assert_eq!(method_symbols.len(), 3);
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("add")));
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("subtract")));
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("multiply")));
    }

    /// `async def` inside a class is reported as a method.
    #[test]
    fn test_parse_async_method() {
        let source = r#"
class DataFetcher:
async def get_user(self, user_id):
return await fetch(f"/users/{user_id}")
async def get_all_users(self):
return await fetch("/users")
"#;
        let symbols = parse("test.py", source).unwrap();
        let method_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Method))
            .collect();
        assert_eq!(method_symbols.len(), 2);
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("get_user")));
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("get_all_users")));
    }

    /// Uppercase module-level assignments are reported as constants.
    #[test]
    fn test_parse_constants() {
        let source = r#"
MAX_SIZE = 100
DEFAULT_TIMEOUT = 30
API_URL = "https://api.example.com"
"#;
        let symbols = parse("test.py", source).unwrap();
        let const_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Constant))
            .collect();
        assert_eq!(const_symbols.len(), 3);
        assert!(const_symbols.iter().any(|s| s.symbol.as_deref() == Some("MAX_SIZE")));
        assert!(const_symbols.iter().any(|s| s.symbol.as_deref() == Some("DEFAULT_TIMEOUT")));
        assert!(const_symbols.iter().any(|s| s.symbol.as_deref() == Some("API_URL")));
    }

    /// Lambdas assigned to a name are reported as `Function` symbols.
    #[test]
    fn test_parse_lambda() {
        let source = r#"
square = lambda x: x * x
add = lambda a, b: a + b
"#;
        let symbols = parse("test.py", source).unwrap();
        let lambda_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Function))
            .collect();
        assert!(lambda_symbols.len() >= 2);
        assert!(lambda_symbols.iter().any(|s| s.symbol.as_deref() == Some("square")));
        assert!(lambda_symbols.iter().any(|s| s.symbol.as_deref() == Some("add")));
    }

    /// Methods wrapped in decorators are still reported as methods.
    #[test]
    fn test_parse_decorated_method() {
        let source = r#"
class WebService:
@property
def url(self):
return self._url
@classmethod
def from_config(cls, config):
return cls(config['url'])
@staticmethod
def validate_url(url):
return url.startswith('http')
"#;
        let symbols = parse("test.py", source).unwrap();
        let method_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Method))
            .collect();
        assert_eq!(method_symbols.len(), 3);
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("url")));
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("from_config")));
        assert!(method_symbols.iter().any(|s| s.symbol.as_deref() == Some("validate_url")));
    }

    /// A file mixing constants, a class, methods, a function and a lambda
    /// produces all of the expected symbol kinds.
    #[test]
    fn test_parse_mixed_symbols() {
        let source = r#"
API_KEY = "secret123"
MAX_RETRIES = 3
class APIClient:
def __init__(self, api_key):
self.api_key = api_key
async def request(self, endpoint):
return await self._fetch(endpoint)
@staticmethod
def build_url(endpoint):
return f"https://api.example.com/{endpoint}"
def create_client():
return APIClient(API_KEY)
process = lambda data: data.strip().lower()
"#;
        let symbols = parse("test.py", source).unwrap();
        assert!(symbols.len() >= 8);
        let kinds: Vec<&SymbolKind> = symbols.iter().map(|s| &s.kind).collect();
        assert!(kinds.contains(&&SymbolKind::Constant));
        assert!(kinds.contains(&&SymbolKind::Class));
        assert!(kinds.contains(&&SymbolKind::Method));
        assert!(kinds.contains(&&SymbolKind::Function));
    }

    /// Both the outer and the nested class are reported.
    #[test]
    fn test_parse_nested_class() {
        let source = r#"
class Outer:
class Inner:
def inner_method(self):
pass
def outer_method(self):
pass
"#;
        let symbols = parse("test.py", source).unwrap();
        let class_symbols: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Class))
            .collect();
        assert_eq!(class_symbols.len(), 2);
        assert!(class_symbols.iter().any(|s| s.symbol.as_deref() == Some("Outer")));
        assert!(class_symbols.iter().any(|s| s.symbol.as_deref() == Some("Inner")));
    }

    /// Assignments inside functions and methods are reported as variables.
    #[test]
    fn test_local_variables_included() {
        let source = r#"
def calculate(input):
local_var = input * 2
result = local_var + 10
return result
class Calculator:
def compute(self, value):
temp = value * 3
final = temp + 5
return final
"#;
        let symbols = parse("test.py", source).unwrap();
        let variables: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Variable))
            .collect();
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("local_var")));
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("result")));
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("temp")));
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("final")));
    }

    /// Uppercase module-level names are constants; other module-level names
    /// are variables.
    #[test]
    fn test_global_variables() {
        let source = r#"
# Global constants (uppercase)
MAX_SIZE = 100
DEFAULT_TIMEOUT = 30
# Global variables (non-uppercase)
database_url = "postgresql://localhost/mydb"
config = {"debug": True}
current_user = None
def get_config():
return config
"#;
        let symbols = parse("test.py", source).unwrap();
        let constants: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Constant))
            .collect();
        let variables: Vec<_> = symbols
            .iter()
            .filter(|s| matches!(s.kind, SymbolKind::Variable))
            .collect();
        assert!(constants.iter().any(|c| c.symbol.as_deref() == Some("MAX_SIZE")));
        assert!(constants.iter().any(|c| c.symbol.as_deref() == Some("DEFAULT_TIMEOUT")));
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("database_url")));
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("config")));
        assert!(variables.iter().any(|v| v.symbol.as_deref() == Some("current_user")));
    }

    /// Config files are found in nested projects and skipped inside `venv`.
    #[test]
    fn test_find_all_python_configs() {
        use std::fs;
        use tempfile::TempDir;

        let temp = TempDir::new().unwrap();
        let root = temp.path();

        let project1 = root.join("backend");
        fs::create_dir_all(&project1).unwrap();
        fs::write(project1.join("pyproject.toml"), "[project]\nname = \"backend\"").unwrap();

        let project2 = root.join("frontend/api");
        fs::create_dir_all(&project2).unwrap();
        fs::write(project2.join("setup.py"), "setup(name='api')").unwrap();

        let venv = root.join("venv");
        fs::create_dir_all(&venv).unwrap();
        fs::write(venv.join("setup.py"), "setup(name='should_skip')").unwrap();

        let configs = find_all_python_configs(root).unwrap();
        assert_eq!(configs.len(), 2);
        assert!(configs.iter().any(|p| p.ends_with("backend/pyproject.toml")));
        assert!(configs.iter().any(|p| p.ends_with("frontend/api/setup.py")));
    }

    /// Packages declared via `pyproject.toml` and `setup.py` are both loaded,
    /// with project roots relative to the index root.
    #[test]
    fn test_parse_all_python_packages() {
        use std::fs;
        use tempfile::TempDir;

        let temp = TempDir::new().unwrap();
        let root = temp.path();

        let project1 = root.join("services/auth");
        fs::create_dir_all(&project1).unwrap();
        fs::write(
            project1.join("pyproject.toml"),
            "[project]\nname = \"auth-service\"\n",
        )
        .unwrap();

        let project2 = root.join("services/api");
        fs::create_dir_all(&project2).unwrap();
        fs::write(project2.join("setup.py"), "setup(name=\"api-service\")").unwrap();

        let packages = parse_all_python_packages(root).unwrap();
        assert_eq!(packages.len(), 2);
        let names: Vec<_> = packages.iter().map(|p| p.name.as_str()).collect();
        assert!(names.contains(&"auth-service"));
        assert!(names.contains(&"api-service"));
        for package in &packages {
            assert!(package.project_root.starts_with("services/"));
            assert!(package.abs_project_root.ends_with(&package.project_root));
        }
    }

    /// An absolute import whose first component matches a package resolves to
    /// a path under that package.
    #[test]
    fn test_resolve_python_import_absolute() {
        use std::fs;
        use tempfile::TempDir;

        let temp = TempDir::new().unwrap();
        let root = temp.path();

        let myapp = root.join("myapp");
        fs::create_dir_all(myapp.join("models")).unwrap();
        fs::write(myapp.join("pyproject.toml"), "[project]\nname = \"myapp\"\n").unwrap();

        let packages = parse_all_python_packages(root).unwrap();
        assert_eq!(packages.len(), 1);

        let resolved = resolve_python_import_to_path("myapp.models.user", &packages, None);
        assert!(resolved.is_some());
        let path = resolved.unwrap();
        assert!(path.contains("myapp/models/user.py") || path.contains("myapp/models/user/__init__.py"));
    }

    /// Single-dot and double-dot relative imports resolve against the
    /// importing file's directory and its parent.
    #[test]
    fn test_resolve_python_import_relative() {
        let current_file = "myapp/views/admin.py";

        let resolved = resolve_python_import_to_path(".models", &[], Some(current_file));
        assert!(resolved.is_some());
        let path = resolved.unwrap();
        assert!(path.contains("myapp/views/models"));

        let resolved = resolve_python_import_to_path("..utils", &[], Some(current_file));
        assert!(resolved.is_some());
        let path = resolved.unwrap();
        assert!(path.contains("myapp/utils"));
    }

    /// A relative import with a dotted module path keeps the module path.
    #[test]
    fn test_resolve_python_import_relative_with_module() {
        let current_file = "myapp/views/dashboard/index.py";

        let resolved = resolve_python_import_to_path("..models.user", &[], Some(current_file));
        assert!(resolved.is_some());
        let path = resolved.unwrap();
        assert!(path.contains("models/user"));
    }

    /// An import whose first component matches no package resolves to `None`.
    #[test]
    fn test_resolve_python_import_not_found() {
        use std::fs;
        use tempfile::TempDir;

        let temp = TempDir::new().unwrap();
        let root = temp.path();

        let myapp = root.join("myapp");
        fs::create_dir_all(&myapp).unwrap();
        fs::write(myapp.join("pyproject.toml"), "[project]\nname = \"myapp\"\n").unwrap();

        let packages = parse_all_python_packages(root).unwrap();
        let resolved = resolve_python_import_to_path("other_package.module", &packages, None);
        assert!(resolved.is_none());
    }

    /// Only static import statements are extracted; `import_module`,
    /// `__import__` and `exec` calls are not.
    #[test]
    fn test_dynamic_imports_filtered() {
        let source = r#"
import os
import sys
from json import loads
from .models import User
# Dynamic imports - should be filtered out
import importlib
mod = importlib.import_module("some_module")
pkg = __import__("package")
exec("import dynamic")
"#;
        let deps = PythonDependencyExtractor::extract_dependencies(source).unwrap();
        assert_eq!(deps.len(), 5, "Should extract 5 static imports only");
        assert!(deps.iter().any(|d| d.imported_path == "os"));
        assert!(deps.iter().any(|d| d.imported_path == "sys"));
        assert!(deps.iter().any(|d| d.imported_path == "json"));
        assert!(deps.iter().any(|d| d.imported_path == ".models"));
        assert!(deps.iter().any(|d| d.imported_path == "importlib"));
        assert!(!deps.iter().any(|d| d.imported_path.contains("some_module")));
        assert!(!deps.iter().any(|d| d.imported_path.contains("package") && d.imported_path != "json"));
        assert!(!deps.iter().any(|d| d.imported_path.contains("dynamic")));
    }
}