Skip to main content

celers_broker_sql/
utilities.rs

1//! MySQL broker utility functions
2//!
3//! This module provides utility functions for MySQL broker operations,
4//! including batch optimization, memory estimation, and performance tuning.
5//!
6//! # Examples
7//!
8//! ```
9//! use celers_broker_sql::utilities::*;
10//!
11//! // Calculate optimal batch size
12//! let batch_size = calculate_optimal_mysql_batch_size(1000, 1024, 100);
13//! println!("Recommended batch size: {}", batch_size);
14//!
15//! // Estimate memory usage
16//! let memory = estimate_mysql_queue_memory(1000, 1024);
17//! println!("Estimated memory: {} bytes", memory);
18//!
19//! // Calculate optimal pool size
20//! let pool_size = calculate_optimal_mysql_pool_size(100, 50);
21//! println!("Recommended pool size: {}", pool_size);
22//! ```
23
24use std::collections::HashMap;
25
/// Recommend a batch size for MySQL queue operations.
///
/// A base size is chosen from the average message size (10 above 100 000
/// bytes, 50 above 10 000 bytes, 100 otherwise) and multiplied by a latency
/// factor (0.5 for targets below 50 ms, 1.0 below 100 ms, 1.5 otherwise).
/// The product is then limited to the queue size and to the range `1..=1000`.
///
/// # Arguments
///
/// * `queue_size` - Current queue size
/// * `avg_message_size` - Average message size in bytes
/// * `target_latency_ms` - Target latency in milliseconds
///
/// # Returns
///
/// Recommended batch size. The lower bound of the final clamp means the
/// result is 1 even when `queue_size` is 0.
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::calculate_optimal_mysql_batch_size;
///
/// let batch_size = calculate_optimal_mysql_batch_size(1000, 1024, 100);
/// assert!(batch_size > 0);
/// assert!(batch_size <= 1000);
/// ```
pub fn calculate_optimal_mysql_batch_size(
    queue_size: usize,
    avg_message_size: usize,
    target_latency_ms: u64,
) -> usize {
    // Bigger payloads get smaller batches (MEDIUMBLOB / max_allowed_packet
    // considerations); small payloads can be batched more aggressively.
    let base: u32 = match avg_message_size {
        bytes if bytes > 100_000 => 10,
        bytes if bytes > 10_000 => 50,
        _ => 100,
    };

    // Tight latency targets shrink the batch, relaxed ones grow it.
    let latency_scale = match target_latency_ms {
        0..=49 => 0.5,
        50..=99 => 1.0,
        _ => 1.5,
    };

    let scaled = (f64::from(base) * latency_scale) as usize;

    // Bound by the queue size first, then force the result into 1..=1000.
    scaled.min(queue_size).clamp(1, 1000)
}
82
/// Estimate MySQL memory usage for a queue.
///
/// Each message is costed at its average payload size plus a fixed,
/// approximate InnoDB per-row overhead of 234 bytes. The arithmetic
/// saturates, so extreme inputs yield `usize::MAX` instead of overflowing.
///
/// # Arguments
///
/// * `queue_size` - Number of messages in queue
/// * `avg_message_size` - Average message size in bytes
///
/// # Returns
///
/// Estimated memory usage in bytes (saturating at `usize::MAX`)
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::estimate_mysql_queue_memory;
///
/// let memory = estimate_mysql_queue_memory(1000, 1024);
/// assert!(memory > 0);
/// ```
pub fn estimate_mysql_queue_memory(queue_size: usize, avg_message_size: usize) -> usize {
    // Approximate InnoDB overhead per row.
    const ROW_HEADER_BYTES: usize = 20;
    // CHAR(36) column holding the UUID.
    const UUID_BYTES: usize = 36;
    // Four timestamp columns.
    const TIMESTAMP_BYTES: usize = 16;
    // ~40 bytes per index entry across four indexes.
    const INDEX_BYTES: usize = 40 * 4;
    // Share of the ~16KB page overhead attributed to a single row.
    const AMORTIZED_PAGE_BYTES: usize = 2;
    const OVERHEAD_PER_ROW: usize =
        ROW_HEADER_BYTES + UUID_BYTES + TIMESTAMP_BYTES + INDEX_BYTES + AMORTIZED_PAGE_BYTES;

    // Saturate rather than overflow: an absurdly large estimate is still more
    // useful to a caller than a panic (debug) or a wrapped value (release).
    let bytes_per_row = avg_message_size.saturating_add(OVERHEAD_PER_ROW);
    queue_size.saturating_mul(bytes_per_row)
}
113
/// Calculate the recommended number of MySQL connections for a pool.
///
/// The pool covers the expected concurrency plus a spike buffer that grows
/// with the average operation duration: 50% above 100 ms, 30% above 50 ms and
/// 20% otherwise. The sum saturates instead of overflowing and is clamped to
/// `5..=200`.
///
/// # Arguments
///
/// * `expected_concurrency` - Expected concurrent operations
/// * `avg_operation_duration_ms` - Average operation duration in ms
///
/// # Returns
///
/// Recommended pool size, always within `5..=200`
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::calculate_optimal_mysql_pool_size;
///
/// let pool_size = calculate_optimal_mysql_pool_size(100, 50);
/// assert!(pool_size > 0);
/// ```
pub fn calculate_optimal_mysql_pool_size(
    expected_concurrency: usize,
    avg_operation_duration_ms: u64,
) -> usize {
    // Slower operations hold connections longer, so they get a larger buffer.
    let buffer_ratio = if avg_operation_duration_ms > 100 {
        0.5
    } else if avg_operation_duration_ms > 50 {
        0.3
    } else {
        0.2
    };
    let buffer = (expected_concurrency as f64 * buffer_ratio) as usize;

    // MySQL's max_connections default is 151; the upper bound here (200) is
    // above that default, so the server limit may need raising to match.
    // `saturating_add` keeps huge concurrency values from overflowing before
    // the clamp is applied.
    expected_concurrency.saturating_add(buffer).clamp(5, 200)
}
156
/// Estimate how long it takes to drain the queue at a given rate.
///
/// # Arguments
///
/// * `queue_size` - Current queue size
/// * `processing_rate` - Processing rate (messages per second)
///
/// # Returns
///
/// Estimated drain time in seconds, or `f64::INFINITY` when the rate is not
/// strictly positive (zero, negative or NaN).
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::estimate_mysql_queue_drain_time;
///
/// let drain_time = estimate_mysql_queue_drain_time(1000, 50.0);
/// assert_eq!(drain_time, 20.0);
/// ```
pub fn estimate_mysql_queue_drain_time(queue_size: usize, processing_rate: f64) -> f64 {
    match processing_rate.partial_cmp(&0.0) {
        // Only a strictly positive rate makes progress on the backlog.
        Some(std::cmp::Ordering::Greater) => queue_size as f64 / processing_rate,
        // Zero, negative and NaN rates never drain the queue.
        _ => f64::INFINITY,
    }
}
183
/// Suggest a MySQL query execution strategy.
///
/// * Fewer than 10 operations: run them individually.
/// * 10 to 99 operations: wrap them in a single transaction.
/// * 100 or more: split into chunked transactions of 500 operations for
///   `"write"` and 1000 for any other operation type. The bulk `INSERT` hint
///   is only added for writes, since it does not apply to reads.
///
/// # Arguments
///
/// * `operation_count` - Number of operations
/// * `operation_type` - Type of operation ("read" or "write")
///
/// # Returns
///
/// Optimization strategy as string
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_query_strategy;
///
/// let strategy = suggest_mysql_query_strategy(100, "write");
/// assert!(!strategy.is_empty());
/// ```
pub fn suggest_mysql_query_strategy(operation_count: usize, operation_type: &str) -> String {
    match operation_count {
        0..=9 => "Execute operations individually - transaction overhead minimal".to_string(),
        10..=99 => format!(
            "Use single transaction with {} {} operations",
            operation_count, operation_type
        ),
        _ => {
            let is_write = operation_type == "write";
            // Writes use smaller chunks than reads.
            let chunk_size: usize = if is_write { 500 } else { 1000 };
            let plan = format!(
                "Use chunked transactions of {} operations each (total: {} chunks)",
                chunk_size,
                operation_count.div_ceil(chunk_size)
            );
            if is_write {
                // Bulk INSERT only makes sense when rows are being written.
                plan + " with bulk INSERT for better performance"
            } else {
                plan
            }
        }
    }
}
220
/// Suggest whether a table should be rebuilt with `OPTIMIZE TABLE`.
///
/// The checks run from most to least severe: fragmentation above 50%, then
/// above 20% on a table larger than 1000 MB, then above 10%. Anything else
/// only gets an `ANALYZE TABLE` suggestion.
///
/// # Arguments
///
/// * `table_fragmentation_percent` - Table fragmentation percentage (0-100)
/// * `table_size_mb` - Table size in megabytes
///
/// # Returns
///
/// Recommended OPTIMIZE strategy
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_optimize_strategy;
///
/// let strategy = suggest_mysql_optimize_strategy(25.0, 100.0);
/// assert!(strategy.contains("OPTIMIZE"));
/// ```
pub fn suggest_mysql_optimize_strategy(
    table_fragmentation_percent: f64,
    table_size_mb: f64,
) -> String {
    let large_table = table_size_mb > 1000.0;

    let advice = if table_fragmentation_percent > 50.0 {
        "OPTIMIZE TABLE recommended - high fragmentation detected (will lock table)"
    } else if large_table && table_fragmentation_percent > 20.0 {
        "OPTIMIZE TABLE recommended - moderate fragmentation on large table (schedule during off-peak)"
    } else if table_fragmentation_percent > 10.0 {
        "OPTIMIZE TABLE recommended - low to moderate fragmentation"
    } else {
        "ANALYZE TABLE only - fragmentation is acceptable, update statistics"
    };

    advice.to_string()
}
254
/// Suggest a MySQL index strategy from scan statistics.
///
/// The recommendation is driven by the share of full table scans among all
/// scans, combined with the table size. The scan counters are summed in
/// 128-bit arithmetic, so very large counter values cannot overflow.
///
/// # Arguments
///
/// * `index_scan_count` - Number of index scans
/// * `full_scan_count` - Number of full table scans
/// * `table_rows` - Number of rows in table
///
/// # Returns
///
/// Index recommendation
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_index_strategy;
///
/// let recommendation = suggest_mysql_index_strategy(100, 10000, 1000000);
/// assert!(!recommendation.is_empty());
/// ```
pub fn suggest_mysql_index_strategy(
    index_scan_count: u64,
    full_scan_count: u64,
    table_rows: usize,
) -> String {
    // Widen before adding: two u64 counters can exceed u64::MAX together.
    let total_scans = u128::from(index_scan_count) + u128::from(full_scan_count);
    if total_scans == 0 {
        return "No query activity detected".to_string();
    }

    let full_scan_ratio = full_scan_count as f64 / total_scans as f64;

    let recommendation = if full_scan_ratio > 0.5 && table_rows > 100_000 {
        "High full table scan ratio on large table - consider adding indexes"
    } else if full_scan_ratio > 0.2 && table_rows > 1_000_000 {
        "Moderate full table scan ratio - review query patterns and consider selective indexes"
    } else if index_scan_count > 0 && full_scan_ratio < 0.1 {
        "Good index usage - indexes are effective"
    } else {
        "Balanced scan pattern - current indexes appear adequate"
    };

    recommendation.to_string()
}
298
/// Analyze MySQL query performance.
///
/// For a non-empty input the result contains the keys `slowest_query`,
/// `max_latency_ms`, `avg_latency_ms` (both formatted with two decimals) and
/// `overall_status` (`excellent` below 5 ms average, `good` below 10 ms,
/// `acceptable` below 20 ms, otherwise `poor`). For an empty input the
/// result contains only `status = "no_data"`.
///
/// Latencies are ordered with [`f64::total_cmp`] and ties are resolved in
/// favour of the lexicographically smallest query name, so the reported
/// slowest query does not depend on `HashMap` iteration order.
///
/// # Arguments
///
/// * `query_latencies` - Map of query type to latency in ms
///
/// # Returns
///
/// Performance analysis
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::analyze_mysql_query_performance;
/// use std::collections::HashMap;
///
/// let mut latencies = HashMap::new();
/// latencies.insert("enqueue".to_string(), 5.0);
/// latencies.insert("dequeue".to_string(), 10.0);
/// latencies.insert("ack".to_string(), 3.0);
///
/// let analysis = analyze_mysql_query_performance(&latencies);
/// assert!(analysis.contains_key("slowest_query"));
/// ```
pub fn analyze_mysql_query_performance(
    query_latencies: &HashMap<String, f64>,
) -> HashMap<String, String> {
    let mut analysis = HashMap::new();

    // Highest latency wins; among equal latencies the smallest name wins
    // (hence the reversed key comparison inside a `max_by`).
    let slowest = query_latencies
        .iter()
        .max_by(|a, b| a.1.total_cmp(b.1).then_with(|| b.0.cmp(a.0)));

    let Some((slowest_query, max_latency)) = slowest else {
        // `max_by` only returns `None` for an empty map.
        analysis.insert("status".to_string(), "no_data".to_string());
        return analysis;
    };

    analysis.insert("slowest_query".to_string(), slowest_query.clone());
    analysis.insert("max_latency_ms".to_string(), format!("{:.2}", max_latency));

    // Arithmetic mean over all query types.
    let avg_latency: f64 = query_latencies.values().sum::<f64>() / query_latencies.len() as f64;
    analysis.insert("avg_latency_ms".to_string(), format!("{:.2}", avg_latency));

    // Bucket the average latency into a coarse status label.
    let status = if avg_latency < 5.0 {
        "excellent"
    } else if avg_latency < 10.0 {
        "good"
    } else if avg_latency < 20.0 {
        "acceptable"
    } else {
        "poor"
    };
    analysis.insert("overall_status".to_string(), status.to_string());

    analysis
}
360
/// Suggest InnoDB buffer pool and log flush settings for a load profile.
///
/// Four tiers are distinguished, checked from heaviest to lightest:
/// more than 50 GB with more than 1000 msg/s, more than 10 GB with more than
/// 500 msg/s, more than 100 msg/s regardless of size, and everything else.
///
/// # Arguments
///
/// * `throughput_msg_per_sec` - Message throughput
/// * `table_size_gb` - Total table size in GB
///
/// # Returns
///
/// Recommended InnoDB buffer pool configuration
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_innodb_tuning;
///
/// let config = suggest_mysql_innodb_tuning(500.0, 10.0);
/// assert!(config.contains("innodb_buffer_pool_size"));
/// ```
pub fn suggest_mysql_innodb_tuning(throughput_msg_per_sec: f64, table_size_gb: f64) -> String {
    // (tier label, buffer pool share of RAM in %, flush_log_at_trx_commit, mode)
    let (tier, pool_percent, flush_mode, profile) =
        if table_size_gb > 50.0 && throughput_msg_per_sec > 1000.0 {
            ("High", 70, 2, "performance mode")
        } else if table_size_gb > 10.0 && throughput_msg_per_sec > 500.0 {
            ("Moderate", 60, 1, "balanced")
        } else if throughput_msg_per_sec > 100.0 {
            ("Standard", 50, 1, "standard")
        } else {
            ("Low", 40, 1, "conservative")
        };

    format!(
        "{} load: innodb_buffer_pool_size={}% of RAM, innodb_flush_log_at_trx_commit={} ({})",
        tier, pool_percent, flush_mode, profile
    )
}
391
/// Derive MySQL timeout settings from observed operation latencies.
///
/// Both values are computed in milliseconds, converted to whole seconds by
/// truncation, and then bounded.
///
/// # Arguments
///
/// * `avg_operation_ms` - Average operation duration in ms
/// * `p99_operation_ms` - 99th percentile operation duration in ms
///
/// # Returns
///
/// (connect_timeout, wait_timeout) in seconds. The connect timeout is at
/// least 5; the wait timeout lies within `60..=28800`.
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::calculate_mysql_timeout_values;
///
/// let (conn_timeout, wait_timeout) = calculate_mysql_timeout_values(50.0, 200.0);
/// assert!(conn_timeout > 0);
/// assert!(wait_timeout > 0);
/// ```
pub fn calculate_mysql_timeout_values(avg_operation_ms: f64, p99_operation_ms: f64) -> (u64, u64) {
    // Milliseconds -> whole seconds; the float-to-int cast truncates.
    let whole_seconds = |millis: f64| (millis / 1000.0) as u64;

    // Three times the average operation time, never below 5 seconds.
    let connect_timeout = whole_seconds(avg_operation_ms * 3.0).max(5);

    // Twice the p99 latency, kept between one minute and 8 hours (28800 s).
    let wait_timeout = whole_seconds(p99_operation_ms * 2.0).clamp(60, 28800);

    (connect_timeout, wait_timeout)
}
421
/// Suggest a value for MySQL `sort_buffer_size`.
///
/// Two candidates are computed in whole megabytes: 10% of total RAM divided
/// across the workers, and the average sort size plus 20%. The larger one is
/// taken and clamped to `1..=16` MB.
///
/// # Arguments
///
/// * `avg_sort_size_mb` - Average sort operation size in MB
/// * `concurrent_workers` - Expected concurrent workers
/// * `total_ram_gb` - Total available RAM in GB
///
/// # Returns
///
/// Recommended sort_buffer_size in MB
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_sort_buffer_size;
///
/// let sort_buffer = suggest_mysql_sort_buffer_size(10.0, 20, 16.0);
/// assert!(sort_buffer > 0);
/// ```
pub fn suggest_mysql_sort_buffer_size(
    avg_sort_size_mb: f64,
    concurrent_workers: usize,
    total_ram_gb: f64,
) -> usize {
    // Per-worker share of a sort budget equal to 10% of RAM, in MB.
    let sort_budget_mb = total_ram_gb * 1024.0 * 0.1;
    let per_worker_mb = (sort_budget_mb / concurrent_workers as f64) as usize;

    // Average sort size with 20% headroom, in MB.
    let padded_sort_mb = (avg_sort_size_mb * 1.2) as usize;

    // MySQL's default is 256KB; ~16MB per connection is the recommended cap.
    std::cmp::max(per_worker_mb, padded_sort_mb).clamp(1, 16)
}
457
/// Recommend an InnoDB buffer pool size.
///
/// A RAM-based candidate takes 75% of RAM from 64 GB upwards, 70% from 32 GB,
/// 60% from 16 GB and 50% below that. A database-based candidate is the
/// database size plus 20%. The smaller candidate is returned, clamped to
/// `128..=65536` MB.
///
/// # Arguments
///
/// * `total_ram_gb` - Total available RAM in GB
/// * `database_size_gb` - Total database size in GB
///
/// # Returns
///
/// Recommended InnoDB buffer pool size in MB
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_innodb_buffer_pool_size;
///
/// let buffer_pool = suggest_mysql_innodb_buffer_pool_size(32.0, 10.0);
/// assert!(buffer_pool > 0);
/// ```
pub fn suggest_mysql_innodb_buffer_pool_size(total_ram_gb: f64, database_size_gb: f64) -> usize {
    // Share of RAM handed to the buffer pool; larger hosts can spare more.
    let ram_share = if total_ram_gb >= 64.0 {
        0.75
    } else if total_ram_gb >= 32.0 {
        0.70
    } else if total_ram_gb >= 16.0 {
        0.60
    } else {
        0.50
    };

    let ram_candidate_mb = (total_ram_gb * 1024.0 * ram_share) as usize;
    // Database size with 20% headroom.
    let db_candidate_mb = (database_size_gb * 1024.0 * 1.2) as usize;

    // The smaller candidate wins, bounded to 128 MB..=64 GB.
    std::cmp::min(ram_candidate_mb, db_candidate_mb).clamp(128, 65536)
}
498
/// Suggest a value for MySQL `max_allowed_packet`.
///
/// The recommendation is the maximum message size plus a 50% buffer, rounded
/// **up** to whole megabytes so that it is never smaller than the message it
/// has to carry, then clamped to `1..=1024` MB.
///
/// # Arguments
///
/// * `max_message_size_mb` - Maximum expected message size in MB
///
/// # Returns
///
/// Recommended max_allowed_packet in MB
///
/// # Examples
///
/// ```
/// use celers_broker_sql::utilities::suggest_mysql_max_allowed_packet;
///
/// let max_packet = suggest_mysql_max_allowed_packet(5.0);
/// assert!(max_packet > 0);
/// ```
pub fn suggest_mysql_max_allowed_packet(max_message_size_mb: f64) -> usize {
    // 50% buffer for overhead. Rounding up matters: truncating 1.3 MB * 1.5
    // (= 1.95) would give 1 MB, which is smaller than the message itself.
    let recommended = (max_message_size_mb * 1.5).ceil() as usize;

    // MySQL default is 64MB, min 1MB, max 1GB.
    recommended.clamp(1, 1024)
}
525
/// Result of analyzing the execution pattern of one query type.
///
/// Implements `PartialEq` so that analysis results can be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq)]
pub struct QueryPatternAnalysis {
    /// Query type (SELECT, INSERT, UPDATE, DELETE)
    pub query_type: String,
    /// Number of times the query was executed
    pub execution_count: u64,
    /// Average execution time (ms)
    pub avg_execution_time_ms: f64,
    /// 95th percentile execution time (ms)
    pub p95_execution_time_ms: f64,
    /// Average number of rows examined per execution
    pub avg_rows_examined: f64,
    /// Average number of rows returned per execution
    pub avg_rows_returned: f64,
    /// Optimization recommendation
    pub recommendation: String,
}
544
545/// Connection pool health metrics
546#[derive(Debug, Clone)]
547pub struct ConnectionPoolHealth {
548    /// Total connections in pool
549    pub total_connections: usize,
550    /// Active connections
551    pub active_connections: usize,
552    /// Idle connections
553    pub idle_connections: usize,
554    /// Pool utilization percentage
555    pub utilization_percent: f64,
556    /// Average connection wait time (ms)
557    pub avg_wait_time_ms: f64,
558    /// Connection failures count
559    pub connection_failures: u64,
560    /// Health status
561    pub health_status: PoolHealthStatus,
562    /// Recommendations
563    pub recommendations: Vec<String>,
564}
565
/// Coarse health classification of a connection pool.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PoolHealthStatus {
    /// Utilization below 70% with low wait times.
    Healthy,
    /// Utilization of 70-90% with moderate wait times.
    Warning,
    /// Utilization above 90% with high wait times.
    Critical,
}
576
/// Effectiveness metrics for a single index.
///
/// Implements `PartialEq` so that analysis results can be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq)]
pub struct IndexEffectiveness {
    /// Index name
    pub index_name: String,
    /// Table name
    pub table_name: String,
    /// Number of index scans
    pub index_scans: u64,
    /// Full table scans that could use this index
    pub potential_usage: u64,
    /// Effectiveness score (0-100)
    pub effectiveness_score: f64,
    /// Recommendation
    pub recommendation: String,
}
593
/// Result of a table bloat analysis.
///
/// Implements `PartialEq` so that analysis results can be compared directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq)]
pub struct TableBloatAnalysis {
    /// Table name
    pub table_name: String,
    /// Total table size (MB)
    pub total_size_mb: f64,
    /// Data size (MB)
    pub data_size_mb: f64,
    /// Index size (MB)
    pub index_size_mb: f64,
    /// Estimated bloat (MB)
    pub bloat_mb: f64,
    /// Bloat percentage
    pub bloat_percent: f64,
    /// Recommendation
    pub recommendation: String,
}
612
613/// Replication lag metrics
614#[derive(Debug, Clone)]
615pub struct ReplicationLag {
616    /// Replica server ID
617    pub replica_id: String,
618    /// Lag in seconds
619    pub lag_seconds: f64,
620    /// Replica status
621    pub status: ReplicaStatus,
622    /// Recommendation
623    pub recommendation: String,
624}
625
/// Health classification of a replica.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReplicaStatus {
    /// Lag below 1 second.
    Healthy,
    /// Lag of 1-5 seconds.
    Warning,
    /// Lag above 5 seconds.
    Critical,
    /// Replication has stopped.
    Error,
}
638
639/// Analyze query execution pattern
640///
641/// # Arguments
642///
643/// * `query_type` - Type of query (SELECT, INSERT, UPDATE, DELETE)
644/// * `execution_count` - Number of times query executed
645/// * `execution_times_ms` - Slice of execution times in milliseconds
646/// * `rows_examined` - Slice of rows examined per execution
647/// * `rows_returned` - Slice of rows returned per execution
648///
649/// # Returns
650///
651/// Query pattern analysis with optimization recommendations
652///
653/// # Examples
654///
655/// ```
656/// use celers_broker_sql::utilities::analyze_query_pattern;
657///
658/// let times = vec![10.0, 15.0, 12.0, 20.0, 11.0];
659/// let examined = vec![1000.0, 1200.0, 1100.0, 1500.0, 1000.0];
660/// let returned = vec![10.0, 12.0, 11.0, 15.0, 10.0];
661///
662/// let analysis = analyze_query_pattern(
663///     "SELECT",
664///     5,
665///     &times,
666///     &examined,
667///     &returned
668/// );
669/// assert_eq!(analysis.query_type, "SELECT");
670/// assert_eq!(analysis.execution_count, 5);
671/// ```
672pub fn analyze_query_pattern(
673    query_type: &str,
674    execution_count: u64,
675    execution_times_ms: &[f64],
676    rows_examined: &[f64],
677    rows_returned: &[f64],
678) -> QueryPatternAnalysis {
679    let avg_execution_time_ms = if !execution_times_ms.is_empty() {
680        execution_times_ms.iter().sum::<f64>() / execution_times_ms.len() as f64
681    } else {
682        0.0
683    };
684
685    let mut sorted_times = execution_times_ms.to_vec();
686    sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
687    let p95_execution_time_ms = if !sorted_times.is_empty() {
688        let index = ((95.0 / 100.0) * (sorted_times.len() - 1) as f64).round() as usize;
689        sorted_times[index.min(sorted_times.len() - 1)]
690    } else {
691        0.0
692    };
693
694    let avg_rows_examined = if !rows_examined.is_empty() {
695        rows_examined.iter().sum::<f64>() / rows_examined.len() as f64
696    } else {
697        0.0
698    };
699
700    let avg_rows_returned = if !rows_returned.is_empty() {
701        rows_returned.iter().sum::<f64>() / rows_returned.len() as f64
702    } else {
703        0.0
704    };
705
706    // Calculate selectivity ratio
707    let selectivity = if avg_rows_examined > 0.0 {
708        avg_rows_returned / avg_rows_examined
709    } else {
710        1.0
711    };
712
713    let recommendation = if selectivity < 0.01 && avg_rows_examined > 10000.0 {
714        "Poor selectivity: query examines many rows but returns few. Consider adding or optimizing indexes.".to_string()
715    } else if avg_execution_time_ms > 1000.0 {
716        "Slow query detected (>1s). Review query plan and consider optimization.".to_string()
717    } else if p95_execution_time_ms > avg_execution_time_ms * 5.0 {
718        "High variance in execution times. Investigate outliers and consider query cache."
719            .to_string()
720    } else if execution_count > 1000 && avg_execution_time_ms > 100.0 {
721        "Frequently executed slow query. Prime candidate for optimization.".to_string()
722    } else {
723        "Query performance acceptable.".to_string()
724    };
725
726    QueryPatternAnalysis {
727        query_type: query_type.to_string(),
728        execution_count,
729        avg_execution_time_ms,
730        p95_execution_time_ms,
731        avg_rows_examined,
732        avg_rows_returned,
733        recommendation,
734    }
735}
736
737/// Analyze connection pool health
738///
739/// # Arguments
740///
741/// * `total_connections` - Total connections in pool
742/// * `active_connections` - Currently active connections
743/// * `avg_wait_time_ms` - Average connection acquisition wait time
744/// * `connection_failures` - Number of connection failures
745///
746/// # Returns
747///
748/// Connection pool health analysis with recommendations
749///
750/// # Examples
751///
752/// ```
753/// use celers_broker_sql::utilities::analyze_connection_pool_health;
754///
755/// let health = analyze_connection_pool_health(20, 15, 50.0, 5);
756/// assert_eq!(health.total_connections, 20);
757/// assert_eq!(health.active_connections, 15);
758/// ```
759pub fn analyze_connection_pool_health(
760    total_connections: usize,
761    active_connections: usize,
762    avg_wait_time_ms: f64,
763    connection_failures: u64,
764) -> ConnectionPoolHealth {
765    let idle_connections = total_connections.saturating_sub(active_connections);
766    let utilization_percent = if total_connections > 0 {
767        (active_connections as f64 / total_connections as f64) * 100.0
768    } else {
769        0.0
770    };
771
772    let health_status = if utilization_percent > 90.0 || avg_wait_time_ms > 100.0 {
773        PoolHealthStatus::Critical
774    } else if utilization_percent > 70.0 || avg_wait_time_ms > 50.0 {
775        PoolHealthStatus::Warning
776    } else {
777        PoolHealthStatus::Healthy
778    };
779
780    let mut recommendations = Vec::new();
781
782    if utilization_percent > 90.0 {
783        recommendations.push(
784            "Pool utilization is very high (>90%). Consider increasing pool size.".to_string(),
785        );
786    }
787
788    if avg_wait_time_ms > 100.0 {
789        recommendations.push(
790            "High connection wait times (>100ms). Increase pool size or optimize query performance.".to_string()
791        );
792    }
793
794    if connection_failures > 0 {
795        recommendations.push(format!(
796            "Connection failures detected ({}). Check network stability and MySQL max_connections.",
797            connection_failures
798        ));
799    }
800
801    if utilization_percent < 30.0 && total_connections > 10 {
802        recommendations.push(
803            "Low pool utilization (<30%). Consider reducing pool size to conserve resources."
804                .to_string(),
805        );
806    }
807
808    if recommendations.is_empty() {
809        recommendations.push("Connection pool is healthy.".to_string());
810    }
811
812    ConnectionPoolHealth {
813        total_connections,
814        active_connections,
815        idle_connections,
816        utilization_percent,
817        avg_wait_time_ms,
818        connection_failures,
819        health_status,
820        recommendations,
821    }
822}
823
824/// Analyze index effectiveness
825///
826/// # Arguments
827///
828/// * `index_name` - Name of the index
829/// * `table_name` - Name of the table
830/// * `index_scans` - Number of index scans
831/// * `full_table_scans` - Number of full table scans
832///
833/// # Returns
834///
835/// Index effectiveness analysis
836///
837/// # Examples
838///
839/// ```
840/// use celers_broker_sql::utilities::analyze_index_effectiveness;
841///
842/// let analysis = analyze_index_effectiveness(
843///     "idx_tasks_state",
844///     "celers_tasks",
845///     10000,
846///     100
847/// );
848/// assert_eq!(analysis.index_name, "idx_tasks_state");
849/// assert!(analysis.effectiveness_score > 90.0);
850/// ```
851pub fn analyze_index_effectiveness(
852    index_name: &str,
853    table_name: &str,
854    index_scans: u64,
855    full_table_scans: u64,
856) -> IndexEffectiveness {
857    let total_scans = index_scans + full_table_scans;
858    let effectiveness_score = if total_scans > 0 {
859        (index_scans as f64 / total_scans as f64) * 100.0
860    } else {
861        0.0
862    };
863
864    let potential_usage = full_table_scans;
865
866    let recommendation = if effectiveness_score > 90.0 {
867        "Index is highly effective and well-utilized.".to_string()
868    } else if effectiveness_score > 70.0 {
869        "Index is moderately effective. Review query patterns for optimization opportunities."
870            .to_string()
871    } else if effectiveness_score > 50.0 {
872        "Index has low effectiveness. Consider reviewing index design or query patterns."
873            .to_string()
874    } else if index_scans == 0 && full_table_scans > 1000 {
875        "Index is not being used despite many table scans. Consider dropping or redesigning."
876            .to_string()
877    } else {
878        "Index effectiveness is very low. Review if this index is needed.".to_string()
879    };
880
881    IndexEffectiveness {
882        index_name: index_name.to_string(),
883        table_name: table_name.to_string(),
884        index_scans,
885        potential_usage,
886        effectiveness_score,
887        recommendation,
888    }
889}
890
891/// Analyze table bloat
892///
893/// # Arguments
894///
895/// * `table_name` - Name of the table
896/// * `total_size_mb` - Total table size in MB
897/// * `row_count` - Number of rows in table
898/// * `avg_row_length_bytes` - Average row length in bytes
899///
900/// # Returns
901///
902/// Table bloat analysis with recommendations
903///
904/// # Examples
905///
906/// ```
907/// use celers_broker_sql::utilities::analyze_table_bloat;
908///
909/// let analysis = analyze_table_bloat("celers_tasks", 1000.0, 500000, 2048);
910/// assert_eq!(analysis.table_name, "celers_tasks");
911/// assert!(analysis.total_size_mb > 0.0);
912/// ```
913pub fn analyze_table_bloat(
914    table_name: &str,
915    total_size_mb: f64,
916    row_count: u64,
917    avg_row_length_bytes: usize,
918) -> TableBloatAnalysis {
919    // Estimate expected data size
920    let expected_data_mb = (row_count as f64 * avg_row_length_bytes as f64) / (1024.0 * 1024.0);
921
922    // InnoDB overhead: ~30% for indexes, page overhead, etc.
923    let expected_total_mb = expected_data_mb * 1.3;
924
925    let bloat_mb = (total_size_mb - expected_total_mb).max(0.0);
926    let bloat_percent = if expected_total_mb > 0.0 {
927        (bloat_mb / expected_total_mb) * 100.0
928    } else {
929        0.0
930    };
931
932    // Rough estimates for data vs index split (InnoDB typically 70/30)
933    let data_size_mb = total_size_mb * 0.7;
934    let index_size_mb = total_size_mb * 0.3;
935
936    let recommendation = if bloat_percent > 50.0 {
937        "Significant bloat detected (>50%). Run OPTIMIZE TABLE to reclaim space.".to_string()
938    } else if bloat_percent > 25.0 {
939        "Moderate bloat detected (25-50%). Consider running OPTIMIZE TABLE during maintenance window.".to_string()
940    } else if bloat_percent > 10.0 {
941        "Low bloat detected (10-25%). Monitor and optimize if it increases.".to_string()
942    } else {
943        "Table bloat is within acceptable range.".to_string()
944    };
945
946    TableBloatAnalysis {
947        table_name: table_name.to_string(),
948        total_size_mb,
949        data_size_mb,
950        index_size_mb,
951        bloat_mb,
952        bloat_percent,
953        recommendation,
954    }
955}
956
957/// Analyze replication lag
958///
959/// # Arguments
960///
961/// * `replica_id` - Replica server identifier
962/// * `lag_seconds` - Replication lag in seconds
963/// * `io_thread_running` - Whether IO thread is running
964/// * `sql_thread_running` - Whether SQL thread is running
965///
966/// # Returns
967///
968/// Replication lag analysis with recommendations
969///
970/// # Examples
971///
972/// ```
973/// use celers_broker_sql::utilities::analyze_replication_lag;
974///
975/// let analysis = analyze_replication_lag("replica-1", 0.5, true, true);
976/// assert_eq!(analysis.replica_id, "replica-1");
977/// assert_eq!(analysis.lag_seconds, 0.5);
978/// ```
979pub fn analyze_replication_lag(
980    replica_id: &str,
981    lag_seconds: f64,
982    io_thread_running: bool,
983    sql_thread_running: bool,
984) -> ReplicationLag {
985    let status = if !io_thread_running || !sql_thread_running {
986        ReplicaStatus::Error
987    } else if lag_seconds > 5.0 {
988        ReplicaStatus::Critical
989    } else if lag_seconds > 1.0 {
990        ReplicaStatus::Warning
991    } else {
992        ReplicaStatus::Healthy
993    };
994
995    let recommendation = match status {
996        ReplicaStatus::Error => {
997            "Replication threads are not running. Check replica configuration and logs.".to_string()
998        }
999        ReplicaStatus::Critical => {
1000            "Replication lag is critical (>5s). Check replica load, network, and binlog position."
1001                .to_string()
1002        }
1003        ReplicaStatus::Warning => {
1004            "Replication lag is elevated (1-5s). Monitor closely and investigate if it persists."
1005                .to_string()
1006        }
1007        ReplicaStatus::Healthy => "Replication is healthy with minimal lag (<1s).".to_string(),
1008    };
1009
1010    ReplicationLag {
1011        replica_id: replica_id.to_string(),
1012        lag_seconds,
1013        status,
1014        recommendation,
1015    }
1016}
1017
#[cfg(test)]
mod tests {
    use super::*;

