Skip to main content

autumn_web/
feature_flags.rs

1//! First-class feature flags with per-actor rollouts and kill switches.
2//!
3//! Provides a typed, pluggable flag system that supports global on/off,
4//! percent rollouts (stable per `(flag_name, actor_id)`), explicit actor
5//! allowlists, and named group membership checks — without requiring a
6//! redeploy to toggle any gate.
7//!
8//! # Quick start
9//!
10//! ```rust
11//! use autumn_web::feature_flags::{FeatureFlagService, InMemoryFlagStore, FlagConfig};
12//! use std::sync::Arc;
13//!
14//! // 1. Build a service backed by the in-memory store (perfect for tests).
15//! let store = Arc::new(InMemoryFlagStore::new());
16//! let svc = FeatureFlagService::new(store);
17//!
18//! // 2. Enable a flag for everyone.
19//! svc.enable("dark_mode", None).unwrap();
20//! assert!(svc.is_enabled("dark_mode", Some("user:1")));
21//!
22//! // 3. Disable it — all replicas pick up the change within seconds when
23//! //    backed by the Postgres store with LISTEN/NOTIFY.
24//! svc.disable("dark_mode", None).unwrap();
25//! assert!(!svc.is_enabled("dark_mode", Some("user:1")));
26//! ```
27//!
28//! # Evaluation order
29//!
30//! For a given `(flag, actor)` pair, rules are checked in this order:
31//!
32//! 1. **Kill switch**: if `enabled = false`, return `false` immediately.
33//!    Call `disable()` for an instant kill-switch that overrides rollout and allowlists.
34//! 2. **Global on**: if `rollout_pct >= 100`, return `true` for all actors.
35//!    Call `enable()` to globally enable a flag.
36//! 3. **Actor allowlist**: if the actor ID is in the explicit allowlist, return `true`.
37//! 4. **Group membership**: if the actor belongs to any allowed group, return `true`.
38//! 5. **Percent rollout**: if `rollout_pct > 0` and the deterministic hash bucket
39//!    of `(flag_name, actor_id)` falls below the threshold, return `true`.
40//! 6. Otherwise return `false`.
41//!
//! Calling `enable()` sets `enabled = true` and `rollout_pct = 100` (globally on for all actors).
43//! Calling `disable()` sets `enabled = false` — a hard kill-switch that overrides
44//! rollout and allowlists — while preserving the rollout/allowlist configuration.
45//!
46//! Percent-rollout buckets are computed with a FNV-1a hash over the UTF-8
47//! encoding of `"<flag_name>:<actor_id>"` and are therefore stable across
48//! restarts and replicas.
49
50use std::collections::HashMap;
51use std::sync::{Arc, RwLock};
52use std::time::{SystemTime, UNIX_EPOCH};
53
54use serde::{Deserialize, Serialize};
55
56// ── Change log ───────────────────────────────────────────────────────────────
57
/// A single mutation recorded in the flag change log.
///
/// One record is appended by a store each time a flag is mutated; records are
/// read back through [`FlagStore::history`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlagChangeRecord {
    /// The flag key that was changed.
    pub key: String,
    /// Human-readable description of the mutation (e.g. `"enabled"`, `"rollout=25"`).
    ///
    /// Values written by the stores in this file include `"enabled"`,
    /// `"disabled"`, `"rollout=<pct>"`, `"allowed_actor=<id>"` and
    /// `"added_group=<group>"`.
    pub mutation: String,
    /// Actor identifier supplied by the caller (username, principal, `"cli"`, etc.).
    /// `None` when the caller did not identify itself.
    pub actor: Option<String>,
    /// Wall-clock time of the change in seconds since UNIX epoch.
    pub timestamp_secs: u64,
}
70
71impl FlagChangeRecord {
72    fn now(key: &str, mutation: impl Into<String>, actor: Option<&str>) -> Self {
73        let timestamp_secs = SystemTime::now()
74            .duration_since(UNIX_EPOCH)
75            .unwrap_or_default()
76            .as_secs();
77        Self {
78            key: key.to_owned(),
79            mutation: mutation.into(),
80            actor: actor.map(str::to_owned),
81            timestamp_secs,
82        }
83    }
84}
85
86// ── Flag configuration ───────────────────────────────────────────────────────
87
/// The full configuration of a single feature flag.
///
/// Evaluation follows the order described in the module-level documentation:
///
/// - `enabled = false` is a kill switch: the flag is off for every actor,
///   regardless of rollout or allowlists.
/// - Otherwise the flag is on for a given actor when **any** of the
///   following holds:
///   - `rollout_pct >= 100` (globally on).
///   - The actor's ID appears in `actor_allowlist`.
///   - The actor belongs to any group in `group_allowlist`.
///   - `rollout_pct > 0` and the actor's deterministic bucket falls below the
///     threshold (see module-level documentation for the hash algorithm).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlagConfig {
    /// Unique flag key in `snake_case`.
    pub key: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Kill-switch gate: `false` turns the flag off for every actor.
    ///
    /// `true` only arms the remaining gates; it does not by itself turn the
    /// flag on for everyone (`set_rollout`, `allow_actor` and `add_group` all
    /// set it to `true` while leaving the rollout partial).
    pub enabled: bool,
    /// Percent rollout (0 = off, 1–99 = percentage of actors, 100 = everyone).
    pub rollout_pct: u8,
    /// Explicit list of actor IDs that see the flag as enabled unless the
    /// kill switch is active.
    pub actor_allowlist: Vec<String>,
    /// Named groups whose members see the flag as enabled unless the kill
    /// switch is active.
    pub group_allowlist: Vec<String>,
}
112
113impl FlagConfig {
114    /// Create a new disabled flag with no gates set.
115    #[must_use]
116    pub fn new(key: impl Into<String>) -> Self {
117        Self {
118            key: key.into(),
119            description: None,
120            enabled: false,
121            rollout_pct: 0,
122            actor_allowlist: Vec::new(),
123            group_allowlist: Vec::new(),
124        }
125    }
126}
127
128// ── Group resolver ──────────────────────────────────────────────────────────
129
/// A hook that checks whether `actor_id` belongs to `group`.
///
/// The callback receives two string slices and returns `true` when the actor
/// is a member. It must be `Send + Sync + 'static` because it is shared
/// behind an [`Arc`].
///
/// NOTE(review): the argument order (presumably `(actor_id, group)`) is fixed
/// by the evaluator, which is not part of this excerpt — confirm before
/// implementing a resolver.
///
/// Register a resolver with [`FeatureFlagService::with_group_resolver`] to
/// enable the named-group evaluation gate.
pub type GroupResolver = Arc<dyn Fn(&str, &str) -> bool + Send + Sync + 'static>;
135
136// ── FlagStore trait ──────────────────────────────────────────────────────────
137
/// Error from a [`FlagStore`] backend.
///
/// The Postgres store in this file converts connection and query failures
/// into [`FlagStoreError::Backend`] using the underlying error's `Display`
/// text.
#[derive(Debug, thiserror::Error)]
pub enum FlagStoreError {
    /// The backend reported an I/O or connection failure. The payload is the
    /// backend's error message.
    #[error("flag store backend error: {0}")]
    Backend(String),
}
145
/// Pluggable storage backend for feature flags.
///
/// All mutation methods (`enable`, `disable`, `set_rollout`, `allow_actor`,
/// `add_group`) record a [`FlagChangeRecord`] in the change log.
///
/// Every method takes `&self`, so implementations need interior mutability
/// (locks, a database, …) and must be `Send + Sync + 'static`. The trailing
/// `actor` argument on mutation methods identifies *who made the change* for
/// the change log; it is unrelated to the actor a flag is evaluated for.
pub trait FlagStore: Send + Sync + 'static {
    /// Return the current configuration for `key`, or `None` if unknown.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError>;

    /// Return all known flags, sorted by key.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError>;

    /// Globally enable `key` for all actors (`enabled = true`, `rollout_pct = 100`).
    ///
    /// Creates the flag if absent. Clears any prior `disable()` kill-switch.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Kill-switch `key` for all actors (`enabled = false`).
    ///
    /// Overrides rollout and allowlists while preserving their configuration.
    /// Call `enable()` or `set_rollout()` to restore access. In the in-memory
    /// store, `allow_actor()` and `add_group()` also clear the kill-switch,
    /// resetting the rollout to 0 % so only the allowlisted actors and groups
    /// regain access.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Set the percent-rollout gate for `key` to `pct` (0–100).
    ///
    /// Also clears any prior `disable()` kill-switch (`enabled = true`).
    /// Use `disable()` for an instant kill-switch that overrides rollout.
    /// The stores in this file clamp values above 100 down to 100.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Add `actor_id` to the explicit allowlist for `key`.
    ///
    /// `actor_id` is the actor being granted access; `actor` is who made the
    /// change. The stores in this file also set `enabled = true`, and when
    /// the flag was kill-switched at the time of the call they reset
    /// `rollout_pct` to 0 so that only allowlisted actors regain access.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn allow_actor(
        &self,
        key: &str,
        actor_id: &str,
        actor: Option<&str>,
    ) -> Result<(), FlagStoreError>;

    /// Add `group` to the named-group allowlist for `key`.
    ///
    /// The in-memory store applies the same kill-switch handling as
    /// `allow_actor()`.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError>;

    /// Return the most recent `limit` change records for `key`.
    ///
    /// The in-memory store returns them newest first and yields an empty
    /// vector for an unknown key.
    ///
    /// # Errors
    ///
    /// Returns a [`FlagStoreError`] on backend failure.
    fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError>;
}
220
// Blanket delegation so `Box<dyn FlagStore>` can be passed to `with_flag_store`.
//
// Every method forwards unchanged to the boxed trait object. `(**self)` peels
// off both the `&` and the `Box` so the call dispatches on the inner
// `dyn FlagStore`; calling the method on `self` directly would resolve to
// this impl again and recurse forever.
impl FlagStore for Box<dyn FlagStore> {
    fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
        (**self).get(key)
    }
    fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
        (**self).list()
    }
    fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).enable(key, actor)
    }
    fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).disable(key, actor)
    }
    fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).set_rollout(key, pct, actor)
    }
    fn allow_actor(
        &self,
        key: &str,
        actor_id: &str,
        actor: Option<&str>,
    ) -> Result<(), FlagStoreError> {
        (**self).allow_actor(key, actor_id, actor)
    }
    fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).add_group(key, group, actor)
    }
    fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
        (**self).history(key, limit)
    }
}
253
/// `Arc<T>` delegates every method to the inner `T`.
///
/// This allows sharing the **same** store instance — and therefore the same
/// cache — between `with_flag_store` and `PgFlagStore::spawn_poll_listener`:
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use autumn_web::feature_flags::pg::PgFlagStore;
///
/// let store = Arc::new(PgFlagStore::new(&db_url));
/// // Listener and app service share the same Arc → same cache.
/// PgFlagStore::spawn_poll_listener(Arc::clone(&store), Duration::from_secs(1));
/// app.with_flag_store(Arc::clone(&store)).run().await;
/// ```
///
/// The `?Sized` bound means `Arc<dyn FlagStore>` is covered as well. Each
/// method uses `(**self)` to reach the inner `T`; calling the method on
/// `self` directly would resolve to this impl again and recurse.
impl<T: FlagStore + ?Sized> FlagStore for Arc<T> {
    fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
        (**self).get(key)
    }
    fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
        (**self).list()
    }
    fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).enable(key, actor)
    }
    fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).disable(key, actor)
    }
    fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).set_rollout(key, pct, actor)
    }
    fn allow_actor(
        &self,
        key: &str,
        actor_id: &str,
        actor: Option<&str>,
    ) -> Result<(), FlagStoreError> {
        (**self).allow_actor(key, actor_id, actor)
    }
    fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
        (**self).add_group(key, group, actor)
    }
    fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
        (**self).history(key, limit)
    }
}
299
300// ── InMemoryFlagStore ────────────────────────────────────────────────────────
301
302/// A thread-safe in-memory [`FlagStore`] suitable for tests and development.
303///
304/// State is **not** shared across processes or replicas. For production use
305/// the Postgres-backed store from `autumn_web::feature_flags::pg`.
306#[derive(Debug, Default)]
307pub struct InMemoryFlagStore {
308    flags: RwLock<HashMap<String, FlagConfig>>,
309    history: RwLock<HashMap<String, Vec<FlagChangeRecord>>>,
310}
311
312impl InMemoryFlagStore {
313    /// Create an empty in-memory store.
314    #[must_use]
315    pub fn new() -> Self {
316        Self::default()
317    }
318
319    fn upsert(&self, key: &str, f: impl FnOnce(&mut FlagConfig)) {
320        let mut flags = self.flags.write().unwrap();
321        let flag = flags
322            .entry(key.to_owned())
323            .or_insert_with(|| FlagConfig::new(key));
324        f(flag);
325        drop(flags);
326    }
327
328    fn record(&self, record: FlagChangeRecord) {
329        self.history
330            .write()
331            .unwrap()
332            .entry(record.key.clone())
333            .or_default()
334            .push(record);
335    }
336}
337
338impl FlagStore for InMemoryFlagStore {
339    fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
340        Ok(self.flags.read().unwrap().get(key).cloned())
341    }
342
343    fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
344        let mut flags: Vec<FlagConfig> = self.flags.read().unwrap().values().cloned().collect();
345        flags.sort_by(|a, b| a.key.cmp(&b.key));
346        Ok(flags)
347    }
348
349    fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
350        self.upsert(key, |f| {
351            f.enabled = true;
352            f.rollout_pct = 100;
353        });
354        self.record(FlagChangeRecord::now(key, "enabled", actor));
355        Ok(())
356    }
357
358    fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
359        self.upsert(key, |f| {
360            f.enabled = false;
361        });
362        self.record(FlagChangeRecord::now(key, "disabled", actor));
363        Ok(())
364    }
365
366    fn set_rollout(&self, key: &str, pct: u8, actor: Option<&str>) -> Result<(), FlagStoreError> {
367        let pct = pct.min(100);
368        self.upsert(key, |f| {
369            f.enabled = true;
370            f.rollout_pct = pct;
371        });
372        self.record(FlagChangeRecord::now(key, format!("rollout={pct}"), actor));
373        Ok(())
374    }
375
376    fn allow_actor(
377        &self,
378        key: &str,
379        actor_id: &str,
380        actor: Option<&str>,
381    ) -> Result<(), FlagStoreError> {
382        self.upsert(key, |f| {
383            if !f.enabled {
384                // Re-enabling from a kill-switch via allowlist: reset rollout to 0
385                // so only the explicitly listed actors gain access, not everyone
386                // who happened to be in the previous (e.g. 100%) rollout cohort.
387                f.rollout_pct = 0;
388            }
389            f.enabled = true;
390            if !f.actor_allowlist.contains(&actor_id.to_owned()) {
391                f.actor_allowlist.push(actor_id.to_owned());
392            }
393        });
394        self.record(FlagChangeRecord::now(
395            key,
396            format!("allowed_actor={actor_id}"),
397            actor,
398        ));
399        Ok(())
400    }
401
402    fn add_group(&self, key: &str, group: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
403        self.upsert(key, |f| {
404            if !f.enabled {
405                // Same targeted-enable semantics as allow_actor.
406                f.rollout_pct = 0;
407            }
408            f.enabled = true;
409            if !f.group_allowlist.contains(&group.to_owned()) {
410                f.group_allowlist.push(group.to_owned());
411            }
412        });
413        self.record(FlagChangeRecord::now(
414            key,
415            format!("added_group={group}"),
416            actor,
417        ));
418        Ok(())
419    }
420
421    fn history(&self, key: &str, limit: usize) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
422        Ok(self
423            .history
424            .read()
425            .unwrap()
426            .get(key)
427            .map(|records| records.iter().rev().take(limit).cloned().collect())
428            .unwrap_or_default())
429    }
430}
431
432// ── Postgres FlagStore ───────────────────────────────────────────────────────
433
434/// Postgres-backed flag storage with LISTEN/NOTIFY cache invalidation.
435///
436/// Uses the framework-owned `autumn_feature_flags` and `feature_flag_changes`
437/// tables managed by the `create_feature_flags` migration. On any write the
438/// store sends a `NOTIFY autumn_flags` notification so all replicas running
439/// the background LISTEN task pick up the change within seconds.
440#[cfg(feature = "db")]
441pub mod pg {
442    use super::{FlagChangeRecord, FlagConfig, FlagStore, FlagStoreError};
443    use diesel::prelude::*;
444    use std::collections::HashMap;
445    use std::sync::RwLock;
446    use std::time::{Duration, Instant};
447
    /// Outcome of consulting the read-through cache for a key.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum CacheLookup {
        /// A live entry exists. The payload is the cached lookup result;
        /// `Hit(None)` means "the flag was absent when last read".
        Hit(Option<FlagConfig>),
        /// No usable entry: never cached, expired, or the cache lock could
        /// not be taken.
        Miss,
    }
