1pub use super::traits::{
2 AccountOps, InvitationOps, MemberOps, OrganizationOps, SessionOps, TwoFactorOps, UserOps,
3 VerificationOps,
4};
5
6pub trait DatabaseAdapter:
15 UserOps
16 + SessionOps
17 + AccountOps
18 + VerificationOps
19 + OrganizationOps
20 + MemberOps
21 + InvitationOps
22 + TwoFactorOps
23{
24}
25
26impl<T> DatabaseAdapter for T where
27 T: UserOps
28 + SessionOps
29 + AccountOps
30 + VerificationOps
31 + OrganizationOps
32 + MemberOps
33 + InvitationOps
34 + TwoFactorOps
35{
36}
37
38#[cfg(feature = "sqlx-postgres")]
39pub mod sqlx_adapter {
40 use super::*;
41 use async_trait::async_trait;
42 use chrono::{DateTime, Utc};
43
44 use crate::entity::{
45 AuthAccount, AuthInvitation, AuthMember, AuthOrganization, AuthSession, AuthTwoFactor,
46 AuthUser, AuthVerification,
47 };
48 use crate::error::{AuthError, AuthResult};
49 use crate::types::{
50 Account, CreateAccount, CreateInvitation, CreateMember, CreateOrganization, CreateSession,
51 CreateTwoFactor, CreateUser, CreateVerification, Invitation, InvitationStatus, Member,
52 Organization, Session, TwoFactor, UpdateAccount, UpdateOrganization, UpdateUser, User,
53 Verification,
54 };
55 use sqlx::PgPool;
56 use sqlx::postgres::PgRow;
57 use std::marker::PhantomData;
58 use uuid::Uuid;
59
60 pub trait SqlxEntity:
67 for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
68 {
69 }
70
71 impl<T> SqlxEntity for T where
72 T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
73 {
74 }
75
76 pub struct SqlxAdapter<
81 U = User,
82 S = Session,
83 A = Account,
84 O = Organization,
85 M = Member,
86 I = Invitation,
87 V = Verification,
88 TF = TwoFactor,
89 > {
90 pool: PgPool,
91 _phantom: PhantomData<(U, S, A, O, M, I, V, TF)>,
92 }
93
94 impl SqlxAdapter {
96 pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
97 let pool = PgPool::connect(database_url).await?;
98 Ok(Self {
99 pool,
100 _phantom: PhantomData,
101 })
102 }
103
104 pub async fn with_config(
105 database_url: &str,
106 config: PoolConfig,
107 ) -> Result<Self, sqlx::Error> {
108 let pool = sqlx::postgres::PgPoolOptions::new()
109 .max_connections(config.max_connections)
110 .min_connections(config.min_connections)
111 .acquire_timeout(config.acquire_timeout)
112 .idle_timeout(config.idle_timeout)
113 .max_lifetime(config.max_lifetime)
114 .connect(database_url)
115 .await?;
116 Ok(Self {
117 pool,
118 _phantom: PhantomData,
119 })
120 }
121 }
122
123 impl<U, S, A, O, M, I, V, TF> SqlxAdapter<U, S, A, O, M, I, V, TF> {
125 pub fn from_pool(pool: PgPool) -> Self {
126 Self {
127 pool,
128 _phantom: PhantomData,
129 }
130 }
131
132 pub async fn test_connection(&self) -> Result<(), sqlx::Error> {
133 sqlx::query("SELECT 1").execute(&self.pool).await?;
134 Ok(())
135 }
136
137 pub fn pool_stats(&self) -> PoolStats {
138 PoolStats {
139 size: self.pool.size(),
140 idle: self.pool.num_idle(),
141 }
142 }
143
144 pub async fn close(&self) {
145 self.pool.close().await;
146 }
147 }
148
/// Connection-pool settings consumed by `SqlxAdapter::with_config`.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on the number of pooled connections.
    pub max_connections: u32,
    /// Lower bound on the number of pooled connections.
    pub min_connections: u32,
    /// Timeout handed to the pool's `acquire_timeout` option.
    pub acquire_timeout: std::time::Duration,
    /// Value handed to the pool's `idle_timeout` option (`None` is passed through as-is).
    pub idle_timeout: Option<std::time::Duration>,
    /// Value handed to the pool's `max_lifetime` option (`None` is passed through as-is).
    pub max_lifetime: Option<std::time::Duration>,
}

