use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use assert_cmd::Command;
use serial_test::serial;
use tempfile::tempdir;
use tiny_http::{Header, Response, Server, StatusCode};
fn write_file(path: &Path, content: &str) {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).expect("mkdir");
}
fs::write(path, content).expect("write");
}
fn create_three_crate_workspace(root: &Path) {
write_file(
&root.join("Cargo.toml"),
r#"
[workspace]
members = ["crate-a", "crate-b", "crate-c"]
resolver = "2"
"#,
);
for (name, deps) in [
("crate-a", ""),
("crate-b", "crate-a = { path = \"../crate-a\" }"),
(
"crate-c",
"crate-a = { path = \"../crate-a\" }\ncrate-b = { path = \"../crate-b\" }",
),
] {
write_file(
&root.join(format!("{name}/Cargo.toml")),
&format!(
r#"[package]
name = "{name}"
version = "0.1.0"
edition = "2021"
[dependencies]
{deps}
"#
),
);
write_file(&root.join(format!("{name}/src/lib.rs")), "pub fn hi() {}\n");
}
}
fn write_fake_cargo(bin_dir: &Path) -> PathBuf {
#[cfg(windows)]
{
let path = bin_dir.join("cargo.cmd");
let script = "\
@echo off\r\n\
setlocal EnableDelayedExpansion\r\n\
set ARGS=%*\r\n\
set MATCH=\r\n\
echo !ARGS! | findstr /C:\"crate-c\" >nul && set MATCH=C\r\n\
echo !ARGS! | findstr /C:\"crate-b\" >nul && if \"!MATCH!\"==\"\" set MATCH=B\r\n\
echo !ARGS! | findstr /C:\"crate-a\" >nul && if \"!MATCH!\"==\"\" set MATCH=A\r\n\
if \"!MATCH!\"==\"C\" if defined SHIPPER_FAKE_EXIT_FOR_C exit /b !SHIPPER_FAKE_EXIT_FOR_C!\r\n\
if \"!MATCH!\"==\"B\" if defined SHIPPER_FAKE_EXIT_FOR_B exit /b !SHIPPER_FAKE_EXIT_FOR_B!\r\n\
if \"!MATCH!\"==\"A\" if defined SHIPPER_FAKE_EXIT_FOR_A exit /b !SHIPPER_FAKE_EXIT_FOR_A!\r\n\
exit /b 0\r\n";
fs::write(&path, script).expect("write fake cargo");
path
}
#[cfg(not(windows))]
{
use std::os::unix::fs::PermissionsExt;
let path = bin_dir.join("cargo");
let script = "#!/usr/bin/env sh\n\
case \"$*\" in\n\
*crate-c*) exit \"${SHIPPER_FAKE_EXIT_FOR_C:-0}\" ;;\n\
*crate-b*) exit \"${SHIPPER_FAKE_EXIT_FOR_B:-0}\" ;;\n\
*crate-a*) exit \"${SHIPPER_FAKE_EXIT_FOR_A:-0}\" ;;\n\
esac\n\
exit 0\n";
fs::write(&path, script).expect("write fake cargo");
let mut perms = fs::metadata(&path).expect("meta").permissions();
perms.set_mode(0o755);
fs::set_permissions(&path, perms).expect("chmod");
path
}
}
struct RegistryHandles {
never_flip: Arc<Mutex<Vec<&'static str>>>,
}
impl RegistryHandles {
fn pin_404(&self, substr: &'static str) {
self.never_flip.lock().expect("lock").push(substr);
}
fn clear_pins(&self) {
self.never_flip.lock().expect("lock").clear();
}
}
fn spawn_registry() -> (String, std::sync::mpsc::Sender<()>, RegistryHandles) {
let server = Server::http("127.0.0.1:0").expect("server");
let base_url = format!("http://{}", server.server_addr());
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
let per_path_hits: Arc<Mutex<HashMap<String, usize>>> = Arc::new(Mutex::new(HashMap::new()));
let never_flip: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
let never_flip_for_thread = Arc::clone(&never_flip);
let hits_for_thread = Arc::clone(&per_path_hits);
thread::spawn(move || {
loop {
if stop_rx.try_recv().is_ok() {
break;
}
match server.recv_timeout(Duration::from_millis(200)) {
Ok(Some(req)) => {
let path = req.url().split('?').next().unwrap_or("").to_owned();
let pinned_404 = {
let list = never_flip_for_thread.lock().expect("lock");
list.iter().any(|needle| path.contains(needle))
};
let hits = {
let mut map = hits_for_thread.lock().expect("lock");
let counter = map.entry(path.clone()).or_insert(0);
*counter += 1;
*counter
};
let (status, body) = if pinned_404 || hits <= 1 {
(404u16, String::from("{}"))
} else {
(
200u16,
r#"{"crate":{"name":"x"},"versions":[{"num":"0.1.0","yanked":false}]}"#
.to_string(),
)
};
let resp = Response::from_string(body)
.with_status_code(StatusCode(status))
.with_header(
Header::from_bytes("Content-Type", "application/json").expect("header"),
);
let _ = req.respond(resp);
}
_ => continue,
}
}
});
(base_url, stop_tx, RegistryHandles { never_flip })
}
fn package_state(state: &serde_json::Value, name_at_ver: &str) -> Option<String> {
state
.get("packages")?
.get(name_at_ver)?
.get("state")?
.get("state")?
.as_str()
.map(str::to_owned)
}
fn read_events(events_path: &Path) -> Vec<serde_json::Value> {
let raw = fs::read_to_string(events_path).unwrap_or_default();
raw.lines()
.filter(|l| !l.trim().is_empty())
.map(|l| serde_json::from_str(l).expect("events.jsonl must be valid NDJSON"))
.collect()
}
fn count_events_matching<F>(events: &[serde_json::Value], pred: F) -> usize
where
F: Fn(&serde_json::Value) -> bool,
{
events.iter().filter(|e| pred(e)).count()
}
fn event_type_matches(event: &serde_json::Value, expected_kind: &str) -> bool {
event
.get("event_type")
.and_then(|et| et.get("type"))
.and_then(|t| t.as_str())
.map(|s| s == pascal_to_snake(expected_kind))
.unwrap_or(false)
}
fn pascal_to_snake(name: &str) -> String {
let mut out = String::with_capacity(name.len() + 4);
for (i, ch) in name.chars().enumerate() {
if ch.is_ascii_uppercase() {
if i != 0 {
out.push('_');
}
out.push(ch.to_ascii_lowercase());
} else {
out.push(ch);
}
}
out
}
fn shipper_cmd() -> Command {
Command::new(assert_cmd::cargo::cargo_bin!("shipper-cli"))
}
fn common_args(
cmd: &mut Command,
manifest: &Path,
api_base: &str,
state_dir: &Path,
fake_cargo: &Path,
) {
cmd.arg("--manifest-path")
.arg(manifest)
.arg("--api-base")
.arg(api_base)
.arg("--allow-dirty")
.arg("--no-readiness")
.arg("--verify-timeout")
.arg("0ms")
.arg("--verify-poll")
.arg("0ms")
.arg("--verify-mode")
.arg("none")
.arg("--max-attempts")
.arg("1")
.arg("--base-delay")
.arg("0ms")
.arg("--state-dir")
.arg(state_dir)
.env("SHIPPER_CARGO_BIN", fake_cargo);
}
#[test]
#[serial]
fn rehearsal_interrupted_publish_then_resume_preserves_invariants() {
let td = tempdir().expect("tempdir");
let root = td.path();
create_three_crate_workspace(root);
let bin_dir = root.join("fake-bin");
fs::create_dir_all(&bin_dir).expect("mkdir bin");
let fake_cargo = write_fake_cargo(&bin_dir);
let (registry_url, registry_stop, registry) = spawn_registry();
registry.pin_404("crate-c");
let state_dir = root.join(".shipper");
let state_path = state_dir.join("state.json");
let events_path = state_dir.join("events.jsonl");
let mut cmd = shipper_cmd();
common_args(
&mut cmd,
&root.join("Cargo.toml"),
®istry_url,
&state_dir,
&fake_cargo,
);
cmd.arg("publish")
.env("SHIPPER_FAKE_EXIT_FOR_A", "0")
.env("SHIPPER_FAKE_EXIT_FOR_B", "0")
.env("SHIPPER_FAKE_EXIT_FOR_C", "1");
cmd.assert().failure();
let state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&state_path).expect("state.json exists"))
.expect("state.json is valid JSON");
assert_eq!(
package_state(&state, "crate-a@0.1.0").as_deref(),
Some("published"),
"a must be published after run 1"
);
assert_eq!(
package_state(&state, "crate-b@0.1.0").as_deref(),
Some("published"),
"b must be published after run 1"
);
assert_ne!(
package_state(&state, "crate-c@0.1.0").as_deref(),
Some("published"),
"c must NOT be published (fake cargo exited 1 for c)"
);
let events_r1 = read_events(&events_path);
assert!(
!events_r1.is_empty(),
"events.jsonl must have content after run 1"
);
let published_r1 =
count_events_matching(&events_r1, |e| event_type_matches(e, "PackagePublished"));
assert_eq!(
published_r1, 2,
"PackagePublished events after run 1 should equal succeeded crates (2 = a + b); got {published_r1}"
);
registry.clear_pins();
let mut resume = shipper_cmd();
common_args(
&mut resume,
&root.join("Cargo.toml"),
®istry_url,
&state_dir,
&fake_cargo,
);
resume.arg("resume").env("SHIPPER_FAKE_EXIT_FOR_C", "0");
resume.assert().success();
let _ = registry_stop.send(());
let state_after_raw = fs::read_to_string(&state_path).expect("read state");
let state_after: serde_json::Value =
serde_json::from_str(&state_after_raw).expect("parse state");
for pkg in ["crate-a@0.1.0", "crate-b@0.1.0"] {
assert_eq!(
package_state(&state_after, pkg).as_deref(),
Some("published"),
"{pkg} must be Published after resume. full state after resume:\n{}",
state_after_raw
);
}
let c_state = package_state(&state_after, "crate-c@0.1.0");
assert!(
matches!(c_state.as_deref(), Some("published") | Some("skipped")),
"crate-c must be Published or Skipped after resume; got {c_state:?}. \
full state:\n{state_after_raw}"
);
let events_all = read_events(&events_path);
let published_total =
count_events_matching(&events_all, |e| event_type_matches(e, "PackagePublished"));
assert!(
(2..=3).contains(&published_total),
"PackagePublished events across both runs should be 2 (a, b) or 3 \
(a, b, c if c was re-published during resume); got {published_total}. \
4+ would mean resume duplicated a or b — a correctness violation."
);
let execution_started =
count_events_matching(&events_all, |e| event_type_matches(e, "ExecutionStarted"));
assert_eq!(
execution_started, 2,
"ExecutionStarted events should be exactly 2 (one per run); got {execution_started}. \
< 2 means events.jsonl was truncated somewhere — append-only invariant broken."
);
}