vulnera_advisor/
manager.rs

1//! Vulnerability manager for orchestrating syncs and queries.
2//!
3//! The [`VulnerabilityManager`] is the main entry point for using this crate.
4//! Use [`VulnerabilityManagerBuilder`] for flexible configuration.
5
6use crate::config::{Config, OssIndexConfig, StoreConfig};
7use crate::error::{AdvisoryError, Result};
8use crate::models::{Advisory, Enrichment, Event, RangeType, Severity};
9use crate::purl::Purl;
10use crate::sources::epss::EpssSource;
11use crate::sources::kev::KevSource;
12use crate::sources::ossindex::OssIndexSource;
13use crate::sources::{AdvisorySource, ghsa::GHSASource, nvd::NVDSource, osv::OSVSource};
14use crate::store::{AdvisoryStore, DragonflyStore, EnrichmentData, HealthStatus, OssIndexCache};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tracing::{debug, error, info, warn};
18
/// Options for filtering vulnerability matches.
///
/// The derived `Default` leaves every filter disabled (`None` / `false`), so
/// default options accept every advisory and do not request enrichment.
#[derive(Debug, Clone, Default)]
pub struct MatchOptions {
    /// Minimum CVSS v3 score (0.0 - 10.0). `None` disables this filter.
    pub min_cvss: Option<f64>,
    /// Minimum EPSS score (0.0 - 1.0). `None` disables this filter.
    pub min_epss: Option<f64>,
    /// Only return KEV (actively exploited) vulnerabilities.
    pub kev_only: bool,
    /// Minimum severity level. `None` disables this filter.
    pub min_severity: Option<Severity>,
    /// Include enrichment data (EPSS, KEV) in results.
    pub include_enrichment: bool,
}
33
34impl MatchOptions {
35    /// Create options that include all vulnerabilities with enrichment.
36    pub fn with_enrichment() -> Self {
37        Self {
38            include_enrichment: true,
39            ..Default::default()
40        }
41    }
42
43    /// Create options for high-severity vulnerabilities only.
44    pub fn high_severity() -> Self {
45        Self {
46            min_severity: Some(Severity::High),
47            include_enrichment: true,
48            ..Default::default()
49        }
50    }
51
52    /// Create options for actively exploited vulnerabilities only.
53    pub fn exploited_only() -> Self {
54        Self {
55            kev_only: true,
56            include_enrichment: true,
57            ..Default::default()
58        }
59    }
60}
61
/// Identifies one package (and optionally one version of it) in batch queries.
///
/// Derives `Hash` and `Eq`, so it can be used directly as a `HashMap` key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct PackageKey {
    /// Package ecosystem (e.g., "npm", "PyPI").
    pub ecosystem: String,
    /// Package name.
    pub name: String,
    /// Version to match against; `None` means "any version".
    pub version: Option<String>,
}

impl PackageKey {
    /// Build a key without a version.
    pub fn new(ecosystem: impl Into<String>, name: impl Into<String>) -> Self {
        let ecosystem = ecosystem.into();
        let name = name.into();
        Self {
            ecosystem,
            name,
            version: None,
        }
    }

