1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3
4use crate::entity::{
5 AuthAccount, AuthInvitation, AuthMember, AuthOrganization, AuthSession, AuthUser,
6 AuthVerification,
7};
8use crate::error::AuthResult;
9use crate::types::{
10 CreateAccount, CreateInvitation, CreateMember, CreateOrganization, CreateSession, CreateUser,
11 CreateVerification, InvitationStatus, UpdateOrganization, UpdateUser,
12};
13
14#[async_trait]
20pub trait DatabaseAdapter: Send + Sync + 'static {
21 type User: AuthUser;
22 type Session: AuthSession;
23 type Account: AuthAccount;
24 type Organization: AuthOrganization;
25 type Member: AuthMember;
26 type Invitation: AuthInvitation;
27 type Verification: AuthVerification;
28
29 async fn create_user(&self, user: CreateUser) -> AuthResult<Self::User>;
31 async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<Self::User>>;
32 async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<Self::User>>;
33 async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<Self::User>>;
34 async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<Self::User>;
35 async fn delete_user(&self, id: &str) -> AuthResult<()>;
36
37 async fn create_session(&self, session: CreateSession) -> AuthResult<Self::Session>;
39 async fn get_session(&self, token: &str) -> AuthResult<Option<Self::Session>>;
40 async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<Self::Session>>;
41 async fn update_session_expiry(&self, token: &str, expires_at: DateTime<Utc>)
42 -> AuthResult<()>;
43 async fn delete_session(&self, token: &str) -> AuthResult<()>;
44 async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()>;
45 async fn delete_expired_sessions(&self) -> AuthResult<usize>;
46
47 async fn create_account(&self, account: CreateAccount) -> AuthResult<Self::Account>;
49 async fn get_account(
50 &self,
51 provider: &str,
52 provider_account_id: &str,
53 ) -> AuthResult<Option<Self::Account>>;
54 async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<Self::Account>>;
55 async fn delete_account(&self, id: &str) -> AuthResult<()>;
56
57 async fn create_verification(
59 &self,
60 verification: CreateVerification,
61 ) -> AuthResult<Self::Verification>;
62 async fn get_verification(
63 &self,
64 identifier: &str,
65 value: &str,
66 ) -> AuthResult<Option<Self::Verification>>;
67 async fn get_verification_by_value(
68 &self,
69 value: &str,
70 ) -> AuthResult<Option<Self::Verification>>;
71 async fn delete_verification(&self, id: &str) -> AuthResult<()>;
72 async fn delete_expired_verifications(&self) -> AuthResult<usize>;
73
74 async fn create_organization(&self, org: CreateOrganization) -> AuthResult<Self::Organization>;
76 async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<Self::Organization>>;
77 async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<Self::Organization>>;
78 async fn update_organization(
79 &self,
80 id: &str,
81 update: UpdateOrganization,
82 ) -> AuthResult<Self::Organization>;
83 async fn delete_organization(&self, id: &str) -> AuthResult<()>;
84 async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<Self::Organization>>;
85
86 async fn create_member(&self, member: CreateMember) -> AuthResult<Self::Member>;
88 async fn get_member(
89 &self,
90 organization_id: &str,
91 user_id: &str,
92 ) -> AuthResult<Option<Self::Member>>;
93 async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<Self::Member>>;
94 async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<Self::Member>;
95 async fn delete_member(&self, member_id: &str) -> AuthResult<()>;
96 async fn list_organization_members(
97 &self,
98 organization_id: &str,
99 ) -> AuthResult<Vec<Self::Member>>;
100 async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize>;
101 async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize>;
102
103 async fn create_invitation(&self, invitation: CreateInvitation)
105 -> AuthResult<Self::Invitation>;
106 async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<Self::Invitation>>;
107 async fn get_pending_invitation(
108 &self,
109 organization_id: &str,
110 email: &str,
111 ) -> AuthResult<Option<Self::Invitation>>;
112 async fn update_invitation_status(
113 &self,
114 id: &str,
115 status: InvitationStatus,
116 ) -> AuthResult<Self::Invitation>;
117 async fn list_organization_invitations(
118 &self,
119 organization_id: &str,
120 ) -> AuthResult<Vec<Self::Invitation>>;
121 async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<Self::Invitation>>;
122
123 async fn update_session_active_organization(
125 &self,
126 token: &str,
127 organization_id: Option<&str>,
128 ) -> AuthResult<Self::Session>;
129}
130
131#[cfg(feature = "sqlx-postgres")]
132pub mod sqlx_adapter {
133 use super::*;
134 use crate::error::AuthError;
135 use crate::types::{Account, Invitation, Member, Organization, Session, User, Verification};
136 use sqlx::PgPool;
137 use sqlx::postgres::PgRow;
138 use std::marker::PhantomData;
139 use uuid::Uuid;
140
141 pub trait SqlxEntity:
148 for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
149 {
150 }
151
152 impl<T> SqlxEntity for T where
153 T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Clone + 'static
154 {
155 }
156
157 pub struct SqlxAdapter<
172 U = User,
173 S = Session,
174 A = Account,
175 O = Organization,
176 M = Member,
177 I = Invitation,
178 V = Verification,
179 > {
180 pool: PgPool,
181 _phantom: PhantomData<(U, S, A, O, M, I, V)>,
182 }
183
184 impl SqlxAdapter {
187 pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
188 let pool = PgPool::connect(database_url).await?;
189 Ok(Self {
190 pool,
191 _phantom: PhantomData,
192 })
193 }
194
195 pub async fn with_config(
197 database_url: &str,
198 config: PoolConfig,
199 ) -> Result<Self, sqlx::Error> {
200 let pool = sqlx::postgres::PgPoolOptions::new()
201 .max_connections(config.max_connections)
202 .min_connections(config.min_connections)
203 .acquire_timeout(config.acquire_timeout)
204 .idle_timeout(config.idle_timeout)
205 .max_lifetime(config.max_lifetime)
206 .connect(database_url)
207 .await?;
208 Ok(Self {
209 pool,
210 _phantom: PhantomData,
211 })
212 }
213 }
214
215 impl<U, S, A, O, M, I, V> SqlxAdapter<U, S, A, O, M, I, V> {
217 pub fn from_pool(pool: PgPool) -> Self {
218 Self {
219 pool,
220 _phantom: PhantomData,
221 }
222 }
223
224 pub async fn test_connection(&self) -> Result<(), sqlx::Error> {
226 sqlx::query("SELECT 1").execute(&self.pool).await?;
227 Ok(())
228 }
229
230 pub fn pool_stats(&self) -> PoolStats {
232 PoolStats {
233 size: self.pool.size(),
234 idle: self.pool.num_idle(),
235 }
236 }
237
238 pub async fn close(&self) {
240 self.pool.close().await;
241 }
242 }
243
244 #[derive(Debug, Clone)]
246 pub struct PoolConfig {
247 pub max_connections: u32,
248 pub min_connections: u32,
249 pub acquire_timeout: std::time::Duration,
250 pub idle_timeout: Option<std::time::Duration>,
251 pub max_lifetime: Option<std::time::Duration>,
252 }
253
254 impl Default for PoolConfig {
255 fn default() -> Self {
256 Self {
257 max_connections: 10,
258 min_connections: 0,
259 acquire_timeout: std::time::Duration::from_secs(30),
260 idle_timeout: Some(std::time::Duration::from_secs(600)), max_lifetime: Some(std::time::Duration::from_secs(1800)), }
263 }
264 }
265
266 #[derive(Debug, Clone)]
268 pub struct PoolStats {
269 pub size: u32,
270 pub idle: usize,
271 }
272
273 #[async_trait]
274 impl<U, S, A, O, M, I, V> DatabaseAdapter for SqlxAdapter<U, S, A, O, M, I, V>
275 where
276 U: AuthUser + SqlxEntity,
277 S: AuthSession + SqlxEntity,
278 A: AuthAccount + SqlxEntity,
279 O: AuthOrganization + SqlxEntity,
280 M: AuthMember + SqlxEntity,
281 I: AuthInvitation + SqlxEntity,
282 V: AuthVerification + SqlxEntity,
283 {
284 type User = U;
285 type Session = S;
286 type Account = A;
287 type Organization = O;
288 type Member = M;
289 type Invitation = I;
290 type Verification = V;
291
292 async fn create_user(&self, create_user: CreateUser) -> AuthResult<U> {
293 let id = create_user.id.unwrap_or_else(|| Uuid::new_v4().to_string());
294 let now = Utc::now();
295
296 let user = sqlx::query_as::<_, U>(
297 r#"
298 INSERT INTO users (id, email, name, image, email_verified, created_at, updated_at, metadata)
299 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
300 RETURNING *
301 "#,
302 )
303 .bind(&id)
304 .bind(&create_user.email)
305 .bind(&create_user.name)
306 .bind(&create_user.image)
307 .bind(false)
308 .bind(&now)
309 .bind(&now)
310 .bind(sqlx::types::Json(create_user.metadata.unwrap_or_default()))
311 .fetch_one(&self.pool)
312 .await?;
313
314 Ok(user)
315 }
316
317 async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<U>> {
318 let user = sqlx::query_as::<_, U>("SELECT * FROM users WHERE id = $1")
319 .bind(id)
320 .fetch_optional(&self.pool)
321 .await?;
322
323 Ok(user)
324 }
325
326 async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<U>> {
327 let user = sqlx::query_as::<_, U>("SELECT * FROM users WHERE email = $1")
328 .bind(email)
329 .fetch_optional(&self.pool)
330 .await?;
331
332 Ok(user)
333 }
334
335 async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<U>> {
336 let user = sqlx::query_as::<_, U>("SELECT * FROM users WHERE username = $1")
337 .bind(username)
338 .fetch_optional(&self.pool)
339 .await?;
340
341 Ok(user)
342 }
343
344 async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<U> {
345 let mut query = sqlx::QueryBuilder::new("UPDATE users SET updated_at = NOW()");
346 let mut has_updates = false;
347
348 if let Some(email) = &update.email {
349 query.push(", email = ");
350 query.push_bind(email);
351 has_updates = true;
352 }
353
354 if let Some(name) = &update.name {
355 query.push(", name = ");
356 query.push_bind(name);
357 has_updates = true;
358 }
359
360 if let Some(image) = &update.image {
361 query.push(", image = ");
362 query.push_bind(image);
363 has_updates = true;
364 }
365
366 if let Some(email_verified) = update.email_verified {
367 query.push(", email_verified = ");
368 query.push_bind(email_verified);
369 has_updates = true;
370 }
371
372 if let Some(metadata) = &update.metadata {
373 query.push(", metadata = ");
374 query.push_bind(sqlx::types::Json(metadata.clone()));
375 has_updates = true;
376 }
377
378 if !has_updates {
379 return self
380 .get_user_by_id(id)
381 .await?
382 .ok_or(AuthError::UserNotFound);
383 }
384
385 query.push(" WHERE id = ");
386 query.push_bind(id);
387 query.push(" RETURNING *");
388
389 let user = query.build_query_as::<U>().fetch_one(&self.pool).await?;
390
391 Ok(user)
392 }
393
394 async fn delete_user(&self, id: &str) -> AuthResult<()> {
395 sqlx::query("DELETE FROM users WHERE id = $1")
396 .bind(id)
397 .execute(&self.pool)
398 .await?;
399
400 Ok(())
401 }
402
403 async fn create_session(&self, create_session: CreateSession) -> AuthResult<S> {
404 let id = Uuid::new_v4().to_string();
405 let token = format!("session_{}", Uuid::new_v4());
406 let now = Utc::now();
407
408 let session = sqlx::query_as::<_, S>(
409 r#"
410 INSERT INTO sessions (id, user_id, token, expires_at, created_at, ip_address, user_agent, active)
411 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
412 RETURNING *
413 "#,
414 )
415 .bind(&id)
416 .bind(&create_session.user_id)
417 .bind(&token)
418 .bind(&create_session.expires_at)
419 .bind(&now)
420 .bind(&create_session.ip_address)
421 .bind(&create_session.user_agent)
422 .bind(true)
423 .fetch_one(&self.pool)
424 .await?;
425
426 Ok(session)
427 }
428
429 async fn get_session(&self, token: &str) -> AuthResult<Option<S>> {
430 let session =
431 sqlx::query_as::<_, S>("SELECT * FROM sessions WHERE token = $1 AND active = true")
432 .bind(token)
433 .fetch_optional(&self.pool)
434 .await?;
435
436 Ok(session)
437 }
438
439 async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<S>> {
440 let sessions = sqlx::query_as::<_, S>(
441 r#"
442 SELECT * FROM sessions
443 WHERE user_id = $1 AND active = true
444 ORDER BY created_at DESC
445 "#,
446 )
447 .bind(user_id)
448 .fetch_all(&self.pool)
449 .await?;
450
451 Ok(sessions)
452 }
453
454 async fn update_session_expiry(
455 &self,
456 token: &str,
457 expires_at: DateTime<Utc>,
458 ) -> AuthResult<()> {
459 sqlx::query("UPDATE sessions SET expires_at = $1 WHERE token = $2 AND active = true")
460 .bind(&expires_at)
461 .bind(token)
462 .execute(&self.pool)
463 .await?;
464
465 Ok(())
466 }
467
468 async fn delete_session(&self, token: &str) -> AuthResult<()> {
469 sqlx::query("DELETE FROM sessions WHERE token = $1")
470 .bind(token)
471 .execute(&self.pool)
472 .await?;
473
474 Ok(())
475 }
476
477 async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
478 sqlx::query("DELETE FROM sessions WHERE user_id = $1")
479 .bind(user_id)
480 .execute(&self.pool)
481 .await?;
482
483 Ok(())
484 }
485
486 async fn delete_expired_sessions(&self) -> AuthResult<usize> {
487 let result =
488 sqlx::query("DELETE FROM sessions WHERE expires_at < NOW() OR active = false")
489 .execute(&self.pool)
490 .await?;
491
492 Ok(result.rows_affected() as usize)
493 }
494
495 async fn create_account(&self, create_account: CreateAccount) -> AuthResult<A> {
496 let id = Uuid::new_v4().to_string();
497 let now = Utc::now();
498
499 let account = sqlx::query_as::<_, A>(
500 r#"
501 INSERT INTO accounts (id, account_id, provider_id, user_id, access_token, refresh_token, id_token, access_token_expires_at, refresh_token_expires_at, scope, password, created_at, updated_at)
502 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
503 RETURNING *
504 "#,
505 )
506 .bind(&id)
507 .bind(&create_account.account_id)
508 .bind(&create_account.provider_id)
509 .bind(&create_account.user_id)
510 .bind(&create_account.access_token)
511 .bind(&create_account.refresh_token)
512 .bind(&create_account.id_token)
513 .bind(&create_account.access_token_expires_at)
514 .bind(&create_account.refresh_token_expires_at)
515 .bind(&create_account.scope)
516 .bind(&create_account.password)
517 .bind(&now)
518 .bind(&now)
519 .fetch_one(&self.pool)
520 .await?;
521
522 Ok(account)
523 }
524
525 async fn get_account(
526 &self,
527 provider: &str,
528 provider_account_id: &str,
529 ) -> AuthResult<Option<A>> {
530 let account = sqlx::query_as::<_, A>(
531 "SELECT * FROM accounts WHERE provider_id = $1 AND account_id = $2",
532 )
533 .bind(provider)
534 .bind(provider_account_id)
535 .fetch_optional(&self.pool)
536 .await?;
537
538 Ok(account)
539 }
540
541 async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<A>> {
542 let accounts = sqlx::query_as::<_, A>(
543 "SELECT * FROM accounts WHERE user_id = $1 ORDER BY created_at DESC",
544 )
545 .bind(user_id)
546 .fetch_all(&self.pool)
547 .await?;
548
549 Ok(accounts)
550 }
551
552 async fn delete_account(&self, id: &str) -> AuthResult<()> {
553 sqlx::query("DELETE FROM accounts WHERE id = $1")
554 .bind(id)
555 .execute(&self.pool)
556 .await?;
557
558 Ok(())
559 }
560
561 async fn create_verification(
562 &self,
563 create_verification: CreateVerification,
564 ) -> AuthResult<V> {
565 let id = Uuid::new_v4().to_string();
566 let now = Utc::now();
567
568 let verification = sqlx::query_as::<_, V>(
569 r#"
570 INSERT INTO verifications (id, identifier, value, expires_at, created_at, updated_at)
571 VALUES ($1, $2, $3, $4, $5, $6)
572 RETURNING *
573 "#,
574 )
575 .bind(&id)
576 .bind(&create_verification.identifier)
577 .bind(&create_verification.value)
578 .bind(&create_verification.expires_at)
579 .bind(&now)
580 .bind(&now)
581 .fetch_one(&self.pool)
582 .await?;
583
584 Ok(verification)
585 }
586
587 async fn get_verification(&self, identifier: &str, value: &str) -> AuthResult<Option<V>> {
588 let verification = sqlx::query_as::<_, V>(
589 "SELECT * FROM verifications WHERE identifier = $1 AND value = $2 AND expires_at > NOW()",
590 )
591 .bind(identifier)
592 .bind(value)
593 .fetch_optional(&self.pool)
594 .await?;
595
596 Ok(verification)
597 }
598
599 async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<V>> {
600 let verification = sqlx::query_as::<_, V>(
601 "SELECT * FROM verifications WHERE value = $1 AND expires_at > NOW()",
602 )
603 .bind(value)
604 .fetch_optional(&self.pool)
605 .await?;
606
607 Ok(verification)
608 }
609
610 async fn delete_verification(&self, id: &str) -> AuthResult<()> {
611 sqlx::query("DELETE FROM verifications WHERE id = $1")
612 .bind(id)
613 .execute(&self.pool)
614 .await?;
615
616 Ok(())
617 }
618
619 async fn delete_expired_verifications(&self) -> AuthResult<usize> {
620 let result = sqlx::query("DELETE FROM verifications WHERE expires_at < NOW()")
621 .execute(&self.pool)
622 .await?;
623
624 Ok(result.rows_affected() as usize)
625 }
626
627 async fn create_organization(&self, create_org: CreateOrganization) -> AuthResult<O> {
629 let id = create_org.id.unwrap_or_else(|| Uuid::new_v4().to_string());
630 let now = Utc::now();
631
632 let organization = sqlx::query_as::<_, O>(
633 r#"
634 INSERT INTO organization (id, name, slug, logo, metadata, created_at, updated_at)
635 VALUES ($1, $2, $3, $4, $5, $6, $7)
636 RETURNING *
637 "#,
638 )
639 .bind(&id)
640 .bind(&create_org.name)
641 .bind(&create_org.slug)
642 .bind(&create_org.logo)
643 .bind(sqlx::types::Json(
644 create_org.metadata.unwrap_or(serde_json::json!({})),
645 ))
646 .bind(&now)
647 .bind(&now)
648 .fetch_one(&self.pool)
649 .await?;
650
651 Ok(organization)
652 }
653
654 async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<O>> {
655 let organization = sqlx::query_as::<_, O>("SELECT * FROM organization WHERE id = $1")
656 .bind(id)
657 .fetch_optional(&self.pool)
658 .await?;
659
660 Ok(organization)
661 }
662
663 async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<O>> {
664 let organization = sqlx::query_as::<_, O>("SELECT * FROM organization WHERE slug = $1")
665 .bind(slug)
666 .fetch_optional(&self.pool)
667 .await?;
668
669 Ok(organization)
670 }
671
672 async fn update_organization(&self, id: &str, update: UpdateOrganization) -> AuthResult<O> {
673 let mut query = sqlx::QueryBuilder::new("UPDATE organization SET updated_at = NOW()");
674
675 if let Some(name) = &update.name {
676 query.push(", name = ");
677 query.push_bind(name);
678 }
679 if let Some(slug) = &update.slug {
680 query.push(", slug = ");
681 query.push_bind(slug);
682 }
683 if let Some(logo) = &update.logo {
684 query.push(", logo = ");
685 query.push_bind(logo);
686 }
687 if let Some(metadata) = &update.metadata {
688 query.push(", metadata = ");
689 query.push_bind(sqlx::types::Json(metadata.clone()));
690 }
691
692 query.push(" WHERE id = ");
693 query.push_bind(id);
694 query.push(" RETURNING *");
695
696 let organization = query.build_query_as::<O>().fetch_one(&self.pool).await?;
697
698 Ok(organization)
699 }
700
701 async fn delete_organization(&self, id: &str) -> AuthResult<()> {
702 sqlx::query("DELETE FROM organization WHERE id = $1")
703 .bind(id)
704 .execute(&self.pool)
705 .await?;
706
707 Ok(())
708 }
709
710 async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<O>> {
711 let organizations = sqlx::query_as::<_, O>(
712 r#"
713 SELECT o.*
714 FROM organization o
715 INNER JOIN member m ON o.id = m.organization_id
716 WHERE m.user_id = $1
717 ORDER BY o.created_at DESC
718 "#,
719 )
720 .bind(user_id)
721 .fetch_all(&self.pool)
722 .await?;
723
724 Ok(organizations)
725 }
726
727 async fn create_member(&self, create_member: CreateMember) -> AuthResult<M> {
729 let id = Uuid::new_v4().to_string();
730 let now = Utc::now();
731
732 let member = sqlx::query_as::<_, M>(
733 r#"
734 INSERT INTO member (id, organization_id, user_id, role, created_at)
735 VALUES ($1, $2, $3, $4, $5)
736 RETURNING *
737 "#,
738 )
739 .bind(&id)
740 .bind(&create_member.organization_id)
741 .bind(&create_member.user_id)
742 .bind(&create_member.role)
743 .bind(&now)
744 .fetch_one(&self.pool)
745 .await?;
746
747 Ok(member)
748 }
749
750 async fn get_member(&self, organization_id: &str, user_id: &str) -> AuthResult<Option<M>> {
751 let member = sqlx::query_as::<_, M>(
752 "SELECT * FROM member WHERE organization_id = $1 AND user_id = $2",
753 )
754 .bind(organization_id)
755 .bind(user_id)
756 .fetch_optional(&self.pool)
757 .await?;
758
759 Ok(member)
760 }
761
762 async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<M>> {
763 let member = sqlx::query_as::<_, M>("SELECT * FROM member WHERE id = $1")
764 .bind(id)
765 .fetch_optional(&self.pool)
766 .await?;
767
768 Ok(member)
769 }
770
771 async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<M> {
772 let member =
773 sqlx::query_as::<_, M>("UPDATE member SET role = $1 WHERE id = $2 RETURNING *")
774 .bind(role)
775 .bind(member_id)
776 .fetch_one(&self.pool)
777 .await?;
778
779 Ok(member)
780 }
781
782 async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
783 sqlx::query("DELETE FROM member WHERE id = $1")
784 .bind(member_id)
785 .execute(&self.pool)
786 .await?;
787
788 Ok(())
789 }
790
791 async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<M>> {
792 let members = sqlx::query_as::<_, M>(
793 "SELECT * FROM member WHERE organization_id = $1 ORDER BY created_at ASC",
794 )
795 .bind(organization_id)
796 .fetch_all(&self.pool)
797 .await?;
798
799 Ok(members)
800 }
801
802 async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
803 let count: (i64,) =
804 sqlx::query_as("SELECT COUNT(*) FROM member WHERE organization_id = $1")
805 .bind(organization_id)
806 .fetch_one(&self.pool)
807 .await?;
808
809 Ok(count.0 as usize)
810 }
811
812 async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
813 let count: (i64,) = sqlx::query_as(
814 "SELECT COUNT(*) FROM member WHERE organization_id = $1 AND role = 'owner'",
815 )
816 .bind(organization_id)
817 .fetch_one(&self.pool)
818 .await?;
819
820 Ok(count.0 as usize)
821 }
822
823 async fn create_invitation(&self, create_inv: CreateInvitation) -> AuthResult<I> {
825 let id = Uuid::new_v4().to_string();
826 let now = Utc::now();
827
828 let invitation = sqlx::query_as::<_, I>(
829 r#"
830 INSERT INTO invitation (id, organization_id, email, role, status, inviter_id, expires_at, created_at)
831 VALUES ($1, $2, $3, $4, 'pending', $5, $6, $7)
832 RETURNING *
833 "#,
834 )
835 .bind(&id)
836 .bind(&create_inv.organization_id)
837 .bind(&create_inv.email)
838 .bind(&create_inv.role)
839 .bind(&create_inv.inviter_id)
840 .bind(&create_inv.expires_at)
841 .bind(&now)
842 .fetch_one(&self.pool)
843 .await?;
844
845 Ok(invitation)
846 }
847
848 async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<I>> {
849 let invitation = sqlx::query_as::<_, I>("SELECT * FROM invitation WHERE id = $1")
850 .bind(id)
851 .fetch_optional(&self.pool)
852 .await?;
853
854 Ok(invitation)
855 }
856
857 async fn get_pending_invitation(
858 &self,
859 organization_id: &str,
860 email: &str,
861 ) -> AuthResult<Option<I>> {
862 let invitation = sqlx::query_as::<_, I>(
863 "SELECT * FROM invitation WHERE organization_id = $1 AND LOWER(email) = LOWER($2) AND status = 'pending'",
864 )
865 .bind(organization_id)
866 .bind(email)
867 .fetch_optional(&self.pool)
868 .await?;
869
870 Ok(invitation)
871 }
872
873 async fn update_invitation_status(
874 &self,
875 id: &str,
876 status: InvitationStatus,
877 ) -> AuthResult<I> {
878 let invitation = sqlx::query_as::<_, I>(
879 "UPDATE invitation SET status = $1 WHERE id = $2 RETURNING *",
880 )
881 .bind(status.to_string())
882 .bind(id)
883 .fetch_one(&self.pool)
884 .await?;
885
886 Ok(invitation)
887 }
888
889 async fn list_organization_invitations(&self, organization_id: &str) -> AuthResult<Vec<I>> {
890 let invitations = sqlx::query_as::<_, I>(
891 "SELECT * FROM invitation WHERE organization_id = $1 ORDER BY created_at DESC",
892 )
893 .bind(organization_id)
894 .fetch_all(&self.pool)
895 .await?;
896
897 Ok(invitations)
898 }
899
900 async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<I>> {
901 let invitations = sqlx::query_as::<_, I>(
902 "SELECT * FROM invitation WHERE LOWER(email) = LOWER($1) AND status = 'pending' AND expires_at > NOW() ORDER BY created_at DESC",
903 )
904 .bind(email)
905 .fetch_all(&self.pool)
906 .await?;
907
908 Ok(invitations)
909 }
910
911 async fn update_session_active_organization(
913 &self,
914 token: &str,
915 organization_id: Option<&str>,
916 ) -> AuthResult<S> {
917 let session = sqlx::query_as::<_, S>(
918 "UPDATE sessions SET active_organization_id = $1, updated_at = NOW() WHERE token = $2 AND active = true RETURNING *",
919 )
920 .bind(organization_id)
921 .bind(token)
922 .fetch_one(&self.pool)
923 .await?;
924
925 Ok(session)
926 }
927 }
928}
929
930#[cfg(feature = "sqlx-postgres")]
931pub use sqlx_adapter::{SqlxAdapter, SqlxEntity};