//! Support for writing indexers using the Rust SDK.
//!
//! The main indexer entrypoint is the [`TraverseConfig::traverse`] method
//! which will start chain traversal, calling methods of the [`Indexer`] trait
//! for each finalized block it discovers.
use crate::{
    types::{
        execution_tree, queries::BlockInfo, AccountTransactionEffects, BlockItemSummary,
        ExecutionTree, SpecialTransactionOutcome,
    },
    v2::{self, upward::UnknownDataError, FinalizedBlockInfo, QueryError, QueryResult, Upward},
};
use concordium_base::{
    base::{AbsoluteBlockHeight, Energy},
    contracts_common::{AccountAddress, Amount, ContractAddress, OwnedEntrypointName},
    hashes::TransactionHash,
    smart_contracts::OwnedReceiveName,
};
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt as _};
use std::{
    collections::{BTreeMap, BTreeSet},
    time::Duration,
};
use tokio::time::error::Elapsed;
pub use tonic::async_trait;

/// Configuration for an indexer.
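///
/// A configuration is created with [`TraverseConfig::new_single`] or
/// [`TraverseConfig::new`] and adjusted with the setter methods. A minimal
/// sketch of building one (the endpoint address and starting height below are
/// placeholders):
///
/// ```
/// # use concordium_rust_sdk::{indexer::TraverseConfig, v2};
/// # use concordium_base::base::AbsoluteBlockHeight;
/// let endpoint = v2::Endpoint::from_static("http://localhost:20000");
/// // Start from genesis; a real indexer would resume from its last stored height.
/// let start = AbsoluteBlockHeight::from(0u64);
/// let config = TraverseConfig::new_single(endpoint, start)
///     .set_max_parallel(8)
///     .set_max_behind(std::time::Duration::from_secs(120));
/// ```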
pub struct TraverseConfig {
    endpoints: Vec<v2::Endpoint>,
    max_parallel: usize,
    max_behind: std::time::Duration,
    wait_after_fail: std::time::Duration,
    start_height: AbsoluteBlockHeight,
}

#[derive(Debug, thiserror::Error)]
/// An error encountered during chain traversal and passed to the
/// [`Indexer`].
pub enum TraverseError {
    #[error("Failed to connect: {0}")]
    Connect(#[from] tonic::transport::Error),
    #[error("Failed to query: {0}")]
    Query(#[from] QueryError),
    #[error("Timed out waiting for finalized blocks.")]
    Elapsed(#[from] Elapsed),
    #[error("UnknownDataError occurred: {0}")]
    UnknownDataError(#[from] UnknownDataError),
    #[error("Other error occurred: {0}")]
    OtherError(#[from] anyhow::Error),
}

impl From<OnFinalizationError> for TraverseError {
    fn from(e: OnFinalizationError) -> Self {
        match e {
            OnFinalizationError::Query(query_error) => query_error.into(),
            OnFinalizationError::UnknownDataError(err) => TraverseError::UnknownDataError(err),
            OnFinalizationError::OtherError(err) => TraverseError::OtherError(err),
        }
    }
}

#[derive(Debug, thiserror::Error)]
/// An error encountered during processing of a finalized block
/// and passed to the [`Indexer`].
pub enum OnFinalizationError {
    #[error("Failed to query: {0}")]
    Query(#[from] QueryError),
    #[error("UnknownDataError occurred: {0}")]
    UnknownDataError(#[from] UnknownDataError),
    #[error("Other error occurred: {0}")]
    OtherError(#[from] anyhow::Error),
}

impl From<tonic::Status> for OnFinalizationError {
    fn from(s: tonic::Status) -> Self {
        Self::Query(s.into())
    }
}

/// The result type returned by the [`on_finalized`](Indexer::on_finalized)
/// method of the [`Indexer`] trait.
pub type OnFinalizationResult<A> = Result<A, OnFinalizationError>;

/// The result type returned by [`TraverseConfig::traverse`].
pub type TraverseResult<A> = OnFinalizationResult<A>;

#[async_trait]
/// A trait intended to be implemented by indexers that traverse the chain and
/// extract information of interest to store for efficient retrieval.
///
/// The main method of this trait is [`on_finalized`](Indexer::on_finalized)
/// which will be called by the [`traverse`](TraverseConfig::traverse) method
/// for each finalized block. The other two methods are meant for signalling and
/// bookkeeping.
///
/// Note that this trait has `async` methods, which is why the type signatures
/// are daunting. The intended way of implementing it is to use the
/// `async_trait` macro like so
///
/// ```
/// # use concordium_rust_sdk::{indexer::*, v2::{self, FinalizedBlockInfo, QueryResult}};
/// use concordium_rust_sdk::indexer::async_trait;
/// # struct MyIndexer;
/// #[async_trait]
/// impl Indexer for MyIndexer {
///     type Context = ();
///     type Data = ();
///
///     async fn on_connect<'a>(
///         &mut self,
///         endpoint: v2::Endpoint,
///         client: &'a mut v2::Client,
///     ) -> QueryResult<Self::Context> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_finalized<'a>(
///         &self,
///         client: v2::Client,
///         ctx: &'a Self::Context,
///         fbi: FinalizedBlockInfo,
///     ) -> OnFinalizationResult<Self::Data> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_failure(
///         &mut self,
///         ep: v2::Endpoint,
///         successive_failures: u64,
///         err: TraverseError,
///     ) -> bool {
///         unimplemented!("Implement me.")
///     }
/// }
/// ```
pub trait Indexer {
    /// The data that is retrieved upon connecting to the endpoint and supplied
    /// to each call of [`on_finalized`](Self::on_finalized).
    type Context: Send + Sync;
    /// The data returned by the [`on_finalized`](Self::on_finalized) call.
    type Data: Send + Sync;

    /// Called when a new connection is established to the given endpoint.
    /// The return value from this method is passed to each call of
    /// [`on_finalized`](Self::on_finalized).
    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<Self::Context>;

    /// The main method of this trait. It is called for each finalized block
    /// that the indexer discovers. Note that this method might be called
    /// concurrently for multiple blocks at the same time to speed up indexing.
    ///
    /// This method is meant to return errors that are unexpected. If it does
    /// return an error, the indexer will attempt to reconnect to the next
    /// endpoint.
    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a Self::Context,
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data>;

    /// Called when either connecting to the node or querying the node fails.
    /// The number of successive failures without progress is passed to the
    /// method, which should return whether to stop indexing ([`true`]) or not
    /// ([`false`]).
    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool;
}

impl TraverseConfig {
    /// A configuration with a single endpoint starting at the given block
    /// height.
    pub fn new_single(endpoint: v2::Endpoint, start_height: AbsoluteBlockHeight) -> Self {
        Self {
            endpoints: vec![endpoint],
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        }
    }

    /// A configuration with a given list of endpoints and starting height.
    /// Returns [`None`] if the list of endpoints is empty.
    pub fn new(endpoints: Vec<v2::Endpoint>, start_height: AbsoluteBlockHeight) -> Option<Self> {
        if endpoints.is_empty() {
            return None;
        }
        Some(Self {
            endpoints,
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        })
    }

    /// Set the maximum amount of time the last finalized block of a node may
    /// be behind the present before the node is deemed too far behind and
    /// another node is tried.
    ///
    /// The default value is 60 seconds.
    pub fn set_max_behind(self, max_behind: Duration) -> Self {
        Self { max_behind, ..self }
    }

    /// After each failure the indexer will pause briefly before trying another
    /// node, to avoid overloading it. Defaults to 1 second.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Add an additional endpoint to the list of endpoints the indexer will
    /// use. It is added to the end of the list, so the endpoint is only tried
    /// if queries to the preceding endpoints fail.
    pub fn push_endpoint(mut self, endpoint: v2::Endpoint) -> Self {
        self.endpoints.push(endpoint);
        self
    }

    /// Set the maximum number of blocks that will be queried in parallel, if
    /// they are available. Defaults to 4 if not set explicitly.
    pub fn set_max_parallel(self, max_parallel: usize) -> Self {
        Self {
            max_parallel,
            ..self
        }
    }

    /// Traverse the chain according to the supplied configuration, invoking
    /// [`on_finalized`](Indexer::on_finalized) for each finalized block.
    ///
    /// Multiple [`on_finalized`](Indexer::on_finalized) calls might be executed
    /// concurrently, but their responses will be written to the provided
    /// [`tokio::sync::mpsc::Sender`] in increasing order of block height,
    /// with no gaps.
    ///
    /// If a query fails, either due to a timeout or the node not being
    /// available, the indexer will attempt the next endpoint it is configured
    /// with.
    ///
    /// For robust indexing it is crucial that the supplied
    /// [`Endpoints`](v2::Endpoint) are configured with timeouts so that the
    /// indexer may make progress.
    ///
    /// The [`traverse`](Self::traverse) method will return either when
    /// signalled to stop by the [`on_failure`](Indexer::on_failure) method or
    /// when the receiver part of the supplied [`tokio::sync::mpsc::Sender`]
    /// is closed. Typically this method should run in a task spawned via
    /// [`tokio::spawn`].
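    ///
    /// A minimal sketch of driving the traversal with the
    /// [`TransactionIndexer`] (the endpoint address and starting height below
    /// are placeholders); here the traversal and the consumer are simply
    /// joined, but spawning the traversal in its own task works equally well:
    ///
    /// ```no_run
    /// # use concordium_rust_sdk::{indexer::{TransactionIndexer, TraverseConfig}, v2};
    /// # use concordium_base::base::AbsoluteBlockHeight;
    /// # async fn example() -> anyhow::Result<()> {
    /// let endpoint = v2::Endpoint::from_static("http://localhost:20000");
    /// let config = TraverseConfig::new_single(endpoint, AbsoluteBlockHeight::from(0u64));
    /// let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
    /// // Consume the data produced by the indexer while the traversal runs.
    /// let consumer = async move {
    ///     while let Some((block_info, summaries)) = receiver.recv().await {
    ///         println!("Block {}: {} transactions.", block_info.block_hash, summaries.len());
    ///     }
    /// };
    /// let (result, ()) = futures::join!(config.traverse(TransactionIndexer, sender), consumer);
    /// result?;
    /// # Ok(())
    /// # }
    /// ```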
    pub async fn traverse<I: Indexer>(
        self,
        mut indexer: I,
        sender: tokio::sync::mpsc::Sender<I::Data>,
    ) -> TraverseResult<()> {
        let TraverseConfig {
            endpoints,
            max_parallel,
            max_behind,
            wait_after_fail,
            start_height: mut height,
        } = self;
        let mut successive_failures: u64 = 0;
        for node_ep in endpoints.into_iter().cycle() {
            if sender.is_closed() {
                return Ok(());
            }
            if successive_failures > 0 {
                tokio::time::sleep(wait_after_fail).await
            }
            let mut node = match v2::Client::new(node_ep.clone()).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            let context = match indexer.on_connect(node_ep.clone(), &mut node).await {
                Ok(a) => a,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };
            let mut finalized_blocks = match node.get_finalized_blocks_from(height).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            let mut preprocessors = FuturesOrdered::new();
            let mut finalized_blocks_error = false;
            'node_loop: loop {
                tokio::select! {
                    biased;
                    // First check if anything is done preprocessing.
                    // If nothing is being preprocessed, this branch is disabled until the next iteration.
                    Some(data) = preprocessors.next() => {
                        let data = match data {
                            Ok(v) => v,
                            Err(e) => {
                                drop(preprocessors);
                                successive_failures += 1;
                                let should_stop = indexer.on_failure(
                                    node_ep,
                                    successive_failures,
                                    e.into()
                                ).await;
                                if should_stop {
                                    return Ok(());
                                } else {
                                    break 'node_loop;
                                }
                            }
                        };
                        if sender.send(data).await.is_err() {
                            // The listener ended the stream, meaning we should stop.
                            return Ok(());
                        }
                        height = height.next();
                        successive_failures = 0;
                        if finalized_blocks_error {
                            // We have processed the blocks we can, but further queries on the same
                            // stream will fail since the stream signalled an error.
                            break 'node_loop;
                        }
                    },
                    // If there is nothing immediately ready from preprocessing, check whether
                    // there are available slots for preprocessors and fill them.
                    // If there is no space, this branch is disabled until the next iteration.
                    _ = async {}, if preprocessors.len() < max_parallel => {
                        let space = max_parallel - preprocessors.len();
                        match finalized_blocks
                            .next_chunk_timeout(space, max_behind)
                            .await
                        {
                            Ok((has_error, chunks)) => {
                                finalized_blocks_error = has_error;
                                for fb in chunks {
                                    preprocessors.push_back(indexer.on_finalized(node.clone(), &context, fb));
                                }
                            },
                            Err(e) => {
                                drop(preprocessors);
                                successive_failures += 1;
                                let should_stop = indexer
                                    .on_failure(node_ep, successive_failures, TraverseError::Elapsed(e))
                                    .await;
                                if should_stop {
                                    return Ok(());
                                } else {
                                    break 'node_loop;
                                }
                            }
                        };
                    }
                };
            }
        }
        Ok(()) // unreachable
    }
}

/// An indexer that retrieves all transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events at the `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct TransactionIndexer;

#[async_trait]
impl Indexer for TransactionIndexer {
    type Context = ();
    type Data = (BlockInfo, Vec<BlockItemSummary>);

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves smart contract updates where a specific
/// entrypoint of a specific contract was triggered as the top-level
/// entrypoint.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events at the `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
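///
/// A minimal sketch of constructing the indexer for a hypothetical contract at
/// address `<1234, 0>` with entrypoint `transfer`:
///
/// ```
/// # use concordium_rust_sdk::indexer::ContractUpdateIndexer;
/// # use concordium_base::contracts_common::{ContractAddress, OwnedEntrypointName};
/// let indexer = ContractUpdateIndexer {
///     target_address: ContractAddress { index: 1234, subindex: 0 },
///     entrypoint: OwnedEntrypointName::new_unchecked("transfer".into()),
/// };
/// ```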
pub struct ContractUpdateIndexer {
    pub target_address: ContractAddress,
    pub entrypoint: OwnedEntrypointName,
}

/// Information about a single top-level smart contract update, extracted from
/// a transaction outcome.
pub struct ContractUpdateInfo {
    /// The execution tree generated by the call.
    pub execution_tree: Upward<ExecutionTree>,
    /// The amount of energy charged for the transaction.
    pub energy_cost: Energy,
    /// The cost, in CCD, of the transaction.
    pub cost: Amount,
    /// The hash of the transaction from which this update stems.
    pub transaction_hash: TransactionHash,
    /// The sender of the transaction.
    pub sender: AccountAddress,
}

/// Extract contract update information from a transaction summary, if the
/// summary is for a contract update transaction issued by an account.
fn update_info(summary: BlockItemSummary) -> Option<ContractUpdateInfo> {
    let at = summary.details.known()?.account_transaction()?;
    let AccountTransactionEffects::ContractUpdateIssued { effects } = at.effects.known()? else {
        return None;
    };

    Some(ContractUpdateInfo {
        execution_tree: execution_tree(effects)?,
        energy_cost: summary.energy_cost,
        cost: at.cost,
        transaction_hash: summary.hash,
        sender: at.sender,
    })
}

#[async_trait]
impl Indexer for ContractUpdateIndexer {
    type Context = ();
    type Data = (BlockInfo, Vec<ContractUpdateInfo>);

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let bi = client
            .get_block_info(fbi.height)
            .await
            .map_err(OnFinalizationError::from)?
            .response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .map_err(OnFinalizationError::from)
                .try_filter_map(|summary| async move {
                    let Some(info) = update_info(summary) else {
                        return Ok(None);
                    };

                    let execution_tree = info.execution_tree.as_ref().known_or_err()?;

                    if execution_tree.address() == self.target_address
                        && execution_tree.entrypoint() == self.entrypoint.as_entrypoint_name()
                    {
                        Ok(Some(info))
                    } else {
                        Ok(None)
                    }
                })
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves smart contract updates where specific contracts
/// were affected. The configuration determines whether any or all of the given
/// contracts must be updated.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events at the `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct AffectedContractIndexer {
    /// The contract addresses of interest.
    pub addresses: BTreeSet<ContractAddress>,
    /// Require all contract addresses in the `addresses` set to be updated.
    pub all: bool,
}

#[async_trait]
impl Indexer for AffectedContractIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<
            v2::Upward<(
                ContractUpdateInfo,
                BTreeMap<ContractAddress, BTreeSet<OwnedReceiveName>>,
            )>,
        >,
    );

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .map_err(OnFinalizationError::from)
                .try_filter_map(|summary| async move {
                    let Some(info) = update_info(summary) else {
                        return Ok(None);
                    };

                    let execution_tree = info.execution_tree.as_ref().known_or_err()?;

                    let affected_addresses = execution_tree.affected_addresses();
                    let v2::Upward::Known(affected_addresses) = affected_addresses else {
                        return Ok(Some(v2::Upward::Unknown(())));
                    };

                    let relevant = if self.all {
                        self.addresses
                            .iter()
                            .all(|addr| affected_addresses.contains_key(addr))
                    } else {
                        self.addresses
                            .iter()
                            .any(|addr| affected_addresses.contains_key(addr))
                    };
                    if relevant {
                        Ok(Some(v2::Upward::Known((info, affected_addresses))))
                    } else {
                        Ok(None)
                    }
                })
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves all events in a block: both transaction outcomes
/// and special transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events at the `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct BlockEventsIndexer;

#[async_trait]
impl Indexer for BlockEventsIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<BlockItemSummary>,
        Vec<v2::Upward<SpecialTransactionOutcome>>,
    );

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        TransactionIndexer.on_connect(endpoint, client).await
    }

    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let mut special_client = client.clone();
        let special = async move {
            let events = special_client
                .get_block_special_events(fbi.height)
                .await?
                .response
                .try_collect()
                .await?;
            Ok(events)
        };
        let ((bi, summary), special) =
            futures::try_join!(TransactionIndexer.on_finalized(client, ctx, fbi), special)?;
        Ok((bi, summary, special))
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        TransactionIndexer
            .on_failure(endpoint, successive_failures, err)
            .await
    }
}

#[async_trait]
/// Handle an individual event. This trait is designed to be used together with
/// the [`ProcessorConfig`]; together they ease writing the part of an indexer
/// where data is stored in a database.
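///
/// A minimal sketch of an implementation that merely counts the transactions
/// produced by the [`TransactionIndexer`]; a real processor would typically
/// write the data to a database instead:
///
/// ```
/// # use concordium_rust_sdk::{indexer::*, types::{queries::BlockInfo, BlockItemSummary}};
/// struct Counter {
///     seen: u64,
/// }
///
/// #[async_trait]
/// impl ProcessEvent for Counter {
///     type Data = (BlockInfo, Vec<BlockItemSummary>);
///     type Description = String;
///     type Error = anyhow::Error;
///
///     async fn process(&mut self, data: &Self::Data) -> Result<String, Self::Error> {
///         let (block_info, summaries) = data;
///         self.seen += summaries.len() as u64;
///         Ok(format!(
///             "Processed block {}; {} transactions seen in total.",
///             block_info.block_hash, self.seen
///         ))
///     }
///
///     async fn on_failure(
///         &mut self,
///         _error: Self::Error,
///         _failed_attempts: u32,
///     ) -> Result<bool, Self::Error> {
///         // Nothing to re-establish here; tell the processor to keep going.
///         Ok(true)
///     }
/// }
/// ```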
pub trait ProcessEvent {
    /// The type of events that are to be processed. Typically this will be all
    /// of the transactions of interest for a single block.
    type Data;
    /// An error that can be signalled.
    type Error: std::fmt::Display + std::fmt::Debug;
    /// A description returned by the [`process`](ProcessEvent::process) method.
    /// This message is logged by the [`ProcessorConfig`] and is intended to
    /// describe the data that was just processed.
    type Description: std::fmt::Display;

    /// Process a single item. This should work atomically in the sense that
    /// either the entire `data` is processed or none of it is in case of an
    /// error. This property is relied upon by the [`ProcessorConfig`] to retry
    /// failed attempts.
    async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error>;

    /// The `on_failure` method is invoked by the [`ProcessorConfig`] when it
    /// fails to process an event. It is meant to attempt to recreate the
    /// resources, such as a database connection, that might have been
    /// dropped. The return value should signal whether the handler process
    /// should continue (`true`) or not.
    ///
    /// The function takes the `error` that occurred at the latest
    /// [`process`](Self::process) call that just failed, and the number of
    /// attempts of calling `process` that failed.
    async fn on_failure(
        &mut self,
        error: Self::Error,
        failed_attempts: u32,
    ) -> Result<bool, Self::Error>;
}

/// Configuration for processing events produced by an [`Indexer`] using a
/// [`ProcessEvent`] handler.
pub struct ProcessorConfig {
    /// The amount of time to wait after a failure to process an event.
    wait_after_fail: std::time::Duration,
    /// A future to be signalled to stop processing.
    stop: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}

/// The default implementation behaves the same as
/// [`ProcessorConfig::new`](ProcessorConfig::new).
impl Default for ProcessorConfig {
    fn default() -> Self {
        Self::new()
    }
}

impl ProcessorConfig {
    /// After each failure the [`ProcessorConfig`] will pause briefly before
    /// trying again. Defaults to 5 seconds.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Set the stop signal for the processor. This accepts a future which will
    /// be polled, and when it completes the
    /// [`process_events`](Self::process_events) method will terminate.
    ///
    /// An example of such a future would be the `Receiver` end of a oneshot
    /// channel, as in the sketch below.
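    ///
    /// A minimal sketch using a oneshot channel; dropping or using the sender
    /// causes the processor to stop:
    ///
    /// ```
    /// # use concordium_rust_sdk::indexer::ProcessorConfig;
    /// let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
    /// // The oneshot receiver resolves to a `Result`, so adapt it into a `()`-returning future.
    /// let config = ProcessorConfig::new().set_stop_signal(async move {
    ///     let _ = stop_receiver.await;
    /// });
    /// ```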
    pub fn set_stop_signal(
        self,
        stop: impl std::future::Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self {
            stop: Box::pin(stop),
            ..self
        }
    }

    /// Construct a new [`ProcessorConfig`] with default settings: the wait
    /// after a failure is 5 seconds, and no stop signal is set.
    pub fn new() -> Self {
        Self {
            wait_after_fail: std::time::Duration::from_secs(5),
            stop: Box::pin(std::future::pending()),
        }
    }

    /// Process events that are coming in on the provided channel.
    ///
    /// This handler will only terminate if
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method indicates so,
    /// - the sender part of the `events` channel has been dropped, or
    /// - the [`ProcessorConfig`] was configured with a termination signal that
    ///   was triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_events<P: ProcessEvent>(
        self,
        process: P,
        events: tokio::sync::mpsc::Receiver<P::Data>,
    ) {
        let stream = tokio_stream::wrappers::ReceiverStream::new(events);
        self.process_event_stream(process, stream).await
    }

    /// Process events that are coming in on the provided stream.
    ///
    /// This handler will only terminate if
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method indicates so,
    /// - the sender part of the `events` channel has been dropped, or
    /// - the [`ProcessorConfig`] was configured with a termination signal that
    ///   was triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_event_stream<P, E>(mut self, mut process: P, mut events: E)
    where
        P: ProcessEvent,
        E: futures::Stream<Item = P::Data> + Unpin,
    {
        while let Some(event) = tokio::select! {
            biased;
            _ = &mut self.stop => None,
            r = events.next() => r,
        } {
            let mut try_number: u32 = 0;
            'outer: loop {
                let start = tokio::time::Instant::now();
                let response = process.process(&event).await;
                let end = tokio::time::Instant::now();
                let duration = end.duration_since(start).as_millis();
                match response {
                    Ok(descr) => {
                        tracing::info!(
                            target: "ccd_event_processor",
                            "{descr} in {duration}ms."
                        );
                        break 'outer;
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "ccd_event_processor",
                            "Failed to process event: {e}. Took {duration}ms to fail."
                        );
                        tracing::info!(
                            target: "ccd_event_processor",
                            "Retrying in {}ms.",
                            self.wait_after_fail.as_millis()
                        );
                        // Wait before calling on_failure with the idea that whatever caused
                        // the failure is more likely to be fixed if we try after a bit of
                        // time rather than immediately.
                        tokio::select! {
                            biased;
                            _ = &mut self.stop => {break 'outer},
                            _ = tokio::time::sleep(self.wait_after_fail) => {}
                        }
                        match process.on_failure(e, try_number + 1).await {
                            Ok(true) => {
                                // do nothing, continue.
                            }
                            Ok(false) => return,
                            Err(e) => {
                                tracing::warn!("Failed to restart: {e}.");
                            }
                        }
                        try_number += 1;
                    }
                }
            }
        }
        tracing::info!(
            target: "ccd_event_processor",
            "Terminating process_events due to channel closing."
        );
    }
}

/// Given a configuration for traversing the chain and a configuration for
/// processing the generated events, start a process that traverses the chain
/// and indexes its events.
///
/// This process will only stop when the stop signal configured on the
/// [`ProcessorConfig`] completes, when [`traverse`](TraverseConfig::traverse)
/// completes, or when [`process_events`](ProcessorConfig::process_events)
/// completes.
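///
/// A minimal sketch wiring the [`TransactionIndexer`] to a processor that just
/// logs a line per block (the endpoint address and starting height are
/// placeholders, and `Printer` is a stand-in for a real [`ProcessEvent`]
/// implementation):
///
/// ```no_run
/// # use concordium_rust_sdk::{indexer::*, types::{queries::BlockInfo, BlockItemSummary}, v2};
/// # use concordium_base::base::AbsoluteBlockHeight;
/// # struct Printer;
/// # #[async_trait]
/// # impl ProcessEvent for Printer {
/// #     type Data = (BlockInfo, Vec<BlockItemSummary>);
/// #     type Description = String;
/// #     type Error = anyhow::Error;
/// #     async fn process(&mut self, data: &Self::Data) -> Result<String, Self::Error> {
/// #         Ok(format!("Processed block {}.", data.0.block_hash))
/// #     }
/// #     async fn on_failure(&mut self, _error: Self::Error, _n: u32) -> Result<bool, Self::Error> {
/// #         Ok(true)
/// #     }
/// # }
/// # async fn example() -> anyhow::Result<()> {
/// let config = TraverseConfig::new_single(
///     v2::Endpoint::from_static("http://localhost:20000"),
///     AbsoluteBlockHeight::from(0u64),
/// );
/// traverse_and_process(config, TransactionIndexer, ProcessorConfig::new(), Printer).await?;
/// # Ok(())
/// # }
/// ```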
pub async fn traverse_and_process<I: Indexer, P: ProcessEvent<Data = I::Data>>(
    config: TraverseConfig,
    i: I,
    processor: ProcessorConfig,
    p: P,
) -> TraverseResult<()> {
    let (sender, receiver) = tokio::sync::mpsc::channel(10);
    let fut1 = config.traverse(i, sender);
    let fut2 = processor.process_events(p, receiver);
    let (r1, ()) = futures::join!(fut1, fut2);
    r1
}