nodedb_cluster/mirror/throttle.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Send-rate throttle for cross-cluster mirror observers.
4//!
5//! The source side tracks bytes-in-flight for every connected mirror observer.
//! When the in-flight byte count reaches [`SendThrottle::cap`] the source
//! stops pushing new snapshot chunks or log entries for that mirror until the
//! observer drains to or below [`SendThrottle::resume_threshold`].
9//!
10//! # Design
11//!
12//! The throttle is intentionally simple: a per-mirror `AtomicU64` counter.
13//! The source increments it before writing a chunk over the QUIC stream and
14//! decrements it when the observer's ack arrives. If the ack is never
15//! received (e.g. the observer crashes mid-stream) the counter stays high
16//! until the link is torn down, at which point [`SendThrottle::reset`] brings
17//! it back to zero.
18//!
19//! This matches the `ObserverState::MAX_PENDING` entry-count cap in
20//! `nodedb-raft` but operates at the byte level so large snapshot chunks do
21//! not slip through.
22
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
/// Default bytes-in-flight cap per mirror observer: 64 MiB.
pub const DEFAULT_CAP_BYTES: u64 = 64 * 1024 * 1024;

/// Resume threshold as a fraction of the cap (resume at or below 75% of cap).
const RESUME_FRACTION: f64 = 0.75;

/// Per-mirror send throttle shared between the sender task and the ack handler.
///
/// Clone-cheap: clones share the same `Arc<AtomicU64>` counter, so a charge
/// recorded through one clone is observed through every other clone.
#[derive(Debug, Clone)]
pub struct SendThrottle {
    /// Current bytes in flight to this mirror (acknowledged bytes have been
    /// decremented).
    in_flight: Arc<AtomicU64>,
    /// Maximum bytes in flight before the source pauses sends.
    cap: u64,
    /// Threshold at or below which sends resume after being paused.
    resume_threshold: u64,
}

impl SendThrottle {
    /// Create a new throttle with the given cap.
    ///
    /// The resume threshold is `cap * 0.75`, rounded down.
    pub fn new(cap: u64) -> Self {
        let resume_threshold = (cap as f64 * RESUME_FRACTION) as u64;
        Self {
            in_flight: Arc::new(AtomicU64::new(0)),
            cap,
            resume_threshold,
        }
    }

    /// Create a throttle with [`DEFAULT_CAP_BYTES`].
    pub fn default_cap() -> Self {
        Self::new(DEFAULT_CAP_BYTES)
    }

    /// Current bytes in flight (approximate, relaxed ordering).
    pub fn in_flight(&self) -> u64 {
        self.in_flight.load(Ordering::Relaxed)
    }

    /// Whether the source should currently send to this mirror.
    ///
    /// Returns `true` when `in_flight < cap`; sending stops as soon as the
    /// in-flight count *reaches* the cap. The answer is a point-in-time
    /// snapshot: other clones may charge or ack concurrently.
    pub fn can_send(&self) -> bool {
        self.in_flight.load(Ordering::Acquire) < self.cap
    }

    /// Whether the source may resume after being throttled.
    ///
    /// Returns `true` when `in_flight <= resume_threshold` (inclusive).
    pub fn can_resume(&self) -> bool {
        self.in_flight.load(Ordering::Acquire) <= self.resume_threshold
    }

    /// Record that `bytes` have been dispatched toward the mirror.
    ///
    /// Call this before writing a chunk to the QUIC stream.
    pub fn charge(&self, bytes: u64) {
        self.in_flight.fetch_add(bytes, Ordering::AcqRel);
    }

    /// Record that `bytes` have been acknowledged by the mirror.
    ///
    /// Saturates at zero to guard against double-acks.
    pub fn ack(&self, bytes: u64) {
        // A separate load + store would race with a concurrent `charge` from
        // the sender task: bytes added between the two would be overwritten
        // and silently lost. `fetch_update` runs the saturating subtraction as
        // a compare-and-swap loop, so concurrent updates are never dropped.
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_sub(bytes))
            });
    }

    /// Reset the counter to zero (called on link teardown).
    pub fn reset(&self) {
        self.in_flight.store(0, Ordering::Release);
    }

    /// The configured cap in bytes.
    pub fn cap(&self) -> u64 {
        self.cap
    }

    /// The resume threshold in bytes: sends may resume once the in-flight
    /// count is at or below this value.
    pub fn resume_threshold(&self) -> u64 {
        self.resume_threshold
    }
}
109
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn throttle_charge_and_ack() {
        let t = SendThrottle::new(100);
        assert!(t.can_send());
        t.charge(80);
        assert!(t.can_send());
        t.charge(30);
        // 110 >= cap 100 → cannot send
        assert!(!t.can_send());
        t.ack(30);
        // 80 > resume threshold 75 → still throttled
        assert!(!t.can_resume());
        t.ack(10);
        // 70 <= 75 → can resume
        assert!(t.can_resume());
    }

    #[test]
    fn cap_is_exclusive_and_resume_threshold_is_inclusive() {
        let t = SendThrottle::new(100);
        t.charge(100);
        // Exactly at the cap: `can_send` requires strictly below the cap.
        assert!(!t.can_send());
        t.ack(25);
        // Exactly at the resume threshold (75): resuming is allowed.
        assert!(t.can_resume());
        assert!(t.can_send());
    }

    #[test]
    fn throttle_reset_zeroes_counter() {
        let t = SendThrottle::new(100);
        t.charge(99);
        t.reset();
        assert_eq!(t.in_flight(), 0);
        assert!(t.can_send());
    }

    #[test]
    fn ack_does_not_underflow() {
        let t = SendThrottle::new(100);
        t.charge(10);
        t.ack(50); // ack more than in flight
        assert_eq!(t.in_flight(), 0);
    }

    #[test]
    fn clone_shares_state() {
        let t = SendThrottle::new(100);
        let t2 = t.clone();
        t.charge(60);
        assert_eq!(t2.in_flight(), 60);
    }

    #[test]
    fn default_cap_uses_default_constant() {
        let t = SendThrottle::default_cap();
        assert_eq!(t.cap(), DEFAULT_CAP_BYTES);
        assert_eq!(t.in_flight(), 0);
        assert!(t.can_send());
    }
}