vulnera_advisor/
manager.rs

1//! Vulnerability manager for orchestrating syncs and queries.
2//!
3//! The [`VulnerabilityManager`] is the main entry point for using this crate.
4//! Use [`VulnerabilityManagerBuilder`] for flexible configuration.
5
6use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tracing::{debug, error, info, warn};
19
20/// Options for filtering vulnerability matches.
21#[derive(Debug, Clone, Default)]
22pub struct MatchOptions {
23    /// Minimum CVSS v3 score (0.0 - 10.0).
24    pub min_cvss: Option<f64>,
25    /// Minimum EPSS score (0.0 - 1.0).
26    pub min_epss: Option<f64>,
27    /// Only return KEV (actively exploited) vulnerabilities.
28    pub kev_only: bool,
29    /// Minimum severity level.
30    pub min_severity: Option<Severity>,
31    /// Include enrichment data (EPSS, KEV) in results.
32    pub include_enrichment: bool,
33}
34
/// Outcome counters for one run of `VulnerabilityManager::sync_all`.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// How many sources the run attempted.
    pub total_sources: usize,
    /// How many of those sources finished without a fatal error.
    pub successful_sources: usize,
    /// How many of those sources failed.
    pub failed_sources: usize,
    /// Sum of advisories stored across all successful sources.
    pub total_advisories_synced: usize,
    /// Error message for each failed source, keyed by source name.
    pub errors: HashMap<String, String>,
}
49
50/// Observer for monitoring sync progress and events.
51pub trait SyncObserver: Send + Sync {
52    /// Called when the sync operation starts.
53    fn on_sync_start(&self);
54
55    /// Called when a specific source starts syncing.
56    fn on_source_start(&self, source_name: &str);
57
58    /// Called when a source successfully syncs.
59    fn on_source_success(&self, source_name: &str, count: usize);
60
61    /// Called when a source fails to sync.
62    fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError);
63
64    /// Called when the sync operation completes.
65    fn on_sync_complete(&self, stats: &SyncStats);
66}
67
/// Default [`SyncObserver`] that reports sync progress through `tracing`.
///
/// This is the observer a manager uses when the builder is not given one.
/// It is a stateless unit struct, so it is trivially `Copy` and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingSyncObserver;
70
71impl SyncObserver for TracingSyncObserver {
72    fn on_sync_start(&self) {
73        info!("Starting full vulnerability sync...");
74    }
75
76    fn on_source_start(&self, source_name: &str) {
77        // We handle the "Syncing source..." log inside the manager logic usually,
78        // but let's centralize it here if possible, or support it.
79        // For now, let's just log at debug to avoid double logging if we keep old logs,
80        // but the plan is to REPLACE old logging.
81        info!("Syncing {}...", source_name);
82    }
83
84    fn on_source_success(&self, source_name: &str, count: usize) {
85        info!(
86            "Successfully synced {} advisories from {}",
87            count, source_name
88        );
89    }
90
91    fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError) {
92        error!("Failed to sync {}: {}", source_name, error);
93    }
94
95    fn on_sync_complete(&self, _stats: &SyncStats) {
96        info!("Sync completed.");
97    }
98}
99
100impl MatchOptions {
101    /// Create options that include all vulnerabilities with enrichment.
102    pub fn with_enrichment() -> Self {
103        Self {
104            include_enrichment: true,
105            ..Default::default()
106        }
107    }
108
109    /// Create options for high-severity vulnerabilities only.
110    pub fn high_severity() -> Self {
111        Self {
112            min_severity: Some(Severity::High),
113            include_enrichment: true,
114            ..Default::default()
115        }
116    }
117
118    /// Create options for actively exploited vulnerabilities only.
119    pub fn exploited_only() -> Self {
120        Self {
121            kev_only: true,
122            include_enrichment: true,
123            ..Default::default()
124        }
125    }
126}
127
/// Identifies one package, and optionally one version of it, in a batch query.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct PackageKey {
    /// Ecosystem the package belongs to (e.g. "npm", "PyPI").
    pub ecosystem: String,
    /// Package name within that ecosystem.
    pub name: String,
    /// Version to match advisories against; `None` means "any version".
    pub version: Option<String>,
}

