
laminar_core/subscription/backpressure.rs

//! Backpressure control for the subscription system.
//!
//! Provides per-subscription backpressure strategies that determine behavior
//! when a subscriber's channel buffer is full, plus demand-based flow control
//! for Reactive Streams integration.
//!
//! # Strategies
//!
//! - **`DropOldest`**: Uses broadcast channel's natural lagging (subscribers skip ahead)
//! - **`DropNewest`**: Silently discards new events when the buffer is full
//! - **`Block`**: Blocks the Ring 1 dispatcher until the buffer has space (never Ring 0)
//! - **`Sample(n)`**: Delivers every Nth event, drops the rest
//!
//! # Demand-Based Flow Control
//!
//! [`DemandBackpressure`] implements the Reactive Streams `request(n)` model:
//! the subscriber must explicitly request events via [`DemandHandle::request`].
//! The dispatcher only delivers when pending demand > 0.
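//!
//! # Example
//!
//! A minimal usage sketch. The import paths below are an assumption: they
//! presume the `subscription` modules are publicly exported from
//! `laminar_core`, so the block is marked `ignore` rather than compiled as a
//! doctest.
//!
//! ```ignore
//! use laminar_core::subscription::backpressure::{BackpressureController, DemandBackpressure};
//! use laminar_core::subscription::registry::BackpressureStrategy;
//!
//! // Strategy-based control: a controller the dispatcher consults per event.
//! let mut ctrl = BackpressureController::new(BackpressureStrategy::DropOldest);
//! assert!(ctrl.should_deliver()); // DropOldest never gates delivery here
//!
//! // Demand-based control: nothing is delivered until the subscriber asks.
//! let (demand, handle) = DemandBackpressure::new();
//! assert!(!demand.try_consume()); // no demand requested yet
//! handle.request(1);
//! assert!(demand.try_consume()); // one unit of demand consumed
//! ```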

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use crate::subscription::registry::BackpressureStrategy;

// ---------------------------------------------------------------------------
// BackpressureController
// ---------------------------------------------------------------------------

/// Per-subscription backpressure controller.
///
/// Tracks the backpressure strategy and dropped event count for a single
/// subscription. The dispatcher consults this controller before sending
/// each event to determine whether it should be delivered or dropped.
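///
/// # Example
///
/// A hedged sketch of the dispatcher-side pattern for `DropNewest`. The
/// `tx.try_send(event)` call is a placeholder for whatever bounded channel
/// the dispatcher actually uses; only the controller calls come from this
/// module, so the block is marked `ignore`.
///
/// ```ignore
/// let mut ctrl = BackpressureController::new(BackpressureStrategy::DropNewest);
/// if ctrl.should_deliver() {
///     // Hypothetical bounded send: on a full buffer, DropNewest discards
///     // the event and records the drop for observability.
///     if tx.try_send(event).is_err() {
///         ctrl.record_drop();
///     }
/// }
/// println!("dropped so far: {}", ctrl.dropped());
/// ```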
pub struct BackpressureController {
    /// The backpressure strategy.
    strategy: BackpressureStrategy,
    /// Total events dropped due to backpressure.
    dropped: u64,
    /// Counter for `Sample(n)` strategy.
    sample_counter: u64,
    /// Lag threshold for emitting a warning log.
    lag_warning_threshold: u64,
}

impl BackpressureController {
    /// Creates a new controller with the given strategy.
    #[must_use]
    pub fn new(strategy: BackpressureStrategy) -> Self {
        Self {
            strategy,
            dropped: 0,
            sample_counter: 0,
            lag_warning_threshold: 1000,
        }
    }

    /// Creates a new controller with a custom lag warning threshold.
    #[must_use]
    pub fn with_lag_threshold(strategy: BackpressureStrategy, threshold: u64) -> Self {
        Self {
            strategy,
            dropped: 0,
            sample_counter: 0,
            lag_warning_threshold: threshold,
        }
    }

    /// Determines whether the next event should be delivered.
    ///
    /// For `Sample(n)`, increments an internal counter and returns `true`
    /// every Nth call. For other strategies, always returns `true` — the
    /// actual backpressure is handled by the broadcast channel or the
    /// dispatcher's send logic.
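    ///
    /// # Example
    ///
    /// A small sketch of the `Sample(3)` counting behavior (marked `ignore`
    /// because the public import paths are not shown here).
    ///
    /// ```ignore
    /// let mut ctrl = BackpressureController::new(BackpressureStrategy::Sample(3));
    /// assert!(!ctrl.should_deliver()); // 1st event: dropped
    /// assert!(!ctrl.should_deliver()); // 2nd event: dropped
    /// assert!(ctrl.should_deliver());  // 3rd event: delivered
    /// assert_eq!(ctrl.dropped(), 2);
    /// ```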
    #[inline]
    pub fn should_deliver(&mut self) -> bool {
        match self.strategy {
            BackpressureStrategy::DropOldest
            | BackpressureStrategy::DropNewest
            | BackpressureStrategy::Block => true,
            BackpressureStrategy::Sample(n) => {
                self.sample_counter += 1;
                if self.sample_counter.is_multiple_of(n as u64) {
                    true
                } else {
                    self.dropped += 1;
                    false
                }
            }
        }
    }

    /// Records an event drop (e.g., `DropNewest` when buffer is full).
    pub fn record_drop(&mut self) {
        self.dropped += 1;
    }

    /// Returns the total number of events dropped.
    #[must_use]
    pub fn dropped(&self) -> u64 {
        self.dropped
    }

    /// Returns the configured backpressure strategy.
    #[must_use]
    pub fn strategy(&self) -> BackpressureStrategy {
        self.strategy
    }

    /// Returns the lag warning threshold.
    #[must_use]
    pub fn lag_warning_threshold(&self) -> u64 {
        self.lag_warning_threshold
    }

    /// Returns `true` if the given lag meets or exceeds the warning threshold.
    #[must_use]
    pub fn is_lagging(&self, lag: u64) -> bool {
        lag >= self.lag_warning_threshold
    }
}

// ---------------------------------------------------------------------------
// DemandBackpressure
// ---------------------------------------------------------------------------

/// Demand-based backpressure (Reactive Streams `request(n)` model).
///
/// The subscriber calls [`DemandHandle::request`] to indicate it can accept
/// N more events. The dispatcher calls [`try_consume`](Self::try_consume)
/// before each delivery — if pending demand is 0, the event is not sent.
///
/// # Thread Safety
///
/// The pending demand counter is an [`AtomicU64`] shared between the
/// dispatcher (which decrements via `try_consume`) and the subscriber
/// (which increments via `request`). The CAS loop in `try_consume`
/// ensures correctness under concurrent access.
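///
/// # Example
///
/// A minimal sketch of the request/consume handshake (marked `ignore` because
/// the public import paths are assumed rather than confirmed).
///
/// ```ignore
/// let (demand, handle) = DemandBackpressure::new();
///
/// // Subscriber signals it can accept two more events.
/// handle.request(2);
///
/// // Dispatcher consumes one unit of demand per delivery.
/// assert!(demand.try_consume());
/// assert!(demand.try_consume());
/// assert!(!demand.try_consume()); // demand exhausted: hold the next event
/// assert_eq!(demand.pending(), 0);
/// ```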
pub struct DemandBackpressure {
    /// Pending demand: subscriber has requested this many more events.
    pending: Arc<AtomicU64>,
}

/// Handle given to the subscriber to request more events.
///
/// Created by [`DemandBackpressure::new`]. The subscriber calls
/// [`request`](Self::request) to increase pending demand.
#[derive(Clone)]
pub struct DemandHandle {
    pending: Arc<AtomicU64>,
}

impl DemandBackpressure {
    /// Creates a new demand-based backpressure pair with initial demand of 0.
    ///
    /// Returns `(controller, handle)` where the controller is held by the
    /// dispatcher and the handle is given to the subscriber.
    #[must_use]
    pub fn new() -> (Self, DemandHandle) {
        let pending = Arc::new(AtomicU64::new(0));
        let handle = DemandHandle {
            pending: Arc::clone(&pending),
        };
        (Self { pending }, handle)
    }

    /// Attempts to consume one unit of demand.
    ///
    /// Returns `true` if demand was available (and decremented), `false`
    /// if pending demand was 0. Uses a CAS loop for lock-free correctness.
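    ///
    /// # Example
    ///
    /// A sketch of a dispatcher drain loop: deliver queued events only while
    /// demand remains (marked `ignore`; `queued_events` and `deliver` are
    /// hypothetical stand-ins for the dispatcher's own queue and send path).
    ///
    /// ```ignore
    /// while let Some(_event) = queued_events.front() {
    ///     if !demand.try_consume() {
    ///         break; // no outstanding demand: leave the event queued
    ///     }
    ///     deliver(queued_events.pop_front().unwrap());
    /// }
    /// ```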
    #[inline]
    #[must_use]
    pub fn try_consume(&self) -> bool {
        loop {
            let current = self.pending.load(Ordering::Acquire);
            if current == 0 {
                return false;
            }
            if self
                .pending
                .compare_exchange_weak(current, current - 1, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return true;
            }
        }
    }

    /// Returns the current pending demand.
    #[must_use]
    pub fn pending(&self) -> u64 {
        self.pending.load(Ordering::Acquire)
    }
}

impl DemandHandle {
    /// Requests `n` more events from the dispatcher.
    ///
    /// Atomically adds `n` to the pending demand counter.
    pub fn request(&self, n: u64) {
        self.pending.fetch_add(n, Ordering::Release);
    }

    /// Returns the current pending demand.
    #[must_use]
    pub fn pending(&self) -> u64 {
        self.pending.load(Ordering::Acquire)
    }
}

// ===========================================================================
// Tests
// ===========================================================================

#[cfg(test)]
mod tests {
    use super::*;

    // --- BackpressureController tests ---

    #[test]
    fn test_backpressure_drop_oldest() {
        let mut ctrl = BackpressureController::new(BackpressureStrategy::DropOldest);
        for _ in 0..100 {
            assert!(ctrl.should_deliver());
        }
        assert_eq!(ctrl.dropped(), 0);
    }

    #[test]
    fn test_backpressure_drop_newest() {
        let mut ctrl = BackpressureController::new(BackpressureStrategy::DropNewest);
        assert!(ctrl.should_deliver());
        ctrl.record_drop();
        ctrl.record_drop();
        assert_eq!(ctrl.dropped(), 2);
    }

    #[test]
    fn test_backpressure_sample() {
        let mut ctrl = BackpressureController::new(BackpressureStrategy::Sample(3));
        let mut delivered = Vec::new();
        for i in 0..12 {
            if ctrl.should_deliver() {
                delivered.push(i);
            }
        }
        assert_eq!(delivered, vec![2, 5, 8, 11]);
        assert_eq!(ctrl.dropped(), 8);
    }

    #[test]
    fn test_backpressure_controller_dropped_count() {
        let mut ctrl = BackpressureController::new(BackpressureStrategy::DropNewest);
        assert_eq!(ctrl.dropped(), 0);
        ctrl.record_drop();
        assert_eq!(ctrl.dropped(), 1);
        ctrl.record_drop();
        ctrl.record_drop();
        assert_eq!(ctrl.dropped(), 3);
    }

    #[test]
    fn test_backpressure_strategy_accessor() {
        let ctrl = BackpressureController::new(BackpressureStrategy::Block);
        assert_eq!(ctrl.strategy(), BackpressureStrategy::Block);
    }

    #[test]
    fn test_lagging_subscriber_detection() {
        let ctrl =
            BackpressureController::with_lag_threshold(BackpressureStrategy::DropOldest, 500);
        assert!(!ctrl.is_lagging(499));
        assert!(ctrl.is_lagging(500));
        assert!(ctrl.is_lagging(1000));
        assert_eq!(ctrl.lag_warning_threshold(), 500);
    }

    // --- DemandBackpressure tests ---

    #[test]
    fn test_backpressure_demand_request() {
        let (demand, handle) = DemandBackpressure::new();
        assert_eq!(demand.pending(), 0);

        handle.request(5);
        assert_eq!(demand.pending(), 5);

        for _ in 0..5 {
            assert!(demand.try_consume());
        }
        assert!(!demand.try_consume());
        assert_eq!(demand.pending(), 0);
    }

    #[test]
    fn test_backpressure_demand_zero() {
        let (demand, _handle) = DemandBackpressure::new();
        assert!(!demand.try_consume());
        assert!(!demand.try_consume());
    }

    #[test]
    fn test_backpressure_demand_concurrent() {
        let (demand, handle) = DemandBackpressure::new();
        let demand = Arc::new(demand);
        let handle = Arc::new(handle);

        let h = Arc::clone(&handle);
        let requester = std::thread::spawn(move || {
            for _ in 0..100 {
                h.request(100);
            }
        });

        let d = Arc::clone(&demand);
        let consumer = std::thread::spawn(move || {
            let mut consumed = 0u64;
            loop {
                if d.try_consume() {
                    consumed += 1;
                    if consumed == 10_000 {
                        break;
                    }
                }
                std::thread::yield_now();
            }
            consumed
        });

        requester.join().unwrap();
        let total_consumed = consumer.join().unwrap();
        assert_eq!(total_consumed, 10_000);
        assert_eq!(demand.pending(), 0);
    }

    #[test]
    fn test_demand_handle_clone() {
        let (demand, handle1) = DemandBackpressure::new();
        let handle2 = handle1.clone();
        handle1.request(3);
        handle2.request(2);
        assert_eq!(demand.pending(), 5);
    }
}