use crate::core::store::sandbox::SandboxLevel;
use crate::core::types::SandboxBackend;
use serde::{Deserialize, Serialize};
use std::fmt;
/// Outcome of one resource convergence test.
///
/// A test checks three properties (see `passed`): the resource converged to
/// the expected state, a second apply succeeded (idempotency), and the state
/// hash was unchanged by that second apply (preservation).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConvergenceResult {
/// Identifier of the resource under test.
pub resource_id: String,
/// Resource type label.
pub resource_type: String,
/// First apply reached the expected state (or at least queried cleanly
/// when no expected hash was given).
pub converged: bool,
/// Second apply succeeded.
pub idempotent: bool,
/// State hash identical before and after the second apply.
pub preserved: bool,
/// Wall-clock duration of the whole test, in milliseconds.
pub duration_ms: u64,
/// Fatal error recorded by the runner; omitted from JSON when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl ConvergenceResult {
    /// A test passes only when no error was recorded and all three
    /// convergence properties held.
    pub fn passed(&self) -> bool {
        self.error.is_none() && self.converged && self.idempotent && self.preserved
    }
}
impl fmt::Display for ConvergenceResult {
    /// One-line summary: `[PASS|FAIL] id/type: converge=.. idem=.. preserve=.. (Nms)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let status = match self.passed() {
            true => "PASS",
            false => "FAIL",
        };
        write!(
            f,
            "[{status}] {}/{}: converge={} idem={} preserve={} ({}ms)",
            self.resource_id,
            self.resource_type,
            self.converged,
            self.idempotent,
            self.preserved,
            self.duration_ms,
        )
    }
}
/// Configuration for a batch convergence-test run.
#[derive(Debug, Clone)]
pub struct ConvergenceTestConfig {
/// Which sandbox backend to execute tests under.
pub backend: SandboxBackend,
/// Isolation level requested from the sandbox.
pub level: SandboxLevel,
/// Presumably enables testing resource pairs — not referenced in this
/// module; TODO confirm semantics against callers.
pub test_pairs: bool,
/// Maximum number of tests executed concurrently.
pub parallelism: usize,
}
impl Default for ConvergenceTestConfig {
fn default() -> Self {
Self {
backend: SandboxBackend::Pepita,
level: SandboxLevel::Minimal,
test_pairs: false,
parallelism: 4,
}
}
}
/// How convergence tests are executed on this host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerMode {
    /// No usable sandbox backend: scripts run in a local temp directory.
    Simulated,
    /// A real sandbox backend is available and will be used.
    Sandbox,
}

