1use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tracing::{debug, error, info, warn};
19
20#[derive(Debug, Clone, Default)]
22pub struct MatchOptions {
23 pub min_cvss: Option<f64>,
25 pub min_epss: Option<f64>,
27 pub kev_only: bool,
29 pub min_severity: Option<Severity>,
31 pub include_enrichment: bool,
33}
34
/// Summary of one `sync_all` run.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// Number of sources that were asked to sync.
    pub total_sources: usize,
    /// Number of sources whose sync task reported success.
    pub successful_sources: usize,
    /// Number of sources whose sync task failed (or whose task could not be joined).
    pub failed_sources: usize,
    /// Sum of the advisory counts reported by the successful sources.
    pub total_advisories_synced: usize,
    /// Failure message per failed source, keyed by source name.
    pub errors: HashMap<String, String>,
}
49
50pub trait SyncObserver: Send + Sync {
52 fn on_sync_start(&self);
54
55 fn on_source_start(&self, source_name: &str);
57
58 fn on_source_success(&self, source_name: &str, count: usize);
60
61 fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError);
63
64 fn on_sync_complete(&self, stats: &SyncStats);
66}
67
68pub struct TracingSyncObserver;
70
71impl SyncObserver for TracingSyncObserver {
72 fn on_sync_start(&self) {
73 info!("Starting full vulnerability sync...");
74 }
75
76 fn on_source_start(&self, source_name: &str) {
77 info!("Syncing {}...", source_name);
82 }
83
84 fn on_source_success(&self, source_name: &str, count: usize) {
85 info!(
86 "Successfully synced {} advisories from {}",
87 count, source_name
88 );
89 }
90
91 fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError) {
92 error!("Failed to sync {}: {}", source_name, error);
93 }
94
95 fn on_sync_complete(&self, _stats: &SyncStats) {
96 info!("Sync completed.");
97 }
98}
99
100impl MatchOptions {
101 pub fn with_enrichment() -> Self {
103 Self {
104 include_enrichment: true,
105 ..Default::default()
106 }
107 }
108
109 pub fn high_severity() -> Self {
111 Self {
112 min_severity: Some(Severity::High),
113 include_enrichment: true,
114 ..Default::default()
115 }
116 }
117
118 pub fn exploited_only() -> Self {
120 Self {
121 kev_only: true,
122 include_enrichment: true,
123 ..Default::default()
124 }
125 }
126}
127
/// Identifies a package, optionally pinned to a version; usable as a map key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct PackageKey {
    /// Ecosystem name the package belongs to.
    pub ecosystem: String,
    /// Package name.
    pub name: String,
    /// Specific version, or `None` when the key refers to the package as a whole.
    pub version: Option<String>,
}