453
    /// One cache entry: the result of a database lookup plus its expiry.
    #[derive(Debug, Clone)]
    struct CachedFlag {
        /// The looked-up configuration; `None` records that the flag did not
        /// exist at lookup time.
        value: Option<FlagConfig>,
        /// Monotonic instant after which the entry is treated as a miss.
        expires_at: Instant,
    }
459
    /// Postgres-backed [`FlagStore`] with a short-lived read-through cache.
    ///
    /// On each write the store sends `NOTIFY autumn_flags` so replicas
    /// subscribed via a background LISTEN task can invalidate their caches
    /// within seconds — achieving the sub-5-second kill-switch SLA without
    /// requiring Redis.
    ///
    /// Each instance owns its own cache; cloning a store yields a new
    /// instance with the same URL and TTL but an empty cache. Share one
    /// instance behind an `Arc` when the cache must be shared.
    #[derive(Debug)]
    pub struct PgFlagStore {
        /// Connection URL; a new connection is established for each operation.
        database_url: String,
        /// Lifetime of a cache entry; a zero duration disables caching.
        cache_ttl: Duration,
        /// Per-key cache of `get` results, including "flag not found".
        cache: RwLock<HashMap<String, CachedFlag>>,
    }
472
473    impl Clone for PgFlagStore {
474        fn clone(&self) -> Self {
475            Self::with_cache_ttl(self.database_url.clone(), self.cache_ttl)
476        }
477    }
478
479    impl PgFlagStore {
        /// Default read-through cache lifetime (one second).
        pub const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(1);

        /// Create a store using the default 1 s read-through cache.
        ///
        /// Only stores the URL; no connection is opened until the first
        /// operation that needs the database.
        #[must_use]
        pub fn new(database_url: impl Into<String>) -> Self {
            Self::with_cache_ttl(database_url, Self::DEFAULT_CACHE_TTL)
        }
488
489        /// Create a store with an explicit cache TTL. Use `Duration::ZERO` to
490        /// disable caching.
491        #[must_use]
492        pub fn with_cache_ttl(database_url: impl Into<String>, cache_ttl: Duration) -> Self {
493            Self {
494                database_url: database_url.into(),
495                cache_ttl,
496                cache: RwLock::new(HashMap::new()),
497            }
498        }
499
500        /// Create a store from Autumn's primary database configuration.
501        #[must_use]
502        pub fn from_database_config(config: &crate::config::DatabaseConfig) -> Option<Self> {
503            config.effective_primary_url().map(Self::new)
504        }
505
506        fn connect(&self) -> Result<diesel::PgConnection, FlagStoreError> {
507            diesel::PgConnection::establish(&self.database_url)
508                .map_err(|e| FlagStoreError::Backend(e.to_string()))
509        }
510
511        fn cached(&self, key: &str) -> CacheLookup {
512            let now = Instant::now();
513            let Ok(cache) = self.cache.read() else {
514                return CacheLookup::Miss;
515            };
516            match cache.get(key) {
517                Some(c) if c.expires_at > now => CacheLookup::Hit(c.value.clone()),
518                _ => CacheLookup::Miss,
519            }
520        }
521
522        fn store_cache(&self, key: &str, value: Option<FlagConfig>) {
523            if self.cache_ttl.is_zero() {
524                return;
525            }
526            let Some(expires_at) = Instant::now().checked_add(self.cache_ttl) else {
527                return;
528            };
529            if let Ok(mut cache) = self.cache.write() {
530                cache.insert(key.to_owned(), CachedFlag { value, expires_at });
531            }
532        }
533
534        fn invalidate(&self, key: &str) {
535            if let Ok(mut cache) = self.cache.write() {
536                cache.remove(key);
537            }
538        }
539
540        fn upsert_flag(
541            conn: &mut diesel::PgConnection,
542            key: &str,
543        ) -> Result<(), diesel::result::Error> {
544            diesel::sql_query(
545                "INSERT INTO autumn_feature_flags (key) VALUES ($1) \
546                 ON CONFLICT (key) DO NOTHING",
547            )
548            .bind::<diesel::sql_types::Text, _>(key)
549            .execute(conn)?;
550            Ok(())
551        }
552
553        fn notify(conn: &mut diesel::PgConnection, key: &str) -> Result<(), diesel::result::Error> {
554            diesel::sql_query("SELECT pg_notify('autumn_flags', $1)")
555                .bind::<diesel::sql_types::Text, _>(key)
556                .execute(conn)?;
557            Ok(())
558        }
559
560        /// Spawn a background thread that polls `feature_flag_changes` and
561        /// invalidates this store's cache whenever a remote replica writes a flag.
562        ///
563        /// Without this, the cache can only be invalidated when the TTL expires.
564        /// Call this once at startup when using `PgFlagStore` in a multi-replica
565        /// deployment:
566        ///
567        /// ```rust,ignore
568        /// let store = Arc::new(PgFlagStore::new(db_url));
569        /// PgFlagStore::spawn_poll_listener(Arc::clone(&store), Duration::from_secs(1));
570        /// ```
571        ///
572        /// The thread runs indefinitely; the returned handle can be detached.
573        pub fn spawn_poll_listener(
574            store: std::sync::Arc<Self>,
575            poll_interval: std::time::Duration,
576        ) -> std::thread::JoinHandle<()> {
577            std::thread::spawn(move || {
578                // Timestamp-based cursor with a small lookback overlap.
579                //
580                // A sequence-ID cursor (WHERE id > last_id) is unsafe because
581                // PostgreSQL sequences allocate IDs before the transaction
582                // commits: transaction T1 (id=10) can commit after T2 (id=11),
583                // so advancing last_id to 11 would permanently miss id=10.
584                //
585                // A timestamp cursor avoids that by including a 5-second
586                // lookback on every poll (OVERLAP_SECS).  Any transaction that
587                // takes longer than 5 seconds to commit will still be missed,
588                // but such long-running writes are far outside the norm.
589                // Invalidating the same key twice is always safe (idempotent).
590                const OVERLAP_SECS: i64 = 5;
591                let now_secs = || {
592                    i64::try_from(
593                        std::time::SystemTime::now()
594                            .duration_since(std::time::UNIX_EPOCH)
595                            .unwrap_or_default()
596                            .as_secs(),
597                    )
598                    .unwrap_or(i64::MAX)
599                };
600                // Start the cursor in the past so we don't replay the entire
601                // historical log: only changes that arrive after the listener
602                // starts need processing (the in-process cache starts empty and
603                // repopulates lazily on first access).
604                let mut last_polled_secs: i64 = now_secs() - OVERLAP_SECS;
605
606                loop {
607                    std::thread::sleep(poll_interval);
608                    // Advance the horizon before the query so concurrent writes
609                    // during the query are captured in the next poll cycle.
610                    let new_horizon = now_secs() - OVERLAP_SECS;
611                    if let Ok(mut conn) = store.connect() {
612                        let rows: Vec<ChangeKeyRow> = diesel::sql_query(
613                            "SELECT DISTINCT key FROM feature_flag_changes \
614                             WHERE changed_at > to_timestamp($1)",
615                        )
616                        .bind::<diesel::sql_types::BigInt, _>(last_polled_secs)
617                        .load::<ChangeKeyRow>(&mut conn)
618                        .unwrap_or_default();
619
620                        for row in rows {
621                            store.invalidate(&row.key);
622                        }
623                    }
624                    last_polled_secs = new_horizon;
625                }
626            })
627        }
628    }
629
    /// Result row of the poll listener's `SELECT DISTINCT key` query over
    /// `feature_flag_changes`.
    #[derive(diesel::QueryableByName)]
    struct ChangeKeyRow {
        /// Key of a flag that has at least one change inside the polled window.
        #[diesel(sql_type = diesel::sql_types::Text)]
        key: String,
    }