impl fmt::Display for RunnerMode {
    /// Lowercase label, suitable for logs and CLI output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Simulated => "simulated",
            Self::Sandbox => "sandbox",
        };
        f.write_str(label)
    }
}
/// Probes the host to decide whether `backend` can actually run here.
///
/// - Pepita: the binary must exist at `/usr/local/bin/pepita`.
/// - Container: `docker --version` or `podman --version` must exit 0.
/// - Chroot: the process must run as root (real UID 0 per `/proc/self/status`).
pub fn backend_available(backend: SandboxBackend) -> bool {
    // Best-effort tool probe: present iff `<cmd> --version` runs and exits 0.
    fn version_probe(cmd: &str) -> bool {
        std::process::Command::new(cmd)
            .arg("--version")
            .output()
            .map(|out| out.status.success())
            .unwrap_or(false)
    }
    match backend {
        SandboxBackend::Pepita => std::path::Path::new("/usr/local/bin/pepita").exists(),
        SandboxBackend::Container => version_probe("docker") || version_probe("podman"),
        // chroot needs root; the "Uid:" line lists real/effective/saved/fs UIDs,
        // so a leading "Uid:\t0\t" means real UID 0.
        SandboxBackend::Chroot => std::fs::read_to_string("/proc/self/status")
            .map(|status| status.lines().any(|line| line.starts_with("Uid:\t0\t")))
            .unwrap_or(false),
    }
}
/// Picks the runner mode for `backend`: real sandbox execution when the
/// backend is usable on this host, simulated (local temp-dir) otherwise.
pub fn resolve_mode(backend: SandboxBackend) -> RunnerMode {
    match backend_available(backend) {
        true => RunnerMode::Sandbox,
        false => RunnerMode::Simulated,
    }
}
/// Everything needed to convergence-test one resource.
#[derive(Debug, Clone)]
pub struct ConvergenceTarget {
/// Identifier copied into the result.
pub resource_id: String,
/// Resource type label copied into the result.
pub resource_type: String,
/// Shell script that applies the resource; run twice under
/// `bash -euo pipefail` to check idempotency.
pub apply_script: String,
/// Shell script whose stdout is hashed to capture the current state.
pub state_query_script: String,
/// Expected state hash; an empty string means any successful state query
/// counts as converged.
pub expected_hash: String,
}
/// Routes the test to the container runner when a container backend is both
/// requested and available; every other combination falls back to the local
/// (simulated) runner.
pub fn run_convergence_test_dispatch(
    target: &ConvergenceTarget,
    backend: SandboxBackend,
) -> ConvergenceResult {
    let use_container = resolve_mode(backend) == RunnerMode::Sandbox
        && matches!(backend, SandboxBackend::Container);
    if use_container {
        super::convergence_container::run_convergence_test_container(target)
    } else {
        run_convergence_test(target)
    }
}
pub fn run_convergence_test(target: &ConvergenceTarget) -> ConvergenceResult {
let start = std::time::Instant::now();
let sandbox_dir = std::env::temp_dir().join(format!(
"forjar-conv-{}-{:x}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
));
let _ = std::fs::create_dir_all(&sandbox_dir);
let result = run_convergence_in_sandbox(target, &sandbox_dir, start);
let _ = std::fs::remove_dir_all(&sandbox_dir);
result
}
/// Core convergence check for a single target, executed inside `sandbox_dir`.
///
/// Sequence (order matters — each step reuses the sandbox state left by the
/// previous one):
///   1. first apply  — must succeed, else the test aborts with `error` set
///   2. state query  — establishes `converged` (vs `expected_hash` if given)
///   3. second apply — establishes `idempotent`
///   4. state query  — `preserved` iff both query hashes match
///
/// `start` is supplied by the caller so the reported duration also covers
/// sandbox setup.
/// NOTE(review): only a failing *first* apply populates `error`; failures in
/// the queries or the second apply surface solely as false flags — confirm
/// that is intended.
fn run_convergence_in_sandbox(
target: &ConvergenceTarget,
sandbox_dir: &std::path::Path,
start: std::time::Instant,
) -> ConvergenceResult {
// Step 1: first apply. A failure here aborts the whole test.
let first_apply = local_apply(&target.apply_script, sandbox_dir);
if let Err(e) = first_apply {
return ConvergenceResult {
resource_id: target.resource_id.clone(),
resource_type: target.resource_type.clone(),
converged: false,
idempotent: false,
preserved: false,
duration_ms: start.elapsed().as_millis() as u64,
error: Some(e),
};
}
// Step 2: convergence. With no expected hash, any successful query counts;
// otherwise the queried hash must equal `expected_hash` exactly.
let state_after_first = local_state_query(&target.state_query_script, sandbox_dir);
let converged = if target.expected_hash.is_empty() {
state_after_first.is_ok()
} else {
state_after_first
.as_ref()
.map(|h| h == &target.expected_hash)
.unwrap_or(false)
};
// Step 3: idempotency — re-applying the same script must succeed.
let second_apply = local_apply(&target.apply_script, sandbox_dir);
let idempotent = second_apply.is_ok();
// Step 4: preservation — the state hash must be unchanged by the re-apply.
// Any query failure (either side) counts as not preserved.
let state_after_second = local_state_query(&target.state_query_script, sandbox_dir);
let preserved = match (&state_after_first, &state_after_second) {
(Ok(h1), Ok(h2)) => h1 == h2,
_ => false,
};
ConvergenceResult {
resource_id: target.resource_id.clone(),
resource_type: target.resource_type.clone(),
converged,
idempotent,
preserved,
duration_ms: start.elapsed().as_millis() as u64,
error: None,
}
}
/// Convenience wrapper: runs all targets against the default Pepita backend
/// with the requested parallelism.
pub fn run_convergence_parallel(
    targets: Vec<ConvergenceTarget>,
    parallelism: usize,
) -> Vec<ConvergenceResult> {
    let default_backend = SandboxBackend::Pepita;
    run_convergence_parallel_with_backend(targets, parallelism, default_backend)
}
/// Runs convergence tests for all targets, at most `parallelism` at a time.
///
/// Targets are processed in chunks of `parallelism`; each chunk spawns one
/// scoped thread per target, and results are collected in spawn order, so the
/// output order matches the input order. A target whose thread panics is
/// dropped from the output (unchanged from the previous behavior).
pub fn run_convergence_parallel_with_backend(
    targets: Vec<ConvergenceTarget>,
    parallelism: usize,
    backend: SandboxBackend,
) -> Vec<ConvergenceResult> {
    if targets.is_empty() {
        return Vec::new();
    }
    // Guard against parallelism == 0, which would panic in chunks().
    let par = parallelism.max(1);
    let mut results = Vec::with_capacity(targets.len());
    for chunk in targets.chunks(par) {
        // Scoped threads may borrow `chunk` directly, no Arc/'static needed.
        std::thread::scope(|s| {
            let handles: Vec<_> = chunk
                .iter()
                .map(|target| s.spawn(|| run_convergence_test_dispatch(target, backend)))
                .collect();
            // All pushes happen here on the spawning thread after join(), so a
            // plain &mut Vec suffices — the previous Mutex (and its poisoned-
            // lock recovery) guarded nothing.
            for handle in handles {
                if let Ok(result) = handle.join() {
                    results.push(result);
                }
            }
        });
    }
    results
}
/// Aggregate counts over a batch of `ConvergenceResult`s.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConvergenceSummary {
/// Number of results in the batch.
pub total: usize,
/// Results for which `passed()` was true.
pub passed: usize,
/// Results with `converged == false`.
pub convergence_failures: usize,
/// Results with `idempotent == false`.
pub idempotency_failures: usize,
/// Results with `preserved == false`.
pub preservation_failures: usize,
}
impl ConvergenceSummary {
    /// Tallies a batch of results into aggregate pass/failure counts.
    /// The failure counters are independent: one result may increment several.
    pub fn from_results(results: &[ConvergenceResult]) -> Self {
        let mut summary = Self {
            total: results.len(),
            ..Self::default()
        };
        for result in results {
            if result.passed() {
                summary.passed += 1;
            }
            if !result.converged {
                summary.convergence_failures += 1;
            }
            if !result.idempotent {
                summary.idempotency_failures += 1;
            }
            if !result.preserved {
                summary.preservation_failures += 1;
            }
        }
        summary
    }

