// concordium_rust_sdk/indexer.rs

1//! Support for writing indexers using the Rust SDK.
2//!
3//! The main indexer entrypoint is the [`TraverseConfig::traverse`] method
4//! which will start chain traversal, calling methods of the [`Indexer`] trait
5//! for each finalized block it discovers.
6use crate::{
7    types::{
8        execution_tree, queries::BlockInfo, AccountTransactionEffects, BlockItemSummary,
9        BlockItemSummaryDetails, ExecutionTree, SpecialTransactionOutcome,
10    },
11    v2::{self, FinalizedBlockInfo, QueryError, QueryResult},
12};
13use concordium_base::{
14    base::{AbsoluteBlockHeight, Energy},
15    contracts_common::{AccountAddress, Amount, ContractAddress, OwnedEntrypointName},
16    hashes::TransactionHash,
17    smart_contracts::OwnedReceiveName,
18};
19use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt as _};
20use std::{
21    collections::{BTreeMap, BTreeSet},
22    time::Duration,
23};
24use tokio::time::error::Elapsed;
25pub use tonic::async_trait;
26
/// Configuration for an indexer.
pub struct TraverseConfig {
    /// The list of endpoints the indexer will try, in order, wrapping around
    /// on failure.
    endpoints:       Vec<v2::Endpoint>,
    /// Maximum number of blocks that are queried in parallel.
    max_parallel:    usize,
    /// Maximum amount of time the last finalized block of a node may be
    /// behind before the node is deemed too far behind and another is tried.
    max_behind:      std::time::Duration,
    /// Pause inserted after a failure before trying another node.
    wait_after_fail: std::time::Duration,
    /// Block height at which traversal starts.
    start_height:    AbsoluteBlockHeight,
}
35
#[derive(Debug, thiserror::Error)]
/// An error encountered during chain traversal and passed to the
/// [`Indexer`].
pub enum TraverseError {
    /// Establishing a connection to the node failed.
    #[error("Failed to connect: {0}")]
    Connect(#[from] tonic::transport::Error),
    /// A query to the node failed.
    #[error("Failed to query: {0}")]
    Query(#[from] QueryError),
    /// The node did not produce new finalized blocks within the configured
    /// `max_behind` duration.
    #[error("Timed out waiting for finalized blocks.")]
    Elapsed(#[from] Elapsed),
}
47
#[async_trait]
/// A trait intended to be implemented by indexers that traverse the chain and
/// extract information of interest to store for efficient retrieval.
///
/// The main method of this trait is [`on_finalized`](Indexer::on_finalized)
/// which will be called by the [`traverse`](TraverseConfig::traverse) method
/// for each finalized block. The other two methods are meant for signalling and
/// bookkeeping.
///
/// Note that this trait has `async` methods, which is why the type signatures
/// are daunting. The intended way of implementing it is to use the
/// `async_trait` macro like so
///
/// ```
/// # use concordium_rust_sdk::{indexer::*, v2::{self, FinalizedBlockInfo, QueryResult}};
/// use concordium_rust_sdk::indexer::async_trait;
/// # struct MyIndexer;
/// #[async_trait]
/// impl Indexer for MyIndexer {
///     type Context = ();
///     type Data = ();
///
///     async fn on_connect<'a>(
///         &mut self,
///         endpoint: v2::Endpoint,
///         client: &'a mut v2::Client,
///     ) -> QueryResult<Self::Context> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_finalized<'a>(
///         &self,
///         client: v2::Client,
///         ctx: &'a Self::Context,
///         fbi: FinalizedBlockInfo,
///     ) -> QueryResult<Self::Data> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_failure(
///         &mut self,
///         ep: v2::Endpoint,
///         successive_failures: u64,
///         err: TraverseError,
///     ) -> bool {
///         unimplemented!("Implement me.")
///     }
/// }
/// ```
pub trait Indexer {
    /// The data that is retrieved upon connecting to the endpoint and supplied
    /// to each call of [`on_finalized`](Self::on_finalized).
    type Context: Send + Sync;
    /// The data returned by the [`on_finalized`](Self::on_finalized) call.
    type Data: Send + Sync;

    /// Called when a new connection is established to the given endpoint.
    /// This includes every reconnection after a failure.
    /// The return value from this method is passed to each call of
    /// [`on_finalized`](Self::on_finalized).
    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<Self::Context>;

    /// The main method of this trait. It is called for each finalized block
    /// that the indexer discovers. Note that the indexer might call this
    /// concurrently for multiple blocks at the same time to speed up indexing.
    ///
    /// This method is meant to return errors that are unexpected, and if it
    /// does return an error the indexer will attempt to reconnect to the
    /// next endpoint.
    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a Self::Context,
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data>;

    /// Called when either connecting to the node or querying the node fails.
    /// The number of successive failures without progress is passed to the
    /// method which should return whether to stop indexing ([`true`]) or not
    /// ([`false`]).
    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool;
}
138
impl TraverseConfig {
    /// A configuration with a single endpoint starting at the given block
    /// height.
    pub fn new_single(endpoint: v2::Endpoint, start_height: AbsoluteBlockHeight) -> Self {
        Self {
            endpoints: vec![endpoint],
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        }
    }

    /// A configuration with a given list of endpoints and starting height.
    /// Returns [`None`] if the list of endpoints is empty.
    pub fn new(endpoints: Vec<v2::Endpoint>, start_height: AbsoluteBlockHeight) -> Option<Self> {
        if endpoints.is_empty() {
            return None;
        }
        Some(Self {
            endpoints,
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        })
    }

    /// Set the maximum amount of time the last finalized block of the node can
    /// be behind before it is deemed too far behind, and another node is
    /// tried.
    ///
    /// The default value is 60 seconds.
    pub fn set_max_behind(self, max_behind: Duration) -> Self { Self { max_behind, ..self } }

    /// After each failure the indexer will pause a bit before trying another
    /// node to avoid overloading the node. Defaults to 1 second.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Add an additional endpoint to the list of endpoints the indexer will
    /// use. This is added to the end of the list, so the endpoint is only tried
    /// in case of failure of previous queries.
    pub fn push_endpoint(mut self, endpoint: v2::Endpoint) -> Self {
        self.endpoints.push(endpoint);
        self
    }

    /// Set the maximum number of blocks that will be queried in parallel, if
    /// they are available. Defaults to 4 if not set explicitly.
    pub fn set_max_parallel(self, max_parallel: usize) -> Self {
        Self {
            max_parallel,
            ..self
        }
    }

    /// Traverse the chain according to the supplied configuration, invoking
    /// [`on_finalized`](Indexer::on_finalized) for each finalized block.
    ///
    /// Multiple [`on_finalized`](Indexer::on_finalized) calls might be executed
    /// concurrently, but their responses will be written to the provided
    /// [`tokio::sync::mpsc::Sender`] in the increasing order of block height,
    /// with no gaps.
    ///
    /// If a query fails, either due to timeout or node not being available the
    /// indexer will attempt the next endpoint it is configured with.
    ///
    /// For robust indexing it is crucial that the supplied
    /// [`Endpoints`](v2::Endpoint) are configured with timeouts so that the
    /// indexer may make progress.
    ///
    /// The [`traverse`](Self::traverse) method will return either when
    /// signalled so by the [`on_failure`](Indexer::on_failure) method or
    /// when the receiver part of the supplied [`tokio::sync::mpsc::Sender`]
    /// is closed. Typically this method should run in a task spawned via
    /// [`tokio::spawn`].
    pub async fn traverse<I: Indexer>(
        self,
        mut indexer: I,
        sender: tokio::sync::mpsc::Sender<I::Data>,
    ) -> QueryResult<()> {
        let TraverseConfig {
            endpoints,
            max_parallel,
            max_behind,
            wait_after_fail,
            start_height: mut height,
        } = self;
        let mut successive_failures: u64 = 0;
        // Cycle through the configured endpoints indefinitely; the loop is
        // only left through the `return`s below.
        for node_ep in endpoints.into_iter().cycle() {
            // The receiver was dropped, so nobody is listening for data.
            if sender.is_closed() {
                return Ok(());
            }
            // Back off a bit after a failure so the nodes are not hammered.
            if successive_failures > 0 {
                tokio::time::sleep(wait_after_fail).await
            }
            let mut node = match v2::Client::new(node_ep.clone()).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        // Try the next endpoint.
                        continue;
                    }
                }
            };

            let context = match indexer.on_connect(node_ep.clone(), &mut node).await {
                Ok(a) => a,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };
            let mut finalized_blocks = match node.get_finalized_blocks_from(height).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            'node_loop: loop {
                let last_height = height;
                let (has_error, chunks) = match finalized_blocks
                    .next_chunk_timeout(max_parallel, max_behind)
                    .await
                {
                    Ok(v) => v,
                    Err(e) => {
                        successive_failures += 1;
                        let should_stop = indexer
                            .on_failure(node_ep, successive_failures, e.into())
                            .await;
                        if should_stop {
                            return Ok(());
                        } else {
                            // Give up on this node and try the next endpoint.
                            break 'node_loop;
                        }
                    }
                };

                // Query up to `max_parallel` blocks concurrently, but receive
                // the results in increasing order of block height.
                let mut futs = FuturesOrdered::new();
                for fb in chunks {
                    futs.push_back(indexer.on_finalized(node.clone(), &context, fb));
                }
                while let Some(data) = futs.next().await {
                    let data = match data {
                        Ok(v) => v,
                        Err(e) => {
                            // Drop the remaining queries so that `indexer` is
                            // no longer borrowed and `on_failure` can take
                            // `&mut self`.
                            drop(futs);
                            successive_failures += 1;
                            let should_stop = indexer
                                .on_failure(node_ep, successive_failures, e.into())
                                .await;
                            if should_stop {
                                return Ok(());
                            } else {
                                break 'node_loop;
                            }
                        }
                    };
                    if sender.send(data).await.is_err() {
                        // The listener ended the stream, meaning we should
                        // stop.
                        return Ok(());
                    }
                    height = height.next();
                }

                // Any emitted block counts as progress, so reset the failure
                // counter.
                if height > last_height {
                    successive_failures = 0;
                }

                if has_error {
                    // we have processed the blocks we can, but further queries on the same stream
                    // will fail since the stream signalled an error.
                    break 'node_loop;
                }
            }
        }
        Ok(()) // unreachable: `cycle` over a non-empty list never terminates.
    }
}
346
/// An indexer that retrieves all transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct TransactionIndexer;
355
356#[async_trait]
357impl Indexer for TransactionIndexer {
358    type Context = ();
359    type Data = (BlockInfo, Vec<BlockItemSummary>);
360
361    async fn on_connect<'a>(
362        &mut self,
363        endpoint: v2::Endpoint,
364        _client: &'a mut v2::Client,
365    ) -> QueryResult<()> {
366        tracing::info!(
367            target: "ccd_indexer",
368            "Connected to endpoint {}.",
369            endpoint.uri()
370        );
371        Ok(())
372    }
373
374    async fn on_finalized<'a>(
375        &self,
376        mut client: v2::Client,
377        _ctx: &'a (),
378        fbi: FinalizedBlockInfo,
379    ) -> QueryResult<Self::Data> {
380        let bi = client.get_block_info(fbi.height).await?.response;
381        if bi.transaction_count != 0 {
382            let summary = client
383                .get_block_transaction_events(fbi.height)
384                .await?
385                .response
386                .try_collect::<Vec<_>>()
387                .await?;
388            Ok((bi, summary))
389        } else {
390            Ok((bi, Vec::new()))
391        }
392    }
393
394    async fn on_failure(
395        &mut self,
396        endpoint: v2::Endpoint,
397        successive_failures: u64,
398        err: TraverseError,
399    ) -> bool {
400        tracing::warn!(
401            target: "ccd_indexer",
402            successive_failures,
403            "Failed when querying endpoint {}: {err}",
404            endpoint.uri()
405        );
406        false
407    }
408}
409
/// An indexer that retrieves smart contract updates where the specific
/// entrypoint of a contract was triggered as the top-level entrypoint.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct ContractUpdateIndexer {
    /// The contract whose updates are to be indexed.
    pub target_address: ContractAddress,
    /// The entrypoint that must have been triggered as the top-level
    /// entrypoint of the update.
    pub entrypoint:     OwnedEntrypointName,
}
422
/// Information about a single top-level contract update, extracted from a
/// [`BlockItemSummary`].
pub struct ContractUpdateInfo {
    /// The execution tree generated by the call.
    pub execution_tree:   ExecutionTree,
    /// The amount of energy charged for the transaction.
    pub energy_cost:      Energy,
    /// The cost, in CCD, of the transaction.
    pub cost:             Amount,
    /// The hash of the transaction from which this update stems.
    pub transaction_hash: TransactionHash,
    /// The sender of the transaction.
    pub sender:           AccountAddress,
}
435
436fn update_info(summary: BlockItemSummary) -> Option<ContractUpdateInfo> {
437    let BlockItemSummaryDetails::AccountTransaction(at) = summary.details else {
438        return None;
439    };
440
441    let AccountTransactionEffects::ContractUpdateIssued { effects } = at.effects else {
442        return None;
443    };
444
445    Some(ContractUpdateInfo {
446        execution_tree:   execution_tree(effects)?,
447        energy_cost:      summary.energy_cost,
448        cost:             at.cost,
449        transaction_hash: summary.hash,
450        sender:           at.sender,
451    })
452}
453
454#[async_trait]
455impl Indexer for ContractUpdateIndexer {
456    type Context = ();
457    type Data = (BlockInfo, Vec<ContractUpdateInfo>);
458
459    async fn on_connect<'a>(
460        &mut self,
461        endpoint: v2::Endpoint,
462        _client: &'a mut v2::Client,
463    ) -> QueryResult<()> {
464        tracing::info!(
465            target: "ccd_indexer",
466            "Connected to endpoint {}.",
467            endpoint.uri()
468        );
469        Ok(())
470    }
471
472    async fn on_finalized<'a>(
473        &self,
474        mut client: v2::Client,
475        _ctx: &'a (),
476        fbi: FinalizedBlockInfo,
477    ) -> QueryResult<Self::Data> {
478        let bi = client.get_block_info(fbi.height).await?.response;
479        if bi.transaction_count != 0 {
480            let summary = client
481                .get_block_transaction_events(fbi.height)
482                .await?
483                .response
484                .try_filter_map(|summary| async move {
485                    let Some(info) = update_info(summary) else {
486                        return Ok(None);
487                    };
488                    if info.execution_tree.address() == self.target_address
489                        && info.execution_tree.entrypoint() == self.entrypoint.as_entrypoint_name()
490                    {
491                        Ok(Some(info))
492                    } else {
493                        Ok(None)
494                    }
495                })
496                .try_collect::<Vec<_>>()
497                .await?;
498            Ok((bi, summary))
499        } else {
500            Ok((bi, Vec::new()))
501        }
502    }
503
504    async fn on_failure(
505        &mut self,
506        endpoint: v2::Endpoint,
507        successive_failures: u64,
508        err: TraverseError,
509    ) -> bool {
510        tracing::warn!(
511            target: "ccd_indexer",
512            successive_failures,
513            "Failed when querying endpoint {}: {err}",
514            endpoint.uri()
515        );
516        false
517    }
518}
519
/// An indexer that retrieves smart contract updates where the specific
/// contracts were affected. The configuration can choose to require any or all
/// to be updated.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct AffectedContractIndexer {
    /// The set of contract addresses of interest.
    pub addresses: BTreeSet<ContractAddress>,
    /// Require all contract addresses in the `addresses` set to be updated.
    pub all:       bool,
}
534
535#[async_trait]
536impl Indexer for AffectedContractIndexer {
537    type Context = ();
538    type Data = (
539        BlockInfo,
540        Vec<(
541            ContractUpdateInfo,
542            BTreeMap<ContractAddress, BTreeSet<OwnedReceiveName>>,
543        )>,
544    );
545
546    async fn on_connect<'a>(
547        &mut self,
548        endpoint: v2::Endpoint,
549        _client: &'a mut v2::Client,
550    ) -> QueryResult<()> {
551        tracing::info!(
552            target: "ccd_indexer",
553            "Connected to endpoint {}.",
554            endpoint.uri()
555        );
556        Ok(())
557    }
558
559    async fn on_finalized<'a>(
560        &self,
561        mut client: v2::Client,
562        _ctx: &'a (),
563        fbi: FinalizedBlockInfo,
564    ) -> QueryResult<Self::Data> {
565        let bi = client.get_block_info(fbi.height).await?.response;
566        if bi.transaction_count != 0 {
567            let summary = client
568                .get_block_transaction_events(fbi.height)
569                .await?
570                .response
571                .try_filter_map(|summary| async move {
572                    let Some(info) = update_info(summary) else {
573                        return Ok(None);
574                    };
575                    let affected_addresses = info.execution_tree.affected_addresses();
576                    if (self.all
577                        && self
578                            .addresses
579                            .iter()
580                            .all(|addr| affected_addresses.contains_key(addr)))
581                        || self
582                            .addresses
583                            .iter()
584                            .any(|addr| affected_addresses.contains_key(addr))
585                    {
586                        Ok(Some((info, affected_addresses)))
587                    } else {
588                        Ok(None)
589                    }
590                })
591                .try_collect::<Vec<_>>()
592                .await?;
593            Ok((bi, summary))
594        } else {
595            Ok((bi, Vec::new()))
596        }
597    }
598
599    async fn on_failure(
600        &mut self,
601        endpoint: v2::Endpoint,
602        successive_failures: u64,
603        err: TraverseError,
604    ) -> bool {
605        tracing::warn!(
606            target: "ccd_indexer",
607            successive_failures,
608            "Failed when querying endpoint {}: {err}",
609            endpoint.uri()
610        );
611        false
612    }
613}
614
/// An indexer that retrieves all events in a block, transaction outcomes
/// and special transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct BlockEventsIndexer;
624
#[async_trait]
impl Indexer for BlockEventsIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<BlockItemSummary>,
        Vec<SpecialTransactionOutcome>,
    );

    /// Delegates to [`TransactionIndexer`], which logs the new connection at
    /// `info` level.
    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        TransactionIndexer.on_connect(endpoint, client).await
    }

    /// Retrieve the block info, transaction outcomes, and special transaction
    /// outcomes of the block.
    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data> {
        // A separate client so the special-events query can run concurrently
        // with the queries performed by `TransactionIndexer::on_finalized`.
        let mut special_client = client.clone();
        let special = async move {
            let events = special_client
                .get_block_special_events(fbi.height)
                .await?
                .response
                .try_collect()
                .await?;
            Ok(events)
        };
        // Run both queries concurrently; fail as soon as either fails.
        let ((bi, summary), special) =
            futures::try_join!(TransactionIndexer.on_finalized(client, ctx, fbi), special)?;
        Ok((bi, summary, special))
    }

    /// Delegates to [`TransactionIndexer`], which logs the failure at `warn`
    /// level and signals the traversal to continue.
    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        TransactionIndexer
            .on_failure(endpoint, successive_failures, err)
            .await
    }
}
674
#[async_trait]
/// Handle an individual event. This trait is designed to be used together with
/// the [`ProcessorConfig`]. These two together are designed to ease the work of
/// writing the part of indexers where data is to be stored in a database.
pub trait ProcessEvent {
    /// The type of events that are to be processed. Typically this will be all
    /// of the transactions of interest for a single block.
    type Data;
    /// An error that can be signalled.
    type Error: std::fmt::Display + std::fmt::Debug;
    /// A description returned by the [`process`](ProcessEvent::process) method.
    /// This message is logged by the [`ProcessorConfig`] and is intended to
    /// describe the data that was just processed.
    type Description: std::fmt::Display;

