use rustc_hash::{FxHashMap, FxHashSet};
use std::str::FromStr as _;
use tracing::trace;
use crate::types;
use futures::{StreamExt as _, TryStreamExt as _};
use jiff::Timestamp;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use uv_configuration::Concurrency;
use uv_pep440::Version;
use uv_redacted::{DisplaySafeUrl, DisplaySafeUrlError};
/// Base URL of the public OSV API, used when no override is supplied to [`Osv::new`].
///
/// Endpoint paths such as `v1/querybatch` are joined relative to this URL, which is why it
/// ends in `/`.
pub const API_BASE: &str = "https://api.osv.dev/";
/// Errors returned by the OSV client.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An HTTP request failed to send, returned a non-success status, or returned a body
    /// that could not be deserialized.
    #[error(transparent)]
    ReqwestMiddleware(#[from] reqwest_middleware::Error),
    /// An endpoint URL could not be built by joining a path onto the configured base URL.
    ///
    /// Carries the base URL and the underlying join error.
    #[error("Invalid API URL: {0}")]
    Url(DisplaySafeUrl, #[source] DisplaySafeUrlError),
}
/// Package coordinates sent with each OSV query.
#[derive(Debug, Clone, Serialize)]
struct Package {
    /// The package name.
    name: String,
    /// OSV ecosystem identifier; this client always sends `PyPI`.
    ecosystem: String,
}
/// A single query inside a `/v1/querybatch` request: one package at one exact version.
#[derive(Debug, Clone, Serialize)]
struct QueryRequest {
    package: Package,
    /// The exact version to check.
    version: String,
    /// The `next_page_token` from a previous response, requesting the next page of results
    /// for this query. Left out of the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    page_token: Option<String>,
}
/// One event of an OSV affected-version range, deserialized from a single-key object such
/// as `{ "fixed": "46.0.5" }`.
///
/// NOTE(review): there is no catch-all variant, so an event key other than these four makes
/// the enclosing record fail to deserialize.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
enum Event {
    // Only the `Fixed` payload is read by this module; the other payloads are accepted so
    // deserialization succeeds, hence the `dead_code` allowances.
    Introduced(#[allow(dead_code)] String),
    Fixed(String),
    LastAffected(#[allow(dead_code)] String),
    Limit(#[allow(dead_code)] String),
}
/// The `type` of an OSV affected range, deserialized from an uppercase string.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
enum RangeType {
    Semver,
    /// Versions expressed in the package ecosystem's own scheme; the only range type whose
    /// `fixed` events are turned into fix versions.
    Ecosystem,
    Git,
    /// Any unrecognized range type (including the literal `OTHER`).
    #[serde(other)]
    Other,
}
/// An affected-version range of an OSV record: a range type plus its ordered events.
#[derive(Debug, Clone, Deserialize)]
struct Range {
    #[serde(rename = "type")]
    range_type: RangeType,
    events: Vec<Event>,
}
/// One entry of an OSV record's `affected` list; only its ranges are used here.
#[derive(Debug, Clone, Deserialize)]
struct Affected {
    /// May be absent from the JSON, in which case it deserializes as `None`.
    ranges: Option<Vec<Range>>,
}
/// The `type` of an OSV reference link, deserialized from an uppercase string.
///
/// `Advisory` and then `Web` references are preferred when choosing a finding's link.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
enum ReferenceType {
    Advisory,
    Article,
    Detection,
    Discussion,
    Report,
    Fix,
    Introduced,
    Package,
    Evidence,
    Web,
    /// Any unrecognized reference type.
    #[serde(other)]
    Other,
}
/// A typed link attached to an OSV record.
///
/// NOTE(review): `url` is parsed while deserializing, so presumably a malformed reference
/// URL makes the whole record fail to deserialize — confirm that is intended.
#[derive(Debug, Clone, Deserialize)]
struct Reference {
    #[serde(rename = "type")]
    reference_type: ReferenceType,
    url: DisplaySafeUrl,
}
/// A full OSV record, as returned by `/v1/vulns/{id}`.
///
/// Only the fields used by this module are declared; other JSON keys are ignored.
#[derive(Debug, Clone, Deserialize)]
struct Vulnerability {
    /// Record identifier, e.g. `GHSA-xxxx-yyyy` or `MAL-2026-1234`.
    id: String,
    /// Last-modified time. Required: a record without it fails to deserialize.
    modified: Timestamp,
    // Deserialized but never read, hence the `dead_code` allowance.
    #[allow(dead_code)]
    schema_version: Option<String>,
    /// One-line summary, if the record has one.
    summary: Option<String>,
    /// Longer description, if the record has one.
    details: Option<String>,
    published: Option<Timestamp>,
    affected: Option<Vec<Affected>>,
    /// Other identifiers for the same vulnerability.
    aliases: Option<Vec<String>>,
    references: Option<Vec<Reference>>,
}
/// Request body for `POST /v1/querybatch`.
#[derive(Debug, Clone, Serialize)]
struct QueryBatchRequest {
    queries: Vec<QueryRequest>,
}
/// A record reference inside a querybatch result; only the ID is used, and the full record
/// is fetched separately.
#[derive(Debug, Clone, Deserialize)]
struct VulnSummary {
    id: String,
}
/// The result for one query of a querybatch request.
#[derive(Debug, Clone, Deserialize)]
struct QueryBatchResult {
    /// Matching records; treated as empty when the key is absent.
    #[serde(default)]
    vulns: Vec<VulnSummary>,
    /// Set when more results exist for this query; sent back as `page_token` to get them.
    next_page_token: Option<String>,
}
/// Response body of `POST /v1/querybatch`.
#[derive(Debug, Clone, Deserialize)]
struct QueryBatchResponse {
    /// One result per query, paired positionally with the request's `queries`.
    results: Vec<QueryBatchResult>,
}
/// Selects which OSV records a query keeps, based on the record ID.
#[derive(Debug, Copy, Clone)]
pub enum Filter {
    /// Keep every record.
    All,
    /// Keep only malicious-package records, whose IDs start with `MAL-`.
    Malware,
}

impl Filter {
    /// The ID prefix a record must carry to pass this filter, or `None` when any ID passes.
    fn required_prefix(self) -> Option<&'static str> {
        match self {
            Self::All => None,
            Self::Malware => Some("MAL-"),
        }
    }

    /// Returns `true` when the record with OSV identifier `id` passes this filter.
    fn matches(self, id: &str) -> bool {
        self.required_prefix()
            .is_none_or(|prefix| id.starts_with(prefix))
    }
}
/// Client for the OSV vulnerability database API.
pub struct Osv {
    /// Base URL that endpoint paths such as `v1/querybatch` are joined onto.
    base_url: DisplaySafeUrl,
    /// HTTP client used for every request.
    client: ClientWithMiddleware,
    /// Concurrency limits; `downloads` bounds the number of record fetches in flight.
    concurrency: Concurrency,
}
impl Osv {
pub fn new(
client: ClientWithMiddleware,
base_url: Option<DisplaySafeUrl>,
concurrency: Concurrency,
) -> Self {
Self {
base_url: base_url.unwrap_or_else(|| {
DisplaySafeUrl::parse(API_BASE).expect("impossible: embedded URL is invalid")
}),
client,
concurrency,
}
}
pub async fn query_batch(
&self,
dependencies: &[types::Dependency],
filter: Filter,
) -> Result<Vec<types::Finding>, Error> {
if dependencies.is_empty() {
return Ok(vec![]);
}
let mut dep_vuln_ids: Vec<(&types::Dependency, String)> = Vec::new();
let mut pending: Vec<(&types::Dependency, Option<String>)> =
dependencies.iter().map(|dep| (dep, None)).collect();
loop {
let request = QueryBatchRequest {
queries: pending
.iter()
.map(|(dep, page_token)| QueryRequest {
package: Package {
name: dep.name().to_string(),
ecosystem: "PyPI".to_string(),
},
version: dep.version().to_string(),
page_token: page_token.clone(),
})
.collect(),
};
let url = self
.base_url
.join("v1/querybatch")
.map_err(|e| Error::Url(self.base_url.clone(), e))?;
let batch_response: QueryBatchResponse = self
.client
.post(url.as_ref())
.json(&request)
.send()
.await?
.error_for_status()
.map_err(reqwest_middleware::Error::Reqwest)?
.json()
.await
.map_err(reqwest_middleware::Error::Reqwest)?;
let mut next_pending = Vec::new();
for ((dep, _), result) in pending.iter().zip(batch_response.results.iter()) {
dep_vuln_ids.extend(
result
.vulns
.iter()
.filter(|v| filter.matches(&v.id))
.map(|v| (*dep, v.id.clone())),
);
if let Some(token) = &result.next_page_token {
next_pending.push((*dep, Some(token.clone())));
}
}
if next_pending.is_empty() {
break;
}
pending = next_pending;
}
let unique_ids: FxHashSet<_> = dep_vuln_ids.iter().map(|(_, id)| id.clone()).collect();
let vuln_details = futures::stream::iter(unique_ids)
.map(async |id| {
let vuln = self.fetch_vuln(&id).await?;
Ok::<(String, Vulnerability), Error>((id, vuln))
})
.buffer_unordered(self.concurrency.downloads)
.try_collect::<FxHashMap<String, Vulnerability>>()
.await?;
let findings = dep_vuln_ids
.iter()
.filter_map(|(dep, vuln_id)| {
vuln_details
.get(vuln_id)
.map(|vuln| Self::vulnerability_to_finding(dep, vuln.clone()))
})
.collect();
Ok(findings)
}
async fn fetch_vuln(&self, id: &str) -> Result<Vulnerability, Error> {
let url = self
.base_url
.join(&format!("v1/vulns/{id}"))
.map_err(|e| Error::Url(self.base_url.clone(), e))?;
let vuln: Vulnerability = self
.client
.get(url.as_ref())
.send()
.await?
.error_for_status()
.map_err(reqwest_middleware::Error::Reqwest)?
.json()
.await
.map_err(reqwest_middleware::Error::Reqwest)?;
Ok(vuln)
}
fn vulnerability_to_finding(
dependency: &types::Dependency,
vuln: Vulnerability,
) -> types::Finding {
let link = vuln
.references
.as_ref()
.and_then(|references| {
references
.iter()
.find(|reference| matches!(reference.reference_type, ReferenceType::Advisory))
.or_else(|| {
references.iter().find(|reference| {
matches!(reference.reference_type, ReferenceType::Web)
})
})
.map(|reference| reference.url.clone())
})
.unwrap_or_else(|| {
DisplaySafeUrl::parse(&format!("https://osv.dev/vulnerability/{}", vuln.id))
.expect("impossible: synthesized URL is invalid")
});
let fix_versions = vuln
.affected
.iter()
.flatten()
.flat_map(|affected| affected.ranges.iter().flatten())
.filter(|range| matches!(range.range_type, RangeType::Ecosystem))
.flat_map(|range| &range.events)
.filter_map(|event| match event {
Event::Fixed(fixed) => {
if let Ok(fixed) = Version::from_str(fixed) {
Some(fixed)
} else {
trace!(
"Skipping invalid (non-PEP 440) version in OSV record {id}: {fixed}",
id = vuln.id,
);
None
}
}
_ => None,
})
.collect();
let aliases = vuln
.aliases
.unwrap_or_default()
.into_iter()
.map(types::VulnerabilityID::new)
.collect();
types::Finding::Vulnerability(
types::Vulnerability::new(
dependency.clone(),
types::VulnerabilityID::new(vuln.id),
vuln.summary,
vuln.details,
Some(link),
fix_versions,
aliases,
vuln.published,
Some(vuln.modified),
)
.into(),
)
}
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use reqwest_middleware::ClientWithMiddleware;
    use serde_json::json;
    use uv_configuration::Concurrency;
    use uv_normalize::PackageName;
    use uv_pep440::Version;
    use uv_redacted::DisplaySafeUrl;
    use wiremock::matchers::{body_json, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use super::{Event, Filter, Osv, RangeType};
    use crate::types::{Dependency, Finding};

    /// Builds an OSV client pointed at the given mock server.
    fn osv_for(server: &MockServer) -> Osv {
        Osv::new(
            ClientWithMiddleware::default(),
            Some(DisplaySafeUrl::parse(&server.uri()).unwrap()),
            Concurrency::default(),
        )
    }

    /// Builds a dependency from a package name and a version string.
    fn dependency(name: &str, version: &str) -> Dependency {
        Dependency::new(
            PackageName::from_str(name).unwrap(),
            Version::from_str(version).unwrap(),
        )
    }

    /// Range events are single-key objects keyed by the snake_case event name.
    #[test]
    fn test_deserialize_events() {
        let json = r#"[{ "introduced": "0" }, { "fixed": "46.0.5" }]"#;
        let events: Vec<Event> = serde_json::from_str(json).expect("Failed to deserialize events");
        insta::assert_debug_snapshot!(events, @r#"
        [
            Introduced(
                "0",
            ),
            Fixed(
                "46.0.5",
            ),
        ]
        "#);
    }

    /// Unknown range types fall back to `Other` instead of failing.
    #[test]
    fn test_deserialize_rangetype() {
        let json = r#"[
            "SEMVER",
            "ECOSYSTEM",
            "GIT",
            "OTHER",
            "UNKNOWN_TYPE"
        ]"#;
        let types: Vec<RangeType> =
            serde_json::from_str(json).expect("Failed to deserialize range types");
        insta::assert_debug_snapshot!(types, @"
        [
            Semver,
            Ecosystem,
            Git,
            Other,
            Other,
        ]
        ");
    }

    /// One batch query, then one detail fetch per reported record.
    #[tokio::test]
    async fn test_query_batch_basic() {
        let server = MockServer::start().await;

        Mock::given(method("POST"))
            .and(path("/v1/querybatch"))
            .and(body_json(json!({
                "queries": [
                    {
                        "package": { "name": "package-a", "ecosystem": "PyPI" },
                        "version": "1.0.0",
                    },
                    {
                        "package": { "name": "package-b", "ecosystem": "PyPI" },
                        "version": "2.0.0",
                    }
                ]
            })))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "results": [
                    { "vulns": [{ "id": "VULN-1", "modified": "2026-01-01T00:00:00Z" }] },
                    { "vulns": [{ "id": "VULN-2", "modified": "2026-01-02T00:00:00Z" }] }
                ]
            })))
            .mount(&server)
            .await;

        Mock::given(method("GET"))
            .and(path("/v1/vulns/VULN-1"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "id": "VULN-1",
                "modified": "2026-01-01T00:00:00Z",
            })))
            .mount(&server)
            .await;

        Mock::given(method("GET"))
            .and(path("/v1/vulns/VULN-2"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "id": "VULN-2",
                "modified": "2026-01-02T00:00:00Z",
            })))
            .mount(&server)
            .await;

        let osv = osv_for(&server);
        let dependencies = vec![
            dependency("package-a", "1.0.0"),
            dependency("package-b", "2.0.0"),
        ];

        let findings = osv
            .query_batch(&dependencies, Filter::All)
            .await
            .expect("Failed to query batch");

        // Records without references fall back to their osv.dev page as the link.
        insta::assert_debug_snapshot!(findings, @r#"
        [
            Vulnerability(
                Vulnerability {
                    dependency: Dependency {
                        name: PackageName(
                            "package-a",
                        ),
                        version: "1.0.0",
                    },
                    id: VulnerabilityID(
                        "VULN-1",
                    ),
                    summary: None,
                    description: None,
                    link: Some(
                        DisplaySafeUrl {
                            scheme: "https",
                            cannot_be_a_base: false,
                            username: "",
                            password: None,
                            host: Some(
                                Domain(
                                    "osv.dev",
                                ),
                            ),
                            port: None,
                            path: "/vulnerability/VULN-1",
                            query: None,
                            fragment: None,
                        },
                    ),
                    fix_versions: [],
                    aliases: [],
                    published: None,
                    modified: Some(
                        2026-01-01T00:00:00Z,
                    ),
                },
            ),
            Vulnerability(
                Vulnerability {
                    dependency: Dependency {
                        name: PackageName(
                            "package-b",
                        ),
                        version: "2.0.0",
                    },
                    id: VulnerabilityID(
                        "VULN-2",
                    ),
                    summary: None,
                    description: None,
                    link: Some(
                        DisplaySafeUrl {
                            scheme: "https",
                            cannot_be_a_base: false,
                            username: "",
                            password: None,
                            host: Some(
                                Domain(
                                    "osv.dev",
                                ),
                            ),
                            port: None,
                            path: "/vulnerability/VULN-2",
                            query: None,
                            fragment: None,
                        },
                    ),
                    fix_versions: [],
                    aliases: [],
                    published: None,
                    modified: Some(
                        2026-01-02T00:00:00Z,
                    ),
                },
            ),
        ]
        "#);

        assert_eq!(
            server.received_requests().await.unwrap().len(),
            3,
            "Expected one querybatch request and two vuln detail requests"
        );
    }

    /// A result carrying `next_page_token` triggers a follow-up query for that dependency
    /// only, and records from every page are reported.
    #[tokio::test]
    async fn test_query_batch_pagination() {
        let server = MockServer::start().await;

        // First page: package-a has more results; package-b is complete.
        Mock::given(method("POST"))
            .and(path("/v1/querybatch"))
            .and(body_json(json!({
                "queries": [
                    {
                        "package": { "name": "package-a", "ecosystem": "PyPI" },
                        "version": "1.0.0",
                    },
                    {
                        "package": { "name": "package-b", "ecosystem": "PyPI" },
                        "version": "2.0.0",
                    }
                ]
            })))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "results": [
                    {
                        "vulns": [{ "id": "VULN-1", "modified": "2026-01-01T00:00:00Z" }],
                        "next_page_token": "tok1"
                    },
                    {
                        "vulns": [{ "id": "VULN-2", "modified": "2026-01-02T00:00:00Z" }]
                    }
                ]
            })))
            .mount(&server)
            .await;

        // Second page: only package-a is re-queried, with its page token.
        Mock::given(method("POST"))
            .and(path("/v1/querybatch"))
            .and(body_json(json!({
                "queries": [
                    {
                        "package": { "name": "package-a", "ecosystem": "PyPI" },
                        "version": "1.0.0",
                        "page_token": "tok1",
                    }
                ]
            })))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "results": [
                    { "vulns": [{ "id": "VULN-3", "modified": "2026-01-03T00:00:00Z" }] }
                ]
            })))
            .mount(&server)
            .await;

        Mock::given(method("GET"))
            .and(path("/v1/vulns/VULN-1"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "id": "VULN-1",
                "modified": "2026-01-01T00:00:00Z",
            })))
            .mount(&server)
            .await;

        Mock::given(method("GET"))
            .and(path("/v1/vulns/VULN-2"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "id": "VULN-2",
                "modified": "2026-01-02T00:00:00Z",
            })))
            .mount(&server)
            .await;

        Mock::given(method("GET"))
            .and(path("/v1/vulns/VULN-3"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "id": "VULN-3",
                "modified": "2026-01-03T00:00:00Z",
            })))
            .mount(&server)
            .await;

        let osv = osv_for(&server);
        let dependencies = vec![
            dependency("package-a", "1.0.0"),
            dependency("package-b", "2.0.0"),
        ];

        let findings = osv
            .query_batch(&dependencies, Filter::All)
            .await
            .expect("Failed to query batch");

        assert_eq!(findings.len(), 3);

        // Detail fetches complete in any order, so compare the sorted IDs.
        let mut ids: Vec<&str> = findings
            .iter()
            .map(|f| match f {
                Finding::Vulnerability(v) => v.id.as_str(),
                Finding::ProjectStatus(_) => unreachable!(),
            })
            .collect();
        ids.sort_unstable();
        assert_eq!(ids, ["VULN-1", "VULN-2", "VULN-3"]);

        assert_eq!(
            server.received_requests().await.unwrap().len(),
            5,
            "Expected two querybatch requests and three vuln detail requests"
        );
    }

    /// `Filter::Malware` drops non-`MAL-` IDs before their details are fetched.
    #[tokio::test]
    async fn test_query_batch_malware_filter() {
        let server = MockServer::start().await;

        Mock::given(method("POST"))
            .and(path("/v1/querybatch"))
            .and(body_json(json!({
                "queries": [
                    {
                        "package": { "name": "package-a", "ecosystem": "PyPI" },
                        "version": "1.0.0",
                    }
                ]
            })))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "results": [
                    {
                        "vulns": [
                            { "id": "MAL-2026-1234", "modified": "2026-01-01T00:00:00Z" },
                            { "id": "GHSA-xxxx-yyyy", "modified": "2026-01-02T00:00:00Z" }
                        ]
                    }
                ]
            })))
            .mount(&server)
            .await;

        // Deliberately no mock for the GHSA record: fetching it would be a bug.
        Mock::given(method("GET"))
            .and(path("/v1/vulns/MAL-2026-1234"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "id": "MAL-2026-1234",
                "modified": "2026-01-01T00:00:00Z",
            })))
            .mount(&server)
            .await;

        let osv = osv_for(&server);
        let dependencies = vec![dependency("package-a", "1.0.0")];

        let findings = osv
            .query_batch(&dependencies, Filter::Malware)
            .await
            .expect("Failed to query batch");

        let [Finding::Vulnerability(v)] = findings.as_slice() else {
            panic!("Expected exactly one vulnerability finding");
        };
        assert_eq!(v.id.as_str(), "MAL-2026-1234");

        assert_eq!(
            server.received_requests().await.unwrap().len(),
            2,
            "Expected one querybatch request and one vuln detail request (non-MAL skipped)"
        );
    }
}