635
    /// Raw row of `autumn_feature_flags` as selected by `get` and `list`;
    /// converted to a [`FlagConfig`] with `into_config`.
    #[derive(diesel::QueryableByName)]
    struct FlagRow {
        #[diesel(sql_type = diesel::sql_types::Text)]
        key: String,
        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
        description: Option<String>,
        #[diesel(sql_type = diesel::sql_types::Bool)]
        enabled: bool,
        /// Stored as `SMALLINT`; clamped into `0..=100` on conversion.
        #[diesel(sql_type = diesel::sql_types::SmallInt)]
        rollout_pct: i16,
        /// JSON array of actor IDs, stored as text.
        #[diesel(sql_type = diesel::sql_types::Text)]
        actor_allowlist: String,
        /// JSON array of group names, stored as text.
        #[diesel(sql_type = diesel::sql_types::Text)]
        group_allowlist: String,
    }
651
652    impl FlagRow {
653        fn into_config(self) -> FlagConfig {
654            let actor_allowlist: Vec<String> =
655                serde_json::from_str(&self.actor_allowlist).unwrap_or_default();
656            let group_allowlist: Vec<String> =
657                serde_json::from_str(&self.group_allowlist).unwrap_or_default();
658            FlagConfig {
659                key: self.key,
660                description: self.description,
661                enabled: self.enabled,
662                rollout_pct: u8::try_from(self.rollout_pct.clamp(0, 100)).unwrap_or(0),
663                actor_allowlist,
664                group_allowlist,
665            }
666        }
667    }
668
    /// Raw change-log row with the same fields as [`FlagChangeRecord`].
    ///
    /// NOTE(review): the query that loads this type is outside this excerpt
    /// (presumably `history`) — confirm the selected column aliases match
    /// these field names.
    #[derive(diesel::QueryableByName)]
    struct HistoryRow {
        #[diesel(sql_type = diesel::sql_types::Text)]
        key: String,
        #[diesel(sql_type = diesel::sql_types::Text)]
        mutation: String,
        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
        actor: Option<String>,
        /// Signed here because it is read as `BIGINT`; the public record
        /// type uses `u64`.
        #[diesel(sql_type = diesel::sql_types::BigInt)]
        timestamp_secs: i64,
    }
