use std::borrow::Cow;
use std::fs;
use heed::types::Bytes;
use heed::{Database, Env, EnvFlags, EnvOpenOptions, Error as HeedError, MdbError, RoTxn};
use crate::config::CoinStoreConfig;
use super::schema::{
CF_ARCHIVE_COIN_RECORDS, CF_COIN_BY_CONFIRMED_HEIGHT, CF_COIN_BY_PARENT,
CF_COIN_BY_PUZZLE_HASH, CF_COIN_BY_SPENT_HEIGHT, CF_COIN_RECORDS, CF_HINTS, CF_HINTS_BY_VALUE,
CF_MERKLE_NODES, CF_METADATA, CF_STATE_SNAPSHOTS, CF_UNSPENT_BY_PUZZLE_HASH,
};
use super::{StorageBackend, StorageError, WriteBatch, WriteOp};
pub const LMDB_DB_COINS: &str = "coins";
pub const LMDB_DB_COINS_BY_PH: &str = "coins_by_ph";
pub const LMDB_DB_HINTS: &str = "hints";
pub const LMDB_DB_HINTS_BY_VALUE: &str = "hints_by_value";
pub const LMDB_DB_SNAPSHOTS: &str = "snapshots";
pub const LMDB_DB_METADATA: &str = "metadata";
pub const LMDB_NAMED_DATABASES: &[&str] = &[
LMDB_DB_COINS,
LMDB_DB_COINS_BY_PH,
LMDB_DB_HINTS,
LMDB_DB_HINTS_BY_VALUE,
LMDB_DB_SNAPSHOTS,
LMDB_DB_METADATA,
];
const LMDB_MAX_DBS: u32 = 8;
const TAG_COIN_RECORDS: u8 = 0x01;
const TAG_COIN_BY_PARENT: u8 = 0x02;
const TAG_COIN_BY_CONFIRMED_HEIGHT: u8 = 0x03;
const TAG_COIN_BY_SPENT_HEIGHT: u8 = 0x04;
const TAG_ARCHIVE_COIN_RECORDS: u8 = 0x05;
const TAG_MERKLE_NODES: u8 = 0x06;
const TAG_COIN_BY_PUZZLE_HASH: u8 = 0x01;
const TAG_UNSPENT_BY_PUZZLE_HASH: u8 = 0x02;
pub struct LmdbBackend {
env: Env,
dbs: Vec<Database<Bytes, Bytes>>,
}
fn map_heed(ctx: impl Into<String>, err: HeedError) -> StorageError {
match err {
HeedError::Mdb(MdbError::MapFull) => StorageError::MapFull,
other => StorageError::BackendError(format!("{}: {}", ctx.into(), other)),
}
}
fn logical_cf_route(cf: &str) -> Result<(usize, Option<u8>), StorageError> {
Ok(match cf {
CF_COIN_RECORDS => (0, Some(TAG_COIN_RECORDS)),
CF_COIN_BY_PARENT => (0, Some(TAG_COIN_BY_PARENT)),
CF_COIN_BY_CONFIRMED_HEIGHT => (0, Some(TAG_COIN_BY_CONFIRMED_HEIGHT)),
CF_COIN_BY_SPENT_HEIGHT => (0, Some(TAG_COIN_BY_SPENT_HEIGHT)),
CF_ARCHIVE_COIN_RECORDS => (0, Some(TAG_ARCHIVE_COIN_RECORDS)),
CF_MERKLE_NODES => (0, Some(TAG_MERKLE_NODES)),
CF_COIN_BY_PUZZLE_HASH => (1, Some(TAG_COIN_BY_PUZZLE_HASH)),
CF_UNSPENT_BY_PUZZLE_HASH => (1, Some(TAG_UNSPENT_BY_PUZZLE_HASH)),
CF_HINTS => (2, None),
CF_HINTS_BY_VALUE => (3, None),
CF_STATE_SNAPSHOTS => (4, None),
CF_METADATA => (5, None),
_ => return Err(StorageError::UnknownColumnFamily(cf.to_string())),
})
}
fn encode_storage_key(tag: Option<u8>, user_key: &[u8]) -> Cow<'_, [u8]> {
match tag {
Some(t) => {
let mut v = Vec::with_capacity(1 + user_key.len());
v.push(t);
v.extend_from_slice(user_key);
Cow::Owned(v)
}
None => Cow::Borrowed(user_key),
}
}
fn decode_user_key(tag: Option<u8>, stored: &[u8]) -> Vec<u8> {
match tag {
Some(_) => stored.get(1..).unwrap_or_default().to_vec(),
None => stored.to_vec(),
}
}
impl LmdbBackend {
pub fn open(config: &CoinStoreConfig) -> Result<Self, StorageError> {
let path = &config.storage_path;
fs::create_dir_all(path).map_err(|e| {
StorageError::BackendError(format!("Failed to create LMDB directory: {}", e))
})?;
let mut opts = EnvOpenOptions::new();
opts.map_size(config.lmdb_map_size);
opts.max_dbs(LMDB_MAX_DBS);
unsafe {
opts.flags(EnvFlags::NO_TLS | EnvFlags::NO_READ_AHEAD);
}
let env = unsafe { opts.open(path) }.map_err(|e| map_heed("Failed to open LMDB env", e))?;
let mut wtxn = env
.write_txn()
.map_err(|e| map_heed("LMDB write_txn (open)", e))?;
let mut dbs = Vec::with_capacity(LMDB_NAMED_DATABASES.len());
for name in LMDB_NAMED_DATABASES {
let db: Database<Bytes, Bytes> = env
.create_database(&mut wtxn, Some(*name))
.map_err(|e| map_heed(format!("LMDB create db {name}"), e))?;
dbs.push(db);
}
wtxn.commit()
.map_err(|e| map_heed("LMDB commit (open)", e))?;
debug_assert_eq!(dbs.len(), LMDB_NAMED_DATABASES.len());
Ok(Self { env, dbs })
}
pub fn environment(&self) -> &Env {
&self.env
}
fn db(&self, index: usize) -> Result<Database<Bytes, Bytes>, StorageError> {
self.dbs
.get(index)
.copied()
.ok_or_else(|| StorageError::BackendError("LMDB internal db index".into()))
}
fn with_rotxn<R>(
&self,
f: impl for<'a> FnOnce(&'a RoTxn) -> Result<R, StorageError>,
) -> Result<R, StorageError> {
let rtxn = self
.env
.read_txn()
.map_err(|e| map_heed("LMDB read_txn", e))?;
f(&rtxn)
}
}
impl StorageBackend for LmdbBackend {
fn get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
let (dbi, tag) = logical_cf_route(cf)?;
let db = self.db(dbi)?;
let enc = encode_storage_key(tag, key);
self.with_rotxn(|rtxn| {
let v = db
.get(rtxn, enc.as_ref())
.map_err(|e| map_heed("LMDB get", e))?;
Ok(v.map(|b| b.to_vec()))
})
}
fn put(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), StorageError> {
let (dbi, tag) = logical_cf_route(cf)?;
let db = self.db(dbi)?;
let enc = encode_storage_key(tag, key);
let mut wtxn = self
.env
.write_txn()
.map_err(|e| map_heed("LMDB write_txn", e))?;
db.put(&mut wtxn, enc.as_ref(), value)
.map_err(|e| map_heed("LMDB put", e))?;
wtxn.commit().map_err(|e| map_heed("LMDB commit", e))
}
fn delete(&self, cf: &str, key: &[u8]) -> Result<(), StorageError> {
let (dbi, tag) = logical_cf_route(cf)?;
let db = self.db(dbi)?;
let enc = encode_storage_key(tag, key);
let mut wtxn = self
.env
.write_txn()
.map_err(|e| map_heed("LMDB write_txn", e))?;
let _existed: bool = db
.delete(&mut wtxn, enc.as_ref())
.map_err(|e| map_heed("LMDB delete", e))?;
wtxn.commit().map_err(|e| map_heed("LMDB commit", e))
}
fn batch_write(&self, batch: WriteBatch) -> Result<(), StorageError> {
if batch.is_empty() {
return Ok(());
}
let mut wtxn = self
.env
.write_txn()
.map_err(|e| map_heed("LMDB write_txn", e))?;
for op in &batch.ops {
match op {
WriteOp::Put { cf, key, value } => {
let (dbi, tag) = logical_cf_route(cf)?;
let db = self.db(dbi)?;
let enc = encode_storage_key(tag, key.as_slice());
db.put(&mut wtxn, enc.as_ref(), value.as_slice())
.map_err(|e| map_heed("LMDB batch put", e))?;
}
WriteOp::Delete { cf, key } => {
let (dbi, tag) = logical_cf_route(cf)?;
let db = self.db(dbi)?;
let enc = encode_storage_key(tag, key.as_slice());
let _ = db
.delete(&mut wtxn, enc.as_ref())
.map_err(|e| map_heed("LMDB batch del", e))?;
}
}
}
wtxn.commit().map_err(|e| map_heed("LMDB commit", e))
}
fn prefix_scan(&self, cf: &str, prefix: &[u8]) -> Result<Vec<super::KvPair>, StorageError> {
let (dbi, tag) = logical_cf_route(cf)?;
let db = self.db(dbi)?;
let rtxn = self
.env
.read_txn()
.map_err(|e| map_heed("LMDB read_txn", e))?;
let mut out = Vec::new();
match tag {
Some(t) => {
let seek_prefix: Vec<u8> = {
let mut p = Vec::with_capacity(1 + prefix.len());
p.push(t);
p.extend_from_slice(prefix);
p
};
let mut iter = db
.prefix_iter(&rtxn, seek_prefix.as_slice())
.map_err(|e| map_heed("LMDB prefix_iter", e))?;
loop {
match iter
.next()
.transpose()
.map_err(|e| map_heed("LMDB prefix next", e))?
{
None => break,
Some((k, v)) => {
out.push((decode_user_key(Some(t), k), v.to_vec()));
}
}
}
}
None => {
if prefix.is_empty() {
let mut iter = db.iter(&rtxn).map_err(|e| map_heed("LMDB iter", e))?;
loop {
match iter
.next()
.transpose()
.map_err(|e| map_heed("LMDB iter next", e))?
{
None => break,
Some((k, v)) => out.push((k.to_vec(), v.to_vec())),
}
}
} else {
let mut iter = db
.prefix_iter(&rtxn, prefix)
.map_err(|e| map_heed("LMDB prefix_iter", e))?;
loop {
match iter
.next()
.transpose()
.map_err(|e| map_heed("LMDB prefix next", e))?
{
None => break,
Some((k, v)) => out.push((k.to_vec(), v.to_vec())),
}
}
}
}
}
Ok(out)
}
fn flush(&self) -> Result<(), StorageError> {
self.env
.force_sync()
.map_err(|e| map_heed("LMDB force_sync", e))
}
fn compact(&self, _cf: &str) -> Result<(), StorageError> {
Ok(())
}
}