vulnera_advisor/sources/
osv.rs1use super::AdvisorySource;
2use crate::error::{AdvisoryError, Result};
3use crate::models::Advisory;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use std::io::Read;
7use tracing::{info, warn};
8
/// Advisory source backed by the OSV bulk data exports.
///
/// Holds the ecosystem names to sync (e.g. `"PyPI"`, `"npm"`). An empty list
/// means "every ecosystem listed in OSV's `ecosystems.txt`".
#[derive(Debug, Clone)]
pub struct OSVSource {
    /// Ecosystem directory names in the OSV bucket; empty selects all of them.
    ecosystems: Vec<String>,
}

impl OSVSource {
    /// Creates a source for the given ecosystems.
    ///
    /// Pass an empty vector to sync every ecosystem OSV publishes.
    #[must_use]
    pub fn new(ecosystems: Vec<String>) -> Self {
        Self { ecosystems }
    }
}
18
19#[async_trait]
20impl AdvisorySource for OSVSource {
21 async fn fetch(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
22 self.fetch_internal(since).await
24 }
25
26 fn name(&self) -> &str {
27 "OSV"
28 }
29}
30
31impl OSVSource {
32 async fn fetch_internal(&self, since: Option<DateTime<Utc>>) -> Result<Vec<Advisory>> {
33 let mut advisories = Vec::new();
34 let client = reqwest::Client::new();
35
36 let ecosystems = if self.ecosystems.is_empty() {
37 info!("No ecosystems specified, fetching list from OSV...");
38 let url = "https://osv-vulnerabilities.storage.googleapis.com/ecosystems.txt";
39 let response = client.get(url).send().await?;
40 if !response.status().is_success() {
41 warn!("Failed to fetch ecosystems list: {}", response.status());
42 return Ok(vec![]);
43 }
44 let text = response.text().await?;
45 text.lines().map(|s| s.to_string()).collect()
46 } else {
47 self.ecosystems.clone()
48 };
49
50 for ecosystem in &ecosystems {
51 if let Some(cutoff) = since {
53 info!(
54 "Attempting incremental sync for {} since {}",
55 ecosystem, cutoff
56 );
57 match self.fetch_incremental(&client, ecosystem, cutoff).await {
58 Ok(mut incremental_advisories) => {
59 info!(
60 "Incremental sync for {}: {} advisories",
61 ecosystem,
62 incremental_advisories.len()
63 );
64 advisories.append(&mut incremental_advisories);
65 continue; }
67 Err(e) => {
68 warn!(
69 "Incremental sync failed for {}, falling back to full sync: {}",
70 ecosystem, e
71 );
72 }
74 }
75 }
76
77 info!("Performing full sync for {}", ecosystem);
79 match self.fetch_full(&client, ecosystem).await {
80 Ok(mut full_advisories) => {
81 info!(
82 "Full sync for {}: {} advisories",
83 ecosystem,
84 full_advisories.len()
85 );
86 advisories.append(&mut full_advisories);
87 }
88 Err(e) => {
89 warn!("Failed to fetch OSV data for {}: {}", ecosystem, e);
90 }
91 }
92 }
93
94 Ok(advisories)
95 }
96
97 async fn fetch_incremental(
98 &self,
99 client: &reqwest::Client,
100 ecosystem: &str,
101 since: DateTime<Utc>,
102 ) -> Result<Vec<Advisory>> {
103 let csv_url = format!(
104 "https://osv-vulnerabilities.storage.googleapis.com/{}/modified_id.csv",
105 ecosystem
106 );
107
108 let response = client.get(&csv_url).send().await?;
109 if !response.status().is_success() {
110 return Err(AdvisoryError::source_fetch(
111 "OSV",
112 format!("Failed to fetch modified_id.csv: {}", response.status()),
113 ));
114 }
115
116 let csv_text = response.text().await?;
117 let mut changed_ids = Vec::new();
118
119 for line in csv_text.lines() {
121 let parts: Vec<&str> = line.split(',').collect();
122 if parts.len() != 2 {
123 continue;
124 }
125
126 let timestamp_str = parts[0];
127 let id = parts[1];
128
129 match DateTime::parse_from_rfc3339(timestamp_str) {
131 Ok(modified) => {
132 let modified_utc = modified.with_timezone(&chrono::Utc);
133 if modified_utc <= since {
134 break;
136 }
137 changed_ids.push(id.to_string());
138 }
139 Err(_) => {
140 warn!(
141 "Failed to parse timestamp in modified_id.csv: {}",
142 timestamp_str
143 );
144 continue;
145 }
146 }
147 }
148
149 info!(
150 "Found {} changed advisories for {}",
151 changed_ids.len(),
152 ecosystem
153 );
154
155 let mut advisories = Vec::new();
157 for id in changed_ids {
158 let json_url = format!(
159 "https://osv-vulnerabilities.storage.googleapis.com/{}/{}.json",
160 ecosystem, id
161 );
162
163 match client.get(&json_url).send().await {
164 Ok(response) if response.status().is_success() => {
165 match response.json::<Advisory>().await {
166 Ok(advisory) => advisories.push(advisory),
167 Err(e) => warn!("Failed to parse advisory {}: {}", id, e),
168 }
169 }
170 Ok(response) => {
171 warn!("Failed to fetch advisory {}: {}", id, response.status());
172 }
173 Err(e) => {
174 warn!("Network error fetching advisory {}: {}", id, e);
175 }
176 }
177 }
178
179 Ok(advisories)
180 }
181
182 async fn fetch_full(&self, client: &reqwest::Client, ecosystem: &str) -> Result<Vec<Advisory>> {
183 let url = format!(
184 "https://osv-vulnerabilities.storage.googleapis.com/{}/all.zip",
185 ecosystem
186 );
187
188 let response = client.get(&url).send().await?;
189 if !response.status().is_success() {
190 return Err(AdvisoryError::source_fetch(
191 "OSV",
192 format!("Failed to fetch ZIP: {}", response.status()),
193 ));
194 }
195
196 let mut tmp_file = tempfile::tempfile()?;
198 let mut content = response.bytes_stream();
199
200 use futures_util::StreamExt;
202 while let Some(chunk) = content.next().await {
203 let chunk = chunk?;
204 std::io::Write::write_all(&mut tmp_file, &chunk)?;
205 }
206
207 use std::io::Seek;
209 tmp_file.seek(std::io::SeekFrom::Start(0))?;
210
211 let mut zip = zip::ZipArchive::new(tmp_file)?;
212 let mut advisories = Vec::new();
213
214 for i in 0..zip.len() {
215 let mut file = zip.by_index(i)?;
216 if !file.name().ends_with(".json") {
217 continue;
218 }
219
220 let mut content = String::new();
221 file.read_to_string(&mut content)?;
222
223 match serde_json::from_str::<Advisory>(&content) {
224 Ok(advisory) => advisories.push(advisory),
225 Err(e) => {
226 warn!("Failed to parse OSV advisory in {}: {}", ecosystem, e);
227 }
228 }
229 }
230
231 Ok(advisories)
232 }
233}