use crate::config::StoreConfig;
use crate::ecosystem::normalize_package_key;
use crate::error::{AdvisoryError, Result};
use crate::models::Advisory;
use async_stream::try_stream;
use async_trait::async_trait;
use futures_util::Stream;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::pin::Pin;
use std::time::Instant;
use tracing::{info, instrument};
/// Storage backend for advisories, their enrichment data, per-source sync markers and the
/// OSS Index response cache.
///
/// Implementors must be `Send + Sync` so one store instance can be shared across tasks.
#[async_trait]
pub trait AdvisoryStore: Send + Sync {
/// Inserts or replaces `advisories` and indexes each one under every affected package.
/// `source` identifies where the batch came from (`DragonflyStore` only logs it).
async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()>;
/// Fetches one advisory by id; `Ok(None)` when it is not stored.
async fn get(&self, id: &str) -> Result<Option<Advisory>>;
/// Returns all advisories indexed under the (ecosystem, package) pair.
/// `DragonflyStore` normalizes the pair with `normalize_package_key` before the lookup.
async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>>;
/// Returns the last-sync marker recorded for `source`, if any
/// (`DragonflyStore` stores an RFC 3339 timestamp).
async fn last_sync(&self, source: &str) -> Result<Option<String>>;
/// Probes the backend and reports connectivity, latency, size and version information.
async fn health_check(&self) -> Result<HealthStatus>;
/// Streaming form of [`AdvisoryStore::get_by_package`]; the returned stream borrows the store.
async fn get_by_package_stream(
&self,
ecosystem: &str,
package: &str,
) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>>;
/// Fetches the advisories for `ids`. In `DragonflyStore`, ids with no stored advisory are
/// omitted, so the result may be shorter than `ids`.
async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>>;
/// Stores enrichment data for a CVE id, replacing any previous value.
async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()>;
/// Fetches enrichment data for a CVE id, if stored.
async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>>;
/// Batch form of [`AdvisoryStore::get_enrichment`]; returns `(cve_id, data)` pairs only for
/// ids that have data.
async fn get_enrichment_batch(
&self,
cve_ids: &[String],
) -> Result<Vec<(String, EnrichmentData)>>;
/// Records the current time as the last-sync marker for `source`.
async fn update_sync_timestamp(&self, source: &str) -> Result<()>;
/// Clears the last-sync marker for `source` (presumably so the next sync starts from
/// scratch — confirm against the sync driver).
async fn reset_sync_timestamp(&self, source: &str) -> Result<()>;
/// Counts stored advisories.
async fn advisory_count(&self) -> Result<u64>;
/// Caches an OSS Index result for the package URL `purl`.
async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()>;
/// Returns the cached OSS Index result for `purl` if present and not expired.
async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>>;
/// Removes the cached OSS Index result for `purl`, if any.
async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()>;
/// Removes every cached OSS Index result and returns how many entries were deleted.
async fn invalidate_all_ossindex_cache(&self) -> Result<u64>;
}
/// Result of [`AdvisoryStore::health_check`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
/// Whether the backend answered the probe (`DragonflyStore`: `PING` replied `PONG`).
pub connected: bool,
/// Probe time in milliseconds (`DragonflyStore` includes opening the connection).
pub latency_ms: u64,
/// Number of stored advisories (`DragonflyStore` reports 0 if counting fails).
pub advisory_count: u64,
/// Backend version information when available (`DragonflyStore`: the `redis_version:` line
/// of `INFO server`).
pub server_info: Option<String>,
}
/// Exploitability enrichment attached to a CVE id; serialized as JSON by the store.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrichmentData {
/// EPSS score, if known (presumably a probability in 0.0..=1.0 — TODO confirm with producer).
pub epss_score: Option<f64>,
/// EPSS percentile, if known (presumably in 0.0..=1.0 — TODO confirm with producer).
pub epss_percentile: Option<f64>,
/// Whether the CVE is listed in the known-exploited-vulnerabilities (KEV) catalog.
pub is_kev: bool,
/// KEV remediation due date, if listed; the string format is not enforced here.
pub kev_due_date: Option<String>,
/// Date the CVE was added to KEV, if listed; the string format is not enforced here.
pub kev_date_added: Option<String>,
/// KEV's ransomware-use flag for the CVE, if known.
pub kev_ransomware: Option<bool>,
/// When this record was produced; stored as a string, format not enforced here.
pub updated_at: String,
}
/// Cached OSS Index lookup result for one package URL, carrying its own expiry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OssIndexCache {
/// Advisories cached for the package URL.
pub advisories: Vec<crate::models::Advisory>,
/// When the entry was created (the constructors set this to `Utc::now()`).
pub cached_at: chrono::DateTime<chrono::Utc>,
/// Lifetime of the entry in seconds, measured from `cached_at`.
pub ttl_seconds: u64,
}
/// TTL in seconds applied by [`OssIndexCache::new`] (one hour).
const DEFAULT_OSSINDEX_CACHE_TTL: u64 = 3600;

