Skip to main content

celers_kombu/
middleware_monitoring.rs

1//! Monitoring and operational middleware implementations.
2
3use crate::{BrokerError, MessageMiddleware, Result};
4use async_trait::async_trait;
5use celers_protocol::Message;
6use std::collections::HashMap;
7
/// Middleware that advertises a preferred acknowledgment batch size.
///
/// Consumers can read the advertised value to acknowledge several messages at
/// once, which helps reduce network round-trips and improve throughput.
///
/// # Examples
///
/// ```
/// use celers_kombu::BatchAckHintMiddleware;
///
/// let batch_hint = BatchAckHintMiddleware::new(10);
/// assert_eq!(batch_hint.batch_size(), 10);
/// ```
#[derive(Debug, Clone)]
pub struct BatchAckHintMiddleware {
    /// Suggested number of messages per acknowledgment batch (at least 1).
    batch_size: usize,
    /// Name of the header that carries the suggested batch size.
    hint_header: String,
}

impl BatchAckHintMiddleware {
    /// Builds a middleware suggesting `batch_size` messages per batch.
    ///
    /// A `batch_size` of zero is raised to one. The hint header name starts
    /// out as `x-batch-ack-hint`.
    pub fn new(batch_size: usize) -> Self {
        let batch_size = if batch_size == 0 { 1 } else { batch_size };
        Self {
            batch_size,
            hint_header: String::from("x-batch-ack-hint"),
        }
    }

    /// Replaces the name of the header used to carry the hint.
    pub fn with_hint_header(self, header: impl Into<String>) -> Self {
        Self {
            hint_header: header.into(),
            ..self
        }
    }

    /// Returns the suggested batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
}

impl Default for BatchAckHintMiddleware {
    /// Uses a suggested batch size of 10.
    fn default() -> Self {
        BatchAckHintMiddleware::new(10)
    }
}
53
54#[async_trait]
55impl MessageMiddleware for BatchAckHintMiddleware {
56    async fn before_publish(&self, message: &mut Message) -> Result<()> {
57        // Inject batch acknowledgment hint
58        message
59            .headers
60            .extra
61            .insert(self.hint_header.clone(), serde_json::json!(self.batch_size));
62
63        // Add hint about whether batching is recommended
64        message.headers.extra.insert(
65            "x-batch-ack-recommended".to_string(),
66            serde_json::json!(true),
67        );
68
69        Ok(())
70    }
71
72    async fn after_consume(&self, _message: &mut Message) -> Result<()> {
73        // No action needed on consume
74        Ok(())
75    }
76
77    fn name(&self) -> &str {
78        "batch_ack_hint"
79    }
80}
81
/// Load shedding middleware for graceful degradation under pressure.
///
/// While the load estimate is above the configured threshold, messages whose
/// priority is below the cutoff are rejected so that the remaining traffic can
/// still be served during spikes.
///
/// # Examples
///
/// ```
/// use celers_kombu::LoadSheddingMiddleware;
///
/// let load_shedder = LoadSheddingMiddleware::new(0.8); // 80% threshold
/// assert_eq!(load_shedder.threshold(), 0.8);
/// ```
#[derive(Debug, Clone)]
pub struct LoadSheddingMiddleware {
    /// Load level (0.0-1.0) above which shedding is active.
    load_threshold: f64,
    /// Messages with a priority below this value may be shed.
    priority_cutoff: u8,
    /// Most recent load estimate (0.0-1.0).
    current_load: f64,
}

impl LoadSheddingMiddleware {
    /// Builds a load shedder with the given threshold, clamped to `0.0..=1.0`.
    ///
    /// The priority cutoff starts at 3 (priorities below 3 are shed) and the
    /// load estimate starts at 0.0.
    pub fn new(load_threshold: f64) -> Self {
        let load_threshold = load_threshold.clamp(0.0, 1.0);
        Self {
            load_threshold,
            priority_cutoff: 3,
            current_load: 0.0,
        }
    }

    /// Replaces the priority cutoff; values above 10 are lowered to 10.
    pub fn with_priority_cutoff(self, cutoff: u8) -> Self {
        Self {
            priority_cutoff: cutoff.min(10),
            ..self
        }
    }

    /// Stores a new load estimate, clamped to `0.0..=1.0`.
    pub fn update_load(&mut self, load: f64) {
        let bounded = load.clamp(0.0, 1.0);
        self.current_load = bounded;
    }

    /// Returns the load threshold.
    pub fn threshold(&self) -> f64 {
        self.load_threshold
    }

    /// Tells whether a message of the given priority must be dropped right now.
    fn should_shed(&self, priority: u8) -> bool {
        let overloaded = self.current_load > self.load_threshold;
        let below_cutoff = priority < self.priority_cutoff;
        overloaded && below_cutoff
    }
}

