laminar_core/subscription/
backpressure.rs1use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22
23use crate::subscription::registry::BackpressureStrategy;
24
25pub struct BackpressureController {
35 strategy: BackpressureStrategy,
37 dropped: u64,
39 sample_counter: u64,
41 lag_warning_threshold: u64,
43}
44
45impl BackpressureController {
46 #[must_use]
48 pub fn new(strategy: BackpressureStrategy) -> Self {
49 Self {
50 strategy,
51 dropped: 0,
52 sample_counter: 0,
53 lag_warning_threshold: 1000,
54 }
55 }
56
57 #[must_use]
59 pub fn with_lag_threshold(strategy: BackpressureStrategy, threshold: u64) -> Self {
60 Self {
61 strategy,
62 dropped: 0,
63 sample_counter: 0,
64 lag_warning_threshold: threshold,
65 }
66 }
67
68 #[inline]
75 pub fn should_deliver(&mut self) -> bool {
76 match self.strategy {
77 BackpressureStrategy::DropOldest
78 | BackpressureStrategy::DropNewest
79 | BackpressureStrategy::Block => true,
80 BackpressureStrategy::Sample(n) => {
81 self.sample_counter += 1;
82 if self.sample_counter.is_multiple_of(n as u64) {
83 true
84 } else {
85 self.dropped += 1;
86 false
87 }
88 }
89 }
90 }
91
92 pub fn record_drop(&mut self) {
94 self.dropped += 1;
95 }
96
97 #[must_use]
99 pub fn dropped(&self) -> u64 {
100 self.dropped
101 }
102
103 #[must_use]
105 pub fn strategy(&self) -> BackpressureStrategy {
106 self.strategy
107 }
108
109 #[must_use]
111 pub fn lag_warning_threshold(&self) -> u64 {
112 self.lag_warning_threshold
113 }
114
115 #[must_use]
117 pub fn is_lagging(&self, lag: u64) -> bool {
118 lag >= self.lag_warning_threshold
119 }
120}
121
/// Consuming side of a demand-based backpressure pair.
///
/// Created together with a [`DemandHandle`] by [`DemandBackpressure::new`].
/// Both share one atomic counter of outstanding demand: handles add to it
/// and [`DemandBackpressure::try_consume`] takes one unit at a time.
pub struct DemandBackpressure {
    /// Outstanding demand, shared with every `DemandHandle` clone.
    pending: Arc<AtomicU64>,
}

/// Requesting side of a demand-based backpressure pair.
///
/// Cloning yields another handle onto the same shared counter.
#[derive(Clone)]
pub struct DemandHandle {
    /// Outstanding demand, shared with the paired `DemandBackpressure`.
    pending: Arc<AtomicU64>,
}

impl DemandBackpressure {
    /// Creates a linked pair whose shared demand counter starts at zero.
    #[must_use]
    pub fn new() -> (Self, DemandHandle) {
        let pending = Arc::new(AtomicU64::new(0));
        let handle = DemandHandle {
            pending: Arc::clone(&pending),
        };
        (Self { pending }, handle)
    }

    /// Atomically takes one unit of demand.
    ///
    /// Returns `true` if the counter was non-zero and has been decremented,
    /// or `false` (leaving it untouched) when no demand is outstanding.
    #[inline]
    #[must_use]
    pub fn try_consume(&self) -> bool {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // stop without writing; otherwise it retries the CAS until the
        // decrement is applied.
        self.pending
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current.checked_sub(1)
            })
            .is_ok()
    }

    /// Returns the demand currently outstanding.
    #[must_use]
    pub fn pending(&self) -> u64 {
        self.pending.load(Ordering::Acquire)
    }
}

impl DemandHandle {
    /// Adds `n` units of demand.
    ///
    /// The counter saturates at `u64::MAX` instead of wrapping. A wrapping
    /// add would turn a very large outstanding demand (for example
    /// `request(u64::MAX)` followed by any further request) into a tiny one
    /// and silently discard the rest.
    pub fn request(&self, n: u64) {
        // The closure always returns `Some`, so the update cannot fail and
        // the returned previous value is not needed.
        let _ = self
            .pending
            .fetch_update(Ordering::Release, Ordering::Relaxed, |current| {
                Some(current.saturating_add(n))
            });
    }

