use std::path::{Path, PathBuf};
use domain::model::{Edge, EdgeKind, Language};
use super::{ImportResolver, ResolveContext};
use crate::ParseResult;
/// Top-level names of Python standard-library modules.
///
/// Only the first dotted segment of an import is compared against this table
/// (e.g. `os` for `os.path`). The table is a union across Python versions, so
/// it contains both modules that were removed in recent releases and modules
/// that were added recently.
///
/// The entries are grouped roughly alphabetically but are NOT in strict byte
/// order (`cProfile`, the underscore-prefixed names), so the table must be
/// searched linearly rather than with a binary search.
static STDLIB_MODULES: &[&str] = &[
    "abc",
    "aifc",
    "argparse",
    "array",
    "ast",
    "asynchat",
    "asyncio",
    "asyncore",
    "atexit",
    "audioop",
    "base64",
    "bdb",
    "binascii",
    "binhex",
    "bisect",
    "builtins",
    "bz2",
    "calendar",
    "cgi",
    "cgitb",
    "chunk",
    "cmath",
    "cmd",
    "code",
    "codecs",
    "codeop",
    "collections",
    "colorsys",
    "compileall",
    "concurrent",
    "configparser",
    "contextlib",
    "contextvars",
    "copy",
    "copyreg",
    "cProfile",
    "crypt",
    "csv",
    "ctypes",
    "curses",
    "dataclasses",
    "datetime",
    "dbm",
    "decimal",
    "difflib",
    "dis",
    "distutils",
    "doctest",
    "email",
    "encodings",
    "ensurepip",
    "enum",
    "errno",
    "faulthandler",
    "fcntl",
    "filecmp",
    "fileinput",
    "fnmatch",
    "formatter",
    "fractions",
    "ftplib",
    "functools",
    "gc",
    "getopt",
    "getpass",
    "gettext",
    "glob",
    "graphlib",
    "grp",
    "gzip",
    "hashlib",
    "heapq",
    "hmac",
    "html",
    "http",
    "idlelib",
    "imaplib",
    "imghdr",
    "imp",
    "importlib",
    "inspect",
    "io",
    "ipaddress",
    "itertools",
    "json",
    "keyword",
    "lib2to3",
    "linecache",
    "locale",
    "logging",
    "lzma",
    "mailbox",
    "mailcap",
    "marshal",
    "math",
    "mimetypes",
    "mmap",
    "modulefinder",
    "msilib",
    "msvcrt",
    "multiprocessing",
    "netrc",
    "nis",
    "nntplib",
    "ntpath",
    "numbers",
    "operator",
    "optparse",
    "os",
    "ossaudiodev",
    "parser",
    "pathlib",
    "pdb",
    "pickle",
    "pickletools",
    "pipes",
    "pkgutil",
    "platform",
    "plistlib",
    "poplib",
    "posix",
    "posixpath",
    "pprint",
    "profile",
    "pstats",
    "pty",
    "pwd",
    "py_compile",
    "pyclbr",
    "pydoc",
    "queue",
    "quopri",
    "random",
    "re",
    "readline",
    "reprlib",
    "resource",
    "rlcompleter",
    "runpy",
    "sched",
    "secrets",
    "select",
    "selectors",
    "shelve",
    "shlex",
    "shutil",
    "signal",
    "site",
    "smtpd",
    "smtplib",
    "sndhdr",
    "socket",
    "socketserver",
    "spwd",
    "sqlite3",
    "sre_compile",
    "sre_constants",
    "sre_parse",
    "ssl",
    "stat",
    "statistics",
    "string",
    "stringprep",
    "struct",
    "subprocess",
    "sunau",
    "symtable",
    "sys",
    "sysconfig",
    "syslog",
    "tabnanny",
    "tarfile",
    "telnetlib",
    "tempfile",
    "termios",
    "test",
    "textwrap",
    "threading",
    "time",
    "timeit",
    "tkinter",
    "token",
    "tokenize",
    "tomllib",
    "trace",
    "traceback",
    "tracemalloc",
    "tty",
    "turtle",
    "turtledemo",
    "types",
    "typing",
    "unicodedata",
    "unittest",
    "urllib",
    "uu",
    "uuid",
    "venv",
    "warnings",
    "wave",
    "weakref",
    "webbrowser",
    "winreg",
    "winsound",
    "wsgiref",
    "xdrlib",
    "xml",
    "xmlrpc",
    "zipapp",
    "zipfile",
    "zipimport",
    "zlib",
    "zoneinfo",
    "_thread",
    "__future__",
    "_abc",
    "_collections_abc",
];
/// Returns `true` when `first_segment` is listed in [`STDLIB_MODULES`].
///
/// The comparison is an exact, case-sensitive string match, so callers are
/// expected to pass only the leading dotted segment of an import specifier.
fn is_stdlib(first_segment: &str) -> bool {
    STDLIB_MODULES
        .iter()
        .any(|module| *module == first_segment)
}
/// Maps an extension-less module path onto a concrete file in `file_tree`.
///
/// `candidate` is a module location without a suffix, e.g. `/project/app/models`.
/// Two spellings are probed, in this order:
///
/// 1. `<candidate>/__init__.py` — the module is a package;
/// 2. `<candidate>.py` — the module is a single file.
///
/// The package is tried first because Python's path-based finder prefers a
/// package directory over a same-named module file when both exist.
///
/// Returns the matching entry, or `None` when neither spelling is in
/// `file_tree`. Matching is by exact path equality; the filesystem is not
/// touched.
fn try_resolve(candidate: &Path, file_tree: &[PathBuf]) -> Option<PathBuf> {
    let init_path = candidate.join("__init__.py");
    if file_tree.contains(&init_path) {
        return Some(init_path);
    }

    // Append ".py" to the final component instead of using `with_extension`,
    // which would *replace* an existing suffix (`pkg.v2` -> `pkg.py`) and
    // could match an unrelated file.
    let mut module_file = candidate.file_name()?.to_os_string();
    module_file.push(".py");
    let py_path = candidate.with_file_name(module_file);
    if file_tree.contains(&py_path) {
        Some(py_path)
    } else {
        None
    }
}
/// Resolves a Python import specifier to a file listed in `file_tree`.
///
/// * **Relative specifiers** (leading `.`): one dot means the directory of
///   `current_file`; every additional dot climbs one directory further. The
///   remaining dotted path, if any, is appended to that directory. If there
///   are more dots than ancestor directories the import cannot be satisfied
///   and `None` is returned rather than silently resolving against the
///   topmost directory.
/// * **Absolute specifiers**: imports whose first dotted segment is a
///   standard-library module are skipped (`None`). Otherwise the dotted path
///   is looked up under each entry of `package_roots` in order, and finally
///   under `project_root`.
///
/// In both cases the final candidate is handed to `try_resolve`, which
/// decides which concrete file (if any) it corresponds to.
///
/// Returns `None` when the import does not map to a file in `file_tree`.
fn resolve_python_import(
    specifier: &str,
    current_file: &Path,
    project_root: &Path,
    file_tree: &[PathBuf],
    package_roots: &[PathBuf],
) -> Option<PathBuf> {
    if specifier.starts_with('.') {
        let module_path = specifier.trim_start_matches('.');
        // '.' is a single byte, so the byte-length difference is the dot count.
        let dot_count = specifier.len() - module_path.len();

        let mut base_dir = current_file.parent().unwrap_or(current_file);
        // The first dot is the current directory; only the extra dots climb.
        for _ in 1..dot_count {
            // Running out of ancestors means the import points above anything
            // we can see — report it as unresolvable.
            base_dir = base_dir.parent()?;
        }

        let candidate = if module_path.is_empty() {
            base_dir.to_path_buf()
        } else {
            base_dir.join(module_path.replace('.', "/"))
        };
        return try_resolve(&candidate, file_tree);
    }

    let first_segment = specifier.split('.').next().unwrap_or(specifier);
    if is_stdlib(first_segment) {
        return None;
    }

    let rel = PathBuf::from(specifier.replace('.', "/"));
    package_roots
        .iter()
        .find_map(|package_root| try_resolve(&package_root.join(&rel), file_tree))
        .or_else(|| try_resolve(&project_root.join(&rel), file_tree))
}
/// Project-level settings used when resolving Python imports.
pub struct PythonConfig {
    /// Directories searched, in order, for absolute imports before the
    /// project root itself is tried.
    pub package_roots: Vec<PathBuf>,
}

