fuel_relayer/
relayer.rs

1//! # Relayer
2//! This module handles bridge communications between
3//! the fuel node and the data availability layer.
4
5use crate::{
6    log::EthEventLog,
7    Config,
8};
9use anyhow::Result;
10use async_trait::async_trait;
11use core::time::Duration;
12use ethers_core::types::{
13    Filter,
14    Log,
15    SyncingStatus,
16    ValueOrArray,
17    H160,
18};
19use ethers_providers::{
20    Http,
21    Middleware,
22    Provider,
23    ProviderError,
24};
25use fuel_core_interfaces::{
26    db::Messages,
27    model::Message,
28    relayer::RelayerDb,
29};
30use std::{
31    convert::TryInto,
32    ops::Deref,
33    sync::{
34        atomic::AtomicBool,
35        Arc,
36    },
37};
38use synced::update_synced;
39use tokio::sync::watch;
40
41use self::{
42    get_logs::*,
43    run::RelayerData,
44    state::EthLocal,
45};
46
47mod get_logs;
48mod run;
49mod state;
50mod synced;
51mod syncing;
52
53#[cfg(test)]
54mod test;
55
56type Synced = watch::Receiver<bool>;
57type NotifySynced = watch::Sender<bool>;
58type Database = Box<dyn RelayerDb>;
59
60/// Handle for interacting with the [`Relayer`].
61pub struct RelayerHandle {
62    /// Receives signals when the relayer reaches consistency with the DA layer.
63    synced: RelayerSynced,
64    /// Gracefully shuts down the relayer.
65    shutdown: RelayerShutdown,
66}
67
68/// Receives signals when the relayer reaches consistency with the DA layer.
69#[derive(Clone)]
70pub struct RelayerSynced {
71    synced: Synced,
72}
73
74/// The actual relayer that runs on a background task
75/// to sync with the DA layer.
76struct Relayer<P>
77where
78    P: Middleware<Error = ProviderError>,
79{
80    /// Sends signals when the relayer reaches consistency with the DA layer.
81    synced: NotifySynced,
82    /// The node that communicates with Ethereum.
83    eth_node: Arc<P>,
84    /// The fuel database.
85    database: Database,
86    /// Configuration settings.
87    config: Config,
88}
89
90/// Shutdown handle for gracefully ending
91/// the background relayer task.
92struct RelayerShutdown {
93    join_handle: tokio::task::JoinHandle<()>,
94    shutdown: Arc<AtomicBool>,
95}
96
97impl<P> Relayer<P>
98where
99    P: Middleware<Error = ProviderError> + 'static,
100{
101    /// Create a new relayer.
102    fn new(
103        synced: NotifySynced,
104        eth_node: P,
105        database: Database,
106        config: Config,
107    ) -> Self {
108        Self {
109            synced,
110            eth_node: Arc::new(eth_node),
111            database,
112            config,
113        }
114    }
115
116    async fn set_deploy_height(&self) {
117        if self.finalized().await.unwrap_or_default() < *self.config.da_deploy_height {
118            self.database
119                .set_finalized_da_height(self.config.da_deploy_height)
120                .await;
121        }
122    }
123}
124
125#[async_trait]
126impl<P> RelayerData for Relayer<P>
127where
128    P: Middleware<Error = ProviderError> + 'static,
129{
130    async fn download_logs(
131        &mut self,
132        eth_sync_gap: &state::EthSyncGap,
133    ) -> anyhow::Result<()> {
134        let logs = download_logs(
135            eth_sync_gap,
136            self.config.eth_v2_listening_contracts.clone(),
137            self.eth_node.clone(),
138            self.config.log_page_size,
139        );
140        write_logs(self.database.as_mut(), logs).await
141    }
142
143    async fn set_finalized_da_height(
144        &self,
145        height: fuel_core_interfaces::model::DaBlockHeight,
146    ) {
147        self.database.set_finalized_da_height(height).await
148    }
149
150    fn update_synced(&self, state: &state::EthState) {
151        update_synced(&self.synced, state)
152    }
153
154    async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
155        syncing::wait_if_eth_syncing(
156            &self.eth_node,
157            self.config.syncing_call_frequency,
158            self.config.syncing_log_frequency,
159        )
160        .await
161    }
162}
163
164impl RelayerHandle {
165    /// Start a http [`Relayer`] running and return the handle to it.
166    pub fn start(database: Database, config: Config) -> anyhow::Result<Self> {
167        let url = config.eth_client.clone().ok_or_else(|| {
168            anyhow::anyhow!(
169                "Tried to start Relayer without setting an eth_client in the config"
170            )
171        })?;
172        // TODO: Does this handle https?
173        let http = Http::new(url);
174        let eth_node = Provider::new(http);
175        Ok(Self::start_inner::<Provider<Http>>(
176            eth_node, database, config,
177        ))
178    }
179
180    #[cfg(any(test, feature = "test-helpers"))]
181    /// Start a test relayer.
182    pub fn start_test<P>(eth_node: P, database: Database, config: Config) -> Self
183    where
184        P: Middleware<Error = ProviderError> + 'static,
185    {
186        Self::start_inner(eth_node, database, config)
187    }
188
189    fn start_inner<P>(eth_node: P, database: Database, config: Config) -> Self
190    where
191        P: Middleware<Error = ProviderError> + 'static,
192    {
193        let (tx, rx) = watch::channel(false);
194        let synced = RelayerSynced { synced: rx };
195        let shutdown = run(Relayer::new(tx, eth_node, database, config));
196        Self { synced, shutdown }
197    }
198
199    /// Gets a handle to the synced notification
200    pub fn listen_synced(&self) -> RelayerSynced {
201        self.synced.clone()
202    }
203
204    /// Check if the [`Relayer`] is still running.
205    pub fn is_running(&self) -> bool {
206        !self.shutdown.join_handle.is_finished()
207    }
208
209    /// Gracefully shutdown the [`Relayer`].
210    pub async fn shutdown(self) -> anyhow::Result<()> {
211        self.shutdown
212            .shutdown
213            .store(true, core::sync::atomic::Ordering::Relaxed);
214        Ok(self.shutdown.join_handle.await?)
215    }
216}
217
218impl RelayerSynced {
219    /// Wait for the [`Relayer`] to be in sync with
220    /// the data availability layer.
221    ///
222    /// Yields until the relayer reaches a point where it
223    /// considered up to date. Note that there's no guarantee
224    /// the relayer will ever catch up to the da layer and
225    /// may fall behind immediately after this future completes.
226    ///
227    /// The only guarantee is that if this future completes then
228    /// the relayer did reach consistency with the da layer for
229    /// some period of time.
230    pub async fn await_synced(&self) -> Result<()> {
231        let mut rx = self.synced.clone();
232        if !rx.borrow_and_update().deref() {
233            rx.changed().await?;
234        }
235        Ok(())
236    }
237}
238
239#[async_trait]
240impl<P> state::EthRemote for Relayer<P>
241where
242    P: Middleware<Error = ProviderError>,
243{
244    async fn current(&self) -> anyhow::Result<u64> {
245        Ok(self.eth_node.get_block_number().await?.as_u64())
246    }
247
248    fn finalization_period(&self) -> u64 {
249        *self.config.da_finalization
250    }
251}
252
253#[async_trait]
254impl<P> state::EthLocal for Relayer<P>
255where
256    P: Middleware<Error = ProviderError>,
257{
258    async fn finalized(&self) -> Option<u64> {
259        self.database.get_finalized_da_height().await.map(|h| *h)
260    }
261}
262
263/// Main background run loop.
264fn run<P>(mut relayer: Relayer<P>) -> RelayerShutdown
265where
266    P: Middleware<Error = ProviderError> + 'static,
267{
268    let shutdown = Arc::new(AtomicBool::new(false));
269    let join_handle = tokio::task::spawn({
270        let shutdown = shutdown.clone();
271        async move {
272            // Set deploy height
273            relayer.set_deploy_height().await;
274            while !shutdown.load(core::sync::atomic::Ordering::Relaxed) {
275                let now = tokio::time::Instant::now();
276
277                if let Err(e) = run::run(&mut relayer).await {
278                    let e: &dyn std::error::Error = &*e;
279                    tracing::error!(e);
280                }
281
282                // Sleep the loop so the da node is not spammed.
283                tokio::time::sleep(
284                    relayer
285                        .config
286                        .sync_minimum_duration
287                        .saturating_sub(now.elapsed()),
288                )
289                .await;
290            }
291        }
292    });
293    RelayerShutdown {
294        join_handle,
295        shutdown,
296    }
297}