    /// Returns the demand currently outstanding.
    #[must_use]
    pub fn pending(&self) -> u64 {
        self.pending.load(Ordering::Acquire)
    }
}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// `DropOldest` never filters in the controller, so all events pass.
    #[test]
    fn test_backpressure_drop_oldest() {
        let mut controller = BackpressureController::new(BackpressureStrategy::DropOldest);
        let delivered = (0..100).filter(|_| controller.should_deliver()).count();
        assert_eq!(delivered, 100);
        assert_eq!(controller.dropped(), 0);
    }

    /// `DropNewest` delivers; drops are only counted when recorded explicitly.
    #[test]
    fn test_backpressure_drop_newest() {
        let mut controller = BackpressureController::new(BackpressureStrategy::DropNewest);
        assert!(controller.should_deliver());
        for _ in 0..2 {
            controller.record_drop();
        }
        assert_eq!(controller.dropped(), 2);
    }

    /// `Sample(3)` lets every third event through and counts the rest as dropped.
    #[test]
    fn test_backpressure_sample() {
        let mut controller = BackpressureController::new(BackpressureStrategy::Sample(3));
        let delivered: Vec<i32> = (0..12).filter(|_| controller.should_deliver()).collect();
        assert_eq!(delivered, [2, 5, 8, 11]);
        assert_eq!(controller.dropped(), 8);
    }

    /// The dropped counter starts at zero and grows by one per recorded drop.
    #[test]
    fn test_backpressure_controller_dropped_count() {
        let mut controller = BackpressureController::new(BackpressureStrategy::DropNewest);
        assert_eq!(controller.dropped(), 0);
        for expected in 1..=3 {
            controller.record_drop();
            assert_eq!(controller.dropped(), expected);
        }
    }

    /// The accessor returns the strategy passed to the constructor.
    #[test]
    fn test_backpressure_strategy_accessor() {
        let controller = BackpressureController::new(BackpressureStrategy::Block);
        assert_eq!(controller.strategy(), BackpressureStrategy::Block);
    }

    /// Lag is reported at and above the threshold, never below it.
    #[test]
    fn test_lagging_subscriber_detection() {
        let controller =
            BackpressureController::with_lag_threshold(BackpressureStrategy::DropOldest, 500);
        assert_eq!(controller.lag_warning_threshold(), 500);
        for (lag, expected) in [(499, false), (500, true), (1000, true)] {
            assert_eq!(controller.is_lagging(lag), expected, "lag = {lag}");
        }
    }

    /// Requested demand can be consumed exactly once per unit.
    #[test]
    fn test_backpressure_demand_request() {
        let (demand, handle) = DemandBackpressure::new();
        assert_eq!(demand.pending(), 0);

        handle.request(5);
        assert_eq!(demand.pending(), 5);

        let consumed = (0..5).filter(|_| demand.try_consume()).count();
        assert_eq!(consumed, 5);
        assert!(!demand.try_consume());
        assert_eq!(demand.pending(), 0);
    }

    /// With no demand requested, consumption always fails.
    #[test]
    fn test_backpressure_demand_zero() {
        let (demand, _handle) = DemandBackpressure::new();
        for _ in 0..2 {
            assert!(!demand.try_consume());
        }
    }

    /// One thread requests demand while another consumes all of it.
    #[test]
    fn test_backpressure_demand_concurrent() {
        const BATCHES: u64 = 100;
        const BATCH_SIZE: u64 = 100;
        const TOTAL: u64 = BATCHES * BATCH_SIZE;

        let (demand, handle) = DemandBackpressure::new();
        let demand = Arc::new(demand);

        let requester = std::thread::spawn(move || {
            for _ in 0..BATCHES {
                handle.request(BATCH_SIZE);
            }
        });

        let consumer = {
            let demand = Arc::clone(&demand);
            std::thread::spawn(move || {
                let mut consumed = 0u64;
                while consumed < TOTAL {
                    if demand.try_consume() {
                        consumed += 1;
                    }
                    std::thread::yield_now();
                }
                consumed
            })
        };

        requester.join().unwrap();
        let total_consumed = consumer.join().unwrap();
        assert_eq!(total_consumed, TOTAL);
        assert_eq!(demand.pending(), 0);
    }

    /// Cloned handles feed the same shared counter.
    #[test]
    fn test_demand_handle_clone() {
        let (demand, first) = DemandBackpressure::new();
        let second = first.clone();
        first.request(3);
        second.request(2);
        assert_eq!(demand.pending(), 5);
    }
}