Skip to main content

oxigdal_streaming/v2/
backpressure.rs

1//! Credit-based flow control for geospatial stream processing.
2//!
3//! Producers are given a credit budget. They may only emit items when
4//! they have available credits. Consumers replenish credits as they
5//! consume. This prevents unbounded buffering under load.
6//!
7//! Design:
8//! - `CreditPool`: shared atomic credit counter (Arc-wrapped for multi-producer/consumer use)
9//! - `BackpressureProducer`: wraps a data source with credit checking; stages pending items
10//! - `BackpressureConsumer`: wraps a sink with credit replenishment
11
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicI64, Ordering};
15
16use crate::error::StreamingError;
17
18/// A pool of credits shared between producer and consumer.
19///
20/// Credits are atomically tracked; producers consume credits when emitting items
21/// and consumers release credits as they process those items.
22#[derive(Debug, Clone)]
23pub struct CreditPool {
24    credits: Arc<AtomicI64>,
25    capacity: i64,
26}
27
28impl CreditPool {
29    /// Create a pool with the given initial (and maximum) capacity.
30    pub fn new(capacity: i64) -> Self {
31        assert!(capacity > 0, "CreditPool capacity must be positive");
32        Self {
33            credits: Arc::new(AtomicI64::new(capacity)),
34            capacity,
35        }
36    }
37
38    /// Try to acquire `n` credits (non-blocking).
39    ///
40    /// Returns `true` if the credits were successfully acquired, `false` if the
41    /// pool does not currently hold enough credits (backpressure signal).
42    pub fn try_acquire(&self, n: i64) -> bool {
43        assert!(n > 0, "must acquire at least 1 credit");
44        let mut current = self.credits.load(Ordering::Relaxed);
45        loop {
46            if current < n {
47                return false;
48            }
49            match self.credits.compare_exchange_weak(
50                current,
51                current - n,
52                Ordering::AcqRel,
53                Ordering::Relaxed,
54            ) {
55                Ok(_) => return true,
56                Err(actual) => current = actual,
57            }
58        }
59    }
60
61    /// Release `n` credits back to the pool (consumer-side).
62    ///
63    /// The pool is clamped to its capacity to prevent over-replenishment.
64    pub fn release(&self, n: i64) {
65        assert!(n > 0, "must release at least 1 credit");
66        // fetch_add then clamp in a CAS loop
67        let prev = self.credits.fetch_add(n, Ordering::AcqRel);
68        let after = prev + n;
69        if after > self.capacity {
70            // Clamp: try to bring the counter back down to capacity.
71            // A CAS loop is needed because another thread may have changed it.
72            let mut current = after;
73            loop {
74                if current <= self.capacity {
75                    break;
76                }
77                match self.credits.compare_exchange_weak(
78                    current,
79                    self.capacity,
80                    Ordering::AcqRel,
81                    Ordering::Relaxed,
82                ) {
83                    Ok(_) => break,
84                    Err(actual) => current = actual,
85                }
86            }
87        }
88    }
89
90    /// Number of credits currently available.
91    pub fn available(&self) -> i64 {
92        self.credits.load(Ordering::Acquire)
93    }
94
95    /// Maximum credits this pool can hold.
96    pub fn capacity(&self) -> i64 {
97        self.capacity
98    }
99
100    /// Utilization fraction: `0.0` when pool is full (nothing consumed),
101    /// `1.0` when completely empty (maximum backpressure).
102    pub fn utilization(&self) -> f64 {
103        let avail = self.available().max(0);
104        1.0 - (avail as f64 / self.capacity as f64)
105    }
106}
107
108// ─── PendingItem ──────────────────────────────────────────────────────────────
109
110/// An item staged by the producer, waiting to be drained by the consumer.
111#[derive(Debug)]
112pub struct PendingItem<T> {
113    /// The payload.
114    pub item: T,
115    /// Credits that were consumed when this item was emitted.
116    pub credits_required: i64,
117}
118
119// ─── BackpressureProducer ─────────────────────────────────────────────────────
120
121/// A producer that checks the shared `CreditPool` before emitting items.
122///
123/// Items that are successfully emitted are staged in an internal `VecDeque`;
124/// the consumer should call `drain()` to retrieve them and then call
125/// `BackpressureConsumer::consume()` for each item processed.
126pub struct BackpressureProducer<T> {
127    pool: CreditPool,
128    pending: VecDeque<PendingItem<T>>,
129    emitted_total: u64,
130    dropped_total: u64,
131}
132
133impl<T> BackpressureProducer<T> {
134    /// Create a producer that shares the given `CreditPool`.
135    pub fn new(pool: CreditPool) -> Self {
136        Self {
137            pool,
138            pending: VecDeque::new(),
139            emitted_total: 0,
140            dropped_total: 0,
141        }
142    }
143
144    /// Try to emit an item consuming `credits` credits.
145    ///
146    /// Returns:
147    /// - `Ok(true)` — item was staged successfully.
148    /// - `Ok(false)` — backpressured; caller should retry later or drop the item.
149    pub fn try_emit(&mut self, item: T, credits: i64) -> Result<bool, StreamingError> {
150        if credits <= 0 {
151            return Err(StreamingError::InvalidOperation(
152                "credits must be positive".into(),
153            ));
154        }
155        if self.pool.try_acquire(credits) {
156            self.pending.push_back(PendingItem {
157                item,
158                credits_required: credits,
159            });
160            self.emitted_total += 1;
161            Ok(true)
162        } else {
163            self.dropped_total += 1;
164            Ok(false)
165        }
166    }
167
168    /// Drain all staged items, yielding them to the consumer.
169    pub fn drain(&mut self) -> impl Iterator<Item = PendingItem<T>> + '_ {
170        self.pending.drain(..)
171    }
172
173    /// Total items successfully emitted since creation.
174    pub fn emitted_total(&self) -> u64 {
175        self.emitted_total
176    }
177
178    /// Total items dropped due to backpressure since creation.
179    pub fn dropped_total(&self) -> u64 {
180        self.dropped_total
181    }
182
183    /// Number of items currently staged (not yet drained).
184    pub fn pending_count(&self) -> usize {
185        self.pending.len()
186    }
187
188    /// Shared reference to the underlying credit pool.
189    pub fn pool(&self) -> &CreditPool {
190        &self.pool
191    }
192}
193
194// ─── BackpressureConsumer ─────────────────────────────────────────────────────
195
196/// A consumer that releases credits back to the pool as items are processed.
197pub struct BackpressureConsumer {
198    pool: CreditPool,
199    consumed_total: u64,
200}
201
202impl BackpressureConsumer {
203    /// Create a consumer that shares the given `CreditPool`.
204    pub fn new(pool: CreditPool) -> Self {
205        Self {
206            pool,
207            consumed_total: 0,
208        }
209    }
210
211    /// Mark `credits` worth of processing as complete, releasing those credits
212    /// back to the pool so the producer can emit more items.
213    pub fn consume(&mut self, credits: i64) {
214        self.pool.release(credits);
215        self.consumed_total += 1;
216    }
217
218    /// Total items consumed (i.e. times `consume()` was called) since creation.
219    pub fn consumed_total(&self) -> u64 {
220        self.consumed_total
221    }
222
223    /// Shared reference to the underlying credit pool.
224    pub fn pool(&self) -> &CreditPool {
225        &self.pool
226    }
227}
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232
233    #[test]
234    fn test_credit_pool_initial_credits() {
235        let pool = CreditPool::new(100);
236        assert_eq!(pool.available(), 100);
237        assert_eq!(pool.capacity(), 100);
238    }
239
240    #[test]
241    fn test_credit_pool_try_acquire_success() {
242        let pool = CreditPool::new(50);
243        assert!(pool.try_acquire(30));
244        assert_eq!(pool.available(), 20);
245    }
246
247    #[test]
248    fn test_credit_pool_try_acquire_fail_insufficient() {
249        let pool = CreditPool::new(10);
250        assert!(!pool.try_acquire(11));
251        assert_eq!(pool.available(), 10); // unchanged
252    }
253
254    #[test]
255    fn test_credit_pool_release_replenishes() {
256        let pool = CreditPool::new(100);
257        assert!(pool.try_acquire(40));
258        pool.release(40);
259        assert_eq!(pool.available(), 100);
260    }
261
262    #[test]
263    fn test_credit_pool_over_release_clamped_to_capacity() {
264        let pool = CreditPool::new(50);
265        pool.release(30); // release without prior acquire — should clamp to capacity
266        assert_eq!(pool.available(), 50);
267    }
268
269    #[test]
270    fn test_credit_pool_utilization_zero_when_full() {
271        let pool = CreditPool::new(100);
272        assert!((pool.utilization() - 0.0).abs() < f64::EPSILON);
273    }
274
275    #[test]
276    fn test_credit_pool_utilization_one_when_empty() {
277        let pool = CreditPool::new(100);
278        assert!(pool.try_acquire(100));
279        assert!((pool.utilization() - 1.0).abs() < f64::EPSILON);
280    }
281
282    #[test]
283    fn test_producer_emit_success() {
284        let pool = CreditPool::new(10);
285        let mut producer = BackpressureProducer::new(pool);
286        let ok = producer
287            .try_emit("hello", 5)
288            .expect("try_emit should not error");
289        assert!(ok);
290        assert_eq!(producer.emitted_total(), 1);
291        assert_eq!(producer.pending_count(), 1);
292    }
293
294    #[test]
295    fn test_producer_backpressure_when_no_credits() {
296        let pool = CreditPool::new(5);
297        let mut producer = BackpressureProducer::new(pool);
298        // consume all credits
299        assert!(
300            producer
301                .try_emit("first", 5)
302                .expect("emit should not error")
303        );
304        // now no credits remain
305        let ok = producer
306            .try_emit("second", 1)
307            .expect("emit should not error");
308        assert!(!ok);
309        assert_eq!(producer.dropped_total(), 1);
310    }
311
312    #[test]
313    fn test_producer_drain_yields_pending_items() {
314        let pool = CreditPool::new(20);
315        let mut producer = BackpressureProducer::new(pool);
316        producer.try_emit(1u32, 4).expect("emit ok");
317        producer.try_emit(2u32, 4).expect("emit ok");
318        let items: Vec<_> = producer.drain().map(|p| p.item).collect();
319        assert_eq!(items, vec![1, 2]);
320        assert_eq!(producer.pending_count(), 0);
321    }
322
323    #[test]
324    fn test_consumer_consume_increments_count() {
325        let pool = CreditPool::new(100);
326        let mut consumer = BackpressureConsumer::new(pool);
327        consumer.consume(10);
328        consumer.consume(10);
329        assert_eq!(consumer.consumed_total(), 2);
330    }
331
332    #[test]
333    fn test_consumer_consume_releases_credits() {
334        let pool = CreditPool::new(100);
335        let consumer_pool = pool.clone();
336        // drain all credits via producer
337        let mut producer = BackpressureProducer::new(pool);
338        producer.try_emit("x", 100).expect("emit ok");
339        assert_eq!(producer.pool().available(), 0);
340
341        let mut consumer = BackpressureConsumer::new(consumer_pool);
342        consumer.consume(50);
343        assert_eq!(consumer.pool().available(), 50);
344    }
345
346    #[test]
347    fn test_credit_pool_clone_shares_state() {
348        let pool = CreditPool::new(100);
349        let pool2 = pool.clone();
350        assert!(pool.try_acquire(40));
351        // pool2 reflects the same atomic
352        assert_eq!(pool2.available(), 60);
353    }
354}