1use std::collections::HashMap;
51use std::sync::{Arc, RwLock};
52use std::time::{SystemTime, UNIX_EPOCH};
53
54use serde::{Deserialize, Serialize};
55
/// One audit-log entry describing a single mutation applied to a feature flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlagChangeRecord {
    /// Key of the flag that was changed.
    pub key: String,
    /// Short description of the change, e.g. `"enabled"` or `"rollout=25"`.
    pub mutation: String,
    /// Identity of whoever made the change, when one was supplied.
    pub actor: Option<String>,
    /// Time of the change in whole seconds since the Unix epoch.
    pub timestamp_secs: u64,
}
70
71impl FlagChangeRecord {
72 fn now(key: &str, mutation: impl Into<String>, actor: Option<&str>) -> Self {
73 let timestamp_secs = SystemTime::now()
74 .duration_since(UNIX_EPOCH)
75 .unwrap_or_default()
76 .as_secs();
77 Self {
78 key: key.to_owned(),
79 mutation: mutation.into(),
80 actor: actor.map(str::to_owned),
81 timestamp_secs,
82 }
83 }
84}
85
/// Stored configuration of a single feature flag.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlagConfig {
    /// Unique flag key.
    pub key: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Master switch; evaluation returns `false` whenever this is `false`.
    pub enabled: bool,
    /// Rollout percentage; the stores in this file cap it at 100.
    pub rollout_pct: u8,
    /// Actor ids for which the flag is on regardless of rollout percentage.
    pub actor_allowlist: Vec<String>,
    /// Group names checked through the service's group resolver.
    pub group_allowlist: Vec<String>,
}
112
113impl FlagConfig {
114 #[must_use]
116 pub fn new(key: impl Into<String>) -> Self {
117 Self {
118 key: key.into(),
119 description: None,
120 enabled: false,
121 rollout_pct: 0,
122 actor_allowlist: Vec::new(),
123 group_allowlist: Vec::new(),
124 }
125 }
126}
127
/// Shared callback deciding group membership.
///
/// The service calls it as `resolver(actor_id, group)`; returning `true`
/// turns the flag on for that actor.
pub type GroupResolver = Arc<dyn Fn(&str, &str) -> bool + Send + Sync + 'static>;
135
/// Error returned by [`FlagStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum FlagStoreError {
    /// The underlying storage failed; the payload is the backend's message.
    #[error("flag store backend error: {0}")]
    Backend(String),
}
145
/// Storage backend for feature flags and their change history.
///
/// The behaviour notes on each method describe what the implementations in
/// this file do; other implementors should confirm they match.
pub trait FlagStore: Send + Sync + 'static {
    /// Looks up one flag; `Ok(None)` means the key is unknown.
    fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError>;

    /// Returns every stored flag, ordered by key.
    fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError>;

    /// Turns the flag fully on (enabled, 100% rollout), creating it if absent.
    /// `actor` identifies who made the change for the audit history.
    fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Turns the flag off, creating it if absent. The rollout percentage and
    /// allowlists are left as they were.
    fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Enables the flag and sets its rollout percentage (values above 100 are
    /// capped at 100), creating it if absent.
    fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Enables the flag and adds `actor_id` to its actor allowlist without
    /// duplicating it. A flag that was disabled has its rollout reset to 0.
    fn allow_actor(
        &self,
        key: &str,
        actor_id: &str,
        actor: Option<&str>,
    ) -> Result<(), FlagStoreError>;

    /// Enables the flag and adds `group` to its group allowlist without
    /// duplicating it. A flag that was disabled has its rollout reset to 0.
    fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Returns up to `limit` change records for `key`, newest first.
    fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError>;
}
220
221impl FlagStore for Box<dyn FlagStore> {
223 fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
224 (**self).get(key)
225 }
226 fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
227 (**self).list()
228 }
229 fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
230 (**self).enable(key, actor)
231 }
232 fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
233 (**self).disable(key, actor)
234 }
235 fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError> {
236 (**self).set_rollout(key, pct, actor)
237 }
238 fn allow_actor(
239 &self,
240 key: &str,
241 actor_id: &str,
242 actor: Option<&str>,
243 ) -> Result<(), FlagStoreError> {
244 (**self).allow_actor(key, actor_id, actor)
245 }
246 fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
247 (**self).add_group(key, group, actor)
248 }
249 fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
250 (**self).history(key, limit)
251 }
252}
253
254impl<T: FlagStore + ?Sized> FlagStore for Arc<T> {
269 fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
270 (**self).get(key)
271 }
272 fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
273 (**self).list()
274 }
275 fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
276 (**self).enable(key, actor)
277 }
278 fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
279 (**self).disable(key, actor)
280 }
281 fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError> {
282 (**self).set_rollout(key, pct, actor)
283 }
284 fn allow_actor(
285 &self,
286 key: &str,
287 actor_id: &str,
288 actor: Option<&str>,
289 ) -> Result<(), FlagStoreError> {
290 (**self).allow_actor(key, actor_id, actor)
291 }
292 fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
293 (**self).add_group(key, group, actor)
294 }
295 fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
296 (**self).history(key, limit)
297 }
298}
299
/// [`FlagStore`] that keeps all flags and their history in process memory.
#[derive(Debug, Default)]
pub struct InMemoryFlagStore {
    /// Current flag configurations, keyed by flag key.
    flags: RwLock<HashMap<String, FlagConfig>>,
    /// Change records per flag key, oldest first.
    history: RwLock<HashMap<String, Vec<FlagChangeRecord>>>,
}
311
312impl InMemoryFlagStore {
313 #[must_use]
315 pub fn new() -> Self {
316 Self::default()
317 }
318
319 fn upsert(&self, key: &str, f: impl FnOnce(&mut FlagConfig)) {
320 let mut flags = self.flags.write().unwrap();
321 let flag = flags
322 .entry(key.to_owned())
323 .or_insert_with(|| FlagConfig::new(key));
324 f(flag);
325 drop(flags);
326 }
327
328 fn record(&self, record: FlagChangeRecord) {
329 self.history
330 .write()
331 .unwrap()
332 .entry(record.key.clone())
333 .or_default()
334 .push(record);
335 }
336}
337
338impl FlagStore for InMemoryFlagStore {
339 fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
340 Ok(self.flags.read().unwrap().get(key).cloned())
341 }
342
343 fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
344 let mut flags: Vec<FlagConfig> = self.flags.read().unwrap().values().cloned().collect();
345 flags.sort_by(|a, b| a.key.cmp(&b.key));
346 Ok(flags)
347 }
348
349 fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
350 self.upsert(key, |f| {
351 f.enabled = true;
352 f.rollout_pct = 100;
353 });
354 self.record(FlagChangeRecord::now(key, "enabled", actor));
355 Ok(())
356 }
357
358 fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
359 self.upsert(key, |f| {
360 f.enabled = false;
361 });
362 self.record(FlagChangeRecord::now(key, "disabled", actor));
363 Ok(())
364 }
365
366 fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError> {
367 let pct = pct.min(100);
368 self.upsert(key, |f| {
369 f.enabled = true;
370 f.rollout_pct = pct;
371 });
372 self.record(FlagChangeRecord::now(key, format!("rollout={pct}"), actor));
373 Ok(())
374 }
375
376 fn allow_actor(
377 &self,
378 key: &str,
379 actor_id: &str,
380 actor: Option<&str>,
381 ) -> Result<(), FlagStoreError> {
382 self.upsert(key, |f| {
383 if !f.enabled {
384 f.rollout_pct = 0;
388 }
389 f.enabled = true;
390 if !f.actor_allowlist.contains(&actor_id.to_owned()) {
391 f.actor_allowlist.push(actor_id.to_owned());
392 }
393 });
394 self.record(FlagChangeRecord::now(
395 key,
396 format!("allowed_actor={actor_id}"),
397 actor,
398 ));
399 Ok(())
400 }
401
402 fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
403 self.upsert(key, |f| {
404 if !f.enabled {
405 f.rollout_pct = 0;
407 }
408 f.enabled = true;
409 if !f.group_allowlist.contains(&group.to_owned()) {
410 f.group_allowlist.push(group.to_owned());
411 }
412 });
413 self.record(FlagChangeRecord::now(
414 key,
415 format!("added_group={group}"),
416 actor,
417 ));
418 Ok(())
419 }
420
421 fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
422 Ok(self
423 .history
424 .read()
425 .unwrap()
426 .get(key)
427 .map(|records| records.iter().rev().take(limit).cloned().collect())
428 .unwrap_or_default())
429 }
430}
431
432#[cfg(feature = "db")]
441pub mod pg {
442 use super::{FlagChangeRecord, FlagConfig, FlagStore, FlagStoreError};
443 use diesel::prelude::*;
444 use std::collections::HashMap;
445 use std::sync::RwLock;
446 use std::time::{Duration, Instant};
447
    /// Outcome of consulting the in-process cache for a flag key.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum CacheLookup {
        /// A live entry exists; the payload is `None` when the flag was
        /// cached as absent.
        Hit(Option<FlagConfig>),
        /// No entry, an expired entry, or the cache lock could not be read.
        Miss,
    }
