carbon_core/
transaction.rs

1//! Provides types and traits for handling transaction processing in the
2//! `carbon-core` framework. It also provides utilities for matching
3//! transactions to schemas and executing custom processing logic on matched
4//! data.
5//!
6//! ## Key Components
7//!
8//! - **TransactionPipe**: Represents a processing pipe for transactions, with
9//!   functionality to parse and match instructions against a schema and handle
10//!   matched data with a specified processor.
11//! - **TransactionMetadata**: Metadata associated with a transaction, including
12//!   slot, signature, and fee payer information.
13//! - **ParsedTransaction**: Represents a transaction with its metadata and
14//!   parsed instructions.
15//!
16//! ## Usage
17//!
18//! To use this module, create a `TransactionPipe` with a transaction schema and
19//! a processor. Then, run the transaction pipe with a set of instructions and
20//! metrics to parse, match, and process transaction data. The `run` method will
21//! asynchronously handle these steps.
22//!
23//! ## Notes
24//!
25//! - **Nested Instructions**: The `TransactionPipe` supports nested
26//!   instructions within transactions, allowing for multi-level transaction
27//!   processing.
28//! - **Schema Matching**: The `TransactionPipe` will match transaction
29//!   instructions against the provided schema, only processing the data if it
30//!   conforms to the schema.
31
32use solana_program::hash::Hash;
33use {
34    crate::{
35        collection::InstructionDecoderCollection,
36        error::CarbonResult,
37        instruction::{DecodedInstruction, InstructionMetadata, NestedInstruction},
38        metrics::MetricsCollection,
39        processor::Processor,
40        schema::{ParsedInstruction, TransactionSchema},
41        transformers,
42    },
43    async_trait::async_trait,
44    core::convert::TryFrom,
45    serde::de::DeserializeOwned,
46    solana_pubkey::Pubkey,
47    solana_signature::Signature,
48    std::sync::Arc,
49};
50/// Contains metadata about a transaction, including its slot, signature, fee
51/// payer, transaction status metadata, the version transaction message and its
52/// block time.
53///
54/// # Fields
55/// - `slot`: The slot number in which this transaction was processed
56/// - `signature`: The unique signature of this transaction
57/// - `fee_payer`: The public key of the fee payer account that paid for this
58///   transaction
59/// - `meta`: Transaction status metadata containing execution status, fees,
60///   balances, and other metadata
61/// - `message`: The versioned message containing the transaction instructions
62///   and account keys
63/// - `block_time`: The Unix timestamp of when the transaction was processed.
64///
65/// Note: The `block_time` field may not be returned in all scenarios.
66#[derive(Debug, Clone)]
67pub struct TransactionMetadata {
68    pub slot: u64,
69    pub signature: Signature,
70    pub fee_payer: Pubkey,
71    pub meta: solana_transaction_status::TransactionStatusMeta,
72    pub message: solana_program::message::VersionedMessage,
73    pub block_time: Option<i64>,
74    pub block_hash: Option<Hash>,
75}
76
77impl Default for TransactionMetadata {
78    fn default() -> Self {
79        Self {
80            slot: 0,
81            signature: Signature::new_unique(),
82            fee_payer: Pubkey::new_unique(),
83            meta: solana_transaction_status::TransactionStatusMeta::default(),
84            message: solana_sdk::message::VersionedMessage::Legacy(
85                solana_sdk::message::Message::default(),
86            ),
87            block_time: None,
88            block_hash: None,
89        }
90    }
91}
92/// Tries convert transaction update into the metadata.
93///
94/// This function retrieves core metadata such as the transaction's slot,
95/// signature, and fee payer from the transaction's message. It ensures that
96/// these details are available and ready for further processing.
97///
98/// # Parameters
99///
100/// - `transaction_update`: The `TransactionUpdate` containing the transaction
101///   details.
102///
103/// # Returns
104///
105/// A `CarbonResult<TransactionMetadata>` which includes the slot, signature,
106/// fee payer, transaction status metadata and the version transaction message.
107///
108/// # Errors
109///
110/// Returns an error if the fee payer cannot be extracted from the transaction's
111/// account keys.
112impl TryFrom<crate::datasource::TransactionUpdate> for TransactionMetadata {
113    type Error = crate::error::Error;
114
115    fn try_from(value: crate::datasource::TransactionUpdate) -> Result<Self, Self::Error> {
116        log::trace!("try_from(transaction_update: {:?})", value);
117        let accounts = value.transaction.message.static_account_keys();
118
119        Ok(TransactionMetadata {
120            slot: value.slot,
121            signature: value.signature,
122            fee_payer: *accounts
123                .first()
124                .ok_or(crate::error::Error::MissingFeePayer)?,
125            meta: value.meta.clone(),
126            message: value.transaction.message.clone(),
127            block_time: value.block_time,
128            block_hash: value.block_hash,
129        })
130    }
131}
132
133/// The input type for the transaction processor.
134///
135/// - `T`: The instruction type, implementing `InstructionDecoderCollection`.
136/// - `U`: The output type for the matched data, if schema-matching,
137///   implementing `DeserializeOwned`.
138pub type TransactionProcessorInputType<T, U = ()> = (
139    Arc<TransactionMetadata>,
140    Vec<(InstructionMetadata, DecodedInstruction<T>)>,
141    Option<U>,
142);
143
144/// A pipe for processing transactions based on a defined schema and processor.
145///
146/// The `TransactionPipe` parses a transaction's instructions, optionally checks
147/// them against the schema, and runs the processor if the instructions match
148/// the schema. It provides methods for parsing nested instructions and matching
149/// transaction data to the schema.
150///
151/// ## Generics
152///
153/// - `T`: The instruction type, implementing `InstructionDecoderCollection`.
154/// - `U`: The output type for the matched data, if schema-matching,
155///   implementing `DeserializeOwned`.
156pub struct TransactionPipe<T: InstructionDecoderCollection, U> {
157    schema: Option<TransactionSchema<T>>,
158    processor: Box<dyn Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync>,
159}
160
161/// Represents a parsed transaction, including its metadata and parsed
162/// instructions.
163pub struct ParsedTransaction<I: InstructionDecoderCollection> {
164    pub metadata: TransactionMetadata,
165    pub instructions: Vec<ParsedInstruction<I>>,
166}
167
168impl<T: InstructionDecoderCollection, U> TransactionPipe<T, U> {
169    /// Creates a new `TransactionPipe` with the specified schema and processor.
170    ///
171    /// # Parameters
172    ///
173    /// - `schema`: The schema against which to match transaction instructions.
174    /// - `processor`: The processor that will handle matched transaction data.
175    ///
176    /// # Returns
177    ///
178    /// A `TransactionPipe` instance configured with the specified schema and
179    /// processor.
180    pub fn new(
181        schema: Option<TransactionSchema<T>>,
182        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
183            + Send
184            + Sync
185            + 'static,
186    ) -> Self {
187        log::trace!(
188            "TransactionPipe::new(schema: {:?}, processor: {:?})",
189            schema,
190            stringify!(processor)
191        );
192        Self {
193            schema,
194            processor: Box::new(processor),
195        }
196    }
197
198    /// Matches parsed instructions against the schema and returns the data as
199    /// type `U`.
200    ///
201    /// The method only returns data if the parsed instructions conform to the
202    /// schema.
203    ///
204    /// # Parameters
205    ///
206    /// - `instructions`: A slice of `ParsedInstruction` to be matched against
207    ///   the schema.
208    ///
209    /// # Returns
210    ///
211    /// An `Option<U>` containing the deserialized matched data if the
212    /// instructions match the schema.
213    fn matches_schema(&self, instructions: &[ParsedInstruction<T>]) -> Option<U>
214    where
215        U: DeserializeOwned,
216    {
217        match self.schema {
218            Some(ref schema) => schema.match_schema(instructions),
219            None => None,
220        }
221    }
222}
223
224/// Parses nested instructions into a list of `ParsedInstruction`.
225///
226/// This method recursively traverses the nested instructions and parses
227/// each one, creating a structured representation of the instructions.
228///
229/// # Parameters
230///
231/// - `nested_ixs`: A slice of `NestedInstruction` representing the instructions
232///   to be parsed.
233///
234/// # Returns
235///
236/// A `Box<Vec<ParsedInstruction<T>>>` containing the parsed instructions.
237pub fn parse_instructions<T: InstructionDecoderCollection>(
238    nested_ixs: &[NestedInstruction],
239) -> Vec<ParsedInstruction<T>> {
240    log::trace!("parse_instructions(nested_ixs: {:?})", nested_ixs);
241
242    let mut parsed_instructions: Vec<ParsedInstruction<T>> = Vec::new();
243
244    for nested_ix in nested_ixs {
245        if let Some(instruction) = T::parse_instruction(&nested_ix.instruction) {
246            parsed_instructions.push(ParsedInstruction {
247                program_id: nested_ix.instruction.program_id,
248                instruction,
249                inner_instructions: parse_instructions(&nested_ix.inner_instructions),
250            });
251        } else {
252            for inner_ix in nested_ix.inner_instructions.iter() {
253                parsed_instructions.extend(parse_instructions(&[inner_ix.clone()]));
254            }
255        }
256    }
257
258    parsed_instructions
259}
260
261/// Defines an asynchronous trait for processing transactions.
262///
263/// This trait provides a method for running transaction pipes, handling parsed
264/// instructions with associated metrics, and leveraging `TransactionPipe`
265/// implementations.
266#[async_trait]
267pub trait TransactionPipes<'a>: Send + Sync {
268    /// Runs the transaction pipe with the provided instructions and metrics.
269    ///
270    /// The method parses the instructions, matches them against the schema, and
271    /// processes the matched data asynchronously.
272    ///
273    /// # Parameters
274    ///
275    /// - `instructions`: A slice of `NestedInstruction` containing the
276    ///   transaction instructions.
277    /// - `metrics`: A vector of metrics instances for performance tracking.
278    ///
279    /// # Returns
280    ///
281    /// A `CarbonResult<()>` indicating success or failure.
282    async fn run(
283        &mut self,
284        transaction_metadata: Arc<TransactionMetadata>,
285        instructions: &[NestedInstruction],
286        metrics: Arc<MetricsCollection>,
287    ) -> CarbonResult<()>;
288}
289
290#[async_trait]
291impl<T, U> TransactionPipes<'_> for TransactionPipe<T, U>
292where
293    T: InstructionDecoderCollection + Sync + 'static,
294    U: DeserializeOwned + Send + Sync + 'static,
295{
296    async fn run(
297        &mut self,
298        transaction_metadata: Arc<TransactionMetadata>,
299        instructions: &[NestedInstruction],
300        metrics: Arc<MetricsCollection>,
301    ) -> CarbonResult<()> {
302        log::trace!(
303            "TransactionPipe::run(instructions: {:?}, metrics)",
304            instructions,
305        );
306
307        let parsed_instructions = parse_instructions(instructions);
308
309        let matched_data = self.matches_schema(&parsed_instructions);
310
311        let unnested_instructions = transformers::unnest_parsed_instructions(
312            transaction_metadata.clone(),
313            parsed_instructions,
314            0,
315        );
316
317        self.processor
318            .process(
319                (transaction_metadata, unnested_instructions, matched_data),
320                metrics,
321            )
322            .await?;
323
324        Ok(())
325    }
326}