1use sqlx::{any::{AnyPoolOptions, AnyRow}, AnyPool, Transaction};
2use std::fmt::Debug;
3
4pub use sqlx;
5
6pub mod migrations;
7pub use migrations::{Migration, MigrationManager};
8
9pub mod relations;
10pub use relations::{HasMany, HasOne, BelongsTo};
11
12pub type Result<T> = std::result::Result<T, sqlx::Error>;
13
14pub use oxidite_macros::Model;
15pub use async_trait::async_trait;
16pub use chrono;
17pub use regex;
18pub use once_cell;
19
/// Identifies which database backend a pool or transaction is talking to.
///
/// The value is a plain tag: it is `Copy`, comparable with `==`, and printable
/// with `{:?}`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseType {
    /// PostgreSQL backend.
    Postgres,
    /// MySQL backend.
    MySql,
    /// SQLite backend.
    Sqlite,
}
27
/// Tunable settings used when opening a connection pool.
///
/// Use [`PoolOptions::default`] for the stock configuration and override
/// individual fields with struct-update syntax.
#[derive(Clone, Debug)]
pub struct PoolOptions {
    /// Upper bound on the number of connections in the pool.
    pub max_connections: u32,
    /// Number of connections the pool is asked to keep open.
    pub min_connections: u32,
    /// How long to wait for a connection before giving up.
    pub connect_timeout: std::time::Duration,
    /// How long a connection may sit idle; `None` means no value is configured here.
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for PoolOptions {
    /// Returns the stock configuration: at most 10 connections, no minimum,
    /// a 30 second connect timeout and a 10 minute idle timeout.
    fn default() -> Self {
        use std::time::Duration;

        let connect_timeout = Duration::from_secs(30);
        let idle_timeout = Some(Duration::from_secs(600));

        Self {
            max_connections: 10,
            min_connections: 0,
            connect_timeout,
            idle_timeout,
        }
    }
}
47
48#[async_trait]
50pub trait Database: Send + Sync + Debug {
51 fn db_type(&self) -> DatabaseType;
53
54 async fn execute(&self, query: &str) -> Result<u64>;
56
57 async fn query(&self, query: &str) -> Result<Vec<AnyRow>>;
59
60 async fn query_one(&self, query: &str) -> Result<Option<AnyRow>>;
62
63 async fn ping(&self) -> Result<()>;
65
66 async fn begin_transaction(&self) -> Result<DbTransaction>;
68
69 async fn execute_query<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<u64>;
71
72 async fn fetch_all<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Vec<AnyRow>>;
74
75 async fn fetch_one<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Option<AnyRow>>;
77}
78
79#[derive(Clone, Debug)]
81pub struct DbPool {
82 pool: AnyPool,
83 db_type: DatabaseType,
84}
85
86impl DbPool {
87 pub async fn connect(url: &str) -> Result<Self> {
88 Self::connect_with_options(url, PoolOptions::default()).await
89 }
90
91 pub async fn connect_with_options(url: &str, options: PoolOptions) -> Result<Self> {
92 sqlx::any::install_default_drivers();
93 let max_conns = if url.contains(":memory:") { 1 } else { options.max_connections };
94
95 let mut pool_options = AnyPoolOptions::new()
96 .max_connections(max_conns)
97 .min_connections(options.min_connections)
98 .acquire_timeout(options.connect_timeout);
99
100 if let Some(idle_timeout) = options.idle_timeout {
101 pool_options = pool_options.idle_timeout(idle_timeout);
102 }
103
104 let pool = pool_options.connect(url).await?;
105
106 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
107 DatabaseType::Postgres
108 } else if url.starts_with("mysql://") {
109 DatabaseType::MySql
110 } else if url.starts_with("sqlite://") {
111 DatabaseType::Sqlite
112 } else {
113 DatabaseType::Sqlite
119 };
120
121 Ok(Self { pool, db_type })
122 }
123}
124
125#[async_trait]
126impl Database for DbPool {
127 fn db_type(&self) -> DatabaseType {
128 self.db_type
129 }
130
131 async fn execute(&self, query: &str) -> Result<u64> {
132 let result = sqlx::query(query).execute(&self.pool).await?;
133 Ok(result.rows_affected())
134 }
135
136 async fn query(&self, query: &str) -> Result<Vec<AnyRow>> {
137 let rows = sqlx::query(query).fetch_all(&self.pool).await?;
138 Ok(rows)
139 }
140
141 async fn query_one(&self, query: &str) -> Result<Option<AnyRow>> {
142 let row = sqlx::query(query).fetch_optional(&self.pool).await?;
143 Ok(row)
144 }
145
146 async fn ping(&self) -> Result<()> {
147 sqlx::query("SELECT 1").execute(&self.pool).await?;
148 Ok(())
149 }
150
151 async fn begin_transaction(&self) -> Result<DbTransaction> {
152 let tx = self.pool.begin().await?;
153 Ok(DbTransaction { tx: Arc::new(Mutex::new(Some(tx))) })
154 }
155
156 async fn execute_query<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<u64> {
157 let result = query.execute(&self.pool).await?;
158 Ok(result.rows_affected())
159 }
160
161 async fn fetch_all<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Vec<AnyRow>> {
162 let rows = query.fetch_all(&self.pool).await?;
163 Ok(rows)
164 }
165
166 async fn fetch_one<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Option<AnyRow>> {
167 let row = query.fetch_optional(&self.pool).await?;
168 Ok(row)
169 }
170}
171
172use std::sync::Arc;
173use tokio::sync::Mutex;
174
175#[derive(Clone, Debug)]
177pub struct DbTransaction {
178 tx: Arc<Mutex<Option<Transaction<'static, sqlx::Any>>>>,
179}
180
181impl DbTransaction {
182 pub async fn execute(&self, query: &str) -> Result<u64> {
184 let mut lock = self.tx.lock().await;
185 if let Some(ref mut tx) = *lock {
186 let result = sqlx::query(query).execute(&mut **tx).await?;
187 Ok(result.rows_affected())
188 } else {
189 Err(sqlx::Error::PoolClosed)
190 }
191 }
192
193 pub async fn query(&self, query: &str) -> Result<Vec<AnyRow>> {
195 let mut lock = self.tx.lock().await;
196 if let Some(ref mut tx) = *lock {
197 let rows = sqlx::query(query).fetch_all(&mut **tx).await?;
198 Ok(rows)
199 } else {
200 Err(sqlx::Error::PoolClosed)
201 }
202 }
203
204 pub async fn query_one(&self, query: &str) -> Result<Option<AnyRow>> {
206 let mut lock = self.tx.lock().await;
207 if let Some(ref mut tx) = *lock {
208 let row = sqlx::query(query).fetch_optional(&mut **tx).await?;
209 Ok(row)
210 } else {
211 Err(sqlx::Error::PoolClosed)
212 }
213 }
214
215 pub async fn commit(self) -> Result<()> {
217 let mut lock = self.tx.lock().await;
218 if let Some(tx) = lock.take() {
219 tx.commit().await?;
220 }
221 Ok(())
222 }
223
224 pub async fn rollback(self) -> Result<()> {
226 let mut lock = self.tx.lock().await;
227 if let Some(tx) = lock.take() {
228 tx.rollback().await?;
229 }
230 Ok(())
231 }
232}
233
234#[async_trait]
235impl Database for DbTransaction {
236 fn db_type(&self) -> DatabaseType {
237 DatabaseType::Postgres
239 }
240
241 async fn execute(&self, query: &str) -> Result<u64> {
242 self.execute(query).await
243 }
244
245 async fn query(&self, query: &str) -> Result<Vec<AnyRow>> {
246 self.query(query).await
247 }
248
249 async fn query_one(&self, query: &str) -> Result<Option<AnyRow>> {
250 self.query_one(query).await
251 }
252
253 async fn ping(&self) -> Result<()> {
254 self.execute("SELECT 1").await?;
255 Ok(())
256 }
257
258 async fn begin_transaction(&self) -> Result<DbTransaction> {
259 Err(sqlx::Error::Configuration("Nested transactions not supported".into()))
262 }
263
264 async fn execute_query<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<u64> {
265 let mut lock = self.tx.lock().await;
266 if let Some(ref mut tx) = *lock {
267 let result = query.execute(&mut **tx).await?;
268 Ok(result.rows_affected())
269 } else {
270 Err(sqlx::Error::PoolClosed)
271 }
272 }
273
274 async fn fetch_all<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Vec<AnyRow>> {
275 let mut lock = self.tx.lock().await;
276 if let Some(ref mut tx) = *lock {
277 let rows = query.fetch_all(&mut **tx).await?;
278 Ok(rows)
279 } else {
280 Err(sqlx::Error::PoolClosed)
281 }
282 }
283
284 async fn fetch_one<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Option<AnyRow>> {
285 let mut lock = self.tx.lock().await;
286 if let Some(ref mut tx) = *lock {
287 let row = query.fetch_optional(&mut **tx).await?;
288 Ok(row)
289 } else {
290 Err(sqlx::Error::PoolClosed)
291 }
292 }
293}
294
/// Minimal fluent builder for `SELECT` statements.
///
/// The builder produces SQL *text*. Values passed to [`QueryBuilder::where_eq`]
/// are embedded as quoted literals with single quotes escaped. Table names,
/// column names and the `order_by` direction are inserted verbatim and must
/// come from trusted code, never from user input.
#[derive(Debug, Clone)]
pub struct QueryBuilder {
    table: String,
    select_fields: Vec<String>,
    where_clauses: Vec<String>,
    order_by: Vec<String>,
    limit: Option<usize>,
    offset: Option<usize>,
}

