use std::ops::RangeInclusive;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use heed::byteorder::BigEndian;
use heed::types::{Bytes, SerdeBincode, Str, U64};
use heed::{Database, Env};
use crate::error::{StoreError, StoreResult};
use crate::orchestrator::DurabilityMode;
#[cfg(test)]
use crate::storage::journal::JournalPruneReport;
use crate::storage::metadata::{GcWatermark, MetadataStore, ShardLayout};
use crate::types::{BlockId, JournalMeta};
mod durability;
mod env;
mod layout;
use env::EnvHandles;
/// LMDB-backed metadata store: one shared environment holding several named
/// databases for block state, configuration, journal offsets, and watermarks.
pub struct LmdbMetadataStore {
    // Shared LMDB environment; `Clone`d stores reuse this same env.
    env: Arc<Env>,
    // Directory the environment was opened at (exposed via `path()`).
    path: PathBuf,
    // Latest committed block id, stored under `CURRENT_BLOCK_KEY`.
    state_db: Database<Str, SerdeBincode<BlockId>>,
    // String-keyed, bincode-encoded configuration values.
    config_db: Database<Str, Bytes>,
    // Per-block journal metadata, keyed by big-endian block id so that byte
    // order matches numeric order for range scans.
    journal_offsets_db: Database<U64<BigEndian>, SerdeBincode<JournalMeta>>,
    // Serialized `GcWatermark` (stored under the legacy "pending_plan" key).
    gc_watermark_db: Database<Str, Bytes>,
    // Block id of the most recent snapshot.
    snapshot_watermark_db: Database<Str, SerdeBincode<BlockId>>,
}
impl LmdbMetadataStore {
const CURRENT_BLOCK_KEY: &'static str = "current_block";
const SHARD_LAYOUT_KEY: &'static str = "shard_layout";
const DURABILITY_MODE_KEY: &'static str = "durability_mode";
const LMDB_MAP_SIZE_KEY: &'static str = "lmdb_map_size";
const JOURNAL_CHUNK_SIZE_KEY: &'static str = "journal_chunk_size_bytes";
const MIN_ROLLBACK_WINDOW_KEY: &'static str = "min_rollback_window";
const PRUNE_INTERVAL_KEY: &'static str = "prune_interval_nanos";
const BOOTSTRAP_BLOCK_PROFILE_KEY: &'static str = "bootstrap_block_profile";
const GC_WATERMARK_KEY: &'static str = "pending_plan";
const SNAPSHOT_WATERMARK_KEY: &'static str = "latest_snapshot_block";
const DEFAULT_MAP_SIZE: usize = env::DEFAULT_MAP_SIZE;
pub fn new(path: impl AsRef<Path>) -> StoreResult<Self> {
Self::new_with_map_size(path, Self::DEFAULT_MAP_SIZE)
}
pub fn new_with_map_size(path: impl AsRef<Path>, map_size: usize) -> StoreResult<Self> {
let handles = env::open_rw(path.as_ref(), map_size)?;
let store = Self::from_handles(handles);
let actual_map_size = store.effective_map_size();
store.ensure_lmdb_map_size(actual_map_size)?;
Ok(store)
}
pub fn env(&self) -> Arc<Env> {
Arc::clone(&self.env)
}
pub fn effective_map_size(&self) -> usize {
self.env.info().map_size
}
pub fn path(&self) -> &Path {
&self.path
}
fn from_handles(handles: EnvHandles) -> Self {
Self {
env: Arc::new(handles.env),
path: handles.path,
state_db: handles.state_db,
config_db: handles.config_db,
journal_offsets_db: handles.journal_offsets_db,
gc_watermark_db: handles.gc_watermark_db,
snapshot_watermark_db: handles.snapshot_watermark_db,
}
}
pub fn load_shard_layout(&self) -> StoreResult<Option<ShardLayout>> {
layout::load(self.env.as_ref(), &self.config_db, Self::SHARD_LAYOUT_KEY)
}
pub fn store_shard_layout(&self, layout: &ShardLayout) -> StoreResult<()> {
layout::store(
self.env.as_ref(),
&self.config_db,
Self::SHARD_LAYOUT_KEY,
layout,
)
}
pub fn load_durability_mode(&self) -> StoreResult<Option<DurabilityMode>> {
durability::load_mode(
self.env.as_ref(),
&self.config_db,
Self::DURABILITY_MODE_KEY,
)
}
pub fn store_durability_mode(&self, mode: &DurabilityMode) -> StoreResult<()> {
durability::store_mode(
self.env.as_ref(),
&self.config_db,
Self::DURABILITY_MODE_KEY,
mode,
)
}
pub fn load_lmdb_map_size(&self) -> StoreResult<Option<usize>> {
durability::load_map_size(self.env.as_ref(), &self.config_db, Self::LMDB_MAP_SIZE_KEY)
}
pub fn store_lmdb_map_size(&self, map_size: usize) -> StoreResult<()> {
durability::store_map_size(
self.env.as_ref(),
&self.config_db,
Self::LMDB_MAP_SIZE_KEY,
map_size,
)
}
pub fn ensure_lmdb_map_size(&self, map_size: usize) -> StoreResult<()> {
durability::ensure_map_size(
self.env.as_ref(),
&self.config_db,
Self::LMDB_MAP_SIZE_KEY,
map_size,
)
}
pub fn load_journal_chunk_size(&self) -> StoreResult<Option<u64>> {
let txn = self.env.read_txn()?;
if let Some(bytes) = self.config_db.get(&txn, Self::JOURNAL_CHUNK_SIZE_KEY)? {
let stored: u64 = bincode::deserialize(bytes)?;
Ok(Some(stored))
} else {
Ok(None)
}
}
pub fn store_journal_chunk_size(&self, size: u64) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
let encoded = bincode::serialize(&size)?;
self.config_db
.put(&mut txn, Self::JOURNAL_CHUNK_SIZE_KEY, &encoded)?;
txn.commit()?;
Ok(())
}
pub fn load_min_rollback_window(&self) -> StoreResult<Option<BlockId>> {
let txn = self.env.read_txn()?;
if let Some(bytes) = self.config_db.get(&txn, Self::MIN_ROLLBACK_WINDOW_KEY)? {
let stored: BlockId = bincode::deserialize(bytes)?;
Ok(Some(stored))
} else {
Ok(None)
}
}
pub fn store_min_rollback_window(&self, window: BlockId) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
let encoded = bincode::serialize(&window)?;
self.config_db
.put(&mut txn, Self::MIN_ROLLBACK_WINDOW_KEY, &encoded)?;
txn.commit()?;
Ok(())
}
pub fn load_prune_interval(&self) -> StoreResult<Option<Duration>> {
let txn = self.env.read_txn()?;
if let Some(bytes) = self.config_db.get(&txn, Self::PRUNE_INTERVAL_KEY)? {
let stored: u64 = bincode::deserialize(bytes)?;
Ok(Some(Duration::from_nanos(stored)))
} else {
Ok(None)
}
}
pub fn store_prune_interval(&self, interval: Duration) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
let nanos = interval.as_nanos().min(u64::MAX as u128) as u64;
let encoded = bincode::serialize(&nanos)?;
self.config_db
.put(&mut txn, Self::PRUNE_INTERVAL_KEY, &encoded)?;
txn.commit()?;
Ok(())
}
pub fn load_bootstrap_block_profile(&self) -> StoreResult<Option<u64>> {
let txn = self.env.read_txn()?;
if let Some(bytes) = self
.config_db
.get(&txn, Self::BOOTSTRAP_BLOCK_PROFILE_KEY)?
{
let stored: u64 = bincode::deserialize(bytes)?;
Ok(Some(stored))
} else {
Ok(None)
}
}
pub fn store_bootstrap_block_profile(&self, profile: u64) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
let encoded = bincode::serialize(&profile)?;
self.config_db
.put(&mut txn, Self::BOOTSTRAP_BLOCK_PROFILE_KEY, &encoded)?;
txn.commit()?;
Ok(())
}
pub fn load_gc_watermark(&self) -> StoreResult<Option<GcWatermark>> {
let txn = self.env.read_txn()?;
if let Some(bytes) = self.gc_watermark_db.get(&txn, Self::GC_WATERMARK_KEY)? {
Ok(Some(bincode::deserialize(bytes)?))
} else {
Ok(None)
}
}
pub fn store_gc_watermark(&self, watermark: &GcWatermark) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
let encoded = bincode::serialize(watermark)?;
self.gc_watermark_db
.put(&mut txn, Self::GC_WATERMARK_KEY, &encoded)?;
txn.commit()?;
Ok(())
}
pub fn clear_gc_watermark(&self) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
self.gc_watermark_db
.delete(&mut txn, Self::GC_WATERMARK_KEY)?;
txn.commit()?;
Ok(())
}
pub fn load_snapshot_watermark(&self) -> StoreResult<Option<BlockId>> {
let txn = self.env.read_txn()?;
if let Some(value) = self
.snapshot_watermark_db
.get(&txn, Self::SNAPSHOT_WATERMARK_KEY)?
{
Ok(Some(value))
} else {
Ok(None)
}
}
pub fn store_snapshot_watermark(&self, block: BlockId) -> StoreResult<()> {
let mut txn = self.env.write_txn()?;
self.snapshot_watermark_db
.put(&mut txn, Self::SNAPSHOT_WATERMARK_KEY, &block)?;
txn.commit()?;
Ok(())
}
}
impl Clone for LmdbMetadataStore {
    /// Produces a handle that shares the same LMDB environment and database
    /// handles as `self`; only the path buffer is deep-copied.
    fn clone(&self) -> Self {
        // Destructure once so a newly added field triggers a compile error
        // here instead of being silently dropped from the clone.
        let Self {
            env,
            path,
            state_db,
            config_db,
            journal_offsets_db,
            gc_watermark_db,
            snapshot_watermark_db,
        } = self;
        Self {
            env: Arc::clone(env),
            path: path.clone(),
            // Database handles are plain copies, as in the original layout.
            state_db: *state_db,
            config_db: *config_db,
            journal_offsets_db: *journal_offsets_db,
            gc_watermark_db: *gc_watermark_db,
            snapshot_watermark_db: *snapshot_watermark_db,
        }
    }
}
impl MetadataStore for LmdbMetadataStore {
    /// Latest committed block id; defaults to 0 for a fresh store.
    fn current_block(&self) -> StoreResult<BlockId> {
        let txn = self.env.read_txn()?;
        let value = self.state_db.get(&txn, Self::CURRENT_BLOCK_KEY)?;
        Ok(value.unwrap_or(0))
    }

