Skip to main content

fraiseql_auth/
pkce.rs

1//! PKCE state store — RFC 7636 Proof Key for Code Exchange.
2//!
3//! Stores `(code_verifier, redirect_uri)` under a random internal key while
4//! the OAuth2 authorization round-trip is in flight.  The token sent to the
5//! OIDC provider in the `?state=` query parameter is either:
6//! - the raw internal key (no encryption configured), or
7//! - `encrypt(internal_key)` (when [`crate::state_encryption::StateEncryptionService`] is
8//!   attached).
9//!
10//! State lifecycle:
11//! - `create_state(redirect_uri)` → `internal_key = random 32 bytes (base64url)` → `outbound_token
12//!   = encrypt(internal_key)` (or `internal_key` if no encryption) → `store.insert(internal_key,
13//!   {verifier, redirect_uri, ttl})` → returns `(outbound_token, verifier)`
14//! - `consume_state(outbound_token)` → `internal_key = decrypt(outbound_token)` (or
15//!   `outbound_token` if no encryption) → `entry = store.remove(internal_key)?` (
16//!   [`PkceError::StateNotFound`] if absent) → if `entry.elapsed > entry.ttl` →
17//!   [`PkceError::StateExpired`] → returns `{verifier, redirect_uri}`
18//!
19//! Backends:
20//! - **InMemory** — `DashMap`, single-process, per-replica
21//! - **Redis** — distributed, multi-replica (requires the `redis-pkce` Cargo feature)
22
23use std::{sync::Arc, time::Duration};
24
25use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
26use dashmap::DashMap;
27use rand::{RngCore, rngs::OsRng};
28use sha2::{Digest, Sha256};
29use thiserror::Error;
30
31use crate::state_encryption::StateEncryptionService;
32
33// ---------------------------------------------------------------------------
34// Error type
35// ---------------------------------------------------------------------------
36
37/// Errors returned by [`PkceStateStore::consume_state`].
38#[derive(Debug, Error)]
39#[non_exhaustive]
40pub enum PkceError {
41    /// The state token was not found — either never issued, already consumed,
42    /// or (when encryption is on) tampered/decryption failed.
43    ///
44    /// Clients receive the same message for unknown and tampered tokens to
45    /// avoid leaking information about the store.
46    #[error(
47        "state not found — the authorization flow may have already been completed or the state is invalid"
48    )]
49    StateNotFound,
50
51    /// The state token was found but its TTL has elapsed.
52    ///
53    /// Distinct from [`PkceError::StateNotFound`] so that clients can show
54    /// a useful "please restart the authorization flow" message rather than
55    /// a generic invalid-state error.
56    #[error("state expired — please restart the authorization flow")]
57    StateExpired,
58}
59
60// ---------------------------------------------------------------------------
61// Public consumed-state value
62// ---------------------------------------------------------------------------
63
64/// The data recovered after consuming a valid PKCE state token.
65#[derive(Debug)]
66pub struct ConsumedPkceState {
67    /// The `code_verifier` generated during `create_state`, needed for the
68    /// PKCE code exchange at `/token`.
69    pub verifier:     String,
70    /// The `redirect_uri` the client specified at `/auth/start`.
71    pub redirect_uri: String,
72}
73
74// ---------------------------------------------------------------------------
75// InMemoryPkceStateStore
76// ---------------------------------------------------------------------------
77
78struct PkceEntry {
79    verifier:     String,
80    redirect_uri: String,
81    /// Creation time as a Tokio instant so `tokio::time::pause()` +
82    /// `tokio::time::advance()` can control TTL expiry in tests.
83    created_at:   tokio::time::Instant,
84    ttl:          Duration,
85}
86
87/// In-memory PKCE state store backed by a [`DashMap`].
88///
89/// State is per-process: lost on restart, not shared across replicas.
90/// For multi-replica deployments, use `RedisPkceStateStore` instead
91/// (requires the `redis-pkce` Cargo feature).
92pub struct InMemoryPkceStateStore {
93    state_ttl_secs: u64,
94    entries:        DashMap<String, PkceEntry>,
95    encryptor:      Option<Arc<StateEncryptionService>>,
96}
97
98impl InMemoryPkceStateStore {
99    fn new(state_ttl_secs: u64, encryptor: Option<Arc<StateEncryptionService>>) -> Self {
100        Self {
101            state_ttl_secs,
102            entries: DashMap::new(),
103            encryptor,
104        }
105    }
106
107    fn create_state_sync(&self, redirect_uri: &str) -> Result<(String, String), anyhow::Error> {
108        // code_verifier — RFC 7636 §4.1: 43–128 chars, [A-Za-z0-9\-._~]
109        let mut verifier_bytes = [0u8; 32];
110        OsRng.fill_bytes(&mut verifier_bytes);
111        let verifier = URL_SAFE_NO_PAD.encode(verifier_bytes);
112
113        // internal_key — separate from verifier so outbound token cannot reveal it
114        let mut key_bytes = [0u8; 32];
115        OsRng.fill_bytes(&mut key_bytes);
116        let internal_key = URL_SAFE_NO_PAD.encode(key_bytes);
117
118        self.entries.insert(
119            internal_key.clone(),
120            PkceEntry {
121                verifier:     verifier.clone(),
122                redirect_uri: redirect_uri.to_owned(),
123                created_at:   tokio::time::Instant::now(),
124                ttl:          Duration::from_secs(self.state_ttl_secs),
125            },
126        );
127
128        let outbound_token = match &self.encryptor {
129            Some(enc) => enc.encrypt(internal_key.as_bytes())?,
130            None => internal_key,
131        };
132
133        Ok((outbound_token, verifier))
134    }
135
136    fn consume_state_sync(&self, outbound_token: &str) -> Result<ConsumedPkceState, PkceError> {
137        let internal_key = match &self.encryptor {
138            Some(enc) => {
139                let bytes = enc.decrypt(outbound_token).map_err(|_| PkceError::StateNotFound)?;
140                String::from_utf8(bytes).map_err(|_| PkceError::StateNotFound)?
141            },
142            None => outbound_token.to_owned(),
143        };
144
145        let (_, entry) = self.entries.remove(&internal_key).ok_or(PkceError::StateNotFound)?;
146
147        if entry.created_at.elapsed() > entry.ttl {
148            return Err(PkceError::StateExpired);
149        }
150
151        Ok(ConsumedPkceState {
152            verifier:     entry.verifier,
153            redirect_uri: entry.redirect_uri,
154        })
155    }
156
157    fn cleanup_expired_sync(&self) {
158        self.entries.retain(|_, e| e.created_at.elapsed() <= e.ttl);
159    }
160
161    fn len_sync(&self) -> usize {
162        self.entries.len()
163    }
164}
165
166// ---------------------------------------------------------------------------
167// Redis backend
168// ---------------------------------------------------------------------------
169
170/// Cumulative count of Redis PKCE store errors (unreachable Redis, etc.).
171///
172/// Exposed via `/metrics` as `fraiseql_pkce_redis_errors_total`.
173#[cfg(feature = "redis-pkce")]
174pub static REDIS_PKCE_ERRORS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
175
176/// Return the total number of Redis PKCE errors observed so far.
177#[cfg(feature = "redis-pkce")]
178pub fn redis_pkce_error_count_total() -> u64 {
179    REDIS_PKCE_ERRORS.load(std::sync::atomic::Ordering::Relaxed)
180}
181
182/// Redis-backed PKCE state store for distributed, multi-replica deployments.
183///
184/// Stores PKCE state tokens in Redis with TTL, enabling auth flows to be
185/// completed on any replica. Uses `GETDEL` for atomic one-shot consumption
186/// — a state token cannot be reused even under concurrent requests.
187///
188/// Key format:   `fraiseql:pkce:{internal_key}`
189/// Value format: `{"verifier":"...","redirect_uri":"..."}`
190#[cfg(feature = "redis-pkce")]
191pub struct RedisPkceStateStore {
192    pool:           redis::aio::ConnectionManager,
193    state_ttl_secs: u64,
194    encryptor:      Option<Arc<StateEncryptionService>>,
195}
196
197#[cfg(feature = "redis-pkce")]
198impl RedisPkceStateStore {
199    /// Connect to Redis and prepare the PKCE state store.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if the URL is invalid or the initial connection fails.
204    pub async fn new(
205        url: &str,
206        state_ttl_secs: u64,
207        encryptor: Option<Arc<StateEncryptionService>>,
208    ) -> Result<Self, redis::RedisError> {
209        let client = redis::Client::open(url)?;
210        let pool = redis::aio::ConnectionManager::new(client).await?;
211        Ok(Self {
212            pool,
213            state_ttl_secs,
214            encryptor,
215        })
216    }
217
218    async fn create_state_impl(
219        &self,
220        redirect_uri: &str,
221    ) -> Result<(String, String), anyhow::Error> {
222        // code_verifier (RFC 7636 §4.1)
223        let mut verifier_bytes = [0u8; 32];
224        OsRng.fill_bytes(&mut verifier_bytes);
225        let verifier = URL_SAFE_NO_PAD.encode(verifier_bytes);
226
227        // Opaque internal key — separate from verifier
228        let mut key_bytes = [0u8; 32];
229        OsRng.fill_bytes(&mut key_bytes);
230        let internal_key = URL_SAFE_NO_PAD.encode(key_bytes);
231
232        // Serialize state to JSON and store with TTL
233        let redis_key = format!("fraiseql:pkce:{internal_key}");
234        let value = serde_json::json!({
235            "verifier":     verifier,
236            "redirect_uri": redirect_uri,
237        })
238        .to_string();
239
240        let mut conn = self.pool.clone();
241        redis::cmd("SET")
242            .arg(&redis_key)
243            .arg(&value)
244            .arg("EX")
245            .arg(self.state_ttl_secs)
246            .query_async::<()>(&mut conn)
247            .await?;
248
249        let outbound_token = match &self.encryptor {
250            Some(enc) => enc.encrypt(internal_key.as_bytes())?,
251            None => internal_key,
252        };
253
254        Ok((outbound_token, verifier))
255    }
256
257    async fn consume_state_impl(
258        &self,
259        outbound_token: &str,
260    ) -> Result<ConsumedPkceState, PkceError> {
261        #[derive(serde::Deserialize)]
262        struct StoredEntry {
263            verifier:     String,
264            redirect_uri: String,
265        }
266
267        // Recover internal key from outbound token
268        let internal_key = match &self.encryptor {
269            Some(enc) => {
270                let bytes = enc.decrypt(outbound_token).map_err(|_| PkceError::StateNotFound)?;
271                String::from_utf8(bytes).map_err(|_| PkceError::StateNotFound)?
272            },
273            None => outbound_token.to_owned(),
274        };
275
276        let redis_key = format!("fraiseql:pkce:{internal_key}");
277        let mut conn = self.pool.clone();
278
279        // GETDEL — atomically retrieve and delete in a single round-trip.
280        // This guarantees one-shot consumption: no concurrent request can
281        // reuse the same state token, even without application-level locking.
282        let raw: Option<String> = redis::cmd("GETDEL")
283            .arg(&redis_key)
284            .query_async(&mut conn)
285            .await
286            .map_err(|_| PkceError::StateNotFound)?;
287
288        let json = raw.ok_or(PkceError::StateNotFound)?;
289
290        let entry: StoredEntry =
291            serde_json::from_str(&json).map_err(|_| PkceError::StateNotFound)?;
292
293        // Note: TTL expiry is handled by Redis — expired entries are absent.
294        // The Redis backend therefore never returns `PkceError::StateExpired`;
295        // callers receive `StateNotFound` for both absent and expired tokens.
296        Ok(ConsumedPkceState {
297            verifier:     entry.verifier,
298            redirect_uri: entry.redirect_uri,
299        })
300    }
301}
302
303// ---------------------------------------------------------------------------
304// PkceStateStore — unified public interface
305// ---------------------------------------------------------------------------
306
307/// PKCE state store that dispatches to an in-memory or Redis backend.
308///
309/// # Backends
310///
311/// - **InMemory** (default): per-process DashMap. Safe for single-replica deployments. State is
312///   lost on restart.
313///
314/// - **Redis** (requires `redis-pkce` Cargo feature): distributed, shared across all replicas.
315///   Required for multi-instance Kubernetes / ECS / fly.io deployments where `/auth/start` and
316///   `/auth/callback` may hit different nodes.
317///
318/// # Multi-replica requirement
319///
320/// Set `FRAISEQL_REQUIRE_REDIS=1` in your deployment environment to make
321/// FraiseQL refuse to start without a Redis-backed PKCE store. This is the
322/// recommended pattern for production Kubernetes deployments.
323#[non_exhaustive]
324pub enum PkceStateStore {
325    /// Single-node DashMap-backed store.
326    InMemory(InMemoryPkceStateStore),
327    /// Distributed Redis-backed store (requires `redis-pkce` Cargo feature).
328    #[cfg(feature = "redis-pkce")]
329    Redis(RedisPkceStateStore),
330}
331
332impl PkceStateStore {
333    /// Create an in-memory PKCE state store (single-replica deployments).
334    pub fn new(state_ttl_secs: u64, encryptor: Option<Arc<StateEncryptionService>>) -> Self {
335        Self::InMemory(InMemoryPkceStateStore::new(state_ttl_secs, encryptor))
336    }
337
338    /// Create a Redis-backed distributed PKCE state store.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if the Redis URL is invalid or the connection fails.
343    #[cfg(feature = "redis-pkce")]
344    pub async fn new_redis(
345        url: &str,
346        state_ttl_secs: u64,
347        encryptor: Option<Arc<StateEncryptionService>>,
348    ) -> Result<Self, redis::RedisError> {
349        let inner = RedisPkceStateStore::new(url, state_ttl_secs, encryptor).await?;
350        Ok(Self::Redis(inner))
351    }
352
353    /// Returns `true` when backed by the in-memory DashMap store.
354    ///
355    /// Used by the `FRAISEQL_REQUIRE_REDIS` startup check.
356    pub const fn is_in_memory(&self) -> bool {
357        matches!(self, Self::InMemory(_))
358    }
359
360    /// Generate an authorization-code verifier and reserve a state slot.
361    ///
362    /// Returns `(outbound_token, code_verifier)`:
363    /// - `outbound_token` goes in the OIDC `?state=` query parameter.
364    /// - `code_verifier` is passed to [`Self::s256_challenge`] and stored until the callback
365    ///   arrives.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if encryption fails (effectively never with a valid
370    /// key) or the Redis backend is unreachable.
371    pub async fn create_state(
372        &self,
373        redirect_uri: &str,
374    ) -> Result<(String, String), anyhow::Error> {
375        match self {
376            Self::InMemory(s) => s.create_state_sync(redirect_uri),
377            #[cfg(feature = "redis-pkce")]
378            Self::Redis(s) => s.create_state_impl(redirect_uri).await,
379        }
380    }
381
382    /// Consume a state token, atomically removing it from the store.
383    ///
384    /// Returns [`PkceError::StateNotFound`] for:
385    /// - tokens that were never issued,
386    /// - tokens that have already been consumed (one-time use), and
387    /// - tokens that fail decryption (tampered or from a different key).
388    ///
389    /// Returns [`PkceError::StateExpired`] when the in-memory token is valid
390    /// but its TTL has elapsed. The Redis backend returns `StateNotFound` for
391    /// expired tokens (Redis TTL handles expiry).
392    ///
393    /// # Errors
394    ///
395    /// Returns `PkceError::StateNotFound` if the token is unknown, already consumed,
396    /// or fails decryption. Returns `PkceError::StateExpired` if the token's TTL has elapsed.
397    pub async fn consume_state(
398        &self,
399        outbound_token: &str,
400    ) -> Result<ConsumedPkceState, PkceError> {
401        match self {
402            Self::InMemory(s) => s.consume_state_sync(outbound_token),
403            #[cfg(feature = "redis-pkce")]
404            Self::Redis(s) => s.consume_state_impl(outbound_token).await,
405        }
406    }
407
408    /// Compute the S256 code challenge for a given verifier.
409    ///
410    /// Per RFC 7636 §4.2:
411    /// `code_challenge = BASE64URL(SHA256(ASCII(code_verifier)))`
412    /// (no padding).
413    pub fn s256_challenge(verifier: &str) -> String {
414        URL_SAFE_NO_PAD.encode(Sha256::digest(verifier.as_bytes()))
415    }
416
417    /// Remove expired entries.
418    ///
419    /// No-op for the Redis backend — Redis TTL handles expiry automatically.
420    /// Call from a background task on a fixed interval for the in-memory backend.
421    pub async fn cleanup_expired(&self) {
422        match self {
423            Self::InMemory(s) => s.cleanup_expired_sync(),
424            #[cfg(feature = "redis-pkce")]
425            Self::Redis(_) => {}, // Redis TTL handles expiry
426        }
427    }
428
429    /// Number of entries currently in the store.
430    ///
431    /// Returns 0 for the Redis backend — Redis state is not enumerable locally.
432    pub fn len(&self) -> usize {
433        match self {
434            Self::InMemory(s) => s.len_sync(),
435            #[cfg(feature = "redis-pkce")]
436            Self::Redis(_) => 0,
437        }
438    }
439
440    /// Returns `true` when the in-memory store contains no entries.
441    ///
442    /// Always returns `true` for the Redis backend.
443    pub fn is_empty(&self) -> bool {
444        self.len() == 0
445    }
446}
447
448// ---------------------------------------------------------------------------
449// Unit tests
450// ---------------------------------------------------------------------------
451
452#[allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
453#[cfg(test)]
454mod tests {
455    use std::time::Duration;
456
457    #[allow(clippy::wildcard_imports)]
458    // Reason: test module — wildcard keeps test boilerplate minimal
459    use super::*;
460    use crate::state_encryption::{EncryptionAlgorithm, StateEncryptionService};
461
462    fn store_no_enc(ttl_secs: u64) -> PkceStateStore {
463        PkceStateStore::new(ttl_secs, None)
464    }
465
466    fn enc_service() -> Arc<StateEncryptionService> {
467        Arc::new(StateEncryptionService::from_raw_key(
468            &[0u8; 32],
469            EncryptionAlgorithm::Chacha20Poly1305,
470        ))
471    }
472
473    // ── Core state machine ────────────────────────────────────────────────────
474
475    #[tokio::test]
476    async fn test_create_and_consume_roundtrip() {
477        let store = store_no_enc(600);
478        let (token, verifier) = store.create_state("https://app.example.com/cb").await.unwrap();
479        let result = store.consume_state(&token).await.unwrap();
480        assert_eq!(result.verifier, verifier);
481        assert_eq!(result.redirect_uri, "https://app.example.com/cb");
482    }
483
484    #[tokio::test]
485    async fn test_consume_removes_entry_cannot_reuse() {
486        let store = store_no_enc(600);
487        let (token, _) = store.create_state("https://app.example.com/cb").await.unwrap();
488        store.consume_state(&token).await.unwrap();
489        assert!(
490            matches!(store.consume_state(&token).await, Err(PkceError::StateNotFound)),
491            "second consume must return StateNotFound"
492        );
493    }
494
495    #[tokio::test(start_paused = true)]
496    async fn test_expired_state_returns_state_expired_not_not_found() {
497        let store = store_no_enc(1);
498        let (token, _) = store.create_state("https://example.com").await.unwrap();
499        tokio::time::advance(Duration::from_millis(1100)).await;
500        assert!(
501            matches!(store.consume_state(&token).await, Err(PkceError::StateExpired)),
502            "expired state must be StateExpired, not StateNotFound"
503        );
504    }
505
506    #[tokio::test]
507    async fn test_unknown_token_returns_not_found() {
508        let store = store_no_enc(600);
509        assert!(matches!(
510            store.consume_state("completely-unknown-token").await,
511            Err(PkceError::StateNotFound)
512        ));
513    }
514
515    #[tokio::test]
516    async fn test_two_distinct_states_dont_interfere() {
517        let store = store_no_enc(600);
518        let (t1, v1) = store.create_state("https://a.example.com/cb").await.unwrap();
519        let (t2, v2) = store.create_state("https://b.example.com/cb").await.unwrap();
520        let r2 = store.consume_state(&t2).await.unwrap();
521        let r1 = store.consume_state(&t1).await.unwrap();
522        assert_eq!(r1.verifier, v1);
523        assert_eq!(r2.verifier, v2);
524    }
525
526    // ── RFC 7636 compliance ───────────────────────────────────────────────────
527
528    #[test]
529    fn test_s256_challenge_matches_rfc7636_appendix_a() {
530        let verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk";
531        let expected = "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM";
532        assert_eq!(PkceStateStore::s256_challenge(verifier), expected);
533    }
534
535    #[tokio::test]
536    async fn test_verifier_length_and_charset_are_rfc7636_compliant() {
537        let store = store_no_enc(600);
538        let (_, verifier) = store.create_state("https://example.com").await.unwrap();
539        assert!(
540            verifier.len() >= 43 && verifier.len() <= 128,
541            "verifier length {} is outside the 43–128 char range",
542            verifier.len()
543        );
544        assert!(!verifier.contains('='), "verifier must not contain padding characters");
545    }
546
547    // ── Encryption integration ────────────────────────────────────────────────
548
549    #[tokio::test]
550    async fn test_encrypted_token_is_longer_than_raw_internal_key() {
551        let store = PkceStateStore::new(600, Some(enc_service()));
552        let (token, _) = store.create_state("https://app.example.com/cb").await.unwrap();
553        assert!(
554            token.len() > 43,
555            "encrypted token (len={}) must be longer than a raw 32-byte key (43 chars)",
556            token.len()
557        );
558    }
559
560    #[tokio::test]
561    async fn test_encrypted_roundtrip_works_end_to_end() {
562        let store = PkceStateStore::new(600, Some(enc_service()));
563        let (token, verifier) = store.create_state("https://app.example.com/cb").await.unwrap();
564        let result = store.consume_state(&token).await.unwrap();
565        assert_eq!(result.verifier, verifier);
566    }
567
568    #[tokio::test]
569    async fn test_tampered_encrypted_token_returns_not_found() {
570        let store = PkceStateStore::new(600, Some(enc_service()));
571        store.create_state("https://app.example.com/cb").await.unwrap();
572        let result = store.consume_state("aGVsbG8gd29ybGQ").await;
573        assert!(
574            matches!(result, Err(PkceError::StateNotFound)),
575            "tampered token must yield StateNotFound, not an internal error"
576        );
577    }
578
579    // ── Boundary: exact TTL expiry ──────────────────────────────────────────
580
581    #[tokio::test(start_paused = true)]
582    async fn test_consume_at_exact_ttl_boundary_succeeds() {
583        let store = store_no_enc(2);
584        let (token, verifier) = store.create_state("https://example.com").await.unwrap();
585        // Advance to exactly the TTL — should still succeed (`>` not `>=`)
586        tokio::time::advance(Duration::from_secs(2)).await;
587        let result = store.consume_state(&token).await.unwrap();
588        assert_eq!(result.verifier, verifier, "state at exact TTL boundary must still be valid");
589    }
590
591    // ── is_in_memory ─────────────────────────────────────────────────────────
592
593    #[test]
594    fn test_is_in_memory_returns_true_for_in_memory_store() {
595        let store = PkceStateStore::new(600, None);
596        assert!(store.is_in_memory());
597    }
598
599    // ── is_empty / len ───────────────────────────────────────────────────────
600
601    #[tokio::test]
602    async fn test_is_empty_true_for_fresh_store() {
603        let store = store_no_enc(600);
604        assert!(store.is_empty(), "fresh store must be empty");
605        assert_eq!(store.len(), 0);
606    }
607
608    #[tokio::test]
609    async fn test_is_empty_false_after_create() {
610        let store = store_no_enc(600);
611        store.create_state("https://example.com").await.unwrap();
612        assert!(!store.is_empty(), "store with one entry must not be empty");
613        assert_eq!(store.len(), 1);
614    }
615
616    #[tokio::test]
617    async fn test_is_empty_true_after_consume() {
618        let store = store_no_enc(600);
619        let (token, _) = store.create_state("https://example.com").await.unwrap();
620        store.consume_state(&token).await.unwrap();
621        assert!(store.is_empty(), "store must be empty after consuming the only entry");
622    }
623
624    // ── Cleanup ───────────────────────────────────────────────────────────────
625
626    #[tokio::test]
627    async fn test_cleanup_removes_expired_leaves_valid() {
628        let store = store_no_enc(1);
629        store.create_state("https://a.example.com").await.unwrap();
630        tokio::time::sleep(Duration::from_millis(1100)).await;
631        store.cleanup_expired().await;
632        assert_eq!(store.len(), 0, "expired entry must be removed by cleanup");
633
634        let store2 = store_no_enc(600);
635        store2.create_state("https://b.example.com").await.unwrap();
636        store2.cleanup_expired().await;
637        assert_eq!(store2.len(), 1, "unexpired entry must survive cleanup");
638    }
639
640    // ── Redis integration tests ───────────────────────────────────────────────
641    // Require a live Redis instance.  Run with:
642    //   REDIS_URL=redis://localhost:6379 cargo test -p fraiseql-auth \
643    //     --features redis-pkce -- redis_pkce --ignored
644
645    #[cfg(feature = "redis-pkce")]
646    #[tokio::test]
647    #[ignore = "requires Redis — set REDIS_URL=redis://localhost:6379"]
648    async fn test_redis_pkce_create_and_consume_roundtrip() {
649        let url =
650            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
651        let store = PkceStateStore::new_redis(&url, 300, None)
652            .await
653            .expect("Redis connection failed");
654
655        let (token, verifier) = store.create_state("https://example.com/cb").await.unwrap();
656        let consumed = store.consume_state(&token).await.unwrap();
657        assert_eq!(consumed.verifier, verifier);
658        assert_eq!(consumed.redirect_uri, "https://example.com/cb");
659    }
660
661    #[cfg(feature = "redis-pkce")]
662    #[tokio::test]
663    #[ignore = "requires Redis — set REDIS_URL=redis://localhost:6379"]
664    async fn test_redis_pkce_one_shot_consumption() {
665        let url =
666            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
667        let store = PkceStateStore::new_redis(&url, 300, None)
668            .await
669            .expect("Redis connection failed");
670
671        let (token, _) = store.create_state("https://example.com/cb").await.unwrap();
672        store.consume_state(&token).await.unwrap();
673
674        let second = store.consume_state(&token).await;
675        assert!(
676            matches!(second, Err(PkceError::StateNotFound)),
677            "second consume must return StateNotFound — GETDEL guarantees one-shot"
678        );
679    }
680
681    #[cfg(feature = "redis-pkce")]
682    #[tokio::test]
683    #[ignore = "requires Redis — set REDIS_URL=redis://localhost:6379"]
684    async fn test_redis_pkce_two_instances_share_state() {
685        let url =
686            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
687
688        // Simulate two server replicas sharing the same Redis instance
689        let store_a = PkceStateStore::new_redis(&url, 300, None)
690            .await
691            .expect("Redis connection failed");
692        let store_b = PkceStateStore::new_redis(&url, 300, None)
693            .await
694            .expect("Redis connection failed");
695
696        // Replica A handles /auth/start
697        let (token, verifier) = store_a.create_state("https://example.com/cb").await.unwrap();
698
699        // Replica B handles /auth/callback — must be able to consume the state
700        let consumed = store_b.consume_state(&token).await.unwrap();
701        assert_eq!(
702            consumed.verifier, verifier,
703            "cross-replica consumption must succeed with shared Redis"
704        );
705    }
706
707    #[cfg(feature = "redis-pkce")]
708    #[tokio::test]
709    #[ignore = "requires Redis — set REDIS_URL=redis://localhost:6379"]
710    async fn test_redis_pkce_tampered_token_rejected() {
711        let url =
712            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
713        let enc = Some(Arc::new(StateEncryptionService::from_raw_key(
714            &[0u8; 32],
715            EncryptionAlgorithm::Chacha20Poly1305,
716        )));
717        let store = PkceStateStore::new_redis(&url, 300, enc)
718            .await
719            .expect("Redis connection failed");
720
721        store.create_state("https://example.com/cb").await.unwrap();
722
723        let result = store.consume_state("completely-fabricated-token").await;
724        assert!(
725            matches!(result, Err(PkceError::StateNotFound)),
726            "tampered token must be rejected"
727        );
728    }
729}