use anyhow::Result;
use clap::{Parser, Subcommand, ValueEnum};
use std::fs;
use std::path::{Path, PathBuf};
use dissolve_python::migrate_ruff;
use dissolve_python::type_introspection_context::TypeIntrospectionContext;
use dissolve_python::unified_visitor::{UnifiedResult, UnifiedVisitor};
use dissolve_python::TypeIntrospectionMethod;
use dissolve_python::{
check_file, collect_deprecated_from_dependencies,
collect_deprecated_from_dependencies_with_paths,
};
// Top-level command-line interface of the `dissolve` binary, parsed by clap.
//
// Plain `//` comments are used here on purpose: clap's derive macros turn
// `///` doc comments into help text, which would change the program's output.
#[derive(Parser)]
#[command(name = "dissolve")]
#[command(about = "Dissolve - Replace deprecated API usage")]
#[command(version)]
struct Cli {
// Global `--debug` flag. `main` uses it to select the "debug" log filter when
// no usable filter can be read from the environment.
#[arg(long)]
debug: bool,
// The subcommand to execute; see `Commands`.
#[command(subcommand)]
command: Commands,
}
// Subcommands understood by the CLI. Each variant is handled by one arm of the
// `match` in `main`.
//
// Plain `//` comments are used on purpose: clap would turn `///` doc comments
// into help text and thereby change the program's output.
#[derive(Subcommand)]
enum Commands {
// Rewrites call sites of deprecated functions. Without `--write` or `--check`
// the migrated content of changed files is printed to stdout.
Migrate {
// Positional inputs; `main` expands them into concrete files via `expand_paths`.
paths: Vec<String>,
// NOTE(review): parsed, but `main` currently ignores this flag.
#[arg(short, long)]
module: bool,
// Write changed content back to the files. Shares the `mode` argument group
// with `--check` and `--interactive` (presumably making them mutually
// exclusive - confirm against clap's group semantics).
#[arg(short, long, group = "mode")]
write: bool,
// Only report which files need migration; `main` exits with status 1 if any do.
#[arg(long, group = "mode")]
check: bool,
// Route each file through `interactive_migrate_file_content` instead of
// `migrate_file_content`.
#[arg(long, group = "mode")]
interactive: bool,
// Type-introspection backend; converted into `TypeIntrospectionMethod`.
#[arg(long, value_enum, default_value = "pyright-mypy")]
type_introspection: TypeIntrospectionMethodArg,
},
// Removes deprecated functions from source files via
// `UnifiedVisitor::new_for_removal`.
Cleanup {
// Positional inputs; expanded via `expand_paths`.
paths: Vec<String>,
// NOTE(review): parsed, but `main` currently ignores this flag.
#[arg(short, long)]
module: bool,
// Write cleaned content back to the files (argument group `cleanup_mode`).
#[arg(short, long, group = "cleanup_mode")]
write: bool,
// Forwarded verbatim to `UnifiedVisitor::new_for_removal`; presumably a
// version bound for what gets removed - TODO confirm.
#[arg(long)]
before: Option<String>,
// Forwarded verbatim to `UnifiedVisitor::new_for_removal`; presumably selects
// every deprecated function - TODO confirm.
#[arg(long)]
all: bool,
// Only report which files need cleanup; `main` exits with status 1 if any do
// (argument group `cleanup_mode`).
#[arg(long, group = "cleanup_mode")]
check: bool,
// Forwarded verbatim to `UnifiedVisitor::new_for_removal`.
#[arg(long)]
current_version: Option<String>,
},
// Runs `check_file` on every input file and reports errors; `main` exits with
// status 1 if any file has errors.
Check {
// Positional inputs; expanded via `expand_paths`.
paths: Vec<String>,
// NOTE(review): parsed, but `main` currently ignores this flag.
#[arg(short, long)]
module: bool,
},
// Lists the deprecated functions found in the input files and, through
// `collect_deprecated_from_dependencies`, in their dependencies.
Info {
// Positional inputs; expanded via `expand_paths`.
paths: Vec<String>,
// NOTE(review): parsed, but `main` currently ignores this flag.
#[arg(short, long)]
module: bool,
},
}
// Command-line spelling of the type-introspection backends. Each variant maps
// one-to-one onto a `TypeIntrospectionMethod` variant through the `From` impl
// defined for this type.
//
// Plain `//` comments are used on purpose: clap would turn `///` doc comments
// into help text for the possible values.
#[derive(ValueEnum, Clone)]
enum TypeIntrospectionMethodArg {
// Selected with `--type-introspection pyright-lsp`.
#[value(name = "pyright-lsp")]
PyrightLsp,
// Selected with `--type-introspection mypy-daemon`.
#[value(name = "mypy-daemon")]
MypyDaemon,
// Selected with `--type-introspection pyright-mypy`; this is the default used
// by the `Migrate` subcommand.
#[value(name = "pyright-mypy")]
PyrightWithMypyFallback,
}
impl From<TypeIntrospectionMethodArg> for TypeIntrospectionMethod {
fn from(arg: TypeIntrospectionMethodArg) -> Self {
match arg {
TypeIntrospectionMethodArg::PyrightLsp => TypeIntrospectionMethod::PyrightLsp,
TypeIntrospectionMethodArg::MypyDaemon => TypeIntrospectionMethod::MypyDaemon,
TypeIntrospectionMethodArg::PyrightWithMypyFallback => {
TypeIntrospectionMethod::PyrightWithMypyFallback
}
}
}
}
/// Resolves one command-line path argument into a list of files.
///
/// * An existing file with a `.py` extension is returned as-is.
/// * A directory is walked recursively with `visit_python_files`; the result
///   is sorted.
/// * An argument containing `*` or `?` is expanded as a glob pattern, keeping
///   only `.py` matches, sorted.
/// * Anything else is returned unchanged as a single-element list, so that the
///   caller decides what to do with it.
///
/// `_as_module` is accepted for interface compatibility and is not used.
///
/// # Errors
///
/// Fails when the directory walk fails, when the glob pattern is invalid, or
/// when a glob match cannot be read.
fn discover_python_files(path: &str, _as_module: bool) -> Result<Vec<PathBuf>> {
    fn has_py_extension(candidate: &Path) -> bool {
        candidate.extension().is_some_and(|ext| ext == "py")
    }

    let target = Path::new(path);

    if target.is_file() && has_py_extension(target) {
        return Ok(vec![target.to_path_buf()]);
    }

    if target.is_dir() {
        let mut collected = Vec::new();
        visit_python_files(target, &mut collected)?;
        collected.sort();
        return Ok(collected);
    }

    let pattern = target.to_string_lossy();
    let looks_like_glob = pattern.chars().any(|c| matches!(c, '*' | '?'));
    if !looks_like_glob {
        return Ok(vec![target.to_path_buf()]);
    }

    // Errors are let through the filter so that `collect` stops at the first
    // unreadable match, in iteration order.
    let mut matched = glob::glob(&pattern)?
        .filter(|entry| match entry {
            Ok(found) => has_py_extension(found),
            Err(_) => true,
        })
        .collect::<Result<Vec<PathBuf>, _>>()?;
    matched.sort();
    Ok(matched)
}
/// Recursively appends every `.py` file below `dir` to `files`.
///
/// Sub-directories whose name starts with `.` or equals `__pycache__` are not
/// entered. Nothing is appended when `dir` is not a directory. Entries are
/// appended in the order the operating system lists them; callers that need a
/// stable order must sort.
///
/// # Errors
///
/// Propagates any I/O error raised while listing a directory.
fn visit_python_files(dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
    if !dir.is_dir() {
        return Ok(());
    }

    for listing in fs::read_dir(dir)? {
        let candidate = listing?.path();

        if candidate.is_dir() {
            // A directory without a final component is skipped as well.
            let skip = match candidate.file_name() {
                Some(raw) => {
                    let label = raw.to_string_lossy();
                    label.starts_with('.') || label == "__pycache__"
                }
                None => true,
            };
            if !skip {
                visit_python_files(&candidate, files)?;
            }
        } else if candidate.extension().is_some_and(|ext| ext == "py") {
            files.push(candidate);
        }
    }

    Ok(())
}
/// Expands every command-line path argument with `discover_python_files` and
/// returns the combined list without duplicates, keeping first-seen order.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while expanding an
/// argument.
fn expand_paths(paths: &[String], as_module: bool) -> Result<Vec<PathBuf>> {
    use indexmap::IndexSet;

    let unique = paths.iter().try_fold(
        IndexSet::new(),
        |mut seen, raw| -> Result<IndexSet<PathBuf>> {
            for file in discover_python_files(raw, as_module)? {
                seen.insert(file);
            }
            Ok(seen)
        },
    )?;

    Ok(unique.into_iter().collect::<Vec<_>>())
}
/// Derives a dotted Python module name for `file_path`.
///
/// Starting at the file's directory, every ancestor that contains an
/// `__init__.py` contributes its directory name as a package component; the
/// walk stops at the first ancestor without one. The file stem is appended
/// as the last component unless it is `__init__`.
///
/// When no component is found at all, the bare file stem is returned (or an
/// empty string if the path has no stem).
fn detect_module_name(file_path: &Path) -> String {
    let stem = file_path
        .file_stem()
        .map(|raw| raw.to_string_lossy().into_owned());

    // Package names are gathered innermost-first and reversed afterwards.
    let mut packages: Vec<String> = Vec::new();
    let mut dir = file_path.parent().unwrap_or(Path::new("."));
    while dir.join("__init__.py").exists() {
        if let Some(name) = dir.file_name() {
            packages.push(name.to_string_lossy().into_owned());
        }
        match dir.parent() {
            Some(up) if up != dir => dir = up,
            _ => break,
        }
    }

    let mut parts: Vec<String> = packages.into_iter().rev().collect();
    if let Some(name) = stem.as_deref().filter(|name| *name != "__init__") {
        parts.push(name.to_owned());
    }

    if parts.is_empty() {
        stem.unwrap_or_default()
    } else {
        parts.join(".")
    }
}
/// Runs `process_func` over every file and reports or applies the outcome.
///
/// `process_func` returns the pair `(original, result)` for a file. Depending
/// on the mode:
///
/// * `check` - prints whether the file needs `operation_name`; nothing is
///   written. `check` takes precedence over `write`.
/// * `write` - overwrites the file when the result differs from the original.
/// * neither - prints the result to stdout under a `# <operation>: <file>`
///   header.
///
/// Returns the process exit code: `1` when `check` is set and at least one
/// file needs changes, `0` otherwise.
///
/// # Errors
///
/// Stops at the first error returned by `process_func` or by writing a file.
fn process_files_common<F>(
    files: &[PathBuf],
    mut process_func: F,
    check: bool,
    write: bool,
    operation_name: &str,
) -> Result<i32>
where
    F: FnMut(&PathBuf) -> Result<(String, String)>,
{
    // Only ever incremented in check mode.
    let mut pending = 0usize;

    for file in files {
        let (before, after) = process_func(file)?;
        let changed = before != after;
        let shown = file.display();

        match (check, write, changed) {
            (true, _, true) => {
                println!("{}: needs {}", shown, operation_name);
                pending += 1;
            }
            (true, _, false) => println!("{}: up to date", shown),
            (false, true, true) => {
                fs::write(file, &after)?;
                println!("Modified: {}", shown);
            }
            (false, true, false) => println!("Unchanged: {}", shown),
            (false, false, _) => {
                println!("# {}: {}", operation_name, shown);
                println!("{}", after);
                println!();
            }
        }
    }

    Ok(if pending > 0 { 1 } else { 0 })
}
fn main() -> Result<()> {
let cli = Cli::parse();
if cli.debug || std::env::var("RUST_LOG").is_ok() {
let filter = match tracing_subscriber::EnvFilter::try_from_default_env() {
Ok(filter) => filter,
Err(_) => {
if cli.debug {
tracing_subscriber::EnvFilter::new("debug")
} else {
tracing_subscriber::EnvFilter::new("warn")
}
}
};
tracing_subscriber::fmt()
.with_env_filter(filter)
.without_time()
.init();
} else {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::WARN)
.without_time()
.init();
}
match cli.command {
Commands::Migrate {
paths,
module: _module,
write,
check,
interactive,
type_introspection,
} => {
let files = expand_paths(&paths, false)?; let type_method: TypeIntrospectionMethod = type_introspection.into();
let mut type_context = TypeIntrospectionContext::new(type_method)?;
let mut needs_changes = false;
for filepath in &files {
let original = fs::read_to_string(filepath)?;
let module_name = detect_module_name(filepath);
let result = if interactive {
interactive_migrate_file_content(
&original,
&module_name,
filepath,
&mut type_context,
)?
} else {
migrate_file_content(&original, &module_name, filepath, &mut type_context)?
};
let has_changes = result.as_ref().is_some_and(|r| r != &original);
if check {
if has_changes {
println!("{}: needs migration", filepath.display());
needs_changes = true;
} else {
println!("{}: up to date", filepath.display());
}
} else if write {
if let Some(new_content) = result {
if new_content != original {
fs::write(filepath, new_content)?;
println!("Modified: {}", filepath.display());
} else {
println!("Unchanged: {}", filepath.display());
}
} else {
println!("Unchanged: {}", filepath.display());
}
} else {
if let Some(new_content) = result {
if new_content != original {
println!("# migration: {}", filepath.display());
print!("{}", new_content);
}
}
}
}
type_context.shutdown()?;
std::process::exit(if check && needs_changes { 1 } else { 0 });
}
Commands::Cleanup {
paths,
module: _,
write,
before,
all,
check,
current_version,
} => {
let files = expand_paths(&paths, false)?;
let exit_code = process_files_common(
&files,
|filepath| {
let original = fs::read_to_string(filepath)?;
let module_name = detect_module_name(filepath);
let visitor = UnifiedVisitor::new_for_removal(
&module_name,
Some(filepath),
before.as_deref(),
all,
current_version.as_deref(),
);
let unified_result = visitor.process_source(original.clone())?;
let result = match unified_result {
UnifiedResult::Removal(cleaned_source) => {
let original_lines = original.lines().count();
let new_lines = cleaned_source.lines().count();
let removed_count = if original_lines > new_lines {
(original_lines - new_lines) / 10 } else {
0
};
if removed_count > 0 {
println!(
"Would remove approximately {} functions from {}",
removed_count,
filepath.display()
);
}
cleaned_source
}
_ => {
return Err(anyhow::anyhow!(
"Expected removal result from UnifiedVisitor"
))
}
};
Ok((original, result))
},
check,
write,
"function cleanup",
)?;
std::process::exit(exit_code);
}
Commands::Check { paths, module: _ } => {
let files = expand_paths(&paths, false)?; let mut errors_found = false;
for filepath in &files {
let source = fs::read_to_string(filepath)?;
let module_name = detect_module_name(filepath);
let result = check_file(&source, &module_name, filepath)?;
if result.success {
if !result.checked_functions.is_empty() {
println!(
"{}: {} @replace_me function(s) can be replaced",
filepath.display(),
result.checked_functions.len()
);
}
} else {
errors_found = true;
println!("{}: ERRORS found", filepath.display());
for error in &result.errors {
println!(" {}", error);
}
}
}
std::process::exit(if errors_found { 1 } else { 0 });
}
Commands::Info { paths, module: _ } => {
let files = expand_paths(&paths, false)?;
let mut all_deprecated: std::collections::HashMap<
String,
dissolve_python::ReplaceInfo,
> = std::collections::HashMap::new();
let mut total_files = 0;
for filepath in &files {
total_files += 1;
let source = fs::read_to_string(filepath)?;
let module_name = detect_module_name(filepath);
let visitor = UnifiedVisitor::new_for_collection(&module_name, Some(filepath));
let unified_result = visitor.process_source(source.clone())?;
if let UnifiedResult::Collection(result) = unified_result {
if !result.replacements.is_empty() {
println!(
"\n{}: {} deprecated function(s)",
filepath.display(),
result.replacements.len()
);
let mut functions: Vec<_> = result.replacements.iter().collect();
functions.sort_by_key(|(name, _)| name.as_str());
for (name, info) in functions {
println!(" - {}", name);
println!(" Replacement: {}", info.replacement_expr);
if let Some(since) = &info.since {
println!(" Since: {}", since);
}
if let Some(remove_in) = &info.remove_in {
println!(" Remove in: {}", remove_in);
}
if let Some(message) = &info.message {
println!(" Message: {}", message);
}
}
}
let dep_result =
collect_deprecated_from_dependencies(&source, &module_name, 5)?;
all_deprecated.extend(result.replacements);
all_deprecated.extend(dep_result.replacements);
} else {
return Err(anyhow::anyhow!(
"Expected collection result from UnifiedVisitor"
));
}
}
println!("\n=== Summary ===");
println!("Total files analyzed: {}", total_files);
println!("Total deprecated functions found: {}", all_deprecated.len());
if !all_deprecated.is_empty() {
println!("\n=== All deprecated functions ===");
let mut functions: Vec<_> = all_deprecated.iter().collect();
functions.sort_by_key(|(name, _)| name.as_str());
for (name, info) in functions {
println!("\n{}", name);
println!(" Replacement: {}", info.replacement_expr);
if let Some(since) = &info.since {
println!(" Since: {}", since);
}
if let Some(remove_in) = &info.remove_in {
println!(" Remove in: {}", remove_in);
}
if let Some(message) = &info.message {
println!(" Message: {}", message);
}
if !info.parameters.is_empty() {
println!(" Parameters:");
for param in &info.parameters {
print!(" - {}", param.name);
if param.has_default {
print!(" (has default");
if let Some(default) = ¶m.default_value {
print!(": {}", default);
}
print!(")");
}
if param.is_vararg {
print!(" (*args)");
}
if param.is_kwarg {
print!(" (**kwargs)");
}
if param.is_kwonly {
print!(" (keyword-only)");
}
println!();
}
}
}
}
std::process::exit(0);
}
}
}
/// Shared implementation behind `migrate_file_content` and
/// `interactive_migrate_file_content`.
///
/// Steps:
/// 1. collect the replacements declared in `source` itself;
/// 2. add the replacements found in its dependencies, searching the current
///    working directory as an additional path;
/// 3. log a warning for every construct the collector could not process;
/// 4. hand everything to `migrate_ruff::migrate_file` or, when `interactive`
///    is set, to `migrate_ruff::migrate_file_interactive`.
///
/// Returns `Ok(None)` when no replacements are known or when the migrated
/// text equals `source`, otherwise `Ok(Some(new_source))`.
///
/// # Errors
///
/// Fails when collection, dependency scanning or the migration itself fails,
/// or when the visitor yields something other than a collection result.
fn migrate_source(
    source: &str,
    module_name: &str,
    file_path: &Path,
    type_context: &mut TypeIntrospectionContext,
    interactive: bool,
) -> Result<Option<String>> {
    let visitor = UnifiedVisitor::new_for_collection(module_name, Some(file_path));
    let UnifiedResult::Collection(result) = visitor.process_source(source.to_string())? else {
        return Err(anyhow::anyhow!(
            "Expected collection result from UnifiedVisitor"
        ));
    };

    let mut all_replacements = result.replacements;

    // The working directory is searched in addition to the default locations.
    let additional_paths = vec![std::env::current_dir()
        .unwrap_or_default()
        .to_string_lossy()
        .to_string()];
    let dep_result =
        collect_deprecated_from_dependencies_with_paths(source, module_name, 5, &additional_paths)?;
    let dep_count = dep_result.replacements.len();
    all_replacements.extend(dep_result.replacements);
    if dep_count > 0 {
        tracing::debug!("Found {} deprecated functions in dependencies", dep_count);
    }

    for (name, unreplaceable_node) in &result.unreplaceable {
        let construct_type = format!("{:?}", unreplaceable_node.construct_type).replace('_', " ");
        let detail = if unreplaceable_node.message.is_empty() {
            String::new()
        } else {
            format!(" ({})", unreplaceable_node.message)
        };
        tracing::warn!(
            "{} '{}' cannot be processed: {:?}{}",
            construct_type,
            name,
            unreplaceable_node.reason,
            detail
        );
    }

    if all_replacements.is_empty() {
        return Ok(None);
    }

    tracing::debug!("Total replacements available: {}", all_replacements.len());
    // The per-key listing is only emitted for non-interactive runs.
    if !interactive {
        for key in all_replacements.keys() {
            tracing::debug!("  Available replacement: {}", key);
        }
    }

    let modified_source = if interactive {
        migrate_ruff::migrate_file_interactive(
            source,
            module_name,
            file_path,
            type_context,
            all_replacements,
            dep_result.inheritance_map,
        )?
    } else {
        migrate_ruff::migrate_file(
            source,
            module_name,
            file_path,
            type_context,
            all_replacements,
            dep_result.inheritance_map,
        )?
    };

    if modified_source == source {
        Ok(None)
    } else {
        Ok(Some(modified_source))
    }
}

/// Migrates `source` non-interactively.
///
/// Returns `Ok(None)` when there is nothing to replace or the migration left
/// the text unchanged, otherwise `Ok(Some(new_source))`.
///
/// # Errors
///
/// Propagates any failure from collection, dependency scanning or migration.
fn migrate_file_content(
    source: &str,
    module_name: &str,
    file_path: &Path,
    type_context: &mut TypeIntrospectionContext,
) -> Result<Option<String>> {
    tracing::debug!("Migrating {} ({} bytes)", module_name, source.len());
    migrate_source(source, module_name, file_path, type_context, false)
}

/// Migrates `source` through `migrate_ruff::migrate_file_interactive`.
///
/// Returns `Ok(None)` when there is nothing to replace or the migration left
/// the text unchanged, otherwise `Ok(Some(new_source))`.
///
/// # Errors
///
/// Propagates any failure from collection, dependency scanning or migration.
fn interactive_migrate_file_content(
    source: &str,
    module_name: &str,
    file_path: &Path,
    type_context: &mut TypeIntrospectionContext,
) -> Result<Option<String>> {
    tracing::debug!(
        "Interactively migrating {} ({} bytes)",
        module_name,
        source.len()
    );
    migrate_source(source, module_name, file_path, type_context, true)
}