1use crate::command_utils::mask_password;
34use colored::Colorize;
35use tabled::{settings::Style, Table, Tabled};
36
37pub async fn db_test_connection(url: &str, benchmark: bool) -> anyhow::Result<()> {
67 println!("{}", "=== Database Connection Test ===".bold().cyan());
68 println!();
69 println!("Database URL: {}", mask_password(url).cyan());
70 println!();
71
72 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
74 "PostgreSQL"
75 } else if url.starts_with("mysql://") {
76 "MySQL"
77 } else {
78 "Unknown"
79 };
80
81 println!("Database type: {}", db_type.yellow());
82 println!();
83
84 println!("Testing connection...");
86 let start = std::time::Instant::now();
87
88 if db_type == "PostgreSQL" {
90 match test_postgres_connection(url).await {
91 Ok(version) => {
92 let duration = start.elapsed();
93 println!("{}", "✓ Connection successful".green());
94 println!(" Version: {}", version.cyan());
95 println!(" Latency: {}ms", duration.as_millis().to_string().yellow());
96 }
97 Err(e) => {
98 println!("{}", "✗ Connection failed".red());
99 println!(" Error: {e}");
100 return Err(e);
101 }
102 }
103 } else if db_type == "MySQL" {
104 println!("{}", "⚠️ MySQL support not yet implemented".yellow());
105 return Ok(());
106 } else {
107 println!("{}", "⚠️ Unknown database type".yellow());
108 return Ok(());
109 }
110
111 if benchmark {
113 println!();
114 println!("{}", "Running latency benchmark...".bold());
115 println!();
116
117 let mut latencies = Vec::new();
118 for i in 1..=10 {
119 let start = std::time::Instant::now();
120 if let Err(e) = test_postgres_connection(url).await {
121 println!(" {} Query {} failed: {}", "✗".red(), i, e);
122 continue;
123 }
124 let duration = start.elapsed();
125 latencies.push(duration.as_millis());
126 println!(" {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
127 }
128
129 if !latencies.is_empty() {
130 let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
131 let min = latencies
132 .iter()
133 .min()
134 .expect("collection validated to be non-empty");
135 let max = latencies
136 .iter()
137 .max()
138 .expect("collection validated to be non-empty");
139
140 println!();
141 println!("Benchmark results:");
142 println!(" Average: {}ms", avg.to_string().cyan());
143 println!(" Min: {}ms", min.to_string().green());
144 println!(" Max: {}ms", max.to_string().yellow());
145 }
146 }
147
148 Ok(())
149}
150
151async fn test_postgres_connection(url: &str) -> anyhow::Result<String> {
153 use celers_broker_postgres::PostgresBroker;
154
155 let broker = PostgresBroker::new(url).await?;
157
158 let connected = broker.test_connection().await?;
160 if connected {
161 Ok("Connected".to_string())
162 } else {
163 anyhow::bail!("Connection test failed")
164 }
165}
166
167pub async fn db_health(url: &str) -> anyhow::Result<()> {
194 println!("{}", "=== Database Health Check ===".bold().cyan());
195 println!();
196
197 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
198 "PostgreSQL"
199 } else if url.starts_with("mysql://") {
200 "MySQL"
201 } else {
202 "Unknown"
203 };
204
205 println!("Database: {}", db_type.yellow());
206 println!();
207
208 if db_type == "PostgreSQL" {
209 check_postgres_health(url).await?;
210 } else {
211 println!(
212 "{}",
213 "⚠️ Health check not supported for this database type".yellow()
214 );
215 }
216
217 Ok(())
218}
219
220async fn check_postgres_health(url: &str) -> anyhow::Result<()> {
222 use celers_broker_postgres::PostgresBroker;
223
224 println!("Checking connection...");
225 let broker = PostgresBroker::new(url).await?;
226 println!("{}", " ✓ Connection OK".green());
227
228 println!();
229 println!("Testing connection with latency measurement...");
230
231 let mut latencies = Vec::new();
233 for i in 1..=5 {
234 let start = std::time::Instant::now();
235 let connected = broker.test_connection().await?;
236 let duration = start.elapsed();
237
238 if connected {
239 latencies.push(duration.as_millis());
240 println!(" {} Query {}: {}ms", "✓".green(), i, duration.as_millis());
241 } else {
242 println!(" {} Query {} failed", "✗".red(), i);
243 }
244 }
245
246 if !latencies.is_empty() {
247 let avg = latencies.iter().sum::<u128>() / latencies.len() as u128;
248 let min = latencies
249 .iter()
250 .min()
251 .expect("collection validated to be non-empty");
252 let max = latencies
253 .iter()
254 .max()
255 .expect("collection validated to be non-empty");
256
257 println!();
258 println!("Query Performance:");
259 println!(" Average: {}ms", avg.to_string().cyan());
260 println!(" Min: {}ms", min.to_string().green());
261 println!(" Max: {}ms", max.to_string().yellow());
262
263 if avg > 100 {
264 println!(" {}", "⚠️ High query latency detected".yellow());
265 } else {
266 println!(" {}", "✓ Query latency is healthy".green());
267 }
268 }
269
270 println!();
272 println!("Connection Pool Status:");
273 let pool_metrics = broker.get_pool_metrics();
274 println!(
275 " Max Connections: {}",
276 pool_metrics.max_size.to_string().cyan()
277 );
278 println!(" Active: {}", pool_metrics.size.to_string().cyan());
279 println!(" Idle: {}", pool_metrics.idle.to_string().cyan());
280 println!(" In-Use: {}", pool_metrics.in_use.to_string().cyan());
281
282 let utilization = if pool_metrics.max_size > 0 {
283 (f64::from(pool_metrics.in_use) / f64::from(pool_metrics.max_size)) * 100.0
284 } else {
285 0.0
286 };
287
288 if utilization > 80.0 {
289 println!(
290 " {}",
291 "⚠️ High pool utilization - consider scaling".yellow()
292 );
293 }
294
295 println!();
296 println!("{}", "✓ Database health check completed".green().bold());
297
298 Ok(())
299}
300
301pub async fn db_pool_stats(url: &str) -> anyhow::Result<()> {
335 println!("{}", "=== Connection Pool Statistics ===".bold().cyan());
336 println!();
337 println!("Database URL: {}", mask_password(url).cyan());
338 println!();
339
340 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
341 "PostgreSQL"
342 } else if url.starts_with("mysql://") {
343 "MySQL"
344 } else {
345 "Unknown"
346 };
347
348 if db_type == "PostgreSQL" {
349 use celers_broker_postgres::PostgresBroker;
350
351 println!("Connecting to database...");
352 let broker = PostgresBroker::new(url).await?;
353 println!("{}", " ✓ Connected".green());
354 println!();
355
356 let metrics = broker.get_pool_metrics();
357
358 #[derive(Tabled)]
359 struct PoolStat {
360 #[tabled(rename = "Metric")]
361 metric: String,
362 #[tabled(rename = "Value")]
363 value: String,
364 }
365
366 let stats = vec![
367 PoolStat {
368 metric: "Max Connections".to_string(),
369 value: metrics.max_size.to_string(),
370 },
371 PoolStat {
372 metric: "Active Connections".to_string(),
373 value: metrics.size.to_string(),
374 },
375 PoolStat {
376 metric: "Idle Connections".to_string(),
377 value: metrics.idle.to_string(),
378 },
379 PoolStat {
380 metric: "In-Use Connections".to_string(),
381 value: metrics.in_use.to_string(),
382 },
383 PoolStat {
384 metric: "Waiting Tasks".to_string(),
385 value: if metrics.waiting > 0 {
386 metrics.waiting.to_string()
387 } else {
388 "0 (estimated)".to_string()
389 },
390 },
391 ];
392
393 let table = Table::new(stats).with(Style::rounded()).to_string();
394 println!("{table}");
395 println!();
396
397 let utilization = if metrics.max_size > 0 {
399 (f64::from(metrics.in_use) / f64::from(metrics.max_size)) * 100.0
400 } else {
401 0.0
402 };
403
404 println!("Pool Utilization: {utilization:.1}%");
405 if utilization > 80.0 {
406 println!(
407 "{}",
408 "⚠️ High pool utilization - consider increasing max_connections".yellow()
409 );
410 } else if utilization < 20.0 && metrics.max_size > 10 {
411 println!(
412 "{}",
413 "ℹ️ Low pool utilization - consider reducing max_connections".cyan()
414 );
415 } else {
416 println!("{}", "✓ Pool utilization is healthy".green());
417 }
418 } else {
419 println!(
420 "{}",
421 "⚠️ Pool statistics only supported for PostgreSQL".yellow()
422 );
423 }
424
425 Ok(())
426}
427
428pub async fn db_migrate(url: &str, action: &str, steps: usize) -> anyhow::Result<()> {
464 println!("{}", "=== Database Migrations ===".bold().cyan());
465 println!();
466 println!("Database URL: {}", mask_password(url).cyan());
467 println!("Action: {}", action.yellow());
468 println!();
469
470 let db_type = if url.starts_with("postgres://") || url.starts_with("postgresql://") {
471 "PostgreSQL"
472 } else if url.starts_with("mysql://") {
473 "MySQL"
474 } else {
475 "Unknown"
476 };
477
478 if db_type == "PostgreSQL" {
479 use celers_broker_postgres::PostgresBroker;
480
481 match action.to_lowercase().as_str() {
482 "apply" => {
483 println!("Applying migrations...");
484 let broker = PostgresBroker::new(url).await?;
485 let _ = broker; println!("{}", " ✓ Schema initialized".green());
488 println!();
489 println!("{}", "✓ Migrations applied successfully".green().bold());
490 }
491 "rollback" => {
492 println!("Rolling back {steps} migration(s)...");
493 println!();
494 println!(
495 "{}",
496 "⚠️ Manual rollback required - use SQL scripts".yellow()
497 );
498 println!(" CeleRS uses auto-migration with SQLx");
499 println!(" To rollback, restore from database backup");
500 }
501 "status" => {
502 println!("Checking migration status...");
503 let broker = PostgresBroker::new(url).await?;
504 let connected = broker.test_connection().await?;
505
506 println!();
507 if connected {
508 println!("{}", " ✓ Database schema is up-to-date".green());
509 println!(" Tables: celers_tasks, celers_results");
510 } else {
511 println!("{}", " ✗ Cannot connect to database".red());
512 }
513 }
514 _ => {
515 anyhow::bail!("Unknown action '{action}'. Valid actions: apply, rollback, status");
516 }
517 }
518 } else {
519 println!(
520 "{}",
521 "⚠️ Migrations only supported for PostgreSQL".yellow()
522 );
523 }
524
525 Ok(())
526}
527
528#[cfg(test)]
529mod tests {
530 #[test]
531 fn test_database_type_detection() {
532 assert!(matches!(
534 "postgres://localhost/db",
535 url if url.starts_with("postgres://")
536 ));
537 assert!(matches!(
538 "postgresql://localhost/db",
539 url if url.starts_with("postgresql://")
540 ));
541
542 assert!(matches!(
544 "mysql://localhost/db",
545 url if url.starts_with("mysql://")
546 ));
547 }
548
549 #[test]
550 fn test_migration_action_validation() {
551 let valid_actions = vec!["apply", "rollback", "status"];
552 for action in valid_actions {
553 assert!(
554 matches!(action, "apply" | "rollback" | "status"),
555 "Action {action} should be valid"
556 );
557 }
558
559 let invalid_action = "invalid";
560 assert!(
561 !matches!(invalid_action, "apply" | "rollback" | "status"),
562 "Action {invalid_action} should be invalid"
563 );
564 }
565
566 #[test]
567 fn test_pool_utilization_calculation() {
568 let in_use = 85_u32;
570 let max_size = 100_u32;
571 let utilization = (f64::from(in_use) / f64::from(max_size)) * 100.0;
572 assert!(utilization > 80.0);
573
574 let in_use = 15_u32;
576 let utilization = (f64::from(in_use) / f64::from(max_size)) * 100.0;
577 assert!(utilization < 20.0);
578
579 let max_size = 0_u32;
581 let utilization = if max_size > 0 {
582 (f64::from(in_use) / f64::from(max_size)) * 100.0
583 } else {
584 0.0
585 };
586 assert_eq!(utilization, 0.0);
587 }
588}