#![doc = include_str!("../README.md")]
mod downloader;
mod osv_gs;
pub mod types;
use std::{
fs::File,
io::Cursor,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicI64, Ordering},
},
};
use anyhow::Context;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use tempfile::tempdir_in;
pub use crate::osv_gs::{OsvGsEcosystem, OsvGsEcosystems};
use crate::{
downloader::{chuncked_download_to, simple_download_to},
osv_gs::{osv_archive_url, osv_modified_id_csv_url, osv_record_url},
types::{OsvModifiedRecord, OsvRecord, OsvRecordId},
};
const OSV_RECORD_FILE_EXTENSION: &str = "json";
const RECORDS_DIRECTORY: &str = "records";
/// Handle to an on-disk OSV vulnerability-record database.
///
/// Cheap to clone: all state lives behind an `Arc`, so clones share the
/// same directory and `last_modified` watermark.
#[derive(Debug, Clone)]
pub struct OsvDb(Arc<OsvDbInner>);
/// Shared state behind [`OsvDb`].
#[derive(Debug)]
struct OsvDbInner {
    // Root directory; holds the `records` subdirectory and staging dirs.
    location: PathBuf,
    // Which OSV ecosystems this database tracks.
    ecosystems: OsvGsEcosystems,
    // Watermark: the newest record `modified` time seen so far, stored as
    // nanoseconds since the Unix epoch (0 = never downloaded/synced).
    last_modified: AtomicI64,
}
impl OsvDb {
    /// Creates a database handle rooted at `path`, restricted to `ecosystems`.
    ///
    /// The `last_modified` watermark starts at the Unix epoch, so call
    /// [`Self::download_latest`] before [`Self::sync`] to establish a real
    /// baseline.
    ///
    /// # Errors
    /// Fails when `path` does not exist or is not a directory.
    pub fn new(
        ecosystems: OsvGsEcosystems,
        path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        anyhow::ensure!(
            path.as_ref().is_dir(),
            "Provided `path` {} must be an existing directory",
            path.as_ref().display()
        );
        Ok(Self(Arc::new(OsvDbInner {
            location: path.as_ref().to_path_buf(),
            ecosystems,
            last_modified: AtomicI64::default(),
        })))
    }

    /// Root directory backing this database.
    #[must_use]
    pub fn location(&self) -> &Path {
        &self.0.location
    }

    /// Ecosystems this database was configured to track.
    #[must_use]
    pub fn ecosystems(&self) -> &OsvGsEcosystems {
        &self.0.ecosystems
    }

    /// Timestamp of the newest record seen by the last successful
    /// [`Self::download_latest`] or [`Self::sync`] (epoch if never run).
    #[must_use]
    pub fn last_modified(&self) -> DateTime<Utc> {
        // `from_timestamp_nanos` accepts the full i64 range, so there is no
        // failure path when reading the watermark back.
        DateTime::<Utc>::from_timestamp_nanos(self.0.last_modified.load(Ordering::Acquire))
    }

    /// Directory holding the individual `<id>.json` record files.
    fn records_dir(&self) -> PathBuf {
        self.location().join(RECORDS_DIRECTORY)
    }

    /// Creates a scratch directory under `location()`; keeping it on the
    /// same filesystem makes the later rename into `records_dir` a cheap
    /// move.
    fn tmp_dir(
        &self,
        prefix: &str,
    ) -> anyhow::Result<tempfile::TempDir> {
        Ok(tempfile::Builder::new()
            .prefix(prefix)
            .tempdir_in(self.location())?)
    }

    /// Persists `new_last_modified` as the atomic nanosecond watermark.
    ///
    /// # Errors
    /// Fails when the timestamp cannot be represented as i64 nanoseconds
    /// (outside roughly the years 1677..=2262).
    fn store_last_modified(&self, new_last_modified: DateTime<Utc>) -> anyhow::Result<()> {
        let nanos = new_last_modified.timestamp_nanos_opt().with_context(|| {
            format!(
                "The date must be between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807, provided: {new_last_modified}"
            )
        })?;
        self.0.last_modified.store(nanos, Ordering::Release);
        Ok(())
    }