impl Default for LoadSheddingMiddleware {
    /// Uses an 80% load threshold.
    fn default() -> Self {
        LoadSheddingMiddleware::new(0.8)
    }
}
139
140#[async_trait]
141impl MessageMiddleware for LoadSheddingMiddleware {
142    async fn before_publish(&self, message: &mut Message) -> Result<()> {
143        let priority = message
144            .headers
145            .extra
146            .get("priority")
147            .and_then(|v| v.as_u64())
148            .map(|v| v as u8)
149            .unwrap_or(5);
150
151        if self.should_shed(priority) {
152            // Inject load shedding marker
153            message
154                .headers
155                .extra
156                .insert("x-load-shed".to_string(), serde_json::json!(true));
157            message.headers.extra.insert(
158                "x-current-load".to_string(),
159                serde_json::json!(self.current_load),
160            );
161
162            return Err(BrokerError::OperationFailed(format!(
163                "Load shedding: current load {:.2} exceeds threshold {:.2}",
164                self.current_load, self.load_threshold
165            )));
166        }
167
168        Ok(())
169    }
170
171    async fn after_consume(&self, _message: &mut Message) -> Result<()> {
172        Ok(())
173    }
174
175    fn name(&self) -> &str {
176        "load_shedding"
177    }
178}
179
/// Message priority escalation middleware
///
/// Raises message priority based on message age and retry count so that old
/// or repeatedly retried messages are not starved in priority queues.
///
/// # Examples
///
/// ```
/// use celers_kombu::MessagePriorityEscalationMiddleware;
///
/// let escalator = MessagePriorityEscalationMiddleware::new(300); // 5 min threshold
/// assert_eq!(escalator.age_threshold_secs(), 300);
/// ```
#[derive(Debug, Clone)]
pub struct MessagePriorityEscalationMiddleware {
    /// Age (seconds) that earns one escalation step; 0 disables age-based escalation.
    age_threshold_secs: u64,
    /// Priority increase applied per elapsed age threshold (at least 1).
    escalation_step: u8,
    /// Upper bound for the escalated priority (at most 10).
    max_priority: u8,
    /// Whether the retry count also raises the priority.
    escalate_on_retry: bool,
}

impl MessagePriorityEscalationMiddleware {
    /// Create a new priority escalation middleware
    ///
    /// Starts with an escalation step of 1, a maximum priority of 10 and
    /// retry-based escalation enabled. An `age_threshold_secs` of 0 turns
    /// age-based escalation off.
    pub fn new(age_threshold_secs: u64) -> Self {
        Self {
            age_threshold_secs,
            escalation_step: 1,
            max_priority: 10,
            escalate_on_retry: true,
        }
    }

    /// Set escalation step (a step of 0 is raised to 1)
    pub fn with_escalation_step(mut self, step: u8) -> Self {
        self.escalation_step = step.max(1);
        self
    }

    /// Set maximum priority (values above 10 are lowered to 10)
    pub fn with_max_priority(mut self, max: u8) -> Self {
        self.max_priority = max.min(10);
        self
    }

    /// Set whether to escalate on retry
    pub fn with_escalate_on_retry(mut self, enable: bool) -> Self {
        self.escalate_on_retry = enable;
        self
    }

    /// Get age threshold
    pub fn age_threshold_secs(&self) -> u64 {
        self.age_threshold_secs
    }

    /// Calculate escalated priority
    ///
    /// Adds `escalation_step` for every full `age_threshold_secs` contained in
    /// `age_secs`, plus up to 3 for retries when retry escalation is enabled,
    /// and caps the result at `max_priority`. All arithmetic saturates, so
    /// large ages, steps or retry counts cannot wrap around or panic.
    fn calculate_priority(&self, base_priority: u8, age_secs: u64, retries: u32) -> u8 {
        let mut priority = base_priority;

        // Age-based escalation. `checked_div` yields `None` for a zero
        // threshold, which skips this step instead of dividing by zero.
        if let Some(elapsed_thresholds) = age_secs.checked_div(self.age_threshold_secs) {
            let boost = elapsed_thresholds.saturating_mul(u64::from(self.escalation_step));
            // Clamp in u64 before narrowing so the cast cannot wrap.
            let boost = boost.min(u64::from(u8::MAX)) as u8;
            priority = priority.saturating_add(boost);
        }

        // Retry-based escalation, capped at 3. The cap is applied before
        // narrowing so that e.g. 256 retries still earn the full boost.
        if self.escalate_on_retry {
            let retry_boost = retries.min(3) as u8;
            priority = priority.saturating_add(retry_boost);
        }

        priority.min(self.max_priority)
    }
}

