syncable_cli/analyzer/vulnerability/checkers/
python.rs

1use super::MutableLanguageVulnerabilityChecker;
2use crate::analyzer::dependency_parser::DependencyInfo;
3use crate::analyzer::tool_management::ToolDetector;
4use crate::analyzer::vulnerability::{
5    VulnerabilityError, VulnerabilityInfo, VulnerabilitySeverity, VulnerableDependency,
6};
7use log::{info, warn};
8use std::path::Path;
9use std::process::Command;
10
11#[derive(Default)]
12pub struct PythonVulnerabilityChecker {
13    tool_detector: ToolDetector,
14}
15
16impl PythonVulnerabilityChecker {
17    pub fn new() -> Self {
18        Self {
19            tool_detector: ToolDetector::new(),
20        }
21    }
22
23    fn execute_safety_check(
24        &mut self,
25        project_path: &Path,
26        dependencies: &[DependencyInfo],
27    ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
28        // Check if safety is available
29        let safety_status = self.tool_detector.detect_tool("safety");
30        if !safety_status.available {
31            warn!(
32                "safety not found, skipping Python vulnerability check. Install with: pip install safety"
33            );
34            return Ok(None);
35        }
36
37        info!("Executing safety check in {}", project_path.display());
38
39        // Execute safety check --json
40        let output = Command::new("safety")
41            .args(["check", "--json"])
42            .current_dir(project_path)
43            .output()
44            .map_err(|e| {
45                VulnerabilityError::CommandError(format!("Failed to run safety check: {}", e))
46            })?;
47
48        // Safety returns non-zero exit code when vulnerabilities found
49        // This is expected behavior, not an error
50        if !output.status.success() && output.stdout.is_empty() && output.stderr.is_empty() {
51            return Err(VulnerabilityError::CommandError(format!(
52                "safety check failed with exit code {}: {}",
53                output.status.code().unwrap_or(-1),
54                String::from_utf8_lossy(&output.stderr)
55            )));
56        }
57
58        if output.stdout.is_empty() {
59            return Ok(None);
60        }
61
62        // Parse safety check output
63        let audit_data: serde_json::Value =
64            serde_json::from_slice(&output.stdout).map_err(|e| {
65                VulnerabilityError::ParseError(format!(
66                    "Failed to parse safety check output: {}",
67                    e
68                ))
69            })?;
70
71        self.parse_safety_output(&audit_data, dependencies)
72    }
73
74    fn parse_safety_output(
75        &self,
76        audit_data: &serde_json::Value,
77        dependencies: &[DependencyInfo],
78    ) -> Result<Option<Vec<VulnerableDependency>>, VulnerabilityError> {
79        let mut vulnerable_deps: Vec<VulnerableDependency> = Vec::new();
80
81        // Safety JSON structure parsing
82        if let Some(vulns) = audit_data.get("vulnerabilities").and_then(|v| v.as_array()) {
83            for vulnerability in vulns {
84                if let Some(vuln_obj) = vulnerability.as_object() {
85                    let package_name = vuln_obj
86                        .get("package_name")
87                        .and_then(|n| n.as_str())
88                        .unwrap_or("")
89                        .to_string();
90                    let package_version = vuln_obj
91                        .get("package_version")
92                        .and_then(|v| v.as_str())
93                        .unwrap_or("")
94                        .to_string();
95
96                    // Find matching dependency
97                    if let Some(dep) = dependencies.iter().find(|d| d.name == package_name) {
98                        let vuln_id = vuln_obj
99                            .get("vulnerability_id")
100                            .and_then(|i| i.as_str())
101                            .unwrap_or("unknown")
102                            .to_string();
103                        let title = vuln_obj
104                            .get("advisory")
105                            .and_then(|a| a.as_str())
106                            .unwrap_or("Unknown vulnerability")
107                            .to_string();
108                        let description = vuln_obj
109                            .get("description")
110                            .and_then(|d| d.as_str())
111                            .unwrap_or("")
112                            .to_string();
113                        let severity =
114                            self.parse_severity(vuln_obj.get("severity").and_then(|s| s.as_str()));
115                        let cve = vuln_obj
116                            .get("CVE")
117                            .and_then(|c| c.as_str())
118                            .map(|s| s.to_string());
119                        let specs = vuln_obj
120                            .get("specs")
121                            .and_then(|s| s.as_array())
122                            .map(|arr| {
123                                arr.iter()
124                                    .filter_map(|s| s.as_str())
125                                    .map(|s| s.to_string())
126                                    .collect::<Vec<String>>()
127                            })
128                            .unwrap_or_default();
129                        let affected_versions = if specs.is_empty() {
130                            "*".to_string()
131                        } else {
132                            specs.join(", ")
133                        };
134
135                        let vuln_info = VulnerabilityInfo {
136                            id: vuln_id,
137                            vuln_type: "security".to_string(), // Security vulnerability
138                            severity,
139                            title,
140                            description,
141                            cve,
142                            ghsa: None, // Safety doesn't provide GHSA
143                            affected_versions,
144                            patched_versions: None, // Safety doesn't provide this directly
145                            published_date: None,
146                            references: Vec::new(), // Safety doesn't provide references
147                        };
148
149                        // Check if we already have this dependency
150                        if let Some(existing) = vulnerable_deps.iter_mut().find(|vuln_dep| {
151                            vuln_dep.name == package_name && vuln_dep.version == package_version
152                        }) {
153                            existing.vulnerabilities.push(vuln_info);
154                        } else {
155                            vulnerable_deps.push(VulnerableDependency {
156                                name: dep.name.clone(),
157                                version: package_version,
158                                language: crate::analyzer::dependency_parser::Language::Python,
159                                vulnerabilities: vec![vuln_info],
160                            });
161                        }
162                    }
163                }
164            }
165        }
166
167        if vulnerable_deps.is_empty() {
168            Ok(None)
169        } else {
170            Ok(Some(vulnerable_deps))
171        }
172    }
173
174    fn parse_severity(&self, severity: Option<&str>) -> VulnerabilitySeverity {
175        match severity.map(|s| s.to_lowercase()).as_deref() {
176            Some("critical") => VulnerabilitySeverity::Critical,
177            Some("high") => VulnerabilitySeverity::High,
178            Some("medium") => VulnerabilitySeverity::Medium,
179            Some("low") => VulnerabilitySeverity::Low,
180            _ => VulnerabilitySeverity::Medium, // Default to medium if not specified
181        }
182    }
183}
184
185impl MutableLanguageVulnerabilityChecker for PythonVulnerabilityChecker {
186    fn check_vulnerabilities(
187        &mut self,
188        dependencies: &[DependencyInfo],
189        project_path: &Path,
190    ) -> Result<Vec<VulnerableDependency>, VulnerabilityError> {
191        info!("Checking Python dependencies");
192
193        match self.execute_safety_check(project_path, dependencies) {
194            Ok(Some(vulns)) => Ok(vulns),
195            Ok(None) => Ok(vec![]),
196            Err(e) => Err(e),
197        }
198    }
199}