carbon_core/transaction.rs
1//! Provides types and traits for handling transaction processing in the
2//! `carbon-core` framework. It also provides utilities for matching
3//! transactions to schemas and executing custom processing logic on matched
4//! data.
5//!
6//! ## Key Components
7//!
8//! - **TransactionPipe**: Represents a processing pipe for transactions, with
9//! functionality to parse and match instructions against a schema and handle
10//! matched data with a specified processor.
11//! - **TransactionMetadata**: Metadata associated with a transaction, including
12//! slot, signature, and fee payer information.
13//! - **ParsedTransaction**: Represents a transaction with its metadata and
14//! parsed instructions.
15//!
16//! ## Usage
17//!
18//! To use this module, create a `TransactionPipe` with a transaction schema and
19//! a processor. Then, run the transaction pipe with a set of instructions and
20//! metrics to parse, match, and process transaction data. The `run` method will
21//! asynchronously handle these steps.
22//!
23//! ## Notes
24//!
25//! - **Nested Instructions**: The `TransactionPipe` supports nested
26//! instructions within transactions, allowing for multi-level transaction
27//! processing.
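//! - **Schema Matching**: The `TransactionPipe` matches transaction
//!   instructions against the provided schema, if one is configured. The
//!   processor is always invoked; the matched data it receives is `Some` only
//!   when the instructions conform to the schema, and `None` otherwise.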
31
32use solana_program::hash::Hash;
33use {
34 crate::{
35 collection::InstructionDecoderCollection,
36 error::CarbonResult,
37 instruction::{DecodedInstruction, InstructionMetadata, NestedInstruction},
38 metrics::MetricsCollection,
39 processor::Processor,
40 schema::{ParsedInstruction, TransactionSchema},
41 transformers,
42 },
43 async_trait::async_trait,
44 core::convert::TryFrom,
45 serde::de::DeserializeOwned,
46 solana_pubkey::Pubkey,
47 solana_signature::Signature,
48 std::sync::Arc,
49};
/// Contains metadata about a transaction, including its slot, signature, fee
/// payer, transaction status metadata, the versioned transaction message, its
/// block time and its block hash.
///
/// # Fields
/// - `slot`: The slot number in which this transaction was processed
/// - `signature`: The unique signature of this transaction
/// - `fee_payer`: The public key of the fee payer account that paid for this
///   transaction
/// - `meta`: Transaction status metadata containing execution status, fees,
///   balances, and other metadata
/// - `message`: The versioned message containing the transaction instructions
///   and account keys
/// - `block_time`: The Unix timestamp of when the transaction was processed.
/// - `block_hash`: Optional hash copied from the originating
///   `TransactionUpdate`; presumably the hash of the block that contains the
///   transaction (confirm against the datasource).
///
/// Note: `block_time` and `block_hash` are optional and may not be returned
/// in all scenarios.
#[derive(Debug, Clone)]
pub struct TransactionMetadata {
    pub slot: u64,
    pub signature: Signature,
    pub fee_payer: Pubkey,
    pub meta: solana_transaction_status::TransactionStatusMeta,
    pub message: solana_program::message::VersionedMessage,
    pub block_time: Option<i64>,
    pub block_hash: Option<Hash>,
}
76
77impl Default for TransactionMetadata {
78 fn default() -> Self {
79 Self {
80 slot: 0,
81 signature: Signature::new_unique(),
82 fee_payer: Pubkey::new_unique(),
83 meta: solana_transaction_status::TransactionStatusMeta::default(),
84 message: solana_sdk::message::VersionedMessage::Legacy(
85 solana_sdk::message::Message::default(),
86 ),
87 block_time: None,
88 block_hash: None,
89 }
90 }
91}
92/// Tries convert transaction update into the metadata.
93///
94/// This function retrieves core metadata such as the transaction's slot,
95/// signature, and fee payer from the transaction's message. It ensures that
96/// these details are available and ready for further processing.
97///
98/// # Parameters
99///
100/// - `transaction_update`: The `TransactionUpdate` containing the transaction
101/// details.
102///
103/// # Returns
104///
105/// A `CarbonResult<TransactionMetadata>` which includes the slot, signature,
106/// fee payer, transaction status metadata and the version transaction message.
107///
108/// # Errors
109///
110/// Returns an error if the fee payer cannot be extracted from the transaction's
111/// account keys.
112impl TryFrom<crate::datasource::TransactionUpdate> for TransactionMetadata {
113 type Error = crate::error::Error;
114
115 fn try_from(value: crate::datasource::TransactionUpdate) -> Result<Self, Self::Error> {
116 log::trace!("try_from(transaction_update: {:?})", value);
117 let accounts = value.transaction.message.static_account_keys();
118
119 Ok(TransactionMetadata {
120 slot: value.slot,
121 signature: value.signature,
122 fee_payer: *accounts
123 .first()
124 .ok_or(crate::error::Error::MissingFeePayer)?,
125 meta: value.meta.clone(),
126 message: value.transaction.message.clone(),
127 block_time: value.block_time,
128 block_hash: value.block_hash,
129 })
130 }
131}
132
/// The input type for the transaction processor.
///
/// This is a three-element tuple:
///
/// 1. `Arc<TransactionMetadata>`: the shared metadata of the transaction.
/// 2. `Vec<(InstructionMetadata, DecodedInstruction<T>)>`: the decoded
///    instructions, each paired with its instruction metadata.
/// 3. `Option<U>`: the schema-matched data, or `None` when nothing matched.
///
/// ## Generics
///
/// - `T`: The instruction type, implementing `InstructionDecoderCollection`.
/// - `U`: The output type for the matched data, if schema-matching,
///   implementing `DeserializeOwned`. Defaults to `()`.
pub type TransactionProcessorInputType<T, U = ()> = (
    Arc<TransactionMetadata>,
    Vec<(InstructionMetadata, DecodedInstruction<T>)>,
    Option<U>,
);
143
/// A pipe for processing transactions based on an optional schema and a
/// processor.
///
/// The `TransactionPipe` parses a transaction's instructions, optionally
/// matches them against the schema, and hands the result to the processor.
/// The processor receives the transaction metadata, the unnested decoded
/// instructions and the schema-matched data, which is `None` when no schema
/// is configured or the instructions do not match it.
///
/// ## Generics
///
/// - `T`: The instruction type, implementing `InstructionDecoderCollection`.
/// - `U`: The output type for the matched data, if schema-matching,
///   implementing `DeserializeOwned`.
pub struct TransactionPipe<T: InstructionDecoderCollection, U> {
    /// Schema to match parsed instructions against; `None` disables matching.
    schema: Option<TransactionSchema<T>>,
    /// Boxed processor that receives the `TransactionProcessorInputType`.
    processor: Box<dyn Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync>,
}
160
/// Represents a parsed transaction, including its metadata and parsed
/// instructions.
///
/// ## Generics
///
/// - `I`: The instruction type, implementing `InstructionDecoderCollection`.
pub struct ParsedTransaction<I: InstructionDecoderCollection> {
    /// Metadata describing the transaction (slot, signature, fee payer, ...).
    pub metadata: TransactionMetadata,
    /// The instructions of the transaction, decoded as `ParsedInstruction<I>`.
    pub instructions: Vec<ParsedInstruction<I>>,
}
167
168impl<T: InstructionDecoderCollection, U> TransactionPipe<T, U> {
169 /// Creates a new `TransactionPipe` with the specified schema and processor.
170 ///
171 /// # Parameters
172 ///
173 /// - `schema`: The schema against which to match transaction instructions.
174 /// - `processor`: The processor that will handle matched transaction data.
175 ///
176 /// # Returns
177 ///
178 /// A `TransactionPipe` instance configured with the specified schema and
179 /// processor.
180 pub fn new(
181 schema: Option<TransactionSchema<T>>,
182 processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
183 + Send
184 + Sync
185 + 'static,
186 ) -> Self {
187 log::trace!(
188 "TransactionPipe::new(schema: {:?}, processor: {:?})",
189 schema,
190 stringify!(processor)
191 );
192 Self {
193 schema,
194 processor: Box::new(processor),
195 }
196 }
197
198 /// Matches parsed instructions against the schema and returns the data as
199 /// type `U`.
200 ///
201 /// The method only returns data if the parsed instructions conform to the
202 /// schema.
203 ///
204 /// # Parameters
205 ///
206 /// - `instructions`: A slice of `ParsedInstruction` to be matched against
207 /// the schema.
208 ///
209 /// # Returns
210 ///
211 /// An `Option<U>` containing the deserialized matched data if the
212 /// instructions match the schema.
213 fn matches_schema(&self, instructions: &[ParsedInstruction<T>]) -> Option<U>
214 where
215 U: DeserializeOwned,
216 {
217 match self.schema {
218 Some(ref schema) => schema.match_schema(instructions),
219 None => None,
220 }
221 }
222}
223
224/// Parses nested instructions into a list of `ParsedInstruction`.
225///
226/// This method recursively traverses the nested instructions and parses
227/// each one, creating a structured representation of the instructions.
228///
229/// # Parameters
230///
231/// - `nested_ixs`: A slice of `NestedInstruction` representing the instructions
232/// to be parsed.
233///
234/// # Returns
235///
236/// A `Box<Vec<ParsedInstruction<T>>>` containing the parsed instructions.
237pub fn parse_instructions<T: InstructionDecoderCollection>(
238 nested_ixs: &[NestedInstruction],
239) -> Vec<ParsedInstruction<T>> {
240 log::trace!("parse_instructions(nested_ixs: {:?})", nested_ixs);
241
242 let mut parsed_instructions: Vec<ParsedInstruction<T>> = Vec::new();
243
244 for nested_ix in nested_ixs {
245 if let Some(instruction) = T::parse_instruction(&nested_ix.instruction) {
246 parsed_instructions.push(ParsedInstruction {
247 program_id: nested_ix.instruction.program_id,
248 instruction,
249 inner_instructions: parse_instructions(&nested_ix.inner_instructions),
250 });
251 } else {
252 for inner_ix in nested_ix.inner_instructions.iter() {
253 parsed_instructions.extend(parse_instructions(&[inner_ix.clone()]));
254 }
255 }
256 }
257
258 parsed_instructions
259}
260
/// Defines an asynchronous trait for processing transactions.
///
/// This trait provides a method for running transaction pipes, handling parsed
/// instructions with associated metrics, and leveraging `TransactionPipe`
/// implementations.
#[async_trait]
pub trait TransactionPipes<'a>: Send + Sync {
    /// Runs the transaction pipe with the provided metadata, instructions and
    /// metrics.
    ///
    /// Implementations are expected to parse the instructions, match them
    /// against the schema, and process the result asynchronously.
    ///
    /// # Parameters
    ///
    /// - `transaction_metadata`: The shared metadata of the transaction the
    ///   instructions belong to.
    /// - `instructions`: A slice of `NestedInstruction` containing the
    ///   transaction instructions.
    /// - `metrics`: The shared `MetricsCollection` made available to the
    ///   processing step.
    ///
    /// # Returns
    ///
    /// A `CarbonResult<()>` indicating success or failure.
    async fn run(
        &mut self,
        transaction_metadata: Arc<TransactionMetadata>,
        instructions: &[NestedInstruction],
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()>;
}
289
290#[async_trait]
291impl<T, U> TransactionPipes<'_> for TransactionPipe<T, U>
292where
293 T: InstructionDecoderCollection + Sync + 'static,
294 U: DeserializeOwned + Send + Sync + 'static,
295{
296 async fn run(
297 &mut self,
298 transaction_metadata: Arc<TransactionMetadata>,
299 instructions: &[NestedInstruction],
300 metrics: Arc<MetricsCollection>,
301 ) -> CarbonResult<()> {
302 log::trace!(
303 "TransactionPipe::run(instructions: {:?}, metrics)",
304 instructions,
305 );
306
307 let parsed_instructions = parse_instructions(instructions);
308
309 let matched_data = self.matches_schema(&parsed_instructions);
310
311 let unnested_instructions = transformers::unnest_parsed_instructions(
312 transaction_metadata.clone(),
313 parsed_instructions,
314 0,
315 );
316
317 self.processor
318 .process(
319 (transaction_metadata, unnested_instructions, matched_data),
320 metrics,
321 )
322 .await?;
323
324 Ok(())
325 }
326}