Skip to main content

ember_core/
dropper.rs

1//! Background value dropper for lazy free.
2//!
3//! Expensive destructor work (dropping large lists, hashes, sorted sets)
4//! is offloaded to a dedicated OS thread so shard loops stay responsive.
5//! This is the same strategy Redis uses with its `lazyfree` threads.
6//!
7//! The dropper runs as a plain `std::thread` rather than a tokio task
8//! because dropping data structures is CPU-bound work that would starve
9//! the async executor.
10
11use std::sync::mpsc::{self, SyncSender, TrySendError};
12
13use ahash::AHashMap;
14use compact_str::CompactString;
15
16use crate::keyspace::Entry;
17use crate::memory::is_large_value;
18use crate::types::Value;
19
/// Bounded channel capacity. Large enough to absorb bursts without
/// meaningful memory overhead: at most 4096 `Droppable` items are queued
/// at once (the enum values themselves, not the heap data they own).
22const DROP_CHANNEL_CAPACITY: usize = 4096;
23
24/// Items that can be sent to the background drop thread.
25///
26/// The fields are never explicitly read — the whole point is that the
27/// drop thread receives them and lets their destructors run.
28#[allow(dead_code)]
29enum Droppable {
30    /// A single value removed from the keyspace (e.g. DEL, UNLINK, eviction).
31    Value(Value),
32    /// All entries from a FLUSHDB ASYNC — dropped in bulk.
33    Entries(AHashMap<CompactString, Entry>),
34}
35
36/// A cloneable handle for deferring expensive drops to the background thread.
37///
38/// When all handles are dropped, the background thread's channel closes
39/// and it exits cleanly.
40#[derive(Debug, Clone)]
41pub struct DropHandle {
42    tx: SyncSender<Droppable>,
43}
44
45impl DropHandle {
46    /// Spawns the background drop thread and returns a handle.
47    ///
48    /// If the thread fails to spawn (resource exhaustion), logs a warning
49    /// and returns a handle that drops everything inline. The channel
50    /// disconnects immediately since the receiver is never started, and
51    /// `try_send` gracefully falls back to inline dropping.
52    pub fn spawn() -> Self {
53        let (tx, rx) = mpsc::sync_channel::<Droppable>(DROP_CHANNEL_CAPACITY);
54
55        if let Err(e) = std::thread::Builder::new()
56            .name("ember-drop".into())
57            .spawn(move || {
58                // just drain the channel — dropping each item frees the memory
59                while rx.recv().is_ok() {}
60            })
61        {
62            tracing::warn!("failed to spawn drop thread, large values will be freed inline: {e}");
63        }
64
65        Self { tx }
66    }
67
68    /// Defers dropping a value to the background thread if it's large enough
69    /// to be worth the channel overhead. Small values are dropped inline.
70    ///
71    /// If the channel is full, falls back to inline drop — never blocks.
72    pub fn defer_value(&self, value: Value) {
73        if !is_large_value(&value) {
74            return; // small value — inline drop is fine
75        }
76        // try_send: never block the shard even if the drop thread is behind
77        match self.tx.try_send(Droppable::Value(value)) {
78            Ok(()) => {}
79            Err(TrySendError::Full(item)) => {
80                // channel full — drop inline as fallback
81                drop(item);
82            }
83            Err(TrySendError::Disconnected(_)) => {
84                // drop thread gone — nothing we can do, value drops here
85            }
86        }
87    }
88
89    /// Defers dropping all entries from a flush operation. Always deferred
90    /// since a full keyspace is always worth offloading.
91    pub(crate) fn defer_entries(&self, entries: AHashMap<CompactString, Entry>) {
92        if entries.is_empty() {
93            return;
94        }
95        match self.tx.try_send(Droppable::Entries(entries)) {
96            Ok(()) => {}
97            Err(TrySendError::Full(item)) => {
98                drop(item);
99            }
100            Err(TrySendError::Disconnected(_)) => {}
101        }
102    }
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::VecDeque;
    use std::time::Duration;

    /// Pause that gives the drop thread a moment to process what was sent.
    const SETTLE: Duration = Duration::from_millis(10);

    /// A short string is expected to be classified as small and therefore
    /// dropped inline rather than sent to the channel.
    #[test]
    fn defer_small_value_drops_inline() {
        let dropper = DropHandle::spawn();
        let small = Value::String(Bytes::from("hello"));
        dropper.defer_value(small);
    }

    /// Deferring a 100-element list must complete without panicking.
    #[test]
    fn defer_large_list() {
        let dropper = DropHandle::spawn();
        let items: VecDeque<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("item-{i}")))
            .collect();
        dropper.defer_value(Value::List(items));
        std::thread::sleep(SETTLE);
    }

    /// Deferring a populated entry map (as a flush would) must complete
    /// without panicking.
    #[test]
    fn defer_entries_from_flush() {
        let dropper = DropHandle::spawn();
        let mut flushed = AHashMap::new();
        for i in 0..10 {
            let key = CompactString::from(format!("key-{i}").as_str());
            let entry = Entry {
                value: Value::String(Bytes::from(format!("val-{i}"))),
                expires_at_ms: 0,
                cached_value_size: 0,
                last_access_secs: 0,
            };
            flushed.insert(key, entry);
        }
        dropper.defer_entries(flushed);
        std::thread::sleep(SETTLE);
    }

    /// An empty map takes the early-exit path and is never sent.
    #[test]
    fn empty_entries_skipped() {
        let dropper = DropHandle::spawn();
        dropper.defer_entries(AHashMap::new());
    }
}
154}