1#[doc(hidden)]
33pub mod binary_storage;
34#[doc(hidden)]
35pub mod change_detection;
36#[doc(hidden)]
37pub mod concurrency;
38pub mod config;
39#[doc(hidden)]
40pub mod config_validator;
41pub mod domain;
42pub mod error;
43#[cfg(feature = "fuzzy-search")]
44#[doc(hidden)]
45pub mod fuzzy;
46#[doc(hidden)]
47pub mod incremental_indexer;
48#[doc(hidden)]
49pub mod index_writer;
50#[doc(hidden)]
51pub mod output;
52#[doc(hidden)]
53pub mod package;
54#[doc(hidden)]
55pub mod performance;
56#[doc(hidden)]
57pub mod rebuild_strategy;
58#[doc(hidden)]
59pub mod registry;
60pub mod resolver;
61pub mod search;
62#[doc(hidden)]
63pub mod storage;
64#[doc(hidden)]
65pub mod traits;
66#[doc(hidden)]
67pub mod unified_storage;
68
69#[cfg(feature = "cli")]
70#[doc(hidden)]
71pub mod cli;
72
73#[cfg(feature = "cli")]
74#[doc(hidden)]
75pub mod cli_error;
76
77pub use config::{FcmConfig, OptimizationConfig, PackageSpec, RegistryConfig, StorageConfig};
79pub use domain::{CanonicalUrl, CanonicalWithVersion, FhirVersion, PackageVersion};
80pub use error::{FcmError, Result};
81pub use resolver::CanonicalResolver;
82pub use search::SearchEngine;
83pub use unified_storage::{UnifiedIntegrityReport, UnifiedStorageStats};
84
85use std::collections::HashSet;
86use std::sync::Arc;
87use tantivy::{Index, schema::*};
88use tokio::fs;
89use tracing::{debug, info, warn};
90
91use crate::binary_storage::PackageInfo;
93use crate::change_detection::PackageChangeDetector;
94use crate::config_validator::ValidationResult;
95use crate::incremental_indexer::IncrementalIndexer;
96use crate::performance::{
97 IndexOperationType, PackageOperationType, PerformanceAnalysis, PerformanceConfig,
98 PerformanceMetrics, PerformanceMonitor,
99};
100use crate::rebuild_strategy::{RebuildStrategy, SmartRebuildStrategy};
101use crate::unified_storage::UnifiedStorage;
102
103#[cfg(feature = "cli")]
104use crate::output::Progress;
105
106#[cfg(feature = "cli")]
107use crate::cli::ProgressContext;
108
/// Summary of optimization counters, as produced by
/// `CanonicalManager::get_optimization_metrics`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OptimizationMetrics {
    /// Size of the change detector cache (the producer in this file always reports `0`).
    pub change_detector_cache_size: usize,
    /// Number of incremental index updates recorded by the performance monitor.
    pub incremental_updates_count: usize,
    /// Parallel efficiency figure copied from the index operation metrics.
    pub parallel_processing_efficiency: f64,
    /// Filled from the monitor's average *incremental* update time.
    pub average_rebuild_time: std::time::Duration,
    /// Compression ratio copied from the system metrics.
    pub storage_compression_ratio: f64,
}
118
/// Per-package result of change detection, as produced by
/// `CanonicalManager::get_package_change_status`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PackageChangeStatus {
    /// Package identifier in `name@version` form.
    pub package_id: String,
    /// `true` when the detected change set is not empty.
    pub has_changes: bool,
    /// Number of files reported as added.
    pub added_files: usize,
    /// Number of files reported as modified.
    pub modified_files: usize,
    /// Number of files reported as removed.
    pub removed_files: usize,
}
128
/// Flattened view of a `SearchParameter` resource, built from the resource's
/// JSON content by `SearchParameterInfo::from_resource`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SearchParameterInfo {
    /// Value of `name`; `"unknown"` when absent or not a string.
    pub name: String,
    /// Value of `code`; empty when absent or not a string.
    pub code: String,
    /// Values of `base`, accepted either as an array of strings or a single string.
    pub base: Vec<String>,
    /// Value of `type`; `"string"` when absent or not a string.
    pub type_field: String,
    /// Value of `description`, if present as a string.
    pub description: Option<String>,
    /// Value of `expression`, if present as a string.
    pub expression: Option<String>,
    /// Value of `xpath`, if present as a string.
    pub xpath: Option<String>,
    /// Value of `url`, if present as a string.
    pub url: Option<String>,
    /// Value of `status`, if present as a string.
    pub status: Option<String>,
}
142
143impl SearchParameterInfo {
144 pub fn from_resource(resource: &package::FhirResource) -> Result<Self> {
146 let content = &resource.content;
147
148 Ok(Self {
149 name: content
150 .get("name")
151 .and_then(|v| v.as_str())
152 .unwrap_or("unknown")
153 .to_string(),
154 code: content
155 .get("code")
156 .and_then(|v| v.as_str())
157 .unwrap_or("")
158 .to_string(),
159 base: content
160 .get("base")
161 .and_then(|v| v.as_array())
162 .map(|arr| {
163 arr.iter()
164 .filter_map(|v| v.as_str().map(String::from))
165 .collect()
166 })
167 .or_else(|| {
168 content
169 .get("base")
170 .and_then(|v| v.as_str())
171 .map(|s| vec![s.to_string()])
172 })
173 .unwrap_or_default(),
174 type_field: content
175 .get("type")
176 .and_then(|v| v.as_str())
177 .unwrap_or("string")
178 .to_string(),
179 description: content
180 .get("description")
181 .and_then(|v| v.as_str())
182 .map(String::from),
183 expression: content
184 .get("expression")
185 .and_then(|v| v.as_str())
186 .map(String::from),
187 xpath: content
188 .get("xpath")
189 .and_then(|v| v.as_str())
190 .map(String::from),
191 url: content
192 .get("url")
193 .and_then(|v| v.as_str())
194 .map(String::from),
195 status: content
196 .get("status")
197 .and_then(|v| v.as_str())
198 .map(String::from),
199 })
200 }
201}
202
/// Identifies an installed package and the directory its files live in,
/// as consumed by the index rebuild routines.
#[derive(Debug, Clone)]
pub struct RebuildPackageInfo {
    /// Package identifier (built as `name@version` by the callers in this file).
    pub id: String,
    /// Directory holding the package contents.
    pub path: std::path::PathBuf,
}

