syncable_cli/analyzer/vulnerability/checkers/
python.rs1use super::MutableLanguageVulnerabilityChecker;
2use crate::analyzer::dependency_parser::DependencyInfo;
3use crate::analyzer::tool_management::ToolDetector;
4use crate::analyzer::vulnerability::{
5 VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity, VulnerableDependency,
6};
7use log::{info, warn};
8use std::path::Path;
9use std::process::Command;
10
11pub struct PythonVulnerabilityChecker {
12 tool_detector: ToolDetector,
13}
14
15impl PythonVulnerabilityChecker {
16 pub fn new() -> Self {
17 Self {
18 tool_detector: ToolDetector::new(),
19 }
20 }
21
22 fn execute_safety_check(
23 &mut self,
24 project_path: &Path,
25 dependencies: &[DependencyInfo],
26 ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
27 let safety_status = self.tool_detector.detect_tool("safety");
29 if !safety_status.available {
30 warn!(
31 "safety not found, skipping Python vulnerability check. Install with: pip install safety"
32 );
33 return Ok(None);
34 }
35
36 info!("Executing safety check in {}", project_path.display());
37
38 let output = Command::new("safety")
40 .args(&["check", "--json"])
41 .current_dir(project_path)
42 .output()
43 .map_err(|e| {
44 VulnerabilityError::CommandError(format!("Failed to run safety check: {}", e))
45 })?;
46
47 if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
50 return Err(VulnerabilityError::CommandError(format!(
51 "safety check failed with exit code {}: {}",
52 output.status.code().unwrap_or(-1),
53 String::from_utf8_lossy(&output.stderr)
54 )));
55 }
56
57 if output.stdout.is_empty() {
58 return Ok(None);
59 }
60
61 let audit_data: serde_json::Value =
63 serde_json::from_slice(&output.stdout).map_err(|e| {
64 VulnerabilityError::ParseError(format!(
65 "Failed to parse safety check output: {}",
66 e
67 ))
68 })?;
69
70 self.parse_safety_output(&audit_data, dependencies)
71 }
72
73 fn parse_safety_output(
74 &self,
75 audit_data: &serde_json::Value,
76 dependencies: &[DependencyInfo],
77 ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
78 let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
79
80 if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
82 for vulnerability in vulns {
83 if let Some(vuln_obj) = vulnerability.as_object() {
84 let package_name = vuln_obj
85 .get("package_name")
86 .and_then(|n| n.as_str())
87 .unwrap_or("")
88 .to_string();
89 let package_version = vuln_obj
90 .get("package_version")
91 .and_then(|v| v.as_str())
92 .unwrap_or("")
93 .to_string();
94
95 if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
97 let vuln_id = vuln_obj
98 .get("vulnerability_id")
99 .and_then(|i| i.as_str())
100 .unwrap_or("unknown")
101 .to_string();
102 let title = vuln_obj
103 .get("advisory")
104 .and_then(|a| a.as_str())
105 .unwrap_or("Unknown vulnerability")
106 .to_string();
107 let description = vuln_obj
108 .get("description")
109 .and_then(|d| d.as_str())
110 .unwrap_or("")
111 .to_string();
112 let severity =
113 self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
114 let cve = vuln_obj
115 .get("CVE")
116 .and_then(|c| c.as_str())
117 .map(|s| s.to_string());
118 let specs = vuln_obj
119 .get("specs")
120 .and_then(|s| s.as_array())
121 .map(|arr| {
122 arr.iter()
123 .filter_map(|s| s.as_str())
124 .map(|s| s.to_string())
125 .collect::<Vec<String>>()
126 })
127 .unwrap_or_default();
128 let affected_versions = if specs.is_empty() {
129 "*".to_string()
130 } else {
131 specs.join(", ")
132 };
133
134 let vuln_info = VulnerabilityInfo {
135 id: vuln_id,
136 vuln_type: "security".to_string(), severity,
138 title,
139 description,
140 cve,
141 ghsa: None, affected_versions,
143 patched_versions: None, published_date: None,
145 references: Vec::new(), };
147
148 if let Some(existing) = vulnerable_deps.iter_mut().find(|vuln_dep| {
150 vuln_dep.name == package_name && vuln_dep.version == package_version
151 }) {
152 existing.vulnerabilities.push(vuln_info);
153 } else {
154 vulnerable_deps.push(VulnerableDependency {
155 name: dep.name.clone(),
156 version: package_version,
157 language: crate::analyzer::dependency_parser::Language::Python,
158 vulnerabilities: vec![vuln_info],
159 });
160 }
161 }
162 }
163 }
164 }
165
166 if vulnerable_deps.is_empty() {
167 Ok(None)
168 } else {
169 Ok(Some(vulnerable_deps))
170 }
171 }
172
173 fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
174 match severity.map(|s| s.to_lowercase()).as_deref() {
175 Some("critical") => VulnerabilitySeverity::Critical,
176 Some("high") => VulnerabilitySeverity::High,
177 Some("medium") => VulnerabilitySeverity::Medium,
178 Some("low") => VulnerabilitySeverity::Low,
179 _ => VulnerabilitySeverity::Medium, }
181 }
182}
183
184impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
185 fn check_vulnerabilities(
186 &mut self,
187 dependencies: &[DependencyInfo],
188 project_path: &Path,
189 ) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
190 info!("Checking Python dependencies");
191
192 match self.execute_safety_check(project_path, dependencies) {
193 Ok(Some(vulns)) => Ok(vulns),
194 Ok(None) => Ok(vec![]),
195 Err(e) => Err(e),
196 }
197 }
198}