// fuel_core_relayer/service.rs

1//! This module handles bridge communications between the fuel node and the data availability layer.
2
3use crate::{
4    Config,
5    log::EthEventLog,
6    ports::RelayerDb,
7    service::state::EthLocal,
8};
9use async_trait::async_trait;
10use core::time::Duration;
11use ethers_core::types::{
12    Filter,
13    Log,
14    SyncingStatus,
15    ValueOrArray,
16};
17use ethers_providers::{
18    Http,
19    Middleware,
20    Provider,
21    ProviderError,
22    Quorum,
23    QuorumProvider,
24    WeightedProvider,
25};
26use fuel_core_services::{
27    RunnableService,
28    RunnableTask,
29    ServiceRunner,
30    StateWatcher,
31    TaskNextAction,
32};
33use fuel_core_types::{
34    blockchain::primitives::DaBlockHeight,
35    entities::Message,
36};
37use futures::StreamExt;
38use std::convert::TryInto;
39use tokio::sync::watch;
40
41use self::{
42    get_logs::*,
43    run::RelayerData,
44};
45
46mod get_logs;
47mod run;
48mod state;
49mod syncing;
50
51#[cfg(test)]
52mod test;
53
/// The relayer's synchronization status with respect to the DA layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncState {
    /// The relayer is partially synced with the DA layer. The variant contains the current
    /// DA block height that the relayer synced to.
    PartiallySynced(DaBlockHeight),
    /// The relayer is fully synced with the DA layer.
    Synced(DaBlockHeight),
}
62
63impl SyncState {
64    /// Get the current DA block height that the relayer synced to.
65    pub fn da_block_height(&self) -> DaBlockHeight {
66        match self {
67            Self::PartiallySynced(height) | Self::Synced(height) => *height,
68        }
69    }
70
71    /// Returns `true` if the relayer is fully synced with the DA layer.
72    pub fn is_synced(&self) -> bool {
73        matches!(self, Self::Synced(_))
74    }
75}
76
/// Receiver half of the sync-state channel.
type Synced = watch::Receiver<SyncState>;
/// Sender half of the sync-state channel.
type NotifySynced = watch::Sender<SyncState>;

/// The alias of runnable relayer service.
pub type Service<D> = CustomizableService<Provider<QuorumProvider<Http>>, D>;
/// Runner generic over the Ethereum provider `P`, allowing tests to inject a
/// mock middleware (see `new_service_test`).
type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;
83
/// The shared state of the relayer task.
///
/// Every clone observes the same underlying sync-state channel.
#[derive(Clone)]
pub struct SharedState {
    /// Receives signals when the relayer reaches consistency with the DA layer.
    synced: Synced,
}
90
/// Not initialized version of the [`Task`].
///
/// Turned into a running [`Task`] via `RunnableService::into_task`.
pub struct NotInitializedTask<P, D> {
    /// Sends signals when the relayer reaches consistency with the DA layer.
    synced: NotifySynced,
    /// The node that communicates with Ethereum.
    eth_node: P,
    /// The fuel database.
    database: D,
    /// Configuration settings.
    config: Config,
    /// Retry on error
    retry_on_error: bool,
}
104
/// Outcome of a single log-fetching RPC call, fed back into a [`PageSizer`].
pub enum RpcOutcome {
    /// The call succeeded and downloaded `logs_downloaded` logs.
    Success { logs_downloaded: u64 },
    /// The call failed.
    Error,
}
109
/// A trait for controlling the number of blocks queried per RPC call when downloading logs.
/// Implementations may adapt the block range based on feedback from previous calls.
///
/// See [`AdaptivePageSizer`] for the production implementation.
pub trait PageSizer {
    /// Updates the internal state of the page sizer based on the outcome of an RPC call.
    ///
    /// This method should be called after each log-fetching RPC call to allow the sizer
    /// to adjust its block range strategy. It receives the number of logs downloaded and
    /// whether the RPC call resulted in an error.
    fn update(&mut self, outcome: RpcOutcome);

