1use crate::config::StoreConfig;
7use crate::error::{AdvisoryError, Result};
8use crate::models::Advisory;
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures_util::Stream;
12use redis::AsyncCommands;
13use serde::{Deserialize, Serialize};
14use std::io::Write;
15use std::pin::Pin;
16use std::time::Instant;
17use tracing::{info, instrument};
18
19#[async_trait]
21pub trait AdvisoryStore: Send + Sync {
22 async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()>;
24
25 async fn get(&self, id: &str) -> Result<Option<Advisory>>;
27
28 async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>>;
30
31 async fn last_sync(&self, source: &str) -> Result<Option<String>>;
33
34 async fn health_check(&self) -> Result<HealthStatus>;
36
37 async fn get_by_package_stream(
39 &self,
40 ecosystem: &str,
41 package: &str,
42 ) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>>;
43
44 async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>>;
46
47 async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()>;
49
50 async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>>;
52
53 async fn get_enrichment_batch(
55 &self,
56 cve_ids: &[String],
57 ) -> Result<Vec<(String, EnrichmentData)>>;
58
59 async fn update_sync_timestamp(&self, source: &str) -> Result<()>;
61
62 async fn reset_sync_timestamp(&self, source: &str) -> Result<()>;
64
65 async fn advisory_count(&self) -> Result<u64>;
67
68 async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()>;
75
76 async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>>;
80
81 async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()>;
83
84 async fn invalidate_all_ossindex_cache(&self) -> Result<u64>;
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct HealthStatus {
91 pub connected: bool,
93 pub latency_ms: u64,
95 pub advisory_count: u64,
97 pub server_info: Option<String>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct EnrichmentData {
104 pub epss_score: Option<f64>,
106 pub epss_percentile: Option<f64>,
108 pub is_kev: bool,
110 pub kev_due_date: Option<String>,
112 pub kev_date_added: Option<String>,
114 pub kev_ransomware: Option<bool>,
116 pub updated_at: String,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct OssIndexCache {
126 pub advisories: Vec<crate::models::Advisory>,
128 pub cached_at: chrono::DateTime<chrono::Utc>,
130 pub ttl_seconds: u64,
132}
133
134const DEFAULT_OSSINDEX_CACHE_TTL: u64 = 3600;
136
137impl OssIndexCache {
138 pub fn new(advisories: Vec<crate::models::Advisory>) -> Self {
140 Self {
141 advisories,
142 cached_at: chrono::Utc::now(),
143 ttl_seconds: DEFAULT_OSSINDEX_CACHE_TTL,
144 }
145 }
146
147 pub fn with_ttl(advisories: Vec<crate::models::Advisory>, ttl_seconds: u64) -> Self {
149 Self {
150 advisories,
151 cached_at: chrono::Utc::now(),
152 ttl_seconds,
153 }
154 }
155
156 pub fn is_valid(&self) -> bool {
158 !self.is_expired()
159 }
160
161 pub fn is_expired(&self) -> bool {
163 let age = chrono::Utc::now().signed_duration_since(self.cached_at);
164 age.num_seconds() >= self.ttl_seconds as i64
165 }
166
167 pub fn remaining_ttl(&self) -> i64 {
169 let age = chrono::Utc::now().signed_duration_since(self.cached_at);
170 (self.ttl_seconds as i64) - age.num_seconds()
171 }
172}
173
174pub struct DragonflyStore {
176 client: redis::Client,
177 config: StoreConfig,
178}
179
180impl DragonflyStore {
181 pub fn new(url: &str) -> Result<Self> {
183 Self::with_config(url, StoreConfig::default())
184 }
185
186 pub fn with_config(url: &str, config: StoreConfig) -> Result<Self> {
188 let client = redis::Client::open(url)?;
189 Ok(Self { client, config })
190 }
191
192 pub fn key_prefix(&self) -> &str {
194 &self.config.key_prefix
195 }
196
197 fn key(&self, suffix: &str) -> String {
199 format!("{}:{}", self.config.key_prefix, suffix)
200 }
201
202 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
203 let mut encoder =
204 zstd::stream::write::Encoder::new(Vec::new(), self.config.compression_level)?;
205 encoder.write_all(data)?;
206 encoder
207 .finish()
208 .map_err(|e| AdvisoryError::compression(e.to_string()))
209 }
210
211 fn decompress(data: &[u8]) -> Result<Vec<u8>> {
212 let mut decoder = zstd::stream::read::Decoder::new(data)?;
213 let mut decoded = Vec::new();
214 std::io::Read::read_to_end(&mut decoder, &mut decoded)?;
215 Ok(decoded)
216 }
217
218 async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
219 self.client
220 .get_multiplexed_async_connection()
221 .await
222 .map_err(AdvisoryError::from)
223 }
224}
225
226#[async_trait]
227impl AdvisoryStore for DragonflyStore {
228 #[instrument(skip(self, advisories), fields(count = advisories.len()))]
229 async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()> {
230 let mut conn = self.get_connection().await?;
231 let mut pipe = redis::pipe();
232
233 for advisory in advisories {
234 let json = serde_json::to_vec(advisory)?;
235 let compressed = self.compress(&json)?;
236
237 let data_key = self.key(&format!("data:{}", advisory.id));
238
239 if let Some(ttl) = self.config.ttl_seconds {
241 pipe.cmd("SETEX").arg(&data_key).arg(ttl).arg(compressed);
242 } else {
243 pipe.set(&data_key, compressed);
244 }
245
246 for affected in &advisory.affected {
248 let idx_key = self.key(&format!(
249 "idx:{}:{}",
250 affected.package.ecosystem, affected.package.name
251 ));
252 pipe.sadd(&idx_key, &advisory.id);
253 }
254 }
255
256 pipe.query_async::<()>(&mut conn).await?;
260 info!("Upserted {} advisories from {}", advisories.len(), source);
261 Ok(())
262 }
263
264 async fn get(&self, id: &str) -> Result<Option<Advisory>> {
265 let mut conn = self.get_connection().await?;
266 let data: Option<Vec<u8>> = conn.get(self.key(&format!("data:{}", id))).await?;
267
268 match data {
269 Some(bytes) => {
270 let decompressed = Self::decompress(&bytes)?;
271 let advisory = serde_json::from_slice(&decompressed)?;
272 Ok(Some(advisory))
273 }
274 None => Ok(None),
275 }
276 }
277
278 async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
279 let mut conn = self.get_connection().await?;
280 let ids: Vec<String> = conn
281 .smembers(self.key(&format!("idx:{}:{}", ecosystem, package)))
282 .await?;
283
284 self.get_batch(&ids).await
286 }
287
288 async fn last_sync(&self, source: &str) -> Result<Option<String>> {
289 let mut conn = self.get_connection().await?;
290 Ok(conn.get(self.key(&format!("meta:{}", source))).await?)
291 }
292
293 async fn health_check(&self) -> Result<HealthStatus> {
294 let start = Instant::now();
295
296 let mut conn = self.get_connection().await?;
297
298 let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
300 let connected = pong == "PONG";
301
302 let latency_ms = start.elapsed().as_millis() as u64;
303
304 let advisory_count = self.advisory_count().await.unwrap_or(0);
306
307 let info: std::result::Result<String, _> = redis::cmd("INFO")
309 .arg("server")
310 .query_async(&mut conn)
311 .await;
312 let server_info = info.ok().and_then(|s| {
313 s.lines()
314 .find(|l| l.starts_with("redis_version:"))
315 .map(|l| l.to_string())
316 });
317
318 Ok(HealthStatus {
319 connected,
320 latency_ms,
321 advisory_count,
322 server_info,
323 })
324 }
325
326 async fn get_by_package_stream(
327 &self,
328 ecosystem: &str,
329 package: &str,
330 ) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>> {
331 let idx_key = self.key(&format!("idx:{}:{}", ecosystem, package));
332
333 let stream = try_stream! {
334 let mut conn = self.get_connection().await?;
335
336 let mut cursor = 0u64;
338 loop {
339 let (new_cursor, ids): (u64, Vec<String>) = redis::cmd("SSCAN")
340 .arg(&idx_key)
341 .arg(cursor)
342 .arg("COUNT")
343 .arg(100)
344 .query_async(&mut conn)
345 .await?;
346
347 for id in ids {
348 if let Some(advisory) = self.get(&id).await? {
349 yield advisory;
350 }
351 }
352
353 cursor = new_cursor;
354 if cursor == 0 {
355 break;
356 }
357 }
358 };
359
360 Ok(Box::pin(stream))
361 }
362
363 async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>> {
364 if ids.is_empty() {
365 return Ok(Vec::new());
366 }
367
368 let mut conn = self.get_connection().await?;
369 let keys: Vec<String> = ids
370 .iter()
371 .map(|id| self.key(&format!("data:{}", id)))
372 .collect();
373
374 let data: Vec<Option<Vec<u8>>> =
375 redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
376
377 let mut advisories = Vec::new();
378 for bytes in data.into_iter().flatten() {
379 let decompressed = Self::decompress(&bytes)?;
380 let advisory: Advisory = serde_json::from_slice(&decompressed)?;
381 advisories.push(advisory);
382 }
383
384 Ok(advisories)
385 }
386
387 async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()> {
388 let mut conn = self.get_connection().await?;
389 let key = self.key(&format!("enrich:{}", cve_id));
390 let json = serde_json::to_string(data)?;
391
392 if let Some(ttl) = self.config.ttl_seconds {
393 redis::cmd("SETEX")
394 .arg(&key)
395 .arg(ttl)
396 .arg(json)
397 .query_async::<()>(&mut conn)
398 .await?;
399 } else {
400 let _: () = conn.set(&key, json).await?;
401 }
402
403 Ok(())
404 }
405
406 async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>> {
407 let mut conn = self.get_connection().await?;
408 let key = self.key(&format!("enrich:{}", cve_id));
409 let data: Option<String> = conn.get(&key).await?;
410
411 match data {
412 Some(json) => Ok(Some(serde_json::from_str(&json)?)),
413 None => Ok(None),
414 }
415 }
416
417 async fn get_enrichment_batch(
418 &self,
419 cve_ids: &[String],
420 ) -> Result<Vec<(String, EnrichmentData)>> {
421 if cve_ids.is_empty() {
422 return Ok(Vec::new());
423 }
424
425 let mut conn = self.get_connection().await?;
426 let keys: Vec<String> = cve_ids
427 .iter()
428 .map(|id| self.key(&format!("enrich:{}", id)))
429 .collect();
430
431 let data: Vec<Option<String>> =
432 redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
433
434 let mut results = Vec::new();
435 for (cve_id, json_opt) in cve_ids.iter().zip(data) {
436 if let Some(json) = json_opt {
437 if let Ok(enrichment) = serde_json::from_str(&json) {
438 results.push((cve_id.clone(), enrichment));
439 }
440 }
441 }
442
443 Ok(results)
444 }
445
446 async fn update_sync_timestamp(&self, source: &str) -> Result<()> {
447 let mut conn = self.get_connection().await?;
448 let _: () = conn
449 .set(
450 self.key(&format!("meta:{}", source)),
451 chrono::Utc::now().to_rfc3339(),
452 )
453 .await?;
454 Ok(())
455 }
456
457 async fn reset_sync_timestamp(&self, source: &str) -> Result<()> {
458 let mut conn = self.get_connection().await?;
459 let _: () = conn.del(self.key(&format!("meta:{}", source))).await?;
460 info!("Reset sync timestamp for {}", source);
461 Ok(())
462 }
463
464 async fn advisory_count(&self) -> Result<u64> {
465 let mut conn = self.get_connection().await?;
466 let pattern = self.key("data:*");
467
468 let mut count = 0u64;
470 let mut cursor = 0u64;
471
472 loop {
473 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
474 .arg(cursor)
475 .arg("MATCH")
476 .arg(&pattern)
477 .arg("COUNT")
478 .arg(1000)
479 .query_async(&mut conn)
480 .await?;
481
482 count += keys.len() as u64;
483 cursor = new_cursor;
484
485 if cursor == 0 {
486 break;
487 }
488 }
489
490 Ok(count)
491 }
492
493 async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()> {
494 let mut conn = self.get_connection().await?;
495 let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
496 let json = serde_json::to_string(cache)?;
497
498 let ttl = cache.remaining_ttl().max(1) as u64;
500 redis::cmd("SETEX")
501 .arg(&key)
502 .arg(ttl)
503 .arg(json)
504 .query_async::<()>(&mut conn)
505 .await?;
506
507 Ok(())
508 }
509
510 async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>> {
511 let mut conn = self.get_connection().await?;
512 let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
513 let data: Option<String> = conn.get(&key).await?;
514
515 match data {
516 Some(json) => {
517 let cache: OssIndexCache = serde_json::from_str(&json)?;
518 if cache.is_valid() {
520 Ok(Some(cache))
521 } else {
522 let _: () = conn.del(&key).await?;
524 Ok(None)
525 }
526 }
527 None => Ok(None),
528 }
529 }
530
531 async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()> {
532 let mut conn = self.get_connection().await?;
533 let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
534 let _: () = conn.del(&key).await?;
535 Ok(())
536 }
537
538 async fn invalidate_all_ossindex_cache(&self) -> Result<u64> {
539 let mut conn = self.get_connection().await?;
540 let pattern = self.key("ossidx:*");
541
542 let mut deleted = 0u64;
544 let mut cursor = 0u64;
545
546 loop {
547 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
548 .arg(cursor)
549 .arg("MATCH")
550 .arg(&pattern)
551 .arg("COUNT")
552 .arg(1000)
553 .query_async(&mut conn)
554 .await?;
555
556 if !keys.is_empty() {
557 let count: u64 = redis::cmd("DEL").arg(&keys).query_async(&mut conn).await?;
558 deleted += count;
559 }
560
561 cursor = new_cursor;
562 if cursor == 0 {
563 break;
564 }
565 }
566
567 Ok(deleted)
568 }
569}
570
571impl DragonflyStore {
572 fn hash_purl(purl: &str) -> String {
574 use std::collections::hash_map::DefaultHasher;
575 use std::hash::{Hash, Hasher};
576
577 let mut hasher = DefaultHasher::new();
578 purl.hash(&mut hasher);
579 format!("{:x}", hasher.finish())
580 }
581}