torii_storage_postgres/
lib.rs

1mod magic_link;
2mod migrations;
3mod oauth;
4mod passkey;
5mod password;
6mod session;
7
8use async_trait::async_trait;
9use chrono::DateTime;
10use chrono::Utc;
11use migrations::CreateIndexes;
12use migrations::CreateMagicLinksTable;
13use migrations::CreateOAuthAccountsTable;
14use migrations::CreatePasskeyChallengesTable;
15use migrations::CreatePasskeysTable;
16use migrations::CreateSessionsTable;
17use migrations::CreateUsersTable;
18use migrations::PostgresMigrationManager;
19use sqlx::PgPool;
20use torii_core::error::StorageError;
21use torii_core::{
22    User, UserId,
23    storage::{NewUser, UserStorage},
24};
25use torii_migration::Migration;
26use torii_migration::MigrationManager;
27
28#[derive(Debug)]
29pub struct PostgresStorage {
30    pool: PgPool,
31}
32
33impl PostgresStorage {
34    pub fn new(pool: PgPool) -> Self {
35        Self { pool }
36    }
37
38    pub async fn migrate(&self) -> Result<(), StorageError> {
39        let manager = PostgresMigrationManager::new(self.pool.clone());
40        manager.initialize().await.map_err(|e| {
41            tracing::error!(error = %e, "Failed to initialize migrations");
42            StorageError::Migration("Failed to initialize migrations".to_string())
43        })?;
44
45        let migrations: Vec<Box<dyn Migration<_>>> = vec![
46            Box::new(CreateUsersTable),
47            Box::new(CreateSessionsTable),
48            Box::new(CreateOAuthAccountsTable),
49            Box::new(CreatePasskeysTable),
50            Box::new(CreatePasskeyChallengesTable),
51            Box::new(CreateIndexes),
52            Box::new(CreateMagicLinksTable),
53        ];
54        manager.up(&migrations).await.map_err(|e| {
55            tracing::error!(error = %e, "Failed to run migrations");
56            StorageError::Migration("Failed to run migrations".to_string())
57        })?;
58
59        Ok(())
60    }
61}
62
63#[derive(Debug, Clone, sqlx::FromRow)]
64pub struct PostgresUser {
65    id: String,
66    email: String,
67    name: Option<String>,
68    email_verified_at: Option<DateTime<Utc>>,
69    created_at: DateTime<Utc>,
70    updated_at: DateTime<Utc>,
71}
72
73impl From<PostgresUser> for User {
74    fn from(user: PostgresUser) -> Self {
75        User::builder()
76            .id(UserId::new(&user.id))
77            .email(user.email)
78            .name(user.name)
79            .email_verified_at(user.email_verified_at)
80            .created_at(user.created_at)
81            .updated_at(user.updated_at)
82            .build()
83            .unwrap()
84    }
85}
86
87impl From<User> for PostgresUser {
88    fn from(user: User) -> Self {
89        PostgresUser {
90            id: user.id.into_inner(),
91            email: user.email,
92            name: user.name,
93            email_verified_at: user.email_verified_at,
94            created_at: user.created_at,
95            updated_at: user.updated_at,
96        }
97    }
98}
99
100#[async_trait]
101impl UserStorage for PostgresStorage {
102    type Error = torii_core::Error;
103
104    async fn create_user(&self, user: &NewUser) -> Result<User, Self::Error> {
105        let user = sqlx::query_as::<_, PostgresUser>(
106            r#"
107            INSERT INTO users (id, email) 
108            VALUES ($1::uuid, $2) 
109            RETURNING id::text, email, name, email_verified_at, created_at, updated_at
110            "#,
111        )
112        .bind(user.id.as_str())
113        .bind(&user.email)
114        .fetch_one(&self.pool)
115        .await
116        .map_err(|e| {
117            tracing::error!(error = %e, "Failed to create user");
118            StorageError::Database("Failed to create user".to_string())
119        })?;
120
121        Ok(user.into())
122    }
123
124    async fn get_user(&self, id: &UserId) -> Result<Option<User>, Self::Error> {
125        let user = sqlx::query_as::<_, PostgresUser>(
126            r#"
127            SELECT id::text, email, name, email_verified_at, created_at, updated_at 
128            FROM users 
129            WHERE id::text = $1
130            "#,
131        )
132        .bind(id.as_str())
133        .fetch_optional(&self.pool)
134        .await
135        .map_err(|e| {
136            tracing::error!(error = %e, "Failed to get user");
137            StorageError::Database("Failed to get user".to_string())
138        })?;
139
140        match user {
141            Some(user) => Ok(Some(user.into())),
142            None => Ok(None),
143        }
144    }
145
146    async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, Self::Error> {
147        let user = sqlx::query_as::<_, PostgresUser>(
148            r#"
149            SELECT id::text, email, name, email_verified_at, created_at, updated_at 
150            FROM users 
151            WHERE email = $1
152            "#,
153        )
154        .bind(email)
155        .fetch_optional(&self.pool)
156        .await
157        .map_err(|e| {
158            tracing::error!(error = %e, "Failed to get user by email");
159            StorageError::Database("Failed to get user by email".to_string())
160        })?;
161
162        match user {
163            Some(user) => Ok(Some(user.into())),
164            None => Ok(None),
165        }
166    }
167
168    async fn get_or_create_user_by_email(&self, email: &str) -> Result<User, Self::Error> {
169        let user = self.get_user_by_email(email).await?;
170        if let Some(user) = user {
171            return Ok(user);
172        }
173
174        let user = self
175            .create_user(
176                &NewUser::builder()
177                    .id(UserId::new_random())
178                    .email(email.to_string())
179                    .build()
180                    .unwrap(),
181            )
182            .await
183            .map_err(|e| {
184                tracing::error!(error = %e, "Failed to get or create user by email");
185                StorageError::Database("Failed to get or create user by email".to_string())
186            })?;
187
188        Ok(user)
189    }
190
191    async fn update_user(&self, user: &User) -> Result<User, Self::Error> {
192        let user = sqlx::query_as::<_, PostgresUser>(
193            r#"
194            UPDATE users 
195            SET email = $1, name = $2, email_verified_at = $3, updated_at = $4 
196            WHERE id::text = $5
197            RETURNING id::text, email, name, email_verified_at, created_at, updated_at
198            "#,
199        )
200        .bind(&user.email)
201        .bind(&user.name)
202        .bind(user.email_verified_at)
203        .bind(user.updated_at)
204        .bind(user.id.as_str())
205        .fetch_one(&self.pool)
206        .await
207        .map_err(|e| {
208            tracing::error!(error = %e, "Failed to update user");
209            StorageError::Database("Failed to update user".to_string())
210        })?;
211
212        Ok(user.into())
213    }
214
215    async fn delete_user(&self, id: &UserId) -> Result<(), Self::Error> {
216        sqlx::query("DELETE FROM users WHERE id::text = $1")
217            .bind(id.as_str())
218            .execute(&self.pool)
219            .await
220            .map_err(|e| {
221                tracing::error!(error = %e, "Failed to delete user");
222                StorageError::Database("Failed to delete user".to_string())
223            })?;
224
225        Ok(())
226    }
227
228    async fn set_user_email_verified(&self, user_id: &UserId) -> Result<(), Self::Error> {
229        sqlx::query("UPDATE users SET email_verified_at = $1 WHERE id::text = $2")
230            .bind(Utc::now())
231            .bind(user_id.as_str())
232            .execute(&self.pool)
233            .await
234            .map_err(|e| {
235                tracing::error!(error = %e, "Failed to set user email verified");
236                StorageError::Database("Failed to set user email verified".to_string())
237            })?;
238
239        Ok(())
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use sqlx::types::chrono::Utc;
    use std::time::Duration;
    use torii_core::session::SessionId;
    use torii_core::{Session, SessionStorage};

    /// Creates a freshly named database on the local Postgres server, runs
    /// the crate migrations against it and returns a storage bound to it.
    ///
    /// Panics (via `expect`) if any connection, statement or migration fails.
    pub(crate) async fn setup_test_db() -> PostgresStorage {
        // TODO: every call leaves its database behind on the server once the
        // test finishes; a cleanup step is still needed.

        // Ignore the result: the subscriber may already be installed.
        let _ = tracing_subscriber::fmt().try_init();

        let admin_pool = PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("Failed to create pool");

        let suffix = rand::rng().random_range(1..i64::MAX);
        let db_name = format!("torii_test_{suffix}");

        // Remove any database that already has this name.
        let drop_stmt = format!("DROP DATABASE IF EXISTS {db_name}");
        sqlx::query(drop_stmt.as_str())
            .execute(&admin_pool)
            .await
            .expect("Failed to drop database");

        // Create the database this test will use.
        let create_stmt = format!("CREATE DATABASE {db_name}");
        sqlx::query(create_stmt.as_str())
            .execute(&admin_pool)
            .await
            .expect("Failed to create database");

        let test_url = format!("postgres://postgres:postgres@localhost:5432/{db_name}");
        let test_pool = PgPool::connect(test_url.as_str())
            .await
            .expect("Failed to create pool");

        let storage = PostgresStorage::new(test_pool);
        storage.migrate().await.expect("Failed to run migrations");
        storage
    }

    /// Inserts a user with the given id and an email derived from that id
    /// (`test<id>@example.com`).
    pub(crate) async fn create_test_user(
        storage: &PostgresStorage,
        id: &UserId,
    ) -> Result<User, torii_core::Error> {
        let new_user = NewUser::builder()
            .id(id.clone())
            .email(format!("test{id}@example.com"))
            .build()
            .expect("Failed to build user");

        storage.create_user(&new_user).await
    }

    /// Inserts a session for `user_id` that expires `expires_in` after the
    /// current time, with fixed user-agent and IP address values.
    pub(crate) async fn create_test_session(
        storage: &PostgresStorage,
        session_id: &SessionId,
        user_id: &UserId,
        expires_in: Duration,
    ) -> Result<Session, torii_core::Error> {
        // One timestamp is shared by created_at, updated_at and the expiry base.
        let now = Utc::now();
        let expires_at = now + expires_in;

        let session = Session::builder()
            .id(session_id.clone())
            .user_id(user_id.clone())
            .user_agent(Some("test".to_string()))
            .ip_address(Some("127.0.0.1".to_string()))
            .created_at(now)
            .updated_at(now)
            .expires_at(expires_at)
            .build()
            .expect("Failed to build session");

        storage.create_session(&session).await
    }
}