use {
    log::trace,
    rand::{thread_rng, Rng},
    solana_metrics::datapoint_debug,
    std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc, Mutex, Weak,
    },
};

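// Upper bound on the number of objects retained after a shrink. A temporary
// burst in demand can grow the gc pool well past its steady-state size; once
// the moving average of the pool size confirms the burst is over, the excess
// above this threshold is deallocated.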
const RECYCLER_SHRINK_SIZE: usize = 1024;

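// Lookback window, in number of allocations, for the exponential moving
// average of the gc pool size that gates shrinking.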
const RECYCLER_SHRINK_WINDOW: usize = 16384;

#[derive(Debug, Default)]
struct RecyclerStats {
    total: AtomicUsize,
    reuse: AtomicUsize,
    freed: AtomicUsize,
    max_gc: AtomicUsize,
}

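/// A cheaply clonable handle to a shared pool of reusable allocations of `T`.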
#[derive(Clone, Default)]
pub struct Recycler<T> {
    recycler: Arc<RecyclerX<T>>,
}

#[derive(Debug)]
pub struct RecyclerX<T> {
    gc: Mutex<Vec<T>>,
    stats: RecyclerStats,
    id: usize,
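    // RECYCLER_SHRINK_WINDOW times the exponential moving average of
    // gc.len(), stored as a scaled integer to avoid floating point math.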
    size_factor: AtomicUsize,
}

impl<T: Default> Default for RecyclerX<T> {
    fn default() -> RecyclerX<T> {
        let id = thread_rng().gen_range(0..1000);
        trace!("new recycler..{id}");
        RecyclerX {
            gc: Mutex::default(),
            stats: RecyclerStats::default(),
            id,
            size_factor: AtomicUsize::default(),
        }
    }
}

#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample
    for RecyclerX<crate::cuda_runtime::PinnedVec<solana_packet::Packet>>
{
    fn example() -> Self {
        Self::default()
    }
}

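/// Hooks a recyclable object must implement: clearing state on reuse,
/// pre-allocating capacity when warming, and storing a weak back-reference
/// to the pool it should be returned to.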
pub trait Reset {
    fn reset(&mut self);
    fn warm(&mut self, size_hint: usize);
    fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
    where
        Self: std::marker::Sized;
}

static WARM_RECYCLERS: AtomicBool = AtomicBool::new(false);

pub fn enable_recycler_warming() {
    WARM_RECYCLERS.store(true, Ordering::Relaxed);
}

fn warm_recyclers() -> bool {
    WARM_RECYCLERS.load(Ordering::Relaxed)
}

impl<T: Default + Reset + Sized> Recycler<T> {
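    /// Builds a recycler and, if warming is enabled via
    /// `enable_recycler_warming`, pre-populates it with `num` objects warmed
    /// with `size_hint`.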
    pub fn warmed(num: usize, size_hint: usize) -> Self {
        let new = Self::default();
        if warm_recyclers() {
            let warmed_items: Vec<_> = (0..num)
                .map(|_| {
                    let mut item = new.allocate("warming");
                    item.warm(size_hint);
                    item
                })
                .collect();
            warmed_items
                .into_iter()
                .for_each(|i| new.recycler.recycle(i));
        }
        new
    }

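    /// Returns a reset object from the pool if one is available, otherwise
    /// allocates a fresh `T` tied back to this recycler. Also refreshes the
    /// moving average used to decide when the pool should shrink.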
    pub fn allocate(&self, name: &'static str) -> T {
        {
            const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
            let mut gc = self.recycler.gc.lock().unwrap();
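            // Update the exponential moving average of gc.len(), scaled by
            // the window size n = RECYCLER_SHRINK_WINDOW to stay in integer
            // arithmetic. For an average a, the update
            //      a <- a * (n - 1) / n + x / n
            // becomes, with b = n * a:
            //      b <- b * (n - 1) / n + x,
            // where the division is rounded to nearest by adding n / 2.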
            self.recycler.size_factor.store(
                self.recycler
                    .size_factor
                    .load(Ordering::Acquire)
                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
                    .saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
                    .checked_div(RECYCLER_SHRINK_WINDOW)
                    .unwrap()
                    .saturating_add(gc.len()),
                Ordering::Release,
            );
            if let Some(mut x) = gc.pop() {
                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
                x.reset();
                return x;
            }
        }
        let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
        trace!(
            "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
            total,
            name,
            self.recycler.id,
            self.recycler.stats.reuse.load(Ordering::Relaxed),
            self.recycler.stats.max_gc.load(Ordering::Relaxed),
        );

        let mut t = T::default();
        t.set_recycler(Arc::downgrade(&self.recycler));
        t
    }
}

impl<T: Default + Reset> RecyclerX<T> {
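    /// Returns an object to the pool, shrinking the pool if it has stayed
    /// oversized for the whole lookback window, and reports recycler metrics.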
    pub fn recycle(&self, x: T) {
        let len = {
            let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
            gc.push(x);
            const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
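            // Shrink only if the pool currently holds more than
            // RECYCLER_SHRINK_SIZE objects and the scaled moving average of
            // gc.len() also exceeds that size, i.e. the pool has been
            // consistently oversized. Excess objects are detached from the
            // recycler so they are dropped rather than recycled again.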
            if gc.len() > RECYCLER_SHRINK_SIZE
                && self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
            {
                self.stats.freed.fetch_add(
                    gc.len().saturating_sub(RECYCLER_SHRINK_SIZE),
                    Ordering::Relaxed,
                );
                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
                    x.set_recycler(Weak::default());
                }
                self.size_factor
                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
            }
            gc.len()
        };

        let max_gc = self.stats.max_gc.load(Ordering::Relaxed);
        if len > max_gc {
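            // Only store the new maximum if no other thread has updated
            // max_gc since the load above; otherwise keep the racing value.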
            let _ = self.stats.max_gc.compare_exchange(
                max_gc,
                len,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
        }
        let total = self.stats.total.load(Ordering::Relaxed);
        let reuse = self.stats.reuse.load(Ordering::Relaxed);
        let freed = self.stats.freed.load(Ordering::Relaxed);
        datapoint_debug!(
            "recycler",
            ("gc_len", len as i64, i64),
            ("total", total as i64, i64),
            ("freed", freed as i64, i64),
            ("reuse", reuse as i64, i64),
        );
    }
}

#[cfg(test)]
mod tests {
    use {super::*, crate::packet::PacketBatchRecycler, std::iter::repeat_with};

    impl Reset for u64 {
        fn reset(&mut self) {
            *self = 10;
        }
        fn warm(&mut self, _size_hint: usize) {}
        fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
    }

    #[test]
    fn test_recycler() {
        let recycler = Recycler::default();
        let mut y: u64 = recycler.allocate("test_recycler1");
        assert_eq!(y, 0);
        y = 20;
        let recycler2 = recycler.clone();
        recycler2.recycler.recycle(y);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
        let z = recycler.allocate("test_recycler2");
        assert_eq!(z, 10);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
    }

    #[test]
    fn test_recycler_shrink() {
        let mut rng = rand::thread_rng();
        let recycler = PacketBatchRecycler::default();
        const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
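        // Allocate a burst of packets; dropping them at the end of the scope
        // returns them all to the gc pool.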
        {
            let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
                .take(NUM_PACKETS)
                .collect();
        }
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
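        // Simulate a steady-state load so the moving average of the pool
        // size decays below the shrink threshold.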
        for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
            let count = rng.gen_range(1..128);
            let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
        }
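        // The pool should have shrunk back down to RECYCLER_SHRINK_SIZE.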
        assert_eq!(
            recycler.recycler.gc.lock().unwrap().len(),
            RECYCLER_SHRINK_SIZE
        );
    }
}