// openauth_sqlx/sqlite/mod.rs

1mod errors;
2mod joins;
3mod query;
4mod row;
5mod schema;
6mod state;
7mod support;
8
9use std::sync::Arc;
10
11use openauth_core::db::{
12    auth_schema, AdapterCapabilities, AdapterFuture, AuthSchemaOptions, Count, Create, DbAdapter,
13    DbRecord, DbSchema, Delete, DeleteMany, FindMany, FindOne, JoinAdapter, SchemaCreation,
14    TransactionCallback, Update, UpdateMany,
15};
16use openauth_core::error::OpenAuthError;
17use sqlx::sqlite::SqlitePoolOptions;
18use sqlx::{Executor, Sqlite, SqlitePool, Transaction};
19use tokio::sync::Mutex;
20
21use self::errors::sql_error;
22use self::schema::create_schema;
23use self::state::{SqliteExecutor, SqliteState};
24
25#[derive(Debug, Clone)]
26pub struct SqliteAdapter {
27    pool: SqlitePool,
28    schema: Arc<DbSchema>,
29}
30
31impl SqliteAdapter {
32    pub fn new(pool: SqlitePool) -> Self {
33        Self::with_schema(pool, auth_schema(AuthSchemaOptions::default()))
34    }
35
36    pub fn with_schema(pool: SqlitePool, schema: DbSchema) -> Self {
37        Self {
38            pool,
39            schema: Arc::new(schema),
40        }
41    }
42
43    pub async fn connect(database_url: &str) -> Result<Self, OpenAuthError> {
44        Self::connect_with_schema(database_url, auth_schema(AuthSchemaOptions::default())).await
45    }
46
47    pub async fn connect_with_schema(
48        database_url: &str,
49        schema: DbSchema,
50    ) -> Result<Self, OpenAuthError> {
51        let pool = SqlitePoolOptions::new()
52            .connect(database_url)
53            .await
54            .map_err(sql_error)?;
55        Ok(Self::with_schema(pool, schema))
56    }
57
58    fn state(&self) -> SqliteState<'_, '_> {
59        SqliteState {
60            schema: &self.schema,
61            executor: SqliteExecutor::Pool(&self.pool),
62        }
63    }
64}
65
66impl DbAdapter for SqliteAdapter {
67    fn id(&self) -> &str {
68        "sqlx-sqlite"
69    }
70
71    fn capabilities(&self) -> AdapterCapabilities {
72        AdapterCapabilities::new(self.id())
73            .named("SQLx SQLite")
74            .with_json()
75            .with_arrays()
76            .with_joins()
77            .with_transactions()
78    }
79
80    fn create<'a>(&'a self, query: Create) -> AdapterFuture<'a, DbRecord> {
81        Box::pin(async move { self.state().create(query).await })
82    }
83
84    fn find_one<'a>(&'a self, query: FindOne) -> AdapterFuture<'a, Option<DbRecord>> {
85        Box::pin(async move { self.state().find_one(query).await })
86    }
87
88    fn find_many<'a>(&'a self, query: FindMany) -> AdapterFuture<'a, Vec<DbRecord>> {
89        Box::pin(async move {
90            if query.joins.len() <= 1 {
91                self.state().find_many(query).await
92            } else {
93                let adapter =
94                    JoinAdapter::new(self.schema.as_ref().clone(), Arc::new(self.clone()), false);
95                adapter.find_many(query).await
96            }
97        })
98    }
99
100    fn count<'a>(&'a self, query: Count) -> AdapterFuture<'a, u64> {
101        Box::pin(async move { self.state().count(query).await })
102    }
103
104    fn update<'a>(&'a self, query: Update) -> AdapterFuture<'a, Option<DbRecord>> {
105        Box::pin(async move { self.state().update(query).await })
106    }
107
108    fn update_many<'a>(&'a self, query: UpdateMany) -> AdapterFuture<'a, u64> {
109        Box::pin(async move { self.state().update_many(query).await })
110    }
111
112    fn delete<'a>(&'a self, query: Delete) -> AdapterFuture<'a, ()> {
113        Box::pin(async move { self.state().delete(query).await })
114    }
115
116    fn delete_many<'a>(&'a self, query: DeleteMany) -> AdapterFuture<'a, u64> {
117        Box::pin(async move { self.state().delete_many(query).await })
118    }
119
120    fn transaction<'a>(&'a self, callback: TransactionCallback<'a>) -> AdapterFuture<'a, ()> {
121        Box::pin(async move {
122            let tx = self.pool.begin().await.map_err(sql_error)?;
123            let adapter = Arc::new(SqliteTxAdapter {
124                schema: Arc::clone(&self.schema),
125                tx: Mutex::new(Some(tx)),
126            });
127            let result = callback(Box::new(Arc::clone(&adapter))).await;
128            let mut guard = adapter.tx.lock().await;
129            let Some(tx) = guard.take() else {
130                return Err(OpenAuthError::Adapter(
131                    "sqlite transaction was already completed".to_owned(),
132                ));
133            };
134            drop(guard);
135            match result {
136                Ok(()) => tx.commit().await.map_err(sql_error),
137                Err(error) => {
138                    let _rollback_result = tx.rollback().await;
139                    Err(error)
140                }
141            }
142        })
143    }
144
145    fn create_schema<'a>(
146        &'a self,
147        schema: &'a DbSchema,
148        _file: Option<&'a str>,
149    ) -> AdapterFuture<'a, Option<SchemaCreation>> {
150        Box::pin(async move {
151            self.pool
152                .execute("PRAGMA foreign_keys = ON")
153                .await
154                .map_err(sql_error)?;
155            create_schema(SqliteExecutor::Pool(&self.pool), schema).await?;
156            Ok(None)
157        })
158    }
159
160    fn run_migrations<'a>(&'a self, schema: &'a DbSchema) -> AdapterFuture<'a, ()> {
161        Box::pin(async move {
162            self.create_schema(schema, None).await?;
163            Ok(())
164        })
165    }
166}
167
168struct SqliteTxAdapter<'tx> {
169    schema: Arc<DbSchema>,
170    tx: Mutex<Option<Transaction<'tx, Sqlite>>>,
171}
172
173impl DbAdapter for SqliteTxAdapter<'_> {
174    fn id(&self) -> &str {
175        "sqlx-sqlite"
176    }
177
178    fn capabilities(&self) -> AdapterCapabilities {
179        AdapterCapabilities::new(self.id())
180            .named("SQLx SQLite")
181            .with_json()
182            .with_arrays()
183            .with_transactions()
184    }
185
186    fn create<'a>(&'a self, query: Create) -> AdapterFuture<'a, DbRecord> {
187        Box::pin(async move { self.state().await?.create(query).await })
188    }
189
190    fn find_one<'a>(&'a self, query: FindOne) -> AdapterFuture<'a, Option<DbRecord>> {
191        Box::pin(async move { self.state().await?.find_one(query).await })
192    }
193
194    fn find_many<'a>(&'a self, query: FindMany) -> AdapterFuture<'a, Vec<DbRecord>> {
195        Box::pin(async move { self.state().await?.find_many(query).await })
196    }
197
198    fn count<'a>(&'a self, query: Count) -> AdapterFuture<'a, u64> {
199        Box::pin(async move { self.state().await?.count(query).await })
200    }
201
202    fn update<'a>(&'a self, query: Update) -> AdapterFuture<'a, Option<DbRecord>> {
203        Box::pin(async move { self.state().await?.update(query).await })
204    }
205
206    fn update_many<'a>(&'a self, query: UpdateMany) -> AdapterFuture<'a, u64> {
207        Box::pin(async move { self.state().await?.update_many(query).await })
208    }
209
210    fn delete<'a>(&'a self, query: Delete) -> AdapterFuture<'a, ()> {
211        Box::pin(async move { self.state().await?.delete(query).await })
212    }
213
214    fn delete_many<'a>(&'a self, query: DeleteMany) -> AdapterFuture<'a, u64> {
215        Box::pin(async move { self.state().await?.delete_many(query).await })
216    }
217
218    fn transaction<'a>(&'a self, callback: TransactionCallback<'a>) -> AdapterFuture<'a, ()> {
219        callback(Box::new(self))
220    }
221}
222
223impl<'tx> SqliteTxAdapter<'tx> {
224    async fn state<'a>(&'a self) -> Result<SqliteState<'a, 'tx>, OpenAuthError> {
225        let guard = self.tx.lock().await;
226        if guard.is_none() {
227            return Err(OpenAuthError::Adapter(
228                "sqlite transaction is no longer active".to_owned(),
229            ));
230        }
231        Ok(SqliteState {
232            schema: &self.schema,
233            executor: SqliteExecutor::Transaction(guard),
234        })
235    }
236}