impl QueryBuilder {
    /// Starts a `SELECT * FROM <table>` query.
    pub fn new(table: &str) -> Self {
        Self {
            table: table.to_string(),
            select_fields: vec!["*".to_string()],
            where_clauses: Vec::new(),
            order_by: Vec::new(),
            limit: None,
            offset: None,
        }
    }

    /// Replaces the selected columns.
    ///
    /// An empty slice selects `*`.
    pub fn select(mut self, fields: &[&str]) -> Self {
        self.select_fields = fields.iter().map(|field| field.to_string()).collect();
        self
    }

    /// Adds a `column = 'value'` condition; conditions are joined with `AND`.
    ///
    /// Single quotes in `value` are doubled (`'` becomes `''`) so the value
    /// cannot terminate the literal early.
    pub fn where_eq(mut self, column: &str, value: &str) -> Self {
        // NOTE(review): MySQL in its default SQL mode also treats backslash as
        // an escape character inside string literals. Prefer bound parameters
        // for untrusted values on that backend.
        let escaped = value.replace('\'', "''");
        self.where_clauses.push(format!("{} = '{}'", column, escaped));
        self
    }

    /// Appends an `ORDER BY` term; `direction` is inserted verbatim (e.g. `"ASC"`).
    pub fn order_by(mut self, column: &str, direction: &str) -> Self {
        self.order_by.push(format!("{} {}", column, direction));
        self
    }

