1use async_trait::async_trait;
2use better_auth_core::adapters::traits::{
3 AccountOps, ApiKeyOps, InvitationOps, MemberOps, OrganizationOps, PasskeyOps, SessionOps,
4 TwoFactorOps, UserOps, VerificationOps,
5};
6use better_auth_core::error::{AuthError, AuthResult};
7use better_auth_core::types::{
8 Account, ApiKey, CreateAccount, CreateApiKey, CreateInvitation, CreateMember,
9 CreateOrganization, CreatePasskey, CreateSession, CreateTwoFactor, CreateUser,
10 CreateVerification, Invitation, InvitationStatus, ListUsersParams, Member, Organization,
11 Passkey, Session, TwoFactor, UpdateAccount, UpdateApiKey, UpdateOrganization, UpdateUser, User,
12 Verification,
13};
14use chrono::{DateTime, Utc};
15use uuid::Uuid;
16
17use crate::client::AllsourceClient;
18
19pub struct AllsourceAuthAdapter {
29 client: AllsourceClient,
30}
31
32impl AllsourceAuthAdapter {
33 pub fn new(core_url: &str, query_url: &str, api_key: &str) -> Self {
39 Self {
40 client: AllsourceClient::new(core_url, query_url, api_key),
41 }
42 }
43}
44
/// Returns the entity key for a user: `auth-user:` followed by `id`.
fn user_entity(id: &str) -> String {
    ["auth-user:", id].concat()
}
/// Returns the entity key for a session: `auth-session:` followed by `token`.
fn session_entity(token: &str) -> String {
    let mut key = String::from("auth-session:");
    key.push_str(token);
    key
}
/// Returns the entity key for an account: `auth-account:` followed by `id`.
fn account_entity(id: &str) -> String {
    ["auth-account:", id].concat()
}
/// Returns the entity key for a verification: `auth-verification:` followed by `id`.
fn verification_entity(id: &str) -> String {
    let mut key = String::from("auth-verification:");
    key.push_str(id);
    key
}
/// Returns the entity key for an organization: `auth-org:` followed by `id`.
fn org_entity(id: &str) -> String {
    ["auth-org:", id].concat()
}
/// Returns the entity key for a membership: `auth-member:` followed by `id`.
fn member_entity(id: &str) -> String {
    let mut key = String::from("auth-member:");
    key.push_str(id);
    key
}
/// Returns the entity key for an invitation: `auth-invitation:` followed by `id`.
fn invitation_entity(id: &str) -> String {
    ["auth-invitation:", id].concat()
}
/// Returns the entity key for a two-factor record: `auth-2fa:` followed by `id`.
fn two_factor_entity(id: &str) -> String {
    let mut key = String::from("auth-2fa:");
    key.push_str(id);
    key
}
/// Returns the entity key for an API key: `auth-apikey:` followed by `id`.
fn api_key_entity(id: &str) -> String {
    ["auth-apikey:", id].concat()
}
/// Returns the entity key for a passkey: `auth-passkey:` followed by `id`.
fn passkey_entity(id: &str) -> String {
    let mut key = String::from("auth-passkey:");
    key.push_str(id);
    key
}
76
77#[async_trait]
80impl UserOps for AllsourceAuthAdapter {
81 type User = User;
82
83 async fn create_user(&self, input: CreateUser) -> AuthResult<User> {
84 let id = input.id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
85
86 if let Some(email) = &input.email {
88 if let Some(_existing) = self
89 .client
90 .find_by_field::<User>("UserCreated", "email", email)
91 .await
92 .map_err(AuthError::from)?
93 {
94 return Err(AuthError::config("Email already exists"));
95 }
96 }
97
98 if let Some(username) = &input.username {
100 if let Some(_existing) = self
101 .client
102 .find_by_field::<User>("UserCreated", "username", username)
103 .await
104 .map_err(AuthError::from)?
105 {
106 return Err(AuthError::conflict(
107 "A user with this username already exists",
108 ));
109 }
110 }
111
112 let now = Utc::now();
113 let user = User {
114 id: id.clone(),
115 email: input.email,
116 name: input.name,
117 image: input.image,
118 email_verified: input.email_verified.unwrap_or(false),
119 created_at: now,
120 updated_at: now,
121 username: input.username,
122 display_username: input.display_username,
123 two_factor_enabled: false,
124 role: input.role,
125 banned: false,
126 ban_reason: None,
127 ban_expires: None,
128 metadata: input.metadata.unwrap_or(serde_json::Value::Null),
129 };
130
131 let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
132 self.client
133 .append_event(&user_entity(&id), "UserCreated", payload)
134 .await
135 .map_err(AuthError::from)?;
136
137 Ok(user)
141 }
142
143 async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<User>> {
144 self.client
145 .get_latest(&user_entity(id))
146 .await
147 .map_err(AuthError::from)
148 }
149
150 async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<User>> {
151 self.client
152 .find_by_field("UserCreated", "email", email)
153 .await
154 .map_err(AuthError::from)
155 }
156
157 async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<User>> {
158 self.client
159 .find_by_field("UserCreated", "username", username)
160 .await
161 .map_err(AuthError::from)
162 }
163
164 async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<User> {
165 let mut user = self
166 .get_user_by_id(id)
167 .await?
168 .ok_or(AuthError::UserNotFound)?;
169
170 if let Some(email) = update.email {
171 user.email = Some(email);
172 }
173 if let Some(name) = update.name {
174 user.name = Some(name);
175 }
176 if let Some(image) = update.image {
177 user.image = Some(image);
178 }
179 if let Some(verified) = update.email_verified {
180 user.email_verified = verified;
181 }
182 if let Some(username) = update.username {
183 user.username = Some(username);
184 }
185 if let Some(display_username) = update.display_username {
186 user.display_username = Some(display_username);
187 }
188 if let Some(role) = update.role {
189 user.role = Some(role);
190 }
191 if let Some(banned) = update.banned {
192 user.banned = banned;
193 }
194 if let Some(ban_reason) = update.ban_reason {
195 user.ban_reason = Some(ban_reason);
196 }
197 if let Some(ban_expires) = update.ban_expires {
198 user.ban_expires = Some(ban_expires);
199 }
200 if let Some(two_factor_enabled) = update.two_factor_enabled {
201 user.two_factor_enabled = two_factor_enabled;
202 }
203 if let Some(metadata) = update.metadata {
204 user.metadata = metadata;
205 }
206 user.updated_at = Utc::now();
207
208 let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
209 self.client
210 .append_event(&user_entity(id), "UserUpdated", payload)
211 .await
212 .map_err(AuthError::from)?;
213
214 Ok(user)
215 }
216
217 async fn delete_user(&self, id: &str) -> AuthResult<()> {
218 self.client
219 .append_delete(&user_entity(id), "UserDeleted")
220 .await
221 .map_err(AuthError::from)
222 }
223
224 async fn list_users(&self, params: ListUsersParams) -> AuthResult<(Vec<User>, usize)> {
225 let mut users: Vec<User> = self
226 .client
227 .query_all("UserCreated", 10000)
228 .await
229 .map_err(AuthError::from)?;
230
231 if let Some(search_value) = ¶ms.search_value {
233 let field = params.search_field.as_deref().unwrap_or("email");
234 let op = params.search_operator.as_deref().unwrap_or("contains");
235 let sv = search_value.to_lowercase();
236 users.retain(|u| {
237 let field_val = match field {
238 "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
239 _ => u.email.as_deref().unwrap_or("").to_lowercase(),
240 };
241 match op {
242 "starts_with" => field_val.starts_with(&sv),
243 "ends_with" => field_val.ends_with(&sv),
244 _ => field_val.contains(&sv),
245 }
246 });
247 }
248
249 if let Some(filter_value) = ¶ms.filter_value {
251 let field = params.filter_field.as_deref().unwrap_or("email");
252 let op = params.filter_operator.as_deref().unwrap_or("eq");
253 let fv = filter_value.to_lowercase();
254 users.retain(|u| {
255 let field_val = match field {
256 "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
257 "role" => u.role.as_deref().unwrap_or("").to_lowercase(),
258 _ => u.email.as_deref().unwrap_or("").to_lowercase(),
259 };
260 match op {
261 "contains" => field_val.contains(&fv),
262 "starts_with" => field_val.starts_with(&fv),
263 "ends_with" => field_val.ends_with(&fv),
264 "ne" => field_val != fv,
265 _ => field_val == fv,
266 }
267 });
268 }
269
270 let total = users.len();
271 let offset = params.offset.unwrap_or(0);
272 let limit = params.limit.unwrap_or(100);
273 let paged: Vec<User> = users.into_iter().skip(offset).take(limit).collect();
274
275 Ok((paged, total))
276 }
277}
278
279#[async_trait]
282impl SessionOps for AllsourceAuthAdapter {
283 type Session = Session;
284
285 async fn create_session(&self, input: CreateSession) -> AuthResult<Session> {
286 let id = Uuid::new_v4().to_string();
287 let token = format!("session_{}", Uuid::new_v4());
288 let now = Utc::now();
289
290 let session = Session {
291 id,
292 token: token.clone(),
293 user_id: input.user_id,
294 expires_at: input.expires_at,
295 created_at: now,
296 updated_at: now,
297 ip_address: input.ip_address,
298 user_agent: input.user_agent,
299 impersonated_by: input.impersonated_by,
300 active_organization_id: input.active_organization_id,
301 active: true,
302 };
303
304 let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
305 self.client
306 .append_event(&session_entity(&token), "SessionCreated", payload)
307 .await
308 .map_err(AuthError::from)?;
309
310 Ok(session)
311 }
312
313 async fn get_session(&self, token: &str) -> AuthResult<Option<Session>> {
314 self.client
315 .get_latest(&session_entity(token))
316 .await
317 .map_err(AuthError::from)
318 }
319
320 async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<Session>> {
321 self.client
322 .find_all_by_field("SessionCreated", "user_id", user_id)
323 .await
324 .map_err(AuthError::from)
325 }
326
327 async fn update_session_expiry(
328 &self,
329 token: &str,
330 expires_at: DateTime<Utc>,
331 ) -> AuthResult<()> {
332 if let Some(mut session) = self.get_session(token).await? {
333 session.expires_at = expires_at;
334 session.updated_at = Utc::now();
335 let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
336 self.client
337 .append_event(&session_entity(token), "SessionExpiryUpdated", payload)
338 .await
339 .map_err(AuthError::from)?;
340 }
341 Ok(())
342 }
343
344 async fn delete_session(&self, token: &str) -> AuthResult<()> {
345 self.client
346 .append_delete(&session_entity(token), "SessionDeleted")
347 .await
348 .map_err(AuthError::from)
349 }
350
351 async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
352 let sessions = self.get_user_sessions(user_id).await?;
353 for session in sessions {
354 self.delete_session(&session.token).await?;
355 }
356 Ok(())
357 }
358
359 async fn delete_expired_sessions(&self) -> AuthResult<usize> {
360 let all: Vec<Session> = self
361 .client
362 .query_all("SessionCreated", 10000)
363 .await
364 .map_err(AuthError::from)?;
365
366 let now = Utc::now();
367 let mut deleted = 0;
368 for session in all {
369 if session.expires_at <= now || !session.active {
370 self.delete_session(&session.token).await?;
371 deleted += 1;
372 }
373 }
374 Ok(deleted)
375 }
376
377 async fn update_session_active_organization(
378 &self,
379 token: &str,
380 organization_id: Option<&str>,
381 ) -> AuthResult<Session> {
382 let mut session = self
383 .get_session(token)
384 .await?
385 .ok_or(AuthError::SessionNotFound)?;
386
387 session.active_organization_id = organization_id.map(|s| s.to_string());
388 session.updated_at = Utc::now();
389
390 let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
391 self.client
392 .append_event(&session_entity(token), "SessionUpdated", payload)
393 .await
394 .map_err(AuthError::from)?;
395
396 Ok(session)
397 }
398}
399
400#[async_trait]
403impl AccountOps for AllsourceAuthAdapter {
404 type Account = Account;
405
406 async fn create_account(&self, input: CreateAccount) -> AuthResult<Account> {
407 let id = Uuid::new_v4().to_string();
408 let now = Utc::now();
409
410 let account = Account {
411 id: id.clone(),
412 account_id: input.account_id,
413 provider_id: input.provider_id,
414 user_id: input.user_id,
415 access_token: input.access_token,
416 refresh_token: input.refresh_token,
417 id_token: input.id_token,
418 access_token_expires_at: input.access_token_expires_at,
419 refresh_token_expires_at: input.refresh_token_expires_at,
420 scope: input.scope,
421 password: input.password,
422 created_at: now,
423 updated_at: now,
424 };
425
426 let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
427 self.client
428 .append_event(&account_entity(&id), "AccountCreated", payload)
429 .await
430 .map_err(AuthError::from)?;
431
432 Ok(account)
433 }
434
435 async fn get_account(
436 &self,
437 provider: &str,
438 provider_account_id: &str,
439 ) -> AuthResult<Option<Account>> {
440 let all: Vec<Account> = self
442 .client
443 .query_all("AccountCreated", 10000)
444 .await
445 .map_err(AuthError::from)?;
446
447 Ok(all
448 .into_iter()
449 .find(|a| a.provider_id == provider && a.account_id == provider_account_id))
450 }
451
452 async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<Account>> {
453 self.client
454 .find_all_by_field("AccountCreated", "user_id", user_id)
455 .await
456 .map_err(AuthError::from)
457 }
458
459 async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<Account> {
460 let mut account: Account = self
461 .client
462 .get_latest(&account_entity(id))
463 .await
464 .map_err(AuthError::from)?
465 .ok_or_else(|| AuthError::not_found("Account not found"))?;
466
467 if let Some(access_token) = update.access_token {
468 account.access_token = Some(access_token);
469 }
470 if let Some(refresh_token) = update.refresh_token {
471 account.refresh_token = Some(refresh_token);
472 }
473 if let Some(id_token) = update.id_token {
474 account.id_token = Some(id_token);
475 }
476 if let Some(expires_at) = update.access_token_expires_at {
477 account.access_token_expires_at = Some(expires_at);
478 }
479 if let Some(expires_at) = update.refresh_token_expires_at {
480 account.refresh_token_expires_at = Some(expires_at);
481 }
482 if let Some(scope) = update.scope {
483 account.scope = Some(scope);
484 }
485 if let Some(password) = update.password {
486 account.password = Some(password);
487 }
488 account.updated_at = Utc::now();
489
490 let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
491 self.client
492 .append_event(&account_entity(id), "AccountUpdated", payload)
493 .await
494 .map_err(AuthError::from)?;
495
496 Ok(account)
497 }
498
499 async fn delete_account(&self, id: &str) -> AuthResult<()> {
500 self.client
501 .append_delete(&account_entity(id), "AccountDeleted")
502 .await
503 .map_err(AuthError::from)
504 }
505}
506
507#[async_trait]
510impl VerificationOps for AllsourceAuthAdapter {
511 type Verification = Verification;
512
513 async fn create_verification(&self, input: CreateVerification) -> AuthResult<Verification> {
514 let id = Uuid::new_v4().to_string();
515 let now = Utc::now();
516
517 let verification = Verification {
518 id: id.clone(),
519 identifier: input.identifier,
520 value: input.value,
521 expires_at: input.expires_at,
522 created_at: now,
523 updated_at: now,
524 };
525
526 let payload = serde_json::to_value(&verification).map_err(AuthError::from)?;
527 self.client
528 .append_event(
529 &verification_entity(&id),
530 "VerificationCreated",
531 payload,
532 )
533 .await
534 .map_err(AuthError::from)?;
535
536 Ok(verification)
537 }
538
539 async fn get_verification(
540 &self,
541 identifier: &str,
542 value: &str,
543 ) -> AuthResult<Option<Verification>> {
544 let all: Vec<Verification> = self
545 .client
546 .query_all("VerificationCreated", 10000)
547 .await
548 .map_err(AuthError::from)?;
549
550 let now = Utc::now();
551 Ok(all
552 .into_iter()
553 .find(|v| v.identifier == identifier && v.value == value && v.expires_at > now))
554 }
555
556 async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<Verification>> {
557 let all: Vec<Verification> = self
558 .client
559 .query_all("VerificationCreated", 10000)
560 .await
561 .map_err(AuthError::from)?;
562
563 let now = Utc::now();
564 Ok(all
565 .into_iter()
566 .find(|v| v.value == value && v.expires_at > now))
567 }
568
569 async fn get_verification_by_identifier(
570 &self,
571 identifier: &str,
572 ) -> AuthResult<Option<Verification>> {
573 let all: Vec<Verification> = self
574 .client
575 .query_all("VerificationCreated", 10000)
576 .await
577 .map_err(AuthError::from)?;
578
579 let now = Utc::now();
580 Ok(all
581 .into_iter()
582 .find(|v| v.identifier == identifier && v.expires_at > now))
583 }
584
585 async fn consume_verification(
586 &self,
587 identifier: &str,
588 value: &str,
589 ) -> AuthResult<Option<Verification>> {
590 let verification = self.get_verification(identifier, value).await?;
591 if let Some(ref v) = verification {
592 self.client
593 .append_delete(&verification_entity(&v.id), "VerificationConsumed")
594 .await
595 .map_err(AuthError::from)?;
596 }
597 Ok(verification)
598 }
599
600 async fn delete_verification(&self, id: &str) -> AuthResult<()> {
601 self.client
602 .append_delete(&verification_entity(id), "VerificationDeleted")
603 .await
604 .map_err(AuthError::from)
605 }
606
607 async fn delete_expired_verifications(&self) -> AuthResult<usize> {
608 let all: Vec<Verification> = self
609 .client
610 .query_all("VerificationCreated", 10000)
611 .await
612 .map_err(AuthError::from)?;
613
614 let now = Utc::now();
615 let mut deleted = 0;
616 for v in all {
617 if v.expires_at <= now {
618 self.delete_verification(&v.id).await?;
619 deleted += 1;
620 }
621 }
622 Ok(deleted)
623 }
624}
625
626#[async_trait]
629impl OrganizationOps for AllsourceAuthAdapter {
630 type Organization = Organization;
631
632 async fn create_organization(&self, input: CreateOrganization) -> AuthResult<Organization> {
633 if self
635 .client
636 .find_by_field::<Organization>("OrgCreated", "slug", &input.slug)
637 .await
638 .map_err(AuthError::from)?
639 .is_some()
640 {
641 return Err(AuthError::conflict("Organization slug already exists"));
642 }
643
644 let id = input
645 .id
646 .clone()
647 .unwrap_or_else(|| Uuid::new_v4().to_string());
648 let now = Utc::now();
649
650 let org = Organization {
651 id: id.clone(),
652 name: input.name,
653 slug: input.slug,
654 logo: input.logo,
655 metadata: input.metadata,
656 created_at: now,
657 updated_at: now,
658 };
659
660 let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
661 self.client
662 .append_event(&org_entity(&id), "OrgCreated", payload)
663 .await
664 .map_err(AuthError::from)?;
665
666 Ok(org)
667 }
668
669 async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<Organization>> {
670 self.client
671 .get_latest(&org_entity(id))
672 .await
673 .map_err(AuthError::from)
674 }
675
676 async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<Organization>> {
677 self.client
678 .find_by_field("OrgCreated", "slug", slug)
679 .await
680 .map_err(AuthError::from)
681 }
682
683 async fn update_organization(
684 &self,
685 id: &str,
686 update: UpdateOrganization,
687 ) -> AuthResult<Organization> {
688 let mut org: Organization = self
689 .get_organization_by_id(id)
690 .await?
691 .ok_or_else(|| AuthError::not_found("Organization not found"))?;
692
693 if let Some(name) = update.name {
694 org.name = name;
695 }
696 if let Some(slug) = update.slug {
697 org.slug = slug;
698 }
699 if let Some(logo) = update.logo {
700 org.logo = Some(logo);
701 }
702 if let Some(metadata) = update.metadata {
703 org.metadata = Some(metadata);
704 }
705 org.updated_at = Utc::now();
706
707 let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
708 self.client
709 .append_event(&org_entity(id), "OrgUpdated", payload)
710 .await
711 .map_err(AuthError::from)?;
712
713 Ok(org)
714 }
715
716 async fn delete_organization(&self, id: &str) -> AuthResult<()> {
717 self.client
718 .append_delete(&org_entity(id), "OrgDeleted")
719 .await
720 .map_err(AuthError::from)?;
721
722 let members: Vec<Member> = self
724 .client
725 .find_all_by_field("MemberCreated", "organization_id", id)
726 .await
727 .map_err(AuthError::from)?;
728 for m in members {
729 self.client
730 .append_delete(&member_entity(&m.id), "MemberDeleted")
731 .await
732 .map_err(AuthError::from)?;
733 }
734
735 let invitations: Vec<Invitation> = self
736 .client
737 .find_all_by_field("InvitationCreated", "organization_id", id)
738 .await
739 .map_err(AuthError::from)?;
740 for i in invitations {
741 self.client
742 .append_delete(&invitation_entity(&i.id), "InvitationDeleted")
743 .await
744 .map_err(AuthError::from)?;
745 }
746
747 Ok(())
748 }
749
750 async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<Organization>> {
751 let members: Vec<Member> = self
752 .client
753 .find_all_by_field("MemberCreated", "user_id", user_id)
754 .await
755 .map_err(AuthError::from)?;
756
757 let mut orgs = Vec::new();
758 for m in members {
759 if let Some(org) = self.get_organization_by_id(&m.organization_id).await? {
760 orgs.push(org);
761 }
762 }
763 Ok(orgs)
764 }
765}
766
767#[async_trait]
770impl MemberOps for AllsourceAuthAdapter {
771 type Member = Member;
772
773 async fn create_member(&self, input: CreateMember) -> AuthResult<Member> {
774 let existing: Vec<Member> = self
776 .client
777 .find_all_by_field("MemberCreated", "organization_id", &input.organization_id)
778 .await
779 .map_err(AuthError::from)?;
780
781 if existing.iter().any(|m| m.user_id == input.user_id) {
782 return Err(AuthError::conflict(
783 "User is already a member of this organization",
784 ));
785 }
786
787 let id = Uuid::new_v4().to_string();
788 let now = Utc::now();
789
790 let member = Member {
791 id: id.clone(),
792 organization_id: input.organization_id,
793 user_id: input.user_id,
794 role: input.role,
795 created_at: now,
796 };
797
798 let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
799 self.client
800 .append_event(&member_entity(&id), "MemberCreated", payload)
801 .await
802 .map_err(AuthError::from)?;
803
804 Ok(member)
805 }
806
807 async fn get_member(
808 &self,
809 organization_id: &str,
810 user_id: &str,
811 ) -> AuthResult<Option<Member>> {
812 let members: Vec<Member> = self
813 .client
814 .find_all_by_field("MemberCreated", "organization_id", organization_id)
815 .await
816 .map_err(AuthError::from)?;
817
818 Ok(members.into_iter().find(|m| m.user_id == user_id))
819 }
820
821 async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<Member>> {
822 self.client
823 .get_latest(&member_entity(id))
824 .await
825 .map_err(AuthError::from)
826 }
827
828 async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<Member> {
829 let mut member: Member = self
830 .get_member_by_id(member_id)
831 .await?
832 .ok_or_else(|| AuthError::not_found("Member not found"))?;
833
834 member.role = role.to_string();
835
836 let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
837 self.client
838 .append_event(&member_entity(member_id), "MemberUpdated", payload)
839 .await
840 .map_err(AuthError::from)?;
841
842 Ok(member)
843 }
844
845 async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
846 self.client
847 .append_delete(&member_entity(member_id), "MemberDeleted")
848 .await
849 .map_err(AuthError::from)
850 }
851
852 async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<Member>> {
853 self.client
854 .find_all_by_field("MemberCreated", "organization_id", organization_id)
855 .await
856 .map_err(AuthError::from)
857 }
858
859 async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
860 Ok(self.list_organization_members(organization_id).await?.len())
861 }
862
863 async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
864 let members = self.list_organization_members(organization_id).await?;
865 Ok(members.iter().filter(|m| m.role == "owner").count())
866 }
867}
868
869#[async_trait]
872impl InvitationOps for AllsourceAuthAdapter {
873 type Invitation = Invitation;
874
875 async fn create_invitation(&self, input: CreateInvitation) -> AuthResult<Invitation> {
876 let id = Uuid::new_v4().to_string();
877 let now = Utc::now();
878
879 let invitation = Invitation {
880 id: id.clone(),
881 organization_id: input.organization_id,
882 email: input.email,
883 role: input.role,
884 status: InvitationStatus::Pending,
885 inviter_id: input.inviter_id,
886 expires_at: input.expires_at,
887 created_at: now,
888 };
889
890 let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
891 self.client
892 .append_event(&invitation_entity(&id), "InvitationCreated", payload)
893 .await
894 .map_err(AuthError::from)?;
895
896 Ok(invitation)
897 }
898
899 async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<Invitation>> {
900 self.client
901 .get_latest(&invitation_entity(id))
902 .await
903 .map_err(AuthError::from)
904 }
905
906 async fn get_pending_invitation(
907 &self,
908 organization_id: &str,
909 email: &str,
910 ) -> AuthResult<Option<Invitation>> {
911 let invitations: Vec<Invitation> = self
912 .client
913 .find_all_by_field("InvitationCreated", "organization_id", organization_id)
914 .await
915 .map_err(AuthError::from)?;
916
917 Ok(invitations.into_iter().find(|i| {
918 i.email.to_lowercase() == email.to_lowercase()
919 && i.status == InvitationStatus::Pending
920 }))
921 }
922
923 async fn update_invitation_status(
924 &self,
925 id: &str,
926 status: InvitationStatus,
927 ) -> AuthResult<Invitation> {
928 let mut invitation = self
929 .get_invitation_by_id(id)
930 .await?
931 .ok_or_else(|| AuthError::not_found("Invitation not found"))?;
932
933 invitation.status = status;
934
935 let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
936 self.client
937 .append_event(&invitation_entity(id), "InvitationUpdated", payload)
938 .await
939 .map_err(AuthError::from)?;
940
941 Ok(invitation)
942 }
943
944 async fn list_organization_invitations(
945 &self,
946 organization_id: &str,
947 ) -> AuthResult<Vec<Invitation>> {
948 self.client
949 .find_all_by_field("InvitationCreated", "organization_id", organization_id)
950 .await
951 .map_err(AuthError::from)
952 }
953
954 async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<Invitation>> {
955 let all: Vec<Invitation> = self
956 .client
957 .find_all_by_field("InvitationCreated", "email", email)
958 .await
959 .map_err(AuthError::from)?;
960
961 let now = Utc::now();
962 Ok(all
963 .into_iter()
964 .filter(|i| i.status == InvitationStatus::Pending && i.expires_at > now)
965 .collect())
966 }
967}
968
969#[async_trait]
972impl TwoFactorOps for AllsourceAuthAdapter {
973 type TwoFactor = TwoFactor;
974
975 async fn create_two_factor(&self, input: CreateTwoFactor) -> AuthResult<TwoFactor> {
976 if self
978 .get_two_factor_by_user_id(&input.user_id)
979 .await?
980 .is_some()
981 {
982 return Err(AuthError::conflict(
983 "Two-factor authentication already enabled for this user",
984 ));
985 }
986
987 let id = Uuid::new_v4().to_string();
988 let now = Utc::now();
989
990 let two_factor = TwoFactor {
991 id: id.clone(),
992 secret: input.secret,
993 backup_codes: input.backup_codes,
994 user_id: input.user_id,
995 created_at: now,
996 updated_at: now,
997 };
998
999 let payload = serde_json::to_value(&two_factor).map_err(AuthError::from)?;
1000 self.client
1001 .append_event(&two_factor_entity(&id), "TwoFactorCreated", payload)
1002 .await
1003 .map_err(AuthError::from)?;
1004
1005 Ok(two_factor)
1006 }
1007
1008 async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TwoFactor>> {
1009 self.client
1010 .find_by_field("TwoFactorCreated", "user_id", user_id)
1011 .await
1012 .map_err(AuthError::from)
1013 }
1014
1015 async fn update_two_factor_backup_codes(
1016 &self,
1017 user_id: &str,
1018 backup_codes: &str,
1019 ) -> AuthResult<TwoFactor> {
1020 let mut tf = self
1021 .get_two_factor_by_user_id(user_id)
1022 .await?
1023 .ok_or_else(|| AuthError::not_found("Two-factor record not found"))?;
1024
1025 tf.backup_codes = Some(backup_codes.to_string());
1026 tf.updated_at = Utc::now();
1027
1028 let payload = serde_json::to_value(&tf).map_err(AuthError::from)?;
1029 self.client
1030 .append_event(&two_factor_entity(&tf.id), "TwoFactorUpdated", payload)
1031 .await
1032 .map_err(AuthError::from)?;
1033
1034 Ok(tf)
1035 }
1036
1037 async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
1038 if let Some(tf) = self.get_two_factor_by_user_id(user_id).await? {
1039 self.client
1040 .append_delete(&two_factor_entity(&tf.id), "TwoFactorDeleted")
1041 .await
1042 .map_err(AuthError::from)?;
1043 }
1044 Ok(())
1045 }
1046}
1047
1048#[async_trait]
1051impl ApiKeyOps for AllsourceAuthAdapter {
1052 type ApiKey = ApiKey;
1053
1054 async fn create_api_key(&self, input: CreateApiKey) -> AuthResult<ApiKey> {
1055 if self
1057 .get_api_key_by_hash(&input.key_hash)
1058 .await?
1059 .is_some()
1060 {
1061 return Err(AuthError::conflict("API key already exists"));
1062 }
1063
1064 let id = Uuid::new_v4().to_string();
1065 let now = Utc::now().to_rfc3339();
1066
1067 let api_key = ApiKey {
1068 id: id.clone(),
1069 name: input.name,
1070 start: input.start,
1071 prefix: input.prefix,
1072 key_hash: input.key_hash,
1073 user_id: input.user_id,
1074 refill_interval: input.refill_interval,
1075 refill_amount: input.refill_amount,
1076 last_refill_at: None,
1077 enabled: input.enabled,
1078 rate_limit_enabled: input.rate_limit_enabled,
1079 rate_limit_time_window: input.rate_limit_time_window,
1080 rate_limit_max: input.rate_limit_max,
1081 request_count: Some(0),
1082 remaining: input.remaining,
1083 last_request: None,
1084 expires_at: input.expires_at,
1085 created_at: now.clone(),
1086 updated_at: now,
1087 permissions: input.permissions,
1088 metadata: input.metadata,
1089 };
1090
1091 let payload = serde_json::to_value(&api_key).map_err(AuthError::from)?;
1092 self.client
1093 .append_event(&api_key_entity(&id), "ApiKeyCreated", payload)
1094 .await
1095 .map_err(AuthError::from)?;
1096
1097 Ok(api_key)
1098 }
1099
1100 async fn get_api_key_by_id(&self, id: &str) -> AuthResult<Option<ApiKey>> {
1101 self.client
1102 .get_latest(&api_key_entity(id))
1103 .await
1104 .map_err(AuthError::from)
1105 }
1106
1107 async fn get_api_key_by_hash(&self, hash: &str) -> AuthResult<Option<ApiKey>> {
1108 self.client
1109 .find_by_field("ApiKeyCreated", "key", hash)
1110 .await
1111 .map_err(AuthError::from)
1112 }
1113
1114 async fn list_api_keys_by_user(&self, user_id: &str) -> AuthResult<Vec<ApiKey>> {
1115 let mut keys: Vec<ApiKey> = self
1116 .client
1117 .find_all_by_field("ApiKeyCreated", "user_id", user_id)
1118 .await
1119 .map_err(AuthError::from)?;
1120
1121 keys.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1122 Ok(keys)
1123 }
1124
1125 async fn update_api_key(&self, id: &str, update: UpdateApiKey) -> AuthResult<ApiKey> {
1126 let mut key = self
1127 .get_api_key_by_id(id)
1128 .await?
1129 .ok_or_else(|| AuthError::not_found("API key not found"))?;
1130
1131 if let Some(name) = update.name {
1132 key.name = Some(name);
1133 }
1134 if let Some(enabled) = update.enabled {
1135 key.enabled = enabled;
1136 }
1137 if let Some(remaining) = update.remaining {
1138 key.remaining = Some(remaining);
1139 }
1140 if let Some(rate_limit_enabled) = update.rate_limit_enabled {
1141 key.rate_limit_enabled = rate_limit_enabled;
1142 }
1143 if let Some(window) = update.rate_limit_time_window {
1144 key.rate_limit_time_window = Some(window);
1145 }
1146 if let Some(max) = update.rate_limit_max {
1147 key.rate_limit_max = Some(max);
1148 }
1149 if let Some(interval) = update.refill_interval {
1150 key.refill_interval = Some(interval);
1151 }
1152 if let Some(amount) = update.refill_amount {
1153 key.refill_amount = Some(amount);
1154 }
1155 if let Some(permissions) = update.permissions {
1156 key.permissions = Some(permissions);
1157 }
1158 if let Some(metadata) = update.metadata {
1159 key.metadata = Some(metadata);
1160 }
1161 if let Some(expires_at) = update.expires_at {
1162 key.expires_at = expires_at;
1163 }
1164 if let Some(last_request) = update.last_request {
1165 key.last_request = last_request;
1166 }
1167 if let Some(request_count) = update.request_count {
1168 key.request_count = Some(request_count);
1169 }
1170 if let Some(last_refill_at) = update.last_refill_at {
1171 key.last_refill_at = last_refill_at;
1172 }
1173 key.updated_at = Utc::now().to_rfc3339();
1174
1175 let payload = serde_json::to_value(&key).map_err(AuthError::from)?;
1176 self.client
1177 .append_event(&api_key_entity(id), "ApiKeyUpdated", payload)
1178 .await
1179 .map_err(AuthError::from)?;
1180
1181 Ok(key)
1182 }
1183
1184 async fn delete_api_key(&self, id: &str) -> AuthResult<()> {
1185 self.client
1186 .append_delete(&api_key_entity(id), "ApiKeyDeleted")
1187 .await
1188 .map_err(AuthError::from)
1189 }
1190
1191 async fn delete_expired_api_keys(&self) -> AuthResult<usize> {
1192 let all: Vec<ApiKey> = self
1193 .client
1194 .query_all("ApiKeyCreated", 10000)
1195 .await
1196 .map_err(AuthError::from)?;
1197
1198 let now = Utc::now();
1199 let mut deleted = 0;
1200 for key in all {
1201 if let Some(expires_at) = &key.expires_at {
1202 if let Ok(exp) = chrono::DateTime::parse_from_rfc3339(expires_at) {
1203 if exp <= now {
1204 self.delete_api_key(&key.id).await?;
1205 deleted += 1;
1206 }
1207 }
1208 }
1209 }
1210 Ok(deleted)
1211 }
1212}
1213
1214#[async_trait]
1217impl PasskeyOps for AllsourceAuthAdapter {
1218 type Passkey = Passkey;
1219
1220 async fn create_passkey(&self, input: CreatePasskey) -> AuthResult<Passkey> {
1221 if self
1223 .get_passkey_by_credential_id(&input.credential_id)
1224 .await?
1225 .is_some()
1226 {
1227 return Err(AuthError::conflict(
1228 "A passkey with this credential ID already exists",
1229 ));
1230 }
1231
1232 let id = Uuid::new_v4().to_string();
1233 let now = Utc::now();
1234
1235 let passkey = Passkey {
1236 id: id.clone(),
1237 name: input.name,
1238 public_key: input.public_key,
1239 user_id: input.user_id,
1240 credential_id: input.credential_id,
1241 counter: input.counter,
1242 device_type: input.device_type,
1243 backed_up: input.backed_up,
1244 transports: input.transports,
1245 created_at: now,
1246 };
1247
1248 let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1249 self.client
1250 .append_event(&passkey_entity(&id), "PasskeyCreated", payload)
1251 .await
1252 .map_err(AuthError::from)?;
1253
1254 Ok(passkey)
1255 }
1256
1257 async fn get_passkey_by_id(&self, id: &str) -> AuthResult<Option<Passkey>> {
1258 self.client
1259 .get_latest(&passkey_entity(id))
1260 .await
1261 .map_err(AuthError::from)
1262 }
1263
1264 async fn get_passkey_by_credential_id(
1265 &self,
1266 credential_id: &str,
1267 ) -> AuthResult<Option<Passkey>> {
1268 self.client
1269 .find_by_field("PasskeyCreated", "credentialID", credential_id)
1270 .await
1271 .map_err(AuthError::from)
1272 }
1273
1274 async fn list_passkeys_by_user(&self, user_id: &str) -> AuthResult<Vec<Passkey>> {
1275 let mut passkeys: Vec<Passkey> = self
1276 .client
1277 .find_all_by_field("PasskeyCreated", "user_id", user_id)
1278 .await
1279 .map_err(AuthError::from)?;
1280
1281 passkeys.sort_by_key(|p| std::cmp::Reverse(p.created_at));
1282 Ok(passkeys)
1283 }
1284
1285 async fn update_passkey_counter(&self, id: &str, counter: u64) -> AuthResult<Passkey> {
1286 let mut passkey = self
1287 .get_passkey_by_id(id)
1288 .await?
1289 .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1290
1291 passkey.counter = counter;
1292
1293 let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1294 self.client
1295 .append_event(&passkey_entity(id), "PasskeyUpdated", payload)
1296 .await
1297 .map_err(AuthError::from)?;
1298
1299 Ok(passkey)
1300 }
1301
1302 async fn update_passkey_name(&self, id: &str, name: &str) -> AuthResult<Passkey> {
1303 let mut passkey = self
1304 .get_passkey_by_id(id)
1305 .await?
1306 .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1307
1308 passkey.name = name.to_string();
1309
1310 let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1311 self.client
1312 .append_event(&passkey_entity(id), "PasskeyUpdated", payload)
1313 .await
1314 .map_err(AuthError::from)?;
1315
1316 Ok(passkey)
1317 }
1318
1319 async fn delete_passkey(&self, id: &str) -> AuthResult<()> {
1320 self.client
1321 .append_delete(&passkey_entity(id), "PasskeyDeleted")
1322 .await
1323 .map_err(AuthError::from)
1324 }
1325}