// laminar_core/tpc/backpressure.rs

//! # Credit-Based Flow Control
//!
//! Implements credit-based backpressure similar to Apache Flink's network stack.
//!
//! ## How It Works
//!
//! ```text
//! ┌──────────┐                      ┌──────────┐
//! │  Sender  │                      │ Receiver │
//! │          │<── Credits (N=4) ────│          │
//! │          │                      │          │
//! │          │── Data + Backlog ───>│          │
//! │          │                      │          │
//! │          │<── Credits (N=2) ────│          │
//! └──────────┘                      └──────────┘
//! ```
//!
//! 1. Receiver grants initial credits (buffer slots) to sender
//! 2. Sender decrements credits when sending, includes backlog size
//! 3. Receiver processes data and returns credits based on capacity
//! 4. If sender has no credits, it must wait or apply overflow strategy
//!
//! ## Credit Types
//!
//! - **Exclusive credits**: Fixed per-sender, always available
//! - **Floating credits**: Shared pool, allocated based on backlog priority
//!
//! A `CreditGate` in this module draws both kinds from one combined budget
//! (`exclusive_credits + floating_credits`).
//!
//! This design prevents buffer overflow while maximizing throughput.
29
// `BackpressureConfigBuilder::build` caps each credit pool at 65535 (`u16::MAX`),
// which keeps the `usize` -> `i64`/`f64` conversions below exact.
// A config written as a struct literal bypasses that cap.
32
33use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
34use std::sync::Arc;
35
36/// Configuration for backpressure handling.
37#[derive(Debug, Clone)]
38pub struct BackpressureConfig {
39    /// Initial exclusive credits per sender (like Flink's buffers-per-channel).
40    /// These credits are always reserved for a specific sender.
41    pub exclusive_credits: usize,
42
43    /// Floating credits shared across all senders (like Flink's floating-buffers-per-gate).
44    /// Allocated dynamically based on backlog priority.
45    pub floating_credits: usize,
46
47    /// Strategy when credits are exhausted.
48    pub overflow_strategy: OverflowStrategy,
49
50    /// High watermark - when queue reaches this %, start throttling.
51    /// Value between 0.0 and 1.0.
52    pub high_watermark: f64,
53
54    /// Low watermark - when queue drops below this %, resume normal flow.
55    /// Value between 0.0 and 1.0.
56    pub low_watermark: f64,
57}
58
59impl Default for BackpressureConfig {
60    fn default() -> Self {
61        Self {
62            exclusive_credits: 4,
63            floating_credits: 8,
64            overflow_strategy: OverflowStrategy::Block,
65            high_watermark: 0.8,
66            low_watermark: 0.5,
67        }
68    }
69}
70
71impl BackpressureConfig {
72    /// Creates a new configuration builder.
73    #[must_use]
74    pub fn builder() -> BackpressureConfigBuilder {
75        BackpressureConfigBuilder::default()
76    }
77
78    /// Total credits available (exclusive + floating).
79    #[must_use]
80    pub fn total_credits(&self) -> usize {
81        self.exclusive_credits + self.floating_credits
82    }
83}
84
85/// Builder for `BackpressureConfig`.
86#[derive(Debug, Default)]
87pub struct BackpressureConfigBuilder {
88    exclusive_credits: Option<usize>,
89    floating_credits: Option<usize>,
90    overflow_strategy: Option<OverflowStrategy>,
91    high_watermark: Option<f64>,
92    low_watermark: Option<f64>,
93}
94
95impl BackpressureConfigBuilder {
96    /// Sets exclusive credits per sender.
97    #[must_use]
98    pub fn exclusive_credits(mut self, credits: usize) -> Self {
99        self.exclusive_credits = Some(credits);
100        self
101    }
102
103    /// Sets floating credits (shared pool).
104    #[must_use]
105    pub fn floating_credits(mut self, credits: usize) -> Self {
106        self.floating_credits = Some(credits);
107        self
108    }
109
110    /// Sets the overflow strategy.
111    #[must_use]
112    pub fn overflow_strategy(mut self, strategy: OverflowStrategy) -> Self {
113        self.overflow_strategy = Some(strategy);
114        self
115    }
116
117    /// Sets the high watermark (0.0 to 1.0).
118    #[must_use]
119    pub fn high_watermark(mut self, watermark: f64) -> Self {
120        self.high_watermark = Some(watermark.clamp(0.0, 1.0));
121        self
122    }
123
124    /// Sets the low watermark (0.0 to 1.0).
125    #[must_use]
126    pub fn low_watermark(mut self, watermark: f64) -> Self {
127        self.low_watermark = Some(watermark.clamp(0.0, 1.0));
128        self
129    }
130
131    /// Builds the configuration.
132    #[must_use]
133    pub fn build(self) -> BackpressureConfig {
134        BackpressureConfig {
135            exclusive_credits: self.exclusive_credits.unwrap_or(4).min(u16::MAX as usize),
136            floating_credits: self.floating_credits.unwrap_or(8).min(u16::MAX as usize),
137            overflow_strategy: self.overflow_strategy.unwrap_or(OverflowStrategy::Block),
138            high_watermark: self.high_watermark.unwrap_or(0.8),
139            low_watermark: self.low_watermark.unwrap_or(0.5),
140        }
141    }
142}
143
/// What a credit gate does with a request it cannot satisfy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowStrategy {
    /// The sender is expected to wait until credits come back.
    /// Suited to exactly-once pipelines where losing data is unacceptable.
    Block,

    /// The data is discarded and the drop is counted in the metrics.
    /// Suited to best-effort streams where latency outranks completeness.
    Drop,

    /// The failure is reported right away with nothing blocked or dropped;
    /// the caller decides how to proceed.
    Error,
}
159
/// Outcome of a credit acquisition attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreditAcquireResult {
    /// The requested credits were taken.
    Acquired,
    /// Not enough credits right now; the caller would have to wait.
    WouldBlock,
    /// Not enough credits and the overflow strategy is `Drop`.
    Dropped,
}
170
171/// Manages credits for a single receiver (core).
172///
173/// Thread-safe for use between sender and receiver threads.
174#[derive(Debug)]
175pub struct CreditGate {
176    /// Available credits (can go negative temporarily during high contention).
177    available: AtomicI64,
178    /// Maximum credits (exclusive + floating).
179    max_credits: usize,
180    /// Configuration.
181    config: BackpressureConfig,
182    /// Metrics.
183    metrics: CreditMetrics,
184}
185
186impl CreditGate {
187    /// Creates a new credit gate with the given configuration.
188    #[must_use]
189    pub fn new(config: BackpressureConfig) -> Self {
190        let max_credits = config.total_credits();
191        Self {
192            #[allow(clippy::cast_possible_wrap)] // Safe: bounded by u16::MAX in build()
193            available: AtomicI64::new(max_credits as i64),
194            max_credits,
195            config,
196            metrics: CreditMetrics::new(),
197        }
198    }
199
200    /// Attempts to acquire one credit.
201    ///
202    /// Returns the result based on the overflow strategy.
203    pub fn try_acquire(&self) -> CreditAcquireResult {
204        self.try_acquire_n(1)
205    }
206
207    /// Attempts to acquire N credits.
208    pub fn try_acquire_n(&self, n: usize) -> CreditAcquireResult {
209        let n = i64::try_from(n).unwrap_or(i64::MAX);
210
211        // Try to acquire credits atomically
212        let mut current = self.available.load(Ordering::Acquire);
213        loop {
214            if current < n {
215                // Not enough credits
216                self.metrics.record_blocked();
217                return match self.config.overflow_strategy {
218                    OverflowStrategy::Drop => {
219                        self.metrics.record_dropped(u64::try_from(n).unwrap_or(0));
220                        CreditAcquireResult::Dropped
221                    }
222                    // Both Block and Error return WouldBlock - caller handles differently
223                    OverflowStrategy::Block | OverflowStrategy::Error => {
224                        CreditAcquireResult::WouldBlock
225                    }
226                };
227            }
228
229            match self.available.compare_exchange_weak(
230                current,
231                current - n,
232                Ordering::AcqRel,
233                Ordering::Acquire,
234            ) {
235                Ok(_) => {
236                    self.metrics.record_acquired(u64::try_from(n).unwrap_or(0));
237                    return CreditAcquireResult::Acquired;
238                }
239                Err(actual) => current = actual,
240            }
241        }
242    }
243
244    /// Acquires credits, spinning until available.
245    ///
246    /// Only use when `OverflowStrategy::Block` is configured.
247    pub fn acquire_blocking(&self, n: usize) {
248        loop {
249            match self.try_acquire_n(n) {
250                // Success or dropped (shouldn't happen with Block strategy)
251                CreditAcquireResult::Acquired | CreditAcquireResult::Dropped => return,
252                CreditAcquireResult::WouldBlock => {
253                    // Spin with backoff
254                    std::hint::spin_loop();
255                }
256            }
257        }
258    }
259
260    /// Releases credits back to the pool.
261    ///
262    /// Called by receiver after processing data.
263    pub fn release(&self, n: usize) {
264        let n = i64::try_from(n).unwrap_or(i64::MAX);
265        let prev = self.available.fetch_add(n, Ordering::Release);
266
267        // Clamp to max (in case of over-release)
268        let new_val = prev + n;
269        if new_val > {
270            #[allow(clippy::cast_possible_wrap)]
271            let max = self.max_credits as i64;
272            max
273        } {
274            // Try to correct, but don't worry if it fails (another thread may have acquired)
275            let _ = self.available.compare_exchange(
276                new_val,
277                {
278                    #[allow(clippy::cast_possible_wrap)]
279                    let max = self.max_credits as i64;
280                    max
281                },
282                Ordering::AcqRel,
283                Ordering::Relaxed,
284            );
285        }
286
287        self.metrics.record_released(u64::try_from(n).unwrap_or(0));
288    }
289
290    /// Returns the number of available credits.
291    #[must_use]
292    pub fn available(&self) -> usize {
293        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
294        // Safe: load().max(0) is >= 0
295        let val = self.available.load(Ordering::Relaxed).max(0) as usize;
296        val
297    }
298
299    /// Returns the maximum credits.
300    #[must_use]
301    pub fn max_credits(&self) -> usize {
302        self.max_credits
303    }
304
305    /// Returns true if backpressure is active (credits below threshold).
306    #[must_use]
307    pub fn is_backpressured(&self) -> bool {
308        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio
309        let available = self.available() as f64;
310        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio calculation
311        let max = self.max_credits as f64;
312        (available / max) < (1.0 - self.config.high_watermark)
313    }
314
315    /// Returns true if backpressure has cleared (credits above low watermark).
316    #[must_use]
317    pub fn is_recovered(&self) -> bool {
318        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio
319        let available = self.available() as f64;
320        #[allow(clippy::cast_precision_loss)] // Acceptable for ratio calculation
321        let max = self.max_credits as f64;
322        (available / max) >= (1.0 - self.config.low_watermark)
323    }
324
325    /// Returns the configuration.
326    #[must_use]
327    pub fn config(&self) -> &BackpressureConfig {
328        &self.config
329    }
330
331    /// Returns the metrics.
332    #[must_use]
333    pub fn metrics(&self) -> &CreditMetrics {
334        &self.metrics
335    }
336
337    /// Resets the gate to initial state.
338    pub fn reset(&self) {
339        #[allow(clippy::cast_possible_wrap)] // Safe: max_credits bounded to u16::MAX
340        self.available
341            .store(self.max_credits as i64, Ordering::Release);
342        self.metrics.reset();
343    }
344}
345
/// Activity counters for credit-based flow control.
#[derive(Debug)]
pub struct CreditMetrics {
    /// Running total of credits handed out.
    credits_acquired: AtomicU64,
    /// Running total of credits given back.
    credits_released: AtomicU64,
    /// Number of acquisition attempts that found too few credits.
    times_blocked: AtomicU64,
    /// Number of items discarded because of overflow.
    items_dropped: AtomicU64,
}
358
359impl CreditMetrics {
360    /// Creates new metrics.
361    fn new() -> Self {
362        Self {
363            credits_acquired: AtomicU64::new(0),
364            credits_released: AtomicU64::new(0),
365            times_blocked: AtomicU64::new(0),
366            items_dropped: AtomicU64::new(0),
367        }
368    }
369
370    fn record_acquired(&self, n: u64) {
371        self.credits_acquired.fetch_add(n, Ordering::Relaxed);
372    }
373
374    fn record_released(&self, n: u64) {
375        self.credits_released.fetch_add(n, Ordering::Relaxed);
376    }
377
378    fn record_blocked(&self) {
379        self.times_blocked.fetch_add(1, Ordering::Relaxed);
380    }
381
382    fn record_dropped(&self, n: u64) {
383        self.items_dropped.fetch_add(n, Ordering::Relaxed);
384    }
385
386    /// Resets all metrics.
387    fn reset(&self) {
388        self.credits_acquired.store(0, Ordering::Relaxed);
389        self.credits_released.store(0, Ordering::Relaxed);
390        self.times_blocked.store(0, Ordering::Relaxed);
391        self.items_dropped.store(0, Ordering::Relaxed);
392    }
393
394    /// Returns total credits acquired.
395    #[must_use]
396    pub fn credits_acquired(&self) -> u64 {
397        self.credits_acquired.load(Ordering::Relaxed)
398    }
399
400    /// Returns total credits released.
401    #[must_use]
402    pub fn credits_released(&self) -> u64 {
403        self.credits_released.load(Ordering::Relaxed)
404    }
405
406    /// Returns times sender was blocked.
407    #[must_use]
408    pub fn times_blocked(&self) -> u64 {
409        self.times_blocked.load(Ordering::Relaxed)
410    }
411
412    /// Returns items dropped due to overflow.
413    #[must_use]
414    pub fn items_dropped(&self) -> u64 {
415        self.items_dropped.load(Ordering::Relaxed)
416    }
417
418    /// Returns a snapshot of all metrics.
419    #[must_use]
420    pub fn snapshot(&self) -> CreditMetricsSnapshot {
421        CreditMetricsSnapshot {
422            credits_acquired: self.credits_acquired(),
423            credits_released: self.credits_released(),
424            times_blocked: self.times_blocked(),
425            items_dropped: self.items_dropped(),
426        }
427    }
428}
429
/// Plain-data copy of the credit counters, suitable for reporting.
#[derive(Debug, Clone, Copy)]
pub struct CreditMetricsSnapshot {
    /// Credits handed out up to the time of the snapshot.
    pub credits_acquired: u64,
    /// Credits given back up to the time of the snapshot.
    pub credits_released: u64,
    /// Acquisition attempts that found too few credits.
    pub times_blocked: u64,
    /// Items discarded because of overflow.
    pub items_dropped: u64,
}
442
443impl CreditMetricsSnapshot {
444    /// Returns credits currently in flight (acquired - released).
445    #[allow(clippy::cast_possible_wrap)] // Metrics are u64, difference fits in i64 unless skewed
446    #[must_use]
447    pub fn credits_in_flight(&self) -> i64 {
448        (self.credits_acquired as i64).saturating_sub(self.credits_released as i64)
449    }
450}
451
452/// Manages credit flow between a sender and receiver.
453///
454/// This is a convenience wrapper that combines a `CreditGate` with
455/// additional sender-side state like backlog tracking.
456#[derive(Debug)]
457pub struct CreditChannel {
458    /// The credit gate (shared with receiver).
459    gate: Arc<CreditGate>,
460    /// Current backlog at sender (items waiting to be sent).
461    /// Wrapped in `Arc` for safe sharing with `CreditSender`.
462    backlog: Arc<AtomicU64>,
463}
464
465impl CreditChannel {
466    /// Creates a new credit channel.
467    #[must_use]
468    pub fn new(config: BackpressureConfig) -> Self {
469        Self {
470            gate: Arc::new(CreditGate::new(config)),
471            backlog: Arc::new(AtomicU64::new(0)),
472        }
473    }
474
475    /// Returns a handle for the sender side.
476    #[must_use]
477    pub fn sender(&self) -> CreditSender {
478        CreditSender {
479            gate: Arc::clone(&self.gate),
480            backlog: Arc::clone(&self.backlog),
481        }
482    }
483
484    /// Returns a handle for the receiver side.
485    #[must_use]
486    pub fn receiver(&self) -> CreditReceiver {
487        CreditReceiver {
488            gate: Arc::clone(&self.gate),
489        }
490    }
491
492    /// Returns the credit gate.
493    #[must_use]
494    pub fn gate(&self) -> &CreditGate {
495        &self.gate
496    }
497}
498
499/// Sender-side handle for credit flow control.
500#[derive(Debug, Clone)]
501pub struct CreditSender {
502    gate: Arc<CreditGate>,
503    /// Shared backlog counter with the channel.
504    backlog: Arc<AtomicU64>,
505}
506
507impl CreditSender {
508    /// Attempts to send (acquire credit).
509    #[must_use]
510    pub fn try_send(&self) -> CreditAcquireResult {
511        self.gate.try_acquire()
512    }
513
514    /// Sends with blocking if needed.
515    pub fn send_blocking(&self) {
516        self.gate.acquire_blocking(1);
517    }
518
519    /// Reports backlog size to receiver.
520    pub fn set_backlog(&self, size: u64) {
521        self.backlog.store(size, Ordering::Relaxed);
522    }
523
524    /// Returns current backlog.
525    #[must_use]
526    pub fn backlog(&self) -> u64 {
527        self.backlog.load(Ordering::Relaxed)
528    }
529
530    /// Returns available credits.
531    #[must_use]
532    pub fn available_credits(&self) -> usize {
533        self.gate.available()
534    }
535
536    /// Returns true if backpressured.
537    #[must_use]
538    pub fn is_backpressured(&self) -> bool {
539        self.gate.is_backpressured()
540    }
541}
542
543/// Receiver-side handle for credit flow control.
544#[derive(Debug, Clone)]
545pub struct CreditReceiver {
546    gate: Arc<CreditGate>,
547}
548
549impl CreditReceiver {
550    /// Releases credits after processing.
551    pub fn release(&self, n: usize) {
552        self.gate.release(n);
553    }
554
555    /// Returns available credits.
556    #[must_use]
557    pub fn available_credits(&self) -> usize {
558        self.gate.available()
559    }
560
561    /// Returns true if recovered from backpressure.
562    #[must_use]
563    pub fn is_recovered(&self) -> bool {
564        self.gate.is_recovered()
565    }
566
567    /// Returns metrics.
568    #[must_use]
569    pub fn metrics(&self) -> &CreditMetrics {
570        self.gate.metrics()
571    }
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a gate from the given pool sizes and overflow strategy, taking
    /// the watermarks from `BackpressureConfig::default()`.
    fn gate_with(exclusive: usize, floating: usize, strategy: OverflowStrategy) -> CreditGate {
        CreditGate::new(BackpressureConfig {
            exclusive_credits: exclusive,
            floating_credits: floating,
            overflow_strategy: strategy,
            ..Default::default()
        })
    }

    #[test]
    fn test_credit_gate_basic() {
        let gate = gate_with(2, 2, OverflowStrategy::Block);

        assert_eq!(gate.available(), 4);
        assert_eq!(gate.max_credits(), 4);

        // Drain the gate one credit at a time.
        for _ in 0..4 {
            assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        }
        assert_eq!(gate.available(), 0);

        // An empty gate refuses further requests.
        assert_eq!(gate.try_acquire(), CreditAcquireResult::WouldBlock);

        // Returning credits makes them usable again.
        gate.release(2);
        assert_eq!(gate.available(), 2);

        assert_eq!(gate.try_acquire(), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 1);
    }

    #[test]
    fn test_credit_gate_drop_strategy() {
        let gate = gate_with(1, 0, OverflowStrategy::Drop);

        let outcomes = [gate.try_acquire(), gate.try_acquire()];
        assert_eq!(outcomes[0], CreditAcquireResult::Acquired);
        assert_eq!(outcomes[1], CreditAcquireResult::Dropped);

        assert_eq!(gate.metrics().items_dropped(), 1);
    }

    #[test]
    fn test_credit_gate_batch_acquire() {
        let gate = gate_with(4, 4, OverflowStrategy::Block);

        assert_eq!(gate.try_acquire_n(5), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 3);

        // Four is one more than what is left; three fits exactly.
        assert_eq!(gate.try_acquire_n(4), CreditAcquireResult::WouldBlock);
        assert_eq!(gate.try_acquire_n(3), CreditAcquireResult::Acquired);
        assert_eq!(gate.available(), 0);
    }

    #[test]
    fn test_credit_channel() {
        let channel = CreditChannel::new(BackpressureConfig::default());
        let (tx, rx) = (channel.sender(), channel.receiver());

        // The sender spends a credit and reports its backlog.
        assert_eq!(tx.try_send(), CreditAcquireResult::Acquired);
        tx.set_backlog(10);
        assert_eq!(tx.backlog(), 10);

        // The receiver hands the credit back, refilling the gate.
        rx.release(1);
        assert_eq!(rx.available_credits(), channel.gate().max_credits());
    }

    #[test]
    fn test_backpressure_watermarks() {
        let gate = CreditGate::new(BackpressureConfig {
            exclusive_credits: 10,
            floating_credits: 0,
            high_watermark: 0.8, // Backpressure when <20% available
            low_watermark: 0.5,  // Recovered when >=50% available
            ..Default::default()
        });

        // A full gate is neither throttled nor waiting to recover.
        assert!(!gate.is_backpressured());
        assert!(gate.is_recovered());

        // Spend 9 of 10 credits: 10% free, below the 20% threshold.
        (0..9).for_each(|_| {
            let _ = gate.try_acquire();
        });
        assert!(gate.is_backpressured());
        assert!(!gate.is_recovered());

        // Give 4 back: 50% free, which meets the recovery threshold.
        gate.release(4);
        assert!(!gate.is_backpressured());
        assert!(gate.is_recovered());
    }

    #[test]
    fn test_metrics_snapshot() {
        let gate = CreditGate::new(BackpressureConfig::default());

        for _ in 0..2 {
            let _ = gate.try_acquire();
        }
        gate.release(1);

        let snap = gate.metrics().snapshot();
        assert_eq!(snap.credits_acquired, 2);
        assert_eq!(snap.credits_released, 1);
        assert_eq!(snap.credits_in_flight(), 1);
    }

    #[test]
    fn test_config_builder() {
        let built = BackpressureConfig::builder()
            .exclusive_credits(8)
            .floating_credits(16)
            .overflow_strategy(OverflowStrategy::Drop)
            .high_watermark(0.9)
            .low_watermark(0.6)
            .build();

        assert_eq!(built.exclusive_credits, 8);
        assert_eq!(built.floating_credits, 16);
        assert_eq!(built.overflow_strategy, OverflowStrategy::Drop);
        assert!((built.high_watermark - 0.9).abs() < f64::EPSILON);
        assert!((built.low_watermark - 0.6).abs() < f64::EPSILON);
        assert_eq!(built.total_credits(), 24);
    }

    #[test]
    fn test_concurrent_acquire_release() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(gate_with(100, 0, OverflowStrategy::Block));

        let acquiring = {
            let gate = Arc::clone(&shared);
            thread::spawn(move || {
                (0..1000)
                    .filter(|_| {
                        let got = gate.try_acquire() == CreditAcquireResult::Acquired;
                        // Simulate some work
                        std::hint::spin_loop();
                        got
                    })
                    .count()
            })
        };

        let releasing = {
            let gate = Arc::clone(&shared);
            thread::spawn(move || {
                let mut released = 0;
                while released < 500 {
                    gate.release(1);
                    released += 1;
                    std::hint::spin_loop();
                }
                released
            })
        };

        let acquired = acquiring.join().unwrap();
        let released = releasing.join().unwrap();

        // At least the initial credits are there to be taken.
        assert!(acquired > 0);
        assert_eq!(released, 500);
    }
}
768}