atlas_arch/
rollback.rs

1use crate::datasource::{ReappliedTransactionsEvent, RolledbackTransactionsEvent};
2use crate::error::IndexerResult;
3use crate::filter::Filter;
4use crate::metrics::MetricsCollection;
5use crate::processor::Processor;
6use async_trait::async_trait;
7use std::collections::HashSet;
8use std::sync::Arc;
9
10pub struct RolledbackTransactionsPipe {
11    pub processor: Box<
12        dyn Processor<
13                InputType = RolledbackTransactionsEvent,
14                OutputType = HashSet<arch_program::pubkey::Pubkey>,
15            > + Send
16            + Sync,
17    >,
18    pub filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
19}
20
21#[async_trait]
22/// Handles rollback events (reorged-away transactions) and produces any
23/// side-effects, such as reindexing affected accounts.
24pub trait RolledbackTransactionsPipes: Send + Sync {
25    async fn run(
26        &mut self,
27        events: Vec<RolledbackTransactionsEvent>,
28        metrics: Arc<MetricsCollection>,
29    ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>>;
30
31    fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
32}
33
34#[async_trait]
35impl RolledbackTransactionsPipes for RolledbackTransactionsPipe {
36    async fn run(
37        &mut self,
38        events: Vec<RolledbackTransactionsEvent>,
39        metrics: Arc<MetricsCollection>,
40    ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>> {
41        log::trace!("RolledbackTransactions::run(events: {:?}, metrics)", events);
42        let out = self.processor.process(events, metrics).await?;
43        Ok(out)
44    }
45
46    fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
47        &self.filters
48    }
49}
50
51pub struct ReappliedTransactionsPipe {
52    pub processor: Box<
53        dyn Processor<
54                InputType = ReappliedTransactionsEvent,
55                OutputType = HashSet<arch_program::pubkey::Pubkey>,
56            > + Send
57            + Sync,
58    >,
59    pub filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
60}
61
62#[async_trait]
63/// Handles reapplication events (transactions restored after a reorg) and
64/// returns any affected accounts to update downstream.
65pub trait ReappliedTransactionsPipes: Send + Sync {
66    async fn run(
67        &mut self,
68        events: Vec<ReappliedTransactionsEvent>,
69        metrics: Arc<MetricsCollection>,
70    ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>>;
71
72    fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
73}
74
75#[async_trait]
76impl ReappliedTransactionsPipes for ReappliedTransactionsPipe {
77    async fn run(
78        &mut self,
79        events: Vec<ReappliedTransactionsEvent>,
80        metrics: Arc<MetricsCollection>,
81    ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>> {
82        log::trace!("ReappliedTransactions::run(events: {:?}, metrics)", events);
83        let out = self.processor.process(events, metrics).await?;
84        Ok(out)
85    }
86
87    fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
88        &self.filters
89    }
90}