use crate::{
db::{
self, get_block_header, get_latest_finalized_block_address, update_validation_progress,
ConnectionPool,
},
error::{CriticalError, InternalError, RecoverableError, ValidationError},
handles::validation::Handle,
validate::{self, InvalidOutcome, ValidOutcome, ValidateOutcome},
};
use essential_hash::content_addr;
use essential_node_db::QueryError;
use essential_node_types::{block_notify::BlockRx, Block};
use essential_types::ContentAddress;
use tokio::sync::watch;
#[cfg(test)]
mod tests;
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub fn validation_stream(
conn_pool: ConnectionPool,
contract_registry: ContentAddress,
program_registry: ContentAddress,
mut block_rx: BlockRx,
) -> Result<Handle<CriticalError>, CriticalError> {
let (shutdown, stream_close) = watch::channel(());
let jh = tokio::spawn(async move {
let mut stream_close = stream_close.clone();
loop {
let err = 'wait_next_block: loop {
loop {
match validate_next_block(
conn_pool.clone(),
&contract_registry,
&program_registry,
)
.await
{
Err(err) => break 'wait_next_block err,
Ok(more_blocks_left) => {
if more_blocks_left {
continue;
} else {
#[cfg(feature = "tracing")]
tracing::debug!(
"Reached head of chain: Awaiting new block notifications"
);
break;
}
}
}
}
tokio::select! {
_ = stream_close.changed() => return Ok(()),
res = block_rx.changed() => if let Err(_err) = res {
#[cfg(feature = "tracing")]
tracing::debug!("Block notification channel dropped: closing stream");
return Ok(());
},
}
};
handle_error(err)?;
}
});
Ok(Handle::new(jh, shutdown))
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
async fn validate_next_block(
conn_pool: ConnectionPool,
contract_registry: &ContentAddress,
program_registry: &ContentAddress,
) -> Result<bool, InternalError> {
let progress = get_last_progress(&conn_pool)
.await?
.ok_or(CriticalError::LastProgressNone)?;
let Some(block) = get_next_block(&conn_pool, progress).await? else {
return Ok(false);
};
let block_address = content_addr(&block);
#[cfg(feature = "tracing")]
tracing::debug!(
"Validating block {} with number {}",
block_address,
block.header.number
);
let res = validate::validate(&conn_pool, contract_registry, program_registry, &block).await?;
let more_blocks_available = match res {
ValidateOutcome::Valid(ValidOutcome {
total_gas: _total_gas,
}) => {
let block_address = block_address.clone();
let r: Result<bool, InternalError> = conn_pool
.acquire_then(move |conn| {
update_validation_progress(conn, &block_address)?;
let tx = conn.transaction()?;
let latest_finalized_block_number = {
let hash = get_latest_finalized_block_address(&tx)?;
if let Some(hash) = hash {
let header = get_block_header(&tx, &hash)?;
header.map(|header| header.number)
} else {
None
}
};
if let Some(latest_block_number) = latest_finalized_block_number {
if latest_block_number > block.header.number {
return Ok(true);
}
}
Ok(false)
})
.await
.map_err(InternalError::from);
r
}
ValidateOutcome::Invalid(InvalidOutcome {
failure: _failure,
solution_set_index,
}) => {
let failed_solution_set = content_addr(
block
.solution_sets
.get(solution_set_index)
.expect("Failed solution set must exist."),
);
let r: Result<bool, InternalError> = conn_pool
.acquire_then(move |conn| {
db::insert_failed_block(conn, &block_address, &failed_solution_set)
.map_err(ValidationError::from)
.map(|_| Ok(false))
})
.await
.map_err(InternalError::from)?;
r
}
}?;
Ok(more_blocks_available)
}
async fn get_last_progress(
conn: &ConnectionPool,
) -> Result<Option<ContentAddress>, RecoverableError> {
conn.get_validation_progress()
.await
.map_err(RecoverableError::from)
}
async fn get_next_block(
conn: &ConnectionPool,
progress: ContentAddress,
) -> Result<Option<Block>, InternalError> {
conn.acquire_then(move |conn| {
let blocks = essential_node_db::get_next_block_addresses(conn, &progress)?;
if blocks.is_empty() {
Ok::<_, QueryError>(Ok(None))
} else if blocks.len() > 1 {
Ok(Err(CriticalError::Fork.into()))
} else {
let block_address = blocks.into_iter().next().expect("blocks is not empty");
let tx = conn.transaction()?;
let block = essential_node_db::get_block(&tx, &block_address)?;
Ok(Ok(block))
}
})
.await
.map_err(CriticalError::GetNextBlock)?
}
fn handle_error(e: InternalError) -> Result<(), CriticalError> {
let e = map_recoverable_errors(e);
match e {
InternalError::Critical(e) => {
#[cfg(feature = "tracing")]
tracing::error!(
"The validation stream has encountered a critical error: {} and cannot recover.",
e
);
Err(e)
}
#[cfg(feature = "tracing")]
InternalError::Recoverable(e) => {
tracing::error!("The validation stream has encountered a recoverable error: {} and will now restart the stream.", e);
Ok(())
}
#[cfg(not(feature = "tracing"))]
InternalError::Recoverable(_) => Ok(()),
}
}
fn map_recoverable_errors(e: InternalError) -> InternalError {
match e {
InternalError::Critical(CriticalError::DatabaseFailed(e)) => {
if is_recoverable_db_err(&e) {
RecoverableError::Rusqlite(e).into()
} else {
CriticalError::DatabaseFailed(e).into()
}
}
InternalError::Critical(CriticalError::ReadState(e)) => match e {
db::pool::AcquireThenError::Acquire(e) => CriticalError::DbPoolClosed(e).into(),
e @ db::pool::AcquireThenError::Join(_) => RecoverableError::ReadState(e).into(),
db::pool::AcquireThenError::Inner(db::QueryError::Rusqlite(rus)) => {
if is_recoverable_db_err(&rus) {
RecoverableError::Rusqlite(rus).into()
} else {
CriticalError::DatabaseFailed(rus).into()
}
}
db::pool::AcquireThenError::Inner(essential_node_db::QueryError::UnsupportedRange) => {
RecoverableError::Query(essential_node_db::QueryError::UnsupportedRange).into()
}
},
_ => e,
}
}
fn is_recoverable_db_err(err: &rusqlite::Error) -> bool {
matches!(
err,
rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error {
code: rusqlite::ffi::ErrorCode::DatabaseLocked,
..
},
_
) | rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error {
code: rusqlite::ffi::ErrorCode::DatabaseBusy,
..
},
_
)
)
}