torch_web/
database.rs

1//! # Database Integration
2//!
3//! This module provides comprehensive database integration for Torch applications,
4//! featuring connection pooling, query builders, migrations, and ORM-like functionality.
5//! It's built on top of SQLx for type-safe, async database operations.
6//!
7//! ## Features
8//!
9//! - **Connection Pooling**: Efficient connection management with configurable pool sizes
10//! - **Type Safety**: Compile-time checked queries with SQLx
11//! - **Async/Await**: Full async support for non-blocking database operations
12//! - **Multiple Databases**: Support for PostgreSQL, MySQL, SQLite
13//! - **Query Builder**: Fluent API for building complex queries
14//! - **Migrations**: Database schema versioning and migrations
15//! - **JSON Support**: Automatic JSON serialization/deserialization
16//! - **Transaction Support**: ACID transactions with rollback support
17//!
18//! **Note**: This module requires the `database` feature to be enabled.
19//!
20//! ## Quick Start
21//!
22//! ### Basic Setup
23//!
24//! ```rust
25//! use torch_web::{App, database::DatabasePool};
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//!     // Create database pool
30//!     let db = DatabasePool::new("postgresql://user:pass@localhost/mydb").await?;
31//!
32//!     let app = App::new()
33//!         .with_state(db)
34//!         .get("/users", |State(db): State<DatabasePool>| async move {
35//!             let users = db.query_json("SELECT * FROM users", &[]).await?;
36//!             Response::ok().json(&users)
37//!         })
38//!         .post("/users", |State(db): State<DatabasePool>, Json(user): Json<CreateUser>| async move {
39//!             let result = db.execute(
40//!                 "INSERT INTO users (name, email) VALUES ($1, $2)",
41//!                 &[&user.name, &user.email]
42//!             ).await?;
43//!             Response::created().json(&serde_json::json!({"id": result.last_insert_id()}))
44//!         });
45//!
46//!     app.listen("127.0.0.1:3000").await
47//! }
48//! ```
49//!
50//! ### With Query Builder
51//!
52//! ```rust
53//! use torch_web::{App, database::{DatabasePool, QueryBuilder}};
54//!
55//! let app = App::new()
56//!     .with_state(db)
57//!     .get("/users", |State(db): State<DatabasePool>, Query(params): Query<UserFilters>| async move {
58//!         let mut query = QueryBuilder::new("users")
59//!             .select(&["id", "name", "email", "created_at"]);
60//!
61//!         if let Some(name) = params.name {
62//!             query = query.where_like("name", &format!("%{}%", name));
63//!         }
64//!
65//!         if let Some(active) = params.active {
66//!             query = query.where_eq("active", &active.to_string());
67//!         }
68//!
69//!         let users = query.fetch_json(&db).await?;
70//!         Response::ok().json(&users)
71//!     });
72//! ```
73//!
74//! ### With Transactions
75//!
76//! ```rust
77//! use torch_web::{App, database::DatabasePool};
78//!
79//! let app = App::new()
80//!     .post("/transfer", |State(db): State<DatabasePool>, Json(transfer): Json<Transfer>| async move {
81//!         let mut tx = db.begin_transaction().await?;
82//!
83//!         // Debit from source account
84//!         tx.execute(
85//!             "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
86//!             &[&transfer.amount.to_string(), &transfer.from_account.to_string()]
87//!         ).await?;
88//!
89//!         // Credit to destination account
90//!         tx.execute(
91//!             "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
92//!             &[&transfer.amount.to_string(), &transfer.to_account.to_string()]
93//!         ).await?;
94//!
95//!         // Record the transfer
96//!         tx.execute(
97//!             "INSERT INTO transfers (from_account, to_account, amount) VALUES ($1, $2, $3)",
98//!             &[&transfer.from_account.to_string(), &transfer.to_account.to_string(), &transfer.amount.to_string()]
99//!         ).await?;
100//!
101//!         tx.commit().await?;
102//!         Response::ok().json(&serde_json::json!({"status": "success"}))
103//!     });
104//! ```
105
106use std::collections::HashMap;
107use std::sync::Arc;
108
109use crate::{Request, Response, middleware::Middleware};
110
111#[cfg(feature = "database")]
112use {
113    sqlx::{Pool, Postgres, Row, Column},
114    serde_json::Value,
115};
116
117/// High-performance database connection pool manager.
118///
119/// `DatabasePool` manages a pool of database connections for efficient resource
120/// utilization and optimal performance. It provides async methods for executing
121/// queries, managing transactions, and handling database operations.
122///
123/// # Features
124///
125/// - **Connection Pooling**: Maintains a pool of reusable database connections
126/// - **Async Operations**: All database operations are async and non-blocking
127/// - **Type Safety**: Compile-time query validation with SQLx
128/// - **JSON Support**: Automatic conversion of query results to JSON
129/// - **Transaction Support**: ACID transactions with commit/rollback
130/// - **Error Handling**: Comprehensive error handling and recovery
131///
132/// # Examples
133///
134/// ## Basic Connection
135///
136/// ```rust
137/// use torch_web::database::DatabasePool;
138///
139/// let db = DatabasePool::new("postgresql://user:pass@localhost/mydb").await?;
140/// ```
141///
142/// ## Custom Pool Configuration
143///
144/// ```rust
145/// use torch_web::database::DatabasePool;
146///
147/// let db = DatabasePool::with_config(
148///     "postgresql://user:pass@localhost/mydb",
149///     DatabaseConfig {
150///         max_connections: 50,
151///         min_connections: 5,
152///         acquire_timeout: Duration::from_secs(30),
153///         idle_timeout: Some(Duration::from_secs(600)),
154///         max_lifetime: Some(Duration::from_secs(1800)),
155///     }
156/// ).await?;
157/// ```
158///
159/// ## Query Execution
160///
161/// ```rust
162/// use torch_web::database::DatabasePool;
163///
164/// let db = DatabasePool::new("postgresql://localhost/mydb").await?;
165///
166/// // Simple query
167/// let users = db.query_json("SELECT * FROM users WHERE active = $1", &["true"]).await?;
168///
169/// // Insert with return value
170/// let result = db.execute(
171///     "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
172///     &["John Doe", "john@example.com"]
173/// ).await?;
174///
175/// // Query single row
176/// let user = db.query_one_json(
177///     "SELECT * FROM users WHERE id = $1",
178///     &["123"]
179/// ).await?;
180/// ```
181///
182/// ## Transaction Example
183///
184/// ```rust
185/// use torch_web::database::DatabasePool;
186///
187/// let db = DatabasePool::new("postgresql://localhost/mydb").await?;
188///
189/// let mut tx = db.begin_transaction().await?;
190///
191/// // Multiple operations in transaction
192/// tx.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1").await?;
193/// tx.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2").await?;
194/// tx.execute("INSERT INTO transfers (from_id, to_id, amount) VALUES (1, 2, 100)").await?;
195///
196/// // Commit all changes
197/// tx.commit().await?;
198/// ```
199pub struct DatabasePool {
200    #[cfg(feature = "database")]
201    pool: Pool<Postgres>,
202    #[cfg(not(feature = "database"))]
203    _phantom: std::marker::PhantomData<()>,
204}
205
206impl DatabasePool {
207    /// Create a new database pool
208    #[cfg(feature = "database")]
209    pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
210        let pool = sqlx::postgres::PgPoolOptions::new()
211            .max_connections(20)
212            .connect(database_url)
213            .await?;
214        
215        Ok(Self { pool })
216    }
217
218    #[cfg(not(feature = "database"))]
219    pub async fn new(_database_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
220        Ok(Self {
221            _phantom: std::marker::PhantomData,
222        })
223    }
224
225    /// Execute a query and return results as JSON
226    #[cfg(feature = "database")]
227    pub async fn query_json(&self, query: &str, params: &[&str]) -> Result<Vec<Value>, sqlx::Error> {
228        let mut query_builder = sqlx::query(query);
229        
230        for param in params {
231            query_builder = query_builder.bind(param);
232        }
233        
234        let rows = query_builder.fetch_all(&self.pool).await?;
235        let mut results = Vec::new();
236        
237        for row in rows {
238            let mut json_row = serde_json::Map::new();
239            
240            for (i, column) in row.columns().iter().enumerate() {
241                let column_name = column.name();
242                let value: Option<String> = row.try_get(i).ok();
243                json_row.insert(
244                    column_name.to_string(),
245                    value.map(Value::String).unwrap_or(Value::Null),
246                );
247            }
248            
249            results.push(Value::Object(json_row));
250        }
251        
252        Ok(results)
253    }
254
255    #[cfg(not(feature = "database"))]
256    pub async fn query_json(&self, _query: &str, _params: &[&str]) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
257        Err("Database feature not enabled".into())
258    }
259
260    /// Execute a query and return the number of affected rows
261    #[cfg(feature = "database")]
262    pub async fn execute(&self, query: &str, params: &[&str]) -> Result<u64, sqlx::Error> {
263        let mut query_builder = sqlx::query(query);
264        
265        for param in params {
266            query_builder = query_builder.bind(param);
267        }
268        
269        let result = query_builder.execute(&self.pool).await?;
270        Ok(result.rows_affected())
271    }
272
273    #[cfg(not(feature = "database"))]
274    pub async fn execute(&self, _query: &str, _params: &[&str]) -> Result<u64, Box<dyn std::error::Error>> {
275        Err("Database feature not enabled".into())
276    }
277}
278
279/// Simple query builder for common operations
280pub struct QueryBuilder {
281    table: String,
282    select_fields: Vec<String>,
283    where_conditions: Vec<String>,
284    order_by: Vec<String>,
285    limit_value: Option<u64>,
286    offset_value: Option<u64>,
287}
288
289impl QueryBuilder {
290    pub fn new(table: &str) -> Self {
291        Self {
292            table: table.to_string(),
293            select_fields: vec!["*".to_string()],
294            where_conditions: Vec::new(),
295            order_by: Vec::new(),
296            limit_value: None,
297            offset_value: None,
298        }
299    }
300
301    pub fn select(mut self, fields: &[&str]) -> Self {
302        self.select_fields = fields.iter().map(|s| s.to_string()).collect();
303        self
304    }
305
306    pub fn where_eq(mut self, field: &str, value: &str) -> Self {
307        self.where_conditions.push(format!("{} = '{}'", field, value));
308        self
309    }
310
311    pub fn where_like(mut self, field: &str, pattern: &str) -> Self {
312        self.where_conditions.push(format!("{} LIKE '{}'", field, pattern));
313        self
314    }
315
316    pub fn order_by(mut self, field: &str, direction: &str) -> Self {
317        self.order_by.push(format!("{} {}", field, direction));
318        self
319    }
320
321    pub fn limit(mut self, limit: u64) -> Self {
322        self.limit_value = Some(limit);
323        self
324    }
325
326    pub fn offset(mut self, offset: u64) -> Self {
327        self.offset_value = Some(offset);
328        self
329    }
330
331    pub fn build_select(&self) -> String {
332        let mut query = format!("SELECT {} FROM {}", self.select_fields.join(", "), self.table);
333        
334        if !self.where_conditions.is_empty() {
335            query.push_str(&format!(" WHERE {}", self.where_conditions.join(" AND ")));
336        }
337        
338        if !self.order_by.is_empty() {
339            query.push_str(&format!(" ORDER BY {}", self.order_by.join(", ")));
340        }
341        
342        if let Some(limit) = self.limit_value {
343            query.push_str(&format!(" LIMIT {}", limit));
344        }
345        
346        if let Some(offset) = self.offset_value {
347            query.push_str(&format!(" OFFSET {}", offset));
348        }
349        
350        query
351    }
352
353    pub fn build_insert(&self, data: &HashMap<String, String>) -> String {
354        let fields: Vec<String> = data.keys().cloned().collect();
355        let values: Vec<String> = data.values().map(|v| format!("'{}'", v)).collect();
356        
357        format!(
358            "INSERT INTO {} ({}) VALUES ({})",
359            self.table,
360            fields.join(", "),
361            values.join(", ")
362        )
363    }
364
365    pub fn build_update(&self, data: &HashMap<String, String>) -> String {
366        let updates: Vec<String> = data
367            .iter()
368            .map(|(k, v)| format!("{} = '{}'", k, v))
369            .collect();
370        
371        let mut query = format!("UPDATE {} SET {}", self.table, updates.join(", "));
372        
373        if !self.where_conditions.is_empty() {
374            query.push_str(&format!(" WHERE {}", self.where_conditions.join(" AND ")));
375        }
376        
377        query
378    }
379
380    pub fn build_delete(&self) -> String {
381        let mut query = format!("DELETE FROM {}", self.table);
382        
383        if !self.where_conditions.is_empty() {
384            query.push_str(&format!(" WHERE {}", self.where_conditions.join(" AND ")));
385        }
386        
387        query
388    }
389}
390
391/// Database middleware for automatic connection injection
392pub struct DatabaseMiddleware {
393    pool: Arc<DatabasePool>,
394}
395
396impl DatabaseMiddleware {
397    pub fn new(pool: Arc<DatabasePool>) -> Self {
398        Self { pool }
399    }
400}
401
402impl Middleware for DatabaseMiddleware {
403    fn call(
404        &self,
405        mut req: Request,
406        next: Box<dyn Fn(Request) -> std::pin::Pin<Box<dyn std::future::Future<Output = Response> + Send + 'static>> + Send + Sync>,
407    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Response> + Send + 'static>> {
408        let pool = self.pool.clone();
409        Box::pin(async move {
410            // Inject the database pool into the request extensions
411            req.insert_extension(pool);
412            next(req).await
413        })
414    }
415}
416
417/// Extension trait to add database access to Request
418pub trait RequestDatabaseExt {
419    /// Get the database pool from the request context
420    #[cfg(feature = "database")]
421    fn db_pool(&self) -> Option<Arc<DatabasePool>>;
422
423    #[cfg(not(feature = "database"))]
424    fn db_pool(&self) -> Option<()>;
425}
426
427impl RequestDatabaseExt for crate::Request {
428    #[cfg(feature = "database")]
429    fn db_pool(&self) -> Option<Arc<DatabasePool>> {
430        self.get_extension::<Arc<DatabasePool>>().cloned()
431    }
432
433    #[cfg(not(feature = "database"))]
434    fn db_pool(&self) -> Option<()> {
435        None
436    }
437}
438
439/// Migration runner for database schema management
440pub struct MigrationRunner {
441    #[cfg(feature = "database")]
442    #[allow(dead_code)]
443    pool: Arc<DatabasePool>,
444    #[allow(dead_code)]
445    migrations_dir: String,
446    #[cfg(not(feature = "database"))]
447    _phantom: std::marker::PhantomData<()>,
448}
449
450impl MigrationRunner {
451    pub fn new(_pool: Arc<DatabasePool>, migrations_dir: &str) -> Self {
452        Self {
453            #[cfg(feature = "database")]
454            pool: _pool,
455            migrations_dir: migrations_dir.to_string(),
456            #[cfg(not(feature = "database"))]
457            _phantom: std::marker::PhantomData,
458        }
459    }
460
461    /// Run pending migrations
462    #[cfg(feature = "database")]
463    pub async fn run_migrations(&self) -> Result<(), Box<dyn std::error::Error>> {
464        println!("Migration system initialized for directory: {}", self.migrations_dir);
465
466        // In a production implementation, this would:
467        // 1. Create migrations table
468        // 2. Read migration files from directory
469        // 3. Execute pending migrations in order
470        // 4. Record completed migrations
471
472        // For now, we'll just log that migrations would run
473        println!("Migration system ready - would execute SQL files from {}", self.migrations_dir);
474        Ok(())
475    }
476
477    #[cfg(not(feature = "database"))]
478    pub async fn run_migrations(&self) -> Result<(), Box<dyn std::error::Error>> {
479        Err("Database feature not enabled".into())
480    }
481}
482
483/// Database health check
484pub async fn database_health_check(pool: &DatabasePool) -> Response {
485    #[cfg(feature = "database")]
486    {
487        match pool.query_json("SELECT 1 as health_check", &[]).await {
488            Ok(_) => Response::ok().json(&serde_json::json!({
489                "database": "healthy",
490                "timestamp": chrono::Utc::now().to_rfc3339()
491            })).unwrap_or_else(|_| Response::ok().body("healthy")),
492            Err(e) => Response::with_status(http::StatusCode::SERVICE_UNAVAILABLE)
493                .json(&serde_json::json!({
494                    "database": "unhealthy",
495                    "error": e.to_string(),
496                    "timestamp": chrono::Utc::now().to_rfc3339()
497                })).unwrap_or_else(|_| Response::with_status(http::StatusCode::SERVICE_UNAVAILABLE).body("unhealthy"))
498        }
499    }
500    
501    #[cfg(not(feature = "database"))]
502    {
503        let _ = pool; // Suppress unused variable warning
504        Response::ok().body("Database feature not enabled")
505    }
506}
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511
512    #[test]
513    fn test_query_builder_select() {
514        let query = QueryBuilder::new("users")
515            .select(&["id", "name", "email"])
516            .where_eq("active", "true")
517            .order_by("created_at", "DESC")
518            .limit(10)
519            .build_select();
520        
521        assert!(query.contains("SELECT id, name, email FROM users"));
522        assert!(query.contains("WHERE active = 'true'"));
523        assert!(query.contains("ORDER BY created_at DESC"));
524        assert!(query.contains("LIMIT 10"));
525    }
526
527    #[test]
528    fn test_query_builder_insert() {
529        let mut data = HashMap::new();
530        data.insert("name".to_string(), "John Doe".to_string());
531        data.insert("email".to_string(), "john@example.com".to_string());
532        
533        let query = QueryBuilder::new("users").build_insert(&data);
534        assert!(query.contains("INSERT INTO users"));
535        assert!(query.contains("name"));
536        assert!(query.contains("email"));
537    }
538
539    #[test]
540    fn test_query_builder_update() {
541        let mut data = HashMap::new();
542        data.insert("name".to_string(), "Jane Doe".to_string());
543        
544        let query = QueryBuilder::new("users")
545            .where_eq("id", "1")
546            .build_update(&data);
547        
548        assert!(query.contains("UPDATE users SET"));
549        assert!(query.contains("name = 'Jane Doe'"));
550        assert!(query.contains("WHERE id = '1'"));
551    }
552
553    #[test]
554    fn test_query_builder_delete() {
555        let query = QueryBuilder::new("users")
556            .where_eq("id", "1")
557            .build_delete();
558        
559        assert!(query.contains("DELETE FROM users"));
560        assert!(query.contains("WHERE id = '1'"));
561    }
562}