1pub use super::traits::{
2 AccountOps, ApiKeyOps, InvitationOps, MemberOps, OrganizationOps, PasskeyOps, SessionOps,
3 TwoFactorOps, UserOps, VerificationOps,
4};
5
6pub trait DatabaseAdapter:
15 UserOps
16 + SessionOps
17 + AccountOps
18 + VerificationOps
19 + OrganizationOps
20 + MemberOps
21 + InvitationOps
22 + TwoFactorOps
23 + ApiKeyOps
24 + PasskeyOps
25{
26}
27
28impl<T> DatabaseAdapter for T where
29 T: UserOps
30 + SessionOps
31 + AccountOps
32 + VerificationOps
33 + OrganizationOps
34 + MemberOps
35 + InvitationOps
36 + TwoFactorOps
37 + ApiKeyOps
38 + PasskeyOps
39{
40}
41
42#[cfg(feature = "sqlx-postgres")]
43pub mod sqlx_adapter {
44 use super::*;
45 use async_trait::async_trait;
46 use chrono::{DateTime, Utc};
47
48 use crate::entity::{
49 AuthAccount, AuthAccountMeta, AuthApiKey, AuthApiKeyMeta, AuthInvitation,
50 AuthInvitationMeta, AuthMember, AuthMemberMeta, AuthOrganization, AuthOrganizationMeta,
51 AuthPasskey, AuthPasskeyMeta, AuthSession, AuthSessionMeta, AuthTwoFactor,
52 AuthTwoFactorMeta, AuthUser, AuthUserMeta, AuthVerification, AuthVerificationMeta,
53 };
54 use crate::error::{AuthError, AuthResult};
55 use crate::types::{
56 Account, ApiKey, CreateAccount, CreateApiKey, CreateInvitation, CreateMember,
57 CreateOrganization, CreatePasskey, CreateSession, CreateTwoFactor, CreateUser,
58 CreateVerification, Invitation, InvitationStatus, ListUsersParams, Member, Organization,
59 Passkey, Session, TwoFactor, UpdateAccount, UpdateApiKey, UpdateOrganization, UpdateUser,
60 User, Verification,
61 };
62 use sqlx::PgPool;
63 use sqlx::postgres::PgRow;
64 use std::marker::PhantomData;
65 use uuid::Uuid;
66
67 #[inline]
73 fn qi(ident: &str) -> String {
74 format!("\"{}\"", ident.replace('"', "\"\""))
75 }
76
77 pub trait SqlxEntity:
84 for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
85 {
86 }
87
88 impl<T> SqlxEntity for T where
89 T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
90 {
91 }
92
93 type SqlxAdapterEntities<U, S, A, O, M, I, V, TF, AK, PK> = (U, S, A, O, M, I, V, TF, AK, PK);
94
95 pub struct SqlxAdapter<
100 U = User,
101 S = Session,
102 A = Account,
103 O = Organization,
104 M = Member,
105 I = Invitation,
106 V = Verification,
107 TF = TwoFactor,
108 AK = ApiKey,
109 PK = Passkey,
110 > {
111 pool: PgPool,
112 #[allow(clippy::type_complexity)]
113 _phantom: PhantomData<SqlxAdapterEntities<U, S, A, O, M, I, V, TF, AK, PK>>,
114 }
115
116 impl SqlxAdapter {
118 pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
119 let pool = PgPool::connect(database_url).await?;
120 Ok(Self {
121 pool,
122 _phantom: PhantomData,
123 })
124 }
125
126 pub async fn with_config(
127 database_url: &str,
128 config: PoolConfig,
129 ) -> Result<Self, sqlx::Error> {
130 let pool = sqlx::postgres::PgPoolOptions::new()
131 .max_connections(config.max_connections)
132 .min_connections(config.min_connections)
133 .acquire_timeout(config.acquire_timeout)
134 .idle_timeout(config.idle_timeout)
135 .max_lifetime(config.max_lifetime)
136 .connect(database_url)
137 .await?;
138 Ok(Self {
139 pool,
140 _phantom: PhantomData,
141 })
142 }
143 }
144
145 impl<U, S, A, O, M, I, V, TF, AK, PK> SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK> {
147 pub fn from_pool(pool: PgPool) -> Self {
148 Self {
149 pool,
150 _phantom: PhantomData,
151 }
152 }
153
154 pub async fn test_connection(&self) -> Result<(), sqlx::Error> {
155 sqlx::query("SELECT 1").execute(&self.pool).await?;
156 Ok(())
157 }
158
159 pub fn pool_stats(&self) -> PoolStats {
160 PoolStats {
161 size: self.pool.size(),
162 idle: self.pool.num_idle(),
163 }
164 }
165
166 pub async fn close(&self) {
167 self.pool.close().await;
168 }
169 }
170
171 #[derive(Debug, Clone)]
172 pub struct PoolConfig {
173 pub max_connections: u32,
174 pub min_connections: u32,
175 pub acquire_timeout: std::time::Duration,
176 pub idle_timeout: Option<std::time::Duration>,
177 pub max_lifetime: Option<std::time::Duration>,
178 }
179
180 impl Default for PoolConfig {
181 fn default() -> Self {
182 Self {
183 max_connections: 10,
184 min_connections: 0,
185 acquire_timeout: std::time::Duration::from_secs(30),
186 idle_timeout: Some(std::time::Duration::from_secs(600)),
187 max_lifetime: Some(std::time::Duration::from_secs(1800)),
188 }
189 }
190 }
191
192 #[derive(Debug, Clone)]
193 pub struct PoolStats {
194 pub size: u32,
195 pub idle: usize,
196 }
197
198 #[async_trait]
201 impl<U, S, A, O, M, I, V, TF, AK, PK> UserOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
202 where
203 U: AuthUser + AuthUserMeta + SqlxEntity,
204 S: AuthSession + AuthSessionMeta + SqlxEntity,
205 A: AuthAccount + AuthAccountMeta + SqlxEntity,
206 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
207 M: AuthMember + AuthMemberMeta + SqlxEntity,
208 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
209 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
210 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
211 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
212 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
213 {
214 type User = U;
215
216 async fn create_user(&self, create_user: CreateUser) -> AuthResult<U> {
217 let id = create_user.id.unwrap_or_else(|| Uuid::new_v4().to_string());
218 let now = Utc::now();
219
220 let sql = format!(
221 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *",
222 qi(U::table()),
223 qi(U::col_id()),
224 qi(U::col_email()),
225 qi(U::col_name()),
226 qi(U::col_image()),
227 qi(U::col_email_verified()),
228 qi(U::col_username()),
229 qi(U::col_display_username()),
230 qi(U::col_role()),
231 qi(U::col_created_at()),
232 qi(U::col_updated_at()),
233 qi(U::col_metadata()),
234 );
235 let user = sqlx::query_as::<_, U>(&sql)
236 .bind(&id)
237 .bind(&create_user.email)
238 .bind(&create_user.name)
239 .bind(&create_user.image)
240 .bind(create_user.email_verified.unwrap_or(false))
241 .bind(&create_user.username)
242 .bind(&create_user.display_username)
243 .bind(&create_user.role)
244 .bind(now)
245 .bind(now)
246 .bind(sqlx::types::Json(
247 create_user.metadata.unwrap_or(serde_json::json!({})),
248 ))
249 .fetch_one(&self.pool)
250 .await?;
251
252 Ok(user)
253 }
254
255 async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<U>> {
256 let sql = format!(
257 "SELECT * FROM {} WHERE {} = $1",
258 qi(U::table()),
259 qi(U::col_id())
260 );
261 let user = sqlx::query_as::<_, U>(&sql)
262 .bind(id)
263 .fetch_optional(&self.pool)
264 .await?;
265 Ok(user)
266 }
267
268 async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<U>> {
269 let sql = format!(
270 "SELECT * FROM {} WHERE {} = $1",
271 qi(U::table()),
272 qi(U::col_email())
273 );
274 let user = sqlx::query_as::<_, U>(&sql)
275 .bind(email)
276 .fetch_optional(&self.pool)
277 .await?;
278 Ok(user)
279 }
280
281 async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<U>> {
282 let sql = format!(
283 "SELECT * FROM {} WHERE {} = $1",
284 qi(U::table()),
285 qi(U::col_username())
286 );
287 let user = sqlx::query_as::<_, U>(&sql)
288 .bind(username)
289 .fetch_optional(&self.pool)
290 .await?;
291 Ok(user)
292 }
293
294 async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<U> {
295 let mut query = sqlx::QueryBuilder::new(format!(
296 "UPDATE {} SET {} = NOW()",
297 qi(U::table()),
298 qi(U::col_updated_at())
299 ));
300 let mut has_updates = false;
301
302 if let Some(email) = &update.email {
303 query.push(format!(", {} = ", qi(U::col_email())));
304 query.push_bind(email);
305 has_updates = true;
306 }
307 if let Some(name) = &update.name {
308 query.push(format!(", {} = ", qi(U::col_name())));
309 query.push_bind(name);
310 has_updates = true;
311 }
312 if let Some(image) = &update.image {
313 query.push(format!(", {} = ", qi(U::col_image())));
314 query.push_bind(image);
315 has_updates = true;
316 }
317 if let Some(email_verified) = update.email_verified {
318 query.push(format!(", {} = ", qi(U::col_email_verified())));
319 query.push_bind(email_verified);
320 has_updates = true;
321 }
322 if let Some(username) = &update.username {
323 query.push(format!(", {} = ", qi(U::col_username())));
324 query.push_bind(username);
325 has_updates = true;
326 }
327 if let Some(display_username) = &update.display_username {
328 query.push(format!(", {} = ", qi(U::col_display_username())));
329 query.push_bind(display_username);
330 has_updates = true;
331 }
332 if let Some(role) = &update.role {
333 query.push(format!(", {} = ", qi(U::col_role())));
334 query.push_bind(role);
335 has_updates = true;
336 }
337 if let Some(banned) = update.banned {
338 query.push(format!(", {} = ", qi(U::col_banned())));
339 query.push_bind(banned);
340 has_updates = true;
341 if !banned {
343 query.push(format!(
344 ", {} = NULL, {} = NULL",
345 qi(U::col_ban_reason()),
346 qi(U::col_ban_expires())
347 ));
348 }
349 }
350 if update.banned != Some(false) {
355 if let Some(ban_reason) = &update.ban_reason {
356 query.push(format!(", {} = ", qi(U::col_ban_reason())));
357 query.push_bind(ban_reason);
358 has_updates = true;
359 }
360 if let Some(ban_expires) = update.ban_expires {
361 query.push(format!(", {} = ", qi(U::col_ban_expires())));
362 query.push_bind(ban_expires);
363 has_updates = true;
364 }
365 }
366 if let Some(two_factor_enabled) = update.two_factor_enabled {
367 query.push(format!(", {} = ", qi(U::col_two_factor_enabled())));
368 query.push_bind(two_factor_enabled);
369 has_updates = true;
370 }
371 if let Some(metadata) = &update.metadata {
372 query.push(format!(", {} = ", qi(U::col_metadata())));
373 query.push_bind(sqlx::types::Json(metadata.clone()));
374 has_updates = true;
375 }
376
377 if !has_updates {
378 return self
379 .get_user_by_id(id)
380 .await?
381 .ok_or(AuthError::UserNotFound);
382 }
383
384 query.push(format!(" WHERE {} = ", qi(U::col_id())));
385 query.push_bind(id);
386 query.push(" RETURNING *");
387
388 let user = query.build_query_as::<U>().fetch_one(&self.pool).await?;
389 Ok(user)
390 }
391
392 async fn delete_user(&self, id: &str) -> AuthResult<()> {
393 let sql = format!(
394 "DELETE FROM {} WHERE {} = $1",
395 qi(U::table()),
396 qi(U::col_id())
397 );
398 sqlx::query(&sql).bind(id).execute(&self.pool).await?;
399 Ok(())
400 }
401
402 async fn list_users(&self, params: ListUsersParams) -> AuthResult<(Vec<U>, usize)> {
403 let limit = params.limit.unwrap_or(100) as i64;
404 let offset = params.offset.unwrap_or(0) as i64;
405
406 let mut conditions: Vec<String> = Vec::new();
408 let mut bind_values: Vec<String> = Vec::new();
409
410 if let Some(search_value) = ¶ms.search_value {
411 let field = params.search_field.as_deref().unwrap_or("email");
412 let col = qi(match field {
413 "name" => U::col_name(),
414 _ => U::col_email(),
415 });
416 let op = params.search_operator.as_deref().unwrap_or("contains");
417 let escaped = search_value.replace('%', "\\%").replace('_', "\\_");
418 let pattern = match op {
419 "starts_with" => format!("{}%", escaped),
420 "ends_with" => format!("%{}", escaped),
421 _ => format!("%{}%", escaped),
422 };
423 let idx = bind_values.len() + 1;
424 conditions.push(format!("{} ILIKE ${}", col, idx));
425 bind_values.push(pattern);
426 }
427
428 if let Some(filter_value) = ¶ms.filter_value {
429 let field = params.filter_field.as_deref().unwrap_or("email");
430 let col = qi(match field {
431 "name" => U::col_name(),
432 "role" => U::col_role(),
433 _ => U::col_email(),
434 });
435 let op = params.filter_operator.as_deref().unwrap_or("eq");
436 let idx = bind_values.len() + 1;
437 match op {
438 "contains" => {
439 let escaped = filter_value.replace('%', "\\%").replace('_', "\\_");
440 conditions.push(format!("{} ILIKE ${}", col, idx));
441 bind_values.push(format!("%{}%", escaped));
442 }
443 "ne" => {
444 conditions.push(format!("{} != ${}", col, idx));
445 bind_values.push(filter_value.clone());
446 }
447 _ => {
448 conditions.push(format!("{} = ${}", col, idx));
449 bind_values.push(filter_value.clone());
450 }
451 }
452 }
453
454 let where_clause = if conditions.is_empty() {
455 String::new()
456 } else {
457 format!(" WHERE {}", conditions.join(" AND "))
458 };
459
460 let order_clause = if let Some(sort_by) = ¶ms.sort_by {
462 let col = qi(match sort_by.as_str() {
463 "name" => U::col_name(),
464 "createdAt" | "created_at" => U::col_created_at(),
465 _ => U::col_email(),
466 });
467 let dir = if params.sort_direction.as_deref() == Some("desc") {
468 "DESC"
469 } else {
470 "ASC"
471 };
472 format!(" ORDER BY {} {}", col, dir)
473 } else {
474 format!(" ORDER BY {} DESC", qi(U::col_created_at()))
475 };
476
477 let count_sql = format!(
479 "SELECT COUNT(*) as count FROM {}{}",
480 qi(U::table()),
481 where_clause
482 );
483 let mut count_query = sqlx::query_scalar::<_, i64>(&count_sql);
484 for v in &bind_values {
485 count_query = count_query.bind(v);
486 }
487 let total = count_query.fetch_one(&self.pool).await? as usize;
488
489 let limit_idx = bind_values.len() + 1;
491 let offset_idx = bind_values.len() + 2;
492 let data_sql = format!(
493 "SELECT * FROM {}{}{} LIMIT ${} OFFSET ${}",
494 qi(U::table()),
495 where_clause,
496 order_clause,
497 limit_idx,
498 offset_idx
499 );
500 let mut data_query = sqlx::query_as::<_, U>(&data_sql);
501 for v in &bind_values {
502 data_query = data_query.bind(v);
503 }
504 data_query = data_query.bind(limit).bind(offset);
505 let users = data_query.fetch_all(&self.pool).await?;
506
507 Ok((users, total))
508 }
509 }
510
511 #[async_trait]
514 impl<U, S, A, O, M, I, V, TF, AK, PK> SessionOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
515 where
516 U: AuthUser + AuthUserMeta + SqlxEntity,
517 S: AuthSession + AuthSessionMeta + SqlxEntity,
518 A: AuthAccount + AuthAccountMeta + SqlxEntity,
519 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
520 M: AuthMember + AuthMemberMeta + SqlxEntity,
521 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
522 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
523 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
524 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
525 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
526 {
527 type Session = S;
528
529 async fn create_session(&self, create_session: CreateSession) -> AuthResult<S> {
530 let id = Uuid::new_v4().to_string();
531 let token = format!("session_{}", Uuid::new_v4());
532 let now = Utc::now();
533
534 let sql = format!(
535 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *",
536 qi(S::table()),
537 qi(S::col_id()),
538 qi(S::col_user_id()),
539 qi(S::col_token()),
540 qi(S::col_expires_at()),
541 qi(S::col_created_at()),
542 qi(S::col_ip_address()),
543 qi(S::col_user_agent()),
544 qi(S::col_impersonated_by()),
545 qi(S::col_active_organization_id()),
546 qi(S::col_active()),
547 );
548 let session = sqlx::query_as::<_, S>(&sql)
549 .bind(&id)
550 .bind(&create_session.user_id)
551 .bind(&token)
552 .bind(create_session.expires_at)
553 .bind(now)
554 .bind(&create_session.ip_address)
555 .bind(&create_session.user_agent)
556 .bind(&create_session.impersonated_by)
557 .bind(&create_session.active_organization_id)
558 .bind(true)
559 .fetch_one(&self.pool)
560 .await?;
561
562 Ok(session)
563 }
564
565 async fn get_session(&self, token: &str) -> AuthResult<Option<S>> {
566 let sql = format!(
567 "SELECT * FROM {} WHERE {} = $1 AND {} = true",
568 qi(S::table()),
569 qi(S::col_token()),
570 qi(S::col_active())
571 );
572 let session = sqlx::query_as::<_, S>(&sql)
573 .bind(token)
574 .fetch_optional(&self.pool)
575 .await?;
576 Ok(session)
577 }
578
579 async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<S>> {
580 let sql = format!(
581 "SELECT * FROM {} WHERE {} = $1 AND {} = true ORDER BY {} DESC",
582 qi(S::table()),
583 qi(S::col_user_id()),
584 qi(S::col_active()),
585 qi(S::col_created_at())
586 );
587 let sessions = sqlx::query_as::<_, S>(&sql)
588 .bind(user_id)
589 .fetch_all(&self.pool)
590 .await?;
591 Ok(sessions)
592 }
593
594 async fn update_session_expiry(
595 &self,
596 token: &str,
597 expires_at: DateTime<Utc>,
598 ) -> AuthResult<()> {
599 let sql = format!(
600 "UPDATE {} SET {} = $1 WHERE {} = $2 AND {} = true",
601 qi(S::table()),
602 qi(S::col_expires_at()),
603 qi(S::col_token()),
604 qi(S::col_active())
605 );
606 sqlx::query(&sql)
607 .bind(expires_at)
608 .bind(token)
609 .execute(&self.pool)
610 .await?;
611 Ok(())
612 }
613
614 async fn delete_session(&self, token: &str) -> AuthResult<()> {
615 let sql = format!(
616 "DELETE FROM {} WHERE {} = $1",
617 qi(S::table()),
618 qi(S::col_token())
619 );
620 sqlx::query(&sql).bind(token).execute(&self.pool).await?;
621 Ok(())
622 }
623
624 async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
625 let sql = format!(
626 "DELETE FROM {} WHERE {} = $1",
627 qi(S::table()),
628 qi(S::col_user_id())
629 );
630 sqlx::query(&sql).bind(user_id).execute(&self.pool).await?;
631 Ok(())
632 }
633
634 async fn delete_expired_sessions(&self) -> AuthResult<usize> {
635 let sql = format!(
636 "DELETE FROM {} WHERE {} < NOW() OR {} = false",
637 qi(S::table()),
638 qi(S::col_expires_at()),
639 qi(S::col_active())
640 );
641 let result = sqlx::query(&sql).execute(&self.pool).await?;
642 Ok(result.rows_affected() as usize)
643 }
644
645 async fn update_session_active_organization(
646 &self,
647 token: &str,
648 organization_id: Option<&str>,
649 ) -> AuthResult<S> {
650 let sql = format!(
651 "UPDATE {} SET {} = $1, {} = NOW() WHERE {} = $2 AND {} = true RETURNING *",
652 qi(S::table()),
653 qi(S::col_active_organization_id()),
654 qi(S::col_updated_at()),
655 qi(S::col_token()),
656 qi(S::col_active())
657 );
658 let session = sqlx::query_as::<_, S>(&sql)
659 .bind(organization_id)
660 .bind(token)
661 .fetch_one(&self.pool)
662 .await?;
663 Ok(session)
664 }
665 }
666
667 #[async_trait]
670 impl<U, S, A, O, M, I, V, TF, AK, PK> AccountOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
671 where
672 U: AuthUser + AuthUserMeta + SqlxEntity,
673 S: AuthSession + AuthSessionMeta + SqlxEntity,
674 A: AuthAccount + AuthAccountMeta + SqlxEntity,
675 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
676 M: AuthMember + AuthMemberMeta + SqlxEntity,
677 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
678 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
679 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
680 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
681 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
682 {
683 type Account = A;
684
685 async fn create_account(&self, create_account: CreateAccount) -> AuthResult<A> {
686 let id = Uuid::new_v4().to_string();
687 let now = Utc::now();
688
689 let sql = format!(
690 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
691 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING *",
692 qi(A::table()),
693 qi(A::col_id()),
694 qi(A::col_account_id()),
695 qi(A::col_provider_id()),
696 qi(A::col_user_id()),
697 qi(A::col_access_token()),
698 qi(A::col_refresh_token()),
699 qi(A::col_id_token()),
700 qi(A::col_access_token_expires_at()),
701 qi(A::col_refresh_token_expires_at()),
702 qi(A::col_scope()),
703 qi(A::col_password()),
704 qi(A::col_created_at()),
705 qi(A::col_updated_at()),
706 );
707 let account = sqlx::query_as::<_, A>(&sql)
708 .bind(&id)
709 .bind(&create_account.account_id)
710 .bind(&create_account.provider_id)
711 .bind(&create_account.user_id)
712 .bind(&create_account.access_token)
713 .bind(&create_account.refresh_token)
714 .bind(&create_account.id_token)
715 .bind(create_account.access_token_expires_at)
716 .bind(create_account.refresh_token_expires_at)
717 .bind(&create_account.scope)
718 .bind(&create_account.password)
719 .bind(now)
720 .bind(now)
721 .fetch_one(&self.pool)
722 .await?;
723
724 Ok(account)
725 }
726
727 async fn get_account(
728 &self,
729 provider: &str,
730 provider_account_id: &str,
731 ) -> AuthResult<Option<A>> {
732 let sql = format!(
733 "SELECT * FROM {} WHERE {} = $1 AND {} = $2",
734 qi(A::table()),
735 qi(A::col_provider_id()),
736 qi(A::col_account_id())
737 );
738 let account = sqlx::query_as::<_, A>(&sql)
739 .bind(provider)
740 .bind(provider_account_id)
741 .fetch_optional(&self.pool)
742 .await?;
743 Ok(account)
744 }
745
746 async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<A>> {
747 let sql = format!(
748 "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
749 qi(A::table()),
750 qi(A::col_user_id()),
751 qi(A::col_created_at())
752 );
753 let accounts = sqlx::query_as::<_, A>(&sql)
754 .bind(user_id)
755 .fetch_all(&self.pool)
756 .await?;
757 Ok(accounts)
758 }
759
760 async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<A> {
761 let mut query = sqlx::QueryBuilder::new(format!(
762 "UPDATE {} SET {} = NOW()",
763 qi(A::table()),
764 qi(A::col_updated_at())
765 ));
766
767 if let Some(access_token) = &update.access_token {
768 query.push(format!(", {} = ", qi(A::col_access_token())));
769 query.push_bind(access_token);
770 }
771 if let Some(refresh_token) = &update.refresh_token {
772 query.push(format!(", {} = ", qi(A::col_refresh_token())));
773 query.push_bind(refresh_token);
774 }
775 if let Some(id_token) = &update.id_token {
776 query.push(format!(", {} = ", qi(A::col_id_token())));
777 query.push_bind(id_token);
778 }
779 if let Some(access_token_expires_at) = &update.access_token_expires_at {
780 query.push(format!(", {} = ", qi(A::col_access_token_expires_at())));
781 query.push_bind(access_token_expires_at);
782 }
783 if let Some(refresh_token_expires_at) = &update.refresh_token_expires_at {
784 query.push(format!(", {} = ", qi(A::col_refresh_token_expires_at())));
785 query.push_bind(refresh_token_expires_at);
786 }
787 if let Some(scope) = &update.scope {
788 query.push(format!(", {} = ", qi(A::col_scope())));
789 query.push_bind(scope);
790 }
791 if let Some(password) = &update.password {
792 query.push(format!(", {} = ", qi(A::col_password())));
793 query.push_bind(password);
794 }
795
796 query.push(format!(" WHERE {} = ", qi(A::col_id())));
797 query.push_bind(id);
798 query.push(" RETURNING *");
799
800 let account = query.build_query_as::<A>().fetch_one(&self.pool).await?;
801 Ok(account)
802 }
803
804 async fn delete_account(&self, id: &str) -> AuthResult<()> {
805 let sql = format!(
806 "DELETE FROM {} WHERE {} = $1",
807 qi(A::table()),
808 qi(A::col_id())
809 );
810 sqlx::query(&sql).bind(id).execute(&self.pool).await?;
811 Ok(())
812 }
813 }
814
815 #[async_trait]
818 impl<U, S, A, O, M, I, V, TF, AK, PK> VerificationOps
819 for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
820 where
821 U: AuthUser + AuthUserMeta + SqlxEntity,
822 S: AuthSession + AuthSessionMeta + SqlxEntity,
823 A: AuthAccount + AuthAccountMeta + SqlxEntity,
824 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
825 M: AuthMember + AuthMemberMeta + SqlxEntity,
826 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
827 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
828 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
829 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
830 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
831 {
832 type Verification = V;
833
834 async fn create_verification(
835 &self,
836 create_verification: CreateVerification,
837 ) -> AuthResult<V> {
838 let id = Uuid::new_v4().to_string();
839 let now = Utc::now();
840
841 let sql = format!(
842 "INSERT INTO {} ({}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *",
843 qi(V::table()),
844 qi(V::col_id()),
845 qi(V::col_identifier()),
846 qi(V::col_value()),
847 qi(V::col_expires_at()),
848 qi(V::col_created_at()),
849 qi(V::col_updated_at()),
850 );
851 let verification = sqlx::query_as::<_, V>(&sql)
852 .bind(&id)
853 .bind(&create_verification.identifier)
854 .bind(&create_verification.value)
855 .bind(create_verification.expires_at)
856 .bind(now)
857 .bind(now)
858 .fetch_one(&self.pool)
859 .await?;
860
861 Ok(verification)
862 }
863
864 async fn get_verification(&self, identifier: &str, value: &str) -> AuthResult<Option<V>> {
865 let sql = format!(
866 "SELECT * FROM {} WHERE {} = $1 AND {} = $2 AND {} > NOW()",
867 qi(V::table()),
868 qi(V::col_identifier()),
869 qi(V::col_value()),
870 qi(V::col_expires_at())
871 );
872 let verification = sqlx::query_as::<_, V>(&sql)
873 .bind(identifier)
874 .bind(value)
875 .fetch_optional(&self.pool)
876 .await?;
877 Ok(verification)
878 }
879
880 async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<V>> {
881 let sql = format!(
882 "SELECT * FROM {} WHERE {} = $1 AND {} > NOW()",
883 qi(V::table()),
884 qi(V::col_value()),
885 qi(V::col_expires_at())
886 );
887 let verification = sqlx::query_as::<_, V>(&sql)
888 .bind(value)
889 .fetch_optional(&self.pool)
890 .await?;
891 Ok(verification)
892 }
893
894 async fn get_verification_by_identifier(&self, identifier: &str) -> AuthResult<Option<V>> {
895 let sql = format!(
896 "SELECT * FROM {} WHERE {} = $1 AND {} > NOW()",
897 qi(V::table()),
898 qi(V::col_identifier()),
899 qi(V::col_expires_at())
900 );
901 let verification = sqlx::query_as::<_, V>(&sql)
902 .bind(identifier)
903 .fetch_optional(&self.pool)
904 .await?;
905 Ok(verification)
906 }
907
908 async fn consume_verification(
909 &self,
910 identifier: &str,
911 value: &str,
912 ) -> AuthResult<Option<V>> {
913 let sql = format!(
914 "DELETE FROM {tbl} WHERE {id} IN (\
915 SELECT {id} FROM {tbl} \
916 WHERE {ident} = $1 AND {val} = $2 AND {exp} > NOW() \
917 ORDER BY {ca} DESC \
918 LIMIT 1\
919 ) RETURNING *",
920 tbl = qi(V::table()),
921 id = qi(V::col_id()),
922 ident = qi(V::col_identifier()),
923 val = qi(V::col_value()),
924 exp = qi(V::col_expires_at()),
925 ca = qi(V::col_created_at()),
926 );
927 let verification = sqlx::query_as::<_, V>(&sql)
928 .bind(identifier)
929 .bind(value)
930 .fetch_optional(&self.pool)
931 .await?;
932 Ok(verification)
933 }
934
935 async fn delete_verification(&self, id: &str) -> AuthResult<()> {
936 let sql = format!(
937 "DELETE FROM {} WHERE {} = $1",
938 qi(V::table()),
939 qi(V::col_id())
940 );
941 sqlx::query(&sql).bind(id).execute(&self.pool).await?;
942 Ok(())
943 }
944
945 async fn delete_expired_verifications(&self) -> AuthResult<usize> {
946 let sql = format!(
947 "DELETE FROM {} WHERE {} < NOW()",
948 qi(V::table()),
949 qi(V::col_expires_at())
950 );
951 let result = sqlx::query(&sql).execute(&self.pool).await?;
952 Ok(result.rows_affected() as usize)
953 }
954 }
955
956 #[async_trait]
959 impl<U, S, A, O, M, I, V, TF, AK, PK> OrganizationOps
960 for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
961 where
962 U: AuthUser + AuthUserMeta + SqlxEntity,
963 S: AuthSession + AuthSessionMeta + SqlxEntity,
964 A: AuthAccount + AuthAccountMeta + SqlxEntity,
965 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
966 M: AuthMember + AuthMemberMeta + SqlxEntity,
967 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
968 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
969 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
970 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
971 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
972 {
973 type Organization = O;
974
975 async fn create_organization(&self, create_org: CreateOrganization) -> AuthResult<O> {
976 let id = create_org.id.unwrap_or_else(|| Uuid::new_v4().to_string());
977 let now = Utc::now();
978
979 let sql = format!(
980 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *",
981 qi(O::table()),
982 qi(O::col_id()),
983 qi(O::col_name()),
984 qi(O::col_slug()),
985 qi(O::col_logo()),
986 qi(O::col_metadata()),
987 qi(O::col_created_at()),
988 qi(O::col_updated_at()),
989 );
990 let organization = sqlx::query_as::<_, O>(&sql)
991 .bind(&id)
992 .bind(&create_org.name)
993 .bind(&create_org.slug)
994 .bind(&create_org.logo)
995 .bind(sqlx::types::Json(
996 create_org.metadata.unwrap_or(serde_json::json!({})),
997 ))
998 .bind(now)
999 .bind(now)
1000 .fetch_one(&self.pool)
1001 .await?;
1002
1003 Ok(organization)
1004 }
1005
1006 async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<O>> {
1007 let sql = format!(
1008 "SELECT * FROM {} WHERE {} = $1",
1009 qi(O::table()),
1010 qi(O::col_id())
1011 );
1012 let organization = sqlx::query_as::<_, O>(&sql)
1013 .bind(id)
1014 .fetch_optional(&self.pool)
1015 .await?;
1016 Ok(organization)
1017 }
1018
1019 async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<O>> {
1020 let sql = format!(
1021 "SELECT * FROM {} WHERE {} = $1",
1022 qi(O::table()),
1023 qi(O::col_slug())
1024 );
1025 let organization = sqlx::query_as::<_, O>(&sql)
1026 .bind(slug)
1027 .fetch_optional(&self.pool)
1028 .await?;
1029 Ok(organization)
1030 }
1031
1032 async fn update_organization(&self, id: &str, update: UpdateOrganization) -> AuthResult<O> {
1033 let mut query = sqlx::QueryBuilder::new(format!(
1034 "UPDATE {} SET {} = NOW()",
1035 qi(O::table()),
1036 qi(O::col_updated_at())
1037 ));
1038
1039 if let Some(name) = &update.name {
1040 query.push(format!(", {} = ", qi(O::col_name())));
1041 query.push_bind(name);
1042 }
1043 if let Some(slug) = &update.slug {
1044 query.push(format!(", {} = ", qi(O::col_slug())));
1045 query.push_bind(slug);
1046 }
1047 if let Some(logo) = &update.logo {
1048 query.push(format!(", {} = ", qi(O::col_logo())));
1049 query.push_bind(logo);
1050 }
1051 if let Some(metadata) = &update.metadata {
1052 query.push(format!(", {} = ", qi(O::col_metadata())));
1053 query.push_bind(sqlx::types::Json(metadata.clone()));
1054 }
1055
1056 query.push(format!(" WHERE {} = ", qi(O::col_id())));
1057 query.push_bind(id);
1058 query.push(" RETURNING *");
1059
1060 let organization = query.build_query_as::<O>().fetch_one(&self.pool).await?;
1061 Ok(organization)
1062 }
1063
1064 async fn delete_organization(&self, id: &str) -> AuthResult<()> {
1065 let sql = format!(
1066 "DELETE FROM {} WHERE {} = $1",
1067 qi(O::table()),
1068 qi(O::col_id())
1069 );
1070 sqlx::query(&sql).bind(id).execute(&self.pool).await?;
1071 Ok(())
1072 }
1073
1074 async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<O>> {
1075 let sql = format!(
1076 "SELECT o.* FROM {} o INNER JOIN {} m ON o.{} = m.{} WHERE m.{} = $1 ORDER BY o.{} DESC",
1077 qi(O::table()),
1078 qi(M::table()),
1079 qi(O::col_id()),
1080 qi(M::col_organization_id()),
1081 qi(M::col_user_id()),
1082 qi(O::col_created_at()),
1083 );
1084 let organizations = sqlx::query_as::<_, O>(&sql)
1085 .bind(user_id)
1086 .fetch_all(&self.pool)
1087 .await?;
1088 Ok(organizations)
1089 }
1090 }
1091
1092 #[async_trait]
1095 impl<U, S, A, O, M, I, V, TF, AK, PK> MemberOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1096 where
1097 U: AuthUser + AuthUserMeta + SqlxEntity,
1098 S: AuthSession + AuthSessionMeta + SqlxEntity,
1099 A: AuthAccount + AuthAccountMeta + SqlxEntity,
1100 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1101 M: AuthMember + AuthMemberMeta + SqlxEntity,
1102 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1103 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1104 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1105 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1106 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1107 {
1108 type Member = M;
1109
1110 async fn create_member(&self, create_member: CreateMember) -> AuthResult<M> {
1111 let id = Uuid::new_v4().to_string();
1112 let now = Utc::now();
1113
1114 let sql = format!(
1115 "INSERT INTO {} ({}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5) RETURNING *",
1116 qi(M::table()),
1117 qi(M::col_id()),
1118 qi(M::col_organization_id()),
1119 qi(M::col_user_id()),
1120 qi(M::col_role()),
1121 qi(M::col_created_at()),
1122 );
1123 let member = sqlx::query_as::<_, M>(&sql)
1124 .bind(&id)
1125 .bind(&create_member.organization_id)
1126 .bind(&create_member.user_id)
1127 .bind(&create_member.role)
1128 .bind(now)
1129 .fetch_one(&self.pool)
1130 .await?;
1131
1132 Ok(member)
1133 }
1134
1135 async fn get_member(&self, organization_id: &str, user_id: &str) -> AuthResult<Option<M>> {
1136 let sql = format!(
1137 "SELECT * FROM {} WHERE {} = $1 AND {} = $2",
1138 qi(M::table()),
1139 qi(M::col_organization_id()),
1140 qi(M::col_user_id())
1141 );
1142 let member = sqlx::query_as::<_, M>(&sql)
1143 .bind(organization_id)
1144 .bind(user_id)
1145 .fetch_optional(&self.pool)
1146 .await?;
1147 Ok(member)
1148 }
1149
1150 async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<M>> {
1151 let sql = format!(
1152 "SELECT * FROM {} WHERE {} = $1",
1153 qi(M::table()),
1154 qi(M::col_id())
1155 );
1156 let member = sqlx::query_as::<_, M>(&sql)
1157 .bind(id)
1158 .fetch_optional(&self.pool)
1159 .await?;
1160 Ok(member)
1161 }
1162
1163 async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<M> {
1164 let sql = format!(
1165 "UPDATE {} SET {} = $1 WHERE {} = $2 RETURNING *",
1166 qi(M::table()),
1167 qi(M::col_role()),
1168 qi(M::col_id())
1169 );
1170 let member = sqlx::query_as::<_, M>(&sql)
1171 .bind(role)
1172 .bind(member_id)
1173 .fetch_one(&self.pool)
1174 .await?;
1175 Ok(member)
1176 }
1177
1178 async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
1179 let sql = format!(
1180 "DELETE FROM {} WHERE {} = $1",
1181 qi(M::table()),
1182 qi(M::col_id())
1183 );
1184 sqlx::query(&sql)
1185 .bind(member_id)
1186 .execute(&self.pool)
1187 .await?;
1188 Ok(())
1189 }
1190
1191 async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<M>> {
1192 let sql = format!(
1193 "SELECT * FROM {} WHERE {} = $1 ORDER BY {} ASC",
1194 qi(M::table()),
1195 qi(M::col_organization_id()),
1196 qi(M::col_created_at())
1197 );
1198 let members = sqlx::query_as::<_, M>(&sql)
1199 .bind(organization_id)
1200 .fetch_all(&self.pool)
1201 .await?;
1202 Ok(members)
1203 }
1204
1205 async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
1206 let sql = format!(
1207 "SELECT COUNT(*) FROM {} WHERE {} = $1",
1208 qi(M::table()),
1209 qi(M::col_organization_id())
1210 );
1211 let count: (i64,) = sqlx::query_as(&sql)
1212 .bind(organization_id)
1213 .fetch_one(&self.pool)
1214 .await?;
1215 Ok(count.0 as usize)
1216 }
1217
1218 async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
1219 let sql = format!(
1220 "SELECT COUNT(*) FROM {} WHERE {} = $1 AND {} = 'owner'",
1221 qi(M::table()),
1222 qi(M::col_organization_id()),
1223 qi(M::col_role())
1224 );
1225 let count: (i64,) = sqlx::query_as(&sql)
1226 .bind(organization_id)
1227 .fetch_one(&self.pool)
1228 .await?;
1229 Ok(count.0 as usize)
1230 }
1231 }
1232
1233 #[async_trait]
1236 impl<U, S, A, O, M, I, V, TF, AK, PK> InvitationOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1237 where
1238 U: AuthUser + AuthUserMeta + SqlxEntity,
1239 S: AuthSession + AuthSessionMeta + SqlxEntity,
1240 A: AuthAccount + AuthAccountMeta + SqlxEntity,
1241 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1242 M: AuthMember + AuthMemberMeta + SqlxEntity,
1243 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1244 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1245 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1246 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1247 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1248 {
1249 type Invitation = I;
1250
1251 async fn create_invitation(&self, create_inv: CreateInvitation) -> AuthResult<I> {
1252 let id = Uuid::new_v4().to_string();
1253 let now = Utc::now();
1254
1255 let sql = format!(
1256 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}) \
1257 VALUES ($1, $2, $3, $4, 'pending', $5, $6, $7) RETURNING *",
1258 qi(I::table()),
1259 qi(I::col_id()),
1260 qi(I::col_organization_id()),
1261 qi(I::col_email()),
1262 qi(I::col_role()),
1263 qi(I::col_status()),
1264 qi(I::col_inviter_id()),
1265 qi(I::col_expires_at()),
1266 qi(I::col_created_at()),
1267 );
1268 let invitation = sqlx::query_as::<_, I>(&sql)
1269 .bind(&id)
1270 .bind(&create_inv.organization_id)
1271 .bind(&create_inv.email)
1272 .bind(&create_inv.role)
1273 .bind(&create_inv.inviter_id)
1274 .bind(create_inv.expires_at)
1275 .bind(now)
1276 .fetch_one(&self.pool)
1277 .await?;
1278
1279 Ok(invitation)
1280 }
1281
1282 async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<I>> {
1283 let sql = format!(
1284 "SELECT * FROM {} WHERE {} = $1",
1285 qi(I::table()),
1286 qi(I::col_id())
1287 );
1288 let invitation = sqlx::query_as::<_, I>(&sql)
1289 .bind(id)
1290 .fetch_optional(&self.pool)
1291 .await?;
1292 Ok(invitation)
1293 }
1294
1295 async fn get_pending_invitation(
1296 &self,
1297 organization_id: &str,
1298 email: &str,
1299 ) -> AuthResult<Option<I>> {
1300 let sql = format!(
1301 "SELECT * FROM {} WHERE {} = $1 AND LOWER({}) = LOWER($2) AND {} = 'pending'",
1302 qi(I::table()),
1303 qi(I::col_organization_id()),
1304 qi(I::col_email()),
1305 qi(I::col_status())
1306 );
1307 let invitation = sqlx::query_as::<_, I>(&sql)
1308 .bind(organization_id)
1309 .bind(email)
1310 .fetch_optional(&self.pool)
1311 .await?;
1312 Ok(invitation)
1313 }
1314
1315 async fn update_invitation_status(
1316 &self,
1317 id: &str,
1318 status: InvitationStatus,
1319 ) -> AuthResult<I> {
1320 let sql = format!(
1321 "UPDATE {} SET {} = $1 WHERE {} = $2 RETURNING *",
1322 qi(I::table()),
1323 qi(I::col_status()),
1324 qi(I::col_id())
1325 );
1326 let invitation = sqlx::query_as::<_, I>(&sql)
1327 .bind(status.to_string())
1328 .bind(id)
1329 .fetch_one(&self.pool)
1330 .await?;
1331 Ok(invitation)
1332 }
1333
1334 async fn list_organization_invitations(&self, organization_id: &str) -> AuthResult<Vec<I>> {
1335 let sql = format!(
1336 "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
1337 qi(I::table()),
1338 qi(I::col_organization_id()),
1339 qi(I::col_created_at())
1340 );
1341 let invitations = sqlx::query_as::<_, I>(&sql)
1342 .bind(organization_id)
1343 .fetch_all(&self.pool)
1344 .await?;
1345 Ok(invitations)
1346 }
1347
1348 async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<I>> {
1349 let sql = format!(
1350 "SELECT * FROM {} WHERE LOWER({}) = LOWER($1) AND {} = 'pending' AND {} > NOW() ORDER BY {} DESC",
1351 qi(I::table()),
1352 qi(I::col_email()),
1353 qi(I::col_status()),
1354 qi(I::col_expires_at()),
1355 qi(I::col_created_at())
1356 );
1357 let invitations = sqlx::query_as::<_, I>(&sql)
1358 .bind(email)
1359 .fetch_all(&self.pool)
1360 .await?;
1361 Ok(invitations)
1362 }
1363 }
1364
1365 #[async_trait]
1368 impl<U, S, A, O, M, I, V, TF, AK, PK> TwoFactorOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1369 where
1370 U: AuthUser + AuthUserMeta + SqlxEntity,
1371 S: AuthSession + AuthSessionMeta + SqlxEntity,
1372 A: AuthAccount + AuthAccountMeta + SqlxEntity,
1373 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1374 M: AuthMember + AuthMemberMeta + SqlxEntity,
1375 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1376 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1377 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1378 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1379 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1380 {
1381 type TwoFactor = TF;
1382
1383 async fn create_two_factor(&self, create: CreateTwoFactor) -> AuthResult<TF> {
1384 let id = Uuid::new_v4().to_string();
1385 let now = Utc::now();
1386
1387 let sql = format!(
1388 "INSERT INTO {} ({}, {}, {}, {}, {}, {}) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *",
1389 qi(TF::table()),
1390 qi(TF::col_id()),
1391 qi(TF::col_secret()),
1392 qi(TF::col_backup_codes()),
1393 qi(TF::col_user_id()),
1394 qi(TF::col_created_at()),
1395 qi(TF::col_updated_at()),
1396 );
1397 let two_factor = sqlx::query_as::<_, TF>(&sql)
1398 .bind(&id)
1399 .bind(&create.secret)
1400 .bind(&create.backup_codes)
1401 .bind(&create.user_id)
1402 .bind(now)
1403 .bind(now)
1404 .fetch_one(&self.pool)
1405 .await?;
1406
1407 Ok(two_factor)
1408 }
1409
1410 async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TF>> {
1411 let sql = format!(
1412 "SELECT * FROM {} WHERE {} = $1",
1413 qi(TF::table()),
1414 qi(TF::col_user_id())
1415 );
1416 let two_factor = sqlx::query_as::<_, TF>(&sql)
1417 .bind(user_id)
1418 .fetch_optional(&self.pool)
1419 .await?;
1420 Ok(two_factor)
1421 }
1422
1423 async fn update_two_factor_backup_codes(
1424 &self,
1425 user_id: &str,
1426 backup_codes: &str,
1427 ) -> AuthResult<TF> {
1428 let sql = format!(
1429 "UPDATE {} SET {} = $1, {} = NOW() WHERE {} = $2 RETURNING *",
1430 qi(TF::table()),
1431 qi(TF::col_backup_codes()),
1432 qi(TF::col_updated_at()),
1433 qi(TF::col_user_id())
1434 );
1435 let two_factor = sqlx::query_as::<_, TF>(&sql)
1436 .bind(backup_codes)
1437 .bind(user_id)
1438 .fetch_one(&self.pool)
1439 .await?;
1440 Ok(two_factor)
1441 }
1442
1443 async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
1444 let sql = format!(
1445 "DELETE FROM {} WHERE {} = $1",
1446 qi(TF::table()),
1447 qi(TF::col_user_id())
1448 );
1449 sqlx::query(&sql).bind(user_id).execute(&self.pool).await?;
1450 Ok(())
1451 }
1452 }
1453
1454 #[async_trait]
1457 impl<U, S, A, O, M, I, V, TF, AK, PK> ApiKeyOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1458 where
1459 U: AuthUser + AuthUserMeta + SqlxEntity,
1460 S: AuthSession + AuthSessionMeta + SqlxEntity,
1461 A: AuthAccount + AuthAccountMeta + SqlxEntity,
1462 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1463 M: AuthMember + AuthMemberMeta + SqlxEntity,
1464 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1465 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1466 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1467 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1468 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1469 {
1470 type ApiKey = AK;
1471
1472 async fn create_api_key(&self, input: CreateApiKey) -> AuthResult<AK> {
1473 let id = Uuid::new_v4().to_string();
1474 let now = Utc::now();
1475
1476 let sql = format!(
1477 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
1478 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14::timestamptz, $15, $16, $17, $18) RETURNING *",
1479 qi(AK::table()),
1480 qi(AK::col_id()),
1481 qi(AK::col_name()),
1482 qi(AK::col_start()),
1483 qi(AK::col_prefix()),
1484 qi(AK::col_key_hash()),
1485 qi(AK::col_user_id()),
1486 qi(AK::col_refill_interval()),
1487 qi(AK::col_refill_amount()),
1488 qi(AK::col_enabled()),
1489 qi(AK::col_rate_limit_enabled()),
1490 qi(AK::col_rate_limit_time_window()),
1491 qi(AK::col_rate_limit_max()),
1492 qi(AK::col_remaining()),
1493 qi(AK::col_expires_at()),
1494 qi(AK::col_created_at()),
1495 qi(AK::col_updated_at()),
1496 qi(AK::col_permissions()),
1497 qi(AK::col_metadata()),
1498 );
1499 let api_key = sqlx::query_as::<_, AK>(&sql)
1500 .bind(&id)
1501 .bind(&input.name)
1502 .bind(&input.start)
1503 .bind(&input.prefix)
1504 .bind(&input.key_hash)
1505 .bind(&input.user_id)
1506 .bind(input.refill_interval)
1507 .bind(input.refill_amount)
1508 .bind(input.enabled)
1509 .bind(input.rate_limit_enabled)
1510 .bind(input.rate_limit_time_window)
1511 .bind(input.rate_limit_max)
1512 .bind(input.remaining)
1513 .bind(&input.expires_at)
1514 .bind(now)
1515 .bind(now)
1516 .bind(&input.permissions)
1517 .bind(&input.metadata)
1518 .fetch_one(&self.pool)
1519 .await?;
1520
1521 Ok(api_key)
1522 }
1523
1524 async fn get_api_key_by_id(&self, id: &str) -> AuthResult<Option<AK>> {
1525 let sql = format!(
1526 "SELECT * FROM {} WHERE {} = $1",
1527 qi(AK::table()),
1528 qi(AK::col_id())
1529 );
1530 let api_key = sqlx::query_as::<_, AK>(&sql)
1531 .bind(id)
1532 .fetch_optional(&self.pool)
1533 .await?;
1534 Ok(api_key)
1535 }
1536
1537 async fn get_api_key_by_hash(&self, hash: &str) -> AuthResult<Option<AK>> {
1538 let sql = format!(
1539 "SELECT * FROM {} WHERE {} = $1",
1540 qi(AK::table()),
1541 qi(AK::col_key_hash())
1542 );
1543 let api_key = sqlx::query_as::<_, AK>(&sql)
1544 .bind(hash)
1545 .fetch_optional(&self.pool)
1546 .await?;
1547 Ok(api_key)
1548 }
1549
1550 async fn list_api_keys_by_user(&self, user_id: &str) -> AuthResult<Vec<AK>> {
1551 let sql = format!(
1552 "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
1553 qi(AK::table()),
1554 qi(AK::col_user_id()),
1555 qi(AK::col_created_at())
1556 );
1557 let keys = sqlx::query_as::<_, AK>(&sql)
1558 .bind(user_id)
1559 .fetch_all(&self.pool)
1560 .await?;
1561 Ok(keys)
1562 }
1563
1564 async fn update_api_key(&self, id: &str, update: UpdateApiKey) -> AuthResult<AK> {
1565 let mut query = sqlx::QueryBuilder::new(format!(
1566 "UPDATE {} SET {} = NOW()",
1567 qi(AK::table()),
1568 qi(AK::col_updated_at())
1569 ));
1570
1571 if let Some(name) = &update.name {
1572 query.push(format!(", {} = ", qi(AK::col_name())));
1573 query.push_bind(name);
1574 }
1575 if let Some(enabled) = update.enabled {
1576 query.push(format!(", {} = ", qi(AK::col_enabled())));
1577 query.push_bind(enabled);
1578 }
1579 if let Some(remaining) = update.remaining {
1580 query.push(format!(", {} = ", qi(AK::col_remaining())));
1581 query.push_bind(remaining);
1582 }
1583 if let Some(rate_limit_enabled) = update.rate_limit_enabled {
1584 query.push(format!(", {} = ", qi(AK::col_rate_limit_enabled())));
1585 query.push_bind(rate_limit_enabled);
1586 }
1587 if let Some(rate_limit_time_window) = update.rate_limit_time_window {
1588 query.push(format!(", {} = ", qi(AK::col_rate_limit_time_window())));
1589 query.push_bind(rate_limit_time_window);
1590 }
1591 if let Some(rate_limit_max) = update.rate_limit_max {
1592 query.push(format!(", {} = ", qi(AK::col_rate_limit_max())));
1593 query.push_bind(rate_limit_max);
1594 }
1595 if let Some(refill_interval) = update.refill_interval {
1596 query.push(format!(", {} = ", qi(AK::col_refill_interval())));
1597 query.push_bind(refill_interval);
1598 }
1599 if let Some(refill_amount) = update.refill_amount {
1600 query.push(format!(", {} = ", qi(AK::col_refill_amount())));
1601 query.push_bind(refill_amount);
1602 }
1603 if let Some(permissions) = &update.permissions {
1604 query.push(format!(", {} = ", qi(AK::col_permissions())));
1605 query.push_bind(permissions);
1606 }
1607 if let Some(metadata) = &update.metadata {
1608 query.push(format!(", {} = ", qi(AK::col_metadata())));
1609 query.push_bind(metadata);
1610 }
1611
1612 query.push(format!(" WHERE {} = ", qi(AK::col_id())));
1613 query.push_bind(id);
1614 query.push(" RETURNING *");
1615
1616 let api_key = query
1617 .build_query_as::<AK>()
1618 .fetch_one(&self.pool)
1619 .await
1620 .map_err(|err| match err {
1621 sqlx::Error::RowNotFound => AuthError::not_found("API key not found"),
1622 other => AuthError::from(other),
1623 })?;
1624 Ok(api_key)
1625 }
1626
1627 async fn delete_api_key(&self, id: &str) -> AuthResult<()> {
1628 let sql = format!(
1629 "DELETE FROM {} WHERE {} = $1",
1630 qi(AK::table()),
1631 qi(AK::col_id())
1632 );
1633 sqlx::query(&sql).bind(id).execute(&self.pool).await?;
1634 Ok(())
1635 }
1636 }
1637
1638 #[async_trait]
1641 impl<U, S, A, O, M, I, V, TF, AK, PK> PasskeyOps for SqlxAdapter<U, S, A, O, M, I, V, TF, AK, PK>
1642 where
1643 U: AuthUser + AuthUserMeta + SqlxEntity,
1644 S: AuthSession + AuthSessionMeta + SqlxEntity,
1645 A: AuthAccount + AuthAccountMeta + SqlxEntity,
1646 O: AuthOrganization + AuthOrganizationMeta + SqlxEntity,
1647 M: AuthMember + AuthMemberMeta + SqlxEntity,
1648 I: AuthInvitation + AuthInvitationMeta + SqlxEntity,
1649 V: AuthVerification + AuthVerificationMeta + SqlxEntity,
1650 TF: AuthTwoFactor + AuthTwoFactorMeta + SqlxEntity,
1651 AK: AuthApiKey + AuthApiKeyMeta + SqlxEntity,
1652 PK: AuthPasskey + AuthPasskeyMeta + SqlxEntity,
1653 {
1654 type Passkey = PK;
1655
1656 async fn create_passkey(&self, input: CreatePasskey) -> AuthResult<PK> {
1657 let id = Uuid::new_v4().to_string();
1658 let now = Utc::now();
1659 let counter = i64::try_from(input.counter)
1660 .map_err(|_| AuthError::bad_request("Passkey counter exceeds i64 range"))?;
1661
1662 let sql = format!(
1663 "INSERT INTO {} ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}) \
1664 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING *",
1665 qi(PK::table()),
1666 qi(PK::col_id()),
1667 qi(PK::col_name()),
1668 qi(PK::col_public_key()),
1669 qi(PK::col_user_id()),
1670 qi(PK::col_credential_id()),
1671 qi(PK::col_counter()),
1672 qi(PK::col_device_type()),
1673 qi(PK::col_backed_up()),
1674 qi(PK::col_transports()),
1675 qi(PK::col_created_at()),
1676 );
1677 let passkey = sqlx::query_as::<_, PK>(&sql)
1678 .bind(&id)
1679 .bind(&input.name)
1680 .bind(&input.public_key)
1681 .bind(&input.user_id)
1682 .bind(&input.credential_id)
1683 .bind(counter)
1684 .bind(&input.device_type)
1685 .bind(input.backed_up)
1686 .bind(&input.transports)
1687 .bind(now)
1688 .fetch_one(&self.pool)
1689 .await
1690 .map_err(|e| match e {
1691 sqlx::Error::Database(ref db_err) if db_err.is_unique_violation() => {
1692 AuthError::conflict("A passkey with this credential ID already exists")
1693 }
1694 other => AuthError::from(other),
1695 })?;
1696
1697 Ok(passkey)
1698 }
1699
1700 async fn get_passkey_by_id(&self, id: &str) -> AuthResult<Option<PK>> {
1701 let sql = format!(
1702 "SELECT * FROM {} WHERE {} = $1",
1703 qi(PK::table()),
1704 qi(PK::col_id())
1705 );
1706 let passkey = sqlx::query_as::<_, PK>(&sql)
1707 .bind(id)
1708 .fetch_optional(&self.pool)
1709 .await?;
1710 Ok(passkey)
1711 }
1712
1713 async fn get_passkey_by_credential_id(
1714 &self,
1715 credential_id: &str,
1716 ) -> AuthResult<Option<PK>> {
1717 let sql = format!(
1718 "SELECT * FROM {} WHERE {} = $1",
1719 qi(PK::table()),
1720 qi(PK::col_credential_id())
1721 );
1722 let passkey = sqlx::query_as::<_, PK>(&sql)
1723 .bind(credential_id)
1724 .fetch_optional(&self.pool)
1725 .await?;
1726 Ok(passkey)
1727 }
1728
1729 async fn list_passkeys_by_user(&self, user_id: &str) -> AuthResult<Vec<PK>> {
1730 let sql = format!(
1731 "SELECT * FROM {} WHERE {} = $1 ORDER BY {} DESC",
1732 qi(PK::table()),
1733 qi(PK::col_user_id()),
1734 qi(PK::col_created_at())
1735 );
1736 let passkeys = sqlx::query_as::<_, PK>(&sql)
1737 .bind(user_id)
1738 .fetch_all(&self.pool)
1739 .await?;
1740 Ok(passkeys)
1741 }
1742
1743 async fn update_passkey_counter(&self, id: &str, counter: u64) -> AuthResult<PK> {
1744 let counter = i64::try_from(counter)
1745 .map_err(|_| AuthError::bad_request("Passkey counter exceeds i64 range"))?;
1746 let sql = format!(
1747 "UPDATE {} SET {} = $2 WHERE {} = $1 RETURNING *",
1748 qi(PK::table()),
1749 qi(PK::col_counter()),
1750 qi(PK::col_id())
1751 );
1752 let passkey = sqlx::query_as::<_, PK>(&sql)
1753 .bind(id)
1754 .bind(counter)
1755 .fetch_one(&self.pool)
1756 .await
1757 .map_err(|err| match err {
1758 sqlx::Error::RowNotFound => AuthError::not_found("Passkey not found"),
1759 other => AuthError::from(other),
1760 })?;
1761 Ok(passkey)
1762 }
1763
1764 async fn update_passkey_name(&self, id: &str, name: &str) -> AuthResult<PK> {
1765 let sql = format!(
1766 "UPDATE {} SET {} = $2 WHERE {} = $1 RETURNING *",
1767 qi(PK::table()),
1768 qi(PK::col_name()),
1769 qi(PK::col_id())
1770 );
1771 let passkey = sqlx::query_as::<_, PK>(&sql)
1772 .bind(id)
1773 .bind(name)
1774 .fetch_one(&self.pool)
1775 .await
1776 .map_err(|err| match err {
1777 sqlx::Error::RowNotFound => AuthError::not_found("Passkey not found"),
1778 other => AuthError::from(other),
1779 })?;
1780 Ok(passkey)
1781 }
1782
1783 async fn delete_passkey(&self, id: &str) -> AuthResult<()> {
1784 let sql = format!(
1785 "DELETE FROM {} WHERE {} = $1",
1786 qi(PK::table()),
1787 qi(PK::col_id())
1788 );
1789 sqlx::query(&sql).bind(id).execute(&self.pool).await?;
1790 Ok(())
1791 }
1792 }
1793}
1794
1795#[cfg(feature = "sqlx-postgres")]
1796pub use sqlx_adapter::{SqlxAdapter, SqlxEntity};