Skip to main content

smith_bus/
lag_monitor.rs

//! Consumer lag monitoring and backpressure management.
//!
//! This module provides real-time consumer lag monitoring and automatically
//! applies backpressure to keep the system stable under load.
5
6use crate::sharding::{BackpressureAction, BackpressureManager};
7use anyhow::{Context, Result};
8use async_nats::jetstream::{self, consumer::Info as ConsumerInfo};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::sync::{mpsc, RwLock};
14use tracing::{debug, error, info, warn};
15
16/// Consumer lag monitoring service
17#[derive(Clone)]
18pub struct LagMonitor {
19    jetstream: jetstream::Context,
20    backpressure_manager: BackpressureManager,
21    lag_stats: std::sync::Arc<RwLock<HashMap<String, ConsumerLagStats>>>,
22    alert_sender: Option<mpsc::Sender<BackpressureAlert>>,
23}
24
25/// Consumer lag statistics
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ConsumerLagStats {
28    /// Consumer name
29    pub consumer_name: String,
30
31    /// Stream name
32    pub stream_name: String,
33
34    /// Current lag in messages
35    pub message_lag: u64,
36
37    /// Pending acknowledgments
38    pub pending_acks: i64,
39
40    /// Messages per second (calculated)
41    pub throughput_mps: f64,
42
43    /// Last update timestamp
44    pub last_updated: DateTime<Utc>,
45
46    /// Backpressure status
47    pub backpressure_active: bool,
48
49    /// Consumer utilization percentage
50    pub utilization_percent: f64,
51}
52
53/// Backpressure alert information
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct BackpressureAlert {
56    pub consumer_name: String,
57    pub stream_name: String,
58    pub alert_type: BackpressureAlertType,
59    pub message_lag: u64,
60    pub pending_acks: i64,
61    pub actions_taken: Vec<String>,
62    pub timestamp: chrono::DateTime<chrono::Utc>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub enum BackpressureAlertType {
67    /// Lag exceeded threshold
68    HighLag,
69    /// Too many pending acknowledgments
70    HighPendingAcks,
71    /// Consumer completely stuck
72    ConsumerStalled,
73    /// Backpressure resolved
74    BackpressureResolved,
75}
76
77impl LagMonitor {
78    /// Create new lag monitor
79    pub fn new(jetstream: jetstream::Context) -> Self {
80        Self {
81            jetstream,
82            backpressure_manager: BackpressureManager::default(),
83            lag_stats: std::sync::Arc::new(RwLock::new(HashMap::new())),
84            alert_sender: None,
85        }
86    }
87
88    /// Create lag monitor with custom backpressure configuration
89    pub fn with_backpressure_config(
90        jetstream: jetstream::Context,
91        backpressure_manager: BackpressureManager,
92    ) -> Self {
93        Self {
94            jetstream,
95            backpressure_manager,
96            lag_stats: std::sync::Arc::new(RwLock::new(HashMap::new())),
97            alert_sender: None,
98        }
99    }
100
101    /// Enable backpressure alerts
102    pub fn with_alerts(mut self, alert_sender: mpsc::Sender<BackpressureAlert>) -> Self {
103        self.alert_sender = Some(alert_sender);
104        self
105    }
106
107    /// Start monitoring loop
108    pub async fn start_monitoring(&self, check_interval: Duration) -> Result<()> {
109        let mut interval = tokio::time::interval(check_interval);
110
111        info!(
112            "Starting consumer lag monitoring (interval: {:?})",
113            check_interval
114        );
115
116        loop {
117            interval.tick().await;
118
119            if let Err(e) = self.check_all_consumers().await {
120                error!("Failed to check consumer lag: {}", e);
121            }
122        }
123    }
124
125    /// Check lag for all consumers across all streams
126    async fn check_all_consumers(&self) -> Result<()> {
127        let stream_names = self.get_smith_stream_names().await?;
128
129        for stream_name in stream_names {
130            if let Err(e) = self.check_stream_consumers(&stream_name).await {
131                error!(
132                    "Failed to check consumers for stream {}: {}",
133                    stream_name, e
134                );
135                continue;
136            }
137        }
138
139        Ok(())
140    }
141
142    /// Get all Smith stream names
143    async fn get_smith_stream_names(&self) -> Result<Vec<String>> {
144        let streams = vec![
145            "SDLC_RAW".to_string(),
146            "ATOMS_VETTED".to_string(),
147            "ATOMS_RESULTS".to_string(),
148            "AUDIT_SECURITY".to_string(),
149            "SDLC_QUARANTINE_BACKPRESSURE".to_string(),
150        ];
151
152        Ok(streams)
153    }
154
155    /// Check consumers for a specific stream
156    async fn check_stream_consumers(&self, stream_name: &str) -> Result<()> {
157        let mut stream = self
158            .jetstream
159            .get_stream(stream_name)
160            .await
161            .context(format!("Failed to get stream: {}", stream_name))?;
162
163        let stream_info = stream.info().await.context("Failed to get stream info")?;
164
165        // Get consumer names for this stream
166        let consumer_names: Vec<String> = stream_info
167            .config
168            .clone()
169            .subjects
170            .iter()
171            .map(|_| format!("{}-consumer", stream_name.to_lowercase()))
172            .collect();
173
174        for consumer_name in consumer_names {
175            if let Ok(consumer) = self
176                .jetstream
177                .get_consumer_from_stream(stream_name, &consumer_name)
178                .await
179            {
180                if let Err(e) = self
181                    .check_consumer_lag(stream_name, &consumer_name, consumer)
182                    .await
183                {
184                    error!("Failed to check lag for consumer {}: {}", consumer_name, e);
185                }
186            }
187        }
188
189        Ok(())
190    }
191
192    /// Check lag for a specific consumer
193    async fn check_consumer_lag(
194        &self,
195        stream_name: &str,
196        consumer_name: &str,
197        mut consumer: jetstream::consumer::Consumer<jetstream::consumer::pull::Config>,
198    ) -> Result<()> {
199        let consumer_info = consumer
200            .info()
201            .await
202            .context("Failed to get consumer info")?;
203
204        let lag_stats = self
205            .calculate_lag_stats(stream_name, consumer_name, consumer_info)
206            .await?;
207
208        // Check if backpressure should be applied
209        let should_apply_backpressure = self
210            .backpressure_manager
211            .should_apply_backpressure(lag_stats.message_lag, lag_stats.pending_acks);
212
213        // Apply backpressure if needed
214        if should_apply_backpressure && !lag_stats.backpressure_active {
215            self.apply_backpressure(&lag_stats).await?;
216        } else if !should_apply_backpressure && lag_stats.backpressure_active {
217            self.remove_backpressure(&lag_stats).await?;
218        }
219
220        // Update lag statistics
221        {
222            let mut stats_map = self.lag_stats.write().await;
223            stats_map.insert(consumer_name.to_string(), lag_stats);
224        }
225
226        Ok(())
227    }
228
229    /// Calculate consumer lag statistics
230    async fn calculate_lag_stats(
231        &self,
232        stream_name: &str,
233        consumer_name: &str,
234        consumer_info: &ConsumerInfo,
235    ) -> Result<ConsumerLagStats> {
236        // Calculate message lag - simplified for this compilation fix
237        let message_lag = consumer_info.num_pending;
238
239        // Get pending acknowledgments
240        let pending_acks = consumer_info.num_pending as i64;
241
242        // Calculate throughput (simplified - would need historical data for accuracy)
243        let throughput_mps = self
244            .calculate_throughput(consumer_name)
245            .await
246            .unwrap_or(0.0);
247
248        // Calculate utilization percentage
249        let max_ack_pending = consumer_info.config.max_ack_pending as f64;
250        let utilization_percent = (pending_acks as f64 / max_ack_pending) * 100.0;
251
252        // Check if backpressure is currently active
253        let backpressure_active = {
254            let stats_map = self.lag_stats.read().await;
255            stats_map
256                .get(consumer_name)
257                .map(|stats| stats.backpressure_active)
258                .unwrap_or(false)
259        };
260
261        Ok(ConsumerLagStats {
262            consumer_name: consumer_name.to_string(),
263            stream_name: stream_name.to_string(),
264            message_lag,
265            pending_acks,
266            throughput_mps,
267            last_updated: Utc::now(),
268            backpressure_active,
269            utilization_percent,
270        })
271    }
272
273    /// Calculate throughput for a consumer (simplified implementation)
274    async fn calculate_throughput(&self, consumer_name: &str) -> Option<f64> {
275        let stats_map = self.lag_stats.read().await;
276        if let Some(previous_stats) = stats_map.get(consumer_name) {
277            let time_diff = (Utc::now() - previous_stats.last_updated).num_seconds() as f64;
278            if time_diff > 0.0 {
279                // This is a simplified calculation - in production you'd track actual message counts
280                return Some(10.0); // Placeholder throughput
281            }
282        }
283        None
284    }
285
286    /// Apply backpressure measures
287    async fn apply_backpressure(&self, lag_stats: &ConsumerLagStats) -> Result<()> {
288        let actions = self
289            .backpressure_manager
290            .generate_backpressure_response(lag_stats.message_lag, lag_stats.pending_acks);
291
292        let mut action_descriptions = Vec::new();
293
294        for action in actions {
295            match action {
296                BackpressureAction::RouteToQuarantine => {
297                    // Route new messages to quarantine stream
298                    self.route_to_quarantine(&lag_stats.consumer_name).await?;
299                    action_descriptions.push("Routed to quarantine".to_string());
300                }
301                BackpressureAction::ReduceBatchSize(new_size) => {
302                    // Reduce consumer batch size
303                    self.reduce_batch_size(&lag_stats.consumer_name, new_size)
304                        .await?;
305                    action_descriptions.push(format!("Reduced batch size to {}", new_size));
306                }
307                BackpressureAction::ExtendAckWait(duration) => {
308                    // Extend ack wait time
309                    self.extend_ack_wait(&lag_stats.consumer_name, duration)
310                        .await?;
311                    action_descriptions.push(format!("Extended ack wait to {:?}", duration));
312                }
313                BackpressureAction::AlertOps(message) => {
314                    // Send operations alert
315                    self.send_ops_alert(&lag_stats.consumer_name, &message)
316                        .await?;
317                    action_descriptions.push(format!("Ops alert: {}", message));
318                }
319            }
320        }
321
322        // Send backpressure alert
323        if let Some(ref alert_sender) = self.alert_sender {
324            let alert = BackpressureAlert {
325                consumer_name: lag_stats.consumer_name.clone(),
326                stream_name: lag_stats.stream_name.clone(),
327                alert_type: if lag_stats.message_lag > self.backpressure_manager.lag_threshold {
328                    BackpressureAlertType::HighLag
329                } else {
330                    BackpressureAlertType::HighPendingAcks
331                },
332                message_lag: lag_stats.message_lag,
333                pending_acks: lag_stats.pending_acks,
334                actions_taken: action_descriptions,
335                timestamp: chrono::Utc::now(),
336            };
337
338            if let Err(e) = alert_sender.try_send(alert) {
339                error!("Failed to send backpressure alert: {}", e);
340            }
341        }
342
343        warn!(
344            consumer = lag_stats.consumer_name,
345            stream = lag_stats.stream_name,
346            message_lag = lag_stats.message_lag,
347            pending_acks = lag_stats.pending_acks,
348            "Applied backpressure measures"
349        );
350
351        Ok(())
352    }
353
354    /// Remove backpressure measures
355    async fn remove_backpressure(&self, lag_stats: &ConsumerLagStats) -> Result<()> {
356        info!(
357            consumer = lag_stats.consumer_name,
358            stream = lag_stats.stream_name,
359            "Removing backpressure - lag resolved"
360        );
361
362        // Send resolution alert
363        if let Some(ref alert_sender) = self.alert_sender {
364            let alert = BackpressureAlert {
365                consumer_name: lag_stats.consumer_name.clone(),
366                stream_name: lag_stats.stream_name.clone(),
367                alert_type: BackpressureAlertType::BackpressureResolved,
368                message_lag: lag_stats.message_lag,
369                pending_acks: lag_stats.pending_acks,
370                actions_taken: vec!["Backpressure resolved".to_string()],
371                timestamp: chrono::Utc::now(),
372            };
373
374            if let Err(e) = alert_sender.try_send(alert) {
375                error!("Failed to send backpressure resolution alert: {}", e);
376            }
377        }
378
379        Ok(())
380    }
381
382    /// Route messages to quarantine stream
383    async fn route_to_quarantine(&self, consumer_name: &str) -> Result<()> {
384        // Implementation would configure routing to quarantine stream
385        debug!(
386            "Routing messages to quarantine for consumer: {}",
387            consumer_name
388        );
389        Ok(())
390    }
391
392    /// Reduce consumer batch size
393    async fn reduce_batch_size(&self, consumer_name: &str, new_size: usize) -> Result<()> {
394        // Implementation would update consumer configuration
395        debug!(
396            "Reducing batch size to {} for consumer: {}",
397            new_size, consumer_name
398        );
399        Ok(())
400    }
401
402    /// Extend ack wait time
403    async fn extend_ack_wait(&self, consumer_name: &str, duration: Duration) -> Result<()> {
404        // Implementation would update consumer ack wait configuration
405        debug!(
406            "Extending ack wait to {:?} for consumer: {}",
407            duration, consumer_name
408        );
409        Ok(())
410    }
411
412    /// Send operations alert
413    async fn send_ops_alert(&self, consumer_name: &str, message: &str) -> Result<()> {
414        warn!(
415            consumer = consumer_name,
416            alert = message,
417            "Operations alert triggered"
418        );
419        Ok(())
420    }
421
422    /// Get current lag statistics for all consumers
423    pub async fn get_lag_stats(&self) -> HashMap<String, ConsumerLagStats> {
424        self.lag_stats.read().await.clone()
425    }
426
427    /// Get lag statistics for a specific consumer
428    pub async fn get_consumer_lag_stats(&self, consumer_name: &str) -> Option<ConsumerLagStats> {
429        self.lag_stats.read().await.get(consumer_name).cloned()
430    }
431
432    /// Check if any consumer is under backpressure
433    pub async fn has_active_backpressure(&self) -> bool {
434        let stats_map = self.lag_stats.read().await;
435        stats_map.values().any(|stats| stats.backpressure_active)
436    }
437
438    /// Get total message lag across all consumers
439    pub async fn get_total_lag(&self) -> u64 {
440        let stats_map = self.lag_stats.read().await;
441        stats_map.values().map(|stats| stats.message_lag).sum()
442    }
443}
444
445/// Backpressure alert handler
446pub struct BackpressureAlertHandler {
447    alert_receiver: mpsc::Receiver<BackpressureAlert>,
448}
449
450impl BackpressureAlertHandler {
451    pub fn new(alert_receiver: mpsc::Receiver<BackpressureAlert>) -> Self {
452        Self { alert_receiver }
453    }
454
455    /// Start handling backpressure alerts
456    pub async fn start_handling(&mut self) {
457        while let Some(alert) = self.alert_receiver.recv().await {
458            self.handle_alert(alert).await;
459        }
460    }
461
462    async fn handle_alert(&self, alert: BackpressureAlert) {
463        match alert.alert_type {
464            BackpressureAlertType::HighLag => {
465                error!(
466                    consumer = alert.consumer_name,
467                    stream = alert.stream_name,
468                    lag = alert.message_lag,
469                    "HIGH LAG ALERT: Consumer is {} messages behind",
470                    alert.message_lag
471                );
472            }
473            BackpressureAlertType::HighPendingAcks => {
474                error!(
475                    consumer = alert.consumer_name,
476                    stream = alert.stream_name,
477                    pending_acks = alert.pending_acks,
478                    "HIGH PENDING ACKS ALERT: {} unacknowledged messages",
479                    alert.pending_acks
480                );
481            }
482            BackpressureAlertType::ConsumerStalled => {
483                error!(
484                    consumer = alert.consumer_name,
485                    stream = alert.stream_name,
486                    "CONSUMER STALLED ALERT: Consumer appears to be stuck"
487                );
488            }
489            BackpressureAlertType::BackpressureResolved => {
490                info!(
491                    consumer = alert.consumer_name,
492                    stream = alert.stream_name,
493                    "Backpressure resolved for consumer"
494                );
495            }
496        }
497    }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built `ConsumerLagStats` keeps the values it was given.
    #[test]
    fn test_consumer_lag_stats_creation() {
        let now = Utc::now();
        let stats = ConsumerLagStats {
            consumer_name: String::from("test-consumer"),
            stream_name: String::from("TEST_STREAM"),
            message_lag: 500,
            pending_acks: 100,
            throughput_mps: 50.0,
            last_updated: now,
            backpressure_active: false,
            utilization_percent: 75.0,
        };

        assert_eq!(stats.message_lag, 500);
        assert_eq!(stats.pending_acks, 100);
        assert!(!stats.backpressure_active);
    }

    /// A hand-built `BackpressureAlert` keeps its lag, type and action list.
    #[test]
    fn test_backpressure_alert_creation() {
        let actions_taken = vec![String::from("Routed to quarantine")];
        let alert = BackpressureAlert {
            consumer_name: String::from("test-consumer"),
            stream_name: String::from("TEST_STREAM"),
            alert_type: BackpressureAlertType::HighLag,
            message_lag: 2000,
            pending_acks: 600,
            actions_taken,
            timestamp: chrono::Utc::now(),
        };

        assert_eq!(alert.message_lag, 2000);
        assert!(matches!(alert.alert_type, BackpressureAlertType::HighLag));
        assert_eq!(alert.actions_taken.len(), 1);
    }
}