vulnera_advisor/
manager.rs

1//! Vulnerability manager for orchestrating syncs and queries.
2//!
3//! The [`VulnerabilityManager`] is the main entry point for using this crate.
4//! Use [`VulnerabilityManagerBuilder`] for flexible configuration.
5
6use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tracing::{debug, error, info, warn};
19
20/// Options for filtering vulnerability matches.
21#[derive(Debug, Clone, Default)]
22pub struct MatchOptions {
23    /// Minimum CVSS v3 score (0.0 - 10.0).
24    pub min_cvss: Option<f64>,
25    /// Minimum EPSS score (0.0 - 1.0).
26    pub min_epss: Option<f64>,
27    /// Only return KEV (actively exploited) vulnerabilities.
28    pub kev_only: bool,
29    /// Minimum severity level.
30    pub min_severity: Option<Severity>,
31    /// Include enrichment data (EPSS, KEV) in results.
32    pub include_enrichment: bool,
33}
34
35/// Statistics for a sync operation.
36#[derive(Debug, Clone, Default)]
37pub struct SyncStats {
38    /// Total number of sources attempted.
39    pub total_sources: usize,
40    /// Number of sources that synced successfully.
41    pub successful_sources: usize,
42    /// Number of sources that failed.
43    pub failed_sources: usize,
44    /// Total advisories synced across all sources.
45    pub total_advisories_synced: usize,
46    /// Map of source name to error message for failed sources.
47    pub errors: HashMap<String, String>,
48}
49
50/// Observer for monitoring sync progress and events.
51pub trait SyncObserver: Send + Sync {
52    /// Called when the sync operation starts.
53    fn on_sync_start(&self);
54
55    /// Called when a specific source starts syncing.
56    fn on_source_start(&self, source_name: &str);
57
58    /// Called when a source successfully syncs.
59    fn on_source_success(&self, source_name: &str, count: usize);
60
61    /// Called when a source fails to sync.
62    fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError);
63
64    /// Called when the sync operation completes.
65    fn on_sync_complete(&self, stats: &SyncStats);
66}
67
68/// Default observer that logs events using the `tracing` crate.
69pub struct TracingSyncObserver;
70
71impl SyncObserver for TracingSyncObserver {
72    fn on_sync_start(&self) {
73        info!("Starting full vulnerability sync...");
74    }
75
76    fn on_source_start(&self, source_name: &str) {
77        debug!("Syncing {}...", source_name);
78    }
79
80    fn on_source_success(&self, source_name: &str, count: usize) {
81        if count > 0 {
82            info!(
83                "Successfully synced {} advisories from {}",
84                count, source_name
85            );
86        } else {
87            debug!(
88                "Successfully synced {} advisories from {}",
89                count, source_name
90            );
91        }
92    }
93
94    fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError) {
95        error!("Failed to sync {}: {}", source_name, error);
96    }
97
98    fn on_sync_complete(&self, _stats: &SyncStats) {
99        info!("Sync completed.");
100    }
101}
102
103impl MatchOptions {
104    /// Create options that include all vulnerabilities with enrichment.
105    pub fn with_enrichment() -> Self {
106        Self {
107            include_enrichment: true,
108            ..Default::default()
109        }
110    }
111
112    /// Create options for high-severity vulnerabilities only.
113    pub fn high_severity() -> Self {
114        Self {
115            min_severity: Some(Severity::High),
116            include_enrichment: true,
117            ..Default::default()
118        }
119    }
120
121    /// Create options for actively exploited vulnerabilities only.
122    pub fn exploited_only() -> Self {
123        Self {
124            kev_only: true,
125            include_enrichment: true,
126            ..Default::default()
127        }
128    }
129}
130
131/// A key identifying a package for batch queries.
132#[derive(Debug, Clone, Hash, PartialEq, Eq)]
133pub struct PackageKey {
134    /// Package ecosystem (e.g., "npm", "PyPI").
135    pub ecosystem: String,
136    /// Package name.
137    pub name: String,
138    /// Optional version for matching.
139    pub version: Option<String>,
140}
141
142impl PackageKey {
143    /// Create a new package key.
144    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
145        Self {
146            ecosystem: ecosystem.into(),
147            name: name.into(),
148            version: None,
149        }
150    }
151
152    /// Create a package key with a version.
153    pub fn with_version(
154        ecosystem: impl Into<String>,
155        name: impl Into<String>,
156        version: impl Into<String>,
157    ) -> Self {
158        Self {
159            ecosystem: ecosystem.into(),
160            name: name.into(),
161            version: Some(version.into()),
162        }
163    }
164}
165
166/// Builder for configuring VulnerabilityManager.
167pub struct VulnerabilityManagerBuilder {
168    redis_url: Option<String>,
169    store_config: StoreConfig,
170    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
171    custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
172    ossindex_source: Option<OssIndexSource>,
173    observer: Option<Arc<dyn SyncObserver>>,
174}
175
176impl Default for VulnerabilityManagerBuilder {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182impl VulnerabilityManagerBuilder {
183    /// Create a new builder with default settings.
184    pub fn new() -> Self {
185        Self {
186            redis_url: None,
187            store_config: StoreConfig::default(),
188            sources: Vec::new(),
189            custom_store: None,
190            ossindex_source: None,
191            observer: None,
192        }
193    }
194
195    /// Set the Redis connection URL.
196    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
197        self.redis_url = Some(url.into());
198        self
199    }
200
201    /// Set the store configuration.
202    pub fn store_config(mut self, config: StoreConfig) -> Self {
203        self.store_config = config;
204        self
205    }
206
207    /// Use a custom store implementation.
208    pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
209        self.custom_store = Some(store);
210        self
211    }
212
213    /// Add a vulnerability source.
214    pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
215        self.sources.push(source);
216        self
217    }
218
219    /// Add the GHSA source with the given token.
220    pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
221        self.sources.push(Arc::new(GHSASource::new(token.into())));
222        self
223    }
224
225    /// Add the NVD source with optional API key.
226    pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
227        self.sources.push(Arc::new(NVDSource::new(api_key)));
228        self
229    }
230
231    /// Add the OSV source for specified ecosystems.
232    pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
233        self.sources.push(Arc::new(OSVSource::new(ecosystems)));
234        self
235    }
236
237    /// Add default OSV ecosystems.
238    pub fn with_osv_defaults(self) -> Self {
239        self.with_osv(vec![
240            "npm".to_string(),
241            "PyPI".to_string(),
242            "Maven".to_string(),
243            "crates.io".to_string(),
244            "Go".to_string(),
245            "Packagist".to_string(),
246            "RubyGems".to_string(),
247            "NuGet".to_string(),
248        ])
249    }
250
251    /// Add the OSS Index source with optional configuration.
252    ///
253    /// OSS Index provides on-demand vulnerability queries by PURL.
254    /// If no config is provided, credentials are loaded from environment variables.
255    pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
256        match OssIndexSource::new(config) {
257            Ok(source) => {
258                self.ossindex_source = Some(source);
259            }
260            Err(e) => {
261                warn!("Failed to configure OSS Index source: {}", e);
262            }
263        }
264        self
265    }
266
267    /// Set a custom sync observer.
268    pub fn with_observer(mut self, observer: Arc<dyn SyncObserver>) -> Self {
269        self.observer = Some(observer);
270        self
271    }
272
273    /// Build the VulnerabilityManager.
274    pub fn build(self) -> Result<VulnerabilityManager> {
275        let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
276            Some(s) => s,
277            None => {
278                let url = self.redis_url.ok_or_else(|| {
279                    AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
280                })?;
281                Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
282            }
283        };
284
285        if self.sources.is_empty() {
286            warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
287        }
288
289        Ok(VulnerabilityManager {
290            store,
291            sources: self.sources,
292            kev_source: KevSource::new(),
293            epss_source: EpssSource::new(),
294            ossindex_source: self.ossindex_source,
295            observer: self
296                .observer
297                .unwrap_or_else(|| Arc::new(TracingSyncObserver)),
298        })
299    }
300}
301
302/// Main vulnerability manager for syncing and querying advisories.
303pub struct VulnerabilityManager {
304    store: Arc<dyn AdvisoryStore + Send + Sync>,
305    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
306    kev_source: KevSource,
307    epss_source: EpssSource,
308    ossindex_source: Option<OssIndexSource>,
309    observer: Arc<dyn SyncObserver>,
310}
311
312impl VulnerabilityManager {
313    /// Create a new manager from a Config.
314    ///
315    /// This is a convenience method. For more control, use [`VulnerabilityManagerBuilder`].
316    pub async fn new(config: Config) -> Result<Self> {
317        let mut builder = VulnerabilityManagerBuilder::new()
318            .redis_url(&config.redis_url)
319            .store_config(config.store.clone());
320
321        // Add OSV source
322        builder = builder.with_osv_defaults();
323
324        // Add NVD source
325        builder = builder.with_nvd(config.nvd_api_key.clone());
326
327        // Add GHSA source if token is provided
328        if let Some(token) = &config.ghsa_token {
329            builder = builder.with_ghsa(token.clone());
330        }
331
332        // Add OSS Index source if configured
333        if config.ossindex.is_some() {
334            builder = builder.with_ossindex(config.ossindex.clone());
335        }
336
337        builder.build()
338    }
339
340    /// Create a builder for custom configuration.
341    pub fn builder() -> VulnerabilityManagerBuilder {
342        VulnerabilityManagerBuilder::new()
343    }
344
345    /// Get a reference to the underlying store.
346    pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
347        &self.store
348    }
349
350    /// Check the health of the store connection.
351    pub async fn health_check(&self) -> Result<HealthStatus> {
352        self.store.health_check().await
353    }
354
355    /// Sync advisories from all configured sources.
356    pub async fn sync_all(&self) -> Result<SyncStats> {
357        self.observer.on_sync_start();
358
359        let mut handles = Vec::new();
360        let mut stats = SyncStats {
361            total_sources: self.sources.len(),
362            ..Default::default()
363        };
364
365        for source in &self.sources {
366            let source = source.clone();
367            let store = self.store.clone();
368            let observer = self.observer.clone();
369
370            let handle = tokio::spawn(async move {
371                observer.on_source_start(source.name());
372
373                let last_sync = match store.last_sync(source.name()).await {
374                    Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
375                        Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
376                        Err(_) => None,
377                    },
378                    _ => None,
379                };
380
381                match source.fetch(last_sync).await {
382                    Ok(advisories) => {
383                        if !advisories.is_empty() {
384                            match store.upsert_batch(&advisories, source.name()).await {
385                                Ok(_) => {
386                                    observer.on_source_success(source.name(), advisories.len());
387                                    // Update timestamp only after successful storage
388                                    if let Err(e) = store.update_sync_timestamp(source.name()).await
389                                    {
390                                        let err = AdvisoryError::source_fetch(
391                                            source.name(),
392                                            format!("Failed to update timestamp: {}", e),
393                                        );
394                                        observer.on_source_error(source.name(), &err);
395                                        // Non-critical error, count as success but maybe log warn?
396                                        // Observer handles logging.
397                                    }
398                                    Ok((source.name().to_string(), advisories.len()))
399                                }
400                                Err(e) => {
401                                    // Store error is critical for this source
402                                    observer.on_source_error(source.name(), &e);
403                                    Err((source.name().to_string(), e.to_string()))
404                                }
405                            }
406                        } else {
407                            observer.on_source_success(source.name(), 0);
408                            // Update sync timestamp even if no new advisories
409                            if let Err(e) = store.update_sync_timestamp(source.name()).await {
410                                let err = AdvisoryError::source_fetch(
411                                    source.name(),
412                                    format!("Failed to update timestamp: {}", e),
413                                );
414                                observer.on_source_error(source.name(), &err);
415                            }
416                            Ok((source.name().to_string(), 0))
417                        }
418                    }
419                    Err(e) => {
420                        observer.on_source_error(source.name(), &e);
421                        Err((source.name().to_string(), e.to_string()))
422                    }
423                }
424            });
425            handles.push(handle);
426        }
427
428        // Wait for all tasks to complete
429        for handle in handles {
430            match handle.await {
431                Ok(result) => match result {
432                    Ok((_, count)) => {
433                        stats.successful_sources += 1;
434                        stats.total_advisories_synced += count;
435                    }
436                    Err((name, error)) => {
437                        stats.failed_sources += 1;
438                        stats.errors.insert(name, error);
439                    }
440                },
441                Err(e) => {
442                    // Task panic or join error
443                    error!("Task join error: {}", e);
444                    stats.failed_sources += 1;
445                    stats
446                        .errors
447                        .insert("unknown".to_string(), format!("Task join error: {}", e));
448                }
449            }
450        }
451
452        self.observer.on_sync_complete(&stats);
453        Ok(stats)
454    }
455
456    /// Reset the sync timestamp for a specific source.
457    ///
458    /// This forces a full re-sync on the next `sync_all()` call.
459    pub async fn reset_sync(&self, source: &str) -> Result<()> {
460        self.store.reset_sync_timestamp(source).await
461    }
462
463    /// Reset all sync timestamps, forcing a full re-sync of all sources.
464    pub async fn reset_all_syncs(&self) -> Result<()> {
465        for source in &self.sources {
466            self.store.reset_sync_timestamp(source.name()).await?;
467        }
468        Ok(())
469    }
470
471    /// Sync enrichment data (KEV and EPSS).
472    pub async fn sync_enrichment(&self) -> Result<()> {
473        self.sync_enrichment_with_cves(&[]).await
474    }
475
476    /// Sync enrichment data with optional extra CVE IDs to broaden EPSS coverage.
477    pub async fn sync_enrichment_with_cves(&self, extra_cves: &[String]) -> Result<()> {
478        debug!("Syncing enrichment data (KEV, EPSS)...");
479
480        let mut enrichment: HashMap<String, EnrichmentData> = HashMap::new();
481
482        // Sync KEV data
483        match self.kev_source.fetch_catalog().await {
484            Ok(kev_entries) => {
485                debug!("Processing {} KEV entries", kev_entries.len());
486                for (cve_id, entry) in kev_entries {
487                    let data = enrichment
488                        .entry(cve_id.clone())
489                        .or_insert_with(|| EnrichmentData {
490                            epss_score: None,
491                            epss_percentile: None,
492                            is_kev: false,
493                            kev_due_date: None,
494                            kev_date_added: None,
495                            kev_ransomware: None,
496                            updated_at: String::new(),
497                        });
498
499                    data.is_kev = true;
500                    data.kev_due_date = entry.due_date_utc().map(|d| d.to_rfc3339());
501                    data.kev_date_added = entry.date_added_utc().map(|d| d.to_rfc3339());
502                    data.kev_ransomware = Some(entry.is_ransomware_related());
503                }
504            }
505            Err(e) => {
506                error!("Failed to fetch KEV catalog: {}", e);
507            }
508        }
509
510        // Sync EPSS for known CVEs plus any extra provided by caller
511        let epss_targets = Self::collect_enrichment_targets(&enrichment, extra_cves);
512        if !epss_targets.is_empty() {
513            match self
514                .epss_source
515                .fetch_scores_batch(&epss_targets, 200)
516                .await
517            {
518                Ok(scores) => {
519                    Self::merge_epss_scores(&mut enrichment, scores);
520                }
521                Err(e) => {
522                    warn!("Failed to fetch EPSS scores: {}", e);
523                }
524            }
525        }
526
527        // Persist merged enrichment data
528        if !enrichment.is_empty() {
529            let now = chrono::Utc::now().to_rfc3339();
530            for (cve_id, mut data) in enrichment {
531                if data.updated_at.is_empty() {
532                    data.updated_at = now.clone();
533                }
534                if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
535                    debug!("Failed to store enrichment for {}: {}", cve_id, e);
536                }
537            }
538        }
539
540        Ok(())
541    }
542
543    /// Build the list of CVE IDs to request EPSS for.
544    fn collect_enrichment_targets(
545        current: &HashMap<String, EnrichmentData>,
546        extra: &[String],
547    ) -> Vec<String> {
548        let mut set: std::collections::HashSet<String> = current.keys().cloned().collect();
549        for c in extra {
550            set.insert(c.clone());
551        }
552        set.into_iter().collect()
553    }
554
555    /// Merge EPSS scores into enrichment map.
556    fn merge_epss_scores(
557        enrichment: &mut HashMap<String, EnrichmentData>,
558        scores: HashMap<String, crate::sources::epss::EpssScore>,
559    ) {
560        for (cve_id, score) in scores {
561            let data = enrichment
562                .entry(cve_id.clone())
563                .or_insert_with(|| EnrichmentData {
564                    epss_score: None,
565                    epss_percentile: None,
566                    is_kev: false,
567                    kev_due_date: None,
568                    kev_date_added: None,
569                    kev_ransomware: None,
570                    updated_at: String::new(),
571                });
572
573            data.epss_score = Some(score.epss);
574            data.epss_percentile = Some(score.percentile);
575            if let Some(date) = score.date_utc() {
576                data.updated_at = date.to_rfc3339();
577            }
578        }
579    }
580
581    /// Query advisories for a specific package.
582    pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
583        let advisories = self.store.get_by_package(ecosystem, package).await?;
584        Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
585    }
586
587    /// Query advisories with enrichment data.
588    pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
589        let mut advisories = self.query(ecosystem, package).await?;
590        self.enrich_advisories(&mut advisories).await?;
591        Ok(advisories)
592    }
593
594    /// Query multiple packages in a batch (concurrent).
595    ///
596    /// All queries run in parallel for maximum throughput.
597    pub async fn query_batch(
598        &self,
599        packages: &[PackageKey],
600    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
601        use futures_util::future::join_all;
602
603        let tasks: Vec<_> = packages
604            .iter()
605            .map(|pkg| {
606                let pkg = pkg.clone();
607                let ecosystem = pkg.ecosystem.clone();
608                let name = pkg.name.clone();
609                let version = pkg.version.clone();
610                let store = self.store.clone();
611
612                async move {
613                    let advisories = if let Some(ver) = &version {
614                        // For version matching, we need the full logic
615                        let all = store.get_by_package(&ecosystem, &name).await?;
616                        let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
617                        Self::filter_by_version(aggregated, &ecosystem, &name, ver)
618                    } else {
619                        let all = store.get_by_package(&ecosystem, &name).await?;
620                        crate::aggregator::ReportAggregator::aggregate(all)
621                    };
622                    Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
623                }
624            })
625            .collect();
626
627        let results: Vec<_> = join_all(tasks).await;
628
629        let mut map = HashMap::new();
630        for result in results {
631            match result {
632                Ok((pkg, advisories)) => {
633                    map.insert(pkg, advisories);
634                }
635                Err(e) => {
636                    warn!("Batch query error: {}", e);
637                }
638            }
639        }
640
641        Ok(map)
642    }
643
644    /// Filter advisories by version (static helper for concurrent batch queries)
645    fn filter_by_version(
646        advisories: Vec<Advisory>,
647        ecosystem: &str,
648        package: &str,
649        version: &str,
650    ) -> Vec<Advisory> {
651        advisories
652            .into_iter()
653            .filter(|advisory| {
654                for affected in &advisory.affected {
655                    if affected.package.name != package || affected.package.ecosystem != ecosystem {
656                        continue;
657                    }
658
659                    // Check explicit versions
660                    if affected.versions.contains(&version.to_string()) {
661                        return true;
662                    }
663
664                    // Check ranges
665                    for range in &affected.ranges {
666                        match range.range_type {
667                            RangeType::Semver => {
668                                if Self::matches_semver_range(version, &range.events) {
669                                    return true;
670                                }
671                            }
672                            RangeType::Ecosystem => {
673                                if Self::matches_ecosystem_range(version, &range.events) {
674                                    return true;
675                                }
676                            }
677                            RangeType::Git => {}
678                        }
679                    }
680                }
681                false
682            })
683            .collect()
684    }
685
686    /// Check if a specific package version is affected by any vulnerabilities.
687    pub async fn matches(
688        &self,
689        ecosystem: &str,
690        package: &str,
691        version: &str,
692    ) -> Result<Vec<Advisory>> {
693        self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
694            .await
695    }
696
697    /// Check if a package version is affected, with filtering options.
698    pub async fn matches_with_options(
699        &self,
700        ecosystem: &str,
701        package: &str,
702        version: &str,
703        options: &MatchOptions,
704    ) -> Result<Vec<Advisory>> {
705        let advisories = self.query(ecosystem, package).await?;
706        let mut matched = Vec::new();
707
708        for mut advisory in advisories {
709            let mut is_vulnerable = false;
710            for affected in &advisory.affected {
711                if affected.package.name != package || affected.package.ecosystem != ecosystem {
712                    continue;
713                }
714
715                // Check explicit versions
716                if affected.versions.contains(&version.to_string()) {
717                    is_vulnerable = true;
718                    break;
719                }
720
721                // Check ranges
722                for range in &affected.ranges {
723                    match range.range_type {
724                        RangeType::Semver => {
725                            if Self::matches_semver_range(version, &range.events) {
726                                is_vulnerable = true;
727                                break;
728                            }
729                        }
730                        RangeType::Ecosystem => {
731                            if Self::matches_ecosystem_range(version, &range.events) {
732                                is_vulnerable = true;
733                                break;
734                            }
735                        }
736                        RangeType::Git => {
737                            // Git ranges require commit hash comparison, skip for now
738                        }
739                    }
740                }
741                if is_vulnerable {
742                    break;
743                }
744            }
745
746            if is_vulnerable {
747                // Apply enrichment if requested
748                if options.include_enrichment {
749                    self.enrich_advisory(&mut advisory).await?;
750                }
751
752                // Apply filters
753                if self.advisory_passes_filters(&advisory, options) {
754                    matched.push(advisory);
755                }
756            }
757        }
758
759        Ok(matched)
760    }
761
762    /// Check if a version matches any semver interval described by OSV events.
763    ///
764    /// OSV allows multiple introduced/fixed pairs; we evaluate each interval in order.
765    fn matches_semver_range(version: &str, events: &[Event]) -> bool {
766        let Ok(v) = semver::Version::parse(version) else {
767            return false;
768        };
769
770        #[derive(Default)]
771        struct Interval {
772            start: Option<semver::Version>,
773            end: Option<semver::Version>,
774            end_inclusive: bool,
775        }
776
777        let mut intervals: Vec<Interval> = Vec::new();
778        let mut current_start: Option<semver::Version> = None;
779
780        for event in events {
781            match event {
782                Event::Introduced(ver) => {
783                    if let Ok(parsed) = semver::Version::parse(ver) {
784                        current_start = Some(parsed);
785                    } else if ver == "0" {
786                        current_start = Some(semver::Version::new(0, 0, 0));
787                    }
788                }
789                Event::Fixed(ver) => {
790                    let end = semver::Version::parse(ver).ok();
791                    intervals.push(Interval {
792                        start: current_start.clone(),
793                        end,
794                        end_inclusive: false,
795                    });
796                    current_start = None;
797                }
798                Event::LastAffected(ver) => {
799                    let end = semver::Version::parse(ver).ok();
800                    intervals.push(Interval {
801                        start: current_start.clone(),
802                        end,
803                        end_inclusive: true,
804                    });
805                    current_start = None;
806                }
807                Event::Limit(ver) => {
808                    // Treat limit as an exclusive upper bound for any open interval.
809                    let end = semver::Version::parse(ver).ok();
810                    intervals.push(Interval {
811                        start: current_start.clone(),
812                        end,
813                        end_inclusive: false,
814                    });
815                    current_start = None;
816                }
817            }
818        }
819
820        // Open-ended interval from the last introduction.
821        if current_start.is_some() {
822            intervals.push(Interval {
823                start: current_start,
824                end: None,
825                end_inclusive: false,
826            });
827        }
828
829        intervals.into_iter().any(|interval| {
830            if let Some(start) = &interval.start {
831                if v < *start {
832                    return false;
833                }
834            }
835
836            match (&interval.end, interval.end_inclusive) {
837                (Some(end), true) => v <= *end,
838                (Some(end), false) => v < *end,
839                (None, _) => true,
840            }
841        })
842    }
843
844    /// Check if a version matches an ecosystem range. Falls back to semver if both parse as semver,
845    /// otherwise uses dotted numeric comparison (e.g., "1.10" > "1.2").
846    fn matches_ecosystem_range(version: &str, events: &[Event]) -> bool {
847        // Try semver first; if any boundary fails semver parsing, fall back to dotted.
848        if events.iter().all(|e| match e {
849            Event::Introduced(v) | Event::Fixed(v) | Event::LastAffected(v) | Event::Limit(v) => {
850                semver::Version::parse(v).is_ok() || v == "0"
851            }
852        }) {
853            return Self::matches_semver_range(version, events);
854        }
855
856        let version_parts = match Self::parse_dotted(version) {
857            Some(p) => p,
858            None => return false,
859        };
860
861        #[derive(Default)]
862        struct Interval {
863            start: Option<Vec<u64>>,
864            end: Option<Vec<u64>>,
865            end_inclusive: bool,
866        }
867
868        let mut intervals: Vec<Interval> = Vec::new();
869        let mut current_start: Option<Vec<u64>> = None;
870
871        for event in events {
872            match event {
873                Event::Introduced(ver) => {
874                    current_start = Self::parse_dotted(ver);
875                }
876                Event::Fixed(ver) => {
877                    intervals.push(Interval {
878                        start: current_start.clone(),
879                        end: Self::parse_dotted(ver),
880                        end_inclusive: false,
881                    });
882                    current_start = None;
883                }
884                Event::LastAffected(ver) => {
885                    intervals.push(Interval {
886                        start: current_start.clone(),
887                        end: Self::parse_dotted(ver),
888                        end_inclusive: true,
889                    });
890                    current_start = None;
891                }
892                Event::Limit(ver) => {
893                    intervals.push(Interval {
894                        start: current_start.clone(),
895                        end: Self::parse_dotted(ver),
896                        end_inclusive: false,
897                    });
898                    current_start = None;
899                }
900            }
901        }
902
903        if current_start.is_some() {
904            intervals.push(Interval {
905                start: current_start,
906                end: None,
907                end_inclusive: false,
908            });
909        }
910
911        intervals.into_iter().any(|interval| {
912            if let Some(start) = &interval.start {
913                if Self::cmp_dotted(&version_parts, start) == Ordering::Less {
914                    return false;
915                }
916            }
917
918            match (&interval.end, interval.end_inclusive) {
919                (Some(end), true) => Self::cmp_dotted(&version_parts, end) != Ordering::Greater,
920                (Some(end), false) => Self::cmp_dotted(&version_parts, end) == Ordering::Less,
921                (None, _) => true,
922            }
923        })
924    }
925
926    /// Parse dotted numeric versions (e.g., "1.2.10"). Non-numeric segments cause failure.
927    fn parse_dotted(v: &str) -> Option<Vec<u64>> {
928        let mut parts = Vec::new();
929        for chunk in v.split(|c: char| !c.is_ascii_digit()) {
930            if chunk.is_empty() {
931                continue;
932            }
933            let Ok(num) = chunk.parse::<u64>() else {
934                return None;
935            };
936            parts.push(num);
937        }
938        if parts.is_empty() { None } else { Some(parts) }
939    }
940
941    /// Compare dotted numeric versions.
942    fn cmp_dotted(a: &[u64], b: &[u64]) -> Ordering {
943        let max_len = a.len().max(b.len());
944        for i in 0..max_len {
945            let ai = *a.get(i).unwrap_or(&0);
946            let bi = *b.get(i).unwrap_or(&0);
947            match ai.cmp(&bi) {
948                Ordering::Equal => continue,
949                ord => return ord,
950            }
951        }
952        Ordering::Equal
953    }
954
955    /// Enrich a single advisory with EPSS/KEV data.
956    async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
957        // Find CVE aliases
958        let cve_ids = Self::extract_cve_ids(advisory);
959
960        if cve_ids.is_empty() {
961            return Ok(());
962        }
963
964        // Look up enrichment data
965        for cve_id in &cve_ids {
966            if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
967                let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
968                enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
969                enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
970                enrichment.is_kev = enrichment.is_kev || data.is_kev;
971                if data.kev_due_date.is_some() {
972                    enrichment.kev_due_date = data
973                        .kev_due_date
974                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
975                        .map(|d| d.with_timezone(&chrono::Utc));
976                }
977                if data.kev_ransomware.is_some() {
978                    enrichment.kev_ransomware = data.kev_ransomware;
979                }
980            }
981        }
982
983        Ok(())
984    }
985
986    /// Enrich multiple advisories with EPSS/KEV data.
987    async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
988        for advisory in advisories.iter_mut() {
989            self.enrich_advisory(advisory).await?;
990        }
991        Ok(())
992    }
993
994    /// Extract CVE IDs from an advisory (from ID or aliases).
995    fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
996        let mut cve_ids = Vec::new();
997
998        if advisory.id.starts_with("CVE-") {
999            cve_ids.push(advisory.id.clone());
1000        }
1001
1002        if let Some(aliases) = &advisory.aliases {
1003            for alias in aliases {
1004                if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
1005                    cve_ids.push(alias.clone());
1006                }
1007            }
1008        }
1009
1010        cve_ids
1011    }
1012
1013    /// Check if an advisory passes the filter options.
1014    fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
1015        // Check KEV filter
1016        if options.kev_only {
1017            let is_kev = advisory
1018                .enrichment
1019                .as_ref()
1020                .map(|e| e.is_kev)
1021                .unwrap_or(false);
1022            if !is_kev {
1023                return false;
1024            }
1025        }
1026
1027        // Check CVSS filter
1028        if let Some(min_cvss) = options.min_cvss {
1029            let cvss = advisory
1030                .enrichment
1031                .as_ref()
1032                .and_then(|e| e.cvss_v3_score)
1033                .unwrap_or(0.0);
1034            if cvss < min_cvss {
1035                return false;
1036            }
1037        }
1038
1039        // Check EPSS filter
1040        if let Some(min_epss) = options.min_epss {
1041            let epss = advisory
1042                .enrichment
1043                .as_ref()
1044                .and_then(|e| e.epss_score)
1045                .unwrap_or(0.0);
1046            if epss < min_epss {
1047                return false;
1048            }
1049        }
1050
1051        // Check severity filter
1052        if let Some(min_severity) = &options.min_severity {
1053            let severity = advisory
1054                .enrichment
1055                .as_ref()
1056                .and_then(|e| e.cvss_v3_severity)
1057                .unwrap_or(Severity::None);
1058            if severity < *min_severity {
1059                return false;
1060            }
1061        }
1062
1063        true
1064    }
1065
1066    /// Fetch live EPSS scores for CVEs (not from cache).
1067    pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
1068        let scores = self.epss_source.fetch_scores(cve_ids).await?;
1069        Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
1070    }
1071
1072    /// Check if a CVE is in the CISA KEV catalog.
1073    pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
1074        // Check cache first
1075        if let Some(data) = self.store.get_enrichment(cve_id).await? {
1076            return Ok(data.is_kev);
1077        }
1078
1079        // Fetch from source
1080        let entry = self.kev_source.is_kev(cve_id).await?;
1081        Ok(entry.is_some())
1082    }
1083
1084    // === OSS Index Methods ===
1085
1086    /// Query OSS Index for vulnerabilities affecting the given PURLs.
1087    ///
1088    /// This method provides automatic caching:
1089    /// - First checks the cache for each PURL
1090    /// - Only queries OSS Index for cache misses
1091    /// - Caches results for future queries
1092    ///
1093    /// # Arguments
1094    ///
1095    /// * `purls` - Package URLs to query (e.g., "pkg:npm/lodash@4.17.20")
1096    ///
1097    /// # Returns
1098    ///
1099    /// Vector of advisories for all vulnerabilities found.
1100    ///
1101    /// # Errors
1102    ///
1103    /// Returns an error if OSS Index is not configured or if the query fails.
1104    ///
1105    /// # Example
1106    ///
1107    /// ```rust,ignore
1108    /// use vulnera_advisors::{VulnerabilityManager, Purl};
1109    ///
1110    /// let manager = VulnerabilityManager::builder()
1111    ///     .redis_url("redis://localhost:6379")
1112    ///     .with_ossindex(None)
1113    ///     .build()?;
1114    ///
1115    /// let purls = vec![
1116    ///     Purl::new("npm", "lodash").with_version("4.17.20").to_string(),
1117    /// ];
1118    ///
1119    /// let advisories = manager.query_ossindex(&purls).await?;
1120    /// ```
1121    pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
1122        let source = self.ossindex_source.as_ref().ok_or_else(|| {
1123            AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
1124        })?;
1125
1126        // Check cache for all PURLs
1127        let mut cached_advisories = Vec::new();
1128        let mut cache_misses = Vec::new();
1129
1130        for purl in purls {
1131            let cache_key = Purl::cache_key_from_str(purl);
1132            match self.store.get_ossindex_cache(&cache_key).await {
1133                Ok(Some(cache)) if !cache.is_expired() => {
1134                    debug!("OSS Index cache hit for {}", purl);
1135                    cached_advisories.extend(cache.advisories);
1136                }
1137                _ => {
1138                    debug!("OSS Index cache miss for {}", purl);
1139                    cache_misses.push(purl.clone());
1140                }
1141            }
1142        }
1143
1144        // Query OSS Index for cache misses
1145        if !cache_misses.is_empty() {
1146            debug!("Querying OSS Index for {} cache misses", cache_misses.len());
1147            let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
1148                AdvisoryError::SourceFetch {
1149                    source_name: "ossindex".to_string(),
1150                    message: e.to_string(),
1151                }
1152            })?;
1153
1154            // Group advisories by PURL for caching
1155            let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
1156
1157            // Cache results for each PURL
1158            for (purl, advisories) in &advisory_map {
1159                let cache_key = Purl::cache_key_from_str(purl);
1160                let cache = OssIndexCache::new(advisories.clone());
1161                if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
1162                    debug!("Failed to cache OSS Index result for {}: {}", purl, e);
1163                }
1164            }
1165
1166            // Flatten and add to results
1167            for advisories in advisory_map.into_values() {
1168                cached_advisories.extend(advisories);
1169            }
1170        }
1171
1172        Ok(cached_advisories)
1173    }
1174
1175    /// Query OSS Index for vulnerabilities with fallback to stored advisories.
1176    ///
1177    /// This method first queries OSS Index, then falls back to the local store
1178    /// if the OSS Index query fails or returns no results.
1179    ///
1180    /// # Arguments
1181    ///
1182    /// * `packages` - List of packages to query (ecosystem, name, optional version)
1183    ///
1184    /// # Returns
1185    ///
1186    /// Map of package keys to their advisories.
1187    pub async fn query_batch_with_ossindex(
1188        &self,
1189        packages: &[PackageKey],
1190    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
1191        let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
1192
1193        // Build PURLs for packages that have versions
1194        let (with_version, without_version): (Vec<_>, Vec<_>) =
1195            packages.iter().partition(|p| p.version.is_some());
1196
1197        // Query OSS Index for packages with versions
1198        if !with_version.is_empty() && self.ossindex_source.is_some() {
1199            let purls: Vec<String> = with_version
1200                .iter()
1201                .map(|p| {
1202                    Purl::new(&p.ecosystem, &p.name)
1203                        .with_version(p.version.as_ref().unwrap())
1204                        .to_string()
1205                })
1206                .collect();
1207
1208            match self.query_ossindex(&purls).await {
1209                Ok(advisories) => {
1210                    // Group advisories by package key
1211                    for pkg in &with_version {
1212                        let pkg_advisories: Vec<_> = advisories
1213                            .iter()
1214                            .filter(|a| {
1215                                a.affected.iter().any(|aff| {
1216                                    aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
1217                                        && aff.package.name == pkg.name
1218                                })
1219                            })
1220                            .cloned()
1221                            .collect();
1222                        results.insert((*pkg).clone(), pkg_advisories);
1223                    }
1224                }
1225                Err(e) => {
1226                    warn!("OSS Index query failed, falling back to local store: {}", e);
1227                    // Fallback to local store
1228                    for pkg in &with_version {
1229                        let advisories = if let Some(version) = &pkg.version {
1230                            self.matches(&pkg.ecosystem, &pkg.name, version).await?
1231                        } else {
1232                            self.query(&pkg.ecosystem, &pkg.name).await?
1233                        };
1234                        results.insert((*pkg).clone(), advisories);
1235                    }
1236                }
1237            }
1238        }
1239
1240        // Query local store for packages without versions
1241        for pkg in &without_version {
1242            let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
1243            results.insert((*pkg).clone(), advisories);
1244        }
1245
1246        Ok(results)
1247    }
1248
1249    /// Invalidate cached OSS Index results for specific PURLs.
1250    ///
1251    /// Use this to force a fresh query on the next call.
1252    pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
1253        for purl in purls {
1254            let cache_key = Purl::cache_key_from_str(purl);
1255            self.store.invalidate_ossindex_cache(&cache_key).await?;
1256        }
1257        Ok(())
1258    }
1259
1260    /// Invalidate all cached OSS Index results.
1261    pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
1262        self.store.invalidate_all_ossindex_cache().await?;
1263        Ok(())
1264    }
1265
1266    // === Remediation Methods ===
1267
1268    /// Get remediation suggestions for a vulnerable package.
1269    ///
1270    /// This method checks if the specified version is vulnerable, and if so,
1271    /// suggests the nearest and latest safe versions based on fixed versions
1272    /// declared in the advisories.
1273    ///
1274    /// # Arguments
1275    ///
1276    /// * `ecosystem` - Package ecosystem (e.g., "npm", "pypi")
1277    /// * `package` - Package name
1278    /// * `current_version` - Current version to analyze
1279    ///
1280    /// # Returns
1281    ///
1282    /// A [`Remediation`] containing safe version suggestions and upgrade impact.
1283    ///
1284    /// # Example
1285    ///
1286    /// ```rust,ignore
1287    /// use vulnera_advisors::VulnerabilityManager;
1288    ///
1289    /// let remediation = manager.suggest_remediation("npm", "lodash", "4.17.20").await?;
1290    /// if let Some(nearest) = remediation.nearest_safe {
1291    ///     println!("Upgrade to {} ({:?} impact)", nearest, remediation.upgrade_impact);
1292    /// }
1293    /// ```
1294    pub async fn suggest_remediation(
1295        &self,
1296        ecosystem: &str,
1297        package: &str,
1298        current_version: &str,
1299    ) -> Result<crate::remediation::Remediation> {
1300        // Get matching advisories for this version
1301        let advisories = self.matches(ecosystem, package, current_version).await?;
1302
1303        // Build remediation using the semver matcher
1304        let remediation = crate::remediation::build_remediation(
1305            ecosystem,
1306            package,
1307            current_version,
1308            &advisories,
1309            None, // No registry versions, use only fixed versions from advisories
1310            Self::matches_semver_range,
1311        );
1312
1313        Ok(remediation)
1314    }
1315
1316    /// Get remediation suggestions with registry lookup for all available versions.
1317    ///
1318    /// This is an enhanced version of [`suggest_remediation`] that fetches
1319    /// available versions from package registries to provide more complete
1320    /// upgrade suggestions.
1321    ///
1322    /// # Arguments
1323    ///
1324    /// * `ecosystem` - Package ecosystem (e.g., "npm", "pypi")
1325    /// * `package` - Package name
1326    /// * `current_version` - Current version to analyze
1327    /// * `registry` - A version registry implementation to fetch available versions
1328    ///
1329    /// # Returns
1330    ///
1331    /// A [`crate::remediation::Remediation`] containing safe version suggestions from the full version list.
1332    ///
1333    /// # Example
1334    ///
1335    /// ```rust,ignore
1336    /// use vulnera_advisors::{VulnerabilityManager, PackageRegistry};
1337    ///
1338    /// let registry = PackageRegistry::new();
1339    /// let remediation = manager
1340    ///     .suggest_remediation_with_registry("npm", "lodash", "4.17.20", &registry)
1341    ///     .await?;
1342    /// ```
1343    pub async fn suggest_remediation_with_registry(
1344        &self,
1345        ecosystem: &str,
1346        package: &str,
1347        current_version: &str,
1348        registry: &dyn crate::version_registry::VersionRegistry,
1349    ) -> Result<crate::remediation::Remediation> {
1350        // Get matching advisories for this version
1351        let advisories = self.matches(ecosystem, package, current_version).await?;
1352
1353        // Fetch all available versions from registry
1354        let available_versions = match registry.get_versions(ecosystem, package).await {
1355            Ok(versions) => Some(versions),
1356            Err(e) => {
1357                warn!(
1358                    "Failed to fetch versions from registry, using advisory data only: {}",
1359                    e
1360                );
1361                None
1362            }
1363        };
1364
1365        // Build remediation with registry versions
1366        let remediation = crate::remediation::build_remediation(
1367            ecosystem,
1368            package,
1369            current_version,
1370            &advisories,
1371            available_versions.as_deref(),
1372            Self::matches_semver_range,
1373        );
1374
1375        Ok(remediation)
1376    }
1377
1378    /// Group advisories by their associated PURL.
1379    fn group_advisories_by_purl(
1380        purls: &[String],
1381        advisories: &[Advisory],
1382    ) -> HashMap<String, Vec<Advisory>> {
1383        let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
1384
1385        // Initialize map with empty vectors for all PURLs
1386        for purl in purls {
1387            map.insert(purl.clone(), Vec::new());
1388        }
1389
1390        for advisory in advisories {
1391            for affected in &advisory.affected {
1392                for purl in purls {
1393                    let Ok(parsed) = Purl::parse(purl) else {
1394                        continue;
1395                    };
1396
1397                    // Match on ecosystem as well as name to avoid cross-ecosystem collisions
1398                    let affected_eco = affected.package.ecosystem.to_lowercase();
1399                    let purl_eco = parsed.purl_type.to_lowercase();
1400                    let purl_eco_alt = parsed.ecosystem().to_lowercase();
1401                    if affected_eco != purl_eco && affected_eco != purl_eco_alt {
1402                        continue;
1403                    }
1404
1405                    if parsed.name != affected.package.name {
1406                        continue;
1407                    }
1408
1409                    if let Some(ver) = parsed.version.as_deref() {
1410                        // If a version is specified, ensure the advisory actually covers it.
1411                        let version_matches = affected.versions.contains(&ver.to_string())
1412                            || affected.ranges.iter().any(|r| {
1413                                matches!(r.range_type, RangeType::Semver | RangeType::Ecosystem)
1414                                    && Self::matches_semver_range(ver, &r.events)
1415                            });
1416
1417                        if !version_matches {
1418                            continue;
1419                        }
1420                    }
1421
1422                    map.entry(purl.clone()).or_default().push(advisory.clone());
1423                    break;
1424                }
1425            }
1426        }
1427
1428        map
1429    }
1430}