use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tracing::{debug, info, span, warn, Level};
use scirs2_core::profiling::Profiler;
use crate::annotations::TripleAnnotation;
use crate::bloom_filter::BloomFilter;
use crate::StarResult;
/// Tuning knobs for an `LsmAnnotationStore`.
#[derive(Clone, Debug)]
pub struct LsmConfig {
    /// Directory holding the `sstable_{level}_{id}.dat` files; created when the store is built.
    pub data_dir: PathBuf,
    /// Estimated memtable size (bytes) at which an insert triggers a flush to level 0.
    pub memtable_size_threshold: usize,
    /// Level `n` becomes a compaction candidate once it holds
    /// `level_size_multiplier.pow(n)` SSTables.
    pub level_size_multiplier: usize,
    /// Number of SSTable levels; the last level is never compacted further.
    pub max_levels: usize,
    /// Maximum number of (oldest-first) SSTables merged by one compaction.
    pub compaction_batch_size: usize,
    /// NOTE(review): not read anywhere in this file — compaction currently runs inline.
    pub enable_background_compaction: bool,
    /// Target false-positive rate for each SSTable's bloom filter.
    pub bloom_filter_fp_rate: f64,
    /// Recorded in every SSTable header.
    /// NOTE(review): no compression is actually applied by the writer in this file.
    pub enable_compression: bool,
}

impl Default for LsmConfig {
    /// Shared `oxirs_lsm` directory under the system temp dir, 16 MiB memtables, 7 levels
    /// growing 10x per level, 4-table compaction batches, a 1% bloom false-positive rate,
    /// and both boolean feature flags switched on.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        let data_dir = std::env::temp_dir().join("oxirs_lsm");
        Self {
            data_dir,
            memtable_size_threshold: 16 * MIB,
            level_size_multiplier: 10,
            max_levels: 7,
            compaction_batch_size: 4,
            enable_background_compaction: true,
            bloom_filter_fp_rate: 0.01,
            enable_compression: true,
        }
    }
}
/// In-memory write buffer holding the newest annotations, sorted by key.
///
/// The `BTreeMap` keeps keys ordered, so a flush writes SSTable entries in ascending key
/// order.
#[derive(Debug, Clone)]
struct MemTable {
    data: BTreeMap<u64, TripleAnnotation>,
    /// Estimated footprint of the live entries (see `MemTable::entry_size`).
    size_bytes: usize,
    // Set at construction but not read anywhere in this file, hence the allowance.
    #[allow(dead_code)]
    created_at: DateTime<Utc>,
}

impl MemTable {
    /// Creates an empty memtable stamped with the current UTC time.
    fn new() -> Self {
        Self {
            data: BTreeMap::new(),
            size_bytes: 0,
            created_at: Utc::now(),
        }
    }

    /// Estimated bytes attributed to one entry.
    ///
    /// Counts the key, the inline `TripleAnnotation`, and the bytes of its `source`
    /// string. Heap allocations other than `source` are not counted.
    fn entry_size(annotation: &TripleAnnotation) -> usize {
        std::mem::size_of::<u64>()
            + std::mem::size_of::<TripleAnnotation>()
            + annotation.source.as_ref().map_or(0, |s| s.len())
    }

    /// Inserts `annotation` under `key`, replacing any previous value.
    ///
    /// When a key is overwritten, the replaced entry's size is subtracted first. This
    /// keeps repeated updates to the same key from inflating `size_bytes` and
    /// triggering spurious flushes.
    fn insert(&mut self, key: u64, annotation: TripleAnnotation) {
        let added = Self::entry_size(&annotation);
        if let Some(replaced) = self.data.insert(key, annotation) {
            self.size_bytes = self
                .size_bytes
                .saturating_sub(Self::entry_size(&replaced));
        }
        self.size_bytes += added;
    }

    /// Returns the annotation stored for `key`, if any.
    fn get(&self, key: u64) -> Option<&TripleAnnotation> {
        self.data.get(&key)
    }

