use parking_lot::{RwLock, Mutex};
use std::collections::BTreeMap;
use std::sync::Arc;
/// In-memory, lock-protected write buffer keyed by an ordered key.
///
/// Writes land in a single active buffer. Once that buffer's accounted size
/// reaches `size_limit` it is moved into the `immutable` list, where it waits
/// to be handed out by `flush`.
pub struct IndexMemBuffer<K: Ord + Clone, V: Clone> {
    /// Buffer that currently receives inserts and deletes.
    active: Arc<RwLock<BufferState<K, V>>>,
    /// Sealed buffers, oldest first (`insert` pushes to the back, `flush` removes index 0).
    immutable: Arc<RwLock<Vec<Arc<BufferState<K, V>>>>>,
    /// Accounted size at which the active buffer is sealed.
    size_limit: usize,
    /// Held for the duration of `flush` so that flushes run one at a time.
    flush_lock: Arc<Mutex<()>>,
}
/// One buffer segment: an ordered map plus a running size figure.
struct BufferState<K: Ord + Clone, V: Clone> {
    /// Entries held by this segment, ordered by key.
    data: BTreeMap<K, V>,
    /// Accounted size of `data` in bytes, maintained by the owning buffer.
    size: usize,
}
impl<K, V> IndexMemBuffer<K, V>
where
K: Ord + Clone,
V: Clone,
{
pub fn new(size_limit: usize) -> Self {
Self {
active: Arc::new(RwLock::new(BufferState {
data: BTreeMap::new(),
size: 0,
})),
immutable: Arc::new(RwLock::new(Vec::new())),
size_limit,
flush_lock: Arc::new(Mutex::new(())),
}
}
pub fn insert(&self, key: K, value: V) -> Result<bool, String> {
let mut active = self.active.write();
let entry_size = std::mem::size_of::<K>() + std::mem::size_of::<V>();
active.data.insert(key, value);
active.size += entry_size;
if active.size >= self.size_limit {
let old_active = BufferState {
data: std::mem::take(&mut active.data),
size: active.size,
};
active.size = 0;
self.immutable.write().push(Arc::new(old_active));
return Ok(true); }
Ok(false)
}
pub fn get(&self, key: &K) -> Option<V> {
{
let active = self.active.read();
if let Some(value) = active.data.get(key) {
return Some(value.clone());
}
}
{
let immutable = self.immutable.read();
for buffer in immutable.iter().rev() {
if let Some(value) = buffer.data.get(key) {
return Some(value.clone());
}
}
}
None
}
pub fn range(&self, start: &K, end: &K) -> Vec<(K, V)> {
use std::ops::Bound;
let mut results = Vec::new();
{
let active = self.active.read();
results.extend(
active.data
.range((Bound::Included(start), Bound::Included(end)))
.map(|(k, v)| (k.clone(), v.clone()))
);
}
{
let immutable = self.immutable.read();
for buffer in immutable.iter() {
results.extend(
buffer.data
.range((Bound::Included(start), Bound::Included(end)))
.map(|(k, v)| (k.clone(), v.clone()))
);
}
}
results.sort_by(|a, b| a.0.cmp(&b.0));
results.dedup_by(|a, b| a.0 == b.0);
results
}
pub fn scan_all(&self) -> Vec<(K, V)> {
let mut results = Vec::new();
{
let active = self.active.read();
results.extend(
active.data.iter().map(|(k, v)| (k.clone(), v.clone()))
);
}
{
let immutable = self.immutable.read();
for buffer in immutable.iter() {
results.extend(
buffer.data.iter().map(|(k, v)| (k.clone(), v.clone()))
);
}
}
results.sort_by(|a, b| a.0.cmp(&b.0));
results.dedup_by(|a, b| a.0 == b.0);
results
}
pub fn drain(&self) -> Vec<(K, V)> {
let mut results = Vec::new();
{
let mut active = self.active.write();
results.extend(
active.data.iter().map(|(k, v)| (k.clone(), v.clone()))
);
active.data.clear();
active.size = 0;
}
{
let mut immutable = self.immutable.write();
for buffer in immutable.iter() {
results.extend(
buffer.data.iter().map(|(k, v)| (k.clone(), v.clone()))
);
}
immutable.clear();
}
results.sort_by(|a, b| a.0.cmp(&b.0));
results.dedup_by(|a, b| a.0 == b.0);
results
}
pub fn flush(&self) -> Result<Option<Vec<(K, V)>>, String> {
let _lock = self.flush_lock.lock();
let buffer = {
let mut immutable = self.immutable.write();
if immutable.is_empty() {
return Ok(None);
}
immutable.remove(0) };
let entries: Vec<_> = buffer.data.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
Ok(Some(entries))
}
pub fn delete(&self, key: &K) -> bool {
let mut active = self.active.write();
if active.data.remove(key).is_some() {
let entry_size = std::mem::size_of::<K>() + std::mem::size_of::<V>();
active.size = active.size.saturating_sub(entry_size);
true
} else {
false
}
}
pub fn stats(&self) -> BufferStats {
let active = self.active.read();
let immutable = self.immutable.read();
let active_size = active.size;
let active_count = active.data.len();
let immutable_count = immutable.len();
let immutable_size: usize = immutable.iter().map(|b| b.size).sum();
BufferStats {
active_size_bytes: active_size,
active_entry_count: active_count,
immutable_buffer_count: immutable_count,
immutable_size_bytes: immutable_size,
total_size_bytes: active_size + immutable_size,
size_limit: self.size_limit,
fullness: ((active_size + immutable_size) as f64 / self.size_limit as f64 * 100.0) as u8,
}
}
pub fn is_empty(&self) -> bool {
let active = self.active.read();
let immutable = self.immutable.read();
active.data.is_empty() && immutable.is_empty()
}
pub fn size(&self) -> usize {
let active_size = self.active.read().size;
let immutable_size: usize = self.immutable.read().iter().map(|b| b.size).sum();
active_size + immutable_size
}
pub fn should_flush(&self) -> bool {
!self.immutable.read().is_empty()
}
pub fn immutable_count(&self) -> usize {
self.immutable.read().len()
}
}
/// Size and occupancy figures produced by `IndexMemBuffer::stats`.
#[derive(Debug, Clone)]
pub struct BufferStats {
/// Accounted size of the active buffer, in bytes.
pub active_size_bytes: usize,
/// Number of entries in the active buffer.
pub active_entry_count: usize,
/// Number of sealed buffers.
pub immutable_buffer_count: usize,
/// Sum of the accounted sizes of all sealed buffers, in bytes.
pub immutable_size_bytes: usize,
/// `active_size_bytes + immutable_size_bytes`.
pub total_size_bytes: usize,
/// The size limit the buffer was created with.
pub size_limit: usize,
/// `total_size_bytes` as a percentage of `size_limit`, narrowed to `u8` with an `as` cast.
pub fullness: u8,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserts below the limit report "not sealed" and are readable afterwards.
    #[test]
    fn test_mem_buffer_basic() {
        let buffer: IndexMemBuffer<i32, String> = IndexMemBuffer::new(1024);

        for &(key, text) in &[(1, "one"), (2, "two")] {
            let sealed = buffer.insert(key, text.to_string()).unwrap();
            assert!(!sealed);
        }

        assert_eq!(buffer.get(&1).as_deref(), Some("one"));
        assert_eq!(buffer.get(&2).as_deref(), Some("two"));
        assert!(buffer.get(&3).is_none());
    }

