torii_storage_postgres/
lib.rs

1mod magic_link;
2mod migrations;
3mod oauth;
4mod passkey;
5mod password;
6mod session;
7
8use async_trait::async_trait;
9use chrono::DateTime;
10use chrono::Utc;
11use migrations::CreateIndexes;
12use migrations::CreateMagicLinksTable;
13use migrations::CreateOAuthAccountsTable;
14use migrations::CreatePasskeyChallengesTable;
15use migrations::CreatePasskeysTable;
16use migrations::CreateSessionsTable;
17use migrations::CreateUsersTable;
18use migrations::PostgresMigrationManager;
19use sqlx::PgPool;
20use torii_core::error::StorageError;
21use torii_core::{
22    User, UserId,
23    storage::{NewUser, UserStorage},
24};
25use torii_migration::Migration;
26use torii_migration::MigrationManager;
27
28#[derive(Debug, Clone)]
29pub struct PostgresStorage {
30    pool: PgPool,
31}
32
33impl PostgresStorage {
34    pub fn new(pool: PgPool) -> Self {
35        Self { pool }
36    }
37
38    pub async fn connect(database_url: &str) -> Result<Self, StorageError> {
39        let pool = PgPool::connect(database_url).await.map_err(|e| {
40            tracing::error!(error = %e, "Failed to connect to database");
41            StorageError::Database("Failed to connect to database".to_string())
42        })?;
43
44        Ok(Self::new(pool))
45    }
46
47    pub async fn migrate(&self) -> Result<(), StorageError> {
48        let manager = PostgresMigrationManager::new(self.pool.clone());
49        manager.initialize().await.map_err(|e| {
50            tracing::error!(error = %e, "Failed to initialize migrations");
51            StorageError::Migration("Failed to initialize migrations".to_string())
52        })?;
53
54        let migrations: Vec<Box<dyn Migration<_>>> = vec![
55            Box::new(CreateUsersTable),
56            Box::new(CreateSessionsTable),
57            Box::new(CreateOAuthAccountsTable),
58            Box::new(CreatePasskeysTable),
59            Box::new(CreatePasskeyChallengesTable),
60            Box::new(CreateIndexes),
61            Box::new(CreateMagicLinksTable),
62        ];
63        manager.up(&migrations).await.map_err(|e| {
64            tracing::error!(error = %e, "Failed to run migrations");
65            StorageError::Migration("Failed to run migrations".to_string())
66        })?;
67
68        Ok(())
69    }
70}
71
72#[derive(Debug, Clone, sqlx::FromRow)]
73pub struct PostgresUser {
74    id: String,
75    email: String,
76    name: Option<String>,
77    email_verified_at: Option<DateTime<Utc>>,
78    created_at: DateTime<Utc>,
79    updated_at: DateTime<Utc>,
80}
81
82impl From<PostgresUser> for User {
83    fn from(user: PostgresUser) -> Self {
84        User::builder()
85            .id(UserId::new(&user.id))
86            .email(user.email)
87            .name(user.name)
88            .email_verified_at(user.email_verified_at)
89            .created_at(user.created_at)
90            .updated_at(user.updated_at)
91            .build()
92            .unwrap()
93    }
94}
95
96impl From<User> for PostgresUser {
97    fn from(user: User) -> Self {
98        PostgresUser {
99            id: user.id.into_inner(),
100            email: user.email,
101            name: user.name,
102            email_verified_at: user.email_verified_at,
103            created_at: user.created_at,
104            updated_at: user.updated_at,
105        }
106    }
107}
108
109#[async_trait]
110impl UserStorage for PostgresStorage {
111    type Error = torii_core::Error;
112
113    async fn create_user(&self, user: &NewUser) -> Result<User, Self::Error> {
114        let user = sqlx::query_as::<_, PostgresUser>(
115            r#"
116            INSERT INTO users (id, email) 
117            VALUES ($1, $2) 
118            RETURNING id, email, name, email_verified_at, created_at, updated_at
119            "#,
120        )
121        .bind(user.id.as_str())
122        .bind(&user.email)
123        .fetch_one(&self.pool)
124        .await
125        .map_err(|e| {
126            tracing::error!(error = %e, "Failed to create user");
127            StorageError::Database("Failed to create user".to_string())
128        })?;
129
130        Ok(user.into())
131    }
132
133    async fn get_user(&self, id: &UserId) -> Result<Option<User>, Self::Error> {
134        let user = sqlx::query_as::<_, PostgresUser>(
135            r#"
136            SELECT id, email, name, email_verified_at, created_at, updated_at 
137            FROM users 
138            WHERE id = $1
139            "#,
140        )
141        .bind(id.as_str())
142        .fetch_optional(&self.pool)
143        .await
144        .map_err(|e| {
145            tracing::error!(error = %e, "Failed to get user");
146            StorageError::Database("Failed to get user".to_string())
147        })?;
148
149        match user {
150            Some(user) => Ok(Some(user.into())),
151            None => Ok(None),
152        }
153    }
154
155    async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, Self::Error> {
156        let user = sqlx::query_as::<_, PostgresUser>(
157            r#"
158            SELECT id, email, name, email_verified_at, created_at, updated_at 
159            FROM users 
160            WHERE email = $1
161            "#,
162        )
163        .bind(email)
164        .fetch_optional(&self.pool)
165        .await
166        .map_err(|e| {
167            tracing::error!(error = %e, "Failed to get user by email");
168            StorageError::Database("Failed to get user by email".to_string())
169        })?;
170
171        match user {
172            Some(user) => Ok(Some(user.into())),
173            None => Ok(None),
174        }
175    }
176
177    async fn get_or_create_user_by_email(&self, email: &str) -> Result<User, Self::Error> {
178        let user = self.get_user_by_email(email).await?;
179        if let Some(user) = user {
180            return Ok(user);
181        }
182
183        let user = self
184            .create_user(
185                &NewUser::builder()
186                    .id(UserId::new_random())
187                    .email(email.to_string())
188                    .build()
189                    .unwrap(),
190            )
191            .await
192            .map_err(|e| {
193                tracing::error!(error = %e, "Failed to get or create user by email");
194                StorageError::Database("Failed to get or create user by email".to_string())
195            })?;
196
197        Ok(user)
198    }
199
200    async fn update_user(&self, user: &User) -> Result<User, Self::Error> {
201        let user = sqlx::query_as::<_, PostgresUser>(
202            r#"
203            UPDATE users 
204            SET email = $1, name = $2, email_verified_at = $3, updated_at = $4 
205            WHERE id = $5
206            RETURNING id, email, name, email_verified_at, created_at, updated_at
207            "#,
208        )
209        .bind(&user.email)
210        .bind(&user.name)
211        .bind(user.email_verified_at)
212        .bind(user.updated_at)
213        .bind(user.id.as_str())
214        .fetch_one(&self.pool)
215        .await
216        .map_err(|e| {
217            tracing::error!(error = %e, "Failed to update user");
218            StorageError::Database("Failed to update user".to_string())
219        })?;
220
221        Ok(user.into())
222    }
223
224    async fn delete_user(&self, id: &UserId) -> Result<(), Self::Error> {
225        sqlx::query("DELETE FROM users WHERE id = $1")
226            .bind(id.as_str())
227            .execute(&self.pool)
228            .await
229            .map_err(|e| {
230                tracing::error!(error = %e, "Failed to delete user");
231                StorageError::Database("Failed to delete user".to_string())
232            })?;
233
234        Ok(())
235    }
236
237    async fn set_user_email_verified(&self, user_id: &UserId) -> Result<(), Self::Error> {
238        sqlx::query("UPDATE users SET email_verified_at = $1 WHERE id = $2")
239            .bind(Utc::now())
240            .bind(user_id.as_str())
241            .execute(&self.pool)
242            .await
243            .map_err(|e| {
244                tracing::error!(error = %e, "Failed to set user email verified");
245                StorageError::Database("Failed to set user email verified".to_string())
246            })?;
247
248        Ok(())
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255    use rand::Rng;
256    use sqlx::types::chrono::Utc;
257    use std::time::Duration;
258    use torii_core::session::SessionToken;
259    use torii_core::{Session, SessionStorage};
260
261    pub(crate) async fn setup_test_db() -> PostgresStorage {
262        // TODO: this function is leaking postgres databases after the test is done.
263        // We should find a way to clean up the database after the test is done.
264
265        let _ = tracing_subscriber::fmt().try_init();
266
267        let pool = PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres")
268            .await
269            .expect("Failed to create pool");
270
271        let db_name = format!("torii_test_{}", rand::rng().random_range(1..i64::MAX));
272
273        // Drop the database if it exists
274        sqlx::query(format!("DROP DATABASE IF EXISTS {}", db_name).as_str())
275            .execute(&pool)
276            .await
277            .expect("Failed to drop database");
278
279        // Create a new database for the test
280        sqlx::query(format!("CREATE DATABASE {}", db_name).as_str())
281            .execute(&pool)
282            .await
283            .expect("Failed to create database");
284
285        let pool = PgPool::connect(
286            format!("postgres://postgres:postgres@localhost:5432/{}", db_name).as_str(),
287        )
288        .await
289        .expect("Failed to create pool");
290
291        let storage = PostgresStorage::new(pool);
292        storage.migrate().await.expect("Failed to run migrations");
293        storage
294    }
295
296    pub(crate) async fn create_test_user(
297        storage: &PostgresStorage,
298        id: &UserId,
299    ) -> Result<User, torii_core::Error> {
300        storage
301            .create_user(
302                &NewUser::builder()
303                    .id(id.clone())
304                    .email(format!("test{}@example.com", id))
305                    .build()
306                    .expect("Failed to build user"),
307            )
308            .await
309    }
310
311    pub(crate) async fn create_test_session(
312        storage: &PostgresStorage,
313        session_token: &SessionToken,
314        user_id: &UserId,
315        expires_in: Duration,
316    ) -> Result<Session, torii_core::Error> {
317        let now = Utc::now();
318        storage
319            .create_session(
320                &Session::builder()
321                    .token(session_token.clone())
322                    .user_id(user_id.clone())
323                    .user_agent(Some("test".to_string()))
324                    .ip_address(Some("127.0.0.1".to_string()))
325                    .created_at(now)
326                    .updated_at(now)
327                    .expires_at(now + expires_in)
328                    .build()
329                    .expect("Failed to build session"),
330            )
331            .await
332    }
333}