impl PackageKey {
    /// Builds a key without a version.
    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
        let (ecosystem, name) = (ecosystem.into(), name.into());
        Self {
            ecosystem,
            name,
            version: None,
        }
    }

    /// Builds a key pinned to `version`.
    pub fn with_version(
        ecosystem: impl Into<String>,
        name: impl Into<String>,
        version: impl Into<String>,
    ) -> Self {
        Self {
            version: Some(version.into()),
            ..Self::new(ecosystem, name)
        }
    }
}
162
163pub struct VulnerabilityManagerBuilder {
165 redis_url: Option<String>,
166 store_config: StoreConfig,
167 sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
168 custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
169 ossindex_source: Option<OssIndexSource>,
170 observer: Option<Arc<dyn SyncObserver>>,
171}
172
173impl Default for VulnerabilityManagerBuilder {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
179impl VulnerabilityManagerBuilder {
180 pub fn new() -> Self {
182 Self {
183 redis_url: None,
184 store_config: StoreConfig::default(),
185 sources: Vec::new(),
186 custom_store: None,
187 ossindex_source: None,
188 observer: None,
189 }
190 }
191
192 pub fn redis_url(mut self, url: impl Into<String>) -> Self {
194 self.redis_url = Some(url.into());
195 self
196 }
197
198 pub fn store_config(mut self, config: StoreConfig) -> Self {
200 self.store_config = config;
201 self
202 }
203
204 pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
206 self.custom_store = Some(store);
207 self
208 }
209
210 pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
212 self.sources.push(source);
213 self
214 }
215
216 pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
218 self.sources.push(Arc::new(GHSASource::new(token.into())));
219 self
220 }
221
222 pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
224 self.sources.push(Arc::new(NVDSource::new(api_key)));
225 self
226 }
227
228 pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
230 self.sources.push(Arc::new(OSVSource::new(ecosystems)));
231 self
232 }
233
234 pub fn with_osv_defaults(self) -> Self {
236 self.with_osv(vec![
237 "npm".to_string(),
238 "PyPI".to_string(),
239 "Maven".to_string(),
240 "crates.io".to_string(),
241 "Go".to_string(),
242 "Packagist".to_string(),
243 "RubyGems".to_string(),
244 "NuGet".to_string(),
245 ])
246 }
247
248 pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
253 match OssIndexSource::new(config) {
254 Ok(source) => {
255 self.ossindex_source = Some(source);
256 }
257 Err(e) => {
258 warn!("Failed to configure OSS Index source: {}", e);
259 }
260 }
261 self
262 }
263
264 pub fn with_observer(mut self, observer: Arc<dyn SyncObserver>) -> Self {
266 self.observer = Some(observer);
267 self
268 }
269
270 pub fn build(self) -> Result<VulnerabilityManager> {
272 let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
273 Some(s) => s,
274 None => {
275 let url = self.redis_url.ok_or_else(|| {
276 AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
277 })?;
278 Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
279 }
280 };
281
282 if self.sources.is_empty() {
283 warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
284 }
285
286 Ok(VulnerabilityManager {
287 store,
288 sources: self.sources,
289 kev_source: KevSource::new(),
290 epss_source: EpssSource::new(),
291 ossindex_source: self.ossindex_source,
292 observer: self
293 .observer
294 .unwrap_or_else(|| Arc::new(TracingSyncObserver)),
295 })
296 }
297}
298
299pub struct VulnerabilityManager {
301 store: Arc<dyn AdvisoryStore + Send + Sync>,
302 sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
303 kev_source: KevSource,
304 epss_source: EpssSource,
305 ossindex_source: Option<OssIndexSource>,
306 observer: Arc<dyn SyncObserver>,
307}
308
309impl VulnerabilityManager {
310 pub async fn new(config: Config) -> Result<Self> {
314 let mut builder = VulnerabilityManagerBuilder::new()
315 .redis_url(&config.redis_url)
316 .store_config(config.store.clone());
317
318 builder = builder.with_osv_defaults();
320
321 builder = builder.with_nvd(config.nvd_api_key.clone());
323
324 if let Some(token) = &config.ghsa_token {
326 builder = builder.with_ghsa(token.clone());
327 }
328
329 if config.ossindex.is_some() {
331 builder = builder.with_ossindex(config.ossindex.clone());
332 }
333
334 builder.build()
335 }
336
337 pub fn builder() -> VulnerabilityManagerBuilder {
339 VulnerabilityManagerBuilder::new()
340 }
341
342 pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
344 &self.store
345 }
346
347 pub async fn health_check(&self) -> Result<HealthStatus> {
349 self.store.health_check().await
350 }
351
352 pub async fn sync_all(&self) -> Result<SyncStats> {
354 self.observer.on_sync_start();
355
356 let mut handles = Vec::new();
357 let mut stats = SyncStats {
358 total_sources: self.sources.len(),
359 ..Default::default()
360 };
361
362 for source in &self.sources {
363 let source = source.clone();
364 let store = self.store.clone();
365 let observer = self.observer.clone();
366
367 let handle = tokio::spawn(async move {
368 observer.on_source_start(source.name());
369
370 let last_sync = match store.last_sync(source.name()).await {
371 Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
372 Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
373 Err(_) => None,
374 },
375 _ => None,
376 };
377
378 match source.fetch(last_sync).await {
379 Ok(advisories) => {
380 if !advisories.is_empty() {
381 match store.upsert_batch(&advisories, source.name()).await {
382 Ok(_) => {
383 observer.on_source_success(source.name(), advisories.len());
384 if let Err(e) = store.update_sync_timestamp(source.name()).await
386 {
387 let err = AdvisoryError::source_fetch(
388 source.name(),
389 format!("Failed to update timestamp: {}", e),
390 );
391 observer.on_source_error(source.name(), &err);
392 }
395 Ok((source.name().to_string(), advisories.len()))
396 }
397 Err(e) => {
398 observer.on_source_error(source.name(), &e);
400 Err((source.name().to_string(), e.to_string()))
401 }
402 }
403 } else {
404 observer.on_source_success(source.name(), 0);
405 if let Err(e) = store.update_sync_timestamp(source.name()).await {
407 let err = AdvisoryError::source_fetch(
408 source.name(),
409 format!("Failed to update timestamp: {}", e),
410 );
411 observer.on_source_error(source.name(), &err);
412 }
413 Ok((source.name().to_string(), 0))
414 }
415 }
416 Err(e) => {
417 observer.on_source_error(source.name(), &e);
418 Err((source.name().to_string(), e.to_string()))
419 }
420 }
421 });
422 handles.push(handle);
423 }
424
425 for handle in handles {
427 match handle.await {
428 Ok(result) => match result {
429 Ok((_, count)) => {
430 stats.successful_sources += 1;
431 stats.total_advisories_synced += count;
432 }
433 Err((name, error)) => {
434 stats.failed_sources += 1;
435 stats.errors.insert(name, error);
436 }
437 },
438 Err(e) => {
439 error!("Task join error: {}", e);
441 stats.failed_sources += 1;
442 stats
443 .errors
444 .insert("unknown".to_string(), format!("Task join error: {}", e));
445 }
446 }
447 }
448
449 self.observer.on_sync_complete(&stats);
450 Ok(stats)
451 }
452
453 pub async fn reset_sync(&self, source: &str) -> Result<()> {
457 self.store.reset_sync_timestamp(source).await
458 }
459
460 pub async fn reset_all_syncs(&self) -> Result<()> {
462 for source in &self.sources {
463 self.store.reset_sync_timestamp(source.name()).await?;
464 }
465 Ok(())
466 }
467
468 pub async fn sync_enrichment(&self) -> Result<()> {
470 self.sync_enrichment_with_cves(&[]).await
471 }
472
473 pub async fn sync_enrichment_with_cves(&self, extra_cves: &[String]) -> Result<()> {
475 info!("Syncing enrichment data (KEV, EPSS)...");
476
477 let mut enrichment: HashMap<String, EnrichmentData> = HashMap::new();
478
479 match self.kev_source.fetch_catalog().await {
481 Ok(kev_entries) => {
482 info!("Processing {} KEV entries", kev_entries.len());
483 for (cve_id, entry) in kev_entries {
484 let data = enrichment
485 .entry(cve_id.clone())
486 .or_insert_with(|| EnrichmentData {
487 epss_score: None,
488 epss_percentile: None,
489 is_kev: false,
490 kev_due_date: None,
491 kev_date_added: None,
492 kev_ransomware: None,
493 updated_at: String::new(),
494 });
495
496 data.is_kev = true;
497 data.kev_due_date = entry.due_date_utc().map(|d| d.to_rfc3339());
498 data.kev_date_added = entry.date_added_utc().map(|d| d.to_rfc3339());
499 data.kev_ransomware = Some(entry.is_ransomware_related());
500 }
501 }
502 Err(e) => {
503 error!("Failed to fetch KEV catalog: {}", e);
504 }
505 }
506
507 let epss_targets = Self::collect_enrichment_targets(&enrichment, extra_cves);
509 if !epss_targets.is_empty() {
510 match self
511 .epss_source
512 .fetch_scores_batch(&epss_targets, 200)
513 .await
514 {
515 Ok(scores) => {
516 Self::merge_epss_scores(&mut enrichment, scores);
517 }
518 Err(e) => {
519 warn!("Failed to fetch EPSS scores: {}", e);
520 }
521 }
522 }
523
524 if !enrichment.is_empty() {
526 let now = chrono::Utc::now().to_rfc3339();
527 for (cve_id, mut data) in enrichment {
528 if data.updated_at.is_empty() {
529 data.updated_at = now.clone();
530 }
531 if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
532 debug!("Failed to store enrichment for {}: {}", cve_id, e);
533 }
534 }
535 }
536
537 Ok(())
538 }
539
540 fn collect_enrichment_targets(
542 current: &HashMap<String, EnrichmentData>,
543 extra: &[String],
544 ) -> Vec<String> {
545 let mut set: std::collections::HashSet<String> = current.keys().cloned().collect();
546 for c in extra {
547 set.insert(c.clone());
548 }
549 set.into_iter().collect()
550 }
551
552 fn merge_epss_scores(
554 enrichment: &mut HashMap<String, EnrichmentData>,
555 scores: HashMap<String, crate::sources::epss::EpssScore>,
556 ) {
557 for (cve_id, score) in scores {
558 let data = enrichment
559 .entry(cve_id.clone())
560 .or_insert_with(|| EnrichmentData {
561 epss_score: None,
562 epss_percentile: None,
563 is_kev: false,
564 kev_due_date: None,
565 kev_date_added: None,
566 kev_ransomware: None,
567 updated_at: String::new(),
568 });
569
570 data.epss_score = Some(score.epss);
571 data.epss_percentile = Some(score.percentile);
572 if let Some(date) = score.date_utc() {
573 data.updated_at = date.to_rfc3339();
574 }
575 }
576 }
577
578 pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
580 let advisories = self.store.get_by_package(ecosystem, package).await?;
581 Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
582 }
583
584 pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
586 let mut advisories = self.query(ecosystem, package).await?;
587 self.enrich_advisories(&mut advisories).await?;
588 Ok(advisories)
589 }
590
591 pub async fn query_batch(
595 &self,
596 packages: &[PackageKey],
597 ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
598 use futures_util::future::join_all;
599
600 let tasks: Vec<_> = packages
601 .iter()
602 .map(|pkg| {
603 let pkg = pkg.clone();
604 let ecosystem = pkg.ecosystem.clone();
605 let name = pkg.name.clone();
606 let version = pkg.version.clone();
607 let store = self.store.clone();
608
609 async move {
610 let advisories = if let Some(ver) = &version {
611 let all = store.get_by_package(&ecosystem, &name).await?;
613 let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
614 Self::filter_by_version(aggregated, &ecosystem, &name, ver)
615 } else {
616 let all = store.get_by_package(&ecosystem, &name).await?;
617 crate::aggregator::ReportAggregator::aggregate(all)
618 };
619 Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
620 }
621 })
622 .collect();
623
624 let results: Vec<_> = join_all(tasks).await;
625
626 let mut map = HashMap::new();
627 for result in results {
628 match result {
629 Ok((pkg, advisories)) => {
630 map.insert(pkg, advisories);
631 }
632 Err(e) => {
633 warn!("Batch query error: {}", e);
634 }
635 }
636 }
637
638 Ok(map)
639 }
640
641 fn filter_by_version(
643 advisories: Vec<Advisory>,
644 ecosystem: &str,
645 package: &str,
646 version: &str,
647 ) -> Vec<Advisory> {
648 advisories
649 .into_iter()
650 .filter(|advisory| {
651 for affected in &advisory.affected {
652 if affected.package.name != package || affected.package.ecosystem != ecosystem {
653 continue;
654 }
655
656 if affected.versions.contains(&version.to_string()) {
658 return true;
659 }
660
661 for range in &affected.ranges {
663 match range.range_type {
664 RangeType::Semver => {
665 if Self::matches_semver_range(version, &range.events) {
666 return true;
667 }
668 }
669 RangeType::Ecosystem => {
670 if Self::matches_ecosystem_range(version, &range.events) {
671 return true;
672 }
673 }
674 RangeType::Git => {}
675 }
676 }
677 }
678 false
679 })
680 .collect()
681 }
682
683 pub async fn matches(
685 &self,
686 ecosystem: &str,
687 package: &str,
688 version: &str,
689 ) -> Result<Vec<Advisory>> {
690 self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
691 .await
692 }
693
694 pub async fn matches_with_options(
696 &self,
697 ecosystem: &str,
698 package: &str,
699 version: &str,
700 options: &MatchOptions,
701 ) -> Result<Vec<Advisory>> {
702 let advisories = self.query(ecosystem, package).await?;
703 let mut matched = Vec::new();
704
705 for mut advisory in advisories {
706 let mut is_vulnerable = false;
707 for affected in &advisory.affected {
708 if affected.package.name != package || affected.package.ecosystem != ecosystem {
709 continue;
710 }
711
712 if affected.versions.contains(&version.to_string()) {
714 is_vulnerable = true;
715 break;
716 }
717
718 for range in &affected.ranges {
720 match range.range_type {
721 RangeType::Semver => {
722 if Self::matches_semver_range(version, &range.events) {
723 is_vulnerable = true;
724 break;
725 }
726 }
727 RangeType::Ecosystem => {
728 if Self::matches_ecosystem_range(version, &range.events) {
729 is_vulnerable = true;
730 break;
731 }
732 }
733 RangeType::Git => {
734 }
736 }
737 }
738 if is_vulnerable {
739 break;
740 }
741 }
742
743 if is_vulnerable {
744 if options.include_enrichment {
746 self.enrich_advisory(&mut advisory).await?;
747 }
748
749 if self.advisory_passes_filters(&advisory, options) {
751 matched.push(advisory);
752 }
753 }
754 }
755
756 Ok(matched)
757 }
758
759 fn matches_semver_range(version: &str, events: &[Event]) -> bool {
763 let Ok(v) = semver::Version::parse(version) else {
764 return false;
765 };
766
767 #[derive(Default)]
768 struct Interval {
769 start: Option<semver::Version>,
770 end: Option<semver::Version>,
771 end_inclusive: bool,
772 }
773
774 let mut intervals: Vec<Interval> = Vec::new();
775 let mut current_start: Option<semver::Version> = None;
776
777 for event in events {
778 match event {
779 Event::Introduced(ver) => {
780 if let Ok(parsed) = semver::Version::parse(ver) {
781 current_start = Some(parsed);
782 } else if ver == "0" {
783 current_start = Some(semver::Version::new(0, 0, 0));
784 }
785 }
786 Event::Fixed(ver) => {
787 let end = semver::Version::parse(ver).ok();
788 intervals.push(Interval {
789 start: current_start.clone(),
790 end,
791 end_inclusive: false,
792 });
793 current_start = None;
794 }
795 Event::LastAffected(ver) => {
796 let end = semver::Version::parse(ver).ok();
797 intervals.push(Interval {
798 start: current_start.clone(),
799 end,
800 end_inclusive: true,
801 });
802 current_start = None;
803 }
804 Event::Limit(ver) => {
805 let end = semver::Version::parse(ver).ok();
807 intervals.push(Interval {
808 start: current_start.clone(),
809 end,
810 end_inclusive: false,
811 });
812 current_start = None;
813 }
814 }
815 }
816
817 if current_start.is_some() {
819 intervals.push(Interval {
820 start: current_start,
821 end: None,
822 end_inclusive: false,
823 });
824 }
825
826 intervals.into_iter().any(|interval| {
827 if let Some(start) = &interval.start {
828 if v < *start {
829 return false;
830 }
831 }
832
833 match (&interval.end, interval.end_inclusive) {
834 (Some(end), true) => v <= *end,
835 (Some(end), false) => v < *end,
836 (None, _) => true,
837 }
838 })
839 }
840
841 fn matches_ecosystem_range(version: &str, events: &[Event]) -> bool {
844 if events.iter().all(|e| match e {
846 Event::Introduced(v) | Event::Fixed(v) | Event::LastAffected(v) | Event::Limit(v) => {
847 semver::Version::parse(v).is_ok() || v == "0"
848 }
849 }) {
850 return Self::matches_semver_range(version, events);
851 }
852
853 let version_parts = match Self::parse_dotted(version) {
854 Some(p) => p,
855 None => return false,
856 };
857
858 #[derive(Default)]
859 struct Interval {
860 start: Option<Vec<u64>>,
861 end: Option<Vec<u64>>,
862 end_inclusive: bool,
863 }
864
865 let mut intervals: Vec<Interval> = Vec::new();
866 let mut current_start: Option<Vec<u64>> = None;
867
868 for event in events {
869 match event {
870 Event::Introduced(ver) => {
871 current_start = Self::parse_dotted(ver);
872 }
873 Event::Fixed(ver) => {
874 intervals.push(Interval {
875 start: current_start.clone(),
876 end: Self::parse_dotted(ver),
877 end_inclusive: false,
878 });
879 current_start = None;
880 }
881 Event::LastAffected(ver) => {
882 intervals.push(Interval {
883 start: current_start.clone(),
884 end: Self::parse_dotted(ver),
885 end_inclusive: true,
886 });
887 current_start = None;
888 }
889 Event::Limit(ver) => {
890 intervals.push(Interval {
891 start: current_start.clone(),
892 end: Self::parse_dotted(ver),
893 end_inclusive: false,
894 });
895 current_start = None;
896 }
897 }
898 }
899
900 if current_start.is_some() {
901 intervals.push(Interval {
902 start: current_start,
903 end: None,
904 end_inclusive: false,
905 });
906 }
907
908 intervals.into_iter().any(|interval| {
909 if let Some(start) = &interval.start {
910 if Self::cmp_dotted(&version_parts, start) == Ordering::Less {
911 return false;
912 }
913 }
914
915 match (&interval.end, interval.end_inclusive) {
916 (Some(end), true) => Self::cmp_dotted(&version_parts, end) != Ordering::Greater,
917 (Some(end), false) => Self::cmp_dotted(&version_parts, end) == Ordering::Less,
918 (None, _) => true,
919 }
920 })
921 }
922
/// Extracts the numeric components of `v`.
///
/// Every maximal run of ASCII digits becomes one component; all other
/// characters act as separators. Returns `None` when there is no digit at all
/// or when a run does not fit into `u64`.
fn parse_dotted(v: &str) -> Option<Vec<u64>> {
    let components = v
        .split(|c: char| !c.is_ascii_digit())
        .filter(|chunk| !chunk.is_empty())
        .map(|chunk| chunk.parse::<u64>().ok())
        .collect::<Option<Vec<u64>>>()?;

    if components.is_empty() {
        None
    } else {
        Some(components)
    }
}
937
/// Compares two component sequences position by position.
///
/// The shorter sequence is treated as if padded with zeros, so `[1, 2]` and
/// `[1, 2, 0]` compare equal.
fn cmp_dotted(a: &[u64], b: &[u64]) -> Ordering {
    let width = a.len().max(b.len());
    let component = |parts: &[u64], idx: usize| parts.get(idx).copied().unwrap_or(0);

    (0..width)
        .map(|idx| component(a, idx).cmp(&component(b, idx)))
        .find(|ord| *ord != Ordering::Equal)
        .unwrap_or(Ordering::Equal)
}
951
952 async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
954 let cve_ids = Self::extract_cve_ids(advisory);
956
957 if cve_ids.is_empty() {
958 return Ok(());
959 }
960
961 for cve_id in &cve_ids {
963 if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
964 let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
965 enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
966 enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
967 enrichment.is_kev = enrichment.is_kev || data.is_kev;
968 if data.kev_due_date.is_some() {
969 enrichment.kev_due_date = data
970 .kev_due_date
971 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
972 .map(|d| d.with_timezone(&chrono::Utc));
973 }
974 if data.kev_ransomware.is_some() {
975 enrichment.kev_ransomware = data.kev_ransomware;
976 }
977 }
978 }
979
980 Ok(())
981 }
982
983 async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
985 for advisory in advisories.iter_mut() {
986 self.enrich_advisory(advisory).await?;
987 }
988 Ok(())
989 }
990
991 fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
993 let mut cve_ids = Vec::new();
994
995 if advisory.id.starts_with("CVE-") {
996 cve_ids.push(advisory.id.clone());
997 }
998
999 if let Some(aliases) = &advisory.aliases {
1000 for alias in aliases {
1001 if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
1002 cve_ids.push(alias.clone());
1003 }
1004 }
1005 }
1006
1007 cve_ids
1008 }
1009
1010 fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
1012 if options.kev_only {
1014 let is_kev = advisory
1015 .enrichment
1016 .as_ref()
1017 .map(|e| e.is_kev)
1018 .unwrap_or(false);
1019 if !is_kev {
1020 return false;
1021 }
1022 }
1023
1024 if let Some(min_cvss) = options.min_cvss {
1026 let cvss = advisory
1027 .enrichment
1028 .as_ref()
1029 .and_then(|e| e.cvss_v3_score)
1030 .unwrap_or(0.0);
1031 if cvss < min_cvss {
1032 return false;
1033 }
1034 }
1035
1036 if let Some(min_epss) = options.min_epss {
1038 let epss = advisory
1039 .enrichment
1040 .as_ref()
1041 .and_then(|e| e.epss_score)
1042 .unwrap_or(0.0);
1043 if epss < min_epss {
1044 return false;
1045 }
1046 }
1047
1048 if let Some(min_severity) = &options.min_severity {
1050 let severity = advisory
1051 .enrichment
1052 .as_ref()
1053 .and_then(|e| e.cvss_v3_severity)
1054 .unwrap_or(Severity::None);
1055 if severity < *min_severity {
1056 return false;
1057 }
1058 }
1059
1060 true
1061 }
1062
1063 pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
1065 let scores = self.epss_source.fetch_scores(cve_ids).await?;
1066 Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
1067 }
1068
1069 pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
1071 if let Some(data) = self.store.get_enrichment(cve_id).await? {
1073 return Ok(data.is_kev);
1074 }
1075
1076 let entry = self.kev_source.is_kev(cve_id).await?;
1078 Ok(entry.is_some())
1079 }
1080
1081 pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
1119 let source = self.ossindex_source.as_ref().ok_or_else(|| {
1120 AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
1121 })?;
1122
1123 let mut cached_advisories = Vec::new();
1125 let mut cache_misses = Vec::new();
1126
1127 for purl in purls {
1128 let cache_key = Purl::cache_key_from_str(purl);
1129 match self.store.get_ossindex_cache(&cache_key).await {
1130 Ok(Some(cache)) if !cache.is_expired() => {
1131 debug!("OSS Index cache hit for {}", purl);
1132 cached_advisories.extend(cache.advisories);
1133 }
1134 _ => {
1135 debug!("OSS Index cache miss for {}", purl);
1136 cache_misses.push(purl.clone());
1137 }
1138 }
1139 }
1140
1141 if !cache_misses.is_empty() {
1143 debug!("Querying OSS Index for {} cache misses", cache_misses.len());
1144 let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
1145 AdvisoryError::SourceFetch {
1146 source_name: "ossindex".to_string(),
1147 message: e.to_string(),
1148 }
1149 })?;
1150
1151 let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
1153
1154 for (purl, advisories) in &advisory_map {
1156 let cache_key = Purl::cache_key_from_str(purl);
1157 let cache = OssIndexCache::new(advisories.clone());
1158 if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
1159 debug!("Failed to cache OSS Index result for {}: {}", purl, e);
1160 }
1161 }
1162
1163 for advisories in advisory_map.into_values() {
1165 cached_advisories.extend(advisories);
1166 }
1167 }
1168
1169 Ok(cached_advisories)
1170 }
1171
1172 pub async fn query_batch_with_ossindex(
1185 &self,
1186 packages: &[PackageKey],
1187 ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
1188 let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
1189
1190 let (with_version, without_version): (Vec<_>, Vec<_>) =
1192 packages.iter().partition(|p| p.version.is_some());
1193
1194 if !with_version.is_empty() && self.ossindex_source.is_some() {
1196 let purls: Vec<String> = with_version
1197 .iter()
1198 .map(|p| {
1199 Purl::new(&p.ecosystem, &p.name)
1200 .with_version(p.version.as_ref().unwrap())
1201 .to_string()
1202 })
1203 .collect();
1204
1205 match self.query_ossindex(&purls).await {
1206 Ok(advisories) => {
1207 for pkg in &with_version {
1209 let pkg_advisories: Vec<_> = advisories
1210 .iter()
1211 .filter(|a| {
1212 a.affected.iter().any(|aff| {
1213 aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
1214 && aff.package.name == pkg.name
1215 })
1216 })
1217 .cloned()
1218 .collect();
1219 results.insert((*pkg).clone(), pkg_advisories);
1220 }
1221 }
1222 Err(e) => {
1223 warn!("OSS Index query failed, falling back to local store: {}", e);
1224 for pkg in &with_version {
1226 let advisories = if let Some(version) = &pkg.version {
1227 self.matches(&pkg.ecosystem, &pkg.name, version).await?
1228 } else {
1229 self.query(&pkg.ecosystem, &pkg.name).await?
1230 };
1231 results.insert((*pkg).clone(), advisories);
1232 }
1233 }
1234 }
1235 }
1236
1237 for pkg in &without_version {
1239 let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
1240 results.insert((*pkg).clone(), advisories);
1241 }
1242
1243 Ok(results)
1244 }
1245
1246 pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
1250 for purl in purls {
1251 let cache_key = Purl::cache_key_from_str(purl);
1252 self.store.invalidate_ossindex_cache(&cache_key).await?;
1253 }
1254 Ok(())
1255 }
1256
1257 pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
1259 self.store.invalidate_all_ossindex_cache().await?;
1260 Ok(())
1261 }
1262
1263 fn group_advisories_by_purl(
1265 purls: &[String],
1266 advisories: &[Advisory],
1267 ) -> HashMap<String, Vec<Advisory>> {
1268 let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
1269
1270 for purl in purls {
1272 map.insert(purl.clone(), Vec::new());
1273 }
1274
1275 for advisory in advisories {
1276 for affected in &advisory.affected {
1277 for purl in purls {
1278 let Ok(parsed) = Purl::parse(purl) else {
1279 continue;
1280 };
1281
1282 let affected_eco = affected.package.ecosystem.to_lowercase();
1284 let purl_eco = parsed.purl_type.to_lowercase();
1285 let purl_eco_alt = parsed.ecosystem().to_lowercase();
1286 if affected_eco != purl_eco && affected_eco != purl_eco_alt {
1287 continue;
1288 }
1289
1290 if parsed.name != affected.package.name {
1291 continue;
1292 }
1293
1294 if let Some(ver) = parsed.version.as_deref() {
1295 let version_matches = affected.versions.contains(&ver.to_string())
1297 || affected.ranges.iter().any(|r| {
1298 matches!(r.range_type, RangeType::Semver | RangeType::Ecosystem)
1299 && Self::matches_semver_range(ver, &r.events)
1300 });
1301
1302 if !version_matches {
1303 continue;
1304 }
1305 }
1306
1307 map.entry(purl.clone()).or_default().push(advisory.clone());
1308 break;
1309 }
1310 }
1311 }
1312
1313 map
1314 }
1315}