vulnera_advisor/
store.rs

1//! Storage backends for advisory data.
2//!
3//! This module provides the [`AdvisoryStore`] trait and implementations for
4//! persisting and querying vulnerability advisories.
5
6use crate::config::StoreConfig;
7use crate::error::{AdvisoryError, Result};
8use crate::models::Advisory;
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures_util::Stream;
12use redis::AsyncCommands;
13use serde::{Deserialize, Serialize};
14use std::io::Write;
15use std::pin::Pin;
16use std::time::Instant;
17use tracing::{info, instrument};
18
19/// Trait for advisory storage backends.
20#[async_trait]
21pub trait AdvisoryStore: Send + Sync {
22    /// Insert or update a batch of advisories.
23    async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()>;
24
25    /// Get a single advisory by ID.
26    async fn get(&self, id: &str) -> Result<Option<Advisory>>;
27
28    /// Get all advisories affecting a specific package.
29    async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>>;
30
31    /// Get the timestamp of the last sync for a source.
32    async fn last_sync(&self, source: &str) -> Result<Option<String>>;
33
34    /// Check the health of the store connection.
35    async fn health_check(&self) -> Result<HealthStatus>;
36
37    /// Get advisories as a stream for memory-efficient processing.
38    async fn get_by_package_stream(
39        &self,
40        ecosystem: &str,
41        package: &str,
42    ) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>>;
43
44    /// Get multiple advisories by IDs in a batch.
45    async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>>;
46
47    /// Store enrichment data (EPSS/KEV) for a CVE.
48    async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()>;
49
50    /// Get enrichment data for a CVE.
51    async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>>;
52
53    /// Get enrichment data for multiple CVEs.
54    async fn get_enrichment_batch(
55        &self,
56        cve_ids: &[String],
57    ) -> Result<Vec<(String, EnrichmentData)>>;
58
59    /// Update the last sync timestamp for a source.
60    async fn update_sync_timestamp(&self, source: &str) -> Result<()>;
61
62    /// Get the count of stored advisories.
63    async fn advisory_count(&self) -> Result<u64>;
64
65    /// Store an OSS Index component report in cache.
66    ///
67    /// # Arguments
68    ///
69    /// * `purl` - The Package URL that was queried
70    /// * `cache` - The cached component report with metadata
71    async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()>;
72
73    /// Get a cached OSS Index component report.
74    ///
75    /// Returns `None` if not cached or if the cache has expired.
76    async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>>;
77
78    /// Invalidate (delete) a cached OSS Index component report.
79    async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()>;
80
81    /// Invalidate all OSS Index cache entries.
82    async fn invalidate_all_ossindex_cache(&self) -> Result<u64>;
83}
84
85/// Health status of the store.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct HealthStatus {
88    /// Whether the connection is working.
89    pub connected: bool,
90    /// Round-trip latency in milliseconds.
91    pub latency_ms: u64,
92    /// Number of advisory keys (approximate).
93    pub advisory_count: u64,
94    /// Redis server info (version, etc.).
95    pub server_info: Option<String>,
96}
97
98/// Enrichment data stored separately for CVEs.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EnrichmentData {
101    /// EPSS score (0.0 - 1.0).
102    pub epss_score: Option<f64>,
103    /// EPSS percentile (0.0 - 1.0).
104    pub epss_percentile: Option<f64>,
105    /// Whether in CISA KEV catalog.
106    pub is_kev: bool,
107    /// KEV due date (RFC3339).
108    pub kev_due_date: Option<String>,
109    /// KEV date added (RFC3339).
110    pub kev_date_added: Option<String>,
111    /// Whether used in ransomware campaigns.
112    pub kev_ransomware: Option<bool>,
113    /// Last updated timestamp.
114    pub updated_at: String,
115}
116
117/// Cached OSS Index component report.
118///
119/// Stores advisories from OSS Index along with
120/// cache metadata for TTL management.
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct OssIndexCache {
123    /// The converted advisories from OSS Index.
124    pub advisories: Vec<crate::models::Advisory>,
125    /// When this was cached.
126    pub cached_at: chrono::DateTime<chrono::Utc>,
127    /// TTL in seconds from cache time.
128    pub ttl_seconds: u64,
129}
130
/// Lifetime, in seconds, given to OSS Index cache entries built with
/// `OssIndexCache::new` (one hour).
const DEFAULT_OSSINDEX_CACHE_TTL: u64 = 60 * 60;
133
134impl OssIndexCache {
135    /// Create a new cache entry with default TTL.
136    pub fn new(advisories: Vec<crate::models::Advisory>) -> Self {
137        Self {
138            advisories,
139            cached_at: chrono::Utc::now(),
140            ttl_seconds: DEFAULT_OSSINDEX_CACHE_TTL,
141        }
142    }
143
144    /// Create a new cache entry with custom TTL.
145    pub fn with_ttl(advisories: Vec<crate::models::Advisory>, ttl_seconds: u64) -> Self {
146        Self {
147            advisories,
148            cached_at: chrono::Utc::now(),
149            ttl_seconds,
150        }
151    }
152
153    /// Check if this cache entry is still valid (not expired).
154    pub fn is_valid(&self) -> bool {
155        !self.is_expired()
156    }
157
158    /// Check if this cache entry has expired.
159    pub fn is_expired(&self) -> bool {
160        let age = chrono::Utc::now().signed_duration_since(self.cached_at);
161        age.num_seconds() >= self.ttl_seconds as i64
162    }
163
164    /// Get the remaining TTL in seconds.
165    pub fn remaining_ttl(&self) -> i64 {
166        let age = chrono::Utc::now().signed_duration_since(self.cached_at);
167        (self.ttl_seconds as i64) - age.num_seconds()
168    }
169}
170
171/// Redis/DragonflyDB storage implementation.
172pub struct DragonflyStore {
173    client: redis::Client,
174    config: StoreConfig,
175}
176
177impl DragonflyStore {
178    /// Create a new store with default configuration.
179    pub fn new(url: &str) -> Result<Self> {
180        Self::with_config(url, StoreConfig::default())
181    }
182
183    /// Create a new store with custom configuration.
184    pub fn with_config(url: &str, config: StoreConfig) -> Result<Self> {
185        let client = redis::Client::open(url)?;
186        Ok(Self { client, config })
187    }
188
189    /// Get the key prefix for this store.
190    pub fn key_prefix(&self) -> &str {
191        &self.config.key_prefix
192    }
193
194    /// Build a key with the configured prefix.
195    fn key(&self, suffix: &str) -> String {
196        format!("{}:{}", self.config.key_prefix, suffix)
197    }
198
199    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
200        let mut encoder =
201            zstd::stream::write::Encoder::new(Vec::new(), self.config.compression_level)?;
202        encoder.write_all(data)?;
203        encoder
204            .finish()
205            .map_err(|e| AdvisoryError::compression(e.to_string()))
206    }
207
208    fn decompress(data: &[u8]) -> Result<Vec<u8>> {
209        let mut decoder = zstd::stream::read::Decoder::new(data)?;
210        let mut decoded = Vec::new();
211        std::io::Read::read_to_end(&mut decoder, &mut decoded)?;
212        Ok(decoded)
213    }
214
215    async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
216        self.client
217            .get_multiplexed_async_connection()
218            .await
219            .map_err(AdvisoryError::from)
220    }
221}
222
223#[async_trait]
224impl AdvisoryStore for DragonflyStore {
225    #[instrument(skip(self, advisories), fields(count = advisories.len()))]
226    async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()> {
227        let mut conn = self.get_connection().await?;
228        let mut pipe = redis::pipe();
229
230        for advisory in advisories {
231            let json = serde_json::to_vec(advisory)?;
232            let compressed = self.compress(&json)?;
233
234            let data_key = self.key(&format!("data:{}", advisory.id));
235
236            // Store data with optional TTL
237            if let Some(ttl) = self.config.ttl_seconds {
238                pipe.cmd("SETEX").arg(&data_key).arg(ttl).arg(compressed);
239            } else {
240                pipe.set(&data_key, compressed);
241            }
242
243            // Update index
244            for affected in &advisory.affected {
245                let idx_key = self.key(&format!(
246                    "idx:{}:{}",
247                    affected.package.ecosystem, affected.package.name
248                ));
249                pipe.sadd(&idx_key, &advisory.id);
250            }
251        }
252
253        // Update meta
254        pipe.set(
255            self.key(&format!("meta:{}", source)),
256            chrono::Utc::now().to_rfc3339(),
257        );
258
259        pipe.query_async::<()>(&mut conn).await?;
260        info!("Upserted {} advisories from {}", advisories.len(), source);
261        Ok(())
262    }
263
264    async fn get(&self, id: &str) -> Result<Option<Advisory>> {
265        let mut conn = self.get_connection().await?;
266        let data: Option<Vec<u8>> = conn.get(self.key(&format!("data:{}", id))).await?;
267
268        match data {
269            Some(bytes) => {
270                let decompressed = Self::decompress(&bytes)?;
271                let advisory = serde_json::from_slice(&decompressed)?;
272                Ok(Some(advisory))
273            }
274            None => Ok(None),
275        }
276    }
277
278    async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
279        let mut conn = self.get_connection().await?;
280        let ids: Vec<String> = conn
281            .smembers(self.key(&format!("idx:{}:{}", ecosystem, package)))
282            .await?;
283
284        let mut advisories = Vec::new();
285        for id in ids {
286            if let Some(advisory) = self.get(&id).await? {
287                advisories.push(advisory);
288            }
289        }
290        Ok(advisories)
291    }
292
293    async fn last_sync(&self, source: &str) -> Result<Option<String>> {
294        let mut conn = self.get_connection().await?;
295        Ok(conn.get(self.key(&format!("meta:{}", source))).await?)
296    }
297
298    async fn health_check(&self) -> Result<HealthStatus> {
299        let start = Instant::now();
300
301        let mut conn = self.get_connection().await?;
302
303        // Ping to check connection
304        let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
305        let connected = pong == "PONG";
306
307        let latency_ms = start.elapsed().as_millis() as u64;
308
309        // Get approximate key count
310        let advisory_count = self.advisory_count().await.unwrap_or(0);
311
312        // Get server info
313        let info: std::result::Result<String, _> = redis::cmd("INFO")
314            .arg("server")
315            .query_async(&mut conn)
316            .await;
317        let server_info = info.ok().and_then(|s| {
318            s.lines()
319                .find(|l| l.starts_with("redis_version:"))
320                .map(|l| l.to_string())
321        });
322
323        Ok(HealthStatus {
324            connected,
325            latency_ms,
326            advisory_count,
327            server_info,
328        })
329    }
330
331    async fn get_by_package_stream(
332        &self,
333        ecosystem: &str,
334        package: &str,
335    ) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>> {
336        let idx_key = self.key(&format!("idx:{}:{}", ecosystem, package));
337
338        let stream = try_stream! {
339            let mut conn = self.get_connection().await?;
340
341            // Use SSCAN for memory-efficient iteration
342            let mut cursor = 0u64;
343            loop {
344                let (new_cursor, ids): (u64, Vec<String>) = redis::cmd("SSCAN")
345                    .arg(&idx_key)
346                    .arg(cursor)
347                    .arg("COUNT")
348                    .arg(100)
349                    .query_async(&mut conn)
350                    .await?;
351
352                for id in ids {
353                    if let Some(advisory) = self.get(&id).await? {
354                        yield advisory;
355                    }
356                }
357
358                cursor = new_cursor;
359                if cursor == 0 {
360                    break;
361                }
362            }
363        };
364
365        Ok(Box::pin(stream))
366    }
367
368    async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>> {
369        if ids.is_empty() {
370            return Ok(Vec::new());
371        }
372
373        let mut conn = self.get_connection().await?;
374        let keys: Vec<String> = ids
375            .iter()
376            .map(|id| self.key(&format!("data:{}", id)))
377            .collect();
378
379        let data: Vec<Option<Vec<u8>>> =
380            redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
381
382        let mut advisories = Vec::new();
383        for bytes in data.into_iter().flatten() {
384            let decompressed = Self::decompress(&bytes)?;
385            let advisory: Advisory = serde_json::from_slice(&decompressed)?;
386            advisories.push(advisory);
387        }
388
389        Ok(advisories)
390    }
391
392    async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()> {
393        let mut conn = self.get_connection().await?;
394        let key = self.key(&format!("enrich:{}", cve_id));
395        let json = serde_json::to_string(data)?;
396
397        if let Some(ttl) = self.config.ttl_seconds {
398            redis::cmd("SETEX")
399                .arg(&key)
400                .arg(ttl)
401                .arg(json)
402                .query_async::<()>(&mut conn)
403                .await?;
404        } else {
405            let _: () = conn.set(&key, json).await?;
406        }
407
408        Ok(())
409    }
410
411    async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>> {
412        let mut conn = self.get_connection().await?;
413        let key = self.key(&format!("enrich:{}", cve_id));
414        let data: Option<String> = conn.get(&key).await?;
415
416        match data {
417            Some(json) => Ok(Some(serde_json::from_str(&json)?)),
418            None => Ok(None),
419        }
420    }
421
422    async fn get_enrichment_batch(
423        &self,
424        cve_ids: &[String],
425    ) -> Result<Vec<(String, EnrichmentData)>> {
426        if cve_ids.is_empty() {
427            return Ok(Vec::new());
428        }
429
430        let mut conn = self.get_connection().await?;
431        let keys: Vec<String> = cve_ids
432            .iter()
433            .map(|id| self.key(&format!("enrich:{}", id)))
434            .collect();
435
436        let data: Vec<Option<String>> =
437            redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
438
439        let mut results = Vec::new();
440        for (cve_id, json_opt) in cve_ids.iter().zip(data) {
441            if let Some(json) = json_opt {
442                if let Ok(enrichment) = serde_json::from_str(&json) {
443                    results.push((cve_id.clone(), enrichment));
444                }
445            }
446        }
447
448        Ok(results)
449    }
450
451    async fn update_sync_timestamp(&self, source: &str) -> Result<()> {
452        let mut conn = self.get_connection().await?;
453        let _: () = conn
454            .set(
455                self.key(&format!("meta:{}", source)),
456                chrono::Utc::now().to_rfc3339(),
457            )
458            .await?;
459        Ok(())
460    }
461
462    async fn advisory_count(&self) -> Result<u64> {
463        let mut conn = self.get_connection().await?;
464        let pattern = self.key("data:*");
465
466        // Use SCAN to count keys matching pattern
467        let mut count = 0u64;
468        let mut cursor = 0u64;
469
470        loop {
471            let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
472                .arg(cursor)
473                .arg("MATCH")
474                .arg(&pattern)
475                .arg("COUNT")
476                .arg(1000)
477                .query_async(&mut conn)
478                .await?;
479
480            count += keys.len() as u64;
481            cursor = new_cursor;
482
483            if cursor == 0 {
484                break;
485            }
486        }
487
488        Ok(count)
489    }
490
491    async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()> {
492        let mut conn = self.get_connection().await?;
493        let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
494        let json = serde_json::to_string(cache)?;
495
496        // Use the remaining TTL or the configured TTL
497        let ttl = cache.remaining_ttl().max(1) as u64;
498        redis::cmd("SETEX")
499            .arg(&key)
500            .arg(ttl)
501            .arg(json)
502            .query_async::<()>(&mut conn)
503            .await?;
504
505        Ok(())
506    }
507
508    async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>> {
509        let mut conn = self.get_connection().await?;
510        let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
511        let data: Option<String> = conn.get(&key).await?;
512
513        match data {
514            Some(json) => {
515                let cache: OssIndexCache = serde_json::from_str(&json)?;
516                // Double-check validity (Redis TTL should handle this, but be safe)
517                if cache.is_valid() {
518                    Ok(Some(cache))
519                } else {
520                    // Cache expired, delete it
521                    let _: () = conn.del(&key).await?;
522                    Ok(None)
523                }
524            }
525            None => Ok(None),
526        }
527    }
528
529    async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()> {
530        let mut conn = self.get_connection().await?;
531        let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
532        let _: () = conn.del(&key).await?;
533        Ok(())
534    }
535
536    async fn invalidate_all_ossindex_cache(&self) -> Result<u64> {
537        let mut conn = self.get_connection().await?;
538        let pattern = self.key("ossidx:*");
539
540        // Use SCAN to find all OSS Index cache keys
541        let mut deleted = 0u64;
542        let mut cursor = 0u64;
543
544        loop {
545            let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
546                .arg(cursor)
547                .arg("MATCH")
548                .arg(&pattern)
549                .arg("COUNT")
550                .arg(1000)
551                .query_async(&mut conn)
552                .await?;
553
554            if !keys.is_empty() {
555                let count: u64 = redis::cmd("DEL").arg(&keys).query_async(&mut conn).await?;
556                deleted += count;
557            }
558
559            cursor = new_cursor;
560            if cursor == 0 {
561                break;
562            }
563        }
564
565        Ok(deleted)
566    }
567}
568
569impl DragonflyStore {
570    /// Generate a hash key for a PURL string.
571    fn hash_purl(purl: &str) -> String {
572        use std::collections::hash_map::DefaultHasher;
573        use std::hash::{Hash, Hasher};
574
575        let mut hasher = DefaultHasher::new();
576        purl.hash(&mut hasher);
577        format!("{:x}", hasher.finish())
578    }
579}