    /// Build a key pinned to a specific version.
    pub fn with_version(
        ecosystem: impl Into<String>,
        name: impl Into<String>,
        version: impl Into<String>,
    ) -> Self {
        let mut key = Self::new(ecosystem, name);
        key.version = Some(version.into());
        key
    }
}
96
/// Builder for configuring VulnerabilityManager.
///
/// Collects the store settings and advisory sources, then produces a
/// `VulnerabilityManager` via `build()`.
pub struct VulnerabilityManagerBuilder {
    /// Redis connection URL; required by `build()` unless `custom_store` is set.
    redis_url: Option<String>,
    /// Configuration handed to the default store when no custom store is set.
    store_config: StoreConfig,
    /// Advisory sources registered so far.
    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
    /// Caller-supplied store; when present, `build()` uses it as-is.
    custom_store: Option<Arc<dyn AdvisoryStore + Send + Sync>>,
    /// Optional OSS Index source, set by `with_ossindex()` when construction succeeds.
    ossindex_source: Option<OssIndexSource>,
}
105
106impl Default for VulnerabilityManagerBuilder {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl VulnerabilityManagerBuilder {
113    /// Create a new builder with default settings.
114    pub fn new() -> Self {
115        Self {
116            redis_url: None,
117            store_config: StoreConfig::default(),
118            sources: Vec::new(),
119            custom_store: None,
120            ossindex_source: None,
121        }
122    }
123
124    /// Set the Redis connection URL.
125    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
126        self.redis_url = Some(url.into());
127        self
128    }
129
130    /// Set the store configuration.
131    pub fn store_config(mut self, config: StoreConfig) -> Self {
132        self.store_config = config;
133        self
134    }
135
136    /// Use a custom store implementation.
137    pub fn store(mut self, store: Arc<dyn AdvisoryStore + Send + Sync>) -> Self {
138        self.custom_store = Some(store);
139        self
140    }
141
142    /// Add a vulnerability source.
143    pub fn add_source(mut self, source: Arc<dyn AdvisorySource + Send + Sync>) -> Self {
144        self.sources.push(source);
145        self
146    }
147
148    /// Add the GHSA source with the given token.
149    pub fn with_ghsa(mut self, token: impl Into<String>) -> Self {
150        self.sources.push(Arc::new(GHSASource::new(token.into())));
151        self
152    }
153
154    /// Add the NVD source with optional API key.
155    pub fn with_nvd(mut self, api_key: Option<String>) -> Self {
156        self.sources.push(Arc::new(NVDSource::new(api_key)));
157        self
158    }
159
160    /// Add the OSV source for specified ecosystems.
161    pub fn with_osv(mut self, ecosystems: Vec<String>) -> Self {
162        self.sources.push(Arc::new(OSVSource::new(ecosystems)));
163        self
164    }
165
166    /// Add default OSV ecosystems.
167    pub fn with_osv_defaults(self) -> Self {
168        self.with_osv(vec![
169            "npm".to_string(),
170            "PyPI".to_string(),
171            "Maven".to_string(),
172            "crates.io".to_string(),
173            "Go".to_string(),
174            "Packagist".to_string(),
175            "RubyGems".to_string(),
176            "NuGet".to_string(),
177        ])
178    }
179
180    /// Add the OSS Index source with optional configuration.
181    ///
182    /// OSS Index provides on-demand vulnerability queries by PURL.
183    /// If no config is provided, credentials are loaded from environment variables.
184    pub fn with_ossindex(mut self, config: Option<OssIndexConfig>) -> Self {
185        match OssIndexSource::new(config) {
186            Ok(source) => {
187                self.ossindex_source = Some(source);
188            }
189            Err(e) => {
190                warn!("Failed to configure OSS Index source: {}", e);
191            }
192        }
193        self
194    }
195
196    /// Build the VulnerabilityManager.
197    pub fn build(self) -> Result<VulnerabilityManager> {
198        let store: Arc<dyn AdvisoryStore + Send + Sync> = match self.custom_store {
199            Some(s) => s,
200            None => {
201                let url = self.redis_url.ok_or_else(|| {
202                    AdvisoryError::config("Redis URL is required. Use .redis_url() or .store()")
203                })?;
204                Arc::new(DragonflyStore::with_config(&url, self.store_config)?)
205            }
206        };
207
208        if self.sources.is_empty() {
209            warn!("No sources configured. Use .with_ghsa(), .with_nvd(), or .with_osv()");
210        }
211
212        Ok(VulnerabilityManager {
213            store,
214            sources: self.sources,
215            kev_source: KevSource::new(),
216            epss_source: EpssSource::new(),
217            ossindex_source: self.ossindex_source,
218        })
219    }
220}
221
/// Main vulnerability manager for syncing and querying advisories.
///
/// Constructed through `VulnerabilityManagerBuilder::build()` or the
/// `VulnerabilityManager::new` convenience constructor.
pub struct VulnerabilityManager {
    /// Backing store used for advisories, sync timestamps, enrichment and the OSS Index cache.
    store: Arc<dyn AdvisoryStore + Send + Sync>,
    /// Advisory sources iterated by `sync_all()`.
    sources: Vec<Arc<dyn AdvisorySource + Send + Sync>>,
    /// Source for the KEV catalog.
    kev_source: KevSource,
    /// Source for live EPSS scores.
    epss_source: EpssSource,
    /// Optional OSS Index source; `None` when it was not configured.
    ossindex_source: Option<OssIndexSource>,
}
230
231impl VulnerabilityManager {
232    /// Create a new manager from a Config.
233    ///
234    /// This is a convenience method. For more control, use [`VulnerabilityManagerBuilder`].
235    pub async fn new(config: Config) -> Result<Self> {
236        let mut builder = VulnerabilityManagerBuilder::new()
237            .redis_url(&config.redis_url)
238            .store_config(config.store.clone());
239
240        // Add OSV source
241        builder = builder.with_osv_defaults();
242
243        // Add NVD source
244        builder = builder.with_nvd(config.nvd_api_key.clone());
245
246        // Add GHSA source if token is provided
247        if let Some(token) = &config.ghsa_token {
248            builder = builder.with_ghsa(token.clone());
249        }
250
251        // Add OSS Index source if configured
252        if config.ossindex.is_some() {
253            builder = builder.with_ossindex(config.ossindex.clone());
254        }
255
256        builder.build()
257    }
258
259    /// Create a builder for custom configuration.
260    pub fn builder() -> VulnerabilityManagerBuilder {
261        VulnerabilityManagerBuilder::new()
262    }
263
    /// Get a reference to the underlying store.
    ///
    /// Returns a borrow of the shared `Arc`; clone it if an owned handle is needed.
    pub fn store(&self) -> &Arc<dyn AdvisoryStore + Send + Sync> {
        &self.store
    }
268
    /// Check the health of the store connection.
    ///
    /// Delegates directly to the store's own `health_check` and returns its
    /// result unchanged.
    pub async fn health_check(&self) -> Result<HealthStatus> {
        self.store.health_check().await
    }
273
274    /// Sync advisories from all configured sources.
275    pub async fn sync_all(&self) -> Result<()> {
276        info!("Starting full vulnerability sync...");
277
278        let mut handles = Vec::new();
279
280        for source in &self.sources {
281            let source = source.clone();
282            let store = self.store.clone();
283
284            let handle = tokio::spawn(async move {
285                let last_sync = match store.last_sync(source.name()).await {
286                    Ok(Some(ts)) => match chrono::DateTime::parse_from_rfc3339(&ts) {
287                        Ok(dt) => Some(dt.with_timezone(&chrono::Utc)),
288                        Err(_) => None,
289                    },
290                    _ => None,
291                };
292
293                if let Some(since) = last_sync {
294                    info!("Syncing {} since {}", source.name(), since);
295                } else {
296                    info!("Syncing {} (full)", source.name());
297                }
298
299                match source.fetch(last_sync).await {
300                    Ok(advisories) => {
301                        if !advisories.is_empty() {
302                            match store.upsert_batch(&advisories, source.name()).await {
303                                Ok(_) => {
304                                    info!(
305                                        "Successfully synced {} advisories from {}",
306                                        advisories.len(),
307                                        source.name()
308                                    );
309                                    // Update timestamp only after successful storage
310                                    if let Err(e) = store.update_sync_timestamp(source.name()).await
311                                    {
312                                        error!(
313                                            "Failed to update sync timestamp for {}: {}",
314                                            source.name(),
315                                            e
316                                        );
317                                    }
318                                }
319                                Err(e) => {
320                                    error!(
321                                        "Failed to store advisories for {}: {}",
322                                        source.name(),
323                                        e
324                                    );
325                                    // Do NOT update timestamp on storage failure
326                                }
327                            }
328                        } else {
329                            info!("No new advisories for {}", source.name());
330                            // Update sync timestamp even if no new advisories (successful check)
331                            if let Err(e) = store.update_sync_timestamp(source.name()).await {
332                                error!(
333                                    "Failed to update sync timestamp for {}: {}",
334                                    source.name(),
335                                    e
336                                );
337                            }
338                        }
339                    }
340                    Err(e) => {
341                        error!("Failed to fetch from {}: {}", source.name(), e);
342                        // Do NOT update timestamp on fetch failure
343                    }
344                }
345            });
346            handles.push(handle);
347        }
348
349        // Wait for all tasks to complete
350        for handle in handles {
351            if let Err(e) = handle.await {
352                error!("Task join error: {}", e);
353            }
354        }
355
356        info!("Sync completed.");
357        Ok(())
358    }
359
    /// Reset the sync timestamp for a specific source.
    ///
    /// This forces a full re-sync on the next `sync_all()` call.
    ///
    /// `source` is passed straight to the store's `reset_sync_timestamp`, and
    /// that call's result is returned unchanged.
    pub async fn reset_sync(&self, source: &str) -> Result<()> {
        self.store.reset_sync_timestamp(source).await
    }
366
367    /// Reset all sync timestamps, forcing a full re-sync of all sources.
368    pub async fn reset_all_syncs(&self) -> Result<()> {
369        for source in &self.sources {
370            self.store.reset_sync_timestamp(source.name()).await?;
371        }
372        Ok(())
373    }
374
375    /// Sync enrichment data (KEV and EPSS).
376    pub async fn sync_enrichment(&self) -> Result<()> {
377        info!("Syncing enrichment data (KEV, EPSS)...");
378
379        // Sync KEV data
380        match self.kev_source.fetch_catalog().await {
381            Ok(kev_entries) => {
382                info!("Processing {} KEV entries", kev_entries.len());
383                for (cve_id, entry) in kev_entries {
384                    let data = EnrichmentData {
385                        epss_score: None,
386                        epss_percentile: None,
387                        is_kev: true,
388                        kev_due_date: entry.due_date_utc().map(|d| d.to_rfc3339()),
389                        kev_date_added: entry.date_added_utc().map(|d| d.to_rfc3339()),
390                        kev_ransomware: Some(entry.is_ransomware_related()),
391                        updated_at: chrono::Utc::now().to_rfc3339(),
392                    };
393                    if let Err(e) = self.store.store_enrichment(&cve_id, &data).await {
394                        debug!("Failed to store KEV enrichment for {}: {}", cve_id, e);
395                    }
396                }
397            }
398            Err(e) => {
399                error!("Failed to fetch KEV catalog: {}", e);
400            }
401        }
402
403        Ok(())
404    }
405
406    /// Query advisories for a specific package.
407    pub async fn query(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
408        let advisories = self.store.get_by_package(ecosystem, package).await?;
409        Ok(crate::aggregator::ReportAggregator::aggregate(advisories))
410    }
411
412    /// Query advisories with enrichment data.
413    pub async fn query_enriched(&self, ecosystem: &str, package: &str) -> Result<Vec<Advisory>> {
414        let mut advisories = self.query(ecosystem, package).await?;
415        self.enrich_advisories(&mut advisories).await?;
416        Ok(advisories)
417    }
418
419    /// Query multiple packages in a batch (concurrent).
420    ///
421    /// All queries run in parallel for maximum throughput.
422    pub async fn query_batch(
423        &self,
424        packages: &[PackageKey],
425    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
426        use futures_util::future::join_all;
427
428        let tasks: Vec<_> = packages
429            .iter()
430            .map(|pkg| {
431                let pkg = pkg.clone();
432                let ecosystem = pkg.ecosystem.clone();
433                let name = pkg.name.clone();
434                let version = pkg.version.clone();
435                let store = self.store.clone();
436
437                async move {
438                    let advisories = if let Some(ver) = &version {
439                        // For version matching, we need the full logic
440                        let all = store.get_by_package(&ecosystem, &name).await?;
441                        let aggregated = crate::aggregator::ReportAggregator::aggregate(all);
442                        Self::filter_by_version(aggregated, &ecosystem, &name, ver)
443                    } else {
444                        let all = store.get_by_package(&ecosystem, &name).await?;
445                        crate::aggregator::ReportAggregator::aggregate(all)
446                    };
447                    Ok::<_, crate::error::AdvisoryError>((pkg, advisories))
448                }
449            })
450            .collect();
451
452        let results: Vec<_> = join_all(tasks).await;
453
454        let mut map = HashMap::new();
455        for result in results {
456            match result {
457                Ok((pkg, advisories)) => {
458                    map.insert(pkg, advisories);
459                }
460                Err(e) => {
461                    warn!("Batch query error: {}", e);
462                }
463            }
464        }
465
466        Ok(map)
467    }
468
469    /// Filter advisories by version (static helper for concurrent batch queries)
470    fn filter_by_version(
471        advisories: Vec<Advisory>,
472        ecosystem: &str,
473        package: &str,
474        version: &str,
475    ) -> Vec<Advisory> {
476        advisories
477            .into_iter()
478            .filter(|advisory| {
479                for affected in &advisory.affected {
480                    if affected.package.name != package || affected.package.ecosystem != ecosystem {
481                        continue;
482                    }
483
484                    // Check explicit versions
485                    if affected.versions.contains(&version.to_string()) {
486                        return true;
487                    }
488
489                    // Check ranges
490                    for range in &affected.ranges {
491                        match range.range_type {
492                            RangeType::Semver | RangeType::Ecosystem => {
493                                if Self::matches_semver_range(version, &range.events) {
494                                    return true;
495                                }
496                            }
497                            RangeType::Git => {}
498                        }
499                    }
500                }
501                false
502            })
503            .collect()
504    }
505
506    /// Check if a specific package version is affected by any vulnerabilities.
507    pub async fn matches(
508        &self,
509        ecosystem: &str,
510        package: &str,
511        version: &str,
512    ) -> Result<Vec<Advisory>> {
513        self.matches_with_options(ecosystem, package, version, &MatchOptions::default())
514            .await
515    }
516
517    /// Check if a package version is affected, with filtering options.
518    pub async fn matches_with_options(
519        &self,
520        ecosystem: &str,
521        package: &str,
522        version: &str,
523        options: &MatchOptions,
524    ) -> Result<Vec<Advisory>> {
525        let advisories = self.query(ecosystem, package).await?;
526        let mut matched = Vec::new();
527
528        for mut advisory in advisories {
529            let mut is_vulnerable = false;
530            for affected in &advisory.affected {
531                if affected.package.name != package || affected.package.ecosystem != ecosystem {
532                    continue;
533                }
534
535                // Check explicit versions
536                if affected.versions.contains(&version.to_string()) {
537                    is_vulnerable = true;
538                    break;
539                }
540
541                // Check ranges
542                for range in &affected.ranges {
543                    match range.range_type {
544                        RangeType::Semver => {
545                            if Self::matches_semver_range(version, &range.events) {
546                                is_vulnerable = true;
547                                break;
548                            }
549                        }
550                        RangeType::Ecosystem => {
551                            // For ecosystem ranges, try semver first as fallback
552                            if Self::matches_semver_range(version, &range.events) {
553                                is_vulnerable = true;
554                                break;
555                            }
556                        }
557                        RangeType::Git => {
558                            // Git ranges require commit hash comparison, skip for now
559                        }
560                    }
561                }
562                if is_vulnerable {
563                    break;
564                }
565            }
566
567            if is_vulnerable {
568                // Apply enrichment if requested
569                if options.include_enrichment {
570                    self.enrich_advisory(&mut advisory).await?;
571                }
572
573                // Apply filters
574                if self.advisory_passes_filters(&advisory, options) {
575                    matched.push(advisory);
576                }
577            }
578        }
579
580        Ok(matched)
581    }
582
583    /// Check if a version matches a semver range.
584    fn matches_semver_range(version: &str, events: &[Event]) -> bool {
585        let Ok(v) = semver::Version::parse(version) else {
586            return false;
587        };
588
589        let mut introduced: Option<semver::Version> = None;
590        let mut fixed: Option<semver::Version> = None;
591        let mut last_affected: Option<semver::Version> = None;
592
593        for event in events {
594            match event {
595                Event::Introduced(ver) => {
596                    if let Ok(parsed) = semver::Version::parse(ver) {
597                        introduced = Some(parsed);
598                    } else if ver == "0" {
599                        introduced = Some(semver::Version::new(0, 0, 0));
600                    }
601                }
602                Event::Fixed(ver) => {
603                    if let Ok(parsed) = semver::Version::parse(ver) {
604                        fixed = Some(parsed);
605                    }
606                }
607                Event::LastAffected(ver) => {
608                    if let Ok(parsed) = semver::Version::parse(ver) {
609                        last_affected = Some(parsed);
610                    }
611                }
612                Event::Limit(_) => {}
613            }
614        }
615
616        match (introduced, fixed, last_affected) {
617            (Some(start), Some(end), _) => v >= start && v < end,
618            (Some(start), None, Some(last)) => v >= start && v <= last,
619            (Some(start), None, None) => v >= start,
620            (None, Some(end), _) => v < end,
621            _ => false,
622        }
623    }
624
625    /// Enrich a single advisory with EPSS/KEV data.
626    async fn enrich_advisory(&self, advisory: &mut Advisory) -> Result<()> {
627        // Find CVE aliases
628        let cve_ids = Self::extract_cve_ids(advisory);
629
630        if cve_ids.is_empty() {
631            return Ok(());
632        }
633
634        // Look up enrichment data
635        for cve_id in &cve_ids {
636            if let Ok(Some(data)) = self.store.get_enrichment(cve_id).await {
637                let enrichment = advisory.enrichment.get_or_insert_with(Enrichment::default);
638                enrichment.epss_score = data.epss_score.or(enrichment.epss_score);
639                enrichment.epss_percentile = data.epss_percentile.or(enrichment.epss_percentile);
640                enrichment.is_kev = enrichment.is_kev || data.is_kev;
641                if data.kev_due_date.is_some() {
642                    enrichment.kev_due_date = data
643                        .kev_due_date
644                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
645                        .map(|d| d.with_timezone(&chrono::Utc));
646                }
647                if data.kev_ransomware.is_some() {
648                    enrichment.kev_ransomware = data.kev_ransomware;
649                }
650            }
651        }
652
653        Ok(())
654    }
655
656    /// Enrich multiple advisories with EPSS/KEV data.
657    async fn enrich_advisories(&self, advisories: &mut [Advisory]) -> Result<()> {
658        for advisory in advisories.iter_mut() {
659            self.enrich_advisory(advisory).await?;
660        }
661        Ok(())
662    }
663
664    /// Extract CVE IDs from an advisory (from ID or aliases).
665    fn extract_cve_ids(advisory: &Advisory) -> Vec<String> {
666        let mut cve_ids = Vec::new();
667
668        if advisory.id.starts_with("CVE-") {
669            cve_ids.push(advisory.id.clone());
670        }
671
672        if let Some(aliases) = &advisory.aliases {
673            for alias in aliases {
674                if alias.starts_with("CVE-") && !cve_ids.contains(alias) {
675                    cve_ids.push(alias.clone());
676                }
677            }
678        }
679
680        cve_ids
681    }
682
683    /// Check if an advisory passes the filter options.
684    fn advisory_passes_filters(&self, advisory: &Advisory, options: &MatchOptions) -> bool {
685        // Check KEV filter
686        if options.kev_only {
687            let is_kev = advisory
688                .enrichment
689                .as_ref()
690                .map(|e| e.is_kev)
691                .unwrap_or(false);
692            if !is_kev {
693                return false;
694            }
695        }
696
697        // Check CVSS filter
698        if let Some(min_cvss) = options.min_cvss {
699            let cvss = advisory
700                .enrichment
701                .as_ref()
702                .and_then(|e| e.cvss_v3_score)
703                .unwrap_or(0.0);
704            if cvss < min_cvss {
705                return false;
706            }
707        }
708
709        // Check EPSS filter
710        if let Some(min_epss) = options.min_epss {
711            let epss = advisory
712                .enrichment
713                .as_ref()
714                .and_then(|e| e.epss_score)
715                .unwrap_or(0.0);
716            if epss < min_epss {
717                return false;
718            }
719        }
720
721        // Check severity filter
722        if let Some(min_severity) = &options.min_severity {
723            let severity = advisory
724                .enrichment
725                .as_ref()
726                .and_then(|e| e.cvss_v3_severity)
727                .unwrap_or(Severity::None);
728            if severity < *min_severity {
729                return false;
730            }
731        }
732
733        true
734    }
735
736    /// Fetch live EPSS scores for CVEs (not from cache).
737    pub async fn fetch_epss_scores(&self, cve_ids: &[&str]) -> Result<HashMap<String, f64>> {
738        let scores = self.epss_source.fetch_scores(cve_ids).await?;
739        Ok(scores.into_iter().map(|(k, v)| (k, v.epss)).collect())
740    }
741
742    /// Check if a CVE is in the CISA KEV catalog.
743    pub async fn is_kev(&self, cve_id: &str) -> Result<bool> {
744        // Check cache first
745        if let Some(data) = self.store.get_enrichment(cve_id).await? {
746            return Ok(data.is_kev);
747        }
748
749        // Fetch from source
750        let entry = self.kev_source.is_kev(cve_id).await?;
751        Ok(entry.is_some())
752    }
753
754    // === OSS Index Methods ===
755
756    /// Query OSS Index for vulnerabilities affecting the given PURLs.
757    ///
758    /// This method provides automatic caching:
759    /// - First checks the cache for each PURL
760    /// - Only queries OSS Index for cache misses
761    /// - Caches results for future queries
762    ///
763    /// # Arguments
764    ///
765    /// * `purls` - Package URLs to query (e.g., "pkg:npm/lodash@4.17.20")
766    ///
767    /// # Returns
768    ///
769    /// Vector of advisories for all vulnerabilities found.
770    ///
771    /// # Errors
772    ///
773    /// Returns an error if OSS Index is not configured or if the query fails.
774    ///
775    /// # Example
776    ///
777    /// ```rust,ignore
778    /// use vulnera_advisors::{VulnerabilityManager, Purl};
779    ///
780    /// let manager = VulnerabilityManager::builder()
781    ///     .redis_url("redis://localhost:6379")
782    ///     .with_ossindex(None)
783    ///     .build()?;
784    ///
785    /// let purls = vec![
786    ///     Purl::new("npm", "lodash").with_version("4.17.20").to_string(),
787    /// ];
788    ///
789    /// let advisories = manager.query_ossindex(&purls).await?;
790    /// ```
791    pub async fn query_ossindex(&self, purls: &[String]) -> Result<Vec<Advisory>> {
792        let source = self.ossindex_source.as_ref().ok_or_else(|| {
793            AdvisoryError::config("OSS Index not configured. Use .with_ossindex() in builder.")
794        })?;
795
796        // Check cache for all PURLs
797        let mut cached_advisories = Vec::new();
798        let mut cache_misses = Vec::new();
799
800        for purl in purls {
801            let cache_key = Purl::cache_key_from_str(purl);
802            match self.store.get_ossindex_cache(&cache_key).await {
803                Ok(Some(cache)) if !cache.is_expired() => {
804                    debug!("OSS Index cache hit for {}", purl);
805                    cached_advisories.extend(cache.advisories);
806                }
807                _ => {
808                    debug!("OSS Index cache miss for {}", purl);
809                    cache_misses.push(purl.clone());
810                }
811            }
812        }
813
814        // Query OSS Index for cache misses
815        if !cache_misses.is_empty() {
816            debug!("Querying OSS Index for {} cache misses", cache_misses.len());
817            let fresh_advisories = source.query_advisories(&cache_misses).await.map_err(|e| {
818                AdvisoryError::SourceFetch {
819                    source_name: "ossindex".to_string(),
820                    message: e.to_string(),
821                }
822            })?;
823
824            // Group advisories by PURL for caching
825            let advisory_map = Self::group_advisories_by_purl(&cache_misses, &fresh_advisories);
826
827            // Cache results for each PURL
828            for (purl, advisories) in &advisory_map {
829                let cache_key = Purl::cache_key_from_str(purl);
830                let cache = OssIndexCache::new(advisories.clone());
831                if let Err(e) = self.store.store_ossindex_cache(&cache_key, &cache).await {
832                    debug!("Failed to cache OSS Index result for {}: {}", purl, e);
833                }
834            }
835
836            // Flatten and add to results
837            for advisories in advisory_map.into_values() {
838                cached_advisories.extend(advisories);
839            }
840        }
841
842        Ok(cached_advisories)
843    }
844
845    /// Query OSS Index for vulnerabilities with fallback to stored advisories.
846    ///
847    /// This method first queries OSS Index, then falls back to the local store
848    /// if the OSS Index query fails or returns no results.
849    ///
850    /// # Arguments
851    ///
852    /// * `packages` - List of packages to query (ecosystem, name, optional version)
853    ///
854    /// # Returns
855    ///
856    /// Map of package keys to their advisories.
857    pub async fn query_batch_with_ossindex(
858        &self,
859        packages: &[PackageKey],
860    ) -> Result<HashMap<PackageKey, Vec<Advisory>>> {
861        let mut results: HashMap<PackageKey, Vec<Advisory>> = HashMap::new();
862
863        // Build PURLs for packages that have versions
864        let (with_version, without_version): (Vec<_>, Vec<_>) =
865            packages.iter().partition(|p| p.version.is_some());
866
867        // Query OSS Index for packages with versions
868        if !with_version.is_empty() && self.ossindex_source.is_some() {
869            let purls: Vec<String> = with_version
870                .iter()
871                .map(|p| {
872                    Purl::new(&p.ecosystem, &p.name)
873                        .with_version(p.version.as_ref().unwrap())
874                        .to_string()
875                })
876                .collect();
877
878            match self.query_ossindex(&purls).await {
879                Ok(advisories) => {
880                    // Group advisories by package key
881                    for pkg in &with_version {
882                        let pkg_advisories: Vec<_> = advisories
883                            .iter()
884                            .filter(|a| {
885                                a.affected.iter().any(|aff| {
886                                    aff.package.ecosystem.eq_ignore_ascii_case(&pkg.ecosystem)
887                                        && aff.package.name == pkg.name
888                                })
889                            })
890                            .cloned()
891                            .collect();
892                        results.insert((*pkg).clone(), pkg_advisories);
893                    }
894                }
895                Err(e) => {
896                    warn!("OSS Index query failed, falling back to local store: {}", e);
897                    // Fallback to local store
898                    for pkg in &with_version {
899                        let advisories = if let Some(version) = &pkg.version {
900                            self.matches(&pkg.ecosystem, &pkg.name, version).await?
901                        } else {
902                            self.query(&pkg.ecosystem, &pkg.name).await?
903                        };
904                        results.insert((*pkg).clone(), advisories);
905                    }
906                }
907            }
908        }
909
910        // Query local store for packages without versions
911        for pkg in &without_version {
912            let advisories = self.query(&pkg.ecosystem, &pkg.name).await?;
913            results.insert((*pkg).clone(), advisories);
914        }
915
916        Ok(results)
917    }
918
919    /// Invalidate cached OSS Index results for specific PURLs.
920    ///
921    /// Use this to force a fresh query on the next call.
922    pub async fn invalidate_ossindex_cache(&self, purls: &[String]) -> Result<()> {
923        for purl in purls {
924            let cache_key = Purl::cache_key_from_str(purl);
925            self.store.invalidate_ossindex_cache(&cache_key).await?;
926        }
927        Ok(())
928    }
929
930    /// Invalidate all cached OSS Index results.
931    pub async fn invalidate_all_ossindex_cache(&self) -> Result<()> {
932        self.store.invalidate_all_ossindex_cache().await?;
933        Ok(())
934    }
935
936    /// Group advisories by their associated PURL.
937    fn group_advisories_by_purl(
938        purls: &[String],
939        advisories: &[Advisory],
940    ) -> HashMap<String, Vec<Advisory>> {
941        let mut map: HashMap<String, Vec<Advisory>> = HashMap::new();
942
943        // Initialize map with empty vectors for all PURLs
944        for purl in purls {
945            map.insert(purl.clone(), Vec::new());
946        }
947
948        // Group advisories
949        for advisory in advisories {
950            for affected in &advisory.affected {
951                // Find matching PURL
952                for purl in purls {
953                    if let Ok(parsed) = Purl::parse(purl) {
954                        if parsed.name == affected.package.name {
955                            map.entry(purl.clone()).or_default().push(advisory.clone());
956                            break;
957                        }
958                    }
959                }
960            }
961        }
962
963        map
964    }
965}