use std::path::PathBuf;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::EvolutionEvent;
use super::config::EvolveConfig;
use super::egl::is_score_converged;
use super::engine::EvolutionEngine;
use super::history::EvolutionHistory;
use super::observer::Observer;
use super::trial::TrialRunner;
use super::types::{CycleRecord, EvolutionResult};
use super::versioning::VersionControl;
use super::workspace::AgentWorkspace;
/// Drives repeated evolution cycles: run a batch of tasks, score the
/// observations, let an `EvolutionEngine` take a step, snapshot the workspace
/// through version control, and report progress over an event channel.
pub struct EvolutionLoop {
/// Loop settings; `run` reads `max_cycles`, `batch_size`, `egl_window` and
/// `egl_threshold` from it.
config: EvolveConfig,
/// Workspace built from the `workspace_root` given to `new`; its
/// `evolution_dir` is where `history.jsonl` and `metrics.json` are written.
workspace: AgentWorkspace,
/// Recorded cycles, bundled with the observer and version-control handles
/// created in `new`.
history: EvolutionHistory,
/// Supplies task batches, runs them, and gives access to the agent.
trial: TrialRunner,
/// Progress event sink; send errors are ignored by this type.
event_tx: mpsc::UnboundedSender<EvolutionEvent>,
}
impl EvolutionLoop {
/// Builds an evolution loop rooted at `workspace_root`.
///
/// Creates the [`AgentWorkspace`], makes sure its directories exist, and wires
/// an [`Observer`] (on the workspace's `evolution_dir`) and a
/// [`VersionControl`] (on `workspace_root`) into a fresh [`EvolutionHistory`].
///
/// # Errors
///
/// Returns the error from `AgentWorkspace::ensure_dirs` if the workspace
/// directories cannot be prepared.
pub fn new(
    workspace_root: PathBuf,
    trial: TrialRunner,
    config: EvolveConfig,
    event_tx: mpsc::UnboundedSender<EvolutionEvent>,
) -> Result<Self> {
    let workspace = AgentWorkspace::new(&workspace_root);
    workspace.ensure_dirs()?;

    // Borrow the directory straight from the workspace; the observer only
    // needs a reference, so no owned copy of the path is required here.
    let history = EvolutionHistory::new(
        Observer::new(&workspace.evolution_dir),
        VersionControl::new(&workspace_root),
    );

    Ok(Self {
        config,
        workspace,
        history,
        trial,
        event_tx,
    })
}
pub async fn run(
&mut self,
engine: &mut dyn EvolutionEngine,
cancel: CancellationToken,
) -> Result<EvolutionResult> {
self.history.versioning().init()?;
let max_cycles = self.config.max_cycles;
let mut score_history: Vec<f64> = Vec::new();
for cycle in 1..=max_cycles {
if cancel.is_cancelled() {
tracing::info!("Evolution cancelled at cycle {cycle}");
break;
}
tracing::info!(cycle, max_cycles, "=== Evolution Cycle ===");
let _ = self
.event_tx
.send(EvolutionEvent::CycleStarted { cycle, max_cycles });
let tasks = self
.trial
.get_tasks("train", self.config.batch_size)
.await?;
let _ = self.event_tx.send(EvolutionEvent::SolveStarted {
task_count: tasks.len(),
});
let observations = self.trial.run_tasks(&tasks).await;
self.trial.agent().export_to_fs().await?;
let batch_path = self.history.observer_mut().collect(&observations)?;
let batch_name = batch_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
let cycle_score = if observations.is_empty() {
0.0
} else {
observations.iter().map(|o| o.feedback.score).sum::<f64>()
/ observations.len() as f64
};
score_history.push(cycle_score);
tracing::info!(cycle, score = cycle_score, "Cycle score");
let _ = self
.event_tx
.send(EvolutionEvent::SolveCompleted { score: cycle_score });
self.history.versioning().commit(
&format!("pre-evo-{cycle}: score={cycle_score:.3}"),
Some(&format!("pre-evo-{cycle}")),
)?;
let engine_name = engine.name().to_string();
let _ = self.event_tx.send(EvolutionEvent::EngineStepStarted {
engine_name: engine_name.clone(),
});
let step_result = match engine
.step(&self.workspace, &observations, &self.history, &self.trial)
.await
{
Ok(r) => r,
Err(e) => {
let msg = format!("Engine step failed: {e}");
tracing::error!("{msg}");
let _ = self.event_tx.send(EvolutionEvent::Error(msg));
return Err(e);
}
};
let _ = self.event_tx.send(EvolutionEvent::EngineStepCompleted {
mutated: step_result.mutated,
summary: step_result.summary.clone(),
});
let tag = format!("evo-{cycle}");
let msg = if step_result.mutated {
format!("evo-{cycle}: {}", step_result.summary)
} else {
format!("evo-{cycle}: no mutation")
};
self.history.versioning().commit(&msg, Some(&tag))?;
if step_result.mutated {
let from = format!("pre-evo-{cycle}");
let to = format!("evo-{cycle}");
if let Ok(diff) = self.history.get_workspace_diff(&from, &to) {
if let Ok(stat) = self.history.versioning().get_diff_stat(&from, &to) {
tracing::debug!(cycle, diff_len = diff.len(), stat = %stat.trim(), "Workspace diff for cycle");
} else {
tracing::debug!(cycle, diff_len = diff.len(), "Workspace diff for cycle");
}
}
let best_score = score_history
.iter()
.copied()
.fold(f64::NEG_INFINITY, f64::max);
if cycle_score < best_score - 0.1
&& let Some(best_idx) = score_history.iter().position(|&s| s == best_score)
{
let best_tag = format!("evo-{}", best_idx + 1);
if let Err(e) = self.history.versioning().rollback_to_tag(&best_tag) {
tracing::warn!(cycle, tag = %best_tag, error = %e, "Rollback failed");
} else {
tracing::info!(cycle, tag = %best_tag, "Rolled back to best-scoring tag");
}
}
let evo_tag = format!("evo-{cycle}");
match self.create_eval_copy(&evo_tag, cycle) {
Ok(eval_path) => {
tracing::debug!(cycle, tag = %evo_tag, path = %eval_path.display(), "Eval worktree created");
if let Err(e) = self.cleanup_eval_copy(&eval_path) {
tracing::debug!(cycle, error = %e, "Eval worktree cleanup (non-fatal)");
}
}
Err(e) => {
tracing::debug!(cycle, error = %e, "Eval worktree not created (non-fatal — no git repo)");
}
}
}
let record = CycleRecord {
cycle,
score: cycle_score,
mutated: step_result.mutated,
engine_name: engine_name.clone(),
summary: step_result.summary.clone(),
observation_batch: batch_name,
metadata: step_result.metadata.clone(),
};
self.history.record_cycle(record);
tracing::debug!(
cycle,
total_cycles = self.history.latest_cycle(),
all_cycles = self.history.cycles().len(),
"Evolution cycle recorded"
);
self.trial.agent().reload_from_fs().await?;
engine.on_cycle_end(step_result.mutated, cycle_score);
let history_path = self.workspace.evolution_dir.join("history.jsonl");
let history_data = serde_json::json!({
"cycle": cycle,
"score": cycle_score,
"mutated": step_result.mutated,
"timestamp": chrono::Utc::now().to_rfc3339(),
});
let _ = tokio::task::spawn_blocking(move || {
use std::io::Write;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&history_path)
.map_err(|e| e.to_string())?;
serde_json::to_writer(&mut file, &history_data).map_err(|e| e.to_string())?;
writeln!(file).map_err(|e| e.to_string())?;
Ok::<(), String>(())
})
.await;
let metrics_path = self.workspace.evolution_dir.join("metrics.json");
let metrics_data = serde_json::json!({
"cycles_completed": score_history.len(),
"latest_score": score_history.last().unwrap_or(&0.0),
"best_score": score_history.iter().cloned().fold(0.0_f64, f64::max),
"avg_score": if score_history.is_empty() { 0.0 } else {
score_history.iter().sum::<f64>() / score_history.len() as f64
},
});
let _ = tokio::task::spawn_blocking(move || {
let json =
serde_json::to_string_pretty(&metrics_data).map_err(|e| e.to_string())?;
std::fs::write(&metrics_path, json).map_err(|e| e.to_string())?;
Ok::<(), String>(())
})
.await;
let _ = self.event_tx.send(EvolutionEvent::CycleCompleted {
cycle,
score: cycle_score,
mutated: step_result.mutated,
});
if is_score_converged(
&score_history,
self.config.egl_window,
self.config.egl_threshold,
) {
tracing::info!(cycle, "Score converged");
let result = EvolutionResult {
cycles_completed: cycle,
final_score: cycle_score,
score_history,
converged: true,
..Default::default()
};
let _ = self.event_tx.send(EvolutionEvent::Converged {
cycle,
final_score: cycle_score,
});
let _ = self.event_tx.send(EvolutionEvent::Done(result.clone()));
return Ok(result);
}
}
let final_score = score_history.last().copied().unwrap_or(0.0);
let result = EvolutionResult {
cycles_completed: score_history.len() as u32,
final_score,
score_history,
converged: false,
..Default::default()
};
let _ = self.event_tx.send(EvolutionEvent::Done(result.clone()));
Ok(result)
}
/// Checks out `reference` into a directory named
/// `collet-evo-{cycle}-{reference}` under the system temp dir and returns
/// that directory's path.
///
/// # Errors
///
/// Returns the error from `checkout_copy` when the copy cannot be created.
fn create_eval_copy(&self, reference: &str, cycle: u32) -> Result<std::path::PathBuf> {
    let dir_name = format!("collet-evo-{cycle}-{reference}");
    let target = std::env::temp_dir().join(dir_name);
    let vcs = self.history.versioning();
    vcs.checkout_copy(reference, &target)?;
    Ok(target)
}
/// Removes an evaluation copy previously created at `dest`, delegating to
/// the version-control handle's `remove_copy`.
fn cleanup_eval_copy(&self, dest: &std::path::Path) -> Result<()> {
    let vcs = self.history.versioning();
    vcs.remove_copy(dest)
}
}