680
681    impl FlagStore for PgFlagStore {
682        fn get(&self, key: &str) -> Result<Option<FlagConfig>, FlagStoreError> {
683            if let CacheLookup::Hit(v) = self.cached(key) {
684                return Ok(v);
685            }
686            let mut conn = self.connect()?;
687            let result = diesel::sql_query(
688                "SELECT key, description, enabled, rollout_pct, \
689                        actor_allowlist, group_allowlist \
690                 FROM autumn_feature_flags WHERE key = $1",
691            )
692            .bind::<diesel::sql_types::Text, _>(key)
693            .get_result::<FlagRow>(&mut conn)
694            .optional()
695            .map(|r| r.map(FlagRow::into_config))
696            .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
697
698            self.store_cache(key, result.clone());
699            Ok(result)
700        }
701
702        fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
703            let mut conn = self.connect()?;
704            diesel::sql_query(
705                "SELECT key, description, enabled, rollout_pct, \
706                        actor_allowlist, group_allowlist \
707                 FROM autumn_feature_flags ORDER BY key",
708            )
709            .load::<FlagRow>(&mut conn)
710            .map(|rows| rows.into_iter().map(FlagRow::into_config).collect())
711            .map_err(|e| FlagStoreError::Backend(e.to_string()))
712        }
713
        /// Globally enable `key` in Postgres, then drop the local cache entry.
        ///
        /// Inside a single transaction this creates the row if missing, sets
        /// `enabled = true` and `rollout_pct = 100`, appends an `"enabled"`
        /// row to `feature_flag_changes`, and issues `pg_notify`. Any failure
        /// is returned as [`FlagStoreError::Backend`], and the local cache is
        /// left untouched in that case.
        fn enable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
            let mut conn = self.connect()?;
            conn.transaction::<(), diesel::result::Error, _>(|conn| {
                // Ensure the row exists so the UPDATE below always matches.
                Self::upsert_flag(conn, key)?;
                diesel::sql_query(
                    "UPDATE autumn_feature_flags \
                     SET enabled = true, rollout_pct = 100, updated_at = NOW() \
                     WHERE key = $1",
                )
                .bind::<diesel::sql_types::Text, _>(key)
                .execute(conn)?;
                // Audit-log row; `actor` is who made the change (may be NULL).
                diesel::sql_query(
                    "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
                )
                .bind::<diesel::sql_types::Text, _>(key)
                .bind::<diesel::sql_types::Text, _>("enabled")
                .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
                    actor.map(str::to_owned),
                )
                .execute(conn)?;
                // Issued inside the transaction, so PostgreSQL delivers it
                // only if and when the transaction commits.
                Self::notify(conn, key)?;
                Ok(())
            })
            .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
            // Only reached after a successful commit: the next `get` re-reads the row.
            self.invalidate(key);
            Ok(())
        }
741
        /// Kill-switch `key` in Postgres, then drop the local cache entry.
        ///
        /// Inside a single transaction this creates the row if missing, sets
        /// `enabled = false` (rollout and allowlist columns are not touched),
        /// appends a `"disabled"` row to `feature_flag_changes`, and issues
        /// `pg_notify`. Any failure is returned as
        /// [`FlagStoreError::Backend`], and the local cache is left untouched
        /// in that case.
        fn disable(&self, key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
            let mut conn = self.connect()?;
            conn.transaction::<(), diesel::result::Error, _>(|conn| {
                // Ensure the row exists so the UPDATE below always matches.
                Self::upsert_flag(conn, key)?;
                diesel::sql_query(
                    "UPDATE autumn_feature_flags SET enabled = false, updated_at = NOW() \
                     WHERE key = $1",
                )
                .bind::<diesel::sql_types::Text, _>(key)
                .execute(conn)?;
                // Audit-log row; `actor` is who made the change (may be NULL).
                diesel::sql_query(
                    "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
                )
                .bind::<diesel::sql_types::Text, _>(key)
                .bind::<diesel::sql_types::Text, _>("disabled")
                .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
                    actor.map(str::to_owned),
                )
                .execute(conn)?;
                // Issued inside the transaction, so PostgreSQL delivers it
                // only if and when the transaction commits.
                Self::notify(conn, key)?;
                Ok(())
            })
            .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
            // Only reached after a successful commit: the next `get` re-reads the row.
            self.invalidate(key);
            Ok(())
        }
768
        /// Set the percent rollout for `key` in Postgres, then drop the local
        /// cache entry.
        ///
        /// `pct` is clamped to 100. Inside a single transaction this creates
        /// the row if missing, sets `enabled = true` and `rollout_pct`,
        /// appends a `"rollout=<pct>"` row (with the clamped value) to
        /// `feature_flag_changes`, and issues `pg_notify`. Any failure is
        /// returned as [`FlagStoreError::Backend`], and the local cache is
        /// left untouched in that case.
        fn set_rollout(
            &self,
            key: &str,
            pct: u8,
            actor: Option<&str>,
        ) -> Result<(), FlagStoreError> {
            // Clamp, then widen to match the SMALLINT column type.
            let pct = i16::from(pct.min(100));
            let mut conn = self.connect()?;
            conn.transaction::<(), diesel::result::Error, _>(|conn| {
                // Ensure the row exists so the UPDATE below always matches.
                Self::upsert_flag(conn, key)?;
                diesel::sql_query(
                    "UPDATE autumn_feature_flags \
                     SET enabled = true, rollout_pct = $2, updated_at = NOW() \
                     WHERE key = $1",
                )
                .bind::<diesel::sql_types::Text, _>(key)
                .bind::<diesel::sql_types::SmallInt, _>(pct)
                .execute(conn)?;
                let mutation = format!("rollout={pct}");
                // Audit-log row; `actor` is who made the change (may be NULL).
                diesel::sql_query(
                    "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
                )
                .bind::<diesel::sql_types::Text, _>(key)
                .bind::<diesel::sql_types::Text, _>(&mutation)
                .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
                    actor.map(str::to_owned),
                )
                .execute(conn)?;
                // Issued inside the transaction, so PostgreSQL delivers it
                // only if and when the transaction commits.
                Self::notify(conn, key)?;
                Ok(())
            })
            .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
            // Only reached after a successful commit: the next `get` re-reads the row.
            self.invalidate(key);
            Ok(())
        }
