use itertools::Itertools as _;
use owo_colors::OwoColorize;
use std::fmt::Write as _;
use std::path::Path;
use crate::commands::ExitStatus;
use crate::commands::diagnostics;
use crate::commands::pip::loggers::DefaultResolveLogger;
use crate::commands::pip::resolution_markers;
use crate::commands::project::default_dependency_groups;
use crate::commands::project::lock::{LockMode, LockOperation};
use crate::commands::project::lock_target::LockTarget;
use crate::commands::project::{
ProjectError, ProjectInterpreter, ScriptInterpreter, UniversalState, WorkspacePython,
};
use crate::commands::reporters::AuditReporter;
use crate::printer::Printer;
use crate::settings::{FrozenSource, LockCheck, ResolverSettings};
use anyhow::Result;
use rustc_hash::FxHashSet;
use tracing::trace;
use uv_audit::service::project_status::ProjectStatusAudit;
use uv_audit::service::{VulnerabilityServiceFormat, osv};
use uv_audit::types::{AdverseStatus, Dependency, Finding, VulnerabilityID};
use uv_cache::Cache;
use uv_client::{BaseClientBuilder, RegistryClientBuilder};
use uv_configuration::{Concurrency, DependencyGroups, ExtrasSpecification, TargetTriple};
use uv_distribution_types::{IndexCapabilities, IndexUrl};
use uv_normalize::{DefaultExtras, DefaultGroups};
use uv_preview::{Preview, PreviewFeature};
use uv_python::{PythonDownloads, PythonPreference, PythonVersion};
use uv_scripts::Pep723Script;
use uv_settings::PythonInstallMirrors;
use uv_warnings::warn_user;
use uv_workspace::{DiscoveryOptions, Workspace, WorkspaceCache};
pub(crate) async fn audit(
project_dir: &Path,
extras: ExtrasSpecification,
groups: DependencyGroups,
lock_check: LockCheck,
frozen: Option<FrozenSource>,
script: Option<Pep723Script>,
python_version: Option<PythonVersion>,
python_platform: Option<TargetTriple>,
install_mirrors: PythonInstallMirrors,
settings: ResolverSettings,
client_builder: BaseClientBuilder<'_>,
python_preference: PythonPreference,
python_downloads: PythonDownloads,
concurrency: Concurrency,
no_config: bool,
cache: Cache,
printer: Printer,
preview: Preview,
service: VulnerabilityServiceFormat,
service_url: Option<String>,
ignore: Vec<VulnerabilityID>,
ignore_until_fixed: Vec<VulnerabilityID>,
) -> Result<ExitStatus> {
if !preview.is_enabled(PreviewFeature::Audit) {
warn_user!(
"`uv audit` is experimental and may change without warning. Pass `--preview-features {}` to disable this warning.",
PreviewFeature::Audit
);
}
let workspace_cache = WorkspaceCache::default();
let workspace;
let target = if let Some(script) = script.as_ref() {
LockTarget::Script(script)
} else {
workspace =
Workspace::discover(project_dir, &DiscoveryOptions::default(), &workspace_cache)
.await?;
LockTarget::Workspace(&workspace)
};
let default_groups = match target {
LockTarget::Workspace(workspace) => default_dependency_groups(workspace.pyproject_toml())?,
LockTarget::Script(_) => DefaultGroups::default(),
};
let groups = groups.with_defaults(default_groups);
let default_extras = match &target {
LockTarget::Workspace(_) => DefaultExtras::All,
LockTarget::Script(_) => DefaultExtras::All,
};
let extras = extras.with_defaults(default_extras);
let universal = python_version.is_none() && python_platform.is_none();
let interpreter = if frozen.is_some() && universal {
None
} else {
Some(match target {
LockTarget::Script(script) => ScriptInterpreter::discover(
script.into(),
None,
&client_builder,
python_preference,
python_downloads,
&install_mirrors,
false,
no_config,
Some(false),
&cache,
printer,
preview,
)
.await?
.into_interpreter(),
LockTarget::Workspace(workspace) => {
let workspace_python = WorkspacePython::from_request(
None,
Some(workspace),
&groups,
project_dir,
no_config,
)
.await?;
ProjectInterpreter::discover(
workspace,
&groups,
workspace_python,
&client_builder,
python_preference,
python_downloads,
&install_mirrors,
false,
Some(false),
&cache,
printer,
preview,
)
.await?
.into_interpreter()
}
})
};
let mode = if let Some(frozen_source) = frozen {
LockMode::Frozen(frozen_source.into())
} else if let LockCheck::Enabled(lock_check) = lock_check {
LockMode::Locked(interpreter.as_ref().unwrap(), lock_check)
} else if matches!(target, LockTarget::Script(_)) && !target.lock_path().is_file() {
LockMode::DryRun(interpreter.as_ref().unwrap())
} else {
LockMode::Write(interpreter.as_ref().unwrap())
};
let state = UniversalState::default();
let lock = match Box::pin(
LockOperation::new(
mode,
&settings,
&client_builder,
&state,
Box::new(DefaultResolveLogger),
&concurrency,
&cache,
&WorkspaceCache::default(),
printer,
preview,
)
.execute(target),
)
.await
{
Ok(result) => result.into_lock(),
Err(ProjectError::Operation(err)) => {
return diagnostics::OperationDiagnostic::with_system_certs(
client_builder.system_certs(),
)
.report(err)
.map_or(Ok(ExitStatus::Failure), |err| Err(err.into()));
}
Err(err) => return Err(err.into()),
};
let _markers = (!universal).then(|| {
resolution_markers(
python_version.as_ref(),
python_platform.as_ref(),
interpreter.as_ref().unwrap(),
)
});
let auditable = lock.auditable(&extras, &groups);
let mut projects = auditable.projects(target.install_path())?;
let flat_index_urls: FxHashSet<&IndexUrl> = settings
.index_locations
.flat_indexes()
.map(|index| &index.url)
.collect();
projects.retain(|(_, url)| !flat_index_urls.contains(url));
let reporter = AuditReporter::from(printer);
let dependencies: Vec<Dependency> = auditable
.packages()
.map(|(name, version)| Dependency::new(name.clone(), version.clone()))
.collect();
let base_client = client_builder.clone().build()?;
let registry_client = RegistryClientBuilder::new(client_builder, cache.clone())
.index_locations(settings.index_locations.clone())
.keyring(settings.keyring_provider)
.build()?;
let capabilities = IndexCapabilities::default();
let status_audit =
ProjectStatusAudit::new(®istry_client, &capabilities, concurrency.clone());
let osv_future = async {
match service {
VulnerabilityServiceFormat::Osv => {
let osv_url = service_url
.as_deref()
.unwrap_or(osv::API_BASE)
.parse()
.expect("invalid OSV service URL");
let client = base_client.for_host(&osv_url).raw_client().clone();
let service = osv::Osv::new(client, Some(osv_url), concurrency);
trace!("Auditing {n} dependencies against OSV", n = auditable.len());
service.query_batch(&dependencies, osv::Filter::All).await
}
}
};
let status_future = async {
trace!(
"Auditing {n} projects for adverse status",
n = projects.len()
);
status_audit.query_batch(&projects).await
};
let (osv_findings, status_findings) = tokio::join!(osv_future, status_future);
let mut all_findings = osv_findings?;
all_findings.extend(status_findings);
reporter.on_audit_complete();
let mut matched_ignores: FxHashSet<&VulnerabilityID> = FxHashSet::default();
let all_findings: Vec<_> = all_findings
.into_iter()
.filter(|finding| match finding {
Finding::Vulnerability(vulnerability) => {
if let Some(id) = ignore.iter().find(|id| vulnerability.matches(id)) {
matched_ignores.insert(id);
return false;
}
if let Some(id) = ignore_until_fixed
.iter()
.find(|id| vulnerability.matches(id))
{
matched_ignores.insert(id);
if vulnerability.fix_versions.is_empty() {
return false;
}
}
true
}
Finding::ProjectStatus(_) => true,
})
.collect();
for id in ignore.iter().chain(ignore_until_fixed.iter()) {
if !matched_ignores.contains(id) {
warn_user!(
"Ignored vulnerability `{}` does not match any vulnerability in the project",
id.as_str()
);
}
}
let display = AuditResults {
printer,
n_packages: auditable.len(),
findings: all_findings,
};
display.render()
}
struct AuditResults {
printer: Printer,
n_packages: usize,
findings: Vec<Finding>,
}
impl AuditResults {
fn render(&self) -> Result<ExitStatus> {
let (vulns, statuses): (Vec<_>, Vec<_>) =
self.findings.iter().partition_map(|finding| match finding {
Finding::Vulnerability(vuln) => itertools::Either::Left(vuln),
Finding::ProjectStatus(status) => itertools::Either::Right(status),
});
let vuln_banner = if !vulns.is_empty() {
let s = if vulns.len() == 1 { "y" } else { "ies" };
format!("{} known vulnerabilit{s}", vulns.len())
.yellow()
.to_string()
} else {
"no known vulnerabilities".bold().to_string()
};
let status_banner = if !statuses.is_empty() {
let s = if statuses.len() == 1 { "" } else { "es" };
format!(
"{} adverse project status{}",
statuses.len().to_string().yellow(),
s
)
} else {
"no adverse project statuses".bold().to_string()
};
writeln!(
self.printer.stderr(),
"Found {vuln_banner} and {status_banner} in {packages}",
packages = format!(
"{npackages} {label}",
npackages = self.n_packages,
label = if self.n_packages == 1 {
"package"
} else {
"packages"
}
)
.bold()
)?;
let has_vulnerabilities = !vulns.is_empty();
if !vulns.is_empty() {
writeln!(self.printer.stdout_important(), "\nVulnerabilities:\n")?;
let groups = vulns
.into_iter()
.chunk_by(|vuln| (vuln.dependency.name(), vuln.dependency.version()));
for (dependency, vulns) in &groups {
let vulns: Vec<_> = vulns.collect();
let (name, version) = dependency;
writeln!(
self.printer.stdout_important(),
"{name_version} has {n} known vulnerabilit{ies}:\n",
name_version = format!("{name} {version}").bold(),
n = vulns.len(),
ies = if vulns.len() == 1 { "y" } else { "ies" },
)?;
for vuln in vulns {
writeln!(
self.printer.stdout_important(),
"- {id}: {description}",
id = vuln.best_id().as_str().bold(),
description = vuln.summary.as_deref().unwrap_or("No summary provided"),
)?;
if vuln.fix_versions.is_empty() {
writeln!(
self.printer.stdout_important(),
"\n No fix versions available\n"
)?;
} else {
writeln!(
self.printer.stdout_important(),
"\n Fixed in: {}\n",
vuln.fix_versions
.iter()
.map(std::string::ToString::to_string)
.join(", ")
.blue()
)?;
}
if let Some(link) = &vuln.link {
writeln!(
self.printer.stdout_important(),
" Advisory information: {link}\n",
link = link.as_str().blue()
)?;
}
}
}
}
if !statuses.is_empty() {
writeln!(self.printer.stdout_important(), "\nAdverse statuses:\n")?;
for status in statuses {
let label = match status.status {
AdverseStatus::Archived => "archived".yellow().to_string(),
AdverseStatus::Deprecated => "deprecated".yellow().to_string(),
AdverseStatus::Quarantined => "quarantined".red().to_string(),
};
let name = status.name.bold();
if let Some(reason) = &status.reason {
writeln!(
self.printer.stdout_important(),
"- {name} is {label}: {reason}"
)?;
} else {
writeln!(self.printer.stdout_important(), "- {name} is {label}")?;
}
}
}
if has_vulnerabilities {
Ok(ExitStatus::Failure)
} else {
Ok(ExitStatus::Success)
}
}
}