torii_storage_postgres/
lib.rs

1//! PostgreSQL storage backend for Torii
2//!
3//! This crate provides a PostgreSQL-based storage implementation for the Torii authentication framework.
4//! It includes implementations for all core storage traits and provides a complete authentication
5//! storage solution using PostgreSQL as the underlying database.
6//!
7//! # Features
8//!
9//! - **User Management**: Store and retrieve user accounts with email verification support
10//! - **Session Management**: Handle user sessions with configurable expiration
11//! - **Password Authentication**: Secure password hashing and verification
12//! - **OAuth Integration**: Store OAuth account connections and tokens
13//! - **Passkey Support**: WebAuthn/FIDO2 passkey storage and challenge management
14//! - **Database Migrations**: Automatic schema management and upgrades
15//! - **Production Ready**: Designed for high-performance production workloads
16//!
17//! # Usage
18//!
19//! ```rust,no_run
20//! use torii_storage_postgres::PostgresStorage;
21//! use torii_core::UserId;
22//!
23//! #[tokio::main]
24//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
25//!     // Connect to PostgreSQL database
26//!     let storage = PostgresStorage::connect("postgresql://user:password@localhost/torii").await?;
27//!     
28//!     // Run migrations to set up the schema
29//!     storage.migrate().await?;
30//!     
31//!     // Use with Torii (PostgresRepositoryProvider not yet implemented)
32//!     // let repositories = std::sync::Arc::new(storage.into_repository_provider());
33//!     // let torii = torii::Torii::new(repositories);
34//!     
35//!     Ok(())
36//! }
37//! ```
38//!
39//! # Current Status
40//!
41//! This crate currently provides the base PostgreSQL storage implementation with user and session
42//! management. The full repository provider implementation is still in development.
43//!
44//! # Storage Implementations
45//!
46//! This crate implements the following storage traits:
47//! - [`UserStorage`](torii_core::storage::UserStorage) - User account management
48//! - [`SessionStorage`](torii_core::storage::SessionStorage) - Session management
49//! - Password repository for secure password storage
50//! - OAuth repository for third-party authentication
51//! - Passkey repository for WebAuthn support
52//!
53//! # Database Schema
54//!
55//! The PostgreSQL schema includes tables for:
56//! - `users` - User accounts and profile information
57//! - `sessions` - Active user sessions
58//! - `passwords` - Hashed password credentials
59//! - `oauth_accounts` - Connected OAuth accounts
60//! - `passkeys` - WebAuthn passkey credentials
61//! - `passkey_challenges` - Temporary passkey challenges
62//!
63//! All tables include appropriate indexes and constraints for optimal query performance and data integrity.
64
65mod migrations;
66mod oauth;
67mod passkey;
68mod password;
69mod session;
70
71use async_trait::async_trait;
72use chrono::DateTime;
73use chrono::Utc;
74use migrations::CreateIndexes;
75use migrations::CreateOAuthAccountsTable;
76use migrations::CreatePasskeyChallengesTable;
77use migrations::CreatePasskeysTable;
78use migrations::CreateSessionsTable;
79use migrations::CreateUsersTable;
80use migrations::PostgresMigrationManager;
81use sqlx::PgPool;
82use torii_core::error::StorageError;
83use torii_core::{
84    User, UserId,
85    storage::{NewUser, UserStorage},
86};
87use torii_migration::Migration;
88use torii_migration::MigrationManager;
89
90#[derive(Debug, Clone)]
91pub struct PostgresStorage {
92    pool: PgPool,
93}
94
95impl PostgresStorage {
96    pub fn new(pool: PgPool) -> Self {
97        Self { pool }
98    }
99
100    pub async fn connect(database_url: &str) -> Result<Self, StorageError> {
101        let pool = PgPool::connect(database_url).await.map_err(|e| {
102            tracing::error!(error = %e, "Failed to connect to database");
103            StorageError::Database("Failed to connect to database".to_string())
104        })?;
105
106        Ok(Self::new(pool))
107    }
108
109    pub async fn migrate(&self) -> Result<(), StorageError> {
110        let manager = PostgresMigrationManager::new(self.pool.clone());
111        manager.initialize().await.map_err(|e| {
112            tracing::error!(error = %e, "Failed to initialize migrations");
113            StorageError::Migration("Failed to initialize migrations".to_string())
114        })?;
115
116        let migrations: Vec<Box<dyn Migration<_>>> = vec![
117            Box::new(CreateUsersTable),
118            Box::new(CreateSessionsTable),
119            Box::new(CreateOAuthAccountsTable),
120            Box::new(CreatePasskeysTable),
121            Box::new(CreatePasskeyChallengesTable),
122            Box::new(CreateIndexes),
123        ];
124        manager.up(&migrations).await.map_err(|e| {
125            tracing::error!(error = %e, "Failed to run migrations");
126            StorageError::Migration("Failed to run migrations".to_string())
127        })?;
128
129        Ok(())
130    }
131}
132
133#[derive(Debug, Clone, sqlx::FromRow)]
134pub struct PostgresUser {
135    id: String,
136    email: String,
137    name: Option<String>,
138    email_verified_at: Option<DateTime<Utc>>,
139    created_at: DateTime<Utc>,
140    updated_at: DateTime<Utc>,
141}
142
143impl From<PostgresUser> for User {
144    fn from(user: PostgresUser) -> Self {
145        User::builder()
146            .id(UserId::new(&user.id))
147            .email(user.email)
148            .name(user.name)
149            .email_verified_at(user.email_verified_at)
150            .created_at(user.created_at)
151            .updated_at(user.updated_at)
152            .build()
153            .unwrap()
154    }
155}
156
157impl From<User> for PostgresUser {
158    fn from(user: User) -> Self {
159        PostgresUser {
160            id: user.id.into_inner(),
161            email: user.email,
162            name: user.name,
163            email_verified_at: user.email_verified_at,
164            created_at: user.created_at,
165            updated_at: user.updated_at,
166        }
167    }
168}
169
170#[async_trait]
171impl UserStorage for PostgresStorage {
172    async fn create_user(&self, user: &NewUser) -> Result<User, torii_core::Error> {
173        let user = sqlx::query_as::<_, PostgresUser>(
174            r#"
175            INSERT INTO users (id, email) 
176            VALUES ($1, $2) 
177            RETURNING id, email, name, email_verified_at, created_at, updated_at
178            "#,
179        )
180        .bind(user.id.as_str())
181        .bind(&user.email)
182        .fetch_one(&self.pool)
183        .await
184        .map_err(|e| {
185            tracing::error!(error = %e, "Failed to create user");
186            StorageError::Database("Failed to create user".to_string())
187        })?;
188
189        Ok(user.into())
190    }
191
192    async fn get_user(&self, id: &UserId) -> Result<Option<User>, torii_core::Error> {
193        let user = sqlx::query_as::<_, PostgresUser>(
194            r#"
195            SELECT id, email, name, email_verified_at, created_at, updated_at 
196            FROM users 
197            WHERE id = $1
198            "#,
199        )
200        .bind(id.as_str())
201        .fetch_optional(&self.pool)
202        .await
203        .map_err(|e| {
204            tracing::error!(error = %e, "Failed to get user");
205            StorageError::Database("Failed to get user".to_string())
206        })?;
207
208        match user {
209            Some(user) => Ok(Some(user.into())),
210            None => Ok(None),
211        }
212    }
213
214    async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, torii_core::Error> {
215        let user = sqlx::query_as::<_, PostgresUser>(
216            r#"
217            SELECT id, email, name, email_verified_at, created_at, updated_at 
218            FROM users 
219            WHERE email = $1
220            "#,
221        )
222        .bind(email)
223        .fetch_optional(&self.pool)
224        .await
225        .map_err(|e| {
226            tracing::error!(error = %e, "Failed to get user by email");
227            StorageError::Database("Failed to get user by email".to_string())
228        })?;
229
230        match user {
231            Some(user) => Ok(Some(user.into())),
232            None => Ok(None),
233        }
234    }
235
236    async fn get_or_create_user_by_email(&self, email: &str) -> Result<User, torii_core::Error> {
237        let user = self.get_user_by_email(email).await?;
238        if let Some(user) = user {
239            return Ok(user);
240        }
241
242        let user = self
243            .create_user(
244                &NewUser::builder()
245                    .id(UserId::new_random())
246                    .email(email.to_string())
247                    .build()
248                    .unwrap(),
249            )
250            .await
251            .map_err(|e| {
252                tracing::error!(error = %e, "Failed to get or create user by email");
253                StorageError::Database("Failed to get or create user by email".to_string())
254            })?;
255
256        Ok(user)
257    }
258
259    async fn update_user(&self, user: &User) -> Result<User, torii_core::Error> {
260        let user = sqlx::query_as::<_, PostgresUser>(
261            r#"
262            UPDATE users 
263            SET email = $1, name = $2, email_verified_at = $3, updated_at = $4 
264            WHERE id = $5
265            RETURNING id, email, name, email_verified_at, created_at, updated_at
266            "#,
267        )
268        .bind(&user.email)
269        .bind(&user.name)
270        .bind(user.email_verified_at)
271        .bind(user.updated_at)
272        .bind(user.id.as_str())
273        .fetch_one(&self.pool)
274        .await
275        .map_err(|e| {
276            tracing::error!(error = %e, "Failed to update user");
277            StorageError::Database("Failed to update user".to_string())
278        })?;
279
280        Ok(user.into())
281    }
282
283    async fn delete_user(&self, id: &UserId) -> Result<(), torii_core::Error> {
284        sqlx::query("DELETE FROM users WHERE id = $1")
285            .bind(id.as_str())
286            .execute(&self.pool)
287            .await
288            .map_err(|e| {
289                tracing::error!(error = %e, "Failed to delete user");
290                StorageError::Database("Failed to delete user".to_string())
291            })?;
292
293        Ok(())
294    }
295
296    async fn set_user_email_verified(&self, user_id: &UserId) -> Result<(), torii_core::Error> {
297        sqlx::query("UPDATE users SET email_verified_at = $1 WHERE id = $2")
298            .bind(Utc::now())
299            .bind(user_id.as_str())
300            .execute(&self.pool)
301            .await
302            .map_err(|e| {
303                tracing::error!(error = %e, "Failed to set user email verified");
304                StorageError::Database("Failed to set user email verified".to_string())
305            })?;
306
307        Ok(())
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use super::*;
314    use rand::Rng;
315    use sqlx::types::chrono::Utc;
316    use std::time::Duration;
317    use torii_core::session::SessionToken;
318    use torii_core::{Session, SessionStorage};
319
320    pub(crate) async fn setup_test_db() -> PostgresStorage {
321        // TODO: this function is leaking postgres databases after the test is done.
322        // We should find a way to clean up the database after the test is done.
323
324        let _ = tracing_subscriber::fmt().try_init();
325
326        let pool = PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres")
327            .await
328            .expect("Failed to create pool");
329
330        let db_name = format!("torii_test_{}", rand::rng().random_range(1..i64::MAX));
331
332        // Drop the database if it exists
333        sqlx::query(format!("DROP DATABASE IF EXISTS {db_name}").as_str())
334            .execute(&pool)
335            .await
336            .expect("Failed to drop database");
337
338        // Create a new database for the test
339        sqlx::query(format!("CREATE DATABASE {db_name}").as_str())
340            .execute(&pool)
341            .await
342            .expect("Failed to create database");
343
344        let pool = PgPool::connect(
345            format!("postgres://postgres:postgres@localhost:5432/{db_name}").as_str(),
346        )
347        .await
348        .expect("Failed to create pool");
349
350        let storage = PostgresStorage::new(pool);
351        storage.migrate().await.expect("Failed to run migrations");
352        storage
353    }
354
355    pub(crate) async fn create_test_user(
356        storage: &PostgresStorage,
357        id: &UserId,
358    ) -> Result<User, torii_core::Error> {
359        storage
360            .create_user(
361                &NewUser::builder()
362                    .id(id.clone())
363                    .email(format!("test{id}@example.com"))
364                    .build()
365                    .expect("Failed to build user"),
366            )
367            .await
368    }
369
370    pub(crate) async fn create_test_session(
371        storage: &PostgresStorage,
372        session_token: &SessionToken,
373        user_id: &UserId,
374        expires_in: Duration,
375    ) -> Result<Session, torii_core::Error> {
376        let now = Utc::now();
377        storage
378            .create_session(
379                &Session::builder()
380                    .token(session_token.clone())
381                    .user_id(user_id.clone())
382                    .user_agent(Some("test".to_string()))
383                    .ip_address(Some("127.0.0.1".to_string()))
384                    .created_at(now)
385                    .updated_at(now)
386                    .expires_at(now + expires_in)
387                    .build()
388                    .expect("Failed to build session"),
389            )
390            .await
391    }
392}