1use std::collections::{HashMap, HashSet};
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41
42use tokio::task::JoinSet;
43use tracing::{error, info, warn};
44
45use crate::buildah::BuildahExecutor;
46use crate::builder::{BuiltImage, ImageBuilder};
47use crate::error::{BuildError, Result};
48
49use super::types::ZPipeline;
50
/// Outcome of a pipeline run.
#[derive(Debug)]
pub struct PipelineResult {
    /// Images that built successfully, keyed by the name reported for the build.
    pub succeeded: HashMap<String, BuiltImage>,
    /// Builds that failed, keyed by the name reported for the build; the value
    /// is the error rendered as a string.
    pub failed: HashMap<String, String>,
    /// Elapsed time of the run in milliseconds.
    pub total_time_ms: u64,
}
61
impl PipelineResult {
    /// Returns `true` when no build is recorded in `failed`.
    #[must_use]
    pub fn is_success(&self) -> bool {
        self.failed.is_empty()
    }

    /// Returns the number of recorded builds, successful and failed combined.
    #[must_use]
    pub fn total_images(&self) -> usize {
        self.succeeded.len() + self.failed.len()
    }
}
75
/// Builds the images of a [`ZPipeline`] in dependency order and optionally
/// pushes them afterwards.
pub struct PipelineExecutor {
    /// Pipeline definition: images, defaults, vars and push settings.
    pipeline: ZPipeline,
    /// Directory onto which each image's `context` and `file` paths are joined.
    base_dir: PathBuf,
    /// Buildah executor used for the image builds and for pushing tags.
    executor: BuildahExecutor,
    /// When set, the first failed build aborts the run. Initialised to `true`.
    fail_fast: bool,
    /// Whether tags are pushed after all builds succeeded. Initialised from
    /// `pipeline.push.after_all`.
    push_enabled: bool,
}
92
93impl PipelineExecutor {
94 #[must_use]
102 pub fn new(pipeline: ZPipeline, base_dir: PathBuf, executor: BuildahExecutor) -> Self {
103 let push_enabled = pipeline.push.after_all;
105
106 Self {
107 pipeline,
108 base_dir,
109 executor,
110 fail_fast: true,
111 push_enabled,
112 }
113 }
114
115 #[must_use]
121 pub fn fail_fast(mut self, fail_fast: bool) -> Self {
122 self.fail_fast = fail_fast;
123 self
124 }
125
126 #[must_use]
131 pub fn push(mut self, enabled: bool) -> Self {
132 self.push_enabled = enabled;
133 self
134 }
135
136 fn resolve_execution_order(&self) -> Result<Vec<Vec<String>>> {
148 let mut waves: Vec<Vec<String>> = Vec::new();
149 let mut assigned: HashSet<String> = HashSet::new();
150 let mut remaining: HashSet<String> = self.pipeline.images.keys().cloned().collect();
151
152 for (name, image) in &self.pipeline.images {
154 for dep in &image.depends_on {
155 if !self.pipeline.images.contains_key(dep) {
156 return Err(BuildError::invalid_instruction(
157 "pipeline",
158 format!("Image '{name}' depends on unknown image '{dep}'"),
159 ));
160 }
161 }
162 }
163
164 while !remaining.is_empty() {
166 let mut wave: Vec<String> = Vec::new();
167
168 for name in &remaining {
169 let image = &self.pipeline.images[name];
170 let deps_satisfied = image.depends_on.iter().all(|d| assigned.contains(d));
172 if deps_satisfied {
173 wave.push(name.clone());
174 }
175 }
176
177 if wave.is_empty() {
178 return Err(BuildError::CircularDependency {
180 stages: remaining.into_iter().collect(),
181 });
182 }
183
184 for name in &wave {
186 remaining.remove(name);
187 assigned.insert(name.clone());
188 }
189
190 waves.push(wave);
191 }
192
193 Ok(waves)
194 }
195
196 pub async fn run(&self) -> Result<PipelineResult> {
211 let start = std::time::Instant::now();
212 let waves = self.resolve_execution_order()?;
213
214 let mut succeeded: HashMap<String, BuiltImage> = HashMap::new();
215 let mut failed: HashMap<String, String> = HashMap::new();
216
217 info!(
218 "Building {} images in {} waves",
219 self.pipeline.images.len(),
220 waves.len()
221 );
222
223 for (wave_idx, wave) in waves.iter().enumerate() {
224 info!("Wave {}: {:?}", wave_idx, wave);
225
226 if self.fail_fast && !failed.is_empty() {
228 warn!("Aborting pipeline due to previous failures (fail_fast enabled)");
229 break;
230 }
231
232 let wave_results = self.build_wave(wave).await;
234
235 for (name, result) in wave_results {
237 match result {
238 Ok(image) => {
239 info!("[{}] Build succeeded: {}", name, image.image_id);
240 succeeded.insert(name, image);
241 }
242 Err(e) => {
243 error!("[{}] Build failed: {}", name, e);
244 failed.insert(name.clone(), e.to_string());
245
246 if self.fail_fast {
247 return Err(e);
249 }
250 }
251 }
252 }
253 }
254
255 if self.push_enabled && failed.is_empty() {
257 info!("Pushing {} images", succeeded.len());
258
259 for (name, image) in &succeeded {
260 for tag in &image.tags {
261 if let Err(e) = self.push_image(tag).await {
262 warn!("[{}] Failed to push {}: {}", name, tag, e);
263 } else {
266 info!("[{}] Pushed: {}", name, tag);
267 }
268 }
269 }
270 }
271
272 #[allow(clippy::cast_possible_truncation)]
273 let total_time_ms = start.elapsed().as_millis() as u64;
274
275 Ok(PipelineResult {
276 succeeded,
277 failed,
278 total_time_ms,
279 })
280 }
281
282 async fn build_wave(&self, wave: &[String]) -> Vec<(String, Result<BuiltImage>)> {
286 let pipeline = Arc::new(self.pipeline.clone());
288 let base_dir = Arc::new(self.base_dir.clone());
289 let executor = self.executor.clone();
290
291 let mut set = JoinSet::new();
292
293 for name in wave {
294 let name = name.clone();
295 let pipeline = Arc::clone(&pipeline);
296 let base_dir = Arc::clone(&base_dir);
297 let executor = executor.clone();
298
299 set.spawn(async move {
300 let result = build_single_image(&name, &pipeline, &base_dir, executor).await;
301 (name, result)
302 });
303 }
304
305 let mut results = Vec::new();
307 while let Some(join_result) = set.join_next().await {
308 match join_result {
309 Ok((name, result)) => {
310 results.push((name, result));
311 }
312 Err(e) => {
313 error!("Build task panicked: {}", e);
315 results.push((
316 "unknown".to_string(),
317 Err(BuildError::invalid_instruction(
318 "pipeline",
319 format!("Build task panicked: {e}"),
320 )),
321 ));
322 }
323 }
324 }
325
326 results
327 }
328
329 async fn push_image(&self, tag: &str) -> Result<()> {
331 use crate::buildah::BuildahCommand;
332
333 let cmd = BuildahCommand::push(tag);
334 self.executor.execute_checked(&cmd).await?;
335 Ok(())
336 }
337}
338
339async fn build_single_image(
344 name: &str,
345 pipeline: &ZPipeline,
346 base_dir: &Path,
347 executor: BuildahExecutor,
348) -> Result<BuiltImage> {
349 let image_config = &pipeline.images[name];
350 let context = base_dir.join(&image_config.context);
351 let file_path = base_dir.join(&image_config.file);
352
353 let mut builder = ImageBuilder::with_executor(&context, executor)?;
354
355 let file_name = file_path
357 .file_name()
358 .map(|n| n.to_string_lossy().to_string())
359 .unwrap_or_default();
360 let extension = file_path
361 .extension()
362 .map(|e| e.to_string_lossy().to_string())
363 .unwrap_or_default();
364
365 if extension == "yaml" || extension == "yml" || file_name.starts_with("ZImagefile") {
366 builder = builder.zimagefile(&file_path);
367 } else {
368 builder = builder.dockerfile(&file_path);
369 }
370
371 for tag in &image_config.tags {
373 let expanded = expand_tag_with_vars(tag, &pipeline.vars);
374 builder = builder.tag(expanded);
375 }
376
377 let mut args = pipeline.defaults.build_args.clone();
379 args.extend(image_config.build_args.clone());
380 builder = builder.build_args(args);
381
382 if let Some(fmt) = image_config
384 .format
385 .as_ref()
386 .or(pipeline.defaults.format.as_ref())
387 {
388 builder = builder.format(fmt);
389 }
390
391 if image_config.no_cache.unwrap_or(pipeline.defaults.no_cache) {
393 builder = builder.no_cache();
394 }
395
396 let mut cache_mounts = pipeline.defaults.cache_mounts.clone();
398 cache_mounts.extend(image_config.cache_mounts.clone());
399 if !cache_mounts.is_empty() {
400 let run_mounts: Vec<_> = cache_mounts
401 .iter()
402 .map(crate::zimage::convert_cache_mount)
403 .collect();
404 builder = builder.default_cache_mounts(run_mounts);
405 }
406
407 let retries = image_config
409 .retries
410 .or(pipeline.defaults.retries)
411 .unwrap_or(0);
412 if retries > 0 {
413 builder = builder.retries(retries);
414 }
415
416 builder.build().await
417}
418
/// Expands `${NAME}` placeholders in `tag` with the matching entries of `vars`.
///
/// The tag is scanned once from left to right. A placeholder whose name is a
/// key of `vars` is replaced by that value; placeholders with unknown names
/// and an unterminated `${` are copied through unchanged. A name ends at the
/// first `}` after `${`.
///
/// Substituted values are never scanned again, so a value that itself contains
/// `${OTHER}` is inserted literally. This keeps the result independent of the
/// map's iteration order.
fn expand_tag_with_vars(tag: &str, vars: &HashMap<String, String>) -> String {
    const OPEN: &str = "${";

    let mut result = String::with_capacity(tag.len());
    let mut rest = tag;

    while let Some(start) = rest.find(OPEN) {
        result.push_str(&rest[..start]);
        let after_open = &rest[start + OPEN.len()..];

        let Some(end) = after_open.find('}') else {
            // No closing brace anywhere: nothing left to expand.
            rest = &rest[start..];
            break;
        };

        if let Some(value) = vars.get(&after_open[..end]) {
            result.push_str(value);
            rest = &after_open[end + 1..];
        } else {
            // Unknown name: keep the opener and resume right behind it, so a
            // placeholder nested inside this one is still recognised.
            result.push_str(OPEN);
            rest = after_open;
        }
    }

    result.push_str(rest);
    result
}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::parse_pipeline;

    /// Parses `yaml` and wraps the pipeline in an executor with fixed dummy paths.
    fn executor_for(yaml: &str) -> PipelineExecutor {
        PipelineExecutor::new(
            parse_pipeline(yaml).unwrap(),
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        )
    }

    /// A result with no recorded builds and a fixed duration.
    fn empty_result() -> PipelineResult {
        PipelineResult {
            succeeded: HashMap::new(),
            failed: HashMap::new(),
            total_time_ms: 100,
        }
    }

    #[test]
    fn test_resolve_execution_order_simple() {
        let executor = executor_for(
            r#"
images:
  app:
    file: Dockerfile
"#,
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 1);
        assert_eq!(waves[0], vec!["app"]);
    }

    #[test]
    fn test_resolve_execution_order_with_deps() {
        let executor = executor_for(
            r#"
images:
  base:
    file: Dockerfile.base
  app:
    file: Dockerfile.app
    depends_on: [base]
  test:
    file: Dockerfile.test
    depends_on: [app]
"#,
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 3);
        assert_eq!(waves[0], vec!["base"]);
        assert_eq!(waves[1], vec!["app"]);
        assert_eq!(waves[2], vec!["test"]);
    }

    #[test]
    fn test_resolve_execution_order_parallel() {
        let executor = executor_for(
            r#"
images:
  base:
    file: Dockerfile.base
  app1:
    file: Dockerfile.app1
    depends_on: [base]
  app2:
    file: Dockerfile.app2
    depends_on: [base]
"#,
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 2);
        assert_eq!(waves[0], vec!["base"]);
        // The two dependents share a wave; their relative order is not asserted.
        assert_eq!(waves[1].len(), 2);
        assert!(waves[1].contains(&"app1".to_string()));
        assert!(waves[1].contains(&"app2".to_string()));
    }

    #[test]
    fn test_resolve_execution_order_missing_dep() {
        let executor = executor_for(
            r#"
images:
  app:
    file: Dockerfile
    depends_on: [missing]
"#,
        );

        let err = executor.resolve_execution_order().unwrap_err();
        assert!(err.to_string().contains("missing"));
    }

    #[test]
    fn test_resolve_execution_order_circular() {
        let executor = executor_for(
            r#"
images:
  a:
    file: Dockerfile.a
    depends_on: [b]
  b:
    file: Dockerfile.b
    depends_on: [a]
"#,
        );

        match executor.resolve_execution_order().unwrap_err() {
            BuildError::CircularDependency { stages } => {
                assert!(stages.contains(&"a".to_string()));
                assert!(stages.contains(&"b".to_string()));
            }
            e => panic!("Expected CircularDependency error, got: {:?}", e),
        }
    }

    #[test]
    fn test_expand_tag() {
        let vars: HashMap<String, String> = [
            ("VERSION", "1.0.0"),
            ("REGISTRY", "ghcr.io/myorg"),
        ]
        .iter()
        .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
        .collect();

        let expanded = expand_tag_with_vars("${REGISTRY}/app:${VERSION}", &vars);
        assert_eq!(expanded, "ghcr.io/myorg/app:1.0.0");
    }

    #[test]
    fn test_expand_tag_partial() {
        let mut vars = HashMap::new();
        vars.insert("VERSION".to_string(), "1.0.0".to_string());

        // Placeholders without a matching variable stay as written.
        let expanded = expand_tag_with_vars("myapp:${VERSION}-${UNKNOWN}", &vars);
        assert_eq!(expanded, "myapp:1.0.0-${UNKNOWN}");
    }

    #[test]
    fn test_pipeline_result_is_success() {
        let mut result = empty_result();
        assert!(result.is_success());

        result.failed.insert("app".to_string(), "error".to_string());
        assert!(!result.is_success());
    }

    #[test]
    fn test_pipeline_result_total_images() {
        let mut result = empty_result();

        let built = BuiltImage {
            image_id: "sha256:abc".to_string(),
            tags: vec!["app1:latest".to_string()],
            layer_count: 5,
            size: 0,
            build_time_ms: 50,
        };
        result.succeeded.insert("app1".to_string(), built);
        result
            .failed
            .insert("app2".to_string(), "error".to_string());

        assert_eq!(result.total_images(), 2);
    }

    #[test]
    fn test_builder_methods() {
        let executor = executor_for(
            r#"
images:
  app:
    file: Dockerfile
push:
  after_all: true
"#,
        )
        .fail_fast(false)
        .push(false);

        assert!(!executor.fail_fast);
        assert!(!executor.push_enabled);
    }
}