    /// Estimated size of the live entries in bytes.
    fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Whether the memtable holds no entries.
    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Number of distinct keys stored.
    fn len(&self) -> usize {
        self.data.len()
    }
}
/// Metadata for one immutable, on-disk sorted run of annotations.
///
/// Produced by `SSTable::from_memtable`. The entries themselves stay in the file at
/// `path` and are read back on demand by `get` and `scan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SSTable {
/// Id handed out by the owning store's counter; also embedded in the file name.
id: u64,
/// Level this table belongs to (flushes write level 0).
level: usize,
/// Location of the data file. Skipped by serde, so a deserialized value has an empty path.
#[serde(skip)]
path: PathBuf,
/// Smallest key written (`u64::MAX` if written from an empty memtable).
min_key: u64,
/// Largest key written (`0` if written from an empty memtable).
max_key: u64,
/// Number of entries written.
entry_count: usize,
/// File size in bytes as reported by the filesystem right after writing.
file_size: usize,
/// UTC time at which the table was written.
created_at: DateTime<Utc>,
/// Bloom filter over the little-endian key bytes. Skipped by serde, so a deserialized
/// table has `None` and lookups rely on the key-range check plus a file scan.
#[serde(skip)]
bloom_filter: Option<BloomFilter>,
}
impl SSTable {
fn from_memtable(
memtable: &MemTable,
id: u64,
level: usize,
data_dir: &Path,
enable_compression: bool,
bloom_fp_rate: f64,
) -> StarResult<Self> {
let path = data_dir.join(format!("sstable_{}_{}.dat", level, id));
let mut bloom_filter = BloomFilter::new(memtable.len(), bloom_fp_rate);
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&path)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let mut writer = BufWriter::new(file);
let header = SSTableHeader {
version: 1,
entry_count: memtable.len(),
enable_compression,
};
let header_bytes = oxicode::serde::encode_to_vec(&header, oxicode::config::standard())
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
writer
.write_all(&(header_bytes.len() as u32).to_le_bytes())
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
writer
.write_all(&header_bytes)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let mut min_key = u64::MAX;
let mut max_key = u64::MIN;
for (&key, annotation) in &memtable.data {
min_key = min_key.min(key);
max_key = max_key.max(key);
bloom_filter.insert(&key.to_le_bytes());
let entry = SSTableEntry {
key,
annotation: annotation.clone(),
};
let entry_bytes = oxicode::serde::encode_to_vec(&entry, oxicode::config::standard())
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
writer
.write_all(&(entry_bytes.len() as u32).to_le_bytes())
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
writer
.write_all(&entry_bytes)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
}
writer
.flush()
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
let metadata = std::fs::metadata(&path)
.map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
Ok(Self {
id,
level,
path,
min_key,
max_key,
entry_count: memtable.len(),
file_size: metadata.len() as usize,
created_at: Utc::now(),
bloom_filter: Some(bloom_filter),
})
}
fn get(&self, key: u64) -> StarResult<Option<TripleAnnotation>> {
if let Some(ref bloom) = self.bloom_filter {
if !bloom.contains(&key.to_le_bytes()) {
return Ok(None); }
}
if key < self.min_key || key > self.max_key {
return Ok(None);
}
let file =
File::open(&self.path).map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let mut reader = BufReader::new(file);
let mut header_len_bytes = [0u8; 4];
reader
.read_exact(&mut header_len_bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let header_len = u32::from_le_bytes(header_len_bytes) as usize;
let mut header_bytes = vec![0u8; header_len];
reader
.read_exact(&mut header_bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
loop {
let mut entry_len_bytes = [0u8; 4];
if reader.read_exact(&mut entry_len_bytes).is_err() {
break; }
let entry_len = u32::from_le_bytes(entry_len_bytes) as usize;
let mut entry_bytes = vec![0u8; entry_len];
reader
.read_exact(&mut entry_bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let entry: SSTableEntry =
oxicode::serde::decode_from_slice(&entry_bytes, oxicode::config::standard())
.map_err(|e| crate::StarError::parse_error(e.to_string()))?
.0;
if entry.key == key {
return Ok(Some(entry.annotation));
}
}
Ok(None)
}
fn scan(&self) -> StarResult<Vec<(u64, TripleAnnotation)>> {
let file =
File::open(&self.path).map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let mut reader = BufReader::new(file);
let mut header_len_bytes = [0u8; 4];
reader
.read_exact(&mut header_len_bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let header_len = u32::from_le_bytes(header_len_bytes) as usize;
let mut header_bytes = vec![0u8; header_len];
reader
.read_exact(&mut header_bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let mut results = Vec::new();
loop {
let mut entry_len_bytes = [0u8; 4];
if reader.read_exact(&mut entry_len_bytes).is_err() {
break; }
let entry_len = u32::from_le_bytes(entry_len_bytes) as usize;
let mut entry_bytes = vec![0u8; entry_len];
reader
.read_exact(&mut entry_bytes)
.map_err(|e| crate::StarError::parse_error(e.to_string()))?;
let entry: SSTableEntry =
oxicode::serde::decode_from_slice(&entry_bytes, oxicode::config::standard())
.map_err(|e| crate::StarError::parse_error(e.to_string()))?
.0;
results.push((entry.key, entry.annotation));
}
Ok(results)
}
}
/// Header written at the start of every SSTable file, before the entries.
///
/// NOTE(review): oxicode's `standard()` encoding is presumably positional (bincode-style).
/// If so, reordering or retyping these fields changes the on-disk format — bump `version`
/// if you do. Readers in this file skip the header bytes without decoding them.
#[derive(Debug, Serialize, Deserialize)]
struct SSTableHeader {
/// Format version; `from_memtable` writes `1`.
version: u32,
/// Number of entries that follow the header.
entry_count: usize,
/// Copy of `LsmConfig::enable_compression` at write time; the writer in this file does
/// not compress anything.
enable_compression: bool,
}
/// One key/annotation record, stored in an SSTable file as a length-prefixed,
/// oxicode-encoded frame.
#[derive(Debug, Serialize, Deserialize)]
struct SSTableEntry {
/// Lookup key.
key: u64,
/// Annotation stored for `key`.
annotation: TripleAnnotation,
}
/// Log-structured merge (LSM) store for triple annotations keyed by `u64`.
///
/// Writes go to an in-memory memtable. Once it reaches
/// `LsmConfig::memtable_size_threshold`, it is flushed to a level-0 SSTable file, and full
/// levels are compacted into the next level. Reads check the memtable, then any immutable
/// memtables, then SSTables level by level.
pub struct LsmAnnotationStore {
/// Configuration supplied to `new`.
config: LsmConfig,
/// Active write buffer.
memtable: Arc<RwLock<MemTable>>,
/// Memtables awaiting flush; searched newest-first by `get`.
/// NOTE(review): nothing in this file pushes to this list.
immutable_memtables: Arc<RwLock<Vec<MemTable>>>,
/// SSTables grouped by level (index 0 holds the newest data). Within a level, tables
/// are appended, so later entries are newer.
sstables: Arc<RwLock<Vec<Vec<SSTable>>>>,
/// Next id handed out to a new SSTable; starts at 1.
next_sstable_id: Arc<RwLock<u64>>,
// Constructed in `new` but not read anywhere in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
profiler: Profiler,
/// Operation counters exposed via `statistics()`.
stats: Arc<RwLock<LsmStatistics>>,
}
/// Operation counters kept by `LsmAnnotationStore`; a snapshot is returned by
/// `LsmAnnotationStore::statistics`.
#[derive(Clone, Default, Debug)]
pub struct LsmStatistics {
    /// Number of `insert` calls.
    pub total_writes: usize,
    /// Number of `get` calls.
    pub total_reads: usize,
    /// `get` calls answered by the active memtable or an immutable memtable.
    pub memtable_hits: usize,
    /// `get` calls answered by an SSTable.
    pub sstable_hits: usize,
    /// Intended to count SSTable lookups skipped because a bloom filter ruled the key out.
    /// NOTE(review): confirm the store increments this counter.
    pub bloom_filter_rejections: usize,
    /// Number of memtable flushes to level 0.
    pub flush_count: usize,
    /// Number of completed level compactions.
    pub compaction_count: usize,
    /// Total size of SSTable files written by memtable flushes.
    pub bytes_written: usize,
    /// Intended to count bytes read from SSTables.
    /// NOTE(review): confirm the store updates this counter.
    pub bytes_read: usize,
}
impl LsmAnnotationStore {
    /// Creates a store that keeps its SSTable files in `config.data_dir`.
    ///
    /// The directory is created if missing. The store starts with an empty memtable and
    /// `max(config.max_levels, 1)` empty SSTable levels. Existing files in the directory
    /// are not loaded.
    ///
    /// NOTE(review): SSTable ids restart at 1 for every store. A second store using the
    /// same directory may therefore overwrite the first store's files.
    ///
    /// # Errors
    /// Returns a serialization error if the data directory cannot be created.
    pub fn new(config: LsmConfig) -> StarResult<Self> {
        let span = span!(Level::INFO, "lsm_store_new");
        let _enter = span.enter();
        fs::create_dir_all(&config.data_dir)
            .map_err(|e| crate::StarError::serialization_error(e.to_string()))?;
        // Flushes always target level 0, so at least one level must exist even when
        // `max_levels` is 0.
        let levels: Vec<Vec<SSTable>> = (0..config.max_levels.max(1))
            .map(|_| Vec::new())
            .collect();
        info!("Created LSM annotation store at {:?}", config.data_dir);
        Ok(Self {
            config,
            memtable: Arc::new(RwLock::new(MemTable::new())),
            immutable_memtables: Arc::new(RwLock::new(Vec::new())),
            sstables: Arc::new(RwLock::new(levels)),
            next_sstable_id: Arc::new(RwLock::new(1)),
            profiler: Profiler::new(),
            stats: Arc::new(RwLock::new(LsmStatistics::default())),
        })
    }

    /// Inserts (or overwrites) the annotation for `key` in the active memtable.
    ///
    /// Once the memtable's estimated size reaches `memtable_size_threshold`, it is flushed
    /// to a level-0 SSTable, which may in turn trigger one compaction.
    ///
    /// # Errors
    /// Propagates flush or compaction errors. If the flush fails, the inserted value
    /// remains in the memtable.
    pub fn insert(&mut self, key: u64, annotation: TripleAnnotation) -> StarResult<()> {
        let span = span!(Level::DEBUG, "lsm_insert");
        let _enter = span.enter();
        let needs_flush = {
            let mut memtable = self.memtable.write().unwrap_or_else(|e| e.into_inner());
            memtable.insert(key, annotation);
            self.bump_stats(|s| s.total_writes += 1);
            let size = memtable.size_bytes();
            let over_threshold = size >= self.config.memtable_size_threshold;
            if over_threshold {
                debug!(
                    "Memtable size {} exceeds threshold {}, triggering flush",
                    size, self.config.memtable_size_threshold
                );
            }
            over_threshold
        };
        // The memtable guard is released above, so the flush can take its own locks.
        if needs_flush {
            self.flush_memtable()?;
        }
        Ok(())
    }

    /// Returns the most recent annotation stored for `key`, if any.
    ///
    /// Lookup order is newest to oldest: the active memtable, then immutable memtables
    /// (newest first), then SSTables. SSTables are searched from level 0 upward and, within
    /// a level, from the last-appended table backwards.
    ///
    /// # Errors
    /// Propagates SSTable read/decode errors.
    pub fn get(&self, key: u64) -> StarResult<Option<TripleAnnotation>> {
        let span = span!(Level::DEBUG, "lsm_get");
        let _enter = span.enter();
        self.bump_stats(|s| s.total_reads += 1);
        {
            let memtable = self.memtable.read().unwrap_or_else(|e| e.into_inner());
            if let Some(annotation) = memtable.get(key) {
                self.bump_stats(|s| s.memtable_hits += 1);
                return Ok(Some(annotation.clone()));
            }
        }
        {
            let immutable = self
                .immutable_memtables
                .read()
                .unwrap_or_else(|e| e.into_inner());
            if let Some(annotation) = immutable.iter().rev().find_map(|mem| mem.get(key)) {
                self.bump_stats(|s| s.memtable_hits += 1);
                return Ok(Some(annotation.clone()));
            }
        }
        let sstables = self.sstables.read().unwrap_or_else(|e| e.into_inner());
        // Level 0 holds the newest data, and within a level later tables are newer, so the
        // first match in this order is the latest value.
        for sstable in sstables.iter().flat_map(|level| level.iter().rev()) {
            if key < sstable.min_key || key > sstable.max_key {
                continue;
            }
            // Probe the bloom filter here (as well as inside `SSTable::get`) so that
            // rejections can be counted.
            if let Some(bloom) = &sstable.bloom_filter {
                if !bloom.contains(&key.to_le_bytes()) {
                    self.bump_stats(|s| s.bloom_filter_rejections += 1);
                    continue;
                }
            }
            if let Some(annotation) = sstable.get(key)? {
                self.bump_stats(|s| s.sstable_hits += 1);
                return Ok(Some(annotation));
            }
        }
        Ok(None)
    }

    /// Returns the next unused SSTable id and advances the counter.
    fn allocate_sstable_id(&self) -> u64 {
        let mut next = self
            .next_sstable_id
            .write()
            .unwrap_or_else(|e| e.into_inner());
        let id = *next;
        *next += 1;
        id
    }

    /// Applies `update` to the shared statistics while holding their write lock.
    fn bump_stats(&self, update: impl FnOnce(&mut LsmStatistics)) {
        let mut stats = self.stats.write().unwrap_or_else(|e| e.into_inner());
        update(&mut *stats);
    }

    /// Writes the active memtable to a new level-0 SSTable, then runs at most one
    /// compaction. Does nothing if the memtable is empty.
    ///
    /// The SSTable is written while the data is still in the memtable. The memtable is
    /// cleared only after the write succeeds, so a failed flush loses no data. `&mut self`
    /// guarantees no insert can slip in between the write and the clear.
    fn flush_memtable(&mut self) -> StarResult<()> {
        let span = span!(Level::INFO, "flush_memtable");
        let _enter = span.enter();
        let sstable = {
            let memtable = self.memtable.read().unwrap_or_else(|e| e.into_inner());
            if memtable.is_empty() {
                return Ok(());
            }
            info!("Flushing memtable with {} entries", memtable.len());
            let sstable_id = self.allocate_sstable_id();
            SSTable::from_memtable(
                &memtable,
                sstable_id,
                0,
                &self.config.data_dir,
                self.config.enable_compression,
                self.config.bloom_filter_fp_rate,
            )?
        };
        *self.memtable.write().unwrap_or_else(|e| e.into_inner()) = MemTable::new();

        let sstable_id = sstable.id;
        let file_size = sstable.file_size;
        {
            let mut sstables = self.sstables.write().unwrap_or_else(|e| e.into_inner());
            sstables[0].push(sstable);
        }
        self.bump_stats(|s| {
            s.flush_count += 1;
            s.bytes_written += file_size;
        });
        info!(
            "Flushed memtable to SSTable {} ({} bytes)",
            sstable_id, file_size
        );
        self.maybe_compact()?;
        Ok(())
    }

    /// Compacts the lowest non-empty level that has reached its size threshold.
    ///
    /// The threshold for level `n` is `level_size_multiplier^n`, saturating instead of
    /// overflowing. The last level is never a candidate, and at most one level is
    /// compacted per call.
    fn maybe_compact(&mut self) -> StarResult<()> {
        let candidate = {
            let sstables = self.sstables.read().unwrap_or_else(|e| e.into_inner());
            let compactable_levels = sstables.len().saturating_sub(1);
            sstables
                .iter()
                .take(compactable_levels)
                .enumerate()
                .find_map(|(level, tables)| {
                    let exponent = level.min(u32::MAX as usize) as u32;
                    let threshold = self.config.level_size_multiplier.saturating_pow(exponent);
                    (!tables.is_empty() && tables.len() >= threshold)
                        .then(|| (level, tables.len(), threshold))
                })
        };
        match candidate {
            Some((level, level_size, threshold)) => {
                info!(
                    "Level {} has {} SSTables (threshold {}), triggering compaction",
                    level, level_size, threshold
                );
                self.compact_level(level)
            }
            None => Ok(()),
        }
    }

    /// Merges up to `compaction_batch_size` of the oldest SSTables in `level` into a
    /// single new SSTable in `level + 1`, then deletes the merged files (best effort).
    ///
    /// Does nothing if `level` has no next level or holds no tables.
    ///
    /// # Errors
    /// Propagates read/write errors. On error the source tables are left in place.
    fn compact_level(&mut self, level: usize) -> StarResult<()> {
        let span = span!(Level::INFO, "compact_level", level = level);
        let _enter = span.enter();
        let tables_to_compact: Vec<SSTable> = {
            let sstables = self.sstables.read().unwrap_or_else(|e| e.into_inner());
            // A level can only be compacted if there is a level below it to receive data.
            if level >= sstables.len().saturating_sub(1) {
                return Ok(());
            }
            sstables[level]
                .iter()
                .take(self.config.compaction_batch_size)
                .cloned()
                .collect()
        };
        if tables_to_compact.is_empty() {
            return Ok(());
        }
        info!(
            "Compacting {} SSTables from level {}",
            tables_to_compact.len(),
            level
        );

        // Tables within a level are ordered oldest -> newest (flushes and compactions both
        // append). Replaying them in that order into a map lets newer values overwrite
        // older ones for the same key. Keeping the first of a sorted run of duplicates
        // would instead keep the oldest value.
        let mut merged = MemTable::new();
        for sstable in &tables_to_compact {
            for (key, annotation) in sstable.scan()? {
                merged.insert(key, annotation);
            }
        }

        let sstable_id = self.allocate_sstable_id();
        let new_sstable = SSTable::from_memtable(
            &merged,
            sstable_id,
            level + 1,
            &self.config.data_dir,
            self.config.enable_compression,
            self.config.bloom_filter_fp_rate,
        )?;
        {
            let mut sstables = self.sstables.write().unwrap_or_else(|e| e.into_inner());
            let compacted_ids: Vec<u64> = tables_to_compact.iter().map(|t| t.id).collect();
            sstables[level].retain(|t| !compacted_ids.contains(&t.id));
            // Appended last, so `get` searches it before older tables in `level + 1`.
            sstables[level + 1].push(new_sstable);
        }
        for sstable in &tables_to_compact {
            if let Err(e) = fs::remove_file(&sstable.path) {
                warn!("Failed to delete SSTable file {:?}: {}", sstable.path, e);
            }
        }
        self.bump_stats(|s| s.compaction_count += 1);
        info!(
            "Completed compaction of level {} -> {}, created SSTable {}",
            level,
            level + 1,
            sstable_id
        );
        Ok(())
    }

    /// Returns a snapshot of the operation counters.
    pub fn statistics(&self) -> LsmStatistics {
        self.stats.read().unwrap_or_else(|e| e.into_inner()).clone()
    }

    /// Returns the active memtable's estimated size in bytes.
    pub fn memtable_size(&self) -> usize {
        self.memtable
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .size_bytes()
    }

    /// Returns the number of SSTables in each level, level 0 first.
    pub fn sstable_counts(&self) -> Vec<usize> {
        self.sstables
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .iter()
            .map(|level| level.len())
            .collect()
    }

    /// Flushes the active memtable to level 0 (a no-op if it is empty), then runs at most
    /// one compaction.
    ///
    /// # Errors
    /// Propagates flush or compaction errors.
    pub fn force_flush(&mut self) -> StarResult<()> {
        self.flush_memtable()
    }

    /// Compacts up to `compaction_batch_size` of the oldest SSTables in `level` into
    /// `level + 1`.
    ///
    /// # Errors
    /// Returns an error if `level` is the last level (or beyond), since there is no level
    /// to compact into. Also propagates I/O errors from the compaction itself.
    pub fn force_compact(&mut self, level: usize) -> StarResult<()> {
        if level >= self.config.max_levels.saturating_sub(1) {
            return Err(crate::StarError::invalid_quoted_triple("Invalid level"));
        }
        self.compact_level(level)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Private data directory for one test.
    ///
    /// `LsmConfig::default()` points every store at the same `oxirs_lsm` temp directory,
    /// and SSTable ids restart at 1 per store. Tests running in parallel could therefore
    /// overwrite each other's files and leave them behind. Each test gets its own
    /// directory, wiped up front and removed on drop (even if the test panics).
    struct TestDir(std::path::PathBuf);

    impl TestDir {
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir().join(format!(
                "oxirs_lsm_test_{}_{}",
                name,
                std::process::id()
            ));
            // Best effort: clear leftovers from an earlier, aborted run.
            let _ = std::fs::remove_dir_all(&path);
            Self(path)
        }

        fn config(&self) -> LsmConfig {
            LsmConfig {
                data_dir: self.0.clone(),
                ..Default::default()
            }
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn test_lsm_store_creation() {
        let dir = TestDir::new("creation");
        let store = LsmAnnotationStore::new(dir.config());
        assert!(store.is_ok());
        assert!(dir.0.is_dir());
    }

    #[test]
    fn test_insert_and_get() {
        let dir = TestDir::new("insert_and_get");
        let config = LsmConfig {
            memtable_size_threshold: 1024 * 1024,
            ..dir.config()
        };
        let mut store = LsmAnnotationStore::new(config).unwrap();
        let key = 12345u64;
        let annotation = TripleAnnotation::new().with_confidence(0.9);
        store.insert(key, annotation.clone()).unwrap();
        let retrieved = store.get(key).unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().confidence, Some(0.9));
    }

    #[test]
    fn test_overwrite_returns_latest_value() {
        let dir = TestDir::new("overwrite");
        let config = LsmConfig {
            memtable_size_threshold: 1024 * 1024,
            ..dir.config()
        };
        let mut store = LsmAnnotationStore::new(config).unwrap();
        store
            .insert(7, TripleAnnotation::new().with_confidence(0.1))
            .unwrap();
        store
            .insert(7, TripleAnnotation::new().with_confidence(0.2))
            .unwrap();
        assert_eq!(store.get(7).unwrap().unwrap().confidence, Some(0.2));
    }

    #[test]
    fn test_memtable_flush() {
        let dir = TestDir::new("memtable_flush");
        let config = LsmConfig {
            memtable_size_threshold: 100,
            ..dir.config()
        };
        let mut store = LsmAnnotationStore::new(config).unwrap();
        for i in 0..100 {
            let annotation = TripleAnnotation::new().with_confidence(0.8);
            store.insert(i, annotation).unwrap();
        }
        let stats = store.statistics();
        assert!(stats.flush_count > 0);
        // Flushed (and compacted) data must still be readable.
        for i in 0..100 {
            assert!(store.get(i).unwrap().is_some(), "key {} lost after flush", i);
        }
    }

    #[test]
    fn test_force_flush_moves_data_to_sstables() {
        let dir = TestDir::new("force_flush");
        let config = LsmConfig {
            memtable_size_threshold: 1024 * 1024,
            ..dir.config()
        };
        let mut store = LsmAnnotationStore::new(config).unwrap();
        store
            .insert(42, TripleAnnotation::new().with_confidence(0.7))
            .unwrap();
        store.force_flush().unwrap();
        assert_eq!(store.memtable_size(), 0);
        assert_eq!(store.sstable_counts().iter().sum::<usize>(), 1);
        assert_eq!(store.get(42).unwrap().unwrap().confidence, Some(0.7));
        assert_eq!(store.statistics().sstable_hits, 1);
    }

    #[test]
    fn test_force_compact_rejects_last_level() {
        let dir = TestDir::new("force_compact");
        let mut store = LsmAnnotationStore::new(dir.config()).unwrap();
        // Default config has 7 levels; level 6 has nowhere to compact into.
        assert!(store.force_compact(6).is_err());
        assert!(store.force_compact(100).is_err());
        // Compacting an empty, valid level is a no-op.
        assert!(store.force_compact(0).is_ok());
    }

    #[test]
    fn test_sstable_counts() {
        let dir = TestDir::new("sstable_counts");
        let store = LsmAnnotationStore::new(dir.config()).unwrap();
        let counts = store.sstable_counts();
        assert_eq!(counts.len(), 7);
        assert!(counts.iter().all(|&c| c == 0));
    }

    #[test]
    fn test_statistics() {
        let dir = TestDir::new("statistics");
        let config = LsmConfig {
            memtable_size_threshold: 1024 * 1024,
            ..dir.config()
        };
        let mut store = LsmAnnotationStore::new(config).unwrap();
        let annotation = TripleAnnotation::new().with_confidence(0.9);
        store.insert(1, annotation).unwrap();
        let _ = store.get(1).unwrap();
        let _ = store.get(2).unwrap();
        let stats = store.statistics();
        assert_eq!(stats.total_writes, 1);
        assert_eq!(stats.total_reads, 2);
        assert_eq!(stats.memtable_hits, 1);
    }
}