shipper-cli 0.4.0

CLI adapter for Shipper; install the user-facing `shipper` package unless embedding the adapter.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
//! End-to-end #90 Recover rehearsal — synthetic side.
//!
//! The existing `bdd_resume` tests cover resume behavior against
//! **hand-constructed** state files: "given state X, run resume, assert Y."
//! That covers the read-side contract but doesn't prove the write-side under
//! an actual run: does `.shipper/state.json` and `.shipper/events.jsonl` stay
//! coherent when `shipper publish` is interrupted mid-workspace?
//!
//! This test closes that gap:
//!
//! 1. Build a 3-crate workspace (a, b, c with c→b→a deps) so we exercise
//!    a real dependency-ordered plan, not a single-crate trivial case.
//! 2. Spawn a "smart" mock registry that returns 404 on the first lookup per
//!    crate path (preflight) and 200 afterward (post-publish visibility).
//! 3. First run: fake cargo succeeds for a/b and fails for c. That reaches
//!    a realistic interrupted-mid-run state where two crates are published
//!    and one is left Failed.
//! 4. Inspect the persisted evidence and assert events-as-truth invariants:
//!    - `state.json` parses and reflects a/b published, c not published.
//!    - `events.jsonl` is valid NDJSON — every line parses, no half-written
//!      line from a partial write.
//!    - PackagePublished count equals the number of actually-published
//!      crates (no spurious duplicates).
//! 5. Second run: `shipper resume` with fake cargo now succeeding for c.
//! 6. Assert the resume respected the persisted state:
//!    - exit 0
//!    - a/b NOT re-published (idempotency)
//!    - c reaches Published
//!    - PackagePublished event count is exactly N_crates (one per crate,
//!      across both runs combined)
//!    - PackageSkipped events emitted for a/b during resume
//!
//! This is the regression guard that pairs with the real-workflow rehearsal
//! documented in `docs/how-to/run-recover-rehearsal.md`. The real rehearsal
//! exercises the same invariants against crates.io proper.

use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use assert_cmd::Command;
use serial_test::serial;
use tempfile::tempdir;
use tiny_http::{Header, Response, Server, StatusCode};

// ---------------------------------------------------------------------------
// Fixture: 3-crate workspace (a, b, c with c→b→a)
// ---------------------------------------------------------------------------

fn write_file(path: &Path, content: &str) {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent).expect("mkdir");
    }
    fs::write(path, content).expect("write");
}

fn create_three_crate_workspace(root: &Path) {
    write_file(
        &root.join("Cargo.toml"),
        r#"
[workspace]
members = ["crate-a", "crate-b", "crate-c"]
resolver = "2"
"#,
    );

    for (name, deps) in [
        ("crate-a", ""),
        ("crate-b", "crate-a = { path = \"../crate-a\" }"),
        (
            "crate-c",
            "crate-a = { path = \"../crate-a\" }\ncrate-b = { path = \"../crate-b\" }",
        ),
    ] {
        write_file(
            &root.join(format!("{name}/Cargo.toml")),
            &format!(
                r#"[package]
name = "{name}"
version = "0.1.0"
edition = "2021"

[dependencies]
{deps}
"#
            ),
        );
        write_file(&root.join(format!("{name}/src/lib.rs")), "pub fn hi() {}\n");
    }
}

// ---------------------------------------------------------------------------
// Fake cargo: exit code selected by env var SHIPPER_FAKE_EXIT_FOR_<crate>.
// Falls back to `0` (success) if the matching env var is absent.
// ---------------------------------------------------------------------------

