1use crate::{
6 log::EthEventLog,
7 Config,
8};
9use anyhow::Result;
10use async_trait::async_trait;
11use core::time::Duration;
12use ethers_core::types::{
13 Filter,
14 Log,
15 SyncingStatus,
16 ValueOrArray,
17 H160,
18};
19use ethers_providers::{
20 Http,
21 Middleware,
22 Provider,
23 ProviderError,
24};
25use fuel_core_interfaces::{
26 db::Messages,
27 model::Message,
28 relayer::RelayerDb,
29};
30use std::{
31 convert::TryInto,
32 ops::Deref,
33 sync::{
34 atomic::AtomicBool,
35 Arc,
36 },
37};
38use synced::update_synced;
39use tokio::sync::watch;
40
41use self::{
42 get_logs::*,
43 run::RelayerData,
44 state::EthLocal,
45};
46
47mod get_logs;
48mod run;
49mod state;
50mod synced;
51mod syncing;
52
53#[cfg(test)]
54mod test;
55
56type Synced = watch::Receiver<bool>;
57type NotifySynced = watch::Sender<bool>;
58type Database = Box<dyn RelayerDb>;
59
60pub struct RelayerHandle {
62 synced: RelayerSynced,
64 shutdown: RelayerShutdown,
66}
67
68#[derive(Clone)]
70pub struct RelayerSynced {
71 synced: Synced,
72}
73
74struct Relayer<P>
77where
78 P: Middleware<Error = ProviderError>,
79{
80 synced: NotifySynced,
82 eth_node: Arc<P>,
84 database: Database,
86 config: Config,
88}
89
90struct RelayerShutdown {
93 join_handle: tokio::task::JoinHandle<()>,
94 shutdown: Arc<AtomicBool>,
95}
96
97impl<P> Relayer<P>
98where
99 P: Middleware<Error = ProviderError> + 'static,
100{
101 fn new(
103 synced: NotifySynced,
104 eth_node: P,
105 database: Database,
106 config: Config,
107 ) -> Self {
108 Self {
109 synced,
110 eth_node: Arc::new(eth_node),
111 database,
112 config,
113 }
114 }
115
116 async fn set_deploy_height(&self) {
117 if self.finalized().await.unwrap_or_default() < *self.config.da_deploy_height {
118 self.database
119 .set_finalized_da_height(self.config.da_deploy_height)
120 .await;
121 }
122 }
123}
124
125#[async_trait]
126impl<P> RelayerData for Relayer<P>
127where
128 P: Middleware<Error = ProviderError> + 'static,
129{
130 async fn download_logs(
131 &mut self,
132 eth_sync_gap: &state::EthSyncGap,
133 ) -> anyhow::Result<()> {
134 let logs = download_logs(
135 eth_sync_gap,
136 self.config.eth_v2_listening_contracts.clone(),
137 self.eth_node.clone(),
138 self.config.log_page_size,
139 );
140 write_logs(self.database.as_mut(), logs).await
141 }
142
143 async fn set_finalized_da_height(
144 &self,
145 height: fuel_core_interfaces::model::DaBlockHeight,
146 ) {
147 self.database.set_finalized_da_height(height).await
148 }
149
150 fn update_synced(&self, state: &state::EthState) {
151 update_synced(&self.synced, state)
152 }
153
154 async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
155 syncing::wait_if_eth_syncing(
156 &self.eth_node,
157 self.config.syncing_call_frequency,
158 self.config.syncing_log_frequency,
159 )
160 .await
161 }
162}
163
164impl RelayerHandle {
165 pub fn start(database: Database, config: Config) -> anyhow::Result<Self> {
167 let url = config.eth_client.clone().ok_or_else(|| {
168 anyhow::anyhow!(
169 "Tried to start Relayer without setting an eth_client in the config"
170 )
171 })?;
172 let http = Http::new(url);
174 let eth_node = Provider::new(http);
175 Ok(Self::start_inner::<Provider<Http>>(
176 eth_node, database, config,
177 ))
178 }
179
180 #[cfg(any(test, feature = "test-helpers"))]
181 pub fn start_test<P>(eth_node: P, database: Database, config: Config) -> Self
183 where
184 P: Middleware<Error = ProviderError> + 'static,
185 {
186 Self::start_inner(eth_node, database, config)
187 }
188
189 fn start_inner<P>(eth_node: P, database: Database, config: Config) -> Self
190 where
191 P: Middleware<Error = ProviderError> + 'static,
192 {
193 let (tx, rx) = watch::channel(false);
194 let synced = RelayerSynced { synced: rx };
195 let shutdown = run(Relayer::new(tx, eth_node, database, config));
196 Self { synced, shutdown }
197 }
198
199 pub fn listen_synced(&self) -> RelayerSynced {
201 self.synced.clone()
202 }
203
204 pub fn is_running(&self) -> bool {
206 !self.shutdown.join_handle.is_finished()
207 }
208
209 pub async fn shutdown(self) -> anyhow::Result<()> {
211 self.shutdown
212 .shutdown
213 .store(true, core::sync::atomic::Ordering::Relaxed);
214 Ok(self.shutdown.join_handle.await?)
215 }
216}
217
218impl RelayerSynced {
219 pub async fn await_synced(&self) -> Result<()> {
231 let mut rx = self.synced.clone();
232 if !rx.borrow_and_update().deref() {
233 rx.changed().await?;
234 }
235 Ok(())
236 }
237}
238
239#[async_trait]
240impl<P> state::EthRemote for Relayer<P>
241where
242 P: Middleware<Error = ProviderError>,
243{
244 async fn current(&self) -> anyhow::Result<u64> {
245 Ok(self.eth_node.get_block_number().await?.as_u64())
246 }
247
248 fn finalization_period(&self) -> u64 {
249 *self.config.da_finalization
250 }
251}
252
253#[async_trait]
254impl<P> state::EthLocal for Relayer<P>
255where
256 P: Middleware<Error = ProviderError>,
257{
258 async fn finalized(&self) -> Option<u64> {
259 self.database.get_finalized_da_height().await.map(|h| *h)
260 }
261}
262
263fn run<P>(mut relayer: Relayer<P>) -> RelayerShutdown
265where
266 P: Middleware<Error = ProviderError> + 'static,
267{
268 let shutdown = Arc::new(AtomicBool::new(false));
269 let join_handle = tokio::task::spawn({
270 let shutdown = shutdown.clone();
271 async move {
272 relayer.set_deploy_height().await;
274 while !shutdown.load(core::sync::atomic::Ordering::Relaxed) {
275 let now = tokio::time::Instant::now();
276
277 if let Err(e) = run::run(&mut relayer).await {
278 let e: &dyn std::error::Error = &*e;
279 tracing::error!(e);
280 }
281
282 tokio::time::sleep(
284 relayer
285 .config
286 .sync_minimum_duration
287 .saturating_sub(now.elapsed()),
288 )
289 .await;
290 }
291 }
292 });
293 RelayerShutdown {
294 join_handle,
295 shutdown,
296 }
297}