    /// Process a single item. This should work atomically in the sense that
    /// either the entire `data` is processed or none of it is in case of an
    /// error. This property is relied upon by the [`ProcessorConfig`] to retry
    /// failed attempts.
    async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error>;

    /// The `on_failure` method is invoked by the [`ProcessorConfig`] when it
    /// fails to process an event. It is meant to retry to recreate the
    /// resources, such as a database connection, that might have been
    /// dropped. The return value should signal if the handler process
    /// should continue (`true`) or not.
    ///
    /// The function takes the `error` that occurred at the latest
    /// [`process`](Self::process) call that just failed, and the number of
    /// attempts of calling `process` that failed.
    async fn on_failure(
        &mut self,
        error: Self::Error,
        failed_attempts: u32,
    ) -> Result<bool, Self::Error>;
}
711
/// Configuration of how events produced by an [`Indexer`] are processed by a
/// [`ProcessEvent`] implementation, including the retry behaviour.
pub struct ProcessorConfig {
    /// The amount of time to wait after a failure to process an event.
    wait_after_fail: std::time::Duration,
    /// A future to be signalled to stop processing.
    stop:            std::pin::Pin<Box<dyn std::future::Future<Output = ()>>>,
}
718
719/// The default implementation behaves the same as
720/// [`ProcessorConfig::new`](ProcessorConfig::new).
721impl Default for ProcessorConfig {
722    fn default() -> Self { Self::new() }
723}
724
impl ProcessorConfig {
    /// After each failure the [`ProcessorConfig`] will pause a bit before
    /// trying again. Defaults to 5 seconds.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Set the stop signal for the processor. This accepts a future which will
    /// be polled and if the future yields ready the `process_events` method
    /// will terminate.
    ///
    /// An example of such a future would be the `Receiver` end of a oneshot
    /// channel.
    pub fn set_stop_signal(self, stop: impl std::future::Future<Output = ()> + 'static) -> Self {
        Self {
            stop: Box::pin(stop),
            ..self
        }
    }

