Skip to main content

ringkernel_core/
backpressure.rs

1//! Credit-Based Backpressure & Flow Control — FR-003
2//!
3//! Implements credit-based flow control for K2K messaging:
4//! - Producer requests credits from consumer before sending
5//! - Consumer grants credits based on available queue capacity
6//! - When credits exhausted: producer blocks, buffers, or drops (configurable)
7//! - Queue watermark signals: LOW_WATER / HIGH_WATER
8//! - Per-channel flow control
9//!
10//! # Protocol
11//!
12//! ```text
13//! Producer                          Consumer
14//!    │                                  │
15//!    │── RequestCredits(n) ──────────►  │
16//!    │                                  │── GrantCredits(n) ─►
17//!    │◄── GrantCredits(n) ──────────── │
18//!    │                                  │
19//!    │── Message (credit-1) ─────────► │
20//!    │── Message (credit-1) ─────────► │
21//!    │── Message (credit-1) ─────────► │
22//!    │   (credits exhausted)            │
23//!    │── RequestCredits(n) ──────────►  │
24//!    │   ...                            │
25//! ```
26
27use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
28
29/// Flow control configuration for a K2K channel.
30#[derive(Debug, Clone)]
31pub struct FlowControlConfig {
32    /// Initial credits granted to producer.
33    pub initial_credits: u32,
34    /// Credits to grant on each refill.
35    pub refill_amount: u32,
36    /// Queue depth at which HIGH_WATER signal fires.
37    pub high_water_mark: u32,
38    /// Queue depth at which LOW_WATER signal fires (after HIGH_WATER).
39    pub low_water_mark: u32,
40    /// What to do when credits are exhausted.
41    pub overflow_policy: OverflowPolicy,
42}
43
44impl Default for FlowControlConfig {
45    fn default() -> Self {
46        Self {
47            initial_credits: 64,
48            refill_amount: 32,
49            high_water_mark: 192, // 75% of 256
50            low_water_mark: 64,   // 25% of 256
51            overflow_policy: OverflowPolicy::Block,
52        }
53    }
54}
55
56/// Policy when producer has no credits left.
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum OverflowPolicy {
59    /// Block the producer until credits are available.
60    Block,
61    /// Buffer messages locally (up to buffer_limit).
62    Buffer {
63        /// Maximum number of messages to buffer.
64        limit: u32,
65    },
66    /// Drop the message and route to dead letter queue.
67    DropToDlq,
68    /// Drop the message silently (with metric increment).
69    DropSilent,
70}
71
72/// Watermark signal emitted when queue depth crosses thresholds.
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub enum WatermarkSignal {
75    /// Queue depth rose above high_water_mark — producer should slow down.
76    HighWater,
77    /// Queue depth fell below low_water_mark — producer can resume.
78    LowWater,
79}
80
81/// Per-channel flow controller.
82///
83/// Tracks credits, watermark state, and flow control metrics.
84pub struct FlowController {
85    /// Available credits for the producer.
86    credits: AtomicI64,
87    /// Configuration.
88    config: FlowControlConfig,
89    /// Current watermark state.
90    is_high_water: std::sync::atomic::AtomicBool,
91    /// Metrics.
92    metrics: FlowMetrics,
93}
94
95/// Flow control metrics.
96pub struct FlowMetrics {
97    /// Total credits granted.
98    pub credits_granted: AtomicU64,
99    /// Total credits consumed (messages sent).
100    pub credits_consumed: AtomicU64,
101    /// Times producer was blocked (credits exhausted).
102    pub backpressure_events: AtomicU64,
103    /// Messages routed to DLQ due to overflow.
104    pub dlq_routed: AtomicU64,
105    /// Messages dropped silently.
106    pub dropped: AtomicU64,
107    /// High water signals emitted.
108    pub high_water_signals: AtomicU64,
109    /// Low water signals emitted.
110    pub low_water_signals: AtomicU64,
111}
112
113impl FlowController {
114    /// Create a new flow controller with the given config.
115    pub fn new(config: FlowControlConfig) -> Self {
116        let initial = config.initial_credits as i64;
117        Self {
118            credits: AtomicI64::new(initial),
119            config,
120            is_high_water: std::sync::atomic::AtomicBool::new(false),
121            metrics: FlowMetrics {
122                credits_granted: AtomicU64::new(0),
123                credits_consumed: AtomicU64::new(0),
124                backpressure_events: AtomicU64::new(0),
125                dlq_routed: AtomicU64::new(0),
126                dropped: AtomicU64::new(0),
127                high_water_signals: AtomicU64::new(0),
128                low_water_signals: AtomicU64::new(0),
129            },
130        }
131    }
132
133    /// Try to acquire a credit for sending. Returns true if credit available.
134    pub fn try_acquire(&self) -> bool {
135        let prev = self.credits.fetch_sub(1, Ordering::AcqRel);
136        if prev > 0 {
137            self.metrics
138                .credits_consumed
139                .fetch_add(1, Ordering::Relaxed);
140            true
141        } else {
142            // Restore the credit (we didn't actually send)
143            self.credits.fetch_add(1, Ordering::Release);
144            self.metrics
145                .backpressure_events
146                .fetch_add(1, Ordering::Relaxed);
147            false
148        }
149    }
150
151    /// Grant credits to the producer (called by consumer when queue has space).
152    pub fn grant(&self, amount: u32) {
153        self.credits.fetch_add(amount as i64, Ordering::Release);
154        self.metrics
155            .credits_granted
156            .fetch_add(amount as u64, Ordering::Relaxed);
157    }
158
159    /// Refill credits by the configured refill amount.
160    pub fn refill(&self) {
161        self.grant(self.config.refill_amount);
162    }
163
164    /// Available credits.
165    pub fn available_credits(&self) -> i64 {
166        self.credits.load(Ordering::Acquire)
167    }
168
169    /// Check watermark state based on current queue depth.
170    ///
171    /// Returns a watermark signal if a threshold was just crossed.
172    pub fn check_watermark(&self, queue_depth: u32) -> Option<WatermarkSignal> {
173        let was_high = self.is_high_water.load(Ordering::Acquire);
174
175        if !was_high && queue_depth >= self.config.high_water_mark {
176            self.is_high_water.store(true, Ordering::Release);
177            self.metrics
178                .high_water_signals
179                .fetch_add(1, Ordering::Relaxed);
180            Some(WatermarkSignal::HighWater)
181        } else if was_high && queue_depth <= self.config.low_water_mark {
182            self.is_high_water.store(false, Ordering::Release);
183            self.metrics
184                .low_water_signals
185                .fetch_add(1, Ordering::Relaxed);
186            Some(WatermarkSignal::LowWater)
187        } else {
188            None
189        }
190    }
191
192    /// Get the overflow policy.
193    pub fn overflow_policy(&self) -> OverflowPolicy {
194        self.config.overflow_policy
195    }
196
197    /// Record a message dropped (for metrics).
198    pub fn record_drop(&self) {
199        self.metrics.dropped.fetch_add(1, Ordering::Relaxed);
200    }
201
202    /// Record a message routed to DLQ (for metrics).
203    pub fn record_dlq(&self) {
204        self.metrics.dlq_routed.fetch_add(1, Ordering::Relaxed);
205    }
206
207    /// Get a snapshot of flow control metrics.
208    pub fn metrics_snapshot(&self) -> FlowMetricsSnapshot {
209        FlowMetricsSnapshot {
210            credits_available: self.credits.load(Ordering::Relaxed),
211            credits_granted: self.metrics.credits_granted.load(Ordering::Relaxed),
212            credits_consumed: self.metrics.credits_consumed.load(Ordering::Relaxed),
213            backpressure_events: self.metrics.backpressure_events.load(Ordering::Relaxed),
214            dlq_routed: self.metrics.dlq_routed.load(Ordering::Relaxed),
215            dropped: self.metrics.dropped.load(Ordering::Relaxed),
216            high_water_signals: self.metrics.high_water_signals.load(Ordering::Relaxed),
217            low_water_signals: self.metrics.low_water_signals.load(Ordering::Relaxed),
218        }
219    }
220}
221
222/// Snapshot of flow control metrics (non-atomic, for reporting).
223#[derive(Debug, Clone)]
224pub struct FlowMetricsSnapshot {
225    /// Credits currently available to the producer.
226    pub credits_available: i64,
227    /// Total credits granted since creation.
228    pub credits_granted: u64,
229    /// Total credits consumed since creation.
230    pub credits_consumed: u64,
231    /// Number of backpressure events triggered.
232    pub backpressure_events: u64,
233    /// Number of messages routed to the dead letter queue.
234    pub dlq_routed: u64,
235    /// Number of messages silently dropped.
236    pub dropped: u64,
237    /// Number of high water mark signals emitted.
238    pub high_water_signals: u64,
239    /// Number of low water mark signals emitted.
240    pub low_water_signals: u64,
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246
247    #[test]
248    fn test_flow_controller_basic() {
249        let fc = FlowController::new(FlowControlConfig {
250            initial_credits: 3,
251            ..Default::default()
252        });
253
254        assert!(fc.try_acquire());
255        assert!(fc.try_acquire());
256        assert!(fc.try_acquire());
257        assert!(!fc.try_acquire()); // No credits left
258
259        fc.grant(2);
260        assert!(fc.try_acquire());
261        assert!(fc.try_acquire());
262        assert!(!fc.try_acquire());
263    }
264
265    #[test]
266    fn test_watermark_signals() {
267        let fc = FlowController::new(FlowControlConfig {
268            high_water_mark: 80,
269            low_water_mark: 20,
270            ..Default::default()
271        });
272
273        assert_eq!(fc.check_watermark(50), None);
274        assert_eq!(fc.check_watermark(80), Some(WatermarkSignal::HighWater));
275        assert_eq!(fc.check_watermark(90), None); // Already high
276        assert_eq!(fc.check_watermark(20), Some(WatermarkSignal::LowWater));
277        assert_eq!(fc.check_watermark(10), None); // Already low
278    }
279
280    #[test]
281    fn test_metrics_tracking() {
282        let fc = FlowController::new(FlowControlConfig {
283            initial_credits: 2,
284            ..Default::default()
285        });
286
287        fc.try_acquire();
288        fc.try_acquire();
289        fc.try_acquire(); // Should fail
290
291        let m = fc.metrics_snapshot();
292        assert_eq!(m.credits_consumed, 2);
293        assert_eq!(m.backpressure_events, 1);
294    }
295
296    #[test]
297    fn test_refill() {
298        let fc = FlowController::new(FlowControlConfig {
299            initial_credits: 0,
300            refill_amount: 10,
301            ..Default::default()
302        });
303
304        assert!(!fc.try_acquire());
305        fc.refill();
306        assert_eq!(fc.available_credits(), 10);
307        for _ in 0..10 {
308            assert!(fc.try_acquire());
309        }
310        assert!(!fc.try_acquire());
311    }
312}