Skip to main content

actr_cli/core/
pipelines.rs

//! Operation pipeline definitions
//!
//! Defines three core operation pipelines for cross-command logic reuse
4
5use actr_config::{LockFile, LockedDependency, ProtoFileMeta, ServiceSpecMeta};
6use anyhow::Result;
7use std::sync::Arc;
8
9use super::components::*;
10
// ============================================================================
// Pipeline result types
// ============================================================================
14
15/// Install result
16#[derive(Debug, Clone)]
17pub struct InstallResult {
18    pub installed_dependencies: Vec<ResolvedDependency>,
19    pub updated_config: bool,
20    pub updated_lock_file: bool,
21    pub cache_updates: usize,
22    pub warnings: Vec<String>,
23}
24
25impl InstallResult {
26    pub fn success() -> Self {
27        Self {
28            installed_dependencies: Vec::new(),
29            updated_config: false,
30            updated_lock_file: false,
31            cache_updates: 0,
32            warnings: Vec::new(),
33        }
34    }
35
36    pub fn summary(&self) -> String {
37        format!(
38            "Installed {} dependencies, updated {} cache entries",
39            self.installed_dependencies.len(),
40            self.cache_updates
41        )
42    }
43}
44
45/// Install plan
46#[derive(Debug, Clone)]
47pub struct InstallPlan {
48    pub dependencies_to_install: Vec<DependencySpec>,
49    pub resolved_dependencies: Vec<ResolvedDependency>,
50    pub estimated_cache_size: u64,
51    pub required_permissions: Vec<String>,
52}
53
/// Switches and paths controlling one code generation run.
#[derive(Debug, Clone)]
pub struct GenerationOptions {
    /// Directory searched for local proto files and handed to the generator
    /// as its input.
    pub input_path: std::path::PathBuf,
    /// Directory that receives the generated code.
    pub output_path: std::path::PathBuf,
    /// When set, the output directory is removed and recreated before
    /// generation starts.
    pub clean_before_generate: bool,
    /// Scaffold generation flag (not read by the code visible in this file).
    pub generate_scaffold: bool,
    /// When set, generated `.rs` files are passed through `rustfmt`.
    pub format_code: bool,
    /// When set, post-generation checks run and their warnings and errors are
    /// merged into the generation result.
    pub run_checks: bool,
}
64
// ============================================================================
// 1. Validation Pipeline
// ============================================================================
68
69/// Core validation pipeline - reused by multiple commands
70#[derive(Clone)]
71pub struct ValidationPipeline {
72    config_manager: Arc<dyn ConfigManager>,
73    dependency_resolver: Arc<dyn DependencyResolver>,
74    service_discovery: Arc<dyn ServiceDiscovery>,
75    network_validator: Arc<dyn NetworkValidator>,
76    fingerprint_validator: Arc<dyn FingerprintValidator>,
77}
78
79impl ValidationPipeline {
80    pub fn new(
81        config_manager: Arc<dyn ConfigManager>,
82        dependency_resolver: Arc<dyn DependencyResolver>,
83        service_discovery: Arc<dyn ServiceDiscovery>,
84        network_validator: Arc<dyn NetworkValidator>,
85        fingerprint_validator: Arc<dyn FingerprintValidator>,
86    ) -> Self {
87        Self {
88            config_manager,
89            dependency_resolver,
90            service_discovery,
91            network_validator,
92            fingerprint_validator,
93        }
94    }
95
96    /// Get service discovery component
97    pub fn service_discovery(&self) -> &Arc<dyn ServiceDiscovery> {
98        &self.service_discovery
99    }
100
101    /// Get network validator component
102    pub fn network_validator(&self) -> &Arc<dyn NetworkValidator> {
103        &self.network_validator
104    }
105
106    /// Get config manager component
107    pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
108        &self.config_manager
109    }
110
111    /// Get dependency resolver component
112    pub fn dependency_resolver(&self) -> &Arc<dyn DependencyResolver> {
113        &self.dependency_resolver
114    }
115
116    fn dependency_lookup_key(spec: &DependencySpec) -> String {
117        spec.actr_type
118            .as_ref()
119            .map(|actr_type| actr_type.to_string_repr())
120            .unwrap_or_else(|| spec.name.clone())
121    }
122
123    /// Full project validation flow
124    pub async fn validate_project(&self) -> Result<ValidationReport> {
125        // 1. Config file validation
126        let config_validation = self.config_manager.validate_config().await?;
127
128        // If config file has issues, return immediately
129        if !config_validation.is_valid {
130            return Ok(ValidationReport {
131                is_valid: false,
132                config_validation,
133                dependency_validation: vec![],
134                network_validation: vec![],
135                fingerprint_validation: vec![],
136                conflicts: vec![],
137            });
138        }
139
140        // 2. Dependency resolution and validation
141        let config = self
142            .config_manager
143            .load_config(
144                self.config_manager
145                    .get_project_root()
146                    .join("manifest.toml")
147                    .as_path(),
148            )
149            .await?;
150        let dependency_specs = self.dependency_resolver.resolve_spec(&config).await?;
151
152        let mut service_details = Vec::new();
153        for spec in &dependency_specs {
154            let lookup_key = Self::dependency_lookup_key(spec);
155            match self
156                .service_discovery
157                .get_service_details(&lookup_key)
158                .await
159            {
160                Ok(details) => service_details.push(details),
161                Err(_) => {
162                    // Service might not be available, continue without details
163                }
164            }
165        }
166
167        let resolved_dependencies = self
168            .dependency_resolver
169            .resolve_dependencies(&dependency_specs, &service_details)
170            .await?;
171
172        // 3. Conflict check
173        let conflicts = self
174            .dependency_resolver
175            .check_conflicts(&resolved_dependencies)
176            .await?;
177
178        let dependency_validation = self.validate_dependencies(&dependency_specs).await?;
179        let network_validation = self
180            .validate_network_connectivity(&resolved_dependencies, &NetworkCheckOptions::default())
181            .await?;
182        let fingerprint_validation = self.validate_fingerprints(&resolved_dependencies).await?;
183
184        let is_valid = config_validation.is_valid
185            && dependency_validation.iter().all(|d| d.is_available)
186            && network_validation
187                .iter()
188                .all(|n| !n.is_applicable || n.is_reachable)
189            && fingerprint_validation.iter().all(|f| f.is_valid)
190            && conflicts.is_empty();
191
192        Ok(ValidationReport {
193            is_valid,
194            config_validation,
195            dependency_validation,
196            network_validation,
197            fingerprint_validation,
198            conflicts,
199        })
200    }
201
202    /// Validate a specific list of dependencies
203    /// Note: Multiple aliases pointing to the same service name will be deduplicated
204    pub async fn validate_dependencies(
205        &self,
206        specs: &[DependencySpec],
207    ) -> Result<Vec<DependencyValidation>> {
208        use std::collections::HashMap;
209
210        let mut results = Vec::new();
211        // Cache validation results by service name to avoid duplicate checks
212        let mut validation_cache: HashMap<String, (bool, Option<String>)> = HashMap::new();
213
214        for spec in specs {
215            let lookup_key = Self::dependency_lookup_key(spec);
216            // Check cache first - if we already validated this service name, reuse the result
217            let (is_available, error) = if let Some(cached) = validation_cache.get(&lookup_key) {
218                cached.clone()
219            } else {
220                // Perform validation
221                let (available, err) = match self
222                    .service_discovery
223                    .check_service_availability(&lookup_key)
224                    .await
225                {
226                    Ok(status) => {
227                        if status.is_available {
228                            (true, None)
229                        } else {
230                            // Provide meaningful error when service is not found
231                            (
232                                false,
233                                Some(format!("Service '{}' not found in registry", lookup_key)),
234                            )
235                        }
236                    }
237                    Err(e) => (false, Some(e.to_string())),
238                };
239
240                // Cache the result for this service name
241                validation_cache.insert(lookup_key, (available, err.clone()));
242                (available, err)
243            };
244
245            results.push(DependencyValidation {
246                dependency: spec.alias.clone(),
247                is_available,
248                error,
249            });
250        }
251
252        Ok(results)
253    }
254
255    /// Network connectivity validation
256    pub async fn validate_network_connectivity(
257        &self,
258        deps: &[ResolvedDependency],
259        options: &NetworkCheckOptions,
260    ) -> Result<Vec<NetworkValidation>> {
261        let names = deps.iter().map(|d| d.spec.name.clone()).collect::<Vec<_>>();
262        let network_results = self.network_validator.batch_check(&names, options).await?;
263
264        Ok(network_results
265            .into_iter()
266            .map(|result| {
267                let mut is_applicable = true;
268                let mut error = result.connectivity.error;
269                let mut health = result.health;
270                let mut latency_ms = result.connectivity.response_time_ms;
271
272                if let Some(ref message) = error
273                    && message.starts_with("Address resolution failed: Invalid address format")
274                {
275                    is_applicable = false;
276                    error =
277                        Some("Network check skipped: no endpoint address available".to_string());
278                    health = HealthStatus::Unknown;
279                    latency_ms = None;
280                }
281
282                NetworkValidation {
283                    is_reachable: result.connectivity.is_reachable,
284                    health,
285                    latency_ms,
286                    error,
287                    is_applicable,
288                }
289            })
290            .collect())
291    }
292
293    /// Fingerprint validation
294    pub async fn validate_fingerprints(
295        &self,
296        deps: &[ResolvedDependency],
297    ) -> Result<Vec<FingerprintValidation>> {
298        let mut results = Vec::new();
299
300        for dep in deps {
301            let expected_val = dep.spec.fingerprint.clone().unwrap_or_default();
302            let expected = Fingerprint {
303                algorithm: "sha256".to_string(),
304                value: expected_val,
305            };
306
307            // Compute actual fingerprint (if resolved_dependencies has none, fetch from remote)
308            let actual_fp = if dep.fingerprint.is_empty() {
309                let lookup_key = Self::dependency_lookup_key(&dep.spec);
310                match self
311                    .service_discovery
312                    .get_service_details(&lookup_key)
313                    .await
314                {
315                    Ok(details) => {
316                        let computed = self
317                            .fingerprint_validator
318                            .compute_service_fingerprint(&details.info)
319                            .await?;
320                        Some(computed)
321                    }
322                    Err(e) => {
323                        results.push(FingerprintValidation {
324                            dependency: dep.spec.alias.clone(),
325                            expected,
326                            actual: None,
327                            is_valid: false,
328                            error: Some(e.to_string()),
329                        });
330                        continue;
331                    }
332                }
333            } else {
334                // Already has fingerprint, no need to recompute
335                None
336            };
337
338            let is_valid = if expected.value.is_empty() {
339                true
340            } else if let Some(ref computed) = actual_fp {
341                self.fingerprint_validator
342                    .verify_fingerprint(&expected, computed)
343                    .await
344                    .unwrap_or(false)
345            } else {
346                // Fingerprint already matched (from resolve_dependencies)
347                true
348            };
349
350            results.push(FingerprintValidation {
351                dependency: dep.spec.alias.clone(),
352                expected,
353                actual: actual_fp,
354                is_valid,
355                error: None,
356            });
357        }
358
359        Ok(results)
360    }
361}
362
// ============================================================================
// 2. Install Pipeline
// ============================================================================
366
367/// Install pipeline - built on top of ValidationPipeline
368pub struct InstallPipeline {
369    validation_pipeline: ValidationPipeline,
370    config_manager: Arc<dyn ConfigManager>,
371    cache_manager: Arc<dyn CacheManager>,
372    #[allow(dead_code)]
373    proto_processor: Arc<dyn ProtoProcessor>,
374}
375
376impl InstallPipeline {
377    pub fn new(
378        validation_pipeline: ValidationPipeline,
379        config_manager: Arc<dyn ConfigManager>,
380        cache_manager: Arc<dyn CacheManager>,
381        proto_processor: Arc<dyn ProtoProcessor>,
382    ) -> Self {
383        Self {
384            validation_pipeline,
385            config_manager,
386            cache_manager,
387            proto_processor,
388        }
389    }
390
391    /// Get validation pipeline reference
392    pub fn validation_pipeline(&self) -> &ValidationPipeline {
393        &self.validation_pipeline
394    }
395
396    /// Get config manager reference
397    pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
398        &self.config_manager
399    }
400
401    /// Check-first install flow
402    pub async fn install_dependencies(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
403        // Phase 1: Full validation (reuse ValidationPipeline)
404        let validation_report = self
405            .validation_pipeline
406            .validate_dependencies(specs)
407            .await?;
408
409        // Check validation results
410        let failed_validations: Vec<_> = validation_report
411            .iter()
412            .filter(|v| !v.is_available)
413            .collect();
414
415        if !failed_validations.is_empty() {
416            return Err(anyhow::anyhow!(
417                "Dependency validation failed: {}",
418                failed_validations
419                    .iter()
420                    .map(|v| format!(
421                        "{}: {}",
422                        v.dependency,
423                        v.error.as_deref().unwrap_or("unknown error")
424                    ))
425                    .collect::<Vec<_>>()
426                    .join(", ")
427            ));
428        }
429
430        // Phase 2: Atomic install
431        let backup = self.config_manager.backup_config().await?;
432
433        match self.execute_atomic_install(specs).await {
434            Ok(result) => {
435                // Install succeeded, clean up backup
436                self.config_manager.remove_backup(backup).await?;
437                Ok(result)
438            }
439            Err(e) => {
440                // Install failed, restore backup
441                self.config_manager.restore_backup(backup).await?;
442                Err(e)
443            }
444        }
445    }
446
447    /// Atomic install execution
448    /// Note: Multiple aliases pointing to the same service will be deduplicated -
449    /// only one entry per unique service name will be installed and recorded in lock file
450    async fn execute_atomic_install(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
451        use std::collections::HashSet;
452
453        let mut result = InstallResult::success();
454        let mut installed_services: HashSet<String> = HashSet::new();
455
456        for spec in specs {
457            let lookup_key = ValidationPipeline::dependency_lookup_key(spec);
458            // Skip if we already installed this service (by name)
459            if installed_services.contains(&lookup_key) {
460                tracing::debug!(
461                    "Skipping duplicate service '{}' (alias: '{}')",
462                    lookup_key,
463                    spec.alias
464                );
465                continue;
466            }
467
468            // 1. Get service details (before updating config, ensure we have the full actr_type)
469            let service_details = self
470                .validation_pipeline
471                .service_discovery
472                .get_service_details(&lookup_key)
473                .await?;
474
475            // 2. Build resolved_spec using the canonical actr_type from discovery
476            let mut resolved_spec = spec.clone();
477            resolved_spec.actr_type = Some(service_details.info.actr_type.clone());
478
479            // 3. Update config file (using resolved_spec with actr_type)
480            self.config_manager
481                .update_dependency(&resolved_spec)
482                .await?;
483            result.updated_config = true;
484
485            // 4. Cache proto files
486            self.cache_manager
487                .cache_proto(&spec.name, &service_details.proto_files)
488                .await?;
489
490            result.cache_updates += 1;
491
492            // 5. Record installed dependency
493
494            let resolved_dep = ResolvedDependency {
495                spec: resolved_spec,
496                fingerprint: service_details.info.fingerprint,
497                proto_files: service_details.proto_files,
498            };
499            result.installed_dependencies.push(resolved_dep);
500
501            // Mark this service as installed
502            installed_services.insert(lookup_key);
503        }
504
505        // 4. Update lock file (lock file also deduplicates by name)
506        self.update_lock_file(&result.installed_dependencies)
507            .await?;
508        result.updated_lock_file = true;
509
510        Ok(result)
511    }
512
513    /// Update lock file with new format (no embedded proto content)
514    async fn update_lock_file(&self, dependencies: &[ResolvedDependency]) -> Result<()> {
515        let project_root = self.config_manager.get_project_root();
516        let lock_file_path = project_root.join("manifest.lock.toml");
517
518        // Load existing lock file or create new one
519        let mut lock_file = if lock_file_path.exists() {
520            LockFile::from_file(&lock_file_path).unwrap_or_else(|_| LockFile::new())
521        } else {
522            LockFile::new()
523        };
524
525        // Update dependencies
526        for dep in dependencies {
527            let service_name = dep.spec.name.clone();
528
529            // Create protobuf entries with relative path (no content)
530            let protobufs: Vec<ProtoFileMeta> = dep
531                .proto_files
532                .iter()
533                .map(|pf| {
534                    let file_name = if pf.name.ends_with(".proto") {
535                        pf.name.clone()
536                    } else {
537                        format!("{}.proto", pf.name)
538                    };
539                    // Path relative to proto/remote/ (e.g., "service_name/file.proto")
540                    let path = format!("{}/{}", service_name, file_name);
541
542                    ProtoFileMeta {
543                        path,
544                        fingerprint: String::new(), // TODO: compute semantic fingerprint
545                    }
546                })
547                .collect();
548
549            // Create service spec metadata
550            let spec = ServiceSpecMeta {
551                name: dep.spec.name.clone(),
552                description: None,
553                fingerprint: dep.fingerprint.clone(),
554                protobufs,
555                published_at: None,
556                tags: Vec::new(),
557            };
558
559            // Create locked dependency
560            let actr_type = dep.spec.actr_type.clone().ok_or_else(|| {
561                anyhow::anyhow!("Actr type is required for dependency: {}", service_name)
562            })?;
563            let locked_dep = LockedDependency::new(actr_type.to_string_repr(), spec);
564            lock_file.add_dependency(locked_dep);
565        }
566
567        // Update timestamp and save
568        lock_file.update_timestamp();
569        lock_file.save_to_file(&lock_file_path)?;
570
571        tracing::info!("Updated lock file: {} dependencies", dependencies.len());
572        Ok(())
573    }
574}
575
// ============================================================================
// 3. Generation Pipeline
// ============================================================================
579
580/// Code generation pipeline
581pub struct GenerationPipeline {
582    #[allow(dead_code)]
583    config_manager: Arc<dyn ConfigManager>,
584    proto_processor: Arc<dyn ProtoProcessor>,
585    #[allow(dead_code)]
586    cache_manager: Arc<dyn CacheManager>,
587}
588
589impl GenerationPipeline {
590    pub fn new(
591        config_manager: Arc<dyn ConfigManager>,
592        proto_processor: Arc<dyn ProtoProcessor>,
593        cache_manager: Arc<dyn CacheManager>,
594    ) -> Self {
595        Self {
596            config_manager,
597            proto_processor,
598            cache_manager,
599        }
600    }
601
602    /// Execute code generation
603    pub async fn generate_code(&self, options: &GenerationOptions) -> Result<GenerationResult> {
604        // 1. Clean output directory (if needed)
605        if options.clean_before_generate {
606            self.clean_output_directory(&options.output_path).await?;
607        }
608
609        // 2. Discover local proto files
610        let local_protos = self
611            .proto_processor
612            .discover_proto_files(&options.input_path)
613            .await?;
614
615        // 3. Load dependency proto files
616        let dependency_protos = self.load_dependency_protos().await?;
617
618        // 4. Validate proto syntax
619        let all_protos = [local_protos, dependency_protos].concat();
620        let validation = self
621            .proto_processor
622            .validate_proto_syntax(&all_protos)
623            .await?;
624
625        if !validation.is_valid {
626            return Err(anyhow::anyhow!("Proto file syntax validation failed"));
627        }
628
629        // 5. Execute code generation
630        let mut generation_result = self
631            .proto_processor
632            .generate_code(&options.input_path, &options.output_path)
633            .await?;
634
635        // 6. Post-processing: format and check
636        if options.format_code {
637            self.format_generated_code(&generation_result.generated_files)
638                .await?;
639        }
640
641        if options.run_checks {
642            let check_result = self
643                .run_code_checks(&generation_result.generated_files)
644                .await?;
645            generation_result.warnings.extend(check_result.warnings);
646            generation_result.errors.extend(check_result.errors);
647        }
648
649        Ok(generation_result)
650    }
651
652    /// Clean the output directory
653    async fn clean_output_directory(&self, output_path: &std::path::Path) -> Result<()> {
654        if output_path.exists() {
655            std::fs::remove_dir_all(output_path)?;
656        }
657        std::fs::create_dir_all(output_path)?;
658        Ok(())
659    }
660
661    /// Load dependency proto files
662    async fn load_dependency_protos(&self) -> Result<Vec<ProtoFile>> {
663        // TODO: Load dependency proto files from cache
664        Ok(Vec::new())
665    }
666
667    /// Format generated code
668    async fn format_generated_code(&self, files: &[std::path::PathBuf]) -> Result<()> {
669        for file in files {
670            if file.extension().and_then(|s| s.to_str()) == Some("rs") {
671                // Run rustfmt
672                let output = std::process::Command::new("rustfmt").arg(file).output()?;
673
674                if !output.status.success() {
675                    eprintln!(
676                        "rustfmt warning: {}",
677                        String::from_utf8_lossy(&output.stderr)
678                    );
679                }
680            }
681        }
682        Ok(())
683    }
684
685    /// Run code checks
686    async fn run_code_checks(&self, files: &[std::path::PathBuf]) -> Result<GenerationResult> {
687        // TODO: Run cargo check or other code validation tools
688        Ok(GenerationResult {
689            generated_files: files.to_vec(),
690            warnings: vec![],
691            errors: vec![],
692        })
693    }
694}