Skip to main content

autumn_web/
experiments.rs

1//! A/B experiments with deterministic bucketing and exposure telemetry.
2//!
3//! Provides multi-variant experiment assignment with stable per-actor bucketing,
4//! sticky assignments, structured exposure events, override support for QA/staff,
5//! mutual exclusion groups, and experiment lifecycle management.
6//!
7//! # Quick start
8//!
9//! ```rust
10//! use autumn_web::experiments::{
11//!     ExperimentConfig, ExperimentService, InMemoryExperimentStore, VariantConfig,
12//! };
13//! use std::sync::Arc;
14//!
15//! // 1. Create a store and service.
16//! let store = Arc::new(InMemoryExperimentStore::new());
17//! let svc = ExperimentService::new(store);
18//!
19//! // 2. Declare an experiment with two 50/50 variants.
20//! svc.create(ExperimentConfig::new("checkout_v2", vec![
21//!     VariantConfig::new("control", 50),
22//!     VariantConfig::new("treatment", 50),
23//! ])).unwrap();
24//!
25//! // 3. Start it.
26//! svc.start("checkout_v2").unwrap();
27//!
28//! // 4. Assign an actor — stable and deterministic.
29//! let variant = svc.assign("checkout_v2", "user:1").unwrap();
30//! assert!(variant == "control" || variant == "treatment");
31//!
32//! // 5. Re-assignment returns the same sticky variant.
33//! let again = svc.assign("checkout_v2", "user:1").unwrap();
34//! assert_eq!(variant, again);
35//! ```
36//!
37//! # Assignment semantics
38//!
39//! Assignment is deterministic per `(experiment_name, actor_id)` using a
40//! FNV-1a 64-bit hash of the UTF-8 string `"<experiment>:<actor>"`, bucketed
41//! into `[0, 10 000)`. This hash function is **stable**: the same inputs always
42//! produce the same output across restarts, replicas, and library versions.
43//! Changing the hash function requires a documented migration path (see
44//! [`experiment_bucket`]).
45//!
46//! Variant selection maps the bucket to a variant proportionally by weight:
47//! given variants `[("control", 30), ("treatment", 70)]`, actors with
48//! buckets 0–2 999 are assigned `"control"` and the rest `"treatment"`.
49//!
50//! # Lifecycle
51//!
52//! Experiments move through states in order:
53//!
54//! ```text
55//! draft ─────── running ──── concluded
56//!        │              │
57//!        └── archived ──┘   (archived from any state)
58//! ```
59//!
60//! - **Draft**: declared but not yet accepting assignments.
61//! - **Running**: assignments + exposures active.
62//! - **Concluded**: winner pinned; all actors see the winner; no new exposures emitted.
63//! - **Archived**: `assign()` returns `Err(ExperimentError::Archived)`.
64//!
65//! # Exposure events
66//!
67//! Every successful `assign()` call on a `Running` experiment emits one
68//! [`ExposureRecord`] to the configured [`ExposureSink`]. The default sink
69//! logs at `INFO` via `tracing`. Supply a custom sink to forward events to
70//! your analytics pipeline.
71//!
72//! # Mutual exclusion groups
73//!
74//! Experiments can share a named group. An actor who has already been assigned
75//! to any experiment in the group will be excluded from all sibling experiments
76//! (`Err(ExperimentError::ExcludedByGroup)`). This prevents interaction effects
77//! between experiments targeting the same funnel.
78
79use std::collections::HashMap;
80use std::sync::{Arc, Mutex, RwLock};
81use std::time::{SystemTime, UNIX_EPOCH};
82
83use serde::{Deserialize, Serialize};
84
85// ── ExperimentState ──────────────────────────────────────────────────────────
86
87/// Lifecycle state of an experiment.
88///
89/// Experiments move through `draft → running → concluded`. They may be
90/// archived from any state. See module-level documentation for semantics.
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub enum ExperimentState {
94    /// Declared but not yet accepting assignments.
95    Draft,
96    /// Accepting assignments and emitting exposures.
97    Running,
98    /// Winner pinned; all actors see the winner variant; no new exposures.
99    Concluded,
100    /// Archived; `assign()` returns [`ExperimentError::Archived`].
101    Archived,
102}
103
104impl std::fmt::Display for ExperimentState {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            Self::Draft => write!(f, "draft"),
108            Self::Running => write!(f, "running"),
109            Self::Concluded => write!(f, "concluded"),
110            Self::Archived => write!(f, "archived"),
111        }
112    }
113}
114
115// ── VariantConfig ────────────────────────────────────────────────────────────
116
117/// A single variant in an experiment with its assignment weight.
118///
119/// Weights are relative — they do not need to sum to 100. For a 30/70 split
120/// use `[VariantConfig::new("control", 30), VariantConfig::new("treatment", 70)]`.
121/// Equal weights produce equal splits. Use weight `0` to disable a variant.
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
123pub struct VariantConfig {
124    /// Unique variant name within the experiment (e.g. `"control"`, `"treatment_a"`).
125    pub name: String,
126    /// Relative assignment weight. Use `0` to disable a variant without removing it.
127    pub weight: u32,
128}
129
130impl VariantConfig {
131    /// Create a new variant with the given name and weight.
132    #[must_use]
133    pub fn new(name: impl Into<String>, weight: u32) -> Self {
134        Self {
135            name: name.into(),
136            weight,
137        }
138    }
139}
140
141// ── ExperimentConfig ─────────────────────────────────────────────────────────
142
143/// Full configuration of a single experiment.
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145pub struct ExperimentConfig {
146    /// Unique experiment name (e.g. `"checkout_v2"`).
147    pub name: String,
148    /// Optional human-readable description.
149    pub description: Option<String>,
150    /// Current lifecycle state.
151    pub state: ExperimentState,
152    /// Ordered list of variants and their relative weights.
153    pub variants: Vec<VariantConfig>,
154    /// Set when the experiment is concluded: name of the winning variant.
155    pub winner: Option<String>,
156    /// Named mutual-exclusion group. Actors assigned to any experiment in
157    /// this group are excluded from all sibling experiments in the group.
158    pub exclusion_group: Option<String>,
159    /// Wall-clock time the experiment was last modified (seconds since UNIX epoch).
160    pub updated_at_secs: u64,
161}
162
163impl ExperimentConfig {
164    /// Create a new experiment in `Draft` state with the given variants.
165    #[must_use]
166    pub fn new(name: impl Into<String>, variants: Vec<VariantConfig>) -> Self {
167        Self {
168            name: name.into(),
169            description: None,
170            state: ExperimentState::Draft,
171            variants,
172            winner: None,
173            exclusion_group: None,
174            updated_at_secs: now_secs(),
175        }
176    }
177
178    /// Set a human-readable description.
179    #[must_use]
180    pub fn description(mut self, desc: impl Into<String>) -> Self {
181        self.description = Some(desc.into());
182        self
183    }
184
185    /// Set the mutual exclusion group name.
186    #[must_use]
187    pub fn exclusion_group(mut self, group: impl Into<String>) -> Self {
188        self.exclusion_group = Some(group.into());
189        self
190    }
191}
192
193// ── Assignment ───────────────────────────────────────────────────────────────
194
195/// A recorded sticky assignment for an actor in an experiment.
196///
197/// Once an actor is assigned, subsequent `assign()` calls return the same
198/// variant without re-computing the bucket (sticky semantics).
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct Assignment {
201    /// Experiment name.
202    pub experiment: String,
203    /// Actor identifier.
204    pub actor: String,
205    /// Assigned variant name.
206    pub variant: String,
207    /// `true` when this assignment was produced by a staff/QA override rather
208    /// than weight-based bucketing.
209    pub is_override: bool,
210    /// Wall-clock time of first assignment (seconds since UNIX epoch).
211    pub assigned_at_secs: u64,
212}
213
214impl Assignment {
215    fn new(experiment: &str, actor: &str, variant: &str, is_override: bool) -> Self {
216        Self {
217            experiment: experiment.to_owned(),
218            actor: actor.to_owned(),
219            variant: variant.to_owned(),
220            is_override,
221            assigned_at_secs: now_secs(),
222        }
223    }
224}
225
226// ── ChangeRecord ─────────────────────────────────────────────────────────────
227
228/// A single mutation recorded in the experiment change log.
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct ChangeRecord {
231    /// Experiment name.
232    pub experiment: String,
233    /// Human-readable description of the mutation.
234    pub mutation: String,
235    /// Actor who performed the mutation (username, `"cli"`, etc.).
236    pub actor: Option<String>,
237    /// Wall-clock time (seconds since UNIX epoch).
238    pub timestamp_secs: u64,
239}
240
241impl ChangeRecord {
242    fn now(experiment: &str, mutation: impl Into<String>, actor: Option<&str>) -> Self {
243        Self {
244            experiment: experiment.to_owned(),
245            mutation: mutation.into(),
246            actor: actor.map(str::to_owned),
247            timestamp_secs: now_secs(),
248        }
249    }
250}
251
252// ── ExposureRecord ────────────────────────────────────────────────────────────
253
254/// Structured exposure event emitted by each successful `assign()` call.
255///
256/// Consumers pipe these into their analytics pipeline to join exposures with
257/// outcome events (conversions, revenue, etc.).
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct ExposureRecord {
260    /// Experiment name.
261    pub experiment: String,
262    /// Assigned variant name.
263    pub variant: String,
264    /// Actor identifier.
265    pub actor: String,
266    /// Optional request ID for correlation (propagated from the HTTP request).
267    pub request_id: Option<String>,
268    /// `true` when this assignment was produced by a staff/QA override.
269    pub is_override: bool,
270    /// Wall-clock time of exposure (seconds since UNIX epoch).
271    pub timestamp_secs: u64,
272}
273
274// ── ExposureSink ─────────────────────────────────────────────────────────────
275
276/// Pluggable sink for exposure events.
277///
278/// Every successful `assign()` call on a `Running` experiment emits one
279/// [`ExposureRecord`] to the configured sink. The default is
280/// [`TracingExposureSink`] which logs the event at `INFO` level via the
281/// `tracing` crate.
282///
283/// # Custom sink
284///
285/// ```rust,ignore
286/// use autumn_web::experiments::{ExposureSink, ExposureRecord, ExperimentService,
287///     InMemoryExperimentStore};
288/// use std::sync::Arc;
289///
290/// struct MyAnalyticsSink;
291///
292/// impl ExposureSink for MyAnalyticsSink {
293///     fn record(&self, exposure: ExposureRecord) {
294///         // Forward to your analytics pipeline.
295///         send_to_segment(&exposure);
296///     }
297/// }
298///
299/// let svc = ExperimentService::new(Arc::new(InMemoryExperimentStore::new()))
300///     .with_exposure_sink(Arc::new(MyAnalyticsSink));
301/// ```
302pub trait ExposureSink: Send + Sync + 'static {
303    /// Record a single exposure event.
304    fn record(&self, exposure: ExposureRecord);
305}
306
307/// Default [`ExposureSink`]: emits a structured `tracing::info!` event.
308///
309/// The event carries `experiment`, `variant`, `actor`, `request_id`, and
310/// `is_override` fields so structured logging pipelines (JSON, OTLP) can
311/// aggregate exposures without parsing message text.
312pub struct TracingExposureSink;
313
314impl ExposureSink for TracingExposureSink {
315    fn record(&self, exposure: ExposureRecord) {
316        tracing::info!(
317            experiment = %exposure.experiment,
318            variant    = %exposure.variant,
319            actor      = %exposure.actor,
320            request_id = exposure.request_id.as_deref().unwrap_or(""),
321            is_override = %exposure.is_override,
322            "experiment_exposure"
323        );
324    }
325}
326
327/// A no-op [`ExposureSink`] that discards all exposure events.
328///
329/// Useful in benchmarks or contexts where exposure recording is handled
330/// out-of-band.
331pub struct NoOpExposureSink;
332
333impl ExposureSink for NoOpExposureSink {
334    fn record(&self, _: ExposureRecord) {}
335}
336
337/// A recording [`ExposureSink`] that collects all exposure events in memory.
338///
339/// Primarily useful for integration tests.
340///
341/// ```rust
342/// use autumn_web::experiments::{
343///     RecordingExposureSink, ExperimentService, ExperimentConfig,
344///     VariantConfig, InMemoryExperimentStore,
345/// };
346/// use std::sync::Arc;
347///
348/// let (sink, records) = RecordingExposureSink::new();
349/// let store = Arc::new(InMemoryExperimentStore::new());
350/// let svc = ExperimentService::new(store)
351///     .with_exposure_sink(Arc::new(sink));
352/// svc.create(ExperimentConfig::new("exp", vec![
353///     VariantConfig::new("control", 1),
354///     VariantConfig::new("treatment", 1),
355/// ])).unwrap();
356/// svc.start("exp").unwrap();
357/// svc.assign("exp", "user:1").unwrap();
358/// assert_eq!(records.lock().unwrap().len(), 1);
359/// ```
360pub struct RecordingExposureSink {
361    records: Arc<Mutex<Vec<ExposureRecord>>>,
362}
363
364impl Default for RecordingExposureSink {
365    fn default() -> Self {
366        Self::new().0
367    }
368}
369
370impl RecordingExposureSink {
371    /// Create a new recording sink, returning it alongside a shared handle to
372    /// the collected records.
373    #[must_use]
374    pub fn new() -> (Self, Arc<Mutex<Vec<ExposureRecord>>>) {
375        let records = Arc::new(Mutex::new(Vec::new()));
376        let sink = Self {
377            records: Arc::clone(&records),
378        };
379        (sink, records)
380    }
381}
382
383impl ExposureSink for RecordingExposureSink {
384    fn record(&self, exposure: ExposureRecord) {
385        self.records.lock().unwrap().push(exposure);
386    }
387}
388
389// ── ExperimentStoreError ──────────────────────────────────────────────────────
390
391/// Error from an [`ExperimentStore`] backend.
392#[derive(Debug, thiserror::Error)]
393pub enum ExperimentStoreError {
394    /// The backend reported an I/O or connection failure.
395    #[error("experiment store backend error: {0}")]
396    Backend(String),
397}
398
399// ── ExperimentStore trait ─────────────────────────────────────────────────────
400
401/// Pluggable storage backend for experiments.
402///
403/// All mutation methods should record an audit trail in the change log
404/// (accessible via [`history`](ExperimentStore::history)).
405pub trait ExperimentStore: Send + Sync + 'static {
406    /// Return the current configuration for `name`, or `None` if unknown.
407    ///
408    /// # Errors
409    ///
410    /// Returns [`ExperimentStoreError`] on backend failure.
411    fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError>;
412
413    /// Return all known experiments, sorted by name.
414    ///
415    /// # Errors
416    ///
417    /// Returns [`ExperimentStoreError`] on backend failure.
418    fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError>;
419
420    /// Insert or update an experiment configuration.
421    ///
422    /// # Errors
423    ///
424    /// Returns [`ExperimentStoreError`] on backend failure.
425    fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError>;
426
427    /// Update the lifecycle state of an experiment.
428    ///
429    /// Set `winner` when concluding; ignored otherwise.
430    ///
431    /// # Errors
432    ///
433    /// Returns [`ExperimentStoreError`] on backend failure.
434    fn set_state(
435        &self,
436        name: &str,
437        state: ExperimentState,
438        winner: Option<&str>,
439    ) -> Result<(), ExperimentStoreError>;
440
441    /// Update the variants and weights for an experiment.
442    ///
443    /// Existing sticky assignments are NOT re-bucketed.
444    ///
445    /// # Errors
446    ///
447    /// Returns [`ExperimentStoreError`] on backend failure.
448    fn set_variants(
449        &self,
450        name: &str,
451        variants: Vec<VariantConfig>,
452        actor: Option<&str>,
453    ) -> Result<(), ExperimentStoreError>;
454
455    /// Return the sticky assignment for `(experiment, actor)`, or `None`.
456    ///
457    /// # Errors
458    ///
459    /// Returns [`ExperimentStoreError`] on backend failure.
460    fn get_assignment(
461        &self,
462        experiment: &str,
463        actor: &str,
464    ) -> Result<Option<Assignment>, ExperimentStoreError>;
465
466    /// Record a sticky assignment.
467    ///
468    /// # Errors
469    ///
470    /// Returns the variant that was persisted (the existing row's variant on
471    /// conflict, or the newly-inserted variant on first write). Callers must
472    /// use this return value so that concurrent first-writers agree on the
473    /// same sticky bucket.
474    ///
475    /// Returns [`ExperimentStoreError`] on backend failure.
476    fn record_assignment(&self, assignment: Assignment) -> Result<String, ExperimentStoreError>;
477
478    /// Return the override variant name for `(experiment, actor)`, or `None`.
479    ///
480    /// # Errors
481    ///
482    /// Returns [`ExperimentStoreError`] on backend failure.
483    fn get_override(
484        &self,
485        experiment: &str,
486        actor: &str,
487    ) -> Result<Option<String>, ExperimentStoreError>;
488
489    /// Pin `actor` to `variant` in `experiment`, bypassing weight-based bucketing.
490    ///
491    /// # Errors
492    ///
493    /// Returns [`ExperimentStoreError`] on backend failure.
494    fn set_override(
495        &self,
496        experiment: &str,
497        actor: &str,
498        variant: &str,
499    ) -> Result<(), ExperimentStoreError>;
500
501    /// Return `true` if `actor` has an existing assignment in any experiment
502    /// belonging to `group`, excluding `exclude_experiment` itself.
503    ///
504    /// Used to enforce mutual exclusion: once an actor is in one experiment
505    /// of a group, they are excluded from all siblings.
506    ///
507    /// # Errors
508    ///
509    /// Returns [`ExperimentStoreError`] on backend failure.
510    fn has_assignment_in_group(
511        &self,
512        actor: &str,
513        group: &str,
514        exclude_experiment: &str,
515    ) -> Result<bool, ExperimentStoreError>;
516
517    /// Return the change log for `experiment` (most-recent first), capped at
518    /// `limit` entries. Returns an empty `Vec` for unknown experiments.
519    ///
520    /// # Errors
521    ///
522    /// Returns [`ExperimentStoreError`] on backend failure.
523    fn history(
524        &self,
525        experiment: &str,
526        limit: usize,
527    ) -> Result<Vec<ChangeRecord>, ExperimentStoreError>;
528}
529
530impl<T: ExperimentStore> ExperimentStore for Arc<T> {
531    fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError> {
532        (**self).get(name)
533    }
534    fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError> {
535        (**self).list()
536    }
537    fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError> {
538        (**self).upsert(config)
539    }
540    fn set_state(
541        &self,
542        name: &str,
543        state: ExperimentState,
544        winner: Option<&str>,
545    ) -> Result<(), ExperimentStoreError> {
546        (**self).set_state(name, state, winner)
547    }
548    fn set_variants(
549        &self,
550        name: &str,
551        variants: Vec<VariantConfig>,
552        actor: Option<&str>,
553    ) -> Result<(), ExperimentStoreError> {
554        (**self).set_variants(name, variants, actor)
555    }
556    fn get_assignment(
557        &self,
558        experiment: &str,
559        actor: &str,
560    ) -> Result<Option<Assignment>, ExperimentStoreError> {
561        (**self).get_assignment(experiment, actor)
562    }
563    fn record_assignment(&self, assignment: Assignment) -> Result<String, ExperimentStoreError> {
564        (**self).record_assignment(assignment)
565    }
566    fn get_override(
567        &self,
568        experiment: &str,
569        actor: &str,
570    ) -> Result<Option<String>, ExperimentStoreError> {
571        (**self).get_override(experiment, actor)
572    }
573    fn set_override(
574        &self,
575        experiment: &str,
576        actor: &str,
577        variant: &str,
578    ) -> Result<(), ExperimentStoreError> {
579        (**self).set_override(experiment, actor, variant)
580    }
581    fn has_assignment_in_group(
582        &self,
583        actor: &str,
584        group: &str,
585        exclude_experiment: &str,
586    ) -> Result<bool, ExperimentStoreError> {
587        (**self).has_assignment_in_group(actor, group, exclude_experiment)
588    }
589    fn history(
590        &self,
591        experiment: &str,
592        limit: usize,
593    ) -> Result<Vec<ChangeRecord>, ExperimentStoreError> {
594        (**self).history(experiment, limit)
595    }
596}
597
598// ── InMemoryExperimentStore ───────────────────────────────────────────────────
599
600#[derive(Default)]
601struct StoreInner {
602    experiments: HashMap<String, ExperimentConfig>,
603    assignments: HashMap<(String, String), Assignment>,
604    overrides: HashMap<(String, String), String>,
605    changes: HashMap<String, Vec<ChangeRecord>>,
606}
607
608/// In-memory [`ExperimentStore`] implementation.
609///
610/// All data is lost when the process exits. Best suited for tests and single-
611/// replica development setups where cross-restart persistence is not required.
612///
613/// For production, use the Postgres-backed store (coming soon) which persists
614/// assignments across restarts and propagates weight changes via LISTEN/NOTIFY.
615#[derive(Default)]
616pub struct InMemoryExperimentStore {
617    inner: RwLock<StoreInner>,
618}
619
620impl InMemoryExperimentStore {
621    /// Create an empty in-memory store.
622    #[must_use]
623    pub fn new() -> Self {
624        Self::default()
625    }
626}
627
628impl ExperimentStore for InMemoryExperimentStore {
629    fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError> {
630        Ok(self.inner.read().unwrap().experiments.get(name).cloned())
631    }
632
633    fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError> {
634        let mut exps: Vec<ExperimentConfig> = {
635            let inner = self.inner.read().unwrap();
636            inner.experiments.values().cloned().collect()
637        };
638        exps.sort_by(|a, b| a.name.cmp(&b.name));
639        Ok(exps)
640    }
641
642    fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError> {
643        let name = config.name.clone();
644        {
645            let mut inner = self.inner.write().unwrap();
646            let active_variants: std::collections::HashSet<String> = inner
647                .assignments
648                .values()
649                .filter(|a| a.experiment == name)
650                .map(|a| a.variant.clone())
651                .collect();
652
653            let new_variants: std::collections::HashSet<&str> =
654                config.variants.iter().map(|v| v.name.as_str()).collect();
655
656            for variant in active_variants {
657                if !new_variants.contains(variant.as_str()) {
658                    return Err(ExperimentStoreError::Backend(format!(
659                        "cannot delete variant '{variant}' because it has active assignments"
660                    )));
661                }
662            }
663
664            let exists = inner.experiments.contains_key(&name);
665            inner.experiments.insert(name.clone(), config);
666            let mutation = if exists { "updated" } else { "created" };
667            inner
668                .changes
669                .entry(name.clone())
670                .or_default()
671                .push(ChangeRecord::now(&name, mutation, None));
672        }
673        Ok(())
674    }
675
676    fn set_state(
677        &self,
678        name: &str,
679        state: ExperimentState,
680        winner: Option<&str>,
681    ) -> Result<(), ExperimentStoreError> {
682        {
683            let mut inner = self.inner.write().unwrap();
684            if let Some(exp) = inner.experiments.get_mut(name) {
685                exp.state = state;
686                if let Some(w) = winner {
687                    exp.winner = Some(w.to_owned());
688                }
689                exp.updated_at_secs = now_secs();
690            }
691            inner
692                .changes
693                .entry(name.to_owned())
694                .or_default()
695                .push(ChangeRecord::now(
696                    name,
697                    winner.map_or_else(|| format!("state={state}"), |w| format!("concluded={w}")),
698                    None,
699                ));
700        }
701        Ok(())
702    }
703
704    fn set_variants(
705        &self,
706        name: &str,
707        variants: Vec<VariantConfig>,
708        actor: Option<&str>,
709    ) -> Result<(), ExperimentStoreError> {
710        {
711            let mut inner = self.inner.write().unwrap();
712            let active_variants: std::collections::HashSet<String> = inner
713                .assignments
714                .values()
715                .filter(|a| a.experiment == name)
716                .map(|a| a.variant.clone())
717                .collect();
718
719            let new_variants: std::collections::HashSet<&str> =
720                variants.iter().map(|v| v.name.as_str()).collect();
721
722            for variant in active_variants {
723                if !new_variants.contains(variant.as_str()) {
724                    return Err(ExperimentStoreError::Backend(format!(
725                        "cannot delete variant '{variant}' because it has active assignments"
726                    )));
727                }
728            }
729
730            if let Some(exp) = inner.experiments.get_mut(name) {
731                exp.variants = variants;
732                exp.updated_at_secs = now_secs();
733            }
734            inner
735                .changes
736                .entry(name.to_owned())
737                .or_default()
738                .push(ChangeRecord::now(name, "set_weights", actor));
739        }
740        Ok(())
741    }
742
743    fn get_assignment(
744        &self,
745        experiment: &str,
746        actor: &str,
747    ) -> Result<Option<Assignment>, ExperimentStoreError> {
748        let inner = self.inner.read().unwrap();
749        Ok(inner
750            .assignments
751            .get(&(experiment.to_owned(), actor.to_owned()))
752            .cloned())
753    }
754
755    fn record_assignment(&self, assignment: Assignment) -> Result<String, ExperimentStoreError> {
756        let mut inner = self.inner.write().unwrap();
757
758        if !assignment.is_override {
759            // Find the exclusion group for this experiment.
760            if let Some(group) = inner
761                .experiments
762                .get(&assignment.experiment)
763                .and_then(|c| c.exclusion_group.as_ref())
764            {
765                // Check if there is a sibling assignment in the same group.
766                for (exp_name, exp_config) in &inner.experiments {
767                    if exp_name == &assignment.experiment {
768                        continue;
769                    }
770                    if exp_config.exclusion_group.as_ref() == Some(group)
771                        && inner
772                            .assignments
773                            .contains_key(&(exp_name.clone(), assignment.actor.clone()))
774                    {
775                        return Err(ExperimentStoreError::Backend(format!(
776                            "ExcludedByGroup:{group}"
777                        )));
778                    }
779                }
780            }
781        }
782
783        let variant = assignment.variant.clone();
784        let key = (assignment.experiment.clone(), assignment.actor.clone());
785        inner.assignments.insert(key, assignment);
786        drop(inner);
787        Ok(variant)
788    }
789
790    fn get_override(
791        &self,
792        experiment: &str,
793        actor: &str,
794    ) -> Result<Option<String>, ExperimentStoreError> {
795        let inner = self.inner.read().unwrap();
796        Ok(inner
797            .overrides
798            .get(&(experiment.to_owned(), actor.to_owned()))
799            .cloned())
800    }
801
802    fn set_override(
803        &self,
804        experiment: &str,
805        actor: &str,
806        variant: &str,
807    ) -> Result<(), ExperimentStoreError> {
808        let key = (experiment.to_owned(), actor.to_owned());
809        self.inner
810            .write()
811            .unwrap()
812            .overrides
813            .insert(key, variant.to_owned());
814        Ok(())
815    }
816
817    fn has_assignment_in_group(
818        &self,
819        actor: &str,
820        group: &str,
821        exclude_experiment: &str,
822    ) -> Result<bool, ExperimentStoreError> {
823        let inner = self.inner.read().unwrap();
824        for (exp_name, config) in &inner.experiments {
825            if exp_name == exclude_experiment {
826                continue;
827            }
828            if config.exclusion_group.as_deref() != Some(group) {
829                continue;
830            }
831            if inner
832                .assignments
833                .contains_key(&(exp_name.clone(), actor.to_owned()))
834            {
835                return Ok(true);
836            }
837        }
838        Ok(false)
839    }
840
841    fn history(
842        &self,
843        experiment: &str,
844        limit: usize,
845    ) -> Result<Vec<ChangeRecord>, ExperimentStoreError> {
846        let records = {
847            let inner = self.inner.read().unwrap();
848            inner
849                .changes
850                .get(experiment)
851                .map(|v| {
852                    if limit == 0 {
853                        v.clone()
854                    } else {
855                        v.iter().rev().take(limit).cloned().collect()
856                    }
857                })
858                .unwrap_or_default()
859        };
860        Ok(records)
861    }
862}
863
864// ── Hash and bucketing helpers ────────────────────────────────────────────────
865
866fn now_secs() -> u64 {
867    SystemTime::now()
868        .duration_since(UNIX_EPOCH)
869        .unwrap_or_default()
870        .as_secs()
871}
872
873/// FNV-1a 64-bit hash of a byte slice.
874///
875/// Stable, dependency-free, and specified by the FNV standard.
876fn fnv1a_64(data: &[u8]) -> u64 {
877    const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
878    const FNV_PRIME: u64 = 1_099_511_628_211;
879    let mut hash = FNV_OFFSET;
880    for &byte in data {
881        #[allow(clippy::cast_lossless)]
882        {
883            hash ^= byte as u64;
884        }
885        hash = hash.wrapping_mul(FNV_PRIME);
886    }
887    hash
888}
889
890/// Compute the assignment bucket for `(experiment_name, actor_id)`.
891///
892/// Returns a value in `[0, 10 000)`. The same inputs always produce the same
893/// output across restarts, replicas, and library versions.
894///
895/// ## Algorithm
896///
897/// FNV-1a 64-bit hash of the UTF-8 encoding of
898/// `"<experiment_name>:<actor_id>"`, reduced modulo 10 000.
899///
900/// **This function MUST NOT change** between releases without a documented
901/// migration path: changing it silently re-buckets every actor in every running
902/// experiment, corrupting all in-flight A/B tests. If the algorithm must change,
903/// bump the experiment schema version, migrate existing assignments, and update
904/// the regression test below.
905#[must_use]
906pub fn experiment_bucket(experiment: &str, actor_id: &str) -> u64 {
907    let key = format!("{experiment}:{actor_id}");
908    fnv1a_64(key.as_bytes()) % 10_000
909}
910
911/// Select a variant from `variants` given a `bucket` in `[0, 10 000)`.
912///
913/// Returns `None` if all variant weights are zero.
914fn select_variant(variants: &[VariantConfig], bucket: u64) -> Option<&str> {
915    let total_weight: u64 = variants.iter().map(|v| u64::from(v.weight)).sum();
916    if total_weight == 0 {
917        return None;
918    }
919    let threshold = bucket * total_weight / 10_000;
920    let mut cumulative: u64 = 0;
921    for v in variants {
922        cumulative += u64::from(v.weight);
923        if threshold < cumulative {
924            return Some(&v.name);
925        }
926    }
927    variants.last().map(|v| v.name.as_str())
928}
929
930// ── ExperimentError ───────────────────────────────────────────────────────────
931
932/// Error from [`ExperimentService::assign`] or other service methods.
933#[derive(Debug, thiserror::Error)]
934pub enum ExperimentError {
935    /// No experiment with this name was found.
936    #[error("experiment '{0}' not found")]
937    NotFound(String),
938
939    /// The experiment exists but is not in `Running` state.
940    #[error("experiment '{0}' is not running (state: {1})")]
941    NotRunning(String, ExperimentState),
942
943    /// The experiment is archived and rejects all assignments.
944    #[error("experiment '{0}' is archived")]
945    Archived(String),
946
947    /// The actor is excluded from this experiment by a mutual exclusion group.
948    #[error("actor excluded from experiment '{0}' by mutual exclusion group '{1}'")]
949    ExcludedByGroup(String, String),
950
951    /// No variant could be selected (all variant weights are zero).
952    #[error("experiment '{0}' has no assignable variant (all weights are zero)")]
953    NoVariant(String),
954
955    /// Store backend failure.
956    #[error(transparent)]
957    Store(#[from] ExperimentStoreError),
958}
959
960// ── Validation helpers ────────────────────────────────────────────────────────
961
962fn validate_variants(variants: &[VariantConfig]) -> Result<(), ExperimentError> {
963    let mut seen = std::collections::HashSet::new();
964    for v in variants {
965        if v.name.trim().is_empty() {
966            return Err(ExperimentError::NoVariant(
967                "variant name must not be empty".into(),
968            ));
969        }
970        if !seen.insert(v.name.as_str()) {
971            return Err(ExperimentError::NoVariant(format!(
972                "duplicate variant name: '{}'",
973                v.name
974            )));
975        }
976    }
977    Ok(())
978}
979
980// ── ExperimentService ─────────────────────────────────────────────────────────
981
982/// The main experiment service.
983///
984/// Wrap an [`ExperimentStore`] (for persistence) and an optional
985/// [`ExposureSink`] (default: [`TracingExposureSink`]). The service is cheaply
986/// clone-able and intended to be stored as an `AppState` extension.
987///
988/// # Example
989///
990/// ```rust
991/// use autumn_web::experiments::{
992///     ExperimentConfig, ExperimentService, InMemoryExperimentStore, VariantConfig,
993/// };
994/// use std::sync::Arc;
995///
996/// let store = Arc::new(InMemoryExperimentStore::new());
997/// let svc = ExperimentService::new(store);
998///
999/// svc.create(ExperimentConfig::new("onboarding_v3", vec![
1000///     VariantConfig::new("control", 50),
1001///     VariantConfig::new("wizard", 50),
1002/// ])).unwrap();
1003/// svc.start("onboarding_v3").unwrap();
1004///
1005/// let variant = svc.assign("onboarding_v3", "user:42").unwrap();
1006/// assert!(matches!(variant.as_str(), "control" | "wizard"));
1007/// ```
1008#[derive(Clone)]
1009pub struct ExperimentService {
1010    store: Arc<dyn ExperimentStore>,
1011    exposure_sink: Arc<dyn ExposureSink>,
1012}
1013
1014impl std::fmt::Debug for ExperimentService {
1015    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1016        f.debug_struct("ExperimentService").finish_non_exhaustive()
1017    }
1018}
1019
1020impl ExperimentService {
1021    /// Create a new service backed by `store`, with the default
1022    /// [`TracingExposureSink`].
1023    #[must_use]
1024    pub fn new(store: Arc<dyn ExperimentStore>) -> Self {
1025        Self {
1026            store,
1027            exposure_sink: Arc::new(TracingExposureSink),
1028        }
1029    }
1030
1031    /// Override the default [`ExposureSink`].
1032    #[must_use]
1033    pub fn with_exposure_sink(mut self, sink: Arc<dyn ExposureSink>) -> Self {
1034        self.exposure_sink = sink;
1035        self
1036    }
1037
1038    /// Assign a variant to `actor` in `experiment`.
1039    ///
1040    /// Assignment rules (evaluated in order):
1041    /// 1. `Archived` → `Err(Archived)`.
1042    /// 2. `Concluded` → `Ok(winner)` without emitting an exposure.
1043    /// 3. `Draft` → `Err(NotRunning)`.
1044    /// 4. Staff/QA override present → return override variant (exposure emitted,
1045    ///    tagged `is_override = true`).
1046    /// 5. Sticky assignment already recorded → return cached variant (exposure
1047    ///    emitted each call).
1048    /// 6. Mutual exclusion check → if `actor` has any assignment in a sibling
1049    ///    experiment in the same group, return `Err(ExcludedByGroup)`.
1050    /// 7. Compute bucket → select variant by weight → store sticky → emit exposure.
1051    ///
1052    /// # Errors
1053    ///
1054    /// - [`ExperimentError::NotFound`] — no such experiment.
1055    /// - [`ExperimentError::Archived`] — experiment is archived.
1056    /// - [`ExperimentError::NotRunning`] — experiment is in `Draft` state.
1057    /// - [`ExperimentError::ExcludedByGroup`] — actor excluded by mutual exclusion.
1058    /// - [`ExperimentError::NoVariant`] — all variant weights are zero.
1059    /// - [`ExperimentError::Store`] — backend failure.
1060    pub fn assign(&self, experiment: &str, actor: &str) -> Result<String, ExperimentError> {
1061        self.assign_with_request_id(experiment, actor, None)
1062    }
1063
1064    /// Like [`assign`](Self::assign) but propagates a `request_id` into the
1065    /// exposure record.
1066    ///
1067    /// Call this from HTTP handlers where a request trace ID is available.
1068    ///
1069    /// # Errors
1070    ///
1071    /// See [`assign`](Self::assign).
1072    pub fn assign_with_request_id(
1073        &self,
1074        experiment: &str,
1075        actor: &str,
1076        request_id: Option<&str>,
1077    ) -> Result<String, ExperimentError> {
1078        let config = self
1079            .store
1080            .get(experiment)?
1081            .ok_or_else(|| ExperimentError::NotFound(experiment.to_owned()))?;
1082
1083        match config.state {
1084            ExperimentState::Archived => {
1085                return Err(ExperimentError::Archived(experiment.to_owned()));
1086            }
1087            ExperimentState::Concluded => {
1088                // Return winner for all actors; no exposure emitted (experiment is done).
1089                let winner = config
1090                    .winner
1091                    .ok_or_else(|| ExperimentError::NoVariant(experiment.to_owned()))?;
1092                return Ok(winner);
1093            }
1094            ExperimentState::Draft => {
1095                return Err(ExperimentError::NotRunning(
1096                    experiment.to_owned(),
1097                    ExperimentState::Draft,
1098                ));
1099            }
1100            ExperimentState::Running => {}
1101        }
1102
1103        // Check for staff/QA override (takes precedence over sticky).
1104        // Skip stale overrides whose variant was removed from the experiment config.
1105        if let Some(override_variant) = self.store.get_override(experiment, actor)?
1106            && config.variants.iter().any(|v| v.name == override_variant)
1107        {
1108            let sticky = Assignment::new(experiment, actor, &override_variant, true);
1109            if let Err(e) = self.store.record_assignment(sticky) {
1110                match e {
1111                    ExperimentStoreError::Backend(msg) if msg.starts_with("ExcludedByGroup:") => {
1112                        let group = msg
1113                            .strip_prefix("ExcludedByGroup:")
1114                            .unwrap_or("")
1115                            .trim()
1116                            .to_owned();
1117                        return Err(ExperimentError::ExcludedByGroup(
1118                            experiment.to_owned(),
1119                            group,
1120                        ));
1121                    }
1122                    other @ ExperimentStoreError::Backend(_) => {
1123                        return Err(ExperimentError::Store(other));
1124                    }
1125                }
1126            }
1127            self.emit_exposure(experiment, &override_variant, actor, request_id, true);
1128            return Ok(override_variant);
1129        }
1130
1131        // Return existing sticky assignment (emit exposure each call).
1132        if let Some(existing) = self.store.get_assignment(experiment, actor)? {
1133            self.emit_exposure(
1134                experiment,
1135                &existing.variant,
1136                actor,
1137                request_id,
1138                existing.is_override,
1139            );
1140            return Ok(existing.variant);
1141        }
1142
1143        // Mutual exclusion group check.
1144        if let Some(group) = &config.exclusion_group
1145            && self
1146                .store
1147                .has_assignment_in_group(actor, group, experiment)?
1148        {
1149            return Err(ExperimentError::ExcludedByGroup(
1150                experiment.to_owned(),
1151                group.clone(),
1152            ));
1153        }
1154
1155        // Bucket the actor and pick a variant.
1156        let bucket = experiment_bucket(experiment, actor);
1157        let variant_name = select_variant(&config.variants, bucket)
1158            .ok_or_else(|| ExperimentError::NoVariant(experiment.to_owned()))?
1159            .to_owned();
1160
1161        // Store sticky assignment. Use the persisted variant (the winner of any
1162        // concurrent first-write race), not the locally-computed one.
1163        let assignment = Assignment::new(experiment, actor, &variant_name, false);
1164        let persisted_variant = match self.store.record_assignment(assignment) {
1165            Ok(v) => v,
1166            Err(ExperimentStoreError::Backend(msg)) if msg.starts_with("ExcludedByGroup:") => {
1167                let group = msg
1168                    .strip_prefix("ExcludedByGroup:")
1169                    .unwrap_or("")
1170                    .trim()
1171                    .to_owned();
1172                return Err(ExperimentError::ExcludedByGroup(
1173                    experiment.to_owned(),
1174                    group,
1175                ));
1176            }
1177            Err(other @ ExperimentStoreError::Backend(_)) => {
1178                return Err(ExperimentError::Store(other));
1179            }
1180        };
1181        self.emit_exposure(experiment, &persisted_variant, actor, request_id, false);
1182
1183        Ok(persisted_variant)
1184    }
1185
1186    fn emit_exposure(
1187        &self,
1188        experiment: &str,
1189        variant: &str,
1190        actor: &str,
1191        request_id: Option<&str>,
1192        is_override: bool,
1193    ) {
1194        self.exposure_sink.record(ExposureRecord {
1195            experiment: experiment.to_owned(),
1196            variant: variant.to_owned(),
1197            actor: actor.to_owned(),
1198            request_id: request_id.map(str::to_owned),
1199            is_override,
1200            timestamp_secs: now_secs(),
1201        });
1202    }
1203
1204    /// Declare a new experiment (starts in `Draft` state).
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns [`ExperimentError::Store`] on backend failure.
1209    pub fn create(&self, config: ExperimentConfig) -> Result<(), ExperimentError> {
1210        validate_variants(&config.variants)?;
1211        if config.state == ExperimentState::Concluded {
1212            let winner = config
1213                .winner
1214                .as_deref()
1215                .filter(|w| !w.trim().is_empty())
1216                .ok_or_else(|| {
1217                    ExperimentError::NoVariant(
1218                        "concluded experiment requires a non-empty winner".into(),
1219                    )
1220                })?;
1221            if !config.variants.iter().any(|v| v.name == winner) {
1222                return Err(ExperimentError::NoVariant(format!(
1223                    "'{winner}' is not a configured variant"
1224                )));
1225            }
1226        }
1227        self.store.upsert(config)?;
1228        Ok(())
1229    }
1230
1231    /// Transition a `Draft` or `Concluded` experiment to `Running`.
1232    ///
1233    /// `Archived` experiments are terminal and cannot be restarted.
1234    ///
1235    /// # Errors
1236    ///
1237    /// Returns [`ExperimentError::NotFound`] if the experiment is unknown.
1238    /// Returns [`ExperimentError::Archived`] if the experiment is archived.
1239    pub fn start(&self, name: &str) -> Result<(), ExperimentError> {
1240        let config = self
1241            .store
1242            .get(name)?
1243            .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1244        if config.state == ExperimentState::Archived {
1245            return Err(ExperimentError::Archived(name.to_owned()));
1246        }
1247        self.store.set_state(name, ExperimentState::Running, None)?;
1248        Ok(())
1249    }
1250
1251    /// Conclude a running experiment, pinning `winner` as the result.
1252    ///
1253    /// After concluding, `assign()` returns `winner` for all actors without
1254    /// emitting new exposure events.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Returns [`ExperimentError::NotFound`] if the experiment is unknown.
1259    pub fn conclude(&self, name: &str, winner: &str) -> Result<(), ExperimentError> {
1260        let config = self
1261            .store
1262            .get(name)?
1263            .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1264        if config.state == ExperimentState::Archived {
1265            return Err(ExperimentError::Archived(name.to_owned()));
1266        }
1267        if !config.variants.iter().any(|v| v.name == winner) {
1268            return Err(ExperimentError::NoVariant(format!(
1269                "'{winner}' is not a configured variant of experiment '{name}'"
1270            )));
1271        }
1272        self.store
1273            .set_state(name, ExperimentState::Concluded, Some(winner))?;
1274        Ok(())
1275    }
1276
1277    /// Archive an experiment.
1278    ///
1279    /// Archived experiments reject all new assignments with
1280    /// [`ExperimentError::Archived`].
1281    ///
1282    /// # Errors
1283    ///
1284    /// Returns [`ExperimentError::NotFound`] if the experiment is unknown.
1285    pub fn archive(&self, name: &str) -> Result<(), ExperimentError> {
1286        self.store
1287            .get(name)?
1288            .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1289        self.store
1290            .set_state(name, ExperimentState::Archived, None)?;
1291        Ok(())
1292    }
1293
1294    /// Update the variant weights for `name`.
1295    ///
1296    /// Existing sticky assignments are **not** re-bucketed: already-assigned
1297    /// actors keep their variant. New actors are bucketed against the updated
1298    /// weights.
1299    ///
1300    /// # Errors
1301    ///
1302    /// Returns [`ExperimentError::NotFound`] if the experiment is unknown.
1303    pub fn set_weights(
1304        &self,
1305        name: &str,
1306        variants: Vec<VariantConfig>,
1307        actor: Option<&str>,
1308    ) -> Result<(), ExperimentError> {
1309        validate_variants(&variants)?;
1310        let config = self
1311            .store
1312            .get(name)?
1313            .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1314        match config.state {
1315            ExperimentState::Concluded => {
1316                return Err(ExperimentError::NotRunning(
1317                    name.to_owned(),
1318                    ExperimentState::Concluded,
1319                ));
1320            }
1321            ExperimentState::Archived => {
1322                return Err(ExperimentError::Archived(name.to_owned()));
1323            }
1324            _ => {}
1325        }
1326        self.store.set_variants(name, variants, actor)?;
1327        Ok(())
1328    }
1329
1330    /// Pin `actor` to `variant` in `experiment`, bypassing weight-based bucketing.
1331    ///
1332    /// Overrides are used by staff/QA to force a specific variant during manual
1333    /// testing. Exposure events include `is_override: true`.
1334    ///
1335    /// # Errors
1336    ///
1337    /// Returns [`ExperimentError::NotFound`] if the experiment is unknown.
1338    pub fn set_override(
1339        &self,
1340        experiment: &str,
1341        actor: &str,
1342        variant: &str,
1343    ) -> Result<(), ExperimentError> {
1344        let config = self
1345            .store
1346            .get(experiment)?
1347            .ok_or_else(|| ExperimentError::NotFound(experiment.to_owned()))?;
1348        if !config.variants.iter().any(|v| v.name == variant) {
1349            return Err(ExperimentError::NoVariant(format!(
1350                "'{variant}' is not a configured variant of experiment '{experiment}'"
1351            )));
1352        }
1353        self.store.set_override(experiment, actor, variant)?;
1354        Ok(())
1355    }
1356
1357    /// List all declared experiments.
1358    ///
1359    /// # Errors
1360    ///
1361    /// Returns [`ExperimentError::Store`] on backend failure.
1362    pub fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentError> {
1363        Ok(self.store.list()?)
1364    }
1365
1366    /// Return the current configuration for `name`.
1367    ///
1368    /// # Errors
1369    ///
1370    /// Returns [`ExperimentError::NotFound`] if the experiment is unknown.
1371    pub fn status(&self, name: &str) -> Result<ExperimentConfig, ExperimentError> {
1372        self.store
1373            .get(name)?
1374            .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))
1375    }
1376
1377    /// Return the change log for `experiment` (most-recent first), capped at `limit`.
1378    ///
1379    /// # Errors
1380    ///
1381    /// Returns [`ExperimentError::Store`] on backend failure.
1382    pub fn history(
1383        &self,
1384        experiment: &str,
1385        limit: usize,
1386    ) -> Result<Vec<ChangeRecord>, ExperimentError> {
1387        Ok(self.store.history(experiment, limit)?)
1388    }
1389}
1390
1391// ── Experiments extractor ─────────────────────────────────────────────────────
1392
1393/// Request extractor that resolves the current user's experiment service handle.
1394///
1395/// Extracts [`ExperimentService`] from the `AppState` extension slot. Fails
1396/// with `500 Internal Server Error` if no service has been registered.
1397///
1398/// Also resolves:
1399/// - **`actor_id`** from the session `user_id` key (Autumn's default).
1400/// - **`request_id`** from the `x-request-id` HTTP header (if present).
1401///
1402/// # Example
1403///
1404/// ```rust,ignore
1405/// use autumn_web::prelude::*;
1406/// use autumn_web::experiments::Experiments;
1407///
1408/// #[get("/checkout")]
1409/// async fn checkout(exps: Experiments) -> AutumnResult<Markup> {
1410///     let variant = exps.assign("checkout_v2")?;
1411///     Ok(html! {
1412///         @match variant.as_str() {
1413///             "treatment" => (render_new_checkout()),
1414///             _           => (render_classic_checkout()),
1415///         }
1416///     })
1417/// }
1418/// ```
1419pub struct Experiments {
1420    service: ExperimentService,
1421    actor_id: Option<String>,
1422    request_id: Option<String>,
1423}
1424
1425impl Experiments {
1426    /// Assign a variant for the current session actor.
1427    ///
1428    /// Propagates the session actor ID and `x-request-id` header automatically.
1429    /// For logged-out sessions, falls back to the session ID so each visitor
1430    /// gets a stable, per-session bucket rather than collapsing all anonymous
1431    /// traffic into a single `"anonymous"` actor.
1432    ///
1433    /// # Errors
1434    ///
1435    /// See [`ExperimentService::assign_with_request_id`].
1436    pub fn assign(&self, experiment: &str) -> Result<String, ExperimentError> {
1437        let actor = self.actor_id.as_deref().unwrap_or("anonymous");
1438        self.service
1439            .assign_with_request_id(experiment, actor, self.request_id.as_deref())
1440    }
1441
1442    /// Return the underlying service for direct access to admin operations.
1443    #[must_use]
1444    pub const fn service(&self) -> &ExperimentService {
1445        &self.service
1446    }
1447}
1448
1449impl axum::extract::FromRequestParts<crate::AppState> for Experiments {
1450    type Rejection = crate::AutumnError;
1451
1452    async fn from_request_parts(
1453        parts: &mut axum::http::request::Parts,
1454        state: &crate::AppState,
1455    ) -> Result<Self, Self::Rejection> {
1456        let service = state
1457            .extension::<ExperimentService>()
1458            .map(|arc| (*arc).clone())
1459            .ok_or_else(|| {
1460                crate::AutumnError::internal_server_error_msg(
1461                    "experiment service not registered; \
1462                     install an ExperimentStore via AppBuilder::with_experiment_store()",
1463                )
1464            })?;
1465
1466        let actor_id = if let Some(session) = parts.extensions.get::<crate::session::Session>() {
1467            // Use the configured auth session key (e.g. "user_id"); fall back to
1468            // the session ID so each anonymous visitor gets a stable, per-session
1469            // bucket rather than all collapsing into a single "anonymous" actor.
1470            let session_key = state.auth_session_key();
1471            if let Some(uid) = session.get(session_key).await {
1472                Some(uid)
1473            } else {
1474                // Use or create a stable per-session anonymous actor. We must
1475                // insert into the session (marking it dirty) so the framework
1476                // sets a cookie and the ID persists across requests; a bare
1477                // session.id() call does not mark the session dirty.
1478                const ANON_KEY: &str = "_autumn_anon_actor";
1479                if let Some(existing) = session.get(ANON_KEY).await {
1480                    Some(existing)
1481                } else {
1482                    let id = session.id().await;
1483                    session.insert(ANON_KEY, &id).await;
1484                    Some(id)
1485                }
1486            }
1487        } else {
1488            None
1489        };
1490
1491        let request_id = parts
1492            .headers
1493            .get("x-request-id")
1494            .and_then(|v| v.to_str().ok())
1495            .map(str::to_owned);
1496
1497        Ok(Self {
1498            service,
1499            actor_id,
1500            request_id,
1501        })
1502    }
1503}
1504
1505// ── pg module ─────────────────────────────────────────────────────────────────
1506
1507#[cfg(feature = "db")]
1508pub mod pg {
1509    use super::{
1510        Assignment, ChangeRecord, ExperimentConfig, ExperimentState, ExperimentStore,
1511        ExperimentStoreError, VariantConfig,
1512    };
1513    use diesel::prelude::*;
1514    use std::collections::HashMap;
1515    use std::sync::RwLock;
1516    use std::time::{Duration, Instant};
1517
1518    // ── Cache types ───────────────────────────────────────────────────────────
1519
1520    #[derive(Debug, Clone, PartialEq, Eq)]
1521    enum CacheLookup {
1522        Hit(Option<ExperimentConfig>),
1523        Miss,
1524    }
1525
1526    #[derive(Debug, Clone)]
1527    struct CachedEntry {
1528        value: Option<ExperimentConfig>,
1529        expires_at: Instant,
1530    }
1531
1532    // ── Row structs ───────────────────────────────────────────────────────────
1533
1534    #[derive(diesel::QueryableByName)]
1535    struct ExperimentRow {
1536        #[diesel(sql_type = diesel::sql_types::Text)]
1537        name: String,
1538        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
1539        description: Option<String>,
1540        #[diesel(sql_type = diesel::sql_types::Text)]
1541        state: String,
1542        #[diesel(sql_type = diesel::sql_types::Text)]
1543        variants: String,
1544        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
1545        winner: Option<String>,
1546        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
1547        exclusion_group: Option<String>,
1548        #[diesel(sql_type = diesel::sql_types::BigInt)]
1549        updated_at_secs: i64,
1550    }
1551
1552    impl ExperimentRow {
1553        fn into_config(self) -> ExperimentConfig {
1554            let variants: Vec<VariantConfig> =
1555                serde_json::from_str(&self.variants).unwrap_or_default();
1556            let state = match self.state.as_str() {
1557                "running" => ExperimentState::Running,
1558                "concluded" => ExperimentState::Concluded,
1559                "archived" => ExperimentState::Archived,
1560                _ => ExperimentState::Draft,
1561            };
1562            ExperimentConfig {
1563                name: self.name,
1564                description: self.description,
1565                state,
1566                variants,
1567                winner: self.winner,
1568                exclusion_group: self.exclusion_group,
1569                updated_at_secs: u64::try_from(self.updated_at_secs).unwrap_or(0),
1570            }
1571        }
1572    }
1573
1574    #[derive(diesel::QueryableByName)]
1575    struct AssignmentRow {
1576        #[diesel(sql_type = diesel::sql_types::Text)]
1577        experiment: String,
1578        #[diesel(sql_type = diesel::sql_types::Text)]
1579        actor: String,
1580        #[diesel(sql_type = diesel::sql_types::Text)]
1581        variant: String,
1582        #[diesel(sql_type = diesel::sql_types::Bool)]
1583        is_override: bool,
1584        #[diesel(sql_type = diesel::sql_types::BigInt)]
1585        assigned_at_secs: i64,
1586    }
1587
1588    #[derive(diesel::QueryableByName)]
1589    struct BoolRow {
1590        #[diesel(sql_type = diesel::sql_types::Bool)]
1591        result: bool,
1592    }
1593
1594    #[derive(diesel::QueryableByName)]
1595    struct VariantNameRow {
1596        #[diesel(sql_type = diesel::sql_types::Text)]
1597        variant: String,
1598    }
1599
1600    #[derive(diesel::QueryableByName)]
1601    struct ExclusionGroupRow {
1602        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
1603        exclusion_group: Option<String>,
1604    }
1605
1606    #[derive(diesel::QueryableByName)]
1607    struct ChangeRow {
1608        #[diesel(sql_type = diesel::sql_types::Text)]
1609        experiment: String,
1610        #[diesel(sql_type = diesel::sql_types::Text)]
1611        mutation: String,
1612        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
1613        actor: Option<String>,
1614        #[diesel(sql_type = diesel::sql_types::BigInt)]
1615        timestamp_secs: i64,
1616    }
1617
1618    #[derive(diesel::QueryableByName)]
1619    struct ChangeExperimentRow {
1620        #[diesel(sql_type = diesel::sql_types::Text)]
1621        experiment: String,
1622    }
1623
1624    // ── PgExperimentStore ─────────────────────────────────────────────────────
1625
1626    /// Postgres-backed [`ExperimentStore`] with a short-lived read-through cache.
1627    ///
1628    /// Writes trigger `pg_notify('autumn_experiments', name)` so replicas can
1629    /// invalidate their caches quickly via a background poll listener.
1630    #[derive(Debug)]
1631    pub struct PgExperimentStore {
1632        database_url: String,
1633        cache_ttl: Duration,
1634        cache: RwLock<HashMap<String, CachedEntry>>,
1635    }
1636
1637    impl Clone for PgExperimentStore {
1638        fn clone(&self) -> Self {
1639            Self::with_cache_ttl(self.database_url.clone(), self.cache_ttl)
1640        }
1641    }
1642
1643    impl PgExperimentStore {
1644        /// Default read-through cache lifetime.
1645        pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(1);
1646
1647        /// Create a store using the default 1 s read-through cache.
1648        #[must_use]
1649        pub fn new(database_url: impl Into<String>) -> Self {
1650            Self::with_cache_ttl(database_url, Self::DEFAULT_CACHE_TTL)
1651        }
1652
1653        /// Create a store with an explicit cache TTL. Use `Duration::ZERO` to
1654        /// disable caching.
1655        #[must_use]
1656        pub fn with_cache_ttl(database_url: impl Into<String>, cache_ttl: Duration) -> Self {
1657            Self {
1658                database_url: database_url.into(),
1659                cache_ttl,
1660                cache: RwLock::new(HashMap::new()),
1661            }
1662        }
1663
1664        /// Create a store from Autumn's primary database configuration.
1665        #[must_use]
1666        pub fn from_database_config(config: &crate::config::DatabaseConfig) -> Option<Self> {
1667            config.effective_primary_url().map(Self::new)
1668        }
1669
1670        fn connect(&self) -> Result<diesel::PgConnection, ExperimentStoreError> {
1671            diesel::PgConnection::establish(&self.database_url)
1672                .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
1673        }
1674
1675        fn cached(&self, name: &str) -> CacheLookup {
1676            let now = Instant::now();
1677            let Ok(cache) = self.cache.read() else {
1678                return CacheLookup::Miss;
1679            };
1680            match cache.get(name) {
1681                Some(c) if c.expires_at > now => CacheLookup::Hit(c.value.clone()),
1682                _ => CacheLookup::Miss,
1683            }
1684        }
1685
1686        fn store_cache(&self, name: &str, value: Option<ExperimentConfig>) {
1687            if self.cache_ttl.is_zero() {
1688                return;
1689            }
1690            let Some(expires_at) = Instant::now().checked_add(self.cache_ttl) else {
1691                return;
1692            };
1693            if let Ok(mut cache) = self.cache.write() {
1694                cache.insert(name.to_owned(), CachedEntry { value, expires_at });
1695            }
1696        }
1697
1698        fn invalidate(&self, name: &str) {
1699            if let Ok(mut cache) = self.cache.write() {
1700                cache.remove(name);
1701            }
1702        }
1703
1704        /// Spawn a background thread that polls `autumn_experiment_changes` and
1705        /// invalidates this store's cache for changed experiments.
1706        ///
1707        /// The thread runs indefinitely; the returned handle can be detached.
1708        pub fn spawn_poll_listener(
1709            store: std::sync::Arc<Self>,
1710            poll_interval: Duration,
1711        ) -> std::thread::JoinHandle<()> {
1712            std::thread::spawn(move || {
1713                const OVERLAP_SECS: i64 = 5;
1714                let now_secs = || {
1715                    i64::try_from(
1716                        std::time::SystemTime::now()
1717                            .duration_since(std::time::UNIX_EPOCH)
1718                            .unwrap_or_default()
1719                            .as_secs(),
1720                    )
1721                    .unwrap_or(i64::MAX)
1722                };
1723                let mut last_polled_secs: i64 = now_secs() - OVERLAP_SECS;
1724
1725                loop {
1726                    std::thread::sleep(poll_interval);
1727                    let new_horizon = now_secs() - OVERLAP_SECS;
1728                    if let Ok(mut conn) = store.connect() {
1729                        let rows: Vec<ChangeExperimentRow> = diesel::sql_query(
1730                            "SELECT DISTINCT experiment FROM autumn_experiment_changes \
1731                             WHERE changed_at > to_timestamp($1)",
1732                        )
1733                        .bind::<diesel::sql_types::BigInt, _>(last_polled_secs)
1734                        .load::<ChangeExperimentRow>(&mut conn)
1735                        .unwrap_or_default();
1736
1737                        for row in rows {
1738                            store.invalidate(&row.experiment);
1739                        }
1740                    }
1741                    last_polled_secs = new_horizon;
1742                }
1743            })
1744        }
1745    }
1746
1747    // ── ExperimentStore impl ──────────────────────────────────────────────────
1748
1749    impl ExperimentStore for PgExperimentStore {
1750        fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError> {
1751            if let CacheLookup::Hit(v) = self.cached(name) {
1752                return Ok(v);
1753            }
1754            let mut conn = self.connect()?;
1755            let result = diesel::sql_query(
1756                "SELECT name, description, state::text, variants::text, winner, \
1757                        exclusion_group, \
1758                        EXTRACT(EPOCH FROM updated_at)::bigint AS updated_at_secs \
1759                 FROM autumn_experiments WHERE name = $1",
1760            )
1761            .bind::<diesel::sql_types::Text, _>(name)
1762            .get_result::<ExperimentRow>(&mut conn)
1763            .optional()
1764            .map(|r| r.map(ExperimentRow::into_config))
1765            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1766
1767            self.store_cache(name, result.clone());
1768            Ok(result)
1769        }
1770
1771        fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError> {
1772            let mut conn = self.connect()?;
1773            diesel::sql_query(
1774                "SELECT name, description, state::text, variants::text, winner, \
1775                        exclusion_group, \
1776                        EXTRACT(EPOCH FROM updated_at)::bigint AS updated_at_secs \
1777                 FROM autumn_experiments ORDER BY name",
1778            )
1779            .load::<ExperimentRow>(&mut conn)
1780            .map(|rows| rows.into_iter().map(ExperimentRow::into_config).collect())
1781            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
1782        }
1783
1784        fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError> {
1785            let mut conn = self.connect()?;
1786
1787            let active_variants = diesel::sql_query(
1788                "SELECT DISTINCT variant FROM autumn_experiment_assignments WHERE experiment = $1",
1789            )
1790            .bind::<diesel::sql_types::Text, _>(&config.name)
1791            .load::<VariantNameRow>(&mut conn)
1792            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1793
1794            let new_variants: std::collections::HashSet<&str> =
1795                config.variants.iter().map(|v| v.name.as_str()).collect();
1796
1797            for row in active_variants {
1798                if !new_variants.contains(row.variant.as_str()) {
1799                    return Err(ExperimentStoreError::Backend(format!(
1800                        "cannot delete variant '{}' because it has active assignments",
1801                        row.variant
1802                    )));
1803                }
1804            }
1805
1806            let variants_json =
1807                serde_json::to_string(&config.variants).unwrap_or_else(|_| "[]".to_owned());
1808            let state_str = config.state.to_string();
1809            let rows_affected = diesel::sql_query(
1810                "WITH upserted AS ( \
1811                     INSERT INTO autumn_experiments \
1812                         (name, description, state, variants, winner, exclusion_group) \
1813                     VALUES ($1, $2, $3::autumn_experiment_state, $4::jsonb, $5, $6) \
1814                     ON CONFLICT (name) DO UPDATE SET \
1815                         description = EXCLUDED.description, \
1816                         state = EXCLUDED.state, \
1817                         variants = EXCLUDED.variants, \
1818                         winner = EXCLUDED.winner, \
1819                         exclusion_group = EXCLUDED.exclusion_group, \
1820                         updated_at = NOW() \
1821                     WHERE NOT EXISTS ( \
1822                         SELECT 1 FROM autumn_experiment_assignments a \
1823                         WHERE a.experiment = EXCLUDED.name \
1824                           AND a.variant NOT IN ( \
1825                               SELECT x.name FROM jsonb_to_recordset(EXCLUDED.variants) AS x(name text) \
1826                           ) \
1827                     ) \
1828                     RETURNING name, (xmax = 0) AS is_insert \
1829                 ) \
1830                 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
1831                 SELECT name, CASE WHEN is_insert THEN 'created' ELSE 'updated' END, NULL FROM upserted",
1832            )
1833            .bind::<diesel::sql_types::Text, _>(&config.name)
1834            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(config.description)
1835            .bind::<diesel::sql_types::Text, _>(&state_str)
1836            .bind::<diesel::sql_types::Text, _>(&variants_json)
1837            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(config.winner)
1838            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(config.exclusion_group)
1839            .execute(&mut conn)
1840            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1841
1842            if rows_affected == 0 {
1843                return Err(ExperimentStoreError::Backend(
1844                    "cannot delete variant because it has active assignments".to_owned(),
1845                ));
1846            }
1847
1848            self.invalidate(&config.name);
1849            Ok(())
1850        }
1851
1852        fn set_state(
1853            &self,
1854            name: &str,
1855            state: ExperimentState,
1856            winner: Option<&str>,
1857        ) -> Result<(), ExperimentStoreError> {
1858            let state_str = state.to_string();
1859            let mutation =
1860                winner.map_or_else(|| format!("state={state}"), |w| format!("concluded={w}"));
1861            let mut conn = self.connect()?;
1862            diesel::sql_query(
1863                "WITH updated AS ( \
1864                     UPDATE autumn_experiments \
1865                     SET state = $2::autumn_experiment_state, \
1866                         winner = COALESCE($3, winner), \
1867                         updated_at = NOW() \
1868                     WHERE name = $1 \
1869                     RETURNING name \
1870                 ) \
1871                 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
1872                 SELECT name, $4, NULL FROM updated",
1873            )
1874            .bind::<diesel::sql_types::Text, _>(name)
1875            .bind::<diesel::sql_types::Text, _>(&state_str)
1876            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
1877                winner.map(str::to_owned),
1878            )
1879            .bind::<diesel::sql_types::Text, _>(&mutation)
1880            .execute(&mut conn)
1881            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1882            self.invalidate(name);
1883            Ok(())
1884        }
1885
1886        fn set_variants(
1887            &self,
1888            name: &str,
1889            variants: Vec<VariantConfig>,
1890            actor: Option<&str>,
1891        ) -> Result<(), ExperimentStoreError> {
1892            let mut conn = self.connect()?;
1893
1894            let active_variants = diesel::sql_query(
1895                "SELECT DISTINCT variant FROM autumn_experiment_assignments WHERE experiment = $1",
1896            )
1897            .bind::<diesel::sql_types::Text, _>(name)
1898            .load::<VariantNameRow>(&mut conn)
1899            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1900
1901            let new_variants: std::collections::HashSet<&str> =
1902                variants.iter().map(|v| v.name.as_str()).collect();
1903
1904            for row in active_variants {
1905                if !new_variants.contains(row.variant.as_str()) {
1906                    return Err(ExperimentStoreError::Backend(format!(
1907                        "cannot delete variant '{}' because it has active assignments",
1908                        row.variant
1909                    )));
1910                }
1911            }
1912
1913            let variants_json =
1914                serde_json::to_string(&variants).unwrap_or_else(|_| "[]".to_owned());
1915            let rows_affected = diesel::sql_query(
1916                "WITH updated AS ( \
1917                     UPDATE autumn_experiments \
1918                     SET variants = $2::jsonb, updated_at = NOW() \
1919                     WHERE name = $1 \
1920                       AND NOT EXISTS ( \
1921                           SELECT 1 FROM autumn_experiment_assignments a \
1922                           WHERE a.experiment = name \
1923                             AND a.variant NOT IN ( \
1924                                 SELECT x.name FROM jsonb_to_recordset($2::jsonb) AS x(name text) \
1925                             ) \
1926                       ) \
1927                     RETURNING name \
1928                 ) \
1929                 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
1930                 SELECT name, 'set_weights', $3 FROM updated",
1931            )
1932            .bind::<diesel::sql_types::Text, _>(name)
1933            .bind::<diesel::sql_types::Text, _>(&variants_json)
1934            .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
1935                actor.map(str::to_owned),
1936            )
1937            .execute(&mut conn)
1938            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1939
1940            if rows_affected == 0 {
1941                let exists_row = diesel::sql_query(
1942                    "SELECT EXISTS(SELECT 1 FROM autumn_experiments WHERE name = $1) AS result",
1943                )
1944                .bind::<diesel::sql_types::Text, _>(name)
1945                .get_result::<BoolRow>(&mut conn)
1946                .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1947
1948                if exists_row.result {
1949                    return Err(ExperimentStoreError::Backend(
1950                        "cannot delete variant because it has active assignments".to_owned(),
1951                    ));
1952                }
1953            }
1954
1955            self.invalidate(name);
1956            Ok(())
1957        }
1958
1959        fn get_assignment(
1960            &self,
1961            experiment: &str,
1962            actor: &str,
1963        ) -> Result<Option<Assignment>, ExperimentStoreError> {
1964            let mut conn = self.connect()?;
1965            diesel::sql_query(
1966                "SELECT experiment, actor, variant, is_override, \
1967                        EXTRACT(EPOCH FROM assigned_at)::bigint AS assigned_at_secs \
1968                 FROM autumn_experiment_assignments \
1969                 WHERE experiment = $1 AND actor = $2",
1970            )
1971            .bind::<diesel::sql_types::Text, _>(experiment)
1972            .bind::<diesel::sql_types::Text, _>(actor)
1973            .get_result::<AssignmentRow>(&mut conn)
1974            .optional()
1975            .map(|r| {
1976                r.map(|row| Assignment {
1977                    experiment: row.experiment,
1978                    actor: row.actor,
1979                    variant: row.variant,
1980                    is_override: row.is_override,
1981                    assigned_at_secs: u64::try_from(row.assigned_at_secs).unwrap_or(0),
1982                })
1983            })
1984            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
1985        }
1986
1987        fn record_assignment(
1988            &self,
1989            assignment: Assignment,
1990        ) -> Result<String, ExperimentStoreError> {
1991            use diesel::connection::Connection as _;
1992
1993            #[derive(Debug)]
1994            enum TxError {
1995                Database(diesel::result::Error),
1996                Excluded(String),
1997            }
1998            impl From<diesel::result::Error> for TxError {
1999                fn from(e: diesel::result::Error) -> Self {
2000                    Self::Database(e)
2001                }
2002            }
2003
2004            let mut conn = self.connect()?;
2005            let result = conn.transaction::<String, TxError, _>(|conn| {
2006                // 1. Acquire advisory lock on actor to serialize concurrent assignments for this actor.
2007                diesel::sql_query("SELECT pg_advisory_xact_lock(hashtext($1))")
2008                    .bind::<diesel::sql_types::Text, _>(&assignment.actor)
2009                    .execute(conn)?;
2010
2011                // 2. Check exclusion group if this is NOT an override.
2012                if !assignment.is_override {
2013                    // Fetch the exclusion group for this experiment.
2014                    let group_row = diesel::sql_query(
2015                        "SELECT exclusion_group FROM autumn_experiments WHERE name = $1"
2016                    )
2017                    .bind::<diesel::sql_types::Text, _>(&assignment.experiment)
2018                    .get_result::<ExclusionGroupRow>(conn)
2019                    .optional()?;
2020
2021                    if let Some(ExclusionGroupRow { exclusion_group: Some(group) }) = group_row {
2022                        // Check if there are other assignments in the same group.
2023                        let exists = diesel::sql_query(
2024                            "SELECT EXISTS ( \
2025                                 SELECT 1 \
2026                                 FROM autumn_experiment_assignments a \
2027                                 JOIN autumn_experiments e ON e.name = a.experiment \
2028                                 WHERE a.actor = $1 \
2029                                   AND e.exclusion_group = $2 \
2030                                   AND a.experiment <> $3 \
2031                             ) AS result",
2032                        )
2033                        .bind::<diesel::sql_types::Text, _>(&assignment.actor)
2034                        .bind::<diesel::sql_types::Text, _>(&group)
2035                        .bind::<diesel::sql_types::Text, _>(&assignment.experiment)
2036                        .get_result::<BoolRow>(conn)
2037                        .map(|r| r.result)?;
2038
2039                        if exists {
2040                            return Err(TxError::Excluded(group));
2041                        }
2042                    }
2043                }
2044
2045                // 3. Insert or update on conflict.
2046                let variant_name = diesel::sql_query(
2047                    "INSERT INTO autumn_experiment_assignments \
2048                         (experiment, actor, variant, is_override) \
2049                     VALUES ($1, $2, $3, $4) \
2050                     ON CONFLICT (experiment, actor) DO UPDATE \
2051                         SET variant = CASE WHEN EXCLUDED.is_override THEN EXCLUDED.variant ELSE autumn_experiment_assignments.variant END, \
2052                             is_override = CASE WHEN EXCLUDED.is_override THEN EXCLUDED.is_override ELSE autumn_experiment_assignments.is_override END \
2053                     RETURNING variant",
2054                )
2055                .bind::<diesel::sql_types::Text, _>(&assignment.experiment)
2056                .bind::<diesel::sql_types::Text, _>(&assignment.actor)
2057                .bind::<diesel::sql_types::Text, _>(&assignment.variant)
2058                .bind::<diesel::sql_types::Bool, _>(assignment.is_override)
2059                .get_result::<VariantNameRow>(conn)
2060                .map(|r| r.variant)?;
2061
2062                Ok(variant_name)
2063            });
2064
2065            match result {
2066                Ok(v) => Ok(v),
2067                Err(TxError::Database(e)) => Err(ExperimentStoreError::Backend(e.to_string())),
2068                Err(TxError::Excluded(group)) => Err(ExperimentStoreError::Backend(format!(
2069                    "ExcludedByGroup:{group}"
2070                ))),
2071            }
2072        }
2073
2074        fn get_override(
2075            &self,
2076            experiment: &str,
2077            actor: &str,
2078        ) -> Result<Option<String>, ExperimentStoreError> {
2079            let mut conn = self.connect()?;
2080            diesel::sql_query(
2081                "SELECT variant FROM autumn_experiment_overrides \
2082                 WHERE experiment = $1 AND actor = $2",
2083            )
2084            .bind::<diesel::sql_types::Text, _>(experiment)
2085            .bind::<diesel::sql_types::Text, _>(actor)
2086            .get_result::<VariantNameRow>(&mut conn)
2087            .optional()
2088            .map(|r| r.map(|row| row.variant))
2089            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
2090        }
2091
2092        fn set_override(
2093            &self,
2094            experiment: &str,
2095            actor: &str,
2096            variant: &str,
2097        ) -> Result<(), ExperimentStoreError> {
2098            let mut conn = self.connect()?;
2099            diesel::sql_query(
2100                "WITH upserted AS ( \
2101                     INSERT INTO autumn_experiment_overrides (experiment, actor, variant) \
2102                     VALUES ($1, $2, $3) \
2103                     ON CONFLICT (experiment, actor) DO UPDATE SET variant = EXCLUDED.variant \
2104                     RETURNING experiment, actor, variant \
2105                 ) \
2106                 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
2107                 SELECT experiment, 'override=' || actor || ':' || variant, NULL FROM upserted",
2108            )
2109            .bind::<diesel::sql_types::Text, _>(experiment)
2110            .bind::<diesel::sql_types::Text, _>(actor)
2111            .bind::<diesel::sql_types::Text, _>(variant)
2112            .execute(&mut conn)
2113            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
2114            Ok(())
2115        }
2116
2117        fn has_assignment_in_group(
2118            &self,
2119            actor: &str,
2120            group: &str,
2121            exclude_experiment: &str,
2122        ) -> Result<bool, ExperimentStoreError> {
2123            let mut conn = self.connect()?;
2124            diesel::sql_query(
2125                "SELECT EXISTS ( \
2126                     SELECT 1 \
2127                     FROM autumn_experiment_assignments a \
2128                     JOIN autumn_experiments e ON e.name = a.experiment \
2129                     WHERE a.actor = $1 \
2130                       AND e.exclusion_group = $2 \
2131                       AND a.experiment <> $3 \
2132                 ) AS result",
2133            )
2134            .bind::<diesel::sql_types::Text, _>(actor)
2135            .bind::<diesel::sql_types::Text, _>(group)
2136            .bind::<diesel::sql_types::Text, _>(exclude_experiment)
2137            .get_result::<BoolRow>(&mut conn)
2138            .map(|r| r.result)
2139            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
2140        }
2141
2142        fn history(
2143            &self,
2144            experiment: &str,
2145            limit: usize,
2146        ) -> Result<Vec<ChangeRecord>, ExperimentStoreError> {
2147            let limit = i64::try_from(limit).unwrap_or(i64::MAX);
2148            let mut conn = self.connect()?;
2149            diesel::sql_query(
2150                "SELECT experiment, mutation, actor, \
2151                        EXTRACT(EPOCH FROM changed_at)::bigint AS timestamp_secs \
2152                 FROM autumn_experiment_changes \
2153                 WHERE experiment = $1 \
2154                 ORDER BY changed_at DESC \
2155                 LIMIT NULLIF($2::bigint, 0)",
2156            )
2157            .bind::<diesel::sql_types::Text, _>(experiment)
2158            .bind::<diesel::sql_types::BigInt, _>(limit)
2159            .load::<ChangeRow>(&mut conn)
2160            .map(|rows| {
2161                rows.into_iter()
2162                    .map(|r| ChangeRecord {
2163                        experiment: r.experiment,
2164                        mutation: r.mutation,
2165                        actor: r.actor,
2166                        timestamp_secs: u64::try_from(r.timestamp_secs).unwrap_or(0),
2167                    })
2168                    .collect()
2169            })
2170            .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
2171        }
2172    }
2173}
2174
2175// ── Tests ─────────────────────────────────────────────────────────────────────
2176
2177#[cfg(test)]
2178mod tests {
2179    use super::*;
2180
2181    // ── Helpers ───────────────────────────────────────────────────────────────
2182
2183    fn make_svc() -> ExperimentService {
2184        ExperimentService::new(Arc::new(InMemoryExperimentStore::new()))
2185    }
2186
2187    fn make_svc_with_sink() -> (ExperimentService, Arc<Mutex<Vec<ExposureRecord>>>) {
2188        let (sink, records) = RecordingExposureSink::new();
2189        let svc = ExperimentService::new(Arc::new(InMemoryExperimentStore::new()))
2190            .with_exposure_sink(Arc::new(sink));
2191        (svc, records)
2192    }
2193
2194    fn fifty_fifty(name: &str) -> ExperimentConfig {
2195        ExperimentConfig::new(
2196            name,
2197            vec![
2198                VariantConfig::new("control", 50),
2199                VariantConfig::new("treatment", 50),
2200            ],
2201        )
2202    }
2203
2204    fn running(svc: &ExperimentService, name: &str) {
2205        svc.create(fifty_fifty(name)).unwrap();
2206        svc.start(name).unwrap();
2207    }
2208
2209    // ═══════════════════════════════════ RED PHASE ═══════════════════════════
2210    // These tests were written before the full implementation existed and drove
2211    // the design of ExperimentService, ExperimentStore, and ExposureSink.
2212    // ═════════════════════════════════════════════════════════════════════════
2213
2214    // ── AC: assign() returns unknown-experiment error ─────────────────────────
2215
2216    #[test]
2217    fn assign_unknown_experiment_returns_not_found() {
2218        let svc = make_svc();
2219        let err = svc.assign("ghost", "user:1").unwrap_err();
2220        assert!(
2221            matches!(err, ExperimentError::NotFound(_)),
2222            "expected NotFound, got {err}"
2223        );
2224    }
2225
2226    // ── AC: lifecycle — draft rejects assignments ─────────────────────────────
2227
2228    #[test]
2229    fn assign_draft_experiment_returns_not_running() {
2230        let svc = make_svc();
2231        svc.create(fifty_fifty("exp")).unwrap();
2232        let err = svc.assign("exp", "user:1").unwrap_err();
2233        assert!(
2234            matches!(err, ExperimentError::NotRunning(_, ExperimentState::Draft)),
2235            "expected NotRunning(Draft), got {err}"
2236        );
2237    }
2238
2239    // ── AC: lifecycle — archived rejects assignments ──────────────────────────
2240
2241    #[test]
2242    fn assign_archived_experiment_returns_archived() {
2243        let svc = make_svc();
2244        running(&svc, "exp");
2245        svc.archive("exp").unwrap();
2246        let err = svc.assign("exp", "user:1").unwrap_err();
2247        assert!(
2248            matches!(err, ExperimentError::Archived(_)),
2249            "expected Archived, got {err}"
2250        );
2251    }
2252
2253    // ── AC: lifecycle — concluded returns winner for all actors ───────────────
2254
2255    #[test]
2256    fn concluded_experiment_returns_winner_for_all_actors() {
2257        let svc = make_svc();
2258        running(&svc, "exp");
2259        svc.conclude("exp", "treatment").unwrap();
2260        // Every actor sees the winner — regardless of their bucket.
2261        for i in 0..100_u32 {
2262            let actor = format!("user:{i}");
2263            let v = svc.assign("exp", &actor).unwrap();
2264            assert_eq!(
2265                v, "treatment",
2266                "concluded experiment must return winner for {actor}"
2267            );
2268        }
2269    }
2270
2271    // ── AC: concluded experiment emits no new exposures ───────────────────────
2272
2273    #[test]
2274    fn concluded_experiment_emits_no_exposures() {
2275        let (svc, records) = make_svc_with_sink();
2276        let store = Arc::new(InMemoryExperimentStore::new());
2277        let (sink2, records2) = RecordingExposureSink::new();
2278        let svc2 = ExperimentService::new(store as Arc<dyn ExperimentStore>)
2279            .with_exposure_sink(Arc::new(sink2));
2280        svc2.create(fifty_fifty("exp")).unwrap();
2281        svc2.start("exp").unwrap();
2282        svc2.conclude("exp", "treatment").unwrap();
2283        svc2.assign("exp", "user:1").unwrap();
2284        assert_eq!(
2285            records2.lock().unwrap().len(),
2286            0,
2287            "concluded experiment must not emit exposure events"
2288        );
2289        let _ = records; // silence unused-variable warning for the other sink
2290        let _ = svc;
2291    }
2292
2293    // ── AC: assign() returns deterministic variant for running experiment ─────
2294
2295    #[test]
2296    fn assign_running_experiment_returns_valid_variant() {
2297        let svc = make_svc();
2298        running(&svc, "exp");
2299        let v = svc.assign("exp", "user:1").unwrap();
2300        assert!(
2301            v == "control" || v == "treatment",
2302            "variant must be one of the declared names, got {v:?}"
2303        );
2304    }
2305
2306    // ── AC: deterministic bucketing — same actor always gets same variant ─────
2307
2308    #[test]
2309    fn assign_is_deterministic_for_same_actor() {
2310        let svc = make_svc();
2311        running(&svc, "exp");
2312        let v1 = svc.assign("exp", "user:42").unwrap();
2313        let v2 = svc.assign("exp", "exp").unwrap(); // different key — just to exercise more
2314        // Re-create without sticky to verify pure hash determinism.
2315        let svc2 = make_svc();
2316        running(&svc2, "exp");
2317        let v3 = svc2.assign("exp", "user:42").unwrap();
2318        assert_eq!(
2319            v1, v3,
2320            "same actor must receive the same variant across different service instances"
2321        );
2322        let _ = v2;
2323    }
2324
2325    // ── AC: 10 000 stable requests — zero reassignments ───────────────────────
2326
2327    #[test]
2328    fn zero_reassignments_across_10000_requests() {
2329        let svc = make_svc();
2330        running(&svc, "exp");
2331        let first = svc.assign("exp", "stable_user").unwrap();
2332        for _ in 1..10_000 {
2333            let v = svc.assign("exp", "stable_user").unwrap();
2334            assert_eq!(v, first, "re-assignment must return the same variant");
2335        }
2336    }
2337
2338    // ── AC: stable hash regression — known fixture inputs ────────────────────
2339
2340    #[test]
2341    fn stable_hash_regression_known_fixtures() {
2342        // Pre-computed: FNV-1a 64-bit of "<experiment>:<actor>" mod 10 000.
2343        // MUST NOT change without a documented migration path for existing assignments.
2344        // To regenerate: run `experiment_bucket(name, actor)` and record the output.
2345        let b1 = experiment_bucket("checkout_v2", "user:1");
2346        let b2 = experiment_bucket("checkout_v2", "user:1");
2347        assert_eq!(
2348            b1, b2,
2349            "hash must be deterministic (same input → same output)"
2350        );
2351
2352        // Fixture values established on first run — sentinel against algorithm drift.
2353        assert_eq!(
2354            b1, 4_830,
2355            "checkout_v2:user:1 bucket changed — hash regression"
2356        );
2357        assert_eq!(
2358            experiment_bucket("checkout_v2", "user:2"),
2359            6_619,
2360            "checkout_v2:user:2 bucket changed — hash regression"
2361        );
2362        assert_eq!(
2363            experiment_bucket("onboarding_v3", "user:100"),
2364            6_602,
2365            "onboarding_v3:user:100 bucket changed — hash regression"
2366        );
2367    }
2368
2369    // ── AC: weights — 0% weight variant is never assigned ────────────────────
2370
2371    #[test]
2372    fn zero_weight_variant_never_assigned() {
2373        let svc = make_svc();
2374        svc.create(ExperimentConfig::new(
2375            "exp",
2376            vec![
2377                VariantConfig::new("control", 100),
2378                VariantConfig::new("dead", 0),
2379            ],
2380        ))
2381        .unwrap();
2382        svc.start("exp").unwrap();
2383        for i in 0..200_u32 {
2384            let v = svc.assign("exp", &format!("user:{i}")).unwrap();
2385            assert_eq!(v, "control", "zero-weight variant must never be assigned");
2386        }
2387    }
2388
2389    // ── AC: weights — single variant at 100% is always assigned ──────────────
2390
2391    #[test]
2392    fn single_variant_always_assigned() {
2393        let svc = make_svc();
2394        svc.create(ExperimentConfig::new(
2395            "exp",
2396            vec![VariantConfig::new("only", 100)],
2397        ))
2398        .unwrap();
2399        svc.start("exp").unwrap();
2400        for i in 0..100_u32 {
2401            let v = svc.assign("exp", &format!("user:{i}")).unwrap();
2402            assert_eq!(v, "only");
2403        }
2404    }
2405
2406    // ── AC: weights — roughly 50/50 split ────────────────────────────────────
2407
2408    #[test]
2409    fn fifty_fifty_weights_split_roughly_evenly() {
2410        let svc = make_svc();
2411        running(&svc, "exp");
2412        let mut control_count = 0_u32;
2413        for i in 0..1000_u32 {
2414            if svc.assign("exp", &format!("user:{i}")).unwrap() == "control" {
2415                control_count += 1;
2416            }
2417        }
2418        assert!(
2419            (400..=600).contains(&control_count),
2420            "expected ~500 control assignments, got {control_count}"
2421        );
2422    }
2423
2424    // ── AC: all-zero-weight returns NoVariant error ───────────────────────────
2425
2426    #[test]
2427    fn all_zero_weights_returns_no_variant_error() {
2428        let svc = make_svc();
2429        svc.create(ExperimentConfig::new(
2430            "exp",
2431            vec![VariantConfig::new("a", 0), VariantConfig::new("b", 0)],
2432        ))
2433        .unwrap();
2434        svc.start("exp").unwrap();
2435        let err = svc.assign("exp", "user:1").unwrap_err();
2436        assert!(
2437            matches!(err, ExperimentError::NoVariant(_)),
2438            "expected NoVariant, got {err}"
2439        );
2440    }
2441
2442    // ── AC: sticky assignment — subsequent calls return same variant ──────────
2443
2444    #[test]
2445    fn sticky_assignment_returned_on_subsequent_calls() {
2446        let svc = make_svc();
2447        running(&svc, "exp");
2448        let first = svc.assign("exp", "user:1").unwrap();
2449        // Subsequent calls must return the same variant even with new service.
2450        for _ in 0..10 {
2451            assert_eq!(
2452                svc.assign("exp", "user:1").unwrap(),
2453                first,
2454                "sticky assignment must be returned on all subsequent calls"
2455            );
2456        }
2457    }
2458
2459    // ── AC: sticky assignment not re-bucketed on weight change ────────────────
2460
2461    #[test]
2462    fn set_weights_does_not_rebucket_existing_assignments() {
2463        let svc = make_svc();
2464        running(&svc, "exp");
2465        let original = svc.assign("exp", "user:1").unwrap();
2466
2467        // Flip weights completely — existing assignment must remain unchanged.
2468        let (new_heavy, new_light) = if original == "control" {
2469            (0_u32, 100_u32)
2470        } else {
2471            (100_u32, 0_u32)
2472        };
2473        svc.set_weights(
2474            "exp",
2475            vec![
2476                VariantConfig::new("control", new_heavy),
2477                VariantConfig::new("treatment", new_light),
2478            ],
2479            None,
2480        )
2481        .unwrap();
2482
2483        let after = svc.assign("exp", "user:1").unwrap();
2484        assert_eq!(
2485            original, after,
2486            "existing sticky assignment must not be re-bucketed after weight change"
2487        );
2488    }
2489
2490    #[test]
2491    fn set_weights_rejects_deleting_assigned_variant() {
2492        let svc = make_svc();
2493        running(&svc, "exp");
2494        let original = svc.assign("exp", "user:1").unwrap();
2495
2496        let remaining_variant = if original == "control" {
2497            "treatment"
2498        } else {
2499            "control"
2500        };
2501        let err = svc
2502            .set_weights(
2503                "exp",
2504                vec![VariantConfig::new(remaining_variant, 100)],
2505                None,
2506            )
2507            .unwrap_err();
2508        assert!(
2509            err.to_string().contains("cannot delete variant"),
2510            "expected active assignment delete guard error, got {err}"
2511        );
2512    }
2513
2514    // ── AC: exposure emitted exactly once per assign() call ───────────────────
2515
2516    #[test]
2517    fn exposure_emitted_exactly_once_per_assign_call() {
2518        let (svc, records) = make_svc_with_sink();
2519        running(&svc, "exp");
2520        svc.assign("exp", "user:1").unwrap();
2521        assert_eq!(
2522            records.lock().unwrap().len(),
2523            1,
2524            "first assign → 1 exposure"
2525        );
2526        svc.assign("exp", "user:1").unwrap();
2527        assert_eq!(
2528            records.lock().unwrap().len(),
2529            2,
2530            "second assign → 2 total exposures"
2531        );
2532        svc.assign("exp", "user:2").unwrap();
2533        assert_eq!(
2534            records.lock().unwrap().len(),
2535            3,
2536            "different actor → 3 total exposures"
2537        );
2538    }
2539
2540    // ── AC: exposure record contains correct fields ───────────────────────────
2541
2542    #[test]
2543    fn exposure_record_contains_correct_fields() {
2544        let (svc, records) = make_svc_with_sink();
2545        running(&svc, "checkout_v2");
2546        let variant = svc
2547            .assign_with_request_id("checkout_v2", "user:42", Some("req-abc"))
2548            .unwrap();
2549        let (len, exp_name, exp_variant, exp_actor, exp_req_id, exp_is_override) = {
2550            let rec = records.lock().unwrap();
2551            let r = &rec[0];
2552            (
2553                rec.len(),
2554                r.experiment.clone(),
2555                r.variant.clone(),
2556                r.actor.clone(),
2557                r.request_id.clone(),
2558                r.is_override,
2559            )
2560        };
2561        assert_eq!(len, 1);
2562        assert_eq!(exp_name, "checkout_v2");
2563        assert_eq!(exp_variant, variant);
2564        assert_eq!(exp_actor, "user:42");
2565        assert_eq!(exp_req_id.as_deref(), Some("req-abc"));
2566        assert!(!exp_is_override);
2567    }
2568
2569    // ── AC: override bypasses weight-based bucketing ──────────────────────────
2570
2571    #[test]
2572    fn override_bypasses_weights() {
2573        let svc = make_svc();
2574        // 100% control — without override every actor gets "control".
2575        svc.create(ExperimentConfig::new(
2576            "exp",
2577            vec![
2578                VariantConfig::new("control", 100),
2579                VariantConfig::new("treatment", 0),
2580            ],
2581        ))
2582        .unwrap();
2583        svc.start("exp").unwrap();
2584        svc.set_override("exp", "qa:alice", "treatment").unwrap();
2585        let v = svc.assign("exp", "qa:alice").unwrap();
2586        assert_eq!(
2587            v, "treatment",
2588            "override must bypass weight-based bucketing"
2589        );
2590    }
2591
2592    // ── AC: override emits exposure tagged as override ────────────────────────
2593
2594    #[test]
2595    fn override_emits_exposure_tagged_as_override() {
2596        let (svc, records) = make_svc_with_sink();
2597        running(&svc, "exp");
2598        svc.set_override("exp", "qa:alice", "treatment").unwrap();
2599        svc.assign("exp", "qa:alice").unwrap();
2600        let (len, is_override, exp_variant) = {
2601            let rec = records.lock().unwrap();
2602            (rec.len(), rec[0].is_override, rec[0].variant.clone())
2603        };
2604        assert_eq!(len, 1);
2605        assert!(
2606            is_override,
2607            "exposure from override must be tagged is_override = true"
2608        );
2609        assert_eq!(exp_variant, "treatment");
2610    }
2611
2612    // ── AC: mutual exclusion — second experiment in group is excluded ─────────
2613
2614    #[test]
2615    fn mutual_exclusion_prevents_sibling_assignment() {
2616        let svc = make_svc();
2617        // Two experiments in the same group.
2618        svc.create(
2619            ExperimentConfig::new("exp_a", vec![VariantConfig::new("v1", 1)])
2620                .exclusion_group("checkout"),
2621        )
2622        .unwrap();
2623        svc.start("exp_a").unwrap();
2624        svc.create(
2625            ExperimentConfig::new("exp_b", vec![VariantConfig::new("v1", 1)])
2626                .exclusion_group("checkout"),
2627        )
2628        .unwrap();
2629        svc.start("exp_b").unwrap();
2630
2631        // Assign actor to exp_a first.
2632        svc.assign("exp_a", "user:1").unwrap();
2633
2634        // exp_b must exclude the same actor.
2635        let err = svc.assign("exp_b", "user:1").unwrap_err();
2636        assert!(
2637            matches!(err, ExperimentError::ExcludedByGroup(_, _)),
2638            "expected ExcludedByGroup, got {err}"
2639        );
2640    }
2641
2642    // ── AC: mutual exclusion — different group allows both experiments ─────────
2643
2644    #[test]
2645    fn different_groups_do_not_exclude_each_other() {
2646        let svc = make_svc();
2647        svc.create(
2648            ExperimentConfig::new("exp_a", vec![VariantConfig::new("v1", 1)])
2649                .exclusion_group("group_a"),
2650        )
2651        .unwrap();
2652        svc.start("exp_a").unwrap();
2653        svc.create(
2654            ExperimentConfig::new("exp_b", vec![VariantConfig::new("v1", 1)])
2655                .exclusion_group("group_b"),
2656        )
2657        .unwrap();
2658        svc.start("exp_b").unwrap();
2659
2660        svc.assign("exp_a", "user:1").unwrap();
2661        let result = svc.assign("exp_b", "user:1");
2662        assert!(
2663            result.is_ok(),
2664            "experiments in different groups must not exclude each other"
2665        );
2666    }
2667
2668    // ── AC: mutual exclusion — no group means no exclusion ────────────────────
2669
2670    #[test]
2671    fn no_exclusion_group_allows_both_assignments() {
2672        let svc = make_svc();
2673        running(&svc, "exp_a");
2674        running(&svc, "exp_b");
2675        svc.assign("exp_a", "user:1").unwrap();
2676        let result = svc.assign("exp_b", "user:1");
2677        assert!(
2678            result.is_ok(),
2679            "experiments without exclusion groups must not exclude each other"
2680        );
2681    }
2682
2683    // ── AC: experiment_bucket is stable and in range ──────────────────────────
2684
2685    #[test]
2686    fn experiment_bucket_is_stable_and_in_range() {
2687        for i in 0..100_u32 {
2688            let actor = format!("user:{i}");
2689            let b1 = experiment_bucket("my_exp", &actor);
2690            let b2 = experiment_bucket("my_exp", &actor);
2691            assert_eq!(b1, b2, "bucket must be deterministic for {actor}");
2692            assert!(b1 < 10_000, "bucket must be in [0, 10000) for {actor}");
2693        }
2694    }
2695
2696    // ── AC: experiment_bucket differs across actors ───────────────────────────
2697
2698    #[test]
2699    fn experiment_bucket_produces_diverse_values() {
2700        let buckets: std::collections::HashSet<u64> = (0..100_u32)
2701            .map(|i| experiment_bucket("exp", &format!("user:{i}")))
2702            .collect();
2703        assert!(
2704            buckets.len() > 50,
2705            "expected diverse buckets across 100 actors, got {}",
2706            buckets.len()
2707        );
2708    }
2709
2710    // ── AC: list returns all experiments ─────────────────────────────────────
2711
2712    #[test]
2713    fn list_returns_all_experiments() {
2714        let svc = make_svc();
2715        svc.create(fifty_fifty("alpha")).unwrap();
2716        svc.create(fifty_fifty("beta")).unwrap();
2717        svc.create(fifty_fifty("gamma")).unwrap();
2718        let experiments = svc.list().unwrap();
2719        assert_eq!(experiments.len(), 3);
2720        assert_eq!(experiments[0].name, "alpha");
2721        assert_eq!(experiments[1].name, "beta");
2722        assert_eq!(experiments[2].name, "gamma");
2723    }
2724
2725    // ── AC: status returns experiment config ─────────────────────────────────
2726
2727    #[test]
2728    fn status_returns_current_config() {
2729        let svc = make_svc();
2730        running(&svc, "exp");
2731        let cfg = svc.status("exp").unwrap();
2732        assert_eq!(cfg.state, ExperimentState::Running);
2733        assert_eq!(cfg.variants.len(), 2);
2734    }
2735
2736    // ── AC: history records mutations ─────────────────────────────────────────
2737
2738    #[test]
2739    fn history_records_create_and_start_mutations() {
2740        let svc = make_svc();
2741        svc.create(fifty_fifty("exp")).unwrap();
2742        svc.start("exp").unwrap();
2743        let hist = svc.history("exp", 10).unwrap();
2744        assert!(!hist.is_empty(), "history must record mutations");
2745    }
2746
2747    // ── AC: set_weights records mutation in history ───────────────────────────
2748
2749    #[test]
2750    fn set_weights_recorded_in_history() {
2751        let svc = make_svc();
2752        running(&svc, "exp");
2753        svc.set_weights(
2754            "exp",
2755            vec![
2756                VariantConfig::new("control", 30),
2757                VariantConfig::new("treatment", 70),
2758            ],
2759            Some("ops@example.com"),
2760        )
2761        .unwrap();
2762        let hist = svc.history("exp", 10).unwrap();
2763        let has_set_weights = hist.iter().any(|r| r.mutation == "set_weights");
2764        assert!(has_set_weights, "set_weights must be recorded in history");
2765    }
2766
2767    // ── AC: select_variant helper ─────────────────────────────────────────────
2768
2769    #[test]
2770    fn select_variant_returns_none_for_empty_variants() {
2771        assert_eq!(select_variant(&[], 0), None);
2772    }
2773
2774    #[test]
2775    fn select_variant_returns_none_for_all_zero_weights() {
2776        let vs = vec![VariantConfig::new("a", 0), VariantConfig::new("b", 0)];
2777        assert_eq!(select_variant(&vs, 5_000), None);
2778    }
2779
2780    #[test]
2781    fn select_variant_50_50_boundary() {
2782        let vs = vec![
2783            VariantConfig::new("control", 50),
2784            VariantConfig::new("treatment", 50),
2785        ];
2786        // Bucket 0 (threshold = 0 * 100 / 10000 = 0) → cumulative[0]=50 > 0 → control
2787        assert_eq!(select_variant(&vs, 0), Some("control"));
2788        // Bucket 4999 (threshold = 4999 * 100 / 10000 = 49) → control
2789        assert_eq!(select_variant(&vs, 4_999), Some("control"));
2790        // Bucket 5000 (threshold = 5000 * 100 / 10000 = 50) → cumulative[0]=50 NOT > 50
2791        // → cumulative[1]=100 > 50 → treatment
2792        assert_eq!(select_variant(&vs, 5_000), Some("treatment"));
2793        // Bucket 9999 → treatment
2794        assert_eq!(select_variant(&vs, 9_999), Some("treatment"));
2795    }
2796
2797    // ── AC: InMemoryExperimentStore — Arc delegation ──────────────────────────
2798
2799    #[test]
2800    fn arc_experiment_store_delegates_all_operations() {
2801        let store = Arc::new(InMemoryExperimentStore::new());
2802        let arc_store: Arc<dyn ExperimentStore> = Arc::clone(&store) as _;
2803
2804        let cfg = fifty_fifty("my_exp");
2805        arc_store.upsert(cfg).unwrap();
2806        assert!(arc_store.get("my_exp").unwrap().is_some());
2807
2808        arc_store
2809            .set_state("my_exp", ExperimentState::Running, None)
2810            .unwrap();
2811        assert_eq!(
2812            arc_store.get("my_exp").unwrap().unwrap().state,
2813            ExperimentState::Running
2814        );
2815
2816        arc_store
2817            .record_assignment(Assignment::new("my_exp", "user:1", "control", false))
2818            .unwrap();
2819        let asgn = arc_store.get_assignment("my_exp", "user:1").unwrap();
2820        assert_eq!(asgn.unwrap().variant, "control");
2821
2822        arc_store
2823            .set_override("my_exp", "qa:1", "treatment")
2824            .unwrap();
2825        assert_eq!(
2826            arc_store.get_override("my_exp", "qa:1").unwrap().unwrap(),
2827            "treatment"
2828        );
2829    }
2830
2831    // ── AC: ExperimentState display ───────────────────────────────────────────
2832
2833    #[test]
2834    fn experiment_state_display_matches_expected() {
2835        assert_eq!(ExperimentState::Draft.to_string(), "draft");
2836        assert_eq!(ExperimentState::Running.to_string(), "running");
2837        assert_eq!(ExperimentState::Concluded.to_string(), "concluded");
2838        assert_eq!(ExperimentState::Archived.to_string(), "archived");
2839    }
2840
2841    // ── AC: ExperimentError display ───────────────────────────────────────────
2842
2843    #[test]
2844    fn experiment_error_display() {
2845        assert!(
2846            ExperimentError::NotFound("x".to_owned())
2847                .to_string()
2848                .contains("not found")
2849        );
2850        assert!(
2851            ExperimentError::Archived("x".to_owned())
2852                .to_string()
2853                .contains("archived")
2854        );
2855        assert!(
2856            ExperimentError::ExcludedByGroup("x".to_owned(), "g".to_owned())
2857                .to_string()
2858                .contains("mutual exclusion")
2859        );
2860        assert!(
2861            ExperimentError::NoVariant("x".to_owned())
2862                .to_string()
2863                .contains("weights are zero")
2864        );
2865    }
2866
2867    // ── AC: service debug ─────────────────────────────────────────────────────
2868
2869    #[test]
2870    fn service_debug_does_not_panic() {
2871        let svc = make_svc();
2872        let _ = format!("{svc:?}");
2873    }
2874
2875    #[test]
2876    fn upsert_logs_created_or_updated() {
2877        let svc = make_svc();
2878        let exp = fifty_fifty("exp");
2879        svc.create(exp.clone()).unwrap();
2880
2881        // Upsert the same experiment again to trigger update
2882        svc.create(exp).unwrap();
2883
2884        let hist = svc.history("exp", 10).unwrap();
2885        assert_eq!(hist.len(), 2);
2886        assert_eq!(hist[1].mutation, "created");
2887        assert_eq!(hist[0].mutation, "updated");
2888    }
2889
2890    #[test]
2891    fn upsert_rejects_deleting_variant_with_active_assignments() {
2892        let store = InMemoryExperimentStore::new();
2893        let name = "test_exp";
2894
2895        let config = ExperimentConfig {
2896            name: name.to_string(),
2897            description: None,
2898            state: ExperimentState::Running,
2899            variants: vec![
2900                VariantConfig {
2901                    name: "control".to_string(),
2902                    weight: 50,
2903                },
2904                VariantConfig {
2905                    name: "treatment".to_string(),
2906                    weight: 50,
2907                },
2908            ],
2909            winner: None,
2910            exclusion_group: None,
2911            updated_at_secs: 0,
2912        };
2913        store.upsert(config.clone()).unwrap();
2914
2915        // Assign an actor to "treatment"
2916        store
2917            .record_assignment(Assignment {
2918                experiment: name.to_string(),
2919                actor: "user1".to_string(),
2920                variant: "treatment".to_string(),
2921                is_override: false,
2922                assigned_at_secs: 0,
2923            })
2924            .unwrap();
2925
2926        // Attempting to upsert config without "treatment" should fail
2927        let mut new_config = config;
2928        new_config.variants = vec![VariantConfig {
2929            name: "control".to_string(),
2930            weight: 100,
2931        }];
2932
2933        let res = store.upsert(new_config);
2934        assert!(
2935            res.is_err(),
2936            "expected upsert to fail due to deleting active variant"
2937        );
2938        if let Err(ExperimentStoreError::Backend(msg)) = res {
2939            assert!(
2940                msg.contains("treatment"),
2941                "expected error message to mention 'treatment', got: {msg}"
2942            );
2943        } else {
2944            panic!("expected Backend error");
2945        }
2946    }
2947}