pub(crate) mod v0;
pub(crate) mod v1;
use v0::DbV0;
use v1::DbV1;
use zaino_proto::proto::utils::PoolTypeFilter;
use crate::{
chain_index::{
finalised_state::capability::{
BlockCoreExt, BlockShieldedExt, BlockTransparentExt, CompactBlockExt, DbCore,
DbMetadata, DbRead, DbWrite, IndexedBlockExt, TransparentHistExt,
},
types::{db::metadata::FinalisedTxOutSetInfoAccumulator, TransactionHash},
},
config::BlockCacheConfig,
error::FinalisedStateError,
BlockHash, BlockHeaderData, CommitmentTreeData, CompactBlockStream, Height, IndexedBlock,
NamedAtomicStatus, OrchardCompactTx, OrchardTxList, Outpoint, SaplingCompactTx, SaplingTxList,
StatusType, TransparentCompactTx, TransparentTxList, TxLocation, TxOutCompact, TxidList,
};
#[cfg(feature = "transparent_address_history_experimental")]
use crate::AddrScript;
use async_trait::async_trait;
use lmdb::{Database, DatabaseFlags, Environment};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{
task::JoinHandle,
time::{interval, sleep, MissedTickBehavior},
};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use super::capability::Capability;
/// Shared lifecycle plumbing for LMDB-backed database implementations.
///
/// Implementors supply four accessors (environment, background-task slot,
/// cancellation token and status cell); every other method has a default
/// implementation built on top of those accessors.
#[async_trait]
pub(super) trait LmdbLifecycle: Sync {
    /// Returns the LMDB environment backing this database.
    fn env(&self) -> &Arc<Environment>;

    /// Returns the slot holding the background task's join handle, if one is stored.
    fn db_handler_slot(&self) -> &Mutex<Option<JoinHandle<()>>>;

    /// Returns the token that [`LmdbLifecycle::shutdown`] cancels.
    fn cancel_token(&self) -> &CancellationToken;

    /// Returns the cell holding the current [`StatusType`].
    fn status_atomic(&self) -> &NamedAtomicStatus;

    /// Loads the current status from [`LmdbLifecycle::status_atomic`].
    fn status(&self) -> StatusType {
        self.status_atomic().load()
    }

