use std::path::{Path, PathBuf};
use thiserror::Error;
/// Errors reported by the Python dependency extractor.
#[derive(Debug, Error)]
pub enum ExtractionError {
/// The Python parser could not be configured or produced no syntax tree.
/// The payload is a human-readable description of the failure.
#[error("failed to parse source: {0}")]
ParseError(String),
/// A tree-sitter query was rejected.
/// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("invalid tree-sitter query: {0}")]
QueryError(String),
/// A module path could not be mapped to a file-system path.
#[error("cannot resolve module path '{module}' from '{source_file}': {reason}")]
ResolutionError {
/// Dotted module path that was being resolved.
module: String,
/// File containing the import that triggered the resolution.
source_file: PathBuf,
/// Human-readable explanation of why resolution failed.
reason: String,
},
}
/// One import statement extracted from Python source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportInfo {
/// Dotted module name exactly as written, without any leading relative dots.
/// Empty when a relative import names no module.
pub module_path: String,
/// Names imported by a `from` import (original names, not aliases).
/// Empty for plain `import` statements.
pub symbols: Vec<String>,
/// `true` when the statement contains a wildcard import node.
pub is_wildcard: bool,
/// Number of dots in the relative import prefix; `0` for absolute imports.
pub relative_level: usize,
/// `(original name, alias)` pairs for every `... as ...` clause.
pub aliases: Vec<(String, String)>,
}
/// Extracts import statements from Python source and maps them to file paths.
///
/// The type carries no data; build one with `new()` or `Default::default()`.
pub struct PythonDependencyExtractor {
// Private unit field: keeps the struct from being built with a struct
// literal outside this module.
_private: (),
}
impl PythonDependencyExtractor {
pub fn new() -> Self {
Self { _private: () }
}
/// Parses Python `source` and returns every import statement found in it,
/// in source order.
///
/// `_file_path` is accepted for interface symmetry but is not used here.
///
/// # Errors
///
/// Returns [`ExtractionError::ParseError`] when the Python grammar cannot be
/// installed on the parser or when the parser yields no tree.
pub fn extract_imports(
    &self,
    source: &str,
    _file_path: &Path,
) -> Result<Vec<ImportInfo>, ExtractionError> {
    // Empty input: nothing to find, so skip building a parser altogether.
    if source.is_empty() {
        return Ok(Vec::new());
    }

    let python = thread_language::parsers::language_python();
    let mut parser = tree_sitter::Parser::new();
    if let Err(err) = parser.set_language(&python) {
        return Err(ExtractionError::ParseError(err.to_string()));
    }

    let tree = match parser.parse(source, None) {
        Some(tree) => tree,
        None => {
            return Err(ExtractionError::ParseError(
                "tree-sitter returned None".into(),
            ));
        }
    };

    let mut found = Vec::new();
    Self::walk_imports(tree.root_node(), source.as_bytes(), &mut found);
    Ok(found)
}
/// Visits `node` and its descendants in pre-order, appending one
/// [`ImportInfo`] per import statement to `imports`.
///
/// `import_statement` and `import_from_statement` nodes are handed to their
/// dedicated extractors and are not descended into.
///
/// The walk is iterative (driven by a tree cursor) rather than recursive, so
/// a deeply nested syntax tree cannot exhaust the call stack.
fn walk_imports(node: tree_sitter::Node<'_>, source: &[u8], imports: &mut Vec<ImportInfo>) {
    // A cursor created from `node` never leaves that node's subtree, so the
    // loop below terminates once the subtree has been exhausted.
    let mut cursor = node.walk();
    loop {
        let current = cursor.node();
        let is_import = match current.kind() {
            "import_statement" => {
                Self::extract_import_statement(current, source, imports);
                true
            }
            "import_from_statement" => {
                Self::extract_import_from_statement(current, source, imports);
                true
            }
            _ => false,
        };

        // Descend only into non-import nodes that actually have children.
        if !is_import && cursor.goto_first_child() {
            continue;
        }

        // Move to the next sibling, climbing towards the start node until
        // one exists; failing to climb means the whole subtree was visited.
        while !cursor.goto_next_sibling() {
            if !cursor.goto_parent() {
                return;
            }
        }
    }
}
/// Appends one [`ImportInfo`] per module named in a plain `import`
/// statement node.
///
/// Direct `dotted_name` children become entries with no aliases;
/// `aliased_import` children are delegated to
/// [`Self::parse_bare_aliased_import`]. All other children are ignored.
fn extract_import_statement(
    node: tree_sitter::Node<'_>,
    source: &[u8],
    imports: &mut Vec<ImportInfo>,
) {
    let mut walker = node.walk();
    let parsed = node.children(&mut walker).filter_map(|part| match part.kind() {
        "dotted_name" => part.utf8_text(source).ok().map(|text| ImportInfo {
            module_path: text.to_string(),
            symbols: Vec::new(),
            is_wildcard: false,
            relative_level: 0,
            aliases: Vec::new(),
        }),
        "aliased_import" => Self::parse_bare_aliased_import(part, source),
        _ => None,
    });
    imports.extend(parsed);
}
/// Builds an [`ImportInfo`] from an `aliased_import` node that appears in a
/// plain `import` statement.
///
/// Returns `None` when the node lacks a `name` or `alias` field, or when
/// either field's text is not valid UTF-8.
fn parse_bare_aliased_import(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<ImportInfo> {
    let module_node = node.child_by_field_name("name")?;
    let binding_node = node.child_by_field_name("alias")?;
    let module_text = module_node.utf8_text(source).ok()?.to_string();
    let binding_text = binding_node.utf8_text(source).ok()?.to_string();

    Some(ImportInfo {
        module_path: module_text.clone(),
        symbols: Vec::new(),
        is_wildcard: false,
        relative_level: 0,
        // The alias pair records the module's own name alongside its binding.
        aliases: vec![(module_text, binding_text)],
    })
}
/// Appends exactly one [`ImportInfo`] describing a `from ... import ...`
/// statement node.
///
/// Children are processed in order. The first `relative_import` or
/// `dotted_name` child is taken as the module reference; every later
/// `dotted_name` is an imported symbol. `aliased_import` children contribute
/// both a symbol and an alias pair, and a `wildcard_import` child sets the
/// wildcard flag.
fn extract_import_from_statement(
    node: tree_sitter::Node<'_>,
    source: &[u8],
    imports: &mut Vec<ImportInfo>,
) {
    let mut info = ImportInfo {
        module_path: String::new(),
        symbols: Vec::new(),
        is_wildcard: false,
        relative_level: 0,
        aliases: Vec::new(),
    };
    // Flips once the module reference has been consumed; after that, any
    // `dotted_name` is an imported symbol rather than the module.
    let mut seen_module = false;

    let mut walker = node.walk();
    for part in node.children(&mut walker) {
        match part.kind() {
            "relative_import" => {
                let mut inner = part.walk();
                for piece in part.children(&mut inner) {
                    let kind = piece.kind();
                    if kind == "import_prefix" {
                        if let Ok(dots) = piece.utf8_text(source) {
                            // One level per dot in the prefix.
                            info.relative_level = dots.matches('.').count();
                        }
                    } else if kind == "dotted_name" {
                        if let Ok(text) = piece.utf8_text(source) {
                            info.module_path = text.to_string();
                        }
                    }
                }
                seen_module = true;
            }
            "dotted_name" => {
                let text = part.utf8_text(source).ok().map(str::to_string);
                if seen_module {
                    if let Some(symbol) = text {
                        info.symbols.push(symbol);
                    }
                } else {
                    if let Some(module) = text {
                        info.module_path = module;
                    }
                    // The module slot counts as consumed even if its text
                    // could not be read.
                    seen_module = true;
                }
            }
            "wildcard_import" => info.is_wildcard = true,
            "aliased_import" => {
                if let Some((original, alias)) = Self::parse_from_aliased_symbol(part, source) {
                    info.symbols.push(original.clone());
                    info.aliases.push((original, alias));
                }
            }
            _ => {}
        }
    }

    imports.push(info);
}
/// Reads the `(original name, alias)` pair from an `aliased_import` node
/// inside a `from` import.
///
/// Returns `None` when the node lacks a `name` or `alias` field, or when
/// either field's text is not valid UTF-8.
fn parse_from_aliased_symbol(
    node: tree_sitter::Node<'_>,
    source: &[u8],
) -> Option<(String, String)> {
    let original_node = node.child_by_field_name("name")?;
    let alias_node = node.child_by_field_name("alias")?;
    let original = original_node.utf8_text(source).ok()?;
    let alias = alias_node.utf8_text(source).ok()?;
    Some((original.to_owned(), alias.to_owned()))
}
/// Maps a Python module reference to a file-system path.
///
/// * Absolute imports (`relative_level == 0`): dots in `module_path` become
///   `/` and `.py` is appended; the result is not anchored to any directory.
/// * Relative imports: resolution starts in the directory containing
///   `source_file` and climbs one directory for every level beyond the
///   first. An empty `module_path` resolves to `__init__.py` in that
///   directory; otherwise the module is resolved beneath it.
///
/// Resolution is purely lexical; the file system is never consulted.
///
/// # Errors
///
/// Returns [`ExtractionError::ResolutionError`] when
/// * an absolute import has an empty `module_path` (this previously yielded
///   the meaningless path `.py`),
/// * `source_file` has no parent directory, or
/// * `relative_level` climbs above the top of `source_file`'s path.
pub fn resolve_module_path(
    &self,
    source_file: &Path,
    module_path: &str,
    relative_level: usize,
) -> Result<PathBuf, ExtractionError> {
    // Single place that builds the error so every failure carries the same
    // module / source-file context.
    let resolution_error = |reason: String| ExtractionError::ResolutionError {
        module: module_path.to_string(),
        source_file: source_file.to_path_buf(),
        reason,
    };

    if relative_level == 0 {
        if module_path.is_empty() {
            return Err(resolution_error(
                "absolute import has an empty module path".into(),
            ));
        }
        return Ok(PathBuf::from(format!(
            "{}.py",
            module_path.replace('.', "/")
        )));
    }

    let source_dir = source_file
        .parent()
        .ok_or_else(|| resolution_error("source file has no parent directory".into()))?;

    // Level 1 is the source file's own directory; each additional level
    // moves one directory up. Borrowed paths avoid an allocation per level.
    let mut base = source_dir;
    for _ in 1..relative_level {
        base = base.parent().ok_or_else(|| {
            resolution_error(format!(
                "cannot navigate {} levels up from {}",
                relative_level,
                source_dir.display()
            ))
        })?;
    }

    if module_path.is_empty() {
        // A relative import with no module name refers to the package itself.
        return Ok(base.join("__init__.py"));
    }
    Ok(base.join(format!("{}.py", module_path.replace('.', "/"))))
}
/// Extracts the imports of `source` and turns each resolvable one into an
/// `Import` dependency edge from `file_path` to the resolved module path.
///
/// Imports whose module path cannot be resolved are skipped without
/// reporting an error.
///
/// # Errors
///
/// Propagates any error returned by [`Self::extract_imports`].
pub fn extract_dependency_edges(
    &self,
    source: &str,
    file_path: &Path,
) -> Result<Vec<super::super::types::DependencyEdge>, ExtractionError> {
    let imports = self.extract_imports(source, file_path)?;

    let edges: Vec<_> = imports
        .iter()
        .filter_map(|import| {
            // Best effort: an unresolvable import simply produces no edge.
            self.resolve_module_path(file_path, &import.module_path, import.relative_level)
                .ok()
        })
        .map(|target| {
            super::super::types::DependencyEdge::new(
                file_path.to_path_buf(),
                target,
                super::super::types::DependencyType::Import,
            )
        })
        .collect();

    Ok(edges)
}
}
impl Default for PythonDependencyExtractor {
fn default() -> Self {
Self::new()
}
}