oxigdal_streaming/v2/
backpressure.rs1use std::collections::VecDeque;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicI64, Ordering};
15
16use crate::error::StreamingError;
17
/// A shared pool of flow-control credits.
///
/// The remaining credits are stored in an `Arc<AtomicI64>`, so cloning a
/// `CreditPool` produces another handle to the *same* counter rather than an
/// independent pool.
#[derive(Debug, Clone)]
pub struct CreditPool {
    /// Credits currently available; shared by every clone of this pool.
    credits: Arc<AtomicI64>,
    /// Upper bound for `credits`, and the number of credits the pool starts with.
    capacity: i64,
}

impl CreditPool {
    /// Creates a pool that starts full, holding `capacity` credits.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not strictly positive.
    pub fn new(capacity: i64) -> Self {
        assert!(capacity > 0, "CreditPool capacity must be positive");
        Self {
            credits: Arc::new(AtomicI64::new(capacity)),
            capacity,
        }
    }

    /// Tries to take `n` credits from the pool.
    ///
    /// Returns `true` and deducts `n` credits when at least `n` are available;
    /// otherwise returns `false` and leaves the pool untouched.
    ///
    /// # Panics
    ///
    /// Panics if `n` is not strictly positive.
    pub fn try_acquire(&self, n: i64) -> bool {
        assert!(n > 0, "must acquire at least 1 credit");
        self.credits
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                // Here `current >= n > 0`, so the subtraction cannot overflow.
                (current >= n).then(|| current - n)
            })
            .is_ok()
    }

    /// Returns `n` credits to the pool, never exceeding the pool's capacity.
    ///
    /// The add and the clamp happen in one atomic update, so other handles
    /// never observe more than `capacity` credits, and an arbitrarily large
    /// `n` saturates instead of overflowing.
    ///
    /// # Panics
    ///
    /// Panics if `n` is not strictly positive.
    pub fn release(&self, n: i64) {
        assert!(n > 0, "must release at least 1 credit");
        let capacity = self.capacity;
        // The closure always yields `Some`, so the update cannot fail; the
        // returned previous value is of no interest to callers.
        let _ = self
            .credits
            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |current| {
                Some(current.saturating_add(n).min(capacity))
            });
    }

    /// Returns the number of credits currently available.
    pub fn available(&self) -> i64 {
        self.credits.load(Ordering::Acquire)
    }

    /// Returns the maximum number of credits the pool can hold.
    pub fn capacity(&self) -> i64 {
        self.capacity
    }

    /// Returns the fraction of the capacity that is currently acquired.
    ///
    /// The result is `0.0` when the pool is full and `1.0` when it is empty.
    pub fn utilization(&self) -> f64 {
        // Clamp defensively so the ratio always stays within `[0.0, 1.0]`.
        let avail = self.available().clamp(0, self.capacity);
        1.0 - (avail as f64 / self.capacity as f64)
    }
}
107
/// An item held in a producer's pending queue together with its credit cost.
#[derive(Debug)]
pub struct PendingItem<T> {
    /// The queued payload.
    pub item: T,
    /// Number of credits associated with this item; `BackpressureProducer::try_emit`
    /// stores the amount it acquired from the pool here.
    pub credits_required: i64,
}
118
119pub struct BackpressureProducer<T> {
127 pool: CreditPool,
128 pending: VecDeque<PendingItem<T>>,
129 emitted_total: u64,
130 dropped_total: u64,
131}
132
133impl<T> BackpressureProducer<T> {
134 pub fn new(pool: CreditPool) -> Self {
136 Self {
137 pool,
138 pending: VecDeque::new(),
139 emitted_total: 0,
140 dropped_total: 0,
141 }
142 }
143
144 pub fn try_emit(&mut self, item: T, credits: i64) -> Result<bool, StreamingError> {
150 if credits <= 0 {
151 return Err(StreamingError::InvalidOperation(
152 "credits must be positive".into(),
153 ));
154 }
155 if self.pool.try_acquire(credits) {
156 self.pending.push_back(PendingItem {
157 item,
158 credits_required: credits,
159 });
160 self.emitted_total += 1;
161 Ok(true)
162 } else {
163 self.dropped_total += 1;
164 Ok(false)
165 }
166 }
167
168 pub fn drain(&mut self) -> impl Iterator<Item = PendingItem<T>> + '_ {
170 self.pending.drain(..)
171 }
172
173 pub fn emitted_total(&self) -> u64 {
175 self.emitted_total
176 }
177
178 pub fn dropped_total(&self) -> u64 {
180 self.dropped_total
181 }
182
183 pub fn pending_count(&self) -> usize {
185 self.pending.len()
186 }
187
188 pub fn pool(&self) -> &CreditPool {
190 &self.pool
191 }
192}
193
194pub struct BackpressureConsumer {
198 pool: CreditPool,
199 consumed_total: u64,
200}
201
202impl BackpressureConsumer {
203 pub fn new(pool: CreditPool) -> Self {
205 Self {
206 pool,
207 consumed_total: 0,
208 }
209 }
210
211 pub fn consume(&mut self, credits: i64) {
214 self.pool.release(credits);
215 self.consumed_total += 1;
216 }
217
218 pub fn consumed_total(&self) -> u64 {
220 self.consumed_total
221 }
222
223 pub fn pool(&self) -> &CreditPool {
225 &self.pool
226 }
227}
228
/// Unit tests for the credit pool and the producer/consumer wrappers.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_credit_pool_initial_credits() {
        let pool = CreditPool::new(100);
        assert_eq!(pool.available(), 100);
        assert_eq!(pool.capacity(), 100);
    }

    #[test]
    fn test_credit_pool_try_acquire_success() {
        let pool = CreditPool::new(50);
        assert!(pool.try_acquire(30));
        assert_eq!(pool.available(), 20);
    }

    #[test]
    fn test_credit_pool_try_acquire_fail_insufficient() {
        let pool = CreditPool::new(10);
        assert!(!pool.try_acquire(11));
        // A failed acquire must leave the pool untouched.
        assert_eq!(pool.available(), 10);
    }

    #[test]
    fn test_credit_pool_release_replenishes() {
        let pool = CreditPool::new(100);
        assert!(pool.try_acquire(40));
        pool.release(40);
        assert_eq!(pool.available(), 100);
    }

    #[test]
    fn test_credit_pool_over_release_clamped_to_capacity() {
        let pool = CreditPool::new(50);
        // Releasing into an already-full pool must not exceed the capacity.
        pool.release(30);
        assert_eq!(pool.available(), 50);
    }

    #[test]
    fn test_credit_pool_utilization_zero_when_full() {
        let pool = CreditPool::new(100);
        assert!((pool.utilization() - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_credit_pool_utilization_one_when_empty() {
        let pool = CreditPool::new(100);
        assert!(pool.try_acquire(100));
        assert!((pool.utilization() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_producer_emit_success() {
        let pool = CreditPool::new(10);
        let mut producer = BackpressureProducer::new(pool);
        let ok = producer
            .try_emit("hello", 5)
            .expect("try_emit should not error");
        assert!(ok);
        assert_eq!(producer.emitted_total(), 1);
        assert_eq!(producer.pending_count(), 1);
    }

    #[test]
    fn test_producer_backpressure_when_no_credits() {
        let pool = CreditPool::new(5);
        let mut producer = BackpressureProducer::new(pool);
        // The first item uses up every credit in the pool.
        assert!(
            producer
                .try_emit("first", 5)
                .expect("emit should not error")
        );
        // The second item finds the pool empty and is dropped.
        let ok = producer
            .try_emit("second", 1)
            .expect("emit should not error");
        assert!(!ok);
        assert_eq!(producer.dropped_total(), 1);
    }

    #[test]
    fn test_producer_rejects_non_positive_credits() {
        let pool = CreditPool::new(5);
        let mut producer = BackpressureProducer::new(pool);
        assert!(producer.try_emit("zero", 0).is_err());
        assert!(producer.try_emit("negative", -1).is_err());
        // An invalid request is neither emitted nor counted as dropped.
        assert_eq!(producer.emitted_total(), 0);
        assert_eq!(producer.dropped_total(), 0);
        assert_eq!(producer.pending_count(), 0);
        assert_eq!(producer.pool().available(), 5);
    }

    #[test]
    fn test_producer_drain_yields_pending_items() {
        let pool = CreditPool::new(20);
        let mut producer = BackpressureProducer::new(pool);
        producer.try_emit(1u32, 4).expect("emit ok");
        producer.try_emit(2u32, 4).expect("emit ok");
        let items: Vec<_> = producer.drain().map(|p| p.item).collect();
        assert_eq!(items, vec![1, 2]);
        assert_eq!(producer.pending_count(), 0);
    }

    #[test]
    fn test_consumer_consume_increments_count() {
        let pool = CreditPool::new(100);
        let mut consumer = BackpressureConsumer::new(pool);
        consumer.consume(10);
        consumer.consume(10);
        assert_eq!(consumer.consumed_total(), 2);
    }

    #[test]
    fn test_consumer_consume_releases_credits() {
        let pool = CreditPool::new(100);
        let consumer_pool = pool.clone();
        let mut producer = BackpressureProducer::new(pool);
        // Exhaust the pool through the producer side.
        producer.try_emit("x", 100).expect("emit ok");
        assert_eq!(producer.pool().available(), 0);

        let mut consumer = BackpressureConsumer::new(consumer_pool);
        consumer.consume(50);
        assert_eq!(consumer.pool().available(), 50);
    }

    #[test]
    fn test_credit_pool_clone_shares_state() {
        let pool = CreditPool::new(100);
        let pool2 = pool.clone();
        assert!(pool.try_acquire(40));
        // Both handles observe the same counter.
        assert_eq!(pool2.available(), 60);
    }
}