    /// Sets the `LIMIT`.
    pub fn limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Sets the `OFFSET`.
    pub fn offset(mut self, offset: usize) -> Self {
        self.offset = Some(offset);
        self
    }

    /// Renders the SQL text.
    ///
    /// Clauses appear in the order `WHERE`, `ORDER BY`, `LIMIT`, `OFFSET`, and
    /// each is emitted only if it was configured.
    pub fn build(&self) -> String {
        // An empty column list would render `SELECT  FROM ...`, which is not
        // valid SQL, so fall back to selecting everything.
        let columns = if self.select_fields.is_empty() {
            "*".to_string()
        } else {
            self.select_fields.join(", ")
        };

        let mut query = format!("SELECT {} FROM {}", columns, self.table);

        if !self.where_clauses.is_empty() {
            query.push_str(" WHERE ");
            query.push_str(&self.where_clauses.join(" AND "));
        }

        if !self.order_by.is_empty() {
            query.push_str(" ORDER BY ");
            query.push_str(&self.order_by.join(", "));
        }

        if let Some(limit) = self.limit {
            query.push_str(" LIMIT ");
            query.push_str(&limit.to_string());
        }

        if let Some(offset) = self.offset {
            query.push_str(" OFFSET ");
            query.push_str(&offset.to_string());
        }

        query
    }
}
364#[async_trait]
366pub trait Model: Sized + Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, AnyRow> {
367 fn table_name() -> &'static str;
369
370 fn fields() -> &'static [&'static str];
372
373 fn has_soft_delete() -> bool {
375 false
376 }
377
378 async fn find(db: &impl Database, id: i64) -> Result<Option<Self>> {
380 let mut query = format!("SELECT * FROM {} WHERE id = {}", Self::table_name(), id);
381 if Self::has_soft_delete() {
382 query.push_str(" AND deleted_at IS NULL");
383 }
384 let row = db.query_one(&query).await?;
385
386 match row {
387 Some(row) => Ok(Some(Self::from_row(&row)?)),
388 None => Ok(None),
389 }
390 }
391
392 async fn all(db: &impl Database) -> Result<Vec<Self>> {
394 let mut query = format!("SELECT * FROM {}", Self::table_name());
395 if Self::has_soft_delete() {
396 query.push_str(" WHERE deleted_at IS NULL");
397 }
398 let rows = db.query(&query).await?;
399
400 let mut models = Vec::new();
401 for row in rows {
402 models.push(Self::from_row(&row)?);
403 }
404 Ok(models)
405 }
406
407 async fn create(&mut self, db: &impl Database) -> Result<()>;
409
410 async fn update(&mut self, db: &impl Database) -> Result<()>;
412
413 async fn delete(&self, db: &impl Database) -> Result<()>;
415
416 async fn force_delete(&self, db: &impl Database) -> Result<()>;
418
419 fn validate(&self) -> std::result::Result<(), String> {
421 Ok(())
422 }
423
424 async fn save(&mut self, db: &impl Database) -> Result<()> {
426 if let Err(e) = self.validate() {
427 return Err(sqlx::Error::Protocol(e.into()));
428 }
429 self.create(db).await
433 }
434}