1mod migrations;
66mod oauth;
67mod passkey;
68mod password;
69mod session;
70
71use async_trait::async_trait;
72use chrono::DateTime;
73use chrono::Utc;
74use migrations::CreateIndexes;
75use migrations::CreateOAuthAccountsTable;
76use migrations::CreatePasskeyChallengesTable;
77use migrations::CreatePasskeysTable;
78use migrations::CreateSessionsTable;
79use migrations::CreateUsersTable;
80use migrations::PostgresMigrationManager;
81use sqlx::PgPool;
82use torii_core::error::StorageError;
83use torii_core::{
84 User, UserId,
85 storage::{NewUser, UserStorage},
86};
87use torii_migration::Migration;
88use torii_migration::MigrationManager;
89
90#[derive(Debug, Clone)]
91pub struct PostgresStorage {
92 pool: PgPool,
93}
94
95impl PostgresStorage {
96 pub fn new(pool: PgPool) -> Self {
97 Self { pool }
98 }
99
100 pub async fn connect(database_url: &str) -> Result<Self, StorageError> {
101 let pool = PgPool::connect(database_url).await.map_err(|e| {
102 tracing::error!(error = %e, "Failed to connect to database");
103 StorageError::Database("Failed to connect to database".to_string())
104 })?;
105
106 Ok(Self::new(pool))
107 }
108
109 pub async fn migrate(&self) -> Result<(), StorageError> {
110 let manager = PostgresMigrationManager::new(self.pool.clone());
111 manager.initialize().await.map_err(|e| {
112 tracing::error!(error = %e, "Failed to initialize migrations");
113 StorageError::Migration("Failed to initialize migrations".to_string())
114 })?;
115
116 let migrations: Vec<Box<dyn Migration<_>>> = vec![
117 Box::new(CreateUsersTable),
118 Box::new(CreateSessionsTable),
119 Box::new(CreateOAuthAccountsTable),
120 Box::new(CreatePasskeysTable),
121 Box::new(CreatePasskeyChallengesTable),
122 Box::new(CreateIndexes),
123 ];
124 manager.up(&migrations).await.map_err(|e| {
125 tracing::error!(error = %e, "Failed to run migrations");
126 StorageError::Migration("Failed to run migrations".to_string())
127 })?;
128
129 Ok(())
130 }
131}
132
133#[derive(Debug, Clone, sqlx::FromRow)]
134pub struct PostgresUser {
135 id: String,
136 email: String,
137 name: Option<String>,
138 email_verified_at: Option<DateTime<Utc>>,
139 created_at: DateTime<Utc>,
140 updated_at: DateTime<Utc>,
141}
142
143impl From<PostgresUser> for User {
144 fn from(user: PostgresUser) -> Self {
145 User::builder()
146 .id(UserId::new(&user.id))
147 .email(user.email)
148 .name(user.name)
149 .email_verified_at(user.email_verified_at)
150 .created_at(user.created_at)
151 .updated_at(user.updated_at)
152 .build()
153 .unwrap()
154 }
155}
156
157impl From<User> for PostgresUser {
158 fn from(user: User) -> Self {
159 PostgresUser {
160 id: user.id.into_inner(),
161 email: user.email,
162 name: user.name,
163 email_verified_at: user.email_verified_at,
164 created_at: user.created_at,
165 updated_at: user.updated_at,
166 }
167 }
168}
169
170#[async_trait]
171impl UserStorage for PostgresStorage {
172 async fn create_user(&self, user: &NewUser) -> Result<User, torii_core::Error> {
173 let user = sqlx::query_as::<_, PostgresUser>(
174 r#"
175 INSERT INTO users (id, email)
176 VALUES ($1, $2)
177 RETURNING id, email, name, email_verified_at, created_at, updated_at
178 "#,
179 )
180 .bind(user.id.as_str())
181 .bind(&user.email)
182 .fetch_one(&self.pool)
183 .await
184 .map_err(|e| {
185 tracing::error!(error = %e, "Failed to create user");
186 StorageError::Database("Failed to create user".to_string())
187 })?;
188
189 Ok(user.into())
190 }
191
192 async fn get_user(&self, id: &UserId) -> Result<Option<User>, torii_core::Error> {
193 let user = sqlx::query_as::<_, PostgresUser>(
194 r#"
195 SELECT id, email, name, email_verified_at, created_at, updated_at
196 FROM users
197 WHERE id = $1
198 "#,
199 )
200 .bind(id.as_str())
201 .fetch_optional(&self.pool)
202 .await
203 .map_err(|e| {
204 tracing::error!(error = %e, "Failed to get user");
205 StorageError::Database("Failed to get user".to_string())
206 })?;
207
208 match user {
209 Some(user) => Ok(Some(user.into())),
210 None => Ok(None),
211 }
212 }
213
214 async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, torii_core::Error> {
215 let user = sqlx::query_as::<_, PostgresUser>(
216 r#"
217 SELECT id, email, name, email_verified_at, created_at, updated_at
218 FROM users
219 WHERE email = $1
220 "#,
221 )
222 .bind(email)
223 .fetch_optional(&self.pool)
224 .await
225 .map_err(|e| {
226 tracing::error!(error = %e, "Failed to get user by email");
227 StorageError::Database("Failed to get user by email".to_string())
228 })?;
229
230 match user {
231 Some(user) => Ok(Some(user.into())),
232 None => Ok(None),
233 }
234 }
235
236 async fn get_or_create_user_by_email(&self, email: &str) -> Result<User, torii_core::Error> {
237 let user = self.get_user_by_email(email).await?;
238 if let Some(user) = user {
239 return Ok(user);
240 }
241
242 let user = self
243 .create_user(
244 &NewUser::builder()
245 .id(UserId::new_random())
246 .email(email.to_string())
247 .build()
248 .unwrap(),
249 )
250 .await
251 .map_err(|e| {
252 tracing::error!(error = %e, "Failed to get or create user by email");
253 StorageError::Database("Failed to get or create user by email".to_string())
254 })?;
255
256 Ok(user)
257 }
258
259 async fn update_user(&self, user: &User) -> Result<User, torii_core::Error> {
260 let user = sqlx::query_as::<_, PostgresUser>(
261 r#"
262 UPDATE users
263 SET email = $1, name = $2, email_verified_at = $3, updated_at = $4
264 WHERE id = $5
265 RETURNING id, email, name, email_verified_at, created_at, updated_at
266 "#,
267 )
268 .bind(&user.email)
269 .bind(&user.name)
270 .bind(user.email_verified_at)
271 .bind(user.updated_at)
272 .bind(user.id.as_str())
273 .fetch_one(&self.pool)
274 .await
275 .map_err(|e| {
276 tracing::error!(error = %e, "Failed to update user");
277 StorageError::Database("Failed to update user".to_string())
278 })?;
279
280 Ok(user.into())
281 }
282
283 async fn delete_user(&self, id: &UserId) -> Result<(), torii_core::Error> {
284 sqlx::query("DELETE FROM users WHERE id = $1")
285 .bind(id.as_str())
286 .execute(&self.pool)
287 .await
288 .map_err(|e| {
289 tracing::error!(error = %e, "Failed to delete user");
290 StorageError::Database("Failed to delete user".to_string())
291 })?;
292
293 Ok(())
294 }
295
296 async fn set_user_email_verified(&self, user_id: &UserId) -> Result<(), torii_core::Error> {
297 sqlx::query("UPDATE users SET email_verified_at = $1 WHERE id = $2")
298 .bind(Utc::now())
299 .bind(user_id.as_str())
300 .execute(&self.pool)
301 .await
302 .map_err(|e| {
303 tracing::error!(error = %e, "Failed to set user email verified");
304 StorageError::Database("Failed to set user email verified".to_string())
305 })?;
306
307 Ok(())
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use sqlx::types::chrono::Utc;
    use std::time::Duration;
    use torii_core::session::SessionToken;
    use torii_core::{Session, SessionStorage};

    /// Creates a randomly named database on the local Postgres server, runs
    /// the migrations against it and returns a storage handle bound to it.
    ///
    /// Expects a server reachable at `localhost:5432` with the
    /// `postgres`/`postgres` credentials; panics on any failure.
    pub(crate) async fn setup_test_db() -> PostgresStorage {
        // A subscriber may already be installed by another test; ignore that.
        let _ = tracing_subscriber::fmt().try_init();

        let admin_pool = PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("Failed to create pool");

        let suffix = rand::rng().random_range(1..i64::MAX);
        let db_name = format!("torii_test_{suffix}");

        let drop_stmt = format!("DROP DATABASE IF EXISTS {db_name}");
        sqlx::query(drop_stmt.as_str())
            .execute(&admin_pool)
            .await
            .expect("Failed to drop database");

        let create_stmt = format!("CREATE DATABASE {db_name}");
        sqlx::query(create_stmt.as_str())
            .execute(&admin_pool)
            .await
            .expect("Failed to create database");

        let test_url = format!("postgres://postgres:postgres@localhost:5432/{db_name}");
        let test_pool = PgPool::connect(test_url.as_str())
            .await
            .expect("Failed to create pool");

        let storage = PostgresStorage::new(test_pool);
        storage.migrate().await.expect("Failed to run migrations");
        storage
    }

    /// Creates a user with the given id and an email derived from that id.
    pub(crate) async fn create_test_user(
        storage: &PostgresStorage,
        id: &UserId,
    ) -> Result<User, torii_core::Error> {
        let new_user = NewUser::builder()
            .id(id.clone())
            .email(format!("test{id}@example.com"))
            .build()
            .expect("Failed to build user");

        storage.create_user(&new_user).await
    }

    /// Creates a session for `user_id` with the given token that expires
    /// `expires_in` after the moment this helper is called.
    pub(crate) async fn create_test_session(
        storage: &PostgresStorage,
        session_token: &SessionToken,
        user_id: &UserId,
        expires_in: Duration,
    ) -> Result<Session, torii_core::Error> {
        let now = Utc::now();
        let expires_at = now + expires_in;

        let session = Session::builder()
            .token(session_token.clone())
            .user_id(user_id.clone())
            .user_agent(Some("test".to_string()))
            .ip_address(Some("127.0.0.1".to_string()))
            .created_at(now)
            .updated_at(now)
            .expires_at(expires_at)
            .build()
            .expect("Failed to build session");

        storage.create_session(&session).await
    }
}