1use crate::sharding::{BackpressureAction, BackpressureManager};
7use anyhow::{Context, Result};
8use async_nats::jetstream::{self, consumer::Info as ConsumerInfo};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::sync::{mpsc, RwLock};
14use tracing::{debug, error, info, warn};
15
/// Watches JetStream consumer lag and coordinates backpressure responses.
///
/// Cheap to clone: the stats map is shared behind an `Arc`, so clones observe
/// the same lag data.
#[derive(Clone)]
pub struct LagMonitor {
    /// JetStream context used to look up streams and consumers.
    jetstream: jetstream::Context,
    /// Policy object that decides when and how to apply backpressure.
    backpressure_manager: BackpressureManager,
    /// Latest per-consumer stats, keyed by consumer name.
    lag_stats: std::sync::Arc<RwLock<HashMap<String, ConsumerLagStats>>>,
    /// Optional channel on which `BackpressureAlert`s are emitted.
    alert_sender: Option<mpsc::Sender<BackpressureAlert>>,
}
24
/// Point-in-time lag snapshot for a single JetStream consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerLagStats {
    /// Durable consumer name.
    pub consumer_name: String,
    /// Stream the consumer reads from.
    pub stream_name: String,
    /// Messages in the stream not yet delivered to this consumer.
    pub message_lag: u64,
    /// Messages delivered but awaiting acknowledgement.
    pub pending_acks: i64,
    /// Estimated throughput in messages per second.
    pub throughput_mps: f64,
    /// When this snapshot was taken (UTC).
    pub last_updated: DateTime<Utc>,
    /// Whether backpressure measures are currently active for this consumer.
    pub backpressure_active: bool,
    /// `pending_acks` as a percentage of the consumer's `max_ack_pending`.
    pub utilization_percent: f64,
}
52
/// Notification emitted when backpressure is applied to, or lifted from,
/// a consumer. Delivered over the monitor's optional alert channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureAlert {
    pub consumer_name: String,
    pub stream_name: String,
    /// What triggered (or resolved) this alert.
    pub alert_type: BackpressureAlertType,
    /// Message lag observed when the alert was raised.
    pub message_lag: u64,
    /// Unacknowledged message count observed when the alert was raised.
    pub pending_acks: i64,
    /// Human-readable descriptions of the measures taken.
    pub actions_taken: Vec<String>,
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
64
/// Classification of a `BackpressureAlert`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackpressureAlertType {
    /// Message lag exceeded the configured threshold.
    HighLag,
    /// Unacknowledged message count exceeded the configured threshold.
    HighPendingAcks,
    /// Consumer appears to be stuck (no progress observed).
    ConsumerStalled,
    /// Previously active backpressure has been lifted.
    BackpressureResolved,
}
76
77impl LagMonitor {
78 pub fn new(jetstream: jetstream::Context) -> Self {
80 Self {
81 jetstream,
82 backpressure_manager: BackpressureManager::default(),
83 lag_stats: std::sync::Arc::new(RwLock::new(HashMap::new())),
84 alert_sender: None,
85 }
86 }
87
88 pub fn with_backpressure_config(
90 jetstream: jetstream::Context,
91 backpressure_manager: BackpressureManager,
92 ) -> Self {
93 Self {
94 jetstream,
95 backpressure_manager,
96 lag_stats: std::sync::Arc::new(RwLock::new(HashMap::new())),
97 alert_sender: None,
98 }
99 }
100
101 pub fn with_alerts(mut self, alert_sender: mpsc::Sender<BackpressureAlert>) -> Self {
103 self.alert_sender = Some(alert_sender);
104 self
105 }
106
107 pub async fn start_monitoring(&self, check_interval: Duration) -> Result<()> {
109 let mut interval = tokio::time::interval(check_interval);
110
111 info!(
112 "Starting consumer lag monitoring (interval: {:?})",
113 check_interval
114 );
115
116 loop {
117 interval.tick().await;
118
119 if let Err(e) = self.check_all_consumers().await {
120 error!("Failed to check consumer lag: {}", e);
121 }
122 }
123 }
124
125 async fn check_all_consumers(&self) -> Result<()> {
127 let stream_names = self.get_smith_stream_names().await?;
128
129 for stream_name in stream_names {
130 if let Err(e) = self.check_stream_consumers(&stream_name).await {
131 error!(
132 "Failed to check consumers for stream {}: {}",
133 stream_name, e
134 );
135 continue;
136 }
137 }
138
139 Ok(())
140 }
141
142 async fn get_smith_stream_names(&self) -> Result<Vec<String>> {
144 let streams = vec![
145 "SDLC_RAW".to_string(),
146 "ATOMS_VETTED".to_string(),
147 "ATOMS_RESULTS".to_string(),
148 "AUDIT_SECURITY".to_string(),
149 "SDLC_QUARANTINE_BACKPRESSURE".to_string(),
150 ];
151
152 Ok(streams)
153 }
154
155 async fn check_stream_consumers(&self, stream_name: &str) -> Result<()> {
157 let mut stream = self
158 .jetstream
159 .get_stream(stream_name)
160 .await
161 .context(format!("Failed to get stream: {}", stream_name))?;
162
163 let stream_info = stream.info().await.context("Failed to get stream info")?;
164
165 let consumer_names: Vec<String> = stream_info
167 .config
168 .clone()
169 .subjects
170 .iter()
171 .map(|_| format!("{}-consumer", stream_name.to_lowercase()))
172 .collect();
173
174 for consumer_name in consumer_names {
175 if let Ok(consumer) = self
176 .jetstream
177 .get_consumer_from_stream(stream_name, &consumer_name)
178 .await
179 {
180 if let Err(e) = self
181 .check_consumer_lag(stream_name, &consumer_name, consumer)
182 .await
183 {
184 error!("Failed to check lag for consumer {}: {}", consumer_name, e);
185 }
186 }
187 }
188
189 Ok(())
190 }
191
192 async fn check_consumer_lag(
194 &self,
195 stream_name: &str,
196 consumer_name: &str,
197 mut consumer: jetstream::consumer::Consumer<jetstream::consumer::pull::Config>,
198 ) -> Result<()> {
199 let consumer_info = consumer
200 .info()
201 .await
202 .context("Failed to get consumer info")?;
203
204 let lag_stats = self
205 .calculate_lag_stats(stream_name, consumer_name, consumer_info)
206 .await?;
207
208 let should_apply_backpressure = self
210 .backpressure_manager
211 .should_apply_backpressure(lag_stats.message_lag, lag_stats.pending_acks);
212
213 if should_apply_backpressure && !lag_stats.backpressure_active {
215 self.apply_backpressure(&lag_stats).await?;
216 } else if !should_apply_backpressure && lag_stats.backpressure_active {
217 self.remove_backpressure(&lag_stats).await?;
218 }
219
220 {
222 let mut stats_map = self.lag_stats.write().await;
223 stats_map.insert(consumer_name.to_string(), lag_stats);
224 }
225
226 Ok(())
227 }
228
229 async fn calculate_lag_stats(
231 &self,
232 stream_name: &str,
233 consumer_name: &str,
234 consumer_info: &ConsumerInfo,
235 ) -> Result<ConsumerLagStats> {
236 let message_lag = consumer_info.num_pending;
238
239 let pending_acks = consumer_info.num_pending as i64;
241
242 let throughput_mps = self
244 .calculate_throughput(consumer_name)
245 .await
246 .unwrap_or(0.0);
247
248 let max_ack_pending = consumer_info.config.max_ack_pending as f64;
250 let utilization_percent = (pending_acks as f64 / max_ack_pending) * 100.0;
251
252 let backpressure_active = {
254 let stats_map = self.lag_stats.read().await;
255 stats_map
256 .get(consumer_name)
257 .map(|stats| stats.backpressure_active)
258 .unwrap_or(false)
259 };
260
261 Ok(ConsumerLagStats {
262 consumer_name: consumer_name.to_string(),
263 stream_name: stream_name.to_string(),
264 message_lag,
265 pending_acks,
266 throughput_mps,
267 last_updated: Utc::now(),
268 backpressure_active,
269 utilization_percent,
270 })
271 }
272
273 async fn calculate_throughput(&self, consumer_name: &str) -> Option<f64> {
275 let stats_map = self.lag_stats.read().await;
276 if let Some(previous_stats) = stats_map.get(consumer_name) {
277 let time_diff = (Utc::now() - previous_stats.last_updated).num_seconds() as f64;
278 if time_diff > 0.0 {
279 return Some(10.0); }
282 }
283 None
284 }
285
286 async fn apply_backpressure(&self, lag_stats: &ConsumerLagStats) -> Result<()> {
288 let actions = self
289 .backpressure_manager
290 .generate_backpressure_response(lag_stats.message_lag, lag_stats.pending_acks);
291
292 let mut action_descriptions = Vec::new();
293
294 for action in actions {
295 match action {
296 BackpressureAction::RouteToQuarantine => {
297 self.route_to_quarantine(&lag_stats.consumer_name).await?;
299 action_descriptions.push("Routed to quarantine".to_string());
300 }
301 BackpressureAction::ReduceBatchSize(new_size) => {
302 self.reduce_batch_size(&lag_stats.consumer_name, new_size)
304 .await?;
305 action_descriptions.push(format!("Reduced batch size to {}", new_size));
306 }
307 BackpressureAction::ExtendAckWait(duration) => {
308 self.extend_ack_wait(&lag_stats.consumer_name, duration)
310 .await?;
311 action_descriptions.push(format!("Extended ack wait to {:?}", duration));
312 }
313 BackpressureAction::AlertOps(message) => {
314 self.send_ops_alert(&lag_stats.consumer_name, &message)
316 .await?;
317 action_descriptions.push(format!("Ops alert: {}", message));
318 }
319 }
320 }
321
322 if let Some(ref alert_sender) = self.alert_sender {
324 let alert = BackpressureAlert {
325 consumer_name: lag_stats.consumer_name.clone(),
326 stream_name: lag_stats.stream_name.clone(),
327 alert_type: if lag_stats.message_lag > self.backpressure_manager.lag_threshold {
328 BackpressureAlertType::HighLag
329 } else {
330 BackpressureAlertType::HighPendingAcks
331 },
332 message_lag: lag_stats.message_lag,
333 pending_acks: lag_stats.pending_acks,
334 actions_taken: action_descriptions,
335 timestamp: chrono::Utc::now(),
336 };
337
338 if let Err(e) = alert_sender.try_send(alert) {
339 error!("Failed to send backpressure alert: {}", e);
340 }
341 }
342
343 warn!(
344 consumer = lag_stats.consumer_name,
345 stream = lag_stats.stream_name,
346 message_lag = lag_stats.message_lag,
347 pending_acks = lag_stats.pending_acks,
348 "Applied backpressure measures"
349 );
350
351 Ok(())
352 }
353
354 async fn remove_backpressure(&self, lag_stats: &ConsumerLagStats) -> Result<()> {
356 info!(
357 consumer = lag_stats.consumer_name,
358 stream = lag_stats.stream_name,
359 "Removing backpressure - lag resolved"
360 );
361
362 if let Some(ref alert_sender) = self.alert_sender {
364 let alert = BackpressureAlert {
365 consumer_name: lag_stats.consumer_name.clone(),
366 stream_name: lag_stats.stream_name.clone(),
367 alert_type: BackpressureAlertType::BackpressureResolved,
368 message_lag: lag_stats.message_lag,
369 pending_acks: lag_stats.pending_acks,
370 actions_taken: vec!["Backpressure resolved".to_string()],
371 timestamp: chrono::Utc::now(),
372 };
373
374 if let Err(e) = alert_sender.try_send(alert) {
375 error!("Failed to send backpressure resolution alert: {}", e);
376 }
377 }
378
379 Ok(())
380 }
381
382 async fn route_to_quarantine(&self, consumer_name: &str) -> Result<()> {
384 debug!(
386 "Routing messages to quarantine for consumer: {}",
387 consumer_name
388 );
389 Ok(())
390 }
391
392 async fn reduce_batch_size(&self, consumer_name: &str, new_size: usize) -> Result<()> {
394 debug!(
396 "Reducing batch size to {} for consumer: {}",
397 new_size, consumer_name
398 );
399 Ok(())
400 }
401
402 async fn extend_ack_wait(&self, consumer_name: &str, duration: Duration) -> Result<()> {
404 debug!(
406 "Extending ack wait to {:?} for consumer: {}",
407 duration, consumer_name
408 );
409 Ok(())
410 }
411
412 async fn send_ops_alert(&self, consumer_name: &str, message: &str) -> Result<()> {
414 warn!(
415 consumer = consumer_name,
416 alert = message,
417 "Operations alert triggered"
418 );
419 Ok(())
420 }
421
422 pub async fn get_lag_stats(&self) -> HashMap<String, ConsumerLagStats> {
424 self.lag_stats.read().await.clone()
425 }
426
427 pub async fn get_consumer_lag_stats(&self, consumer_name: &str) -> Option<ConsumerLagStats> {
429 self.lag_stats.read().await.get(consumer_name).cloned()
430 }
431
432 pub async fn has_active_backpressure(&self) -> bool {
434 let stats_map = self.lag_stats.read().await;
435 stats_map.values().any(|stats| stats.backpressure_active)
436 }
437
438 pub async fn get_total_lag(&self) -> u64 {
440 let stats_map = self.lag_stats.read().await;
441 stats_map.values().map(|stats| stats.message_lag).sum()
442 }
443}
444
/// Consumes `BackpressureAlert`s from a channel and logs them.
pub struct BackpressureAlertHandler {
    /// Receiving end of the alert channel fed by the monitor.
    alert_receiver: mpsc::Receiver<BackpressureAlert>,
}
449
450impl BackpressureAlertHandler {
451 pub fn new(alert_receiver: mpsc::Receiver<BackpressureAlert>) -> Self {
452 Self { alert_receiver }
453 }
454
455 pub async fn start_handling(&mut self) {
457 while let Some(alert) = self.alert_receiver.recv().await {
458 self.handle_alert(alert).await;
459 }
460 }
461
462 async fn handle_alert(&self, alert: BackpressureAlert) {
463 match alert.alert_type {
464 BackpressureAlertType::HighLag => {
465 error!(
466 consumer = alert.consumer_name,
467 stream = alert.stream_name,
468 lag = alert.message_lag,
469 "HIGH LAG ALERT: Consumer is {} messages behind",
470 alert.message_lag
471 );
472 }
473 BackpressureAlertType::HighPendingAcks => {
474 error!(
475 consumer = alert.consumer_name,
476 stream = alert.stream_name,
477 pending_acks = alert.pending_acks,
478 "HIGH PENDING ACKS ALERT: {} unacknowledged messages",
479 alert.pending_acks
480 );
481 }
482 BackpressureAlertType::ConsumerStalled => {
483 error!(
484 consumer = alert.consumer_name,
485 stream = alert.stream_name,
486 "CONSUMER STALLED ALERT: Consumer appears to be stuck"
487 );
488 }
489 BackpressureAlertType::BackpressureResolved => {
490 info!(
491 consumer = alert.consumer_name,
492 stream = alert.stream_name,
493 "Backpressure resolved for consumer"
494 );
495 }
496 }
497 }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a lag-stats fixture and spot-checks the recorded fields.
    #[test]
    fn test_consumer_lag_stats_creation() {
        let now = Utc::now();
        let stats = ConsumerLagStats {
            consumer_name: String::from("test-consumer"),
            stream_name: String::from("TEST_STREAM"),
            message_lag: 500,
            pending_acks: 100,
            throughput_mps: 50.0,
            last_updated: now,
            backpressure_active: false,
            utilization_percent: 75.0,
        };

        assert_eq!(500, stats.message_lag);
        assert_eq!(100, stats.pending_acks);
        assert!(!stats.backpressure_active);
    }

    /// Builds an alert fixture and verifies its type and action list.
    #[test]
    fn test_backpressure_alert_creation() {
        let actions = vec![String::from("Routed to quarantine")];
        let alert = BackpressureAlert {
            consumer_name: String::from("test-consumer"),
            stream_name: String::from("TEST_STREAM"),
            alert_type: BackpressureAlertType::HighLag,
            message_lag: 2000,
            pending_acks: 600,
            actions_taken: actions,
            timestamp: chrono::Utc::now(),
        };

        assert_eq!(2000, alert.message_lag);
        assert!(matches!(alert.alert_type, BackpressureAlertType::HighLag));
        assert_eq!(1, alert.actions_taken.len());
    }
}