use super::MutableLanguageVulnerabilityChecker;
use crate::analyzer::dependency_parser::DependencyInfo;
use crate::analyzer::tool_management::ToolDetector;
use crate::analyzer::vulnerability::{
VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity, VulnerableDependency,
};
use log::{info, warn};
use std::path::Path;
use std::process::Command;
#[derive(Default)]
pub struct PythonVulnerabilityChecker {
tool_detector: ToolDetector,
}
impl PythonVulnerabilityChecker {
pub fn new() -> Self {
Self {
tool_detector: ToolDetector::new(),
}
}
fn execute_safety_check(
&mut self,
project_path: &Path,
dependencies: &[DependencyInfo],
) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
let safety_status = self.tool_detector.detect_tool("safety");
if !safety_status.available {
warn!(
"safety not found, skipping Python vulnerability check. Install with: pip install safety"
);
return Ok(None);
}
info!("Executing safety check in {}", project_path.display());
let output = Command::new("safety")
.args(["check", "--json"])
.current_dir(project_path)
.output()
.map_err(|e| {
VulnerabilityError::CommandError(format!("Failed to run safety check: {}", e))
})?;
if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
return Err(VulnerabilityError::CommandError(format!(
"safety check failed with exit code {}: {}",
output.status.code().unwrap_or(-1),
String::from_utf8_lossy(&output.stderr)
)));
}
if output.stdout.is_empty() {
return Ok(None);
}
let audit_data: serde_json::Value =
serde_json::from_slice(&output.stdout).map_err(|e| {
VulnerabilityError::ParseError(format!(
"Failed to parse safety check output: {}",
e
))
})?;
self.parse_safety_output(&audit_data, dependencies)
}
fn parse_safety_output(
&self,
audit_data: &serde_json::Value,
dependencies: &[DependencyInfo],
) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
for vulnerability in vulns {
if let Some(vuln_obj) = vulnerability.as_object() {
let package_name = vuln_obj
.get("package_name")
.and_then(|n| n.as_str())
.unwrap_or("")
.to_string();
let package_version = vuln_obj
.get("package_version")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
let vuln_id = vuln_obj
.get("vulnerability_id")
.and_then(|i| i.as_str())
.unwrap_or("unknown")
.to_string();
let title = vuln_obj
.get("advisory")
.and_then(|a| a.as_str())
.unwrap_or("Unknown vulnerability")
.to_string();
let description = vuln_obj
.get("description")
.and_then(|d| d.as_str())
.unwrap_or("")
.to_string();
let severity =
self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
let cve = vuln_obj
.get("CVE")
.and_then(|c| c.as_str())
.map(|s| s.to_string());
let specs = vuln_obj
.get("specs")
.and_then(|s| s.as_array())
.map(|arr| {
arr.iter()
.filter_map(|s| s.as_str())
.map(|s| s.to_string())
.collect::<Vec<String>>()
})
.unwrap_or_default();
let affected_versions = if specs.is_empty() {
"*".to_string()
} else {
specs.join(", ")
};
let vuln_info = VulnerabilityInfo {
id: vuln_id,
vuln_type: "security".to_string(), severity,
title,
description,
cve,
ghsa: None, affected_versions,
patched_versions: None, published_date: None,
references: Vec::new(), };
if let Some(existing) = vulnerable_deps.iter_mut().find(|vuln_dep| {
vuln_dep.name == package_name && vuln_dep.version == package_version
}) {
existing.vulnerabilities.push(vuln_info);
} else {
vulnerable_deps.push(VulnerableDependency {
name: dep.name.clone(),
version: package_version,
language: crate::analyzer::dependency_parser::Language::Python,
vulnerabilities: vec![vuln_info],
source_dir: None,
});
}
}
}
}
}
if vulnerable_deps.is_empty() {
Ok(None)
} else {
Ok(Some(vulnerable_deps))
}
}
fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
match severity.map(|s| s.to_lowercase()).as_deref() {
Some("critical") => VulnerabilitySeverity::Critical,
Some("high") => VulnerabilitySeverity::High,
Some("medium") => VulnerabilitySeverity::Medium,
Some("low") => VulnerabilitySeverity::Low,
_ => VulnerabilitySeverity::Medium, }
}
}
impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
fn check_vulnerabilities(
&mut self,
dependencies: &[DependencyInfo],
project_path: &Path,
) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
info!("Checking Python dependencies");
match self.execute_safety_check(project_path, dependencies) {
Ok(Some(vulns)) => Ok(vulns),
Ok(None) => Ok(vec![]),
Err(e) => Err(e),
}
}
}