Skip to main content

better_auth_allsource/
adapter.rs

1use async_trait::async_trait;
2use better_auth_core::adapters::traits::{
3    AccountOps, ApiKeyOps, InvitationOps, MemberOps, OrganizationOps, PasskeyOps, SessionOps,
4    TwoFactorOps, UserOps, VerificationOps,
5};
6use better_auth_core::error::{AuthError, AuthResult};
7use better_auth_core::types::{
8    Account, ApiKey, CreateAccount, CreateApiKey, CreateInvitation, CreateMember,
9    CreateOrganization, CreatePasskey, CreateSession, CreateTwoFactor, CreateUser,
10    CreateVerification, Invitation, InvitationStatus, ListUsersParams, Member, Organization,
11    Passkey, Session, TwoFactor, UpdateAccount, UpdateApiKey, UpdateOrganization, UpdateUser, User,
12    Verification,
13};
14use chrono::{DateTime, Utc};
15use uuid::Uuid;
16
17use crate::client::AllsourceClient;
18
19/// Allsource-backed `DatabaseAdapter` for better-auth-rs.
20///
21/// Each auth entity is stored as an Allsource event stream:
22/// - Entity ID pattern: `auth-{type}:{id}` (e.g., `auth-user:abc123`)
23/// - Each create/update appends a full-state snapshot event
24/// - Deletes append a `_deleted: true` marker
25/// - Reads fetch the latest event and deserialize the payload
26///
27/// This gives you event-sourced auth with full history and time-travel.
28pub struct AllsourceAuthAdapter {
29    client: AllsourceClient,
30}
31
32impl AllsourceAuthAdapter {
33    /// Create a new adapter.
34    ///
35    /// - `core_url`: Allsource Core URL (e.g., `http://localhost:3900`)
36    /// - `query_url`: Allsource Query Service URL (e.g., `http://localhost:3902`)
37    /// - `api_key`: API key for authentication (e.g., `ask_...`)
38    pub fn new(core_url: &str, query_url: &str, api_key: &str) -> Self {
39        Self {
40            client: AllsourceClient::new(core_url, query_url, api_key),
41        }
42    }
43}
44
45// Entity ID helpers
/// Entity ID of a user stream: `auth-user:` followed by `id`.
fn user_entity(id: &str) -> String {
    ["auth-user:", id].concat()
}
/// Entity ID of a session stream: `auth-session:` followed by the session `token`.
fn session_entity(token: &str) -> String {
    ["auth-session:", token].concat()
}
/// Entity ID of an account stream: `auth-account:` followed by `id`.
fn account_entity(id: &str) -> String {
    ["auth-account:", id].concat()
}
/// Entity ID of a verification stream: `auth-verification:` followed by `id`.
fn verification_entity(id: &str) -> String {
    ["auth-verification:", id].concat()
}
/// Entity ID of an organization stream: `auth-org:` followed by `id`.
fn org_entity(id: &str) -> String {
    ["auth-org:", id].concat()
}
/// Entity ID of a member stream: `auth-member:` followed by `id`.
fn member_entity(id: &str) -> String {
    ["auth-member:", id].concat()
}
/// Entity ID of an invitation stream: `auth-invitation:` followed by `id`.
fn invitation_entity(id: &str) -> String {
    ["auth-invitation:", id].concat()
}
/// Entity ID of a two-factor stream: `auth-2fa:` followed by `id`.
fn two_factor_entity(id: &str) -> String {
    ["auth-2fa:", id].concat()
}
/// Entity ID of an API-key stream: `auth-apikey:` followed by `id`.
fn api_key_entity(id: &str) -> String {
    ["auth-apikey:", id].concat()
}
/// Entity ID of a passkey stream: `auth-passkey:` followed by `id`.
fn passkey_entity(id: &str) -> String {
    ["auth-passkey:", id].concat()
}
76
77// ── UserOps ──
78
79#[async_trait]
80impl UserOps for AllsourceAuthAdapter {
81    type User = User;
82
83    async fn create_user(&self, input: CreateUser) -> AuthResult<User> {
84        let id = input.id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
85
86        // Check email uniqueness
87        if let Some(email) = &input.email {
88            if let Some(_existing) = self
89                .client
90                .find_by_field::<User>("UserCreated", "email", email)
91                .await
92                .map_err(AuthError::from)?
93            {
94                return Err(AuthError::config("Email already exists"));
95            }
96        }
97
98        // Check username uniqueness
99        if let Some(username) = &input.username {
100            if let Some(_existing) = self
101                .client
102                .find_by_field::<User>("UserCreated", "username", username)
103                .await
104                .map_err(AuthError::from)?
105            {
106                return Err(AuthError::conflict(
107                    "A user with this username already exists",
108                ));
109            }
110        }
111
112        let now = Utc::now();
113        let user = User {
114            id: id.clone(),
115            email: input.email,
116            name: input.name,
117            image: input.image,
118            email_verified: input.email_verified.unwrap_or(false),
119            created_at: now,
120            updated_at: now,
121            username: input.username,
122            display_username: input.display_username,
123            two_factor_enabled: false,
124            role: input.role,
125            banned: false,
126            ban_reason: None,
127            ban_expires: None,
128            metadata: input.metadata.unwrap_or(serde_json::Value::Null),
129        };
130
131        let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
132        self.client
133            .append_event(&user_entity(&id), "UserCreated", payload)
134            .await
135            .map_err(AuthError::from)?;
136
137        // Store password in the account, not the user (matches better-auth pattern)
138        // Password is handled via AccountOps
139
140        Ok(user)
141    }
142
143    async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<User>> {
144        self.client
145            .get_latest(&user_entity(id))
146            .await
147            .map_err(AuthError::from)
148    }
149
150    async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<User>> {
151        self.client
152            .find_by_field("UserCreated", "email", email)
153            .await
154            .map_err(AuthError::from)
155    }
156
157    async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<User>> {
158        self.client
159            .find_by_field("UserCreated", "username", username)
160            .await
161            .map_err(AuthError::from)
162    }
163
164    async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<User> {
165        let mut user = self
166            .get_user_by_id(id)
167            .await?
168            .ok_or(AuthError::UserNotFound)?;
169
170        if let Some(email) = update.email {
171            user.email = Some(email);
172        }
173        if let Some(name) = update.name {
174            user.name = Some(name);
175        }
176        if let Some(image) = update.image {
177            user.image = Some(image);
178        }
179        if let Some(verified) = update.email_verified {
180            user.email_verified = verified;
181        }
182        if let Some(username) = update.username {
183            user.username = Some(username);
184        }
185        if let Some(display_username) = update.display_username {
186            user.display_username = Some(display_username);
187        }
188        if let Some(role) = update.role {
189            user.role = Some(role);
190        }
191        if let Some(banned) = update.banned {
192            user.banned = banned;
193        }
194        if let Some(ban_reason) = update.ban_reason {
195            user.ban_reason = Some(ban_reason);
196        }
197        if let Some(ban_expires) = update.ban_expires {
198            user.ban_expires = Some(ban_expires);
199        }
200        if let Some(two_factor_enabled) = update.two_factor_enabled {
201            user.two_factor_enabled = two_factor_enabled;
202        }
203        if let Some(metadata) = update.metadata {
204            user.metadata = metadata;
205        }
206        user.updated_at = Utc::now();
207
208        let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
209        self.client
210            .append_event(&user_entity(id), "UserUpdated", payload)
211            .await
212            .map_err(AuthError::from)?;
213
214        Ok(user)
215    }
216
217    async fn delete_user(&self, id: &str) -> AuthResult<()> {
218        self.client
219            .append_delete(&user_entity(id), "UserDeleted")
220            .await
221            .map_err(AuthError::from)
222    }
223
224    async fn list_users(&self, params: ListUsersParams) -> AuthResult<(Vec<User>, usize)> {
225        let mut users: Vec<User> = self
226            .client
227            .query_all("UserCreated", 10000)
228            .await
229            .map_err(AuthError::from)?;
230
231        // Apply search filter
232        if let Some(search_value) = &params.search_value {
233            let field = params.search_field.as_deref().unwrap_or("email");
234            let op = params.search_operator.as_deref().unwrap_or("contains");
235            let sv = search_value.to_lowercase();
236            users.retain(|u| {
237                let field_val = match field {
238                    "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
239                    _ => u.email.as_deref().unwrap_or("").to_lowercase(),
240                };
241                match op {
242                    "starts_with" => field_val.starts_with(&sv),
243                    "ends_with" => field_val.ends_with(&sv),
244                    _ => field_val.contains(&sv),
245                }
246            });
247        }
248
249        // Apply filter
250        if let Some(filter_value) = &params.filter_value {
251            let field = params.filter_field.as_deref().unwrap_or("email");
252            let op = params.filter_operator.as_deref().unwrap_or("eq");
253            let fv = filter_value.to_lowercase();
254            users.retain(|u| {
255                let field_val = match field {
256                    "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
257                    "role" => u.role.as_deref().unwrap_or("").to_lowercase(),
258                    _ => u.email.as_deref().unwrap_or("").to_lowercase(),
259                };
260                match op {
261                    "contains" => field_val.contains(&fv),
262                    "starts_with" => field_val.starts_with(&fv),
263                    "ends_with" => field_val.ends_with(&fv),
264                    "ne" => field_val != fv,
265                    _ => field_val == fv,
266                }
267            });
268        }
269
270        let total = users.len();
271        let offset = params.offset.unwrap_or(0);
272        let limit = params.limit.unwrap_or(100);
273        let paged: Vec<User> = users.into_iter().skip(offset).take(limit).collect();
274
275        Ok((paged, total))
276    }
277}
278
279// ── SessionOps ──
280
281#[async_trait]
282impl SessionOps for AllsourceAuthAdapter {
283    type Session = Session;
284
285    async fn create_session(&self, input: CreateSession) -> AuthResult<Session> {
286        let id = Uuid::new_v4().to_string();
287        let token = format!("session_{}", Uuid::new_v4());
288        let now = Utc::now();
289
290        let session = Session {
291            id,
292            token: token.clone(),
293            user_id: input.user_id,
294            expires_at: input.expires_at,
295            created_at: now,
296            updated_at: now,
297            ip_address: input.ip_address,
298            user_agent: input.user_agent,
299            impersonated_by: input.impersonated_by,
300            active_organization_id: input.active_organization_id,
301            active: true,
302        };
303
304        let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
305        self.client
306            .append_event(&session_entity(&token), "SessionCreated", payload)
307            .await
308            .map_err(AuthError::from)?;
309
310        Ok(session)
311    }
312
313    async fn get_session(&self, token: &str) -> AuthResult<Option<Session>> {
314        self.client
315            .get_latest(&session_entity(token))
316            .await
317            .map_err(AuthError::from)
318    }
319
320    async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<Session>> {
321        self.client
322            .find_all_by_field("SessionCreated", "user_id", user_id)
323            .await
324            .map_err(AuthError::from)
325    }
326
327    async fn update_session_expiry(
328        &self,
329        token: &str,
330        expires_at: DateTime<Utc>,
331    ) -> AuthResult<()> {
332        if let Some(mut session) = self.get_session(token).await? {
333            session.expires_at = expires_at;
334            session.updated_at = Utc::now();
335            let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
336            self.client
337                .append_event(&session_entity(token), "SessionExpiryUpdated", payload)
338                .await
339                .map_err(AuthError::from)?;
340        }
341        Ok(())
342    }
343
344    async fn delete_session(&self, token: &str) -> AuthResult<()> {
345        self.client
346            .append_delete(&session_entity(token), "SessionDeleted")
347            .await
348            .map_err(AuthError::from)
349    }
350
351    async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
352        let sessions = self.get_user_sessions(user_id).await?;
353        for session in sessions {
354            self.delete_session(&session.token).await?;
355        }
356        Ok(())
357    }
358
359    async fn delete_expired_sessions(&self) -> AuthResult<usize> {
360        let all: Vec<Session> = self
361            .client
362            .query_all("SessionCreated", 10000)
363            .await
364            .map_err(AuthError::from)?;
365
366        let now = Utc::now();
367        let mut deleted = 0;
368        for session in all {
369            if session.expires_at <= now || !session.active {
370                self.delete_session(&session.token).await?;
371                deleted += 1;
372            }
373        }
374        Ok(deleted)
375    }
376
377    async fn update_session_active_organization(
378        &self,
379        token: &str,
380        organization_id: Option<&str>,
381    ) -> AuthResult<Session> {
382        let mut session = self
383            .get_session(token)
384            .await?
385            .ok_or(AuthError::SessionNotFound)?;
386
387        session.active_organization_id = organization_id.map(|s| s.to_string());
388        session.updated_at = Utc::now();
389
390        let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
391        self.client
392            .append_event(&session_entity(token), "SessionUpdated", payload)
393            .await
394            .map_err(AuthError::from)?;
395
396        Ok(session)
397    }
398}
399
400// ── AccountOps ──
401
402#[async_trait]
403impl AccountOps for AllsourceAuthAdapter {
404    type Account = Account;
405
406    async fn create_account(&self, input: CreateAccount) -> AuthResult<Account> {
407        let id = Uuid::new_v4().to_string();
408        let now = Utc::now();
409
410        let account = Account {
411            id: id.clone(),
412            account_id: input.account_id,
413            provider_id: input.provider_id,
414            user_id: input.user_id,
415            access_token: input.access_token,
416            refresh_token: input.refresh_token,
417            id_token: input.id_token,
418            access_token_expires_at: input.access_token_expires_at,
419            refresh_token_expires_at: input.refresh_token_expires_at,
420            scope: input.scope,
421            password: input.password,
422            created_at: now,
423            updated_at: now,
424        };
425
426        let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
427        self.client
428            .append_event(&account_entity(&id), "AccountCreated", payload)
429            .await
430            .map_err(AuthError::from)?;
431
432        Ok(account)
433    }
434
435    async fn get_account(
436        &self,
437        provider: &str,
438        provider_account_id: &str,
439    ) -> AuthResult<Option<Account>> {
440        // Need to scan accounts matching provider + account_id
441        let all: Vec<Account> = self
442            .client
443            .query_all("AccountCreated", 10000)
444            .await
445            .map_err(AuthError::from)?;
446
447        Ok(all
448            .into_iter()
449            .find(|a| a.provider_id == provider && a.account_id == provider_account_id))
450    }
451
452    async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<Account>> {
453        self.client
454            .find_all_by_field("AccountCreated", "user_id", user_id)
455            .await
456            .map_err(AuthError::from)
457    }
458
459    async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<Account> {
460        let mut account: Account = self
461            .client
462            .get_latest(&account_entity(id))
463            .await
464            .map_err(AuthError::from)?
465            .ok_or_else(|| AuthError::not_found("Account not found"))?;
466
467        if let Some(access_token) = update.access_token {
468            account.access_token = Some(access_token);
469        }
470        if let Some(refresh_token) = update.refresh_token {
471            account.refresh_token = Some(refresh_token);
472        }
473        if let Some(id_token) = update.id_token {
474            account.id_token = Some(id_token);
475        }
476        if let Some(expires_at) = update.access_token_expires_at {
477            account.access_token_expires_at = Some(expires_at);
478        }
479        if let Some(expires_at) = update.refresh_token_expires_at {
480            account.refresh_token_expires_at = Some(expires_at);
481        }
482        if let Some(scope) = update.scope {
483            account.scope = Some(scope);
484        }
485        if let Some(password) = update.password {
486            account.password = Some(password);
487        }
488        account.updated_at = Utc::now();
489
490        let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
491        self.client
492            .append_event(&account_entity(id), "AccountUpdated", payload)
493            .await
494            .map_err(AuthError::from)?;
495
496        Ok(account)
497    }
498
499    async fn delete_account(&self, id: &str) -> AuthResult<()> {
500        self.client
501            .append_delete(&account_entity(id), "AccountDeleted")
502            .await
503            .map_err(AuthError::from)
504    }
505}
506
507// ── VerificationOps ──
508
509#[async_trait]
510impl VerificationOps for AllsourceAuthAdapter {
511    type Verification = Verification;
512
513    async fn create_verification(&self, input: CreateVerification) -> AuthResult<Verification> {
514        let id = Uuid::new_v4().to_string();
515        let now = Utc::now();
516
517        let verification = Verification {
518            id: id.clone(),
519            identifier: input.identifier,
520            value: input.value,
521            expires_at: input.expires_at,
522            created_at: now,
523            updated_at: now,
524        };
525
526        let payload = serde_json::to_value(&verification).map_err(AuthError::from)?;
527        self.client
528            .append_event(
529                &verification_entity(&id),
530                "VerificationCreated",
531                payload,
532            )
533            .await
534            .map_err(AuthError::from)?;
535
536        Ok(verification)
537    }
538
539    async fn get_verification(
540        &self,
541        identifier: &str,
542        value: &str,
543    ) -> AuthResult<Option<Verification>> {
544        let all: Vec<Verification> = self
545            .client
546            .query_all("VerificationCreated", 10000)
547            .await
548            .map_err(AuthError::from)?;
549
550        let now = Utc::now();
551        Ok(all
552            .into_iter()
553            .find(|v| v.identifier == identifier && v.value == value && v.expires_at > now))
554    }
555
556    async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<Verification>> {
557        let all: Vec<Verification> = self
558            .client
559            .query_all("VerificationCreated", 10000)
560            .await
561            .map_err(AuthError::from)?;
562
563        let now = Utc::now();
564        Ok(all
565            .into_iter()
566            .find(|v| v.value == value && v.expires_at > now))
567    }
568
569    async fn get_verification_by_identifier(
570        &self,
571        identifier: &str,
572    ) -> AuthResult<Option<Verification>> {
573        let all: Vec<Verification> = self
574            .client
575            .query_all("VerificationCreated", 10000)
576            .await
577            .map_err(AuthError::from)?;
578
579        let now = Utc::now();
580        Ok(all
581            .into_iter()
582            .find(|v| v.identifier == identifier && v.expires_at > now))
583    }
584
585    async fn consume_verification(
586        &self,
587        identifier: &str,
588        value: &str,
589    ) -> AuthResult<Option<Verification>> {
590        let verification = self.get_verification(identifier, value).await?;
591        if let Some(ref v) = verification {
592            self.client
593                .append_delete(&verification_entity(&v.id), "VerificationConsumed")
594                .await
595                .map_err(AuthError::from)?;
596        }
597        Ok(verification)
598    }
599
600    async fn delete_verification(&self, id: &str) -> AuthResult<()> {
601        self.client
602            .append_delete(&verification_entity(id), "VerificationDeleted")
603            .await
604            .map_err(AuthError::from)
605    }
606
607    async fn delete_expired_verifications(&self) -> AuthResult<usize> {
608        let all: Vec<Verification> = self
609            .client
610            .query_all("VerificationCreated", 10000)
611            .await
612            .map_err(AuthError::from)?;
613
614        let now = Utc::now();
615        let mut deleted = 0;
616        for v in all {
617            if v.expires_at <= now {
618                self.delete_verification(&v.id).await?;
619                deleted += 1;
620            }
621        }
622        Ok(deleted)
623    }
624}
625
626// ── OrganizationOps ──
627
628#[async_trait]
629impl OrganizationOps for AllsourceAuthAdapter {
630    type Organization = Organization;
631
632    async fn create_organization(&self, input: CreateOrganization) -> AuthResult<Organization> {
633        // Check slug uniqueness
634        if self
635            .client
636            .find_by_field::<Organization>("OrgCreated", "slug", &input.slug)
637            .await
638            .map_err(AuthError::from)?
639            .is_some()
640        {
641            return Err(AuthError::conflict("Organization slug already exists"));
642        }
643
644        let id = input
645            .id
646            .clone()
647            .unwrap_or_else(|| Uuid::new_v4().to_string());
648        let now = Utc::now();
649
650        let org = Organization {
651            id: id.clone(),
652            name: input.name,
653            slug: input.slug,
654            logo: input.logo,
655            metadata: input.metadata,
656            created_at: now,
657            updated_at: now,
658        };
659
660        let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
661        self.client
662            .append_event(&org_entity(&id), "OrgCreated", payload)
663            .await
664            .map_err(AuthError::from)?;
665
666        Ok(org)
667    }
668
669    async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<Organization>> {
670        self.client
671            .get_latest(&org_entity(id))
672            .await
673            .map_err(AuthError::from)
674    }
675
676    async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<Organization>> {
677        self.client
678            .find_by_field("OrgCreated", "slug", slug)
679            .await
680            .map_err(AuthError::from)
681    }
682
683    async fn update_organization(
684        &self,
685        id: &str,
686        update: UpdateOrganization,
687    ) -> AuthResult<Organization> {
688        let mut org: Organization = self
689            .get_organization_by_id(id)
690            .await?
691            .ok_or_else(|| AuthError::not_found("Organization not found"))?;
692
693        if let Some(name) = update.name {
694            org.name = name;
695        }
696        if let Some(slug) = update.slug {
697            org.slug = slug;
698        }
699        if let Some(logo) = update.logo {
700            org.logo = Some(logo);
701        }
702        if let Some(metadata) = update.metadata {
703            org.metadata = Some(metadata);
704        }
705        org.updated_at = Utc::now();
706
707        let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
708        self.client
709            .append_event(&org_entity(id), "OrgUpdated", payload)
710            .await
711            .map_err(AuthError::from)?;
712
713        Ok(org)
714    }
715
716    async fn delete_organization(&self, id: &str) -> AuthResult<()> {
717        self.client
718            .append_delete(&org_entity(id), "OrgDeleted")
719            .await
720            .map_err(AuthError::from)?;
721
722        // Also delete members and invitations for this org
723        let members: Vec<Member> = self
724            .client
725            .find_all_by_field("MemberCreated", "organization_id", id)
726            .await
727            .map_err(AuthError::from)?;
728        for m in members {
729            self.client
730                .append_delete(&member_entity(&m.id), "MemberDeleted")
731                .await
732                .map_err(AuthError::from)?;
733        }
734
735        let invitations: Vec<Invitation> = self
736            .client
737            .find_all_by_field("InvitationCreated", "organization_id", id)
738            .await
739            .map_err(AuthError::from)?;
740        for i in invitations {
741            self.client
742                .append_delete(&invitation_entity(&i.id), "InvitationDeleted")
743                .await
744                .map_err(AuthError::from)?;
745        }
746
747        Ok(())
748    }
749
750    async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<Organization>> {
751        let members: Vec<Member> = self
752            .client
753            .find_all_by_field("MemberCreated", "user_id", user_id)
754            .await
755            .map_err(AuthError::from)?;
756
757        let mut orgs = Vec::new();
758        for m in members {
759            if let Some(org) = self.get_organization_by_id(&m.organization_id).await? {
760                orgs.push(org);
761            }
762        }
763        Ok(orgs)
764    }
765}
766
767// ── MemberOps ──
768
769#[async_trait]
770impl MemberOps for AllsourceAuthAdapter {
771    type Member = Member;
772
773    async fn create_member(&self, input: CreateMember) -> AuthResult<Member> {
774        // Check if already a member
775        let existing: Vec<Member> = self
776            .client
777            .find_all_by_field("MemberCreated", "organization_id", &input.organization_id)
778            .await
779            .map_err(AuthError::from)?;
780
781        if existing.iter().any(|m| m.user_id == input.user_id) {
782            return Err(AuthError::conflict(
783                "User is already a member of this organization",
784            ));
785        }
786
787        let id = Uuid::new_v4().to_string();
788        let now = Utc::now();
789
790        let member = Member {
791            id: id.clone(),
792            organization_id: input.organization_id,
793            user_id: input.user_id,
794            role: input.role,
795            created_at: now,
796        };
797
798        let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
799        self.client
800            .append_event(&member_entity(&id), "MemberCreated", payload)
801            .await
802            .map_err(AuthError::from)?;
803
804        Ok(member)
805    }
806
807    async fn get_member(
808        &self,
809        organization_id: &str,
810        user_id: &str,
811    ) -> AuthResult<Option<Member>> {
812        let members: Vec<Member> = self
813            .client
814            .find_all_by_field("MemberCreated", "organization_id", organization_id)
815            .await
816            .map_err(AuthError::from)?;
817
818        Ok(members.into_iter().find(|m| m.user_id == user_id))
819    }
820
821    async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<Member>> {
822        self.client
823            .get_latest(&member_entity(id))
824            .await
825            .map_err(AuthError::from)
826    }
827
828    async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<Member> {
829        let mut member: Member = self
830            .get_member_by_id(member_id)
831            .await?
832            .ok_or_else(|| AuthError::not_found("Member not found"))?;
833
834        member.role = role.to_string();
835
836        let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
837        self.client
838            .append_event(&member_entity(member_id), "MemberUpdated", payload)
839            .await
840            .map_err(AuthError::from)?;
841
842        Ok(member)
843    }
844
845    async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
846        self.client
847            .append_delete(&member_entity(member_id), "MemberDeleted")
848            .await
849            .map_err(AuthError::from)
850    }
851
852    async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<Member>> {
853        self.client
854            .find_all_by_field("MemberCreated", "organization_id", organization_id)
855            .await
856            .map_err(AuthError::from)
857    }
858
859    async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
860        Ok(self.list_organization_members(organization_id).await?.len())
861    }
862
863    async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
864        let members = self.list_organization_members(organization_id).await?;
865        Ok(members.iter().filter(|m| m.role == "owner").count())
866    }
867}
868
869// ── InvitationOps ──
870
871#[async_trait]
872impl InvitationOps for AllsourceAuthAdapter {
873    type Invitation = Invitation;
874
875    async fn create_invitation(&self, input: CreateInvitation) -> AuthResult<Invitation> {
876        let id = Uuid::new_v4().to_string();
877        let now = Utc::now();
878
879        let invitation = Invitation {
880            id: id.clone(),
881            organization_id: input.organization_id,
882            email: input.email,
883            role: input.role,
884            status: InvitationStatus::Pending,
885            inviter_id: input.inviter_id,
886            expires_at: input.expires_at,
887            created_at: now,
888        };
889
890        let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
891        self.client
892            .append_event(&invitation_entity(&id), "InvitationCreated", payload)
893            .await
894            .map_err(AuthError::from)?;
895
896        Ok(invitation)
897    }
898
899    async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<Invitation>> {
900        self.client
901            .get_latest(&invitation_entity(id))
902            .await
903            .map_err(AuthError::from)
904    }
905
906    async fn get_pending_invitation(
907        &self,
908        organization_id: &str,
909        email: &str,
910    ) -> AuthResult<Option<Invitation>> {
911        let invitations: Vec<Invitation> = self
912            .client
913            .find_all_by_field("InvitationCreated", "organization_id", organization_id)
914            .await
915            .map_err(AuthError::from)?;
916
917        Ok(invitations.into_iter().find(|i| {
918            i.email.to_lowercase() == email.to_lowercase()
919                && i.status == InvitationStatus::Pending
920        }))
921    }
922
923    async fn update_invitation_status(
924        &self,
925        id: &str,
926        status: InvitationStatus,
927    ) -> AuthResult<Invitation> {
928        let mut invitation = self
929            .get_invitation_by_id(id)
930            .await?
931            .ok_or_else(|| AuthError::not_found("Invitation not found"))?;
932
933        invitation.status = status;
934
935        let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
936        self.client
937            .append_event(&invitation_entity(id), "InvitationUpdated", payload)
938            .await
939            .map_err(AuthError::from)?;
940
941        Ok(invitation)
942    }
943
944    async fn list_organization_invitations(
945        &self,
946        organization_id: &str,
947    ) -> AuthResult<Vec<Invitation>> {
948        self.client
949            .find_all_by_field("InvitationCreated", "organization_id", organization_id)
950            .await
951            .map_err(AuthError::from)
952    }
953
954    async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<Invitation>> {
955        let all: Vec<Invitation> = self
956            .client
957            .find_all_by_field("InvitationCreated", "email", email)
958            .await
959            .map_err(AuthError::from)?;
960
961        let now = Utc::now();
962        Ok(all
963            .into_iter()
964            .filter(|i| i.status == InvitationStatus::Pending && i.expires_at > now)
965            .collect())
966    }
967}
968
969// ── TwoFactorOps ──
970
971#[async_trait]
972impl TwoFactorOps for AllsourceAuthAdapter {
973    type TwoFactor = TwoFactor;
974
975    async fn create_two_factor(&self, input: CreateTwoFactor) -> AuthResult<TwoFactor> {
976        // Check if user already has 2FA
977        if self
978            .get_two_factor_by_user_id(&input.user_id)
979            .await?
980            .is_some()
981        {
982            return Err(AuthError::conflict(
983                "Two-factor authentication already enabled for this user",
984            ));
985        }
986
987        let id = Uuid::new_v4().to_string();
988        let now = Utc::now();
989
990        let two_factor = TwoFactor {
991            id: id.clone(),
992            secret: input.secret,
993            backup_codes: input.backup_codes,
994            user_id: input.user_id,
995            created_at: now,
996            updated_at: now,
997        };
998
999        let payload = serde_json::to_value(&two_factor).map_err(AuthError::from)?;
1000        self.client
1001            .append_event(&two_factor_entity(&id), "TwoFactorCreated", payload)
1002            .await
1003            .map_err(AuthError::from)?;
1004
1005        Ok(two_factor)
1006    }
1007
1008    async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TwoFactor>> {
1009        self.client
1010            .find_by_field("TwoFactorCreated", "user_id", user_id)
1011            .await
1012            .map_err(AuthError::from)
1013    }
1014
1015    async fn update_two_factor_backup_codes(
1016        &self,
1017        user_id: &str,
1018        backup_codes: &str,
1019    ) -> AuthResult<TwoFactor> {
1020        let mut tf = self
1021            .get_two_factor_by_user_id(user_id)
1022            .await?
1023            .ok_or_else(|| AuthError::not_found("Two-factor record not found"))?;
1024
1025        tf.backup_codes = Some(backup_codes.to_string());
1026        tf.updated_at = Utc::now();
1027
1028        let payload = serde_json::to_value(&tf).map_err(AuthError::from)?;
1029        self.client
1030            .append_event(&two_factor_entity(&tf.id), "TwoFactorUpdated", payload)
1031            .await
1032            .map_err(AuthError::from)?;
1033
1034        Ok(tf)
1035    }
1036
1037    async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
1038        if let Some(tf) = self.get_two_factor_by_user_id(user_id).await? {
1039            self.client
1040                .append_delete(&two_factor_entity(&tf.id), "TwoFactorDeleted")
1041                .await
1042                .map_err(AuthError::from)?;
1043        }
1044        Ok(())
1045    }
1046}
1047
1048// ── ApiKeyOps ──
1049
1050#[async_trait]
1051impl ApiKeyOps for AllsourceAuthAdapter {
1052    type ApiKey = ApiKey;
1053
1054    async fn create_api_key(&self, input: CreateApiKey) -> AuthResult<ApiKey> {
1055        // Check hash uniqueness
1056        if self
1057            .get_api_key_by_hash(&input.key_hash)
1058            .await?
1059            .is_some()
1060        {
1061            return Err(AuthError::conflict("API key already exists"));
1062        }
1063
1064        let id = Uuid::new_v4().to_string();
1065        let now = Utc::now().to_rfc3339();
1066
1067        let api_key = ApiKey {
1068            id: id.clone(),
1069            name: input.name,
1070            start: input.start,
1071            prefix: input.prefix,
1072            key_hash: input.key_hash,
1073            user_id: input.user_id,
1074            refill_interval: input.refill_interval,
1075            refill_amount: input.refill_amount,
1076            last_refill_at: None,
1077            enabled: input.enabled,
1078            rate_limit_enabled: input.rate_limit_enabled,
1079            rate_limit_time_window: input.rate_limit_time_window,
1080            rate_limit_max: input.rate_limit_max,
1081            request_count: Some(0),
1082            remaining: input.remaining,
1083            last_request: None,
1084            expires_at: input.expires_at,
1085            created_at: now.clone(),
1086            updated_at: now,
1087            permissions: input.permissions,
1088            metadata: input.metadata,
1089        };
1090
1091        let payload = serde_json::to_value(&api_key).map_err(AuthError::from)?;
1092        self.client
1093            .append_event(&api_key_entity(&id), "ApiKeyCreated", payload)
1094            .await
1095            .map_err(AuthError::from)?;
1096
1097        Ok(api_key)
1098    }
1099
1100    async fn get_api_key_by_id(&self, id: &str) -> AuthResult<Option<ApiKey>> {
1101        self.client
1102            .get_latest(&api_key_entity(id))
1103            .await
1104            .map_err(AuthError::from)
1105    }
1106
1107    async fn get_api_key_by_hash(&self, hash: &str) -> AuthResult<Option<ApiKey>> {
1108        self.client
1109            .find_by_field("ApiKeyCreated", "key", hash)
1110            .await
1111            .map_err(AuthError::from)
1112    }
1113
1114    async fn list_api_keys_by_user(&self, user_id: &str) -> AuthResult<Vec<ApiKey>> {
1115        let mut keys: Vec<ApiKey> = self
1116            .client
1117            .find_all_by_field("ApiKeyCreated", "user_id", user_id)
1118            .await
1119            .map_err(AuthError::from)?;
1120
1121        keys.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1122        Ok(keys)
1123    }
1124
1125    async fn update_api_key(&self, id: &str, update: UpdateApiKey) -> AuthResult<ApiKey> {
1126        let mut key = self
1127            .get_api_key_by_id(id)
1128            .await?
1129            .ok_or_else(|| AuthError::not_found("API key not found"))?;
1130
1131        if let Some(name) = update.name {
1132            key.name = Some(name);
1133        }
1134        if let Some(enabled) = update.enabled {
1135            key.enabled = enabled;
1136        }
1137        if let Some(remaining) = update.remaining {
1138            key.remaining = Some(remaining);
1139        }
1140        if let Some(rate_limit_enabled) = update.rate_limit_enabled {
1141            key.rate_limit_enabled = rate_limit_enabled;
1142        }
1143        if let Some(window) = update.rate_limit_time_window {
1144            key.rate_limit_time_window = Some(window);
1145        }
1146        if let Some(max) = update.rate_limit_max {
1147            key.rate_limit_max = Some(max);
1148        }
1149        if let Some(interval) = update.refill_interval {
1150            key.refill_interval = Some(interval);
1151        }
1152        if let Some(amount) = update.refill_amount {
1153            key.refill_amount = Some(amount);
1154        }
1155        if let Some(permissions) = update.permissions {
1156            key.permissions = Some(permissions);
1157        }
1158        if let Some(metadata) = update.metadata {
1159            key.metadata = Some(metadata);
1160        }
1161        if let Some(expires_at) = update.expires_at {
1162            key.expires_at = expires_at;
1163        }
1164        if let Some(last_request) = update.last_request {
1165            key.last_request = last_request;
1166        }
1167        if let Some(request_count) = update.request_count {
1168            key.request_count = Some(request_count);
1169        }
1170        if let Some(last_refill_at) = update.last_refill_at {
1171            key.last_refill_at = last_refill_at;
1172        }
1173        key.updated_at = Utc::now().to_rfc3339();
1174
1175        let payload = serde_json::to_value(&key).map_err(AuthError::from)?;
1176        self.client
1177            .append_event(&api_key_entity(id), "ApiKeyUpdated", payload)
1178            .await
1179            .map_err(AuthError::from)?;
1180
1181        Ok(key)
1182    }
1183
1184    async fn delete_api_key(&self, id: &str) -> AuthResult<()> {
1185        self.client
1186            .append_delete(&api_key_entity(id), "ApiKeyDeleted")
1187            .await
1188            .map_err(AuthError::from)
1189    }
1190
1191    async fn delete_expired_api_keys(&self) -> AuthResult<usize> {
1192        let all: Vec<ApiKey> = self
1193            .client
1194            .query_all("ApiKeyCreated", 10000)
1195            .await
1196            .map_err(AuthError::from)?;
1197
1198        let now = Utc::now();
1199        let mut deleted = 0;
1200        for key in all {
1201            if let Some(expires_at) = &key.expires_at {
1202                if let Ok(exp) = chrono::DateTime::parse_from_rfc3339(expires_at) {
1203                    if exp <= now {
1204                        self.delete_api_key(&key.id).await?;
1205                        deleted += 1;
1206                    }
1207                }
1208            }
1209        }
1210        Ok(deleted)
1211    }
1212}
1213
1214// ── PasskeyOps ──
1215
1216#[async_trait]
1217impl PasskeyOps for AllsourceAuthAdapter {
1218    type Passkey = Passkey;
1219
1220    async fn create_passkey(&self, input: CreatePasskey) -> AuthResult<Passkey> {
1221        // Check credential_id uniqueness
1222        if self
1223            .get_passkey_by_credential_id(&input.credential_id)
1224            .await?
1225            .is_some()
1226        {
1227            return Err(AuthError::conflict(
1228                "A passkey with this credential ID already exists",
1229            ));
1230        }
1231
1232        let id = Uuid::new_v4().to_string();
1233        let now = Utc::now();
1234
1235        let passkey = Passkey {
1236            id: id.clone(),
1237            name: input.name,
1238            public_key: input.public_key,
1239            user_id: input.user_id,
1240            credential_id: input.credential_id,
1241            counter: input.counter,
1242            device_type: input.device_type,
1243            backed_up: input.backed_up,
1244            transports: input.transports,
1245            created_at: now,
1246        };
1247
1248        let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1249        self.client
1250            .append_event(&passkey_entity(&id), "PasskeyCreated", payload)
1251            .await
1252            .map_err(AuthError::from)?;
1253
1254        Ok(passkey)
1255    }
1256
1257    async fn get_passkey_by_id(&self, id: &str) -> AuthResult<Option<Passkey>> {
1258        self.client
1259            .get_latest(&passkey_entity(id))
1260            .await
1261            .map_err(AuthError::from)
1262    }
1263
1264    async fn get_passkey_by_credential_id(
1265        &self,
1266        credential_id: &str,
1267    ) -> AuthResult<Option<Passkey>> {
1268        self.client
1269            .find_by_field("PasskeyCreated", "credentialID", credential_id)
1270            .await
1271            .map_err(AuthError::from)
1272    }
1273
1274    async fn list_passkeys_by_user(&self, user_id: &str) -> AuthResult<Vec<Passkey>> {
1275        let mut passkeys: Vec<Passkey> = self
1276            .client
1277            .find_all_by_field("PasskeyCreated", "user_id", user_id)
1278            .await
1279            .map_err(AuthError::from)?;
1280
1281        passkeys.sort_by_key(|p| std::cmp::Reverse(p.created_at));
1282        Ok(passkeys)
1283    }
1284
1285    async fn update_passkey_counter(&self, id: &str, counter: u64) -> AuthResult<Passkey> {
1286        let mut passkey = self
1287            .get_passkey_by_id(id)
1288            .await?
1289            .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1290
1291        passkey.counter = counter;
1292
1293        let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1294        self.client
1295            .append_event(&passkey_entity(id), "PasskeyUpdated", payload)
1296            .await
1297            .map_err(AuthError::from)?;
1298
1299        Ok(passkey)
1300    }
1301
1302    async fn update_passkey_name(&self, id: &str, name: &str) -> AuthResult<Passkey> {
1303        let mut passkey = self
1304            .get_passkey_by_id(id)
1305            .await?
1306            .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1307
1308        passkey.name = name.to_string();
1309
1310        let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1311        self.client
1312            .append_event(&passkey_entity(id), "PasskeyUpdated", payload)
1313            .await
1314            .map_err(AuthError::from)?;
1315
1316        Ok(passkey)
1317    }
1318
1319    async fn delete_passkey(&self, id: &str) -> AuthResult<()> {
1320        self.client
1321            .append_delete(&passkey_entity(id), "PasskeyDeleted")
1322            .await
1323            .map_err(AuthError::from)
1324    }
1325}