// liminal/pressure/monitor.rs

1use std::{collections::BTreeMap, fmt, sync::Arc};
2
3type ScoringFn = dyn Fn(&[ConsumerPressureMetrics]) -> f64 + Send + Sync;
4
/// Pressure counters the bus keeps for a single consumer.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct ConsumerPressureMetrics {
    /// How many messages the consumer has in flight right now.
    pub current_in_flight: usize,
    /// Concurrency limit the consumer declared for in-flight messages.
    pub max_in_flight: usize,
    /// How many messages are buffered for the consumer right now.
    pub buffer_depth: usize,
    /// Accept decisions recorded for the consumer so far.
    pub accept_count: usize,
    /// Defer decisions recorded for the consumer so far.
    pub defer_count: usize,
    /// Reject decisions recorded for the consumer so far.
    pub reject_count: usize,
}
21
22impl ConsumerPressureMetrics {
23    /// Creates a metric snapshot with zeroed accept, defer, and reject counts.
24    #[must_use]
25    pub const fn new(current_in_flight: usize, max_in_flight: usize, buffer_depth: usize) -> Self {
26        Self {
27            current_in_flight,
28            max_in_flight,
29            buffer_depth,
30            accept_count: 0,
31            defer_count: 0,
32            reject_count: 0,
33        }
34    }
35
36    /// Returns the consumer's in-flight utilization clamped to `[0.0, 1.0]`.
37    #[must_use]
38    pub fn utilization(&self) -> f64 {
39        if self.max_in_flight == 0 {
40            0.0
41        } else {
42            clamp_pressure(usize_to_f64(self.current_in_flight) / usize_to_f64(self.max_in_flight))
43        }
44    }
45
46    /// Records one accepted delivery decision for this consumer.
47    pub const fn record_accept(&mut self) {
48        self.accept_count = self.accept_count.saturating_add(1);
49    }
50
51    /// Records one deferred delivery decision for this consumer.
52    pub const fn record_defer(&mut self) {
53        self.defer_count = self.defer_count.saturating_add(1);
54    }
55
56    /// Records one rejected delivery decision for this consumer.
57    pub const fn record_reject(&mut self) {
58        self.reject_count = self.reject_count.saturating_add(1);
59    }
60}
61
62/// Observable pressure snapshot for one tracked consumer.
63#[derive(Clone, Debug, PartialEq)]
64pub struct ConsumerPressureSnapshot {
65    /// Consumer identifier supplied by the routing or dispatch subsystem.
66    pub consumer_id: String,
67    /// Latest metrics recorded for the consumer.
68    pub metrics: ConsumerPressureMetrics,
69    /// Current in-flight utilization for the consumer.
70    pub utilization: f64,
71}
72
73/// Observable pressure snapshot for a channel and all tracked consumers.
74#[derive(Clone, Debug, PartialEq)]
75pub struct ChannelPressureSnapshot {
76    /// Channel identifier whose pressure was measured.
77    pub channel_id: String,
78    /// Configurable aggregate pressure score clamped to `[0.0, 1.0]`.
79    pub pressure_score: f64,
80    /// Per-consumer metric snapshots contributing to the pressure score.
81    pub consumers: Vec<ConsumerPressureSnapshot>,
82}
83
84impl ChannelPressureSnapshot {
85    /// Returns the number of consumers currently tracked for the channel.
86    #[must_use]
87    pub fn consumer_count(&self) -> usize {
88        self.consumers.len()
89    }
90}
91
92/// Tracks pressure metrics and derives channel pressure scores.
93pub struct PressureMonitor {
94    channels: BTreeMap<String, BTreeMap<String, ConsumerPressureMetrics>>,
95    scoring: Arc<ScoringFn>,
96}
97
98impl fmt::Debug for PressureMonitor {
99    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
100        formatter
101            .debug_struct("PressureMonitor")
102            .field("channels", &self.channels)
103            .finish_non_exhaustive()
104    }
105}
106
107impl Default for PressureMonitor {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl PressureMonitor {
114    /// Creates an empty pressure monitor with the default average-utilization scorer.
115    #[must_use]
116    pub fn new() -> Self {
117        Self::with_scoring(default_channel_score)
118    }
119
120    /// Creates an empty pressure monitor using a caller-supplied scoring function.
121    #[must_use]
122    pub fn with_scoring<F>(scoring: F) -> Self
123    where
124        F: Fn(&[ConsumerPressureMetrics]) -> f64 + Send + Sync + 'static,
125    {
126        Self {
127            channels: BTreeMap::new(),
128            scoring: Arc::new(scoring),
129        }
130    }
131
132    /// Returns true when no consumers are tracked on any channel.
133    #[must_use]
134    pub fn is_empty(&self) -> bool {
135        self.channels.values().all(BTreeMap::is_empty)
136    }
137
138    /// Returns the number of consumers tracked across all channels.
139    #[must_use]
140    pub fn total_consumer_count(&self) -> usize {
141        self.channels.values().map(BTreeMap::len).sum()
142    }
143
144    /// Records the latest metrics for a consumer and returns the updated channel snapshot.
145    pub fn record_consumer_metrics(
146        &mut self,
147        channel_id: impl Into<String>,
148        consumer_id: impl Into<String>,
149        metrics: ConsumerPressureMetrics,
150    ) -> ChannelPressureSnapshot {
151        let channel_id = channel_id.into();
152        let consumer_id = consumer_id.into();
153        self.channels
154            .entry(channel_id.clone())
155            .or_default()
156            .insert(consumer_id, metrics);
157        self.channel_snapshot(&channel_id)
158    }
159
160    /// Records one accept decision for a consumer and returns the updated channel snapshot.
161    pub fn record_accept(
162        &mut self,
163        channel_id: impl Into<String>,
164        consumer_id: impl Into<String>,
165    ) -> ChannelPressureSnapshot {
166        self.record_signal(
167            channel_id,
168            consumer_id,
169            ConsumerPressureMetrics::record_accept,
170        )
171    }
172
173    /// Records one defer decision for a consumer and returns the updated channel snapshot.
174    pub fn record_defer(
175        &mut self,
176        channel_id: impl Into<String>,
177        consumer_id: impl Into<String>,
178    ) -> ChannelPressureSnapshot {
179        self.record_signal(
180            channel_id,
181            consumer_id,
182            ConsumerPressureMetrics::record_defer,
183        )
184    }
185
186    /// Records one reject decision for a consumer and returns the updated channel snapshot.
187    pub fn record_reject(
188        &mut self,
189        channel_id: impl Into<String>,
190        consumer_id: impl Into<String>,
191    ) -> ChannelPressureSnapshot {
192        self.record_signal(
193            channel_id,
194            consumer_id,
195            ConsumerPressureMetrics::record_reject,
196        )
197    }
198
199    /// Returns the latest metrics recorded for one consumer.
200    #[must_use]
201    pub fn consumer_metrics(
202        &self,
203        channel_id: &str,
204        consumer_id: &str,
205    ) -> Option<&ConsumerPressureMetrics> {
206        self.channels.get(channel_id)?.get(consumer_id)
207    }
208
209    /// Returns the latest utilization for one consumer.
210    #[must_use]
211    pub fn consumer_utilization(&self, channel_id: &str, consumer_id: &str) -> Option<f64> {
212        self.consumer_metrics(channel_id, consumer_id)
213            .map(ConsumerPressureMetrics::utilization)
214    }
215
216    /// Returns the current aggregate pressure score for a channel.
217    #[must_use]
218    pub fn channel_pressure(&self, channel_id: &str) -> f64 {
219        self.channel_snapshot(channel_id).pressure_score
220    }
221
222    /// Returns the current number of tracked consumers for a channel.
223    #[must_use]
224    pub fn channel_consumer_count(&self, channel_id: &str) -> usize {
225        self.channels.get(channel_id).map_or(0, BTreeMap::len)
226    }
227
228    /// Returns an observable snapshot for one channel.
229    #[must_use]
230    pub fn channel_snapshot(&self, channel_id: &str) -> ChannelPressureSnapshot {
231        let consumers = self.consumer_snapshots(channel_id);
232        let metrics = consumers
233            .iter()
234            .map(|consumer| consumer.metrics.clone())
235            .collect::<Vec<_>>();
236        let pressure_score = clamp_pressure((self.scoring)(&metrics));
237        ChannelPressureSnapshot {
238            channel_id: channel_id.to_owned(),
239            pressure_score,
240            consumers,
241        }
242    }
243
244    fn record_signal(
245        &mut self,
246        channel_id: impl Into<String>,
247        consumer_id: impl Into<String>,
248        record: fn(&mut ConsumerPressureMetrics),
249    ) -> ChannelPressureSnapshot {
250        let channel_id = channel_id.into();
251        let consumer_id = consumer_id.into();
252        // Only record signals for an already-tracked consumer. A consumer must
253        // first be registered via `record_consumer_metrics` (which supplies the
254        // capacity baseline); recording a signal for an unknown consumer here
255        // would create a zero-capacity "ghost" (utilization 0.0) that silently
256        // dilutes the channel's aggregate pressure score and inflates the
257        // consumer count, disabling backpressure. Drop the unmatched signal.
258        if let Some(metrics) = self
259            .channels
260            .get_mut(&channel_id)
261            .and_then(|consumers| consumers.get_mut(consumer_id.as_str()))
262        {
263            record(metrics);
264        }
265        self.channel_snapshot(&channel_id)
266    }
267
268    fn consumer_snapshots(&self, channel_id: &str) -> Vec<ConsumerPressureSnapshot> {
269        self.channels
270            .get(channel_id)
271            .map_or_else(Vec::new, |consumers| {
272                consumers
273                    .iter()
274                    .map(|(consumer_id, metrics)| ConsumerPressureSnapshot {
275                        consumer_id: consumer_id.clone(),
276                        metrics: metrics.clone(),
277                        utilization: metrics.utilization(),
278                    })
279                    .collect()
280            })
281    }
282}
283
284fn default_channel_score(metrics: &[ConsumerPressureMetrics]) -> f64 {
285    if metrics.is_empty() {
286        0.0
287    } else {
288        let total_utilization = metrics
289            .iter()
290            .map(ConsumerPressureMetrics::utilization)
291            .sum::<f64>();
292        total_utilization / usize_to_f64(metrics.len())
293    }
294}
295
/// Normalizes a raw score into `[0.0, 1.0]`; `NaN` is mapped to `0.0`.
const fn clamp_pressure(score: f64) -> f64 {
    // `clamp` would pass NaN through, so it is handled before clamping.
    if score.is_nan() {
        return 0.0;
    }
    score.clamp(0.0, 1.0)
}
303
/// Converts a `usize` to the nearest `f64` without `as` casts.
///
/// The value is split into its high and low 32-bit halves, each of which
/// converts to `f64` exactly; recombining them rounds once, so the result is
/// exact up to 2^53 and correctly rounded above that. Values larger than
/// `u32::MAX` are no longer collapsed onto `u32::MAX`, which previously made
/// ratios of two large counts come out as `1.0`.
fn usize_to_f64(value: usize) -> f64 {
    const TWO_POW_32: f64 = 4_294_967_296.0;

    // `usize` is at most 64 bits wide on supported targets; saturate rather
    // than panic should that ever stop being true.
    let wide = u64::try_from(value).unwrap_or(u64::MAX);
    // Both halves are below 2^32 by construction, so the fallbacks are never used.
    let high = u32::try_from(wide >> 32).unwrap_or(u32::MAX);
    let low = u32::try_from(wide & u64::from(u32::MAX)).unwrap_or(u32::MAX);

    // `high * 2^32` is exact; the fused multiply-add rounds the sum a single time.
    f64::from(high).mul_add(TWO_POW_32, f64::from(low))
}
307
#[cfg(test)]
mod tests {
    use super::{ConsumerPressureMetrics, PressureMonitor};

    /// True when the two floats differ by less than `f64::EPSILON`.
    fn close_to(actual: f64, expected: f64) -> bool {
        let difference = (actual - expected).abs();
        difference < f64::EPSILON
    }

    #[test]
    fn pressure_monitor_starts_without_tracked_consumers() {
        let fresh = PressureMonitor::new();

        assert!(fresh.is_empty());
        assert_eq!(fresh.total_consumer_count(), 0);
        assert_eq!(fresh.channel_consumer_count("orders"), 0);
    }

    #[test]
    fn consumer_utilization_uses_current_and_max_in_flight() {
        let mut monitor = PressureMonitor::new();
        let eight_of_ten = ConsumerPressureMetrics::new(8, 10, 0);

        monitor.record_consumer_metrics("orders", "consumer-a", eight_of_ten);

        let utilization = monitor.consumer_utilization("orders", "consumer-a");
        assert!(utilization.is_some_and(|score| close_to(score, 0.8)));
    }

    #[test]
    fn channel_pressure_aggregates_across_consumers() {
        let mut monitor = PressureMonitor::new();
        let busy = ConsumerPressureMetrics::new(8, 10, 2);
        let idle = ConsumerPressureMetrics::new(2, 10, 1);

        monitor.record_consumer_metrics("orders", "consumer-a", busy);
        let snapshot = monitor.record_consumer_metrics("orders", "consumer-b", idle);

        // Mean of 0.8 and 0.2.
        assert_eq!(snapshot.consumer_count(), 2);
        assert!(close_to(snapshot.pressure_score, 0.5));
        assert!(close_to(monitor.channel_pressure("orders"), 0.5));
    }

    #[test]
    fn monitor_tracks_accept_defer_and_reject_counts_per_consumer() {
        let mut monitor = PressureMonitor::new();
        // Per ADR-004 a consumer declares capacity before its signals are
        // tracked; register it first, then record the wire signals.
        let declared = ConsumerPressureMetrics::new(0, 10, 0);
        monitor.record_consumer_metrics("orders", "consumer-a", declared);

        monitor.record_accept("orders", "consumer-a");
        monitor.record_defer("orders", "consumer-a");
        monitor.record_reject("orders", "consumer-a");

        let counts = monitor
            .consumer_metrics("orders", "consumer-a")
            .map(|metrics| {
                (
                    metrics.accept_count,
                    metrics.defer_count,
                    metrics.reject_count,
                )
            });
        assert_eq!(counts, Some((1, 1, 1)));
    }

    #[test]
    fn signal_for_unregistered_consumer_does_not_create_ghost_or_dilute_pressure() {
        let mut monitor = PressureMonitor::new();
        // A saturated, declared consumer puts the channel pressure at 1.0.
        let saturated = ConsumerPressureMetrics::new(10, 10, 0);
        monitor.record_consumer_metrics("orders", "consumer-a", saturated);
        assert!(close_to(monitor.channel_pressure("orders"), 1.0));

        // A stray signal for a consumer that never declared capacity (e.g. a
        // mismatched id) must NOT create a zero-capacity ghost: that would
        // dilute the aggregate pressure score (disabling backpressure on a
        // saturated channel) and inflate the consumer count.
        monitor.record_accept("orders", "consumer-typo");

        assert!(close_to(monitor.channel_pressure("orders"), 1.0));
        assert_eq!(monitor.channel_consumer_count("orders"), 1);
        let ghost = monitor.consumer_metrics("orders", "consumer-typo");
        assert!(ghost.is_none());
    }

    #[test]
    fn pressure_scores_are_clamped_to_unit_range() {
        let mut high = PressureMonitor::with_scoring(|_| 3.0);
        let mut low = PressureMonitor::with_scoring(|_| -2.0);
        let mut not_a_number = PressureMonitor::with_scoring(|_| f64::NAN);

        // Every monitor tracks the same single consumer; only the scorer differs.
        let full = || ConsumerPressureMetrics::new(1, 1, 0);
        high.record_consumer_metrics("orders", "consumer-a", full());
        low.record_consumer_metrics("orders", "consumer-a", full());
        not_a_number.record_consumer_metrics("orders", "consumer-a", full());

        assert!(close_to(high.channel_pressure("orders"), 1.0));
        assert!(close_to(low.channel_pressure("orders"), 0.0));
        assert!(close_to(not_a_number.channel_pressure("orders"), 0.0));
    }

    #[test]
    fn scoring_function_is_configurable() {
        let mut monitor = PressureMonitor::with_scoring(|metrics| {
            let any_buffered = metrics.iter().any(|metric| metric.buffer_depth > 0);
            if any_buffered {
                0.75
            } else {
                0.25
            }
        });

        let buffered = ConsumerPressureMetrics::new(1, 10, 3);
        monitor.record_consumer_metrics("orders", "consumer-a", buffered);

        assert!(close_to(monitor.channel_pressure("orders"), 0.75));
    }
}