Skip to main content

evolve_storage/
experiments.rs

1//! Repository for the `experiments` table.
2
3use crate::error::StorageError;
4use crate::pool::Storage;
5use chrono::{DateTime, Utc};
6use evolve_core::ids::{ConfigId, ExperimentId, ProjectId};
7use uuid::Uuid;
8
/// Where an experiment currently stands in its lifecycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExperimentStatus {
    /// Live: the experiment is actively collecting signals.
    Running,
    /// The challenger won and was promoted to champion.
    Promoted,
    /// Stopped without a decision: manually cancelled or superseded.
    Aborted,
    /// A decision was reached, but the champion was kept.
    Held,
}
21
22impl ExperimentStatus {
23    fn as_str(self) -> &'static str {
24        match self {
25            Self::Running => "running",
26            Self::Promoted => "promoted",
27            Self::Aborted => "aborted",
28            Self::Held => "held",
29        }
30    }
31
32    fn from_str(s: &str) -> Result<Self, StorageError> {
33        Ok(match s {
34            "running" => Self::Running,
35            "promoted" => Self::Promoted,
36            "aborted" => Self::Aborted,
37            "held" => Self::Held,
38            other => {
39                return Err(StorageError::Sqlx(sqlx::Error::Decode(
40                    format!("unknown experiment status {other:?}").into(),
41                )));
42            }
43        })
44    }
45}
46
47/// One champion-vs-challenger experiment row.
48#[derive(Debug, Clone)]
49pub struct Experiment {
50    /// Experiment identity.
51    pub id: ExperimentId,
52    /// Owning project.
53    pub project_id: ProjectId,
54    /// Champion config under test.
55    pub champion_config_id: ConfigId,
56    /// Challenger config under test.
57    pub challenger_config_id: ConfigId,
58    /// Lifecycle state.
59    pub status: ExperimentStatus,
60    /// Share of sessions routed to challenger (0..=1).
61    pub traffic_share: f64,
62    /// When the experiment started.
63    pub started_at: DateTime<Utc>,
64    /// When the decision was reached (only set for non-Running statuses).
65    pub decided_at: Option<DateTime<Utc>>,
66    /// P(challenger > champion) at decision time.
67    pub decision_posterior: Option<f64>,
68}
69
/// Undecoded tuple produced by the `SELECT`s on `experiments`, one element per
/// selected column in query order. Naming the shape once keeps the
/// `fetch_optional` / `fetch_all` call sites under `clippy::type_complexity`.
type ExperimentRow = (
    String,         // id
    String,         // project_id
    String,         // champion_config_id
    String,         // challenger_config_id
    String,         // status
    f64,            // traffic_share
    String,         // started_at
    Option<String>, // decided_at
    Option<f64>,    // decision_posterior
);
83
84/// Repository for `experiments`.
85#[derive(Debug, Clone)]
86pub struct ExperimentRepo<'a> {
87    storage: &'a Storage,
88}
89
90impl<'a> ExperimentRepo<'a> {
91    /// Construct a new repo borrowing the storage handle.
92    pub fn new(storage: &'a Storage) -> Self {
93        Self { storage }
94    }
95
96    /// Insert a new experiment.
97    pub async fn insert(&self, exp: &Experiment) -> Result<(), StorageError> {
98        sqlx::query(
99            "INSERT INTO experiments
100                (id, project_id, champion_config_id, challenger_config_id,
101                 status, traffic_share, started_at, decided_at, decision_posterior)
102             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
103        )
104        .bind(exp.id.to_string())
105        .bind(exp.project_id.to_string())
106        .bind(exp.champion_config_id.to_string())
107        .bind(exp.challenger_config_id.to_string())
108        .bind(exp.status.as_str())
109        .bind(exp.traffic_share)
110        .bind(exp.started_at.to_rfc3339())
111        .bind(exp.decided_at.map(|d| d.to_rfc3339()))
112        .bind(exp.decision_posterior)
113        .execute(self.storage.pool())
114        .await?;
115        Ok(())
116    }
117
118    /// Return the single running experiment for a project, if any.
119    pub async fn get_running_for_project(
120        &self,
121        project_id: ProjectId,
122    ) -> Result<Option<Experiment>, StorageError> {
123        let row: Option<ExperimentRow> = sqlx::query_as(
124            "SELECT id, project_id, champion_config_id, challenger_config_id,
125                    status, traffic_share, started_at, decided_at, decision_posterior
126             FROM experiments
127             WHERE project_id = ? AND status = 'running'
128             LIMIT 1",
129        )
130        .bind(project_id.to_string())
131        .fetch_optional(self.storage.pool())
132        .await?;
133        row.map(row_to_experiment).transpose()
134    }
135
136    /// List all non-Running experiments for a project, most recent first.
137    pub async fn list_completed(
138        &self,
139        project_id: ProjectId,
140    ) -> Result<Vec<Experiment>, StorageError> {
141        let rows: Vec<ExperimentRow> = sqlx::query_as(
142            "SELECT id, project_id, champion_config_id, challenger_config_id,
143                    status, traffic_share, started_at, decided_at, decision_posterior
144             FROM experiments
145             WHERE project_id = ? AND status != 'running'
146             ORDER BY COALESCE(decided_at, started_at) DESC",
147        )
148        .bind(project_id.to_string())
149        .fetch_all(self.storage.pool())
150        .await?;
151        rows.into_iter().map(row_to_experiment).collect()
152    }
153
154    /// Update the lifecycle state (and decision timestamp + posterior on terminal transitions).
155    pub async fn update_status(
156        &self,
157        id: ExperimentId,
158        status: ExperimentStatus,
159        decided_at: Option<DateTime<Utc>>,
160        decision_posterior: Option<f64>,
161    ) -> Result<(), StorageError> {
162        sqlx::query(
163            "UPDATE experiments
164             SET status = ?, decided_at = ?, decision_posterior = ?
165             WHERE id = ?",
166        )
167        .bind(status.as_str())
168        .bind(decided_at.map(|d| d.to_rfc3339()))
169        .bind(decision_posterior)
170        .bind(id.to_string())
171        .execute(self.storage.pool())
172        .await?;
173        Ok(())
174    }
175}
176
177fn row_to_experiment(
178    (
179        id,
180        project_id,
181        champion,
182        challenger,
183        status,
184        traffic_share,
185        started_at,
186        decided_at,
187        posterior,
188    ): ExperimentRow,
189) -> Result<Experiment, StorageError> {
190    Ok(Experiment {
191        id: ExperimentId::from_uuid(Uuid::parse_str(&id)?),
192        project_id: ProjectId::from_uuid(Uuid::parse_str(&project_id)?),
193        champion_config_id: ConfigId::from_uuid(Uuid::parse_str(&champion)?),
194        challenger_config_id: ConfigId::from_uuid(Uuid::parse_str(&challenger)?),
195        status: ExperimentStatus::from_str(&status)?,
196        traffic_share,
197        started_at: DateTime::parse_from_rfc3339(&started_at)
198            .map_err(|e| StorageError::Sqlx(sqlx::Error::Decode(Box::new(e))))?
199            .with_timezone(&Utc),
200        decided_at: decided_at
201            .map(|s| {
202                DateTime::parse_from_rfc3339(&s)
203                    .map(|d| d.with_timezone(&Utc))
204                    .map_err(|e| StorageError::Sqlx(sqlx::Error::Decode(Box::new(e))))
205            })
206            .transpose()?,
207        decision_posterior: posterior,
208    })
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_configs::{AgentConfigRepo, AgentConfigRow, ConfigRole};
    use crate::projects::{Project, ProjectRepo};
    use evolve_core::agent_config::AgentConfig;
    use evolve_core::ids::AdapterId;

    /// Build an in-memory storage holding one project plus a champion and a
    /// challenger config, and return the storage together with their ids.
    async fn seeded() -> (Storage, ProjectId, ConfigId, ConfigId) {
        let storage = Storage::in_memory_for_tests().await.unwrap();
        let pid = ProjectId::new();
        ProjectRepo::new(&storage)
            .insert(&Project {
                id: pid,
                adapter_id: AdapterId::new("claude-code"),
                root_path: "/tmp/experiments-test".into(),
                name: "x".into(),
                created_at: Utc::now(),
                champion_config_id: None,
            })
            .await
            .unwrap();

        let champion = AgentConfigRow {
            id: ConfigId::new(),
            project_id: pid,
            adapter_id: AdapterId::new("claude-code"),
            role: ConfigRole::Champion,
            fingerprint: 1,
            payload: AgentConfig::default_for("claude-code"),
            created_at: Utc::now(),
        };
        let challenger = AgentConfigRow {
            id: ConfigId::new(),
            role: ConfigRole::Challenger,
            fingerprint: 2,
            ..champion.clone()
        };
        let cfg = AgentConfigRepo::new(&storage);
        cfg.insert(&champion).await.unwrap();
        cfg.insert(&challenger).await.unwrap();

        (storage, pid, champion.id, challenger.id)
    }

    /// A fresh `Running` experiment with a new id, 5% challenger traffic and
    /// no decision recorded. Shared by every test so the fixture shape lives
    /// in one place.
    fn running_experiment(pid: ProjectId, champion: ConfigId, challenger: ConfigId) -> Experiment {
        Experiment {
            id: ExperimentId::new(),
            project_id: pid,
            champion_config_id: champion,
            challenger_config_id: challenger,
            status: ExperimentStatus::Running,
            traffic_share: 0.05,
            started_at: Utc::now(),
            decided_at: None,
            decision_posterior: None,
        }
    }

    #[tokio::test]
    async fn insert_and_get_running_returns_the_row() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);
        let exp = running_experiment(pid, champ, chall);
        repo.insert(&exp).await.unwrap();
        let back = repo.get_running_for_project(pid).await.unwrap().unwrap();
        assert_eq!(back.id, exp.id);
    }

    #[tokio::test]
    async fn only_one_running_experiment_per_project_is_allowed() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);

        let first = running_experiment(pid, champ, chall);
        // Same project and configs, different id: only the id is unique.
        let second = Experiment {
            id: ExperimentId::new(),
            ..first.clone()
        };
        repo.insert(&first).await.unwrap();
        let err = repo.insert(&second).await.unwrap_err();
        assert!(matches!(err, StorageError::Sqlx(sqlx::Error::Database(_))));
    }

    #[tokio::test]
    async fn update_status_to_promoted_sets_decided_at_and_posterior() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);
        let exp = running_experiment(pid, champ, chall);
        repo.insert(&exp).await.unwrap();
        let decided = Utc::now();
        repo.update_status(
            exp.id,
            ExperimentStatus::Promoted,
            Some(decided),
            Some(0.97),
        )
        .await
        .unwrap();

        let completed = repo.list_completed(pid).await.unwrap();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].status, ExperimentStatus::Promoted);
        // The timestamp is stored as RFC 3339 text and parsed back, so this
        // also checks that the round trip preserves the instant.
        assert_eq!(completed[0].decided_at, Some(decided));
        assert_eq!(completed[0].decision_posterior, Some(0.97));
    }

    #[tokio::test]
    async fn list_completed_excludes_running() {
        let (storage, pid, champ, chall) = seeded().await;
        let repo = ExperimentRepo::new(&storage);
        let exp = running_experiment(pid, champ, chall);
        repo.insert(&exp).await.unwrap();
        assert!(repo.list_completed(pid).await.unwrap().is_empty());
    }
}
351}