    /// Batch size for 1 KiB messages is positive and capped at 1000.
    #[test]
    fn test_calculate_optimal_batch_size() {
        let recommended = calculate_optimal_mysql_batch_size(1000, 1024, 100);
        assert!(recommended > 0);
        assert!(recommended <= 1000);
    }

    /// 200 KB messages must yield a batch of at most 20.
    #[test]
    fn test_calculate_optimal_batch_size_large_messages() {
        let recommended = calculate_optimal_mysql_batch_size(1000, 200_000, 100);
        assert!(recommended <= 20);
    }

    /// The estimate must exceed the raw payload size (count * message size).
    #[test]
    fn test_estimate_queue_memory() {
        let estimated_bytes = estimate_mysql_queue_memory(1000, 1024);
        assert!(estimated_bytes > 1024 * 1000);
    }

    /// Pool size for the sample inputs lands in the 100..=200 range.
    #[test]
    fn test_calculate_optimal_pool_size() {
        let connections = calculate_optimal_mysql_pool_size(100, 50);
        assert!(connections >= 100);
        assert!(connections <= 200);
    }

    /// 1000 messages at 50 per second drain in exactly 20 seconds.
    #[test]
    fn test_estimate_drain_time() {
        let seconds = estimate_mysql_queue_drain_time(1000, 50.0);
        assert_eq!(seconds, 20.0);
    }