    fn set_current_block(&self, block: BlockId) -> StoreResult<()> {
        let mut txn = self.env.write_txn()?;
        self.state_db
            .put(&mut txn, Self::CURRENT_BLOCK_KEY, &block)?;
        txn.commit()?;
        Ok(())
    }

    fn put_journal_offset(&self, block: BlockId, meta: &JournalMeta) -> StoreResult<()> {
        let mut txn = self.env.write_txn()?;
        self.journal_offsets_db.put(&mut txn, &block, meta)?;
        txn.commit()?;
        Ok(())
    }

    /// Returns journal metadata for every block present in `range`, in block
    /// order (keys are big-endian, so byte order equals numeric order).
    ///
    /// # Errors
    /// `StoreError::InvalidBlockRange` when `start > end`.
    fn get_journal_offsets(&self, range: RangeInclusive<BlockId>) -> StoreResult<Vec<JournalMeta>> {
        let start = *range.start();
        let end = *range.end();
        if start > end {
            return Err(StoreError::InvalidBlockRange { start, end });
        }
        let txn = self.env.read_txn()?;
        let mut metas = Vec::new();
        for result in self.journal_offsets_db.range(&txn, &(start..=end))? {
            let (_block, meta) = result?;
            metas.push(meta);
        }
        Ok(metas)
    }

