vulnera_advisor/sources/
osv.rs1use super::AdvisorySource;
2use crate::error::{AdvisoryError, Result};
3use crate::models::Advisory;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
7use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
8use std::io::Read;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::Semaphore;
12use tracing::{debug, info, warn};
13
/// HTTP client timeout applied to each request (`ClientBuilder::timeout`).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
/// HTTP client timeout for establishing a connection (`ClientBuilder::connect_timeout`).
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Maximum number of ecosystems synced concurrently.
const MAX_CONCURRENT_ECOSYSTEMS: usize = 4;
/// Maximum number of in-flight per-advisory JSON downloads within one
/// incremental ecosystem sync.
const MAX_CONCURRENT_ADVISORY_FETCHES: usize = 20;
22
23pub struct OSVSource {
24 ecosystems: Vec<String>,
25 client: ClientWithMiddleware,
26 raw_client: reqwest::Client,
28}
29
30impl OSVSource {
31 pub fn new(ecosystems: Vec<String>) -> Self {
32 let raw_client = reqwest::Client::builder()
33 .timeout(REQUEST_TIMEOUT)
34 .connect_timeout(CONNECT_TIMEOUT)
35 .pool_max_idle_per_host(10)
36 .build()
37 .unwrap_or_default();
38
39 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
41 let client = ClientBuilder::new(raw_client.clone())
42 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
43 .build();
44
45 Self { ecosystems, client, raw_client }
46 }
47}
48
49#[async_trait]
50impl AdvisorySource for OSVSource {
51 async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
52 self.fetch_internal(since).await
54 }
55
56 fn name(&self) -> &str {
57 "OSV"
58 }
59}
60
61impl OSVSource {
62 async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
63 let ecosystems = if self.ecosystems.is_empty() {
64 info!("No ecosystems specified, fetching list from OSV...");
65 let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
66 let response = self.client.get(url).send().await?;
67 if !response.status().is_success() {
68 warn!("Failed to fetch ecosystems list: {}", response.status());
69 return Ok(vec![]);
70 }
71 let text = response.text().await?;
72 text.lines().map(|s| s.to_string()).collect()
73 } else {
74 self.ecosystems.clone()
75 };
76
77 let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ECOSYSTEMS));
79 let client = self.raw_client.clone();
81
82 let tasks: Vec<_> = ecosystems
83 .into_iter()
84 .map(|ecosystem| {
85 let sem = semaphore.clone();
86 let client = client.clone();
87 let since = since;
88
89 tokio::spawn(async move {
90 let _permit = sem.acquire().await.expect("semaphore closed");
91 Self::fetch_ecosystem(&client, &ecosystem, since).await
92 })
93 })
94 .collect();
95
96 let mut all_advisories = Vec::new();
98 for task in tasks {
99 match task.await {
100 Ok(Ok(advisories)) => {
101 all_advisories.extend(advisories);
102 }
103 Ok(Err(e)) => {
104 warn!("Ecosystem fetch error: {}", e);
105 }
106 Err(e) => {
107 warn!("Task join error: {}", e);
108 }
109 }
110 }
111
112 Ok(all_advisories)
113 }
114
115 async fn fetch_ecosystem(
117 client: &reqwest::Client,
118 ecosystem: &str,
119 since: Option<DateTime<Utc>>,
120 ) -> Result<Vec<Advisory>> {
121 if let Some(cutoff) = since {
123 info!(
124 "Attempting incremental sync for {} since {}",
125 ecosystem, cutoff
126 );
127 match Self::fetch_incremental(client, ecosystem, cutoff).await {
128 Ok(advisories) => {
129 info!(
130 "Incremental sync for {}: {} advisories",
131 ecosystem,
132 advisories.len()
133 );
134 return Ok(advisories);
135 }
136 Err(e) => {
137 warn!(
138 "Incremental sync failed for {}, falling back to full sync: {}",
139 ecosystem, e
140 );
141 }
142 }
143 }
144
145 info!("Performing full sync for {}", ecosystem);
147 match Self::fetch_full(client, ecosystem).await {
148 Ok(advisories) => {
149 info!(
150 "Full sync for {}: {} advisories",
151 ecosystem,
152 advisories.len()
153 );
154 Ok(advisories)
155 }
156 Err(e) => {
157 warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
158 Ok(vec![])
159 }
160 }
161 }
162
163 async fn fetch_incremental(
165 client: &reqwest::Client,
166 ecosystem: &str,
167 since: DateTime<Utc>,
168 ) -> Result<Vec<Advisory>> {
169 let csv_url = format!(
170 "https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
171 ecosystem
172 );
173
174 let response = client.get(&csv_url).send().await?;
175 if !response.status().is_success() {
176 return Err(AdvisoryError::source_fetch(
177 "OSV",
178 format!("Failed to fetch modified_id.csv: {}", response.status()),
179 ));
180 }
181
182 let csv_text = response.text().await?;
183 let mut changed_ids = Vec::new();
184
185 for line in csv_text.lines() {
187 let parts: Vec<&str> = line.split(',').collect();
188 if parts.len() != 2 {
189 continue;
190 }
191
192 let timestamp_str = parts[0];
193 let id = parts[1];
194
195 match DateTime::parse_from_rfc3339(timestamp_str) {
197 Ok(modified) => {
198 let modified_utc = modified.with_timezone(&chrono::Utc);
199 if modified_utc <= since {
200 break;
202 }
203 changed_ids.push(id.to_string());
204 }
205 Err(_) => {
206 warn!(
207 "Failed to parse timestamp in modified_id.csv: {}",
208 timestamp_str
209 );
210 continue;
211 }
212 }
213 }
214
215 info!(
216 "Found {} changed advisories for {}",
217 changed_ids.len(),
218 ecosystem
219 );
220
221 if changed_ids.is_empty() {
222 return Ok(vec![]);
223 }
224
225 let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ADVISORY_FETCHES));
227 let client = client.clone();
228 let ecosystem = ecosystem.to_string();
229
230 let tasks: Vec<_> = changed_ids
231 .into_iter()
232 .map(|id| {
233 let sem = semaphore.clone();
234 let client = client.clone();
235 let ecosystem = ecosystem.clone();
236
237 tokio::spawn(async move {
238 let _permit = sem.acquire().await.expect("semaphore closed");
239 let json_url = format!(
240 "https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
241 ecosystem, id
242 );
243
244 match client.get(&json_url).send().await {
245 Ok(response) if response.status().is_success() => {
246 match response.json::<Advisory>().await {
247 Ok(advisory) => Some(advisory),
248 Err(e) => {
249 debug!("Failed to parse advisory {}: {}", id, e);
250 None
251 }
252 }
253 }
254 Ok(response) => {
255 debug!("Failed to fetch advisory {}: {}", id, response.status());
256 None
257 }
258 Err(e) => {
259 debug!("Network error fetching advisory {}: {}", id, e);
260 None
261 }
262 }
263 })
264 })
265 .collect();
266
267 let mut advisories = Vec::with_capacity(tasks.len());
269 for task in tasks {
270 if let Ok(Some(advisory)) = task.await {
271 advisories.push(advisory);
272 }
273 }
274
275 Ok(advisories)
276 }
277
278 async fn fetch_full(client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
280 let url = format!(
281 "https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
282 ecosystem
283 );
284
285 let response = client.get(&url).send().await?;
286 if !response.status().is_success() {
287 return Err(AdvisoryError::source_fetch(
288 "OSV",
289 format!("Failed to fetch ZIP: {}", response.status()),
290 ));
291 }
292
293 let bytes = response.bytes().await?;
295 let ecosystem = ecosystem.to_string();
296
297 let advisories = tokio::task::spawn_blocking(move || {
299 Self::parse_zip_sync(&bytes, &ecosystem)
300 })
301 .await
302 .map_err(|e| AdvisoryError::source_fetch("OSV", format!("Task join error: {}", e)))??;
303
304 Ok(advisories)
305 }
306
307 fn parse_zip_sync(bytes: &[u8], ecosystem: &str) -> Result<Vec<Advisory>> {
309 use std::io::Cursor;
310
311 let reader = Cursor::new(bytes);
312 let mut zip = zip::ZipArchive::new(reader)?;
313 let mut advisories = Vec::with_capacity(zip.len());
314
315 for i in 0..zip.len() {
316 let mut file = zip.by_index(i)?;
317 if !file.name().ends_with(".json") {
318 continue;
319 }
320
321 let mut content = String::new();
322 file.read_to_string(&mut content)?;
323
324 match serde_json::from_str::<Advisory>(&content) {
325 Ok(advisory) => advisories.push(advisory),
326 Err(e) => {
327 eprintln!("WARN: Failed to parse OSV advisory in {}: {}", ecosystem, e);
330 }
331 }
332 }
333
334 Ok(advisories)
335 }
336}