use crate::graph::DependencyGraph;
use crate::imports::ModuleOrigin;
use crate::pyproject;
use anyhow::Result;
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::{HashMap, HashSet};
use std::sync::OnceLock;
/// Complete outcome of [`analyze_external_dependencies`].
#[derive(Debug)]
pub struct ExternalAnalysisResult {
/// Per-package usage records, ordered by descending usage count and then by package name.
pub frequency_analysis: Vec<DependencyUsage>,
/// Aggregate figures derived from `frequency_analysis`.
pub summary: ExternalDependencySummary,
/// Sorted package names that are used but missing from the declared dependency list.
pub undeclared_dependencies: Vec<String>,
/// Sorted declared dependency names that no used package resolved to.
pub unused_dependencies: Vec<String>,
/// Number of entries returned by `pyproject::get_used_externals()`.
pub declared_externals_count: usize,
}
/// Usage record for a single external root package.
#[derive(Debug)]
pub struct DependencyUsage {
/// Root package name (the first dotted segment of the imported module path).
pub package_name: String,
/// Length of `used_by_modules` after sorting and de-duplication.
pub usage_count: usize,
/// Sorted, de-duplicated canonical paths of the internal modules that depend on the
/// package; contains the literal marker `"(declared)"` when the package was listed
/// among the manually declared externals.
pub used_by_modules: Vec<String>,
}
/// Aggregate figures reported alongside the per-package usage list.
#[derive(Debug)]
pub struct ExternalDependencySummary {
/// Number of distinct packages in the frequency analysis.
pub total_used_packages: usize,
}
/// Runs the complete external-dependency analysis for `graph`.
///
/// The manually listed externals from `pyproject::get_used_externals` are merged
/// with the external imports found in the graph, and the resulting package list
/// is compared against `pyproject::get_declared_dependencies`.
///
/// # Errors
///
/// Propagates any error raised by the `pyproject` lookups, by usage collection,
/// or by the gap analysis.
pub fn analyze_external_dependencies(graph: &DependencyGraph) -> Result<ExternalAnalysisResult> {
    let manual_externals = pyproject::get_used_externals()?;
    let usage = collect_package_usage(graph, &manual_externals)?;

    let declared = pyproject::get_declared_dependencies()?;
    let (undeclared_dependencies, unused_dependencies) =
        analyze_dependency_gaps(&usage, &declared)?;

    // Capture the counts before `usage` is moved into the result.
    let total_used_packages = usage.len();
    let declared_externals_count = manual_externals.len();

    Ok(ExternalAnalysisResult {
        frequency_analysis: usage,
        summary: ExternalDependencySummary {
            total_used_packages,
        },
        undeclared_dependencies,
        unused_dependencies,
        declared_externals_count,
    })
}
fn collect_package_usage(graph: &DependencyGraph, used_externals: &[String]) -> Result<Vec<DependencyUsage>> {
let stdlib_modules = get_python_standard_library_modules();
let mut package_usage: HashMap<String, Vec<String>> = HashMap::new();
for package_name in used_externals {
if !stdlib_modules.contains(package_name) {
package_usage
.entry(package_name.clone())
.or_default()
.push("(declared)".to_string());
}
}
for module in graph.all_modules() {
if module.origin == ModuleOrigin::Internal {
let dependencies = graph.get_dependencies_with_types(module)?;
for (dep_module, _dep_type) in dependencies {
if let Some(external_module) = graph
.all_modules()
.find(|m| m.canonical_path == dep_module && m.origin == ModuleOrigin::External)
{
let package_name = extract_root_package_name(&external_module.canonical_path);
if stdlib_modules.contains(&package_name) {
continue;
}
package_usage
.entry(package_name)
.or_default()
.push(module.canonical_path.clone());
}
}
}
}
let mut frequency_analysis: Vec<DependencyUsage> = package_usage
.into_iter()
.map(|(package_name, mut used_by_modules)| {
used_by_modules.sort();
used_by_modules.dedup();
DependencyUsage {
package_name,
usage_count: used_by_modules.len(),
used_by_modules,
}
})
.collect();
frequency_analysis.sort_by(|a, b| {
b.usage_count
.cmp(&a.usage_count)
.then_with(|| a.package_name.cmp(&b.package_name))
});
Ok(frequency_analysis)
}
/// Compares the packages that are actually used with the declared dependency list.
///
/// Each used import name is first resolved to a distribution name through the
/// mapping built by `build_complete_mapping`. Names are then compared in their
/// normalized form (lower-cased, with runs of `-`, `_` and `.` collapsed to a
/// single `-`, as in the Python packaging name-normalization rules), so that
/// spelling variants such as `PyYAML` / `pyyaml` or `typing_extensions` /
/// `typing-extensions` are treated as the same project.
///
/// Returns `(undeclared, unused)`, both sorted and free of duplicates:
/// * `undeclared` holds resolved names (as resolved) with no declared counterpart;
/// * `unused` holds declared names (as declared) with no used counterpart.
///
/// # Errors
///
/// Propagates any error from `build_complete_mapping`.
fn analyze_dependency_gaps(
    frequency_analysis: &[DependencyUsage],
    declared_deps: &[String],
) -> Result<(Vec<String>, Vec<String>)> {
    /// Canonical comparison key for a distribution name.
    fn normalize(name: &str) -> String {
        let mut key = String::with_capacity(name.len());
        let mut previous_was_separator = false;
        for ch in name.chars() {
            if matches!(ch, '-' | '_' | '.') {
                if !previous_was_separator {
                    key.push('-');
                }
                previous_was_separator = true;
            } else {
                key.extend(ch.to_lowercase());
                previous_was_separator = false;
            }
        }
        key
    }

    let mapping = build_complete_mapping(declared_deps)?;
    let resolved_used_deps: HashSet<String> = frequency_analysis
        .iter()
        .map(|dep| resolve_import_to_package_name(&mapping, &dep.package_name))
        .collect();

    let declared_keys: HashSet<String> = declared_deps.iter().map(|dep| normalize(dep)).collect();
    let used_keys: HashSet<String> = resolved_used_deps
        .iter()
        .map(|dep| normalize(dep))
        .collect();

    let mut undeclared_dependencies: Vec<String> = resolved_used_deps
        .into_iter()
        .filter(|dep| !declared_keys.contains(&normalize(dep)))
        .collect();
    undeclared_dependencies.sort();

    let mut unused_dependencies: Vec<String> = declared_deps
        .iter()
        .filter(|dep| !used_keys.contains(&normalize(dep)))
        .cloned()
        .collect();
    unused_dependencies.sort();
    // The declared list may repeat a name (for example across dependency groups).
    unused_dependencies.dedup();

    Ok((undeclared_dependencies, unused_dependencies))
}
/// Process-wide cache for the Python standard-library module names.
static PYTHON_STDLIB_MODULES: OnceLock<HashSet<String>> = OnceLock::new();