    /// Polls the status every 100 ms and returns once it equals `StatusType::Ready`.
    ///
    /// There is no timeout: this only returns when `Ready` is observed.
    async fn wait_until_ready(&self) {
        let mut ticker = interval(Duration::from_millis(100));
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            ticker.tick().await;
            if self.status_atomic().load() == StatusType::Ready {
                break;
            }
        }
    }

    /// Opens a read-only transaction on the environment and drops it immediately.
    ///
    /// NOTE(review): presumably this exists to release stale reader state in
    /// LMDB — confirm the intended effect.
    ///
    /// # Errors
    /// Returns an error if the read-only transaction cannot be started.
    async fn clean_trailing(&self) -> Result<(), FinalisedStateError> {
        let txn = self.env().begin_ro_txn()?;
        drop(txn);
        Ok(())
    }

    /// Waits for whichever happens first: a 5 s sleep, a `maintenance` tick, or
    /// cancellation of the token.
    ///
    /// On a maintenance tick [`LmdbLifecycle::clean_trailing`] is run and any
    /// failure is logged as a warning rather than propagated.
    async fn zaino_db_handler_sleep(&self, maintenance: &mut tokio::time::Interval) {
        tokio::select! {
            _ = sleep(Duration::from_secs(5)) => {},
            _ = maintenance.tick() => {
                if let Err(e) = self.clean_trailing().await {
                    warn!("clean_trailing failed: {}", e);
                }
            }
            _ = self.cancel_token().cancelled() => {},
        }
    }

    /// Stops the background task and flushes the environment.
    ///
    /// Sets the status to `StatusType::Closing`, cancels the token, then waits
    /// up to 5 s for the stored background task (if any) before aborting it.
    /// Afterwards it runs [`LmdbLifecycle::clean_trailing`] and syncs the
    /// environment; failures of either step are logged, not returned.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; clean-up failures are only logged.
    async fn shutdown(&self) -> Result<(), FinalisedStateError> {
        self.status_atomic().store(StatusType::Closing);
        self.cancel_token().cancel();

        // A poisoned mutex only tells us another thread panicked while holding
        // the slot. The `Option<JoinHandle>` inside is still usable, so recover
        // it instead of panicking and skipping the final sync below.
        let taken = self
            .db_handler_slot()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();

        if let Some(mut handle) = taken {
            let timeout = sleep(Duration::from_secs(5));
            tokio::pin!(timeout);
            tokio::select! {
                res = &mut handle => {
                    match res {
                        Ok(_) => {}
                        // A cancelled task is an expected outcome during shutdown.
                        Err(e) if e.is_cancelled() => {}
                        Err(e) => warn!("background task ended with error: {e:?}"),
                    }
                }
                _ = &mut timeout => {
                    warn!("background task didn't exit in time – aborting");
                    handle.abort();
                }
            }
        }

        // Best effort: report the failure (as the maintenance path does) but
        // keep shutting down.
        if let Err(e) = self.clean_trailing().await {
            warn!("clean_trailing during shutdown failed: {e}");
        }
        if let Err(e) = self.env().sync(true) {
            warn!("LMDB fsync before close failed: {e}");
        }
        Ok(())
    }
}
pub(super) async fn open_or_create_db(
env: &Environment,
name: &str,
flags: DatabaseFlags,
) -> Result<Database, FinalisedStateError> {
match env.open_db(Some(name)) {
Ok(db) => Ok(db),
Err(lmdb::Error::NotFound) => env
.create_db(Some(name), flags)
.map_err(FinalisedStateError::LmdbError),
Err(e) => Err(FinalisedStateError::LmdbError(e)),
}
}
/// Directory names associated with database versions; only `"v1"` is listed.
///
/// NOTE(review): there is no `"v0"` entry although a v0 backend exists —
/// presumably v0 does not live in a versioned sub-directory; confirm.
pub(super) const VERSION_DIRS: [&str; 1] = ["v1"];
/// Version-tagged database backend.
///
/// Wraps one concrete database implementation per database version so that
/// callers can hold a single type and dispatch on the variant.
#[derive(Debug)]
pub(crate) enum DbBackend {
    /// Backend implemented by [`DbV0`].
    V0(DbV0),
    /// Backend implemented by [`DbV1`].
    V1(DbV1),
}
impl DbBackend {
pub(crate) async fn spawn_v0(cfg: &BlockCacheConfig) -> Result<Self, FinalisedStateError> {
Ok(Self::V0(DbV0::spawn(cfg).await?))
}
pub(crate) async fn spawn_v1(cfg: &BlockCacheConfig) -> Result<Self, FinalisedStateError> {
Ok(Self::V1(DbV1::spawn(cfg).await?))
}
pub(crate) async fn wait_until_ready(&self) {
let mut ticker = interval(Duration::from_millis(100));
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if self.status() == StatusType::Ready {
break;
}
}
}
pub(crate) fn capability(&self) -> Capability {
match self {
Self::V0(_) => {
Capability::READ_CORE | Capability::WRITE_CORE | Capability::COMPACT_BLOCK_EXT
}
Self::V1(_) => Capability::LATEST,
}
}
pub(crate) fn env(&self) -> Arc<Environment> {
match self {
Self::V1(db) => Arc::clone(db.env()),
Self::V0(db) => Arc::clone(db.env()),
}
}
pub(crate) fn metadata_db(&self) -> Result<Database, FinalisedStateError> {
match self {
Self::V1(db) => Ok(db.metadata_db()),
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("v1 metadata db")),
}
}
pub(crate) fn spent_db(&self) -> Result<Database, FinalisedStateError> {
match self {
Self::V1(db) => Ok(db.spent_db()),
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("v1 spent db")),
}
}
pub(crate) fn txid_location_db(&self) -> Result<Database, FinalisedStateError> {
match self {
Self::V1(db) => Ok(db.txid_location_db()),
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
"v1 txid_location db",
)),
}
}
pub(crate) fn txids_db(&self) -> Result<Database, FinalisedStateError> {
match self {
Self::V1(db) => Ok(db.txids_db()),
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("v1 txids db")),
}
}
pub(crate) fn transparent_db(&self) -> Result<Database, FinalisedStateError> {
match self {
Self::V1(db) => Ok(db.transparent_db()),
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("v1 transparent db")),
}
}
pub(crate) fn tx_out_set_info_accumulator_db(&self) -> Result<Database, FinalisedStateError> {
match self {
Self::V1(database) => Ok(database.tx_out_set_info_accumulator_db()),
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
"v1 tx_out_set_info_accumulator db",
)),
}
}
pub(crate) async fn rebuild_tx_out_set_accumulator(&self) -> Result<(), FinalisedStateError> {
match self {
Self::V1(database) => database.rebuild_tx_out_set_accumulator().await,
Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
"v1 txout-set accumulator builder",
)),
}
}
}
impl From<DbV0> for DbBackend {
fn from(value: DbV0) -> Self {
Self::V0(value)
}
}
impl From<DbV1> for DbBackend {
fn from(value: DbV1) -> Self {
Self::V1(value)
}
}
/// Core lifecycle operations, forwarded to whichever backend is wrapped.
#[async_trait]
impl DbCore for DbBackend {
    /// Status reported by the wrapped database.
    fn status(&self) -> StatusType {
        // Called through `DbCore::` explicitly, as the wrapped types may expose
        // more than one `status` method.
        match self {
            Self::V1(v1) => DbCore::status(v1),
            Self::V0(v0) => DbCore::status(v0),
        }
    }

