1#![doc = include_str!("../README.md")]
2
3mod downloader;
4pub mod errors;
5mod osv_gs;
6pub mod types;
7
8use std::{
9 fs::File,
10 io::Cursor,
11 path::{Path, PathBuf},
12 sync::{
13 Arc,
14 atomic::{AtomicI64, Ordering},
15 },
16};
17
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use tempfile::tempdir_in;
21
22pub use crate::osv_gs::{OsvGsEcosystem, OsvGsEcosystems};
23use crate::{
24 downloader::{chuncked_download_to, simple_download_to},
25 errors::{
26 DownloadLatestErr, DownloaderErr, GetRecordErr, OsvDbNewErr, ReadRecordErr, RecordsIterErr,
27 SyncErr,
28 },
29 osv_gs::{osv_archive_url, osv_modified_id_csv_url, osv_record_url},
30 types::{OsvModifiedRecord, OsvRecord, OsvRecordId},
31};
32
33const OSV_RECORD_FILE_EXTENSION: &str = "json";
34const RECORDS_DIRECTORY: &str = "records";
35
/// Handle to a local, on-disk mirror of OSV (Open Source Vulnerability)
/// records downloaded from the OSV Google Storage buckets.
///
/// Cheap to clone: every clone shares the same inner state (root
/// location, ecosystem selection, last-modified timestamp) via an `Arc`.
#[derive(Debug, Clone)]
pub struct OsvDb(Arc<OsvDbInner>);
38
/// State shared by all clones of an [`OsvDb`] handle.
#[derive(Debug)]
struct OsvDbInner {
    // Root directory of the database; record files live in the
    // `records/` subdirectory beneath it (see `RECORDS_DIRECTORY`).
    location: PathBuf,
    // Which OSV ecosystems this database mirrors.
    ecosystems: OsvGsEcosystems,
    // Newest record-modification time seen so far, stored as nanoseconds
    // since the Unix epoch (UTC). Atomic so concurrent clones observe
    // updates; starts at 0 (the epoch) until a download/sync runs.
    last_modified: AtomicI64,
}
51
impl OsvDb {
    /// Creates (or opens) a database rooted at `path`, mirroring the
    /// given `ecosystems`.
    ///
    /// The directory is created if it does not exist. The in-memory
    /// last-modified timestamp starts at the Unix epoch and is only
    /// advanced by [`Self::download_latest`] / [`Self::sync`].
    ///
    /// # Errors
    /// [`OsvDbNewErr::Io`] if the directory cannot be created.
    pub fn new(
        ecosystems: OsvGsEcosystems,
        path: impl AsRef<Path>,
    ) -> Result<Self, OsvDbNewErr> {
        std::fs::create_dir_all(&path).map_err(OsvDbNewErr::Io)?;
        Ok(Self(Arc::new(OsvDbInner {
            location: path.as_ref().to_path_buf(),
            ecosystems,
            last_modified: AtomicI64::default(),
        })))
    }

    /// Root directory of the database on disk.
    #[must_use]
    pub fn location(&self) -> &Path {
        &self.0.location
    }

    /// The set of ecosystems this database mirrors.
    #[must_use]
    pub fn ecosystems(&self) -> &OsvGsEcosystems {
        &self.0.ecosystems
    }

