oxidite_db/
lib.rs

1use sqlx::{any::{AnyPoolOptions, AnyRow}, AnyPool, Transaction};
2use std::fmt::Debug;
3
4pub use sqlx;
5
6pub mod migrations;
7pub use migrations::{Migration, MigrationManager};
8
9pub mod relations;
10pub use relations::{HasMany, HasOne, BelongsTo};
11
12pub type Result<T> = std::result::Result<T, sqlx::Error>;
13
14pub use oxidite_macros::Model;
15pub use async_trait::async_trait;
16pub use chrono;
17pub use regex;
18pub use once_cell;
19
/// Identifies which SQL backend a pool or transaction is talking to.
///
/// The value is a plain tag: it is `Copy`, comparable with `==`, and printable
/// with `{:?}`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DatabaseType {
    /// PostgreSQL backend.
    Postgres,
    /// MySQL backend.
    MySql,
    /// SQLite backend.
    Sqlite,
}
27
/// Tunable settings used when building a connection pool.
#[derive(Clone, Debug)]
pub struct PoolOptions {
    /// Upper bound on the number of connections in the pool.
    pub max_connections: u32,
    /// Lower bound on the number of connections in the pool.
    pub min_connections: u32,
    /// Timeout forwarded to the pool when it is built.
    pub connect_timeout: std::time::Duration,
    /// Optional idle timeout forwarded to the pool.
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for PoolOptions {
    /// Returns the stock configuration: up to 10 connections, no minimum,
    /// a 30 second connect timeout and a 10 minute idle timeout.
    fn default() -> Self {
        use std::time::Duration;

        let connect_timeout = Duration::from_secs(30);
        let ten_minutes = Duration::from_secs(600);

        PoolOptions {
            max_connections: 10,
            min_connections: 0,
            connect_timeout,
            idle_timeout: Some(ten_minutes),
        }
    }
}
47
48/// Common database trait
49#[async_trait]
50pub trait Database: Send + Sync + Debug {
51    /// Get the database type
52    fn db_type(&self) -> DatabaseType;
53
54    /// Execute a query
55    async fn execute(&self, query: &str) -> Result<u64>;
56
57    /// Query multiple rows
58    async fn query(&self, query: &str) -> Result<Vec<AnyRow>>;
59
60    /// Query one row
61    async fn query_one(&self, query: &str) -> Result<Option<AnyRow>>;
62
63    /// Check health
64    async fn ping(&self) -> Result<()>;
65    
66    /// Begin a transaction
67    async fn begin_transaction(&self) -> Result<DbTransaction>;
68
69    /// Execute a sqlx Query
70    async fn execute_query<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<u64>;
71
72    /// Fetch all from a sqlx Query
73    async fn fetch_all<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Vec<AnyRow>>;
74
75    /// Fetch one from a sqlx Query
76    async fn fetch_one<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Option<AnyRow>>;
77}
78
79/// Database connection pool wrapper
80#[derive(Clone, Debug)]
81pub struct DbPool {
82    pool: AnyPool,
83    db_type: DatabaseType,
84}
85
86impl DbPool {
87    pub async fn connect(url: &str) -> Result<Self> {
88        Self::connect_with_options(url, PoolOptions::default()).await
89    }
90    
91    pub async fn connect_with_options(url: &str, options: PoolOptions) -> Result<Self> {
92        sqlx::any::install_default_drivers();
93        let max_conns = if url.contains(":memory:") { 1 } else { options.max_connections };
94        
95        let mut pool_options = AnyPoolOptions::new()
96            .max_connections(max_conns)
97            .min_connections(options.min_connections)
98            .acquire_timeout(options.connect_timeout);
99        
100        if let Some(idle_timeout) = options.idle_timeout {
101            pool_options = pool_options.idle_timeout(idle_timeout);
102        }
103        
104        let pool = pool_options.connect(url).await?;
105        
106        let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
107            DatabaseType::Postgres
108        } else if url.starts_with("mysql://") {
109            DatabaseType::MySql
110        } else if url.starts_with("sqlite://") {
111            DatabaseType::Sqlite
112        } else {
113            // Default or unknown, maybe panic or error? 
114            // For AnyPool, the scheme matters.
115            // Let's assume sqlite if not specified? No, AnyPool needs scheme.
116            // We can try to infer from the pool kind but AnyPool hides it well.
117            // Let's just rely on the URL scheme for now.
118            DatabaseType::Sqlite 
119        };
120
121        Ok(Self { pool, db_type })
122    }
123}
124
125#[async_trait]
126impl Database for DbPool {
127    fn db_type(&self) -> DatabaseType {
128        self.db_type
129    }
130
131    async fn execute(&self, query: &str) -> Result<u64> {
132        let result = sqlx::query(query).execute(&self.pool).await?;
133        Ok(result.rows_affected())
134    }
135
136    async fn query(&self, query: &str) -> Result<Vec<AnyRow>> {
137        let rows = sqlx::query(query).fetch_all(&self.pool).await?;
138        Ok(rows)
139    }
140
141    async fn query_one(&self, query: &str) -> Result<Option<AnyRow>> {
142        let row = sqlx::query(query).fetch_optional(&self.pool).await?;
143        Ok(row)
144    }
145
146    async fn ping(&self) -> Result<()> {
147        sqlx::query("SELECT 1").execute(&self.pool).await?;
148        Ok(())
149    }
150    
151    async fn begin_transaction(&self) -> Result<DbTransaction> {
152        let tx = self.pool.begin().await?;
153        Ok(DbTransaction { tx: Arc::new(Mutex::new(Some(tx))) })
154    }
155
156    async fn execute_query<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<u64> {
157        let result = query.execute(&self.pool).await?;
158        Ok(result.rows_affected())
159    }
160
161    async fn fetch_all<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Vec<AnyRow>> {
162        let rows = query.fetch_all(&self.pool).await?;
163        Ok(rows)
164    }
165
166    async fn fetch_one<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Option<AnyRow>> {
167        let row = query.fetch_optional(&self.pool).await?;
168        Ok(row)
169    }
170}
171
172use std::sync::Arc;
173use tokio::sync::Mutex;
174
175/// Database transaction
176#[derive(Clone, Debug)]
177pub struct DbTransaction {
178    tx: Arc<Mutex<Option<Transaction<'static, sqlx::Any>>>>,
179}
180
181impl DbTransaction {
182    /// Execute a query within the transaction
183    pub async fn execute(&self, query: &str) -> Result<u64> {
184        let mut lock = self.tx.lock().await;
185        if let Some(ref mut tx) = *lock {
186            let result = sqlx::query(query).execute(&mut **tx).await?;
187            Ok(result.rows_affected())
188        } else {
189            Err(sqlx::Error::PoolClosed)
190        }
191    }
192
193    /// Query multiple rows within the transaction
194    pub async fn query(&self, query: &str) -> Result<Vec<AnyRow>> {
195        let mut lock = self.tx.lock().await;
196        if let Some(ref mut tx) = *lock {
197            let rows = sqlx::query(query).fetch_all(&mut **tx).await?;
198            Ok(rows)
199        } else {
200            Err(sqlx::Error::PoolClosed)
201        }
202    }
203
204    /// Query one row within the transaction
205    pub async fn query_one(&self, query: &str) -> Result<Option<AnyRow>> {
206        let mut lock = self.tx.lock().await;
207        if let Some(ref mut tx) = *lock {
208            let row = sqlx::query(query).fetch_optional(&mut **tx).await?;
209            Ok(row)
210        } else {
211            Err(sqlx::Error::PoolClosed)
212        }
213    }
214
215    /// Commit the transaction
216    pub async fn commit(self) -> Result<()> {
217        let mut lock = self.tx.lock().await;
218        if let Some(tx) = lock.take() {
219            tx.commit().await?;
220        }
221        Ok(())
222    }
223
224    /// Rollback the transaction
225    pub async fn rollback(self) -> Result<()> {
226        let mut lock = self.tx.lock().await;
227        if let Some(tx) = lock.take() {
228            tx.rollback().await?;
229        }
230        Ok(())
231    }
232}
233
234#[async_trait]
235impl Database for DbTransaction {
236    fn db_type(&self) -> DatabaseType {
237        // Still a placeholder, but consistent
238        DatabaseType::Postgres 
239    }
240
241    async fn execute(&self, query: &str) -> Result<u64> {
242        self.execute(query).await
243    }
244
245    async fn query(&self, query: &str) -> Result<Vec<AnyRow>> {
246        self.query(query).await
247    }
248
249    async fn query_one(&self, query: &str) -> Result<Option<AnyRow>> {
250        self.query_one(query).await
251    }
252
253    async fn ping(&self) -> Result<()> {
254        self.execute("SELECT 1").await?;
255        Ok(())
256    }
257    
258    async fn begin_transaction(&self) -> Result<DbTransaction> {
259        // Nested transactions not supported by this simple wrapper yet
260        // Could use savepoints if needed.
261        Err(sqlx::Error::Configuration("Nested transactions not supported".into()))
262    }
263
264    async fn execute_query<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<u64> {
265        let mut lock = self.tx.lock().await;
266        if let Some(ref mut tx) = *lock {
267            let result = query.execute(&mut **tx).await?;
268            Ok(result.rows_affected())
269        } else {
270            Err(sqlx::Error::PoolClosed)
271        }
272    }
273
274    async fn fetch_all<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Vec<AnyRow>> {
275        let mut lock = self.tx.lock().await;
276        if let Some(ref mut tx) = *lock {
277            let rows = query.fetch_all(&mut **tx).await?;
278            Ok(rows)
279        } else {
280            Err(sqlx::Error::PoolClosed)
281        }
282    }
283
284    async fn fetch_one<'q>(&self, query: sqlx::query::Query<'q, sqlx::Any, sqlx::any::AnyArguments<'q>>) -> Result<Option<AnyRow>> {
285        let mut lock = self.tx.lock().await;
286        if let Some(ref mut tx) = *lock {
287            let row = query.fetch_optional(&mut **tx).await?;
288            Ok(row)
289        } else {
290            Err(sqlx::Error::PoolClosed)
291        }
292    }
293}
294
/// Minimal builder for `SELECT` statements.
///
/// The builder only assembles SQL text. Table names, column names and the
/// `ORDER BY` direction are inserted verbatim and must come from trusted code;
/// only the value passed to [`QueryBuilder::where_eq`] is escaped.
#[derive(Debug, Clone)]
pub struct QueryBuilder {
    /// Table named in the `FROM` clause.
    table: String,
    /// Columns to select; `*` by default.
    select_fields: Vec<String>,
    /// Conditions joined with `AND` in the `WHERE` clause.
    where_clauses: Vec<String>,
    /// `column direction` pairs for the `ORDER BY` clause.
    order_by: Vec<String>,
    /// Optional `LIMIT`.
    limit: Option<usize>,
    /// Optional `OFFSET`.
    offset: Option<usize>,
}

