concordium_rust_sdk/indexer.rs
1//! Support for writing indexers using the Rust SDK.
2//!
3//! The main indexer entrypoint is the [`TraverseConfig::traverse`] method
4//! which will start chain traversal, calling methods of the [`Indexer`] trait
5//! for each finalized block it discovers.
6use crate::{
7 types::{
8 execution_tree, queries::BlockInfo, AccountTransactionEffects, BlockItemSummary,
9 BlockItemSummaryDetails, ExecutionTree, SpecialTransactionOutcome,
10 },
11 v2::{self, FinalizedBlockInfo, QueryError, QueryResult},
12};
13use concordium_base::{
14 base::{AbsoluteBlockHeight, Energy},
15 contracts_common::{AccountAddress, Amount, ContractAddress, OwnedEntrypointName},
16 hashes::TransactionHash,
17 smart_contracts::OwnedReceiveName,
18};
19use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt as _};
20use std::{
21 collections::{BTreeMap, BTreeSet},
22 time::Duration,
23};
24use tokio::time::error::Elapsed;
25pub use tonic::async_trait;
26
27/// Configuration for an indexer.
28pub struct TraverseConfig {
29 endpoints: Vec<v2::Endpoint>,
30 max_parallel: usize,
31 max_behind: std::time::Duration,
32 wait_after_fail: std::time::Duration,
33 start_height: AbsoluteBlockHeight,
34}
35
36#[derive(Debug, thiserror::Error)]
37/// An error encountered during chain traversal and passed to the
38/// [`Indexer`].
39pub enum TraverseError {
40 #[error("Failed to connect: {0}")]
41 Connect(#[from] tonic::transport::Error),
42 #[error("Failed to query: {0}")]
43 Query(#[from] QueryError),
44 #[error("Timed out waiting for finalized blocks.")]
45 Elapsed(#[from] Elapsed),
46}
47
48#[async_trait]
49/// A trait intended to be implemented by indexers that traverse the chain and
50/// extract information of interest to store for efficient retrieval.
51///
52/// The main method of this trait is [`on_finalized`](Indexer::on_finalized)
53/// which will be called by the [`traverse`](TraverseConfig::traverse) method
54/// for each finalized block. The other two methods are meant for signalling and
55/// bookkeeping.
56///
57/// Note that this trait has `async` methods, which is why the type signatures
58/// are daunting. The intended way of implementing it is to use the
59/// `async_trait` macro like so
60///
61/// ```
62/// # use concordium_rust_sdk::{indexer::*, v2::{self, FinalizedBlockInfo, QueryResult}};
63/// use concordium_rust_sdk::indexer::async_trait;
64/// # struct MyIndexer;
65/// #[async_trait]
66/// impl Indexer for MyIndexer {
67/// type Context = ();
68/// type Data = ();
69///
70/// async fn on_connect<'a>(
71/// &mut self,
72/// endpoint: v2::Endpoint,
73/// client: &'a mut v2::Client,
74/// ) -> QueryResult<Self::Context> {
75/// unimplemented!("Implement me.")
76/// }
77///
78/// async fn on_finalized<'a>(
79/// &self,
80/// client: v2::Client,
81/// ctx: &'a Self::Context,
82/// fbi: FinalizedBlockInfo,
83/// ) -> QueryResult<Self::Data> {
84/// unimplemented!("Implement me.")
85/// }
86///
87/// async fn on_failure(
88/// &mut self,
89/// ep: v2::Endpoint,
90/// successive_failures: u64,
91/// err: TraverseError,
92/// ) -> bool {
93/// unimplemented!("Implement me.")
94/// }
95/// }
96/// ```
97pub trait Indexer {
98 /// The data that is retrieved upon connecting to the endpoint and supplied
99 /// to each call of [`on_finalized`](Self::on_finalized).
100 type Context: Send + Sync;
101 /// The data returned by the [`on_finalized`](Self::on_finalized) call.
102 type Data: Send + Sync;
103
104 /// Called when a new connection is established to the given endpoint.
105 /// The return value from this method is passed to each call of
106 /// [`on_finalized`](Self::on_finalized).
107 async fn on_connect<'a>(
108 &mut self,
109 endpoint: v2::Endpoint,
110 client: &'a mut v2::Client,
111 ) -> QueryResult<Self::Context>;
112
113 /// The main method of this trait. It is called for each finalized block
114 /// that the indexer discovers. Note that the indexer might call this
115 /// concurrently for multiple blocks at the same time to speed up indexing.
116 ///
117 /// This method is meant to return errors that are unexpected, and if it
118 /// does return an error the indexer will attempt to reconnect to the
119 /// next endpoint.
120 async fn on_finalized<'a>(
121 &self,
122 client: v2::Client,
123 ctx: &'a Self::Context,
124 fbi: FinalizedBlockInfo,
125 ) -> QueryResult<Self::Data>;
126
127 /// Called when either connecting to the node or querying the node fails.
128 /// The number of successive failures without progress is passed to the
129 /// method which should return whether to stop indexing ([`true`]) or not
130 /// ([`false`]).
131 async fn on_failure(
132 &mut self,
133 endpoint: v2::Endpoint,
134 successive_failures: u64,
135 err: TraverseError,
136 ) -> bool;
137}
138
139impl TraverseConfig {
140 /// A configuration with a single endpoint starting at the given block
141 /// height.
142 pub fn new_single(endpoint: v2::Endpoint, start_height: AbsoluteBlockHeight) -> Self {
143 Self {
144 endpoints: vec![endpoint],
145 max_parallel: 4,
146 max_behind: Duration::from_secs(60),
147 wait_after_fail: Duration::from_secs(1),
148 start_height,
149 }
150 }
151
152 /// A configuration with a given list of endpoints and starting height.
153 /// Returns [`None`] if the list of endpoints is empty.
154 pub fn new(endpoints: Vec<v2::Endpoint>, start_height: AbsoluteBlockHeight) -> Option<Self> {
155 if endpoints.is_empty() {
156 return None;
157 }
158 Some(Self {
159 endpoints,
160 max_parallel: 4,
161 max_behind: Duration::from_secs(60),
162 wait_after_fail: Duration::from_secs(1),
163 start_height,
164 })
165 }
166
167 /// Set the maximum number of time the last finalized block of the node can
168 /// be behind before it is deemed too far behind, and another node is
169 /// tried.
170 ///
171 /// The default value is 60 seconds.
172 pub fn set_max_behind(self, max_behind: Duration) -> Self { Self { max_behind, ..self } }
173
174 /// After each failure the indexer will pause a bit before trying another
175 /// node to avoid overloading the node. Defaults to 1 second.
176 pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
177 Self {
178 wait_after_fail,
179 ..self
180 }
181 }
182
183 /// Add an additional endpoint to the list of endpoints the indexer will
184 /// use. This is added to the end of the list, so the endpoint is only tried
185 /// in case of failure of previous queries.
186 pub fn push_endpoint(mut self, endpoint: v2::Endpoint) -> Self {
187 self.endpoints.push(endpoint);
188 self
189 }
190
191 /// Set the maximum number of blocks that will be queried in parallel, if
192 /// they are available. Defaults to 4 if not set explicitly.
193 pub fn set_max_parallel(self, max_parallel: usize) -> Self {
194 Self {
195 max_parallel,
196 ..self
197 }
198 }
199
200 /// Traverse the chain according to the supplied configuration, invoking
201 /// [`on_finalized`](Indexer::on_finalized) for each finalized block.
202 ///
203 /// Multiple [`on_finalized`](Indexer::on_finalized) calls might be executed
204 /// concurrently, but their responses will be written to the provided
205 /// [`tokio::sync::mpsc::Sender`] in the increasing order of block height,
206 /// with no gaps.
207 ///
208 /// If a query fails, either due to timeout or node not being available the
209 /// indexer will attempt the next endpoint it is configured with.
210 ///
211 /// For robust indexing it is crucial that the supplied
212 /// [`Endpoints`](v2::Endpoint) are configured with timeouts so that the
213 /// indexer may make progress.
214 ///
215 /// The [`traverse`](Self::traverse) method will return either when
216 /// signalled so by the [`on_failure`](Indexer::on_failure) method or
217 /// when the receiver part of the supplied [`tokio::sync::mpsc::Sender`]
218 /// is closed. Typically this method should run in a task spawned via
219 /// [`tokio::spawn`].
220 pub async fn traverse<I: Indexer>(
221 self,
222 mut indexer: I,
223 sender: tokio::sync::mpsc::Sender<I::Data>,
224 ) -> QueryResult<()> {
225 let TraverseConfig {
226 endpoints,
227 max_parallel,
228 max_behind,
229 wait_after_fail,
230 start_height: mut height,
231 } = self;
232 let mut successive_failures: u64 = 0;
233 for node_ep in endpoints.into_iter().cycle() {
234 if sender.is_closed() {
235 return Ok(());
236 }
237 if successive_failures > 0 {
238 tokio::time::sleep(wait_after_fail).await
239 }
240 let mut node = match v2::Client::new(node_ep.clone()).await {
241 Ok(v) => v,
242 Err(e) => {
243 successive_failures += 1;
244 let should_stop = indexer
245 .on_failure(node_ep, successive_failures, e.into())
246 .await;
247 if should_stop {
248 return Ok(());
249 } else {
250 continue;
251 }
252 }
253 };
254
255 let context = match indexer.on_connect(node_ep.clone(), &mut node).await {
256 Ok(a) => a,
257 Err(e) => {
258 successive_failures += 1;
259 let should_stop = indexer
260 .on_failure(node_ep, successive_failures, e.into())
261 .await;
262 if should_stop {
263 return Ok(());
264 } else {
265 continue;
266 }
267 }
268 };
269 let mut finalized_blocks = match node.get_finalized_blocks_from(height).await {
270 Ok(v) => v,
271 Err(e) => {
272 successive_failures += 1;
273 let should_stop = indexer
274 .on_failure(node_ep, successive_failures, e.into())
275 .await;
276 if should_stop {
277 return Ok(());
278 } else {
279 continue;
280 }
281 }
282 };
283
284 'node_loop: loop {
285 let last_height = height;
286 let (has_error, chunks) = match finalized_blocks
287 .next_chunk_timeout(max_parallel, max_behind)
288 .await
289 {
290 Ok(v) => v,
291 Err(e) => {
292 successive_failures += 1;
293 let should_stop = indexer
294 .on_failure(node_ep, successive_failures, e.into())
295 .await;
296 if should_stop {
297 return Ok(());
298 } else {
299 break 'node_loop;
300 }
301 }
302 };
303
304 let mut futs = FuturesOrdered::new();
305 for fb in chunks {
306 futs.push_back(indexer.on_finalized(node.clone(), &context, fb));
307 }
308 while let Some(data) = futs.next().await {
309 let data = match data {
310 Ok(v) => v,
311 Err(e) => {
312 drop(futs);
313 successive_failures += 1;
314 let should_stop = indexer
315 .on_failure(node_ep, successive_failures, e.into())
316 .await;
317 if should_stop {
318 return Ok(());
319 } else {
320 break 'node_loop;
321 }
322 }
323 };
324 if sender.send(data).await.is_err() {
325 return Ok(()); // the listener ended the stream, meaning
326 // we
327 // should stop.
328 }
329 height = height.next();
330 }
331
332 if height > last_height {
333 successive_failures = 0;
334 }
335
336 if has_error {
337 // we have processed the blocks we can, but further queries on the same stream
338 // will fail since the stream signalled an error.
339 break 'node_loop;
340 }
341 }
342 }
343 Ok(()) // unreachable
344 }
345}
346
347/// An indexer that retrieves all transaction outcomes.
348///
349/// The [`on_connect`](Indexer::on_connect) and
350/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
351/// log the events on `info` and `warn` levels, respectively, using the
352/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
353/// of the log is `ccd_indexer` which may be used to filter the logs.
354pub struct TransactionIndexer;
355
356#[async_trait]
357impl Indexer for TransactionIndexer {
358 type Context = ();
359 type Data = (BlockInfo, Vec<BlockItemSummary>);
360
361 async fn on_connect<'a>(
362 &mut self,
363 endpoint: v2::Endpoint,
364 _client: &'a mut v2::Client,
365 ) -> QueryResult<()> {
366 tracing::info!(
367 target: "ccd_indexer",
368 "Connected to endpoint {}.",
369 endpoint.uri()
370 );
371 Ok(())
372 }
373
374 async fn on_finalized<'a>(
375 &self,
376 mut client: v2::Client,
377 _ctx: &'a (),
378 fbi: FinalizedBlockInfo,
379 ) -> QueryResult<Self::Data> {
380 let bi = client.get_block_info(fbi.height).await?.response;
381 if bi.transaction_count != 0 {
382 let summary = client
383 .get_block_transaction_events(fbi.height)
384 .await?
385 .response
386 .try_collect::<Vec<_>>()
387 .await?;
388 Ok((bi, summary))
389 } else {
390 Ok((bi, Vec::new()))
391 }
392 }
393
394 async fn on_failure(
395 &mut self,
396 endpoint: v2::Endpoint,
397 successive_failures: u64,
398 err: TraverseError,
399 ) -> bool {
400 tracing::warn!(
401 target: "ccd_indexer",
402 successive_failures,
403 "Failed when querying endpoint {}: {err}",
404 endpoint.uri()
405 );
406 false
407 }
408}
409
410/// An indexer that retrieves smart contract updates where the specific
411/// entrypoint of a contract was triggered as the top-level entrypoint.
412///
413/// The [`on_connect`](Indexer::on_connect) and
414/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
415/// log the events on `info` and `warn` levels, respectively, using the
416/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
417/// of the log is `ccd_indexer` which may be used to filter the logs.
418pub struct ContractUpdateIndexer {
419 pub target_address: ContractAddress,
420 pub entrypoint: OwnedEntrypointName,
421}
422
423pub struct ContractUpdateInfo {
424 /// The execution tree generated by the call.
425 pub execution_tree: ExecutionTree,
426 /// The amount of energy charged for the transaction.
427 pub energy_cost: Energy,
428 /// The cost, in CCD, of the transaction.
429 pub cost: Amount,
430 /// The hash of the transaction from which this update stems.
431 pub transaction_hash: TransactionHash,
432 /// The sender of the transaction.
433 pub sender: AccountAddress,
434}
435
436fn update_info(summary: BlockItemSummary) -> Option<ContractUpdateInfo> {
437 let BlockItemSummaryDetails::AccountTransaction(at) = summary.details else {
438 return None;
439 };
440
441 let AccountTransactionEffects::ContractUpdateIssued { effects } = at.effects else {
442 return None;
443 };
444
445 Some(ContractUpdateInfo {
446 execution_tree: execution_tree(effects)?,
447 energy_cost: summary.energy_cost,
448 cost: at.cost,
449 transaction_hash: summary.hash,
450 sender: at.sender,
451 })
452}
453
454#[async_trait]
455impl Indexer for ContractUpdateIndexer {
456 type Context = ();
457 type Data = (BlockInfo, Vec<ContractUpdateInfo>);
458
459 async fn on_connect<'a>(
460 &mut self,
461 endpoint: v2::Endpoint,
462 _client: &'a mut v2::Client,
463 ) -> QueryResult<()> {
464 tracing::info!(
465 target: "ccd_indexer",
466 "Connected to endpoint {}.",
467 endpoint.uri()
468 );
469 Ok(())
470 }
471
472 async fn on_finalized<'a>(
473 &self,
474 mut client: v2::Client,
475 _ctx: &'a (),
476 fbi: FinalizedBlockInfo,
477 ) -> QueryResult<Self::Data> {
478 let bi = client.get_block_info(fbi.height).await?.response;
479 if bi.transaction_count != 0 {
480 let summary = client
481 .get_block_transaction_events(fbi.height)
482 .await?
483 .response
484 .try_filter_map(|summary| async move {
485 let Some(info) = update_info(summary) else {
486 return Ok(None);
487 };
488 if info.execution_tree.address() == self.target_address
489 && info.execution_tree.entrypoint() == self.entrypoint.as_entrypoint_name()
490 {
491 Ok(Some(info))
492 } else {
493 Ok(None)
494 }
495 })
496 .try_collect::<Vec<_>>()
497 .await?;
498 Ok((bi, summary))
499 } else {
500 Ok((bi, Vec::new()))
501 }
502 }
503
504 async fn on_failure(
505 &mut self,
506 endpoint: v2::Endpoint,
507 successive_failures: u64,
508 err: TraverseError,
509 ) -> bool {
510 tracing::warn!(
511 target: "ccd_indexer",
512 successive_failures,
513 "Failed when querying endpoint {}: {err}",
514 endpoint.uri()
515 );
516 false
517 }
518}
519
520/// An indexer that retrieves smart contract updates where the specific
521/// contracts were affected. The configuration can choose to require any or all
522/// to be updated.
523///
524/// The [`on_connect`](Indexer::on_connect) and
525/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
526/// log the events on `info` and `warn` levels, respectively, using the
527/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
528/// of the log is `ccd_indexer` which may be used to filter the logs.
529pub struct AffectedContractIndexer {
530 pub addresses: BTreeSet<ContractAddress>,
531 /// Require all contract addreseses in the `addresses` set to be updated.
532 pub all: bool,
533}
534
535#[async_trait]
536impl Indexer for AffectedContractIndexer {
537 type Context = ();
538 type Data = (
539 BlockInfo,
540 Vec<(
541 ContractUpdateInfo,
542 BTreeMap<ContractAddress, BTreeSet<OwnedReceiveName>>,
543 )>,
544 );
545
546 async fn on_connect<'a>(
547 &mut self,
548 endpoint: v2::Endpoint,
549 _client: &'a mut v2::Client,
550 ) -> QueryResult<()> {
551 tracing::info!(
552 target: "ccd_indexer",
553 "Connected to endpoint {}.",
554 endpoint.uri()
555 );
556 Ok(())
557 }
558
559 async fn on_finalized<'a>(
560 &self,
561 mut client: v2::Client,
562 _ctx: &'a (),
563 fbi: FinalizedBlockInfo,
564 ) -> QueryResult<Self::Data> {
565 let bi = client.get_block_info(fbi.height).await?.response;
566 if bi.transaction_count != 0 {
567 let summary = client
568 .get_block_transaction_events(fbi.height)
569 .await?
570 .response
571 .try_filter_map(|summary| async move {
572 let Some(info) = update_info(summary) else {
573 return Ok(None);
574 };
575 let affected_addresses = info.execution_tree.affected_addresses();
576 if (self.all
577 && self
578 .addresses
579 .iter()
580 .all(|addr| affected_addresses.contains_key(addr)))
581 || self
582 .addresses
583 .iter()
584 .any(|addr| affected_addresses.contains_key(addr))
585 {
586 Ok(Some((info, affected_addresses)))
587 } else {
588 Ok(None)
589 }
590 })
591 .try_collect::<Vec<_>>()
592 .await?;
593 Ok((bi, summary))
594 } else {
595 Ok((bi, Vec::new()))
596 }
597 }
598
599 async fn on_failure(
600 &mut self,
601 endpoint: v2::Endpoint,
602 successive_failures: u64,
603 err: TraverseError,
604 ) -> bool {
605 tracing::warn!(
606 target: "ccd_indexer",
607 successive_failures,
608 "Failed when querying endpoint {}: {err}",
609 endpoint.uri()
610 );
611 false
612 }
613}
614
615/// An indexer that retrieves all events in a block, transaction outcomes
616/// and special transaction outcomes.
617///
618/// The [`on_connect`](Indexer::on_connect) and
619/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
620/// log the events on `info` and `warn` levels, respectively, using the
621/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
622/// of the log is `ccd_indexer` which may be used to filter the logs.
623pub struct BlockEventsIndexer;
624
625#[async_trait]
626impl Indexer for BlockEventsIndexer {
627 type Context = ();
628 type Data = (
629 BlockInfo,
630 Vec<BlockItemSummary>,
631 Vec<SpecialTransactionOutcome>,
632 );
633
634 async fn on_connect<'a>(
635 &mut self,
636 endpoint: v2::Endpoint,
637 client: &'a mut v2::Client,
638 ) -> QueryResult<()> {
639 TransactionIndexer.on_connect(endpoint, client).await
640 }
641
642 async fn on_finalized<'a>(
643 &self,
644 client: v2::Client,
645 ctx: &'a (),
646 fbi: FinalizedBlockInfo,
647 ) -> QueryResult<Self::Data> {
648 let mut special_client = client.clone();
649 let special = async move {
650 let events = special_client
651 .get_block_special_events(fbi.height)
652 .await?
653 .response
654 .try_collect()
655 .await?;
656 Ok(events)
657 };
658 let ((bi, summary), special) =
659 futures::try_join!(TransactionIndexer.on_finalized(client, ctx, fbi), special)?;
660 Ok((bi, summary, special))
661 }
662
663 async fn on_failure(
664 &mut self,
665 endpoint: v2::Endpoint,
666 successive_failures: u64,
667 err: TraverseError,
668 ) -> bool {
669 TransactionIndexer
670 .on_failure(endpoint, successive_failures, err)
671 .await
672 }
673}
674
675#[async_trait]
676/// Handle an individual event. This trait is designed to be used together with
677/// the [`ProcessorConfig`]. These two together are designed to ease the work of
678/// writing the part of indexers where data is to be stored in a database.
679pub trait ProcessEvent {
680 /// The type of events that are to be processed. Typically this will be all
681 /// of the transactions of interest for a single block.
682 type Data;
683 /// An error that can be signalled.
684 type Error: std::fmt::Display + std::fmt::Debug;
685 /// A description returned by the [`process`](ProcessEvent::process) method.
686 /// This message is logged by the [`ProcessorConfig`] and is intended to
687 /// describe the data that was just processed.
688 type Description: std::fmt::Display;
689
690 /// Process a single item. This should work atomically in the sense that
691 /// either the entire `data` is processed or none of it is in case of an
692 /// error. This property is relied upon by the [`ProcessorConfig`] to retry
693 /// failed attempts.
694 async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error>;
695
696 /// The `on_failure` method is invoked by the [`ProcessorConfig`] when it
697 /// fails to process an event. It is meant to retry to recreate the
698 /// resources, such as a database connection, that might have been
699 /// dropped. The return value should signal if the handler process
700 /// should continue (`true`) or not.
701 ///
702 /// The function takes the `error` that occurred at the latest
703 /// [`process`](Self::process) call that just failed, and the number of
704 /// attempts of calling `process` that failed.
705 async fn on_failure(
706 &mut self,
707 error: Self::Error,
708 failed_attempts: u32,
709 ) -> Result<bool, Self::Error>;
710}
711
712pub struct ProcessorConfig {
713 /// The amount of time to wait after a failure to process an event.
714 wait_after_fail: std::time::Duration,
715 /// A future to be signalled to stop processing.
716 stop: std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>,
717}
718
719/// The default implementation behaves the same as
720/// [`ProcessorConfig::new`](ProcessorConfig::new).
721impl Default for ProcessorConfig {
722 fn default() -> Self { Self::new() }
723}
724
725impl ProcessorConfig {
726 /// After each failure the [`ProcessorConfig`] will pause a bit before
727 /// trying again. Defaults to 5 seconds.
728 pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
729 Self {
730 wait_after_fail,
731 ..self
732 }
733 }
734
735 /// Set the stop signal for the processor. This accepts a future which will
736 /// be polled and if the future yields ready the `process_events` method
737 /// will terminate.
738 ///
739 /// An example of such a future would be the `Receiver` end of a oneshot
740 /// channel.
741 pub fn set_stop_signal(self, stop: impl std::future::Future<Output = ()> + 'static) -> Self {
742 Self {
743 stop: Box::pin(stop),
744 ..self
745 }
746 }
747
748 /// Construct a new [`ProcessorConfig`] that will retry the given number of
749 /// times. The default wait after a failure is 5 seconds.
750 pub fn new() -> Self {
751 Self {
752 wait_after_fail: std::time::Duration::from_secs(5),
753 stop: Box::pin(std::future::pending()),
754 }
755 }
756
757 /// Process events that are coming in on the provided channel.
758 ///
759 /// This handler will only terminate in the case of
760 ///
761 /// - the [`on_failure`](ProcessEvent::on_failure) method indicates so.
762 /// - the sender part of the `events` channel has been dropped
763 /// - the [`ProcessorConfig`] was configured with a termination signal that
764 /// was triggered.
765 ///
766 /// The function will log progress using the `tracing` library with the
767 /// target set to `ccd_event_processor`.
768 pub async fn process_events<P: ProcessEvent>(
769 mut self,
770 mut process: P,
771 mut events: tokio::sync::mpsc::Receiver<P::Data>,
772 ) {
773 while let Some(event) = tokio::select! {
774 biased;
775 _ = &mut self.stop => None,
776 r = events.recv() => r,
777 } {
778 let mut try_number: u32 = 0;
779 'outer: loop {
780 let start = tokio::time::Instant::now();
781 let response = process.process(&event).await;
782 let end = tokio::time::Instant::now();
783 let duration = end.duration_since(start).as_millis();
784 match response {
785 Ok(descr) => {
786 tracing::info!(
787 target: "ccd_event_processor",
788 "{descr} in {duration}ms."
789 );
790 break 'outer;
791 }
792 Err(e) => {
793 tracing::error!(
794 target: "ccd_event_processor",
795 "Failed to process event: {e}. Took {duration}ms to fail."
796 );
797 tracing::info!(
798 target: "ccd_event_processor",
799 "Retrying in {}ms.",
800 self.wait_after_fail.as_millis()
801 );
802 // Wait before calling on_failure with the idea that whatever caused the
803 // failure is more likely to be fixed if we try
804 // after a bit of time rather than immediately.
805 tokio::time::sleep(self.wait_after_fail).await;
806 match process.on_failure(e, try_number + 1).await {
807 Ok(true) => {
808 // do nothing, continue.
809 }
810 Ok(false) => return,
811 Err(e) => {
812 tracing::warn!("Failed to restart: {e}.");
813 }
814 }
815 try_number += 1;
816 }
817 }
818 }
819 }
820 tracing::info!(
821 target: "ccd_event_processor",
822 "Terminating process_events due to channel closing."
823 );
824 }
825}
826
827/// Given a configuration for traversing the chain and processing generated
828/// events start a process to traverse the chain and index events.
829///
830/// This process will only stop when the `stop_signal` future completes, when
831/// [`traverse`](TraverseConfig::traverse) completes, or when
832/// [`process_events`](ProcessorConfig::process_events) completes.
833pub async fn traverse_and_process<I: Indexer, P: ProcessEvent<Data = I::Data>>(
834 config: TraverseConfig,
835 i: I,
836 processor: ProcessorConfig,
837 p: P,
838) -> Result<(), QueryError> {
839 let (sender, receiver) = tokio::sync::mpsc::channel(10);
840 let fut1 = config.traverse(i, sender);
841 let fut2 = processor.process_events(p, receiver);
842 let (r1, ()) = futures::join!(fut1, fut2);
843 r1
844}