use tracing::{debug, error, info, warn};
use crate::config::{ConfigHasher, DeployConfig};
use crate::error::{HalldyllError, ReconcileError, Result};
use crate::planner::{DeploymentPlan, DiffEngine, PlanExecutor};
use crate::runpod::{ObservedPod, PodObserver, PodProvisioner};
use crate::state::{DeploymentState, StateStore};
/// Drives a project's pods toward the state described by a [`DeployConfig`].
///
/// All collaborators are borrowed for `'a`; the reconciler owns only its hasher,
/// its diff engine and its retry budget.
pub struct Reconciler<'a, S>
where
    S: StateStore,
{
    /// Desired deployment configuration (project name/environment and pods).
    config: &'a DeployConfig,
    /// Store the deployment state is loaded from and saved back to.
    state_store: &'a S,
    /// Handed to the plan executor when a plan is applied.
    provisioner: &'a PodProvisioner,
    /// Used to list the pods that currently exist for the project.
    observer: &'a PodObserver,
    /// Computes the configuration hash that is attached to deployment plans.
    hasher: ConfigHasher,
    /// Computes the diff between config, stored state and observed pods.
    diff_engine: DiffEngine,
    /// Upper bound on reconciliation attempts per `reconcile` call.
    max_attempts: u32,
}
/// Outcome of a reconciliation run.
///
/// The counters are copied from the computed diff (planned changes); when no
/// changes are required only `unchanged` is non-zero.
#[derive(Debug, serde::Serialize)]
pub struct ReconciliationResult {
/// `true` when no changes were needed or the plan execution reported success.
pub success: bool,
/// Number of creates reported by the diff.
pub created: usize,
/// Number of updates reported by the diff.
pub updated: usize,
/// Number of deletes reported by the diff.
pub deleted: usize,
/// Number of resources the diff reported as unchanged.
pub unchanged: usize,
/// Human-readable error messages (failed actions, failed attempts, state-save failures).
pub errors: Vec<String>,
/// Deployment state at the end of `reconcile`; excluded from serialization.
#[serde(skip)]
pub final_state: Option<DeploymentState>,
}
impl<'a, S: StateStore> Reconciler<'a, S> {
    /// Creates a reconciler over borrowed collaborators.
    ///
    /// The hasher and diff engine are constructed fresh and the retry budget
    /// defaults to three attempts; use [`Self::with_max_attempts`] to change it.
    #[must_use]
    pub const fn new(
        config: &'a DeployConfig,
        state_store: &'a S,
        provisioner: &'a PodProvisioner,
        observer: &'a PodObserver,
    ) -> Self {
        Self {
            config,
            state_store,
            provisioner,
            observer,
            hasher: ConfigHasher::new(),
            diff_engine: DiffEngine::new(),
            max_attempts: 3,
        }
    }

    /// Overrides the number of attempts [`Self::reconcile`] may make.
    ///
    /// A value of `0` is raised to `1`. With a zero budget `reconcile` would skip
    /// the attempt loop entirely, yet still save state and return `Ok` with
    /// `success == false` and no recorded error.
    #[must_use]
    pub const fn with_max_attempts(mut self, max_attempts: u32) -> Self {
        self.max_attempts = if max_attempts == 0 { 1 } else { max_attempts };
        self
    }

