tycho_util/mem/
reclaimer.rs

//! Delayed drop queue: offload heavy drops to a background thread.
//!
//! Use `Reclaimer::instance().drop(value)` to enqueue any `Send + 'static`
//! value for background drop.
//!
//! Metrics:
//! - `tycho_delayed_drop_enqueued` — incremented when an item is enqueued.
//! - `tycho_delayed_drop_dropped` — incremented when an item is actually dropped.
//!   The current queue size can be estimated as `enqueued - dropped`.
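//!
//! # Example
//!
//! A minimal usage sketch; the `tycho_util::mem::Reclaimer` import path is
//! assumed here, which is why the snippet is not compiled as a doc test:
//!
//! ```ignore
//! use tycho_util::mem::Reclaimer;
//!
//! // Some large value whose `Drop` is expensive enough to be worth offloading.
//! let cache = vec![vec![0u8; 1024]; 1024];
//!
//! // Hands the value to a background worker. The call itself only blocks
//! // while the bounded queue is full.
//! Reclaimer::instance().drop(cache);
//! ```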

use std::cell::Cell;
use std::num::NonZeroUsize;
use std::sync::{Arc, Condvar, Mutex, OnceLock};
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_deque::{Injector, Steal};

use crate::metrics::HistogramGuard;

static INSTANCE: OnceLock<Reclaimer> = OnceLock::new();

/// Reclaimer: wrapper around a bounded work-stealing queue that drops values
/// on background worker threads.
pub struct Reclaimer {
    inner: Arc<Inner>,
}

impl Reclaimer {
    const QUEUE_CAPACITY: NonZeroUsize = NonZeroUsize::new(10).unwrap();
    const WARN_THRESHOLD: Duration = Duration::from_millis(10);
    const DEFAULT_WORKERS: NonZeroUsize = NonZeroUsize::new(2).unwrap();

    /// Initializes the global instance with the given queue capacity and
    /// worker count.
    ///
    /// Returns an error if the instance was already initialized.
    pub fn init(
        queue_capacity: NonZeroUsize,
        worker_count: NonZeroUsize,
    ) -> Result<(), ReclaimerError> {
        let mut did_init = false;

        let _ = INSTANCE.get_or_init(|| {
            did_init = true;
            Self::with_workers(queue_capacity, worker_count)
        });

        if did_init {
            Ok(())
        } else {
            Err(ReclaimerError::AlreadyInitialized)
        }
    }

    fn with_workers(queue_capacity: NonZeroUsize, worker_count: NonZeroUsize) -> Self {
        let inner = Arc::new(Inner::new(queue_capacity));
        Self::start_workers(inner.clone(), worker_count);
        Self { inner }
    }

    /// Global singleton instance, initialized on first use.
    pub fn instance() -> &'static Reclaimer {
        INSTANCE.get_or_init(|| Self::with_workers(Self::QUEUE_CAPACITY, Self::DEFAULT_WORKERS))
    }

    /// Enqueue a value to be dropped later by the background worker.
    pub fn drop<T>(&self, value: T)
    where
        T: Send + 'static,
    {
        DROP_FLAGS.with(|flags| {
            let flags_before = flags.get();
            if flags_before & FLAG_DROPPING != 0 {
                // Already inside a reclaimer drop; the value is dropped here
                // in place without using the queue.
                return;
            }

            let inside_tokio = tokio::runtime::Handle::try_current().is_ok();
            if inside_tokio || flags_before & FLAG_ALLOW_IN_PLACE == 0 {
                // Prevent recursive drops from going through the queue again.
                flags.set(flags_before | FLAG_DROPPING);

                let start = Instant::now();
                metrics::counter!("tycho_delayed_drop_enqueued").increment(1);

                self.inner.enqueue(Box::new(value), inside_tokio, start);

                // Restore the previous flags.
                flags.set(flags_before);
            } else {
                // FLAG_ALLOW_IN_PLACE was set (e.g. by `drop_in_place`) and we
                // are outside a tokio context, so drop the value right here.
                drop(value);
            }
        });
    }

    /// Enqueue a value to be dropped later by the background worker.
    ///
    /// Drops the value in place if called outside of a tokio runtime;
    /// within a runtime the value is still enqueued for a background drop.
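    ///
    /// A minimal sketch; the import path is assumed, hence the snippet is not
    /// compiled as a doc test:
    ///
    /// ```ignore
    /// use tycho_util::mem::Reclaimer;
    ///
    /// // Outside of a tokio runtime the vector is dropped right here on the
    /// // calling thread; inside a runtime it is offloaded to a worker.
    /// let buffer = vec![0u8; 1024];
    /// Reclaimer::instance().drop_in_place(buffer);
    /// ```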
    pub fn drop_in_place<T>(&self, value: T)
    where
        T: Send + 'static,
    {
        DROP_FLAGS.with(|flags| {
            let flags_before = flags.get();
            flags.set(flags_before | FLAG_ALLOW_IN_PLACE);
            self.drop(value);
            flags.set(flags_before);
        });
    }
}

thread_local! {
    static DROP_FLAGS: Cell<u8> = const { Cell::new(0) };
}

/// Set while a value is being handed to the queue (and permanently on worker
/// threads) so that nested drops happen in place instead of re-entering the queue.
const FLAG_DROPPING: u8 = 0b01;
/// Set by `drop_in_place` to allow dropping on the calling thread when
/// outside a tokio context.
const FLAG_ALLOW_IN_PLACE: u8 = 0b10;

struct Inner {
    queue: Injector<Box<dyn Send>>,
    state: Mutex<State>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: NonZeroUsize,
}

struct State {
    /// Number of enqueued (or currently being enqueued) items; used together
    /// with the condvars to bound the queue.
    len: usize,
}

impl Reclaimer {
    fn start_workers(inner: Arc<Inner>, worker_total: NonZeroUsize) {
        for worker_index in 0..worker_total.get() {
            let inner = inner.clone();
            thread::Builder::new()
                .name("tycho-reclaimer".into())
                .spawn(move || Inner::worker_loop(inner, worker_index))
                .expect("failed to spawn reclaimer worker");
        }
    }
}

impl Inner {
    fn new(capacity: NonZeroUsize) -> Self {
        Self {
            queue: Injector::new(),
            state: Mutex::new(State { len: 0 }),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    fn enqueue(&self, item: Box<dyn Send>, inside_tokio: bool, start: Instant) {
        {
            // Reserve a slot, blocking while the queue is at capacity.
            let mut state = self.state.lock().expect("poisoned");

            while state.len >= self.capacity.get() {
                state = self.not_full.wait(state).expect("poisoned");
            }
            state.len += 1;
        }

        self.queue.push(item);
        self.not_empty.notify_one();

        if inside_tokio {
            let elapsed = start.elapsed();
            if elapsed > Reclaimer::WARN_THRESHOLD {
                tracing::warn!(
                    elapsed_ms = elapsed.as_millis(),
                    "delayed drop queue was full for too long"
                );
            }
        }
    }

    fn pop(&self) -> Box<dyn Send> {
        loop {
            match self.queue.steal() {
                Steal::Success(item) => {
                    {
                        let mut state = self.state.lock().expect("poisoned");
                        assert!(state.len > 0);
                        state.len -= 1;
                    }
                    self.not_full.notify_one();
                    return item;
                }
                Steal::Retry => {
                    std::hint::spin_loop();
                }
                Steal::Empty => {
                    // Wait until a producer reserves a slot, then retry the steal
                    // (the actual push may still be in flight at that point).
                    let mut state = self.state.lock().expect("poisoned");
                    while state.len == 0 {
                        state = self.not_empty.wait(state).expect("poisoned");
                    }
                }
            }
        }
    }

    fn worker_loop(inner: Arc<Self>, worker_index: usize) {
        tracing::info!(?worker_index, "reclaimer worker started");
        scopeguard::defer! { tracing::info!(?worker_index, "reclaimer worker finished"); };

        // Worker threads always drop in place so that nested `Reclaimer::drop`
        // calls cannot re-enter (and potentially block on) the bounded queue.
        DROP_FLAGS.set(FLAG_DROPPING);

        loop {
            let item = inner.pop();
            let histogram = HistogramGuard::begin("tycho_delayed_drop_time");
            metrics::counter!("tycho_delayed_drop_dropped").increment(1);
            drop(item);
            histogram.finish();
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ReclaimerError {
    #[error("Reclaimer was already initialized")]
    AlreadyInitialized,
}

#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    use super::*;

    /// Sends the current thread ID when dropped.
    struct Tracer(mpsc::Sender<thread::ThreadId>);

    impl Drop for Tracer {
        fn drop(&mut self) {
            // Ignore send errors (e.g., if the test already timed out/panicked).
            let _ = self.0.send(thread::current().id());
        }
    }

    #[test]
    fn drops_in_background() {
        let (tx, rx) = mpsc::channel::<thread::ThreadId>();
        let origin = thread::current().id();

        Reclaimer::instance().drop(Tracer(tx));

        let dropped_on = rx
            .recv_timeout(Duration::from_secs(3))
            .expect("value was not dropped in time");

        // Should be dropped by a worker thread, not the caller.
        assert_ne!(dropped_on, origin, "drop did not occur on a worker thread");
    }

    #[test]
    fn drops_in_place() {
        let (tx, rx) = mpsc::channel::<thread::ThreadId>();
        let origin = thread::current().id();

        Reclaimer::instance().drop_in_place(Tracer(tx));

        let dropped_on = rx
            .recv_timeout(Duration::from_secs(3))
            .expect("value was not dropped in time");

        assert_eq!(dropped_on, origin, "didn't drop in place");
    }

    #[test]
    fn double_init_will_err() {
        // Tests run in parallel, so the global instance may already be
        // initialized by another test; the first call may succeed or fail,
        // and its result is intentionally ignored.
        let _ = Reclaimer::init(Reclaimer::QUEUE_CAPACITY, Reclaimer::DEFAULT_WORKERS);

        // The second call in *this* test must fail.
        let second = Reclaimer::init(Reclaimer::QUEUE_CAPACITY, Reclaimer::DEFAULT_WORKERS);
        assert!(
            matches!(second, Err(ReclaimerError::AlreadyInitialized)),
            "second init should always fail"
        );
    }

    #[test]
    fn burst_is_dropped() {
        let reclaimer = Reclaimer::instance();

        assert_eq!(
            reclaimer.inner.capacity.get(),
            Reclaimer::QUEUE_CAPACITY.get()
        );

        assert_eq!(reclaimer.inner.state.lock().unwrap().len, 0);

        let total = Reclaimer::QUEUE_CAPACITY.get() * 100;

        // Single channel; clone the sender per item.
        let (tx, rx) = mpsc::channel::<thread::ThreadId>();
        let origin = thread::current().id();

        for _ in 0..total {
            Reclaimer::instance().drop(Tracer(tx.clone()));
        }
        drop(tx); // Close the sending side when all enqueues are done.

        let now = Instant::now();
        let mut received = 0usize;

        while received < total {
            let elapsed = now.elapsed();
            if elapsed > Duration::from_secs(3) {
                panic!(
                    "timed out waiting for drops: {received} of {total} received in {elapsed:?}"
                );
            }

            match rx.recv_timeout(Duration::from_millis(100)) {
                Ok(worker_id) => {
                    assert_ne!(
                        worker_id, origin,
                        "Each drop should come from a worker thread"
                    );
                    received += 1;
                }
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    // keep looping until deadline
                }
                Err(e) => panic!("{e}"),
            }
        }

        assert_eq!(reclaimer.inner.state.lock().unwrap().len, 0);
    }
}