1mod magic_link;
2mod migrations;
3mod oauth;
4mod passkey;
5mod password;
6mod session;
7
8use async_trait::async_trait;
9use chrono::DateTime;
10use chrono::Utc;
11use migrations::CreateIndexes;
12use migrations::CreateMagicLinksTable;
13use migrations::CreateOAuthAccountsTable;
14use migrations::CreatePasskeyChallengesTable;
15use migrations::CreatePasskeysTable;
16use migrations::CreateSessionsTable;
17use migrations::CreateUsersTable;
18use migrations::PostgresMigrationManager;
19use sqlx::PgPool;
20use torii_core::error::StorageError;
21use torii_core::{
22 User, UserId,
23 storage::{NewUser, UserStorage},
24};
25use torii_migration::Migration;
26use torii_migration::MigrationManager;
27
28#[derive(Debug, Clone)]
29pub struct PostgresStorage {
30 pool: PgPool,
31}
32
33impl PostgresStorage {
34 pub fn new(pool: PgPool) -> Self {
35 Self { pool }
36 }
37
38 pub async fn connect(database_url: &str) -> Result<Self, StorageError> {
39 let pool = PgPool::connect(database_url).await.map_err(|e| {
40 tracing::error!(error = %e, "Failed to connect to database");
41 StorageError::Database("Failed to connect to database".to_string())
42 })?;
43
44 Ok(Self::new(pool))
45 }
46
47 pub async fn migrate(&self) -> Result<(), StorageError> {
48 let manager = PostgresMigrationManager::new(self.pool.clone());
49 manager.initialize().await.map_err(|e| {
50 tracing::error!(error = %e, "Failed to initialize migrations");
51 StorageError::Migration("Failed to initialize migrations".to_string())
52 })?;
53
54 let migrations: Vec<Box<dyn Migration<_>>> = vec![
55 Box::new(CreateUsersTable),
56 Box::new(CreateSessionsTable),
57 Box::new(CreateOAuthAccountsTable),
58 Box::new(CreatePasskeysTable),
59 Box::new(CreatePasskeyChallengesTable),
60 Box::new(CreateIndexes),
61 Box::new(CreateMagicLinksTable),
62 ];
63 manager.up(&migrations).await.map_err(|e| {
64 tracing::error!(error = %e, "Failed to run migrations");
65 StorageError::Migration("Failed to run migrations".to_string())
66 })?;
67
68 Ok(())
69 }
70}
71
72#[derive(Debug, Clone, sqlx::FromRow)]
73pub struct PostgresUser {
74 id: String,
75 email: String,
76 name: Option<String>,
77 email_verified_at: Option<DateTime<Utc>>,
78 created_at: DateTime<Utc>,
79 updated_at: DateTime<Utc>,
80}
81
82impl From<PostgresUser> for User {
83 fn from(user: PostgresUser) -> Self {
84 User::builder()
85 .id(UserId::new(&user.id))
86 .email(user.email)
87 .name(user.name)
88 .email_verified_at(user.email_verified_at)
89 .created_at(user.created_at)
90 .updated_at(user.updated_at)
91 .build()
92 .unwrap()
93 }
94}
95
96impl From<User> for PostgresUser {
97 fn from(user: User) -> Self {
98 PostgresUser {
99 id: user.id.into_inner(),
100 email: user.email,
101 name: user.name,
102 email_verified_at: user.email_verified_at,
103 created_at: user.created_at,
104 updated_at: user.updated_at,
105 }
106 }
107}
108
109#[async_trait]
110impl UserStorage for PostgresStorage {
111 type Error = torii_core::Error;
112
113 async fn create_user(&self, user: &NewUser) -> Result<User, Self::Error> {
114 let user = sqlx::query_as::<_, PostgresUser>(
115 r#"
116 INSERT INTO users (id, email)
117 VALUES ($1, $2)
118 RETURNING id, email, name, email_verified_at, created_at, updated_at
119 "#,
120 )
121 .bind(user.id.as_str())
122 .bind(&user.email)
123 .fetch_one(&self.pool)
124 .await
125 .map_err(|e| {
126 tracing::error!(error = %e, "Failed to create user");
127 StorageError::Database("Failed to create user".to_string())
128 })?;
129
130 Ok(user.into())
131 }
132
133 async fn get_user(&self, id: &UserId) -> Result<Option<User>, Self::Error> {
134 let user = sqlx::query_as::<_, PostgresUser>(
135 r#"
136 SELECT id, email, name, email_verified_at, created_at, updated_at
137 FROM users
138 WHERE id = $1
139 "#,
140 )
141 .bind(id.as_str())
142 .fetch_optional(&self.pool)
143 .await
144 .map_err(|e| {
145 tracing::error!(error = %e, "Failed to get user");
146 StorageError::Database("Failed to get user".to_string())
147 })?;
148
149 match user {
150 Some(user) => Ok(Some(user.into())),
151 None => Ok(None),
152 }
153 }
154
155 async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, Self::Error> {
156 let user = sqlx::query_as::<_, PostgresUser>(
157 r#"
158 SELECT id, email, name, email_verified_at, created_at, updated_at
159 FROM users
160 WHERE email = $1
161 "#,
162 )
163 .bind(email)
164 .fetch_optional(&self.pool)
165 .await
166 .map_err(|e| {
167 tracing::error!(error = %e, "Failed to get user by email");
168 StorageError::Database("Failed to get user by email".to_string())
169 })?;
170
171 match user {
172 Some(user) => Ok(Some(user.into())),
173 None => Ok(None),
174 }
175 }
176
177 async fn get_or_create_user_by_email(&self, email: &str) -> Result<User, Self::Error> {
178 let user = self.get_user_by_email(email).await?;
179 if let Some(user) = user {
180 return Ok(user);
181 }
182
183 let user = self
184 .create_user(
185 &NewUser::builder()
186 .id(UserId::new_random())
187 .email(email.to_string())
188 .build()
189 .unwrap(),
190 )
191 .await
192 .map_err(|e| {
193 tracing::error!(error = %e, "Failed to get or create user by email");
194 StorageError::Database("Failed to get or create user by email".to_string())
195 })?;
196
197 Ok(user)
198 }
199
200 async fn update_user(&self, user: &User) -> Result<User, Self::Error> {
201 let user = sqlx::query_as::<_, PostgresUser>(
202 r#"
203 UPDATE users
204 SET email = $1, name = $2, email_verified_at = $3, updated_at = $4
205 WHERE id = $5
206 RETURNING id, email, name, email_verified_at, created_at, updated_at
207 "#,
208 )
209 .bind(&user.email)
210 .bind(&user.name)
211 .bind(user.email_verified_at)
212 .bind(user.updated_at)
213 .bind(user.id.as_str())
214 .fetch_one(&self.pool)
215 .await
216 .map_err(|e| {
217 tracing::error!(error = %e, "Failed to update user");
218 StorageError::Database("Failed to update user".to_string())
219 })?;
220
221 Ok(user.into())
222 }
223
224 async fn delete_user(&self, id: &UserId) -> Result<(), Self::Error> {
225 sqlx::query("DELETE FROM users WHERE id = $1")
226 .bind(id.as_str())
227 .execute(&self.pool)
228 .await
229 .map_err(|e| {
230 tracing::error!(error = %e, "Failed to delete user");
231 StorageError::Database("Failed to delete user".to_string())
232 })?;
233
234 Ok(())
235 }
236
237 async fn set_user_email_verified(&self, user_id: &UserId) -> Result<(), Self::Error> {
238 sqlx::query("UPDATE users SET email_verified_at = $1 WHERE id = $2")
239 .bind(Utc::now())
240 .bind(user_id.as_str())
241 .execute(&self.pool)
242 .await
243 .map_err(|e| {
244 tracing::error!(error = %e, "Failed to set user email verified");
245 StorageError::Database("Failed to set user email verified".to_string())
246 })?;
247
248 Ok(())
249 }
250}
251
/// Shared helpers for tests that need a live PostgreSQL instance.
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use sqlx::types::chrono::Utc;
    use std::time::Duration;
    use torii_core::session::SessionToken;
    use torii_core::{Session, SessionStorage};

    /// Creates a fresh, randomly named database on the local Postgres server,
    /// runs all migrations against it and returns a storage handle for it.
    ///
    /// Panics if any step fails.
    pub(crate) async fn setup_test_db() -> PostgresStorage {
        // Ignore the result: the subscriber may already be installed by another test.
        let _ = tracing_subscriber::fmt().try_init();

        let admin_pool = PgPool::connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await
            .expect("Failed to create pool");

        let suffix = rand::rng().random_range(1..i64::MAX);
        let db_name = format!("torii_test_{}", suffix);

        let drop_sql = format!("DROP DATABASE IF EXISTS {}", db_name);
        sqlx::query(drop_sql.as_str())
            .execute(&admin_pool)
            .await
            .expect("Failed to drop database");

        let create_sql = format!("CREATE DATABASE {}", db_name);
        sqlx::query(create_sql.as_str())
            .execute(&admin_pool)
            .await
            .expect("Failed to create database");

        let test_url = format!("postgres://postgres:postgres@localhost:5432/{}", db_name);
        let test_pool = PgPool::connect(test_url.as_str())
            .await
            .expect("Failed to create pool");

        let storage = PostgresStorage::new(test_pool);
        storage.migrate().await.expect("Failed to run migrations");
        storage
    }

    /// Inserts a user with the given id and an email derived from that id.
    pub(crate) async fn create_test_user(
        storage: &PostgresStorage,
        id: &UserId,
    ) -> Result<User, torii_core::Error> {
        let new_user = NewUser::builder()
            .id(id.clone())
            .email(format!("test{}@example.com", id))
            .build()
            .expect("Failed to build user");

        storage.create_user(&new_user).await
    }

    /// Stores a session for `user_id` that expires `expires_in` from now.
    pub(crate) async fn create_test_session(
        storage: &PostgresStorage,
        session_token: &SessionToken,
        user_id: &UserId,
        expires_in: Duration,
    ) -> Result<Session, torii_core::Error> {
        // A single timestamp keeps created_at, updated_at and expires_at consistent.
        let now = Utc::now();

        let session = Session::builder()
            .token(session_token.clone())
            .user_id(user_id.clone())
            .user_agent(Some("test".to_string()))
            .ip_address(Some("127.0.0.1".to_string()))
            .created_at(now)
            .updated_at(now)
            .expires_at(now + expires_in)
            .build()
            .expect("Failed to build session");

        storage.create_session(&session).await
    }
}