1use std::collections::{HashMap, HashSet};
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41
42use tokio::task::JoinSet;
43use tracing::{error, info, warn};
44
45use crate::buildah::BuildahExecutor;
46use crate::builder::{BuiltImage, ImageBuilder};
47use crate::error::{BuildError, Result};
48
49use super::types::ZPipeline;
50
51#[derive(Debug)]
53pub struct PipelineResult {
54 pub succeeded: HashMap<String, BuiltImage>,
56 pub failed: HashMap<String, String>,
58 pub total_time_ms: u64,
60}
61
62impl PipelineResult {
63 pub fn is_success(&self) -> bool {
65 self.failed.is_empty()
66 }
67
68 pub fn total_images(&self) -> usize {
70 self.succeeded.len() + self.failed.len()
71 }
72}
73
74pub struct PipelineExecutor {
79 pipeline: ZPipeline,
81 base_dir: PathBuf,
83 executor: BuildahExecutor,
85 fail_fast: bool,
87 push_enabled: bool,
89}
90
91impl PipelineExecutor {
92 pub fn new(pipeline: ZPipeline, base_dir: PathBuf, executor: BuildahExecutor) -> Self {
100 let push_enabled = pipeline.push.after_all;
102
103 Self {
104 pipeline,
105 base_dir,
106 executor,
107 fail_fast: true,
108 push_enabled,
109 }
110 }
111
112 pub fn fail_fast(mut self, fail_fast: bool) -> Self {
118 self.fail_fast = fail_fast;
119 self
120 }
121
122 pub fn push(mut self, enabled: bool) -> Self {
127 self.push_enabled = enabled;
128 self
129 }
130
131 fn resolve_execution_order(&self) -> Result<Vec<Vec<String>>> {
143 let mut waves: Vec<Vec<String>> = Vec::new();
144 let mut assigned: HashSet<String> = HashSet::new();
145 let mut remaining: HashSet<String> = self.pipeline.images.keys().cloned().collect();
146
147 for (name, image) in &self.pipeline.images {
149 for dep in &image.depends_on {
150 if !self.pipeline.images.contains_key(dep) {
151 return Err(BuildError::invalid_instruction(
152 "pipeline",
153 format!("Image '{}' depends on unknown image '{}'", name, dep),
154 ));
155 }
156 }
157 }
158
159 while !remaining.is_empty() {
161 let mut wave: Vec<String> = Vec::new();
162
163 for name in &remaining {
164 let image = &self.pipeline.images[name];
165 let deps_satisfied = image.depends_on.iter().all(|d| assigned.contains(d));
167 if deps_satisfied {
168 wave.push(name.clone());
169 }
170 }
171
172 if wave.is_empty() {
173 return Err(BuildError::CircularDependency {
175 stages: remaining.into_iter().collect(),
176 });
177 }
178
179 for name in &wave {
181 remaining.remove(name);
182 assigned.insert(name.clone());
183 }
184
185 waves.push(wave);
186 }
187
188 Ok(waves)
189 }
190
191 pub async fn run(&self) -> Result<PipelineResult> {
206 let start = std::time::Instant::now();
207 let waves = self.resolve_execution_order()?;
208
209 let mut succeeded: HashMap<String, BuiltImage> = HashMap::new();
210 let mut failed: HashMap<String, String> = HashMap::new();
211
212 info!(
213 "Building {} images in {} waves",
214 self.pipeline.images.len(),
215 waves.len()
216 );
217
218 for (wave_idx, wave) in waves.iter().enumerate() {
219 info!("Wave {}: {:?}", wave_idx, wave);
220
221 if self.fail_fast && !failed.is_empty() {
223 warn!("Aborting pipeline due to previous failures (fail_fast enabled)");
224 break;
225 }
226
227 let wave_results = self.build_wave(wave).await;
229
230 for (name, result) in wave_results {
232 match result {
233 Ok(image) => {
234 info!("[{}] Build succeeded: {}", name, image.image_id);
235 succeeded.insert(name, image);
236 }
237 Err(e) => {
238 error!("[{}] Build failed: {}", name, e);
239 failed.insert(name.clone(), e.to_string());
240
241 if self.fail_fast {
242 return Err(e);
244 }
245 }
246 }
247 }
248 }
249
250 if self.push_enabled && failed.is_empty() {
252 info!("Pushing {} images", succeeded.len());
253
254 for (name, image) in &succeeded {
255 for tag in &image.tags {
256 if let Err(e) = self.push_image(tag).await {
257 warn!("[{}] Failed to push {}: {}", name, tag, e);
258 } else {
261 info!("[{}] Pushed: {}", name, tag);
262 }
263 }
264 }
265 }
266
267 Ok(PipelineResult {
268 succeeded,
269 failed,
270 total_time_ms: start.elapsed().as_millis() as u64,
271 })
272 }
273
274 async fn build_wave(&self, wave: &[String]) -> Vec<(String, Result<BuiltImage>)> {
278 let pipeline = Arc::new(self.pipeline.clone());
280 let base_dir = Arc::new(self.base_dir.clone());
281 let executor = self.executor.clone();
282
283 let mut set = JoinSet::new();
284
285 for name in wave {
286 let name = name.clone();
287 let pipeline = Arc::clone(&pipeline);
288 let base_dir = Arc::clone(&base_dir);
289 let executor = executor.clone();
290
291 set.spawn(async move {
292 let result = build_single_image(&name, &pipeline, &base_dir, executor).await;
293 (name, result)
294 });
295 }
296
297 let mut results = Vec::new();
299 while let Some(join_result) = set.join_next().await {
300 match join_result {
301 Ok((name, result)) => {
302 results.push((name, result));
303 }
304 Err(e) => {
305 error!("Build task panicked: {}", e);
307 results.push((
308 "unknown".to_string(),
309 Err(BuildError::invalid_instruction(
310 "pipeline",
311 format!("Build task panicked: {}", e),
312 )),
313 ));
314 }
315 }
316 }
317
318 results
319 }
320
321 async fn push_image(&self, tag: &str) -> Result<()> {
323 use crate::buildah::BuildahCommand;
324
325 let cmd = BuildahCommand::push(tag);
326 self.executor.execute_checked(&cmd).await?;
327 Ok(())
328 }
329}
330
331async fn build_single_image(
336 name: &str,
337 pipeline: &ZPipeline,
338 base_dir: &Path,
339 executor: BuildahExecutor,
340) -> Result<BuiltImage> {
341 let image_config = &pipeline.images[name];
342 let context = base_dir.join(&image_config.context);
343 let file_path = base_dir.join(&image_config.file);
344
345 let mut builder = ImageBuilder::with_executor(&context, executor)?;
346
347 let file_name = file_path
349 .file_name()
350 .map(|n| n.to_string_lossy().to_string())
351 .unwrap_or_default();
352 let extension = file_path
353 .extension()
354 .map(|e| e.to_string_lossy().to_string())
355 .unwrap_or_default();
356
357 if extension == "yaml" || extension == "yml" || file_name.starts_with("ZImagefile") {
358 builder = builder.zimagefile(&file_path);
359 } else {
360 builder = builder.dockerfile(&file_path);
361 }
362
363 for tag in &image_config.tags {
365 let expanded = expand_tag_with_vars(tag, &pipeline.vars);
366 builder = builder.tag(expanded);
367 }
368
369 let mut args = pipeline.defaults.build_args.clone();
371 args.extend(image_config.build_args.clone());
372 builder = builder.build_args(args);
373
374 if let Some(fmt) = image_config
376 .format
377 .as_ref()
378 .or(pipeline.defaults.format.as_ref())
379 {
380 builder = builder.format(fmt);
381 }
382
383 if image_config.no_cache.unwrap_or(pipeline.defaults.no_cache) {
385 builder = builder.no_cache();
386 }
387
388 let mut cache_mounts = pipeline.defaults.cache_mounts.clone();
390 cache_mounts.extend(image_config.cache_mounts.clone());
391 if !cache_mounts.is_empty() {
392 let run_mounts: Vec<_> = cache_mounts
393 .iter()
394 .map(crate::zimage::convert_cache_mount)
395 .collect();
396 builder = builder.default_cache_mounts(run_mounts);
397 }
398
399 let retries = image_config
401 .retries
402 .or(pipeline.defaults.retries)
403 .unwrap_or(0);
404 if retries > 0 {
405 builder = builder.retries(retries);
406 }
407
408 builder.build().await
409}
410
/// Expands `${NAME}` placeholders in `tag` using `vars`.
///
/// The tag is scanned once from left to right. A placeholder whose name is a
/// key of `vars` is replaced by the corresponding value; placeholders with an
/// unknown name and an unterminated `${` are copied through unchanged. Inserted
/// values are never scanned again, so the result does not depend on the
/// iteration order of `vars` even when a value itself contains `${...}`.
///
/// A name ends at the first `}` after `${`, so a variable whose name contains
/// `}` is never matched.
fn expand_tag_with_vars(tag: &str, vars: &HashMap<String, String>) -> String {
    const OPEN: &str = "${";

    let mut result = String::with_capacity(tag.len());
    let mut rest = tag;

    while let Some(start) = rest.find(OPEN) {
        result.push_str(&rest[..start]);
        let after_open = &rest[start + OPEN.len()..];

        let Some(end) = after_open.find('}') else {
            // No closing brace anywhere: nothing left to expand.
            result.push_str(&rest[start..]);
            return result;
        };

        match vars.get(&after_open[..end]) {
            Some(value) => {
                result.push_str(value);
                rest = &after_open[end + 1..];
            }
            None => {
                // Unknown name: keep the "${" literally and resume right after
                // it, so a placeholder nested inside is still found.
                result.push_str(OPEN);
                rest = after_open;
            }
        }
    }

    result.push_str(rest);
    result
}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::parse_pipeline;

    /// Parses `yaml` and wraps it in an executor rooted at `/tmp`.
    fn executor_for(yaml: &str) -> PipelineExecutor {
        PipelineExecutor::new(
            parse_pipeline(yaml).unwrap(),
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        )
    }

    /// A result with no images recorded and a fixed duration.
    fn empty_result() -> PipelineResult {
        PipelineResult {
            succeeded: HashMap::new(),
            failed: HashMap::new(),
            total_time_ms: 100,
        }
    }

    // A single image without dependencies forms exactly one wave.
    #[test]
    fn test_resolve_execution_order_simple() {
        let plan = executor_for(
            r#"
images:
  app:
    file: Dockerfile
"#,
        )
        .resolve_execution_order()
        .unwrap();

        assert_eq!(plan.len(), 1);
        assert_eq!(plan[0], vec!["app"]);
    }

    // A linear chain yields one wave per image, in dependency order.
    #[test]
    fn test_resolve_execution_order_with_deps() {
        let plan = executor_for(
            r#"
images:
  base:
    file: Dockerfile.base
  app:
    file: Dockerfile.app
    depends_on: [base]
  test:
    file: Dockerfile.test
    depends_on: [app]
"#,
        )
        .resolve_execution_order()
        .unwrap();

        assert_eq!(plan.len(), 3);
        assert_eq!(plan[0], vec!["base"]);
        assert_eq!(plan[1], vec!["app"]);
        assert_eq!(plan[2], vec!["test"]);
    }

    // Images sharing the same dependency land in the same wave.
    #[test]
    fn test_resolve_execution_order_parallel() {
        let plan = executor_for(
            r#"
images:
  base:
    file: Dockerfile.base
  app1:
    file: Dockerfile.app1
    depends_on: [base]
  app2:
    file: Dockerfile.app2
    depends_on: [base]
"#,
        )
        .resolve_execution_order()
        .unwrap();

        assert_eq!(plan.len(), 2);
        assert_eq!(plan[0], vec!["base"]);

        // Membership rather than position: order inside a wave is not asserted.
        let second = &plan[1];
        assert_eq!(second.len(), 2);
        assert!(second.contains(&"app1".to_string()));
        assert!(second.contains(&"app2".to_string()));
    }

    // A dependency on an undefined image is rejected and named in the error.
    #[test]
    fn test_resolve_execution_order_missing_dep() {
        let outcome = executor_for(
            r#"
images:
  app:
    file: Dockerfile
    depends_on: [missing]
"#,
        )
        .resolve_execution_order();

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("missing"));
    }

    // Two images depending on each other are reported as a cycle.
    #[test]
    fn test_resolve_execution_order_circular() {
        let outcome = executor_for(
            r#"
images:
  a:
    file: Dockerfile.a
    depends_on: [b]
  b:
    file: Dockerfile.b
    depends_on: [a]
"#,
        )
        .resolve_execution_order();

        assert!(outcome.is_err());
        match outcome.unwrap_err() {
            BuildError::CircularDependency { stages } => {
                for expected in ["a", "b"] {
                    assert!(stages.contains(&expected.to_string()));
                }
            }
            e => panic!("Expected CircularDependency error, got: {:?}", e),
        }
    }

    // Every known variable in a tag is substituted.
    #[test]
    fn test_expand_tag() {
        let vars: HashMap<String, String> = vec![
            ("VERSION".to_string(), "1.0.0".to_string()),
            ("REGISTRY".to_string(), "ghcr.io/myorg".to_string()),
        ]
        .into_iter()
        .collect();

        assert_eq!(
            expand_tag_with_vars("${REGISTRY}/app:${VERSION}", &vars),
            "ghcr.io/myorg/app:1.0.0"
        );
    }

    // Unknown variables are left in the tag untouched.
    #[test]
    fn test_expand_tag_partial() {
        let vars: HashMap<String, String> =
            std::iter::once(("VERSION".to_string(), "1.0.0".to_string())).collect();

        assert_eq!(
            expand_tag_with_vars("myapp:${VERSION}-${UNKNOWN}", &vars),
            "myapp:1.0.0-${UNKNOWN}"
        );
    }

    // Success is defined purely by the absence of failed images.
    #[test]
    fn test_pipeline_result_is_success() {
        let mut outcome = empty_result();
        assert!(outcome.is_success());

        outcome
            .failed
            .insert("app".to_string(), "error".to_string());
        assert!(!outcome.is_success());
    }

    // The total counts successful and failed images together.
    #[test]
    fn test_pipeline_result_total_images() {
        let built = BuiltImage {
            image_id: "sha256:abc".to_string(),
            tags: vec!["app1:latest".to_string()],
            layer_count: 5,
            size: 0,
            build_time_ms: 50,
        };

        let mut outcome = empty_result();
        outcome.succeeded.insert("app1".to_string(), built);
        outcome
            .failed
            .insert("app2".to_string(), "error".to_string());

        assert_eq!(outcome.total_images(), 2);
    }

    // Builder methods override both the default and the pipeline's push setting.
    #[test]
    fn test_builder_methods() {
        let configured = executor_for(
            r#"
images:
  app:
    file: Dockerfile
push:
  after_all: true
"#,
        )
        .fail_fast(false)
        .push(false);

        assert!(!configured.fail_fast);
        assert!(!configured.push_enabled);
    }
}