    /// Shuts the wrapped database down.
    async fn shutdown(&self) -> Result<(), FinalisedStateError> {
        match self {
            Self::V1(v1) => DbCore::shutdown(v1).await,
            Self::V0(v0) => DbCore::shutdown(v0).await,
        }
    }
}
/// Core read operations, supported by both backend versions.
#[async_trait]
impl DbRead for DbBackend {
    /// Forwards `db_height` to the wrapped database.
    async fn db_height(&self) -> Result<Option<Height>, FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.db_height().await,
            Self::V0(v0) => v0.db_height().await,
        }
    }

    /// Forwards the hash-to-height lookup to the wrapped database.
    async fn get_block_height(
        &self,
        hash: BlockHash,
    ) -> Result<Option<Height>, FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.get_block_height(hash).await,
            Self::V0(v0) => v0.get_block_height(hash).await,
        }
    }

    /// Forwards the height-to-hash lookup to the wrapped database.
    async fn get_block_hash(
        &self,
        height: Height,
    ) -> Result<Option<BlockHash>, FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.get_block_hash(height).await,
            Self::V0(v0) => v0.get_block_hash(height).await,
        }
    }

    /// Forwards `get_metadata` to the wrapped database.
    async fn get_metadata(&self) -> Result<DbMetadata, FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.get_metadata().await,
            Self::V0(v0) => v0.get_metadata().await,
        }
    }
}
/// Core write operations, supported by both backend versions.
#[async_trait]
impl DbWrite for DbBackend {
    /// Forwards `write_block` to the wrapped database.
    async fn write_block(&self, block: IndexedBlock) -> Result<(), FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.write_block(block).await,
            Self::V0(v0) => v0.write_block(block).await,
        }
    }

    /// Forwards `write_blocks_to_height` (with its block `source`) to the
    /// wrapped database.
    async fn write_blocks_to_height<S: crate::chain_index::source::BlockchainSource>(
        &self,
        height: Height,
        source: &S,
    ) -> Result<(), FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.write_blocks_to_height(height, source).await,
            Self::V0(v0) => v0.write_blocks_to_height(height, source).await,
        }
    }

    /// Forwards `delete_block_at_height` to the wrapped database.
    async fn delete_block_at_height(&self, height: Height) -> Result<(), FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.delete_block_at_height(height).await,
            Self::V0(v0) => v0.delete_block_at_height(height).await,
        }
    }

    /// Forwards `delete_block` to the wrapped database.
    async fn delete_block(&self, block: &IndexedBlock) -> Result<(), FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.delete_block(block).await,
            Self::V0(v0) => v0.delete_block(block).await,
        }
    }

    /// Forwards `update_metadata` to the wrapped database.
    async fn update_metadata(&self, metadata: DbMetadata) -> Result<(), FinalisedStateError> {
        match self {
            Self::V1(v1) => v1.update_metadata(metadata).await,
            Self::V0(v0) => v0.update_metadata(metadata).await,
        }
    }
}
#[async_trait]
impl BlockCoreExt for DbBackend {
async fn get_block_header(
&self,
height: Height,
) -> Result<BlockHeaderData, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_header(height).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_core")),
}
}
async fn get_block_range_headers(
&self,
start: Height,
end: Height,
) -> Result<Vec<BlockHeaderData>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_range_headers(start, end).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_core")),
}
}
async fn get_block_txids(&self, height: Height) -> Result<TxidList, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_txids(height).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_core")),
}
}
async fn get_block_range_txids(
&self,
start: Height,
end: Height,
) -> Result<Vec<TxidList>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_range_txids(start, end).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_core")),
}
}
async fn get_txid(
&self,
tx_location: TxLocation,
) -> Result<TransactionHash, FinalisedStateError> {
match self {
Self::V1(db) => db.get_txid(tx_location).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_core")),
}
}
async fn get_tx_location(
&self,
txid: &TransactionHash,
) -> Result<Option<TxLocation>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_tx_location(txid).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_core")),
}
}
}
/// Transparent block-data reads; only the v1 backend implements them.
///
/// Each method forwards to [`DbV1`] and returns
/// `FinalisedStateError::FeatureUnavailable("block_transparent")` for v0. The
/// v0 arm is explicit (no `_` wildcard) so a new backend variant cannot be
/// silently routed to the error path.
#[async_trait]
impl BlockTransparentExt for DbBackend {
    /// Forwards `get_transparent` to v1.
    async fn get_transparent(
        &self,
        tx_location: TxLocation,
    ) -> Result<Option<TransparentCompactTx>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.get_transparent(tx_location).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("block_transparent")),
        }
    }

    /// Forwards `get_block_transparent` to v1.
    async fn get_block_transparent(
        &self,
        height: Height,
    ) -> Result<TransparentTxList, FinalisedStateError> {
        match self {
            Self::V1(db) => db.get_block_transparent(height).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("block_transparent")),
        }
    }

    /// Forwards `get_block_range_transparent` to v1.
    async fn get_block_range_transparent(
        &self,
        start: Height,
        end: Height,
    ) -> Result<Vec<TransparentTxList>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.get_block_range_transparent(start, end).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("block_transparent")),
        }
    }

    /// Forwards `get_previous_output` to v1.
    async fn get_previous_output(
        &self,
        outpoint: Outpoint,
    ) -> Result<TxOutCompact, FinalisedStateError> {
        match self {
            // Fully qualified on purpose: keeps the call pinned to the
            // `BlockTransparentExt` implementation on `DbV1`.
            Self::V1(db) => <DbV1 as BlockTransparentExt>::get_previous_output(db, outpoint).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("block_transparent")),
        }
    }
}
#[async_trait]
impl BlockShieldedExt for DbBackend {
async fn get_sapling(
&self,
tx_location: TxLocation,
) -> Result<Option<SaplingCompactTx>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_sapling(tx_location).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_block_sapling(&self, h: Height) -> Result<SaplingTxList, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_sapling(h).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_block_range_sapling(
&self,
start: Height,
end: Height,
) -> Result<Vec<SaplingTxList>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_range_sapling(start, end).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_orchard(
&self,
tx_location: TxLocation,
) -> Result<Option<OrchardCompactTx>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_orchard(tx_location).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_block_orchard(&self, h: Height) -> Result<OrchardTxList, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_orchard(h).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_block_range_orchard(
&self,
start: Height,
end: Height,
) -> Result<Vec<OrchardTxList>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_range_orchard(start, end).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_block_commitment_tree_data(
&self,
height: Height,
) -> Result<CommitmentTreeData, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_commitment_tree_data(height).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
async fn get_block_range_commitment_tree_data(
&self,
start: Height,
end: Height,
) -> Result<Vec<CommitmentTreeData>, FinalisedStateError> {
match self {
Self::V1(db) => db.get_block_range_commitment_tree_data(start, end).await,
_ => Err(FinalisedStateError::FeatureUnavailable("block_shielded")),
}
}
}
/// Compact-block reads, supported by both backend versions.
///
/// Both variants are matched explicitly, so no fallback arm is needed; the
/// matches stay exhaustive and will fail to compile if a variant is added
/// without being handled here.
#[async_trait]
impl CompactBlockExt for DbBackend {
    /// Forwards `get_compact_block` (with the pool-type filter) to the wrapped
    /// database.
    async fn get_compact_block(
        &self,
        height: Height,
        pool_types: PoolTypeFilter,
    ) -> Result<zaino_proto::proto::compact_formats::CompactBlock, FinalisedStateError> {
        match self {
            Self::V0(db) => db.get_compact_block(height, pool_types).await,
            Self::V1(db) => db.get_compact_block(height, pool_types).await,
        }
    }

