1use async_trait::async_trait;
2use better_auth_core::adapters::traits::{
3 AccountOps, ApiKeyOps, InvitationOps, MemberOps, OrganizationOps, PasskeyOps, SessionOps,
4 TwoFactorOps, UserOps, VerificationOps,
5};
6use better_auth_core::error::{AuthError, AuthResult};
7use better_auth_core::types::{
8 Account, ApiKey, CreateAccount, CreateApiKey, CreateInvitation, CreateMember,
9 CreateOrganization, CreatePasskey, CreateSession, CreateTwoFactor, CreateUser,
10 CreateVerification, Invitation, InvitationStatus, ListUsersParams, Member, Organization,
11 Passkey, Session, TwoFactor, UpdateAccount, UpdateApiKey, UpdateOrganization, UpdateUser, User,
12 Verification,
13};
14use chrono::{DateTime, Utc};
15use uuid::Uuid;
16
17use crate::client::AllsourceClient;
18
/// Auth storage adapter whose trait implementations in this file delegate every
/// read and write to an [`AllsourceClient`].
pub struct AllsourceAuthAdapter {
    /// Client used for all event appends and queries issued by this adapter.
    client: AllsourceClient,
}
31
32impl AllsourceAuthAdapter {
33 pub fn new(core_url: &str, query_url: &str, api_key: &str) -> Self {
39 Self {
40 client: AllsourceClient::new(core_url, query_url, api_key),
41 }
42 }
43}
44
/// Returns the entity key for a user: `auth-user:` followed by `id`.
fn user_entity(id: &str) -> String {
    const PREFIX: &str = "auth-user:";
    [PREFIX, id].concat()
}
/// Returns the entity key for a session: `auth-session:` followed by `token`.
fn session_entity(token: &str) -> String {
    const PREFIX: &str = "auth-session:";
    [PREFIX, token].concat()
}
/// Returns the entity key for an account: `auth-account:` followed by `id`.
fn account_entity(id: &str) -> String {
    const PREFIX: &str = "auth-account:";
    [PREFIX, id].concat()
}
/// Returns the entity key for a verification: `auth-verification:` followed by `id`.
fn verification_entity(id: &str) -> String {
    const PREFIX: &str = "auth-verification:";
    [PREFIX, id].concat()
}
/// Returns the entity key for an organization: `auth-org:` followed by `id`.
fn org_entity(id: &str) -> String {
    const PREFIX: &str = "auth-org:";
    [PREFIX, id].concat()
}
/// Returns the entity key for an organization member: `auth-member:` followed by `id`.
fn member_entity(id: &str) -> String {
    const PREFIX: &str = "auth-member:";
    [PREFIX, id].concat()
}
/// Returns the entity key for an invitation: `auth-invitation:` followed by `id`.
fn invitation_entity(id: &str) -> String {
    const PREFIX: &str = "auth-invitation:";
    [PREFIX, id].concat()
}
/// Returns the entity key for a two-factor record: `auth-2fa:` followed by `id`.
fn two_factor_entity(id: &str) -> String {
    const PREFIX: &str = "auth-2fa:";
    [PREFIX, id].concat()
}
/// Returns the entity key for an API key: `auth-apikey:` followed by `id`.
fn api_key_entity(id: &str) -> String {
    const PREFIX: &str = "auth-apikey:";
    [PREFIX, id].concat()
}
/// Returns the entity key for a passkey: `auth-passkey:` followed by `id`.
fn passkey_entity(id: &str) -> String {
    const PREFIX: &str = "auth-passkey:";
    [PREFIX, id].concat()
}
76
77#[async_trait]
80impl UserOps for AllsourceAuthAdapter {
81 type User = User;
82
83 async fn create_user(&self, input: CreateUser) -> AuthResult<User> {
84 let id = input.id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
85
86 if let Some(email) = &input.email {
88 if let Some(_existing) = self
89 .client
90 .find_by_field::<User>("auth.user.created", "email", email)
91 .await
92 .map_err(AuthError::from)?
93 {
94 return Err(AuthError::config("Email already exists"));
95 }
96 }
97
98 if let Some(username) = &input.username {
100 if let Some(_existing) = self
101 .client
102 .find_by_field::<User>("auth.user.created", "username", username)
103 .await
104 .map_err(AuthError::from)?
105 {
106 return Err(AuthError::conflict(
107 "A user with this username already exists",
108 ));
109 }
110 }
111
112 let now = Utc::now();
113 let user = User {
114 id: id.clone(),
115 email: input.email,
116 name: input.name,
117 image: input.image,
118 email_verified: input.email_verified.unwrap_or(false),
119 created_at: now,
120 updated_at: now,
121 username: input.username,
122 display_username: input.display_username,
123 two_factor_enabled: false,
124 role: input.role,
125 banned: false,
126 ban_reason: None,
127 ban_expires: None,
128 metadata: input.metadata.unwrap_or(serde_json::Value::Null),
129 };
130
131 let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
132 self.client
133 .append_event(&user_entity(&id), "auth.user.created", payload)
134 .await
135 .map_err(AuthError::from)?;
136
137 Ok(user)
141 }
142
143 async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<User>> {
144 self.client
145 .get_latest(&user_entity(id))
146 .await
147 .map_err(AuthError::from)
148 }
149
150 async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<User>> {
151 self.client
152 .find_by_field("auth.user.created", "email", email)
153 .await
154 .map_err(AuthError::from)
155 }
156
157 async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<User>> {
158 self.client
159 .find_by_field("auth.user.created", "username", username)
160 .await
161 .map_err(AuthError::from)
162 }
163
164 async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<User> {
165 let mut user = self
166 .get_user_by_id(id)
167 .await?
168 .ok_or(AuthError::UserNotFound)?;
169
170 if let Some(email) = update.email {
171 user.email = Some(email);
172 }
173 if let Some(name) = update.name {
174 user.name = Some(name);
175 }
176 if let Some(image) = update.image {
177 user.image = Some(image);
178 }
179 if let Some(verified) = update.email_verified {
180 user.email_verified = verified;
181 }
182 if let Some(username) = update.username {
183 user.username = Some(username);
184 }
185 if let Some(display_username) = update.display_username {
186 user.display_username = Some(display_username);
187 }
188 if let Some(role) = update.role {
189 user.role = Some(role);
190 }
191 if let Some(banned) = update.banned {
192 user.banned = banned;
193 }
194 if let Some(ban_reason) = update.ban_reason {
195 user.ban_reason = Some(ban_reason);
196 }
197 if let Some(ban_expires) = update.ban_expires {
198 user.ban_expires = Some(ban_expires);
199 }
200 if let Some(two_factor_enabled) = update.two_factor_enabled {
201 user.two_factor_enabled = two_factor_enabled;
202 }
203 if let Some(metadata) = update.metadata {
204 user.metadata = metadata;
205 }
206 user.updated_at = Utc::now();
207
208 let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
209 self.client
210 .append_event(&user_entity(id), "auth.user.updated", payload)
211 .await
212 .map_err(AuthError::from)?;
213
214 Ok(user)
215 }
216
217 async fn delete_user(&self, id: &str) -> AuthResult<()> {
218 self.client
219 .append_delete(&user_entity(id), "auth.user.deleted")
220 .await
221 .map_err(AuthError::from)
222 }
223
224 async fn list_users(&self, params: ListUsersParams) -> AuthResult<(Vec<User>, usize)> {
225 let mut users: Vec<User> = self
226 .client
227 .query_all("auth.user.created", 10000)
228 .await
229 .map_err(AuthError::from)?;
230
231 if let Some(search_value) = ¶ms.search_value {
233 let field = params.search_field.as_deref().unwrap_or("email");
234 let op = params.search_operator.as_deref().unwrap_or("contains");
235 let sv = search_value.to_lowercase();
236 users.retain(|u| {
237 let field_val = match field {
238 "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
239 _ => u.email.as_deref().unwrap_or("").to_lowercase(),
240 };
241 match op {
242 "starts_with" => field_val.starts_with(&sv),
243 "ends_with" => field_val.ends_with(&sv),
244 _ => field_val.contains(&sv),
245 }
246 });
247 }
248
249 if let Some(filter_value) = ¶ms.filter_value {
251 let field = params.filter_field.as_deref().unwrap_or("email");
252 let op = params.filter_operator.as_deref().unwrap_or("eq");
253 let fv = filter_value.to_lowercase();
254 users.retain(|u| {
255 let field_val = match field {
256 "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
257 "role" => u.role.as_deref().unwrap_or("").to_lowercase(),
258 _ => u.email.as_deref().unwrap_or("").to_lowercase(),
259 };
260 match op {
261 "contains" => field_val.contains(&fv),
262 "starts_with" => field_val.starts_with(&fv),
263 "ends_with" => field_val.ends_with(&fv),
264 "ne" => field_val != fv,
265 _ => field_val == fv,
266 }
267 });
268 }
269
270 let total = users.len();
271 let offset = params.offset.unwrap_or(0);
272 let limit = params.limit.unwrap_or(100);
273 let paged: Vec<User> = users.into_iter().skip(offset).take(limit).collect();
274
275 Ok((paged, total))
276 }
277}
278
279#[async_trait]
282impl SessionOps for AllsourceAuthAdapter {
283 type Session = Session;
284
285 async fn create_session(&self, input: CreateSession) -> AuthResult<Session> {
286 let id = Uuid::new_v4().to_string();
287 let token = format!("session_{}", Uuid::new_v4());
288 let now = Utc::now();
289
290 let session = Session {
291 id,
292 token: token.clone(),
293 user_id: input.user_id,
294 expires_at: input.expires_at,
295 created_at: now,
296 updated_at: now,
297 ip_address: input.ip_address,
298 user_agent: input.user_agent,
299 impersonated_by: input.impersonated_by,
300 active_organization_id: input.active_organization_id,
301 active: true,
302 };
303
304 let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
305 self.client
306 .append_event(&session_entity(&token), "auth.session.created", payload)
307 .await
308 .map_err(AuthError::from)?;
309
310 Ok(session)
311 }
312
313 async fn get_session(&self, token: &str) -> AuthResult<Option<Session>> {
314 let session: Option<Session> = self
315 .client
316 .get_latest(&session_entity(token))
317 .await
318 .map_err(AuthError::from)?;
319 Ok(session.map(|mut s| {
322 s.active = true;
323 s
324 }))
325 }
326
327 async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<Session>> {
328 let sessions: Vec<Session> = self
329 .client
330 .find_all_by_field("auth.session.created", "user_id", user_id)
331 .await
332 .map_err(AuthError::from)?;
333 Ok(sessions
334 .into_iter()
335 .map(|mut s| {
336 s.active = true;
337 s
338 })
339 .collect())
340 }
341
342 async fn update_session_expiry(
343 &self,
344 token: &str,
345 expires_at: DateTime<Utc>,
346 ) -> AuthResult<()> {
347 if let Some(mut session) = self.get_session(token).await? {
348 session.expires_at = expires_at;
349 session.updated_at = Utc::now();
350 let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
351 self.client
352 .append_event(&session_entity(token), "auth.session.expiry_updated", payload)
353 .await
354 .map_err(AuthError::from)?;
355 }
356 Ok(())
357 }
358
359 async fn delete_session(&self, token: &str) -> AuthResult<()> {
360 self.client
361 .append_delete(&session_entity(token), "auth.session.deleted")
362 .await
363 .map_err(AuthError::from)
364 }
365
366 async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
367 let sessions = self.get_user_sessions(user_id).await?;
368 for session in sessions {
369 self.delete_session(&session.token).await?;
370 }
371 Ok(())
372 }
373
374 async fn delete_expired_sessions(&self) -> AuthResult<usize> {
375 let all: Vec<Session> = self
376 .client
377 .query_all("auth.session.created", 10000)
378 .await
379 .map_err(AuthError::from)?;
380
381 let now = Utc::now();
382 let mut deleted = 0;
383 for session in all {
384 if session.expires_at <= now || !session.active {
385 self.delete_session(&session.token).await?;
386 deleted += 1;
387 }
388 }
389 Ok(deleted)
390 }
391
392 async fn update_session_active_organization(
393 &self,
394 token: &str,
395 organization_id: Option<&str>,
396 ) -> AuthResult<Session> {
397 let mut session = self
398 .get_session(token)
399 .await?
400 .ok_or(AuthError::SessionNotFound)?;
401
402 session.active_organization_id = organization_id.map(|s| s.to_string());
403 session.updated_at = Utc::now();
404
405 let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
406 self.client
407 .append_event(&session_entity(token), "auth.session.updated", payload)
408 .await
409 .map_err(AuthError::from)?;
410
411 Ok(session)
412 }
413}
414
415#[async_trait]
418impl AccountOps for AllsourceAuthAdapter {
419 type Account = Account;
420
421 async fn create_account(&self, input: CreateAccount) -> AuthResult<Account> {
422 let id = Uuid::new_v4().to_string();
423 let now = Utc::now();
424
425 let account = Account {
426 id: id.clone(),
427 account_id: input.account_id,
428 provider_id: input.provider_id,
429 user_id: input.user_id,
430 access_token: input.access_token,
431 refresh_token: input.refresh_token,
432 id_token: input.id_token,
433 access_token_expires_at: input.access_token_expires_at,
434 refresh_token_expires_at: input.refresh_token_expires_at,
435 scope: input.scope,
436 password: input.password,
437 created_at: now,
438 updated_at: now,
439 };
440
441 let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
442 self.client
443 .append_event(&account_entity(&id), "auth.account.created", payload)
444 .await
445 .map_err(AuthError::from)?;
446
447 Ok(account)
448 }
449
450 async fn get_account(
451 &self,
452 provider: &str,
453 provider_account_id: &str,
454 ) -> AuthResult<Option<Account>> {
455 let all: Vec<Account> = self
457 .client
458 .query_all("auth.account.created", 10000)
459 .await
460 .map_err(AuthError::from)?;
461
462 Ok(all
463 .into_iter()
464 .find(|a| a.provider_id == provider && a.account_id == provider_account_id))
465 }
466
467 async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<Account>> {
468 self.client
469 .find_all_by_field("auth.account.created", "user_id", user_id)
470 .await
471 .map_err(AuthError::from)
472 }
473
474 async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<Account> {
475 let mut account: Account = self
476 .client
477 .get_latest(&account_entity(id))
478 .await
479 .map_err(AuthError::from)?
480 .ok_or_else(|| AuthError::not_found("Account not found"))?;
481
482 if let Some(access_token) = update.access_token {
483 account.access_token = Some(access_token);
484 }
485 if let Some(refresh_token) = update.refresh_token {
486 account.refresh_token = Some(refresh_token);
487 }
488 if let Some(id_token) = update.id_token {
489 account.id_token = Some(id_token);
490 }
491 if let Some(expires_at) = update.access_token_expires_at {
492 account.access_token_expires_at = Some(expires_at);
493 }
494 if let Some(expires_at) = update.refresh_token_expires_at {
495 account.refresh_token_expires_at = Some(expires_at);
496 }
497 if let Some(scope) = update.scope {
498 account.scope = Some(scope);
499 }
500 if let Some(password) = update.password {
501 account.password = Some(password);
502 }
503 account.updated_at = Utc::now();
504
505 let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
506 self.client
507 .append_event(&account_entity(id), "auth.account.updated", payload)
508 .await
509 .map_err(AuthError::from)?;
510
511 Ok(account)
512 }
513
514 async fn delete_account(&self, id: &str) -> AuthResult<()> {
515 self.client
516 .append_delete(&account_entity(id), "auth.account.deleted")
517 .await
518 .map_err(AuthError::from)
519 }
520}
521
522#[async_trait]
525impl VerificationOps for AllsourceAuthAdapter {
526 type Verification = Verification;
527
528 async fn create_verification(&self, input: CreateVerification) -> AuthResult<Verification> {
529 let id = Uuid::new_v4().to_string();
530 let now = Utc::now();
531
532 let verification = Verification {
533 id: id.clone(),
534 identifier: input.identifier,
535 value: input.value,
536 expires_at: input.expires_at,
537 created_at: now,
538 updated_at: now,
539 };
540
541 let payload = serde_json::to_value(&verification).map_err(AuthError::from)?;
542 self.client
543 .append_event(
544 &verification_entity(&id),
545 "auth.verification.created",
546 payload,
547 )
548 .await
549 .map_err(AuthError::from)?;
550
551 Ok(verification)
552 }
553
554 async fn get_verification(
555 &self,
556 identifier: &str,
557 value: &str,
558 ) -> AuthResult<Option<Verification>> {
559 let all: Vec<Verification> = self
560 .client
561 .query_all("auth.verification.created", 10000)
562 .await
563 .map_err(AuthError::from)?;
564
565 let now = Utc::now();
566 Ok(all
567 .into_iter()
568 .find(|v| v.identifier == identifier && v.value == value && v.expires_at > now))
569 }
570
571 async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<Verification>> {
572 let all: Vec<Verification> = self
573 .client
574 .query_all("auth.verification.created", 10000)
575 .await
576 .map_err(AuthError::from)?;
577
578 let now = Utc::now();
579 Ok(all
580 .into_iter()
581 .find(|v| v.value == value && v.expires_at > now))
582 }
583
584 async fn get_verification_by_identifier(
585 &self,
586 identifier: &str,
587 ) -> AuthResult<Option<Verification>> {
588 let all: Vec<Verification> = self
589 .client
590 .query_all("auth.verification.created", 10000)
591 .await
592 .map_err(AuthError::from)?;
593
594 let now = Utc::now();
595 Ok(all
596 .into_iter()
597 .find(|v| v.identifier == identifier && v.expires_at > now))
598 }
599
600 async fn consume_verification(
601 &self,
602 identifier: &str,
603 value: &str,
604 ) -> AuthResult<Option<Verification>> {
605 let verification = self.get_verification(identifier, value).await?;
606 if let Some(ref v) = verification {
607 self.client
608 .append_delete(&verification_entity(&v.id), "auth.verification.consumed")
609 .await
610 .map_err(AuthError::from)?;
611 }
612 Ok(verification)
613 }
614
615 async fn delete_verification(&self, id: &str) -> AuthResult<()> {
616 self.client
617 .append_delete(&verification_entity(id), "auth.verification.deleted")
618 .await
619 .map_err(AuthError::from)
620 }
621
622 async fn delete_expired_verifications(&self) -> AuthResult<usize> {
623 let all: Vec<Verification> = self
624 .client
625 .query_all("auth.verification.created", 10000)
626 .await
627 .map_err(AuthError::from)?;
628
629 let now = Utc::now();
630 let mut deleted = 0;
631 for v in all {
632 if v.expires_at <= now {
633 self.delete_verification(&v.id).await?;
634 deleted += 1;
635 }
636 }
637 Ok(deleted)
638 }
639}
640
641#[async_trait]
644impl OrganizationOps for AllsourceAuthAdapter {
645 type Organization = Organization;
646
647 async fn create_organization(&self, input: CreateOrganization) -> AuthResult<Organization> {
648 if self
650 .client
651 .find_by_field::<Organization>("auth.org.created", "slug", &input.slug)
652 .await
653 .map_err(AuthError::from)?
654 .is_some()
655 {
656 return Err(AuthError::conflict("Organization slug already exists"));
657 }
658
659 let id = input
660 .id
661 .clone()
662 .unwrap_or_else(|| Uuid::new_v4().to_string());
663 let now = Utc::now();
664
665 let org = Organization {
666 id: id.clone(),
667 name: input.name,
668 slug: input.slug,
669 logo: input.logo,
670 metadata: input.metadata,
671 created_at: now,
672 updated_at: now,
673 };
674
675 let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
676 self.client
677 .append_event(&org_entity(&id), "auth.org.created", payload)
678 .await
679 .map_err(AuthError::from)?;
680
681 Ok(org)
682 }
683
684 async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<Organization>> {
685 self.client
686 .get_latest(&org_entity(id))
687 .await
688 .map_err(AuthError::from)
689 }
690
691 async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<Organization>> {
692 self.client
693 .find_by_field("auth.org.created", "slug", slug)
694 .await
695 .map_err(AuthError::from)
696 }
697
698 async fn update_organization(
699 &self,
700 id: &str,
701 update: UpdateOrganization,
702 ) -> AuthResult<Organization> {
703 let mut org: Organization = self
704 .get_organization_by_id(id)
705 .await?
706 .ok_or_else(|| AuthError::not_found("Organization not found"))?;
707
708 if let Some(name) = update.name {
709 org.name = name;
710 }
711 if let Some(slug) = update.slug {
712 org.slug = slug;
713 }
714 if let Some(logo) = update.logo {
715 org.logo = Some(logo);
716 }
717 if let Some(metadata) = update.metadata {
718 org.metadata = Some(metadata);
719 }
720 org.updated_at = Utc::now();
721
722 let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
723 self.client
724 .append_event(&org_entity(id), "auth.org.updated", payload)
725 .await
726 .map_err(AuthError::from)?;
727
728 Ok(org)
729 }
730
731 async fn delete_organization(&self, id: &str) -> AuthResult<()> {
732 self.client
733 .append_delete(&org_entity(id), "auth.org.deleted")
734 .await
735 .map_err(AuthError::from)?;
736
737 let members: Vec<Member> = self
739 .client
740 .find_all_by_field("auth.member.created", "organization_id", id)
741 .await
742 .map_err(AuthError::from)?;
743 for m in members {
744 self.client
745 .append_delete(&member_entity(&m.id), "auth.member.deleted")
746 .await
747 .map_err(AuthError::from)?;
748 }
749
750 let invitations: Vec<Invitation> = self
751 .client
752 .find_all_by_field("auth.invitation.created", "organization_id", id)
753 .await
754 .map_err(AuthError::from)?;
755 for i in invitations {
756 self.client
757 .append_delete(&invitation_entity(&i.id), "auth.invitation.deleted")
758 .await
759 .map_err(AuthError::from)?;
760 }
761
762 Ok(())
763 }
764
765 async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<Organization>> {
766 let members: Vec<Member> = self
767 .client
768 .find_all_by_field("auth.member.created", "user_id", user_id)
769 .await
770 .map_err(AuthError::from)?;
771
772 let mut orgs = Vec::new();
773 for m in members {
774 if let Some(org) = self.get_organization_by_id(&m.organization_id).await? {
775 orgs.push(org);
776 }
777 }
778 Ok(orgs)
779 }
780}
781
782#[async_trait]
785impl MemberOps for AllsourceAuthAdapter {
786 type Member = Member;
787
788 async fn create_member(&self, input: CreateMember) -> AuthResult<Member> {
789 let existing: Vec<Member> = self
791 .client
792 .find_all_by_field("auth.member.created", "organization_id", &input.organization_id)
793 .await
794 .map_err(AuthError::from)?;
795
796 if existing.iter().any(|m| m.user_id == input.user_id) {
797 return Err(AuthError::conflict(
798 "User is already a member of this organization",
799 ));
800 }
801
802 let id = Uuid::new_v4().to_string();
803 let now = Utc::now();
804
805 let member = Member {
806 id: id.clone(),
807 organization_id: input.organization_id,
808 user_id: input.user_id,
809 role: input.role,
810 created_at: now,
811 };
812
813 let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
814 self.client
815 .append_event(&member_entity(&id), "auth.member.created", payload)
816 .await
817 .map_err(AuthError::from)?;
818
819 Ok(member)
820 }
821
822 async fn get_member(
823 &self,
824 organization_id: &str,
825 user_id: &str,
826 ) -> AuthResult<Option<Member>> {
827 let members: Vec<Member> = self
828 .client
829 .find_all_by_field("auth.member.created", "organization_id", organization_id)
830 .await
831 .map_err(AuthError::from)?;
832
833 Ok(members.into_iter().find(|m| m.user_id == user_id))
834 }
835
836 async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<Member>> {
837 self.client
838 .get_latest(&member_entity(id))
839 .await
840 .map_err(AuthError::from)
841 }
842
843 async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<Member> {
844 let mut member: Member = self
845 .get_member_by_id(member_id)
846 .await?
847 .ok_or_else(|| AuthError::not_found("Member not found"))?;
848
849 member.role = role.to_string();
850
851 let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
852 self.client
853 .append_event(&member_entity(member_id), "auth.member.updated", payload)
854 .await
855 .map_err(AuthError::from)?;
856
857 Ok(member)
858 }
859
860 async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
861 self.client
862 .append_delete(&member_entity(member_id), "auth.member.deleted")
863 .await
864 .map_err(AuthError::from)
865 }
866
867 async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<Member>> {
868 self.client
869 .find_all_by_field("auth.member.created", "organization_id", organization_id)
870 .await
871 .map_err(AuthError::from)
872 }
873
874 async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
875 Ok(self.list_organization_members(organization_id).await?.len())
876 }
877
878 async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
879 let members = self.list_organization_members(organization_id).await?;
880 Ok(members.iter().filter(|m| m.role == "owner").count())
881 }
882}
883
884#[async_trait]
887impl InvitationOps for AllsourceAuthAdapter {
888 type Invitation = Invitation;
889
890 async fn create_invitation(&self, input: CreateInvitation) -> AuthResult<Invitation> {
891 let id = Uuid::new_v4().to_string();
892 let now = Utc::now();
893
894 let invitation = Invitation {
895 id: id.clone(),
896 organization_id: input.organization_id,
897 email: input.email,
898 role: input.role,
899 status: InvitationStatus::Pending,
900 inviter_id: input.inviter_id,
901 expires_at: input.expires_at,
902 created_at: now,
903 };
904
905 let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
906 self.client
907 .append_event(&invitation_entity(&id), "auth.invitation.created", payload)
908 .await
909 .map_err(AuthError::from)?;
910
911 Ok(invitation)
912 }
913
914 async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<Invitation>> {
915 self.client
916 .get_latest(&invitation_entity(id))
917 .await
918 .map_err(AuthError::from)
919 }
920
921 async fn get_pending_invitation(
922 &self,
923 organization_id: &str,
924 email: &str,
925 ) -> AuthResult<Option<Invitation>> {
926 let invitations: Vec<Invitation> = self
927 .client
928 .find_all_by_field("auth.invitation.created", "organization_id", organization_id)
929 .await
930 .map_err(AuthError::from)?;
931
932 Ok(invitations.into_iter().find(|i| {
933 i.email.to_lowercase() == email.to_lowercase()
934 && i.status == InvitationStatus::Pending
935 }))
936 }
937
938 async fn update_invitation_status(
939 &self,
940 id: &str,
941 status: InvitationStatus,
942 ) -> AuthResult<Invitation> {
943 let mut invitation = self
944 .get_invitation_by_id(id)
945 .await?
946 .ok_or_else(|| AuthError::not_found("Invitation not found"))?;
947
948 invitation.status = status;
949
950 let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
951 self.client
952 .append_event(&invitation_entity(id), "auth.invitation.updated", payload)
953 .await
954 .map_err(AuthError::from)?;
955
956 Ok(invitation)
957 }
958
959 async fn list_organization_invitations(
960 &self,
961 organization_id: &str,
962 ) -> AuthResult<Vec<Invitation>> {
963 self.client
964 .find_all_by_field("auth.invitation.created", "organization_id", organization_id)
965 .await
966 .map_err(AuthError::from)
967 }
968
969 async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<Invitation>> {
970 let all: Vec<Invitation> = self
971 .client
972 .find_all_by_field("auth.invitation.created", "email", email)
973 .await
974 .map_err(AuthError::from)?;
975
976 let now = Utc::now();
977 Ok(all
978 .into_iter()
979 .filter(|i| i.status == InvitationStatus::Pending && i.expires_at > now)
980 .collect())
981 }
982}
983
984#[async_trait]
987impl TwoFactorOps for AllsourceAuthAdapter {
988 type TwoFactor = TwoFactor;
989
990 async fn create_two_factor(&self, input: CreateTwoFactor) -> AuthResult<TwoFactor> {
991 if self
993 .get_two_factor_by_user_id(&input.user_id)
994 .await?
995 .is_some()
996 {
997 return Err(AuthError::conflict(
998 "Two-factor authentication already enabled for this user",
999 ));
1000 }
1001
1002 let id = Uuid::new_v4().to_string();
1003 let now = Utc::now();
1004
1005 let two_factor = TwoFactor {
1006 id: id.clone(),
1007 secret: input.secret,
1008 backup_codes: input.backup_codes,
1009 user_id: input.user_id,
1010 created_at: now,
1011 updated_at: now,
1012 };
1013
1014 let payload = serde_json::to_value(&two_factor).map_err(AuthError::from)?;
1015 self.client
1016 .append_event(&two_factor_entity(&id), "auth.two_factor.created", payload)
1017 .await
1018 .map_err(AuthError::from)?;
1019
1020 Ok(two_factor)
1021 }
1022
1023 async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TwoFactor>> {
1024 self.client
1025 .find_by_field("auth.two_factor.created", "user_id", user_id)
1026 .await
1027 .map_err(AuthError::from)
1028 }
1029
1030 async fn update_two_factor_backup_codes(
1031 &self,
1032 user_id: &str,
1033 backup_codes: &str,
1034 ) -> AuthResult<TwoFactor> {
1035 let mut tf = self
1036 .get_two_factor_by_user_id(user_id)
1037 .await?
1038 .ok_or_else(|| AuthError::not_found("Two-factor record not found"))?;
1039
1040 tf.backup_codes = Some(backup_codes.to_string());
1041 tf.updated_at = Utc::now();
1042
1043 let payload = serde_json::to_value(&tf).map_err(AuthError::from)?;
1044 self.client
1045 .append_event(&two_factor_entity(&tf.id), "auth.two_factor.updated", payload)
1046 .await
1047 .map_err(AuthError::from)?;
1048
1049 Ok(tf)
1050 }
1051
1052 async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
1053 if let Some(tf) = self.get_two_factor_by_user_id(user_id).await? {
1054 self.client
1055 .append_delete(&two_factor_entity(&tf.id), "auth.two_factor.deleted")
1056 .await
1057 .map_err(AuthError::from)?;
1058 }
1059 Ok(())
1060 }
1061}
1062
1063#[async_trait]
1066impl ApiKeyOps for AllsourceAuthAdapter {
1067 type ApiKey = ApiKey;
1068
1069 async fn create_api_key(&self, input: CreateApiKey) -> AuthResult<ApiKey> {
1070 if self
1072 .get_api_key_by_hash(&input.key_hash)
1073 .await?
1074 .is_some()
1075 {
1076 return Err(AuthError::conflict("API key already exists"));
1077 }
1078
1079 let id = Uuid::new_v4().to_string();
1080 let now = Utc::now().to_rfc3339();
1081
1082 let api_key = ApiKey {
1083 id: id.clone(),
1084 name: input.name,
1085 start: input.start,
1086 prefix: input.prefix,
1087 key_hash: input.key_hash,
1088 user_id: input.user_id,
1089 refill_interval: input.refill_interval,
1090 refill_amount: input.refill_amount,
1091 last_refill_at: None,
1092 enabled: input.enabled,
1093 rate_limit_enabled: input.rate_limit_enabled,
1094 rate_limit_time_window: input.rate_limit_time_window,
1095 rate_limit_max: input.rate_limit_max,
1096 request_count: Some(0),
1097 remaining: input.remaining,
1098 last_request: None,
1099 expires_at: input.expires_at,
1100 created_at: now.clone(),
1101 updated_at: now,
1102 permissions: input.permissions,
1103 metadata: input.metadata,
1104 };
1105
1106 let payload = serde_json::to_value(&api_key).map_err(AuthError::from)?;
1107 self.client
1108 .append_event(&api_key_entity(&id), "auth.api_key.created", payload)
1109 .await
1110 .map_err(AuthError::from)?;
1111
1112 Ok(api_key)
1113 }
1114
1115 async fn get_api_key_by_id(&self, id: &str) -> AuthResult<Option<ApiKey>> {
1116 self.client
1117 .get_latest(&api_key_entity(id))
1118 .await
1119 .map_err(AuthError::from)
1120 }
1121
1122 async fn get_api_key_by_hash(&self, hash: &str) -> AuthResult<Option<ApiKey>> {
1123 self.client
1124 .find_by_field("auth.api_key.created", "key", hash)
1125 .await
1126 .map_err(AuthError::from)
1127 }
1128
1129 async fn list_api_keys_by_user(&self, user_id: &str) -> AuthResult<Vec<ApiKey>> {
1130 let mut keys: Vec<ApiKey> = self
1131 .client
1132 .find_all_by_field("auth.api_key.created", "user_id", user_id)
1133 .await
1134 .map_err(AuthError::from)?;
1135
1136 keys.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1137 Ok(keys)
1138 }
1139
1140 async fn update_api_key(&self, id: &str, update: UpdateApiKey) -> AuthResult<ApiKey> {
1141 let mut key = self
1142 .get_api_key_by_id(id)
1143 .await?
1144 .ok_or_else(|| AuthError::not_found("API key not found"))?;
1145
1146 if let Some(name) = update.name {
1147 key.name = Some(name);
1148 }
1149 if let Some(enabled) = update.enabled {
1150 key.enabled = enabled;
1151 }
1152 if let Some(remaining) = update.remaining {
1153 key.remaining = Some(remaining);
1154 }
1155 if let Some(rate_limit_enabled) = update.rate_limit_enabled {
1156 key.rate_limit_enabled = rate_limit_enabled;
1157 }
1158 if let Some(window) = update.rate_limit_time_window {
1159 key.rate_limit_time_window = Some(window);
1160 }
1161 if let Some(max) = update.rate_limit_max {
1162 key.rate_limit_max = Some(max);
1163 }
1164 if let Some(interval) = update.refill_interval {
1165 key.refill_interval = Some(interval);
1166 }
1167 if let Some(amount) = update.refill_amount {
1168 key.refill_amount = Some(amount);
1169 }
1170 if let Some(permissions) = update.permissions {
1171 key.permissions = Some(permissions);
1172 }
1173 if let Some(metadata) = update.metadata {
1174 key.metadata = Some(metadata);
1175 }
1176 if let Some(expires_at) = update.expires_at {
1177 key.expires_at = expires_at;
1178 }
1179 if let Some(last_request) = update.last_request {
1180 key.last_request = last_request;
1181 }
1182 if let Some(request_count) = update.request_count {
1183 key.request_count = Some(request_count);
1184 }
1185 if let Some(last_refill_at) = update.last_refill_at {
1186 key.last_refill_at = last_refill_at;
1187 }
1188 key.updated_at = Utc::now().to_rfc3339();
1189
1190 let payload = serde_json::to_value(&key).map_err(AuthError::from)?;
1191 self.client
1192 .append_event(&api_key_entity(id), "auth.api_key.updated", payload)
1193 .await
1194 .map_err(AuthError::from)?;
1195
1196 Ok(key)
1197 }
1198
1199 async fn delete_api_key(&self, id: &str) -> AuthResult<()> {
1200 self.client
1201 .append_delete(&api_key_entity(id), "auth.api_key.deleted")
1202 .await
1203 .map_err(AuthError::from)
1204 }
1205
1206 async fn delete_expired_api_keys(&self) -> AuthResult<usize> {
1207 let all: Vec<ApiKey> = self
1208 .client
1209 .query_all("auth.api_key.created", 10000)
1210 .await
1211 .map_err(AuthError::from)?;
1212
1213 let now = Utc::now();
1214 let mut deleted = 0;
1215 for key in all {
1216 if let Some(expires_at) = &key.expires_at {
1217 if let Ok(exp) = chrono::DateTime::parse_from_rfc3339(expires_at) {
1218 if exp <= now {
1219 self.delete_api_key(&key.id).await?;
1220 deleted += 1;
1221 }
1222 }
1223 }
1224 }
1225 Ok(deleted)
1226 }
1227}
1228
1229#[async_trait]
1232impl PasskeyOps for AllsourceAuthAdapter {
1233 type Passkey = Passkey;
1234
1235 async fn create_passkey(&self, input: CreatePasskey) -> AuthResult<Passkey> {
1236 if self
1238 .get_passkey_by_credential_id(&input.credential_id)
1239 .await?
1240 .is_some()
1241 {
1242 return Err(AuthError::conflict(
1243 "A passkey with this credential ID already exists",
1244 ));
1245 }
1246
1247 let id = Uuid::new_v4().to_string();
1248 let now = Utc::now();
1249
1250 let passkey = Passkey {
1251 id: id.clone(),
1252 name: input.name,
1253 public_key: input.public_key,
1254 user_id: input.user_id,
1255 credential_id: input.credential_id,
1256 counter: input.counter,
1257 device_type: input.device_type,
1258 backed_up: input.backed_up,
1259 transports: input.transports,
1260 created_at: now,
1261 };
1262
1263 let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1264 self.client
1265 .append_event(&passkey_entity(&id), "auth.passkey.created", payload)
1266 .await
1267 .map_err(AuthError::from)?;
1268
1269 Ok(passkey)
1270 }
1271
1272 async fn get_passkey_by_id(&self, id: &str) -> AuthResult<Option<Passkey>> {
1273 self.client
1274 .get_latest(&passkey_entity(id))
1275 .await
1276 .map_err(AuthError::from)
1277 }
1278
1279 async fn get_passkey_by_credential_id(
1280 &self,
1281 credential_id: &str,
1282 ) -> AuthResult<Option<Passkey>> {
1283 self.client
1284 .find_by_field("auth.passkey.created", "credentialID", credential_id)
1285 .await
1286 .map_err(AuthError::from)
1287 }
1288
1289 async fn list_passkeys_by_user(&self, user_id: &str) -> AuthResult<Vec<Passkey>> {
1290 let mut passkeys: Vec<Passkey> = self
1291 .client
1292 .find_all_by_field("auth.passkey.created", "user_id", user_id)
1293 .await
1294 .map_err(AuthError::from)?;
1295
1296 passkeys.sort_by_key(|p| std::cmp::Reverse(p.created_at));
1297 Ok(passkeys)
1298 }
1299
1300 async fn update_passkey_counter(&self, id: &str, counter: u64) -> AuthResult<Passkey> {
1301 let mut passkey = self
1302 .get_passkey_by_id(id)
1303 .await?
1304 .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1305
1306 passkey.counter = counter;
1307
1308 let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1309 self.client
1310 .append_event(&passkey_entity(id), "auth.passkey.updated", payload)
1311 .await
1312 .map_err(AuthError::from)?;
1313
1314 Ok(passkey)
1315 }
1316
1317 async fn update_passkey_name(&self, id: &str, name: &str) -> AuthResult<Passkey> {
1318 let mut passkey = self
1319 .get_passkey_by_id(id)
1320 .await?
1321 .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1322
1323 passkey.name = name.to_string();
1324
1325 let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1326 self.client
1327 .append_event(&passkey_entity(id), "auth.passkey.updated", payload)
1328 .await
1329 .map_err(AuthError::from)?;
1330
1331 Ok(passkey)
1332 }
1333
1334 async fn delete_passkey(&self, id: &str) -> AuthResult<()> {
1335 self.client
1336 .append_delete(&passkey_entity(id), "auth.passkey.deleted")
1337 .await
1338 .map_err(AuthError::from)
1339 }
1340}