1use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tracing::{debug, error, info, warn};
18
19#[derive(Debug, Clone, Default)]
21pub struct MatchOptions {
22 pub min_cvss: Option<f64>,
24 pub min_epss: Option<f64>,
26 pub kev_only: bool,
28 pub min_severity: Option<Severity>,
30 pub include_enrichment: bool,
32}
33
34impl MatchOptions {
35 pub fn with_enrichment() -> Self {
37 Self {
38 include_enrichment: true,
39 ..Default::default()
40 }
41 }
42
43 pub fn high_severity() -> Self {
45 Self {
46 min_severity: Some(Severity::High),
47 include_enrichment: true,
48 ..Default::default()
49 }
50 }
51
52 pub fn exploited_only() -> Self {
54 Self {
55 kev_only: true,
56 include_enrichment: true,
57 ..Default::default()
58 }
59 }
60}
61
62#[derive(Debug, Clone, Hash, PartialEq, Eq)]
64pub struct PackageKey {
65 pub ecosystem: String,
67 pub name: String,
69 pub version: Option<String>,
71}
72
73impl PackageKey {
74 pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
76 Self {
77 ecosystem: ecosystem.into(),
78 name: name.into(),
79 version: None,
80 }
81 }
82
83 pub fn with_version(
85 ecosystem: impl Into<String>,
86 name: impl Into<String>,
87 version: impl Into<String>,
88 ) -> Self {
89 Self {
90 ecosystem: ecosystem.into(),
91 name: name.into(),
92 version: Some(version.into()),
93 }
94 }
95}
96
97pub struct VulnerabilityManagerBuilder {
99 redis_url: Option<String>,
100 store_config: StoreConfig,
101 sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
102 custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
103 ossindex_source: Option<OssIndexSource>,
104}
105
106impl Default for VulnerabilityManagerBuilder {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl VulnerabilityManagerBuilder {
113 pub fn new() -> Self {
115 Self {
116 redis_url: None,
117 store_config: StoreConfig::default(),
118 sources: Vec::new(),
119 custom_store: None,
120 ossindex_source: None,
121 }
122 }
123
124 pub fn redis_url(mut self, url: impl Into<String>) -> Self {
126 self.redis_url = Some(url.into());
127 self
128 }
129
130 pub fn store_config(mut self, config: StoreConfig) -> Self {
132 self.store_config = config;
133 self
134 }
135
136 pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
138 self.custom_store = Some(store);
139 self
140 }
141
142 pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
144 self.sources.push(source);
145 self
146 }
147
148 pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
150 self.sources.push(Arc::new(GHSASource::new(token.into())));
151 self
152 }
153
154 pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
156 self.sources.push(Arc::new(NVDSource::new(api_key)));
157 self
158 }
159
160 pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
162 self.sources.push(Arc::new(OSVSource::new(ecosystems)));
163 self
164 }
165
166 pub fn with_osv_defaults(self) -> Self {
168 self.with_osv(vec![
169 "npm".to_string(),
170 "PyPI".to_string(),
171 "Maven".to_string(),
172 "crates.io".to_string(),
173 "Go".to_string(),
174 "Packagist".to_string(),
175 "RubyGems".to_string(),
176 "NuGet".to_string(),
177 ])
178 }
179
180 pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
185 match OssIndexSource::new(config) {
186 Ok(source) => {
187 self.ossindex_source = Some(source);
188 }
189 Err(e) => {
190 warn!("Failed to configure OSS Index source: {}", e);
191 }
192 }
193 self
194 }
195
196 pub fn build(self) -> Result<VulnerabilityManager> {
198 let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
199 Some(s) => s,
200 None => {
201 let url = self.redis_url.ok_or_else(|| {
202 AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
203 })?;
204 Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
205 }
206 };
207
208 if self.sources.is_empty() {
209 warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
210 }
211
212 Ok(VulnerabilityManager {
213 store,
214 sources: self.sources,
215 kev_source: KevSource::new(),
216 epss_source: EpssSource::new(),
217 ossindex_source: self.ossindex_source,
218 })
219 }
220}
221
222pub struct VulnerabilityManager {
224 store: Arc<dyn AdvisoryStore + Send + Sync>,
225 sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
226 kev_source: KevSource,
227 epss_source: EpssSource,
228 ossindex_source: Option<OssIndexSource>,
229}
230
231impl VulnerabilityManager {
232 pub async fn new(config: Config) -> Result<Self> {
236 let mut builder = VulnerabilityManagerBuilder::new()
237 .redis_url(&config.redis_url)
238 .store_config(config.store.clone());
239
240 builder = builder.with_osv_defaults();
242
243 builder = builder.with_nvd(config.nvd_api_key.clone());
245
246 if let Some(token) = &config.ghsa_token {
248 builder = builder.with_ghsa(token.clone());
249 }
250
251 if config.ossindex.is_some() {
253 builder = builder.with_ossindex(config.ossindex.clone());
254 }
255
256 builder.build()
257 }
258
259 pub fn builder() -> VulnerabilityManagerBuilder {
261 VulnerabilityManagerBuilder::new()
262 }
263
264 pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
266 &self.store
267 }
268
269 pub async fn health_check(&self) -> Result<HealthStatus> {
271 self.store.health_check().await
272 }
273
274 pub async fn sync_all(&self) -> Result<()> {
276 info!("Starting full vulnerability sync...");
277
278 let mut handles = Vec::new();
279
280 for source in &self.sources {
281 let source = source.clone();
282 let store = self.store.clone();
283
284 let handle = tokio::spawn(async move {
285 let last_sync = match store.last_sync(source.name()).await {
286 Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
287 Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
288 Err(_) => None,
289 },
290 _ => None,
291 };
292
293 if let Some(since) = last_sync {
294 info!("Syncing {} since {}", source.name(), since);
295 } else {
296 info!("Syncing {} (full)", source.name());
297 }
298
299 match source.fetch(last_sync).await {
300 Ok(advisories) => {
301 if !advisories.is_empty() {
302 match store.upsert_batch(&advisories, source.name()).await {
303 Ok(_) => {
304 info!(
305 "Successfully synced {} advisories from {}",
306 advisories.len(),
307 source.name()
308 );
309 if let Err(e) = store.update_sync_timestamp(source.name()).await
311 {
312 error!(
313 "Failed to update sync timestamp for {}: {}",
314 source.name(),
315 e
316 );
317 }
318 }
319 Err(e) => {
320 error!(
321 "Failed to store advisories for {}: {}",
322 source.name(),
323 e
324 );
325 }
327 }
328 } else {
329 info!("No new advisories for {}", source.name());
330 if let Err(e) = store.update_sync_timestamp(source.name()).await {
332 error!(
333 "Failed to update sync timestamp for {}: {}",
334 source.name(),
335 e
336 );
337 }
338 }
339 }
340 Err(e) => {
341 error!("Failed to fetch from {}: {}", source.name(), e);
342 }
344 }
345 });
346 handles.push(handle);
347 }
348
349 for handle in handles {
351 if let Err(e) = handle.await {
352 error!("Task join error: {}", e);
353 }
354 }
355
356 info!("Sync completed.");
357 Ok(())
358 }
359
360 pub async fn reset_sync(&self, source: &str) -> Result<()> {
364 self.store.reset_sync_timestamp(source).await
365 }
366
367 pub async fn reset_all_syncs(&self) -> Result<()> {
369 for source in &self.sources {
370 self.store.reset_sync_timestamp(source.name()).await?;
371 }
372 Ok(())
373 }
374
375 pub async fn sync_enrichment(&self) -> Result<()> {
377 info!("Syncing enrichment data (KEV, EPSS)...");
378
379 match self.kev_source.fetch_catalog().await {
381 Ok(kev_entries) => {
382 info!("Processing {} KEV entries", kev_entries.len());
383 for (cve_id, entry) in kev_entries {
384 let data = EnrichmentData {
385 epss_score: None,
386 epss_percentile: None,
387 is_kev: true,
388 kev_due_date: entry.due_date_utc().map(|d| d.to_rfc3339()),
389 kev_date_added: entry.date_added_utc().map(|d| d.to_rfc3339()),
390 kev_ransomware: Some(entry.is_ransomware_related()),
391 updated_at: chrono::Utc::now().to_rfc3339(),
392 };
393 if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
394 debug!("Failed to store KEV enrichment for {}: {}", cve_id, e);
395 }
396 }
397 }
398 Err(e) => {
399 error!("Failed to fetch KEV catalog: {}", e);
400 }
401 }
402
403 Ok(())
404 }
405
406 pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
408 let advisories = self.store.get_by_package(ecosystem, package).await?;
409 Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
410 }
411
412 pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
414 let mut advisories = self.query(ecosystem, package).await?;
415 self.enrich_advisories(&mut advisories).await?;
416 Ok(advisories)
417 }
418
419 pub async fn query_batch(
423 &self,
424 packages: &[PackageKey],
425 ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
426 use futures_util::future::join_all;
427
428 let tasks: Vec<_> = packages
429 .iter()
430 .map(|pkg| {
431 let pkg = pkg.clone();
432 let ecosystem = pkg.ecosystem.clone();
433 let name = pkg.name.clone();
434 let version = pkg.version.clone();
435 let store = self.store.clone();
436
437 async move {
438 let advisories = if let Some(ver) = &version {
439 let all = store.get_by_package(&ecosystem, &name).await?;
441 let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
442 Self::filter_by_version(aggregated, &ecosystem, &name, ver)
443 } else {
444 let all = store.get_by_package(&ecosystem, &name).await?;
445 crate::aggregator::ReportAggregator::aggregate(all)
446 };
447 Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
448 }
449 })
450 .collect();
451
452 let results: Vec<_> = join_all(tasks).await;
453
454 let mut map = HashMap::new();
455 for result in results {
456 match result {
457 Ok((pkg, advisories)) => {
458 map.insert(pkg, advisories);
459 }
460 Err(e) => {
461 warn!("Batch query error: {}", e);
462 }
463 }
464 }
465
466 Ok(map)
467 }
468
469 fn filter_by_version(
471 advisories: Vec<Advisory>,
472 ecosystem: &str,
473 package: &str,
474 version: &str,
475 ) -> Vec<Advisory> {
476 advisories
477 .into_iter()
478 .filter(|advisory| {
479 for affected in &advisory.affected {
480 if affected.package.name != package || affected.package.ecosystem != ecosystem {
481 continue;
482 }
483
484 if affected.versions.contains(&version.to_string()) {
486 return true;
487 }
488
489 for range in &affected.ranges {
491 match range.range_type {
492 RangeType::Semver | RangeType::Ecosystem => {
493 if Self::matches_semver_range(version, &range.events) {
494 return true;
495 }
496 }
497 RangeType::Git => {}
498 }
499 }
500 }
501 false
502 })
503 .collect()
504 }
505
506 pub async fn matches(
508 &self,
509 ecosystem: &str,
510 package: &str,
511 version: &str,
512 ) -> Result<Vec<Advisory>> {
513 self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
514 .await
515 }
516
517 pub async fn matches_with_options(
519 &self,
520 ecosystem: &str,
521 package: &str,
522 version: &str,
523 options: &MatchOptions,
524 ) -> Result<Vec<Advisory>> {
525 let advisories = self.query(ecosystem, package).await?;
526 let mut matched = Vec::new();
527
528 for mut advisory in advisories {
529 let mut is_vulnerable = false;
530 for affected in &advisory.affected {
531 if affected.package.name != package || affected.package.ecosystem != ecosystem {
532 continue;
533 }
534
535 if affected.versions.contains(&version.to_string()) {
537 is_vulnerable = true;
538 break;
539 }
540
541 for range in &affected.ranges {
543 match range.range_type {
544 RangeType::Semver => {
545 if Self::matches_semver_range(version, &range.events) {
546 is_vulnerable = true;
547 break;
548 }
549 }
550 RangeType::Ecosystem => {
551 if Self::matches_semver_range(version, &range.events) {
553 is_vulnerable = true;
554 break;
555 }
556 }
557 RangeType::Git => {
558 }
560 }
561 }
562 if is_vulnerable {
563 break;
564 }
565 }
566
567 if is_vulnerable {
568 if options.include_enrichment {
570 self.enrich_advisory(&mut advisory).await?;
571 }
572
573 if self.advisory_passes_filters(&advisory, options) {
575 matched.push(advisory);
576 }
577 }
578 }
579
580 Ok(matched)
581 }
582
583 fn matches_semver_range(version: &str, events: &[Event]) -> bool {
585 let Ok(v) = semver::Version::parse(version) else {
586 return false;
587 };
588
589 let mut introduced: Option<semver::Version> = None;
590 let mut fixed: Option<semver::Version> = None;
591 let mut last_affected: Option<semver::Version> = None;
592
593 for event in events {
594 match event {
595 Event::Introduced(ver) => {
596 if let Ok(parsed) = semver::Version::parse(ver) {
597 introduced = Some(parsed);
598 } else if ver == "0" {
599 introduced = Some(semver::Version::new(0, 0, 0));
600 }
601 }
602 Event::Fixed(ver) => {
603 if let Ok(parsed) = semver::Version::parse(ver) {
604 fixed = Some(parsed);
605 }
606 }
607 Event::LastAffected(ver) => {
608 if let Ok(parsed) = semver::Version::parse(ver) {
609 last_affected = Some(parsed);
610 }
611 }
612 Event::Limit(_) => {}
613 }
614 }
615
616 match (introduced, fixed, last_affected) {
617 (Some(start), Some(end), _) => v >= start && v < end,
618 (Some(start), None, Some(last)) => v >= start && v <= last,
619 (Some(start), None, None) => v >= start,
620 (None, Some(end), _) => v < end,
621 _ => false,
622 }
623 }
624
625 async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
627 let cve_ids = Self::extract_cve_ids(advisory);
629
630 if cve_ids.is_empty() {
631 return Ok(());
632 }
633
634 for cve_id in &cve_ids {
636 if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
637 let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
638 enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
639 enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
640 enrichment.is_kev = enrichment.is_kev || data.is_kev;
641 if data.kev_due_date.is_some() {
642 enrichment.kev_due_date = data
643 .kev_due_date
644 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
645 .map(|d| d.with_timezone(&chrono::Utc));
646 }
647 if data.kev_ransomware.is_some() {
648 enrichment.kev_ransomware = data.kev_ransomware;
649 }
650 }
651 }
652
653 Ok(())
654 }
655
656 async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
658 for advisory in advisories.iter_mut() {
659 self.enrich_advisory(advisory).await?;
660 }
661 Ok(())
662 }
663
664 fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
666 let mut cve_ids = Vec::new();
667
668 if advisory.id.starts_with("CVE-") {
669 cve_ids.push(advisory.id.clone());
670 }
671
672 if let Some(aliases) = &advisory.aliases {
673 for alias in aliases {
674 if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
675 cve_ids.push(alias.clone());
676 }
677 }
678 }
679
680 cve_ids
681 }
682
683 fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
685 if options.kev_only {
687 let is_kev = advisory
688 .enrichment
689 .as_ref()
690 .map(|e| e.is_kev)
691 .unwrap_or(false);
692 if !is_kev {
693 return false;
694 }
695 }
696
697 if let Some(min_cvss) = options.min_cvss {
699 let cvss = advisory
700 .enrichment
701 .as_ref()
702 .and_then(|e| e.cvss_v3_score)
703 .unwrap_or(0.0);
704 if cvss < min_cvss {
705 return false;
706 }
707 }
708
709 if let Some(min_epss) = options.min_epss {
711 let epss = advisory
712 .enrichment
713 .as_ref()
714 .and_then(|e| e.epss_score)
715 .unwrap_or(0.0);
716 if epss < min_epss {
717 return false;
718 }
719 }
720
721 if let Some(min_severity) = &options.min_severity {
723 let severity = advisory
724 .enrichment
725 .as_ref()
726 .and_then(|e| e.cvss_v3_severity)
727 .unwrap_or(Severity::None);
728 if severity < *min_severity {
729 return false;
730 }
731 }
732
733 true
734 }
735
736 pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
738 let scores = self.epss_source.fetch_scores(cve_ids).await?;
739 Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
740 }
741
742 pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
744 if let Some(data) = self.store.get_enrichment(cve_id).await? {
746 return Ok(data.is_kev);
747 }
748
749 let entry = self.kev_source.is_kev(cve_id).await?;
751 Ok(entry.is_some())
752 }
753
754 pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
792 let source = self.ossindex_source.as_ref().ok_or_else(|| {
793 AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
794 })?;
795
796 let mut cached_advisories = Vec::new();
798 let mut cache_misses = Vec::new();
799
800 for purl in purls {
801 let cache_key = Purl::cache_key_from_str(purl);
802 match self.store.get_ossindex_cache(&cache_key).await {
803 Ok(Some(cache)) if !cache.is_expired() => {
804 debug!("OSS Index cache hit for {}", purl);
805 cached_advisories.extend(cache.advisories);
806 }
807 _ => {
808 debug!("OSS Index cache miss for {}", purl);
809 cache_misses.push(purl.clone());
810 }
811 }
812 }
813
814 if !cache_misses.is_empty() {
816 debug!("Querying OSS Index for {} cache misses", cache_misses.len());
817 let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
818 AdvisoryError::SourceFetch {
819 source_name: "ossindex".to_string(),
820 message: e.to_string(),
821 }
822 })?;
823
824 let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
826
827 for (purl, advisories) in &advisory_map {
829 let cache_key = Purl::cache_key_from_str(purl);
830 let cache = OssIndexCache::new(advisories.clone());
831 if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
832 debug!("Failed to cache OSS Index result for {}: {}", purl, e);
833 }
834 }
835
836 for advisories in advisory_map.into_values() {
838 cached_advisories.extend(advisories);
839 }
840 }
841
842 Ok(cached_advisories)
843 }
844
845 pub async fn query_batch_with_ossindex(
858 &self,
859 packages: &[PackageKey],
860 ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
861 let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
862
863 let (with_version, without_version): (Vec<_>, Vec<_>) =
865 packages.iter().partition(|p| p.version.is_some());
866
867 if !with_version.is_empty() && self.ossindex_source.is_some() {
869 let purls: Vec<String> = with_version
870 .iter()
871 .map(|p| {
872 Purl::new(&p.ecosystem, &p.name)
873 .with_version(p.version.as_ref().unwrap())
874 .to_string()
875 })
876 .collect();
877
878 match self.query_ossindex(&purls).await {
879 Ok(advisories) => {
880 for pkg in &with_version {
882 let pkg_advisories: Vec<_> = advisories
883 .iter()
884 .filter(|a| {
885 a.affected.iter().any(|aff| {
886 aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
887 && aff.package.name == pkg.name
888 })
889 })
890 .cloned()
891 .collect();
892 results.insert((*pkg).clone(), pkg_advisories);
893 }
894 }
895 Err(e) => {
896 warn!("OSS Index query failed, falling back to local store: {}", e);
897 for pkg in &with_version {
899 let advisories = if let Some(version) = &pkg.version {
900 self.matches(&pkg.ecosystem, &pkg.name, version).await?
901 } else {
902 self.query(&pkg.ecosystem, &pkg.name).await?
903 };
904 results.insert((*pkg).clone(), advisories);
905 }
906 }
907 }
908 }
909
910 for pkg in &without_version {
912 let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
913 results.insert((*pkg).clone(), advisories);
914 }
915
916 Ok(results)
917 }
918
919 pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
923 for purl in purls {
924 let cache_key = Purl::cache_key_from_str(purl);
925 self.store.invalidate_ossindex_cache(&cache_key).await?;
926 }
927 Ok(())
928 }
929
930 pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
932 self.store.invalidate_all_ossindex_cache().await?;
933 Ok(())
934 }
935
936 fn group_advisories_by_purl(
938 purls: &[String],
939 advisories: &[Advisory],
940 ) -> HashMap<String, Vec<Advisory>> {
941 let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
942
943 for purl in purls {
945 map.insert(purl.clone(), Vec::new());
946 }
947
948 for advisory in advisories {
950 for affected in &advisory.affected {
951 for purl in purls {
953 if let Ok(parsed) = Purl::parse(purl) {
954 if parsed.name == affected.package.name {
955 map.entry(purl.clone()).or_default().push(advisory.clone());
956 break;
957 }
958 }
959 }
960 }
961 }
962
963 map
964 }
965}