use std::collections::BTreeMap;
use std::path::Path;
use crate::core::reactor::Reactor;
use crate::features::storage::sstable::{self, Entry, SstableError};
/// Errors that can be returned by [`MemTable`] operations.
#[derive(Debug)]
pub enum MemTableError {
    /// An error reported by the sstable module (for example while flushing).
    Sstable(SstableError),
}

impl std::fmt::Display for MemTableError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // `SstableError` is only known to implement `Debug` from this file,
            // so its debug rendering is used for the human-readable message.
            MemTableError::Sstable(err) => write!(f, "sstable error: {:?}", err),
        }
    }
}

// Implementing `Error` lets callers box this type as `dyn Error` and use it
// with standard error-handling utilities.
impl std::error::Error for MemTableError {}
/// Convenience alias for results whose error type is [`MemTableError`].
pub type Result<T> = std::result::Result<T, MemTableError>;
impl From<SstableError> for MemTableError {
fn from(err: SstableError) -> Self {
MemTableError::Sstable(err)
}
}
/// In-memory buffer of entries that can be flushed to an SSTable file.
///
/// Entries are grouped by their `key`; a key may hold several entries.
pub struct MemTable {
/// Buckets of entries indexed by entry key. Each bucket keeps its entries
/// in the order they were inserted via `put`.
entries: BTreeMap<u64, Vec<Entry>>,
}
impl Default for MemTable {
fn default() -> Self {
Self::new()
}
}
impl MemTable {
    /// Creates a memtable that holds no entries.
    pub fn new() -> Self {
        let entries = BTreeMap::new();
        Self { entries }
    }

    /// Appends `entry` to the bucket for `entry.key`, creating the bucket if
    /// this is the first entry seen for that key.
    pub fn put(&mut self, entry: Entry) {
        let bucket = self.entries.entry(entry.key).or_default();
        bucket.push(entry);
    }

    /// Returns a copy of every entry stored under `key`, in insertion order.
    ///
    /// An empty vector is returned when the key has no entries.
    pub fn entries_for_key(&self, key: u64) -> Vec<Entry> {
        match self.entries.get(&key) {
            Some(bucket) => bucket.clone(),
            None => Vec::new(),
        }
    }

    /// Flushes all entries to an SSTable at `path` using
    /// `crate::core::reactor::SystemReactor`.
    ///
    /// See [`MemTable::flush_to_sstable_with_reactor`] for details.
    pub fn flush_to_sstable(&self, path: &Path) -> Result<sstable::Sstable> {
        let reactor = crate::core::reactor::SystemReactor;
        self.flush_to_sstable_with_reactor(path, &reactor)
    }

    /// Flushes all entries to an SSTable at `path`, passing `reactor` through
    /// to the sstable writer.
    ///
    /// Entries are handed to the writer in ascending key order; entries that
    /// share a key keep their insertion order. The memtable itself is left
    /// untouched — call [`MemTable::clear`] afterwards to drop the entries.
    ///
    /// # Errors
    ///
    /// Returns [`MemTableError::Sstable`] when the sstable writer fails.
    pub fn flush_to_sstable_with_reactor(
        &self,
        path: &Path,
        reactor: &dyn Reactor,
    ) -> Result<sstable::Sstable> {
        // `BTreeMap::values` iterates in key order, so flattening the buckets
        // yields a key-ordered list of entries.
        let flattened: Vec<Entry> = self.entries.values().flatten().cloned().collect();
        let written = sstable::write_sstable_with_reactor(path, &flattened, reactor)?;
        Ok(written)
    }

    /// Returns `true` when no entries have been stored.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Removes every entry from the memtable.
    pub fn clear(&mut self) {
        self.entries.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::features::storage::sstable::EntryKind;
    use std::fs;

    /// Builds a `.sst` path in the system temp directory whose file name
    /// combines `name`, the process id and the current time in nanoseconds.
    fn temp_path(name: &str) -> std::path::PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let stamp = format!("{}_{}_{}", name, std::process::id(), nanos);
        std::env::temp_dir().join(format!("{}.sst", stamp))
    }

    /// Flushing a memtable and reading the file back yields the same entries.
    #[test]
    fn memtable_flush_round_trip() {
        let sst_path = temp_path("memtable_flush");
        let mut buffer = MemTable::new();

        // Two keys, with key 1 receiving a second version after key 2.
        let inserts = vec![
            Entry {
                key: 1,
                version: 1,
                kind: EntryKind::EdgeDelta,
                value: b"one".to_vec(),
            },
            Entry {
                key: 2,
                version: 1,
                kind: EntryKind::VectorDelta,
                value: b"two".to_vec(),
            },
            Entry {
                key: 1,
                version: 2,
                kind: EntryKind::FullNode,
                value: b"one-v2".to_vec(),
            },
        ];
        for entry in inserts {
            buffer.put(entry);
        }

        let written = buffer.flush_to_sstable(&sst_path).unwrap();
        let reloaded = sstable::read_sstable(&sst_path).unwrap();

        assert_eq!(written.entries, reloaded.entries);
        assert_eq!(written.entries.len(), 3);

        // Best-effort cleanup; a failure to delete must not fail the test.
        fs::remove_file(sst_path).ok();
    }
}