    /// Returns the current number of blocks to include in the next RPC query.
    fn page_size(&self) -> u64;
}
123
/// A [`PageSizer`] that grows the page size slowly after sustained success and
/// halves it on failure (or when a call returns more logs than allowed).
pub struct AdaptivePageSizer {
    /// The page size (in blocks) used for the next RPC query.
    current: u64,
    /// Upper bound for `current`.
    max: u64,
    /// Consecutive successful RPC calls since the last shrink or growth.
    successful_rpc_calls: u64,
    /// How many consecutive successes are required before `current` grows.
    grow_threshold: u64,
    /// If a successful call downloads more logs than this, the page shrinks.
    max_logs_per_rpc: u64,
}
131
132impl AdaptivePageSizer {
133    fn new(current: u64, max: u64, grow_threshold: u64, max_logs_per_rpc: u64) -> Self {
134        Self {
135            current,
136            max,
137            grow_threshold,
138            max_logs_per_rpc,
139            successful_rpc_calls: 0,
140        }
141    }
142}
143
144impl PageSizer for AdaptivePageSizer {
145    fn update(&mut self, outcome: RpcOutcome) {
146        const PAGE_GROW_FACTOR_NUM: u64 = 125;
147        const PAGE_GROW_FACTOR_DEN: u64 = 100;
148        const PAGE_SHRINK_FACTOR: u64 = 2;
149
150        match outcome {
151            RpcOutcome::Error => {
152                self.successful_rpc_calls = 0;
153                self.current = (self.current / PAGE_SHRINK_FACTOR).max(1);
154            }
155            RpcOutcome::Success { logs_downloaded }
156                if logs_downloaded > self.max_logs_per_rpc =>
157            {
158                self.successful_rpc_calls = 0;
159                self.current = (self.current / PAGE_SHRINK_FACTOR).max(1);
160            }
161            _ => {
162                self.successful_rpc_calls = self.successful_rpc_calls.saturating_add(1);
163                if self.successful_rpc_calls >= self.grow_threshold
164                    && self.current < self.max
165                {
166                    let grown = self.current.saturating_mul(PAGE_GROW_FACTOR_NUM)
167                        / PAGE_GROW_FACTOR_DEN;
168                    self.current = if grown > self.current {
169                        grown.min(self.max)
170                    } else {
171                        (self.current.saturating_add(1)).min(self.max)
172                    };
173                    self.successful_rpc_calls = 0;
174                }
175            }
176        }
177    }
178
179    fn page_size(&self) -> u64 {
180        self.current
181    }
182}
183
/// The actual relayer background task that syncs with the DA layer.
///
/// Generic over the Ethereum middleware `P`, the database `D`, and the
/// page-sizing strategy `S`.
pub struct Task<P, D, S> {
    /// Sends signals when the relayer reaches consistency with the DA layer.
    synced: NotifySynced,
    /// The node that communicates with Ethereum.
    eth_node: P,
    /// The fuel database.
    database: D,
    /// Configuration settings.
    config: Config,
    /// The watcher used to track the state of the service. If the service stops,
    /// the task will stop synchronization.
    shutdown: StateWatcher,
    /// Retry on error
    retry_on_error: bool,
    /// Determines how many pages to request per RPC call, adapting based on success/failure feedback.
    /// Allows dynamic tuning of log pagination to optimize performance and reliability.
    page_sizer: S,
}
203
204impl<P, D> NotInitializedTask<P, D>
205where
206    D: RelayerDb + 'static,
207{
208    /// Create a new relayer task.
209    fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
210        let da_block_height = database.get_finalized_da_height().unwrap_or_else(|| {
211            let height_before_deployed = config.da_deploy_height.0.saturating_sub(1);
212            height_before_deployed.into()
213        });
214
215        let (synced, _) = watch::channel(SyncState::PartiallySynced(da_block_height));
216
217        Self {
218            synced,
219            eth_node,
220            database,
221            config,
222            retry_on_error,
223        }
224    }
225}
226
impl<P, D, S> RelayerData for Task<P, D, S>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
    S: PageSizer + 'static + Send + Sync,
{
    /// Blocks while the Ethereum node reports that it is still syncing,
    /// returning early with an error if the service is asked to stop.
    async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
        let mut shutdown = self.shutdown.clone();
        tokio::select! {
            // `biased` makes the shutdown branch win when both are ready.
            biased;
            _ = shutdown.while_started() => {
                Err(anyhow::anyhow!("The relayer got a stop signal"))
            },
            result = syncing::wait_if_eth_syncing(
                &self.eth_node,
                self.config.syncing_call_frequency,
                self.config.syncing_log_frequency,
            ) => {
                result
            }
        }
    }

    /// Downloads event logs covering the given DA sync gap and writes them to
    /// the database. The stream is cut short if the service stops.
    async fn download_logs(
        &mut self,
        eth_sync_gap: &state::EthSyncGap,
    ) -> anyhow::Result<()> {
        let logs = download_logs(
            eth_sync_gap,
            self.config.eth_v2_listening_contracts.clone(),
            &self.eth_node,
            &mut self.page_sizer,
        );

        // Stop consuming the stream as soon as the service is no longer started.
        let logs = logs.take_until(self.shutdown.while_started());
        write_logs(&mut self.database, logs).await
    }

    /// Publishes the new sync state, notifying watchers only when the state
    /// actually changed (avoids spurious wake-ups).
    fn update_synced(&self, state: &state::EthState) {
        self.synced.send_if_modified(|last_state| {
            let new_sync = state.sync_state();
            if new_sync != *last_state {
                *last_state = new_sync;
                true
            } else {
                false
            }
        });
    }

    /// The last finalized DA height recorded in the database, if any.
    fn storage_da_block_height(&self) -> Option<u64> {
        self.database
            .get_finalized_da_height()
            .map(|height| height.into())
    }
}
283
284#[async_trait]
285impl<P, D> RunnableService for NotInitializedTask<P, D>
286where
287    P: Middleware<Error = ProviderError> + 'static,
288    D: RelayerDb + 'static,
289{
290    const NAME: &'static str = "Relayer";
291
292    type SharedData = SharedState;
293    type Task = Task<P, D, AdaptivePageSizer>;
294    type TaskParams = ();
295
296    fn shared_data(&self) -> Self::SharedData {
297        let synced = self.synced.subscribe();
298
299        SharedState { synced }
300    }
301
302    async fn into_task(
303        mut self,
304        watcher: &StateWatcher,
305        _: Self::TaskParams,
306    ) -> anyhow::Result<Self::Task> {
307        let shutdown = watcher.clone();
308        let NotInitializedTask {
309            synced,
310            eth_node,
311            database,
312            config,
313            retry_on_error,
314        } = self;
315        let page_sizer = AdaptivePageSizer::new(
316            config.log_page_size,
317            config.log_page_size,
318            50,
319            config.max_logs_per_rpc,
320        );
321        let task = Task {
322            synced,
323            eth_node,
324            database,
325            shutdown,
326            retry_on_error,
327            page_sizer,
328            config,
329        };
330
331        Ok(task)
332    }
333}
334
335impl<P, D, S> RunnableTask for Task<P, D, S>
336where
337    P: Middleware<Error = ProviderError> + 'static,
338    D: RelayerDb + 'static,
339    S: PageSizer + 'static + Send + Sync,
340{
341    async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
342        let now = tokio::time::Instant::now();
343
344        let result = run::run(self).await;
345
346        if self.shutdown.borrow_and_update().started()
347            && (result.is_err() | self.synced.borrow().is_synced())
348        {
349            // Sleep the loop so the da node is not spammed.
350            tokio::time::sleep(
351                self.config
352                    .sync_minimum_duration
353                    .saturating_sub(now.elapsed()),
354            )
355            .await;
356        }
357
358        match result {
359            Err(err) => {
360                if !self.retry_on_error {
361                    tracing::error!("Exiting due to Error in relayer task: {}", err);
362                    TaskNextAction::Stop
363                } else {
364                    TaskNextAction::ErrorContinue(err)
365                }
366            }
367            _ => TaskNextAction::Continue,
368        }
369    }
370
371    async fn shutdown(self) -> anyhow::Result<()> {
372        // Nothing to shut down because we don't have any temporary state that should be dumped,
373        // and we don't spawn any sub-tasks that we need to finish or await.
374        Ok(())
375    }
376}
377
impl SharedState {
    /// Wait for the `Task` to be in sync with
    /// the data availability layer.
    ///
    /// Yields until the relayer reaches a point where it
    /// considered up to date. Note that there's no guarantee
    /// the relayer will ever catch up to the da layer and
    /// may fall behind immediately after this future completes.
    ///
    /// The only guarantee is that if this future completes then
    /// the relayer did reach consistency with the da layer for
    /// some period of time.
    pub async fn await_synced(&self) -> anyhow::Result<()> {
        let mut rx = self.synced.clone();
        loop {
            // `borrow_and_update` marks the current value as seen, so the
            // subsequent `changed()` only wakes on a *new* value.
            if rx.borrow_and_update().is_synced() {
                break;
            }

            // Errors if the sender half was dropped (relayer shut down).
            rx.changed().await?;
        }

        Ok(())
    }

