// zlayer_builder/pipeline/executor.rs
//! Pipeline executor - Coordinates building multiple images with wave-based orchestration
//!
//! This module provides the [`PipelineExecutor`] which processes [`ZPipeline`] manifests,
//! resolving dependencies and building images in parallel waves.
//!
//! # Execution Model
//!
//! Images are grouped into "waves" based on their dependency depth:
//! - **Wave 0**: Images with no dependencies - can all run in parallel
//! - **Wave 1**: Images that depend only on Wave 0 images
//! - **Wave N**: Images that depend only on images from earlier waves
//!
//! Within each wave, all builds run concurrently, sharing the same
//! [`BuildahExecutor`] (and thus cache storage).
//!
//! # Example
//!
//! ```no_run
//! use zlayer_builder::pipeline::{PipelineExecutor, parse_pipeline};
//! use zlayer_builder::BuildahExecutor;
//! use std::path::PathBuf;
//!
//! # async fn example() -> Result<(), zlayer_builder::BuildError> {
//! let yaml = std::fs::read_to_string("ZPipeline.yaml")?;
//! let pipeline = parse_pipeline(&yaml)?;
//!
//! let executor = BuildahExecutor::new_async().await?;
//! let result = PipelineExecutor::new(pipeline, PathBuf::from("."), executor)
//!     .fail_fast(true)
//!     .run()
//!     .await?;
//!
//! println!("Built {} images in {}ms", result.succeeded.len(), result.total_time_ms);
//! # Ok(())
//! # }
//! ```
37
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use tokio::task::JoinSet;
use tracing::{error, info, warn};

use crate::buildah::BuildahExecutor;
use crate::builder::{BuiltImage, ImageBuilder};
use crate::error::{BuildError, Result};

