syncable_cli/analyzer/vulnerability/checkers/
python.rs1use std::path::Path;
2use std::process::Command;
3use log::{info, warn};
4use crate::analyzer::dependency_parser::DependencyInfo;
5use crate::analyzer::tool_management::ToolDetector;
6use crate::analyzer::vulnerability::{VulnerableDependency, VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity};
7use super::MutableLanguageVulnerabilityChecker;
8
9pub struct PythonVulnerabilityChecker {
10 tool_detector: ToolDetector,
11}
12
13impl PythonVulnerabilityChecker {
14 pub fn new() -> Self {
15 Self {
16 tool_detector: ToolDetector::new(),
17 }
18 }
19
20 fn execute_safety_check(
21 &mut self,
22 project_path: &Path,
23 dependencies: &[DependencyInfo],
24 ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
25 let safety_status = self.tool_detector.detect_tool("safety");
27 if !safety_status.available {
28 warn!("safety not found, skipping Python vulnerability check. Install with: pip install safety");
29 return Ok(None);
30 }
31
32 info!("Executing safety check in {}", project_path.display());
33
34 let output = Command::new("safety")
36 .args(&["check", "--json"])
37 .current_dir(project_path)
38 .output()
39 .map_err(|e| VulnerabilityError::CommandError(
40 format!("Failed to run safety check: {}", e)
41 ))?;
42
43 if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
46 return Err(VulnerabilityError::CommandError(
47 format!("safety check failed with exit code {}: {}",
48 output.status.code().unwrap_or(-1),
49 String::from_utf8_lossy(&output.stderr))
50 ));
51 }
52
53 if output.stdout.is_empty() {
54 return Ok(None);
55 }
56
57 let audit_data: serde_json::Value = serde_json::from_slice(&output.stdout)
59 .map_err(|e| VulnerabilityError::ParseError(
60 format!("Failed to parse safety check output: {}", e)
61 ))?;
62
63 self.parse_safety_output(&audit_data, dependencies)
64 }
65
66 fn parse_safety_output(
67 &self,
68 audit_data: &serde_json::Value,
69 dependencies: &[DependencyInfo],
70 ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
71 let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
72
73 if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
75 for vulnerability in vulns {
76 if let Some(vuln_obj) = vulnerability.as_object() {
77 let package_name = vuln_obj.get("package_name").and_then(|n| n.as_str())
78 .unwrap_or("").to_string();
79 let package_version = vuln_obj.get("package_version").and_then(|v| v.as_str())
80 .unwrap_or("").to_string();
81
82 if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
84 let vuln_id = vuln_obj.get("vulnerability_id").and_then(|i| i.as_str())
85 .unwrap_or("unknown").to_string();
86 let title = vuln_obj.get("advisory").and_then(|a| a.as_str())
87 .unwrap_or("Unknown vulnerability").to_string();
88 let description = vuln_obj.get("description").and_then(|d| d.as_str())
89 .unwrap_or("").to_string();
90 let severity = self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
91 let cve = vuln_obj.get("CVE").and_then(|c| c.as_str())
92 .map(|s| s.to_string());
93 let specs = vuln_obj.get("specs").and_then(|s| s.as_array())
94 .map(|arr| {
95 arr.iter()
96 .filter_map(|s| s.as_str())
97 .map(|s| s.to_string())
98 .collect::<Vec<String>>()
99 })
100 .unwrap_or_default();
101 let affected_versions = if specs.is_empty() {
102 "*".to_string()
103 } else {
104 specs.join(", ")
105 };
106
107 let vuln_info = VulnerabilityInfo {
108 id: vuln_id,
109 vuln_type: "security".to_string(), severity,
111 title,
112 description,
113 cve,
114 ghsa: None, affected_versions,
116 patched_versions: None, published_date: None,
118 references: Vec::new(), };
120
121 if let Some(existing) = vulnerable_deps.iter_mut()
123 .find(|vuln_dep| vuln_dep.name == package_name && vuln_dep.version == package_version)
124 {
125 existing.vulnerabilities.push(vuln_info);
126 } else {
127 vulnerable_deps.push(VulnerableDependency {
128 name: dep.name.clone(),
129 version: package_version,
130 language: crate::analyzer::dependency_parser::Language::Python,
131 vulnerabilities: vec![vuln_info],
132 });
133 }
134 }
135 }
136 }
137 }
138
139 if vulnerable_deps.is_empty() {
140 Ok(None)
141 } else {
142 Ok(Some(vulnerable_deps))
143 }
144 }
145
146 fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
147 match severity.map(|s| s.to_lowercase()).as_deref() {
148 Some("critical") => VulnerabilitySeverity::Critical,
149 Some("high") => VulnerabilitySeverity::High,
150 Some("medium") => VulnerabilitySeverity::Medium,
151 Some("low") => VulnerabilitySeverity::Low,
152 _ => VulnerabilitySeverity::Medium, }
154 }
155}
156
157impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
158 fn check_vulnerabilities(
159 &mut self,
160 dependencies: &[DependencyInfo],
161 project_path: &Path,
162 ) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
163 info!("Checking Python dependencies");
164
165 match self.execute_safety_check(project_path, dependencies) {
166 Ok(Some(vulns)) => Ok(vulns),
167 Ok(None) => Ok(vec![]),
168 Err(e) => Err(e),
169 }
170 }
171}