// vulnera_advisor/sources/osv.rs

use std::io::{Cursor, Read};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use tokio::sync::Semaphore;
use tracing::{debug, info, warn};

use super::AdvisorySource;
use crate::error::{AdvisoryError, Result};
use crate::models::Advisory;

/// Upper bound on the number of ecosystems that are synced in parallel.
const MAX_CONCURRENT_ECOSYSTEMS: usize = 4;
/// Upper bound on in-flight per-advisory JSON downloads during an incremental sync.
const MAX_CONCURRENT_ADVISORY_FETCHES: usize = 20;
/// Timeout applied to each individual HTTP request (passed to the client builder's `timeout`).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
/// Timeout for establishing a connection (passed to the client builder's `connect_timeout`).
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

23pub struct OSVSource {
24    ecosystems: Vec<String>,
25    client: ClientWithMiddleware,
26    /// Raw client for operations that need direct reqwest (like streaming ZIPs)
27    raw_client: reqwest::Client,
28}
29
30impl OSVSource {
31    pub fn new(ecosystems: Vec<String>) -> Self {
32        let raw_client = reqwest::Client::builder()
33            .timeout(REQUEST_TIMEOUT)
34            .connect_timeout(CONNECT_TIMEOUT)
35            .pool_max_idle_per_host(10)
36            .build()
37            .unwrap_or_default();
38        
39        // Retry policy: 3 retries with exponential backoff
40        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
41        let client = ClientBuilder::new(raw_client.clone())
42            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
43            .build();
44        
45        Self { ecosystems, client, raw_client }
46    }
47}
48
49#[async_trait]
50impl AdvisorySource for OSVSource {
51    async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
52        // ... implementation ...
53        self.fetch_internal(since).await
54    }
55
56    fn name(&self) -> &str {
57        "OSV"
58    }
59}
60
61impl OSVSource {
62    async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
63        let ecosystems = if self.ecosystems.is_empty() {
64            info!("No ecosystems specified, fetching list from OSV...");
65            let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
66            let response = self.client.get(url).send().await?;
67            if !response.status().is_success() {
68                warn!("Failed to fetch ecosystems list: {}", response.status());
69                return Ok(vec![]);
70            }
71            let text = response.text().await?;
72            text.lines().map(|s| s.to_string()).collect()
73        } else {
74            self.ecosystems.clone()
75        };
76
77        // Process ecosystems concurrently with a semaphore to limit parallelism
78        let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ECOSYSTEMS));
79        // Use raw_client for ecosystem fetches (streaming ZIP downloads)
80        let client = self.raw_client.clone();
81        
82        let tasks: Vec<_> = ecosystems
83            .into_iter()
84            .map(|ecosystem| {
85                let sem = semaphore.clone();
86                let client = client.clone();
87                let since = since;
88                
89                tokio::spawn(async move {
90                    let _permit = sem.acquire().await.expect("semaphore closed");
91                    Self::fetch_ecosystem(&client, &ecosystem, since).await
92                })
93            })
94            .collect();
95
96        // Collect results from all tasks
97        let mut all_advisories = Vec::new();
98        for task in tasks {
99            match task.await {
100                Ok(Ok(advisories)) => {
101                    all_advisories.extend(advisories);
102                }
103                Ok(Err(e)) => {
104                    warn!("Ecosystem fetch error: {}", e);
105                }
106                Err(e) => {
107                    warn!("Task join error: {}", e);
108                }
109            }
110        }
111
112        Ok(all_advisories)
113    }
114
115    /// Fetch advisories for a single ecosystem
116    async fn fetch_ecosystem(
117        client: &reqwest::Client,
118        ecosystem: &str,
119        since: Option<DateTime<Utc>>,
120    ) -> Result<Vec<Advisory>> {
121        // Try incremental sync first if we have a timestamp
122        if let Some(cutoff) = since {
123            info!(
124                "Attempting incremental sync for {} since {}",
125                ecosystem, cutoff
126            );
127            match Self::fetch_incremental(client, ecosystem, cutoff).await {
128                Ok(advisories) => {
129                    info!(
130                        "Incremental sync for {}: {} advisories",
131                        ecosystem,
132                        advisories.len()
133                    );
134                    return Ok(advisories);
135                }
136                Err(e) => {
137                    warn!(
138                        "Incremental sync failed for {}, falling back to full sync: {}",
139                        ecosystem, e
140                    );
141                }
142            }
143        }
144
145        // Full sync: download entire ZIP
146        info!("Performing full sync for {}", ecosystem);
147        match Self::fetch_full(client, ecosystem).await {
148            Ok(advisories) => {
149                info!(
150                    "Full sync for {}: {} advisories",
151                    ecosystem,
152                    advisories.len()
153                );
154                Ok(advisories)
155            }
156            Err(e) => {
157                warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
158                Ok(vec![])
159            }
160        }
161    }
162
163    /// Fetch changed advisories incrementally using the modified_id.csv
164    async fn fetch_incremental(
165        client: &reqwest::Client,
166        ecosystem: &str,
167        since: DateTime<Utc>,
168    ) -> Result<Vec<Advisory>> {
169        let csv_url = format!(
170            "https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
171            ecosystem
172        );
173
174        let response = client.get(&csv_url).send().await?;
175        if !response.status().is_success() {
176            return Err(AdvisoryError::source_fetch(
177                "OSV",
178                format!("Failed to fetch modified_id.csv: {}", response.status()),
179            ));
180        }
181
182        let csv_text = response.text().await?;
183        let mut changed_ids = Vec::new();
184
185        // Parse CSV: format is "iso_date,id"
186        for line in csv_text.lines() {
187            let parts: Vec<&str> = line.split(',').collect();
188            if parts.len() != 2 {
189                continue;
190            }
191
192            let timestamp_str = parts[0];
193            let id = parts[1];
194
195            // Parse timestamp
196            match DateTime::parse_from_rfc3339(timestamp_str) {
197                Ok(modified) => {
198                    let modified_utc = modified.with_timezone(&chrono::Utc);
199                    if modified_utc <= since {
200                        // CSV is sorted newest first, so we can stop here
201                        break;
202                    }
203                    changed_ids.push(id.to_string());
204                }
205                Err(_) => {
206                    warn!(
207                        "Failed to parse timestamp in modified_id.csv: {}",
208                        timestamp_str
209                    );
210                    continue;
211                }
212            }
213        }
214
215        info!(
216            "Found {} changed advisories for {}",
217            changed_ids.len(),
218            ecosystem
219        );
220
221        if changed_ids.is_empty() {
222            return Ok(vec![]);
223        }
224
225        // Download individual JSONs concurrently with rate limiting
226        let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_ADVISORY_FETCHES));
227        let client = client.clone();
228        let ecosystem = ecosystem.to_string();
229        
230        let tasks: Vec<_> = changed_ids
231            .into_iter()
232            .map(|id| {
233                let sem = semaphore.clone();
234                let client = client.clone();
235                let ecosystem = ecosystem.clone();
236                
237                tokio::spawn(async move {
238                    let _permit = sem.acquire().await.expect("semaphore closed");
239                    let json_url = format!(
240                        "https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
241                        ecosystem, id
242                    );
243
244                    match client.get(&json_url).send().await {
245                        Ok(response) if response.status().is_success() => {
246                            match response.json::<Advisory>().await {
247                                Ok(advisory) => Some(advisory),
248                                Err(e) => {
249                                    debug!("Failed to parse advisory {}: {}", id, e);
250                                    None
251                                }
252                            }
253                        }
254                        Ok(response) => {
255                            debug!("Failed to fetch advisory {}: {}", id, response.status());
256                            None
257                        }
258                        Err(e) => {
259                            debug!("Network error fetching advisory {}: {}", id, e);
260                            None
261                        }
262                    }
263                })
264            })
265            .collect();
266
267        // Collect results
268        let mut advisories = Vec::with_capacity(tasks.len());
269        for task in tasks {
270            if let Ok(Some(advisory)) = task.await {
271                advisories.push(advisory);
272            }
273        }
274
275        Ok(advisories)
276    }
277
278    /// Fetch all advisories from the ecosystem ZIP file
279    async fn fetch_full(client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
280        let url = format!(
281            "https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
282            ecosystem
283        );
284
285        let response = client.get(&url).send().await?;
286        if !response.status().is_success() {
287            return Err(AdvisoryError::source_fetch(
288                "OSV",
289                format!("Failed to fetch ZIP: {}", response.status()),
290            ));
291        }
292
293        // Stream download to memory (ZIPs are usually <50MB compressed)
294        let bytes = response.bytes().await?;
295        let ecosystem = ecosystem.to_string();
296        
297        // Parse ZIP in a blocking task to avoid blocking the async runtime
298        let advisories = tokio::task::spawn_blocking(move || {
299            Self::parse_zip_sync(&bytes, &ecosystem)
300        })
301        .await
302        .map_err(|e| AdvisoryError::source_fetch("OSV", format!("Task join error: {}", e)))??;
303
304        Ok(advisories)
305    }
306
307    /// Synchronous ZIP parsing (runs in spawn_blocking)
308    fn parse_zip_sync(bytes: &[u8], ecosystem: &str) -> Result<Vec<Advisory>> {
309        use std::io::Cursor;
310        
311        let reader = Cursor::new(bytes);
312        let mut zip = zip::ZipArchive::new(reader)?;
313        let mut advisories = Vec::with_capacity(zip.len());
314
315        for i in 0..zip.len() {
316            let mut file = zip.by_index(i)?;
317            if !file.name().ends_with(".json") {
318                continue;
319            }
320
321            let mut content = String::new();
322            file.read_to_string(&mut content)?;
323
324            match serde_json::from_str::<Advisory>(&content) {
325                Ok(advisory) => advisories.push(advisory),
326                Err(e) => {
327                    // Use eprintln since we're in a blocking context
328                    // In production, you'd want structured logging
329                    eprintln!("WARN: Failed to parse OSV advisory in {}: {}", ecosystem, e);
330                }
331            }
332        }
333
334        Ok(advisories)
335    }
336}