use crate::audit_report::AuditReport;
use crate::dep_manifest::DepManifest;
use crate::dep_spec::DepSpec;
use crate::env_marker::EnvMarkerState;
use crate::package::Package;
use crate::pypi_project::query_pypi_project;
use crate::ureq_client::UreqClient;
use crate::util::logger;
use crate::util::name_to_key;
use crate::util::CacheConfig;
use crate::util::FlagCacheRefresh;
use crate::util::FlagLog;
use crate::util::FlagRetainPassing;
use crate::util::ResultDynError;
use crate::CvssFilter;
use rayon::prelude::*;
use serde::Serialize;
use std::ops::Deref;
use std::sync::Arc;
/// Newtype wrapper around an [`AuditReport`] produced by looking up packages
/// for one dependency specification or for a whole dependency manifest.
///
/// The inner report is public (`.0`) and the wrapper derives `Serialize`, so
/// it can be serialized directly.
#[derive(Debug, Serialize)]
pub struct LookupReport(pub AuditReport);
impl LookupReport {
    /// Resolves a dependency specification into the concrete packages to audit.
    ///
    /// * If `ds` pins an exact version (`ds.get_exact()` returns `Some`), a
    ///   single [`Package`] with that version is returned and no request is made.
    /// * Otherwise the project is fetched with `query_pypi_project` and every
    ///   version returned by `get_version_specs(Some(ds), limit)` becomes a
    ///   [`Package`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `query_pypi_project`.
    fn resolve_packages(
        client: Arc<dyn UreqClient>,
        ds: &DepSpec,
        limit: Option<usize>,
        cache_config: &CacheConfig,
        log: FlagLog,
    ) -> ResultDynError<Vec<Package>> {
        // Single place where a version is turned into a `Package` for `ds`, so
        // the exact and the queried paths cannot drift apart.
        let to_package = |version| Package {
            name: ds.name.clone(),
            key: name_to_key(&ds.name),
            version,
            direct_url: None,
        };
        let packages: Vec<Package> = match ds.get_exact() {
            Some(version) => vec![to_package(version)],
            None => query_pypi_project(client, &ds.key, cache_config, log)?
                .get_version_specs(Some(ds), limit)
                .into_iter()
                .map(to_package)
                .collect(),
        };
        Ok(packages)
    }

    /// Best-effort variant of [`Self::resolve_packages`] used when processing
    /// many dependency specifications at once.
    ///
    /// Returns `None` when the packages for `ds` could not be resolved, so one
    /// failing lookup does not abort the whole manifest. The failure is
    /// reported through `logger!` instead of being dropped silently.
    fn dep_spec_to_packages(
        client: Arc<dyn UreqClient>,
        ds: &DepSpec,
        limit: Option<usize>,
        cache_config: &CacheConfig,
        log: FlagLog,
    ) -> Option<Vec<Package>> {
        match Self::resolve_packages(client, ds, limit, cache_config, log) {
            Ok(packages) => Some(packages),
            Err(e) => {
                logger!(
                    log,
                    module_path!(),
                    "Failed to resolve packages for {}: {}",
                    ds.name,
                    e
                );
                None
            }
        }
    }

    /// Builds a report for a single dependency specification.
    ///
    /// The packages matching `ds` are resolved (see
    /// [`Self::resolve_packages`]; `limit` is forwarded to
    /// `get_version_specs`) and then handed to `AuditReport::from_packages`
    /// together with the cache, logging and filtering options.
    ///
    /// # Errors
    ///
    /// Returns an error if `ds` is not an exact specification and the PyPI
    /// project query fails.
    #[allow(clippy::too_many_arguments)]
    pub fn from_dep_spec(
        client: Arc<dyn UreqClient>,
        ds: &DepSpec,
        limit: Option<usize>,
        cache_config: &CacheConfig,
        cache_refresh: FlagCacheRefresh,
        log: FlagLog,
        filter_cvss: CvssFilter,
        retain_passing: FlagRetainPassing,
    ) -> ResultDynError<Self> {
        let packages = Self::resolve_packages(client.clone(), ds, limit, cache_config, log)?;
        logger!(
            log,
            module_path!(),
            "Looking up vulns in {:?} packages",
            packages.len()
        );
        let audit_report = AuditReport::from_packages(
            client,
            &packages,
            cache_refresh,
            cache_config,
            log,
            filter_cvss,
            retain_passing,
        );
        Ok(LookupReport(audit_report))
    }