impl QueryBuilder {
    /// Starts a `SELECT * FROM <table>` query.
    pub fn new(table: &str) -> Self {
        Self {
            table: table.to_string(),
            select_fields: vec!["*".to_string()],
            where_clauses: Vec::new(),
            order_by: Vec::new(),
            limit: None,
            offset: None,
        }
    }

    /// Replaces the selected columns with `fields`.
    ///
    /// Passing an empty slice selects `*` when the query is built.
    pub fn select(mut self, fields: &[&str]) -> Self {
        self.select_fields = fields.iter().map(|s| s.to_string()).collect();
        self
    }

    /// Adds a `column = 'value'` condition (conditions are combined with `AND`).
    ///
    /// Single quotes in `value` are doubled so the value cannot terminate the
    /// string literal. NOTE(review): backends that also treat backslash as an
    /// escape character inside string literals may need additional handling;
    /// prefer bound parameters for untrusted input.
    pub fn where_eq(mut self, column: &str, value: &str) -> Self {
        let escaped = value.replace('\'', "''");
        self.where_clauses.push(format!("{} = '{}'", column, escaped));
        self
    }

    /// Adds a `column direction` entry to the `ORDER BY` clause.
    ///
    /// `direction` is inserted verbatim (e.g. `"ASC"`, `"DESC"`).
    pub fn order_by(mut self, column: &str, direction: &str) -> Self {
        self.order_by.push(format!("{} {}", column, direction));
        self
    }

