carbon_core/
datasource.rs

//! Provides traits and structures for managing and consuming data updates from
//! various sources.
//!
//! The `datasource` module defines the `Datasource` trait and associated data
//! types for handling updates related to accounts, transactions, account
//! deletions, and block details. This module allows for flexible data
//! ingestion from various Solana data sources, enabling integration with the
//! `carbon-core` processing pipeline.
//!
//! # Overview
//!
//! The core component of this module is the `Datasource` trait, which
//! represents an interface for consuming data updates asynchronously.
//! Implementations of `Datasource` provide the logic for fetching data updates
//! and delivering them via a channel to the pipeline. Different types of
//! updates are represented by the `Update` enum, whose variants carry:
//! - `AccountUpdate`: Represents updates to accounts, including the account's
//!   public key, slot, and other account data.
//! - `TransactionUpdate`: Represents transaction updates, including transaction
//!   details, signature, and status metadata.
//! - `AccountDeletion`: Represents account deletion events, indicating when an
//!   account is removed from the blockchain state.
//! - `BlockDetails`: Represents block-level information, including the slot,
//!   block hashes, rewards, and timing information.
//!
//! The module also includes the `UpdateType` enum to categorize the kinds of
//! updates that a data source can provide, and the `DatasourceId` type used to
//! tag each update with the datasource it came from.
//!
//! # Notes
//!
//! - The `Datasource` trait is asynchronous and should be used within a Tokio
//!   runtime.
//! - Use the `Update` enum to encapsulate data updates of different types. This
//!   helps centralize handling of all update kinds in the pipeline.
//! - Ensure implementations handle errors gracefully, especially when fetching
//!   data and sending updates to the pipeline.
35
36use solana_program::hash::Hash;
37use solana_transaction_status::Rewards;
38use {
39    crate::{error::CarbonResult, metrics::MetricsCollection},
40    async_trait::async_trait,
41    solana_account::Account,
42    solana_pubkey::Pubkey,
43    solana_signature::Signature,
44    solana_transaction::versioned::VersionedTransaction,
45    solana_transaction_status::TransactionStatusMeta,
46    std::sync::Arc,
47    tokio_util::sync::CancellationToken,
48};
49
50/// Defines the interface for data sources that produce updates for accounts,
51/// transactions, and account deletions.
52///
53/// The `Datasource` trait represents a data source that can be consumed
54/// asynchronously within a pipeline. Implementations of this trait are
55/// responsible for fetching updates and sending them through a channel to be
56/// processed further. Each datasource specifies the types of updates it
57/// supports by implementing the `update_types` method.
58///
59/// # Required Methods
60///
61/// - `consume`: Initiates the asynchronous consumption of updates. This method
62///   should send updates through the provided `sender` channel.
63/// - `update_types`: Returns a list of `UpdateType` variants indicating the
64///   types of updates the datasource can provide.
65///
66/// # Example
67///
68/// ```ignore
69/// use std::sync::Arc;
70/// use carbon_core::datasource::UpdateType;
71/// use carbon_core::datasource::Update;
72/// use carbon_core::error::CarbonResult;
73/// use carbon_core::metrics::MetricsCollection;
74/// use carbon_core::datasource::Datasource;
75/// use tokio_util::sync::CancellationToken;
76/// use async_trait::async_trait;
77///
78/// #[async_trait]
79/// impl Datasource for MyDatasource {
80///     async fn consume(
81///         &self,
82///         sender: &tokio::sync::mpsc::UnboundedSender<Update>,
83///         cancellation_token: CancellationToken,
84///         metrics: Arc<MetricsCollection>,
85///     ) -> CarbonResult<tokio::task::AbortHandle> {
86///         // Implement update fetching logic
87///     }
88///
89///     fn update_types(&self) -> Vec<UpdateType> {
90///         vec![UpdateType::AccountUpdate, UpdateType::Transaction]
91///     }
92/// }
93/// ```
94///
95/// # Notes
96///
97/// - This trait is marked with `async_trait`, so implementations must be
98///   asynchronous.
99/// - The `consume` method should handle errors and retries to ensure robust
100///   update delivery.
101#[async_trait]
102pub trait Datasource: Send + Sync {
103    async fn consume(
104        &self,
105        id: DatasourceId,
106        sender: tokio::sync::mpsc::Sender<(Update, DatasourceId)>,
107        cancellation_token: CancellationToken,
108        metrics: Arc<MetricsCollection>,
109    ) -> CarbonResult<()>;
110
111    fn update_types(&self) -> Vec<UpdateType>;
112}
113
/// Identifies a single datasource within the pipeline.
///
/// Every update travelling through the pipeline can be tagged with the ID of
/// the datasource that produced it, which makes it possible to filter or
/// route updates by origin when several datasources run side by side.
///
/// The identifier is stored as a `String`. Two IDs compare equal exactly when
/// their strings are equal, and the type can be used as a key in hashed
/// collections.
///
/// # Examples
///
/// Generating a unique ID:
/// ```
/// use carbon_core::datasource::DatasourceId;
///
/// let id = DatasourceId::new_unique();
/// println!("Generated ID: {:?}", id);
/// ```
///
/// Choosing a human-readable ID:
/// ```
/// use carbon_core::datasource::DatasourceId;
///
/// let id = DatasourceId::new_named("mainnet-rpc");
/// println!("Named ID: {:?}", id);
/// ```
///
/// Using an ID with a filter:
/// ```
/// use carbon_core::{datasource::DatasourceId, filter::DatasourceFilter};
///
/// let datasource_id = DatasourceId::new_named("testnet");
/// let filter = DatasourceFilter::new(datasource_id);
/// ```
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DatasourceId(String);
147
148impl DatasourceId {
149    /// Creates a new datasource ID with a randomly generated unique identifier.
150    ///
151    /// This method uses a cryptographically secure random number generator
152    /// to create a unique ID. The ID is converted to a string representation
153    /// for easy debugging and logging.
154    ///
155    /// # Returns
156    ///
157    /// A new `DatasourceId` with a unique random identifier.
158    ///
159    /// # Examples
160    ///
161    /// ```
162    /// use carbon_core::datasource::DatasourceId;
163    ///
164    /// let id1 = DatasourceId::new_unique();
165    /// let id2 = DatasourceId::new_unique();
166    /// assert_ne!(id1, id2); // IDs should be different
167    /// ```
168    pub fn new_unique() -> Self {
169        Self(uuid::Uuid::new_v4().to_string())
170    }
171
172    /// Creates a new datasource ID with a specific name.
173    ///
174    /// This method is useful when you want to assign a meaningful name
175    /// to a datasource for easier identification and debugging.
176    ///
177    /// # Arguments
178    ///
179    /// * `name` - A string slice containing the name for the datasource ID
180    ///
181    /// # Returns
182    ///
183    /// A new `DatasourceId` with the specified name.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use carbon_core::datasource::DatasourceId;
189    ///
190    /// let mainnet_id = DatasourceId::new_named("mainnet-rpc");
191    /// let testnet_id = DatasourceId::new_named("testnet-rpc");
192    /// assert_ne!(mainnet_id, testnet_id);
193    /// ```
194    pub fn new_named(name: &str) -> Self {
195        Self(name.to_string())
196    }
197}
198
199/// Represents a data update in the `carbon-core` pipeline, encompassing
200/// different update types.
201///
202/// - `Account`: Represents an update to an account's data.
203/// - `Transaction`: Represents a transaction-related update, including
204///   transaction metadata.
205/// - `AccountDeletion`: Represents an event where an account has been deleted.
206#[derive(Debug, Clone)]
207pub enum Update {
208    Account(AccountUpdate),
209    Transaction(Box<TransactionUpdate>),
210    AccountDeletion(AccountDeletion),
211    BlockDetails(BlockDetails),
212}
213
/// Lists the kinds of updates a datasource can announce via
/// `Datasource::update_types`.
///
/// There are three kinds, each documented on its variant below.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UpdateType {
    /// The datasource provides account updates.
    AccountUpdate,
    /// The datasource provides transaction updates.
    Transaction,
    /// The datasource provides account deletion events.
    AccountDeletion,
}
227
228/// Represents an update to a Solana account, including its public key, data,
229/// and slot information.
230///
231/// The `AccountUpdate` struct encapsulates the essential information for an
232/// account update, containing the account's `pubkey`, `account` data, and the
233/// `slot` at which the update occurred.
234///
235/// - `pubkey`: The public key of the account being updated.
236/// - `account`: The new state of the account.
237/// - `slot`: The slot number in which this account update was recorded.
238/// - `transaction_signature`: Signature of the transaction that caused the update.
239#[derive(Debug, Clone)]
240pub struct AccountUpdate {
241    pub pubkey: Pubkey,
242    pub account: Account,
243    pub slot: u64,
244    pub transaction_signature: Option<Signature>,
245}
246
247/// Represents the details of a Solana block, including its slot, hashes, rewards, and timing information.
248///
249/// The `BlockDetails` struct encapsulates the essential information for a block,
250/// providing details about its slot, blockhashes, rewards, and other metadata.
251///
252/// - `slot`: The slot number in which this block was recorded.
253/// - `previous_block_hash`: The hash of the previous block in the blockchain.
254/// - `block_hash`: The hash of the current block.
255/// - `rewards`: Optional rewards information associated with the block, such as staking rewards.
256/// - `num_reward_partitions`: Optional number of reward partitions in the block.
257/// - `block_time`: Optional Unix timestamp indicating when the block was processed.
258/// - `block_height`: Optional height of the block in the blockchain.#[derive(Debug, Clone)]
259#[derive(Debug, Clone)]
260pub struct BlockDetails {
261    pub slot: u64,
262    pub block_hash: Option<Hash>,
263    pub previous_block_hash: Option<Hash>,
264    pub rewards: Option<Rewards>,
265    pub num_reward_partitions: Option<u64>,
266    pub block_time: Option<i64>,
267    pub block_height: Option<u64>,
268}
269
270/// Represents the deletion of a Solana account, containing the account's public
271/// key and slot information.
272///
273/// The `AccountDeletion` struct indicates that an account has been removed from
274/// the blockchain state, providing the `pubkey` of the deleted account and the
275/// `slot` in which the deletion occurred.
276///
277/// - `pubkey`: The public key of the deleted account.
278/// - `slot`: The slot number in which the account was deleted.
279/// - `transaction_signature`: Signature of the transaction that caused the update.
280#[derive(Debug, Clone)]
281pub struct AccountDeletion {
282    pub pubkey: Pubkey,
283    pub slot: u64,
284    pub transaction_signature: Option<Signature>,
285}
286
287/// Represents a transaction update in the Solana network, including transaction
288/// metadata, status, slot information and block time.
289///
290/// The `TransactionUpdate` struct provides detailed information about a
291/// transaction, including its `signature`, `transaction` data, `meta` status,
292/// and the `slot` where it was recorded. Additionally, it includes a `is_vote`
293/// flag to indicate whether the transaction is a voting transaction.
294///
295/// - `signature`: The unique signature of the transaction.
296/// - `transaction`: The complete `VersionedTransaction` data of the
297///   transaction.
298/// - `meta`: Metadata about the transaction's status, such as fee information
299///   and logs.
300/// - `is_vote`: A boolean indicating whether the transaction is a vote.
301/// - `slot`: The slot number in which the transaction was recorded.
302/// - `block_time`: The Unix timestamp of when the transaction was processed.
303/// - `block_hash`: Block hash that can be used to detect a fork.
304///
305/// Note: The `block_time` field may not be returned in all scenarios.
306#[derive(Debug, Clone)]
307pub struct TransactionUpdate {
308    pub signature: Signature,
309    pub transaction: VersionedTransaction, // TODO: replace with solana_transaction crate after 2.2.0 release
310    pub meta: TransactionStatusMeta,
311    pub is_vote: bool,
312    pub slot: u64,
313    pub block_time: Option<i64>,
314    pub block_hash: Option<Hash>,
315}