impl OssIndexCache {
    /// Creates an entry stamped with the current time and the default one-hour TTL.
    pub fn new(advisories: Vec<crate::models::Advisory>) -> Self {
        Self::with_ttl(advisories, DEFAULT_OSSINDEX_CACHE_TTL)
    }

    /// Creates an entry stamped with the current time that expires after `ttl_seconds`.
    pub fn with_ttl(advisories: Vec<crate::models::Advisory>, ttl_seconds: u64) -> Self {
        Self {
            advisories,
            cached_at: chrono::Utc::now(),
            ttl_seconds,
        }
    }

    /// Returns `true` while the entry has not expired.
    pub fn is_valid(&self) -> bool {
        !self.is_expired()
    }

    /// Returns `true` once at least `ttl_seconds` whole seconds have passed since `cached_at`.
    pub fn is_expired(&self) -> bool {
        // `remaining_ttl <= 0` is exactly `age >= ttl`, computed overflow-free.
        self.remaining_ttl() <= 0
    }

    /// Whole seconds left before expiry; zero or negative once expired.
    ///
    /// TTLs above `i64::MAX` are clamped and the subtraction saturates. A huge TTL therefore
    /// means "effectively never expires". A plain `as i64` cast would instead wrap `u64::MAX`
    /// to -1 and expire the entry immediately.
    pub fn remaining_ttl(&self) -> i64 {
        let age = chrono::Utc::now()
            .signed_duration_since(self.cached_at)
            .num_seconds();
        let ttl = self.ttl_seconds.min(i64::MAX as u64) as i64;
        ttl.saturating_sub(age)
    }
}
/// [`AdvisoryStore`] backed by a Redis-protocol server (e.g. DragonflyDB) via the `redis` crate.
///
/// Every key it touches is namespaced as `<key_prefix>:<kind>:...`.
pub struct DragonflyStore {
/// Client from which a fresh multiplexed connection is opened for each operation.
client: redis::Client,
/// Key prefix, zstd compression level and optional TTL for advisory/enrichment data.
config: StoreConfig,
}
impl DragonflyStore {
    /// Creates a store for `url` using [`StoreConfig::default`].
    ///
    /// # Errors
    /// Returns an error if the redis client rejects `url`.
    pub fn new(url: &str) -> Result<Self> {
        let config = StoreConfig::default();
        Self::with_config(url, config)
    }

    /// Creates a store for `url` with an explicit configuration.
    ///
    /// Only the client is built here; connections are opened per operation.
    ///
    /// # Errors
    /// Returns an error if the redis client rejects `url`.
    pub fn with_config(url: &str, config: StoreConfig) -> Result<Self> {
        redis::Client::open(url)
            .map(|client| Self { client, config })
            .map_err(AdvisoryError::from)
    }

    /// Namespace placed (with a `:` separator) in front of every key this store uses.
    pub fn key_prefix(&self) -> &str {
        &self.config.key_prefix
    }

    /// Builds the full key `<prefix>:<suffix>`.
    fn key(&self, suffix: &str) -> String {
        let prefix = &self.config.key_prefix;
        format!("{}:{}", prefix, suffix)
    }

