use std::collections::{HashMap, HashSet, VecDeque};
use std::fs::{self, File};
use std::io::{BufReader, ErrorKind, Read};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread::{self, JoinHandle, ThreadId};
use std::time::Duration;
use parking_lot::{Mutex, RwLock};
use hashbrown::HashMap as HbHashMap;
use ahash::RandomState;
use xxhash_rust::xxh3::xxh3_64;
use serde_json::Value;
use uuid::Uuid;
use crate::config::{ConfigError, ConfigManager, DbConfig};
use crate::index::bloom::BloomFilter;
use crate::index::hot::HotIndex;
use crate::index::secondary::{
FullTextSidecar, JsonTrieSidecar, RoaringBitmapSidecar, SlugMap, TimeFenceIndex,
};
use crate::index::sparse::{SparseEntry, SparseIndex};
use crate::index::{VersionLocation, VersionPointer};
use crate::observe::{OperationKind, OperationObserver};
use crate::query::{self, FilterRequest, IndexRegistry, MvccSnapshot, SegmentAccelerators};
use crate::storage::manifest::SegmentTier;
use crate::storage::{
CompactionJob, Compactor, CompressionCodec, FileManifest, FileSegment, Manifest, Segment,
SegmentMetadata, SegmentReader, StorageError,
};
use crate::write::flush::{FlushCoordinator, FlushOutcome};
use crate::write::memtable::MemTableEntry;
use crate::write::wal::{Wal, WalOptions};
use crate::write::{
CommitOutcome, CommitReceipt, GroupCommitConfig, GroupCommitController, GroupCommitError,
Mutation, Transaction,
};
/// Crate-wide result alias: all database operations fail with [`DatabaseError`].
pub type Result<T> = std::result::Result<T, DatabaseError>;

/// Unified error type for the database facade; each variant wraps one
/// subsystem's error via `#[from]`, so `?` converts automatically.
#[derive(Debug, thiserror::Error)]
pub enum DatabaseError {
    /// Configuration loading/validation failed.
    #[error("configuration error: {0}")]
    Config(#[from] ConfigError),
    /// Raw filesystem / OS error.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// Segment/manifest storage-layer error.
    #[error("storage error: {0}")]
    Storage(#[from] StorageError),
    /// Group-commit / WAL write-path error.
    #[error("write-path error: {0}")]
    WritePath(#[from] GroupCommitError),
}
/// Construction-time knobs for [`Database::open`].
#[derive(Clone, Debug)]
pub struct DatabaseOptions {
    /// Root directory holding the WAL, manifest and segment files.
    pub data_dir: PathBuf,
    /// Open the WAL with direct (unbuffered) I/O.
    pub wal_direct_io: bool,
    /// Size in bytes of the WAL ring buffer.
    pub wal_ring_bytes: usize,
    /// Codec applied when flushing memtables into segments.
    pub compression: CompressionCodec,
    /// Optional extractors feeding the secondary indexes.
    pub secondary_indexes: SecondaryIndexConfig,
    /// Whether commits fsync the WAL before acknowledging.
    pub fsync_on_commit: bool,
    /// Per-thread write-buffer size that triggers an implicit commit.
    pub wal_max_batch_ops: usize,
    /// Maintain in-memory indexes synchronously on the write path.
    pub index_on_write: bool,
}
impl DatabaseOptions {
    /// Creates options rooted at `data_dir` with conservative defaults:
    /// direct-I/O WAL with a 4 MiB ring, no compression, fsync on commit,
    /// 1024-op write batches, and synchronous index maintenance.
    pub fn new<P: Into<PathBuf>>(data_dir: P) -> Self {
        Self {
            data_dir: data_dir.into(),
            wal_direct_io: true,
            wal_ring_bytes: 4 * 1024 * 1024,
            compression: CompressionCodec::None,
            secondary_indexes: SecondaryIndexConfig::default(),
            fsync_on_commit: true,
            wal_max_batch_ops: 1024,
            index_on_write: true,
        }
    }
}

impl Default for DatabaseOptions {
    /// Defaults to a `./data` directory relative to the working directory.
    fn default() -> Self {
        Self::new("./data")
    }
}
// Extractor callbacks: each receives the raw (key, value) bytes of a record
// and derives the datum its secondary index consumes. All are `Send + Sync`
// because they run on writer threads.
type SlugExtractor = Arc<dyn Fn(&[u8], &[u8]) -> Option<String> + Send + Sync + 'static>;
type TimestampExtractor = Arc<dyn Fn(&[u8], &[u8]) -> Option<i64> + Send + Sync + 'static>;
type TagExtractor = Arc<dyn Fn(&[u8], &[u8]) -> Vec<String> + Send + Sync + 'static>;
type JsonExtractor = Arc<dyn Fn(&[u8], &[u8]) -> Vec<(String, Value)> + Send + Sync + 'static>;
type FullTextExtractor = Arc<dyn Fn(&[u8], &[u8]) -> Option<String> + Send + Sync + 'static>;
// Upper bound on how long the maintenance thread waits on its condvar
// before re-polling for completed flushes / shutdown.
const MAINTENANCE_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Optional extractor callbacks that drive the secondary indexes; any
/// subset may be configured. An unset extractor disables that index.
#[derive(Clone, Default)]
pub struct SecondaryIndexConfig {
    pub slug: Option<SlugExtractor>,
    pub timestamp: Option<TimestampExtractor>,
    pub roaring_tags: Option<TagExtractor>,
    pub json_paths: Option<JsonExtractor>,
    pub full_text: Option<FullTextExtractor>,
}

impl std::fmt::Debug for SecondaryIndexConfig {
    /// Closures are not `Debug`, so only report which extractors are set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SecondaryIndexConfig")
            .field("slug", &self.slug.is_some())
            .field("timestamp", &self.timestamp.is_some())
            .field("roaring_tags", &self.roaring_tags.is_some())
            .field("json_paths", &self.json_paths.is_some())
            .field("full_text", &self.full_text.is_some())
            .finish()
    }
}
/// Lock-striped exact-match map from key bytes to the newest
/// [`VersionPointer`]; striping keeps contention low under many writers.
struct ShardedExactIndex {
    shards: Vec<RwLock<HbHashMap<Vec<u8>, VersionPointer, RandomState>>>,
}

impl ShardedExactIndex {
    /// Builds the index with at least one shard, rounded up to a power of
    /// two so shard selection can be a simple bitmask.
    fn new(shard_count: usize) -> Self {
        let capacity = shard_count.max(1).next_power_of_two();
        let shards = (0..capacity)
            .map(|_| RwLock::new(HbHashMap::with_hasher(RandomState::default())))
            .collect();
        Self { shards }
    }

    /// Maps a key to its shard via xxh3; relies on the power-of-two count.
    #[inline]
    fn shard_id(&self, key: &[u8]) -> usize {
        (xxh3_64(key) as usize) & (self.shards.len() - 1)
    }

    /// Looks up the pointer for `key`, if present.
    #[inline]
    fn get(&self, key: &[u8]) -> Option<VersionPointer> {
        self.shards[self.shard_id(key)].read().get(key).cloned()
    }

    /// Inserts or replaces the pointer for `key`.
    #[inline]
    fn insert(&self, key: Vec<u8>, pointer: VersionPointer) {
        let slot = self.shard_id(&key);
        self.shards[slot].write().insert(key, pointer);
    }

    /// Inserts many entries, grouping them per shard first so each shard
    /// lock is taken at most once.
    fn batch_insert(&self, entries: &[(Vec<u8>, VersionPointer)]) {
        if entries.is_empty() {
            return;
        }
        let mut grouped: Vec<Vec<(Vec<u8>, VersionPointer)>> =
            std::iter::repeat_with(Vec::new).take(self.shards.len()).collect();
        for (key, pointer) in entries {
            grouped[self.shard_id(key)].push((key.clone(), pointer.clone()));
        }
        for (slot, group) in grouped.into_iter().enumerate() {
            if !group.is_empty() {
                let mut guard = self.shards[slot].write();
                for (key, pointer) in group {
                    guard.insert(key, pointer);
                }
            }
        }
    }
}
impl SecondaryIndexConfig {
    /// True when no extractor at all is configured, i.e. the write path can
    /// skip secondary-index maintenance entirely.
    fn is_empty(&self) -> bool {
        !(self.slug.is_some()
            || self.timestamp.is_some()
            || self.roaring_tags.is_some()
            || self.json_paths.is_some()
            || self.full_text.is_some())
    }
}
/// Result of [`Database::apply_batch`]: the commit receipt plus the version
/// pointers produced for the submitted operations (empty while the batch is
/// only buffered).
#[derive(Clone, Debug, Default)]
pub struct ApplyOutcome {
    pub receipt: CommitReceipt,
    pub pointers: Vec<(Vec<u8>, VersionPointer)>,
}

/// Metadata extracted at write time and cached per key so a later delete
/// can unindex the same values that the insert indexed.
#[derive(Clone, Default)]
struct CachedIndexMetadata {
    slug: Option<String>,
    timestamp: Option<i64>,
    tags: Vec<String>,
    json_fields: Vec<(String, Value)>,
    full_text: Option<String>,
}

/// Per-mutation outcome of a buffered-commit flush; `pointer` is `None`
/// when no version pointer was produced for the key.
#[derive(Clone)]
struct MutationApplyResult {
    key: Vec<u8>,
    pointer: Option<VersionPointer>,
}

/// One (key, byte offset) row of a segment's sidecar file.
#[derive(Clone, Debug)]
struct SidecarEntry {
    key: Vec<u8>,
    offset: u64,
}

/// Everything cached per open segment: the segment handle, its reader, the
/// decoded sidecar, and the optional lookup accelerators.
#[derive(Clone)]
struct SegmentCacheEntry {
    segment: Arc<FileSegment>,
    reader: Arc<SegmentReader>,
    sidecar: Arc<Vec<SidecarEntry>>,
    sparse: Option<Arc<SparseIndex>>,
    bloom: Option<Arc<BloomFilter>>,
}

impl SegmentCacheEntry {
    /// Borrows the optional sparse/bloom accelerators for query routing.
    fn accelerators(&self) -> SegmentAccelerators<'_> {
        SegmentAccelerators {
            sparse: self.sparse.as_deref(),
            bloom: self.bloom.as_deref(),
        }
    }
}

/// Borrowed view of one record decoded in place from a segment buffer.
#[derive(Clone, Debug, Default)]
struct DecodedSegmentEntry<'a> {
    key: &'a [u8],
    value: &'a [u8],
    sequence: u64,
    tombstone: bool,
}
/// Lazily materializing cursor over `(key, pointer)` pairs produced by a
/// query; values are resolved against the [`Database`] on demand, and
/// pointers that no longer resolve are skipped silently.
pub struct RecordCursor {
    entries: Vec<(Vec<u8>, VersionPointer)>,
    position: usize,
}