453
    /// One cache entry: the looked-up value plus its expiry instant.
    #[derive(Debug, Clone)]
    struct CachedFlag {
        /// Cached lookup result; `None` records that the flag did not exist.
        value: Option<FlagConfig>,
        /// The entry counts as a hit only while `expires_at` is in the future.
        expires_at: Instant,
    }
459
    /// [`FlagStore`] backed by PostgreSQL, with a per-instance read cache for
    /// single-flag lookups.
    #[derive(Debug)]
    pub struct PgFlagStore {
        /// Connection string; a fresh connection is established per operation.
        database_url: String,
        /// Lifetime of cache entries; zero disables caching.
        cache_ttl: Duration,
        /// Cached `get` results keyed by flag key.
        cache: RwLock<HashMap<String, CachedFlag>>,
    }
472
473 impl Clone for PgFlagStore {
474 fn clone(&self) -> Self {
475 Self::with_cache_ttl(self.database_url.clone(), self.cache_ttl)
476 }
477 }
478
479 impl PgFlagStore {
480 pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(1);
482
483 #[must_use]
485 pub fn new(database_url: impl Into<String>) -> Self {
486 Self::with_cache_ttl(database_url, Self::DEFAULT_CACHE_TTL)
487 }
488
489 #[must_use]
492 pub fn with_cache_ttl(database_url: impl Into<String>, cache_ttl: Duration) -> Self {
493 Self {
494 database_url: database_url.into(),
495 cache_ttl,
496 cache: RwLock::new(HashMap::new()),
497 }
498 }
499
500 #[must_use]
502 pub fn from_database_config(config: &crate::config::DatabaseConfig) -> Option<Self> {
503 config.effective_primary_url().map(Self::new)
504 }
505
506 fn connect(&self) -> Result<diesel::PgConnection, FlagStoreError> {
507 diesel::PgConnection::establish(&self.database_url)
508 .map_err(|e| FlagStoreError::Backend(e.to_string()))
509 }
510
511 fn cached(&self, key: &str) -> CacheLookup {
512 let now = Instant::now();
513 let Ok(cache) = self.cache.read() else {
514 return CacheLookup::Miss;
515 };
516 match cache.get(key) {
517 Some(c) if c.expires_at > now => CacheLookup::Hit(c.value.clone()),
518 _ => CacheLookup::Miss,
519 }
520 }
521
522 fn store_cache(&self, key: &str, value: Option<FlagConfig>) {
523 if self.cache_ttl.is_zero() {
524 return;
525 }
526 let Some(expires_at) = Instant::now().checked_add(self.cache_ttl) else {
527 return;
528 };
529 if let Ok(mut cache) = self.cache.write() {
530 cache.insert(key.to_owned(), CachedFlag { value, expires_at });
531 }
532 }
533
534 fn invalidate(&self, key: &str) {
535 if let Ok(mut cache) = self.cache.write() {
536 cache.remove(key);
537 }
538 }
539
540 fn upsert_flag(
541 conn: &mut diesel::PgConnection,
542 key: &str,
543 ) -> Result<(), diesel::result::Error> {
544 diesel::sql_query(
545 "INSERT INTO autumn_feature_flags (key) VALUES ($1) \
546 ON CONFLICT (key) DO NOTHING",
547 )
548 .bind::<diesel::sql_types::Text, _>(key)
549 .execute(conn)?;
550 Ok(())
551 }
552
553 fn notify(conn: &mut diesel::PgConnection, key: &str) -> Result<(), diesel::result::Error> {
554 diesel::sql_query("SELECT pg_notify('autumn_flags', $1)")
555 .bind::<diesel::sql_types::Text, _>(key)
556 .execute(conn)?;
557 Ok(())
558 }
559
560 pub fn spawn_poll_listener(
574 store: std::sync::Arc<Self>,
575 poll_interval: std::time::Duration,
576 ) -> std::thread::JoinHandle<()> {
577 std::thread::spawn(move || {
578 const OVERLAP_SECS: i64 = 5;
591 let now_secs = || {
592 i64::try_from(
593 std::time::SystemTime::now()
594 .duration_since(std::time::UNIX_EPOCH)
595 .unwrap_or_default()
596 .as_secs(),
597 )
598 .unwrap_or(i64::MAX)
599 };
600 let mut last_polled_secs: i64 = now_secs() - OVERLAP_SECS;
605
606 loop {
607 std::thread::sleep(poll_interval);
608 let new_horizon = now_secs() - OVERLAP_SECS;
611 if let Ok(mut conn) = store.connect() {
612 let rows: Vec<ChangeKeyRow> = diesel::sql_query(
613 "SELECT DISTINCT key FROM feature_flag_changes \
614 WHERE changed_at > to_timestamp($1)",
615 )
616 .bind::<diesel::sql_types::BigInt, _>(last_polled_secs)
617 .load::<ChangeKeyRow>(&mut conn)
618 .unwrap_or_default();
619
620 for row in rows {
621 store.invalidate(&row.key);
622 }
623 }
624 last_polled_secs = new_horizon;
625 }
626 })
627 }
628 }
629
    /// Row shape of the poll query: just the key of a changed flag.
    #[derive(diesel::QueryableByName)]
    struct ChangeKeyRow {
        #[diesel(sql_type = diesel::sql_types::Text)]
        key: String,
    }
