concordium_rust_sdk/indexer.rs
1//! Support for writing indexers using the Rust SDK.
2//!
3//! The main indexer entrypoint is the [`TraverseConfig::traverse`] method
4//! which will start chain traversal, calling methods of the [`Indexer`] trait
5//! for each finalized block it discovers.
6use crate::{
7 types::{
8 execution_tree, queries::BlockInfo, AccountTransactionEffects, BlockItemSummary,
9 BlockItemSummaryDetails, ExecutionTree, SpecialTransactionOutcome,
10 },
11 v2::{self, FinalizedBlockInfo, QueryError, QueryResult},
12};
13use concordium_base::{
14 base::{AbsoluteBlockHeight, Energy},
15 contracts_common::{AccountAddress, Amount, ContractAddress, OwnedEntrypointName},
16 hashes::TransactionHash,
17 smart_contracts::OwnedReceiveName,
18};
19use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt as _};
20use std::{
21 collections::{BTreeMap, BTreeSet},
22 time::Duration,
23};
24use tokio::time::error::Elapsed;
25pub use tonic::async_trait;
26
27/// Configuration for an indexer.
28pub struct TraverseConfig {
29 endpoints: Vec<v2::Endpoint>,
30 max_parallel: usize,
31 max_behind: std::time::Duration,
32 wait_after_fail: std::time::Duration,
33 start_height: AbsoluteBlockHeight,
34}
35
36#[derive(Debug, thiserror::Error)]
37/// An error encountered during chain traversal and passed to the
38/// [`Indexer`].
39pub enum TraverseError {
40 #[error("Failed to connect: {0}")]
41 Connect(#[from] tonic::transport::Error),
42 #[error("Failed to query: {0}")]
43 Query(#[from] QueryError),
44 #[error("Timed out waiting for finalized blocks.")]
45 Elapsed(#[from] Elapsed),
46}
47
48#[async_trait]
49/// A trait intended to be implemented by indexers that traverse the chain and
50/// extract information of interest to store for efficient retrieval.
51///
52/// The main method of this trait is [`on_finalized`](Indexer::on_finalized)
53/// which will be called by the [`traverse`](TraverseConfig::traverse) method
54/// for each finalized block. The other two methods are meant for signalling and
55/// bookkeeping.
56///
57/// Note that this trait has `async` methods, which is why the type signatures
58/// are daunting. The intended way of implementing it is to use the
59/// `async_trait` macro like so
60///
61/// ```
62/// # use concordium_rust_sdk::{indexer::*, v2::{self, FinalizedBlockInfo, QueryResult}};
63/// use concordium_rust_sdk::indexer::async_trait;
64/// # struct MyIndexer;
65/// #[async_trait]
66/// impl Indexer for MyIndexer {
67/// type Context = ();
68/// type Data = ();
69///
70/// async fn on_connect<'a>(
71/// &mut self,
72/// endpoint: v2::Endpoint,
73/// client: &'a mut v2::Client,
74/// ) -> QueryResult<Self::Context> {
75/// unimplemented!("Implement me.")
76/// }
77///
78/// async fn on_finalized<'a>(
79/// &self,
80/// client: v2::Client,
81/// ctx: &'a Self::Context,
82/// fbi: FinalizedBlockInfo,
83/// ) -> QueryResult<Self::Data> {
84/// unimplemented!("Implement me.")
85/// }
86///
87/// async fn on_failure(
88/// &mut self,
89/// ep: v2::Endpoint,
90/// successive_failures: u64,
91/// err: TraverseError,
92/// ) -> bool {
93/// unimplemented!("Implement me.")
94/// }
95/// }
96/// ```
97pub trait Indexer {
98 /// The data that is retrieved upon connecting to the endpoint and supplied
99 /// to each call of [`on_finalized`](Self::on_finalized).
100 type Context: Send + Sync;
101 /// The data returned by the [`on_finalized`](Self::on_finalized) call.
102 type Data: Send + Sync;
103
104 /// Called when a new connection is established to the given endpoint.
105 /// The return value from this method is passed to each call of
106 /// [`on_finalized`](Self::on_finalized).
107 async fn on_connect<'a>(
108 &mut self,
109 endpoint: v2::Endpoint,
110 client: &'a mut v2::Client,
111 ) -> QueryResult<Self::Context>;
112
113 /// The main method of this trait. It is called for each finalized block
114 /// that the indexer discovers. Note that the indexer might call this
115 /// concurrently for multiple blocks at the same time to speed up indexing.
116 ///
117 /// This method is meant to return errors that are unexpected, and if it
118 /// does return an error the indexer will attempt to reconnect to the
119 /// next endpoint.
120 async fn on_finalized<'a>(
121 &self,
122 client: v2::Client,
123 ctx: &'a Self::Context,
124 fbi: FinalizedBlockInfo,
125 ) -> QueryResult<Self::Data>;
126
127 /// Called when either connecting to the node or querying the node fails.
128 /// The number of successive failures without progress is passed to the
129 /// method which should return whether to stop indexing ([`true`]) or not
130 /// ([`false`]).
131 async fn on_failure(
132 &mut self,
133 endpoint: v2::Endpoint,
134 successive_failures: u64,
135 err: TraverseError,
136 ) -> bool;
137}
138
139impl TraverseConfig {
140 /// A configuration with a single endpoint starting at the given block
141 /// height.
142 pub fn new_single(endpoint: v2::Endpoint, start_height: AbsoluteBlockHeight) -> Self {
143 Self {
144 endpoints: vec![endpoint],
145 max_parallel: 4,
146 max_behind: Duration::from_secs(60),
147 wait_after_fail: Duration::from_secs(1),
148 start_height,
149 }
150 }
151
152 /// A configuration with a given list of endpoints and starting height.
153 /// Returns [`None`] if the list of endpoints is empty.
154 pub fn new(endpoints: Vec<v2::Endpoint>, start_height: AbsoluteBlockHeight) -> Option<Self> {
155 if endpoints.is_empty() {
156 return None;
157 }
158 Some(Self {
159 endpoints,
160 max_parallel: 4,
161 max_behind: Duration::from_secs(60),
162 wait_after_fail: Duration::from_secs(1),
163 start_height,
164 })
165 }
166
167 /// Set the maximum number of time the last finalized block of the node can
168 /// be behind before it is deemed too far behind, and another node is
169 /// tried.
170 ///
171 /// The default value is 60 seconds.
172 pub fn set_max_behind(self, max_behind: Duration) -> Self { Self { max_behind, ..self } }
173
174 /// After each failure the indexer will pause a bit before trying another
175 /// node to avoid overloading the node. Defaults to 1 second.
176 pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
177 Self {
178 wait_after_fail,
179 ..self
180 }
181 }
182
183 /// Add an additional endpoint to the list of endpoints the indexer will
184 /// use. This is added to the end of the list, so the endpoint is only tried
185 /// in case of failure of previous queries.
186 pub fn push_endpoint(mut self, endpoint: v2::Endpoint) -> Self {
187 self.endpoints.push(endpoint);
188 self
189 }
190
191 /// Set the maximum number of blocks that will be queried in parallel, if
192 /// they are available. Defaults to 4 if not set explicitly.
193 pub fn set_max_parallel(self, max_parallel: usize) -> Self {
194 Self {
195 max_parallel,
196 ..self
197 }
198 }
199
200 /// Traverse the chain according to the supplied configuration, invoking
201 /// [`on_finalized`](Indexer::on_finalized) for each finalized block.
202 ///
203 /// Multiple [`on_finalized`](Indexer::on_finalized) calls might be executed
204 /// concurrently, but their responses will be written to the provided
205 /// [`tokio::sync::mpsc::Sender`] in the increasing order of block height,
206 /// with no gaps.
207 ///
208 /// If a query fails, either due to timeout or node not being available the
209 /// indexer will attempt the next endpoint it is configured with.
210 ///
211 /// For robust indexing it is crucial that the supplied
212 /// [`Endpoints`](v2::Endpoint) are configured with timeouts so that the
213 /// indexer may make progress.
214 ///
215 /// The [`traverse`](Self::traverse) method will return either when
216 /// signalled so by the [`on_failure`](Indexer::on_failure) method or
217 /// when the receiver part of the supplied [`tokio::sync::mpsc::Sender`]
218 /// is closed. Typically this method should run in a task spawned via
219 /// [`tokio::spawn`].
220 pub async fn traverse<I: Indexer>(
221 self,
222 mut indexer: I,
223 sender: tokio::sync::mpsc::Sender<I::Data>,
224 ) -> QueryResult<()> {
225 let TraverseConfig {
226 endpoints,
227 max_parallel,
228 max_behind,
229 wait_after_fail,
230 start_height: mut height,
231 } = self;
232 let mut successive_failures: u64 = 0;
233 for node_ep in endpoints.into_iter().cycle() {
234 if sender.is_closed() {
235 return Ok(());
236 }
237 if successive_failures > 0 {
238 tokio::time::sleep(wait_after_fail).await
239 }
240 let mut node = match v2::Client::new(node_ep.clone()).await {
241 Ok(v) => v,
242 Err(e) => {
243 successive_failures += 1;
244 let should_stop = indexer
245 .on_failure(node_ep, successive_failures, e.into())
246 .await;
247 if should_stop {
248 return Ok(());
249 } else {
250 continue;
251 }
252 }
253 };
254
255 let context = match indexer.on_connect(node_ep.clone(), &mut node).await {
256 Ok(a) => a,
257 Err(e) => {
258 successive_failures += 1;
259 let should_stop = indexer
260 .on_failure(node_ep, successive_failures, e.into())
261 .await;
262 if should_stop {
263 return Ok(());
264 } else {
265 continue;
266 }
267 }
268 };
269 let mut finalized_blocks = match node.get_finalized_blocks_from(height).await {
270 Ok(v) => v,
271 Err(e) => {
272 successive_failures += 1;
273 let should_stop = indexer
274 .on_failure(node_ep, successive_failures, e.into())
275 .await;
276 if should_stop {
277 return Ok(());
278 } else {
279 continue;
280 }
281 }
282 };
283
284 let mut preprocessors = FuturesOrdered::new();
285 let mut finalized_blocks_error = false;
286 'node_loop: loop {
287 tokio::select! {
288 biased;
289 // First check if anything is done preprocessing.
290 // If there is nothing being preprocessing this branch is disabled until next loop.
291 Some(data) = preprocessors.next() => {
292 let data = match data {
293 Ok(v) => v,
294 Err(e) => {
295 drop(preprocessors);
296 successive_failures += 1;
297 let should_stop = indexer
298 .on_failure(node_ep, successive_failures, TraverseError::Query(e))
299 .await;
300 if should_stop {
301 return Ok(());
302 } else {
303 break 'node_loop;
304 }
305 }
306 };
307 if sender.send(data).await.is_err() {
308 // the listener ended the stream, meaning we should stop.
309 return Ok(());
310 }
311 height = height.next();
312 successive_failures = 0;
313 if finalized_blocks_error {
314 // we have processed the blocks we can, but further queries on the same stream
315 // will fail since the stream signalled an error.
316 break 'node_loop;
317 }
318 },
319 // If there is nothing immediately ready from preprocessing, we check if we have
320 // available slots for preprocessors and fill them.
321 // If no space this branch gets disabled until next loop.
322 _ = async {}, if preprocessors.len() < max_parallel => {
323 let space = max_parallel - preprocessors.len();
324 match finalized_blocks
325 .next_chunk_timeout(space, max_behind)
326 .await
327 {
328 Ok((has_error, chunks)) => {
329 finalized_blocks_error = has_error;
330 for fb in chunks {
331 preprocessors.push_back(indexer.on_finalized(node.clone(), &context, fb));
332 }
333 },
334 Err(e) => {
335 drop(preprocessors);
336 successive_failures += 1;
337 let should_stop = indexer
338 .on_failure(node_ep, successive_failures, TraverseError::Elapsed(e))
339 .await;
340 if should_stop {
341 return Ok(());
342 } else {
343 break 'node_loop;
344 }
345 }
346 };
347
348 }
349 };
350 }
351 }
352 Ok(()) // unreachable
353 }
354}
355
356/// An indexer that retrieves all transaction outcomes.
357///
358/// The [`on_connect`](Indexer::on_connect) and
359/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
360/// log the events on `info` and `warn` levels, respectively, using the
361/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
362/// of the log is `ccd_indexer` which may be used to filter the logs.
363pub struct TransactionIndexer;
364
365#[async_trait]
366impl Indexer for TransactionIndexer {
367 type Context = ();
368 type Data = (BlockInfo, Vec<BlockItemSummary>);
369
370 async fn on_connect<'a>(
371 &mut self,
372 endpoint: v2::Endpoint,
373 _client: &'a mut v2::Client,
374 ) -> QueryResult<()> {
375 tracing::info!(
376 target: "ccd_indexer",
377 "Connected to endpoint {}.",
378 endpoint.uri()
379 );
380 Ok(())
381 }
382
383 async fn on_finalized<'a>(
384 &self,
385 mut client: v2::Client,
386 _ctx: &'a (),
387 fbi: FinalizedBlockInfo,
388 ) -> QueryResult<Self::Data> {
389 let bi = client.get_block_info(fbi.height).await?.response;
390 if bi.transaction_count != 0 {
391 let summary = client
392 .get_block_transaction_events(fbi.height)
393 .await?
394 .response
395 .try_collect::<Vec<_>>()
396 .await?;
397 Ok((bi, summary))
398 } else {
399 Ok((bi, Vec::new()))
400 }
401 }
402
403 async fn on_failure(
404 &mut self,
405 endpoint: v2::Endpoint,
406 successive_failures: u64,
407 err: TraverseError,
408 ) -> bool {
409 tracing::warn!(
410 target: "ccd_indexer",
411 successive_failures,
412 "Failed when querying endpoint {}: {err}",
413 endpoint.uri()
414 );
415 false
416 }
417}
418
419/// An indexer that retrieves smart contract updates where the specific
420/// entrypoint of a contract was triggered as the top-level entrypoint.
421///
422/// The [`on_connect`](Indexer::on_connect) and
423/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
424/// log the events on `info` and `warn` levels, respectively, using the
425/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
426/// of the log is `ccd_indexer` which may be used to filter the logs.
427pub struct ContractUpdateIndexer {
428 pub target_address: ContractAddress,
429 pub entrypoint: OwnedEntrypointName,
430}
431
432pub struct ContractUpdateInfo {
433 /// The execution tree generated by the call.
434 pub execution_tree: ExecutionTree,
435 /// The amount of energy charged for the transaction.
436 pub energy_cost: Energy,
437 /// The cost, in CCD, of the transaction.
438 pub cost: Amount,
439 /// The hash of the transaction from which this update stems.
440 pub transaction_hash: TransactionHash,
441 /// The sender of the transaction.
442 pub sender: AccountAddress,
443}
444
445fn update_info(summary: BlockItemSummary) -> Option<ContractUpdateInfo> {
446 let BlockItemSummaryDetails::AccountTransaction(at) = summary.details else {
447 return None;
448 };
449
450 let AccountTransactionEffects::ContractUpdateIssued { effects } = at.effects else {
451 return None;
452 };
453
454 Some(ContractUpdateInfo {
455 execution_tree: execution_tree(effects)?,
456 energy_cost: summary.energy_cost,
457 cost: at.cost,
458 transaction_hash: summary.hash,
459 sender: at.sender,
460 })
461}
462
463#[async_trait]
464impl Indexer for ContractUpdateIndexer {
465 type Context = ();
466 type Data = (BlockInfo, Vec<ContractUpdateInfo>);
467
468 async fn on_connect<'a>(
469 &mut self,
470 endpoint: v2::Endpoint,
471 _client: &'a mut v2::Client,
472 ) -> QueryResult<()> {
473 tracing::info!(
474 target: "ccd_indexer",
475 "Connected to endpoint {}.",
476 endpoint.uri()
477 );
478 Ok(())
479 }
480
481 async fn on_finalized<'a>(
482 &self,
483 mut client: v2::Client,
484 _ctx: &'a (),
485 fbi: FinalizedBlockInfo,
486 ) -> QueryResult<Self::Data> {
487 let bi = client.get_block_info(fbi.height).await?.response;
488 if bi.transaction_count != 0 {
489 let summary = client
490 .get_block_transaction_events(fbi.height)
491 .await?
492 .response
493 .try_filter_map(|summary| async move {
494 let Some(info) = update_info(summary) else {
495 return Ok(None);
496 };
497 if info.execution_tree.address() == self.target_address
498 && info.execution_tree.entrypoint() == self.entrypoint.as_entrypoint_name()
499 {
500 Ok(Some(info))
501 } else {
502 Ok(None)
503 }
504 })
505 .try_collect::<Vec<_>>()
506 .await?;
507 Ok((bi, summary))
508 } else {
509 Ok((bi, Vec::new()))
510 }
511 }
512
513 async fn on_failure(
514 &mut self,
515 endpoint: v2::Endpoint,
516 successive_failures: u64,
517 err: TraverseError,
518 ) -> bool {
519 tracing::warn!(
520 target: "ccd_indexer",
521 successive_failures,
522 "Failed when querying endpoint {}: {err}",
523 endpoint.uri()
524 );
525 false
526 }
527}
528
529/// An indexer that retrieves smart contract updates where the specific
530/// contracts were affected. The configuration can choose to require any or all
531/// to be updated.
532///
533/// The [`on_connect`](Indexer::on_connect) and
534/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
535/// log the events on `info` and `warn` levels, respectively, using the
536/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
537/// of the log is `ccd_indexer` which may be used to filter the logs.
538pub struct AffectedContractIndexer {
539 pub addresses: BTreeSet<ContractAddress>,
540 /// Require all contract addreseses in the `addresses` set to be updated.
541 pub all: bool,
542}
543
544#[async_trait]
545impl Indexer for AffectedContractIndexer {
546 type Context = ();
547 type Data = (
548 BlockInfo,
549 Vec<(
550 ContractUpdateInfo,
551 BTreeMap<ContractAddress, BTreeSet<OwnedReceiveName>>,
552 )>,
553 );
554
555 async fn on_connect<'a>(
556 &mut self,
557 endpoint: v2::Endpoint,
558 _client: &'a mut v2::Client,
559 ) -> QueryResult<()> {
560 tracing::info!(
561 target: "ccd_indexer",
562 "Connected to endpoint {}.",
563 endpoint.uri()
564 );
565 Ok(())
566 }
567
568 async fn on_finalized<'a>(
569 &self,
570 mut client: v2::Client,
571 _ctx: &'a (),
572 fbi: FinalizedBlockInfo,
573 ) -> QueryResult<Self::Data> {
574 let bi = client.get_block_info(fbi.height).await?.response;
575 if bi.transaction_count != 0 {
576 let summary = client
577 .get_block_transaction_events(fbi.height)
578 .await?
579 .response
580 .try_filter_map(|summary| async move {
581 let Some(info) = update_info(summary) else {
582 return Ok(None);
583 };
584 let affected_addresses = info.execution_tree.affected_addresses();
585 if (self.all
586 && self
587 .addresses
588 .iter()
589 .all(|addr| affected_addresses.contains_key(addr)))
590 || self
591 .addresses
592 .iter()
593 .any(|addr| affected_addresses.contains_key(addr))
594 {
595 Ok(Some((info, affected_addresses)))
596 } else {
597 Ok(None)
598 }
599 })
600 .try_collect::<Vec<_>>()
601 .await?;
602 Ok((bi, summary))
603 } else {
604 Ok((bi, Vec::new()))
605 }
606 }
607
608 async fn on_failure(
609 &mut self,
610 endpoint: v2::Endpoint,
611 successive_failures: u64,
612 err: TraverseError,
613 ) -> bool {
614 tracing::warn!(
615 target: "ccd_indexer",
616 successive_failures,
617 "Failed when querying endpoint {}: {err}",
618 endpoint.uri()
619 );
620 false
621 }
622}
623
624/// An indexer that retrieves all events in a block, transaction outcomes
625/// and special transaction outcomes.
626///
627/// The [`on_connect`](Indexer::on_connect) and
628/// [`on_failure`](Indexer::on_failure) methods of the [`Indexer`] trait only
629/// log the events on `info` and `warn` levels, respectively, using the
630/// [`tracing`](https://docs.rs/tracing/latest/tracing/) crate. The [target](https://docs.rs/tracing/latest/tracing/struct.Metadata.html#method.target)
631/// of the log is `ccd_indexer` which may be used to filter the logs.
632pub struct BlockEventsIndexer;
633
634#[async_trait]
635impl Indexer for BlockEventsIndexer {
636 type Context = ();
637 type Data = (
638 BlockInfo,
639 Vec<BlockItemSummary>,
640 Vec<SpecialTransactionOutcome>,
641 );
642
643 async fn on_connect<'a>(
644 &mut self,
645 endpoint: v2::Endpoint,
646 client: &'a mut v2::Client,
647 ) -> QueryResult<()> {
648 TransactionIndexer.on_connect(endpoint, client).await
649 }
650
651 async fn on_finalized<'a>(
652 &self,
653 client: v2::Client,
654 ctx: &'a (),
655 fbi: FinalizedBlockInfo,
656 ) -> QueryResult<Self::Data> {
657 let mut special_client = client.clone();
658 let special = async move {
659 let events = special_client
660 .get_block_special_events(fbi.height)
661 .await?
662 .response
663 .try_collect()
664 .await?;
665 Ok(events)
666 };
667 let ((bi, summary), special) =
668 futures::try_join!(TransactionIndexer.on_finalized(client, ctx, fbi), special)?;
669 Ok((bi, summary, special))
670 }
671
672 async fn on_failure(
673 &mut self,
674 endpoint: v2::Endpoint,
675 successive_failures: u64,
676 err: TraverseError,
677 ) -> bool {
678 TransactionIndexer
679 .on_failure(endpoint, successive_failures, err)
680 .await
681 }
682}
683
684#[async_trait]
685/// Handle an individual event. This trait is designed to be used together with
686/// the [`ProcessorConfig`]. These two together are designed to ease the work of
687/// writing the part of indexers where data is to be stored in a database.
688pub trait ProcessEvent {
689 /// The type of events that are to be processed. Typically this will be all
690 /// of the transactions of interest for a single block.
691 type Data;
692 /// An error that can be signalled.
693 type Error: std::fmt::Display + std::fmt::Debug;
694 /// A description returned by the [`process`](ProcessEvent::process) method.
695 /// This message is logged by the [`ProcessorConfig`] and is intended to
696 /// describe the data that was just processed.
697 type Description: std::fmt::Display;
698
699 /// Process a single item. This should work atomically in the sense that
700 /// either the entire `data` is processed or none of it is in case of an
701 /// error. This property is relied upon by the [`ProcessorConfig`] to retry
702 /// failed attempts.
703 async fn process(&mut self, data: &Self::Data) -> Result<Self::Description, Self::Error>;
704
705 /// The `on_failure` method is invoked by the [`ProcessorConfig`] when it
706 /// fails to process an event. It is meant to retry to recreate the
707 /// resources, such as a database connection, that might have been
708 /// dropped. The return value should signal if the handler process
709 /// should continue (`true`) or not.
710 ///
711 /// The function takes the `error` that occurred at the latest
712 /// [`process`](Self::process) call that just failed, and the number of
713 /// attempts of calling `process` that failed.
714 async fn on_failure(
715 &mut self,
716 error: Self::Error,
717 failed_attempts: u32,
718 ) -> Result<bool, Self::Error>;
719}
720
721pub struct ProcessorConfig {
722 /// The amount of time to wait after a failure to process an event.
723 wait_after_fail: std::time::Duration,
724 /// A future to be signalled to stop processing.
725 stop: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
726}
727
728/// The default implementation behaves the same as
729/// [`ProcessorConfig::new`](ProcessorConfig::new).
730impl Default for ProcessorConfig {
731 fn default() -> Self { Self::new() }
732}
733
734impl ProcessorConfig {
735 /// After each failure the [`ProcessorConfig`] will pause a bit before
736 /// trying again. Defaults to 5 seconds.
737 pub fn set_wait_after_failure(self, wait_after_fail: Duration) -> Self {
738 Self {
739 wait_after_fail,
740 ..self
741 }
742 }
743
744 /// Set the stop signal for the processor. This accepts a future which will
745 /// be polled and if the future yields ready the `process_events` method
746 /// will terminate.
747 ///
748 /// An example of such a future would be the `Receiver` end of a oneshot
749 /// channel.
750 pub fn set_stop_signal(
751 self,
752 stop: impl std::future::Future<Output = ()> + Send + 'static,
753 ) -> Self {
754 Self {
755 stop: Box::pin(stop),
756 ..self
757 }
758 }
759
760 /// Construct a new [`ProcessorConfig`] that will retry the given number of
761 /// times. The default wait after a failure is 5 seconds.
762 pub fn new() -> Self {
763 Self {
764 wait_after_fail: std::time::Duration::from_secs(5),
765 stop: Box::pin(std::future::pending()),
766 }
767 }
768
769 /// Process events that are coming in on the provided channel.
770 ///
771 /// This handler will only terminate in the case of
772 ///
773 /// - the [`on_failure`](ProcessEvent::on_failure) method indicates so.
774 /// - the sender part of the `events` channel has been dropped
775 /// - the [`ProcessorConfig`] was configured with a termination signal that
776 /// was triggered.
777 ///
778 /// The function will log progress using the `tracing` library with the
779 /// target set to `ccd_event_processor`.
780 pub async fn process_events<P: ProcessEvent>(
781 self,
782 process: P,
783 events: tokio::sync::mpsc::Receiver<P::Data>,
784 ) {
785 let stream = tokio_stream::wrappers::ReceiverStream::new(events);
786 self.process_event_stream(process, stream).await
787 }
788
789 /// Process events that are coming in on the provided stream.
790 ///
791 /// This handler will only terminate in the case of
792 ///
793 /// - the [`on_failure`](ProcessEvent::on_failure) method indicates so.
794 /// - the sender part of the `events` channel has been dropped
795 /// - the [`ProcessorConfig`] was configured with a termination signal that
796 /// was triggered.
797 ///
798 /// The function will log progress using the `tracing` library with the
799 /// target set to `ccd_event_processor`.
800 pub async fn process_event_stream<P, E>(mut self, mut process: P, mut events: E)
801 where
802 P: ProcessEvent,
803 E: futures::Stream<Item = P::Data> + Unpin, {
804 while let Some(event) = tokio::select! {
805 biased;
806 _ = &mut self.stop => None,
807 r = events.next() => r,
808 } {
809 let mut try_number: u32 = 0;
810 'outer: loop {
811 let start = tokio::time::Instant::now();
812 let response = process.process(&event).await;
813 let end = tokio::time::Instant::now();
814 let duration = end.duration_since(start).as_millis();
815 match response {
816 Ok(descr) => {
817 tracing::info!(
818 target: "ccd_event_processor",
819 "{descr} in {duration}ms."
820 );
821 break 'outer;
822 }
823 Err(e) => {
824 tracing::error!(
825 target: "ccd_event_processor",
826 "Failed to process event: {e}. Took {duration}ms to fail."
827 );
828 tracing::info!(
829 target: "ccd_event_processor",
830 "Retrying in {}ms.",
831 self.wait_after_fail.as_millis()
832 );
833 // Wait before calling on_failure with the idea that whatever caused the
834 // failure is more likely to be fixed if we try
835 // after a bit of time rather than immediately.
836 tokio::select! {
837 biased;
838 _ = &mut self.stop => {break 'outer},
839 _ = tokio::time::sleep(self.wait_after_fail) => {}
840 }
841 match process.on_failure(e, try_number + 1).await {
842 Ok(true) => {
843 // do nothing, continue.
844 }
845 Ok(false) => return,
846 Err(e) => {
847 tracing::warn!("Failed to restart: {e}.");
848 }
849 }
850 try_number += 1;
851 }
852 }
853 }
854 }
855 tracing::info!(
856 target: "ccd_event_processor",
857 "Terminating process_events due to channel closing."
858 );
859 }
860}
861
862/// Given a configuration for traversing the chain and processing generated
863/// events start a process to traverse the chain and index events.
864///
865/// This process will only stop when the `stop_signal` future completes, when
866/// [`traverse`](TraverseConfig::traverse) completes, or when
867/// [`process_events`](ProcessorConfig::process_events) completes.
868pub async fn traverse_and_process<I: Indexer, P: ProcessEvent<Data = I::Data>>(
869 config: TraverseConfig,
870 i: I,
871 processor: ProcessorConfig,
872 p: P,
873) -> Result<(), QueryError> {
874 let (sender, receiver) = tokio::sync::mpsc::channel(10);
875 let fut1 = config.traverse(i, sender);
876 let fut2 = processor.process_events(p, receiver);
877 let (r1, ()) = futures::join!(fut1, fut2);
878 r1
879}