Skip to main content

assay_auth/oidc_provider/
store.rs

1//! Storage traits + PG/SQLite implementations for the OIDC provider.
2//!
3//! Two trait families:
4//!
5//! - [`OidcClientStore`] — CRUD over `auth.oidc_clients`.
6//! - [`OidcUpstreamStore`] — CRUD over `auth.upstream_providers`.
7//!
8//! Plus the concrete row stores:
9//!
10//! - [`OidcCodeStore`] — issue / consume `auth.oidc_authorization_codes`.
11//! - [`OidcRefreshStore`] — write / verify / revoke
12//!   `auth.oidc_refresh_tokens`.
13//! - [`OidcSessionStore`] — `auth.oidc_sessions` lookups for the SSO
14//!   registry + back-channel logout fan-out.
15//! - [`OidcConsentStore`] — per-(user, client) consent grants.
16//! - [`OidcUpstreamStateStore`] — short-lived federation login state.
17//!
18//! All trait methods return `anyhow::Result<…>` so a backend can surface
19//! its native error verbatim. Handlers translate to [`crate::Error`] at
20//! the boundary.
21
22use std::sync::Arc;
23
24use anyhow::{Context, Result};
25use async_trait::async_trait;
26
27use super::types::{
28    AuthorizationCode, ConsentGrant, OidcClient, OidcSession, RefreshToken, TokenAuthMethod,
29    UpstreamLoginState, UpstreamProvider,
30};
31
/// Persistence contract for registered OIDC clients (`auth.oidc_clients`).
///
/// Every method reports backend failures as `anyhow::Error`.
#[async_trait]
pub trait OidcClientStore: Send + Sync + 'static {
    /// Inserts a new client row.
    async fn create(&self, client: &OidcClient) -> Result<()>;
    /// Looks a client up by `client_id`; `Ok(None)` when no row matches.
    async fn get(&self, client_id: &str) -> Result<Option<OidcClient>>;
    /// Returns every stored client. The implementations in this file
    /// order the result by `created_at`.
    async fn list(&self) -> Result<Vec<OidcClient>>;
    /// Overwrites the stored fields of the client identified by
    /// `client.client_id`.
    ///
    /// NOTE(review): the `()` return cannot signal "no such client"; the
    /// Postgres implementation in this file returns `Ok(())` even when
    /// the UPDATE matched zero rows, and it leaves `created_at` untouched.
    async fn update(&self, client: &OidcClient) -> Result<()>;
    /// Deletes the client. The Postgres implementation in this file
    /// returns `Ok(true)` when at least one row was removed.
    async fn delete(&self, client_id: &str) -> Result<bool>;
    /// Replace the client_secret_hash. Returns Ok(false) if no row matched.
    async fn rotate_secret_hash(&self, client_id: &str, new_hash: &str) -> Result<bool>;
}
42
/// Persistence contract for upstream (federated) identity providers
/// (`auth.upstream_providers`), addressed by `slug`.
#[async_trait]
pub trait OidcUpstreamStore: Send + Sync + 'static {
    /// Creates the provider or replaces the existing row with the same
    /// slug (the Postgres implementation in this file uses
    /// `ON CONFLICT (slug) DO UPDATE`).
    async fn upsert(&self, provider: &UpstreamProvider) -> Result<()>;
    /// Looks a provider up by `slug`; `Ok(None)` when no row matches.
    async fn get(&self, slug: &str) -> Result<Option<UpstreamProvider>>;
    /// Returns every stored provider. The Postgres implementation in
    /// this file orders the result by `slug`.
    async fn list(&self) -> Result<Vec<UpstreamProvider>>;
    /// Deletes the provider. The Postgres implementation in this file
    /// returns `Ok(true)` when at least one row was removed.
    async fn delete(&self, slug: &str) -> Result<bool>;
}
50
/// Persistence contract for single-use authorization codes
/// (`auth.oidc_authorization_codes`).
#[async_trait]
pub trait OidcCodeStore: Send + Sync + 'static {
    /// Stores a freshly issued authorization code.
    async fn create(&self, code: &AuthorizationCode) -> Result<()>;
    /// Atomic consume — UPDATE … WHERE consumed = FALSE. If 0 rows are
    /// affected the code was either missing or already consumed; the
    /// caller treats both as `invalid_grant`. Returns the matching row
    /// when the consume succeeded.
    ///
    /// NOTE(review): the Postgres implementation in this file reads the
    /// row back with `RETURNING *`, which reports post-update values, so
    /// the returned `consumed` flag is `true` rather than a pre-consume
    /// snapshot — confirm no caller relies on it being `false`.
    async fn consume(&self, code: &str) -> Result<Option<AuthorizationCode>>;
}
60
/// Persistence contract for refresh tokens (`auth.oidc_refresh_tokens`),
/// addressed by `token_hash`.
#[async_trait]
pub trait OidcRefreshStore: Send + Sync + 'static {
    /// Stores a new refresh-token row.
    async fn create(&self, token: &RefreshToken) -> Result<()>;
    /// Looks a token up by its hash; `Ok(None)` when no row matches.
    async fn get(&self, token_hash: &str) -> Result<Option<RefreshToken>>;
    /// Marks the token as revoked. The Postgres implementation in this
    /// file returns `Ok(true)` whenever a row with that hash exists,
    /// including one that was already revoked (its UPDATE has no
    /// `revoked` predicate).
    async fn revoke(&self, token_hash: &str) -> Result<bool>;
    /// Revoke every refresh token belonging to `user_id` — the replay-
    /// detection nuke per OAuth 2.1. Returns the number of rows the
    /// backend reports as affected.
    async fn revoke_for_user(&self, user_id: &str) -> Result<u64>;
}
70
/// Persistence contract for SSO session rows (`auth.oidc_sessions`),
/// addressed by `sid` or by the owning assay session id.
#[async_trait]
pub trait OidcSessionStore: Send + Sync + 'static {
    /// Stores a new SSO session row.
    async fn create(&self, session: &OidcSession) -> Result<()>;
    /// Looks a session up by `sid`; `Ok(None)` when no row matches.
    async fn get(&self, sid: &str) -> Result<Option<OidcSession>>;
    /// Every SSO session row tied to a single assay session — used by
    /// `/logout` to fan out back-channel logout.
    async fn list_by_assay_session(&self, assay_session_id: &str) -> Result<Vec<OidcSession>>;
    /// Deletes one session by `sid`. The Postgres implementation in this
    /// file returns `Ok(true)` when at least one row was removed.
    async fn delete(&self, sid: &str) -> Result<bool>;
    /// Deletes every session row tied to `assay_session_id` and returns
    /// the number of rows the backend reports as affected.
    async fn delete_by_assay_session(&self, assay_session_id: &str) -> Result<u64>;
}
81
/// Persistence contract for per-(user, client) consent grants.
#[async_trait]
pub trait OidcConsentStore: Send + Sync + 'static {
    /// Records the grant, replacing any existing grant for the same
    /// `(user_id, client_id)` pair (the Postgres implementation in this
    /// file overwrites `scopes` and `granted_at` on conflict).
    async fn upsert(&self, grant: &ConsentGrant) -> Result<()>;
    /// Looks up the grant for the pair; `Ok(None)` when none is stored.
    async fn get(&self, user_id: &str, client_id: &str) -> Result<Option<ConsentGrant>>;
    /// Deletes the grant for the pair. The Postgres implementation in
    /// this file returns `Ok(true)` when at least one row was removed.
    async fn delete(&self, user_id: &str, client_id: &str) -> Result<bool>;
}
88
/// Persistence contract for short-lived federation login state, keyed
/// by the `state` value.
#[async_trait]
pub trait OidcUpstreamStateStore: Send + Sync + 'static {
    /// Stores the login state for a federation round-trip.
    async fn create(&self, state: &UpstreamLoginState) -> Result<()>;
    /// Atomically delete and return — single use. `Ok(None)` when no row
    /// matches `state`.
    ///
    /// NOTE(review): the Postgres implementation in this file does not
    /// filter on `expires_at`; presumably the caller checks expiry.
    async fn take(&self, state: &str) -> Result<Option<UpstreamLoginState>>;
}
95
96// =====================================================================
97//   POSTGRES
98// =====================================================================
99
100#[cfg(feature = "backend-postgres")]
101mod pg {
102    use super::*;
103    use sqlx::{PgPool, Row};
104
105    fn parse_json_array(s: &str) -> Vec<String> {
106        serde_json::from_str(s).unwrap_or_default()
107    }
108
109    fn encode_json_array(v: &[String]) -> String {
110        serde_json::to_string(v).unwrap_or_else(|_| "[]".to_string())
111    }
112
113    fn map_client_row(row: sqlx::postgres::PgRow) -> OidcClient {
114        let auth_method: String = row.get("token_endpoint_auth_method");
115        OidcClient {
116            client_id: row.get("client_id"),
117            client_secret_hash: row.get("client_secret_hash"),
118            redirect_uris: parse_json_array(&row.get::<String, _>("redirect_uris")),
119            name: row.get("name"),
120            logo_url: row.get("logo_url"),
121            token_endpoint_auth_method: TokenAuthMethod::parse(&auth_method)
122                .unwrap_or(TokenAuthMethod::ClientSecretBasic),
123            default_scopes: parse_json_array(&row.get::<String, _>("default_scopes")),
124            require_consent: row.get("require_consent"),
125            grant_types: parse_json_array(&row.get::<String, _>("grant_types")),
126            response_types: parse_json_array(&row.get::<String, _>("response_types")),
127            pkce_required: row.get("pkce_required"),
128            backchannel_logout_uri: row.get("backchannel_logout_uri"),
129            created_at: row.get("created_at"),
130        }
131    }
132
133    /// Postgres-backed [`OidcClientStore`].
134    #[derive(Clone)]
135    pub struct PostgresOidcClientStore {
136        pool: PgPool,
137    }
138
139    impl PostgresOidcClientStore {
140        pub fn new(pool: PgPool) -> Self {
141            Self { pool }
142        }
143        pub fn into_dyn(self) -> Arc<dyn OidcClientStore> {
144            Arc::new(self)
145        }
146    }
147
148    #[async_trait]
149    impl OidcClientStore for PostgresOidcClientStore {
150        async fn create(&self, c: &OidcClient) -> Result<()> {
151            sqlx::query(
152                "INSERT INTO auth.oidc_clients
153                    (client_id, client_secret_hash, redirect_uris, name, logo_url,
154                     token_endpoint_auth_method, default_scopes, require_consent,
155                     grant_types, response_types, pkce_required,
156                     backchannel_logout_uri, created_at)
157                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
158            )
159            .bind(&c.client_id)
160            .bind(&c.client_secret_hash)
161            .bind(encode_json_array(&c.redirect_uris))
162            .bind(&c.name)
163            .bind(&c.logo_url)
164            .bind(c.token_endpoint_auth_method.as_str())
165            .bind(encode_json_array(&c.default_scopes))
166            .bind(c.require_consent)
167            .bind(encode_json_array(&c.grant_types))
168            .bind(encode_json_array(&c.response_types))
169            .bind(c.pkce_required)
170            .bind(&c.backchannel_logout_uri)
171            .bind(c.created_at)
172            .execute(&self.pool)
173            .await
174            .context("auth.oidc_clients insert")?;
175            Ok(())
176        }
177
178        async fn get(&self, client_id: &str) -> Result<Option<OidcClient>> {
179            let row = sqlx::query(
180                "SELECT * FROM auth.oidc_clients WHERE client_id = $1",
181            )
182            .bind(client_id)
183            .fetch_optional(&self.pool)
184            .await
185            .context("auth.oidc_clients select")?;
186            Ok(row.map(map_client_row))
187        }
188
189        async fn list(&self) -> Result<Vec<OidcClient>> {
190            let rows = sqlx::query("SELECT * FROM auth.oidc_clients ORDER BY created_at")
191                .fetch_all(&self.pool)
192                .await
193                .context("auth.oidc_clients list")?;
194            Ok(rows.into_iter().map(map_client_row).collect())
195        }
196
197        async fn update(&self, c: &OidcClient) -> Result<()> {
198            sqlx::query(
199                "UPDATE auth.oidc_clients SET
200                    client_secret_hash = $2,
201                    redirect_uris = $3,
202                    name = $4,
203                    logo_url = $5,
204                    token_endpoint_auth_method = $6,
205                    default_scopes = $7,
206                    require_consent = $8,
207                    grant_types = $9,
208                    response_types = $10,
209                    pkce_required = $11,
210                    backchannel_logout_uri = $12
211                 WHERE client_id = $1",
212            )
213            .bind(&c.client_id)
214            .bind(&c.client_secret_hash)
215            .bind(encode_json_array(&c.redirect_uris))
216            .bind(&c.name)
217            .bind(&c.logo_url)
218            .bind(c.token_endpoint_auth_method.as_str())
219            .bind(encode_json_array(&c.default_scopes))
220            .bind(c.require_consent)
221            .bind(encode_json_array(&c.grant_types))
222            .bind(encode_json_array(&c.response_types))
223            .bind(c.pkce_required)
224            .bind(&c.backchannel_logout_uri)
225            .execute(&self.pool)
226            .await
227            .context("auth.oidc_clients update")?;
228            Ok(())
229        }
230
231        async fn delete(&self, client_id: &str) -> Result<bool> {
232            let r = sqlx::query("DELETE FROM auth.oidc_clients WHERE client_id = $1")
233                .bind(client_id)
234                .execute(&self.pool)
235                .await
236                .context("auth.oidc_clients delete")?;
237            Ok(r.rows_affected() > 0)
238        }
239
240        async fn rotate_secret_hash(&self, client_id: &str, new_hash: &str) -> Result<bool> {
241            let r = sqlx::query(
242                "UPDATE auth.oidc_clients SET client_secret_hash = $2 WHERE client_id = $1",
243            )
244            .bind(client_id)
245            .bind(new_hash)
246            .execute(&self.pool)
247            .await
248            .context("auth.oidc_clients rotate_secret_hash")?;
249            Ok(r.rows_affected() > 0)
250        }
251    }
252
253    fn map_upstream_row(row: sqlx::postgres::PgRow) -> UpstreamProvider {
254        UpstreamProvider {
255            slug: row.get("slug"),
256            issuer: row.get("issuer"),
257            client_id: row.get("client_id"),
258            client_secret: row.get("client_secret"),
259            display_name: row.get("display_name"),
260            icon_url: row.get("icon_url"),
261            enabled: row.get("enabled"),
262        }
263    }
264
265    #[derive(Clone)]
266    pub struct PostgresOidcUpstreamStore {
267        pool: PgPool,
268    }
269
270    impl PostgresOidcUpstreamStore {
271        pub fn new(pool: PgPool) -> Self {
272            Self { pool }
273        }
274        pub fn into_dyn(self) -> Arc<dyn OidcUpstreamStore> {
275            Arc::new(self)
276        }
277    }
278
279    #[async_trait]
280    impl OidcUpstreamStore for PostgresOidcUpstreamStore {
281        async fn upsert(&self, p: &UpstreamProvider) -> Result<()> {
282            sqlx::query(
283                "INSERT INTO auth.upstream_providers
284                    (slug, issuer, client_id, client_secret, display_name, icon_url, enabled)
285                 VALUES ($1, $2, $3, $4, $5, $6, $7)
286                 ON CONFLICT (slug) DO UPDATE SET
287                    issuer = EXCLUDED.issuer,
288                    client_id = EXCLUDED.client_id,
289                    client_secret = EXCLUDED.client_secret,
290                    display_name = EXCLUDED.display_name,
291                    icon_url = EXCLUDED.icon_url,
292                    enabled = EXCLUDED.enabled",
293            )
294            .bind(&p.slug)
295            .bind(&p.issuer)
296            .bind(&p.client_id)
297            .bind(&p.client_secret)
298            .bind(&p.display_name)
299            .bind(&p.icon_url)
300            .bind(p.enabled)
301            .execute(&self.pool)
302            .await
303            .context("auth.upstream_providers upsert")?;
304            Ok(())
305        }
306
307        async fn get(&self, slug: &str) -> Result<Option<UpstreamProvider>> {
308            let row = sqlx::query("SELECT * FROM auth.upstream_providers WHERE slug = $1")
309                .bind(slug)
310                .fetch_optional(&self.pool)
311                .await
312                .context("auth.upstream_providers select")?;
313            Ok(row.map(map_upstream_row))
314        }
315
316        async fn list(&self) -> Result<Vec<UpstreamProvider>> {
317            let rows = sqlx::query("SELECT * FROM auth.upstream_providers ORDER BY slug")
318                .fetch_all(&self.pool)
319                .await
320                .context("auth.upstream_providers list")?;
321            Ok(rows.into_iter().map(map_upstream_row).collect())
322        }
323
324        async fn delete(&self, slug: &str) -> Result<bool> {
325            let r = sqlx::query("DELETE FROM auth.upstream_providers WHERE slug = $1")
326                .bind(slug)
327                .execute(&self.pool)
328                .await
329                .context("auth.upstream_providers delete")?;
330            Ok(r.rows_affected() > 0)
331        }
332    }
333
334    fn map_code_row(row: sqlx::postgres::PgRow) -> AuthorizationCode {
335        AuthorizationCode {
336            code: row.get("code"),
337            client_id: row.get("client_id"),
338            user_id: row.get("user_id"),
339            redirect_uri: row.get("redirect_uri"),
340            scopes: parse_json_array(&row.get::<String, _>("scopes")),
341            code_challenge: row.get("code_challenge"),
342            code_challenge_method: row.get("code_challenge_method"),
343            nonce: row.get("nonce"),
344            state: row.get("state"),
345            issued_at: row.get("issued_at"),
346            expires_at: row.get("expires_at"),
347            consumed: row.get("consumed"),
348        }
349    }
350
351    #[derive(Clone)]
352    pub struct PostgresOidcCodeStore {
353        pool: PgPool,
354    }
355    impl PostgresOidcCodeStore {
356        pub fn new(pool: PgPool) -> Self {
357            Self { pool }
358        }
359        pub fn into_dyn(self) -> Arc<dyn OidcCodeStore> {
360            Arc::new(self)
361        }
362    }
363
364    #[async_trait]
365    impl OidcCodeStore for PostgresOidcCodeStore {
366        async fn create(&self, c: &AuthorizationCode) -> Result<()> {
367            sqlx::query(
368                "INSERT INTO auth.oidc_authorization_codes
369                    (code, client_id, user_id, redirect_uri, scopes,
370                     code_challenge, code_challenge_method, nonce, state,
371                     issued_at, expires_at, consumed)
372                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
373            )
374            .bind(&c.code)
375            .bind(&c.client_id)
376            .bind(&c.user_id)
377            .bind(&c.redirect_uri)
378            .bind(encode_json_array(&c.scopes))
379            .bind(&c.code_challenge)
380            .bind(&c.code_challenge_method)
381            .bind(&c.nonce)
382            .bind(&c.state)
383            .bind(c.issued_at)
384            .bind(c.expires_at)
385            .bind(c.consumed)
386            .execute(&self.pool)
387            .await
388            .context("auth.oidc_authorization_codes insert")?;
389            Ok(())
390        }
391
392        async fn consume(&self, code: &str) -> Result<Option<AuthorizationCode>> {
393            // RETURNING semantics — fetch the row at the same time we
394            // mark it consumed. The `consumed = FALSE` predicate is the
395            // single-use guarantee.
396            let row = sqlx::query(
397                "UPDATE auth.oidc_authorization_codes
398                    SET consumed = TRUE
399                    WHERE code = $1 AND consumed = FALSE
400                    RETURNING *",
401            )
402            .bind(code)
403            .fetch_optional(&self.pool)
404            .await
405            .context("auth.oidc_authorization_codes consume")?;
406            Ok(row.map(map_code_row))
407        }
408    }
409
410    fn map_refresh_row(row: sqlx::postgres::PgRow) -> RefreshToken {
411        RefreshToken {
412            token_hash: row.get("token_hash"),
413            client_id: row.get("client_id"),
414            user_id: row.get("user_id"),
415            scopes: parse_json_array(&row.get::<String, _>("scopes")),
416            issued_at: row.get("issued_at"),
417            expires_at: row.get("expires_at"),
418            revoked: row.get("revoked"),
419        }
420    }
421
422    #[derive(Clone)]
423    pub struct PostgresOidcRefreshStore {
424        pool: PgPool,
425    }
426    impl PostgresOidcRefreshStore {
427        pub fn new(pool: PgPool) -> Self {
428            Self { pool }
429        }
430        pub fn into_dyn(self) -> Arc<dyn OidcRefreshStore> {
431            Arc::new(self)
432        }
433    }
434
435    #[async_trait]
436    impl OidcRefreshStore for PostgresOidcRefreshStore {
437        async fn create(&self, t: &RefreshToken) -> Result<()> {
438            sqlx::query(
439                "INSERT INTO auth.oidc_refresh_tokens
440                    (token_hash, client_id, user_id, scopes,
441                     issued_at, expires_at, revoked)
442                 VALUES ($1, $2, $3, $4, $5, $6, $7)",
443            )
444            .bind(&t.token_hash)
445            .bind(&t.client_id)
446            .bind(&t.user_id)
447            .bind(encode_json_array(&t.scopes))
448            .bind(t.issued_at)
449            .bind(t.expires_at)
450            .bind(t.revoked)
451            .execute(&self.pool)
452            .await
453            .context("auth.oidc_refresh_tokens insert")?;
454            Ok(())
455        }
456
457        async fn get(&self, token_hash: &str) -> Result<Option<RefreshToken>> {
458            let row = sqlx::query(
459                "SELECT * FROM auth.oidc_refresh_tokens WHERE token_hash = $1",
460            )
461            .bind(token_hash)
462            .fetch_optional(&self.pool)
463            .await
464            .context("auth.oidc_refresh_tokens select")?;
465            Ok(row.map(map_refresh_row))
466        }
467
468        async fn revoke(&self, token_hash: &str) -> Result<bool> {
469            let r = sqlx::query(
470                "UPDATE auth.oidc_refresh_tokens SET revoked = TRUE WHERE token_hash = $1",
471            )
472            .bind(token_hash)
473            .execute(&self.pool)
474            .await
475            .context("auth.oidc_refresh_tokens revoke")?;
476            Ok(r.rows_affected() > 0)
477        }
478
479        async fn revoke_for_user(&self, user_id: &str) -> Result<u64> {
480            let r = sqlx::query(
481                "UPDATE auth.oidc_refresh_tokens SET revoked = TRUE WHERE user_id = $1",
482            )
483            .bind(user_id)
484            .execute(&self.pool)
485            .await
486            .context("auth.oidc_refresh_tokens revoke_for_user")?;
487            Ok(r.rows_affected())
488        }
489    }
490
491    fn map_session_row(row: sqlx::postgres::PgRow) -> OidcSession {
492        OidcSession {
493            sid: row.get("sid"),
494            user_id: row.get("user_id"),
495            client_id: row.get("client_id"),
496            assay_session_id: row.get("assay_session_id"),
497            issued_at: row.get("issued_at"),
498            backchannel_logout_uri: row.get("backchannel_logout_uri"),
499        }
500    }
501
502    #[derive(Clone)]
503    pub struct PostgresOidcSessionStore {
504        pool: PgPool,
505    }
506    impl PostgresOidcSessionStore {
507        pub fn new(pool: PgPool) -> Self {
508            Self { pool }
509        }
510        pub fn into_dyn(self) -> Arc<dyn OidcSessionStore> {
511            Arc::new(self)
512        }
513    }
514
515    #[async_trait]
516    impl OidcSessionStore for PostgresOidcSessionStore {
517        async fn create(&self, s: &OidcSession) -> Result<()> {
518            sqlx::query(
519                "INSERT INTO auth.oidc_sessions
520                    (sid, user_id, client_id, assay_session_id, issued_at, backchannel_logout_uri)
521                 VALUES ($1, $2, $3, $4, $5, $6)",
522            )
523            .bind(&s.sid)
524            .bind(&s.user_id)
525            .bind(&s.client_id)
526            .bind(&s.assay_session_id)
527            .bind(s.issued_at)
528            .bind(&s.backchannel_logout_uri)
529            .execute(&self.pool)
530            .await
531            .context("auth.oidc_sessions insert")?;
532            Ok(())
533        }
534
535        async fn get(&self, sid: &str) -> Result<Option<OidcSession>> {
536            let row = sqlx::query("SELECT * FROM auth.oidc_sessions WHERE sid = $1")
537                .bind(sid)
538                .fetch_optional(&self.pool)
539                .await
540                .context("auth.oidc_sessions get")?;
541            Ok(row.map(map_session_row))
542        }
543
544        async fn list_by_assay_session(&self, assay_session_id: &str) -> Result<Vec<OidcSession>> {
545            let rows = sqlx::query(
546                "SELECT * FROM auth.oidc_sessions WHERE assay_session_id = $1",
547            )
548            .bind(assay_session_id)
549            .fetch_all(&self.pool)
550            .await
551            .context("auth.oidc_sessions list_by_assay_session")?;
552            Ok(rows.into_iter().map(map_session_row).collect())
553        }
554
555        async fn delete(&self, sid: &str) -> Result<bool> {
556            let r = sqlx::query("DELETE FROM auth.oidc_sessions WHERE sid = $1")
557                .bind(sid)
558                .execute(&self.pool)
559                .await
560                .context("auth.oidc_sessions delete")?;
561            Ok(r.rows_affected() > 0)
562        }
563
564        async fn delete_by_assay_session(&self, assay_session_id: &str) -> Result<u64> {
565            let r = sqlx::query(
566                "DELETE FROM auth.oidc_sessions WHERE assay_session_id = $1",
567            )
568            .bind(assay_session_id)
569            .execute(&self.pool)
570            .await
571            .context("auth.oidc_sessions delete_by_assay_session")?;
572            Ok(r.rows_affected())
573        }
574    }
575
576    fn map_consent_row(row: sqlx::postgres::PgRow) -> ConsentGrant {
577        ConsentGrant {
578            user_id: row.get("user_id"),
579            client_id: row.get("client_id"),
580            scopes: parse_json_array(&row.get::<String, _>("scopes")),
581            granted_at: row.get("granted_at"),
582        }
583    }
584
585    #[derive(Clone)]
586    pub struct PostgresOidcConsentStore {
587        pool: PgPool,
588    }
589    impl PostgresOidcConsentStore {
590        pub fn new(pool: PgPool) -> Self {
591            Self { pool }
592        }
593        pub fn into_dyn(self) -> Arc<dyn OidcConsentStore> {
594            Arc::new(self)
595        }
596    }
597
598    #[async_trait]
599    impl OidcConsentStore for PostgresOidcConsentStore {
600        async fn upsert(&self, g: &ConsentGrant) -> Result<()> {
601            sqlx::query(
602                "INSERT INTO auth.oidc_consents (user_id, client_id, scopes, granted_at)
603                 VALUES ($1, $2, $3, $4)
604                 ON CONFLICT (user_id, client_id) DO UPDATE
605                     SET scopes = EXCLUDED.scopes,
606                         granted_at = EXCLUDED.granted_at",
607            )
608            .bind(&g.user_id)
609            .bind(&g.client_id)
610            .bind(encode_json_array(&g.scopes))
611            .bind(g.granted_at)
612            .execute(&self.pool)
613            .await
614            .context("auth.oidc_consents upsert")?;
615            Ok(())
616        }
617
618        async fn get(&self, user_id: &str, client_id: &str) -> Result<Option<ConsentGrant>> {
619            let row = sqlx::query(
620                "SELECT * FROM auth.oidc_consents WHERE user_id = $1 AND client_id = $2",
621            )
622            .bind(user_id)
623            .bind(client_id)
624            .fetch_optional(&self.pool)
625            .await
626            .context("auth.oidc_consents get")?;
627            Ok(row.map(map_consent_row))
628        }
629
630        async fn delete(&self, user_id: &str, client_id: &str) -> Result<bool> {
631            let r = sqlx::query(
632                "DELETE FROM auth.oidc_consents WHERE user_id = $1 AND client_id = $2",
633            )
634            .bind(user_id)
635            .bind(client_id)
636            .execute(&self.pool)
637            .await
638            .context("auth.oidc_consents delete")?;
639            Ok(r.rows_affected() > 0)
640        }
641    }
642
643    fn map_upstream_state_row(row: sqlx::postgres::PgRow) -> UpstreamLoginState {
644        UpstreamLoginState {
645            state: row.get("state"),
646            provider_slug: row.get("provider_slug"),
647            nonce: row.get("nonce"),
648            pkce_verifier: row.get("pkce_verifier"),
649            return_to: row.get("return_to"),
650            created_at: row.get("created_at"),
651            expires_at: row.get("expires_at"),
652        }
653    }
654
655    #[derive(Clone)]
656    pub struct PostgresOidcUpstreamStateStore {
657        pool: PgPool,
658    }
659    impl PostgresOidcUpstreamStateStore {
660        pub fn new(pool: PgPool) -> Self {
661            Self { pool }
662        }
663        pub fn into_dyn(self) -> Arc<dyn OidcUpstreamStateStore> {
664            Arc::new(self)
665        }
666    }
667
668    #[async_trait]
669    impl OidcUpstreamStateStore for PostgresOidcUpstreamStateStore {
670        async fn create(&self, s: &UpstreamLoginState) -> Result<()> {
671            sqlx::query(
672                "INSERT INTO auth.oidc_upstream_states
673                    (state, provider_slug, nonce, pkce_verifier, return_to,
674                     created_at, expires_at)
675                 VALUES ($1, $2, $3, $4, $5, $6, $7)",
676            )
677            .bind(&s.state)
678            .bind(&s.provider_slug)
679            .bind(&s.nonce)
680            .bind(&s.pkce_verifier)
681            .bind(&s.return_to)
682            .bind(s.created_at)
683            .bind(s.expires_at)
684            .execute(&self.pool)
685            .await
686            .context("auth.oidc_upstream_states insert")?;
687            Ok(())
688        }
689
690        async fn take(&self, state: &str) -> Result<Option<UpstreamLoginState>> {
691            // Atomic delete-and-return — single-use semantic.
692            let row = sqlx::query(
693                "DELETE FROM auth.oidc_upstream_states WHERE state = $1 RETURNING *",
694            )
695            .bind(state)
696            .fetch_optional(&self.pool)
697            .await
698            .context("auth.oidc_upstream_states take")?;
699            Ok(row.map(map_upstream_state_row))
700        }
701    }
702}
703
704#[cfg(feature = "backend-postgres")]
705pub use pg::{
706    PostgresOidcClientStore, PostgresOidcCodeStore, PostgresOidcConsentStore,
707    PostgresOidcRefreshStore, PostgresOidcSessionStore, PostgresOidcUpstreamStateStore,
708    PostgresOidcUpstreamStore,
709};
710
711// =====================================================================
712//   SQLITE
713// =====================================================================
714
715#[cfg(feature = "backend-sqlite")]
716mod sqlite_impl {
717    use super::*;
718    use sqlx::{Row, SqlitePool};
719
720    fn parse_json_array(s: &str) -> Vec<String> {
721        serde_json::from_str(s).unwrap_or_default()
722    }
723    fn encode_json_array(v: &[String]) -> String {
724        serde_json::to_string(v).unwrap_or_else(|_| "[]".to_string())
725    }
726    fn b(v: bool) -> i64 {
727        if v { 1 } else { 0 }
728    }
729    fn ub(v: i64) -> bool {
730        v != 0
731    }
732
733    fn map_client_row(row: sqlx::sqlite::SqliteRow) -> OidcClient {
734        let auth_method: String = row.get("token_endpoint_auth_method");
735        OidcClient {
736            client_id: row.get("client_id"),
737            client_secret_hash: row.get("client_secret_hash"),
738            redirect_uris: parse_json_array(&row.get::<String, _>("redirect_uris")),
739            name: row.get("name"),
740            logo_url: row.get("logo_url"),
741            token_endpoint_auth_method: TokenAuthMethod::parse(&auth_method)
742                .unwrap_or(TokenAuthMethod::ClientSecretBasic),
743            default_scopes: parse_json_array(&row.get::<String, _>("default_scopes")),
744            require_consent: ub(row.get("require_consent")),
745            grant_types: parse_json_array(&row.get::<String, _>("grant_types")),
746            response_types: parse_json_array(&row.get::<String, _>("response_types")),
747            pkce_required: ub(row.get("pkce_required")),
748            backchannel_logout_uri: row.get("backchannel_logout_uri"),
749            created_at: row.get("created_at"),
750        }
751    }
752
753    #[derive(Clone)]
754    pub struct SqliteOidcClientStore {
755        pool: SqlitePool,
756    }
757    impl SqliteOidcClientStore {
758        pub fn new(pool: SqlitePool) -> Self {
759            Self { pool }
760        }
761        pub fn into_dyn(self) -> Arc<dyn OidcClientStore> {
762            Arc::new(self)
763        }
764    }
765
766    #[async_trait]
767    impl OidcClientStore for SqliteOidcClientStore {
768        async fn create(&self, c: &OidcClient) -> Result<()> {
769            sqlx::query(
770                "INSERT INTO auth.oidc_clients
771                    (client_id, client_secret_hash, redirect_uris, name, logo_url,
772                     token_endpoint_auth_method, default_scopes, require_consent,
773                     grant_types, response_types, pkce_required,
774                     backchannel_logout_uri, created_at)
775                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
776            )
777            .bind(&c.client_id)
778            .bind(&c.client_secret_hash)
779            .bind(encode_json_array(&c.redirect_uris))
780            .bind(&c.name)
781            .bind(&c.logo_url)
782            .bind(c.token_endpoint_auth_method.as_str())
783            .bind(encode_json_array(&c.default_scopes))
784            .bind(b(c.require_consent))
785            .bind(encode_json_array(&c.grant_types))
786            .bind(encode_json_array(&c.response_types))
787            .bind(b(c.pkce_required))
788            .bind(&c.backchannel_logout_uri)
789            .bind(c.created_at)
790            .execute(&self.pool)
791            .await
792            .context("auth.oidc_clients insert")?;
793            Ok(())
794        }
795
796        async fn get(&self, client_id: &str) -> Result<Option<OidcClient>> {
797            let row = sqlx::query("SELECT * FROM auth.oidc_clients WHERE client_id = ?")
798                .bind(client_id)
799                .fetch_optional(&self.pool)
800                .await
801                .context("auth.oidc_clients get")?;
802            Ok(row.map(map_client_row))
803        }
804
805        async fn list(&self) -> Result<Vec<OidcClient>> {
806            let rows = sqlx::query("SELECT * FROM auth.oidc_clients ORDER BY created_at")
807                .fetch_all(&self.pool)
808                .await
809                .context("auth.oidc_clients list")?;
810            Ok(rows.into_iter().map(map_client_row).collect())
811        }
812
813        async fn update(&self, c: &OidcClient) -> Result<()> {
814            sqlx::query(
815                "UPDATE auth.oidc_clients SET
816                    client_secret_hash = ?,
817                    redirect_uris = ?,
818                    name = ?,
819                    logo_url = ?,
820                    token_endpoint_auth_method = ?,
821                    default_scopes = ?,
822                    require_consent = ?,
823                    grant_types = ?,
824                    response_types = ?,
825                    pkce_required = ?,
826                    backchannel_logout_uri = ?
827                 WHERE client_id = ?",
828            )
829            .bind(&c.client_secret_hash)
830            .bind(encode_json_array(&c.redirect_uris))
831            .bind(&c.name)
832            .bind(&c.logo_url)
833            .bind(c.token_endpoint_auth_method.as_str())
834            .bind(encode_json_array(&c.default_scopes))
835            .bind(b(c.require_consent))
836            .bind(encode_json_array(&c.grant_types))
837            .bind(encode_json_array(&c.response_types))
838            .bind(b(c.pkce_required))
839            .bind(&c.backchannel_logout_uri)
840            .bind(&c.client_id)
841            .execute(&self.pool)
842            .await
843            .context("auth.oidc_clients update")?;
844            Ok(())
845        }
846
847        async fn delete(&self, client_id: &str) -> Result<bool> {
848            let r = sqlx::query("DELETE FROM auth.oidc_clients WHERE client_id = ?")
849                .bind(client_id)
850                .execute(&self.pool)
851                .await
852                .context("auth.oidc_clients delete")?;
853            Ok(r.rows_affected() > 0)
854        }
855
856        async fn rotate_secret_hash(&self, client_id: &str, new_hash: &str) -> Result<bool> {
857            let r = sqlx::query(
858                "UPDATE auth.oidc_clients SET client_secret_hash = ? WHERE client_id = ?",
859            )
860            .bind(new_hash)
861            .bind(client_id)
862            .execute(&self.pool)
863            .await
864            .context("auth.oidc_clients rotate_secret_hash")?;
865            Ok(r.rows_affected() > 0)
866        }
867    }
868
869    fn map_upstream_row(row: sqlx::sqlite::SqliteRow) -> UpstreamProvider {
870        UpstreamProvider {
871            slug: row.get("slug"),
872            issuer: row.get("issuer"),
873            client_id: row.get("client_id"),
874            client_secret: row.get("client_secret"),
875            display_name: row.get("display_name"),
876            icon_url: row.get("icon_url"),
877            enabled: ub(row.get("enabled")),
878        }
879    }
880
881    #[derive(Clone)]
882    pub struct SqliteOidcUpstreamStore {
883        pool: SqlitePool,
884    }
885    impl SqliteOidcUpstreamStore {
886        pub fn new(pool: SqlitePool) -> Self {
887            Self { pool }
888        }
889        pub fn into_dyn(self) -> Arc<dyn OidcUpstreamStore> {
890            Arc::new(self)
891        }
892    }
893
894    #[async_trait]
895    impl OidcUpstreamStore for SqliteOidcUpstreamStore {
896        async fn upsert(&self, p: &UpstreamProvider) -> Result<()> {
897            sqlx::query(
898                "INSERT INTO auth.upstream_providers
899                    (slug, issuer, client_id, client_secret, display_name, icon_url, enabled)
900                 VALUES (?, ?, ?, ?, ?, ?, ?)
901                 ON CONFLICT (slug) DO UPDATE SET
902                    issuer = excluded.issuer,
903                    client_id = excluded.client_id,
904                    client_secret = excluded.client_secret,
905                    display_name = excluded.display_name,
906                    icon_url = excluded.icon_url,
907                    enabled = excluded.enabled",
908            )
909            .bind(&p.slug)
910            .bind(&p.issuer)
911            .bind(&p.client_id)
912            .bind(&p.client_secret)
913            .bind(&p.display_name)
914            .bind(&p.icon_url)
915            .bind(b(p.enabled))
916            .execute(&self.pool)
917            .await
918            .context("auth.upstream_providers upsert")?;
919            Ok(())
920        }
921
922        async fn get(&self, slug: &str) -> Result<Option<UpstreamProvider>> {
923            let row = sqlx::query("SELECT * FROM auth.upstream_providers WHERE slug = ?")
924                .bind(slug)
925                .fetch_optional(&self.pool)
926                .await
927                .context("auth.upstream_providers get")?;
928            Ok(row.map(map_upstream_row))
929        }
930
931        async fn list(&self) -> Result<Vec<UpstreamProvider>> {
932            let rows = sqlx::query("SELECT * FROM auth.upstream_providers ORDER BY slug")
933                .fetch_all(&self.pool)
934                .await
935                .context("auth.upstream_providers list")?;
936            Ok(rows.into_iter().map(map_upstream_row).collect())
937        }
938
939        async fn delete(&self, slug: &str) -> Result<bool> {
940            let r = sqlx::query("DELETE FROM auth.upstream_providers WHERE slug = ?")
941                .bind(slug)
942                .execute(&self.pool)
943                .await
944                .context("auth.upstream_providers delete")?;
945            Ok(r.rows_affected() > 0)
946        }
947    }
948
949    fn map_code_row(row: sqlx::sqlite::SqliteRow) -> AuthorizationCode {
950        AuthorizationCode {
951            code: row.get("code"),
952            client_id: row.get("client_id"),
953            user_id: row.get("user_id"),
954            redirect_uri: row.get("redirect_uri"),
955            scopes: parse_json_array(&row.get::<String, _>("scopes")),
956            code_challenge: row.get("code_challenge"),
957            code_challenge_method: row.get("code_challenge_method"),
958            nonce: row.get("nonce"),
959            state: row.get("state"),
960            issued_at: row.get("issued_at"),
961            expires_at: row.get("expires_at"),
962            consumed: ub(row.get("consumed")),
963        }
964    }
965
966    #[derive(Clone)]
967    pub struct SqliteOidcCodeStore {
968        pool: SqlitePool,
969    }
970    impl SqliteOidcCodeStore {
971        pub fn new(pool: SqlitePool) -> Self {
972            Self { pool }
973        }
974        pub fn into_dyn(self) -> Arc<dyn OidcCodeStore> {
975            Arc::new(self)
976        }
977    }
978
979    #[async_trait]
980    impl OidcCodeStore for SqliteOidcCodeStore {
981        async fn create(&self, c: &AuthorizationCode) -> Result<()> {
982            sqlx::query(
983                "INSERT INTO auth.oidc_authorization_codes
984                    (code, client_id, user_id, redirect_uri, scopes,
985                     code_challenge, code_challenge_method, nonce, state,
986                     issued_at, expires_at, consumed)
987                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
988            )
989            .bind(&c.code)
990            .bind(&c.client_id)
991            .bind(&c.user_id)
992            .bind(&c.redirect_uri)
993            .bind(encode_json_array(&c.scopes))
994            .bind(&c.code_challenge)
995            .bind(&c.code_challenge_method)
996            .bind(&c.nonce)
997            .bind(&c.state)
998            .bind(c.issued_at)
999            .bind(c.expires_at)
1000            .bind(b(c.consumed))
1001            .execute(&self.pool)
1002            .await
1003            .context("auth.oidc_authorization_codes insert")?;
1004            Ok(())
1005        }
1006
1007        async fn consume(&self, code: &str) -> Result<Option<AuthorizationCode>> {
1008            // SQLite has no RETURNING-after-UPDATE round-trip helper for
1009            // every backend version; do the load + conditional-update in
1010            // a transaction to preserve "consume returns row" semantics.
1011            let mut tx = self.pool.begin().await.context("begin consume tx")?;
1012            let row = sqlx::query(
1013                "SELECT * FROM auth.oidc_authorization_codes
1014                 WHERE code = ? AND consumed = 0",
1015            )
1016            .bind(code)
1017            .fetch_optional(&mut *tx)
1018            .await
1019            .context("auth.oidc_authorization_codes consume select")?;
1020            let Some(row) = row else {
1021                tx.rollback().await.ok();
1022                return Ok(None);
1023            };
1024            let result = sqlx::query(
1025                "UPDATE auth.oidc_authorization_codes SET consumed = 1 \
1026                 WHERE code = ? AND consumed = 0",
1027            )
1028            .bind(code)
1029            .execute(&mut *tx)
1030            .await
1031            .context("auth.oidc_authorization_codes consume update")?;
1032            if result.rows_affected() == 0 {
1033                tx.rollback().await.ok();
1034                return Ok(None);
1035            }
1036            tx.commit().await.context("commit consume tx")?;
1037            Ok(Some(map_code_row(row)))
1038        }
1039    }
1040
1041    fn map_refresh_row(row: sqlx::sqlite::SqliteRow) -> RefreshToken {
1042        RefreshToken {
1043            token_hash: row.get("token_hash"),
1044            client_id: row.get("client_id"),
1045            user_id: row.get("user_id"),
1046            scopes: parse_json_array(&row.get::<String, _>("scopes")),
1047            issued_at: row.get("issued_at"),
1048            expires_at: row.get("expires_at"),
1049            revoked: ub(row.get("revoked")),
1050        }
1051    }
1052
1053    #[derive(Clone)]
1054    pub struct SqliteOidcRefreshStore {
1055        pool: SqlitePool,
1056    }
1057    impl SqliteOidcRefreshStore {
1058        pub fn new(pool: SqlitePool) -> Self {
1059            Self { pool }
1060        }
1061        pub fn into_dyn(self) -> Arc<dyn OidcRefreshStore> {
1062            Arc::new(self)
1063        }
1064    }
1065
1066    #[async_trait]
1067    impl OidcRefreshStore for SqliteOidcRefreshStore {
1068        async fn create(&self, t: &RefreshToken) -> Result<()> {
1069            sqlx::query(
1070                "INSERT INTO auth.oidc_refresh_tokens
1071                    (token_hash, client_id, user_id, scopes,
1072                     issued_at, expires_at, revoked)
1073                 VALUES (?, ?, ?, ?, ?, ?, ?)",
1074            )
1075            .bind(&t.token_hash)
1076            .bind(&t.client_id)
1077            .bind(&t.user_id)
1078            .bind(encode_json_array(&t.scopes))
1079            .bind(t.issued_at)
1080            .bind(t.expires_at)
1081            .bind(b(t.revoked))
1082            .execute(&self.pool)
1083            .await
1084            .context("auth.oidc_refresh_tokens insert")?;
1085            Ok(())
1086        }
1087
1088        async fn get(&self, token_hash: &str) -> Result<Option<RefreshToken>> {
1089            let row = sqlx::query(
1090                "SELECT * FROM auth.oidc_refresh_tokens WHERE token_hash = ?",
1091            )
1092            .bind(token_hash)
1093            .fetch_optional(&self.pool)
1094            .await
1095            .context("auth.oidc_refresh_tokens get")?;
1096            Ok(row.map(map_refresh_row))
1097        }
1098
1099        async fn revoke(&self, token_hash: &str) -> Result<bool> {
1100            let r = sqlx::query(
1101                "UPDATE auth.oidc_refresh_tokens SET revoked = 1 WHERE token_hash = ?",
1102            )
1103            .bind(token_hash)
1104            .execute(&self.pool)
1105            .await
1106            .context("auth.oidc_refresh_tokens revoke")?;
1107            Ok(r.rows_affected() > 0)
1108        }
1109
1110        async fn revoke_for_user(&self, user_id: &str) -> Result<u64> {
1111            let r = sqlx::query(
1112                "UPDATE auth.oidc_refresh_tokens SET revoked = 1 WHERE user_id = ?",
1113            )
1114            .bind(user_id)
1115            .execute(&self.pool)
1116            .await
1117            .context("auth.oidc_refresh_tokens revoke_for_user")?;
1118            Ok(r.rows_affected())
1119        }
1120    }
1121
1122    fn map_session_row(row: sqlx::sqlite::SqliteRow) -> OidcSession {
1123        OidcSession {
1124            sid: row.get("sid"),
1125            user_id: row.get("user_id"),
1126            client_id: row.get("client_id"),
1127            assay_session_id: row.get("assay_session_id"),
1128            issued_at: row.get("issued_at"),
1129            backchannel_logout_uri: row.get("backchannel_logout_uri"),
1130        }
1131    }
1132
1133    #[derive(Clone)]
1134    pub struct SqliteOidcSessionStore {
1135        pool: SqlitePool,
1136    }
1137    impl SqliteOidcSessionStore {
1138        pub fn new(pool: SqlitePool) -> Self {
1139            Self { pool }
1140        }
1141        pub fn into_dyn(self) -> Arc<dyn OidcSessionStore> {
1142            Arc::new(self)
1143        }
1144    }
1145
1146    #[async_trait]
1147    impl OidcSessionStore for SqliteOidcSessionStore {
1148        async fn create(&self, s: &OidcSession) -> Result<()> {
1149            sqlx::query(
1150                "INSERT INTO auth.oidc_sessions
1151                    (sid, user_id, client_id, assay_session_id, issued_at, backchannel_logout_uri)
1152                 VALUES (?, ?, ?, ?, ?, ?)",
1153            )
1154            .bind(&s.sid)
1155            .bind(&s.user_id)
1156            .bind(&s.client_id)
1157            .bind(&s.assay_session_id)
1158            .bind(s.issued_at)
1159            .bind(&s.backchannel_logout_uri)
1160            .execute(&self.pool)
1161            .await
1162            .context("auth.oidc_sessions insert")?;
1163            Ok(())
1164        }
1165
1166        async fn get(&self, sid: &str) -> Result<Option<OidcSession>> {
1167            let row = sqlx::query("SELECT * FROM auth.oidc_sessions WHERE sid = ?")
1168                .bind(sid)
1169                .fetch_optional(&self.pool)
1170                .await
1171                .context("auth.oidc_sessions get")?;
1172            Ok(row.map(map_session_row))
1173        }
1174
1175        async fn list_by_assay_session(&self, assay_session_id: &str) -> Result<Vec<OidcSession>> {
1176            let rows = sqlx::query(
1177                "SELECT * FROM auth.oidc_sessions WHERE assay_session_id = ?",
1178            )
1179            .bind(assay_session_id)
1180            .fetch_all(&self.pool)
1181            .await
1182            .context("auth.oidc_sessions list_by_assay_session")?;
1183            Ok(rows.into_iter().map(map_session_row).collect())
1184        }
1185
1186        async fn delete(&self, sid: &str) -> Result<bool> {
1187            let r = sqlx::query("DELETE FROM auth.oidc_sessions WHERE sid = ?")
1188                .bind(sid)
1189                .execute(&self.pool)
1190                .await
1191                .context("auth.oidc_sessions delete")?;
1192            Ok(r.rows_affected() > 0)
1193        }
1194
1195        async fn delete_by_assay_session(&self, assay_session_id: &str) -> Result<u64> {
1196            let r = sqlx::query(
1197                "DELETE FROM auth.oidc_sessions WHERE assay_session_id = ?",
1198            )
1199            .bind(assay_session_id)
1200            .execute(&self.pool)
1201            .await
1202            .context("auth.oidc_sessions delete_by_assay_session")?;
1203            Ok(r.rows_affected())
1204        }
1205    }
1206
1207    fn map_consent_row(row: sqlx::sqlite::SqliteRow) -> ConsentGrant {
1208        ConsentGrant {
1209            user_id: row.get("user_id"),
1210            client_id: row.get("client_id"),
1211            scopes: parse_json_array(&row.get::<String, _>("scopes")),
1212            granted_at: row.get("granted_at"),
1213        }
1214    }
1215
1216    #[derive(Clone)]
1217    pub struct SqliteOidcConsentStore {
1218        pool: SqlitePool,
1219    }
1220    impl SqliteOidcConsentStore {
1221        pub fn new(pool: SqlitePool) -> Self {
1222            Self { pool }
1223        }
1224        pub fn into_dyn(self) -> Arc<dyn OidcConsentStore> {
1225            Arc::new(self)
1226        }
1227    }
1228
1229    #[async_trait]
1230    impl OidcConsentStore for SqliteOidcConsentStore {
1231        async fn upsert(&self, g: &ConsentGrant) -> Result<()> {
1232            sqlx::query(
1233                "INSERT INTO auth.oidc_consents (user_id, client_id, scopes, granted_at)
1234                 VALUES (?, ?, ?, ?)
1235                 ON CONFLICT (user_id, client_id) DO UPDATE
1236                     SET scopes = excluded.scopes,
1237                         granted_at = excluded.granted_at",
1238            )
1239            .bind(&g.user_id)
1240            .bind(&g.client_id)
1241            .bind(encode_json_array(&g.scopes))
1242            .bind(g.granted_at)
1243            .execute(&self.pool)
1244            .await
1245            .context("auth.oidc_consents upsert")?;
1246            Ok(())
1247        }
1248
1249        async fn get(&self, user_id: &str, client_id: &str) -> Result<Option<ConsentGrant>> {
1250            let row = sqlx::query(
1251                "SELECT * FROM auth.oidc_consents WHERE user_id = ? AND client_id = ?",
1252            )
1253            .bind(user_id)
1254            .bind(client_id)
1255            .fetch_optional(&self.pool)
1256            .await
1257            .context("auth.oidc_consents get")?;
1258            Ok(row.map(map_consent_row))
1259        }
1260
1261        async fn delete(&self, user_id: &str, client_id: &str) -> Result<bool> {
1262            let r = sqlx::query(
1263                "DELETE FROM auth.oidc_consents WHERE user_id = ? AND client_id = ?",
1264            )
1265            .bind(user_id)
1266            .bind(client_id)
1267            .execute(&self.pool)
1268            .await
1269            .context("auth.oidc_consents delete")?;
1270            Ok(r.rows_affected() > 0)
1271        }
1272    }
1273
1274    fn map_upstream_state_row(row: sqlx::sqlite::SqliteRow) -> UpstreamLoginState {
1275        UpstreamLoginState {
1276            state: row.get("state"),
1277            provider_slug: row.get("provider_slug"),
1278            nonce: row.get("nonce"),
1279            pkce_verifier: row.get("pkce_verifier"),
1280            return_to: row.get("return_to"),
1281            created_at: row.get("created_at"),
1282            expires_at: row.get("expires_at"),
1283        }
1284    }
1285
1286    #[derive(Clone)]
1287    pub struct SqliteOidcUpstreamStateStore {
1288        pool: SqlitePool,
1289    }
1290    impl SqliteOidcUpstreamStateStore {
1291        pub fn new(pool: SqlitePool) -> Self {
1292            Self { pool }
1293        }
1294        pub fn into_dyn(self) -> Arc<dyn OidcUpstreamStateStore> {
1295            Arc::new(self)
1296        }
1297    }
1298
1299    #[async_trait]
1300    impl OidcUpstreamStateStore for SqliteOidcUpstreamStateStore {
1301        async fn create(&self, s: &UpstreamLoginState) -> Result<()> {
1302            sqlx::query(
1303                "INSERT INTO auth.oidc_upstream_states
1304                    (state, provider_slug, nonce, pkce_verifier, return_to,
1305                     created_at, expires_at)
1306                 VALUES (?, ?, ?, ?, ?, ?, ?)",
1307            )
1308            .bind(&s.state)
1309            .bind(&s.provider_slug)
1310            .bind(&s.nonce)
1311            .bind(&s.pkce_verifier)
1312            .bind(&s.return_to)
1313            .bind(s.created_at)
1314            .bind(s.expires_at)
1315            .execute(&self.pool)
1316            .await
1317            .context("auth.oidc_upstream_states insert")?;
1318            Ok(())
1319        }
1320
1321        async fn take(&self, state: &str) -> Result<Option<UpstreamLoginState>> {
1322            // SQLite likewise lacks RETURNING in some sqlx mappings; do a
1323            // load + delete in a tx for parity with the PG behaviour.
1324            let mut tx = self.pool.begin().await.context("begin take tx")?;
1325            let row = sqlx::query(
1326                "SELECT * FROM auth.oidc_upstream_states WHERE state = ?",
1327            )
1328            .bind(state)
1329            .fetch_optional(&mut *tx)
1330            .await
1331            .context("auth.oidc_upstream_states take select")?;
1332            let Some(row) = row else {
1333                tx.rollback().await.ok();
1334                return Ok(None);
1335            };
1336            sqlx::query("DELETE FROM auth.oidc_upstream_states WHERE state = ?")
1337                .bind(state)
1338                .execute(&mut *tx)
1339                .await
1340                .context("auth.oidc_upstream_states take delete")?;
1341            tx.commit().await.context("commit take tx")?;
1342            Ok(Some(map_upstream_state_row(row)))
1343        }
1344    }
1345}
1346
1347#[cfg(feature = "backend-sqlite")]
1348pub use sqlite_impl::{
1349    SqliteOidcClientStore, SqliteOidcCodeStore, SqliteOidcConsentStore, SqliteOidcRefreshStore,
1350    SqliteOidcSessionStore, SqliteOidcUpstreamStateStore, SqliteOidcUpstreamStore,
1351};