ember_core/dropper.rs
1//! Background value dropper for lazy free.
2//!
3//! Expensive destructor work (dropping large lists, hashes, sorted sets)
4//! is offloaded to a dedicated OS thread so shard loops stay responsive.
5//! This is the same strategy Redis uses with its `lazyfree` threads.
6//!
7//! The dropper runs as a plain `std::thread` rather than a tokio task
8//! because dropping data structures is CPU-bound work that would starve
9//! the async executor.
10
11use std::collections::HashMap;
12use std::sync::mpsc::{self, SyncSender, TrySendError};
13
14use crate::keyspace::Entry;
15use crate::memory::is_large_value;
16use crate::types::Value;
17
/// Number of pending items the bounded drop channel can hold.
///
/// Sized to soak up bursts of deferred drops. Once this many items are
/// queued, senders stop handing work over and drop inline instead.
const DROP_CHANNEL_CAPACITY: usize = 4 * 1024;
21
22/// Items that can be sent to the background drop thread.
23///
24/// The fields are never explicitly read — the whole point is that the
25/// drop thread receives them and lets their destructors run.
26#[allow(dead_code)]
27enum Droppable {
28 /// A single value removed from the keyspace (e.g. DEL, UNLINK, eviction).
29 Value(Value),
30 /// All entries from a FLUSHDB ASYNC — dropped in bulk.
31 Entries(HashMap<String, Entry>),
32}
33
34/// A cloneable handle for deferring expensive drops to the background thread.
35///
36/// When all handles are dropped, the background thread's channel closes
37/// and it exits cleanly.
38#[derive(Debug, Clone)]
39pub struct DropHandle {
40 tx: SyncSender<Droppable>,
41}
42
43impl DropHandle {
44 /// Spawns the background drop thread and returns a handle.
45 ///
46 /// If the thread fails to spawn (resource exhaustion), logs a warning
47 /// and returns a handle that drops everything inline. The channel
48 /// disconnects immediately since the receiver is never started, and
49 /// `try_send` gracefully falls back to inline dropping.
50 pub fn spawn() -> Self {
51 let (tx, rx) = mpsc::sync_channel::<Droppable>(DROP_CHANNEL_CAPACITY);
52
53 if let Err(e) = std::thread::Builder::new()
54 .name("ember-drop".into())
55 .spawn(move || {
56 // just drain the channel — dropping each item frees the memory
57 while rx.recv().is_ok() {}
58 })
59 {
60 tracing::warn!("failed to spawn drop thread, large values will be freed inline: {e}");
61 }
62
63 Self { tx }
64 }
65
66 /// Defers dropping a value to the background thread if it's large enough
67 /// to be worth the channel overhead. Small values are dropped inline.
68 ///
69 /// If the channel is full, falls back to inline drop — never blocks.
70 pub fn defer_value(&self, value: Value) {
71 if !is_large_value(&value) {
72 return; // small value — inline drop is fine
73 }
74 // try_send: never block the shard even if the drop thread is behind
75 match self.tx.try_send(Droppable::Value(value)) {
76 Ok(()) => {}
77 Err(TrySendError::Full(item)) => {
78 // channel full — drop inline as fallback
79 drop(item);
80 }
81 Err(TrySendError::Disconnected(_)) => {
82 // drop thread gone — nothing we can do, value drops here
83 }
84 }
85 }
86
87 /// Defers dropping all entries from a flush operation. Always deferred
88 /// since a full keyspace is always worth offloading.
89 pub(crate) fn defer_entries(&self, entries: HashMap<String, Entry>) {
90 if entries.is_empty() {
91 return;
92 }
93 match self.tx.try_send(Droppable::Entries(entries)) {
94 Ok(()) => {}
95 Err(TrySendError::Full(item)) => {
96 drop(item);
97 }
98 Err(TrySendError::Disconnected(_)) => {}
99 }
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::VecDeque;
    use std::time::Duration;

    /// Short pause giving the background thread a chance to receive and
    /// destroy whatever was just handed to it.
    const DRAIN_GRACE: Duration = Duration::from_millis(10);

    /// Smoke test: deferring a tiny string must complete without panicking.
    #[test]
    fn defer_small_value_drops_inline() {
        let handle = DropHandle::spawn();
        // presumably below the `is_large_value` threshold, so it never
        // reaches the channel — the threshold itself lives in another module
        let small = Value::String(Bytes::from("hello"));
        handle.defer_value(small);
    }

    /// Smoke test: deferring a 100-element list must complete without panicking.
    #[test]
    fn defer_large_list() {
        let handle = DropHandle::spawn();
        let list: VecDeque<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("item-{i}")))
            .collect();
        handle.defer_value(Value::List(list));
        // keep the handle alive briefly so the drop thread can process the item
        std::thread::sleep(DRAIN_GRACE);
    }

    /// Smoke test: a populated entry map from a flush is accepted.
    #[test]
    fn defer_entries_from_flush() {
        let handle = DropHandle::spawn();
        let entries: HashMap<String, Entry> = (0..10)
            .map(|i| {
                let entry = Entry {
                    value: Value::String(Bytes::from(format!("val-{i}"))),
                    expires_at_ms: 0,
                    last_access_ms: 0,
                };
                (format!("key-{i}"), entry)
            })
            .collect();
        handle.defer_entries(entries);
        std::thread::sleep(DRAIN_GRACE);
    }

    /// An empty map takes the early-exit path and is never sent.
    #[test]
    fn empty_entries_skipped() {
        let handle = DropHandle::spawn();
        handle.defer_entries(HashMap::new());
    }
}