    /// Sets the `LIMIT`.
    pub fn limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Sets the `OFFSET`.
    pub fn offset(mut self, offset: usize) -> Self {
        self.offset = Some(offset);
        self
    }

    /// Renders the query as SQL text.
    ///
    /// Clauses are emitted in the order `WHERE`, `ORDER BY`, `LIMIT`,
    /// `OFFSET`; clauses that were never configured are omitted.
    pub fn build(&self) -> String {
        // An empty column list would render as invalid `SELECT  FROM ...`.
        let columns = if self.select_fields.is_empty() {
            "*".to_string()
        } else {
            self.select_fields.join(", ")
        };

        let mut query = format!("SELECT {} FROM {}", columns, self.table);

        if !self.where_clauses.is_empty() {
            query.push_str(" WHERE ");
            query.push_str(&self.where_clauses.join(" AND "));
        }

        if !self.order_by.is_empty() {
            query.push_str(" ORDER BY ");
            query.push_str(&self.order_by.join(", "));
        }

        if let Some(limit) = self.limit {
            query.push_str(" LIMIT ");
            query.push_str(&limit.to_string());
        }

        if let Some(offset) = self.offset {
            query.push_str(" OFFSET ");
            query.push_str(&offset.to_string());
        }

        query
    }
}
364/// Model trait for database entities
365#[async_trait]
366pub trait Model: Sized + Send + Sync + Unpin + for<'r> sqlx::FromRow<'r, AnyRow> {
367    /// Get the table name
368    fn table_name() -> &'static str;
369
370    /// Get the list of fields (columns)
371    fn fields() -> &'static [&'static str];
372
373    /// Check if the model supports soft deletes
374    fn has_soft_delete() -> bool {
375        false
376    }
377
378    /// Find a record by ID
379    async fn find(db: &impl Database, id: i64) -> Result<Option<Self>> {
380        let mut query = format!("SELECT * FROM {} WHERE id = {}", Self::table_name(), id);
381        if Self::has_soft_delete() {
382            query.push_str(" AND deleted_at IS NULL");
383        }
384        let row = db.query_one(&query).await?;
385        
386        match row {
387            Some(row) => Ok(Some(Self::from_row(&row)?)),
388            None => Ok(None),
389        }
390    }
391
392    /// Find all records
393    async fn all(db: &impl Database) -> Result<Vec<Self>> {
394        let mut query = format!("SELECT * FROM {}", Self::table_name());
395        if Self::has_soft_delete() {
396            query.push_str(" WHERE deleted_at IS NULL");
397        }
398        let rows = db.query(&query).await?;
399        
400        let mut models = Vec::new();
401        for row in rows {
402            models.push(Self::from_row(&row)?);
403        }
404        Ok(models)
405    }
406    
407    /// Create a new record
408    async fn create(&mut self, db: &impl Database) -> Result<()>;
409
410    /// Update an existing record
411    async fn update(&mut self, db: &impl Database) -> Result<()>;
412
413    /// Delete the record (soft delete if supported, otherwise hard delete)
414    async fn delete(&self, db: &impl Database) -> Result<()>;
415    
416    /// Force delete the record (hard delete)
417    async fn force_delete(&self, db: &impl Database) -> Result<()>;
418    
419    /// Validate the model fields
420    fn validate(&self) -> std::result::Result<(), String> {
421        Ok(())
422    }
423
424    /// Save (create or update)
425    async fn save(&mut self, db: &impl Database) -> Result<()> {
426        if let Err(e) = self.validate() {
427            return Err(sqlx::Error::Protocol(e.into()));
428        }
429        // This default impl is tricky without knowing if it's new.
430        // For now, let's leave it to the user or macro to decide.
431        // But we can provide a helper if we had an `is_new()` method.
432        self.create(db).await
433    }
434}