    /// A range query returns only the keys inside the inclusive bounds, in key order.
    #[test]
    fn test_mem_buffer_range() {
        let buffer: IndexMemBuffer<i32, String> = IndexMemBuffer::new(1024);

        for &(key, text) in &[(1, "one"), (2, "two"), (3, "three"), (5, "five")] {
            buffer.insert(key, text.to_string()).unwrap();
        }

        let hits = buffer.range(&2, &4);
        let expected = vec![(2, "two".to_string()), (3, "three".to_string())];
        assert_eq!(hits, expected);
    }

    /// Draining hands back every entry and leaves the buffer empty.
    #[test]
    fn test_mem_buffer_drain() {
        let buffer: IndexMemBuffer<i32, String> = IndexMemBuffer::new(1024);

        for &(key, text) in &[(1, "one"), (2, "two")] {
            buffer.insert(key, text.to_string()).unwrap();
        }

        let drained = buffer.drain();
        assert_eq!(drained.len(), 2);

        assert!(buffer.is_empty());
        assert_eq!(buffer.size(), 0);
        assert!(buffer.get(&1).is_none());
    }

    /// Inserting until the buffer reports a seal leaves a flushable buffer at >= 100% fullness.
    #[test]
    fn test_mem_buffer_fullness() {
        let buffer: IndexMemBuffer<i32, String> = IndexMemBuffer::new(128);

        let mut inserted = 0;
        for key in 0.. {
            inserted = key + 1;
            if buffer.insert(key, format!("value_{}", key)).unwrap() {
                break;
            }
        }
        println!("Inserted {} entries before buffer full", inserted);

        assert!(buffer.should_flush());

        let snapshot = buffer.stats();
        println!("Stats: {:?}", snapshot);
        assert!(snapshot.fullness >= 100);
    }
}