1use crate::config::StoreConfig;
7use crate::error::{AdvisoryError, Result};
8use crate::models::Advisory;
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures_util::Stream;
12use redis::AsyncCommands;
13use serde::{Deserialize, Serialize};
14use std::io::Write;
15use std::pin::Pin;
16use std::time::Instant;
17use tracing::{info, instrument};
18
19#[async_trait]
21pub trait AdvisoryStore: Send + Sync {
22 async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()>;
24
25 async fn get(&self, id: &str) -> Result<Option<Advisory>>;
27
28 async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>>;
30
31 async fn last_sync(&self, source: &str) -> Result<Option<String>>;
33
34 async fn health_check(&self) -> Result<HealthStatus>;
36
37 async fn get_by_package_stream(
39 &self,
40 ecosystem: &str,
41 package: &str,
42 ) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>>;
43
44 async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>>;
46
47 async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()>;
49
50 async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>>;
52
53 async fn get_enrichment_batch(
55 &self,
56 cve_ids: &[String],
57 ) -> Result<Vec<(String, EnrichmentData)>>;
58
59 async fn update_sync_timestamp(&self, source: &str) -> Result<()>;
61
62 async fn advisory_count(&self) -> Result<u64>;
64
65 async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()>;
72
73 async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>>;
77
78 async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()>;
80
81 async fn invalidate_all_ossindex_cache(&self) -> Result<u64>;
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct HealthStatus {
88 pub connected: bool,
90 pub latency_ms: u64,
92 pub advisory_count: u64,
94 pub server_info: Option<String>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EnrichmentData {
101 pub epss_score: Option<f64>,
103 pub epss_percentile: Option<f64>,
105 pub is_kev: bool,
107 pub kev_due_date: Option<String>,
109 pub kev_date_added: Option<String>,
111 pub kev_ransomware: Option<bool>,
113 pub updated_at: String,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct OssIndexCache {
123 pub advisories: Vec<crate::models::Advisory>,
125 pub cached_at: chrono::DateTime<chrono::Utc>,
127 pub ttl_seconds: u64,
129}
130
131const DEFAULT_OSSINDEX_CACHE_TTL: u64 = 3600;
133
134impl OssIndexCache {
135 pub fn new(advisories: Vec<crate::models::Advisory>) -> Self {
137 Self {
138 advisories,
139 cached_at: chrono::Utc::now(),
140 ttl_seconds: DEFAULT_OSSINDEX_CACHE_TTL,
141 }
142 }
143
144 pub fn with_ttl(advisories: Vec<crate::models::Advisory>, ttl_seconds: u64) -> Self {
146 Self {
147 advisories,
148 cached_at: chrono::Utc::now(),
149 ttl_seconds,
150 }
151 }
152
153 pub fn is_valid(&self) -> bool {
155 !self.is_expired()
156 }
157
158 pub fn is_expired(&self) -> bool {
160 let age = chrono::Utc::now().signed_duration_since(self.cached_at);
161 age.num_seconds() >= self.ttl_seconds as i64
162 }
163
164 pub fn remaining_ttl(&self) -> i64 {
166 let age = chrono::Utc::now().signed_duration_since(self.cached_at);
167 (self.ttl_seconds as i64) - age.num_seconds()
168 }
169}
170
171pub struct DragonflyStore {
173 client: redis::Client,
174 config: StoreConfig,
175}
176
177impl DragonflyStore {
178 pub fn new(url: &str) -> Result<Self> {
180 Self::with_config(url, StoreConfig::default())
181 }
182
183 pub fn with_config(url: &str, config: StoreConfig) -> Result<Self> {
185 let client = redis::Client::open(url)?;
186 Ok(Self { client, config })
187 }
188
189 pub fn key_prefix(&self) -> &str {
191 &self.config.key_prefix
192 }
193
194 fn key(&self, suffix: &str) -> String {
196 format!("{}:{}", self.config.key_prefix, suffix)
197 }
198
199 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
200 let mut encoder =
201 zstd::stream::write::Encoder::new(Vec::new(), self.config.compression_level)?;
202 encoder.write_all(data)?;
203 encoder
204 .finish()
205 .map_err(|e| AdvisoryError::compression(e.to_string()))
206 }
207
208 fn decompress(data: &[u8]) -> Result<Vec<u8>> {
209 let mut decoder = zstd::stream::read::Decoder::new(data)?;
210 let mut decoded = Vec::new();
211 std::io::Read::read_to_end(&mut decoder, &mut decoded)?;
212 Ok(decoded)
213 }
214
215 async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection> {
216 self.client
217 .get_multiplexed_async_connection()
218 .await
219 .map_err(AdvisoryError::from)
220 }
221}
222
223#[async_trait]
224impl AdvisoryStore for DragonflyStore {
225 #[instrument(skip(self, advisories), fields(count = advisories.len()))]
226 async fn upsert_batch(&self, advisories: &[Advisory], source: &str) -> Result<()> {
227 let mut conn = self.get_connection().await?;
228 let mut pipe = redis::pipe();
229
230 for advisory in advisories {
231 let json = serde_json::to_vec(advisory)?;
232 let compressed = self.compress(&json)?;
233
234 let data_key = self.key(&format!("data:{}", advisory.id));
235
236 if let Some(ttl) = self.config.ttl_seconds {
238 pipe.cmd("SETEX").arg(&data_key).arg(ttl).arg(compressed);
239 } else {
240 pipe.set(&data_key, compressed);
241 }
242
243 for affected in &advisory.affected {
245 let idx_key = self.key(&format!(
246 "idx:{}:{}",
247 affected.package.ecosystem, affected.package.name
248 ));
249 pipe.sadd(&idx_key, &advisory.id);
250 }
251 }
252
253 pipe.set(
255 self.key(&format!("meta:{}", source)),
256 chrono::Utc::now().to_rfc3339(),
257 );
258
259 pipe.query_async::<()>(&mut conn).await?;
260 info!("Upserted {} advisories from {}", advisories.len(), source);
261 Ok(())
262 }
263
264 async fn get(&self, id: &str) -> Result<Option<Advisory>> {
265 let mut conn = self.get_connection().await?;
266 let data: Option<Vec<u8>> = conn.get(self.key(&format!("data:{}", id))).await?;
267
268 match data {
269 Some(bytes) => {
270 let decompressed = Self::decompress(&bytes)?;
271 let advisory = serde_json::from_slice(&decompressed)?;
272 Ok(Some(advisory))
273 }
274 None => Ok(None),
275 }
276 }
277
278 async fn get_by_package(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
279 let mut conn = self.get_connection().await?;
280 let ids: Vec<String> = conn
281 .smembers(self.key(&format!("idx:{}:{}", ecosystem, package)))
282 .await?;
283
284 let mut advisories = Vec::new();
285 for id in ids {
286 if let Some(advisory) = self.get(&id).await? {
287 advisories.push(advisory);
288 }
289 }
290 Ok(advisories)
291 }
292
293 async fn last_sync(&self, source: &str) -> Result<Option<String>> {
294 let mut conn = self.get_connection().await?;
295 Ok(conn.get(self.key(&format!("meta:{}", source))).await?)
296 }
297
298 async fn health_check(&self) -> Result<HealthStatus> {
299 let start = Instant::now();
300
301 let mut conn = self.get_connection().await?;
302
303 let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
305 let connected = pong == "PONG";
306
307 let latency_ms = start.elapsed().as_millis() as u64;
308
309 let advisory_count = self.advisory_count().await.unwrap_or(0);
311
312 let info: std::result::Result<String, _> = redis::cmd("INFO")
314 .arg("server")
315 .query_async(&mut conn)
316 .await;
317 let server_info = info.ok().and_then(|s| {
318 s.lines()
319 .find(|l| l.starts_with("redis_version:"))
320 .map(|l| l.to_string())
321 });
322
323 Ok(HealthStatus {
324 connected,
325 latency_ms,
326 advisory_count,
327 server_info,
328 })
329 }
330
331 async fn get_by_package_stream(
332 &self,
333 ecosystem: &str,
334 package: &str,
335 ) -> Result<Pin<Box<dyn Stream<Item = Result<Advisory>> + Send + '_>>> {
336 let idx_key = self.key(&format!("idx:{}:{}", ecosystem, package));
337
338 let stream = try_stream! {
339 let mut conn = self.get_connection().await?;
340
341 let mut cursor = 0u64;
343 loop {
344 let (new_cursor, ids): (u64, Vec<String>) = redis::cmd("SSCAN")
345 .arg(&idx_key)
346 .arg(cursor)
347 .arg("COUNT")
348 .arg(100)
349 .query_async(&mut conn)
350 .await?;
351
352 for id in ids {
353 if let Some(advisory) = self.get(&id).await? {
354 yield advisory;
355 }
356 }
357
358 cursor = new_cursor;
359 if cursor == 0 {
360 break;
361 }
362 }
363 };
364
365 Ok(Box::pin(stream))
366 }
367
368 async fn get_batch(&self, ids: &[String]) -> Result<Vec<Advisory>> {
369 if ids.is_empty() {
370 return Ok(Vec::new());
371 }
372
373 let mut conn = self.get_connection().await?;
374 let keys: Vec<String> = ids
375 .iter()
376 .map(|id| self.key(&format!("data:{}", id)))
377 .collect();
378
379 let data: Vec<Option<Vec<u8>>> =
380 redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
381
382 let mut advisories = Vec::new();
383 for bytes in data.into_iter().flatten() {
384 let decompressed = Self::decompress(&bytes)?;
385 let advisory: Advisory = serde_json::from_slice(&decompressed)?;
386 advisories.push(advisory);
387 }
388
389 Ok(advisories)
390 }
391
392 async fn store_enrichment(&self, cve_id: &str, data: &EnrichmentData) -> Result<()> {
393 let mut conn = self.get_connection().await?;
394 let key = self.key(&format!("enrich:{}", cve_id));
395 let json = serde_json::to_string(data)?;
396
397 if let Some(ttl) = self.config.ttl_seconds {
398 redis::cmd("SETEX")
399 .arg(&key)
400 .arg(ttl)
401 .arg(json)
402 .query_async::<()>(&mut conn)
403 .await?;
404 } else {
405 let _: () = conn.set(&key, json).await?;
406 }
407
408 Ok(())
409 }
410
411 async fn get_enrichment(&self, cve_id: &str) -> Result<Option<EnrichmentData>> {
412 let mut conn = self.get_connection().await?;
413 let key = self.key(&format!("enrich:{}", cve_id));
414 let data: Option<String> = conn.get(&key).await?;
415
416 match data {
417 Some(json) => Ok(Some(serde_json::from_str(&json)?)),
418 None => Ok(None),
419 }
420 }
421
422 async fn get_enrichment_batch(
423 &self,
424 cve_ids: &[String],
425 ) -> Result<Vec<(String, EnrichmentData)>> {
426 if cve_ids.is_empty() {
427 return Ok(Vec::new());
428 }
429
430 let mut conn = self.get_connection().await?;
431 let keys: Vec<String> = cve_ids
432 .iter()
433 .map(|id| self.key(&format!("enrich:{}", id)))
434 .collect();
435
436 let data: Vec<Option<String>> =
437 redis::cmd("MGET").arg(&keys).query_async(&mut conn).await?;
438
439 let mut results = Vec::new();
440 for (cve_id, json_opt) in cve_ids.iter().zip(data) {
441 if let Some(json) = json_opt {
442 if let Ok(enrichment) = serde_json::from_str(&json) {
443 results.push((cve_id.clone(), enrichment));
444 }
445 }
446 }
447
448 Ok(results)
449 }
450
451 async fn update_sync_timestamp(&self, source: &str) -> Result<()> {
452 let mut conn = self.get_connection().await?;
453 let _: () = conn
454 .set(
455 self.key(&format!("meta:{}", source)),
456 chrono::Utc::now().to_rfc3339(),
457 )
458 .await?;
459 Ok(())
460 }
461
462 async fn advisory_count(&self) -> Result<u64> {
463 let mut conn = self.get_connection().await?;
464 let pattern = self.key("data:*");
465
466 let mut count = 0u64;
468 let mut cursor = 0u64;
469
470 loop {
471 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
472 .arg(cursor)
473 .arg("MATCH")
474 .arg(&pattern)
475 .arg("COUNT")
476 .arg(1000)
477 .query_async(&mut conn)
478 .await?;
479
480 count += keys.len() as u64;
481 cursor = new_cursor;
482
483 if cursor == 0 {
484 break;
485 }
486 }
487
488 Ok(count)
489 }
490
491 async fn store_ossindex_cache(&self, purl: &str, cache: &OssIndexCache) -> Result<()> {
492 let mut conn = self.get_connection().await?;
493 let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
494 let json = serde_json::to_string(cache)?;
495
496 let ttl = cache.remaining_ttl().max(1) as u64;
498 redis::cmd("SETEX")
499 .arg(&key)
500 .arg(ttl)
501 .arg(json)
502 .query_async::<()>(&mut conn)
503 .await?;
504
505 Ok(())
506 }
507
508 async fn get_ossindex_cache(&self, purl: &str) -> Result<Option<OssIndexCache>> {
509 let mut conn = self.get_connection().await?;
510 let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
511 let data: Option<String> = conn.get(&key).await?;
512
513 match data {
514 Some(json) => {
515 let cache: OssIndexCache = serde_json::from_str(&json)?;
516 if cache.is_valid() {
518 Ok(Some(cache))
519 } else {
520 let _: () = conn.del(&key).await?;
522 Ok(None)
523 }
524 }
525 None => Ok(None),
526 }
527 }
528
529 async fn invalidate_ossindex_cache(&self, purl: &str) -> Result<()> {
530 let mut conn = self.get_connection().await?;
531 let key = self.key(&format!("ossidx:{}", Self::hash_purl(purl)));
532 let _: () = conn.del(&key).await?;
533 Ok(())
534 }
535
536 async fn invalidate_all_ossindex_cache(&self) -> Result<u64> {
537 let mut conn = self.get_connection().await?;
538 let pattern = self.key("ossidx:*");
539
540 let mut deleted = 0u64;
542 let mut cursor = 0u64;
543
544 loop {
545 let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
546 .arg(cursor)
547 .arg("MATCH")
548 .arg(&pattern)
549 .arg("COUNT")
550 .arg(1000)
551 .query_async(&mut conn)
552 .await?;
553
554 if !keys.is_empty() {
555 let count: u64 = redis::cmd("DEL").arg(&keys).query_async(&mut conn).await?;
556 deleted += count;
557 }
558
559 cursor = new_cursor;
560 if cursor == 0 {
561 break;
562 }
563 }
564
565 Ok(deleted)
566 }
567}
568
569impl DragonflyStore {
570 fn hash_purl(purl: &str) -> String {
572 use std::collections::hash_map::DefaultHasher;
573 use std::hash::{Hash, Hasher};
574
575 let mut hasher = DefaultHasher::new();
576 purl.hash(&mut hasher);
577 format!("{:x}", hasher.finish())
578 }
579}