1use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tracing::{debug, error, info, warn};
19
20#[derive(Debug, Clone, Default)]
22pub struct MatchOptions {
23 pub min_cvss: Option<f64>,
25 pub min_epss: Option<f64>,
27 pub kev_only: bool,
29 pub min_severity: Option<Severity>,
31 pub include_enrichment: bool,
33}
34
/// Summary of one `sync_all` run.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// Number of sources that were configured when the sync started.
    pub total_sources: usize,
    /// Number of sources whose sync task finished successfully.
    pub successful_sources: usize,
    /// Number of sources whose sync task reported an error or could not be joined.
    pub failed_sources: usize,
    /// Sum of the advisory counts reported by the successful sources.
    pub total_advisories_synced: usize,
    /// Error message per failed source, keyed by source name.
    pub errors: HashMap<String, String>,
}
49
50pub trait SyncObserver: Send + Sync {
52 fn on_sync_start(&self);
54
55 fn on_source_start(&self, source_name: &str);
57
58 fn on_source_success(&self, source_name: &str, count: usize);
60
61 fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError);
63
64 fn on_sync_complete(&self, stats: &SyncStats);
66}
67
/// [`SyncObserver`] that reports sync progress through `tracing` events.
///
/// The builder installs this observer when none is supplied.
pub struct TracingSyncObserver;
70
71impl SyncObserver for TracingSyncObserver {
72 fn on_sync_start(&self) {
73 info!("Starting full vulnerability sync...");
74 }
75
76 fn on_source_start(&self, source_name: &str) {
77 debug!("Syncing {}...", source_name);
78 }
79
80 fn on_source_success(&self, source_name: &str, count: usize) {
81 if count > 0 {
82 info!(
83 "Successfully synced {} advisories from {}",
84 count, source_name
85 );
86 } else {
87 debug!(
88 "Successfully synced {} advisories from {}",
89 count, source_name
90 );
91 }
92 }
93
94 fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError) {
95 error!("Failed to sync {}: {}", source_name, error);
96 }
97
98 fn on_sync_complete(&self, _stats: &SyncStats) {
99 info!("Sync completed.");
100 }
101}
102
103impl MatchOptions {
104 pub fn with_enrichment() -> Self {
106 Self {
107 include_enrichment: true,
108 ..Default::default()
109 }
110 }
111
112 pub fn high_severity() -> Self {
114 Self {
115 min_severity: Some(Severity::High),
116 include_enrichment: true,
117 ..Default::default()
118 }
119 }
120
121 pub fn exploited_only() -> Self {
123 Self {
124 kev_only: true,
125 include_enrichment: true,
126 ..Default::default()
127 }
128 }
129}
130
/// Identifies a package (optionally pinned to a version) in batch queries.
///
/// Usable as a `HashMap` key: equality and hashing cover all three fields.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct PackageKey {
    /// Ecosystem name, e.g. `"npm"`.
    pub ecosystem: String,
    /// Package name within the ecosystem.
    pub name: String,
    /// Specific version, or `None` to refer to the package as a whole.
    pub version: Option<String>,
}

