ethers_middleware/timelag/
mod.rs1use async_trait::async_trait;
2use ethers_core::types::{
3 transaction::eip2718::TypedTransaction, Block, BlockId, BlockNumber, Bytes, FilterBlockOption,
4 NameOrAddress, Transaction, TransactionReceipt, TxHash, U256,
5};
6use std::sync::Arc;
7use thiserror::Error;
8
9use ethers_providers::{Middleware, MiddlewareError};
10
11type TimeLagResult<T, M> = Result<T, TimeLagError<M>>;
12
13#[derive(Error, Debug)]
15pub enum TimeLagError<M>
16where
17 M: Middleware,
18{
19 #[error("{0}")]
20 MiddlewareError(M::Error),
22
23 #[error("Unsupported RPC. Timelag provider does not support filters or subscriptions.")]
24 Unsupported,
25}
26
27impl<M: Middleware> MiddlewareError for TimeLagError<M> {
29 type Inner = M::Error;
30
31 fn from_err(src: M::Error) -> Self {
32 TimeLagError::MiddlewareError(src)
33 }
34
35 fn as_inner(&self) -> Option<&Self::Inner> {
36 match self {
37 TimeLagError::MiddlewareError(e) => Some(e),
38 _ => None,
39 }
40 }
41}
42#[derive(Debug)]
44pub struct TimeLag<M> {
45 inner: Arc<M>,
46 lag: u8,
47}
48
49impl<M> TimeLag<M>
50where
51 M: Middleware,
52{
53 pub fn new(inner: M, lag: u8) -> Self {
55 Self { inner: inner.into(), lag }
56 }
57}
58
59impl<M> TimeLag<M>
60where
61 M: Middleware,
62{
63 async fn normalize_block_id(&self, id: Option<BlockId>) -> TimeLagResult<Option<BlockId>, M> {
64 match id {
65 Some(BlockId::Number(n)) => {
66 Ok(self.normalize_block_number(Some(n)).await?.map(Into::into))
67 }
68 None => Ok(self.normalize_block_number(None).await?.map(Into::into)),
69 _ => Ok(id),
70 }
71 }
72
73 async fn normalize_block_number(
74 &self,
75 number: Option<BlockNumber>,
76 ) -> TimeLagResult<Option<BlockNumber>, M> {
77 let lag_tip = self.get_block_number().await?;
78 match number {
79 Some(BlockNumber::Latest) => Ok(Some(BlockNumber::Number(lag_tip))),
80 Some(BlockNumber::Number(n)) => {
81 if n < lag_tip {
82 Ok(Some(BlockNumber::Number(n)))
83 } else {
84 Ok(Some(BlockNumber::Number(lag_tip)))
85 }
86 }
87 None => Ok(Some(BlockNumber::Number(lag_tip))),
88 _ => Ok(number),
89 }
90 }
91
92 async fn normalize_filter_range(
93 &self,
94 block_option: FilterBlockOption,
95 ) -> TimeLagResult<FilterBlockOption, M> {
96 match block_option {
97 FilterBlockOption::Range { from_block: _, to_block: None } => {
98 Ok(block_option.set_to_block(self.get_block_number().await?.into()))
99 }
100 _ => Ok(block_option),
101 }
102 }
103}
104
105#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
106#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
107impl<M> Middleware for TimeLag<M>
108where
109 M: Middleware,
110{
111 type Error = TimeLagError<M>;
112
113 type Provider = M::Provider;
114
115 type Inner = M;
116
117 fn inner(&self) -> &Self::Inner {
118 &self.inner
119 }
120
121 async fn get_block_number(&self) -> Result<ethers_core::types::U64, Self::Error> {
122 self.inner()
123 .get_block_number()
124 .await
125 .map(|num| num - self.lag)
126 .map_err(ethers_providers::MiddlewareError::from_err)
127 }
128
129 async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
130 &self,
131 tx: T,
132 block: Option<BlockId>,
133 ) -> Result<ethers_providers::PendingTransaction<'_, Self::Provider>, Self::Error> {
134 self.inner()
135 .send_transaction(tx, block)
136 .await
137 .map_err(ethers_providers::MiddlewareError::from_err)
138 }
139
140 async fn get_block<T: Into<BlockId> + Send + Sync>(
141 &self,
142 block_hash_or_number: T,
143 ) -> Result<Option<Block<TxHash>>, Self::Error> {
144 let block_hash_or_number = self
145 .normalize_block_id(Some(block_hash_or_number.into()))
146 .await?
147 .expect("Cannot return None if Some is passed in");
148
149 self.inner()
150 .get_block(block_hash_or_number)
151 .await
152 .map_err(ethers_providers::MiddlewareError::from_err)
153 }
154
155 async fn get_block_with_txs<T: Into<BlockId> + Send + Sync>(
156 &self,
157 block_hash_or_number: T,
158 ) -> Result<Option<Block<Transaction>>, Self::Error> {
159 let block_hash_or_number = self
160 .normalize_block_id(Some(block_hash_or_number.into()))
161 .await?
162 .expect("Cannot return None if Some is passed in");
163
164 self.inner()
165 .get_block_with_txs(block_hash_or_number)
166 .await
167 .map_err(ethers_providers::MiddlewareError::from_err)
168 }
169
170 async fn get_uncle_count<T: Into<BlockId> + Send + Sync>(
171 &self,
172 block_hash_or_number: T,
173 ) -> Result<U256, Self::Error> {
174 let block_hash_or_number = self
175 .normalize_block_id(Some(block_hash_or_number.into()))
176 .await?
177 .expect("Cannot return None if Some is passed in");
178
179 self.inner()
180 .get_uncle_count(block_hash_or_number)
181 .await
182 .map_err(ethers_providers::MiddlewareError::from_err)
183 }
184
185 async fn get_uncle<T: Into<BlockId> + Send + Sync>(
186 &self,
187 block_hash_or_number: T,
188 idx: ethers_core::types::U64,
189 ) -> Result<Option<Block<TxHash>>, Self::Error> {
190 let block_hash_or_number = self
191 .normalize_block_id(Some(block_hash_or_number.into()))
192 .await?
193 .expect("Cannot return None if Some is passed in");
194
195 self.inner()
196 .get_uncle(block_hash_or_number, idx)
197 .await
198 .map_err(ethers_providers::MiddlewareError::from_err)
199 }
200
201 async fn get_transaction_count<T: Into<NameOrAddress> + Send + Sync>(
202 &self,
203 from: T,
204 block: Option<BlockId>,
205 ) -> Result<U256, Self::Error> {
206 let block = self.normalize_block_id(block).await?;
207
208 self.inner()
209 .get_transaction_count(from, block)
210 .await
211 .map_err(ethers_providers::MiddlewareError::from_err)
212 }
213
214 async fn call(
215 &self,
216 tx: &TypedTransaction,
217 block: Option<BlockId>,
218 ) -> Result<Bytes, Self::Error> {
219 let block = self.normalize_block_id(block).await?;
220
221 self.inner().call(tx, block).await.map_err(ethers_providers::MiddlewareError::from_err)
222 }
223
224 async fn get_balance<T: Into<NameOrAddress> + Send + Sync>(
225 &self,
226 from: T,
227 block: Option<BlockId>,
228 ) -> Result<U256, Self::Error> {
229 let block = self.normalize_block_id(block).await?;
230 self.inner()
231 .get_balance(from, block)
232 .await
233 .map_err(ethers_providers::MiddlewareError::from_err)
234 }
235
236 async fn get_transaction_receipt<T: Send + Sync + Into<TxHash>>(
237 &self,
238 transaction_hash: T,
239 ) -> Result<Option<TransactionReceipt>, Self::Error> {
240 let receipt = self
241 .inner()
242 .get_transaction_receipt(transaction_hash)
243 .await
244 .map_err(ethers_providers::MiddlewareError::from_err)?;
245
246 if receipt.is_none() {
247 return Ok(None)
248 }
249
250 let receipt = receipt.expect("checked is_none");
251 if receipt.block_number.is_none() {
252 return Ok(Some(receipt))
253 }
254
255 let number = receipt.block_number.expect("checked is_none");
256 if number <= self.get_block_number().await? {
257 Ok(Some(receipt))
258 } else {
259 Ok(None)
261 }
262 }
263
264 async fn get_code<T: Into<NameOrAddress> + Send + Sync>(
265 &self,
266 at: T,
267 block: Option<BlockId>,
268 ) -> Result<Bytes, Self::Error> {
269 let block = self.normalize_block_id(block).await?;
270
271 self.inner().get_code(at, block).await.map_err(ethers_providers::MiddlewareError::from_err)
272 }
273
274 async fn get_storage_at<T: Into<NameOrAddress> + Send + Sync>(
275 &self,
276 from: T,
277 location: TxHash,
278 block: Option<BlockId>,
279 ) -> Result<TxHash, Self::Error> {
280 let block = self.normalize_block_id(block).await?;
281 self.inner()
282 .get_storage_at(from, location, block)
283 .await
284 .map_err(ethers_providers::MiddlewareError::from_err)
285 }
286
287 async fn fill_transaction(
288 &self,
289 tx: &mut TypedTransaction,
290 block: Option<BlockId>,
291 ) -> Result<(), Self::Error> {
292 self.inner()
293 .fill_transaction(tx, block)
294 .await
295 .map_err(ethers_providers::MiddlewareError::from_err)
296 }
297
298 async fn get_block_receipts<T: Into<BlockNumber> + Send + Sync>(
299 &self,
300 block: T,
301 ) -> Result<Vec<TransactionReceipt>, Self::Error> {
302 let block: BlockNumber = block.into();
303 let block = self
304 .normalize_block_number(Some(block))
305 .await?
306 .expect("Cannot return None if Some is passed in");
307
308 self.inner()
309 .get_block_receipts(block)
310 .await
311 .map_err(ethers_providers::MiddlewareError::from_err)
312 }
313
314 async fn get_logs(
315 &self,
316 filter: ðers_core::types::Filter,
317 ) -> Result<Vec<ethers_core::types::Log>, Self::Error> {
318 let mut filter = filter.clone();
319 filter.block_option = self.normalize_filter_range(filter.block_option).await?;
320
321 self.inner().get_logs(&filter).await.map_err(ethers_providers::MiddlewareError::from_err)
322 }
323
324 async fn new_filter(
325 &self,
326 _filter: ethers_providers::FilterKind<'_>,
327 ) -> Result<U256, Self::Error> {
328 Err(TimeLagError::Unsupported)
329 }
330
331 async fn get_filter_changes<T, R>(&self, _id: T) -> Result<Vec<R>, Self::Error>
332 where
333 T: Into<U256> + Send + Sync,
334 R: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + std::fmt::Debug,
335 {
336 Err(TimeLagError::Unsupported)
337 }
338
339 async fn watch_blocks(
340 &self,
341 ) -> Result<ethers_providers::FilterWatcher<'_, Self::Provider, TxHash>, Self::Error> {
342 Err(TimeLagError::Unsupported)
343 }
344
345 async fn subscribe<T, R>(
346 &self,
347 _params: T,
348 ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, R>, Self::Error>
349 where
350 T: std::fmt::Debug + serde::Serialize + Send + Sync,
351 R: serde::de::DeserializeOwned + Send + Sync,
352 Self::Provider: ethers_providers::PubsubClient,
353 {
354 Err(TimeLagError::Unsupported)
355 }
356
357 async fn unsubscribe<T>(&self, _id: T) -> Result<bool, Self::Error>
358 where
359 T: Into<U256> + Send + Sync,
360 Self::Provider: ethers_providers::PubsubClient,
361 {
362 Err(TimeLagError::Unsupported)
363 }
364
365 async fn subscribe_blocks(
366 &self,
367 ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, Block<TxHash>>, Self::Error>
368 where
369 Self::Provider: ethers_providers::PubsubClient,
370 {
371 Err(TimeLagError::Unsupported)
372 }
373
374 async fn subscribe_pending_txs(
375 &self,
376 ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, TxHash>, Self::Error>
377 where
378 Self::Provider: ethers_providers::PubsubClient,
379 {
380 Err(TimeLagError::Unsupported)
381 }
382
383 async fn subscribe_full_pending_txs(
384 &self,
385 ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, Transaction>, Self::Error>
386 where
387 Self::Provider: ethers_providers::PubsubClient,
388 {
389 Err(TimeLagError::Unsupported)
390 }
391
392 async fn subscribe_logs<'a>(
393 &'a self,
394 _filter: ðers_core::types::Filter,
395 ) -> Result<
396 ethers_providers::SubscriptionStream<'a, Self::Provider, ethers_core::types::Log>,
397 Self::Error,
398 >
399 where
400 Self::Provider: ethers_providers::PubsubClient,
401 {
402 Err(TimeLagError::Unsupported)
403 }
404}