1use crate::datasource::{ReappliedTransactionsEvent, RolledbackTransactionsEvent};
2use crate::error::IndexerResult;
3use crate::filter::Filter;
4use crate::metrics::MetricsCollection;
5use crate::processor::Processor;
6use async_trait::async_trait;
7use std::collections::HashSet;
8use std::sync::Arc;
9
10pub struct RolledbackTransactionsPipe {
11 pub processor: Box<
12 dyn Processor<
13 InputType = RolledbackTransactionsEvent,
14 OutputType = HashSet<arch_program::pubkey::Pubkey>,
15 > + Send
16 + Sync,
17 >,
18 pub filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
19}
20
21#[async_trait]
22pub trait RolledbackTransactionsPipes: Send + Sync {
25 async fn run(
26 &mut self,
27 events: Vec<RolledbackTransactionsEvent>,
28 metrics: Arc<MetricsCollection>,
29 ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>>;
30
31 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
32}
33
34#[async_trait]
35impl RolledbackTransactionsPipes for RolledbackTransactionsPipe {
36 async fn run(
37 &mut self,
38 events: Vec<RolledbackTransactionsEvent>,
39 metrics: Arc<MetricsCollection>,
40 ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>> {
41 log::trace!("RolledbackTransactions::run(events: {:?}, metrics)", events);
42 let out = self.processor.process(events, metrics).await?;
43 Ok(out)
44 }
45
46 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
47 &self.filters
48 }
49}
50
51pub struct ReappliedTransactionsPipe {
52 pub processor: Box<
53 dyn Processor<
54 InputType = ReappliedTransactionsEvent,
55 OutputType = HashSet<arch_program::pubkey::Pubkey>,
56 > + Send
57 + Sync,
58 >,
59 pub filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
60}
61
62#[async_trait]
63pub trait ReappliedTransactionsPipes: Send + Sync {
66 async fn run(
67 &mut self,
68 events: Vec<ReappliedTransactionsEvent>,
69 metrics: Arc<MetricsCollection>,
70 ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>>;
71
72 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
73}
74
75#[async_trait]
76impl ReappliedTransactionsPipes for ReappliedTransactionsPipe {
77 async fn run(
78 &mut self,
79 events: Vec<ReappliedTransactionsEvent>,
80 metrics: Arc<MetricsCollection>,
81 ) -> IndexerResult<HashSet<arch_program::pubkey::Pubkey>> {
82 log::trace!("ReappliedTransactions::run(events: {:?}, metrics)", events);
83 let out = self.processor.process(events, metrics).await?;
84 Ok(out)
85 }
86
87 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
88 &self.filters
89 }
90}