    /// Forwards `get_compact_block_stream` for `start_height..end_height`
    /// (bounds are interpreted by the wrapped database) to the wrapped database.
    async fn get_compact_block_stream(
        &self,
        start_height: Height,
        end_height: Height,
        pool_types: PoolTypeFilter,
    ) -> Result<CompactBlockStream, FinalisedStateError> {
        match self {
            Self::V0(db) => {
                db.get_compact_block_stream(start_height, end_height, pool_types)
                    .await
            }
            Self::V1(db) => {
                db.get_compact_block_stream(start_height, end_height, pool_types)
                    .await
            }
        }
    }
}
/// Indexed-block reads; only the v1 backend implements them.
#[async_trait]
impl IndexedBlockExt for DbBackend {
    /// Forwards `get_chain_block` to v1; v0 returns
    /// `FinalisedStateError::FeatureUnavailable("chain_block")`.
    ///
    /// The v0 arm is explicit (no `_` wildcard) so a new backend variant must
    /// be handled here deliberately.
    async fn get_chain_block(
        &self,
        height: Height,
    ) -> Result<Option<IndexedBlock>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.get_chain_block(height).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable("chain_block")),
        }
    }
}
/// Transparent-history reads; only the v1 backend implements them.
///
/// Each method forwards to [`DbV1`] and returns
/// `FinalisedStateError::FeatureUnavailable("transparent_history")` for v0.
/// The address-based methods exist only when the
/// `transparent_address_history_experimental` feature is enabled. The v0 arm
/// is explicit (no `_` wildcard) so the compiler flags these matches when a
/// backend variant is added.
#[async_trait]
impl TransparentHistExt for DbBackend {
    /// Forwards `addr_records` to v1.
    #[cfg(feature = "transparent_address_history_experimental")]
    async fn addr_records(
        &self,
        script: AddrScript,
    ) -> Result<Option<Vec<crate::chain_index::types::AddrEventBytes>>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.addr_records(script).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `addr_and_index_records` to v1.
    #[cfg(feature = "transparent_address_history_experimental")]
    async fn addr_and_index_records(
        &self,
        script: AddrScript,
        tx_location: TxLocation,
    ) -> Result<Option<Vec<crate::chain_index::types::AddrEventBytes>>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.addr_and_index_records(script, tx_location).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `addr_tx_locations_by_range` to v1.
    #[cfg(feature = "transparent_address_history_experimental")]
    async fn addr_tx_locations_by_range(
        &self,
        script: AddrScript,
        start: Height,
        end: Height,
    ) -> Result<Option<Vec<TxLocation>>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.addr_tx_locations_by_range(script, start, end).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `addr_utxos_by_range` to v1.
    #[cfg(feature = "transparent_address_history_experimental")]
    async fn addr_utxos_by_range(
        &self,
        script: AddrScript,
        start: Height,
        end: Height,
    ) -> Result<Option<Vec<(TxLocation, u16, u64)>>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.addr_utxos_by_range(script, start, end).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `addr_balance_by_range` to v1.
    #[cfg(feature = "transparent_address_history_experimental")]
    async fn addr_balance_by_range(
        &self,
        script: AddrScript,
        start: Height,
        end: Height,
    ) -> Result<i64, FinalisedStateError> {
        match self {
            Self::V1(db) => db.addr_balance_by_range(script, start, end).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `get_outpoint_spender` to v1.
    async fn get_outpoint_spender(
        &self,
        outpoint: Outpoint,
    ) -> Result<Option<TxLocation>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.get_outpoint_spender(outpoint).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `get_outpoint_spenders` to v1.
    async fn get_outpoint_spenders(
        &self,
        outpoints: Vec<Outpoint>,
    ) -> Result<Vec<Option<TxLocation>>, FinalisedStateError> {
        match self {
            Self::V1(db) => db.get_outpoint_spenders(outpoints).await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }

    /// Forwards `get_tx_out_set_info_accumulator` to v1.
    async fn get_tx_out_set_info_accumulator(
        &self,
    ) -> Result<FinalisedTxOutSetInfoAccumulator, FinalisedStateError> {
        match self {
            Self::V1(database) => database.get_tx_out_set_info_accumulator().await,
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "transparent_history",
            )),
        }
    }
}
/// Test-only helpers; all of them target the v1 backend.
#[cfg(test)]
impl DbBackend {
    /// Spawns a v1 backend through `DbV1::spawn_v1_0_0`.
    pub(crate) async fn spawn_v1_0_0(cfg: &BlockCacheConfig) -> Result<Self, FinalisedStateError> {
        let backend = DbV1::spawn_v1_0_0(cfg).await?;
        Ok(Self::V1(backend))
    }

