// ember_core/dropper.rs
//! Background value dropper for lazy free.
//!
//! Expensive destructor work (dropping large lists, hashes, sorted sets)
//! is offloaded to a dedicated OS thread so shard loops stay responsive.
//! This is the same strategy Redis uses with its `lazyfree` threads.
//!
//! The dropper runs as a plain `std::thread` rather than a tokio task
//! because dropping data structures is CPU-bound work that would starve
//! the async executor.
use std::sync::mpsc::{self, SyncSender, TrySendError};

use ahash::AHashMap;
use compact_str::CompactString;

use crate::keyspace::Entry;
use crate::memory::is_large_value;
use crate::types::Value;
19
/// Bounded channel capacity. Large enough to absorb bursts of deferred
/// drops without meaningful memory overhead (~4096 queued items).
const DROP_CHANNEL_CAPACITY: usize = 4096;
23
/// Items that can be sent to the background drop thread.
///
/// The fields are never explicitly read — the whole point is that the
/// drop thread receives them and lets their destructors run.
#[allow(dead_code)] // fields exist only so their destructors run on the drop thread
enum Droppable {
    /// A single value removed from the keyspace (e.g. DEL, UNLINK, eviction).
    Value(Value),
    /// All entries from a FLUSHDB ASYNC — dropped in bulk.
    Entries(AHashMap<CompactString, Entry>),
}
35
/// A cloneable handle for deferring expensive drops to the background thread.
///
/// When all handles are dropped, the background thread's channel closes
/// and it exits cleanly.
#[derive(Debug, Clone)]
pub struct DropHandle {
    // Sending side of the bounded drop channel; cloning the handle clones
    // the sender, so all clones feed the same background thread.
    tx: SyncSender<Droppable>,
}
44
45impl DropHandle {
46 /// Spawns the background drop thread and returns a handle.
47 ///
48 /// If the thread fails to spawn (resource exhaustion), logs a warning
49 /// and returns a handle that drops everything inline. The channel
50 /// disconnects immediately since the receiver is never started, and
51 /// `try_send` gracefully falls back to inline dropping.
52 pub fn spawn() -> Self {
53 let (tx, rx) = mpsc::sync_channel::<Droppable>(DROP_CHANNEL_CAPACITY);
54
55 if let Err(e) = std::thread::Builder::new()
56 .name("ember-drop".into())
57 .spawn(move || {
58 // just drain the channel — dropping each item frees the memory
59 while rx.recv().is_ok() {}
60 })
61 {
62 tracing::warn!("failed to spawn drop thread, large values will be freed inline: {e}");
63 }
64
65 Self { tx }
66 }
67
68 /// Defers dropping a value to the background thread if it's large enough
69 /// to be worth the channel overhead. Small values are dropped inline.
70 ///
71 /// If the channel is full, falls back to inline drop — never blocks.
72 pub fn defer_value(&self, value: Value) {
73 if !is_large_value(&value) {
74 return; // small value — inline drop is fine
75 }
76 // try_send: never block the shard even if the drop thread is behind
77 match self.tx.try_send(Droppable::Value(value)) {
78 Ok(()) => {}
79 Err(TrySendError::Full(item)) => {
80 // channel full — drop inline as fallback
81 drop(item);
82 }
83 Err(TrySendError::Disconnected(_)) => {
84 // drop thread gone — nothing we can do, value drops here
85 }
86 }
87 }
88
89 /// Defers dropping all entries from a flush operation. Always deferred
90 /// since a full keyspace is always worth offloading.
91 pub(crate) fn defer_entries(&self, entries: AHashMap<CompactString, Entry>) {
92 if entries.is_empty() {
93 return;
94 }
95 match self.tx.try_send(Droppable::Entries(entries)) {
96 Ok(()) => {}
97 Err(TrySendError::Full(item)) => {
98 drop(item);
99 }
100 Err(TrySendError::Disconnected(_)) => {}
101 }
102 }
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::VecDeque;

    #[test]
    fn defer_small_value_drops_inline() {
        // A tiny string is below the large-value threshold, so it is
        // dropped inline and never crosses the channel.
        DropHandle::spawn().defer_value(Value::String(Bytes::from("hello")));
    }

    #[test]
    fn defer_large_list() {
        let handle = DropHandle::spawn();
        let items: VecDeque<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("item-{i}")))
            .collect();
        handle.defer_value(Value::List(items));
        // Give the background thread a moment to drain the channel.
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    #[test]
    fn defer_entries_from_flush() {
        let handle = DropHandle::spawn();
        let entries: AHashMap<CompactString, Entry> = (0..10)
            .map(|i| {
                let key = CompactString::from(format!("key-{i}").as_str());
                let entry = Entry {
                    value: Value::String(Bytes::from(format!("val-{i}"))),
                    expires_at_ms: 0,
                    cached_value_size: 0,
                    last_access_secs: 0,
                };
                (key, entry)
            })
            .collect();
        handle.defer_entries(entries);
        // Let the background thread process the bulk drop.
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    #[test]
    fn empty_entries_skipped() {
        // An empty map must short-circuit before touching the channel.
        DropHandle::spawn().defer_entries(AHashMap::new());
    }
}
154}