use super::AnalysisProvider;
use anyhow::{Context, Result};
use rma_common::{Confidence, Finding, FindingCategory, Language, Severity, SourceLocation};
use rustsec::{Database, Lockfile, Vulnerability};
use std::path::{Path, PathBuf};
use tracing::{debug, info, warn};
pub struct RustSecProvider {
database: Option<Database>,
available: bool,
}
impl Default for RustSecProvider {
fn default() -> Self {
Self::new()
}
}
impl RustSecProvider {
pub fn new() -> Self {
match Database::fetch() {
Ok(db) => {
info!("RustSec database loaded ({} advisories)", db.iter().count());
Self {
database: Some(db),
available: true,
}
}
Err(e) => {
warn!("Failed to fetch RustSec database: {}", e);
Self {
database: None,
available: false,
}
}
}
}
pub fn with_database(db: Database) -> Self {
Self {
database: Some(db),
available: true,
}
}
pub fn scan_lockfile(&self, lockfile_path: &Path) -> Result<Vec<Finding>> {
let db = self
.database
.as_ref()
.context("RustSec database not available")?;
let lockfile = Lockfile::load(lockfile_path)
.with_context(|| format!("Failed to load lockfile: {}", lockfile_path.display()))?;
let vulnerabilities = db.vulnerabilities(&lockfile);
debug!(
"Found {} vulnerabilities in {}",
vulnerabilities.len(),
lockfile_path.display()
);
let findings: Vec<Finding> = vulnerabilities
.iter()
.map(|vuln| self.vuln_to_finding(vuln, lockfile_path))
.collect();
Ok(findings)
}
fn vuln_to_finding(&self, vuln: &Vulnerability, lockfile_path: &Path) -> Finding {
let advisory = &vuln.advisory;
let severity = if let Some(cvss) = &advisory.cvss {
let score = cvss.score();
match score {
s if s >= 9.0 => Severity::Critical,
s if s >= 7.0 => Severity::Error,
s if s >= 4.0 => Severity::Warning,
_ => Severity::Info,
}
} else {
Severity::Warning
};
let rule_id = format!("rustsec/{}", advisory.id);
let package_name = &vuln.package.name;
let package_version = &vuln.package.version;
let message = format!(
"{} v{}: {} ({})",
package_name, package_version, advisory.title, advisory.id
);
let mut suggestion = format!("Advisory: {}", advisory.id);
let patched = vuln.versions.patched();
if !patched.is_empty() {
let patched_strs: Vec<String> = patched.iter().map(|v| v.to_string()).collect();
suggestion.push_str(&format!("\nPatched in: {}", patched_strs.join(", ")));
}
let unaffected = vuln.versions.unaffected();
if !unaffected.is_empty() {
let unaffected_strs: Vec<String> = unaffected.iter().map(|v| v.to_string()).collect();
suggestion.push_str(&format!("\nUnaffected: {}", unaffected_strs.join(", ")));
}
if let Some(url) = &advisory.url {
suggestion.push_str(&format!("\nMore info: {}", url));
}
Finding {
id: format!("{}:{}:{}", rule_id, package_name, package_version),
rule_id,
message,
severity,
location: SourceLocation::new(lockfile_path.to_path_buf(), 1, 1, 1, 1),
language: Language::Rust,
snippet: Some(format!("{} = \"{}\"", package_name, package_version)),
suggestion: Some(suggestion),
fix: None,
confidence: Confidence::High,
category: FindingCategory::Security,
fingerprint: None,
properties: None,
}
}
fn find_lockfiles(&self, path: &Path) -> Vec<PathBuf> {
let mut lockfiles = Vec::new();
let root_lock = path.join("Cargo.lock");
if root_lock.exists() {
lockfiles.push(root_lock);
}
for entry in walkdir::WalkDir::new(path)
.max_depth(3)
.into_iter()
.filter_map(|e| e.ok())
{
if entry.file_name() == "Cargo.lock" {
let lock_path = entry.path().to_path_buf();
if !lockfiles.contains(&lock_path) {
lockfiles.push(lock_path);
}
}
}
lockfiles
}
}
impl AnalysisProvider for RustSecProvider {
fn name(&self) -> &'static str {
"rustsec"
}
fn description(&self) -> &'static str {
"RustSec Advisory Database - scans Cargo.lock for known vulnerabilities"
}
fn supports_language(&self, lang: Language) -> bool {
lang == Language::Rust
}
fn is_available(&self) -> bool {
self.available
}
fn version(&self) -> Option<String> {
self.database
.as_ref()
.map(|db| format!("RustSec DB ({} advisories)", db.iter().count()))
}
fn analyze_file(&self, path: &Path) -> Result<Vec<Finding>> {
if path.file_name().map(|n| n == "Cargo.lock").unwrap_or(false) {
self.scan_lockfile(path)
} else {
Ok(Vec::new())
}
}
fn analyze_directory(&self, path: &Path) -> Result<Vec<Finding>> {
if !self.available {
return Ok(Vec::new());
}
let lockfiles = self.find_lockfiles(path);
let mut all_findings = Vec::new();
for lockfile in lockfiles {
match self.scan_lockfile(&lockfile) {
Ok(findings) => {
info!(
"RustSec: {} vulnerabilities in {}",
findings.len(),
lockfile.display()
);
all_findings.extend(findings);
}
Err(e) => {
warn!("Failed to scan {}: {}", lockfile.display(), e);
}
}
}
Ok(all_findings)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_provider_creation() {
let provider = RustSecProvider::new();
let _ = provider.is_available();
}
}