635
    /// Raw row of `autumn_feature_flags` as selected by `get` and `list`.
    #[derive(diesel::QueryableByName)]
    struct FlagRow {
        #[diesel(sql_type = diesel::sql_types::Text)]
        key: String,
        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
        description: Option<String>,
        #[diesel(sql_type = diesel::sql_types::Bool)]
        enabled: bool,
        // Read as SMALLINT; narrowed to `u8` in `into_config`.
        #[diesel(sql_type = diesel::sql_types::SmallInt)]
        rollout_pct: i16,
        // The allowlists are read as text and parsed as JSON arrays of
        // strings in `into_config`.
        #[diesel(sql_type = diesel::sql_types::Text)]
        actor_allowlist: String,
        #[diesel(sql_type = diesel::sql_types::Text)]
        group_allowlist: String,
    }
651
652 impl FlagRow {
653 fn into_config(self) -> FlagConfig {
654 let actor_allowlist: Vec<String> =
655 serde_json::from_str(&self.actor_allowlist).unwrap_or_default();
656 let group_allowlist: Vec<String> =
657 serde_json::from_str(&self.group_allowlist).unwrap_or_default();
658 FlagConfig {
659 key: self.key,
660 description: self.description,
661 enabled: self.enabled,
662 rollout_pct: u8::try_from(self.rollout_pct.clamp(0, 100)).unwrap_or(0),
663 actor_allowlist,
664 group_allowlist,
665 }
666 }
667 }
668
    /// Raw row returned by the history query on `feature_flag_changes`.
    #[derive(diesel::QueryableByName)]
    struct HistoryRow {
        #[diesel(sql_type = diesel::sql_types::Text)]
        key: String,
        #[diesel(sql_type = diesel::sql_types::Text)]
        mutation: String,
        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
        actor: Option<String>,
        // Epoch seconds extracted from `changed_at` by the query.
        #[diesel(sql_type = diesel::sql_types::BigInt)]
        timestamp_secs: i64,
    }
