syncable_cli/analyzer/vulnerability/checkers/
python.rs

1use std::path::Path;
2use std::process::Command;
3use log::{info, warn};
4use crate::analyzer::dependency_parser::DependencyInfo;
5use crate::analyzer::tool_management::ToolDetector;
6use crate::analyzer::vulnerability::{VulnerableDependency, VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity};
7use super::MutableLanguageVulnerabilityChecker;
8
9pub struct PythonVulnerabilityChecker {
10    tool_detector: ToolDetector,
11}
12
13impl PythonVulnerabilityChecker {
14    pub fn new() -> Self {
15        Self {
16            tool_detector: ToolDetector::new(),
17        }
18    }
19    
20    fn execute_safety_check(
21        &mut self,
22        project_path: &Path,
23        dependencies: &[DependencyInfo],
24    ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
25        // Check if safety is available
26        let safety_status = self.tool_detector.detect_tool("safety");
27        if !safety_status.available {
28            warn!("safety not found, skipping Python vulnerability check. Install with: pip install safety");
29            return Ok(None);
30        }
31        
32        info!("Executing safety check in {}", project_path.display());
33        
34        // Execute safety check --json
35        let output = Command::new("safety")
36            .args(&["check", "--json"])
37            .current_dir(project_path)
38            .output()
39            .map_err(|e| VulnerabilityError::CommandError(
40                format!("Failed to run safety check: {}", e)
41            ))?;
42        
43        // Safety returns non-zero exit code when vulnerabilities found
44        // This is expected behavior, not an error
45        if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
46            return Err(VulnerabilityError::CommandError(
47                format!("safety check failed with exit code {}: {}", 
48                    output.status.code().unwrap_or(-1),
49                    String::from_utf8_lossy(&output.stderr))
50            ));
51        }
52        
53        if output.stdout.is_empty() {
54            return Ok(None);
55        }
56        
57        // Parse safety check output
58        let audit_data: serde_json::Value = serde_json::from_slice(&output.stdout)
59            .map_err(|e| VulnerabilityError::ParseError(
60                format!("Failed to parse safety check output: {}", e)
61            ))?;
62        
63        self.parse_safety_output(&audit_data, dependencies)
64    }
65    
66    fn parse_safety_output(
67        &self,
68        audit_data: &serde_json::Value,
69        dependencies: &[DependencyInfo],
70    ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
71        let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
72        
73        // Safety JSON structure parsing
74        if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
75            for vulnerability in vulns {
76                if let Some(vuln_obj) = vulnerability.as_object() {
77                    let package_name = vuln_obj.get("package_name").and_then(|n| n.as_str())
78                        .unwrap_or("").to_string();
79                    let package_version = vuln_obj.get("package_version").and_then(|v| v.as_str())
80                        .unwrap_or("").to_string();
81                    
82                    // Find matching dependency
83                    if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
84                        let vuln_id = vuln_obj.get("vulnerability_id").and_then(|i| i.as_str())
85                            .unwrap_or("unknown").to_string();
86                        let title = vuln_obj.get("advisory").and_then(|a| a.as_str())
87                            .unwrap_or("Unknown vulnerability").to_string();
88                        let description = vuln_obj.get("description").and_then(|d| d.as_str())
89                            .unwrap_or("").to_string();
90                        let severity = self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
91                        let cve = vuln_obj.get("CVE").and_then(|c| c.as_str())
92                            .map(|s| s.to_string());
93                        let specs = vuln_obj.get("specs").and_then(|s| s.as_array())
94                            .map(|arr| {
95                                arr.iter()
96                                    .filter_map(|s| s.as_str())
97                                    .map(|s| s.to_string())
98                                    .collect::<Vec<String>>()
99                            })
100                            .unwrap_or_default();
101                        let affected_versions = if specs.is_empty() {
102                            "*".to_string()
103                        } else {
104                            specs.join(", ")
105                        };
106                        
107                        let vuln_info = VulnerabilityInfo {
108                            id: vuln_id,
109                            vuln_type: "security".to_string(),  // Security vulnerability
110                            severity,
111                            title,
112                            description,
113                            cve,
114                            ghsa: None, // Safety doesn't provide GHSA
115                            affected_versions,
116                            patched_versions: None, // Safety doesn't provide this directly
117                            published_date: None,
118                            references: Vec::new(), // Safety doesn't provide references
119                        };
120                        
121                        // Check if we already have this dependency
122                        if let Some(existing) = vulnerable_deps.iter_mut()
123                            .find(|vuln_dep| vuln_dep.name == package_name && vuln_dep.version == package_version)
124                        {
125                            existing.vulnerabilities.push(vuln_info);
126                        } else {
127                            vulnerable_deps.push(VulnerableDependency {
128                                name: dep.name.clone(),
129                                version: package_version,
130                                language: crate::analyzer::dependency_parser::Language::Python,
131                                vulnerabilities: vec![vuln_info],
132                            });
133                        }
134                    }
135                }
136            }
137        }
138        
139        if vulnerable_deps.is_empty() {
140            Ok(None)
141        } else {
142            Ok(Some(vulnerable_deps))
143        }
144    }
145    
146    fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
147        match severity.map(|s| s.to_lowercase()).as_deref() {
148            Some("critical") => VulnerabilitySeverity::Critical,
149            Some("high") => VulnerabilitySeverity::High,
150            Some("medium") => VulnerabilitySeverity::Medium,
151            Some("low") => VulnerabilitySeverity::Low,
152            _ => VulnerabilitySeverity::Medium, // Default to medium if not specified
153        }
154    }
155}
156
157impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
158    fn check_vulnerabilities(
159        &mut self,
160        dependencies: &[DependencyInfo],
161        project_path: &Path,
162    ) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
163        info!("Checking Python dependencies");
164        
165        match self.execute_safety_check(project_path, dependencies) {
166            Ok(Some(vulns)) => Ok(vulns),
167            Ok(None) => Ok(vec![]),
168            Err(e) => Err(e),
169        }
170    }
171}