    /// Percentage of results that passed; an empty batch counts as 100%.
    pub fn pass_rate(&self) -> f64 {
        if self.total == 0 {
            100.0
        } else {
            // Same evaluation order as before so the float result is identical.
            (self.passed as f64 / self.total as f64) * 100.0
        }
    }
}
impl fmt::Display for ConvergenceSummary {
    /// Summary line plus one clause per failure category that occurred, e.g.
    /// `Convergence: 3/4 passed (75%), 1 convergence failures`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Convergence: {}/{} passed ({:.0}%)",
            self.passed,
            self.total,
            self.pass_rate(),
        )?;
        let clauses = [
            (self.convergence_failures, "convergence"),
            (self.idempotency_failures, "idempotency"),
            (self.preservation_failures, "preservation"),
        ];
        for (count, label) in clauses {
            if count > 0 {
                write!(f, ", {count} {label} failures")?;
            }
        }
        Ok(())
    }
}
/// Renders a human-readable multi-line report: summary line, separator, one
/// line per result, and — when anything failed — a trailing "Failures:"
/// section naming each failed resource with its error message or the list of
/// checks it failed.
pub fn format_convergence_report(results: &[ConvergenceResult]) -> String {
    let summary = ConvergenceSummary::from_results(results);
    let mut report = format!("{summary}\n");
    report.push_str("================================\n");
    for result in results {
        report.push_str(&format!(" {result}\n"));
    }
    let any_failed = summary.passed < summary.total;
    if any_failed {
        report.push_str("\nFailures:\n");
        for result in results {
            if result.passed() {
                continue;
            }
            match &result.error {
                // A recorded error message takes precedence over flag details.
                Some(err) => {
                    report.push_str(&format!(" {}: {err}\n", result.resource_id));
                }
                None => {
                    let checks = [
                        (result.converged, "convergence"),
                        (result.idempotent, "idempotency"),
                        (result.preserved, "preservation"),
                    ];
                    let reasons: Vec<&str> = checks
                        .iter()
                        .filter(|(ok, _)| !*ok)
                        .map(|(_, label)| *label)
                        .collect();
                    report.push_str(&format!(
                        " {}: failed {}\n",
                        result.resource_id,
                        reasons.join(", "),
                    ));
                }
            }
        }
    }
    report
}
/// Substrings that mark a script as unsafe to run directly on the host
/// (service/package management, disk and process manipulation, power control).
/// Most entries keep a trailing space so they match whole command words.
const UNSAFE_PATTERNS: &[&str] = &[
    "systemctl ",
    "apt-get ",
    "apt ",
    "dpkg ",
    "yum ",
    "dnf ",
    "pacman ",
    "mount ",
    "umount ",
    "pkill ",
    "kill ",
    "shutdown ",
    "reboot ",
    "rm -rf /",
    "dd if=",
    "mkfs",
    "fdisk",
];