    /// A zero processing rate is reported as an infinite drain time.
    #[test]
    fn test_estimate_drain_time_zero_rate() {
        let seconds = estimate_mysql_queue_drain_time(1000, 0.0);
        assert!(seconds.is_infinite());
    }

    /// Each operation count maps to advice containing the expected phrase.
    #[test]
    fn test_suggest_query_strategy() {
        let cases = [
            (5, "individually"),
            (50, "single transaction"),
            (1000, "chunked"),
        ];

        for (operation_count, expected_phrase) in cases {
            let advice = suggest_mysql_query_strategy(operation_count, "write");
            assert!(advice.contains(expected_phrase));
        }
    }

    /// Each input pair maps to advice containing the expected statement name.
    #[test]
    fn test_suggest_optimize_strategy() {
        let cases = [
            (60.0, 100.0, "OPTIMIZE TABLE"),
            (25.0, 1500.0, "OPTIMIZE TABLE"),
            (5.0, 100.0, "ANALYZE"),
        ];

        for (first_arg, second_arg, expected_phrase) in cases {
            let advice = suggest_mysql_optimize_strategy(first_arg, second_arg);
            assert!(advice.contains(expected_phrase));
        }
    }

    /// Checks the advice text for two contrasting sets of index statistics.
    #[test]
    fn test_suggest_index_strategy() {
        let cases = [
            (100, 10000, 1000000, "full table scan"),
            (10000, 100, 1000000, "Good index usage"),
        ];

        for (first_arg, second_arg, third_arg, expected_phrase) in cases {
            let advice = suggest_mysql_index_strategy(first_arg, second_arg, third_arg);
            assert!(advice.contains(expected_phrase));
        }
    }

