Skip to main content

vulnera_advisor/
manager.rs

1//! Vulnerability manager for orchestrating syncs and queries.
2//!
3//! The [`VulnerabilityManager`] is the main entry point for using this crate.
4//! Use [`VulnerabilityManagerBuilder`] for flexible configuration.
5
6use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::ecosystem::normalize_package_key;
8use crate::error::{AdvisoryError, Result};
9use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
10use crate::purl::Purl;
11use crate::sources::epss::EpssSource;
12use crate::sources::kev::KevSource;
13use crate::sources::ossindex::OssIndexSource;
14use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
15use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
16use std::cmp::Ordering;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::{debug, error, info, warn};
20
21/// Options for filtering vulnerability matches.
22#[derive(Debug, Clone, Default)]
23pub struct MatchOptions {
24    /// Minimum CVSS v3 score (0.0 - 10.0).
25    pub min_cvss: Option<f64>,
26    /// Minimum EPSS score (0.0 - 1.0).
27    pub min_epss: Option<f64>,
28    /// Only return KEV (actively exploited) vulnerabilities.
29    pub kev_only: bool,
30    /// Minimum severity level.
31    pub min_severity: Option<Severity>,
32    /// Include enrichment data (EPSS, KEV) in results.
33    pub include_enrichment: bool,
34    /// Filter by CWE IDs (e.g., ["CWE-79", "CWE-89"]).
35    /// Only advisories with at least one matching CWE will be returned.
36    pub cwe_ids: Option<Vec<String>>,
37}
38
/// Counters describing the outcome of one sync run.
#[derive(Clone, Debug, Default)]
pub struct SyncStats {
    /// How many sources the sync attempted.
    pub total_sources: usize,
    /// How many of those sources synced successfully.
    pub successful_sources: usize,
    /// How many of those sources failed.
    pub failed_sources: usize,
    /// Sum of advisories synced over all sources.
    pub total_advisories_synced: usize,
    /// Error message for each failed source, keyed by source name.
    pub errors: HashMap<String, String>,
}
53
54/// Observer for monitoring sync progress and events.
55pub trait SyncObserver: Send + Sync {
56    /// Called when the sync operation starts.
57    fn on_sync_start(&self);
58
59    /// Called when a specific source starts syncing.
60    fn on_source_start(&self, source_name: &str);
61
62    /// Called when a source successfully syncs.
63    fn on_source_success(&self, source_name: &str, count: usize);
64
65    /// Called when a source fails to sync.
66    fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError);
67
68    /// Called when the sync operation completes.
69    fn on_sync_complete(&self, stats: &SyncStats);
70}
71
/// Default observer that logs events using the `tracing` crate.
///
/// This is a stateless unit struct, so it derives the cheap standard traits:
/// it can be printed with `{:?}`, copied freely, and built with `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingSyncObserver;
74
75impl SyncObserver for TracingSyncObserver {
76    fn on_sync_start(&self) {
77        info!("Starting full vulnerability sync...");
78    }
79
80    fn on_source_start(&self, source_name: &str) {
81        debug!("Syncing {}...", source_name);
82    }
83
84    fn on_source_success(&self, source_name: &str, count: usize) {
85        if count > 0 {
86            info!(
87                "Successfully synced {} advisories from {}",
88                count, source_name
89            );
90        } else {
91            debug!(
92                "Successfully synced {} advisories from {}",
93                count, source_name
94            );
95        }
96    }
97
98    fn on_source_error(&self, source_name: &str, error: &crate::error::AdvisoryError) {
99        error!("Failed to sync {}: {}", source_name, error);
100    }
101
102    fn on_sync_complete(&self, _stats: &SyncStats) {
103        info!("Sync completed.");
104    }
105}
106
107impl MatchOptions {
108    /// Create options that include all vulnerabilities with enrichment.
109    pub fn with_enrichment() -> Self {
110        Self {
111            include_enrichment: true,
112            ..Default::default()
113        }
114    }
115
116    /// Create options for high-severity vulnerabilities only.
117    pub fn high_severity() -> Self {
118        Self {
119            min_severity: Some(Severity::High),
120            include_enrichment: true,
121            ..Default::default()
122        }
123    }
124
125    /// Create options for actively exploited vulnerabilities only.
126    pub fn exploited_only() -> Self {
127        Self {
128            kev_only: true,
129            include_enrichment: true,
130            ..Default::default()
131        }
132    }
133
134    /// Create options to filter by specific CWE IDs.
135    ///
136    /// Only advisories containing at least one of the specified CWEs will match.
137    ///
138    /// # Example
139    ///
140    /// ```rust,ignore
141    /// use vulnera_advisors::MatchOptions;
142    ///
143    /// // Filter for XSS (CWE-79) or SQL Injection (CWE-89)
144    /// let options = MatchOptions::with_cwes(vec!["CWE-79".to_string(), "CWE-89".to_string()]);
145    /// ```
146    pub fn with_cwes(cwe_ids: Vec<String>) -> Self {
147        Self {
148            cwe_ids: Some(cwe_ids),
149            include_enrichment: true,
150            ..Default::default()
151        }
152    }
153}
154
/// Identifies one package in a batch query.
///
/// All three fields take part in equality and hashing.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PackageKey {
    /// Ecosystem the package belongs to (e.g., "npm", "PyPI").
    pub ecosystem: String,
    /// Name of the package.
    pub name: String,
    /// Version to match against, if any.
    pub version: Option<String>,
}
165
/// Identifies which stage of a batch query produced a failure.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BatchFailureStage {
    /// The lookup/filtering against the local advisory store failed.
    StoreLookup,
    /// The OSS Index enrichment query failed.
    OssIndex,
}
174
175/// Structured per-package failure details for batch operations.
176#[derive(Debug, Clone)]
177pub struct BatchFailure {
178    /// Package key associated with the failure.
179    pub package: PackageKey,
180    /// Stage that produced the error.
181    pub stage: BatchFailureStage,
182    /// Whether the failure is retryable.
183    pub retryable: bool,
184    /// Error message for diagnostics.
185    pub error: String,
186}
187
/// Summary counters for a batch query.
#[derive(Clone, Debug, Default)]
pub struct BatchSummary {
    /// How many packages were requested.
    pub total: usize,
    /// How many packages produced a successful result.
    pub succeeded: usize,
    /// How many packages failed.
    pub failed: usize,
    /// Occurrence count per range-translation status, aggregated from the
    /// advisories that were returned.
    pub range_translation_statuses: HashMap<String, usize>,
}
200
201/// Structured output for batch operations.
202#[derive(Debug, Clone)]
203pub struct BatchOutcome<T> {
204    /// Successful results keyed by package.
205    pub successes: HashMap<PackageKey, T>,
206    /// Per-package failures with stage metadata.
207    pub failures: Vec<BatchFailure>,
208    /// Aggregate counters.
209    pub summary: BatchSummary,
210}
211
212impl<T> BatchOutcome<T> {
213    fn from_parts(
214        successes: HashMap<PackageKey, T>,
215        failures: Vec<BatchFailure>,
216        total: usize,
217    ) -> Self {
218        use std::collections::HashSet;
219
220        let failed_packages: HashSet<_> = failures
221            .iter()
222            .map(|failure| failure.package.clone())
223            .collect();
224        Self {
225            summary: BatchSummary {
226                total,
227                succeeded: successes.len(),
228                failed: failed_packages.len(),
229                range_translation_statuses: HashMap::new(),
230            },
231            successes,
232            failures,
233        }
234    }
235}
236
237impl PackageKey {
238    /// Create a new package key.
239    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
240        let (ecosystem, name) = normalize_package_key(&ecosystem.into(), &name.into());
241        Self {
242            ecosystem,
243            name,
244            version: None,
245        }
246    }
247
248    /// Create a package key with a version.
249    pub fn with_version(
250        ecosystem: impl Into<String>,
251        name: impl Into<String>,
252        version: impl Into<String>,
253    ) -> Self {
254        let (ecosystem, name) = normalize_package_key(&ecosystem.into(), &name.into());
255        Self {
256            ecosystem,
257            name,
258            version: Some(version.into()),
259        }
260    }
261}
262
263/// Builder for configuring VulnerabilityManager.
264pub struct VulnerabilityManagerBuilder {
265    redis_url: Option<String>,
266    store_config: StoreConfig,
267    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
268    custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
269    ossindex_source: Option<OssIndexSource>,
270    observer: Option<Arc<dyn SyncObserver>>,
271}
272
273impl Default for VulnerabilityManagerBuilder {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
279impl VulnerabilityManagerBuilder {
280    /// Create a new builder with default settings.
281    pub fn new() -> Self {
282        Self {
283            redis_url: None,
284            store_config: StoreConfig::default(),
285            sources: Vec::new(),
286            custom_store: None,
287            ossindex_source: None,
288            observer: None,
289        }
290    }
291
292    /// Set the Redis connection URL.
293    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
294        self.redis_url = Some(url.into());
295        self
296    }
297
298    /// Set the store configuration.
299    pub fn store_config(mut self, config: StoreConfig) -> Self {
300        self.store_config = config;
301        self
302    }
303
304    /// Use a custom store implementation.
305    pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
306        self.custom_store = Some(store);
307        self
308    }
309
310    /// Add a vulnerability source.
311    pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
312        self.sources.push(source);
313        self
314    }
315
316    /// Add the GHSA source with the given token.
317    pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
318        self.sources.push(Arc::new(GHSASource::new(token.into())));
319        self
320    }
321
322    /// Add the NVD source with optional API key.
323    pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
324        self.sources.push(Arc::new(NVDSource::new(api_key)));
325        self
326    }
327
328    /// Add the OSV source for specified ecosystems.
329    pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
330        self.sources.push(Arc::new(OSVSource::new(ecosystems)));
331        self
332    }
333
334    /// Add default OSV ecosystems.
335    pub fn with_osv_defaults(self) -> Self {
336        self.with_osv(vec![
337            "npm".to_string(),
338            "PyPI".to_string(),
339            "Maven".to_string(),
340            "crates.io".to_string(),
341            "Go".to_string(),
342            "Packagist".to_string(),
343            "RubyGems".to_string(),
344            "NuGet".to_string(),
345        ])
346    }
347
348    /// Add the OSS Index source with optional configuration.
349    ///
350    /// OSS Index provides on-demand vulnerability queries by PURL.
351    /// If no config is provided, credentials are loaded from environment variables.
352    pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
353        match OssIndexSource::new(config) {
354            Ok(source) => {
355                self.ossindex_source = Some(source);
356            }
357            Err(e) => {
358                warn!("Failed to configure OSS Index source: {}", e);
359            }
360        }
361        self
362    }
363
364    /// Set a custom sync observer.
365    pub fn with_observer(mut self, observer: Arc<dyn SyncObserver>) -> Self {
366        self.observer = Some(observer);
367        self
368    }
369
370    /// Build the VulnerabilityManager.
371    pub fn build(self) -> Result<VulnerabilityManager> {
372        let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
373            Some(s) => s,
374            None => {
375                let url = self.redis_url.ok_or_else(|| {
376                    AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
377                })?;
378                Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
379            }
380        };
381
382        if self.sources.is_empty() {
383            warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
384        }
385
386        Ok(VulnerabilityManager {
387            store,
388            sources: self.sources,
389            kev_source: KevSource::new(),
390            epss_source: EpssSource::new(),
391            ossindex_source: self.ossindex_source,
392            observer: self
393                .observer
394                .unwrap_or_else(|| Arc::new(TracingSyncObserver)),
395        })
396    }
397}
398
399/// Main vulnerability manager for syncing and querying advisories.
400pub struct VulnerabilityManager {
401    store: Arc<dyn AdvisoryStore + Send + Sync>,
402    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
403    kev_source: KevSource,
404    epss_source: EpssSource,
405    ossindex_source: Option<OssIndexSource>,
406    observer: Arc<dyn SyncObserver>,
407}
408
409impl VulnerabilityManager {
410    fn collect_range_translation_statuses(
411        advisories_by_package: &HashMap<PackageKey, Vec<Advisory>>,
412    ) -> HashMap<String, usize> {
413        let mut counters = HashMap::new();
414
415        for advisories in advisories_by_package.values() {
416            for advisory in advisories {
417                for affected in &advisory.affected {
418                    let Some(database_specific) = &affected.database_specific else {
419                        continue;
420                    };
421                    let Some(status) = database_specific
422                        .get("range_translation")
423                        .and_then(|translation| translation.get("status"))
424                        .and_then(|status| status.as_str())
425                    else {
426                        continue;
427                    };
428
429                    *counters.entry(status.to_string()).or_insert(0) += 1;
430                }
431            }
432        }
433
434        counters
435    }
436
437    /// Create a new manager from a Config.
438    ///
439    /// This is a convenience method. For more control, use [`VulnerabilityManagerBuilder`].
440    pub async fn new(config: Config) -> Result<Self> {
441        let mut builder = VulnerabilityManagerBuilder::new()
442            .redis_url(&config.redis_url)
443            .store_config(config.store.clone());
444
445        // Add OSV source
446        builder = builder.with_osv_defaults();
447
448        // Add NVD source
449        builder = builder.with_nvd(config.nvd_api_key.clone());
450
451        // Add GHSA source if token is provided
452        if let Some(token) = &config.ghsa_token {
453            builder = builder.with_ghsa(token.clone());
454        }
455
456        // Add OSS Index source if configured
457        if config.ossindex.is_some() {
458            builder = builder.with_ossindex(config.ossindex.clone());
459        }
460
461        builder.build()
462    }
463
464    /// Create a builder for custom configuration.
465    pub fn builder() -> VulnerabilityManagerBuilder {
466        VulnerabilityManagerBuilder::new()
467    }
468
469    /// Get a reference to the underlying store.
470    pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
471        &self.store
472    }
473
474    /// Check the health of the store connection.
475    pub async fn health_check(&self) -> Result<HealthStatus> {
476        self.store.health_check().await
477    }
478
479    /// Sync advisories from all configured sources.
480    pub async fn sync_all(&self) -> Result<SyncStats> {
481        self.observer.on_sync_start();
482
483        let mut handles = Vec::new();
484        let mut stats = SyncStats {
485            total_sources: self.sources.len(),
486            ..Default::default()
487        };
488
489        for source in &self.sources {
490            let source = source.clone();
491            let store = self.store.clone();
492            let observer = self.observer.clone();
493
494            let handle = tokio::spawn(async move {
495                observer.on_source_start(source.name());
496
497                let last_sync = match store.last_sync(source.name()).await {
498                    Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
499                        Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
500                        Err(_) => None,
501                    },
502                    _ => None,
503                };
504
505                match source.fetch(last_sync).await {
506                    Ok(advisories) => {
507                        if !advisories.is_empty() {
508                            match store.upsert_batch(&advisories, source.name()).await {
509                                Ok(_) => {
510                                    observer.on_source_success(source.name(), advisories.len());
511                                    // Update timestamp only after successful storage
512                                    if let Err(e) = store.update_sync_timestamp(source.name()).await
513                                    {
514                                        let err = AdvisoryError::source_fetch(
515                                            source.name(),
516                                            format!("Failed to update timestamp: {}", e),
517                                        );
518                                        observer.on_source_error(source.name(), &err);
519                                        // Non-critical error, count as success but maybe log warn?
520                                        // Observer handles logging.
521                                    }
522                                    Ok((source.name().to_string(), advisories.len()))
523                                }
524                                Err(e) => {
525                                    // Store error is critical for this source
526                                    observer.on_source_error(source.name(), &e);
527                                    Err((source.name().to_string(), e.to_string()))
528                                }
529                            }
530                        } else {
531                            observer.on_source_success(source.name(), 0);
532                            // Update sync timestamp even if no new advisories
533                            if let Err(e) = store.update_sync_timestamp(source.name()).await {
534                                let err = AdvisoryError::source_fetch(
535                                    source.name(),
536                                    format!("Failed to update timestamp: {}", e),
537                                );
538                                observer.on_source_error(source.name(), &err);
539                            }
540                            Ok((source.name().to_string(), 0))
541                        }
542                    }
543                    Err(e) => {
544                        observer.on_source_error(source.name(), &e);
545                        Err((source.name().to_string(), e.to_string()))
546                    }
547                }
548            });
549            handles.push(handle);
550        }
551
552        // Wait for all tasks to complete
553        for handle in handles {
554            match handle.await {
555                Ok(result) => match result {
556                    Ok((_, count)) => {
557                        stats.successful_sources += 1;
558                        stats.total_advisories_synced += count;
559                    }
560                    Err((name, error)) => {
561                        stats.failed_sources += 1;
562                        stats.errors.insert(name, error);
563                    }
564                },
565                Err(e) => {
566                    // Task panic or join error
567                    error!("Task join error: {}", e);
568                    stats.failed_sources += 1;
569                    stats
570                        .errors
571                        .insert("unknown".to_string(), format!("Task join error: {}", e));
572                }
573            }
574        }
575
576        self.observer.on_sync_complete(&stats);
577        Ok(stats)
578    }
579
580    /// Reset the sync timestamp for a specific source.
581    ///
582    /// This forces a full re-sync on the next `sync_all()` call.
583    pub async fn reset_sync(&self, source: &str) -> Result<()> {
584        self.store.reset_sync_timestamp(source).await
585    }
586
587    /// Reset all sync timestamps, forcing a full re-sync of all sources.
588    pub async fn reset_all_syncs(&self) -> Result<()> {
589        for source in &self.sources {
590            self.store.reset_sync_timestamp(source.name()).await?;
591        }
592        Ok(())
593    }
594
595    /// Sync enrichment data (KEV and EPSS).
596    pub async fn sync_enrichment(&self) -> Result<()> {
597        self.sync_enrichment_with_cves(&[]).await
598    }
599
600    /// Sync enrichment data with optional extra CVE IDs to broaden EPSS coverage.
601    pub async fn sync_enrichment_with_cves(&self, extra_cves: &[String]) -> Result<()> {
602        debug!("Syncing enrichment data (KEV, EPSS)...");
603
604        let mut enrichment: HashMap<String, EnrichmentData> = HashMap::new();
605
606        // Sync KEV data
607        match self.kev_source.fetch_catalog().await {
608            Ok(kev_entries) => {
609                debug!("Processing {} KEV entries", kev_entries.len());
610                for (cve_id, entry) in kev_entries {
611                    let data = enrichment
612                        .entry(cve_id.clone())
613                        .or_insert_with(|| EnrichmentData {
614                            epss_score: None,
615                            epss_percentile: None,
616                            is_kev: false,
617                            kev_due_date: None,
618                            kev_date_added: None,
619                            kev_ransomware: None,
620                            updated_at: String::new(),
621                        });
622
623                    data.is_kev = true;
624                    data.kev_due_date = entry.due_date_utc().map(|d| d.to_rfc3339());
625                    data.kev_date_added = entry.date_added_utc().map(|d| d.to_rfc3339());
626                    data.kev_ransomware = Some(entry.is_ransomware_related());
627                }
628            }
629            Err(e) => {
630                error!("Failed to fetch KEV catalog: {}", e);
631            }
632        }
633
634        // Sync EPSS for known CVEs plus any extra provided by caller
635        let epss_targets = Self::collect_enrichment_targets(&enrichment, extra_cves);
636        if !epss_targets.is_empty() {
637            match self
638                .epss_source
639                .fetch_scores_batch(&epss_targets, 200)
640                .await
641            {
642                Ok(scores) => {
643                    Self::merge_epss_scores(&mut enrichment, scores);
644                }
645                Err(e) => {
646                    warn!("Failed to fetch EPSS scores: {}", e);
647                }
648            }
649        }
650
651        // Persist merged enrichment data
652        if !enrichment.is_empty() {
653            let now = chrono::Utc::now().to_rfc3339();
654            for (cve_id, mut data) in enrichment {
655                if data.updated_at.is_empty() {
656                    data.updated_at = now.clone();
657                }
658                if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
659                    debug!("Failed to store enrichment for {}: {}", cve_id, e);
660                }
661            }
662        }
663
664        Ok(())
665    }
666
667    /// Build the list of CVE IDs to request EPSS for.
668    fn collect_enrichment_targets(
669        current: &HashMap<String, EnrichmentData>,
670        extra: &[String],
671    ) -> Vec<String> {
672        let mut set: std::collections::HashSet<String> = current.keys().cloned().collect();
673        for c in extra {
674            set.insert(c.clone());
675        }
676        set.into_iter().collect()
677    }
678
679    /// Merge EPSS scores into enrichment map.
680    fn merge_epss_scores(
681        enrichment: &mut HashMap<String, EnrichmentData>,
682        scores: HashMap<String, crate::sources::epss::EpssScore>,
683    ) {
684        for (cve_id, score) in scores {
685            let data = enrichment
686                .entry(cve_id.clone())
687                .or_insert_with(|| EnrichmentData {
688                    epss_score: None,
689                    epss_percentile: None,
690                    is_kev: false,
691                    kev_due_date: None,
692                    kev_date_added: None,
693                    kev_ransomware: None,
694                    updated_at: String::new(),
695                });
696
697            data.epss_score = Some(score.epss);
698            data.epss_percentile = Some(score.percentile);
699            if let Some(date) = score.date_utc() {
700                data.updated_at = date.to_rfc3339();
701            }
702        }
703    }
704
705    /// Query advisories for a specific package.
706    pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
707        let (ecosystem, package) = normalize_package_key(ecosystem, package);
708        let advisories = self.store.get_by_package(&ecosystem, &package).await?;
709        Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
710    }
711
712    /// Query advisories with enrichment data.
713    pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
714        let mut advisories = self.query(ecosystem, package).await?;
715        self.enrich_advisories(&mut advisories).await?;
716        Ok(advisories)
717    }
718
719    /// Query multiple packages in a batch (concurrent).
720    ///
721    /// All queries run in parallel for maximum throughput.
722    pub async fn query_batch(
723        &self,
724        packages: &[PackageKey],
725    ) -> Result<BatchOutcome<Vec<Advisory>>> {
726        use futures_util::future::join_all;
727
728        let tasks: Vec<_> = packages
729            .iter()
730            .map(|pkg| {
731                let pkg = pkg.clone();
732                let ecosystem = pkg.ecosystem.clone();
733                let name = pkg.name.clone();
734                let version = pkg.version.clone();
735                let store = self.store.clone();
736
737                async move {
738                    let advisories = if let Some(ver) = &version {
739                        // For version matching, we need the full logic
740                        match store.get_by_package(&ecosystem, &name).await {
741                            Ok(all) => {
742                                let aggregated =
743                                    crate::aggregator::ReportAggregator::aggregate(all);
744                                Ok(Self::filter_by_version(aggregated, &ecosystem, &name, ver))
745                            }
746                            Err(e) => Err(e),
747                        }
748                    } else {
749                        match store.get_by_package(&ecosystem, &name).await {
750                            Ok(all) => Ok(crate::aggregator::ReportAggregator::aggregate(all)),
751                            Err(e) => Err(e),
752                        }
753                    };
754                    (pkg, advisories)
755                }
756            })
757            .collect();
758
759        let results: Vec<_> = join_all(tasks).await;
760
761        let mut successes = HashMap::new();
762        let mut failures = Vec::new();
763        for (pkg, result) in results {
764            match result {
765                Ok(advisories) => {
766                    successes.insert(pkg, advisories);
767                }
768                Err(e) => {
769                    failures.push(BatchFailure {
770                        package: pkg,
771                        stage: BatchFailureStage::StoreLookup,
772                        retryable: e.is_retryable(),
773                        error: e.to_string(),
774                    });
775                }
776            }
777        }
778
779        let mut outcome = BatchOutcome::from_parts(successes, failures, packages.len());
780        outcome.summary.range_translation_statuses =
781            Self::collect_range_translation_statuses(&outcome.successes);
782        Ok(outcome)
783    }
784
785    /// Filter advisories by version (static helper for concurrent batch queries)
786    fn filter_by_version(
787        advisories: Vec<Advisory>,
788        ecosystem: &str,
789        package: &str,
790        version: &str,
791    ) -> Vec<Advisory> {
792        let (ecosystem, package) = normalize_package_key(ecosystem, package);
793        advisories
794            .into_iter()
795            .filter(|advisory| {
796                for affected in &advisory.affected {
797                    let (affected_ecosystem, affected_package) =
798                        normalize_package_key(&affected.package.ecosystem, &affected.package.name);
799                    if affected_package != package || affected_ecosystem != ecosystem {
800                        continue;
801                    }
802
803                    // Check explicit versions
804                    if affected.versions.contains(&version.to_string()) {
805                        return true;
806                    }
807
808                    // Check ranges
809                    for range in &affected.ranges {
810                        match range.range_type {
811                            RangeType::Semver => {
812                                if Self::matches_semver_range(version, &range.events) {
813                                    return true;
814                                }
815                            }
816                            RangeType::Ecosystem => {
817                                if Self::matches_ecosystem_range(version, &range.events) {
818                                    return true;
819                                }
820                            }
821                            RangeType::Git => {
822                                if Self::matches_git_range(version, &range.events) {
823                                    return true;
824                                }
825                            }
826                        }
827                    }
828                }
829                false
830            })
831            .collect()
832    }
833
834    /// Check if a specific package version is affected by any vulnerabilities.
835    pub async fn matches(
836        &self,
837        ecosystem: &str,
838        package: &str,
839        version: &str,
840    ) -> Result<Vec<Advisory>> {
841        self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
842            .await
843    }
844
845    /// Check if a package version is affected, with filtering options.
846    pub async fn matches_with_options(
847        &self,
848        ecosystem: &str,
849        package: &str,
850        version: &str,
851        options: &MatchOptions,
852    ) -> Result<Vec<Advisory>> {
853        let advisories = self.query(ecosystem, package).await?;
854        let mut matched = Vec::new();
855
856        for mut advisory in advisories {
857            let mut is_vulnerable = false;
858            for affected in &advisory.affected {
859                if affected.package.name != package || affected.package.ecosystem != ecosystem {
860                    continue;
861                }
862
863                // Check explicit versions
864                if affected.versions.contains(&version.to_string()) {
865                    is_vulnerable = true;
866                    break;
867                }
868
869                // Check ranges
870                for range in &affected.ranges {
871                    match range.range_type {
872                        RangeType::Semver => {
873                            if Self::matches_semver_range(version, &range.events) {
874                                is_vulnerable = true;
875                                break;
876                            }
877                        }
878                        RangeType::Ecosystem => {
879                            if Self::matches_ecosystem_range(version, &range.events) {
880                                is_vulnerable = true;
881                                break;
882                            }
883                        }
884                        RangeType::Git => {
885                            if Self::matches_git_range(version, &range.events) {
886                                is_vulnerable = true;
887                                break;
888                            }
889                        }
890                    }
891                }
892                if is_vulnerable {
893                    break;
894                }
895            }
896
897            if is_vulnerable {
898                // Apply enrichment if requested
899                if options.include_enrichment {
900                    self.enrich_advisory(&mut advisory).await?;
901                }
902
903                // Apply filters
904                if self.advisory_passes_filters(&advisory, options) {
905                    matched.push(advisory);
906                }
907            }
908        }
909
910        Ok(matched)
911    }
912
913    /// Check if a version matches any semver interval described by OSV events.
914    ///
915    /// OSV allows multiple introduced/fixed pairs; we evaluate each interval in order.
916    fn matches_semver_range(version: &str, events: &[Event]) -> bool {
917        let Ok(v) = semver::Version::parse(version) else {
918            return false;
919        };
920
921        #[derive(Default)]
922        struct Interval {
923            start: Option<semver::Version>,
924            end: Option<semver::Version>,
925            end_inclusive: bool,
926        }
927
928        let mut intervals: Vec<Interval> = Vec::new();
929        let mut current_start: Option<semver::Version> = None;
930
931        for event in events {
932            match event {
933                Event::Introduced(ver) => {
934                    if let Ok(parsed) = semver::Version::parse(ver) {
935                        current_start = Some(parsed);
936                    } else if ver == "0" {
937                        current_start = Some(semver::Version::new(0, 0, 0));
938                    }
939                }
940                Event::Fixed(ver) => {
941                    let end = semver::Version::parse(ver).ok();
942                    intervals.push(Interval {
943                        start: current_start.clone(),
944                        end,
945                        end_inclusive: false,
946                    });
947                    current_start = None;
948                }
949                Event::LastAffected(ver) => {
950                    let end = semver::Version::parse(ver).ok();
951                    intervals.push(Interval {
952                        start: current_start.clone(),
953                        end,
954                        end_inclusive: true,
955                    });
956                    current_start = None;
957                }
958                Event::Limit(ver) => {
959                    // Treat limit as an exclusive upper bound for any open interval.
960                    let end = semver::Version::parse(ver).ok();
961                    intervals.push(Interval {
962                        start: current_start.clone(),
963                        end,
964                        end_inclusive: false,
965                    });
966                    current_start = None;
967                }
968            }
969        }
970
971        // Open-ended interval from the last introduction.
972        if current_start.is_some() {
973            intervals.push(Interval {
974                start: current_start,
975                end: None,
976                end_inclusive: false,
977            });
978        }
979
980        intervals.into_iter().any(|interval| {
981            if let Some(start) = &interval.start {
982                if v < *start {
983                    return false;
984                }
985            }
986
987            match (&interval.end, interval.end_inclusive) {
988                (Some(end), true) => v <= *end,
989                (Some(end), false) => v < *end,
990                (None, _) => true,
991            }
992        })
993    }
994
995    /// Check if a version matches an ecosystem range. Falls back to semver if both parse as semver,
996    /// otherwise uses dotted numeric comparison (e.g., "1.10" > "1.2").
997    fn matches_ecosystem_range(version: &str, events: &[Event]) -> bool {
998        // Try semver first; if any boundary fails semver parsing, fall back to dotted.
999        if events.iter().all(|e| match e {
1000            Event::Introduced(v) | Event::Fixed(v) | Event::LastAffected(v) | Event::Limit(v) => {
1001                semver::Version::parse(v).is_ok() || v == "0"
1002            }
1003        }) {
1004            return Self::matches_semver_range(version, events);
1005        }
1006
1007        let version_parts = match Self::parse_dotted(version) {
1008            Some(p) => p,
1009            None => return false,
1010        };
1011
1012        #[derive(Default)]
1013        struct Interval {
1014            start: Option<Vec<u64>>,
1015            end: Option<Vec<u64>>,
1016            end_inclusive: bool,
1017        }
1018
1019        let mut intervals: Vec<Interval> = Vec::new();
1020        let mut current_start: Option<Vec<u64>> = None;
1021
1022        for event in events {
1023            match event {
1024                Event::Introduced(ver) => {
1025                    current_start = Self::parse_dotted(ver);
1026                }
1027                Event::Fixed(ver) => {
1028                    intervals.push(Interval {
1029                        start: current_start.clone(),
1030                        end: Self::parse_dotted(ver),
1031                        end_inclusive: false,
1032                    });
1033                    current_start = None;
1034                }
1035                Event::LastAffected(ver) => {
1036                    intervals.push(Interval {
1037                        start: current_start.clone(),
1038                        end: Self::parse_dotted(ver),
1039                        end_inclusive: true,
1040                    });
1041                    current_start = None;
1042                }
1043                Event::Limit(ver) => {
1044                    intervals.push(Interval {
1045                        start: current_start.clone(),
1046                        end: Self::parse_dotted(ver),
1047                        end_inclusive: false,
1048                    });
1049                    current_start = None;
1050                }
1051            }
1052        }
1053
1054        if current_start.is_some() {
1055            intervals.push(Interval {
1056                start: current_start,
1057                end: None,
1058                end_inclusive: false,
1059            });
1060        }
1061
1062        intervals.into_iter().any(|interval| {
1063            if let Some(start) = &interval.start {
1064                if Self::cmp_dotted(&version_parts, start) == Ordering::Less {
1065                    return false;
1066                }
1067            }
1068
1069            match (&interval.end, interval.end_inclusive) {
1070                (Some(end), true) => Self::cmp_dotted(&version_parts, end) != Ordering::Greater,
1071                (Some(end), false) => Self::cmp_dotted(&version_parts, end) == Ordering::Less,
1072                (None, _) => true,
1073            }
1074        })
1075    }
1076
1077    fn normalize_git_revision(value: &str) -> String {
1078        value
1079            .trim()
1080            .trim_start_matches("refs/")
1081            .trim_start_matches("heads/")
1082            .trim_start_matches("tags/")
1083            .to_ascii_lowercase()
1084    }
1085
1086    /// Best-effort Git range matching for deterministic cases.
1087    ///
1088    /// Full ancestry checks are not possible without repository graph context, so this
1089    /// matcher handles exact event-boundary commits and unbounded introduced ranges.
1090    fn matches_git_range(version: &str, events: &[Event]) -> bool {
1091        let target = Self::normalize_git_revision(version);
1092        if target.is_empty() {
1093            return false;
1094        }
1095
1096        let mut has_unbounded_start = false;
1097        let mut has_closing_boundary = false;
1098
1099        for event in events {
1100            match event {
1101                Event::Introduced(commit) => {
1102                    let normalized = Self::normalize_git_revision(commit);
1103                    if normalized == "0" || normalized == "*" {
1104                        has_unbounded_start = true;
1105                        continue;
1106                    }
1107                    if normalized == target {
1108                        return true;
1109                    }
1110                }
1111                Event::LastAffected(commit) => {
1112                    if Self::normalize_git_revision(commit) == target {
1113                        return true;
1114                    }
1115                }
1116                Event::Fixed(commit) | Event::Limit(commit) => {
1117                    has_closing_boundary = true;
1118                    if Self::normalize_git_revision(commit) == target {
1119                        return false;
1120                    }
1121                }
1122            }
1123        }
1124
1125        has_unbounded_start && !has_closing_boundary
1126    }
1127
1128    /// Parse dotted numeric versions (e.g., "1.2.10"). Non-numeric segments cause failure.
1129    fn parse_dotted(v: &str) -> Option<Vec<u64>> {
1130        let mut parts = Vec::new();
1131        for chunk in v.split(|c: char| !c.is_ascii_digit()) {
1132            if chunk.is_empty() {
1133                continue;
1134            }
1135            let Ok(num) = chunk.parse::<u64>() else {
1136                return None;
1137            };
1138            parts.push(num);
1139        }
1140        if parts.is_empty() { None } else { Some(parts) }
1141    }
1142
1143    /// Compare dotted numeric versions.
1144    fn cmp_dotted(a: &[u64], b: &[u64]) -> Ordering {
1145        let max_len = a.len().max(b.len());
1146        for i in 0..max_len {
1147            let ai = *a.get(i).unwrap_or(&0);
1148            let bi = *b.get(i).unwrap_or(&0);
1149            match ai.cmp(&bi) {
1150                Ordering::Equal => continue,
1151                ord => return ord,
1152            }
1153        }
1154        Ordering::Equal
1155    }
1156
1157    /// Enrich a single advisory with EPSS/KEV data.
1158    async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
1159        // Find CVE aliases
1160        let cve_ids = Self::extract_cve_ids(advisory);
1161
1162        if cve_ids.is_empty() {
1163            return Ok(());
1164        }
1165
1166        // Look up enrichment data
1167        for cve_id in &cve_ids {
1168            if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
1169                let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
1170                enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
1171                enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
1172                enrichment.is_kev = enrichment.is_kev || data.is_kev;
1173                if data.kev_due_date.is_some() {
1174                    enrichment.kev_due_date = data
1175                        .kev_due_date
1176                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
1177                        .map(|d| d.with_timezone(&chrono::Utc));
1178                }
1179                if data.kev_ransomware.is_some() {
1180                    enrichment.kev_ransomware = data.kev_ransomware;
1181                }
1182            }
1183        }
1184
1185        Ok(())
1186    }
1187
1188    /// Enrich multiple advisories with EPSS/KEV data.
1189    async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
1190        for advisory in advisories.iter_mut() {
1191            self.enrich_advisory(advisory).await?;
1192        }
1193        Ok(())
1194    }
1195
1196    /// Extract CVE IDs from an advisory (from ID or aliases).
1197    fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
1198        let mut cve_ids = Vec::new();
1199
1200        if advisory.id.starts_with("CVE-") {
1201            cve_ids.push(advisory.id.clone());
1202        }
1203
1204        if let Some(aliases) = &advisory.aliases {
1205            for alias in aliases {
1206                if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
1207                    cve_ids.push(alias.clone());
1208                }
1209            }
1210        }
1211
1212        cve_ids
1213    }
1214
1215    /// Check if an advisory passes the filter options.
1216    fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
1217        // Check KEV filter
1218        if options.kev_only {
1219            let is_kev = advisory
1220                .enrichment
1221                .as_ref()
1222                .map(|e| e.is_kev)
1223                .unwrap_or(false);
1224            if !is_kev {
1225                return false;
1226            }
1227        }
1228
1229        // Check CVSS filter
1230        if let Some(min_cvss) = options.min_cvss {
1231            let cvss = advisory
1232                .enrichment
1233                .as_ref()
1234                .and_then(|e| e.cvss_v3_score)
1235                .unwrap_or(0.0);
1236            if cvss < min_cvss {
1237                return false;
1238            }
1239        }
1240
1241        // Check EPSS filter
1242        if let Some(min_epss) = options.min_epss {
1243            let epss = advisory
1244                .enrichment
1245                .as_ref()
1246                .and_then(|e| e.epss_score)
1247                .unwrap_or(0.0);
1248            if epss < min_epss {
1249                return false;
1250            }
1251        }
1252
1253        // Check severity filter
1254        if let Some(min_severity) = &options.min_severity {
1255            let severity = advisory
1256                .enrichment
1257                .as_ref()
1258                .and_then(|e| e.cvss_v3_severity)
1259                .unwrap_or(Severity::None);
1260            if severity < *min_severity {
1261                return false;
1262            }
1263        }
1264
1265        // Check CWE filter
1266        if let Some(ref filter_cwes) = options.cwe_ids {
1267            if !filter_cwes.is_empty() {
1268                let advisory_cwes = Self::extract_cwes_from_advisory(advisory);
1269                // Normalize both filter CWEs and advisory CWEs for consistent matching
1270                let normalized_filter: Vec<String> = filter_cwes
1271                    .iter()
1272                    .map(|c| Self::normalize_cwe_id(c))
1273                    .collect();
1274                let normalized_advisory: Vec<String> = advisory_cwes
1275                    .iter()
1276                    .map(|c| Self::normalize_cwe_id(c))
1277                    .collect();
1278                // Advisory must have at least one matching CWE
1279                let has_match = normalized_filter
1280                    .iter()
1281                    .any(|cwe| normalized_advisory.iter().any(|ac| ac == cwe));
1282                if !has_match {
1283                    return false;
1284                }
1285            }
1286        }
1287
1288        true
1289    }
1290
1291    /// Normalize a CWE identifier to uppercase "CWE-XXX" format.
1292    ///
1293    /// Handles various input formats:
1294    /// - "79" → "CWE-79"
1295    /// - "cwe-79" → "CWE-79"
1296    /// - "CWE-79" → "CWE-79"
1297    fn normalize_cwe_id(cwe: &str) -> String {
1298        let trimmed = cwe.trim();
1299        let upper = trimmed.to_uppercase();
1300
1301        if upper.starts_with("CWE-") {
1302            upper
1303        } else {
1304            format!("CWE-{}", trimmed)
1305        }
1306    }
1307
1308    /// Extract CWE identifiers from an advisory.
1309    ///
1310    /// CWEs may be stored in `database_specific.cwe_ids` (from OSS Index and some OSV sources).
1311    fn extract_cwes_from_advisory(advisory: &Advisory) -> Vec<String> {
1312        let mut cwes = Vec::new();
1313
1314        // Check database_specific.cwe_ids (OSS Index, some OSV sources)
1315        if let Some(ref db_specific) = advisory.database_specific {
1316            if let Some(cwe_ids) = db_specific.get("cwe_ids") {
1317                if let Some(arr) = cwe_ids.as_array() {
1318                    for cwe in arr {
1319                        if let Some(s) = cwe.as_str() {
1320                            cwes.push(s.to_string());
1321                        }
1322                    }
1323                }
1324            }
1325        }
1326
1327        cwes
1328    }
1329
1330    /// Fetch live EPSS scores for CVEs (not from cache).
1331    pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
1332        let scores = self.epss_source.fetch_scores(cve_ids).await?;
1333        Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
1334    }
1335
1336    /// Check if a CVE is in the CISA KEV catalog.
1337    pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
1338        // Check cache first
1339        if let Some(data) = self.store.get_enrichment(cve_id).await? {
1340            return Ok(data.is_kev);
1341        }
1342
1343        // Fetch from source
1344        let entry = self.kev_source.is_kev(cve_id).await?;
1345        Ok(entry.is_some())
1346    }
1347
1348    // === OSS Index Methods ===
1349
1350    /// Query OSS Index for vulnerabilities affecting the given PURLs.
1351    ///
1352    /// This method provides automatic caching:
1353    /// - First checks the cache for each PURL
1354    /// - Only queries OSS Index for cache misses
1355    /// - Caches results for future queries
1356    ///
1357    /// # Arguments
1358    ///
1359    /// * `purls` - Package URLs to query (e.g., "pkg:npm/lodash@4.17.20")
1360    ///
1361    /// # Returns
1362    ///
1363    /// Vector of advisories for all vulnerabilities found.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns an error if OSS Index is not configured or if the query fails.
1368    ///
1369    /// # Example
1370    ///
1371    /// ```rust,ignore
1372    /// use vulnera_advisors::{VulnerabilityManager, Purl};
1373    ///
1374    /// let manager = VulnerabilityManager::builder()
1375    ///     .redis_url("redis://localhost:6379")
1376    ///     .with_ossindex(None)
1377    ///     .build()?;
1378    ///
1379    /// let purls = vec![
1380    ///     Purl::new("npm", "lodash").with_version("4.17.20").to_string(),
1381    /// ];
1382    ///
1383    /// let advisories = manager.query_ossindex(&purls).await?;
1384    /// ```
1385    pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
1386        let source = self.ossindex_source.as_ref().ok_or_else(|| {
1387            AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
1388        })?;
1389
1390        // Check cache for all PURLs
1391        let mut cached_advisories = Vec::new();
1392        let mut cache_misses = Vec::new();
1393
1394        for purl in purls {
1395            let cache_key = Purl::cache_key_from_str(purl);
1396            match self.store.get_ossindex_cache(&cache_key).await {
1397                Ok(Some(cache)) if !cache.is_expired() => {
1398                    debug!("OSS Index cache hit for {}", purl);
1399                    cached_advisories.extend(cache.advisories);
1400                }
1401                _ => {
1402                    debug!("OSS Index cache miss for {}", purl);
1403                    cache_misses.push(purl.clone());
1404                }
1405            }
1406        }
1407
1408        // Query OSS Index for cache misses
1409        if !cache_misses.is_empty() {
1410            debug!("Querying OSS Index for {} cache misses", cache_misses.len());
1411            let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
1412                AdvisoryError::SourceFetch {
1413                    source_name: "ossindex".to_string(),
1414                    message: e.to_string(),
1415                }
1416            })?;
1417
1418            // Group advisories by PURL for caching
1419            let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
1420
1421            // Cache results for each PURL
1422            for (purl, advisories) in &advisory_map {
1423                let cache_key = Purl::cache_key_from_str(purl);
1424                let cache = OssIndexCache::new(advisories.clone());
1425                if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
1426                    debug!("Failed to cache OSS Index result for {}: {}", purl, e);
1427                }
1428            }
1429
1430            // Flatten and add to results
1431            for advisories in advisory_map.into_values() {
1432                cached_advisories.extend(advisories);
1433            }
1434        }
1435
1436        Ok(cached_advisories)
1437    }
1438
1439    /// Query OSS Index for vulnerabilities with fallback to stored advisories.
1440    ///
1441    /// This method first queries OSS Index, then falls back to the local store
1442    /// if the OSS Index query fails or returns no results.
1443    ///
1444    /// # Arguments
1445    ///
1446    /// * `packages` - List of packages to query (ecosystem, name, optional version)
1447    ///
1448    /// # Returns
1449    ///
1450    /// Structured batch outcome with successful results and per-package failures.
1451    pub async fn query_batch_with_ossindex(
1452        &self,
1453        packages: &[PackageKey],
1454    ) -> Result<BatchOutcome<Vec<Advisory>>> {
1455        let mut successes: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
1456        let mut failures: Vec<BatchFailure> = Vec::new();
1457
1458        // Build PURLs for packages that have versions
1459        let (with_version, without_version): (Vec<_>, Vec<_>) =
1460            packages.iter().partition(|p| p.version.is_some());
1461
1462        // Query OSS Index for packages with versions
1463        if !with_version.is_empty() && self.ossindex_source.is_some() {
1464            let purls: Vec<String> = with_version
1465                .iter()
1466                .map(|p| {
1467                    Purl::new(&p.ecosystem, &p.name)
1468                        .with_version(p.version.as_ref().unwrap())
1469                        .to_string()
1470                })
1471                .collect();
1472
1473            match self.query_ossindex(&purls).await {
1474                Ok(advisories) => {
1475                    // Group advisories by package key
1476                    for pkg in &with_version {
1477                        let pkg_advisories: Vec<_> = advisories
1478                            .iter()
1479                            .filter(|a| {
1480                                a.affected.iter().any(|aff| {
1481                                    aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
1482                                        && aff.package.name == pkg.name
1483                                })
1484                            })
1485                            .cloned()
1486                            .collect();
1487                        successes.insert((*pkg).clone(), pkg_advisories);
1488                    }
1489                }
1490                Err(e) => {
1491                    warn!("OSS Index query failed, falling back to local store: {}", e);
1492                    // Fallback to local store
1493                    for pkg in &with_version {
1494                        failures.push(BatchFailure {
1495                            package: (*pkg).clone(),
1496                            stage: BatchFailureStage::OssIndex,
1497                            retryable: e.is_retryable(),
1498                            error: e.to_string(),
1499                        });
1500                        let advisories = if let Some(version) = &pkg.version {
1501                            self.matches(&pkg.ecosystem, &pkg.name, version).await
1502                        } else {
1503                            self.query(&pkg.ecosystem, &pkg.name).await
1504                        };
1505
1506                        match advisories {
1507                            Ok(advisories) => {
1508                                successes.insert((*pkg).clone(), advisories);
1509                            }
1510                            Err(fallback_err) => {
1511                                failures.push(BatchFailure {
1512                                    package: (*pkg).clone(),
1513                                    stage: BatchFailureStage::StoreLookup,
1514                                    retryable: fallback_err.is_retryable(),
1515                                    error: fallback_err.to_string(),
1516                                });
1517                            }
1518                        }
1519                    }
1520                }
1521            }
1522        }
1523
1524        // Query local store for packages without versions
1525        for pkg in &without_version {
1526            match self.query(&pkg.ecosystem, &pkg.name).await {
1527                Ok(advisories) => {
1528                    successes.insert((*pkg).clone(), advisories);
1529                }
1530                Err(e) => {
1531                    failures.push(BatchFailure {
1532                        package: (*pkg).clone(),
1533                        stage: BatchFailureStage::StoreLookup,
1534                        retryable: e.is_retryable(),
1535                        error: e.to_string(),
1536                    });
1537                }
1538            }
1539        }
1540
1541        let mut outcome = BatchOutcome::from_parts(successes, failures, packages.len());
1542        outcome.summary.range_translation_statuses =
1543            Self::collect_range_translation_statuses(&outcome.successes);
1544        Ok(outcome)
1545    }
1546
1547    /// Invalidate cached OSS Index results for specific PURLs.
1548    ///
1549    /// Use this to force a fresh query on the next call.
1550    pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
1551        for purl in purls {
1552            let cache_key = Purl::cache_key_from_str(purl);
1553            self.store.invalidate_ossindex_cache(&cache_key).await?;
1554        }
1555        Ok(())
1556    }
1557
1558    /// Invalidate all cached OSS Index results.
1559    pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
1560        self.store.invalidate_all_ossindex_cache().await?;
1561        Ok(())
1562    }
1563
1564    // === Remediation Methods ===
1565
1566    /// Get remediation suggestions for a vulnerable package.
1567    ///
1568    /// This method checks if the specified version is vulnerable, and if so,
1569    /// suggests the nearest and latest safe versions based on fixed versions
1570    /// declared in the advisories.
1571    ///
1572    /// # Arguments
1573    ///
1574    /// * `ecosystem` - Package ecosystem (e.g., "npm", "pypi")
1575    /// * `package` - Package name
1576    /// * `current_version` - Current version to analyze
1577    ///
1578    /// # Returns
1579    ///
1580    /// A [`crate::remediation::Remediation`] containing safe version suggestions and upgrade impact.
1581    ///
1582    /// # Example
1583    ///
1584    /// ```rust,ignore
1585    /// use vulnera_advisors::VulnerabilityManager;
1586    ///
1587    /// let remediation = manager.suggest_remediation("npm", "lodash", "4.17.20").await?;
1588    /// if let Some(nearest) = remediation.nearest_safe {
1589    ///     println!("Upgrade to {} ({:?} impact)", nearest, remediation.upgrade_impact);
1590    /// }
1591    /// ```
1592    pub async fn suggest_remediation(
1593        &self,
1594        ecosystem: &str,
1595        package: &str,
1596        current_version: &str,
1597    ) -> Result<crate::remediation::Remediation> {
1598        // Get matching advisories for this version
1599        let advisories = self.matches(ecosystem, package, current_version).await?;
1600
1601        // Build remediation using the semver matcher
1602        let remediation = crate::remediation::build_remediation(
1603            ecosystem,
1604            package,
1605            current_version,
1606            &advisories,
1607            None, // No registry versions, use only fixed versions from advisories
1608            Self::matches_semver_range,
1609        );
1610
1611        Ok(remediation)
1612    }
1613
1614    /// Get remediation suggestions with registry lookup for all available versions.
1615    ///
1616    /// This is an enhanced version of [`Self::suggest_remediation`] that fetches
1617    /// available versions from package registries to provide more complete
1618    /// upgrade suggestions.
1619    ///
1620    /// # Arguments
1621    ///
1622    /// * `ecosystem` - Package ecosystem (e.g., "npm", "pypi")
1623    /// * `package` - Package name
1624    /// * `current_version` - Current version to analyze
1625    /// * `registry` - A version registry implementation to fetch available versions
1626    ///
1627    /// # Returns
1628    ///
1629    /// A [`crate::remediation::Remediation`] containing safe version suggestions from the full version list.
1630    ///
1631    /// # Example
1632    ///
1633    /// ```rust,ignore
1634    /// use vulnera_advisors::{VulnerabilityManager, PackageRegistry};
1635    ///
1636    /// let registry = PackageRegistry::new();
1637    /// let remediation = manager
1638    ///     .suggest_remediation_with_registry("npm", "lodash", "4.17.20", &registry)
1639    ///     .await?;
1640    /// ```
1641    pub async fn suggest_remediation_with_registry(
1642        &self,
1643        ecosystem: &str,
1644        package: &str,
1645        current_version: &str,
1646        registry: &dyn crate::version_registry::VersionRegistry,
1647    ) -> Result<crate::remediation::Remediation> {
1648        // Get matching advisories for this version
1649        let advisories = self.matches(ecosystem, package, current_version).await?;
1650
1651        // Fetch all available versions from registry
1652        let available_versions = match registry.get_versions(ecosystem, package).await {
1653            Ok(versions) => Some(versions),
1654            Err(e) => {
1655                warn!(
1656                    "Failed to fetch versions from registry, using advisory data only: {}",
1657                    e
1658                );
1659                None
1660            }
1661        };
1662
1663        // Build remediation with registry versions
1664        let remediation = crate::remediation::build_remediation(
1665            ecosystem,
1666            package,
1667            current_version,
1668            &advisories,
1669            available_versions.as_deref(),
1670            Self::matches_semver_range,
1671        );
1672
1673        Ok(remediation)
1674    }
1675
1676    /// Group advisories by their associated PURL.
1677    fn group_advisories_by_purl(
1678        purls: &[String],
1679        advisories: &[Advisory],
1680    ) -> HashMap<String, Vec<Advisory>> {
1681        let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
1682
1683        // Initialize map with empty vectors for all PURLs
1684        for purl in purls {
1685            map.insert(purl.clone(), Vec::new());
1686        }
1687
1688        for advisory in advisories {
1689            for affected in &advisory.affected {
1690                for purl in purls {
1691                    let Ok(parsed) = Purl::parse(purl) else {
1692                        continue;
1693                    };
1694
1695                    // Match on ecosystem as well as name to avoid cross-ecosystem collisions
1696                    let affected_eco = affected.package.ecosystem.to_lowercase();
1697                    let purl_eco = parsed.purl_type.to_lowercase();
1698                    let purl_eco_alt = parsed.ecosystem().to_lowercase();
1699                    if affected_eco != purl_eco && affected_eco != purl_eco_alt {
1700                        continue;
1701                    }
1702
1703                    if parsed.name != affected.package.name {
1704                        continue;
1705                    }
1706
1707                    if let Some(ver) = parsed.version.as_deref() {
1708                        // If a version is specified, ensure the advisory actually covers it.
1709                        let version_matches = affected.versions.contains(&ver.to_string())
1710                            || affected.ranges.iter().any(|r| {
1711                                matches!(r.range_type, RangeType::Semver | RangeType::Ecosystem)
1712                                    && Self::matches_semver_range(ver, &r.events)
1713                            });
1714
1715                        if !version_matches {
1716                            continue;
1717                        }
1718                    }
1719
1720                    map.entry(purl.clone()).or_default().push(advisory.clone());
1721                    break;
1722                }
1723            }
1724        }
1725
1726        map
1727    }
1728}
1729
1730#[cfg(test)]
1731mod tests {
1732    use super::*;
1733    use crate::models::{Advisory, Enrichment, Severity};
1734
1735    /// Helper to create a test advisory with optional CWEs in database_specific
1736    fn create_advisory_with_cwes(id: &str, cwes: Option<Vec<&str>>) -> Advisory {
1737        let database_specific = cwes.map(|cwe_list| {
1738            serde_json::json!({
1739                "cwe_ids": cwe_list
1740            })
1741        });
1742
1743        Advisory {
1744            id: id.to_string(),
1745            summary: Some("Test advisory".to_string()),
1746            details: None,
1747            affected: vec![],
1748            references: vec![],
1749            published: None,
1750            modified: None,
1751            aliases: None,
1752            database_specific,
1753            enrichment: None,
1754        }
1755    }
1756
1757    /// Helper to create a test advisory with enrichment data
1758    fn create_advisory_with_enrichment(id: &str, severity: Severity, is_kev: bool) -> Advisory {
1759        Advisory {
1760            id: id.to_string(),
1761            summary: Some("Test advisory".to_string()),
1762            details: None,
1763            affected: vec![],
1764            references: vec![],
1765            published: None,
1766            modified: None,
1767            aliases: None,
1768            database_specific: None,
1769            enrichment: Some(Enrichment {
1770                cvss_v3_severity: Some(severity),
1771                is_kev,
1772                ..Default::default()
1773            }),
1774        }
1775    }
1776
1777    #[test]
1778    fn test_match_options_default() {
1779        let options = MatchOptions::default();
1780        assert!(options.cwe_ids.is_none());
1781        assert!(options.min_cvss.is_none());
1782        assert!(!options.kev_only);
1783    }
1784
1785    #[test]
1786    fn test_match_options_with_cwes() {
1787        let options = MatchOptions::with_cwes(vec!["CWE-79".to_string(), "CWE-89".to_string()]);
1788        assert!(options.cwe_ids.is_some());
1789        let cwes = options.cwe_ids.unwrap();
1790        assert_eq!(cwes.len(), 2);
1791        assert!(cwes.contains(&"CWE-79".to_string()));
1792        assert!(cwes.contains(&"CWE-89".to_string()));
1793        assert!(options.include_enrichment);
1794    }
1795
1796    #[test]
1797    fn test_extract_cwes_from_advisory_with_cwes() {
1798        let advisory = create_advisory_with_cwes("CVE-2024-1234", Some(vec!["CWE-79", "CWE-89"]));
1799        let cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1800        assert_eq!(cwes.len(), 2);
1801        assert!(cwes.contains(&"CWE-79".to_string()));
1802        assert!(cwes.contains(&"CWE-89".to_string()));
1803    }
1804
1805    #[test]
1806    fn test_extract_cwes_from_advisory_no_cwes() {
1807        let advisory = create_advisory_with_cwes("CVE-2024-1234", None);
1808        let cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1809        assert!(cwes.is_empty());
1810    }
1811
1812    #[test]
1813    fn test_extract_cwes_from_advisory_empty_cwes() {
1814        let advisory = create_advisory_with_cwes("CVE-2024-1234", Some(vec![]));
1815        let cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1816        assert!(cwes.is_empty());
1817    }
1818
1819    #[test]
1820    fn test_cwe_filter_case_insensitive() {
1821        // Test that CWE matching is case-insensitive
1822        let advisory = create_advisory_with_cwes("CVE-2024-1234", Some(vec!["cwe-79"]));
1823
1824        // Create options with uppercase CWE
1825        let options = MatchOptions::with_cwes(vec!["CWE-79".to_string()]);
1826
1827        // Extract CWEs
1828        let advisory_cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1829
1830        // Check case-insensitive matching
1831        let filter_cwes = options.cwe_ids.as_ref().unwrap();
1832        let has_match = filter_cwes
1833            .iter()
1834            .any(|cwe| advisory_cwes.iter().any(|ac| ac.eq_ignore_ascii_case(cwe)));
1835        assert!(has_match, "CWE matching should be case-insensitive");
1836    }
1837
1838    #[test]
1839    fn test_cwe_filter_no_match() {
1840        let advisory = create_advisory_with_cwes("CVE-2024-1234", Some(vec!["CWE-79"]));
1841
1842        // Create options filtering for a different CWE
1843        let options = MatchOptions::with_cwes(vec!["CWE-89".to_string()]);
1844
1845        let advisory_cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1846        let filter_cwes = options.cwe_ids.as_ref().unwrap();
1847        let has_match = filter_cwes
1848            .iter()
1849            .any(|cwe| advisory_cwes.iter().any(|ac| ac.eq_ignore_ascii_case(cwe)));
1850        assert!(!has_match, "Should not match when CWEs don't overlap");
1851    }
1852
1853    #[test]
1854    fn test_cwe_filter_partial_match() {
1855        // Advisory has multiple CWEs, filter matches one of them
1856        let advisory =
1857            create_advisory_with_cwes("CVE-2024-1234", Some(vec!["CWE-79", "CWE-352", "CWE-94"]));
1858
1859        let options = MatchOptions::with_cwes(vec!["CWE-89".to_string(), "CWE-79".to_string()]);
1860
1861        let advisory_cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1862        let filter_cwes = options.cwe_ids.as_ref().unwrap();
1863        let has_match = filter_cwes
1864            .iter()
1865            .any(|cwe| advisory_cwes.iter().any(|ac| ac.eq_ignore_ascii_case(cwe)));
1866        assert!(has_match, "Should match when at least one CWE overlaps");
1867    }
1868
1869    #[test]
1870    fn test_match_options_empty_cwe_list() {
1871        // Empty CWE list should not filter anything
1872        let options = MatchOptions {
1873            cwe_ids: Some(vec![]),
1874            ..Default::default()
1875        };
1876
1877        // The filter check should pass when cwe_ids list is empty
1878        assert!(options.cwe_ids.as_ref().is_none_or(|v| v.is_empty()));
1879    }
1880
1881    #[test]
1882    fn test_match_options_combined_filters() {
1883        // Test that CWE filter can be combined with other filters
1884        let options = MatchOptions {
1885            cwe_ids: Some(vec!["CWE-79".to_string()]),
1886            min_severity: Some(Severity::High),
1887            kev_only: true,
1888            include_enrichment: true,
1889            ..Default::default()
1890        };
1891
1892        assert!(options.cwe_ids.is_some());
1893        assert_eq!(options.min_severity, Some(Severity::High));
1894        assert!(options.kev_only);
1895    }
1896
1897    #[test]
1898    fn test_normalize_cwe_id_with_prefix() {
1899        assert_eq!(VulnerabilityManager::normalize_cwe_id("CWE-79"), "CWE-79");
1900        assert_eq!(VulnerabilityManager::normalize_cwe_id("cwe-79"), "CWE-79");
1901        assert_eq!(VulnerabilityManager::normalize_cwe_id("Cwe-89"), "CWE-89");
1902    }
1903
1904    #[test]
1905    fn test_normalize_cwe_id_bare_number() {
1906        assert_eq!(VulnerabilityManager::normalize_cwe_id("79"), "CWE-79");
1907        assert_eq!(VulnerabilityManager::normalize_cwe_id("89"), "CWE-89");
1908        assert_eq!(VulnerabilityManager::normalize_cwe_id("352"), "CWE-352");
1909    }
1910
1911    #[test]
1912    fn test_normalize_cwe_id_with_whitespace() {
1913        assert_eq!(VulnerabilityManager::normalize_cwe_id(" CWE-79 "), "CWE-79");
1914        assert_eq!(VulnerabilityManager::normalize_cwe_id(" 79 "), "CWE-79");
1915    }
1916
1917    #[test]
1918    fn test_cwe_filter_bare_id_matches_prefixed() {
1919        // User filters with bare "79", advisory has "CWE-79"
1920        let advisory = create_advisory_with_cwes("CVE-2024-1234", Some(vec!["CWE-79"]));
1921        let advisory_cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1922
1923        let filter_cwes = ["79".to_string()];
1924        let normalized_filter: Vec<String> = filter_cwes
1925            .iter()
1926            .map(|c| VulnerabilityManager::normalize_cwe_id(c))
1927            .collect();
1928        let normalized_advisory: Vec<String> = advisory_cwes
1929            .iter()
1930            .map(|c| VulnerabilityManager::normalize_cwe_id(c))
1931            .collect();
1932
1933        let has_match = normalized_filter
1934            .iter()
1935            .any(|cwe| normalized_advisory.iter().any(|ac| ac == cwe));
1936        assert!(has_match, "Bare '79' should match 'CWE-79'");
1937    }
1938
1939    #[test]
1940    fn test_cwe_filter_prefixed_matches_bare() {
1941        // User filters with "CWE-79", advisory has bare "79"
1942        let advisory = create_advisory_with_cwes("CVE-2024-1234", Some(vec!["79"]));
1943        let advisory_cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1944
1945        let filter_cwes = ["CWE-79".to_string()];
1946        let normalized_filter: Vec<String> = filter_cwes
1947            .iter()
1948            .map(|c| VulnerabilityManager::normalize_cwe_id(c))
1949            .collect();
1950        let normalized_advisory: Vec<String> = advisory_cwes
1951            .iter()
1952            .map(|c| VulnerabilityManager::normalize_cwe_id(c))
1953            .collect();
1954
1955        let has_match = normalized_filter
1956            .iter()
1957            .any(|cwe| normalized_advisory.iter().any(|ac| ac == cwe));
1958        assert!(has_match, "'CWE-79' should match bare '79'");
1959    }
1960
1961    #[test]
1962    fn test_cwe_filter_with_enrichment_severity() {
1963        // Test CWE filtering works correctly with enrichment data (severity)
1964        let mut advisory = create_advisory_with_enrichment("CVE-2024-1234", Severity::High, false);
1965
1966        // Add CWE data to the advisory
1967        let mut db_specific = serde_json::Map::new();
1968        db_specific.insert(
1969            "cwe_ids".to_string(),
1970            serde_json::json!(["CWE-79", "CWE-89"]),
1971        );
1972        advisory.database_specific = Some(serde_json::Value::Object(db_specific));
1973
1974        // Verify enrichment is present
1975        assert!(advisory.enrichment.is_some());
1976        assert_eq!(
1977            advisory.enrichment.as_ref().unwrap().cvss_v3_severity,
1978            Some(Severity::High)
1979        );
1980
1981        // Verify CWE extraction works with enrichment
1982        let cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
1983        assert_eq!(cwes, vec!["CWE-79", "CWE-89"]);
1984    }
1985
1986    #[test]
1987    fn test_cwe_filter_with_enrichment_kev() {
1988        // Test CWE filtering works correctly with KEV status
1989        let mut advisory =
1990            create_advisory_with_enrichment("CVE-2024-5678", Severity::Critical, true);
1991
1992        // Add CWE data
1993        let mut db_specific = serde_json::Map::new();
1994        db_specific.insert("cwe_ids".to_string(), serde_json::json!(["CWE-78"]));
1995        advisory.database_specific = Some(serde_json::Value::Object(db_specific));
1996
1997        // Verify KEV status is present
1998        assert!(advisory.enrichment.as_ref().unwrap().is_kev);
1999
2000        // Verify CWE extraction still works
2001        let cwes = VulnerabilityManager::extract_cwes_from_advisory(&advisory);
2002        assert_eq!(cwes, vec!["CWE-78"]);
2003
2004        // Test normalization
2005        let normalized: Vec<String> = cwes
2006            .iter()
2007            .map(|c| VulnerabilityManager::normalize_cwe_id(c))
2008            .collect();
2009        assert_eq!(normalized, vec!["CWE-78"]);
2010    }
2011
2012    #[test]
2013    fn test_matches_git_range_exact_boundary_commits() {
2014        let events = vec![
2015            Event::Introduced("abc123".to_string()),
2016            Event::Fixed("def456".to_string()),
2017        ];
2018
2019        assert!(VulnerabilityManager::matches_git_range("abc123", &events));
2020        assert!(!VulnerabilityManager::matches_git_range("def456", &events));
2021    }
2022
2023    #[test]
2024    fn test_matches_git_range_unbounded_introduced() {
2025        let events = vec![Event::Introduced("0".to_string())];
2026        assert!(VulnerabilityManager::matches_git_range("deadbeef", &events));
2027
2028        let closed_events = vec![
2029            Event::Introduced("0".to_string()),
2030            Event::Limit("ffff".to_string()),
2031        ];
2032        assert!(!VulnerabilityManager::matches_git_range(
2033            "deadbeef",
2034            &closed_events
2035        ));
2036    }
2037}