carbon_core/datasource.rs
1//! Provides traits and structures for managing and consuming data updates from
2//! various sources.
3//!
4//! The `datasource` module defines the `Datasource` trait and associated data
5//! types for handling updates related to accounts, transactions, and account
6//! deletions. This module allows for flexible data ingestion from various
7//! Solana data sources, enabling integration with the `carbon-core` processing
8//! pipeline.
9//!
10//! # Overview
11//!
12//! The core component of this module is the `Datasource` trait, which
13//! represents an interface for consuming data updates asynchronously.
//! Implementations of `Datasource` provide the logic for fetching data updates
//! and delivering them, tagged with a `DatasourceId`, via a channel to the
//! pipeline. Each update is wrapped in a variant of the `Update` enum, which
//! carries one of the following payloads:
//! - `AccountUpdate`: an update to an account, including the account's public
//!   key, slot, and account data.
//! - `TransactionUpdate`: a transaction update, including transaction details,
//!   signature, and status metadata.
//! - `AccountDeletion`: an account deletion event, indicating that an account
//!   was removed from the blockchain state.
//! - `BlockDetails`: block-level information such as the slot, block hashes,
//!   rewards, and block time.
23//!
24//! The module also includes the `UpdateType` enum to categorize the kinds of
25//! updates that a data source can provide.
26//!
27//! # Notes
28//!
29//! - The `Datasource` trait is asynchronous and should be used within a Tokio
30//! runtime.
31//! - Use the `Update` enum to encapsulate data updates of different types. This
32//! helps centralize handling of all update kinds in the pipeline.
33//! - Ensure implementations handle errors gracefully, especially when fetching
34//! data and sending updates to the pipeline.
35
36use solana_program::hash::Hash;
37use solana_transaction_status::Rewards;
38use {
39 crate::{error::CarbonResult, metrics::MetricsCollection},
40 async_trait::async_trait,
41 solana_account::Account,
42 solana_pubkey::Pubkey,
43 solana_signature::Signature,
44 solana_transaction::versioned::VersionedTransaction,
45 solana_transaction_status::TransactionStatusMeta,
46 std::sync::Arc,
47 tokio_util::sync::CancellationToken,
48};
49
50/// Defines the interface for data sources that produce updates for accounts,
51/// transactions, and account deletions.
52///
53/// The `Datasource` trait represents a data source that can be consumed
54/// asynchronously within a pipeline. Implementations of this trait are
55/// responsible for fetching updates and sending them through a channel to be
56/// processed further. Each datasource specifies the types of updates it
57/// supports by implementing the `update_types` method.
58///
59/// # Required Methods
60///
61/// - `consume`: Initiates the asynchronous consumption of updates. This method
62/// should send updates through the provided `sender` channel.
63/// - `update_types`: Returns a list of `UpdateType` variants indicating the
64/// types of updates the datasource can provide.
65///
66/// # Example
67///
68/// ```ignore
69/// use std::sync::Arc;
70/// use carbon_core::datasource::UpdateType;
71/// use carbon_core::datasource::Update;
72/// use carbon_core::error::CarbonResult;
73/// use carbon_core::metrics::MetricsCollection;
74/// use carbon_core::datasource::Datasource;
75/// use tokio_util::sync::CancellationToken;
76/// use async_trait::async_trait;
77///
78/// #[async_trait]
79/// impl Datasource for MyDatasource {
80/// async fn consume(
81/// &self,
82/// sender: &tokio::sync::mpsc::UnboundedSender<Update>,
83/// cancellation_token: CancellationToken,
84/// metrics: Arc<MetricsCollection>,
85/// ) -> CarbonResult<tokio::task::AbortHandle> {
86/// // Implement update fetching logic
87/// }
88///
89/// fn update_types(&self) -> Vec<UpdateType> {
90/// vec![UpdateType::AccountUpdate, UpdateType::Transaction]
91/// }
92/// }
93/// ```
94///
95/// # Notes
96///
97/// - This trait is marked with `async_trait`, so implementations must be
98/// asynchronous.
99/// - The `consume` method should handle errors and retries to ensure robust
100/// update delivery.
101#[async_trait]
102pub trait Datasource: Send + Sync {
103 async fn consume(
104 &self,
105 id: DatasourceId,
106 sender: tokio::sync::mpsc::Sender<(Update, DatasourceId)>,
107 cancellation_token: CancellationToken,
108 metrics: Arc<MetricsCollection>,
109 ) -> CarbonResult<()>;
110
111 fn update_types(&self) -> Vec<UpdateType>;
112}
113
/// Identifier attached to the updates a datasource emits.
///
/// Every update travelling through the pipeline is paired with the
/// `DatasourceId` of the datasource that produced it. This makes it possible to
/// tell the origin of an update apart and to filter updates by origin, which
/// is especially handy when several datasources feed the same pipeline.
///
/// # Examples
///
/// A randomly generated, unique ID:
/// ```
/// use carbon_core::datasource::DatasourceId;
///
/// let id = DatasourceId::new_unique();
/// println!("Generated ID: {:?}", id);
/// ```
///
/// A human-readable, named ID:
/// ```
/// use carbon_core::datasource::DatasourceId;
///
/// let id = DatasourceId::new_named("mainnet-rpc");
/// println!("Named ID: {:?}", id);
/// ```
///
/// Combined with a datasource filter:
/// ```
/// use carbon_core::{datasource::DatasourceId, filter::DatasourceFilter};
///
/// let datasource_id = DatasourceId::new_named("testnet");
/// let filter = DatasourceFilter::new(datasource_id);
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DatasourceId(
    // The identifier text: either a generated UUID or a caller-chosen name.
    String,
);
147
148impl DatasourceId {
149 /// Creates a new datasource ID with a randomly generated unique identifier.
150 ///
151 /// This method uses a cryptographically secure random number generator
152 /// to create a unique ID. The ID is converted to a string representation
153 /// for easy debugging and logging.
154 ///
155 /// # Returns
156 ///
157 /// A new `DatasourceId` with a unique random identifier.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// use carbon_core::datasource::DatasourceId;
163 ///
164 /// let id1 = DatasourceId::new_unique();
165 /// let id2 = DatasourceId::new_unique();
166 /// assert_ne!(id1, id2); // IDs should be different
167 /// ```
168 pub fn new_unique() -> Self {
169 Self(uuid::Uuid::new_v4().to_string())
170 }
171
172 /// Creates a new datasource ID with a specific name.
173 ///
174 /// This method is useful when you want to assign a meaningful name
175 /// to a datasource for easier identification and debugging.
176 ///
177 /// # Arguments
178 ///
179 /// * `name` - A string slice containing the name for the datasource ID
180 ///
181 /// # Returns
182 ///
183 /// A new `DatasourceId` with the specified name.
184 ///
185 /// # Examples
186 ///
187 /// ```
188 /// use carbon_core::datasource::DatasourceId;
189 ///
190 /// let mainnet_id = DatasourceId::new_named("mainnet-rpc");
191 /// let testnet_id = DatasourceId::new_named("testnet-rpc");
192 /// assert_ne!(mainnet_id, testnet_id);
193 /// ```
194 pub fn new_named(name: &str) -> Self {
195 Self(name.to_string())
196 }
197}
198
199/// Represents a data update in the `carbon-core` pipeline, encompassing
200/// different update types.
201///
202/// - `Account`: Represents an update to an account's data.
203/// - `Transaction`: Represents a transaction-related update, including
204/// transaction metadata.
205/// - `AccountDeletion`: Represents an event where an account has been deleted.
206#[derive(Debug, Clone)]
207pub enum Update {
208 Account(AccountUpdate),
209 Transaction(Box<TransactionUpdate>),
210 AccountDeletion(AccountDeletion),
211 BlockDetails(BlockDetails),
212}
213
/// Enumerates the kinds of updates a datasource can advertise through its
/// `update_types` method.
///
/// Derives `Hash` (in addition to `Eq`) so that sets of supported update
/// kinds can be stored in `HashSet`/`HashMap` collections.
///
/// Note: there is currently no `UpdateType` counterpart for the
/// `Update::BlockDetails` variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum UpdateType {
    /// The datasource provides account updates (`Update::Account`).
    AccountUpdate,
    /// The datasource provides transaction updates (`Update::Transaction`).
    Transaction,
    /// The datasource provides account deletion events
    /// (`Update::AccountDeletion`).
    AccountDeletion,
}
227
228/// Represents an update to a Solana account, including its public key, data,
229/// and slot information.
230///
231/// The `AccountUpdate` struct encapsulates the essential information for an
232/// account update, containing the account's `pubkey`, `account` data, and the
233/// `slot` at which the update occurred.
234///
235/// - `pubkey`: The public key of the account being updated.
236/// - `account`: The new state of the account.
237/// - `slot`: The slot number in which this account update was recorded.
238/// - `transaction_signature`: Signature of the transaction that caused the update.
239#[derive(Debug, Clone)]
240pub struct AccountUpdate {
241 pub pubkey: Pubkey,
242 pub account: Account,
243 pub slot: u64,
244 pub transaction_signature: Option<Signature>,
245}
246
247/// Represents the details of a Solana block, including its slot, hashes, rewards, and timing information.
248///
249/// The `BlockDetails` struct encapsulates the essential information for a block,
250/// providing details about its slot, blockhashes, rewards, and other metadata.
251///
252/// - `slot`: The slot number in which this block was recorded.
253/// - `previous_block_hash`: The hash of the previous block in the blockchain.
254/// - `block_hash`: The hash of the current block.
255/// - `rewards`: Optional rewards information associated with the block, such as staking rewards.
256/// - `num_reward_partitions`: Optional number of reward partitions in the block.
257/// - `block_time`: Optional Unix timestamp indicating when the block was processed.
258/// - `block_height`: Optional height of the block in the blockchain.#[derive(Debug, Clone)]
259#[derive(Debug, Clone)]
260pub struct BlockDetails {
261 pub slot: u64,
262 pub block_hash: Option<Hash>,
263 pub previous_block_hash: Option<Hash>,
264 pub rewards: Option<Rewards>,
265 pub num_reward_partitions: Option<u64>,
266 pub block_time: Option<i64>,
267 pub block_height: Option<u64>,
268}
269
270/// Represents the deletion of a Solana account, containing the account's public
271/// key and slot information.
272///
273/// The `AccountDeletion` struct indicates that an account has been removed from
274/// the blockchain state, providing the `pubkey` of the deleted account and the
275/// `slot` in which the deletion occurred.
276///
277/// - `pubkey`: The public key of the deleted account.
278/// - `slot`: The slot number in which the account was deleted.
279/// - `transaction_signature`: Signature of the transaction that caused the update.
280#[derive(Debug, Clone)]
281pub struct AccountDeletion {
282 pub pubkey: Pubkey,
283 pub slot: u64,
284 pub transaction_signature: Option<Signature>,
285}
286
287/// Represents a transaction update in the Solana network, including transaction
288/// metadata, status, slot information and block time.
289///
290/// The `TransactionUpdate` struct provides detailed information about a
291/// transaction, including its `signature`, `transaction` data, `meta` status,
292/// and the `slot` where it was recorded. Additionally, it includes a `is_vote`
293/// flag to indicate whether the transaction is a voting transaction.
294///
295/// - `signature`: The unique signature of the transaction.
296/// - `transaction`: The complete `VersionedTransaction` data of the
297/// transaction.
298/// - `meta`: Metadata about the transaction's status, such as fee information
299/// and logs.
300/// - `is_vote`: A boolean indicating whether the transaction is a vote.
301/// - `slot`: The slot number in which the transaction was recorded.
302/// - `block_time`: The Unix timestamp of when the transaction was processed.
303/// - `block_hash`: Block hash that can be used to detect a fork.
304///
305/// Note: The `block_time` field may not be returned in all scenarios.
306#[derive(Debug, Clone)]
307pub struct TransactionUpdate {
308 pub signature: Signature,
309 pub transaction: VersionedTransaction, // TODO: replace with solana_transaction crate after 2.2.0 release
310 pub meta: TransactionStatusMeta,
311 pub is_vote: bool,
312 pub slot: u64,
313 pub block_time: Option<i64>,
314 pub block_hash: Option<Hash>,
315}