1use crate::filter::Filter;
2use crate::instruction::{DecodedInstruction, InstructionMetadata};
3use crate::transformers::unnest_parsed_instructions;
4use crate::{error::IndexerResult, instruction::NestedInstruction};
5use arch_program::sanitized::ArchMessage;
6
7use arch_program::pubkey::Pubkey;
8pub use arch_sdk::Signature;
9use arch_sdk::{InnerInstructionsList, RollbackStatus, Status};
10use {
11 crate::{
12 collection::InstructionDecoderCollection,
13 metrics::MetricsCollection,
14 processor::Processor,
15 schema::{ParsedInstruction, TransactionSchema},
16 },
17 async_trait::async_trait,
18 core::convert::TryFrom,
19 serde::de::DeserializeOwned,
20 std::sync::Arc,
21};
22
23#[derive(Debug, Clone)]
24pub struct TransactionMetadata {
25 pub id: String,
26 pub fee_payer: Pubkey,
27 pub message: ArchMessage,
28 pub status: Status,
29 pub log_messages: Vec<String>,
30 pub rollback_status: RollbackStatus,
31 pub block_height: u64,
32 pub bitcoin_txid: Option<String>,
33 pub bitcoin_tx: Option<crate::bitcoin::Transaction>,
34 pub inner_instructions_list: InnerInstructionsList,
35}
36
37impl TryFrom<crate::datasource::TransactionUpdate> for TransactionMetadata {
38 type Error = crate::error::Error;
39
40 fn try_from(value: crate::datasource::TransactionUpdate) -> Result<Self, Self::Error> {
41 log::trace!("try_from(transaction_update: {:?})", value);
42 let accounts = value
43 .transaction
44 .runtime_transaction
45 .message
46 .get_unique_instruction_account_keys();
47
48 Ok(TransactionMetadata {
49 id: value.transaction.txid().to_string(),
50 fee_payer: *accounts
51 .iter()
52 .next()
53 .ok_or(crate::error::Error::MissingFeePayer)?,
54 message: value.transaction.runtime_transaction.message.clone(),
55 status: value.transaction.status,
56 log_messages: value.transaction.logs,
57 rollback_status: value.transaction.rollback_status,
58 block_height: value.height,
59 bitcoin_txid: value.transaction.bitcoin_txid.map(|txid| txid.to_string()),
60 bitcoin_tx: None,
61 inner_instructions_list: value.transaction.inner_instructions_list,
62 })
63 }
64}
65
66pub type TransactionProcessorInputType<T, U = ()> = (
67 Arc<TransactionMetadata>,
68 Vec<(InstructionMetadata, DecodedInstruction<T>)>,
69 Option<U>,
70);
71
72pub struct TransactionPipe<T: InstructionDecoderCollection, U> {
73 schema: Option<TransactionSchema<T>>,
74 processor: Box<
75 dyn Processor<InputType = TransactionProcessorInputType<T, U>, OutputType = ()>
76 + Send
77 + Sync,
78 >,
79 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
80}
81
82pub struct ParsedTransaction<I: InstructionDecoderCollection> {
85 pub metadata: TransactionMetadata,
86 pub instructions: Vec<ParsedInstruction<I>>,
87}
88
89impl<T: InstructionDecoderCollection, U> TransactionPipe<T, U> {
90 pub fn new(
91 schema: Option<TransactionSchema<T>>,
92 processor: impl Processor<InputType = TransactionProcessorInputType<T, U>, OutputType = ()>
93 + Send
94 + Sync
95 + 'static,
96 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
97 ) -> Self {
98 log::trace!(
99 "TransactionPipe::new(schema: {:?}, processor: {:?})",
100 schema,
101 stringify!(processor)
102 );
103 Self {
104 schema,
105 processor: Box::new(processor),
106 filters,
107 }
108 }
109
110 fn matches_schema(&self, instructions: &[ParsedInstruction<T>]) -> Option<U>
111 where
112 U: DeserializeOwned,
113 {
114 match self.schema {
115 Some(ref schema) => schema.match_schema(instructions),
116 None => None,
117 }
118 }
119}
120
121pub fn parse_instructions<T: InstructionDecoderCollection>(
122 nested_ixs: &[NestedInstruction],
123) -> Vec<ParsedInstruction<T>> {
124 log::trace!("parse_instructions(nested_ixs: {:?})", nested_ixs);
125
126 let mut parsed_instructions: Vec<ParsedInstruction<T>> = Vec::new();
127
128 for nested_ix in nested_ixs {
129 if let Some(instruction) = T::parse_instruction(&nested_ix.instruction) {
130 parsed_instructions.push(ParsedInstruction {
131 program_id: nested_ix.instruction.program_id,
132 instruction,
133 inner_instructions: parse_instructions(&nested_ix.inner_instructions),
134 });
135 } else {
136 for inner_ix in nested_ix.inner_instructions.iter() {
137 parsed_instructions.extend(parse_instructions(&[inner_ix.clone()]));
138 }
139 }
140 }
141
142 parsed_instructions
143}
144
145#[async_trait]
146pub trait TransactionPipes<'a>: Send + Sync {
152 async fn run(
153 &mut self,
154 transactions: Vec<(Arc<TransactionMetadata>, &[NestedInstruction])>,
155 metrics: Arc<MetricsCollection>,
156 ) -> IndexerResult<()>;
157
158 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
159}
160
161#[async_trait]
162impl<T, U> TransactionPipes<'_> for TransactionPipe<T, U>
163where
164 T: InstructionDecoderCollection + Sync + 'static,
165 U: DeserializeOwned + Send + Sync + 'static,
166{
167 async fn run(
168 &mut self,
169 transactions: Vec<(Arc<TransactionMetadata>, &[NestedInstruction])>,
170 metrics: Arc<MetricsCollection>,
171 ) -> IndexerResult<()> {
172 log::trace!(
173 "TransactionPipe::run(transactions: {:?}, metrics)",
174 transactions,
175 );
176
177 let mut processed_transactions: Vec<TransactionProcessorInputType<T, U>> = Vec::new();
178 for (transaction_metadata, instructions) in transactions {
179 let parsed_instructions = parse_instructions(instructions);
180
181 let unnested_instructions = unnest_parsed_instructions(
182 transaction_metadata.clone(),
183 parsed_instructions.clone(),
184 0,
185 vec![],
186 );
187
188 let matched_data = self.matches_schema(&parsed_instructions);
189 processed_transactions.push((
190 transaction_metadata,
191 unnested_instructions,
192 matched_data,
193 ));
194 }
195 self.processor
196 .process(processed_transactions, metrics)
197 .await?;
198
199 Ok(())
200 }
201
202 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
203 &self.filters
204 }
205}