1use crate::{
4 Config,
5 log::EthEventLog,
6 ports::RelayerDb,
7 service::state::EthLocal,
8};
9use async_trait::async_trait;
10use core::time::Duration;
11use ethers_core::types::{
12 Filter,
13 Log,
14 SyncingStatus,
15 ValueOrArray,
16};
17use ethers_providers::{
18 Http,
19 Middleware,
20 Provider,
21 ProviderError,
22 Quorum,
23 QuorumProvider,
24 WeightedProvider,
25};
26use fuel_core_services::{
27 RunnableService,
28 RunnableTask,
29 ServiceRunner,
30 StateWatcher,
31 TaskNextAction,
32};
33use fuel_core_types::{
34 blockchain::primitives::DaBlockHeight,
35 entities::Message,
36};
37use futures::StreamExt;
38use std::convert::TryInto;
39use tokio::sync::watch;
40
41use self::{
42 get_logs::*,
43 run::RelayerData,
44};
45
46mod get_logs;
47mod run;
48mod state;
49mod syncing;
50
51#[cfg(test)]
52mod test;
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum SyncState {
56 PartiallySynced(DaBlockHeight),
59 Synced(DaBlockHeight),
61}
62
63impl SyncState {
64 pub fn da_block_height(&self) -> DaBlockHeight {
66 match self {
67 Self::PartiallySynced(height) | Self::Synced(height) => *height,
68 }
69 }
70
71 pub fn is_synced(&self) -> bool {
73 matches!(self, Self::Synced(_))
74 }
75}
76
77type Synced = watch::Receiver<SyncState>;
78type NotifySynced = watch::Sender<SyncState>;
79
80pub type Service<D> = CustomizableService<Provider<QuorumProvider<Http>>, D>;
82type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;
83
84#[derive(Clone)]
86pub struct SharedState {
87 synced: Synced,
89}
90
91pub struct NotInitializedTask<P, D> {
93 synced: NotifySynced,
95 eth_node: P,
97 database: D,
99 config: Config,
101 retry_on_error: bool,
103}
104
105pub enum RpcOutcome {
106 Success { logs_downloaded: u64 },
107 Error,
108}
109
110pub trait PageSizer {
113 fn update(&mut self, outcome: RpcOutcome);
119
120 fn page_size(&self) -> u64;
122}
123
124pub struct AdaptivePageSizer {
125 current: u64,
126 max: u64,
127 successful_rpc_calls: u64,
128 grow_threshold: u64,
129 max_logs_per_rpc: u64,
130}
131
132impl AdaptivePageSizer {
133 fn new(current: u64, max: u64, grow_threshold: u64, max_logs_per_rpc: u64) -> Self {
134 Self {
135 current,
136 max,
137 grow_threshold,
138 max_logs_per_rpc,
139 successful_rpc_calls: 0,
140 }
141 }
142}
143
144impl PageSizer for AdaptivePageSizer {
145 fn update(&mut self, outcome: RpcOutcome) {
146 const PAGE_GROW_FACTOR_NUM: u64 = 125;
147 const PAGE_GROW_FACTOR_DEN: u64 = 100;
148 const PAGE_SHRINK_FACTOR: u64 = 2;
149
150 match outcome {
151 RpcOutcome::Error => {
152 self.successful_rpc_calls = 0;
153 self.current = (self.current / PAGE_SHRINK_FACTOR).max(1);
154 }
155 RpcOutcome::Success { logs_downloaded }
156 if logs_downloaded > self.max_logs_per_rpc =>
157 {
158 self.successful_rpc_calls = 0;
159 self.current = (self.current / PAGE_SHRINK_FACTOR).max(1);
160 }
161 _ => {
162 self.successful_rpc_calls = self.successful_rpc_calls.saturating_add(1);
163 if self.successful_rpc_calls >= self.grow_threshold
164 && self.current < self.max
165 {
166 let grown = self.current.saturating_mul(PAGE_GROW_FACTOR_NUM)
167 / PAGE_GROW_FACTOR_DEN;
168 self.current = if grown > self.current {
169 grown.min(self.max)
170 } else {
171 (self.current.saturating_add(1)).min(self.max)
172 };
173 self.successful_rpc_calls = 0;
174 }
175 }
176 }
177 }
178
179 fn page_size(&self) -> u64 {
180 self.current
181 }
182}
183
184pub struct Task<P, D, S> {
186 synced: NotifySynced,
188 eth_node: P,
190 database: D,
192 config: Config,
194 shutdown: StateWatcher,
197 retry_on_error: bool,
199 page_sizer: S,
202}
203
204impl<P, D> NotInitializedTask<P, D>
205where
206 D: RelayerDb + 'static,
207{
208 fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
210 let da_block_height = database.get_finalized_da_height().unwrap_or_else(|| {
211 let height_before_deployed = config.da_deploy_height.0.saturating_sub(1);
212 height_before_deployed.into()
213 });
214
215 let (synced, _) = watch::channel(SyncState::PartiallySynced(da_block_height));
216
217 Self {
218 synced,
219 eth_node,
220 database,
221 config,
222 retry_on_error,
223 }
224 }
225}
226
227impl<P, D, S> RelayerData for Task<P, D, S>
228where
229 P: Middleware<Error = ProviderError> + 'static,
230 D: RelayerDb + 'static,
231 S: PageSizer + 'static + Send + Sync,
232{
233 async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
234 let mut shutdown = self.shutdown.clone();
235 tokio::select! {
236 biased;
237 _ = shutdown.while_started() => {
238 Err(anyhow::anyhow!("The relayer got a stop signal"))
239 },
240 result = syncing::wait_if_eth_syncing(
241 &self.eth_node,
242 self.config.syncing_call_frequency,
243 self.config.syncing_log_frequency,
244 ) => {
245 result
246 }
247 }
248 }
249
250 async fn download_logs(
251 &mut self,
252 eth_sync_gap: &state::EthSyncGap,
253 ) -> anyhow::Result<()> {
254 let logs = download_logs(
255 eth_sync_gap,
256 self.config.eth_v2_listening_contracts.clone(),
257 &self.eth_node,
258 &mut self.page_sizer,
259 );
260
261 let logs = logs.take_until(self.shutdown.while_started());
262 write_logs(&mut self.database, logs).await
263 }
264
265 fn update_synced(&self, state: &state::EthState) {
266 self.synced.send_if_modified(|last_state| {
267 let new_sync = state.sync_state();
268 if new_sync != *last_state {
269 *last_state = new_sync;
270 true
271 } else {
272 false
273 }
274 });
275 }
276
277 fn storage_da_block_height(&self) -> Option<u64> {
278 self.database
279 .get_finalized_da_height()
280 .map(|height| height.into())
281 }
282}
283
284#[async_trait]
285impl<P, D> RunnableService for NotInitializedTask<P, D>
286where
287 P: Middleware<Error = ProviderError> + 'static,
288 D: RelayerDb + 'static,
289{
290 const NAME: &'static str = "Relayer";
291
292 type SharedData = SharedState;
293 type Task = Task<P, D, AdaptivePageSizer>;
294 type TaskParams = ();
295
296 fn shared_data(&self) -> Self::SharedData {
297 let synced = self.synced.subscribe();
298
299 SharedState { synced }
300 }
301
302 async fn into_task(
303 mut self,
304 watcher: &StateWatcher,
305 _: Self::TaskParams,
306 ) -> anyhow::Result<Self::Task> {
307 let shutdown = watcher.clone();
308 let NotInitializedTask {
309 synced,
310 eth_node,
311 database,
312 config,
313 retry_on_error,
314 } = self;
315 let page_sizer = AdaptivePageSizer::new(
316 config.log_page_size,
317 config.log_page_size,
318 50,
319 config.max_logs_per_rpc,
320 );
321 let task = Task {
322 synced,
323 eth_node,
324 database,
325 shutdown,
326 retry_on_error,
327 page_sizer,
328 config,
329 };
330
331 Ok(task)
332 }
333}
334
335impl<P, D, S> RunnableTask for Task<P, D, S>
336where
337 P: Middleware<Error = ProviderError> + 'static,
338 D: RelayerDb + 'static,
339 S: PageSizer + 'static + Send + Sync,
340{
341 async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
342 let now = tokio::time::Instant::now();
343
344 let result = run::run(self).await;
345
346 if self.shutdown.borrow_and_update().started()
347 && (result.is_err() | self.synced.borrow().is_synced())
348 {
349 tokio::time::sleep(
351 self.config
352 .sync_minimum_duration
353 .saturating_sub(now.elapsed()),
354 )
355 .await;
356 }
357
358 match result {
359 Err(err) => {
360 if !self.retry_on_error {
361 tracing::error!("Exiting due to Error in relayer task: {}", err);
362 TaskNextAction::Stop
363 } else {
364 TaskNextAction::ErrorContinue(err)
365 }
366 }
367 _ => TaskNextAction::Continue,
368 }
369 }
370
371 async fn shutdown(self) -> anyhow::Result<()> {
372 Ok(())
375 }
376}
377
378impl SharedState {
379 pub async fn await_synced(&self) -> anyhow::Result<()> {
391 let mut rx = self.synced.clone();
392 loop {
393 if rx.borrow_and_update().is_synced() {
394 break;
395 }
396
397 rx.changed().await?;
398 }
399
400 Ok(())
401 }
402
403 pub async fn await_at_least_synced(
405 &self,
406 height: &DaBlockHeight,
407 ) -> anyhow::Result<()> {
408 let mut rx = self.synced.clone();
409 loop {
410 if rx.borrow_and_update().da_block_height() >= *height {
411 break;
412 }
413
414 rx.changed().await?;
415 }
416 Ok(())
417 }
418
419 pub fn get_finalized_da_height(&self) -> DaBlockHeight {
422 self.synced.borrow().da_block_height()
423 }
424}
425
426impl<P, D, S> state::EthRemote for Task<P, D, S>
427where
428 P: Middleware<Error = ProviderError>,
429 D: RelayerDb + 'static,
430 S: PageSizer + 'static + Send + Sync,
431{
432 async fn finalized(&self) -> anyhow::Result<u64> {
433 let mut shutdown = self.shutdown.clone();
434 tokio::select! {
435 biased;
436 _ = shutdown.while_started() => {
437 Err(anyhow::anyhow!("The relayer got a stop signal"))
438 },
439 block = self.eth_node.get_block(ethers_core::types::BlockNumber::Finalized) => {
440 let block_number = block.map_err(|err| anyhow::anyhow!("failed to get block from Eth node: {err:?}"))?
441 .and_then(|block| block.number)
442 .ok_or(anyhow::anyhow!("Block pending"))?
443 .as_u64();
444 Ok(block_number)
445 }
446 }
447 }
448}
449
450impl<P, D, S> EthLocal for Task<P, D, S>
451where
452 P: Middleware<Error = ProviderError>,
453 D: RelayerDb + 'static,
454 S: PageSizer + 'static + Send + Sync,
455{
456 fn observed(&self) -> u64 {
457 self.synced.borrow().da_block_height().into()
458 }
459}
460
461pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
463where
464 D: RelayerDb + 'static,
465{
466 let urls = config
467 .relayer
468 .clone()
469 .ok_or_else(|| {
470 anyhow::anyhow!(
471 "Tried to start Relayer without setting an eth_client in the config"
472 )
473 })?
474 .into_iter()
475 .map(|url| WeightedProvider::new(Http::new(url)));
476
477 let eth_node = Provider::new(QuorumProvider::new(Quorum::Majority, urls));
478 let retry_on_error = true;
479 Ok(new_service_internal(
480 eth_node,
481 database,
482 config,
483 retry_on_error,
484 ))
485}
486
487#[cfg(any(test, feature = "test-helpers"))]
488pub fn new_service_test<P, D>(
490 eth_node: P,
491 database: D,
492 config: Config,
493) -> CustomizableService<P, D>
494where
495 P: Middleware<Error = ProviderError> + 'static,
496 D: RelayerDb + 'static,
497{
498 let retry_on_fail = false;
499 new_service_internal(eth_node, database, config, retry_on_fail)
500}
501
502fn new_service_internal<P, D>(
503 eth_node: P,
504 database: D,
505 config: Config,
506 retry_on_error: bool,
507) -> CustomizableService<P, D>
508where
509 P: Middleware<Error = ProviderError> + 'static,
510 D: RelayerDb + 'static,
511{
512 let task = NotInitializedTask::new(eth_node, database, config, retry_on_error);
513
514 CustomizableService::new(task)
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520
521 #[test]
522 fn adaptive_page_sizer_grows_when_threshold_exceeded() {
523 let grow_threshold = 50;
524 let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, 10_000);
525 for _ in 0..grow_threshold {
526 sizer.update(RpcOutcome::Success {
527 logs_downloaded: 100,
528 });
529 }
530 sizer.update(RpcOutcome::Success {
531 logs_downloaded: 100,
532 });
533 assert_eq!(sizer.page_size(), 5);
534 }
535
536 #[test]
537 fn adaptive_page_sizer_does_not_grow_if_below_threshold() {
538 let grow_threshold = 50;
539 let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, 10_000);
540 for _ in 0..grow_threshold - 10 {
541 sizer.update(RpcOutcome::Success {
542 logs_downloaded: 100,
543 });
544 }
545 assert_eq!(sizer.page_size(), 4);
546 }
547
548 #[test]
549 fn adaptive_page_sizer_does_not_grow_if_at_max() {
550 let grow_threshold = 50;
551 let mut sizer = AdaptivePageSizer::new(10, 10, grow_threshold, 10_000);
552 for _ in 0..grow_threshold + 1 {
553 sizer.update(RpcOutcome::Success {
554 logs_downloaded: 100,
555 });
556 }
557 assert_eq!(sizer.page_size(), 10);
558 }
559
560 #[test]
561 fn adaptive_page_sizer_shrinks_on_rpc_error() {
562 let grow_threshold = 50;
563 let mut sizer = AdaptivePageSizer::new(6, 10, grow_threshold, 10_000);
564 sizer.update(RpcOutcome::Error);
565 assert_eq!(sizer.page_size(), 3);
566 }
567
568 #[test]
569 fn adaptive_page_sizer_shrinks_on_excessive_logs() {
570 let mut sizer = AdaptivePageSizer::new(6, 10, 50, 100);
571 sizer.update(RpcOutcome::Success {
572 logs_downloaded: 101,
573 });
574 assert_eq!(sizer.page_size(), 3);
575 }
576
577 #[test]
578 fn adaptive_page_sizer_never_goes_below_one() {
579 let mut sizer = AdaptivePageSizer::new(1, 10, 50, 10_000);
580 sizer.update(RpcOutcome::Error);
581 assert_eq!(sizer.page_size(), 1);
582 }
583
584 #[test]
585 fn adaptive_page_sizer_resets_successful_calls_after_growth() {
586 let grow_threshold = 3;
587 let max_logs_per_rpc = 100;
588 let mut sizer = AdaptivePageSizer::new(2, 10, grow_threshold, max_logs_per_rpc);
589
590 sizer.update(RpcOutcome::Success {
591 logs_downloaded: 50,
592 });
593 sizer.update(RpcOutcome::Success {
594 logs_downloaded: 60,
595 });
596 sizer.update(RpcOutcome::Success {
597 logs_downloaded: 70,
598 }); assert_eq!(sizer.successful_rpc_calls, 0, "Should reset after growth");
601 }
602
603 #[test]
604 fn adaptive_page_sizer_accumulates_successful_calls_until_threshold() {
605 let grow_threshold = 3;
606 let max_logs_per_rpc = 100;
607 let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, max_logs_per_rpc);
608
609 sizer.update(RpcOutcome::Success {
610 logs_downloaded: 20,
611 });
612 sizer.update(RpcOutcome::Success {
613 logs_downloaded: 25,
614 });
615 assert_eq!(sizer.page_size(), 4); sizer.update(RpcOutcome::Success {
618 logs_downloaded: 30,
619 }); assert_eq!(sizer.page_size(), 5);
621 }
622
623 #[test]
624 fn adaptive_page_sizer_grows_by_one_if_growth_factor_stalls() {
625 let grow_threshold = 50;
626 let mut sizer = AdaptivePageSizer::new(2, 10, grow_threshold, 10_000);
627 for _ in 0..grow_threshold {
628 sizer.update(RpcOutcome::Success {
629 logs_downloaded: 100,
630 });
631 }
632 sizer.update(RpcOutcome::Success {
633 logs_downloaded: 100,
634 });
635 assert_eq!(sizer.page_size(), 3, "Page size should grow by at least 1");
636 }
637
638 #[test]
639 fn adaptive_page_sizer_shrinks_when_logs_exceed_max_allowed() {
640 let grow_threshold = 50;
641 let max_logs_per_rpc = 100;
642 let mut sizer = AdaptivePageSizer::new(6, 10, grow_threshold, max_logs_per_rpc);
643
644 sizer.update(RpcOutcome::Success {
646 logs_downloaded: max_logs_per_rpc + 1,
647 });
648
649 assert_eq!(
651 sizer.page_size(),
652 3,
653 "Page size should shrink when log count exceeds max_logs_per_rpc"
654 );
655 }
656}