impl PythonConfig {
    /// Builds a configuration by inspecting `project_root` on disk.
    ///
    /// If `<project_root>/src` exists and is a directory it becomes the only
    /// package root; otherwise the list of package roots is empty.
    pub fn load(project_root: &Path) -> Self {
        let src_dir = project_root.join("src");
        let package_roots = if src_dir.is_dir() {
            vec![src_dir]
        } else {
            Vec::new()
        };
        Self { package_roots }
    }
}
/// Import resolver for Python source files.
pub struct PythonResolver {
    /// Settings supplying the package roots consulted during resolution.
    config: PythonConfig,
}

impl PythonResolver {
    /// Creates a resolver that uses the given configuration.
    pub fn new(config: PythonConfig) -> Self {
        Self { config }
    }
}
impl ImportResolver for PythonResolver {
    /// This resolver handles Python only.
    fn languages(&self) -> &[Language] {
        &[Language::Python]
    }

    /// Turns the imports recorded in `parse_result` into dependency edges.
    ///
    /// Each import is resolved against `context.file_tree`; imports that do
    /// not resolve to a file are dropped without error. Every emitted edge has
    /// `file_path` as its source and the resolved file as its target, both
    /// rendered as lossy UTF-8 strings. Imports flagged `is_type_only` become
    /// [`EdgeKind::ConditionalImport`]; all others become
    /// [`EdgeKind::ImportsFrom`].
    ///
    /// This implementation always returns `Ok`.
    fn resolve(
        &self,
        file_path: &Path,
        parse_result: &ParseResult,
        context: &ResolveContext,
    ) -> domain::error::Result<Vec<Edge>> {
        let source = file_path.to_string_lossy().into_owned();

        let edges: Vec<Edge> = parse_result
            .imports
            .iter()
            .filter_map(|import| {
                let target_path = resolve_python_import(
                    &import.specifier,
                    file_path,
                    &context.project_root,
                    &context.file_tree,
                    &self.config.package_roots,
                )?;

                let kind = match import.is_type_only {
                    true => EdgeKind::ConditionalImport,
                    false => EdgeKind::ImportsFrom,
                };

                Some(Edge {
                    kind,
                    source: source.clone(),
                    target: target_path.to_string_lossy().into_owned(),
                    metadata: None,
                })
            })
            .collect();

        Ok(edges)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::path::{Path, PathBuf};

    use domain::model::EdgeKind;

    use super::{PythonConfig, PythonResolver};
    use crate::resolver::{ImportResolver, ResolveContext};
    use crate::{ImportName, ParseResult, RawImport};

    /// Resolver with no additional package roots.
    fn make_resolver() -> PythonResolver {
        let config = PythonConfig {
            package_roots: Vec::new(),
        };
        PythonResolver::new(config)
    }

    /// Context rooted at `project_root` whose file tree is exactly `file_tree`.
    fn make_context(project_root: &str, file_tree: Vec<&str>) -> ResolveContext {
        let file_tree = file_tree.into_iter().map(PathBuf::from).collect();
        ResolveContext {
            project_root: PathBuf::from(project_root),
            parsed_files: HashMap::new(),
            file_tree,
        }
    }

    /// Import of `specifier` with every other field left at its default.
    fn bare_import(specifier: &'static str) -> RawImport {
        RawImport {
            specifier: specifier.into(),
            ..Default::default()
        }
    }

    /// Import of a single, un-aliased, non-type `name` from `specifier`.
    fn named_import(specifier: &'static str, name: &'static str) -> RawImport {
        RawImport {
            specifier: specifier.into(),
            names: vec![ImportName {
                name: name.into(),
                alias: None,
                is_type: false,
            }],
            ..Default::default()
        }
    }

    /// Runs a fresh resolver over `imports` as if they appeared in `file`.
    fn resolve_from(
        file: &str,
        imports: Vec<RawImport>,
        context: &ResolveContext,
    ) -> Vec<domain::model::Edge> {
        let parse_result = ParseResult {
            imports,
            ..Default::default()
        };
        make_resolver()
            .resolve(Path::new(file), &parse_result, context)
            .unwrap()
    }

    #[test]
    fn resolves_relative_import_single_dot() {
        let context = make_context(
            "/project",
            vec!["/project/app/models.py", "/project/app/views.py"],
        );

        let edges = resolve_from(
            "/project/app/views.py",
            vec![named_import(".models", "User")],
            &context,
        );

        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].kind, EdgeKind::ImportsFrom);
        assert_eq!(edges[0].source, "/project/app/views.py");
        assert_eq!(edges[0].target, "/project/app/models.py");
    }

    #[test]
    fn resolves_relative_import_double_dot() {
        let context = make_context(
            "/project",
            vec!["/project/utils.py", "/project/app/views.py"],
        );

        let edges = resolve_from(
            "/project/app/views.py",
            vec![named_import("..utils", "helper")],
            &context,
        );

        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].kind, EdgeKind::ImportsFrom);
        assert_eq!(edges[0].source, "/project/app/views.py");
        assert_eq!(edges[0].target, "/project/utils.py");
    }

    #[test]
    fn skips_stdlib_import() {
        let context = make_context("/project", vec![]);

        let edges = resolve_from("/project/main.py", vec![bare_import("os")], &context);

        assert!(edges.is_empty(), "stdlib import should produce no edge");
    }

    #[test]
    fn skips_stdlib_submodule_import() {
        let context = make_context("/project", vec![]);

        let edges = resolve_from("/project/main.py", vec![bare_import("os.path")], &context);

        assert!(
            edges.is_empty(),
            "stdlib submodule import should produce no edge"
        );
    }

    #[test]
    fn creates_conditional_import_for_type_checking() {
        let context = make_context(
            "/project",
            vec!["/project/app/models.py", "/project/app/views.py"],
        );
        let type_only = RawImport {
            is_type_only: true,
            ..named_import(".models", "User")
        };

        let edges = resolve_from("/project/app/views.py", vec![type_only], &context);

        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].kind, EdgeKind::ConditionalImport);
    }

    #[test]
    fn resolves_absolute_local_import() {
        let context = make_context("/project", vec!["/project/utils/helpers.py"]);

        let edges = resolve_from(
            "/project/main.py",
            vec![bare_import("utils.helpers")],
            &context,
        );

        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].kind, EdgeKind::ImportsFrom);
        assert_eq!(edges[0].target, "/project/utils/helpers.py");
    }

    #[test]
    fn resolves_package_import_to_init() {
        let context = make_context("/project", vec!["/project/mypackage/__init__.py"]);

        let edges = resolve_from("/project/main.py", vec![bare_import("mypackage")], &context);

        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].target, "/project/mypackage/__init__.py");
    }

    #[test]
    fn unresolvable_import_produces_no_edge() {
        let context = make_context("/project", vec![]);

        let edges = resolve_from(
            "/project/main.py",
            vec![bare_import("third_party_lib")],
            &context,
        );

        assert!(edges.is_empty());
    }

    #[test]
    fn resolves_multiple_imports() {
        let context = make_context("/project", vec!["/project/models.py", "/project/utils.py"]);
        // Two local modules plus one stdlib module that must be skipped.
        let imports = vec![
            bare_import("models"),
            bare_import("utils"),
            bare_import("sys"),
        ];

        let edges = resolve_from("/project/main.py", imports, &context);

        assert_eq!(edges.len(), 2);
    }
}
#[cfg(test)]
mod config_tests {
    use super::*;

    /// A project that contains `src/` reports it as the sole package root.
    #[test]
    fn python_config_detects_src_dir() {
        let project = tempfile::tempdir().unwrap();
        let src_dir = project.path().join("src");
        std::fs::create_dir_all(&src_dir).unwrap();

        let PythonConfig { package_roots } = PythonConfig::load(project.path());

        assert_eq!(package_roots.len(), 1);
        assert_eq!(package_roots[0], src_dir);
    }

    /// A project without `src/` has no package roots.
    #[test]
    fn python_config_empty_without_src() {
        let project = tempfile::tempdir().unwrap();

        let PythonConfig { package_roots } = PythonConfig::load(project.path());

        assert!(package_roots.is_empty());
    }
}