    /// zstd-compresses `data` at the configured compression level.
    ///
    /// Encoder setup and write failures surface through the I/O error conversion; a failure
    /// while finishing the frame is reported as a compression error.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        let level = self.config.compression_level;
        let mut writer = zstd::stream::write::Encoder::new(Vec::new(), level)?;
        writer.write_all(data)?;
        writer
            .finish()
            .map_err(|err| AdvisoryError::compression(err.to_string()))
    }

    /// Inflates a zstd stream produced by [`DragonflyStore::compress`].
    fn decompress(data: &[u8]) -> Result<Vec<u8>> {
        use std::io::Read;
        let mut out = Vec::new();
        zstd::stream::read::Decoder::new(data)?.read_to_end(&mut out)?;
        Ok(out)
    }

    /// Opens a new multiplexed async connection.
    ///
    /// NOTE(review): called once per store operation, so every call pays connection setup;
    /// consider sharing one connection (or a connection manager) if this shows up in profiles.
    async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
        let conn = self.client.get_multiplexed_async_connection().await?;
        Ok(conn)
    }
}
#[async_trait]
impl AdvisoryStore for DragonflyStore {
#[instrument(skip(self, advisories), fields(count = advisories.len()))]
async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let mut pipe = redis::pipe();
for advisory in advisories {
let json = serde_json::to_vec(advisory)?;
let compressed = self.compress(&json)?;
let data_key = self.key(&format!("data:{}", advisory.id));
if let Some(ttl) = self.config.ttl_seconds {
pipe.cmd("SETEX").arg(&data_key).arg(ttl).arg(compressed);
} else {
pipe.set(&data_key, compressed);
}
for affected in &advisory.affected {
let (ecosystem, package) =
normalize_package_key(&affected.package.ecosystem, &affected.package.name);
let idx_key = self.key(&format!("idx:{}:{}", ecosystem, package));
pipe.sadd(&idx_key, &advisory.id);
}
}
pipe.query_async::<()>(&mut conn).await?;
info!("Upserted {} advisories from {}", advisories.len(), source);
Ok(())
}
async fn get(&self, id: &str) -> Result<Option<Advisory>> {
let mut conn = self.get_connection().await?;
let data: Option<Vec<u8>> = conn.get(self.key(&format!("data:{}", id))).await?;
match data {
Some(bytes) => {
let decompressed = Self::decompress(&bytes)?;
let advisory = serde_json::from_slice(&decompressed)?;
Ok(Some(advisory))
}
None => Ok(None),
}
}
async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
let (ecosystem, package) = normalize_package_key(ecosystem, package);
let mut conn = self.get_connection().await?;
let ids: Vec<String> = conn
.smembers(self.key(&format!("idx:{}:{}", ecosystem, package)))
.await?;
self.get_batch(&ids).await
}
async fn last_sync(&self, source: &str) -> Result<Option<String>> {
let mut conn = self.get_connection().await?;
Ok(conn.get(self.key(&format!("meta:{}", source))).await?)
}
async fn health_check(&self) -> Result<HealthStatus> {
let start = Instant::now();
let mut conn = self.get_connection().await?;
let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
let connected = pong == "PONG";
let latency_ms = start.elapsed().as_millis() as u64;
let advisory_count = self.advisory_count().await.unwrap_or(0);
let info: std::result::Result<String, _> = redis::cmd("INFO")
.arg("server")
.query_async(&mut conn)
.await;
let server_info = info.ok().and_then(|s| {
s.lines()
.find(|l| l.starts_with("redis_version:"))
.map(|l| l.to_string())
});
Ok(HealthStatus {
connected,
latency_ms,
advisory_count,
server_info,
})
}
async fn get_by_package_stream(
&self,
ecosystem: &str,
package: &str,
) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>> {
let (ecosystem, package) = normalize_package_key(ecosystem, package);
let idx_key = self.key(&format!("idx:{}:{}", ecosystem, package));
let stream = try_stream! {
let mut conn = self.get_connection().await?;
let mut cursor = 0u64;
loop {
let (new_cursor, ids): (u64, Vec<String>) = redis::cmd("SSCAN")
.arg(&idx_key)
.arg(cursor)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for id in ids {
if let Some(advisory) = self.get(&id).await? {
yield advisory;
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
};
Ok(Box::pin(stream))
}
async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>> {
if ids.is_empty() {
return Ok(Vec::new());
}
let mut conn = self.get_connection().await?;
let keys: Vec<String> = ids
.iter()
.map(|id| self.key(&format!("data:{}", id)))
.collect();
let data: Vec<Option<Vec<u8>>> =
redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
let mut advisories = Vec::new();
for bytes in data.into_iter().flatten() {
let decompressed = Self::decompress(&bytes)?;
let advisory: Advisory = serde_json::from_slice(&decompressed)?;
advisories.push(advisory);
}
Ok(advisories)
}
async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()> {
let mut conn = self.get_connection().await?;
let key = self.key(&format!("enrich:{}", cve_id));
let json = serde_json::to_string(data)?;
if let Some(ttl) = self.config.ttl_seconds {
redis::cmd("SETEX")
.arg(&key)
.arg(ttl)
.arg(json)
.query_async::<()>(&mut conn)
.await?;
} else {
let _: () = conn.set(&key, json).await?;
}
Ok(())
}
async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>> {
let mut conn = self.get_connection().await?;
let key = self.key(&format!("enrich:{}", cve_id));
let data: Option<String> = conn.get(&key).await?;
match data {
Some(json) => Ok(Some(serde_json::from_str(&json)?)),
None => Ok(None),
}
}
async fn get_enrichment_batch(
&self,
cve_ids: &[String],
) -> Result<Vec<(String, EnrichmentData)>> {
if cve_ids.is_empty() {
return Ok(Vec::new());
}
let mut conn = self.get_connection().await?;
let keys: Vec<String> = cve_ids
.iter()
.map(|id| self.key(&format!("enrich:{}", id)))
.collect();
let data: Vec<Option<String>> =
redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
let mut results = Vec::new();
for (cve_id, json_opt) in cve_ids.iter().zip(data) {
if let Some(json) = json_opt {
if let Ok(enrichment) = serde_json::from_str(&json) {
results.push((cve_id.clone(), enrichment));
}
}
}
Ok(results)
}
async fn update_sync_timestamp(&self, source: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.set(
self.key(&format!("meta:{}", source)),
chrono::Utc::now().to_rfc3339(),
)
.await?;
Ok(())
}
async fn reset_sync_timestamp(&self, source: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn.del(self.key(&format!("meta:{}", source))).await?;
info!("Reset sync timestamp for {}", source);
Ok(())
}
async fn advisory_count(&self) -> Result<u64> {
let mut conn = self.get_connection().await?;
let pattern = self.key("data:*");
let mut count = 0u64;
let mut cursor = 0u64;
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(1000)
.query_async(&mut conn)
.await?;
count += keys.len() as u64;
cursor = new_cursor;
if cursor == 0 {
break;
}
}
Ok(count)
}
async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()> {
let mut conn = self.get_connection().await?;
let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
let json = serde_json::to_string(cache)?;
let ttl = cache.remaining_ttl().max(1) as u64;
redis::cmd("SETEX")
.arg(&key)
.arg(ttl)
.arg(json)
.query_async::<()>(&mut conn)
.await?;
Ok(())
}
async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>> {
let mut conn = self.get_connection().await?;
let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
let data: Option<String> = conn.get(&key).await?;
match data {
Some(json) => {
let cache: OssIndexCache = serde_json::from_str(&json)?;
if cache.is_valid() {
Ok(Some(cache))
} else {
let _: () = conn.del(&key).await?;
Ok(None)
}
}
None => Ok(None),
}
}
async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
let _: () = conn.del(&key).await?;
Ok(())
}
async fn invalidate_all_ossindex_cache(&self) -> Result<u64> {
let mut conn = self.get_connection().await?;
let pattern = self.key("ossidx:*");
let mut deleted = 0u64;
let mut cursor = 0u64;
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(1000)
.query_async(&mut conn)
.await?;
if !keys.is_empty() {
let count: u64 = redis::cmd("DEL").arg(&keys).query_async(&mut conn).await?;
deleted += count;
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
Ok(deleted)
}
}
impl DragonflyStore {
fn hash_purl(purl: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
purl.hash(&mut hasher);
format!("{:x}", hasher.finish())
}
}