mod adapters;
mod application;
mod cli;
mod i18n;
mod ports;
mod sbom_generation;
mod shared;
use adapters::outbound::console::StderrProgressReporter;
use adapters::outbound::filesystem::FileSystemReader;
use adapters::outbound::network::{CachingPyPiLicenseRepository, OsvClient, PyPiLicenseRepository};
use application::dto::{OutputFormat, SbomRequest};
use application::factories::{FormatterFactory, PresenterFactory, PresenterType};
use application::read_models::SbomReadModelBuilder;
use application::use_cases::GenerateSbomUseCase;
use clap::Parser;
use cli::Args;
use i18n::Messages;
use owo_colors::OwoColorize;
use ports::outbound::ProjectConfigReader;
use sbom_generation::domain::license_policy::{LicensePolicy, UnknownLicenseHandling};
use sbom_generation::domain::vulnerability::Severity;
use shared::error::ExitCode;
use shared::security::validate_directory_path;
use shared::Result;
use std::collections::HashSet;
use std::path::PathBuf;
use std::process;
use uv_sbom::config::{self, ConfigFile, IgnoreCve};
#[tokio::main]
async fn main() {
let args = match Args::try_parse() {
Ok(args) => args,
Err(e) => {
let _ = e.print();
let exit_code = if e.use_stderr() {
ExitCode::InvalidArguments
} else {
ExitCode::Success
};
process::exit(exit_code.as_i32());
}
};
if args.init {
let dir = args.path.as_deref().unwrap_or(".");
let dir_path = std::path::Path::new(dir);
match config::generate_config_template(dir_path) {
Ok(abs_path) => {
eprintln!(
"Created {} in {}",
config::CONFIG_FILENAME,
abs_path.parent().unwrap_or(dir_path).display()
);
process::exit(ExitCode::Success.as_i32());
}
Err(e) => {
eprintln!("Error: {}", e);
process::exit(ExitCode::ApplicationError.as_i32());
}
}
}
match run(args).await {
Ok(has_vulnerabilities) => {
if has_vulnerabilities {
process::exit(ExitCode::VulnerabilitiesDetected.as_i32());
}
process::exit(ExitCode::Success.as_i32());
}
Err(e) => {
eprintln!("\n❌ An error occurred:\n");
eprintln!("{}", e);
let mut source = e.source();
while let Some(err) = source {
eprintln!("\nCaused by: {}", err);
source = err.source();
}
eprintln!();
process::exit(ExitCode::ApplicationError.as_i32());
}
}
}
async fn run(args: Args) -> Result<bool> {
display_banner();
let locale = args.lang;
let msgs = Messages::for_locale(locale);
if args.check_cve {
eprintln!("Warning: --check-cve is deprecated and will be removed in a future release. CVE checking is now enabled by default. Use --no-check-cve to opt out.");
}
if !args.no_check_cve && args.format == OutputFormat::Json {
eprintln!("{}", msgs.warn_check_cve_no_effect);
eprintln!(" Vulnerability data is not included in JSON output.");
eprintln!(" Use --format markdown to see vulnerability report.");
eprintln!();
}
if args.check_license && args.format == OutputFormat::Json {
eprintln!("{}", msgs.warn_check_license_no_effect);
eprintln!(" License compliance data is not included in JSON output.");
eprintln!(" Use --format markdown to see license compliance report.");
eprintln!();
}
if args.verify_links && args.format == OutputFormat::Json {
eprintln!("{}", msgs.warn_verify_links_no_effect);
eprintln!(" PyPI link verification only applies to Markdown output.");
eprintln!(" Use --format markdown to use link verification.");
eprintln!();
}
let project_dir = args.path.as_deref().unwrap_or(".");
let project_path = PathBuf::from(project_dir);
validate_project_path(&project_path)?;
let config = load_config(&args, &project_path)?;
let merged = merge_config(&args, &config);
let lockfile_reader = FileSystemReader::new();
let project_config_reader = FileSystemReader::new();
let pypi_repository = PyPiLicenseRepository::new()?;
let license_repository = CachingPyPiLicenseRepository::new(pypi_repository);
let progress_reporter = StderrProgressReporter::new(locale);
let vulnerability_repository = if merged.check_cve {
Some(OsvClient::new()?)
} else {
None
};
let use_case = GenerateSbomUseCase::new(
lockfile_reader,
project_config_reader,
license_repository,
progress_reporter,
vulnerability_repository,
locale,
);
let suggest_fix = resolve_suggest_fix(merged.suggest_fix, &project_path);
let include_dependency_info = matches!(merged.format, OutputFormat::Markdown);
let request = SbomRequest::builder()
.project_path(project_path.clone())
.include_dependency_info(include_dependency_info)
.exclude_patterns(merged.exclude_patterns)
.dry_run(args.dry_run)
.check_cve(merged.check_cve)
.severity_threshold_opt(merged.severity_threshold)
.cvss_threshold_opt(merged.cvss_threshold)
.ignore_cves(merged.ignore_cves)
.check_license(merged.check_license)
.license_policy(merged.license_policy)
.suggest_fix(suggest_fix)
.locale(locale)
.build()?;
let locale = request.locale;
let response = use_case.execute(request).await?;
if args.dry_run {
return Ok(false);
}
eprintln!(
"{}",
FormatterFactory::progress_message(merged.format, locale)
);
let project_reader = FileSystemReader::new();
let project_component_info = project_reader
.read_project_name(&project_path)
.ok()
.and_then(|name| {
let version = response
.enriched_packages
.iter()
.find(|ep| ep.package.name() == name)
.map(|ep| ep.package.version().to_string());
version.map(|v| (name, v))
});
let read_model = SbomReadModelBuilder::build_with_project(
response.enriched_packages,
&response.metadata,
response.dependency_graph.as_ref(),
response.vulnerability_check_result.as_ref(),
response.license_compliance_result.as_ref(),
project_component_info
.as_ref()
.map(|(n, v)| (n.as_str(), v.as_str())),
response.upgrade_recommendations.as_deref(),
);
let verified_packages = if args.verify_links && merged.format == OutputFormat::Markdown {
eprintln!("{}", msgs.progress_verifying_links);
let pypi_verifier = PyPiLicenseRepository::new()?;
let package_names: Vec<String> = read_model
.components
.iter()
.map(|c| c.name.clone())
.collect();
Some(pypi_verifier.verify_packages(&package_names).await)
} else {
None
};
let formatter = FormatterFactory::create(merged.format, verified_packages, locale);
let formatted_output = formatter.format(&read_model)?;
let presenter_type = if let Some(output_path) = args.output {
PresenterType::File(PathBuf::from(output_path))
} else {
PresenterType::Stdout
};
let presenter = PresenterFactory::create(presenter_type);
presenter.present(&formatted_output)?;
let has_issues =
response.has_vulnerabilities_above_threshold || response.has_license_violations;
Ok(has_issues)
}
fn display_banner() {
let version = env!("CARGO_PKG_VERSION");
eprintln!(
"{} {} {}",
"🚀".bright_yellow(),
"uv-sbom".bright_cyan().bold(),
format!("v{}", version).bright_green()
);
eprintln!();
}
struct MergedConfig {
format: OutputFormat,
exclude_patterns: Vec<String>,
check_cve: bool,
severity_threshold: Option<Severity>,
cvss_threshold: Option<f32>,
ignore_cves: Vec<IgnoreCve>,
check_license: bool,
license_policy: Option<LicensePolicy>,
suggest_fix: bool,
}
fn load_config(args: &Args, project_path: &std::path::Path) -> Result<Option<ConfigFile>> {
if let Some(ref config_path) = args.config {
let path = std::path::Path::new(config_path);
let cfg = config::load_config_from_path(path)?;
eprintln!("📄 Loaded config from: {}", path.display());
Ok(Some(cfg))
} else {
let cfg = config::discover_config(project_path)?;
if cfg.is_some() {
eprintln!("📄 Auto-discovered config file in project directory.");
}
Ok(cfg)
}
}
fn merge_config(args: &Args, config: &Option<ConfigFile>) -> MergedConfig {
let config = match config {
Some(c) => c,
None => {
let license_policy = if args.check_license
&& (!args.license_allow.is_empty() || !args.license_deny.is_empty())
{
Some(LicensePolicy::new(
&args.license_allow,
&args.license_deny,
UnknownLicenseHandling::default(),
))
} else if args.check_license {
Some(LicensePolicy::new(
&[],
&[],
UnknownLicenseHandling::default(),
))
} else {
None
};
return MergedConfig {
format: args.format,
exclude_patterns: args.exclude.clone(),
check_cve: !args.no_check_cve,
severity_threshold: args.severity_threshold,
cvss_threshold: args.cvss_threshold,
ignore_cves: args
.ignore_cve
.iter()
.map(|id| IgnoreCve {
id: id.clone(),
reason: None,
})
.collect(),
check_license: args.check_license,
license_policy,
suggest_fix: args.suggest_fix,
};
}
};
let exclude_patterns = merge_string_lists(&args.exclude, &config.exclude_packages);
let cli_ignore_cves: Vec<IgnoreCve> = args
.ignore_cve
.iter()
.map(|id| IgnoreCve {
id: id.clone(),
reason: None,
})
.collect();
let ignore_cves = merge_ignore_cves(&cli_ignore_cves, &config.ignore_cves);
let format = if let Some(ref config_format) = config.format {
if args.format != OutputFormat::Json {
args.format
} else {
config_format.parse::<OutputFormat>().unwrap_or(args.format)
}
} else {
args.format
};
let check_cve = if args.no_check_cve {
false
} else {
config.check_cve.unwrap_or(true)
};
let severity_threshold = args.severity_threshold.or_else(|| {
config
.severity_threshold
.as_ref()
.and_then(|s| match s.to_lowercase().as_str() {
"low" => Some(Severity::Low),
"medium" => Some(Severity::Medium),
"high" => Some(Severity::High),
"critical" => Some(Severity::Critical),
_ => None,
})
});
let cvss_threshold = args
.cvss_threshold
.or(config.cvss_threshold.map(|v| v as f32));
let check_license = args.check_license || config.check_license.unwrap_or(false);
let license_policy = if check_license {
if !args.license_allow.is_empty() || !args.license_deny.is_empty() {
Some(LicensePolicy::new(
&args.license_allow,
&args.license_deny,
UnknownLicenseHandling::default(),
))
} else if let Some(ref lp_config) = config.license_policy {
let unknown = lp_config
.unknown
.as_ref()
.map(|s| match s.to_lowercase().as_str() {
"deny" => UnknownLicenseHandling::Deny,
"allow" => UnknownLicenseHandling::Allow,
_ => UnknownLicenseHandling::Warn,
})
.unwrap_or_default();
let allow = lp_config.allow.clone().unwrap_or_default();
let deny = lp_config.deny.clone().unwrap_or_default();
Some(LicensePolicy::new(&allow, &deny, unknown))
} else {
Some(LicensePolicy::new(
&[],
&[],
UnknownLicenseHandling::default(),
))
}
} else {
None
};
let suggest_fix = args.suggest_fix || config.suggest_fix.unwrap_or(false);
MergedConfig {
format,
exclude_patterns,
check_cve,
severity_threshold,
cvss_threshold,
ignore_cves,
check_license,
license_policy,
suggest_fix,
}
}
fn merge_string_lists(cli: &[String], config: &Option<Vec<String>>) -> Vec<String> {
let mut seen = HashSet::new();
let mut result = Vec::new();
for item in cli {
if seen.insert(item.clone()) {
result.push(item.clone());
}
}
if let Some(config_items) = config {
for item in config_items {
if seen.insert(item.clone()) {
result.push(item.clone());
}
}
}
result
}
fn merge_ignore_cves(cli: &[IgnoreCve], config: &Option<Vec<IgnoreCve>>) -> Vec<IgnoreCve> {
let mut seen = HashSet::new();
let mut result = Vec::new();
for cve in cli {
if seen.insert(cve.id.clone()) {
result.push(cve.clone());
}
}
if let Some(config_cves) = config {
for cve in config_cves {
if seen.insert(cve.id.clone()) {
result.push(cve.clone());
}
}
}
result
}
fn resolve_suggest_fix(suggest_fix: bool, project_path: &std::path::Path) -> bool {
if !suggest_fix {
return false;
}
let uv_available = std::process::Command::new("uv")
.arg("--version")
.output()
.map(|o| o.status.success())
.unwrap_or(false);
if !uv_available {
eprintln!(
"⚠ --suggest-fix requires `uv` CLI. \
Install it with: curl -LsSf https://astral.sh/uv/install.sh | sh"
);
return false;
}
if !project_path.join("pyproject.toml").exists() {
eprintln!("⚠ --suggest-fix requires pyproject.toml in the project directory.");
return false;
}
true
}
fn validate_project_path(path: &std::path::Path) -> Result<()> {
validate_directory_path(path)
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
#[test]
fn test_validate_project_path_valid_directory() {
let temp_dir = TempDir::new().unwrap();
let result = validate_project_path(temp_dir.path());
assert!(result.is_ok());
}
#[test]
fn test_validate_project_path_nonexistent() {
let nonexistent_path = PathBuf::from("/nonexistent/path/that/does/not/exist");
let result = validate_project_path(&nonexistent_path);
assert!(result.is_err());
let err = result.unwrap_err();
let err_string = format!("{}", err);
assert!(err_string.contains("Directory does not exist"));
}
#[test]
fn test_validate_project_path_file_not_directory() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test_file.txt");
fs::write(&file_path, "test content").unwrap();
let result = validate_project_path(&file_path);
assert!(result.is_err());
let err = result.unwrap_err();
let err_string = format!("{}", err);
assert!(err_string.contains("Not a directory"));
}
#[test]
fn test_validate_project_path_current_directory() {
let current_dir = std::env::current_dir().unwrap();
let result = validate_project_path(¤t_dir);
assert!(result.is_ok());
}
#[test]
fn test_merge_string_lists_both_empty() {
let result = merge_string_lists(&[], &None);
assert!(result.is_empty());
}
#[test]
fn test_merge_string_lists_cli_only() {
let cli = vec!["a".to_string(), "b".to_string()];
let result = merge_string_lists(&cli, &None);
assert_eq!(result, vec!["a", "b"]);
}
#[test]
fn test_merge_string_lists_config_only() {
let config = Some(vec!["x".to_string(), "y".to_string()]);
let result = merge_string_lists(&[], &config);
assert_eq!(result, vec!["x", "y"]);
}
#[test]
fn test_merge_string_lists_deduplication() {
let cli = vec!["a".to_string(), "b".to_string()];
let config = Some(vec!["b".to_string(), "c".to_string()]);
let result = merge_string_lists(&cli, &config);
assert_eq!(result, vec!["a", "b", "c"]);
}
#[test]
fn test_merge_ignore_cves_both_empty() {
let result = merge_ignore_cves(&[], &None);
assert!(result.is_empty());
}
#[test]
fn test_merge_ignore_cves_cli_only() {
let cli = vec![IgnoreCve {
id: "CVE-2024-1".to_string(),
reason: None,
}];
let result = merge_ignore_cves(&cli, &None);
assert_eq!(result.len(), 1);
assert_eq!(result[0].id, "CVE-2024-1");
}
#[test]
fn test_merge_ignore_cves_config_only() {
let config = Some(vec![IgnoreCve {
id: "CVE-2024-2".to_string(),
reason: Some("reason".to_string()),
}]);
let result = merge_ignore_cves(&[], &config);
assert_eq!(result.len(), 1);
assert_eq!(result[0].id, "CVE-2024-2");
assert_eq!(result[0].reason.as_deref(), Some("reason"));
}
#[test]
fn test_merge_ignore_cves_deduplication_cli_wins() {
let cli = vec![IgnoreCve {
id: "CVE-2024-1".to_string(),
reason: Some("cli reason".to_string()),
}];
let config = Some(vec![
IgnoreCve {
id: "CVE-2024-1".to_string(),
reason: Some("config reason".to_string()),
},
IgnoreCve {
id: "CVE-2024-2".to_string(),
reason: None,
},
]);
let result = merge_ignore_cves(&cli, &config);
assert_eq!(result.len(), 2);
assert_eq!(result[0].id, "CVE-2024-1");
assert_eq!(result[0].reason.as_deref(), Some("cli reason"));
assert_eq!(result[1].id, "CVE-2024-2");
}
#[test]
fn test_merge_config_no_config_file() {
let args = Args::parse_from(["uv-sbom"]);
let result = merge_config(&args, &None);
assert_eq!(result.format, OutputFormat::Json);
assert!(result.exclude_patterns.is_empty());
assert!(result.check_cve); assert!(result.severity_threshold.is_none());
assert!(result.cvss_threshold.is_none());
assert!(result.ignore_cves.is_empty());
}
#[test]
fn test_merge_config_config_provides_defaults() {
let args = Args::parse_from(["uv-sbom"]);
let config = Some(ConfigFile {
format: Some("markdown".to_string()),
exclude_packages: Some(vec!["pkg-a".to_string()]),
check_cve: Some(true),
severity_threshold: Some("high".to_string()),
cvss_threshold: Some(7.0),
ignore_cves: Some(vec![IgnoreCve {
id: "CVE-2024-1".to_string(),
reason: Some("not applicable".to_string()),
}]),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.format, OutputFormat::Markdown);
assert_eq!(result.exclude_patterns, vec!["pkg-a"]);
assert!(result.check_cve);
assert_eq!(result.severity_threshold, Some(Severity::High));
assert_eq!(result.cvss_threshold, Some(7.0));
assert_eq!(result.ignore_cves.len(), 1);
assert_eq!(result.ignore_cves[0].id, "CVE-2024-1");
}
#[test]
fn test_merge_config_cli_overrides_format() {
let args = Args::parse_from(["uv-sbom", "--format", "markdown"]);
let config = Some(ConfigFile {
format: Some("json".to_string()),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.format, OutputFormat::Markdown);
}
#[test]
fn test_merge_config_no_check_cve_cli_flag() {
let args = Args::parse_from(["uv-sbom", "--no-check-cve"]);
let config = Some(ConfigFile {
check_cve: Some(true),
..Default::default()
});
let result = merge_config(&args, &config);
assert!(!result.check_cve);
}
#[test]
fn test_merge_config_no_check_cve_overrides_config() {
let args = Args::parse_from(["uv-sbom", "--no-check-cve"]);
let config = Some(ConfigFile {
check_cve: Some(true),
..Default::default()
});
let result = merge_config(&args, &config);
assert!(!result.check_cve);
}
#[test]
fn test_merge_config_check_cve_from_config() {
let args = Args::parse_from(["uv-sbom"]);
let config = Some(ConfigFile {
check_cve: Some(true),
..Default::default()
});
let result = merge_config(&args, &config);
assert!(result.check_cve);
}
#[test]
fn test_merge_config_exclude_patterns_merged() {
let args = Args::parse_from(["uv-sbom", "-e", "cli-pkg"]);
let config = Some(ConfigFile {
exclude_packages: Some(vec!["config-pkg".to_string(), "cli-pkg".to_string()]),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.exclude_patterns, vec!["cli-pkg", "config-pkg"]);
}
#[test]
fn test_merge_config_ignore_cves_merged() {
let args = Args::parse_from(["uv-sbom", "-i", "CVE-2024-1"]);
let config = Some(ConfigFile {
ignore_cves: Some(vec![
IgnoreCve {
id: "CVE-2024-1".to_string(),
reason: Some("config reason".to_string()),
},
IgnoreCve {
id: "CVE-2024-2".to_string(),
reason: None,
},
]),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.ignore_cves.len(), 2);
assert_eq!(result.ignore_cves[0].id, "CVE-2024-1");
assert!(result.ignore_cves[0].reason.is_none());
assert_eq!(result.ignore_cves[1].id, "CVE-2024-2");
}
#[test]
fn test_merge_config_severity_threshold_cli_wins() {
let args = Args::parse_from(["uv-sbom", "--severity-threshold", "critical"]);
let config = Some(ConfigFile {
severity_threshold: Some("low".to_string()),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.severity_threshold, Some(Severity::Critical));
}
#[test]
fn test_merge_config_severity_threshold_from_config() {
let args = Args::parse_from(["uv-sbom"]);
let config = Some(ConfigFile {
severity_threshold: Some("medium".to_string()),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.severity_threshold, Some(Severity::Medium));
}
#[test]
fn test_merge_config_cvss_threshold_cli_wins() {
let args = Args::parse_from(["uv-sbom", "--cvss-threshold", "8.5"]);
let config = Some(ConfigFile {
cvss_threshold: Some(5.0),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.cvss_threshold, Some(8.5));
}
#[test]
fn test_merge_config_cvss_threshold_from_config() {
let args = Args::parse_from(["uv-sbom"]);
let config = Some(ConfigFile {
cvss_threshold: Some(6.0),
..Default::default()
});
let result = merge_config(&args, &config);
assert_eq!(result.cvss_threshold, Some(6.0));
}
#[test]
fn test_merge_config_suggest_fix_from_config() {
let args = Args::parse_from(["uv-sbom"]);
let config = Some(ConfigFile {
suggest_fix: Some(true),
..Default::default()
});
let result = merge_config(&args, &config);
assert!(result.suggest_fix);
}
#[test]
fn test_merge_config_suggest_fix_cli_flag() {
let args = Args::parse_from(["uv-sbom", "--suggest-fix"]);
let config = Some(ConfigFile {
suggest_fix: Some(true),
..Default::default()
});
let result = merge_config(&args, &config);
assert!(result.suggest_fix);
}
#[test]
fn test_merge_config_suggest_fix_cli_wins_over_config_false() {
let args = Args::parse_from(["uv-sbom", "--suggest-fix"]);
let config = Some(ConfigFile {
suggest_fix: Some(false),
..Default::default()
});
let result = merge_config(&args, &config);
assert!(result.suggest_fix);
}
#[test]
fn test_merge_config_suggest_fix_default_false() {
let args = Args::parse_from(["uv-sbom"]);
let config = Some(ConfigFile {
..Default::default()
});
let result = merge_config(&args, &config);
assert!(!result.suggest_fix);
}
#[test]
fn test_resolve_suggest_fix_disabled() {
let temp_dir = TempDir::new().unwrap();
assert!(!resolve_suggest_fix(false, temp_dir.path()));
}
#[test]
fn test_resolve_suggest_fix_missing_pyproject_toml() {
let temp_dir = TempDir::new().unwrap();
let uv_available = std::process::Command::new("uv")
.arg("--version")
.output()
.map(|o| o.status.success())
.unwrap_or(false);
if uv_available {
assert!(!resolve_suggest_fix(true, temp_dir.path()));
}
}
#[test]
fn test_resolve_suggest_fix_with_pyproject_toml() {
let temp_dir = TempDir::new().unwrap();
fs::write(
temp_dir.path().join("pyproject.toml"),
"[project]\nname = \"test\"\n",
)
.unwrap();
let uv_available = std::process::Command::new("uv")
.arg("--version")
.output()
.map(|o| o.status.success())
.unwrap_or(false);
assert_eq!(resolve_suggest_fix(true, temp_dir.path()), uv_available);
}
}