//! Support for writing indexers using the Rust SDK.
//!
//! The main indexer entrypoint is the [`TraverseConfig::traverse`] method
//! which will start chain traversal, calling methods of the [`Indexer`] trait
//! for each finalized block it discovers.
use crate::{
    types::{
        execution_tree, queries::BlockInfo, AccountTransactionEffects, BlockItemSummary,
        BlockItemSummaryDetails, ExecutionTree, SpecialTransactionOutcome,
    },
    v2::{self, FinalizedBlockInfo, QueryError, QueryResult},
};
use concordium_base::{
    base::{AbsoluteBlockHeight, Energy},
    contracts_common::{AccountAddress, Amount, ContractAddress, OwnedEntrypointName},
    hashes::TransactionHash,
    smart_contracts::OwnedReceiveName,
};
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt as _};
use std::{
    collections::{BTreeMap, BTreeSet},
    time::Duration,
};
use tokio::time::error::Elapsed;
pub use tonic::async_trait;

/// Configuration for an indexer.
pub struct TraverseConfig {
    endpoints:       Vec<v2::Endpoint>,
    max_parallel:    usize,
    max_behind:      std::time::Duration,
    wait_after_fail: std::time::Duration,
    start_height:    AbsoluteBlockHeight,
}

#[derive(Debug, thiserror::Error)]
/// An error encountered during chain traversal and passed to the
/// [`Indexer`].
pub enum TraverseError {
    #[error("Failed to connect: {0}")]
    Connect(#[from] tonic::transport::Error),
    #[error("Failed to query: {0}")]
    Query(#[from] QueryError),
    #[error("Timed out waiting for finalized blocks.")]
    Elapsed(#[from] Elapsed),
}

#[async_trait]
/// A trait intended to be implemented by indexers that traverse the chain and
/// extract information of interest to store for efficient retrieval.
///
/// The main method of this trait is [`on_finalized`](Indexer::on_finalized)
/// which will be called by the [`traverse`](TraverseConfig::traverse) method
/// for each finalized block. The other two methods are meant for signalling and
/// bookkeeping.
///
/// Note that this trait has `async` methods, which is why the type signatures
/// are daunting. The intended way of implementing it is to use the
/// `async_trait` macro like so
///
/// ```
/// # use concordium_rust_sdk::{indexer::*, v2::{self, FinalizedBlockInfo, QueryResult}};
/// use concordium_rust_sdk::indexer::async_trait;
/// # struct MyIndexer;
/// #[async_trait]
/// impl Indexer for MyIndexer {
///     type Context = ();
///     type Data = ();
///
///     async fn on_connect<'a>(
///         &mut self,
///         endpoint: v2::Endpoint,
///         client: &'a mut v2::Client,
///     ) -> QueryResult<Self::Context> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_finalized<'a>(
///         &self,
///         client: v2::Client,
///         ctx: &'a Self::Context,
///         fbi: FinalizedBlockInfo,
///     ) -> QueryResult<Self::Data> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_failure(
///         &mut self,
///         ep: v2::Endpoint,
///         successive_failures: u64,
///         err: TraverseError,
///     ) -> bool {
///         unimplemented!("Implement me.")
///     }
/// }
/// ```
pub trait Indexer {
    /// The data that is retrieved upon connecting to the endpoint and supplied
    /// to each call of [`on_finalized`](Self::on_finalized).
    type Context: Send + Sync;
    /// The data returned by the [`on_finalized`](Self::on_finalized) call.
    type Data: Send + Sync;

    /// Called when a new connection is established to the given endpoint.
    /// The return value from this method is passed to each call of
    /// [`on_finalized`](Self::on_finalized).
    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<Self::Context>;

    /// The main method of this trait. It is called for each finalized block
    /// that the indexer discovers. Note that the traversal might call this
    /// concurrently for multiple blocks to speed up indexing.
    ///
    /// This method should only return unexpected errors. If it does return an
    /// error, the traversal will attempt to reconnect to the next endpoint.
    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a Self::Context,
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data>;

    /// Called when either connecting to the node or querying the node fails.
    /// The number of successive failures without progress is passed to the
    /// method, which should return whether to stop indexing ([`true`]) or not
    /// ([`false`]).
    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool;
}

impl TraverseConfig {
    /// A configuration with a single endpoint starting at the given block
    /// height.
    pub fn new_single(endpoint: v2::Endpoint, start_height: AbsoluteBlockHeight) -> Self {
        Self {
            endpoints: vec![endpoint],
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        }
    }

    /// A configuration with a given list of endpoints and starting height.
    /// Returns [`None`] if the list of endpoints is empty.
    pub fn new(endpoints: Vec<v2::Endpoint>, start_height: AbsoluteBlockHeight) -> Option<Self> {
        if endpoints.is_empty() {
            return None;
        }
        Some(Self {
            endpoints,
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        })
    }

    /// Set the maximum amount of time the last finalized block of the node may
    /// be behind the present before the node is deemed too far behind and
    /// another node is tried.
    ///
    /// The default value is 60 seconds.
    pub fn set_max_behind(self, max_behind: Duration) -> Self { Self { max_behind, ..self } }

    /// After each failure the indexer will pause for this duration before
    /// trying another node, to avoid overloading nodes. Defaults to 1 second.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Add an additional endpoint to the list of endpoints the indexer will
    /// use. This is added to the end of the list, so the endpoint is only tried
    /// in case of failure of previous queries.
    pub fn push_endpoint(mut self, endpoint: v2::Endpoint) -> Self {
        self.endpoints.push(endpoint);
        self
    }

    /// Set the maximum number of blocks that will be queried in parallel, if
    /// they are available. Defaults to 4 if not set explicitly.
    pub fn set_max_parallel(self, max_parallel: usize) -> Self {
        Self {
            max_parallel,
            ..self
        }
    }

    /// Traverse the chain according to the supplied configuration, invoking
    /// [`on_finalized`](Indexer::on_finalized) for each finalized block.
    ///
    /// Multiple [`on_finalized`](Indexer::on_finalized) calls might be executed
    /// concurrently, but their responses will be written to the provided
    /// [`tokio::sync::mpsc::Sender`] in increasing order of block height, with
    /// no gaps.
    ///
    /// If a query fails, either due to a timeout or the node not being
    /// available, the indexer will attempt the next endpoint it is configured
    /// with.
    ///
    /// For robust indexing it is crucial that the supplied
    /// [`Endpoints`](v2::Endpoint) are configured with timeouts so that the
    /// indexer may make progress.
    ///
    /// The [`traverse`](Self::traverse) method will return either when
    /// signalled to do so by the [`on_failure`](Indexer::on_failure) method or
    /// when the receiver part of the supplied [`tokio::sync::mpsc::Sender`]
    /// is closed. Typically this method should run in a task spawned via
    /// [`tokio::spawn`].
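    ///
    /// A minimal sketch of driving the traversal with the built-in
    /// [`TransactionIndexer`]; the endpoint address and starting height are
    /// placeholders:
    ///
    /// ```no_run
    /// # use concordium_rust_sdk::{indexer::*, types::AbsoluteBlockHeight, v2};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let endpoint = v2::Endpoint::from_static("http://localhost:20000");
    /// let config = TraverseConfig::new_single(endpoint, AbsoluteBlockHeight { height: 0 });
    /// let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
    /// // Run the traversal in a background task and consume its output here.
    /// let handle = tokio::spawn(config.traverse(TransactionIndexer, sender));
    /// while let Some((block_info, summaries)) = receiver.recv().await {
    ///     println!("Block {} has {} transactions.", block_info.block_hash, summaries.len());
    /// }
    /// handle.await??;
    /// # Ok(())
    /// # }
    /// ```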
    pub async fn traverse<I: Indexer>(
        self,
        mut indexer: I,
        sender: tokio::sync::mpsc::Sender<I::Data>,
    ) -> QueryResult<()> {
        let TraverseConfig {
            endpoints,
            max_parallel,
            max_behind,
            wait_after_fail,
            start_height: mut height,
        } = self;
        let mut successive_failures: u64 = 0;
        for node_ep in endpoints.into_iter().cycle() {
            if sender.is_closed() {
                return Ok(());
            }
            if successive_failures > 0 {
                tokio::time::sleep(wait_after_fail).await
            }
            let mut node = match v2::Client::new(node_ep.clone()).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            let context = match indexer.on_connect(node_ep.clone(), &mut node).await {
                Ok(a) => a,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };
            let mut finalized_blocks = match node.get_finalized_blocks_from(height).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            let mut preprocessors = FuturesOrdered::new();
            let mut finalized_blocks_error = false;
            'node_loop: loop {
                tokio::select! {
                    biased;
                    // First check if anything is done preprocessing.
                    // If nothing is being preprocessed, this branch is disabled until the next
                    // loop iteration.
                    Some(data) = preprocessors.next() => {
                        let data = match data {
                            Ok(v) => v,
                            Err(e) => {
                                drop(preprocessors);
                                successive_failures += 1;
                                let should_stop = indexer
                                    .on_failure(node_ep, successive_failures, TraverseError::Query(e))
                                    .await;
                                if should_stop {
                                    return Ok(());
                                } else {
                                    break 'node_loop;
                                }
                            }
                        };
                        if sender.send(data).await.is_err() {
                            // The listener ended the stream, meaning we should stop.
                            return Ok(());
                        }
                        height = height.next();
                        successive_failures = 0;
                        if finalized_blocks_error {
                            // We have processed the blocks we can, but further queries on the same stream
                            // will fail since the stream signalled an error.
                            break 'node_loop;
                        }
                    },
                    // If there is nothing immediately ready from preprocessing, check whether there
                    // are available slots for preprocessors and fill them.
                    // If there is no space, this branch is disabled until the next loop iteration.
                    _ = async {}, if preprocessors.len() < max_parallel => {
                        let space = max_parallel - preprocessors.len();
                        match finalized_blocks
                            .next_chunk_timeout(space, max_behind)
                            .await
                        {
                            Ok((has_error, chunks)) => {
                                finalized_blocks_error = has_error;
                                for fb in chunks {
                                    preprocessors.push_back(indexer.on_finalized(node.clone(), &context, fb));
                                }
                            },
                            Err(e) => {
                                drop(preprocessors);
                                successive_failures += 1;
                                let should_stop = indexer
                                    .on_failure(node_ep, successive_failures, TraverseError::Elapsed(e))
                                    .await;
                                if should_stop {
                                    return Ok(());
                                } else {
                                    break 'node_loop;
                                }
                            }
                        };

                    }
                };
            }
        }
        Ok(()) // unreachable
    }
}

/// An indexer that retrieves all transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct TransactionIndexer;

#[async_trait]
impl Indexer for TransactionIndexer {
    type Context = ();
    type Data = (BlockInfo, Vec<BlockItemSummary>);

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves smart contract updates where the specific
/// entrypoint of a contract was triggered as the top-level entrypoint.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
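///
/// For example, to watch for top-level calls of an entrypoint named `view` on
/// a particular contract (both values below are placeholders):
///
/// ```
/// # use concordium_rust_sdk::indexer::ContractUpdateIndexer;
/// # use concordium_rust_sdk::types::ContractAddress;
/// # use concordium_rust_sdk::smart_contracts::common::OwnedEntrypointName;
/// let indexer = ContractUpdateIndexer {
///     target_address: ContractAddress { index: 1234, subindex: 0 },
///     entrypoint:     OwnedEntrypointName::new_unchecked("view".into()),
/// };
/// ```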
pub struct ContractUpdateIndexer {
    pub target_address: ContractAddress,
    pub entrypoint:     OwnedEntrypointName,
}

pub struct ContractUpdateInfo {
    /// The execution tree generated by the call.
    pub execution_tree:   ExecutionTree,
    /// The amount of energy charged for the transaction.
    pub energy_cost:      Energy,
    /// The cost, in CCD, of the transaction.
    pub cost:             Amount,
    /// The hash of the transaction from which this update stems.
    pub transaction_hash: TransactionHash,
    /// The sender of the transaction.
    pub sender:           AccountAddress,
}

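/// Extract the information about a contract update from a transaction
/// summary. Returns [`None`] if the summary does not describe a successful
/// contract update transaction.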
fn update_info(summary: BlockItemSummary) -> Option<ContractUpdateInfo> {
    let BlockItemSummaryDetails::AccountTransaction(at) = summary.details else {
        return None;
    };

    let AccountTransactionEffects::ContractUpdateIssued { effects } = at.effects else {
        return None;
    };

    Some(ContractUpdateInfo {
        execution_tree:   execution_tree(effects)?,
        energy_cost:      summary.energy_cost,
        cost:             at.cost,
        transaction_hash: summary.hash,
        sender:           at.sender,
    })
}

#[async_trait]
impl Indexer for ContractUpdateIndexer {
    type Context = ();
    type Data = (BlockInfo, Vec<ContractUpdateInfo>);

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .try_filter_map(|summary| async move {
                    let Some(info) = update_info(summary) else {
                        return Ok(None);
                    };
                    if info.execution_tree.address() == self.target_address
                        && info.execution_tree.entrypoint() == self.entrypoint.as_entrypoint_name()
                    {
                        Ok(Some(info))
                    } else {
                        Ok(None)
                    }
                })
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves smart contract updates in which specific
/// contracts were affected. The configuration determines whether any or all of
/// the given contracts must have been updated.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
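///
/// For example, to index updates that affect either of two contracts (the
/// addresses below are placeholders):
///
/// ```
/// # use concordium_rust_sdk::indexer::AffectedContractIndexer;
/// # use concordium_rust_sdk::types::ContractAddress;
/// let indexer = AffectedContractIndexer {
///     addresses: [
///         ContractAddress { index: 1234, subindex: 0 },
///         ContractAddress { index: 5678, subindex: 0 },
///     ]
///     .into_iter()
///     .collect(),
///     all: false,
/// };
/// ```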
pub struct AffectedContractIndexer {
    pub addresses: BTreeSet<ContractAddress>,
    /// Require all contract addresses in the `addresses` set to be updated.
    pub all:       bool,
}

#[async_trait]
impl Indexer for AffectedContractIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<(
            ContractUpdateInfo,
            BTreeMap<ContractAddress, BTreeSet<OwnedReceiveName>>,
        )>,
    );

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .try_filter_map(|summary| async move {
                    let Some(info) = update_info(summary) else {
                        return Ok(None);
                    };
                    let affected_addresses = info.execution_tree.affected_addresses();
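                    // Keep the update if all of the configured addresses were affected when
                    // `all` is set, or if at least one of them was affected otherwise.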
                    if (self.all
                        && self
                            .addresses
                            .iter()
                            .all(|addr| affected_addresses.contains_key(addr)))
                        || (!self.all
                            && self
                                .addresses
                                .iter()
                                .any(|addr| affected_addresses.contains_key(addr)))
                    {
                        Ok(Some((info, affected_addresses)))
                    } else {
                        Ok(None)
                    }
                })
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves all events in a block: both transaction outcomes
/// and special transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct BlockEventsIndexer;

#[async_trait]
impl Indexer for BlockEventsIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<BlockItemSummary>,
        Vec<SpecialTransactionOutcome>,
    );

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        TransactionIndexer.on_connect(endpoint, client).await
    }

    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> QueryResult<Self::Data> {
        let mut special_client = client.clone();
        let special = async move {
            let events = special_client
                .get_block_special_events(fbi.height)
                .await?
                .response
                .try_collect()
                .await?;
            Ok(events)
        };
        let ((bi, summary), special) =
            futures::try_join!(TransactionIndexer.on_finalized(client, ctx, fbi), special)?;
        Ok((bi, summary, special))
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        TransactionIndexer
            .on_failure(endpoint, successive_failures, err)
            .await
    }
}

#[async_trait]
/// Handle an individual event. This trait is designed to be used together with
/// the [`ProcessorConfig`]; together they ease the work of writing the part of
/// an indexer that stores data in a database.
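///
/// As with [`Indexer`], the intended way of implementing this trait is via the
/// `async_trait` macro. A minimal sketch that merely counts transactions (the
/// `CountingProcessor` type is illustrative) could look like so
///
/// ```
/// # use concordium_rust_sdk::{indexer::*, types::{queries::BlockInfo, BlockItemSummary}};
/// struct CountingProcessor {
///     total: u64,
/// }
///
/// #[async_trait]
/// impl ProcessEvent for CountingProcessor {
///     type Data = (BlockInfo, Vec<BlockItemSummary>);
///     type Error = String;
///     type Description = String;
///
///     async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error> {
///         let (block_info, summaries) = data;
///         self.total += summaries.len() as u64;
///         Ok(format!(
///             "Processed block {} ({} transactions in total).",
///             block_info.block_hash, self.total
///         ))
///     }
///
///     async fn on_failure(
///         &mut self,
///         _error: Self::Error,
///         _failed_attempts: u32,
///     ) -> Result<bool, Self::Error> {
///         // Nothing to re-establish for an in-memory processor; keep going.
///         Ok(true)
///     }
/// }
/// ```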
pub trait ProcessEvent {
    /// The type of events that are to be processed. Typically this will be all
    /// of the transactions of interest for a single block.
    type Data;
    /// An error that can be signalled.
    type Error: std::fmt::Display + std::fmt::Debug;
    /// A description returned by the [`process`](ProcessEvent::process) method.
    /// This message is logged by the [`ProcessorConfig`] and is intended to
    /// describe the data that was just processed.
    type Description: std::fmt::Display;

    /// Process a single item. This should work atomically in the sense that
    /// either the entire `data` is processed or, in case of an error, none of
    /// it is. This property is relied upon by the [`ProcessorConfig`] to retry
    /// failed attempts.
    async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error>;

    /// The `on_failure` method is invoked by the [`ProcessorConfig`] when it
    /// fails to process an event. It is meant to attempt to recreate
    /// resources, such as a database connection, that might have been
    /// dropped. The return value signals whether event processing should
    /// continue (`true`) or stop (`false`).
    ///
    /// The function takes the `error` that occurred at the latest
    /// [`process`](Self::process) call that just failed, and the number of
    /// attempts of calling `process` that failed.
    async fn on_failure(
        &mut self,
        error: Self::Error,
        failed_attempts: u32,
    ) -> Result<bool, Self::Error>;
}

pub struct ProcessorConfig {
    /// The amount of time to wait after a failure to process an event.
    wait_after_fail: std::time::Duration,
    /// A future that, when it completes, signals the processor to stop.
    stop:            std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}

/// The `Default` implementation behaves the same as
/// [`ProcessorConfig::new`](ProcessorConfig::new).
impl Default for ProcessorConfig {
    fn default() -> Self { Self::new() }
}

impl ProcessorConfig {
    /// After each failure the [`ProcessorConfig`] will pause for the
    /// configured duration before trying again. Defaults to 5 seconds.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Set the stop signal for the processor. This accepts a future that will
    /// be polled, and if the future completes the
    /// [`process_events`](Self::process_events) method will terminate.
    ///
    /// An example of such a future is the `Receiver` end of a
    /// `tokio::sync::oneshot` channel, with its result discarded.
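    ///
    /// A minimal sketch of that setup; in practice the sender half would be
    /// kept wherever shutdown is triggered from:
    ///
    /// ```
    /// # use concordium_rust_sdk::indexer::ProcessorConfig;
    /// let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
    /// let config = ProcessorConfig::new().set_stop_signal(async move {
    ///     // Resolves both on an explicit stop and if the sender is dropped.
    ///     let _ = stop_receiver.await;
    /// });
    /// // Later, from another task: `stop_sender.send(()).ok();`
    /// ```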
    pub fn set_stop_signal(
        self,
        stop: impl std::future::Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self {
            stop: Box::pin(stop),
            ..self
        }
    }

    /// Construct a new [`ProcessorConfig`] with default settings: no stop
    /// signal and a wait of 5 seconds after each failure.
    pub fn new() -> Self {
        Self {
            wait_after_fail: std::time::Duration::from_secs(5),
            stop:            Box::pin(std::future::pending()),
        }
    }

    /// Process events that are coming in on the provided channel.
    ///
    /// This handler will only terminate when
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method signals that
    ///   processing should stop,
    /// - the sender part of the `events` channel has been dropped, or
    /// - the [`ProcessorConfig`] was configured with a stop signal that was
    ///   triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_events<P: ProcessEvent>(
        self,
        process: P,
        events: tokio::sync::mpsc::Receiver<P::Data>,
    ) {
        let stream = tokio_stream::wrappers::ReceiverStream::new(events);
        self.process_event_stream(process, stream).await
    }

    /// Process events that are coming in on the provided stream.
    ///
    /// This handler will only terminate when
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method signals that
    ///   processing should stop,
    /// - the `events` stream ends, or
    /// - the [`ProcessorConfig`] was configured with a stop signal that was
    ///   triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_event_stream<P, E>(mut self, mut process: P, mut events: E)
    where
        P: ProcessEvent,
        E: futures::Stream<Item = P::Data> + Unpin, {
        while let Some(event) = tokio::select! {
            biased;
            _ = &mut self.stop => None,
            r = events.next() => r,
        } {
            let mut try_number: u32 = 0;
            'outer: loop {
                let start = tokio::time::Instant::now();
                let response = process.process(&event).await;
                let end = tokio::time::Instant::now();
                let duration = end.duration_since(start).as_millis();
                match response {
                    Ok(descr) => {
                        tracing::info!(
                            target: "ccd_event_processor",
                            "{descr} in {duration}ms."
                        );
                        break 'outer;
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "ccd_event_processor",
                            "Failed to process event: {e}. Took {duration}ms to fail."
                        );
                        tracing::info!(
                            target: "ccd_event_processor",
                            "Retrying in {}ms.",
                            self.wait_after_fail.as_millis()
                        );
                        // Wait before calling `on_failure`, with the idea that whatever caused
                        // the failure is more likely to be fixed if we try again after a bit of
                        // time rather than immediately.
                        tokio::select! {
                            biased;
                            _ = &mut self.stop => {break 'outer},
                            _ = tokio::time::sleep(self.wait_after_fail) => {}
                        }
                        match process.on_failure(e, try_number + 1).await {
                            Ok(true) => {
                                // do nothing, continue.
                            }
                            Ok(false) => return,
                            Err(e) => {
                                tracing::warn!("Failed to restart: {e}.");
                            }
                        }
                        try_number += 1;
                    }
                }
            }
        }
        tracing::info!(
            target: "ccd_event_processor",
            "Terminating event processing due to the stop signal or the end of the event stream."
        );
    }
}

/// Given a configuration for traversing the chain and a processor for the
/// generated events, start the chain traversal and index the generated events.
///
/// This process will only stop when the stop signal configured in the
/// [`ProcessorConfig`] is triggered, when
/// [`traverse`](TraverseConfig::traverse) completes, or when
/// [`process_events`](ProcessorConfig::process_events) completes.
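///
/// A minimal sketch wiring the built-in [`TransactionIndexer`] to a
/// [`ProcessEvent`] implementation (here a generic `my_processor`; the
/// endpoint address and starting height are placeholders):
///
/// ```no_run
/// # use concordium_rust_sdk::{indexer::*, types::AbsoluteBlockHeight, v2};
/// # async fn example<P>(my_processor: P) -> Result<(), v2::QueryError>
/// # where
/// #     P: ProcessEvent<Data = <TransactionIndexer as Indexer>::Data>,
/// # {
/// let endpoint = v2::Endpoint::from_static("http://localhost:20000");
/// let traverse_config = TraverseConfig::new_single(endpoint, AbsoluteBlockHeight { height: 0 });
/// traverse_and_process(
///     traverse_config,
///     TransactionIndexer,
///     ProcessorConfig::new(),
///     my_processor,
/// )
/// .await
/// # }
/// ```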
pub async fn traverse_and_process<I: Indexer, P: ProcessEvent<Data = I::Data>>(
    config: TraverseConfig,
    i: I,
    processor: ProcessorConfig,
    p: P,
) -> Result<(), QueryError> {
    let (sender, receiver) = tokio::sync::mpsc::channel(10);
    let fut1 = config.traverse(i, sender);
    let fut2 = processor.process_events(p, receiver);
    let (r1, ()) = futures::join!(fut1, fut2);
    r1
}