use std::time::Instant;
use crate::logging::audit::new_run_id;
use serde_json::json;
use crate::logging::ts_for_mode;
use crate::logging::{AuditSink, FactsEmitter};
use crate::types::ids::plan_id;
use crate::types::{Action, ApplyMode, ApplyReport, Plan};
use log::Level;
use crate::api::Switchyard;
use crate::logging::audit::{AuditCtx, AuditMode};
use crate::logging::StageLogger;
mod audit_fields;
mod executors;
mod handlers;
mod lock;
mod perf;
mod policy_gate;
mod rollback;
mod summary;
mod util;
use perf::PerfAgg;
#[allow(
clippy::too_many_lines,
reason = "Will be split further in PR6/PR8; keeping parity now"
)]
pub(crate) fn run<E: FactsEmitter, A: AuditSink>(
api: &Switchyard<E, A>,
plan: &Plan,
mode: ApplyMode,
) -> ApplyReport {
let t0 = Instant::now();
let mut executed: Vec<Action> = Vec::new();
let mut executed_indices: Vec<usize> = Vec::new();
let mut errors: Vec<String> = Vec::new();
let mut rollback_errors: Vec<String> = Vec::new();
let mut rolled_back = false;
let mut rolled_paths_opt: Option<Vec<String>> = None;
let dry = matches!(mode, ApplyMode::DryRun);
let pid = plan_id(plan);
let ts_now = ts_for_mode(&mode);
let run_id = new_run_id();
let tctx = AuditCtx::new(
&api.facts,
pid.to_string(),
run_id,
ts_now.clone(),
AuditMode {
dry_run: dry,
redact: dry,
},
);
let slog = StageLogger::new(&tctx);
api.audit.log(Level::Info, "apply: starting");
let linfo = lock::acquire(api, t0, pid, mode, &tctx);
let mut _lock_guard: Option<Box<dyn crate::adapters::lock::LockGuard>> = linfo.guard;
if let Some(early) = linfo.early_report {
return early;
}
let approx_attempts = if linfo.lock_backend == "none" {
linfo.approx_attempts
} else {
linfo.approx_attempts.max(2u64)
};
slog.apply_attempt()
.merge(&json!({
"lock_backend": linfo.lock_backend,
"lock_wait_ms": linfo.lock_wait_ms,
"lock_attempts": approx_attempts,
}))
.emit_success();
if let Some(report) = policy_gate::enforce(api, plan, pid, dry, t0, &slog) {
return report;
}
let mut perf_total = PerfAgg::default();
for (idx, act) in plan.actions.iter().enumerate() {
match act {
Action::EnsureSymlink { .. } => {
let (exec, err, perf) =
handlers::handle_ensure_symlink(api, &tctx, &pid, act, idx, dry, &slog);
perf_total.hash += perf.hash;
perf_total.backup += perf.backup;
perf_total.swap += perf.swap;
if let Some(e) = err {
errors.push(e);
}
if let Some(a) = exec {
executed.push(a);
executed_indices.push(idx);
}
}
Action::RestoreFromBackup { .. } => {
let (exec, err, perf) =
handlers::handle_restore(api, &tctx, &pid, act, idx, dry, &slog);
perf_total.hash += perf.hash;
perf_total.backup += perf.backup;
perf_total.swap += perf.swap;
if let Some(e) = err {
errors.push(e);
}
if let Some(a) = exec {
executed.push(a);
executed_indices.push(idx);
}
}
}
if !errors.is_empty() {
if !dry {
rolled_back = true;
let rolled_paths = rollback::do_rollback(
api,
&pid,
&executed,
&executed_indices,
dry,
&slog,
&mut rollback_errors,
);
rolled_paths_opt = Some(rolled_paths);
rollback::emit_summary(&slog, &rollback_errors);
perf_total.swap += 0; }
break;
}
}
if errors.is_empty() && !dry {
if let Some(smoke) = &api.smoke {
if smoke.run(plan).is_err() {
errors.push("smoke tests failed".to_string());
let auto_rb = match api.policy.governance.smoke {
crate::policy::types::SmokePolicy::Require { auto_rollback } => auto_rollback,
crate::policy::types::SmokePolicy::Off => true,
};
if auto_rb {
rolled_back = true;
let rolled_paths = rollback::do_rollback(
api,
&pid,
&executed,
&executed_indices,
dry,
&slog,
&mut rollback_errors,
);
rolled_paths_opt = Some(rolled_paths);
}
}
} else {
if matches!(
api.policy.governance.smoke,
crate::policy::types::SmokePolicy::Require { .. }
) {
errors.push("smoke runner missing".to_string());
let auto_rb = match api.policy.governance.smoke {
crate::policy::types::SmokePolicy::Require { auto_rollback } => auto_rollback,
crate::policy::types::SmokePolicy::Off => true,
};
if auto_rb {
rolled_back = true;
let rolled_paths = rollback::do_rollback(
api,
&pid,
&executed,
&executed_indices,
dry,
&slog,
&mut rollback_errors,
);
rolled_paths_opt = Some(rolled_paths);
}
}
}
}
let decision = if errors.is_empty() {
"success"
} else {
"failure"
};
let mut builder = summary::ApplySummary::new(&linfo.lock_backend, linfo.lock_wait_ms);
if !dry {
builder = builder.attestation(api, pid, executed.len(), rolled_back);
}
if decision == "failure" {
builder = builder.errors(&errors).smoke_or_policy_mapping(&errors);
}
if let Some(ref rb_paths) = rolled_paths_opt {
builder = builder.rolled_back_paths(rb_paths);
} else {
builder = builder.no_rollback();
}
let executed_count = executed.len();
let rolled_back_count = rolled_paths_opt.as_ref().map_or(0, Vec::len);
builder = builder.executed_counts(executed_count, rolled_back_count);
builder.perf(perf_total).emit(&slog, decision);
api.audit.log(Level::Info, "apply: finished");
let duration_ms = u64::try_from(t0.elapsed().as_millis()).unwrap_or(u64::MAX);
ApplyReport {
executed,
duration_ms,
errors,
plan_uuid: Some(pid),
rolled_back,
rollback_errors,
}
}