    /// Timestamp of the most recently seen record modification.
    ///
    /// Returns the Unix epoch until a download or sync has completed.
    #[must_use]
    pub fn last_modified(&self) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp_nanos(self.0.last_modified.load(Ordering::Acquire))
    }

    // Directory holding the individual `<id>.json` record files.
    fn records_dir(&self) -> PathBuf {
        self.location().join(RECORDS_DIRECTORY)
    }

    // Creates a prefixed temporary directory *inside* the database root,
    // so later `rename`s into `records/` stay on the same filesystem.
    fn tmp_dir(
        &self,
        prefix: &str,
    ) -> Result<tempfile::TempDir, std::io::Error> {
        tempfile::Builder::new()
            .prefix(prefix)
            .tempdir_in(self.location())
    }

    /// Looks up a single record by its OSV id.
    ///
    /// Returns `Ok(None)` when no file for this id exists locally.
    ///
    /// # Errors
    /// I/O failures opening the file, or JSON deserialization failures.
    pub fn get_record(
        &self,
        id: &OsvRecordId,
    ) -> Result<Option<OsvRecord>, GetRecordErr> {
        let records_dir = self.records_dir();
        let mut record_path = records_dir.join(id);
        // `add_extension` appends ".json" rather than replacing whatever
        // follows the last dot in the id (unlike `set_extension`).
        record_path.add_extension(OSV_RECORD_FILE_EXTENSION);
        if !record_path.exists() {
            return Ok(None);
        }
        let osv_record_file = File::open(record_path).map_err(GetRecordErr::Io)?;
        let osv_record = serde_json::from_reader(&osv_record_file).map_err(GetRecordErr::Json)?;
        Ok(Some(osv_record))
    }

    /// Iterates over every locally stored record.
    ///
    /// Yields nothing (rather than erroring) when the records directory
    /// has not been created yet. Only files with a `.json` extension are
    /// considered.
    ///
    /// # Errors
    /// Fails up front only if the records directory cannot be listed;
    /// per-file read/parse failures are yielded as `Err` items.
    pub fn records(
        &self
    ) -> Result<impl Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send, RecordsIterErr> {
        let records_dir = self.records_dir();
        if !records_dir.exists() {
            // Boxed so both branches return the same opaque iterator type.
            let empty: Box<dyn Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send> =
                Box::new(std::iter::empty());
            return Ok(empty);
        }

        let records_dir_content =
            std::fs::read_dir(records_dir).map_err(RecordsIterErr::ReadDir)?;

        let iter: Box<dyn Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send> = Box::new(
            records_dir_content
                // NOTE(review): `is_ok_and` returns false for `Err` dir
                // entries, so errored entries are silently dropped here —
                // the `map_err(ReadRecordErr::Io)` on the entry below is
                // unreachable for them. Confirm this is intended.
                .filter(|entry| {
                    entry.as_ref().is_ok_and(|e| {
                        e.path().extension().and_then(|ext| ext.to_str())
                            == Some(OSV_RECORD_FILE_EXTENSION)
                    })
                })
                .map(|entry| {
                    let entry = entry.map_err(ReadRecordErr::Io)?;
                    let bytes = std::fs::read(entry.path()).map_err(ReadRecordErr::Io)?;
                    let osv_record = serde_json::from_slice(&bytes).map_err(ReadRecordErr::Json)?;
                    Ok(osv_record)
                }),
        );
        Ok(iter)
    }

    /// Replaces the local record set with freshly downloaded archives for
    /// all configured ecosystems.
    ///
    /// Archives are downloaded and extracted into a temporary directory
    /// first and only swapped into place once complete, so the existing
    /// record set is not destroyed by a failed download. `chunk_size` is
    /// forwarded to the chunked downloader (bytes per chunk).
    ///
    /// On success, advances [`Self::last_modified`] to the newest
    /// timestamp reported by the upstream modified-IDs CSVs.
    ///
    /// # Errors
    /// Download, I/O, zip-extraction, CSV-parse and timestamp-range
    /// failures.
    pub async fn download_latest(
        &self,
        chunk_size: u64,
    ) -> Result<(), DownloadLatestErr> {
        // Created inside `location` so the rename below cannot cross
        // filesystems.
        let tmp_dir = self
            .tmp_dir("osv-download")
            .map_err(DownloadLatestErr::Io)?;
        let client = reqwest::Client::new();

        let new_last_modified =
            download_latest_archives(&client, &self.0.ecosystems, &tmp_dir, chunk_size).await?;

        // Swap the fully-populated temp dir into place; the old record
        // set is removed only after the download succeeded.
        let records_dir = self.records_dir();
        if records_dir.exists() {
            std::fs::remove_dir_all(&records_dir).map_err(DownloadLatestErr::Io)?;
        }
        std::fs::rename(&tmp_dir, records_dir).map_err(DownloadLatestErr::Io)?;
        // NOTE(review): the `TempDir` guard still refers to the renamed-
        // away path; its Drop will fail to remove it and silently ignore
        // the error. Harmless, but `TempDir::keep` would be cleaner.

        let new_last_modified_timestamp_nanos = new_last_modified
            .timestamp_nanos_opt()
            .ok_or(DownloadLatestErr::TimestampOutOfRange(new_last_modified))?;
        self.0
            .last_modified
            .store(new_last_modified_timestamp_nanos, Ordering::Release);

        Ok(())
    }

    /// Incrementally updates the local record set with every record
    /// modified since [`Self::last_modified`], returning an iterator over
    /// the records that were added or replaced.
    ///
    /// Changed records are downloaded concurrently into a temporary
    /// directory, then moved into the records directory (overwriting any
    /// previous version). The last-modified timestamp is advanced only
    /// after all moves succeed.
    ///
    /// # Errors
    /// Download, task-join, I/O, CSV-parse and timestamp-range failures;
    /// per-record read/parse failures are yielded as `Err` items by the
    /// returned iterator.
    pub async fn sync(
        &self
    ) -> Result<impl Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send, SyncErr> {
        let tmp_dir = self.tmp_dir("osv-sync").map_err(SyncErr::Io)?;
        let last_modified = self.last_modified();

        let client = reqwest::Client::new();

        let (new_last_modified, entries_to_download) =
            collect_modified_entries(&client, &self.0.ecosystems, last_modified).await?;

        // Phase 1: fetch each changed record into the temp dir,
        // one task per record.
        let mut tasks = tokio::task::JoinSet::new();
        for entry in entries_to_download {
            let client = client.clone();
            let tmp_path = tmp_dir.path().to_path_buf();
            tasks.spawn(async move {
                let mut record_filename = PathBuf::from(&entry.id);
                record_filename.add_extension(OSV_RECORD_FILE_EXTENSION);
                simple_download_to(
                    &client,
                    &osv_record_url(entry.ecosystem, &entry.id),
                    &tmp_path.join(&record_filename),
                )
                .await
                .map_err(SyncErr::Download)?;
                Ok::<(), SyncErr>(())
            });
        }
        while let Some(res) = tasks.join_next().await {
            // Outer `?` propagates panics/cancellation (JoinError), the
            // inner one propagates download failures.
            res.map_err(SyncErr::Join)??;
        }

        // Phase 2: move everything that was downloaded into `records/`.
        let records_dir = self.records_dir();
        if !records_dir.exists() {
            std::fs::create_dir(&records_dir).map_err(SyncErr::Io)?;
        }
        let mut tasks = tokio::task::JoinSet::new();
        for entry in std::fs::read_dir(tmp_dir.path()).map_err(SyncErr::Io)? {
            let records_dir = records_dir.clone();
            tasks.spawn(async move {
                let entry = entry.map_err(SyncErr::Io)?;
                let dest = records_dir.join(entry.file_name());

                // Same filesystem (see `tmp_dir`), so this is a cheap
                // rename that also replaces an existing file.
                tokio::fs::rename(entry.path(), &dest)
                    .await
                    .map_err(SyncErr::Io)?;
                Ok::<PathBuf, SyncErr>(dest)
            });
        }
        let new_record_paths: Vec<PathBuf> = tasks
            .join_all()
            .await
            .into_iter()
            .collect::<Result<_, _>>()?;

        let new_last_modified_timestamp_nanos = new_last_modified
            .timestamp_nanos_opt()
            .ok_or(SyncErr::TimestampOutOfRange(new_last_modified))?;
        self.0
            .last_modified
            .store(new_last_modified_timestamp_nanos, Ordering::Release);

        // Lazily read back the moved files; the paths live in
        // `records/`, so they outlive the temp dir dropped here.
        Ok(new_record_paths.into_iter().map(|path| {
            let bytes = std::fs::read(&path).map_err(ReadRecordErr::Io)?;
            let osv_record = serde_json::from_slice(&bytes).map_err(ReadRecordErr::Json)?;
            Ok::<OsvRecord, ReadRecordErr>(osv_record)
        }))
    }
}
294
295async fn download_latest_archives(
306 client: &reqwest::Client,
307 ecosystems: &OsvGsEcosystems,
308 path: impl AsRef<Path>,
309 chunk_size: u64,
310) -> Result<DateTime<Utc>, DownloadLatestErr> {
311 if ecosystems.is_all() {
312 download_archive_for_ecosystem(client, None, &path, chunk_size).await
313 } else {
314 let mut tasks = tokio::task::JoinSet::new();
315 for eco in ecosystems.iter() {
316 let client = client.clone();
317 let path = path.as_ref().to_path_buf();
318 tasks.spawn(async move {
319 download_archive_for_ecosystem(&client, Some(eco), &path, chunk_size).await
320 });
321 }
322 tasks
323 .join_all()
324 .await
325 .into_iter()
326 .try_fold(DateTime::<Utc>::MIN_UTC, |last_modified, new_modified| {
327 Ok(last_modified.max(new_modified?))
328 })
329 }
330}
331
332async fn download_archive_for_ecosystem(
336 client: &reqwest::Client,
337 ecosystem: Option<OsvGsEcosystem>,
338 path: impl AsRef<Path>,
339 chunk_size: u64,
340) -> Result<DateTime<Utc>, DownloadLatestErr> {
341 download_and_extract_osv_archive(client, ecosystem, &path, chunk_size).await?;
342 let mut csv_rdr = download_osv_modified_csv(client, ecosystem)
343 .await
344 .map_err(DownloadLatestErr::Download)?;
345 let first_record = csv_rdr
346 .records()
347 .next()
348 .ok_or(DownloadLatestErr::EmptyModifiedCsv)?;
349 let entry = OsvModifiedRecord::try_from_csv_record(
350 &first_record.map_err(DownloadLatestErr::Csv)?,
351 ecosystem,
352 )
353 .map_err(DownloadLatestErr::ParseRecord)?;
354 Ok(entry.modified)
355}
356
357async fn collect_modified_entries(
365 client: &reqwest::Client,
366 ecosystems: &OsvGsEcosystems,
367 since: DateTime<Utc>,
368) -> Result<(DateTime<Utc>, Vec<OsvModifiedRecord>), SyncErr> {
369 if ecosystems.is_all() {
370 collect_entries_from_csv(client, None, since).await
371 } else {
372 let mut tasks = tokio::task::JoinSet::new();
373 for eco in ecosystems.iter() {
374 let client = client.clone();
375 tasks.spawn(async move { collect_entries_from_csv(&client, Some(eco), since).await });
376 }
377 tasks.join_all().await.into_iter().try_fold(
378 (since, Vec::new()),
379 |(last_modified, mut entries), res| {
380 let (new_modified, new_entries) = res?;
381 entries.extend(new_entries);
382 Ok((last_modified.max(new_modified), entries))
383 },
384 )
385 }
386}
387
388async fn collect_entries_from_csv(
395 client: &reqwest::Client,
396 ecosystem: Option<OsvGsEcosystem>,
397 since: DateTime<Utc>,
398) -> Result<(DateTime<Utc>, Vec<OsvModifiedRecord>), SyncErr> {
399 let mut new_last_modified = since;
400 let mut entries = Vec::new();
401 let mut csv_rdr = download_osv_modified_csv(client, ecosystem)
402 .await
403 .map_err(SyncErr::Download)?;
404 for result in csv_rdr.records() {
405 let entry =
406 OsvModifiedRecord::try_from_csv_record(&result.map_err(SyncErr::Csv)?, ecosystem)
407 .map_err(SyncErr::ParseRecord)?;
408 if entry.modified <= since {
409 break;
410 }
411 new_last_modified = new_last_modified.max(entry.modified);
412 entries.push(entry);
413 }
414 Ok((new_last_modified, entries))
415}
416
417async fn download_and_extract_osv_archive(
420 client: &reqwest::Client,
421 ecosystem: Option<OsvGsEcosystem>,
422 path: impl AsRef<Path>,
423 chunk_size: u64,
424) -> Result<(), DownloadLatestErr> {
425 let temp_zip_archive_dir = tempdir_in(&path).map_err(DownloadLatestErr::Io)?;
426 let archive = chuncked_download_to(
427 client,
428 &osv_archive_url(ecosystem),
429 chunk_size,
430 temp_zip_archive_dir.path().join("osv.zip"),
431 )
432 .await
433 .map_err(DownloadLatestErr::Download)?;
434 let mut zip_archive = zip::ZipArchive::new(archive).map_err(DownloadLatestErr::Zip)?;
435 zip_archive.extract(&path).map_err(DownloadLatestErr::Zip)?;
436 Ok(())
437}
438
439async fn download_osv_modified_csv(
440 client: &reqwest::Client,
441 ecosystem: Option<OsvGsEcosystem>,
442) -> Result<csv::Reader<Cursor<Bytes>>, DownloaderErr> {
443 let csv_bytes = client
444 .get(osv_modified_id_csv_url(ecosystem))
445 .send()
446 .await
447 .map_err(DownloaderErr::Http)?
448 .bytes()
449 .await
450 .map_err(DownloaderErr::Http)?;
451
452 Ok(csv::ReaderBuilder::new()
453 .has_headers(false)
454 .from_reader(Cursor::new(csv_bytes)))
455}
456
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, sync::atomic::Ordering};

    use tempfile::TempDir;

    use super::*;

    // NOTE(review): both tests below are live-network integration tests —
    // they hit the real OSV Google Storage bucket and rely on specific
    // record ids (and, for `sync_test`, dates) existing upstream.

    // Full-archive path: download_latest must make the hardcoded records
    // retrievable via both `get_record` and the `records` iterator.
    #[tokio::test]
    async fn download_latest_test() {
        let tmp = TempDir::new().unwrap();
        let osv = OsvDb::new(
            OsvGsEcosystems::all()
                .add(OsvGsEcosystem::CratesIo)
                .add(OsvGsEcosystem::Julia),
            tmp.path(),
        )
        .unwrap();

        // One id per ecosystem added above.
        let record_ids = [
            "RUSTSEC-2024-0401".to_string(),
            "JLSEC-2025-329".to_string(),
        ];

        // Fresh database: nothing should be present yet.
        for record_id in &record_ids {
            assert!(osv.get_record(record_id).unwrap().is_none());
        }

        // 10 MiB download chunks.
        osv.download_latest(10 * 1024 * 1024).await.unwrap();

        for record_id in &record_ids {
            let record = osv.get_record(record_id).unwrap().unwrap();
            assert_eq!(&record.id, record_id);
        }

        // The streaming iterator must agree with point lookups.
        let ids: HashSet<OsvRecordId> = osv.records().unwrap().map(|r| r.unwrap().id).collect();

        for record_id in &record_ids {
            assert!(ids.contains(record_id));
        }
    }

    // Incremental path: with last_modified pinned just before a known
    // record's modification time, sync must fetch that record.
    #[tokio::test]
    async fn sync_test() {
        // Presumably chosen to fall just before RUSTSEC-2026-0032's
        // upstream `modified` time — confirm against the live data.
        let last_modified: DateTime<Utc> = "2026-03-05T00:00:00Z".parse().unwrap();

        let tmp = TempDir::new().unwrap();
        let osv = OsvDb::new(
            OsvGsEcosystems::all().add(OsvGsEcosystem::CratesIo),
            tmp.path(),
        )
        .unwrap();

        let record_id = "RUSTSEC-2026-0032".to_string();

        assert!(osv.get_record(&record_id).unwrap().is_none());

        // Seed the cursor directly (white-box) instead of doing a full
        // download first.
        osv.0.last_modified.store(
            last_modified.timestamp_nanos_opt().unwrap(),
            Ordering::Release,
        );

        let sync_records: Vec<OsvRecord> = osv.sync().await.unwrap().map(|r| r.unwrap()).collect();

        assert!(
            osv.get_record(&record_id).unwrap().is_some(),
            "RUSTSEC-2026-0032 should exist after sync"
        );

        let stream_ids: HashSet<String> = osv.records().unwrap().map(|r| r.unwrap().id).collect();

        // Every synced record must be visible on disk and newer than the
        // cursor we seeded.
        for sync_record in &sync_records {
            assert!(
                stream_ids.contains(&sync_record.id),
                "sync record {} is missing from records",
                sync_record.id
            );
            assert!(
                sync_record.modified >= last_modified,
                "sync record {} has modified {} which is before last_modified {}",
                sync_record.id,
                sync_record.modified,
                last_modified
            );
        }
    }
}