    /// Loads a single record by id, returning `Ok(None)` when no file for
    /// that id exists on disk.
    ///
    /// # Errors
    /// Fails on I/O errors other than "not found", or when the file is not
    /// valid JSON for an `OsvRecord`.
    pub fn get_record(
        &self,
        id: &OsvRecordId,
    ) -> anyhow::Result<Option<OsvRecord>> {
        let records_dir = self.records_dir();
        let mut record_path = records_dir.join(id);
        // Append (rather than replace) the `.json` extension so a dot inside
        // the id cannot truncate the filename.
        record_path.add_extension(OSV_RECORD_FILE_EXTENSION);
        if !record_path.exists() {
            return Ok(None);
        }
        let osv_record_file = File::open(record_path)?;
        let osv_record = serde_json::from_reader(&osv_record_file)?;
        Ok(Some(osv_record))
    }

    /// Streams every `.json` record currently on disk.
    ///
    /// The directory listing happens synchronously up front; file reads and
    /// JSON parsing happen lazily as the stream is polled.
    ///
    /// NOTE(review): directory entries that failed to be read are silently
    /// dropped by the extension filter (`is_ok_and` is `false` for `Err`),
    /// so they never reach the error arm below — confirm this is intended.
    pub fn records_stream(
        &self
    ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<OsvRecord>>> {
        let records_dir_content = std::fs::read_dir(self.records_dir())?;
        let stream = futures::stream::iter(records_dir_content)
            .filter_map(|entry| {
                async {
                    // Keep only entries whose extension is exactly `json`.
                    Some(entry).filter(|e| {
                        e.as_ref().is_ok_and(|e| {
                            e.path().extension().and_then(|e| e.to_str())
                                == Some(OSV_RECORD_FILE_EXTENSION)
                        })
                    })
                }
            })
            .then(|entry| {
                async move {
                    let entry = entry?;
                    let bytes = tokio::fs::read(entry.path()).await?;
                    let osv_record = serde_json::from_slice(&bytes)?;
                    anyhow::Ok(osv_record)
                }
            });
        Ok(stream.boxed())
    }

    /// Replaces the whole on-disk store with freshly downloaded OSV archives
    /// and advances the `last_modified` watermark.
    ///
    /// `chunk_size` is the per-request byte size used for the chunked
    /// archive downloads.
    ///
    /// NOTE(review): `remove_dir_all` followed by `rename` is not atomic; a
    /// crash between the two calls leaves the database without a records
    /// directory until the next successful run.
    ///
    /// # Errors
    /// Fails on download, filesystem, or timestamp-conversion errors.
    pub async fn download_latest(
        &self,
        chunk_size: u64,
    ) -> anyhow::Result<()> {
        let tmp_dir = self.tmp_dir("osv-download")?;
        let client = reqwest::Client::new();
        let new_last_modified =
            download_latest_archives(&client, &self.0.ecosystems, &tmp_dir, chunk_size).await?;
        let records_dir = self.records_dir();
        if records_dir.exists() {
            std::fs::remove_dir_all(&records_dir)?;
        }
        // Both paths live under `location()`, so this is a same-filesystem
        // move; the TempDir destructor's cleanup of the old path becomes a
        // harmless no-op.
        std::fs::rename(&tmp_dir, records_dir)?;
        self.store_last_modified(new_last_modified)
    }

    /// Downloads only the records modified since [`Self::last_modified`],
    /// moves them into the store, advances the watermark, and returns a
    /// stream over the newly fetched records.
    ///
    /// # Errors
    /// Fails on download, filesystem, or timestamp-conversion errors.
    pub async fn sync(
        &self
    ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<OsvRecord>>> {
        let tmp_dir = self.tmp_dir("osv-sync")?;
        let last_modified = self.last_modified();
        let client = reqwest::Client::new();
        let (new_last_modified, entries_to_download) =
            collect_modified_entries(&client, &self.0.ecosystems, last_modified).await?;
        // Fetch every changed record concurrently into the staging dir.
        let mut tasks = tokio::task::JoinSet::new();
        for entry in entries_to_download {
            let client = client.clone();
            let tmp_path = tmp_dir.path().to_path_buf();
            tasks.spawn(async move {
                let mut record_filename = PathBuf::from(&entry.id);
                record_filename.add_extension(OSV_RECORD_FILE_EXTENSION);
                simple_download_to(
                    &client,
                    &osv_record_url(Some(&entry.ecosystem), &entry.id),
                    &tmp_path.join(&record_filename),
                )
                .await?;
                anyhow::Ok(())
            });
        }
        while let Some(res) = tasks.join_next().await {
            res??;
        }
        let records_dir = self.records_dir();
        if !records_dir.exists() {
            std::fs::create_dir(&records_dir)?;
        }
        // Move the staged files into the live records directory, collecting
        // the destination paths so the returned stream can read them back.
        let mut tasks = tokio::task::JoinSet::new();
        for entry in std::fs::read_dir(tmp_dir.path())? {
            let records_dir = records_dir.clone();
            tasks.spawn(async move {
                let entry = entry?;
                let dest = records_dir.join(entry.file_name());
                tokio::fs::rename(entry.path(), &dest).await?;
                anyhow::Ok(dest)
            });
        }
        let new_record_paths: Vec<PathBuf> = tasks
            .join_all()
            .await
            .into_iter()
            .collect::<Result<_, _>>()?;
        self.store_last_modified(new_last_modified)?;
        let stream = futures::stream::iter(new_record_paths).then(|path| {
            async move {
                let bytes = tokio::fs::read(&path).await?;
                let osv_record = serde_json::from_slice(&bytes)?;
                anyhow::Ok(osv_record)
            }
        });
        Ok(stream.boxed())
    }
}
/// Downloads and extracts the archive(s) for the selected ecosystems into
/// `path`, returning the newest `modified` timestamp across all of them.
///
/// When every ecosystem is requested, a single combined archive is used;
/// otherwise one archive per ecosystem is fetched concurrently.
async fn download_latest_archives(
    client: &reqwest::Client,
    ecosystems: &OsvGsEcosystems,
    path: impl AsRef<Path>,
    chunk_size: u64,
) -> anyhow::Result<DateTime<Utc>> {
    if ecosystems.is_all() {
        return download_archive_for_ecosystem(client, None, &path, chunk_size).await;
    }
    let mut downloads = tokio::task::JoinSet::new();
    for ecosystem in ecosystems.iter() {
        let client = client.clone();
        let dest = path.as_ref().to_path_buf();
        let ecosystem = *ecosystem;
        downloads.spawn(async move {
            download_archive_for_ecosystem(&client, Some(&ecosystem), &dest, chunk_size).await
        });
    }
    // Reduce the per-ecosystem results to the latest timestamp, failing on
    // the first download error.
    let mut newest = DateTime::<Utc>::MIN_UTC;
    for outcome in downloads.join_all().await {
        newest = newest.max(outcome?);
    }
    Ok(newest)
}
/// Downloads and extracts one ecosystem's archive (or the combined archive
/// when `ecosystem` is `None`) into `path`, then returns the `modified`
/// timestamp of the first row of that feed's modified-ids CSV.
async fn download_archive_for_ecosystem(
    client: &reqwest::Client,
    ecosystem: Option<&OsvGsEcosystem>,
    path: impl AsRef<Path>,
    chunk_size: u64,
) -> anyhow::Result<DateTime<Utc>> {
    download_and_extract_osv_archive(client, ecosystem, &path, chunk_size).await?;
    let mut reader = download_osv_modified_csv(client, ecosystem).await?;
    // First `?` rejects an empty CSV, second one any CSV parse error.
    let head_row = reader
        .records()
        .next()
        .context("OSV modified csv file must have at least one entry")??;
    let head_entry = OsvModifiedRecord::try_from_csv_record(&head_row, ecosystem.copied())?;
    Ok(head_entry.modified)
}
/// Gathers all records modified after `since` across the selected
/// ecosystems, returning the newest timestamp observed together with the
/// combined entry list.
///
/// A single combined CSV feed is used when every ecosystem is requested;
/// otherwise the per-ecosystem feeds are fetched concurrently.
async fn collect_modified_entries(
    client: &reqwest::Client,
    ecosystems: &OsvGsEcosystems,
    since: DateTime<Utc>,
) -> anyhow::Result<(DateTime<Utc>, Vec<OsvModifiedRecord>)> {
    if ecosystems.is_all() {
        return collect_entries_from_csv(client, None, since).await;
    }
    let mut fetches = tokio::task::JoinSet::new();
    for ecosystem in ecosystems.iter() {
        let client = client.clone();
        let ecosystem = *ecosystem;
        fetches
            .spawn(async move { collect_entries_from_csv(&client, Some(&ecosystem), since).await });
    }
    // Merge the per-ecosystem results, failing on the first fetch error.
    let mut newest = since;
    let mut combined = Vec::new();
    for outcome in fetches.join_all().await {
        let (modified, entries) = outcome?;
        newest = newest.max(modified);
        combined.extend(entries);
    }
    Ok((newest, combined))
}
/// Reads one modified-ids CSV feed and returns every entry strictly newer
/// than `since`, plus the newest `modified` time seen (`since` when there
/// are no newer entries).
///
/// Iteration stops at the first entry at or before `since`, which only
/// collects everything newer if the feed is sorted newest-first —
/// presumably it is; confirm against the OSV GCS feed format.
async fn collect_entries_from_csv(
    client: &reqwest::Client,
    ecosystem: Option<&OsvGsEcosystem>,
    since: DateTime<Utc>,
) -> anyhow::Result<(DateTime<Utc>, Vec<OsvModifiedRecord>)> {
    let mut reader = download_osv_modified_csv(client, ecosystem).await?;
    let mut newest = since;
    let mut fresh_entries = Vec::new();
    for row in reader.records() {
        let record = OsvModifiedRecord::try_from_csv_record(&row?, ecosystem.copied())?;
        if record.modified <= since {
            break;
        }
        if record.modified > newest {
            newest = record.modified;
        }
        fresh_entries.push(record);
    }
    Ok((newest, fresh_entries))
}
/// Downloads the OSV zip archive for `ecosystem` (the combined archive when
/// `None`) in chunks of `chunk_size` bytes and extracts it into `path`.
///
/// The zip is staged in a temporary directory created inside `path`, which
/// is cleaned up automatically when this function returns.
async fn download_and_extract_osv_archive(
    client: &reqwest::Client,
    ecosystem: Option<&OsvGsEcosystem>,
    path: impl AsRef<Path>,
    chunk_size: u64,
) -> anyhow::Result<()> {
    let staging = tempdir_in(&path)?;
    let zip_file = chuncked_download_to(
        client,
        &osv_archive_url(ecosystem),
        chunk_size,
        staging.path().join("osv.zip"),
    )
    .await?;
    // Extraction is synchronous; it runs inline on the current task.
    zip::ZipArchive::new(zip_file)?.extract(&path)?;
    Ok(())
}
/// Fetches the modified-ids CSV for `ecosystem` (the combined feed when
/// `None`) and wraps the bytes in a header-less CSV reader.
///
/// # Errors
/// Fails on request/transport errors, and — unlike before — on HTTP error
/// statuses: previously a 4xx/5xx body was handed to the CSV parser, which
/// produced confusing parse errors downstream instead of a clear HTTP
/// failure.
async fn download_osv_modified_csv(
    client: &reqwest::Client,
    ecosystem: Option<&OsvGsEcosystem>,
) -> anyhow::Result<csv::Reader<Cursor<Bytes>>> {
    let csv_bytes = client
        .get(osv_modified_id_csv_url(ecosystem))
        .send()
        .await?
        // Reject HTTP 4xx/5xx up front rather than parsing the error body.
        .error_for_status()?
        .bytes()
        .await?;
    Ok(csv::ReaderBuilder::new()
        // The feed has no header row; every row is a data record.
        .has_headers(false)
        .from_reader(Cursor::new(csv_bytes)))
}
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, sync::atomic::Ordering};
    use futures::StreamExt;
    use tempfile::TempDir;
    use super::*;

