actr_cli/core/
pipelines.rs

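//! Operation pipeline definitions.
//!
//! Defines the three core operation pipelines (validation, install and
//! generation) so that logic can be shared between commands.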
4
5use actr_config::{LockFile, LockedDependency, ProtoFileMeta, ServiceSpecMeta};
6use actr_protocol::ActrTypeExt;
7use anyhow::Result;
8use std::sync::Arc;
9
10use super::components::*;
11
// ============================================================================
// Pipeline result types
// ============================================================================
15
16/// 安装结果
17#[derive(Debug, Clone)]
18pub struct InstallResult {
19    pub installed_dependencies: Vec<ResolvedDependency>,
20    pub updated_config: bool,
21    pub updated_lock_file: bool,
22    pub cache_updates: usize,
23    pub warnings: Vec<String>,
24}
25
26impl InstallResult {
27    pub fn success() -> Self {
28        Self {
29            installed_dependencies: Vec::new(),
30            updated_config: false,
31            updated_lock_file: false,
32            cache_updates: 0,
33            warnings: Vec::new(),
34        }
35    }
36
37    pub fn summary(&self) -> String {
38        format!(
39            "Installed {} dependencies, updated {} cache entries",
40            self.installed_dependencies.len(),
41            self.cache_updates
42        )
43    }
44}
45
/// Installation plan.
///
/// Plain data holder; nothing in this file constructs or reads it, so the
/// field notes below are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct InstallPlan {
    /// Dependency specifications that are to be installed.
    pub dependencies_to_install: Vec<DependencySpec>,
    /// Resolved form of the dependencies.
    pub resolved_dependencies: Vec<ResolvedDependency>,
    /// Estimated cache size (presumably in bytes; TODO confirm the unit).
    pub estimated_cache_size: u64,
    /// Permissions the plan requires (format not visible here; verify against the producer).
    pub required_permissions: Vec<String>,
}
54
/// Options controlling a code generation run.
#[derive(Debug, Clone)]
pub struct GenerationOptions {
    /// Location searched for local proto files and passed to the generator as input.
    pub input_path: std::path::PathBuf,
    /// Directory the generated code is written to.
    pub output_path: std::path::PathBuf,
    /// When `true`, the output directory is removed and recreated before generating.
    pub clean_before_generate: bool,
    /// Scaffold generation flag (not read by the pipeline code in this file; verify against callers).
    pub generate_scaffold: bool,
    /// When `true`, generated `.rs` files are passed through `rustfmt`.
    pub format_code: bool,
    /// When `true`, post-generation checks run and their warnings/errors are merged into the result.
    pub run_checks: bool,
}
65
// ============================================================================
// 1. Validation pipeline (ValidationPipeline)
// ============================================================================
69
/// Core validation pipeline, reused by multiple commands.
///
/// Cloning is cheap: every component is held behind an `Arc`.
#[derive(Clone)]
pub struct ValidationPipeline {
    /// Validates and loads the project configuration and knows the project root.
    config_manager: Arc<dyn ConfigManager>,
    /// Turns the configuration into dependency specs, resolves them and checks conflicts.
    dependency_resolver: Arc<dyn DependencyResolver>,
    /// Looks up service details and service availability.
    service_discovery: Arc<dyn ServiceDiscovery>,
    /// Performs batched connectivity checks for services.
    network_validator: Arc<dyn NetworkValidator>,
    /// Computes and verifies service fingerprints.
    fingerprint_validator: Arc<dyn FingerprintValidator>,
}
79
80impl ValidationPipeline {
81    pub fn new(
82        config_manager: Arc<dyn ConfigManager>,
83        dependency_resolver: Arc<dyn DependencyResolver>,
84        service_discovery: Arc<dyn ServiceDiscovery>,
85        network_validator: Arc<dyn NetworkValidator>,
86        fingerprint_validator: Arc<dyn FingerprintValidator>,
87    ) -> Self {
88        Self {
89            config_manager,
90            dependency_resolver,
91            service_discovery,
92            network_validator,
93            fingerprint_validator,
94        }
95    }
96
97    /// Get service discovery component
98    pub fn service_discovery(&self) -> &Arc<dyn ServiceDiscovery> {
99        &self.service_discovery
100    }
101
102    /// Get network validator component
103    pub fn network_validator(&self) -> &Arc<dyn NetworkValidator> {
104        &self.network_validator
105    }
106
107    /// Get config manager component
108    pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
109        &self.config_manager
110    }
111
112    /// 完整的项目验证流程
113    pub async fn validate_project(&self) -> Result<ValidationReport> {
114        // 1. 配置文件验证
115        let config_validation = self.config_manager.validate_config().await?;
116
117        // 如果配置文件都有问题,直接返回
118        if !config_validation.is_valid {
119            return Ok(ValidationReport {
120                is_valid: false,
121                config_validation,
122                dependency_validation: vec![],
123                network_validation: vec![],
124                fingerprint_validation: vec![],
125                conflicts: vec![],
126            });
127        }
128
129        // 2. 依赖解析和验证
130        let config = self
131            .config_manager
132            .load_config(
133                self.config_manager
134                    .get_project_root()
135                    .join("Actr.toml")
136                    .as_path(),
137            )
138            .await?;
139        let dependency_specs = self.dependency_resolver.resolve_spec(&config).await?;
140
141        let mut service_details = Vec::new();
142        for spec in &dependency_specs {
143            match self.service_discovery.get_service_details(&spec.name).await {
144                Ok(details) => service_details.push(details),
145                Err(_) => {
146                    // Service might not be available, continue without details
147                }
148            }
149        }
150
151        let resolved_dependencies = self
152            .dependency_resolver
153            .resolve_dependencies(&dependency_specs, &service_details)
154            .await?;
155
156        // 3. 冲突检查
157        let conflicts = self
158            .dependency_resolver
159            .check_conflicts(&resolved_dependencies)
160            .await?;
161
162        let dependency_validation = self.validate_dependencies(&dependency_specs).await?;
163        let network_validation = self
164            .validate_network_connectivity(&resolved_dependencies)
165            .await?;
166        let fingerprint_validation = self.validate_fingerprints(&resolved_dependencies).await?;
167
168        let is_valid = config_validation.is_valid
169            && dependency_validation.iter().all(|d| d.is_available)
170            && network_validation.iter().all(|n| n.is_reachable)
171            && fingerprint_validation.iter().all(|f| f.is_valid)
172            && conflicts.is_empty();
173
174        Ok(ValidationReport {
175            is_valid,
176            config_validation,
177            dependency_validation,
178            network_validation,
179            fingerprint_validation,
180            conflicts,
181        })
182    }
183
184    /// 验证特定依赖列表
185    /// Note: Multiple aliases pointing to the same service name will be deduplicated
186    pub async fn validate_dependencies(
187        &self,
188        specs: &[DependencySpec],
189    ) -> Result<Vec<DependencyValidation>> {
190        use std::collections::HashMap;
191
192        let mut results = Vec::new();
193        // Cache validation results by service name to avoid duplicate checks
194        let mut validation_cache: HashMap<String, (bool, Option<String>)> = HashMap::new();
195
196        for spec in specs {
197            // Check cache first - if we already validated this service name, reuse the result
198            let (is_available, error) = if let Some(cached) = validation_cache.get(&spec.name) {
199                cached.clone()
200            } else {
201                // Perform validation
202                let (available, err) = match self
203                    .service_discovery
204                    .check_service_availability(&spec.name)
205                    .await
206                {
207                    Ok(status) => {
208                        if status.is_available {
209                            (true, None)
210                        } else {
211                            // Provide meaningful error when service is not found
212                            (
213                                false,
214                                Some(format!("Service '{}' not found in registry", spec.name)),
215                            )
216                        }
217                    }
218                    Err(e) => (false, Some(e.to_string())),
219                };
220
221                // Cache the result for this service name
222                validation_cache.insert(spec.name.clone(), (available, err.clone()));
223                (available, err)
224            };
225
226            results.push(DependencyValidation {
227                dependency: spec.alias.clone(),
228                is_available,
229                error,
230            });
231        }
232
233        Ok(results)
234    }
235
236    /// 网络连通性验证
237    async fn validate_network_connectivity(
238        &self,
239        deps: &[ResolvedDependency],
240    ) -> Result<Vec<NetworkValidation>> {
241        let names = deps.iter().map(|d| d.spec.name.clone()).collect::<Vec<_>>();
242        let network_results = self.network_validator.batch_check(&names).await?;
243
244        Ok(network_results
245            .into_iter()
246            .map(|result| NetworkValidation {
247                is_reachable: result.connectivity.is_reachable,
248                health: result.health,
249                latency_ms: result.connectivity.response_time_ms,
250                error: result.connectivity.error,
251            })
252            .collect())
253    }
254
255    /// 指纹验证
256    async fn validate_fingerprints(
257        &self,
258        deps: &[ResolvedDependency],
259    ) -> Result<Vec<FingerprintValidation>> {
260        let mut results = Vec::new();
261
262        for dep in deps {
263            let expected = Fingerprint {
264                algorithm: "sha256".to_string(),
265                value: dep.fingerprint.clone(),
266            };
267
268            // 计算实际指纹(如果 resolved_dependencies 中没有指纹,从远程获取)
269            let actual_fp = if dep.fingerprint.is_empty() {
270                match self
271                    .service_discovery
272                    .get_service_details(&dep.spec.name)
273                    .await
274                {
275                    Ok(details) => {
276                        let computed = self
277                            .fingerprint_validator
278                            .compute_service_fingerprint(&details.info)
279                            .await?;
280                        Some(computed)
281                    }
282                    Err(e) => {
283                        results.push(FingerprintValidation {
284                            dependency: dep.spec.alias.clone(),
285                            expected,
286                            actual: None,
287                            is_valid: false,
288                            error: Some(e.to_string()),
289                        });
290                        continue;
291                    }
292                }
293            } else {
294                // 已有指纹,无需重新计算
295                None
296            };
297
298            let is_valid = if expected.value.is_empty() {
299                true
300            } else if let Some(ref computed) = actual_fp {
301                self.fingerprint_validator
302                    .verify_fingerprint(&expected, computed)
303                    .await
304                    .unwrap_or(false)
305            } else {
306                // 指纹已匹配(来自 resolve_dependencies)
307                true
308            };
309
310            results.push(FingerprintValidation {
311                dependency: dep.spec.alias.clone(),
312                expected,
313                actual: actual_fp,
314                is_valid,
315                error: None,
316            });
317        }
318
319        Ok(results)
320    }
321}
322
// ============================================================================
// 2. Install pipeline (InstallPipeline)
// ============================================================================
326
/// Install pipeline, built on top of [`ValidationPipeline`].
pub struct InstallPipeline {
    /// Validation pipeline used for the pre-install checks and service lookups.
    validation_pipeline: ValidationPipeline,
    /// Backs up/restores the configuration, records dependencies and knows the project root.
    config_manager: Arc<dyn ConfigManager>,
    /// Stores the proto files of installed services.
    cache_manager: Arc<dyn CacheManager>,
    // Not read by any method in this file, hence the lint suppression.
    #[allow(dead_code)]
    proto_processor: Arc<dyn ProtoProcessor>,
}
335
336impl InstallPipeline {
337    pub fn new(
338        validation_pipeline: ValidationPipeline,
339        config_manager: Arc<dyn ConfigManager>,
340        cache_manager: Arc<dyn CacheManager>,
341        proto_processor: Arc<dyn ProtoProcessor>,
342    ) -> Self {
343        Self {
344            validation_pipeline,
345            config_manager,
346            cache_manager,
347            proto_processor,
348        }
349    }
350
351    /// Get validation pipeline reference
352    pub fn validation_pipeline(&self) -> &ValidationPipeline {
353        &self.validation_pipeline
354    }
355
356    /// Get config manager reference
357    pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
358        &self.config_manager
359    }
360
361    /// Check-First 安装流程
362    pub async fn install_dependencies(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
363        // 🔍 阶段1: 完整验证 (复用ValidationPipeline)
364        let validation_report = self
365            .validation_pipeline
366            .validate_dependencies(specs)
367            .await?;
368
369        // 检查验证结果
370        let failed_validations: Vec<_> = validation_report
371            .iter()
372            .filter(|v| !v.is_available)
373            .collect();
374
375        if !failed_validations.is_empty() {
376            return Err(anyhow::anyhow!(
377                "依赖验证失败: {}",
378                failed_validations
379                    .iter()
380                    .map(|v| format!(
381                        "{}: {}",
382                        v.dependency,
383                        v.error.as_deref().unwrap_or("unknown error")
384                    ))
385                    .collect::<Vec<_>>()
386                    .join(", ")
387            ));
388        }
389
390        // 📝 阶段2: 原子性安装
391        let backup = self.config_manager.backup_config().await?;
392
393        match self.execute_atomic_install(specs).await {
394            Ok(result) => {
395                // 安装成功,清理备份
396                self.config_manager.remove_backup(backup).await?;
397                Ok(result)
398            }
399            Err(e) => {
400                // 安装失败,恢复备份
401                self.config_manager.restore_backup(backup).await?;
402                Err(e)
403            }
404        }
405    }
406
407    /// 原子性安装执行
408    /// Note: Multiple aliases pointing to the same service will be deduplicated -
409    /// only one entry per unique service name will be installed and recorded in lock file
410    async fn execute_atomic_install(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
411        use std::collections::HashSet;
412
413        let mut result = InstallResult::success();
414        let mut installed_services: HashSet<String> = HashSet::new();
415
416        for spec in specs {
417            // Skip if we already installed this service (by name)
418            if installed_services.contains(&spec.name) {
419                tracing::debug!(
420                    "Skipping duplicate service '{}' (alias: '{}')",
421                    spec.name,
422                    spec.alias
423                );
424                continue;
425            }
426
427            // 1. 获取服务详情(在更新配置之前,确保我们有完整的 actr_type)
428            let service_details = self
429                .validation_pipeline
430                .service_discovery
431                .get_service_details(&spec.name)
432                .await?;
433
434            // 2. 构建 resolved_spec,确保包含 actr_type
435            let mut resolved_spec = spec.clone();
436            // 如果 spec 中没有 actr_type,使用从服务详情中获取的
437            if resolved_spec.actr_type.is_none() {
438                resolved_spec.actr_type = Some(service_details.info.actr_type.clone());
439            }
440
441            // 3. 更新配置文件(使用包含 actr_type 的 resolved_spec)
442            self.config_manager
443                .update_dependency(&resolved_spec)
444                .await?;
445            result.updated_config = true;
446
447            // 4. 缓存Proto文件
448            self.cache_manager
449                .cache_proto(&spec.name, &service_details.proto_files)
450                .await?;
451
452            result.cache_updates += 1;
453
454            // 5. 记录已安装的依赖
455
456            let resolved_dep = ResolvedDependency {
457                spec: resolved_spec,
458                fingerprint: service_details.info.fingerprint,
459                proto_files: service_details.proto_files,
460            };
461            result.installed_dependencies.push(resolved_dep);
462
463            // Mark this service as installed
464            installed_services.insert(spec.name.clone());
465        }
466
467        // 4. 更新锁文件 (lock file also deduplicates by name)
468        self.update_lock_file(&result.installed_dependencies)
469            .await?;
470        result.updated_lock_file = true;
471
472        Ok(result)
473    }
474
475    /// Update lock file with new format (no embedded proto content)
476    async fn update_lock_file(&self, dependencies: &[ResolvedDependency]) -> Result<()> {
477        let project_root = self.config_manager.get_project_root();
478        let lock_file_path = project_root.join("Actr.lock.toml");
479
480        // Load existing lock file or create new one
481        let mut lock_file = if lock_file_path.exists() {
482            LockFile::from_file(&lock_file_path).unwrap_or_else(|_| LockFile::new())
483        } else {
484            LockFile::new()
485        };
486
487        // Update dependencies
488        for dep in dependencies {
489            let service_name = dep.spec.name.clone();
490
491            // Create protobuf entries with relative path (no content)
492            let protobufs: Vec<ProtoFileMeta> = dep
493                .proto_files
494                .iter()
495                .map(|pf| {
496                    let file_name = if pf.name.ends_with(".proto") {
497                        pf.name.clone()
498                    } else {
499                        format!("{}.proto", pf.name)
500                    };
501                    // Path relative to proto/remote/ (e.g., "service_name/file.proto")
502                    let path = format!("{}/{}", service_name, file_name);
503
504                    ProtoFileMeta {
505                        path,
506                        fingerprint: String::new(), // TODO: compute semantic fingerprint
507                    }
508                })
509                .collect();
510
511            // Create service spec metadata
512            let spec = ServiceSpecMeta {
513                name: dep.spec.name.clone(),
514                description: None,
515                fingerprint: dep.fingerprint.clone(),
516                protobufs,
517                published_at: None,
518                tags: Vec::new(),
519            };
520
521            // Create locked dependency
522            let actr_type = dep.spec.actr_type.clone().ok_or_else(|| {
523                anyhow::anyhow!("Actr type is required for dependency: {}", service_name)
524            })?;
525            let locked_dep = LockedDependency::new(actr_type.to_string_repr(), spec);
526            lock_file.add_dependency(locked_dep);
527        }
528
529        // Update timestamp and save
530        lock_file.update_timestamp();
531        lock_file.save_to_file(&lock_file_path)?;
532
533        tracing::info!("Updated lock file: {} dependencies", dependencies.len());
534        Ok(())
535    }
536}
537
// ============================================================================
// 3. Generation pipeline (GenerationPipeline)
// ============================================================================
541
/// Code generation pipeline.
pub struct GenerationPipeline {
    // Not read by any method in this file, hence the lint suppression.
    #[allow(dead_code)]
    config_manager: Arc<dyn ConfigManager>,
    /// Discovers proto files, validates their syntax and generates the code.
    proto_processor: Arc<dyn ProtoProcessor>,
    // Not read yet; loading dependency protos from the cache is still a TODO
    // in `load_dependency_protos`.
    #[allow(dead_code)]
    cache_manager: Arc<dyn CacheManager>,
}
550
551impl GenerationPipeline {
552    pub fn new(
553        config_manager: Arc<dyn ConfigManager>,
554        proto_processor: Arc<dyn ProtoProcessor>,
555        cache_manager: Arc<dyn CacheManager>,
556    ) -> Self {
557        Self {
558            config_manager,
559            proto_processor,
560            cache_manager,
561        }
562    }
563
564    /// 执行代码生成
565    pub async fn generate_code(&self, options: &GenerationOptions) -> Result<GenerationResult> {
566        // 1. 清理输出目录(如果需要)
567        if options.clean_before_generate {
568            self.clean_output_directory(&options.output_path).await?;
569        }
570
571        // 2. 发现本地Proto文件
572        let local_protos = self
573            .proto_processor
574            .discover_proto_files(&options.input_path)
575            .await?;
576
577        // 3. 加载依赖的Proto文件
578        let dependency_protos = self.load_dependency_protos().await?;
579
580        // 4. 验证Proto语法
581        let all_protos = [local_protos, dependency_protos].concat();
582        let validation = self
583            .proto_processor
584            .validate_proto_syntax(&all_protos)
585            .await?;
586
587        if !validation.is_valid {
588            return Err(anyhow::anyhow!("Proto file syntax validation failed"));
589        }
590
591        // 5. 执行代码生成
592        let mut generation_result = self
593            .proto_processor
594            .generate_code(&options.input_path, &options.output_path)
595            .await?;
596
597        // 6. 后处理:格式化和检查
598        if options.format_code {
599            self.format_generated_code(&generation_result.generated_files)
600                .await?;
601        }
602
603        if options.run_checks {
604            let check_result = self
605                .run_code_checks(&generation_result.generated_files)
606                .await?;
607            generation_result.warnings.extend(check_result.warnings);
608            generation_result.errors.extend(check_result.errors);
609        }
610
611        Ok(generation_result)
612    }
613
614    /// 清理输出目录
615    async fn clean_output_directory(&self, output_path: &std::path::Path) -> Result<()> {
616        if output_path.exists() {
617            std::fs::remove_dir_all(output_path)?;
618        }
619        std::fs::create_dir_all(output_path)?;
620        Ok(())
621    }
622
623    /// 加载依赖的Proto文件
624    async fn load_dependency_protos(&self) -> Result<Vec<ProtoFile>> {
625        // TODO: 从缓存中加载依赖的Proto文件
626        Ok(Vec::new())
627    }
628
629    /// 格式化生成的代码
630    async fn format_generated_code(&self, files: &[std::path::PathBuf]) -> Result<()> {
631        for file in files {
632            if file.extension().and_then(|s| s.to_str()) == Some("rs") {
633                // 运行 rustfmt
634                let output = std::process::Command::new("rustfmt").arg(file).output()?;
635
636                if !output.status.success() {
637                    eprintln!(
638                        "rustfmt warning: {}",
639                        String::from_utf8_lossy(&output.stderr)
640                    );
641                }
642            }
643        }
644        Ok(())
645    }
646
647    /// 运行代码检查
648    async fn run_code_checks(&self, files: &[std::path::PathBuf]) -> Result<GenerationResult> {
649        // TODO: 运行 cargo check 或其他代码检查工具
650        Ok(GenerationResult {
651            generated_files: files.to_vec(),
652            warnings: vec![],
653            errors: vec![],
654        })
655    }
656}