atlas_arch/
transaction.rs

1use crate::filter::Filter;
2use crate::instruction::{DecodedInstruction, InstructionMetadata};
3use crate::transformers::unnest_parsed_instructions;
4use crate::{error::IndexerResult, instruction::NestedInstruction};
5use arch_program::sanitized::ArchMessage;
6
7use arch_program::pubkey::Pubkey;
8pub use arch_sdk::Signature;
9use arch_sdk::{InnerInstructionsList, RollbackStatus, Status};
10use {
11    crate::{
12        collection::InstructionDecoderCollection,
13        metrics::MetricsCollection,
14        processor::Processor,
15        schema::{ParsedInstruction, TransactionSchema},
16    },
17    async_trait::async_trait,
18    core::convert::TryFrom,
19    serde::de::DeserializeOwned,
20    std::sync::Arc,
21};
22
23#[derive(Debug, Clone)]
24pub struct TransactionMetadata {
25    pub id: String,
26    pub fee_payer: Pubkey,
27    pub message: ArchMessage,
28    pub status: Status,
29    pub log_messages: Vec<String>,
30    pub rollback_status: RollbackStatus,
31    pub block_height: u64,
32    pub bitcoin_txid: Option<String>,
33    pub bitcoin_tx: Option<crate::bitcoin::Transaction>,
34    pub inner_instructions_list: InnerInstructionsList,
35}
36
37impl TryFrom<crate::datasource::TransactionUpdate> for TransactionMetadata {
38    type Error = crate::error::Error;
39
40    fn try_from(value: crate::datasource::TransactionUpdate) -> Result<Self, Self::Error> {
41        log::trace!("try_from(transaction_update: {:?})", value);
42        let accounts = value
43            .transaction
44            .runtime_transaction
45            .message
46            .get_unique_instruction_account_keys();
47
48        Ok(TransactionMetadata {
49            id: value.transaction.txid().to_string(),
50            fee_payer: *accounts
51                .iter()
52                .next()
53                .ok_or(crate::error::Error::MissingFeePayer)?,
54            message: value.transaction.runtime_transaction.message.clone(),
55            status: value.transaction.status,
56            log_messages: value.transaction.logs,
57            rollback_status: value.transaction.rollback_status,
58            block_height: value.height,
59            bitcoin_txid: value.transaction.bitcoin_txid.map(|txid| txid.to_string()),
60            bitcoin_tx: None,
61            inner_instructions_list: value.transaction.inner_instructions_list,
62        })
63    }
64}
65
66pub type TransactionProcessorInputType<T, U = ()> = (
67    Arc<TransactionMetadata>,
68    Vec<(InstructionMetadata, DecodedInstruction<T>)>,
69    Option<U>,
70);
71
72pub struct TransactionPipe<T: InstructionDecoderCollection, U> {
73    schema: Option<TransactionSchema<T>>,
74    processor: Box<
75        dyn Processor<InputType = TransactionProcessorInputType<T, U>, OutputType = ()>
76            + Send
77            + Sync,
78    >,
79    filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
80}
81
82/// Represents a parsed transaction, including its metadata and parsed
83/// instructions.
84pub struct ParsedTransaction<I: InstructionDecoderCollection> {
85    pub metadata: TransactionMetadata,
86    pub instructions: Vec<ParsedInstruction<I>>,
87}
88
89impl<T: InstructionDecoderCollection, U> TransactionPipe<T, U> {
90    pub fn new(
91        schema: Option<TransactionSchema<T>>,
92        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>, OutputType = ()>
93            + Send
94            + Sync
95            + 'static,
96        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
97    ) -> Self {
98        log::trace!(
99            "TransactionPipe::new(schema: {:?}, processor: {:?})",
100            schema,
101            stringify!(processor)
102        );
103        Self {
104            schema,
105            processor: Box::new(processor),
106            filters,
107        }
108    }
109
110    fn matches_schema(&self, instructions: &[ParsedInstruction<T>]) -> Option<U>
111    where
112        U: DeserializeOwned,
113    {
114        match self.schema {
115            Some(ref schema) => schema.match_schema(instructions),
116            None => None,
117        }
118    }
119}
120
121pub fn parse_instructions<T: InstructionDecoderCollection>(
122    nested_ixs: &[NestedInstruction],
123) -> Vec<ParsedInstruction<T>> {
124    log::trace!("parse_instructions(nested_ixs: {:?})", nested_ixs);
125
126    let mut parsed_instructions: Vec<ParsedInstruction<T>> = Vec::new();
127
128    for nested_ix in nested_ixs {
129        if let Some(instruction) = T::parse_instruction(&nested_ix.instruction) {
130            parsed_instructions.push(ParsedInstruction {
131                program_id: nested_ix.instruction.program_id,
132                instruction,
133                inner_instructions: parse_instructions(&nested_ix.inner_instructions),
134            });
135        } else {
136            for inner_ix in nested_ix.inner_instructions.iter() {
137                parsed_instructions.extend(parse_instructions(&[inner_ix.clone()]));
138            }
139        }
140    }
141
142    parsed_instructions
143}
144
145#[async_trait]
146/// Parses instructions for each transaction and forwards to a processor.
147///
148/// Implementations should apply `Filter`s before processing and handle
149/// large batches efficiently. Implementors are expected to only process
150/// instructions newer than their configured cutoff when used with sync.
151pub trait TransactionPipes<'a>: Send + Sync {
152    async fn run(
153        &mut self,
154        transactions: Vec<(Arc<TransactionMetadata>, &[NestedInstruction])>,
155        metrics: Arc<MetricsCollection>,
156    ) -> IndexerResult<()>;
157
158    fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
159}
160
161#[async_trait]
162impl<T, U> TransactionPipes<'_> for TransactionPipe<T, U>
163where
164    T: InstructionDecoderCollection + Sync + 'static,
165    U: DeserializeOwned + Send + Sync + 'static,
166{
167    async fn run(
168        &mut self,
169        transactions: Vec<(Arc<TransactionMetadata>, &[NestedInstruction])>,
170        metrics: Arc<MetricsCollection>,
171    ) -> IndexerResult<()> {
172        log::trace!(
173            "TransactionPipe::run(transactions: {:?}, metrics)",
174            transactions,
175        );
176
177        let mut processed_transactions: Vec<TransactionProcessorInputType<T, U>> = Vec::new();
178        for (transaction_metadata, instructions) in transactions {
179            let parsed_instructions = parse_instructions(instructions);
180
181            let unnested_instructions = unnest_parsed_instructions(
182                transaction_metadata.clone(),
183                parsed_instructions.clone(),
184                0,
185                vec![],
186            );
187
188            let matched_data = self.matches_schema(&parsed_instructions);
189            processed_transactions.push((
190                transaction_metadata,
191                unnested_instructions,
192                matched_data,
193            ));
194        }
195        self.processor
196            .process(processed_transactions, metrics)
197            .await?;
198
199        Ok(())
200    }
201
202    fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
203        &self.filters
204    }
205}