    /// Builds a report for every applicable dependency specification in a
    /// manifest.
    ///
    /// A specification is applicable when it has no environment marker, or
    /// when `env_marker_state` is provided and `validate_env_marker` accepts
    /// it. Specifications that carry a marker are skipped when
    /// `env_marker_state` is `None`.
    ///
    /// Applicable specifications are resolved in parallel with a limit of
    /// `Some(1)`; specifications whose resolution fails are logged and
    /// skipped. The resulting packages are sorted and passed to
    /// `AuditReport::from_packages`.
    ///
    /// # Errors
    ///
    /// The body currently always returns `Ok`; the `Result` return type is
    /// kept for interface compatibility.
    #[allow(clippy::too_many_arguments)]
    pub fn from_dep_manifest(
        client: Arc<dyn UreqClient>,
        dep_manifest: &DepManifest,
        env_marker_state: Option<&EnvMarkerState>,
        cache_config: &CacheConfig,
        cache_refresh: FlagCacheRefresh,
        log: FlagLog,
        filter_cvss: CvssFilter,
        retain_passing: FlagRetainPassing,
    ) -> ResultDynError<Self> {
        // Keep only the specifications that apply to the given environment.
        let mut dep_specs: Vec<DepSpec> = Vec::new();
        for ds in dep_manifest.iter_dep_specs() {
            if ds.env_marker.is_empty() {
                dep_specs.push(ds.clone());
            } else if let Some(ems) = env_marker_state {
                if ds.validate_env_marker(ems) {
                    dep_specs.push(ds.clone());
                }
            }
        }

        // Resolve each specification independently and in parallel.
        // NOTE(review): `Some(1)` presumably restricts non-exact specs to a
        // single version; the exact meaning is defined by `get_version_specs`.
        let mut packages: Vec<Package> = dep_specs
            .par_iter()
            .filter_map(|ds| {
                Self::dep_spec_to_packages(client.clone(), ds, Some(1), cache_config, log)
            })
            .flatten()
            .collect();
        // Order packages by `Package`'s `Ord` implementation before auditing.
        packages.sort();

        let audit_report = AuditReport::from_packages(
            client,
            &packages,
            cache_refresh,
            cache_config,
            log,
            filter_cvss,
            retain_passing,
        );
        Ok(LookupReport(audit_report))
    }
}
impl Deref for LookupReport {
type Target = AuditReport;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dep_spec::DepSpec;
    use crate::table::Tableable;
    use crate::ureq_client::UreqClientLive;
    use crate::ureq_client::UreqClientMock;
    use crate::util::path_cache;
    use crate::util::DURATION_0;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Copies `(url, body)` pairs into the owned map a mock client expects.
    fn response_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(url, body)| (url.to_string(), body.to_string()))
            .collect()
    }

    /// Builds a mock client answering GET and POST requests with canned bodies.
    fn mock_client(gets: &[(&str, &str)], posts: &[(&str, &str)]) -> Arc<dyn UreqClient> {
        Arc::new(UreqClientMock {
            mock_get: Some(response_map(gets)),
            mock_post: Some(response_map(posts)),
        }) as Arc<dyn UreqClient>
    }

    /// Cache configuration built from `DURATION_0` and the test cache path.
    fn zero_duration_cache() -> CacheConfig {
        let cache_dir = path_cache(true).unwrap();
        CacheConfig::new(DURATION_0, cache_dir)
    }

    // Uses the live client, hence ignored by default.
    #[test]
    #[ignore]
    fn test_from_dep_manifest_live() {
        let dep_specs: Vec<DepSpec> = ["numpy==1.20.0", "pandas>=2.0.0", "requests>=1,<2"]
            .iter()
            .map(|spec| DepSpec::from_string(spec).unwrap())
            .collect();
        let dep_manifest = DepManifest::from_dep_specs(&dep_specs).unwrap();
        let client = Arc::new(UreqClientLive) as Arc<dyn UreqClient>;
        let cache_config = zero_duration_cache();

        let result = LookupReport::from_dep_manifest(
            client,
            &dep_manifest,
            None,
            &cache_config,
            FlagCacheRefresh(false),
            FlagLog(false),
            CvssFilter::All,
            FlagRetainPassing(false),
        );
        assert!(result.is_ok());

        let lookup_report = result.unwrap();
        for record in lookup_report.get_records() {
            println!(
                "Package: {}, Vuln IDs: {:?}",
                record.package, record.vuln_ids
            );
        }
    }

    #[test]
    fn test_from_dep_spec_mock() {
        let pypi_json = r#"{"info":{"author":"Abubakar Abid","name":"gradio","project_urls":{"Homepage":"https://github.com/gradio-app/gradio"}},"releases":{"4.0.0":[{"filename":"gradio-4.0.0-py3-none-any.whl"}],"4.18.0":[{"filename":"gradio-4.18.0-py3-none-any.whl"}],"4.19.2":[{"filename":"gradio-4.19.2-py3-none-any.whl"}]}}"#;
        let osv_batch_json = r#"{"results":[{"vulns":[{"id":"GHSA-48cq-79qq-6f7x","modified":"2024-05-21T14:58:25.710902Z"}]}]}"#;
        let osv_vuln_json = r#"{"id":"GHSA-48cq-79qq-6f7x","summary":"Gradio applications running locally vulnerable to 3rd party websites accessing routes and uploading files","references":[{"type":"ADVISORY","url":"https://nvd.nist.gov/vuln/detail/CVE-2024-1727"}],"severity":[{"type":"CVSS_V3","score":"CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:N/I:N/A:L"}]}"#;

        let client = mock_client(
            &[
                ("https://pypi.org", pypi_json),
                ("https://api.osv.dev", osv_vuln_json),
            ],
            &[("https://api.osv.dev", osv_batch_json)],
        );
        let dep_spec = DepSpec::from_string("gradio>=4.0.0").unwrap();
        let cache_config = zero_duration_cache();

        let result = LookupReport::from_dep_spec(
            client,
            &dep_spec,
            Some(3),
            &cache_config,
            FlagCacheRefresh(true),
            FlagLog(false),
            CvssFilter::All,
            FlagRetainPassing(false),
        );
        assert!(result.is_ok());

        let lookup_report = result.unwrap();
        let records = lookup_report.get_records();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].package.name, "gradio");
        assert_eq!(records[0].vuln_ids, vec!["GHSA-48cq-79qq-6f7x"]);
    }

    #[test]
    fn test_from_dep_manifest_mock() {
        let pypi_gradio = r#"{"info":{"author":"Abubakar Abid","name":"gradio","project_urls":{"Homepage":"https://github.com/gradio-app/gradio"}},"releases":{"4.0.0":[{"filename":"gradio-4.0.0-py3-none-any.whl"}]}}"#;
        let pypi_numpy = r#"{"info":{"author":"NumPy Developers","name":"numpy","project_urls":{"Homepage":"https://numpy.org"}},"releases":{"1.19.1":[{"filename":"numpy-1.19.1-py3-none-any.whl"}]}}"#;
        let osv_batch_json = r#"{"results":[{"vulns":[{"id":"GHSA-48cq-79qq-6f7x","modified":"2024-05-21T14:58:25.710902Z"}]},{"vulns":null}]}"#;
        let osv_vuln_json = r#"{"id":"GHSA-48cq-79qq-6f7x","summary":"Gradio vulnerability","references":[{"type":"ADVISORY","url":"https://nvd.nist.gov/vuln/detail/CVE-2024-1727"}],"severity":[{"type":"CVSS_V3","score":"CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:N/I:N/A:L"}]}"#;

        let client = mock_client(
            &[
                ("https://pypi.org/pypi/gradio", pypi_gradio),
                ("https://pypi.org/pypi/numpy", pypi_numpy),
                ("https://api.osv.dev", osv_vuln_json),
            ],
            &[("https://api.osv.dev", osv_batch_json)],
        );
        let dep_specs = vec![
            DepSpec::from_string("gradio==4.0.0").unwrap(),
            DepSpec::from_string("numpy==1.19.1").unwrap(),
        ];
        let dep_manifest = DepManifest::from_dep_specs(&dep_specs).unwrap();
        let cache_config = zero_duration_cache();

        let result = LookupReport::from_dep_manifest(
            client,
            &dep_manifest,
            None,
            &cache_config,
            FlagCacheRefresh(true),
            FlagLog(false),
            CvssFilter::All,
            FlagRetainPassing(false),
        );
        assert!(result.is_ok());

        let lookup_report = result.unwrap();
        let records = lookup_report.get_records();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].package.name, "gradio");
        assert_eq!(records[0].package.version.to_string(), "4.0.0");
        assert_eq!(records[0].vuln_ids, vec!["GHSA-48cq-79qq-6f7x"]);
    }
}