syncable_cli/analyzer/vulnerability/checkers/
python.rs1use super::MutableLanguageVulnerabilityChecker;
2use crate::analyzer::dependency_parser::DependencyInfo;
3use crate::analyzer::tool_management::ToolDetector;
4use crate::analyzer::vulnerability::{
5 VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity, VulnerableDependency,
6};
7use log::{info, warn};
8use std::path::Path;
9use std::process::Command;
10
11#[derive(Default)]
12pub struct PythonVulnerabilityChecker {
13 tool_detector: ToolDetector,
14}
15
16impl PythonVulnerabilityChecker {
17 pub fn new() -> Self {
18 Self {
19 tool_detector: ToolDetector::new(),
20 }
21 }
22
23 fn execute_safety_check(
24 &mut self,
25 project_path: &Path,
26 dependencies: &[DependencyInfo],
27 ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
28 let safety_status = self.tool_detector.detect_tool("safety");
30 if !safety_status.available {
31 warn!(
32 "safety not found, skipping Python vulnerability check. Install with: pip install safety"
33 );
34 return Ok(None);
35 }
36
37 info!("Executing safety check in {}", project_path.display());
38
39 let output = Command::new("safety")
41 .args(["check", "--json"])
42 .current_dir(project_path)
43 .output()
44 .map_err(|e| {
45 VulnerabilityError::CommandError(format!("Failed to run safety check: {}", e))
46 })?;
47
48 if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
51 return Err(VulnerabilityError::CommandError(format!(
52 "safety check failed with exit code {}: {}",
53 output.status.code().unwrap_or(-1),
54 String::from_utf8_lossy(&output.stderr)
55 )));
56 }
57
58 if output.stdout.is_empty() {
59 return Ok(None);
60 }
61
62 let audit_data: serde_json::Value =
64 serde_json::from_slice(&output.stdout).map_err(|e| {
65 VulnerabilityError::ParseError(format!(
66 "Failed to parse safety check output: {}",
67 e
68 ))
69 })?;
70
71 self.parse_safety_output(&audit_data, dependencies)
72 }
73
74 fn parse_safety_output(
75 &self,
76 audit_data: &serde_json::Value,
77 dependencies: &[DependencyInfo],
78 ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
79 let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
80
81 if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
83 for vulnerability in vulns {
84 if let Some(vuln_obj) = vulnerability.as_object() {
85 let package_name = vuln_obj
86 .get("package_name")
87 .and_then(|n| n.as_str())
88 .unwrap_or("")
89 .to_string();
90 let package_version = vuln_obj
91 .get("package_version")
92 .and_then(|v| v.as_str())
93 .unwrap_or("")
94 .to_string();
95
96 if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
98 let vuln_id = vuln_obj
99 .get("vulnerability_id")
100 .and_then(|i| i.as_str())
101 .unwrap_or("unknown")
102 .to_string();
103 let title = vuln_obj
104 .get("advisory")
105 .and_then(|a| a.as_str())
106 .unwrap_or("Unknown vulnerability")
107 .to_string();
108 let description = vuln_obj
109 .get("description")
110 .and_then(|d| d.as_str())
111 .unwrap_or("")
112 .to_string();
113 let severity =
114 self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
115 let cve = vuln_obj
116 .get("CVE")
117 .and_then(|c| c.as_str())
118 .map(|s| s.to_string());
119 let specs = vuln_obj
120 .get("specs")
121 .and_then(|s| s.as_array())
122 .map(|arr| {
123 arr.iter()
124 .filter_map(|s| s.as_str())
125 .map(|s| s.to_string())
126 .collect::<Vec<String>>()
127 })
128 .unwrap_or_default();
129 let affected_versions = if specs.is_empty() {
130 "*".to_string()
131 } else {
132 specs.join(", ")
133 };
134
135 let vuln_info = VulnerabilityInfo {
136 id: vuln_id,
137 vuln_type: "security".to_string(), severity,
139 title,
140 description,
141 cve,
142 ghsa: None, affected_versions,
144 patched_versions: None, published_date: None,
146 references: Vec::new(), };
148
149 if let Some(existing) = vulnerable_deps.iter_mut().find(|vuln_dep| {
151 vuln_dep.name == package_name && vuln_dep.version == package_version
152 }) {
153 existing.vulnerabilities.push(vuln_info);
154 } else {
155 vulnerable_deps.push(VulnerableDependency {
156 name: dep.name.clone(),
157 version: package_version,
158 language: crate::analyzer::dependency_parser::Language::Python,
159 vulnerabilities: vec![vuln_info],
160 });
161 }
162 }
163 }
164 }
165 }
166
167 if vulnerable_deps.is_empty() {
168 Ok(None)
169 } else {
170 Ok(Some(vulnerable_deps))
171 }
172 }
173
174 fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
175 match severity.map(|s| s.to_lowercase()).as_deref() {
176 Some("critical") => VulnerabilitySeverity::Critical,
177 Some("high") => VulnerabilitySeverity::High,
178 Some("medium") => VulnerabilitySeverity::Medium,
179 Some("low") => VulnerabilitySeverity::Low,
180 _ => VulnerabilitySeverity::Medium, }
182 }
183}
184
185impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
186 fn check_vulnerabilities(
187 &mut self,
188 dependencies: &[DependencyInfo],
189 project_path: &Path,
190 ) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
191 info!("Checking Python dependencies");
192
193 match self.execute_safety_check(project_path, dependencies) {
194 Ok(Some(vulns)) => Ok(vulns),
195 Ok(None) => Ok(vec![]),
196 Err(e) => Err(e),
197 }
198 }
199}