fn write_fake_cargo(bin_dir: &Path) -> PathBuf {
    #[cfg(windows)]
    {
        let path = bin_dir.join("cargo.cmd");
        // For each known crate, check if an env var is set picking the exit
        // code. Cargo always invokes with `-p <crate>`, so the crate name
        // appears verbatim in the arg list.
        //
        // The nested `if defined / else` pattern combined with `findstr &&`
        // is fragile in cmd. Flatten it: use separate `if defined` checks
        // *after* the findstr sets the `_MATCH` flag, rather than inside
        // the short-circuit. Avoids delayed-expansion quoting footguns.
        let script = "\
@echo off\r\n\
setlocal EnableDelayedExpansion\r\n\
set ARGS=%*\r\n\
if defined SHIPPER_FAKE_CARGO_LOG echo !ARGS!>>\"%SHIPPER_FAKE_CARGO_LOG%\"\r\n\
set MATCH=\r\n\
echo !ARGS! | findstr /C:\"crate-c\" >nul && set MATCH=C\r\n\
echo !ARGS! | findstr /C:\"crate-b\" >nul && if \"!MATCH!\"==\"\" set MATCH=B\r\n\
echo !ARGS! | findstr /C:\"crate-a\" >nul && if \"!MATCH!\"==\"\" set MATCH=A\r\n\
if \"!MATCH!\"==\"C\" if defined SHIPPER_FAKE_EXIT_FOR_C exit /b !SHIPPER_FAKE_EXIT_FOR_C!\r\n\
if \"!MATCH!\"==\"B\" if defined SHIPPER_FAKE_EXIT_FOR_B exit /b !SHIPPER_FAKE_EXIT_FOR_B!\r\n\
if \"!MATCH!\"==\"A\" if defined SHIPPER_FAKE_EXIT_FOR_A exit /b !SHIPPER_FAKE_EXIT_FOR_A!\r\n\
exit /b 0\r\n";
        fs::write(&path, script).expect("write fake cargo");
        path
    }

    #[cfg(not(windows))]
    {
        use std::os::unix::fs::PermissionsExt;
        let path = bin_dir.join("cargo");
        let script = "#!/usr/bin/env sh\n\
if [ -n \"${SHIPPER_FAKE_CARGO_LOG:-}\" ]; then\n\
  printf '%s\\n' \"$*\" >> \"$SHIPPER_FAKE_CARGO_LOG\"\n\
fi\n\
case \"$*\" in\n\
  *crate-c*) exit \"${SHIPPER_FAKE_EXIT_FOR_C:-0}\" ;;\n\
  *crate-b*) exit \"${SHIPPER_FAKE_EXIT_FOR_B:-0}\" ;;\n\
  *crate-a*) exit \"${SHIPPER_FAKE_EXIT_FOR_A:-0}\" ;;\n\
esac\n\
exit 0\n";
        fs::write(&path, script).expect("write fake cargo");
        let mut perms = fs::metadata(&path).expect("meta").permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&path, perms).expect("chmod");
        path
    }
}

// ---------------------------------------------------------------------------
// Mock registry.
//
// Per-path semantics:
//   * If the path contains any `never_flip` substring → always 404.
//     Used for a crate whose cargo publish we know will fail this run: we
//     want shipper to classify the failure as Failed, not as ambiguous-but-
//     actually-published (which the reconcile logic would do if 200 leaked).
//   * Otherwise the first hit returns 404 (preflight "new crate"), and every
//     subsequent hit returns 200 with a minimal versions body — mirroring
//     cargo actually publishing and the registry becoming visible.
// ---------------------------------------------------------------------------

struct RegistryHandles {
    never_flip: Arc<Mutex<Vec<&'static str>>>,
}

impl RegistryHandles {
    fn pin_404(&self, substr: &'static str) {
        self.never_flip.lock().expect("lock").push(substr);
    }
    fn clear_pins(&self) {
        self.never_flip.lock().expect("lock").clear();
    }
}

fn spawn_registry() -> (String, std::sync::mpsc::Sender<()>, RegistryHandles) {
    spawn_registry_at("127.0.0.1:0")
}

fn spawn_registry_at(addr: &str) -> (String, std::sync::mpsc::Sender<()>, RegistryHandles) {
    let server = Server::http(addr).expect("server");
    let base_url = format!("http://{}", server.server_addr());
    let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();

    let per_path_hits: Arc<Mutex<HashMap<String, usize>>> = Arc::new(Mutex::new(HashMap::new()));
    let never_flip: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
    let never_flip_for_thread = Arc::clone(&never_flip);
    let hits_for_thread = Arc::clone(&per_path_hits);

    thread::spawn(move || {
        loop {
            if stop_rx.try_recv().is_ok() {
                break;
            }
            match server.recv_timeout(Duration::from_millis(200)) {
                Ok(Some(req)) => {
                    let path = req.url().split('?').next().unwrap_or("").to_owned();

                    let pinned_404 = {
                        let list = never_flip_for_thread.lock().expect("lock");
                        list.iter().any(|needle| path.contains(needle))
                    };

                    let hits = {
                        let mut map = hits_for_thread.lock().expect("lock");
                        let counter = map.entry(path.clone()).or_insert(0);
                        *counter += 1;
                        *counter
                    };

                    let (status, body) = if pinned_404 || hits <= 1 {
                        (404u16, String::from("{}"))
                    } else {
                        (
                            200u16,
                            r#"{"crate":{"name":"x"},"versions":[{"num":"0.1.0","yanked":false}]}"#
                                .to_string(),
                        )
                    };

                    let resp = Response::from_string(body)
                        .with_status_code(StatusCode(status))
                        .with_header(
                            Header::from_bytes("Content-Type", "application/json").expect("header"),
                        );
                    let _ = req.respond(resp);
                }
                _ => continue,
            }
        }
    });

    (base_url, stop_tx, RegistryHandles { never_flip })
}

