use std::sync::mpsc::{self, SyncSender, TrySendError};
use ahash::AHashMap;
use compact_str::CompactString;
use crate::keyspace::Entry;
use crate::memory::is_large_value;
use crate::types::Value;
/// Number of pending items the drop channel can buffer; passed to
/// `mpsc::sync_channel` in `DropHandle::spawn`. Once the buffer is full,
/// `try_send` fails and the sender drops the item itself.
const DROP_CHANNEL_CAPACITY: usize = 4096;
/// Payload handed to the background drop thread through the channel.
///
/// The wrapped data is never inspected in this file — the receiving loop only
/// drops it — which is presumably why `dead_code` is allowed here.
#[allow(dead_code)]
enum Droppable {
/// A single value, sent by `DropHandle::defer_value`.
Value(Value),
/// A whole map of entries, sent by `DropHandle::defer_entries`.
Entries(AHashMap<CompactString, Entry>),
}
/// Handle used to send items to the background drop thread.
///
/// Cloning the handle clones the underlying `SyncSender`, so every clone
/// feeds the same channel.
#[derive(Debug, Clone)]
pub struct DropHandle {
/// Sending half of the bounded channel created in `spawn`.
tx: SyncSender<Droppable>,
}
impl DropHandle {
    /// Creates the bounded drop channel and starts the `ember-drop` thread
    /// that receives items from it and drops them.
    ///
    /// The worker runs until every sender has gone away. If the thread cannot
    /// be started, a warning is logged and the handle is still returned: the
    /// receiver is dropped together with the unused closure, so later sends
    /// fail as disconnected and the items are freed on the calling thread.
    pub fn spawn() -> Self {
        let (tx, rx) = mpsc::sync_channel::<Droppable>(DROP_CHANNEL_CAPACITY);

        let worker = std::thread::Builder::new()
            .name("ember-drop".into())
            .spawn(move || {
                // Receiving is the whole job: each item is freed here, off
                // the thread that sent it.
                for payload in rx {
                    drop(payload);
                }
            });

        match worker {
            // The join handle is not kept; the thread runs detached.
            Ok(_) => {}
            Err(e) => {
                tracing::warn!("failed to spawn drop thread, large values will be freed inline: {e}");
            }
        }

        Self { tx }
    }

    /// Sends `value` to the drop thread when `is_large_value` reports it as
    /// large; otherwise it is simply dropped here when the function returns.
    ///
    /// Never blocks. If the channel is full or disconnected the value is
    /// dropped on the calling thread instead.
    pub fn defer_value(&self, value: Value) {
        if is_large_value(&value) {
            self.hand_off(Droppable::Value(value));
        }
    }

    /// Sends a non-empty map of entries to the drop thread; an empty map is
    /// dropped right away without touching the channel.
    ///
    /// Never blocks. If the channel is full or disconnected the map is
    /// dropped on the calling thread instead.
    pub(crate) fn defer_entries(&self, entries: AHashMap<CompactString, Entry>) {
        if !entries.is_empty() {
            self.hand_off(Droppable::Entries(entries));
        }
    }

    /// Non-blocking send shared by the `defer_*` methods. When the channel
    /// refuses the item (full or disconnected), it is dropped inline.
    fn hand_off(&self, item: Droppable) {
        if let Err(TrySendError::Full(rejected) | TrySendError::Disconnected(rejected)) =
            self.tx.try_send(item)
        {
            drop(rejected);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::collections::VecDeque;

    /// Short pause giving the background drop thread a chance to run before
    /// the test returns.
    fn let_drop_thread_run() {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    /// A short string passes through `defer_value` without panicking
    /// (presumably below the `is_large_value` threshold, so freed inline).
    #[test]
    fn defer_small_value_drops_inline() {
        let drops = DropHandle::spawn();
        let small = Value::String(Bytes::from("hello"));
        drops.defer_value(small);
    }

    /// A 100-element list passes through `defer_value` without panicking.
    #[test]
    fn defer_large_list() {
        let drops = DropHandle::spawn();
        let items: VecDeque<Bytes> = (0..100)
            .map(|i| Bytes::from(format!("item-{i}")))
            .collect();
        drops.defer_value(Value::List(items));
        let_drop_thread_run();
    }

    /// A populated entry map passes through `defer_entries` without panicking.
    #[test]
    fn defer_entries_from_flush() {
        let drops = DropHandle::spawn();
        let mut entries = AHashMap::new();
        for i in 0..10 {
            let key = CompactString::from(format!("key-{i}").as_str());
            let value = Value::String(Bytes::from(format!("val-{i}")));
            entries.insert(
                key,
                Entry {
                    value,
                    expires_at_ms: 0,
                    cached_value_size: 0,
                    last_access_secs: 0,
                },
            );
        }
        drops.defer_entries(entries);
        let_drop_thread_run();
    }

    /// An empty map takes the early-exit path of `defer_entries`.
    #[test]
    fn empty_entries_skipped() {
        let drops = DropHandle::spawn();
        drops.defer_entries(AHashMap::new());
    }
}