1use std::collections::HashMap;
80use std::sync::{Arc, Mutex, RwLock};
81use std::time::{SystemTime, UNIX_EPOCH};
82
83use serde::{Deserialize, Serialize};
84
/// Lifecycle state of an experiment.
///
/// Serialized by serde as a `snake_case` string (`"draft"`, `"running"`,
/// `"concluded"`, `"archived"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExperimentState {
    /// Initial state given by `ExperimentConfig::new`; assignment is rejected
    /// with `ExperimentError::NotRunning`.
    Draft,
    /// Actors are assigned to variants.
    Running,
    /// Assignment returns the recorded winner instead of bucketing.
    Concluded,
    /// Assignment is rejected with `ExperimentError::Archived`.
    Archived,
}
103
104impl std::fmt::Display for ExperimentState {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 Self::Draft => write!(f, "draft"),
108 Self::Running => write!(f, "running"),
109 Self::Concluded => write!(f, "concluded"),
110 Self::Archived => write!(f, "archived"),
111 }
112 }
113}
114
/// One arm of an experiment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VariantConfig {
    /// Variant name; this is the value returned to callers on assignment.
    pub name: String,
    /// Relative weight. A variant's share of the bucket space is its weight
    /// divided by the sum of all weights in the experiment.
    pub weight: u32,
}
129
130impl VariantConfig {
131 #[must_use]
133 pub fn new(name: impl Into<String>, weight: u32) -> Self {
134 Self {
135 name: name.into(),
136 weight,
137 }
138 }
139}
140
/// Full definition of an experiment as held by an `ExperimentStore`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExperimentConfig {
    /// Experiment name; used as the lookup key in the store.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Current lifecycle state.
    pub state: ExperimentState,
    /// Variants actors can be assigned to.
    pub variants: Vec<VariantConfig>,
    /// Winning variant name; returned by assignment once the experiment is
    /// concluded.
    pub winner: Option<String>,
    /// Mutual-exclusion group: an actor already assigned in another
    /// experiment of the same group is refused assignment here.
    pub exclusion_group: Option<String>,
    /// Last-modified time in seconds since the Unix epoch.
    pub updated_at_secs: u64,
}
162
163impl ExperimentConfig {
164 #[must_use]
166 pub fn new(name: impl Into<String>, variants: Vec<VariantConfig>) -> Self {
167 Self {
168 name: name.into(),
169 description: None,
170 state: ExperimentState::Draft,
171 variants,
172 winner: None,
173 exclusion_group: None,
174 updated_at_secs: now_secs(),
175 }
176 }
177
178 #[must_use]
180 pub fn description(mut self, desc: impl Into<String>) -> Self {
181 self.description = Some(desc.into());
182 self
183 }
184
185 #[must_use]
187 pub fn exclusion_group(mut self, group: impl Into<String>) -> Self {
188 self.exclusion_group = Some(group.into());
189 self
190 }
191}
192
/// A stored (experiment, actor) -> variant assignment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Assignment {
    /// Name of the experiment.
    pub experiment: String,
    /// Identifier of the assigned actor.
    pub actor: String,
    /// Name of the variant the actor received.
    pub variant: String,
    /// `true` when the variant came from a per-actor override rather than
    /// from bucketing.
    pub is_override: bool,
    /// Creation time in seconds since the Unix epoch.
    pub assigned_at_secs: u64,
}
213
214impl Assignment {
215 fn new(experiment: &str, actor: &str, variant: &str, is_override: bool) -> Self {
216 Self {
217 experiment: experiment.to_owned(),
218 actor: actor.to_owned(),
219 variant: variant.to_owned(),
220 is_override,
221 assigned_at_secs: now_secs(),
222 }
223 }
224}
225
/// One entry in an experiment's change history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeRecord {
    /// Name of the experiment that changed.
    pub experiment: String,
    /// Short description of the change. The in-memory store writes values
    /// such as `created`, `updated`, `state=<state>`, `concluded=<winner>`
    /// and `set_weights`.
    pub mutation: String,
    /// Who made the change, when the caller supplied it.
    pub actor: Option<String>,
    /// Time of the change in seconds since the Unix epoch.
    pub timestamp_secs: u64,
}
240
241impl ChangeRecord {
242 fn now(experiment: &str, mutation: impl Into<String>, actor: Option<&str>) -> Self {
243 Self {
244 experiment: experiment.to_owned(),
245 mutation: mutation.into(),
246 actor: actor.map(str::to_owned),
247 timestamp_secs: now_secs(),
248 }
249 }
250}
251
/// Event handed to an `ExposureSink` each time an actor is served a variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExposureRecord {
    /// Name of the experiment.
    pub experiment: String,
    /// Name of the variant that was served.
    pub variant: String,
    /// Identifier of the actor that was served.
    pub actor: String,
    /// Request identifier, when the caller supplied one.
    pub request_id: Option<String>,
    /// `true` when the variant came from a per-actor override.
    pub is_override: bool,
    /// Time of the exposure in seconds since the Unix epoch.
    pub timestamp_secs: u64,
}
273
/// Destination for exposure events emitted by `ExperimentService`.
///
/// Implementations must be `Send + Sync + 'static` because the service holds
/// the sink behind an `Arc<dyn ExposureSink>`.
pub trait ExposureSink: Send + Sync + 'static {
    /// Consumes one exposure event. The method has no return value, so
    /// implementations cannot report failure to the caller.
    fn record(&self, exposure: ExposureRecord);
}
306
/// Exposure sink that logs every exposure as a `tracing` info event with the
/// message `experiment_exposure`. `ExperimentService::new` installs it as the
/// default sink.
pub struct TracingExposureSink;
313
314impl ExposureSink for TracingExposureSink {
315 fn record(&self, exposure: ExposureRecord) {
316 tracing::info!(
317 experiment = %exposure.experiment,
318 variant = %exposure.variant,
319 actor = %exposure.actor,
320 request_id = exposure.request_id.as_deref().unwrap_or(""),
321 is_override = %exposure.is_override,
322 "experiment_exposure"
323 );
324 }
325}
326
/// Exposure sink that discards every exposure.
pub struct NoOpExposureSink;
332
333impl ExposureSink for NoOpExposureSink {
334 fn record(&self, _: ExposureRecord) {}
335}
336
/// Exposure sink that appends every exposure to a shared in-memory vector.
///
/// `RecordingExposureSink::new` returns a second handle to the same vector so
/// the caller can inspect what was recorded.
pub struct RecordingExposureSink {
    /// Shared buffer of recorded exposures, in the order `record` was called.
    records: Arc<Mutex<Vec<ExposureRecord>>>,
}
363
364impl Default for RecordingExposureSink {
365 fn default() -> Self {
366 Self::new().0
367 }
368}
369
370impl RecordingExposureSink {
371 #[must_use]
374 pub fn new() -> (Self, Arc<Mutex<Vec<ExposureRecord>>>) {
375 let records = Arc::new(Mutex::new(Vec::new()));
376 let sink = Self {
377 records: Arc::clone(&records),
378 };
379 (sink, records)
380 }
381}
382
383impl ExposureSink for RecordingExposureSink {
384 fn record(&self, exposure: ExposureRecord) {
385 self.records.lock().unwrap().push(exposure);
386 }
387}
388
/// Error returned by `ExperimentStore` operations.
#[derive(Debug, thiserror::Error)]
pub enum ExperimentStoreError {
    /// Failure reported by the storage backend, carried as a message string.
    ///
    /// `ExperimentService` treats a message starting with `ExcludedByGroup:`
    /// as a mutual-exclusion rejection, so that prefix is reserved.
    #[error("experiment store backend error: {0}")]
    Backend(String),
}
398
/// Persistence backend for experiment definitions, actor assignments,
/// per-actor overrides and change history.
///
/// Every method returns `ExperimentStoreError` when the backend fails.
pub trait ExperimentStore: Send + Sync + 'static {
    /// Fetches the experiment named `name`; `Ok(None)` when there is none.
    fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError>;

    /// Returns every stored experiment.
    fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError>;

    /// Creates the experiment, or replaces the stored one with the same name.
    fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError>;

    /// Moves experiment `name` to `state`, also recording `winner` when one
    /// is supplied.
    fn set_state(
        &self,
        name: &str,
        state: ExperimentState,
        winner: Option<&str>,
    ) -> Result<(), ExperimentStoreError>;

    /// Replaces the variant list of experiment `name`. `actor` identifies who
    /// made the change, for the history record.
    fn set_variants(
        &self,
        name: &str,
        variants: Vec<VariantConfig>,
        actor: Option<&str>,
    ) -> Result<(), ExperimentStoreError>;

    /// Fetches the stored assignment of `actor` in `experiment`, if any.
    fn get_assignment(
        &self,
        experiment: &str,
        actor: &str,
    ) -> Result<Option<Assignment>, ExperimentStoreError>;

    /// Persists `assignment` and returns the variant name that is stored.
    ///
    /// A mutual-exclusion rejection is signalled as
    /// `ExperimentStoreError::Backend` with a message of the form
    /// `ExcludedByGroup:<group>`; `ExperimentService` parses that prefix.
    fn record_assignment(&self, assignment: Assignment) -> Result<String, ExperimentStoreError>;

    /// Fetches the variant `actor` is forced into for `experiment`, if an
    /// override was set.
    fn get_override(
        &self,
        experiment: &str,
        actor: &str,
    ) -> Result<Option<String>, ExperimentStoreError>;

    /// Forces `actor` into `variant` for `experiment`.
    fn set_override(
        &self,
        experiment: &str,
        actor: &str,
        variant: &str,
    ) -> Result<(), ExperimentStoreError>;

    /// Reports whether `actor` holds an assignment in any experiment whose
    /// exclusion group is `group`, ignoring `exclude_experiment` itself.
    fn has_assignment_in_group(
        &self,
        actor: &str,
        group: &str,
        exclude_experiment: &str,
    ) -> Result<bool, ExperimentStoreError>;

    /// Returns change records for `experiment`, capped at `limit` entries.
    ///
    /// NOTE(review): the in-memory implementation treats `limit == 0` as
    /// "no cap"; confirm other backends agree on that and on ordering.
    fn history(
        &self,
        experiment: &str,
        limit: usize,
    ) -> Result<Vec<ChangeRecord>, ExperimentStoreError>;
}
529
530impl<T: ExperimentStore> ExperimentStore for Arc<T> {
531 fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError> {
532 (**self).get(name)
533 }
534 fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError> {
535 (**self).list()
536 }
537 fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError> {
538 (**self).upsert(config)
539 }
540 fn set_state(
541 &self,
542 name: &str,
543 state: ExperimentState,
544 winner: Option<&str>,
545 ) -> Result<(), ExperimentStoreError> {
546 (**self).set_state(name, state, winner)
547 }
548 fn set_variants(
549 &self,
550 name: &str,
551 variants: Vec<VariantConfig>,
552 actor: Option<&str>,
553 ) -> Result<(), ExperimentStoreError> {
554 (**self).set_variants(name, variants, actor)
555 }
556 fn get_assignment(
557 &self,
558 experiment: &str,
559 actor: &str,
560 ) -> Result<Option<Assignment>, ExperimentStoreError> {
561 (**self).get_assignment(experiment, actor)
562 }
563 fn record_assignment(&self, assignment: Assignment) -> Result<String, ExperimentStoreError> {
564 (**self).record_assignment(assignment)
565 }
566 fn get_override(
567 &self,
568 experiment: &str,
569 actor: &str,
570 ) -> Result<Option<String>, ExperimentStoreError> {
571 (**self).get_override(experiment, actor)
572 }
573 fn set_override(
574 &self,
575 experiment: &str,
576 actor: &str,
577 variant: &str,
578 ) -> Result<(), ExperimentStoreError> {
579 (**self).set_override(experiment, actor, variant)
580 }
581 fn has_assignment_in_group(
582 &self,
583 actor: &str,
584 group: &str,
585 exclude_experiment: &str,
586 ) -> Result<bool, ExperimentStoreError> {
587 (**self).has_assignment_in_group(actor, group, exclude_experiment)
588 }
589 fn history(
590 &self,
591 experiment: &str,
592 limit: usize,
593 ) -> Result<Vec<ChangeRecord>, ExperimentStoreError> {
594 (**self).history(experiment, limit)
595 }
596}
597
/// Mutable state of `InMemoryExperimentStore`, kept behind a single lock.
#[derive(Default)]
struct StoreInner {
    /// Experiment definitions keyed by experiment name.
    experiments: HashMap<String, ExperimentConfig>,
    /// Assignments keyed by `(experiment, actor)`.
    assignments: HashMap<(String, String), Assignment>,
    /// Forced variant names keyed by `(experiment, actor)`.
    overrides: HashMap<(String, String), String>,
    /// Change history per experiment name, in insertion order.
    changes: HashMap<String, Vec<ChangeRecord>>,
}
607
/// `ExperimentStore` that keeps everything in process memory behind an
/// `RwLock`. Nothing is persisted.
#[derive(Default)]
pub struct InMemoryExperimentStore {
    /// All store state, guarded by one reader-writer lock.
    inner: RwLock<StoreInner>,
}
619
620impl InMemoryExperimentStore {
621 #[must_use]
623 pub fn new() -> Self {
624 Self::default()
625 }
626}
627
628impl ExperimentStore for InMemoryExperimentStore {
629 fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError> {
630 Ok(self.inner.read().unwrap().experiments.get(name).cloned())
631 }
632
633 fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError> {
634 let mut exps: Vec<ExperimentConfig> = {
635 let inner = self.inner.read().unwrap();
636 inner.experiments.values().cloned().collect()
637 };
638 exps.sort_by(|a, b| a.name.cmp(&b.name));
639 Ok(exps)
640 }
641
642 fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError> {
643 let name = config.name.clone();
644 {
645 let mut inner = self.inner.write().unwrap();
646 let active_variants: std::collections::HashSet<String> = inner
647 .assignments
648 .values()
649 .filter(|a| a.experiment == name)
650 .map(|a| a.variant.clone())
651 .collect();
652
653 let new_variants: std::collections::HashSet<&str> =
654 config.variants.iter().map(|v| v.name.as_str()).collect();
655
656 for variant in active_variants {
657 if !new_variants.contains(variant.as_str()) {
658 return Err(ExperimentStoreError::Backend(format!(
659 "cannot delete variant '{variant}' because it has active assignments"
660 )));
661 }
662 }
663
664 let exists = inner.experiments.contains_key(&name);
665 inner.experiments.insert(name.clone(), config);
666 let mutation = if exists { "updated" } else { "created" };
667 inner
668 .changes
669 .entry(name.clone())
670 .or_default()
671 .push(ChangeRecord::now(&name, mutation, None));
672 }
673 Ok(())
674 }
675
676 fn set_state(
677 &self,
678 name: &str,
679 state: ExperimentState,
680 winner: Option<&str>,
681 ) -> Result<(), ExperimentStoreError> {
682 {
683 let mut inner = self.inner.write().unwrap();
684 if let Some(exp) = inner.experiments.get_mut(name) {
685 exp.state = state;
686 if let Some(w) = winner {
687 exp.winner = Some(w.to_owned());
688 }
689 exp.updated_at_secs = now_secs();
690 }
691 inner
692 .changes
693 .entry(name.to_owned())
694 .or_default()
695 .push(ChangeRecord::now(
696 name,
697 winner.map_or_else(|| format!("state={state}"), |w| format!("concluded={w}")),
698 None,
699 ));
700 }
701 Ok(())
702 }
703
704 fn set_variants(
705 &self,
706 name: &str,
707 variants: Vec<VariantConfig>,
708 actor: Option<&str>,
709 ) -> Result<(), ExperimentStoreError> {
710 {
711 let mut inner = self.inner.write().unwrap();
712 let active_variants: std::collections::HashSet<String> = inner
713 .assignments
714 .values()
715 .filter(|a| a.experiment == name)
716 .map(|a| a.variant.clone())
717 .collect();
718
719 let new_variants: std::collections::HashSet<&str> =
720 variants.iter().map(|v| v.name.as_str()).collect();
721
722 for variant in active_variants {
723 if !new_variants.contains(variant.as_str()) {
724 return Err(ExperimentStoreError::Backend(format!(
725 "cannot delete variant '{variant}' because it has active assignments"
726 )));
727 }
728 }
729
730 if let Some(exp) = inner.experiments.get_mut(name) {
731 exp.variants = variants;
732 exp.updated_at_secs = now_secs();
733 }
734 inner
735 .changes
736 .entry(name.to_owned())
737 .or_default()
738 .push(ChangeRecord::now(name, "set_weights", actor));
739 }
740 Ok(())
741 }
742
743 fn get_assignment(
744 &self,
745 experiment: &str,
746 actor: &str,
747 ) -> Result<Option<Assignment>, ExperimentStoreError> {
748 let inner = self.inner.read().unwrap();
749 Ok(inner
750 .assignments
751 .get(&(experiment.to_owned(), actor.to_owned()))
752 .cloned())
753 }
754
755 fn record_assignment(&self, assignment: Assignment) -> Result<String, ExperimentStoreError> {
756 let mut inner = self.inner.write().unwrap();
757
758 if !assignment.is_override {
759 if let Some(group) = inner
761 .experiments
762 .get(&assignment.experiment)
763 .and_then(|c| c.exclusion_group.as_ref())
764 {
765 for (exp_name, exp_config) in &inner.experiments {
767 if exp_name == &assignment.experiment {
768 continue;
769 }
770 if exp_config.exclusion_group.as_ref() == Some(group)
771 && inner
772 .assignments
773 .contains_key(&(exp_name.clone(), assignment.actor.clone()))
774 {
775 return Err(ExperimentStoreError::Backend(format!(
776 "ExcludedByGroup:{group}"
777 )));
778 }
779 }
780 }
781 }
782
783 let variant = assignment.variant.clone();
784 let key = (assignment.experiment.clone(), assignment.actor.clone());
785 inner.assignments.insert(key, assignment);
786 drop(inner);
787 Ok(variant)
788 }
789
790 fn get_override(
791 &self,
792 experiment: &str,
793 actor: &str,
794 ) -> Result<Option<String>, ExperimentStoreError> {
795 let inner = self.inner.read().unwrap();
796 Ok(inner
797 .overrides
798 .get(&(experiment.to_owned(), actor.to_owned()))
799 .cloned())
800 }
801
802 fn set_override(
803 &self,
804 experiment: &str,
805 actor: &str,
806 variant: &str,
807 ) -> Result<(), ExperimentStoreError> {
808 let key = (experiment.to_owned(), actor.to_owned());
809 self.inner
810 .write()
811 .unwrap()
812 .overrides
813 .insert(key, variant.to_owned());
814 Ok(())
815 }
816
817 fn has_assignment_in_group(
818 &self,
819 actor: &str,
820 group: &str,
821 exclude_experiment: &str,
822 ) -> Result<bool, ExperimentStoreError> {
823 let inner = self.inner.read().unwrap();
824 for (exp_name, config) in &inner.experiments {
825 if exp_name == exclude_experiment {
826 continue;
827 }
828 if config.exclusion_group.as_deref() != Some(group) {
829 continue;
830 }
831 if inner
832 .assignments
833 .contains_key(&(exp_name.clone(), actor.to_owned()))
834 {
835 return Ok(true);
836 }
837 }
838 Ok(false)
839 }
840
841 fn history(
842 &self,
843 experiment: &str,
844 limit: usize,
845 ) -> Result<Vec<ChangeRecord>, ExperimentStoreError> {
846 let records = {
847 let inner = self.inner.read().unwrap();
848 inner
849 .changes
850 .get(experiment)
851 .map(|v| {
852 if limit == 0 {
853 v.clone()
854 } else {
855 v.iter().rev().take(limit).cloned().collect()
856 }
857 })
858 .unwrap_or_default()
859 };
860 Ok(records)
861 }
862}
863
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
872
/// 64-bit FNV-1a hash of `data`: starting from the offset basis, each byte is
/// XORed into the hash, which is then multiplied by the FNV prime with
/// wrapping arithmetic.
fn fnv1a_64(data: &[u8]) -> u64 {
    const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
    const FNV_PRIME: u64 = 1_099_511_628_211;
    data.iter().fold(FNV_OFFSET, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
889
890#[must_use]
906pub fn experiment_bucket(experiment: &str, actor_id: &str) -> u64 {
907 let key = format!("{experiment}:{actor_id}");
908 fnv1a_64(key.as_bytes()) % 10_000
909}
910
911fn select_variant(variants: &[VariantConfig], bucket: u64) -> Option<&str> {
915 let total_weight: u64 = variants.iter().map(|v| u64::from(v.weight)).sum();
916 if total_weight == 0 {
917 return None;
918 }
919 let threshold = bucket * total_weight / 10_000;
920 let mut cumulative: u64 = 0;
921 for v in variants {
922 cumulative += u64::from(v.weight);
923 if threshold < cumulative {
924 return Some(&v.name);
925 }
926 }
927 variants.last().map(|v| v.name.as_str())
928}
929
/// Error returned by `ExperimentService` operations.
#[derive(Debug, thiserror::Error)]
pub enum ExperimentError {
    /// No experiment with the given name exists in the store.
    #[error("experiment '{0}' not found")]
    NotFound(String),

    /// The experiment (field 0) is in a state (field 1) that does not allow
    /// the requested operation.
    #[error("experiment '{0}' is not running (state: {1})")]
    NotRunning(String, ExperimentState),

    /// The experiment is archived.
    #[error("experiment '{0}' is archived")]
    Archived(String),

    /// The actor already holds an assignment in another experiment of the
    /// same mutual-exclusion group (field 1).
    #[error("actor excluded from experiment '{0}' by mutual exclusion group '{1}'")]
    ExcludedByGroup(String, String),

    /// No variant could be selected for the experiment.
    ///
    /// NOTE(review): this variant is also used to carry variant-validation
    /// messages (empty or duplicate names, unknown winner or override), in
    /// which case field 0 holds that message rather than an experiment name.
    #[error("experiment '{0}' has no assignable variant (all weights are zero)")]
    NoVariant(String),

    /// Error propagated unchanged from the underlying store.
    #[error(transparent)]
    Store(#[from] ExperimentStoreError),
}
959
960fn validate_variants(variants: &[VariantConfig]) -> Result<(), ExperimentError> {
963 let mut seen = std::collections::HashSet::new();
964 for v in variants {
965 if v.name.trim().is_empty() {
966 return Err(ExperimentError::NoVariant(
967 "variant name must not be empty".into(),
968 ));
969 }
970 if !seen.insert(v.name.as_str()) {
971 return Err(ExperimentError::NoVariant(format!(
972 "duplicate variant name: '{}'",
973 v.name
974 )));
975 }
976 }
977 Ok(())
978}
979
/// Entry point for experiment assignment and lifecycle management.
///
/// Cloning is cheap: both fields are `Arc` handles, so clones share the same
/// store and exposure sink.
#[derive(Clone)]
pub struct ExperimentService {
    /// Backend holding experiments, assignments, overrides and history.
    store: Arc<dyn ExperimentStore>,
    /// Receiver of exposure events emitted on assignment.
    exposure_sink: Arc<dyn ExposureSink>,
}
1013
1014impl std::fmt::Debug for ExperimentService {
1015 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1016 f.debug_struct("ExperimentService").finish_non_exhaustive()
1017 }
1018}
1019
1020impl ExperimentService {
1021 #[must_use]
1024 pub fn new(store: Arc<dyn ExperimentStore>) -> Self {
1025 Self {
1026 store,
1027 exposure_sink: Arc::new(TracingExposureSink),
1028 }
1029 }
1030
1031 #[must_use]
1033 pub fn with_exposure_sink(mut self, sink: Arc<dyn ExposureSink>) -> Self {
1034 self.exposure_sink = sink;
1035 self
1036 }
1037
1038 pub fn assign(&self, experiment: &str, actor: &str) -> Result<String, ExperimentError> {
1061 self.assign_with_request_id(experiment, actor, None)
1062 }
1063
1064 pub fn assign_with_request_id(
1073 &self,
1074 experiment: &str,
1075 actor: &str,
1076 request_id: Option<&str>,
1077 ) -> Result<String, ExperimentError> {
1078 let config = self
1079 .store
1080 .get(experiment)?
1081 .ok_or_else(|| ExperimentError::NotFound(experiment.to_owned()))?;
1082
1083 match config.state {
1084 ExperimentState::Archived => {
1085 return Err(ExperimentError::Archived(experiment.to_owned()));
1086 }
1087 ExperimentState::Concluded => {
1088 let winner = config
1090 .winner
1091 .ok_or_else(|| ExperimentError::NoVariant(experiment.to_owned()))?;
1092 return Ok(winner);
1093 }
1094 ExperimentState::Draft => {
1095 return Err(ExperimentError::NotRunning(
1096 experiment.to_owned(),
1097 ExperimentState::Draft,
1098 ));
1099 }
1100 ExperimentState::Running => {}
1101 }
1102
1103 if let Some(override_variant) = self.store.get_override(experiment, actor)?
1106 && config.variants.iter().any(|v| v.name == override_variant)
1107 {
1108 let sticky = Assignment::new(experiment, actor, &override_variant, true);
1109 if let Err(e) = self.store.record_assignment(sticky) {
1110 match e {
1111 ExperimentStoreError::Backend(msg) if msg.starts_with("ExcludedByGroup:") => {
1112 let group = msg
1113 .strip_prefix("ExcludedByGroup:")
1114 .unwrap_or("")
1115 .trim()
1116 .to_owned();
1117 return Err(ExperimentError::ExcludedByGroup(
1118 experiment.to_owned(),
1119 group,
1120 ));
1121 }
1122 other @ ExperimentStoreError::Backend(_) => {
1123 return Err(ExperimentError::Store(other));
1124 }
1125 }
1126 }
1127 self.emit_exposure(experiment, &override_variant, actor, request_id, true);
1128 return Ok(override_variant);
1129 }
1130
1131 if let Some(existing) = self.store.get_assignment(experiment, actor)? {
1133 self.emit_exposure(
1134 experiment,
1135 &existing.variant,
1136 actor,
1137 request_id,
1138 existing.is_override,
1139 );
1140 return Ok(existing.variant);
1141 }
1142
1143 if let Some(group) = &config.exclusion_group
1145 && self
1146 .store
1147 .has_assignment_in_group(actor, group, experiment)?
1148 {
1149 return Err(ExperimentError::ExcludedByGroup(
1150 experiment.to_owned(),
1151 group.clone(),
1152 ));
1153 }
1154
1155 let bucket = experiment_bucket(experiment, actor);
1157 let variant_name = select_variant(&config.variants, bucket)
1158 .ok_or_else(|| ExperimentError::NoVariant(experiment.to_owned()))?
1159 .to_owned();
1160
1161 let assignment = Assignment::new(experiment, actor, &variant_name, false);
1164 let persisted_variant = match self.store.record_assignment(assignment) {
1165 Ok(v) => v,
1166 Err(ExperimentStoreError::Backend(msg)) if msg.starts_with("ExcludedByGroup:") => {
1167 let group = msg
1168 .strip_prefix("ExcludedByGroup:")
1169 .unwrap_or("")
1170 .trim()
1171 .to_owned();
1172 return Err(ExperimentError::ExcludedByGroup(
1173 experiment.to_owned(),
1174 group,
1175 ));
1176 }
1177 Err(other @ ExperimentStoreError::Backend(_)) => {
1178 return Err(ExperimentError::Store(other));
1179 }
1180 };
1181 self.emit_exposure(experiment, &persisted_variant, actor, request_id, false);
1182
1183 Ok(persisted_variant)
1184 }
1185
1186 fn emit_exposure(
1187 &self,
1188 experiment: &str,
1189 variant: &str,
1190 actor: &str,
1191 request_id: Option<&str>,
1192 is_override: bool,
1193 ) {
1194 self.exposure_sink.record(ExposureRecord {
1195 experiment: experiment.to_owned(),
1196 variant: variant.to_owned(),
1197 actor: actor.to_owned(),
1198 request_id: request_id.map(str::to_owned),
1199 is_override,
1200 timestamp_secs: now_secs(),
1201 });
1202 }
1203
1204 pub fn create(&self, config: ExperimentConfig) -> Result<(), ExperimentError> {
1210 validate_variants(&config.variants)?;
1211 if config.state == ExperimentState::Concluded {
1212 let winner = config
1213 .winner
1214 .as_deref()
1215 .filter(|w| !w.trim().is_empty())
1216 .ok_or_else(|| {
1217 ExperimentError::NoVariant(
1218 "concluded experiment requires a non-empty winner".into(),
1219 )
1220 })?;
1221 if !config.variants.iter().any(|v| v.name == winner) {
1222 return Err(ExperimentError::NoVariant(format!(
1223 "'{winner}' is not a configured variant"
1224 )));
1225 }
1226 }
1227 self.store.upsert(config)?;
1228 Ok(())
1229 }
1230
1231 pub fn start(&self, name: &str) -> Result<(), ExperimentError> {
1240 let config = self
1241 .store
1242 .get(name)?
1243 .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1244 if config.state == ExperimentState::Archived {
1245 return Err(ExperimentError::Archived(name.to_owned()));
1246 }
1247 self.store.set_state(name, ExperimentState::Running, None)?;
1248 Ok(())
1249 }
1250
1251 pub fn conclude(&self, name: &str, winner: &str) -> Result<(), ExperimentError> {
1260 let config = self
1261 .store
1262 .get(name)?
1263 .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1264 if config.state == ExperimentState::Archived {
1265 return Err(ExperimentError::Archived(name.to_owned()));
1266 }
1267 if !config.variants.iter().any(|v| v.name == winner) {
1268 return Err(ExperimentError::NoVariant(format!(
1269 "'{winner}' is not a configured variant of experiment '{name}'"
1270 )));
1271 }
1272 self.store
1273 .set_state(name, ExperimentState::Concluded, Some(winner))?;
1274 Ok(())
1275 }
1276
1277 pub fn archive(&self, name: &str) -> Result<(), ExperimentError> {
1286 self.store
1287 .get(name)?
1288 .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1289 self.store
1290 .set_state(name, ExperimentState::Archived, None)?;
1291 Ok(())
1292 }
1293
1294 pub fn set_weights(
1304 &self,
1305 name: &str,
1306 variants: Vec<VariantConfig>,
1307 actor: Option<&str>,
1308 ) -> Result<(), ExperimentError> {
1309 validate_variants(&variants)?;
1310 let config = self
1311 .store
1312 .get(name)?
1313 .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))?;
1314 match config.state {
1315 ExperimentState::Concluded => {
1316 return Err(ExperimentError::NotRunning(
1317 name.to_owned(),
1318 ExperimentState::Concluded,
1319 ));
1320 }
1321 ExperimentState::Archived => {
1322 return Err(ExperimentError::Archived(name.to_owned()));
1323 }
1324 _ => {}
1325 }
1326 self.store.set_variants(name, variants, actor)?;
1327 Ok(())
1328 }
1329
1330 pub fn set_override(
1339 &self,
1340 experiment: &str,
1341 actor: &str,
1342 variant: &str,
1343 ) -> Result<(), ExperimentError> {
1344 let config = self
1345 .store
1346 .get(experiment)?
1347 .ok_or_else(|| ExperimentError::NotFound(experiment.to_owned()))?;
1348 if !config.variants.iter().any(|v| v.name == variant) {
1349 return Err(ExperimentError::NoVariant(format!(
1350 "'{variant}' is not a configured variant of experiment '{experiment}'"
1351 )));
1352 }
1353 self.store.set_override(experiment, actor, variant)?;
1354 Ok(())
1355 }
1356
1357 pub fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentError> {
1363 Ok(self.store.list()?)
1364 }
1365
1366 pub fn status(&self, name: &str) -> Result<ExperimentConfig, ExperimentError> {
1372 self.store
1373 .get(name)?
1374 .ok_or_else(|| ExperimentError::NotFound(name.to_owned()))
1375 }
1376
1377 pub fn history(
1383 &self,
1384 experiment: &str,
1385 limit: usize,
1386 ) -> Result<Vec<ChangeRecord>, ExperimentError> {
1387 Ok(self.store.history(experiment, limit)?)
1388 }
1389}
1390
/// Request-scoped handle for assigning the current actor to experiments.
///
/// Built by its axum `FromRequestParts` implementation from application
/// state, the request's session (if any) and the `x-request-id` header.
pub struct Experiments {
    /// Clone of the registered experiment service.
    service: ExperimentService,
    /// Actor identifier resolved from the session; `None` when the request
    /// carries no session.
    actor_id: Option<String>,
    /// Value of the `x-request-id` header, when present and valid as a string.
    request_id: Option<String>,
}
1424
1425impl Experiments {
1426 pub fn assign(&self, experiment: &str) -> Result<String, ExperimentError> {
1437 let actor = self.actor_id.as_deref().unwrap_or("anonymous");
1438 self.service
1439 .assign_with_request_id(experiment, actor, self.request_id.as_deref())
1440 }
1441
1442 #[must_use]
1444 pub const fn service(&self) -> &ExperimentService {
1445 &self.service
1446 }
1447}
1448
/// Extractor that builds an `Experiments` handle for the current request.
impl axum::extract::FromRequestParts<crate::AppState> for Experiments {
    type Rejection = crate::AutumnError;

