Skip to main content

better_auth_allsource/
adapter.rs

1use async_trait::async_trait;
2use better_auth_core::adapters::traits::{
3    AccountOps, ApiKeyOps, InvitationOps, MemberOps, OrganizationOps, PasskeyOps, SessionOps,
4    TwoFactorOps, UserOps, VerificationOps,
5};
6use better_auth_core::error::{AuthError, AuthResult};
7use better_auth_core::types::{
8    Account, ApiKey, CreateAccount, CreateApiKey, CreateInvitation, CreateMember,
9    CreateOrganization, CreatePasskey, CreateSession, CreateTwoFactor, CreateUser,
10    CreateVerification, Invitation, InvitationStatus, ListUsersParams, Member, Organization,
11    Passkey, Session, TwoFactor, UpdateAccount, UpdateApiKey, UpdateOrganization, UpdateUser, User,
12    Verification,
13};
14use chrono::{DateTime, Utc};
15use uuid::Uuid;
16
17use crate::client::AllsourceClient;
18
19/// Allsource-backed `DatabaseAdapter` for better-auth-rs.
20///
21/// Each auth entity is stored as an Allsource event stream:
22/// - Entity ID pattern: `auth-{type}:{id}` (e.g., `auth-user:abc123`)
23/// - Each create/update appends a full-state snapshot event
24/// - Deletes append a `_deleted: true` marker
25/// - Reads fetch the latest event and deserialize the payload
26///
27/// This gives you event-sourced auth with full history and time-travel.
28pub struct AllsourceAuthAdapter {
29    client: AllsourceClient,
30}
31
32impl AllsourceAuthAdapter {
33    /// Create a new adapter.
34    ///
35    /// - `core_url`: Allsource Core URL (e.g., `http://localhost:3900`)
36    /// - `query_url`: Allsource Query Service URL (e.g., `http://localhost:3902`)
37    /// - `api_key`: API key for authentication (e.g., `ask_...`)
38    pub fn new(core_url: &str, query_url: &str, api_key: &str) -> Self {
39        Self {
40            client: AllsourceClient::new(core_url, query_url, api_key),
41        }
42    }
43}
44
45// Entity ID helpers
/// Returns the event-stream entity ID for a user: `auth-user:{id}`.
fn user_entity(id: &str) -> String {
    ["auth-user:", id].concat()
}
/// Returns the event-stream entity ID for a session, keyed by its token:
/// `auth-session:{token}`.
fn session_entity(token: &str) -> String {
    ["auth-session:", token].concat()
}
/// Returns the event-stream entity ID for an account: `auth-account:{id}`.
fn account_entity(id: &str) -> String {
    ["auth-account:", id].concat()
}
/// Returns the event-stream entity ID for a verification:
/// `auth-verification:{id}`.
fn verification_entity(id: &str) -> String {
    ["auth-verification:", id].concat()
}
/// Returns the event-stream entity ID for an organization: `auth-org:{id}`.
fn org_entity(id: &str) -> String {
    ["auth-org:", id].concat()
}
/// Returns the event-stream entity ID for an organization member:
/// `auth-member:{id}`.
fn member_entity(id: &str) -> String {
    ["auth-member:", id].concat()
}
/// Returns the event-stream entity ID for an invitation:
/// `auth-invitation:{id}`.
fn invitation_entity(id: &str) -> String {
    ["auth-invitation:", id].concat()
}
/// Returns the event-stream entity ID for a two-factor record: `auth-2fa:{id}`.
fn two_factor_entity(id: &str) -> String {
    ["auth-2fa:", id].concat()
}
/// Returns the event-stream entity ID for an API key: `auth-apikey:{id}`.
fn api_key_entity(id: &str) -> String {
    ["auth-apikey:", id].concat()
}
/// Returns the event-stream entity ID for a passkey: `auth-passkey:{id}`.
fn passkey_entity(id: &str) -> String {
    ["auth-passkey:", id].concat()
}
76
77// ── UserOps ──
78
79#[async_trait]
80impl UserOps for AllsourceAuthAdapter {
81    type User = User;
82
83    async fn create_user(&self, input: CreateUser) -> AuthResult<User> {
84        let id = input.id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
85
86        // Check email uniqueness
87        if let Some(email) = &input.email {
88            if let Some(_existing) = self
89                .client
90                .find_by_field::<User>("auth.user.created", "email", email)
91                .await
92                .map_err(AuthError::from)?
93            {
94                return Err(AuthError::config("Email already exists"));
95            }
96        }
97
98        // Check username uniqueness
99        if let Some(username) = &input.username {
100            if let Some(_existing) = self
101                .client
102                .find_by_field::<User>("auth.user.created", "username", username)
103                .await
104                .map_err(AuthError::from)?
105            {
106                return Err(AuthError::conflict(
107                    "A user with this username already exists",
108                ));
109            }
110        }
111
112        let now = Utc::now();
113        let user = User {
114            id: id.clone(),
115            email: input.email,
116            name: input.name,
117            image: input.image,
118            email_verified: input.email_verified.unwrap_or(false),
119            created_at: now,
120            updated_at: now,
121            username: input.username,
122            display_username: input.display_username,
123            two_factor_enabled: false,
124            role: input.role,
125            banned: false,
126            ban_reason: None,
127            ban_expires: None,
128            metadata: input.metadata.unwrap_or(serde_json::Value::Null),
129        };
130
131        let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
132        self.client
133            .append_event(&user_entity(&id), "auth.user.created", payload)
134            .await
135            .map_err(AuthError::from)?;
136
137        // Store password in the account, not the user (matches better-auth pattern)
138        // Password is handled via AccountOps
139
140        Ok(user)
141    }
142
143    async fn get_user_by_id(&self, id: &str) -> AuthResult<Option<User>> {
144        self.client
145            .get_latest(&user_entity(id))
146            .await
147            .map_err(AuthError::from)
148    }
149
150    async fn get_user_by_email(&self, email: &str) -> AuthResult<Option<User>> {
151        self.client
152            .find_by_field("auth.user.created", "email", email)
153            .await
154            .map_err(AuthError::from)
155    }
156
157    async fn get_user_by_username(&self, username: &str) -> AuthResult<Option<User>> {
158        self.client
159            .find_by_field("auth.user.created", "username", username)
160            .await
161            .map_err(AuthError::from)
162    }
163
164    async fn update_user(&self, id: &str, update: UpdateUser) -> AuthResult<User> {
165        let mut user = self
166            .get_user_by_id(id)
167            .await?
168            .ok_or(AuthError::UserNotFound)?;
169
170        if let Some(email) = update.email {
171            user.email = Some(email);
172        }
173        if let Some(name) = update.name {
174            user.name = Some(name);
175        }
176        if let Some(image) = update.image {
177            user.image = Some(image);
178        }
179        if let Some(verified) = update.email_verified {
180            user.email_verified = verified;
181        }
182        if let Some(username) = update.username {
183            user.username = Some(username);
184        }
185        if let Some(display_username) = update.display_username {
186            user.display_username = Some(display_username);
187        }
188        if let Some(role) = update.role {
189            user.role = Some(role);
190        }
191        if let Some(banned) = update.banned {
192            user.banned = banned;
193        }
194        if let Some(ban_reason) = update.ban_reason {
195            user.ban_reason = Some(ban_reason);
196        }
197        if let Some(ban_expires) = update.ban_expires {
198            user.ban_expires = Some(ban_expires);
199        }
200        if let Some(two_factor_enabled) = update.two_factor_enabled {
201            user.two_factor_enabled = two_factor_enabled;
202        }
203        if let Some(metadata) = update.metadata {
204            user.metadata = metadata;
205        }
206        user.updated_at = Utc::now();
207
208        let payload = serde_json::to_value(&user).map_err(AuthError::from)?;
209        self.client
210            .append_event(&user_entity(id), "auth.user.updated", payload)
211            .await
212            .map_err(AuthError::from)?;
213
214        Ok(user)
215    }
216
217    async fn delete_user(&self, id: &str) -> AuthResult<()> {
218        self.client
219            .append_delete(&user_entity(id), "auth.user.deleted")
220            .await
221            .map_err(AuthError::from)
222    }
223
224    async fn list_users(&self, params: ListUsersParams) -> AuthResult<(Vec<User>, usize)> {
225        let mut users: Vec<User> = self
226            .client
227            .query_all("auth.user.created", 10000)
228            .await
229            .map_err(AuthError::from)?;
230
231        // Apply search filter
232        if let Some(search_value) = &params.search_value {
233            let field = params.search_field.as_deref().unwrap_or("email");
234            let op = params.search_operator.as_deref().unwrap_or("contains");
235            let sv = search_value.to_lowercase();
236            users.retain(|u| {
237                let field_val = match field {
238                    "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
239                    _ => u.email.as_deref().unwrap_or("").to_lowercase(),
240                };
241                match op {
242                    "starts_with" => field_val.starts_with(&sv),
243                    "ends_with" => field_val.ends_with(&sv),
244                    _ => field_val.contains(&sv),
245                }
246            });
247        }
248
249        // Apply filter
250        if let Some(filter_value) = &params.filter_value {
251            let field = params.filter_field.as_deref().unwrap_or("email");
252            let op = params.filter_operator.as_deref().unwrap_or("eq");
253            let fv = filter_value.to_lowercase();
254            users.retain(|u| {
255                let field_val = match field {
256                    "name" => u.name.as_deref().unwrap_or("").to_lowercase(),
257                    "role" => u.role.as_deref().unwrap_or("").to_lowercase(),
258                    _ => u.email.as_deref().unwrap_or("").to_lowercase(),
259                };
260                match op {
261                    "contains" => field_val.contains(&fv),
262                    "starts_with" => field_val.starts_with(&fv),
263                    "ends_with" => field_val.ends_with(&fv),
264                    "ne" => field_val != fv,
265                    _ => field_val == fv,
266                }
267            });
268        }
269
270        let total = users.len();
271        let offset = params.offset.unwrap_or(0);
272        let limit = params.limit.unwrap_or(100);
273        let paged: Vec<User> = users.into_iter().skip(offset).take(limit).collect();
274
275        Ok((paged, total))
276    }
277}
278
279// ── SessionOps ──
280
281#[async_trait]
282impl SessionOps for AllsourceAuthAdapter {
283    type Session = Session;
284
285    async fn create_session(&self, input: CreateSession) -> AuthResult<Session> {
286        let id = Uuid::new_v4().to_string();
287        let token = format!("session_{}", Uuid::new_v4());
288        let now = Utc::now();
289
290        let session = Session {
291            id,
292            token: token.clone(),
293            user_id: input.user_id,
294            expires_at: input.expires_at,
295            created_at: now,
296            updated_at: now,
297            ip_address: input.ip_address,
298            user_agent: input.user_agent,
299            impersonated_by: input.impersonated_by,
300            active_organization_id: input.active_organization_id,
301            active: true,
302        };
303
304        let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
305        self.client
306            .append_event(&session_entity(&token), "auth.session.created", payload)
307            .await
308            .map_err(AuthError::from)?;
309
310        Ok(session)
311    }
312
313    async fn get_session(&self, token: &str) -> AuthResult<Option<Session>> {
314        let session: Option<Session> = self
315            .client
316            .get_latest(&session_entity(token))
317            .await
318            .map_err(AuthError::from)?;
319        // Session.active is #[serde(skip)] so it defaults to false after deserialization.
320        // Non-deleted sessions retrieved from the store are active.
321        Ok(session.map(|mut s| {
322            s.active = true;
323            s
324        }))
325    }
326
327    async fn get_user_sessions(&self, user_id: &str) -> AuthResult<Vec<Session>> {
328        let sessions: Vec<Session> = self
329            .client
330            .find_all_by_field("auth.session.created", "user_id", user_id)
331            .await
332            .map_err(AuthError::from)?;
333        Ok(sessions
334            .into_iter()
335            .map(|mut s| {
336                s.active = true;
337                s
338            })
339            .collect())
340    }
341
342    async fn update_session_expiry(
343        &self,
344        token: &str,
345        expires_at: DateTime<Utc>,
346    ) -> AuthResult<()> {
347        if let Some(mut session) = self.get_session(token).await? {
348            session.expires_at = expires_at;
349            session.updated_at = Utc::now();
350            let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
351            self.client
352                .append_event(&session_entity(token), "auth.session.expiry_updated", payload)
353                .await
354                .map_err(AuthError::from)?;
355        }
356        Ok(())
357    }
358
359    async fn delete_session(&self, token: &str) -> AuthResult<()> {
360        self.client
361            .append_delete(&session_entity(token), "auth.session.deleted")
362            .await
363            .map_err(AuthError::from)
364    }
365
366    async fn delete_user_sessions(&self, user_id: &str) -> AuthResult<()> {
367        let sessions = self.get_user_sessions(user_id).await?;
368        for session in sessions {
369            self.delete_session(&session.token).await?;
370        }
371        Ok(())
372    }
373
374    async fn delete_expired_sessions(&self) -> AuthResult<usize> {
375        let all: Vec<Session> = self
376            .client
377            .query_all("auth.session.created", 10000)
378            .await
379            .map_err(AuthError::from)?;
380
381        let now = Utc::now();
382        let mut deleted = 0;
383        for session in all {
384            if session.expires_at <= now || !session.active {
385                self.delete_session(&session.token).await?;
386                deleted += 1;
387            }
388        }
389        Ok(deleted)
390    }
391
392    async fn update_session_active_organization(
393        &self,
394        token: &str,
395        organization_id: Option<&str>,
396    ) -> AuthResult<Session> {
397        let mut session = self
398            .get_session(token)
399            .await?
400            .ok_or(AuthError::SessionNotFound)?;
401
402        session.active_organization_id = organization_id.map(|s| s.to_string());
403        session.updated_at = Utc::now();
404
405        let payload = serde_json::to_value(&session).map_err(AuthError::from)?;
406        self.client
407            .append_event(&session_entity(token), "auth.session.updated", payload)
408            .await
409            .map_err(AuthError::from)?;
410
411        Ok(session)
412    }
413}
414
415// ── AccountOps ──
416
417#[async_trait]
418impl AccountOps for AllsourceAuthAdapter {
419    type Account = Account;
420
421    async fn create_account(&self, input: CreateAccount) -> AuthResult<Account> {
422        let id = Uuid::new_v4().to_string();
423        let now = Utc::now();
424
425        let account = Account {
426            id: id.clone(),
427            account_id: input.account_id,
428            provider_id: input.provider_id,
429            user_id: input.user_id,
430            access_token: input.access_token,
431            refresh_token: input.refresh_token,
432            id_token: input.id_token,
433            access_token_expires_at: input.access_token_expires_at,
434            refresh_token_expires_at: input.refresh_token_expires_at,
435            scope: input.scope,
436            password: input.password,
437            created_at: now,
438            updated_at: now,
439        };
440
441        let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
442        self.client
443            .append_event(&account_entity(&id), "auth.account.created", payload)
444            .await
445            .map_err(AuthError::from)?;
446
447        Ok(account)
448    }
449
450    async fn get_account(
451        &self,
452        provider: &str,
453        provider_account_id: &str,
454    ) -> AuthResult<Option<Account>> {
455        // Need to scan accounts matching provider + account_id
456        let all: Vec<Account> = self
457            .client
458            .query_all("auth.account.created", 10000)
459            .await
460            .map_err(AuthError::from)?;
461
462        Ok(all
463            .into_iter()
464            .find(|a| a.provider_id == provider && a.account_id == provider_account_id))
465    }
466
467    async fn get_user_accounts(&self, user_id: &str) -> AuthResult<Vec<Account>> {
468        self.client
469            .find_all_by_field("auth.account.created", "user_id", user_id)
470            .await
471            .map_err(AuthError::from)
472    }
473
474    async fn update_account(&self, id: &str, update: UpdateAccount) -> AuthResult<Account> {
475        let mut account: Account = self
476            .client
477            .get_latest(&account_entity(id))
478            .await
479            .map_err(AuthError::from)?
480            .ok_or_else(|| AuthError::not_found("Account not found"))?;
481
482        if let Some(access_token) = update.access_token {
483            account.access_token = Some(access_token);
484        }
485        if let Some(refresh_token) = update.refresh_token {
486            account.refresh_token = Some(refresh_token);
487        }
488        if let Some(id_token) = update.id_token {
489            account.id_token = Some(id_token);
490        }
491        if let Some(expires_at) = update.access_token_expires_at {
492            account.access_token_expires_at = Some(expires_at);
493        }
494        if let Some(expires_at) = update.refresh_token_expires_at {
495            account.refresh_token_expires_at = Some(expires_at);
496        }
497        if let Some(scope) = update.scope {
498            account.scope = Some(scope);
499        }
500        if let Some(password) = update.password {
501            account.password = Some(password);
502        }
503        account.updated_at = Utc::now();
504
505        let payload = serde_json::to_value(&account).map_err(AuthError::from)?;
506        self.client
507            .append_event(&account_entity(id), "auth.account.updated", payload)
508            .await
509            .map_err(AuthError::from)?;
510
511        Ok(account)
512    }
513
514    async fn delete_account(&self, id: &str) -> AuthResult<()> {
515        self.client
516            .append_delete(&account_entity(id), "auth.account.deleted")
517            .await
518            .map_err(AuthError::from)
519    }
520}
521
522// ── VerificationOps ──
523
524#[async_trait]
525impl VerificationOps for AllsourceAuthAdapter {
526    type Verification = Verification;
527
528    async fn create_verification(&self, input: CreateVerification) -> AuthResult<Verification> {
529        let id = Uuid::new_v4().to_string();
530        let now = Utc::now();
531
532        let verification = Verification {
533            id: id.clone(),
534            identifier: input.identifier,
535            value: input.value,
536            expires_at: input.expires_at,
537            created_at: now,
538            updated_at: now,
539        };
540
541        let payload = serde_json::to_value(&verification).map_err(AuthError::from)?;
542        self.client
543            .append_event(
544                &verification_entity(&id),
545                "auth.verification.created",
546                payload,
547            )
548            .await
549            .map_err(AuthError::from)?;
550
551        Ok(verification)
552    }
553
554    async fn get_verification(
555        &self,
556        identifier: &str,
557        value: &str,
558    ) -> AuthResult<Option<Verification>> {
559        let all: Vec<Verification> = self
560            .client
561            .query_all("auth.verification.created", 10000)
562            .await
563            .map_err(AuthError::from)?;
564
565        let now = Utc::now();
566        Ok(all
567            .into_iter()
568            .find(|v| v.identifier == identifier && v.value == value && v.expires_at > now))
569    }
570
571    async fn get_verification_by_value(&self, value: &str) -> AuthResult<Option<Verification>> {
572        let all: Vec<Verification> = self
573            .client
574            .query_all("auth.verification.created", 10000)
575            .await
576            .map_err(AuthError::from)?;
577
578        let now = Utc::now();
579        Ok(all
580            .into_iter()
581            .find(|v| v.value == value && v.expires_at > now))
582    }
583
584    async fn get_verification_by_identifier(
585        &self,
586        identifier: &str,
587    ) -> AuthResult<Option<Verification>> {
588        let all: Vec<Verification> = self
589            .client
590            .query_all("auth.verification.created", 10000)
591            .await
592            .map_err(AuthError::from)?;
593
594        let now = Utc::now();
595        Ok(all
596            .into_iter()
597            .find(|v| v.identifier == identifier && v.expires_at > now))
598    }
599
600    async fn consume_verification(
601        &self,
602        identifier: &str,
603        value: &str,
604    ) -> AuthResult<Option<Verification>> {
605        let verification = self.get_verification(identifier, value).await?;
606        if let Some(ref v) = verification {
607            self.client
608                .append_delete(&verification_entity(&v.id), "auth.verification.consumed")
609                .await
610                .map_err(AuthError::from)?;
611        }
612        Ok(verification)
613    }
614
615    async fn delete_verification(&self, id: &str) -> AuthResult<()> {
616        self.client
617            .append_delete(&verification_entity(id), "auth.verification.deleted")
618            .await
619            .map_err(AuthError::from)
620    }
621
622    async fn delete_expired_verifications(&self) -> AuthResult<usize> {
623        let all: Vec<Verification> = self
624            .client
625            .query_all("auth.verification.created", 10000)
626            .await
627            .map_err(AuthError::from)?;
628
629        let now = Utc::now();
630        let mut deleted = 0;
631        for v in all {
632            if v.expires_at <= now {
633                self.delete_verification(&v.id).await?;
634                deleted += 1;
635            }
636        }
637        Ok(deleted)
638    }
639}
640
641// ── OrganizationOps ──
642
643#[async_trait]
644impl OrganizationOps for AllsourceAuthAdapter {
645    type Organization = Organization;
646
647    async fn create_organization(&self, input: CreateOrganization) -> AuthResult<Organization> {
648        // Check slug uniqueness
649        if self
650            .client
651            .find_by_field::<Organization>("auth.org.created", "slug", &input.slug)
652            .await
653            .map_err(AuthError::from)?
654            .is_some()
655        {
656            return Err(AuthError::conflict("Organization slug already exists"));
657        }
658
659        let id = input
660            .id
661            .clone()
662            .unwrap_or_else(|| Uuid::new_v4().to_string());
663        let now = Utc::now();
664
665        let org = Organization {
666            id: id.clone(),
667            name: input.name,
668            slug: input.slug,
669            logo: input.logo,
670            metadata: input.metadata,
671            created_at: now,
672            updated_at: now,
673        };
674
675        let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
676        self.client
677            .append_event(&org_entity(&id), "auth.org.created", payload)
678            .await
679            .map_err(AuthError::from)?;
680
681        Ok(org)
682    }
683
684    async fn get_organization_by_id(&self, id: &str) -> AuthResult<Option<Organization>> {
685        self.client
686            .get_latest(&org_entity(id))
687            .await
688            .map_err(AuthError::from)
689    }
690
691    async fn get_organization_by_slug(&self, slug: &str) -> AuthResult<Option<Organization>> {
692        self.client
693            .find_by_field("auth.org.created", "slug", slug)
694            .await
695            .map_err(AuthError::from)
696    }
697
698    async fn update_organization(
699        &self,
700        id: &str,
701        update: UpdateOrganization,
702    ) -> AuthResult<Organization> {
703        let mut org: Organization = self
704            .get_organization_by_id(id)
705            .await?
706            .ok_or_else(|| AuthError::not_found("Organization not found"))?;
707
708        if let Some(name) = update.name {
709            org.name = name;
710        }
711        if let Some(slug) = update.slug {
712            org.slug = slug;
713        }
714        if let Some(logo) = update.logo {
715            org.logo = Some(logo);
716        }
717        if let Some(metadata) = update.metadata {
718            org.metadata = Some(metadata);
719        }
720        org.updated_at = Utc::now();
721
722        let payload = serde_json::to_value(&org).map_err(AuthError::from)?;
723        self.client
724            .append_event(&org_entity(id), "auth.org.updated", payload)
725            .await
726            .map_err(AuthError::from)?;
727
728        Ok(org)
729    }
730
731    async fn delete_organization(&self, id: &str) -> AuthResult<()> {
732        self.client
733            .append_delete(&org_entity(id), "auth.org.deleted")
734            .await
735            .map_err(AuthError::from)?;
736
737        // Also delete members and invitations for this org
738        let members: Vec<Member> = self
739            .client
740            .find_all_by_field("auth.member.created", "organization_id", id)
741            .await
742            .map_err(AuthError::from)?;
743        for m in members {
744            self.client
745                .append_delete(&member_entity(&m.id), "auth.member.deleted")
746                .await
747                .map_err(AuthError::from)?;
748        }
749
750        let invitations: Vec<Invitation> = self
751            .client
752            .find_all_by_field("auth.invitation.created", "organization_id", id)
753            .await
754            .map_err(AuthError::from)?;
755        for i in invitations {
756            self.client
757                .append_delete(&invitation_entity(&i.id), "auth.invitation.deleted")
758                .await
759                .map_err(AuthError::from)?;
760        }
761
762        Ok(())
763    }
764
765    async fn list_user_organizations(&self, user_id: &str) -> AuthResult<Vec<Organization>> {
766        let members: Vec<Member> = self
767            .client
768            .find_all_by_field("auth.member.created", "user_id", user_id)
769            .await
770            .map_err(AuthError::from)?;
771
772        let mut orgs = Vec::new();
773        for m in members {
774            if let Some(org) = self.get_organization_by_id(&m.organization_id).await? {
775                orgs.push(org);
776            }
777        }
778        Ok(orgs)
779    }
780}
781
782// ── MemberOps ──
783
784#[async_trait]
785impl MemberOps for AllsourceAuthAdapter {
786    type Member = Member;
787
788    async fn create_member(&self, input: CreateMember) -> AuthResult<Member> {
789        // Check if already a member
790        let existing: Vec<Member> = self
791            .client
792            .find_all_by_field("auth.member.created", "organization_id", &input.organization_id)
793            .await
794            .map_err(AuthError::from)?;
795
796        if existing.iter().any(|m| m.user_id == input.user_id) {
797            return Err(AuthError::conflict(
798                "User is already a member of this organization",
799            ));
800        }
801
802        let id = Uuid::new_v4().to_string();
803        let now = Utc::now();
804
805        let member = Member {
806            id: id.clone(),
807            organization_id: input.organization_id,
808            user_id: input.user_id,
809            role: input.role,
810            created_at: now,
811        };
812
813        let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
814        self.client
815            .append_event(&member_entity(&id), "auth.member.created", payload)
816            .await
817            .map_err(AuthError::from)?;
818
819        Ok(member)
820    }
821
822    async fn get_member(
823        &self,
824        organization_id: &str,
825        user_id: &str,
826    ) -> AuthResult<Option<Member>> {
827        let members: Vec<Member> = self
828            .client
829            .find_all_by_field("auth.member.created", "organization_id", organization_id)
830            .await
831            .map_err(AuthError::from)?;
832
833        Ok(members.into_iter().find(|m| m.user_id == user_id))
834    }
835
836    async fn get_member_by_id(&self, id: &str) -> AuthResult<Option<Member>> {
837        self.client
838            .get_latest(&member_entity(id))
839            .await
840            .map_err(AuthError::from)
841    }
842
843    async fn update_member_role(&self, member_id: &str, role: &str) -> AuthResult<Member> {
844        let mut member: Member = self
845            .get_member_by_id(member_id)
846            .await?
847            .ok_or_else(|| AuthError::not_found("Member not found"))?;
848
849        member.role = role.to_string();
850
851        let payload = serde_json::to_value(&member).map_err(AuthError::from)?;
852        self.client
853            .append_event(&member_entity(member_id), "auth.member.updated", payload)
854            .await
855            .map_err(AuthError::from)?;
856
857        Ok(member)
858    }
859
860    async fn delete_member(&self, member_id: &str) -> AuthResult<()> {
861        self.client
862            .append_delete(&member_entity(member_id), "auth.member.deleted")
863            .await
864            .map_err(AuthError::from)
865    }
866
867    async fn list_organization_members(&self, organization_id: &str) -> AuthResult<Vec<Member>> {
868        self.client
869            .find_all_by_field("auth.member.created", "organization_id", organization_id)
870            .await
871            .map_err(AuthError::from)
872    }
873
874    async fn count_organization_members(&self, organization_id: &str) -> AuthResult<usize> {
875        Ok(self.list_organization_members(organization_id).await?.len())
876    }
877
878    async fn count_organization_owners(&self, organization_id: &str) -> AuthResult<usize> {
879        let members = self.list_organization_members(organization_id).await?;
880        Ok(members.iter().filter(|m| m.role == "owner").count())
881    }
882}
883
884// ── InvitationOps ──
885
886#[async_trait]
887impl InvitationOps for AllsourceAuthAdapter {
888    type Invitation = Invitation;
889
890    async fn create_invitation(&self, input: CreateInvitation) -> AuthResult<Invitation> {
891        let id = Uuid::new_v4().to_string();
892        let now = Utc::now();
893
894        let invitation = Invitation {
895            id: id.clone(),
896            organization_id: input.organization_id,
897            email: input.email,
898            role: input.role,
899            status: InvitationStatus::Pending,
900            inviter_id: input.inviter_id,
901            expires_at: input.expires_at,
902            created_at: now,
903        };
904
905        let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
906        self.client
907            .append_event(&invitation_entity(&id), "auth.invitation.created", payload)
908            .await
909            .map_err(AuthError::from)?;
910
911        Ok(invitation)
912    }
913
914    async fn get_invitation_by_id(&self, id: &str) -> AuthResult<Option<Invitation>> {
915        self.client
916            .get_latest(&invitation_entity(id))
917            .await
918            .map_err(AuthError::from)
919    }
920
921    async fn get_pending_invitation(
922        &self,
923        organization_id: &str,
924        email: &str,
925    ) -> AuthResult<Option<Invitation>> {
926        let invitations: Vec<Invitation> = self
927            .client
928            .find_all_by_field("auth.invitation.created", "organization_id", organization_id)
929            .await
930            .map_err(AuthError::from)?;
931
932        Ok(invitations.into_iter().find(|i| {
933            i.email.to_lowercase() == email.to_lowercase()
934                && i.status == InvitationStatus::Pending
935        }))
936    }
937
938    async fn update_invitation_status(
939        &self,
940        id: &str,
941        status: InvitationStatus,
942    ) -> AuthResult<Invitation> {
943        let mut invitation = self
944            .get_invitation_by_id(id)
945            .await?
946            .ok_or_else(|| AuthError::not_found("Invitation not found"))?;
947
948        invitation.status = status;
949
950        let payload = serde_json::to_value(&invitation).map_err(AuthError::from)?;
951        self.client
952            .append_event(&invitation_entity(id), "auth.invitation.updated", payload)
953            .await
954            .map_err(AuthError::from)?;
955
956        Ok(invitation)
957    }
958
959    async fn list_organization_invitations(
960        &self,
961        organization_id: &str,
962    ) -> AuthResult<Vec<Invitation>> {
963        self.client
964            .find_all_by_field("auth.invitation.created", "organization_id", organization_id)
965            .await
966            .map_err(AuthError::from)
967    }
968
969    async fn list_user_invitations(&self, email: &str) -> AuthResult<Vec<Invitation>> {
970        let all: Vec<Invitation> = self
971            .client
972            .find_all_by_field("auth.invitation.created", "email", email)
973            .await
974            .map_err(AuthError::from)?;
975
976        let now = Utc::now();
977        Ok(all
978            .into_iter()
979            .filter(|i| i.status == InvitationStatus::Pending && i.expires_at > now)
980            .collect())
981    }
982}
983
984// ── TwoFactorOps ──
985
986#[async_trait]
987impl TwoFactorOps for AllsourceAuthAdapter {
988    type TwoFactor = TwoFactor;
989
990    async fn create_two_factor(&self, input: CreateTwoFactor) -> AuthResult<TwoFactor> {
991        // Check if user already has 2FA
992        if self
993            .get_two_factor_by_user_id(&input.user_id)
994            .await?
995            .is_some()
996        {
997            return Err(AuthError::conflict(
998                "Two-factor authentication already enabled for this user",
999            ));
1000        }
1001
1002        let id = Uuid::new_v4().to_string();
1003        let now = Utc::now();
1004
1005        let two_factor = TwoFactor {
1006            id: id.clone(),
1007            secret: input.secret,
1008            backup_codes: input.backup_codes,
1009            user_id: input.user_id,
1010            created_at: now,
1011            updated_at: now,
1012        };
1013
1014        let payload = serde_json::to_value(&two_factor).map_err(AuthError::from)?;
1015        self.client
1016            .append_event(&two_factor_entity(&id), "auth.two_factor.created", payload)
1017            .await
1018            .map_err(AuthError::from)?;
1019
1020        Ok(two_factor)
1021    }
1022
1023    async fn get_two_factor_by_user_id(&self, user_id: &str) -> AuthResult<Option<TwoFactor>> {
1024        self.client
1025            .find_by_field("auth.two_factor.created", "user_id", user_id)
1026            .await
1027            .map_err(AuthError::from)
1028    }
1029
1030    async fn update_two_factor_backup_codes(
1031        &self,
1032        user_id: &str,
1033        backup_codes: &str,
1034    ) -> AuthResult<TwoFactor> {
1035        let mut tf = self
1036            .get_two_factor_by_user_id(user_id)
1037            .await?
1038            .ok_or_else(|| AuthError::not_found("Two-factor record not found"))?;
1039
1040        tf.backup_codes = Some(backup_codes.to_string());
1041        tf.updated_at = Utc::now();
1042
1043        let payload = serde_json::to_value(&tf).map_err(AuthError::from)?;
1044        self.client
1045            .append_event(&two_factor_entity(&tf.id), "auth.two_factor.updated", payload)
1046            .await
1047            .map_err(AuthError::from)?;
1048
1049        Ok(tf)
1050    }
1051
1052    async fn delete_two_factor(&self, user_id: &str) -> AuthResult<()> {
1053        if let Some(tf) = self.get_two_factor_by_user_id(user_id).await? {
1054            self.client
1055                .append_delete(&two_factor_entity(&tf.id), "auth.two_factor.deleted")
1056                .await
1057                .map_err(AuthError::from)?;
1058        }
1059        Ok(())
1060    }
1061}
1062
1063// ── ApiKeyOps ──
1064
1065#[async_trait]
1066impl ApiKeyOps for AllsourceAuthAdapter {
1067    type ApiKey = ApiKey;
1068
1069    async fn create_api_key(&self, input: CreateApiKey) -> AuthResult<ApiKey> {
1070        // Check hash uniqueness
1071        if self
1072            .get_api_key_by_hash(&input.key_hash)
1073            .await?
1074            .is_some()
1075        {
1076            return Err(AuthError::conflict("API key already exists"));
1077        }
1078
1079        let id = Uuid::new_v4().to_string();
1080        let now = Utc::now().to_rfc3339();
1081
1082        let api_key = ApiKey {
1083            id: id.clone(),
1084            name: input.name,
1085            start: input.start,
1086            prefix: input.prefix,
1087            key_hash: input.key_hash,
1088            user_id: input.user_id,
1089            refill_interval: input.refill_interval,
1090            refill_amount: input.refill_amount,
1091            last_refill_at: None,
1092            enabled: input.enabled,
1093            rate_limit_enabled: input.rate_limit_enabled,
1094            rate_limit_time_window: input.rate_limit_time_window,
1095            rate_limit_max: input.rate_limit_max,
1096            request_count: Some(0),
1097            remaining: input.remaining,
1098            last_request: None,
1099            expires_at: input.expires_at,
1100            created_at: now.clone(),
1101            updated_at: now,
1102            permissions: input.permissions,
1103            metadata: input.metadata,
1104        };
1105
1106        let payload = serde_json::to_value(&api_key).map_err(AuthError::from)?;
1107        self.client
1108            .append_event(&api_key_entity(&id), "auth.api_key.created", payload)
1109            .await
1110            .map_err(AuthError::from)?;
1111
1112        Ok(api_key)
1113    }
1114
1115    async fn get_api_key_by_id(&self, id: &str) -> AuthResult<Option<ApiKey>> {
1116        self.client
1117            .get_latest(&api_key_entity(id))
1118            .await
1119            .map_err(AuthError::from)
1120    }
1121
1122    async fn get_api_key_by_hash(&self, hash: &str) -> AuthResult<Option<ApiKey>> {
1123        self.client
1124            .find_by_field("auth.api_key.created", "key", hash)
1125            .await
1126            .map_err(AuthError::from)
1127    }
1128
1129    async fn list_api_keys_by_user(&self, user_id: &str) -> AuthResult<Vec<ApiKey>> {
1130        let mut keys: Vec<ApiKey> = self
1131            .client
1132            .find_all_by_field("auth.api_key.created", "user_id", user_id)
1133            .await
1134            .map_err(AuthError::from)?;
1135
1136        keys.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1137        Ok(keys)
1138    }
1139
1140    async fn update_api_key(&self, id: &str, update: UpdateApiKey) -> AuthResult<ApiKey> {
1141        let mut key = self
1142            .get_api_key_by_id(id)
1143            .await?
1144            .ok_or_else(|| AuthError::not_found("API key not found"))?;
1145
1146        if let Some(name) = update.name {
1147            key.name = Some(name);
1148        }
1149        if let Some(enabled) = update.enabled {
1150            key.enabled = enabled;
1151        }
1152        if let Some(remaining) = update.remaining {
1153            key.remaining = Some(remaining);
1154        }
1155        if let Some(rate_limit_enabled) = update.rate_limit_enabled {
1156            key.rate_limit_enabled = rate_limit_enabled;
1157        }
1158        if let Some(window) = update.rate_limit_time_window {
1159            key.rate_limit_time_window = Some(window);
1160        }
1161        if let Some(max) = update.rate_limit_max {
1162            key.rate_limit_max = Some(max);
1163        }
1164        if let Some(interval) = update.refill_interval {
1165            key.refill_interval = Some(interval);
1166        }
1167        if let Some(amount) = update.refill_amount {
1168            key.refill_amount = Some(amount);
1169        }
1170        if let Some(permissions) = update.permissions {
1171            key.permissions = Some(permissions);
1172        }
1173        if let Some(metadata) = update.metadata {
1174            key.metadata = Some(metadata);
1175        }
1176        if let Some(expires_at) = update.expires_at {
1177            key.expires_at = expires_at;
1178        }
1179        if let Some(last_request) = update.last_request {
1180            key.last_request = last_request;
1181        }
1182        if let Some(request_count) = update.request_count {
1183            key.request_count = Some(request_count);
1184        }
1185        if let Some(last_refill_at) = update.last_refill_at {
1186            key.last_refill_at = last_refill_at;
1187        }
1188        key.updated_at = Utc::now().to_rfc3339();
1189
1190        let payload = serde_json::to_value(&key).map_err(AuthError::from)?;
1191        self.client
1192            .append_event(&api_key_entity(id), "auth.api_key.updated", payload)
1193            .await
1194            .map_err(AuthError::from)?;
1195
1196        Ok(key)
1197    }
1198
1199    async fn delete_api_key(&self, id: &str) -> AuthResult<()> {
1200        self.client
1201            .append_delete(&api_key_entity(id), "auth.api_key.deleted")
1202            .await
1203            .map_err(AuthError::from)
1204    }
1205
1206    async fn delete_expired_api_keys(&self) -> AuthResult<usize> {
1207        let all: Vec<ApiKey> = self
1208            .client
1209            .query_all("auth.api_key.created", 10000)
1210            .await
1211            .map_err(AuthError::from)?;
1212
1213        let now = Utc::now();
1214        let mut deleted = 0;
1215        for key in all {
1216            if let Some(expires_at) = &key.expires_at {
1217                if let Ok(exp) = chrono::DateTime::parse_from_rfc3339(expires_at) {
1218                    if exp <= now {
1219                        self.delete_api_key(&key.id).await?;
1220                        deleted += 1;
1221                    }
1222                }
1223            }
1224        }
1225        Ok(deleted)
1226    }
1227}
1228
1229// ── PasskeyOps ──
1230
1231#[async_trait]
1232impl PasskeyOps for AllsourceAuthAdapter {
1233    type Passkey = Passkey;
1234
1235    async fn create_passkey(&self, input: CreatePasskey) -> AuthResult<Passkey> {
1236        // Check credential_id uniqueness
1237        if self
1238            .get_passkey_by_credential_id(&input.credential_id)
1239            .await?
1240            .is_some()
1241        {
1242            return Err(AuthError::conflict(
1243                "A passkey with this credential ID already exists",
1244            ));
1245        }
1246
1247        let id = Uuid::new_v4().to_string();
1248        let now = Utc::now();
1249
1250        let passkey = Passkey {
1251            id: id.clone(),
1252            name: input.name,
1253            public_key: input.public_key,
1254            user_id: input.user_id,
1255            credential_id: input.credential_id,
1256            counter: input.counter,
1257            device_type: input.device_type,
1258            backed_up: input.backed_up,
1259            transports: input.transports,
1260            created_at: now,
1261        };
1262
1263        let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1264        self.client
1265            .append_event(&passkey_entity(&id), "auth.passkey.created", payload)
1266            .await
1267            .map_err(AuthError::from)?;
1268
1269        Ok(passkey)
1270    }
1271
1272    async fn get_passkey_by_id(&self, id: &str) -> AuthResult<Option<Passkey>> {
1273        self.client
1274            .get_latest(&passkey_entity(id))
1275            .await
1276            .map_err(AuthError::from)
1277    }
1278
1279    async fn get_passkey_by_credential_id(
1280        &self,
1281        credential_id: &str,
1282    ) -> AuthResult<Option<Passkey>> {
1283        self.client
1284            .find_by_field("auth.passkey.created", "credentialID", credential_id)
1285            .await
1286            .map_err(AuthError::from)
1287    }
1288
1289    async fn list_passkeys_by_user(&self, user_id: &str) -> AuthResult<Vec<Passkey>> {
1290        let mut passkeys: Vec<Passkey> = self
1291            .client
1292            .find_all_by_field("auth.passkey.created", "user_id", user_id)
1293            .await
1294            .map_err(AuthError::from)?;
1295
1296        passkeys.sort_by_key(|p| std::cmp::Reverse(p.created_at));
1297        Ok(passkeys)
1298    }
1299
1300    async fn update_passkey_counter(&self, id: &str, counter: u64) -> AuthResult<Passkey> {
1301        let mut passkey = self
1302            .get_passkey_by_id(id)
1303            .await?
1304            .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1305
1306        passkey.counter = counter;
1307
1308        let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1309        self.client
1310            .append_event(&passkey_entity(id), "auth.passkey.updated", payload)
1311            .await
1312            .map_err(AuthError::from)?;
1313
1314        Ok(passkey)
1315    }
1316
1317    async fn update_passkey_name(&self, id: &str, name: &str) -> AuthResult<Passkey> {
1318        let mut passkey = self
1319            .get_passkey_by_id(id)
1320            .await?
1321            .ok_or_else(|| AuthError::not_found("Passkey not found"))?;
1322
1323        passkey.name = name.to_string();
1324
1325        let payload = serde_json::to_value(&passkey).map_err(AuthError::from)?;
1326        self.client
1327            .append_event(&passkey_entity(id), "auth.passkey.updated", payload)
1328            .await
1329            .map_err(AuthError::from)?;
1330
1331        Ok(passkey)
1332    }
1333
1334    async fn delete_passkey(&self, id: &str) -> AuthResult<()> {
1335        self.client
1336            .append_delete(&passkey_entity(id), "auth.passkey.deleted")
1337            .await
1338            .map_err(AuthError::from)
1339    }
1340}