1use crate::error::StorageError;
4use crate::pool::Storage;
5use chrono::{DateTime, Utc};
6use evolve_core::ids::{ConfigId, ExperimentId, ProjectId};
7use uuid::Uuid;
8
/// Lifecycle state of an experiment, persisted as text in the `status` column.
///
/// The textual form of each variant is defined by `ExperimentStatus::as_str`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExperimentStatus {
    /// Stored as `"running"`; every other status is treated as completed by
    /// `ExperimentRepo::list_completed`.
    Running,
    /// Stored as `"promoted"`.
    Promoted,
    /// Stored as `"aborted"`.
    Aborted,
    /// Stored as `"held"`.
    Held,
}
21
22impl ExperimentStatus {
23 fn as_str(self) -> &'static str {
24 match self {
25 Self::Running => "running",
26 Self::Promoted => "promoted",
27 Self::Aborted => "aborted",
28 Self::Held => "held",
29 }
30 }
31
32 fn from_str(s: &str) -> Result<Self, StorageError> {
33 Ok(match s {
34 "running" => Self::Running,
35 "promoted" => Self::Promoted,
36 "aborted" => Self::Aborted,
37 "held" => Self::Held,
38 other => {
39 return Err(StorageError::Sqlx(sqlx::Error::Decode(
40 format!("unknown experiment status {other:?}").into(),
41 )));
42 }
43 })
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct Experiment {
50 pub id: ExperimentId,
52 pub project_id: ProjectId,
54 pub champion_config_id: ConfigId,
56 pub challenger_config_id: ConfigId,
58 pub status: ExperimentStatus,
60 pub traffic_share: f64,
62 pub started_at: DateTime<Utc>,
64 pub decided_at: Option<DateTime<Utc>>,
66 pub decision_posterior: Option<f64>,
68}
69
/// Raw tuple shape fetched by the `SELECT` statements in this file.
///
/// Slot order must match the column order of those queries; the tuple is
/// decoded into an `Experiment` by `row_to_experiment`.
type ExperimentRow = (
    String,         // id
    String,         // project_id
    String,         // champion_config_id
    String,         // challenger_config_id
    String,         // status
    f64,            // traffic_share
    String,         // started_at (RFC 3339 text)
    Option<String>, // decided_at (RFC 3339 text, nullable)
    Option<f64>,    // decision_posterior (nullable)
);
83
84#[derive(Debug, Clone)]
86pub struct ExperimentRepo<'a> {
87 storage: &'a Storage,
88}
89
90impl<'a> ExperimentRepo<'a> {
91 pub fn new(storage: &'a Storage) -> Self {
93 Self { storage }
94 }
95
96 pub async fn insert(&self, exp: &Experiment) -> Result<(), StorageError> {
98 sqlx::query(
99 "INSERT INTO experiments
100 (id, project_id, champion_config_id, challenger_config_id,
101 status, traffic_share, started_at, decided_at, decision_posterior)
102 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
103 )
104 .bind(exp.id.to_string())
105 .bind(exp.project_id.to_string())
106 .bind(exp.champion_config_id.to_string())
107 .bind(exp.challenger_config_id.to_string())
108 .bind(exp.status.as_str())
109 .bind(exp.traffic_share)
110 .bind(exp.started_at.to_rfc3339())
111 .bind(exp.decided_at.map(|d| d.to_rfc3339()))
112 .bind(exp.decision_posterior)
113 .execute(self.storage.pool())
114 .await?;
115 Ok(())
116 }
117
118 pub async fn get_running_for_project(
120 &self,
121 project_id: ProjectId,
122 ) -> Result<Option<Experiment>, StorageError> {
123 let row: Option<ExperimentRow> = sqlx::query_as(
124 "SELECT id, project_id, champion_config_id, challenger_config_id,
125 status, traffic_share, started_at, decided_at, decision_posterior
126 FROM experiments
127 WHERE project_id = ? AND status = 'running'
128 LIMIT 1",
129 )
130 .bind(project_id.to_string())
131 .fetch_optional(self.storage.pool())
132 .await?;
133 row.map(row_to_experiment).transpose()
134 }
135
136 pub async fn list_completed(
138 &self,
139 project_id: ProjectId,
140 ) -> Result<Vec<Experiment>, StorageError> {
141 let rows: Vec<ExperimentRow> = sqlx::query_as(
142 "SELECT id, project_id, champion_config_id, challenger_config_id,
143 status, traffic_share, started_at, decided_at, decision_posterior
144 FROM experiments
145 WHERE project_id = ? AND status != 'running'
146 ORDER BY COALESCE(decided_at, started_at) DESC",
147 )
148 .bind(project_id.to_string())
149 .fetch_all(self.storage.pool())
150 .await?;
151 rows.into_iter().map(row_to_experiment).collect()
152 }
153
154 pub async fn update_status(
156 &self,
157 id: ExperimentId,
158 status: ExperimentStatus,
159 decided_at: Option<DateTime<Utc>>,
160 decision_posterior: Option<f64>,
161 ) -> Result<(), StorageError> {
162 sqlx::query(
163 "UPDATE experiments
164 SET status = ?, decided_at = ?, decision_posterior = ?
165 WHERE id = ?",
166 )
167 .bind(status.as_str())
168 .bind(decided_at.map(|d| d.to_rfc3339()))
169 .bind(decision_posterior)
170 .bind(id.to_string())
171 .execute(self.storage.pool())
172 .await?;
173 Ok(())
174 }
175}
176
177fn row_to_experiment(
178 (
179 id,
180 project_id,
181 champion,
182 challenger,
183 status,
184 traffic_share,
185 started_at,
186 decided_at,
187 posterior,
188 ): ExperimentRow,
189) -> Result<Experiment, StorageError> {
190 Ok(Experiment {
191 id: ExperimentId::from_uuid(Uuid::parse_str(&id)?),
192 project_id: ProjectId::from_uuid(Uuid::parse_str(&project_id)?),
193 champion_config_id: ConfigId::from_uuid(Uuid::parse_str(&champion)?),
194 challenger_config_id: ConfigId::from_uuid(Uuid::parse_str(&challenger)?),
195 status: ExperimentStatus::from_str(&status)?,
196 traffic_share,
197 started_at: DateTime::parse_from_rfc3339(&started_at)
198 .map_err(|e| StorageError::Sqlx(sqlx::Error::Decode(Box::new(e))))?
199 .with_timezone(&Utc),
200 decided_at: decided_at
201 .map(|s| {
202 DateTime::parse_from_rfc3339(&s)
203 .map(|d| d.with_timezone(&Utc))
204 .map_err(|e| StorageError::Sqlx(sqlx::Error::Decode(Box::new(e))))
205 })
206 .transpose()?,
207 decision_posterior: posterior,
208 })
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_configs::{AgentConfigRepo, AgentConfigRow, ConfigRole};
    use crate::projects::{Project, ProjectRepo};
    use evolve_core::agent_config::AgentConfig;
    use evolve_core::ids::AdapterId;

    /// Builds an in-memory store containing one project plus a champion and a
    /// challenger config for it.
    ///
    /// Returns the storage, the project id, the champion config id and the
    /// challenger config id, in that order.
    async fn seeded() -> (Storage, ProjectId, ConfigId, ConfigId) {
        let storage = Storage::in_memory_for_tests().await.unwrap();
        let pid = ProjectId::new();
        ProjectRepo::new(&storage)
            .insert(&Project {
                id: pid,
                adapter_id: AdapterId::new("claude-code"),
                root_path: "/tmp/experiments-test".into(),
                name: "x".into(),
                created_at: Utc::now(),
                champion_config_id: None,
            })
            .await
            .unwrap();

        let champion = AgentConfigRow {
            id: ConfigId::new(),
            project_id: pid,
            adapter_id: AdapterId::new("claude-code"),
            role: ConfigRole::Champion,
            fingerprint: 1,
            payload: AgentConfig::default_for("claude-code"),
            created_at: Utc::now(),
        };
        let challenger = AgentConfigRow {
            id: ConfigId::new(),
            role: ConfigRole::Challenger,
            fingerprint: 2,
            ..champion.clone()
        };
        let cfg = AgentConfigRepo::new(&storage);
        cfg.insert(&champion).await.unwrap();
        cfg.insert(&challenger).await.unwrap();

        (storage, pid, champion.id, challenger.id)
    }

    /// Shared fixture: a fresh, undecided `Running` experiment with a 5%
    /// traffic share, started now.
    fn running_experiment(
        project_id: ProjectId,
        champion: ConfigId,
        challenger: ConfigId,
    ) -> Experiment {
        Experiment {
            id: ExperimentId::new(),
            project_id,
            champion_config_id: champion,
            challenger_config_id: challenger,
            status: ExperimentStatus::Running,
            traffic_share: 0.05,
            started_at: Utc::now(),
            decided_at: None,
            decision_posterior: None,
        }
    }

    #[tokio::test]
    async fn insert_and_get_running_returns_the_row() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);
        let exp = running_experiment(pid, champ, chall);
        repo.insert(&exp).await.unwrap();
        let back = repo.get_running_for_project(pid).await.unwrap().unwrap();
        assert_eq!(back.id, exp.id);
    }

    #[tokio::test]
    async fn only_one_running_experiment_per_project_is_allowed() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);

        let first = running_experiment(pid, champ, chall);
        // Same project and configs, different id: only the second running
        // row for the project should be rejected.
        let second = Experiment {
            id: ExperimentId::new(),
            ..first.clone()
        };
        repo.insert(&first).await.unwrap();
        let err = repo.insert(&second).await.unwrap_err();
        assert!(matches!(err, StorageError::Sqlx(sqlx::Error::Database(_))));
    }

    #[tokio::test]
    async fn update_status_to_promoted_sets_decided_at_and_posterior() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);
        let exp = running_experiment(pid, champ, chall);
        repo.insert(&exp).await.unwrap();
        let decided = Utc::now();
        repo.update_status(
            exp.id,
            ExperimentStatus::Promoted,
            Some(decided),
            Some(0.97),
        )
        .await
        .unwrap();

        let completed = repo.list_completed(pid).await.unwrap();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].status, ExperimentStatus::Promoted);
        // The timestamp must survive the RFC 3339 round trip through storage;
        // the test name promises this but it was previously unchecked.
        assert_eq!(completed[0].decided_at, Some(decided));
        assert_eq!(completed[0].decision_posterior, Some(0.97));
    }

    #[tokio::test]
    async fn list_completed_excludes_running() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);
        let exp = running_experiment(pid, champ, chall);
        repo.insert(&exp).await.unwrap();
        assert!(repo.list_completed(pid).await.unwrap().is_empty());
    }
}