    /// Validated tip height reported by v1; v0 always yields `0`.
    pub(crate) fn validated_tip_height(&self) -> u32 {
        match self {
            Self::V0(_) => 0,
            Self::V1(v1) => v1.validated_tip_height(),
        }
    }

    /// Forwards `read_tx_out_set_accumulator_built_height` to v1; unavailable
    /// on v0.
    pub(crate) async fn read_tx_out_set_accumulator_built_height(
        &self,
    ) -> Result<Option<Height>, FinalisedStateError> {
        match self {
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "v1 txout-set accumulator builder",
            )),
            Self::V1(v1) => v1.read_tx_out_set_accumulator_built_height().await,
        }
    }

    /// Forwards `build_tx_out_set_accumulator_blocking` to v1; unavailable on
    /// v0.
    pub(crate) fn build_tx_out_set_accumulator_blocking(
        &self,
        db_tip: Height,
        shards: u16,
    ) -> Result<FinalisedTxOutSetInfoAccumulator, FinalisedStateError> {
        match self {
            Self::V0(_) => Err(FinalisedStateError::FeatureUnavailable(
                "v1 txout-set accumulator builder",
            )),
            Self::V1(v1) => v1.build_tx_out_set_accumulator_blocking(db_tip, shards),
        }
    }

    /// Writes `block` through the v1.0.0 fixture writer; a v0 backend is
    /// rejected with a `Custom` error.
    pub(crate) async fn write_block_v1_0_0(
        &self,
        block: IndexedBlock,
    ) -> Result<(), FinalisedStateError> {
        match self {
            Self::V0(_) => Err(FinalisedStateError::Custom(
                "v1.0.0 test fixture writer requires a v1 backend".to_string(),
            )),
            Self::V1(v1) => v1.write_block_v1_0_0(block).await,
        }
    }
}
/// Tests for the default `LmdbLifecycle::shutdown` implementation.
#[cfg(test)]
mod shutdown {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::{sync::Barrier, time::timeout};

