// nodedb_cluster/mirror/throttle.rs
// SPDX-License-Identifier: BUSL-1.1

//! Send-rate throttle for cross-cluster mirror observers.
//!
//! The source side tracks bytes-in-flight for every connected mirror observer.
//! When the in-flight byte count exceeds [`SendThrottle::cap`] the source
//! stops pushing new snapshot chunks or log entries for that mirror until the
//! observer drains below [`SendThrottle::resume_threshold`].
//!
//! # Design
//!
//! The throttle is intentionally simple: a per-mirror `AtomicU64` counter.
//! The source increments it before writing a chunk over the QUIC stream and
//! decrements it when the observer's ack arrives.  If the ack is never
//! received (e.g. the observer crashes mid-stream) the counter stays high
//! until the link is torn down, at which point [`SendThrottle::reset`] brings
//! it back to zero.
//!
//! This matches the `ObserverState::MAX_PENDING` entry-count cap in
//! `nodedb-raft` but operates at the byte level so large snapshot chunks do
//! not slip through.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Default bytes-in-flight cap per mirror observer: 64 MiB.
pub const DEFAULT_CAP_BYTES: u64 = 64 * 1024 * 1024;

/// Resume threshold as a fraction of the cap (resume when below 75% of cap).
const RESUME_FRACTION: f64 = 0.75;

/// Per-mirror send throttle shared between the sender task and the ack handler.
///
/// Clone-cheap: backed by `Arc<AtomicU64>`.
#[derive(Debug, Clone)]
pub struct SendThrottle {
    /// Current bytes in flight to this mirror (acknowledged bytes have been
    /// decremented).
    in_flight: Arc<AtomicU64>,
    /// Maximum bytes in flight before the source pauses sends.
    cap: u64,
    /// Threshold below which sends resume after being paused.
    resume_threshold: u64,
}

impl SendThrottle {
    /// Create a new throttle with the given cap.
    ///
    /// The resume threshold is derived as [`RESUME_FRACTION`] of `cap`.
    pub fn new(cap: u64) -> Self {
        let resume_threshold = (cap as f64 * RESUME_FRACTION) as u64;
        Self {
            in_flight: Arc::new(AtomicU64::new(0)),
            cap,
            resume_threshold,
        }
    }

    /// Create a throttle with [`DEFAULT_CAP_BYTES`].
    pub fn default_cap() -> Self {
        Self::new(DEFAULT_CAP_BYTES)
    }

    /// Current bytes in flight (approximate, relaxed ordering).
    pub fn in_flight(&self) -> u64 {
        self.in_flight.load(Ordering::Relaxed)
    }

    /// Whether the source should currently send to this mirror.
    ///
    /// Returns `true` when `in_flight < cap`.
    pub fn can_send(&self) -> bool {
        self.in_flight.load(Ordering::Acquire) < self.cap
    }

    /// Whether the source may resume after being throttled.
    ///
    /// Returns `true` when `in_flight <= resume_threshold`.
    pub fn can_resume(&self) -> bool {
        self.in_flight.load(Ordering::Acquire) <= self.resume_threshold
    }

    /// Record that `bytes` have been dispatched toward the mirror.
    ///
    /// Call this before writing a chunk to the QUIC stream.
    pub fn charge(&self, bytes: u64) {
        self.in_flight.fetch_add(bytes, Ordering::AcqRel);
    }

    /// Record that `bytes` have been acknowledged by the mirror.
    ///
    /// Saturates at zero to guard against double-acks.
    pub fn ack(&self, bytes: u64) {
        // A plain load-then-store here would race with a concurrent
        // `charge` from the sender task: a charge landing between the load
        // and the store would be silently overwritten, under-counting bytes
        // in flight and defeating the throttle.  `fetch_update` performs a
        // CAS retry loop so every concurrent update is preserved, and
        // `saturating_sub` keeps double-acks from wrapping below zero.
        let _ = self
            .in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
                Some(cur.saturating_sub(bytes))
            });
    }

    /// Reset the counter to zero (called on link teardown).
    pub fn reset(&self) {
        self.in_flight.store(0, Ordering::Release);
    }

    /// The configured cap in bytes.
    pub fn cap(&self) -> u64 {
        self.cap
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn throttle_charge_and_ack() {
        let throttle = SendThrottle::new(100);
        assert!(throttle.can_send());

        // Fill past the cap: 80 + 30 = 110 >= 100, so sends must pause.
        throttle.charge(80);
        assert!(throttle.can_send());
        throttle.charge(30);
        assert!(!throttle.can_send());

        // Draining to 80 is still above the 75-byte resume threshold...
        throttle.ack(30);
        assert!(!throttle.can_resume());
        // ...but draining to 70 drops below it and allows sends again.
        throttle.ack(10);
        assert!(throttle.can_resume());
    }

    #[test]
    fn throttle_reset_zeroes_counter() {
        let throttle = SendThrottle::new(100);
        throttle.charge(99);
        throttle.reset();
        assert_eq!(throttle.in_flight(), 0);
        assert!(throttle.can_send());
    }

    #[test]
    fn ack_does_not_underflow() {
        let throttle = SendThrottle::new(100);
        throttle.charge(10);
        // Acknowledging more than is outstanding must saturate at zero.
        throttle.ack(50);
        assert_eq!(throttle.in_flight(), 0);
    }

    #[test]
    fn clone_shares_state() {
        let original = SendThrottle::new(100);
        let shared = original.clone();
        original.charge(60);
        assert_eq!(shared.in_flight(), 60);
    }
}
155}