anodizer 0.9.1

A Rust-native release automation tool inspired by GoReleaser
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
//! Release-pipeline orchestration for the `anodizer` CLI.
//!
//! This module is split into focused submodules:
//!
//! - [`config_loader`] — config discovery, format detection, `includes:`
//!   resolution, and post-load normalization.
//! - [`monorepo`] — monorepo path-prefix defaulting.
//! - [`builders`] — the `build_*_pipeline` constructors for each entry point.
//!
//! The [`Pipeline`] type and its sequential `run` loop live here; the
//! submodules' public surface is re-exported below so external callers keep
//! reaching items via `crate::pipeline::<item>`.

use anodizer_core::context::Context;
use anodizer_core::log::StageLogger;
use anodizer_core::stage::Stage;
use anyhow::Result;
use colored::Colorize;

mod builders;
mod config_loader;
mod monorepo;

pub use anodizer_core::hooks::run_hooks;

pub(crate) use builders::build_publish_only_pipeline;
pub use builders::{
    build_announce_pipeline, build_merge_pipeline, build_publish_pipeline, build_release_pipeline,
    build_split_pipeline,
};
pub use config_loader::{find_config, find_config_with_logger, load_config, load_repo_config};

/// An ordered list of release stages plus the flag that arms the
/// binary-artifact guard.
///
/// Stages are appended with [`Pipeline::add`] and executed in insertion
/// order by [`Pipeline::run`].
pub struct Pipeline {
    /// Registered stages, in the order they will run.
    stages: Vec<Box<dyn Stage>>,
    /// Whether this pipeline is expected to have a compiled binary for
    /// every in-scope crate that configures a binary-requiring surface.
    ///
    /// Set only on the build-producing pipelines (full release, which
    /// runs `BuildStage`, and merge, which pre-loads the split shards'
    /// binaries before the pipeline runs). The publish-only / publish /
    /// announce pipelines leave it `false`: they rehydrate or never touch
    /// binary artifacts, so a missing binary there is not a build mistake
    /// the guard should fail on.
    expects_binaries: bool,
}

impl Pipeline {
    /// Create an empty pipeline: no stages registered and the
    /// binary-artifact guard disarmed.
    pub fn new() -> Self {
        let stages = Vec::new();
        Self {
            stages,
            expects_binaries: false,
        }
    }

    /// Append `stage` to the end of the pipeline. Stages run in the order
    /// they were added.
    pub fn add(&mut self, stage: Box<dyn Stage>) {
        self.stages.push(stage);
    }

    /// Arm the binary-artifact guard for this pipeline. Call only on the
    /// build-producing pipelines (full release, merge); see
    /// [`anodizer_core::binary_artifact_guard`].
    ///
    /// Once armed, the guard is checked during the run: before the first
    /// stage when no build stage will execute in this pipeline, otherwise
    /// immediately after the build stage succeeds.
    pub(crate) fn expect_binaries(&mut self) {
        self.expects_binaries = true;
    }

    /// Names of the registered stages, in pipeline order.
    ///
    /// Test-only: the pipeline-construction tests use it to assert stage
    /// ordering invariants (e.g. blob runs before snapcraft-publish so the
    /// submitter gate sees blob's outcome via `ctx.publish_report`).
    #[cfg(test)]
    pub fn stage_names(&self) -> Vec<&str> {
        let mut names = Vec::with_capacity(self.stages.len());
        for stage in &self.stages {
            names.push(stage.name());
        }
        names
    }

    /// Execute the registered stages in order, then emit the run summary.
    ///
    /// The summary is emitted here, at the pipeline level, rather than
    /// inside `AnnounceStage::run`, so the end-of-pipeline status table
    /// and the `--summary-json=<path>` write still happen when announce
    /// is operator-skipped via `--skip=announce`. The stage loop lives in
    /// an inner function whose outcome is captured first; `emit_summary`
    /// is then called unconditionally and the captured outcome is
    /// returned. The summary therefore fires on `Ok`, on `Err`, and when
    /// the inner body short-circuits early via `?`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the inner pipeline body returned.
    ///
    /// # Panics
    ///
    /// A panic inside a stage unwinds past the `emit_summary` call, so a
    /// panicking pipeline skips the summary write. This is an accepted
    /// limitation: the release pipeline is built around
    /// `Result`-propagation, so a stage panic is a bug in the stage (or
    /// an `unwrap`/`expect` that slipped through review), not an operator
    /// error to recover from, and the missing `summary.json` is only a
    /// downstream symptom of that bug. A `scopeguard::defer!` wrapper
    /// would close the window, but it introduces a panic-safety layer the
    /// rest of the codebase does not use; the inner-fn shape mirrors the
    /// convention established by `AnnounceStage::run` → `announce_body`.
    pub fn run(&self, ctx: &mut Context, log: &StageLogger) -> Result<()> {
        // Capture the result before the summary so an `Err` cannot
        // bypass the summary write.
        let result = self.run_inner(ctx, log);
        anodizer_stage_announce::emit_summary(ctx);
        result
    }

