use std::collections::{BTreeMap, BTreeSet};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result, bail};
use clap::{CommandFactory, Parser, Subcommand};
use clap_complete::Shell;
use serde::Serialize;
use shipper_core::config::{CliOverrides, ShipperConfig};
use shipper_core::engine::{self, Reporter};
use shipper_core::plan;
use shipper_core::runtime::execution::pkg_key;
use shipper_core::types::{
EventType, ExecutionState, Finishability, PackageState, PlannedPackage, PreflightPackage,
PreflightReport, PublishEvent, Registry, ReleasePlan, ReleaseSpec, RuntimeOptions,
};
mod doctor;
mod output;
use crate::output::progress::ProgressReporter;
const RICH_VERSION_DETAILS: &str = concat!(
"commit: ",
env!("SHIPPER_GIT_SHA"),
"\nbuild: ",
env!("SHIPPER_BUILD_PROFILE"),
"\nrustc: ",
env!("SHIPPER_RUSTC_VERSION"),
);
#[derive(Parser, Debug)]
#[command(name = "shipper", version, disable_version_flag = true)]
#[command(about = "Resumable, backoff-aware crates.io publishing for workspaces")]
#[command(override_usage = "shipper [OPTIONS] <COMMAND>")]
struct Cli {
#[arg(short = 'V', long = "version", global = true)]
version: bool,
#[arg(long, global = true)]
config: Option<PathBuf>,
#[arg(long, default_value = "Cargo.toml", global = true)]
manifest_path: PathBuf,
#[arg(long, global = true)]
registry: Option<String>,
#[arg(long, global = true)]
api_base: Option<String>,
#[arg(long = "package", global = true)]
packages: Vec<String>,
#[arg(long, global = true)]
state_dir: Option<PathBuf>,
#[arg(long, global = true)]
output_lines: Option<usize>,
#[arg(long, global = true)]
allow_dirty: bool,
#[arg(long, global = true)]
skip_ownership_check: bool,
#[arg(long, global = true)]
strict_ownership: bool,
#[arg(long, global = true)]
no_verify: bool,
#[arg(long, global = true)]
max_attempts: Option<u32>,
#[arg(long, global = true)]
base_delay: Option<String>,
#[arg(long, global = true)]
max_delay: Option<String>,
#[arg(long, global = true)]
retry_strategy: Option<String>,
#[arg(long, global = true)]
retry_jitter: Option<f64>,
#[arg(long, global = true)]
verify_timeout: Option<String>,
#[arg(long, global = true)]
verify_poll: Option<String>,
#[arg(long, global = true)]
readiness_method: Option<String>,
#[arg(long, global = true)]
readiness_timeout: Option<String>,
#[arg(long, global = true)]
readiness_poll: Option<String>,
#[arg(long, global = true)]
no_readiness: bool,
#[arg(long, global = true)]
force_resume: bool,
#[arg(long, global = true)]
force: bool,
#[arg(long, global = true)]
lock_timeout: Option<String>,
#[arg(long, global = true)]
policy: Option<String>,
#[arg(long, global = true)]
verify_mode: Option<String>,
#[arg(long, global = true)]
parallel: bool,
#[arg(long, global = true)]
max_concurrent: Option<usize>,
#[arg(long, global = true)]
per_package_timeout: Option<String>,
#[arg(long, global = true)]
webhook_url: Option<String>,
#[arg(long, global = true)]
webhook_secret: Option<String>,
#[arg(long, global = true)]
encrypt: bool,
#[arg(long, global = true)]
encrypt_passphrase: Option<String>,
#[arg(long, global = true)]
registries: Option<String>,
#[arg(long, global = true)]
all_registries: bool,
#[arg(long, global = true)]
resume_from: Option<String>,
#[arg(long, global = true)]
rehearsal_registry: Option<String>,
#[arg(long, global = true)]
skip_rehearsal: bool,
#[arg(long = "smoke-install", global = true, value_name = "CRATE")]
rehearsal_smoke_install: Option<String>,
#[arg(long, default_value = "text", value_parser = ["text", "json"], global = true)]
format: String,
#[arg(long, global = true)]
verbose: bool,
#[arg(short, long, global = true)]
quiet: bool,
#[command(subcommand)]
cmd: Option<Commands>,
}
#[derive(Subcommand, Debug)]
enum Commands {
#[command(long_about = "\
Print the deterministic publish plan (dependency-first ordering).
Reads the workspace via `cargo metadata`, filters publishable crates,
topologically sorts them, and prints the order in which they will be
published. The plan is deterministic — the same workspace produces the
same plan ID on any machine — which is the anchor that makes `resume`
safe.
EXAMPLES:
# Preview the publish order for every publishable workspace member:
shipper plan
# Plan with dependency-level breakdown (who can publish in parallel):
shipper plan --verbose
")]
Plan,
#[command(long_about = "\
Run preflight checks without publishing.
Validates everything that can fail a live publish — git cleanliness,
registry reachability, token availability, dry-run, ownership — and
prints a `Finishability` verdict (PROVEN / NOT PROVEN / FAILED). No
crate is uploaded. Run this before `publish` on any run you cannot
afford to restart from scratch.
EXAMPLES:
# Run preflight across the whole workspace:
shipper preflight
# Machine-readable output for CI gates:
shipper preflight --format json
")]
Preflight {
#[arg(long = "preflight-only")]
preflight_only: bool,
},
#[command(long_about = "\
Execute the publish plan end-to-end, persisting resumable state after
every step.
If `.shipper/state.json` already exists for this plan, `publish` picks
up where the previous run left off — already-published crates are
skipped, and the run continues from the first pending or failed
package. On interruption (Ctrl-C, network drop, ambiguous registry
response), rerun `shipper publish` or `shipper resume`.
EXAMPLES:
# Publish the whole workspace to crates.io:
shipper publish
# Publish a subset, allowing a dirty git tree (local rehearsal):
shipper publish --package shipper-core --allow-dirty
")]
Publish,
#[command(long_about = "\
Resume a previous publish run.
Loads `.shipper/state.json`, validates it against the current plan, skips
already-published packages, and continues from the first pending or failed
package. Use this after a killed runner, network interruption, or manual
stop.
EXAMPLES:
# Continue the current workspace release from persisted state:
shipper resume
# Resume from a specific crate after reviewing the saved state:
shipper resume --resume-from shipper-core
# Force resume when the computed plan differs from saved state:
shipper resume --force-resume
")]
Resume,
Rehearse,
#[command(long_about = "\
Compare local workspace versions to the registry.
Use status before publish or after an interruption to see which local crate
versions already exist on the target registry. This is a read-only registry
comparison and does not mutate `.shipper/` state.
EXAMPLES:
# Check every publishable workspace member:
shipper status
# Check one package against the configured registry:
shipper status --package shipper-core
# Watch persisted release progress while publish or resume is running:
shipper status --watch
")]
Status {
#[arg(long)]
watch: bool,
},
#[command(long_about = "\
Print environment and auth diagnostics.
Checks local tools, registry reachability, authentication signals, workspace
health, and state-directory basics. Run this first when preflight or publish
reports an environment blocker.
EXAMPLES:
# Inspect local release prerequisites:
shipper doctor
# Check a named Cargo registry:
shipper doctor --registry crates-io
")]
Doctor,
#[command(long_about = "\
View the authoritative event log.
Reads `<state-dir>/events.jsonl`, which is the truth source for publish and
resume state transitions. Use `--follow` while another terminal is running
publish or resume.
EXAMPLES:
# Print the current event log:
shipper inspect-events
# Follow appended events during a release:
shipper inspect-events --follow
")]
InspectEvents {
#[arg(long)]
follow: bool,
},
#[command(long_about = "\
View the end-of-run receipt with evidence.
Reads `<state-dir>/receipt.json` and prints the completed release summary,
package outcomes, git context, environment fingerprint, and captured evidence.
EXAMPLES:
# Print the human-readable release receipt:
shipper inspect-receipt
# Emit the receipt in JSON for CI or an internal developer portal:
shipper inspect-receipt --format json
")]
InspectReceipt,
#[command(subcommand)]
Ci(CiCommands),
Clean {
#[arg(long)]
keep_receipt: bool,
},
Yank {
#[arg(long = "crate", value_name = "NAME", conflicts_with = "plan")]
crate_name: Option<String>,
#[arg(long, value_name = "VERSION", conflicts_with = "plan")]
version: Option<String>,
#[arg(long, conflicts_with = "plan")]
reason: Option<String>,
#[arg(long)]
mark_compromised: bool,
#[arg(long, value_name = "PATH")]
plan: Option<PathBuf>,
},
PlanYank {
#[arg(long, value_name = "PATH")]
from_receipt: Option<PathBuf>,
#[arg(long, conflicts_with = "starting_crate")]
compromised_only: bool,
#[arg(long, value_name = "CRATE")]
starting_crate: Option<String>,
#[arg(long, value_name = "REASON")]
reason: Option<String>,
},
#[command(name = "fix-forward")]
FixForward {
#[arg(long, value_name = "PATH")]
from_receipt: Option<PathBuf>,
},
#[command(
name = "remediate",
long_about = "\
Generate or execute a receipt-driven remediation plan.
In `--dry-run` mode, reads a prior `receipt.json`, targets a specific bad
crate version, computes the affected reverse-topological yank order and
publish-directional fix-forward suggestions, then writes
`<state-dir>/remediation-plan.json` for operator review and agent consumption.
In `--execute-plan` mode, consumes that reviewed artifact and executes only the
recorded containment yanks. It does NOT edit manifests or publish fix-forward
successors.
EXAMPLES:
shipper remediate --dry-run --from-receipt .shipper/receipt.json --crate bad-crate --target-version 0.4.0 --reason \"CVE-2026-0001\"
shipper remediate --execute-plan .shipper/remediation-plan.json
"
)]
Remediate {
#[arg(long, value_name = "PATH", conflicts_with = "execute_plan")]
from_receipt: Option<PathBuf>,
#[arg(
long = "crate",
value_name = "NAME",
required_unless_present = "execute_plan",
conflicts_with = "execute_plan"
)]
crate_name: Option<String>,
#[arg(
long = "target-version",
value_name = "VERSION",
required_unless_present = "execute_plan",
conflicts_with = "execute_plan"
)]
target_version: Option<String>,
#[arg(
long,
value_name = "REASON",
required_unless_present = "execute_plan",
conflicts_with = "execute_plan"
)]
reason: Option<String>,
#[arg(long, conflicts_with = "execute_plan")]
dry_run: bool,
#[arg(long = "execute-plan", value_name = "PATH")]
execute_plan: Option<PathBuf>,
},
#[command(subcommand)]
Config(ConfigCommands),
Completion {
#[arg(value_enum)]
shell: Shell,
},
}
#[derive(Subcommand, Debug)]
enum CiCommands {
#[command(name = "github-actions")]
GitHubActions,
#[command(name = "gitlab")]
GitLab,
#[command(name = "circleci")]
CircleCI,
#[command(name = "azure-devops")]
AzureDevOps,
}
#[derive(Subcommand, Debug, Clone)]
enum ConfigCommands {
Init {
#[arg(short, long, default_value = ".shipper.toml")]
output: PathBuf,
},
Validate {
#[arg(short, long, default_value = ".shipper.toml")]
path: PathBuf,
},
}
struct CliReporter {
quiet: bool,
progress: Option<ProgressReporter>,
package_positions: BTreeMap<String, usize>,
}
impl CliReporter {
fn new(quiet: bool) -> Self {
Self {
quiet,
progress: None,
package_positions: BTreeMap::new(),
}
}
fn install_progress(
&mut self,
progress: ProgressReporter,
package_positions: BTreeMap<String, usize>,
) {
self.progress = Some(progress);
self.package_positions = package_positions;
}
fn take_progress(&mut self) -> Option<ProgressReporter> {
self.package_positions.clear();
self.progress.take()
}
}
impl Reporter for CliReporter {
fn info(&mut self, msg: &str) {
if !self.quiet {
eprintln!("[info] {msg}");
}
}
fn warn(&mut self, msg: &str) {
if !self.quiet {
eprintln!("[warn] {msg}");
}
}
fn error(&mut self, msg: &str) {
eprintln!("[error] {msg}");
}
#[allow(clippy::too_many_arguments)]
fn retry_wait(
&mut self,
pkg_name: &str,
pkg_version: &str,
attempt: u32,
max_attempts: u32,
delay: std::time::Duration,
reason: shipper_core::types::ErrorClass,
message: &str,
) {
if let Some(progress) = &mut self.progress {
if let Some(index) = self
.package_positions
.get(&format!("{pkg_name}@{pkg_version}"))
{
progress.set_package(*index, pkg_name, pkg_version);
}
progress.retry_countdown(
pkg_name,
pkg_version,
attempt,
max_attempts,
delay,
&format!("{reason:?}"),
message,
);
} else if !self.quiet {
eprintln!(
"[warn] {pkg_name}@{pkg_version}: {message} ({reason:?}); next attempt in {} (attempt {}/{})",
humantime::format_duration(delay),
attempt.saturating_add(1),
max_attempts,
);
std::thread::sleep(delay);
} else {
std::thread::sleep(delay);
}
}
}
pub fn run() -> Result<()> {
let cli = Cli::parse();
if cli.version {
print_version(cli.verbose);
return Ok(());
}
if let Some(Commands::Config(config_cmd)) = &cli.cmd {
return run_config(config_cmd.clone());
}
if let Some(Commands::Completion { shell }) = &cli.cmd {
return run_completion(shell);
}
if cli.cmd.is_none() {
Cli::command()
.error(
clap::error::ErrorKind::MissingSubcommand,
"'shipper' requires a subcommand but one was not provided",
)
.exit();
}
let api_base = cli
.api_base
.clone()
.unwrap_or_else(|| "https://crates.io".to_string());
let index_base = cli.api_base.as_ref().map(|_| api_base.clone());
let spec = ReleaseSpec {
manifest_path: cli.manifest_path.clone(),
registry: Registry {
name: cli
.registry
.clone()
.unwrap_or_else(|| "crates-io".to_string()),
api_base,
index_base,
},
selected_packages: if cli.packages.is_empty() {
None
} else {
Some(cli.packages.clone())
},
};
let command_name = cli
.cmd
.as_ref()
.map(command_name_for_hint)
.unwrap_or("command");
let mut planned = plan::build_plan(&spec)
.with_context(|| plan_failure_hint(&spec.manifest_path, &cli.packages, command_name))?;
let config =
if let Some(ref config_path) = cli.config {
Some(ShipperConfig::load_from_file(config_path).with_context(|| {
format!("Failed to load config from: {}", config_path.display())
})?)
} else {
ShipperConfig::load_from_workspace(&planned.workspace_root)
.with_context(|| "Failed to load config from workspace")?
};
if let Some(ref cfg) = config {
let config_path = cli
.config
.clone()
.unwrap_or_else(|| planned.workspace_root.join(".shipper.toml"));
cfg.validate().with_context(|| {
format!(
"Configuration validation failed for {}",
config_path.display()
)
})?;
}
if let Some(ref cfg) = config
&& let Some(ref reg_config) = cfg.registry
{
if cli.registry.is_none() {
planned.plan.registry.name = reg_config.name.clone();
}
if cli.api_base.is_none() {
planned.plan.registry.api_base = reg_config.api_base.clone();
planned.plan.registry.index_base = reg_config.index_base.clone();
}
}
let cli_overrides = CliOverrides {
policy: cli.policy.as_deref().map(parse_policy).transpose()?,
verify_mode: cli
.verify_mode
.as_deref()
.map(parse_verify_mode)
.transpose()?,
max_attempts: cli.max_attempts,
base_delay: cli.base_delay.as_deref().map(parse_duration).transpose()?,
max_delay: cli.max_delay.as_deref().map(parse_duration).transpose()?,
retry_strategy: cli
.retry_strategy
.as_deref()
.map(parse_retry_strategy)
.transpose()?,
retry_jitter: cli.retry_jitter,
verify_timeout: cli
.verify_timeout
.as_deref()
.map(parse_duration)
.transpose()?,
verify_poll_interval: cli.verify_poll.as_deref().map(parse_duration).transpose()?,
output_lines: cli.output_lines,
lock_timeout: cli
.lock_timeout
.as_deref()
.map(parse_duration)
.transpose()?,
state_dir: cli.state_dir.clone(),
readiness_method: cli
.readiness_method
.as_deref()
.map(parse_readiness_method)
.transpose()?,
readiness_timeout: cli
.readiness_timeout
.as_deref()
.map(parse_duration)
.transpose()?,
readiness_poll: cli
.readiness_poll
.as_deref()
.map(parse_duration)
.transpose()?,
allow_dirty: cli.allow_dirty,
skip_ownership_check: cli.skip_ownership_check,
strict_ownership: cli.strict_ownership,
no_verify: cli.no_verify,
no_readiness: cli.no_readiness,
force: cli.force,
force_resume: cli.force_resume,
parallel_enabled: cli.parallel || cli.max_concurrent.is_some(),
max_concurrent: cli.max_concurrent,
per_package_timeout: cli
.per_package_timeout
.as_deref()
.map(parse_duration)
.transpose()?,
webhook_url: cli.webhook_url.clone(),
webhook_secret: cli.webhook_secret.clone(),
encrypt: cli.encrypt,
encrypt_passphrase: cli.encrypt_passphrase.clone(),
registries: cli.registries.as_ref().map(|s| {
s.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
}),
all_registries: cli.all_registries,
resume_from: cli.resume_from.clone(),
rehearsal_registry: cli.rehearsal_registry.clone(),
skip_rehearsal: cli.skip_rehearsal,
rehearsal_smoke_install: cli.rehearsal_smoke_install.clone(),
};
let config_for_merge = config.clone().unwrap_or_default();
let opts: RuntimeOptions = config_for_merge.build_runtime_options(cli_overrides);
let mut reporter = CliReporter::new(cli.quiet);
match cli.cmd.expect("subcommand checked above") {
Commands::Plan => {
print_plan(&planned, cli.verbose, &cli.format);
}
Commands::Preflight { preflight_only } => {
let rep = engine::run_preflight_in_place_with_options(
&mut planned,
&opts,
&mut reporter,
engine::PreflightRunOptions {
fresh_audit: preflight_only,
},
)
.with_context(|| preflight_failure_hint(&opts.state_dir))?;
print_preflight(&rep, &cli.format);
}
Commands::Publish => {
let target_registries = if opts.registries.is_empty() {
vec![planned.plan.registry.clone()]
} else {
opts.registries.clone()
};
for reg in target_registries {
if opts.registries.len() > 1 {
if cli.format == "json" {
eprintln!();
eprintln!("Publishing to registry: {} ({})", reg.name, reg.api_base);
} else {
println!(
"\n🚀 Publishing to registry: {} ({})",
reg.name, reg.api_base
);
}
}
let mut current_planned = planned.clone();
current_planned.plan.registry = reg.clone();
let mut current_opts = opts.clone();
if opts.registries.len() > 1 {
current_opts.state_dir = opts.state_dir.join(®.name);
}
let total_packages = current_planned.plan.packages.len();
let mut progress = ProgressReporter::new(total_packages, cli.quiet);
let package_positions: BTreeMap<String, usize> = current_planned
.plan
.packages
.iter()
.enumerate()
.map(|(idx, pkg)| (format!("{}@{}", pkg.name, pkg.version), idx + 1))
.collect();
if total_packages > 0 {
let first_pkg = ¤t_planned.plan.packages[0];
progress.set_package(1, &first_pkg.name, &first_pkg.version);
}
reporter.install_progress(progress, package_positions);
let receipt = engine::run_publish(¤t_planned, ¤t_opts, &mut reporter)
.with_context(|| publish_failure_hint(¤t_opts.state_dir))?;
if let Some(progress) = reporter.take_progress() {
progress.finish();
}
print_publish_output(
&receipt,
¤t_planned.workspace_root,
¤t_opts.state_dir,
&cli.format,
)?;
}
}
Commands::Resume => {
let target_registries = if opts.registries.is_empty() {
vec![planned.plan.registry.clone()]
} else {
opts.registries.clone()
};
for reg in target_registries {
if opts.registries.len() > 1 {
if cli.format == "json" {
eprintln!();
eprintln!("Resuming for registry: {} ({})", reg.name, reg.api_base);
} else {
println!(
"\n🔄 Resuming for registry: {} ({})",
reg.name, reg.api_base
);
}
}
let mut current_planned = planned.clone();
current_planned.plan.registry = reg.clone();
let mut current_opts = opts.clone();
if opts.registries.len() > 1 {
current_opts.state_dir = opts.state_dir.join(®.name);
}
let total_packages = current_planned.plan.packages.len();
let mut progress = ProgressReporter::new(total_packages, cli.quiet);
let package_positions: BTreeMap<String, usize> = current_planned
.plan
.packages
.iter()
.enumerate()
.map(|(idx, pkg)| (format!("{}@{}", pkg.name, pkg.version), idx + 1))
.collect();
if total_packages > 0 {
let first_pkg = ¤t_planned.plan.packages[0];
progress.set_package(1, &first_pkg.name, &first_pkg.version);
}
reporter.install_progress(progress, package_positions);
let receipt = engine::run_resume(¤t_planned, ¤t_opts, &mut reporter)
.with_context(|| resume_failure_hint(¤t_opts.state_dir))?;
if let Some(progress) = reporter.take_progress() {
progress.finish();
}
print_resume_output(
&receipt,
¤t_planned.workspace_root,
¤t_opts.state_dir,
&cli.format,
)?;
}
}
Commands::Rehearse => {
let outcome = engine::run_rehearsal(&planned, &opts, &mut reporter)?;
if outcome.passed {
println!(
"rehearsal OK: {} packages against '{}'",
outcome.packages_published, outcome.registry_name
);
} else {
println!(
"rehearsal FAILED after {}/{} packages against '{}': {}",
outcome.packages_published,
outcome.packages_attempted,
outcome.registry_name,
outcome.summary
);
anyhow::bail!("rehearsal did not pass");
}
}
Commands::Status { watch } => {
let target_registries = if opts.registries.is_empty() {
vec![planned.plan.registry.clone()]
} else {
opts.registries.clone()
};
if watch {
if target_registries.len() > 1 {
bail!(
"status --watch supports one registry at a time; pass --registry once or inspect the registry-specific state directory directly"
);
}
run_status_watch(&planned, &opts, &cli.format)?;
return Ok(());
}
let mut registry_reports = Vec::new();
for reg in target_registries {
let mut current_planned = planned.clone();
current_planned.plan.registry = reg;
registry_reports.push(build_status_registry_report(
¤t_planned,
&mut reporter,
)?);
}
let report = StatusReport {
schema_version: "shipper.status.v1",
plan_id: planned.plan.plan_id.clone(),
workspace_root: planned.workspace_root.display().to_string(),
registries: registry_reports,
};
write_status_report(&report, &cli.format)?;
}
Commands::Doctor => {
let target_registries = if opts.registries.is_empty() {
vec![planned.plan.registry.clone()]
} else {
opts.registries.clone()
};
if cli.format == "json" {
let mut reports = Vec::new();
for reg in target_registries {
let mut current_planned = planned.clone();
current_planned.plan.registry = reg;
reports.push(doctor::collect_report(¤t_planned, &opts)?);
}
doctor::print_json(reports)?;
} else {
for reg in target_registries {
if opts.registries.len() > 1 {
println!(
"\n🩺 Diagnostics for registry: {} ({})",
reg.name,
doctor::redact_diagnostic_value(®.api_base)
);
}
let mut current_planned = planned.clone();
current_planned.plan.registry = reg;
doctor::run(¤t_planned, &opts, &mut reporter)?;
}
}
}
Commands::InspectEvents { follow } => {
run_inspect_events(&planned, &opts, &cli.format, follow)?;
}
Commands::InspectReceipt => {
run_inspect_receipt(&planned, &opts, &cli.format)?;
}
Commands::Ci(ci_cmd) => {
run_ci(ci_cmd, &opts.state_dir, &planned.workspace_root)?;
}
Commands::Yank {
crate_name,
version,
reason,
mark_compromised,
plan,
} => {
use shipper_core::cargo;
use shipper_core::engine::plan_yank;
use shipper_core::state::events::{EventLog, events_path};
use shipper_core::state::execution_state::{load_receipt, receipt_path, write_receipt};
use shipper_core::types::{EventType, PublishEvent};
if let Some(plan_path) = plan {
let yank_plan = plan_yank::load_plan_from_path(&plan_path)?;
reporter.info(&format!(
"executing yank plan: {} entries against '{}' (plan_id {})",
yank_plan.entries.len(),
yank_plan.registry,
yank_plan.plan_id
));
let workspace_root = std::env::current_dir()
.context("failed to resolve current dir for plan execution")?;
let registry_name = opts
.registries
.first()
.map(|r| r.name.clone())
.unwrap_or_else(|| yank_plan.registry.clone());
let mut log = EventLog::new();
let events_file = events_path(&opts.state_dir);
let mut succeeded = 0usize;
let mut failed: Option<(String, i32)> = None;
for (i, entry) in yank_plan.entries.iter().enumerate() {
let entry_reason = entry
.reason
.clone()
.unwrap_or_else(|| "plan execution".to_string());
reporter.warn(&format!(
"[{}/{}] yanking {}@{} — reason: {}",
i + 1,
yank_plan.entries.len(),
entry.name,
entry.version,
entry_reason
));
let out = cargo::cargo_yank(
&workspace_root,
entry.name.as_str(),
entry.version.as_str(),
registry_name.as_str(),
opts.output_lines,
None,
)?;
log.record(PublishEvent {
timestamp: chrono::Utc::now(),
event_type: EventType::PackageYanked {
crate_name: entry.name.clone(),
version: entry.version.clone(),
reason: entry_reason.clone(),
exit_code: out.exit_code,
},
package: format!("{}@{}", entry.name, entry.version),
});
if let Err(err) = log.write_to_file(&events_file) {
reporter.warn(&format!(
"failed to append PackageYanked event to {}: {err:#}",
events_file.display()
));
}
log.clear();
if out.exit_code == 0 {
succeeded += 1;
reporter.info(&format!(
"[{}/{}] yanked {}@{}",
i + 1,
yank_plan.entries.len(),
entry.name,
entry.version
));
} else {
reporter.error(&format!(
"[{}/{}] cargo yank exited {} for {}@{}. stderr tail:\n{}",
i + 1,
yank_plan.entries.len(),
out.exit_code,
entry.name,
entry.version,
out.stderr_tail
));
failed = Some((format!("{}@{}", entry.name, entry.version), out.exit_code));
break;
}
}
if let Some((pkg, code)) = failed {
reporter.error(&format!(
"yank plan halted: {succeeded}/{} succeeded; failed at {pkg} (cargo exit {code})",
yank_plan.entries.len()
));
anyhow::bail!(
"yank plan failed at {pkg}; {succeeded}/{} entries succeeded before halt",
yank_plan.entries.len()
);
} else {
reporter.info(&format!(
"yank plan complete: {succeeded}/{} entries yanked successfully",
yank_plan.entries.len()
));
return Ok(());
}
}
let crate_name = crate_name.ok_or_else(|| {
anyhow::anyhow!("--crate is required when --plan is not supplied")
})?;
let version = version.ok_or_else(|| {
anyhow::anyhow!("--version is required when --plan is not supplied")
})?;
let reason = reason.ok_or_else(|| {
anyhow::anyhow!("--reason is required when --plan is not supplied")
})?;
reporter.warn(&format!(
"yanking {crate_name}@{version} from registry \
(containment, not undo) — reason: {reason}"
));
let workspace_root =
std::env::current_dir().context("failed to resolve current dir for cargo yank")?;
let registry_name = opts
.registries
.first()
.map(|r| r.name.clone())
.unwrap_or_else(|| "crates-io".to_string());
let out = cargo::cargo_yank(
&workspace_root,
crate_name.as_str(),
version.as_str(),
registry_name.as_str(),
opts.output_lines,
None,
)?;
let mut log = EventLog::new();
log.record(PublishEvent {
timestamp: chrono::Utc::now(),
event_type: EventType::PackageYanked {
crate_name: crate_name.clone(),
version: version.clone(),
reason: reason.clone(),
exit_code: out.exit_code,
},
package: format!("{crate_name}@{version}"),
});
let events_file = events_path(&opts.state_dir);
if let Err(err) = log.write_to_file(&events_file) {
reporter.warn(&format!(
"failed to append PackageYanked event to {}: {err:#}",
events_file.display()
));
}
if out.exit_code == 0 {
if mark_compromised {
let rpath = receipt_path(&opts.state_dir);
match load_receipt(&opts.state_dir) {
Ok(Some(mut receipt)) => {
let matched = receipt
.packages
.iter_mut()
.find(|p| p.name == crate_name && p.version == version);
if let Some(pkg) = matched {
pkg.compromised_at = Some(chrono::Utc::now());
pkg.compromised_by = Some(reason.clone());
if let Err(err) = write_receipt(&opts.state_dir, &receipt) {
reporter.warn(&format!(
"yanked successfully but failed to mark receipt at \
{}: {err:#}",
rpath.display()
));
} else {
reporter.info(&format!(
"marked {crate_name}@{version} compromised in {}",
rpath.display()
));
}
} else {
reporter.warn(&format!(
"--mark-compromised: no matching package entry for \
{crate_name}@{version} in {}; yank succeeded but the \
receipt was not amended.",
rpath.display()
));
}
}
Ok(None) => {
reporter.warn(&format!(
"--mark-compromised: no receipt at {}; yank succeeded but \
nothing to amend. Future plan-yank / fix-forward runs won't \
see this version as compromised unless the receipt is \
reconstructed.",
rpath.display()
));
}
Err(err) => {
reporter.warn(&format!(
"--mark-compromised: failed to load receipt at {}: {err:#}. \
Yank succeeded; receipt not amended.",
rpath.display()
));
}
}
}
reporter.info(&format!(
"yanked {crate_name}@{version} successfully. \
existing lockfile pins are NOT invalidated; \
downstream consumers should `cargo update -p {crate_name}` \
to pick up the next available version."
));
} else {
reporter.error(&format!(
"cargo yank exited {} for {crate_name}@{version}. \
stderr tail:\n{}",
out.exit_code, out.stderr_tail
));
anyhow::bail!(
"yank failed for {crate_name}@{version} (cargo exit {})",
out.exit_code
);
}
}
Commands::PlanYank {
from_receipt,
compromised_only,
starting_crate,
reason,
} => {
use shipper_core::engine::plan_yank::{self, PlanYankFilter};
let receipt_path = from_receipt.unwrap_or_else(|| {
opts.state_dir
.join(shipper_core::state::execution_state::RECEIPT_FILE)
});
let receipt = plan_yank::load_receipt_from_path(&receipt_path).with_context(|| {
"plan-yank needs a readable receipt; default path is \
<state_dir>/receipt.json. Pass --from-receipt <path> to \
override."
.to_string()
})?;
let plan = if let Some(ref starting) = starting_crate {
plan_yank::build_plan_from_starting_crate(
&receipt,
&planned.plan.dependencies,
starting,
reason.clone(),
)?
} else {
let filter = if compromised_only {
PlanYankFilter::CompromisedOnly
} else {
PlanYankFilter::AllPublished
};
plan_yank::build_plan(&receipt, filter)
};
match cli.format.as_str() {
"json" => {
let report = PlanYankJsonReport {
schema_version: "shipper.plan_yank.v1",
command: "plan-yank",
plan: &plan,
};
let out = serde_json::to_string_pretty(&report)
.context("failed to serialize yank plan as JSON")?;
println!("{out}");
}
_ => {
println!("{}", plan_yank::render_text(&plan));
}
}
}
Commands::FixForward { from_receipt } => {
use shipper_core::engine::fix_forward::{self, SuccessorStrategy};
let receipt_path = from_receipt.unwrap_or_else(|| {
opts.state_dir
.join(shipper_core::state::execution_state::RECEIPT_FILE)
});
let plan =
fix_forward::plan_from_path(&receipt_path, SuccessorStrategy::PlaceholderNext)
.with_context(|| {
"fix-forward needs a readable receipt; default path is \
<state_dir>/receipt.json. Pass --from-receipt <path> to \
override."
.to_string()
})?;
match cli.format.as_str() {
"json" => {
let report = FixForwardJsonReport {
schema_version: "shipper.fix_forward.v1",
command: "fix-forward",
plan: &plan,
};
let out = serde_json::to_string_pretty(&report)
.context("failed to serialize fix-forward plan as JSON")?;
println!("{out}");
}
_ => {
println!("{}", fix_forward::render_text(&plan));
}
}
}
Commands::Remediate {
from_receipt,
crate_name,
target_version,
reason,
dry_run,
execute_plan,
} => {
use shipper_core::cargo;
use shipper_core::engine::{plan_yank, remediation};
use shipper_core::runtime::execution::resolve_state_dir;
use shipper_core::state::events::{EventLog, events_path};
if let Some(plan_path) = execute_plan {
let state_dir = resolve_state_dir(&planned.workspace_root, &opts.state_dir);
let expected_plan_path =
shipper_core::state::execution_state::remediation_plan_path(&state_dir);
let requested_plan_path = plan_path.canonicalize().with_context(|| {
format!(
"failed to resolve remediation plan path {}; execute reviewed plans from <state-dir>/remediation-plan.json",
plan_path.display()
)
})?;
let expected_plan_path = expected_plan_path.canonicalize().with_context(|| {
format!(
"failed to resolve expected remediation plan {}; run `shipper remediate --dry-run` first or pass --state-dir",
expected_plan_path.display()
)
})?;
if requested_plan_path != expected_plan_path {
anyhow::bail!(
"refusing to execute remediation plan outside the configured state dir; expected {}",
expected_plan_path.display()
);
}
let plan = remediation::load_plan_from_path(&expected_plan_path)?;
let events_file = events_path(&state_dir);
let registry_name = opts
.registries
.first()
.map(|r| r.name.clone())
.unwrap_or_else(|| "crates-io".to_string());
if plan.registry != registry_name {
anyhow::bail!(
"remediation plan registry '{}' does not match configured registry '{}'",
plan.registry,
registry_name
);
}
reporter.warn(&format!(
"executing reviewed remediation plan: {} containment yanks for {}@{} (plan_id {})",
plan.yank_order.len(),
plan.target.crate_name,
plan.target.version,
plan.plan_id
));
reporter.warn(
"remediate --execute-plan runs yanks only; fix-forward suggestions remain planning output",
);
let mut succeeded = 0usize;
for (idx, step) in plan.yank_order.iter().enumerate() {
let event_reason = remediation::REDACTED_OPERATOR_REASON.to_string();
reporter.warn(&format!(
"[{}/{}] yanking {}@{} from {}",
idx + 1,
plan.yank_order.len(),
step.name,
step.version,
registry_name
));
let out = cargo::cargo_yank(
&planned.workspace_root,
step.name.as_str(),
step.version.as_str(),
registry_name.as_str(),
opts.output_lines,
None,
)?;
let mut log = EventLog::new();
log.record(PublishEvent {
timestamp: chrono::Utc::now(),
event_type: EventType::PackageYanked {
crate_name: step.name.clone(),
version: step.version.clone(),
reason: event_reason,
exit_code: out.exit_code,
},
package: format!("{}@{}", step.name, step.version),
});
if let Err(err) = log.write_to_file(&events_file) {
reporter.warn(&format!(
"failed to append PackageYanked event to {}: {err:#}",
events_file.display()
));
}
if out.exit_code == 0 {
succeeded += 1;
reporter.info(&format!(
"[{}/{}] yanked {}@{}",
idx + 1,
plan.yank_order.len(),
step.name,
step.version
));
} else {
reporter.error(&format!(
"[{}/{}] cargo yank exited {} for {}@{}. stderr tail:\n{}",
idx + 1,
plan.yank_order.len(),
out.exit_code,
step.name,
step.version,
out.stderr_tail
));
anyhow::bail!(
"remediation plan failed at {}@{}; {succeeded}/{} containment yanks succeeded before halt",
step.name,
step.version,
plan.yank_order.len()
);
}
}
reporter.info(&format!(
"remediation containment complete: {succeeded}/{} yanks executed successfully",
plan.yank_order.len()
));
return Ok(());
}
if !dry_run {
bail!("remediate currently supports planning only; rerun with --dry-run");
}
let crate_name = crate_name.ok_or_else(|| {
anyhow::anyhow!("--crate is required when --execute-plan is not supplied")
})?;
let target_version = target_version.ok_or_else(|| {
anyhow::anyhow!("--target-version is required when --execute-plan is not supplied")
})?;
let reason = reason.ok_or_else(|| {
anyhow::anyhow!("--reason is required when --execute-plan is not supplied")
})?;
let state_dir = resolve_state_dir(&planned.workspace_root, &opts.state_dir);
let receipt_path = from_receipt
.unwrap_or_else(|| shipper_core::state::execution_state::receipt_path(&state_dir));
let receipt = plan_yank::load_receipt_from_path(&receipt_path).with_context(|| {
"remediate needs a readable receipt; default path is \
<state_dir>/receipt.json. Pass --from-receipt <path> to \
override."
.to_string()
})?;
let plan = remediation::build_dry_run_plan(
&receipt,
&planned.plan.dependencies,
&receipt_path,
&crate_name,
&target_version,
&reason,
)?;
let artifact_path = remediation::write_dry_run_artifact(&state_dir, &plan)?;
match cli.format.as_str() {
"json" => {
let out = serde_json::to_string_pretty(&plan)
.context("failed to serialize remediation dry-run plan as JSON")?;
println!("{out}");
}
_ => {
println!("{}", remediation::render_text(&plan, &artifact_path));
}
}
}
Commands::Clean { keep_receipt } => {
run_clean(
&opts.state_dir,
&planned.workspace_root,
keep_receipt,
opts.force,
)?;
}
Commands::Config(_) => {
unreachable!("Config commands should be handled before this match");
}
Commands::Completion { .. } => {
unreachable!("Completion commands should be handled before this match");
}
}
Ok(())
}
fn preflight_failure_hint(state_dir: &Path) -> String {
let hint = format!(
"preflight failed — next steps:\n \
* run `shipper doctor` to diagnose auth / git / registry\n \
* inspect {}/events.jsonl for the authoritative event log\n \
* `shipper preflight --format json` for machine-readable detail",
state_dir.display()
);
with_common_blockers(
hint,
&[
"missing token/auth: run `cargo login <token>` or configure Trusted Publishing",
"dirty git: commit or stash changes, or pass `--allow-dirty` only for intentional rehearsal",
"version already exists: run `shipper status`, then bump or skip the crate version",
"ownership failure: confirm the token can publish with `cargo owner --list <crate>`",
"registry unreachable: verify `--registry`, `--api-base`, and network access",
],
)
}
fn publish_failure_hint(state_dir: &Path) -> String {
let hint = format!(
"publish failed — next steps:\n \
* inspect {dir}/events.jsonl (authoritative) and {dir}/state.json (projection)\n \
* run `shipper status` to compare local versions to the registry\n \
* run `shipper resume` after fixing the root cause to continue from the failed crate\n \
* run `shipper doctor` if auth / network is suspect",
dir = state_dir.display()
);
with_common_blockers(
hint,
&[
"ambiguous publish: inspect reconciliation evidence; do not blind-retry outside Shipper",
"rate limit or Retry-After: wait for Shipper's scheduled retry instead of restarting",
"version already exists: run `shipper status` before deciding to bump or resume",
"stale lock: verify no release is active before using `--force` or `shipper clean`",
"auth/network failure: run `shipper doctor` before resuming",
],
)
}
fn resume_failure_hint(state_dir: &Path) -> String {
let hint = format!(
"resume failed — next steps:\n \
* if plan-ID mismatch: either `shipper clean` and start a fresh plan, \
or pass `--force-resume` if you understand the divergence\n \
* inspect {dir}/events.jsonl for the authoritative event log\n \
* inspect {dir}/state.json to see what was already published\n \
* run `shipper status` to compare local versions to the registry",
dir = state_dir.display()
);
with_common_blockers(
hint,
&[
"state mismatch: compare the current plan with the saved `plan_id` before forcing",
"corrupt state: preserve `events.jsonl`, then rebuild or clean state intentionally",
"stale lock: verify no other release process owns the lock before forcing",
"ambiguous state: inspect `reconciliation.json` and let resume reconcile registry truth",
],
)
}
fn plan_failure_hint(manifest_path: &Path, packages: &[String], command_name: &str) -> String {
let mut hint = format!(
"failed to load release plan for `{command_name}` - next steps:\n \
* verify `--manifest-path` points at the workspace Cargo.toml: {}\n \
* run `cargo metadata --manifest-path \"{}\"` to inspect the underlying Cargo error",
manifest_path.display(),
manifest_path.display()
);
if packages.is_empty() {
hint.push_str("\n * run `shipper plan` first to inspect publishable and skipped crates");
} else {
hint.push_str(
"\n * run `shipper plan` without `--package` to list publishable crates\n \
* verify each selected `--package` is publishable and not marked `publish = false`",
);
}
with_common_blockers(
hint,
&[
"missing manifest: pass `--manifest-path <workspace>/Cargo.toml`",
"selected package not publishable: check `publish = false` and package spelling",
"Cargo metadata failure: run the printed `cargo metadata` command directly",
],
)
}
fn with_common_blockers(mut hint: String, blockers: &[&str]) -> String {
if blockers.is_empty() {
return hint;
}
hint.push_str("\n Common blockers to check:");
for blocker in blockers {
hint.push_str("\n * ");
hint.push_str(blocker);
}
hint
}
fn command_name_for_hint(command: &Commands) -> &'static str {
match command {
Commands::Plan => "plan",
Commands::Preflight { .. } => "preflight",
Commands::Publish => "publish",
Commands::Resume => "resume",
Commands::Rehearse => "rehearse",
Commands::Status { .. } => "status",
Commands::Doctor => "doctor",
Commands::InspectEvents { .. } => "inspect-events",
Commands::InspectReceipt => "inspect-receipt",
Commands::Ci(_) => "ci",
Commands::Clean { .. } => "clean",
Commands::Yank { .. } => "yank",
Commands::PlanYank { .. } => "plan-yank",
Commands::FixForward { .. } => "fix-forward",
Commands::Remediate { .. } => "remediate",
Commands::Config(_) => "config",
Commands::Completion { .. } => "completion",
}
}
fn parse_duration(s: &str) -> Result<Duration> {
shipper_duration::parse_duration(s).with_context(|| format!("invalid duration: {s}"))
}
fn parse_policy(s: &str) -> Result<shipper_core::config::PublishPolicy> {
match s.to_lowercase().as_str() {
"safe" => Ok(shipper_core::config::PublishPolicy::Safe),
"balanced" => Ok(shipper_core::config::PublishPolicy::Balanced),
"fast" => Ok(shipper_core::config::PublishPolicy::Fast),
_ => bail!("invalid policy: {s} (expected: safe, balanced, fast)"),
}
}
fn parse_verify_mode(s: &str) -> Result<shipper_core::config::VerifyMode> {
match s.to_lowercase().as_str() {
"workspace" => Ok(shipper_core::config::VerifyMode::Workspace),
"package" => Ok(shipper_core::config::VerifyMode::Package),
"none" => Ok(shipper_core::config::VerifyMode::None),
_ => bail!("invalid verify-mode: {s} (expected: workspace, package, none)"),
}
}
fn parse_readiness_method(s: &str) -> Result<shipper_core::config::ReadinessMethod> {
match s.to_lowercase().as_str() {
"api" => Ok(shipper_core::config::ReadinessMethod::Api),
"index" => Ok(shipper_core::config::ReadinessMethod::Index),
"both" => Ok(shipper_core::config::ReadinessMethod::Both),
_ => bail!("invalid readiness-method: {s} (expected: api, index, both)"),
}
}
fn parse_retry_strategy(s: &str) -> Result<shipper_core::retry::RetryStrategyType> {
match s.to_lowercase().as_str() {
"immediate" => Ok(shipper_core::retry::RetryStrategyType::Immediate),
"exponential" => Ok(shipper_core::retry::RetryStrategyType::Exponential),
"linear" => Ok(shipper_core::retry::RetryStrategyType::Linear),
"constant" => Ok(shipper_core::retry::RetryStrategyType::Constant),
_ => bail!(
"invalid retry-strategy: {s} (expected: immediate, exponential, linear, constant)"
),
}
}
fn print_version(verbose: bool) {
println!("shipper {}", env!("CARGO_PKG_VERSION"));
if verbose {
println!("{RICH_VERSION_DETAILS}");
}
}
#[derive(Debug, Serialize)]
struct PlanReport {
schema_version: &'static str,
plan_id: String,
registry: PlanRegistryReport,
workspace_root: String,
publishable_count: usize,
skipped_count: usize,
internal_dependency_edges: usize,
publish_levels: usize,
artifacts: Vec<PlanArtifactReport>,
packages: Vec<PlanPackageReport>,
skipped: Vec<PlanSkippedPackageReport>,
}
#[derive(Debug, Serialize)]
struct PlanRegistryReport {
name: String,
api_base: String,
#[serde(skip_serializing_if = "Option::is_none")]
index_base: Option<String>,
}
#[derive(Debug, Serialize)]
struct PlanPackageReport {
order: usize,
name: String,
version: String,
manifest_path: String,
level: Option<usize>,
dependencies: Vec<String>,
order_reason: String,
}
#[derive(Debug, Serialize)]
struct PlanSkippedPackageReport {
name: String,
version: String,
reason: String,
}
#[derive(Debug, Serialize)]
struct PlanArtifactReport {
kind: &'static str,
path: String,
description: &'static str,
}
#[derive(Debug, Serialize)]
struct PreflightJsonReport<'a> {
schema_version: &'static str,
#[serde(flatten)]
report: &'a PreflightReport,
proofs: Vec<PreflightEvidenceItem>,
gaps: Vec<PreflightEvidenceItem>,
failed_checks: Vec<PreflightEvidenceItem>,
live_release_evidence: Vec<PreflightEvidenceItem>,
#[serde(skip_serializing_if = "Option::is_none")]
registry_profile: Option<PreflightRegistryProfileReport>,
artifacts: Vec<PreflightArtifactReport>,
}
#[derive(Debug, Serialize)]
struct PreflightEvidenceItem {
id: &'static str,
status: &'static str,
summary: String,
packages: Vec<String>,
}
#[derive(Debug, Serialize)]
struct PreflightRegistryProfileReport {
name: String,
first_publish_count: usize,
update_count: usize,
minimum_registry_pacing: String,
notes: Vec<String>,
}
#[derive(Debug, Serialize)]
struct PreflightArtifactReport {
kind: &'static str,
path: Option<String>,
description: &'static str,
}
fn print_plan(ws: &plan::PlannedWorkspace, verbose: bool, format: &str) {
if format == "json" {
let report = build_plan_report(ws);
let json = serde_json::to_string_pretty(&report).expect("serialize plan report");
println!("{}", json);
return;
}
println!("plan_id: {}", ws.plan.plan_id);
println!(
"registry: {} ({})",
ws.plan.registry.name, ws.plan.registry.api_base
);
println!("workspace_root: {}", ws.workspace_root.display());
println!();
let total_packages = ws.plan.packages.len();
println!("Total packages to publish: {}", total_packages);
println!("Plan summary:");
println!(" Publishable packages: {}", total_packages);
println!(" Skipped packages: {}", ws.skipped.len());
println!(
" Internal dependency edges: {}",
internal_dependency_edges(&ws.plan)
);
println!(" Publish levels: {}", ws.plan.group_by_levels().len());
println!(" Plan artifact: .shipper/plan.txt (`shipper plan --format json` capture)");
println!();
if !ws.skipped.is_empty() {
println!("Skipped packages:");
for p in &ws.skipped {
println!(" - {}@{} ({})", p.name, p.version, p.reason);
}
println!();
}
if verbose {
print_detailed_plan(ws);
} else {
for (idx, p) in ws.plan.packages.iter().enumerate() {
println!(
"{:>3}. {}@{} ({})",
idx + 1,
p.name,
p.version,
dependency_summary(&ws.plan, p)
);
}
}
}
fn build_plan_report(ws: &plan::PlannedWorkspace) -> PlanReport {
let levels = ws.plan.group_by_levels();
let packages = ws
.plan
.packages
.iter()
.enumerate()
.map(|(idx, package)| {
let dependencies = dependency_names(&ws.plan, package);
let level = levels
.iter()
.find(|level| {
level
.packages
.iter()
.any(|level_pkg| level_pkg.name == package.name)
})
.map(|level| level.level);
PlanPackageReport {
order: idx + 1,
name: package.name.clone(),
version: package.version.clone(),
manifest_path: package.manifest_path.display().to_string(),
level,
dependencies,
order_reason: dependency_summary(&ws.plan, package),
}
})
.collect();
let skipped = ws
.skipped
.iter()
.map(|package| PlanSkippedPackageReport {
name: package.name.clone(),
version: package.version.clone(),
reason: package.reason.clone(),
})
.collect();
PlanReport {
schema_version: "shipper.plan.v1",
plan_id: ws.plan.plan_id.clone(),
registry: PlanRegistryReport {
name: ws.plan.registry.name.clone(),
api_base: ws.plan.registry.api_base.clone(),
index_base: ws.plan.registry.index_base.clone(),
},
workspace_root: ws.workspace_root.display().to_string(),
publishable_count: ws.plan.packages.len(),
skipped_count: ws.skipped.len(),
internal_dependency_edges: internal_dependency_edges(&ws.plan),
publish_levels: levels.len(),
artifacts: vec![plan_artifact_report()],
packages,
skipped,
}
}
fn plan_artifact_report() -> PlanArtifactReport {
PlanArtifactReport {
kind: "plan_json_stdout",
path: ".shipper/plan.txt".to_string(),
description: "Recommended CI capture path for `shipper plan --format json`.",
}
}
fn internal_dependency_edges(plan: &ReleasePlan) -> usize {
plan.dependencies.values().map(Vec::len).sum()
}
fn dependency_summary(plan: &ReleasePlan, package: &PlannedPackage) -> String {
let dependencies = dependency_names(plan, package);
if dependencies.is_empty() {
"no workspace dependencies".to_string()
} else {
format!("depends on: {}", dependencies.join(", "))
}
}
fn dependency_names(plan: &ReleasePlan, package: &PlannedPackage) -> Vec<String> {
plan.dependencies
.get(&package.name)
.map(|dependencies| {
dependencies
.iter()
.filter_map(|dependency| {
plan.packages
.iter()
.find(|candidate| candidate.name == *dependency)
.map(|candidate| format!("{}@{}", candidate.name, candidate.version))
})
.collect()
})
.unwrap_or_default()
}
fn print_detailed_plan(ws: &plan::PlannedWorkspace) {
let levels = ws.plan.group_by_levels();
let total_levels = levels.len();
println!("=== Dependency Analysis ===");
println!();
println!("Publishing Levels (packages at same level can be published in parallel):");
println!();
for level in &levels {
let level_pkgs: Vec<String> = level
.packages
.iter()
.map(|p| format!("{}@{}", p.name, p.version))
.collect();
println!(" Level {}: {}", level.level, level_pkgs.join(", "));
}
println!();
println!("Dependency Graph:");
println!();
for (idx, p) in ws.plan.packages.iter().enumerate() {
println!(
" {:>3}. {}@{} ({})",
idx + 1,
p.name,
p.version,
dependency_summary(&ws.plan, p)
);
}
println!();
println!("=== Preflight Considerations ===");
println!();
let mut issues: Vec<String> = Vec::new();
for p in &ws.plan.packages {
#[allow(clippy::collapsible_if)]
if let Some(deps) = ws.plan.dependencies.get(&p.name) {
if deps.len() > 3 {
issues.push(format!(
" - {}@{} has {} dependencies (may require longer publish time)",
p.name,
p.version,
deps.len()
));
}
}
}
let mut dependents_count: std::collections::HashMap<&str, usize> =
std::collections::HashMap::new();
for deps in ws.plan.dependencies.values() {
for dep in deps {
*dependents_count.entry(dep.as_str()).or_insert(0) += 1;
}
}
for (name, count) in &dependents_count {
#[allow(clippy::collapsible_if)]
if *count > 3 {
if let Some(pkg) = ws.plan.packages.iter().find(|p| p.name == *name) {
issues.push(format!(
" - {}@{} is a core dependency for {} packages (critical path)",
pkg.name, pkg.version, count
));
}
}
}
if issues.is_empty() {
println!(" No obvious issues detected.");
println!(" All packages have reasonable dependency structures.");
} else {
for issue in &issues {
println!("{}", issue);
}
}
println!();
println!("=== Estimated Publishing Analysis ===");
println!();
let max_parallel = levels.iter().map(|l| l.packages.len()).max().unwrap_or(0);
println!(
" Parallel publishing: {}",
if max_parallel > 1 {
"enabled"
} else {
"sequential"
}
);
println!(" Max concurrent packages: {}", max_parallel);
println!(" Total publish levels: {}", total_levels);
let total_packages = ws.plan.packages.len();
let estimated_sequential_secs = total_packages * 30;
let estimated_parallel_secs = levels.iter().map(|_l| 30).sum::<usize>();
println!(
" Estimated time (sequential): ~{}s ({:.1}min)",
estimated_sequential_secs,
estimated_sequential_secs as f64 / 60.0
);
println!(
" Estimated time (parallel): ~{}s ({:.1}min)",
estimated_parallel_secs,
estimated_parallel_secs as f64 / 60.0
);
println!();
println!("=== Full Publish Order ===");
println!();
for (idx, p) in ws.plan.packages.iter().enumerate() {
let level = levels
.iter()
.find(|l| l.packages.iter().any(|lp| lp.name == p.name));
let level_str = level
.map(|l| format!("[Level {}]", l.level))
.unwrap_or_else(|| "[?]".to_string());
println!(" {:>3}. {} {} @{}", idx + 1, level_str, p.name, p.version);
}
}
fn print_preflight(rep: &PreflightReport, format: &str) {
match format {
"json" => {
let report = build_preflight_json_report(rep);
let json = serde_json::to_string_pretty(&report).expect("serialize preflight report");
println!("{}", json);
}
_ => {
println!("Preflight Report");
println!("===============");
println!();
println!("Plan ID: {}", rep.plan_id);
println!("Timestamp: {}", rep.timestamp.format("%Y-%m-%dT%H:%M:%SZ"));
println!();
println!(
"Token Detected: {}",
if rep.token_detected { "✓" } else { "✗" }
);
println!();
let (finishability_color, finishability_text) = match rep.finishability {
Finishability::Proven => ("\x1b[32m", "PROVEN"),
Finishability::NotProven => ("\x1b[33m", "NOT PROVEN"),
Finishability::Failed => ("\x1b[31m", "FAILED"),
};
println!(
"Finishability: {}{}",
finishability_color, finishability_text
);
println!();
println!("Packages:");
println!(
"┌─────────────────────┬─────────┬──────────┬──────────┬───────────────┬─────────────┬─────────────┐"
);
println!(
"│ Package │ Version │ Published│ New Crate │ Auth Type │ Ownership │ Dry-run │"
);
println!(
"├─────────────────────┼─────────┼──────────┼──────────┼───────────────┼─────────────┼─────────────┤"
);
for p in &rep.packages {
let published = if p.already_published { "Yes" } else { "No" };
let new_crate = if p.is_new_crate { "Yes" } else { "No" };
let auth_type = match p.auth_type {
Some(shipper_core::types::AuthType::Token) => "Token",
Some(shipper_core::types::AuthType::TrustedPublishing) => "Trusted",
Some(shipper_core::types::AuthType::Unknown) => "Unknown",
None => "-",
};
let ownership = if p.ownership_verified { "✓" } else { "✗" };
let dry_run = if p.dry_run_passed { "✓" } else { "✗" };
println!(
"│ {:<19} │ {:<7} │ {:<8} │ {:<8} │ {:<13} │ {:<11} │ {:<11} │",
p.name, p.version, published, new_crate, auth_type, ownership, dry_run
);
}
println!(
"└─────────────────────┴─────────┴──────────┴──────────┴───────────────┴─────────────┴─────────────┘"
);
println!();
let failed_packages: Vec<_> = rep
.packages
.iter()
.filter(|p| !p.dry_run_passed && p.dry_run_output.is_some())
.collect();
if !failed_packages.is_empty() {
println!("Dry-run Failures:");
println!("-----------------");
for p in failed_packages {
println!("Package: {}@{}", p.name, p.version);
println!("{}", p.dry_run_output.as_ref().unwrap());
println!();
}
} else if rep.finishability == Finishability::Failed && rep.dry_run_output.is_some() {
println!("Workspace Dry-run Failure:");
println!("--------------------------");
println!("{}", rep.dry_run_output.as_ref().unwrap());
println!();
}
let total = rep.packages.len();
let already_published = rep.packages.iter().filter(|p| p.already_published).count();
let new_crates = rep.packages.iter().filter(|p| p.is_new_crate).count();
let ownership_verified = rep.packages.iter().filter(|p| p.ownership_verified).count();
let dry_run_passed = rep.packages.iter().filter(|p| p.dry_run_passed).count();
println!("Summary:");
println!(" Total packages: {}", total);
println!(" Already published: {}", already_published);
println!(" New crates: {}", new_crates);
println!(" Ownership verified: {}", ownership_verified);
println!(" Dry-run passed: {}", dry_run_passed);
if let Some(estimate) = &rep.estimated_publish_duration {
println!(
" Estimated registry pacing: at least {}",
humantime::format_duration(estimate.minimum_registry_pacing)
);
println!(
" profile={} first_publish={} updates={}",
estimate.registry_profile, estimate.first_publish_count, estimate.update_count
);
}
println!();
print_preflight_proof_explanation(rep, total, dry_run_passed);
println!("What to do next:");
println!("-----------------");
match rep.finishability {
Finishability::Proven => {
println!(
"\x1b[32m✓ All local preflight checks passed. Next: shipper publish\x1b[0m"
);
}
Finishability::NotProven => {
println!(
"\x1b[33m⚠ Preflight did not prove every release prerequisite.\x1b[0m"
);
println!(
" - configure registry auth or Trusted Publishing if ownership is unverified"
);
println!(" - rerun `shipper preflight`");
println!(
" - if you accept the uncertainty, run `shipper publish` with an explicit policy choice"
);
}
Finishability::Failed => {
println!(
"\x1b[31m✗ Preflight failed. Fix the failed checks above, then rerun `shipper preflight`.\x1b[0m"
);
}
}
}
}
}
fn build_preflight_json_report(rep: &PreflightReport) -> PreflightJsonReport<'_> {
let total = rep.packages.len();
let dry_run_passed = rep.packages.iter().filter(|p| p.dry_run_passed).count();
let dry_run_failed = rep
.packages
.iter()
.filter(|p| !p.dry_run_passed)
.collect::<Vec<_>>();
let ownership_unverified = rep
.packages
.iter()
.filter(|p| !p.ownership_verified)
.collect::<Vec<_>>();
let mut proofs = Vec::new();
if dry_run_failed.is_empty() {
proofs.push(PreflightEvidenceItem {
id: "local_dry_run",
status: "passed",
summary: format!(
"Local package dry-run passed for {} of {} {}.",
dry_run_passed,
total,
package_noun(total)
),
packages: rep.packages.iter().map(package_ref).collect(),
});
} else if dry_run_passed > 0 {
proofs.push(PreflightEvidenceItem {
id: "local_dry_run_partial",
status: "partial",
summary: format!(
"Local package dry-run passed for {} of {} {}.",
dry_run_passed,
total,
package_noun(total)
),
packages: rep
.packages
.iter()
.filter(|p| p.dry_run_passed)
.map(package_ref)
.collect(),
});
}
proofs.push(PreflightEvidenceItem {
id: "registry_version_checks",
status: "completed",
summary: format!(
"Registry version/new-crate checks completed for {} {}.",
total,
package_noun(total)
),
packages: rep.packages.iter().map(package_ref).collect(),
});
if let Some(estimate) = &rep.estimated_publish_duration {
proofs.push(PreflightEvidenceItem {
id: "registry_pacing_estimate",
status: "completed",
summary: format!(
"Registry pacing estimate generated from the {} profile.",
estimate.registry_profile
),
packages: Vec::new(),
});
}
let mut gaps = Vec::new();
if !ownership_unverified.is_empty() {
gaps.push(PreflightEvidenceItem {
id: "ownership_unverified",
status: "not_proven",
summary: format!(
"Ownership was not verified for {} of {} {}.",
ownership_unverified.len(),
total,
package_noun(total)
),
packages: ownership_unverified
.iter()
.copied()
.map(package_ref)
.collect(),
});
}
if let Some(gap) = preflight_auth_gap(rep) {
gaps.push(gap);
}
let failed_checks = dry_run_failed
.iter()
.copied()
.map(|package| PreflightEvidenceItem {
id: "local_dry_run",
status: "failed",
summary: format!("Dry-run failed for {}.", package_ref(package)),
packages: vec![package_ref(package)],
})
.collect();
let live_release_evidence = vec![PreflightEvidenceItem {
id: "registry_acceptance_visibility",
status: "pending_publish",
summary:
"Registry acceptance and post-publish visibility are recorded during publish/resume."
.to_string(),
packages: rep.packages.iter().map(package_ref).collect(),
}];
PreflightJsonReport {
schema_version: "shipper.preflight.v1",
report: rep,
proofs,
gaps,
failed_checks,
live_release_evidence,
registry_profile: rep.estimated_publish_duration.as_ref().map(|estimate| {
PreflightRegistryProfileReport {
name: estimate.registry_profile.clone(),
first_publish_count: estimate.first_publish_count,
update_count: estimate.update_count,
minimum_registry_pacing: humantime::format_duration(
estimate.minimum_registry_pacing,
)
.to_string(),
notes: estimate.notes.clone(),
}
}),
artifacts: vec![PreflightArtifactReport {
kind: "preflight_json_stdout",
path: None,
description: "This JSON document is the preflight evidence artifact when captured by CI.",
}],
}
}
fn print_preflight_proof_explanation(rep: &PreflightReport, total: usize, dry_run_passed: usize) {
let dry_run_failed = rep
.packages
.iter()
.filter(|package| !package.dry_run_passed)
.collect::<Vec<_>>();
let ownership_unverified = rep
.packages
.iter()
.filter(|package| !package.ownership_verified)
.collect::<Vec<_>>();
println!("Proof explanation:");
println!(" Proven now:");
println!(
" - local package dry-run passed for {} of {} {}.",
dry_run_passed,
total,
package_noun(total)
);
println!(
" - registry version/new-crate checks completed for {} {}.",
total,
package_noun(total)
);
if let Some(estimate) = &rep.estimated_publish_duration {
println!(
" - registry pacing estimate generated from the {} profile.",
estimate.registry_profile
);
}
println!(" Proof gaps:");
if ownership_unverified.is_empty() {
println!(" - none from local preflight.");
} else {
println!(
" - ownership was not verified for {} of {} {}: {}.",
ownership_unverified.len(),
total,
package_noun(total),
package_refs(ownership_unverified.iter().copied())
);
}
if let Some(gap) = preflight_auth_gap(rep) {
println!(" - {}", evidence_bullet(&gap.summary));
}
println!(" Failed checks:");
if dry_run_failed.is_empty() {
println!(" - none.");
} else {
println!(
" - dry-run failed for {} of {} {}: {}.",
dry_run_failed.len(),
total,
package_noun(total),
package_refs(dry_run_failed.iter().copied())
);
}
println!(" Live-release evidence:");
println!(
" - registry acceptance and post-publish visibility are recorded during publish/resume."
);
println!();
}
fn package_refs<'a>(packages: impl Iterator<Item = &'a PreflightPackage>) -> String {
packages.map(package_ref).collect::<Vec<_>>().join(", ")
}
fn package_ref(package: &PreflightPackage) -> String {
format!("{}@{}", package.name, package.version)
}
fn evidence_bullet(summary: &str) -> String {
let mut chars = summary.chars();
let Some(first) = chars.next() else {
return String::new();
};
let mut bullet = String::new();
bullet.push(first.to_ascii_lowercase());
bullet.extend(chars);
bullet
}
fn preflight_auth_gap(rep: &PreflightReport) -> Option<PreflightEvidenceItem> {
if rep.token_detected {
return None;
}
let has_trusted_context = rep.packages.iter().any(|package| {
matches!(
package.auth_type,
Some(shipper_core::types::AuthType::TrustedPublishing)
)
});
let has_partial_trusted_context = rep.packages.iter().any(|package| {
matches!(
package.auth_type,
Some(shipper_core::types::AuthType::Unknown)
)
});
if has_trusted_context {
Some(PreflightEvidenceItem {
id: "trusted_publishing_token_not_minted",
status: "not_proven",
summary: "Trusted Publishing OIDC context was detected, but no short-lived registry token was minted into Cargo auth before preflight.".to_string(),
packages: Vec::new(),
})
} else if has_partial_trusted_context {
Some(PreflightEvidenceItem {
id: "trusted_publishing_oidc_incomplete",
status: "not_proven",
summary: "Trusted Publishing OIDC environment is incomplete; both GitHub OIDC request variables are required before a registry token can be minted.".to_string(),
packages: Vec::new(),
})
} else {
Some(PreflightEvidenceItem {
id: "registry_auth_missing",
status: "not_proven",
summary: "No registry token or Trusted Publishing context was detected.".to_string(),
packages: Vec::new(),
})
}
}
fn package_noun(count: usize) -> &'static str {
if count == 1 { "package" } else { "packages" }
}
#[derive(Serialize)]
struct PublishJsonReport<'a> {
schema_version: &'static str,
command: &'static str,
registry: String,
plan_id: &'a str,
state_dir: String,
published: usize,
pending: usize,
failed: usize,
ambiguous: usize,
uploaded: usize,
skipped: usize,
packages: Vec<CommandJsonPackageReport>,
artifacts: CommandJsonArtifacts,
receipt: &'a shipper_core::types::Receipt,
}
#[derive(Serialize)]
struct ResumeJsonReport<'a> {
schema_version: &'static str,
command: &'static str,
safe_to_resume: bool,
registry: String,
plan_id: &'a str,
state_dir: String,
published: usize,
pending: usize,
failed: usize,
ambiguous: usize,
uploaded: usize,
skipped: usize,
next_package: Option<String>,
packages: Vec<CommandJsonPackageReport>,
artifacts: CommandJsonArtifacts,
receipt: &'a shipper_core::types::Receipt,
}
struct CommandJsonPackageCounts {
published: usize,
pending: usize,
failed: usize,
ambiguous: usize,
uploaded: usize,
skipped: usize,
next_package: Option<String>,
}
#[derive(Serialize)]
struct PlanYankJsonReport<'a> {
schema_version: &'static str,
command: &'static str,
#[serde(flatten)]
plan: &'a shipper_core::engine::plan_yank::YankPlan,
}
#[derive(Serialize)]
struct FixForwardJsonReport<'a> {
schema_version: &'static str,
command: &'static str,
#[serde(flatten)]
plan: &'a shipper_core::engine::fix_forward::FixForwardPlan,
}
#[derive(Serialize)]
struct CommandJsonPackageReport {
name: String,
version: String,
state: &'static str,
attempts: u32,
reconciled: bool,
}
#[derive(Serialize)]
struct CommandJsonArtifacts {
state: CommandJsonArtifact,
events: CommandJsonArtifact,
receipt: CommandJsonArtifact,
reconciliation: CommandJsonArtifact,
}
#[derive(Serialize)]
struct CommandJsonArtifact {
path: String,
exists: bool,
}
fn print_publish_output(
receipt: &shipper_core::types::Receipt,
workspace_root: &Path,
state_dir: &Path,
format: &str,
) -> Result<()> {
if format == "json" {
let report = build_publish_json_report(receipt, state_dir)?;
let json = serde_json::to_string_pretty(&report)
.context("failed to serialize publish JSON envelope")?;
println!("{}", json);
return Ok(());
}
print_receipt(receipt, workspace_root, state_dir, format);
Ok(())
}
fn print_resume_output(
receipt: &shipper_core::types::Receipt,
workspace_root: &Path,
state_dir: &Path,
format: &str,
) -> Result<()> {
if format == "json" {
let report = build_resume_json_report(receipt, state_dir)?;
let json = serde_json::to_string_pretty(&report)
.context("failed to serialize resume JSON envelope")?;
println!("{}", json);
return Ok(());
}
print_receipt(receipt, workspace_root, state_dir, format);
Ok(())
}
fn build_publish_json_report<'a>(
receipt: &'a shipper_core::types::Receipt,
state_dir: &Path,
) -> Result<PublishJsonReport<'a>> {
let reconciled = reconciled_packages(state_dir)?;
let packages = command_package_reports(receipt, &reconciled);
let counts = command_package_counts(receipt);
Ok(PublishJsonReport {
schema_version: "shipper.publish.v1",
command: "publish",
registry: receipt.registry.name.clone(),
plan_id: &receipt.plan_id,
state_dir: state_dir.display().to_string(),
published: counts.published,
pending: counts.pending,
failed: counts.failed,
ambiguous: counts.ambiguous,
uploaded: counts.uploaded,
skipped: counts.skipped,
packages,
artifacts: command_json_artifacts(state_dir),
receipt,
})
}
fn build_resume_json_report<'a>(
receipt: &'a shipper_core::types::Receipt,
state_dir: &Path,
) -> Result<ResumeJsonReport<'a>> {
let reconciled = reconciled_packages(state_dir)?;
let packages = command_package_reports(receipt, &reconciled);
let counts = command_package_counts(receipt);
let safe_to_resume = counts.failed == 0 && counts.ambiguous == 0;
Ok(ResumeJsonReport {
schema_version: "shipper.resume.v1",
command: "resume",
safe_to_resume,
registry: receipt.registry.name.clone(),
plan_id: &receipt.plan_id,
state_dir: state_dir.display().to_string(),
published: counts.published,
pending: counts.pending,
failed: counts.failed,
ambiguous: counts.ambiguous,
uploaded: counts.uploaded,
skipped: counts.skipped,
next_package: counts.next_package,
packages,
artifacts: command_json_artifacts(state_dir),
receipt,
})
}
fn command_package_counts(receipt: &shipper_core::types::Receipt) -> CommandJsonPackageCounts {
let mut counts = CommandJsonPackageCounts {
published: 0,
pending: 0,
failed: 0,
ambiguous: 0,
uploaded: 0,
skipped: 0,
next_package: None,
};
for package in &receipt.packages {
match &package.state {
PackageState::Pending => {
counts.pending += 1;
counts
.next_package
.get_or_insert_with(|| package.name.clone());
}
PackageState::Uploaded => {
counts.uploaded += 1;
counts
.next_package
.get_or_insert_with(|| package.name.clone());
}
PackageState::Published => {
counts.published += 1;
}
PackageState::Skipped { .. } => {
counts.skipped += 1;
}
PackageState::Failed { .. } => {
counts.failed += 1;
counts
.next_package
.get_or_insert_with(|| package.name.clone());
}
PackageState::Ambiguous { .. } => {
counts.ambiguous += 1;
counts
.next_package
.get_or_insert_with(|| package.name.clone());
}
}
}
counts
}
fn command_package_reports(
receipt: &shipper_core::types::Receipt,
reconciled: &BTreeSet<(String, String)>,
) -> Vec<CommandJsonPackageReport> {
receipt
.packages
.iter()
.map(|package| CommandJsonPackageReport {
name: package.name.clone(),
version: package.version.clone(),
state: package_state_name(&package.state),
attempts: package.attempts,
reconciled: reconciled.contains(&(package.name.clone(), package.version.clone())),
})
.collect()
}
fn command_json_artifacts(state_dir: &Path) -> CommandJsonArtifacts {
CommandJsonArtifacts {
state: json_artifact(state_dir.join(shipper_core::state::execution_state::STATE_FILE)),
events: json_artifact(state_dir.join(shipper_core::state::events::EVENTS_FILE)),
receipt: json_artifact(state_dir.join(shipper_core::state::execution_state::RECEIPT_FILE)),
reconciliation: json_artifact(
state_dir.join(shipper_core::state::execution_state::RECONCILIATION_FILE),
),
}
}
fn json_artifact(path: PathBuf) -> CommandJsonArtifact {
CommandJsonArtifact {
exists: path.exists(),
path: path.display().to_string(),
}
}
fn reconciled_packages(state_dir: &Path) -> Result<BTreeSet<(String, String)>> {
let path = shipper_core::state::execution_state::reconciliation_path(state_dir);
if !path.exists() {
return Ok(BTreeSet::new());
}
let raw = std::fs::read_to_string(&path)
.with_context(|| format!("failed to read reconciliation report {}", path.display()))?;
let report: shipper_core::types::ReconciliationReport = serde_json::from_str(&raw)
.with_context(|| format!("failed to parse reconciliation report {}", path.display()))?;
Ok(report
.records
.into_iter()
.map(|record| (record.name, record.version))
.collect())
}
fn package_state_name(state: &PackageState) -> &'static str {
match state {
PackageState::Pending => "pending",
PackageState::Uploaded => "uploaded",
PackageState::Published => "published",
PackageState::Skipped { .. } => "skipped",
PackageState::Failed { .. } => "failed",
PackageState::Ambiguous { .. } => "ambiguous",
}
}
fn print_receipt(
receipt: &shipper_core::types::Receipt,
workspace_root: &Path,
state_dir: &Path,
format: &str,
) {
match format {
"json" => {
let json = serde_json::to_string_pretty(receipt).expect("serialize receipt");
println!("{}", json);
}
_ => {
println!("plan_id: {}", receipt.plan_id);
println!(
"registry: {} ({})",
receipt.registry.name, receipt.registry.api_base
);
let abs_state = if state_dir.is_absolute() {
state_dir.to_path_buf()
} else {
workspace_root.join(state_dir)
};
println!(
"state: {}/{}",
abs_state.display(),
shipper_core::state::execution_state::STATE_FILE
);
println!(
"receipt: {}/{}",
abs_state.display(),
shipper_core::state::execution_state::RECEIPT_FILE
);
println!(
"events: {}/{}",
abs_state.display(),
shipper_core::state::events::EVENTS_FILE
);
println!();
for p in &receipt.packages {
println!(
"{}@{}: {:?} (attempts={}, {}ms)",
p.name, p.version, p.state, p.attempts, p.duration_ms
);
if !p.evidence.attempts.is_empty() {
println!(" Evidence:");
for attempt in &p.evidence.attempts {
println!(
" Attempt {}: exit={}, duration={}ms",
attempt.attempt_number,
attempt.exit_code,
attempt.duration.as_millis()
);
if !attempt.stdout_tail.is_empty() {
println!(
" stdout (last {} lines):",
attempt.stdout_tail.lines().count()
);
for line in attempt.stdout_tail.lines().take(5) {
println!(" {}", line);
}
}
if !attempt.stderr_tail.is_empty() {
println!(
" stderr (last {} lines):",
attempt.stderr_tail.lines().count()
);
for line in attempt.stderr_tail.lines().take(5) {
println!(" {}", line);
}
}
}
}
if !p.evidence.readiness_checks.is_empty() {
println!(
" Readiness checks: {} attempts",
p.evidence.readiness_checks.len()
);
for check in &p.evidence.readiness_checks {
println!(
" Poll {}: visible={}, delay_before={}ms",
check.attempt,
check.visible,
check.delay_before.as_millis()
);
}
}
}
}
}
}
fn run_inspect_events(
ws: &plan::PlannedWorkspace,
opts: &RuntimeOptions,
format: &str,
follow: bool,
) -> Result<()> {
let state_dir = if opts.state_dir.is_absolute() {
opts.state_dir.clone()
} else {
ws.workspace_root.join(&opts.state_dir)
};
if follow {
return follow_authoritative_event_log(&state_dir, format);
}
let event_logs = discover_event_logs(&state_dir)?;
if event_logs.is_empty() {
println!("No event logs found under {}", state_dir.display());
return Ok(());
}
for (idx, events_path) in event_logs.iter().enumerate() {
let event_log = shipper_core::state::events::EventLog::read_from_file(events_path)
.with_context(|| format!("failed to read event log from {}", events_path.display()))?;
if format != "json" {
println!("Event log: {}", events_path.display());
println!();
}
for event in event_log.all_events() {
let json = serde_json::to_string(event).expect("serialize event");
println!("{}", json);
}
if format != "json" && idx + 1 != event_logs.len() {
println!();
}
}
Ok(())
}
fn follow_authoritative_event_log(state_dir: &Path, format: &str) -> Result<()> {
let events_path = shipper_core::state::events::events_path(state_dir);
if format != "json" {
println!("Event log: {}", events_path.display());
if !events_path.exists() {
println!("Waiting for events...");
}
println!("Press Ctrl+C to stop.");
println!();
}
let mut offset = 0;
let stdout = std::io::stdout();
let mut out = stdout.lock();
loop {
offset = write_event_lines_since(&events_path, offset, format, &mut out)?;
out.flush().context("failed to flush event output")?;
std::thread::sleep(Duration::from_millis(500));
}
}
fn write_event_lines_since<W: Write>(
events_path: &Path,
offset: u64,
format: &str,
out: &mut W,
) -> Result<u64> {
if !events_path.exists() {
return Ok(offset);
}
let len = std::fs::metadata(events_path)
.with_context(|| format!("failed to stat event log {}", events_path.display()))?
.len();
let mut next_offset = offset.min(len);
let mut file = std::fs::File::open(events_path)
.with_context(|| format!("failed to open event log {}", events_path.display()))?;
file.seek(SeekFrom::Start(next_offset))
.with_context(|| format!("failed to seek event log {}", events_path.display()))?;
let mut reader = BufReader::new(file);
let mut line = String::new();
loop {
line.clear();
let line_start = next_offset;
let read = reader
.read_line(&mut line)
.with_context(|| format!("failed to read event log {}", events_path.display()))?;
if read == 0 {
break;
}
if !line.ends_with('\n') {
next_offset = line_start;
break;
}
next_offset += read as u64;
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.is_empty() {
continue;
}
let event: shipper_core::types::PublishEvent = serde_json::from_str(trimmed)
.with_context(|| format!("failed to parse event JSON from line: {}", trimmed))?;
write_follow_event_line(&event, format, out)?;
}
Ok(next_offset)
}
fn write_follow_event_line<W: Write>(
event: &shipper_core::types::PublishEvent,
format: &str,
out: &mut W,
) -> Result<()> {
if format == "json" {
serde_json::to_writer(&mut *out, event).context("failed to serialize event")?;
out.write_all(b"\n")
.context("failed to write event output")?;
return Ok(());
}
let report = status_watch_event_report(event);
writeln!(
out,
"{} {} {} - {}",
report.timestamp, report.package, report.kind, report.summary
)
.context("failed to write event output")?;
Ok(())
}
fn discover_event_logs(state_dir: &Path) -> Result<Vec<PathBuf>> {
let mut paths = Vec::new();
let authoritative = shipper_core::state::events::events_path(state_dir);
if authoritative.exists() {
paths.push(authoritative);
}
let mut seen = BTreeSet::new();
for path in shipper_core::state::events::preflight_only_events_paths(state_dir)? {
if seen.insert(path.clone()) {
paths.push(path);
}
}
Ok(paths)
}
fn run_inspect_receipt(
ws: &plan::PlannedWorkspace,
opts: &RuntimeOptions,
format: &str,
) -> Result<()> {
let state_dir = if opts.state_dir.is_absolute() {
opts.state_dir.clone()
} else {
ws.workspace_root.join(&opts.state_dir)
};
let receipt_path = shipper_core::state::execution_state::receipt_path(&state_dir);
let content = std::fs::read_to_string(&receipt_path)
.with_context(|| format!("failed to read receipt from {}", receipt_path.display()))?;
let receipt: shipper_core::types::Receipt = serde_json::from_str(&content)
.with_context(|| format!("failed to parse receipt from {}", receipt_path.display()))?;
if format == "json" {
let json = serde_json::to_string_pretty(&receipt).expect("serialize receipt");
println!("{}", json);
return Ok(());
}
println!("Receipt");
println!("=======");
println!();
println!("Plan ID: {}", receipt.plan_id);
println!(
"Registry: {} ({})",
receipt.registry.name, receipt.registry.api_base
);
println!(
"Started: {}",
receipt.started_at.format("%Y-%m-%dT%H:%M:%SZ")
);
println!(
"Finished: {}",
receipt.finished_at.format("%Y-%m-%dT%H:%M:%SZ")
);
println!(
"Duration: {}ms",
(receipt.finished_at - receipt.started_at).num_milliseconds()
);
println!();
if let Some(git) = &receipt.git_context {
println!("Git Context:");
println!("------------");
if let Some(commit) = &git.commit {
println!(" Commit: {}", commit);
}
if let Some(branch) = &git.branch {
println!(" Branch: {}", branch);
}
if let Some(tag) = &git.tag {
println!(" Tag: {}", tag);
}
if let Some(dirty) = git.dirty {
println!(" Dirty: {}", if dirty { "Yes" } else { "No" });
}
println!();
}
println!("Environment:");
println!("------------");
println!(" Shipper: {}", receipt.environment.shipper_version);
if let Some(cargo) = &receipt.environment.cargo_version {
println!(" Cargo: {}", cargo);
}
if let Some(rust) = &receipt.environment.rust_version {
println!(" Rust: {}", rust);
}
println!(" OS: {}", receipt.environment.os);
println!(" Arch: {}", receipt.environment.arch);
println!();
println!("Packages:");
println!("---------");
for p in &receipt.packages {
let state_str = match &p.state {
shipper_core::types::PackageState::Published => "\x1b[32mPublished\x1b[0m",
shipper_core::types::PackageState::Pending => "Pending",
shipper_core::types::PackageState::Uploaded => "\x1b[33mUploaded\x1b[0m",
shipper_core::types::PackageState::Skipped { reason } => {
&format!("Skipped: {}", reason)
}
shipper_core::types::PackageState::Failed { class, message } => {
&format!("\x1b[31mFailed ({:?}): {}\x1b[0m", class, message)
}
shipper_core::types::PackageState::Ambiguous { message } => {
&format!("\x1b[33mAmbiguous: {}\x1b[0m", message)
}
};
println!(
" {}@{}: {} (attempts={}, {}ms)",
p.name, p.version, state_str, p.attempts, p.duration_ms
);
}
Ok(())
}
#[derive(Debug, Serialize)]
struct StatusReport {
schema_version: &'static str,
plan_id: String,
workspace_root: String,
registries: Vec<StatusRegistryReport>,
}
#[derive(Debug, Serialize)]
struct StatusRegistryReport {
name: String,
api_base: String,
#[serde(skip_serializing_if = "Option::is_none")]
index_base: Option<String>,
packages: Vec<StatusPackageReport>,
}
#[derive(Debug, Serialize)]
struct StatusPackageReport {
name: String,
version: String,
status: &'static str,
exists: bool,
}
fn build_status_registry_report(
ws: &plan::PlannedWorkspace,
reporter: &mut dyn Reporter,
) -> Result<StatusRegistryReport> {
reporter.info("initializing registry client...");
let reg = shipper_core::registry::RegistryClient::new(ws.plan.registry.clone())?;
let mut packages = Vec::new();
for p in &ws.plan.packages {
let exists = reg.version_exists(&p.name, &p.version)?;
packages.push(StatusPackageReport {
name: p.name.clone(),
version: p.version.clone(),
status: if exists { "published" } else { "missing" },
exists,
});
}
Ok(StatusRegistryReport {
name: ws.plan.registry.name.clone(),
api_base: ws.plan.registry.api_base.clone(),
index_base: ws.plan.registry.index_base.clone(),
packages,
})
}
fn write_status_report(report: &StatusReport, format: &str) -> Result<()> {
if format == "json" {
let json = serde_json::to_string_pretty(report).context("serialize status report")?;
println!("{json}");
return Ok(());
}
println!("plan_id: {}", report.plan_id);
println!();
let multiple_registries = report.registries.len() > 1;
for (idx, registry) in report.registries.iter().enumerate() {
if multiple_registries {
if idx > 0 {
println!();
}
println!(
"📊 Status for registry: {} ({})",
registry.name, registry.api_base
);
}
for package in ®istry.packages {
println!("{}@{}: {}", package.name, package.version, package.status);
}
}
Ok(())
}
fn run_status_watch(
ws: &plan::PlannedWorkspace,
opts: &RuntimeOptions,
format: &str,
) -> Result<()> {
let state_dir = absolute_state_dir(ws, opts);
let stdout = std::io::stdout();
let mut first = true;
loop {
if !first && format != "json" {
println!();
}
first = false;
let report = build_status_watch_report(ws, &state_dir)?;
{
let mut out = stdout.lock();
write_status_watch_report(&report, format, &mut out)?;
out.flush().context("failed to flush status watch output")?;
}
std::thread::sleep(Duration::from_millis(500));
}
}
fn absolute_state_dir(ws: &plan::PlannedWorkspace, opts: &RuntimeOptions) -> PathBuf {
if opts.state_dir.is_absolute() {
opts.state_dir.clone()
} else {
ws.workspace_root.join(&opts.state_dir)
}
}
#[derive(Debug, Serialize)]
struct StatusWatchReport {
schema_version: &'static str,
plan_id: String,
state_dir: String,
events_path: String,
receipt_path: String,
state_present: bool,
event_count: usize,
counts: StatusWatchCounts,
current_package: Option<String>,
last_event: Option<StatusWatchEventReport>,
next_action: Option<StatusWatchNextAction>,
packages: Vec<StatusWatchPackageReport>,
}
#[derive(Debug, Default, Serialize)]
struct StatusWatchCounts {
total: usize,
pending: usize,
uploaded: usize,
published: usize,
skipped: usize,
failed: usize,
ambiguous: usize,
}
#[derive(Debug, Serialize)]
struct StatusWatchPackageReport {
name: String,
version: String,
state: String,
attempts: u32,
last_updated_at: Option<String>,
}
#[derive(Debug, Serialize)]
struct StatusWatchEventReport {
timestamp: String,
package: String,
kind: &'static str,
summary: String,
}
#[derive(Debug, Serialize)]
struct StatusWatchNextAction {
kind: &'static str,
package: String,
at: String,
delay_ms: u64,
summary: String,
}
fn build_status_watch_report(
ws: &plan::PlannedWorkspace,
state_dir: &Path,
) -> Result<StatusWatchReport> {
let state = shipper_core::state::execution_state::load_state(state_dir)?;
let events_path = shipper_core::state::events::events_path(state_dir);
let receipt_path = shipper_core::state::execution_state::receipt_path(state_dir);
let events = read_status_watch_events(&events_path)
.with_context(|| format!("failed to read event log from {}", events_path.display()))?;
let packages = build_status_watch_packages(ws, state.as_ref());
let counts = status_watch_counts(&packages);
let current_package = current_status_package(&events, state.as_ref(), &packages);
let last_event = events.last().map(status_watch_event_report);
let next_action = latest_status_watch_next_action(&events);
Ok(StatusWatchReport {
schema_version: "shipper.status.watch.v1",
plan_id: ws.plan.plan_id.clone(),
state_dir: state_dir.display().to_string(),
events_path: events_path.display().to_string(),
receipt_path: receipt_path.display().to_string(),
state_present: state.is_some(),
event_count: events.len(),
counts,
current_package,
last_event,
next_action,
packages,
})
}
fn read_status_watch_events(events_path: &Path) -> Result<Vec<PublishEvent>> {
if !events_path.exists() {
return Ok(Vec::new());
}
let content = std::fs::read_to_string(events_path)
.with_context(|| format!("failed to read event log {}", events_path.display()))?;
let lines: Vec<&str> = content.lines().collect();
let has_complete_tail = content.ends_with('\n');
let mut events = Vec::new();
for (idx, line) in lines.iter().enumerate() {
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<PublishEvent>(trimmed) {
Ok(event) => events.push(event),
Err(err) => {
if idx + 1 == lines.len() && !has_complete_tail {
break;
}
return Err(err)
.with_context(|| format!("failed to parse event JSON from line: {}", trimmed));
}
}
}
Ok(events)
}
fn build_status_watch_packages(
ws: &plan::PlannedWorkspace,
state: Option<&ExecutionState>,
) -> Vec<StatusWatchPackageReport> {
ws.plan
.packages
.iter()
.map(|planned| {
let key = pkg_key(&planned.name, &planned.version);
let progress = state
.and_then(|state| state.packages.get(&key))
.or_else(|| state.and_then(|state| state.packages.get(&planned.name)));
StatusWatchPackageReport {
name: planned.name.clone(),
version: planned.version.clone(),
state: progress
.map(|progress| package_state_label(&progress.state).to_string())
.unwrap_or_else(|| "pending".to_string()),
attempts: progress.map(|progress| progress.attempts).unwrap_or(0),
last_updated_at: progress.map(|progress| format_utc(progress.last_updated_at)),
}
})
.collect()
}
fn status_watch_counts(packages: &[StatusWatchPackageReport]) -> StatusWatchCounts {
let mut counts = StatusWatchCounts {
total: packages.len(),
..StatusWatchCounts::default()
};
for package in packages {
match package.state.as_str() {
"pending" => counts.pending += 1,
"uploaded" => counts.uploaded += 1,
"published" => counts.published += 1,
"skipped" => counts.skipped += 1,
"failed" => counts.failed += 1,
"ambiguous" => counts.ambiguous += 1,
_ => {}
}
}
counts
}
fn current_status_package(
events: &[PublishEvent],
state: Option<&ExecutionState>,
packages: &[StatusWatchPackageReport],
) -> Option<String> {
if let Some(state) = state {
for package in &state.packages {
let progress = package.1;
if !matches!(
progress.state,
PackageState::Published
| PackageState::Skipped { .. }
| PackageState::Failed { .. }
) {
return Some(format!("{}@{}", progress.name, progress.version));
}
}
return None;
}
if let Some(event) = latest_active_progress_event(events) {
return Some(event.package.clone());
}
packages
.iter()
.find(|package| {
package.state != "published" && package.state != "skipped" && package.state != "failed"
})
.map(|package| format!("{}@{}", package.name, package.version))
}
fn latest_active_progress_event(events: &[PublishEvent]) -> Option<&PublishEvent> {
for event in events.iter().rev() {
if !event.package.is_empty()
&& event.package != "workspace"
&& event_type_is_active_progress(&event.event_type)
{
return Some(event);
}
if event_type_clears_next_action(&event.event_type) {
return None;
}
}
None
}
fn status_watch_event_report(event: &PublishEvent) -> StatusWatchEventReport {
StatusWatchEventReport {
timestamp: format_utc(event.timestamp),
package: event.package.clone(),
kind: event_type_name(&event.event_type),
summary: summarize_event(event),
}
}
fn latest_status_watch_next_action(events: &[PublishEvent]) -> Option<StatusWatchNextAction> {
for event in events.iter().rev() {
if let Some(action) = status_watch_next_action(event) {
return Some(action);
}
if event_type_clears_next_action(&event.event_type) {
return None;
}
}
None
}
fn status_watch_next_action(event: &PublishEvent) -> Option<StatusWatchNextAction> {
match &event.event_type {
EventType::RetryScheduled {
attempt,
max_attempts,
delay_ms,
next_attempt_at,
reason,
..
}
| EventType::RetryBackoffStarted {
attempt,
max_attempts,
delay_ms,
next_attempt_at,
reason,
..
} => Some(StatusWatchNextAction {
kind: "retry",
package: event.package.clone(),
at: format_utc(*next_attempt_at),
delay_ms: *delay_ms,
summary: format!(
"attempt {}/{} scheduled after {} ({:?})",
attempt + 1,
max_attempts,
format_millis(*delay_ms),
reason
),
}),
EventType::PublishWaiting {
reason,
delay_ms,
until,
} => Some(StatusWatchNextAction {
kind: "wait",
package: event.package.clone(),
at: format_utc(*until),
delay_ms: *delay_ms,
summary: format!("{} for {}", reason, format_millis(*delay_ms)),
}),
EventType::ReadinessPollScheduled {
attempt,
delay_ms,
next_poll_at,
} => Some(StatusWatchNextAction {
kind: "readiness_poll",
package: event.package.clone(),
at: format_utc(*next_poll_at),
delay_ms: *delay_ms,
summary: format!(
"readiness poll {} scheduled after {}",
attempt + 1,
format_millis(*delay_ms)
),
}),
_ => None,
}
}
fn event_type_is_active_progress(event_type: &EventType) -> bool {
matches!(
event_type,
EventType::PackageStarted { .. }
| EventType::PackageAttempted { .. }
| EventType::PackageOutput { .. }
| EventType::PublishWaiting { .. }
| EventType::RateLimitObserved { .. }
| EventType::PublishReconciling { .. }
| EventType::RetryBackoffStarted { .. }
| EventType::RetryScheduled { .. }
| EventType::ReadinessStarted { .. }
| EventType::ReadinessPoll { .. }
| EventType::ReadinessPollScheduled { .. }
)
}
fn event_type_clears_next_action(event_type: &EventType) -> bool {
matches!(
event_type,
EventType::ExecutionFinished { .. }
| EventType::PackageStarted { .. }
| EventType::PackagePublished { .. }
| EventType::PackageFailed { .. }
| EventType::PackageSkipped { .. }
| EventType::PublishReconciled { .. }
| EventType::ReadinessComplete { .. }
| EventType::ReadinessTimeout { .. }
)
}
fn write_status_watch_report<W: Write>(
report: &StatusWatchReport,
format: &str,
out: &mut W,
) -> Result<()> {
if format == "json" {
serde_json::to_writer(&mut *out, report).context("failed to serialize status")?;
out.write_all(b"\n")
.context("failed to write status output")?;
return Ok(());
}
writeln!(out, "Status watch")?;
writeln!(out, "============")?;
writeln!(out, "plan_id: {}", report.plan_id)?;
writeln!(out, "state_dir: {}", report.state_dir)?;
writeln!(
out,
"state: {}",
if report.state_present {
"present"
} else {
"missing"
}
)?;
writeln!(
out,
"events: {} ({} events)",
report.events_path, report.event_count
)?;
writeln!(out, "receipt: {}", report.receipt_path)?;
writeln!(
out,
"progress: published={} pending={} uploaded={} skipped={} failed={} ambiguous={} total={}",
report.counts.published,
report.counts.pending,
report.counts.uploaded,
report.counts.skipped,
report.counts.failed,
report.counts.ambiguous,
report.counts.total
)?;
if let Some(current) = &report.current_package {
writeln!(out, "current: {}", current)?;
} else {
writeln!(out, "current: none")?;
}
if let Some(last_event) = &report.last_event {
writeln!(
out,
"last_event: {} {} {} - {}",
last_event.timestamp, last_event.package, last_event.kind, last_event.summary
)?;
} else {
writeln!(out, "last_event: none")?;
}
if let Some(next_action) = &report.next_action {
writeln!(
out,
"next: {} {} at {} - {}",
next_action.kind, next_action.package, next_action.at, next_action.summary
)?;
} else {
writeln!(out, "next: none scheduled")?;
}
writeln!(out, "packages:")?;
for package in &report.packages {
writeln!(
out,
" {}@{}: {} (attempts={})",
package.name, package.version, package.state, package.attempts
)?;
}
Ok(())
}
fn package_state_label(state: &PackageState) -> &'static str {
match state {
PackageState::Pending => "pending",
PackageState::Uploaded => "uploaded",
PackageState::Published => "published",
PackageState::Skipped { .. } => "skipped",
PackageState::Failed { .. } => "failed",
PackageState::Ambiguous { .. } => "ambiguous",
}
}
fn event_type_name(event_type: &EventType) -> &'static str {
match event_type {
EventType::PlanCreated { .. } => "plan_created",
EventType::ExecutionStarted => "execution_started",
EventType::ExecutionFinished { .. } => "execution_finished",
EventType::AuthEvidenceRecorded { .. } => "auth_evidence_recorded",
EventType::PackageStarted { .. } => "package_started",
EventType::PackageAttempted { .. } => "package_attempted",
EventType::PackageOutput { .. } => "package_output",
EventType::PackagePublished { .. } => "package_published",
EventType::PackageFailed { .. } => "package_failed",
EventType::PackageSkipped { .. } => "package_skipped",
EventType::PublishWaiting { .. } => "publish_waiting",
EventType::RateLimitObserved { .. } => "rate_limit_observed",
EventType::PublishReconciling { .. } => "publish_reconciling",
EventType::PublishReconciled { .. } => "publish_reconciled",
EventType::StateEventDriftDetected { .. } => "state_event_drift_detected",
EventType::PackageYanked { .. } => "package_yanked",
EventType::RehearsalStarted { .. } => "rehearsal_started",
EventType::RehearsalPackagePublished { .. } => "rehearsal_package_published",
EventType::RehearsalPackageFailed { .. } => "rehearsal_package_failed",
EventType::RehearsalComplete { .. } => "rehearsal_complete",
EventType::RehearsalSmokeCheckStarted { .. } => "rehearsal_smoke_check_started",
EventType::RehearsalSmokeCheckSucceeded { .. } => "rehearsal_smoke_check_succeeded",
EventType::RehearsalSmokeCheckFailed { .. } => "rehearsal_smoke_check_failed",
EventType::RetryBackoffStarted { .. } => "retry_backoff_started",
EventType::RetryScheduled { .. } => "retry_scheduled",
EventType::ReadinessStarted { .. } => "readiness_started",
EventType::ReadinessPoll { .. } => "readiness_poll",
EventType::ReadinessPollScheduled { .. } => "readiness_poll_scheduled",
EventType::ReadinessComplete { .. } => "readiness_complete",
EventType::ReadinessTimeout { .. } => "readiness_timeout",
EventType::IndexReadinessStarted { .. } => "index_readiness_started",
EventType::IndexReadinessCheck { .. } => "index_readiness_check",
EventType::IndexReadinessComplete { .. } => "index_readiness_complete",
EventType::PreflightStarted => "preflight_started",
EventType::PreflightWorkspaceVerify { .. } => "preflight_workspace_verify",
EventType::PreflightNewCrateDetected { .. } => "preflight_new_crate_detected",
EventType::PreflightOwnershipCheck { .. } => "preflight_ownership_check",
EventType::PreflightComplete { .. } => "preflight_complete",
}
}
fn summarize_event(event: &PublishEvent) -> String {
match &event.event_type {
EventType::ExecutionStarted => "execution started".to_string(),
EventType::ExecutionFinished { result } => format!("execution finished: {:?}", result),
EventType::PackageStarted { name, version } => {
format!("started {}@{}", name, version)
}
EventType::PackagePublished { duration_ms } => {
format!("published in {}", format_millis(*duration_ms))
}
EventType::PackageFailed { class, message } => format!("failed ({:?}): {}", class, message),
EventType::PackageSkipped { reason } => format!("skipped: {}", reason),
EventType::PublishWaiting {
reason, delay_ms, ..
} => {
format!("waiting for {} ({})", reason, format_millis(*delay_ms))
}
EventType::RateLimitObserved {
retry_after_ms,
message,
..
} => match retry_after_ms {
Some(delay) => format!(
"rate limit observed: {}; retry-after {}",
message,
format_millis(*delay)
),
None => format!("rate limit observed: {}", message),
},
EventType::RetryScheduled {
attempt,
max_attempts,
delay_ms,
reason,
..
} => format!(
"retry attempt {}/{} scheduled after {} ({:?})",
attempt + 1,
max_attempts,
format_millis(*delay_ms),
reason
),
EventType::RetryBackoffStarted {
attempt,
max_attempts,
delay_ms,
reason,
..
} => format!(
"retry backoff before attempt {}/{} for {} ({:?})",
attempt + 1,
max_attempts,
format_millis(*delay_ms),
reason
),
EventType::ReadinessStarted { method } => format!("readiness started: {:?}", method),
EventType::ReadinessPoll { attempt, visible } => {
format!("readiness poll {} visible={}", attempt, visible)
}
EventType::ReadinessPollScheduled {
attempt, delay_ms, ..
} => format!(
"readiness poll {} scheduled after {}",
attempt + 1,
format_millis(*delay_ms)
),
EventType::ReadinessComplete {
duration_ms,
attempts,
} => format!(
"readiness complete after {} checks in {}",
attempts,
format_millis(*duration_ms)
),
EventType::ReadinessTimeout { max_wait_ms } => {
format!("readiness timed out after {}", format_millis(*max_wait_ms))
}
EventType::PublishReconciling { method } => {
format!("reconciling publish outcome via {:?}", method)
}
EventType::PublishReconciled { outcome } => {
format!("reconciled publish outcome: {:?}", outcome)
}
other => event_type_name(other).replace('_', " "),
}
}
fn format_utc(value: chrono::DateTime<chrono::Utc>) -> String {
value.format("%Y-%m-%dT%H:%M:%SZ").to_string()
}
fn format_millis(ms: u64) -> String {
humantime::format_duration(Duration::from_millis(ms)).to_string()
}
fn run_ci(ci_cmd: CiCommands, state_dir: &Path, workspace_root: &Path) -> Result<()> {
let abs_state = if state_dir.is_absolute() {
state_dir.to_path_buf()
} else {
workspace_root.join(state_dir)
};
match ci_cmd {
CiCommands::GitHubActions => {
println!("# GitHub Actions workflow snippet for Shipper");
println!("# Add these steps to your workflow file");
println!();
println!("# Restore Shipper State (cache for faster restores)");
println!("- name: Restore Shipper State");
println!(" uses: actions/cache@v3");
println!(" with:");
println!(" path: {}/", abs_state.display());
println!(" key: shipper-${{{{ github.sha }}}}");
println!(" restore-keys: |");
println!(" shipper-");
println!();
println!("# Restore Shipper State (artifact for resumability)");
println!("- name: Restore Shipper State Artifact");
println!(" uses: actions/download-artifact@v4");
println!(" with:");
println!(" name: shipper-state");
println!(" path: {}/", abs_state.display());
println!(" continue-on-error: true");
println!();
println!("# Run shipper publish (will resume if state exists)");
println!("- name: Publish Crates");
println!(" run: shipper publish --quiet");
println!(" env:");
println!(" CARGO_REGISTRY_TOKEN: ${{{{ secrets.CARGO_REGISTRY_TOKEN }}}}");
println!();
println!("# Save Shipper State (even if publish fails)");
println!("- name: Save Shipper State");
println!(" if: always()");
println!(" uses: actions/upload-artifact@v3");
println!(" with:");
println!(" name: shipper-state");
println!(" path: {}/", abs_state.display());
}
CiCommands::GitLab => {
println!("# GitLab CI snippet for Shipper");
println!("# Add this to your .gitlab-ci.yml");
println!();
println!("publish:");
println!(" image: rust:latest");
println!(" stage: publish");
println!(" cache:");
println!(" key: ${{CI_COMMIT_REF_SLUG}}");
println!(" paths:");
println!(" - {}/", abs_state.display());
println!(" - target/");
println!(" script:");
println!(" - cargo install shipper --locked");
println!(" - shipper publish --quiet");
println!(" variables:");
println!(" CARGO_TERM_COLOR: \"always\"");
println!(" # Configure this in GitLab CI/CD settings (masked, protected)");
println!(" # CARGO_REGISTRY_TOKEN: \"...\"");
println!(" artifacts:");
println!(" paths:");
println!(" - {}/", abs_state.display());
println!(" expire_in: 1 day");
println!(" when: always");
}
CiCommands::CircleCI => {
println!("# CircleCI config snippet for Shipper");
println!("# Add this to your .circleci/config.yml");
println!();
println!("version: 2.1");
println!();
println!("jobs:");
println!(" publish:");
println!(" docker:");
println!(" - image: cimg/rust:latest");
println!(" steps:");
println!(" - checkout");
println!(" - restore_cache:");
println!(" keys:");
println!(" - shipper-state-{{{{ .Branch }}}}-{{{{ .Revision }}}}");
println!(" - shipper-state-{{{{ .Branch }}}}");
println!(" - shipper-state-");
println!(" - run:");
println!(" name: Install Shipper");
println!(" command: cargo install shipper --locked");
println!(" - run:");
println!(" name: Publish Crates");
println!(" command: shipper publish --quiet");
println!(" environment:");
println!(" CARGO_REGISTRY_TOKEN: ${{{{ CARGO_REGISTRY_TOKEN }}}}");
println!(" - save_cache:");
println!(" key: shipper-state-{{{{ .Branch }}}}-{{{{ .Revision }}}}");
println!(" paths:");
println!(" - {}", abs_state.display());
println!(" - store_artifacts:");
println!(" path: {}", abs_state.display());
println!(" destination: shipper-state");
println!();
println!("workflows:");
println!(" version: 2");
println!(" publish:");
println!(" jobs:");
println!(" - publish:");
println!(" filters:");
println!(" branches:");
println!(" only: main");
println!(" context: cargo-registry");
}
CiCommands::AzureDevOps => {
println!("# Azure DevOps pipeline snippet for Shipper");
println!("# Add this to your azure-pipelines.yml");
println!();
println!("trigger:");
println!(" - main");
println!();
println!("pool:");
println!(" vmImage: 'ubuntu-latest'");
println!();
println!("variables:");
println!(" CARGO_HOME: $(Pipeline.Workspace)/.cargo");
println!();
println!("steps:");
println!(" - task: Cache@2");
println!(" displayName: 'Cache Cargo and Shipper State'");
println!(" inputs:");
println!(" key: 'shipper | \"$(Agent.OS)\" | \"$(Build.SourceVersion)\"'");
println!(" restoreKeys: |");
println!(" shipper | \"$(Agent.OS)\"");
println!(" shipper");
println!(" path: $(CARGO_HOME)");
println!(" cacheHitVar: CACHE_RESTORED");
println!();
println!(" - script: cargo install shipper --locked");
println!(" displayName: 'Install Shipper'");
println!();
println!(" - script: shipper publish --quiet");
println!(" displayName: 'Publish Crates'");
println!(" env:");
println!(" CARGO_REGISTRY_TOKEN: $(CARGO_REGISTRY_TOKEN)");
println!();
println!(" - publish: {}", abs_state.display());
println!(" displayName: 'Publish Shipper State Artifact'");
println!(" condition: succeededOrFailed()");
println!(" artifact: 'shipper-state'");
}
}
Ok(())
}
fn run_clean(
state_dir: &PathBuf,
workspace_root: &Path,
keep_receipt: bool,
force: bool,
) -> Result<()> {
let abs_state = if state_dir.is_absolute() {
state_dir.clone()
} else {
workspace_root.join(state_dir)
};
if !abs_state.exists() {
println!("State directory does not exist: {}", abs_state.display());
return Ok(());
}
let mut dirs_to_clean = vec![abs_state.clone()];
if let Ok(entries) = std::fs::read_dir(&abs_state) {
for entry in entries.flatten() {
if let Ok(file_type) = entry.file_type()
&& file_type.is_dir()
&& entry.file_name() != "cache"
{
dirs_to_clean.push(entry.path());
}
}
}
for dir in dirs_to_clean {
clean_single_dir(&dir, workspace_root, keep_receipt, force)?;
}
println!("Clean complete");
Ok(())
}
fn clean_single_dir(
dir: &Path,
workspace_root: &Path,
keep_receipt: bool,
force: bool,
) -> Result<()> {
let state_path = dir.join(shipper_core::state::execution_state::STATE_FILE);
let receipt_path = dir.join(shipper_core::state::execution_state::RECEIPT_FILE);
let reconciliation_path = dir.join(shipper_core::state::execution_state::RECONCILIATION_FILE);
let lock_path = shipper_core::lock::lock_path(dir, Some(workspace_root));
if lock_path.exists() {
if force {
eprintln!(
"[warn] --force specified; removing lock file: {}",
lock_path.display()
);
std::fs::remove_file(&lock_path)
.with_context(|| format!("failed to remove lock file {}", lock_path.display()))?;
} else {
match shipper_core::lock::LockFile::read_lock_info(dir, Some(workspace_root)) {
Ok(lock_info) => {
eprintln!("[warn] Active lock found in {}:", dir.display());
eprintln!("[warn] PID: {}", lock_info.pid);
eprintln!("[warn] Hostname: {}", lock_info.hostname);
eprintln!("[warn] Acquired at: {}", lock_info.acquired_at);
eprintln!("[warn] Plan ID: {:?}", lock_info.plan_id);
}
Err(err) => {
eprintln!(
"[warn] Active lock found in {} but metadata could not be read: {err:#}",
dir.display()
);
}
}
eprintln!("[warn] Use --force to override the lock");
bail!("cannot clean: active lock exists in {}", dir.display());
}
}
if state_path.exists() {
std::fs::remove_file(&state_path)
.with_context(|| format!("failed to remove state file {}", state_path.display()))?;
println!("Removed: {}", state_path.display());
}
for events_path in discover_event_logs(dir)? {
if events_path.exists() {
std::fs::remove_file(&events_path).with_context(|| {
format!("failed to remove events file {}", events_path.display())
})?;
println!("Removed: {}", events_path.display());
}
}
if !keep_receipt && receipt_path.exists() {
std::fs::remove_file(&receipt_path)
.with_context(|| format!("failed to remove receipt file {}", receipt_path.display()))?;
println!("Removed: {}", receipt_path.display());
} else if keep_receipt && receipt_path.exists() {
println!(
"Kept: {} (--keep-receipt specified)",
receipt_path.display()
);
}
if !keep_receipt && reconciliation_path.exists() {
std::fs::remove_file(&reconciliation_path).with_context(|| {
format!(
"failed to remove reconciliation file {}",
reconciliation_path.display()
)
})?;
println!("Removed: {}", reconciliation_path.display());
} else if keep_receipt && reconciliation_path.exists() {
println!(
"Kept: {} (--keep-receipt specified)",
reconciliation_path.display()
);
}
let cache_dir = dir.join("cache");
if cache_dir.exists() {
std::fs::remove_dir_all(&cache_dir)
.with_context(|| format!("failed to remove cache directory {}", cache_dir.display()))?;
println!("Removed: {}", cache_dir.display());
}
Ok(())
}
fn run_config(cmd: ConfigCommands) -> Result<()> {
match cmd {
ConfigCommands::Init { output } => {
let template = ShipperConfig::default_toml_template();
std::fs::write(&output, template)
.with_context(|| format!("Failed to write config file to {}", output.display()))?;
println!("Created configuration file: {}", output.display());
println!();
println!("Edit the file to customize shipper settings for your workspace.");
println!("Run `shipper config validate` to check the configuration.");
}
ConfigCommands::Validate { path } => {
if !path.exists() {
bail!("Config file not found: {}", path.display());
}
let config = ShipperConfig::load_from_file(&path)
.with_context(|| format!("Failed to load config file: {}", path.display()))?;
config.validate().with_context(|| {
format!("Configuration validation failed for {}", path.display())
})?;
println!("Configuration file is valid: {}", path.display());
}
}
Ok(())
}
fn run_completion(shell: &Shell) -> Result<()> {
clap_complete::generate(
*shell,
&mut Cli::command(),
"shipper",
&mut std::io::stdout(),
);
Ok(())
}
#[cfg(test)]
mod tests {
use std::fs;
use chrono::Utc;
use serial_test::serial;
use tempfile::tempdir;
use super::*;
#[derive(Default)]
struct TestReporter {
infos: Vec<String>,
warns: Vec<String>,
errors: Vec<String>,
}
impl Reporter for TestReporter {
fn info(&mut self, msg: &str) {
self.infos.push(msg.to_string());
}
fn warn(&mut self, msg: &str) {
self.warns.push(msg.to_string());
}
fn error(&mut self, msg: &str) {
self.errors.push(msg.to_string());
}
}
#[test]
fn parse_duration_handles_valid_and_invalid_inputs() {
assert!(parse_duration("1s").is_ok());
assert!(parse_duration("nope").is_err());
}
#[test]
fn global_flags_parse_after_subcommand() {
let cli = Cli::try_parse_from([
"shipper",
"preflight",
"--allow-dirty",
"--strict-ownership",
"--verify-mode",
"package",
"--policy",
"safe",
"--format",
"json",
])
.expect("parse CLI");
assert!(matches!(
cli.cmd,
Some(Commands::Preflight {
preflight_only: false
})
));
assert!(cli.allow_dirty);
assert!(cli.strict_ownership);
assert_eq!(cli.verify_mode.as_deref(), Some("package"));
assert_eq!(cli.policy.as_deref(), Some("safe"));
assert_eq!(cli.format, "json");
}
#[test]
fn preflight_only_flag_parses_and_defaults_to_false() {
let cli = Cli::try_parse_from(["shipper", "preflight", "--preflight-only"])
.expect("parse with flag");
match cli.cmd {
Some(Commands::Preflight { preflight_only }) => assert!(preflight_only),
other => panic!("expected Preflight, got {other:?}"),
}
let cli = Cli::try_parse_from(["shipper", "preflight"]).expect("parse without flag");
match cli.cmd {
Some(Commands::Preflight { preflight_only }) => {
assert!(
!preflight_only,
"preflight_only must default to false for back-compat"
);
}
other => panic!("expected Preflight, got {other:?}"),
}
Cli::try_parse_from(["shipper", "publish", "--preflight-only"])
.expect_err("must reject --preflight-only on publish");
}
#[test]
fn status_watch_flag_parses() {
let cli = Cli::try_parse_from(["shipper", "status", "--watch"]).expect("parse status");
match cli.cmd {
Some(Commands::Status { watch }) => assert!(watch),
other => panic!("expected Status, got {other:?}"),
}
}
#[test]
fn cli_reporter_methods_are_callable() {
let mut rep = CliReporter::new(false);
rep.info("info");
rep.warn("warn");
rep.error("error");
}
#[test]
fn cli_reporter_retry_wait_without_progress_blocks_for_delay() {
use std::time::Instant;
let mut rep = CliReporter::new(true); let delay = Duration::from_millis(60);
let start = Instant::now();
rep.retry_wait(
"pkg",
"0.1.0",
1,
3,
delay,
shipper_core::types::ErrorClass::Retryable,
"rate limited",
);
assert!(
start.elapsed() >= delay,
"retry_wait returned early: {:?}",
start.elapsed()
);
}
#[test]
fn cli_reporter_retry_wait_without_progress_warns_and_blocks_for_delay() {
use std::time::Instant;
let mut rep = CliReporter::new(false);
let delay = Duration::from_millis(40);
let start = Instant::now();
rep.retry_wait(
"pkg",
"0.1.0",
1,
3,
delay,
shipper_core::types::ErrorClass::Retryable,
"rate limited",
);
assert!(start.elapsed() >= delay);
}
#[test]
fn cli_reporter_retry_wait_with_progress_routes_through_countdown() {
use std::time::Instant;
let mut rep = CliReporter::new(false);
rep.install_progress(
crate::output::progress::ProgressReporter::silent(2),
BTreeMap::from([(String::from("pkg@1.0.0"), 2usize)]),
);
let delay = Duration::from_millis(40);
let start = Instant::now();
rep.retry_wait(
"pkg",
"1.0.0",
2,
5,
delay,
shipper_core::types::ErrorClass::Retryable,
"server busy",
);
assert!(start.elapsed() >= delay);
assert!(rep.take_progress().is_some());
}
#[test]
fn cli_reporter_retry_wait_updates_progress_to_retrying_package() {
let mut rep = CliReporter::new(true);
rep.install_progress(
crate::output::progress::ProgressReporter::silent(3),
BTreeMap::from([(String::from("beta@0.2.0"), 2usize)]),
);
rep.retry_wait(
"beta",
"0.2.0",
1,
3,
Duration::from_millis(1),
shipper_core::types::ErrorClass::Retryable,
"server busy",
);
let progress = rep.take_progress().expect("progress handle");
assert_eq!(progress.current_package(), 2);
assert_eq!(progress.current_name(), "beta@0.2.0");
}
#[test]
fn cli_reporter_default_impl_preserves_warn_line() {
let mut tr = TestReporter::default();
tr.retry_wait(
"foo",
"1.2.3",
1,
5,
Duration::from_millis(1),
shipper_core::types::ErrorClass::Retryable,
"transient failure",
);
assert_eq!(tr.warns.len(), 1);
let w = &tr.warns[0];
assert!(w.contains("foo@1.2.3"));
assert!(w.contains("transient failure"));
assert!(w.contains("Retryable"));
assert!(w.contains("attempt 2/5"));
}
#[test]
fn preflight_failure_hint_names_common_release_blockers() {
let hint = preflight_failure_hint(Path::new(".shipper"));
for expected in [
"missing token/auth",
"dirty git",
"version already exists",
"ownership failure",
"registry unreachable",
] {
assert!(hint.contains(expected), "missing `{expected}` in:\n{hint}");
}
}
#[test]
fn publish_failure_hint_names_ambiguity_rate_limit_and_lock_blockers() {
let hint = publish_failure_hint(Path::new(".shipper"));
for expected in [
"ambiguous publish",
"rate limit or Retry-After",
"version already exists",
"stale lock",
"auth/network failure",
] {
assert!(hint.contains(expected), "missing `{expected}` in:\n{hint}");
}
}
#[test]
fn resume_failure_hint_names_state_and_reconciliation_blockers() {
let hint = resume_failure_hint(Path::new(".shipper"));
for expected in [
"state mismatch",
"corrupt state",
"stale lock",
"ambiguous state",
] {
assert!(hint.contains(expected), "missing `{expected}` in:\n{hint}");
}
}
#[test]
fn plan_failure_hint_names_manifest_and_package_blockers() {
let hint = plan_failure_hint(
Path::new("missing/Cargo.toml"),
&[String::from("demo")],
"preflight",
);
for expected in [
"missing manifest",
"selected package not publishable",
"Cargo metadata failure",
] {
assert!(hint.contains(expected), "missing `{expected}` in:\n{hint}");
}
}
#[test]
fn print_cmd_version_reports_missing_command() {
let mut reporter = TestReporter::default();
doctor::print_cmd_version("definitely-not-a-real-command-shipper", &mut reporter);
assert!(reporter.warns.iter().any(|w| w.contains("unable to run")));
}
#[test]
#[serial]
fn print_cmd_version_reports_non_zero_exit() {
let td = tempdir().expect("tempdir");
let bin_dir = td.path().join("bin");
fs::create_dir_all(&bin_dir).expect("mkdir");
#[cfg(windows)]
let cmd_path = {
let p = bin_dir.join("badver.cmd");
fs::write(
&p,
"@echo off\r\necho bad version error 1>&2\r\nexit /b 1\r\n",
)
.expect("write");
p
};
#[cfg(not(windows))]
let cmd_path = {
use std::os::unix::fs::PermissionsExt;
let p = bin_dir.join("badver");
fs::write(
&p,
"#!/usr/bin/env sh\necho bad version error >&2\nexit 1\n",
)
.expect("write");
let mut perms = fs::metadata(&p).expect("meta").permissions();
perms.set_mode(0o755);
fs::set_permissions(&p, perms).expect("chmod");
p
};
let mut reporter = TestReporter::default();
doctor::print_cmd_version(cmd_path.to_str().expect("utf8"), &mut reporter);
assert!(
reporter
.warns
.iter()
.any(|w| w.contains("--version failed"))
);
}
#[test]
fn test_reporter_collects_all_levels() {
let mut reporter = TestReporter::default();
reporter.info("i");
reporter.warn("w");
reporter.error("e");
assert_eq!(reporter.infos, vec!["i".to_string()]);
assert_eq!(reporter.warns, vec!["w".to_string()]);
assert_eq!(reporter.errors, vec!["e".to_string()]);
}
#[test]
fn status_watch_report_summarizes_state_and_scheduled_events() {
let td = tempdir().expect("tempdir");
let state_dir = td.path().join(".shipper");
let now = Utc::now();
let ws = plan::PlannedWorkspace {
workspace_root: td.path().to_path_buf(),
plan: ReleasePlan {
plan_version: "shipper.plan.v1".to_string(),
plan_id: "plan-watch".to_string(),
created_at: now,
registry: Registry::crates_io(),
packages: vec![
PlannedPackage {
name: "alpha".to_string(),
version: "0.1.0".to_string(),
manifest_path: td.path().join("alpha/Cargo.toml"),
regime: None,
},
PlannedPackage {
name: "beta".to_string(),
version: "0.2.0".to_string(),
manifest_path: td.path().join("beta/Cargo.toml"),
regime: None,
},
],
dependencies: BTreeMap::new(),
},
skipped: vec![],
};
let state = ExecutionState {
state_version: "shipper.state.v1".to_string(),
plan_id: "plan-watch".to_string(),
registry: Registry::crates_io(),
created_at: now,
updated_at: now,
attempt_history: Vec::new(),
packages: BTreeMap::from([
(
"alpha@0.1.0".to_string(),
shipper_core::types::PackageProgress {
name: "alpha".to_string(),
version: "0.1.0".to_string(),
attempts: 1,
state: PackageState::Published,
last_updated_at: now,
},
),
(
"beta@0.2.0".to_string(),
shipper_core::types::PackageProgress {
name: "beta".to_string(),
version: "0.2.0".to_string(),
attempts: 1,
state: PackageState::Uploaded,
last_updated_at: now,
},
),
]),
};
shipper_core::state::execution_state::save_state(&state_dir, &state).expect("save state");
let next_poll_at = now + chrono::Duration::seconds(5);
let mut event_log = shipper_core::state::events::EventLog::new();
event_log.record(PublishEvent {
timestamp: now,
package: "beta@0.2.0".to_string(),
event_type: EventType::ReadinessPollScheduled {
attempt: 1,
delay_ms: 5_000,
next_poll_at,
},
});
event_log
.write_to_file(&shipper_core::state::events::events_path(&state_dir))
.expect("write events");
let report = build_status_watch_report(&ws, &state_dir).expect("report");
assert_eq!(report.schema_version, "shipper.status.watch.v1");
assert_eq!(report.counts.published, 1);
assert_eq!(report.counts.uploaded, 1);
assert_eq!(report.current_package.as_deref(), Some("beta@0.2.0"));
assert_eq!(
report.next_action.as_ref().map(|action| action.kind),
Some("readiness_poll")
);
let mut rendered = Vec::new();
write_status_watch_report(&report, "text", &mut rendered).expect("render");
let rendered = String::from_utf8(rendered).expect("utf8");
assert!(rendered.contains("Status watch"));
assert!(rendered.contains("progress: published=1 pending=0 uploaded=1"));
assert!(rendered.contains("next: readiness_poll beta@0.2.0"));
let mut rendered_json = Vec::new();
write_status_watch_report(&report, "json", &mut rendered_json).expect("render JSON");
let rendered_json = String::from_utf8(rendered_json).expect("utf8");
let json: serde_json::Value =
serde_json::from_str(&rendered_json).expect("status watch JSON");
assert_eq!(
json.pointer("/schema_version")
.and_then(serde_json::Value::as_str),
Some("shipper.status.watch.v1")
);
}
#[test]
fn status_watch_next_action_ignores_stale_schedules_after_terminal_event() {
let now = Utc::now();
let scheduled = PublishEvent {
timestamp: now,
package: "beta@0.2.0".to_string(),
event_type: EventType::RetryScheduled {
attempt: 1,
max_attempts: 3,
delay_ms: 5_000,
next_attempt_at: now + chrono::Duration::seconds(5),
reason: shipper_core::types::ErrorClass::Retryable,
message: "rate limited".to_string(),
},
};
assert!(latest_status_watch_next_action(std::slice::from_ref(&scheduled)).is_some());
let published = PublishEvent {
timestamp: now,
package: "beta@0.2.0".to_string(),
event_type: EventType::PackagePublished { duration_ms: 10 },
};
let events = vec![scheduled, published];
assert!(latest_status_watch_next_action(&events).is_none());
}
#[test]
fn status_watch_current_package_ignores_stale_active_events_after_terminal_event() {
let now = Utc::now();
let events = vec![
PublishEvent {
timestamp: now,
package: "beta@0.2.0".to_string(),
event_type: EventType::PackageStarted {
name: "beta".to_string(),
version: "0.2.0".to_string(),
},
},
PublishEvent {
timestamp: now,
package: "beta@0.2.0".to_string(),
event_type: EventType::PackagePublished { duration_ms: 10 },
},
];
let packages = vec![StatusWatchPackageReport {
name: "beta".to_string(),
version: "0.2.0".to_string(),
state: "published".to_string(),
attempts: 1,
last_updated_at: Some(format_utc(now)),
}];
assert_eq!(current_status_package(&events, None, &packages), None);
}
#[test]
fn status_watch_event_reader_ignores_incomplete_tail_line() {
let td = tempdir().expect("tempdir");
let events_path = td.path().join("events.jsonl");
let event = PublishEvent {
timestamp: Utc::now(),
package: "beta@0.2.0".to_string(),
event_type: EventType::PackageStarted {
name: "beta".to_string(),
version: "0.2.0".to_string(),
},
};
let mut content = serde_json::to_string(&event).expect("serialize event");
content.push('\n');
content.push_str("{\"type\":\"package_started\"");
fs::write(&events_path, content).expect("write events");
let events = read_status_watch_events(&events_path).expect("read events");
assert_eq!(events.len(), 1);
}
#[test]
#[serial]
fn run_doctor_supports_absolute_state_dir() {
let td = tempdir().expect("tempdir");
let ws = plan::PlannedWorkspace {
workspace_root: td.path().to_path_buf(),
plan: shipper_core::types::ReleasePlan {
plan_version: "1".to_string(),
plan_id: "plan-x".to_string(),
created_at: chrono::Utc::now(),
registry: Registry::crates_io(),
packages: vec![],
dependencies: std::collections::BTreeMap::new(),
},
skipped: vec![],
};
let state_dir = td.path().join("abs-state");
let opts = RuntimeOptions {
allow_dirty: true,
skip_ownership_check: true,
strict_ownership: false,
no_verify: false,
max_attempts: 1,
base_delay: Duration::from_millis(0),
max_delay: Duration::from_millis(0),
retry_strategy: shipper_core::retry::RetryStrategyType::Exponential,
retry_jitter: 0.5,
retry_per_error: shipper_core::retry::PerErrorConfig::default(),
verify_timeout: Duration::from_millis(0),
verify_poll_interval: Duration::from_millis(0),
state_dir: state_dir.clone(),
force_resume: false,
force: false,
lock_timeout: Duration::from_secs(3600),
policy: shipper_core::types::PublishPolicy::Safe,
verify_mode: shipper_core::types::VerifyMode::Workspace,
readiness: shipper_core::types::ReadinessConfig::default(),
output_lines: 50,
parallel: shipper_core::types::ParallelConfig::default(),
webhook: shipper_core::webhook::WebhookConfig::default(),
encryption: shipper_core::encryption::EncryptionConfig::default(),
registries: vec![],
resume_from: None,
rehearsal_registry: None,
rehearsal_skip: false,
rehearsal_smoke_install: None,
};
fs::create_dir_all(td.path().join("cargo-home")).expect("mkdir");
temp_env::with_vars(
[
("CARGO_REGISTRY_TOKEN", None::<String>),
("CARGO_REGISTRIES_CRATES_IO_TOKEN", None::<String>),
(
"CARGO_HOME",
Some(
td.path()
.join("cargo-home")
.to_str()
.expect("utf8")
.to_string(),
),
),
],
|| {
let mut reporter = TestReporter::default();
doctor::run(&ws, &opts, &mut reporter).expect("doctor");
},
);
}
#[test]
#[serial]
fn run_doctor_restores_env_when_old_values_are_missing_or_present() {
let td = tempdir().expect("tempdir");
let ws = plan::PlannedWorkspace {
workspace_root: td.path().to_path_buf(),
plan: shipper_core::types::ReleasePlan {
plan_version: "1".to_string(),
plan_id: "plan-y".to_string(),
created_at: chrono::Utc::now(),
registry: Registry::crates_io(),
packages: vec![],
dependencies: std::collections::BTreeMap::new(),
},
skipped: vec![],
};
let opts = RuntimeOptions {
allow_dirty: true,
skip_ownership_check: true,
strict_ownership: false,
no_verify: false,
max_attempts: 1,
base_delay: Duration::from_millis(0),
max_delay: Duration::from_millis(0),
retry_strategy: shipper_core::retry::RetryStrategyType::Exponential,
retry_jitter: 0.5,
retry_per_error: shipper_core::retry::PerErrorConfig::default(),
verify_timeout: Duration::from_millis(0),
verify_poll_interval: Duration::from_millis(0),
state_dir: td.path().join("abs-state-2"),
force_resume: false,
force: false,
lock_timeout: Duration::from_secs(3600),
policy: shipper_core::types::PublishPolicy::Safe,
verify_mode: shipper_core::types::VerifyMode::Workspace,
readiness: shipper_core::types::ReadinessConfig::default(),
output_lines: 50,
parallel: shipper_core::types::ParallelConfig::default(),
webhook: shipper_core::webhook::WebhookConfig::default(),
encryption: shipper_core::encryption::EncryptionConfig::default(),
registries: vec![],
resume_from: None,
rehearsal_registry: None,
rehearsal_skip: false,
rehearsal_smoke_install: None,
};
fs::create_dir_all(td.path().join("cargo-home")).expect("mkdir");
temp_env::with_vars(
[
("CARGO_REGISTRY_TOKEN", None::<String>),
("CARGO_REGISTRIES_CRATES_IO_TOKEN", None::<String>),
(
"CARGO_HOME",
Some(
td.path()
.join("cargo-home")
.to_str()
.expect("utf8")
.to_string(),
),
),
],
|| {
let mut reporter = TestReporter::default();
doctor::run(&ws, &opts, &mut reporter).expect("doctor");
},
);
}
#[test]
fn config_init_creates_file() {
let td = tempdir().expect("tempdir");
let config_path = td.path().join("test-config.toml");
run_config(ConfigCommands::Init {
output: config_path.clone(),
})
.expect("config init should succeed");
assert!(config_path.exists(), "config file should be created");
let content = fs::read_to_string(&config_path).expect("read config file");
assert!(
content.contains("[policy]"),
"config should contain [policy] section"
);
assert!(
content.contains("[readiness]"),
"config should contain [readiness] section"
);
}
#[test]
fn config_validate_valid_file() {
let td = tempdir().expect("tempdir");
let config_path = td.path().join("test-config.toml");
let valid_config = r#"
[policy]
mode = "safe"
[verify]
mode = "workspace"
[readiness]
enabled = true
method = "api"
initial_delay = "1s"
max_delay = "60s"
max_total_wait = "5m"
poll_interval = "2s"
jitter_factor = 0.5
[output]
lines = 50
[retry]
max_attempts = 6
base_delay = "2s"
max_delay = "2m"
[lock]
timeout = "1h"
"#;
fs::write(&config_path, valid_config).expect("write config file");
run_config(ConfigCommands::Validate {
path: config_path.clone(),
})
.expect("config validate should succeed for valid file");
}
#[test]
fn config_validate_invalid_file() {
let td = tempdir().expect("tempdir");
let config_path = td.path().join("test-config.toml");
let invalid_config = r#"
[output]
lines = 0
"#;
fs::write(&config_path, invalid_config).expect("write config file");
let result = run_config(ConfigCommands::Validate {
path: config_path.clone(),
});
assert!(
result.is_err(),
"config validate should fail for invalid file"
);
let err = result.unwrap_err().to_string();
assert!(
err.contains("output.lines must be greater than 0")
|| err.contains("Configuration validation failed"),
"error should mention output.lines or validation failed"
);
}
#[test]
fn config_validate_missing_file() {
let td = tempdir().expect("tempdir");
let config_path = td.path().join("nonexistent-config.toml");
let result = run_config(ConfigCommands::Validate {
path: config_path.clone(),
});
assert!(
result.is_err(),
"config validate should fail for missing file"
);
let err = result.unwrap_err().to_string();
assert!(
err.contains("not found") || err.contains("Config file not found"),
"error should mention file not found"
);
}
#[test]
fn config_load_from_workspace() {
let td = tempdir().expect("tempdir");
let workspace_root = td.path();
let result = ShipperConfig::load_from_workspace(workspace_root);
assert!(
result.is_ok(),
"load should succeed even without config file"
);
assert!(
result.unwrap().is_none(),
"should return None when no config exists"
);
let config_path = workspace_root.join(".shipper.toml");
let valid_config = r#"
[policy]
mode = "fast"
"#;
fs::write(&config_path, valid_config).expect("write config file");
let result = ShipperConfig::load_from_workspace(workspace_root);
assert!(result.is_ok(), "load should succeed");
let config = result.unwrap();
assert!(config.is_some(), "should return Some when config exists");
assert_eq!(
config.unwrap().policy.mode,
shipper_core::config::PublishPolicy::Fast
);
}
#[test]
fn config_merge_with_cli_overrides() {
let config = ShipperConfig {
schema_version: "shipper.config.v1".to_string(),
policy: shipper_core::config::PolicyConfig {
mode: shipper_core::config::PublishPolicy::Safe,
},
verify: shipper_core::config::VerifyConfig {
mode: shipper_core::config::VerifyMode::Workspace,
},
readiness: shipper_core::config::ReadinessConfig::default(),
output: shipper_core::config::OutputConfig { lines: 100 },
lock: shipper_core::config::LockConfig {
timeout: Duration::from_secs(1800),
},
flags: shipper_core::config::FlagsConfig {
allow_dirty: false,
skip_ownership_check: false,
strict_ownership: false,
},
retry: shipper_core::config::RetryConfig {
policy: shipper_core::retry::RetryPolicy::Custom,
max_attempts: 10,
base_delay: Duration::from_secs(5),
max_delay: Duration::from_secs(300),
strategy: shipper_core::retry::RetryStrategyType::Exponential,
jitter: 0.5,
per_error: shipper_core::retry::PerErrorConfig::default(),
},
state_dir: None,
registry: None,
registries: shipper_core::config::MultiRegistryConfig::default(),
parallel: shipper_core::config::ParallelConfig::default(),
webhook: shipper_core::config::WebhookConfig::default(),
encryption: shipper_core::config::EncryptionConfigInner::default(),
storage: shipper_core::config::StorageConfigInner::default(),
rehearsal: shipper_core::config::RehearsalConfig::default(),
};
let cli = CliOverrides {
allow_dirty: true,
max_attempts: Some(3),
output_lines: Some(50),
policy: Some(shipper_core::config::PublishPolicy::Fast),
verify_mode: Some(shipper_core::config::VerifyMode::None),
..Default::default()
};
let merged: RuntimeOptions = config.build_runtime_options(cli);
assert!(merged.allow_dirty, "CLI allow_dirty should win");
assert_eq!(merged.max_attempts, 3, "CLI max_attempts should win");
assert_eq!(merged.output_lines, 50, "CLI output_lines should win");
assert_eq!(
merged.policy,
shipper_core::types::PublishPolicy::Fast,
"CLI policy should win"
);
assert_eq!(
merged.verify_mode,
shipper_core::types::VerifyMode::None,
"CLI verify_mode should win"
);
assert_eq!(
merged.base_delay,
Duration::from_secs(5),
"config base_delay should apply"
);
assert_eq!(
merged.max_delay,
Duration::from_secs(300),
"config max_delay should apply"
);
assert_eq!(
merged.lock_timeout,
Duration::from_secs(1800),
"config lock_timeout should apply"
);
}
#[test]
fn run_clean_errors_when_lock_exists_without_force() {
let td = tempdir().expect("tempdir");
let state_dir = PathBuf::from(".shipper");
let abs_state = td.path().join(&state_dir);
fs::create_dir_all(&abs_state).expect("mkdir");
let lock_info = shipper_core::lock::LockInfo {
pid: 12345,
hostname: "test-host".to_string(),
acquired_at: Utc::now(),
plan_id: Some("plan-123".to_string()),
};
let lock_path = shipper_core::lock::lock_path(&abs_state, Some(td.path()));
fs::write(
&lock_path,
serde_json::to_string(&lock_info).expect("serialize"),
)
.expect("write lock");
let err = run_clean(&state_dir, td.path(), false, false).expect_err("must fail");
assert!(err.to_string().contains("cannot clean: active lock exists"));
assert!(lock_path.exists());
}
#[test]
fn run_clean_force_removes_lock_and_state_files() {
let td = tempdir().expect("tempdir");
let state_dir = PathBuf::from(".shipper");
let abs_state = td.path().join(&state_dir);
fs::create_dir_all(&abs_state).expect("mkdir");
let state_path = abs_state.join(shipper_core::state::execution_state::STATE_FILE);
let receipt_path = abs_state.join(shipper_core::state::execution_state::RECEIPT_FILE);
let reconciliation_path =
abs_state.join(shipper_core::state::execution_state::RECONCILIATION_FILE);
let events_path = abs_state.join(shipper_core::state::events::EVENTS_FILE);
let preflight_only_events_path =
abs_state.join("preflight-only-20260421T010101000000000Z-pid123.events.jsonl");
let lock_path = shipper_core::lock::lock_path(&abs_state, Some(td.path()));
fs::write(&state_path, "{}").expect("write state");
fs::write(&receipt_path, "{}").expect("write receipt");
fs::write(&reconciliation_path, "{}").expect("write reconciliation");
fs::write(&events_path, "{}").expect("write events");
fs::write(&preflight_only_events_path, "{}").expect("write preflight-only events");
let lock_info = shipper_core::lock::LockInfo {
pid: 12345,
hostname: "test-host".to_string(),
acquired_at: Utc::now(),
plan_id: Some("plan-123".to_string()),
};
fs::write(
&lock_path,
serde_json::to_string(&lock_info).expect("serialize"),
)
.expect("write lock");
run_clean(&state_dir, td.path(), false, true).expect("clean with force");
assert!(!state_path.exists(), "state file should be removed");
assert!(!receipt_path.exists(), "receipt file should be removed");
assert!(
!reconciliation_path.exists(),
"reconciliation file should be removed"
);
assert!(!events_path.exists(), "events file should be removed");
assert!(
!preflight_only_events_path.exists(),
"preflight-only sidecar should be removed"
);
assert!(!lock_path.exists(), "lock file should be removed");
}
#[test]
fn write_event_lines_since_streams_only_new_events() {
let td = tempdir().expect("tempdir");
let events_path = td.path().join("events.jsonl");
fs::write(
&events_path,
concat!(
r#"{"timestamp":"2025-01-01T00:00:00Z","event_type":{"type":"plan_created","plan_id":"abc123","package_count":1},"package":"all"}"#,
"\n",
),
)
.expect("write first event");
let mut out = Vec::new();
let offset =
write_event_lines_since(&events_path, 0, "json", &mut out).expect("read first");
let first = String::from_utf8(out).expect("utf8");
assert!(first.contains(r#""type":"plan_created""#));
assert_eq!(first.lines().count(), 1);
fs::OpenOptions::new()
.append(true)
.open(&events_path)
.expect("open append")
.write_all(
concat!(
r#"{"timestamp":"2025-01-01T00:00:01Z","event_type":{"type":"execution_started"},"package":"all"}"#,
"\n",
)
.as_bytes(),
)
.expect("append event");
let mut out = Vec::new();
let next_offset =
write_event_lines_since(&events_path, offset, "json", &mut out).expect("read second");
let second = String::from_utf8(out).expect("utf8");
assert!(second.contains(r#""type":"execution_started""#));
assert!(!second.contains(r#""type":"plan_created""#));
assert_eq!(second.lines().count(), 1);
assert!(next_offset > offset);
}
#[test]
fn inspect_events_follow_defers_incomplete_tail_line() {
let td = tempdir().expect("tempdir");
let events_path = td.path().join("events.jsonl");
let first_event = concat!(
r#"{"timestamp":"2025-01-01T00:00:00Z","event_type":{"type":"plan_created","plan_id":"abc123","package_count":1},"package":"all"}"#,
"\n",
);
let partial_event =
r#"{"timestamp":"2025-01-01T00:00:01Z","event_type":{"type":"execution_started"}"#;
fs::write(&events_path, format!("{first_event}{partial_event}")).expect("write events");
let mut out = Vec::new();
let offset =
write_event_lines_since(&events_path, 0, "json", &mut out).expect("read complete");
let text = String::from_utf8(out).expect("utf8");
assert_eq!(offset, first_event.len() as u64);
assert!(text.contains(r#""type":"plan_created""#));
assert!(!text.contains(r#""type":"execution_started""#));
assert_eq!(text.lines().count(), 1);
fs::OpenOptions::new()
.append(true)
.open(&events_path)
.expect("open append")
.write_all(br#","package":"all"}"#)
.expect("append event body");
fs::OpenOptions::new()
.append(true)
.open(&events_path)
.expect("open append")
.write_all(b"\n")
.expect("append newline");
let mut out = Vec::new();
let next_offset =
write_event_lines_since(&events_path, offset, "json", &mut out).expect("read tail");
let text = String::from_utf8(out).expect("utf8");
assert!(next_offset > offset);
assert!(text.contains(r#""type":"execution_started""#));
assert!(!text.contains(r#""type":"plan_created""#));
assert_eq!(text.lines().count(), 1);
}
#[test]
fn inspect_events_follow_reports_completed_malformed_line() {
let td = tempdir().expect("tempdir");
let events_path = td.path().join("events.jsonl");
fs::write(&events_path, "{\"not\":\"a publish event\"}\n").expect("write events");
let mut out = Vec::new();
let err = write_event_lines_since(&events_path, 0, "json", &mut out)
.expect_err("malformed completed line should fail");
assert!(
err.to_string().contains("failed to parse event JSON"),
"{err:#}"
);
assert!(out.is_empty());
}
#[test]
fn write_event_lines_since_missing_file_keeps_offset() {
let td = tempdir().expect("tempdir");
let events_path = td.path().join("missing-events.jsonl");
let mut out = Vec::new();
let offset =
write_event_lines_since(&events_path, 42, "text", &mut out).expect("missing file");
assert_eq!(offset, 42);
assert!(out.is_empty());
}
#[test]
fn write_event_lines_since_renders_text_follow_events() {
let td = tempdir().expect("tempdir");
let events_path = td.path().join("events.jsonl");
fs::write(
&events_path,
concat!(
r#"{"timestamp":"2025-01-01T00:00:00Z","event_type":{"type":"plan_created","plan_id":"abc123","package_count":1},"package":"all"}"#,
"\n",
r#"{"timestamp":"2025-01-01T00:00:01Z","event_type":{"type":"execution_started"},"package":"all"}"#,
"\n",
),
)
.expect("write events");
let mut out = Vec::new();
let offset = write_event_lines_since(&events_path, 0, "text", &mut out).expect("read");
let text = String::from_utf8(out).expect("utf8");
assert!(offset > 0);
assert!(text.contains("2025-01-01T00:00:00Z all plan_created - plan created"));
assert!(text.contains("2025-01-01T00:00:01Z all execution_started - execution started"));
assert!(!text.contains(r#""type":"plan_created""#));
}
#[test]
fn discover_event_logs_includes_preflight_only_sidecars() {
let td = tempdir().expect("tempdir");
let state_dir = td.path().join(".shipper");
fs::create_dir_all(&state_dir).expect("mkdir");
let sidecar = state_dir.join("preflight-only-20260421T010101000000000Z-pid1.events.jsonl");
fs::write(&sidecar, "{}").expect("write sidecar");
let discovered = discover_event_logs(&state_dir).expect("discover event logs");
assert_eq!(discovered, vec![sidecar]);
}
}