1use std::{collections::BTreeMap, fmt, sync::Arc};
2
3type ScoringFn = dyn Fn(&[ConsumerPressureMetrics]) -> f64 + Send + Sync;
4
/// Load figures and decision counters recorded for a single consumer.
///
/// All fields are plain counters; `Default` yields a value with every field
/// set to zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConsumerPressureMetrics {
    /// Number of items the consumer is currently processing.
    pub current_in_flight: usize,
    /// In-flight limit used as the denominator of the utilization ratio.
    pub max_in_flight: usize,
    /// Reported buffer depth. Not read by the default scoring function, but
    /// available to custom scoring callbacks.
    pub buffer_depth: usize,
    /// How many accept signals have been recorded.
    pub accept_count: usize,
    /// How many defer signals have been recorded.
    pub defer_count: usize,
    /// How many reject signals have been recorded.
    pub reject_count: usize,
}
21
22impl ConsumerPressureMetrics {
23 #[must_use]
25 pub const fn new(current_in_flight: usize, max_in_flight: usize, buffer_depth: usize) -> Self {
26 Self {
27 current_in_flight,
28 max_in_flight,
29 buffer_depth,
30 accept_count: 0,
31 defer_count: 0,
32 reject_count: 0,
33 }
34 }
35
36 #[must_use]
38 pub fn utilization(&self) -> f64 {
39 if self.max_in_flight == 0 {
40 0.0
41 } else {
42 clamp_pressure(usize_to_f64(self.current_in_flight) / usize_to_f64(self.max_in_flight))
43 }
44 }
45
46 pub const fn record_accept(&mut self) {
48 self.accept_count = self.accept_count.saturating_add(1);
49 }
50
51 pub const fn record_defer(&mut self) {
53 self.defer_count = self.defer_count.saturating_add(1);
54 }
55
56 pub const fn record_reject(&mut self) {
58 self.reject_count = self.reject_count.saturating_add(1);
59 }
60}
61
62#[derive(Clone, Debug, PartialEq)]
64pub struct ConsumerPressureSnapshot {
65 pub consumer_id: String,
67 pub metrics: ConsumerPressureMetrics,
69 pub utilization: f64,
71}
72
73#[derive(Clone, Debug, PartialEq)]
75pub struct ChannelPressureSnapshot {
76 pub channel_id: String,
78 pub pressure_score: f64,
80 pub consumers: Vec<ConsumerPressureSnapshot>,
82}
83
84impl ChannelPressureSnapshot {
85 #[must_use]
87 pub fn consumer_count(&self) -> usize {
88 self.consumers.len()
89 }
90}
91
92pub struct PressureMonitor {
94 channels: BTreeMap<String, BTreeMap<String, ConsumerPressureMetrics>>,
95 scoring: Arc<ScoringFn>,
96}
97
98impl fmt::Debug for PressureMonitor {
99 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
100 formatter
101 .debug_struct("PressureMonitor")
102 .field("channels", &self.channels)
103 .finish_non_exhaustive()
104 }
105}
106
107impl Default for PressureMonitor {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113impl PressureMonitor {
114 #[must_use]
116 pub fn new() -> Self {
117 Self::with_scoring(default_channel_score)
118 }
119
120 #[must_use]
122 pub fn with_scoring<F>(scoring: F) -> Self
123 where
124 F: Fn(&[ConsumerPressureMetrics]) -> f64 + Send + Sync + 'static,
125 {
126 Self {
127 channels: BTreeMap::new(),
128 scoring: Arc::new(scoring),
129 }
130 }
131
132 #[must_use]
134 pub fn is_empty(&self) -> bool {
135 self.channels.values().all(BTreeMap::is_empty)
136 }
137
138 #[must_use]
140 pub fn total_consumer_count(&self) -> usize {
141 self.channels.values().map(BTreeMap::len).sum()
142 }
143
144 pub fn record_consumer_metrics(
146 &mut self,
147 channel_id: impl Into<String>,
148 consumer_id: impl Into<String>,
149 metrics: ConsumerPressureMetrics,
150 ) -> ChannelPressureSnapshot {
151 let channel_id = channel_id.into();
152 let consumer_id = consumer_id.into();
153 self.channels
154 .entry(channel_id.clone())
155 .or_default()
156 .insert(consumer_id, metrics);
157 self.channel_snapshot(&channel_id)
158 }
159
160 pub fn record_accept(
162 &mut self,
163 channel_id: impl Into<String>,
164 consumer_id: impl Into<String>,
165 ) -> ChannelPressureSnapshot {
166 self.record_signal(
167 channel_id,
168 consumer_id,
169 ConsumerPressureMetrics::record_accept,
170 )
171 }
172
173 pub fn record_defer(
175 &mut self,
176 channel_id: impl Into<String>,
177 consumer_id: impl Into<String>,
178 ) -> ChannelPressureSnapshot {
179 self.record_signal(
180 channel_id,
181 consumer_id,
182 ConsumerPressureMetrics::record_defer,
183 )
184 }
185
186 pub fn record_reject(
188 &mut self,
189 channel_id: impl Into<String>,
190 consumer_id: impl Into<String>,
191 ) -> ChannelPressureSnapshot {
192 self.record_signal(
193 channel_id,
194 consumer_id,
195 ConsumerPressureMetrics::record_reject,
196 )
197 }
198
199 #[must_use]
201 pub fn consumer_metrics(
202 &self,
203 channel_id: &str,
204 consumer_id: &str,
205 ) -> Option<&ConsumerPressureMetrics> {
206 self.channels.get(channel_id)?.get(consumer_id)
207 }
208
209 #[must_use]
211 pub fn consumer_utilization(&self, channel_id: &str, consumer_id: &str) -> Option<f64> {
212 self.consumer_metrics(channel_id, consumer_id)
213 .map(ConsumerPressureMetrics::utilization)
214 }
215
216 #[must_use]
218 pub fn channel_pressure(&self, channel_id: &str) -> f64 {
219 self.channel_snapshot(channel_id).pressure_score
220 }
221
222 #[must_use]
224 pub fn channel_consumer_count(&self, channel_id: &str) -> usize {
225 self.channels.get(channel_id).map_or(0, BTreeMap::len)
226 }
227
228 #[must_use]
230 pub fn channel_snapshot(&self, channel_id: &str) -> ChannelPressureSnapshot {
231 let consumers = self.consumer_snapshots(channel_id);
232 let metrics = consumers
233 .iter()
234 .map(|consumer| consumer.metrics.clone())
235 .collect::<Vec<_>>();
236 let pressure_score = clamp_pressure((self.scoring)(&metrics));
237 ChannelPressureSnapshot {
238 channel_id: channel_id.to_owned(),
239 pressure_score,
240 consumers,
241 }
242 }
243
244 fn record_signal(
245 &mut self,
246 channel_id: impl Into<String>,
247 consumer_id: impl Into<String>,
248 record: fn(&mut ConsumerPressureMetrics),
249 ) -> ChannelPressureSnapshot {
250 let channel_id = channel_id.into();
251 let consumer_id = consumer_id.into();
252 if let Some(metrics) = self
259 .channels
260 .get_mut(&channel_id)
261 .and_then(|consumers| consumers.get_mut(consumer_id.as_str()))
262 {
263 record(metrics);
264 }
265 self.channel_snapshot(&channel_id)
266 }
267
268 fn consumer_snapshots(&self, channel_id: &str) -> Vec<ConsumerPressureSnapshot> {
269 self.channels
270 .get(channel_id)
271 .map_or_else(Vec::new, |consumers| {
272 consumers
273 .iter()
274 .map(|(consumer_id, metrics)| ConsumerPressureSnapshot {
275 consumer_id: consumer_id.clone(),
276 metrics: metrics.clone(),
277 utilization: metrics.utilization(),
278 })
279 .collect()
280 })
281 }
282}
283
284fn default_channel_score(metrics: &[ConsumerPressureMetrics]) -> f64 {
285 if metrics.is_empty() {
286 0.0
287 } else {
288 let total_utilization = metrics
289 .iter()
290 .map(ConsumerPressureMetrics::utilization)
291 .sum::<f64>();
292 total_utilization / usize_to_f64(metrics.len())
293 }
294}
295
/// Normalizes a raw score into the `[0.0, 1.0]` pressure range.
///
/// NaN maps to `0.0`; every other value, including the infinities, is clamped
/// to the nearest bound.
const fn clamp_pressure(score: f64) -> f64 {
    // `f64::clamp` passes NaN through unchanged, so handle it first.
    if score.is_nan() {
        return 0.0;
    }
    score.clamp(0.0, 1.0)
}
303
/// Converts a count to `f64` without an `as` cast and without saturating.
///
/// The value is split into its upper and lower 32-bit halves. Each half
/// converts to `f64` exactly, and recombining them with one fused
/// multiply-add rounds only once, so the result is the nearest representable
/// `f64` (exact for every value up to 2^53).
///
/// Saturating at `u32::MAX` instead would make any two larger counts compare
/// equal, so a half-loaded consumer with very large counters would report a
/// utilization of `1.0`.
fn usize_to_f64(value: usize) -> f64 {
    /// 2^32, the weight of the upper half.
    const HIGH_WORD_SCALE: f64 = 4_294_967_296.0;

    // `usize` is at most 64 bits wide on supported targets, so none of the
    // fallbacks below are reached; they only exist to keep the conversions
    // panic-free.
    let wide = u64::try_from(value).unwrap_or(u64::MAX);
    let high = u32::try_from(wide >> 32).unwrap_or(u32::MAX);
    let low = u32::try_from(wide & u64::from(u32::MAX)).unwrap_or(u32::MAX);
    f64::from(high).mul_add(HIGH_WORD_SCALE, f64::from(low))
}
307
#[cfg(test)]
mod tests {
    use super::{ConsumerPressureMetrics, PressureMonitor};

    /// Float comparison with an absolute tolerance of `f64::EPSILON`.
    fn close_to(actual: f64, expected: f64) -> bool {
        let difference = (actual - expected).abs();
        difference < f64::EPSILON
    }

    #[test]
    fn pressure_monitor_starts_without_tracked_consumers() {
        let fresh = PressureMonitor::new();

        assert!(fresh.is_empty());
        assert_eq!(fresh.total_consumer_count(), 0);
        assert_eq!(fresh.channel_consumer_count("orders"), 0);
    }

    #[test]
    fn consumer_utilization_uses_current_and_max_in_flight() {
        let mut monitor = PressureMonitor::new();
        let eight_of_ten = ConsumerPressureMetrics::new(8, 10, 0);

        monitor.record_consumer_metrics("orders", "consumer-a", eight_of_ten);

        assert!(
            monitor
                .consumer_utilization("orders", "consumer-a")
                .is_some_and(|score| close_to(score, 0.8))
        );
    }

    #[test]
    fn channel_pressure_aggregates_across_consumers() {
        let mut monitor = PressureMonitor::new();
        let busy = ConsumerPressureMetrics::new(8, 10, 2);
        let idle = ConsumerPressureMetrics::new(2, 10, 1);

        monitor.record_consumer_metrics("orders", "consumer-a", busy);
        let snapshot = monitor.record_consumer_metrics("orders", "consumer-b", idle);

        // Mean of 0.8 and 0.2.
        assert_eq!(snapshot.consumer_count(), 2);
        assert!(close_to(snapshot.pressure_score, 0.5));
        assert!(close_to(monitor.channel_pressure("orders"), 0.5));
    }

    #[test]
    fn monitor_tracks_accept_defer_and_reject_counts_per_consumer() {
        let mut monitor = PressureMonitor::new();
        monitor.record_consumer_metrics(
            "orders",
            "consumer-a",
            ConsumerPressureMetrics::new(0, 10, 0),
        );

        monitor.record_accept("orders", "consumer-a");
        monitor.record_defer("orders", "consumer-a");
        monitor.record_reject("orders", "consumer-a");

        let counters = monitor
            .consumer_metrics("orders", "consumer-a")
            .map(|metrics| {
                (
                    metrics.accept_count,
                    metrics.defer_count,
                    metrics.reject_count,
                )
            });
        assert_eq!(counters, Some((1, 1, 1)));
    }

    #[test]
    fn signal_for_unregistered_consumer_does_not_create_ghost_or_dilute_pressure() {
        let mut monitor = PressureMonitor::new();
        let saturated = ConsumerPressureMetrics::new(10, 10, 0);
        monitor.record_consumer_metrics("orders", "consumer-a", saturated);
        assert!(close_to(monitor.channel_pressure("orders"), 1.0));

        // A signal for an id that was never registered must be a no-op.
        monitor.record_accept("orders", "consumer-typo");

        assert!(close_to(monitor.channel_pressure("orders"), 1.0));
        assert_eq!(monitor.channel_consumer_count("orders"), 1);
        let ghost = monitor.consumer_metrics("orders", "consumer-typo");
        assert!(ghost.is_none());
    }

    #[test]
    fn pressure_scores_are_clamped_to_unit_range() {
        // (raw score returned by the callback, score the monitor must report)
        let cases = [(3.0, 1.0), (-2.0, 0.0), (f64::NAN, 0.0)];

        for (raw_score, expected) in cases {
            let mut monitor = PressureMonitor::with_scoring(move |_| raw_score);
            monitor.record_consumer_metrics(
                "orders",
                "consumer-a",
                ConsumerPressureMetrics::new(1, 1, 0),
            );

            assert!(
                close_to(monitor.channel_pressure("orders"), expected),
                "raw score {raw_score} should be reported as {expected}"
            );
        }
    }

    #[test]
    fn scoring_function_is_configurable() {
        let buffer_aware = |metrics: &[ConsumerPressureMetrics]| {
            let any_buffered = metrics.iter().any(|metric| metric.buffer_depth > 0);
            if any_buffered { 0.75 } else { 0.25 }
        };
        let mut monitor = PressureMonitor::with_scoring(buffer_aware);

        monitor.record_consumer_metrics(
            "orders",
            "consumer-a",
            ConsumerPressureMetrics::new(1, 10, 3),
        );

        assert!(close_to(monitor.channel_pressure("orders"), 0.75));
    }
}