Skip to main content

better_auth_core/adapters/
database.rs

1pub use super::traits::{
2    AccountOps, ApiKeyOps, InvitationOps, MemberOps, OrganizationOps, PasskeyOps, SessionOps,
3    TwoFactorOps, UserOps, VerificationOps,
4};
5
6/// Database adapter trait for persistence.
7///
8/// Combines all entity-specific operation traits. Any type that implements
9/// all sub-traits (`UserOps`, `SessionOps`, etc.) automatically implements
10/// `DatabaseAdapter` via the blanket impl.
11///
12/// Use the sub-traits directly when you only need a subset of operations
13/// (e.g., a plugin that only accesses users and sessions).
14pub trait DatabaseAdapter:
15    UserOps
16    + SessionOps
17    + AccountOps
18    + VerificationOps
19    + OrganizationOps
20    + MemberOps
21    + InvitationOps
22    + TwoFactorOps
23    + ApiKeyOps
24    + PasskeyOps
25{
26}
27
28impl<T> DatabaseAdapter for T where
29    T: UserOps
30        + SessionOps
31        + AccountOps
32        + VerificationOps
33        + OrganizationOps
34        + MemberOps
35        + InvitationOps
36        + TwoFactorOps
37        + ApiKeyOps
38        + PasskeyOps
39{
40}
41
42#[cfg(feature = "sqlx-postgres")]
43pub mod sqlx_adapter {
44    use super::*;
45    use async_trait::async_trait;
46    use chrono::{DateTime, Utc};
47
48    use crate::entity::{
49        AuthAccount, AuthAccountMeta, AuthApiKey, AuthApiKeyMeta, AuthInvitation,
50        AuthInvitationMeta, AuthMember, AuthMemberMeta, AuthOrganization, AuthOrganizationMeta,
51        AuthPasskey, AuthPasskeyMeta, AuthSession, AuthSessionMeta, AuthTwoFactor,
52        AuthTwoFactorMeta, AuthUser, AuthUserMeta, AuthVerification, AuthVerificationMeta,
53    };
54    use crate::error::{AuthError, AuthResult};
55    use crate::types::{
56        Account, ApiKey, CreateAccount, CreateApiKey, CreateInvitation, CreateMember,
57        CreateOrganization, CreatePasskey, CreateSession, CreateTwoFactor, CreateUser,
58        CreateVerification, Invitation, InvitationStatus, ListUsersParams, Member, Organization,
59        Passkey, Session, TwoFactor, UpdateAccount, UpdateApiKey, UpdateOrganization, UpdateUser,
60        User, Verification,
61    };
62    use sqlx::PgPool;
63    use sqlx::postgres::PgRow;
64    use std::marker::PhantomData;
65    use uuid::Uuid;
66
67    /// Quote a SQL identifier with double quotes for PostgreSQL.
68    ///
69    /// This prevents issues with reserved words (e.g. `user`, `key`, `order`)
70    /// and ensures correct identifier handling regardless of the names returned
71    /// by `Auth*Meta` traits.
72    #[inline]
73    fn qi(ident: &str) -> String {
74        format!("\"{}\"", ident.replace('"', "\"\""))
75    }
76
77    /// Blanket trait combining all bounds needed for SQLx-based entity types.
78    ///
79    /// Any type that implements `sqlx::FromRow` plus the standard marker traits
80    /// automatically satisfies this bound. Custom entity types just need
81    /// `#[derive(sqlx::FromRow)]` (or a manual `FromRow` impl) alongside
82    /// their `Auth*` derive.
83    pub trait SqlxEntity:
84        for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
85    {
86    }
87
88    impl<T> SqlxEntity for T where
89        T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
90    {
91    }
92
93    type SqlxAdapterEntities<U, S, A, O, M, I, V, TF, AK, PK> = (U, S, A, O, M, I, V, TF, AK, PK);
94
95    /// PostgreSQL database adapter via SQLx.
96    ///
97    /// Generic over entity types — use default type parameters for the built-in
98    /// types, or supply your own custom structs that implement `Auth*` + `sqlx::FromRow`.
99    pub struct SqlxAdapter<
100        U = User,
101        S = Session,
102        A = Account,
103        O = Organization,
104        M = Member,
105        I = Invitation,
106        V = Verification,
107        TF = TwoFactor,
108        AK = ApiKey,
109        PK = Passkey,
110    > {
111        pool: PgPool,
112        #[allow(clippy::type_complexity)]
113        _phantom: PhantomData<SqlxAdapterEntities<U, S, A, O, M, I, V, TF, AK, PK>>,
114    }
115
116    /// Constructors for the default (built-in) entity types.
117    impl SqlxAdapter {
118        pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
119            let pool = PgPool::connect(database_url).await?;
120            Ok(Self {
121                pool,
122                _phantom: PhantomData,
123            })
124        }
125
126        pub async fn with_config(
127            database_url: &str,
128            config: PoolConfig,
129        ) -> Result<Self, sqlx::Error> {
130            let pool = sqlx::postgres::PgPoolOptions::new()
131                .max_connections(config.max_connections)
132                .min_connections(config.min_connections)
133                .acquire_timeout(config.acquire_timeout)
134                .idle_timeout(config.idle_timeout)
135                .max_lifetime(config.max_lifetime)
136                .connect(database_url)
137                .await?;
138            Ok(Self {
139                pool,
140                _phantom: PhantomData,
141            })
142        }
143    }
144
145    /// Methods available for all type parameterizations.
146    impl<U, S, A, O, M, I, V, TF, AK, PK> SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK> {
147        pub fn from_pool(pool: PgPool) -> Self {
148            Self {
149                pool,
150                _phantom: PhantomData,
151            }
152        }
153
154        pub async fn test_connection(&self) -> Result<(), sqlx::Error> {
155            sqlx::query("SELECT 1").execute(&self.pool).await?;
156            Ok(())
157        }
158
159        pub fn pool_stats(&self) -> PoolStats {
160            PoolStats {
161                size: self.pool.size(),
162                idle: self.pool.num_idle(),
163            }
164        }
165
166        pub async fn close(&self) {
167            self.pool.close().await;
168        }
169    }
170
171    #[derive(Debug, Clone)]
172    pub struct PoolConfig {
173        pub max_connections: u32,
174        pub min_connections: u32,
175        pub acquire_timeout: std::time::Duration,
176        pub idle_timeout: Option<std::time::Duration>,
177        pub max_lifetime: Option<std::time::Duration>,
178    }
179
180    impl Default for PoolConfig {
181        fn default() -> Self {
182            Self {
183                max_connections: 10,
184                min_connections: 0,
185                acquire_timeout: std::time::Duration::from_secs(30),
186                idle_timeout: Some(std::time::Duration::from_secs(600)),
187                max_lifetime: Some(std::time::Duration::from_secs(1800)),
188            }
189        }
190    }
191
192    #[derive(Debug, Clone)]
193    pub struct PoolStats {
194        pub size: u32,
195        pub idle: usize,
196    }
197
198    // -- UserOps --
199
200    #[async_trait]
201    impl<U, S, A, O, M, I, V, TF, AK, PK> UserOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
202    where
203        U: AuthUser + AuthUserMeta + SqlxEntity,
204        S: AuthSession + AuthSessionMeta + SqlxEntity,
205        A: AuthAccount + AuthAccountMeta + SqlxEntity,
206        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
207        M: AuthMember + AuthMemberMeta + SqlxEntity,
208        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
209        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
210        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
211        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
212        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
213    {
214        type User = U;
215
216        async fn create_user(&self, create_user: CreateUser) -> AuthResult<U> {
217            let id = create_user.id.unwrap_or_else(|| Uuid::new_v4().to_string());
218            let now = Utc::now();
219
220            let sql = format!(
221                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *",
222                qi(U::table()),
223                qi(U::col_id()),
224                qi(U::col_email()),
225                qi(U::col_name()),
226                qi(U::col_image()),
227                qi(U::col_email_verified()),
228                qi(U::col_username()),
229                qi(U::col_display_username()),
230                qi(U::col_role()),
231                qi(U::col_created_at()),
232                qi(U::col_updated_at()),
233                qi(U::col_metadata()),
234            );
235            let user = sqlx::query_as::<_, U>(&sql)
236                .bind(&id)
237                .bind(&create_user.email)
238                .bind(&create_user.name)
239                .bind(&create_user.image)
240                .bind(create_user.email_verified.unwrap_or(false))
241                .bind(&create_user.username)
242                .bind(&create_user.display_username)
243                .bind(&create_user.role)
244                .bind(now)
245                .bind(now)
246                .bind(sqlx::types::Json(
247                    create_user.metadata.unwrap_or(serde_json::json!({})),
248                ))
249                .fetch_one(&self.pool)
250                .await?;
251
252            Ok(user)
253        }
254
255        async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<U>> {
256            let sql = format!(
257                "SELECT * FROM {} WHERE {} = $1",
258                qi(U::table()),
259                qi(U::col_id())
260            );
261            let user = sqlx::query_as::<_, U>(&sql)
262                .bind(id)
263                .fetch_optional(&self.pool)
264                .await?;
265            Ok(user)
266        }
267
268        async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<U>> {
269            let sql = format!(
270                "SELECT * FROM {} WHERE {} = $1",
271                qi(U::table()),
272                qi(U::col_email())
273            );
274            let user = sqlx::query_as::<_, U>(&sql)
275                .bind(email)
276                .fetch_optional(&self.pool)
277                .await?;
278            Ok(user)
279        }
280
281        async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<U>> {
282            let sql = format!(
283                "SELECT * FROM {} WHERE {} = $1",
284                qi(U::table()),
285                qi(U::col_username())
286            );
287            let user = sqlx::query_as::<_, U>(&sql)
288                .bind(username)
289                .fetch_optional(&self.pool)
290                .await?;
291            Ok(user)
292        }
293
294        async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<U> {
295            let mut query = sqlx::QueryBuilder::new(format!(
296                "UPDATE {} SET {} = NOW()",
297                qi(U::table()),
298                qi(U::col_updated_at())
299            ));
300            let mut has_updates = false;
301
302            if let Some(email) = &update.email {
303                query.push(format!(", {} = ", qi(U::col_email())));
304                query.push_bind(email);
305                has_updates = true;
306            }
307            if let Some(name) = &update.name {
308                query.push(format!(", {} = ", qi(U::col_name())));
309                query.push_bind(name);
310                has_updates = true;
311            }
312            if let Some(image) = &update.image {
313                query.push(format!(", {} = ", qi(U::col_image())));
314                query.push_bind(image);
315                has_updates = true;
316            }
317            if let Some(email_verified) = update.email_verified {
318                query.push(format!(", {} = ", qi(U::col_email_verified())));
319                query.push_bind(email_verified);
320                has_updates = true;
321            }
322            if let Some(username) = &update.username {
323                query.push(format!(", {} = ", qi(U::col_username())));
324                query.push_bind(username);
325                has_updates = true;
326            }
327            if let Some(display_username) = &update.display_username {
328                query.push(format!(", {} = ", qi(U::col_display_username())));
329                query.push_bind(display_username);
330                has_updates = true;
331            }
332            if let Some(role) = &update.role {
333                query.push(format!(", {} = ", qi(U::col_role())));
334                query.push_bind(role);
335                has_updates = true;
336            }
337            if let Some(banned) = update.banned {
338                query.push(format!(", {} = ", qi(U::col_banned())));
339                query.push_bind(banned);
340                has_updates = true;
341                // When explicitly unbanning, clear ban_reason and ban_expires
342                if !banned {
343                    query.push(format!(
344                        ", {} = NULL, {} = NULL",
345                        qi(U::col_ban_reason()),
346                        qi(U::col_ban_expires())
347                    ));
348                }
349            }
350            // Only process ban_reason and ban_expires when we are NOT
351            // explicitly unbanning.  When banned == Some(false) the block
352            // above already emits `ban_reason = NULL, ban_expires = NULL`,
353            // so applying these fields again would overwrite the NULLs.
354            if update.banned != Some(false) {
355                if let Some(ban_reason) = &update.ban_reason {
356                    query.push(format!(", {} = ", qi(U::col_ban_reason())));
357                    query.push_bind(ban_reason);
358                    has_updates = true;
359                }
360                if let Some(ban_expires) = update.ban_expires {
361                    query.push(format!(", {} = ", qi(U::col_ban_expires())));
362                    query.push_bind(ban_expires);
363                    has_updates = true;
364                }
365            }
366            if let Some(two_factor_enabled) = update.two_factor_enabled {
367                query.push(format!(", {} = ", qi(U::col_two_factor_enabled())));
368                query.push_bind(two_factor_enabled);
369                has_updates = true;
370            }
371            if let Some(metadata) = &update.metadata {
372                query.push(format!(", {} = ", qi(U::col_metadata())));
373                query.push_bind(sqlx::types::Json(metadata.clone()));
374                has_updates = true;
375            }
376
377            if !has_updates {
378                return self
379                    .get_user_by_id(id)
380                    .await?
381                    .ok_or(AuthError::UserNotFound);
382            }
383
384            query.push(format!(" WHERE {} = ", qi(U::col_id())));
385            query.push_bind(id);
386            query.push(" RETURNING *");
387
388            let user = query.build_query_as::<U>().fetch_one(&self.pool).await?;
389            Ok(user)
390        }
391
392        async fn delete_user(&self, id: &str) -> AuthResult<()> {
393            let sql = format!(
394                "DELETE FROM {} WHERE {} = $1",
395                qi(U::table()),
396                qi(U::col_id())
397            );
398            sqlx::query(&sql).bind(id).execute(&self.pool).await?;
399            Ok(())
400        }
401
402        async fn list_users(&self, params: ListUsersParams) -> AuthResult<(Vec<U>, usize)> {
403            let limit = params.limit.unwrap_or(100) as i64;
404            let offset = params.offset.unwrap_or(0) as i64;
405
406            // Build WHERE clause
407            let mut conditions: Vec<String> = Vec::new();
408            let mut bind_values: Vec<String> = Vec::new();
409
410            if let Some(search_value) = &params.search_value {
411                let field = params.search_field.as_deref().unwrap_or("email");
412                let col = qi(match field {
413                    "name" => U::col_name(),
414                    _ => U::col_email(),
415                });
416                let op = params.search_operator.as_deref().unwrap_or("contains");
417                let escaped = search_value.replace('%', "\\%").replace('_', "\\_");
418                let pattern = match op {
419                    "starts_with" => format!("{}%", escaped),
420                    "ends_with" => format!("%{}", escaped),
421                    _ => format!("%{}%", escaped),
422                };
423                let idx = bind_values.len() + 1;
424                conditions.push(format!("{} ILIKE ${}", col, idx));
425                bind_values.push(pattern);
426            }
427
428            if let Some(filter_value) = &params.filter_value {
429                let field = params.filter_field.as_deref().unwrap_or("email");
430                let col = qi(match field {
431                    "name" => U::col_name(),
432                    "role" => U::col_role(),
433                    _ => U::col_email(),
434                });
435                let op = params.filter_operator.as_deref().unwrap_or("eq");
436                let idx = bind_values.len() + 1;
437                match op {
438                    "contains" => {
439                        let escaped = filter_value.replace('%', "\\%").replace('_', "\\_");
440                        conditions.push(format!("{} ILIKE ${}", col, idx));
441                        bind_values.push(format!("%{}%", escaped));
442                    }
443                    "ne" => {
444                        conditions.push(format!("{} != ${}", col, idx));
445                        bind_values.push(filter_value.clone());
446                    }
447                    _ => {
448                        conditions.push(format!("{} = ${}", col, idx));
449                        bind_values.push(filter_value.clone());
450                    }
451                }
452            }
453
454            let where_clause = if conditions.is_empty() {
455                String::new()
456            } else {
457                format!(" WHERE {}", conditions.join(" AND "))
458            };
459
460            // Sort
461            let order_clause = if let Some(sort_by) = &params.sort_by {
462                let col = qi(match sort_by.as_str() {
463                    "name" => U::col_name(),
464                    "createdAt" | "created_at" => U::col_created_at(),
465                    _ => U::col_email(),
466                });
467                let dir = if params.sort_direction.as_deref() == Some("desc") {
468                    "DESC"
469                } else {
470                    "ASC"
471                };
472                format!(" ORDER BY {} {}", col, dir)
473            } else {
474                format!(" ORDER BY {} DESC", qi(U::col_created_at()))
475            };
476
477            // Count query
478            let count_sql = format!(
479                "SELECT COUNT(*) as count FROM {}{}",
480                qi(U::table()),
481                where_clause
482            );
483            let mut count_query = sqlx::query_scalar::<_, i64>(&count_sql);
484            for v in &bind_values {
485                count_query = count_query.bind(v);
486            }
487            let total = count_query.fetch_one(&self.pool).await? as usize;
488
489            // Data query
490            let limit_idx = bind_values.len() + 1;
491            let offset_idx = bind_values.len() + 2;
492            let data_sql = format!(
493                "SELECT * FROM {}{}{} LIMIT ${} OFFSET ${}",
494                qi(U::table()),
495                where_clause,
496                order_clause,
497                limit_idx,
498                offset_idx
499            );
500            let mut data_query = sqlx::query_as::<_, U>(&data_sql);
501            for v in &bind_values {
502                data_query = data_query.bind(v);
503            }
504            data_query = data_query.bind(limit).bind(offset);
505            let users = data_query.fetch_all(&self.pool).await?;
506
507            Ok((users, total))
508        }
509    }
510
511    // -- SessionOps --
512
513    #[async_trait]
514    impl<U, S, A, O, M, I, V, TF, AK, PK> SessionOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
515    where
516        U: AuthUser + AuthUserMeta + SqlxEntity,
517        S: AuthSession + AuthSessionMeta + SqlxEntity,
518        A: AuthAccount + AuthAccountMeta + SqlxEntity,
519        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
520        M: AuthMember + AuthMemberMeta + SqlxEntity,
521        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
522        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
523        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
524        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
525        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
526    {
527        type Session = S;
528
529        async fn create_session(&self, create_session: CreateSession) -> AuthResult<S> {
530            let id = Uuid::new_v4().to_string();
531            let token = format!("session_{}", Uuid::new_v4());
532            let now = Utc::now();
533
534            let sql = format!(
535                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *",
536                qi(S::table()),
537                qi(S::col_id()),
538                qi(S::col_user_id()),
539                qi(S::col_token()),
540                qi(S::col_expires_at()),
541                qi(S::col_created_at()),
542                qi(S::col_ip_address()),
543                qi(S::col_user_agent()),
544                qi(S::col_impersonated_by()),
545                qi(S::col_active_organization_id()),
546                qi(S::col_active()),
547            );
548            let session = sqlx::query_as::<_, S>(&sql)
549                .bind(&id)
550                .bind(&create_session.user_id)
551                .bind(&token)
552                .bind(create_session.expires_at)
553                .bind(now)
554                .bind(&create_session.ip_address)
555                .bind(&create_session.user_agent)
556                .bind(&create_session.impersonated_by)
557                .bind(&create_session.active_organization_id)
558                .bind(true)
559                .fetch_one(&self.pool)
560                .await?;
561
562            Ok(session)
563        }
564
565        async fn get_session(&self, token: &str) -> AuthResult<Option<S>> {
566            let sql = format!(
567                "SELECT * FROM {} WHERE {} = $1 AND {} = true",
568                qi(S::table()),
569                qi(S::col_token()),
570                qi(S::col_active())
571            );
572            let session = sqlx::query_as::<_, S>(&sql)
573                .bind(token)
574                .fetch_optional(&self.pool)
575                .await?;
576            Ok(session)
577        }
578
579        async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<S>> {
580            let sql = format!(
581                "SELECT * FROM {} WHERE {} = $1 AND {} = true ORDER BY {} DESC",
582                qi(S::table()),
583                qi(S::col_user_id()),
584                qi(S::col_active()),
585                qi(S::col_created_at())
586            );
587            let sessions = sqlx::query_as::<_, S>(&sql)
588                .bind(user_id)
589                .fetch_all(&self.pool)
590                .await?;
591            Ok(sessions)
592        }
593
594        async fn update_session_expiry(
595            &self,
596            token: &str,
597            expires_at: DateTime<Utc>,
598        ) -> AuthResult<()> {
599            let sql = format!(
600                "UPDATE {} SET {} = $1 WHERE {} = $2 AND {} = true",
601                qi(S::table()),
602                qi(S::col_expires_at()),
603                qi(S::col_token()),
604                qi(S::col_active())
605            );
606            sqlx::query(&sql)
607                .bind(expires_at)
608                .bind(token)
609                .execute(&self.pool)
610                .await?;
611            Ok(())
612        }
613
614        async fn delete_session(&self, token: &str) -> AuthResult<()> {
615            let sql = format!(
616                "DELETE FROM {} WHERE {} = $1",
617                qi(S::table()),
618                qi(S::col_token())
619            );
620            sqlx::query(&sql).bind(token).execute(&self.pool).await?;
621            Ok(())
622        }
623
624        async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
625            let sql = format!(
626                "DELETE FROM {} WHERE {} = $1",
627                qi(S::table()),
628                qi(S::col_user_id())
629            );
630            sqlx::query(&sql).bind(user_id).execute(&self.pool).await?;
631            Ok(())
632        }
633
634        async fn delete_expired_sessions(&self) -> AuthResult<usize> {
635            let sql = format!(
636                "DELETE FROM {} WHERE {} < NOW() OR {} = false",
637                qi(S::table()),
638                qi(S::col_expires_at()),
639                qi(S::col_active())
640            );
641            let result = sqlx::query(&sql).execute(&self.pool).await?;
642            Ok(result.rows_affected() as usize)
643        }
644
645        async fn update_session_active_organization(
646            &self,
647            token: &str,
648            organization_id: Option<&str>,
649        ) -> AuthResult<S> {
650            let sql = format!(
651                "UPDATE {} SET {} = $1, {} = NOW() WHERE {} = $2 AND {} = true RETURNING *",
652                qi(S::table()),
653                qi(S::col_active_organization_id()),
654                qi(S::col_updated_at()),
655                qi(S::col_token()),
656                qi(S::col_active())
657            );
658            let session = sqlx::query_as::<_, S>(&sql)
659                .bind(organization_id)
660                .bind(token)
661                .fetch_one(&self.pool)
662                .await?;
663            Ok(session)
664        }
665    }
666
667    // -- AccountOps --
668
669    #[async_trait]
670    impl<U, S, A, O, M, I, V, TF, AK, PK> AccountOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
671    where
672        U: AuthUser + AuthUserMeta + SqlxEntity,
673        S: AuthSession + AuthSessionMeta + SqlxEntity,
674        A: AuthAccount + AuthAccountMeta + SqlxEntity,
675        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
676        M: AuthMember + AuthMemberMeta + SqlxEntity,
677        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
678        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
679        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
680        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
681        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
682    {
683        type Account = A;
684
685        async fn create_account(&self, create_account: CreateAccount) -> AuthResult<A> {
686            let id = Uuid::new_v4().to_string();
687            let now = Utc::now();
688
689            let sql = format!(
690                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
691                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING *",
692                qi(A::table()),
693                qi(A::col_id()),
694                qi(A::col_account_id()),
695                qi(A::col_provider_id()),
696                qi(A::col_user_id()),
697                qi(A::col_access_token()),
698                qi(A::col_refresh_token()),
699                qi(A::col_id_token()),
700                qi(A::col_access_token_expires_at()),
701                qi(A::col_refresh_token_expires_at()),
702                qi(A::col_scope()),
703                qi(A::col_password()),
704                qi(A::col_created_at()),
705                qi(A::col_updated_at()),
706            );
707            let account = sqlx::query_as::<_, A>(&sql)
708                .bind(&id)
709                .bind(&create_account.account_id)
710                .bind(&create_account.provider_id)
711                .bind(&create_account.user_id)
712                .bind(&create_account.access_token)
713                .bind(&create_account.refresh_token)
714                .bind(&create_account.id_token)
715                .bind(create_account.access_token_expires_at)
716                .bind(create_account.refresh_token_expires_at)
717                .bind(&create_account.scope)
718                .bind(&create_account.password)
719                .bind(now)
720                .bind(now)
721                .fetch_one(&self.pool)
722                .await?;
723
724            Ok(account)
725        }
726
727        async fn get_account(
728            &self,
729            provider: &str,
730            provider_account_id: &str,
731        ) -> AuthResult<Option<A>> {
732            let sql = format!(
733                "SELECT * FROM {} WHERE {} = $1 AND {} = $2",
734                qi(A::table()),
735                qi(A::col_provider_id()),
736                qi(A::col_account_id())
737            );
738            let account = sqlx::query_as::<_, A>(&sql)
739                .bind(provider)
740                .bind(provider_account_id)
741                .fetch_optional(&self.pool)
742                .await?;
743            Ok(account)
744        }
745
746        async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<A>> {
747            let sql = format!(
748                "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
749                qi(A::table()),
750                qi(A::col_user_id()),
751                qi(A::col_created_at())
752            );
753            let accounts = sqlx::query_as::<_, A>(&sql)
754                .bind(user_id)
755                .fetch_all(&self.pool)
756                .await?;
757            Ok(accounts)
758        }
759
760        async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<A> {
761            let mut query = sqlx::QueryBuilder::new(format!(
762                "UPDATE {} SET {} = NOW()",
763                qi(A::table()),
764                qi(A::col_updated_at())
765            ));
766
767            if let Some(access_token) = &update.access_token {
768                query.push(format!(", {} = ", qi(A::col_access_token())));
769                query.push_bind(access_token);
770            }
771            if let Some(refresh_token) = &update.refresh_token {
772                query.push(format!(", {} = ", qi(A::col_refresh_token())));
773                query.push_bind(refresh_token);
774            }
775            if let Some(id_token) = &update.id_token {
776                query.push(format!(", {} = ", qi(A::col_id_token())));
777                query.push_bind(id_token);
778            }
779            if let Some(access_token_expires_at) = &update.access_token_expires_at {
780                query.push(format!(", {} = ", qi(A::col_access_token_expires_at())));
781                query.push_bind(access_token_expires_at);
782            }
783            if let Some(refresh_token_expires_at) = &update.refresh_token_expires_at {
784                query.push(format!(", {} = ", qi(A::col_refresh_token_expires_at())));
785                query.push_bind(refresh_token_expires_at);
786            }
787            if let Some(scope) = &update.scope {
788                query.push(format!(", {} = ", qi(A::col_scope())));
789                query.push_bind(scope);
790            }
791            if let Some(password) = &update.password {
792                query.push(format!(", {} = ", qi(A::col_password())));
793                query.push_bind(password);
794            }
795
796            query.push(format!(" WHERE {} = ", qi(A::col_id())));
797            query.push_bind(id);
798            query.push(" RETURNING *");
799
800            let account = query.build_query_as::<A>().fetch_one(&self.pool).await?;
801            Ok(account)
802        }
803
804        async fn delete_account(&self, id: &str) -> AuthResult<()> {
805            let sql = format!(
806                "DELETE FROM {} WHERE {} = $1",
807                qi(A::table()),
808                qi(A::col_id())
809            );
810            sqlx::query(&sql).bind(id).execute(&self.pool).await?;
811            Ok(())
812        }
813    }
814
815    // -- VerificationOps --
816
817    #[async_trait]
818    impl<U, S, A, O, M, I, V, TF, AK, PK> VerificationOps
819        for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
820    where
821        U: AuthUser + AuthUserMeta + SqlxEntity,
822        S: AuthSession + AuthSessionMeta + SqlxEntity,
823        A: AuthAccount + AuthAccountMeta + SqlxEntity,
824        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
825        M: AuthMember + AuthMemberMeta + SqlxEntity,
826        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
827        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
828        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
829        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
830        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
831    {
832        type Verification = V;
833
834        async fn create_verification(
835            &self,
836            create_verification: CreateVerification,
837        ) -> AuthResult<V> {
838            let id = Uuid::new_v4().to_string();
839            let now = Utc::now();
840
841            let sql = format!(
842                "INSERT INTO {} ({}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *",
843                qi(V::table()),
844                qi(V::col_id()),
845                qi(V::col_identifier()),
846                qi(V::col_value()),
847                qi(V::col_expires_at()),
848                qi(V::col_created_at()),
849                qi(V::col_updated_at()),
850            );
851            let verification = sqlx::query_as::<_, V>(&sql)
852                .bind(&id)
853                .bind(&create_verification.identifier)
854                .bind(&create_verification.value)
855                .bind(create_verification.expires_at)
856                .bind(now)
857                .bind(now)
858                .fetch_one(&self.pool)
859                .await?;
860
861            Ok(verification)
862        }
863
864        async fn get_verification(&self, identifier: &str, value: &str) -> AuthResult<Option<V>> {
865            let sql = format!(
866                "SELECT * FROM {} WHERE {} = $1 AND {} = $2 AND {} > NOW()",
867                qi(V::table()),
868                qi(V::col_identifier()),
869                qi(V::col_value()),
870                qi(V::col_expires_at())
871            );
872            let verification = sqlx::query_as::<_, V>(&sql)
873                .bind(identifier)
874                .bind(value)
875                .fetch_optional(&self.pool)
876                .await?;
877            Ok(verification)
878        }
879
880        async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<V>> {
881            let sql = format!(
882                "SELECT * FROM {} WHERE {} = $1 AND {} > NOW()",
883                qi(V::table()),
884                qi(V::col_value()),
885                qi(V::col_expires_at())
886            );
887            let verification = sqlx::query_as::<_, V>(&sql)
888                .bind(value)
889                .fetch_optional(&self.pool)
890                .await?;
891            Ok(verification)
892        }
893
894        async fn get_verification_by_identifier(&self, identifier: &str) -> AuthResult<Option<V>> {
895            let sql = format!(
896                "SELECT * FROM {} WHERE {} = $1 AND {} > NOW()",
897                qi(V::table()),
898                qi(V::col_identifier()),
899                qi(V::col_expires_at())
900            );
901            let verification = sqlx::query_as::<_, V>(&sql)
902                .bind(identifier)
903                .fetch_optional(&self.pool)
904                .await?;
905            Ok(verification)
906        }
907
908        async fn consume_verification(
909            &self,
910            identifier: &str,
911            value: &str,
912        ) -> AuthResult<Option<V>> {
913            let sql = format!(
914                "DELETE FROM {tbl} WHERE {id} IN (\
915                    SELECT {id} FROM {tbl} \
916                    WHERE {ident} = $1 AND {val} = $2 AND {exp} > NOW() \
917                    ORDER BY {ca} DESC \
918                    LIMIT 1\
919                ) RETURNING *",
920                tbl = qi(V::table()),
921                id = qi(V::col_id()),
922                ident = qi(V::col_identifier()),
923                val = qi(V::col_value()),
924                exp = qi(V::col_expires_at()),
925                ca = qi(V::col_created_at()),
926            );
927            let verification = sqlx::query_as::<_, V>(&sql)
928                .bind(identifier)
929                .bind(value)
930                .fetch_optional(&self.pool)
931                .await?;
932            Ok(verification)
933        }
934
935        async fn delete_verification(&self, id: &str) -> AuthResult<()> {
936            let sql = format!(
937                "DELETE FROM {} WHERE {} = $1",
938                qi(V::table()),
939                qi(V::col_id())
940            );
941            sqlx::query(&sql).bind(id).execute(&self.pool).await?;
942            Ok(())
943        }
944
945        async fn delete_expired_verifications(&self) -> AuthResult<usize> {
946            let sql = format!(
947                "DELETE FROM {} WHERE {} < NOW()",
948                qi(V::table()),
949                qi(V::col_expires_at())
950            );
951            let result = sqlx::query(&sql).execute(&self.pool).await?;
952            Ok(result.rows_affected() as usize)
953        }
954    }
955
956    // -- OrganizationOps --
957
958    #[async_trait]
959    impl<U, S, A, O, M, I, V, TF, AK, PK> OrganizationOps
960        for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
961    where
962        U: AuthUser + AuthUserMeta + SqlxEntity,
963        S: AuthSession + AuthSessionMeta + SqlxEntity,
964        A: AuthAccount + AuthAccountMeta + SqlxEntity,
965        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
966        M: AuthMember + AuthMemberMeta + SqlxEntity,
967        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
968        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
969        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
970        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
971        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
972    {
973        type Organization = O;
974
975        async fn create_organization(&self, create_org: CreateOrganization) -> AuthResult<O> {
976            let id = create_org.id.unwrap_or_else(|| Uuid::new_v4().to_string());
977            let now = Utc::now();
978
979            let sql = format!(
980                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *",
981                qi(O::table()),
982                qi(O::col_id()),
983                qi(O::col_name()),
984                qi(O::col_slug()),
985                qi(O::col_logo()),
986                qi(O::col_metadata()),
987                qi(O::col_created_at()),
988                qi(O::col_updated_at()),
989            );
990            let organization = sqlx::query_as::<_, O>(&sql)
991                .bind(&id)
992                .bind(&create_org.name)
993                .bind(&create_org.slug)
994                .bind(&create_org.logo)
995                .bind(sqlx::types::Json(
996                    create_org.metadata.unwrap_or(serde_json::json!({})),
997                ))
998                .bind(now)
999                .bind(now)
1000                .fetch_one(&self.pool)
1001                .await?;
1002
1003            Ok(organization)
1004        }
1005
1006        async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<O>> {
1007            let sql = format!(
1008                "SELECT * FROM {} WHERE {} = $1",
1009                qi(O::table()),
1010                qi(O::col_id())
1011            );
1012            let organization = sqlx::query_as::<_, O>(&sql)
1013                .bind(id)
1014                .fetch_optional(&self.pool)
1015                .await?;
1016            Ok(organization)
1017        }
1018
1019        async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<O>> {
1020            let sql = format!(
1021                "SELECT * FROM {} WHERE {} = $1",
1022                qi(O::table()),
1023                qi(O::col_slug())
1024            );
1025            let organization = sqlx::query_as::<_, O>(&sql)
1026                .bind(slug)
1027                .fetch_optional(&self.pool)
1028                .await?;
1029            Ok(organization)
1030        }
1031
1032        async fn update_organization(&self, id: &str, update: UpdateOrganization) -> AuthResult<O> {
1033            let mut query = sqlx::QueryBuilder::new(format!(
1034                "UPDATE {} SET {} = NOW()",
1035                qi(O::table()),
1036                qi(O::col_updated_at())
1037            ));
1038
1039            if let Some(name) = &update.name {
1040                query.push(format!(", {} = ", qi(O::col_name())));
1041                query.push_bind(name);
1042            }
1043            if let Some(slug) = &update.slug {
1044                query.push(format!(", {} = ", qi(O::col_slug())));
1045                query.push_bind(slug);
1046            }
1047            if let Some(logo) = &update.logo {
1048                query.push(format!(", {} = ", qi(O::col_logo())));
1049                query.push_bind(logo);
1050            }
1051            if let Some(metadata) = &update.metadata {
1052                query.push(format!(", {} = ", qi(O::col_metadata())));
1053                query.push_bind(sqlx::types::Json(metadata.clone()));
1054            }
1055
1056            query.push(format!(" WHERE {} = ", qi(O::col_id())));
1057            query.push_bind(id);
1058            query.push(" RETURNING *");
1059
1060            let organization = query.build_query_as::<O>().fetch_one(&self.pool).await?;
1061            Ok(organization)
1062        }
1063
1064        async fn delete_organization(&self, id: &str) -> AuthResult<()> {
1065            let sql = format!(
1066                "DELETE FROM {} WHERE {} = $1",
1067                qi(O::table()),
1068                qi(O::col_id())
1069            );
1070            sqlx::query(&sql).bind(id).execute(&self.pool).await?;
1071            Ok(())
1072        }
1073
1074        async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<O>> {
1075            let sql = format!(
1076                "SELECT o.* FROM {} o INNER JOIN {} m ON o.{} = m.{} WHERE m.{} = $1 ORDER BY o.{} DESC",
1077                qi(O::table()),
1078                qi(M::table()),
1079                qi(O::col_id()),
1080                qi(M::col_organization_id()),
1081                qi(M::col_user_id()),
1082                qi(O::col_created_at()),
1083            );
1084            let organizations = sqlx::query_as::<_, O>(&sql)
1085                .bind(user_id)
1086                .fetch_all(&self.pool)
1087                .await?;
1088            Ok(organizations)
1089        }
1090    }
1091
1092    // -- MemberOps --
1093
1094    #[async_trait]
1095    impl<U, S, A, O, M, I, V, TF, AK, PK> MemberOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1096    where
1097        U: AuthUser + AuthUserMeta + SqlxEntity,
1098        S: AuthSession + AuthSessionMeta + SqlxEntity,
1099        A: AuthAccount + AuthAccountMeta + SqlxEntity,
1100        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1101        M: AuthMember + AuthMemberMeta + SqlxEntity,
1102        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1103        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1104        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1105        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1106        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1107    {
1108        type Member = M;
1109
1110        async fn create_member(&self, create_member: CreateMember) -> AuthResult<M> {
1111            let id = Uuid::new_v4().to_string();
1112            let now = Utc::now();
1113
1114            let sql = format!(
1115                "INSERT INTO {} ({}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5) RETURNING *",
1116                qi(M::table()),
1117                qi(M::col_id()),
1118                qi(M::col_organization_id()),
1119                qi(M::col_user_id()),
1120                qi(M::col_role()),
1121                qi(M::col_created_at()),
1122            );
1123            let member = sqlx::query_as::<_, M>(&sql)
1124                .bind(&id)
1125                .bind(&create_member.organization_id)
1126                .bind(&create_member.user_id)
1127                .bind(&create_member.role)
1128                .bind(now)
1129                .fetch_one(&self.pool)
1130                .await?;
1131
1132            Ok(member)
1133        }
1134
1135        async fn get_member(&self, organization_id: &str, user_id: &str) -> AuthResult<Option<M>> {
1136            let sql = format!(
1137                "SELECT * FROM {} WHERE {} = $1 AND {} = $2",
1138                qi(M::table()),
1139                qi(M::col_organization_id()),
1140                qi(M::col_user_id())
1141            );
1142            let member = sqlx::query_as::<_, M>(&sql)
1143                .bind(organization_id)
1144                .bind(user_id)
1145                .fetch_optional(&self.pool)
1146                .await?;
1147            Ok(member)
1148        }
1149
1150        async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<M>> {
1151            let sql = format!(
1152                "SELECT * FROM {} WHERE {} = $1",
1153                qi(M::table()),
1154                qi(M::col_id())
1155            );
1156            let member = sqlx::query_as::<_, M>(&sql)
1157                .bind(id)
1158                .fetch_optional(&self.pool)
1159                .await?;
1160            Ok(member)
1161        }
1162
1163        async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<M> {
1164            let sql = format!(
1165                "UPDATE {} SET {} = $1 WHERE {} = $2 RETURNING *",
1166                qi(M::table()),
1167                qi(M::col_role()),
1168                qi(M::col_id())
1169            );
1170            let member = sqlx::query_as::<_, M>(&sql)
1171                .bind(role)
1172                .bind(member_id)
1173                .fetch_one(&self.pool)
1174                .await?;
1175            Ok(member)
1176        }
1177
1178        async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
1179            let sql = format!(
1180                "DELETE FROM {} WHERE {} = $1",
1181                qi(M::table()),
1182                qi(M::col_id())
1183            );
1184            sqlx::query(&sql)
1185                .bind(member_id)
1186                .execute(&self.pool)
1187                .await?;
1188            Ok(())
1189        }
1190
1191        async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<M>> {
1192            let sql = format!(
1193                "SELECT * FROM {} WHERE {} = $1 ORDER BY {} ASC",
1194                qi(M::table()),
1195                qi(M::col_organization_id()),
1196                qi(M::col_created_at())
1197            );
1198            let members = sqlx::query_as::<_, M>(&sql)
1199                .bind(organization_id)
1200                .fetch_all(&self.pool)
1201                .await?;
1202            Ok(members)
1203        }
1204
1205        async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
1206            let sql = format!(
1207                "SELECT COUNT(*) FROM {} WHERE {} = $1",
1208                qi(M::table()),
1209                qi(M::col_organization_id())
1210            );
1211            let count: (i64,) = sqlx::query_as(&sql)
1212                .bind(organization_id)
1213                .fetch_one(&self.pool)
1214                .await?;
1215            Ok(count.0 as usize)
1216        }
1217
1218        async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
1219            let sql = format!(
1220                "SELECT COUNT(*) FROM {} WHERE {} = $1 AND {} = 'owner'",
1221                qi(M::table()),
1222                qi(M::col_organization_id()),
1223                qi(M::col_role())
1224            );
1225            let count: (i64,) = sqlx::query_as(&sql)
1226                .bind(organization_id)
1227                .fetch_one(&self.pool)
1228                .await?;
1229            Ok(count.0 as usize)
1230        }
1231    }
1232
1233    // -- InvitationOps --
1234
1235    #[async_trait]
1236    impl<U, S, A, O, M, I, V, TF, AK, PK> InvitationOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1237    where
1238        U: AuthUser + AuthUserMeta + SqlxEntity,
1239        S: AuthSession + AuthSessionMeta + SqlxEntity,
1240        A: AuthAccount + AuthAccountMeta + SqlxEntity,
1241        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1242        M: AuthMember + AuthMemberMeta + SqlxEntity,
1243        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1244        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1245        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1246        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1247        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1248    {
1249        type Invitation = I;
1250
1251        async fn create_invitation(&self, create_inv: CreateInvitation) -> AuthResult<I> {
1252            let id = Uuid::new_v4().to_string();
1253            let now = Utc::now();
1254
1255            let sql = format!(
1256                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}) \
1257                 VALUES ($1, $2, $3, $4, 'pending', $5, $6, $7) RETURNING *",
1258                qi(I::table()),
1259                qi(I::col_id()),
1260                qi(I::col_organization_id()),
1261                qi(I::col_email()),
1262                qi(I::col_role()),
1263                qi(I::col_status()),
1264                qi(I::col_inviter_id()),
1265                qi(I::col_expires_at()),
1266                qi(I::col_created_at()),
1267            );
1268            let invitation = sqlx::query_as::<_, I>(&sql)
1269                .bind(&id)
1270                .bind(&create_inv.organization_id)
1271                .bind(&create_inv.email)
1272                .bind(&create_inv.role)
1273                .bind(&create_inv.inviter_id)
1274                .bind(create_inv.expires_at)
1275                .bind(now)
1276                .fetch_one(&self.pool)
1277                .await?;
1278
1279            Ok(invitation)
1280        }
1281
1282        async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<I>> {
1283            let sql = format!(
1284                "SELECT * FROM {} WHERE {} = $1",
1285                qi(I::table()),
1286                qi(I::col_id())
1287            );
1288            let invitation = sqlx::query_as::<_, I>(&sql)
1289                .bind(id)
1290                .fetch_optional(&self.pool)
1291                .await?;
1292            Ok(invitation)
1293        }
1294
1295        async fn get_pending_invitation(
1296            &self,
1297            organization_id: &str,
1298            email: &str,
1299        ) -> AuthResult<Option<I>> {
1300            let sql = format!(
1301                "SELECT * FROM {} WHERE {} = $1 AND LOWER({}) = LOWER($2) AND {} = 'pending'",
1302                qi(I::table()),
1303                qi(I::col_organization_id()),
1304                qi(I::col_email()),
1305                qi(I::col_status())
1306            );
1307            let invitation = sqlx::query_as::<_, I>(&sql)
1308                .bind(organization_id)
1309                .bind(email)
1310                .fetch_optional(&self.pool)
1311                .await?;
1312            Ok(invitation)
1313        }
1314
1315        async fn update_invitation_status(
1316            &self,
1317            id: &str,
1318            status: InvitationStatus,
1319        ) -> AuthResult<I> {
1320            let sql = format!(
1321                "UPDATE {} SET {} = $1 WHERE {} = $2 RETURNING *",
1322                qi(I::table()),
1323                qi(I::col_status()),
1324                qi(I::col_id())
1325            );
1326            let invitation = sqlx::query_as::<_, I>(&sql)
1327                .bind(status.to_string())
1328                .bind(id)
1329                .fetch_one(&self.pool)
1330                .await?;
1331            Ok(invitation)
1332        }
1333
1334        async fn list_organization_invitations(&self, organization_id: &str) -> AuthResult<Vec<I>> {
1335            let sql = format!(
1336                "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
1337                qi(I::table()),
1338                qi(I::col_organization_id()),
1339                qi(I::col_created_at())
1340            );
1341            let invitations = sqlx::query_as::<_, I>(&sql)
1342                .bind(organization_id)
1343                .fetch_all(&self.pool)
1344                .await?;
1345            Ok(invitations)
1346        }
1347
1348        async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<I>> {
1349            let sql = format!(
1350                "SELECT * FROM {} WHERE LOWER({}) = LOWER($1) AND {} = 'pending' AND {} > NOW() ORDER BY {} DESC",
1351                qi(I::table()),
1352                qi(I::col_email()),
1353                qi(I::col_status()),
1354                qi(I::col_expires_at()),
1355                qi(I::col_created_at())
1356            );
1357            let invitations = sqlx::query_as::<_, I>(&sql)
1358                .bind(email)
1359                .fetch_all(&self.pool)
1360                .await?;
1361            Ok(invitations)
1362        }
1363    }
1364
1365    // -- TwoFactorOps --
1366
1367    #[async_trait]
1368    impl<U, S, A, O, M, I, V, TF, AK, PK> TwoFactorOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1369    where
1370        U: AuthUser + AuthUserMeta + SqlxEntity,
1371        S: AuthSession + AuthSessionMeta + SqlxEntity,
1372        A: AuthAccount + AuthAccountMeta + SqlxEntity,
1373        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1374        M: AuthMember + AuthMemberMeta + SqlxEntity,
1375        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1376        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1377        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1378        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1379        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1380    {
1381        type TwoFactor = TF;
1382
1383        async fn create_two_factor(&self, create: CreateTwoFactor) -> AuthResult<TF> {
1384            let id = Uuid::new_v4().to_string();
1385            let now = Utc::now();
1386
1387            let sql = format!(
1388                "INSERT INTO {} ({}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *",
1389                qi(TF::table()),
1390                qi(TF::col_id()),
1391                qi(TF::col_secret()),
1392                qi(TF::col_backup_codes()),
1393                qi(TF::col_user_id()),
1394                qi(TF::col_created_at()),
1395                qi(TF::col_updated_at()),
1396            );
1397            let two_factor = sqlx::query_as::<_, TF>(&sql)
1398                .bind(&id)
1399                .bind(&create.secret)
1400                .bind(&create.backup_codes)
1401                .bind(&create.user_id)
1402                .bind(now)
1403                .bind(now)
1404                .fetch_one(&self.pool)
1405                .await?;
1406
1407            Ok(two_factor)
1408        }
1409
1410        async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TF>> {
1411            let sql = format!(
1412                "SELECT * FROM {} WHERE {} = $1",
1413                qi(TF::table()),
1414                qi(TF::col_user_id())
1415            );
1416            let two_factor = sqlx::query_as::<_, TF>(&sql)
1417                .bind(user_id)
1418                .fetch_optional(&self.pool)
1419                .await?;
1420            Ok(two_factor)
1421        }
1422
1423        async fn update_two_factor_backup_codes(
1424            &self,
1425            user_id: &str,
1426            backup_codes: &str,
1427        ) -> AuthResult<TF> {
1428            let sql = format!(
1429                "UPDATE {} SET {} = $1, {} = NOW() WHERE {} = $2 RETURNING *",
1430                qi(TF::table()),
1431                qi(TF::col_backup_codes()),
1432                qi(TF::col_updated_at()),
1433                qi(TF::col_user_id())
1434            );
1435            let two_factor = sqlx::query_as::<_, TF>(&sql)
1436                .bind(backup_codes)
1437                .bind(user_id)
1438                .fetch_one(&self.pool)
1439                .await?;
1440            Ok(two_factor)
1441        }
1442
1443        async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
1444            let sql = format!(
1445                "DELETE FROM {} WHERE {} = $1",
1446                qi(TF::table()),
1447                qi(TF::col_user_id())
1448            );
1449            sqlx::query(&sql).bind(user_id).execute(&self.pool).await?;
1450            Ok(())
1451        }
1452    }
1453
1454    // -- ApiKeyOps --
1455
1456    #[async_trait]
1457    impl<U, S, A, O, M, I, V, TF, AK, PK> ApiKeyOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1458    where
1459        U: AuthUser + AuthUserMeta + SqlxEntity,
1460        S: AuthSession + AuthSessionMeta + SqlxEntity,
1461        A: AuthAccount + AuthAccountMeta + SqlxEntity,
1462        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1463        M: AuthMember + AuthMemberMeta + SqlxEntity,
1464        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1465        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1466        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1467        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1468        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1469    {
1470        type ApiKey = AK;
1471
1472        async fn create_api_key(&self, input: CreateApiKey) -> AuthResult<AK> {
1473            let id = Uuid::new_v4().to_string();
1474            let now = Utc::now();
1475
1476            let sql = format!(
1477                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
1478                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::timestamptz, $15, $16, $17, $18) RETURNING *",
1479                qi(AK::table()),
1480                qi(AK::col_id()),
1481                qi(AK::col_name()),
1482                qi(AK::col_start()),
1483                qi(AK::col_prefix()),
1484                qi(AK::col_key_hash()),
1485                qi(AK::col_user_id()),
1486                qi(AK::col_refill_interval()),
1487                qi(AK::col_refill_amount()),
1488                qi(AK::col_enabled()),
1489                qi(AK::col_rate_limit_enabled()),
1490                qi(AK::col_rate_limit_time_window()),
1491                qi(AK::col_rate_limit_max()),
1492                qi(AK::col_remaining()),
1493                qi(AK::col_expires_at()),
1494                qi(AK::col_created_at()),
1495                qi(AK::col_updated_at()),
1496                qi(AK::col_permissions()),
1497                qi(AK::col_metadata()),
1498            );
1499            let api_key = sqlx::query_as::<_, AK>(&sql)
1500                .bind(&id)
1501                .bind(&input.name)
1502                .bind(&input.start)
1503                .bind(&input.prefix)
1504                .bind(&input.key_hash)
1505                .bind(&input.user_id)
1506                .bind(input.refill_interval)
1507                .bind(input.refill_amount)
1508                .bind(input.enabled)
1509                .bind(input.rate_limit_enabled)
1510                .bind(input.rate_limit_time_window)
1511                .bind(input.rate_limit_max)
1512                .bind(input.remaining)
1513                .bind(&input.expires_at)
1514                .bind(now)
1515                .bind(now)
1516                .bind(&input.permissions)
1517                .bind(&input.metadata)
1518                .fetch_one(&self.pool)
1519                .await?;
1520
1521            Ok(api_key)
1522        }
1523
1524        async fn get_api_key_by_id(&self, id: &str) -> AuthResult<Option<AK>> {
1525            let sql = format!(
1526                "SELECT * FROM {} WHERE {} = $1",
1527                qi(AK::table()),
1528                qi(AK::col_id())
1529            );
1530            let api_key = sqlx::query_as::<_, AK>(&sql)
1531                .bind(id)
1532                .fetch_optional(&self.pool)
1533                .await?;
1534            Ok(api_key)
1535        }
1536
1537        async fn get_api_key_by_hash(&self, hash: &str) -> AuthResult<Option<AK>> {
1538            let sql = format!(
1539                "SELECT * FROM {} WHERE {} = $1",
1540                qi(AK::table()),
1541                qi(AK::col_key_hash())
1542            );
1543            let api_key = sqlx::query_as::<_, AK>(&sql)
1544                .bind(hash)
1545                .fetch_optional(&self.pool)
1546                .await?;
1547            Ok(api_key)
1548        }
1549
1550        async fn list_api_keys_by_user(&self, user_id: &str) -> AuthResult<Vec<AK>> {
1551            let sql = format!(
1552                "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
1553                qi(AK::table()),
1554                qi(AK::col_user_id()),
1555                qi(AK::col_created_at())
1556            );
1557            let keys = sqlx::query_as::<_, AK>(&sql)
1558                .bind(user_id)
1559                .fetch_all(&self.pool)
1560                .await?;
1561            Ok(keys)
1562        }
1563
1564        async fn update_api_key(&self, id: &str, update: UpdateApiKey) -> AuthResult<AK> {
1565            let mut query = sqlx::QueryBuilder::new(format!(
1566                "UPDATE {} SET {} = NOW()",
1567                qi(AK::table()),
1568                qi(AK::col_updated_at())
1569            ));
1570
1571            if let Some(name) = &update.name {
1572                query.push(format!(", {} = ", qi(AK::col_name())));
1573                query.push_bind(name);
1574            }
1575            if let Some(enabled) = update.enabled {
1576                query.push(format!(", {} = ", qi(AK::col_enabled())));
1577                query.push_bind(enabled);
1578            }
1579            if let Some(remaining) = update.remaining {
1580                query.push(format!(", {} = ", qi(AK::col_remaining())));
1581                query.push_bind(remaining);
1582            }
1583            if let Some(rate_limit_enabled) = update.rate_limit_enabled {
1584                query.push(format!(", {} = ", qi(AK::col_rate_limit_enabled())));
1585                query.push_bind(rate_limit_enabled);
1586            }
1587            if let Some(rate_limit_time_window) = update.rate_limit_time_window {
1588                query.push(format!(", {} = ", qi(AK::col_rate_limit_time_window())));
1589                query.push_bind(rate_limit_time_window);
1590            }
1591            if let Some(rate_limit_max) = update.rate_limit_max {
1592                query.push(format!(", {} = ", qi(AK::col_rate_limit_max())));
1593                query.push_bind(rate_limit_max);
1594            }
1595            if let Some(refill_interval) = update.refill_interval {
1596                query.push(format!(", {} = ", qi(AK::col_refill_interval())));
1597                query.push_bind(refill_interval);
1598            }
1599            if let Some(refill_amount) = update.refill_amount {
1600                query.push(format!(", {} = ", qi(AK::col_refill_amount())));
1601                query.push_bind(refill_amount);
1602            }
1603            if let Some(permissions) = &update.permissions {
1604                query.push(format!(", {} = ", qi(AK::col_permissions())));
1605                query.push_bind(permissions);
1606            }
1607            if let Some(metadata) = &update.metadata {
1608                query.push(format!(", {} = ", qi(AK::col_metadata())));
1609                query.push_bind(metadata);
1610            }
1611
1612            query.push(format!(" WHERE {} = ", qi(AK::col_id())));
1613            query.push_bind(id);
1614            query.push(" RETURNING *");
1615
1616            let api_key = query
1617                .build_query_as::<AK>()
1618                .fetch_one(&self.pool)
1619                .await
1620                .map_err(|err| match err {
1621                    sqlx::Error::RowNotFound => AuthError::not_found("API key not found"),
1622                    other => AuthError::from(other),
1623                })?;
1624            Ok(api_key)
1625        }
1626
1627        async fn delete_api_key(&self, id: &str) -> AuthResult<()> {
1628            let sql = format!(
1629                "DELETE FROM {} WHERE {} = $1",
1630                qi(AK::table()),
1631                qi(AK::col_id())
1632            );
1633            sqlx::query(&sql).bind(id).execute(&self.pool).await?;
1634            Ok(())
1635        }
1636    }
1637
1638    // -- PasskeyOps --
1639
1640    #[async_trait]
1641    impl<U, S, A, O, M, I, V, TF, AK, PK> PasskeyOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1642    where
1643        U: AuthUser + AuthUserMeta + SqlxEntity,
1644        S: AuthSession + AuthSessionMeta + SqlxEntity,
1645        A: AuthAccount + AuthAccountMeta + SqlxEntity,
1646        O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1647        M: AuthMember + AuthMemberMeta + SqlxEntity,
1648        I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1649        V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1650        TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1651        AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1652        PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1653    {
1654        type Passkey = PK;
1655
1656        async fn create_passkey(&self, input: CreatePasskey) -> AuthResult<PK> {
1657            let id = Uuid::new_v4().to_string();
1658            let now = Utc::now();
1659            let counter = i64::try_from(input.counter)
1660                .map_err(|_| AuthError::bad_request("Passkey counter exceeds i64 range"))?;
1661
1662            let sql = format!(
1663                "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
1664                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *",
1665                qi(PK::table()),
1666                qi(PK::col_id()),
1667                qi(PK::col_name()),
1668                qi(PK::col_public_key()),
1669                qi(PK::col_user_id()),
1670                qi(PK::col_credential_id()),
1671                qi(PK::col_counter()),
1672                qi(PK::col_device_type()),
1673                qi(PK::col_backed_up()),
1674                qi(PK::col_transports()),
1675                qi(PK::col_created_at()),
1676            );
1677            let passkey = sqlx::query_as::<_, PK>(&sql)
1678                .bind(&id)
1679                .bind(&input.name)
1680                .bind(&input.public_key)
1681                .bind(&input.user_id)
1682                .bind(&input.credential_id)
1683                .bind(counter)
1684                .bind(&input.device_type)
1685                .bind(input.backed_up)
1686                .bind(&input.transports)
1687                .bind(now)
1688                .fetch_one(&self.pool)
1689                .await
1690                .map_err(|e| match e {
1691                    sqlx::Error::Database(ref db_err) if db_err.is_unique_violation() => {
1692                        AuthError::conflict("A passkey with this credential ID already exists")
1693                    }
1694                    other => AuthError::from(other),
1695                })?;
1696
1697            Ok(passkey)
1698        }
1699
1700        async fn get_passkey_by_id(&self, id: &str) -> AuthResult<Option<PK>> {
1701            let sql = format!(
1702                "SELECT * FROM {} WHERE {} = $1",
1703                qi(PK::table()),
1704                qi(PK::col_id())
1705            );
1706            let passkey = sqlx::query_as::<_, PK>(&sql)
1707                .bind(id)
1708                .fetch_optional(&self.pool)
1709                .await?;
1710            Ok(passkey)
1711        }
1712
1713        async fn get_passkey_by_credential_id(
1714            &self,
1715            credential_id: &str,
1716        ) -> AuthResult<Option<PK>> {
1717            let sql = format!(
1718                "SELECT * FROM {} WHERE {} = $1",
1719                qi(PK::table()),
1720                qi(PK::col_credential_id())
1721            );
1722            let passkey = sqlx::query_as::<_, PK>(&sql)
1723                .bind(credential_id)
1724                .fetch_optional(&self.pool)
1725                .await?;
1726            Ok(passkey)
1727        }
1728
1729        async fn list_passkeys_by_user(&self, user_id: &str) -> AuthResult<Vec<PK>> {
1730            let sql = format!(
1731                "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
1732                qi(PK::table()),
1733                qi(PK::col_user_id()),
1734                qi(PK::col_created_at())
1735            );
1736            let passkeys = sqlx::query_as::<_, PK>(&sql)
1737                .bind(user_id)
1738                .fetch_all(&self.pool)
1739                .await?;
1740            Ok(passkeys)
1741        }
1742
1743        async fn update_passkey_counter(&self, id: &str, counter: u64) -> AuthResult<PK> {
1744            let counter = i64::try_from(counter)
1745                .map_err(|_| AuthError::bad_request("Passkey counter exceeds i64 range"))?;
1746            let sql = format!(
1747                "UPDATE {} SET {} = $2 WHERE {} = $1 RETURNING *",
1748                qi(PK::table()),
1749                qi(PK::col_counter()),
1750                qi(PK::col_id())
1751            );
1752            let passkey = sqlx::query_as::<_, PK>(&sql)
1753                .bind(id)
1754                .bind(counter)
1755                .fetch_one(&self.pool)
1756                .await
1757                .map_err(|err| match err {
1758                    sqlx::Error::RowNotFound => AuthError::not_found("Passkey not found"),
1759                    other => AuthError::from(other),
1760                })?;
1761            Ok(passkey)
1762        }
1763
1764        async fn update_passkey_name(&self, id: &str, name: &str) -> AuthResult<PK> {
1765            let sql = format!(
1766                "UPDATE {} SET {} = $2 WHERE {} = $1 RETURNING *",
1767                qi(PK::table()),
1768                qi(PK::col_name()),
1769                qi(PK::col_id())
1770            );
1771            let passkey = sqlx::query_as::<_, PK>(&sql)
1772                .bind(id)
1773                .bind(name)
1774                .fetch_one(&self.pool)
1775                .await
1776                .map_err(|err| match err {
1777                    sqlx::Error::RowNotFound => AuthError::not_found("Passkey not found"),
1778                    other => AuthError::from(other),
1779                })?;
1780            Ok(passkey)
1781        }
1782
1783        async fn delete_passkey(&self, id: &str) -> AuthResult<()> {
1784            let sql = format!(
1785                "DELETE FROM {} WHERE {} = $1",
1786                qi(PK::table()),
1787                qi(PK::col_id())
1788            );
1789            sqlx::query(&sql).bind(id).execute(&self.pool).await?;
1790            Ok(())
1791        }
1792    }
1793}
1794
1795#[cfg(feature = "sqlx-postgres")]
1796pub use sqlx_adapter::{SqlxAdapter, SqlxEntity};