solana_perf/
recycler.rs

use {
    rand::{thread_rng, Rng},
    std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc, Mutex, Weak,
    },
};

// A temporary burst in the workload can cause a large number of allocations,
// after which they will be recycled and still reside in memory. If the number
// of recycled objects stays above the limit below for long, they will be deemed
// redundant since they are not getting reused. The recycler will then shrink
// by releasing objects above this threshold. This limit aims to maintain a
// cushion against *normal* variations in the workload while bounding the
// number of redundant garbage collected objects after temporary bursts.
const RECYCLER_SHRINK_SIZE: usize = 1024;

// Lookback window, in number of allocations, for the exponential moving
// average of the number of garbage collected objects. The half-life of the
// decaying factor implied by the window size below is 11356 allocations: a
// sample of gc.len() taken 11356 allocations ago carries half the weight of
// the most recent sample at the current allocation.
const RECYCLER_SHRINK_WINDOW: usize = 16384;
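// One way to arrive at that 11356 figure: each allocation scales the previous
// estimate by (n - 1) / n with n = RECYCLER_SHRINK_WINDOW, so the half-life h
// solves ((n - 1) / n)^h = 1/2, i.e. h = ln(2) / ln(n / (n - 1)) ≈ n * ln(2)
// ≈ 11356 allocations.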

#[derive(Debug, Default)]
struct RecyclerStats {
    total: AtomicUsize,
    reuse: AtomicUsize,
    freed: AtomicUsize,
    max_gc: AtomicUsize,
}

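// A cheaply cloneable handle to a shared pool of reusable `T`s; the pool
// itself lives in `RecyclerX` below.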
#[derive(Clone, Default)]
pub struct Recycler<T> {
    recycler: Arc<RecyclerX<T>>,
}

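// Shared state behind the `Arc` in `Recycler`: the pool of recycled objects
// (`gc`), counters, and the moving-average bookkeeping used to decide when to
// shrink the pool.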
#[derive(Debug)]
pub struct RecyclerX<T> {
    gc: Mutex<Vec<T>>,
    stats: RecyclerStats,
    id: usize,
    // Shrink window times the exponential moving average of gc.len().
    size_factor: AtomicUsize,
}

impl<T: Default> Default for RecyclerX<T> {
    fn default() -> RecyclerX<T> {
        let id = thread_rng().gen_range(0..1000);
        trace!("new recycler..{id}");
        RecyclerX {
            gc: Mutex::default(),
            stats: RecyclerStats::default(),
            id,
            size_factor: AtomicUsize::default(),
        }
    }
}

#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample
    for RecyclerX<crate::cuda_runtime::PinnedVec<solana_packet::Packet>>
{
    fn example() -> Self {
        Self::default()
    }
}

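// Implemented by types the recycler can manage. Roughly (inferred from the
// call sites in this file): `reset` clears an object before it is reused,
// `warm` pre-allocates towards `size_hint` during warm-up, and `set_recycler`
// attaches (or, with `Weak::default()`, detaches) the object's back-reference
// to its pool.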
pub trait Reset {
    fn reset(&mut self);
    fn warm(&mut self, size_hint: usize);
    fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
    where
        Self: std::marker::Sized;
}

static WARM_RECYCLERS: AtomicBool = AtomicBool::new(false);

pub fn enable_recycler_warming() {
    WARM_RECYCLERS.store(true, Ordering::Relaxed);
}

fn warm_recyclers() -> bool {
    WARM_RECYCLERS.load(Ordering::Relaxed)
}

impl<T: Default + Reset + Sized> Recycler<T> {
    pub fn warmed(num: usize, size_hint: usize) -> Self {
        let new = Self::default();
        if warm_recyclers() {
            let warmed_items: Vec<_> = (0..num)
                .map(|_| {
                    let mut item = new.allocate("warming");
                    item.warm(size_hint);
                    item
                })
                .collect();
            warmed_items
                .into_iter()
                .for_each(|i| new.recycler.recycle(i));
        }
        new
    }

    pub fn allocate(&self, name: &'static str) -> T {
        {
            const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
            let mut gc = self.recycler.gc.lock().unwrap();
            // Update the exponential moving average of gc.len().
            // The update equation is:
            //      a <- a * (n - 1) / n + x / n
            // To avoid floating point math, define b = n a:
            //      b <- b * (n - 1) / n + x
            // To make the remaining division round (instead of truncate),
            // add n/2 to the numerator.
            // Effectively b (size_factor here) is an exponential moving
            // estimate of the "sum" of x (gc.len()) over the window as opposed
            // to the "average".
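            // For illustration, with assumed steady-state numbers: if x =
            // gc.len() = 1024 on every allocation, b settles at
            // 1024 * 16384 = 16_777_216, since
            // (16_777_216 * 16383 + 8192) / 16384 + 1024 = 16_777_216.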
            self.recycler.size_factor.store(
                self.recycler
                    .size_factor
                    .load(Ordering::Acquire)
                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
                    .saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
                    .checked_div(RECYCLER_SHRINK_WINDOW)
                    .unwrap()
                    .saturating_add(gc.len()),
                Ordering::Release,
            );
            if let Some(mut x) = gc.pop() {
                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
                x.reset();
                return x;
            }
        }
        let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
        trace!(
            "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
            total,
            name,
            self.recycler.id,
            self.recycler.stats.reuse.load(Ordering::Relaxed),
            self.recycler.stats.max_gc.load(Ordering::Relaxed),
        );

        let mut t = T::default();
        t.set_recycler(Arc::downgrade(&self.recycler));
        t
    }
}

impl<T: Default + Reset> RecyclerX<T> {
    pub fn recycle(&self, x: T) {
        let len = {
            let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
            gc.push(x);
            const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
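            // Shrink only when the backlog currently exceeds
            // RECYCLER_SHRINK_SIZE *and* its moving average
            // (size_factor / RECYCLER_SHRINK_WINDOW) has stayed at or above
            // that threshold, i.e. the excess looks persistent rather than a
            // one-off burst.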
            if gc.len() > RECYCLER_SHRINK_SIZE
                && self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
            {
                self.stats.freed.fetch_add(
                    gc.len().saturating_sub(RECYCLER_SHRINK_SIZE),
                    Ordering::Relaxed,
                );
                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
                    x.set_recycler(Weak::default());
                }
                self.size_factor
                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
            }
            gc.len()
        };

        let max_gc = self.stats.max_gc.load(Ordering::Relaxed);
        if len > max_gc {
            // this is not completely accurate, but for most cases should be fine.
            let _ = self.stats.max_gc.compare_exchange(
                max_gc,
                len,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
        }
        let total = self.stats.total.load(Ordering::Relaxed);
        let reuse = self.stats.reuse.load(Ordering::Relaxed);
        let freed = self.stats.freed.load(Ordering::Relaxed);
        datapoint_debug!(
            "recycler",
            ("gc_len", len as i64, i64),
            ("total", total as i64, i64),
            ("freed", freed as i64, i64),
            ("reuse", reuse as i64, i64),
        );
    }
}

#[cfg(test)]
mod tests {
    use {super::*, crate::packet::PacketBatchRecycler, std::iter::repeat_with};

    impl Reset for u64 {
        fn reset(&mut self) {
            *self = 10;
        }
        fn warm(&mut self, _size_hint: usize) {}
        fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
    }

    #[test]
    fn test_recycler() {
        let recycler = Recycler::default();
        let mut y: u64 = recycler.allocate("test_recycler1");
        assert_eq!(y, 0);
        y = 20;
        let recycler2 = recycler.clone();
        recycler2.recycler.recycle(y);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
        let z = recycler.allocate("test_recycler2");
        assert_eq!(z, 10);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
    }

    #[test]
    fn test_recycler_shrink() {
        let mut rng = rand::thread_rng();
        let recycler = PacketBatchRecycler::default();
        // Allocate a burst of packets.
        const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
        {
            let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
                .take(NUM_PACKETS)
                .collect();
        }
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
        // Process a normal load of packets for a while.
        for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
            let count = rng.gen_range(1..128);
            let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
        }
        // Assert that the gc size has shrunk.
        assert_eq!(
            recycler.recycler.gc.lock().unwrap().len(),
            RECYCLER_SHRINK_SIZE
        );
    }
}
247}