    /// Reconciles the deployment, retrying up to `max_attempts` times.
    ///
    /// The stored state is loaded (or a new one is created for the project and
    /// environment), the project's pods are listed, and reconciliation passes
    /// are run until one reports success or the attempt budget is used up:
    ///
    /// * a pass that returns `Ok` with `success == false` is retried immediately;
    /// * a pass that returns `Err` is recorded in `errors` and retried after a
    ///   two-second pause.
    ///
    /// Every attempt after the first re-lists the pods before diffing, so the
    /// retry sees the effects of the previous, possibly partly applied, plan.
    ///
    /// After the loop the state is saved regardless of the outcome. A save
    /// failure is logged and appended to `errors` but is not returned as `Err`.
    /// The state is then attached as `final_state`.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the state or the initial pod listing fails.
    /// Also returns the most recent attempt error when the final result is not
    /// successful and at least one attempt failed with an error.
    pub async fn reconcile(&self) -> Result<ReconciliationResult> {
        info!(
            "Starting reconciliation for {}/{}",
            self.config.project.name, self.config.project.environment
        );
        let config_hash = self.hasher.hash_config(self.config);
        let mut state = self
            .state_store
            .load()
            .await?
            .unwrap_or_else(|| {
                DeploymentState::new(&self.config.project.name, &self.config.project.environment)
            });
        let observed = self
            .observer
            .list_project_pods(&self.config.project.name, &self.config.project.environment)
            .await?;
        debug!("Found {} existing pods", observed.len());
        let mut last_error = None;
        let mut result = ReconciliationResult {
            success: false,
            created: 0,
            updated: 0,
            deleted: 0,
            unchanged: 0,
            errors: vec![],
            final_state: None,
        };
        for attempt in 1..=self.max_attempts {
            debug!("Reconciliation attempt {}/{}", attempt, self.max_attempts);
            // The initial listing is only valid for the first attempt: an earlier
            // attempt may already have created, updated or deleted pods, and
            // diffing against the old listing could plan those actions again.
            let outcome = if attempt == 1 {
                self.reconcile_once(&mut state, &observed, &config_hash)
                    .await
            } else {
                self.reconcile_with_fresh_observation(&mut state, &config_hash)
                    .await
            };
            match outcome {
                Ok(r) => {
                    result = r;
                    if result.success {
                        break;
                    }
                    if attempt < self.max_attempts {
                        warn!("Reconciliation partially succeeded, retrying...");
                    }
                }
                Err(err) => {
                    error!("Reconciliation attempt {} failed: {}", attempt, err);
                    result.errors.push(format!("Attempt {attempt}: {err}"));
                    last_error = Some(err);
                    if attempt < self.max_attempts {
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                    }
                }
            }
        }
        // Persist whatever the attempts changed, even after a failure; a failed
        // save is reported through `errors` rather than aborting the call.
        if let Err(e) = self.state_store.save(&state).await {
            error!("Failed to save state: {}", e);
            result.errors.push(format!("Failed to save state: {e}"));
        }
        result.final_state = Some(state);
        if !result.success && let Some(err) = last_error {
            return Err(err);
        }
        Ok(result)
    }

    /// Re-lists the project's pods and runs one reconciliation pass against
    /// that fresh listing.
    ///
    /// A listing failure is returned as the error of this attempt.
    async fn reconcile_with_fresh_observation(
        &self,
        state: &mut DeploymentState,
        config_hash: &str,
    ) -> Result<ReconciliationResult> {
        let observed = self
            .observer
            .list_project_pods(&self.config.project.name, &self.config.project.environment)
            .await?;
        debug!("Found {} existing pods", observed.len());
        self.reconcile_once(state, &observed, config_hash).await
    }

    /// Runs a single diff → plan → execute pass.
    ///
    /// Returns early with a successful result when the diff has no changes.
    /// Otherwise a plan is built from the diff and executed with
    /// continue-on-error enabled. The counters in the returned result are taken
    /// from the diff, not from the number of actions that succeeded. When the
    /// execution is not successful, `errors` starts with a
    /// "`failed` of `total_executed` actions failed" summary, followed by the
    /// error of each failed action that carries one.
    ///
    /// # Errors
    ///
    /// Returns `ReconcileError::Aborted` when the plan does not pass its
    /// guardrails, and propagates any error returned by the executor.
    async fn reconcile_once(
        &self,
        state: &mut DeploymentState,
        observed: &[ObservedPod],
        config_hash: &str,
    ) -> Result<ReconciliationResult> {
        let diff = self
            .diff_engine
            .compute_diff(self.config, Some(state), observed);
        info!(
            "Diff: {} creates, {} updates, {} deletes, {} unchanged",
            diff.creates, diff.updates, diff.deletes, diff.unchanged
        );
        if !diff.has_changes() {
            info!("No changes required - state is converged");
            return Ok(ReconciliationResult {
                success: true,
                created: 0,
                updated: 0,
                deleted: 0,
                unchanged: diff.unchanged,
                errors: vec![],
                final_state: None,
            });
        }
        let plan = DeploymentPlan::from_diff(&diff, self.config, config_hash);
        if !plan.passes_guardrails {
            return Err(HalldyllError::Reconcile(ReconcileError::Aborted {
                reason: format!(
                    "Plan violates guardrails: {}",
                    plan.guardrail_violations.join(", ")
                ),
            }));
        }
        let executor = PlanExecutor::new(self.provisioner, &self.config.project)
            .with_continue_on_error(true);
        let execution_result = executor.execute(&plan, state).await?;
        let mut errors: Vec<String> = execution_result
            .results
            .iter()
            .filter(|r| !r.success)
            .filter_map(|r| r.error.clone())
            .collect();
        if !execution_result.success {
            // Put the summary first so it precedes the per-action messages.
            errors.insert(
                0,
                format!(
                    "{} of {} actions failed",
                    execution_result.failed, execution_result.total_executed
                ),
            );
        }
        Ok(ReconciliationResult {
            success: execution_result.success,
            created: diff.creates,
            updated: diff.updates,
            deleted: diff.deletes,
            unchanged: diff.unchanged,
            errors,
            final_state: None,
        })
    }