    /// Construct a new [`ProcessorConfig`] that retries processing until
    /// [`on_failure`](ProcessEvent::on_failure) signals otherwise, with no
    /// stop signal configured. The default wait after a failure is 5 seconds.
    pub fn new() -> Self {
        Self {
            wait_after_fail: std::time::Duration::from_secs(5),
            // A future that never completes, i.e., no stop signal.
            stop:            Box::pin(std::future::pending()),
        }
    }

    /// Process events that are coming in on the provided channel.
    ///
    /// This handler will only terminate in the case of
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method indicates so.
    /// - the sender part of the `events` channel has been dropped
    /// - the [`ProcessorConfig`] was configured with a termination signal that
    ///   was triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_events<P: ProcessEvent>(
        mut self,
        mut process: P,
        mut events: tokio::sync::mpsc::Receiver<P::Data>,
    ) {
        while let Some(event) = tokio::select! {
            // `biased` makes the stop signal take priority over new events.
            biased;
            _ = &mut self.stop => None,
            r = events.recv() => r,
        } {
            let mut try_number: u32 = 0;
            // Retry the same event until it is processed or `on_failure`
            // signals termination.
            'outer: loop {
                let start = tokio::time::Instant::now();
                let response = process.process(&event).await;
                let end = tokio::time::Instant::now();
                let duration = end.duration_since(start).as_millis();
                match response {
                    Ok(descr) => {
                        tracing::info!(
                            target: "ccd_event_processor",
                            "{descr} in {duration}ms."
                        );
                        break 'outer;
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "ccd_event_processor",
                            "Failed to process event: {e}. Took {duration}ms to fail."
                        );
                        tracing::info!(
                            target: "ccd_event_processor",
                            "Retrying in {}ms.",
                            self.wait_after_fail.as_millis()
                        );
                        // Wait before calling on_failure with the idea that whatever caused the
                        // failure is more likely to be fixed if we try
                        // after a bit of time rather than immediately.
                        tokio::time::sleep(self.wait_after_fail).await;
                        match process.on_failure(e, try_number + 1).await {
                            Ok(true) => {
                                // do nothing, continue.
                            }
                            Ok(false) => return,
                            Err(e) => {
                                tracing::warn!("Failed to restart: {e}.");
                            }
                        }
                        try_number += 1;
                    }
                }
            }
        }
        tracing::info!(
            target: "ccd_event_processor",
            "Terminating process_events due to channel closing."
        );
    }
}
826
827/// Given a configuration for traversing the chain and processing generated
828/// events start a process to traverse the chain and index events.
829///
830/// This process will only stop when the `stop_signal` future completes, when
831/// [`traverse`](TraverseConfig::traverse) completes, or when
832/// [`process_events`](ProcessorConfig::process_events) completes.
833pub async fn traverse_and_process<I: Indexer, P: ProcessEvent<Data = I::Data>>(
834    config: TraverseConfig,
835    i: I,
836    processor: ProcessorConfig,
837    p: P,
838) -> Result<(), QueryError> {
839    let (sender, receiver) = tokio::sync::mpsc::channel(10);
840    let fut1 = config.traverse(i, sender);
841    let fut2 = processor.process_events(p, receiver);
842    let (r1, ()) = futures::join!(fut1, fut2);
843    r1
844}