1use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
28
29#[derive(Debug, Clone)]
31pub struct FlowControlConfig {
32 pub initial_credits: u32,
34 pub refill_amount: u32,
36 pub high_water_mark: u32,
38 pub low_water_mark: u32,
40 pub overflow_policy: OverflowPolicy,
42}
43
44impl Default for FlowControlConfig {
45 fn default() -> Self {
46 Self {
47 initial_credits: 64,
48 refill_amount: 32,
49 high_water_mark: 192, low_water_mark: 64, overflow_policy: OverflowPolicy::Block,
52 }
53 }
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum OverflowPolicy {
59 Block,
61 Buffer {
63 limit: u32,
65 },
66 DropToDlq,
68 DropSilent,
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub enum WatermarkSignal {
75 HighWater,
77 LowWater,
79}
80
81pub struct FlowController {
85 credits: AtomicI64,
87 config: FlowControlConfig,
89 is_high_water: std::sync::atomic::AtomicBool,
91 metrics: FlowMetrics,
93}
94
95pub struct FlowMetrics {
97 pub credits_granted: AtomicU64,
99 pub credits_consumed: AtomicU64,
101 pub backpressure_events: AtomicU64,
103 pub dlq_routed: AtomicU64,
105 pub dropped: AtomicU64,
107 pub high_water_signals: AtomicU64,
109 pub low_water_signals: AtomicU64,
111}
112
113impl FlowController {
114 pub fn new(config: FlowControlConfig) -> Self {
116 let initial = config.initial_credits as i64;
117 Self {
118 credits: AtomicI64::new(initial),
119 config,
120 is_high_water: std::sync::atomic::AtomicBool::new(false),
121 metrics: FlowMetrics {
122 credits_granted: AtomicU64::new(0),
123 credits_consumed: AtomicU64::new(0),
124 backpressure_events: AtomicU64::new(0),
125 dlq_routed: AtomicU64::new(0),
126 dropped: AtomicU64::new(0),
127 high_water_signals: AtomicU64::new(0),
128 low_water_signals: AtomicU64::new(0),
129 },
130 }
131 }
132
133 pub fn try_acquire(&self) -> bool {
135 let prev = self.credits.fetch_sub(1, Ordering::AcqRel);
136 if prev > 0 {
137 self.metrics
138 .credits_consumed
139 .fetch_add(1, Ordering::Relaxed);
140 true
141 } else {
142 self.credits.fetch_add(1, Ordering::Release);
144 self.metrics
145 .backpressure_events
146 .fetch_add(1, Ordering::Relaxed);
147 false
148 }
149 }
150
151 pub fn grant(&self, amount: u32) {
153 self.credits.fetch_add(amount as i64, Ordering::Release);
154 self.metrics
155 .credits_granted
156 .fetch_add(amount as u64, Ordering::Relaxed);
157 }
158
159 pub fn refill(&self) {
161 self.grant(self.config.refill_amount);
162 }
163
164 pub fn available_credits(&self) -> i64 {
166 self.credits.load(Ordering::Acquire)
167 }
168
169 pub fn check_watermark(&self, queue_depth: u32) -> Option<WatermarkSignal> {
173 let was_high = self.is_high_water.load(Ordering::Acquire);
174
175 if !was_high && queue_depth >= self.config.high_water_mark {
176 self.is_high_water.store(true, Ordering::Release);
177 self.metrics
178 .high_water_signals
179 .fetch_add(1, Ordering::Relaxed);
180 Some(WatermarkSignal::HighWater)
181 } else if was_high && queue_depth <= self.config.low_water_mark {
182 self.is_high_water.store(false, Ordering::Release);
183 self.metrics
184 .low_water_signals
185 .fetch_add(1, Ordering::Relaxed);
186 Some(WatermarkSignal::LowWater)
187 } else {
188 None
189 }
190 }
191
192 pub fn overflow_policy(&self) -> OverflowPolicy {
194 self.config.overflow_policy
195 }
196
197 pub fn record_drop(&self) {
199 self.metrics.dropped.fetch_add(1, Ordering::Relaxed);
200 }
201
202 pub fn record_dlq(&self) {
204 self.metrics.dlq_routed.fetch_add(1, Ordering::Relaxed);
205 }
206
207 pub fn metrics_snapshot(&self) -> FlowMetricsSnapshot {
209 FlowMetricsSnapshot {
210 credits_available: self.credits.load(Ordering::Relaxed),
211 credits_granted: self.metrics.credits_granted.load(Ordering::Relaxed),
212 credits_consumed: self.metrics.credits_consumed.load(Ordering::Relaxed),
213 backpressure_events: self.metrics.backpressure_events.load(Ordering::Relaxed),
214 dlq_routed: self.metrics.dlq_routed.load(Ordering::Relaxed),
215 dropped: self.metrics.dropped.load(Ordering::Relaxed),
216 high_water_signals: self.metrics.high_water_signals.load(Ordering::Relaxed),
217 low_water_signals: self.metrics.low_water_signals.load(Ordering::Relaxed),
218 }
219 }
220}
221
222#[derive(Debug, Clone)]
224pub struct FlowMetricsSnapshot {
225 pub credits_available: i64,
227 pub credits_granted: u64,
229 pub credits_consumed: u64,
231 pub backpressure_events: u64,
233 pub dlq_routed: u64,
235 pub dropped: u64,
237 pub high_water_signals: u64,
239 pub low_water_signals: u64,
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246
247 #[test]
248 fn test_flow_controller_basic() {
249 let fc = FlowController::new(FlowControlConfig {
250 initial_credits: 3,
251 ..Default::default()
252 });
253
254 assert!(fc.try_acquire());
255 assert!(fc.try_acquire());
256 assert!(fc.try_acquire());
257 assert!(!fc.try_acquire()); fc.grant(2);
260 assert!(fc.try_acquire());
261 assert!(fc.try_acquire());
262 assert!(!fc.try_acquire());
263 }
264
265 #[test]
266 fn test_watermark_signals() {
267 let fc = FlowController::new(FlowControlConfig {
268 high_water_mark: 80,
269 low_water_mark: 20,
270 ..Default::default()
271 });
272
273 assert_eq!(fc.check_watermark(50), None);
274 assert_eq!(fc.check_watermark(80), Some(WatermarkSignal::HighWater));
275 assert_eq!(fc.check_watermark(90), None); assert_eq!(fc.check_watermark(20), Some(WatermarkSignal::LowWater));
277 assert_eq!(fc.check_watermark(10), None); }
279
280 #[test]
281 fn test_metrics_tracking() {
282 let fc = FlowController::new(FlowControlConfig {
283 initial_credits: 2,
284 ..Default::default()
285 });
286
287 fc.try_acquire();
288 fc.try_acquire();
289 fc.try_acquire(); let m = fc.metrics_snapshot();
292 assert_eq!(m.credits_consumed, 2);
293 assert_eq!(m.backpressure_events, 1);
294 }
295
296 #[test]
297 fn test_refill() {
298 let fc = FlowController::new(FlowControlConfig {
299 initial_credits: 0,
300 refill_amount: 10,
301 ..Default::default()
302 });
303
304 assert!(!fc.try_acquire());
305 fc.refill();
306 assert_eq!(fc.available_credits(), 10);
307 for _ in 0..10 {
308 assert!(fc.try_acquire());
309 }
310 assert!(!fc.try_acquire());
311 }
312}