1use anyhow::{Context, Result, anyhow};
7use chrono::Utc;
8use evolve_adapters::AdapterRegistry;
9use evolve_core::ids::{ConfigId, ExperimentId, ProjectId, SessionId};
10use evolve_core::promotion::{
11 AggregationConfig, Decision, PromotionConfig, SignalInput, SignalKind as PromSignalKind,
12 aggregate, promotion_decision,
13};
14use evolve_llm::LlmClient;
15use evolve_mutators::{MutationCtx, MutatorPicker};
16
17pub fn picker_for_environment(has_llm: bool) -> MutatorPicker {
22 if has_llm {
23 MutatorPicker::default()
24 } else {
25 MutatorPicker::without_llm()
26 }
27}
28use evolve_storage::Storage;
29use evolve_storage::agent_configs::{AgentConfigRepo, AgentConfigRow, ConfigRole};
30use evolve_storage::experiments::{Experiment, ExperimentRepo, ExperimentStatus};
31use evolve_storage::projects::{Project, ProjectRepo};
32use evolve_storage::sessions::SessionRepo;
33use evolve_storage::signals::{SignalKind as StorageSignalKind, SignalRepo};
34use rand::SeedableRng;
35use rand_chacha::ChaCha8Rng;
36use std::path::Path;
37
38fn map_kind(k: StorageSignalKind) -> PromSignalKind {
40 match k {
41 StorageSignalKind::Explicit => PromSignalKind::Explicit,
42 StorageSignalKind::Implicit => PromSignalKind::Implicit,
43 }
44}
45
46pub async fn collect_scores_for_config(storage: &Storage, config_id: ConfigId) -> Result<Vec<f64>> {
49 let signals = SignalRepo::new(storage).list_for_config(config_id).await?;
50 let mut by_session: std::collections::HashMap<SessionId, Vec<SignalInput>> = Default::default();
51 for sig in signals {
52 by_session
53 .entry(sig.session_id)
54 .or_default()
55 .push(SignalInput {
56 kind: map_kind(sig.kind),
57 value: sig.value,
58 });
59 }
60 let cfg = AggregationConfig::default();
61 Ok(by_session
62 .values()
63 .map(|sigs| aggregate(sigs, &cfg))
64 .collect())
65}
66
67pub async fn evaluate_promotion(
70 storage: &Storage,
71 project_id: ProjectId,
72) -> Result<Option<(Experiment, Decision)>> {
73 let exp = match ExperimentRepo::new(storage)
74 .get_running_for_project(project_id)
75 .await?
76 {
77 Some(e) => e,
78 None => return Ok(None),
79 };
80 let champion_scores = collect_scores_for_config(storage, exp.champion_config_id).await?;
81 let challenger_scores = collect_scores_for_config(storage, exp.challenger_config_id).await?;
82
83 let seed = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
84 let mut rng = ChaCha8Rng::seed_from_u64(seed);
85 let cfg = PromotionConfig::default();
86 let decision = promotion_decision(&champion_scores, &challenger_scores, &cfg, &mut rng);
87 Ok(Some((exp, decision)))
88}
89
90pub async fn promote_challenger(
93 storage: &Storage,
94 registry: &AdapterRegistry,
95 project: &Project,
96 experiment: &Experiment,
97 posterior: f64,
98) -> Result<()> {
99 let now = Utc::now();
100 ExperimentRepo::new(storage)
101 .update_status(
102 experiment.id,
103 ExperimentStatus::Promoted,
104 Some(now),
105 Some(posterior),
106 )
107 .await?;
108 ProjectRepo::new(storage)
109 .set_champion(project.id, experiment.challenger_config_id)
110 .await?;
111
112 let cfg_row = AgentConfigRepo::new(storage)
113 .get_by_id(experiment.challenger_config_id)
114 .await?
115 .ok_or_else(|| anyhow!("challenger config row missing"))?;
116 if let Some(adapter) = registry.get(project.adapter_id.as_str()) {
117 let _ = adapter
119 .apply_config(Path::new(&project.root_path), &cfg_row.payload)
120 .await;
121 }
122 Ok(())
123}
124
125pub async fn generate_challenger_with_picker(
135 storage: &Storage,
136 registry: &AdapterRegistry,
137 llm: &dyn LlmClient,
138 picker: &MutatorPicker,
139 project: &Project,
140 rng: &mut ChaCha8Rng,
141) -> Result<(ConfigId, ExperimentId)> {
142 let champion_id = project
143 .champion_config_id
144 .ok_or_else(|| anyhow!("project has no champion"))?;
145 let champion_row = AgentConfigRepo::new(storage)
146 .get_by_id(champion_id)
147 .await?
148 .ok_or_else(|| anyhow!("champion config row missing"))?;
149
150 let mutator = picker.pick(rng);
151 let mut ctx = MutationCtx { llm, rng };
152 let challenger_payload = mutator
153 .mutate(&champion_row.payload, &mut ctx)
154 .await
155 .context("mutator failed")?;
156
157 let challenger_id = ConfigId::new();
158 AgentConfigRepo::new(storage)
159 .insert(&AgentConfigRow {
160 id: challenger_id,
161 project_id: project.id,
162 adapter_id: champion_row.adapter_id.clone(),
163 role: ConfigRole::Challenger,
164 fingerprint: challenger_payload.fingerprint(),
165 payload: challenger_payload.clone(),
166 created_at: Utc::now(),
167 })
168 .await?;
169
170 let experiment_id = ExperimentId::new();
171 ExperimentRepo::new(storage)
172 .insert(&Experiment {
173 id: experiment_id,
174 project_id: project.id,
175 champion_config_id: champion_id,
176 challenger_config_id: challenger_id,
177 status: ExperimentStatus::Running,
178 traffic_share: 0.5,
179 started_at: Utc::now(),
180 decided_at: None,
181 decision_posterior: None,
182 })
183 .await?;
184
185 if let Some(adapter) = registry.get(project.adapter_id.as_str()) {
186 adapter
187 .apply_config(Path::new(&project.root_path), &challenger_payload)
188 .await
189 .context("adapter apply_config failed")?;
190 }
191
192 Ok((challenger_id, experiment_id))
193}
194
195pub async fn generate_challenger(
199 storage: &Storage,
200 registry: &AdapterRegistry,
201 llm: &dyn LlmClient,
202 project: &Project,
203 rng: &mut ChaCha8Rng,
204) -> Result<(ConfigId, ExperimentId)> {
205 let picker = MutatorPicker::default();
206 generate_challenger_with_picker(storage, registry, llm, &picker, project, rng).await
207}
208
209pub async fn should_evolve(
213 storage: &Storage,
214 project_id: ProjectId,
215 threshold_sessions: u32,
216) -> Result<bool> {
217 if ExperimentRepo::new(storage)
218 .get_running_for_project(project_id)
219 .await?
220 .is_some()
221 {
222 return Ok(false);
223 }
224 let sessions = SessionRepo::new(storage)
225 .list_recent(project_id, threshold_sessions)
226 .await?;
227 Ok(sessions.len() as u32 >= threshold_sessions)
228}
229
230pub async fn resolve_active_deployment(
239 storage: &Storage,
240 project: &Project,
241 home: &std::path::Path,
242) -> Result<(
243 evolve_storage::sessions::SessionVariant,
244 ConfigId,
245 Option<ExperimentId>,
246)> {
247 if let Some(state) = read_deployment_state(home, project.id).await? {
248 return Ok((state.variant, state.config_id, state.experiment_id));
249 }
250 if let Some(exp) = ExperimentRepo::new(storage)
251 .get_running_for_project(project.id)
252 .await?
253 {
254 return Ok((
255 evolve_storage::sessions::SessionVariant::Challenger,
256 exp.challenger_config_id,
257 Some(exp.id),
258 ));
259 }
260 let champ = project
261 .champion_config_id
262 .ok_or_else(|| anyhow!("project has no champion"))?;
263 Ok((
264 evolve_storage::sessions::SessionVariant::Champion,
265 champ,
266 None,
267 ))
268}
269
270#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
272pub struct DeploymentState {
273 pub project_id: ProjectId,
275 pub variant: evolve_storage::sessions::SessionVariant,
277 pub config_id: ConfigId,
279 pub experiment_id: Option<ExperimentId>,
281 pub deployed_at: String,
283}
284
285fn deployment_state_path(home: &std::path::Path, project_id: ProjectId) -> std::path::PathBuf {
286 home.join("state").join(format!("{}.json", project_id))
287}
288
289pub async fn read_deployment_state(
291 home: &std::path::Path,
292 project_id: ProjectId,
293) -> Result<Option<DeploymentState>> {
294 let p = deployment_state_path(home, project_id);
295 if !p.is_file() {
296 return Ok(None);
297 }
298 let raw = tokio::fs::read_to_string(&p).await?;
299 let state: DeploymentState = match serde_json::from_str(&raw) {
300 Ok(s) => s,
301 Err(_) => return Ok(None),
302 };
303 Ok(Some(state))
304}
305
306pub async fn write_deployment_state(home: &std::path::Path, state: &DeploymentState) -> Result<()> {
308 let p = deployment_state_path(home, state.project_id);
309 if let Some(parent) = p.parent() {
310 tokio::fs::create_dir_all(parent).await?;
311 }
312 tokio::fs::write(&p, serde_json::to_string_pretty(state)?).await?;
313 Ok(())
314}
315
316pub async fn handle_session_start(
320 storage: &Storage,
321 registry: &AdapterRegistry,
322 project: &Project,
323 home: &std::path::Path,
324 rng: &mut ChaCha8Rng,
325) -> Result<DeploymentState> {
326 use rand::Rng;
327
328 let exp = ExperimentRepo::new(storage)
329 .get_running_for_project(project.id)
330 .await?;
331
332 let (variant, config_id, experiment_id) = if let Some(e) = exp {
333 let to_challenger: bool = rng.gen_bool(e.traffic_share);
334 if to_challenger {
335 (
336 evolve_storage::sessions::SessionVariant::Challenger,
337 e.challenger_config_id,
338 Some(e.id),
339 )
340 } else {
341 (
342 evolve_storage::sessions::SessionVariant::Champion,
343 e.champion_config_id,
344 Some(e.id),
345 )
346 }
347 } else {
348 (
349 evolve_storage::sessions::SessionVariant::Champion,
350 project
351 .champion_config_id
352 .ok_or_else(|| anyhow!("project has no champion"))?,
353 None,
354 )
355 };
356
357 let cfg_row = AgentConfigRepo::new(storage)
358 .get_by_id(config_id)
359 .await?
360 .ok_or_else(|| anyhow!("config row missing for deployment"))?;
361 if let Some(adapter) = registry.get(project.adapter_id.as_str()) {
362 adapter
363 .apply_config(Path::new(&project.root_path), &cfg_row.payload)
364 .await
365 .context("adapter apply_config in session start")?;
366 }
367
368 let state = DeploymentState {
369 project_id: project.id,
370 variant,
371 config_id,
372 experiment_id,
373 deployed_at: Utc::now().to_rfc3339(),
374 };
375 write_deployment_state(home, &state).await?;
376 Ok(state)
377}