    /// Resolves the service, the actor id and the request id.
    ///
    /// Actor resolution, when a `Session` extension is present:
    /// 1. the value stored under the auth session key, if any;
    /// 2. otherwise the value stored under `_autumn_anon_actor`;
    /// 3. otherwise the session id, which is then stored under
    ///    `_autumn_anon_actor` so later requests reuse it.
    ///
    /// Without a session the actor id is `None`.
    ///
    /// # Errors
    ///
    /// Rejects with an internal-server-error `AutumnError` when no
    /// `ExperimentService` is registered in the application state.
    async fn from_request_parts(
        parts: &mut axum::http::request::Parts,
        state: &crate::AppState,
    ) -> Result<Self, Self::Rejection> {
        let service = state
            .extension::<ExperimentService>()
            .map(|arc| (*arc).clone())
            .ok_or_else(|| {
                crate::AutumnError::internal_server_error_msg(
                    "experiment service not registered; \
                     install an ExperimentStore via AppBuilder::with_experiment_store()",
                )
            })?;

        let actor_id = if let Some(session) = parts.extensions.get::<crate::session::Session>() {
            let session_key = state.auth_session_key();
            // Prefer the authenticated user id over any anonymous id.
            if let Some(uid) = session.get(session_key).await {
                Some(uid)
            } else {
                const ANON_KEY: &str = "_autumn_anon_actor";
                // Reuse the anonymous id stored by an earlier request.
                if let Some(existing) = session.get(ANON_KEY).await {
                    Some(existing)
                } else {
                    // First anonymous request: store the session id as the actor id.
                    let id = session.id().await;
                    session.insert(ANON_KEY, &id).await;
                    Some(id)
                }
            }
        } else {
            None
        };

        // A header value that is not valid as a string is treated as absent.
        let request_id = parts
            .headers
            .get("x-request-id")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);

