vulnera_advisor/sources/osv.rs

use super::AdvisorySource;
use crate::error::{AdvisoryError, Result};
use crate::models::Advisory;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use std::io::Read;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tracing::{debug, warn};

/// Maximum concurrent ecosystem syncs
const MAX_CONCURRENT_ECOSYSTEMS: usize = 4;
/// Maximum concurrent individual advisory fetches (incremental sync)
const MAX_CONCURRENT_ADVISORY_FETCHES: usize = 20;
/// Request timeout for individual requests
const REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
/// Connection timeout
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

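/// Advisory source backed by the public OSV data dumps served from
/// `https://osv-vulnerabilities.storage.googleapis.com`.
///
/// A minimal usage sketch (hypothetical caller code, not part of this module):
///
/// ```ignore
/// let source = OSVSource::new(vec!["PyPI".to_string()]);
/// let advisories = source.fetch(None).await?; // None => full sync
/// ```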
pub struct OSVSource {
    ecosystems: Vec<String>,
    client: ClientWithMiddleware,
    /// Raw client for operations that need direct reqwest (like streaming ZIPs)
    raw_client: reqwest::Client,
}

impl OSVSource {
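    /// Create a new OSV source for the given ecosystems.
    ///
    /// An empty list means "sync every ecosystem OSV publishes"; the full list
    /// is resolved from `ecosystems.txt` at fetch time.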
    pub fn new(ecosystems: Vec<String>) -> Self {
        let raw_client = reqwest::Client::builder()
            .timeout(REQUEST_TIMEOUT)
            .connect_timeout(CONNECT_TIMEOUT)
            .pool_max_idle_per_host(10)
            .build()
            // Fall back to reqwest's default client if the builder fails;
            // note that the default client drops the timeouts configured above.
            .unwrap_or_default();

        // Retry policy: 3 retries with exponential backoff
        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
        let client = ClientBuilder::new(raw_client.clone())
            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
            .build();

        Self {
            ecosystems,
            client,
            raw_client,
        }
    }
}

#[async_trait]
impl AdvisorySource for OSVSource {
    async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
        self.fetch_internal(since).await
    }

    fn name(&self) -> &str {
        "OSV"
    }
}

impl OSVSource {
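    /// Resolve the ecosystem list (falling back to OSV's published
    /// `ecosystems.txt` when none were configured) and sync each ecosystem
    /// concurrently, bounded by `MAX_CONCURRENT_ECOSYSTEMS`.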
    async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
        let ecosystems = if self.ecosystems.is_empty() {
            debug!("No ecosystems specified, fetching list from OSV...");
            let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
            let response = self.client.get(url).send().await?;
            if !response.status().is_success() {
                warn!("Failed to fetch ecosystems list: {}", response.status());
                return Ok(vec![]);
            }
            let text = response.text().await?;
            // Skip blank lines so we never build URLs with an empty ecosystem name
            text.lines()
                .map(str::trim)
                .filter(|line| !line.is_empty())
                .map(String::from)
                .collect()
        } else {
            self.ecosystems.clone()
        };

        // Process ecosystems concurrently with a semaphore to limit parallelism
        let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ECOSYSTEMS));
        // Use raw_client for ecosystem fetches (streaming ZIP downloads)
        let client = self.raw_client.clone();

        let tasks: Vec<_> = ecosystems
            .into_iter()
            .map(|ecosystem| {
                let sem = semaphore.clone();
                let client = client.clone();

                tokio::spawn(async move {
                    let _permit = sem.acquire().await.expect("semaphore closed");
                    Self::fetch_ecosystem(&client, &ecosystem, since).await
                })
            })
            .collect();

        // Collect results from all tasks
        let mut all_advisories = Vec::new();
        for task in tasks {
            match task.await {
                Ok(Ok(advisories)) => {
                    all_advisories.extend(advisories);
                }
                Ok(Err(e)) => {
                    warn!("Ecosystem fetch error: {}", e);
                }
                Err(e) => {
                    warn!("Task join error: {}", e);
                }
            }
        }

        Ok(all_advisories)
    }

    /// Fetch advisories for a single ecosystem
    async fn fetch_ecosystem(
        client: &reqwest::Client,
        ecosystem: &str,
        since: Option<DateTime<Utc>>,
    ) -> Result<Vec<Advisory>> {
        // Try incremental sync first if we have a timestamp
        if let Some(cutoff) = since {
            debug!(
                "Attempting incremental sync for {} since {}",
                ecosystem, cutoff
            );
            match Self::fetch_incremental(client, ecosystem, cutoff).await {
                Ok(advisories) => {
                    debug!(
                        "Incremental sync for {}: {} advisories",
                        ecosystem,
                        advisories.len()
                    );
                    return Ok(advisories);
                }
                Err(e) => {
                    warn!(
                        "Incremental sync failed for {}, falling back to full sync: {}",
                        ecosystem, e
                    );
                }
            }
        }

        // Full sync: download entire ZIP
        debug!("Performing full sync for {}", ecosystem);
        match Self::fetch_full(client, ecosystem).await {
            Ok(advisories) => {
                debug!(
                    "Full sync for {}: {} advisories",
                    ecosystem,
                    advisories.len()
                );
                Ok(advisories)
            }
            Err(e) => {
                warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
                Ok(vec![])
            }
        }
    }

    /// Fetch changed advisories incrementally using the modified_id.csv
    async fn fetch_incremental(
        client: &reqwest::Client,
        ecosystem: &str,
        since: DateTime<Utc>,
    ) -> Result<Vec<Advisory>> {
        let csv_url = format!(
            "https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
            ecosystem
        );

        let response = client.get(&csv_url).send().await?;
        if !response.status().is_success() {
            return Err(AdvisoryError::source_fetch(
                "OSV",
                format!("Failed to fetch modified_id.csv: {}", response.status()),
            ));
        }

        let csv_text = response.text().await?;
        let mut changed_ids = Vec::new();

        // Parse CSV: format is "iso_date,id"
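        // A line might look like this (hypothetical values, shape per the
        // "iso_date,id" format noted above):
        //   2024-05-01T12:34:56Z,GHSA-xxxx-xxxx-xxxx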
        for line in csv_text.lines() {
            let parts: Vec<&str> = line.split(',').collect();
            if parts.len() != 2 {
                continue;
            }

            let timestamp_str = parts[0];
            let id = parts[1];

            // Parse timestamp
            match DateTime::parse_from_rfc3339(timestamp_str) {
                Ok(modified) => {
                    let modified_utc = modified.with_timezone(&chrono::Utc);
                    if modified_utc <= since {
                        // CSV is sorted newest first, so we can stop here
                        break;
                    }
                    changed_ids.push(id.to_string());
                }
                Err(_) => {
                    warn!(
                        "Failed to parse timestamp in modified_id.csv: {}",
                        timestamp_str
                    );
                    continue;
                }
            }
        }

        debug!(
            "Found {} changed advisories for {}",
            changed_ids.len(),
            ecosystem
        );

        if changed_ids.is_empty() {
            return Ok(vec![]);
        }

        // Download individual JSONs concurrently with rate limiting
        let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ADVISORY_FETCHES));
        let client = client.clone();
        let ecosystem = ecosystem.to_string();

        let tasks: Vec<_> = changed_ids
            .into_iter()
            .map(|id| {
                let sem = semaphore.clone();
                let client = client.clone();
                let ecosystem = ecosystem.clone();

                tokio::spawn(async move {
                    let _permit = sem.acquire().await.expect("semaphore closed");
                    let json_url = format!(
                        "https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
                        ecosystem, id
                    );

                    match client.get(&json_url).send().await {
                        Ok(response) if response.status().is_success() => {
                            match response.json::<Advisory>().await {
                                Ok(advisory) => Some(advisory),
                                Err(e) => {
                                    debug!("Failed to parse advisory {}: {}", id, e);
                                    None
                                }
                            }
                        }
                        Ok(response) => {
                            debug!("Failed to fetch advisory {}: {}", id, response.status());
                            None
                        }
                        Err(e) => {
                            debug!("Network error fetching advisory {}: {}", id, e);
                            None
                        }
                    }
                })
            })
            .collect();

        // Collect results
        let mut advisories = Vec::with_capacity(tasks.len());
        for task in tasks {
            if let Ok(Some(advisory)) = task.await {
                advisories.push(advisory);
            }
        }

        Ok(advisories)
    }

    /// Fetch all advisories from the ecosystem ZIP file
    async fn fetch_full(client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
        let url = format!(
            "https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
            ecosystem
        );

        let response = client.get(&url).send().await?;
        if !response.status().is_success() {
            return Err(AdvisoryError::source_fetch(
                "OSV",
                format!("Failed to fetch ZIP: {}", response.status()),
            ));
        }

        // Buffer the full download in memory (ZIPs are usually <50MB compressed)
        let bytes = response.bytes().await?;
        let ecosystem = ecosystem.to_string();

        // Parse ZIP in a blocking task to avoid blocking the async runtime
        let advisories =
            tokio::task::spawn_blocking(move || Self::parse_zip_sync(&bytes, &ecosystem))
                .await
                .map_err(|e| {
                    AdvisoryError::source_fetch("OSV", format!("Task join error: {}", e))
                })??;

        Ok(advisories)
    }

    /// Synchronous ZIP parsing (runs in spawn_blocking)
    fn parse_zip_sync(bytes: &[u8], ecosystem: &str) -> Result<Vec<Advisory>> {
        use std::io::Cursor;

        let reader = Cursor::new(bytes);
        let mut zip = zip::ZipArchive::new(reader)?;
        let mut advisories = Vec::with_capacity(zip.len());

        for i in 0..zip.len() {
            let mut file = zip.by_index(i)?;
            if !file.name().ends_with(".json") {
                continue;
            }

            let mut content = String::new();
            file.read_to_string(&mut content)?;

            match serde_json::from_str::<Advisory>(&content) {
                Ok(advisory) => advisories.push(advisory),
                Err(e) => {
                    // tracing macros are synchronous, so they work fine inside
                    // spawn_blocking; keep structured logging consistent here
                    warn!("Failed to parse OSV advisory in {}: {}", ecosystem, e);
                }
            }
        }

        Ok(advisories)
    }
}
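
#[cfg(test)]
mod tests {
    use super::*;

    // Minimal smoke-test sketch: exercises only the constructor and `name()`
    // from this module, with no network access. The "PyPI" value is just an
    // illustrative ecosystem name.
    #[test]
    fn constructor_stores_ecosystems_and_reports_name() {
        let source = OSVSource::new(vec!["PyPI".to_string()]);
        assert_eq!(source.name(), "OSV");
        assert_eq!(source.ecosystems, vec!["PyPI".to_string()]);
    }
}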