804
805        fn allow_actor(
806            &self,
807            key: &str,
808            actor_id: &str,
809            actor: Option<&str>,
810        ) -> Result<(), FlagStoreError> {
811            let mut conn = self.connect()?;
812            conn.transaction::<(), diesel::result::Error, _>(|conn| {
813                Self::upsert_flag(conn, key)?;
814                diesel::sql_query(
815                    // Re-enabling from kill-switch via allowlist resets rollout_pct to 0
816                    // so only listed actors gain access, not the previous global cohort.
817                    "UPDATE autumn_feature_flags \
818                     SET enabled = true, \
819                         rollout_pct = CASE WHEN NOT enabled THEN 0 ELSE rollout_pct END, \
820                         actor_allowlist = (
821                             SELECT json_agg(DISTINCT elem) \
822                             FROM (
823                                 SELECT jsonb_array_elements_text(actor_allowlist::jsonb) AS elem \
824                                 UNION SELECT $2
825                             ) t \
826                         )::text, \
827                         updated_at = NOW() \
828                     WHERE key = $1",
829                )
830                .bind::<diesel::sql_types::Text, _>(key)
831                .bind::<diesel::sql_types::Text, _>(actor_id)
832                .execute(conn)?;
833                let mutation = format!("allowed_actor={actor_id}");
834                diesel::sql_query(
835                    "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
836                )
837                .bind::<diesel::sql_types::Text, _>(key)
838                .bind::<diesel::sql_types::Text, _>(&mutation)
839                .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
840                    actor.map(str::to_owned),
841                )
842                .execute(conn)?;
843                Self::notify(conn, key)?;
844                Ok(())
845            })
846            .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
847            self.invalidate(key);
848            Ok(())
849        }
850
851        fn add_group(
852            &self,
853            key: &str,
854            group: &str,
855            actor: Option<&str>,
856        ) -> Result<(), FlagStoreError> {
857            let mut conn = self.connect()?;
858            conn.transaction::<(), diesel::result::Error, _>(|conn| {
859                Self::upsert_flag(conn, key)?;
860                diesel::sql_query(
861                    // Re-enabling from kill-switch via group allowlist resets rollout_pct.
862                    "UPDATE autumn_feature_flags \
863                     SET enabled = true, \
864                         rollout_pct = CASE WHEN NOT enabled THEN 0 ELSE rollout_pct END, \
865                         group_allowlist = (
866                             SELECT json_agg(DISTINCT elem) \
867                             FROM (
868                                 SELECT jsonb_array_elements_text(group_allowlist::jsonb) AS elem \
869                                 UNION SELECT $2
870                             ) t \
871                         )::text, \
872                         updated_at = NOW() \
873                     WHERE key = $1",
874                )
875                .bind::<diesel::sql_types::Text, _>(key)
876                .bind::<diesel::sql_types::Text, _>(group)
877                .execute(conn)?;
878                let mutation = format!("added_group={group}");
879                diesel::sql_query(
880                    "INSERT INTO feature_flag_changes (key, mutation, actor) VALUES ($1, $2, $3)",
881                )
882                .bind::<diesel::sql_types::Text, _>(key)
883                .bind::<diesel::sql_types::Text, _>(&mutation)
884                .bind::<diesel::sql_types::Nullable<diesel::sql_types::Text>, _>(
885                    actor.map(str::to_owned),
886                )
887                .execute(conn)?;
888                Self::notify(conn, key)?;
889                Ok(())
890            })
891            .map_err(|e| FlagStoreError::Backend(e.to_string()))?;
892            self.invalidate(key);
893            Ok(())
894        }
895
896        fn history(
897            &self,
898            key: &str,
899            limit: usize,
900        ) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
901            let limit = i64::try_from(limit).unwrap_or(i64::MAX);
902            let mut conn = self.connect()?;
903            diesel::sql_query(
904                "SELECT key, mutation, actor, \
905                        EXTRACT(EPOCH FROM changed_at)::bigint AS timestamp_secs \
906                 FROM feature_flag_changes \
907                 WHERE key = $1 \
908                 ORDER BY changed_at DESC LIMIT $2",
909            )
910            .bind::<diesel::sql_types::Text, _>(key)
911            .bind::<diesel::sql_types::BigInt, _>(limit)
912            .load::<HistoryRow>(&mut conn)
913            .map(|rows| {
914                rows.into_iter()
915                    .map(|r| FlagChangeRecord {
916                        key: r.key,
917                        mutation: r.mutation,
918                        actor: r.actor,
919                        timestamp_secs: u64::try_from(r.timestamp_secs).unwrap_or(0),
920                    })
921                    .collect()
922            })
923            .map_err(|e| FlagStoreError::Backend(e.to_string()))
924        }
925    }
926
927    #[cfg(test)]
928    mod pg_tests {
929        use super::*;
930
931        #[test]
932        fn pg_store_exposes_database_url() {
933            let store = PgFlagStore::new("postgres://localhost/myapp");
934            assert_eq!(store.database_url, "postgres://localhost/myapp");
935        }
936
937        #[test]
938        fn pg_store_default_cache_ttl_is_one_second() {
939            let store = PgFlagStore::new("postgres://localhost/myapp");
940            assert_eq!(store.cache_ttl, PgFlagStore::DEFAULT_CACHE_TTL);
941        }
942
943        #[test]
944        fn pg_store_cache_miss_on_empty_store() {
945            let store = PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::ZERO);
946            assert_eq!(store.cached("my_flag"), CacheLookup::Miss);
947        }
948
949        #[test]
950        fn pg_store_cache_hit_returns_stored_value() {
951            let store =
952                PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
953            store.store_cache("my_flag", Some(FlagConfig::new("my_flag")));
954            assert!(matches!(store.cached("my_flag"), CacheLookup::Hit(Some(_))));
955        }
956
957        #[test]
958        fn pg_store_cache_hit_none_for_absent_flag() {
959            let store =
960                PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
961            store.store_cache("absent", None);
962            assert_eq!(store.cached("absent"), CacheLookup::Hit(None));
963        }
964
965        #[test]
966        fn pg_store_cache_expired_returns_miss() {
967            let store = PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::ZERO);
968            store.store_cache("expired", Some(FlagConfig::new("expired")));
969            // TTL = 0 means entries expire immediately.
970            assert_eq!(store.cached("expired"), CacheLookup::Miss);
971        }
972
973        #[test]
974        fn pg_store_invalidate_removes_from_cache() {
975            let store =
976                PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
977            store.store_cache("flag", Some(FlagConfig::new("flag")));
978            assert!(matches!(store.cached("flag"), CacheLookup::Hit(Some(_))));
979            store.invalidate("flag");
980            assert_eq!(store.cached("flag"), CacheLookup::Miss);
981        }
982
983        #[test]
984        fn pg_store_with_cache_ttl_sets_custom_ttl() {
985            let ttl = Duration::from_secs(30);
986            let store = PgFlagStore::with_cache_ttl("postgres://localhost/myapp", ttl);
987            assert_eq!(store.cache_ttl, ttl);
988        }
989
990        #[test]
991        fn pg_store_clone_has_independent_cache() {
992            // PgFlagStore::clone() creates a fresh instance — it does NOT share the
993            // cache HashMap.  Only Arc<PgFlagStore> shares a single cache.
994            let store =
995                PgFlagStore::with_cache_ttl("postgres://localhost/myapp", Duration::from_secs(60));
996            store.store_cache("cached", Some(FlagConfig::new("cached")));
997            let cloned = store.clone();
998            // Clone starts with an empty cache, so this is a Miss.
999            assert_eq!(cloned.cached("cached"), CacheLookup::Miss);
1000            // Original still holds its value — confirming the two caches are independent.
1001            assert!(matches!(store.cached("cached"), CacheLookup::Hit(Some(_))));
1002        }
1003    }
1004}
1005
1006// ── Hash helpers ─────────────────────────────────────────────────────────────
1007
/// Widen a byte to `u64`.
///
/// A `u8` always fits in a `u64`, so the `as` cast cannot lose information;
/// the lint is silenced for that reason.
#[allow(clippy::cast_lossless)]
const fn u64(v: u8) -> u64 {
    v as u64
}