    /// Minimal `LmdbLifecycle` implementor that only provides the accessors
    /// and relies on the trait's default methods.
    struct FakeDb {
        env: Arc<Environment>,
        db_handler: Mutex<Option<JoinHandle<()>>>,
        cancel_token: CancellationToken,
        status: NamedAtomicStatus,
    }

    impl LmdbLifecycle for FakeDb {
        fn env(&self) -> &Arc<Environment> {
            &self.env
        }

        fn db_handler_slot(&self) -> &Mutex<Option<JoinHandle<()>>> {
            &self.db_handler
        }

        fn cancel_token(&self) -> &CancellationToken {
            &self.cancel_token
        }

        fn status_atomic(&self) -> &NamedAtomicStatus {
            &self.status
        }
    }

    /// Every task awaiting the cancellation token must be woken by `shutdown`.
    #[tokio::test]
    async fn wakes_every_shutdown_waiter() {
        const N: usize = 3;

        // Kept alive until the end of the test so the environment's directory
        // is not removed early.
        let dir = tempfile::tempdir().unwrap();
        let environment = lmdb::Environment::new()
            .set_map_size(1 << 20)
            .open(dir.path())
            .unwrap();

        let db = Arc::new(FakeDb {
            env: Arc::new(environment),
            db_handler: Mutex::new(None),
            cancel_token: CancellationToken::new(),
            status: NamedAtomicStatus::new("test", StatusType::Ready),
        });

        let woke = Arc::new(AtomicUsize::new(0));
        // N waiters plus this test task rendezvous before shutdown starts.
        let barrier = Arc::new(Barrier::new(N + 1));

        let tasks: Vec<_> = (0..N)
            .map(|_| {
                let token = db.cancel_token.clone();
                let counter = Arc::clone(&woke);
                let gate = Arc::clone(&barrier);
                tokio::spawn(async move {
                    gate.wait().await;
                    token.cancelled().await;
                    counter.fetch_add(1, Ordering::Relaxed);
                })
            })
            .collect();

        barrier.wait().await;
        LmdbLifecycle::shutdown(db.as_ref()).await.unwrap();

        for (i, task) in tasks.into_iter().enumerate() {
            let joined = timeout(Duration::from_millis(200), task).await;
            joined
                .unwrap_or_else(|_| panic!("waiter {i} stranded: cancel_token woke only a subset"))
                .unwrap();
        }

        assert_eq!(woke.load(Ordering::Relaxed), N);
    }
}