octofhir_canonical_manager/
lib.rs

1//! # FHIR Canonical Manager
2//!
3//! A library-first solution for managing FHIR Implementation Guide packages,
4//! providing fast canonical URL resolution and resource search capabilities.
5//!
6//! ## Quick Start
7//!
8//! ```rust,no_run
9//! use octofhir_canonical_manager::{CanonicalManager, FcmConfig};
10//!
11//! #[tokio::main]
12//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
13//!     let config = FcmConfig::load().await?;
14//!     let manager = CanonicalManager::new(config).await?;
15//!
16//!     // Install a package
17//!     manager.install_package("hl7.fhir.us.core", "6.1.0").await?;
18//!
19//!     // Resolve a canonical URL
20//!     let resource = manager.resolve("http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient").await?;
21//!
22//!     // Get search parameters for a resource type
23//!     let search_params = manager.get_search_parameters("Patient").await?;
24//!     for param in search_params {
25//!         println!("{}: {} ({})", param.code, param.name, param.type_field);
26//!     }
27//!
28//!     Ok(())
29//! }
30//! ```
31
32#[doc(hidden)]
33pub mod binary_storage;
34#[doc(hidden)]
35pub mod change_detection;
36#[doc(hidden)]
37pub mod concurrency;
38pub mod config;
39#[doc(hidden)]
40pub mod config_validator;
41pub mod domain;
42pub mod error;
43#[cfg(feature = "fuzzy-search")]
44#[doc(hidden)]
45pub mod fuzzy;
46#[doc(hidden)]
47pub mod incremental_indexer;
48#[doc(hidden)]
49pub mod index_writer;
50#[doc(hidden)]
51pub mod output;
52#[doc(hidden)]
53pub mod package;
54#[doc(hidden)]
55pub mod performance;
56#[doc(hidden)]
57pub mod rebuild_strategy;
58#[doc(hidden)]
59pub mod registry;
60pub mod resolver;
61pub mod search;
62#[doc(hidden)]
63pub mod storage;
64#[doc(hidden)]
65pub mod traits;
66#[doc(hidden)]
67pub mod unified_storage;
68
69#[cfg(feature = "cli")]
70#[doc(hidden)]
71pub mod cli;
72
73#[cfg(feature = "cli")]
74#[doc(hidden)]
75pub mod cli_error;
76
77// Public facade: configs, domain, resolver/search entrypoints, storage stats, and top-level errors
78pub use config::{FcmConfig, OptimizationConfig, PackageSpec, RegistryConfig, StorageConfig};
79pub use domain::{CanonicalUrl, CanonicalWithVersion, FhirVersion, PackageVersion};
80pub use error::{FcmError, Result};
81pub use resolver::CanonicalResolver;
82pub use search::SearchEngine;
83pub use unified_storage::{UnifiedIntegrityReport, UnifiedStorageStats};
84
85use std::collections::HashSet;
86use std::sync::Arc;
87use tantivy::{Index, schema::*};
88use tokio::fs;
89use tracing::{debug, info, warn};
90
91// Internal imports for types no longer re-exported at crate root
92use crate::binary_storage::PackageInfo;
93use crate::change_detection::PackageChangeDetector;
94use crate::config_validator::ValidationResult;
95use crate::incremental_indexer::IncrementalIndexer;
96use crate::performance::{
97    IndexOperationType, PackageOperationType, PerformanceAnalysis, PerformanceConfig,
98    PerformanceMetrics, PerformanceMonitor,
99};
100use crate::rebuild_strategy::{RebuildStrategy, SmartRebuildStrategy};
101use crate::unified_storage::UnifiedStorage;
102
103#[cfg(feature = "cli")]
104use crate::output::Progress;
105
106#[cfg(feature = "cli")]
107use crate::cli::ProgressContext;
108
109/// Performance metrics for optimization components
110#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
111pub struct OptimizationMetrics {
112    pub change_detector_cache_size: usize,
113    pub incremental_updates_count: usize,
114    pub parallel_processing_efficiency: f64,
115    pub average_rebuild_time: std::time::Duration,
116    pub storage_compression_ratio: f64,
117}
118
119/// Status of package changes
120#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
121pub struct PackageChangeStatus {
122    pub package_id: String,
123    pub has_changes: bool,
124    pub added_files: usize,
125    pub modified_files: usize,
126    pub removed_files: usize,
127}
128
129/// Information about a FHIR SearchParameter
130#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
131pub struct SearchParameterInfo {
132    pub name: String,
133    pub code: String,
134    pub base: Vec<String>,
135    pub type_field: String,
136    pub description: Option<String>,
137    pub expression: Option<String>,
138    pub xpath: Option<String>,
139    pub url: Option<String>,
140    pub status: Option<String>,
141}
142
143impl SearchParameterInfo {
144    /// Create from a FHIR resource
145    pub fn from_resource(resource: &package::FhirResource) -> Result<Self> {
146        let content = &resource.content;
147
148        Ok(Self {
149            name: content
150                .get("name")
151                .and_then(|v| v.as_str())
152                .unwrap_or("unknown")
153                .to_string(),
154            code: content
155                .get("code")
156                .and_then(|v| v.as_str())
157                .unwrap_or("")
158                .to_string(),
159            base: content
160                .get("base")
161                .and_then(|v| v.as_array())
162                .map(|arr| {
163                    arr.iter()
164                        .filter_map(|v| v.as_str().map(String::from))
165                        .collect()
166                })
167                .or_else(|| {
168                    content
169                        .get("base")
170                        .and_then(|v| v.as_str())
171                        .map(|s| vec![s.to_string()])
172                })
173                .unwrap_or_default(),
174            type_field: content
175                .get("type")
176                .and_then(|v| v.as_str())
177                .unwrap_or("string")
178                .to_string(),
179            description: content
180                .get("description")
181                .and_then(|v| v.as_str())
182                .map(String::from),
183            expression: content
184                .get("expression")
185                .and_then(|v| v.as_str())
186                .map(String::from),
187            xpath: content
188                .get("xpath")
189                .and_then(|v| v.as_str())
190                .map(String::from),
191            url: content
192                .get("url")
193                .and_then(|v| v.as_str())
194                .map(String::from),
195            status: content
196                .get("status")
197                .and_then(|v| v.as_str())
198                .map(String::from),
199        })
200    }
201}
202
203/// Package information for rebuild operations
204#[derive(Debug, Clone)]
205pub struct RebuildPackageInfo {
206    pub id: String,
207    pub path: std::path::PathBuf,
208}
209
210impl RebuildPackageInfo {
211    pub fn new(id: String, path: std::path::PathBuf) -> Self {
212        Self { id, path }
213    }
214}
215
216/// Main FHIR Canonical Manager
217///
218/// Provides high-level interface for managing FHIR packages and resolving canonical URLs.
219pub struct CanonicalManager {
220    config: FcmConfig,
221    storage: Arc<UnifiedStorage>, // Optimized unified storage system
222    registry_client: registry::RegistryClient,
223    // Optional trait-based components for advanced construction
224    registry_dyn: Option<Arc<dyn crate::traits::AsyncRegistry + Send + Sync>>,
225    pkg_store_dyn: Option<Arc<dyn crate::traits::PackageStore + Send + Sync>>,
226    #[allow(dead_code)]
227    index_store_dyn: Option<Arc<dyn crate::traits::IndexStore + Send + Sync>>,
228    resolver: CanonicalResolver,
229    search_engine: SearchEngine,
230    // Optimization components
231    change_detector: Arc<PackageChangeDetector>,
232    rebuild_strategy: SmartRebuildStrategy,
233    incremental_indexer: Arc<IncrementalIndexer>,
234    // Performance tracking
235    performance_monitor: Arc<PerformanceMonitor>,
236    tantivy_index: Arc<Index>,
237}
238
239impl std::fmt::Debug for CanonicalManager {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("CanonicalManager")
242            .field("registry_url", &self.config.registry.url)
243            .field("packages_configured", &self.config.packages.len())
244            .finish()
245    }
246}
247
248impl CanonicalManager {
249    /// Create a new CanonicalManager with the given configuration
250    pub async fn new(mut config: FcmConfig) -> Result<Self> {
251        // Add early timeout check for CI/test environments to avoid hanging
252        if std::env::var("CI").is_ok() || std::env::var("FHIRPATH_QUICK_INIT").is_ok() {
253            info!("Quick initialization mode detected (CI or FHIRPATH_QUICK_INIT set)");
254            // Use minimal configuration for quick initialization
255            config.optimization.parallel_workers = 1;
256            config.optimization.enable_metrics = false;
257            config.optimization.use_mmap = false;
258        }
259
260        // Validate and optimize configuration before initialization
261        // Skip optimization for test configurations (parallel_workers=1 and enable_metrics=false)
262        let skip_optimization =
263            config.optimization.parallel_workers == 1 && !config.optimization.enable_metrics;
264        let validation = if skip_optimization {
265            debug!("Skipping config optimization for test mode");
266            crate::config_validator::ConfigValidator::validate_config(&config)
267        } else {
268            crate::config_validator::ConfigValidator::optimize_config(&mut config)
269        };
270
271        if !validation.is_valid {
272            for error in &validation.errors {
273                tracing::error!("Configuration error: {}", error);
274            }
275            return Err(crate::error::FcmError::Config(
276                crate::error::ConfigError::ValidationFailed {
277                    message: "Invalid optimization configuration".to_string(),
278                },
279            ));
280        }
281
282        // Log configuration warnings and recommendations
283        for warning in &validation.warnings {
284            warn!("Configuration warning: {}", warning);
285        }
286
287        for recommendation in &validation.recommendations {
288            info!("Configuration recommendation: {}", recommendation);
289        }
290
291        // Initialize unified storage system with timeout to prevent hanging
292        let storage_future = UnifiedStorage::new(
293            config.storage.clone(),
294            config.optimization.use_mmap,
295            config.optimization.compression_level,
296        );
297
298        let storage =
299            match tokio::time::timeout(std::time::Duration::from_secs(30), storage_future).await {
300                Ok(Ok(s)) => Arc::new(s),
301                Ok(Err(e)) => {
302                    tracing::error!("Failed to initialize unified storage: {}", e);
303                    return Err(e);
304                }
305                Err(_) => {
306                    tracing::error!("Unified storage initialization timed out after 30 seconds");
307                    return Err(FcmError::Config(
308                        crate::error::ConfigError::ValidationFailed {
309                            message: "Storage initialization timed out".to_string(),
310                        },
311                    ));
312                }
313            };
314
315        // Initialize registry client
316        let expanded_storage = config.get_expanded_storage_config();
317        let registry_client =
318            registry::RegistryClient::new(&config.registry, expanded_storage.cache_dir.clone())
319                .await?;
320
321        // Initialize resolver with unified storage
322        let resolver = CanonicalResolver::new(Arc::clone(storage.package_storage()));
323
324        // Initialize search engine with unified storage
325        let search_engine = SearchEngine::new(Arc::clone(storage.package_storage()));
326
327        // Initialize optimization components based on config
328        let change_detector = Arc::new(PackageChangeDetector::new());
329
330        // Initialize smart rebuild strategy
331        let rebuild_strategy = SmartRebuildStrategy::new(Arc::clone(&change_detector))
332            .with_full_rebuild_threshold(config.optimization.full_rebuild_threshold)
333            .with_batch_size(config.optimization.incremental_batch_size);
334
335        // Create Tantivy index for full-text search
336        let tantivy_index = Arc::new(Self::create_search_index()?);
337
338        // Initialize incremental indexer with a persistent writer manager
339        let writer_manager = Arc::new(index_writer::IndexWriterManager::new(
340            Arc::clone(&tantivy_index),
341            50_000_000,
342        ));
343        let incremental_indexer = Arc::new(IncrementalIndexer::new(
344            Arc::clone(&writer_manager),
345            Arc::clone(&change_detector),
346            config.optimization.batch_size,
347        ));
348
349        // Initialize performance monitor
350        let perf_config = PerformanceConfig {
351            enable_metrics: config.optimization.enable_metrics,
352            metrics_interval: std::time::Duration::from_secs(30),
353            max_samples: 1000,
354            enable_detailed_logging: config.optimization.enable_metrics,
355        };
356        let performance_monitor = Arc::new(PerformanceMonitor::new(perf_config));
357
358        info!("FHIR Canonical Manager initialized successfully with optimized configuration");
359
360        Ok(Self {
361            config,
362            storage,
363            registry_client,
364            registry_dyn: None,
365            pkg_store_dyn: None,
366            index_store_dyn: None,
367            resolver,
368            search_engine,
369            change_detector,
370            rebuild_strategy,
371            incremental_indexer,
372            performance_monitor,
373            tantivy_index,
374        })
375    }
376
377    /// Simple alias for end-user clarity
378    pub async fn new_simple(config: FcmConfig) -> Result<Self> {
379        Self::new(config).await
380    }
381
382    /// Advanced constructor using trait-based components. Caller must provide a BinaryStorage-backed
383    /// search storage for resolver/search.
384    pub async fn new_with_components(
385        mut config: FcmConfig,
386        package_store: Arc<dyn crate::traits::PackageStore + Send + Sync>,
387        index_store: Arc<dyn crate::traits::IndexStore + Send + Sync>,
388        registry: Arc<dyn crate::traits::AsyncRegistry + Send + Sync>,
389        search_storage: Arc<crate::binary_storage::BinaryStorage>,
390    ) -> Result<Self> {
391        // Reuse config optimization/validation
392        let skip_optimization =
393            config.optimization.parallel_workers == 1 && !config.optimization.enable_metrics;
394        let validation = if skip_optimization {
395            crate::config_validator::ConfigValidator::validate_config(&config)
396        } else {
397            crate::config_validator::ConfigValidator::optimize_config(&mut config)
398        };
399        if !validation.is_valid {
400            return Err(crate::error::FcmError::Config(
401                crate::error::ConfigError::ValidationFailed {
402                    message: "Invalid optimization configuration".to_string(),
403                },
404            ));
405        }
406
407        // Build a minimal UnifiedStorage for compatibility where needed (index store may still be used separately)
408        let storage = Arc::new(
409            UnifiedStorage::new(
410                config.storage.clone(),
411                config.optimization.use_mmap,
412                config.optimization.compression_level,
413            )
414            .await?,
415        );
416
417        // Resolver/Search backed by provided search_storage (BinaryStorage)
418        let resolver = CanonicalResolver::new(Arc::clone(&search_storage));
419        let search_engine = SearchEngine::new(Arc::clone(&search_storage));
420
421        // Optimization components
422        let change_detector = Arc::new(PackageChangeDetector::new());
423        let rebuild_strategy = SmartRebuildStrategy::new(Arc::clone(&change_detector))
424            .with_full_rebuild_threshold(config.optimization.full_rebuild_threshold)
425            .with_batch_size(config.optimization.incremental_batch_size);
426
427        let tantivy_index = Arc::new(Self::create_search_index()?);
428        let writer_manager = Arc::new(index_writer::IndexWriterManager::new(
429            Arc::clone(&tantivy_index),
430            50_000_000,
431        ));
432        let incremental_indexer = Arc::new(IncrementalIndexer::new(
433            Arc::clone(&writer_manager),
434            Arc::clone(&change_detector),
435            config.optimization.batch_size,
436        ));
437
438        let perf_config = PerformanceConfig {
439            enable_metrics: config.optimization.enable_metrics,
440            metrics_interval: std::time::Duration::from_secs(30),
441            max_samples: 1000,
442            enable_detailed_logging: config.optimization.enable_metrics,
443        };
444        let performance_monitor = Arc::new(PerformanceMonitor::new(perf_config));
445
446        // Create a dummy concrete client for compatibility, but route calls to trait object helpers
447        let expanded_storage = config.get_expanded_storage_config();
448        let registry_client =
449            registry::RegistryClient::new(&config.registry, expanded_storage.cache_dir.clone())
450                .await?;
451
452        Ok(Self {
453            config,
454            storage,
455            registry_client,
456            registry_dyn: Some(registry),
457            pkg_store_dyn: Some(package_store),
458            index_store_dyn: Some(index_store),
459            resolver,
460            search_engine,
461            change_detector,
462            rebuild_strategy,
463            incremental_indexer,
464            performance_monitor,
465            tantivy_index,
466        })
467    }
468
469    fn create_search_index() -> Result<Index> {
470        let mut schema_builder = Schema::builder();
471
472        schema_builder.add_text_field("resource_type", TEXT | STORED);
473        schema_builder.add_text_field("id", TEXT | STORED);
474        schema_builder.add_text_field("canonical_url", TEXT | STORED);
475        schema_builder.add_text_field("package_id", TEXT | STORED);
476        schema_builder.add_text_field("name", TEXT | STORED);
477        schema_builder.add_text_field("title", TEXT | STORED);
478        schema_builder.add_text_field("description", TEXT);
479        schema_builder.add_text_field("content", TEXT);
480        schema_builder.add_text_field("search_text", TEXT);
481
482        let schema = schema_builder.build();
483        let index = Index::create_in_ram(schema);
484
485        Ok(index)
486    }
487
488    // Helper: registry download via trait or concrete client
489    async fn registry_download(&self, spec: &PackageSpec) -> Result<registry::PackageDownload> {
490        if let Some(r) = &self.registry_dyn {
491            r.download_package(spec).await
492        } else {
493            self.registry_client.download_package(spec).await
494        }
495    }
496
497    // Helper: list packages via trait or unified storage
498    async fn list_packages_via_store(&self) -> Result<Vec<PackageInfo>> {
499        if let Some(s) = &self.pkg_store_dyn {
500            s.list_packages().await
501        } else {
502            self.storage.list_packages().await
503        }
504    }
505
506    // Helper: add package via trait or unified storage
507    async fn add_package_via_store(&self, pkg: &package::ExtractedPackage) -> Result<()> {
508        if let Some(s) = &self.pkg_store_dyn {
509            s.add_package(pkg).await
510        } else {
511            self.storage.add_package(pkg).await
512        }
513    }
514
515    /// Install a FHIR package by name and version
516    /// Enhanced with optimization components for faster installation
517    #[tracing::instrument(name = "manager.install_package", skip(self), fields(pkg = %name, ver = %version))]
518    pub async fn install_package(&self, name: &str, version: &str) -> Result<()> {
519        // Check if we should use simplified installation for testing
520        if self.config.optimization.parallel_workers == 1
521            && !self.config.optimization.enable_metrics
522        {
523            debug!("Using simplified installation for testing mode");
524            return self.install_package_simple(name, version).await;
525        }
526
527        let tracker = self.performance_monitor.start_operation("install_package");
528        let mut installed = HashSet::new();
529
530        info!(
531            "Installing package {}@{} with optimization features enabled",
532            name, version
533        );
534
535        self.install_package_with_dependencies(name, version, &mut installed)
536            .await?;
537
538        // Use smart rebuild strategy to determine if full or incremental rebuild is needed
539        info!("Analyzing rebuild strategy for newly installed packages...");
540        self.smart_rebuild_index().await?;
541
542        let total_duration = tracker.finish();
543
544        // Record package operation metrics
545        self.performance_monitor
546            .record_package_operation(PackageOperationType::Install, total_duration)
547            .await;
548
549        info!("Package installation completed in {:?}", total_duration);
550
551        // Track performance metrics
552        if self.config.optimization.enable_metrics {
553            info!(
554                "Performance: Installed {} packages in {:?}",
555                installed.len(),
556                total_duration
557            );
558        }
559
560        Ok(())
561    }
562
563    /// Simplified package installation for testing (bypasses complex optimizations)
564    async fn install_package_simple(&self, name: &str, version: &str) -> Result<()> {
565        debug!("Installing package {}@{} in simplified mode", name, version);
566
567        // Use a simple set to track what we've installed to avoid infinite recursion
568        let mut installed = std::collections::HashSet::new();
569        self.install_package_simple_recursive(name, version, &mut installed)
570            .await
571    }
572
573    /// Recursive helper for simplified installation with dependency handling
574    fn install_package_simple_recursive<'a>(
575        &'a self,
576        name: &'a str,
577        version: &'a str,
578        installed: &'a mut std::collections::HashSet<String>,
579    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
580        Box::pin(async move {
581            let package_key = format!("{name}@{version}");
582
583            // Skip if already processed
584            if installed.contains(&package_key) {
585                debug!("Package {} already processed", package_key);
586                return Ok(());
587            }
588
589            // Check if package already exists in storage
590            debug!(
591                "Checking if package {} already exists in storage...",
592                package_key
593            );
594            let packages = self.storage.list_packages().await?;
595            if packages
596                .iter()
597                .any(|p| p.name == name && p.version == version)
598            {
599                debug!("Package {} already installed in storage", package_key);
600                installed.insert(package_key);
601                return Ok(());
602            }
603            debug!(
604                "Package {} not found in storage, proceeding with installation",
605                package_key
606            );
607
608            let spec = crate::config::PackageSpec {
609                name: name.to_string(),
610                version: version.to_string(),
611                priority: 1,
612            };
613
614            // Download package and get metadata (includes dependencies)
615            debug!("Downloading package {}...", package_key);
616            let download = self.registry_client.download_package(&spec).await?;
617            let dependencies = download.metadata.dependencies.clone();
618            debug!(
619                "Package {} downloaded successfully, found {} dependencies",
620                package_key,
621                dependencies.len()
622            );
623
624            // Install dependencies first
625            for (dep_name, dep_version) in dependencies {
626                debug!("Installing dependency: {}@{}", dep_name, dep_version);
627                self.install_package_simple_recursive(&dep_name, &dep_version, installed)
628                    .await?;
629            }
630
631            debug!("Extracting package {}...", package_key);
632            let extractor =
633                crate::package::PackageExtractor::new(self.config.storage.packages_dir.clone());
634            let extracted = extractor.extract_package(download).await?;
635            debug!("Package {} extracted successfully", package_key);
636
637            // Add to storage (simplified - no complex indexing)
638            debug!("Adding package {} to storage...", package_key);
639            self.storage.add_package(&extracted).await?;
640            #[cfg(feature = "metrics")]
641            {
642                metrics::increment_counter!("packages_installed");
643            }
644            debug!("Package {} added to storage successfully", package_key);
645
646            installed.insert(package_key.clone());
647            info!(
648                "Package {} installed successfully (simplified mode)",
649                package_key
650            );
651            Ok(())
652        })
653    }
654
655    /// Count total packages that will be installed (including dependencies)
656    #[cfg(feature = "cli")]
657    pub async fn count_packages_to_install(&self, name: &str, version: &str) -> Result<usize> {
658        let mut visited = HashSet::new();
659        self.count_packages_recursive(name, version, &mut visited)
660            .await
661    }
662
663    /// Recursively count packages
664    #[cfg(feature = "cli")]
665    fn count_packages_recursive<'a>(
666        &'a self,
667        name: &'a str,
668        version: &'a str,
669        visited: &'a mut HashSet<String>,
670    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize>> + Send + 'a>> {
671        Box::pin(async move {
672            let package_key = format!("{name}@{version}");
673
674            if visited.contains(&package_key) {
675                return Ok(0);
676            }
677
678            visited.insert(package_key);
679            let mut total = 1; // Count this package
680
681            // Get dependencies
682            let metadata = self
683                .registry_client
684                .get_package_metadata(name, version)
685                .await?;
686
687            // Recursively count dependencies
688            for (dep_name, dep_version) in metadata.dependencies {
689                total += self
690                    .count_packages_recursive(&dep_name, &dep_version, visited)
691                    .await?;
692            }
693
694            Ok(total)
695        })
696    }
697
698    /// Install a package with progress tracking (CLI feature)
699    #[cfg(feature = "cli")]
700    pub async fn install_package_with_progress(
701        &self,
702        name: &str,
703        version: &str,
704        progress: &mut ProgressContext,
705    ) -> Result<()> {
706        let mut installed = HashSet::new();
707        self.install_package_with_dependencies_and_progress(
708            name,
709            version,
710            &mut installed,
711            progress,
712        )
713        .await?;
714
715        // Use smart rebuild strategy to determine if full or incremental rebuild is needed
716        info!("Analyzing rebuild strategy for newly installed packages...");
717        self.smart_rebuild_index().await?;
718
719        Ok(())
720    }
721
722    /// Install a package with recursive dependency resolution
723    fn install_package_with_dependencies<'a>(
724        &'a self,
725        name: &'a str,
726        version: &'a str,
727        installed: &'a mut HashSet<String>,
728    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
729        Box::pin(async move {
730            let package_key = format!("{name}@{version}");
731
732            // Check if already installed to avoid cycles and duplicates
733            if installed.contains(&package_key) {
734                info!("Package already processed: {}", package_key);
735                return Ok(());
736            }
737
738            // Check if package already exists in storage (trait-aware)
739            let packages = self.list_packages_via_store().await?;
740            for p in &packages {
741                debug!("- {}@{}", p.name, p.version);
742            }
743
744            if packages
745                .iter()
746                .any(|p| p.name == name && p.version == version)
747            {
748                info!("Package already installed in storage: {}", package_key);
749                installed.insert(package_key);
750                return Ok(());
751            }
752
753            info!("Installing package: {}@{}", name, version);
754
755            let spec = PackageSpec {
756                name: name.to_string(),
757                version: version.to_string(),
758                priority: 1,
759            };
760
761            // Download package and get metadata
762            let download = self.registry_download(&spec).await?;
763            let dependencies = download.metadata.dependencies.clone();
764
765            // Extract package
766            let extractor = package::PackageExtractor::new(self.config.storage.cache_dir.clone());
767            let extracted = extractor.extract_package(download).await?;
768
769            // Move extracted package to packages directory
770            let package_dir = self
771                .config
772                .storage
773                .packages_dir
774                .join(format!("{name}-{version}"));
775            if package_dir.exists() {
776                fs::remove_dir_all(&package_dir).await?;
777            }
778            fs::rename(&extracted.extraction_path, &package_dir).await?;
779
780            // Update file paths in extracted package to point to new location
781            let mut updated_extracted = extracted;
782            for resource in &mut updated_extracted.resources {
783                // Replace the old extraction path with the new package directory path
784                if let Ok(relative_path) = resource
785                    .file_path
786                    .strip_prefix(&updated_extracted.extraction_path)
787                {
788                    resource.file_path = package_dir.join(relative_path);
789                }
790            }
791            updated_extracted.extraction_path = package_dir.clone();
792
793            // Add package to unified storage
794            if let Err(e) = self.add_package_via_store(&updated_extracted).await {
795                match &e {
796                    crate::error::FcmError::Storage(
797                        crate::error::StorageError::PackageAlreadyExists { .. },
798                    ) => {
799                        info!("Package already exists in storage: {}", package_key);
800                    }
801                    _ => return Err(e),
802                }
803            }
804            #[cfg(feature = "metrics")]
805            {
806                metrics::increment_counter!("packages_installed");
807            }
808
809            // Mark as installed
810            installed.insert(package_key.clone());
811
812            // Install dependencies recursively
813            for (dep_name, dep_version) in dependencies {
814                info!("Installing dependency: {}@{}", dep_name, dep_version);
815                self.install_package_with_dependencies(&dep_name, &dep_version, installed)
816                    .await?;
817            }
818
819            // Note: Package indexing happens during rebuild_index or explicit index updates
820            // Individual packages are indexed when the full index is rebuilt
821
822            info!("Package installed successfully: {}", package_key);
823            Ok(())
824        })
825    }
826
827    /// Install a package with recursive dependency resolution and progress tracking
828    #[cfg(feature = "cli")]
829    fn install_package_with_dependencies_and_progress<'a>(
830        &'a self,
831        name: &'a str,
832        version: &'a str,
833        installed: &'a mut HashSet<String>,
834        progress: &'a mut ProgressContext,
835    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
836        Box::pin(async move {
837            let package_key = format!("{name}@{version}");
838
839            // Check if already installed to avoid cycles and duplicates
840            if installed.contains(&package_key) {
841                info!("Package already processed: {}", package_key);
842                return Ok(());
843            }
844
845            // Check if package already exists in storage
846            let packages = self.storage.list_packages().await?;
847            if packages
848                .iter()
849                .any(|p| p.name == name && p.version == version)
850            {
851                info!("Package already installed in storage: {}", package_key);
852                #[cfg(feature = "cli")]
853                Progress::step(&format!("✓ Package already installed: {package_key}"));
854                installed.insert(package_key);
855                return Ok(());
856            }
857
858            // Create stream progress for this package
859            let stream_progress = progress.create_stream_progress();
860
861            info!("Installing package: {}@{}", name, version);
862            #[cfg(feature = "cli")]
863            Progress::step(&format!("Downloading {package_key}"));
864
865            let spec = PackageSpec {
866                name: name.to_string(),
867                version: version.to_string(),
868                priority: 1,
869            };
870
871            // Download package with streaming progress
872            let download = self
873                .registry_client
874                .download_package_with_progress(&spec, Some(&stream_progress))
875                .await?;
876            let dependencies = download.metadata.dependencies.clone();
877
878            // Extract package
879            #[cfg(feature = "cli")]
880            Progress::step(&format!("Extracting {package_key}"));
881            let extractor = package::PackageExtractor::new(self.config.storage.cache_dir.clone());
882            let extracted = extractor.extract_package(download).await?;
883
884            // Move extracted package to packages directory
885            let package_dir = self
886                .config
887                .storage
888                .packages_dir
889                .join(format!("{name}-{version}"));
890            if package_dir.exists() {
891                fs::remove_dir_all(&package_dir).await?;
892            }
893            fs::rename(&extracted.extraction_path, &package_dir).await?;
894
895            // Update file paths in extracted package to point to new location
896            let mut updated_extracted = extracted;
897            for resource in &mut updated_extracted.resources {
898                // Replace the old extraction path with the new package directory path
899                if let Ok(relative_path) = resource
900                    .file_path
901                    .strip_prefix(&updated_extracted.extraction_path)
902                {
903                    resource.file_path = package_dir.join(relative_path);
904                }
905            }
906            updated_extracted.extraction_path = package_dir.clone();
907
908            // Add package to unified storage
909            if let Err(e) = self.storage.add_package(&updated_extracted).await {
910                match &e {
911                    crate::error::FcmError::Storage(
912                        crate::error::StorageError::PackageAlreadyExists { .. },
913                    ) => {
914                        info!("Package already exists in storage: {}", package_key);
915                    }
916                    _ => return Err(e),
917                }
918            }
919
920            // Mark as installed
921            installed.insert(package_key.clone());
922
923            // Update progress
924            progress.increment(&format!("Completed {package_key}"));
925
926            // Install dependencies recursively
927            for (dep_name, dep_version) in dependencies {
928                info!("Installing dependency: {}@{}", dep_name, dep_version);
929                self.install_package_with_dependencies_and_progress(
930                    &dep_name,
931                    &dep_version,
932                    installed,
933                    progress,
934                )
935                .await?;
936            }
937
938            // Note: Package indexing happens during rebuild_index or explicit index updates
939            // Individual packages are indexed when the full index is rebuilt
940
941            info!("Package installed successfully: {}", package_key);
942            Ok(())
943        })
944    }
945
946    /// Resolve a canonical URL to a FHIR resource
947    /// Enhanced with optimization components for better performance
948    #[tracing::instrument(name = "manager.resolve", skip(self), fields(canonical = %canonical_url))]
949    pub async fn resolve(&self, canonical_url: &str) -> Result<resolver::ResolvedResource> {
950        let tracker = self.performance_monitor.start_operation("resolve");
951        let result = self.resolver.resolve(canonical_url).await;
952        let duration = tracker.finish();
953
954        // Record search operation metrics (resolution is a type of search)
955        self.performance_monitor
956            .record_search_operation(duration, false)
957            .await; // TODO: Track cache hits
958
959        // Log performance metrics if enabled
960        if self.config.optimization.enable_metrics {
961            debug!("URL resolution took {:?} for: {}", duration, canonical_url);
962        }
963
964        result
965    }
966
967    /// Search for FHIR resources
968    pub async fn search(&self) -> search::SearchQueryBuilder {
969        search::SearchQueryBuilder::new(Arc::clone(self.storage.package_storage()))
970    }
971
972    /// Get direct access to the search engine
973    pub fn search_engine(&self) -> &search::SearchEngine {
974        &self.search_engine
975    }
976
977    /// List installed packages
978    pub async fn list_packages(&self) -> Result<Vec<String>> {
979        let packages = self.storage.list_packages().await?;
980        Ok(packages
981            .into_iter()
982            .map(|p| format!("{}@{}", p.name, p.version))
983            .collect())
984    }
985
986    /// Rebuild the search index from existing packages (legacy method)
987    pub async fn rebuild_index(&self) -> Result<()> {
988        // Delegate to smart rebuild for backward compatibility
989        self.smart_rebuild_index().await
990    }
991
992    /// Smart rebuild using the new optimization system
993    pub async fn smart_rebuild_index(&self) -> Result<()> {
994        info!("Starting optimized index rebuild...");
995
996        let start_time = std::time::Instant::now();
997
998        // Get all packages and determine rebuild strategy
999        let packages = self.list_package_infos().await?;
1000        let strategy = self.rebuild_strategy.determine_strategy(&packages).await?;
1001
1002        let result = match strategy {
1003            RebuildStrategy::None => {
1004                info!("No changes detected, skipping index rebuild");
1005                Ok(())
1006            }
1007            RebuildStrategy::Incremental {
1008                ref packages,
1009                batch_size,
1010            } => {
1011                info!(
1012                    "Performing incremental index update for {} packages",
1013                    packages.len()
1014                );
1015                self.incremental_rebuild(packages, batch_size).await?;
1016
1017                let duration = start_time.elapsed();
1018                self.rebuild_strategy
1019                    .record_performance(&strategy, duration, packages.len())
1020                    .await;
1021
1022                // Record performance metrics
1023                self.performance_monitor
1024                    .record_index_operation(IndexOperationType::IncrementalUpdate, duration)
1025                    .await;
1026
1027                Ok(())
1028            }
1029            RebuildStrategy::Full => {
1030                info!(
1031                    "Performing full index rebuild for {} packages",
1032                    packages.len()
1033                );
1034
1035                // Process packages sequentially to avoid deadlocks
1036                let start_time = std::time::Instant::now();
1037                for package in &packages {
1038                    let stats = self.incremental_indexer.update_index(&package.path).await?;
1039                    info!(
1040                        "Updated index for {}: indexed={}, removed={}, duration={:?}",
1041                        package.id, stats.indexed, stats.removed, stats.duration
1042                    );
1043                }
1044                let total_time = start_time.elapsed();
1045
1046                info!(
1047                    "Full rebuild completed: {} packages processed in {:?}",
1048                    packages.len(),
1049                    total_time
1050                );
1051
1052                self.rebuild_strategy
1053                    .record_performance(&strategy, total_time, packages.len())
1054                    .await;
1055
1056                // Record performance metrics
1057                self.performance_monitor
1058                    .record_index_operation(IndexOperationType::FullRebuild, total_time)
1059                    .await;
1060
1061                Ok(())
1062            }
1063        };
1064
1065        info!("Index rebuild completed in {:?}", start_time.elapsed());
1066        result
1067    }
1068
1069    /// Perform incremental rebuild for specific packages
1070    async fn incremental_rebuild(
1071        &self,
1072        packages: &[RebuildPackageInfo],
1073        _batch_size: usize,
1074    ) -> Result<()> {
1075        for package in packages {
1076            let stats = self.incremental_indexer.update_index(&package.path).await?;
1077            info!(
1078                "Updated index for {}: indexed={}, removed={}, duration={:?}",
1079                package.id, stats.indexed, stats.removed, stats.duration
1080            );
1081        }
1082        Ok(())
1083    }
1084
1085    /// Get package information for rebuild strategy analysis
1086    async fn list_package_infos(&self) -> Result<Vec<RebuildPackageInfo>> {
1087        let packages = self.storage.list_packages().await?;
1088        let mut package_infos = Vec::new();
1089
1090        for package in packages {
1091            let package_dir = self
1092                .config
1093                .storage
1094                .packages_dir
1095                .join(format!("{}-{}", package.name, package.version));
1096
1097            if package_dir.exists() {
1098                let info = RebuildPackageInfo::new(
1099                    format!("{}@{}", package.name, package.version),
1100                    package_dir,
1101                );
1102                package_infos.push(info);
1103            }
1104        }
1105
1106        Ok(package_infos)
1107    }
1108
1109    /// Remove a FHIR package by name and version
1110    pub async fn remove_package(&self, name: &str, version: &str) -> Result<()> {
1111        info!("Removing package: {}@{}", name, version);
1112
1113        let removed = self.storage.remove_package(name, version).await?;
1114
1115        if removed {
1116            info!("Package removed successfully: {}@{}", name, version);
1117            // Trigger incremental index update to remove resources
1118            self.smart_rebuild_index().await?;
1119        } else {
1120            info!("Package not found: {}@{}", name, version);
1121        }
1122        Ok(())
1123    }
1124
1125    /// Batch resolve multiple canonical URLs
1126    pub async fn batch_resolve(&self, urls: &[String]) -> Result<Vec<resolver::ResolvedResource>> {
1127        info!("Batch resolving {} URLs", urls.len());
1128
1129        let mut results = Vec::new();
1130        for url in urls {
1131            match self.resolve(url).await {
1132                Ok(resolved) => results.push(resolved),
1133                Err(e) => {
1134                    // For batch operations, we continue processing even if one fails
1135                    tracing::warn!("Failed to resolve {}: {}", url, e);
1136                }
1137            }
1138        }
1139
1140        Ok(results)
1141    }
1142
1143    /// Get optimization performance metrics
1144    pub async fn get_optimization_metrics(&self) -> Result<OptimizationMetrics> {
1145        let perf_metrics = self.performance_monitor.get_metrics().await;
1146
1147        Ok(OptimizationMetrics {
1148            change_detector_cache_size: 0, // TODO: Implement actual metrics from change_detector
1149            incremental_updates_count: perf_metrics.index_operations.incremental_updates_count
1150                as usize,
1151            parallel_processing_efficiency: perf_metrics.index_operations.parallel_efficiency,
1152            average_rebuild_time: perf_metrics.index_operations.average_incremental_time,
1153            storage_compression_ratio: perf_metrics.system_metrics.compression_ratio,
1154        })
1155    }
1156
1157    /// Get detailed performance metrics
1158    pub async fn get_performance_metrics(&self) -> Result<PerformanceMetrics> {
1159        Ok(self.performance_monitor.get_metrics().await)
1160    }
1161
1162    /// Get performance analysis with recommendations
1163    pub async fn analyze_performance(&self) -> Result<PerformanceAnalysis> {
1164        Ok(self.performance_monitor.analyze_performance().await)
1165    }
1166
1167    /// Get search parameters for a specific resource type
1168    pub async fn get_search_parameters(
1169        &self,
1170        resource_type: &str,
1171    ) -> Result<Vec<SearchParameterInfo>> {
1172        info!(
1173            "Getting search parameters for resource type: {}",
1174            resource_type
1175        );
1176
1177        let query = search::SearchQuery {
1178            text: None,
1179            resource_types: vec!["SearchParameter".to_string()],
1180            packages: vec![],
1181            canonical_pattern: None,
1182            version_constraints: vec![],
1183            limit: Some(1000),
1184            offset: Some(0),
1185        };
1186
1187        let results = self.search_engine.search(&query).await?;
1188        let mut search_params = Vec::new();
1189
1190        for resource_match in results.resources {
1191            let resource = &resource_match.resource;
1192            if let Some(base) = resource.content.get("base") {
1193                if let Some(base_array) = base.as_array() {
1194                    for base_type in base_array {
1195                        if base_type.as_str() == Some(resource_type) {
1196                            search_params.push(SearchParameterInfo::from_resource(resource)?);
1197                            break;
1198                        }
1199                    }
1200                } else if base.as_str() == Some(resource_type) {
1201                    search_params.push(SearchParameterInfo::from_resource(resource)?);
1202                }
1203            }
1204        }
1205
1206        Ok(search_params)
1207    }
1208
1209    /// Force a full index rebuild (bypassing smart strategy)
1210    pub async fn force_full_rebuild(&self) -> Result<()> {
1211        info!("Forcing full index rebuild...");
1212        let packages = self.list_package_infos().await?;
1213
1214        // Process packages sequentially to avoid deadlocks
1215        let start_time = std::time::Instant::now();
1216        for package in &packages {
1217            let stats = self.incremental_indexer.update_index(&package.path).await?;
1218            info!(
1219                "Updated index for {}: indexed={}, removed={}, duration={:?}",
1220                package.id, stats.indexed, stats.removed, stats.duration
1221            );
1222        }
1223        let total_time = start_time.elapsed();
1224
1225        info!(
1226            "Forced rebuild completed: {} packages processed in {:?}",
1227            packages.len(),
1228            total_time
1229        );
1230
1231        Ok(())
1232    }
1233
1234    /// Get change detection status for packages
1235    pub async fn get_package_change_status(&self) -> Result<Vec<PackageChangeStatus>> {
1236        let packages = self.list_package_infos().await?;
1237        let mut statuses = Vec::new();
1238
1239        for package in packages {
1240            let changes = match self.change_detector.detect_changes(&package.path).await {
1241                Ok(changes) => changes,
1242                Err(e) => {
1243                    tracing::warn!("Failed to detect changes for {}: {}", package.id, e);
1244                    continue;
1245                }
1246            };
1247
1248            statuses.push(PackageChangeStatus {
1249                package_id: package.id,
1250                has_changes: !changes.is_empty(),
1251                added_files: changes.added_files.len(),
1252                modified_files: changes.modified_files.len(),
1253                removed_files: changes.removed_files.len(),
1254            });
1255        }
1256
1257        Ok(statuses)
1258    }
1259
1260    /// Validate the current configuration and get recommendations
1261    pub fn validate_configuration(&self) -> ValidationResult {
1262        crate::config_validator::ConfigValidator::validate_config(&self.config)
1263    }
1264
1265    /// Get access to the Tantivy search index
1266    pub fn tantivy_index(&self) -> &Arc<Index> {
1267        &self.tantivy_index
1268    }
1269
1270    /// Get unified storage statistics
1271    pub async fn get_unified_storage_stats(&self) -> Result<UnifiedStorageStats> {
1272        self.storage.get_unified_stats().await
1273    }
1274
1275    /// Perform integrity check on unified storage system
1276    pub async fn integrity_check_unified(&self) -> Result<UnifiedIntegrityReport> {
1277        self.storage.integrity_check().await
1278    }
1279
1280    /// Compact unified storage system for optimal performance
1281    pub async fn compact_storage(&self) -> Result<()> {
1282        info!("Compacting unified storage system...");
1283        self.storage.compact().await?;
1284        info!("Storage compaction completed successfully");
1285        Ok(())
1286    }
1287
1288    /// Get a reference to the unified storage system
1289    pub fn storage(&self) -> &Arc<UnifiedStorage> {
1290        &self.storage
1291    }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296    use super::*;
1297    use tempfile::TempDir;
1298
1299    #[tokio::test]
1300    async fn test_canonical_manager_creation() {
1301        let temp_dir = TempDir::new().unwrap();
1302        let config = FcmConfig::test_config(temp_dir.path());
1303
1304        let manager = CanonicalManager::new(config).await;
1305        assert!(manager.is_ok());
1306    }
1307}