    /// The slowest query is identified and the summary keys are present.
    #[test]
    fn test_analyze_query_performance() {
        let mut latencies = HashMap::new();
        for (query_name, latency) in [("enqueue", 5.0), ("dequeue", 15.0), ("ack", 3.0)] {
            latencies.insert(query_name.to_string(), latency);
        }

        let report = analyze_mysql_query_performance(&latencies);

        let expected_slowest = "dequeue".to_string();
        assert_eq!(report.get("slowest_query"), Some(&expected_slowest));
        assert!(report.contains_key("avg_latency_ms"));
        assert!(report.contains_key("overall_status"));
    }

    /// Both sample inputs produce output mentioning `innodb_buffer_pool_size`.
    #[test]
    fn test_suggest_innodb_tuning() {
        for (first_arg, second_arg) in [(1500.0, 60.0), (50.0, 5.0)] {
            let suggestion = suggest_mysql_innodb_tuning(first_arg, second_arg);
            assert!(suggestion.contains("innodb_buffer_pool_size"));
        }
    }

    /// Returned timeouts respect the lower bounds of 5 and 60.
    #[test]
    fn test_calculate_timeout_values() {
        let (connect_timeout, idle_timeout) = calculate_mysql_timeout_values(50.0, 200.0);
        assert!(connect_timeout >= 5);
        assert!(idle_timeout >= 60);
    }

