syncable_cli/analyzer/vulnerability/checkers/
python.rs

1use super::MutableLanguageVulnerabilityChecker;
2use crate::analyzer::dependency_parser::DependencyInfo;
3use crate::analyzer::tool_management::ToolDetector;
4use crate::analyzer::vulnerability::{
5    VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity, VulnerableDependency,
6};
7use log::{info, warn};
8use std::path::Path;
9use std::process::Command;
10
11pub struct PythonVulnerabilityChecker {
12    tool_detector: ToolDetector,
13}
14
15impl PythonVulnerabilityChecker {
16    pub fn new() -> Self {
17        Self {
18            tool_detector: ToolDetector::new(),
19        }
20    }
21
22    fn execute_safety_check(
23        &mut self,
24        project_path: &Path,
25        dependencies: &[DependencyInfo],
26    ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
27        // Check if safety is available
28        let safety_status = self.tool_detector.detect_tool("safety");
29        if !safety_status.available {
30            warn!(
31                "safety not found, skipping Python vulnerability check. Install with: pip install safety"
32            );
33            return Ok(None);
34        }
35
36        info!("Executing safety check in {}", project_path.display());
37
38        // Execute safety check --json
39        let output = Command::new("safety")
40            .args(&["check", "--json"])
41            .current_dir(project_path)
42            .output()
43            .map_err(|e| {
44                VulnerabilityError::CommandError(format!("Failed to run safety check: {}", e))
45            })?;
46
47        // Safety returns non-zero exit code when vulnerabilities found
48        // This is expected behavior, not an error
49        if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
50            return Err(VulnerabilityError::CommandError(format!(
51                "safety check failed with exit code {}: {}",
52                output.status.code().unwrap_or(-1),
53                String::from_utf8_lossy(&output.stderr)
54            )));
55        }
56
57        if output.stdout.is_empty() {
58            return Ok(None);
59        }
60
61        // Parse safety check output
62        let audit_data: serde_json::Value =
63            serde_json::from_slice(&output.stdout).map_err(|e| {
64                VulnerabilityError::ParseError(format!(
65                    "Failed to parse safety check output: {}",
66                    e
67                ))
68            })?;
69
70        self.parse_safety_output(&audit_data, dependencies)
71    }
72
73    fn parse_safety_output(
74        &self,
75        audit_data: &serde_json::Value,
76        dependencies: &[DependencyInfo],
77    ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
78        let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
79
80        // Safety JSON structure parsing
81        if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
82            for vulnerability in vulns {
83                if let Some(vuln_obj) = vulnerability.as_object() {
84                    let package_name = vuln_obj
85                        .get("package_name")
86                        .and_then(|n| n.as_str())
87                        .unwrap_or("")
88                        .to_string();
89                    let package_version = vuln_obj
90                        .get("package_version")
91                        .and_then(|v| v.as_str())
92                        .unwrap_or("")
93                        .to_string();
94
95                    // Find matching dependency
96                    if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
97                        let vuln_id = vuln_obj
98                            .get("vulnerability_id")
99                            .and_then(|i| i.as_str())
100                            .unwrap_or("unknown")
101                            .to_string();
102                        let title = vuln_obj
103                            .get("advisory")
104                            .and_then(|a| a.as_str())
105                            .unwrap_or("Unknown vulnerability")
106                            .to_string();
107                        let description = vuln_obj
108                            .get("description")
109                            .and_then(|d| d.as_str())
110                            .unwrap_or("")
111                            .to_string();
112                        let severity =
113                            self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
114                        let cve = vuln_obj
115                            .get("CVE")
116                            .and_then(|c| c.as_str())
117                            .map(|s| s.to_string());
118                        let specs = vuln_obj
119                            .get("specs")
120                            .and_then(|s| s.as_array())
121                            .map(|arr| {
122                                arr.iter()
123                                    .filter_map(|s| s.as_str())
124                                    .map(|s| s.to_string())
125                                    .collect::<Vec<String>>()
126                            })
127                            .unwrap_or_default();
128                        let affected_versions = if specs.is_empty() {
129                            "*".to_string()
130                        } else {
131                            specs.join(", ")
132                        };
133
134                        let vuln_info = VulnerabilityInfo {
135                            id: vuln_id,
136                            vuln_type: "security".to_string(), // Security vulnerability
137                            severity,
138                            title,
139                            description,
140                            cve,
141                            ghsa: None, // Safety doesn't provide GHSA
142                            affected_versions,
143                            patched_versions: None, // Safety doesn't provide this directly
144                            published_date: None,
145                            references: Vec::new(), // Safety doesn't provide references
146                        };
147
148                        // Check if we already have this dependency
149                        if let Some(existing) = vulnerable_deps.iter_mut().find(|vuln_dep| {
150                            vuln_dep.name == package_name && vuln_dep.version == package_version
151                        }) {
152                            existing.vulnerabilities.push(vuln_info);
153                        } else {
154                            vulnerable_deps.push(VulnerableDependency {
155                                name: dep.name.clone(),
156                                version: package_version,
157                                language: crate::analyzer::dependency_parser::Language::Python,
158                                vulnerabilities: vec![vuln_info],
159                            });
160                        }
161                    }
162                }
163            }
164        }
165
166        if vulnerable_deps.is_empty() {
167            Ok(None)
168        } else {
169            Ok(Some(vulnerable_deps))
170        }
171    }
172
173    fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
174        match severity.map(|s| s.to_lowercase()).as_deref() {
175            Some("critical") => VulnerabilitySeverity::Critical,
176            Some("high") => VulnerabilitySeverity::High,
177            Some("medium") => VulnerabilitySeverity::Medium,
178            Some("low") => VulnerabilitySeverity::Low,
179            _ => VulnerabilitySeverity::Medium, // Default to medium if not specified
180        }
181    }
182}
183
184impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
185    fn check_vulnerabilities(
186        &mut self,
187        dependencies: &[DependencyInfo],
188        project_path: &Path,
189    ) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
190        info!("Checking Python dependencies");
191
192        match self.execute_safety_check(project_path, dependencies) {
193            Ok(Some(vulns)) => Ok(vulns),
194            Ok(None) => Ok(vec![]),
195            Err(e) => Err(e),
196        }
197    }
198}