1use actr_config::{LockFile, LockedDependency, ProtoFileMeta, ServiceSpecMeta};
6use actr_protocol::ActrTypeExt;
7use anyhow::Result;
8use std::sync::Arc;
9
10use super::components::*;
11
12#[derive(Debug, Clone)]
18pub struct InstallResult {
19 pub installed_dependencies: Vec<ResolvedDependency>,
20 pub updated_config: bool,
21 pub updated_lock_file: bool,
22 pub cache_updates: usize,
23 pub warnings: Vec<String>,
24}
25
26impl InstallResult {
27 pub fn success() -> Self {
28 Self {
29 installed_dependencies: Vec::new(),
30 updated_config: false,
31 updated_lock_file: false,
32 cache_updates: 0,
33 warnings: Vec::new(),
34 }
35 }
36
37 pub fn summary(&self) -> String {
38 format!(
39 "Installed {} dependencies, updated {} cache entries",
40 self.installed_dependencies.len(),
41 self.cache_updates
42 )
43 }
44}
45
46#[derive(Debug, Clone)]
48pub struct InstallPlan {
49 pub dependencies_to_install: Vec<DependencySpec>,
50 pub resolved_dependencies: Vec<ResolvedDependency>,
51 pub estimated_cache_size: u64,
52 pub required_permissions: Vec<String>,
53}
54
55#[derive(Debug, Clone)]
57pub struct GenerationOptions {
58 pub input_path: std::path::PathBuf,
59 pub output_path: std::path::PathBuf,
60 pub clean_before_generate: bool,
61 pub generate_scaffold: bool,
62 pub format_code: bool,
63 pub run_checks: bool,
64}
65
66#[derive(Clone)]
72pub struct ValidationPipeline {
73 config_manager: Arc<dyn ConfigManager>,
74 dependency_resolver: Arc<dyn DependencyResolver>,
75 service_discovery: Arc<dyn ServiceDiscovery>,
76 network_validator: Arc<dyn NetworkValidator>,
77 fingerprint_validator: Arc<dyn FingerprintValidator>,
78}
79
80impl ValidationPipeline {
81 pub fn new(
82 config_manager: Arc<dyn ConfigManager>,
83 dependency_resolver: Arc<dyn DependencyResolver>,
84 service_discovery: Arc<dyn ServiceDiscovery>,
85 network_validator: Arc<dyn NetworkValidator>,
86 fingerprint_validator: Arc<dyn FingerprintValidator>,
87 ) -> Self {
88 Self {
89 config_manager,
90 dependency_resolver,
91 service_discovery,
92 network_validator,
93 fingerprint_validator,
94 }
95 }
96
97 pub fn service_discovery(&self) -> &Arc<dyn ServiceDiscovery> {
99 &self.service_discovery
100 }
101
102 pub fn network_validator(&self) -> &Arc<dyn NetworkValidator> {
104 &self.network_validator
105 }
106
107 pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
109 &self.config_manager
110 }
111
112 pub async fn validate_project(&self) -> Result<ValidationReport> {
114 let config_validation = self.config_manager.validate_config().await?;
116
117 if !config_validation.is_valid {
119 return Ok(ValidationReport {
120 is_valid: false,
121 config_validation,
122 dependency_validation: vec![],
123 network_validation: vec![],
124 fingerprint_validation: vec![],
125 conflicts: vec![],
126 });
127 }
128
129 let config = self
131 .config_manager
132 .load_config(
133 self.config_manager
134 .get_project_root()
135 .join("Actr.toml")
136 .as_path(),
137 )
138 .await?;
139 let dependency_specs = self.dependency_resolver.resolve_spec(&config).await?;
140
141 let mut service_details = Vec::new();
142 for spec in &dependency_specs {
143 match self.service_discovery.get_service_details(&spec.name).await {
144 Ok(details) => service_details.push(details),
145 Err(_) => {
146 }
148 }
149 }
150
151 let resolved_dependencies = self
152 .dependency_resolver
153 .resolve_dependencies(&dependency_specs, &service_details)
154 .await?;
155
156 let conflicts = self
158 .dependency_resolver
159 .check_conflicts(&resolved_dependencies)
160 .await?;
161
162 let dependency_validation = self.validate_dependencies(&dependency_specs).await?;
163 let network_validation = self
164 .validate_network_connectivity(&resolved_dependencies, &NetworkCheckOptions::default())
165 .await?;
166 let fingerprint_validation = self.validate_fingerprints(&resolved_dependencies).await?;
167
168 let is_valid = config_validation.is_valid
169 && dependency_validation.iter().all(|d| d.is_available)
170 && network_validation
171 .iter()
172 .all(|n| !n.is_applicable || n.is_reachable)
173 && fingerprint_validation.iter().all(|f| f.is_valid)
174 && conflicts.is_empty();
175
176 Ok(ValidationReport {
177 is_valid,
178 config_validation,
179 dependency_validation,
180 network_validation,
181 fingerprint_validation,
182 conflicts,
183 })
184 }
185
186 pub async fn validate_dependencies(
189 &self,
190 specs: &[DependencySpec],
191 ) -> Result<Vec<DependencyValidation>> {
192 use std::collections::HashMap;
193
194 let mut results = Vec::new();
195 let mut validation_cache: HashMap<String, (bool, Option<String>)> = HashMap::new();
197
198 for spec in specs {
199 let (is_available, error) = if let Some(cached) = validation_cache.get(&spec.name) {
201 cached.clone()
202 } else {
203 let (available, err) = match self
205 .service_discovery
206 .check_service_availability(&spec.name)
207 .await
208 {
209 Ok(status) => {
210 if status.is_available {
211 (true, None)
212 } else {
213 (
215 false,
216 Some(format!("Service '{}' not found in registry", spec.name)),
217 )
218 }
219 }
220 Err(e) => (false, Some(e.to_string())),
221 };
222
223 validation_cache.insert(spec.name.clone(), (available, err.clone()));
225 (available, err)
226 };
227
228 results.push(DependencyValidation {
229 dependency: spec.alias.clone(),
230 is_available,
231 error,
232 });
233 }
234
235 Ok(results)
236 }
237
238 pub async fn validate_network_connectivity(
240 &self,
241 deps: &[ResolvedDependency],
242 options: &NetworkCheckOptions,
243 ) -> Result<Vec<NetworkValidation>> {
244 let names = deps.iter().map(|d| d.spec.name.clone()).collect::<Vec<_>>();
245 let network_results = self.network_validator.batch_check(&names, options).await?;
246
247 Ok(network_results
248 .into_iter()
249 .map(|result| {
250 let mut is_applicable = true;
251 let mut error = result.connectivity.error;
252 let mut health = result.health;
253 let mut latency_ms = result.connectivity.response_time_ms;
254
255 if let Some(ref message) = error
256 && message.starts_with("Address resolution failed: Invalid address format")
257 {
258 is_applicable = false;
259 error =
260 Some("Network check skipped: no endpoint address available".to_string());
261 health = HealthStatus::Unknown;
262 latency_ms = None;
263 }
264
265 NetworkValidation {
266 is_reachable: result.connectivity.is_reachable,
267 health,
268 latency_ms,
269 error,
270 is_applicable,
271 }
272 })
273 .collect())
274 }
275
276 pub async fn validate_fingerprints(
278 &self,
279 deps: &[ResolvedDependency],
280 ) -> Result<Vec<FingerprintValidation>> {
281 let mut results = Vec::new();
282
283 for dep in deps {
284 let expected_val = dep.spec.fingerprint.clone().unwrap_or_default();
285 let expected = Fingerprint {
286 algorithm: "sha256".to_string(),
287 value: expected_val,
288 };
289
290 let actual_fp = if dep.fingerprint.is_empty() {
292 match self
293 .service_discovery
294 .get_service_details(&dep.spec.name)
295 .await
296 {
297 Ok(details) => {
298 let computed = self
299 .fingerprint_validator
300 .compute_service_fingerprint(&details.info)
301 .await?;
302 Some(computed)
303 }
304 Err(e) => {
305 results.push(FingerprintValidation {
306 dependency: dep.spec.alias.clone(),
307 expected,
308 actual: None,
309 is_valid: false,
310 error: Some(e.to_string()),
311 });
312 continue;
313 }
314 }
315 } else {
316 None
318 };
319
320 let is_valid = if expected.value.is_empty() {
321 true
322 } else if let Some(ref computed) = actual_fp {
323 self.fingerprint_validator
324 .verify_fingerprint(&expected, computed)
325 .await
326 .unwrap_or(false)
327 } else {
328 true
330 };
331
332 results.push(FingerprintValidation {
333 dependency: dep.spec.alias.clone(),
334 expected,
335 actual: actual_fp,
336 is_valid,
337 error: None,
338 });
339 }
340
341 Ok(results)
342 }
343}
344
345pub struct InstallPipeline {
351 validation_pipeline: ValidationPipeline,
352 config_manager: Arc<dyn ConfigManager>,
353 cache_manager: Arc<dyn CacheManager>,
354 #[allow(dead_code)]
355 proto_processor: Arc<dyn ProtoProcessor>,
356}
357
358impl InstallPipeline {
359 pub fn new(
360 validation_pipeline: ValidationPipeline,
361 config_manager: Arc<dyn ConfigManager>,
362 cache_manager: Arc<dyn CacheManager>,
363 proto_processor: Arc<dyn ProtoProcessor>,
364 ) -> Self {
365 Self {
366 validation_pipeline,
367 config_manager,
368 cache_manager,
369 proto_processor,
370 }
371 }
372
373 pub fn validation_pipeline(&self) -> &ValidationPipeline {
375 &self.validation_pipeline
376 }
377
378 pub fn config_manager(&self) -> &Arc<dyn ConfigManager> {
380 &self.config_manager
381 }
382
383 pub async fn install_dependencies(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
385 let validation_report = self
387 .validation_pipeline
388 .validate_dependencies(specs)
389 .await?;
390
391 let failed_validations: Vec<_> = validation_report
393 .iter()
394 .filter(|v| !v.is_available)
395 .collect();
396
397 if !failed_validations.is_empty() {
398 return Err(anyhow::anyhow!(
399 "依赖验证失败: {}",
400 failed_validations
401 .iter()
402 .map(|v| format!(
403 "{}: {}",
404 v.dependency,
405 v.error.as_deref().unwrap_or("unknown error")
406 ))
407 .collect::<Vec<_>>()
408 .join(", ")
409 ));
410 }
411
412 let backup = self.config_manager.backup_config().await?;
414
415 match self.execute_atomic_install(specs).await {
416 Ok(result) => {
417 self.config_manager.remove_backup(backup).await?;
419 Ok(result)
420 }
421 Err(e) => {
422 self.config_manager.restore_backup(backup).await?;
424 Err(e)
425 }
426 }
427 }
428
429 async fn execute_atomic_install(&self, specs: &[DependencySpec]) -> Result<InstallResult> {
433 use std::collections::HashSet;
434
435 let mut result = InstallResult::success();
436 let mut installed_services: HashSet<String> = HashSet::new();
437
438 for spec in specs {
439 if installed_services.contains(&spec.name) {
441 tracing::debug!(
442 "Skipping duplicate service '{}' (alias: '{}')",
443 spec.name,
444 spec.alias
445 );
446 continue;
447 }
448
449 let service_details = self
451 .validation_pipeline
452 .service_discovery
453 .get_service_details(&spec.name)
454 .await?;
455
456 let mut resolved_spec = spec.clone();
458 if resolved_spec.actr_type.is_none() {
460 resolved_spec.actr_type = Some(service_details.info.actr_type.clone());
461 }
462
463 self.config_manager
465 .update_dependency(&resolved_spec)
466 .await?;
467 result.updated_config = true;
468
469 self.cache_manager
471 .cache_proto(&spec.name, &service_details.proto_files)
472 .await?;
473
474 result.cache_updates += 1;
475
476 let resolved_dep = ResolvedDependency {
479 spec: resolved_spec,
480 fingerprint: service_details.info.fingerprint,
481 proto_files: service_details.proto_files,
482 };
483 result.installed_dependencies.push(resolved_dep);
484
485 installed_services.insert(spec.name.clone());
487 }
488
489 self.update_lock_file(&result.installed_dependencies)
491 .await?;
492 result.updated_lock_file = true;
493
494 Ok(result)
495 }
496
497 async fn update_lock_file(&self, dependencies: &[ResolvedDependency]) -> Result<()> {
499 let project_root = self.config_manager.get_project_root();
500 let lock_file_path = project_root.join("Actr.lock.toml");
501
502 let mut lock_file = if lock_file_path.exists() {
504 LockFile::from_file(&lock_file_path).unwrap_or_else(|_| LockFile::new())
505 } else {
506 LockFile::new()
507 };
508
509 for dep in dependencies {
511 let service_name = dep.spec.name.clone();
512
513 let protobufs: Vec<ProtoFileMeta> = dep
515 .proto_files
516 .iter()
517 .map(|pf| {
518 let file_name = if pf.name.ends_with(".proto") {
519 pf.name.clone()
520 } else {
521 format!("{}.proto", pf.name)
522 };
523 let path = format!("{}/{}", service_name, file_name);
525
526 ProtoFileMeta {
527 path,
528 fingerprint: String::new(), }
530 })
531 .collect();
532
533 let spec = ServiceSpecMeta {
535 name: dep.spec.name.clone(),
536 description: None,
537 fingerprint: dep.fingerprint.clone(),
538 protobufs,
539 published_at: None,
540 tags: Vec::new(),
541 };
542
543 let actr_type = dep.spec.actr_type.clone().ok_or_else(|| {
545 anyhow::anyhow!("Actr type is required for dependency: {}", service_name)
546 })?;
547 let locked_dep = LockedDependency::new(actr_type.to_string_repr(), spec);
548 lock_file.add_dependency(locked_dep);
549 }
550
551 lock_file.update_timestamp();
553 lock_file.save_to_file(&lock_file_path)?;
554
555 tracing::info!("Updated lock file: {} dependencies", dependencies.len());
556 Ok(())
557 }
558}
559
560pub struct GenerationPipeline {
566 #[allow(dead_code)]
567 config_manager: Arc<dyn ConfigManager>,
568 proto_processor: Arc<dyn ProtoProcessor>,
569 #[allow(dead_code)]
570 cache_manager: Arc<dyn CacheManager>,
571}
572
573impl GenerationPipeline {
574 pub fn new(
575 config_manager: Arc<dyn ConfigManager>,
576 proto_processor: Arc<dyn ProtoProcessor>,
577 cache_manager: Arc<dyn CacheManager>,
578 ) -> Self {
579 Self {
580 config_manager,
581 proto_processor,
582 cache_manager,
583 }
584 }
585
586 pub async fn generate_code(&self, options: &GenerationOptions) -> Result<GenerationResult> {
588 if options.clean_before_generate {
590 self.clean_output_directory(&options.output_path).await?;
591 }
592
593 let local_protos = self
595 .proto_processor
596 .discover_proto_files(&options.input_path)
597 .await?;
598
599 let dependency_protos = self.load_dependency_protos().await?;
601
602 let all_protos = [local_protos, dependency_protos].concat();
604 let validation = self
605 .proto_processor
606 .validate_proto_syntax(&all_protos)
607 .await?;
608
609 if !validation.is_valid {
610 return Err(anyhow::anyhow!("Proto file syntax validation failed"));
611 }
612
613 let mut generation_result = self
615 .proto_processor
616 .generate_code(&options.input_path, &options.output_path)
617 .await?;
618
619 if options.format_code {
621 self.format_generated_code(&generation_result.generated_files)
622 .await?;
623 }
624
625 if options.run_checks {
626 let check_result = self
627 .run_code_checks(&generation_result.generated_files)
628 .await?;
629 generation_result.warnings.extend(check_result.warnings);
630 generation_result.errors.extend(check_result.errors);
631 }
632
633 Ok(generation_result)
634 }
635
636 async fn clean_output_directory(&self, output_path: &std::path::Path) -> Result<()> {
638 if output_path.exists() {
639 std::fs::remove_dir_all(output_path)?;
640 }
641 std::fs::create_dir_all(output_path)?;
642 Ok(())
643 }
644
645 async fn load_dependency_protos(&self) -> Result<Vec<ProtoFile>> {
647 Ok(Vec::new())
649 }
650
651 async fn format_generated_code(&self, files: &[std::path::PathBuf]) -> Result<()> {
653 for file in files {
654 if file.extension().and_then(|s| s.to_str()) == Some("rs") {
655 let output = std::process::Command::new("rustfmt").arg(file).output()?;
657
658 if !output.status.success() {
659 eprintln!(
660 "rustfmt warning: {}",
661 String::from_utf8_lossy(&output.stderr)
662 );
663 }
664 }
665 }
666 Ok(())
667 }
668
669 async fn run_code_checks(&self, files: &[std::path::PathBuf]) -> Result<GenerationResult> {
671 Ok(GenerationResult {
673 generated_files: files.to_vec(),
674 warnings: vec![],
675 errors: vec![],
676 })
677 }
678}