    /// Newest journal entry whose block id is `<= block`, if any.
    fn last_journal_offset_at_or_before(&self, block: BlockId) -> StoreResult<Option<JournalMeta>> {
        let txn = self.env.read_txn()?;
        // Walk the range backwards and take the first hit instead of scanning
        // every entry from 0 just to keep the last one (was O(n)).
        Ok(self
            .journal_offsets_db
            .rev_range(&txn, &(0..=block))?
            .next()
            .transpose()?
            .map(|(_key, meta)| meta))
    }

    /// Removes every journal offset entry strictly after `block`.
    fn remove_journal_offsets_after(&self, block: BlockId) -> StoreResult<()> {
        // Nothing can exist beyond BlockId::MAX, so there is nothing to do.
        if block == BlockId::MAX {
            return Ok(());
        }
        let mut txn = self.env.write_txn()?;
        // Cannot overflow given the MAX guard above; keep the checked form so
        // the invariant is enforced rather than assumed.
        let range_start = block.checked_add(1).ok_or(StoreError::InvalidBlockRange {
            start: block,
            end: block,
        })?;
        // Collect keys first: the read cursor borrows the txn, which must be
        // released before mutating through `delete`.
        let mut to_remove = Vec::new();
        for result in self
            .journal_offsets_db
            .range(&txn, &(range_start..=BlockId::MAX))?
        {
            let (key, _) = result?;
            to_remove.push(key);
        }
        for key in to_remove {
            self.journal_offsets_db.delete(&mut txn, &key)?;
        }
        txn.commit()?;
        Ok(())
    }