    /// Inner pipeline body. Lives separately so `Pipeline::run` can
    /// wrap it in the unconditional `emit_summary` post-call — see
    /// the documentation on `run` for why the wrapper exists.
    ///
    /// Stages run in registration order. A stage is skipped (and buffered
    /// for the consolidated `skipped` row) when the operator skipped it,
    /// or when it is binary-dependent and no binary artifact exists. When
    /// `expects_binaries` is set, the binary-artifact guard runs either
    /// up-front (no build stage will execute) or right after the build
    /// stage succeeds.
    ///
    /// # Errors
    ///
    /// Returns the first stage error (after logging `<name> failed: …`)
    /// or the binary-artifact guard's error; later stages do not run.
    fn run_inner(&self, ctx: &mut Context, log: &StageLogger) -> Result<()> {
        // Skip-stage validation runs at the CLI entry (`validate_skip_values`
        // in main.rs); the command never reaches this point with an unknown
        // value. No runtime warning is needed.

        // Stages that only make sense when binary artifacts exist.  When the
        // build stage produces no binaries (library-only crate), these stages
        // are skipped with a clear message instead of silently reporting ✓.
        const BINARY_DEPENDENT_STAGES: &[&str] = &[
            "upx",
            "archive",
            "makeself",
            "appimage",
            "nfpm",
            "snapcraft",
            "appbundle",
            "dmg",
            "msi",
            "pkg",
            "nsis",
            "flatpak",
            "notarize",
            "srpm",
        ];

        /// True when at least one binary-kind artifact is registered.
        ///
        /// Single source of truth for "binaries exist": used for both the
        /// pre-loop probe and the post-build re-check so the two cannot
        /// drift apart when a new binary artifact kind is added.
        fn has_binary_artifacts(ctx: &Context) -> bool {
            use anodizer_core::artifact::ArtifactKind;
            ctx.artifacts.all().iter().any(|a| {
                matches!(
                    a.kind,
                    ArtifactKind::Binary
                        | ArtifactKind::UploadableBinary
                        | ArtifactKind::UniversalBinary
                )
            })
        }

        // Check if binaries already exist (merge mode loads artifacts before
        // the pipeline runs, so build stage never executes).
        let mut has_binaries = has_binary_artifacts(ctx);

        // Whether `BuildStage` runs inside this pipeline. Drives where the
        // binary-artifact guard fires: merge pre-loads its binaries (no
        // build stage), so the guard runs up-front here; the full-release
        // and determinism-harness-child pipelines compile in-process, so
        // their guard runs immediately after the build stage completes.
        //
        // `--skip=build` registers the build stage but skips its body in the
        // loop below, so a registered-but-skipped build must count as "not
        // running": otherwise the up-front guard never fires (build appears
        // in-pipeline) AND the post-build guard never fires (the stage body
        // is `continue`d) — silently bypassing the guard. Honoring the skip
        // set routes such runs through the up-front guard, which validates
        // the prebuilt / pre-loaded binaries.
        let build_in_pipeline =
            self.stages.iter().any(|s| s.name() == "build") && !ctx.should_skip("build");

        // Merge-mode checkpoint: binaries are already loaded, so the
        // artifact set is final before the first stage runs.
        if self.expects_binaries && !build_in_pipeline {
            // Merge mode pre-loaded every crate's binaries and ran no build
            // stage, so there is no built-set to scope by — pass `None` to
            // check every in-scope crate.
            anodizer_core::binary_artifact_guard::check(
                &ctx.config,
                &ctx.artifacts,
                &ctx.options.selected_crates,
                None,
            )?;
        }

        // Consecutive skipped stages collapse into one `skipped  a, b, c`
        // row (flushed when a stage actually runs, or at end of loop)
        // instead of one line per stage — a heavily-skipped run (e.g. a
        // determinism-harness child) would otherwise drown its real
        // output in dozens of single-skip lines.
        let mut pending_skips: Vec<(&str, bool)> = Vec::new();

        for stage in &self.stages {
            let name = stage.name();
            // Operator-skipped stage: no section is opened (the deferred
            // header only prints on a real body line, which a skip is
            // not); buffer the name for the consolidated row.
            if ctx.should_skip(name) {
                pending_skips.push((name, false));
                continue;
            }

            // After the build stage, check if any binary artifacts were produced.
            // Skip binary-dependent stages if not (library-only crate).
            // NOTE: This is a pipeline optimization, not a feature skip. Each stage
            // checks its own config internally; stages with no config return Ok(())
            // immediately. The strict_guard for "no binaries" lives inside the
            // individual stages (e.g., archive, upx) where it fires AFTER the stage
            // confirms it has work to do.
            if BINARY_DEPENDENT_STAGES.contains(&name) && !has_binaries {
                pending_skips.push((name, true));
                continue;
            }

            flush_skipped(log, &mut pending_skips);

            // Write metadata.json + artifacts.json before the release stage
            // so that include_meta can attach them to the GitHub release.
            // run_post_pipeline overwrites these with the final version later.
            // Best-effort: a failure here is only warned about, never fatal.
            if name == "release"
                && let Err(e) = write_pre_release_metadata(ctx)
            {
                log.warn(&format!("failed to write pre-release metadata: {}", e));
            }

            // One collapsible section per stage: `::group::<name>` under
            // GitHub Actions, a Cargo-style verb header locally. The guard
            // closes the section (`::endgroup::` / de-indent) when it drops
            // at the end of this loop iteration — on the normal path, on the
            // early `?` return below, and on any panic unwind — so the
            // section is always balanced without an explicit drop in either
            // arm.
            let _section = log.group(name);
            match stage.run(ctx) {
                Ok(()) => {
                    // After the build stage, record whether binaries were produced.
                    if name == "build" {
                        has_binaries = has_binary_artifacts(ctx);
                        // Build-producing checkpoint: the per-crate binary
                        // artifact set is final once the build stage finishes.
                        // Fail loud here so a crate that configures a
                        // binary-requiring surface but produced no binary
                        // aborts the release at build time rather than 20
                        // minutes later inside publish/docker.
                        if self.expects_binaries {
                            // Pass the set of crates the build stage actually
                            // built so a crate with no in-scope target in this
                            // shard is skipped, while a built-but-binary-less
                            // crate still fails.
                            anodizer_core::binary_artifact_guard::check(
                                &ctx.config,
                                &ctx.artifacts,
                                &ctx.options.selected_crates,
                                ctx.built_crate_names(),
                            )?;
                        }
                    }
                    // After the changelog stage completes, populate the ReleaseNotes
                    // template variable so subsequent stages can reference it.
                    if name == "changelog" {
                        ctx.populate_release_notes_var();
                    }
                }
                Err(e) => {
                    // The message names the failing stage; the section header
                    // already scopes it inside `::group::<name>`.
                    log.error(&format!("{name} failed: {e}"));
                    return Err(e);
                }
            }
        }

        // Trailing skips: nothing ran after them, so flush them here.
        flush_skipped(log, &mut pending_skips);

        // End-of-pipeline skip summary. Stages (sign, docker-sign, publisher)
        // record intentional per-sub-config skips via
        // `ctx.remember_skip(...)`; before this hook the skips were emitted
        // at verbose level and lost in the final "✓ done" output.
        let skips = ctx.skip_memento.drain();
        if !skips.is_empty() {
            let noun = if skips.len() == 1 {
                "intentional skip"
            } else {
                "intentional skips"
            };
            log.status(&format!("{} {}:", skips.len(), noun.yellow()));
            for ev in &skips {
                log.status(&format!(
                    "  {} [{}] {}{}",
                    "\u{21b3}".yellow(),
                    ev.stage.bold(),
                    ev.label,
                    ev.reason
                ));
            }
        }
        Ok(())
    }
}

