1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use sqlx::{postgres::PgPoolOptions, PgPool, Row};
4use uuid::Uuid;
5
6use authx_core::{
7 error::{AuthError, Result, StorageError},
8 models::{
9 ApiKey, AuditLog, AuthorizationCode, CreateApiKey, CreateAuditLog, CreateAuthorizationCode,
10 CreateCredential, CreateDeviceCode, CreateInvite, CreateOidcClient,
11 CreateOidcFederationProvider, CreateOidcToken, CreateOrg, CreateSession, CreateUser,
12 Credential, CredentialKind, DeviceCode, Invite, Membership, OAuthAccount, OidcClient,
13 OidcFederationProvider, OidcToken, OidcTokenType, Organization, Role, Session, UpdateUser,
14 UpsertOAuthAccount, User,
15 },
16};
17
18use crate::ports::{
19 ApiKeyRepository, AuditLogRepository, AuthorizationCodeRepository, CredentialRepository,
20 DeviceCodeRepository, InviteRepository, OAuthAccountRepository, OidcClientRepository,
21 OidcFederationProviderRepository, OidcTokenRepository, OrgRepository, SessionRepository,
22 UserRepository,
23};
24
25#[derive(Clone)]
31pub struct PostgresStore {
32 pub pool: PgPool,
33}
34
35impl PostgresStore {
36 pub async fn connect(database_url: &str) -> std::result::Result<Self, sqlx::Error> {
37 let pool = PgPoolOptions::new()
38 .max_connections(10)
39 .connect(database_url)
40 .await?;
41 tracing::info!("postgres pool connected");
42 Ok(Self { pool })
43 }
44
45 pub fn from_pool(pool: PgPool) -> Self {
46 Self { pool }
47 }
48
49 pub async fn migrate(pool: &PgPool) -> std::result::Result<(), sqlx::migrate::MigrateError> {
50 sqlx::migrate!("src/sqlx/migrations").run(pool).await?;
51 tracing::info!("database migrations applied");
52 Ok(())
53 }
54}
55
56fn db_err(e: sqlx::Error) -> AuthError {
59 match e {
60 sqlx::Error::RowNotFound => AuthError::Storage(StorageError::NotFound),
61 sqlx::Error::Database(ref dbe) if dbe.constraint().is_some() => {
62 AuthError::Storage(StorageError::Conflict(dbe.message().to_owned()))
63 }
64 other => AuthError::Storage(StorageError::Database(other.to_string())),
65 }
66}
67
68fn credential_kind_str(k: &CredentialKind) -> &'static str {
69 match k {
70 CredentialKind::Password => "password",
71 CredentialKind::Passkey => "passkey",
72 CredentialKind::Webauthn => "webauthn",
73 CredentialKind::OauthToken => "oauth_token",
74 }
75}
76
77fn credential_kind_from_str(s: &str) -> CredentialKind {
78 match s {
79 "passkey" => CredentialKind::Passkey,
80 "webauthn" => CredentialKind::Webauthn,
81 "oauth_token" => CredentialKind::OauthToken,
82 _ => CredentialKind::Password,
83 }
84}
85
86fn map_user(r: &sqlx::postgres::PgRow) -> User {
87 User {
88 id: r.get("id"),
89 email: r.get("email"),
90 email_verified: r.get("email_verified"),
91 username: r.get("username"),
92 created_at: r.get("created_at"),
93 updated_at: r.get("updated_at"),
94 metadata: r.get::<serde_json::Value, _>("metadata"),
95 }
96}
97
98fn map_session(r: &sqlx::postgres::PgRow) -> Session {
99 Session {
100 id: r.get("id"),
101 user_id: r.get("user_id"),
102 token_hash: r.get("token_hash"),
103 device_info: r.get::<serde_json::Value, _>("device_info"),
104 ip_address: r.get("ip_address"),
105 org_id: r.get("org_id"),
106 expires_at: r.get("expires_at"),
107 created_at: r.get("created_at"),
108 }
109}
110
111fn map_audit_log(r: &sqlx::postgres::PgRow) -> AuditLog {
112 AuditLog {
113 id: r.get("id"),
114 user_id: r.get("user_id"),
115 org_id: r.get("org_id"),
116 action: r.get("action"),
117 resource_type: r.get("resource_type"),
118 resource_id: r.get("resource_id"),
119 ip_address: r.get("ip_address"),
120 metadata: r.get::<serde_json::Value, _>("metadata"),
121 created_at: r.get("created_at"),
122 }
123}
124
125#[async_trait]
128impl UserRepository for PostgresStore {
129 async fn find_by_id(&self, id: Uuid) -> Result<Option<User>> {
130 let row = sqlx::query(
131 "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
132 FROM authx_users WHERE id = $1",
133 )
134 .bind(id)
135 .fetch_optional(&self.pool)
136 .await
137 .map_err(db_err)?;
138 Ok(row.as_ref().map(map_user))
139 }
140
141 async fn find_by_email(&self, email: &str) -> Result<Option<User>> {
142 let row = sqlx::query(
143 "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
144 FROM authx_users WHERE email = $1",
145 )
146 .bind(email)
147 .fetch_optional(&self.pool)
148 .await
149 .map_err(db_err)?;
150 Ok(row.as_ref().map(map_user))
151 }
152
153 async fn find_by_username(&self, username: &str) -> Result<Option<User>> {
154 let row = sqlx::query(
155 "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
156 FROM authx_users WHERE username = $1",
157 )
158 .bind(username)
159 .fetch_optional(&self.pool)
160 .await
161 .map_err(db_err)?;
162 Ok(row.as_ref().map(map_user))
163 }
164
165 async fn list(&self, offset: u32, limit: u32) -> Result<Vec<User>> {
166 let rows = sqlx::query(
167 "SELECT id, email, email_verified, username, created_at, updated_at, metadata \
168 FROM authx_users ORDER BY created_at ASC LIMIT $1 OFFSET $2",
169 )
170 .bind(limit as i64)
171 .bind(offset as i64)
172 .fetch_all(&self.pool)
173 .await
174 .map_err(db_err)?;
175 Ok(rows.iter().map(map_user).collect())
176 }
177
178 async fn create(&self, data: CreateUser) -> Result<User> {
179 let meta = data.metadata.unwrap_or(serde_json::Value::Null);
180 let row = sqlx::query(
181 "INSERT INTO authx_users (id, email, email_verified, username, metadata) \
182 VALUES ($1, $2, false, $3, $4) \
183 RETURNING id, email, email_verified, username, created_at, updated_at, metadata",
184 )
185 .bind(Uuid::new_v4())
186 .bind(&data.email)
187 .bind(data.username.as_deref())
188 .bind(&meta)
189 .fetch_one(&self.pool)
190 .await
191 .map_err(|e| {
192 if let sqlx::Error::Database(ref dbe) = e {
193 if dbe.constraint() == Some("authx_users_email_key") {
194 return AuthError::EmailTaken;
195 }
196 if dbe.constraint() == Some("authx_users_username_key") {
197 return AuthError::Storage(StorageError::Conflict(
198 "username already taken".into(),
199 ));
200 }
201 }
202 db_err(e)
203 })?;
204
205 tracing::debug!(email = %data.email, "user row inserted");
206 Ok(map_user(&row))
207 }
208
209 async fn update(&self, id: Uuid, data: UpdateUser) -> Result<User> {
210 let row = sqlx::query(
211 "UPDATE authx_users \
212 SET \
213 email = COALESCE($2, email), \
214 email_verified = COALESCE($3, email_verified), \
215 username = COALESCE($4, username), \
216 metadata = COALESCE($5, metadata), \
217 updated_at = NOW() \
218 WHERE id = $1 \
219 RETURNING id, email, email_verified, username, created_at, updated_at, metadata",
220 )
221 .bind(id)
222 .bind(data.email.as_deref())
223 .bind(data.email_verified)
224 .bind(data.username.as_deref())
225 .bind(data.metadata.as_ref())
226 .fetch_optional(&self.pool)
227 .await
228 .map_err(db_err)?
229 .ok_or(AuthError::UserNotFound)?;
230
231 tracing::debug!(user_id = %id, "user row updated");
232 Ok(map_user(&row))
233 }
234
235 async fn delete(&self, id: Uuid) -> Result<()> {
236 let result = sqlx::query("DELETE FROM authx_users WHERE id = $1")
237 .bind(id)
238 .execute(&self.pool)
239 .await
240 .map_err(db_err)?;
241
242 if result.rows_affected() == 0 {
243 return Err(AuthError::UserNotFound);
244 }
245 tracing::debug!(user_id = %id, "user row deleted");
246 Ok(())
247 }
248}
249
250#[async_trait]
253impl SessionRepository for PostgresStore {
254 async fn create(&self, data: CreateSession) -> Result<Session> {
255 let row = sqlx::query(
256 "INSERT INTO authx_sessions \
257 (id, user_id, token_hash, device_info, ip_address, org_id, expires_at) \
258 VALUES ($1, $2, $3, $4, $5, $6, $7) \
259 RETURNING id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at",
260 )
261 .bind(Uuid::new_v4())
262 .bind(data.user_id)
263 .bind(&data.token_hash)
264 .bind(&data.device_info)
265 .bind(&data.ip_address)
266 .bind(data.org_id)
267 .bind(data.expires_at)
268 .fetch_one(&self.pool)
269 .await
270 .map_err(db_err)?;
271
272 tracing::debug!(user_id = %data.user_id, "session row inserted");
273 Ok(map_session(&row))
274 }
275
276 async fn find_by_token_hash(&self, hash: &str) -> Result<Option<Session>> {
277 let row = sqlx::query(
278 "SELECT id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at \
279 FROM authx_sessions WHERE token_hash = $1 AND expires_at > NOW()",
280 )
281 .bind(hash)
282 .fetch_optional(&self.pool)
283 .await
284 .map_err(db_err)?;
285 Ok(row.as_ref().map(map_session))
286 }
287
288 async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<Session>> {
289 let rows = sqlx::query(
290 "SELECT id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at \
291 FROM authx_sessions WHERE user_id = $1",
292 )
293 .bind(user_id)
294 .fetch_all(&self.pool)
295 .await
296 .map_err(db_err)?;
297 Ok(rows.iter().map(map_session).collect())
298 }
299
300 async fn invalidate(&self, session_id: Uuid) -> Result<()> {
301 let result = sqlx::query("DELETE FROM authx_sessions WHERE id = $1")
302 .bind(session_id)
303 .execute(&self.pool)
304 .await
305 .map_err(db_err)?;
306
307 if result.rows_affected() == 0 {
308 return Err(AuthError::SessionNotFound);
309 }
310 tracing::debug!(session_id = %session_id, "session invalidated");
311 Ok(())
312 }
313
314 async fn invalidate_all_for_user(&self, user_id: Uuid) -> Result<()> {
315 sqlx::query("DELETE FROM authx_sessions WHERE user_id = $1")
316 .bind(user_id)
317 .execute(&self.pool)
318 .await
319 .map_err(db_err)?;
320 tracing::debug!(user_id = %user_id, "all user sessions invalidated");
321 Ok(())
322 }
323
324 async fn set_org(&self, session_id: Uuid, org_id: Option<Uuid>) -> Result<Session> {
325 let row = sqlx::query(
326 "UPDATE authx_sessions SET org_id = $2 WHERE id = $1 \
327 RETURNING id, user_id, token_hash, device_info, ip_address, org_id, expires_at, created_at",
328 )
329 .bind(session_id)
330 .bind(org_id)
331 .fetch_optional(&self.pool)
332 .await
333 .map_err(db_err)?
334 .ok_or(AuthError::SessionNotFound)?;
335
336 tracing::debug!(session_id = %session_id, "session org updated");
337 Ok(map_session(&row))
338 }
339}
340
341#[async_trait]
344impl CredentialRepository for PostgresStore {
345 async fn create(&self, data: CreateCredential) -> Result<Credential> {
346 let kind_str = credential_kind_str(&data.kind);
347 let meta = data.metadata.unwrap_or(serde_json::Value::Null);
348
349 let row = sqlx::query(
350 "INSERT INTO authx_credentials (id, user_id, kind, credential_hash, metadata) \
351 VALUES ($1, $2, $3, $4, $5) \
352 RETURNING id, user_id, kind, credential_hash, metadata",
353 )
354 .bind(Uuid::new_v4())
355 .bind(data.user_id)
356 .bind(kind_str)
357 .bind(&data.credential_hash)
358 .bind(&meta)
359 .fetch_one(&self.pool)
360 .await
361 .map_err(db_err)?;
362
363 tracing::debug!(user_id = %data.user_id, kind = kind_str, "credential inserted");
364 Ok(Credential {
365 id: row.get("id"),
366 user_id: row.get("user_id"),
367 kind: credential_kind_from_str(row.get("kind")),
368 credential_hash: row.get("credential_hash"),
369 metadata: row.get::<serde_json::Value, _>("metadata"),
370 })
371 }
372
373 async fn find_password_hash(&self, user_id: Uuid) -> Result<Option<String>> {
374 let row = sqlx::query(
375 "SELECT credential_hash FROM authx_credentials \
376 WHERE user_id = $1 AND kind = 'password'",
377 )
378 .bind(user_id)
379 .fetch_optional(&self.pool)
380 .await
381 .map_err(db_err)?;
382 Ok(row.map(|r| r.get("credential_hash")))
383 }
384
385 async fn find_by_user_and_kind(
386 &self,
387 user_id: Uuid,
388 kind: CredentialKind,
389 ) -> Result<Option<Credential>> {
390 let row = sqlx::query(
391 "SELECT id, user_id, kind, credential_hash, metadata \
392 FROM authx_credentials WHERE user_id = $1 AND kind = $2",
393 )
394 .bind(user_id)
395 .bind(credential_kind_str(&kind))
396 .fetch_optional(&self.pool)
397 .await
398 .map_err(db_err)?;
399
400 Ok(row.map(|r| Credential {
401 id: r.get("id"),
402 user_id: r.get("user_id"),
403 kind: credential_kind_from_str(r.get("kind")),
404 credential_hash: r.get("credential_hash"),
405 metadata: r.get::<serde_json::Value, _>("metadata"),
406 }))
407 }
408
409 async fn delete_by_user_and_kind(&self, user_id: Uuid, kind: CredentialKind) -> Result<()> {
410 let result = sqlx::query("DELETE FROM authx_credentials WHERE user_id = $1 AND kind = $2")
411 .bind(user_id)
412 .bind(credential_kind_str(&kind))
413 .execute(&self.pool)
414 .await
415 .map_err(db_err)?;
416
417 if result.rows_affected() == 0 {
418 return Err(AuthError::Storage(StorageError::NotFound));
419 }
420 Ok(())
421 }
422}
423
424fn map_org(r: &sqlx::postgres::PgRow) -> Organization {
427 Organization {
428 id: r.get("id"),
429 name: r.get("name"),
430 slug: r.get("slug"),
431 metadata: r.get::<serde_json::Value, _>("metadata"),
432 created_at: r.get("created_at"),
433 }
434}
435
436fn map_membership(r: &sqlx::postgres::PgRow) -> Membership {
437 Membership {
438 id: r.get("id"),
439 user_id: r.get("user_id"),
440 org_id: r.get("org_id"),
441 role: Role {
442 id: r.get("role_id"),
443 org_id: r.get("role_org_id"),
444 name: r.get("role_name"),
445 permissions: r.get::<Vec<String>, _>("permissions"),
446 },
447 created_at: r.get("created_at"),
448 }
449}
450
451#[async_trait]
452impl OrgRepository for PostgresStore {
453 async fn create(&self, data: CreateOrg) -> Result<Organization> {
454 let meta = data.metadata.unwrap_or(serde_json::Value::Null);
455 let row = sqlx::query(
456 "INSERT INTO authx_orgs (id, name, slug, metadata) \
457 VALUES ($1, $2, $3, $4) \
458 RETURNING id, name, slug, metadata, created_at",
459 )
460 .bind(Uuid::new_v4())
461 .bind(&data.name)
462 .bind(&data.slug)
463 .bind(&meta)
464 .fetch_one(&self.pool)
465 .await
466 .map_err(|e| {
467 if let sqlx::Error::Database(ref dbe) = e {
468 if dbe.constraint() == Some("authx_orgs_slug_key") {
469 return AuthError::Storage(StorageError::Conflict(format!(
470 "slug '{}' already taken",
471 data.slug
472 )));
473 }
474 }
475 db_err(e)
476 })?;
477
478 tracing::debug!(slug = %data.slug, "org row inserted");
479 Ok(map_org(&row))
480 }
481
482 async fn find_by_id(&self, id: Uuid) -> Result<Option<Organization>> {
483 let row = sqlx::query(
484 "SELECT id, name, slug, metadata, created_at FROM authx_orgs WHERE id = $1",
485 )
486 .bind(id)
487 .fetch_optional(&self.pool)
488 .await
489 .map_err(db_err)?;
490 Ok(row.as_ref().map(map_org))
491 }
492
493 async fn find_by_slug(&self, slug: &str) -> Result<Option<Organization>> {
494 let row = sqlx::query(
495 "SELECT id, name, slug, metadata, created_at FROM authx_orgs WHERE slug = $1",
496 )
497 .bind(slug)
498 .fetch_optional(&self.pool)
499 .await
500 .map_err(db_err)?;
501 Ok(row.as_ref().map(map_org))
502 }
503
504 async fn add_member(&self, org_id: Uuid, user_id: Uuid, role_id: Uuid) -> Result<Membership> {
505 let role_row =
506 sqlx::query("SELECT id, org_id, name, permissions FROM authx_roles WHERE id = $1")
507 .bind(role_id)
508 .fetch_optional(&self.pool)
509 .await
510 .map_err(db_err)?
511 .ok_or(AuthError::Storage(StorageError::NotFound))?;
512
513 let role = Role {
514 id: role_row.get("id"),
515 org_id: role_row.get("org_id"),
516 name: role_row.get("name"),
517 permissions: role_row.get::<Vec<String>, _>("permissions"),
518 };
519
520 let row = sqlx::query(
521 "INSERT INTO authx_memberships (id, user_id, org_id, role_id) \
522 VALUES ($1, $2, $3, $4) \
523 RETURNING id, user_id, org_id, created_at",
524 )
525 .bind(Uuid::new_v4())
526 .bind(user_id)
527 .bind(org_id)
528 .bind(role_id)
529 .fetch_one(&self.pool)
530 .await
531 .map_err(db_err)?;
532
533 tracing::debug!(org_id = %org_id, user_id = %user_id, "member added");
534 Ok(Membership {
535 id: row.get("id"),
536 user_id: row.get("user_id"),
537 org_id: row.get("org_id"),
538 role,
539 created_at: row.get("created_at"),
540 })
541 }
542
543 async fn remove_member(&self, org_id: Uuid, user_id: Uuid) -> Result<()> {
544 let result =
545 sqlx::query("DELETE FROM authx_memberships WHERE org_id = $1 AND user_id = $2")
546 .bind(org_id)
547 .bind(user_id)
548 .execute(&self.pool)
549 .await
550 .map_err(db_err)?;
551
552 if result.rows_affected() == 0 {
553 return Err(AuthError::Storage(StorageError::NotFound));
554 }
555 Ok(())
556 }
557
558 async fn get_members(&self, org_id: Uuid) -> Result<Vec<Membership>> {
559 let rows = sqlx::query(
560 "SELECT m.id, m.user_id, m.org_id, m.created_at, \
561 r.id AS role_id, r.org_id AS role_org_id, r.name AS role_name, r.permissions \
562 FROM authx_memberships m \
563 JOIN authx_roles r ON r.id = m.role_id \
564 WHERE m.org_id = $1",
565 )
566 .bind(org_id)
567 .fetch_all(&self.pool)
568 .await
569 .map_err(db_err)?;
570 Ok(rows.iter().map(map_membership).collect())
571 }
572
573 async fn find_roles(&self, org_id: Uuid) -> Result<Vec<Role>> {
574 let rows =
575 sqlx::query("SELECT id, org_id, name, permissions FROM authx_roles WHERE org_id = $1")
576 .bind(org_id)
577 .fetch_all(&self.pool)
578 .await
579 .map_err(db_err)?;
580 Ok(rows
581 .iter()
582 .map(|r| Role {
583 id: r.get("id"),
584 org_id: r.get("org_id"),
585 name: r.get("name"),
586 permissions: r.get::<Vec<String>, _>("permissions"),
587 })
588 .collect())
589 }
590
591 async fn create_role(
592 &self,
593 org_id: Uuid,
594 name: String,
595 permissions: Vec<String>,
596 ) -> Result<Role> {
597 let row = sqlx::query(
598 "INSERT INTO authx_roles (id, org_id, name, permissions) \
599 VALUES ($1, $2, $3, $4) \
600 RETURNING id, org_id, name, permissions",
601 )
602 .bind(Uuid::new_v4())
603 .bind(org_id)
604 .bind(&name)
605 .bind(&permissions)
606 .fetch_one(&self.pool)
607 .await
608 .map_err(db_err)?;
609
610 tracing::debug!(org_id = %org_id, name = %name, "role created");
611 Ok(Role {
612 id: row.get("id"),
613 org_id: row.get("org_id"),
614 name: row.get("name"),
615 permissions: row.get::<Vec<String>, _>("permissions"),
616 })
617 }
618
619 async fn update_member_role(
620 &self,
621 org_id: Uuid,
622 user_id: Uuid,
623 role_id: Uuid,
624 ) -> Result<Membership> {
625 sqlx::query("UPDATE authx_memberships SET role_id = $3 WHERE org_id = $1 AND user_id = $2")
626 .bind(org_id)
627 .bind(user_id)
628 .bind(role_id)
629 .execute(&self.pool)
630 .await
631 .map_err(db_err)?;
632
633 let rows = sqlx::query(
634 "SELECT m.id, m.user_id, m.org_id, m.created_at, \
635 r.id AS role_id, r.org_id AS role_org_id, r.name AS role_name, r.permissions \
636 FROM authx_memberships m \
637 JOIN authx_roles r ON r.id = m.role_id \
638 WHERE m.org_id = $1 AND m.user_id = $2",
639 )
640 .bind(org_id)
641 .bind(user_id)
642 .fetch_optional(&self.pool)
643 .await
644 .map_err(db_err)?
645 .ok_or(AuthError::Storage(StorageError::NotFound))?;
646
647 Ok(map_membership(&rows))
648 }
649}
650
651#[async_trait]
654impl AuditLogRepository for PostgresStore {
655 async fn append(&self, entry: CreateAuditLog) -> Result<AuditLog> {
656 let meta = entry.metadata.unwrap_or(serde_json::Value::Null);
657 let row = sqlx::query(
658 "INSERT INTO authx_audit_logs \
659 (id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata) \
660 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
661 RETURNING id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata, created_at",
662 )
663 .bind(Uuid::new_v4())
664 .bind(entry.user_id)
665 .bind(entry.org_id)
666 .bind(&entry.action)
667 .bind(&entry.resource_type)
668 .bind(entry.resource_id.as_deref())
669 .bind(&entry.ip_address)
670 .bind(&meta)
671 .fetch_one(&self.pool)
672 .await
673 .map_err(db_err)?;
674
675 tracing::debug!(action = %entry.action, "audit log appended");
676 Ok(map_audit_log(&row))
677 }
678
679 async fn find_by_user(&self, user_id: Uuid, limit: u32) -> Result<Vec<AuditLog>> {
680 let rows = sqlx::query(
681 "SELECT id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata, created_at \
682 FROM authx_audit_logs WHERE user_id = $1 ORDER BY created_at DESC LIMIT $2",
683 )
684 .bind(user_id)
685 .bind(limit as i64)
686 .fetch_all(&self.pool)
687 .await
688 .map_err(db_err)?;
689 Ok(rows.iter().map(map_audit_log).collect())
690 }
691
692 async fn find_by_org(&self, org_id: Uuid, limit: u32) -> Result<Vec<AuditLog>> {
693 let rows = sqlx::query(
694 "SELECT id, user_id, org_id, action, resource_type, resource_id, ip_address, metadata, created_at \
695 FROM authx_audit_logs WHERE org_id = $1 ORDER BY created_at DESC LIMIT $2",
696 )
697 .bind(org_id)
698 .bind(limit as i64)
699 .fetch_all(&self.pool)
700 .await
701 .map_err(db_err)?;
702 Ok(rows.iter().map(map_audit_log).collect())
703 }
704}
705
706fn map_api_key(r: &sqlx::postgres::PgRow) -> ApiKey {
709 ApiKey {
710 id: r.get("id"),
711 user_id: r.get("user_id"),
712 org_id: r.get("org_id"),
713 key_hash: r.get("key_hash"),
714 prefix: r.get("prefix"),
715 name: r.get("name"),
716 scopes: r.get::<Vec<String>, _>("scopes"),
717 expires_at: r.get("expires_at"),
718 last_used_at: r.get("last_used_at"),
719 }
720}
721
722#[async_trait]
723impl ApiKeyRepository for PostgresStore {
724 async fn create(&self, data: CreateApiKey) -> Result<ApiKey> {
725 let row = sqlx::query(
726 "INSERT INTO authx_api_keys \
727 (id, user_id, org_id, key_hash, prefix, name, scopes, expires_at) \
728 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
729 RETURNING id, user_id, org_id, key_hash, prefix, name, scopes, expires_at, last_used_at",
730 )
731 .bind(Uuid::new_v4())
732 .bind(data.user_id)
733 .bind(data.org_id)
734 .bind(&data.key_hash)
735 .bind(&data.prefix)
736 .bind(&data.name)
737 .bind(&data.scopes)
738 .bind(data.expires_at)
739 .fetch_one(&self.pool)
740 .await
741 .map_err(db_err)?;
742
743 tracing::debug!(user_id = %data.user_id, "api key created");
744 Ok(map_api_key(&row))
745 }
746
747 async fn find_by_hash(&self, key_hash: &str) -> Result<Option<ApiKey>> {
748 let row = sqlx::query(
749 "SELECT id, user_id, org_id, key_hash, prefix, name, scopes, expires_at, last_used_at \
750 FROM authx_api_keys WHERE key_hash = $1",
751 )
752 .bind(key_hash)
753 .fetch_optional(&self.pool)
754 .await
755 .map_err(db_err)?;
756 Ok(row.as_ref().map(map_api_key))
757 }
758
759 async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<ApiKey>> {
760 let rows = sqlx::query(
761 "SELECT id, user_id, org_id, key_hash, prefix, name, scopes, expires_at, last_used_at \
762 FROM authx_api_keys WHERE user_id = $1 ORDER BY id",
763 )
764 .bind(user_id)
765 .fetch_all(&self.pool)
766 .await
767 .map_err(db_err)?;
768 Ok(rows.iter().map(map_api_key).collect())
769 }
770
771 async fn revoke(&self, key_id: Uuid, user_id: Uuid) -> Result<()> {
772 let result = sqlx::query("DELETE FROM authx_api_keys WHERE id = $1 AND user_id = $2")
773 .bind(key_id)
774 .bind(user_id)
775 .execute(&self.pool)
776 .await
777 .map_err(db_err)?;
778
779 if result.rows_affected() == 0 {
780 return Err(AuthError::Storage(StorageError::NotFound));
781 }
782 tracing::debug!(key_id = %key_id, "api key revoked");
783 Ok(())
784 }
785
786 async fn touch_last_used(&self, key_id: Uuid, at: DateTime<Utc>) -> Result<()> {
787 sqlx::query("UPDATE authx_api_keys SET last_used_at = $2 WHERE id = $1")
788 .bind(key_id)
789 .bind(at)
790 .execute(&self.pool)
791 .await
792 .map_err(db_err)?;
793 Ok(())
794 }
795}
796
797fn map_oauth_account(r: &sqlx::postgres::PgRow) -> OAuthAccount {
800 OAuthAccount {
801 id: r.get("id"),
802 user_id: r.get("user_id"),
803 provider: r.get("provider"),
804 provider_user_id: r.get("provider_user_id"),
805 access_token_enc: r.get("access_token_enc"),
806 refresh_token_enc: r.get("refresh_token_enc"),
807 expires_at: r.get("expires_at"),
808 }
809}
810
811#[async_trait]
812impl OAuthAccountRepository for PostgresStore {
813 async fn upsert(&self, data: UpsertOAuthAccount) -> Result<OAuthAccount> {
814 let row = sqlx::query(
815 "INSERT INTO authx_oauth_accounts \
816 (id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at) \
817 VALUES ($1, $2, $3, $4, $5, $6, $7) \
818 ON CONFLICT (provider, provider_user_id) DO UPDATE SET \
819 access_token_enc = EXCLUDED.access_token_enc, \
820 refresh_token_enc = EXCLUDED.refresh_token_enc, \
821 expires_at = EXCLUDED.expires_at \
822 RETURNING id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at",
823 )
824 .bind(Uuid::new_v4())
825 .bind(data.user_id)
826 .bind(&data.provider)
827 .bind(&data.provider_user_id)
828 .bind(&data.access_token_enc)
829 .bind(data.refresh_token_enc.as_deref())
830 .bind(data.expires_at)
831 .fetch_one(&self.pool)
832 .await
833 .map_err(db_err)?;
834
835 tracing::debug!(provider = %data.provider, user_id = %data.user_id, "oauth account upserted");
836 Ok(map_oauth_account(&row))
837 }
838
839 async fn find_by_provider(
840 &self,
841 provider: &str,
842 provider_user_id: &str,
843 ) -> Result<Option<OAuthAccount>> {
844 let row = sqlx::query(
845 "SELECT id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at \
846 FROM authx_oauth_accounts WHERE provider = $1 AND provider_user_id = $2",
847 )
848 .bind(provider)
849 .bind(provider_user_id)
850 .fetch_optional(&self.pool)
851 .await
852 .map_err(db_err)?;
853 Ok(row.as_ref().map(map_oauth_account))
854 }
855
856 async fn find_by_user(&self, user_id: Uuid) -> Result<Vec<OAuthAccount>> {
857 let rows = sqlx::query(
858 "SELECT id, user_id, provider, provider_user_id, access_token_enc, refresh_token_enc, expires_at \
859 FROM authx_oauth_accounts WHERE user_id = $1",
860 )
861 .bind(user_id)
862 .fetch_all(&self.pool)
863 .await
864 .map_err(db_err)?;
865 Ok(rows.iter().map(map_oauth_account).collect())
866 }
867
868 async fn delete(&self, id: Uuid) -> Result<()> {
869 let result = sqlx::query("DELETE FROM authx_oauth_accounts WHERE id = $1")
870 .bind(id)
871 .execute(&self.pool)
872 .await
873 .map_err(db_err)?;
874
875 if result.rows_affected() == 0 {
876 return Err(AuthError::Storage(StorageError::NotFound));
877 }
878 Ok(())
879 }
880}
881
882fn map_invite(r: &sqlx::postgres::PgRow) -> Invite {
885 Invite {
886 id: r.get("id"),
887 org_id: r.get("org_id"),
888 email: r.get("email"),
889 role_id: r.get("role_id"),
890 token_hash: r.get("token_hash"),
891 expires_at: r.get("expires_at"),
892 accepted_at: r.get("accepted_at"),
893 }
894}
895
896#[async_trait]
897impl InviteRepository for PostgresStore {
898 async fn create(&self, data: CreateInvite) -> Result<Invite> {
899 let row = sqlx::query(
900 "INSERT INTO authx_invites (id, org_id, email, role_id, token_hash, expires_at) \
901 VALUES ($1, $2, $3, $4, $5, $6) \
902 RETURNING id, org_id, email, role_id, token_hash, expires_at, accepted_at",
903 )
904 .bind(Uuid::new_v4())
905 .bind(data.org_id)
906 .bind(&data.email)
907 .bind(data.role_id)
908 .bind(&data.token_hash)
909 .bind(data.expires_at)
910 .fetch_one(&self.pool)
911 .await
912 .map_err(db_err)?;
913
914 tracing::debug!(org_id = %data.org_id, email = %data.email, "invite created");
915 Ok(map_invite(&row))
916 }
917
918 async fn find_by_token_hash(&self, hash: &str) -> Result<Option<Invite>> {
919 let row = sqlx::query(
920 "SELECT id, org_id, email, role_id, token_hash, expires_at, accepted_at \
921 FROM authx_invites WHERE token_hash = $1",
922 )
923 .bind(hash)
924 .fetch_optional(&self.pool)
925 .await
926 .map_err(db_err)?;
927 Ok(row.as_ref().map(map_invite))
928 }
929
930 async fn accept(&self, invite_id: Uuid) -> Result<Invite> {
931 let row = sqlx::query(
932 "UPDATE authx_invites SET accepted_at = NOW() WHERE id = $1 \
933 RETURNING id, org_id, email, role_id, token_hash, expires_at, accepted_at",
934 )
935 .bind(invite_id)
936 .fetch_optional(&self.pool)
937 .await
938 .map_err(db_err)?
939 .ok_or(AuthError::Storage(StorageError::NotFound))?;
940
941 Ok(map_invite(&row))
942 }
943
944 async fn delete_expired(&self) -> Result<u64> {
945 let result = sqlx::query(
946 "DELETE FROM authx_invites WHERE accepted_at IS NULL AND expires_at < NOW()",
947 )
948 .execute(&self.pool)
949 .await
950 .map_err(db_err)?;
951 Ok(result.rows_affected())
952 }
953}
954
955fn oidc_token_type_str(t: &OidcTokenType) -> &'static str {
958 match t {
959 OidcTokenType::Access => "access",
960 OidcTokenType::Refresh => "refresh",
961 OidcTokenType::DeviceAccess => "device_access",
962 }
963}
964
965fn oidc_token_type_from_str(s: &str) -> OidcTokenType {
966 match s {
967 "refresh" => OidcTokenType::Refresh,
968 "device_access" => OidcTokenType::DeviceAccess,
969 _ => OidcTokenType::Access,
970 }
971}
972
973fn map_oidc_client(r: &sqlx::postgres::PgRow) -> OidcClient {
974 OidcClient {
975 id: r.get("id"),
976 client_id: r.get("client_id"),
977 secret_hash: r.get("secret_hash"),
978 name: r.get("name"),
979 redirect_uris: r.get::<Vec<String>, _>("redirect_uris"),
980 grant_types: r.get::<Vec<String>, _>("grant_types"),
981 response_types: r.get::<Vec<String>, _>("response_types"),
982 allowed_scopes: r.get("allowed_scopes"),
983 created_at: r.get("created_at"),
984 }
985}
986
987#[async_trait]
988impl OidcClientRepository for PostgresStore {
989 async fn create(&self, data: CreateOidcClient) -> Result<OidcClient> {
990 let client_id = Uuid::new_v4().to_string();
991 let row = sqlx::query(
992 "INSERT INTO authx_oidc_clients \
993 (id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes) \
994 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
995 RETURNING id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes, created_at",
996 )
997 .bind(Uuid::new_v4())
998 .bind(&client_id)
999 .bind(&data.secret_hash)
1000 .bind(&data.name)
1001 .bind(&data.redirect_uris)
1002 .bind(&data.grant_types)
1003 .bind(&data.response_types)
1004 .bind(&data.allowed_scopes)
1005 .fetch_one(&self.pool)
1006 .await
1007 .map_err(db_err)?;
1008
1009 tracing::debug!(client_id = %client_id, "oidc client created");
1010 Ok(map_oidc_client(&row))
1011 }
1012
1013 async fn find_by_client_id(&self, client_id: &str) -> Result<Option<OidcClient>> {
1014 let row = sqlx::query(
1015 "SELECT id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes, created_at \
1016 FROM authx_oidc_clients WHERE client_id = $1",
1017 )
1018 .bind(client_id)
1019 .fetch_optional(&self.pool)
1020 .await
1021 .map_err(db_err)?;
1022 Ok(row.as_ref().map(map_oidc_client))
1023 }
1024
1025 async fn list(&self, offset: u32, limit: u32) -> Result<Vec<OidcClient>> {
1026 let rows = sqlx::query(
1027 "SELECT id, client_id, secret_hash, name, redirect_uris, grant_types, response_types, allowed_scopes, created_at \
1028 FROM authx_oidc_clients ORDER BY created_at ASC LIMIT $1 OFFSET $2",
1029 )
1030 .bind(limit as i64)
1031 .bind(offset as i64)
1032 .fetch_all(&self.pool)
1033 .await
1034 .map_err(db_err)?;
1035 Ok(rows.iter().map(map_oidc_client).collect())
1036 }
1037}
1038
1039fn map_authorization_code(r: &sqlx::postgres::PgRow) -> AuthorizationCode {
1042 AuthorizationCode {
1043 id: r.get("id"),
1044 code_hash: r.get("code_hash"),
1045 client_id: r.get("client_id"),
1046 user_id: r.get("user_id"),
1047 redirect_uri: r.get("redirect_uri"),
1048 scope: r.get("scope"),
1049 nonce: r.get("nonce"),
1050 code_challenge: r.get("code_challenge"),
1051 expires_at: r.get("expires_at"),
1052 used: r.get("used"),
1053 }
1054}
1055
1056#[async_trait]
1057impl AuthorizationCodeRepository for PostgresStore {
1058 async fn create(&self, data: CreateAuthorizationCode) -> Result<AuthorizationCode> {
1059 let row = sqlx::query(
1060 "INSERT INTO authx_oidc_authorization_codes \
1061 (id, code_hash, client_id, user_id, redirect_uri, scope, nonce, code_challenge, expires_at) \
1062 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
1063 RETURNING id, code_hash, client_id, user_id, redirect_uri, scope, nonce, code_challenge, expires_at, used",
1064 )
1065 .bind(Uuid::new_v4())
1066 .bind(&data.code_hash)
1067 .bind(&data.client_id)
1068 .bind(data.user_id)
1069 .bind(&data.redirect_uri)
1070 .bind(&data.scope)
1071 .bind(data.nonce.as_deref())
1072 .bind(data.code_challenge.as_deref())
1073 .bind(data.expires_at)
1074 .fetch_one(&self.pool)
1075 .await
1076 .map_err(db_err)?;
1077
1078 tracing::debug!(client_id = %data.client_id, "authorization code created");
1079 Ok(map_authorization_code(&row))
1080 }
1081
1082 async fn find_by_code_hash(&self, hash: &str) -> Result<Option<AuthorizationCode>> {
1083 let row = sqlx::query(
1084 "SELECT id, code_hash, client_id, user_id, redirect_uri, scope, nonce, code_challenge, expires_at, used \
1085 FROM authx_oidc_authorization_codes WHERE code_hash = $1 AND expires_at > NOW() AND used = false",
1086 )
1087 .bind(hash)
1088 .fetch_optional(&self.pool)
1089 .await
1090 .map_err(db_err)?;
1091 Ok(row.as_ref().map(map_authorization_code))
1092 }
1093
1094 async fn mark_used(&self, id: Uuid) -> Result<()> {
1095 let result =
1096 sqlx::query("UPDATE authx_oidc_authorization_codes SET used = true WHERE id = $1")
1097 .bind(id)
1098 .execute(&self.pool)
1099 .await
1100 .map_err(db_err)?;
1101
1102 if result.rows_affected() == 0 {
1103 return Err(AuthError::Storage(StorageError::NotFound));
1104 }
1105 Ok(())
1106 }
1107
1108 async fn delete_expired(&self) -> Result<u64> {
1109 let result =
1110 sqlx::query("DELETE FROM authx_oidc_authorization_codes WHERE expires_at < NOW()")
1111 .execute(&self.pool)
1112 .await
1113 .map_err(db_err)?;
1114 Ok(result.rows_affected())
1115 }
1116}
1117
1118fn map_oidc_token(r: &sqlx::postgres::PgRow) -> OidcToken {
1121 OidcToken {
1122 id: r.get("id"),
1123 token_hash: r.get("token_hash"),
1124 client_id: r.get("client_id"),
1125 user_id: r.get("user_id"),
1126 scope: r.get("scope"),
1127 token_type: oidc_token_type_from_str(r.get::<&str, _>("token_type")),
1128 expires_at: r.get("expires_at"),
1129 revoked: r.get("revoked"),
1130 created_at: r.get("created_at"),
1131 }
1132}
1133
1134#[async_trait]
1135impl OidcTokenRepository for PostgresStore {
1136 async fn create(&self, data: CreateOidcToken) -> Result<OidcToken> {
1137 let row = sqlx::query(
1138 "INSERT INTO authx_oidc_tokens \
1139 (id, token_hash, client_id, user_id, scope, token_type, expires_at) \
1140 VALUES ($1, $2, $3, $4, $5, $6, $7) \
1141 RETURNING id, token_hash, client_id, user_id, scope, token_type, expires_at, revoked, created_at",
1142 )
1143 .bind(Uuid::new_v4())
1144 .bind(&data.token_hash)
1145 .bind(&data.client_id)
1146 .bind(data.user_id)
1147 .bind(&data.scope)
1148 .bind(oidc_token_type_str(&data.token_type))
1149 .bind(data.expires_at)
1150 .fetch_one(&self.pool)
1151 .await
1152 .map_err(db_err)?;
1153
1154 tracing::debug!(client_id = %data.client_id, "oidc token created");
1155 Ok(map_oidc_token(&row))
1156 }
1157
1158 async fn find_by_token_hash(&self, hash: &str) -> Result<Option<OidcToken>> {
1159 let row = sqlx::query(
1160 "SELECT id, token_hash, client_id, user_id, scope, token_type, expires_at, revoked, created_at \
1161 FROM authx_oidc_tokens WHERE token_hash = $1 AND revoked = false",
1162 )
1163 .bind(hash)
1164 .fetch_optional(&self.pool)
1165 .await
1166 .map_err(db_err)?;
1167
1168 if let Some(ref r) = row {
1169 let tok = map_oidc_token(r);
1170 if let Some(exp) = tok.expires_at {
1171 if exp < Utc::now() {
1172 return Ok(None);
1173 }
1174 }
1175 }
1176 Ok(row.as_ref().map(map_oidc_token))
1177 }
1178
1179 async fn revoke(&self, id: Uuid) -> Result<()> {
1180 let result = sqlx::query("UPDATE authx_oidc_tokens SET revoked = true WHERE id = $1")
1181 .bind(id)
1182 .execute(&self.pool)
1183 .await
1184 .map_err(db_err)?;
1185
1186 if result.rows_affected() == 0 {
1187 return Err(AuthError::Storage(StorageError::NotFound));
1188 }
1189 Ok(())
1190 }
1191
1192 async fn revoke_all_for_user_client(&self, user_id: Uuid, client_id: &str) -> Result<()> {
1193 sqlx::query(
1194 "UPDATE authx_oidc_tokens SET revoked = true WHERE user_id = $1 AND client_id = $2",
1195 )
1196 .bind(user_id)
1197 .bind(client_id)
1198 .execute(&self.pool)
1199 .await
1200 .map_err(db_err)?;
1201 Ok(())
1202 }
1203}
1204
1205fn map_oidc_federation_provider(r: &sqlx::postgres::PgRow) -> OidcFederationProvider {
1208 let claim_mapping_json: serde_json::Value =
1209 r.try_get("claim_mapping").unwrap_or(serde_json::json!([]));
1210 let claim_mapping = serde_json::from_value(claim_mapping_json).unwrap_or_default();
1211 OidcFederationProvider {
1212 id: r.get("id"),
1213 name: r.get("name"),
1214 issuer: r.get("issuer"),
1215 client_id: r.get("client_id"),
1216 secret_enc: r.get("secret_enc"),
1217 scopes: r.get("scopes"),
1218 org_id: r.try_get("org_id").ok(),
1219 enabled: r.get("enabled"),
1220 created_at: r.get("created_at"),
1221 claim_mapping,
1222 }
1223}
1224
1225#[async_trait]
1226impl OidcFederationProviderRepository for PostgresStore {
1227 async fn create(&self, data: CreateOidcFederationProvider) -> Result<OidcFederationProvider> {
1228 let claim_mapping_json =
1229 serde_json::to_value(&data.claim_mapping).unwrap_or(serde_json::json!([]));
1230 let row = sqlx::query(
1231 "INSERT INTO authx_oidc_federation_providers \
1232 (id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled) \
1233 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true) \
1234 RETURNING id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at",
1235 )
1236 .bind(Uuid::new_v4())
1237 .bind(&data.name)
1238 .bind(&data.issuer)
1239 .bind(&data.client_id)
1240 .bind(&data.secret_enc)
1241 .bind(&data.scopes)
1242 .bind(data.org_id)
1243 .bind(claim_mapping_json)
1244 .fetch_one(&self.pool)
1245 .await
1246 .map_err(db_err)?;
1247
1248 tracing::debug!(name = %data.name, "oidc federation provider created");
1249 Ok(map_oidc_federation_provider(&row))
1250 }
1251
1252 async fn find_by_id(&self, id: Uuid) -> Result<Option<OidcFederationProvider>> {
1253 let row = sqlx::query(
1254 "SELECT id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at \
1255 FROM authx_oidc_federation_providers WHERE id = $1",
1256 )
1257 .bind(id)
1258 .fetch_optional(&self.pool)
1259 .await
1260 .map_err(db_err)?;
1261 Ok(row.as_ref().map(map_oidc_federation_provider))
1262 }
1263
1264 async fn find_by_name(&self, name: &str) -> Result<Option<OidcFederationProvider>> {
1265 let row = sqlx::query(
1266 "SELECT id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at \
1267 FROM authx_oidc_federation_providers WHERE name = $1",
1268 )
1269 .bind(name)
1270 .fetch_optional(&self.pool)
1271 .await
1272 .map_err(db_err)?;
1273 Ok(row.as_ref().map(map_oidc_federation_provider))
1274 }
1275
1276 async fn list_enabled(&self) -> Result<Vec<OidcFederationProvider>> {
1277 let rows = sqlx::query(
1278 "SELECT id, name, issuer, client_id, secret_enc, scopes, org_id, claim_mapping, enabled, created_at \
1279 FROM authx_oidc_federation_providers WHERE enabled = true ORDER BY name",
1280 )
1281 .fetch_all(&self.pool)
1282 .await
1283 .map_err(db_err)?;
1284 Ok(rows.iter().map(map_oidc_federation_provider).collect())
1285 }
1286}
1287
1288const DEVICE_CODE_COLS: &str = "id, device_code_hash, user_code_hash, user_code, client_id, \
1291 scope, expires_at, interval_secs, authorized, denied, user_id, \
1292 last_polled_at";
1293
1294fn map_device_code(r: &sqlx::postgres::PgRow) -> DeviceCode {
1295 DeviceCode {
1296 id: r.get("id"),
1297 device_code_hash: r.get("device_code_hash"),
1298 user_code_hash: r.get("user_code_hash"),
1299 user_code: r.get("user_code"),
1300 client_id: r.get("client_id"),
1301 scope: r.get("scope"),
1302 expires_at: r.get("expires_at"),
1303 interval_secs: r.get::<i32, _>("interval_secs") as u32,
1304 authorized: r.get("authorized"),
1305 denied: r.get("denied"),
1306 user_id: r.get("user_id"),
1307 last_polled_at: r.get("last_polled_at"),
1308 }
1309}
1310
1311#[async_trait]
1312impl DeviceCodeRepository for PostgresStore {
1313 async fn create(&self, data: CreateDeviceCode) -> Result<DeviceCode> {
1314 let row = sqlx::query(&format!(
1315 "INSERT INTO authx_device_codes \
1316 (id, device_code_hash, user_code_hash, user_code, client_id, scope, expires_at, interval_secs) \
1317 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
1318 RETURNING {DEVICE_CODE_COLS}"
1319 ))
1320 .bind(Uuid::new_v4())
1321 .bind(&data.device_code_hash)
1322 .bind(&data.user_code_hash)
1323 .bind(&data.user_code)
1324 .bind(&data.client_id)
1325 .bind(&data.scope)
1326 .bind(data.expires_at)
1327 .bind(data.interval_secs as i32)
1328 .fetch_one(&self.pool)
1329 .await
1330 .map_err(db_err)?;
1331
1332 tracing::debug!(client_id = %data.client_id, "device code created");
1333 Ok(map_device_code(&row))
1334 }
1335
1336 async fn find_by_device_code_hash(&self, hash: &str) -> Result<Option<DeviceCode>> {
1337 let row = sqlx::query(&format!(
1338 "SELECT {DEVICE_CODE_COLS} FROM authx_device_codes \
1339 WHERE device_code_hash = $1 AND expires_at > NOW()"
1340 ))
1341 .bind(hash)
1342 .fetch_optional(&self.pool)
1343 .await
1344 .map_err(db_err)?;
1345 Ok(row.as_ref().map(map_device_code))
1346 }
1347
1348 async fn find_by_user_code_hash(&self, hash: &str) -> Result<Option<DeviceCode>> {
1349 let row = sqlx::query(&format!(
1350 "SELECT {DEVICE_CODE_COLS} FROM authx_device_codes \
1351 WHERE user_code_hash = $1 AND expires_at > NOW() \
1352 AND authorized = false AND denied = false"
1353 ))
1354 .bind(hash)
1355 .fetch_optional(&self.pool)
1356 .await
1357 .map_err(db_err)?;
1358 Ok(row.as_ref().map(map_device_code))
1359 }
1360
1361 async fn authorize(&self, id: Uuid, user_id: Uuid) -> Result<()> {
1362 let result = sqlx::query(
1363 "UPDATE authx_device_codes SET authorized = true, user_id = $2 WHERE id = $1",
1364 )
1365 .bind(id)
1366 .bind(user_id)
1367 .execute(&self.pool)
1368 .await
1369 .map_err(db_err)?;
1370
1371 if result.rows_affected() == 0 {
1372 return Err(AuthError::Storage(StorageError::NotFound));
1373 }
1374 tracing::debug!(id = %id, "device code authorized");
1375 Ok(())
1376 }
1377
1378 async fn deny(&self, id: Uuid) -> Result<()> {
1379 let result = sqlx::query("UPDATE authx_device_codes SET denied = true WHERE id = $1")
1380 .bind(id)
1381 .execute(&self.pool)
1382 .await
1383 .map_err(db_err)?;
1384
1385 if result.rows_affected() == 0 {
1386 return Err(AuthError::Storage(StorageError::NotFound));
1387 }
1388 tracing::debug!(id = %id, "device code denied");
1389 Ok(())
1390 }
1391
1392 async fn update_last_polled(&self, id: Uuid, interval_secs: u32) -> Result<()> {
1393 sqlx::query(
1394 "UPDATE authx_device_codes SET last_polled_at = NOW(), interval_secs = $2 WHERE id = $1",
1395 )
1396 .bind(id)
1397 .bind(interval_secs as i32)
1398 .execute(&self.pool)
1399 .await
1400 .map_err(db_err)?;
1401 Ok(())
1402 }
1403
1404 async fn delete(&self, id: Uuid) -> Result<()> {
1405 sqlx::query("DELETE FROM authx_device_codes WHERE id = $1")
1406 .bind(id)
1407 .execute(&self.pool)
1408 .await
1409 .map_err(db_err)?;
1410 Ok(())
1411 }
1412
1413 async fn delete_expired(&self) -> Result<u64> {
1414 let result = sqlx::query("DELETE FROM authx_device_codes WHERE expires_at < NOW()")
1415 .execute(&self.pool)
1416 .await
1417 .map_err(db_err)?;
1418 Ok(result.rows_affected())
1419 }
1420
1421 async fn list_by_client(
1422 &self,
1423 client_id: &str,
1424 offset: u32,
1425 limit: u32,
1426 ) -> Result<Vec<DeviceCode>> {
1427 let rows = sqlx::query(&format!(
1428 "SELECT {DEVICE_CODE_COLS} FROM authx_device_codes \
1429 WHERE client_id = $1 ORDER BY expires_at DESC LIMIT $2 OFFSET $3"
1430 ))
1431 .bind(client_id)
1432 .bind(limit as i64)
1433 .bind(offset as i64)
1434 .fetch_all(&self.pool)
1435 .await
1436 .map_err(db_err)?;
1437 Ok(rows.iter().map(map_device_code).collect())
1438 }
1439}