Struct Pipeline

pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

Represents the primary data processing pipeline in the carbon-core framework.

The Pipeline struct is responsible for orchestrating the flow of data from various sources, processing it through multiple pipes (for accounts, transactions, instructions, and account deletions), and recording metrics at each stage. This flexible design allows for customized data processing, handling a variety of update types with minimal boilerplate code.

§Overview

A Pipeline instance includes collections of data sources and processing pipes, enabling users to configure the pipeline to handle diverse types of blockchain-related data. Each pipe is responsible for decoding, processing, and routing specific data types, while the metrics system records relevant statistics.

§Key Concepts

  • Datasources: These provide the raw data, such as account updates, transaction details, and account deletions.
  • Pipes: Modular units that handle specific data types:
    • AccountPipes for account updates.
    • AccountDeletionPipes for account deletions.
    • InstructionPipes for instruction data within transactions.
    • TransactionPipes for entire transaction payloads.
  • Metrics: Collect performance data, enabling real-time insights and efficient monitoring.

§Fields

  • datasources: A vector of data sources (Datasource implementations) that provide the data for processing. Each data source must be wrapped in an Arc for safe, concurrent access.
  • account_pipes: A vector of AccountPipes, each responsible for handling account updates.
  • account_deletion_pipes: A vector of AccountDeletionPipes to handle deletion events.
  • block_details_pipes: A vector of BlockDetailsPipes to handle block details.
  • instruction_pipes: A vector of InstructionPipes for processing instructions within transactions. These pipes work with nested instructions and are generically defined to support varied instruction types.
  • transaction_pipes: A vector of TransactionPipes responsible for processing complete transaction payloads.
  • metrics: An Arc<MetricsCollection> holding the Metrics implementations used to record and track performance data. Wrapping the collection in an Arc ensures thread-safe, shared access.
  • metrics_flush_interval: An optional interval, in seconds, defining how frequently metrics should be flushed. If None, the default interval is used.
  • datasource_cancellation_token: An optional CancellationToken used to signal the datasources to shut down.
  • shutdown_strategy: The ShutdownStrategy that determines how the pipeline winds down when a shutdown is triggered.
  • channel_buffer_size: The size of the channel buffer for the pipeline. If not set, a default size of 10_000 is used.

§Example

use std::sync::Arc;

carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
    TestProgramDecoder,
    TestProgramProcessor
)
.account(
    TestProgramDecoder,
    TestProgramAccountProcessor
)
.transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
.account_deletions(TestProgramAccountDeletionProcessor)
.channel_buffer_size(1000)
.build()?
.run()
.await?;

§Notes

  • Ensure that each data source and pipe implements the required traits, such as Datasource, AccountPipes, and Metrics, as appropriate.
  • The pipeline is designed for concurrent operation, utilizing Arc and Box types to handle shared ownership and trait object storage.
  • The metrics_flush_interval controls how frequently the pipeline’s metrics are flushed. If None, a default interval (usually 5 seconds) is used.

Fields§

§datasources: Vec<Arc<dyn Datasource + Send + Sync>>
§account_pipes: Vec<Box<dyn AccountPipes>>
§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
§block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>
§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
§metrics: Arc<MetricsCollection>
§metrics_flush_interval: Option<u64>
§datasource_cancellation_token: Option<CancellationToken>
§shutdown_strategy: ShutdownStrategy
§channel_buffer_size: usize

Implementations§

impl Pipeline

pub fn builder() -> PipelineBuilder

Creates a new PipelineBuilder instance for constructing a Pipeline.

The builder method returns a PipelineBuilder that allows you to configure and customize the pipeline components before building the final Pipeline object. This approach provides a flexible and type-safe way to assemble a pipeline by specifying data sources, processing pipes, and metrics.

§Example
use std::sync::Arc;

carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
    TestProgramDecoder,
    TestProgramProcessor
);
// ...
§Returns

Returns a PipelineBuilder instance with empty collections for data sources, pipes, and metrics. You can then configure each component using the builder pattern.

pub async fn run(&mut self) -> CarbonResult<()>

Runs the Pipeline, processing updates from data sources and handling metrics.

The run method initializes the pipeline’s metrics system and starts listening for updates from the configured data sources. It checks the types of updates provided by each data source to ensure that the required data types are available for processing. The method then enters a loop where it processes each update received from the data sources in turn, logging and updating metrics based on the success or failure of each operation.

§How it Works
  • Initializes metrics and sets up an interval for periodic metric flushing.
  • Spawns tasks for each data source to continuously consume updates.
  • Processes updates according to their type (e.g., Account, Transaction, or AccountDeletion).
  • Records performance metrics such as update processing times, and tracks success and failure counts.
§Errors

The method returns an Err variant if:

  • Required update types (e.g., AccountUpdate, AccountDeletion, Transaction) are not provided by any data source, causing a mismatch in expected data processing capabilities.
  • A data source encounters an error while consuming updates.
  • An error occurs during metrics flushing or processing of updates.
§Example
use std::sync::Arc;

use carbon_core::pipeline::Pipeline;

let mut pipeline = Pipeline::builder()
    .datasource(MyDatasource::new())
    .metrics(Arc::new(MyMetrics::new()))
    .build()
    .expect("Failed to build pipeline");

// Running the pipeline asynchronously
tokio::spawn(async move {
    if let Err(e) = pipeline.run().await {
        eprintln!("Pipeline run error: {:?}", e);
    }
});
§Notes
  • This method is asynchronous and should be awaited within a Tokio runtime environment.
  • The pipeline monitors metrics and flushes them based on the configured metrics_flush_interval.
  • The run method operates in an infinite loop, handling updates until a termination condition occurs.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> Same for T

type Output = T

Should always be Self.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V