impl RebuildPackageInfo {
    /// Creates a record from an identifier and a package directory.
    pub fn new(id: String, path: std::path::PathBuf) -> Self {
        RebuildPackageInfo { id, path }
    }
}
215
/// Central entry point: owns configuration, storage, registry access,
/// resolution/search engines and the index maintenance helpers.
pub struct CanonicalManager {
    /// Effective configuration (possibly adjusted during construction).
    config: FcmConfig,
    /// Built-in unified storage backend.
    storage: Arc<UnifiedStorage>,
    /// Built-in registry client used for downloads and metadata lookups.
    registry_client: registry::RegistryClient,
    /// Optional injected registry; when set, `registry_download` prefers it.
    registry_dyn: Option<Arc<dyn crate::traits::AsyncRegistry + Send + Sync>>,
    /// Optional injected package store; when set, the `*_via_store` helpers prefer it.
    pkg_store_dyn: Option<Arc<dyn crate::traits::PackageStore + Send + Sync>>,
    /// Optional injected index store; stored but not read anywhere in this file.
    #[allow(dead_code)]
    index_store_dyn: Option<Arc<dyn crate::traits::IndexStore + Send + Sync>>,
    /// Canonical URL resolver.
    resolver: CanonicalResolver,
    /// Search engine over the package storage.
    search_engine: SearchEngine,
    /// Change detector shared with the rebuild strategy and incremental indexer.
    change_detector: Arc<PackageChangeDetector>,
    /// Chooses between no, incremental and full index rebuilds.
    rebuild_strategy: SmartRebuildStrategy,
    /// Applies index updates for a package directory.
    incremental_indexer: Arc<IncrementalIndexer>,
    /// Collects timing information for operations.
    performance_monitor: Arc<PerformanceMonitor>,
    /// In-memory Tantivy index created by `create_search_index`.
    tantivy_index: Arc<Index>,
}
238
239impl std::fmt::Debug for CanonicalManager {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("CanonicalManager")
242 .field("registry_url", &self.config.registry.url)
243 .field("packages_configured", &self.config.packages.len())
244 .finish()
245 }
246}
247
248impl CanonicalManager {
249 pub async fn new(mut config: FcmConfig) -> Result<Self> {
251 if std::env::var("CI").is_ok() || std::env::var("FHIRPATH_QUICK_INIT").is_ok() {
253 info!("Quick initialization mode detected (CI or FHIRPATH_QUICK_INIT set)");
254 config.optimization.parallel_workers = 1;
256 config.optimization.enable_metrics = false;
257 config.optimization.use_mmap = false;
258 }
259
260 let skip_optimization =
263 config.optimization.parallel_workers == 1 && !config.optimization.enable_metrics;
264 let validation = if skip_optimization {
265 debug!("Skipping config optimization for test mode");
266 crate::config_validator::ConfigValidator::validate_config(&config)
267 } else {
268 crate::config_validator::ConfigValidator::optimize_config(&mut config)
269 };
270
271 if !validation.is_valid {
272 for error in &validation.errors {
273 tracing::error!("Configuration error: {}", error);
274 }
275 return Err(crate::error::FcmError::Config(
276 crate::error::ConfigError::ValidationFailed {
277 message: "Invalid optimization configuration".to_string(),
278 },
279 ));
280 }
281
282 for warning in &validation.warnings {
284 warn!("Configuration warning: {}", warning);
285 }
286
287 for recommendation in &validation.recommendations {
288 info!("Configuration recommendation: {}", recommendation);
289 }
290
291 let storage_future = UnifiedStorage::new(
293 config.storage.clone(),
294 config.optimization.use_mmap,
295 config.optimization.compression_level,
296 );
297
298 let storage =
299 match tokio::time::timeout(std::time::Duration::from_secs(30), storage_future).await {
300 Ok(Ok(s)) => Arc::new(s),
301 Ok(Err(e)) => {
302 tracing::error!("Failed to initialize unified storage: {}", e);
303 return Err(e);
304 }
305 Err(_) => {
306 tracing::error!("Unified storage initialization timed out after 30 seconds");
307 return Err(FcmError::Config(
308 crate::error::ConfigError::ValidationFailed {
309 message: "Storage initialization timed out".to_string(),
310 },
311 ));
312 }
313 };
314
315 let expanded_storage = config.get_expanded_storage_config();
317 let registry_client =
318 registry::RegistryClient::new(&config.registry, expanded_storage.cache_dir.clone())
319 .await?;
320
321 let resolver = CanonicalResolver::new(Arc::clone(storage.package_storage()));
323
324 let search_engine = SearchEngine::new(Arc::clone(storage.package_storage()));
326
327 let change_detector = Arc::new(PackageChangeDetector::new());
329
330 let rebuild_strategy = SmartRebuildStrategy::new(Arc::clone(&change_detector))
332 .with_full_rebuild_threshold(config.optimization.full_rebuild_threshold)
333 .with_batch_size(config.optimization.incremental_batch_size);
334
335 let tantivy_index = Arc::new(Self::create_search_index()?);
337
338 let writer_manager = Arc::new(index_writer::IndexWriterManager::new(
340 Arc::clone(&tantivy_index),
341 50_000_000,
342 ));
343 let incremental_indexer = Arc::new(IncrementalIndexer::new(
344 Arc::clone(&writer_manager),
345 Arc::clone(&change_detector),
346 config.optimization.batch_size,
347 ));
348
349 let perf_config = PerformanceConfig {
351 enable_metrics: config.optimization.enable_metrics,
352 metrics_interval: std::time::Duration::from_secs(30),
353 max_samples: 1000,
354 enable_detailed_logging: config.optimization.enable_metrics,
355 };
356 let performance_monitor = Arc::new(PerformanceMonitor::new(perf_config));
357
358 info!("FHIR Canonical Manager initialized successfully with optimized configuration");
359
360 Ok(Self {
361 config,
362 storage,
363 registry_client,
364 registry_dyn: None,
365 pkg_store_dyn: None,
366 index_store_dyn: None,
367 resolver,
368 search_engine,
369 change_detector,
370 rebuild_strategy,
371 incremental_indexer,
372 performance_monitor,
373 tantivy_index,
374 })
375 }
376
/// Convenience alias: forwards `config` to `Self::new` unchanged.
pub async fn new_simple(config: FcmConfig) -> Result<Self> {
    Self::new(config).await
}
381
382 pub async fn new_with_components(
385 mut config: FcmConfig,
386 package_store: Arc<dyn crate::traits::PackageStore + Send + Sync>,
387 index_store: Arc<dyn crate::traits::IndexStore + Send + Sync>,
388 registry: Arc<dyn crate::traits::AsyncRegistry + Send + Sync>,
389 search_storage: Arc<crate::binary_storage::BinaryStorage>,
390 ) -> Result<Self> {
391 let skip_optimization =
393 config.optimization.parallel_workers == 1 && !config.optimization.enable_metrics;
394 let validation = if skip_optimization {
395 crate::config_validator::ConfigValidator::validate_config(&config)
396 } else {
397 crate::config_validator::ConfigValidator::optimize_config(&mut config)
398 };
399 if !validation.is_valid {
400 return Err(crate::error::FcmError::Config(
401 crate::error::ConfigError::ValidationFailed {
402 message: "Invalid optimization configuration".to_string(),
403 },
404 ));
405 }
406
407 let storage = Arc::new(
409 UnifiedStorage::new(
410 config.storage.clone(),
411 config.optimization.use_mmap,
412 config.optimization.compression_level,
413 )
414 .await?,
415 );
416
417 let resolver = CanonicalResolver::new(Arc::clone(&search_storage));
419 let search_engine = SearchEngine::new(Arc::clone(&search_storage));
420
421 let change_detector = Arc::new(PackageChangeDetector::new());
423 let rebuild_strategy = SmartRebuildStrategy::new(Arc::clone(&change_detector))
424 .with_full_rebuild_threshold(config.optimization.full_rebuild_threshold)
425 .with_batch_size(config.optimization.incremental_batch_size);
426
427 let tantivy_index = Arc::new(Self::create_search_index()?);
428 let writer_manager = Arc::new(index_writer::IndexWriterManager::new(
429 Arc::clone(&tantivy_index),
430 50_000_000,
431 ));
432 let incremental_indexer = Arc::new(IncrementalIndexer::new(
433 Arc::clone(&writer_manager),
434 Arc::clone(&change_detector),
435 config.optimization.batch_size,
436 ));
437
438 let perf_config = PerformanceConfig {
439 enable_metrics: config.optimization.enable_metrics,
440 metrics_interval: std::time::Duration::from_secs(30),
441 max_samples: 1000,
442 enable_detailed_logging: config.optimization.enable_metrics,
443 };
444 let performance_monitor = Arc::new(PerformanceMonitor::new(perf_config));
445
446 let expanded_storage = config.get_expanded_storage_config();
448 let registry_client =
449 registry::RegistryClient::new(&config.registry, expanded_storage.cache_dir.clone())
450 .await?;
451
452 Ok(Self {
453 config,
454 storage,
455 registry_client,
456 registry_dyn: Some(registry),
457 pkg_store_dyn: Some(package_store),
458 index_store_dyn: Some(index_store),
459 resolver,
460 search_engine,
461 change_detector,
462 rebuild_strategy,
463 incremental_indexer,
464 performance_monitor,
465 tantivy_index,
466 })
467 }
468
469 fn create_search_index() -> Result<Index> {
470 let mut schema_builder = Schema::builder();
471
472 schema_builder.add_text_field("resource_type", TEXT | STORED);
473 schema_builder.add_text_field("id", TEXT | STORED);
474 schema_builder.add_text_field("canonical_url", TEXT | STORED);
475 schema_builder.add_text_field("package_id", TEXT | STORED);
476 schema_builder.add_text_field("name", TEXT | STORED);
477 schema_builder.add_text_field("title", TEXT | STORED);
478 schema_builder.add_text_field("description", TEXT);
479 schema_builder.add_text_field("content", TEXT);
480 schema_builder.add_text_field("search_text", TEXT);
481
482 let schema = schema_builder.build();
483 let index = Index::create_in_ram(schema);
484
485 Ok(index)
486 }
487
488 async fn registry_download(&self, spec: &PackageSpec) -> Result<registry::PackageDownload> {
490 if let Some(r) = &self.registry_dyn {
491 r.download_package(spec).await
492 } else {
493 self.registry_client.download_package(spec).await
494 }
495 }
496
497 async fn list_packages_via_store(&self) -> Result<Vec<PackageInfo>> {
499 if let Some(s) = &self.pkg_store_dyn {
500 s.list_packages().await
501 } else {
502 self.storage.list_packages().await
503 }
504 }
505
506 async fn add_package_via_store(&self, pkg: &package::ExtractedPackage) -> Result<()> {
508 if let Some(s) = &self.pkg_store_dyn {
509 s.add_package(pkg).await
510 } else {
511 self.storage.add_package(pkg).await
512 }
513 }
514
515 #[tracing::instrument(name = "manager.install_package", skip(self), fields(pkg = %name, ver = %version))]
518 pub async fn install_package(&self, name: &str, version: &str) -> Result<()> {
519 if self.config.optimization.parallel_workers == 1
521 && !self.config.optimization.enable_metrics
522 {
523 debug!("Using simplified installation for testing mode");
524 return self.install_package_simple(name, version).await;
525 }
526
527 let tracker = self.performance_monitor.start_operation("install_package");
528 let mut installed = HashSet::new();
529
530 info!(
531 "Installing package {}@{} with optimization features enabled",
532 name, version
533 );
534
535 self.install_package_with_dependencies(name, version, &mut installed)
536 .await?;
537
538 info!("Analyzing rebuild strategy for newly installed packages...");
540 self.smart_rebuild_index().await?;
541
542 let total_duration = tracker.finish();
543
544 self.performance_monitor
546 .record_package_operation(PackageOperationType::Install, total_duration)
547 .await;
548
549 info!("Package installation completed in {:?}", total_duration);
550
551 if self.config.optimization.enable_metrics {
553 info!(
554 "Performance: Installed {} packages in {:?}",
555 installed.len(),
556 total_duration
557 );
558 }
559
560 Ok(())
561 }
562
563 async fn install_package_simple(&self, name: &str, version: &str) -> Result<()> {
565 debug!("Installing package {}@{} in simplified mode", name, version);
566
567 let mut installed = std::collections::HashSet::new();
569 self.install_package_simple_recursive(name, version, &mut installed)
570 .await
571 }
572
573 fn install_package_simple_recursive<'a>(
575 &'a self,
576 name: &'a str,
577 version: &'a str,
578 installed: &'a mut std::collections::HashSet<String>,
579 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
580 Box::pin(async move {
581 let package_key = format!("{name}@{version}");
582
583 if installed.contains(&package_key) {
585 debug!("Package {} already processed", package_key);
586 return Ok(());
587 }
588
589 debug!(
591 "Checking if package {} already exists in storage...",
592 package_key
593 );
594 let packages = self.storage.list_packages().await?;
595 if packages
596 .iter()
597 .any(|p| p.name == name && p.version == version)
598 {
599 debug!("Package {} already installed in storage", package_key);
600 installed.insert(package_key);
601 return Ok(());
602 }
603 debug!(
604 "Package {} not found in storage, proceeding with installation",
605 package_key
606 );
607
608 let spec = crate::config::PackageSpec {
609 name: name.to_string(),
610 version: version.to_string(),
611 priority: 1,
612 };
613
614 debug!("Downloading package {}...", package_key);
616 let download = self.registry_client.download_package(&spec).await?;
617 let dependencies = download.metadata.dependencies.clone();
618 debug!(
619 "Package {} downloaded successfully, found {} dependencies",
620 package_key,
621 dependencies.len()
622 );
623
624 for (dep_name, dep_version) in dependencies {
626 debug!("Installing dependency: {}@{}", dep_name, dep_version);
627 self.install_package_simple_recursive(&dep_name, &dep_version, installed)
628 .await?;
629 }
630
631 debug!("Extracting package {}...", package_key);
632 let extractor =
633 crate::package::PackageExtractor::new(self.config.storage.packages_dir.clone());
634 let extracted = extractor.extract_package(download).await?;
635 debug!("Package {} extracted successfully", package_key);
636
637 debug!("Adding package {} to storage...", package_key);
639 self.storage.add_package(&extracted).await?;
640 #[cfg(feature = "metrics")]
641 {
642 metrics::increment_counter!("packages_installed");
643 }
644 debug!("Package {} added to storage successfully", package_key);
645
646 installed.insert(package_key.clone());
647 info!(
648 "Package {} installed successfully (simplified mode)",
649 package_key
650 );
651 Ok(())
652 })
653 }
654
/// Counts the distinct packages reachable from `name@version` through
/// registry dependency metadata, including the package itself.
#[cfg(feature = "cli")]
pub async fn count_packages_to_install(&self, name: &str, version: &str) -> Result<usize> {
    let mut seen = HashSet::new();
    let total = self
        .count_packages_recursive(name, version, &mut seen)
        .await;
    total
}
662
/// Recursive worker for `count_packages_to_install`.
///
/// Returns 0 for a package already present in `visited`; otherwise records
/// it, fetches its metadata from the registry client and returns 1 plus the
/// counts of its dependencies. Boxed because of the recursion.
#[cfg(feature = "cli")]
fn count_packages_recursive<'a>(
    &'a self,
    name: &'a str,
    version: &'a str,
    visited: &'a mut HashSet<String>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize>> + Send + 'a>> {
    Box::pin(async move {
        // `insert` returns false when the key was already present.
        if !visited.insert(format!("{name}@{version}")) {
            return Ok(0);
        }

        let mut count = 1;
        let metadata = self
            .registry_client
            .get_package_metadata(name, version)
            .await?;

        for (dep_name, dep_version) in metadata.dependencies {
            let nested = self
                .count_packages_recursive(&dep_name, &dep_version, visited)
                .await?;
            count += nested;
        }

        Ok(count)
    })
}
697
/// Installs `name@version` and its dependencies while reporting to
/// `progress`, then rebuilds the index via `smart_rebuild_index`.
///
/// # Errors
/// Propagates installation and index rebuild errors.
#[cfg(feature = "cli")]
pub async fn install_package_with_progress(
    &self,
    name: &str,
    version: &str,
    progress: &mut ProgressContext,
) -> Result<()> {
    let mut processed = HashSet::new();
    let install = self.install_package_with_dependencies_and_progress(
        name,
        version,
        &mut processed,
        progress,
    );
    install.await?;

    info!("Analyzing rebuild strategy for newly installed packages...");
    self.smart_rebuild_index().await?;

    Ok(())
}
721
/// Recursive installer used by the non-simplified path of `install_package`.
///
/// For `name@version` it downloads the package, extracts it under the cache
/// directory, moves it into `packages_dir/<name>-<version>`, registers it
/// with the package store and then installs its dependencies.
///
/// `installed` collects the `name@version` keys handled in this run; a key
/// already in the set, or a package already listed by the store, is skipped.
/// Boxed because an `async fn` cannot recurse directly.
///
/// # Errors
/// Propagates listing, download, extraction, filesystem and storage errors,
/// except `StorageError::PackageAlreadyExists`, which is only logged.
fn install_package_with_dependencies<'a>(
    &'a self,
    name: &'a str,
    version: &'a str,
    installed: &'a mut HashSet<String>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let package_key = format!("{name}@{version}");

        // Already handled earlier in this installation run.
        if installed.contains(&package_key) {
            info!("Package already processed: {}", package_key);
            return Ok(());
        }

        // Debug listing of everything the store currently knows about.
        let packages = self.list_packages_via_store().await?;
        for p in &packages {
            debug!("- {}@{}", p.name, p.version);
        }

        if packages
            .iter()
            .any(|p| p.name == name && p.version == version)
        {
            info!("Package already installed in storage: {}", package_key);
            installed.insert(package_key);
            return Ok(());
        }

        info!("Installing package: {}@{}", name, version);

        let spec = PackageSpec {
            name: name.to_string(),
            version: version.to_string(),
            priority: 1,
        };

        let download = self.registry_download(&spec).await?;
        // Cloned because `download` is consumed by the extractor below.
        let dependencies = download.metadata.dependencies.clone();

        let extractor = package::PackageExtractor::new(self.config.storage.cache_dir.clone());
        let extracted = extractor.extract_package(download).await?;

        // Replace any previous directory for this package with the fresh extraction.
        // NOTE(review): a rename can fail when cache_dir and packages_dir are on
        // different filesystems — confirm deployments keep them on one volume.
        let package_dir = self
            .config
            .storage
            .packages_dir
            .join(format!("{name}-{version}"));
        if package_dir.exists() {
            fs::remove_dir_all(&package_dir).await?;
        }
        fs::rename(&extracted.extraction_path, &package_dir).await?;

        // Re-root resource paths from the extraction directory to the final directory.
        let mut updated_extracted = extracted;
        for resource in &mut updated_extracted.resources {
            if let Ok(relative_path) = resource
                .file_path
                .strip_prefix(&updated_extracted.extraction_path)
            {
                resource.file_path = package_dir.join(relative_path);
            }
        }
        updated_extracted.extraction_path = package_dir.clone();

        // "Already exists" is tolerated; every other storage error aborts.
        if let Err(e) = self.add_package_via_store(&updated_extracted).await {
            match &e {
                crate::error::FcmError::Storage(
                    crate::error::StorageError::PackageAlreadyExists { .. },
                ) => {
                    info!("Package already exists in storage: {}", package_key);
                }
                _ => return Err(e),
            }
        }
        #[cfg(feature = "metrics")]
        {
            metrics::increment_counter!("packages_installed");
        }

        // Recorded before recursing, so a dependency pointing back here is skipped.
        installed.insert(package_key.clone());

        for (dep_name, dep_version) in dependencies {
            info!("Installing dependency: {}@{}", dep_name, dep_version);
            self.install_package_with_dependencies(&dep_name, &dep_version, installed)
                .await?;
        }

        info!("Package installed successfully: {}", package_key);
        Ok(())
    })
}
826
/// Progress-reporting variant of the recursive installer (CLI builds only).
///
/// Downloads `name@version` through the built-in registry client with a
/// stream progress handle, extracts it under the cache directory, moves it
/// into `packages_dir/<name>-<version>`, registers it with the unified
/// storage, bumps `progress`, and then installs its dependencies.
///
/// `installed` collects the `name@version` keys handled in this run.
/// Boxed because an `async fn` cannot recurse directly.
///
/// # Errors
/// Propagates listing, download, extraction, filesystem and storage errors,
/// except `StorageError::PackageAlreadyExists`, which is only logged.
#[cfg(feature = "cli")]
fn install_package_with_dependencies_and_progress<'a>(
    &'a self,
    name: &'a str,
    version: &'a str,
    installed: &'a mut HashSet<String>,
    progress: &'a mut ProgressContext,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    // The whole function is gated on the `cli` feature, so the progress
    // calls below need no per-statement gating.
    Box::pin(async move {
        let key = format!("{name}@{version}");

        if installed.contains(&key) {
            info!("Package already processed: {}", key);
            return Ok(());
        }

        let known = self.storage.list_packages().await?;
        let already_stored = known
            .iter()
            .any(|entry| entry.name == name && entry.version == version);
        if already_stored {
            info!("Package already installed in storage: {}", key);
            Progress::step(&format!("✓ Package already installed: {key}"));
            installed.insert(key);
            return Ok(());
        }

        let stream_progress = progress.create_stream_progress();

        info!("Installing package: {}@{}", name, version);
        Progress::step(&format!("Downloading {key}"));

        let spec = PackageSpec {
            name: name.to_string(),
            version: version.to_string(),
            priority: 1,
        };
        let download = self
            .registry_client
            .download_package_with_progress(&spec, Some(&stream_progress))
            .await?;
        // Cloned because `download` is consumed by the extractor.
        let dependencies = download.metadata.dependencies.clone();

        Progress::step(&format!("Extracting {key}"));
        let extractor = package::PackageExtractor::new(self.config.storage.cache_dir.clone());
        let extracted = extractor.extract_package(download).await?;

        // Replace any previous directory for this package with the fresh extraction.
        let target_dir = self
            .config
            .storage
            .packages_dir
            .join(format!("{name}-{version}"));
        if target_dir.exists() {
            fs::remove_dir_all(&target_dir).await?;
        }
        fs::rename(&extracted.extraction_path, &target_dir).await?;

        // Re-root resource paths from the extraction directory to the final directory.
        let mut relocated = extracted;
        for resource in &mut relocated.resources {
            if let Ok(tail) = resource.file_path.strip_prefix(&relocated.extraction_path) {
                resource.file_path = target_dir.join(tail);
            }
        }
        relocated.extraction_path = target_dir.clone();

        // "Already exists" is tolerated; every other storage error aborts.
        match self.storage.add_package(&relocated).await {
            Ok(_) => {}
            Err(crate::error::FcmError::Storage(
                crate::error::StorageError::PackageAlreadyExists { .. },
            )) => {
                info!("Package already exists in storage: {}", key);
            }
            Err(other) => return Err(other),
        }

        installed.insert(key.clone());
        progress.increment(&format!("Completed {key}"));

        for (dep_name, dep_version) in dependencies {
            info!("Installing dependency: {}@{}", dep_name, dep_version);
            self.install_package_with_dependencies_and_progress(
                &dep_name,
                &dep_version,
                installed,
                progress,
            )
            .await?;
        }

        info!("Package installed successfully: {}", key);
        Ok(())
    })
}
945
946 #[tracing::instrument(name = "manager.resolve", skip(self), fields(canonical = %canonical_url))]
949 pub async fn resolve(&self, canonical_url: &str) -> Result<resolver::ResolvedResource> {
950 let tracker = self.performance_monitor.start_operation("resolve");
951 let result = self.resolver.resolve(canonical_url).await;
952 let duration = tracker.finish();
953
954 self.performance_monitor
956 .record_search_operation(duration, false)
957 .await; if self.config.optimization.enable_metrics {
961 debug!("URL resolution took {:?} for: {}", duration, canonical_url);
962 }
963
964 result
965 }
966
967 pub async fn search(&self) -> search::SearchQueryBuilder {
969 search::SearchQueryBuilder::new(Arc::clone(self.storage.package_storage()))
970 }
971
/// Borrows the manager's search engine.
pub fn search_engine(&self) -> &search::SearchEngine {
    &self.search_engine
}
976
977 pub async fn list_packages(&self) -> Result<Vec<String>> {
979 let packages = self.storage.list_packages().await?;
980 Ok(packages
981 .into_iter()
982 .map(|p| format!("{}@{}", p.name, p.version))
983 .collect())
984 }
985
/// Rebuilds the search index; delegates entirely to `smart_rebuild_index`.
pub async fn rebuild_index(&self) -> Result<()> {
    self.smart_rebuild_index().await
}
991
992 pub async fn smart_rebuild_index(&self) -> Result<()> {
994 info!("Starting optimized index rebuild...");
995
996 let start_time = std::time::Instant::now();
997
998 let packages = self.list_package_infos().await?;
1000 let strategy = self.rebuild_strategy.determine_strategy(&packages).await?;
1001
1002 let result = match strategy {
1003 RebuildStrategy::None => {
1004 info!("No changes detected, skipping index rebuild");
1005 Ok(())
1006 }
1007 RebuildStrategy::Incremental {
1008 ref packages,
1009 batch_size,
1010 } => {
1011 info!(
1012 "Performing incremental index update for {} packages",
1013 packages.len()
1014 );
1015 self.incremental_rebuild(packages, batch_size).await?;
1016
1017 let duration = start_time.elapsed();
1018 self.rebuild_strategy
1019 .record_performance(&strategy, duration, packages.len())
1020 .await;
1021
1022 self.performance_monitor
1024 .record_index_operation(IndexOperationType::IncrementalUpdate, duration)
1025 .await;
1026
1027 Ok(())
1028 }
1029 RebuildStrategy::Full => {
1030 info!(
1031 "Performing full index rebuild for {} packages",
1032 packages.len()
1033 );
1034
1035 let start_time = std::time::Instant::now();
1037 for package in &packages {
1038 let stats = self.incremental_indexer.update_index(&package.path).await?;
1039 info!(
1040 "Updated index for {}: indexed={}, removed={}, duration={:?}",
1041 package.id, stats.indexed, stats.removed, stats.duration
1042 );
1043 }
1044 let total_time = start_time.elapsed();
1045
1046 info!(
1047 "Full rebuild completed: {} packages processed in {:?}",
1048 packages.len(),
1049 total_time
1050 );
1051
1052 self.rebuild_strategy
1053 .record_performance(&strategy, total_time, packages.len())
1054 .await;
1055
1056 self.performance_monitor
1058 .record_index_operation(IndexOperationType::FullRebuild, total_time)
1059 .await;
1060
1061 Ok(())
1062 }
1063 };
1064
1065 info!("Index rebuild completed in {:?}", start_time.elapsed());
1066 result
1067 }
1068
1069 async fn incremental_rebuild(
1071 &self,
1072 packages: &[RebuildPackageInfo],
1073 _batch_size: usize,
1074 ) -> Result<()> {
1075 for package in packages {
1076 let stats = self.incremental_indexer.update_index(&package.path).await?;
1077 info!(
1078 "Updated index for {}: indexed={}, removed={}, duration={:?}",
1079 package.id, stats.indexed, stats.removed, stats.duration
1080 );
1081 }
1082 Ok(())
1083 }
1084
1085 async fn list_package_infos(&self) -> Result<Vec<RebuildPackageInfo>> {
1087 let packages = self.storage.list_packages().await?;
1088 let mut package_infos = Vec::new();
1089
1090 for package in packages {
1091 let package_dir = self
1092 .config
1093 .storage
1094 .packages_dir
1095 .join(format!("{}-{}", package.name, package.version));
1096
1097 if package_dir.exists() {
1098 let info = RebuildPackageInfo::new(
1099 format!("{}@{}", package.name, package.version),
1100 package_dir,
1101 );
1102 package_infos.push(info);
1103 }
1104 }
1105
1106 Ok(package_infos)
1107 }
1108
1109 pub async fn remove_package(&self, name: &str, version: &str) -> Result<()> {
1111 info!("Removing package: {}@{}", name, version);
1112
1113 let removed = self.storage.remove_package(name, version).await?;
1114
1115 if removed {
1116 info!("Package removed successfully: {}@{}", name, version);
1117 self.smart_rebuild_index().await?;
1119 } else {
1120 info!("Package not found: {}@{}", name, version);
1121 }
1122 Ok(())
1123 }
1124
1125 pub async fn batch_resolve(&self, urls: &[String]) -> Result<Vec<resolver::ResolvedResource>> {
1127 info!("Batch resolving {} URLs", urls.len());
1128
1129 let mut results = Vec::new();
1130 for url in urls {
1131 match self.resolve(url).await {
1132 Ok(resolved) => results.push(resolved),
1133 Err(e) => {
1134 tracing::warn!("Failed to resolve {}: {}", url, e);
1136 }
1137 }
1138 }
1139
1140 Ok(results)
1141 }
1142
1143 pub async fn get_optimization_metrics(&self) -> Result<OptimizationMetrics> {
1145 let perf_metrics = self.performance_monitor.get_metrics().await;
1146
1147 Ok(OptimizationMetrics {
1148 change_detector_cache_size: 0, incremental_updates_count: perf_metrics.index_operations.incremental_updates_count
1150 as usize,
1151 parallel_processing_efficiency: perf_metrics.index_operations.parallel_efficiency,
1152 average_rebuild_time: perf_metrics.index_operations.average_incremental_time,
1153 storage_compression_ratio: perf_metrics.system_metrics.compression_ratio,
1154 })
1155 }
1156
1157 pub async fn get_performance_metrics(&self) -> Result<PerformanceMetrics> {
1159 Ok(self.performance_monitor.get_metrics().await)
1160 }
1161
1162 pub async fn analyze_performance(&self) -> Result<PerformanceAnalysis> {
1164 Ok(self.performance_monitor.analyze_performance().await)
1165 }
1166
1167 pub async fn get_search_parameters(
1169 &self,
1170 resource_type: &str,
1171 ) -> Result<Vec<SearchParameterInfo>> {
1172 info!(
1173 "Getting search parameters for resource type: {}",
1174 resource_type
1175 );
1176
1177 let query = search::SearchQuery {
1178 text: None,
1179 resource_types: vec!["SearchParameter".to_string()],
1180 packages: vec![],
1181 canonical_pattern: None,
1182 version_constraints: vec![],
1183 limit: Some(1000),
1184 offset: Some(0),
1185 };
1186
1187 let results = self.search_engine.search(&query).await?;
1188 let mut search_params = Vec::new();
1189
1190 for resource_match in results.resources {
1191 let resource = &resource_match.resource;
1192 if let Some(base) = resource.content.get("base") {
1193 if let Some(base_array) = base.as_array() {
1194 for base_type in base_array {
1195 if base_type.as_str() == Some(resource_type) {
1196 search_params.push(SearchParameterInfo::from_resource(resource)?);
1197 break;
1198 }
1199 }
1200 } else if base.as_str() == Some(resource_type) {
1201 search_params.push(SearchParameterInfo::from_resource(resource)?);
1202 }
1203 }
1204 }
1205
1206 Ok(search_params)
1207 }
1208
1209 pub async fn force_full_rebuild(&self) -> Result<()> {
1211 info!("Forcing full index rebuild...");
1212 let packages = self.list_package_infos().await?;
1213
1214 let start_time = std::time::Instant::now();
1216 for package in &packages {
1217 let stats = self.incremental_indexer.update_index(&package.path).await?;
1218 info!(
1219 "Updated index for {}: indexed={}, removed={}, duration={:?}",
1220 package.id, stats.indexed, stats.removed, stats.duration
1221 );
1222 }
1223 let total_time = start_time.elapsed();
1224
1225 info!(
1226 "Forced rebuild completed: {} packages processed in {:?}",
1227 packages.len(),
1228 total_time
1229 );
1230
1231 Ok(())
1232 }
1233
1234 pub async fn get_package_change_status(&self) -> Result<Vec<PackageChangeStatus>> {
1236 let packages = self.list_package_infos().await?;
1237 let mut statuses = Vec::new();
1238
1239 for package in packages {
1240 let changes = match self.change_detector.detect_changes(&package.path).await {
1241 Ok(changes) => changes,
1242 Err(e) => {
1243 tracing::warn!("Failed to detect changes for {}: {}", package.id, e);
1244 continue;
1245 }
1246 };
1247
1248 statuses.push(PackageChangeStatus {
1249 package_id: package.id,
1250 has_changes: !changes.is_empty(),
1251 added_files: changes.added_files.len(),
1252 modified_files: changes.modified_files.len(),
1253 removed_files: changes.removed_files.len(),
1254 });
1255 }
1256
1257 Ok(statuses)
1258 }
1259
/// Runs the configuration validator against the manager's current
/// configuration and returns its result.
pub fn validate_configuration(&self) -> ValidationResult {
    crate::config_validator::ConfigValidator::validate_config(&self.config)
}
1264
/// Borrows the shared handle to the manager's Tantivy index.
pub fn tantivy_index(&self) -> &Arc<Index> {
    &self.tantivy_index
}
1269
/// Returns the statistics reported by the unified storage.
pub async fn get_unified_storage_stats(&self) -> Result<UnifiedStorageStats> {
    self.storage.get_unified_stats().await
}
1274
/// Runs the unified storage integrity check and returns its report.
pub async fn integrity_check_unified(&self) -> Result<UnifiedIntegrityReport> {
    self.storage.integrity_check().await
}
1279
/// Asks the unified storage to compact itself, logging before and after.
///
/// # Errors
/// Propagates the compaction error; the completion message is then not logged.
pub async fn compact_storage(&self) -> Result<()> {
    info!("Compacting unified storage system...");
    self.storage.compact().await?;
    info!("Storage compaction completed successfully");
    Ok(())
}
1287
/// Borrows the shared handle to the unified storage.
pub fn storage(&self) -> &Arc<UnifiedStorage> {
    &self.storage
}
1292}
1293
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Smoke test: a manager can be constructed from the test configuration
    /// rooted in a temporary directory.
    #[tokio::test]
    async fn test_canonical_manager_creation() {
        // The temporary directory is removed when `temp_dir` is dropped.
        let temp_dir = TempDir::new().unwrap();
        let config = FcmConfig::test_config(temp_dir.path());

        let manager = CanonicalManager::new(config).await;
        assert!(manager.is_ok());
    }
}
1307}