use std::path::Path;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;
use anyhow::{Result, bail};
use chrono::Utc;
use crate::ops::cargo;
use crate::plan::PlannedWorkspace;
use crate::runtime::execution::{
backoff_delay, classify_cargo_failure, pkg_key, update_state_locked,
};
use crate::state::events;
use crate::state::execution_state as state;
use shipper_registry::HttpRegistryClient as RegistryClient;
use shipper_types::{
AttemptEvidence, ErrorClass, EventType, ExecutionState, PackageEvidence, PackageReceipt,
PackageState, PlannedPackage, PublishEvent, PublishLevel, ReadinessConfig, ReadinessEvidence,
RuntimeOptions,
};
use super::Reporter;
use super::policy::policy_effects;
use super::readiness::is_version_visible_with_backoff;
use super::webhook::{WebhookEvent, maybe_send_event};
use crate::plan::chunking::chunk_by_max_concurrent;
#[derive(Debug)]
pub(super) struct PackagePublishResult {
pub(super) result: anyhow::Result<PackageReceipt>,
}
#[allow(clippy::too_many_arguments)]
pub(super) fn publish_package(
p: &PlannedPackage,
ws: &PlannedWorkspace,
opts: &RuntimeOptions,
reg: &RegistryClient,
st: &Arc<Mutex<ExecutionState>>,
state_dir: &Path,
event_log: &Arc<Mutex<events::EventLog>>,
events_path: &Path,
reporter: &Arc<Mutex<dyn Reporter + Send>>,
) -> PackagePublishResult {
let key = pkg_key(&p.name, &p.version);
let pkg_label = format!("{}@{}", p.name, p.version);
let started_at = Utc::now();
let start_instant = Instant::now();
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: started_at,
event_type: EventType::PackageStarted {
name: p.name.clone(),
version: p.version.clone(),
},
package: pkg_label.clone(),
});
let _ = log.write_to_file(events_path);
log.clear();
}
if let Ok(true) = reg.version_exists(&p.name, &p.version) {
{
let mut rep = reporter.lock().unwrap();
rep.info(&format!(
"{}@{}: already published (skipping)",
p.name, p.version
));
}
let skipped = PackageState::Skipped {
reason: "already published".to_string(),
};
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, skipped.clone());
let _ = state::save_state(state_dir, &state);
}
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: Utc::now(),
event_type: EventType::PackageSkipped {
reason: "already published".to_string(),
},
package: pkg_label.clone(),
});
let _ = log.write_to_file(events_path);
log.clear();
}
return PackagePublishResult {
result: Ok(PackageReceipt {
name: p.name.clone(),
version: p.version.clone(),
attempts: 0,
state: skipped,
started_at,
finished_at: Utc::now(),
duration_ms: start_instant.elapsed().as_millis(),
evidence: PackageEvidence {
attempts: vec![],
readiness_checks: vec![],
},
}),
};
}
{
let mut rep = reporter.lock().unwrap();
rep.info(&format!("{}@{}: publishing...", p.name, p.version));
}
let mut attempt = 0u32;
let mut last_err: Option<(ErrorClass, String)> = None;
let mut attempt_evidence: Vec<AttemptEvidence> = Vec::new();
let mut readiness_evidence: Vec<ReadinessEvidence> = Vec::new();
let mut cargo_succeeded = false;
{
let state = st.lock().unwrap();
if let Some(pr) = state.packages.get(&key)
&& matches!(pr.state, PackageState::Uploaded)
{
cargo_succeeded = true;
}
}
let effects = policy_effects(opts);
let readiness_config = ReadinessConfig {
enabled: effects.readiness_enabled,
..opts.readiness.clone()
};
while attempt < opts.max_attempts {
attempt += 1;
{
let mut state = st.lock().unwrap();
if let Some(pr) = state.packages.get_mut(&key) {
pr.attempts = attempt;
pr.last_updated_at = Utc::now();
}
let _ = state::save_state(state_dir, &state);
}
let command = format!(
"cargo publish -p {} --registry {}",
p.name, ws.plan.registry.name
);
{
let mut rep = reporter.lock().unwrap();
rep.info(&format!(
"{}@{}: attempt {}/{}",
p.name, p.version, attempt, opts.max_attempts
));
}
if !cargo_succeeded {
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: Utc::now(),
event_type: EventType::PackageAttempted {
attempt,
command: command.clone(),
},
package: pkg_label.clone(),
});
}
let out = match cargo::cargo_publish(
&ws.workspace_root,
&p.name,
&ws.plan.registry.name,
opts.allow_dirty,
opts.no_verify,
opts.output_lines,
Some(opts.parallel.per_package_timeout),
) {
Ok(o) => o,
Err(e) => {
{
let mut rep = reporter.lock().unwrap();
rep.error(&format!(
"{}@{}: cargo publish failed to execute: {}",
p.name, p.version, e
));
}
return PackagePublishResult { result: Err(e) };
}
};
attempt_evidence.push(AttemptEvidence {
attempt_number: attempt,
command: command.clone(),
exit_code: out.exit_code,
stdout_tail: out.stdout_tail.clone(),
stderr_tail: out.stderr_tail.clone(),
timestamp: Utc::now(),
duration: out.duration,
});
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: Utc::now(),
event_type: EventType::PackageOutput {
stdout_tail: out.stdout_tail.clone(),
stderr_tail: out.stderr_tail.clone(),
},
package: pkg_label.clone(),
});
}
if out.exit_code == 0 && !out.timed_out {
cargo_succeeded = true;
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, PackageState::Uploaded);
let _ = state::save_state(state_dir, &state);
}
} else {
{
let mut rep = reporter.lock().unwrap();
rep.warn(&format!(
"{}@{}: cargo publish failed (exit={}); checking registry...",
p.name, p.version, out.exit_code
));
}
if reg.version_exists(&p.name, &p.version).unwrap_or(false) {
{
let mut rep = reporter.lock().unwrap();
rep.info(&format!(
"{}@{}: version is present on registry; treating as published",
p.name, p.version
));
}
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, PackageState::Published);
let _ = state::save_state(state_dir, &state);
}
last_err = None;
break;
}
let (class, msg) = classify_cargo_failure(&out.stderr_tail, &out.stdout_tail);
last_err = Some((class.clone(), msg.clone()));
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: Utc::now(),
event_type: EventType::PackageFailed {
class: class.clone(),
message: msg.clone(),
},
package: pkg_label.clone(),
});
}
match class {
ErrorClass::Permanent => {
let failed = PackageState::Failed {
class: class.clone(),
message: msg.clone(),
};
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, failed);
let _ = state::save_state(state_dir, &state);
}
{
let mut log = event_log.lock().unwrap();
let _ = log.write_to_file(events_path);
log.clear();
}
maybe_send_event(
&opts.webhook,
WebhookEvent::PublishFailed {
plan_id: ws.plan.plan_id.clone(),
package_name: p.name.clone(),
package_version: p.version.clone(),
error_class: format!("{:?}", class),
message: msg.clone(),
},
);
return PackagePublishResult {
result: Err(anyhow::anyhow!(
"{}@{}: permanent failure: {}",
p.name,
p.version,
msg
)),
};
}
ErrorClass::Retryable | ErrorClass::Ambiguous => {
let delay = backoff_delay(
opts.base_delay,
opts.max_delay,
attempt,
opts.retry_strategy,
opts.retry_jitter,
);
{
let mut rep = reporter.lock().unwrap();
rep.warn(&format!(
"{}@{}: retrying in {}",
p.name,
p.version,
humantime::format_duration(delay)
));
}
thread::sleep(delay);
}
}
continue;
}
}
{
let mut rep = reporter.lock().unwrap();
rep.info(&format!(
"{}@{}: cargo publish exited successfully; verifying...",
p.name, p.version
));
}
let verify_result =
is_version_visible_with_backoff(reg, &p.name, &p.version, &readiness_config);
match verify_result {
Ok((visible, checks)) => {
readiness_evidence = checks;
if visible {
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, PackageState::Published);
let _ = state::save_state(state_dir, &state);
}
last_err = None;
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: Utc::now(),
event_type: EventType::PackagePublished {
duration_ms: start_instant.elapsed().as_millis() as u64,
},
package: pkg_label.clone(),
});
let _ = log.write_to_file(events_path);
log.clear();
}
maybe_send_event(
&opts.webhook,
WebhookEvent::PublishSucceeded {
plan_id: ws.plan.plan_id.clone(),
package_name: p.name.clone(),
package_version: p.version.clone(),
duration_ms: start_instant.elapsed().as_millis() as u64,
},
);
break;
} else {
last_err = Some((ErrorClass::Ambiguous, "publish succeeded locally, but version not observed on registry within timeout".into()));
let delay = backoff_delay(
opts.base_delay,
opts.max_delay,
attempt,
opts.retry_strategy,
opts.retry_jitter,
);
thread::sleep(delay);
}
}
Err(_) => {
last_err = Some((ErrorClass::Ambiguous, "readiness check failed".into()));
let delay = backoff_delay(
opts.base_delay,
opts.max_delay,
attempt,
opts.retry_strategy,
opts.retry_jitter,
);
thread::sleep(delay);
}
}
}
if last_err.is_none() {
let current_state = st
.lock()
.unwrap()
.packages
.get(&key)
.map(|p| p.state.clone());
if matches!(current_state, Some(PackageState::Uploaded)) {
if reg.version_exists(&p.name, &p.version).unwrap_or(false) {
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, PackageState::Published);
let _ = state::save_state(state_dir, &state);
}
maybe_send_event(
&opts.webhook,
WebhookEvent::PublishSucceeded {
plan_id: ws.plan.plan_id.clone(),
package_name: p.name.clone(),
package_version: p.version.clone(),
duration_ms: start_instant.elapsed().as_millis() as u64,
},
);
} else {
last_err = Some((
ErrorClass::Ambiguous,
"package was uploaded but not confirmed visible on registry".into(),
));
}
}
}
let finished_at = Utc::now();
let duration_ms = start_instant.elapsed().as_millis();
if let Some((class, msg)) = last_err {
if reg.version_exists(&p.name, &p.version).unwrap_or(false) {
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, PackageState::Published);
let _ = state::save_state(state_dir, &state);
}
maybe_send_event(
&opts.webhook,
WebhookEvent::PublishSucceeded {
plan_id: ws.plan.plan_id.clone(),
package_name: p.name.clone(),
package_version: p.version.clone(),
duration_ms: duration_ms as u64,
},
);
return PackagePublishResult {
result: Ok(PackageReceipt {
name: p.name.clone(),
version: p.version.clone(),
attempts: st
.lock()
.unwrap()
.packages
.get(&key)
.map_or(0, |p| p.attempts),
state: PackageState::Published,
started_at,
finished_at,
duration_ms,
evidence: PackageEvidence {
attempts: attempt_evidence,
readiness_checks: readiness_evidence,
},
}),
};
} else {
let error_class_str = format!("{:?}", class);
let failed = PackageState::Failed {
class,
message: msg.clone(),
};
{
let mut state = st.lock().unwrap();
update_state_locked(&mut state, &key, failed);
let _ = state::save_state(state_dir, &state);
}
{
let mut log = event_log.lock().unwrap();
log.record(PublishEvent {
timestamp: Utc::now(),
event_type: EventType::PackageFailed {
class: ErrorClass::Ambiguous,
message: msg.clone(),
},
package: pkg_label,
});
let _ = log.write_to_file(events_path);
log.clear();
}
maybe_send_event(
&opts.webhook,
WebhookEvent::PublishFailed {
plan_id: ws.plan.plan_id.clone(),
package_name: p.name.clone(),
package_version: p.version.clone(),
error_class: error_class_str,
message: msg.clone(),
},
);
return PackagePublishResult {
result: Err(anyhow::anyhow!("{}@{}: failed: {}", p.name, p.version, msg)),
};
}
}
maybe_send_event(
&opts.webhook,
WebhookEvent::PublishSucceeded {
plan_id: ws.plan.plan_id.clone(),
package_name: p.name.clone(),
package_version: p.version.clone(),
duration_ms: duration_ms as u64,
},
);
PackagePublishResult {
result: Ok(PackageReceipt {
name: p.name.clone(),
version: p.version.clone(),
attempts: st
.lock()
.unwrap()
.packages
.get(&key)
.map_or(0, |p| p.attempts),
state: PackageState::Published,
started_at,
finished_at,
duration_ms,
evidence: PackageEvidence {
attempts: attempt_evidence,
readiness_checks: readiness_evidence,
},
}),
}
}
#[allow(clippy::too_many_arguments)]
pub(super) fn run_publish_level(
level: &PublishLevel,
ws: &PlannedWorkspace,
opts: &RuntimeOptions,
reg: &RegistryClient,
st: &Arc<Mutex<ExecutionState>>,
state_dir: &Path,
event_log: &Arc<Mutex<events::EventLog>>,
events_path: &Path,
reporter: &Arc<Mutex<dyn Reporter + Send>>,
) -> Result<Vec<PackageReceipt>> {
let num_packages = level.packages.len();
let max_concurrent = opts.parallel.max_concurrent.min(num_packages);
reporter.lock().unwrap().info(&format!(
"Level {}: publishing {} packages (max concurrent: {})",
level.level, num_packages, max_concurrent
));
let mut all_receipts: Vec<PackageReceipt> = Vec::new();
let mut errors: Vec<String> = Vec::new();
for chunk in chunk_by_max_concurrent(&level.packages, max_concurrent) {
let mut handles: Vec<std::thread::JoinHandle<PackagePublishResult>> = Vec::new();
for p in chunk {
let p = p.clone();
let ws_clone = ws.clone();
let opts_clone = opts.clone();
let reg_clone = reg.clone();
let st_clone = Arc::clone(st);
let state_dir = state_dir.to_path_buf();
let event_log_clone = Arc::clone(event_log);
let events_path = events_path.to_path_buf();
let reporter_clone = Arc::clone(reporter);
let handle = thread::spawn(move || {
publish_package(
&p,
&ws_clone,
&opts_clone,
®_clone,
&st_clone,
&state_dir,
&event_log_clone,
&events_path,
&reporter_clone,
)
});
handles.push(handle);
}
for handle in handles {
let result = handle
.join()
.map_err(|_| anyhow::anyhow!("publish thread panicked"))?;
match result.result {
Ok(receipt) => all_receipts.push(receipt),
Err(e) => errors.push(format!("{e:#}")),
}
}
}
if !errors.is_empty() {
bail!(
"parallel publish failed for {} package(s): {}",
errors.len(),
errors.join("; ")
);
}
Ok(all_receipts)
}