    /// Wait until at least the given height is synced.
    pub async fn await_at_least_synced(
        &self,
        height: &DaBlockHeight,
    ) -> anyhow::Result<()> {
        let mut rx = self.synced.clone();
        loop {
            // Same seen-then-wait pattern as `await_synced` above.
            if rx.borrow_and_update().da_block_height() >= *height {
                break;
            }

            rx.changed().await?;
        }
        Ok(())
    }

    /// Get finalized da height that represents last block from da layer that got finalized.
    /// Panics if height is not set as of initialization of the relayer.
    // NOTE(review): both `SyncState` variants carry a height, so this call
    // cannot panic — the "Panics" line above looks stale; confirm and update.
    pub fn get_finalized_da_height(&self) -> DaBlockHeight {
        self.synced.borrow().da_block_height()
    }
}
425
impl<P, D, S> state::EthRemote for Task<P, D, S>
where
    P: Middleware<Error = ProviderError>,
    D: RelayerDb + 'static,
    S: PageSizer + 'static + Send + Sync,
{
    /// Queries the Ethereum node for the latest *finalized* block number,
    /// aborting early if the service receives a stop signal.
    async fn finalized(&self) -> anyhow::Result<u64> {
        let mut shutdown = self.shutdown.clone();
        tokio::select! {
            // `biased` makes the shutdown branch win when both are ready.
            biased;
            _ = shutdown.while_started() => {
                Err(anyhow::anyhow!("The relayer got a stop signal"))
            },
            block = self.eth_node.get_block(ethers_core::types::BlockNumber::Finalized) => {
                // `get_block` may succeed yet return no block or a block with
                // no number (pending); treat both as errors.
                let block_number = block.map_err(|err| anyhow::anyhow!("failed to get block from Eth node: {err:?}"))?
                    .and_then(|block| block.number)
                    .ok_or(anyhow::anyhow!("Block pending"))?
                    .as_u64();
                Ok(block_number)
            }
        }
    }
}
449
450impl<P, D, S> EthLocal for Task<P, D, S>
451where
452    P: Middleware<Error = ProviderError>,
453    D: RelayerDb + 'static,
454    S: PageSizer + 'static + Send + Sync,
455{
456    fn observed(&self) -> u64 {
457        self.synced.borrow().da_block_height().into()
458    }
459}
460
461/// Creates an instance of runnable relayer service.
462pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
463where
464    D: RelayerDb + 'static,
465{
466    let urls = config
467        .relayer
468        .clone()
469        .ok_or_else(|| {
470            anyhow::anyhow!(
471                "Tried to start Relayer without setting an eth_client in the config"
472            )
473        })?
474        .into_iter()
475        .map(|url| WeightedProvider::new(Http::new(url)));
476
477    let eth_node = Provider::new(QuorumProvider::new(Quorum::Majority, urls));
478    let retry_on_error = true;
479    Ok(new_service_internal(
480        eth_node,
481        database,
482        config,
483        retry_on_error,
484    ))
485}
486
#[cfg(any(test, feature = "test-helpers"))]
/// Start a test relayer.
pub fn new_service_test<P, D>(
    eth_node: P,
    database: D,
    config: Config,
) -> CustomizableService<P, D>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
{
    // Tests want failures to surface immediately instead of being retried.
    let retry_on_error = false;
    new_service_internal(eth_node, database, config, retry_on_error)
}
501
502fn new_service_internal<P, D>(
503    eth_node: P,
504    database: D,
505    config: Config,
506    retry_on_error: bool,
507) -> CustomizableService<P, D>
508where
509    P: Middleware<Error = ProviderError> + 'static,
510    D: RelayerDb + 'static,
511{
512    let task = NotInitializedTask::new(eth_node, database, config, retry_on_error);
513
514    CustomizableService::new(task)
515}
516
#[cfg(test)]
mod tests {
    //! Unit tests for the [`AdaptivePageSizer`] growth/shrink behavior.
    use super::*;

