use crate::{
chain_index::{
finalised_state::capability::{
CompactBlockExt, DbCore, DbMetadata, DbRead, DbVersion, DbWrite,
},
types::GENESIS_HEIGHT,
},
config::BlockCacheConfig,
error::FinalisedStateError,
status::{NamedAtomicStatus, StatusType},
CompactBlockStream, Height, IndexedBlock,
};
use zaino_proto::proto::{
compact_formats::CompactBlock,
service::PoolType,
utils::{compact_block_with_pool_types, PoolTypeFilter},
};
use zebra_chain::{
block::{Hash as ZebraHash, Height as ZebraHeight},
parameters::NetworkKind,
};
use super::LmdbLifecycle;
use async_trait::async_trait;
use lmdb::{Cursor, Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction};
use prost::Message;
use serde::{Deserialize, Serialize};
use std::{fs, sync::Arc, time::Duration};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::info;
#[async_trait]
impl DbRead for DbV0 {
async fn db_height(&self) -> Result<Option<crate::Height>, FinalisedStateError> {
self.tip_height().await
}
async fn get_block_height(
&self,
hash: crate::BlockHash,
) -> Result<Option<Height>, FinalisedStateError> {
match self.get_block_height_by_hash(hash).await {
Ok(height) => Ok(Some(height)),
Err(
FinalisedStateError::DataUnavailable(_)
| FinalisedStateError::FeatureUnavailable(_),
) => Ok(None),
Err(other) => Err(other),
}
}
async fn get_block_hash(
&self,
height: crate::Height,
) -> Result<Option<crate::BlockHash>, FinalisedStateError> {
match self.get_block_hash_by_height(height).await {
Ok(hash) => Ok(Some(hash)),
Err(
FinalisedStateError::DataUnavailable(_)
| FinalisedStateError::FeatureUnavailable(_),
) => Ok(None),
Err(other) => Err(other),
}
}
async fn get_metadata(&self) -> Result<DbMetadata, FinalisedStateError> {
self.get_metadata().await
}
}
#[async_trait]
impl DbWrite for DbV0 {
async fn write_block(&self, block: IndexedBlock) -> Result<(), FinalisedStateError> {
self.write_block(block).await
}
async fn delete_block_at_height(
&self,
height: crate::Height,
) -> Result<(), FinalisedStateError> {
self.delete_block_at_height(height).await
}
async fn delete_block(&self, block: &IndexedBlock) -> Result<(), FinalisedStateError> {
self.delete_block(block).await
}
async fn update_metadata(&self, _metadata: DbMetadata) -> Result<(), FinalisedStateError> {
Ok(())
}
}
#[async_trait]
impl DbCore for DbV0 {
fn status(&self) -> StatusType {
LmdbLifecycle::status(self)
}
async fn shutdown(&self) -> Result<(), FinalisedStateError> {
LmdbLifecycle::shutdown(self).await
}
}
/// Gives the `LmdbLifecycle` trait access to the fields of `DbV0` it needs.
///
/// Each accessor returns a reference to the corresponding struct field.
impl LmdbLifecycle for DbV0 {
/// Returns the shared LMDB environment handle.
fn env(&self) -> &Arc<Environment> {
&self.env
}
/// Returns the mutex-guarded slot that holds the background task's join handle, if any.
fn db_handler_slot(&self) -> &std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> {
&self.db_handler
}
/// Returns the cancellation token stored on this instance.
fn cancel_token(&self) -> &CancellationToken {
&self.cancel_token
}
/// Returns the named atomic status of this database.
fn status_atomic(&self) -> &NamedAtomicStatus {
&self.status
}
}
#[async_trait]
impl CompactBlockExt for DbV0 {
async fn get_compact_block(
&self,
height: Height,
pool_types: PoolTypeFilter,
) -> Result<zaino_proto::proto::compact_formats::CompactBlock, FinalisedStateError> {
self.get_compact_block(height, pool_types).await
}
async fn get_compact_block_stream(
&self,
start_height: Height,
end_height: Height,
pool_types: PoolTypeFilter,
) -> Result<CompactBlockStream, FinalisedStateError> {
self.get_compact_block_stream(start_height, end_height, pool_types)
.await
}
}
/// Version 0 of the finalised-state database, backed by an LMDB environment with two tables.
///
/// Blocks are indexed by height in `heights_to_hashes` and stored by hash in
/// `hashes_to_blocks`.
#[derive(Debug)]
pub struct DbV0 {
// Shared LMDB environment; cloned (by `Arc`) into blocking tasks that touch the database.
env: Arc<Environment>,
// Table keyed by the 4-byte big-endian block height; the value is the JSON-encoded `DbHash`.
heights_to_hashes: Database,
// Table keyed by the JSON-encoded `DbHash`; the value is the JSON-encoded `DbCompactBlock`.
hashes_to_blocks: Database,
// Join handle of the background task started by `spawn_handler`, `None` until it is started.
db_handler: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
// Cancellation token exposed through `LmdbLifecycle::cancel_token`.
cancel_token: CancellationToken,
// Status shared between this handle and the background task.
status: NamedAtomicStatus,
// Copy of the configuration the database was spawned with.
config: BlockCacheConfig,
}
impl DbV0 {
/// Opens (creating it if necessary) the LMDB environment described by `config` and starts
/// the background handler task.
///
/// The environment lives in a per-network sub-directory of the configured database path
/// (`live`, `test` or `local`). The reader-slot limit is derived from the number of
/// available CPUs (32 per CPU, clamped to `512..=4096`, assuming 4 CPUs when the count is
/// unavailable).
///
/// # Errors
/// Fails when the directory cannot be created, the environment or either table cannot be
/// opened, or the handler cannot be started.
pub(crate) async fn spawn(config: &BlockCacheConfig) -> Result<Self, FinalisedStateError> {
    info!("Launching ZainoDB");

    let map_size = config.storage.database.size.to_byte_count();

    // One sub-directory per network kind keeps the data sets apart.
    let network_dir = match config.network.to_zebra_network().kind() {
        NetworkKind::Mainnet => "live",
        NetworkKind::Testnet => "test",
        NetworkKind::Regtest => "local",
    };
    let db_path = config.storage.database.path.join(network_dir);
    if !db_path.exists() {
        fs::create_dir_all(&db_path)?;
    }

    let cpu_cnt = std::thread::available_parallelism().map_or(4, std::num::NonZeroUsize::get);
    let max_readers = u32::try_from((cpu_cnt * 32).clamp(512, 4096))
        .expect("max_readers was clamped to fit in u32");

    let mut builder = Environment::new();
    builder
        .set_max_dbs(12)
        .set_map_size(map_size)
        .set_max_readers(max_readers)
        .set_flags(EnvironmentFlags::NO_TLS | EnvironmentFlags::NO_READAHEAD);
    let env = builder.open(&db_path)?;

    let heights_to_hashes =
        super::open_or_create_db(&env, "heights_to_hashes", DatabaseFlags::empty()).await?;
    let hashes_to_blocks =
        super::open_or_create_db(&env, "hashes_to_blocks", DatabaseFlags::empty()).await?;

    let mut db = Self {
        env: Arc::new(env),
        heights_to_hashes,
        hashes_to_blocks,
        db_handler: std::sync::Mutex::new(None),
        cancel_token: CancellationToken::new(),
        status: NamedAtomicStatus::new("ZainoDB", StatusType::Spawning),
        config: config.clone(),
    };
    db.spawn_handler().await?;
    Ok(db)
}
/// Starts the background handler task and records its join handle in `db_handler`.
///
/// The task works on its own handle to the same environment, tables, cancel token and
/// status. It marks the database `Ready`, then repeatedly calls `zaino_db_handler_sleep`
/// with a 60-second interval until the status is observed to be `Closing`.
async fn spawn_handler(&mut self) -> Result<(), FinalisedStateError> {
    // Second handle onto the shared state, owned by the background task.
    let worker = Self {
        env: Arc::clone(&self.env),
        heights_to_hashes: self.heights_to_hashes,
        hashes_to_blocks: self.hashes_to_blocks,
        db_handler: std::sync::Mutex::new(None),
        cancel_token: self.cancel_token.clone(),
        status: self.status.clone(),
        config: self.config.clone(),
    };

    let task = tokio::spawn(async move {
        worker.status.store(StatusType::Ready);
        let mut maintenance = interval(Duration::from_secs(60));
        while worker.status.load() != StatusType::Closing {
            worker.zaino_db_handler_sleep(&mut maintenance).await;
        }
    });

    *self.db_handler.lock().expect("db_handler mutex poisoned") = Some(task);
    Ok(())
}
pub(crate) async fn write_block(&self, block: IndexedBlock) -> Result<(), FinalisedStateError> {
self.status.store(StatusType::Syncing);
let compact_block: CompactBlock = block.to_compact_block();
let zebra_height: ZebraHeight = block.context.height().into();
let zebra_hash: ZebraHash = zebra_chain::block::Hash::from(*block.context.hash());
let height_key = DbHeight(zebra_height).to_be_bytes();
let hash_key = serde_json::to_vec(&DbHash(zebra_hash))?;
let block_value = serde_json::to_vec(&DbCompactBlock(compact_block))?;
let block_height = block.context.height().0;
tokio::task::block_in_place(|| {
let ro = self.env.begin_ro_txn()?;
let cur = ro.open_ro_cursor(self.heights_to_hashes)?;
match cur.get(None, None, lmdb_sys::MDB_LAST) {
Ok((last_height_bytes, _last_hash_bytes)) => {
let block_height = block.context.height().0;
let last_height = DbHeight::from_be_bytes(
last_height_bytes.expect("Height is always some in the finalised state"),
)?
.0
.0;
if block_height != last_height + 1 {
return Err(FinalisedStateError::Custom(format!(
"cannot write block at height {block_height:?}; \
current tip is {last_height:?}"
)));
}
}
Err(lmdb::Error::NotFound) => {
if block_height != GENESIS_HEIGHT.0 {
return Err(FinalisedStateError::Custom(format!(
"first block must be height 0, got {block_height:?}"
)));
}
}
Err(e) => return Err(FinalisedStateError::LmdbError(e)),
}
Ok::<_, FinalisedStateError>(())
})?;
let zaino_db = Self {
env: Arc::clone(&self.env),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
db_handler: std::sync::Mutex::new(None),
cancel_token: self.cancel_token.clone(),
status: self.status.clone(),
config: self.config.clone(),
};
let post_result = tokio::task::spawn_blocking(move || {
let mut txn = zaino_db.env.begin_rw_txn()?;
txn.put(
zaino_db.heights_to_hashes,
&height_key,
&hash_key,
lmdb::WriteFlags::NO_OVERWRITE,
)?;
txn.put(
zaino_db.hashes_to_blocks,
&hash_key,
&block_value,
lmdb::WriteFlags::NO_OVERWRITE,
)?;
txn.commit()?;
Ok::<_, FinalisedStateError>(())
})
.await
.map_err(|e| FinalisedStateError::Custom(format!("Tokio task error: {e}")))?;
match post_result {
Ok(_) => {
tokio::task::block_in_place(|| self.env.sync(true))
.map_err(|e| FinalisedStateError::Custom(format!("LMDB sync failed: {e}")))?;
self.status.store(StatusType::Ready);
Ok(())
}
Err(e) => {
let _ = self.delete_block(&block).await;
tokio::task::block_in_place(|| self.env.sync(true))
.map_err(|e| FinalisedStateError::Custom(format!("LMDB sync failed: {e}")))?;
self.status.store(StatusType::RecoverableError);
Err(FinalisedStateError::InvalidBlock {
height: block_height,
hash: *block.context.hash(),
reason: e.to_string(),
})
}
}
}
pub(crate) async fn delete_block_at_height(
&self,
height: crate::Height,
) -> Result<(), FinalisedStateError> {
let block_height = height.0;
let height_key = DbHeight(zebra_chain::block::Height(block_height)).to_be_bytes();
let zebra_block_hash: zebra_chain::block::Hash = tokio::task::block_in_place(|| {
let ro = self.env.begin_ro_txn()?;
let cur = ro.open_ro_cursor(self.heights_to_hashes)?;
match cur.get(None, None, lmdb_sys::MDB_LAST) {
Ok((last_height_bytes, last_hash_bytes)) => {
let last_height = DbHeight::from_be_bytes(
last_height_bytes.expect("Height is always some in the finalised state"),
)?
.0
.0;
if block_height != last_height {
return Err(FinalisedStateError::Custom(format!(
"cannot delete block at height {block_height:?}; \
current tip is {last_height:?}"
)));
}
let db_hash: DbHash = serde_json::from_slice(last_hash_bytes)?;
Ok(db_hash.0)
}
Err(lmdb::Error::NotFound) => Err(FinalisedStateError::Custom(format!(
"first block must be height 1, got {block_height:?}"
))),
Err(e) => Err(FinalisedStateError::LmdbError(e)),
}
})?;
let hash_key = serde_json::to_vec(&DbHash(zebra_block_hash))?;
let zaino_db = Self {
env: Arc::clone(&self.env),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
db_handler: std::sync::Mutex::new(None),
cancel_token: self.cancel_token.clone(),
status: self.status.clone(),
config: self.config.clone(),
};
tokio::task::block_in_place(|| {
let mut txn = zaino_db.env.begin_rw_txn()?;
txn.del(zaino_db.heights_to_hashes, &height_key, None)?;
txn.del(zaino_db.hashes_to_blocks, &hash_key, None)?;
let _ = txn.commit();
self.env
.sync(true)
.map_err(|e| FinalisedStateError::Custom(format!("LMDB sync failed: {e}")))?;
Ok::<_, FinalisedStateError>(())
})?;
Ok(())
}
pub(crate) async fn delete_block(
&self,
block: &IndexedBlock,
) -> Result<(), FinalisedStateError> {
let zebra_height: ZebraHeight = block.context.height().into();
let zebra_hash: ZebraHash = zebra_chain::block::Hash::from(*block.context.hash());
let height_key = DbHeight(zebra_height).to_be_bytes();
let hash_key = serde_json::to_vec(&DbHash(zebra_hash))?;
let zaino_db = Self {
env: Arc::clone(&self.env),
heights_to_hashes: self.heights_to_hashes,
hashes_to_blocks: self.hashes_to_blocks,
db_handler: std::sync::Mutex::new(None),
cancel_token: self.cancel_token.clone(),
status: self.status.clone(),
config: self.config.clone(),
};
tokio::task::spawn_blocking(move || {
let mut txn = zaino_db.env.begin_rw_txn()?;
txn.del(zaino_db.heights_to_hashes, &height_key, None)?;
txn.del(zaino_db.hashes_to_blocks, &hash_key, None)?;
let _ = txn.commit();
zaino_db
.env
.sync(true)
.map_err(|e| FinalisedStateError::Custom(format!("LMDB sync failed: {e}")))?;
Ok::<_, FinalisedStateError>(())
})
.await
.map_err(|e| FinalisedStateError::Custom(format!("Tokio task error: {e}")))??;
Ok(())
}
pub(crate) async fn tip_height(&self) -> Result<Option<crate::Height>, FinalisedStateError> {
tokio::task::block_in_place(|| {
let ro = self.env.begin_ro_txn()?;
let cur = ro.open_ro_cursor(self.heights_to_hashes)?;
match cur.get(None, None, lmdb_sys::MDB_LAST) {
Ok((height_bytes, _hash_bytes)) => {
let tip_height = crate::Height(
DbHeight::from_be_bytes(
height_bytes.expect("Height is always some in the finalised state"),
)?
.0
.0,
);
Ok(Some(tip_height))
}
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(FinalisedStateError::LmdbError(e)),
}
})
}
async fn get_block_height_by_hash(
&self,
hash: crate::BlockHash,
) -> Result<crate::Height, FinalisedStateError> {
let zebra_hash: ZebraHash = zebra_chain::block::Hash::from(hash);
let hash_key = serde_json::to_vec(&DbHash(zebra_hash))?;
tokio::task::block_in_place(|| {
let txn = self.env.begin_ro_txn()?;
let block_bytes: &[u8] = txn.get(self.hashes_to_blocks, &hash_key)?;
let block: DbCompactBlock = serde_json::from_slice(block_bytes)?;
let block_height = block.0.height as u32;
Ok(crate::Height(block_height))
})
}
async fn get_block_hash_by_height(
&self,
height: crate::Height,
) -> Result<crate::BlockHash, FinalisedStateError> {
let zebra_height: ZebraHeight = height.into();
let height_key = DbHeight(zebra_height).to_be_bytes();
tokio::task::block_in_place(|| {
let txn = self.env.begin_ro_txn()?;
let hash_bytes: &[u8] = txn.get(self.heights_to_hashes, &height_key)?;
let db_hash: DbHash = serde_json::from_slice(hash_bytes)?;
Ok(crate::BlockHash::from(db_hash.0))
})
}
/// Returns the constant metadata of the v0 database: version `0.0.0`, an all-zero schema
/// hash and a `Complete` migration status. Nothing is read from storage.
async fn get_metadata(&self) -> Result<DbMetadata, FinalisedStateError> {
    use crate::chain_index::finalised_state::capability::MigrationStatus;

    let version = DbVersion {
        major: 0,
        minor: 0,
        patch: 0,
    };
    Ok(DbMetadata {
        version,
        schema_hash: [0u8; 32],
        migration_status: MigrationStatus::Complete,
    })
}
/// Loads the compact block stored at `height` and filters it by `pool_types`.
///
/// The height is first resolved to a hash, which is then used as the key into
/// `hashes_to_blocks`.
///
/// # Errors
/// Propagates errors from the height lookup, LMDB errors from the block lookup, and
/// serialisation errors from encoding the key or decoding the stored block.
async fn get_compact_block(
    &self,
    height: crate::Height,
    pool_types: PoolTypeFilter,
) -> Result<zaino_proto::proto::compact_formats::CompactBlock, FinalisedStateError> {
    let block_hash = self.get_block_hash_by_height(height).await?;
    let hash_key = serde_json::to_vec(&DbHash(zebra_chain::block::Hash::from(block_hash)))?;

    tokio::task::block_in_place(|| {
        let txn = self.env.begin_ro_txn()?;
        let stored: &[u8] = txn.get(self.hashes_to_blocks, &hash_key)?;
        let decoded = serde_json::from_slice::<DbCompactBlock>(stored)?;
        Ok(compact_block_with_pool_types(
            decoded.0,
            &pool_types.to_pool_types_vector(),
        ))
    })
}
async fn get_compact_block_stream(
&self,
start_height: Height,
end_height: Height,
pool_types: PoolTypeFilter,
) -> Result<CompactBlockStream, FinalisedStateError> {
let is_ascending: bool = start_height <= end_height;
let (sender, receiver) =
tokio::sync::mpsc::channel::<Result<CompactBlock, tonic::Status>>(128);
let env = self.env.clone();
let heights_to_hashes_database: lmdb::Database = self.heights_to_hashes;
let hashes_to_blocks_database: lmdb::Database = self.hashes_to_blocks;
let pool_types_vector: Vec<PoolType> = pool_types.to_pool_types_vector();
tokio::task::spawn_blocking(move || {
fn lmdb_get_status(
database_name: &'static str,
height: Height,
error: lmdb::Error,
) -> tonic::Status {
match error {
lmdb::Error::NotFound => tonic::Status::not_found(format!(
"missing db entry in {database_name} at height {}",
height.0
)),
other_error => tonic::Status::internal(format!(
"lmdb get({database_name}) failed at height {}: {other_error}",
height.0
)),
}
}
let mut current_height: Height = start_height;
loop {
let result: Result<CompactBlock, tonic::Status> = (|| {
let txn = env.begin_ro_txn().map_err(|error| {
tonic::Status::internal(format!("lmdb begin_ro_txn failed: {error}"))
})?;
let zebra_height: ZebraHeight = current_height.into();
let height_key: [u8; 4] = DbHeight(zebra_height).to_be_bytes();
let hash_bytes: &[u8] = txn
.get(heights_to_hashes_database, &height_key)
.map_err(|error| {
lmdb_get_status("heights_to_hashes", current_height, error)
})?;
let db_hash: DbHash = serde_json::from_slice(hash_bytes).map_err(|error| {
tonic::Status::internal(format!(
"height->hash decode failed at height {}: {error}",
current_height.0
))
})?;
let hash_key: Vec<u8> =
serde_json::to_vec(&DbHash(db_hash.0)).map_err(|error| {
tonic::Status::internal(format!(
"hash key encode failed at height {}: {error}",
current_height.0
))
})?;
let block_bytes: &[u8] = txn
.get(hashes_to_blocks_database, &hash_key)
.map_err(|error| {
lmdb_get_status("hashes_to_blocks", current_height, error)
})?;
let db_compact_block: DbCompactBlock = serde_json::from_slice(block_bytes)
.map_err(|error| {
tonic::Status::internal(format!(
"block decode failed at height {}: {error}",
current_height.0
))
})?;
Ok(compact_block_with_pool_types(
db_compact_block.0,
&pool_types_vector,
))
})();
if sender.blocking_send(result).is_err() {
return;
}
if current_height == end_height {
return;
}
if is_ascending {
let next_value = match current_height.0.checked_add(1) {
Some(value) => value,
None => {
let _ = sender.blocking_send(Err(tonic::Status::internal(
"height overflow while iterating ascending".to_string(),
)));
return;
}
};
current_height = Height(next_value);
} else {
let next_value = match current_height.0.checked_sub(1) {
Some(value) => value,
None => {
let _ = sender.blocking_send(Err(tonic::Status::internal(
"height underflow while iterating descending".to_string(),
)));
return;
}
};
current_height = Height(next_value);
}
}
});
Ok(CompactBlockStream::new(receiver))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct DbHeight(pub ZebraHeight);
impl DbHeight {
fn to_be_bytes(self) -> [u8; 4] {
self.0 .0.to_be_bytes()
}
fn from_be_bytes(bytes: &[u8]) -> Result<Self, FinalisedStateError> {
let arr: [u8; 4] = bytes
.try_into()
.map_err(|_| FinalisedStateError::Custom("Invalid height key length".to_string()))?;
Ok(DbHeight(ZebraHeight(u32::from_be_bytes(arr))))
}
}
/// Newtype around a zebra block hash, used for database (de)serialisation.
///
/// Its `serde_json` encoding is stored as the value in `heights_to_hashes` and used as the
/// key in `hashes_to_blocks`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct DbHash(pub ZebraHash);
#[derive(Debug, Clone, PartialEq)]
struct DbCompactBlock(pub CompactBlock);
impl Serialize for DbCompactBlock {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let bytes = self.0.encode_to_vec();
serializer.serialize_bytes(&bytes)
}
}
impl<'de> Deserialize<'de> for DbCompactBlock {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let bytes: Vec<u8> = serde::de::Deserialize::deserialize(deserializer)?;
CompactBlock::decode(&*bytes)
.map(DbCompactBlock)
.map_err(serde::de::Error::custom)
}
}