    /// Suggested sort buffer stays within 1..=16.
    #[test]
    fn test_suggest_sort_buffer_size() {
        let suggested = suggest_mysql_sort_buffer_size(10.0, 20, 16.0);
        assert!(suggested >= 1);
        assert!(suggested <= 16);
    }

    /// Suggested buffer pool stays within 128..=65536.
    #[test]
    fn test_suggest_innodb_buffer_pool_size() {
        let suggested = suggest_mysql_innodb_buffer_pool_size(32.0, 10.0);
        assert!(suggested >= 128);
        assert!(suggested <= 65536);
    }

    /// Suggested max packet stays within 1..=1024.
    #[test]
    fn test_suggest_max_allowed_packet() {
        let suggested = suggest_mysql_max_allowed_packet(5.0);
        assert!(suggested >= 1);
        assert!(suggested <= 1024);
    }

    /// Fast queries with a modest examined/returned ratio are rated acceptable.
    #[test]
    fn test_analyze_query_pattern_good() {
        let execution_times = vec![10.0, 15.0, 12.0, 20.0, 11.0];
        let rows_examined = vec![100.0, 120.0, 110.0, 150.0, 100.0];
        let rows_returned = vec![10.0, 12.0, 11.0, 15.0, 10.0];

        let report = analyze_query_pattern(
            "SELECT",
            5,
            &execution_times,
            &rows_examined,
            &rows_returned,
        );

        assert_eq!(report.query_type, "SELECT");
        assert_eq!(report.execution_count, 5);
        assert!(report.avg_execution_time_ms > 0.0);
        assert!(report.recommendation.contains("acceptable"));
    }

