// osv_db — lib.rs
1#![doc = include_str!("../README.md")]
2
3mod downloader;
4pub mod errors;
5mod osv_gs;
6pub mod types;
7
8use std::{
9    fs::File,
10    io::Cursor,
11    path::{Path, PathBuf},
12    sync::{
13        Arc,
14        atomic::{AtomicI64, Ordering},
15    },
16};
17
18use bytes::Bytes;
19use chrono::{DateTime, Utc};
20use tempfile::tempdir_in;
21
22pub use crate::osv_gs::{OsvGsEcosystem, OsvGsEcosystems};
23use crate::{
24    downloader::{chuncked_download_to, simple_download_to},
25    errors::{
26        DownloadLatestErr, DownloaderErr, GetRecordErr, OsvDbNewErr, ReadRecordErr, RecordsIterErr,
27        SyncErr,
28    },
29    osv_gs::{osv_archive_url, osv_modified_id_csv_url, osv_record_url},
30    types::{OsvModifiedRecord, OsvRecord, OsvRecordId},
31};
32
/// File extension used for every on-disk OSV record file (`<id>.json`).
const OSV_RECORD_FILE_EXTENSION: &str = "json";
/// Name of the subdirectory under [`OsvDb::location`] that holds the record files.
const RECORDS_DIRECTORY: &str = "records";
35
/// Handle to a local, on-disk mirror of the OSV vulnerability database.
///
/// Cloning is cheap: all clones share the same inner state through an [`Arc`].
#[derive(Debug, Clone)]
pub struct OsvDb(Arc<OsvDbInner>);
38
/// Shared state behind the [`Arc`] inside [`OsvDb`].
#[derive(Debug)]
struct OsvDbInner {
    /// On disk location of the OSV data
    location: PathBuf,
    /// The set of ecosystems this database targets. An empty set means all ecosystems.
    ecosystems: OsvGsEcosystems,
    /// The most recent `modified` timestamp seen across all records, stored as
    /// nanoseconds since the Unix epoch. Updated atomically after each
    /// [`OsvDb::download_latest`] or [`OsvDb::sync`] call. Defaults to `0` (Unix
    /// epoch) until the database is populated.
    // NOTE(review): this is *not* persisted; a fresh `OsvDb::new` over an already
    // populated directory restarts from the epoch — confirm this is intended.
    last_modified: AtomicI64,
}
51
52impl OsvDb {
53    /// Creates a new [`OsvDb`] rooted at `path` targeting the given `ecosystems`.
54    ///
55    /// Pass [`OsvGsEcosystems::all`] to cover all ecosystems, or build a specific set
56    /// with [`OsvGsEcosystems::add`].
57    ///
58    /// If `path` does not exist it is created (including all parent directories).
59    ///
60    /// # Errors
61    /// - [`OsvDbNewErr`]
62    pub fn new(
63        ecosystems: OsvGsEcosystems,
64        path: impl AsRef<Path>,
65    ) -> Result<Self, OsvDbNewErr> {
66        std::fs::create_dir_all(&path).map_err(OsvDbNewErr::Io)?;
67        Ok(Self(Arc::new(OsvDbInner {
68            location: path.as_ref().to_path_buf(),
69            ecosystems,
70            last_modified: AtomicI64::default(),
71        })))
72    }
73
74    /// Returns the on disk location of the database.
75    #[must_use]
76    pub fn location(&self) -> &Path {
77        &self.0.location
78    }
79
80    /// Returns the set of ecosystems this database targets.
81    ///
82    /// An empty set (i.e. [`OsvGsEcosystems::is_all`] is `true`) means all ecosystems.
83    #[must_use]
84    pub fn ecosystems(&self) -> &OsvGsEcosystems {
85        &self.0.ecosystems
86    }
87
88    /// Returns the latest `modified` timestamp seen across all records in the database.
89    ///
90    /// The value reflects the most recent [`download_latest`](Self::download_latest) or
91    /// [`sync`](Self::sync) call. Returns the Unix epoch if the database has not yet
92    /// been populated.
93    #[must_use]
94    pub fn last_modified(&self) -> DateTime<Utc> {
95        DateTime::<Utc>::from_timestamp_nanos(self.0.last_modified.load(Ordering::Acquire))
96    }
97
98    fn records_dir(&self) -> PathBuf {
99        self.location().join(RECORDS_DIRECTORY)
100    }
101
102    fn tmp_dir(
103        &self,
104        prefix: &str,
105    ) -> Result<tempfile::TempDir, std::io::Error> {
106        tempfile::Builder::new()
107            .prefix(prefix)
108            .tempdir_in(self.location())
109    }
110
111    /// Looks up a single OSV record by its [`OsvRecordId`].
112    ///
113    /// Returns `Ok(None)` if no record matching `id` exists.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if the record file cannot be opened or deserialized.
118    pub fn get_record(
119        &self,
120        id: &OsvRecordId,
121    ) -> Result<Option<OsvRecord>, GetRecordErr> {
122        let records_dir = self.records_dir();
123        let mut record_path = records_dir.join(id);
124        record_path.add_extension(OSV_RECORD_FILE_EXTENSION);
125        if !record_path.exists() {
126            return Ok(None);
127        }
128        let osv_record_file = File::open(record_path).map_err(GetRecordErr::Io)?;
129        let osv_record = serde_json::from_reader(&osv_record_file).map_err(GetRecordErr::Json)?;
130        Ok(Some(osv_record))
131    }
132
133    /// Returns an [`Iterator`] over every [`OsvRecord`] stored in the database.
134    ///
135    /// Files are read and parsed synchronously. Each record is yielded as
136    /// `Ok(`[`OsvRecord`]`)`. I/O or parse failures yield an [`Err`] item
137    /// without terminating the iterator.
138    pub fn records(
139        &self
140    ) -> Result<impl Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send, RecordsIterErr> {
141        let records_dir = self.records_dir();
142        if !records_dir.exists() {
143            let empty: Box<dyn Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send> =
144                Box::new(std::iter::empty());
145            return Ok(empty);
146        }
147
148        let records_dir_content =
149            std::fs::read_dir(records_dir).map_err(RecordsIterErr::ReadDir)?;
150
151        let iter: Box<dyn Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send> = Box::new(
152            records_dir_content
153                .filter(|entry| {
154                    entry.as_ref().is_ok_and(|e| {
155                        e.path().extension().and_then(|ext| ext.to_str())
156                            == Some(OSV_RECORD_FILE_EXTENSION)
157                    })
158                })
159                .map(|entry| {
160                    let entry = entry.map_err(ReadRecordErr::Io)?;
161                    let bytes = std::fs::read(entry.path()).map_err(ReadRecordErr::Io)?;
162                    let osv_record = serde_json::from_slice(&bytes).map_err(ReadRecordErr::Json)?;
163                    Ok(osv_record)
164                }),
165        );
166        Ok(iter)
167    }
168
169    /// Downloads a full, latest OSV database for all configured ecosystems.
170    ///
171    /// - For each targeted ecosystem (or the global archive when all ecosystems are
172    ///   selected), downloads the latest archive into a temporary subdirectory of
173    ///   `location` and extracts all records into a single flat directory.
174    /// - Atomically replaces the current records directory with the newly downloaded one.
175    /// - Updates `self.last_modified` with the maximum `modified` timestamp seen across
176    ///   all targeted ecosystems.
177    pub async fn download_latest(
178        &self,
179        chunk_size: u64,
180    ) -> Result<(), DownloadLatestErr> {
181        let tmp_dir = self
182            .tmp_dir("osv-download")
183            .map_err(DownloadLatestErr::Io)?;
184        let client = reqwest::Client::new();
185
186        let new_last_modified =
187            download_latest_archives(&client, &self.0.ecosystems, &tmp_dir, chunk_size).await?;
188
189        let records_dir = self.records_dir();
190        if records_dir.exists() {
191            std::fs::remove_dir_all(&records_dir).map_err(DownloadLatestErr::Io)?;
192        }
193        // Atomically replaces the current records directory with the newly downloaded one.
194        // rename(2) is guaranteed to be atomic on POSIX systems — see
195        // <https://man7.org/linux/man-pages/man2/rename.2.html>.
196        std::fs::rename(&tmp_dir, records_dir).map_err(DownloadLatestErr::Io)?;
197
198        let new_last_modified_timestamp_nanos = new_last_modified
199            .timestamp_nanos_opt()
200            .ok_or(DownloadLatestErr::TimestampOutOfRange(new_last_modified))?;
201        self.0
202            .last_modified
203            .store(new_last_modified_timestamp_nanos, Ordering::Release);
204
205        Ok(())
206    }
207
208    /// Sync with the latest OSV data, downloads only the records that have been modified
209    /// since [`Self::last_modified`] and updates the local database files
210    /// accordingly.
211    ///
212    /// Fetches the `modified_id.csv` index for the configured ecosystem (or all
213    /// ecosystems if [`None`]). The file is sorted in reverse chronological order, so
214    /// parsing stops as soon as a timestamp at or before [`Self::last_modified`] is
215    /// encountered, avoiding a full re-download. After all new records are saved,
216    /// [`Self::last_modified`] is updated to the highest timestamp seen.
217    ///
218    /// Returns an [`Iterator`] that yields each newly added or updated [`OsvRecord`].
219    pub async fn sync(
220        &self
221    ) -> Result<impl Iterator<Item = Result<OsvRecord, ReadRecordErr>> + Send, SyncErr> {
222        let tmp_dir = self.tmp_dir("osv-sync").map_err(SyncErr::Io)?;
223        let last_modified = self.last_modified();
224
225        let client = reqwest::Client::new();
226
227        // Collect all records that need to be downloaded before spawning concurrent tasks.
228        let (new_last_modified, entries_to_download) =
229            collect_modified_entries(&client, &self.0.ecosystems, last_modified).await?;
230
231        // Concurrently download all records.
232        let mut tasks = tokio::task::JoinSet::new();
233        for entry in entries_to_download {
234            let client = client.clone();
235            let tmp_path = tmp_dir.path().to_path_buf();
236            tasks.spawn(async move {
237                let mut record_filename = PathBuf::from(&entry.id);
238                record_filename.add_extension(OSV_RECORD_FILE_EXTENSION);
239                simple_download_to(
240                    &client,
241                    &osv_record_url(entry.ecosystem, &entry.id),
242                    &tmp_path.join(&record_filename),
243                )
244                .await
245                .map_err(SyncErr::Download)?;
246                Ok::<(), SyncErr>(())
247            });
248        }
249        while let Some(res) = tasks.join_next().await {
250            res.map_err(SyncErr::Join)??;
251        }
252
253        let records_dir = self.records_dir();
254        if !records_dir.exists() {
255            std::fs::create_dir(&records_dir).map_err(SyncErr::Io)?;
256        }
257        let mut tasks = tokio::task::JoinSet::new();
258        for entry in std::fs::read_dir(tmp_dir.path()).map_err(SyncErr::Io)? {
259            let records_dir = records_dir.clone();
260            tasks.spawn(async move {
261                let entry = entry.map_err(SyncErr::Io)?;
262                let dest = records_dir.join(entry.file_name());
263
264                // Atomically replaces the current records directory with the newly
265                // downloaded one. rename(2) is guaranteed
266                // to be atomic on POSIX systems — see <https://man7.org/linux/man-pages/man2/rename.2.html>.
267                tokio::fs::rename(entry.path(), &dest)
268                    .await
269                    .map_err(SyncErr::Io)?;
270                Ok::<PathBuf, SyncErr>(dest)
271            });
272        }
273        // resolve error right now, before modifying `last_modified` field
274        let new_record_paths: Vec<PathBuf> = tasks
275            .join_all()
276            .await
277            .into_iter()
278            .collect::<Result<_, _>>()?;
279
280        let new_last_modified_timestamp_nanos = new_last_modified
281            .timestamp_nanos_opt()
282            .ok_or(SyncErr::TimestampOutOfRange(new_last_modified))?;
283        self.0
284            .last_modified
285            .store(new_last_modified_timestamp_nanos, Ordering::Release);
286
287        Ok(new_record_paths.into_iter().map(|path| {
288            let bytes = std::fs::read(&path).map_err(ReadRecordErr::Io)?;
289            let osv_record = serde_json::from_slice(&bytes).map_err(ReadRecordErr::Json)?;
290            Ok::<OsvRecord, ReadRecordErr>(osv_record)
291        }))
292    }
293}
294
295/// Downloads archives for all ecosystems in `ecosystems` into `path` and returns the
296/// maximum `modified` timestamp seen across their `modified_id.csv` files.
297///
298/// When `ecosystems` targets all ecosystems, the single global archive is used.
299/// Otherwise each ecosystem's archive is downloaded and extracted into the same
300/// directory.
301///
302/// The `modified_id.csv` files are sorted in reverse chronological order, so the first
303/// entry is always the most recently modified record for that ecosystem.
304/// <https://google.github.io/osv.dev/data/#downloading-recent-changes>
305async fn download_latest_archives(
306    client: &reqwest::Client,
307    ecosystems: &OsvGsEcosystems,
308    path: impl AsRef<Path>,
309    chunk_size: u64,
310) -> Result<DateTime<Utc>, DownloadLatestErr> {
311    if ecosystems.is_all() {
312        download_archive_for_ecosystem(client, None, &path, chunk_size).await
313    } else {
314        let mut tasks = tokio::task::JoinSet::new();
315        for eco in ecosystems.iter() {
316            let client = client.clone();
317            let path = path.as_ref().to_path_buf();
318            tasks.spawn(async move {
319                download_archive_for_ecosystem(&client, Some(eco), &path, chunk_size).await
320            });
321        }
322        tasks
323            .join_all()
324            .await
325            .into_iter()
326            .try_fold(DateTime::<Utc>::MIN_UTC, |last_modified, new_modified| {
327                Ok(last_modified.max(new_modified?))
328            })
329    }
330}
331
332/// Downloads and extracts the OSV archive for the given `ecosystem` (or the global
333/// archive if [`None`]) into `path`, then reads the first entry of the
334/// `modified_id.csv` and returns its `modified` timestamp.
335async fn download_archive_for_ecosystem(
336    client: &reqwest::Client,
337    ecosystem: Option<OsvGsEcosystem>,
338    path: impl AsRef<Path>,
339    chunk_size: u64,
340) -> Result<DateTime<Utc>, DownloadLatestErr> {
341    download_and_extract_osv_archive(client, ecosystem, &path, chunk_size).await?;
342    let mut csv_rdr = download_osv_modified_csv(client, ecosystem)
343        .await
344        .map_err(DownloadLatestErr::Download)?;
345    let first_record = csv_rdr
346        .records()
347        .next()
348        .ok_or(DownloadLatestErr::EmptyModifiedCsv)?;
349    let entry = OsvModifiedRecord::try_from_csv_record(
350        &first_record.map_err(DownloadLatestErr::Csv)?,
351        ecosystem,
352    )
353    .map_err(DownloadLatestErr::ParseRecord)?;
354    Ok(entry.modified)
355}
356
357/// Reads the `modified_id.csv` for each ecosystem in `ecosystems` and collects every
358/// entry whose `modified` timestamp is strictly after `since`.
359///
360/// Each CSV is sorted in reverse chronological order, so reading stops as soon as an
361/// entry at or before `since` is encountered.
362///
363/// Returns the updated maximum `modified` timestamp and the list of entries to download.
364async fn collect_modified_entries(
365    client: &reqwest::Client,
366    ecosystems: &OsvGsEcosystems,
367    since: DateTime<Utc>,
368) -> Result<(DateTime<Utc>, Vec<OsvModifiedRecord>), SyncErr> {
369    if ecosystems.is_all() {
370        collect_entries_from_csv(client, None, since).await
371    } else {
372        let mut tasks = tokio::task::JoinSet::new();
373        for eco in ecosystems.iter() {
374            let client = client.clone();
375            tasks.spawn(async move { collect_entries_from_csv(&client, Some(eco), since).await });
376        }
377        tasks.join_all().await.into_iter().try_fold(
378            (since, Vec::new()),
379            |(last_modified, mut entries), res| {
380                let (new_modified, new_entries) = res?;
381                entries.extend(new_entries);
382                Ok((last_modified.max(new_modified), entries))
383            },
384        )
385    }
386}
387
388/// Downloads and reads the `modified_id.csv` for the given `ecosystem` (or the global
389/// index if [`None`]) and returns every entry whose `modified` timestamp is strictly
390/// after `since`, along with the maximum `modified` timestamp seen.
391///
392/// The CSV is sorted in reverse chronological order, so reading stops as soon as an
393/// entry at or before `since` is encountered.
394async fn collect_entries_from_csv(
395    client: &reqwest::Client,
396    ecosystem: Option<OsvGsEcosystem>,
397    since: DateTime<Utc>,
398) -> Result<(DateTime<Utc>, Vec<OsvModifiedRecord>), SyncErr> {
399    let mut new_last_modified = since;
400    let mut entries = Vec::new();
401    let mut csv_rdr = download_osv_modified_csv(client, ecosystem)
402        .await
403        .map_err(SyncErr::Download)?;
404    for result in csv_rdr.records() {
405        let entry =
406            OsvModifiedRecord::try_from_csv_record(&result.map_err(SyncErr::Csv)?, ecosystem)
407                .map_err(SyncErr::ParseRecord)?;
408        if entry.modified <= since {
409            break;
410        }
411        new_last_modified = new_last_modified.max(entry.modified);
412        entries.push(entry);
413    }
414    Ok((new_last_modified, entries))
415}
416
417/// Downloads the OSV archive for the given [`OsvGsEcosystem`] (or all ecosystems if
418/// [`None`]) from <https://storage.googleapis.com/osv-vulnerabilities> and extracts it into `path`.
419async fn download_and_extract_osv_archive(
420    client: &reqwest::Client,
421    ecosystem: Option<OsvGsEcosystem>,
422    path: impl AsRef<Path>,
423    chunk_size: u64,
424) -> Result<(), DownloadLatestErr> {
425    let temp_zip_archive_dir = tempdir_in(&path).map_err(DownloadLatestErr::Io)?;
426    let archive = chuncked_download_to(
427        client,
428        &osv_archive_url(ecosystem),
429        chunk_size,
430        temp_zip_archive_dir.path().join("osv.zip"),
431    )
432    .await
433    .map_err(DownloadLatestErr::Download)?;
434    let mut zip_archive = zip::ZipArchive::new(archive).map_err(DownloadLatestErr::Zip)?;
435    zip_archive.extract(&path).map_err(DownloadLatestErr::Zip)?;
436    Ok(())
437}
438
439async fn download_osv_modified_csv(
440    client: &reqwest::Client,
441    ecosystem: Option<OsvGsEcosystem>,
442) -> Result<csv::Reader<Cursor<Bytes>>, DownloaderErr> {
443    let csv_bytes = client
444        .get(osv_modified_id_csv_url(ecosystem))
445        .send()
446        .await
447        .map_err(DownloaderErr::Http)?
448        .bytes()
449        .await
450        .map_err(DownloaderErr::Http)?;
451
452    Ok(csv::ReaderBuilder::new()
453        .has_headers(false)
454        .from_reader(Cursor::new(csv_bytes)))
455}
456
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, sync::atomic::Ordering};

    use tempfile::TempDir;

    use super::*;

    /// Downloads the latest OSV database for the crates.io and Julia ecosystems
    /// into a fresh temporary directory, then verifies that a known record id
    /// from each ecosystem:
    ///
    /// 1. is absent before the download,
    /// 2. can be fetched with `get_record` afterwards (with a matching `id`), and
    /// 3. appears in the `records` iterator.
    ///
    /// NOTE(review): hits the live OSV GCS bucket — requires network access.
    #[tokio::test]
    async fn download_latest_test() {
        let tmp = TempDir::new().unwrap();
        // NOTE(review): starting from `all()` and then calling `add` looks
        // contradictory — confirm `add` narrows the set to specific ecosystems.
        let osv = OsvDb::new(
            OsvGsEcosystems::all()
                .add(OsvGsEcosystem::CratesIo)
                .add(OsvGsEcosystem::Julia),
            tmp.path(),
        )
        .unwrap();

        let record_ids = [
            "RUSTSEC-2024-0401".to_string(),
            "JLSEC-2025-329".to_string(),
        ];

        for record_id in &record_ids {
            assert!(osv.get_record(record_id).unwrap().is_none());
        }

        osv.download_latest(10 * 1024 * 1024).await.unwrap();

        for record_id in &record_ids {
            let record = osv.get_record(record_id).unwrap().unwrap();
            assert_eq!(&record.id, record_id);
        }

        // verify records yields all records including our target
        let ids: HashSet<OsvRecordId> = osv.records().unwrap().map(|r| r.unwrap().id).collect();

        for record_id in &record_ids {
            assert!(ids.contains(record_id));
        }
    }

    /// Initialises an empty database, sets `last_modified` to the date of
    /// `RUSTSEC-2026-0032` (2026-03-05T00:00:00Z), then calls `sync`. Verifies:
    ///
    /// 1. `RUSTSEC-2026-0032` was not present before sync.
    /// 2. `RUSTSEC-2026-0032` exists after sync (it was modified at 2026-03-05T05:53:11Z,
    ///    which is strictly after the `last_modified` cutoff).
    /// 3. Every record returned by `sync` is also yielded by `records`.
    /// 4. Every record returned by `sync` has `modified >= last_modified`.
    ///
    /// NOTE(review): hits the live OSV GCS bucket — requires network access.
    #[tokio::test]
    async fn sync_test() {
        // The date of RUSTSEC-2026-0032 (modified: 2026-03-05T05:53:11Z).
        // Using midnight so the record itself (modified later that day) is captured.
        let last_modified: DateTime<Utc> = "2026-03-05T00:00:00Z".parse().unwrap();

        let tmp = TempDir::new().unwrap();
        let osv = OsvDb::new(
            OsvGsEcosystems::all().add(OsvGsEcosystem::CratesIo),
            tmp.path(),
        )
        .unwrap();

        let record_id = "RUSTSEC-2026-0032".to_string();

        // DB is empty — record must not exist yet.
        assert!(osv.get_record(&record_id).unwrap().is_none());

        // Set last_modified to the date of RUSTSEC-2026-0032.
        osv.0.last_modified.store(
            last_modified.timestamp_nanos_opt().unwrap(),
            Ordering::Release,
        );

        let sync_records: Vec<OsvRecord> = osv.sync().await.unwrap().map(|r| r.unwrap()).collect();

        // RUSTSEC-2026-0032 must be present after sync.
        assert!(
            osv.get_record(&record_id).unwrap().is_some(),
            "RUSTSEC-2026-0032 should exist after sync"
        );

        let stream_ids: HashSet<String> = osv.records().unwrap().map(|r| r.unwrap().id).collect();

        for sync_record in &sync_records {
            assert!(
                stream_ids.contains(&sync_record.id),
                "sync record {} is missing from records",
                sync_record.id
            );
            assert!(
                sync_record.modified >= last_modified,
                "sync record {} has modified {} which is before last_modified {}",
                sync_record.id,
                sync_record.modified,
                last_modified
            );
        }
    }
}