solana_perf/
data_budget.rs

1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2
3#[derive(Default)]
4pub struct DataBudget {
5    // Amount of bytes we have in the budget to send.
6    bytes: AtomicUsize,
7    // Last time that we upped the bytes count, used
8    // to detect when to up the bytes budget again
9    last_timestamp_ms: AtomicU64,
10}
11
12impl DataBudget {
13    /// Create a data budget with max bytes, used for tests
14    pub fn restricted() -> Self {
15        Self {
16            bytes: AtomicUsize::default(),
17            last_timestamp_ms: AtomicU64::new(u64::MAX),
18        }
19    }
20
21    // If there are enough bytes in the budget, consumes from
22    // the budget and returns true. Otherwise returns false.
23    #[must_use]
24    pub fn take(&self, size: usize) -> bool {
25        let mut budget = self.bytes.load(Ordering::Acquire);
26        loop {
27            if budget < size {
28                return false;
29            }
30            match self.bytes.compare_exchange_weak(
31                budget,
32                budget.saturating_sub(size),
33                Ordering::AcqRel,
34                Ordering::Acquire,
35            ) {
36                Ok(_) => return true,
37                Err(bytes) => budget = bytes,
38            }
39        }
40    }
41
42    // Updates timestamp and returns true, if at least given milliseconds
43    // has passed since last update. Otherwise returns false.
44    fn can_update(&self, duration_millis: u64) -> bool {
45        let now = solana_sdk::timing::timestamp();
46        let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire);
47        loop {
48            if now < last_timestamp.saturating_add(duration_millis) {
49                return false;
50            }
51            match self.last_timestamp_ms.compare_exchange_weak(
52                last_timestamp,
53                now,
54                Ordering::AcqRel,
55                Ordering::Acquire,
56            ) {
57                Ok(_) => return true,
58                Err(ts) => last_timestamp = ts,
59            }
60        }
61    }
62
63    /// Updates the budget if at least given milliseconds has passed since last
64    /// update. Updater function maps current value of bytes to the new one.
65    /// Returns current data-budget after the update.
66    pub fn update<F>(&self, duration_millis: u64, updater: F) -> usize
67    where
68        F: Fn(usize) -> usize,
69    {
70        if self.can_update(duration_millis) {
71            let mut bytes = self.bytes.load(Ordering::Acquire);
72            loop {
73                match self.bytes.compare_exchange_weak(
74                    bytes,
75                    updater(bytes),
76                    Ordering::AcqRel,
77                    Ordering::Acquire,
78                ) {
79                    Ok(_) => break,
80                    Err(b) => bytes = b,
81                }
82            }
83        }
84        self.bytes.load(Ordering::Acquire)
85    }
86
87    // Non-atomic clone only for tests and simulations.
88    pub fn clone_non_atomic(&self) -> Self {
89        Self {
90            bytes: AtomicUsize::new(self.bytes.load(Ordering::Acquire)),
91            last_timestamp_ms: AtomicU64::new(self.last_timestamp_ms.load(Ordering::Acquire)),
92        }
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use {super::*, std::time::Duration};
99
100    #[test]
101    fn test_data_budget() {
102        let budget = DataBudget::default();
103        assert!(!budget.take(1)); // budget = 0.
104
105        assert_eq!(budget.update(1000, |bytes| bytes + 5), 5); // budget updates to 5.
106        assert!(budget.take(1));
107        assert!(budget.take(2));
108        assert!(!budget.take(3)); // budget = 2, out of budget.
109
110        assert_eq!(budget.update(30, |_| 10), 2); // no update, budget = 2.
111        assert!(!budget.take(3)); // budget = 2, out of budget.
112
113        std::thread::sleep(Duration::from_millis(50));
114        assert_eq!(budget.update(30, |bytes| bytes * 2), 4); // budget updates to 4.
115
116        assert!(budget.take(3));
117        assert!(budget.take(1));
118        assert!(!budget.take(1)); // budget = 0.
119    }
120}