concordium_rust_sdk/indexer.rs
//! Support for writing indexers using the Rust SDK.
//!
//! The main indexer entrypoint is the [`TraverseConfig::traverse`] method
//! which will start chain traversal, calling methods of the [`Indexer`] trait
//! for each finalized block it discovers.
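//!
//! A minimal usage sketch (the endpoint, start height, and handling of the
//! returned data are illustrative assumptions, not prescriptions):
//!
//! ```no_run
//! use concordium_base::base::AbsoluteBlockHeight;
//! use concordium_rust_sdk::{
//!     indexer::{TransactionIndexer, TraverseConfig},
//!     v2,
//! };
//!
//! # async fn example(endpoint: v2::Endpoint, start: AbsoluteBlockHeight) -> anyhow::Result<()> {
//! // Channel through which the indexer delivers data, in order of increasing block height.
//! let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
//! // Run the traversal in the background. `TransactionIndexer` yields the block info
//! // together with all transaction outcomes of each finalized block.
//! let traversal = tokio::spawn(
//!     TraverseConfig::new_single(endpoint, start).traverse(TransactionIndexer, sender),
//! );
//! while let Some((block_info, summaries)) = receiver.recv().await {
//!     println!("Block {} has {} transactions.", block_info.block_hash, summaries.len());
//! }
//! traversal.await??;
//! # Ok(())
//! # }
//! ```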
use crate::{
    types::{
        execution_tree, queries::BlockInfo, AccountTransactionEffects, BlockItemSummary,
        ExecutionTree, SpecialTransactionOutcome,
    },
    v2::{self, upward::UnknownDataError, FinalizedBlockInfo, QueryError, QueryResult, Upward},
};
use concordium_base::{
    base::{AbsoluteBlockHeight, Energy},
    contracts_common::{AccountAddress, Amount, ContractAddress, OwnedEntrypointName},
    hashes::TransactionHash,
    smart_contracts::OwnedReceiveName,
};
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt as _};
use std::{
    collections::{BTreeMap, BTreeSet},
    time::Duration,
};
use tokio::time::error::Elapsed;
pub use tonic::async_trait;

/// Configuration for an indexer.
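///
/// A sketch of building a configuration with non-default settings (the
/// endpoint and start height are assumed to come from elsewhere):
///
/// ```no_run
/// use concordium_base::base::AbsoluteBlockHeight;
/// use concordium_rust_sdk::{indexer::TraverseConfig, v2};
/// use std::time::Duration;
///
/// # fn example(endpoint: v2::Endpoint, start: AbsoluteBlockHeight) -> TraverseConfig {
/// TraverseConfig::new_single(endpoint, start)
///     // Query up to 8 blocks concurrently.
///     .set_max_parallel(8)
///     // Give up on a node if no new finalized blocks arrive for 5 minutes.
///     .set_max_behind(Duration::from_secs(300))
///     // Pause for 2 seconds after a failure before trying the next endpoint.
///     .set_wait_after_failure(Duration::from_secs(2))
/// # }
/// ```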
pub struct TraverseConfig {
    endpoints: Vec<v2::Endpoint>,
    max_parallel: usize,
    max_behind: std::time::Duration,
    wait_after_fail: std::time::Duration,
    start_height: AbsoluteBlockHeight,
}

#[derive(Debug, thiserror::Error)]
/// An error encountered during chain traversal and passed to the
/// [`Indexer`].
pub enum TraverseError {
    #[error("Failed to connect: {0}")]
    Connect(#[from] tonic::transport::Error),
    #[error("Failed to query: {0}")]
    Query(#[from] QueryError),
    #[error("Timed out waiting for finalized blocks.")]
    Elapsed(#[from] Elapsed),
    #[error("UnknownDataError occurred: {0}")]
    UnknownDataError(#[from] UnknownDataError),
    #[error("Other error occurred: {0}")]
    OtherError(#[from] anyhow::Error),
}

impl From<OnFinalizationError> for TraverseError {
    fn from(e: OnFinalizationError) -> Self {
        match e {
            OnFinalizationError::Query(query_error) => query_error.into(),
            OnFinalizationError::UnknownDataError(err) => TraverseError::UnknownDataError(err),
            OnFinalizationError::OtherError(err) => TraverseError::OtherError(err),
        }
    }
}

#[derive(Debug, thiserror::Error)]
/// An error encountered during processing of a finalized block
/// and passed to the [`Indexer`].
pub enum OnFinalizationError {
    #[error("Failed to query: {0}")]
    Query(#[from] QueryError),
    #[error("UnknownDataError occurred: {0}")]
    UnknownDataError(#[from] UnknownDataError),
    #[error("Other error occurred: {0}")]
    OtherError(#[from] anyhow::Error),
}

impl From<tonic::Status> for OnFinalizationError {
    fn from(s: tonic::Status) -> Self {
        Self::Query(s.into())
    }
}

/// The result type returned by [`Indexer::on_finalized`].
pub type OnFinalizationResult<A> = Result<A, OnFinalizationError>;

/// The result type returned by [`TraverseConfig::traverse`].
pub type TraverseResult<A> = OnFinalizationResult<A>;

#[async_trait]
/// A trait intended to be implemented by indexers that traverse the chain and
/// extract information of interest to store for efficient retrieval.
///
/// The main method of this trait is [`on_finalized`](Indexer::on_finalized)
/// which will be called by the [`traverse`](TraverseConfig::traverse) method
/// for each finalized block. The other two methods are meant for signalling and
/// bookkeeping.
///
/// Note that this trait has `async` methods, which is why the type signatures
/// are daunting. The intended way of implementing it is to use the
/// `async_trait` macro like so
///
/// ```
/// # use concordium_rust_sdk::{indexer::*, v2::{self, FinalizedBlockInfo, QueryResult}};
/// use concordium_rust_sdk::indexer::async_trait;
/// # struct MyIndexer;
/// #[async_trait]
/// impl Indexer for MyIndexer {
///     type Context = ();
///     type Data = ();
///
///     async fn on_connect<'a>(
///         &mut self,
///         endpoint: v2::Endpoint,
///         client: &'a mut v2::Client,
///     ) -> QueryResult<Self::Context> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_finalized<'a>(
///         &self,
///         client: v2::Client,
///         ctx: &'a Self::Context,
///         fbi: FinalizedBlockInfo,
///     ) -> OnFinalizationResult<Self::Data> {
///         unimplemented!("Implement me.")
///     }
///
///     async fn on_failure(
///         &mut self,
///         ep: v2::Endpoint,
///         successive_failures: u64,
///         err: TraverseError,
///     ) -> bool {
///         unimplemented!("Implement me.")
///     }
/// }
/// ```
pub trait Indexer {
    /// The data that is retrieved upon connecting to the endpoint and supplied
    /// to each call of [`on_finalized`](Self::on_finalized).
    type Context: Send + Sync;
    /// The data returned by the [`on_finalized`](Self::on_finalized) call.
    type Data: Send + Sync;

    /// Called when a new connection is established to the given endpoint.
    /// The return value from this method is passed to each call of
    /// [`on_finalized`](Self::on_finalized).
    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<Self::Context>;

    /// The main method of this trait. It is called for each finalized block
    /// that the indexer discovers. Note that this method might be called
    /// concurrently for multiple blocks at the same time to speed up indexing.
    ///
    /// This method is meant to return errors that are unexpected; if it
    /// does return an error the indexer will attempt to reconnect to the
    /// next endpoint.
    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a Self::Context,
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data>;

    /// Called when either connecting to the node or querying the node fails.
    /// The number of successive failures without progress is passed to the
    /// method, which should return whether to stop indexing ([`true`]) or
    /// continue ([`false`]).
    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool;
}

impl TraverseConfig {
    /// A configuration with a single endpoint starting at the given block
    /// height.
    pub fn new_single(endpoint: v2::Endpoint, start_height: AbsoluteBlockHeight) -> Self {
        Self {
            endpoints: vec![endpoint],
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        }
    }

    /// A configuration with a given list of endpoints and starting height.
    /// Returns [`None`] if the list of endpoints is empty.
    pub fn new(endpoints: Vec<v2::Endpoint>, start_height: AbsoluteBlockHeight) -> Option<Self> {
        if endpoints.is_empty() {
            return None;
        }
        Some(Self {
            endpoints,
            max_parallel: 4,
            max_behind: Duration::from_secs(60),
            wait_after_fail: Duration::from_secs(1),
            start_height,
        })
    }

    /// Set the maximum amount of time the last finalized block of the node can
    /// be behind before the node is deemed too far behind and another node is
    /// tried.
    ///
    /// The default value is 60 seconds.
    pub fn set_max_behind(self, max_behind: Duration) -> Self {
        Self { max_behind, ..self }
    }

    /// After each failure the indexer will pause a bit before trying another
    /// node to avoid overloading the node. Defaults to 1 second.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Add an additional endpoint to the list of endpoints the indexer will
    /// use. This is added to the end of the list, so the endpoint is only tried
    /// in case of failure of previous queries.
    pub fn push_endpoint(mut self, endpoint: v2::Endpoint) -> Self {
        self.endpoints.push(endpoint);
        self
    }

    /// Set the maximum number of blocks that will be queried in parallel, if
    /// they are available. Defaults to 4 if not set explicitly.
    pub fn set_max_parallel(self, max_parallel: usize) -> Self {
        Self {
            max_parallel,
            ..self
        }
    }

    /// Traverse the chain according to the supplied configuration, invoking
    /// [`on_finalized`](Indexer::on_finalized) for each finalized block.
    ///
    /// Multiple [`on_finalized`](Indexer::on_finalized) calls might be executed
    /// concurrently, but their responses will be written to the provided
    /// [`tokio::sync::mpsc::Sender`] in increasing order of block height,
    /// with no gaps.
    ///
    /// If a query fails, either due to a timeout or the node not being
    /// available, the indexer will attempt the next endpoint it is configured
    /// with.
    ///
    /// For robust indexing it is crucial that the supplied
    /// [`Endpoints`](v2::Endpoint) are configured with timeouts so that the
    /// indexer may make progress.
    ///
    /// The [`traverse`](Self::traverse) method will return either when
    /// signalled to do so by the [`on_failure`](Indexer::on_failure) method or
    /// when the receiver part of the supplied [`tokio::sync::mpsc::Sender`]
    /// is closed. Typically this method should run in a task spawned via
    /// [`tokio::spawn`].
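    ///
    /// A sketch of configuring an endpoint with timeouts before handing it to
    /// the indexer (the address and durations below are placeholders):
    ///
    /// ```no_run
    /// use concordium_rust_sdk::v2;
    /// use std::time::Duration;
    ///
    /// # fn example() -> v2::Endpoint {
    /// v2::Endpoint::from_static("http://localhost:20000")
    ///     // Bound the time spent establishing a connection.
    ///     .connect_timeout(Duration::from_secs(10))
    ///     // Bound the time of each individual request made by the indexer.
    ///     .timeout(Duration::from_secs(10))
    /// # }
    /// ```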
    pub async fn traverse<I: Indexer>(
        self,
        mut indexer: I,
        sender: tokio::sync::mpsc::Sender<I::Data>,
    ) -> TraverseResult<()> {
        let TraverseConfig {
            endpoints,
            max_parallel,
            max_behind,
            wait_after_fail,
            start_height: mut height,
        } = self;
        let mut successive_failures: u64 = 0;
        for node_ep in endpoints.into_iter().cycle() {
            if sender.is_closed() {
                return Ok(());
            }
            if successive_failures > 0 {
                tokio::time::sleep(wait_after_fail).await
            }
            let mut node = match v2::Client::new(node_ep.clone()).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            let context = match indexer.on_connect(node_ep.clone(), &mut node).await {
                Ok(a) => a,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };
            let mut finalized_blocks = match node.get_finalized_blocks_from(height).await {
                Ok(v) => v,
                Err(e) => {
                    successive_failures += 1;
                    let should_stop = indexer
                        .on_failure(node_ep, successive_failures, e.into())
                        .await;
                    if should_stop {
                        return Ok(());
                    } else {
                        continue;
                    }
                }
            };

            let mut preprocessors = FuturesOrdered::new();
            let mut finalized_blocks_error = false;
            'node_loop: loop {
                tokio::select! {
                    biased;
                    // First check if anything is done preprocessing.
                    // If nothing is being preprocessed this branch is disabled until the next iteration.
                    Some(data) = preprocessors.next() => {
                        let data = match data {
                            Ok(v) => v,
                            Err(e) => {
                                drop(preprocessors);
                                successive_failures += 1;
                                let should_stop = indexer.on_failure(
                                    node_ep,
                                    successive_failures,
                                    OnFinalizationError::into(e)
                                ).await;
                                if should_stop {
                                    return Ok(());
                                } else {
                                    break 'node_loop;
                                }
                            }
                        };
                        if sender.send(data).await.is_err() {
                            // The listener ended the stream, meaning we should stop.
                            return Ok(());
                        }
                        height = height.next();
                        successive_failures = 0;
                        if finalized_blocks_error {
                            // We have processed the blocks we can, but further queries on the same
                            // stream will fail since the stream signalled an error.
                            break 'node_loop;
                        }
                    },
                    // If there is nothing immediately ready from preprocessing, check whether there
                    // are available slots for preprocessors and fill them.
                    // If there is no space this branch is disabled until the next iteration.
                    _ = async {}, if preprocessors.len() < max_parallel => {
                        let space = max_parallel - preprocessors.len();
                        match finalized_blocks
                            .next_chunk_timeout(space, max_behind)
                            .await
                        {
                            Ok((has_error, chunks)) => {
                                finalized_blocks_error = has_error;
                                for fb in chunks {
                                    preprocessors.push_back(indexer.on_finalized(node.clone(), &context, fb));
                                }
                            },
                            Err(e) => {
                                drop(preprocessors);
                                successive_failures += 1;
                                let should_stop = indexer
                                    .on_failure(node_ep, successive_failures, TraverseError::Elapsed(e))
                                    .await;
                                if should_stop {
                                    return Ok(());
                                } else {
                                    break 'node_loop;
                                }
                            }
                        };
                    }
                };
            }
        }
        Ok(()) // Unreachable since the endpoint iterator cycles forever.
    }
}

/// An indexer that retrieves all transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct TransactionIndexer;

#[async_trait]
impl Indexer for TransactionIndexer {
    type Context = ();
    type Data = (BlockInfo, Vec<BlockItemSummary>);

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves smart contract updates where the specific
/// entrypoint of a contract was triggered as the top-level entrypoint.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
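///
/// A construction sketch (the address and entrypoint below are placeholders):
///
/// ```no_run
/// use concordium_base::contracts_common::{ContractAddress, OwnedEntrypointName};
/// use concordium_rust_sdk::indexer::ContractUpdateIndexer;
///
/// let indexer = ContractUpdateIndexer {
///     // Contract instance <1234, 0>, as an example.
///     target_address: ContractAddress::new(1234, 0),
///     // Entrypoint name without the contract name prefix.
///     entrypoint: OwnedEntrypointName::new_unchecked("transfer".into()),
/// };
/// ```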
pub struct ContractUpdateIndexer {
    /// The contract instance whose updates should be indexed.
    pub target_address: ContractAddress,
    /// The entrypoint that must have been triggered as the top-level entrypoint.
    pub entrypoint: OwnedEntrypointName,
}

/// Summary of a single top-level smart contract update.
pub struct ContractUpdateInfo {
    /// The execution tree generated by the call.
    pub execution_tree: Upward<ExecutionTree>,
    /// The amount of energy charged for the transaction.
    pub energy_cost: Energy,
    /// The cost, in CCD, of the transaction.
    pub cost: Amount,
    /// The hash of the transaction from which this update stems.
    pub transaction_hash: TransactionHash,
    /// The sender of the transaction.
    pub sender: AccountAddress,
}

/// Extract a [`ContractUpdateInfo`] from a transaction summary, if the
/// transaction was a contract update issued by an account.
fn update_info(summary: BlockItemSummary) -> Option<ContractUpdateInfo> {
    let at = summary.details.known()?.account_transaction()?;
    let AccountTransactionEffects::ContractUpdateIssued { effects } = at.effects.known()? else {
        return None;
    };

    Some(ContractUpdateInfo {
        execution_tree: execution_tree(effects)?,
        energy_cost: summary.energy_cost,
        cost: at.cost,
        transaction_hash: summary.hash,
        sender: at.sender,
    })
}

#[async_trait]
impl Indexer for ContractUpdateIndexer {
    type Context = ();
    type Data = (BlockInfo, Vec<ContractUpdateInfo>);

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let bi = client
            .get_block_info(fbi.height)
            .await
            .map_err(OnFinalizationError::from)?
            .response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .map_err(OnFinalizationError::from)
                .try_filter_map(|summary| async move {
                    let Some(info) = update_info(summary) else {
                        return Ok(None);
                    };

                    let execution_tree = info.execution_tree.as_ref().known_or_err()?;

                    if execution_tree.address() == self.target_address
                        && execution_tree.entrypoint() == self.entrypoint.as_entrypoint_name()
                    {
                        Ok(Some(info))
                    } else {
                        Ok(None)
                    }
                })
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves smart contract updates where the specified
/// contracts were affected. The configuration can choose to require any or all
/// of them to be updated.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
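///
/// A construction sketch (the addresses below are placeholders):
///
/// ```no_run
/// use concordium_base::contracts_common::ContractAddress;
/// use concordium_rust_sdk::indexer::AffectedContractIndexer;
///
/// let indexer = AffectedContractIndexer {
///     addresses: [ContractAddress::new(1234, 0), ContractAddress::new(1235, 0)]
///         .into_iter()
///         .collect(),
///     // Require all of the listed contracts to be updated (`false`: any of them).
///     all: true,
/// };
/// ```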
pub struct AffectedContractIndexer {
    /// The set of contract addresses of interest.
    pub addresses: BTreeSet<ContractAddress>,
    /// Require all contract addresses in the `addresses` set to be updated.
    pub all: bool,
}

#[async_trait]
impl Indexer for AffectedContractIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<
            v2::Upward<(
                ContractUpdateInfo,
                BTreeMap<ContractAddress, BTreeSet<OwnedReceiveName>>,
            )>,
        >,
    );

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        _client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        tracing::info!(
            target: "ccd_indexer",
            "Connected to endpoint {}.",
            endpoint.uri()
        );
        Ok(())
    }

    async fn on_finalized<'a>(
        &self,
        mut client: v2::Client,
        _ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let bi = client.get_block_info(fbi.height).await?.response;
        if bi.transaction_count != 0 {
            let summary = client
                .get_block_transaction_events(fbi.height)
                .await?
                .response
                .map_err(OnFinalizationError::from)
                .try_filter_map(|summary| async move {
                    let Some(info) = update_info(summary) else {
                        return Ok(None);
                    };

                    let execution_tree = info.execution_tree.as_ref().known_or_err()?;

                    let affected_addresses = execution_tree.affected_addresses();
                    let v2::Upward::Known(affected_addresses) = affected_addresses else {
                        return Ok(Some(v2::Upward::Unknown(())));
                    };

                    if (self.all
                        && self
                            .addresses
                            .iter()
                            .all(|addr| affected_addresses.contains_key(addr)))
                        || self
                            .addresses
                            .iter()
                            .any(|addr| affected_addresses.contains_key(addr))
                    {
                        Ok(Some(v2::Upward::Known((info, affected_addresses))))
                    } else {
                        Ok(None)
                    }
                })
                .try_collect::<Vec<_>>()
                .await?;
            Ok((bi, summary))
        } else {
            Ok((bi, Vec::new()))
        }
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        tracing::warn!(
            target: "ccd_indexer",
            successive_failures,
            "Failed when querying endpoint {}: {err}",
            endpoint.uri()
        );
        false
    }
}

/// An indexer that retrieves all events in a block: transaction outcomes
/// and special transaction outcomes.
///
/// The [`on_connect`](Indexer::on_connect) and
/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
/// log the events on `info` and `warn` levels, respectively, using the
/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
/// of the log is `ccd_indexer` which may be used to filter the logs.
pub struct BlockEventsIndexer;

#[async_trait]
impl Indexer for BlockEventsIndexer {
    type Context = ();
    type Data = (
        BlockInfo,
        Vec<BlockItemSummary>,
        Vec<v2::Upward<SpecialTransactionOutcome>>,
    );

    async fn on_connect<'a>(
        &mut self,
        endpoint: v2::Endpoint,
        client: &'a mut v2::Client,
    ) -> QueryResult<()> {
        TransactionIndexer.on_connect(endpoint, client).await
    }

    async fn on_finalized<'a>(
        &self,
        client: v2::Client,
        ctx: &'a (),
        fbi: FinalizedBlockInfo,
    ) -> OnFinalizationResult<Self::Data> {
        let mut special_client = client.clone();
        let special = async move {
            let events = special_client
                .get_block_special_events(fbi.height)
                .await?
                .response
                .try_collect()
                .await?;
            Ok(events)
        };
        let ((bi, summary), special) =
            futures::try_join!(TransactionIndexer.on_finalized(client, ctx, fbi), special)?;
        Ok((bi, summary, special))
    }

    async fn on_failure(
        &mut self,
        endpoint: v2::Endpoint,
        successive_failures: u64,
        err: TraverseError,
    ) -> bool {
        TransactionIndexer
            .on_failure(endpoint, successive_failures, err)
            .await
    }
}

#[async_trait]
/// Handle an individual event. This trait is designed to be used together with
/// the [`ProcessorConfig`]. Together they are designed to ease the work of
/// writing the part of an indexer where data is to be stored in a database.
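///
/// A minimal sketch of an implementation that simply logs each block produced
/// by the [`TransactionIndexer`] (the types and behaviour here are illustrative
/// assumptions, not a prescribed implementation):
///
/// ```no_run
/// use concordium_rust_sdk::{
///     indexer::{async_trait, ProcessEvent},
///     types::{queries::BlockInfo, BlockItemSummary},
/// };
///
/// struct LoggingProcessor;
///
/// #[async_trait]
/// impl ProcessEvent for LoggingProcessor {
///     type Data = (BlockInfo, Vec<BlockItemSummary>);
///     type Description = String;
///     type Error = anyhow::Error;
///
///     async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error> {
///         // A real implementation would write the data to a database here, atomically.
///         let (bi, txs) = data;
///         Ok(format!("Processed block {} with {} transactions", bi.block_hash, txs.len()))
///     }
///
///     async fn on_failure(
///         &mut self,
///         _error: Self::Error,
///         _failed_attempts: u32,
///     ) -> Result<bool, Self::Error> {
///         // Nothing to recreate; signal that processing should continue.
///         Ok(true)
///     }
/// }
/// ```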
pub trait ProcessEvent {
    /// The type of events that are to be processed. Typically this will be all
    /// of the transactions of interest for a single block.
    type Data;
    /// An error that can be signalled.
    type Error: std::fmt::Display + std::fmt::Debug;
    /// A description returned by the [`process`](ProcessEvent::process) method.
    /// This message is logged by the [`ProcessorConfig`] and is intended to
    /// describe the data that was just processed.
    type Description: std::fmt::Display;

    /// Process a single item. This should work atomically in the sense that
    /// either the entire `data` is processed or none of it is in case of an
    /// error. This property is relied upon by the [`ProcessorConfig`] to retry
    /// failed attempts.
    async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error>;

    /// The `on_failure` method is invoked by the [`ProcessorConfig`] when it
    /// fails to process an event. It is meant to attempt to recreate the
    /// resources, such as a database connection, that might have been
    /// dropped. The return value should signal whether the handler process
    /// should continue (`true`) or not (`false`).
    ///
    /// The function takes the `error` that occurred at the latest
    /// [`process`](Self::process) call that just failed, and the number of
    /// failed attempts at calling `process` so far.
    async fn on_failure(
        &mut self,
        error: Self::Error,
        failed_attempts: u32,
    ) -> Result<bool, Self::Error>;
}

/// Configuration for processing of data produced by an [`Indexer`], intended
/// to be used together with a [`ProcessEvent`] handler.
pub struct ProcessorConfig {
    /// The amount of time to wait after a failure to process an event.
    wait_after_fail: std::time::Duration,
    /// A future to be signalled to stop processing.
    stop: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
}

/// The default implementation behaves the same as
/// [`ProcessorConfig::new`](ProcessorConfig::new).
impl Default for ProcessorConfig {
    fn default() -> Self {
        Self::new()
    }
}

impl ProcessorConfig {
    /// After each failure the [`ProcessorConfig`] will pause a bit before
    /// trying again. Defaults to 5 seconds.
    pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
        Self {
            wait_after_fail,
            ..self
        }
    }

    /// Set the stop signal for the processor. This accepts a future which will
    /// be polled; once the future is ready the `process_events` method
    /// will terminate.
    ///
    /// An example of such a future would be the `Receiver` end of a oneshot
    /// channel.
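    ///
    /// A sketch of wiring a stop signal through a oneshot channel (the shutdown
    /// trigger here is illustrative):
    ///
    /// ```no_run
    /// use concordium_rust_sdk::indexer::ProcessorConfig;
    ///
    /// let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
    /// let config = ProcessorConfig::new().set_stop_signal(async move {
    ///     // Ignore the error so that dropping the sender also acts as a stop signal.
    ///     let _ = stop_receiver.await;
    /// });
    /// // Later, from a shutdown handler: let _ = stop_sender.send(());
    /// ```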
    pub fn set_stop_signal(
        self,
        stop: impl std::future::Future<Output = ()> + Send + 'static,
    ) -> Self {
        Self {
            stop: Box::pin(stop),
            ..self
        }
    }

    /// Construct a new [`ProcessorConfig`] with default settings: a 5 second
    /// wait after a failed attempt and no stop signal.
    pub fn new() -> Self {
        Self {
            wait_after_fail: std::time::Duration::from_secs(5),
            stop: Box::pin(std::future::pending()),
        }
    }

    /// Process events that are coming in on the provided channel.
    ///
    /// This handler will only terminate when
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method indicates that it
    ///   should,
    /// - the sender part of the `events` channel has been dropped, or
    /// - the [`ProcessorConfig`] was configured with a termination signal that
    ///   was triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_events<P: ProcessEvent>(
        self,
        process: P,
        events: tokio::sync::mpsc::Receiver<P::Data>,
    ) {
        let stream = tokio_stream::wrappers::ReceiverStream::new(events);
        self.process_event_stream(process, stream).await
    }

    /// Process events that are coming in on the provided stream.
    ///
    /// This handler will only terminate when
    ///
    /// - the [`on_failure`](ProcessEvent::on_failure) method indicates that it
    ///   should,
    /// - the `events` stream has ended, or
    /// - the [`ProcessorConfig`] was configured with a termination signal that
    ///   was triggered.
    ///
    /// The function will log progress using the `tracing` library with the
    /// target set to `ccd_event_processor`.
    pub async fn process_event_stream<P, E>(mut self, mut process: P, mut events: E)
    where
        P: ProcessEvent,
        E: futures::Stream<Item = P::Data> + Unpin,
    {
        while let Some(event) = tokio::select! {
            biased;
            _ = &mut self.stop => None,
            r = events.next() => r,
        } {
            let mut try_number: u32 = 0;
            'outer: loop {
                let start = tokio::time::Instant::now();
                let response = process.process(&event).await;
                let end = tokio::time::Instant::now();
                let duration = end.duration_since(start).as_millis();
                match response {
                    Ok(descr) => {
                        tracing::info!(
                            target: "ccd_event_processor",
                            "{descr} in {duration}ms."
                        );
                        break 'outer;
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "ccd_event_processor",
                            "Failed to process event: {e}. Took {duration}ms to fail."
                        );
                        tracing::info!(
                            target: "ccd_event_processor",
                            "Retrying in {}ms.",
                            self.wait_after_fail.as_millis()
                        );
                        // Wait before calling on_failure with the idea that whatever caused the
                        // failure is more likely to be fixed if we try after a bit of time
                        // rather than immediately.
                        tokio::select! {
                            biased;
                            _ = &mut self.stop => {break 'outer},
                            _ = tokio::time::sleep(self.wait_after_fail) => {}
                        }
                        match process.on_failure(e, try_number + 1).await {
                            Ok(true) => {
                                // do nothing, continue.
                            }
                            Ok(false) => return,
                            Err(e) => {
                                tracing::warn!("Failed to restart: {e}.");
                            }
                        }
                        try_number += 1;
                    }
                }
            }
        }
        tracing::info!(
            target: "ccd_event_processor",
            "Terminating process_events due to channel closing."
        );
    }
}

/// Given a configuration for traversing the chain and processing the generated
/// events, start a process that traverses the chain and indexes events.
///
/// This process will only stop when the stop signal configured on the
/// [`ProcessorConfig`] completes, when [`traverse`](TraverseConfig::traverse)
/// completes, or when [`process_events`](ProcessorConfig::process_events)
/// completes.
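///
/// A minimal wiring sketch, assuming a `processor` that implements
/// [`ProcessEvent`] for the data produced by [`TransactionIndexer`]:
///
/// ```no_run
/// use concordium_rust_sdk::{
///     indexer::{
///         traverse_and_process, ProcessEvent, ProcessorConfig, TransactionIndexer,
///         TraverseConfig, TraverseResult,
///     },
///     types::{queries::BlockInfo, BlockItemSummary},
/// };
///
/// async fn run<P>(config: TraverseConfig, processor: P) -> TraverseResult<()>
/// where
///     P: ProcessEvent<Data = (BlockInfo, Vec<BlockItemSummary>)>,
/// {
///     // Index all transaction outcomes and hand them to `processor` in block order.
///     traverse_and_process(config, TransactionIndexer, ProcessorConfig::new(), processor).await
/// }
/// ```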
pub async fn traverse_and_process<I: Indexer, P: ProcessEvent<Data = I::Data>>(
    config: TraverseConfig,
    i: I,
    processor: ProcessorConfig,
    p: P,
) -> TraverseResult<()> {
    let (sender, receiver) = tokio::sync::mpsc::channel(10);
    let fut1 = config.traverse(i, sender);
    let fut2 = processor.process_events(p, receiver);
    let (r1, ()) = futures::join!(fut1, fut2);
    r1
}