/// Returns the set of Python standard-library module names.
///
/// On first use the function runs `python`, then `python3`, asking the
/// interpreter to print `sys.stdlib_module_names` one name per line. The first
/// candidate that exits successfully with a non-empty listing wins; a candidate
/// that cannot be started, exits with a failure status (for instance because the
/// interpreter does not provide that attribute), or prints nothing is skipped.
/// The outcome is cached for the rest of the process.
///
/// When no interpreter yields a listing, a warning is written to standard error
/// (so it does not mix with report output on standard output) and an empty set
/// is returned, which means no import will be classified as standard library.
fn get_python_standard_library_modules() -> &'static HashSet<String> {
    PYTHON_STDLIB_MODULES.get_or_init(|| {
        for python_cmd in ["python", "python3"] {
            let output = std::process::Command::new(python_cmd)
                .args([
                    "-c",
                    "import sys; print('\\n'.join(sys.stdlib_module_names))",
                ])
                .output();
            let Ok(output) = output else { continue };
            if !output.status.success() {
                continue;
            }

            let names: HashSet<String> = String::from_utf8_lossy(&output.stdout)
                .lines()
                .map(str::trim)
                .filter(|line| !line.is_empty())
                .map(String::from)
                .collect();
            if !names.is_empty() {
                return names;
            }
        }

        eprintln!("Warning: Could not detect Python stdlib modules. Install Python or ensure it's in PATH.");
        HashSet::new()
    })
}
/// Returns the text before the first `.` of `module_path`, or the whole path
/// when it contains no dot.
fn extract_root_package_name(module_path: &str) -> String {
    let root = match module_path.split_once('.') {
        Some((head, _rest)) => head,
        None => module_path,
    };
    root.to_owned()
}
/// Lookup tables that translate a Python import name into a distribution name.
#[derive(Debug, Clone)]
struct PackageImportMapping {
    /// Entries loaded by `load_static_package_mappings`; consulted first.
    static_mappings: HashMap<String, String>,
    /// Entries registered at runtime through `add_mapping`; consulted second.
    api_mappings: HashMap<String, String>,
}

