Skip to main content

celers_broker_sql/
monitoring.rs

1//! MySQL broker monitoring utilities
2//!
3//! This module provides production-grade monitoring and analysis utilities
4//! for MySQL-based task queues. These utilities help with capacity planning,
5//! autoscaling decisions, SLA monitoring, and performance optimization.
6//!
7//! # Examples
8//!
9//! ```
10//! use celers_broker_sql::monitoring::*;
11//!
12//! # fn example() -> Result<(), Box<dyn std::error::Error>> {
13//! // Analyze consumer lag and get scaling recommendations
14//! let queue_size = 1000;
15//! let processing_rate = 50.0; // messages/sec
16//! let lag = analyze_mysql_consumer_lag(queue_size, processing_rate, 100);
17//! println!("Queue lag: {} seconds", lag.lag_seconds);
18//! println!("Recommendation: {:?}", lag.recommendation);
19//!
20//! // Calculate message velocity and growth trends
21//! let velocity = calculate_mysql_message_velocity(
22//!     1000, // previous size
23//!     1500, // current size
24//!     60.0  // time window (seconds)
25//! );
26//! println!("Queue growing at {} msg/sec", velocity.velocity);
27//!
28//! // Get worker scaling recommendation
29//! let scaling = suggest_mysql_worker_scaling(
30//!     2000,  // queue_size
31//!     5,     // current_workers
32//!     40.0,  // avg_processing_rate (msg/sec per worker)
33//!     100    // target_lag_seconds
34//! );
35//! println!("Suggested workers: {}", scaling.recommended_workers);
36//! # Ok(())
37//! # }
38//! ```
39
40use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42
/// Consumer lag analysis with autoscaling recommendations
///
/// Returned by [`analyze_mysql_consumer_lag`]; the first three fields echo
/// that function's inputs and the remaining fields are derived from them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerLagAnalysis {
    /// Current queue size
    pub queue_size: usize,
    /// Processing rate (messages per second)
    pub processing_rate: f64,
    /// Target acceptable lag (seconds)
    pub target_lag_seconds: u64,
    /// Calculated lag in seconds (`queue_size / processing_rate`);
    /// `f64::INFINITY` when the processing rate is not positive
    pub lag_seconds: f64,
    /// Whether the lag exceeds the target (strictly greater than)
    pub is_lagging: bool,
    /// Scaling recommendation
    pub recommendation: ScalingRecommendation,
}
59
/// Worker scaling recommendation
///
/// The counts carried by the variants are deltas (how many workers to add
/// or remove), not absolute worker totals.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScalingRecommendation {
    /// Scale up workers by `additional_workers`
    ScaleUp { additional_workers: usize },
    /// Current workers are sufficient
    Optimal,
    /// Can scale down workers by `workers_to_remove`
    ScaleDown { workers_to_remove: usize },
}
70
/// Message velocity and queue growth trend
///
/// Returned by [`calculate_mysql_message_velocity`]; the first three fields
/// echo that function's inputs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageVelocity {
    /// Previous queue size (start of the window)
    pub previous_size: usize,
    /// Current queue size (end of the window)
    pub current_size: usize,
    /// Time window (seconds)
    pub time_window_secs: f64,
    /// Messages per second (positive = growing, negative = shrinking);
    /// `0.0` when the time window is not positive
    pub velocity: f64,
    /// Queue growth trend derived from `velocity`
    pub trend: QueueTrend,
}
85
/// Queue growth trend classification
///
/// Assigned by [`calculate_mysql_message_velocity`] from the measured
/// velocity in messages per second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum QueueTrend {
    /// Queue is growing rapidly (> 10 msg/sec)
    RapidGrowth,
    /// Queue is growing slowly (1-10 msg/sec)
    SlowGrowth,
    /// Queue is stable (< 1 msg/sec change)
    Stable,
    /// Queue is shrinking slowly (-10 to -1 msg/sec)
    SlowShrink,
    /// Queue is shrinking rapidly (< -10 msg/sec)
    RapidShrink,
}
100
/// Worker scaling suggestion
///
/// Returned by [`suggest_mysql_worker_scaling`]; the first four fields echo
/// that function's inputs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerScalingSuggestion {
    /// Current queue size
    pub queue_size: usize,
    /// Current number of workers
    pub current_workers: usize,
    /// Average processing rate per worker (msg/sec)
    pub avg_processing_rate: f64,
    /// Target lag in seconds
    pub target_lag_seconds: u64,
    /// Recommended number of workers (absolute total, never below 1)
    pub recommended_workers: usize,
    /// Scaling action needed to move from `current_workers` to
    /// `recommended_workers`
    pub action: ScalingRecommendation,
}
117
/// Message age distribution for SLA monitoring
///
/// Returned by [`calculate_mysql_message_age_distribution`]. When no
/// messages were supplied every field is zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAgeDistribution {
    /// Total messages analyzed
    pub total_messages: usize,
    /// Minimum age (seconds)
    pub min_age_secs: f64,
    /// Maximum age (seconds)
    pub max_age_secs: f64,
    /// Average (arithmetic mean) age (seconds)
    pub avg_age_secs: f64,
    /// 50th percentile (median) age (seconds)
    pub p50_age_secs: f64,
    /// 95th percentile age (seconds)
    pub p95_age_secs: f64,
    /// 99th percentile age (seconds)
    pub p99_age_secs: f64,
    /// Messages older than SLA threshold (strictly greater than)
    pub messages_exceeding_sla: usize,
}
138
/// Processing capacity estimation
///
/// Returned by [`estimate_mysql_processing_capacity`]; `workers` and
/// `rate_per_worker` echo that function's inputs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingCapacity {
    /// Number of workers
    pub workers: usize,
    /// Average processing rate per worker (msg/sec)
    pub rate_per_worker: f64,
    /// Total system capacity (msg/sec): `workers * rate_per_worker`
    pub total_capacity_per_sec: f64,
    /// Total system capacity (msg/min)
    pub total_capacity_per_min: f64,
    /// Total system capacity (msg/hour)
    pub total_capacity_per_hour: f64,
    /// Time to process backlog (seconds); `f64::INFINITY` when the total
    /// capacity is not positive
    pub time_to_clear_backlog_secs: f64,
}
155
156/// Analyze consumer lag and provide scaling recommendations
157///
158/// # Arguments
159///
160/// * `queue_size` - Current number of messages in queue
161/// * `processing_rate` - Current processing rate (messages per second)
162/// * `target_lag_seconds` - Target acceptable lag in seconds
163///
164/// # Returns
165///
166/// Consumer lag analysis with scaling recommendation
167///
168/// # Examples
169///
170/// ```
171/// use celers_broker_sql::monitoring::analyze_mysql_consumer_lag;
172///
173/// let lag = analyze_mysql_consumer_lag(1000, 50.0, 100);
174/// assert_eq!(lag.queue_size, 1000);
175/// assert_eq!(lag.processing_rate, 50.0);
176/// ```
177pub fn analyze_mysql_consumer_lag(
178    queue_size: usize,
179    processing_rate: f64,
180    target_lag_seconds: u64,
181) -> ConsumerLagAnalysis {
182    let lag_seconds = if processing_rate > 0.0 {
183        queue_size as f64 / processing_rate
184    } else {
185        f64::INFINITY
186    };
187
188    let is_lagging = lag_seconds > target_lag_seconds as f64;
189
190    let recommendation = if is_lagging {
191        let target_rate = queue_size as f64 / target_lag_seconds as f64;
192        let additional_capacity_needed = target_rate - processing_rate;
193        let workers_needed = (additional_capacity_needed / processing_rate).ceil() as usize;
194        ScalingRecommendation::ScaleUp {
195            additional_workers: workers_needed.max(1),
196        }
197    } else if lag_seconds < (target_lag_seconds as f64 * 0.5) && queue_size > 0 {
198        // If lag is less than 50% of target, consider scaling down
199        let excess_capacity = processing_rate - (queue_size as f64 / target_lag_seconds as f64);
200        let workers_to_remove = (excess_capacity / processing_rate).floor() as usize;
201        if workers_to_remove > 0 {
202            ScalingRecommendation::ScaleDown { workers_to_remove }
203        } else {
204            ScalingRecommendation::Optimal
205        }
206    } else {
207        ScalingRecommendation::Optimal
208    };
209
210    ConsumerLagAnalysis {
211        queue_size,
212        processing_rate,
213        target_lag_seconds,
214        lag_seconds,
215        is_lagging,
216        recommendation,
217    }
218}
219
220/// Calculate message velocity and queue growth trend
221///
222/// # Arguments
223///
224/// * `previous_size` - Queue size at start of window
225/// * `current_size` - Queue size at end of window
226/// * `time_window_secs` - Time window in seconds
227///
228/// # Returns
229///
230/// Message velocity and trend analysis
231///
232/// # Examples
233///
234/// ```
235/// use celers_broker_sql::monitoring::{calculate_mysql_message_velocity, QueueTrend};
236///
237/// let velocity = calculate_mysql_message_velocity(1000, 1500, 60.0);
238/// assert_eq!(velocity.previous_size, 1000);
239/// assert_eq!(velocity.current_size, 1500);
240/// assert_eq!(velocity.trend, QueueTrend::SlowGrowth);
241/// ```
242pub fn calculate_mysql_message_velocity(
243    previous_size: usize,
244    current_size: usize,
245    time_window_secs: f64,
246) -> MessageVelocity {
247    let velocity = if time_window_secs > 0.0 {
248        (current_size as f64 - previous_size as f64) / time_window_secs
249    } else {
250        0.0
251    };
252
253    let trend = if velocity > 10.0 {
254        QueueTrend::RapidGrowth
255    } else if velocity > 1.0 {
256        QueueTrend::SlowGrowth
257    } else if velocity > -1.0 {
258        QueueTrend::Stable
259    } else if velocity > -10.0 {
260        QueueTrend::SlowShrink
261    } else {
262        QueueTrend::RapidShrink
263    };
264
265    MessageVelocity {
266        previous_size,
267        current_size,
268        time_window_secs,
269        velocity,
270        trend,
271    }
272}
273
274/// Suggest worker scaling based on queue metrics
275///
276/// # Arguments
277///
278/// * `queue_size` - Current queue size
279/// * `current_workers` - Current number of workers
280/// * `avg_processing_rate` - Average processing rate per worker (msg/sec)
281/// * `target_lag_seconds` - Target lag in seconds
282///
283/// # Returns
284///
285/// Worker scaling suggestion with recommended worker count
286///
287/// # Examples
288///
289/// ```
290/// use celers_broker_sql::monitoring::suggest_mysql_worker_scaling;
291///
292/// let scaling = suggest_mysql_worker_scaling(2000, 5, 40.0, 100);
293/// assert_eq!(scaling.current_workers, 5);
294/// assert!(scaling.recommended_workers >= 1);
295/// ```
296pub fn suggest_mysql_worker_scaling(
297    queue_size: usize,
298    current_workers: usize,
299    avg_processing_rate: f64,
300    target_lag_seconds: u64,
301) -> WorkerScalingSuggestion {
302    let current_total_rate = current_workers as f64 * avg_processing_rate;
303    let target_rate = queue_size as f64 / target_lag_seconds as f64;
304
305    let recommended_workers = if target_rate > current_total_rate {
306        ((target_rate / avg_processing_rate).ceil() as usize).max(1)
307    } else {
308        ((target_rate / avg_processing_rate).floor() as usize).max(1)
309    };
310
311    let action = if recommended_workers > current_workers {
312        ScalingRecommendation::ScaleUp {
313            additional_workers: recommended_workers - current_workers,
314        }
315    } else if recommended_workers < current_workers {
316        ScalingRecommendation::ScaleDown {
317            workers_to_remove: current_workers - recommended_workers,
318        }
319    } else {
320        ScalingRecommendation::Optimal
321    };
322
323    WorkerScalingSuggestion {
324        queue_size,
325        current_workers,
326        avg_processing_rate,
327        target_lag_seconds,
328        recommended_workers,
329        action,
330    }
331}
332
333/// Calculate message age distribution for SLA monitoring
334///
335/// # Arguments
336///
337/// * `message_ages` - Slice of message ages in seconds
338/// * `sla_threshold_secs` - SLA threshold in seconds
339///
340/// # Returns
341///
342/// Message age distribution with percentiles
343///
344/// # Examples
345///
346/// ```
347/// use celers_broker_sql::monitoring::calculate_mysql_message_age_distribution;
348///
349/// let ages = vec![10.0, 20.0, 30.0, 40.0, 50.0];
350/// let dist = calculate_mysql_message_age_distribution(&ages, 60.0);
351/// assert_eq!(dist.total_messages, 5);
352/// assert_eq!(dist.messages_exceeding_sla, 0);
353/// ```
354pub fn calculate_mysql_message_age_distribution(
355    message_ages: &[f64],
356    sla_threshold_secs: f64,
357) -> MessageAgeDistribution {
358    if message_ages.is_empty() {
359        return MessageAgeDistribution {
360            total_messages: 0,
361            min_age_secs: 0.0,
362            max_age_secs: 0.0,
363            avg_age_secs: 0.0,
364            p50_age_secs: 0.0,
365            p95_age_secs: 0.0,
366            p99_age_secs: 0.0,
367            messages_exceeding_sla: 0,
368        };
369    }
370
371    let mut sorted_ages = message_ages.to_vec();
372    sorted_ages.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
373
374    let total_messages = sorted_ages.len();
375    let min_age_secs = sorted_ages[0];
376    let max_age_secs = sorted_ages[total_messages - 1];
377    let avg_age_secs = sorted_ages.iter().sum::<f64>() / total_messages as f64;
378
379    let p50_age_secs = percentile(&sorted_ages, 50.0);
380    let p95_age_secs = percentile(&sorted_ages, 95.0);
381    let p99_age_secs = percentile(&sorted_ages, 99.0);
382
383    let messages_exceeding_sla = sorted_ages
384        .iter()
385        .filter(|&&age| age > sla_threshold_secs)
386        .count();
387
388    MessageAgeDistribution {
389        total_messages,
390        min_age_secs,
391        max_age_secs,
392        avg_age_secs,
393        p50_age_secs,
394        p95_age_secs,
395        p99_age_secs,
396        messages_exceeding_sla,
397    }
398}
399
400/// Estimate processing capacity of the MySQL broker system
401///
402/// # Arguments
403///
404/// * `workers` - Number of workers
405/// * `rate_per_worker` - Processing rate per worker (msg/sec)
406/// * `current_backlog` - Current queue backlog size
407///
408/// # Returns
409///
410/// Processing capacity estimation
411///
412/// # Examples
413///
414/// ```
415/// use celers_broker_sql::monitoring::estimate_mysql_processing_capacity;
416///
417/// let capacity = estimate_mysql_processing_capacity(10, 50.0, 5000);
418/// assert_eq!(capacity.workers, 10);
419/// assert_eq!(capacity.total_capacity_per_sec, 500.0);
420/// ```
421pub fn estimate_mysql_processing_capacity(
422    workers: usize,
423    rate_per_worker: f64,
424    current_backlog: usize,
425) -> ProcessingCapacity {
426    let total_capacity_per_sec = workers as f64 * rate_per_worker;
427    let total_capacity_per_min = total_capacity_per_sec * 60.0;
428    let total_capacity_per_hour = total_capacity_per_min * 60.0;
429
430    let time_to_clear_backlog_secs = if total_capacity_per_sec > 0.0 {
431        current_backlog as f64 / total_capacity_per_sec
432    } else {
433        f64::INFINITY
434    };
435
436    ProcessingCapacity {
437        workers,
438        rate_per_worker,
439        total_capacity_per_sec,
440        total_capacity_per_min,
441        total_capacity_per_hour,
442        time_to_clear_backlog_secs,
443    }
444}
445
/// Calculate MySQL queue health score (0.0 - 1.0)
///
/// Higher score = healthier queue
///
/// The score is a weighted average of two components, each in `[0, 1]`:
///
/// * size (60%): `1.0` for an empty queue, falling linearly to `0.0` at
///   `max_acceptable_size`; treated as `1.0` when `max_acceptable_size` is 0.
/// * rate (40%): `processing_rate / target_processing_rate`, limited to
///   `[0, 1]`; treated as `1.0` when the target is not positive.
///
/// # Arguments
///
/// * `queue_size` - Current queue size
/// * `processing_rate` - Processing rate (msg/sec)
/// * `max_acceptable_size` - Maximum acceptable queue size
/// * `target_processing_rate` - Target processing rate
///
/// # Returns
///
/// Health score between 0.0 (unhealthy) and 1.0 (healthy)
///
/// # Examples
///
/// ```
/// use celers_broker_sql::monitoring::calculate_mysql_queue_health_score;
///
/// let score = calculate_mysql_queue_health_score(100, 50.0, 1000, 40.0);
/// assert!(score > 0.5);
/// assert!(score <= 1.0);
/// ```
pub fn calculate_mysql_queue_health_score(
    queue_size: usize,
    processing_rate: f64,
    max_acceptable_size: usize,
    target_processing_rate: f64,
) -> f64 {
    // Size score: 1.0 when empty, 0.0 when at max
    let size_score = if max_acceptable_size > 0 {
        1.0 - (queue_size as f64 / max_acceptable_size as f64).min(1.0)
    } else {
        1.0
    };

    // Processing rate score: 1.0 when at or above target, 0.0 when zero
    let rate_score = if target_processing_rate > 0.0 {
        let ratio = processing_rate / target_processing_rate;
        // A negative rate must not pull the overall score below the
        // documented 0.0 floor.
        if ratio < 0.0 {
            0.0
        } else {
            ratio.min(1.0)
        }
    } else {
        1.0
    };

    // Weighted average: 60% size, 40% rate
    (size_score * 0.6) + (rate_score * 0.4)
}
493
/// Analyze MySQL broker performance metrics
///
/// Three metrics are recognised; each one that is present in `metrics`
/// contributes one status entry to the result, and anything else in the map
/// is ignored:
///
/// * `avg_latency_ms` → `latency_status`: `excellent` (< 10), `good` (< 50),
///   `acceptable` (< 100), otherwise `poor`
/// * `throughput_msg_per_sec` → `throughput_status`: `high` (> 1000),
///   `medium` (> 100), otherwise `low`
/// * `error_rate_percent` → `error_rate_status`: `healthy` (< 1),
///   `warning` (< 5), otherwise `critical`
///
/// # Arguments
///
/// * `metrics` - HashMap of metric name to value
///
/// # Returns
///
/// Performance analysis summary
///
/// # Examples
///
/// ```
/// use celers_broker_sql::monitoring::analyze_mysql_broker_performance;
/// use std::collections::HashMap;
///
/// let mut metrics = HashMap::new();
/// metrics.insert("avg_latency_ms".to_string(), 25.0);
/// metrics.insert("throughput_msg_per_sec".to_string(), 500.0);
/// metrics.insert("error_rate_percent".to_string(), 0.5);
///
/// let analysis = analyze_mysql_broker_performance(&metrics);
/// assert!(analysis.contains_key("latency_status"));
/// ```
pub fn analyze_mysql_broker_performance(metrics: &HashMap<String, f64>) -> HashMap<String, String> {
    fn latency_label(ms: f64) -> &'static str {
        match ms {
            v if v < 10.0 => "excellent",
            v if v < 50.0 => "good",
            v if v < 100.0 => "acceptable",
            _ => "poor",
        }
    }

    fn throughput_label(per_sec: f64) -> &'static str {
        match per_sec {
            v if v > 1000.0 => "high",
            v if v > 100.0 => "medium",
            _ => "low",
        }
    }

    fn error_rate_label(percent: f64) -> &'static str {
        match percent {
            v if v < 1.0 => "healthy",
            v if v < 5.0 => "warning",
            _ => "critical",
        }
    }

    // (input metric, output key, classifier)
    let rules: [(&str, &str, fn(f64) -> &'static str); 3] = [
        ("avg_latency_ms", "latency_status", latency_label),
        ("throughput_msg_per_sec", "throughput_status", throughput_label),
        ("error_rate_percent", "error_rate_status", error_rate_label),
    ];

    rules
        .iter()
        .filter_map(|&(metric, status_key, classify)| {
            metrics
                .get(metric)
                .map(|&value| (status_key.to_string(), classify(value).to_string()))
        })
        .collect()
}
558
// Nearest-rank percentile lookup over an already-sorted slice.
//
// `p` is a percentage (0-100). The rank is `p / 100 * (len - 1)` rounded to
// the nearest index and capped at the last element; an empty slice yields 0.0.
fn percentile(sorted_values: &[f64], p: f64) -> f64 {
    match sorted_values.len().checked_sub(1) {
        None => 0.0,
        Some(last) => {
            let rank = (p / 100.0 * last as f64).round() as usize;
            sorted_values[rank.min(last)]
        }
    }
}
568
/// Cost analysis for MySQL broker operations
///
/// Returned by [`estimate_mysql_operational_cost`]; the first three fields
/// echo that function's inputs. Monthly figures assume a 30-day month.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MysqlCostAnalysis {
    /// Total database storage used (GB)
    pub storage_gb: f64,
    /// Total IOPS (read + write operations per second)
    pub total_iops: f64,
    /// Network egress (GB per day)
    pub network_egress_gb_per_day: f64,
    /// Estimated monthly cost (USD): storage + I/O + network egress
    pub estimated_monthly_cost_usd: f64,
    /// Cost per 1000 messages processed; `0.0` when no messages are processed
    pub cost_per_1000_messages: f64,
    /// Recommendations for cost optimization (empty when no threshold is hit)
    pub optimization_recommendations: Vec<String>,
}
585
/// SLA compliance tracking
///
/// Returned by [`calculate_sla_compliance`]. With no samples every numeric
/// field is zero and `status` is [`SlaStatus::Violation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaComplianceReport {
    /// Total messages processed in period
    pub total_messages: usize,
    /// Messages within SLA (processing time less than or equal to the threshold)
    pub messages_within_sla: usize,
    /// Messages exceeding SLA (`total_messages - messages_within_sla`)
    pub messages_exceeding_sla: usize,
    /// SLA compliance percentage (0-100)
    pub compliance_percentage: f64,
    /// Average processing time (seconds)
    pub avg_processing_time_secs: f64,
    /// P95 processing time (seconds)
    pub p95_processing_time_secs: f64,
    /// P99 processing time (seconds)
    pub p99_processing_time_secs: f64,
    /// SLA status derived from `compliance_percentage`
    pub status: SlaStatus,
}
606
/// SLA compliance status
///
/// Assigned by [`calculate_sla_compliance`] from the compliance percentage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SlaStatus {
    /// Compliant (>= 99% within SLA)
    Compliant,
    /// Warning (>= 95% and < 99% within SLA)
    Warning,
    /// Violation (< 95% within SLA); also reported when there are no samples
    Violation,
}
617
/// Alert threshold recommendations
///
/// Returned by [`calculate_alert_thresholds`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// Queue size alert threshold (2x the average queue size, capped at the
    /// maximum observed size)
    pub queue_size_warning: usize,
    /// Queue size critical threshold (5x the average queue size, capped at
    /// twice the maximum observed size)
    pub queue_size_critical: usize,
    /// Processing lag warning (seconds): 2x the target lag
    pub lag_warning_secs: u64,
    /// Processing lag critical (seconds): 5x the target lag
    pub lag_critical_secs: u64,
    /// Error rate warning (percentage); fixed at 1.0
    pub error_rate_warning_percent: f64,
    /// Error rate critical (percentage); fixed at 5.0
    pub error_rate_critical_percent: f64,
    /// DLQ size warning threshold (10% of the average queue size)
    pub dlq_size_warning: usize,
    /// DLQ size critical threshold (50% of the average queue size)
    pub dlq_size_critical: usize,
}
638
/// Capacity forecast for future load
///
/// Returned by [`forecast_capacity_needs`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityForecast {
    /// Current capacity (messages per hour): workers x per-worker rate
    pub current_capacity_per_hour: f64,
    /// Projected load (messages per hour) at the end of the forecast horizon
    pub projected_load_per_hour: f64,
    /// Capacity utilization percentage (projected load / current capacity)
    pub utilization_percent: f64,
    /// Time until capacity exhausted (hours), None if capacity sufficient
    pub time_to_exhaustion_hours: Option<f64>,
    /// Recommended additional workers (0 unless utilization is above 80%)
    pub recommended_additional_workers: usize,
    /// Forecast status
    pub status: CapacityStatus,
}
655
/// Capacity status
///
/// Assigned by [`forecast_capacity_needs`] from the projected utilization;
/// each band's upper bound is inclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CapacityStatus {
    /// Capacity sufficient (<= 60% utilization)
    Sufficient,
    /// Approaching capacity (> 60% and <= 80% utilization)
    Warning,
    /// At capacity (> 80% and <= 100% utilization)
    Critical,
    /// Over capacity (> 100% utilization)
    Exceeded,
}
668
669/// Estimate MySQL broker operational costs
670///
671/// # Arguments
672///
673/// * `storage_gb` - Total database storage in GB
674/// * `total_iops` - Total IOPS (read + write operations per second)
675/// * `network_egress_gb_per_day` - Network egress in GB per day
676/// * `messages_per_day` - Total messages processed per day
677/// * `storage_cost_per_gb` - Cost per GB of storage per month (default: $0.10 for AWS RDS)
678/// * `iops_cost_per_1000` - Cost per 1000 IOPS per month (default: $0.10 for AWS RDS)
679/// * `network_cost_per_gb` - Cost per GB of network egress (default: $0.09 for AWS)
680///
681/// # Returns
682///
683/// Cost analysis with optimization recommendations
684///
685/// # Examples
686///
687/// ```
688/// use celers_broker_sql::monitoring::estimate_mysql_operational_cost;
689///
690/// let cost = estimate_mysql_operational_cost(
691///     100.0,  // 100 GB storage
692///     5000.0, // 5000 IOPS
693///     50.0,   // 50 GB egress per day
694///     1_000_000, // 1M messages per day
695///     0.10,   // $0.10 per GB storage
696///     0.10,   // $0.10 per 1000 IOPS
697///     0.09    // $0.09 per GB egress
698/// );
699/// assert!(cost.estimated_monthly_cost_usd > 0.0);
700/// ```
701#[allow(clippy::too_many_arguments)]
702pub fn estimate_mysql_operational_cost(
703    storage_gb: f64,
704    total_iops: f64,
705    network_egress_gb_per_day: f64,
706    messages_per_day: usize,
707    storage_cost_per_gb: f64,
708    iops_cost_per_1000: f64,
709    network_cost_per_gb: f64,
710) -> MysqlCostAnalysis {
711    // Monthly storage cost
712    let monthly_storage_cost = storage_gb * storage_cost_per_gb;
713
714    // Monthly IOPS cost (IOPS is per second, need to convert to monthly)
715    let iops_per_month = total_iops * 60.0 * 60.0 * 24.0 * 30.0;
716    let monthly_iops_cost = (iops_per_month / 1000.0) * iops_cost_per_1000;
717
718    // Monthly network egress cost
719    let monthly_network_cost = network_egress_gb_per_day * 30.0 * network_cost_per_gb;
720
721    let estimated_monthly_cost_usd =
722        monthly_storage_cost + monthly_iops_cost + monthly_network_cost;
723
724    let messages_per_month = messages_per_day * 30;
725    let cost_per_1000_messages = if messages_per_month > 0 {
726        (estimated_monthly_cost_usd / messages_per_month as f64) * 1000.0
727    } else {
728        0.0
729    };
730
731    let mut optimization_recommendations = Vec::new();
732
733    // Storage optimization
734    if storage_gb > 500.0 {
735        optimization_recommendations.push(
736            "Consider implementing data archival policy for completed tasks older than 30 days"
737                .to_string(),
738        );
739    }
740    if storage_gb > 1000.0 {
741        optimization_recommendations.push(
742            "Large database: consider table partitioning to improve query performance and enable efficient archival".to_string()
743        );
744    }
745
746    // IOPS optimization
747    if total_iops > 10000.0 {
748        optimization_recommendations.push(
749            "High IOPS: consider batch operations to reduce database round-trips".to_string(),
750        );
751        optimization_recommendations.push(
752            "Review indexes to ensure optimal query performance and reduce unnecessary scans"
753                .to_string(),
754        );
755    }
756
757    // Network optimization
758    if network_egress_gb_per_day > 100.0 {
759        optimization_recommendations.push(
760            "High network egress: consider payload compression for large task data".to_string(),
761        );
762    }
763
764    // Cost per message optimization
765    if cost_per_1000_messages > 1.0 {
766        optimization_recommendations.push(
767            "High cost per message: review task retention policies and optimize query patterns"
768                .to_string(),
769        );
770    }
771
772    MysqlCostAnalysis {
773        storage_gb,
774        total_iops,
775        network_egress_gb_per_day,
776        estimated_monthly_cost_usd,
777        cost_per_1000_messages,
778        optimization_recommendations,
779    }
780}
781
782/// Calculate SLA compliance report
783///
784/// # Arguments
785///
786/// * `processing_times_secs` - Slice of processing times in seconds
787/// * `sla_threshold_secs` - SLA threshold in seconds
788///
789/// # Returns
790///
791/// SLA compliance report with status
792///
793/// # Examples
794///
795/// ```
796/// use celers_broker_sql::monitoring::calculate_sla_compliance;
797///
798/// let times = vec![5.0, 10.0, 15.0, 20.0, 25.0, 100.0]; // One SLA violation
799/// let report = calculate_sla_compliance(&times, 30.0);
800/// assert_eq!(report.total_messages, 6);
801/// assert_eq!(report.messages_within_sla, 5);
802/// ```
803pub fn calculate_sla_compliance(
804    processing_times_secs: &[f64],
805    sla_threshold_secs: f64,
806) -> SlaComplianceReport {
807    if processing_times_secs.is_empty() {
808        return SlaComplianceReport {
809            total_messages: 0,
810            messages_within_sla: 0,
811            messages_exceeding_sla: 0,
812            compliance_percentage: 0.0,
813            avg_processing_time_secs: 0.0,
814            p95_processing_time_secs: 0.0,
815            p99_processing_time_secs: 0.0,
816            status: SlaStatus::Violation,
817        };
818    }
819
820    let total_messages = processing_times_secs.len();
821    let messages_within_sla = processing_times_secs
822        .iter()
823        .filter(|&&time| time <= sla_threshold_secs)
824        .count();
825    let messages_exceeding_sla = total_messages - messages_within_sla;
826
827    let compliance_percentage = (messages_within_sla as f64 / total_messages as f64) * 100.0;
828
829    let mut sorted_times = processing_times_secs.to_vec();
830    sorted_times.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
831
832    let avg_processing_time_secs = sorted_times.iter().sum::<f64>() / total_messages as f64;
833    let p95_processing_time_secs = percentile(&sorted_times, 95.0);
834    let p99_processing_time_secs = percentile(&sorted_times, 99.0);
835
836    let status = if compliance_percentage >= 99.0 {
837        SlaStatus::Compliant
838    } else if compliance_percentage >= 95.0 {
839        SlaStatus::Warning
840    } else {
841        SlaStatus::Violation
842    };
843
844    SlaComplianceReport {
845        total_messages,
846        messages_within_sla,
847        messages_exceeding_sla,
848        compliance_percentage,
849        avg_processing_time_secs,
850        p95_processing_time_secs,
851        p99_processing_time_secs,
852        status,
853    }
854}
855
856/// Calculate recommended alert thresholds
857///
858/// # Arguments
859///
860/// * `avg_queue_size` - Average queue size under normal load
861/// * `max_queue_size` - Maximum observed queue size
862/// * `avg_processing_rate` - Average processing rate (messages per second)
863/// * `target_lag_secs` - Target processing lag in seconds
864///
865/// # Returns
866///
867/// Recommended alert thresholds for monitoring
868///
869/// # Examples
870///
871/// ```
872/// use celers_broker_sql::monitoring::calculate_alert_thresholds;
873///
874/// let thresholds = calculate_alert_thresholds(100, 1000, 50.0, 60);
875/// assert!(thresholds.queue_size_warning > 0);
876/// assert!(thresholds.queue_size_critical > thresholds.queue_size_warning);
877/// ```
878pub fn calculate_alert_thresholds(
879    avg_queue_size: usize,
880    max_queue_size: usize,
881    _avg_processing_rate: f64,
882    target_lag_secs: u64,
883) -> AlertThresholds {
884    // Queue size thresholds based on observed patterns
885    let queue_size_warning = ((avg_queue_size as f64 * 2.0) as usize).min(max_queue_size);
886    let queue_size_critical = ((avg_queue_size as f64 * 5.0) as usize).min(max_queue_size * 2);
887
888    // Lag thresholds based on target
889    let lag_warning_secs = target_lag_secs * 2;
890    let lag_critical_secs = target_lag_secs * 5;
891
892    // Error rate thresholds (industry standard)
893    let error_rate_warning_percent = 1.0;
894    let error_rate_critical_percent = 5.0;
895
896    // DLQ thresholds based on queue size
897    let dlq_size_warning = (avg_queue_size as f64 * 0.1) as usize;
898    let dlq_size_critical = (avg_queue_size as f64 * 0.5) as usize;
899
900    AlertThresholds {
901        queue_size_warning,
902        queue_size_critical,
903        lag_warning_secs,
904        lag_critical_secs,
905        error_rate_warning_percent,
906        error_rate_critical_percent,
907        dlq_size_warning,
908        dlq_size_critical,
909    }
910}
911
912/// Forecast capacity needs based on growth trends
913///
914/// # Arguments
915///
916/// * `current_load_per_hour` - Current load in messages per hour
917/// * `growth_rate_percent` - Expected growth rate percentage (e.g., 20.0 for 20%)
918/// * `forecast_horizon_days` - Number of days to forecast
919/// * `current_workers` - Current number of workers
920/// * `processing_rate_per_worker` - Processing rate per worker (messages per hour)
921///
922/// # Returns
923///
924/// Capacity forecast with scaling recommendations
925///
926/// # Examples
927///
928/// ```
929/// use celers_broker_sql::monitoring::forecast_capacity_needs;
930///
931/// let forecast = forecast_capacity_needs(10000.0, 20.0, 30, 10, 1200.0);
932/// assert!(forecast.current_capacity_per_hour > 0.0);
933/// ```
934pub fn forecast_capacity_needs(
935    current_load_per_hour: f64,
936    growth_rate_percent: f64,
937    forecast_horizon_days: u64,
938    current_workers: usize,
939    processing_rate_per_worker: f64,
940) -> CapacityForecast {
941    let current_capacity_per_hour = current_workers as f64 * processing_rate_per_worker;
942
943    // Calculate projected load using compound growth
944    let growth_multiplier = 1.0 + (growth_rate_percent / 100.0);
945    let days_factor = forecast_horizon_days as f64 / 30.0; // Convert to months
946    let projected_load_per_hour = current_load_per_hour * growth_multiplier.powf(days_factor);
947
948    let utilization_percent = (projected_load_per_hour / current_capacity_per_hour) * 100.0;
949
950    let time_to_exhaustion_hours = if projected_load_per_hour > current_capacity_per_hour {
951        // Calculate when capacity will be exhausted
952        let daily_growth = current_load_per_hour * (growth_multiplier.powf(1.0 / 30.0) - 1.0);
953        if daily_growth > 0.0 {
954            Some((current_capacity_per_hour - current_load_per_hour) / daily_growth * 24.0)
955        } else {
956            None
957        }
958    } else {
959        None
960    };
961
962    let recommended_additional_workers = if utilization_percent > 80.0 {
963        let needed_capacity = projected_load_per_hour * 1.2; // 20% buffer
964        let total_workers_needed = (needed_capacity / processing_rate_per_worker).ceil() as usize;
965        total_workers_needed.saturating_sub(current_workers)
966    } else {
967        0
968    };
969
970    let status = if utilization_percent > 100.0 {
971        CapacityStatus::Exceeded
972    } else if utilization_percent > 80.0 {
973        CapacityStatus::Critical
974    } else if utilization_percent > 60.0 {
975        CapacityStatus::Warning
976    } else {
977        CapacityStatus::Sufficient
978    };
979
980    CapacityForecast {
981        current_capacity_per_hour,
982        projected_load_per_hour,
983        utilization_percent,
984        time_to_exhaustion_hours,
985        recommended_additional_workers,
986        status,
987    }
988}
989
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Consumer lag
    // ---------------------------------------------------------------------

    /// A backlog that drains well inside the target lag needs no scaling.
    #[test]
    fn test_analyze_consumer_lag_optimal() {
        let queue_size = 100;
        let processing_rate = 50.0;
        let analysis = analyze_mysql_consumer_lag(queue_size, processing_rate, 10);

        assert_eq!(analysis.queue_size, 100);
        assert_eq!(analysis.processing_rate, 50.0);
        assert_eq!(analysis.lag_seconds, 2.0);
        assert!(!analysis.is_lagging);
        assert_eq!(analysis.recommendation, ScalingRecommendation::Optimal);
    }

    /// A large backlog with a slow consumer must be flagged for scale-up.
    #[test]
    fn test_analyze_consumer_lag_needs_scale_up() {
        let analysis = analyze_mysql_consumer_lag(1000, 5.0, 10);

        assert!(analysis.is_lagging);
        let wants_scale_up = matches!(
            analysis.recommendation,
            ScalingRecommendation::ScaleUp { .. }
        );
        assert!(wants_scale_up);
    }

    // ---------------------------------------------------------------------
    // Message velocity
    // ---------------------------------------------------------------------

    /// 600 extra messages over a 60 second window.
    #[test]
    fn test_calculate_message_velocity_growing() {
        let (previous_size, current_size, window_secs) = (1000, 1600, 60.0);
        let result = calculate_mysql_message_velocity(previous_size, current_size, window_secs);

        assert_eq!(result.velocity, 10.0);
        assert_eq!(result.trend, QueueTrend::SlowGrowth);
    }

    /// 10 extra messages over a 60 second window counts as stable.
    #[test]
    fn test_calculate_message_velocity_stable() {
        let (previous_size, current_size, window_secs) = (1000, 1010, 60.0);
        let result = calculate_mysql_message_velocity(previous_size, current_size, window_secs);

        assert!(result.velocity < 1.0);
        assert_eq!(result.trend, QueueTrend::Stable);
    }

    // ---------------------------------------------------------------------
    // Worker scaling
    // ---------------------------------------------------------------------

    /// The suggestion echoes the current worker count and never drops to zero.
    #[test]
    fn test_suggest_worker_scaling() {
        let suggestion = suggest_mysql_worker_scaling(2000, 5, 40.0, 100);

        assert_eq!(suggestion.current_workers, 5);
        assert!(suggestion.recommended_workers >= 1);
    }

    // ---------------------------------------------------------------------
    // Message age distribution
    // ---------------------------------------------------------------------

    /// Ten evenly spaced ages; three of them (80, 90, 100) exceed the 75s SLA.
    #[test]
    fn test_message_age_distribution() {
        let sample_ages = vec![
            10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0,
        ];
        let distribution = calculate_mysql_message_age_distribution(&sample_ages, 75.0);

        assert_eq!(distribution.total_messages, 10);
        assert_eq!(distribution.min_age_secs, 10.0);
        assert_eq!(distribution.max_age_secs, 100.0);
        assert_eq!(distribution.messages_exceeding_sla, 3);
    }

    /// An empty sample reports zero messages.
    #[test]
    fn test_message_age_distribution_empty() {
        let no_ages = Vec::new();
        let distribution = calculate_mysql_message_age_distribution(&no_ages, 60.0);

        assert_eq!(distribution.total_messages, 0);
    }

    // ---------------------------------------------------------------------
    // Processing capacity and queue health
    // ---------------------------------------------------------------------

    /// 10 workers at 50 msg/sec against a backlog of 5000 messages.
    #[test]
    fn test_estimate_processing_capacity() {
        let estimate = estimate_mysql_processing_capacity(10, 50.0, 5000);

        assert_eq!(estimate.workers, 10);
        assert_eq!(estimate.total_capacity_per_sec, 500.0);
        assert_eq!(estimate.total_capacity_per_min, 30000.0);
        assert_eq!(estimate.time_to_clear_backlog_secs, 10.0);
    }

    /// The health score for these inputs lies in the upper half of (0.5, 1.0].
    #[test]
    fn test_calculate_queue_health_score() {
        let health = calculate_mysql_queue_health_score(100, 50.0, 1000, 40.0);

        assert!(health > 0.5);
        assert!(health <= 1.0);
    }

    // ---------------------------------------------------------------------
    // Broker performance
    // ---------------------------------------------------------------------

    /// Each metric is classified into its own status entry.
    #[test]
    fn test_analyze_broker_performance() {
        let metrics = HashMap::from([
            ("avg_latency_ms".to_string(), 25.0),
            ("throughput_msg_per_sec".to_string(), 500.0),
            ("error_rate_percent".to_string(), 0.5),
        ]);

        let report = analyze_mysql_broker_performance(&metrics);

        let good = "good".to_string();
        let medium = "medium".to_string();
        let healthy = "healthy".to_string();
        assert_eq!(report.get("latency_status"), Some(&good));
        assert_eq!(report.get("throughput_status"), Some(&medium));
        assert_eq!(report.get("error_rate_status"), Some(&healthy));
    }

    // ---------------------------------------------------------------------
    // Operational cost
    // ---------------------------------------------------------------------

    /// A typical deployment yields positive cost figures and echoes the storage size.
    #[test]
    fn test_estimate_operational_cost() {
        let storage_gb = 100.0; // 100 GB storage
        let iops = 5000.0; // 5000 IOPS
        let egress_gb_per_day = 50.0; // 50 GB egress per day
        let messages_per_day = 1_000_000; // 1M messages per day
        let storage_price = 0.10; // $0.10 per GB storage
        let iops_price = 0.10; // $0.10 per 1000 IOPS
        let egress_price = 0.09; // $0.09 per GB egress

        let estimate = estimate_mysql_operational_cost(
            storage_gb,
            iops,
            egress_gb_per_day,
            messages_per_day,
            storage_price,
            iops_price,
            egress_price,
        );

        assert!(estimate.estimated_monthly_cost_usd > 0.0);
        assert!(estimate.cost_per_1000_messages > 0.0);
        assert_eq!(estimate.storage_gb, 100.0);
    }

    /// Large storage must produce a recommendation that mentions partitioning.
    #[test]
    fn test_estimate_operational_cost_high_storage() {
        let large_storage_gb = 1200.0;
        let estimate = estimate_mysql_operational_cost(
            large_storage_gb,
            10000.0,
            50.0,
            1_000_000,
            0.10,
            0.10,
            0.09,
        );

        let recommendations = &estimate.optimization_recommendations;
        assert!(!recommendations.is_empty());
        assert!(recommendations.iter().any(|r| r.contains("partition")));
    }

    // ---------------------------------------------------------------------
    // SLA compliance
    // ---------------------------------------------------------------------

    /// Every processing time is below the 30s target.
    #[test]
    fn test_calculate_sla_compliance_compliant() {
        let processing_times = vec![5.0, 10.0, 15.0, 20.0, 25.0];
        let sla = calculate_sla_compliance(&processing_times, 30.0);

        assert_eq!(sla.total_messages, 5);
        assert_eq!(sla.messages_within_sla, 5);
        assert_eq!(sla.messages_exceeding_sla, 0);
        assert_eq!(sla.compliance_percentage, 100.0);
        assert_eq!(sla.status, SlaStatus::Compliant);
    }

    /// Two of seven processing times (100s and 150s) break the 30s target.
    #[test]
    fn test_calculate_sla_compliance_violation() {
        let processing_times = vec![5.0, 10.0, 15.0, 20.0, 25.0, 100.0, 150.0];
        let sla = calculate_sla_compliance(&processing_times, 30.0);

        assert_eq!(sla.total_messages, 7);
        assert_eq!(sla.messages_within_sla, 5);
        assert_eq!(sla.messages_exceeding_sla, 2);
        assert!(sla.compliance_percentage < 95.0);
        assert_eq!(sla.status, SlaStatus::Violation);
    }

    /// An empty sample is reported as a violation.
    #[test]
    fn test_calculate_sla_compliance_empty() {
        let processing_times = Vec::new();
        let sla = calculate_sla_compliance(&processing_times, 30.0);

        assert_eq!(sla.total_messages, 0);
        assert_eq!(sla.status, SlaStatus::Violation);
    }

    // ---------------------------------------------------------------------
    // Alert thresholds
    // ---------------------------------------------------------------------

    /// Lag thresholds are 2x and 5x the 60s target; error rates are fixed.
    #[test]
    fn test_calculate_alert_thresholds() {
        let limits = calculate_alert_thresholds(100, 1000, 50.0, 60);

        assert!(limits.queue_size_warning > 0);
        assert!(limits.queue_size_critical > limits.queue_size_warning);
        assert_eq!(limits.lag_warning_secs, 120);
        assert_eq!(limits.lag_critical_secs, 300);
        assert_eq!(limits.error_rate_warning_percent, 1.0);
        assert_eq!(limits.error_rate_critical_percent, 5.0);
    }

    // ---------------------------------------------------------------------
    // Capacity forecasting
    // ---------------------------------------------------------------------

    /// Load stays far below the capacity of 10 workers at 1500 msg/hour.
    #[test]
    fn test_forecast_capacity_sufficient() {
        let load_per_hour = 5000.0;
        let growth_percent = 10.0;
        let horizon_days = 30;
        let workers = 10;
        let rate_per_worker = 1500.0;

        let outlook = forecast_capacity_needs(
            load_per_hour,
            growth_percent,
            horizon_days,
            workers,
            rate_per_worker,
        );

        assert_eq!(outlook.current_capacity_per_hour, 15000.0);
        assert!(outlook.projected_load_per_hour > 5000.0);
        // Utilization stays at or below the 60% threshold, so the status is Sufficient.
        assert_eq!(outlook.status, CapacityStatus::Sufficient);
        assert_eq!(outlook.recommended_additional_workers, 0);
    }

    /// Five workers at 1000 msg/hour cannot carry a 10000 msg/hour load.
    #[test]
    fn test_forecast_capacity_exceeded() {
        let load_per_hour = 10000.0;
        let growth_percent = 50.0;
        let horizon_days = 60; // 2 months
        let workers = 5;
        let rate_per_worker = 1000.0;

        let outlook = forecast_capacity_needs(
            load_per_hour,
            growth_percent,
            horizon_days,
            workers,
            rate_per_worker,
        );

        assert_eq!(outlook.current_capacity_per_hour, 5000.0);
        assert!(outlook.projected_load_per_hour > outlook.current_capacity_per_hour);
        assert_eq!(outlook.status, CapacityStatus::Exceeded);
        assert!(outlook.recommended_additional_workers > 0);
    }

    /// Projected utilization falls into the warning band (60-80%).
    #[test]
    fn test_forecast_capacity_warning() {
        let load_per_hour = 8000.0;
        let growth_percent = 20.0;
        let horizon_days = 30;
        let workers = 10;
        let rate_per_worker = 1200.0;

        let outlook = forecast_capacity_needs(
            load_per_hour,
            growth_percent,
            horizon_days,
            workers,
            rate_per_worker,
        );

        assert!(outlook.utilization_percent > 60.0);
        assert!(outlook.utilization_percent <= 80.0);
        assert_eq!(outlook.status, CapacityStatus::Warning);
    }
}