/// Hash `data` with the 64-bit FNV-1a algorithm.
///
/// Starting from the FNV offset basis, each input byte is XOR-ed into the
/// running hash, which is then multiplied (wrapping) by the FNV prime. The
/// result depends only on the input bytes, which is what makes it usable for
/// stable percent-rollout bucket assignment. An empty slice yields the offset
/// basis itself.
fn fnv1a_64(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0100_0000_01b3;

    data.iter().fold(OFFSET_BASIS, |acc, &octet| {
        (acc ^ u64(octet)).wrapping_mul(PRIME)
    })
}
1027
1028/// Compute the percent-rollout bucket for `(flag_key, actor_id)`.
1029///
1030/// Returns a value in `[0, 100)`. If the flag's `rollout_pct` is greater
1031/// than this value the actor is in the rollout cohort.
1032#[must_use]
1033pub fn rollout_bucket(flag_key: &str, actor_id: &str) -> u8 {
1034    let key = format!("{flag_key}:{actor_id}");
1035    let hash = fnv1a_64(key.as_bytes());
1036    u8::try_from(hash % 100).unwrap_or(0)
1037}
1038
1039// ── FeatureFlagService ───────────────────────────────────────────────────────
1040
1041/// The main feature-flag service.
1042///
1043/// Wrap a [`FlagStore`] (for persistence) and an optional [`GroupResolver`]
1044/// (for named-group membership). The service is cheaply clone-able and
1045/// intended to be stored as an `AppState` extension:
1046///
1047/// ```rust,ignore
1048/// state.insert_extension(FeatureFlagService::new(Arc::new(InMemoryFlagStore::new())));
1049/// ```
1050#[derive(Clone)]
1051pub struct FeatureFlagService {
1052    store: Arc<dyn FlagStore>,
1053    group_resolver: Option<GroupResolver>,
1054}
1055
1056impl std::fmt::Debug for FeatureFlagService {
1057    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058        f.debug_struct("FeatureFlagService").finish_non_exhaustive()
1059    }
1060}
1061
1062impl FeatureFlagService {
1063    /// Create a new service wrapping the given store.
1064    #[must_use]
1065    pub fn new(store: Arc<dyn FlagStore>) -> Self {
1066        Self {
1067            store,
1068            group_resolver: None,
1069        }
1070    }
1071
1072    /// Attach a group resolver so named-group gates are evaluated.
1073    #[must_use]
1074    pub fn with_group_resolver(mut self, resolver: GroupResolver) -> Self {
1075        self.group_resolver = Some(resolver);
1076        self
1077    }
1078
1079    /// Return `true` if `flag_key` is enabled for `actor_id`.
1080    ///
1081    /// Returns `false` for unknown flags (fail-closed).
1082    #[must_use]
1083    pub fn is_enabled(&self, flag_key: &str, actor_id: Option<&str>) -> bool {
1084        let Ok(Some(flag)) = self.store.get(flag_key) else {
1085            return false;
1086        };
1087        self.evaluate(&flag, actor_id)
1088    }
1089
1090    fn evaluate(&self, flag: &FlagConfig, actor_id: Option<&str>) -> bool {
1091        // Kill switch: enabled=false overrides all other gates.
1092        if !flag.enabled {
1093            return false;
1094        }
1095
1096        // Globally on: rollout_pct=100 enables everyone without per-actor check.
1097        if flag.rollout_pct >= 100 {
1098            return true;
1099        }
1100
1101        // Actor allowlist.
1102        if let Some(actor) = actor_id
1103            && flag.actor_allowlist.iter().any(|a| a.as_str() == actor)
1104        {
1105            return true;
1106        }
1107
1108        // Named groups.
1109        if let (Some(actor), Some(resolver)) = (actor_id, &self.group_resolver) {
1110            for group in &flag.group_allowlist {
1111                if resolver(actor, group) {
1112                    return true;
1113                }
1114            }
1115        }
1116
1117        // Percent rollout (1–99%).
1118        if flag.rollout_pct > 0
1119            && let Some(actor) = actor_id
1120        {
1121            let bucket = rollout_bucket(&flag.key, actor);
1122            return bucket < flag.rollout_pct;
1123        }
1124
1125        false
1126    }
1127
1128    /// Enable `flag_key` for all actors.
1129    ///
1130    /// # Errors
1131    ///
1132    /// Propagates [`FlagStoreError`] from the backing store.
1133    pub fn enable(&self, flag_key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
1134        self.store.enable(flag_key, actor)
1135    }
1136
1137    /// Disable `flag_key` globally.
1138    ///
1139    /// # Errors
1140    ///
1141    /// Propagates [`FlagStoreError`] from the backing store.
1142    pub fn disable(&self, flag_key: &str, actor: Option<&str>) -> Result<(), FlagStoreError> {
1143        self.store.disable(flag_key, actor)
1144    }
1145
1146    /// Set the percent-rollout gate for `flag_key` to `pct` (0–100).
1147    ///
1148    /// # Errors
1149    ///
1150    /// Propagates [`FlagStoreError`] from the backing store.
1151    pub fn set_rollout(
1152        &self,
1153        flag_key: &str,
1154        pct: u8,
1155        actor: Option<&str>,
1156    ) -> Result<(), FlagStoreError> {
1157        self.store.set_rollout(flag_key, pct, actor)
1158    }
1159
1160    /// Add `actor_id` to the explicit allowlist for `flag_key`.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Propagates [`FlagStoreError`] from the backing store.
1165    pub fn allow_actor(
1166        &self,
1167        flag_key: &str,
1168        actor_id: &str,
1169        actor: Option<&str>,
1170    ) -> Result<(), FlagStoreError> {
1171        self.store.allow_actor(flag_key, actor_id, actor)
1172    }
1173
1174    /// Add `group` to the named-group allowlist for `flag_key`.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Propagates [`FlagStoreError`] from the backing store.
1179    pub fn add_group(
1180        &self,
1181        flag_key: &str,
1182        group: &str,
1183        actor: Option<&str>,
1184    ) -> Result<(), FlagStoreError> {
1185        self.store.add_group(flag_key, group, actor)
1186    }
1187
1188    /// Return all known flags, sorted by key.
1189    ///
1190    /// # Errors
1191    ///
1192    /// Propagates [`FlagStoreError`] from the backing store.
1193    pub fn list(&self) -> Result<Vec<FlagConfig>, FlagStoreError> {
1194        self.store.list()
1195    }
1196
1197    /// Return the most recent `limit` change records for `flag_key`.
1198    ///
1199    /// # Errors
1200    ///
1201    /// Propagates [`FlagStoreError`] from the backing store.
1202    pub fn history(
1203        &self,
1204        flag_key: &str,
1205        limit: usize,
1206    ) -> Result<Vec<FlagChangeRecord>, FlagStoreError> {
1207        self.store.history(flag_key, limit)
1208    }
1209}
1210
1211// ── AppState extractor ───────────────────────────────────────────────────────
1212
1213/// Request extractor that resolves the current user's flag service handle.
1214///
1215/// Extracts [`FeatureFlagService`] from the `AppState` extension slot. If no
1216/// service is registered the extraction fails with `500 Internal Server Error`.
1217///
1218/// ```rust,ignore
1219/// use autumn_web::prelude::*;
1220/// use autumn_web::feature_flags::Flags;
1221///
1222/// #[get("/dashboard")]
1223/// async fn dashboard(flags: Flags) -> Markup {
1224///     html! {
1225///         @if flags.enabled("beta_inbox") {
1226///             (render_beta_inbox())
1227///         }
1228///     }
1229/// }
1230/// ```
1231pub struct Flags {
1232    service: FeatureFlagService,
1233    actor_id: Option<String>,
1234}
1235
1236impl Flags {
1237    /// Return `true` if `flag_key` is enabled for the current actor.
1238    #[must_use]
1239    pub fn enabled(&self, flag_key: &str) -> bool {
1240        self.service.is_enabled(flag_key, self.actor_id.as_deref())
1241    }
1242
1243    /// Return the underlying service for direct mutation from handlers.
1244    #[must_use]
1245    pub const fn service(&self) -> &FeatureFlagService {
1246        &self.service
1247    }
1248}
1249
1250impl axum::extract::FromRequestParts<crate::AppState> for Flags {
1251    type Rejection = crate::AutumnError;
1252
1253    async fn from_request_parts(
1254        parts: &mut axum::http::request::Parts,
1255        state: &crate::AppState,
1256    ) -> Result<Self, Self::Rejection> {
1257        let service = state
1258            .extension::<FeatureFlagService>()
1259            .map(|arc| (*arc).clone())
1260            .ok_or_else(|| {
1261                crate::AutumnError::internal_server_error_msg(
1262                    "feature flag service not registered; \
1263                     install a FlagStore via AppBuilder::with_flag_store()",
1264                )
1265            })?;
1266
1267        // Resolve actor_id from session if available (best-effort, non-blocking).
1268        let actor_id = if let Some(session) = parts.extensions.get::<crate::session::Session>() {
1269            session.get(state.auth_session_key()).await
1270        } else {
1271            None
1272        };
1273
1274        Ok(Self { service, actor_id })
1275    }
1276}
1277
1278// ── Tests ────────────────────────────────────────────────────────────────────
1279
1280#[cfg(test)]
1281mod tests {
1282    use super::*;
1283
1284    // ─────────────── RED PHASE: tests written before full implementation ──────
1285
1286    fn make_svc() -> FeatureFlagService {
1287        FeatureFlagService::new(Arc::new(InMemoryFlagStore::new()))
1288    }
1289
1290    // AC-1: service resolves flag to bool ─────────────────────────────────────
1291
1292    #[test]
1293    fn unknown_flag_returns_false() {
1294        let svc = make_svc();
1295        assert!(!svc.is_enabled("nonexistent", Some("user:1")));
1296    }
1297
1298    #[test]
1299    fn globally_enabled_flag_returns_true_for_any_actor() {
1300        let svc = make_svc();
1301        svc.enable("my_flag", None).unwrap();
1302        assert!(svc.is_enabled("my_flag", Some("user:1")));
1303        assert!(svc.is_enabled("my_flag", Some("user:99")));
1304        assert!(svc.is_enabled("my_flag", None));
1305    }
1306
1307    #[test]
1308    fn globally_disabled_flag_returns_false_for_any_actor() {
1309        let svc = make_svc();
1310        svc.enable("my_flag", None).unwrap();
1311        svc.disable("my_flag", None).unwrap();
1312        assert!(!svc.is_enabled("my_flag", Some("user:1")));
1313        assert!(!svc.is_enabled("my_flag", None));
1314    }
1315
1316    // AC-2: evaluation modes ──────────────────────────────────────────────────
1317
1318    #[test]
1319    fn actor_allowlist_enables_specific_actor() {
1320        let svc = make_svc();
1321        svc.allow_actor("beta_feature", "user:42", None).unwrap();
1322        assert!(svc.is_enabled("beta_feature", Some("user:42")));
1323        assert!(!svc.is_enabled("beta_feature", Some("user:1")));
1324    }
1325
1326    #[test]
1327    fn group_resolver_enables_group_members() {
1328        let svc = FeatureFlagService::new(Arc::new(InMemoryFlagStore::new())).with_group_resolver(
1329            Arc::new(|actor_id: &str, group: &str| {
1330                // "staff" group contains actor IDs starting with "staff:"
1331                group == "staff" && actor_id.starts_with("staff:")
1332            }),
1333        );
1334        svc.add_group("internal_feature", "staff", None).unwrap();
1335        assert!(svc.is_enabled("internal_feature", Some("staff:alice")));
1336        assert!(!svc.is_enabled("internal_feature", Some("user:bob")));
1337    }
1338
1339    #[test]
1340    fn percent_rollout_at_0_disables_for_all_actors() {
1341        let svc = make_svc();
1342        svc.set_rollout("gradual", 0, None).unwrap();
1343        // With 0% rollout and no other gates, every actor should be disabled.
1344        for i in 0..50_u32 {
1345            let actor = format!("user:{i}");
1346            assert!(
1347                !svc.is_enabled("gradual", Some(&actor)),
1348                "expected disabled for {actor} at 0% rollout"
1349            );
1350        }
1351    }
1352
1353    #[test]
1354    fn percent_rollout_at_100_enables_for_all_actors() {
1355        let svc = make_svc();
1356        svc.set_rollout("gradual", 100, None).unwrap();
1357        for i in 0..50_u32 {
1358            let actor = format!("user:{i}");
1359            assert!(
1360                svc.is_enabled("gradual", Some(&actor)),
1361                "expected enabled for {actor} at 100% rollout"
1362            );
1363        }
1364    }
1365
1366    #[test]
1367    fn percent_rollout_at_50_enables_roughly_half() {
1368        let svc = make_svc();
1369        svc.set_rollout("rollout_flag", 50, None).unwrap();
1370        let enabled_count = (0..200_u32)
1371            .filter(|i| svc.is_enabled("rollout_flag", Some(&format!("user:{i}"))))
1372            .count();
1373        // With 200 actors and 50% rollout, expect 80–120 enabled (±20%).
1374        assert!(
1375            (80..=120).contains(&enabled_count),
1376            "expected ~100 enabled actors, got {enabled_count}"
1377        );
1378    }
1379
1380    // AC-3: determinism ───────────────────────────────────────────────────────
1381
1382    #[test]
1383    fn rollout_bucket_is_stable_across_calls() {
1384        let b1 = rollout_bucket("my_flag", "user:1");
1385        let b2 = rollout_bucket("my_flag", "user:1");
1386        assert_eq!(b1, b2, "bucket must be deterministic");
1387    }
1388
1389    #[test]
1390    fn rollout_bucket_differs_for_different_actors() {
1391        // Ensure we don't always get the same bucket (birthday collision at
1392        // 100 buckets is essentially impossible with our FNV-1a implementation).
1393        let buckets: std::collections::HashSet<u8> = (0..50_u32)
1394            .map(|i| rollout_bucket("flag", &format!("user:{i}")))
1395            .collect();
1396        assert!(
1397            buckets.len() > 10,
1398            "expected diverse buckets, got {}: {buckets:?}",
1399            buckets.len()
1400        );
1401    }
1402
1403    #[test]
1404    fn rollout_bucket_in_range_0_to_99() {
1405        for i in 0..1000_u32 {
1406            let b = rollout_bucket("flag", &format!("actor:{i}"));
1407            assert!(b < 100, "bucket out of range: {b}");
1408        }
1409    }
1410
1411    #[test]
1412    fn percent_rollout_same_actor_same_flag_always_same_result() {
1413        let svc = make_svc();
1414        svc.set_rollout("stable_flag", 42, None).unwrap();
1415        let first = svc.is_enabled("stable_flag", Some("user:123"));
1416        for _ in 0..10 {
1417            assert_eq!(
1418                svc.is_enabled("stable_flag", Some("user:123")),
1419                first,
1420                "rollout result must not flip between calls"
1421            );
1422        }
1423    }
1424
1425    // AC-7: FlagStore trait + InMemoryFlagStore ────────────────────────────────
1426
1427    #[test]
1428    fn in_memory_store_returns_none_for_unknown_flag() {
1429        let store = InMemoryFlagStore::new();
1430        assert!(store.get("unknown").unwrap().is_none());
1431    }
1432
1433    #[test]
1434    fn in_memory_store_list_is_sorted() {
1435        let store = InMemoryFlagStore::new();
1436        store.enable("zebra", None).unwrap();
1437        store.enable("alpha", None).unwrap();
1438        store.enable("mango", None).unwrap();
1439        let keys: Vec<String> = store.list().unwrap().into_iter().map(|f| f.key).collect();
1440        assert_eq!(keys, vec!["alpha", "mango", "zebra"]);
1441    }
1442
1443    #[test]
1444    fn in_memory_store_enable_creates_flag_if_absent() {
1445        let store = InMemoryFlagStore::new();
1446        store.enable("new_flag", None).unwrap();
1447        let flag = store.get("new_flag").unwrap().unwrap();
1448        assert!(flag.enabled);
1449    }
1450
1451    #[test]
1452    fn in_memory_store_disable_sets_enabled_false() {
1453        let store = InMemoryFlagStore::new();
1454        store.enable("f", None).unwrap();
1455        store.disable("f", None).unwrap();
1456        assert!(!store.get("f").unwrap().unwrap().enabled);
1457    }
1458
1459    #[test]
1460    fn in_memory_store_allow_actor_does_not_duplicate() {
1461        let store = InMemoryFlagStore::new();
1462        store.allow_actor("f", "user:1", None).unwrap();
1463        store.allow_actor("f", "user:1", None).unwrap();
1464        let flag = store.get("f").unwrap().unwrap();
1465        assert_eq!(flag.actor_allowlist.len(), 1);
1466    }
1467
1468    #[test]
1469    fn in_memory_store_add_group_does_not_duplicate() {
1470        let store = InMemoryFlagStore::new();
1471        store.add_group("f", "staff", None).unwrap();
1472        store.add_group("f", "staff", None).unwrap();
1473        let flag = store.get("f").unwrap().unwrap();
1474        assert_eq!(flag.group_allowlist.len(), 1);
1475    }
1476
1477    // AC-10: audit trail ──────────────────────────────────────────────────────
1478
1479    #[test]
1480    fn mutations_are_recorded_in_history() {
1481        let svc = make_svc();
1482        svc.enable("tracked_flag", Some("ops@example.com")).unwrap();
1483        svc.disable("tracked_flag", Some("ops@example.com"))
1484            .unwrap();
1485        let history = svc.history("tracked_flag", 10).unwrap();
1486        assert_eq!(history.len(), 2, "two mutations should be recorded");
1487        assert_eq!(history[0].mutation, "disabled");
1488        assert_eq!(history[0].actor.as_deref(), Some("ops@example.com"));
1489        assert_eq!(history[1].mutation, "enabled");
1490    }
1491
1492    #[test]
1493    fn history_respects_limit() {
1494        let svc = make_svc();
1495        for _ in 0..5 {
1496            svc.enable("limited_flag", None).unwrap();
1497        }
1498        let history = svc.history("limited_flag", 3).unwrap();
1499        assert_eq!(history.len(), 3);
1500    }
1501
1502    #[test]
1503    fn history_empty_for_unknown_flag() {
1504        let svc = make_svc();
1505        let history = svc.history("ghost_flag", 10).unwrap();
1506        assert!(history.is_empty());
1507    }
1508
1509    #[test]
1510    fn rollout_mutation_recorded_with_pct() {
1511        let svc = make_svc();
1512        svc.set_rollout("roll", 25, Some("cli")).unwrap();
1513        let history = svc.history("roll", 1).unwrap();
1514        assert_eq!(history[0].mutation, "rollout=25");
1515        assert_eq!(history[0].actor.as_deref(), Some("cli"));
1516    }
1517
1518    #[test]
1519    fn allow_actor_mutation_recorded() {
1520        let svc = make_svc();
1521        svc.allow_actor("f", "user:7", Some("cli")).unwrap();
1522        let h = svc.history("f", 1).unwrap();
1523        assert_eq!(h[0].mutation, "allowed_actor=user:7");
1524    }
1525
1526    // ── FlagConfig defaults ───────────────────────────────────────────────────
1527
1528    #[test]
1529    fn flag_config_new_defaults_to_disabled() {
1530        let f = FlagConfig::new("my_flag");
1531        assert_eq!(f.key, "my_flag");
1532        assert!(!f.enabled);
1533        assert_eq!(f.rollout_pct, 0);
1534        assert!(f.actor_allowlist.is_empty());
1535        assert!(f.group_allowlist.is_empty());
1536    }
1537
1538    // ── Rollout clamping ──────────────────────────────────────────────────────
1539
1540    #[test]
1541    fn set_rollout_clamps_to_100() {
1542        let store = InMemoryFlagStore::new();
1543        store.set_rollout("f", 200, None).unwrap();
1544        assert_eq!(store.get("f").unwrap().unwrap().rollout_pct, 100);
1545    }
1546
1547    // AC-1 kill-switch: disable() must override rollout and allowlists ─────────
1548
1549    #[test]
1550    fn disable_kills_flag_even_when_rollout_is_100_percent() {
1551        let svc = make_svc();
1552        svc.set_rollout("roll_flag", 100, None).unwrap();
1553        svc.disable("roll_flag", None).unwrap();
1554        for i in 0..20_u32 {
1555            assert!(
1556                !svc.is_enabled("roll_flag", Some(&format!("user:{i}"))),
1557                "disable() must override rollout for actor user:{i}"
1558            );
1559        }
1560        assert!(!svc.is_enabled("roll_flag", None));
1561    }
1562
1563    #[test]
1564    fn disable_kills_flag_even_when_actor_is_in_allowlist() {
1565        let svc = make_svc();
1566        svc.allow_actor("guarded", "user:42", None).unwrap();
1567        svc.disable("guarded", None).unwrap();
1568        assert!(
1569            !svc.is_enabled("guarded", Some("user:42")),
1570            "disable() must override actor allowlist"
1571        );
1572    }
1573
1574    #[test]
1575    fn enable_after_disable_restores_rollout_config() {
1576        let svc = make_svc();
1577        svc.set_rollout("roll_flag", 50, None).unwrap();
1578        svc.disable("roll_flag", None).unwrap();
1579        // Re-enable globally — disable() preserves rollout_pct=50 in the store,
1580        // but enable() resets it to 100 (globally on).
1581        svc.enable("roll_flag", None).unwrap();
1582        assert!(svc.is_enabled("roll_flag", None));
1583        assert!(svc.is_enabled("roll_flag", Some("user:1")));
1584    }
1585
1586    // AC-1 allow_actor after kill-switch must not restore global rollout ────────
1587
1588    #[test]
1589    fn allow_actor_after_kill_switch_does_not_restore_global_rollout() {
1590        // Scenario: enable globally → disable (kill-switch) → allow_actor for
1591        // one tester. The flag must be visible only to the allowlisted actor,
1592        // NOT to everyone (which would happen if rollout_pct=100 were preserved).
1593        let svc = make_svc();
1594        svc.enable("targeted", None).unwrap(); // rollout_pct = 100
1595        svc.disable("targeted", None).unwrap(); // kill-switch, rollout_pct still 100
1596        svc.allow_actor("targeted", "user:42", None).unwrap(); // re-enable allowlist-only
1597
1598        assert!(
1599            svc.is_enabled("targeted", Some("user:42")),
1600            "allowlisted actor must see the flag"
1601        );
1602        // All non-allowlisted actors should NOT see it (rollout was reset to 0).
1603        for i in [1_u32, 5, 10, 99] {
1604            let actor = format!("user:{i}");
1605            assert!(
1606                !svc.is_enabled("targeted", Some(&actor)),
1607                "non-allowlisted actor {actor} must NOT see the flag after allowlist-only re-enable"
1608            );
1609        }
1610    }
1611
1612    #[test]
1613    fn allow_actor_on_active_rollout_preserves_rollout_pct() {
1614        // When the flag is already enabled (no kill-switch), adding an actor to the
1615        // allowlist must NOT reset the existing rollout percentage.
1616        let svc = make_svc();
1617        svc.set_rollout("staged", 50, None).unwrap(); // enabled=true, rollout=50%
1618        svc.allow_actor("staged", "user:42", None).unwrap();
1619
1620        // rollout_pct should still be 50, not reset to 0.
1621        let store = InMemoryFlagStore::new();
1622        store.set_rollout("staged", 50, None).unwrap();
1623        store.allow_actor("staged", "user:42", None).unwrap();
1624        let flag = store.get("staged").unwrap().unwrap();
1625        assert_eq!(
1626            flag.rollout_pct, 50,
1627            "rollout_pct must be preserved when flag was already enabled"
1628        );
1629        assert!(flag.actor_allowlist.contains(&"user:42".to_owned()));
1630    }
1631
1632    // ── Arc<T: FlagStore> delegation ──────────────────────────────────────────
1633
1634    #[test]
1635    fn arc_flag_store_delegates_get() {
1636        let store = Arc::new(InMemoryFlagStore::new());
1637        store.enable("arc_flag", None).unwrap();
1638        let arc_store: Arc<dyn FlagStore> = store;
1639        let flag = arc_store.get("arc_flag").unwrap().unwrap();
1640        assert!(flag.enabled);
1641    }
1642
1643    #[test]
1644    fn arc_flag_store_delegates_list() {
1645        let store = Arc::new(InMemoryFlagStore::new());
1646        store.enable("f1", None).unwrap();
1647        store.enable("f2", None).unwrap();
1648        let arc_store: Arc<dyn FlagStore> = store;
1649        let flags = arc_store.list().unwrap();
1650        assert_eq!(flags.len(), 2);
1651    }
1652
1653    #[test]
1654    fn arc_flag_store_delegates_enable_and_disable() {
1655        let store = Arc::new(InMemoryFlagStore::new());
1656        let arc_store: Arc<dyn FlagStore> = store;
1657        arc_store.enable("f", None).unwrap();
1658        assert!(arc_store.get("f").unwrap().unwrap().enabled);
1659        arc_store.disable("f", None).unwrap();
1660        assert!(!arc_store.get("f").unwrap().unwrap().enabled);
1661    }
1662
1663    #[test]
1664    fn arc_flag_store_delegates_set_rollout() {
1665        let store = Arc::new(InMemoryFlagStore::new());
1666        let arc_store: Arc<dyn FlagStore> = store;
1667        arc_store.set_rollout("f", 42, None).unwrap();
1668        let flag = arc_store.get("f").unwrap().unwrap();
1669        assert_eq!(flag.rollout_pct, 42);
1670    }
1671
1672    #[test]
1673    fn arc_flag_store_delegates_allow_actor() {
1674        let store = Arc::new(InMemoryFlagStore::new());
1675        let arc_store: Arc<dyn FlagStore> = store;
1676        arc_store.allow_actor("f", "user:1", None).unwrap();
1677        let flag = arc_store.get("f").unwrap().unwrap();
1678        assert!(flag.actor_allowlist.contains(&"user:1".to_owned()));
1679    }
1680
1681    #[test]
1682    fn arc_flag_store_delegates_add_group() {
1683        let store = Arc::new(InMemoryFlagStore::new());
1684        let arc_store: Arc<dyn FlagStore> = store;
1685        arc_store.add_group("f", "beta_testers", None).unwrap();
1686        let flag = arc_store.get("f").unwrap().unwrap();
1687        assert!(flag.group_allowlist.contains(&"beta_testers".to_owned()));
1688    }
1689
1690    #[test]
1691    fn arc_flag_store_delegates_history() {
1692        let store = Arc::new(InMemoryFlagStore::new());
1693        let arc_store: Arc<dyn FlagStore> = store;
1694        arc_store.enable("f", Some("cli")).unwrap();
1695        let history = arc_store.history("f", 10).unwrap();
1696        assert_eq!(history.len(), 1);
1697        assert_eq!(history[0].mutation, "enabled");
1698    }
1699
1700    // ── Box<dyn FlagStore> delegation ─────────────────────────────────────────
1701
1702    #[test]
1703    fn box_flag_store_delegates_all_operations() {
1704        let store = InMemoryFlagStore::new();
1705        let boxed: Box<dyn FlagStore> = Box::new(store);
1706        boxed.enable("f", None).unwrap();
1707        assert!(boxed.get("f").unwrap().unwrap().enabled);
1708        boxed.set_rollout("g", 25, Some("cli")).unwrap();
1709        assert_eq!(boxed.get("g").unwrap().unwrap().rollout_pct, 25);
1710        boxed.allow_actor("h", "user:1", None).unwrap();
1711        boxed.add_group("h", "staff", None).unwrap();
1712        let flags = boxed.list().unwrap();
1713        // f, g, h are present
1714        assert_eq!(flags.len(), 3);
1715        let hist = boxed.history("f", 5).unwrap();
1716        assert_eq!(hist[0].mutation, "enabled");
1717        boxed.disable("f", None).unwrap();
1718        assert!(!boxed.get("f").unwrap().unwrap().enabled);
1719    }
1720
1721    // ── FlagStoreError display ────────────────────────────────────────────────
1722
1723    #[test]
1724    fn flag_store_error_displays_message() {
1725        let err = FlagStoreError::Backend("connection refused".to_owned());
1726        assert_eq!(
1727            err.to_string(),
1728            "flag store backend error: connection refused"
1729        );
1730    }
1731
1732    // ── FlagConfig clone and equality ─────────────────────────────────────────
1733
1734    #[test]
1735    fn flag_config_clone_is_equal_to_original() {
1736        let mut f = FlagConfig::new("cloned");
1737        f.enabled = true;
1738        f.rollout_pct = 50;
1739        f.actor_allowlist = vec!["user:1".to_owned()];
1740        let g = f.clone();
1741        assert_eq!(f, g);
1742    }
1743
1744    // ── evaluate() edge cases ─────────────────────────────────────────────────
1745
1746    #[test]
1747    fn rollout_with_no_actor_returns_false() {
1748        // When actor_id is None and there are no allowlists, a percent rollout
1749        // must not enable the flag (there's no actor to compute a bucket for).
1750        let svc = make_svc();
1751        svc.set_rollout("gradual", 99, None).unwrap();
1752        assert!(
1753            !svc.is_enabled("gradual", None),
1754            "percent rollout must not fire for anonymous (None) actor"
1755        );
1756    }
1757
1758    #[test]
1759    fn group_resolver_with_no_actor_does_not_panic() {
1760        let svc = FeatureFlagService::new(Arc::new(InMemoryFlagStore::new()))
1761            .with_group_resolver(Arc::new(|_: &str, _: &str| true));
1762        svc.add_group("f", "everyone", None).unwrap();
1763        // No actor — group check must be skipped, not panic.
1764        assert!(!svc.is_enabled("f", None));
1765    }
1766
1767    #[test]
1768    fn add_group_mutation_format() {
1769        let store = InMemoryFlagStore::new();
1770        store.add_group("f", "beta_testers", Some("cli")).unwrap();
1771        let hist = store.history("f", 1).unwrap();
1772        assert_eq!(hist[0].mutation, "added_group=beta_testers");
1773        assert_eq!(hist[0].actor.as_deref(), Some("cli"));
1774    }
1775
1776    #[test]
1777    fn service_list_returns_all_flags() {
1778        let svc = make_svc();
1779        svc.enable("a", None).unwrap();
1780        svc.disable("b", None).unwrap();
1781        svc.set_rollout("c", 10, None).unwrap();
1782        let flags = svc.list().unwrap();
1783        assert_eq!(flags.len(), 3);
1784        assert_eq!(flags[0].key, "a");
1785        assert_eq!(flags[1].key, "b");
1786        assert_eq!(flags[2].key, "c");
1787    }
1788
1789    #[test]
1790    fn service_debug_does_not_panic() {
1791        let svc = make_svc();
1792        let _ = format!("{svc:?}");
1793    }
1794
1795    #[test]
1796    fn flags_enabled_delegates_to_service() {
1797        // Test the Flags::enabled() method via FeatureFlagService directly.
1798        let svc = make_svc();
1799        svc.enable("active", None).unwrap();
1800        assert!(svc.is_enabled("active", Some("any_user")));
1801        assert!(!svc.is_enabled("missing", Some("any_user")));
1802    }
1803
1804    #[tokio::test]
1805    async fn from_request_parts_respects_custom_auth_session_key() {
1806        use axum::extract::FromRequestParts;
1807        use std::collections::HashMap;
1808
1809        let svc = make_svc();
1810        let state = crate::AppState::for_test().with_auth_session_key("custom_user_id");
1811        state.insert_extension(svc);
1812
1813        let mut data = HashMap::new();
1814        data.insert("custom_user_id".to_owned(), "user:123".to_owned());
1815        data.insert("user_id".to_owned(), "user:999".to_owned()); // distracter
1816        let session = crate::session::Session::new_for_test("session_id".to_owned(), data);
1817
1818        let mut req = axum::http::Request::builder().body(()).unwrap();
1819        req.extensions_mut().insert(session);
1820        let mut parts = req.into_parts().0;
1821
1822        let flags = Flags::from_request_parts(&mut parts, &state).await.unwrap();
1823        assert_eq!(flags.actor_id.as_deref(), Some("user:123"));
1824    }
1825}