1mod magic_link;
2mod migrations;
3mod oauth;
4mod passkey;
5mod password;
6mod session;
7
8use async_trait::async_trait;
9use chrono::DateTime;
10use chrono::Utc;
11use migrations::CreateIndexes;
12use migrations::CreateMagicLinksTable;
13use migrations::CreateOAuthAccountsTable;
14use migrations::CreatePasskeyChallengesTable;
15use migrations::CreatePasskeysTable;
16use migrations::CreateSessionsTable;
17use migrations::CreateUsersTable;
18use migrations::PostgresMigrationManager;
19use sqlx::PgPool;
20use torii_core::error::StorageError;
21use torii_core::{
22 User, UserId,
23 storage::{NewUser, UserStorage},
24};
25use torii_migration::Migration;
26use torii_migration::MigrationManager;
27
28#[derive(Debug)]
29pub struct PostgresStorage {
30 pool: PgPool,
31}
32
33impl PostgresStorage {
34 pub fn new(pool: PgPool) -> Self {
35 Self { pool }
36 }
37
38 pub async fn migrate(&self) -> Result<(), StorageError> {
39 let manager = PostgresMigrationManager::new(self.pool.clone());
40 manager.initialize().await.map_err(|e| {
41 tracing::error!(error = %e, "Failed to initialize migrations");
42 StorageError::Migration("Failed to initialize migrations".to_string())
43 })?;
44
45 let migrations: Vec<Box<dyn Migration<_>>> = vec![
46 Box::new(CreateUsersTable),
47 Box::new(CreateSessionsTable),
48 Box::new(CreateOAuthAccountsTable),
49 Box::new(CreatePasskeysTable),
50 Box::new(CreatePasskeyChallengesTable),
51 Box::new(CreateIndexes),
52 Box::new(CreateMagicLinksTable),
53 ];
54 manager.up(&migrations).await.map_err(|e| {
55 tracing::error!(error = %e, "Failed to run migrations");
56 StorageError::Migration("Failed to run migrations".to_string())
57 })?;
58
59 Ok(())
60 }
61}
62
63#[derive(Debug, Clone, sqlx::FromRow)]
64pub struct PostgresUser {
65 id: String,
66 email: String,
67 name: Option<String>,
68 email_verified_at: Option<DateTime<Utc>>,
69 created_at: DateTime<Utc>,
70 updated_at: DateTime<Utc>,
71}
72
73impl From<PostgresUser> for User {
74 fn from(user: PostgresUser) -> Self {
75 User::builder()
76 .id(UserId::new(&user.id))
77 .email(user.email)
78 .name(user.name)
79 .email_verified_at(user.email_verified_at)
80 .created_at(user.created_at)
81 .updated_at(user.updated_at)
82 .build()
83 .unwrap()
84 }
85}
86
87impl From<User> for PostgresUser {
88 fn from(user: User) -> Self {
89 PostgresUser {
90 id: user.id.into_inner(),
91 email: user.email,
92 name: user.name,
93 email_verified_at: user.email_verified_at,
94 created_at: user.created_at,
95 updated_at: user.updated_at,
96 }
97 }
98}
99
100#[async_trait]
101impl UserStorage for PostgresStorage {
102 type Error = torii_core::Error;
103
104 async fn create_user(&self, user: &NewUser) -> Result<User, Self::Error> {
105 let user = sqlx::query_as::<_, PostgresUser>(
106 r#"
107 INSERT INTO users (id, email)
108 VALUES ($1::uuid, $2)
109 RETURNING id::text, email, name, email_verified_at, created_at, updated_at
110 "#,
111 )
112 .bind(user.id.as_str())
113 .bind(&user.email)
114 .fetch_one(&self.pool)
115 .await
116 .map_err(|e| {
117 tracing::error!(error = %e, "Failed to create user");
118 StorageError::Database("Failed to create user".to_string())
119 })?;
120
121 Ok(user.into())
122 }
123
124 async fn get_user(&self, id: &UserId) -> Result<Option<User>, Self::Error> {
125 let user = sqlx::query_as::<_, PostgresUser>(
126 r#"
127 SELECT id::text, email, name, email_verified_at, created_at, updated_at
128 FROM users
129 WHERE id::text = $1
130 "#,
131 )
132 .bind(id.as_str())
133 .fetch_optional(&self.pool)
134 .await
135 .map_err(|e| {
136 tracing::error!(error = %e, "Failed to get user");
137 StorageError::Database("Failed to get user".to_string())
138 })?;
139
140 match user {
141 Some(user) => Ok(Some(user.into())),
142 None => Ok(None),
143 }
144 }
145
146 async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, Self::Error> {
147 let user = sqlx::query_as::<_, PostgresUser>(
148 r#"
149 SELECT id::text, email, name, email_verified_at, created_at, updated_at
150 FROM users
151 WHERE email = $1
152 "#,
153 )
154 .bind(email)
155 .fetch_optional(&self.pool)
156 .await
157 .map_err(|e| {
158 tracing::error!(error = %e, "Failed to get user by email");
159 StorageError::Database("Failed to get user by email".to_string())
160 })?;
161
162 match user {
163 Some(user) => Ok(Some(user.into())),
164 None => Ok(None),
165 }
166 }
167
168 async fn get_or_create_user_by_email(&self, email: &str) -> Result<User, Self::Error> {
169 let user = self.get_user_by_email(email).await?;
170 if let Some(user) = user {
171 return Ok(user);
172 }
173
174 let user = self
175 .create_user(
176 &NewUser::builder()
177 .id(UserId::new_random())
178 .email(email.to_string())
179 .build()
180 .unwrap(),
181 )
182 .await
183 .map_err(|e| {
184 tracing::error!(error = %e, "Failed to get or create user by email");
185 StorageError::Database("Failed to get or create user by email".to_string())
186 })?;
187
188 Ok(user)
189 }
190
191 async fn update_user(&self, user: &User) -> Result<User, Self::Error> {
192 let user = sqlx::query_as::<_, PostgresUser>(
193 r#"
194 UPDATE users
195 SET email = $1, name = $2, email_verified_at = $3, updated_at = $4
196 WHERE id::text = $5
197 RETURNING id::text, email, name, email_verified_at, created_at, updated_at
198 "#,
199 )
200 .bind(&user.email)
201 .bind(&user.name)
202 .bind(user.email_verified_at)
203 .bind(user.updated_at)
204 .bind(user.id.as_str())
205 .fetch_one(&self.pool)
206 .await
207 .map_err(|e| {
208 tracing::error!(error = %e, "Failed to update user");
209 StorageError::Database("Failed to update user".to_string())
210 })?;
211
212 Ok(user.into())
213 }
214
215 async fn delete_user(&self, id: &UserId) -> Result<(), Self::Error> {
216 sqlx::query("DELETE FROM users WHERE id::text = $1")
217 .bind(id.as_str())
218 .execute(&self.pool)
219 .await
220 .map_err(|e| {
221 tracing::error!(error = %e, "Failed to delete user");
222 StorageError::Database("Failed to delete user".to_string())
223 })?;
224
225 Ok(())
226 }
227
228 async fn set_user_email_verified(&self, user_id: &UserId) -> Result<(), Self::Error> {
229 sqlx::query("UPDATE users SET email_verified_at = $1 WHERE id::text = $2")
230 .bind(Utc::now())
231 .bind(user_id.as_str())
232 .execute(&self.pool)
233 .await
234 .map_err(|e| {
235 tracing::error!(error = %e, "Failed to set user email verified");
236 StorageError::Database("Failed to set user email verified".to_string())
237 })?;
238
239 Ok(())
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use sqlx::types::chrono::Utc;
    use std::time::Duration;
    use torii_core::session::SessionId;
    use torii_core::{Session, SessionStorage};

    /// Creates a fresh, migrated database for a test and returns storage
    /// connected to it.
    ///
    /// Connects to the hard-coded local Postgres instance, creates a database
    /// named `torii_test_<random number>`, then opens a second pool against
    /// that database and runs the migrations. The database is not dropped
    /// again by this helper.
    ///
    /// # Panics
    ///
    /// Panics if any connection, statement, or migration step fails.
    pub(crate) async fn setup_test_db() -> PostgresStorage {
        // Ignore the result: another test may already have installed a subscriber.
        let _ = tracing_subscriber::fmt().try_init();

        let admin = PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("Failed to create pool");

        let db_name = format!("torii_test_{}", rand::rng().random_range(1..i64::MAX));

        let drop_stmt = format!("DROP DATABASE IF EXISTS {}", db_name);
        sqlx::query(drop_stmt.as_str())
            .execute(&admin)
            .await
            .expect("Failed to drop database");

        let create_stmt = format!("CREATE DATABASE {}", db_name);
        sqlx::query(create_stmt.as_str())
            .execute(&admin)
            .await
            .expect("Failed to create database");

        let test_url = format!("postgres://postgres:postgres@localhost:5432/{}", db_name);
        let test_pool = PgPool::connect(test_url.as_str())
            .await
            .expect("Failed to create pool");

        let storage = PostgresStorage::new(test_pool);
        storage.migrate().await.expect("Failed to run migrations");
        storage
    }

    /// Inserts a user with the given id and the email `test<id>@example.com`.
    ///
    /// # Panics
    ///
    /// Panics if the `NewUser` builder fails.
    pub(crate) async fn create_test_user(
        storage: &PostgresStorage,
        id: &UserId,
    ) -> Result<User, torii_core::Error> {
        let new_user = NewUser::builder()
            .id(id.clone())
            .email(format!("test{}@example.com", id))
            .build()
            .expect("Failed to build user");

        storage.create_user(&new_user).await
    }

    /// Stores a session for `user_id` that expires `expires_in` after now.
    ///
    /// The session uses a fixed user agent (`test`) and IP address
    /// (`127.0.0.1`), and the same timestamp for creation and last update.
    ///
    /// # Panics
    ///
    /// Panics if the `Session` builder fails.
    pub(crate) async fn create_test_session(
        storage: &PostgresStorage,
        session_id: &SessionId,
        user_id: &UserId,
        expires_in: Duration,
    ) -> Result<Session, torii_core::Error> {
        let now = Utc::now();
        let expires_at = now + expires_in;

        let session = Session::builder()
            .id(session_id.clone())
            .user_id(user_id.clone())
            .user_agent(Some("test".to_string()))
            .ip_address(Some("127.0.0.1".to_string()))
            .created_at(now)
            .updated_at(now)
            .expires_at(expires_at)
            .build()
            .expect("Failed to build session");

        storage.create_session(&session).await
    }
}