    /// Execution times above one second are flagged as a slow query.
    #[test]
    fn test_analyze_query_pattern_slow() {
        // Every sample is slower than 1s.
        let execution_times = vec![1500.0, 1600.0, 1700.0];
        let rows_examined = vec![1000.0, 1200.0, 1100.0];
        let rows_returned = vec![10.0, 12.0, 11.0];

        let report = analyze_query_pattern(
            "SELECT",
            3,
            &execution_times,
            &rows_examined,
            &rows_returned,
        );

        assert!(report.avg_execution_time_ms > 1000.0);
        assert!(report.recommendation.contains("Slow query"));
    }

    /// Many rows examined for few rows returned triggers a selectivity warning.
    #[test]
    fn test_analyze_query_pattern_poor_selectivity() {
        let execution_times = vec![100.0, 120.0, 110.0];
        // Many rows examined ...
        let rows_examined = vec![100000.0, 120000.0, 110000.0];
        // ... but only a handful returned.
        let rows_returned = vec![10.0, 12.0, 11.0];

        let report = analyze_query_pattern(
            "SELECT",
            3,
            &execution_times,
            &rows_examined,
            &rows_returned,
        );

        assert!(report.recommendation.contains("selectivity"));
    }

    /// 10 of 20 connections active: 50% utilization, healthy pool.
    #[test]
    fn test_analyze_connection_pool_health_healthy() {
        let pool = analyze_connection_pool_health(20, 10, 20.0, 0);
        assert_eq!(pool.total_connections, 20);
        assert_eq!(pool.active_connections, 10);
        assert_eq!(pool.idle_connections, 10);
        assert_eq!(pool.utilization_percent, 50.0);
        assert_eq!(pool.health_status, PoolHealthStatus::Healthy);
    }

