use essential_node_db::{self as db, with_tx, ConnectionPool};
use essential_node_types::{block_notify::BlockTx, Block};
use essential_types::{ContentAddress, Word};
use futures::stream::TryStreamExt;
use futures::Stream;
pub(crate) use streams::stream_blocks;
use crate::error::{CriticalError, InternalResult, RecoverableError};
use crate::DataSyncError;
mod streams;
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BlockProgress {
pub last_block_number: Word,
pub last_block_address: ContentAddress,
}
pub async fn get_block_progress(
pool: &ConnectionPool,
) -> Result<Option<BlockProgress>, db::pool::AcquireThenRusqliteError> {
pool.acquire_then(|conn| {
with_tx(conn, |tx| {
let Some(block_address) = db::get_latest_finalized_block_address(tx)? else {
return Ok(None);
};
let Some(header) = db::get_block_header(tx, &block_address)? else {
return Ok(None);
};
Ok(Some(BlockProgress {
last_block_number: header.number,
last_block_address: block_address,
}))
})
})
.await
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn sync_blocks<S>(
pool: ConnectionPool,
progress: &Option<BlockProgress>,
notify: BlockTx,
stream: S,
) -> InternalResult<()>
where
S: Stream<Item = InternalResult<Block>>,
{
tokio::pin!(stream);
if let Some(progress) = progress {
let last = stream.try_next().await?;
check_block_fork(&last, progress)?;
}
let mut block_number = match progress {
Some(BlockProgress {
last_block_number, ..
}) => last_block_number.saturating_add(1),
None => 0,
};
stream
.try_for_each(move |block| {
let sequential_block = block.header.number == block_number;
block_number = block.header.number.saturating_add(1);
let notify = notify.clone();
let pool = pool.clone();
async move {
if !sequential_block {
return Err(RecoverableError::NonSequentialBlock(
block_number,
block.header.number,
)
.into());
}
#[cfg(feature = "tracing")]
tracing::debug!("Writing block number {} to database", block.header.number);
write_block(&pool, block)
.await
.map_err(CriticalError::from)?;
notify.notify();
Ok(())
}
})
.await?;
Ok(())
}
async fn write_block(
pool: &ConnectionPool,
block: Block,
) -> Result<(), db::pool::AcquireThenRusqliteError> {
pool.acquire_then(move |conn| {
with_tx(conn, |tx| {
let block_address = essential_hash::content_addr(&block);
essential_node_db::insert_block(tx, &block)?;
essential_node_db::finalize_block(tx, &block_address)?;
Ok(())
})
})
.await
}
fn check_block_fork(block: &Option<Block>, progress: &BlockProgress) -> crate::Result<()> {
match block {
Some(block) => {
let block_address = essential_hash::content_addr(block);
if block_address != progress.last_block_address {
return Err(CriticalError::DataSyncFailed(DataSyncError::Fork(
progress.last_block_number,
progress.last_block_address.clone(),
Some(block_address),
)));
}
}
None => {
return Err(CriticalError::DataSyncFailed(DataSyncError::Fork(
progress.last_block_number,
progress.last_block_address.clone(),
None,
)));
}
}
Ok(())
}