/// A script is safe for local (non-container) execution iff it contains none
/// of the unsafe substrings. This is a best-effort substring screen, not a
/// shell parser.
fn is_script_safe_for_local(script: &str) -> bool {
    UNSAFE_PATTERNS.iter().all(|pattern| !script.contains(pattern))
}
/// Executes `script` under `bash -euo pipefail` with `sandbox_dir` as both
/// the working directory and the `FORJAR_SANDBOX` env var, returning a hash
/// of its stdout on success or a descriptive error string.
///
/// Rejects empty scripts and scripts that fail the local-safety screen.
fn local_apply(script: &str, sandbox_dir: &std::path::Path) -> Result<String, String> {
    if script.is_empty() {
        return Err("empty apply script".into());
    }
    if !is_script_safe_for_local(script) {
        return Err("script contains system commands (requires container backend)".into());
    }
    let output = std::process::Command::new("bash")
        .args(["-euo", "pipefail", "-c", script])
        .current_dir(sandbox_dir)
        .env("FORJAR_SANDBOX", sandbox_dir)
        .output()
        .map_err(|e| format!("local exec: {e}"))?;
    if output.status.success() {
        let stdout = String::from_utf8_lossy(&output.stdout);
        Ok(crate::tripwire::hasher::composite_hash(&[stdout.as_ref()]))
    } else {
        // -1 stands in for "killed by signal" (no exit code available).
        let code = output.status.code().unwrap_or(-1);
        let stderr = String::from_utf8_lossy(&output.stderr);
        Err(format!("exit {}: {}", code, stderr.trim()))
    }
}
/// Runs the state-query script under `bash -euo pipefail` in `sandbox_dir`
/// and returns a hash of whatever the script printed to stdout.
///
/// Returns `Err` only for an empty script or a spawn failure.
/// NOTE(review): unlike `local_apply`, a non-zero exit status is NOT treated
/// as an error — the (possibly empty) stdout is hashed regardless, so a
/// failing query can still look like a valid state. Confirm this asymmetry
/// is intentional.
fn local_state_query(script: &str, sandbox_dir: &std::path::Path) -> Result<String, String> {
if script.is_empty() {
return Err("empty state query script".into());
}
let output = std::process::Command::new("bash")
.args(["-euo", "pipefail", "-c", script])
.current_dir(sandbox_dir)
// Expose the sandbox root via env as well as cwd.
.env("FORJAR_SANDBOX", sandbox_dir)
.output()
.map_err(|e| format!("state query exec: {e}"))?;
let stdout = String::from_utf8_lossy(&output.stdout);
let refs = [stdout.as_ref()];
Ok(crate::tripwire::hasher::composite_hash(&refs))
}