    // NOTE(review): both tests hit the live OSV GCS endpoints, so they need
    // network access and real archive data; expect them to be slow and to
    // fail offline. Consider marking them `#[ignore]` for hermetic CI.

    /// End-to-end check of `download_latest`: full archive download,
    /// per-id lookup, and the records stream.
    #[tokio::test]
    async fn download_latest_test() {
        let tmp = TempDir::new().unwrap();
        // Presumably `all()` already covers every ecosystem and the `add`
        // calls are redundant — TODO confirm `OsvGsEcosystems` semantics.
        let osv = OsvDb::new(
            OsvGsEcosystems::all()
                .add(OsvGsEcosystem::CratesIo)
                .add(OsvGsEcosystem::Julia),
            tmp.path(),
        )
        .unwrap();
        // Known published ids, one from crates.io and one from Julia.
        let record_ids = [
            "RUSTSEC-2024-0401".to_string(),
            "JLSEC-2025-329".to_string(),
        ];
        // Before any download the store is empty.
        for record_id in &record_ids {
            assert!(osv.get_record(record_id).unwrap().is_none());
        }
        // 10 MiB chunk size for the archive download.
        osv.download_latest(10 * 1024 * 1024).await.unwrap();
        // Both records must now be readable by id...
        for record_id in &record_ids {
            let record = osv.get_record(record_id).unwrap().unwrap();
            assert_eq!(&record.id, record_id);
        }
        // ...and visible through the full stream.
        let ids: HashSet<OsvRecordId> = osv
            .records_stream()
            .unwrap()
            .map(|r| r.unwrap().id)
            .collect()
            .await;
        for record_id in &record_ids {
            assert!(ids.contains(record_id));
        }
    }