impl RecordCursor {
    /// Wraps an already-ordered pointer list.
    fn new(entries: Vec<(Vec<u8>, VersionPointer)>) -> Self {
        Self { entries, position: 0 }
    }

    /// Number of pointers not yet visited (some may resolve to nothing).
    pub fn remaining(&self) -> usize {
        self.entries.len().saturating_sub(self.position)
    }

    /// True once every pointer has been visited.
    pub fn is_empty(&self) -> bool {
        self.remaining() == 0
    }

    /// Advances to the next pointer that still resolves to a live value.
    pub fn next(&mut self, db: &Database) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        loop {
            let current = match self.entries.get(self.position) {
                Some(pair) => pair,
                None => return Ok(None),
            };
            self.position += 1;
            let (key, pointer) = current;
            if let Some(value) = db.resolve_pointer(key, pointer)? {
                return Ok(Some((key.clone(), value)));
            }
        }
    }

    /// Drains the cursor into owned `(key, value)` pairs.
    pub fn collect(mut self, db: &Database) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let mut out = Vec::with_capacity(self.remaining());
        while let Some(pair) = self.next(db)? {
            out.push(pair);
        }
        Ok(out)
    }
}
/// Sequence-validated LRU cache: a value is only returned when it was
/// cached at exactly the requested sequence, so stale versions never leak.
/// Recency is tracked with a lazy-deletion queue of (key, generation)
/// stamps; only the newest stamp for a key is authoritative.
#[derive(Debug)]
struct EntryCache {
    /// Maximum number of live entries (at least 1).
    cap: usize,
    /// Monotonic logical clock; each access stamps a new generation.
    tick: u64,
    /// Key -> cached value plus the sequence it was cached at.
    map: HashMap<Vec<u8>, CacheEntry>,
    /// Lazy-deletion recency queue of (key, generation) stamps.
    q: VecDeque<(Vec<u8>, u64)>,
}

/// One cached value: the bytes, the version sequence it belongs to, and
/// the generation of its most recent access.
#[derive(Clone, Debug)]
struct CacheEntry {
    val: Arc<[u8]>,
    seq: u64,
    age: u64,
}

impl EntryCache {
    fn new(capacity: usize) -> Self {
        Self { cap: capacity.max(1), tick: 0, map: HashMap::new(), q: VecDeque::new() }
    }

    /// Returns the cached value for `key` only if it was cached at exactly
    /// `seq`; a hit refreshes the entry's recency.
    fn get_with_seq(&mut self, key: &[u8], seq: u64) -> Option<Arc<[u8]>> {
        let hit = match self.map.get_mut(key) {
            Some(entry) if entry.seq == seq => {
                self.tick = self.tick.wrapping_add(1);
                entry.age = self.tick;
                entry.val.clone()
            }
            _ => return None,
        };
        self.q.push_back((key.to_vec(), self.tick));
        // Bug fix: the original pushed a stamp per hit but only drained `q`
        // while over capacity during inserts, so a read-heavy workload grew
        // the queue without bound. Compact stale stamps here too.
        self.compact_queue();
        Some(hit)
    }

    /// Inserts (or replaces) `key` tagged with `seq`.
    fn put_with_seq(&mut self, key: &[u8], value: Arc<[u8]>, seq: u64) {
        self.tick = self.tick.wrapping_add(1);
        self.map.insert(key.to_vec(), CacheEntry { val: value, seq, age: self.tick });
        self.q.push_back((key.to_vec(), self.tick));
        self.evict_if_needed();
    }

    /// Drops `key` from the cache; its queue stamps decay lazily.
    fn remove(&mut self, key: &[u8]) {
        self.map.remove(key);
    }

    /// Pops recency stamps until the map is back under capacity; a stamp
    /// only evicts when it is the key's newest (age matches).
    fn evict_if_needed(&mut self) {
        while self.map.len() > self.cap {
            match self.q.pop_front() {
                Some((k, g)) => {
                    if matches!(self.map.get(&k), Some(ce) if ce.age == g) {
                        self.map.remove(&k);
                    }
                }
                None => break,
            }
        }
        self.compact_queue();
    }

    /// Drops stale (key, generation) stamps once the queue grows well past
    /// capacity, bounding its memory for hit-heavy workloads.
    fn compact_queue(&mut self) {
        let limit = self.cap.saturating_mul(2).max(16);
        if self.q.len() > limit {
            let map = &self.map;
            self.q.retain(|(k, g)| matches!(map.get(k), Some(ce) if ce.age == *g));
        }
    }
}
/// Two-level cache that groups entries by a fixed-length key prefix so a
/// whole "hot" prefix group (shard) ages in and out as one unit.
#[derive(Debug)]
struct HotShardCache {
    // Number of leading key bytes used as the shard key (at least 1).
    prefix_len: usize,
    // Upper bound on live shards.
    max_shards: usize,
    // Upper bound on total entries across all shards.
    max_entries: usize,
    // Monotonic logical clock stamping each access.
    tick: u64,
    // Total entries currently held, maintained incrementally.
    entry_count: usize,
    // Prefix -> shard of suffix-keyed entries.
    shards: HashMap<Vec<u8>, Shard>,
    // Lazy-deletion recency queue of (prefix, generation) stamps.
    lru: VecDeque<(Vec<u8>, u64)>,
}

/// One prefix group of cached entries plus its last-touched generation.
#[derive(Debug)]
struct Shard {
    age: u64,
    entries: HashMap<Vec<u8>, CacheEntry>,
}
impl HotShardCache {
    /// Builds the cache; all limits are clamped to at least 1.
    fn new(prefix_len: usize, max_shards: usize, max_entries: usize) -> Self {
        Self {
            prefix_len: prefix_len.max(1),
            max_shards: max_shards.max(1),
            max_entries: max_entries.max(1),
            tick: 0,
            entry_count: 0,
            shards: HashMap::new(),
            lru: VecDeque::new(),
        }
    }

    /// Splits `key` into (shard prefix, within-shard suffix).
    fn shard_key<'a>(&self, key: &'a [u8]) -> (&'a [u8], &'a [u8]) {
        let split = key.len().min(self.prefix_len);
        (&key[..split], &key[split..])
    }

    /// Returns the cached value only when it was stored at exactly `seq`;
    /// a hit refreshes the whole shard's recency.
    fn get_with_seq(&mut self, key: &[u8], seq: u64) -> Option<Arc<[u8]>> {
        let (pfx, sfx) = self.shard_key(key);
        let hit = match self.shards.get_mut(pfx) {
            Some(shard) => match shard.entries.get(sfx) {
                Some(entry) if entry.seq == seq => {
                    self.tick = self.tick.wrapping_add(1);
                    shard.age = self.tick;
                    entry.val.clone()
                }
                _ => return None,
            },
            None => return None,
        };
        self.lru.push_back((pfx.to_vec(), self.tick));
        // Bug fix: hits push recency stamps, but the original only drained
        // `lru` while over capacity during inserts, so read-heavy workloads
        // grew the queue without bound. Compact stale stamps here too.
        self.compact_lru();
        Some(hit)
    }

    /// Inserts (or replaces) `key` tagged with `seq`, creating its shard
    /// on first use and keeping `entry_count` in sync.
    fn put_with_seq(&mut self, key: &[u8], val: Arc<[u8]>, seq: u64) {
        let (pfx, sfx) = self.shard_key(key);
        self.tick = self.tick.wrapping_add(1);
        let shard = self.shards.entry(pfx.to_vec()).or_insert_with(|| Shard {
            age: self.tick,
            entries: HashMap::new(),
        });
        let inserted = shard
            .entries
            .insert(sfx.to_vec(), CacheEntry { val, seq, age: self.tick })
            .is_none();
        shard.age = self.tick;
        self.lru.push_back((pfx.to_vec(), self.tick));
        if inserted {
            self.entry_count += 1;
        }
        self.evict_if_needed();
    }

    /// Evicts whole shards (oldest first) until both the shard and total
    /// entry limits hold; a stamp only evicts when it is the shard's newest.
    fn evict_if_needed(&mut self) {
        while self.shards.len() > self.max_shards || self.entry_count > self.max_entries {
            match self.lru.pop_front() {
                Some((pfx, g)) => {
                    if matches!(self.shards.get(&pfx), Some(s) if s.age == g) {
                        if let Some(removed) = self.shards.remove(&pfx) {
                            self.entry_count =
                                self.entry_count.saturating_sub(removed.entries.len());
                        }
                    }
                }
                None => break,
            }
        }
        self.compact_lru();
    }

    /// Drops stale (prefix, generation) stamps once the queue is well past
    /// the shard limit, bounding its memory for hit-heavy workloads.
    fn compact_lru(&mut self) {
        let limit = self.max_shards.saturating_mul(2).max(16);
        if self.lru.len() > limit {
            let shards = &self.shards;
            self.lru.retain(|(pfx, g)| matches!(shards.get(pfx), Some(s) if s.age == *g));
        }
    }
}
/// Bundle of the two value caches consulted on the read path.
#[derive(Debug)]
struct Caches {
    entry: EntryCache,
    shards: HotShardCache,
}

