tycho_util/mem/
reclaimer.rs1use std::cell::Cell;
12use std::num::NonZeroUsize;
13use std::sync::{Arc, Condvar, Mutex, OnceLock};
14use std::thread;
15use std::time::{Duration, Instant};
16
17use crossbeam_deque::{Injector, Steal};
18
19use crate::metrics::HistogramGuard;
20
/// Process-wide reclaimer. It is created either by [`Reclaimer::init`] or lazily on the
/// first call to [`Reclaimer::instance`], whichever happens first.
static INSTANCE: OnceLock<Reclaimer> = OnceLock::new();
22
/// Drops values on dedicated background worker threads instead of the caller's thread.
///
/// A single global instance is used; obtain it with [`Reclaimer::instance`] or configure
/// it up front with [`Reclaimer::init`].
pub struct Reclaimer {
    /// Queue and synchronization state shared with the worker threads.
    inner: Arc<Inner>,
}
28
29impl Reclaimer {
30 const QUEUE_CAPACITY: NonZeroUsize = NonZeroUsize::new(10).unwrap();
31 const WARN_THRESHOLD: Duration = Duration::from_millis(10);
32 const DEFAULT_WORKERS: NonZeroUsize = NonZeroUsize::new(2).unwrap();
33
34 pub fn init(
35 queue_capacity: NonZeroUsize,
36 worker_count: NonZeroUsize,
37 ) -> Result<(), ReclaimerError> {
38 let mut did_init = false;
39
40 let _ = INSTANCE.get_or_init(|| {
41 did_init = true;
42 Self::with_workers(queue_capacity, worker_count)
43 });
44
45 if did_init {
46 Ok(())
47 } else {
48 Err(ReclaimerError::AlreadyInitialized)
49 }
50 }
51
52 fn with_workers(queue_capacity: NonZeroUsize, worker_count: NonZeroUsize) -> Self {
53 let inner = Arc::new(Inner::new(queue_capacity));
54 Self::start_workers(inner.clone(), worker_count);
55 Self { inner }
56 }
57
58 pub fn instance() -> &'static Reclaimer {
60 INSTANCE.get_or_init(|| Self::with_workers(Self::QUEUE_CAPACITY, Self::DEFAULT_WORKERS))
61 }
62
63 pub fn drop<T>(&self, value: T)
65 where
66 T: Send + 'static,
67 {
68 DROP_FLAGS.with(|flags| {
69 let flags_before = flags.get();
70 if flags_before & FLAG_DROPPING != 0 {
71 return;
73 }
74
75 let inside_tokio = tokio::runtime::Handle::try_current().is_ok();
76 if inside_tokio || flags_before & FLAG_ALLOW_IN_PLACE == 0 {
77 flags.set(flags_before | FLAG_DROPPING);
79
80 let start = Instant::now();
81 metrics::counter!("tycho_delayed_drop_enqueued").increment(1);
82
83 self.inner.enqueue(Box::new(value), inside_tokio, start);
84
85 flags.set(flags_before);
87 } else {
88 drop(value);
90 }
91 });
92 }
93
94 pub fn drop_in_place<T>(&self, value: T)
98 where
99 T: Send + 'static,
100 {
101 DROP_FLAGS.with(|flags| {
102 let flags_before = flags.get();
103 flags.set(flags_before | FLAG_ALLOW_IN_PLACE);
104 self.drop(value);
105 flags.set(flags_before);
106 });
107 }
108}
109
thread_local! {
    /// Per-thread bit set of `FLAG_*` values that controls whether `Reclaimer::drop`
    /// enqueues a value or drops it on the current thread.
    static DROP_FLAGS: Cell<u8> = const { Cell::new(0) };
}
113
/// Set while the current thread is already dropping through the reclaimer (worker
/// threads, or while a value is being enqueued); nested drops then run in place.
const FLAG_DROPPING: u8 = 1 << 0;
/// Set by `Reclaimer::drop_in_place` to permit dropping on the current thread.
const FLAG_ALLOW_IN_PLACE: u8 = 1 << 1;
116
/// State shared between the [`Reclaimer`] handle and its worker threads.
struct Inner {
    /// Values waiting to be dropped by a worker.
    queue: Injector<Box<dyn Send>>,
    /// Bookkeeping guarded by a mutex so the condition variables below can wait on it.
    state: Mutex<State>,
    /// Notified after an item is enqueued, to wake a waiting worker.
    not_empty: Condvar,
    /// Notified after a worker takes an item, to wake a producer blocked on capacity.
    not_full: Condvar,
    /// Maximum number of items admitted at once; producers block beyond it.
    capacity: NonZeroUsize,
}
124
/// Mutex-protected bookkeeping of [`Inner`].
struct State {
    /// Items admitted by `Inner::enqueue` and not yet taken by a worker in `Inner::pop`.
    len: usize,
}
128
129impl Reclaimer {
130 fn start_workers(inner: Arc<Inner>, worker_total: NonZeroUsize) {
131 for worker_index in 0..worker_total.get() {
132 let inner = inner.clone();
133 thread::Builder::new()
134 .name("tycho-reclaimer".into())
135 .spawn(move || Inner::worker_loop(inner, worker_index))
136 .expect("failed to spawn reclaimer worker");
137 }
138 }
139}
140
141impl Inner {
142 fn new(capacity: NonZeroUsize) -> Self {
143 Self {
144 queue: Injector::new(),
145 state: Mutex::new(State { len: 0 }),
146 not_empty: Condvar::new(),
147 not_full: Condvar::new(),
148 capacity,
149 }
150 }
151
152 fn enqueue(&self, item: Box<dyn Send>, inside_tokio: bool, start: Instant) {
153 {
154 let mut state = self.state.lock().expect("poisoned");
155
156 while state.len >= self.capacity.get() {
157 state = self.not_full.wait(state).expect("poisoned");
158 }
159 state.len += 1;
160 }
161
162 self.queue.push(item);
163 self.not_empty.notify_one();
164
165 if inside_tokio {
166 let elapsed = start.elapsed();
167 if elapsed > Reclaimer::WARN_THRESHOLD {
168 tracing::warn!(
169 elapsed_ms = elapsed.as_millis(),
170 "delayed drop queue was full for too long"
171 );
172 }
173 }
174 }
175
176 fn pop(&self) -> Box<dyn Send> {
177 loop {
178 match self.queue.steal() {
179 Steal::Success(item) => {
180 {
181 let mut state = self.state.lock().expect("poisoned");
182 assert!(state.len > 0);
183 state.len -= 1;
184 }
185 self.not_full.notify_one();
186 return item;
187 }
188 Steal::Retry => {
189 std::hint::spin_loop();
190 }
191 Steal::Empty => {
192 let mut state = self.state.lock().expect("poisoned");
193 while state.len == 0 {
194 state = self.not_empty.wait(state).expect("poisoned");
195 }
196 }
197 }
198 }
199 }
200
201 fn worker_loop(inner: Arc<Self>, worker_index: usize) {
202 tracing::info!(?worker_index, "reclaimer worker started");
203 scopeguard::defer! { tracing::info!(?worker_index, "reclaimer worker finished"); };
204
205 DROP_FLAGS.set(FLAG_DROPPING);
206
207 loop {
208 let item = inner.pop();
209 let histogram = HistogramGuard::begin("tycho_delayed_drop_time");
210 metrics::counter!("tycho_delayed_drop_dropped").increment(1);
211 drop(item);
212 histogram.finish();
213 }
214 }
215}
216
/// Errors returned by [`Reclaimer::init`].
#[derive(Debug, thiserror::Error)]
pub enum ReclaimerError {
    /// The global reclaimer already exists, so the requested configuration was not applied.
    #[error("Reclaimer was already initialized")]
    AlreadyInitialized,
}
222
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    use super::*;

    /// Sends the id of the thread it is dropped on through the channel.
    struct Tracer(mpsc::Sender<thread::ThreadId>);

    impl Drop for Tracer {
        fn drop(&mut self) {
            // The receiver may already be gone if the test has failed; ignore that.
            let _ = self.0.send(thread::current().id());
        }
    }

    #[test]
    fn drops_in_background() {
        let (tx, rx) = mpsc::channel::<thread::ThreadId>();
        let origin = thread::current().id();

        Reclaimer::instance().drop(Tracer(tx));

        let dropped_on = rx
            .recv_timeout(Duration::from_secs(3))
            .expect("value was not dropped in time");

        assert_ne!(dropped_on, origin, "drop did not occur on a worker thread");
    }

    #[test]
    fn drops_in_place() {
        let (tx, rx) = mpsc::channel::<thread::ThreadId>();
        let origin = thread::current().id();

        Reclaimer::instance().drop_in_place(Tracer(tx));

        let dropped_on = rx
            .recv_timeout(Duration::from_secs(3))
            .expect("value was not dropped in time");

        assert_eq!(dropped_on, origin, "didn't drop in place");
    }

    #[test]
    fn double_init_will_err() {
        // The first call may or may not win, depending on which test touched the
        // global instance first.
        let _ = Reclaimer::init(Reclaimer::QUEUE_CAPACITY, Reclaimer::DEFAULT_WORKERS);

        let second = Reclaimer::init(Reclaimer::QUEUE_CAPACITY, Reclaimer::DEFAULT_WORKERS);
        assert!(
            matches!(second, Err(ReclaimerError::AlreadyInitialized)),
            "second init should always fail"
        );
    }

    #[test]
    fn burst_is_dropped() {
        let reclaimer = Reclaimer::instance();

        assert_eq!(
            reclaimer.inner.capacity.get(),
            Reclaimer::QUEUE_CAPACITY.get()
        );

        // The queue length is intentionally not asserted. Every test shares the global
        // instance and tests run in parallel, so other tests may have items in flight at
        // any moment, which made `len == 0` checks flaky.

        let total = Reclaimer::QUEUE_CAPACITY.get() * 100;

        let (tx, rx) = mpsc::channel::<thread::ThreadId>();
        let origin = thread::current().id();

        for _ in 0..total {
            reclaimer.drop(Tracer(tx.clone()));
        }
        drop(tx);

        let now = Instant::now();
        let mut received = 0usize;

        while received < total {
            let elapsed = now.elapsed();
            if elapsed > Duration::from_secs(3) {
                panic!(
                    "timed out waiting for drops: {received} of {total} received in {elapsed:?}"
                );
            }

            match rx.recv_timeout(Duration::from_millis(100)) {
                Ok(worker_id) => {
                    assert_ne!(
                        worker_id, origin,
                        "Each drop should come from a worker thread"
                    );
                    received += 1;
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {}
                Err(e) => panic!("{e}"),
            }
        }
    }
}