vulnera_advisor/sources/
osv.rs1use super::AdvisorySource;
2use crate::error::{AdvisoryError, Result};
3use crate::models::Advisory;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
7use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
8use std::io::Read;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::Semaphore;
12use tracing::{debug, warn};
13
/// Upper bound on how many ecosystems are synced at the same time.
const MAX_CONCURRENT_ECOSYSTEMS: usize = 4;
/// Upper bound on in-flight `<id>.json` downloads during one ecosystem's
/// incremental sync.
const MAX_CONCURRENT_ADVISORY_FETCHES: usize = 20;
/// Total per-request timeout configured on the HTTP client (two minutes).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// Timeout for establishing a connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
22
23pub struct OSVSource {
24 ecosystems: Vec<String>,
25 client: ClientWithMiddleware,
26 raw_client: reqwest::Client,
28}
29
30impl OSVSource {
31 pub fn new(ecosystems: Vec<String>) -> Self {
32 let raw_client = reqwest::Client::builder()
33 .timeout(REQUEST_TIMEOUT)
34 .connect_timeout(CONNECT_TIMEOUT)
35 .pool_max_idle_per_host(10)
36 .build()
37 .unwrap_or_default();
38
39 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
41 let client = ClientBuilder::new(raw_client.clone())
42 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
43 .build();
44
45 Self {
46 ecosystems,
47 client,
48 raw_client,
49 }
50 }
51}
52
53#[async_trait]
54impl AdvisorySource for OSVSource {
55 async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
56 self.fetch_internal(since).await
58 }
59
60 fn name(&self) -> &str {
61 "OSV"
62 }
63}
64
65impl OSVSource {
66 async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
67 let ecosystems = if self.ecosystems.is_empty() {
68 debug!("No ecosystems specified, fetching list from OSV...");
69 let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
70 let response = self.client.get(url).send().await?;
71 if !response.status().is_success() {
72 warn!("Failed to fetch ecosystems list: {}", response.status());
73 return Ok(vec![]);
74 }
75 let text = response.text().await?;
76 text.lines().map(|s| s.to_string()).collect()
77 } else {
78 self.ecosystems.clone()
79 };
80
81 let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ECOSYSTEMS));
83 let client = self.raw_client.clone();
85
86 let tasks: Vec<_> = ecosystems
87 .into_iter()
88 .map(|ecosystem| {
89 let sem = semaphore.clone();
90 let client = client.clone();
91
92 tokio::spawn(async move {
93 let _permit = sem.acquire().await.expect("semaphore closed");
94 Self::fetch_ecosystem(&client, &ecosystem, since).await
95 })
96 })
97 .collect();
98
99 let mut all_advisories = Vec::new();
101 for task in tasks {
102 match task.await {
103 Ok(Ok(advisories)) => {
104 all_advisories.extend(advisories);
105 }
106 Ok(Err(e)) => {
107 warn!("Ecosystem fetch error: {}", e);
108 }
109 Err(e) => {
110 warn!("Task join error: {}", e);
111 }
112 }
113 }
114
115 Ok(all_advisories)
116 }
117
118 async fn fetch_ecosystem(
120 client: &reqwest::Client,
121 ecosystem: &str,
122 since: Option<DateTime<Utc>>,
123 ) -> Result<Vec<Advisory>> {
124 if let Some(cutoff) = since {
126 debug!(
127 "Attempting incremental sync for {} since {}",
128 ecosystem, cutoff
129 );
130 match Self::fetch_incremental(client, ecosystem, cutoff).await {
131 Ok(advisories) => {
132 debug!(
133 "Incremental sync for {}: {} advisories",
134 ecosystem,
135 advisories.len()
136 );
137 return Ok(advisories);
138 }
139 Err(e) => {
140 warn!(
141 "Incremental sync failed for {}, falling back to full sync: {}",
142 ecosystem, e
143 );
144 }
145 }
146 }
147
148 debug!("Performing full sync for {}", ecosystem);
150 match Self::fetch_full(client, ecosystem).await {
151 Ok(advisories) => {
152 debug!(
153 "Full sync for {}: {} advisories",
154 ecosystem,
155 advisories.len()
156 );
157 Ok(advisories)
158 }
159 Err(e) => {
160 warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
161 Ok(vec![])
162 }
163 }
164 }
165
166 async fn fetch_incremental(
168 client: &reqwest::Client,
169 ecosystem: &str,
170 since: DateTime<Utc>,
171 ) -> Result<Vec<Advisory>> {
172 let csv_url = format!(
173 "https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
174 ecosystem
175 );
176
177 let response = client.get(&csv_url).send().await?;
178 if !response.status().is_success() {
179 return Err(AdvisoryError::source_fetch(
180 "OSV",
181 format!("Failed to fetch modified_id.csv: {}", response.status()),
182 ));
183 }
184
185 let csv_text = response.text().await?;
186 let mut changed_ids = Vec::new();
187
188 for line in csv_text.lines() {
190 let parts: Vec<&str> = line.split(',').collect();
191 if parts.len() != 2 {
192 continue;
193 }
194
195 let timestamp_str = parts[0];
196 let id = parts[1];
197
198 match DateTime::parse_from_rfc3339(timestamp_str) {
200 Ok(modified) => {
201 let modified_utc = modified.with_timezone(&chrono::Utc);
202 if modified_utc <= since {
203 break;
205 }
206 changed_ids.push(id.to_string());
207 }
208 Err(_) => {
209 warn!(
210 "Failed to parse timestamp in modified_id.csv: {}",
211 timestamp_str
212 );
213 continue;
214 }
215 }
216 }
217
218 debug!(
219 "Found {} changed advisories for {}",
220 changed_ids.len(),
221 ecosystem
222 );
223
224 if changed_ids.is_empty() {
225 return Ok(vec![]);
226 }
227
228 let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ADVISORY_FETCHES));
230 let client = client.clone();
231 let ecosystem = ecosystem.to_string();
232
233 let tasks: Vec<_> = changed_ids
234 .into_iter()
235 .map(|id| {
236 let sem = semaphore.clone();
237 let client = client.clone();
238 let ecosystem = ecosystem.clone();
239
240 tokio::spawn(async move {
241 let _permit = sem.acquire().await.expect("semaphore closed");
242 let json_url = format!(
243 "https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
244 ecosystem, id
245 );
246
247 match client.get(&json_url).send().await {
248 Ok(response) if response.status().is_success() => {
249 match response.json::<Advisory>().await {
250 Ok(advisory) => Some(advisory),
251 Err(e) => {
252 debug!("Failed to parse advisory {}: {}", id, e);
253 None
254 }
255 }
256 }
257 Ok(response) => {
258 debug!("Failed to fetch advisory {}: {}", id, response.status());
259 None
260 }
261 Err(e) => {
262 debug!("Network error fetching advisory {}: {}", id, e);
263 None
264 }
265 }
266 })
267 })
268 .collect();
269
270 let mut advisories = Vec::with_capacity(tasks.len());
272 for task in tasks {
273 if let Ok(Some(advisory)) = task.await {
274 advisories.push(advisory);
275 }
276 }
277
278 Ok(advisories)
279 }
280
281 async fn fetch_full(client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
283 let url = format!(
284 "https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
285 ecosystem
286 );
287
288 let response = client.get(&url).send().await?;
289 if !response.status().is_success() {
290 return Err(AdvisoryError::source_fetch(
291 "OSV",
292 format!("Failed to fetch ZIP: {}", response.status()),
293 ));
294 }
295
296 let bytes = response.bytes().await?;
298 let ecosystem = ecosystem.to_string();
299
300 let advisories =
302 tokio::task::spawn_blocking(move || Self::parse_zip_sync(&bytes, &ecosystem))
303 .await
304 .map_err(|e| {
305 AdvisoryError::source_fetch("OSV", format!("Task join error: {}", e))
306 })??;
307
308 Ok(advisories)
309 }
310
311 fn parse_zip_sync(bytes: &[u8], ecosystem: &str) -> Result<Vec<Advisory>> {
313 use std::io::Cursor;
314
315 let reader = Cursor::new(bytes);
316 let mut zip = zip::ZipArchive::new(reader)?;
317 let mut advisories = Vec::with_capacity(zip.len());
318
319 for i in 0..zip.len() {
320 let mut file = zip.by_index(i)?;
321 if !file.name().ends_with(".json") {
322 continue;
323 }
324
325 let mut content = String::new();
326 file.read_to_string(&mut content)?;
327
328 match serde_json::from_str::<Advisory>(&content) {
329 Ok(advisory) => advisories.push(advisory),
330 Err(e) => {
331 eprintln!("WARN: Failed to parse OSV advisory in {}: {}", ecosystem, e);
334 }
335 }
336 }
337
338 Ok(advisories)
339 }
340}