impl Default for MessagePriorityEscalationMiddleware {
    fn default() -> Self {
        Self::new(300) // 5 minutes
    }
}
260
261#[async_trait]
262impl MessageMiddleware for MessagePriorityEscalationMiddleware {
263    async fn before_publish(&self, message: &mut Message) -> Result<()> {
264        let base_priority = message
265            .headers
266            .extra
267            .get("priority")
268            .and_then(|v| v.as_u64())
269            .map(|v| v as u8)
270            .unwrap_or(5);
271        let age_secs = 0; // Would be calculated from message timestamp in real implementation
272        let retries = message.headers.retries.unwrap_or(0);
273
274        let new_priority = self.calculate_priority(base_priority, age_secs, retries);
275
276        if new_priority != base_priority {
277            message
278                .headers
279                .extra
280                .insert("priority".to_string(), serde_json::json!(new_priority));
281            message
282                .headers
283                .extra
284                .insert("x-priority-escalated".to_string(), serde_json::json!(true));
285            message.headers.extra.insert(
286                "x-original-priority".to_string(),
287                serde_json::json!(base_priority),
288            );
289        }
290
291        Ok(())
292    }
293
294    async fn after_consume(&self, _message: &mut Message) -> Result<()> {
295        Ok(())
296    }
297
298    fn name(&self) -> &str {
299        "priority_escalation"
300    }
301}
302
/// Observability middleware for structured logging and metrics.
///
/// Carries the service identity plus logging and metrics switches, intended
/// for integration with observability platforms.
///
/// # Examples
///
/// ```
/// use celers_kombu::ObservabilityMiddleware;
///
/// let observability = ObservabilityMiddleware::new("my-service");
/// assert_eq!(observability.service_name(), "my-service");
/// ```
#[derive(Debug, Clone)]
pub struct ObservabilityMiddleware {
    /// Name of the service reported on messages.
    service_name: String,
    /// Whether the metrics marker is attached to messages.
    enable_metrics: bool,
    /// Whether the log level is attached to messages.
    enable_logging: bool,
    /// Log level reported when logging is enabled.
    log_level: String,
}

impl ObservabilityMiddleware {
    /// Builds a middleware for `service_name` with metrics and logging
    /// enabled and the log level set to `info`.
    pub fn new(service_name: impl Into<String>) -> Self {
        Self {
            service_name: service_name.into(),
            enable_metrics: true,
            enable_logging: true,
            log_level: String::from("info"),
        }
    }

    /// Turns metrics off.
    pub fn without_metrics(self) -> Self {
        Self {
            enable_metrics: false,
            ..self
        }
    }

    /// Turns logging off.
    pub fn without_logging(self) -> Self {
        Self {
            enable_logging: false,
            ..self
        }
    }

    /// Replaces the log level.
    pub fn with_log_level(self, level: impl Into<String>) -> Self {
        Self {
            log_level: level.into(),
            ..self
        }
    }

    /// Returns the service name.
    pub fn service_name(&self) -> &str {
        self.service_name.as_str()
    }
}

impl Default for ObservabilityMiddleware {
    /// Uses `unknown-service` as the service name.
    fn default() -> Self {
        ObservabilityMiddleware::new("unknown-service")
    }
}
364
365#[async_trait]
366impl MessageMiddleware for ObservabilityMiddleware {
367    async fn before_publish(&self, message: &mut Message) -> Result<()> {
368        if self.enable_metrics {
369            message.headers.extra.insert(
370                "x-observability-enabled".to_string(),
371                serde_json::json!(true),
372            );
373        }
374
375        if self.enable_logging {
376            message
377                .headers
378                .extra
379                .insert("x-log-level".to_string(), serde_json::json!(self.log_level));
380        }
381
382        message.headers.extra.insert(
383            "x-service-name".to_string(),
384            serde_json::json!(self.service_name),
385        );
386
387        Ok(())
388    }
389
390    async fn after_consume(&self, _message: &mut Message) -> Result<()> {
391        // In a real implementation, would emit metrics and logs here
392        Ok(())
393    }
394
395    fn name(&self) -> &str {
396        "observability"
397    }
398}
399
/// Health check middleware - automatic health status tracking
///
/// Keeps a health flag and the time of the last health check. The flag is
/// changed through [`mark_healthy`](Self::mark_healthy) and
/// [`mark_unhealthy`](Self::mark_unhealthy).
///
/// # Examples
///
/// ```
/// use celers_kombu::HealthCheckMiddleware;
///
/// // Basic health check middleware
/// let health_check = HealthCheckMiddleware::new();
///
/// // With custom health check interval
/// let custom_health = HealthCheckMiddleware::new()
///     .with_check_interval_secs(30);
/// ```
pub struct HealthCheckMiddleware {
    /// Last health check timestamp (seconds since epoch)
    last_check: std::sync::Arc<std::sync::Mutex<u64>>,
    /// Health check interval in seconds
    check_interval_secs: u64,
    /// Health status
    is_healthy: std::sync::Arc<std::sync::Mutex<bool>>,
}

impl HealthCheckMiddleware {
    /// Create a new health check middleware
    ///
    /// Starts healthy, with a 60 second check interval and no check recorded.
    pub fn new() -> Self {
        Self {
            last_check: std::sync::Arc::new(std::sync::Mutex::new(0)),
            check_interval_secs: 60, // Default: 1 minute
            is_healthy: std::sync::Arc::new(std::sync::Mutex::new(true)),
        }
    }

    /// Set health check interval
    pub fn with_check_interval_secs(mut self, interval: u64) -> Self {
        self.check_interval_secs = interval;
        self
    }

