use crate::error::{AmateRSError, ErrorContext, Result};
use crate::storage::{
BlockCache, BlockCacheConfig, BlockCacheKey, CachedBlock, CompactionConfig, CompactionExecutor,
CompactionPlanner, Memtable, MemtableConfig, SSTableConfig, SSTableReader, SSTableWriter,
ValueLog, ValueLogConfig, ValuePointer, Wal,
};
use crate::types::{CipherBlob, Key};
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Bookkeeping for one on-disk SSTable as tracked by the LSM tree.
#[derive(Debug, Clone)]
pub struct SSTableMetadata {
/// Location of the `.sst` file.
pub path: PathBuf,
/// Smallest key stored in the table (inclusive lower bound used by lookups).
pub min_key: Key,
/// Largest key stored in the table (inclusive upper bound used by lookups).
pub max_key: Key,
/// Number of entries in the table, tombstones included.
pub num_entries: usize,
/// File size in bytes, as reported by filesystem metadata.
pub file_size: u64,
/// Level the table belongs to (0 = freshly flushed memtables).
pub level: usize,
}
/// The set of SSTables that make up one level of the tree.
#[derive(Debug, Clone)]
pub struct LevelInfo {
/// Level number (index into the tree's level list).
pub level: usize,
/// Tables in this level. L0 is kept in flush order (oldest first); deeper levels
/// are expected to be sorted by `min_key` because lookups binary-search them.
pub sstables: Vec<SSTableMetadata>,
/// Sum of `file_size` over `sstables`.
pub total_size: u64,
}
impl LevelInfo {
fn new(level: usize) -> Self {
Self {
level,
sstables: Vec::new(),
total_size: 0,
}
}
fn add_sstable(&mut self, metadata: SSTableMetadata) {
self.total_size += metadata.file_size;
self.sstables.push(metadata);
}
}
/// Configuration for an [`LsmTree`].
#[derive(Debug, Clone)]
pub struct LsmTreeConfig {
/// Directory holding the SSTable files (`L{level}_{id}.sst`).
pub data_dir: PathBuf,
/// Directory holding the write-ahead log file `wal.log`.
pub wal_dir: PathBuf,
/// Passed to `Memtable::with_config` for every memtable the tree creates.
pub memtable_config: MemtableConfig,
/// Used by `SSTableWriter` when flushing and by the `CompactionExecutor`.
pub sstable_config: SSTableConfig,
/// Passed to `BlockCache::with_config`.
pub block_cache_config: BlockCacheConfig,
/// Passed to `CompactionPlanner::new`; the planner decides when levels need compaction.
pub compaction_config: CompactionConfig,
/// When `Some`, values the value log chooses to separate are stored there and the
/// tree keeps only an encoded pointer. `None` disables value separation.
pub value_log_config: Option<ValueLogConfig>,
/// Number of levels (numbered `0..max_levels`).
pub max_levels: usize,
/// NOTE(review): not read by `LsmTree` in this module; L0 compaction is decided by
/// `CompactionPlanner` from `compaction_config` — confirm before relying on this field.
pub l0_compaction_threshold: usize,
/// NOTE(review): not read by `LsmTree` in this module — confirm where (if anywhere) it applies.
pub level_size_multiplier: usize,
}
impl Default for LsmTreeConfig {
fn default() -> Self {
Self {
data_dir: PathBuf::from("./data"),
wal_dir: PathBuf::from("./wal"),
memtable_config: MemtableConfig::default(),
sstable_config: SSTableConfig::default(),
block_cache_config: BlockCacheConfig::default(),
compaction_config: CompactionConfig::default(),
value_log_config: None, max_levels: 7,
l0_compaction_threshold: 4,
level_size_multiplier: 10,
}
}
}
pub struct LsmTree {
config: LsmTreeConfig,
memtable: Arc<Memtable>,
immutable_memtable: Arc<RwLock<Option<Arc<Memtable>>>>,
wal: Arc<RwLock<Wal>>,
value_log: Option<Arc<ValueLog>>,
levels: Arc<RwLock<Vec<LevelInfo>>>,
block_cache: Arc<BlockCache>,
next_sstable_id: Arc<RwLock<u64>>,
compaction_planner: CompactionPlanner,
compaction_executor: Arc<RwLock<CompactionExecutor>>,
}
impl LsmTree {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
let config = LsmTreeConfig {
data_dir: data_dir.as_ref().to_path_buf(),
..Default::default()
};
Self::with_config(config)
}
pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
std::fs::create_dir_all(&config.data_dir).map_err(|e| {
AmateRSError::StorageIntegrity(ErrorContext::new(format!(
"Failed to create data directory: {}",
e
)))
})?;
std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
AmateRSError::StorageIntegrity(ErrorContext::new(format!(
"Failed to create WAL directory: {}",
e
)))
})?;
let wal_path = config.wal_dir.join("wal.log");
let wal = Wal::create(wal_path)?;
let memtable = Memtable::with_config(config.memtable_config.clone());
let mut levels = Vec::with_capacity(config.max_levels);
for i in 0..config.max_levels {
levels.push(LevelInfo::new(i));
}
let block_cache = BlockCache::with_config(config.block_cache_config.clone());
let compaction_planner = CompactionPlanner::new(config.compaction_config.clone());
let compaction_executor = CompactionExecutor::new(config.sstable_config.clone());
let value_log = if let Some(ref vlog_config) = config.value_log_config {
Some(Arc::new(ValueLog::with_config(vlog_config.clone())?))
} else {
None
};
let mut lsm = Self {
config,
memtable: Arc::new(memtable),
immutable_memtable: Arc::new(RwLock::new(None)),
wal: Arc::new(RwLock::new(wal)),
value_log,
levels: Arc::new(RwLock::new(levels)),
block_cache: Arc::new(block_cache),
next_sstable_id: Arc::new(RwLock::new(0)),
compaction_planner,
compaction_executor: Arc::new(RwLock::new(compaction_executor)),
};
lsm.recover_sstables()?;
Ok(lsm)
}
fn recover_sstables(&mut self) -> Result<()> {
use std::fs;
let entries = fs::read_dir(&self.config.data_dir).map_err(|e| {
AmateRSError::IoError(ErrorContext::new(format!(
"Failed to read data directory: {}",
e
)))
})?;
let mut sstables_by_level: BTreeMap<usize, Vec<SSTableMetadata>> = BTreeMap::new();
let mut max_id = 0u64;
for entry in entries {
let entry = entry.map_err(|e| {
AmateRSError::IoError(ErrorContext::new(format!(
"Failed to read directory entry: {}",
e
)))
})?;
let path = entry.path();
let filename = match path.file_name().and_then(|n| n.to_str()) {
Some(name) => name,
None => continue,
};
if filename.starts_with('L') && filename.ends_with(".sst") {
let parts: Vec<&str> = filename[1..].trim_end_matches(".sst").split('_').collect();
if parts.len() == 2 {
if let (Ok(level), Ok(id)) =
(parts[0].parse::<usize>(), parts[1].parse::<u64>())
{
if id > max_id {
max_id = id;
}
let reader = SSTableReader::open(&path)?;
let (min_key, max_key, num_entries) = reader.metadata()?;
let file_size = std::fs::metadata(&path)
.map_err(|e| {
AmateRSError::IoError(ErrorContext::new(format!(
"Failed to get file size: {}",
e
)))
})?
.len();
let metadata = SSTableMetadata {
path: path.clone(),
min_key,
max_key,
num_entries,
file_size,
level,
};
sstables_by_level.entry(level).or_default().push(metadata);
}
}
}
}
let mut levels = self.levels.write();
for (level, mut sstables) in sstables_by_level {
if level < levels.len() {
if level > 0 {
sstables.sort_by(|a, b| a.min_key.cmp(&b.min_key));
}
for metadata in sstables {
levels[level].add_sstable(metadata);
}
}
}
drop(levels);
*self.next_sstable_id.write() = max_id + 1;
Ok(())
}
pub fn put(&self, key: Key, value: CipherBlob) -> Result<()> {
let stored_value = if let Some(ref vlog) = self.value_log {
if vlog.should_separate(&value) {
let pointer = vlog.append(key.clone(), value)?;
vlog.flush()?;
Self::encode_value_pointer(&pointer)
} else {
value
}
} else {
value
};
{
let mut wal = self.wal.write();
wal.put(key.clone(), stored_value.clone())?;
}
self.memtable.put(key, stored_value)?;
if self.memtable.should_flush() {
self.try_flush_memtable()?;
}
Ok(())
}
pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
if let Some(value) = self.memtable.get(key)? {
return self.resolve_value(value);
}
{
let immutable = self.immutable_memtable.read();
if let Some(ref memtable) = *immutable {
if let Some(value) = memtable.get(key)? {
return self.resolve_value(value);
}
}
}
let levels = self.levels.read();
for level_info in levels.iter() {
if let Some(value) = self.search_level(level_info, key)? {
return self.resolve_value(value);
}
}
Ok(None)
}
fn resolve_value(&self, value: CipherBlob) -> Result<Option<CipherBlob>> {
if value.as_bytes().is_empty() {
return Ok(None);
}
if Self::is_value_pointer(&value) {
if let Some(ref vlog) = self.value_log {
let pointer = Self::decode_value_pointer(&value)?;
let actual_value = vlog.read(&pointer)?;
Ok(Some(actual_value))
} else {
Err(AmateRSError::StorageIntegrity(ErrorContext::new(
"Found value pointer but vLog is not configured".to_string(),
)))
}
} else {
Ok(Some(value))
}
}
pub fn delete(&self, key: Key) -> Result<()> {
{
let mut wal = self.wal.write();
wal.delete(key.clone())?;
}
self.memtable.delete(key)?;
if self.memtable.should_flush() {
self.try_flush_memtable()?;
}
Ok(())
}
pub fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
let mut results = BTreeMap::new();
let levels = self.levels.read();
for level_info in levels.iter().rev() {
let level_results = self.range_scan_level(level_info, start, end)?;
for (k, v) in level_results {
results.entry(k).or_insert(v);
}
}
{
let immutable = self.immutable_memtable.read();
if let Some(ref memtable) = *immutable {
for (k, v) in memtable.range(start, end) {
results.insert(k, v);
}
}
}
for (k, v) in self.memtable.range(start, end) {
results.insert(k, v);
}
Ok(results.into_iter().collect())
}
fn search_level(&self, level_info: &LevelInfo, key: &Key) -> Result<Option<CipherBlob>> {
if level_info.level == 0 {
for metadata in level_info.sstables.iter().rev() {
if key >= &metadata.min_key && key <= &metadata.max_key {
if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
return Ok(Some(value));
}
}
}
} else {
let idx = level_info.sstables.binary_search_by(|metadata| {
if key < &metadata.min_key {
std::cmp::Ordering::Greater
} else if key > &metadata.max_key {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Equal
}
});
if let Ok(idx) = idx {
let metadata = &level_info.sstables[idx];
if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
return Ok(Some(value));
}
}
}
Ok(None)
}
fn range_scan_level(
&self,
level_info: &LevelInfo,
start: &Key,
end: &Key,
) -> Result<Vec<(Key, CipherBlob)>> {
let mut results = Vec::new();
for metadata in &level_info.sstables {
if &metadata.max_key < start || &metadata.min_key > end {
continue;
}
let reader = SSTableReader::open(&metadata.path)?;
let entries = reader.iter()?;
for (k, v) in entries {
if &k >= start && &k < end {
results.push((k, v));
}
}
}
Ok(results)
}
fn read_from_sstable(&self, path: &Path, key: &Key) -> Result<Option<CipherBlob>> {
let reader = SSTableReader::open(path)?;
reader.get(key)
}
fn try_flush_memtable(&self) -> Result<()> {
{
let immutable = self.immutable_memtable.read();
if immutable.is_some() {
return Ok(());
}
}
{
let mut immutable = self.immutable_memtable.write();
if immutable.is_some() {
return Ok(());
}
let old_memtable = Arc::clone(&self.memtable);
let new_memtable = Memtable::with_config(self.config.memtable_config.clone());
*immutable = Some(old_memtable);
}
self.flush_immutable_memtable()?;
Ok(())
}
fn flush_immutable_memtable(&self) -> Result<()> {
let memtable = {
let mut immutable = self.immutable_memtable.write();
immutable.take()
};
if let Some(memtable) = memtable {
let sstable_id = {
let mut next_id = self.next_sstable_id.write();
let id = *next_id;
*next_id += 1;
id
};
let sstable_path = self
.config
.data_dir
.join(format!("L0_{:08}.sst", sstable_id));
let mut writer = SSTableWriter::new(&sstable_path, self.config.sstable_config.clone())?;
let entries = memtable.entries();
let mut min_key = None;
let mut max_key = None;
let mut num_entries = 0;
for (key, value_opt) in entries {
let value = value_opt.unwrap_or_else(|| CipherBlob::new(Vec::new()));
if min_key.is_none() {
min_key = Some(key.clone());
}
max_key = Some(key.clone());
writer.add(key, value)?;
num_entries += 1;
}
writer.finish()?;
let file_size = std::fs::metadata(&sstable_path)
.map_err(|e| {
AmateRSError::StorageIntegrity(ErrorContext::new(format!(
"Failed to get SSTable size: {}",
e
)))
})?
.len();
if let (Some(min_key), Some(max_key)) = (min_key, max_key) {
let metadata = SSTableMetadata {
path: sstable_path,
min_key,
max_key,
num_entries,
file_size,
level: 0,
};
let mut levels = self.levels.write();
levels[0].add_sstable(metadata);
}
self.trigger_compaction()?;
}
Ok(())
}
fn trigger_compaction(&self) -> Result<()> {
let levels = self.levels.read();
let l0_count = levels[0].sstables.len();
if self.compaction_planner.needs_l0_compaction(l0_count) {
drop(levels); return self.compact_l0_to_l1();
}
for level_info in levels.iter() {
if level_info.level > 0
&& self
.compaction_planner
.needs_level_compaction(level_info.level, level_info.total_size)
{
let source_level = level_info.level;
drop(levels); return self.compact_level(source_level);
}
}
Ok(())
}
fn compact_l0_to_l1(&self) -> Result<()> {
let (source_sstables, target_sstables) = {
let levels = self.levels.read();
let source = levels[0].sstables.clone();
let target = if levels.len() > 1 {
levels[1].sstables.clone()
} else {
Vec::new()
};
(source, target)
};
if let Some(task) =
self.compaction_planner
.plan_compaction(0, source_sstables, target_sstables)
{
self.execute_compaction_task(task)?;
}
Ok(())
}
fn compact_level(&self, source_level: usize) -> Result<()> {
let (source_sstables, target_sstables) = {
let levels = self.levels.read();
if source_level >= levels.len() {
return Ok(());
}
let source = levels[source_level].sstables.clone();
let target = if source_level + 1 < levels.len() {
levels[source_level + 1].sstables.clone()
} else {
Vec::new()
};
(source, target)
};
if let Some(task) =
self.compaction_planner
.plan_compaction(source_level, source_sstables, target_sstables)
{
self.execute_compaction_task(task)?;
}
Ok(())
}
fn execute_compaction_task(&self, task: crate::storage::CompactionTask) -> Result<()> {
let output_sstables = {
let mut executor = self.compaction_executor.write();
let mut next_id = self.next_sstable_id.write();
executor.execute_compaction(task.clone(), &self.config.data_dir, &mut next_id)?
};
let mut levels = self.levels.write();
levels[task.source_level]
.sstables
.retain(|s| !task.source_sstables.iter().any(|ts| ts.path == s.path));
levels[task.source_level].total_size = levels[task.source_level]
.sstables
.iter()
.map(|s| s.file_size)
.sum();
if task.target_level < levels.len() {
levels[task.target_level]
.sstables
.retain(|s| !task.target_sstables.iter().any(|ts| ts.path == s.path));
levels[task.target_level].total_size = levels[task.target_level]
.sstables
.iter()
.map(|s| s.file_size)
.sum();
for sstable in output_sstables {
levels[task.target_level].add_sstable(sstable);
}
}
drop(levels);
for sstable in task.source_sstables.iter().chain(&task.target_sstables) {
std::fs::remove_file(&sstable.path).ok();
}
Ok(())
}
pub fn level_info(&self, level: usize) -> Option<LevelInfo> {
let levels = self.levels.read();
if level < levels.len() {
Some(levels[level].clone())
} else {
None
}
}
pub fn all_levels_info(&self) -> Vec<LevelInfo> {
self.levels.read().clone()
}
pub fn stats(&self) -> LsmTreeStats {
let levels = self.levels.read();
let cache_stats = self.block_cache.stats();
let compaction_stats = self.compaction_executor.read().stats_snapshot();
LsmTreeStats {
memtable_size: self.memtable.size_bytes(),
num_levels: levels.len(),
levels: levels.clone(),
cache_hit_rate: cache_stats.hit_rate(),
cache_size: cache_stats.size_bytes,
compaction_stats,
}
}
pub fn keys(&self) -> Result<Vec<Key>> {
let mut key_set = std::collections::BTreeSet::new();
for (key, value_opt) in self.memtable.entries() {
if value_opt.is_some() {
key_set.insert(key);
}
}
{
let immutable = self.immutable_memtable.read();
if let Some(ref memtable) = *immutable {
for (key, value_opt) in memtable.entries() {
if value_opt.is_some() {
key_set.insert(key);
}
}
}
}
let levels = self.levels.read();
for level_info in levels.iter() {
for metadata in &level_info.sstables {
let reader = SSTableReader::open(&metadata.path)?;
let entries = reader.iter()?;
for (key, _) in entries {
key_set.insert(key);
}
}
}
Ok(key_set.into_iter().collect())
}
pub fn flush(&self) -> Result<()> {
if self.memtable.size_bytes() > 0 {
self.try_flush_memtable()?;
}
self.flush_immutable_memtable()?;
{
let mut wal = self.wal.write();
wal.flush()?;
}
if let Some(ref vlog) = self.value_log {
vlog.flush()?;
}
Ok(())
}
pub fn close(&self) -> Result<()> {
self.flush()?;
Ok(())
}
fn encode_value_pointer(pointer: &ValuePointer) -> CipherBlob {
const MAGIC: &[u8] = b"VPTR"; let pointer_bytes = pointer.encode();
let mut bytes = Vec::with_capacity(MAGIC.len() + pointer_bytes.len());
bytes.extend_from_slice(MAGIC);
bytes.extend_from_slice(&pointer_bytes);
CipherBlob::new(bytes)
}
fn decode_value_pointer(blob: &CipherBlob) -> Result<ValuePointer> {
const MAGIC: &[u8] = b"VPTR";
let bytes = blob.as_bytes();
if bytes.len() < MAGIC.len() {
return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
"Invalid value pointer: too short".to_string(),
)));
}
if &bytes[..MAGIC.len()] != MAGIC {
return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
"Invalid value pointer: bad magic".to_string(),
)));
}
ValuePointer::decode(&bytes[MAGIC.len()..])
}
fn is_value_pointer(blob: &CipherBlob) -> bool {
const MAGIC: &[u8] = b"VPTR";
let bytes = blob.as_bytes();
bytes.len() >= MAGIC.len() && &bytes[..MAGIC.len()] == MAGIC
}
}
/// Point-in-time statistics returned by `LsmTree::stats`.
#[derive(Debug, Clone)]
pub struct LsmTreeStats {
/// `size_bytes()` of the active memtable (the frozen memtable is not included).
pub memtable_size: usize,
/// Number of levels in the tree.
pub num_levels: usize,
/// Snapshot of each level's table metadata.
pub levels: Vec<LevelInfo>,
/// Hit rate reported by the block cache statistics.
pub cache_hit_rate: f64,
/// Size reported by the block cache statistics (`size_bytes`).
pub cache_size: usize,
/// Snapshot of the compaction executor's counters.
pub compaction_stats: crate::storage::CompactionStatsSnapshot,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Returns an empty scratch directory dedicated to one test.
    ///
    /// Leftovers from an earlier aborted run are removed first; otherwise the tree would
    /// recover their SSTables and the test would not start from a clean state.
    fn fresh_dir(name: &str) -> PathBuf {
        let dir = env::temp_dir().join(name);
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).ok();
        dir
    }

    /// Default configuration rooted at `dir`, with the WAL kept inside `dir` too.
    ///
    /// The default `wal_dir` (`./wal`) would make the tests, which run in parallel,
    /// share a single WAL file in the working directory.
    fn config_in(dir: &Path) -> LsmTreeConfig {
        LsmTreeConfig {
            data_dir: dir.to_path_buf(),
            wal_dir: dir.join("wal"),
            ..Default::default()
        }
    }

    /// Value-log settings shared by the vLog tests (1 MiB files, threshold of 1024 bytes).
    fn vlog_config(dir: &Path) -> ValueLogConfig {
        ValueLogConfig {
            vlog_dir: dir.join("vlog"),
            max_file_size: 1024 * 1024,
            value_threshold: 1024,
            sync_on_write: false,
            gc_threshold: 0.5,
        }
    }

    #[test]
    fn test_lsm_tree_basic_operations() -> Result<()> {
        let dir = fresh_dir("test_lsm_basic");
        let lsm = LsmTree::new(&dir)?;
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        lsm.put(key.clone(), value.clone())?;
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &[1, 2, 3, 4, 5]
        );
        lsm.delete(key.clone())?;
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_none());
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_multiple_keys() -> Result<()> {
        let dir = fresh_dir("test_lsm_multiple");
        let lsm = LsmTree::new(&dir)?;
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            assert_eq!(value.expect("Value should exist").as_bytes()[0], i as u8);
        }
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_range_scan() -> Result<()> {
        let dir = fresh_dir("test_lsm_range");
        let lsm = LsmTree::new(&dir)?;
        for i in 0..20 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 50]);
            lsm.put(key, value)?;
        }
        let start = Key::from_str("key_005");
        let end = Key::from_str("key_015");
        let results = lsm.range(&start, &end)?;
        assert!(results.len() >= 10);
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_stats() -> Result<()> {
        let dir = fresh_dir("test_lsm_stats");
        let lsm = LsmTree::new(&dir)?;
        for i in 0..5 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }
        let stats = lsm.stats();
        assert!(stats.memtable_size > 0);
        assert_eq!(stats.num_levels, 7);
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_compaction_trigger() -> Result<()> {
        let dir = fresh_dir("test_lsm_compaction");
        let mut config = config_in(&dir);
        config.memtable_config.max_size_bytes = 1024;
        // NOTE: LsmTree itself does not read this field; compaction is driven by
        // compaction_config through the CompactionPlanner.
        config.l0_compaction_threshold = 2;
        let lsm = LsmTree::with_config(config)?;
        for i in 0..100 {
            let key = Key::from_str(&format!("key_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 200]);
            lsm.put(key, value)?;
        }
        let stats = lsm.stats();
        assert!(
            stats.compaction_stats.compactions_completed > 0
                || !stats.levels[0].sstables.is_empty()
        );
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_compaction_stats() -> Result<()> {
        let dir = fresh_dir("test_lsm_compaction_stats");
        let mut config = config_in(&dir);
        config.memtable_config.max_size_bytes = 512;
        let lsm = LsmTree::with_config(config)?;
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }
        let stats = lsm.stats();
        let _ = stats.compaction_stats.keys_processed;
        let _ = stats.compaction_stats.tombstones_removed;
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_level_organization() -> Result<()> {
        let dir = fresh_dir("test_lsm_levels");
        let mut config = config_in(&dir);
        config.memtable_config.max_size_bytes = 1024;
        let lsm = LsmTree::with_config(config)?;
        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            let value = CipherBlob::new(vec![i as u8; 150]);
            lsm.put(key, value)?;
        }
        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
        }
        let stats = lsm.stats();
        let total_sstables: usize = stats.levels.iter().map(|l| l.sstables.len()).sum();
        assert!(total_sstables > 0 || stats.memtable_size > 0);
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_bloom_filter_negative_lookups() -> Result<()> {
        let dir = fresh_dir("test_lsm_bloom");
        let mut config = config_in(&dir);
        config.memtable_config.max_size_bytes = 512;
        let lsm = LsmTree::with_config(config)?;
        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }
        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            let result = lsm.get(&key)?;
            assert!(result.is_some());
        }
        for i in 0..100 {
            let key = Key::from_str(&format!("notexists_{:04}", i));
            let result = lsm.get(&key)?;
            assert!(result.is_none());
        }
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_basic() -> Result<()> {
        let dir = fresh_dir("test_lsm_vlog_basic");
        let config = LsmTreeConfig {
            value_log_config: Some(vlog_config(&dir)),
            ..config_in(&dir)
        };
        let lsm = LsmTree::with_config(config)?;
        let small_key = Key::from_str("small_key");
        let small_value = CipherBlob::new(vec![1u8; 512]);
        lsm.put(small_key.clone(), small_value.clone())?;
        let large_key = Key::from_str("large_key");
        let large_value = CipherBlob::new(vec![2u8; 2048]);
        lsm.put(large_key.clone(), large_value.clone())?;
        let retrieved_small = lsm.get(&small_key)?;
        assert!(retrieved_small.is_some());
        assert_eq!(
            retrieved_small
                .expect("Small value should be retrievable")
                .as_bytes(),
            &vec![1u8; 512]
        );
        let retrieved_large = lsm.get(&large_key)?;
        assert!(retrieved_large.is_some());
        assert_eq!(
            retrieved_large
                .expect("Large value should be retrievable")
                .as_bytes(),
            &vec![2u8; 2048]
        );
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_multiple_large_values() -> Result<()> {
        let dir = fresh_dir("test_lsm_vlog_multiple");
        let config = LsmTreeConfig {
            value_log_config: Some(vlog_config(&dir)),
            ..config_in(&dir)
        };
        let lsm = LsmTree::with_config(config)?;
        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = CipherBlob::new(vec![i as u8; 2048]);
            lsm.put(key, value)?;
        }
        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
            assert_eq!(retrieved.as_bytes().len(), 2048);
        }
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_with_flush() -> Result<()> {
        let dir = fresh_dir("test_lsm_vlog_flush");
        let mut config = LsmTreeConfig {
            value_log_config: Some(vlog_config(&dir)),
            ..config_in(&dir)
        };
        config.memtable_config.max_size_bytes = 4096;
        let lsm = LsmTree::with_config(config)?;
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 1500]);
            lsm.put(key, value)?;
        }
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
        }
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_disabled() -> Result<()> {
        let dir = fresh_dir("test_lsm_vlog_disabled");
        let config = LsmTreeConfig {
            value_log_config: None,
            ..config_in(&dir)
        };
        let lsm = LsmTree::with_config(config)?;
        let key = Key::from_str("large_key");
        let value = CipherBlob::new(vec![42u8; 5000]);
        lsm.put(key.clone(), value.clone())?;
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &vec![42u8; 5000]
        );
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_update() -> Result<()> {
        let dir = fresh_dir("test_lsm_vlog_update");
        let config = LsmTreeConfig {
            value_log_config: Some(vlog_config(&dir)),
            ..config_in(&dir)
        };
        let lsm = LsmTree::with_config(config)?;
        let key = Key::from_str("update_key");
        let value1 = CipherBlob::new(vec![1u8; 2048]);
        lsm.put(key.clone(), value1)?;
        let value2 = CipherBlob::new(vec![2u8; 2048]);
        lsm.put(key.clone(), value2)?;
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes()[0],
            2u8
        );
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_delete() -> Result<()> {
        let dir = fresh_dir("test_lsm_vlog_delete");
        let config = LsmTreeConfig {
            value_log_config: Some(vlog_config(&dir)),
            ..config_in(&dir)
        };
        let lsm = LsmTree::with_config(config)?;
        let key = Key::from_str("delete_key");
        let value = CipherBlob::new(vec![42u8; 2048]);
        lsm.put(key.clone(), value)?;
        assert!(lsm.get(&key)?.is_some());
        lsm.delete(key.clone())?;
        assert!(lsm.get(&key)?.is_none());
        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }

    #[test]
    fn test_lsm_tree_value_pointer_encoding() -> Result<()> {
        let pointer = ValuePointer {
            file_id: 123,
            offset: 456789,
            length: 2048,
            checksum: 0xDEADBEEF,
        };
        let encoded = LsmTree::encode_value_pointer(&pointer);
        assert!(LsmTree::is_value_pointer(&encoded));
        let decoded = LsmTree::decode_value_pointer(&encoded)?;
        assert_eq!(decoded.file_id, 123);
        assert_eq!(decoded.offset, 456789);
        assert_eq!(decoded.length, 2048);
        assert_eq!(decoded.checksum, 0xDEADBEEF);
        let regular_value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        assert!(!LsmTree::is_value_pointer(&regular_value));
        Ok(())
    }
}