impl Caches {
    /// Wraps the flat per-entry cache and the prefix-shard cache.
    fn new(entry: EntryCache, shards: HotShardCache) -> Self {
        Self { entry, shards }
    }

    /// Consults the shard cache first, then the flat entry cache.
    fn get_with_seq(&mut self, key: &[u8], seq: u64) -> Option<Arc<[u8]>> {
        match self.shards.get_with_seq(key, seq) {
            hit @ Some(_) => hit,
            None => self.entry.get_with_seq(key, seq),
        }
    }

    /// Publishes `val` (tagged with `seq`) into both caches.
    fn put_with_seq(&mut self, key: &[u8], val: Arc<[u8]>, seq: u64) {
        self.entry.put_with_seq(key, Arc::clone(&val), seq);
        self.shards.put_with_seq(key, val, seq);
    }
}
/// Embedded storage-engine facade: WAL-backed group commit, in-memory
/// indexes, segment storage with background flush/compaction, and value
/// caches. All methods take `&self`; shared state is independently locked.
pub struct Database {
    // Live configuration source.
    config: Arc<ConfigManager>,
    // Durable per-tier list of segments.
    manifest: Arc<Mutex<FileManifest>>,
    // WAL + memtable group-commit pipeline.
    controller: Arc<GroupCommitController>,
    // Primary + secondary in-memory indexes.
    indexes: Arc<IndexRegistry>,
    // Extractors configured at open time.
    index_config: Arc<SecondaryIndexConfig>,
    // Per-key metadata cached so deletes can unindex what puts indexed.
    index_metadata: Arc<RwLock<HbHashMap<Vec<u8>, CachedIndexMetadata, RandomState>>>,
    // Open-segment handles keyed by segment id.
    segment_cache: Arc<Mutex<std::collections::HashMap<String, Arc<SegmentCacheEntry>>>>,
    // Monotonic record-id source used when building version pointers.
    record_ids: Arc<AtomicU64>,
    // Operation timing/metrics sink.
    metrics: OperationObserver,
    // Background segment compactor.
    compactor: Arc<Compactor<FileManifest>>,
    // Segment ids already queued for compaction (dedup guard).
    scheduled_compactions: Arc<Mutex<HashSet<String>>>,
    // Signals background workers to exit.
    shutdown: Arc<AtomicBool>,
    // Condvar pair used to wake/stop the maintenance thread promptly.
    shutdown_notify: Arc<(Mutex<bool>, parking_lot::Condvar)>,
    // Codec used when flushing segments.
    compression: CompressionCodec,
    // Per-thread buffers of not-yet-committed mutations.
    write_buffers: Arc<Mutex<HashMap<ThreadId, Vec<Mutation>>>>,
    // Buffer size that triggers an implicit commit.
    max_batch_ops: usize,
    // Read-path value caches.
    caches: Arc<Mutex<Caches>>,
    // Lock-striped key -> newest pointer map for fast point reads.
    exact_index: Arc<ShardedExactIndex>,
    // Maintain indexes synchronously on the write path.
    index_on_write: bool,
    // Process-unique id keying the per-thread pending flags.
    db_id: u64,
    // Joinable background thread handles.
    background_workers: Vec<JoinHandle<()>>,
}
impl Database {
#[inline]
fn mark_thread_pending(&self) {
thread_local! {
static PENDING: std::cell::RefCell<HashMap<u64, bool>> = Default::default();
}
let id = self.db_id;
PENDING.with(|map| {
let mut m = map.borrow_mut();
m.insert(id, true);
});
}
#[inline]
fn clear_thread_pending(&self) {
thread_local! {
static PENDING: std::cell::RefCell<HashMap<u64, bool>> = Default::default();
}
let id = self.db_id;
PENDING.with(|map| {
let mut m = map.borrow_mut();
m.remove(&id);
});
}
#[inline]
fn thread_has_pending(&self) -> bool {
thread_local! {
static PENDING: std::cell::RefCell<HashMap<u64, bool>> = Default::default();
}
let id = self.db_id;
PENDING.with(|map| map.borrow().get(&id).copied().unwrap_or(false))
}
#[inline]
fn flush_if_pending(&self) -> Result<()> {
if self.thread_has_pending() {
self.flush_thread_buffer_for(thread::current().id())?;
self.clear_thread_pending();
}
Ok(())
}
    /// Opens (or creates) a database rooted at `options.data_dir`, wiring
    /// up the WAL, group-commit controller, manifest, compactor, index
    /// registry, caches, and the background maintenance worker.
    ///
    /// # Errors
    /// Fails if the data directory cannot be created or any subsystem
    /// (config, WAL, manifest, compactor, worker spawn) fails to initialize.
    pub fn open(options: DatabaseOptions) -> Result<Self> {
        fs::create_dir_all(&options.data_dir)?;
        let mut config = DbConfig::new(&options.data_dir);
        config.wal_fsync_on_commit = options.fsync_on_commit;
        // Guard against a zero batch size from the caller.
        config.wal_max_batch_ops = options.wal_max_batch_ops.max(1);
        let config_manager = Arc::new(ConfigManager::new(config.clone())?);
        let wal_options = WalOptions {
            path: config.wal_path.clone(),
            direct_io: options.wal_direct_io,
            ring_bytes: options.wal_ring_bytes,
        };
        let wal = Wal::open(wal_options)?;
        let compression = options.compression.clone();
        let flush = FlushCoordinator::new(&config.data_dir, compression.clone());
        let metrics = OperationObserver::new();
        let group_commit_config = group_commit_config_from(&config);
        let controller = Arc::new(GroupCommitController::new(
            wal,
            flush,
            group_commit_config.clone(),
            metrics.clone(),
        ));
        let manifest = FileManifest::open(&config.manifest_path)?;
        let manifest = Arc::new(Mutex::new(manifest));
        let compactor = Arc::new(Compactor::spawn(
            Arc::clone(&manifest),
            config.data_dir.join("segments"),
        )?);
        let secondary_indexes = Arc::new(options.secondary_indexes.clone());
        let indexes = Arc::new(IndexRegistry::new(
            HotIndex::new(),
            SlugMap::new(),
            TimeFenceIndex::new(),
            RoaringBitmapSidecar::new(),
            JsonTrieSidecar::new(),
            FullTextSidecar::new(),
        ));
        let scheduled_compactions = Arc::new(Mutex::new(HashSet::new()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_notify = Arc::new((Mutex::new(false), parking_lot::Condvar::new()));
        let write_buffers = Arc::new(Mutex::new(HashMap::new()));
        // Cache sizing: 65_536-entry flat cache plus a 4-byte-prefix shard
        // cache bounded to 64 shards / 4096 entries.
        let caches = Arc::new(Mutex::new(Caches::new(
            EntryCache::new(65_536),
            HotShardCache::new(4, 64, 4096),
        )));
        // Process-unique id so per-thread pending flags from different
        // Database instances do not collide.
        static NEXT_DB_ID: AtomicU64 = AtomicU64::new(1);
        let db_id = NEXT_DB_ID.fetch_add(1, Ordering::Relaxed);
        let mut database = Self {
            config: Arc::clone(&config_manager),
            manifest,
            controller,
            indexes,
            index_config: Arc::clone(&secondary_indexes),
            index_metadata: Arc::new(RwLock::new(HbHashMap::with_hasher(RandomState::default()))),
            segment_cache: Arc::new(Mutex::new(std::collections::HashMap::new())),
            record_ids: Arc::new(AtomicU64::new(0)),
            metrics: metrics.clone(),
            compactor,
            scheduled_compactions,
            shutdown,
            shutdown_notify: Arc::clone(&shutdown_notify),
            compression,
            write_buffers,
            max_batch_ops: group_commit_config.max_batch_ops,
            caches,
            exact_index: Arc::new(ShardedExactIndex::new(64)),
            index_on_write: options.index_on_write,
            db_id,
            background_workers: Vec::new(),
        };
        database.spawn_background_workers()?;
        Ok(database)
    }
    /// Shared handle to the live configuration.
    pub fn config_manager(&self) -> Arc<ConfigManager> {
        Arc::clone(&self.config)
    }

    /// Shared handle to the segment manifest.
    pub fn manifest(&self) -> Arc<Mutex<FileManifest>> {
        Arc::clone(&self.manifest)
    }

    /// Shared handle to the WAL/memtable group-commit controller.
    pub fn group_commit_controller(&self) -> Arc<GroupCommitController> {
        Arc::clone(&self.controller)
    }

    /// Shared handle to the in-memory index registry.
    pub fn index_registry(&self) -> Arc<IndexRegistry> {
        Arc::clone(&self.indexes)
    }

    /// Clone of the operation metrics observer.
    pub fn metrics(&self) -> OperationObserver {
        self.metrics.clone()
    }
    /// Buffers `operations` on the calling thread; once the thread's buffer
    /// reaches `max_batch_ops` the entire buffer is committed as one group.
    ///
    /// Returns a default [`ApplyOutcome`] while operations are merely
    /// buffered; on the triggering call it returns the receipt plus the
    /// pointers for the operations submitted *in this call* only.
    pub fn apply_batch(&self, operations: Vec<Mutation>) -> Result<ApplyOutcome> {
        if operations.is_empty() {
            return Ok(ApplyOutcome::default());
        }
        let operations_len = operations.len();
        let thread_id = thread::current().id();
        let mut operations = operations;
        // Append under the lock and decide whether the threshold was crossed.
        let should_flush = {
            let mut buffers = self.write_buffers.lock();
            let buffer = buffers.entry(thread_id).or_insert_with(Vec::new);
            buffer.append(&mut operations);
            buffer.len() >= self.max_batch_ops
        };
        if should_flush {
            if let Some((receipt, results)) = self.flush_thread_buffer_for(thread_id)? {
                self.clear_thread_pending();
                // Only report pointers for the tail this call contributed;
                // earlier buffered operations belonged to previous calls.
                let total = results.len();
                let start = total.saturating_sub(operations_len);
                let pointers = results
                    .into_iter()
                    .skip(start)
                    .filter_map(|result| result.pointer.map(|pointer| (result.key, pointer)))
                    .collect();
                Ok(ApplyOutcome { receipt, pointers })
            } else {
                Ok(ApplyOutcome::default())
            }
        } else {
            // Remember this thread has unflushed writes so its own reads
            // can flush first (read-your-writes).
            self.mark_thread_pending();
            Ok(ApplyOutcome::default())
        }
    }
    /// Buffers a single put; see [`Database::apply_batch`] for semantics.
    pub fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<ApplyOutcome> {
        self.apply_batch(vec![Mutation::Put { key, value }])
    }

    /// Buffers a single delete; see [`Database::apply_batch`] for semantics.
    pub fn delete(&self, key: Vec<u8>) -> Result<ApplyOutcome> {
        self.apply_batch(vec![Mutation::Delete { key }])
    }
pub fn sync(&self) -> Result<Vec<FlushOutcome>> {
self.flush_all_buffers()?;
if let Err(err) = self.controller.sync_wal() {
return Err(DatabaseError::Io(err));
}
let flushes = self.controller.drain_completed_flushes();
if flushes.is_empty() {
return Ok(flushes);
}
let context = self.maintenance_context();
context.process_flush_outcomes(flushes.clone())?;
Ok(flushes)
}
    /// Point lookup for `key`, trying progressively more expensive paths:
    /// 1. the sharded exact index (plus the value caches),
    /// 2. the MVCC-aware index registry,
    /// 3. a sidecar-guided walk over every manifest segment.
    ///
    /// Flushes this thread's buffered writes first so the thread reads its
    /// own writes.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.flush_if_pending()?;
        let _timer = self.metrics.timer(OperationKind::Query);
        // Tier 1: exact index knows the newest pointer for this key.
        if let Some(pointer) = self.exact_index.get(key) {
            match &pointer.location {
                VersionLocation::MemTable => {
                    let memtable = self.controller.current_memtable();
                    if let Some(entry) = memtable.get(key) {
                        if !entry.tombstone {
                            return Ok(Some(entry.value.as_ref().to_vec()));
                        }
                    }
                    // Tombstoned, or no longer present in the memtable.
                    return Ok(None);
                }
                _ => {
                    // try_lock: skip the cache rather than stall the read.
                    if let Some(mut caches) = self.caches.try_lock() {
                        if let Some(val) = caches.get_with_seq(key, pointer.sequence) {
                            return Ok(Some(val.to_vec()));
                        }
                    }
                    return self.resolve_pointer(key, &pointer);
                }
            }
        }
        let snapshot = MvccSnapshot::open();
        let accelerators = SegmentAccelerators::empty();
        // Tier 2: consult the index registry under an MVCC snapshot.
        if let Some(pointer) = query::get(&self.indexes, accelerators, key, snapshot) {
            if matches!(pointer.location, VersionLocation::MemTable) {
                let memtable = self.controller.current_memtable();
                if let Some(entry) = memtable.get(key) {
                    if !entry.tombstone {
                        return Ok(Some(entry.value.as_ref().to_vec()));
                    }
                }
                return Ok(None);
            }
            if let Some(mut caches) = self.caches.try_lock() {
                if let Some(val) = caches.get_with_seq(key, pointer.sequence) {
                    return Ok(Some(val.to_vec()));
                }
            }
            self.resolve_pointer(key, &pointer)
        } else {
            // Tier 3: walk every segment in every tier; use the sidecar to
            // skip segments that cannot contain the key.
            let segment_ids = {
                let manifest = self.manifest.lock();
                SegmentTier::all()
                    .iter()
                    .flat_map(|tier| {
                        manifest
                            .segments_in(tier)
                            .iter()
                            .map(|meta| meta.id.clone())
                    })
                    .collect::<Vec<_>>()
            };
            for segment_id in segment_ids {
                let handle = self.segment_handle(&segment_id)?;
                // Linear sidecar membership check for this key.
                if !handle
                    .sidecar
                    .iter()
                    .any(|entry| entry.key.as_slice() == key)
                {
                    continue;
                }
                let accelerators = handle.accelerators();
                // Without sparse/bloom accelerators this segment cannot be
                // routed by the indexed query path, so skip it.
                if accelerators.sparse.is_none() && accelerators.bloom.is_none() {
                    continue;
                }
                if let Some(pointer) = query::get(&self.indexes, accelerators, key, snapshot) {
                    // NOTE(review): this arm uses a blocking `lock()` unlike
                    // the `try_lock()` used elsewhere on the read path.
                    if let Some(val) = self.caches.lock().get_with_seq(key, pointer.sequence) {
                        return Ok(Some(val.to_vec()));
                    }
                    return self.resolve_pointer(key, &pointer);
                }
            }
            Ok(None)
        }
    }
pub fn get_by_slug(&self, slug: &str) -> Result<Option<Vec<u8>>> {
self.flush_if_pending()?;
let _timer = self.metrics.timer(OperationKind::Query);
let snapshot = MvccSnapshot::open();
if let Some((key, pointer)) = query::get_by_slug(&self.indexes, slug, snapshot) {
self.resolve_pointer(&key, &pointer)
} else {
Ok(None)
}
}
    /// Like [`Database::get`] but returns a [`ValueRef`], which may hand
    /// back the memtable's Arc or a view into a segment buffer instead of
    /// always copying. Follows the same three lookup tiers as `get`.
    pub fn get_ref(&self, key: &[u8]) -> Result<Option<ValueRef>> {
        self.flush_if_pending()?;
        let _timer = self.metrics.timer(OperationKind::Query);
        // Tier 1: sharded exact index.
        if let Some(pointer) = self.exact_index.get(key) {
            match &pointer.location {
                VersionLocation::MemTable => {
                    let memtable = self.controller.current_memtable();
                    if let Some(entry) = memtable.get(key) {
                        if !entry.tombstone {
                            return Ok(Some(ValueRef::Mem(entry.value.clone())));
                        }
                    }
                    return Ok(None);
                }
                _ => {
                    // NOTE(review): blocking `lock()` here vs `try_lock()`
                    // in `get` for the same situation — confirm intended.
                    if let Some(val) = self.caches.lock().get_with_seq(key, pointer.sequence) {
                        return Ok(Some(ValueRef::Mem(val)));
                    }
                    return self.resolve_pointer_ref(key, &pointer);
                }
            }
        }
        let snapshot = MvccSnapshot::open();
        let accelerators = SegmentAccelerators::empty();
        // Tier 2: index registry under an MVCC snapshot.
        if let Some(pointer) = query::get(&self.indexes, accelerators, key, snapshot) {
            if matches!(pointer.location, VersionLocation::MemTable) {
                let memtable = self.controller.current_memtable();
                if let Some(entry) = memtable.get(key) {
                    if !entry.tombstone {
                        return Ok(Some(ValueRef::Mem(entry.value.clone())));
                    }
                }
                return Ok(None);
            }
            if let Some(mut caches) = self.caches.try_lock() {
                if let Some(val) = caches.get_with_seq(key, pointer.sequence) {
                    return Ok(Some(ValueRef::Mem(val)));
                }
            }
            return self.resolve_pointer_ref(key, &pointer);
        }
        // Tier 3: sidecar-guided walk over all segments.
        let segment_ids = {
            let manifest = self.manifest.lock();
            SegmentTier::all()
                .iter()
                .flat_map(|tier| {
                    manifest
                        .segments_in(tier)
                        .iter()
                        .map(|meta| meta.id.clone())
                })
                .collect::<Vec<_>>()
        };
        for segment_id in segment_ids {
            let handle = self.segment_handle(&segment_id)?;
            if !handle
                .sidecar
                .iter()
                .any(|entry| entry.key.as_slice() == key)
            {
                continue;
            }
            let accelerators = handle.accelerators();
            if accelerators.sparse.is_none() && accelerators.bloom.is_none() {
                continue;
            }
            if let Some(pointer) = query::get(&self.indexes, accelerators, key, snapshot) {
                if let Some(val) = self.caches.lock().get_with_seq(key, pointer.sequence) {
                    return Ok(Some(ValueRef::Mem(val)));
                }
                return self.resolve_pointer_ref(key, &pointer);
            }
        }
        Ok(None)
    }
    /// Scans the time-fence index for entries between `start` and `end`
    /// (bound inclusivity delegated to `query::scan_time`).
    pub fn scan_time(&self, start: i64, end: i64) -> Result<RecordCursor> {
        self.flush_if_pending()?;
        let _timer = self.metrics.timer(OperationKind::Query);
        let snapshot = MvccSnapshot::open();
        let results = query::scan_time(&self.indexes, start, end, snapshot);
        Ok(RecordCursor::new(results))
    }

    /// Evaluates a structured filter request over the secondary indexes.
    pub fn filter(&self, request: &FilterRequest<'_>) -> Result<RecordCursor> {
        self.flush_if_pending()?;
        let _timer = self.metrics.timer(OperationKind::Query);
        let snapshot = MvccSnapshot::open();
        let results = query::filter(&self.indexes, request, snapshot);
        Ok(RecordCursor::new(results))
    }

    /// Full-text search via the FTS sidecar.
    pub fn search(&self, query: &str) -> Result<RecordCursor> {
        self.flush_if_pending()?;
        let _timer = self.metrics.timer(OperationKind::Query);
        let snapshot = MvccSnapshot::open();
        let results = query::search(&self.indexes, query, snapshot);
        Ok(RecordCursor::new(results))
    }

    /// Key-prefix scan over the in-memory indexes.
    pub fn scan_prefix(&self, prefix: &[u8]) -> Result<RecordCursor> {
        self.flush_if_pending()?;
        let _timer = self.metrics.timer(OperationKind::Query);
        let snapshot = MvccSnapshot::open();
        let results = query::scan_prefix(&self.indexes, prefix, snapshot);
        Ok(RecordCursor::new(results))
    }
    /// Runs the configured extractors over `(key, value)`; thin wrapper
    /// around [`extract_metadata_from_config`].
    fn extract_metadata(&self, key: &[u8], value: &[u8]) -> CachedIndexMetadata {
        extract_metadata_from_config(&self.index_config, key, value)
    }

    /// Builds the memtable-resident version pointer for a fresh entry.
    fn build_pointer(&self, entry: &MemTableEntry, timestamp: Option<i64>) -> VersionPointer {
        build_memtable_pointer(self.record_ids.as_ref(), entry, timestamp)
    }

    /// Applies a put to all in-memory indexes (hot + configured secondary).
    fn update_indexes_for_put(
        &self,
        key: &[u8],
        pointer: &VersionPointer,
        metadata: &CachedIndexMetadata,
    ) {
        update_indexes_for_put_with_registry(self.indexes.as_ref(), key, pointer, metadata)
    }

    /// Records a delete in the indexes, unindexing via the cached metadata
    /// when it is available.
    fn update_indexes_for_delete(
        &self,
        key: &[u8],
        pointer: &VersionPointer,
        metadata: Option<&CachedIndexMetadata>,
    ) {
        update_indexes_for_delete_with_registry(self.indexes.as_ref(), key, pointer, metadata)
    }
    /// Dereferences a [`VersionPointer`] to an owned value, or `None` when
    /// the version no longer resolves (tombstoned or gone from the memtable).
    fn resolve_pointer(&self, key: &[u8], pointer: &VersionPointer) -> Result<Option<Vec<u8>>> {
        match &pointer.location {
            VersionLocation::MemTable => {
                let memtable = self.controller.current_memtable();
                Ok(memtable
                    .get(key)
                    .filter(|entry| !entry.tombstone)
                    .map(|entry| {
                        let arc = entry.value.clone();
                        let seq = entry.sequence;
                        // Opportunistic cache fill; skipped if contended.
                        if let Some(mut caches) = self.caches.try_lock() {
                            caches.put_with_seq(key, arc.clone(), seq);
                        }
                        arc.as_ref().to_vec()
                    }))
            }
            VersionLocation::Segment {
                segment_id,
                offset,
                length: _,
            } => self.read_segment_value(segment_id, *offset, key, pointer.sequence),
        }
    }
    /// Like [`Database::resolve_pointer`] but yields a [`ValueRef`] instead
    /// of copying the value out.
    fn resolve_pointer_ref(&self, key: &[u8], pointer: &VersionPointer) -> Result<Option<ValueRef>> {
        match &pointer.location {
            VersionLocation::MemTable => {
                let memtable = self.controller.current_memtable();
                Ok(memtable
                    .get(key)
                    .filter(|entry| !entry.tombstone)
                    .map(|entry| ValueRef::Mem(entry.value.clone())))
            }
            VersionLocation::Segment { segment_id, offset, length: _ } => {
                self.read_segment_value_ref(segment_id, *offset, key, pointer.sequence)
            }
        }
    }
fn flush_current_thread_buffer(&self) -> Result<()> {
self.flush_thread_buffer_for(thread::current().id())?;
Ok(())
}
fn flush_thread_buffer_for(
&self,
thread_id: ThreadId,
) -> Result<Option<(CommitReceipt, Vec<MutationApplyResult>)>> {
let operations = {
let mut buffers = self.write_buffers.lock();
buffers.remove(&thread_id)
};
if let Some(operations) = operations {
if operations.is_empty() {
return Ok(None);
}
let outcome = self.submit_buffered_operations(operations)?;
Ok(Some(outcome))
} else {
Ok(None)
}
}
fn flush_all_buffers(&self) -> Result<()> {
let buffers = {
let mut buffers = self.write_buffers.lock();
buffers.drain().map(|(_, ops)| ops).collect::<Vec<_>>()
};
for operations in buffers {
if operations.is_empty() {
continue;
}
self.submit_buffered_operations(operations)?;
}
Ok(())
}
fn submit_buffered_operations(
&self,
operations: Vec<Mutation>,
) -> Result<(CommitReceipt, Vec<MutationApplyResult>)> {
if operations.is_empty() {
return Ok((CommitReceipt::default(), Vec::new()));
}
let outcome = self.controller.submit(Transaction::new(operations))?;
let CommitOutcome { receipt, entries } = outcome;
if self.index_config.is_empty() {
if !self.index_on_write {
let mut results = Vec::with_capacity(entries.len());
for entry in entries {
let key = entry.key.as_ref().to_vec();
let pointer = self.build_pointer(&entry, None);
results.push(MutationApplyResult { key, pointer: Some(pointer) });
}
return Ok((receipt, results));
}
let mut hot_batch = Vec::with_capacity(entries.len());
let mut exact_batch = Vec::with_capacity(entries.len());
let mut results = Vec::with_capacity(entries.len());
for entry in entries {
let key_arc = Arc::clone(&entry.key);
let key_vec = key_arc.as_ref().to_vec();
let pointer = self.build_pointer(&entry, None);
hot_batch.push((key_arc, pointer.clone()));
exact_batch.push((key_vec.clone(), pointer.clone()));
results.push(MutationApplyResult { key: key_vec, pointer: Some(pointer) });
}
self.exact_index.batch_insert(&exact_batch);
self.indexes.hot.upsert_batch(hot_batch);
Ok((receipt, results))
} else {
if !self.index_on_write {
let mut results = Vec::with_capacity(entries.len());
for entry in entries {
let key = entry.key.as_ref().to_vec();
let pointer = self.build_pointer(&entry, None);
results.push(MutationApplyResult { key, pointer: Some(pointer) });
}
return Ok((receipt, results));
}
let mut metadata_guard = self.index_metadata.write();
let mut results = Vec::with_capacity(entries.len());
let mut exact_batch = Vec::with_capacity(entries.len());
for entry in entries {
let key = entry.key.as_ref().to_vec();
if entry.tombstone {
let cached = metadata_guard.remove(key.as_slice());
let pointer =
self.build_pointer(&entry, cached.as_ref().and_then(|m| m.timestamp));
self.update_indexes_for_delete(&key, &pointer, cached.as_ref());
exact_batch.push((key.clone(), pointer.clone()));
results.push(MutationApplyResult {
key,
pointer: Some(pointer),
});
} else {
let metadata = self.extract_metadata(&key, entry.value.as_ref());
let pointer = self.build_pointer(&entry, metadata.timestamp);
self.update_indexes_for_put(&key, &pointer, &metadata);
metadata_guard.insert(key.clone(), metadata);
exact_batch.push((key.clone(), pointer.clone()));
results.push(MutationApplyResult {
key,
pointer: Some(pointer),
});
}
}
self.exact_index.batch_insert(&exact_batch);
Ok((receipt, results))
}
}
fn read_segment_value(
&self,
segment_id: &str,
offset: u64,
expected_key: &[u8],
expected_seq: u64,
) -> Result<Option<Vec<u8>>> {
let handle = self.segment_handle(segment_id)?;
let buffer = handle.reader.as_bytes();
let decoded = decode_segment_entry(buffer, offset)?;
if decoded.key != expected_key {
return Ok(None);
}
if decoded.tombstone {
return Ok(None);
}
let value = decoded.value.to_vec();
let arc: Arc<[u8]> = Arc::from(value.clone());
if let Some(mut caches) = self.caches.try_lock() {
caches.put_with_seq(expected_key, arc.clone(), expected_seq);
}
Ok(Some(value))
}
fn read_segment_value_ref(
&self,
segment_id: &str,
offset: u64,
expected_key: &[u8],
expected_seq: u64,
) -> Result<Option<ValueRef>> {
let handle = self.segment_handle(segment_id)?;
let buffer = handle.reader.as_bytes();
let decoded = decode_segment_entry(buffer, offset)?;
if decoded.key != expected_key || decoded.tombstone {
return Ok(None);
}
let v = decoded.value.to_vec();
let arc: Arc<[u8]> = Arc::from(v.clone());
if let Some(mut caches) = self.caches.try_lock() {
caches.put_with_seq(expected_key, arc.clone(), expected_seq);
}
let start = decoded.value.as_ptr() as usize - buffer.as_ptr() as usize;
let len = decoded.value.len();
Ok(Some(ValueRef::Seg { reader: handle.reader.clone(), start, len }))
}
    /// Returns the cached handle for `segment_id`, loading and caching it
    /// on first use. Benign race: two threads may both load the segment,
    /// but only the first insert wins via `or_insert_with`.
    fn segment_handle(&self, segment_id: &str) -> Result<Arc<SegmentCacheEntry>> {
        if let Some(entry) = self.segment_cache.lock().get(segment_id) {
            // NOTE(review): metadata() result is discarded — presumably a
            // validity/liveness touch; confirm it has the intended effect.
            let _ = entry.segment.metadata();
            return Ok(entry.clone());
        }
        // Load outside the cache lock to avoid holding it across file I/O.
        let loaded = self.load_segment(segment_id)?;
        let mut guard = self.segment_cache.lock();
        Ok(guard
            .entry(segment_id.to_string())
            .or_insert_with(|| loaded.clone())
            .clone())
    }
fn load_segment(&self, segment_id: &str) -> Result<Arc<SegmentCacheEntry>> {
let metadata = {
let manifest = self.manifest.lock();
SegmentTier::all()
.iter()
.filter_map(|tier| {
manifest
.segments_in(tier)
.iter()
.find(|s| s.id == segment_id)
})
.map(|metadata| metadata.clone())
.next()
};
let metadata = metadata.ok_or_else(|| {
StorageError::InvalidFormat(format!("segment {segment_id} not found"))
})?;
let segment = Arc::new(FileSegment::new(metadata.clone()));
let reader = Arc::new(segment.open_reader()?);
let sidecar = Arc::new(self.load_sidecar_entries(segment_id)?);
let (sparse, bloom) = self.build_segment_indexes(segment_id, &reader, &sidecar)?;
Ok(Arc::new(SegmentCacheEntry {
segment,
reader,
sidecar,
sparse: sparse.map(Arc::new),
bloom: bloom.map(Arc::new),
}))
}
    /// Builds the optional sparse/bloom accelerators for a segment from its
    /// sidecar; delegates to the free helper.
    fn build_segment_indexes(
        &self,
        segment_id: &str,
        reader: &SegmentReader,
        sidecar: &[SidecarEntry],
    ) -> Result<(Option<SparseIndex>, Option<BloomFilter>)> {
        build_segment_indexes_from_sidecar(segment_id, reader, sidecar)
    }

    /// Loads the (key, offset) sidecar rows for `segment_id` from disk.
    fn load_sidecar_entries(&self, segment_id: &str) -> Result<Vec<SidecarEntry>> {
        let path = self.sidecar_path(segment_id);
        read_sidecar_entries(&path)
    }

    /// Computes the on-disk sidecar path for `segment_id` from the current
    /// configuration snapshot.
    fn sidecar_path(&self, segment_id: &str) -> PathBuf {
        let config = self.config.current();
        manifest_sidecar_path(&config, segment_id)
    }

    /// Snapshot of the shared handles needed by background maintenance.
    fn maintenance_context(&self) -> MaintenanceContext {
        MaintenanceContext {
            manifest: Arc::clone(&self.manifest),
            config: Arc::clone(&self.config),
            indexes: Arc::clone(&self.indexes),
            index_config: Arc::clone(&self.index_config),
            index_metadata: Arc::clone(&self.index_metadata),
            segment_cache: Arc::clone(&self.segment_cache),
            record_ids: Arc::clone(&self.record_ids),
            metrics: self.metrics.clone(),
            compactor: Arc::clone(&self.compactor),
            scheduled_compactions: Arc::clone(&self.scheduled_compactions),
            compression: self.compression.clone(),
        }
    }
    /// Spawns the "db-maintenance" thread: each iteration reconciles the
    /// manifest, applies completed background flushes, and otherwise waits
    /// on the shutdown condvar (bounded by `MAINTENANCE_POLL_INTERVAL`) so
    /// shutdown can wake it promptly.
    fn spawn_background_workers(&mut self) -> Result<()> {
        let controller = Arc::clone(&self.controller);
        let shutdown = Arc::clone(&self.shutdown);
        let shutdown_notify = Arc::clone(&self.shutdown_notify);
        let context = self.maintenance_context();
        let handle = std::thread::Builder::new()
            .name("db-maintenance".to_string())
            .spawn(move || {
                loop {
                    // Keep the manifest in sync first; log-and-continue on error.
                    if let Err(err) = context.sync_manifest() {
                        eprintln!("failed to reconcile manifest: {err}");
                    }
                    let flushes = controller.drain_completed_flushes();
                    if flushes.is_empty() {
                        // Nothing to do: exit on shutdown, otherwise nap
                        // until notified or the poll interval elapses,
                        // rechecking both shutdown signals afterwards.
                        if shutdown.load(Ordering::Relaxed) {
                            break;
                        }
                        let (lock, cv) = &*shutdown_notify;
                        let mut flag = lock.lock();
                        if *flag {
                            break;
                        }
                        let _ = cv.wait_for(&mut flag, MAINTENANCE_POLL_INTERVAL);
                        if *flag || shutdown.load(Ordering::Relaxed) {
                            break;
                        }
                        continue;
                    }
                    if let Err(err) = context.process_flush_outcomes(flushes) {
                        eprintln!("background flush processing failed: {err}");
                    }
                }
            })?;
        self.background_workers.push(handle);
        Ok(())
    }
}
/// Run every configured secondary-index extractor over a raw record.
///
/// Fields whose extractor is not configured keep their default (empty)
/// value in the returned `CachedIndexMetadata`.
fn extract_metadata_from_config(
    config: &SecondaryIndexConfig,
    key: &[u8],
    value: &[u8],
) -> CachedIndexMetadata {
    let mut out = CachedIndexMetadata::default();
    if let Some(slug_fn) = config.slug.as_ref() {
        out.slug = slug_fn(key, value);
    }
    if let Some(timestamp_fn) = config.timestamp.as_ref() {
        out.timestamp = timestamp_fn(key, value);
    }
    if let Some(tags_fn) = config.roaring_tags.as_ref() {
        out.tags = tags_fn(key, value);
    }
    if let Some(json_fn) = config.json_paths.as_ref() {
        out.json_fields = json_fn(key, value);
    }
    if let Some(text_fn) = config.full_text.as_ref() {
        out.full_text = text_fn(key, value);
    }
    out
}
/// Publish a new live version of `key` to the hot index and fan the
/// pointer out to every secondary index the extracted metadata covers.
fn update_indexes_for_put_with_registry(
    indexes: &IndexRegistry,
    key: &[u8],
    pointer: &VersionPointer,
    metadata: &CachedIndexMetadata,
) {
    let owned_key = key.to_vec();
    // The hot index always receives the newest version.
    indexes.hot.upsert(owned_key.clone(), pointer.clone());
    if let Some(slug) = metadata.slug.as_ref() {
        indexes
            .slug
            .insert(slug.clone(), owned_key.clone(), pointer.clone());
    }
    if let Some(timestamp) = metadata.timestamp {
        indexes
            .time_fence
            .insert(timestamp, owned_key.clone(), pointer.clone());
    }
    for tag in &metadata.tags {
        indexes
            .roaring
            .add(tag.clone(), owned_key.clone(), pointer.clone());
    }
    for (path, value) in &metadata.json_fields {
        indexes
            .json_trie
            .index_value(path.clone(), value, owned_key.clone(), pointer.clone());
    }
    if let Some(text) = metadata.full_text.as_ref() {
        // Last consumer takes ownership of the key buffer (no clone).
        indexes.fts.index_document(text, owned_key, pointer.clone());
    }
}
fn update_indexes_for_delete_with_registry(
indexes: &IndexRegistry,
key: &[u8],
pointer: &VersionPointer,
metadata: Option<&CachedIndexMetadata>,
) {
let key_vec = key.to_vec();
indexes.hot.upsert(key_vec.clone(), pointer.clone());
if let Some(metadata) = metadata {
if let Some(slug) = &metadata.slug {
indexes
.slug
.insert(slug.clone(), key_vec.clone(), pointer.clone());
}
if let Some(timestamp) = metadata.timestamp {
indexes
.time_fence
.insert(timestamp, key_vec.clone(), pointer.clone());
}
for tag in &metadata.tags {
indexes
.roaring
.add(tag.clone(), key_vec.clone(), pointer.clone());
}
for (path, value) in &metadata.json_fields {
indexes
.json_trie
.index_value(path.clone(), value, key_vec.clone(), pointer.clone());
}
if let Some(text) = &metadata.full_text {
indexes
.fts
.index_document(text, key_vec.clone(), pointer.clone());
}
}
}
/// Mint a version pointer for an entry that still resides in the memtable.
fn build_memtable_pointer(
    record_ids: &AtomicU64,
    entry: &MemTableEntry,
    timestamp: Option<i64>,
) -> VersionPointer {
    // fetch_add returns the pre-increment value; +1 makes ids start at 1.
    let record_id = record_ids.fetch_add(1, Ordering::SeqCst) + 1;
    VersionPointer {
        record_id,
        sequence: entry.sequence,
        location: VersionLocation::MemTable,
        tombstone: entry.tombstone,
        timestamp,
    }
}
/// Mint a version pointer referencing bytes stored in an on-disk segment.
fn build_segment_pointer(
    record_ids: &AtomicU64,
    sequence: u64,
    segment_id: String,
    offset: u64,
    length: u32,
    tombstone: bool,
    timestamp: Option<i64>,
) -> VersionPointer {
    // fetch_add returns the pre-increment value; +1 makes ids start at 1.
    let record_id = record_ids.fetch_add(1, Ordering::SeqCst) + 1;
    let location = VersionLocation::Segment {
        segment_id,
        offset,
        length,
    };
    VersionPointer {
        record_id,
        sequence,
        location,
        tombstone,
        timestamp,
    }
}
/// Shared handles required by the background maintenance worker, cloned out
/// of `Database` so the worker thread operates independently of it.
#[derive(Clone)]
struct MaintenanceContext {
    // Authoritative segment listing; persisted via `persist()`.
    manifest: Arc<Mutex<FileManifest>>,
    // Live configuration source; `current()` yields a config snapshot.
    config: Arc<ConfigManager>,
    // In-memory primary and secondary indexes.
    indexes: Arc<IndexRegistry>,
    // Extractor callbacks used to derive secondary-index metadata.
    index_config: Arc<SecondaryIndexConfig>,
    // Per-key cache of extracted secondary-index metadata.
    index_metadata: Arc<RwLock<HbHashMap<Vec<u8>, CachedIndexMetadata, RandomState>>>,
    // Open segments (reader + sidecar + accelerators) keyed by segment id.
    segment_cache: Arc<Mutex<HashMap<String, Arc<SegmentCacheEntry>>>>,
    // Monotonic allocator for version-pointer record ids.
    record_ids: Arc<AtomicU64>,
    // Metrics sink for operation accounting.
    metrics: OperationObserver,
    // Background compaction executor.
    compactor: Arc<Compactor<FileManifest>>,
    // Segment ids currently reserved by a submitted compaction job.
    scheduled_compactions: Arc<Mutex<HashSet<String>>>,
    // Codec applied to compaction output segments.
    compression: CompressionCodec,
}
impl MaintenanceContext {
fn process_flush_outcomes(&self, flushes: Vec<FlushOutcome>) -> Result<()> {
for outcome in flushes {
self.process_flush(outcome)?;
}
Ok(())
}
fn process_flush(&self, outcome: FlushOutcome) -> Result<()> {
let segment = Arc::new(outcome.segment);
let metadata = segment.metadata().clone();
if metadata.size_bytes == 0 {
return Ok(());
}
let reader = Arc::new(segment.open_reader()?);
let config = self.config.current();
let index_path = manifest_sidecar_path(&config, &metadata.id);
let sidecar_entries = read_sidecar_entries(&index_path)?;
let (sparse, bloom) =
build_segment_indexes_from_sidecar(&metadata.id, reader.as_ref(), &sidecar_entries)?;
let cache_entry = Arc::new(SegmentCacheEntry {
segment: Arc::clone(&segment),
reader: Arc::clone(&reader),
sidecar: Arc::new(sidecar_entries.clone()),
sparse: sparse.map(Arc::new),
bloom: bloom.map(Arc::new),
});
self.segment_cache
.lock()
.insert(metadata.id.clone(), cache_entry);
self.register_segment(&metadata)?;
self.update_indexes(&metadata.id, reader.as_ref(), &sidecar_entries)?;
Ok(())
}
fn register_segment(&self, metadata: &SegmentMetadata) -> Result<()> {
let hot_segments = {
let mut manifest = self.manifest.lock();
manifest.register_segment(SegmentTier::Hot, metadata.clone())?;
manifest.persist()?;
manifest.segments_in(&SegmentTier::Hot).to_vec()
};
self.maybe_schedule_compaction(hot_segments);
Ok(())
}
fn update_indexes(
&self,
segment_id: &str,
reader: &SegmentReader,
entries: &[SidecarEntry],
) -> Result<()> {
let mut metadata_guard = self.index_metadata.write();
for entry in entries {
let decoded = decode_segment_entry(reader.as_bytes(), entry.offset)?;
let key_vec = decoded.key.to_vec();
let mut cached = metadata_guard.get(&key_vec).cloned();
if !decoded.tombstone && cached.is_none() {
let meta =
extract_metadata_from_config(&self.index_config, &key_vec, decoded.value);
metadata_guard.insert(key_vec.clone(), meta.clone());
cached = Some(meta);
}
let pointer = build_segment_pointer(
self.record_ids.as_ref(),
decoded.sequence,
segment_id.to_string(),
entry.offset,
decoded.value.len() as u32,
decoded.tombstone,
cached.as_ref().and_then(|m| m.timestamp),
);
if decoded.tombstone {
update_indexes_for_delete_with_registry(
self.indexes.as_ref(),
&key_vec,
&pointer,
cached.as_ref(),
);
metadata_guard.remove(&key_vec);
} else if let Some(meta) = cached.as_ref() {
update_indexes_for_put_with_registry(
self.indexes.as_ref(),
&key_vec,
&pointer,
meta,
);
}
}
Ok(())
}
fn maybe_schedule_compaction(&self, hot_segments: Vec<SegmentMetadata>) {
const HOT_COMPACTION_FAN_IN: usize = 4;
let mut scheduled = self.scheduled_compactions.lock();
scheduled.retain(|id| hot_segments.iter().any(|segment| &segment.id == id));
if hot_segments.len() < HOT_COMPACTION_FAN_IN {
return;
}
let mut candidates = Vec::new();
for segment in &hot_segments {
if candidates.len() >= HOT_COMPACTION_FAN_IN {
break;
}
if scheduled.contains(&segment.id) {
continue;
}
scheduled.insert(segment.id.clone());
candidates.push(segment.clone());
}
drop(scheduled);
if candidates.len() != HOT_COMPACTION_FAN_IN {
return;
}
let job = CompactionJob {
job_id: format!("hot-{}", Uuid::new_v4()),
input_segments: candidates.clone(),
target_tier: SegmentTier::Warm,
compression: self.compression.clone(),
};
if let Err(err) = self.compactor.submit(job) {
eprintln!("failed to submit compaction job: {err}");
let mut scheduled = self.scheduled_compactions.lock();
for segment in &candidates {
scheduled.remove(&segment.id);
}
} else {
let rewritten: u64 = candidates.iter().map(|meta| meta.size_bytes).sum();
self.metrics
.record_value(OperationKind::Compaction, rewritten);
}
}
fn ingest_segment_from_metadata(&self, metadata: SegmentMetadata) -> Result<()> {
let segment = Arc::new(FileSegment::new(metadata.clone()));
let reader = Arc::new(segment.open_reader()?);
let config = self.config.current();
let sidecar_entries = read_sidecar_entries(&manifest_sidecar_path(&config, &metadata.id))?;
let (sparse, bloom) =
build_segment_indexes_from_sidecar(&metadata.id, reader.as_ref(), &sidecar_entries)?;
let cache_entry = Arc::new(SegmentCacheEntry {
segment,
reader: Arc::clone(&reader),
sidecar: Arc::new(sidecar_entries.clone()),
sparse: sparse.map(Arc::new),
bloom: bloom.map(Arc::new),
});
self.segment_cache
.lock()
.insert(metadata.id.clone(), cache_entry);
self.update_indexes(&metadata.id, reader.as_ref(), &sidecar_entries)
}
fn sync_manifest(&self) -> Result<()> {
let snapshot: Vec<SegmentMetadata> = {
let manifest = self.manifest.lock();
SegmentTier::all()
.iter()
.flat_map(|tier| manifest.segments_in(tier).iter().cloned())
.collect()
};
let known_ids: HashSet<String> = snapshot.iter().map(|meta| meta.id.clone()).collect();
let mut cache_guard = self.segment_cache.lock();
cache_guard.retain(|id, _| known_ids.contains(id));
let cached_ids: HashSet<String> = cache_guard.keys().cloned().collect();
drop(cache_guard);
for metadata in snapshot {
if !cached_ids.contains(&metadata.id) {
self.ingest_segment_from_metadata(metadata.clone())?;
}
}
Ok(())
}
}
/// Construct the per-segment sparse index and bloom filter by decoding
/// every sidecar entry against the segment's raw bytes.
///
/// Returns `(None, None)` for an empty sidecar; otherwise both accelerators
/// cover exactly the sidecar's entries.
fn build_segment_indexes_from_sidecar(
    segment_id: &str,
    reader: &SegmentReader,
    sidecar: &[SidecarEntry],
) -> Result<(Option<SparseIndex>, Option<BloomFilter>)> {
    if sidecar.is_empty() {
        return Ok((None, None));
    }
    let bytes = reader.as_bytes();
    let mut bloom = BloomFilter::new();
    let mut sparse_entries = Vec::with_capacity(sidecar.len());
    for (position, sidecar_entry) in sidecar.iter().enumerate() {
        let record = decode_segment_entry(bytes, sidecar_entry.offset)?;
        bloom.insert(record.key.to_vec());
        sparse_entries.push(SparseEntry {
            key: record.key.to_vec(),
            pointer: VersionPointer {
                // Local record ids within the segment's sparse index.
                record_id: position as u64,
                sequence: record.sequence,
                location: VersionLocation::Segment {
                    segment_id: segment_id.to_string(),
                    offset: sidecar_entry.offset,
                    length: record.value.len() as u32,
                },
                tombstone: record.tombstone,
                // Timestamps are not recoverable from segment bytes here.
                timestamp: None,
            },
        });
    }
    let sparse = (!sparse_entries.is_empty()).then(|| SparseIndex::new(sparse_entries));
    let bloom = (bloom.len() > 0).then_some(bloom);
    Ok((sparse, bloom))
}
/// Read all `(key, offset)` pairs from an on-disk sidecar index file.
///
/// File layout is a flat sequence of records:
/// `key_len: u32 LE | key bytes | offset: u64 LE`.
/// A clean EOF before a length prefix terminates the stream; EOF anywhere
/// else inside a record is propagated as an I/O error.
fn read_sidecar_entries(path: &Path) -> Result<Vec<SidecarEntry>> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut entries = Vec::new();
    loop {
        // The length prefix doubles as the end-of-stream marker.
        let mut prefix = [0u8; 4];
        if let Err(err) = reader.read_exact(&mut prefix) {
            if err.kind() == ErrorKind::UnexpectedEof {
                break;
            }
            return Err(err.into());
        }
        let mut key = vec![0u8; u32::from_le_bytes(prefix) as usize];
        reader.read_exact(&mut key)?;
        let mut offset_bytes = [0u8; 8];
        reader.read_exact(&mut offset_bytes)?;
        entries.push(SidecarEntry {
            key,
            offset: u64::from_le_bytes(offset_bytes),
        });
    }
    Ok(entries)
}
/// Path of the sidecar index file for `segment_id` under the configured
/// data directory: `<data_dir>/indexes/<segment_id>.idx`.
fn manifest_sidecar_path(config: &DbConfig, segment_id: &str) -> PathBuf {
    let mut path = config.data_dir.join("indexes");
    path.push(format!("{segment_id}.idx"));
    path
}
/// Decode one entry from a raw segment buffer.
///
/// Entry layout at `offset`:
/// `key_len: u32 LE | value_len: u32 LE | sequence: u64 LE | tombstone: u8`
/// followed by `key_len` key bytes and `value_len` value bytes.
///
/// Returns borrowed key/value slices into `buffer`. All arithmetic is
/// overflow-checked so a corrupt offset or length yields
/// `StorageError::InvalidFormat` instead of a wrapped bound check followed
/// by a slice panic; `usize::try_from(offset)` also avoids silent
/// truncation of the 64-bit offset on 32-bit targets.
fn decode_segment_entry<'a>(
    buffer: &'a [u8],
    offset: u64,
) -> std::result::Result<DecodedSegmentEntry<'a>, StorageError> {
    const HEADER_LEN: usize = 4 + 4 + 8 + 1;
    let start = usize::try_from(offset).map_err(|_| {
        StorageError::InvalidFormat("segment entry offset exceeds address space".into())
    })?;
    // checked_add keeps a corrupt offset near usize::MAX from wrapping past
    // the bounds comparison below.
    let header_end = start
        .checked_add(HEADER_LEN)
        .ok_or_else(|| StorageError::InvalidFormat("segment entry header overflow".into()))?;
    if buffer.len() < header_end {
        return Err(StorageError::InvalidFormat(
            "segment entry header exceeds segment bounds".into(),
        ));
    }
    // Slices below are in-bounds: start..header_end was checked above.
    let key_len = u32::from_le_bytes(
        buffer[start..start + 4].try_into().expect("4-byte field"),
    ) as usize;
    let value_len = u32::from_le_bytes(
        buffer[start + 4..start + 8].try_into().expect("4-byte field"),
    ) as usize;
    let sequence = u64::from_le_bytes(
        buffer[start + 8..start + 16].try_into().expect("8-byte field"),
    );
    let tombstone = buffer[start + 16] != 0;
    let key_start = header_end;
    let key_end = key_start
        .checked_add(key_len)
        .ok_or_else(|| StorageError::InvalidFormat("segment key length overflow".into()))?;
    if key_end > buffer.len() {
        return Err(StorageError::InvalidFormat(
            "segment key exceeds segment bounds".into(),
        ));
    }
    let value_end = key_end
        .checked_add(value_len)
        .ok_or_else(|| StorageError::InvalidFormat("segment value length overflow".into()))?;
    if value_end > buffer.len() {
        return Err(StorageError::InvalidFormat(
            "segment value exceeds segment bounds".into(),
        ));
    }
    Ok(DecodedSegmentEntry {
        key: &buffer[key_start..key_end],
        value: &buffer[key_end..value_end],
        sequence,
        tombstone,
    })
}
impl Drop for Database {
    /// Orderly shutdown. The sequencing matters: workers are signalled and
    /// joined first so nothing races the teardown; the remaining steps are
    /// best-effort — failures are logged to stderr and teardown continues,
    /// since Drop cannot return an error.
    fn drop(&mut self) {
        // Signal shutdown via both the atomic flag (polled by the worker
        // loop) and the condvar flag (wakes a worker parked in wait_for).
        self.shutdown.store(true, Ordering::SeqCst);
        {
            let (lock, cv) = &*self.shutdown_notify;
            let mut flag = lock.lock();
            *flag = true;
            cv.notify_all();
        }
        // Wait for every background worker to exit before tearing down the
        // state they share with us.
        for handle in self.background_workers.drain(..) {
            handle.join().ok();
        }
        if let Err(err) = self.flush_all_buffers() {
            eprintln!("failed to flush buffered mutations during shutdown: {err}");
        }
        if let Err(err) = self.maintenance_context().sync_manifest() {
            eprintln!("failed to reconcile manifest during shutdown: {err}");
        }
        // Flushes that completed after the worker exited (including those
        // triggered by flush_all_buffers above) still need processing.
        let remaining = self.controller.drain_completed_flushes();
        if !remaining.is_empty() {
            if let Err(err) = self.maintenance_context().process_flush_outcomes(remaining) {
                eprintln!("failed to process flushes during shutdown: {err}");
            }
        }
        if let Err(err) = self.manifest.lock().persist() {
            eprintln!("failed to persist manifest on drop: {err}");
        }
    }
}
/// Derive the group-commit tuning knobs from the database configuration.
/// The maximum batching interval is fixed at twice the configured minimum.
fn group_commit_config_from(config: &DbConfig) -> GroupCommitConfig {
    let min_interval = config.wal_group_interval();
    GroupCommitConfig {
        max_batch_ops: config.wal_max_batch_ops,
        max_batch_bytes: config.wal_group_bytes,
        min_interval,
        max_interval: min_interval * 2,
        fsync_on_commit: config.wal_fsync_on_commit,
    }
}
/// A value's bytes, located either in memory or inside an open segment.
pub enum ValueRef {
    /// Value resident in the memtable, shared via `Arc`.
    Mem(Arc<[u8]>),
    /// Value stored in a segment; `start`/`len` locate the bytes within the
    /// buffer returned by `reader.as_bytes()`.
    Seg { reader: Arc<SegmentReader>, start: usize, len: usize },
}
impl ValueRef {
    /// Borrow the value bytes regardless of where they live.
    ///
    /// For the segment variant this slices `start..start + len` out of the
    /// segment buffer; an out-of-range location panics, matching normal
    /// slice-indexing semantics.
    pub fn as_slice(&self) -> &[u8] {
        match self {
            ValueRef::Mem(bytes) => bytes.as_ref(),
            ValueRef::Seg { reader, start, len } => {
                let begin = *start;
                let end = begin + *len;
                &reader.as_bytes()[begin..end]
            }
        }
    }
}