Skip to main content

zlayer_builder/pipeline/
executor.rs

1//! Pipeline executor - Coordinates building multiple images with wave-based orchestration
2//!
3//! This module provides the [`PipelineExecutor`] which processes [`ZPipeline`] manifests,
4//! resolving dependencies and building images in parallel waves.
5//!
6//! # Execution Model
7//!
8//! Images are grouped into "waves" based on their dependency depth:
9//! - **Wave 0**: Images with no dependencies - can all run in parallel
10//! - **Wave 1**: Images that depend only on Wave 0 images
11//! - **Wave N**: Images that depend only on images from earlier waves
12//!
13//! Within each wave, all builds run concurrently, sharing the same
14//! [`BuildahExecutor`] (and thus cache storage).
15//!
16//! # Example
17//!
18//! ```no_run
19//! use zlayer_builder::pipeline::{PipelineExecutor, parse_pipeline};
20//! use zlayer_builder::BuildahExecutor;
21//! use std::path::PathBuf;
22//!
23//! # async fn example() -> Result<(), zlayer_builder::BuildError> {
24//! let yaml = std::fs::read_to_string("ZPipeline.yaml")?;
25//! let pipeline = parse_pipeline(&yaml)?;
26//!
27//! let executor = BuildahExecutor::new_async().await?;
28//! let result = PipelineExecutor::new(pipeline, PathBuf::from("."), executor)
29//!     .fail_fast(true)
30//!     .run()
31//!     .await?;
32//!
33//! println!("Built {} images in {}ms", result.succeeded.len(), result.total_time_ms);
34//! # Ok(())
35//! # }
36//! ```
37
38use std::collections::{HashMap, HashSet};
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41
42use tokio::task::JoinSet;
43use tracing::{error, info, warn};
44
45use serde::Deserialize;
46
47use crate::backend::BuildBackend;
48
49/// Minimal struct to read `source_hash` from a cached image's `config.json`.
50/// Separate from `SandboxImageConfig` which is macOS-only.
51#[derive(Deserialize)]
52struct CachedImageConfig {
53    #[serde(default)]
54    source_hash: Option<String>,
55}
56use crate::buildah::{BuildahCommand, BuildahExecutor};
57use crate::builder::{BuiltImage, ImageBuilder};
58use crate::error::{BuildError, Result};
59use zlayer_paths::ZLayerDirs;
60
61use super::types::{PipelineDefaults, PipelineImage, ZPipeline};
62
63#[cfg(feature = "local-registry")]
64use zlayer_registry::LocalRegistry;
65
66/// Result of a pipeline execution
67#[derive(Debug)]
68pub struct PipelineResult {
69    /// Images that were successfully built
70    pub succeeded: HashMap<String, BuiltImage>,
71    /// Images that failed to build (name -> error message)
72    pub failed: HashMap<String, String>,
73    /// Total execution time in milliseconds
74    pub total_time_ms: u64,
75}
76
77impl PipelineResult {
78    /// Returns true if all images were built successfully
79    #[must_use]
80    pub fn is_success(&self) -> bool {
81        self.failed.is_empty()
82    }
83
84    /// Returns the total number of images in the pipeline
85    #[must_use]
86    pub fn total_images(&self) -> usize {
87        self.succeeded.len() + self.failed.len()
88    }
89}
90
91/// Pipeline executor configuration and runtime
92///
93/// The executor processes a [`ZPipeline`] manifest, resolving dependencies
94/// and building images in parallel waves.
95pub struct PipelineExecutor {
96    /// The pipeline configuration
97    pipeline: ZPipeline,
98    /// Base directory for resolving relative paths
99    base_dir: PathBuf,
100    /// Buildah executor (shared across all builds)
101    executor: BuildahExecutor,
102    /// Pluggable build backend (buildah, sandbox, etc.).
103    ///
104    /// When set, builds delegate to this backend instead of using the
105    /// `executor` field directly.
106    backend: Option<Arc<dyn BuildBackend>>,
107    /// Whether to abort on first failure
108    fail_fast: bool,
109    /// Whether to push images after building
110    push_enabled: bool,
111    /// Optional local registry for sharing built images between pipeline stages
112    #[cfg(feature = "local-registry")]
113    local_registry: Option<Arc<LocalRegistry>>,
114}
115
116impl PipelineExecutor {
117    /// Create a new pipeline executor
118    ///
119    /// # Arguments
120    ///
121    /// * `pipeline` - The parsed `ZPipeline` configuration
122    /// * `base_dir` - Base directory for resolving relative paths in the pipeline
123    /// * `executor` - The buildah executor to use for all builds
124    #[must_use]
125    pub fn new(pipeline: ZPipeline, base_dir: PathBuf, executor: BuildahExecutor) -> Self {
126        // Determine push behavior from pipeline config
127        let push_enabled = pipeline.push.after_all;
128
129        Self {
130            pipeline,
131            base_dir,
132            executor,
133            backend: None,
134            fail_fast: true,
135            push_enabled,
136            #[cfg(feature = "local-registry")]
137            local_registry: None,
138        }
139    }
140
141    /// Create a new pipeline executor with an explicit [`BuildBackend`].
142    ///
143    /// The backend is used for all build, push, and manifest operations.
144    /// A default `BuildahExecutor` is kept for backwards compatibility but
145    /// is not used when a backend is set.
146    ///
147    /// # Arguments
148    ///
149    /// * `pipeline` - The parsed `ZPipeline` configuration
150    /// * `base_dir` - Base directory for resolving relative paths in the pipeline
151    /// * `backend`  - The build backend to use for all operations
152    #[must_use]
153    pub fn with_backend(
154        pipeline: ZPipeline,
155        base_dir: PathBuf,
156        backend: Arc<dyn BuildBackend>,
157    ) -> Self {
158        let push_enabled = pipeline.push.after_all;
159
160        Self {
161            pipeline,
162            base_dir,
163            executor: BuildahExecutor::default(),
164            backend: Some(backend),
165            fail_fast: true,
166            push_enabled,
167            #[cfg(feature = "local-registry")]
168            local_registry: None,
169        }
170    }
171
172    /// Set fail-fast mode (default: true)
173    ///
174    /// When enabled, the executor will abort immediately when any image
175    /// fails to build. When disabled, it will continue building independent
176    /// images even after failures.
177    #[must_use]
178    pub fn fail_fast(mut self, fail_fast: bool) -> Self {
179        self.fail_fast = fail_fast;
180        self
181    }
182
183    /// Enable or disable pushing (overrides `pipeline.push.after_all`)
184    ///
185    /// When enabled and all builds succeed, images will be pushed to their
186    /// configured registries.
187    #[must_use]
188    pub fn push(mut self, enabled: bool) -> Self {
189        self.push_enabled = enabled;
190        self
191    }
192
193    /// Set a local registry for sharing built images between pipeline stages.
194    ///
195    /// When set, each image build receives a fresh [`LocalRegistry`] handle
196    /// pointing at the same on-disk root, so downstream images can resolve
197    /// base images that were built by earlier waves.
198    #[cfg(feature = "local-registry")]
199    #[must_use]
200    pub fn with_local_registry(mut self, registry: Arc<LocalRegistry>) -> Self {
201        self.local_registry = Some(registry);
202        self
203    }
204
205    /// Resolve execution order into waves
206    ///
207    /// Returns a vector of waves, where each wave contains image names
208    /// that can be built in parallel. Images in wave N depend only on
209    /// images from waves 0..N-1.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if:
214    /// - An image depends on an unknown image
215    /// - A circular dependency is detected
216    fn resolve_execution_order(&self) -> Result<Vec<Vec<String>>> {
217        let mut waves: Vec<Vec<String>> = Vec::new();
218        let mut assigned: HashSet<String> = HashSet::new();
219        let mut remaining: HashSet<String> = self.pipeline.images.keys().cloned().collect();
220
221        // Validate: check for missing dependencies
222        for (name, image) in &self.pipeline.images {
223            for dep in &image.depends_on {
224                if !self.pipeline.images.contains_key(dep) {
225                    return Err(BuildError::invalid_instruction(
226                        "pipeline",
227                        format!("Image '{name}' depends on unknown image '{dep}'"),
228                    ));
229                }
230            }
231        }
232
233        // Build waves iteratively
234        while !remaining.is_empty() {
235            let mut wave: Vec<String> = Vec::new();
236
237            for name in &remaining {
238                let image = &self.pipeline.images[name];
239                // Can build if all dependencies are already assigned to previous waves
240                let deps_satisfied = image.depends_on.iter().all(|d| assigned.contains(d));
241                if deps_satisfied {
242                    wave.push(name.clone());
243                }
244            }
245
246            if wave.is_empty() {
247                // No images could be added to this wave - circular dependency
248                return Err(BuildError::CircularDependency {
249                    stages: remaining.into_iter().collect(),
250                });
251            }
252
253            // Move wave images from remaining to assigned
254            for name in &wave {
255                remaining.remove(name);
256                assigned.insert(name.clone());
257            }
258
259            waves.push(wave);
260        }
261
262        Ok(waves)
263    }
264
265    /// Execute the pipeline
266    ///
267    /// Builds all images in dependency order, with images in the same wave
268    /// running in parallel.
269    ///
270    /// # Returns
271    ///
272    /// A [`PipelineResult`] containing information about successful and failed builds.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if:
277    /// - The dependency graph is invalid (missing deps, cycles)
278    /// - Any build fails and `fail_fast` is enabled
279    pub async fn run(&self) -> Result<PipelineResult> {
280        let start = std::time::Instant::now();
281        let waves = self.resolve_execution_order()?;
282
283        let mut succeeded: HashMap<String, BuiltImage> = HashMap::new();
284        let mut failed: HashMap<String, String> = HashMap::new();
285
286        info!(
287            "Building {} images in {} waves",
288            self.pipeline.images.len(),
289            waves.len()
290        );
291
292        for (wave_idx, wave) in waves.iter().enumerate() {
293            info!("Wave {}: {:?}", wave_idx, wave);
294
295            // Check if we should abort due to previous failures
296            if self.fail_fast && !failed.is_empty() {
297                warn!("Aborting pipeline due to previous failures (fail_fast enabled)");
298                break;
299            }
300
301            // Build all images in this wave concurrently
302            let wave_results = self.build_wave(wave).await;
303
304            // Process results
305            for (name, result) in wave_results {
306                match result {
307                    Ok(image) => {
308                        info!("[{}] Build succeeded: {}", name, image.image_id);
309                        succeeded.insert(name, image);
310                    }
311                    Err(e) => {
312                        error!("[{}] Build failed: {}", name, e);
313                        failed.insert(name.clone(), e.to_string());
314
315                        if self.fail_fast {
316                            // Return early with the first error
317                            return Err(e);
318                        }
319                    }
320                }
321            }
322        }
323
324        // Push phase (only if all succeeded and push enabled)
325        if self.push_enabled && failed.is_empty() {
326            info!("Pushing {} images", succeeded.len());
327
328            // Ensure secondary tags have on-disk directories (sandbox backend
329            // stores the rootfs under the first tag only; additional tags need
330            // to be created before push can find them).
331            if let Some(ref backend) = self.backend {
332                for image in succeeded.values() {
333                    if image.tags.len() > 1 {
334                        let first = &image.tags[0];
335                        for secondary in &image.tags[1..] {
336                            if let Err(e) = backend.tag_image(first, secondary).await {
337                                warn!("Failed to tag {} as {}: {}", first, secondary, e);
338                            }
339                        }
340                    }
341                }
342            }
343
344            for (name, image) in &succeeded {
345                for tag in &image.tags {
346                    let push_result = if image.is_manifest {
347                        self.push_manifest(tag).await
348                    } else {
349                        self.push_image(tag).await
350                    };
351
352                    if let Err(e) = push_result {
353                        warn!("[{}] Failed to push {}: {}", name, tag, e);
354                        // Push failures don't fail the overall pipeline
355                        // since the images were built successfully
356                    } else {
357                        info!("[{}] Pushed: {}", name, tag);
358                    }
359                }
360            }
361        }
362
363        #[allow(clippy::cast_possible_truncation)]
364        let total_time_ms = start.elapsed().as_millis() as u64;
365
366        Ok(PipelineResult {
367            succeeded,
368            failed,
369            total_time_ms,
370        })
371    }
372
373    /// Build all images in a wave concurrently
374    ///
375    /// Each image is checked for multi-platform configuration. Images with
376    /// 2+ platforms use `build_multiplatform_image` (manifest list), images
377    /// with exactly 1 platform use `build_single_image` with that platform
378    /// set, and images with no platforms use the native platform (existing
379    /// behavior).
380    ///
381    /// Returns a vector of (name, result) tuples for each image in the wave.
382    async fn build_wave(&self, wave: &[String]) -> Vec<(String, Result<BuiltImage>)> {
383        // Create shared data for spawned tasks
384        let pipeline = Arc::new(self.pipeline.clone());
385        let base_dir = Arc::new(self.base_dir.clone());
386        let executor = self.executor.clone();
387        let backend = self.backend.clone();
388
389        // Extract local registry root path (if configured) so spawned tasks
390        // can create their own LocalRegistry handles pointing at the same store.
391        #[cfg(feature = "local-registry")]
392        let registry_root: Option<PathBuf> =
393            self.local_registry.as_ref().map(|r| r.root().to_path_buf());
394        #[cfg(not(feature = "local-registry"))]
395        let registry_root: Option<PathBuf> = None;
396
397        let mut set = JoinSet::new();
398
399        for name in wave {
400            let name = name.clone();
401            let pipeline = Arc::clone(&pipeline);
402            let base_dir = Arc::clone(&base_dir);
403            let executor = executor.clone();
404            let backend = backend.clone();
405            let registry_root = registry_root.clone();
406
407            set.spawn(async move {
408                let platforms = {
409                    let image_config = &pipeline.images[&name];
410                    effective_platforms(image_config, &pipeline.defaults)
411                };
412
413                let result = match platforms.len() {
414                    // No platforms specified — native build (existing behavior)
415                    0 => {
416                        build_single_image(
417                            &name,
418                            &pipeline,
419                            &base_dir,
420                            executor,
421                            backend.as_ref().map(Arc::clone),
422                            None,
423                            registry_root.as_deref(),
424                        )
425                        .await
426                    }
427                    // Single platform — use build_single_image with platform set
428                    1 => {
429                        let platform = platforms[0].clone();
430                        build_single_image(
431                            &name,
432                            &pipeline,
433                            &base_dir,
434                            executor,
435                            backend.as_ref().map(Arc::clone),
436                            Some(&platform),
437                            registry_root.as_deref(),
438                        )
439                        .await
440                    }
441                    // Multiple platforms — build each, then create manifest list
442                    _ => {
443                        build_multiplatform_image(
444                            &name,
445                            &pipeline,
446                            &base_dir,
447                            executor,
448                            backend.as_ref().map(Arc::clone),
449                            &platforms,
450                            registry_root.as_deref(),
451                        )
452                        .await
453                    }
454                };
455
456                (name, result)
457            });
458        }
459
460        // Collect all results
461        let mut results = Vec::new();
462        while let Some(join_result) = set.join_next().await {
463            match join_result {
464                Ok((name, result)) => {
465                    results.push((name, result));
466                }
467                Err(e) => {
468                    // Task panicked
469                    error!("Build task panicked: {}", e);
470                    results.push((
471                        "unknown".to_string(),
472                        Err(BuildError::invalid_instruction(
473                            "pipeline",
474                            format!("Build task panicked: {e}"),
475                        )),
476                    ));
477                }
478            }
479        }
480
481        results
482    }
483
484    /// Push a regular image to its registry
485    async fn push_image(&self, tag: &str) -> Result<()> {
486        if let Some(ref backend) = self.backend {
487            return backend.push_image(tag, None).await;
488        }
489        let cmd = BuildahCommand::push(tag);
490        self.executor.execute_checked(&cmd).await?;
491        Ok(())
492    }
493
494    /// Push a manifest list (and all referenced images) to its registry
495    async fn push_manifest(&self, tag: &str) -> Result<()> {
496        if let Some(ref backend) = self.backend {
497            let destination = format!("docker://{tag}");
498            return backend.manifest_push(tag, &destination).await;
499        }
500        let destination = format!("docker://{tag}");
501        let cmd = BuildahCommand::manifest_push(tag, &destination);
502        self.executor.execute_checked(&cmd).await?;
503        Ok(())
504    }
505}
506
507/// Get the effective platforms for an image, considering defaults.
508///
509/// If the image specifies its own platforms, those take precedence.
510/// Otherwise, the pipeline-level defaults are used. An empty result
511/// means "native platform only" (no multi-arch).
512fn effective_platforms(image: &PipelineImage, defaults: &PipelineDefaults) -> Vec<String> {
513    if image.platforms.is_empty() {
514        defaults.platforms.clone()
515    } else {
516        image.platforms.clone()
517    }
518}
519
520/// Extract architecture suffix from a platform string.
521///
522/// # Examples
523///
524/// - `"linux/amd64"` -> `"amd64"`
525/// - `"linux/arm64"` -> `"arm64"`
526/// - `"linux/arm64/v8"` -> `"arm64-v8"`
527/// - `"linux"` -> `"linux"`
528fn platform_to_suffix(platform: &str) -> String {
529    let parts: Vec<&str> = platform.split('/').collect();
530    match parts.len() {
531        0 | 1 => platform.replace('/', "-"),
532        2 => parts[1].to_string(),
533        _ => format!("{}-{}", parts[1], parts[2]),
534    }
535}
536
537/// Apply pipeline configuration (`build_args`, format, `cache_mounts`, retries, `no_cache`)
538/// to an [`ImageBuilder`].
539///
540/// This merges default-level and per-image settings, with per-image values taking
541/// precedence for scalar settings and being additive for collections.
542fn apply_pipeline_config(
543    mut builder: ImageBuilder,
544    image_config: &PipelineImage,
545    defaults: &PipelineDefaults,
546) -> ImageBuilder {
547    // Merge build_args: defaults + per-image (per-image overrides defaults)
548    let mut args = defaults.build_args.clone();
549    args.extend(image_config.build_args.clone());
550    builder = builder.build_args(args);
551
552    // Format (per-image overrides default)
553    if let Some(fmt) = image_config.format.as_ref().or(defaults.format.as_ref()) {
554        builder = builder.format(fmt);
555    }
556
557    // No cache (per-image overrides default)
558    if image_config.no_cache.unwrap_or(defaults.no_cache) {
559        builder = builder.no_cache();
560    }
561
562    // Cache mounts: defaults + per-image (per-image are additive)
563    let mut cache_mounts = defaults.cache_mounts.clone();
564    cache_mounts.extend(image_config.cache_mounts.clone());
565    if !cache_mounts.is_empty() {
566        let run_mounts: Vec<_> = cache_mounts
567            .iter()
568            .map(crate::zimage::convert_cache_mount)
569            .collect();
570        builder = builder.default_cache_mounts(run_mounts);
571    }
572
573    // Retries (per-image overrides default)
574    let retries = image_config.retries.or(defaults.retries).unwrap_or(0);
575    if retries > 0 {
576        builder = builder.retries(retries);
577    }
578
579    builder
580}
581
582/// Detect whether a build file is a ZImagefile/YAML or a Dockerfile and
583/// configure the builder accordingly.
584fn apply_build_file(builder: ImageBuilder, file_path: &Path) -> ImageBuilder {
585    let file_name = file_path
586        .file_name()
587        .map(|n| n.to_string_lossy().to_string())
588        .unwrap_or_default();
589    let extension = file_path
590        .extension()
591        .map(|e| e.to_string_lossy().to_string())
592        .unwrap_or_default();
593
594    if extension == "yaml" || extension == "yml" || file_name.starts_with("ZImagefile") {
595        builder.zimagefile(file_path)
596    } else {
597        builder.dockerfile(file_path)
598    }
599}
600
601/// Compute a SHA-256 hash of a file's contents for content-based cache invalidation.
602///
603/// Returns `None` if the file cannot be read.
604async fn compute_file_hash(path: &Path) -> Option<String> {
605    use sha2::{Digest, Sha256};
606
607    let content = tokio::fs::read(path).await.ok()?;
608    let mut hasher = Sha256::new();
609    hasher.update(&content);
610    Some(format!("{:x}", hasher.finalize()))
611}
612
613/// Sanitize an image reference into a filesystem-safe directory name.
614///
615/// Mirrors the logic in `sandbox_builder::sanitize_image_name`.
616fn sanitize_image_name_for_cache(image: &str) -> String {
617    image.replace(['/', ':', '@'], "_")
618}
619
620/// Check if a cached sandbox image at `data_dir/images/{sanitized}/config.json`
621/// has a `source_hash` matching `expected_hash`.
622///
623/// Returns the sanitized image name if a match is found.
624async fn check_cached_image_hash(
625    data_dir: &Path,
626    tag: &str,
627    expected_hash: &str,
628) -> Option<String> {
629    let sanitized = sanitize_image_name_for_cache(tag);
630    let config_path = data_dir.join("images").join(&sanitized).join("config.json");
631    let data = tokio::fs::read_to_string(&config_path).await.ok()?;
632    let config: CachedImageConfig = serde_json::from_str(&data).ok()?;
633    if config.source_hash.as_deref() == Some(expected_hash) {
634        Some(sanitized)
635    } else {
636        None
637    }
638}
639
640/// Build a single image from the pipeline
641///
642/// This is extracted as a separate function to make it easier to spawn
643/// in a tokio task without borrowing issues.
644///
645/// When `platform` is `Some`, the builder is configured for that specific
646/// platform (e.g. `"linux/arm64"`), enabling cross-architecture builds.
647#[cfg_attr(not(feature = "local-registry"), allow(unused_variables))]
648async fn build_single_image(
649    name: &str,
650    pipeline: &ZPipeline,
651    base_dir: &Path,
652    executor: BuildahExecutor,
653    backend: Option<Arc<dyn BuildBackend>>,
654    platform: Option<&str>,
655    registry_root: Option<&Path>,
656) -> Result<BuiltImage> {
657    let image_config = &pipeline.images[name];
658    let context = base_dir.join(&image_config.context);
659    let file_path = base_dir.join(&image_config.file);
660
661    // Content-based cache invalidation: hash the build file and check if the
662    // output image was already built from identical source content.
663    let file_hash = compute_file_hash(&file_path).await;
664    if let Some(ref hash) = file_hash {
665        let data_dir = ZLayerDirs::default_data_dir();
666
667        let expanded_tags: Vec<String> = image_config
668            .tags
669            .iter()
670            .map(|t| expand_tag_with_vars(t, &pipeline.vars))
671            .collect();
672
673        // Check the first tag — if it has a cached image with matching hash, skip the build
674        if let Some(first_tag) = expanded_tags.first() {
675            if let Some(cached_id) = check_cached_image_hash(&data_dir, first_tag, hash).await {
676                info!(
677                    "[{}] Skipping build — cached image hash matches ({})",
678                    name, cached_id
679                );
680                return Ok(BuiltImage {
681                    image_id: cached_id,
682                    tags: expanded_tags,
683                    layer_count: 1,
684                    size: 0,
685                    build_time_ms: 0,
686                    is_manifest: false,
687                });
688            }
689        }
690    }
691
692    let effective_backend: Arc<dyn BuildBackend> = backend
693        .unwrap_or_else(|| Arc::new(crate::backend::BuildahBackend::with_executor(executor)));
694    let mut builder = ImageBuilder::with_backend(&context, effective_backend)?;
695
696    // Determine if this is a ZImagefile or Dockerfile based on extension/name
697    builder = apply_build_file(builder, &file_path);
698
699    // Pass the source hash so the sandbox builder stores it for future cache checks
700    if let Some(hash) = file_hash {
701        builder = builder.source_hash(hash);
702    }
703
704    // Set platform if specified
705    if let Some(plat) = platform {
706        builder = builder.platform(plat);
707    }
708
709    // Apply tags with variable expansion
710    for tag in &image_config.tags {
711        let expanded = expand_tag_with_vars(tag, &pipeline.vars);
712        builder = builder.tag(expanded);
713    }
714
715    // Apply shared pipeline config (build_args, format, no_cache, cache_mounts, retries)
716    builder = apply_pipeline_config(builder, image_config, &pipeline.defaults);
717
718    // Wire up local registry so this build can resolve images from earlier waves
719    #[cfg(feature = "local-registry")]
720    if let Some(root) = registry_root {
721        let shared_registry = LocalRegistry::new(root.to_path_buf()).await.map_err(|e| {
722            BuildError::invalid_instruction(
723                "pipeline",
724                format!("failed to open local registry: {e}"),
725            )
726        })?;
727        builder = builder.with_local_registry(shared_registry);
728    }
729
730    builder.build().await
731}
732
733/// Build an image for multiple platforms and create a manifest list.
734///
735/// Each platform is built sequentially (QEMU can be flaky with parallel
736/// cross-arch builds), then a buildah manifest list is created that
737/// references all per-platform images.
738#[cfg_attr(not(feature = "local-registry"), allow(unused_variables))]
739async fn build_multiplatform_image(
740    name: &str,
741    pipeline: &ZPipeline,
742    base_dir: &Path,
743    executor: BuildahExecutor,
744    backend: Option<Arc<dyn BuildBackend>>,
745    platforms: &[String],
746    registry_root: Option<&Path>,
747) -> Result<BuiltImage> {
748    let image_config = &pipeline.images[name];
749    let start_time = std::time::Instant::now();
750
751    // Expand tags with variables
752    let expanded_tags: Vec<String> = image_config
753        .tags
754        .iter()
755        .map(|t| expand_tag_with_vars(t, &pipeline.vars))
756        .collect();
757
758    let manifest_name = expanded_tags
759        .first()
760        .cloned()
761        .unwrap_or_else(|| format!("zlayer-manifest-{name}"));
762
763    // Build for each platform sequentially (QEMU can be flaky with parallel cross-arch)
764    let mut arch_tags: Vec<String> = Vec::new();
765    let mut total_layers = 0usize;
766    let mut total_size = 0u64;
767
768    for platform in platforms {
769        let suffix = platform_to_suffix(platform);
770        let platform_tags: Vec<String> = expanded_tags
771            .iter()
772            .map(|t| format!("{t}-{suffix}"))
773            .collect();
774
775        info!("[{name}] Building for platform {platform}");
776
777        // Build with platform-specific tags
778        let context = base_dir.join(&image_config.context);
779        let file_path = base_dir.join(&image_config.file);
780
781        let effective_backend: Arc<dyn BuildBackend> = match backend {
782            Some(ref b) => Arc::clone(b),
783            None => Arc::new(crate::backend::BuildahBackend::with_executor(
784                executor.clone(),
785            )),
786        };
787        let mut builder = ImageBuilder::with_backend(&context, effective_backend)?;
788
789        // Determine file type (same detection as build_single_image)
790        builder = apply_build_file(builder, &file_path);
791
792        // Set platform
793        builder = builder.platform(platform);
794
795        // Apply platform-specific tags
796        for tag in &platform_tags {
797            builder = builder.tag(tag);
798        }
799
800        // Apply shared config (build_args, format, no_cache, cache_mounts, retries)
801        builder = apply_pipeline_config(builder, image_config, &pipeline.defaults);
802
803        // Wire up local registry so this build can resolve images from earlier waves
804        #[cfg(feature = "local-registry")]
805        if let Some(root) = registry_root {
806            let shared_registry = LocalRegistry::new(root.to_path_buf()).await.map_err(|e| {
807                BuildError::invalid_instruction(
808                    "pipeline",
809                    format!("failed to open local registry: {e}"),
810                )
811            })?;
812            builder = builder.with_local_registry(shared_registry);
813        }
814
815        let built = builder.build().await?;
816        total_layers += built.layer_count;
817        total_size += built.size;
818
819        if let Some(first_tag) = platform_tags.first() {
820            arch_tags.push(first_tag.clone());
821        }
822    }
823
824    // Assemble the manifest list from per-platform images
825    assemble_manifest(
826        name,
827        &manifest_name,
828        &arch_tags,
829        &expanded_tags,
830        backend.as_ref(),
831        &executor,
832    )
833    .await?;
834
835    #[allow(clippy::cast_possible_truncation)]
836    let build_time_ms = start_time.elapsed().as_millis() as u64;
837
838    Ok(BuiltImage {
839        image_id: manifest_name,
840        tags: expanded_tags,
841        layer_count: total_layers,
842        size: total_size,
843        build_time_ms,
844        is_manifest: true,
845    })
846}
847
848/// Create a manifest list, add per-platform images, and apply additional tags.
849///
850/// This is a helper extracted from [`build_multiplatform_image`] to keep that
851/// function under the line-count limit.
852async fn assemble_manifest(
853    name: &str,
854    manifest_name: &str,
855    arch_tags: &[String],
856    expanded_tags: &[String],
857    backend: Option<&Arc<dyn BuildBackend>>,
858    executor: &BuildahExecutor,
859) -> Result<()> {
860    // Create manifest list — delegate to backend if available
861    info!("[{name}] Creating manifest: {manifest_name}");
862    if let Some(backend) = backend {
863        backend
864            .manifest_create(manifest_name)
865            .await
866            .map_err(|e| BuildError::pipeline_error(format!("manifest create failed: {e}")))?;
867    } else {
868        executor
869            .execute_checked(&BuildahCommand::manifest_create(manifest_name))
870            .await
871            .map_err(|e| BuildError::pipeline_error(format!("manifest create failed: {e}")))?;
872    }
873
874    // Add each arch image to the manifest
875    for arch_tag in arch_tags {
876        info!("[{name}] Adding to manifest: {arch_tag}");
877        if let Some(backend) = backend {
878            backend
879                .manifest_add(manifest_name, arch_tag)
880                .await
881                .map_err(|e| BuildError::pipeline_error(format!("manifest add failed: {e}")))?;
882        } else {
883            executor
884                .execute_checked(&BuildahCommand::manifest_add(manifest_name, arch_tag))
885                .await
886                .map_err(|e| BuildError::pipeline_error(format!("manifest add failed: {e}")))?;
887        }
888    }
889
890    // Tag the manifest with additional tags
891    for tag in expanded_tags.iter().skip(1) {
892        if let Some(backend) = backend {
893            backend
894                .tag_image(manifest_name, tag)
895                .await
896                .map_err(|e| BuildError::pipeline_error(format!("manifest tag failed: {e}")))?;
897        } else {
898            executor
899                .execute_checked(&BuildahCommand::tag(manifest_name, tag))
900                .await
901                .map_err(|e| BuildError::pipeline_error(format!("manifest tag failed: {e}")))?;
902        }
903    }
904
905    Ok(())
906}
907
908/// Expand variables in a tag string
909///
910/// Standalone function for use in spawned tasks.
911fn expand_tag_with_vars(tag: &str, vars: &HashMap<String, String>) -> String {
912    let mut result = tag.to_string();
913    for (key, value) in vars {
914        result = result.replace(&format!("${{{key}}}"), value);
915    }
916    result
917}
918
919#[cfg(test)]
920mod tests {
921    use super::*;
922    use crate::pipeline::parse_pipeline;
923
924    #[test]
925    fn test_resolve_execution_order_simple() {
926        let yaml = r"
927images:
928  app:
929    file: Dockerfile
930";
931        let pipeline = parse_pipeline(yaml).unwrap();
932        let executor = PipelineExecutor::new(
933            pipeline,
934            PathBuf::from("/tmp"),
935            BuildahExecutor::with_path("/usr/bin/buildah"),
936        );
937
938        let waves = executor.resolve_execution_order().unwrap();
939        assert_eq!(waves.len(), 1);
940        assert_eq!(waves[0], vec!["app"]);
941    }
942
943    #[test]
944    fn test_resolve_execution_order_with_deps() {
945        let yaml = r"
946images:
947  base:
948    file: Dockerfile.base
949  app:
950    file: Dockerfile.app
951    depends_on: [base]
952  test:
953    file: Dockerfile.test
954    depends_on: [app]
955";
956        let pipeline = parse_pipeline(yaml).unwrap();
957        let executor = PipelineExecutor::new(
958            pipeline,
959            PathBuf::from("/tmp"),
960            BuildahExecutor::with_path("/usr/bin/buildah"),
961        );
962
963        let waves = executor.resolve_execution_order().unwrap();
964        assert_eq!(waves.len(), 3);
965        assert_eq!(waves[0], vec!["base"]);
966        assert_eq!(waves[1], vec!["app"]);
967        assert_eq!(waves[2], vec!["test"]);
968    }
969
970    #[test]
971    fn test_resolve_execution_order_parallel() {
972        let yaml = r"
973images:
974  base:
975    file: Dockerfile.base
976  app1:
977    file: Dockerfile.app1
978    depends_on: [base]
979  app2:
980    file: Dockerfile.app2
981    depends_on: [base]
982";
983        let pipeline = parse_pipeline(yaml).unwrap();
984        let executor = PipelineExecutor::new(
985            pipeline,
986            PathBuf::from("/tmp"),
987            BuildahExecutor::with_path("/usr/bin/buildah"),
988        );
989
990        let waves = executor.resolve_execution_order().unwrap();
991        assert_eq!(waves.len(), 2);
992        assert_eq!(waves[0], vec!["base"]);
993        // app1 and app2 should be in the same wave (order may vary)
994        assert_eq!(waves[1].len(), 2);
995        assert!(waves[1].contains(&"app1".to_string()));
996        assert!(waves[1].contains(&"app2".to_string()));
997    }
998
999    #[test]
1000    fn test_resolve_execution_order_missing_dep() {
1001        let yaml = r"
1002images:
1003  app:
1004    file: Dockerfile
1005    depends_on: [missing]
1006";
1007        let pipeline = parse_pipeline(yaml).unwrap();
1008        let executor = PipelineExecutor::new(
1009            pipeline,
1010            PathBuf::from("/tmp"),
1011            BuildahExecutor::with_path("/usr/bin/buildah"),
1012        );
1013
1014        let result = executor.resolve_execution_order();
1015        assert!(result.is_err());
1016        assert!(result.unwrap_err().to_string().contains("missing"));
1017    }
1018
1019    #[test]
1020    fn test_resolve_execution_order_circular() {
1021        let yaml = r"
1022images:
1023  a:
1024    file: Dockerfile.a
1025    depends_on: [b]
1026  b:
1027    file: Dockerfile.b
1028    depends_on: [a]
1029";
1030        let pipeline = parse_pipeline(yaml).unwrap();
1031        let executor = PipelineExecutor::new(
1032            pipeline,
1033            PathBuf::from("/tmp"),
1034            BuildahExecutor::with_path("/usr/bin/buildah"),
1035        );
1036
1037        let result = executor.resolve_execution_order();
1038        assert!(result.is_err());
1039        match result.unwrap_err() {
1040            BuildError::CircularDependency { stages } => {
1041                assert!(stages.contains(&"a".to_string()));
1042                assert!(stages.contains(&"b".to_string()));
1043            }
1044            e => panic!("Expected CircularDependency error, got: {e:?}"),
1045        }
1046    }
1047
1048    #[test]
1049    fn test_expand_tag() {
1050        let mut vars = HashMap::new();
1051        vars.insert("VERSION".to_string(), "1.0.0".to_string());
1052        vars.insert("REGISTRY".to_string(), "ghcr.io/myorg".to_string());
1053
1054        let tag = "${REGISTRY}/app:${VERSION}";
1055        let expanded = expand_tag_with_vars(tag, &vars);
1056        assert_eq!(expanded, "ghcr.io/myorg/app:1.0.0");
1057    }
1058
1059    #[test]
1060    fn test_expand_tag_partial() {
1061        let mut vars = HashMap::new();
1062        vars.insert("VERSION".to_string(), "1.0.0".to_string());
1063
1064        // Unknown vars are left as-is
1065        let tag = "myapp:${VERSION}-${UNKNOWN}";
1066        let expanded = expand_tag_with_vars(tag, &vars);
1067        assert_eq!(expanded, "myapp:1.0.0-${UNKNOWN}");
1068    }
1069
1070    #[test]
1071    fn test_pipeline_result_is_success() {
1072        let mut result = PipelineResult {
1073            succeeded: HashMap::new(),
1074            failed: HashMap::new(),
1075            total_time_ms: 100,
1076        };
1077
1078        assert!(result.is_success());
1079
1080        result.failed.insert("app".to_string(), "error".to_string());
1081        assert!(!result.is_success());
1082    }
1083
1084    #[test]
1085    fn test_pipeline_result_total_images() {
1086        let mut result = PipelineResult {
1087            succeeded: HashMap::new(),
1088            failed: HashMap::new(),
1089            total_time_ms: 100,
1090        };
1091
1092        result.succeeded.insert(
1093            "app1".to_string(),
1094            BuiltImage {
1095                image_id: "sha256:abc".to_string(),
1096                tags: vec!["app1:latest".to_string()],
1097                layer_count: 5,
1098                size: 0,
1099                build_time_ms: 50,
1100                is_manifest: false,
1101            },
1102        );
1103        result
1104            .failed
1105            .insert("app2".to_string(), "error".to_string());
1106
1107        assert_eq!(result.total_images(), 2);
1108    }
1109
1110    #[test]
1111    fn test_builder_methods() {
1112        let yaml = r"
1113images:
1114  app:
1115    file: Dockerfile
1116push:
1117  after_all: true
1118";
1119        let pipeline = parse_pipeline(yaml).unwrap();
1120        let executor = PipelineExecutor::new(
1121            pipeline,
1122            PathBuf::from("/tmp"),
1123            BuildahExecutor::with_path("/usr/bin/buildah"),
1124        )
1125        .fail_fast(false)
1126        .push(false);
1127
1128        assert!(!executor.fail_fast);
1129        assert!(!executor.push_enabled);
1130    }
1131
1132    /// Helper to create a minimal `PipelineImage` for tests.
1133    fn test_pipeline_image() -> PipelineImage {
1134        PipelineImage {
1135            file: PathBuf::from("Dockerfile"),
1136            context: PathBuf::from("."),
1137            tags: vec![],
1138            build_args: HashMap::new(),
1139            depends_on: vec![],
1140            no_cache: None,
1141            format: None,
1142            cache_mounts: vec![],
1143            retries: None,
1144            platforms: vec![],
1145        }
1146    }
1147
1148    #[test]
1149    fn test_platform_to_suffix() {
1150        assert_eq!(platform_to_suffix("linux/amd64"), "amd64");
1151        assert_eq!(platform_to_suffix("linux/arm64"), "arm64");
1152        assert_eq!(platform_to_suffix("linux/arm64/v8"), "arm64-v8");
1153        assert_eq!(platform_to_suffix("linux"), "linux");
1154    }
1155
1156    #[test]
1157    fn test_effective_platforms_image_overrides() {
1158        let defaults = PipelineDefaults {
1159            platforms: vec!["linux/amd64".into()],
1160            ..Default::default()
1161        };
1162        let image = PipelineImage {
1163            platforms: vec!["linux/arm64".into()],
1164            ..test_pipeline_image()
1165        };
1166        assert_eq!(effective_platforms(&image, &defaults), vec!["linux/arm64"]);
1167    }
1168
1169    #[test]
1170    fn test_effective_platforms_inherits_defaults() {
1171        let defaults = PipelineDefaults {
1172            platforms: vec!["linux/amd64".into()],
1173            ..Default::default()
1174        };
1175        let image = test_pipeline_image();
1176        assert_eq!(effective_platforms(&image, &defaults), vec!["linux/amd64"]);
1177    }
1178
1179    #[test]
1180    fn test_effective_platforms_empty() {
1181        let defaults = PipelineDefaults::default();
1182        let image = test_pipeline_image();
1183        assert!(effective_platforms(&image, &defaults).is_empty());
1184    }
1185
1186    #[test]
1187    fn test_platform_to_suffix_edge_cases() {
1188        // Empty string
1189        assert_eq!(platform_to_suffix(""), "");
1190        // Single component
1191        assert_eq!(platform_to_suffix("linux"), "linux");
1192        // Four components (unusual but handle gracefully)
1193        assert_eq!(platform_to_suffix("linux/arm/v7/extra"), "arm-v7");
1194    }
1195
1196    #[test]
1197    fn test_effective_platforms_multiple_defaults() {
1198        let defaults = PipelineDefaults {
1199            platforms: vec!["linux/amd64".into(), "linux/arm64".into()],
1200            ..Default::default()
1201        };
1202        let image = test_pipeline_image();
1203        assert_eq!(
1204            effective_platforms(&image, &defaults),
1205            vec!["linux/amd64", "linux/arm64"]
1206        );
1207    }
1208
1209    #[test]
1210    fn test_effective_platforms_image_overrides_multiple() {
1211        let defaults = PipelineDefaults {
1212            platforms: vec!["linux/amd64".into(), "linux/arm64".into()],
1213            ..Default::default()
1214        };
1215        let image = PipelineImage {
1216            platforms: vec!["linux/s390x".into()],
1217            ..test_pipeline_image()
1218        };
1219        // Image platforms completely replace defaults, not merge
1220        assert_eq!(effective_platforms(&image, &defaults), vec!["linux/s390x"]);
1221    }
1222}