impl PackageKey {
    /// Creates a key without a version.
    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
        let (ecosystem, name) = (ecosystem.into(), name.into());
        Self {
            ecosystem,
            name,
            version: None,
        }
    }

    /// Creates a key pinned to `version`.
    pub fn with_version(
        ecosystem: impl Into<String>,
        name: impl Into<String>,
        version: impl Into<String>,
    ) -> Self {
        Self {
            version: Some(version.into()),
            ..Self::new(ecosystem, name)
        }
    }
}
165
166pub struct VulnerabilityManagerBuilder {
168 redis_url: Option<String>,
169 store_config: StoreConfig,
170 sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
171 custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
172 ossindex_source: Option<OssIndexSource>,
173 observer: Option<Arc<dyn SyncObserver>>,
174}
175
176impl Default for VulnerabilityManagerBuilder {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl VulnerabilityManagerBuilder {
183 pub fn new() -> Self {
185 Self {
186 redis_url: None,
187 store_config: StoreConfig::default(),
188 sources: Vec::new(),
189 custom_store: None,
190 ossindex_source: None,
191 observer: None,
192 }
193 }
194
195 pub fn redis_url(mut self, url: impl Into<String>) -> Self {
197 self.redis_url = Some(url.into());
198 self
199 }
200
201 pub fn store_config(mut self, config: StoreConfig) -> Self {
203 self.store_config = config;
204 self
205 }
206
207 pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
209 self.custom_store = Some(store);
210 self
211 }
212
213 pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
215 self.sources.push(source);
216 self
217 }
218
219 pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
221 self.sources.push(Arc::new(GHSASource::new(token.into())));
222 self
223 }
224
225 pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
227 self.sources.push(Arc::new(NVDSource::new(api_key)));
228 self
229 }
230
231 pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
233 self.sources.push(Arc::new(OSVSource::new(ecosystems)));
234 self
235 }
236
237 pub fn with_osv_defaults(self) -> Self {
239 self.with_osv(vec![
240 "npm".to_string(),
241 "PyPI".to_string(),
242 "Maven".to_string(),
243 "crates.io".to_string(),
244 "Go".to_string(),
245 "Packagist".to_string(),
246 "RubyGems".to_string(),
247 "NuGet".to_string(),
248 ])
249 }
250
251 pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
256 match OssIndexSource::new(config) {
257 Ok(source) => {
258 self.ossindex_source = Some(source);
259 }
260 Err(e) => {
261 warn!("Failed to configure OSS Index source: {}", e);
262 }
263 }
264 self
265 }
266
267 pub fn with_observer(mut self, observer: Arc<dyn SyncObserver>) -> Self {
269 self.observer = Some(observer);
270 self
271 }
272
273 pub fn build(self) -> Result<VulnerabilityManager> {
275 let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
276 Some(s) => s,
277 None => {
278 let url = self.redis_url.ok_or_else(|| {
279 AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
280 })?;
281 Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
282 }
283 };
284
285 if self.sources.is_empty() {
286 warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
287 }
288
289 Ok(VulnerabilityManager {
290 store,
291 sources: self.sources,
292 kev_source: KevSource::new(),
293 epss_source: EpssSource::new(),
294 ossindex_source: self.ossindex_source,
295 observer: self
296 .observer
297 .unwrap_or_else(|| Arc::new(TracingSyncObserver)),
298 })
299 }
300}
301
302pub struct VulnerabilityManager {
304 store: Arc<dyn AdvisoryStore + Send + Sync>,
305 sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
306 kev_source: KevSource,
307 epss_source: EpssSource,
308 ossindex_source: Option<OssIndexSource>,
309 observer: Arc<dyn SyncObserver>,
310}
311
312impl VulnerabilityManager {
313 pub async fn new(config: Config) -> Result<Self> {
317 let mut builder = VulnerabilityManagerBuilder::new()
318 .redis_url(&config.redis_url)
319 .store_config(config.store.clone());
320
321 builder = builder.with_osv_defaults();
323
324 builder = builder.with_nvd(config.nvd_api_key.clone());
326
327 if let Some(token) = &config.ghsa_token {
329 builder = builder.with_ghsa(token.clone());
330 }
331
332 if config.ossindex.is_some() {
334 builder = builder.with_ossindex(config.ossindex.clone());
335 }
336
337 builder.build()
338 }
339
340 pub fn builder() -> VulnerabilityManagerBuilder {
342 VulnerabilityManagerBuilder::new()
343 }
344
345 pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
347 &self.store
348 }
349
350 pub async fn health_check(&self) -> Result<HealthStatus> {
352 self.store.health_check().await
353 }
354
355 pub async fn sync_all(&self) -> Result<SyncStats> {
357 self.observer.on_sync_start();
358
359 let mut handles = Vec::new();
360 let mut stats = SyncStats {
361 total_sources: self.sources.len(),
362 ..Default::default()
363 };
364
365 for source in &self.sources {
366 let source = source.clone();
367 let store = self.store.clone();
368 let observer = self.observer.clone();
369
370 let handle = tokio::spawn(async move {
371 observer.on_source_start(source.name());
372
373 let last_sync = match store.last_sync(source.name()).await {
374 Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
375 Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
376 Err(_) => None,
377 },
378 _ => None,
379 };
380
381 match source.fetch(last_sync).await {
382 Ok(advisories) => {
383 if !advisories.is_empty() {
384 match store.upsert_batch(&advisories, source.name()).await {
385 Ok(_) => {
386 observer.on_source_success(source.name(), advisories.len());
387 if let Err(e) = store.update_sync_timestamp(source.name()).await
389 {
390 let err = AdvisoryError::source_fetch(
391 source.name(),
392 format!("Failed to update timestamp: {}", e),
393 );
394 observer.on_source_error(source.name(), &err);
395 }
398 Ok((source.name().to_string(), advisories.len()))
399 }
400 Err(e) => {
401 observer.on_source_error(source.name(), &e);
403 Err((source.name().to_string(), e.to_string()))
404 }
405 }
406 } else {
407 observer.on_source_success(source.name(), 0);
408 if let Err(e) = store.update_sync_timestamp(source.name()).await {
410 let err = AdvisoryError::source_fetch(
411 source.name(),
412 format!("Failed to update timestamp: {}", e),
413 );
414 observer.on_source_error(source.name(), &err);
415 }
416 Ok((source.name().to_string(), 0))
417 }
418 }
419 Err(e) => {
420 observer.on_source_error(source.name(), &e);
421 Err((source.name().to_string(), e.to_string()))
422 }
423 }
424 });
425 handles.push(handle);
426 }
427
428 for handle in handles {
430 match handle.await {
431 Ok(result) => match result {
432 Ok((_, count)) => {
433 stats.successful_sources += 1;
434 stats.total_advisories_synced += count;
435 }
436 Err((name, error)) => {
437 stats.failed_sources += 1;
438 stats.errors.insert(name, error);
439 }
440 },
441 Err(e) => {
442 error!("Task join error: {}", e);
444 stats.failed_sources += 1;
445 stats
446 .errors
447 .insert("unknown".to_string(), format!("Task join error: {}", e));
448 }
449 }
450 }
451
452 self.observer.on_sync_complete(&stats);
453 Ok(stats)
454 }
455
456 pub async fn reset_sync(&self, source: &str) -> Result<()> {
460 self.store.reset_sync_timestamp(source).await
461 }
462
463 pub async fn reset_all_syncs(&self) -> Result<()> {
465 for source in &self.sources {
466 self.store.reset_sync_timestamp(source.name()).await?;
467 }
468 Ok(())
469 }
470
471 pub async fn sync_enrichment(&self) -> Result<()> {
473 self.sync_enrichment_with_cves(&[]).await
474 }
475
476 pub async fn sync_enrichment_with_cves(&self, extra_cves: &[String]) -> Result<()> {
478 debug!("Syncing enrichment data (KEV, EPSS)...");
479
480 let mut enrichment: HashMap<String, EnrichmentData> = HashMap::new();
481
482 match self.kev_source.fetch_catalog().await {
484 Ok(kev_entries) => {
485 debug!("Processing {} KEV entries", kev_entries.len());
486 for (cve_id, entry) in kev_entries {
487 let data = enrichment
488 .entry(cve_id.clone())
489 .or_insert_with(|| EnrichmentData {
490 epss_score: None,
491 epss_percentile: None,
492 is_kev: false,
493 kev_due_date: None,
494 kev_date_added: None,
495 kev_ransomware: None,
496 updated_at: String::new(),
497 });
498
499 data.is_kev = true;
500 data.kev_due_date = entry.due_date_utc().map(|d| d.to_rfc3339());
501 data.kev_date_added = entry.date_added_utc().map(|d| d.to_rfc3339());
502 data.kev_ransomware = Some(entry.is_ransomware_related());
503 }
504 }
505 Err(e) => {
506 error!("Failed to fetch KEV catalog: {}", e);
507 }
508 }
509
510 let epss_targets = Self::collect_enrichment_targets(&enrichment, extra_cves);
512 if !epss_targets.is_empty() {
513 match self
514 .epss_source
515 .fetch_scores_batch(&epss_targets, 200)
516 .await
517 {
518 Ok(scores) => {
519 Self::merge_epss_scores(&mut enrichment, scores);
520 }
521 Err(e) => {
522 warn!("Failed to fetch EPSS scores: {}", e);
523 }
524 }
525 }
526
527 if !enrichment.is_empty() {
529 let now = chrono::Utc::now().to_rfc3339();
530 for (cve_id, mut data) in enrichment {
531 if data.updated_at.is_empty() {
532 data.updated_at = now.clone();
533 }
534 if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
535 debug!("Failed to store enrichment for {}: {}", cve_id, e);
536 }
537 }
538 }
539
540 Ok(())
541 }
542
543 fn collect_enrichment_targets(
545 current: &HashMap<String, EnrichmentData>,
546 extra: &[String],
547 ) -> Vec<String> {
548 let mut set: std::collections::HashSet<String> = current.keys().cloned().collect();
549 for c in extra {
550 set.insert(c.clone());
551 }
552 set.into_iter().collect()
553 }
554
555 fn merge_epss_scores(
557 enrichment: &mut HashMap<String, EnrichmentData>,
558 scores: HashMap<String, crate::sources::epss::EpssScore>,
559 ) {
560 for (cve_id, score) in scores {
561 let data = enrichment
562 .entry(cve_id.clone())
563 .or_insert_with(|| EnrichmentData {
564 epss_score: None,
565 epss_percentile: None,
566 is_kev: false,
567 kev_due_date: None,
568 kev_date_added: None,
569 kev_ransomware: None,
570 updated_at: String::new(),
571 });
572
573 data.epss_score = Some(score.epss);
574 data.epss_percentile = Some(score.percentile);
575 if let Some(date) = score.date_utc() {
576 data.updated_at = date.to_rfc3339();
577 }
578 }
579 }
580
581 pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
583 let advisories = self.store.get_by_package(ecosystem, package).await?;
584 Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
585 }
586
587 pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
589 let mut advisories = self.query(ecosystem, package).await?;
590 self.enrich_advisories(&mut advisories).await?;
591 Ok(advisories)
592 }
593
594 pub async fn query_batch(
598 &self,
599 packages: &[PackageKey],
600 ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
601 use futures_util::future::join_all;
602
603 let tasks: Vec<_> = packages
604 .iter()
605 .map(|pkg| {
606 let pkg = pkg.clone();
607 let ecosystem = pkg.ecosystem.clone();
608 let name = pkg.name.clone();
609 let version = pkg.version.clone();
610 let store = self.store.clone();
611
612 async move {
613 let advisories = if let Some(ver) = &version {
614 let all = store.get_by_package(&ecosystem, &name).await?;
616 let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
617 Self::filter_by_version(aggregated, &ecosystem, &name, ver)
618 } else {
619 let all = store.get_by_package(&ecosystem, &name).await?;
620 crate::aggregator::ReportAggregator::aggregate(all)
621 };
622 Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
623 }
624 })
625 .collect();
626
627 let results: Vec<_> = join_all(tasks).await;
628
629 let mut map = HashMap::new();
630 for result in results {
631 match result {
632 Ok((pkg, advisories)) => {
633 map.insert(pkg, advisories);
634 }
635 Err(e) => {
636 warn!("Batch query error: {}", e);
637 }
638 }
639 }
640
641 Ok(map)
642 }
643
644 fn filter_by_version(
646 advisories: Vec<Advisory>,
647 ecosystem: &str,
648 package: &str,
649 version: &str,
650 ) -> Vec<Advisory> {
651 advisories
652 .into_iter()
653 .filter(|advisory| {
654 for affected in &advisory.affected {
655 if affected.package.name != package || affected.package.ecosystem != ecosystem {
656 continue;
657 }
658
659 if affected.versions.contains(&version.to_string()) {
661 return true;
662 }
663
664 for range in &affected.ranges {
666 match range.range_type {
667 RangeType::Semver => {
668 if Self::matches_semver_range(version, &range.events) {
669 return true;
670 }
671 }
672 RangeType::Ecosystem => {
673 if Self::matches_ecosystem_range(version, &range.events) {
674 return true;
675 }
676 }
677 RangeType::Git => {}
678 }
679 }
680 }
681 false
682 })
683 .collect()
684 }
685
686 pub async fn matches(
688 &self,
689 ecosystem: &str,
690 package: &str,
691 version: &str,
692 ) -> Result<Vec<Advisory>> {
693 self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
694 .await
695 }
696
697 pub async fn matches_with_options(
699 &self,
700 ecosystem: &str,
701 package: &str,
702 version: &str,
703 options: &MatchOptions,
704 ) -> Result<Vec<Advisory>> {
705 let advisories = self.query(ecosystem, package).await?;
706 let mut matched = Vec::new();
707
708 for mut advisory in advisories {
709 let mut is_vulnerable = false;
710 for affected in &advisory.affected {
711 if affected.package.name != package || affected.package.ecosystem != ecosystem {
712 continue;
713 }
714
715 if affected.versions.contains(&version.to_string()) {
717 is_vulnerable = true;
718 break;
719 }
720
721 for range in &affected.ranges {
723 match range.range_type {
724 RangeType::Semver => {
725 if Self::matches_semver_range(version, &range.events) {
726 is_vulnerable = true;
727 break;
728 }
729 }
730 RangeType::Ecosystem => {
731 if Self::matches_ecosystem_range(version, &range.events) {
732 is_vulnerable = true;
733 break;
734 }
735 }
736 RangeType::Git => {
737 }
739 }
740 }
741 if is_vulnerable {
742 break;
743 }
744 }
745
746 if is_vulnerable {
747 if options.include_enrichment {
749 self.enrich_advisory(&mut advisory).await?;
750 }
751
752 if self.advisory_passes_filters(&advisory, options) {
754 matched.push(advisory);
755 }
756 }
757 }
758
759 Ok(matched)
760 }
761
762 fn matches_semver_range(version: &str, events: &[Event]) -> bool {
766 let Ok(v) = semver::Version::parse(version) else {
767 return false;
768 };
769
770 #[derive(Default)]
771 struct Interval {
772 start: Option<semver::Version>,
773 end: Option<semver::Version>,
774 end_inclusive: bool,
775 }
776
777 let mut intervals: Vec<Interval> = Vec::new();
778 let mut current_start: Option<semver::Version> = None;
779
780 for event in events {
781 match event {
782 Event::Introduced(ver) => {
783 if let Ok(parsed) = semver::Version::parse(ver) {
784 current_start = Some(parsed);
785 } else if ver == "0" {
786 current_start = Some(semver::Version::new(0, 0, 0));
787 }
788 }
789 Event::Fixed(ver) => {
790 let end = semver::Version::parse(ver).ok();
791 intervals.push(Interval {
792 start: current_start.clone(),
793 end,
794 end_inclusive: false,
795 });
796 current_start = None;
797 }
798 Event::LastAffected(ver) => {
799 let end = semver::Version::parse(ver).ok();
800 intervals.push(Interval {
801 start: current_start.clone(),
802 end,
803 end_inclusive: true,
804 });
805 current_start = None;
806 }
807 Event::Limit(ver) => {
808 let end = semver::Version::parse(ver).ok();
810 intervals.push(Interval {
811 start: current_start.clone(),
812 end,
813 end_inclusive: false,
814 });
815 current_start = None;
816 }
817 }
818 }
819
820 if current_start.is_some() {
822 intervals.push(Interval {
823 start: current_start,
824 end: None,
825 end_inclusive: false,
826 });
827 }
828
829 intervals.into_iter().any(|interval| {
830 if let Some(start) = &interval.start {
831 if v < *start {
832 return false;
833 }
834 }
835
836 match (&interval.end, interval.end_inclusive) {
837 (Some(end), true) => v <= *end,
838 (Some(end), false) => v < *end,
839 (None, _) => true,
840 }
841 })
842 }
843
844 fn matches_ecosystem_range(version: &str, events: &[Event]) -> bool {
847 if events.iter().all(|e| match e {
849 Event::Introduced(v) | Event::Fixed(v) | Event::LastAffected(v) | Event::Limit(v) => {
850 semver::Version::parse(v).is_ok() || v == "0"
851 }
852 }) {
853 return Self::matches_semver_range(version, events);
854 }
855
856 let version_parts = match Self::parse_dotted(version) {
857 Some(p) => p,
858 None => return false,
859 };
860
861 #[derive(Default)]
862 struct Interval {
863 start: Option<Vec<u64>>,
864 end: Option<Vec<u64>>,
865 end_inclusive: bool,
866 }
867
868 let mut intervals: Vec<Interval> = Vec::new();
869 let mut current_start: Option<Vec<u64>> = None;
870
871 for event in events {
872 match event {
873 Event::Introduced(ver) => {
874 current_start = Self::parse_dotted(ver);
875 }
876 Event::Fixed(ver) => {
877 intervals.push(Interval {
878 start: current_start.clone(),
879 end: Self::parse_dotted(ver),
880 end_inclusive: false,
881 });
882 current_start = None;
883 }
884 Event::LastAffected(ver) => {
885 intervals.push(Interval {
886 start: current_start.clone(),
887 end: Self::parse_dotted(ver),
888 end_inclusive: true,
889 });
890 current_start = None;
891 }
892 Event::Limit(ver) => {
893 intervals.push(Interval {
894 start: current_start.clone(),
895 end: Self::parse_dotted(ver),
896 end_inclusive: false,
897 });
898 current_start = None;
899 }
900 }
901 }
902
903 if current_start.is_some() {
904 intervals.push(Interval {
905 start: current_start,
906 end: None,
907 end_inclusive: false,
908 });
909 }
910
911 intervals.into_iter().any(|interval| {
912 if let Some(start) = &interval.start {
913 if Self::cmp_dotted(&version_parts, start) == Ordering::Less {
914 return false;
915 }
916 }
917
918 match (&interval.end, interval.end_inclusive) {
919 (Some(end), true) => Self::cmp_dotted(&version_parts, end) != Ordering::Greater,
920 (Some(end), false) => Self::cmp_dotted(&version_parts, end) == Ordering::Less,
921 (None, _) => true,
922 }
923 })
924 }
925
/// Extracts the numeric components of a version string.
///
/// Every run of ASCII digits becomes one component; all other characters act
/// as separators (so `"1.2-rc3"` yields `[1, 2, 3]`). Returns `None` when the
/// string contains no digits or a component does not fit in a `u64`.
fn parse_dotted(v: &str) -> Option<Vec<u64>> {
    let components = v
        .split(|c: char| !c.is_ascii_digit())
        .filter(|chunk| !chunk.is_empty())
        .map(|chunk| chunk.parse::<u64>().ok())
        .collect::<Option<Vec<u64>>>()?;

    (!components.is_empty()).then_some(components)
}
940
/// Compares two numeric version sequences component by component.
///
/// The shorter sequence is treated as padded with zeros, so `[1, 2]` and
/// `[1, 2, 0]` compare as equal.
fn cmp_dotted(a: &[u64], b: &[u64]) -> Ordering {
    let width = a.len().max(b.len());

    (0..width)
        .map(|idx| {
            let lhs = a.get(idx).copied().unwrap_or(0);
            let rhs = b.get(idx).copied().unwrap_or(0);
            lhs.cmp(&rhs)
        })
        .find(|ordering| ordering.is_ne())
        .unwrap_or(Ordering::Equal)
}
954
955 async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
957 let cve_ids = Self::extract_cve_ids(advisory);
959
960 if cve_ids.is_empty() {
961 return Ok(());
962 }
963
964 for cve_id in &cve_ids {
966 if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
967 let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
968 enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
969 enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
970 enrichment.is_kev = enrichment.is_kev || data.is_kev;
971 if data.kev_due_date.is_some() {
972 enrichment.kev_due_date = data
973 .kev_due_date
974 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
975 .map(|d| d.with_timezone(&chrono::Utc));
976 }
977 if data.kev_ransomware.is_some() {
978 enrichment.kev_ransomware = data.kev_ransomware;
979 }
980 }
981 }
982
983 Ok(())
984 }
985
986 async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
988 for advisory in advisories.iter_mut() {
989 self.enrich_advisory(advisory).await?;
990 }
991 Ok(())
992 }
993
994 fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
996 let mut cve_ids = Vec::new();
997
998 if advisory.id.starts_with("CVE-") {
999 cve_ids.push(advisory.id.clone());
1000 }
1001
1002 if let Some(aliases) = &advisory.aliases {
1003 for alias in aliases {
1004 if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
1005 cve_ids.push(alias.clone());
1006 }
1007 }
1008 }
1009
1010 cve_ids
1011 }
1012
1013 fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
1015 if options.kev_only {
1017 let is_kev = advisory
1018 .enrichment
1019 .as_ref()
1020 .map(|e| e.is_kev)
1021 .unwrap_or(false);
1022 if !is_kev {
1023 return false;
1024 }
1025 }
1026
1027 if let Some(min_cvss) = options.min_cvss {
1029 let cvss = advisory
1030 .enrichment
1031 .as_ref()
1032 .and_then(|e| e.cvss_v3_score)
1033 .unwrap_or(0.0);
1034 if cvss < min_cvss {
1035 return false;
1036 }
1037 }
1038
1039 if let Some(min_epss) = options.min_epss {
1041 let epss = advisory
1042 .enrichment
1043 .as_ref()
1044 .and_then(|e| e.epss_score)
1045 .unwrap_or(0.0);
1046 if epss < min_epss {
1047 return false;
1048 }
1049 }
1050
1051 if let Some(min_severity) = &options.min_severity {
1053 let severity = advisory
1054 .enrichment
1055 .as_ref()
1056 .and_then(|e| e.cvss_v3_severity)
1057 .unwrap_or(Severity::None);
1058 if severity < *min_severity {
1059 return false;
1060 }
1061 }
1062
1063 true
1064 }
1065
1066 pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
1068 let scores = self.epss_source.fetch_scores(cve_ids).await?;
1069 Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
1070 }
1071
1072 pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
1074 if let Some(data) = self.store.get_enrichment(cve_id).await? {
1076 return Ok(data.is_kev);
1077 }
1078
1079 let entry = self.kev_source.is_kev(cve_id).await?;
1081 Ok(entry.is_some())
1082 }
1083
1084 pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
1122 let source = self.ossindex_source.as_ref().ok_or_else(|| {
1123 AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
1124 })?;
1125
1126 let mut cached_advisories = Vec::new();
1128 let mut cache_misses = Vec::new();
1129
1130 for purl in purls {
1131 let cache_key = Purl::cache_key_from_str(purl);
1132 match self.store.get_ossindex_cache(&cache_key).await {
1133 Ok(Some(cache)) if !cache.is_expired() => {
1134 debug!("OSS Index cache hit for {}", purl);
1135 cached_advisories.extend(cache.advisories);
1136 }
1137 _ => {
1138 debug!("OSS Index cache miss for {}", purl);
1139 cache_misses.push(purl.clone());
1140 }
1141 }
1142 }
1143
1144 if !cache_misses.is_empty() {
1146 debug!("Querying OSS Index for {} cache misses", cache_misses.len());
1147 let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
1148 AdvisoryError::SourceFetch {
1149 source_name: "ossindex".to_string(),
1150 message: e.to_string(),
1151 }
1152 })?;
1153
1154 let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
1156
1157 for (purl, advisories) in &advisory_map {
1159 let cache_key = Purl::cache_key_from_str(purl);
1160 let cache = OssIndexCache::new(advisories.clone());
1161 if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
1162 debug!("Failed to cache OSS Index result for {}: {}", purl, e);
1163 }
1164 }
1165
1166 for advisories in advisory_map.into_values() {
1168 cached_advisories.extend(advisories);
1169 }
1170 }
1171
1172 Ok(cached_advisories)
1173 }
1174
1175 pub async fn query_batch_with_ossindex(
1188 &self,
1189 packages: &[PackageKey],
1190 ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
1191 let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
1192
1193 let (with_version, without_version): (Vec<_>, Vec<_>) =
1195 packages.iter().partition(|p| p.version.is_some());
1196
1197 if !with_version.is_empty() && self.ossindex_source.is_some() {
1199 let purls: Vec<String> = with_version
1200 .iter()
1201 .map(|p| {
1202 Purl::new(&p.ecosystem, &p.name)
1203 .with_version(p.version.as_ref().unwrap())
1204 .to_string()
1205 })
1206 .collect();
1207
1208 match self.query_ossindex(&purls).await {
1209 Ok(advisories) => {
1210 for pkg in &with_version {
1212 let pkg_advisories: Vec<_> = advisories
1213 .iter()
1214 .filter(|a| {
1215 a.affected.iter().any(|aff| {
1216 aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
1217 && aff.package.name == pkg.name
1218 })
1219 })
1220 .cloned()
1221 .collect();
1222 results.insert((*pkg).clone(), pkg_advisories);
1223 }
1224 }
1225 Err(e) => {
1226 warn!("OSS Index query failed, falling back to local store: {}", e);
1227 for pkg in &with_version {
1229 let advisories = if let Some(version) = &pkg.version {
1230 self.matches(&pkg.ecosystem, &pkg.name, version).await?
1231 } else {
1232 self.query(&pkg.ecosystem, &pkg.name).await?
1233 };
1234 results.insert((*pkg).clone(), advisories);
1235 }
1236 }
1237 }
1238 }
1239
1240 for pkg in &without_version {
1242 let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
1243 results.insert((*pkg).clone(), advisories);
1244 }
1245
1246 Ok(results)
1247 }
1248
1249 pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
1253 for purl in purls {
1254 let cache_key = Purl::cache_key_from_str(purl);
1255 self.store.invalidate_ossindex_cache(&cache_key).await?;
1256 }
1257 Ok(())
1258 }
1259
1260 pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
1262 self.store.invalidate_all_ossindex_cache().await?;
1263 Ok(())
1264 }
1265
1266 pub async fn suggest_remediation(
1295 &self,
1296 ecosystem: &str,
1297 package: &str,
1298 current_version: &str,
1299 ) -> Result<crate::remediation::Remediation> {
1300 let advisories = self.matches(ecosystem, package, current_version).await?;
1302
1303 let remediation = crate::remediation::build_remediation(
1305 ecosystem,
1306 package,
1307 current_version,
1308 &advisories,
1309 None, Self::matches_semver_range,
1311 );
1312
1313 Ok(remediation)
1314 }
1315
1316 pub async fn suggest_remediation_with_registry(
1344 &self,
1345 ecosystem: &str,
1346 package: &str,
1347 current_version: &str,
1348 registry: &dyn crate::version_registry::VersionRegistry,
1349 ) -> Result<crate::remediation::Remediation> {
1350 let advisories = self.matches(ecosystem, package, current_version).await?;
1352
1353 let available_versions = match registry.get_versions(ecosystem, package).await {
1355 Ok(versions) => Some(versions),
1356 Err(e) => {
1357 warn!(
1358 "Failed to fetch versions from registry, using advisory data only: {}",
1359 e
1360 );
1361 None
1362 }
1363 };
1364
1365 let remediation = crate::remediation::build_remediation(
1367 ecosystem,
1368 package,
1369 current_version,
1370 &advisories,
1371 available_versions.as_deref(),
1372 Self::matches_semver_range,
1373 );
1374
1375 Ok(remediation)
1376 }
1377
1378 fn group_advisories_by_purl(
1380 purls: &[String],
1381 advisories: &[Advisory],
1382 ) -> HashMap<String, Vec<Advisory>> {
1383 let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
1384
1385 for purl in purls {
1387 map.insert(purl.clone(), Vec::new());
1388 }
1389
1390 for advisory in advisories {
1391 for affected in &advisory.affected {
1392 for purl in purls {
1393 let Ok(parsed) = Purl::parse(purl) else {
1394 continue;
1395 };
1396
1397 let affected_eco = affected.package.ecosystem.to_lowercase();
1399 let purl_eco = parsed.purl_type.to_lowercase();
1400 let purl_eco_alt = parsed.ecosystem().to_lowercase();
1401 if affected_eco != purl_eco && affected_eco != purl_eco_alt {
1402 continue;
1403 }
1404
1405 if parsed.name != affected.package.name {
1406 continue;
1407 }
1408
1409 if let Some(ver) = parsed.version.as_deref() {
1410 let version_matches = affected.versions.contains(&ver.to_string())
1412 || affected.ranges.iter().any(|r| {
1413 matches!(r.range_type, RangeType::Semver | RangeType::Ecosystem)
1414 && Self::matches_semver_range(ver, &r.events)
1415 });
1416
1417 if !version_matches {
1418 continue;
1419 }
1420 }
1421
1422 map.entry(purl.clone()).or_default().push(advisory.clone());
1423 break;
1424 }
1425 }
1426 }
1427
1428 map
1429 }
1430}