use super::types::ZPipeline;
51/// Result of a pipeline execution
52#[derive(Debug)]
53pub struct PipelineResult {
54    /// Images that were successfully built
55    pub succeeded: HashMap<String, BuiltImage>,
56    /// Images that failed to build (name -> error message)
57    pub failed: HashMap<String, String>,
58    /// Total execution time in milliseconds
59    pub total_time_ms: u64,
60}
61
62impl PipelineResult {
63    /// Returns true if all images were built successfully
64    pub fn is_success(&self) -> bool {
65        self.failed.is_empty()
66    }
67
68    /// Returns the total number of images in the pipeline
69    pub fn total_images(&self) -> usize {
70        self.succeeded.len() + self.failed.len()
71    }
72}
73
/// Pipeline executor configuration and runtime
///
/// The executor processes a [`ZPipeline`] manifest, resolving dependencies
/// and building images in parallel waves. Construct with [`PipelineExecutor::new`],
/// adjust behavior with the builder-style setters, then execute with `run`.
pub struct PipelineExecutor {
    /// The pipeline configuration (images, defaults, vars, push settings)
    pipeline: ZPipeline,
    /// Base directory for resolving relative paths in the pipeline
    base_dir: PathBuf,
    /// Buildah executor; cloned into every build task so all builds share it
    executor: BuildahExecutor,
    /// Whether to abort on first failure (defaults to true in `new`)
    fail_fast: bool,
    /// Whether to push images after building; seeded from `pipeline.push.after_all`
    push_enabled: bool,
}
90
91impl PipelineExecutor {
92    /// Create a new pipeline executor
93    ///
94    /// # Arguments
95    ///
96    /// * `pipeline` - The parsed ZPipeline configuration
97    /// * `base_dir` - Base directory for resolving relative paths in the pipeline
98    /// * `executor` - The buildah executor to use for all builds
99    pub fn new(pipeline: ZPipeline, base_dir: PathBuf, executor: BuildahExecutor) -> Self {
100        // Determine push behavior from pipeline config
101        let push_enabled = pipeline.push.after_all;
102
103        Self {
104            pipeline,
105            base_dir,
106            executor,
107            fail_fast: true,
108            push_enabled,
109        }
110    }
111
112    /// Set fail-fast mode (default: true)
113    ///
114    /// When enabled, the executor will abort immediately when any image
115    /// fails to build. When disabled, it will continue building independent
116    /// images even after failures.
117    pub fn fail_fast(mut self, fail_fast: bool) -> Self {
118        self.fail_fast = fail_fast;
119        self
120    }
121
122    /// Enable or disable pushing (overrides pipeline.push.after_all)
123    ///
124    /// When enabled and all builds succeed, images will be pushed to their
125    /// configured registries.
126    pub fn push(mut self, enabled: bool) -> Self {
127        self.push_enabled = enabled;
128        self
129    }
130
131    /// Resolve execution order into waves
132    ///
133    /// Returns a vector of waves, where each wave contains image names
134    /// that can be built in parallel. Images in wave N depend only on
135    /// images from waves 0..N-1.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - An image depends on an unknown image
141    /// - A circular dependency is detected
142    fn resolve_execution_order(&self) -> Result<Vec<Vec<String>>> {
143        let mut waves: Vec<Vec<String>> = Vec::new();
144        let mut assigned: HashSet<String> = HashSet::new();
145        let mut remaining: HashSet<String> = self.pipeline.images.keys().cloned().collect();
146
147        // Validate: check for missing dependencies
148        for (name, image) in &self.pipeline.images {
149            for dep in &image.depends_on {
150                if !self.pipeline.images.contains_key(dep) {
151                    return Err(BuildError::invalid_instruction(
152                        "pipeline",
153                        format!("Image '{}' depends on unknown image '{}'", name, dep),
154                    ));
155                }
156            }
157        }
158
159        // Build waves iteratively
160        while !remaining.is_empty() {
161            let mut wave: Vec<String> = Vec::new();
162
163            for name in &remaining {
164                let image = &self.pipeline.images[name];
165                // Can build if all dependencies are already assigned to previous waves
166                let deps_satisfied = image.depends_on.iter().all(|d| assigned.contains(d));
167                if deps_satisfied {
168                    wave.push(name.clone());
169                }
170            }
171
172            if wave.is_empty() {
173                // No images could be added to this wave - circular dependency
174                return Err(BuildError::CircularDependency {
175                    stages: remaining.into_iter().collect(),
176                });
177            }
178
179            // Move wave images from remaining to assigned
180            for name in &wave {
181                remaining.remove(name);
182                assigned.insert(name.clone());
183            }
184
185            waves.push(wave);
186        }
187
188        Ok(waves)
189    }
190
191    /// Execute the pipeline
192    ///
193    /// Builds all images in dependency order, with images in the same wave
194    /// running in parallel.
195    ///
196    /// # Returns
197    ///
198    /// A [`PipelineResult`] containing information about successful and failed builds.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if:
203    /// - The dependency graph is invalid (missing deps, cycles)
204    /// - Any build fails and `fail_fast` is enabled
205    pub async fn run(&self) -> Result<PipelineResult> {
206        let start = std::time::Instant::now();
207        let waves = self.resolve_execution_order()?;
208
209        let mut succeeded: HashMap<String, BuiltImage> = HashMap::new();
210        let mut failed: HashMap<String, String> = HashMap::new();
211
212        info!(
213            "Building {} images in {} waves",
214            self.pipeline.images.len(),
215            waves.len()
216        );
217
218        for (wave_idx, wave) in waves.iter().enumerate() {
219            info!("Wave {}: {:?}", wave_idx, wave);
220
221            // Check if we should abort due to previous failures
222            if self.fail_fast && !failed.is_empty() {
223                warn!("Aborting pipeline due to previous failures (fail_fast enabled)");
224                break;
225            }
226
227            // Build all images in this wave concurrently
228            let wave_results = self.build_wave(wave).await;
229
230            // Process results
231            for (name, result) in wave_results {
232                match result {
233                    Ok(image) => {
234                        info!("[{}] Build succeeded: {}", name, image.image_id);
235                        succeeded.insert(name, image);
236                    }
237                    Err(e) => {
238                        error!("[{}] Build failed: {}", name, e);
239                        failed.insert(name.clone(), e.to_string());
240
241                        if self.fail_fast {
242                            // Return early with the first error
243                            return Err(e);
244                        }
245                    }
246                }
247            }
248        }
249
250        // Push phase (only if all succeeded and push enabled)
251        if self.push_enabled && failed.is_empty() {
252            info!("Pushing {} images", succeeded.len());
253
254            for (name, image) in &succeeded {
255                for tag in &image.tags {
256                    if let Err(e) = self.push_image(tag).await {
257                        warn!("[{}] Failed to push {}: {}", name, tag, e);
258                        // Push failures don't fail the overall pipeline
259                        // since the images were built successfully
260                    } else {
261                        info!("[{}] Pushed: {}", name, tag);
262                    }
263                }
264            }
265        }
266
267        Ok(PipelineResult {
268            succeeded,
269            failed,
270            total_time_ms: start.elapsed().as_millis() as u64,
271        })
272    }
273
274    /// Build all images in a wave concurrently
275    ///
276    /// Returns a vector of (name, result) tuples for each image in the wave.
277    async fn build_wave(&self, wave: &[String]) -> Vec<(String, Result<BuiltImage>)> {
278        // Create shared data for spawned tasks
279        let pipeline = Arc::new(self.pipeline.clone());
280        let base_dir = Arc::new(self.base_dir.clone());
281        let executor = self.executor.clone();
282
283        let mut set = JoinSet::new();
284
285        for name in wave {
286            let name = name.clone();
287            let pipeline = Arc::clone(&pipeline);
288            let base_dir = Arc::clone(&base_dir);
289            let executor = executor.clone();
290
291            set.spawn(async move {
292                let result = build_single_image(&name, &pipeline, &base_dir, executor).await;
293                (name, result)
294            });
295        }
296
297        // Collect all results
298        let mut results = Vec::new();
299        while let Some(join_result) = set.join_next().await {
300            match join_result {
301                Ok((name, result)) => {
302                    results.push((name, result));
303                }
304                Err(e) => {
305                    // Task panicked
306                    error!("Build task panicked: {}", e);
307                    results.push((
308                        "unknown".to_string(),
309                        Err(BuildError::invalid_instruction(
310                            "pipeline",
311                            format!("Build task panicked: {}", e),
312                        )),
313                    ));
314                }
315            }
316        }
317
318        results
319    }
320
321    /// Push an image to its registry
322    async fn push_image(&self, tag: &str) -> Result<()> {
323        use crate::buildah::BuildahCommand;
324
325        let cmd = BuildahCommand::push(tag);
326        self.executor.execute_checked(&cmd).await?;
327        Ok(())
328    }
329}
330
331/// Build a single image from the pipeline
332///
333/// This is extracted as a separate function to make it easier to spawn
334/// in a tokio task without borrowing issues.
335async fn build_single_image(
336    name: &str,
337    pipeline: &ZPipeline,
338    base_dir: &Path,
339    executor: BuildahExecutor,
340) -> Result<BuiltImage> {
341    let image_config = &pipeline.images[name];
342    let context = base_dir.join(&image_config.context);
343    let file_path = base_dir.join(&image_config.file);
344
345    let mut builder = ImageBuilder::with_executor(&context, executor)?;
346
347    // Determine if this is a ZImagefile or Dockerfile based on extension/name
348    let file_name = file_path
349        .file_name()
350        .map(|n| n.to_string_lossy().to_string())
351        .unwrap_or_default();
352    let extension = file_path
353        .extension()
354        .map(|e| e.to_string_lossy().to_string())
355        .unwrap_or_default();
356
357    if extension == "yaml" || extension == "yml" || file_name.starts_with("ZImagefile") {
358        builder = builder.zimagefile(&file_path);
359    } else {
360        builder = builder.dockerfile(&file_path);
361    }
362
363    // Apply tags with variable expansion
364    for tag in &image_config.tags {
365        let expanded = expand_tag_with_vars(tag, &pipeline.vars);
366        builder = builder.tag(expanded);
367    }
368
369    // Merge build_args: defaults + per-image (per-image overrides defaults)
370    let mut args = pipeline.defaults.build_args.clone();
371    args.extend(image_config.build_args.clone());
372    builder = builder.build_args(args);
373
374    // Apply format (per-image overrides default)
375    if let Some(fmt) = image_config
376        .format
377        .as_ref()
378        .or(pipeline.defaults.format.as_ref())
379    {
380        builder = builder.format(fmt);
381    }
382
383    // Apply no_cache (per-image overrides default)
384    if image_config.no_cache.unwrap_or(pipeline.defaults.no_cache) {
385        builder = builder.no_cache();
386    }
387
388    // Merge cache mounts: defaults + per-image (per-image are additive)
389    let mut cache_mounts = pipeline.defaults.cache_mounts.clone();
390    cache_mounts.extend(image_config.cache_mounts.clone());
391    if !cache_mounts.is_empty() {
392        let run_mounts: Vec<_> = cache_mounts
393            .iter()
394            .map(crate::zimage::convert_cache_mount)
395            .collect();
396        builder = builder.default_cache_mounts(run_mounts);
397    }
398
399    // Apply retries (per-image overrides default)
400    let retries = image_config
401        .retries
402        .or(pipeline.defaults.retries)
403        .unwrap_or(0);
404    if retries > 0 {
405        builder = builder.retries(retries);
406    }
407
408    builder.build().await
409}
410
/// Expand `${VAR}` placeholders in a tag string
///
/// Performs a deterministic single left-to-right pass: each `${NAME}` is
/// replaced with `vars[NAME]` if present, and left verbatim otherwise.
/// Substituted values are NOT re-scanned, so a value that itself contains
/// `${...}` can no longer produce output that depends on `HashMap`
/// iteration order (the previous repeated-`replace` version did).
/// An unterminated `${` is kept as-is.
///
/// Standalone function for use in spawned tasks.
fn expand_tag_with_vars(tag: &str, vars: &HashMap<String, String>) -> String {
    let mut result = String::with_capacity(tag.len());
    let mut rest = tag;

    while let Some(start) = rest.find("${") {
        // Copy the literal text before the placeholder.
        result.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find('}') {
            Some(end) => {
                let key = &after[..end];
                if let Some(value) = vars.get(key) {
                    result.push_str(value);
                } else {
                    // Unknown variable: keep the placeholder verbatim.
                    result.push_str(&rest[start..start + 2 + end + 1]);
                }
                rest = &after[end + 1..];
            }
            None => {
                // No closing brace: keep the remainder verbatim.
                result.push_str(&rest[start..]);
                rest = "";
            }
        }
    }

    result.push_str(rest);
    result
}
421
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::parse_pipeline;

    // A single image with no dependencies resolves to exactly one wave.
    #[test]
    fn test_resolve_execution_order_simple() {
        let yaml = r#"
images:
  app:
    file: Dockerfile
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 1);
        assert_eq!(waves[0], vec!["app"]);
    }

    // A linear chain base -> app -> test yields one wave per image.
    #[test]
    fn test_resolve_execution_order_with_deps() {
        let yaml = r#"
images:
  base:
    file: Dockerfile.base
  app:
    file: Dockerfile.app
    depends_on: [base]
  test:
    file: Dockerfile.test
    depends_on: [app]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 3);
        assert_eq!(waves[0], vec!["base"]);
        assert_eq!(waves[1], vec!["app"]);
        assert_eq!(waves[2], vec!["test"]);
    }

    // Two images sharing the same dependency land in the same wave.
    #[test]
    fn test_resolve_execution_order_parallel() {
        let yaml = r#"
images:
  base:
    file: Dockerfile.base
  app1:
    file: Dockerfile.app1
    depends_on: [base]
  app2:
    file: Dockerfile.app2
    depends_on: [base]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 2);
        assert_eq!(waves[0], vec!["base"]);
        // app1 and app2 should be in the same wave (order may vary)
        assert_eq!(waves[1].len(), 2);
        assert!(waves[1].contains(&"app1".to_string()));
        assert!(waves[1].contains(&"app2".to_string()));
    }

    // Depending on an image that is not declared is rejected up front.
    #[test]
    fn test_resolve_execution_order_missing_dep() {
        let yaml = r#"
images:
  app:
    file: Dockerfile
    depends_on: [missing]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let result = executor.resolve_execution_order();
        assert!(result.is_err());
        // The error message names the unknown dependency.
        assert!(result.unwrap_err().to_string().contains("missing"));
    }

    // A two-node cycle surfaces as CircularDependency listing both images.
    #[test]
    fn test_resolve_execution_order_circular() {
        let yaml = r#"
images:
  a:
    file: Dockerfile.a
    depends_on: [b]
  b:
    file: Dockerfile.b
    depends_on: [a]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let result = executor.resolve_execution_order();
        assert!(result.is_err());
        match result.unwrap_err() {
            BuildError::CircularDependency { stages } => {
                assert!(stages.contains(&"a".to_string()));
                assert!(stages.contains(&"b".to_string()));
            }
            e => panic!("Expected CircularDependency error, got: {:?}", e),
        }
    }

    // Known ${VAR} placeholders are substituted from the vars map.
    #[test]
    fn test_expand_tag() {
        let mut vars = HashMap::new();
        vars.insert("VERSION".to_string(), "1.0.0".to_string());
        vars.insert("REGISTRY".to_string(), "ghcr.io/myorg".to_string());

        let tag = "${REGISTRY}/app:${VERSION}";
        let expanded = expand_tag_with_vars(tag, &vars);
        assert_eq!(expanded, "ghcr.io/myorg/app:1.0.0");
    }

    // Placeholders with no matching var are preserved verbatim.
    #[test]
    fn test_expand_tag_partial() {
        let mut vars = HashMap::new();
        vars.insert("VERSION".to_string(), "1.0.0".to_string());

        // Unknown vars are left as-is
        let tag = "myapp:${VERSION}-${UNKNOWN}";
        let expanded = expand_tag_with_vars(tag, &vars);
        assert_eq!(expanded, "myapp:1.0.0-${UNKNOWN}");
    }

    // is_success flips to false as soon as any failure is recorded.
    #[test]
    fn test_pipeline_result_is_success() {
        let mut result = PipelineResult {
            succeeded: HashMap::new(),
            failed: HashMap::new(),
            total_time_ms: 100,
        };

        assert!(result.is_success());

        result.failed.insert("app".to_string(), "error".to_string());
        assert!(!result.is_success());
    }

    // total_images counts both succeeded and failed entries.
    #[test]
    fn test_pipeline_result_total_images() {
        let mut result = PipelineResult {
            succeeded: HashMap::new(),
            failed: HashMap::new(),
            total_time_ms: 100,
        };

        result.succeeded.insert(
            "app1".to_string(),
            BuiltImage {
                image_id: "sha256:abc".to_string(),
                tags: vec!["app1:latest".to_string()],
                layer_count: 5,
                size: 0,
                build_time_ms: 50,
            },
        );
        result
            .failed
            .insert("app2".to_string(), "error".to_string());

        assert_eq!(result.total_images(), 2);
    }

    // The builder-style setters override the constructor defaults.
    #[test]
    fn test_builder_methods() {
        let yaml = r#"
images:
  app:
    file: Dockerfile
push:
  after_all: true
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        )
        .fail_fast(false)
        .push(false);

        assert!(!executor.fail_fast);
        assert!(!executor.push_enabled);
    }
}