/// Emit the buffered run of consecutive stage skips as consolidated
/// `skipped  a, b, c` kv rows and empty the buffer.
///
/// Operator skips (`--skip=`) and library-only skips are reported on
/// separate rows — the latter suffixed with `(no binaries)` — so the two
/// causes stay distinguishable. Does nothing when the buffer is empty.
///
/// Each `(name, no_binaries)` pair is one skipped stage in pipeline
/// order; names keep that order within their row.
fn flush_skipped(log: &StageLogger, pending: &mut Vec<(&str, bool)>) {
    if pending.is_empty() {
        return;
    }

    // Split the buffer by cause in a single pass; draining also leaves
    // `pending` empty for the next run of skips.
    let mut operator_skips: Vec<&str> = Vec::new();
    let mut library_skips: Vec<&str> = Vec::new();
    for (stage, no_binaries) in pending.drain(..) {
        if no_binaries {
            library_skips.push(stage);
        } else {
            operator_skips.push(stage);
        }
    }

    // One level in so the rows line up with the body bullets of the
    // surrounding sections (no section is open between stages — the
    // previous stage's guard has already dropped).
    let _indent = anodizer_core::log::indent_one_level();
    let key_width = "skipped".len();
    if !operator_skips.is_empty() {
        let row = operator_skips.join(", ");
        log.kv("skipped", &row, key_width);
    }
    if !library_skips.is_empty() {
        let row = format!("{} (no binaries)", library_skips.join(", "));
        log.kv("skipped", &row, key_width);
    }
}

