1use crate::Provider;
2
3use std::{fmt::Display, sync::Arc, time::Duration};
4
5use async_timer_rs::{hashed::Timeout, Timer};
6use completeq_rs::{error::CompleteQError, result::EmitResult, user_event::UserEvent};
7use ethers_types_rs::{
8 ethabi::{ParseLog, RawLog},
9 *,
10};
11use futures::{executor::ThreadPool, task::SpawnExt};
12use once_cell::sync::OnceCell;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub enum EventType {
17 Transaction(H256),
19 Filter(Filter),
21}
22
23impl Display for EventType {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 Self::Transaction(hash) => {
27 write!(f, "event_tx: {}", hash)
28 }
29 Self::Filter(filter) => {
30 write!(f, "event_filter: {:?}", filter)
31 }
32 }
33 }
34}
35
36pub enum EventArg {
38 Transaction(TransactionReceipt),
40 Log(Vec<Log>),
42}
43
44#[derive(Default, Clone)]
45pub struct Event;
46
47impl UserEvent for Event {
48 type Argument = EventArg;
49 type ID = EventType;
50}
51
52pub(crate) type OneshotCompleteQ = completeq_rs::oneshot::CompleteQ<Event>;
53pub(crate) type ChannelCompleteQ = completeq_rs::channel::CompleteQ<Event>;
54
55impl Provider {
56 pub(crate) fn start_event_poll(&self) {
57 static THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
58
59 let mut poller = Poller::new(self.clone());
60
61 let thread_pool = THREAD_POOL.get_or_init(|| ThreadPool::new().unwrap());
62
63 thread_pool
64 .spawn(async move { poller.poll_loop().await })
65 .unwrap();
66 }
67
68 pub fn register_filter_listener<A, T>(
69 &self,
70 address: Option<A>,
71 topic_filter: Option<T>,
72 ) -> anyhow::Result<DefaultFilterReceiver>
73 where
74 A: TryInto<AddressFilter>,
75 A::Error: std::error::Error + Sync + Send + 'static,
76 T: TryInto<TopicFilter>,
77 T::Error: std::error::Error + Sync + Send + 'static,
78 {
79 let address = if let Some(address) = address {
80 Some(address.try_into()?)
81 } else {
82 None
83 };
84
85 let topic_filter = if let Some(topic_filter) = topic_filter {
86 Some(topic_filter.try_into()?)
87 } else {
88 None
89 };
90
91 let filter = Filter {
92 from_block: None,
93 to_block: None,
94 address,
95 topics: topic_filter,
96 };
97
98 let event_type = EventType::Filter(filter.clone());
99
100 self.events.lock().unwrap().push(event_type.clone());
101
102 Ok(FilterReceiver {
103 filter,
104 receiver: self.channel.wait_for(event_type, 100),
105 })
106 }
107
108 pub fn register_transaction_listener<H>(
109 &self,
110 tx_hash: H,
111 ) -> anyhow::Result<DefaultTransactionReceipter>
112 where
113 H: TryInto<H256>,
114 H::Error: std::error::Error + Sync + Send + 'static,
115 {
116 let tx_hash = tx_hash.try_into()?;
117
118 let event_type = EventType::Transaction(tx_hash);
119
120 self.events.lock().unwrap().push(event_type.clone());
121
122 Ok(TransactionReceipter {
123 tx: tx_hash,
124 receiver: self.oneshot.wait_for(event_type),
125 })
126 }
127}
128
129struct Poller {
130 max_filter_block_range: U256,
131 last_poll_block_number: U256,
132 poll_interval_duration: Duration,
133 provider: Provider,
134 tx_listeners: Vec<H256>,
135 filter_listeners: Vec<Filter>,
136}
137
138impl Poller {
139 fn new(provider: Provider) -> Self {
140 Self {
141 last_poll_block_number: 0.into(),
142 max_filter_block_range: 10.into(),
143 provider,
144 tx_listeners: Default::default(),
145 filter_listeners: Default::default(),
146 poll_interval_duration: Duration::from_secs(5),
147 }
148 }
149 async fn poll_loop(&mut self) {
150 loop {
151 log::debug!("start events poll for {}", self.provider.id());
152 if Arc::strong_count(&self.provider.events) == 1 {
154 log::debug!("stop events poller for {}", self.provider.id());
155 return;
156 }
157
158 self.handle_new_listeners();
159
160 Self::handle_result("Poll one block events", self.poll_one().await);
161
162 log::debug!(
163 "events poll_once for {}, sleeping {:?}",
164 self.provider.id(),
165 self.poll_interval_duration
166 );
167
168 let timer = Timeout::new(self.poll_interval_duration);
169
170 timer.await;
171 }
172 }
173
174 async fn check_if_poll(&mut self) -> anyhow::Result<Option<U256>> {
175 if self.filter_listeners.is_empty() && self.tx_listeners.is_empty() {
176 return Ok(None);
177 }
178
179 let block_number = self.provider.eth_block_number().await?;
180
181 if block_number == self.last_poll_block_number {
182 Ok(None)
183 } else {
184 Ok(Some(block_number))
185 }
186 }
187
188 async fn recalc_poll_internval_duration(&mut self, block_number: U256) -> anyhow::Result<()> {
189 let last_block = self
190 .provider
191 .eth_get_block_by_number(block_number, true)
192 .await?;
193
194 let one: U256 = 1.into();
195
196 let prev_block = self
197 .provider
198 .eth_get_block_by_number(block_number - one, true)
199 .await?;
200
201 if let Some(prev_block) = prev_block {
202 if let Some(last_block) = last_block {
203 self.poll_interval_duration =
204 Duration::from_secs((last_block.timestamp - prev_block.timestamp).as_u64());
205
206 log::debug!(
207 "new poll interval duration is {:?}",
208 self.poll_interval_duration
209 );
210 }
211 }
212
213 Ok(())
214 }
215
216 async fn poll_one(&mut self) -> anyhow::Result<()> {
217 if let Some(block_number) = self.check_if_poll().await? {
218 self.fetch_and_emit_tx_events().await?;
219
220 self.fetch_and_emit_filter_events(block_number).await?;
221
222 if block_number - self.last_poll_block_number > 1.into() {
223 self.recalc_poll_internval_duration(block_number).await?;
224 }
225
226 self.last_poll_block_number = block_number;
227 }
228
229 Ok(())
230 }
231
232 fn handle_new_listeners(&mut self) {
234 let new_listeners = self
235 .provider
236 .events
237 .lock()
238 .unwrap()
239 .drain(0..)
240 .collect::<Vec<_>>();
241
242 for listener in new_listeners {
243 match listener {
244 EventType::Transaction(tx_hash) => {
245 self.tx_listeners.push(tx_hash);
246 }
247 EventType::Filter(filter_id) => {
248 self.filter_listeners.push(filter_id);
249 }
250 }
251 }
252 }
253
254 async fn fetch_and_emit_tx_events(&mut self) -> anyhow::Result<()> {
255 let mut remaining = vec![];
256
257 for tx_hash in &self.tx_listeners {
258 match self
259 .provider
260 .eth_get_transaction_receipt(tx_hash.clone())
261 .await?
262 {
263 Some(receipt) => {
264 log::trace!(
265 "Get tx {} receipt returns {}",
266 tx_hash,
267 serde_json::to_string(&receipt).unwrap()
268 );
269
270 self.provider.oneshot.complete_one(
272 EventType::Transaction(tx_hash.clone()),
273 EventArg::Transaction(receipt),
274 );
275 }
276 None => {
277 log::trace!("Get tx receipt return None");
278 remaining.push(tx_hash.clone());
279 }
280 }
281 }
282
283 self.tx_listeners = remaining;
284
285 Ok(())
286 }
287
288 async fn fetch_and_emit_filter_events(&mut self, block_number: U256) -> anyhow::Result<()> {
289 let mut remaining = vec![];
290
291 for filter in &self.filter_listeners {
292 let mut filter_send = filter.clone();
293
294 let from_block = if block_number > self.max_filter_block_range {
295 let mut from_block = block_number - self.max_filter_block_range;
296
297 if from_block < self.last_poll_block_number {
298 from_block = self.last_poll_block_number;
299 }
300
301 from_block
302 } else {
303 self.last_poll_block_number
304 };
305
306 filter_send.from_block = Some(from_block);
307 filter_send.to_block = Some(block_number);
308
309 log::debug!("try poll filter logs {:?}", filter_send);
310
311 let logs = self.provider.eth_get_logs(filter_send).await?;
312
313 match logs {
314 FilterEvents::Logs(logs) if logs.len() > 0 => {
315 let result = self
316 .provider
317 .channel
318 .complete_one(EventType::Filter(filter.clone()), EventArg::Log(logs))
319 .await;
320
321 match result {
322 EmitResult::Closed => {
323 log::debug!("Remove filter listener {:?}", filter);
324 continue;
325 }
326 _ => {}
327 }
328 }
329 _ => {
330 log::debug!("poll empty logs for filter {:?}", filter);
331 }
332 }
333
334 remaining.push(filter.clone());
335 }
336
337 self.filter_listeners = remaining;
338
339 Ok(())
340 }
341 fn handle_result<T, E>(tag: &str, result: std::result::Result<T, E>)
342 where
343 E: Display,
344 {
345 if result.is_err() {
346 log::error!("{} error, {}", tag, result.err().unwrap())
347 }
348 }
349}
350
351pub struct TransactionReceipter<T: Timer> {
353 pub tx: H256,
355
356 receiver: completeq_rs::oneshot::EventReceiver<Event, T>,
357}
358
359impl<T: Timer> TransactionReceipter<T>
360where
361 T: Unpin,
362{
363 pub async fn wait(&mut self) -> anyhow::Result<TransactionReceipt> {
364 let value = (&mut self.receiver).await.success()?;
365
366 match value {
367 Some(EventArg::Transaction(receipt)) => Ok(receipt),
368 None => Err(CompleteQError::PipeBroken.into()),
369 _ => {
370 panic!("Inner error, returns event arg type error!!!")
371 }
372 }
373 }
374}
375
376pub type DefaultTransactionReceipter = TransactionReceipter<Timeout>;
377
378pub struct FilterReceiver<T: Timer> {
379 pub filter: Filter,
380
381 receiver: completeq_rs::channel::EventReceiver<Event, T>,
382}
383
384impl<T: Timer> FilterReceiver<T>
385where
386 T: Unpin,
387{
388 pub async fn try_next(&mut self) -> anyhow::Result<Option<Vec<Log>>> {
389 Ok((&mut self.receiver).await.success().map(|c| {
390 c.map(|c| match c {
391 EventArg::Log(logs) => logs,
392 _ => {
393 panic!("Inner error, returns event arg type error!!!")
394 }
395 })
396 })?)
397 }
398}
399
400pub type DefaultFilterReceiver = FilterReceiver<Timeout>;
401
402pub struct TypedFilterReceiver<T, LOG>
403where
404 LOG: ParseLog,
405 T: Timer + Unpin,
406{
407 log: LOG,
408 receiver: FilterReceiver<T>,
409}
410
411impl<T, LOG> TypedFilterReceiver<T, LOG>
412where
413 LOG: ParseLog,
414 T: Timer + Unpin,
415{
416 pub fn new(log: LOG, receiver: FilterReceiver<T>) -> Self {
417 Self { log, receiver }
418 }
419 pub async fn try_next(&mut self) -> anyhow::Result<Option<Vec<LOG::Log>>> {
420 if let Some(logs) = self.receiver.try_next().await? {
421 let mut result = vec![];
422 for log in logs {
423 result.push(self.log.parse_log(RawLog {
424 data: log.data.0,
425 topics: log.topics,
426 })?);
427 }
428
429 Ok(Some(result))
430 } else {
431 Ok(None)
433 }
434 }
435}