    /// Checks `sync`: only records modified after the stored watermark are
    /// fetched, and they land both in the sync stream and on disk.
    #[tokio::test]
    async fn sync_test() {
        // NOTE(review): fixed calendar baseline; whether RUSTSEC-2026-0032
        // counts as "modified after" it depends on live OSV data, so these
        // expectations can drift over time — confirm periodically.
        let last_modified: DateTime<Utc> = "2026-03-05T00:00:00Z".parse().unwrap();
        let tmp = TempDir::new().unwrap();
        let osv = OsvDb::new(
            OsvGsEcosystems::all().add(OsvGsEcosystem::CratesIo),
            tmp.path(),
        )
        .unwrap();
        let record_id = "RUSTSEC-2026-0032".to_string();
        assert!(osv.get_record(&record_id).unwrap().is_none());
        // Seed the watermark directly (test-only access to the inner field)
        // so `sync` starts from the chosen baseline instead of the epoch.
        osv.0.last_modified.store(
            last_modified.timestamp_nanos_opt().unwrap(),
            Ordering::Release,
        );
        let sync_records: Vec<OsvRecord> = osv
            .sync()
            .await
            .unwrap()
            .map(|r| r.unwrap())
            .collect()
            .await;
        assert!(
            osv.get_record(&record_id).unwrap().is_some(),
            "RUSTSEC-2026-0032 should exist after sync"
        );
        let stream_ids: HashSet<String> = osv
            .records_stream()
            .unwrap()
            .map(|r| r.unwrap().id)
            .collect()
            .await;
        // Every synced record must be in the store and newer than the
        // baseline watermark.
        for sync_record in &sync_records {
            assert!(
                stream_ids.contains(&sync_record.id),
                "sync record {} is missing from records_stream",
                sync_record.id
            );
            assert!(
                sync_record.modified >= last_modified,
                "sync record {} has modified {} which is before last_modified {}",
                sync_record.id,
                sync_record.modified,
                last_modified
            );
        }
    }
}