    #[test]
    fn adaptive_page_sizer_grows_when_threshold_exceeded() {
        let grow_threshold = 50;
        let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, 10_000);
        // The success counter reaches the threshold on the last loop
        // iteration, growing the page from 4 to 4 * 125 / 100 = 5.
        for _ in 0..grow_threshold {
            sizer.update(RpcOutcome::Success {
                logs_downloaded: 100,
            });
        }
        sizer.update(RpcOutcome::Success {
            logs_downloaded: 100,
        });
        assert_eq!(sizer.page_size(), 5);
    }

    #[test]
    fn adaptive_page_sizer_does_not_grow_if_below_threshold() {
        let grow_threshold = 50;
        let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, 10_000);
        // 40 successes < threshold of 50, so the page stays at its start size.
        for _ in 0..grow_threshold - 10 {
            sizer.update(RpcOutcome::Success {
                logs_downloaded: 100,
            });
        }
        assert_eq!(sizer.page_size(), 4);
    }

    #[test]
    fn adaptive_page_sizer_does_not_grow_if_at_max() {
        let grow_threshold = 50;
        // Start already at the cap; growth must be a no-op.
        let mut sizer = AdaptivePageSizer::new(10, 10, grow_threshold, 10_000);
        for _ in 0..grow_threshold + 1 {
            sizer.update(RpcOutcome::Success {
                logs_downloaded: 100,
            });
        }
        assert_eq!(sizer.page_size(), 10);
    }

    #[test]
    fn adaptive_page_sizer_shrinks_on_rpc_error() {
        let grow_threshold = 50;
        let mut sizer = AdaptivePageSizer::new(6, 10, grow_threshold, 10_000);
        // A single error halves the page: 6 / 2 = 3.
        sizer.update(RpcOutcome::Error);
        assert_eq!(sizer.page_size(), 3);
    }

    #[test]
    fn adaptive_page_sizer_shrinks_on_excessive_logs() {
        let mut sizer = AdaptivePageSizer::new(6, 10, 50, 100);
        // 101 logs exceeds max_logs_per_rpc = 100, so the page halves.
        sizer.update(RpcOutcome::Success {
            logs_downloaded: 101,
        });
        assert_eq!(sizer.page_size(), 3);
    }

    #[test]
    fn adaptive_page_sizer_never_goes_below_one() {
        let mut sizer = AdaptivePageSizer::new(1, 10, 50, 10_000);
        // Shrinking from 1 is clamped at 1, never 0.
        sizer.update(RpcOutcome::Error);
        assert_eq!(sizer.page_size(), 1);
    }

    #[test]
    fn adaptive_page_sizer_resets_successful_calls_after_growth() {
        let grow_threshold = 3;
        let max_logs_per_rpc = 100;
        let mut sizer = AdaptivePageSizer::new(2, 10, grow_threshold, max_logs_per_rpc);

        sizer.update(RpcOutcome::Success {
            logs_downloaded: 50,
        });
        sizer.update(RpcOutcome::Success {
            logs_downloaded: 60,
        });
        sizer.update(RpcOutcome::Success {
            logs_downloaded: 70,
        }); // triggers growth

        assert_eq!(sizer.successful_rpc_calls, 0, "Should reset after growth");
    }

    #[test]
    fn adaptive_page_sizer_accumulates_successful_calls_until_threshold() {
        let grow_threshold = 3;
        let max_logs_per_rpc = 100;
        let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, max_logs_per_rpc);

        sizer.update(RpcOutcome::Success {
            logs_downloaded: 20,
        });
        sizer.update(RpcOutcome::Success {
            logs_downloaded: 25,
        });
        assert_eq!(sizer.page_size(), 4); // not yet grown

        sizer.update(RpcOutcome::Success {
            logs_downloaded: 30,
        }); // threshold reached
        assert_eq!(sizer.page_size(), 5);
    }

    #[test]
    fn adaptive_page_sizer_grows_by_one_if_growth_factor_stalls() {
        let grow_threshold = 50;
        // 2 * 125 / 100 == 2 (integer division), so the +1 fallback applies.
        let mut sizer = AdaptivePageSizer::new(2, 10, grow_threshold, 10_000);
        for _ in 0..grow_threshold {
            sizer.update(RpcOutcome::Success {
                logs_downloaded: 100,
            });
        }
        sizer.update(RpcOutcome::Success {
            logs_downloaded: 100,
        });
        assert_eq!(sizer.page_size(), 3, "Page size should grow by at least 1");
    }

    #[test]
    fn adaptive_page_sizer_shrinks_when_logs_exceed_max_allowed() {
        let grow_threshold = 50;
        let max_logs_per_rpc = 100;
        let mut sizer = AdaptivePageSizer::new(6, 10, grow_threshold, max_logs_per_rpc);

        // Simulate a successful RPC call that returns more logs than allowed
        sizer.update(RpcOutcome::Success {
            logs_downloaded: max_logs_per_rpc + 1,
        });

        // Expect the page size to shrink
        assert_eq!(
            sizer.page_size(),
            3,
            "Page size should shrink when log count exceeds max_logs_per_rpc"
        );
    }
}