vulnera_advisor/sources/
osv.rs

1use super::AdvisorySource;
2use crate::error::{AdvisoryError, Result};
3use crate::models::Advisory;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use std::io::Read;
7use tracing::{info, warn};
8
/// Advisory source that reads vulnerability data from the OSV storage bucket.
///
/// The source is configured with a list of ecosystem names. When that list is
/// empty, the set of ecosystems is downloaded from OSV at fetch time.
#[derive(Debug, Clone)]
pub struct OSVSource {
    /// Ecosystem names to fetch; an empty list means "discover them from OSV".
    ecosystems: Vec<String>,
}

impl OSVSource {
    /// Creates a source restricted to the given `ecosystems`.
    ///
    /// Pass an empty vector to have the ecosystem list fetched from OSV when
    /// advisories are requested.
    #[must_use]
    pub fn new(ecosystems: Vec<String>) -> Self {
        Self { ecosystems }
    }
}
18
19#[async_trait]
20impl AdvisorySource for OSVSource {
21    async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
22        // ... implementation ...
23        self.fetch_internal(since).await
24    }
25
26    fn name(&self) -> &str {
27        "OSV"
28    }
29}
30
31impl OSVSource {
32    async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
33        let mut advisories = Vec::new();
34        let client = reqwest::Client::new();
35
36        let ecosystems = if self.ecosystems.is_empty() {
37            info!("No ecosystems specified, fetching list from OSV...");
38            let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
39            let response = client.get(url).send().await?;
40            if !response.status().is_success() {
41                warn!("Failed to fetch ecosystems list: {}", response.status());
42                return Ok(vec![]);
43            }
44            let text = response.text().await?;
45            text.lines().map(|s| s.to_string()).collect()
46        } else {
47            self.ecosystems.clone()
48        };
49
50        for ecosystem in &ecosystems {
51            // Try incremental sync first if we have a timestamp
52            if let Some(cutoff) = since {
53                info!(
54                    "Attempting incremental sync for {} since {}",
55                    ecosystem, cutoff
56                );
57                match self.fetch_incremental(&client, ecosystem, cutoff).await {
58                    Ok(mut incremental_advisories) => {
59                        info!(
60                            "Incremental sync for {}: {} advisories",
61                            ecosystem,
62                            incremental_advisories.len()
63                        );
64                        advisories.append(&mut incremental_advisories);
65                        continue; // Skip full sync for this ecosystem
66                    }
67                    Err(e) => {
68                        warn!(
69                            "Incremental sync failed for {}, falling back to full sync: {}",
70                            ecosystem, e
71                        );
72                        // Fall through to full sync
73                    }
74                }
75            }
76
77            // Full sync: download entire ZIP
78            info!("Performing full sync for {}", ecosystem);
79            match self.fetch_full(&client, ecosystem).await {
80                Ok(mut full_advisories) => {
81                    info!(
82                        "Full sync for {}: {} advisories",
83                        ecosystem,
84                        full_advisories.len()
85                    );
86                    advisories.append(&mut full_advisories);
87                }
88                Err(e) => {
89                    warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
90                }
91            }
92        }
93
94        Ok(advisories)
95    }
96
97    async fn fetch_incremental(
98        &self,
99        client: &reqwest::Client,
100        ecosystem: &str,
101        since: DateTime<Utc>,
102    ) -> Result<Vec<Advisory>> {
103        let csv_url = format!(
104            "https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
105            ecosystem
106        );
107
108        let response = client.get(&csv_url).send().await?;
109        if !response.status().is_success() {
110            return Err(AdvisoryError::source_fetch(
111                "OSV",
112                format!("Failed to fetch modified_id.csv: {}", response.status()),
113            ));
114        }
115
116        let csv_text = response.text().await?;
117        let mut changed_ids = Vec::new();
118
119        // Parse CSV: format is "iso_date,id"
120        for line in csv_text.lines() {
121            let parts: Vec<&str> = line.split(',').collect();
122            if parts.len() != 2 {
123                continue;
124            }
125
126            let timestamp_str = parts[0];
127            let id = parts[1];
128
129            // Parse timestamp
130            match DateTime::parse_from_rfc3339(timestamp_str) {
131                Ok(modified) => {
132                    let modified_utc = modified.with_timezone(&chrono::Utc);
133                    if modified_utc <= since {
134                        // CSV is sorted newest first, so we can stop here
135                        break;
136                    }
137                    changed_ids.push(id.to_string());
138                }
139                Err(_) => {
140                    warn!(
141                        "Failed to parse timestamp in modified_id.csv: {}",
142                        timestamp_str
143                    );
144                    continue;
145                }
146            }
147        }
148
149        info!(
150            "Found {} changed advisories for {}",
151            changed_ids.len(),
152            ecosystem
153        );
154
155        // Download individual JSONs for changed IDs
156        let mut advisories = Vec::new();
157        for id in changed_ids {
158            let json_url = format!(
159                "https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
160                ecosystem, id
161            );
162
163            match client.get(&json_url).send().await {
164                Ok(response) if response.status().is_success() => {
165                    match response.json::<Advisory>().await {
166                        Ok(advisory) => advisories.push(advisory),
167                        Err(e) => warn!("Failed to parse advisory {}: {}", id, e),
168                    }
169                }
170                Ok(response) => {
171                    warn!("Failed to fetch advisory {}: {}", id, response.status());
172                }
173                Err(e) => {
174                    warn!("Network error fetching advisory {}: {}", id, e);
175                }
176            }
177        }
178
179        Ok(advisories)
180    }
181
182    async fn fetch_full(&self, client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
183        let url = format!(
184            "https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
185            ecosystem
186        );
187
188        let response = client.get(&url).send().await?;
189        if !response.status().is_success() {
190            return Err(AdvisoryError::source_fetch(
191                "OSV",
192                format!("Failed to fetch ZIP: {}", response.status()),
193            ));
194        }
195
196        // Create a temporary file
197        let mut tmp_file = tempfile::tempfile()?;
198        let mut content = response.bytes_stream();
199
200        // Stream download to file
201        use futures_util::StreamExt;
202        while let Some(chunk) = content.next().await {
203            let chunk = chunk?;
204            std::io::Write::write_all(&mut tmp_file, &chunk)?;
205        }
206
207        // Rewind file for reading
208        use std::io::Seek;
209        tmp_file.seek(std::io::SeekFrom::Start(0))?;
210
211        let mut zip = zip::ZipArchive::new(tmp_file)?;
212        let mut advisories = Vec::new();
213
214        for i in 0..zip.len() {
215            let mut file = zip.by_index(i)?;
216            if !file.name().ends_with(".json") {
217                continue;
218            }
219
220            let mut content = String::new();
221            file.read_to_string(&mut content)?;
222
223            match serde_json::from_str::<Advisory>(&content) {
224                Ok(advisory) => advisories.push(advisory),
225                Err(e) => {
226                    warn!("Failed to parse OSV advisory in {}: {}", ecosystem, e);
227                }
228            }
229        }
230
231        Ok(advisories)
232    }
233}