1mod errors;
2mod joins;
3mod query;
4mod row;
5mod schema;
6mod state;
7mod support;
8
9use std::sync::Arc;
10
11use openauth_core::db::{
12 auth_schema, AdapterCapabilities, AdapterFuture, AuthSchemaOptions, Count, Create, DbAdapter,
13 DbRecord, DbSchema, Delete, DeleteMany, FindMany, FindOne, JoinAdapter, SchemaCreation,
14 TransactionCallback, Update, UpdateMany,
15};
16use openauth_core::error::OpenAuthError;
17use sqlx::sqlite::SqlitePoolOptions;
18use sqlx::{Executor, Sqlite, SqlitePool, Transaction};
19use tokio::sync::Mutex;
20
21use self::errors::sql_error;
22use self::schema::create_schema;
23use self::state::{SqliteExecutor, SqliteState};
24
25#[derive(Debug, Clone)]
26pub struct SqliteAdapter {
27 pool: SqlitePool,
28 schema: Arc<DbSchema>,
29}
30
31impl SqliteAdapter {
32 pub fn new(pool: SqlitePool) -> Self {
33 Self::with_schema(pool, auth_schema(AuthSchemaOptions::default()))
34 }
35
36 pub fn with_schema(pool: SqlitePool, schema: DbSchema) -> Self {
37 Self {
38 pool,
39 schema: Arc::new(schema),
40 }
41 }
42
43 pub async fn connect(database_url: &str) -> Result<Self, OpenAuthError> {
44 Self::connect_with_schema(database_url, auth_schema(AuthSchemaOptions::default())).await
45 }
46
47 pub async fn connect_with_schema(
48 database_url: &str,
49 schema: DbSchema,
50 ) -> Result<Self, OpenAuthError> {
51 let pool = SqlitePoolOptions::new()
52 .connect(database_url)
53 .await
54 .map_err(sql_error)?;
55 Ok(Self::with_schema(pool, schema))
56 }
57
58 fn state(&self) -> SqliteState<'_, '_> {
59 SqliteState {
60 schema: &self.schema,
61 executor: SqliteExecutor::Pool(&self.pool),
62 }
63 }
64}
65
66impl DbAdapter for SqliteAdapter {
67 fn id(&self) -> &str {
68 "sqlx-sqlite"
69 }
70
71 fn capabilities(&self) -> AdapterCapabilities {
72 AdapterCapabilities::new(self.id())
73 .named("SQLx SQLite")
74 .with_json()
75 .with_arrays()
76 .with_joins()
77 .with_transactions()
78 }
79
80 fn create<'a>(&'a self, query: Create) -> AdapterFuture<'a, DbRecord> {
81 Box::pin(async move { self.state().create(query).await })
82 }
83
84 fn find_one<'a>(&'a self, query: FindOne) -> AdapterFuture<'a, Option<DbRecord>> {
85 Box::pin(async move { self.state().find_one(query).await })
86 }
87
88 fn find_many<'a>(&'a self, query: FindMany) -> AdapterFuture<'a, Vec<DbRecord>> {
89 Box::pin(async move {
90 if query.joins.len() <= 1 {
91 self.state().find_many(query).await
92 } else {
93 let adapter =
94 JoinAdapter::new(self.schema.as_ref().clone(), Arc::new(self.clone()), false);
95 adapter.find_many(query).await
96 }
97 })
98 }
99
100 fn count<'a>(&'a self, query: Count) -> AdapterFuture<'a, u64> {
101 Box::pin(async move { self.state().count(query).await })
102 }
103
104 fn update<'a>(&'a self, query: Update) -> AdapterFuture<'a, Option<DbRecord>> {
105 Box::pin(async move { self.state().update(query).await })
106 }
107
108 fn update_many<'a>(&'a self, query: UpdateMany) -> AdapterFuture<'a, u64> {
109 Box::pin(async move { self.state().update_many(query).await })
110 }
111
112 fn delete<'a>(&'a self, query: Delete) -> AdapterFuture<'a, ()> {
113 Box::pin(async move { self.state().delete(query).await })
114 }
115
116 fn delete_many<'a>(&'a self, query: DeleteMany) -> AdapterFuture<'a, u64> {
117 Box::pin(async move { self.state().delete_many(query).await })
118 }
119
120 fn transaction<'a>(&'a self, callback: TransactionCallback<'a>) -> AdapterFuture<'a, ()> {
121 Box::pin(async move {
122 let tx = self.pool.begin().await.map_err(sql_error)?;
123 let adapter = Arc::new(SqliteTxAdapter {
124 schema: Arc::clone(&self.schema),
125 tx: Mutex::new(Some(tx)),
126 });
127 let result = callback(Box::new(Arc::clone(&adapter))).await;
128 let mut guard = adapter.tx.lock().await;
129 let Some(tx) = guard.take() else {
130 return Err(OpenAuthError::Adapter(
131 "sqlite transaction was already completed".to_owned(),
132 ));
133 };
134 drop(guard);
135 match result {
136 Ok(()) => tx.commit().await.map_err(sql_error),
137 Err(error) => {
138 let _rollback_result = tx.rollback().await;
139 Err(error)
140 }
141 }
142 })
143 }
144
145 fn create_schema<'a>(
146 &'a self,
147 schema: &'a DbSchema,
148 _file: Option<&'a str>,
149 ) -> AdapterFuture<'a, Option<SchemaCreation>> {
150 Box::pin(async move {
151 self.pool
152 .execute("PRAGMA foreign_keys = ON")
153 .await
154 .map_err(sql_error)?;
155 create_schema(SqliteExecutor::Pool(&self.pool), schema).await?;
156 Ok(None)
157 })
158 }
159
160 fn run_migrations<'a>(&'a self, schema: &'a DbSchema) -> AdapterFuture<'a, ()> {
161 Box::pin(async move {
162 self.create_schema(schema, None).await?;
163 Ok(())
164 })
165 }
166}
167
168struct SqliteTxAdapter<'tx> {
169 schema: Arc<DbSchema>,
170 tx: Mutex<Option<Transaction<'tx, Sqlite>>>,
171}
172
173impl DbAdapter for SqliteTxAdapter<'_> {
174 fn id(&self) -> &str {
175 "sqlx-sqlite"
176 }
177
178 fn capabilities(&self) -> AdapterCapabilities {
179 AdapterCapabilities::new(self.id())
180 .named("SQLx SQLite")
181 .with_json()
182 .with_arrays()
183 .with_transactions()
184 }
185
186 fn create<'a>(&'a self, query: Create) -> AdapterFuture<'a, DbRecord> {
187 Box::pin(async move { self.state().await?.create(query).await })
188 }
189
190 fn find_one<'a>(&'a self, query: FindOne) -> AdapterFuture<'a, Option<DbRecord>> {
191 Box::pin(async move { self.state().await?.find_one(query).await })
192 }
193
194 fn find_many<'a>(&'a self, query: FindMany) -> AdapterFuture<'a, Vec<DbRecord>> {
195 Box::pin(async move { self.state().await?.find_many(query).await })
196 }
197
198 fn count<'a>(&'a self, query: Count) -> AdapterFuture<'a, u64> {
199 Box::pin(async move { self.state().await?.count(query).await })
200 }
201
202 fn update<'a>(&'a self, query: Update) -> AdapterFuture<'a, Option<DbRecord>> {
203 Box::pin(async move { self.state().await?.update(query).await })
204 }
205
206 fn update_many<'a>(&'a self, query: UpdateMany) -> AdapterFuture<'a, u64> {
207 Box::pin(async move { self.state().await?.update_many(query).await })
208 }
209
210 fn delete<'a>(&'a self, query: Delete) -> AdapterFuture<'a, ()> {
211 Box::pin(async move { self.state().await?.delete(query).await })
212 }
213
214 fn delete_many<'a>(&'a self, query: DeleteMany) -> AdapterFuture<'a, u64> {
215 Box::pin(async move { self.state().await?.delete_many(query).await })
216 }
217
218 fn transaction<'a>(&'a self, callback: TransactionCallback<'a>) -> AdapterFuture<'a, ()> {
219 callback(Box::new(self))
220 }
221}
222
223impl<'tx> SqliteTxAdapter<'tx> {
224 async fn state<'a>(&'a self) -> Result<SqliteState<'a, 'tx>, OpenAuthError> {
225 let guard = self.tx.lock().await;
226 if guard.is_none() {
227 return Err(OpenAuthError::Adapter(
228 "sqlite transaction is no longer active".to_owned(),
229 ));
230 }
231 Ok(SqliteState {
232 schema: &self.schema,
233 executor: SqliteExecutor::Transaction(guard),
234 })
235 }
236}