    /// Compares config, stored state and observed pods without applying anything.
    ///
    /// `drifted_resources` lists the names of all diff entries whose type is
    /// `Drift`, `Update`, `Create` or `Delete`; `has_drift` mirrors the diff's
    /// `has_changes()`.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the state or listing the project's pods fails.
    pub async fn check_drift(&self) -> Result<DriftReport> {
        info!(
            "Checking for drift in {}/{}",
            self.config.project.name, self.config.project.environment
        );
        let state = self.state_store.load().await?;
        let observed = self
            .observer
            .list_project_pods(&self.config.project.name, &self.config.project.environment)
            .await?;
        let diff = self
            .diff_engine
            .compute_diff(self.config, state.as_ref(), &observed);
        let drifted_resources: Vec<String> = diff
            .diffs
            .iter()
            .filter(|d| {
                matches!(
                    d.diff_type,
                    crate::planner::DiffType::Drift
                        | crate::planner::DiffType::Update
                        | crate::planner::DiffType::Create
                        | crate::planner::DiffType::Delete
                )
            })
            .map(|d| d.name.clone())
            .collect();
        Ok(DriftReport {
            has_drift: diff.has_changes(),
            drifted_resources,
            total_resources: self.config.pods.len(),
            observed_count: observed.len(),
        })
    }
}
/// Summary of the difference between the desired configuration and what exists,
/// as produced by `Reconciler::check_drift`.
#[derive(Debug, serde::Serialize)]
pub struct DriftReport {
/// `true` when the computed diff reports any changes.
pub has_drift: bool,
/// Names of diff entries classified as drift, update, create or delete.
pub drifted_resources: Vec<String>,
/// Number of pods declared in the configuration.
pub total_resources: usize,
/// Number of pods returned by the pod listing.
pub observed_count: usize,
}
impl DriftReport {
    /// Reports whether the deployment is converged, i.e. the report carries no drift.
    #[must_use]
    pub const fn is_converged(&self) -> bool {
        let Self { has_drift, .. } = self;
        !*has_drift
    }
}
impl std::fmt::Display for DriftReport {
    /// Writes either a single "no drift" sentence (without a trailing newline)
    /// or a "Drift detected:" header followed by one line per drifted resource.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if !self.has_drift {
            return write!(f, "No drift detected - state is converged");
        }
        writeln!(f, "Drift detected:")?;
        self.drifted_resources
            .iter()
            .try_for_each(|resource| writeln!(f, " - {resource}"))
    }
}
impl std::fmt::Display for ReconciliationResult {
    /// Writes a status header, the four counters, and — only when there are
    /// any — an "Errors:" section with one line per message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let status = if self.success { "successful" } else { "failed" };
        writeln!(f, "Reconciliation {status}:")?;
        writeln!(f, " Created: {}", self.created)?;
        writeln!(f, " Updated: {}", self.updated)?;
        writeln!(f, " Deleted: {}", self.deleted)?;
        writeln!(f, " Unchanged: {}", self.unchanged)?;
        if self.errors.is_empty() {
            return Ok(());
        }
        writeln!(f, " Errors:")?;
        self.errors
            .iter()
            .try_for_each(|error| writeln!(f, " - {error}"))
    }
}