// ---------------------------------------------------------------------------
// Event / state parsing helpers.
// ---------------------------------------------------------------------------

fn package_state(state: &serde_json::Value, name_at_ver: &str) -> Option<String> {
    state
        .get("packages")?
        .get(name_at_ver)?
        .get("state")?
        .get("state")?
        .as_str()
        .map(str::to_owned)
}

fn read_events(events_path: &Path) -> Vec<serde_json::Value> {
    let raw = fs::read_to_string(events_path).unwrap_or_default();
    raw.lines()
        .filter(|l| !l.trim().is_empty())
        .map(|l| serde_json::from_str(l).expect("events.jsonl must be valid NDJSON"))
        .collect()
}

fn count_events_matching<F>(events: &[serde_json::Value], pred: F) -> usize
where
    F: Fn(&serde_json::Value) -> bool,
{
    events.iter().filter(|e| pred(e)).count()
}

fn event_type_matches(event: &serde_json::Value, expected_kind: &str) -> bool {
    // EventType is `#[serde(tag = "type", rename_all = "snake_case")]` so it
    // serializes internally-tagged with a `type` discriminator, e.g.
    // `{"type":"package_published","duration_ms":4500}`. Callers pass the
    // PascalCase variant name; we convert to snake_case before comparing.
    event
        .get("event_type")
        .and_then(|et| et.get("type"))
        .and_then(|t| t.as_str())
        .map(|s| s == pascal_to_snake(expected_kind))
        .unwrap_or(false)
}