    /// Removes every journal offset entry at or before `block` and returns
    /// how many entries were deleted. The current-block marker is untouched.
    fn prune_journal_offsets_at_or_before(&self, block: BlockId) -> StoreResult<usize> {
        let mut txn = self.env.write_txn()?;
        // Collect keys before deleting: the cursor borrows the txn read-only.
        let mut to_remove = Vec::new();
        for result in self.journal_offsets_db.range(&txn, &(0..=block))? {
            let (key, _) = result?;
            to_remove.push(key);
        }
        let removed = to_remove.len();
        for key in to_remove {
            self.journal_offsets_db.delete(&mut txn, &key)?;
        }
        // NOTE(review): dropped a read-then-rewrite of CURRENT_BLOCK_KEY that
        // put the value back unmodified — pruning never changes the current
        // block, so that write was pure overhead.
        txn.commit()?;
        Ok(removed)
    }

    /// Records a block's journal metadata and advances the current-block
    /// marker atomically, in a single transaction.
    fn record_block_commit(&self, block: BlockId, meta: &JournalMeta) -> StoreResult<()> {
        let mut txn = self.env.write_txn()?;
        self.journal_offsets_db.put(&mut txn, &block, meta)?;
        self.state_db
            .put(&mut txn, Self::CURRENT_BLOCK_KEY, &block)?;
        txn.commit()?;
        Ok(())
    }

    /// Batch form of `record_block_commit`: all entries plus the marker update
    /// (set to the last entry's block) commit in one transaction. Empty input
    /// is a no-op.
    fn record_block_commits(&self, entries: &[(BlockId, JournalMeta)]) -> StoreResult<()> {
        if entries.is_empty() {
            return Ok(());
        }
        let mut txn = self.env.write_txn()?;
        for (block, meta) in entries {
            self.journal_offsets_db.put(&mut txn, block, meta)?;
        }
        // Non-empty here, so `last()` is always Some.
        if let Some((last_block, _)) = entries.last() {
            self.state_db
                .put(&mut txn, Self::CURRENT_BLOCK_KEY, last_block)?;
        }
        txn.commit()?;
        Ok(())
    }

    // The trait-level watermark accessors delegate to the inherent methods
    // of the same name defined on LmdbMetadataStore.
    fn load_gc_watermark(&self) -> StoreResult<Option<GcWatermark>> {
        LmdbMetadataStore::load_gc_watermark(self)
    }

    fn store_gc_watermark(&self, watermark: &GcWatermark) -> StoreResult<()> {
        LmdbMetadataStore::store_gc_watermark(self, watermark)
    }

    fn clear_gc_watermark(&self) -> StoreResult<()> {
        LmdbMetadataStore::clear_gc_watermark(self)
    }

    fn load_snapshot_watermark(&self) -> StoreResult<Option<BlockId>> {
        LmdbMetadataStore::load_snapshot_watermark(self)
    }

