ethers_providers_rs/
event.rs

1use crate::Provider;
2
3use std::{fmt::Display, sync::Arc, time::Duration};
4
5use async_timer_rs::{hashed::Timeout, Timer};
6use completeq_rs::{error::CompleteQError, result::EmitResult, user_event::UserEvent};
7use ethers_types_rs::{
8    ethabi::{ParseLog, RawLog},
9    *,
10};
11use futures::{executor::ThreadPool, task::SpawnExt};
12use once_cell::sync::OnceCell;
13
14/// Ether client support event types
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub enum EventType {
17    /// Transaction mint event
18    Transaction(H256),
19    /// filter.
20    Filter(Filter),
21}
22
23impl Display for EventType {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            Self::Transaction(hash) => {
27                write!(f, "event_tx: {}", hash)
28            }
29            Self::Filter(filter) => {
30                write!(f, "event_filter: {:?}", filter)
31            }
32        }
33    }
34}
35
36/// Event emit arg
37pub enum EventArg {
38    /// Transaction mint event return value [`TransactionReceipt`]
39    Transaction(TransactionReceipt),
40    /// `get_ethLogs`/`eth_getFilterLogs` return value [`PollLogs`]
41    Log(Vec<Log>),
42}
43
44#[derive(Default, Clone)]
45pub struct Event;
46
47impl UserEvent for Event {
48    type Argument = EventArg;
49    type ID = EventType;
50}
51
52pub(crate) type OneshotCompleteQ = completeq_rs::oneshot::CompleteQ<Event>;
53pub(crate) type ChannelCompleteQ = completeq_rs::channel::CompleteQ<Event>;
54
55impl Provider {
56    pub(crate) fn start_event_poll(&self) {
57        static THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
58
59        let mut poller = Poller::new(self.clone());
60
61        let thread_pool = THREAD_POOL.get_or_init(|| ThreadPool::new().unwrap());
62
63        thread_pool
64            .spawn(async move { poller.poll_loop().await })
65            .unwrap();
66    }
67
68    pub fn register_filter_listener<A, T>(
69        &self,
70        address: Option<A>,
71        topic_filter: Option<T>,
72    ) -> anyhow::Result<DefaultFilterReceiver>
73    where
74        A: TryInto<AddressFilter>,
75        A::Error: std::error::Error + Sync + Send + 'static,
76        T: TryInto<TopicFilter>,
77        T::Error: std::error::Error + Sync + Send + 'static,
78    {
79        let address = if let Some(address) = address {
80            Some(address.try_into()?)
81        } else {
82            None
83        };
84
85        let topic_filter = if let Some(topic_filter) = topic_filter {
86            Some(topic_filter.try_into()?)
87        } else {
88            None
89        };
90
91        let filter = Filter {
92            from_block: None,
93            to_block: None,
94            address,
95            topics: topic_filter,
96        };
97
98        let event_type = EventType::Filter(filter.clone());
99
100        self.events.lock().unwrap().push(event_type.clone());
101
102        Ok(FilterReceiver {
103            filter,
104            receiver: self.channel.wait_for(event_type, 100),
105        })
106    }
107
108    pub fn register_transaction_listener<H>(
109        &self,
110        tx_hash: H,
111    ) -> anyhow::Result<DefaultTransactionReceipter>
112    where
113        H: TryInto<H256>,
114        H::Error: std::error::Error + Sync + Send + 'static,
115    {
116        let tx_hash = tx_hash.try_into()?;
117
118        let event_type = EventType::Transaction(tx_hash);
119
120        self.events.lock().unwrap().push(event_type.clone());
121
122        Ok(TransactionReceipter {
123            tx: tx_hash,
124            receiver: self.oneshot.wait_for(event_type),
125        })
126    }
127}
128
129struct Poller {
130    max_filter_block_range: U256,
131    last_poll_block_number: U256,
132    poll_interval_duration: Duration,
133    provider: Provider,
134    tx_listeners: Vec<H256>,
135    filter_listeners: Vec<Filter>,
136}
137
138impl Poller {
139    fn new(provider: Provider) -> Self {
140        Self {
141            last_poll_block_number: 0.into(),
142            max_filter_block_range: 10.into(),
143            provider,
144            tx_listeners: Default::default(),
145            filter_listeners: Default::default(),
146            poll_interval_duration: Duration::from_secs(5),
147        }
148    }
149    async fn poll_loop(&mut self) {
150        loop {
151            log::debug!("start events poll for {}", self.provider.id());
152            // If only `Self` handle strong reference of provider, stop polll loop
153            if Arc::strong_count(&self.provider.events) == 1 {
154                log::debug!("stop events poller for {}", self.provider.id());
155                return;
156            }
157
158            self.handle_new_listeners();
159
160            Self::handle_result("Poll one block events", self.poll_one().await);
161
162            log::debug!(
163                "events poll_once for {}, sleeping {:?}",
164                self.provider.id(),
165                self.poll_interval_duration
166            );
167
168            let timer = Timeout::new(self.poll_interval_duration);
169
170            timer.await;
171        }
172    }
173
174    async fn check_if_poll(&mut self) -> anyhow::Result<Option<U256>> {
175        if self.filter_listeners.is_empty() && self.tx_listeners.is_empty() {
176            return Ok(None);
177        }
178
179        let block_number = self.provider.eth_block_number().await?;
180
181        if block_number == self.last_poll_block_number {
182            Ok(None)
183        } else {
184            Ok(Some(block_number))
185        }
186    }
187
188    async fn recalc_poll_internval_duration(&mut self, block_number: U256) -> anyhow::Result<()> {
189        let last_block = self
190            .provider
191            .eth_get_block_by_number(block_number, true)
192            .await?;
193
194        let one: U256 = 1.into();
195
196        let prev_block = self
197            .provider
198            .eth_get_block_by_number(block_number - one, true)
199            .await?;
200
201        if let Some(prev_block) = prev_block {
202            if let Some(last_block) = last_block {
203                self.poll_interval_duration =
204                    Duration::from_secs((last_block.timestamp - prev_block.timestamp).as_u64());
205
206                log::debug!(
207                    "new poll interval duration is {:?}",
208                    self.poll_interval_duration
209                );
210            }
211        }
212
213        Ok(())
214    }
215
216    async fn poll_one(&mut self) -> anyhow::Result<()> {
217        if let Some(block_number) = self.check_if_poll().await? {
218            self.fetch_and_emit_tx_events().await?;
219
220            self.fetch_and_emit_filter_events(block_number).await?;
221
222            if block_number - self.last_poll_block_number > 1.into() {
223                self.recalc_poll_internval_duration(block_number).await?;
224            }
225
226            self.last_poll_block_number = block_number;
227        }
228
229        Ok(())
230    }
231
232    /// Get and remove new incoming listeners from provider events pipe.
233    fn handle_new_listeners(&mut self) {
234        let new_listeners = self
235            .provider
236            .events
237            .lock()
238            .unwrap()
239            .drain(0..)
240            .collect::<Vec<_>>();
241
242        for listener in new_listeners {
243            match listener {
244                EventType::Transaction(tx_hash) => {
245                    self.tx_listeners.push(tx_hash);
246                }
247                EventType::Filter(filter_id) => {
248                    self.filter_listeners.push(filter_id);
249                }
250            }
251        }
252    }
253
254    async fn fetch_and_emit_tx_events(&mut self) -> anyhow::Result<()> {
255        let mut remaining = vec![];
256
257        for tx_hash in &self.tx_listeners {
258            match self
259                .provider
260                .eth_get_transaction_receipt(tx_hash.clone())
261                .await?
262            {
263                Some(receipt) => {
264                    log::trace!(
265                        "Get tx {} receipt returns {}",
266                        tx_hash,
267                        serde_json::to_string(&receipt).unwrap()
268                    );
269
270                    // Oneshot event ignore to checking returns value
271                    self.provider.oneshot.complete_one(
272                        EventType::Transaction(tx_hash.clone()),
273                        EventArg::Transaction(receipt),
274                    );
275                }
276                None => {
277                    log::trace!("Get tx receipt return None");
278                    remaining.push(tx_hash.clone());
279                }
280            }
281        }
282
283        self.tx_listeners = remaining;
284
285        Ok(())
286    }
287
288    async fn fetch_and_emit_filter_events(&mut self, block_number: U256) -> anyhow::Result<()> {
289        let mut remaining = vec![];
290
291        for filter in &self.filter_listeners {
292            let mut filter_send = filter.clone();
293
294            let from_block = if block_number > self.max_filter_block_range {
295                let mut from_block = block_number - self.max_filter_block_range;
296
297                if from_block < self.last_poll_block_number {
298                    from_block = self.last_poll_block_number;
299                }
300
301                from_block
302            } else {
303                self.last_poll_block_number
304            };
305
306            filter_send.from_block = Some(from_block);
307            filter_send.to_block = Some(block_number);
308
309            log::debug!("try poll filter logs {:?}", filter_send);
310
311            let logs = self.provider.eth_get_logs(filter_send).await?;
312
313            match logs {
314                FilterEvents::Logs(logs) if logs.len() > 0 => {
315                    let result = self
316                        .provider
317                        .channel
318                        .complete_one(EventType::Filter(filter.clone()), EventArg::Log(logs))
319                        .await;
320
321                    match result {
322                        EmitResult::Closed => {
323                            log::debug!("Remove filter listener {:?}", filter);
324                            continue;
325                        }
326                        _ => {}
327                    }
328                }
329                _ => {
330                    log::debug!("poll empty logs for filter {:?}", filter);
331                }
332            }
333
334            remaining.push(filter.clone());
335        }
336
337        self.filter_listeners = remaining;
338
339        Ok(())
340    }
341    fn handle_result<T, E>(tag: &str, result: std::result::Result<T, E>)
342    where
343        E: Display,
344    {
345        if result.is_err() {
346            log::error!("{} error, {}", tag, result.err().unwrap())
347        }
348    }
349}
350
351/// Transaction instance provide extra wait fn
352pub struct TransactionReceipter<T: Timer> {
353    /// Transaction id
354    pub tx: H256,
355
356    receiver: completeq_rs::oneshot::EventReceiver<Event, T>,
357}
358
359impl<T: Timer> TransactionReceipter<T>
360where
361    T: Unpin,
362{
363    pub async fn wait(&mut self) -> anyhow::Result<TransactionReceipt> {
364        let value = (&mut self.receiver).await.success()?;
365
366        match value {
367            Some(EventArg::Transaction(receipt)) => Ok(receipt),
368            None => Err(CompleteQError::PipeBroken.into()),
369            _ => {
370                panic!("Inner error, returns event arg type error!!!")
371            }
372        }
373    }
374}
375
376pub type DefaultTransactionReceipter = TransactionReceipter<Timeout>;
377
378pub struct FilterReceiver<T: Timer> {
379    pub filter: Filter,
380
381    receiver: completeq_rs::channel::EventReceiver<Event, T>,
382}
383
384impl<T: Timer> FilterReceiver<T>
385where
386    T: Unpin,
387{
388    pub async fn try_next(&mut self) -> anyhow::Result<Option<Vec<Log>>> {
389        Ok((&mut self.receiver).await.success().map(|c| {
390            c.map(|c| match c {
391                EventArg::Log(logs) => logs,
392                _ => {
393                    panic!("Inner error, returns event arg type error!!!")
394                }
395            })
396        })?)
397    }
398}
399
400pub type DefaultFilterReceiver = FilterReceiver<Timeout>;
401
402pub struct TypedFilterReceiver<T, LOG>
403where
404    LOG: ParseLog,
405    T: Timer + Unpin,
406{
407    log: LOG,
408    receiver: FilterReceiver<T>,
409}
410
411impl<T, LOG> TypedFilterReceiver<T, LOG>
412where
413    LOG: ParseLog,
414    T: Timer + Unpin,
415{
416    pub fn new(log: LOG, receiver: FilterReceiver<T>) -> Self {
417        Self { log, receiver }
418    }
419    pub async fn try_next(&mut self) -> anyhow::Result<Option<Vec<LOG::Log>>> {
420        if let Some(logs) = self.receiver.try_next().await? {
421            let mut result = vec![];
422            for log in logs {
423                result.push(self.log.parse_log(RawLog {
424                    data: log.data.0,
425                    topics: log.topics,
426                })?);
427            }
428
429            Ok(Some(result))
430        } else {
431            // end receive loop
432            Ok(None)
433        }
434    }
435}