        Ok(Self {
            service,
            actor_id,
            request_id,
        })
    }
}
1504
1505#[cfg(feature = "db")]
1508pub mod pg {
1509 use super::{
1510 Assignment, ChangeRecord, ExperimentConfig, ExperimentState, ExperimentStore,
1511 ExperimentStoreError, VariantConfig,
1512 };
1513 use diesel::prelude::*;
1514 use std::collections::HashMap;
1515 use std::sync::RwLock;
1516 use std::time::{Duration, Instant};
1517
/// Result of probing the config cache of `PgExperimentStore`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CacheLookup {
    /// A fresh entry exists. The inner `None` is a cached "experiment does
    /// not exist" answer.
    Hit(Option<ExperimentConfig>),
    /// No entry, an expired entry, or the cache lock could not be taken.
    Miss,
}
1525
/// One cached experiment lookup with its expiry deadline.
#[derive(Debug, Clone)]
struct CachedEntry {
    /// Cached lookup result; `None` records that the experiment was absent.
    value: Option<ExperimentConfig>,
    /// Instant after which the entry is treated as a miss.
    expires_at: Instant,
}
1531
/// Raw row of the experiment lookup query, before conversion into
/// `ExperimentConfig`.
#[derive(diesel::QueryableByName)]
struct ExperimentRow {
    /// Experiment name.
    #[diesel(sql_type = diesel::sql_types::Text)]
    name: String,
    /// Optional description.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    description: Option<String>,
    /// State as text (`running`, `concluded`, `archived`; anything else is
    /// read as draft).
    #[diesel(sql_type = diesel::sql_types::Text)]
    state: String,
    /// Variant list as JSON text.
    #[diesel(sql_type = diesel::sql_types::Text)]
    variants: String,
    /// Winning variant name, if any.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    winner: Option<String>,
    /// Mutual-exclusion group, if any.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    exclusion_group: Option<String>,
    /// Last-modified time as epoch seconds.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    updated_at_secs: i64,
}
1551
1552 impl ExperimentRow {
1553 fn into_config(self) -> ExperimentConfig {
1554 let variants: Vec<VariantConfig> =
1555 serde_json::from_str(&self.variants).unwrap_or_default();
1556 let state = match self.state.as_str() {
1557 "running" => ExperimentState::Running,
1558 "concluded" => ExperimentState::Concluded,
1559 "archived" => ExperimentState::Archived,
1560 _ => ExperimentState::Draft,
1561 };
1562 ExperimentConfig {
1563 name: self.name,
1564 description: self.description,
1565 state,
1566 variants,
1567 winner: self.winner,
1568 exclusion_group: self.exclusion_group,
1569 updated_at_secs: u64::try_from(self.updated_at_secs).unwrap_or(0),
1570 }
1571 }
1572 }
1573
/// Raw row shape for a raw-SQL assignment query; the columns mirror the
/// fields of `Assignment`.
// NOTE(review): the query that loads this row is outside the visible part of
// the file; confirm the column aliases match these field names.
#[derive(diesel::QueryableByName)]
struct AssignmentRow {
    /// Experiment name.
    #[diesel(sql_type = diesel::sql_types::Text)]
    experiment: String,
    /// Actor identifier.
    #[diesel(sql_type = diesel::sql_types::Text)]
    actor: String,
    /// Assigned variant name.
    #[diesel(sql_type = diesel::sql_types::Text)]
    variant: String,
    /// Whether the assignment came from an override.
    #[diesel(sql_type = diesel::sql_types::Bool)]
    is_override: bool,
    /// Assignment time; presumably epoch seconds, as for `Assignment`.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    assigned_at_secs: i64,
}
1587
/// Raw row shape for a query that returns a single boolean column named
/// `result`.
#[derive(diesel::QueryableByName)]
struct BoolRow {
    /// The boolean query result.
    #[diesel(sql_type = diesel::sql_types::Bool)]
    result: bool,
}
1593
/// Raw row shape for a query that returns a single text column named
/// `variant`.
#[derive(diesel::QueryableByName)]
struct VariantNameRow {
    /// Variant name.
    #[diesel(sql_type = diesel::sql_types::Text)]
    variant: String,
}
1599
/// Raw row shape for a query that returns a single nullable text column named
/// `exclusion_group`.
#[derive(diesel::QueryableByName)]
struct ExclusionGroupRow {
    /// Exclusion group; `None` when the column is NULL.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    exclusion_group: Option<String>,
}
1605
/// Raw row shape for a raw-SQL change-history query; the columns mirror the
/// fields of `ChangeRecord`.
// NOTE(review): the query that loads this row is outside the visible part of
// the file; confirm the column aliases match these field names.
#[derive(diesel::QueryableByName)]
struct ChangeRow {
    /// Experiment name.
    #[diesel(sql_type = diesel::sql_types::Text)]
    experiment: String,
    /// Description of the change.
    #[diesel(sql_type = diesel::sql_types::Text)]
    mutation: String,
    /// Who made the change, if recorded.
    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
    actor: Option<String>,
    /// Change time; presumably epoch seconds, as for `ChangeRecord`.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    timestamp_secs: i64,
}
1617
/// Row of the poll listener's `SELECT DISTINCT experiment` query: the name of
/// an experiment that has a recent change record.
#[derive(diesel::QueryableByName)]
struct ChangeExperimentRow {
    /// Name of the changed experiment.
    #[diesel(sql_type = diesel::sql_types::Text)]
    experiment: String,
}
1623
/// Postgres-backed experiment store with a short-lived in-process cache of
/// experiment lookups.
#[derive(Debug)]
pub struct PgExperimentStore {
    /// Connection URL; a new connection is established for each operation.
    database_url: String,
    /// How long a cached lookup stays valid. Zero disables caching.
    cache_ttl: Duration,
    /// Cached `get` results keyed by experiment name.
    cache: RwLock<HashMap<String, CachedEntry>>,
}
1636
1637 impl Clone for PgExperimentStore {
1638 fn clone(&self) -> Self {
1639 Self::with_cache_ttl(self.database_url.clone(), self.cache_ttl)
1640 }
1641 }
1642
1643 impl PgExperimentStore {
1644 pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(1);
1646
1647 #[must_use]
1649 pub fn new(database_url: impl Into<String>) -> Self {
1650 Self::with_cache_ttl(database_url, Self::DEFAULT_CACHE_TTL)
1651 }
1652
1653 #[must_use]
1656 pub fn with_cache_ttl(database_url: impl Into<String>, cache_ttl: Duration) -> Self {
1657 Self {
1658 database_url: database_url.into(),
1659 cache_ttl,
1660 cache: RwLock::new(HashMap::new()),
1661 }
1662 }
1663
1664 #[must_use]
1666 pub fn from_database_config(config: &crate::config::DatabaseConfig) -> Option<Self> {
1667 config.effective_primary_url().map(Self::new)
1668 }
1669
1670 fn connect(&self) -> Result<diesel::PgConnection, ExperimentStoreError> {
1671 diesel::PgConnection::establish(&self.database_url)
1672 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
1673 }
1674
1675 fn cached(&self, name: &str) -> CacheLookup {
1676 let now = Instant::now();
1677 let Ok(cache) = self.cache.read() else {
1678 return CacheLookup::Miss;
1679 };
1680 match cache.get(name) {
1681 Some(c) if c.expires_at > now => CacheLookup::Hit(c.value.clone()),
1682 _ => CacheLookup::Miss,
1683 }
1684 }
1685
1686 fn store_cache(&self, name: &str, value: Option<ExperimentConfig>) {
1687 if self.cache_ttl.is_zero() {
1688 return;
1689 }
1690 let Some(expires_at) = Instant::now().checked_add(self.cache_ttl) else {
1691 return;
1692 };
1693 if let Ok(mut cache) = self.cache.write() {
1694 cache.insert(name.to_owned(), CachedEntry { value, expires_at });
1695 }
1696 }
1697
1698 fn invalidate(&self, name: &str) {
1699 if let Ok(mut cache) = self.cache.write() {
1700 cache.remove(name);
1701 }
1702 }
1703
1704 pub fn spawn_poll_listener(
1709 store: std::sync::Arc<Self>,
1710 poll_interval: Duration,
1711 ) -> std::thread::JoinHandle<()> {
1712 std::thread::spawn(move || {
1713 const OVERLAP_SECS: i64 = 5;
1714 let now_secs = || {
1715 i64::try_from(
1716 std::time::SystemTime::now()
1717 .duration_since(std::time::UNIX_EPOCH)
1718 .unwrap_or_default()
1719 .as_secs(),
1720 )
1721 .unwrap_or(i64::MAX)
1722 };
1723 let mut last_polled_secs: i64 = now_secs() - OVERLAP_SECS;
1724
1725 loop {
1726 std::thread::sleep(poll_interval);
1727 let new_horizon = now_secs() - OVERLAP_SECS;
1728 if let Ok(mut conn) = store.connect() {
1729 let rows: Vec<ChangeExperimentRow> = diesel::sql_query(
1730 "SELECT DISTINCT experiment FROM autumn_experiment_changes \
1731 WHERE changed_at > to_timestamp($1)",
1732 )
1733 .bind::<diesel::sql_types::BigInt, _>(last_polled_secs)
1734 .load::<ChangeExperimentRow>(&mut conn)
1735 .unwrap_or_default();
1736
1737 for row in rows {
1738 store.invalidate(&row.experiment);
1739 }
1740 }
1741 last_polled_secs = new_horizon;
1742 }
1743 })
1744 }
1745 }
1746
1747 impl ExperimentStore for PgExperimentStore {
1750 fn get(&self, name: &str) -> Result<Option<ExperimentConfig>, ExperimentStoreError> {
1751 if let CacheLookup::Hit(v) = self.cached(name) {
1752 return Ok(v);
1753 }
1754 let mut conn = self.connect()?;
1755 let result = diesel::sql_query(
1756 "SELECT name, description, state::text, variants::text, winner, \
1757 exclusion_group, \
1758 EXTRACT(EPOCH FROM updated_at)::bigint AS updated_at_secs \
1759 FROM autumn_experiments WHERE name = $1",
1760 )
1761 .bind::<diesel::sql_types::Text, _>(name)
1762 .get_result::<ExperimentRow>(&mut conn)
1763 .optional()
1764 .map(|r| r.map(ExperimentRow::into_config))
1765 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1766
1767 self.store_cache(name, result.clone());
1768 Ok(result)
1769 }
1770
1771 fn list(&self) -> Result<Vec<ExperimentConfig>, ExperimentStoreError> {
1772 let mut conn = self.connect()?;
1773 diesel::sql_query(
1774 "SELECT name, description, state::text, variants::text, winner, \
1775 exclusion_group, \
1776 EXTRACT(EPOCH FROM updated_at)::bigint AS updated_at_secs \
1777 FROM autumn_experiments ORDER BY name",
1778 )
1779 .load::<ExperimentRow>(&mut conn)
1780 .map(|rows| rows.into_iter().map(ExperimentRow::into_config).collect())
1781 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
1782 }
1783
1784 fn upsert(&self, config: ExperimentConfig) -> Result<(), ExperimentStoreError> {
1785 let mut conn = self.connect()?;
1786
1787 let active_variants = diesel::sql_query(
1788 "SELECT DISTINCT variant FROM autumn_experiment_assignments WHERE experiment = $1",
1789 )
1790 .bind::<diesel::sql_types::Text, _>(&config.name)
1791 .load::<VariantNameRow>(&mut conn)
1792 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1793
1794 let new_variants: std::collections::HashSet<&str> =
1795 config.variants.iter().map(|v| v.name.as_str()).collect();
1796
1797 for row in active_variants {
1798 if !new_variants.contains(row.variant.as_str()) {
1799 return Err(ExperimentStoreError::Backend(format!(
1800 "cannot delete variant '{}' because it has active assignments",
1801 row.variant
1802 )));
1803 }
1804 }
1805
1806 let variants_json =
1807 serde_json::to_string(&config.variants).unwrap_or_else(|_| "[]".to_owned());
1808 let state_str = config.state.to_string();
1809 let rows_affected = diesel::sql_query(
1810 "WITH upserted AS ( \
1811 INSERT INTO autumn_experiments \
1812 (name, description, state, variants, winner, exclusion_group) \
1813 VALUES ($1, $2, $3::autumn_experiment_state, $4::jsonb, $5, $6) \
1814 ON CONFLICT (name) DO UPDATE SET \
1815 description = EXCLUDED.description, \
1816 state = EXCLUDED.state, \
1817 variants = EXCLUDED.variants, \
1818 winner = EXCLUDED.winner, \
1819 exclusion_group = EXCLUDED.exclusion_group, \
1820 updated_at = NOW() \
1821 WHERE NOT EXISTS ( \
1822 SELECT 1 FROM autumn_experiment_assignments a \
1823 WHERE a.experiment = EXCLUDED.name \
1824 AND a.variant NOT IN ( \
1825 SELECT x.name FROM jsonb_to_recordset(EXCLUDED.variants) AS x(name text) \
1826 ) \
1827 ) \
1828 RETURNING name, (xmax = 0) AS is_insert \
1829 ) \
1830 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
1831 SELECT name, CASE WHEN is_insert THEN 'created' ELSE 'updated' END, NULL FROM upserted",
1832 )
1833 .bind::<diesel::sql_types::Text, _>(&config.name)
1834 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(config.description)
1835 .bind::<diesel::sql_types::Text, _>(&state_str)
1836 .bind::<diesel::sql_types::Text, _>(&variants_json)
1837 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(config.winner)
1838 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(config.exclusion_group)
1839 .execute(&mut conn)
1840 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1841
1842 if rows_affected == 0 {
1843 return Err(ExperimentStoreError::Backend(
1844 "cannot delete variant because it has active assignments".to_owned(),
1845 ));
1846 }
1847
1848 self.invalidate(&config.name);
1849 Ok(())
1850 }
1851
1852 fn set_state(
1853 &self,
1854 name: &str,
1855 state: ExperimentState,
1856 winner: Option<&str>,
1857 ) -> Result<(), ExperimentStoreError> {
1858 let state_str = state.to_string();
1859 let mutation =
1860 winner.map_or_else(|| format!("state={state}"), |w| format!("concluded={w}"));
1861 let mut conn = self.connect()?;
1862 diesel::sql_query(
1863 "WITH updated AS ( \
1864 UPDATE autumn_experiments \
1865 SET state = $2::autumn_experiment_state, \
1866 winner = COALESCE($3, winner), \
1867 updated_at = NOW() \
1868 WHERE name = $1 \
1869 RETURNING name \
1870 ) \
1871 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
1872 SELECT name, $4, NULL FROM updated",
1873 )
1874 .bind::<diesel::sql_types::Text, _>(name)
1875 .bind::<diesel::sql_types::Text, _>(&state_str)
1876 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
1877 winner.map(str::to_owned),
1878 )
1879 .bind::<diesel::sql_types::Text, _>(&mutation)
1880 .execute(&mut conn)
1881 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1882 self.invalidate(name);
1883 Ok(())
1884 }
1885
1886 fn set_variants(
1887 &self,
1888 name: &str,
1889 variants: Vec<VariantConfig>,
1890 actor: Option<&str>,
1891 ) -> Result<(), ExperimentStoreError> {
1892 let mut conn = self.connect()?;
1893
1894 let active_variants = diesel::sql_query(
1895 "SELECT DISTINCT variant FROM autumn_experiment_assignments WHERE experiment = $1",
1896 )
1897 .bind::<diesel::sql_types::Text, _>(name)
1898 .load::<VariantNameRow>(&mut conn)
1899 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1900
1901 let new_variants: std::collections::HashSet<&str> =
1902 variants.iter().map(|v| v.name.as_str()).collect();
1903
1904 for row in active_variants {
1905 if !new_variants.contains(row.variant.as_str()) {
1906 return Err(ExperimentStoreError::Backend(format!(
1907 "cannot delete variant '{}' because it has active assignments",
1908 row.variant
1909 )));
1910 }
1911 }
1912
1913 let variants_json =
1914 serde_json::to_string(&variants).unwrap_or_else(|_| "[]".to_owned());
1915 let rows_affected = diesel::sql_query(
1916 "WITH updated AS ( \
1917 UPDATE autumn_experiments \
1918 SET variants = $2::jsonb, updated_at = NOW() \
1919 WHERE name = $1 \
1920 AND NOT EXISTS ( \
1921 SELECT 1 FROM autumn_experiment_assignments a \
1922 WHERE a.experiment = name \
1923 AND a.variant NOT IN ( \
1924 SELECT x.name FROM jsonb_to_recordset($2::jsonb) AS x(name text) \
1925 ) \
1926 ) \
1927 RETURNING name \
1928 ) \
1929 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
1930 SELECT name, 'set_weights', $3 FROM updated",
1931 )
1932 .bind::<diesel::sql_types::Text, _>(name)
1933 .bind::<diesel::sql_types::Text, _>(&variants_json)
1934 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
1935 actor.map(str::to_owned),
1936 )
1937 .execute(&mut conn)
1938 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1939
1940 if rows_affected == 0 {
1941 let exists_row = diesel::sql_query(
1942 "SELECT EXISTS(SELECT 1 FROM autumn_experiments WHERE name = $1) AS result",
1943 )
1944 .bind::<diesel::sql_types::Text, _>(name)
1945 .get_result::<BoolRow>(&mut conn)
1946 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
1947
1948 if exists_row.result {
1949 return Err(ExperimentStoreError::Backend(
1950 "cannot delete variant because it has active assignments".to_owned(),
1951 ));
1952 }
1953 }
1954
1955 self.invalidate(name);
1956 Ok(())
1957 }
1958
1959 fn get_assignment(
1960 &self,
1961 experiment: &str,
1962 actor: &str,
1963 ) -> Result<Option<Assignment>, ExperimentStoreError> {
1964 let mut conn = self.connect()?;
1965 diesel::sql_query(
1966 "SELECT experiment, actor, variant, is_override, \
1967 EXTRACT(EPOCH FROM assigned_at)::bigint AS assigned_at_secs \
1968 FROM autumn_experiment_assignments \
1969 WHERE experiment = $1 AND actor = $2",
1970 )
1971 .bind::<diesel::sql_types::Text, _>(experiment)
1972 .bind::<diesel::sql_types::Text, _>(actor)
1973 .get_result::<AssignmentRow>(&mut conn)
1974 .optional()
1975 .map(|r| {
1976 r.map(|row| Assignment {
1977 experiment: row.experiment,
1978 actor: row.actor,
1979 variant: row.variant,
1980 is_override: row.is_override,
1981 assigned_at_secs: u64::try_from(row.assigned_at_secs).unwrap_or(0),
1982 })
1983 })
1984 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
1985 }
1986
1987 fn record_assignment(
1988 &self,
1989 assignment: Assignment,
1990 ) -> Result<String, ExperimentStoreError> {
1991 use diesel::connection::Connection as _;
1992
1993 #[derive(Debug)]
1994 enum TxError {
1995 Database(diesel::result::Error),
1996 Excluded(String),
1997 }
1998 impl From<diesel::result::Error> for TxError {
1999 fn from(e: diesel::result::Error) -> Self {
2000 Self::Database(e)
2001 }
2002 }
2003
2004 let mut conn = self.connect()?;
2005 let result = conn.transaction::<String, TxError, _>(|conn| {
2006 diesel::sql_query("SELECT pg_advisory_xact_lock(hashtext($1))")
2008 .bind::<diesel::sql_types::Text, _>(&assignment.actor)
2009 .execute(conn)?;
2010
2011 if !assignment.is_override {
2013 let group_row = diesel::sql_query(
2015 "SELECT exclusion_group FROM autumn_experiments WHERE name = $1"
2016 )
2017 .bind::<diesel::sql_types::Text, _>(&assignment.experiment)
2018 .get_result::<ExclusionGroupRow>(conn)
2019 .optional()?;
2020
2021 if let Some(ExclusionGroupRow { exclusion_group: Some(group) }) = group_row {
2022 let exists = diesel::sql_query(
2024 "SELECT EXISTS ( \
2025 SELECT 1 \
2026 FROM autumn_experiment_assignments a \
2027 JOIN autumn_experiments e ON e.name = a.experiment \
2028 WHERE a.actor = $1 \
2029 AND e.exclusion_group = $2 \
2030 AND a.experiment <> $3 \
2031 ) AS result",
2032 )
2033 .bind::<diesel::sql_types::Text, _>(&assignment.actor)
2034 .bind::<diesel::sql_types::Text, _>(&group)
2035 .bind::<diesel::sql_types::Text, _>(&assignment.experiment)
2036 .get_result::<BoolRow>(conn)
2037 .map(|r| r.result)?;
2038
2039 if exists {
2040 return Err(TxError::Excluded(group));
2041 }
2042 }
2043 }
2044
2045 let variant_name = diesel::sql_query(
2047 "INSERT INTO autumn_experiment_assignments \
2048 (experiment, actor, variant, is_override) \
2049 VALUES ($1, $2, $3, $4) \
2050 ON CONFLICT (experiment, actor) DO UPDATE \
2051 SET variant = CASE WHEN EXCLUDED.is_override THEN EXCLUDED.variant ELSE autumn_experiment_assignments.variant END, \
2052 is_override = CASE WHEN EXCLUDED.is_override THEN EXCLUDED.is_override ELSE autumn_experiment_assignments.is_override END \
2053 RETURNING variant",
2054 )
2055 .bind::<diesel::sql_types::Text, _>(&assignment.experiment)
2056 .bind::<diesel::sql_types::Text, _>(&assignment.actor)
2057 .bind::<diesel::sql_types::Text, _>(&assignment.variant)
2058 .bind::<diesel::sql_types::Bool, _>(assignment.is_override)
2059 .get_result::<VariantNameRow>(conn)
2060 .map(|r| r.variant)?;
2061
2062 Ok(variant_name)
2063 });
2064
2065 match result {
2066 Ok(v) => Ok(v),
2067 Err(TxError::Database(e)) => Err(ExperimentStoreError::Backend(e.to_string())),
2068 Err(TxError::Excluded(group)) => Err(ExperimentStoreError::Backend(format!(
2069 "ExcludedByGroup:{group}"
2070 ))),
2071 }
2072 }
2073
2074 fn get_override(
2075 &self,
2076 experiment: &str,
2077 actor: &str,
2078 ) -> Result<Option<String>, ExperimentStoreError> {
2079 let mut conn = self.connect()?;
2080 diesel::sql_query(
2081 "SELECT variant FROM autumn_experiment_overrides \
2082 WHERE experiment = $1 AND actor = $2",
2083 )
2084 .bind::<diesel::sql_types::Text, _>(experiment)
2085 .bind::<diesel::sql_types::Text, _>(actor)
2086 .get_result::<VariantNameRow>(&mut conn)
2087 .optional()
2088 .map(|r| r.map(|row| row.variant))
2089 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
2090 }
2091
2092 fn set_override(
2093 &self,
2094 experiment: &str,
2095 actor: &str,
2096 variant: &str,
2097 ) -> Result<(), ExperimentStoreError> {
2098 let mut conn = self.connect()?;
2099 diesel::sql_query(
2100 "WITH upserted AS ( \
2101 INSERT INTO autumn_experiment_overrides (experiment, actor, variant) \
2102 VALUES ($1, $2, $3) \
2103 ON CONFLICT (experiment, actor) DO UPDATE SET variant = EXCLUDED.variant \
2104 RETURNING experiment, actor, variant \
2105 ) \
2106 INSERT INTO autumn_experiment_changes (experiment, mutation, actor) \
2107 SELECT experiment, 'override=' || actor || ':' || variant, NULL FROM upserted",
2108 )
2109 .bind::<diesel::sql_types::Text, _>(experiment)
2110 .bind::<diesel::sql_types::Text, _>(actor)
2111 .bind::<diesel::sql_types::Text, _>(variant)
2112 .execute(&mut conn)
2113 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))?;
2114 Ok(())
2115 }
2116
2117 fn has_assignment_in_group(
2118 &self,
2119 actor: &str,
2120 group: &str,
2121 exclude_experiment: &str,
2122 ) -> Result<bool, ExperimentStoreError> {
2123 let mut conn = self.connect()?;
2124 diesel::sql_query(
2125 "SELECT EXISTS ( \
2126 SELECT 1 \
2127 FROM autumn_experiment_assignments a \
2128 JOIN autumn_experiments e ON e.name = a.experiment \
2129 WHERE a.actor = $1 \
2130 AND e.exclusion_group = $2 \
2131 AND a.experiment <> $3 \
2132 ) AS result",
2133 )
2134 .bind::<diesel::sql_types::Text, _>(actor)
2135 .bind::<diesel::sql_types::Text, _>(group)
2136 .bind::<diesel::sql_types::Text, _>(exclude_experiment)
2137 .get_result::<BoolRow>(&mut conn)
2138 .map(|r| r.result)
2139 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
2140 }
2141
2142 fn history(
2143 &self,
2144 experiment: &str,
2145 limit: usize,
2146 ) -> Result<Vec<ChangeRecord>, ExperimentStoreError> {
2147 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
2148 let mut conn = self.connect()?;
2149 diesel::sql_query(
2150 "SELECT experiment, mutation, actor, \
2151 EXTRACT(EPOCH FROM changed_at)::bigint AS timestamp_secs \
2152 FROM autumn_experiment_changes \
2153 WHERE experiment = $1 \
2154 ORDER BY changed_at DESC \
2155 LIMIT NULLIF($2::bigint, 0)",
2156 )
2157 .bind::<diesel::sql_types::Text, _>(experiment)
2158 .bind::<diesel::sql_types::BigInt, _>(limit)
2159 .load::<ChangeRow>(&mut conn)
2160 .map(|rows| {
2161 rows.into_iter()
2162 .map(|r| ChangeRecord {
2163 experiment: r.experiment,
2164 mutation: r.mutation,
2165 actor: r.actor,
2166 timestamp_secs: u64::try_from(r.timestamp_secs).unwrap_or(0),
2167 })
2168 .collect()
2169 })
2170 .map_err(|e| ExperimentStoreError::Backend(e.to_string()))
2171 }
2172 }
2173}
2174
2175#[cfg(test)]
2178mod tests {
2179 use super::*;
2180
2181 fn make_svc() -> ExperimentService {
2184 ExperimentService::new(Arc::new(InMemoryExperimentStore::new()))
2185 }
2186
2187 fn make_svc_with_sink() -> (ExperimentService, Arc<Mutex<Vec<ExposureRecord>>>) {
2188 let (sink, records) = RecordingExposureSink::new();
2189 let svc = ExperimentService::new(Arc::new(InMemoryExperimentStore::new()))
2190 .with_exposure_sink(Arc::new(sink));
2191 (svc, records)
2192 }
2193
2194 fn fifty_fifty(name: &str) -> ExperimentConfig {
2195 ExperimentConfig::new(
2196 name,
2197 vec![
2198 VariantConfig::new("control", 50),
2199 VariantConfig::new("treatment", 50),
2200 ],
2201 )
2202 }
2203
2204 fn running(svc: &ExperimentService, name: &str) {
2205 svc.create(fifty_fifty(name)).unwrap();
2206 svc.start(name).unwrap();
2207 }
2208
/// Assigning against an experiment that was never created yields `NotFound`.
#[test]
fn assign_unknown_experiment_returns_not_found() {
    let svc = make_svc();
    let err = svc.assign("ghost", "user:1").unwrap_err();
    let is_not_found = matches!(err, ExperimentError::NotFound(_));
    assert!(is_not_found, "expected NotFound, got {err}");
}
2225
/// An experiment that was created but never started rejects assignment
/// with `NotRunning(_, Draft)`.
#[test]
fn assign_draft_experiment_returns_not_running() {
    let svc = make_svc();
    svc.create(fifty_fifty("exp")).unwrap();
    let err = svc.assign("exp", "user:1").unwrap_err();
    let is_draft = matches!(err, ExperimentError::NotRunning(_, ExperimentState::Draft));
    assert!(is_draft, "expected NotRunning(Draft), got {err}");
}
2238
/// Once archived, an experiment rejects assignment with `Archived`.
#[test]
fn assign_archived_experiment_returns_archived() {
    let svc = make_svc();
    running(&svc, "exp");
    svc.archive("exp").unwrap();
    let err = svc.assign("exp", "user:1").unwrap_err();
    let is_archived = matches!(err, ExperimentError::Archived(_));
    assert!(is_archived, "expected Archived, got {err}");
}
2252
/// After conclusion, every actor is served the declared winner.
#[test]
fn concluded_experiment_returns_winner_for_all_actors() {
    let svc = make_svc();
    running(&svc, "exp");
    svc.conclude("exp", "treatment").unwrap();
    for actor in (0..100_u32).map(|i| format!("user:{i}")) {
        let v = svc.assign("exp", &actor).unwrap();
        assert_eq!(
            v, "treatment",
            "concluded experiment must return winner for {actor}"
        );
    }
}
2270
/// Serving the winner of a concluded experiment must not emit an exposure.
#[test]
fn concluded_experiment_emits_no_exposures() {
    // One service + recording sink is all this test needs.
    let (svc, records) = make_svc_with_sink();
    running(&svc, "exp");
    svc.conclude("exp", "treatment").unwrap();
    svc.assign("exp", "user:1").unwrap();
    assert_eq!(
        records.lock().unwrap().len(),
        0,
        "concluded experiment must not emit exposure events"
    );
}
2292
/// A running experiment always hands out one of its declared variant names.
#[test]
fn assign_running_experiment_returns_valid_variant() {
    let svc = make_svc();
    running(&svc, "exp");
    let v = svc.assign("exp", "user:1").unwrap();
    let is_declared = v == "control" || v == "treatment";
    assert!(
        is_declared,
        "variant must be one of the declared names, got {v:?}"
    );
}
2305
/// The same actor gets the same variant both on repeated calls to one
/// service and from an independent service with its own empty store.
#[test]
fn assign_is_deterministic_for_same_actor() {
    let svc = make_svc();
    running(&svc, "exp");
    let v1 = svc.assign("exp", "user:42").unwrap();
    // Same actor again on the same service: must match the first answer.
    let v2 = svc.assign("exp", "user:42").unwrap();
    assert_eq!(
        v1, v2,
        "same actor must receive the same variant on repeated calls"
    );

    // A separate service has no stored assignment to fall back on, so
    // equality here shows the bucketing itself is deterministic.
    let svc2 = make_svc();
    running(&svc2, "exp");
    let v3 = svc2.assign("exp", "user:42").unwrap();
    assert_eq!(
        v1, v3,
        "same actor must receive the same variant across different service instances"
    );
}
2324
/// 10,000 assignments in total (1 initial + 9,999 repeats) for one actor
/// must all return the same variant.
#[test]
fn zero_reassignments_across_10000_requests() {
    let svc = make_svc();
    running(&svc, "exp");
    let first = svc.assign("exp", "stable_user").unwrap();
    (1..10_000).for_each(|_| {
        let v = svc.assign("exp", "stable_user").unwrap();
        assert_eq!(v, first, "re-assignment must return the same variant");
    });
}
2337
/// Pins `experiment_bucket` to known outputs so any change to the hashing
/// scheme is caught as a regression.
#[test]
fn stable_hash_regression_known_fixtures() {
    let b1 = experiment_bucket("checkout_v2", "user:1");
    let b2 = experiment_bucket("checkout_v2", "user:1");
    assert_eq!(
        b1, b2,
        "hash must be deterministic (same input → same output)"
    );

    // (experiment, actor, pinned bucket, failure message)
    let fixtures = [
        (
            "checkout_v2",
            "user:1",
            4_830_u64,
            "checkout_v2:user:1 bucket changed — hash regression",
        ),
        (
            "checkout_v2",
            "user:2",
            6_619_u64,
            "checkout_v2:user:2 bucket changed — hash regression",
        ),
        (
            "onboarding_v3",
            "user:100",
            6_602_u64,
            "onboarding_v3:user:100 bucket changed — hash regression",
        ),
    ];
    for (experiment, actor, expected, message) in fixtures {
        assert_eq!(experiment_bucket(experiment, actor), expected, "{message}");
    }
}
2368
/// A variant with weight 0 is never chosen by bucketing.
#[test]
fn zero_weight_variant_never_assigned() {
    let svc = make_svc();
    let variants = vec![
        VariantConfig::new("control", 100),
        VariantConfig::new("dead", 0),
    ];
    svc.create(ExperimentConfig::new("exp", variants)).unwrap();
    svc.start("exp").unwrap();
    for actor in (0..200_u32).map(|i| format!("user:{i}")) {
        let v = svc.assign("exp", &actor).unwrap();
        assert_eq!(v, "control", "zero-weight variant must never be assigned");
    }
}
2388
/// With a single declared variant, every actor receives it.
#[test]
fn single_variant_always_assigned() {
    let svc = make_svc();
    let only = vec![VariantConfig::new("only", 100)];
    svc.create(ExperimentConfig::new("exp", only)).unwrap();
    svc.start("exp").unwrap();
    for actor in (0..100_u32).map(|i| format!("user:{i}")) {
        let v = svc.assign("exp", &actor).unwrap();
        assert_eq!(v, "only");
    }
}
2405
/// Over 1,000 actors a 50/50 experiment puts between 400 and 600 of them
/// in `control`.
#[test]
fn fifty_fifty_weights_split_roughly_evenly() {
    let svc = make_svc();
    running(&svc, "exp");
    let control_count = (0..1000_u32)
        .filter(|i| svc.assign("exp", &format!("user:{i}")).unwrap() == "control")
        .count();
    let in_tolerance = (400..=600).contains(&control_count);
    assert!(
        in_tolerance,
        "expected ~500 control assignments, got {control_count}"
    );
}
2423
/// When every variant has weight 0, assignment fails with `NoVariant`.
#[test]
fn all_zero_weights_returns_no_variant_error() {
    let svc = make_svc();
    let variants = vec![VariantConfig::new("a", 0), VariantConfig::new("b", 0)];
    svc.create(ExperimentConfig::new("exp", variants)).unwrap();
    svc.start("exp").unwrap();
    let err = svc.assign("exp", "user:1").unwrap_err();
    let is_no_variant = matches!(err, ExperimentError::NoVariant(_));
    assert!(is_no_variant, "expected NoVariant, got {err}");
}
2441
/// Ten follow-up calls for the same actor all return the first variant.
#[test]
fn sticky_assignment_returned_on_subsequent_calls() {
    let svc = make_svc();
    running(&svc, "exp");
    let first = svc.assign("exp", "user:1").unwrap();
    (0..10).for_each(|_| {
        let again = svc.assign("exp", "user:1").unwrap();
        assert_eq!(
            again, first,
            "sticky assignment must be returned on all subsequent calls"
        );
    });
}
2458
/// Shifting all weight onto the *other* variant must not move an actor
/// that already holds an assignment.
#[test]
fn set_weights_does_not_rebucket_existing_assignments() {
    let svc = make_svc();
    running(&svc, "exp");
    let original = svc.assign("exp", "user:1").unwrap();

    // Give 100% of the weight to whichever variant the actor did NOT get.
    let (control_weight, treatment_weight) = if original == "control" {
        (0_u32, 100_u32)
    } else {
        (100_u32, 0_u32)
    };
    let reweighted = vec![
        VariantConfig::new("control", control_weight),
        VariantConfig::new("treatment", treatment_weight),
    ];
    svc.set_weights("exp", reweighted, None).unwrap();

    let after = svc.assign("exp", "user:1").unwrap();
    assert_eq!(
        original, after,
        "existing sticky assignment must not be re-bucketed after weight change"
    );
}
2489
/// Dropping the variant an actor is assigned to is refused by `set_weights`.
#[test]
fn set_weights_rejects_deleting_assigned_variant() {
    let svc = make_svc();
    running(&svc, "exp");
    let original = svc.assign("exp", "user:1").unwrap();

    // Keep only the variant the actor does NOT hold.
    let remaining_variant = match original == "control" {
        true => "treatment",
        false => "control",
    };
    let survivors = vec![VariantConfig::new(remaining_variant, 100)];
    let err = svc.set_weights("exp", survivors, None).unwrap_err();
    let mentions_guard = err.to_string().contains("cannot delete variant");
    assert!(
        mentions_guard,
        "expected active assignment delete guard error, got {err}"
    );
}
2513
/// Each `assign` call on a running experiment adds exactly one exposure
/// record, for repeat calls and for new actors alike.
#[test]
fn exposure_emitted_exactly_once_per_assign_call() {
    let (svc, records) = make_svc_with_sink();
    running(&svc, "exp");
    let recorded = || records.lock().unwrap().len();

    svc.assign("exp", "user:1").unwrap();
    assert_eq!(recorded(), 1, "first assign → 1 exposure");

    svc.assign("exp", "user:1").unwrap();
    assert_eq!(recorded(), 2, "second assign → 2 total exposures");

    svc.assign("exp", "user:2").unwrap();
    assert_eq!(recorded(), 3, "different actor → 3 total exposures");
}
2539
/// The recorded exposure carries the experiment, variant, actor and
/// request id of the call, and is not flagged as an override.
#[test]
fn exposure_record_contains_correct_fields() {
    let (svc, records) = make_svc_with_sink();
    running(&svc, "checkout_v2");
    let variant = svc
        .assign_with_request_id("checkout_v2", "user:42", Some("req-abc"))
        .unwrap();

    // Copy what we need out of the first record so the lock is released
    // before any assertion runs.
    let (count, seen_experiment, seen_variant, seen_actor, seen_request_id, seen_override) = {
        let guard = records.lock().unwrap();
        let first = &guard[0];
        (
            guard.len(),
            first.experiment.clone(),
            first.variant.clone(),
            first.actor.clone(),
            first.request_id.clone(),
            first.is_override,
        )
    };

    assert_eq!(count, 1);
    assert_eq!(seen_experiment, "checkout_v2");
    assert_eq!(seen_variant, variant);
    assert_eq!(seen_actor, "user:42");
    assert_eq!(seen_request_id.as_deref(), Some("req-abc"));
    assert!(!seen_override);
}
2568
/// An override can force a variant that has weight 0.
#[test]
fn override_bypasses_weights() {
    let svc = make_svc();
    let variants = vec![
        VariantConfig::new("control", 100),
        VariantConfig::new("treatment", 0),
    ];
    svc.create(ExperimentConfig::new("exp", variants)).unwrap();
    svc.start("exp").unwrap();
    svc.set_override("exp", "qa:alice", "treatment").unwrap();

    let v = svc.assign("exp", "qa:alice").unwrap();
    assert_eq!(
        v, "treatment",
        "override must bypass weight-based bucketing"
    );
}
2591
/// An assignment served from an override emits one exposure flagged
/// `is_override` with the overridden variant.
#[test]
fn override_emits_exposure_tagged_as_override() {
    let (svc, records) = make_svc_with_sink();
    running(&svc, "exp");
    svc.set_override("exp", "qa:alice", "treatment").unwrap();
    svc.assign("exp", "qa:alice").unwrap();

    // Snapshot under the lock, assert after releasing it.
    let (count, flagged, seen_variant) = {
        let guard = records.lock().unwrap();
        (guard.len(), guard[0].is_override, guard[0].variant.clone())
    };

    assert_eq!(count, 1);
    assert!(
        flagged,
        "exposure from override must be tagged is_override = true"
    );
    assert_eq!(seen_variant, "treatment");
}
2611
/// An actor assigned to one experiment of an exclusion group is rejected
/// by a sibling experiment in the same group.
#[test]
fn mutual_exclusion_prevents_sibling_assignment() {
    let svc = make_svc();
    for name in ["exp_a", "exp_b"] {
        let config = ExperimentConfig::new(name, vec![VariantConfig::new("v1", 1)])
            .exclusion_group("checkout");
        svc.create(config).unwrap();
        svc.start(name).unwrap();
    }

    svc.assign("exp_a", "user:1").unwrap();

    let err = svc.assign("exp_b", "user:1").unwrap_err();
    let is_excluded = matches!(err, ExperimentError::ExcludedByGroup(_, _));
    assert!(is_excluded, "expected ExcludedByGroup, got {err}");
}
2641
/// Experiments in different exclusion groups can both assign the same actor.
#[test]
fn different_groups_do_not_exclude_each_other() {
    let svc = make_svc();
    for (name, group) in [("exp_a", "group_a"), ("exp_b", "group_b")] {
        let config = ExperimentConfig::new(name, vec![VariantConfig::new("v1", 1)])
            .exclusion_group(group);
        svc.create(config).unwrap();
        svc.start(name).unwrap();
    }

    svc.assign("exp_a", "user:1").unwrap();
    let second = svc.assign("exp_b", "user:1");
    assert!(
        second.is_ok(),
        "experiments in different groups must not exclude each other"
    );
}
2667
/// Experiments without any exclusion group never block each other.
#[test]
fn no_exclusion_group_allows_both_assignments() {
    let svc = make_svc();
    running(&svc, "exp_a");
    running(&svc, "exp_b");
    svc.assign("exp_a", "user:1").unwrap();
    let second = svc.assign("exp_b", "user:1");
    assert!(
        second.is_ok(),
        "experiments without exclusion groups must not exclude each other"
    );
}
2682
/// `experiment_bucket` is repeatable and always below 10,000.
#[test]
fn experiment_bucket_is_stable_and_in_range() {
    for actor in (0..100_u32).map(|i| format!("user:{i}")) {
        let b1 = experiment_bucket("my_exp", &actor);
        let b2 = experiment_bucket("my_exp", &actor);
        assert_eq!(b1, b2, "bucket must be deterministic for {actor}");
        assert!(b1 < 10_000, "bucket must be in [0, 10000) for {actor}");
    }
}
2695
/// 100 distinct actors must land in more than 50 distinct buckets.
#[test]
fn experiment_bucket_produces_diverse_values() {
    let mut buckets: std::collections::HashSet<u64> = std::collections::HashSet::new();
    for i in 0..100_u32 {
        buckets.insert(experiment_bucket("exp", &format!("user:{i}")));
    }
    let distinct = buckets.len();
    assert!(
        distinct > 50,
        "expected diverse buckets across 100 actors, got {}",
        distinct
    );
}
2709
/// `list` returns every created experiment, in name order.
#[test]
fn list_returns_all_experiments() {
    let svc = make_svc();
    for name in ["alpha", "beta", "gamma"] {
        svc.create(fifty_fifty(name)).unwrap();
    }
    let experiments = svc.list().unwrap();
    assert_eq!(experiments.len(), 3);
    for (position, expected) in ["alpha", "beta", "gamma"].iter().enumerate() {
        assert_eq!(experiments[position].name, *expected);
    }
}
2724
/// `status` reflects the running state and both declared variants.
#[test]
fn status_returns_current_config() {
    let svc = make_svc();
    running(&svc, "exp");
    let current = svc.status("exp").unwrap();
    assert_eq!(current.state, ExperimentState::Running);
    assert_eq!(current.variants.len(), 2);
}
2735
/// Creating and starting an experiment leaves at least one history entry.
#[test]
fn history_records_create_and_start_mutations() {
    let svc = make_svc();
    svc.create(fifty_fifty("exp")).unwrap();
    svc.start("exp").unwrap();
    let entries = svc.history("exp", 10).unwrap();
    let recorded_any = !entries.is_empty();
    assert!(recorded_any, "history must record mutations");
}
2746
/// A `set_weights` call shows up in history under the mutation `set_weights`.
#[test]
fn set_weights_recorded_in_history() {
    let svc = make_svc();
    running(&svc, "exp");
    let reweighted = vec![
        VariantConfig::new("control", 30),
        VariantConfig::new("treatment", 70),
    ];
    svc.set_weights("exp", reweighted, Some("ops@example.com"))
        .unwrap();

    let entries = svc.history("exp", 10).unwrap();
    let has_set_weights = entries
        .iter()
        .map(|record| record.mutation.as_str())
        .any(|mutation| mutation == "set_weights");
    assert!(has_set_weights, "set_weights must be recorded in history");
}
2766
/// With no variants at all, nothing can be selected.
#[test]
fn select_variant_returns_none_for_empty_variants() {
    let picked = select_variant(&[], 0);
    assert_eq!(picked, None);
}
2773
/// When every weight is zero, no variant is selected for any bucket.
#[test]
fn select_variant_returns_none_for_all_zero_weights() {
    let weightless = vec![VariantConfig::new("a", 0), VariantConfig::new("b", 0)];
    let picked = select_variant(&weightless, 5_000);
    assert_eq!(picked, None);
}
2779
/// For a 50/50 split, buckets 0..=4999 map to `control` and 5000..=9999
/// to `treatment`; this checks both edges of each half.
#[test]
fn select_variant_50_50_boundary() {
    let vs = vec![
        VariantConfig::new("control", 50),
        VariantConfig::new("treatment", 50),
    ];
    let edges = [
        (0, "control"),
        (4_999, "control"),
        (5_000, "treatment"),
        (9_999, "treatment"),
    ];
    for (bucket, expected) in edges {
        assert_eq!(select_variant(&vs, bucket), Some(expected));
    }
}
2796
/// Exercises the store API through an `Arc<dyn ExperimentStore>`: upsert/get,
/// state change, assignment round-trip and override round-trip.
#[test]
fn arc_experiment_store_delegates_all_operations() {
    let store = Arc::new(InMemoryExperimentStore::new());
    let arc_store: Arc<dyn ExperimentStore> = Arc::clone(&store) as _;

    // upsert + get
    arc_store.upsert(fifty_fifty("my_exp")).unwrap();
    let fetched = arc_store.get("my_exp").unwrap();
    assert!(fetched.is_some());

    // set_state
    arc_store
        .set_state("my_exp", ExperimentState::Running, None)
        .unwrap();
    let state_now = arc_store.get("my_exp").unwrap().unwrap().state;
    assert_eq!(state_now, ExperimentState::Running);

    // record_assignment + get_assignment
    let assignment = Assignment::new("my_exp", "user:1", "control", false);
    arc_store.record_assignment(assignment).unwrap();
    let stored = arc_store.get_assignment("my_exp", "user:1").unwrap();
    assert_eq!(stored.unwrap().variant, "control");

    // set_override + get_override
    arc_store
        .set_override("my_exp", "qa:1", "treatment")
        .unwrap();
    let forced = arc_store.get_override("my_exp", "qa:1").unwrap().unwrap();
    assert_eq!(forced, "treatment");
}
2830
2831 #[test]
2834 fn experiment_state_display_matches_expected() {
2835 assert_eq!(ExperimentState::Draft.to_string(), "draft");
2836 assert_eq!(ExperimentState::Running.to_string(), "running");
2837 assert_eq!(ExperimentState::Concluded.to_string(), "concluded");
2838 assert_eq!(ExperimentState::Archived.to_string(), "archived");
2839 }
2840
2841 #[test]
2844 fn experiment_error_display() {
2845 assert!(
2846 ExperimentError::NotFound("x".to_owned())
2847 .to_string()
2848 .contains("not found")
2849 );
2850 assert!(
2851 ExperimentError::Archived("x".to_owned())
2852 .to_string()
2853 .contains("archived")
2854 );
2855 assert!(
2856 ExperimentError::ExcludedByGroup("x".to_owned(), "g".to_owned())
2857 .to_string()
2858 .contains("mutual exclusion")
2859 );
2860 assert!(
2861 ExperimentError::NoVariant("x".to_owned())
2862 .to_string()
2863 .contains("weights are zero")
2864 );
2865 }
2866
2867 #[test]
2870 fn service_debug_does_not_panic() {
2871 let svc = make_svc();
2872 let _ = format!("{svc:?}");
2873 }
2874
2875 #[test]
2876 fn upsert_logs_created_or_updated() {
2877 let svc = make_svc();
2878 let exp = fifty_fifty("exp");
2879 svc.create(exp.clone()).unwrap();
2880
2881 svc.create(exp).unwrap();
2883
2884 let hist = svc.history("exp", 10).unwrap();
2885 assert_eq!(hist.len(), 2);
2886 assert_eq!(hist[1].mutation, "created");
2887 assert_eq!(hist[0].mutation, "updated");
2888 }
2889
2890 #[test]
2891 fn upsert_rejects_deleting_variant_with_active_assignments() {
2892 let store = InMemoryExperimentStore::new();
2893 let name = "test_exp";
2894
2895 let config = ExperimentConfig {
2896 name: name.to_string(),
2897 description: None,
2898 state: ExperimentState::Running,
2899 variants: vec![
2900 VariantConfig {
2901 name: "control".to_string(),
2902 weight: 50,
2903 },
2904 VariantConfig {
2905 name: "treatment".to_string(),
2906 weight: 50,
2907 },
2908 ],
2909 winner: None,
2910 exclusion_group: None,
2911 updated_at_secs: 0,
2912 };
2913 store.upsert(config.clone()).unwrap();
2914
2915 store
2917 .record_assignment(Assignment {
2918 experiment: name.to_string(),
2919 actor: "user1".to_string(),
2920 variant: "treatment".to_string(),
2921 is_override: false,
2922 assigned_at_secs: 0,
2923 })
2924 .unwrap();
2925
2926 let mut new_config = config;
2928 new_config.variants = vec![VariantConfig {
2929 name: "control".to_string(),
2930 weight: 100,
2931 }];
2932
2933 let res = store.upsert(new_config);
2934 assert!(
2935 res.is_err(),
2936 "expected upsert to fail due to deleting active variant"
2937 );
2938 if let Err(ExperimentStoreError::Backend(msg)) = res {
2939 assert!(
2940 msg.contains("treatment"),
2941 "expected error message to mention 'treatment', got: {msg}"
2942 );
2943 } else {
2944 panic!("expected Backend error");
2945 }
2946 }
2947}