Skip to main content

authx_storage/sqlx/
postgres.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use sqlx::{postgres::PgPoolOptions, PgPool, Row};
4use uuid::Uuid;
5
6use authx_core::{
7    error::{AuthError, Result, StorageError},
8    models::{
9        ApiKey, AuditLog, AuthorizationCode, CreateApiKey, CreateAuditLog, CreateAuthorizationCode,
10        CreateCredential, CreateDeviceCode, CreateInvite, CreateOidcClient,
11        CreateOidcFederationProvider, CreateOidcToken, CreateOrg, CreateSession, CreateUser,
12        Credential, CredentialKind, DeviceCode, Invite, Membership, OAuthAccount, OidcClient,
13        OidcFederationProvider, OidcToken, OidcTokenType, Organization, Role, Session, UpdateUser,
14        UpsertOAuthAccount, User,
15    },
16};
17
18use crate::ports::{
19    ApiKeyRepository, AuditLogRepository, AuthorizationCodeRepository, CredentialRepository,
20    DeviceCodeRepository, InviteRepository, OAuthAccountRepository, OidcClientRepository,
21    OidcFederationProviderRepository, OidcTokenRepository, OrgRepository, SessionRepository,
22    UserRepository,
23};
24
25// ── Store ─────────────────────────────────────────────────────────────────────
26
27/// Postgres-backed storage adapter.
28///
29/// Wrap a [`PgPool`] and pass this to [`AuthxState::new`].
30#[derive(Clone)]
31pub struct PostgresStore {
32    pub pool: PgPool,
33}
34
35impl PostgresStore {
36    pub async fn connect(database_url: &str) -> std::result::Result<Self, sqlx::Error> {
37        let pool = PgPoolOptions::new()
38            .max_connections(10)
39            .connect(database_url)
40            .await?;
41        tracing::info!("postgres pool connected");
42        Ok(Self { pool })
43    }
44
45    pub fn from_pool(pool: PgPool) -> Self {
46        Self { pool }
47    }
48
49    pub async fn migrate(pool: &PgPool) -> std::result::Result<(), sqlx::migrate::MigrateError> {
50        sqlx::migrate!("src/sqlx/migrations").run(pool).await?;
51        tracing::info!("database migrations applied");
52        Ok(())
53    }
54}
55
56// ── Helpers ───────────────────────────────────────────────────────────────────
57
58fn db_err(e: sqlx::Error) -> AuthError {
59    match e {
60        sqlx::Error::RowNotFound => AuthError::Storage(StorageError::NotFound),
61        sqlx::Error::Database(ref dbe) if dbe.constraint().is_some() => {
62            AuthError::Storage(StorageError::Conflict(dbe.message().to_owned()))
63        }
64        other => AuthError::Storage(StorageError::Database(other.to_string())),
65    }
66}
67
68fn credential_kind_str(k: &CredentialKind) -> &'static str {
69    match k {
70        CredentialKind::Password => "password",
71        CredentialKind::Passkey => "passkey",
72        CredentialKind::Webauthn => "webauthn",
73        CredentialKind::OauthToken => "oauth_token",
74    }
75}
76
77fn credential_kind_from_str(s: &str) -> CredentialKind {
78    match s {
79        "passkey" => CredentialKind::Passkey,
80        "webauthn" => CredentialKind::Webauthn,
81        "oauth_token" => CredentialKind::OauthToken,
82        _ => CredentialKind::Password,
83    }
84}
85
86fn map_user(r: &sqlx::postgres::PgRow) -> User {
87    User {
88        id: r.get("id"),
89        email: r.get("email"),
90        email_verified: r.get("email_verified"),
91        username: r.get("username"),
92        created_at: r.get("created_at"),
93        updated_at: r.get("updated_at"),
94        metadata: r.get::<serde_json::Value, _>("metadata"),
95    }
96}
97
98fn map_session(r: &sqlx::postgres::PgRow) -> Session {
99    Session {
100        id: r.get("id"),
101        user_id: r.get("user_id"),
102        token_hash: r.get("token_hash"),
103        device_info: r.get::<serde_json::Value, _>("device_info"),
104        ip_address: r.get("ip_address"),
105        org_id: r.get("org_id"),
106        expires_at: r.get("expires_at"),
107        created_at: r.get("created_at"),
108    }
109}
110
111fn map_audit_log(r: &sqlx::postgres::PgRow) -> AuditLog {
112    AuditLog {
113        id: r.get("id"),
114        user_id: r.get("user_id"),
115        org_id: r.get("org_id"),
116        action: r.get("action"),
117        resource_type: r.get("resource_type"),
118        resource_id: r.get("resource_id"),
119        ip_address: r.get("ip_address"),
120        metadata: r.get::<serde_json::Value, _>("metadata"),
121        created_at: r.get("created_at"),
122    }
123}
124
125// ── UserRepository ────────────────────────────────────────────────────────────
126
127#[async_trait]
128impl UserRepository for PostgresStore {
129    async fn find_by_id(&self, id: Uuid) -> Result<Option<User>> {
130        let row = sqlx::query(
131            "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
132             FROM authx_users WHERE id = $1",
133        )
134        .bind(id)
135        .fetch_optional(&self.pool)
136        .await
137        .map_err(db_err)?;
138        Ok(row.as_ref().map(map_user))
139    }
140
141    async fn find_by_email(&self, email: &str) -> Result<Option<User>> {
142        let row = sqlx::query(
143            "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
144             FROM authx_users WHERE email = $1",
145        )
146        .bind(email)
147        .fetch_optional(&self.pool)
148        .await
149        .map_err(db_err)?;
150        Ok(row.as_ref().map(map_user))
151    }
152
153    async fn find_by_username(&self, username: &str) -> Result<Option<User>> {
154        let row = sqlx::query(
155            "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
156             FROM authx_users WHERE username = $1",
157        )
158        .bind(username)
159        .fetch_optional(&self.pool)
160        .await
161        .map_err(db_err)?;
162        Ok(row.as_ref().map(map_user))
163    }
164
165    async fn list(&self, offset: u32, limit: u32) -> Result<Vec<User>> {
166        let rows = sqlx::query(
167            "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
168             FROM authx_users ORDER BY created_at ASC LIMIT $1 OFFSET $2",
169        )
170        .bind(limit as i64)
171        .bind(offset as i64)
172        .fetch_all(&self.pool)
173        .await
174        .map_err(db_err)?;
175        Ok(rows.iter().map(map_user).collect())
176    }
177
178    async fn create(&self, data: CreateUser) -> Result<User> {
179        let meta = data.metadata.unwrap_or(serde_json::Value::Null);
180        let row = sqlx::query(
181            "INSERT INTO authx_users (id, email, email_verified, username, metadata) \
182             VALUES ($1, $2, false, $3, $4) \
183             RETURNING id, email, email_verified, username, created_at, updated_at, metadata",
184        )
185        .bind(Uuid::new_v4())
186        .bind(&data.email)
187        .bind(data.username.as_deref())
188        .bind(&meta)
189        .fetch_one(&self.pool)
190        .await
191        .map_err(|e| {
192            if let sqlx::Error::Database(ref dbe) = e {
193                if dbe.constraint() == Some("authx_users_email_key") {
194                    return AuthError::EmailTaken;
195                }
196                if dbe.constraint() == Some("authx_users_username_key") {
197                    return AuthError::Storage(StorageError::Conflict(
198                        "username already taken".into(),
199                    ));
200                }
201            }
202            db_err(e)
203        })?;
204
205        tracing::debug!(email = %data.email, "user row inserted");
206        Ok(map_user(&row))
207    }
208
209    async fn update(&self, id: Uuid, data: UpdateUser) -> Result<User> {
210        let row = sqlx::query(
211            "UPDATE authx_users \
212             SET \
213               email          = COALESCE($2, email), \
214               email_verified = COALESCE($3, email_verified), \
215               username       = COALESCE($4, username), \
216               metadata       = COALESCE($5, metadata), \
217               updated_at     = NOW() \
218             WHERE id = $1 \
219             RETURNING id, email, email_verified, username, created_at, updated_at, metadata",
220        )
221        .bind(id)
222        .bind(data.email.as_deref())
223        .bind(data.email_verified)
224        .bind(data.username.as_deref())
225        .bind(data.metadata.as_ref())
226        .fetch_optional(&self.pool)
227        .await
228        .map_err(db_err)?
229        .ok_or(AuthError::UserNotFound)?;
230
231        tracing::debug!(user_id = %id, "user row updated");
232        Ok(map_user(&row))
233    }
234
235    async fn delete(&self, id: Uuid) -> Result<()> {
236        let result = sqlx::query("DELETE FROM authx_users WHERE id = $1")
237            .bind(id)
238            .execute(&self.pool)
239            .await
240            .map_err(db_err)?;
241
242        if result.rows_affected() == 0 {
243            return Err(AuthError::UserNotFound);
244        }
245        tracing::debug!(user_id = %id, "user row deleted");
246        Ok(())
247    }
248}
249
250// ── SessionRepository ─────────────────────────────────────────────────────────
251
252#[async_trait]
253impl SessionRepository for PostgresStore {
254    async fn create(&self, data: CreateSession) -> Result<Session> {
255        let row = sqlx::query(
256            "INSERT INTO authx_sessions \
257               (id, user_id, token_hash, device_info, ip_address, org_id, expires_at) \
258             VALUES ($1, $2, $3, $4, $5, $6, $7) \
259             RETURNING id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at",
260        )
261        .bind(Uuid::new_v4())
262        .bind(data.user_id)
263        .bind(&data.token_hash)
264        .bind(&data.device_info)
265        .bind(&data.ip_address)
266        .bind(data.org_id)
267        .bind(data.expires_at)
268        .fetch_one(&self.pool)
269        .await
270        .map_err(db_err)?;
271
272        tracing::debug!(user_id = %data.user_id, "session row inserted");
273        Ok(map_session(&row))
274    }
275
276    async fn find_by_token_hash(&self, hash: &str) -> Result<Option<Session>> {
277        let row = sqlx::query(
278            "SELECT id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at \
279             FROM authx_sessions WHERE token_hash = $1 AND expires_at > NOW()",
280        )
281        .bind(hash)
282        .fetch_optional(&self.pool)
283        .await
284        .map_err(db_err)?;
285        Ok(row.as_ref().map(map_session))
286    }
287
288    async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<Session>> {
289        let rows = sqlx::query(
290            "SELECT id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at \
291             FROM authx_sessions WHERE user_id = $1",
292        )
293        .bind(user_id)
294        .fetch_all(&self.pool)
295        .await
296        .map_err(db_err)?;
297        Ok(rows.iter().map(map_session).collect())
298    }
299
300    async fn invalidate(&self, session_id: Uuid) -> Result<()> {
301        let result = sqlx::query("DELETE FROM authx_sessions WHERE id = $1")
302            .bind(session_id)
303            .execute(&self.pool)
304            .await
305            .map_err(db_err)?;
306
307        if result.rows_affected() == 0 {
308            return Err(AuthError::SessionNotFound);
309        }
310        tracing::debug!(session_id = %session_id, "session invalidated");
311        Ok(())
312    }
313
314    async fn invalidate_all_for_user(&self, user_id: Uuid) -> Result<()> {
315        sqlx::query("DELETE FROM authx_sessions WHERE user_id = $1")
316            .bind(user_id)
317            .execute(&self.pool)
318            .await
319            .map_err(db_err)?;
320        tracing::debug!(user_id = %user_id, "all user sessions invalidated");
321        Ok(())
322    }
323
324    async fn set_org(&self, session_id: Uuid, org_id: Option<Uuid>) -> Result<Session> {
325        let row = sqlx::query(
326            "UPDATE authx_sessions SET org_id = $2 WHERE id = $1 \
327             RETURNING id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at",
328        )
329        .bind(session_id)
330        .bind(org_id)
331        .fetch_optional(&self.pool)
332        .await
333        .map_err(db_err)?
334        .ok_or(AuthError::SessionNotFound)?;
335
336        tracing::debug!(session_id = %session_id, "session org updated");
337        Ok(map_session(&row))
338    }
339}
340
341// ── CredentialRepository ──────────────────────────────────────────────────────
342
343#[async_trait]
344impl CredentialRepository for PostgresStore {
345    async fn create(&self, data: CreateCredential) -> Result<Credential> {
346        let kind_str = credential_kind_str(&data.kind);
347        let meta = data.metadata.unwrap_or(serde_json::Value::Null);
348
349        let row = sqlx::query(
350            "INSERT INTO authx_credentials (id, user_id, kind, credential_hash, metadata) \
351             VALUES ($1, $2, $3, $4, $5) \
352             RETURNING id, user_id, kind, credential_hash, metadata",
353        )
354        .bind(Uuid::new_v4())
355        .bind(data.user_id)
356        .bind(kind_str)
357        .bind(&data.credential_hash)
358        .bind(&meta)
359        .fetch_one(&self.pool)
360        .await
361        .map_err(db_err)?;
362
363        tracing::debug!(user_id = %data.user_id, kind = kind_str, "credential inserted");
364        Ok(Credential {
365            id: row.get("id"),
366            user_id: row.get("user_id"),
367            kind: credential_kind_from_str(row.get("kind")),
368            credential_hash: row.get("credential_hash"),
369            metadata: row.get::<serde_json::Value, _>("metadata"),
370        })
371    }
372
373    async fn find_password_hash(&self, user_id: Uuid) -> Result<Option<String>> {
374        let row = sqlx::query(
375            "SELECT credential_hash FROM authx_credentials \
376             WHERE user_id = $1 AND kind = 'password'",
377        )
378        .bind(user_id)
379        .fetch_optional(&self.pool)
380        .await
381        .map_err(db_err)?;
382        Ok(row.map(|r| r.get("credential_hash")))
383    }
384
385    async fn find_by_user_and_kind(
386        &self,
387        user_id: Uuid,
388        kind: CredentialKind,
389    ) -> Result<Option<Credential>> {
390        let row = sqlx::query(
391            "SELECT id, user_id, kind, credential_hash, metadata \
392             FROM authx_credentials WHERE user_id = $1 AND kind = $2",
393        )
394        .bind(user_id)
395        .bind(credential_kind_str(&kind))
396        .fetch_optional(&self.pool)
397        .await
398        .map_err(db_err)?;
399
400        Ok(row.map(|r| Credential {
401            id: r.get("id"),
402            user_id: r.get("user_id"),
403            kind: credential_kind_from_str(r.get("kind")),
404            credential_hash: r.get("credential_hash"),
405            metadata: r.get::<serde_json::Value, _>("metadata"),
406        }))
407    }
408
409    async fn delete_by_user_and_kind(&self, user_id: Uuid, kind: CredentialKind) -> Result<()> {
410        let result = sqlx::query("DELETE FROM authx_credentials WHERE user_id = $1 AND kind = $2")
411            .bind(user_id)
412            .bind(credential_kind_str(&kind))
413            .execute(&self.pool)
414            .await
415            .map_err(db_err)?;
416
417        if result.rows_affected() == 0 {
418            return Err(AuthError::Storage(StorageError::NotFound));
419        }
420        Ok(())
421    }
422}
423
424// ── OrgRepository ─────────────────────────────────────────────────────────────
425
426fn map_org(r: &sqlx::postgres::PgRow) -> Organization {
427    Organization {
428        id: r.get("id"),
429        name: r.get("name"),
430        slug: r.get("slug"),
431        metadata: r.get::<serde_json::Value, _>("metadata"),
432        created_at: r.get("created_at"),
433    }
434}
435
436fn map_membership(r: &sqlx::postgres::PgRow) -> Membership {
437    Membership {
438        id: r.get("id"),
439        user_id: r.get("user_id"),
440        org_id: r.get("org_id"),
441        role: Role {
442            id: r.get("role_id"),
443            org_id: r.get("role_org_id"),
444            name: r.get("role_name"),
445            permissions: r.get::<Vec<String>, _>("permissions"),
446        },
447        created_at: r.get("created_at"),
448    }
449}
450
451#[async_trait]
452impl OrgRepository for PostgresStore {
453    async fn create(&self, data: CreateOrg) -> Result<Organization> {
454        let meta = data.metadata.unwrap_or(serde_json::Value::Null);
455        let row = sqlx::query(
456            "INSERT INTO authx_orgs (id, name, slug, metadata) \
457             VALUES ($1, $2, $3, $4) \
458             RETURNING id, name, slug, metadata, created_at",
459        )
460        .bind(Uuid::new_v4())
461        .bind(&data.name)
462        .bind(&data.slug)
463        .bind(&meta)
464        .fetch_one(&self.pool)
465        .await
466        .map_err(|e| {
467            if let sqlx::Error::Database(ref dbe) = e {
468                if dbe.constraint() == Some("authx_orgs_slug_key") {
469                    return AuthError::Storage(StorageError::Conflict(format!(
470                        "slug '{}' already taken",
471                        data.slug
472                    )));
473                }
474            }
475            db_err(e)
476        })?;
477
478        tracing::debug!(slug = %data.slug, "org row inserted");
479        Ok(map_org(&row))
480    }
481
482    async fn find_by_id(&self, id: Uuid) -> Result<Option<Organization>> {
483        let row = sqlx::query(
484            "SELECT id, name, slug, metadata, created_at FROM authx_orgs WHERE id = $1",
485        )
486        .bind(id)
487        .fetch_optional(&self.pool)
488        .await
489        .map_err(db_err)?;
490        Ok(row.as_ref().map(map_org))
491    }
492
493    async fn find_by_slug(&self, slug: &str) -> Result<Option<Organization>> {
494        let row = sqlx::query(
495            "SELECT id, name, slug, metadata, created_at FROM authx_orgs WHERE slug = $1",
496        )
497        .bind(slug)
498        .fetch_optional(&self.pool)
499        .await
500        .map_err(db_err)?;
501        Ok(row.as_ref().map(map_org))
502    }
503
504    async fn add_member(&self, org_id: Uuid, user_id: Uuid, role_id: Uuid) -> Result<Membership> {
505        let role_row =
506            sqlx::query("SELECT id, org_id, name, permissions FROM authx_roles WHERE id = $1")
507                .bind(role_id)
508                .fetch_optional(&self.pool)
509                .await
510                .map_err(db_err)?
511                .ok_or(AuthError::Storage(StorageError::NotFound))?;
512
513        let role = Role {
514            id: role_row.get("id"),
515            org_id: role_row.get("org_id"),
516            name: role_row.get("name"),
517            permissions: role_row.get::<Vec<String>, _>("permissions"),
518        };
519
520        let row = sqlx::query(
521            "INSERT INTO authx_memberships (id, user_id, org_id, role_id) \
522             VALUES ($1, $2, $3, $4) \
523             RETURNING id, user_id, org_id, created_at",
524        )
525        .bind(Uuid::new_v4())
526        .bind(user_id)
527        .bind(org_id)
528        .bind(role_id)
529        .fetch_one(&self.pool)
530        .await
531        .map_err(db_err)?;
532
533        tracing::debug!(org_id = %org_id, user_id = %user_id, "member added");
534        Ok(Membership {
535            id: row.get("id"),
536            user_id: row.get("user_id"),
537            org_id: row.get("org_id"),
538            role,
539            created_at: row.get("created_at"),
540        })
541    }
542
543    async fn remove_member(&self, org_id: Uuid, user_id: Uuid) -> Result<()> {
544        let result =
545            sqlx::query("DELETE FROM authx_memberships WHERE org_id = $1 AND user_id = $2")
546                .bind(org_id)
547                .bind(user_id)
548                .execute(&self.pool)
549                .await
550                .map_err(db_err)?;
551
552        if result.rows_affected() == 0 {
553            return Err(AuthError::Storage(StorageError::NotFound));
554        }
555        Ok(())
556    }
557
558    async fn get_members(&self, org_id: Uuid) -> Result<Vec<Membership>> {
559        let rows = sqlx::query(
560            "SELECT m.id, m.user_id, m.org_id, m.created_at, \
561                    r.id AS role_id, r.org_id AS role_org_id, r.name AS role_name, r.permissions \
562             FROM authx_memberships m \
563             JOIN authx_roles r ON r.id = m.role_id \
564             WHERE m.org_id = $1",
565        )
566        .bind(org_id)
567        .fetch_all(&self.pool)
568        .await
569        .map_err(db_err)?;
570        Ok(rows.iter().map(map_membership).collect())
571    }
572
573    async fn find_roles(&self, org_id: Uuid) -> Result<Vec<Role>> {
574        let rows =
575            sqlx::query("SELECT id, org_id, name, permissions FROM authx_roles WHERE org_id = $1")
576                .bind(org_id)
577                .fetch_all(&self.pool)
578                .await
579                .map_err(db_err)?;
580        Ok(rows
581            .iter()
582            .map(|r| Role {
583                id: r.get("id"),
584                org_id: r.get("org_id"),
585                name: r.get("name"),
586                permissions: r.get::<Vec<String>, _>("permissions"),
587            })
588            .collect())
589    }
590
591    async fn create_role(
592        &self,
593        org_id: Uuid,
594        name: String,
595        permissions: Vec<String>,
596    ) -> Result<Role> {
597        let row = sqlx::query(
598            "INSERT INTO authx_roles (id, org_id, name, permissions) \
599             VALUES ($1, $2, $3, $4) \
600             RETURNING id, org_id, name, permissions",
601        )
602        .bind(Uuid::new_v4())
603        .bind(org_id)
604        .bind(&name)
605        .bind(&permissions)
606        .fetch_one(&self.pool)
607        .await
608        .map_err(db_err)?;
609
610        tracing::debug!(org_id = %org_id, name = %name, "role created");
611        Ok(Role {
612            id: row.get("id"),
613            org_id: row.get("org_id"),
614            name: row.get("name"),
615            permissions: row.get::<Vec<String>, _>("permissions"),
616        })
617    }
618
619    async fn update_member_role(
620        &self,
621        org_id: Uuid,
622        user_id: Uuid,
623        role_id: Uuid,
624    ) -> Result<Membership> {
625        sqlx::query("UPDATE authx_memberships SET role_id = $3 WHERE org_id = $1 AND user_id = $2")
626            .bind(org_id)
627            .bind(user_id)
628            .bind(role_id)
629            .execute(&self.pool)
630            .await
631            .map_err(db_err)?;
632
633        let rows = sqlx::query(
634            "SELECT m.id, m.user_id, m.org_id, m.created_at, \
635                    r.id AS role_id, r.org_id AS role_org_id, r.name AS role_name, r.permissions \
636             FROM authx_memberships m \
637             JOIN authx_roles r ON r.id = m.role_id \
638             WHERE m.org_id = $1 AND m.user_id = $2",
639        )
640        .bind(org_id)
641        .bind(user_id)
642        .fetch_optional(&self.pool)
643        .await
644        .map_err(db_err)?
645        .ok_or(AuthError::Storage(StorageError::NotFound))?;
646
647        Ok(map_membership(&rows))
648    }
649}
650
651// ── AuditLogRepository ────────────────────────────────────────────────────────
652
653#[async_trait]
654impl AuditLogRepository for PostgresStore {
655    async fn append(&self, entry: CreateAuditLog) -> Result<AuditLog> {
656        let meta = entry.metadata.unwrap_or(serde_json::Value::Null);
657        let row = sqlx::query(
658            "INSERT INTO authx_audit_logs \
659               (id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata) \
660             VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
661             RETURNING id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata, created_at",
662        )
663        .bind(Uuid::new_v4())
664        .bind(entry.user_id)
665        .bind(entry.org_id)
666        .bind(&entry.action)
667        .bind(&entry.resource_type)
668        .bind(entry.resource_id.as_deref())
669        .bind(&entry.ip_address)
670        .bind(&meta)
671        .fetch_one(&self.pool)
672        .await
673        .map_err(db_err)?;
674
675        tracing::debug!(action = %entry.action, "audit log appended");
676        Ok(map_audit_log(&row))
677    }
678
679    async fn find_by_user(&self, user_id: Uuid, limit: u32) -> Result<Vec<AuditLog>> {
680        let rows = sqlx::query(
681            "SELECT id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata, created_at \
682             FROM authx_audit_logs WHERE user_id = $1 ORDER BY created_at DESC LIMIT $2",
683        )
684        .bind(user_id)
685        .bind(limit as i64)
686        .fetch_all(&self.pool)
687        .await
688        .map_err(db_err)?;
689        Ok(rows.iter().map(map_audit_log).collect())
690    }
691
692    async fn find_by_org(&self, org_id: Uuid, limit: u32) -> Result<Vec<AuditLog>> {
693        let rows = sqlx::query(
694            "SELECT id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata, created_at \
695             FROM authx_audit_logs WHERE org_id = $1 ORDER BY created_at DESC LIMIT $2",
696        )
697        .bind(org_id)
698        .bind(limit as i64)
699        .fetch_all(&self.pool)
700        .await
701        .map_err(db_err)?;
702        Ok(rows.iter().map(map_audit_log).collect())
703    }
704}
705
706// ── ApiKeyRepository ──────────────────────────────────────────────────────────
707
708fn map_api_key(r: &sqlx::postgres::PgRow) -> ApiKey {
709    ApiKey {
710        id: r.get("id"),
711        user_id: r.get("user_id"),
712        org_id: r.get("org_id"),
713        key_hash: r.get("key_hash"),
714        prefix: r.get("prefix"),
715        name: r.get("name"),
716        scopes: r.get::<Vec<String>, _>("scopes"),
717        expires_at: r.get("expires_at"),
718        last_used_at: r.get("last_used_at"),
719    }
720}
721
722#[async_trait]
723impl ApiKeyRepository for PostgresStore {
724    async fn create(&self, data: CreateApiKey) -> Result<ApiKey> {
725        let row = sqlx::query(
726            "INSERT INTO authx_api_keys \
727               (id, user_id, org_id, key_hash, prefix, name, scopes, expires_at) \
728             VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
729             RETURNING id, user_id, org_id, key_hash, prefix, name, scopes, expires_at, last_used_at",
730        )
731        .bind(Uuid::new_v4())
732        .bind(data.user_id)
733        .bind(data.org_id)
734        .bind(&data.key_hash)
735        .bind(&data.prefix)
736        .bind(&data.name)
737        .bind(&data.scopes)
738        .bind(data.expires_at)
739        .fetch_one(&self.pool)
740        .await
741        .map_err(db_err)?;
742
743        tracing::debug!(user_id = %data.user_id, "api key created");
744        Ok(map_api_key(&row))
745    }
746
747    async fn find_by_hash(&self, key_hash: &str) -> Result<Option<ApiKey>> {
748        let row = sqlx::query(
749            "SELECT id, user_id, org_id, key_hash, prefix, name, scopes, expires_at, last_used_at \
750             FROM authx_api_keys WHERE key_hash = $1",
751        )
752        .bind(key_hash)
753        .fetch_optional(&self.pool)
754        .await
755        .map_err(db_err)?;
756        Ok(row.as_ref().map(map_api_key))
757    }
758
759    async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<ApiKey>> {
760        let rows = sqlx::query(
761            "SELECT id, user_id, org_id, key_hash, prefix, name, scopes, expires_at, last_used_at \
762             FROM authx_api_keys WHERE user_id = $1 ORDER BY id",
763        )
764        .bind(user_id)
765        .fetch_all(&self.pool)
766        .await
767        .map_err(db_err)?;
768        Ok(rows.iter().map(map_api_key).collect())
769    }
770
771    async fn revoke(&self, key_id: Uuid, user_id: Uuid) -> Result<()> {
772        let result = sqlx::query("DELETE FROM authx_api_keys WHERE id = $1 AND user_id = $2")
773            .bind(key_id)
774            .bind(user_id)
775            .execute(&self.pool)
776            .await
777            .map_err(db_err)?;
778
779        if result.rows_affected() == 0 {
780            return Err(AuthError::Storage(StorageError::NotFound));
781        }
782        tracing::debug!(key_id = %key_id, "api key revoked");
783        Ok(())
784    }
785
786    async fn touch_last_used(&self, key_id: Uuid, at: DateTime<Utc>) -> Result<()> {
787        sqlx::query("UPDATE authx_api_keys SET last_used_at = $2 WHERE id = $1")
788            .bind(key_id)
789            .bind(at)
790            .execute(&self.pool)
791            .await
792            .map_err(db_err)?;
793        Ok(())
794    }
795}
796
797// ── OAuthAccountRepository ────────────────────────────────────────────────────
798
799fn map_oauth_account(r: &sqlx::postgres::PgRow) -> OAuthAccount {
800    OAuthAccount {
801        id: r.get("id"),
802        user_id: r.get("user_id"),
803        provider: r.get("provider"),
804        provider_user_id: r.get("provider_user_id"),
805        access_token_enc: r.get("access_token_enc"),
806        refresh_token_enc: r.get("refresh_token_enc"),
807        expires_at: r.get("expires_at"),
808    }
809}
810
811#[async_trait]
812impl OAuthAccountRepository for PostgresStore {
813    async fn upsert(&self, data: UpsertOAuthAccount) -> Result<OAuthAccount> {
814        let row = sqlx::query(
815            "INSERT INTO authx_oauth_accounts \
816               (id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at) \
817             VALUES ($1, $2, $3, $4, $5, $6, $7) \
818             ON CONFLICT (provider, provider_user_id) DO UPDATE SET \
819               access_token_enc  = EXCLUDED.access_token_enc, \
820               refresh_token_enc = EXCLUDED.refresh_token_enc, \
821               expires_at        = EXCLUDED.expires_at \
822             RETURNING id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at",
823        )
824        .bind(Uuid::new_v4())
825        .bind(data.user_id)
826        .bind(&data.provider)
827        .bind(&data.provider_user_id)
828        .bind(&data.access_token_enc)
829        .bind(data.refresh_token_enc.as_deref())
830        .bind(data.expires_at)
831        .fetch_one(&self.pool)
832        .await
833        .map_err(db_err)?;
834
835        tracing::debug!(provider = %data.provider, user_id = %data.user_id, "oauth account upserted");
836        Ok(map_oauth_account(&row))
837    }
838
839    async fn find_by_provider(
840        &self,
841        provider: &str,
842        provider_user_id: &str,
843    ) -> Result<Option<OAuthAccount>> {
844        let row = sqlx::query(
845            "SELECT id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at \
846             FROM authx_oauth_accounts WHERE provider = $1 AND provider_user_id = $2",
847        )
848        .bind(provider)
849        .bind(provider_user_id)
850        .fetch_optional(&self.pool)
851        .await
852        .map_err(db_err)?;
853        Ok(row.as_ref().map(map_oauth_account))
854    }
855
856    async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<OAuthAccount>> {
857        let rows = sqlx::query(
858            "SELECT id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at \
859             FROM authx_oauth_accounts WHERE user_id = $1",
860        )
861        .bind(user_id)
862        .fetch_all(&self.pool)
863        .await
864        .map_err(db_err)?;
865        Ok(rows.iter().map(map_oauth_account).collect())
866    }
867
868    async fn delete(&self, id: Uuid) -> Result<()> {
869        let result = sqlx::query("DELETE FROM authx_oauth_accounts WHERE id = $1")
870            .bind(id)
871            .execute(&self.pool)
872            .await
873            .map_err(db_err)?;
874
875        if result.rows_affected() == 0 {
876            return Err(AuthError::Storage(StorageError::NotFound));
877        }
878        Ok(())
879    }
880}
881
882// ── InviteRepository ──────────────────────────────────────────────────────────
883
884fn map_invite(r: &sqlx::postgres::PgRow) -> Invite {
885    Invite {
886        id: r.get("id"),
887        org_id: r.get("org_id"),
888        email: r.get("email"),
889        role_id: r.get("role_id"),
890        token_hash: r.get("token_hash"),
891        expires_at: r.get("expires_at"),
892        accepted_at: r.get("accepted_at"),
893    }
894}
895
896#[async_trait]
897impl InviteRepository for PostgresStore {
898    async fn create(&self, data: CreateInvite) -> Result<Invite> {
899        let row = sqlx::query(
900            "INSERT INTO authx_invites (id, org_id, email, role_id, token_hash, expires_at) \
901             VALUES ($1, $2, $3, $4, $5, $6) \
902             RETURNING id, org_id, email, role_id, token_hash, expires_at, accepted_at",
903        )
904        .bind(Uuid::new_v4())
905        .bind(data.org_id)
906        .bind(&data.email)
907        .bind(data.role_id)
908        .bind(&data.token_hash)
909        .bind(data.expires_at)
910        .fetch_one(&self.pool)
911        .await
912        .map_err(db_err)?;
913
914        tracing::debug!(org_id = %data.org_id, email = %data.email, "invite created");
915        Ok(map_invite(&row))
916    }
917
918    async fn find_by_token_hash(&self, hash: &str) -> Result<Option<Invite>> {
919        let row = sqlx::query(
920            "SELECT id, org_id, email, role_id, token_hash, expires_at, accepted_at \
921             FROM authx_invites WHERE token_hash = $1",
922        )
923        .bind(hash)
924        .fetch_optional(&self.pool)
925        .await
926        .map_err(db_err)?;
927        Ok(row.as_ref().map(map_invite))
928    }
929
930    async fn accept(&self, invite_id: Uuid) -> Result<Invite> {
931        let row = sqlx::query(
932            "UPDATE authx_invites SET accepted_at = NOW() WHERE id = $1 \
933             RETURNING id, org_id, email, role_id, token_hash, expires_at, accepted_at",
934        )
935        .bind(invite_id)
936        .fetch_optional(&self.pool)
937        .await
938        .map_err(db_err)?
939        .ok_or(AuthError::Storage(StorageError::NotFound))?;
940
941        Ok(map_invite(&row))
942    }
943
944    async fn delete_expired(&self) -> Result<u64> {
945        let result = sqlx::query(
946            "DELETE FROM authx_invites WHERE accepted_at IS NULL AND expires_at < NOW()",
947        )
948        .execute(&self.pool)
949        .await
950        .map_err(db_err)?;
951        Ok(result.rows_affected())
952    }
953}
954
955// ── OidcClientRepository ───────────────────────────────────────────────────────
956
957fn oidc_token_type_str(t: &OidcTokenType) -> &'static str {
958    match t {
959        OidcTokenType::Access => "access",
960        OidcTokenType::Refresh => "refresh",
961        OidcTokenType::DeviceAccess => "device_access",
962    }
963}
964
965fn oidc_token_type_from_str(s: &str) -> OidcTokenType {
966    match s {
967        "refresh" => OidcTokenType::Refresh,
968        "device_access" => OidcTokenType::DeviceAccess,
969        _ => OidcTokenType::Access,
970    }
971}
972
973fn map_oidc_client(r: &sqlx::postgres::PgRow) -> OidcClient {
974    OidcClient {
975        id: r.get("id"),
976        client_id: r.get("client_id"),
977        secret_hash: r.get("secret_hash"),
978        name: r.get("name"),
979        redirect_uris: r.get::<Vec<String>, _>("redirect_uris"),
980        grant_types: r.get::<Vec<String>, _>("grant_types"),
981        response_types: r.get::<Vec<String>, _>("response_types"),
982        allowed_scopes: r.get("allowed_scopes"),
983        created_at: r.get("created_at"),
984    }
985}
986
987#[async_trait]
988impl OidcClientRepository for PostgresStore {
989    async fn create(&self, data: CreateOidcClient) -> Result<OidcClient> {
990        let client_id = Uuid::new_v4().to_string();
991        let row = sqlx::query(
992            "INSERT INTO authx_oidc_clients \
993               (id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes) \
994             VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
995             RETURNING id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes, created_at",
996        )
997        .bind(Uuid::new_v4())
998        .bind(&client_id)
999        .bind(&data.secret_hash)
1000        .bind(&data.name)
1001        .bind(&data.redirect_uris)
1002        .bind(&data.grant_types)
1003        .bind(&data.response_types)
1004        .bind(&data.allowed_scopes)
1005        .fetch_one(&self.pool)
1006        .await
1007        .map_err(db_err)?;
1008
1009        tracing::debug!(client_id = %client_id, "oidc client created");
1010        Ok(map_oidc_client(&row))
1011    }
1012
1013    async fn find_by_client_id(&self, client_id: &str) -> Result<Option<OidcClient>> {
1014        let row = sqlx::query(
1015            "SELECT id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes, created_at \
1016             FROM authx_oidc_clients WHERE client_id = $1",
1017        )
1018        .bind(client_id)
1019        .fetch_optional(&self.pool)
1020        .await
1021        .map_err(db_err)?;
1022        Ok(row.as_ref().map(map_oidc_client))
1023    }
1024
1025    async fn list(&self, offset: u32, limit: u32) -> Result<Vec<OidcClient>> {
1026        let rows = sqlx::query(
1027            "SELECT id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes, created_at \
1028             FROM authx_oidc_clients ORDER BY created_at ASC LIMIT $1 OFFSET $2",
1029        )
1030        .bind(limit as i64)
1031        .bind(offset as i64)
1032        .fetch_all(&self.pool)
1033        .await
1034        .map_err(db_err)?;
1035        Ok(rows.iter().map(map_oidc_client).collect())
1036    }
1037}
1038
1039// ── AuthorizationCodeRepository ────────────────────────────────────────────────
1040
1041fn map_authorization_code(r: &sqlx::postgres::PgRow) -> AuthorizationCode {
1042    AuthorizationCode {
1043        id: r.get("id"),
1044        code_hash: r.get("code_hash"),
1045        client_id: r.get("client_id"),
1046        user_id: r.get("user_id"),
1047        redirect_uri: r.get("redirect_uri"),
1048        scope: r.get("scope"),
1049        nonce: r.get("nonce"),
1050        code_challenge: r.get("code_challenge"),
1051        expires_at: r.get("expires_at"),
1052        used: r.get("used"),
1053    }
1054}
1055
1056#[async_trait]
1057impl AuthorizationCodeRepository for PostgresStore {
1058    async fn create(&self, data: CreateAuthorizationCode) -> Result<AuthorizationCode> {
1059        let row = sqlx::query(
1060            "INSERT INTO authx_oidc_authorization_codes \
1061               (id, code_hash, client_id, user_id, redirect_uri, scope, nonce, code_challenge, expires_at) \
1062             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
1063             RETURNING id, code_hash, client_id, user_id, redirect_uri, scope, nonce, code_challenge, expires_at, used",
1064        )
1065        .bind(Uuid::new_v4())
1066        .bind(&data.code_hash)
1067        .bind(&data.client_id)
1068        .bind(data.user_id)
1069        .bind(&data.redirect_uri)
1070        .bind(&data.scope)
1071        .bind(data.nonce.as_deref())
1072        .bind(data.code_challenge.as_deref())
1073        .bind(data.expires_at)
1074        .fetch_one(&self.pool)
1075        .await
1076        .map_err(db_err)?;
1077
1078        tracing::debug!(client_id = %data.client_id, "authorization code created");
1079        Ok(map_authorization_code(&row))
1080    }
1081
1082    async fn find_by_code_hash(&self, hash: &str) -> Result<Option<AuthorizationCode>> {
1083        let row = sqlx::query(
1084            "SELECT id, code_hash, client_id, user_id, redirect_uri, scope, nonce, code_challenge, expires_at, used \
1085             FROM authx_oidc_authorization_codes WHERE code_hash = $1 AND expires_at > NOW() AND used = false",
1086        )
1087        .bind(hash)
1088        .fetch_optional(&self.pool)
1089        .await
1090        .map_err(db_err)?;
1091        Ok(row.as_ref().map(map_authorization_code))
1092    }
1093
1094    async fn mark_used(&self, id: Uuid) -> Result<()> {
1095        let result =
1096            sqlx::query("UPDATE authx_oidc_authorization_codes SET used = true WHERE id = $1")
1097                .bind(id)
1098                .execute(&self.pool)
1099                .await
1100                .map_err(db_err)?;
1101
1102        if result.rows_affected() == 0 {
1103            return Err(AuthError::Storage(StorageError::NotFound));
1104        }
1105        Ok(())
1106    }
1107
1108    async fn delete_expired(&self) -> Result<u64> {
1109        let result =
1110            sqlx::query("DELETE FROM authx_oidc_authorization_codes WHERE expires_at < NOW()")
1111                .execute(&self.pool)
1112                .await
1113                .map_err(db_err)?;
1114        Ok(result.rows_affected())
1115    }
1116}
1117
1118// ── OidcTokenRepository ────────────────────────────────────────────────────────
1119
1120fn map_oidc_token(r: &sqlx::postgres::PgRow) -> OidcToken {
1121    OidcToken {
1122        id: r.get("id"),
1123        token_hash: r.get("token_hash"),
1124        client_id: r.get("client_id"),
1125        user_id: r.get("user_id"),
1126        scope: r.get("scope"),
1127        token_type: oidc_token_type_from_str(r.get::<&str, _>("token_type")),
1128        expires_at: r.get("expires_at"),
1129        revoked: r.get("revoked"),
1130        created_at: r.get("created_at"),
1131    }
1132}
1133
1134#[async_trait]
1135impl OidcTokenRepository for PostgresStore {
1136    async fn create(&self, data: CreateOidcToken) -> Result<OidcToken> {
1137        let row = sqlx::query(
1138            "INSERT INTO authx_oidc_tokens \
1139               (id, token_hash, client_id, user_id, scope, token_type, expires_at) \
1140             VALUES ($1, $2, $3, $4, $5, $6, $7) \
1141             RETURNING id, token_hash, client_id, user_id, scope, token_type, expires_at, revoked, created_at",
1142        )
1143        .bind(Uuid::new_v4())
1144        .bind(&data.token_hash)
1145        .bind(&data.client_id)
1146        .bind(data.user_id)
1147        .bind(&data.scope)
1148        .bind(oidc_token_type_str(&data.token_type))
1149        .bind(data.expires_at)
1150        .fetch_one(&self.pool)
1151        .await
1152        .map_err(db_err)?;
1153
1154        tracing::debug!(client_id = %data.client_id, "oidc token created");
1155        Ok(map_oidc_token(&row))
1156    }
1157
1158    async fn find_by_token_hash(&self, hash: &str) -> Result<Option<OidcToken>> {
1159        let row = sqlx::query(
1160            "SELECT id, token_hash, client_id, user_id, scope, token_type, expires_at, revoked, created_at \
1161             FROM authx_oidc_tokens WHERE token_hash = $1 AND revoked = false",
1162        )
1163        .bind(hash)
1164        .fetch_optional(&self.pool)
1165        .await
1166        .map_err(db_err)?;
1167
1168        if let Some(ref r) = row {
1169            let tok = map_oidc_token(r);
1170            if let Some(exp) = tok.expires_at {
1171                if exp < Utc::now() {
1172                    return Ok(None);
1173                }
1174            }
1175        }
1176        Ok(row.as_ref().map(map_oidc_token))
1177    }
1178
1179    async fn revoke(&self, id: Uuid) -> Result<()> {
1180        let result = sqlx::query("UPDATE authx_oidc_tokens SET revoked = true WHERE id = $1")
1181            .bind(id)
1182            .execute(&self.pool)
1183            .await
1184            .map_err(db_err)?;
1185
1186        if result.rows_affected() == 0 {
1187            return Err(AuthError::Storage(StorageError::NotFound));
1188        }
1189        Ok(())
1190    }
1191
1192    async fn revoke_all_for_user_client(&self, user_id: Uuid, client_id: &str) -> Result<()> {
1193        sqlx::query(
1194            "UPDATE authx_oidc_tokens SET revoked = true WHERE user_id = $1 AND client_id = $2",
1195        )
1196        .bind(user_id)
1197        .bind(client_id)
1198        .execute(&self.pool)
1199        .await
1200        .map_err(db_err)?;
1201        Ok(())
1202    }
1203}
1204
1205// ── OidcFederationProviderRepository ──────────────────────────────────────────
1206
1207fn map_oidc_federation_provider(r: &sqlx::postgres::PgRow) -> OidcFederationProvider {
1208    let claim_mapping_json: serde_json::Value =
1209        r.try_get("claim_mapping").unwrap_or(serde_json::json!([]));
1210    let claim_mapping = serde_json::from_value(claim_mapping_json).unwrap_or_default();
1211    OidcFederationProvider {
1212        id: r.get("id"),
1213        name: r.get("name"),
1214        issuer: r.get("issuer"),
1215        client_id: r.get("client_id"),
1216        secret_enc: r.get("secret_enc"),
1217        scopes: r.get("scopes"),
1218        org_id: r.try_get("org_id").ok(),
1219        enabled: r.get("enabled"),
1220        created_at: r.get("created_at"),
1221        claim_mapping,
1222    }
1223}
1224
1225#[async_trait]
1226impl OidcFederationProviderRepository for PostgresStore {
1227    async fn create(&self, data: CreateOidcFederationProvider) -> Result<OidcFederationProvider> {
1228        let claim_mapping_json =
1229            serde_json::to_value(&data.claim_mapping).unwrap_or(serde_json::json!([]));
1230        let row = sqlx::query(
1231            "INSERT INTO authx_oidc_federation_providers \
1232               (id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled) \
1233             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true) \
1234             RETURNING id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at",
1235        )
1236        .bind(Uuid::new_v4())
1237        .bind(&data.name)
1238        .bind(&data.issuer)
1239        .bind(&data.client_id)
1240        .bind(&data.secret_enc)
1241        .bind(&data.scopes)
1242        .bind(data.org_id)
1243        .bind(claim_mapping_json)
1244        .fetch_one(&self.pool)
1245        .await
1246        .map_err(db_err)?;
1247
1248        tracing::debug!(name = %data.name, "oidc federation provider created");
1249        Ok(map_oidc_federation_provider(&row))
1250    }
1251
1252    async fn find_by_id(&self, id: Uuid) -> Result<Option<OidcFederationProvider>> {
1253        let row = sqlx::query(
1254            "SELECT id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at \
1255             FROM authx_oidc_federation_providers WHERE id = $1",
1256        )
1257        .bind(id)
1258        .fetch_optional(&self.pool)
1259        .await
1260        .map_err(db_err)?;
1261        Ok(row.as_ref().map(map_oidc_federation_provider))
1262    }
1263
1264    async fn find_by_name(&self, name: &str) -> Result<Option<OidcFederationProvider>> {
1265        let row = sqlx::query(
1266            "SELECT id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at \
1267             FROM authx_oidc_federation_providers WHERE name = $1",
1268        )
1269        .bind(name)
1270        .fetch_optional(&self.pool)
1271        .await
1272        .map_err(db_err)?;
1273        Ok(row.as_ref().map(map_oidc_federation_provider))
1274    }
1275
1276    async fn list_enabled(&self) -> Result<Vec<OidcFederationProvider>> {
1277        let rows = sqlx::query(
1278            "SELECT id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at \
1279             FROM authx_oidc_federation_providers WHERE enabled = true ORDER BY name",
1280        )
1281        .fetch_all(&self.pool)
1282        .await
1283        .map_err(db_err)?;
1284        Ok(rows.iter().map(map_oidc_federation_provider).collect())
1285    }
1286}
1287
1288// ── DeviceCodeRepository ─────────────────────────────────────────────────────
1289
1290const DEVICE_CODE_COLS: &str = "id, device_code_hash, user_code_hash, user_code, client_id, \
1291                                scope, expires_at, interval_secs, authorized, denied, user_id, \
1292                                last_polled_at";
1293
1294fn map_device_code(r: &sqlx::postgres::PgRow) -> DeviceCode {
1295    DeviceCode {
1296        id: r.get("id"),
1297        device_code_hash: r.get("device_code_hash"),
1298        user_code_hash: r.get("user_code_hash"),
1299        user_code: r.get("user_code"),
1300        client_id: r.get("client_id"),
1301        scope: r.get("scope"),
1302        expires_at: r.get("expires_at"),
1303        interval_secs: r.get::<i32, _>("interval_secs") as u32,
1304        authorized: r.get("authorized"),
1305        denied: r.get("denied"),
1306        user_id: r.get("user_id"),
1307        last_polled_at: r.get("last_polled_at"),
1308    }
1309}
1310
1311#[async_trait]
1312impl DeviceCodeRepository for PostgresStore {
1313    async fn create(&self, data: CreateDeviceCode) -> Result<DeviceCode> {
1314        let row = sqlx::query(&format!(
1315            "INSERT INTO authx_device_codes \
1316               (id, device_code_hash, user_code_hash, user_code, client_id, scope, expires_at, interval_secs) \
1317             VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
1318             RETURNING {DEVICE_CODE_COLS}"
1319        ))
1320        .bind(Uuid::new_v4())
1321        .bind(&data.device_code_hash)
1322        .bind(&data.user_code_hash)
1323        .bind(&data.user_code)
1324        .bind(&data.client_id)
1325        .bind(&data.scope)
1326        .bind(data.expires_at)
1327        .bind(data.interval_secs as i32)
1328        .fetch_one(&self.pool)
1329        .await
1330        .map_err(db_err)?;
1331
1332        tracing::debug!(client_id = %data.client_id, "device code created");
1333        Ok(map_device_code(&row))
1334    }
1335
1336    async fn find_by_device_code_hash(&self, hash: &str) -> Result<Option<DeviceCode>> {
1337        let row = sqlx::query(&format!(
1338            "SELECT {DEVICE_CODE_COLS} FROM authx_device_codes \
1339             WHERE device_code_hash = $1 AND expires_at > NOW()"
1340        ))
1341        .bind(hash)
1342        .fetch_optional(&self.pool)
1343        .await
1344        .map_err(db_err)?;
1345        Ok(row.as_ref().map(map_device_code))
1346    }
1347
1348    async fn find_by_user_code_hash(&self, hash: &str) -> Result<Option<DeviceCode>> {
1349        let row = sqlx::query(&format!(
1350            "SELECT {DEVICE_CODE_COLS} FROM authx_device_codes \
1351             WHERE user_code_hash = $1 AND expires_at > NOW() \
1352             AND authorized = false AND denied = false"
1353        ))
1354        .bind(hash)
1355        .fetch_optional(&self.pool)
1356        .await
1357        .map_err(db_err)?;
1358        Ok(row.as_ref().map(map_device_code))
1359    }
1360
1361    async fn authorize(&self, id: Uuid, user_id: Uuid) -> Result<()> {
1362        let result = sqlx::query(
1363            "UPDATE authx_device_codes SET authorized = true, user_id = $2 WHERE id = $1",
1364        )
1365        .bind(id)
1366        .bind(user_id)
1367        .execute(&self.pool)
1368        .await
1369        .map_err(db_err)?;
1370
1371        if result.rows_affected() == 0 {
1372            return Err(AuthError::Storage(StorageError::NotFound));
1373        }
1374        tracing::debug!(id = %id, "device code authorized");
1375        Ok(())
1376    }
1377
1378    async fn deny(&self, id: Uuid) -> Result<()> {
1379        let result = sqlx::query("UPDATE authx_device_codes SET denied = true WHERE id = $1")
1380            .bind(id)
1381            .execute(&self.pool)
1382            .await
1383            .map_err(db_err)?;
1384
1385        if result.rows_affected() == 0 {
1386            return Err(AuthError::Storage(StorageError::NotFound));
1387        }
1388        tracing::debug!(id = %id, "device code denied");
1389        Ok(())
1390    }
1391
1392    async fn update_last_polled(&self, id: Uuid, interval_secs: u32) -> Result<()> {
1393        sqlx::query(
1394            "UPDATE authx_device_codes SET last_polled_at = NOW(), interval_secs = $2 WHERE id = $1",
1395        )
1396        .bind(id)
1397        .bind(interval_secs as i32)
1398        .execute(&self.pool)
1399        .await
1400        .map_err(db_err)?;
1401        Ok(())
1402    }
1403
1404    async fn delete(&self, id: Uuid) -> Result<()> {
1405        sqlx::query("DELETE FROM authx_device_codes WHERE id = $1")
1406            .bind(id)
1407            .execute(&self.pool)
1408            .await
1409            .map_err(db_err)?;
1410        Ok(())
1411    }
1412
1413    async fn delete_expired(&self) -> Result<u64> {
1414        let result = sqlx::query("DELETE FROM authx_device_codes WHERE expires_at < NOW()")
1415            .execute(&self.pool)
1416            .await
1417            .map_err(db_err)?;
1418        Ok(result.rows_affected())
1419    }
1420
1421    async fn list_by_client(
1422        &self,
1423        client_id: &str,
1424        offset: u32,
1425        limit: u32,
1426    ) -> Result<Vec<DeviceCode>> {
1427        let rows = sqlx::query(&format!(
1428            "SELECT {DEVICE_CODE_COLS} FROM authx_device_codes \
1429             WHERE client_id = $1 ORDER BY expires_at DESC LIMIT $2 OFFSET $3"
1430        ))
1431        .bind(client_id)
1432        .bind(limit as i64)
1433        .bind(offset as i64)
1434        .fetch_all(&self.pool)
1435        .await
1436        .map_err(db_err)?;
1437        Ok(rows.iter().map(map_device_code).collect())
1438    }
1439}