Skip to main content

actr_cli/core/
pipelines.rs

//! Operation pipeline definitions.
//!
//! Defines the three core operation pipelines so that logic can be reused across commands.
4
5use actr_config::{LockFile, LockedDependency, ProtoFileMeta, ServiceSpecMeta};
6use actr_protocol::ActrTypeExt;
7use anyhow::Result;
8use std::sync::Arc;
9
10use super::components::*;
11
// ============================================================================
// Pipeline result types
// ============================================================================
15
16/// 安装结果
17#[derive(Debug, Clone)]
18pub struct InstallResult {
19    pub installed_dependencies: Vec<ResolvedDependency>,
20    pub updated_config: bool,
21    pub updated_lock_file: bool,
22    pub cache_updates: usize,
23    pub warnings: Vec<String>,
24}
25
26impl InstallResult {
27    pub fn success() -> Self {
28        Self {
29            installed_dependencies: Vec::new(),
30            updated_config: false,
31            updated_lock_file: false,
32            cache_updates: 0,
33            warnings: Vec::new(),
34        }
35    }
36
37    pub fn summary(&self) -> String {
38        format!(
39            "Installed {} dependencies, updated {} cache entries",
40            self.installed_dependencies.len(),
41            self.cache_updates
42        )
43    }
44}
45
46/// 安装计划
47#[derive(Debug, Clone)]
48pub struct InstallPlan {
49    pub dependencies_to_install: Vec<DependencySpec>,
50    pub resolved_dependencies: Vec<ResolvedDependency>,
51    pub estimated_cache_size: u64,
52    pub required_permissions: Vec<String>,
53}
54
/// Options controlling a code generation run.
#[derive(Debug, Clone)]
pub struct GenerationOptions {
    /// Location searched for local proto files and handed to the generator as input.
    pub input_path: std::path::PathBuf,
    /// Directory that receives the generated code.
    pub output_path: std::path::PathBuf,
    /// When set, the output directory is wiped and recreated before generating.
    pub clean_before_generate: bool,
    /// Scaffold generation toggle.
    // NOTE(review): not read by the generation pipeline in this file — confirm where it is consumed.
    pub generate_scaffold: bool,
    /// When set, generated files are formatted after generation.
    pub format_code: bool,
    /// When set, code checks run after generation and their findings are merged into the result.
    pub run_checks: bool,
}
65
// ============================================================================
// 1. Validation pipeline (ValidationPipeline)
// ============================================================================
69
70/// 核心验证管道 - 被多个命令复用
71#[derive(Clone)]
72pub struct ValidationPipeline {
73    config_manager: Arc<dyn ConfigManager>,
74    dependency_resolver: Arc<dyn DependencyResolver>,
75    service_discovery: Arc<dyn ServiceDiscovery>,
76    network_validator: Arc<dyn NetworkValidator>,
77    fingerprint_validator: Arc<dyn FingerprintValidator>,
78}
79
80impl ValidationPipeline {
81    pub fn new(
82        config_manager: Arc<dyn ConfigManager>,
83        dependency_resolver: Arc<dyn DependencyResolver>,
84        service_discovery: Arc<dyn ServiceDiscovery>,
85        network_validator: Arc<dyn NetworkValidator>,
86        fingerprint_validator: Arc<dyn FingerprintValidator>,
87    ) -> Self {
88        Self {
89            config_manager,
90            dependency_resolver,
91            service_discovery,
92            network_validator,
93            fingerprint_validator,
94        }
95    }
96
97    /// Get service discovery component
98    pub fn service_discovery(&self) -> &Arc<dyn ServiceDiscovery> {
99        &self.service_discovery
100    }
101
102    /// Get network validator component
103    pub fn network_validator(&self) -> &Arc<dyn NetworkValidator> {
104        &self.network_validator
105    }
106
107    /// Get config manager component
108    pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
109        &self.config_manager
110    }
111
112    /// 完整的项目验证流程
113    pub async fn validate_project(&self) -> Result<ValidationReport> {
114        // 1. 配置文件验证
115        let config_validation = self.config_manager.validate_config().await?;
116
117        // 如果配置文件都有问题,直接返回
118        if !config_validation.is_valid {
119            return Ok(ValidationReport {
120                is_valid: false,
121                config_validation,
122                dependency_validation: vec![],
123                network_validation: vec![],
124                fingerprint_validation: vec![],
125                conflicts: vec![],
126            });
127        }
128
129        // 2. 依赖解析和验证
130        let config = self
131            .config_manager
132            .load_config(
133                self.config_manager
134                    .get_project_root()
135                    .join("Actr.toml")
136                    .as_path(),
137            )
138            .await?;
139        let dependency_specs = self.dependency_resolver.resolve_spec(&config).await?;
140
141        let mut service_details = Vec::new();
142        for spec in &dependency_specs {
143            match self.service_discovery.get_service_details(&spec.name).await {
144                Ok(details) => service_details.push(details),
145                Err(_) => {
146                    // Service might not be available, continue without details
147                }
148            }
149        }
150
151        let resolved_dependencies = self
152            .dependency_resolver
153            .resolve_dependencies(&dependency_specs, &service_details)
154            .await?;
155
156        // 3. 冲突检查
157        let conflicts = self
158            .dependency_resolver
159            .check_conflicts(&resolved_dependencies)
160            .await?;
161
162        let dependency_validation = self.validate_dependencies(&dependency_specs).await?;
163        let network_validation = self
164            .validate_network_connectivity(&resolved_dependencies, &NetworkCheckOptions::default())
165            .await?;
166        let fingerprint_validation = self.validate_fingerprints(&resolved_dependencies).await?;
167
168        let is_valid = config_validation.is_valid
169            && dependency_validation.iter().all(|d| d.is_available)
170            && network_validation
171                .iter()
172                .all(|n| !n.is_applicable || n.is_reachable)
173            && fingerprint_validation.iter().all(|f| f.is_valid)
174            && conflicts.is_empty();
175
176        Ok(ValidationReport {
177            is_valid,
178            config_validation,
179            dependency_validation,
180            network_validation,
181            fingerprint_validation,
182            conflicts,
183        })
184    }
185
186    /// 验证特定依赖列表
187    /// Note: Multiple aliases pointing to the same service name will be deduplicated
188    pub async fn validate_dependencies(
189        &self,
190        specs: &[DependencySpec],
191    ) -> Result<Vec<DependencyValidation>> {
192        use std::collections::HashMap;
193
194        let mut results = Vec::new();
195        // Cache validation results by service name to avoid duplicate checks
196        let mut validation_cache: HashMap<String, (bool, Option<String>)> = HashMap::new();
197
198        for spec in specs {
199            // Check cache first - if we already validated this service name, reuse the result
200            let (is_available, error) = if let Some(cached) = validation_cache.get(&spec.name) {
201                cached.clone()
202            } else {
203                // Perform validation
204                let (available, err) = match self
205                    .service_discovery
206                    .check_service_availability(&spec.name)
207                    .await
208                {
209                    Ok(status) => {
210                        if status.is_available {
211                            (true, None)
212                        } else {
213                            // Provide meaningful error when service is not found
214                            (
215                                false,
216                                Some(format!("Service '{}' not found in registry", spec.name)),
217                            )
218                        }
219                    }
220                    Err(e) => (false, Some(e.to_string())),
221                };
222
223                // Cache the result for this service name
224                validation_cache.insert(spec.name.clone(), (available, err.clone()));
225                (available, err)
226            };
227
228            results.push(DependencyValidation {
229                dependency: spec.alias.clone(),
230                is_available,
231                error,
232            });
233        }
234
235        Ok(results)
236    }
237
238    /// 网络连通性验证
239    pub async fn validate_network_connectivity(
240        &self,
241        deps: &[ResolvedDependency],
242        options: &NetworkCheckOptions,
243    ) -> Result<Vec<NetworkValidation>> {
244        let names = deps.iter().map(|d| d.spec.name.clone()).collect::<Vec<_>>();
245        let network_results = self.network_validator.batch_check(&names, options).await?;
246
247        Ok(network_results
248            .into_iter()
249            .map(|result| {
250                let mut is_applicable = true;
251                let mut error = result.connectivity.error;
252                let mut health = result.health;
253                let mut latency_ms = result.connectivity.response_time_ms;
254
255                if let Some(ref message) = error
256                    && message.starts_with("Address resolution failed: Invalid address format")
257                {
258                    is_applicable = false;
259                    error =
260                        Some("Network check skipped: no endpoint address available".to_string());
261                    health = HealthStatus::Unknown;
262                    latency_ms = None;
263                }
264
265                NetworkValidation {
266                    is_reachable: result.connectivity.is_reachable,
267                    health,
268                    latency_ms,
269                    error,
270                    is_applicable,
271                }
272            })
273            .collect())
274    }
275
276    /// 指纹验证
277    pub async fn validate_fingerprints(
278        &self,
279        deps: &[ResolvedDependency],
280    ) -> Result<Vec<FingerprintValidation>> {
281        let mut results = Vec::new();
282
283        for dep in deps {
284            let expected_val = dep.spec.fingerprint.clone().unwrap_or_default();
285            let expected = Fingerprint {
286                algorithm: "sha256".to_string(),
287                value: expected_val,
288            };
289
290            // 计算实际指纹(如果 resolved_dependencies 中没有指纹,从远程获取)
291            let actual_fp = if dep.fingerprint.is_empty() {
292                match self
293                    .service_discovery
294                    .get_service_details(&dep.spec.name)
295                    .await
296                {
297                    Ok(details) => {
298                        let computed = self
299                            .fingerprint_validator
300                            .compute_service_fingerprint(&details.info)
301                            .await?;
302                        Some(computed)
303                    }
304                    Err(e) => {
305                        results.push(FingerprintValidation {
306                            dependency: dep.spec.alias.clone(),
307                            expected,
308                            actual: None,
309                            is_valid: false,
310                            error: Some(e.to_string()),
311                        });
312                        continue;
313                    }
314                }
315            } else {
316                // 已有指纹,无需重新计算
317                None
318            };
319
320            let is_valid = if expected.value.is_empty() {
321                true
322            } else if let Some(ref computed) = actual_fp {
323                self.fingerprint_validator
324                    .verify_fingerprint(&expected, computed)
325                    .await
326                    .unwrap_or(false)
327            } else {
328                // 指纹已匹配(来自 resolve_dependencies)
329                true
330            };
331
332            results.push(FingerprintValidation {
333                dependency: dep.spec.alias.clone(),
334                expected,
335                actual: actual_fp,
336                is_valid,
337                error: None,
338            });
339        }
340
341        Ok(results)
342    }
343}
344
// ============================================================================
// 2. Install pipeline (InstallPipeline)
// ============================================================================
348
349/// 安装管道 - 基于ValidationPipeline构建
350pub struct InstallPipeline {
351    validation_pipeline: ValidationPipeline,
352    config_manager: Arc<dyn ConfigManager>,
353    cache_manager: Arc<dyn CacheManager>,
354    #[allow(dead_code)]
355    proto_processor: Arc<dyn ProtoProcessor>,
356}
357
358impl InstallPipeline {
359    pub fn new(
360        validation_pipeline: ValidationPipeline,
361        config_manager: Arc<dyn ConfigManager>,
362        cache_manager: Arc<dyn CacheManager>,
363        proto_processor: Arc<dyn ProtoProcessor>,
364    ) -> Self {
365        Self {
366            validation_pipeline,
367            config_manager,
368            cache_manager,
369            proto_processor,
370        }
371    }
372
373    /// Get validation pipeline reference
374    pub fn validation_pipeline(&self) -> &ValidationPipeline {
375        &self.validation_pipeline
376    }
377
378    /// Get config manager reference
379    pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
380        &self.config_manager
381    }
382
383    /// Check-First 安装流程
384    pub async fn install_dependencies(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
385        // 🔍 阶段1: 完整验证 (复用ValidationPipeline)
386        let validation_report = self
387            .validation_pipeline
388            .validate_dependencies(specs)
389            .await?;
390
391        // 检查验证结果
392        let failed_validations: Vec<_> = validation_report
393            .iter()
394            .filter(|v| !v.is_available)
395            .collect();
396
397        if !failed_validations.is_empty() {
398            return Err(anyhow::anyhow!(
399                "依赖验证失败: {}",
400                failed_validations
401                    .iter()
402                    .map(|v| format!(
403                        "{}: {}",
404                        v.dependency,
405                        v.error.as_deref().unwrap_or("unknown error")
406                    ))
407                    .collect::<Vec<_>>()
408                    .join(", ")
409            ));
410        }
411
412        // 📝 阶段2: 原子性安装
413        let backup = self.config_manager.backup_config().await?;
414
415        match self.execute_atomic_install(specs).await {
416            Ok(result) => {
417                // 安装成功,清理备份
418                self.config_manager.remove_backup(backup).await?;
419                Ok(result)
420            }
421            Err(e) => {
422                // 安装失败,恢复备份
423                self.config_manager.restore_backup(backup).await?;
424                Err(e)
425            }
426        }
427    }
428
429    /// 原子性安装执行
430    /// Note: Multiple aliases pointing to the same service will be deduplicated -
431    /// only one entry per unique service name will be installed and recorded in lock file
432    async fn execute_atomic_install(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
433        use std::collections::HashSet;
434
435        let mut result = InstallResult::success();
436        let mut installed_services: HashSet<String> = HashSet::new();
437
438        for spec in specs {
439            // Skip if we already installed this service (by name)
440            if installed_services.contains(&spec.name) {
441                tracing::debug!(
442                    "Skipping duplicate service '{}' (alias: '{}')",
443                    spec.name,
444                    spec.alias
445                );
446                continue;
447            }
448
449            // 1. 获取服务详情(在更新配置之前,确保我们有完整的 actr_type)
450            let service_details = self
451                .validation_pipeline
452                .service_discovery
453                .get_service_details(&spec.name)
454                .await?;
455
456            // 2. 构建 resolved_spec,确保包含 actr_type
457            let mut resolved_spec = spec.clone();
458            // 如果 spec 中没有 actr_type,使用从服务详情中获取的
459            if resolved_spec.actr_type.is_none() {
460                resolved_spec.actr_type = Some(service_details.info.actr_type.clone());
461            }
462
463            // 3. 更新配置文件(使用包含 actr_type 的 resolved_spec)
464            self.config_manager
465                .update_dependency(&resolved_spec)
466                .await?;
467            result.updated_config = true;
468
469            // 4. 缓存Proto文件
470            self.cache_manager
471                .cache_proto(&spec.name, &service_details.proto_files)
472                .await?;
473
474            result.cache_updates += 1;
475
476            // 5. 记录已安装的依赖
477
478            let resolved_dep = ResolvedDependency {
479                spec: resolved_spec,
480                fingerprint: service_details.info.fingerprint,
481                proto_files: service_details.proto_files,
482            };
483            result.installed_dependencies.push(resolved_dep);
484
485            // Mark this service as installed
486            installed_services.insert(spec.name.clone());
487        }
488
489        // 4. 更新锁文件 (lock file also deduplicates by name)
490        self.update_lock_file(&result.installed_dependencies)
491            .await?;
492        result.updated_lock_file = true;
493
494        Ok(result)
495    }
496
497    /// Update lock file with new format (no embedded proto content)
498    async fn update_lock_file(&self, dependencies: &[ResolvedDependency]) -> Result<()> {
499        let project_root = self.config_manager.get_project_root();
500        let lock_file_path = project_root.join("Actr.lock.toml");
501
502        // Load existing lock file or create new one
503        let mut lock_file = if lock_file_path.exists() {
504            LockFile::from_file(&lock_file_path).unwrap_or_else(|_| LockFile::new())
505        } else {
506            LockFile::new()
507        };
508
509        // Update dependencies
510        for dep in dependencies {
511            let service_name = dep.spec.name.clone();
512
513            // Create protobuf entries with relative path (no content)
514            let protobufs: Vec<ProtoFileMeta> = dep
515                .proto_files
516                .iter()
517                .map(|pf| {
518                    let file_name = if pf.name.ends_with(".proto") {
519                        pf.name.clone()
520                    } else {
521                        format!("{}.proto", pf.name)
522                    };
523                    // Path relative to proto/remote/ (e.g., "service_name/file.proto")
524                    let path = format!("{}/{}", service_name, file_name);
525
526                    ProtoFileMeta {
527                        path,
528                        fingerprint: String::new(), // TODO: compute semantic fingerprint
529                    }
530                })
531                .collect();
532
533            // Create service spec metadata
534            let spec = ServiceSpecMeta {
535                name: dep.spec.name.clone(),
536                description: None,
537                fingerprint: dep.fingerprint.clone(),
538                protobufs,
539                published_at: None,
540                tags: Vec::new(),
541            };
542
543            // Create locked dependency
544            let actr_type = dep.spec.actr_type.clone().ok_or_else(|| {
545                anyhow::anyhow!("Actr type is required for dependency: {}", service_name)
546            })?;
547            let locked_dep = LockedDependency::new(actr_type.to_string_repr(), spec);
548            lock_file.add_dependency(locked_dep);
549        }
550
551        // Update timestamp and save
552        lock_file.update_timestamp();
553        lock_file.save_to_file(&lock_file_path)?;
554
555        tracing::info!("Updated lock file: {} dependencies", dependencies.len());
556        Ok(())
557    }
558}
559
// ============================================================================
// 3. Generation pipeline (GenerationPipeline)
// ============================================================================
563
564/// 代码生成管道
565pub struct GenerationPipeline {
566    #[allow(dead_code)]
567    config_manager: Arc<dyn ConfigManager>,
568    proto_processor: Arc<dyn ProtoProcessor>,
569    #[allow(dead_code)]
570    cache_manager: Arc<dyn CacheManager>,
571}
572
573impl GenerationPipeline {
574    pub fn new(
575        config_manager: Arc<dyn ConfigManager>,
576        proto_processor: Arc<dyn ProtoProcessor>,
577        cache_manager: Arc<dyn CacheManager>,
578    ) -> Self {
579        Self {
580            config_manager,
581            proto_processor,
582            cache_manager,
583        }
584    }
585
586    /// 执行代码生成
587    pub async fn generate_code(&self, options: &GenerationOptions) -> Result<GenerationResult> {
588        // 1. 清理输出目录(如果需要)
589        if options.clean_before_generate {
590            self.clean_output_directory(&options.output_path).await?;
591        }
592
593        // 2. 发现本地Proto文件
594        let local_protos = self
595            .proto_processor
596            .discover_proto_files(&options.input_path)
597            .await?;
598
599        // 3. 加载依赖的Proto文件
600        let dependency_protos = self.load_dependency_protos().await?;
601
602        // 4. 验证Proto语法
603        let all_protos = [local_protos, dependency_protos].concat();
604        let validation = self
605            .proto_processor
606            .validate_proto_syntax(&all_protos)
607            .await?;
608
609        if !validation.is_valid {
610            return Err(anyhow::anyhow!("Proto file syntax validation failed"));
611        }
612
613        // 5. 执行代码生成
614        let mut generation_result = self
615            .proto_processor
616            .generate_code(&options.input_path, &options.output_path)
617            .await?;
618
619        // 6. 后处理:格式化和检查
620        if options.format_code {
621            self.format_generated_code(&generation_result.generated_files)
622                .await?;
623        }
624
625        if options.run_checks {
626            let check_result = self
627                .run_code_checks(&generation_result.generated_files)
628                .await?;
629            generation_result.warnings.extend(check_result.warnings);
630            generation_result.errors.extend(check_result.errors);
631        }
632
633        Ok(generation_result)
634    }
635
636    /// 清理输出目录
637    async fn clean_output_directory(&self, output_path: &std::path::Path) -> Result<()> {
638        if output_path.exists() {
639            std::fs::remove_dir_all(output_path)?;
640        }
641        std::fs::create_dir_all(output_path)?;
642        Ok(())
643    }
644
645    /// 加载依赖的Proto文件
646    async fn load_dependency_protos(&self) -> Result<Vec<ProtoFile>> {
647        // TODO: 从缓存中加载依赖的Proto文件
648        Ok(Vec::new())
649    }
650
651    /// 格式化生成的代码
652    async fn format_generated_code(&self, files: &[std::path::PathBuf]) -> Result<()> {
653        for file in files {
654            if file.extension().and_then(|s| s.to_str()) == Some("rs") {
655                // 运行 rustfmt
656                let output = std::process::Command::new("rustfmt").arg(file).output()?;
657
658                if !output.status.success() {
659                    eprintln!(
660                        "rustfmt warning: {}",
661                        String::from_utf8_lossy(&output.stderr)
662                    );
663                }
664            }
665        }
666        Ok(())
667    }
668
669    /// 运行代码检查
670    async fn run_code_checks(&self, files: &[std::path::PathBuf]) -> Result<GenerationResult> {
671        // TODO: 运行 cargo check 或其他代码检查工具
672        Ok(GenerationResult {
673            generated_files: files.to_vec(),
674            warnings: vec![],
675            errors: vec![],
676        })
677    }
678}