    /// 19 of 20 connections active: critical, with recommendations attached.
    #[test]
    fn test_analyze_connection_pool_health_critical() {
        let pool = analyze_connection_pool_health(20, 19, 150.0, 5);
        assert!(pool.utilization_percent > 90.0);
        assert_eq!(pool.health_status, PoolHealthStatus::Critical);
        assert!(!pool.recommendations.is_empty());
    }

    /// 15 of 20 connections active: 75% utilization, warning status.
    #[test]
    fn test_analyze_connection_pool_health_warning() {
        let pool = analyze_connection_pool_health(20, 15, 60.0, 0);
        assert_eq!(pool.utilization_percent, 75.0);
        assert_eq!(pool.health_status, PoolHealthStatus::Warning);
    }

    /// The first sample index scores above 90 and is called effective.
    #[test]
    fn test_analyze_index_effectiveness_high() {
        let report = analyze_index_effectiveness("idx_tasks_state", "celers_tasks", 10000, 100);
        assert_eq!(report.index_name, "idx_tasks_state");
        assert_eq!(report.table_name, "celers_tasks");
        assert!(report.effectiveness_score > 90.0);
        assert!(report.recommendation.contains("effective"));
    }

    /// The second sample index scores zero and is reported as unused.
    #[test]
    fn test_analyze_index_effectiveness_low() {
        let report = analyze_index_effectiveness("idx_unused", "celers_tasks", 0, 5000);
        assert_eq!(report.effectiveness_score, 0.0);
        assert!(report.recommendation.contains("not being used"));
    }