    fn store_snapshot_watermark(&self, block: BlockId) -> StoreResult<()> {
        LmdbMetadataStore::store_snapshot_watermark(self, block)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::Arc;
    use std::time::Duration;
    use tempfile::tempdir_in;
    use crate::error::StoreError;
    use crate::storage::journal::ChunkDeletionPlan;

    /// Creates a scratch directory under `target/testdata` so LMDB files stay
    /// inside the workspace; the returned guard deletes it on drop.
    fn test_dir() -> tempfile::TempDir {
        let workspace_tmp = std::env::current_dir().unwrap().join("target/testdata");
        fs::create_dir_all(&workspace_tmp).unwrap();
        tempdir_in(&workspace_tmp).unwrap()
    }

    /// Opens a fresh store in its own temp dir. Keep the returned `TempDir`
    /// alive for as long as the store is used — dropping it removes the files.
    fn open_store() -> (tempfile::TempDir, LmdbMetadataStore) {
        let tmp = test_dir();
        let store = LmdbMetadataStore::new(tmp.path()).expect("metadata store should initialize");
        (tmp, store)
    }

    /// Builds a `JournalMeta` whose checksum is derived from the block height
    /// so round-trip assertions can tell entries apart.
    fn sample_meta(block_height: BlockId, chunk_offset: u64) -> JournalMeta {
        JournalMeta {
            block_height,
            chunk_id: 1,
            chunk_offset,
            compressed_len: 16,
            checksum: 1234 + block_height as u32,
        }
    }

    #[test]
    fn current_block_defaults_to_zero_and_updates() {
        let (_tmp, store) = open_store();
        assert_eq!(store.current_block().unwrap(), 0);
        store.set_current_block(42).unwrap();
        assert_eq!(store.current_block().unwrap(), 42);
        assert!(store.path().exists());
    }

    #[test]
    fn journal_offsets_round_trip() {
        let (_tmp, store) = open_store();
        let meta1 = sample_meta(1, 0);
        let meta3 = sample_meta(3, 128);
        store.put_journal_offset(1, &meta1).unwrap();
        store.put_journal_offset(3, &meta3).unwrap();
        let result = store.get_journal_offsets(1..=3).unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].block_height, 1);
        assert_eq!(result[0].chunk_offset, 0);
        assert_eq!(result[0].compressed_len, 16);
        assert_eq!(result[0].checksum, 1235);
        assert_eq!(result[1].block_height, 3);
        assert_eq!(result[1].chunk_offset, 128);
        assert_eq!(result[1].checksum, 1237);
        // A range with no stored blocks yields an empty result, not an error.
        let empty = store.get_journal_offsets(4..=6).unwrap();
        assert!(empty.is_empty());
    }

    #[test]
    fn pruning_config_round_trip() {
        let (_tmp, store) = open_store();
        assert!(store.load_min_rollback_window().unwrap().is_none());
        store.store_min_rollback_window(256).unwrap();
        assert_eq!(store.load_min_rollback_window().unwrap(), Some(256));
        let interval = Duration::from_millis(750);
        store.store_prune_interval(interval).unwrap();
        assert_eq!(store.load_prune_interval().unwrap(), Some(interval));
        assert!(store.load_bootstrap_block_profile().unwrap().is_none());
        store.store_bootstrap_block_profile(42).unwrap();
        assert_eq!(store.load_bootstrap_block_profile().unwrap(), Some(42));
    }

    #[test]
    fn prune_journal_offsets_removes_range() {
        let (_tmp, store) = open_store();
        for block in 0..5 {
            let meta = sample_meta(block, block * 64);
            store.put_journal_offset(block, &meta).unwrap();
        }
        let removed = store
            .prune_journal_offsets_at_or_before(2)
            .expect("prune succeeds");
        // Blocks 0, 1, 2 removed; 3 and 4 survive.
        assert_eq!(removed, 3);
        let remaining = store.get_journal_offsets(0..=5).unwrap();
        assert_eq!(remaining.len(), 2);
        assert_eq!(remaining[0].block_height, 3);
        assert_eq!(remaining[1].block_height, 4);
    }

    #[test]
    fn gc_watermark_round_trip() {
        let (tmp, store) = open_store();
        assert!(store.load_gc_watermark().unwrap().is_none());
        let plan = ChunkDeletionPlan {
            chunk_ids: vec![1, 2, 3],
        };
        let staged_path = tmp.path().join("journal").join("journal.idx.staged");
        let watermark = GcWatermark {
            pruned_through: 42,
            chunk_plan: plan.clone(),
            staged_index_path: staged_path.clone(),
            baseline_entry_count: 0,
            report: JournalPruneReport {
                pruned_through: 42,
                chunks_removed: plan.chunk_ids.len(),
                entries_removed: 0,
                bytes_freed: 0,
            },
        };
        store.store_gc_watermark(&watermark).unwrap();
        let loaded = store.load_gc_watermark().unwrap().unwrap();
        assert_eq!(loaded.pruned_through, 42);
        assert_eq!(loaded.chunk_plan.chunk_ids, plan.chunk_ids);
        assert_eq!(loaded.staged_index_path, staged_path);
        store.clear_gc_watermark().unwrap();
        assert!(store.load_gc_watermark().unwrap().is_none());
    }

    #[test]
    fn snapshot_watermark_round_trip() {
        let (_tmp, store) = open_store();
        assert!(store.load_snapshot_watermark().unwrap().is_none());
        store.store_snapshot_watermark(77).unwrap();
        assert_eq!(store.load_snapshot_watermark().unwrap(), Some(77));
    }

    #[test]
    fn journal_offsets_invalid_range_errors() {
        let (_tmp, store) = open_store();
        let err = store
            .get_journal_offsets(std::ops::RangeInclusive::new(5, 3))
            .unwrap_err();
        match err {
            StoreError::InvalidBlockRange { start, end } => {
                assert_eq!(start, 5);
                assert_eq!(end, 3);
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn clone_shares_environment_and_path() {
        let (_tmp, store) = open_store();
        store.set_current_block(3).unwrap();
        let meta = sample_meta(2, 256);
        store.put_journal_offset(2, &meta).unwrap();
        let cloned = store.clone();
        // Clones share the same env Arc and path, so writes via one handle
        // are visible through the other.
        assert!(Arc::ptr_eq(&store.env(), &cloned.env()));
        assert_eq!(store.path(), cloned.path());
        cloned.set_current_block(8).unwrap();
        assert_eq!(store.current_block().unwrap(), 8);
        let offsets = cloned.get_journal_offsets(1..=5).unwrap();
        assert_eq!(offsets.len(), 1);
        assert_eq!(offsets[0].block_height, 2);
    }

    #[test]
    fn custom_map_size_creates_store() {
        let tmp = test_dir();
        let custom_size = 10 << 30; // 10 GiB
        let store = LmdbMetadataStore::new_with_map_size(tmp.path(), custom_size).unwrap();
        store.set_current_block(42).unwrap();
        assert_eq!(store.current_block().unwrap(), 42);
        let meta = sample_meta(1, 100);
        store.put_journal_offset(1, &meta).unwrap();
        let offsets = store.get_journal_offsets(1..=1).unwrap();
        assert_eq!(offsets.len(), 1);
        assert_eq!(offsets[0].block_height, 1);
    }

    #[test]
    fn default_map_size_is_sufficient_for_bitcoin() {
        let (_tmp, store) = open_store();
        // Spot-check block heights up to well past the current chain tip.
        let test_blocks = [1, 10_000, 100_000, 500_000, 1_000_000, 1_400_000];
        for block in test_blocks.iter() {
            let meta = sample_meta(*block, block * 100);
            store.put_journal_offset(*block, &meta).unwrap();
        }
        store.set_current_block(1_400_000).unwrap();
        assert_eq!(store.current_block().unwrap(), 1_400_000);
        let offsets = store.get_journal_offsets(1..=1_400_000).unwrap();
        assert_eq!(offsets.len(), test_blocks.len());
        let offsets = store.get_journal_offsets(1_000_000..=1_000_000).unwrap();
        assert_eq!(offsets.len(), 1);
        assert_eq!(offsets[0].block_height, 1_000_000);
    }

    #[test]
    fn journal_chunk_size_round_trip() {
        let (_tmp, store) = open_store();
        assert!(store.load_journal_chunk_size().unwrap().is_none());
        let chunk_size = 8_u64 << 20;
        store.store_journal_chunk_size(chunk_size).unwrap();
        assert_eq!(store.load_journal_chunk_size().unwrap(), Some(chunk_size));
    }
}