    /// Get current health status
    pub fn is_healthy(&self) -> bool {
        *Self::lock_ignoring_poison(&self.is_healthy)
    }

    /// Mark as unhealthy
    pub fn mark_unhealthy(&self) {
        *Self::lock_ignoring_poison(&self.is_healthy) = false;
    }

    /// Mark as healthy
    pub fn mark_healthy(&self) {
        *Self::lock_ignoring_poison(&self.is_healthy) = true;
    }

    /// Tells whether at least `check_interval_secs` passed since the last check.
    ///
    /// If the recorded check time lies in the future (the system clock was
    /// moved backwards), a check is requested so the timestamp gets refreshed
    /// instead of underflowing the subtraction.
    fn should_check(&self) -> bool {
        let now = Self::now_secs();
        let last = *Self::lock_ignoring_poison(&self.last_check);
        match now.checked_sub(last) {
            Some(elapsed) => elapsed >= self.check_interval_secs,
            None => true,
        }
    }

    /// Records the current time as the time of the last health check.
    fn update_check_time(&self) {
        *Self::lock_ignoring_poison(&self.last_check) = Self::now_secs();
    }

    /// Whole seconds since the Unix epoch, or 0 when the system clock is set
    /// before the epoch.
    fn now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }

    /// Locks `mutex`, recovering the guard if a previous holder panicked.
    ///
    /// The guarded values are plain scalars that cannot be left half-updated,
    /// so a poisoned lock is still safe to use.
    fn lock_ignoring_poison<T>(mutex: &std::sync::Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

impl Default for HealthCheckMiddleware {
    fn default() -> Self {
        Self::new()
    }
}
477
478#[async_trait]
479impl MessageMiddleware for HealthCheckMiddleware {
480    async fn before_publish(&self, message: &mut Message) -> Result<()> {
481        if self.should_check() {
482            self.update_check_time();
483            // In a real implementation, would perform actual health check
484            // For now, just inject health status into message headers
485        }
486
487        let health_status = if self.is_healthy() {
488            "healthy"
489        } else {
490            "unhealthy"
491        };
492        message.headers.extra.insert(
493            "x-health-status".to_string(),
494            serde_json::json!(health_status),
495        );
496        Ok(())
497    }
498
499    async fn after_consume(&self, _message: &mut Message) -> Result<()> {
500        // Health check on consume could validate broker connectivity
501        Ok(())
502    }
503
504    fn name(&self) -> &str {
505        "health_check"
506    }
507}
508
/// Message tagging middleware - automatic message tagging and categorization.
///
/// Holds an environment name and a set of custom key/value tags.
///
/// # Examples
///
/// ```
/// use celers_kombu::MessageTaggingMiddleware;
/// use std::collections::HashMap;
///
/// // Basic tagging with environment
/// let tagging = MessageTaggingMiddleware::new("production");
///
/// // With custom tags
/// let mut tags = HashMap::new();
/// tags.insert("region".to_string(), "us-east-1".to_string());
/// tags.insert("team".to_string(), "platform".to_string());
/// let custom_tagging = MessageTaggingMiddleware::new("production")
///     .with_tags(tags);
/// ```
pub struct MessageTaggingMiddleware {
    /// Environment name (e.g., "production", "staging").
    environment: String,
    /// Custom tags, keyed by tag name.
    tags: HashMap<String, String>,
}

impl MessageTaggingMiddleware {
    /// Builds a tagging middleware for `environment` with no custom tags.
    pub fn new(environment: impl Into<String>) -> Self {
        let environment = environment.into();
        Self {
            environment,
            tags: HashMap::new(),
        }
    }

    /// Replaces the whole set of custom tags with `tags`.
    pub fn with_tags(self, tags: HashMap<String, String>) -> Self {
        Self { tags, ..self }
    }

    /// Adds one custom tag, overwriting any tag with the same key.
    pub fn with_tag(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut tagged = self;
        tagged.tags.insert(key.into(), value.into());
        tagged
    }
}
555
556#[async_trait]
557impl MessageMiddleware for MessageTaggingMiddleware {
558    async fn before_publish(&self, message: &mut Message) -> Result<()> {
559        // Inject environment tag
560        message.headers.extra.insert(
561            "x-environment".to_string(),
562            serde_json::json!(self.environment.clone()),
563        );
564
565        // Inject custom tags
566        for (key, value) in &self.tags {
567            message
568                .headers
569                .extra
570                .insert(format!("x-tag-{}", key), serde_json::json!(value.clone()));
571        }
572
573        // Auto-categorize based on task name
574        let category = if message.task_name().contains("email") {
575            "communication"
576        } else if message.task_name().contains("report") {
577            "analytics"
578        } else if message.task_name().contains("process") {
579            "computation"
580        } else {
581            "general"
582        };
583        message
584            .headers
585            .extra
586            .insert("x-category".to_string(), serde_json::json!(category));
587
588        Ok(())
589    }
590
591    async fn after_consume(&self, _message: &mut Message) -> Result<()> {
592        // Tags are already present after consumption
593        Ok(())
594    }
595
596    fn name(&self) -> &str {
597        "message_tagging"
598    }
599}
600
601/// Cost attribution middleware - track costs per tenant/project
602///
603/// # Examples
604///
605/// ```
606/// use celers_kombu::CostAttributionMiddleware;
607///
608/// // Basic cost attribution
609/// let cost_attr = CostAttributionMiddleware::new(0.001); // $0.001 per message
610///
611/// // With custom cost factors
612/// let advanced_cost = CostAttributionMiddleware::new(0.001)
613///     .with_compute_cost_per_sec(0.0001)   // $0.0001 per second
614///     .with_storage_cost_per_mb(0.00001);  // $0.00001 per MB
615/// ```
616pub struct CostAttributionMiddleware {
617    /// Base cost per message (in dollars)
618    message_cost: f64,
619    /// Compute cost per second (in dollars)
620    compute_cost_per_sec: f64,
621    /// Storage cost per MB (in dollars)
622    storage_cost_per_mb: f64,
623}
624
625impl CostAttributionMiddleware {
626    /// Create a new cost attribution middleware
627    pub fn new(message_cost: f64) -> Self {
628        Self {
629            message_cost,
630            compute_cost_per_sec: 0.0,
631            storage_cost_per_mb: 0.0,
632        }
633    }
634
635    /// Set compute cost per second
636    pub fn with_compute_cost_per_sec(mut self, cost: f64) -> Self {
637        self.compute_cost_per_sec = cost;
638        self
639    }
640
641    /// Set storage cost per MB
642    pub fn with_storage_cost_per_mb(mut self, cost: f64) -> Self {
643        self.storage_cost_per_mb = cost;
644        self
645    }
646
647    fn calculate_cost(&self, message: &Message) -> f64 {
648        let mut cost = self.message_cost;
649
650        // Add storage cost based on message size
651        let size_mb = message.body.len() as f64 / (1024.0 * 1024.0);
652        cost += size_mb * self.storage_cost_per_mb;
653
654        cost
655    }
656}
657
658#[async_trait]
659impl MessageMiddleware for CostAttributionMiddleware {
660    async fn before_publish(&self, message: &mut Message) -> Result<()> {
661        // Calculate and inject cost
662        let cost = self.calculate_cost(message);
663        message.headers.extra.insert(
664            "x-cost-estimate".to_string(),
665            serde_json::json!(format!("{:.6}", cost)),
666        );
667
668        // Extract tenant/project from message headers if available
669        let tenant = message
670            .headers
671            .extra
672            .get("x-tenant")
673            .and_then(|v| v.as_str())
674            .or_else(|| message.headers.extra.get("tenant").and_then(|v| v.as_str()))
675            .unwrap_or("default")
676            .to_string();
677
678        message
679            .headers
680            .extra
681            .insert("x-cost-tenant".to_string(), serde_json::json!(tenant));
682
683        // Inject timestamp for cost tracking
684        let timestamp = std::time::SystemTime::now()
685            .duration_since(std::time::UNIX_EPOCH)
686            .unwrap()
687            .as_secs();
688        message.headers.extra.insert(
689            "x-cost-timestamp".to_string(),
690            serde_json::json!(timestamp.to_string()),
691        );
692
693        Ok(())
694    }
695
696    async fn after_consume(&self, message: &mut Message) -> Result<()> {
697        // Calculate actual compute cost if processing time is available
698        if let Some(cost_timestamp) = message.headers.extra.get("x-cost-timestamp") {
699            if let Some(timestamp_str) = cost_timestamp.as_str() {
700                if let Ok(start_time) = timestamp_str.parse::<u64>() {
701                    let now = std::time::SystemTime::now()
702                        .duration_since(std::time::UNIX_EPOCH)
703                        .unwrap()
704                        .as_secs();
705                    let duration_secs = (now - start_time) as f64;
706                    let compute_cost = duration_secs * self.compute_cost_per_sec;
707
708                    if let Some(base_cost) = message.headers.extra.get("x-cost-estimate") {
709                        if let Some(base_str) = base_cost.as_str() {
710                            if let Ok(base) = base_str.parse::<f64>() {
711                                let total_cost = base + compute_cost;
712                                message.headers.extra.insert(
713                                    "x-cost-actual".to_string(),
714                                    serde_json::json!(format!("{:.6}", total_cost)),
715                                );
716                            }
717                        }
718                    }
719                }
720            }
721        }
722        Ok(())
723    }
724
725    fn name(&self) -> &str {
726        "cost_attribution"
727    }
728}
729
/// SLA monitoring middleware - track and enforce SLA requirements.
///
/// Collects processing times and reports which fraction of them stayed within
/// the target.
///
/// # Examples
///
/// ```
/// use celers_kombu::SLAMonitoringMiddleware;
///
/// // Basic SLA monitoring with 5-second target
/// let sla_monitor = SLAMonitoringMiddleware::new(5000);
///
/// // With custom percentile and alert threshold
/// let advanced_sla = SLAMonitoringMiddleware::new(3000)
///     .with_percentile(99)
///     .with_alert_threshold(0.95);
/// ```
pub struct SLAMonitoringMiddleware {
    /// Processing time target in milliseconds.
    target_ms: u64,
    /// Percentile to track (95 by default).
    percentile: u8,
    /// Compliance rate (0.0-1.0) below which an alert is due (0.9 by default).
    alert_threshold: f64,
    /// Recorded processing times in milliseconds.
    processing_times: std::sync::Arc<std::sync::Mutex<Vec<u64>>>,
}

impl SLAMonitoringMiddleware {
    /// Builds an SLA monitor with the given target in milliseconds, tracking
    /// the 95th percentile and alerting below 90% compliance.
    pub fn new(target_ms: u64) -> Self {
        let processing_times = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
        Self {
            target_ms,
            percentile: 95,
            alert_threshold: 0.9,
            processing_times,
        }
    }

    /// Replaces the tracked percentile, clamped to `1..=99`.
    pub fn with_percentile(self, percentile: u8) -> Self {
        Self {
            percentile: percentile.clamp(1, 99),
            ..self
        }
    }

    /// Replaces the alert threshold, clamped to `0.0..=1.0`.
    pub fn with_alert_threshold(self, threshold: f64) -> Self {
        Self {
            alert_threshold: threshold.clamp(0.0, 1.0),
            ..self
        }
    }

    /// Returns the fraction of recorded processing times that met the target.
    ///
    /// With no recorded times the rate is `1.0`.
    pub fn compliance_rate(&self) -> f64 {
        let samples = self.processing_times.lock().unwrap();
        match samples.len() {
            0 => 1.0,
            total => {
                let met = samples
                    .iter()
                    .filter(|&&elapsed| elapsed <= self.target_ms)
                    .count();
                met as f64 / total as f64
            }
        }
    }

    /// Tells whether the compliance rate dropped below the alert threshold.
    pub fn should_alert(&self) -> bool {
        let rate = self.compliance_rate();
        rate < self.alert_threshold
    }
}
795
796#[async_trait]
797impl MessageMiddleware for SLAMonitoringMiddleware {
798    async fn before_publish(&self, message: &mut Message) -> Result<()> {
799        // Inject SLA target
800        message.headers.extra.insert(
801            "x-sla-target-ms".to_string(),
802            serde_json::json!(self.target_ms),
803        );
804
805        // Inject start timestamp for SLA tracking
806        let timestamp = std::time::SystemTime::now()
807            .duration_since(std::time::UNIX_EPOCH)
808            .unwrap()
809            .as_millis() as u64;
810        message
811            .headers
812            .extra
813            .insert("x-sla-start-ms".to_string(), serde_json::json!(timestamp));
814
815        Ok(())
816    }
817
818    async fn after_consume(&self, message: &mut Message) -> Result<()> {
819        // Calculate processing time
820        if let Some(start_ms) = message.headers.extra.get("x-sla-start-ms") {
821            if let Some(start_str) = start_ms.as_u64() {
822                let now = std::time::SystemTime::now()
823                    .duration_since(std::time::UNIX_EPOCH)
824                    .unwrap()
825                    .as_millis() as u64;
826                let processing_time = now - start_str;
827
828                // Record processing time
829                self.processing_times.lock().unwrap().push(processing_time);
830
831                // Inject SLA status
832                let within_sla = processing_time <= self.target_ms;
833                message
834                    .headers
835                    .extra
836                    .insert("x-sla-met".to_string(), serde_json::json!(within_sla));
837                message.headers.extra.insert(
838                    "x-sla-processing-ms".to_string(),
839                    serde_json::json!(processing_time),
840                );
841
842                // Check if we should alert
843                if self.should_alert() {
844                    message
845                        .headers
846                        .extra
847                        .insert("x-sla-alert".to_string(), serde_json::json!(true));
848                }
849            }
850        }
851        Ok(())
852    }
853
854    fn name(&self) -> &str {
855        "sla_monitoring"
856    }
857}
858
/// Message versioning middleware - handle message schema versions
///
/// # Examples
///
/// ```
/// use celers_kombu::MessageVersioningMiddleware;
///
/// // Basic versioning with current version
/// let versioning = MessageVersioningMiddleware::new("2.0");
///
/// // With backward compatibility
/// let compat_versioning = MessageVersioningMiddleware::new("3.0")
///     .with_min_supported_version("2.5")
///     .with_auto_upgrade(true);
/// ```
pub struct MessageVersioningMiddleware {
    /// Current message version
    current_version: String,
    /// Minimum supported version
    min_supported_version: Option<String>,
    /// Auto-upgrade messages to current version
    auto_upgrade: bool,
}

impl MessageVersioningMiddleware {
    /// Create a new message versioning middleware
    ///
    /// Starts without a minimum supported version and with auto-upgrade off.
    pub fn new(current_version: impl Into<String>) -> Self {
        Self {
            current_version: current_version.into(),
            min_supported_version: None,
            auto_upgrade: false,
        }
    }

    /// Set minimum supported version
    pub fn with_min_supported_version(mut self, version: impl Into<String>) -> Self {
        self.min_supported_version = Some(version.into());
        self
    }

    /// Enable auto-upgrade of messages
    pub fn with_auto_upgrade(mut self, enabled: bool) -> Self {
        self.auto_upgrade = enabled;
        self
    }

    /// Tells whether `version` is at least the minimum supported version.
    ///
    /// Without a configured minimum every version is supported. When both
    /// versions are dot-separated non-negative integers (`2.5`, `10.0.1`)
    /// they are compared numerically component by component, with missing
    /// components counting as 0, so `2.10` and `10.0` are newer than `2.5`.
    /// Any other format falls back to plain string ordering.
    fn is_version_supported(&self, version: &str) -> bool {
        let min_version = match self.min_supported_version.as_deref() {
            Some(min_version) => min_version,
            None => return true,
        };

        match (
            Self::parse_numeric_version(version),
            Self::parse_numeric_version(min_version),
        ) {
            (Some(actual), Some(minimum)) => {
                Self::compare_components(&actual, &minimum) != std::cmp::Ordering::Less
            }
            // At least one side is not purely numeric (e.g. "legacy").
            _ => version >= min_version,
        }
    }

    /// Splits a dotted version into its numeric components; `None` if any
    /// component is not a non-negative integer.
    fn parse_numeric_version(version: &str) -> Option<Vec<u64>> {
        version
            .split('.')
            .map(|component| component.parse::<u64>().ok())
            .collect()
    }

    /// Orders two component lists, treating missing trailing components as 0.
    fn compare_components(left: &[u64], right: &[u64]) -> std::cmp::Ordering {
        let width = left.len().max(right.len());
        (0..width)
            .map(|index| {
                let l = left.get(index).copied().unwrap_or(0);
                let r = right.get(index).copied().unwrap_or(0);
                l.cmp(&r)
            })
            .find(|ordering| *ordering != std::cmp::Ordering::Equal)
            .unwrap_or(std::cmp::Ordering::Equal)
    }
}
914
915#[async_trait]
916impl MessageMiddleware for MessageVersioningMiddleware {
917    async fn before_publish(&self, message: &mut Message) -> Result<()> {
918        // Inject current version
919        message.headers.extra.insert(
920            "x-message-version".to_string(),
921            serde_json::json!(self.current_version.clone()),
922        );
923
924        // Inject schema version metadata
925        message.headers.extra.insert(
926            "x-schema-version".to_string(),
927            serde_json::json!(self.current_version.clone()),
928        );
929
930        Ok(())
931    }
932
933    async fn after_consume(&self, message: &mut Message) -> Result<()> {
934        // Check message version
935        if let Some(msg_version) = message.headers.extra.get("x-message-version") {
936            if let Some(version_str) = msg_version.as_str() {
937                // Check if version is supported
938                if !self.is_version_supported(version_str) {
939                    return Err(BrokerError::Configuration(format!(
940                        "Unsupported message version: {}. Minimum supported: {:?}",
941                        version_str, self.min_supported_version
942                    )));
943                }
944
945                // Auto-upgrade if needed
946                if self.auto_upgrade && version_str != self.current_version {
947                    message.headers.extra.insert(
948                        "x-upgraded-from".to_string(),
949                        serde_json::json!(version_str),
950                    );
951                    message.headers.extra.insert(
952                        "x-message-version".to_string(),
953                        serde_json::json!(self.current_version.clone()),
954                    );
955                }
956            }
957        } else {
958            // No version specified, treat as legacy
959            message
960                .headers
961                .extra
962                .insert("x-message-version".to_string(), serde_json::json!("legacy"));
963            if self.auto_upgrade {
964                message
965                    .headers
966                    .extra
967                    .insert("x-upgraded-from".to_string(), serde_json::json!("legacy"));
968                message.headers.extra.insert(
969                    "x-message-version".to_string(),
970                    serde_json::json!(self.current_version.clone()),
971                );
972            }
973        }
974        Ok(())
975    }
976
977    fn name(&self) -> &str {
978        "message_versioning"
979    }
980}
981
/// Shared per-consumer usage table.
///
/// Keys are consumer ids. Each value is
/// `(message_count, byte_count, last_reset_timestamp)`, where the timestamp is
/// expressed in seconds since the UNIX epoch.
type ResourceUsageMap = std::sync::Arc<std::sync::Mutex<HashMap<String, (usize, usize, u64)>>>;

/// Resource quota middleware - enforce per-consumer resource limits
///
/// Each consumer gets a message-count budget and a byte budget per time
/// window; both counters start again from zero once the window has elapsed.
///
/// `Clone` is intentionally not derived: a clone would share the same usage
/// table through the inner `Arc`, which is easy to do by accident.
///
/// # Examples
///
/// ```
/// use celers_kombu::ResourceQuotaMiddleware;
///
/// // Basic quota (100 messages per consumer)
/// let quota = ResourceQuotaMiddleware::new(100);
///
/// // With custom limits
/// let advanced_quota = ResourceQuotaMiddleware::new(1000)
///     .with_max_size_bytes(10_000_000)  // 10MB per consumer
///     .with_time_window_secs(60);        // Reset every 60 seconds
/// ```
#[derive(Debug)]
pub struct ResourceQuotaMiddleware {
    /// Maximum messages per consumer
    max_messages: usize,
    /// Maximum bytes per consumer
    max_size_bytes: usize,
    /// Time window in seconds for quota reset
    time_window_secs: u64,
    /// Usage tracking per consumer
    usage: ResourceUsageMap,
}
1010
1011impl ResourceQuotaMiddleware {
1012    /// Create a new resource quota middleware
1013    pub fn new(max_messages: usize) -> Self {
1014        Self {
1015            max_messages,
1016            max_size_bytes: usize::MAX,
1017            time_window_secs: 3600, // Default 1 hour
1018            usage: std::sync::Arc::new(std::sync::Mutex::new(HashMap::new())),
1019        }
1020    }
1021
1022    /// Set maximum bytes per consumer
1023    pub fn with_max_size_bytes(mut self, max_bytes: usize) -> Self {
1024        self.max_size_bytes = max_bytes;
1025        self
1026    }
1027
1028    /// Set time window for quota reset
1029    pub fn with_time_window_secs(mut self, seconds: u64) -> Self {
1030        self.time_window_secs = seconds;
1031        self
1032    }
1033
1034    /// Get current usage for a consumer
1035    pub fn get_usage(&self, consumer_id: &str) -> (usize, usize) {
1036        let usage = self.usage.lock().unwrap();
1037        usage
1038            .get(consumer_id)
1039            .map(|(msgs, bytes, _)| (*msgs, *bytes))
1040            .unwrap_or((0, 0))
1041    }
1042
1043    /// Reset quota for a consumer
1044    pub fn reset_quota(&self, consumer_id: &str) {
1045        let mut usage = self.usage.lock().unwrap();
1046        usage.remove(consumer_id);
1047    }
1048
1049    fn check_and_update_quota(&self, consumer_id: &str, message_size: usize) -> Result<()> {
1050        let mut usage = self.usage.lock().unwrap();
1051        let now = std::time::SystemTime::now()
1052            .duration_since(std::time::UNIX_EPOCH)
1053            .unwrap()
1054            .as_secs();
1055
1056        let (msg_count, byte_count, last_reset) =
1057            usage.entry(consumer_id.to_string()).or_insert((0, 0, now));
1058
1059        // Reset if time window elapsed
1060        if now - *last_reset >= self.time_window_secs {
1061            *msg_count = 0;
1062            *byte_count = 0;
1063            *last_reset = now;
1064        }
1065
1066        // Check quota
1067        if *msg_count >= self.max_messages {
1068            return Err(BrokerError::Configuration(format!(
1069                "Message quota exceeded for consumer {}: {}/{}",
1070                consumer_id, msg_count, self.max_messages
1071            )));
1072        }
1073
1074        if *byte_count + message_size > self.max_size_bytes {
1075            return Err(BrokerError::Configuration(format!(
1076                "Size quota exceeded for consumer {}: {}/{}",
1077                consumer_id, byte_count, self.max_size_bytes
1078            )));
1079        }
1080
1081        // Update usage
1082        *msg_count += 1;
1083        *byte_count += message_size;
1084
1085        Ok(())
1086    }
1087}
1088
1089#[async_trait]
1090impl MessageMiddleware for ResourceQuotaMiddleware {
1091    async fn before_publish(&self, message: &mut Message) -> Result<()> {
1092        // Inject quota limits into message headers for transparency
1093        message.headers.extra.insert(
1094            "x-quota-max-messages".to_string(),
1095            serde_json::json!(self.max_messages),
1096        );
1097        if self.max_size_bytes != usize::MAX {
1098            message.headers.extra.insert(
1099                "x-quota-max-bytes".to_string(),
1100                serde_json::json!(self.max_size_bytes),
1101            );
1102        }
1103        Ok(())
1104    }
1105
1106    async fn after_consume(&self, message: &mut Message) -> Result<()> {
1107        // Extract consumer ID from message headers
1108        let consumer_id = message
1109            .headers
1110            .extra
1111            .get("x-consumer-id")
1112            .and_then(|v| v.as_str())
1113            .unwrap_or("default")
1114            .to_string();
1115
1116        // Check and update quota
1117        let message_size = message.body.len();
1118        self.check_and_update_quota(&consumer_id, message_size)?;
1119
1120        // Inject current usage into message
1121        let (msg_count, byte_count) = self.get_usage(&consumer_id);
1122        message.headers.extra.insert(
1123            "x-quota-used-messages".to_string(),
1124            serde_json::json!(msg_count),
1125        );
1126        message.headers.extra.insert(
1127            "x-quota-used-bytes".to_string(),
1128            serde_json::json!(byte_count),
1129        );
1130
1131        Ok(())
1132    }
1133
1134    fn name(&self) -> &str {
1135        "resource_quota"
1136    }
1137}