#![warn(missing_docs)]
use error::CriticalError;
pub use error::DataSyncError;
pub use error::Error;
use error::InternalError;
use error::InternalResult;
pub use error::Result;
use essential_node_db::{self as db, ConnectionPool};
use essential_node_types::block_notify::BlockTx;
use futures::StreamExt;
pub use handle::Handle;
use reqwest::{ClientBuilder, Url};
use std::future::Future;
use sync::stream_blocks;
use sync::sync_blocks;
use tokio::sync::watch;
mod error;
mod handle;
mod sync;
#[cfg(test)]
mod tests;
/// Client that streams blocks from a remote endpoint and syncs them into the
/// database behind a `ConnectionPool`.
///
/// The struct only holds what is needed to reach the remote source; cloning it
/// yields another client for the same endpoint.
#[derive(Debug, Clone)]
pub struct Relayer {
/// URL of the remote source the block stream is opened against.
endpoint: Url,
/// HTTP client used for requests to `endpoint` (built with HTTP/2 prior knowledge in `new`).
client: reqwest::Client,
}
impl Relayer {
pub fn new(endpoint: impl TryInto<Url>) -> Result<Self> {
let endpoint = endpoint.try_into().map_err(|_| CriticalError::UrlParse)?;
let client = ClientBuilder::new()
.http2_prior_knowledge()
.build()
.map_err(CriticalError::HttpClientBuild)?;
Ok(Self { endpoint, client })
}
pub fn run(self, pool: ConnectionPool, new_block: BlockTx) -> Result<Handle> {
let blocks = move |shutdown: watch::Receiver<()>| {
let pool = pool.clone();
let relayer = self.clone();
let notify = new_block.clone();
async move {
relayer.run_blocks(pool, shutdown, notify).await
}
};
run(blocks)
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
async fn run_blocks(
&self,
conn: ConnectionPool,
mut shutdown: watch::Receiver<()>,
notify: BlockTx,
) -> InternalResult<()> {
#[cfg(feature = "tracing")]
tracing::info!("Stream starting");
let progress = sync::get_block_progress(&conn)
.await
.map_err(CriticalError::from)?;
let stream = stream_blocks(&self.endpoint, &self.client, &progress).await?;
let close = async move {
let _ = shutdown.changed().await;
#[cfg(feature = "tracing")]
tracing::info!("Shutting down blocks stream");
};
sync_blocks(conn, &progress, notify, stream.take_until(close)).await
}
}
/// Spawns the task that supervises the blocks stream.
///
/// `blocks` is called with a clone of the shutdown receiver each time the
/// stream has to be (re)started. When the returned future resolves to `Ok(())`
/// the task finishes successfully; when it resolves to an error, the error is
/// passed to `handle_error`, which either allows another restart or ends the
/// task with a critical error.
///
/// The returned [`Handle`] owns the task's join handle and the sending half of
/// the shutdown channel.
fn run<B, BFut>(mut blocks: B) -> Result<Handle>
where
    B: FnMut(watch::Receiver<()>) -> BFut + Send + 'static,
    BFut: Future<Output = InternalResult<()>> + Send,
{
    let (close_blocks, blocks_shutdown) = watch::channel(());

    let supervisor = async move {
        // Keep restarting for as long as the stream fails with an error that
        // `handle_error` accepts; `?` exits on the first critical one.
        while let Err(stream_err) = blocks(blocks_shutdown.clone()).await {
            handle_error(stream_err).await?;
        }
        Ok(())
    };

    #[cfg(feature = "tracing")]
    let supervisor = {
        use tracing::Instrument;
        supervisor.instrument(tracing::info_span!("blocks_stream"))
    };

    Ok(Handle::new(tokio::spawn(supervisor), close_blocks))
}
async fn handle_error(e: InternalError) -> Result<()> {
let e = map_recoverable_errors(e);
match e {
InternalError::Critical(e) => {
#[cfg(feature = "tracing")]
tracing::error!(
"The relayer has encountered a critical error: {} and cannot recover.",
e
);
Err(e)
}
#[cfg(feature = "tracing")]
InternalError::Recoverable(e) => {
if matches!(e, error::RecoverableError::HttpClient(_)) {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
tracing::error!("The relayer has encountered a recoverable error: {} and will now restart the stream.", e);
Ok(())
}
#[cfg(not(feature = "tracing"))]
InternalError::Recoverable(_) => Ok(()),
}
}
fn map_recoverable_errors(e: InternalError) -> InternalError {
match e {
InternalError::Critical(CriticalError::DbPoolRusqlite(e)) => match e {
db::pool::AcquireThenError::Inner(e) => match e {
rus @ rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error {
code: rusqlite::ffi::ErrorCode::DatabaseLocked,
..
},
_,
)
| rus @ rusqlite::Error::SqliteFailure(
rusqlite::ffi::Error {
code: rusqlite::ffi::ErrorCode::DatabaseBusy,
..
},
_,
) => InternalError::Recoverable(error::RecoverableError::Rusqlite(rus)),
_ => InternalError::Critical(db::pool::AcquireThenError::Inner(e).into()),
},
_ => InternalError::Critical(e.into()),
},
_ => e,
}
}