use super::AdvisorySource;
use crate::error::{AdvisoryError, Result};
use crate::models::Advisory;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use std::io::Read;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tracing::{debug, warn};
const MAX_CONCURRENT_ECOSYSTEMS: usize = 4;
const MAX_CONCURRENT_ADVISORY_FETCHES: usize = 20;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub struct OSVSource {
ecosystems: Vec<String>,
client: ClientWithMiddleware,
raw_client: reqwest::Client,
}
impl OSVSource {
pub fn new(ecosystems: Vec<String>) -> Self {
let raw_client = reqwest::Client::builder()
.timeout(REQUEST_TIMEOUT)
.connect_timeout(CONNECT_TIMEOUT)
.pool_max_idle_per_host(10)
.build()
.unwrap_or_default();
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(raw_client.clone())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Self {
ecosystems,
client,
raw_client,
}
}
}
#[async_trait]
impl AdvisorySource for OSVSource {
async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
self.fetch_internal(since).await
}
fn name(&self) -> &str {
"OSV"
}
}
impl OSVSource {
async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
let ecosystems = if self.ecosystems.is_empty() {
debug!("No ecosystems specified, fetching list from OSV...");
let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
let response = self.client.get(url).send().await?;
if !response.status().is_success() {
warn!("Failed to fetch ecosystems list: {}", response.status());
return Ok(vec![]);
}
let text = response.text().await?;
text.lines().map(|s| s.to_string()).collect()
} else {
self.ecosystems.clone()
};
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ECOSYSTEMS));
let client = self.raw_client.clone();
let tasks: Vec<_> = ecosystems
.into_iter()
.map(|ecosystem| {
let sem = semaphore.clone();
let client = client.clone();
tokio::spawn(async move {
let _permit = sem.acquire().await.expect("semaphore closed");
Self::fetch_ecosystem(&client, &ecosystem, since).await
})
})
.collect();
let mut all_advisories = Vec::new();
for task in tasks {
match task.await {
Ok(Ok(advisories)) => {
all_advisories.extend(advisories);
}
Ok(Err(e)) => {
warn!("Ecosystem fetch error: {}", e);
}
Err(e) => {
warn!("Task join error: {}", e);
}
}
}
Ok(all_advisories)
}
async fn fetch_ecosystem(
client: &reqwest::Client,
ecosystem: &str,
since: Option<DateTime<Utc>>,
) -> Result<Vec<Advisory>> {
if let Some(cutoff) = since {
debug!(
"Attempting incremental sync for {} since {}",
ecosystem, cutoff
);
match Self::fetch_incremental(client, ecosystem, cutoff).await {
Ok(advisories) => {
debug!(
"Incremental sync for {}: {} advisories",
ecosystem,
advisories.len()
);
return Ok(advisories);
}
Err(e) => {
warn!(
"Incremental sync failed for {}, falling back to full sync: {}",
ecosystem, e
);
}
}
}
debug!("Performing full sync for {}", ecosystem);
match Self::fetch_full(client, ecosystem).await {
Ok(advisories) => {
debug!(
"Full sync for {}: {} advisories",
ecosystem,
advisories.len()
);
Ok(advisories)
}
Err(e) => {
warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
Ok(vec![])
}
}
}
async fn fetch_incremental(
client: &reqwest::Client,
ecosystem: &str,
since: DateTime<Utc>,
) -> Result<Vec<Advisory>> {
let csv_url = format!(
"https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
ecosystem
);
let response = client.get(&csv_url).send().await?;
if !response.status().is_success() {
return Err(AdvisoryError::source_fetch(
"OSV",
format!("Failed to fetch modified_id.csv: {}", response.status()),
));
}
let csv_text = response.text().await?;
let mut changed_ids = Vec::new();
for line in csv_text.lines() {
let parts: Vec<&str> = line.split(',').collect();
if parts.len() != 2 {
continue;
}
let timestamp_str = parts[0];
let id = parts[1];
match DateTime::parse_from_rfc3339(timestamp_str) {
Ok(modified) => {
let modified_utc = modified.with_timezone(&chrono::Utc);
if modified_utc <= since {
break;
}
changed_ids.push(id.to_string());
}
Err(_) => {
warn!(
"Failed to parse timestamp in modified_id.csv: {}",
timestamp_str
);
continue;
}
}
}
debug!(
"Found {} changed advisories for {}",
changed_ids.len(),
ecosystem
);
if changed_ids.is_empty() {
return Ok(vec![]);
}
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ADVISORY_FETCHES));
let client = client.clone();
let ecosystem = ecosystem.to_string();
let tasks: Vec<_> = changed_ids
.into_iter()
.map(|id| {
let sem = semaphore.clone();
let client = client.clone();
let ecosystem = ecosystem.clone();
tokio::spawn(async move {
let _permit = sem.acquire().await.expect("semaphore closed");
let json_url = format!(
"https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
ecosystem, id
);
match client.get(&json_url).send().await {
Ok(response) if response.status().is_success() => {
match response.json::<Advisory>().await {
Ok(advisory) => Some(advisory),
Err(e) => {
debug!("Failed to parse advisory {}: {}", id, e);
None
}
}
}
Ok(response) => {
debug!("Failed to fetch advisory {}: {}", id, response.status());
None
}
Err(e) => {
debug!("Network error fetching advisory {}: {}", id, e);
None
}
}
})
})
.collect();
let mut advisories = Vec::with_capacity(tasks.len());
for task in tasks {
if let Ok(Some(advisory)) = task.await {
advisories.push(advisory);
}
}
Ok(advisories)
}
async fn fetch_full(client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
let url = format!(
"https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
ecosystem
);
let response = client.get(&url).send().await?;
if !response.status().is_success() {
return Err(AdvisoryError::source_fetch(
"OSV",
format!("Failed to fetch ZIP: {}", response.status()),
));
}
let bytes = response.bytes().await?;
let ecosystem = ecosystem.to_string();
let advisories =
tokio::task::spawn_blocking(move || Self::parse_zip_sync(&bytes, &ecosystem))
.await
.map_err(|e| {
AdvisoryError::source_fetch("OSV", format!("Task join error: {}", e))
})??;
Ok(advisories)
}
fn parse_zip_sync(bytes: &[u8], ecosystem: &str) -> Result<Vec<Advisory>> {
use std::io::Cursor;
let reader = Cursor::new(bytes);
let mut zip = zip::ZipArchive::new(reader)?;
let mut advisories = Vec::with_capacity(zip.len());
for i in 0..zip.len() {
let mut file = zip.by_index(i)?;
if !file.name().ends_with(".json") {
continue;
}
let mut content = String::new();
file.read_to_string(&mut content)?;
match serde_json::from_str::<Advisory>(&content) {
Ok(advisory) => advisories.push(advisory),
Err(e) => {
eprintln!("WARN: Failed to parse OSV advisory in {}: {}", ecosystem, e);
}
}
}
Ok(advisories)
}
}