// ember_core/dropper.rs
//! Background value dropper for lazy free.
//!
//! Expensive destructor work (dropping large lists, hashes, sorted sets)
//! is offloaded to a dedicated OS thread so shard loops stay responsive.
//! This is the same strategy Redis uses with its `lazyfree` threads.
//!
//! The dropper runs as a plain `std::thread` rather than a tokio task
//! because dropping data structures is CPU-bound work that would starve
//! the async executor.

11use std::collections::HashMap;
12use std::sync::mpsc::{self, SyncSender, TrySendError};
13
14use crate::keyspace::Entry;
15use crate::memory::is_large_value;
16use crate::types::Value;
17
/// Bounded channel capacity. Large enough to absorb bursts without
/// meaningful memory overhead (the buffer holds up to 4096 in-flight
/// `Droppable` values, each only an enum's worth of inline bytes —
/// the heavy payloads live behind their own heap allocations).
const DROP_CHANNEL_CAPACITY: usize = 4096;
21
/// Items that can be sent to the background drop thread.
///
/// The fields are never explicitly read — the whole point is that the
/// drop thread receives them and lets their destructors run.
// allow(dead_code): the variant payloads are intentionally never read;
// they exist solely so their destructors run on the drop thread.
#[allow(dead_code)]
enum Droppable {
    /// A single value removed from the keyspace (e.g. DEL, UNLINK, eviction).
    Value(Value),
    /// All entries from a FLUSHDB ASYNC — dropped in bulk.
    Entries(HashMap<String, Entry>),
}
33
/// A cloneable handle for deferring expensive drops to the background thread.
///
/// When all handles are dropped, the background thread's channel closes
/// and it exits cleanly.
#[derive(Debug, Clone)]
pub struct DropHandle {
    // Bounded, non-blocking sender feeding the "ember-drop" thread;
    // cloning the handle clones this sender.
    tx: SyncSender<Droppable>,
}
42
43impl DropHandle {
44    /// Spawns the background drop thread and returns a handle.
45    ///
46    /// If the thread fails to spawn (resource exhaustion), logs a warning
47    /// and returns a handle that drops everything inline. The channel
48    /// disconnects immediately since the receiver is never started, and
49    /// `try_send` gracefully falls back to inline dropping.
50    pub fn spawn() -> Self {
51        let (tx, rx) = mpsc::sync_channel::<Droppable>(DROP_CHANNEL_CAPACITY);
52
53        if let Err(e) = std::thread::Builder::new()
54            .name("ember-drop".into())
55            .spawn(move || {
56                // just drain the channel — dropping each item frees the memory
57                while rx.recv().is_ok() {}
58            })
59        {
60            tracing::warn!("failed to spawn drop thread, large values will be freed inline: {e}");
61        }
62
63        Self { tx }
64    }
65
66    /// Defers dropping a value to the background thread if it's large enough
67    /// to be worth the channel overhead. Small values are dropped inline.
68    ///
69    /// If the channel is full, falls back to inline drop — never blocks.
70    pub fn defer_value(&self, value: Value) {
71        if !is_large_value(&value) {
72            return; // small value — inline drop is fine
73        }
74        // try_send: never block the shard even if the drop thread is behind
75        match self.tx.try_send(Droppable::Value(value)) {
76            Ok(()) => {}
77            Err(TrySendError::Full(item)) => {
78                // channel full — drop inline as fallback
79                drop(item);
80            }
81            Err(TrySendError::Disconnected(_)) => {
82                // drop thread gone — nothing we can do, value drops here
83            }
84        }
85    }
86
87    /// Defers dropping all entries from a flush operation. Always deferred
88    /// since a full keyspace is always worth offloading.
89    pub(crate) fn defer_entries(&self, entries: HashMap<String, Entry>) {
90        if entries.is_empty() {
91            return;
92        }
93        match self.tx.try_send(Droppable::Entries(entries)) {
94            Ok(()) => {}
95            Err(TrySendError::Full(item)) => {
96                drop(item);
97            }
98            Err(TrySendError::Disconnected(_)) => {}
99        }
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::VecDeque;

    #[test]
    fn defer_small_value_drops_inline() {
        let handle = DropHandle::spawn();
        // A tiny string is below the large-value threshold and must be
        // dropped inline rather than sent across the channel.
        handle.defer_value(Value::String(Bytes::from("hello")));
    }

    #[test]
    fn defer_large_list() {
        let handle = DropHandle::spawn();
        let list: VecDeque<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("item-{i}")))
            .collect();
        handle.defer_value(Value::List(list));
        // give the drop thread a moment to process
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    #[test]
    fn defer_entries_from_flush() {
        let handle = DropHandle::spawn();
        let entries: HashMap<String, Entry> = (0..10)
            .map(|i| {
                let entry = Entry {
                    value: Value::String(Bytes::from(format!("val-{i}"))),
                    expires_at_ms: 0,
                    last_access_ms: 0,
                };
                (format!("key-{i}"), entry)
            })
            .collect();
        handle.defer_entries(entries);
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    #[test]
    fn empty_entries_skipped() {
        let handle = DropHandle::spawn();
        // An empty map should return early without touching the channel.
        handle.defer_entries(HashMap::new());
    }
}