impl PackageKey {
    /// Key for every version of `name` in `ecosystem`.
    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
        let ecosystem = ecosystem.into();
        let name = name.into();
        PackageKey {
            ecosystem,
            name,
            version: None,
        }
    }

    /// Key for one specific `version` of `name` in `ecosystem`.
    pub fn with_version(
        ecosystem: impl Into<String>,
        name: impl Into<String>,
        version: impl Into<String>,
    ) -> Self {
        let mut key = Self::new(ecosystem, name);
        key.version = Some(version.into());
        key
    }
}
162
163/// Builder for configuring VulnerabilityManager.
164pub struct VulnerabilityManagerBuilder {
165    redis_url: Option<String>,
166    store_config: StoreConfig,
167    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
168    custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
169    ossindex_source: Option<OssIndexSource>,
170    observer: Option<Arc<dyn SyncObserver>>,
171}
172
173impl Default for VulnerabilityManagerBuilder {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179impl VulnerabilityManagerBuilder {
180    /// Create a new builder with default settings.
181    pub fn new() -> Self {
182        Self {
183            redis_url: None,
184            store_config: StoreConfig::default(),
185            sources: Vec::new(),
186            custom_store: None,
187            ossindex_source: None,
188            observer: None,
189        }
190    }
191
192    /// Set the Redis connection URL.
193    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
194        self.redis_url = Some(url.into());
195        self
196    }
197
198    /// Set the store configuration.
199    pub fn store_config(mut self, config: StoreConfig) -> Self {
200        self.store_config = config;
201        self
202    }
203
204    /// Use a custom store implementation.
205    pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
206        self.custom_store = Some(store);
207        self
208    }
209
210    /// Add a vulnerability source.
211    pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
212        self.sources.push(source);
213        self
214    }
215
216    /// Add the GHSA source with the given token.
217    pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
218        self.sources.push(Arc::new(GHSASource::new(token.into())));
219        self
220    }
221
222    /// Add the NVD source with optional API key.
223    pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
224        self.sources.push(Arc::new(NVDSource::new(api_key)));
225        self
226    }
227
228    /// Add the OSV source for specified ecosystems.
229    pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
230        self.sources.push(Arc::new(OSVSource::new(ecosystems)));
231        self
232    }
233
234    /// Add default OSV ecosystems.
235    pub fn with_osv_defaults(self) -> Self {
236        self.with_osv(vec![
237            "npm".to_string(),
238            "PyPI".to_string(),
239            "Maven".to_string(),
240            "crates.io".to_string(),
241            "Go".to_string(),
242            "Packagist".to_string(),
243            "RubyGems".to_string(),
244            "NuGet".to_string(),
245        ])
246    }
247
248    /// Add the OSS Index source with optional configuration.
249    ///
250    /// OSS Index provides on-demand vulnerability queries by PURL.
251    /// If no config is provided, credentials are loaded from environment variables.
252    pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
253        match OssIndexSource::new(config) {
254            Ok(source) => {
255                self.ossindex_source = Some(source);
256            }
257            Err(e) => {
258                warn!("Failed to configure OSS Index source: {}", e);
259            }
260        }
261        self
262    }
263
264    /// Set a custom sync observer.
265    pub fn with_observer(mut self, observer: Arc<dyn SyncObserver>) -> Self {
266        self.observer = Some(observer);
267        self
268    }
269
270    /// Build the VulnerabilityManager.
271    pub fn build(self) -> Result<VulnerabilityManager> {
272        let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
273            Some(s) => s,
274            None => {
275                let url = self.redis_url.ok_or_else(|| {
276                    AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
277                })?;
278                Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
279            }
280        };
281
282        if self.sources.is_empty() {
283            warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
284        }
285
286        Ok(VulnerabilityManager {
287            store,
288            sources: self.sources,
289            kev_source: KevSource::new(),
290            epss_source: EpssSource::new(),
291            ossindex_source: self.ossindex_source,
292            observer: self
293                .observer
294                .unwrap_or_else(|| Arc::new(TracingSyncObserver)),
295        })
296    }
297}
298
299/// Main vulnerability manager for syncing and querying advisories.
300pub struct VulnerabilityManager {
301    store: Arc<dyn AdvisoryStore + Send + Sync>,
302    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
303    kev_source: KevSource,
304    epss_source: EpssSource,
305    ossindex_source: Option<OssIndexSource>,
306    observer: Arc<dyn SyncObserver>,
307}
308
309impl VulnerabilityManager {
310    /// Create a new manager from a Config.
311    ///
312    /// This is a convenience method. For more control, use [`VulnerabilityManagerBuilder`].
313    pub async fn new(config: Config) -> Result<Self> {
314        let mut builder = VulnerabilityManagerBuilder::new()
315            .redis_url(&config.redis_url)
316            .store_config(config.store.clone());
317
318        // Add OSV source
319        builder = builder.with_osv_defaults();
320
321        // Add NVD source
322        builder = builder.with_nvd(config.nvd_api_key.clone());
323
324        // Add GHSA source if token is provided
325        if let Some(token) = &config.ghsa_token {
326            builder = builder.with_ghsa(token.clone());
327        }
328
329        // Add OSS Index source if configured
330        if config.ossindex.is_some() {
331            builder = builder.with_ossindex(config.ossindex.clone());
332        }
333
334        builder.build()
335    }
336
337    /// Create a builder for custom configuration.
338    pub fn builder() -> VulnerabilityManagerBuilder {
339        VulnerabilityManagerBuilder::new()
340    }
341
342    /// Get a reference to the underlying store.
343    pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
344        &self.store
345    }
346
347    /// Check the health of the store connection.
348    pub async fn health_check(&self) -> Result<HealthStatus> {
349        self.store.health_check().await
350    }
351
352    /// Sync advisories from all configured sources.
353    pub async fn sync_all(&self) -> Result<SyncStats> {
354        self.observer.on_sync_start();
355
356        let mut handles = Vec::new();
357        let mut stats = SyncStats {
358            total_sources: self.sources.len(),
359            ..Default::default()
360        };
361
362        for source in &self.sources {
363            let source = source.clone();
364            let store = self.store.clone();
365            let observer = self.observer.clone();
366
367            let handle = tokio::spawn(async move {
368                observer.on_source_start(source.name());
369
370                let last_sync = match store.last_sync(source.name()).await {
371                    Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
372                        Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
373                        Err(_) => None,
374                    },
375                    _ => None,
376                };
377
378                match source.fetch(last_sync).await {
379                    Ok(advisories) => {
380                        if !advisories.is_empty() {
381                            match store.upsert_batch(&advisories, source.name()).await {
382                                Ok(_) => {
383                                    observer.on_source_success(source.name(), advisories.len());
384                                    // Update timestamp only after successful storage
385                                    if let Err(e) = store.update_sync_timestamp(source.name()).await
386                                    {
387                                        let err = AdvisoryError::source_fetch(
388                                            source.name(),
389                                            format!("Failed to update timestamp: {}", e),
390                                        );
391                                        observer.on_source_error(source.name(), &err);
392                                        // Non-critical error, count as success but maybe log warn?
393                                        // Observer handles logging.
394                                    }
395                                    Ok((source.name().to_string(), advisories.len()))
396                                }
397                                Err(e) => {
398                                    // Store error is critical for this source
399                                    observer.on_source_error(source.name(), &e);
400                                    Err((source.name().to_string(), e.to_string()))
401                                }
402                            }
403                        } else {
404                            observer.on_source_success(source.name(), 0);
405                            // Update sync timestamp even if no new advisories
406                            if let Err(e) = store.update_sync_timestamp(source.name()).await {
407                                let err = AdvisoryError::source_fetch(
408                                    source.name(),
409                                    format!("Failed to update timestamp: {}", e),
410                                );
411                                observer.on_source_error(source.name(), &err);
412                            }
413                            Ok((source.name().to_string(), 0))
414                        }
415                    }
416                    Err(e) => {
417                        observer.on_source_error(source.name(), &e);
418                        Err((source.name().to_string(), e.to_string()))
419                    }
420                }
421            });
422            handles.push(handle);
423        }
424
425        // Wait for all tasks to complete
426        for handle in handles {
427            match handle.await {
428                Ok(result) => match result {
429                    Ok((_, count)) => {
430                        stats.successful_sources += 1;
431                        stats.total_advisories_synced += count;
432                    }
433                    Err((name, error)) => {
434                        stats.failed_sources += 1;
435                        stats.errors.insert(name, error);
436                    }
437                },
438                Err(e) => {
439                    // Task panic or join error
440                    error!("Task join error: {}", e);
441                    stats.failed_sources += 1;
442                    stats
443                        .errors
444                        .insert("unknown".to_string(), format!("Task join error: {}", e));
445                }
446            }
447        }
448
449        self.observer.on_sync_complete(&stats);
450        Ok(stats)
451    }
452
453    /// Reset the sync timestamp for a specific source.
454    ///
455    /// This forces a full re-sync on the next `sync_all()` call.
456    pub async fn reset_sync(&self, source: &str) -> Result<()> {
457        self.store.reset_sync_timestamp(source).await
458    }
459
460    /// Reset all sync timestamps, forcing a full re-sync of all sources.
461    pub async fn reset_all_syncs(&self) -> Result<()> {
462        for source in &self.sources {
463            self.store.reset_sync_timestamp(source.name()).await?;
464        }
465        Ok(())
466    }
467
468    /// Sync enrichment data (KEV and EPSS).
469    pub async fn sync_enrichment(&self) -> Result<()> {
470        self.sync_enrichment_with_cves(&[]).await
471    }
472
473    /// Sync enrichment data with optional extra CVE IDs to broaden EPSS coverage.
474    pub async fn sync_enrichment_with_cves(&self, extra_cves: &[String]) -> Result<()> {
475        info!("Syncing enrichment data (KEV, EPSS)...");
476
477        let mut enrichment: HashMap<String, EnrichmentData> = HashMap::new();
478
479        // Sync KEV data
480        match self.kev_source.fetch_catalog().await {
481            Ok(kev_entries) => {
482                info!("Processing {} KEV entries", kev_entries.len());
483                for (cve_id, entry) in kev_entries {
484                    let data = enrichment
485                        .entry(cve_id.clone())
486                        .or_insert_with(|| EnrichmentData {
487                            epss_score: None,
488                            epss_percentile: None,
489                            is_kev: false,
490                            kev_due_date: None,
491                            kev_date_added: None,
492                            kev_ransomware: None,
493                            updated_at: String::new(),
494                        });
495
496                    data.is_kev = true;
497                    data.kev_due_date = entry.due_date_utc().map(|d| d.to_rfc3339());
498                    data.kev_date_added = entry.date_added_utc().map(|d| d.to_rfc3339());
499                    data.kev_ransomware = Some(entry.is_ransomware_related());
500                }
501            }
502            Err(e) => {
503                error!("Failed to fetch KEV catalog: {}", e);
504            }
505        }
506
507        // Sync EPSS for known CVEs plus any extra provided by caller
508        let epss_targets = Self::collect_enrichment_targets(&enrichment, extra_cves);
509        if !epss_targets.is_empty() {
510            match self
511                .epss_source
512                .fetch_scores_batch(&epss_targets, 200)
513                .await
514            {
515                Ok(scores) => {
516                    Self::merge_epss_scores(&mut enrichment, scores);
517                }
518                Err(e) => {
519                    warn!("Failed to fetch EPSS scores: {}", e);
520                }
521            }
522        }
523
524        // Persist merged enrichment data
525        if !enrichment.is_empty() {
526            let now = chrono::Utc::now().to_rfc3339();
527            for (cve_id, mut data) in enrichment {
528                if data.updated_at.is_empty() {
529                    data.updated_at = now.clone();
530                }
531                if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
532                    debug!("Failed to store enrichment for {}: {}", cve_id, e);
533                }
534            }
535        }
536
537        Ok(())
538    }
539
540    /// Build the list of CVE IDs to request EPSS for.
541    fn collect_enrichment_targets(
542        current: &HashMap<String, EnrichmentData>,
543        extra: &[String],
544    ) -> Vec<String> {
545        let mut set: std::collections::HashSet<String> = current.keys().cloned().collect();
546        for c in extra {
547            set.insert(c.clone());
548        }
549        set.into_iter().collect()
550    }
551
552    /// Merge EPSS scores into enrichment map.
553    fn merge_epss_scores(
554        enrichment: &mut HashMap<String, EnrichmentData>,
555        scores: HashMap<String, crate::sources::epss::EpssScore>,
556    ) {
557        for (cve_id, score) in scores {
558            let data = enrichment
559                .entry(cve_id.clone())
560                .or_insert_with(|| EnrichmentData {
561                    epss_score: None,
562                    epss_percentile: None,
563                    is_kev: false,
564                    kev_due_date: None,
565                    kev_date_added: None,
566                    kev_ransomware: None,
567                    updated_at: String::new(),
568                });
569
570            data.epss_score = Some(score.epss);
571            data.epss_percentile = Some(score.percentile);
572            if let Some(date) = score.date_utc() {
573                data.updated_at = date.to_rfc3339();
574            }
575        }
576    }
577
578    /// Query advisories for a specific package.
579    pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
580        let advisories = self.store.get_by_package(ecosystem, package).await?;
581        Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
582    }
583
584    /// Query advisories with enrichment data.
585    pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
586        let mut advisories = self.query(ecosystem, package).await?;
587        self.enrich_advisories(&mut advisories).await?;
588        Ok(advisories)
589    }
590
591    /// Query multiple packages in a batch (concurrent).
592    ///
593    /// All queries run in parallel for maximum throughput.
594    pub async fn query_batch(
595        &self,
596        packages: &[PackageKey],
597    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
598        use futures_util::future::join_all;
599
600        let tasks: Vec<_> = packages
601            .iter()
602            .map(|pkg| {
603                let pkg = pkg.clone();
604                let ecosystem = pkg.ecosystem.clone();
605                let name = pkg.name.clone();
606                let version = pkg.version.clone();
607                let store = self.store.clone();
608
609                async move {
610                    let advisories = if let Some(ver) = &version {
611                        // For version matching, we need the full logic
612                        let all = store.get_by_package(&ecosystem, &name).await?;
613                        let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
614                        Self::filter_by_version(aggregated, &ecosystem, &name, ver)
615                    } else {
616                        let all = store.get_by_package(&ecosystem, &name).await?;
617                        crate::aggregator::ReportAggregator::aggregate(all)
618                    };
619                    Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
620                }
621            })
622            .collect();
623
624        let results: Vec<_> = join_all(tasks).await;
625
626        let mut map = HashMap::new();
627        for result in results {
628            match result {
629                Ok((pkg, advisories)) => {
630                    map.insert(pkg, advisories);
631                }
632                Err(e) => {
633                    warn!("Batch query error: {}", e);
634                }
635            }
636        }
637
638        Ok(map)
639    }
640
641    /// Filter advisories by version (static helper for concurrent batch queries)
642    fn filter_by_version(
643        advisories: Vec<Advisory>,
644        ecosystem: &str,
645        package: &str,
646        version: &str,
647    ) -> Vec<Advisory> {
648        advisories
649            .into_iter()
650            .filter(|advisory| {
651                for affected in &advisory.affected {
652                    if affected.package.name != package || affected.package.ecosystem != ecosystem {
653                        continue;
654                    }
655
656                    // Check explicit versions
657                    if affected.versions.contains(&version.to_string()) {
658                        return true;
659                    }
660
661                    // Check ranges
662                    for range in &affected.ranges {
663                        match range.range_type {
664                            RangeType::Semver => {
665                                if Self::matches_semver_range(version, &range.events) {
666                                    return true;
667                                }
668                            }
669                            RangeType::Ecosystem => {
670                                if Self::matches_ecosystem_range(version, &range.events) {
671                                    return true;
672                                }
673                            }
674                            RangeType::Git => {}
675                        }
676                    }
677                }
678                false
679            })
680            .collect()
681    }
682
683    /// Check if a specific package version is affected by any vulnerabilities.
684    pub async fn matches(
685        &self,
686        ecosystem: &str,
687        package: &str,
688        version: &str,
689    ) -> Result<Vec<Advisory>> {
690        self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
691            .await
692    }
693
694    /// Check if a package version is affected, with filtering options.
695    pub async fn matches_with_options(
696        &self,
697        ecosystem: &str,
698        package: &str,
699        version: &str,
700        options: &MatchOptions,
701    ) -> Result<Vec<Advisory>> {
702        let advisories = self.query(ecosystem, package).await?;
703        let mut matched = Vec::new();
704
705        for mut advisory in advisories {
706            let mut is_vulnerable = false;
707            for affected in &advisory.affected {
708                if affected.package.name != package || affected.package.ecosystem != ecosystem {
709                    continue;
710                }
711
712                // Check explicit versions
713                if affected.versions.contains(&version.to_string()) {
714                    is_vulnerable = true;
715                    break;
716                }
717
718                // Check ranges
719                for range in &affected.ranges {
720                    match range.range_type {
721                        RangeType::Semver => {
722                            if Self::matches_semver_range(version, &range.events) {
723                                is_vulnerable = true;
724                                break;
725                            }
726                        }
727                        RangeType::Ecosystem => {
728                            if Self::matches_ecosystem_range(version, &range.events) {
729                                is_vulnerable = true;
730                                break;
731                            }
732                        }
733                        RangeType::Git => {
734                            // Git ranges require commit hash comparison, skip for now
735                        }
736                    }
737                }
738                if is_vulnerable {
739                    break;
740                }
741            }
742
743            if is_vulnerable {
744                // Apply enrichment if requested
745                if options.include_enrichment {
746                    self.enrich_advisory(&mut advisory).await?;
747                }
748
749                // Apply filters
750                if self.advisory_passes_filters(&advisory, options) {
751                    matched.push(advisory);
752                }
753            }
754        }
755
756        Ok(matched)
757    }
758
759    /// Check if a version matches any semver interval described by OSV events.
760    ///
761    /// OSV allows multiple introduced/fixed pairs; we evaluate each interval in order.
762    fn matches_semver_range(version: &str, events: &[Event]) -> bool {
763        let Ok(v) = semver::Version::parse(version) else {
764            return false;
765        };
766
767        #[derive(Default)]
768        struct Interval {
769            start: Option<semver::Version>,
770            end: Option<semver::Version>,
771            end_inclusive: bool,
772        }
773
774        let mut intervals: Vec<Interval> = Vec::new();
775        let mut current_start: Option<semver::Version> = None;
776
777        for event in events {
778            match event {
779                Event::Introduced(ver) => {
780                    if let Ok(parsed) = semver::Version::parse(ver) {
781                        current_start = Some(parsed);
782                    } else if ver == "0" {
783                        current_start = Some(semver::Version::new(0, 0, 0));
784                    }
785                }
786                Event::Fixed(ver) => {
787                    let end = semver::Version::parse(ver).ok();
788                    intervals.push(Interval {
789                        start: current_start.clone(),
790                        end,
791                        end_inclusive: false,
792                    });
793                    current_start = None;
794                }
795                Event::LastAffected(ver) => {
796                    let end = semver::Version::parse(ver).ok();
797                    intervals.push(Interval {
798                        start: current_start.clone(),
799                        end,
800                        end_inclusive: true,
801                    });
802                    current_start = None;
803                }
804                Event::Limit(ver) => {
805                    // Treat limit as an exclusive upper bound for any open interval.
806                    let end = semver::Version::parse(ver).ok();
807                    intervals.push(Interval {
808                        start: current_start.clone(),
809                        end,
810                        end_inclusive: false,
811                    });
812                    current_start = None;
813                }
814            }
815        }
816
817        // Open-ended interval from the last introduction.
818        if current_start.is_some() {
819            intervals.push(Interval {
820                start: current_start,
821                end: None,
822                end_inclusive: false,
823            });
824        }
825
826        intervals.into_iter().any(|interval| {
827            if let Some(start) = &interval.start {
828                if v < *start {
829                    return false;
830                }
831            }
832
833            match (&interval.end, interval.end_inclusive) {
834                (Some(end), true) => v <= *end,
835                (Some(end), false) => v < *end,
836                (None, _) => true,
837            }
838        })
839    }
840
841    /// Check if a version matches an ecosystem range. Falls back to semver if both parse as semver,
842    /// otherwise uses dotted numeric comparison (e.g., "1.10" > "1.2").
843    fn matches_ecosystem_range(version: &str, events: &[Event]) -> bool {
844        // Try semver first; if any boundary fails semver parsing, fall back to dotted.
845        if events.iter().all(|e| match e {
846            Event::Introduced(v) | Event::Fixed(v) | Event::LastAffected(v) | Event::Limit(v) => {
847                semver::Version::parse(v).is_ok() || v == "0"
848            }
849        }) {
850            return Self::matches_semver_range(version, events);
851        }
852
853        let version_parts = match Self::parse_dotted(version) {
854            Some(p) => p,
855            None => return false,
856        };
857
858        #[derive(Default)]
859        struct Interval {
860            start: Option<Vec<u64>>,
861            end: Option<Vec<u64>>,
862            end_inclusive: bool,
863        }
864
865        let mut intervals: Vec<Interval> = Vec::new();
866        let mut current_start: Option<Vec<u64>> = None;
867
868        for event in events {
869            match event {
870                Event::Introduced(ver) => {
871                    current_start = Self::parse_dotted(ver);
872                }
873                Event::Fixed(ver) => {
874                    intervals.push(Interval {
875                        start: current_start.clone(),
876                        end: Self::parse_dotted(ver),
877                        end_inclusive: false,
878                    });
879                    current_start = None;
880                }
881                Event::LastAffected(ver) => {
882                    intervals.push(Interval {
883                        start: current_start.clone(),
884                        end: Self::parse_dotted(ver),
885                        end_inclusive: true,
886                    });
887                    current_start = None;
888                }
889                Event::Limit(ver) => {
890                    intervals.push(Interval {
891                        start: current_start.clone(),
892                        end: Self::parse_dotted(ver),
893                        end_inclusive: false,
894                    });
895                    current_start = None;
896                }
897            }
898        }
899
900        if current_start.is_some() {
901            intervals.push(Interval {
902                start: current_start,
903                end: None,
904                end_inclusive: false,
905            });
906        }
907
908        intervals.into_iter().any(|interval| {
909            if let Some(start) = &interval.start {
910                if Self::cmp_dotted(&version_parts, start) == Ordering::Less {
911                    return false;
912                }
913            }
914
915            match (&interval.end, interval.end_inclusive) {
916                (Some(end), true) => Self::cmp_dotted(&version_parts, end) != Ordering::Greater,
917                (Some(end), false) => Self::cmp_dotted(&version_parts, end) == Ordering::Less,
918                (None, _) => true,
919            }
920        })
921    }
922
923    /// Parse dotted numeric versions (e.g., "1.2.10"). Non-numeric segments cause failure.
924    fn parse_dotted(v: &str) -> Option<Vec<u64>> {
925        let mut parts = Vec::new();
926        for chunk in v.split(|c: char| !c.is_ascii_digit()) {
927            if chunk.is_empty() {
928                continue;
929            }
930            let Ok(num) = chunk.parse::<u64>() else {
931                return None;
932            };
933            parts.push(num);
934        }
935        if parts.is_empty() { None } else { Some(parts) }
936    }
937
938    /// Compare dotted numeric versions.
939    fn cmp_dotted(a: &[u64], b: &[u64]) -> Ordering {
940        let max_len = a.len().max(b.len());
941        for i in 0..max_len {
942            let ai = *a.get(i).unwrap_or(&0);
943            let bi = *b.get(i).unwrap_or(&0);
944            match ai.cmp(&bi) {
945                Ordering::Equal => continue,
946                ord => return ord,
947            }
948        }
949        Ordering::Equal
950    }
951
952    /// Enrich a single advisory with EPSS/KEV data.
953    async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
954        // Find CVE aliases
955        let cve_ids = Self::extract_cve_ids(advisory);
956
957        if cve_ids.is_empty() {
958            return Ok(());
959        }
960
961        // Look up enrichment data
962        for cve_id in &cve_ids {
963            if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
964                let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
965                enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
966                enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
967                enrichment.is_kev = enrichment.is_kev || data.is_kev;
968                if data.kev_due_date.is_some() {
969                    enrichment.kev_due_date = data
970                        .kev_due_date
971                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
972                        .map(|d| d.with_timezone(&chrono::Utc));
973                }
974                if data.kev_ransomware.is_some() {
975                    enrichment.kev_ransomware = data.kev_ransomware;
976                }
977            }
978        }
979
980        Ok(())
981    }
982
983    /// Enrich multiple advisories with EPSS/KEV data.
984    async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
985        for advisory in advisories.iter_mut() {
986            self.enrich_advisory(advisory).await?;
987        }
988        Ok(())
989    }
990
991    /// Extract CVE IDs from an advisory (from ID or aliases).
992    fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
993        let mut cve_ids = Vec::new();
994
995        if advisory.id.starts_with("CVE-") {
996            cve_ids.push(advisory.id.clone());
997        }
998
999        if let Some(aliases) = &advisory.aliases {
1000            for alias in aliases {
1001                if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
1002                    cve_ids.push(alias.clone());
1003                }
1004            }
1005        }
1006
1007        cve_ids
1008    }
1009
1010    /// Check if an advisory passes the filter options.
1011    fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
1012        // Check KEV filter
1013        if options.kev_only {
1014            let is_kev = advisory
1015                .enrichment
1016                .as_ref()
1017                .map(|e| e.is_kev)
1018                .unwrap_or(false);
1019            if !is_kev {
1020                return false;
1021            }
1022        }
1023
1024        // Check CVSS filter
1025        if let Some(min_cvss) = options.min_cvss {
1026            let cvss = advisory
1027                .enrichment
1028                .as_ref()
1029                .and_then(|e| e.cvss_v3_score)
1030                .unwrap_or(0.0);
1031            if cvss < min_cvss {
1032                return false;
1033            }
1034        }
1035
1036        // Check EPSS filter
1037        if let Some(min_epss) = options.min_epss {
1038            let epss = advisory
1039                .enrichment
1040                .as_ref()
1041                .and_then(|e| e.epss_score)
1042                .unwrap_or(0.0);
1043            if epss < min_epss {
1044                return false;
1045            }
1046        }
1047
1048        // Check severity filter
1049        if let Some(min_severity) = &options.min_severity {
1050            let severity = advisory
1051                .enrichment
1052                .as_ref()
1053                .and_then(|e| e.cvss_v3_severity)
1054                .unwrap_or(Severity::None);
1055            if severity < *min_severity {
1056                return false;
1057            }
1058        }
1059
1060        true
1061    }
1062
1063    /// Fetch live EPSS scores for CVEs (not from cache).
1064    pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
1065        let scores = self.epss_source.fetch_scores(cve_ids).await?;
1066        Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
1067    }
1068
1069    /// Check if a CVE is in the CISA KEV catalog.
1070    pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
1071        // Check cache first
1072        if let Some(data) = self.store.get_enrichment(cve_id).await? {
1073            return Ok(data.is_kev);
1074        }
1075
1076        // Fetch from source
1077        let entry = self.kev_source.is_kev(cve_id).await?;
1078        Ok(entry.is_some())
1079    }
1080
1081    // === OSS Index Methods ===
1082
1083    /// Query OSS Index for vulnerabilities affecting the given PURLs.
1084    ///
1085    /// This method provides automatic caching:
1086    /// - First checks the cache for each PURL
1087    /// - Only queries OSS Index for cache misses
1088    /// - Caches results for future queries
1089    ///
1090    /// # Arguments
1091    ///
1092    /// * `purls` - Package URLs to query (e.g., "pkg:npm/lodash@4.17.20")
1093    ///
1094    /// # Returns
1095    ///
1096    /// Vector of advisories for all vulnerabilities found.
1097    ///
1098    /// # Errors
1099    ///
1100    /// Returns an error if OSS Index is not configured or if the query fails.
1101    ///
1102    /// # Example
1103    ///
1104    /// ```rust,ignore
1105    /// use vulnera_advisors::{VulnerabilityManager, Purl};
1106    ///
1107    /// let manager = VulnerabilityManager::builder()
1108    ///     .redis_url("redis://localhost:6379")
1109    ///     .with_ossindex(None)
1110    ///     .build()?;
1111    ///
1112    /// let purls = vec![
1113    ///     Purl::new("npm", "lodash").with_version("4.17.20").to_string(),
1114    /// ];
1115    ///
1116    /// let advisories = manager.query_ossindex(&purls).await?;
1117    /// ```
1118    pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
1119        let source = self.ossindex_source.as_ref().ok_or_else(|| {
1120            AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
1121        })?;
1122
1123        // Check cache for all PURLs
1124        let mut cached_advisories = Vec::new();
1125        let mut cache_misses = Vec::new();
1126
1127        for purl in purls {
1128            let cache_key = Purl::cache_key_from_str(purl);
1129            match self.store.get_ossindex_cache(&cache_key).await {
1130                Ok(Some(cache)) if !cache.is_expired() => {
1131                    debug!("OSS Index cache hit for {}", purl);
1132                    cached_advisories.extend(cache.advisories);
1133                }
1134                _ => {
1135                    debug!("OSS Index cache miss for {}", purl);
1136                    cache_misses.push(purl.clone());
1137                }
1138            }
1139        }
1140
1141        // Query OSS Index for cache misses
1142        if !cache_misses.is_empty() {
1143            debug!("Querying OSS Index for {} cache misses", cache_misses.len());
1144            let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
1145                AdvisoryError::SourceFetch {
1146                    source_name: "ossindex".to_string(),
1147                    message: e.to_string(),
1148                }
1149            })?;
1150
1151            // Group advisories by PURL for caching
1152            let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
1153
1154            // Cache results for each PURL
1155            for (purl, advisories) in &advisory_map {
1156                let cache_key = Purl::cache_key_from_str(purl);
1157                let cache = OssIndexCache::new(advisories.clone());
1158                if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
1159                    debug!("Failed to cache OSS Index result for {}: {}", purl, e);
1160                }
1161            }
1162
1163            // Flatten and add to results
1164            for advisories in advisory_map.into_values() {
1165                cached_advisories.extend(advisories);
1166            }
1167        }
1168
1169        Ok(cached_advisories)
1170    }
1171
1172    /// Query OSS Index for vulnerabilities with fallback to stored advisories.
1173    ///
1174    /// This method first queries OSS Index, then falls back to the local store
1175    /// if the OSS Index query fails or returns no results.
1176    ///
1177    /// # Arguments
1178    ///
1179    /// * `packages` - List of packages to query (ecosystem, name, optional version)
1180    ///
1181    /// # Returns
1182    ///
1183    /// Map of package keys to their advisories.
1184    pub async fn query_batch_with_ossindex(
1185        &self,
1186        packages: &[PackageKey],
1187    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
1188        let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
1189
1190        // Build PURLs for packages that have versions
1191        let (with_version, without_version): (Vec<_>, Vec<_>) =
1192            packages.iter().partition(|p| p.version.is_some());
1193
1194        // Query OSS Index for packages with versions
1195        if !with_version.is_empty() && self.ossindex_source.is_some() {
1196            let purls: Vec<String> = with_version
1197                .iter()
1198                .map(|p| {
1199                    Purl::new(&p.ecosystem, &p.name)
1200                        .with_version(p.version.as_ref().unwrap())
1201                        .to_string()
1202                })
1203                .collect();
1204
1205            match self.query_ossindex(&purls).await {
1206                Ok(advisories) => {
1207                    // Group advisories by package key
1208                    for pkg in &with_version {
1209                        let pkg_advisories: Vec<_> = advisories
1210                            .iter()
1211                            .filter(|a| {
1212                                a.affected.iter().any(|aff| {
1213                                    aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
1214                                        && aff.package.name == pkg.name
1215                                })
1216                            })
1217                            .cloned()
1218                            .collect();
1219                        results.insert((*pkg).clone(), pkg_advisories);
1220                    }
1221                }
1222                Err(e) => {
1223                    warn!("OSS Index query failed, falling back to local store: {}", e);
1224                    // Fallback to local store
1225                    for pkg in &with_version {
1226                        let advisories = if let Some(version) = &pkg.version {
1227                            self.matches(&pkg.ecosystem, &pkg.name, version).await?
1228                        } else {
1229                            self.query(&pkg.ecosystem, &pkg.name).await?
1230                        };
1231                        results.insert((*pkg).clone(), advisories);
1232                    }
1233                }
1234            }
1235        }
1236
1237        // Query local store for packages without versions
1238        for pkg in &without_version {
1239            let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
1240            results.insert((*pkg).clone(), advisories);
1241        }
1242
1243        Ok(results)
1244    }
1245
1246    /// Invalidate cached OSS Index results for specific PURLs.
1247    ///
1248    /// Use this to force a fresh query on the next call.
1249    pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
1250        for purl in purls {
1251            let cache_key = Purl::cache_key_from_str(purl);
1252            self.store.invalidate_ossindex_cache(&cache_key).await?;
1253        }
1254        Ok(())
1255    }
1256
1257    /// Invalidate all cached OSS Index results.
1258    pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
1259        self.store.invalidate_all_ossindex_cache().await?;
1260        Ok(())
1261    }
1262
1263    /// Group advisories by their associated PURL.
1264    fn group_advisories_by_purl(
1265        purls: &[String],
1266        advisories: &[Advisory],
1267    ) -> HashMap<String, Vec<Advisory>> {
1268        let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
1269
1270        // Initialize map with empty vectors for all PURLs
1271        for purl in purls {
1272            map.insert(purl.clone(), Vec::new());
1273        }
1274
1275        for advisory in advisories {
1276            for affected in &advisory.affected {
1277                for purl in purls {
1278                    let Ok(parsed) = Purl::parse(purl) else {
1279                        continue;
1280                    };
1281
1282                    // Match on ecosystem as well as name to avoid cross-ecosystem collisions
1283                    let affected_eco = affected.package.ecosystem.to_lowercase();
1284                    let purl_eco = parsed.purl_type.to_lowercase();
1285                    let purl_eco_alt = parsed.ecosystem().to_lowercase();
1286                    if affected_eco != purl_eco && affected_eco != purl_eco_alt {
1287                        continue;
1288                    }
1289
1290                    if parsed.name != affected.package.name {
1291                        continue;
1292                    }
1293
1294                    if let Some(ver) = parsed.version.as_deref() {
1295                        // If a version is specified, ensure the advisory actually covers it.
1296                        let version_matches = affected.versions.contains(&ver.to_string())
1297                            || affected.ranges.iter().any(|r| {
1298                                matches!(r.range_type, RangeType::Semver | RangeType::Ecosystem)
1299                                    && Self::matches_semver_range(ver, &r.events)
1300                            });
1301
1302                        if !version_matches {
1303                            continue;
1304                        }
1305                    }
1306
1307                    map.entry(purl.clone()).or_default().push(advisory.clone());
1308                    break;
1309                }
1310            }
1311        }
1312
1313        map
1314    }
1315}