/// Write a preliminary `metadata.json` and `artifacts.json` into the dist
/// directory ahead of the release stage, so `include_meta: true` has files
/// to attach to the GitHub release. `run_post_pipeline` later overwrites
/// both with the final versions.
///
/// # Errors
///
/// Fails if the dist directory cannot be created, if either document
/// cannot be serialized, or if either file cannot be written.
fn write_pre_release_metadata(ctx: &mut anodizer_core::context::Context) -> anyhow::Result<()> {
    let out_dir = &ctx.config.dist;
    std::fs::create_dir_all(out_dir)?;

    // A template variable that is not set degrades to its default
    // (empty) value instead of failing the write.
    let template_var = |key: &str| ctx.template_vars().get(key).cloned().unwrap_or_default();
    let release_tag = template_var("Tag");
    let release_version = ctx.version();
    let full_commit = template_var("FullCommit");

    let metadata_doc = serde_json::json!({
        "project_name": ctx.config.project_name,
        "tag": release_tag,
        "version": release_version,
        "commit": full_commit,
    });
    let metadata_text = serde_json::to_string_pretty(&metadata_doc)?;
    std::fs::write(out_dir.join("metadata.json"), metadata_text)?;

    let artifacts_doc = ctx.artifacts.to_artifacts_json()?;
    let artifacts_text = serde_json::to_string_pretty(&artifacts_doc)?;
    std::fs::write(out_dir.join("artifacts.json"), artifacts_text)?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use anodizer_core::artifact::{Artifact, ArtifactKind};
    use anodizer_core::config::{Config, CrateConfig, DockerV2Config};
    use anodizer_core::context::{Context, ContextOptions};
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// `Pipeline::run` ends with a default summary write to
    /// `<dist>/run-<id>/summary.json`; with the default relative
    /// `./dist` and the crate root as test cwd that would land in the
    /// working tree. Point `dist` at a tempdir; the returned guard
    /// keeps it alive across the run.
    fn isolate_dist(ctx: &mut Context) -> tempfile::TempDir {
        let tmp = tempfile::TempDir::new().expect("tempdir");
        ctx.config.dist = tmp.path().to_path_buf();
        tmp
    }

    /// No-op stage standing in for `BuildStage`: it shares the `"build"`
    /// name (so the pipeline's skip-set + guard plumbing treats it as the
    /// build stage) but produces no artifacts, mimicking a misconfigured
    /// build that compiles nothing.
    struct NoopBuildStage;
    impl Stage for NoopBuildStage {
        // Must be exactly "build": the pipeline matches on this name.
        fn name(&self) -> &str {
            "build"
        }
        // Succeeds without touching the context, so no artifact is added.
        fn run(&self, _ctx: &mut Context) -> Result<()> {
            Ok(())
        }
    }

    /// Config with a single crate, `svc`, that configures a
    /// binary-requiring surface (one default `dockers_v2` entry).
    fn binary_surface_config() -> Config {
        let svc = CrateConfig {
            name: String::from("svc"),
            dockers_v2: Some(vec![DockerV2Config::default()]),
            ..CrateConfig::default()
        };
        Config {
            crates: vec![svc],
            ..Config::default()
        }
    }

    /// A source-archive artifact for crate `svc` — deliberately not a
    /// binary kind, so it cannot satisfy the binary-presence guard.
    fn source_artifact() -> Artifact {
        let owner = String::from("svc");
        let file_name = String::from("svc.tar.gz");
        Artifact {
            kind: ArtifactKind::SourceArchive,
            path: PathBuf::from("dist/svc.tar.gz"),
            name: file_name,
            target: None,
            crate_name: owner,
            metadata: HashMap::new(),
            size: None,
        }
    }

    /// `--skip=build` must not disarm the binary-presence guard. The crate
    /// configures a binary-requiring surface (docker_v2) while only a
    /// source archive is present, so the up-front guard has to fail the
    /// run instead of letting the pipeline proceed with a source-only dist.
    #[test]
    fn skip_build_still_runs_binary_presence_guard() {
        let mut pipeline = Pipeline::new();
        pipeline.expect_binaries();
        pipeline.add(Box::new(NoopBuildStage));

        let mut ctx = Context::new(
            binary_surface_config(),
            ContextOptions {
                skip_stages: vec![String::from("build")],
                ..Default::default()
            },
        );
        ctx.artifacts.add(source_artifact());
        let _dist_guard = isolate_dist(&mut ctx);

        let log = ctx.logger("pipeline-test");
        let outcome = pipeline.run(&mut ctx, &log);
        let msg = outcome
            .expect_err("guard must fire with --skip=build and no binary")
            .to_string();
        for needle in ["crate 'svc'", "no binary artifacts"] {
            assert!(msg.contains(needle), "{msg}");
        }
    }

    /// Control case: when a prebuilt binary is already registered,
    /// `--skip=build` clears the guard. The guard validates binaries; it
    /// does not blanket-fail every skip-build run.
    #[test]
    fn skip_build_passes_guard_when_prebuilt_binary_present() {
        let prebuilt = Artifact {
            kind: ArtifactKind::Binary,
            path: PathBuf::from("dist/svc"),
            name: String::from("svc"),
            target: Some(String::from("x86_64-unknown-linux-gnu")),
            crate_name: String::from("svc"),
            metadata: HashMap::new(),
            size: None,
        };

        let mut pipeline = Pipeline::new();
        pipeline.expect_binaries();
        pipeline.add(Box::new(NoopBuildStage));

        let mut ctx = Context::new(
            binary_surface_config(),
            ContextOptions {
                skip_stages: vec![String::from("build")],
                ..Default::default()
            },
        );
        ctx.artifacts.add(prebuilt);
        let _dist_guard = isolate_dist(&mut ctx);

        let log = ctx.logger("pipeline-test");
        let outcome = pipeline.run(&mut ctx, &log);
        outcome.expect("prebuilt binary satisfies the guard under --skip=build");
    }

    /// Always-failing stage named `prepublish-guard`, standing in for
    /// `PrePublishGuardStage` rejecting a broken template.
    struct FailingGuardStage;
    impl Stage for FailingGuardStage {
        fn name(&self) -> &str {
            "prepublish-guard"
        }
        fn run(&self, _ctx: &mut Context) -> Result<()> {
            let failure = anyhow::anyhow!("prepublish-guard: broken template");
            Err(failure)
        }
    }

    /// Spy named `publish`, standing in for `PublishStage`. Running its
    /// body sets the shared flag, which lets a test assert that the
    /// one-way-door publisher was never reached.
    struct SpyPublishStage(std::sync::Arc<std::sync::atomic::AtomicBool>);
    impl Stage for SpyPublishStage {
        fn name(&self) -> &str {
            "publish"
        }
        fn run(&self, _ctx: &mut Context) -> Result<()> {
            use std::sync::atomic::Ordering;

            let Self(ran) = self;
            ran.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// When `prepublish-guard` fails, the pipeline stops before
    /// `PublishStage` — an irreversible one-way-door publisher — is
    /// invoked. This is the load-bearing guarantee: a broken template
    /// aborts the run with no publisher having fired.
    #[test]
    fn failing_prepublish_guard_aborts_before_publish_runs() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let publish_ran = Arc::new(AtomicBool::new(false));

        let mut pipeline = Pipeline::new();
        pipeline.add(Box::new(FailingGuardStage));
        pipeline.add(Box::new(SpyPublishStage(Arc::clone(&publish_ran))));

        let mut ctx = Context::new(Config::default(), ContextOptions::default());
        let _dist_guard = isolate_dist(&mut ctx);
        let log = ctx.logger("pipeline-test");

        let outcome = pipeline.run(&mut ctx, &log);
        let err = outcome.expect_err("a failing prepublish-guard must abort the pipeline");
        assert!(
            err.to_string().contains("broken template"),
            "abort surfaces the guard's error: {err}"
        );
        let reached_publish = publish_ran.load(Ordering::SeqCst);
        assert!(
            !reached_publish,
            "PublishStage must NOT run after the guard fails"
        );
    }

    /// No-op stage with a caller-chosen name, so a test can place an
    /// operator-skipped stage at an exact pipeline position.
    struct NoopNamedStage(&'static str);
    impl Stage for NoopNamedStage {
        // Reports the name supplied at construction.
        fn name(&self) -> &str {
            self.0
        }
        // Succeeds without touching the context.
        fn run(&self, _ctx: &mut Context) -> Result<()> {
            Ok(())
        }
    }

    /// Stage named `beta` whose body emits exactly one status line via
    /// the context's logger. Capture order can then pin where the
    /// consolidated skip row lands relative to a running stage's output.
    struct ChattyStage;
    impl Stage for ChattyStage {
        fn name(&self) -> &str {
            "beta"
        }
        fn run(&self, ctx: &mut Context) -> Result<()> {
            let logger = ctx.logger("beta");
            logger.status("beta body line");
            Ok(())
        }
    }

    /// Ordering contract for the consolidated skip row. Skips buffered
    /// across consecutive skipped stages are flushed before the next
    /// running stage's body output, and they are not lost when that stage
    /// fails: the row precedes the failure line on the early error return.
    /// (Capture records messages, not section headers, so the pinned bound
    /// is the stage's first body line rather than its section opening.)
    #[test]
    fn buffered_skips_flush_before_next_stage_and_survive_its_failure() {
        /// Run a two-stage pipeline — operator-skipped `alpha` followed
        /// by `next` — and return the run outcome together with every
        /// captured log message, in emission order.
        fn run_after_skipped_alpha(next: Box<dyn Stage>) -> (Result<()>, Vec<String>) {
            use anodizer_core::log::LogCapture;

            let mut pipeline = Pipeline::new();
            pipeline.add(Box::new(NoopNamedStage("alpha")));
            pipeline.add(next);

            let mut ctx = Context::new(
                Config::default(),
                ContextOptions {
                    skip_stages: vec!["alpha".to_string()],
                    ..Default::default()
                },
            );
            let capture = LogCapture::new();
            ctx.with_log_capture(capture.clone());
            let _dist_guard = isolate_dist(&mut ctx);
            let log = ctx.logger("pipeline-test");

            let outcome = pipeline.run(&mut ctx, &log);
            let msgs: Vec<String> = capture.all_messages().into_iter().map(|(_, m)| m).collect();
            (outcome, msgs)
        }

        // Skip → running stage: the row precedes the stage's body line.
        let (outcome, msgs) = run_after_skipped_alpha(Box::new(ChattyStage));
        outcome.expect("chatty stage succeeds");
        let Some(skip_idx) = msgs.iter().position(|m| m == "skipped = alpha") else {
            panic!("consolidated skip row missing: {msgs:?}");
        };
        let Some(body_idx) = msgs.iter().position(|m| m == "beta body line") else {
            panic!("running stage body line missing: {msgs:?}");
        };
        assert!(
            skip_idx < body_idx,
            "skip row must flush before the next stage's output: {msgs:?}"
        );

        // Skip → failing stage: the row still flushes before the failure
        // line and is not swallowed by the early error return.
        let (outcome, msgs) = run_after_skipped_alpha(Box::new(FailingGuardStage));
        outcome.expect_err("failing stage must abort the pipeline");
        let Some(skip_idx) = msgs.iter().position(|m| m == "skipped = alpha") else {
            panic!("consolidated skip row missing: {msgs:?}");
        };
        let Some(fail_idx) = msgs
            .iter()
            .position(|m| m.starts_with("prepublish-guard failed:"))
        else {
            panic!("failure line missing: {msgs:?}");
        };
        assert!(
            skip_idx < fail_idx,
            "skip row must flush before the failing stage's error line: {msgs:?}"
        );
    }
}