1use std::collections::HashMap;
107use std::sync::Arc;
108
109use crate::{Request, Response, middleware::Middleware};
110
111#[cfg(feature = "database")]
112use {
113 sqlx::{Pool, Postgres, Row, Column},
114 serde_json::Value,
115};
116
117pub struct DatabasePool {
200 #[cfg(feature = "database")]
201 pool: Pool<Postgres>,
202 #[cfg(not(feature = "database"))]
203 _phantom: std::marker::PhantomData<()>,
204}
205
206impl DatabasePool {
207 #[cfg(feature = "database")]
209 pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
210 let pool = sqlx::postgres::PgPoolOptions::new()
211 .max_connections(20)
212 .connect(database_url)
213 .await?;
214
215 Ok(Self { pool })
216 }
217
218 #[cfg(not(feature = "database"))]
219 pub async fn new(_database_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
220 Ok(Self {
221 _phantom: std::marker::PhantomData,
222 })
223 }
224
225 #[cfg(feature = "database")]
227 pub async fn query_json(&self, query: &str, params: &[&str]) -> Result<Vec<Value>, sqlx::Error> {
228 let mut query_builder = sqlx::query(query);
229
230 for param in params {
231 query_builder = query_builder.bind(param);
232 }
233
234 let rows = query_builder.fetch_all(&self.pool).await?;
235 let mut results = Vec::new();
236
237 for row in rows {
238 let mut json_row = serde_json::Map::new();
239
240 for (i, column) in row.columns().iter().enumerate() {
241 let column_name = column.name();
242 let value: Option<String> = row.try_get(i).ok();
243 json_row.insert(
244 column_name.to_string(),
245 value.map(Value::String).unwrap_or(Value::Null),
246 );
247 }
248
249 results.push(Value::Object(json_row));
250 }
251
252 Ok(results)
253 }
254
255 #[cfg(not(feature = "database"))]
256 pub async fn query_json(&self, _query: &str, _params: &[&str]) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
257 Err("Database feature not enabled".into())
258 }
259
260 #[cfg(feature = "database")]
262 pub async fn execute(&self, query: &str, params: &[&str]) -> Result<u64, sqlx::Error> {
263 let mut query_builder = sqlx::query(query);
264
265 for param in params {
266 query_builder = query_builder.bind(param);
267 }
268
269 let result = query_builder.execute(&self.pool).await?;
270 Ok(result.rows_affected())
271 }
272
273 #[cfg(not(feature = "database"))]
274 pub async fn execute(&self, _query: &str, _params: &[&str]) -> Result<u64, Box<dyn std::error::Error>> {
275 Err("Database feature not enabled".into())
276 }
277}
278
279pub struct QueryBuilder {
281 table: String,
282 select_fields: Vec<String>,
283 where_conditions: Vec<String>,
284 order_by: Vec<String>,
285 limit_value: Option<u64>,
286 offset_value: Option<u64>,
287}
288
289impl QueryBuilder {
290 pub fn new(table: &str) -> Self {
291 Self {
292 table: table.to_string(),
293 select_fields: vec!["*".to_string()],
294 where_conditions: Vec::new(),
295 order_by: Vec::new(),
296 limit_value: None,
297 offset_value: None,
298 }
299 }
300
301 pub fn select(mut self, fields: &[&str]) -> Self {
302 self.select_fields = fields.iter().map(|s| s.to_string()).collect();
303 self
304 }
305
306 pub fn where_eq(mut self, field: &str, value: &str) -> Self {
307 self.where_conditions.push(format!("{} = '{}'", field, value));
308 self
309 }
310
311 pub fn where_like(mut self, field: &str, pattern: &str) -> Self {
312 self.where_conditions.push(format!("{} LIKE '{}'", field, pattern));
313 self
314 }
315
316 pub fn order_by(mut self, field: &str, direction: &str) -> Self {
317 self.order_by.push(format!("{} {}", field, direction));
318 self
319 }
320
321 pub fn limit(mut self, limit: u64) -> Self {
322 self.limit_value = Some(limit);
323 self
324 }
325
326 pub fn offset(mut self, offset: u64) -> Self {
327 self.offset_value = Some(offset);
328 self
329 }
330
331 pub fn build_select(&self) -> String {
332 let mut query = format!("SELECT {} FROM {}", self.select_fields.join(", "), self.table);
333
334 if !self.where_conditions.is_empty() {
335 query.push_str(&format!(" WHERE {}", self.where_conditions.join(" AND ")));
336 }
337
338 if !self.order_by.is_empty() {
339 query.push_str(&format!(" ORDER BY {}", self.order_by.join(", ")));
340 }
341
342 if let Some(limit) = self.limit_value {
343 query.push_str(&format!(" LIMIT {}", limit));
344 }
345
346 if let Some(offset) = self.offset_value {
347 query.push_str(&format!(" OFFSET {}", offset));
348 }
349
350 query
351 }
352
353 pub fn build_insert(&self, data: &HashMap<String, String>) -> String {
354 let fields: Vec<String> = data.keys().cloned().collect();
355 let values: Vec<String> = data.values().map(|v| format!("'{}'", v)).collect();
356
357 format!(
358 "INSERT INTO {} ({}) VALUES ({})",
359 self.table,
360 fields.join(", "),
361 values.join(", ")
362 )
363 }
364
365 pub fn build_update(&self, data: &HashMap<String, String>) -> String {
366 let updates: Vec<String> = data
367 .iter()
368 .map(|(k, v)| format!("{} = '{}'", k, v))
369 .collect();
370
371 let mut query = format!("UPDATE {} SET {}", self.table, updates.join(", "));
372
373 if !self.where_conditions.is_empty() {
374 query.push_str(&format!(" WHERE {}", self.where_conditions.join(" AND ")));
375 }
376
377 query
378 }
379
380 pub fn build_delete(&self) -> String {
381 let mut query = format!("DELETE FROM {}", self.table);
382
383 if !self.where_conditions.is_empty() {
384 query.push_str(&format!(" WHERE {}", self.where_conditions.join(" AND ")));
385 }
386
387 query
388 }
389}
390
391pub struct DatabaseMiddleware {
393 pool: Arc<DatabasePool>,
394}
395
396impl DatabaseMiddleware {
397 pub fn new(pool: Arc<DatabasePool>) -> Self {
398 Self { pool }
399 }
400}
401
402impl Middleware for DatabaseMiddleware {
403 fn call(
404 &self,
405 mut req: Request,
406 next: Box<dyn Fn(Request) -> std::pin::Pin<Box<dyn std::future::Future<Output = Response> + Send + 'static>> + Send + Sync>,
407 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Response> + Send + 'static>> {
408 let pool = self.pool.clone();
409 Box::pin(async move {
410 req.insert_extension(pool);
412 next(req).await
413 })
414 }
415}
416
417pub trait RequestDatabaseExt {
419 #[cfg(feature = "database")]
421 fn db_pool(&self) -> Option<Arc<DatabasePool>>;
422
423 #[cfg(not(feature = "database"))]
424 fn db_pool(&self) -> Option<()>;
425}
426
427impl RequestDatabaseExt for crate::Request {
428 #[cfg(feature = "database")]
429 fn db_pool(&self) -> Option<Arc<DatabasePool>> {
430 self.get_extension::<Arc<DatabasePool>>().cloned()
431 }
432
433 #[cfg(not(feature = "database"))]
434 fn db_pool(&self) -> Option<()> {
435 None
436 }
437}
438
439pub struct MigrationRunner {
441 #[cfg(feature = "database")]
442 #[allow(dead_code)]
443 pool: Arc<DatabasePool>,
444 #[allow(dead_code)]
445 migrations_dir: String,
446 #[cfg(not(feature = "database"))]
447 _phantom: std::marker::PhantomData<()>,
448}
449
450impl MigrationRunner {
451 pub fn new(_pool: Arc<DatabasePool>, migrations_dir: &str) -> Self {
452 Self {
453 #[cfg(feature = "database")]
454 pool: _pool,
455 migrations_dir: migrations_dir.to_string(),
456 #[cfg(not(feature = "database"))]
457 _phantom: std::marker::PhantomData,
458 }
459 }
460
461 #[cfg(feature = "database")]
463 pub async fn run_migrations(&self) -> Result<(), Box<dyn std::error::Error>> {
464 println!("Migration system initialized for directory: {}", self.migrations_dir);
465
466 println!("Migration system ready - would execute SQL files from {}", self.migrations_dir);
474 Ok(())
475 }
476
477 #[cfg(not(feature = "database"))]
478 pub async fn run_migrations(&self) -> Result<(), Box<dyn std::error::Error>> {
479 Err("Database feature not enabled".into())
480 }
481}
482
483pub async fn database_health_check(pool: &DatabasePool) -> Response {
485 #[cfg(feature = "database")]
486 {
487 match pool.query_json("SELECT 1 as health_check", &[]).await {
488 Ok(_) => Response::ok().json(&serde_json::json!({
489 "database": "healthy",
490 "timestamp": chrono::Utc::now().to_rfc3339()
491 })).unwrap_or_else(|_| Response::ok().body("healthy")),
492 Err(e) => Response::with_status(http::StatusCode::SERVICE_UNAVAILABLE)
493 .json(&serde_json::json!({
494 "database": "unhealthy",
495 "error": e.to_string(),
496 "timestamp": chrono::Utc::now().to_rfc3339()
497 })).unwrap_or_else(|_| Response::with_status(http::StatusCode::SERVICE_UNAVAILABLE).body("unhealthy"))
498 }
499 }
500
501 #[cfg(not(feature = "database"))]
502 {
503 let _ = pool; Response::ok().body("Database feature not enabled")
505 }
506}
507
508#[cfg(test)]
509mod tests {
510 use super::*;
511
512 #[test]
513 fn test_query_builder_select() {
514 let query = QueryBuilder::new("users")
515 .select(&["id", "name", "email"])
516 .where_eq("active", "true")
517 .order_by("created_at", "DESC")
518 .limit(10)
519 .build_select();
520
521 assert!(query.contains("SELECT id, name, email FROM users"));
522 assert!(query.contains("WHERE active = 'true'"));
523 assert!(query.contains("ORDER BY created_at DESC"));
524 assert!(query.contains("LIMIT 10"));
525 }
526
527 #[test]
528 fn test_query_builder_insert() {
529 let mut data = HashMap::new();
530 data.insert("name".to_string(), "John Doe".to_string());
531 data.insert("email".to_string(), "john@example.com".to_string());
532
533 let query = QueryBuilder::new("users").build_insert(&data);
534 assert!(query.contains("INSERT INTO users"));
535 assert!(query.contains("name"));
536 assert!(query.contains("email"));
537 }
538
539 #[test]
540 fn test_query_builder_update() {
541 let mut data = HashMap::new();
542 data.insert("name".to_string(), "Jane Doe".to_string());
543
544 let query = QueryBuilder::new("users")
545 .where_eq("id", "1")
546 .build_update(&data);
547
548 assert!(query.contains("UPDATE users SET"));
549 assert!(query.contains("name = 'Jane Doe'"));
550 assert!(query.contains("WHERE id = '1'"));
551 }
552
553 #[test]
554 fn test_query_builder_delete() {
555 let query = QueryBuilder::new("users")
556 .where_eq("id", "1")
557 .build_delete();
558
559 assert!(query.contains("DELETE FROM users"));
560 assert!(query.contains("WHERE id = '1'"));
561 }
562}