// zlayer_builder/pipeline/executor.rs
1//! Pipeline executor - Coordinates building multiple images with wave-based orchestration
2//!
3//! This module provides the [`PipelineExecutor`] which processes [`ZPipeline`] manifests,
4//! resolving dependencies and building images in parallel waves.
5//!
6//! # Execution Model
7//!
8//! Images are grouped into "waves" based on their dependency depth:
9//! - **Wave 0**: Images with no dependencies - can all run in parallel
10//! - **Wave 1**: Images that depend only on Wave 0 images
11//! - **Wave N**: Images that depend only on images from earlier waves
12//!
13//! Within each wave, all builds run concurrently, sharing the same
14//! [`BuildahExecutor`] (and thus cache storage).
15//!
16//! # Example
17//!
18//! ```no_run
19//! use zlayer_builder::pipeline::{PipelineExecutor, parse_pipeline};
20//! use zlayer_builder::BuildahExecutor;
21//! use std::path::PathBuf;
22//!
23//! # async fn example() -> Result<(), zlayer_builder::BuildError> {
24//! let yaml = std::fs::read_to_string("ZPipeline.yaml")?;
25//! let pipeline = parse_pipeline(&yaml)?;
26//!
27//! let executor = BuildahExecutor::new_async().await?;
28//! let result = PipelineExecutor::new(pipeline, PathBuf::from("."), executor)
29//!     .fail_fast(true)
30//!     .run()
31//!     .await?;
32//!
33//! println!("Built {} images in {}ms", result.succeeded.len(), result.total_time_ms);
34//! # Ok(())
35//! # }
36//! ```
37
38use std::collections::{HashMap, HashSet};
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41
42use tokio::task::JoinSet;
43use tracing::{error, info, warn};
44
45use crate::buildah::BuildahExecutor;
46use crate::builder::{BuiltImage, ImageBuilder};
47use crate::error::{BuildError, Result};
48
49use super::types::ZPipeline;
50
/// Result of a pipeline execution
///
/// Aggregates the outcome of every image the pipeline attempted: built
/// images, failure messages, and the total wall-clock duration (build
/// phase plus the optional push phase).
#[derive(Debug)]
pub struct PipelineResult {
    /// Images that were successfully built, keyed by pipeline image name
    pub succeeded: HashMap<String, BuiltImage>,
    /// Images that failed to build (image name -> error message)
    pub failed: HashMap<String, String>,
    /// Total execution time in milliseconds
    pub total_time_ms: u64,
}
61
62impl PipelineResult {
63    /// Returns true if all images were built successfully
64    #[must_use]
65    pub fn is_success(&self) -> bool {
66        self.failed.is_empty()
67    }
68
69    /// Returns the total number of images in the pipeline
70    #[must_use]
71    pub fn total_images(&self) -> usize {
72        self.succeeded.len() + self.failed.len()
73    }
74}
75
/// Pipeline executor configuration and runtime
///
/// The executor processes a [`ZPipeline`] manifest, resolving dependencies
/// and building images in parallel waves. Construct with [`PipelineExecutor::new`],
/// optionally adjust behavior with the builder-style setters, then call `run`.
pub struct PipelineExecutor {
    /// The pipeline configuration
    pipeline: ZPipeline,
    /// Base directory for resolving relative paths (build contexts and manifest files)
    base_dir: PathBuf,
    /// Buildah executor; cloned into each spawned build task so all builds share it
    executor: BuildahExecutor,
    /// Whether to abort on first failure (defaults to true in `new`)
    fail_fast: bool,
    /// Whether to push images after building (seeded from `pipeline.push.after_all`)
    push_enabled: bool,
}
92
93impl PipelineExecutor {
94    /// Create a new pipeline executor
95    ///
96    /// # Arguments
97    ///
98    /// * `pipeline` - The parsed `ZPipeline` configuration
99    /// * `base_dir` - Base directory for resolving relative paths in the pipeline
100    /// * `executor` - The buildah executor to use for all builds
101    #[must_use]
102    pub fn new(pipeline: ZPipeline, base_dir: PathBuf, executor: BuildahExecutor) -> Self {
103        // Determine push behavior from pipeline config
104        let push_enabled = pipeline.push.after_all;
105
106        Self {
107            pipeline,
108            base_dir,
109            executor,
110            fail_fast: true,
111            push_enabled,
112        }
113    }
114
115    /// Set fail-fast mode (default: true)
116    ///
117    /// When enabled, the executor will abort immediately when any image
118    /// fails to build. When disabled, it will continue building independent
119    /// images even after failures.
120    #[must_use]
121    pub fn fail_fast(mut self, fail_fast: bool) -> Self {
122        self.fail_fast = fail_fast;
123        self
124    }
125
126    /// Enable or disable pushing (overrides `pipeline.push.after_all`)
127    ///
128    /// When enabled and all builds succeed, images will be pushed to their
129    /// configured registries.
130    #[must_use]
131    pub fn push(mut self, enabled: bool) -> Self {
132        self.push_enabled = enabled;
133        self
134    }
135
136    /// Resolve execution order into waves
137    ///
138    /// Returns a vector of waves, where each wave contains image names
139    /// that can be built in parallel. Images in wave N depend only on
140    /// images from waves 0..N-1.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if:
145    /// - An image depends on an unknown image
146    /// - A circular dependency is detected
147    fn resolve_execution_order(&self) -> Result<Vec<Vec<String>>> {
148        let mut waves: Vec<Vec<String>> = Vec::new();
149        let mut assigned: HashSet<String> = HashSet::new();
150        let mut remaining: HashSet<String> = self.pipeline.images.keys().cloned().collect();
151
152        // Validate: check for missing dependencies
153        for (name, image) in &self.pipeline.images {
154            for dep in &image.depends_on {
155                if !self.pipeline.images.contains_key(dep) {
156                    return Err(BuildError::invalid_instruction(
157                        "pipeline",
158                        format!("Image '{name}' depends on unknown image '{dep}'"),
159                    ));
160                }
161            }
162        }
163
164        // Build waves iteratively
165        while !remaining.is_empty() {
166            let mut wave: Vec<String> = Vec::new();
167
168            for name in &remaining {
169                let image = &self.pipeline.images[name];
170                // Can build if all dependencies are already assigned to previous waves
171                let deps_satisfied = image.depends_on.iter().all(|d| assigned.contains(d));
172                if deps_satisfied {
173                    wave.push(name.clone());
174                }
175            }
176
177            if wave.is_empty() {
178                // No images could be added to this wave - circular dependency
179                return Err(BuildError::CircularDependency {
180                    stages: remaining.into_iter().collect(),
181                });
182            }
183
184            // Move wave images from remaining to assigned
185            for name in &wave {
186                remaining.remove(name);
187                assigned.insert(name.clone());
188            }
189
190            waves.push(wave);
191        }
192
193        Ok(waves)
194    }
195
196    /// Execute the pipeline
197    ///
198    /// Builds all images in dependency order, with images in the same wave
199    /// running in parallel.
200    ///
201    /// # Returns
202    ///
203    /// A [`PipelineResult`] containing information about successful and failed builds.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if:
208    /// - The dependency graph is invalid (missing deps, cycles)
209    /// - Any build fails and `fail_fast` is enabled
210    pub async fn run(&self) -> Result<PipelineResult> {
211        let start = std::time::Instant::now();
212        let waves = self.resolve_execution_order()?;
213
214        let mut succeeded: HashMap<String, BuiltImage> = HashMap::new();
215        let mut failed: HashMap<String, String> = HashMap::new();
216
217        info!(
218            "Building {} images in {} waves",
219            self.pipeline.images.len(),
220            waves.len()
221        );
222
223        for (wave_idx, wave) in waves.iter().enumerate() {
224            info!("Wave {}: {:?}", wave_idx, wave);
225
226            // Check if we should abort due to previous failures
227            if self.fail_fast && !failed.is_empty() {
228                warn!("Aborting pipeline due to previous failures (fail_fast enabled)");
229                break;
230            }
231
232            // Build all images in this wave concurrently
233            let wave_results = self.build_wave(wave).await;
234
235            // Process results
236            for (name, result) in wave_results {
237                match result {
238                    Ok(image) => {
239                        info!("[{}] Build succeeded: {}", name, image.image_id);
240                        succeeded.insert(name, image);
241                    }
242                    Err(e) => {
243                        error!("[{}] Build failed: {}", name, e);
244                        failed.insert(name.clone(), e.to_string());
245
246                        if self.fail_fast {
247                            // Return early with the first error
248                            return Err(e);
249                        }
250                    }
251                }
252            }
253        }
254
255        // Push phase (only if all succeeded and push enabled)
256        if self.push_enabled && failed.is_empty() {
257            info!("Pushing {} images", succeeded.len());
258
259            for (name, image) in &succeeded {
260                for tag in &image.tags {
261                    if let Err(e) = self.push_image(tag).await {
262                        warn!("[{}] Failed to push {}: {}", name, tag, e);
263                        // Push failures don't fail the overall pipeline
264                        // since the images were built successfully
265                    } else {
266                        info!("[{}] Pushed: {}", name, tag);
267                    }
268                }
269            }
270        }
271
272        #[allow(clippy::cast_possible_truncation)]
273        let total_time_ms = start.elapsed().as_millis() as u64;
274
275        Ok(PipelineResult {
276            succeeded,
277            failed,
278            total_time_ms,
279        })
280    }
281
282    /// Build all images in a wave concurrently
283    ///
284    /// Returns a vector of (name, result) tuples for each image in the wave.
285    async fn build_wave(&self, wave: &[String]) -> Vec<(String, Result<BuiltImage>)> {
286        // Create shared data for spawned tasks
287        let pipeline = Arc::new(self.pipeline.clone());
288        let base_dir = Arc::new(self.base_dir.clone());
289        let executor = self.executor.clone();
290
291        let mut set = JoinSet::new();
292
293        for name in wave {
294            let name = name.clone();
295            let pipeline = Arc::clone(&pipeline);
296            let base_dir = Arc::clone(&base_dir);
297            let executor = executor.clone();
298
299            set.spawn(async move {
300                let result = build_single_image(&name, &pipeline, &base_dir, executor).await;
301                (name, result)
302            });
303        }
304
305        // Collect all results
306        let mut results = Vec::new();
307        while let Some(join_result) = set.join_next().await {
308            match join_result {
309                Ok((name, result)) => {
310                    results.push((name, result));
311                }
312                Err(e) => {
313                    // Task panicked
314                    error!("Build task panicked: {}", e);
315                    results.push((
316                        "unknown".to_string(),
317                        Err(BuildError::invalid_instruction(
318                            "pipeline",
319                            format!("Build task panicked: {e}"),
320                        )),
321                    ));
322                }
323            }
324        }
325
326        results
327    }
328
329    /// Push an image to its registry
330    async fn push_image(&self, tag: &str) -> Result<()> {
331        use crate::buildah::BuildahCommand;
332
333        let cmd = BuildahCommand::push(tag);
334        self.executor.execute_checked(&cmd).await?;
335        Ok(())
336    }
337}
338
339/// Build a single image from the pipeline
340///
341/// This is extracted as a separate function to make it easier to spawn
342/// in a tokio task without borrowing issues.
343async fn build_single_image(
344    name: &str,
345    pipeline: &ZPipeline,
346    base_dir: &Path,
347    executor: BuildahExecutor,
348) -> Result<BuiltImage> {
349    let image_config = &pipeline.images[name];
350    let context = base_dir.join(&image_config.context);
351    let file_path = base_dir.join(&image_config.file);
352
353    let mut builder = ImageBuilder::with_executor(&context, executor)?;
354
355    // Determine if this is a ZImagefile or Dockerfile based on extension/name
356    let file_name = file_path
357        .file_name()
358        .map(|n| n.to_string_lossy().to_string())
359        .unwrap_or_default();
360    let extension = file_path
361        .extension()
362        .map(|e| e.to_string_lossy().to_string())
363        .unwrap_or_default();
364
365    if extension == "yaml" || extension == "yml" || file_name.starts_with("ZImagefile") {
366        builder = builder.zimagefile(&file_path);
367    } else {
368        builder = builder.dockerfile(&file_path);
369    }
370
371    // Apply tags with variable expansion
372    for tag in &image_config.tags {
373        let expanded = expand_tag_with_vars(tag, &pipeline.vars);
374        builder = builder.tag(expanded);
375    }
376
377    // Merge build_args: defaults + per-image (per-image overrides defaults)
378    let mut args = pipeline.defaults.build_args.clone();
379    args.extend(image_config.build_args.clone());
380    builder = builder.build_args(args);
381
382    // Apply format (per-image overrides default)
383    if let Some(fmt) = image_config
384        .format
385        .as_ref()
386        .or(pipeline.defaults.format.as_ref())
387    {
388        builder = builder.format(fmt);
389    }
390
391    // Apply no_cache (per-image overrides default)
392    if image_config.no_cache.unwrap_or(pipeline.defaults.no_cache) {
393        builder = builder.no_cache();
394    }
395
396    // Merge cache mounts: defaults + per-image (per-image are additive)
397    let mut cache_mounts = pipeline.defaults.cache_mounts.clone();
398    cache_mounts.extend(image_config.cache_mounts.clone());
399    if !cache_mounts.is_empty() {
400        let run_mounts: Vec<_> = cache_mounts
401            .iter()
402            .map(crate::zimage::convert_cache_mount)
403            .collect();
404        builder = builder.default_cache_mounts(run_mounts);
405    }
406
407    // Apply retries (per-image overrides default)
408    let retries = image_config
409        .retries
410        .or(pipeline.defaults.retries)
411        .unwrap_or(0);
412    if retries > 0 {
413        builder = builder.retries(retries);
414    }
415
416    builder.build().await
417}
418
/// Expand `${VAR}` placeholders in a tag string
///
/// Performs a single left-to-right scan, so the result is deterministic and
/// independent of `HashMap` iteration order. (The previous per-variable
/// `replace` loop could produce different output when one variable's value
/// contained another variable's placeholder, because `HashMap` iteration
/// order is unspecified.) Substituted values are inserted verbatim — there
/// is no recursive expansion. Unknown variables and unterminated `${`
/// sequences are left as-is.
///
/// Standalone function for use in spawned tasks.
fn expand_tag_with_vars(tag: &str, vars: &HashMap<String, String>) -> String {
    let mut result = String::with_capacity(tag.len());
    let mut rest = tag;

    while let Some(start) = rest.find("${") {
        // Copy the literal text preceding the placeholder.
        result.push_str(&rest[..start]);
        let after = &rest[start + 2..];

        if let Some(end) = after.find('}') {
            let key = &after[..end];
            match vars.get(key) {
                Some(value) => result.push_str(value),
                // Unknown vars are left as-is so callers can spot them.
                None => {
                    result.push_str("${");
                    result.push_str(key);
                    result.push('}');
                }
            }
            rest = &after[end + 1..];
        } else {
            // Unterminated "${": keep the remainder literally.
            result.push_str(&rest[start..]);
            rest = "";
        }
    }

    result.push_str(rest);
    result
}
429
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::parse_pipeline;

    // One image with no dependencies: exactly one wave containing it.
    #[test]
    fn test_resolve_execution_order_simple() {
        let yaml = r#"
images:
  app:
    file: Dockerfile
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 1);
        assert_eq!(waves[0], vec!["app"]);
    }

    // Linear chain base -> app -> test: one wave per image, in order.
    #[test]
    fn test_resolve_execution_order_with_deps() {
        let yaml = r#"
images:
  base:
    file: Dockerfile.base
  app:
    file: Dockerfile.app
    depends_on: [base]
  test:
    file: Dockerfile.test
    depends_on: [app]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 3);
        assert_eq!(waves[0], vec!["base"]);
        assert_eq!(waves[1], vec!["app"]);
        assert_eq!(waves[2], vec!["test"]);
    }

    // Two images sharing one dependency land together in the second wave.
    #[test]
    fn test_resolve_execution_order_parallel() {
        let yaml = r#"
images:
  base:
    file: Dockerfile.base
  app1:
    file: Dockerfile.app1
    depends_on: [base]
  app2:
    file: Dockerfile.app2
    depends_on: [base]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let waves = executor.resolve_execution_order().unwrap();
        assert_eq!(waves.len(), 2);
        assert_eq!(waves[0], vec!["base"]);
        // app1 and app2 should be in the same wave (order may vary)
        assert_eq!(waves[1].len(), 2);
        assert!(waves[1].contains(&"app1".to_string()));
        assert!(waves[1].contains(&"app2".to_string()));
    }

    // A dependency on an undeclared image is rejected during validation,
    // and the error message names the missing image.
    #[test]
    fn test_resolve_execution_order_missing_dep() {
        let yaml = r#"
images:
  app:
    file: Dockerfile
    depends_on: [missing]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let result = executor.resolve_execution_order();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("missing"));
    }

    // A two-node cycle (a <-> b) yields CircularDependency naming both images.
    #[test]
    fn test_resolve_execution_order_circular() {
        let yaml = r#"
images:
  a:
    file: Dockerfile.a
    depends_on: [b]
  b:
    file: Dockerfile.b
    depends_on: [a]
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        );

        let result = executor.resolve_execution_order();
        assert!(result.is_err());
        match result.unwrap_err() {
            BuildError::CircularDependency { stages } => {
                assert!(stages.contains(&"a".to_string()));
                assert!(stages.contains(&"b".to_string()));
            }
            e => panic!("Expected CircularDependency error, got: {:?}", e),
        }
    }

    // Multiple ${VAR} placeholders in one tag are all expanded.
    #[test]
    fn test_expand_tag() {
        let mut vars = HashMap::new();
        vars.insert("VERSION".to_string(), "1.0.0".to_string());
        vars.insert("REGISTRY".to_string(), "ghcr.io/myorg".to_string());

        let tag = "${REGISTRY}/app:${VERSION}";
        let expanded = expand_tag_with_vars(tag, &vars);
        assert_eq!(expanded, "ghcr.io/myorg/app:1.0.0");
    }

    // Placeholders without a matching var entry remain untouched.
    #[test]
    fn test_expand_tag_partial() {
        let mut vars = HashMap::new();
        vars.insert("VERSION".to_string(), "1.0.0".to_string());

        // Unknown vars are left as-is
        let tag = "myapp:${VERSION}-${UNKNOWN}";
        let expanded = expand_tag_with_vars(tag, &vars);
        assert_eq!(expanded, "myapp:1.0.0-${UNKNOWN}");
    }

    // is_success flips from true to false as soon as a failure is recorded.
    #[test]
    fn test_pipeline_result_is_success() {
        let mut result = PipelineResult {
            succeeded: HashMap::new(),
            failed: HashMap::new(),
            total_time_ms: 100,
        };

        assert!(result.is_success());

        result.failed.insert("app".to_string(), "error".to_string());
        assert!(!result.is_success());
    }

    // total_images counts both succeeded and failed entries.
    #[test]
    fn test_pipeline_result_total_images() {
        let mut result = PipelineResult {
            succeeded: HashMap::new(),
            failed: HashMap::new(),
            total_time_ms: 100,
        };

        result.succeeded.insert(
            "app1".to_string(),
            BuiltImage {
                image_id: "sha256:abc".to_string(),
                tags: vec!["app1:latest".to_string()],
                layer_count: 5,
                size: 0,
                build_time_ms: 50,
            },
        );
        result
            .failed
            .insert("app2".to_string(), "error".to_string());

        assert_eq!(result.total_images(), 2);
    }

    // The builder-style setters override the defaults from `new`
    // (fail_fast: true, push_enabled from pipeline.push.after_all).
    #[test]
    fn test_builder_methods() {
        let yaml = r#"
images:
  app:
    file: Dockerfile
push:
  after_all: true
"#;
        let pipeline = parse_pipeline(yaml).unwrap();
        let executor = PipelineExecutor::new(
            pipeline,
            PathBuf::from("/tmp"),
            BuildahExecutor::with_path("/usr/bin/buildah"),
        )
        .fail_fast(false)
        .push(false);

        assert!(!executor.fail_fast);
        assert!(!executor.push_enabled);
    }
}