1#![warn(missing_docs)]
2use error::CriticalError;
7pub use error::DataSyncError;
8pub use error::Error;
9use error::InternalError;
10use error::InternalResult;
11pub use error::Result;
12use essential_node_db::{self as db, ConnectionPool};
13use essential_node_types::block_notify::BlockTx;
14use futures::StreamExt;
15pub use handle::Handle;
16use reqwest::{ClientBuilder, Url};
17use std::future::Future;
18use sync::stream_blocks;
19use sync::sync_blocks;
20use tokio::sync::watch;
21
22mod error;
23mod handle;
24mod sync;
25#[cfg(test)]
26mod tests;
27
28#[derive(Debug, Clone)]
30pub struct Relayer {
31 endpoint: Url,
32 client: reqwest::Client,
33}
34
35impl Relayer {
36 pub fn new(endpoint: impl TryInto<Url>) -> Result<Self> {
38 let endpoint = endpoint.try_into().map_err(|_| CriticalError::UrlParse)?;
39 let client = ClientBuilder::new()
40 .http2_prior_knowledge()
41 .build()
42 .map_err(CriticalError::HttpClientBuild)?;
43 Ok(Self { endpoint, client })
44 }
45
46 pub fn run(self, pool: ConnectionPool, new_block: BlockTx) -> Result<Handle> {
54 let blocks = move |shutdown: watch::Receiver<()>| {
57 let pool = pool.clone();
58 let relayer = self.clone();
59 let notify = new_block.clone();
60 async move {
61 relayer.run_blocks(pool, shutdown, notify).await
63 }
64 };
65
66 run(blocks)
67 }
68
69 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
71 async fn run_blocks(
72 &self,
73 conn: ConnectionPool,
74 mut shutdown: watch::Receiver<()>,
75 notify: BlockTx,
76 ) -> InternalResult<()> {
77 #[cfg(feature = "tracing")]
78 tracing::info!("Stream starting");
79
80 let progress = sync::get_block_progress(&conn)
82 .await
83 .map_err(CriticalError::from)?;
84
85 let stream = stream_blocks(&self.endpoint, &self.client, &progress).await?;
87
88 let close = async move {
90 let _ = shutdown.changed().await;
91 #[cfg(feature = "tracing")]
92 tracing::info!("Shutting down blocks stream");
93 };
94
95 sync_blocks(conn, &progress, notify, stream.take_until(close)).await
97 }
98}
99
100fn run<B, BFut>(mut blocks: B) -> Result<Handle>
107where
108 B: FnMut(watch::Receiver<()>) -> BFut + Send + 'static,
109 BFut: Future<Output = InternalResult<()>> + Send,
110{
111 let (close_blocks, blocks_shutdown) = watch::channel(());
113
114 let f = async move {
115 loop {
116 let r = blocks(blocks_shutdown.clone()).await;
118 match r {
119 Ok(()) => return Ok(()),
121 Err(e) => {
122 handle_error(e).await?;
125 }
126 }
127 }
128 };
129
130 #[cfg(feature = "tracing")]
131 use tracing::Instrument;
132
133 #[cfg(feature = "tracing")]
134 let f = f.instrument(tracing::info_span!("blocks_stream"));
135
136 let join_blocks = tokio::spawn(f);
137
138 Ok(Handle::new(join_blocks, close_blocks))
139}
140
141async fn handle_error(e: InternalError) -> Result<()> {
143 let e = map_recoverable_errors(e);
144 match e {
145 InternalError::Critical(e) => {
146 #[cfg(feature = "tracing")]
147 tracing::error!(
148 "The relayer has encountered a critical error: {} and cannot recover.",
149 e
150 );
151 Err(e)
152 }
153 #[cfg(feature = "tracing")]
154 InternalError::Recoverable(e) => {
155 if matches!(e, error::RecoverableError::HttpClient(_)) {
157 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
159 }
160 tracing::error!("The relayer has encountered a recoverable error: {} and will now restart the stream.", e);
161
162 Ok(())
163 }
164 #[cfg(not(feature = "tracing"))]
165 InternalError::Recoverable(_) => Ok(()),
166 }
167}
168
169fn map_recoverable_errors(e: InternalError) -> InternalError {
172 match e {
174 InternalError::Critical(CriticalError::DbPoolRusqlite(e)) => match e {
175 db::pool::AcquireThenError::Inner(e) => match e {
176 rus @ rusqlite::Error::SqliteFailure(
177 rusqlite::ffi::Error {
178 code: rusqlite::ffi::ErrorCode::DatabaseLocked,
179 ..
180 },
181 _,
182 )
183 | rus @ rusqlite::Error::SqliteFailure(
184 rusqlite::ffi::Error {
185 code: rusqlite::ffi::ErrorCode::DatabaseBusy,
186 ..
187 },
188 _,
189 ) => InternalError::Recoverable(error::RecoverableError::Rusqlite(rus)),
190 _ => InternalError::Critical(db::pool::AcquireThenError::Inner(e).into()),
191 },
192 _ => InternalError::Critical(e.into()),
193 },
194 _ => e,
195 }
196}