ethers_middleware/timelag/
mod.rs

1use async_trait::async_trait;
2use ethers_core::types::{
3    transaction::eip2718::TypedTransaction, Block, BlockId, BlockNumber, Bytes, FilterBlockOption,
4    NameOrAddress, Transaction, TransactionReceipt, TxHash, U256,
5};
6use std::sync::Arc;
7use thiserror::Error;
8
9use ethers_providers::{Middleware, MiddlewareError};
10
11type TimeLagResult<T, M> = Result<T, TimeLagError<M>>;
12
13/// TimeLage Provider Errors
14#[derive(Error, Debug)]
15pub enum TimeLagError<M>
16where
17    M: Middleware,
18{
19    #[error("{0}")]
20    /// Thrown when an internal middleware errors
21    MiddlewareError(M::Error),
22
23    #[error("Unsupported RPC. Timelag provider does not support filters or subscriptions.")]
24    Unsupported,
25}
26
27// Boilerplate
28impl<M: Middleware> MiddlewareError for TimeLagError<M> {
29    type Inner = M::Error;
30
31    fn from_err(src: M::Error) -> Self {
32        TimeLagError::MiddlewareError(src)
33    }
34
35    fn as_inner(&self) -> Option<&Self::Inner> {
36        match self {
37            TimeLagError::MiddlewareError(e) => Some(e),
38            _ => None,
39        }
40    }
41}
42/// TimeLag Provider
43#[derive(Debug)]
44pub struct TimeLag<M> {
45    inner: Arc<M>,
46    lag: u8,
47}
48
49impl<M> TimeLag<M>
50where
51    M: Middleware,
52{
53    /// Instantiates TimeLag provider
54    pub fn new(inner: M, lag: u8) -> Self {
55        Self { inner: inner.into(), lag }
56    }
57}
58
59impl<M> TimeLag<M>
60where
61    M: Middleware,
62{
63    async fn normalize_block_id(&self, id: Option<BlockId>) -> TimeLagResult<Option<BlockId>, M> {
64        match id {
65            Some(BlockId::Number(n)) => {
66                Ok(self.normalize_block_number(Some(n)).await?.map(Into::into))
67            }
68            None => Ok(self.normalize_block_number(None).await?.map(Into::into)),
69            _ => Ok(id),
70        }
71    }
72
73    async fn normalize_block_number(
74        &self,
75        number: Option<BlockNumber>,
76    ) -> TimeLagResult<Option<BlockNumber>, M> {
77        let lag_tip = self.get_block_number().await?;
78        match number {
79            Some(BlockNumber::Latest) => Ok(Some(BlockNumber::Number(lag_tip))),
80            Some(BlockNumber::Number(n)) => {
81                if n < lag_tip {
82                    Ok(Some(BlockNumber::Number(n)))
83                } else {
84                    Ok(Some(BlockNumber::Number(lag_tip)))
85                }
86            }
87            None => Ok(Some(BlockNumber::Number(lag_tip))),
88            _ => Ok(number),
89        }
90    }
91
92    async fn normalize_filter_range(
93        &self,
94        block_option: FilterBlockOption,
95    ) -> TimeLagResult<FilterBlockOption, M> {
96        match block_option {
97            FilterBlockOption::Range { from_block: _, to_block: None } => {
98                Ok(block_option.set_to_block(self.get_block_number().await?.into()))
99            }
100            _ => Ok(block_option),
101        }
102    }
103}
104
105#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
106#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
107impl<M> Middleware for TimeLag<M>
108where
109    M: Middleware,
110{
111    type Error = TimeLagError<M>;
112
113    type Provider = M::Provider;
114
115    type Inner = M;
116
117    fn inner(&self) -> &Self::Inner {
118        &self.inner
119    }
120
121    async fn get_block_number(&self) -> Result<ethers_core::types::U64, Self::Error> {
122        self.inner()
123            .get_block_number()
124            .await
125            .map(|num| num - self.lag)
126            .map_err(ethers_providers::MiddlewareError::from_err)
127    }
128
129    async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
130        &self,
131        tx: T,
132        block: Option<BlockId>,
133    ) -> Result<ethers_providers::PendingTransaction<'_, Self::Provider>, Self::Error> {
134        self.inner()
135            .send_transaction(tx, block)
136            .await
137            .map_err(ethers_providers::MiddlewareError::from_err)
138    }
139
140    async fn get_block<T: Into<BlockId> + Send + Sync>(
141        &self,
142        block_hash_or_number: T,
143    ) -> Result<Option<Block<TxHash>>, Self::Error> {
144        let block_hash_or_number = self
145            .normalize_block_id(Some(block_hash_or_number.into()))
146            .await?
147            .expect("Cannot return None if Some is passed in");
148
149        self.inner()
150            .get_block(block_hash_or_number)
151            .await
152            .map_err(ethers_providers::MiddlewareError::from_err)
153    }
154
155    async fn get_block_with_txs<T: Into<BlockId> + Send + Sync>(
156        &self,
157        block_hash_or_number: T,
158    ) -> Result<Option<Block<Transaction>>, Self::Error> {
159        let block_hash_or_number = self
160            .normalize_block_id(Some(block_hash_or_number.into()))
161            .await?
162            .expect("Cannot return None if Some is passed in");
163
164        self.inner()
165            .get_block_with_txs(block_hash_or_number)
166            .await
167            .map_err(ethers_providers::MiddlewareError::from_err)
168    }
169
170    async fn get_uncle_count<T: Into<BlockId> + Send + Sync>(
171        &self,
172        block_hash_or_number: T,
173    ) -> Result<U256, Self::Error> {
174        let block_hash_or_number = self
175            .normalize_block_id(Some(block_hash_or_number.into()))
176            .await?
177            .expect("Cannot return None if Some is passed in");
178
179        self.inner()
180            .get_uncle_count(block_hash_or_number)
181            .await
182            .map_err(ethers_providers::MiddlewareError::from_err)
183    }
184
185    async fn get_uncle<T: Into<BlockId> + Send + Sync>(
186        &self,
187        block_hash_or_number: T,
188        idx: ethers_core::types::U64,
189    ) -> Result<Option<Block<TxHash>>, Self::Error> {
190        let block_hash_or_number = self
191            .normalize_block_id(Some(block_hash_or_number.into()))
192            .await?
193            .expect("Cannot return None if Some is passed in");
194
195        self.inner()
196            .get_uncle(block_hash_or_number, idx)
197            .await
198            .map_err(ethers_providers::MiddlewareError::from_err)
199    }
200
201    async fn get_transaction_count<T: Into<NameOrAddress> + Send + Sync>(
202        &self,
203        from: T,
204        block: Option<BlockId>,
205    ) -> Result<U256, Self::Error> {
206        let block = self.normalize_block_id(block).await?;
207
208        self.inner()
209            .get_transaction_count(from, block)
210            .await
211            .map_err(ethers_providers::MiddlewareError::from_err)
212    }
213
214    async fn call(
215        &self,
216        tx: &TypedTransaction,
217        block: Option<BlockId>,
218    ) -> Result<Bytes, Self::Error> {
219        let block = self.normalize_block_id(block).await?;
220
221        self.inner().call(tx, block).await.map_err(ethers_providers::MiddlewareError::from_err)
222    }
223
224    async fn get_balance<T: Into<NameOrAddress> + Send + Sync>(
225        &self,
226        from: T,
227        block: Option<BlockId>,
228    ) -> Result<U256, Self::Error> {
229        let block = self.normalize_block_id(block).await?;
230        self.inner()
231            .get_balance(from, block)
232            .await
233            .map_err(ethers_providers::MiddlewareError::from_err)
234    }
235
236    async fn get_transaction_receipt<T: Send + Sync + Into<TxHash>>(
237        &self,
238        transaction_hash: T,
239    ) -> Result<Option<TransactionReceipt>, Self::Error> {
240        let receipt = self
241            .inner()
242            .get_transaction_receipt(transaction_hash)
243            .await
244            .map_err(ethers_providers::MiddlewareError::from_err)?;
245
246        if receipt.is_none() {
247            return Ok(None)
248        }
249
250        let receipt = receipt.expect("checked is_none");
251        if receipt.block_number.is_none() {
252            return Ok(Some(receipt))
253        }
254
255        let number = receipt.block_number.expect("checked is_none");
256        if number <= self.get_block_number().await? {
257            Ok(Some(receipt))
258        } else {
259            // Pretend it hasn't confirmed yet.
260            Ok(None)
261        }
262    }
263
264    async fn get_code<T: Into<NameOrAddress> + Send + Sync>(
265        &self,
266        at: T,
267        block: Option<BlockId>,
268    ) -> Result<Bytes, Self::Error> {
269        let block = self.normalize_block_id(block).await?;
270
271        self.inner().get_code(at, block).await.map_err(ethers_providers::MiddlewareError::from_err)
272    }
273
274    async fn get_storage_at<T: Into<NameOrAddress> + Send + Sync>(
275        &self,
276        from: T,
277        location: TxHash,
278        block: Option<BlockId>,
279    ) -> Result<TxHash, Self::Error> {
280        let block = self.normalize_block_id(block).await?;
281        self.inner()
282            .get_storage_at(from, location, block)
283            .await
284            .map_err(ethers_providers::MiddlewareError::from_err)
285    }
286
287    async fn fill_transaction(
288        &self,
289        tx: &mut TypedTransaction,
290        block: Option<BlockId>,
291    ) -> Result<(), Self::Error> {
292        self.inner()
293            .fill_transaction(tx, block)
294            .await
295            .map_err(ethers_providers::MiddlewareError::from_err)
296    }
297
298    async fn get_block_receipts<T: Into<BlockNumber> + Send + Sync>(
299        &self,
300        block: T,
301    ) -> Result<Vec<TransactionReceipt>, Self::Error> {
302        let block: BlockNumber = block.into();
303        let block = self
304            .normalize_block_number(Some(block))
305            .await?
306            .expect("Cannot return None if Some is passed in");
307
308        self.inner()
309            .get_block_receipts(block)
310            .await
311            .map_err(ethers_providers::MiddlewareError::from_err)
312    }
313
314    async fn get_logs(
315        &self,
316        filter: &ethers_core::types::Filter,
317    ) -> Result<Vec<ethers_core::types::Log>, Self::Error> {
318        let mut filter = filter.clone();
319        filter.block_option = self.normalize_filter_range(filter.block_option).await?;
320
321        self.inner().get_logs(&filter).await.map_err(ethers_providers::MiddlewareError::from_err)
322    }
323
324    async fn new_filter(
325        &self,
326        _filter: ethers_providers::FilterKind<'_>,
327    ) -> Result<U256, Self::Error> {
328        Err(TimeLagError::Unsupported)
329    }
330
331    async fn get_filter_changes<T, R>(&self, _id: T) -> Result<Vec<R>, Self::Error>
332    where
333        T: Into<U256> + Send + Sync,
334        R: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + std::fmt::Debug,
335    {
336        Err(TimeLagError::Unsupported)
337    }
338
339    async fn watch_blocks(
340        &self,
341    ) -> Result<ethers_providers::FilterWatcher<'_, Self::Provider, TxHash>, Self::Error> {
342        Err(TimeLagError::Unsupported)
343    }
344
345    async fn subscribe<T, R>(
346        &self,
347        _params: T,
348    ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, R>, Self::Error>
349    where
350        T: std::fmt::Debug + serde::Serialize + Send + Sync,
351        R: serde::de::DeserializeOwned + Send + Sync,
352        Self::Provider: ethers_providers::PubsubClient,
353    {
354        Err(TimeLagError::Unsupported)
355    }
356
357    async fn unsubscribe<T>(&self, _id: T) -> Result<bool, Self::Error>
358    where
359        T: Into<U256> + Send + Sync,
360        Self::Provider: ethers_providers::PubsubClient,
361    {
362        Err(TimeLagError::Unsupported)
363    }
364
365    async fn subscribe_blocks(
366        &self,
367    ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, Block<TxHash>>, Self::Error>
368    where
369        Self::Provider: ethers_providers::PubsubClient,
370    {
371        Err(TimeLagError::Unsupported)
372    }
373
374    async fn subscribe_pending_txs(
375        &self,
376    ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, TxHash>, Self::Error>
377    where
378        Self::Provider: ethers_providers::PubsubClient,
379    {
380        Err(TimeLagError::Unsupported)
381    }
382
383    async fn subscribe_full_pending_txs(
384        &self,
385    ) -> Result<ethers_providers::SubscriptionStream<'_, Self::Provider, Transaction>, Self::Error>
386    where
387        Self::Provider: ethers_providers::PubsubClient,
388    {
389        Err(TimeLagError::Unsupported)
390    }
391
392    async fn subscribe_logs<'a>(
393        &'a self,
394        _filter: &ethers_core::types::Filter,
395    ) -> Result<
396        ethers_providers::SubscriptionStream<'a, Self::Provider, ethers_core::types::Log>,
397        Self::Error,
398    >
399    where
400        Self::Provider: ethers_providers::PubsubClient,
401    {
402        Err(TimeLagError::Unsupported)
403    }
404}