solana_perf/
data_budget.rs1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2
3#[derive(Default)]
4pub struct DataBudget {
5 bytes: AtomicUsize,
7 last_timestamp_ms: AtomicU64,
10}
11
12impl DataBudget {
13 pub fn restricted() -> Self {
15 Self {
16 bytes: AtomicUsize::default(),
17 last_timestamp_ms: AtomicU64::new(u64::MAX),
18 }
19 }
20
21 #[must_use]
24 pub fn take(&self, size: usize) -> bool {
25 let mut budget = self.bytes.load(Ordering::Acquire);
26 loop {
27 if budget < size {
28 return false;
29 }
30 match self.bytes.compare_exchange_weak(
31 budget,
32 budget.saturating_sub(size),
33 Ordering::AcqRel,
34 Ordering::Acquire,
35 ) {
36 Ok(_) => return true,
37 Err(bytes) => budget = bytes,
38 }
39 }
40 }
41
42 fn can_update(&self, duration_millis: u64) -> bool {
45 let now = solana_sdk::timing::timestamp();
46 let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire);
47 loop {
48 if now < last_timestamp.saturating_add(duration_millis) {
49 return false;
50 }
51 match self.last_timestamp_ms.compare_exchange_weak(
52 last_timestamp,
53 now,
54 Ordering::AcqRel,
55 Ordering::Acquire,
56 ) {
57 Ok(_) => return true,
58 Err(ts) => last_timestamp = ts,
59 }
60 }
61 }
62
63 pub fn update<F>(&self, duration_millis: u64, updater: F) -> usize
67 where
68 F: Fn(usize) -> usize,
69 {
70 if self.can_update(duration_millis) {
71 let mut bytes = self.bytes.load(Ordering::Acquire);
72 loop {
73 match self.bytes.compare_exchange_weak(
74 bytes,
75 updater(bytes),
76 Ordering::AcqRel,
77 Ordering::Acquire,
78 ) {
79 Ok(_) => break,
80 Err(b) => bytes = b,
81 }
82 }
83 }
84 self.bytes.load(Ordering::Acquire)
85 }
86
87 pub fn clone_non_atomic(&self) -> Self {
89 Self {
90 bytes: AtomicUsize::new(self.bytes.load(Ordering::Acquire)),
91 last_timestamp_ms: AtomicU64::new(self.last_timestamp_ms.load(Ordering::Acquire)),
92 }
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use {super::*, std::time::Duration};
99
100 #[test]
101 fn test_data_budget() {
102 let budget = DataBudget::default();
103 assert!(!budget.take(1)); assert_eq!(budget.update(1000, |bytes| bytes + 5), 5); assert!(budget.take(1));
107 assert!(budget.take(2));
108 assert!(!budget.take(3)); assert_eq!(budget.update(30, |_| 10), 2); assert!(!budget.take(3)); std::thread::sleep(Duration::from_millis(50));
114 assert_eq!(budget.update(30, |bytes| bytes * 2), 4); assert!(budget.take(3));
117 assert!(budget.take(1));
118 assert!(!budget.take(1)); }
120}