impl Default for PoolConfig {
    /// Defaults: at most 10 connections, no minimum, 30 s acquire timeout,
    /// 600 s idle timeout and 1800 s maximum lifetime.
    fn default() -> Self {
        use std::time::Duration;

        let acquire_timeout = Duration::from_secs(30);
        let idle_timeout = Some(Duration::from_secs(600));
        let max_lifetime = Some(Duration::from_secs(1800));

        Self {
            max_connections: 10,
            min_connections: 0,
            acquire_timeout,
            idle_timeout,
            max_lifetime,
        }
    }
}
169
/// Point-in-time pool counters returned by `SqlxAdapter::pool_stats`.
#[derive(Clone, Debug)]
pub struct PoolStats {
    /// Value reported by the pool's `size()`.
    pub size: u32,
    /// Value reported by the pool's `num_idle()`.
    pub idle: usize,
}
175
176 #[async_trait]
179 impl<U, S, A, O, M, I, V, TF> UserOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
180 where
181 U: AuthUser + SqlxEntity,
182 S: AuthSession + SqlxEntity,
183 A: AuthAccount + SqlxEntity,
184 O: AuthOrganization + SqlxEntity,
185 M: AuthMember + SqlxEntity,
186 I: AuthInvitation + SqlxEntity,
187 V: AuthVerification + SqlxEntity,
188 TF: AuthTwoFactor + SqlxEntity,
189 {
190 type User = U;
191
192 async fn create_user(&self, create_user: CreateUser) -> AuthResult<U> {
193 let id = create_user.id.unwrap_or_else(|| Uuid::new_v4().to_string());
194 let now = Utc::now();
195
196 let user = sqlx::query_as::<_, U>(
197 r#"
198 INSERT INTO users (id, email, name, image, email_verified, created_at, updated_at, metadata)
199 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
200 RETURNING *
201 "#,
202 )
203 .bind(&id)
204 .bind(&create_user.email)
205 .bind(&create_user.name)
206 .bind(&create_user.image)
207 .bind(false)
208 .bind(&now)
209 .bind(&now)
210 .bind(sqlx::types::Json(create_user.metadata.unwrap_or(serde_json::json!({}))))
211 .fetch_one(&self.pool)
212 .await?;
213
214 Ok(user)
215 }
216
217 async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<U>> {
218 let user = sqlx::query_as::<_, U>("SELECT * FROM users WHERE id = $1")
219 .bind(id)
220 .fetch_optional(&self.pool)
221 .await?;
222 Ok(user)
223 }
224
225 async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<U>> {
226 let user = sqlx::query_as::<_, U>("SELECT * FROM users WHERE email = $1")
227 .bind(email)
228 .fetch_optional(&self.pool)
229 .await?;
230 Ok(user)
231 }
232
233 async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<U>> {
234 let user = sqlx::query_as::<_, U>("SELECT * FROM users WHERE username = $1")
235 .bind(username)
236 .fetch_optional(&self.pool)
237 .await?;
238 Ok(user)
239 }
240
241 async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<U> {
242 let mut query = sqlx::QueryBuilder::new("UPDATE users SET updated_at = NOW()");
243 let mut has_updates = false;
244
245 if let Some(email) = &update.email {
246 query.push(", email = ");
247 query.push_bind(email);
248 has_updates = true;
249 }
250 if let Some(name) = &update.name {
251 query.push(", name = ");
252 query.push_bind(name);
253 has_updates = true;
254 }
255 if let Some(image) = &update.image {
256 query.push(", image = ");
257 query.push_bind(image);
258 has_updates = true;
259 }
260 if let Some(email_verified) = update.email_verified {
261 query.push(", email_verified = ");
262 query.push_bind(email_verified);
263 has_updates = true;
264 }
265 if let Some(metadata) = &update.metadata {
266 query.push(", metadata = ");
267 query.push_bind(sqlx::types::Json(metadata.clone()));
268 has_updates = true;
269 }
270
271 if !has_updates {
272 return self
273 .get_user_by_id(id)
274 .await?
275 .ok_or(AuthError::UserNotFound);
276 }
277
278 query.push(" WHERE id = ");
279 query.push_bind(id);
280 query.push(" RETURNING *");
281
282 let user = query.build_query_as::<U>().fetch_one(&self.pool).await?;
283 Ok(user)
284 }
285
286 async fn delete_user(&self, id: &str) -> AuthResult<()> {
287 sqlx::query("DELETE FROM users WHERE id = $1")
288 .bind(id)
289 .execute(&self.pool)
290 .await?;
291 Ok(())
292 }
293 }
294
295 #[async_trait]
298 impl<U, S, A, O, M, I, V, TF> SessionOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
299 where
300 U: AuthUser + SqlxEntity,
301 S: AuthSession + SqlxEntity,
302 A: AuthAccount + SqlxEntity,
303 O: AuthOrganization + SqlxEntity,
304 M: AuthMember + SqlxEntity,
305 I: AuthInvitation + SqlxEntity,
306 V: AuthVerification + SqlxEntity,
307 TF: AuthTwoFactor + SqlxEntity,
308 {
309 type Session = S;
310
311 async fn create_session(&self, create_session: CreateSession) -> AuthResult<S> {
312 let id = Uuid::new_v4().to_string();
313 let token = format!("session_{}", Uuid::new_v4());
314 let now = Utc::now();
315
316 let session = sqlx::query_as::<_, S>(
317 r#"
318 INSERT INTO sessions (id, user_id, token, expires_at, created_at, ip_address, user_agent, active)
319 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
320 RETURNING *
321 "#,
322 )
323 .bind(&id)
324 .bind(&create_session.user_id)
325 .bind(&token)
326 .bind(&create_session.expires_at)
327 .bind(&now)
328 .bind(&create_session.ip_address)
329 .bind(&create_session.user_agent)
330 .bind(true)
331 .fetch_one(&self.pool)
332 .await?;
333
334 Ok(session)
335 }
336
337 async fn get_session(&self, token: &str) -> AuthResult<Option<S>> {
338 let session =
339 sqlx::query_as::<_, S>("SELECT * FROM sessions WHERE token = $1 AND active = true")
340 .bind(token)
341 .fetch_optional(&self.pool)
342 .await?;
343 Ok(session)
344 }
345
346 async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<S>> {
347 let sessions = sqlx::query_as::<_, S>(
348 "SELECT * FROM sessions WHERE user_id = $1 AND active = true ORDER BY created_at DESC",
349 )
350 .bind(user_id)
351 .fetch_all(&self.pool)
352 .await?;
353 Ok(sessions)
354 }
355
356 async fn update_session_expiry(
357 &self,
358 token: &str,
359 expires_at: DateTime<Utc>,
360 ) -> AuthResult<()> {
361 sqlx::query("UPDATE sessions SET expires_at = $1 WHERE token = $2 AND active = true")
362 .bind(&expires_at)
363 .bind(token)
364 .execute(&self.pool)
365 .await?;
366 Ok(())
367 }
368
369 async fn delete_session(&self, token: &str) -> AuthResult<()> {
370 sqlx::query("DELETE FROM sessions WHERE token = $1")
371 .bind(token)
372 .execute(&self.pool)
373 .await?;
374 Ok(())
375 }
376
377 async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
378 sqlx::query("DELETE FROM sessions WHERE user_id = $1")
379 .bind(user_id)
380 .execute(&self.pool)
381 .await?;
382 Ok(())
383 }
384
385 async fn delete_expired_sessions(&self) -> AuthResult<usize> {
386 let result =
387 sqlx::query("DELETE FROM sessions WHERE expires_at < NOW() OR active = false")
388 .execute(&self.pool)
389 .await?;
390 Ok(result.rows_affected() as usize)
391 }
392
393 async fn update_session_active_organization(
394 &self,
395 token: &str,
396 organization_id: Option<&str>,
397 ) -> AuthResult<S> {
398 let session = sqlx::query_as::<_, S>(
399 "UPDATE sessions SET active_organization_id = $1, updated_at = NOW() WHERE token = $2 AND active = true RETURNING *",
400 )
401 .bind(organization_id)
402 .bind(token)
403 .fetch_one(&self.pool)
404 .await?;
405 Ok(session)
406 }
407 }
408
409 #[async_trait]
412 impl<U, S, A, O, M, I, V, TF> AccountOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
413 where
414 U: AuthUser + SqlxEntity,
415 S: AuthSession + SqlxEntity,
416 A: AuthAccount + SqlxEntity,
417 O: AuthOrganization + SqlxEntity,
418 M: AuthMember + SqlxEntity,
419 I: AuthInvitation + SqlxEntity,
420 V: AuthVerification + SqlxEntity,
421 TF: AuthTwoFactor + SqlxEntity,
422 {
423 type Account = A;
424
425 async fn create_account(&self, create_account: CreateAccount) -> AuthResult<A> {
426 let id = Uuid::new_v4().to_string();
427 let now = Utc::now();
428
429 let account = sqlx::query_as::<_, A>(
430 r#"
431 INSERT INTO accounts (id, account_id, provider_id, user_id, access_token, refresh_token, id_token, access_token_expires_at, refresh_token_expires_at, scope, password, created_at, updated_at)
432 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
433 RETURNING *
434 "#,
435 )
436 .bind(&id)
437 .bind(&create_account.account_id)
438 .bind(&create_account.provider_id)
439 .bind(&create_account.user_id)
440 .bind(&create_account.access_token)
441 .bind(&create_account.refresh_token)
442 .bind(&create_account.id_token)
443 .bind(&create_account.access_token_expires_at)
444 .bind(&create_account.refresh_token_expires_at)
445 .bind(&create_account.scope)
446 .bind(&create_account.password)
447 .bind(&now)
448 .bind(&now)
449 .fetch_one(&self.pool)
450 .await?;
451
452 Ok(account)
453 }
454
455 async fn get_account(
456 &self,
457 provider: &str,
458 provider_account_id: &str,
459 ) -> AuthResult<Option<A>> {
460 let account = sqlx::query_as::<_, A>(
461 "SELECT * FROM accounts WHERE provider_id = $1 AND account_id = $2",
462 )
463 .bind(provider)
464 .bind(provider_account_id)
465 .fetch_optional(&self.pool)
466 .await?;
467 Ok(account)
468 }
469
470 async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<A>> {
471 let accounts = sqlx::query_as::<_, A>(
472 "SELECT * FROM accounts WHERE user_id = $1 ORDER BY created_at DESC",
473 )
474 .bind(user_id)
475 .fetch_all(&self.pool)
476 .await?;
477 Ok(accounts)
478 }
479
480 async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<A> {
481 let mut query = sqlx::QueryBuilder::new("UPDATE accounts SET updated_at = NOW()");
482
483 if let Some(access_token) = &update.access_token {
484 query.push(", access_token = ");
485 query.push_bind(access_token);
486 }
487 if let Some(refresh_token) = &update.refresh_token {
488 query.push(", refresh_token = ");
489 query.push_bind(refresh_token);
490 }
491 if let Some(id_token) = &update.id_token {
492 query.push(", id_token = ");
493 query.push_bind(id_token);
494 }
495 if let Some(access_token_expires_at) = &update.access_token_expires_at {
496 query.push(", access_token_expires_at = ");
497 query.push_bind(access_token_expires_at);
498 }
499 if let Some(refresh_token_expires_at) = &update.refresh_token_expires_at {
500 query.push(", refresh_token_expires_at = ");
501 query.push_bind(refresh_token_expires_at);
502 }
503 if let Some(scope) = &update.scope {
504 query.push(", scope = ");
505 query.push_bind(scope);
506 }
507
508 query.push(" WHERE id = ");
509 query.push_bind(id);
510 query.push(" RETURNING *");
511
512 let account = query.build_query_as::<A>().fetch_one(&self.pool).await?;
513 Ok(account)
514 }
515
516 async fn delete_account(&self, id: &str) -> AuthResult<()> {
517 sqlx::query("DELETE FROM accounts WHERE id = $1")
518 .bind(id)
519 .execute(&self.pool)
520 .await?;
521 Ok(())
522 }
523 }
524
525 #[async_trait]
528 impl<U, S, A, O, M, I, V, TF> VerificationOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
529 where
530 U: AuthUser + SqlxEntity,
531 S: AuthSession + SqlxEntity,
532 A: AuthAccount + SqlxEntity,
533 O: AuthOrganization + SqlxEntity,
534 M: AuthMember + SqlxEntity,
535 I: AuthInvitation + SqlxEntity,
536 V: AuthVerification + SqlxEntity,
537 TF: AuthTwoFactor + SqlxEntity,
538 {
539 type Verification = V;
540
541 async fn create_verification(
542 &self,
543 create_verification: CreateVerification,
544 ) -> AuthResult<V> {
545 let id = Uuid::new_v4().to_string();
546 let now = Utc::now();
547
548 let verification = sqlx::query_as::<_, V>(
549 r#"
550 INSERT INTO verifications (id, identifier, value, expires_at, created_at, updated_at)
551 VALUES ($1, $2, $3, $4, $5, $6)
552 RETURNING *
553 "#,
554 )
555 .bind(&id)
556 .bind(&create_verification.identifier)
557 .bind(&create_verification.value)
558 .bind(&create_verification.expires_at)
559 .bind(&now)
560 .bind(&now)
561 .fetch_one(&self.pool)
562 .await?;
563
564 Ok(verification)
565 }
566
567 async fn get_verification(&self, identifier: &str, value: &str) -> AuthResult<Option<V>> {
568 let verification = sqlx::query_as::<_, V>(
569 "SELECT * FROM verifications WHERE identifier = $1 AND value = $2 AND expires_at > NOW()",
570 )
571 .bind(identifier)
572 .bind(value)
573 .fetch_optional(&self.pool)
574 .await?;
575 Ok(verification)
576 }
577
578 async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<V>> {
579 let verification = sqlx::query_as::<_, V>(
580 "SELECT * FROM verifications WHERE value = $1 AND expires_at > NOW()",
581 )
582 .bind(value)
583 .fetch_optional(&self.pool)
584 .await?;
585 Ok(verification)
586 }
587
588 async fn get_verification_by_identifier(&self, identifier: &str) -> AuthResult<Option<V>> {
589 let verification = sqlx::query_as::<_, V>(
590 "SELECT * FROM verifications WHERE identifier = $1 AND expires_at > NOW()",
591 )
592 .bind(identifier)
593 .fetch_optional(&self.pool)
594 .await?;
595 Ok(verification)
596 }
597
598 async fn delete_verification(&self, id: &str) -> AuthResult<()> {
599 sqlx::query("DELETE FROM verifications WHERE id = $1")
600 .bind(id)
601 .execute(&self.pool)
602 .await?;
603 Ok(())
604 }
605
606 async fn delete_expired_verifications(&self) -> AuthResult<usize> {
607 let result = sqlx::query("DELETE FROM verifications WHERE expires_at < NOW()")
608 .execute(&self.pool)
609 .await?;
610 Ok(result.rows_affected() as usize)
611 }
612 }
613
614 #[async_trait]
617 impl<U, S, A, O, M, I, V, TF> OrganizationOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
618 where
619 U: AuthUser + SqlxEntity,
620 S: AuthSession + SqlxEntity,
621 A: AuthAccount + SqlxEntity,
622 O: AuthOrganization + SqlxEntity,
623 M: AuthMember + SqlxEntity,
624 I: AuthInvitation + SqlxEntity,
625 V: AuthVerification + SqlxEntity,
626 TF: AuthTwoFactor + SqlxEntity,
627 {
628 type Organization = O;
629
630 async fn create_organization(&self, create_org: CreateOrganization) -> AuthResult<O> {
631 let id = create_org.id.unwrap_or_else(|| Uuid::new_v4().to_string());
632 let now = Utc::now();
633
634 let organization = sqlx::query_as::<_, O>(
635 r#"
636 INSERT INTO organization (id, name, slug, logo, metadata, created_at, updated_at)
637 VALUES ($1, $2, $3, $4, $5, $6, $7)
638 RETURNING *
639 "#,
640 )
641 .bind(&id)
642 .bind(&create_org.name)
643 .bind(&create_org.slug)
644 .bind(&create_org.logo)
645 .bind(sqlx::types::Json(
646 create_org.metadata.unwrap_or(serde_json::json!({})),
647 ))
648 .bind(&now)
649 .bind(&now)
650 .fetch_one(&self.pool)
651 .await?;
652
653 Ok(organization)
654 }
655
656 async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<O>> {
657 let organization = sqlx::query_as::<_, O>("SELECT * FROM organization WHERE id = $1")
658 .bind(id)
659 .fetch_optional(&self.pool)
660 .await?;
661 Ok(organization)
662 }
663
664 async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<O>> {
665 let organization = sqlx::query_as::<_, O>("SELECT * FROM organization WHERE slug = $1")
666 .bind(slug)
667 .fetch_optional(&self.pool)
668 .await?;
669 Ok(organization)
670 }
671
672 async fn update_organization(&self, id: &str, update: UpdateOrganization) -> AuthResult<O> {
673 let mut query = sqlx::QueryBuilder::new("UPDATE organization SET updated_at = NOW()");
674
675 if let Some(name) = &update.name {
676 query.push(", name = ");
677 query.push_bind(name);
678 }
679 if let Some(slug) = &update.slug {
680 query.push(", slug = ");
681 query.push_bind(slug);
682 }
683 if let Some(logo) = &update.logo {
684 query.push(", logo = ");
685 query.push_bind(logo);
686 }
687 if let Some(metadata) = &update.metadata {
688 query.push(", metadata = ");
689 query.push_bind(sqlx::types::Json(metadata.clone()));
690 }
691
692 query.push(" WHERE id = ");
693 query.push_bind(id);
694 query.push(" RETURNING *");
695
696 let organization = query.build_query_as::<O>().fetch_one(&self.pool).await?;
697 Ok(organization)
698 }
699
700 async fn delete_organization(&self, id: &str) -> AuthResult<()> {
701 sqlx::query("DELETE FROM organization WHERE id = $1")
702 .bind(id)
703 .execute(&self.pool)
704 .await?;
705 Ok(())
706 }
707
708 async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<O>> {
709 let organizations = sqlx::query_as::<_, O>(
710 r#"
711 SELECT o.*
712 FROM organization o
713 INNER JOIN member m ON o.id = m.organization_id
714 WHERE m.user_id = $1
715 ORDER BY o.created_at DESC
716 "#,
717 )
718 .bind(user_id)
719 .fetch_all(&self.pool)
720 .await?;
721 Ok(organizations)
722 }
723 }
724
725 #[async_trait]
728 impl<U, S, A, O, M, I, V, TF> MemberOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
729 where
730 U: AuthUser + SqlxEntity,
731 S: AuthSession + SqlxEntity,
732 A: AuthAccount + SqlxEntity,
733 O: AuthOrganization + SqlxEntity,
734 M: AuthMember + SqlxEntity,
735 I: AuthInvitation + SqlxEntity,
736 V: AuthVerification + SqlxEntity,
737 TF: AuthTwoFactor + SqlxEntity,
738 {
739 type Member = M;
740
741 async fn create_member(&self, create_member: CreateMember) -> AuthResult<M> {
742 let id = Uuid::new_v4().to_string();
743 let now = Utc::now();
744
745 let member = sqlx::query_as::<_, M>(
746 r#"
747 INSERT INTO member (id, organization_id, user_id, role, created_at)
748 VALUES ($1, $2, $3, $4, $5)
749 RETURNING *
750 "#,
751 )
752 .bind(&id)
753 .bind(&create_member.organization_id)
754 .bind(&create_member.user_id)
755 .bind(&create_member.role)
756 .bind(&now)
757 .fetch_one(&self.pool)
758 .await?;
759
760 Ok(member)
761 }
762
763 async fn get_member(&self, organization_id: &str, user_id: &str) -> AuthResult<Option<M>> {
764 let member = sqlx::query_as::<_, M>(
765 "SELECT * FROM member WHERE organization_id = $1 AND user_id = $2",
766 )
767 .bind(organization_id)
768 .bind(user_id)
769 .fetch_optional(&self.pool)
770 .await?;
771 Ok(member)
772 }
773
774 async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<M>> {
775 let member = sqlx::query_as::<_, M>("SELECT * FROM member WHERE id = $1")
776 .bind(id)
777 .fetch_optional(&self.pool)
778 .await?;
779 Ok(member)
780 }
781
782 async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<M> {
783 let member =
784 sqlx::query_as::<_, M>("UPDATE member SET role = $1 WHERE id = $2 RETURNING *")
785 .bind(role)
786 .bind(member_id)
787 .fetch_one(&self.pool)
788 .await?;
789 Ok(member)
790 }
791
792 async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
793 sqlx::query("DELETE FROM member WHERE id = $1")
794 .bind(member_id)
795 .execute(&self.pool)
796 .await?;
797 Ok(())
798 }
799
800 async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<M>> {
801 let members = sqlx::query_as::<_, M>(
802 "SELECT * FROM member WHERE organization_id = $1 ORDER BY created_at ASC",
803 )
804 .bind(organization_id)
805 .fetch_all(&self.pool)
806 .await?;
807 Ok(members)
808 }
809
810 async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
811 let count: (i64,) =
812 sqlx::query_as("SELECT COUNT(*) FROM member WHERE organization_id = $1")
813 .bind(organization_id)
814 .fetch_one(&self.pool)
815 .await?;
816 Ok(count.0 as usize)
817 }
818
819 async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
820 let count: (i64,) = sqlx::query_as(
821 "SELECT COUNT(*) FROM member WHERE organization_id = $1 AND role = 'owner'",
822 )
823 .bind(organization_id)
824 .fetch_one(&self.pool)
825 .await?;
826 Ok(count.0 as usize)
827 }
828 }
829
830 #[async_trait]
833 impl<U, S, A, O, M, I, V, TF> InvitationOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
834 where
835 U: AuthUser + SqlxEntity,
836 S: AuthSession + SqlxEntity,
837 A: AuthAccount + SqlxEntity,
838 O: AuthOrganization + SqlxEntity,
839 M: AuthMember + SqlxEntity,
840 I: AuthInvitation + SqlxEntity,
841 V: AuthVerification + SqlxEntity,
842 TF: AuthTwoFactor + SqlxEntity,
843 {
844 type Invitation = I;
845
846 async fn create_invitation(&self, create_inv: CreateInvitation) -> AuthResult<I> {
847 let id = Uuid::new_v4().to_string();
848 let now = Utc::now();
849
850 let invitation = sqlx::query_as::<_, I>(
851 r#"
852 INSERT INTO invitation (id, organization_id, email, role, status, inviter_id, expires_at, created_at)
853 VALUES ($1, $2, $3, $4, 'pending', $5, $6, $7)
854 RETURNING *
855 "#,
856 )
857 .bind(&id)
858 .bind(&create_inv.organization_id)
859 .bind(&create_inv.email)
860 .bind(&create_inv.role)
861 .bind(&create_inv.inviter_id)
862 .bind(&create_inv.expires_at)
863 .bind(&now)
864 .fetch_one(&self.pool)
865 .await?;
866
867 Ok(invitation)
868 }
869
870 async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<I>> {
871 let invitation = sqlx::query_as::<_, I>("SELECT * FROM invitation WHERE id = $1")
872 .bind(id)
873 .fetch_optional(&self.pool)
874 .await?;
875 Ok(invitation)
876 }
877
878 async fn get_pending_invitation(
879 &self,
880 organization_id: &str,
881 email: &str,
882 ) -> AuthResult<Option<I>> {
883 let invitation = sqlx::query_as::<_, I>(
884 "SELECT * FROM invitation WHERE organization_id = $1 AND LOWER(email) = LOWER($2) AND status = 'pending'",
885 )
886 .bind(organization_id)
887 .bind(email)
888 .fetch_optional(&self.pool)
889 .await?;
890 Ok(invitation)
891 }
892
893 async fn update_invitation_status(
894 &self,
895 id: &str,
896 status: InvitationStatus,
897 ) -> AuthResult<I> {
898 let invitation = sqlx::query_as::<_, I>(
899 "UPDATE invitation SET status = $1 WHERE id = $2 RETURNING *",
900 )
901 .bind(status.to_string())
902 .bind(id)
903 .fetch_one(&self.pool)
904 .await?;
905 Ok(invitation)
906 }
907
908 async fn list_organization_invitations(&self, organization_id: &str) -> AuthResult<Vec<I>> {
909 let invitations = sqlx::query_as::<_, I>(
910 "SELECT * FROM invitation WHERE organization_id = $1 ORDER BY created_at DESC",
911 )
912 .bind(organization_id)
913 .fetch_all(&self.pool)
914 .await?;
915 Ok(invitations)
916 }
917
918 async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<I>> {
919 let invitations = sqlx::query_as::<_, I>(
920 "SELECT * FROM invitation WHERE LOWER(email) = LOWER($1) AND status = 'pending' AND expires_at > NOW() ORDER BY created_at DESC",
921 )
922 .bind(email)
923 .fetch_all(&self.pool)
924 .await?;
925 Ok(invitations)
926 }
927 }
928
929 #[async_trait]
932 impl<U, S, A, O, M, I, V, TF> TwoFactorOps for SqlxAdapter<U, S, A, O, M, I, V, TF>
933 where
934 U: AuthUser + SqlxEntity,
935 S: AuthSession + SqlxEntity,
936 A: AuthAccount + SqlxEntity,
937 O: AuthOrganization + SqlxEntity,
938 M: AuthMember + SqlxEntity,
939 I: AuthInvitation + SqlxEntity,
940 V: AuthVerification + SqlxEntity,
941 TF: AuthTwoFactor + SqlxEntity,
942 {
943 type TwoFactor = TF;
944
945 async fn create_two_factor(&self, create: CreateTwoFactor) -> AuthResult<TF> {
946 let id = Uuid::new_v4().to_string();
947 let now = Utc::now();
948
949 let two_factor = sqlx::query_as::<_, TF>(
950 r#"
951 INSERT INTO two_factor (id, secret, backup_codes, user_id, created_at, updated_at)
952 VALUES ($1, $2, $3, $4, $5, $6)
953 RETURNING *
954 "#,
955 )
956 .bind(&id)
957 .bind(&create.secret)
958 .bind(&create.backup_codes)
959 .bind(&create.user_id)
960 .bind(&now)
961 .bind(&now)
962 .fetch_one(&self.pool)
963 .await?;
964
965 Ok(two_factor)
966 }
967
968 async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TF>> {
969 let two_factor = sqlx::query_as::<_, TF>("SELECT * FROM two_factor WHERE user_id = $1")
970 .bind(user_id)
971 .fetch_optional(&self.pool)
972 .await?;
973 Ok(two_factor)
974 }
975
976 async fn update_two_factor_backup_codes(
977 &self,
978 user_id: &str,
979 backup_codes: &str,
980 ) -> AuthResult<TF> {
981 let two_factor = sqlx::query_as::<_, TF>(
982 "UPDATE two_factor SET backup_codes = $1, updated_at = NOW() WHERE user_id = $2 RETURNING *",
983 )
984 .bind(backup_codes)
985 .bind(user_id)
986 .fetch_one(&self.pool)
987 .await?;
988 Ok(two_factor)
989 }
990
991 async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
992 sqlx::query("DELETE FROM two_factor WHERE user_id = $1")
993 .bind(user_id)
994 .execute(&self.pool)
995 .await?;
996 Ok(())
997 }
998 }
999}
1000
1001#[cfg(feature = "sqlx-postgres")]
1002pub use sqlx_adapter::{SqlxAdapter, SqlxEntity};