    /// Basic sanity checks on the analysis of a plausibly sized table.
    #[test]
    fn test_analyze_table_bloat_low() {
        let report = analyze_table_bloat("celers_tasks", 1000.0, 500000, 2048);
        assert_eq!(report.table_name, "celers_tasks");
        assert!(report.total_size_mb > 0.0);
        assert!(report.bloat_percent >= 0.0);
    }

    /// A large table holding few rows must report non-zero bloat.
    #[test]
    fn test_analyze_table_bloat_high() {
        // Small row count but large size = high bloat
        let report = analyze_table_bloat("bloated_table", 5000.0, 10000, 1024);
        assert!(report.bloat_mb > 0.0);
        assert!(report.bloat_percent > 0.0);
    }

    /// Half a second of lag with both threads running is healthy.
    #[test]
    fn test_analyze_replication_lag_healthy() {
        let report = analyze_replication_lag("replica-1", 0.5, true, true);
        assert_eq!(report.replica_id, "replica-1");
        assert_eq!(report.lag_seconds, 0.5);
        assert_eq!(report.status, ReplicaStatus::Healthy);
    }

    /// Three seconds of lag is a warning with "elevated" in the advice.
    #[test]
    fn test_analyze_replication_lag_warning() {
        let report = analyze_replication_lag("replica-2", 3.0, true, true);
        assert_eq!(report.status, ReplicaStatus::Warning);
        assert!(report.recommendation.contains("elevated"));
    }

    /// Ten seconds of lag is critical with "critical" in the advice.
    #[test]
    fn test_analyze_replication_lag_critical() {
        let report = analyze_replication_lag("replica-3", 10.0, true, true);
        assert_eq!(report.status, ReplicaStatus::Critical);
        assert!(report.recommendation.contains("critical"));
    }

    /// Stopped replication threads are an error regardless of the lag value.
    #[test]
    fn test_analyze_replication_lag_error() {
        let report = analyze_replication_lag("replica-4", 0.0, false, false);
        assert_eq!(report.status, ReplicaStatus::Error);
        assert!(report.recommendation.contains("not running"));
    }
}