680
681 impl FlagStore for PgFlagStore {
682 fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
683 if let CacheLookup::Hit(v) = self.cached(key) {
684 return Ok(v);
685 }
686 let mut conn = self.connect()?;
687 let result = diesel::sql_query(
688 "SELECT key, description, enabled, rollout_pct, \
689 actor_allowlist, group_allowlist \
690 FROM autumn_feature_flags WHERE key = $1",
691 )
692 .bind::<diesel::sql_types::Text, _>(key)
693 .get_result::<FlagRow>(&mut conn)
694 .optional()
695 .map(|r| r.map(FlagRow::into_config))
696 .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
697
698 self.store_cache(key, result.clone());
699 Ok(result)
700 }
701
702 fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
703 let mut conn = self.connect()?;
704 diesel::sql_query(
705 "SELECT key, description, enabled, rollout_pct, \
706 actor_allowlist, group_allowlist \
707 FROM autumn_feature_flags ORDER BY key",
708 )
709 .load::<FlagRow>(&mut conn)
710 .map(|rows| rows.into_iter().map(FlagRow::into_config).collect())
711 .map_err(|e| FlagStoreError::Backend(e.to_string()))
712 }
713
714 fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
715 let mut conn = self.connect()?;
716 conn.transaction::<(), diesel::result::Error, _>(|conn| {
717 Self::upsert_flag(conn, key)?;
718 diesel::sql_query(
719 "UPDATE autumn_feature_flags \
720 SET enabled = true, rollout_pct = 100, updated_at = NOW() \
721 WHERE key = $1",
722 )
723 .bind::<diesel::sql_types::Text, _>(key)
724 .execute(conn)?;
725 diesel::sql_query(
726 "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
727 )
728 .bind::<diesel::sql_types::Text, _>(key)
729 .bind::<diesel::sql_types::Text, _>("enabled")
730 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
731 actor.map(str::to_owned),
732 )
733 .execute(conn)?;
734 Self::notify(conn, key)?;
735 Ok(())
736 })
737 .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
738 self.invalidate(key);
739 Ok(())
740 }
741
742 fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
743 let mut conn = self.connect()?;
744 conn.transaction::<(), diesel::result::Error, _>(|conn| {
745 Self::upsert_flag(conn, key)?;
746 diesel::sql_query(
747 "UPDATE autumn_feature_flags SET enabled = false, updated_at = NOW() \
748 WHERE key = $1",
749 )
750 .bind::<diesel::sql_types::Text, _>(key)
751 .execute(conn)?;
752 diesel::sql_query(
753 "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
754 )
755 .bind::<diesel::sql_types::Text, _>(key)
756 .bind::<diesel::sql_types::Text, _>("disabled")
757 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
758 actor.map(str::to_owned),
759 )
760 .execute(conn)?;
761 Self::notify(conn, key)?;
762 Ok(())
763 })
764 .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
765 self.invalidate(key);
766 Ok(())
767 }
768
769 fn set_rollout(
770 &self,
771 key: &str,
772 pct: u8,
773 actor: Option<&str>,
774 ) -> Result<(), FlagStoreError> {
775 let pct = i16::from(pct.min(100));
776 let mut conn = self.connect()?;
777 conn.transaction::<(), diesel::result::Error, _>(|conn| {
778 Self::upsert_flag(conn, key)?;
779 diesel::sql_query(
780 "UPDATE autumn_feature_flags \
781 SET enabled = true, rollout_pct = $2, updated_at = NOW() \
782 WHERE key = $1",
783 )
784 .bind::<diesel::sql_types::Text, _>(key)
785 .bind::<diesel::sql_types::SmallInt, _>(pct)
786 .execute(conn)?;
787 let mutation = format!("rollout={pct}");
788 diesel::sql_query(
789 "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
790 )
791 .bind::<diesel::sql_types::Text, _>(key)
792 .bind::<diesel::sql_types::Text, _>(&mutation)
793 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
794 actor.map(str::to_owned),
795 )
796 .execute(conn)?;
797 Self::notify(conn, key)?;
798 Ok(())
799 })
800 .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
801 self.invalidate(key);
802 Ok(())
803 }
804
805 fn allow_actor(
806 &self,
807 key: &str,
808 actor_id: &str,
809 actor: Option<&str>,
810 ) -> Result<(), FlagStoreError> {
811 let mut conn = self.connect()?;
812 conn.transaction::<(), diesel::result::Error, _>(|conn| {
813 Self::upsert_flag(conn, key)?;
814 diesel::sql_query(
815 "UPDATE autumn_feature_flags \
818 SET enabled = true, \
819 rollout_pct = CASE WHEN NOT enabled THEN 0 ELSE rollout_pct END, \
820 actor_allowlist = (
821 SELECT json_agg(DISTINCT elem) \
822 FROM (
823 SELECT jsonb_array_elements_text(actor_allowlist::jsonb) AS elem \
824 UNION SELECT $2
825 ) t \
826 )::text, \
827 updated_at = NOW() \
828 WHERE key = $1",
829 )
830 .bind::<diesel::sql_types::Text, _>(key)
831 .bind::<diesel::sql_types::Text, _>(actor_id)
832 .execute(conn)?;
833 let mutation = format!("allowed_actor={actor_id}");
834 diesel::sql_query(
835 "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
836 )
837 .bind::<diesel::sql_types::Text, _>(key)
838 .bind::<diesel::sql_types::Text, _>(&mutation)
839 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
840 actor.map(str::to_owned),
841 )
842 .execute(conn)?;
843 Self::notify(conn, key)?;
844 Ok(())
845 })
846 .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
847 self.invalidate(key);
848 Ok(())
849 }
850
851 fn add_group(
852 &self,
853 key: &str,
854 group: &str,
855 actor: Option<&str>,
856 ) -> Result<(), FlagStoreError> {
857 let mut conn = self.connect()?;
858 conn.transaction::<(), diesel::result::Error, _>(|conn| {
859 Self::upsert_flag(conn, key)?;
860 diesel::sql_query(
861 "UPDATE autumn_feature_flags \
863 SET enabled = true, \
864 rollout_pct = CASE WHEN NOT enabled THEN 0 ELSE rollout_pct END, \
865 group_allowlist = (
866 SELECT json_agg(DISTINCT elem) \
867 FROM (
868 SELECT jsonb_array_elements_text(group_allowlist::jsonb) AS elem \
869 UNION SELECT $2
870 ) t \
871 )::text, \
872 updated_at = NOW() \
873 WHERE key = $1",
874 )
875 .bind::<diesel::sql_types::Text, _>(key)
876 .bind::<diesel::sql_types::Text, _>(group)
877 .execute(conn)?;
878 let mutation = format!("added_group={group}");
879 diesel::sql_query(
880 "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
881 )
882 .bind::<diesel::sql_types::Text, _>(key)
883 .bind::<diesel::sql_types::Text, _>(&mutation)
884 .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
885 actor.map(str::to_owned),
886 )
887 .execute(conn)?;
888 Self::notify(conn, key)?;
889 Ok(())
890 })
891 .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
892 self.invalidate(key);
893 Ok(())
894 }
895
896 fn history(
897 &self,
898 key: &str,
899 limit: usize,
900 ) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
901 let limit = i64::try_from(limit).unwrap_or(i64::MAX);
902 let mut conn = self.connect()?;
903 diesel::sql_query(
904 "SELECT key, mutation, actor, \
905 EXTRACT(EPOCH FROM changed_at)::bigint AS timestamp_secs \
906 FROM feature_flag_changes \
907 WHERE key = $1 \
908 ORDER BY changed_at DESC LIMIT $2",
909 )
910 .bind::<diesel::sql_types::Text, _>(key)
911 .bind::<diesel::sql_types::BigInt, _>(limit)
912 .load::<HistoryRow>(&mut conn)
913 .map(|rows| {
914 rows.into_iter()
915 .map(|r| FlagChangeRecord {
916 key: r.key,
917 mutation: r.mutation,
918 actor: r.actor,
919 timestamp_secs: u64::try_from(r.timestamp_secs).unwrap_or(0),
920 })
921 .collect()
922 })
923 .map_err(|e| FlagStoreError::Backend(e.to_string()))
924 }
925 }
926
927 #[cfg(test)]
928 mod pg_tests {
929 use super::*;
930
931 #[test]
932 fn pg_store_exposes_database_url() {
933 let store = PgFlagStore::new("postgres://localhost/myapp");
934 assert_eq!(store.database_url, "postgres://localhost/myapp");
935 }
936
937 #[test]
938 fn pg_store_default_cache_ttl_is_one_second() {
939 let store = PgFlagStore::new("postgres://localhost/myapp");
940 assert_eq!(store.cache_ttl, PgFlagStore::DEFAULT_CACHE_TTL);
941 }
942
943 #[test]
944 fn pg_store_cache_miss_on_empty_store() {
945 let store = PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::ZERO);
946 assert_eq!(store.cached("my_flag"), CacheLookup::Miss);
947 }
948
949 #[test]
950 fn pg_store_cache_hit_returns_stored_value() {
951 let store =
952 PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
953 store.store_cache("my_flag", Some(FlagConfig::new("my_flag")));
954 assert!(matches!(store.cached("my_flag"), CacheLookup::Hit(Some(_))));
955 }
956
957 #[test]
958 fn pg_store_cache_hit_none_for_absent_flag() {
959 let store =
960 PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
961 store.store_cache("absent", None);
962 assert_eq!(store.cached("absent"), CacheLookup::Hit(None));
963 }
964
965 #[test]
966 fn pg_store_cache_expired_returns_miss() {
967 let store = PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::ZERO);
968 store.store_cache("expired", Some(FlagConfig::new("expired")));
969 assert_eq!(store.cached("expired"), CacheLookup::Miss);
971 }
972
973 #[test]
974 fn pg_store_invalidate_removes_from_cache() {
975 let store =
976 PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
977 store.store_cache("flag", Some(FlagConfig::new("flag")));
978 assert!(matches!(store.cached("flag"), CacheLookup::Hit(Some(_))));
979 store.invalidate("flag");
980 assert_eq!(store.cached("flag"), CacheLookup::Miss);
981 }
982
983 #[test]
984 fn pg_store_with_cache_ttl_sets_custom_ttl() {
985 let ttl = Duration::from_secs(30);
986 let store = PgFlagStore::with_cache_ttl("postgres://localhost/myapp", ttl);
987 assert_eq!(store.cache_ttl, ttl);
988 }
989
990 #[test]
991 fn pg_store_clone_has_independent_cache() {
992 let store =
995 PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
996 store.store_cache("cached", Some(FlagConfig::new("cached")));
997 let cloned = store.clone();
998 assert_eq!(cloned.cached("cached"), CacheLookup::Miss);
1000 assert!(matches!(store.cached("cached"), CacheLookup::Hit(Some(_))));
1002 }
1003 }
1004}
1005
1006fn fnv1a_64(data: &[u8]) -> u64 {
1013 const FNV_OFFSET: u64 = 14_695_981_039_346_656_037;
1014 const FNV_PRIME: u64 = 1_099_511_628_211;
1015 let mut hash = FNV_OFFSET;
1016 for &byte in data {
1017 hash ^= u64(byte);
1018 hash = hash.wrapping_mul(FNV_PRIME);
1019 }
1020 hash
1021}
1022
/// Widens a byte to `u64`; usable in `const` contexts.
// The `as` cast from `u8` to `u64` cannot lose information, hence the allow.
#[allow(clippy::cast_lossless)]
const fn u64(v: u8) -> u64 {
    v as u64
}
1027
1028#[must_use]
1033pub fn rollout_bucket(flag_key: &str, actor_id: &str) -> u8 {
1034 let key = format!("{flag_key}:{actor_id}");
1035 let hash = fnv1a_64(key.as_bytes());
1036 u8::try_from(hash % 100).unwrap_or(0)
1037}
1038
/// Evaluates feature flags against a [`FlagStore`] and forwards mutations to it.
///
/// Cloning shares the same store and resolver.
#[derive(Clone)]
pub struct FeatureFlagService {
    /// Backend holding the flag configurations.
    store: Arc<dyn FlagStore>,
    /// Optional group-membership callback; without it group allowlists never match.
    group_resolver: Option<GroupResolver>,
}
1055
1056impl std::fmt::Debug for FeatureFlagService {
1057 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058 f.debug_struct("FeatureFlagService").finish_non_exhaustive()
1059 }
1060}
1061
1062impl FeatureFlagService {
1063 #[must_use]
1065 pub fn new(store: Arc<dyn FlagStore>) -> Self {
1066 Self {
1067 store,
1068 group_resolver: None,
1069 }
1070 }
1071
1072 #[must_use]
1074 pub fn with_group_resolver(mut self, resolver: GroupResolver) -> Self {
1075 self.group_resolver = Some(resolver);
1076 self
1077 }
1078
1079 #[must_use]
1083 pub fn is_enabled(&self, flag_key: &str, actor_id: Option<&str>) -> bool {
1084 let Ok(Some(flag)) = self.store.get(flag_key) else {
1085 return false;
1086 };
1087 self.evaluate(&flag, actor_id)
1088 }
1089
1090 fn evaluate(&self, flag: &FlagConfig, actor_id: Option<&str>) -> bool {
1091 if !flag.enabled {
1093 return false;
1094 }
1095
1096 if flag.rollout_pct >= 100 {
1098 return true;
1099 }
1100
1101 if let Some(actor) = actor_id
1103 && flag.actor_allowlist.iter().any(|a| a.as_str() == actor)
1104 {
1105 return true;
1106 }
1107
1108 if let (Some(actor), Some(resolver)) = (actor_id, &self.group_resolver) {
1110 for group in &flag.group_allowlist {
1111 if resolver(actor, group) {
1112 return true;
1113 }
1114 }
1115 }
1116
1117 if flag.rollout_pct > 0
1119 && let Some(actor) = actor_id
1120 {
1121 let bucket = rollout_bucket(&flag.key, actor);
1122 return bucket < flag.rollout_pct;
1123 }
1124
1125 false
1126 }
1127
1128 pub fn enable(&self, flag_key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
1134 self.store.enable(flag_key, actor)
1135 }
1136
1137 pub fn disable(&self, flag_key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
1143 self.store.disable(flag_key, actor)
1144 }
1145
1146 pub fn set_rollout(
1152 &self,
1153 flag_key: &str,
1154 pct: u8,
1155 actor: Option<&str>,
1156 ) -> Result<(), FlagStoreError> {
1157 self.store.set_rollout(flag_key, pct, actor)
1158 }
1159
1160 pub fn allow_actor(
1166 &self,
1167 flag_key: &str,
1168 actor_id: &str,
1169 actor: Option<&str>,
1170 ) -> Result<(), FlagStoreError> {
1171 self.store.allow_actor(flag_key, actor_id, actor)
1172 }
1173
1174 pub fn add_group(
1180 &self,
1181 flag_key: &str,
1182 group: &str,
1183 actor: Option<&str>,
1184 ) -> Result<(), FlagStoreError> {
1185 self.store.add_group(flag_key, group, actor)
1186 }
1187
1188 pub fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
1194 self.store.list()
1195 }
1196
1197 pub fn history(
1203 &self,
1204 flag_key: &str,
1205 limit: usize,
1206 ) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
1207 self.store.history(flag_key, limit)
1208 }
1209}
1210
/// Request extractor pairing the flag service with the current actor.
pub struct Flags {
    /// Service used to evaluate flags.
    service: FeatureFlagService,
    /// Actor id read from the request's session, if any.
    actor_id: Option<String>,
}
1235
1236impl Flags {
1237 #[must_use]
1239 pub fn enabled(&self, flag_key: &str) -> bool {
1240 self.service.is_enabled(flag_key, self.actor_id.as_deref())
1241 }
1242
1243 #[must_use]
1245 pub const fn service(&self) -> &FeatureFlagService {
1246 &self.service
1247 }
1248}
1249
1250impl axum::extract::FromRequestParts<crate::AppState> for Flags {
1251 type Rejection = crate::AutumnError;
1252
1253 async fn from_request_parts(
1254 parts: &mut axum::http::request::Parts,
1255 state: &crate::AppState,
1256 ) -> Result<Self, Self::Rejection> {
1257 let service = state
1258 .extension::<FeatureFlagService>()
1259 .map(|arc| (*arc).clone())
1260 .ok_or_else(|| {
1261 crate::AutumnError::internal_server_error_msg(
1262 "feature flag service not registered; \
1263 install a FlagStore via AppBuilder::with_flag_store()",
1264 )
1265 })?;
1266
1267 let actor_id = if let Some(session) = parts.extensions.get::<crate::session::Session>() {
1269 session.get(state.auth_session_key()).await
1270 } else {
1271 None
1272 };
1273
1274 Ok(Self { service, actor_id })
1275 }
1276}
1277
1278#[cfg(test)]
1281mod tests {
1282 use super::*;
1283
1284 fn make_svc() -> FeatureFlagService {
1287 FeatureFlagService::new(Arc::new(InMemoryFlagStore::new()))
1288 }
1289
1290 #[test]
1293 fn unknown_flag_returns_false() {
1294 let svc = make_svc();
1295 assert!(!svc.is_enabled("nonexistent", Some("user:1")));
1296 }
1297
1298 #[test]
1299 fn globally_enabled_flag_returns_true_for_any_actor() {
1300 let svc = make_svc();
1301 svc.enable("my_flag", None).unwrap();
1302 assert!(svc.is_enabled("my_flag", Some("user:1")));
1303 assert!(svc.is_enabled("my_flag", Some("user:99")));
1304 assert!(svc.is_enabled("my_flag", None));
1305 }
1306
1307 #[test]
1308 fn globally_disabled_flag_returns_false_for_any_actor() {
1309 let svc = make_svc();
1310 svc.enable("my_flag", None).unwrap();
1311 svc.disable("my_flag", None).unwrap();
1312 assert!(!svc.is_enabled("my_flag", Some("user:1")));
1313 assert!(!svc.is_enabled("my_flag", None));
1314 }
1315
1316 #[test]
1319 fn actor_allowlist_enables_specific_actor() {
1320 let svc = make_svc();
1321 svc.allow_actor("beta_feature", "user:42", None).unwrap();
1322 assert!(svc.is_enabled("beta_feature", Some("user:42")));
1323 assert!(!svc.is_enabled("beta_feature", Some("user:1")));
1324 }
1325
1326 #[test]
1327 fn group_resolver_enables_group_members() {
1328 let svc = FeatureFlagService::new(Arc::new(InMemoryFlagStore::new())).with_group_resolver(
1329 Arc::new(|actor_id: &str, group: &str| {
1330 group == "staff" && actor_id.starts_with("staff:")
1332 }),
1333 );
1334 svc.add_group("internal_feature", "staff", None).unwrap();
1335 assert!(svc.is_enabled("internal_feature", Some("staff:alice")));
1336 assert!(!svc.is_enabled("internal_feature", Some("user:bob")));
1337 }
1338
1339 #[test]
1340 fn percent_rollout_at_0_disables_for_all_actors() {
1341 let svc = make_svc();
1342 svc.set_rollout("gradual", 0, None).unwrap();
1343 for i in 0..50_u32 {
1345 let actor = format!("user:{i}");
1346 assert!(
1347 !svc.is_enabled("gradual", Some(&actor)),
1348 "expected disabled for {actor} at 0% rollout"
1349 );
1350 }
1351 }
1352
1353 #[test]
1354 fn percent_rollout_at_100_enables_for_all_actors() {
1355 let svc = make_svc();
1356 svc.set_rollout("gradual", 100, None).unwrap();
1357 for i in 0..50_u32 {
1358 let actor = format!("user:{i}");
1359 assert!(
1360 svc.is_enabled("gradual", Some(&actor)),
1361 "expected enabled for {actor} at 100% rollout"
1362 );
1363 }
1364 }
1365
1366 #[test]
1367 fn percent_rollout_at_50_enables_roughly_half() {
1368 let svc = make_svc();
1369 svc.set_rollout("rollout_flag", 50, None).unwrap();
1370 let enabled_count = (0..200_u32)
1371 .filter(|i| svc.is_enabled("rollout_flag", Some(&format!("user:{i}"))))
1372 .count();
1373 assert!(
1375 (80..=120).contains(&enabled_count),
1376 "expected ~100 enabled actors, got {enabled_count}"
1377 );
1378 }
1379
1380 #[test]
1383 fn rollout_bucket_is_stable_across_calls() {
1384 let b1 = rollout_bucket("my_flag", "user:1");
1385 let b2 = rollout_bucket("my_flag", "user:1");
1386 assert_eq!(b1, b2, "bucket must be deterministic");
1387 }
1388
1389 #[test]
1390 fn rollout_bucket_differs_for_different_actors() {
1391 let buckets: std::collections::HashSet<u8> = (0..50_u32)
1394 .map(|i| rollout_bucket("flag", &format!("user:{i}")))
1395 .collect();
1396 assert!(
1397 buckets.len() > 10,
1398 "expected diverse buckets, got {}: {buckets:?}",
1399 buckets.len()
1400 );
1401 }
1402
1403 #[test]
1404 fn rollout_bucket_in_range_0_to_99() {
1405 for i in 0..1000_u32 {
1406 let b = rollout_bucket("flag", &format!("actor:{i}"));
1407 assert!(b < 100, "bucket out of range: {b}");
1408 }
1409 }
1410
/// After setting a 42% rollout on `"stable_flag"`, ten further `is_enabled`
/// calls for `user:123` must all match the first observed answer.
#[test]
fn percent_rollout_same_actor_same_flag_always_same_result() {
    let service = make_svc();
    service.set_rollout("stable_flag", 42, None).unwrap();

    let probe = || service.is_enabled("stable_flag", Some("user:123"));
    let first = probe();
    for observed in std::iter::repeat_with(probe).take(10) {
        assert_eq!(
            observed, first,
            "rollout result must not flip between calls"
        );
    }
}
1424
/// `get` on a freshly constructed `InMemoryFlagStore` yields `Ok(None)` for a
/// key that was never written.
#[test]
fn in_memory_store_returns_none_for_unknown_flag() {
    let store = InMemoryFlagStore::new();
    let lookup = store.get("unknown").unwrap();
    assert!(lookup.is_none());
}
1432
/// Flags enabled in the order zebra, alpha, mango must come back from `list`
/// ordered as alpha, mango, zebra.
#[test]
fn in_memory_store_list_is_sorted() {
    let store = InMemoryFlagStore::new();
    for key in ["zebra", "alpha", "mango"] {
        store.enable(key, None).unwrap();
    }

    let mut keys: Vec<String> = Vec::new();
    for flag in store.list().unwrap() {
        keys.push(flag.key);
    }
    assert_eq!(keys, ["alpha", "mango", "zebra"]);
}
1442
/// Calling `enable` on a key the store has never seen must leave a flag behind
/// whose `enabled` field is `true`.
#[test]
fn in_memory_store_enable_creates_flag_if_absent() {
    let store = InMemoryFlagStore::new();
    store.enable("new_flag", None).unwrap();

    let created = store.get("new_flag").unwrap();
    let is_on = created.unwrap().enabled;
    assert!(is_on);
}
1450
/// `enable` followed by `disable` on the same key must leave `enabled == false`.
#[test]
fn in_memory_store_disable_sets_enabled_false() {
    let store = InMemoryFlagStore::new();
    store.enable("f", None).unwrap();
    store.disable("f", None).unwrap();

    let flag = store.get("f").unwrap().unwrap();
    let still_on = flag.enabled;
    assert!(!still_on);
}
1458
/// Allowing the same actor twice must leave exactly one entry in
/// `actor_allowlist`.
#[test]
fn in_memory_store_allow_actor_does_not_duplicate() {
    let store = InMemoryFlagStore::new();
    for _ in 0..2 {
        store.allow_actor("f", "user:1", None).unwrap();
    }

    let flag = store.get("f").unwrap().unwrap();
    let allowlist = &flag.actor_allowlist;
    assert_eq!(allowlist.len(), 1);
}
1467
/// Adding the same group twice must leave exactly one entry in
/// `group_allowlist`.
#[test]
fn in_memory_store_add_group_does_not_duplicate() {
    let store = InMemoryFlagStore::new();
    for _ in 0..2 {
        store.add_group("f", "staff", None).unwrap();
    }

    let flag = store.get("f").unwrap().unwrap();
    let groups = &flag.group_allowlist;
    assert_eq!(groups.len(), 1);
}
1476
/// An `enable` followed by a `disable` must produce two history records, with
/// the `"disabled"` record (and its actor) at index 0 and `"enabled"` at index 1.
#[test]
fn mutations_are_recorded_in_history() {
    let service = make_svc();
    let operator = Some("ops@example.com");
    service.enable("tracked_flag", operator).unwrap();
    service.disable("tracked_flag", operator).unwrap();

    let history = service.history("tracked_flag", 10).unwrap();
    assert_eq!(history.len(), 2, "two mutations should be recorded");

    let (at_index_0, at_index_1) = (&history[0], &history[1]);
    assert_eq!(at_index_0.mutation, "disabled");
    assert_eq!(at_index_0.actor.as_deref(), Some("ops@example.com"));
    assert_eq!(at_index_1.mutation, "enabled");
}
1491
/// After five `enable` calls, asking for history with a limit of 3 must return
/// exactly three records.
#[test]
fn history_respects_limit() {
    let service = make_svc();
    (0..5).for_each(|_| {
        service.enable("limited_flag", None).unwrap();
    });

    let capped = service.history("limited_flag", 3).unwrap();
    assert_eq!(capped.len(), 3);
}
1501
/// Requesting history for a key that was never mutated must succeed and return
/// no records.
#[test]
fn history_empty_for_unknown_flag() {
    let service = make_svc();
    let records = service.history("ghost_flag", 10).unwrap();
    assert!(records.is_empty());
}
1508
/// A `set_rollout` to 25 by actor `"cli"` must be recorded as `"rollout=25"`
/// with that actor attached.
#[test]
fn rollout_mutation_recorded_with_pct() {
    let service = make_svc();
    service.set_rollout("roll", 25, Some("cli")).unwrap();

    let history = service.history("roll", 1).unwrap();
    let latest = &history[0];
    assert_eq!(latest.mutation, "rollout=25");
    assert_eq!(latest.actor.as_deref(), Some("cli"));
}
1517
/// An `allow_actor` for `user:7` must be recorded as `"allowed_actor=user:7"`.
#[test]
fn allow_actor_mutation_recorded() {
    let service = make_svc();
    service.allow_actor("f", "user:7", Some("cli")).unwrap();

    let records = service.history("f", 1).unwrap();
    let latest = &records[0];
    assert_eq!(latest.mutation, "allowed_actor=user:7");
}
1525
/// `FlagConfig::new` must keep the given key and start disabled, at 0% rollout,
/// with both allowlists empty.
#[test]
fn flag_config_new_defaults_to_disabled() {
    let FlagConfig {
        key,
        enabled,
        rollout_pct,
        actor_allowlist,
        group_allowlist,
        ..
    } = FlagConfig::new("my_flag");

    assert_eq!(key, "my_flag");
    assert!(!enabled);
    assert_eq!(rollout_pct, 0);
    assert!(actor_allowlist.is_empty());
    assert!(group_allowlist.is_empty());
}
1537
/// Passing 200 to `set_rollout` must result in a stored `rollout_pct` of 100.
#[test]
fn set_rollout_clamps_to_100() {
    let store = InMemoryFlagStore::new();
    store.set_rollout("f", 200, None).unwrap();

    let flag = store.get("f").unwrap().unwrap();
    let stored_pct = flag.rollout_pct;
    assert_eq!(stored_pct, 100);
}
1546
/// After `set_rollout(100)` followed by `disable`, the flag must be off for
/// `user:0..user:19` and for the anonymous (`None`) actor.
#[test]
fn disable_kills_flag_even_when_rollout_is_100_percent() {
    let service = make_svc();
    service.set_rollout("roll_flag", 100, None).unwrap();
    service.disable("roll_flag", None).unwrap();

    for i in 0..20_u32 {
        let actor = format!("user:{i}");
        let visible = service.is_enabled("roll_flag", Some(actor.as_str()));
        assert!(
            !visible,
            "disable() must override rollout for actor user:{i}"
        );
    }

    let visible_to_anonymous = service.is_enabled("roll_flag", None);
    assert!(!visible_to_anonymous);
}
1562
/// An actor allowlisted before `disable` is called must not see the flag
/// afterwards.
#[test]
fn disable_kills_flag_even_when_actor_is_in_allowlist() {
    let service = make_svc();
    service.allow_actor("guarded", "user:42", None).unwrap();
    service.disable("guarded", None).unwrap();

    let still_visible = service.is_enabled("guarded", Some("user:42"));
    assert!(!still_visible, "disable() must override actor allowlist");
}
1573
/// After `set_rollout(50)`, `disable`, then `enable`, the flag must report
/// enabled both for the anonymous actor and for `user:1`.
#[test]
fn enable_after_disable_restores_rollout_config() {
    let service = make_svc();
    service.set_rollout("roll_flag", 50, None).unwrap();
    service.disable("roll_flag", None).unwrap();
    service.enable("roll_flag", None).unwrap();

    let on_for_anonymous = service.is_enabled("roll_flag", None);
    assert!(on_for_anonymous);

    let on_for_user = service.is_enabled("roll_flag", Some("user:1"));
    assert!(on_for_user);
}
1585
/// Sequence under test: `enable`, `disable`, then `allow_actor("user:42")`.
///
/// The allowlisted actor must see the flag afterwards, while `user:1`,
/// `user:5`, `user:10` and `user:99` must not.
#[test]
fn allow_actor_after_kill_switch_does_not_restore_global_rollout() {
    let service = make_svc();
    service.enable("targeted", None).unwrap();
    service.disable("targeted", None).unwrap();
    service.allow_actor("targeted", "user:42", None).unwrap();

    let allowlisted_sees_it = service.is_enabled("targeted", Some("user:42"));
    assert!(allowlisted_sees_it, "allowlisted actor must see the flag");

    for actor in [1_u32, 5, 10, 99].map(|i| format!("user:{i}")) {
        let leaked = service.is_enabled("targeted", Some(actor.as_str()));
        assert!(
            !leaked,
            "non-allowlisted actor {actor} must NOT see the flag after allowlist-only re-enable"
        );
    }
}
1611
/// Checks that `allow_actor` on a flag whose rollout was set to 50 keeps
/// `rollout_pct` at 50 and adds the actor to `actor_allowlist`.
///
/// The same sequence is verified twice: through the service (reading the flag
/// back via `list()`) and directly against a fresh `InMemoryFlagStore`.
#[test]
fn allow_actor_on_active_rollout_preserves_rollout_pct() {
    // Service path: read the flag back through `list()` so that the mutations
    // issued on `svc` are asserted on rather than merely executed.
    let svc = make_svc();
    svc.set_rollout("staged", 50, None).unwrap();
    svc.allow_actor("staged", "user:42", None).unwrap();
    let listed = svc.list().unwrap();
    let via_service = listed
        .iter()
        .find(|f| f.key == "staged")
        .expect("flag must be listed after set_rollout + allow_actor");
    assert_eq!(
        via_service.rollout_pct, 50,
        "rollout_pct must be preserved when flag was already enabled"
    );
    assert!(via_service.actor_allowlist.iter().any(|a| a == "user:42"));

    // Store path: identical sequence applied straight to the in-memory store.
    let store = InMemoryFlagStore::new();
    store.set_rollout("staged", 50, None).unwrap();
    store.allow_actor("staged", "user:42", None).unwrap();
    let flag = store.get("staged").unwrap().unwrap();
    assert_eq!(
        flag.rollout_pct, 50,
        "rollout_pct must be preserved when flag was already enabled"
    );
    assert!(flag.actor_allowlist.iter().any(|a| a == "user:42"));
}
1631
/// A flag enabled through an `Arc<InMemoryFlagStore>` must be readable via
/// `get` once the same `Arc` is viewed as `Arc<dyn FlagStore>`.
#[test]
fn arc_flag_store_delegates_get() {
    let arc_store: Arc<dyn FlagStore> = {
        let concrete = Arc::new(InMemoryFlagStore::new());
        concrete.enable("arc_flag", None).unwrap();
        concrete
    };

    let fetched = arc_store.get("arc_flag").unwrap();
    let is_on = fetched.unwrap().enabled;
    assert!(is_on);
}
1642
/// Two flags enabled through the concrete `Arc` must both be returned by
/// `list` on the `Arc<dyn FlagStore>` view.
#[test]
fn arc_flag_store_delegates_list() {
    let concrete = Arc::new(InMemoryFlagStore::new());
    for key in ["f1", "f2"] {
        concrete.enable(key, None).unwrap();
    }

    let arc_store: Arc<dyn FlagStore> = concrete;
    let listed = arc_store.list().unwrap().len();
    assert_eq!(listed, 2);
}
1652
/// `enable` and `disable` issued through `Arc<dyn FlagStore>` must flip the
/// flag's `enabled` field to `true` and then back to `false`.
#[test]
fn arc_flag_store_delegates_enable_and_disable() {
    let arc_store: Arc<dyn FlagStore> = Arc::new(InMemoryFlagStore::new());
    let is_on = || arc_store.get("f").unwrap().unwrap().enabled;

    arc_store.enable("f", None).unwrap();
    assert!(is_on());

    arc_store.disable("f", None).unwrap();
    assert!(!is_on());
}
1662
/// `set_rollout(42)` issued through `Arc<dyn FlagStore>` must be visible as
/// `rollout_pct == 42` on a subsequent `get`.
#[test]
fn arc_flag_store_delegates_set_rollout() {
    let arc_store: Arc<dyn FlagStore> = Arc::new(InMemoryFlagStore::new());
    arc_store.set_rollout("f", 42, None).unwrap();

    let stored_pct = arc_store.get("f").unwrap().unwrap().rollout_pct;
    assert_eq!(stored_pct, 42);
}
1671
/// `allow_actor` issued through `Arc<dyn FlagStore>` must put the actor into
/// the flag's `actor_allowlist`.
#[test]
fn arc_flag_store_delegates_allow_actor() {
    let arc_store: Arc<dyn FlagStore> = Arc::new(InMemoryFlagStore::new());
    arc_store.allow_actor("f", "user:1", None).unwrap();

    let flag = arc_store.get("f").unwrap().unwrap();
    let is_listed = flag.actor_allowlist.iter().any(|a| a == "user:1");
    assert!(is_listed);
}
1680
/// `add_group` issued through `Arc<dyn FlagStore>` must put the group into the
/// flag's `group_allowlist`.
#[test]
fn arc_flag_store_delegates_add_group() {
    let arc_store: Arc<dyn FlagStore> = Arc::new(InMemoryFlagStore::new());
    arc_store.add_group("f", "beta_testers", None).unwrap();

    let flag = arc_store.get("f").unwrap().unwrap();
    let is_listed = flag.group_allowlist.iter().any(|g| g == "beta_testers");
    assert!(is_listed);
}
1689
/// A single `enable` issued through `Arc<dyn FlagStore>` must yield exactly one
/// history record, whose mutation is `"enabled"`.
#[test]
fn arc_flag_store_delegates_history() {
    let arc_store: Arc<dyn FlagStore> = Arc::new(InMemoryFlagStore::new());
    arc_store.enable("f", Some("cli")).unwrap();

    let history = arc_store.history("f", 10).unwrap();
    // Comparing the whole list checks both the length and the first entry.
    let mutations: Vec<&str> = history.iter().map(|r| r.mutation.as_str()).collect();
    assert_eq!(mutations, ["enabled"]);
}
1699
/// Drives every store operation through a `Box<dyn FlagStore>` wrapping an
/// `InMemoryFlagStore` and checks the state observable after each step.
#[test]
fn box_flag_store_delegates_all_operations() {
    let boxed: Box<dyn FlagStore> = Box::new(InMemoryFlagStore::new());
    // Reads a flag that is expected to exist.
    let fetch = |key: &str| boxed.get(key).unwrap().unwrap();

    boxed.enable("f", None).unwrap();
    assert!(fetch("f").enabled);

    boxed.set_rollout("g", 25, Some("cli")).unwrap();
    assert_eq!(fetch("g").rollout_pct, 25);

    boxed.allow_actor("h", "user:1", None).unwrap();
    boxed.add_group("h", "staff", None).unwrap();
    let total_flags = boxed.list().unwrap().len();
    assert_eq!(total_flags, 3);

    let hist = boxed.history("f", 5).unwrap();
    let first_record = &hist[0];
    assert_eq!(first_record.mutation, "enabled");

    boxed.disable("f", None).unwrap();
    assert!(!fetch("f").enabled);
}
1720
/// The `Display` output of `FlagStoreError::Backend` must prefix the payload
/// with `"flag store backend error: "`.
#[test]
fn flag_store_error_displays_message() {
    let rendered = FlagStoreError::Backend(String::from("connection refused")).to_string();
    assert_eq!(rendered, "flag store backend error: connection refused");
}
1731
/// A clone of a `FlagConfig` with non-default fields must compare equal to the
/// value it was cloned from.
#[test]
fn flag_config_clone_is_equal_to_original() {
    let original = FlagConfig {
        enabled: true,
        rollout_pct: 50,
        actor_allowlist: vec!["user:1".to_owned()],
        ..FlagConfig::new("cloned")
    };

    let copy = original.clone();
    assert_eq!(original, copy);
}
1743
/// With the rollout set to 99, `is_enabled` must still return `false` when no
/// actor is supplied.
#[test]
fn rollout_with_no_actor_returns_false() {
    let service = make_svc();
    service.set_rollout("gradual", 99, None).unwrap();

    let fired_for_anonymous = service.is_enabled("gradual", None);
    assert!(
        !fired_for_anonymous,
        "percent rollout must not fire for anonymous (None) actor"
    );
}
1757
/// With a group resolver that answers `true` for every pair and a group added
/// to the flag, `is_enabled` with no actor must return `false` without
/// panicking.
#[test]
fn group_resolver_with_no_actor_does_not_panic() {
    let backing = Arc::new(InMemoryFlagStore::new());
    let service = FeatureFlagService::new(backing)
        .with_group_resolver(Arc::new(|_: &str, _: &str| true));
    service.add_group("f", "everyone", None).unwrap();

    let on_for_anonymous = service.is_enabled("f", None);
    assert!(!on_for_anonymous);
}
1766
/// An `add_group` for `beta_testers` by actor `"cli"` must be recorded as
/// `"added_group=beta_testers"` with that actor attached.
#[test]
fn add_group_mutation_format() {
    let store = InMemoryFlagStore::new();
    store.add_group("f", "beta_testers", Some("cli")).unwrap();

    let records = store.history("f", 1).unwrap();
    let latest = &records[0];
    assert_eq!(latest.mutation, "added_group=beta_testers");
    assert_eq!(latest.actor.as_deref(), Some("cli"));
}
1775
/// Flags touched via `enable`, `disable` and `set_rollout` must all be returned
/// by the service's `list`, in the order a, b, c.
#[test]
fn service_list_returns_all_flags() {
    let service = make_svc();
    service.enable("a", None).unwrap();
    service.disable("b", None).unwrap();
    service.set_rollout("c", 10, None).unwrap();

    let flags = service.list().unwrap();
    assert_eq!(flags.len(), 3);
    for (flag, expected_key) in flags.iter().zip(["a", "b", "c"]) {
        assert_eq!(flag.key, expected_key);
    }
}
1788
/// Formatting the service with `{:?}` must complete without panicking; the
/// produced text itself is not inspected.
#[test]
fn service_debug_does_not_panic() {
    let svc = make_svc();
    let _rendered = format!("{svc:?}");
}
1794
/// An enabled flag must be reported as on for an arbitrary actor, while a key
/// that was never created must be reported as off.
#[test]
fn flags_enabled_delegates_to_service() {
    let service = make_svc();
    service.enable("active", None).unwrap();

    let caller = Some("any_user");
    assert!(service.is_enabled("active", caller));
    assert!(!service.is_enabled("missing", caller));
}
1803
1804 #[tokio::test]
1805 async fn from_request_parts_respects_custom_auth_session_key() {
1806 use axum::extract::FromRequestParts;
1807 use std::collections::HashMap;
1808
1809 let svc = make_svc();
1810 let state = crate::AppState::for_test().with_auth_session_key("custom_user_id");
1811 state.insert_extension(svc);
1812
1813 let mut data = HashMap::new();
1814 data.insert("custom_user_id".to_owned(), "user:123".to_owned());
1815 data.insert("user_id".to_owned(), "user:999".to_owned()); let session = crate::session::Session::new_for_test("session_id".to_owned(), data);
1817
1818 let mut req = axum::http::Request::builder().body(()).unwrap();
1819 req.extensions_mut().insert(session);
1820 let mut parts = req.into_parts().0;
1821
1822 let flags = Flags::from_request_parts(&mut parts, &state).await.unwrap();
1823 assert_eq!(flags.actor_id.as_deref(), Some("user:123"));
1824 }
1825}