impl PackageImportMapping {
    /// Creates a mapping that holds the static table and no runtime entries.
    ///
    /// # Errors
    ///
    /// Propagates any error from `load_static_package_mappings`.
    fn new() -> Result<Self> {
        let static_mappings = load_static_package_mappings()?;
        Ok(Self {
            static_mappings,
            api_mappings: HashMap::new(),
        })
    }

    /// Resolves `import_name` to a package name.
    ///
    /// The static table is searched before the runtime table; within each table
    /// the lower-cased name is tried before the name as given. When nothing
    /// matches, `import_name` itself is returned.
    fn resolve_import_to_package(&self, import_name: &str) -> String {
        let lowered = import_name.to_lowercase();
        for table in [&self.static_mappings, &self.api_mappings] {
            let hit = table.get(&lowered).or_else(|| table.get(import_name));
            if let Some(package_name) = hit {
                return package_name.clone();
            }
        }
        import_name.to_string()
    }

    /// Registers `import_name -> package_name` in the runtime table, under the
    /// name as given and, when it differs, under its lower-cased form as well.
    fn add_mapping(&mut self, import_name: String, package_name: String) {
        let lowered = import_name.to_lowercase();
        if lowered != import_name {
            self.api_mappings.insert(lowered, package_name.clone());
        }
        self.api_mappings.insert(import_name, package_name);
    }
}
/// Builds the import-name mapping for the given declared packages.
///
/// Starts from `PackageImportMapping::new()` and, for every declared package,
/// registers each import name reported by `query_pypi_for_imports`. A progress
/// bar is shown while the packages are processed and cleared afterwards. A
/// package whose lookup returns an error contributes no entries.
///
/// # Errors
///
/// Propagates any error from `PackageImportMapping::new()`.
fn build_complete_mapping(declared_packages: &[String]) -> Result<PackageImportMapping> {
    let mut mapping = PackageImportMapping::new()?;
    if declared_packages.is_empty() {
        // Nothing to look up, so no progress bar is created either.
        return Ok(mapping);
    }

    let progress = ProgressBar::new(declared_packages.len() as u64);
    let bar_style = ProgressStyle::default_bar()
        .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos:>7}/{len:7} {msg}")
        .unwrap()
        .progress_chars("##-");
    progress.set_style(bar_style);
    progress.set_message("Fetching package mappings");

    for package_name in declared_packages {
        let import_names = query_pypi_for_imports(package_name).unwrap_or_default();
        for import_name in import_names {
            mapping.add_mapping(import_name, package_name.clone());
        }
        progress.inc(1);
    }

    progress.finish_and_clear();
    Ok(mapping)
}
fn resolve_import_to_package_name(mapping: &PackageImportMapping, import_name: &str) -> String {
mapping.resolve_import_to_package(import_name)
}
/// Deserialization target for the embedded `package_mappings.json` document.
#[derive(serde::Deserialize)]
struct PackageMappingsJson {
/// The `import_to_package` object of the document; returned as the static mapping table.
import_to_package: HashMap<String, String>,
}
fn load_static_package_mappings() -> Result<HashMap<String, String>> {
let json_content = include_str!("package_mappings.json");
let mappings: PackageMappingsJson = serde_json::from_str(json_content)?;
Ok(mappings.import_to_package)
}
/// The part of the JSON body fetched by `query_pypi_for_imports` that this module reads.
#[derive(serde::Deserialize)]
struct PyPIResponse {
/// The `info` object of the response.
info: PyPIInfo,
}
/// The `info` object of a PyPI project response, reduced to the field used here.
#[derive(serde::Deserialize)]
struct PyPIInfo {
/// Import names listed under `top_level`; deserializes to an empty list when the
/// key is missing, in which case `query_pypi_for_imports` falls back to a name
/// derived from the package name.
/// NOTE(review): confirm that the PyPI JSON API actually provides `info.top_level`;
/// if it does not, the fallback is the only path ever taken.
#[serde(default)]
top_level: Vec<String>,
}
/// Looks up the import names that `package_name` provides via
/// `https://pypi.org/pypi/<package_name>/json`.
///
/// The request is attempted up to `MAX_RETRIES + 1` times with a 5 second
/// timeout each. A successful response with a non-empty `info.top_level` list
/// yields that list. In every other terminal situation (empty list, client-error
/// status, or retries exhausted) the result is a single guessed name: the
/// package name with `-` replaced by `_`.
///
/// Transport failures and non-success, non-client-error statuses are retried
/// after a pause of 100 ms times the attempt number; a body that fails to
/// deserialize is retried without a pause.
///
/// # Errors
///
/// Returns an error only when the HTTP client cannot be constructed.
fn query_pypi_for_imports(package_name: &str) -> Result<Vec<String>> {
    const MAX_RETRIES: u32 = 2;

    // Best-effort guess used whenever PyPI gives no usable answer.
    let guessed = || vec![package_name.replace('-', "_")];

    let http = reqwest::blocking::Client::builder()
        .timeout(std::time::Duration::from_secs(5))
        .build()?;
    let endpoint = format!("https://pypi.org/pypi/{}/json", package_name);

    for attempt in 0..=MAX_RETRIES {
        let out_of_retries = attempt == MAX_RETRIES;
        match http.get(&endpoint).send() {
            Ok(reply) if reply.status().is_success() => match reply.json::<PyPIResponse>() {
                Ok(parsed) => {
                    let names = parsed.info.top_level;
                    return Ok(if names.is_empty() { guessed() } else { names });
                }
                Err(_) if out_of_retries => return Ok(guessed()),
                // Undecodable body: try again straight away.
                Err(_) => {}
            },
            // A 4xx status will not improve on retry.
            Ok(reply) if reply.status().is_client_error() => return Ok(guessed()),
            _ if out_of_retries => return Ok(guessed()),
            _ => {
                let pause_ms = 100 * u64::from(attempt + 1);
                std::thread::sleep(std::time::Duration::from_millis(pause_ms));
            }
        }
    }

    Ok(guessed())
}
pub mod formatters {
use super::*;
pub fn format_text_grouped(result: &ExternalAnalysisResult) -> String {
let mut output = String::new();
output.push_str("External Dependencies Analysis:\n\n");
if result.frequency_analysis.is_empty() {
output.push_str("No external dependencies found.\n");
return output;
}
output.push_str("=== Frequency Analysis ===\n");
let high_usage: Vec<_> = result
.frequency_analysis
.iter()
.filter(|dep| dep.usage_count >= 30)
.collect();
let medium_usage: Vec<_> = result
.frequency_analysis
.iter()
.filter(|dep| dep.usage_count >= 5 && dep.usage_count < 30)
.collect();
let low_usage: Vec<_> = result
.frequency_analysis
.iter()
.filter(|dep| dep.usage_count < 5)
.collect();
if !high_usage.is_empty() {
output.push_str("High usage (10+ modules):\n");
for dep in high_usage {
output.push_str(&format!(
" {} (used by {} modules)\n",
dep.package_name, dep.usage_count
));
}
output.push('\n');
}
if !medium_usage.is_empty() {
output.push_str("Medium usage (5-9 modules):\n");
for dep in medium_usage {
output.push_str(&format!(
" {} (used by {} modules)\n",
dep.package_name, dep.usage_count
));
}
output.push('\n');
}
if !low_usage.is_empty() {
output.push_str("Low usage (1-4 modules):\n");
for dep in low_usage {
output.push_str(&format!(
" {} (used by {} modules)\n",
dep.package_name, dep.usage_count
));
}
output.push('\n');
}
output.push_str("=== Summary ===\n");
output.push_str(&format!(
"Total external packages used: {}\n",
result.summary.total_used_packages
));
if result.declared_externals_count > 0 {
output.push_str(&format!(
"Manually declared externals: {}\n",
result.declared_externals_count
));
}
if !result.undeclared_dependencies.is_empty() {
output.push_str("\n=== Undeclared Dependencies ===\n");
output.push_str("(Used in code but not declared in pyproject.toml)\n");
for dep in &result.undeclared_dependencies {
output.push_str(&format!(" {}\n", dep));
}
}
if !result.unused_dependencies.is_empty() {
output.push_str("\n=== Unused Dependencies ===\n");
output.push_str("(Declared in pyproject.toml but not used in code)\n");
for dep in &result.unused_dependencies {
output.push_str(&format!(" {}\n", dep));
}
}
if !result.undeclared_dependencies.is_empty() || !result.unused_dependencies.is_empty() {
output.push_str("\n=== Dependency Sync Status ===\n");
output.push_str(&format!(
"Undeclared dependencies: {}\n",
result.undeclared_dependencies.len()
));
output.push_str(&format!(
"Unused dependencies: {}\n",
result.unused_dependencies.len()
));
} else {
output.push_str("\n=== Dependency Sync Status ===\n");
output.push_str("✓ All used dependencies are properly declared in pyproject.toml\n");
output.push_str("✓ No unused dependencies found\n");
}
output
}
}
// Unit tests for the external-dependency analysis.
//
// NOTE(review): several tests go through `analyze_external_dependencies`, which reads
// state set up via `pyproject::init_for_test` and calls
// `get_python_standard_library_modules` (spawns a Python interpreter). When declared
// dependencies exist, `build_complete_mapping` also calls `query_pypi_for_imports`,
// which issues HTTP requests. Confirm the test environment provides what is needed.
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::DependencyGraph;
use crate::graph::DependencyType;
use crate::imports::{ModuleIdentifier, ModuleOrigin};
// Builds a `ModuleIdentifier` with the given canonical path and origin.
fn create_test_module_id(name: &str, origin: ModuleOrigin) -> ModuleIdentifier {
ModuleIdentifier {
origin,
canonical_path: name.to_string(),
}
}
// Dotted module paths reduce to their first segment.
#[test]
fn test_extract_root_package_name() {
assert_eq!(extract_root_package_name("numpy"), "numpy");
assert_eq!(extract_root_package_name("numpy.testing"), "numpy");
assert_eq!(extract_root_package_name("numpy.testing.utils"), "numpy");
assert_eq!(extract_root_package_name("scipy.stats"), "scipy");
}
// Two internal modules importing `numpy`, `numpy.testing` and `pandas`.
// Nothing is written into the temp dir, so presumably no dependencies are declared
// and every used package is reported as undeclared.
#[test]
fn test_analyze_external_dependencies() {
use crate::pyproject::{init_for_test, reset_for_test};
use tempfile::TempDir;
reset_for_test();
let temp_dir = TempDir::new().unwrap();
init_for_test(temp_dir.path());
let mut graph = DependencyGraph::new();
let internal1 = create_test_module_id("myapp.main", ModuleOrigin::Internal);
let internal2 = create_test_module_id("myapp.utils", ModuleOrigin::Internal);
let numpy_id = create_test_module_id("numpy", ModuleOrigin::External);
let numpy_testing_id = create_test_module_id("numpy.testing", ModuleOrigin::External);
let pandas_id = create_test_module_id("pandas", ModuleOrigin::External);
graph.add_module(internal1.clone());
graph.add_module(internal2.clone());
graph.add_module(numpy_id.clone());
graph.add_module(numpy_testing_id.clone());
graph.add_module(pandas_id.clone());
graph
.add_dependency(&internal1, &numpy_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &pandas_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal2, &numpy_testing_id, DependencyType::Imports)
.unwrap();
let result = analyze_external_dependencies(&graph).unwrap();
// `numpy` and `numpy.testing` collapse into the single root package `numpy`.
assert_eq!(result.summary.total_used_packages, 2); assert_eq!(result.frequency_analysis.len(), 2);
let numpy_usage = result
.frequency_analysis
.iter()
.find(|dep| dep.package_name == "numpy")
.unwrap();
let pandas_usage = result
.frequency_analysis
.iter()
.find(|dep| dep.package_name == "pandas")
.unwrap();
// numpy is used by both internal modules, pandas only by `myapp.main`.
assert_eq!(numpy_usage.usage_count, 2); assert_eq!(pandas_usage.usage_count, 1);
assert_eq!(result.undeclared_dependencies.len(), 2); assert!(
result
.undeclared_dependencies
.contains(&"numpy".to_string())
);
assert!(
result
.undeclared_dependencies
.contains(&"pandas".to_string())
);
assert!(result.unused_dependencies.is_empty()); }
// Imports of `sys`, `os` and `json` must not show up in the analysis.
// This only holds when a Python interpreter is found: otherwise
// `get_python_standard_library_modules` returns an empty set.
// NOTE(review): unlike the other tests, this one calls neither `reset_for_test` nor
// `init_for_test` — confirm it does not depend on state left behind by another test.
#[test]
fn test_stdlib_modules_filtered_out() {
let mut graph = DependencyGraph::new();
let internal1 = create_test_module_id("myapp.main", ModuleOrigin::Internal);
let numpy_id = create_test_module_id("numpy", ModuleOrigin::External);
let sys_id = create_test_module_id("sys", ModuleOrigin::External);
let os_id = create_test_module_id("os", ModuleOrigin::External);
let json_id = create_test_module_id("json", ModuleOrigin::External);
graph.add_module(internal1.clone());
graph.add_module(numpy_id.clone());
graph.add_module(sys_id.clone());
graph.add_module(os_id.clone());
graph.add_module(json_id.clone());
graph
.add_dependency(&internal1, &numpy_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &sys_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &os_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &json_id, DependencyType::Imports)
.unwrap();
let result = analyze_external_dependencies(&graph).unwrap();
assert_eq!(result.summary.total_used_packages, 1);
assert_eq!(result.frequency_analysis.len(), 1);
assert_eq!(result.frequency_analysis[0].package_name, "numpy");
}
// Packages listed in `.used-externals.txt` are merged with the imports found in the graph.
#[test]
fn test_used_externals_integration() {
use crate::pyproject::{init_for_test, reset_for_test};
use std::fs;
use tempfile::TempDir;
reset_for_test();
let temp_dir = TempDir::new().unwrap();
let used_externals_content = r#"# Manually declared packages
setuptools
wheel
redis
tensorflow # This one won't be used in code
"#;
fs::write(temp_dir.path().join(".used-externals.txt"), used_externals_content).unwrap();
init_for_test(temp_dir.path());
let mut graph = DependencyGraph::new();
let internal1 = create_test_module_id("myapp.main", ModuleOrigin::Internal);
// `setuptools` is added to the graph below but no internal module depends on it.
let numpy_id = create_test_module_id("numpy", ModuleOrigin::External); let redis_id = create_test_module_id("redis", ModuleOrigin::External); let setuptools_id = create_test_module_id("setuptools", ModuleOrigin::External);
graph.add_module(internal1.clone());
graph.add_module(numpy_id.clone());
graph.add_module(redis_id.clone());
graph.add_module(setuptools_id.clone());
graph
.add_dependency(&internal1, &numpy_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &redis_id, DependencyType::Imports)
.unwrap();
let result = analyze_external_dependencies(&graph).unwrap();
// Five packages: numpy (imported only), redis (imported and listed), plus
// setuptools, wheel and tensorflow (listed only).
assert_eq!(result.summary.total_used_packages, 5);
assert_eq!(result.declared_externals_count, 4);
let package_names: Vec<&str> = result
.frequency_analysis
.iter()
.map(|dep| dep.package_name.as_str())
.collect();
assert!(package_names.contains(&"numpy")); assert!(package_names.contains(&"redis")); assert!(package_names.contains(&"setuptools")); assert!(package_names.contains(&"wheel")); assert!(package_names.contains(&"tensorflow"));
let redis_usage = result
.frequency_analysis
.iter()
.find(|dep| dep.package_name == "redis")
.unwrap();
// redis counts twice: the "(declared)" marker plus `myapp.main`.
assert_eq!(redis_usage.usage_count, 2);
let setuptools_usage = result
.frequency_analysis
.iter()
.find(|dep| dep.package_name == "setuptools")
.unwrap();
// setuptools is only listed, so its sole user is the "(declared)" marker.
assert_eq!(setuptools_usage.usage_count, 1); assert!(setuptools_usage.used_by_modules.contains(&"(declared)".to_string()));
let numpy_usage = result
.frequency_analysis
.iter()
.find(|dep| dep.package_name == "numpy")
.unwrap();
// numpy is only imported, so it carries no "(declared)" marker.
assert_eq!(numpy_usage.usage_count, 1); assert!(!numpy_usage.used_by_modules.contains(&"(declared)".to_string()));
}
// Compares used packages with the dependencies declared in a Poetry-style pyproject.toml.
// NOTE(review): with declared dependencies present, `build_complete_mapping` queries
// PyPI for each of them, so this test appears to require (or at least attempt)
// network access — confirm.
#[test]
fn test_dependency_diff_analysis() {
use crate::pyproject::{init_for_test, reset_for_test};
use std::fs;
use tempfile::TempDir;
reset_for_test();
let temp_dir = TempDir::new().unwrap();
let pyproject_content = r#"
[tool.poetry.dependencies]
python = ">=3.10,<3.11"
numpy = "^1.24.3"
pandas = "^2.0.3"
unused-package = "^1.0.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
"#;
fs::write(temp_dir.path().join("pyproject.toml"), pyproject_content).unwrap();
init_for_test(temp_dir.path());
let mut graph = DependencyGraph::new();
let internal1 = create_test_module_id("myapp.main", ModuleOrigin::Internal);
let numpy_id = create_test_module_id("numpy", ModuleOrigin::External);
let torch_id = create_test_module_id("torch", ModuleOrigin::External); let sklearn_id = create_test_module_id("sklearn", ModuleOrigin::External);
graph.add_module(internal1.clone());
graph.add_module(numpy_id.clone());
graph.add_module(torch_id.clone());
graph.add_module(sklearn_id.clone());
graph
.add_dependency(&internal1, &numpy_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &torch_id, DependencyType::Imports)
.unwrap();
graph
.add_dependency(&internal1, &sklearn_id, DependencyType::Imports)
.unwrap();
let result = analyze_external_dependencies(&graph).unwrap();
assert_eq!(result.summary.total_used_packages, 3);
assert_eq!(result.frequency_analysis.len(), 3);
assert!(
result
.undeclared_dependencies
.contains(&"torch".to_string())
);
// The import name `sklearn` is expected to be reported under its distribution name
// (presumably via the embedded package_mappings.json — verify).
assert!(
result
.undeclared_dependencies
.contains(&"scikit-learn".to_string())
);
assert!(
!result
.undeclared_dependencies
.contains(&"numpy".to_string())
); assert_eq!(result.undeclared_dependencies.len(), 2);
assert!(result.unused_dependencies.contains(&"pandas".to_string()));
assert!(result.unused_dependencies.contains(&"pytest".to_string()));
assert!(
result
.unused_dependencies
.contains(&"unused-package".to_string())
);
// Exactly three unused entries: the `python` constraint is expected not to be
// reported (presumably dropped by `pyproject::get_declared_dependencies` — verify).
assert!(!result.unused_dependencies.contains(&"numpy".to_string())); assert_eq!(result.unused_dependencies.len(), 3);
}
// Requires a Python interpreter that can list `sys.stdlib_module_names`;
// without one the returned set is empty and the positive assertions fail.
#[test]
fn test_get_python_standard_library_modules() {
let stdlib_modules = get_python_standard_library_modules();
assert!(stdlib_modules.contains("sys"));
assert!(stdlib_modules.contains("os"));
assert!(stdlib_modules.contains("json"));
assert!(stdlib_modules.contains("collections"));
assert!(!stdlib_modules.contains("numpy"));
assert!(!stdlib_modules.contains("pandas"));
assert!(!stdlib_modules.contains("torch"));
}
}