celers_cli/
database.rs

1//! Database operations for CeleRS CLI.
2//!
3//! This module provides commands for managing database connections, health checks,
4//! and migrations. Currently supports PostgreSQL with planned support for MySQL.
5//!
6//! # Features
7//!
8//! - Connection testing with latency benchmarks
9//! - Health monitoring and diagnostics
10//! - Connection pool statistics
11//! - Schema migrations (auto-migration via SQLx)
12//!
13//! # Examples
14//!
15//! ```no_run
16//! use celers_cli::database;
17//!
18//! #[tokio::main]
19//! async fn main() -> anyhow::Result<()> {
20//!     // Test database connection
21//!     database::db_test_connection(
22//!         "postgresql://user:pass@localhost/celers",
23//!         false
24//!     ).await?;
25//!
26//!     // Check database health
27//!     database::db_health("postgresql://user:pass@localhost/celers").await?;
28//!
29//!     Ok(())
30//! }
31//! ```
32
33use crate::command_utils::mask_password;
34use colored::Colorize;
35use tabled::{settings::Style, Table, Tabled};
36
37/// Test database connection with optional benchmarking.
38///
39/// Connects to the database and verifies connectivity. If benchmark mode is enabled,
40/// runs 10 test queries and reports latency statistics.
41///
42/// # Arguments
43///
44/// * `url` - Database connection URL
45/// * `benchmark` - Whether to run latency benchmark (10 queries)
46///
47/// # Supported Databases
48///
49/// - PostgreSQL (fully supported)
50/// - MySQL (planned)
51///
52/// # Examples
53///
54/// ```no_run
55/// # use celers_cli::database::db_test_connection;
56/// # #[tokio::main]
57/// # async fn main() -> anyhow::Result<()> {
58/// // Simple connection test
59/// db_test_connection("postgresql://localhost/celers", false).await?;
60///
61/// // With latency benchmark
62/// db_test_connection("postgresql://localhost/celers", true).await?;
63/// # Ok(())
64/// # }
65/// ```
66pub async fn db_test_connection(url: &str, benchmark: bool) -> anyhow::Result<()> {
67    println!("{}", "=== Database Connection Test ===".bold().cyan());
68    println!();
69    println!("Database URL: {}", mask_password(url).cyan());
70    println!();
71
72    // Determine database type from URL
73    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
74        "PostgreSQL"
75    } else if url.starts_with("mysql://") {
76        "MySQL"
77    } else {
78        "Unknown"
79    };
80
81    println!("Database type: {}", db_type.yellow());
82    println!();
83
84    // Test connection
85    println!("Testing connection...");
86    let start = std::time::Instant::now();
87
88    // For PostgreSQL
89    if db_type == "PostgreSQL" {
90        match test_postgres_connection(url).await {
91            Ok(version) => {
92                let duration = start.elapsed();
93                println!("{}", "✓ Connection successful".green());
94                println!("  Version: {}", version.cyan());
95                println!("  Latency: {}ms", duration.as_millis().to_string().yellow());
96            }
97            Err(e) => {
98                println!("{}", "✗ Connection failed".red());
99                println!("  Error: {e}");
100                return Err(e);
101            }
102        }
103    } else if db_type == "MySQL" {
104        println!("{}", "⚠️  MySQL support not yet implemented".yellow());
105        return Ok(());
106    } else {
107        println!("{}", "⚠️  Unknown database type".yellow());
108        return Ok(());
109    }
110
111    // Run benchmark if requested
112    if benchmark {
113        println!();
114        println!("{}", "Running latency benchmark...".bold());
115        println!();
116
117        let mut latencies = Vec::new();
118        for i in 1..=10 {
119            let start = std::time::Instant::now();
120            if let Err(e) = test_postgres_connection(url).await {
121                println!("  {} Query {} failed: {}", "✗".red(), i, e);
122                continue;
123            }
124            let duration = start.elapsed();
125            latencies.push(duration.as_millis());
126            println!("  {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
127        }
128
129        if !latencies.is_empty() {
130            let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
131            let min = latencies
132                .iter()
133                .min()
134                .expect("collection validated to be non-empty");
135            let max = latencies
136                .iter()
137                .max()
138                .expect("collection validated to be non-empty");
139
140            println!();
141            println!("Benchmark results:");
142            println!("  Average: {}ms", avg.to_string().cyan());
143            println!("  Min: {}ms", min.to_string().green());
144            println!("  Max: {}ms", max.to_string().yellow());
145        }
146    }
147
148    Ok(())
149}
150
151/// Test PostgreSQL connection (internal helper).
152async fn test_postgres_connection(url: &str) -> anyhow::Result<String> {
153    use celers_broker_postgres::PostgresBroker;
154
155    // Create a temporary broker to test connection
156    let broker = PostgresBroker::new(url).await?;
157
158    // Test connection and return a version string
159    let connected = broker.test_connection().await?;
160    if connected {
161        Ok("Connected".to_string())
162    } else {
163        anyhow::bail!("Connection test failed")
164    }
165}
166
167/// Check database health with comprehensive diagnostics.
168///
169/// Performs multiple health checks including connection verification,
170/// query latency measurement, and connection pool status.
171///
172/// # Arguments
173///
174/// * `url` - Database connection URL
175///
176/// # Health Checks
177///
178/// - Connection status
179/// - Query latency (5 test queries)
180/// - Connection pool utilization
181/// - Performance warnings (high latency > 100ms, high pool usage > 80%)
182///
183/// # Examples
184///
185/// ```no_run
186/// # use celers_cli::database::db_health;
187/// # #[tokio::main]
188/// # async fn main() -> anyhow::Result<()> {
189/// db_health("postgresql://localhost/celers").await?;
190/// # Ok(())
191/// # }
192/// ```
193pub async fn db_health(url: &str) -> anyhow::Result<()> {
194    println!("{}", "=== Database Health Check ===".bold().cyan());
195    println!();
196
197    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
198        "PostgreSQL"
199    } else if url.starts_with("mysql://") {
200        "MySQL"
201    } else {
202        "Unknown"
203    };
204
205    println!("Database: {}", db_type.yellow());
206    println!();
207
208    if db_type == "PostgreSQL" {
209        check_postgres_health(url).await?;
210    } else {
211        println!(
212            "{}",
213            "⚠️  Health check not supported for this database type".yellow()
214        );
215    }
216
217    Ok(())
218}
219
220/// Check PostgreSQL health (internal helper).
221async fn check_postgres_health(url: &str) -> anyhow::Result<()> {
222    use celers_broker_postgres::PostgresBroker;
223
224    println!("Checking connection...");
225    let broker = PostgresBroker::new(url).await?;
226    println!("{}", "  ✓ Connection OK".green());
227
228    println!();
229    println!("Testing connection with latency measurement...");
230
231    // Measure query performance
232    let mut latencies = Vec::new();
233    for i in 1..=5 {
234        let start = std::time::Instant::now();
235        let connected = broker.test_connection().await?;
236        let duration = start.elapsed();
237
238        if connected {
239            latencies.push(duration.as_millis());
240            println!("  {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
241        } else {
242            println!("  {} Query {} failed", "✗".red(), i);
243        }
244    }
245
246    if !latencies.is_empty() {
247        let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
248        let min = latencies
249            .iter()
250            .min()
251            .expect("collection validated to be non-empty");
252        let max = latencies
253            .iter()
254            .max()
255            .expect("collection validated to be non-empty");
256
257        println!();
258        println!("Query Performance:");
259        println!("  Average: {}ms", avg.to_string().cyan());
260        println!("  Min: {}ms", min.to_string().green());
261        println!("  Max: {}ms", max.to_string().yellow());
262
263        if avg > 100 {
264            println!("  {}", "⚠️  High query latency detected".yellow());
265        } else {
266            println!("  {}", "✓ Query latency is healthy".green());
267        }
268    }
269
270    // Check pool metrics
271    println!();
272    println!("Connection Pool Status:");
273    let pool_metrics = broker.get_pool_metrics();
274    println!(
275        "  Max Connections: {}",
276        pool_metrics.max_size.to_string().cyan()
277    );
278    println!("  Active: {}", pool_metrics.size.to_string().cyan());
279    println!("  Idle: {}", pool_metrics.idle.to_string().cyan());
280    println!("  In-Use: {}", pool_metrics.in_use.to_string().cyan());
281
282    let utilization = if pool_metrics.max_size > 0 {
283        (f64::from(pool_metrics.in_use) / f64::from(pool_metrics.max_size)) * 100.0
284    } else {
285        0.0
286    };
287
288    if utilization > 80.0 {
289        println!(
290            "  {}",
291            "⚠️  High pool utilization - consider scaling".yellow()
292        );
293    }
294
295    println!();
296    println!("{}", "✓ Database health check completed".green().bold());
297
298    Ok(())
299}
300
301/// Show connection pool statistics.
302///
303/// Displays detailed metrics about the database connection pool including
304/// active, idle, and in-use connections with utilization percentage.
305///
306/// # Arguments
307///
308/// * `url` - Database connection URL
309///
310/// # Metrics
311///
312/// - Max connections (pool size)
313/// - Active connections
314/// - Idle connections
315/// - In-use connections
316/// - Waiting tasks (if any)
317/// - Pool utilization percentage
318///
319/// # Recommendations
320///
321/// - High utilization (>80%): Consider increasing max_connections
322/// - Low utilization (<20%): Consider reducing max_connections
323///
324/// # Examples
325///
326/// ```no_run
327/// # use celers_cli::database::db_pool_stats;
328/// # #[tokio::main]
329/// # async fn main() -> anyhow::Result<()> {
330/// db_pool_stats("postgresql://localhost/celers").await?;
331/// # Ok(())
332/// # }
333/// ```
334pub async fn db_pool_stats(url: &str) -> anyhow::Result<()> {
335    println!("{}", "=== Connection Pool Statistics ===".bold().cyan());
336    println!();
337    println!("Database URL: {}", mask_password(url).cyan());
338    println!();
339
340    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
341        "PostgreSQL"
342    } else if url.starts_with("mysql://") {
343        "MySQL"
344    } else {
345        "Unknown"
346    };
347
348    if db_type == "PostgreSQL" {
349        use celers_broker_postgres::PostgresBroker;
350
351        println!("Connecting to database...");
352        let broker = PostgresBroker::new(url).await?;
353        println!("{}", "  ✓ Connected".green());
354        println!();
355
356        let metrics = broker.get_pool_metrics();
357
358        #[derive(Tabled)]
359        struct PoolStat {
360            #[tabled(rename = "Metric")]
361            metric: String,
362            #[tabled(rename = "Value")]
363            value: String,
364        }
365
366        let stats = vec![
367            PoolStat {
368                metric: "Max Connections".to_string(),
369                value: metrics.max_size.to_string(),
370            },
371            PoolStat {
372                metric: "Active Connections".to_string(),
373                value: metrics.size.to_string(),
374            },
375            PoolStat {
376                metric: "Idle Connections".to_string(),
377                value: metrics.idle.to_string(),
378            },
379            PoolStat {
380                metric: "In-Use Connections".to_string(),
381                value: metrics.in_use.to_string(),
382            },
383            PoolStat {
384                metric: "Waiting Tasks".to_string(),
385                value: if metrics.waiting > 0 {
386                    metrics.waiting.to_string()
387                } else {
388                    "0 (estimated)".to_string()
389                },
390            },
391        ];
392
393        let table = Table::new(stats).with(Style::rounded()).to_string();
394        println!("{table}");
395        println!();
396
397        // Pool utilization
398        let utilization = if metrics.max_size > 0 {
399            (f64::from(metrics.in_use) / f64::from(metrics.max_size)) * 100.0
400        } else {
401            0.0
402        };
403
404        println!("Pool Utilization: {utilization:.1}%");
405        if utilization > 80.0 {
406            println!(
407                "{}",
408                "⚠️  High pool utilization - consider increasing max_connections".yellow()
409            );
410        } else if utilization < 20.0 && metrics.max_size > 10 {
411            println!(
412                "{}",
413                "ℹ️  Low pool utilization - consider reducing max_connections".cyan()
414            );
415        } else {
416            println!("{}", "✓ Pool utilization is healthy".green());
417        }
418    } else {
419        println!(
420            "{}",
421            "⚠️  Pool statistics only supported for PostgreSQL".yellow()
422        );
423    }
424
425    Ok(())
426}
427
428/// Run database migrations.
429///
430/// Manages database schema migrations using SQLx auto-migration.
431/// Supports applying migrations, checking status, and rollback guidance.
432///
433/// # Arguments
434///
435/// * `url` - Database connection URL
436/// * `action` - Migration action: "apply", "rollback", or "status"
437/// * `steps` - Number of steps for rollback (currently requires manual rollback)
438///
439/// # Actions
440///
441/// - **apply**: Initialize schema and apply migrations
442/// - **status**: Check current schema state
443/// - **rollback**: Show manual rollback instructions
444///
445/// # Note
446///
447/// CeleRS uses SQLx auto-migration. For rollbacks, restore from database backup.
448///
449/// # Examples
450///
451/// ```no_run
452/// # use celers_cli::database::db_migrate;
453/// # #[tokio::main]
454/// # async fn main() -> anyhow::Result<()> {
455/// // Apply migrations
456/// db_migrate("postgresql://localhost/celers", "apply", 0).await?;
457///
458/// // Check status
459/// db_migrate("postgresql://localhost/celers", "status", 0).await?;
460/// # Ok(())
461/// # }
462/// ```
463pub async fn db_migrate(url: &str, action: &str, steps: usize) -> anyhow::Result<()> {
464    println!("{}", "=== Database Migrations ===".bold().cyan());
465    println!();
466    println!("Database URL: {}", mask_password(url).cyan());
467    println!("Action: {}", action.yellow());
468    println!();
469
470    let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
471        "PostgreSQL"
472    } else if url.starts_with("mysql://") {
473        "MySQL"
474    } else {
475        "Unknown"
476    };
477
478    if db_type == "PostgreSQL" {
479        use celers_broker_postgres::PostgresBroker;
480
481        match action.to_lowercase().as_str() {
482            "apply" => {
483                println!("Applying migrations...");
484                let broker = PostgresBroker::new(url).await?;
485                let _ = broker; // Use broker to ensure it connects
486
487                println!("{}", "  ✓ Schema initialized".green());
488                println!();
489                println!("{}", "✓ Migrations applied successfully".green().bold());
490            }
491            "rollback" => {
492                println!("Rolling back {steps} migration(s)...");
493                println!();
494                println!(
495                    "{}",
496                    "⚠️  Manual rollback required - use SQL scripts".yellow()
497                );
498                println!("  CeleRS uses auto-migration with SQLx");
499                println!("  To rollback, restore from database backup");
500            }
501            "status" => {
502                println!("Checking migration status...");
503                let broker = PostgresBroker::new(url).await?;
504                let connected = broker.test_connection().await?;
505
506                println!();
507                if connected {
508                    println!("{}", "  ✓ Database schema is up-to-date".green());
509                    println!("  Tables: celers_tasks, celers_results");
510                } else {
511                    println!("{}", "  ✗ Cannot connect to database".red());
512                }
513            }
514            _ => {
515                anyhow::bail!("Unknown action '{action}'. Valid actions: apply, rollback, status");
516            }
517        }
518    } else {
519        println!(
520            "{}",
521            "⚠️  Migrations only supported for PostgreSQL".yellow()
522        );
523    }
524
525    Ok(())
526}
527
#[cfg(test)]
mod tests {
    /// Each sample URL must start with the scheme prefix used for detection.
    #[test]
    fn test_database_type_detection() {
        let cases = [
            // PostgreSQL variants
            ("postgres://localhost/db", "postgres://"),
            ("postgresql://localhost/db", "postgresql://"),
            // MySQL
            ("mysql://localhost/db", "mysql://"),
        ];

        for &(url, scheme) in &cases {
            assert!(url.starts_with(scheme));
        }
    }

    /// The three supported migration actions are accepted; anything else is not.
    #[test]
    fn test_migration_action_validation() {
        fn is_valid(action: &str) -> bool {
            matches!(action, "apply" | "rollback" | "status")
        }

        for action in ["apply", "rollback", "status"].iter().copied() {
            assert!(is_valid(action), "Action {action} should be valid");
        }

        let invalid_action = "invalid";
        assert!(
            !is_valid(invalid_action),
            "Action {invalid_action} should be invalid"
        );
    }

    /// Utilization is in-use over max as a percentage, and 0 for an empty pool.
    #[test]
    fn test_pool_utilization_calculation() {
        let percent = |in_use: u32, max_size: u32| -> f64 {
            if max_size > 0 {
                (f64::from(in_use) / f64::from(max_size)) * 100.0
            } else {
                0.0
            }
        };

        // High utilization
        assert!(percent(85, 100) > 80.0);

        // Low utilization
        assert!(percent(15, 100) < 20.0);

        // Zero max_size edge case
        assert_eq!(percent(15, 0), 0.0);
    }
}