vulnera_advisor/
manager.rs

1//! Vulnerability manager for orchestrating syncs and queries.
2//!
3//! The [`VulnerabilityManager`] is the main entry point for using this crate.
4//! Use [`VulnerabilityManagerBuilder`] for flexible configuration.
5
6use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tracing::{debug, error, info, warn};
19
20/// Options for filtering vulnerability matches.
21#[derive(Debug, Clone, Default)]
22pub struct MatchOptions {
23    /// Minimum CVSS v3 score (0.0 - 10.0).
24    pub min_cvss: Option<f64>,
25    /// Minimum EPSS score (0.0 - 1.0).
26    pub min_epss: Option<f64>,
27    /// Only return KEV (actively exploited) vulnerabilities.
28    pub kev_only: bool,
29    /// Minimum severity level.
30    pub min_severity: Option<Severity>,
31    /// Include enrichment data (EPSS, KEV) in results.
32    pub include_enrichment: bool,
33}
34
35impl MatchOptions {
36    /// Create options that include all vulnerabilities with enrichment.
37    pub fn with_enrichment() -> Self {
38        Self {
39            include_enrichment: true,
40            ..Default::default()
41        }
42    }
43
44    /// Create options for high-severity vulnerabilities only.
45    pub fn high_severity() -> Self {
46        Self {
47            min_severity: Some(Severity::High),
48            include_enrichment: true,
49            ..Default::default()
50        }
51    }
52
53    /// Create options for actively exploited vulnerabilities only.
54    pub fn exploited_only() -> Self {
55        Self {
56            kev_only: true,
57            include_enrichment: true,
58            ..Default::default()
59        }
60    }
61}
62
63/// A key identifying a package for batch queries.
64#[derive(Debug, Clone, Hash, PartialEq, Eq)]
65pub struct PackageKey {
66    /// Package ecosystem (e.g., "npm", "PyPI").
67    pub ecosystem: String,
68    /// Package name.
69    pub name: String,
70    /// Optional version for matching.
71    pub version: Option<String>,
72}
73
74impl PackageKey {
75    /// Create a new package key.
76    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
77        Self {
78            ecosystem: ecosystem.into(),
79            name: name.into(),
80            version: None,
81        }
82    }
83
84    /// Create a package key with a version.
85    pub fn with_version(
86        ecosystem: impl Into<String>,
87        name: impl Into<String>,
88        version: impl Into<String>,
89    ) -> Self {
90        Self {
91            ecosystem: ecosystem.into(),
92            name: name.into(),
93            version: Some(version.into()),
94        }
95    }
96}
97
98/// Builder for configuring VulnerabilityManager.
99pub struct VulnerabilityManagerBuilder {
100    redis_url: Option<String>,
101    store_config: StoreConfig,
102    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
103    custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
104    ossindex_source: Option<OssIndexSource>,
105}
106
107impl Default for VulnerabilityManagerBuilder {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl VulnerabilityManagerBuilder {
114    /// Create a new builder with default settings.
115    pub fn new() -> Self {
116        Self {
117            redis_url: None,
118            store_config: StoreConfig::default(),
119            sources: Vec::new(),
120            custom_store: None,
121            ossindex_source: None,
122        }
123    }
124
125    /// Set the Redis connection URL.
126    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
127        self.redis_url = Some(url.into());
128        self
129    }
130
131    /// Set the store configuration.
132    pub fn store_config(mut self, config: StoreConfig) -> Self {
133        self.store_config = config;
134        self
135    }
136
137    /// Use a custom store implementation.
138    pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
139        self.custom_store = Some(store);
140        self
141    }
142
143    /// Add a vulnerability source.
144    pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
145        self.sources.push(source);
146        self
147    }
148
149    /// Add the GHSA source with the given token.
150    pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
151        self.sources.push(Arc::new(GHSASource::new(token.into())));
152        self
153    }
154
155    /// Add the NVD source with optional API key.
156    pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
157        self.sources.push(Arc::new(NVDSource::new(api_key)));
158        self
159    }
160
161    /// Add the OSV source for specified ecosystems.
162    pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
163        self.sources.push(Arc::new(OSVSource::new(ecosystems)));
164        self
165    }
166
167    /// Add default OSV ecosystems.
168    pub fn with_osv_defaults(self) -> Self {
169        self.with_osv(vec![
170            "npm".to_string(),
171            "PyPI".to_string(),
172            "Maven".to_string(),
173            "crates.io".to_string(),
174            "Go".to_string(),
175            "Packagist".to_string(),
176            "RubyGems".to_string(),
177            "NuGet".to_string(),
178        ])
179    }
180
181    /// Add the OSS Index source with optional configuration.
182    ///
183    /// OSS Index provides on-demand vulnerability queries by PURL.
184    /// If no config is provided, credentials are loaded from environment variables.
185    pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
186        match OssIndexSource::new(config) {
187            Ok(source) => {
188                self.ossindex_source = Some(source);
189            }
190            Err(e) => {
191                warn!("Failed to configure OSS Index source: {}", e);
192            }
193        }
194        self
195    }
196
197    /// Build the VulnerabilityManager.
198    pub fn build(self) -> Result<VulnerabilityManager> {
199        let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
200            Some(s) => s,
201            None => {
202                let url = self.redis_url.ok_or_else(|| {
203                    AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
204                })?;
205                Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
206            }
207        };
208
209        if self.sources.is_empty() {
210            warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
211        }
212
213        Ok(VulnerabilityManager {
214            store,
215            sources: self.sources,
216            kev_source: KevSource::new(),
217            epss_source: EpssSource::new(),
218            ossindex_source: self.ossindex_source,
219        })
220    }
221}
222
223/// Main vulnerability manager for syncing and querying advisories.
224pub struct VulnerabilityManager {
225    store: Arc<dyn AdvisoryStore + Send + Sync>,
226    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
227    kev_source: KevSource,
228    epss_source: EpssSource,
229    ossindex_source: Option<OssIndexSource>,
230}
231
232impl VulnerabilityManager {
233    /// Create a new manager from a Config.
234    ///
235    /// This is a convenience method. For more control, use [`VulnerabilityManagerBuilder`].
236    pub async fn new(config: Config) -> Result<Self> {
237        let mut builder = VulnerabilityManagerBuilder::new()
238            .redis_url(&config.redis_url)
239            .store_config(config.store.clone());
240
241        // Add OSV source
242        builder = builder.with_osv_defaults();
243
244        // Add NVD source
245        builder = builder.with_nvd(config.nvd_api_key.clone());
246
247        // Add GHSA source if token is provided
248        if let Some(token) = &config.ghsa_token {
249            builder = builder.with_ghsa(token.clone());
250        }
251
252        // Add OSS Index source if configured
253        if config.ossindex.is_some() {
254            builder = builder.with_ossindex(config.ossindex.clone());
255        }
256
257        builder.build()
258    }
259
260    /// Create a builder for custom configuration.
261    pub fn builder() -> VulnerabilityManagerBuilder {
262        VulnerabilityManagerBuilder::new()
263    }
264
265    /// Get a reference to the underlying store.
266    pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
267        &self.store
268    }
269
270    /// Check the health of the store connection.
271    pub async fn health_check(&self) -> Result<HealthStatus> {
272        self.store.health_check().await
273    }
274
275    /// Sync advisories from all configured sources.
276    pub async fn sync_all(&self) -> Result<()> {
277        info!("Starting full vulnerability sync...");
278
279        let mut handles = Vec::new();
280
281        for source in &self.sources {
282            let source = source.clone();
283            let store = self.store.clone();
284
285            let handle = tokio::spawn(async move {
286                let last_sync = match store.last_sync(source.name()).await {
287                    Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
288                        Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
289                        Err(_) => None,
290                    },
291                    _ => None,
292                };
293
294                if let Some(since) = last_sync {
295                    info!("Syncing {} since {}", source.name(), since);
296                } else {
297                    info!("Syncing {} (full)", source.name());
298                }
299
300                match source.fetch(last_sync).await {
301                    Ok(advisories) => {
302                        if !advisories.is_empty() {
303                            match store.upsert_batch(&advisories, source.name()).await {
304                                Ok(_) => {
305                                    info!(
306                                        "Successfully synced {} advisories from {}",
307                                        advisories.len(),
308                                        source.name()
309                                    );
310                                    // Update timestamp only after successful storage
311                                    if let Err(e) = store.update_sync_timestamp(source.name()).await
312                                    {
313                                        error!(
314                                            "Failed to update sync timestamp for {}: {}",
315                                            source.name(),
316                                            e
317                                        );
318                                    }
319                                }
320                                Err(e) => {
321                                    error!(
322                                        "Failed to store advisories for {}: {}",
323                                        source.name(),
324                                        e
325                                    );
326                                    // Do NOT update timestamp on storage failure
327                                }
328                            }
329                        } else {
330                            info!("No new advisories for {}", source.name());
331                            // Update sync timestamp even if no new advisories (successful check)
332                            if let Err(e) = store.update_sync_timestamp(source.name()).await {
333                                error!(
334                                    "Failed to update sync timestamp for {}: {}",
335                                    source.name(),
336                                    e
337                                );
338                            }
339                        }
340                    }
341                    Err(e) => {
342                        error!("Failed to fetch from {}: {}", source.name(), e);
343                        // Do NOT update timestamp on fetch failure
344                    }
345                }
346            });
347            handles.push(handle);
348        }
349
350        // Wait for all tasks to complete
351        for handle in handles {
352            if let Err(e) = handle.await {
353                error!("Task join error: {}", e);
354            }
355        }
356
357        info!("Sync completed.");
358        Ok(())
359    }
360
361    /// Reset the sync timestamp for a specific source.
362    ///
363    /// This forces a full re-sync on the next `sync_all()` call.
364    pub async fn reset_sync(&self, source: &str) -> Result<()> {
365        self.store.reset_sync_timestamp(source).await
366    }
367
368    /// Reset all sync timestamps, forcing a full re-sync of all sources.
369    pub async fn reset_all_syncs(&self) -> Result<()> {
370        for source in &self.sources {
371            self.store.reset_sync_timestamp(source.name()).await?;
372        }
373        Ok(())
374    }
375
376    /// Sync enrichment data (KEV and EPSS).
377    pub async fn sync_enrichment(&self) -> Result<()> {
378        self.sync_enrichment_with_cves(&[]).await
379    }
380
381    /// Sync enrichment data with optional extra CVE IDs to broaden EPSS coverage.
382    pub async fn sync_enrichment_with_cves(&self, extra_cves: &[String]) -> Result<()> {
383        info!("Syncing enrichment data (KEV, EPSS)...");
384
385        let mut enrichment: HashMap<String, EnrichmentData> = HashMap::new();
386
387        // Sync KEV data
388        match self.kev_source.fetch_catalog().await {
389            Ok(kev_entries) => {
390                info!("Processing {} KEV entries", kev_entries.len());
391                for (cve_id, entry) in kev_entries {
392                    let data = enrichment
393                        .entry(cve_id.clone())
394                        .or_insert_with(|| EnrichmentData {
395                            epss_score: None,
396                            epss_percentile: None,
397                            is_kev: false,
398                            kev_due_date: None,
399                            kev_date_added: None,
400                            kev_ransomware: None,
401                            updated_at: String::new(),
402                        });
403
404                    data.is_kev = true;
405                    data.kev_due_date = entry.due_date_utc().map(|d| d.to_rfc3339());
406                    data.kev_date_added = entry.date_added_utc().map(|d| d.to_rfc3339());
407                    data.kev_ransomware = Some(entry.is_ransomware_related());
408                }
409            }
410            Err(e) => {
411                error!("Failed to fetch KEV catalog: {}", e);
412            }
413        }
414
415        // Sync EPSS for known CVEs plus any extra provided by caller
416        let epss_targets = Self::collect_enrichment_targets(&enrichment, extra_cves);
417        if !epss_targets.is_empty() {
418            match self
419                .epss_source
420                .fetch_scores_batch(&epss_targets, 200)
421                .await
422            {
423                Ok(scores) => {
424                    Self::merge_epss_scores(&mut enrichment, scores);
425                }
426                Err(e) => {
427                    warn!("Failed to fetch EPSS scores: {}", e);
428                }
429            }
430        }
431
432        // Persist merged enrichment data
433        if !enrichment.is_empty() {
434            let now = chrono::Utc::now().to_rfc3339();
435            for (cve_id, mut data) in enrichment {
436                if data.updated_at.is_empty() {
437                    data.updated_at = now.clone();
438                }
439                if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
440                    debug!("Failed to store enrichment for {}: {}", cve_id, e);
441                }
442            }
443        }
444
445        Ok(())
446    }
447
448    /// Build the list of CVE IDs to request EPSS for.
449    fn collect_enrichment_targets(
450        current: &HashMap<String, EnrichmentData>,
451        extra: &[String],
452    ) -> Vec<String> {
453        let mut set: std::collections::HashSet<String> = current.keys().cloned().collect();
454        for c in extra {
455            set.insert(c.clone());
456        }
457        set.into_iter().collect()
458    }
459
460    /// Merge EPSS scores into enrichment map.
461    fn merge_epss_scores(
462        enrichment: &mut HashMap<String, EnrichmentData>,
463        scores: HashMap<String, crate::sources::epss::EpssScore>,
464    ) {
465        for (cve_id, score) in scores {
466            let data = enrichment
467                .entry(cve_id.clone())
468                .or_insert_with(|| EnrichmentData {
469                    epss_score: None,
470                    epss_percentile: None,
471                    is_kev: false,
472                    kev_due_date: None,
473                    kev_date_added: None,
474                    kev_ransomware: None,
475                    updated_at: String::new(),
476                });
477
478            data.epss_score = Some(score.epss);
479            data.epss_percentile = Some(score.percentile);
480            if let Some(date) = score.date_utc() {
481                data.updated_at = date.to_rfc3339();
482            }
483        }
484    }
485
486    /// Query advisories for a specific package.
487    pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
488        let advisories = self.store.get_by_package(ecosystem, package).await?;
489        Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
490    }
491
492    /// Query advisories with enrichment data.
493    pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
494        let mut advisories = self.query(ecosystem, package).await?;
495        self.enrich_advisories(&mut advisories).await?;
496        Ok(advisories)
497    }
498
499    /// Query multiple packages in a batch (concurrent).
500    ///
501    /// All queries run in parallel for maximum throughput.
502    pub async fn query_batch(
503        &self,
504        packages: &[PackageKey],
505    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
506        use futures_util::future::join_all;
507
508        let tasks: Vec<_> = packages
509            .iter()
510            .map(|pkg| {
511                let pkg = pkg.clone();
512                let ecosystem = pkg.ecosystem.clone();
513                let name = pkg.name.clone();
514                let version = pkg.version.clone();
515                let store = self.store.clone();
516
517                async move {
518                    let advisories = if let Some(ver) = &version {
519                        // For version matching, we need the full logic
520                        let all = store.get_by_package(&ecosystem, &name).await?;
521                        let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
522                        Self::filter_by_version(aggregated, &ecosystem, &name, ver)
523                    } else {
524                        let all = store.get_by_package(&ecosystem, &name).await?;
525                        crate::aggregator::ReportAggregator::aggregate(all)
526                    };
527                    Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
528                }
529            })
530            .collect();
531
532        let results: Vec<_> = join_all(tasks).await;
533
534        let mut map = HashMap::new();
535        for result in results {
536            match result {
537                Ok((pkg, advisories)) => {
538                    map.insert(pkg, advisories);
539                }
540                Err(e) => {
541                    warn!("Batch query error: {}", e);
542                }
543            }
544        }
545
546        Ok(map)
547    }
548
549    /// Filter advisories by version (static helper for concurrent batch queries)
550    fn filter_by_version(
551        advisories: Vec<Advisory>,
552        ecosystem: &str,
553        package: &str,
554        version: &str,
555    ) -> Vec<Advisory> {
556        advisories
557            .into_iter()
558            .filter(|advisory| {
559                for affected in &advisory.affected {
560                    if affected.package.name != package || affected.package.ecosystem != ecosystem {
561                        continue;
562                    }
563
564                    // Check explicit versions
565                    if affected.versions.contains(&version.to_string()) {
566                        return true;
567                    }
568
569                    // Check ranges
570                    for range in &affected.ranges {
571                        match range.range_type {
572                            RangeType::Semver => {
573                                if Self::matches_semver_range(version, &range.events) {
574                                    return true;
575                                }
576                            }
577                            RangeType::Ecosystem => {
578                                if Self::matches_ecosystem_range(version, &range.events) {
579                                    return true;
580                                }
581                            }
582                            RangeType::Git => {}
583                        }
584                    }
585                }
586                false
587            })
588            .collect()
589    }
590
591    /// Check if a specific package version is affected by any vulnerabilities.
592    pub async fn matches(
593        &self,
594        ecosystem: &str,
595        package: &str,
596        version: &str,
597    ) -> Result<Vec<Advisory>> {
598        self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
599            .await
600    }
601
602    /// Check if a package version is affected, with filtering options.
603    pub async fn matches_with_options(
604        &self,
605        ecosystem: &str,
606        package: &str,
607        version: &str,
608        options: &MatchOptions,
609    ) -> Result<Vec<Advisory>> {
610        let advisories = self.query(ecosystem, package).await?;
611        let mut matched = Vec::new();
612
613        for mut advisory in advisories {
614            let mut is_vulnerable = false;
615            for affected in &advisory.affected {
616                if affected.package.name != package || affected.package.ecosystem != ecosystem {
617                    continue;
618                }
619
620                // Check explicit versions
621                if affected.versions.contains(&version.to_string()) {
622                    is_vulnerable = true;
623                    break;
624                }
625
626                // Check ranges
627                for range in &affected.ranges {
628                    match range.range_type {
629                        RangeType::Semver => {
630                            if Self::matches_semver_range(version, &range.events) {
631                                is_vulnerable = true;
632                                break;
633                            }
634                        }
635                        RangeType::Ecosystem => {
636                            if Self::matches_ecosystem_range(version, &range.events) {
637                                is_vulnerable = true;
638                                break;
639                            }
640                        }
641                        RangeType::Git => {
642                            // Git ranges require commit hash comparison, skip for now
643                        }
644                    }
645                }
646                if is_vulnerable {
647                    break;
648                }
649            }
650
651            if is_vulnerable {
652                // Apply enrichment if requested
653                if options.include_enrichment {
654                    self.enrich_advisory(&mut advisory).await?;
655                }
656
657                // Apply filters
658                if self.advisory_passes_filters(&advisory, options) {
659                    matched.push(advisory);
660                }
661            }
662        }
663
664        Ok(matched)
665    }
666
667    /// Check if a version matches any semver interval described by OSV events.
668    ///
669    /// OSV allows multiple introduced/fixed pairs; we evaluate each interval in order.
670    fn matches_semver_range(version: &str, events: &[Event]) -> bool {
671        let Ok(v) = semver::Version::parse(version) else {
672            return false;
673        };
674
675        #[derive(Default)]
676        struct Interval {
677            start: Option<semver::Version>,
678            end: Option<semver::Version>,
679            end_inclusive: bool,
680        }
681
682        let mut intervals: Vec<Interval> = Vec::new();
683        let mut current_start: Option<semver::Version> = None;
684
685        for event in events {
686            match event {
687                Event::Introduced(ver) => {
688                    if let Ok(parsed) = semver::Version::parse(ver) {
689                        current_start = Some(parsed);
690                    } else if ver == "0" {
691                        current_start = Some(semver::Version::new(0, 0, 0));
692                    }
693                }
694                Event::Fixed(ver) => {
695                    let end = semver::Version::parse(ver).ok();
696                    intervals.push(Interval {
697                        start: current_start.clone(),
698                        end,
699                        end_inclusive: false,
700                    });
701                    current_start = None;
702                }
703                Event::LastAffected(ver) => {
704                    let end = semver::Version::parse(ver).ok();
705                    intervals.push(Interval {
706                        start: current_start.clone(),
707                        end,
708                        end_inclusive: true,
709                    });
710                    current_start = None;
711                }
712                Event::Limit(ver) => {
713                    // Treat limit as an exclusive upper bound for any open interval.
714                    let end = semver::Version::parse(ver).ok();
715                    intervals.push(Interval {
716                        start: current_start.clone(),
717                        end,
718                        end_inclusive: false,
719                    });
720                    current_start = None;
721                }
722            }
723        }
724
725        // Open-ended interval from the last introduction.
726        if current_start.is_some() {
727            intervals.push(Interval {
728                start: current_start,
729                end: None,
730                end_inclusive: false,
731            });
732        }
733
734        intervals.into_iter().any(|interval| {
735            if let Some(start) = &interval.start {
736                if v < *start {
737                    return false;
738                }
739            }
740
741            match (&interval.end, interval.end_inclusive) {
742                (Some(end), true) => v <= *end,
743                (Some(end), false) => v < *end,
744                (None, _) => true,
745            }
746        })
747    }
748
749    /// Check if a version matches an ecosystem range. Falls back to semver if both parse as semver,
750    /// otherwise uses dotted numeric comparison (e.g., "1.10" > "1.2").
751    fn matches_ecosystem_range(version: &str, events: &[Event]) -> bool {
752        // Try semver first; if any boundary fails semver parsing, fall back to dotted.
753        if events.iter().all(|e| match e {
754            Event::Introduced(v) | Event::Fixed(v) | Event::LastAffected(v) | Event::Limit(v) => {
755                semver::Version::parse(v).is_ok() || v == "0"
756            }
757        }) {
758            return Self::matches_semver_range(version, events);
759        }
760
761        let version_parts = match Self::parse_dotted(version) {
762            Some(p) => p,
763            None => return false,
764        };
765
766        #[derive(Default)]
767        struct Interval {
768            start: Option<Vec<u64>>,
769            end: Option<Vec<u64>>,
770            end_inclusive: bool,
771        }
772
773        let mut intervals: Vec<Interval> = Vec::new();
774        let mut current_start: Option<Vec<u64>> = None;
775
776        for event in events {
777            match event {
778                Event::Introduced(ver) => {
779                    current_start = Self::parse_dotted(ver);
780                }
781                Event::Fixed(ver) => {
782                    intervals.push(Interval {
783                        start: current_start.clone(),
784                        end: Self::parse_dotted(ver),
785                        end_inclusive: false,
786                    });
787                    current_start = None;
788                }
789                Event::LastAffected(ver) => {
790                    intervals.push(Interval {
791                        start: current_start.clone(),
792                        end: Self::parse_dotted(ver),
793                        end_inclusive: true,
794                    });
795                    current_start = None;
796                }
797                Event::Limit(ver) => {
798                    intervals.push(Interval {
799                        start: current_start.clone(),
800                        end: Self::parse_dotted(ver),
801                        end_inclusive: false,
802                    });
803                    current_start = None;
804                }
805            }
806        }
807
808        if current_start.is_some() {
809            intervals.push(Interval {
810                start: current_start,
811                end: None,
812                end_inclusive: false,
813            });
814        }
815
816        intervals.into_iter().any(|interval| {
817            if let Some(start) = &interval.start {
818                if Self::cmp_dotted(&version_parts, start) == Ordering::Less {
819                    return false;
820                }
821            }
822
823            match (&interval.end, interval.end_inclusive) {
824                (Some(end), true) => Self::cmp_dotted(&version_parts, end) != Ordering::Greater,
825                (Some(end), false) => Self::cmp_dotted(&version_parts, end) == Ordering::Less,
826                (None, _) => true,
827            }
828        })
829    }
830
831    /// Parse dotted numeric versions (e.g., "1.2.10"). Non-numeric segments cause failure.
832    fn parse_dotted(v: &str) -> Option<Vec<u64>> {
833        let mut parts = Vec::new();
834        for chunk in v.split(|c: char| !c.is_ascii_digit()) {
835            if chunk.is_empty() {
836                continue;
837            }
838            let Ok(num) = chunk.parse::<u64>() else {
839                return None;
840            };
841            parts.push(num);
842        }
843        if parts.is_empty() { None } else { Some(parts) }
844    }
845
846    /// Compare dotted numeric versions.
847    fn cmp_dotted(a: &[u64], b: &[u64]) -> Ordering {
848        let max_len = a.len().max(b.len());
849        for i in 0..max_len {
850            let ai = *a.get(i).unwrap_or(&0);
851            let bi = *b.get(i).unwrap_or(&0);
852            match ai.cmp(&bi) {
853                Ordering::Equal => continue,
854                ord => return ord,
855            }
856        }
857        Ordering::Equal
858    }
859
860    /// Enrich a single advisory with EPSS/KEV data.
861    async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
862        // Find CVE aliases
863        let cve_ids = Self::extract_cve_ids(advisory);
864
865        if cve_ids.is_empty() {
866            return Ok(());
867        }
868
869        // Look up enrichment data
870        for cve_id in &cve_ids {
871            if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
872                let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
873                enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
874                enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
875                enrichment.is_kev = enrichment.is_kev || data.is_kev;
876                if data.kev_due_date.is_some() {
877                    enrichment.kev_due_date = data
878                        .kev_due_date
879                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
880                        .map(|d| d.with_timezone(&chrono::Utc));
881                }
882                if data.kev_ransomware.is_some() {
883                    enrichment.kev_ransomware = data.kev_ransomware;
884                }
885            }
886        }
887
888        Ok(())
889    }
890
891    /// Enrich multiple advisories with EPSS/KEV data.
892    async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
893        for advisory in advisories.iter_mut() {
894            self.enrich_advisory(advisory).await?;
895        }
896        Ok(())
897    }
898
899    /// Extract CVE IDs from an advisory (from ID or aliases).
900    fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
901        let mut cve_ids = Vec::new();
902
903        if advisory.id.starts_with("CVE-") {
904            cve_ids.push(advisory.id.clone());
905        }
906
907        if let Some(aliases) = &advisory.aliases {
908            for alias in aliases {
909                if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
910                    cve_ids.push(alias.clone());
911                }
912            }
913        }
914
915        cve_ids
916    }
917
918    /// Check if an advisory passes the filter options.
919    fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
920        // Check KEV filter
921        if options.kev_only {
922            let is_kev = advisory
923                .enrichment
924                .as_ref()
925                .map(|e| e.is_kev)
926                .unwrap_or(false);
927            if !is_kev {
928                return false;
929            }
930        }
931
932        // Check CVSS filter
933        if let Some(min_cvss) = options.min_cvss {
934            let cvss = advisory
935                .enrichment
936                .as_ref()
937                .and_then(|e| e.cvss_v3_score)
938                .unwrap_or(0.0);
939            if cvss < min_cvss {
940                return false;
941            }
942        }
943
944        // Check EPSS filter
945        if let Some(min_epss) = options.min_epss {
946            let epss = advisory
947                .enrichment
948                .as_ref()
949                .and_then(|e| e.epss_score)
950                .unwrap_or(0.0);
951            if epss < min_epss {
952                return false;
953            }
954        }
955
956        // Check severity filter
957        if let Some(min_severity) = &options.min_severity {
958            let severity = advisory
959                .enrichment
960                .as_ref()
961                .and_then(|e| e.cvss_v3_severity)
962                .unwrap_or(Severity::None);
963            if severity < *min_severity {
964                return false;
965            }
966        }
967
968        true
969    }
970
971    /// Fetch live EPSS scores for CVEs (not from cache).
972    pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
973        let scores = self.epss_source.fetch_scores(cve_ids).await?;
974        Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
975    }
976
977    /// Check if a CVE is in the CISA KEV catalog.
978    pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
979        // Check cache first
980        if let Some(data) = self.store.get_enrichment(cve_id).await? {
981            return Ok(data.is_kev);
982        }
983
984        // Fetch from source
985        let entry = self.kev_source.is_kev(cve_id).await?;
986        Ok(entry.is_some())
987    }
988
989    // === OSS Index Methods ===
990
991    /// Query OSS Index for vulnerabilities affecting the given PURLs.
992    ///
993    /// This method provides automatic caching:
994    /// - First checks the cache for each PURL
995    /// - Only queries OSS Index for cache misses
996    /// - Caches results for future queries
997    ///
998    /// # Arguments
999    ///
1000    /// * `purls` - Package URLs to query (e.g., "pkg:npm/lodash@4.17.20")
1001    ///
1002    /// # Returns
1003    ///
1004    /// Vector of advisories for all vulnerabilities found.
1005    ///
1006    /// # Errors
1007    ///
1008    /// Returns an error if OSS Index is not configured or if the query fails.
1009    ///
1010    /// # Example
1011    ///
1012    /// ```rust,ignore
1013    /// use vulnera_advisors::{VulnerabilityManager, Purl};
1014    ///
1015    /// let manager = VulnerabilityManager::builder()
1016    ///     .redis_url("redis://localhost:6379")
1017    ///     .with_ossindex(None)
1018    ///     .build()?;
1019    ///
1020    /// let purls = vec![
1021    ///     Purl::new("npm", "lodash").with_version("4.17.20").to_string(),
1022    /// ];
1023    ///
1024    /// let advisories = manager.query_ossindex(&purls).await?;
1025    /// ```
1026    pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
1027        let source = self.ossindex_source.as_ref().ok_or_else(|| {
1028            AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
1029        })?;
1030
1031        // Check cache for all PURLs
1032        let mut cached_advisories = Vec::new();
1033        let mut cache_misses = Vec::new();
1034
1035        for purl in purls {
1036            let cache_key = Purl::cache_key_from_str(purl);
1037            match self.store.get_ossindex_cache(&cache_key).await {
1038                Ok(Some(cache)) if !cache.is_expired() => {
1039                    debug!("OSS Index cache hit for {}", purl);
1040                    cached_advisories.extend(cache.advisories);
1041                }
1042                _ => {
1043                    debug!("OSS Index cache miss for {}", purl);
1044                    cache_misses.push(purl.clone());
1045                }
1046            }
1047        }
1048
1049        // Query OSS Index for cache misses
1050        if !cache_misses.is_empty() {
1051            debug!("Querying OSS Index for {} cache misses", cache_misses.len());
1052            let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
1053                AdvisoryError::SourceFetch {
1054                    source_name: "ossindex".to_string(),
1055                    message: e.to_string(),
1056                }
1057            })?;
1058
1059            // Group advisories by PURL for caching
1060            let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
1061
1062            // Cache results for each PURL
1063            for (purl, advisories) in &advisory_map {
1064                let cache_key = Purl::cache_key_from_str(purl);
1065                let cache = OssIndexCache::new(advisories.clone());
1066                if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
1067                    debug!("Failed to cache OSS Index result for {}: {}", purl, e);
1068                }
1069            }
1070
1071            // Flatten and add to results
1072            for advisories in advisory_map.into_values() {
1073                cached_advisories.extend(advisories);
1074            }
1075        }
1076
1077        Ok(cached_advisories)
1078    }
1079
1080    /// Query OSS Index for vulnerabilities with fallback to stored advisories.
1081    ///
1082    /// This method first queries OSS Index, then falls back to the local store
1083    /// if the OSS Index query fails or returns no results.
1084    ///
1085    /// # Arguments
1086    ///
1087    /// * `packages` - List of packages to query (ecosystem, name, optional version)
1088    ///
1089    /// # Returns
1090    ///
1091    /// Map of package keys to their advisories.
1092    pub async fn query_batch_with_ossindex(
1093        &self,
1094        packages: &[PackageKey],
1095    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
1096        let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
1097
1098        // Build PURLs for packages that have versions
1099        let (with_version, without_version): (Vec<_>, Vec<_>) =
1100            packages.iter().partition(|p| p.version.is_some());
1101
1102        // Query OSS Index for packages with versions
1103        if !with_version.is_empty() && self.ossindex_source.is_some() {
1104            let purls: Vec<String> = with_version
1105                .iter()
1106                .map(|p| {
1107                    Purl::new(&p.ecosystem, &p.name)
1108                        .with_version(p.version.as_ref().unwrap())
1109                        .to_string()
1110                })
1111                .collect();
1112
1113            match self.query_ossindex(&purls).await {
1114                Ok(advisories) => {
1115                    // Group advisories by package key
1116                    for pkg in &with_version {
1117                        let pkg_advisories: Vec<_> = advisories
1118                            .iter()
1119                            .filter(|a| {
1120                                a.affected.iter().any(|aff| {
1121                                    aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
1122                                        && aff.package.name == pkg.name
1123                                })
1124                            })
1125                            .cloned()
1126                            .collect();
1127                        results.insert((*pkg).clone(), pkg_advisories);
1128                    }
1129                }
1130                Err(e) => {
1131                    warn!("OSS Index query failed, falling back to local store: {}", e);
1132                    // Fallback to local store
1133                    for pkg in &with_version {
1134                        let advisories = if let Some(version) = &pkg.version {
1135                            self.matches(&pkg.ecosystem, &pkg.name, version).await?
1136                        } else {
1137                            self.query(&pkg.ecosystem, &pkg.name).await?
1138                        };
1139                        results.insert((*pkg).clone(), advisories);
1140                    }
1141                }
1142            }
1143        }
1144
1145        // Query local store for packages without versions
1146        for pkg in &without_version {
1147            let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
1148            results.insert((*pkg).clone(), advisories);
1149        }
1150
1151        Ok(results)
1152    }
1153
1154    /// Invalidate cached OSS Index results for specific PURLs.
1155    ///
1156    /// Use this to force a fresh query on the next call.
1157    pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
1158        for purl in purls {
1159            let cache_key = Purl::cache_key_from_str(purl);
1160            self.store.invalidate_ossindex_cache(&cache_key).await?;
1161        }
1162        Ok(())
1163    }
1164
1165    /// Invalidate all cached OSS Index results.
1166    pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
1167        self.store.invalidate_all_ossindex_cache().await?;
1168        Ok(())
1169    }
1170
1171    /// Group advisories by their associated PURL.
1172    fn group_advisories_by_purl(
1173        purls: &[String],
1174        advisories: &[Advisory],
1175    ) -> HashMap<String, Vec<Advisory>> {
1176        let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
1177
1178        // Initialize map with empty vectors for all PURLs
1179        for purl in purls {
1180            map.insert(purl.clone(), Vec::new());
1181        }
1182
1183        for advisory in advisories {
1184            for affected in &advisory.affected {
1185                for purl in purls {
1186                    let Ok(parsed) = Purl::parse(purl) else {
1187                        continue;
1188                    };
1189
1190                    // Match on ecosystem as well as name to avoid cross-ecosystem collisions
1191                    let affected_eco = affected.package.ecosystem.to_lowercase();
1192                    let purl_eco = parsed.purl_type.to_lowercase();
1193                    let purl_eco_alt = parsed.ecosystem().to_lowercase();
1194                    if affected_eco != purl_eco && affected_eco != purl_eco_alt {
1195                        continue;
1196                    }
1197
1198                    if parsed.name != affected.package.name {
1199                        continue;
1200                    }
1201
1202                    if let Some(ver) = parsed.version.as_deref() {
1203                        // If a version is specified, ensure the advisory actually covers it.
1204                        let version_matches = affected.versions.contains(&ver.to_string())
1205                            || affected.ranges.iter().any(|r| {
1206                                matches!(r.range_type, RangeType::Semver | RangeType::Ecosystem)
1207                                    && Self::matches_semver_range(ver, &r.events)
1208                            });
1209
1210                        if !version_matches {
1211                            continue;
1212                        }
1213                    }
1214
1215                    map.entry(purl.clone()).or_default().push(advisory.clone());
1216                    break;
1217                }
1218            }
1219        }
1220
1221        map
1222    }
1223}