fn pascal_to_snake(name: &str) -> String {
    let mut out = String::with_capacity(name.len() + 4);
    for (i, ch) in name.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            if i != 0 {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

fn shipper_cmd() -> Command {
    Command::new(assert_cmd::cargo::cargo_bin!("shipper-cli"))
}

fn common_args(
    cmd: &mut Command,
    manifest: &Path,
    api_base: &str,
    state_dir: &Path,
    fake_cargo: &Path,
) {
    common_args_with_max_attempts(cmd, manifest, api_base, state_dir, fake_cargo, "1");
}

fn common_args_with_max_attempts(
    cmd: &mut Command,
    manifest: &Path,
    api_base: &str,
    state_dir: &Path,
    fake_cargo: &Path,
    max_attempts: &str,
) {
    cmd.arg("--manifest-path")
        .arg(manifest)
        .arg("--api-base")
        .arg(api_base)
        .arg("--allow-dirty")
        .arg("--no-readiness")
        .arg("--verify-timeout")
        .arg("0ms")
        .arg("--verify-poll")
        .arg("0ms")
        .arg("--verify-mode")
        .arg("none")
        .arg("--max-attempts")
        .arg(max_attempts)
        .arg("--base-delay")
        .arg("0ms")
        .arg("--state-dir")
        .arg(state_dir)
        .env("SHIPPER_CARGO_BIN", fake_cargo);
}

fn live_rehearsal_root() -> PathBuf {
    PathBuf::from(
        env::var("SHIPPER_LIVE_REHEARSAL_ROOT")
            .expect("SHIPPER_LIVE_REHEARSAL_ROOT must point at the runner fixture root"),
    )
}

fn live_registry_addr() -> String {
    env::var("SHIPPER_LIVE_REHEARSAL_REGISTRY_ADDR")
        .unwrap_or_else(|_| "127.0.0.1:39197".to_string())
}

fn fake_cargo_log(state_dir: &Path) -> PathBuf {
    state_dir.join("fake-cargo.log")
}

fn count_fake_cargo_publishes(log_path: &Path, crate_name: &str) -> usize {
    fs::read_to_string(log_path)
        .unwrap_or_default()
        .lines()
        .filter(|line| line.contains("publish") && line.contains(crate_name))
        .count()
}

fn assert_live_rehearsal_interrupted_state(state_dir: &Path) {
    let state_path = state_dir.join("state.json");
    let events_path = state_dir.join("events.jsonl");
    let state: serde_json::Value =
        serde_json::from_str(&fs::read_to_string(&state_path).expect("state.json exists"))
            .expect("state.json is valid JSON");

    assert_eq!(
        package_state(&state, "crate-a@0.1.0").as_deref(),
        Some("published"),
        "crate-a must be published before interruption"
    );
    assert_eq!(
        package_state(&state, "crate-b@0.1.0").as_deref(),
        Some("published"),
        "crate-b must be published before interruption"
    );
    assert_ne!(
        package_state(&state, "crate-c@0.1.0").as_deref(),
        Some("published"),
        "crate-c must remain unfinished before resume"
    );

    let events = read_events(&events_path);
    assert!(
        !events.is_empty(),
        "interrupted runner artifact must include events.jsonl"
    );
    let published = count_events_matching(&events, |event| {
        event_type_matches(event, "PackagePublished")
    });
    assert_eq!(
        published, 2,
        "interrupted artifact should have exactly two PackagePublished events"
    );
}

fn assert_live_rehearsal_resumed_state(state_dir: &Path) {
    let state_path = state_dir.join("state.json");
    let events_path = state_dir.join("events.jsonl");
    let receipt_path = state_dir.join("receipt.json");
    let state_after_raw = fs::read_to_string(&state_path).expect("read state");
    let state_after: serde_json::Value =
        serde_json::from_str(&state_after_raw).expect("parse state");

    for pkg in ["crate-a@0.1.0", "crate-b@0.1.0", "crate-c@0.1.0"] {
        assert_eq!(
            package_state(&state_after, pkg).as_deref(),
            Some("published"),
            "{} must be Published after live-runner resume. state:\n{}",
            pkg,
            state_after_raw
        );
    }
    assert!(
        receipt_path.exists(),
        "resume must write receipt.json as final release summary"
    );

    let events = read_events(&events_path);
    let published_total = count_events_matching(&events, |event| {
        event_type_matches(event, "PackagePublished")
    });
    assert_eq!(
        published_total, 3,
        "resume should produce exactly one PackagePublished event per crate"
    );

    let skipped_total =
        count_events_matching(&events, |event| event_type_matches(event, "PackageSkipped"));
    assert!(
        skipped_total >= 2,
        "resume should document already-published crates as skipped"
    );

    let drift_total = count_events_matching(&events, |event| {
        event_type_matches(event, "StateEventDriftDetected")
    });
    assert_eq!(
        drift_total, 0,
        "live-runner rehearsal should finish without state/event drift"
    );
}

// ---------------------------------------------------------------------------
// THE TEST
// ---------------------------------------------------------------------------

#[test]
#[serial]
fn rehearsal_interrupted_publish_then_resume_preserves_invariants() {
    let td = tempdir().expect("tempdir");
    let root = td.path();
    create_three_crate_workspace(root);

    let bin_dir = root.join("fake-bin");
    fs::create_dir_all(&bin_dir).expect("mkdir bin");
    let fake_cargo = write_fake_cargo(&bin_dir);

    // Single registry across both runs (same URL → same plan_id → resume
    // is allowed). Run 1 pins crate-c at 404 via `never_flip` so the
    // reconcile path sees it as truly absent; we unpin between runs.
    let (registry_url, registry_stop, registry) = spawn_registry();
    registry.pin_404("crate-c");

    let state_dir = root.join(".shipper");
    let state_path = state_dir.join("state.json");
    let events_path = state_dir.join("events.jsonl");

    // ── Run 1: publish with crate-c failing ──────────────────────────────
    // This is the "interrupted run" — a + b succeed, c fails. Shipper
    // persists state after each step, so state.json and events.jsonl
    // should reflect reality at the moment the loop gave up on c.
    let mut cmd = shipper_cmd();
    common_args(
        &mut cmd,
        &root.join("Cargo.toml"),
        &registry_url,
        &state_dir,
        &fake_cargo,
    );
    cmd.arg("publish")
        .env("SHIPPER_FAKE_EXIT_FOR_A", "0")
        .env("SHIPPER_FAKE_EXIT_FOR_B", "0")
        .env("SHIPPER_FAKE_EXIT_FOR_C", "1");
    cmd.assert().failure();

    // ── Invariant 1: state.json parses and reflects reality ──────────────
    let state: serde_json::Value =
        serde_json::from_str(&fs::read_to_string(&state_path).expect("state.json exists"))
            .expect("state.json is valid JSON");

    assert_eq!(
        package_state(&state, "crate-a@0.1.0").as_deref(),
        Some("published"),
        "a must be published after run 1"
    );
    assert_eq!(
        package_state(&state, "crate-b@0.1.0").as_deref(),
        Some("published"),
        "b must be published after run 1"
    );
    assert_ne!(
        package_state(&state, "crate-c@0.1.0").as_deref(),
        Some("published"),
        "c must NOT be published (fake cargo exited 1 for c)"
    );

    // ── Invariant 2: events.jsonl is valid NDJSON after an interrupted run
    // `read_events` panics if any line fails to parse — running it proves
    // there's no half-written or truncated event.
    let events_r1 = read_events(&events_path);
    assert!(
        !events_r1.is_empty(),
        "events.jsonl must have content after run 1"
    );

    // ── Invariant 3: PackagePublished events match actually-published count
    // — exactly one per success, no duplicates.
    let published_r1 =
        count_events_matching(&events_r1, |e| event_type_matches(e, "PackagePublished"));
    assert_eq!(
        published_r1, 2,
        "PackagePublished events after run 1 should equal succeeded crates (2 = a + b); got {published_r1}"
    );

    // ── Run 2: resume with crate-c succeeding ────────────────────────────
    // Unpin crate-c; keep per-path hit counters intact. Since crate-c's
    // preflight already fired in run 1 (counter > 1), run 2's post-publish
    // check gets 200 immediately, mirroring real crates.io where the
    // version is now visible after cargo's successful upload.
    // We keep the same registry URL so plan_id stays stable and resume
    // doesn't trip the stale-plan guard.
    registry.clear_pins();

    let mut resume = shipper_cmd();
    common_args_with_max_attempts(
        &mut resume,
        &root.join("Cargo.toml"),
        &registry_url,
        &state_dir,
        &fake_cargo,
        "2",
    );
    resume.arg("resume").env("SHIPPER_FAKE_EXIT_FOR_C", "0");
    resume.assert().success();

    let _ = registry_stop.send(());

    // ── Invariant 4: final state has a/b Published and c resolved ───────
    // A "resolved" c can be either Published (cargo was invoked and
    // succeeded) or Skipped (the pre-publish version_exists check saw c
    // already on the registry and short-circuited). Both are legitimate
    // end states that indicate "c is done, don't try again."
    let state_after_raw = fs::read_to_string(&state_path).expect("read state");
    let state_after: serde_json::Value =
        serde_json::from_str(&state_after_raw).expect("parse state");
    for pkg in ["crate-a@0.1.0", "crate-b@0.1.0"] {
        assert_eq!(
            package_state(&state_after, pkg).as_deref(),
            Some("published"),
            "{pkg} must be Published after resume. full state after resume:\n{}",
            state_after_raw
        );
    }
    let c_state = package_state(&state_after, "crate-c@0.1.0");
    assert!(
        matches!(c_state.as_deref(), Some("published") | Some("skipped")),
        "crate-c must be Published or Skipped after resume; got {c_state:?}. \
         full state:\n{state_after_raw}"
    );

    // ── Invariant 5: events-as-truth — idempotency. A successful publish
    // for any given crate+version must produce exactly one PackagePublished
    // event across all runs, never two. In this scenario:
    //   - run 1 emits PackagePublished for a and b (c fails → no event)
    //   - run 2's resume must NOT re-publish a or b (those events would be
    //     duplicates). c resolves either via a real publish (emitting a new
    //     PackagePublished) or via the "already published" short-circuit
    //     (emitting PackageSkipped with no PackagePublished). Either way,
    //     a + b account for 2 PackagePublished and c for 0 or 1.
    let events_all = read_events(&events_path);
    let published_total =
        count_events_matching(&events_all, |e| event_type_matches(e, "PackagePublished"));
    assert!(
        (2..=3).contains(&published_total),
        "PackagePublished events across both runs should be 2 (a, b) or 3 \
         (a, b, c if c was re-published during resume); got {published_total}. \
         4+ would mean resume duplicated a or b — a correctness violation."
    );

    // ── Invariant 6: every post-run-1 PackagePublished event that exists
    // has a partner ExecutionStarted event preceding it in the file.
    // (Sanity check for events.jsonl being actually append-only — if resume
    // somehow truncated and rewrote the file, the pre-resume events would
    // be gone and the ExecutionStarted count would drop.)
    let execution_started =
        count_events_matching(&events_all, |e| event_type_matches(e, "ExecutionStarted"));
    assert_eq!(
        execution_started, 2,
        "ExecutionStarted events should be exactly 2 (one per run); got {execution_started}. \
         < 2 means events.jsonl was truncated somewhere — append-only invariant broken."
    );
}

#[test]
#[ignore = "workflow-driven: creates the interrupted .shipper artifact for a later runner job"]
#[serial]
fn live_runner_interruption_seed_uploads_shipper_artifact() {
    let root = live_rehearsal_root();
    if root.exists() {
        fs::remove_dir_all(&root).expect("remove prior live rehearsal root");
    }
    fs::create_dir_all(&root).expect("mkdir live rehearsal root");
    create_three_crate_workspace(&root);

    let bin_dir = root.join("fake-bin");
    fs::create_dir_all(&bin_dir).expect("mkdir bin");
    let fake_cargo = write_fake_cargo(&bin_dir);

    let state_dir = root.join(".shipper");
    fs::create_dir_all(&state_dir).expect("mkdir state dir");
    let log_path = fake_cargo_log(&state_dir);

    let (registry_url, registry_stop, registry) = spawn_registry_at(&live_registry_addr());
    registry.pin_404("crate-c");

    let mut cmd = shipper_cmd();
    common_args(
        &mut cmd,
        &root.join("Cargo.toml"),
        &registry_url,
        &state_dir,
        &fake_cargo,
    );
    cmd.arg("publish")
        .env("SHIPPER_FAKE_CARGO_LOG", &log_path)
        .env("SHIPPER_FAKE_EXIT_FOR_A", "0")
        .env("SHIPPER_FAKE_EXIT_FOR_B", "0")
        .env("SHIPPER_FAKE_EXIT_FOR_C", "1");
    cmd.assert().failure();
    let _ = registry_stop.send(());

    assert_live_rehearsal_interrupted_state(&state_dir);
    assert_eq!(
        count_fake_cargo_publishes(&log_path, "crate-a"),
        1,
        "seed run must publish crate-a exactly once"
    );
    assert_eq!(
        count_fake_cargo_publishes(&log_path, "crate-b"),
        1,
        "seed run must publish crate-b exactly once"
    );
    assert_eq!(
        count_fake_cargo_publishes(&log_path, "crate-c"),
        1,
        "seed run should attempt crate-c once before interruption"
    );
}

#[test]
#[ignore = "workflow-driven: downloads interrupted .shipper artifact and resumes it"]
#[serial]
fn live_runner_interruption_resume_downloaded_artifact_preserves_invariants() {
    let root = live_rehearsal_root();
    fs::create_dir_all(&root).expect("mkdir live rehearsal root");
    create_three_crate_workspace(&root);

    let bin_dir = root.join("fake-bin");
    fs::create_dir_all(&bin_dir).expect("mkdir bin");
    let fake_cargo = write_fake_cargo(&bin_dir);

    let state_dir = root.join(".shipper");
    let log_path = fake_cargo_log(&state_dir);
    assert!(
        state_dir.join("state.json").exists(),
        "resume job must download interrupted .shipper/state.json first"
    );
    assert_live_rehearsal_interrupted_state(&state_dir);

    let (registry_url, registry_stop, _registry) = spawn_registry_at(&live_registry_addr());

    let mut resume = shipper_cmd();
    common_args_with_max_attempts(
        &mut resume,
        &root.join("Cargo.toml"),
        &registry_url,
        &state_dir,
        &fake_cargo,
        "2",
    );
    resume
        .arg("resume")
        .env("SHIPPER_FAKE_CARGO_LOG", &log_path)
        .env("SHIPPER_FAKE_EXIT_FOR_C", "0");
    resume.assert().success();
    let _ = registry_stop.send(());

    assert_live_rehearsal_resumed_state(&state_dir);
    assert_eq!(
        count_fake_cargo_publishes(&log_path, "crate-a"),
        1,
        "resume must not republish crate-a from downloaded state"
    );
    assert_eq!(
        count_fake_cargo_publishes(&log_path, "crate-b"),
        1,
        "resume must not republish crate-b from downloaded state"
    );
    assert_eq!(
        count_fake_cargo_publishes(&log_path, "crate-c"),
        2,
        "crate-c should be attempted once before interruption and once during resume"
    );
}