1use crate::{BrokerError, MessageMiddleware, Result};
4use async_trait::async_trait;
5use celers_protocol::Message;
6use std::collections::HashMap;
7
8#[derive(Debug, Clone)]
22pub struct BatchAckHintMiddleware {
23 batch_size: usize,
24 hint_header: String,
25}
26
27impl BatchAckHintMiddleware {
28 pub fn new(batch_size: usize) -> Self {
30 Self {
31 batch_size: batch_size.max(1),
32 hint_header: "x-batch-ack-hint".to_string(),
33 }
34 }
35
36 pub fn with_hint_header(mut self, header: impl Into<String>) -> Self {
38 self.hint_header = header.into();
39 self
40 }
41
42 pub fn batch_size(&self) -> usize {
44 self.batch_size
45 }
46}
47
48impl Default for BatchAckHintMiddleware {
49 fn default() -> Self {
50 Self::new(10)
51 }
52}
53
54#[async_trait]
55impl MessageMiddleware for BatchAckHintMiddleware {
56 async fn before_publish(&self, message: &mut Message) -> Result<()> {
57 message
59 .headers
60 .extra
61 .insert(self.hint_header.clone(), serde_json::json!(self.batch_size));
62
63 message.headers.extra.insert(
65 "x-batch-ack-recommended".to_string(),
66 serde_json::json!(true),
67 );
68
69 Ok(())
70 }
71
72 async fn after_consume(&self, _message: &mut Message) -> Result<()> {
73 Ok(())
75 }
76
77 fn name(&self) -> &str {
78 "batch_ack_hint"
79 }
80}
81
82#[derive(Debug, Clone)]
96pub struct LoadSheddingMiddleware {
97 load_threshold: f64, priority_cutoff: u8, current_load: f64, }
101
102impl LoadSheddingMiddleware {
103 pub fn new(load_threshold: f64) -> Self {
105 Self {
106 load_threshold: load_threshold.clamp(0.0, 1.0),
107 priority_cutoff: 3, current_load: 0.0,
109 }
110 }
111
112 pub fn with_priority_cutoff(mut self, cutoff: u8) -> Self {
114 self.priority_cutoff = cutoff.min(10);
115 self
116 }
117
118 pub fn update_load(&mut self, load: f64) {
120 self.current_load = load.clamp(0.0, 1.0);
121 }
122
123 pub fn threshold(&self) -> f64 {
125 self.load_threshold
126 }
127
128 fn should_shed(&self, priority: u8) -> bool {
130 self.current_load > self.load_threshold && priority < self.priority_cutoff
131 }
132}
133
134impl Default for LoadSheddingMiddleware {
135 fn default() -> Self {
136 Self::new(0.8)
137 }
138}
139
140#[async_trait]
141impl MessageMiddleware for LoadSheddingMiddleware {
142 async fn before_publish(&self, message: &mut Message) -> Result<()> {
143 let priority = message
144 .headers
145 .extra
146 .get("priority")
147 .and_then(|v| v.as_u64())
148 .map(|v| v as u8)
149 .unwrap_or(5);
150
151 if self.should_shed(priority) {
152 message
154 .headers
155 .extra
156 .insert("x-load-shed".to_string(), serde_json::json!(true));
157 message.headers.extra.insert(
158 "x-current-load".to_string(),
159 serde_json::json!(self.current_load),
160 );
161
162 return Err(BrokerError::OperationFailed(format!(
163 "Load shedding: current load {:.2} exceeds threshold {:.2}",
164 self.current_load, self.load_threshold
165 )));
166 }
167
168 Ok(())
169 }
170
171 async fn after_consume(&self, _message: &mut Message) -> Result<()> {
172 Ok(())
173 }
174
175 fn name(&self) -> &str {
176 "load_shedding"
177 }
178}
179
180#[derive(Debug, Clone)]
194pub struct MessagePriorityEscalationMiddleware {
195 age_threshold_secs: u64, escalation_step: u8, max_priority: u8, escalate_on_retry: bool, }
200
201impl MessagePriorityEscalationMiddleware {
202 pub fn new(age_threshold_secs: u64) -> Self {
204 Self {
205 age_threshold_secs,
206 escalation_step: 1,
207 max_priority: 10,
208 escalate_on_retry: true,
209 }
210 }
211
212 pub fn with_escalation_step(mut self, step: u8) -> Self {
214 self.escalation_step = step.max(1);
215 self
216 }
217
218 pub fn with_max_priority(mut self, max: u8) -> Self {
220 self.max_priority = max.min(10);
221 self
222 }
223
224 pub fn with_escalate_on_retry(mut self, enable: bool) -> Self {
226 self.escalate_on_retry = enable;
227 self
228 }
229
230 pub fn age_threshold_secs(&self) -> u64 {
232 self.age_threshold_secs
233 }
234
235 fn calculate_priority(&self, base_priority: u8, age_secs: u64, retries: u32) -> u8 {
237 let mut priority = base_priority;
238
239 if age_secs >= self.age_threshold_secs {
241 let age_multiplier = (age_secs / self.age_threshold_secs) as u8;
242 priority = priority.saturating_add(age_multiplier * self.escalation_step);
243 }
244
245 if self.escalate_on_retry && retries > 0 {
247 let retry_boost = (retries as u8).min(3); priority = priority.saturating_add(retry_boost);
249 }
250
251 priority.min(self.max_priority)
252 }
253}
254
255impl Default for MessagePriorityEscalationMiddleware {
256 fn default() -> Self {
257 Self::new(300) }
259}
260
261#[async_trait]
262impl MessageMiddleware for MessagePriorityEscalationMiddleware {
263 async fn before_publish(&self, message: &mut Message) -> Result<()> {
264 let base_priority = message
265 .headers
266 .extra
267 .get("priority")
268 .and_then(|v| v.as_u64())
269 .map(|v| v as u8)
270 .unwrap_or(5);
271 let age_secs = 0; let retries = message.headers.retries.unwrap_or(0);
273
274 let new_priority = self.calculate_priority(base_priority, age_secs, retries);
275
276 if new_priority != base_priority {
277 message
278 .headers
279 .extra
280 .insert("priority".to_string(), serde_json::json!(new_priority));
281 message
282 .headers
283 .extra
284 .insert("x-priority-escalated".to_string(), serde_json::json!(true));
285 message.headers.extra.insert(
286 "x-original-priority".to_string(),
287 serde_json::json!(base_priority),
288 );
289 }
290
291 Ok(())
292 }
293
294 async fn after_consume(&self, _message: &mut Message) -> Result<()> {
295 Ok(())
296 }
297
298 fn name(&self) -> &str {
299 "priority_escalation"
300 }
301}
302
303#[derive(Debug, Clone)]
317pub struct ObservabilityMiddleware {
318 service_name: String,
319 enable_metrics: bool,
320 enable_logging: bool,
321 log_level: String,
322}
323
324impl ObservabilityMiddleware {
325 pub fn new(service_name: impl Into<String>) -> Self {
327 Self {
328 service_name: service_name.into(),
329 enable_metrics: true,
330 enable_logging: true,
331 log_level: "info".to_string(),
332 }
333 }
334
335 pub fn without_metrics(mut self) -> Self {
337 self.enable_metrics = false;
338 self
339 }
340
341 pub fn without_logging(mut self) -> Self {
343 self.enable_logging = false;
344 self
345 }
346
347 pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
349 self.log_level = level.into();
350 self
351 }
352
353 pub fn service_name(&self) -> &str {
355 &self.service_name
356 }
357}
358
359impl Default for ObservabilityMiddleware {
360 fn default() -> Self {
361 Self::new("unknown-service")
362 }
363}
364
365#[async_trait]
366impl MessageMiddleware for ObservabilityMiddleware {
367 async fn before_publish(&self, message: &mut Message) -> Result<()> {
368 if self.enable_metrics {
369 message.headers.extra.insert(
370 "x-observability-enabled".to_string(),
371 serde_json::json!(true),
372 );
373 }
374
375 if self.enable_logging {
376 message
377 .headers
378 .extra
379 .insert("x-log-level".to_string(), serde_json::json!(self.log_level));
380 }
381
382 message.headers.extra.insert(
383 "x-service-name".to_string(),
384 serde_json::json!(self.service_name),
385 );
386
387 Ok(())
388 }
389
390 async fn after_consume(&self, _message: &mut Message) -> Result<()> {
391 Ok(())
393 }
394
395 fn name(&self) -> &str {
396 "observability"
397 }
398}
399
400pub struct HealthCheckMiddleware {
415 last_check: std::sync::Arc<std::sync::Mutex<u64>>,
417 check_interval_secs: u64,
419 is_healthy: std::sync::Arc<std::sync::Mutex<bool>>,
421}
422
423impl HealthCheckMiddleware {
424 pub fn new() -> Self {
426 Self {
427 last_check: std::sync::Arc::new(std::sync::Mutex::new(0)),
428 check_interval_secs: 60, is_healthy: std::sync::Arc::new(std::sync::Mutex::new(true)),
430 }
431 }
432
433 pub fn with_check_interval_secs(mut self, interval: u64) -> Self {
435 self.check_interval_secs = interval;
436 self
437 }
438
439 pub fn is_healthy(&self) -> bool {
441 *self.is_healthy.lock().unwrap()
442 }
443
444 pub fn mark_unhealthy(&self) {
446 *self.is_healthy.lock().unwrap() = false;
447 }
448
449 pub fn mark_healthy(&self) {
451 *self.is_healthy.lock().unwrap() = true;
452 }
453
454 fn should_check(&self) -> bool {
455 let now = std::time::SystemTime::now()
456 .duration_since(std::time::UNIX_EPOCH)
457 .unwrap()
458 .as_secs();
459 let last = *self.last_check.lock().unwrap();
460 now - last >= self.check_interval_secs
461 }
462
463 fn update_check_time(&self) {
464 let now = std::time::SystemTime::now()
465 .duration_since(std::time::UNIX_EPOCH)
466 .unwrap()
467 .as_secs();
468 *self.last_check.lock().unwrap() = now;
469 }
470}
471
472impl Default for HealthCheckMiddleware {
473 fn default() -> Self {
474 Self::new()
475 }
476}
477
478#[async_trait]
479impl MessageMiddleware for HealthCheckMiddleware {
480 async fn before_publish(&self, message: &mut Message) -> Result<()> {
481 if self.should_check() {
482 self.update_check_time();
483 }
486
487 let health_status = if self.is_healthy() {
488 "healthy"
489 } else {
490 "unhealthy"
491 };
492 message.headers.extra.insert(
493 "x-health-status".to_string(),
494 serde_json::json!(health_status),
495 );
496 Ok(())
497 }
498
499 async fn after_consume(&self, _message: &mut Message) -> Result<()> {
500 Ok(())
502 }
503
504 fn name(&self) -> &str {
505 "health_check"
506 }
507}
508
509pub struct MessageTaggingMiddleware {
528 environment: String,
530 tags: HashMap<String, String>,
532}
533
534impl MessageTaggingMiddleware {
535 pub fn new(environment: impl Into<String>) -> Self {
537 Self {
538 environment: environment.into(),
539 tags: HashMap::new(),
540 }
541 }
542
543 pub fn with_tags(mut self, tags: HashMap<String, String>) -> Self {
545 self.tags = tags;
546 self
547 }
548
549 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
551 self.tags.insert(key.into(), value.into());
552 self
553 }
554}
555
556#[async_trait]
557impl MessageMiddleware for MessageTaggingMiddleware {
558 async fn before_publish(&self, message: &mut Message) -> Result<()> {
559 message.headers.extra.insert(
561 "x-environment".to_string(),
562 serde_json::json!(self.environment.clone()),
563 );
564
565 for (key, value) in &self.tags {
567 message
568 .headers
569 .extra
570 .insert(format!("x-tag-{}", key), serde_json::json!(value.clone()));
571 }
572
573 let category = if message.task_name().contains("email") {
575 "communication"
576 } else if message.task_name().contains("report") {
577 "analytics"
578 } else if message.task_name().contains("process") {
579 "computation"
580 } else {
581 "general"
582 };
583 message
584 .headers
585 .extra
586 .insert("x-category".to_string(), serde_json::json!(category));
587
588 Ok(())
589 }
590
591 async fn after_consume(&self, _message: &mut Message) -> Result<()> {
592 Ok(())
594 }
595
596 fn name(&self) -> &str {
597 "message_tagging"
598 }
599}
600
601pub struct CostAttributionMiddleware {
617 message_cost: f64,
619 compute_cost_per_sec: f64,
621 storage_cost_per_mb: f64,
623}
624
625impl CostAttributionMiddleware {
626 pub fn new(message_cost: f64) -> Self {
628 Self {
629 message_cost,
630 compute_cost_per_sec: 0.0,
631 storage_cost_per_mb: 0.0,
632 }
633 }
634
635 pub fn with_compute_cost_per_sec(mut self, cost: f64) -> Self {
637 self.compute_cost_per_sec = cost;
638 self
639 }
640
641 pub fn with_storage_cost_per_mb(mut self, cost: f64) -> Self {
643 self.storage_cost_per_mb = cost;
644 self
645 }
646
647 fn calculate_cost(&self, message: &Message) -> f64 {
648 let mut cost = self.message_cost;
649
650 let size_mb = message.body.len() as f64 / (1024.0 * 1024.0);
652 cost += size_mb * self.storage_cost_per_mb;
653
654 cost
655 }
656}
657
658#[async_trait]
659impl MessageMiddleware for CostAttributionMiddleware {
660 async fn before_publish(&self, message: &mut Message) -> Result<()> {
661 let cost = self.calculate_cost(message);
663 message.headers.extra.insert(
664 "x-cost-estimate".to_string(),
665 serde_json::json!(format!("{:.6}", cost)),
666 );
667
668 let tenant = message
670 .headers
671 .extra
672 .get("x-tenant")
673 .and_then(|v| v.as_str())
674 .or_else(|| message.headers.extra.get("tenant").and_then(|v| v.as_str()))
675 .unwrap_or("default")
676 .to_string();
677
678 message
679 .headers
680 .extra
681 .insert("x-cost-tenant".to_string(), serde_json::json!(tenant));
682
683 let timestamp = std::time::SystemTime::now()
685 .duration_since(std::time::UNIX_EPOCH)
686 .unwrap()
687 .as_secs();
688 message.headers.extra.insert(
689 "x-cost-timestamp".to_string(),
690 serde_json::json!(timestamp.to_string()),
691 );
692
693 Ok(())
694 }
695
696 async fn after_consume(&self, message: &mut Message) -> Result<()> {
697 if let Some(cost_timestamp) = message.headers.extra.get("x-cost-timestamp") {
699 if let Some(timestamp_str) = cost_timestamp.as_str() {
700 if let Ok(start_time) = timestamp_str.parse::<u64>() {
701 let now = std::time::SystemTime::now()
702 .duration_since(std::time::UNIX_EPOCH)
703 .unwrap()
704 .as_secs();
705 let duration_secs = (now - start_time) as f64;
706 let compute_cost = duration_secs * self.compute_cost_per_sec;
707
708 if let Some(base_cost) = message.headers.extra.get("x-cost-estimate") {
709 if let Some(base_str) = base_cost.as_str() {
710 if let Ok(base) = base_str.parse::<f64>() {
711 let total_cost = base + compute_cost;
712 message.headers.extra.insert(
713 "x-cost-actual".to_string(),
714 serde_json::json!(format!("{:.6}", total_cost)),
715 );
716 }
717 }
718 }
719 }
720 }
721 }
722 Ok(())
723 }
724
725 fn name(&self) -> &str {
726 "cost_attribution"
727 }
728}
729
730pub struct SLAMonitoringMiddleware {
746 target_ms: u64,
748 percentile: u8,
750 alert_threshold: f64,
752 processing_times: std::sync::Arc<std::sync::Mutex<Vec<u64>>>,
754}
755
756impl SLAMonitoringMiddleware {
757 pub fn new(target_ms: u64) -> Self {
759 Self {
760 target_ms,
761 percentile: 95,
762 alert_threshold: 0.9,
763 processing_times: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
764 }
765 }
766
767 pub fn with_percentile(mut self, percentile: u8) -> Self {
769 self.percentile = percentile.clamp(1, 99);
770 self
771 }
772
773 pub fn with_alert_threshold(mut self, threshold: f64) -> Self {
775 self.alert_threshold = threshold.clamp(0.0, 1.0);
776 self
777 }
778
779 pub fn compliance_rate(&self) -> f64 {
781 let times = self.processing_times.lock().unwrap();
782 if times.is_empty() {
783 return 1.0;
784 }
785
786 let within_sla = times.iter().filter(|&&t| t <= self.target_ms).count();
787 within_sla as f64 / times.len() as f64
788 }
789
790 pub fn should_alert(&self) -> bool {
792 self.compliance_rate() < self.alert_threshold
793 }
794}
795
796#[async_trait]
797impl MessageMiddleware for SLAMonitoringMiddleware {
798 async fn before_publish(&self, message: &mut Message) -> Result<()> {
799 message.headers.extra.insert(
801 "x-sla-target-ms".to_string(),
802 serde_json::json!(self.target_ms),
803 );
804
805 let timestamp = std::time::SystemTime::now()
807 .duration_since(std::time::UNIX_EPOCH)
808 .unwrap()
809 .as_millis() as u64;
810 message
811 .headers
812 .extra
813 .insert("x-sla-start-ms".to_string(), serde_json::json!(timestamp));
814
815 Ok(())
816 }
817
818 async fn after_consume(&self, message: &mut Message) -> Result<()> {
819 if let Some(start_ms) = message.headers.extra.get("x-sla-start-ms") {
821 if let Some(start_str) = start_ms.as_u64() {
822 let now = std::time::SystemTime::now()
823 .duration_since(std::time::UNIX_EPOCH)
824 .unwrap()
825 .as_millis() as u64;
826 let processing_time = now - start_str;
827
828 self.processing_times.lock().unwrap().push(processing_time);
830
831 let within_sla = processing_time <= self.target_ms;
833 message
834 .headers
835 .extra
836 .insert("x-sla-met".to_string(), serde_json::json!(within_sla));
837 message.headers.extra.insert(
838 "x-sla-processing-ms".to_string(),
839 serde_json::json!(processing_time),
840 );
841
842 if self.should_alert() {
844 message
845 .headers
846 .extra
847 .insert("x-sla-alert".to_string(), serde_json::json!(true));
848 }
849 }
850 }
851 Ok(())
852 }
853
854 fn name(&self) -> &str {
855 "sla_monitoring"
856 }
857}
858
859pub struct MessageVersioningMiddleware {
875 current_version: String,
877 min_supported_version: Option<String>,
879 auto_upgrade: bool,
881}
882
883impl MessageVersioningMiddleware {
884 pub fn new(current_version: impl Into<String>) -> Self {
886 Self {
887 current_version: current_version.into(),
888 min_supported_version: None,
889 auto_upgrade: false,
890 }
891 }
892
893 pub fn with_min_supported_version(mut self, version: impl Into<String>) -> Self {
895 self.min_supported_version = Some(version.into());
896 self
897 }
898
899 pub fn with_auto_upgrade(mut self, enabled: bool) -> Self {
901 self.auto_upgrade = enabled;
902 self
903 }
904
905 fn is_version_supported(&self, version: &str) -> bool {
906 if let Some(ref min_version) = self.min_supported_version {
907 version >= min_version.as_str()
909 } else {
910 true
911 }
912 }
913}
914
915#[async_trait]
916impl MessageMiddleware for MessageVersioningMiddleware {
917 async fn before_publish(&self, message: &mut Message) -> Result<()> {
918 message.headers.extra.insert(
920 "x-message-version".to_string(),
921 serde_json::json!(self.current_version.clone()),
922 );
923
924 message.headers.extra.insert(
926 "x-schema-version".to_string(),
927 serde_json::json!(self.current_version.clone()),
928 );
929
930 Ok(())
931 }
932
933 async fn after_consume(&self, message: &mut Message) -> Result<()> {
934 if let Some(msg_version) = message.headers.extra.get("x-message-version") {
936 if let Some(version_str) = msg_version.as_str() {
937 if !self.is_version_supported(version_str) {
939 return Err(BrokerError::Configuration(format!(
940 "Unsupported message version: {}. Minimum supported: {:?}",
941 version_str, self.min_supported_version
942 )));
943 }
944
945 if self.auto_upgrade && version_str != self.current_version {
947 message.headers.extra.insert(
948 "x-upgraded-from".to_string(),
949 serde_json::json!(version_str),
950 );
951 message.headers.extra.insert(
952 "x-message-version".to_string(),
953 serde_json::json!(self.current_version.clone()),
954 );
955 }
956 }
957 } else {
958 message
960 .headers
961 .extra
962 .insert("x-message-version".to_string(), serde_json::json!("legacy"));
963 if self.auto_upgrade {
964 message
965 .headers
966 .extra
967 .insert("x-upgraded-from".to_string(), serde_json::json!("legacy"));
968 message.headers.extra.insert(
969 "x-message-version".to_string(),
970 serde_json::json!(self.current_version.clone()),
971 );
972 }
973 }
974 Ok(())
975 }
976
977 fn name(&self) -> &str {
978 "message_versioning"
979 }
980}
981
982type ResourceUsageMap = std::sync::Arc<std::sync::Mutex<HashMap<String, (usize, usize, u64)>>>;
984
985pub struct ResourceQuotaMiddleware {
1001 max_messages: usize,
1003 max_size_bytes: usize,
1005 time_window_secs: u64,
1007 usage: ResourceUsageMap,
1009}
1010
1011impl ResourceQuotaMiddleware {
1012 pub fn new(max_messages: usize) -> Self {
1014 Self {
1015 max_messages,
1016 max_size_bytes: usize::MAX,
1017 time_window_secs: 3600, usage: std::sync::Arc::new(std::sync::Mutex::new(HashMap::new())),
1019 }
1020 }
1021
1022 pub fn with_max_size_bytes(mut self, max_bytes: usize) -> Self {
1024 self.max_size_bytes = max_bytes;
1025 self
1026 }
1027
1028 pub fn with_time_window_secs(mut self, seconds: u64) -> Self {
1030 self.time_window_secs = seconds;
1031 self
1032 }
1033
1034 pub fn get_usage(&self, consumer_id: &str) -> (usize, usize) {
1036 let usage = self.usage.lock().unwrap();
1037 usage
1038 .get(consumer_id)
1039 .map(|(msgs, bytes, _)| (*msgs, *bytes))
1040 .unwrap_or((0, 0))
1041 }
1042
1043 pub fn reset_quota(&self, consumer_id: &str) {
1045 let mut usage = self.usage.lock().unwrap();
1046 usage.remove(consumer_id);
1047 }
1048
1049 fn check_and_update_quota(&self, consumer_id: &str, message_size: usize) -> Result<()> {
1050 let mut usage = self.usage.lock().unwrap();
1051 let now = std::time::SystemTime::now()
1052 .duration_since(std::time::UNIX_EPOCH)
1053 .unwrap()
1054 .as_secs();
1055
1056 let (msg_count, byte_count, last_reset) =
1057 usage.entry(consumer_id.to_string()).or_insert((0, 0, now));
1058
1059 if now - *last_reset >= self.time_window_secs {
1061 *msg_count = 0;
1062 *byte_count = 0;
1063 *last_reset = now;
1064 }
1065
1066 if *msg_count >= self.max_messages {
1068 return Err(BrokerError::Configuration(format!(
1069 "Message quota exceeded for consumer {}: {}/{}",
1070 consumer_id, msg_count, self.max_messages
1071 )));
1072 }
1073
1074 if *byte_count + message_size > self.max_size_bytes {
1075 return Err(BrokerError::Configuration(format!(
1076 "Size quota exceeded for consumer {}: {}/{}",
1077 consumer_id, byte_count, self.max_size_bytes
1078 )));
1079 }
1080
1081 *msg_count += 1;
1083 *byte_count += message_size;
1084
1085 Ok(())
1086 }
1087}
1088
1089#[async_trait]
1090impl MessageMiddleware for ResourceQuotaMiddleware {
1091 async fn before_publish(&self, message: &mut Message) -> Result<()> {
1092 message.headers.extra.insert(
1094 "x-quota-max-messages".to_string(),
1095 serde_json::json!(self.max_messages),
1096 );
1097 if self.max_size_bytes != usize::MAX {
1098 message.headers.extra.insert(
1099 "x-quota-max-bytes".to_string(),
1100 serde_json::json!(self.max_size_bytes),
1101 );
1102 }
1103 Ok(())
1104 }
1105
1106 async fn after_consume(&self, message: &mut Message) -> Result<()> {
1107 let consumer_id = message
1109 .headers
1110 .extra
1111 .get("x-consumer-id")
1112 .and_then(|v| v.as_str())
1113 .unwrap_or("default")
1114 .to_string();
1115
1116 let message_size = message.body.len();
1118 self.check_and_update_quota(&consumer_id, message_size)?;
1119
1120 let (msg_count, byte_count) = self.get_usage(&consumer_id);
1122 message.headers.extra.insert(
1123 "x-quota-used-messages".to_string(),
1124 serde_json::json!(msg_count),
1125 );
1126 message.headers.extra.insert(
1127 "x-quota-used-bytes".to_string(),
1128 serde_json::json!(byte_count),
1129 );
1130
1131 Ok(())
1132 }
1133
1134 fn name(&self) -> &str {
1135 "resource_quota"
1136 }
1137}