essential_relayer/
lib.rs

1#![warn(missing_docs)]
2//! Relayer is a library that syncs data from a remote source into a local database.
3//! The relayer syncs blocks.
4//! There are notify channels to signal when new data has been synced.
5
6use error::CriticalError;
7pub use error::DataSyncError;
8pub use error::Error;
9use error::InternalError;
10use error::InternalResult;
11pub use error::Result;
12use essential_node_db::{self as db, ConnectionPool};
13use essential_node_types::block_notify::BlockTx;
14use futures::StreamExt;
15pub use handle::Handle;
16use reqwest::{ClientBuilder, Url};
17use std::future::Future;
18use sync::stream_blocks;
19use sync::sync_blocks;
20use tokio::sync::watch;
21
22mod error;
23mod handle;
24mod sync;
25#[cfg(test)]
26mod tests;
27
28/// Relayer client that syncs data from a remote source into a local database.
29#[derive(Debug, Clone)]
30pub struct Relayer {
31    endpoint: Url,
32    client: reqwest::Client,
33}
34
35impl Relayer {
36    /// Create a new relayer client from a node endpoint.
37    pub fn new(endpoint: impl TryInto<Url>) -> Result<Self> {
38        let endpoint = endpoint.try_into().map_err(|_| CriticalError::UrlParse)?;
39        let client = ClientBuilder::new()
40            .http2_prior_knowledge()
41            .build()
42            .map_err(CriticalError::HttpClientBuild)?;
43        Ok(Self { endpoint, client })
44    }
45
46    /// Run the relayer client.
47    /// This will sync blocks from the remote source into the local database.
48    ///
49    /// Streams are spawned and run in the background.
50    /// A handle is returned that can be used to close or join the streams.
51    ///
52    /// The two watch channels are used to notify the caller when new data has been synced.
53    pub fn run(self, pool: ConnectionPool, new_block: BlockTx) -> Result<Handle> {
54        // The blocks callback. This is a closure that will be called
55        // every time the blocks stream is restarted.
56        let blocks = move |shutdown: watch::Receiver<()>| {
57            let pool = pool.clone();
58            let relayer = self.clone();
59            let notify = new_block.clone();
60            async move {
61                // Run the blocks stream
62                relayer.run_blocks(pool, shutdown, notify).await
63            }
64        };
65
66        run(blocks)
67    }
68
69    /// Run the blocks stream.
70    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
71    async fn run_blocks(
72        &self,
73        conn: ConnectionPool,
74        mut shutdown: watch::Receiver<()>,
75        notify: BlockTx,
76    ) -> InternalResult<()> {
77        #[cfg(feature = "tracing")]
78        tracing::info!("Stream starting");
79
80        // Get the last progress that was made from the database.
81        let progress = sync::get_block_progress(&conn)
82            .await
83            .map_err(CriticalError::from)?;
84
85        // Create the stream of blocks.
86        let stream = stream_blocks(&self.endpoint, &self.client, &progress).await?;
87
88        // Setup a future that will close the stream when the shutdown signal is received.
89        let close = async move {
90            let _ = shutdown.changed().await;
91            #[cfg(feature = "tracing")]
92            tracing::info!("Shutting down blocks stream");
93        };
94
95        // Run the stream of blocks.
96        sync_blocks(conn, &progress, notify, stream.take_until(close)).await
97    }
98}
99
100/// Run the two streams spawned in the background.
101///
102/// Handles errors and returns a handle that can be used to close or join the streams.
103///
104/// Recoverable errors will be logged and the stream will be restarted.
105/// Critical errors will cause the stream to end.
106fn run<B, BFut>(mut blocks: B) -> Result<Handle>
107where
108    B: FnMut(watch::Receiver<()>) -> BFut + Send + 'static,
109    BFut: Future<Output = InternalResult<()>> + Send,
110{
111    // Create a channels to signal the streams to shutdown.
112    let (close_blocks, blocks_shutdown) = watch::channel(());
113
114    let f = async move {
115        loop {
116            // Run the blocks stream callback
117            let r = blocks(blocks_shutdown.clone()).await;
118            match r {
119                // Stream has ended, return from the task
120                Ok(()) => return Ok(()),
121                Err(e) => {
122                    // Return error if it's critical or
123                    // continue if it's recoverable
124                    handle_error(e).await?;
125                }
126            }
127        }
128    };
129
130    #[cfg(feature = "tracing")]
131    use tracing::Instrument;
132
133    #[cfg(feature = "tracing")]
134    let f = f.instrument(tracing::info_span!("blocks_stream"));
135
136    let join_blocks = tokio::spawn(f);
137
138    Ok(Handle::new(join_blocks, close_blocks))
139}
140
141/// Exit on critical errors, log recoverable errors
142async fn handle_error(e: InternalError) -> Result<()> {
143    let e = map_recoverable_errors(e);
144    match e {
145        InternalError::Critical(e) => {
146            #[cfg(feature = "tracing")]
147            tracing::error!(
148                "The relayer has encountered a critical error: {} and cannot recover.",
149                e
150            );
151            Err(e)
152        }
153        #[cfg(feature = "tracing")]
154        InternalError::Recoverable(e) => {
155            // Slow down loop if source is unreachable
156            if matches!(e, error::RecoverableError::HttpClient(_)) {
157                // TODO: Make exponential backoff.
158                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
159            }
160            tracing::error!("The relayer has encountered a recoverable error: {} and will now restart the stream.", e);
161
162            Ok(())
163        }
164        #[cfg(not(feature = "tracing"))]
165        InternalError::Recoverable(_) => Ok(()),
166    }
167}
168
169/// Some critical error types contain variants that we should handle as recoverable errors.
170/// This function maps those errors to recoverable errors.
171fn map_recoverable_errors(e: InternalError) -> InternalError {
172    // Map recoverable rusqlite errors to recoverable errors
173    match e {
174        InternalError::Critical(CriticalError::DbPoolRusqlite(e)) => match e {
175            db::pool::AcquireThenError::Inner(e) => match e {
176                rus @ rusqlite::Error::SqliteFailure(
177                    rusqlite::ffi::Error {
178                        code: rusqlite::ffi::ErrorCode::DatabaseLocked,
179                        ..
180                    },
181                    _,
182                )
183                | rus @ rusqlite::Error::SqliteFailure(
184                    rusqlite::ffi::Error {
185                        code: rusqlite::ffi::ErrorCode::DatabaseBusy,
186                        ..
187                    },
188                    _,
189                ) => InternalError::Recoverable(error::RecoverableError::Rusqlite(rus)),
190                _ => InternalError::Critical(db::pool::AcquireThenError::Inner(e).into()),
191            },
192            _ => InternalError::Critical(e.into()),
193        },
194        _ => e,
195    }
196}