Struct PipelineBuilder

pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

A builder for constructing a Pipeline instance with customized data sources, processing pipes, and metrics.

The PipelineBuilder struct offers a flexible way to assemble a Pipeline by allowing configuration of its components, such as data sources, account and transaction pipes, deletion handling, and metrics. Using the builder pattern, you can add the desired elements incrementally and then finalize with a call to build.

§Overview

The PipelineBuilder supports the following components:

  • Datasources: Sources of data updates, such as account information and transactions.
  • Account Pipes: For processing account updates from data sources.
  • Account Deletion Pipes: For handling account deletion updates.
  • Block Details Pipes: For handling block metadata updates, such as slot, block hash, and rewards.
  • Instruction Pipes: For handling instructions associated with transactions.
  • Transaction Pipes: For handling full transaction data.
  • Metrics: Collects and reports performance data, such as update processing times.
  • Metrics Flush Interval: Optional interval defining how often to flush metrics data.

Each component can be added through method chaining, enhancing code readability and maintainability.
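The chaining style described above can be sketched in isolation. The following is an illustrative stand-in, not the carbon-core API: the `Builder` and `Pipeline` types here are hypothetical, but the pattern (consume `self`, return `Self`, fall back to defaults in `build`) mirrors how `PipelineBuilder` works.

```rust
// Illustrative only: a minimal builder with method chaining, mirroring the
// shape of PipelineBuilder. All names here are hypothetical stand-ins.
#[derive(Default)]
struct Pipeline {
    sources: Vec<String>,
    buffer_size: usize,
}

#[derive(Default)]
struct Builder {
    sources: Vec<String>,
    buffer_size: Option<usize>,
}

impl Builder {
    // Each setter consumes the builder and returns it, enabling chaining.
    fn source(mut self, name: &str) -> Self {
        self.sources.push(name.to_string());
        self
    }

    fn buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = Some(size);
        self
    }

    fn build(self) -> Pipeline {
        Pipeline {
            sources: self.sources,
            // Fall back to a default, as PipelineBuilder does for optional
            // settings (e.g. channel_buffer_size defaults to 10_000).
            buffer_size: self.buffer_size.unwrap_or(10_000),
        }
    }
}

fn main() {
    let pipeline = Builder::default()
        .source("rpc")
        .buffer_size(1_000)
        .build();
    assert_eq!(pipeline.sources.len(), 1);
    assert_eq!(pipeline.buffer_size, 1_000);
}
```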

§Example

use std::sync::Arc;

carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor,
    );
// ...

§Fields

  • datasources: A collection of Datasource objects wrapped in Arc for shared ownership across threads. Each Datasource provides updates to the pipeline.
  • account_pipes: A collection of AccountPipes to handle account updates.
  • account_deletion_pipes: A collection of AccountDeletionPipes for processing account deletions.
  • block_details_pipes: A collection of BlockDetailsPipes for processing block metadata updates.
  • instruction_pipes: A collection of InstructionPipes to process instructions in transactions.
  • transaction_pipes: A collection of TransactionPipes to process full transaction data.
  • metrics: A MetricsCollection holding the Metrics implementations used to track pipeline performance.
  • metrics_flush_interval: An optional interval (in seconds) for flushing metrics data. If not set, a default flush interval is used.
  • datasource_cancellation_token: An optional CancellationToken for canceling the datasources on demand. If not set, a default CancellationToken is used.
  • shutdown_strategy: A ShutdownStrategy determining whether the pipeline stops immediately or finishes processing pending updates on shutdown.
  • channel_buffer_size: The size of the pipeline's channel buffer. If not set, a default size of 10_000 is used.

§Returns

After configuring the builder, call build to create a Pipeline instance. The builder will return a CarbonResult<Pipeline>, which will either contain the configured pipeline or an error if configuration failed.

§Notes

  • The builder pattern allows for method chaining, making it easy to incrementally add components to the Pipeline.
  • Ensure that each component matches the data and update types expected by your application.

Fields§

§datasources: Vec<Arc<dyn Datasource + Send + Sync>>
§account_pipes: Vec<Box<dyn AccountPipes>>
§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
§block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>
§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
§metrics: MetricsCollection
§metrics_flush_interval: Option<u64>
§datasource_cancellation_token: Option<CancellationToken>
§shutdown_strategy: ShutdownStrategy
§channel_buffer_size: usize

Implementations§


impl PipelineBuilder


pub fn new() -> Self

Creates a new PipelineBuilder with empty collections for datasources, pipes, and metrics.

This method initializes a PipelineBuilder instance, allowing you to configure each component of a Pipeline before building it. The builder pattern offers flexibility in adding data sources, account and transaction handling pipes, deletion processing, and metrics collection features.

§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new();

pub fn datasource(self, datasource: impl Datasource + 'static) -> Self

Adds a datasource to the pipeline.

The datasource is responsible for providing updates, such as account and transaction data, to the pipeline. Multiple datasources can be added to handle various types of updates.

§Parameters
  • datasource: The data source to add, implementing the Datasource trait.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .datasource(MyDatasource::new());

pub fn shutdown_strategy(self, shutdown_strategy: ShutdownStrategy) -> Self

Sets the shutdown strategy for the pipeline.

This method configures how the pipeline should handle shutdowns. The shutdown strategy defines whether the pipeline should terminate immediately or continue processing pending updates after terminating the data sources.

§Parameters
  • shutdown_strategy: A variant of ShutdownStrategy that determines how the pipeline should handle shutdowns.
§Returns

Returns Self, allowing for method chaining.

§Notes
  • Use ShutdownStrategy::Immediate to stop the entire pipeline instantly, including all active processing tasks.
  • Use ShutdownStrategy::ProcessPending (the default) to terminate data sources first and allow the pipeline to finish processing any updates that are still pending.
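The difference between the two strategies can be sketched with a plain std channel. This is an illustrative model, not the pipeline's actual implementation: `Shutdown` and `shut_down` are hypothetical stand-ins for ShutdownStrategy and the pipeline's shutdown path.

```rust
use std::sync::mpsc;

// Illustrative only: models ShutdownStrategy semantics with a std channel.
enum Shutdown {
    Immediate,
    ProcessPending,
}

// Returns how many queued updates get processed during shutdown.
fn shut_down(rx: mpsc::Receiver<u64>, strategy: Shutdown) -> usize {
    match strategy {
        // Stop at once: updates still queued in the channel are dropped.
        Shutdown::Immediate => 0,
        // Drain whatever is still queued before exiting.
        Shutdown::ProcessPending => rx.try_iter().count(),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for update in 0..5 {
        tx.send(update).unwrap();
    }
    drop(tx); // datasource terminated; no more updates will arrive
    let processed = shut_down(rx, Shutdown::ProcessPending);
    assert_eq!(processed, 5);
}
```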

pub fn account<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
) -> Self

Adds an account pipe to process account updates.

Account pipes decode and process updates to accounts within the pipeline. This method requires both an AccountDecoder and a Processor to handle decoded account data.

§Parameters
  • decoder: An AccountDecoder that decodes the account data.
  • processor: A Processor that processes the decoded account data.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .account(MyAccountDecoder, MyAccountProcessor);
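The decoder/processor split used above can be sketched with simplified traits. This is an illustrative model only: the real carbon-core traits are async and lifetime-parameterized, and every name below (`Decoder`, `Process`, `LamportsDecoder`, `run_pipe`) is a hypothetical stand-in.

```rust
// Illustrative only: a simplified decoder/processor pairing in the spirit
// of AccountDecoder + Processor. Not the carbon-core trait definitions.
trait Decoder {
    type Output;
    fn decode(&self, raw: &[u8]) -> Option<Self::Output>;
}

trait Process<T> {
    fn process(&self, value: T);
}

// Hypothetical decoder: reads a little-endian u64 balance from raw bytes.
struct LamportsDecoder;

impl Decoder for LamportsDecoder {
    type Output = u64;
    fn decode(&self, raw: &[u8]) -> Option<u64> {
        Some(u64::from_le_bytes(raw.try_into().ok()?))
    }
}

struct PrintProcessor;

impl Process<u64> for PrintProcessor {
    fn process(&self, lamports: u64) {
        println!("balance: {lamports}");
    }
}

// A pipe runs the decoder first and hands decoded values to the processor,
// skipping inputs the decoder does not recognize.
fn run_pipe<D: Decoder, P: Process<D::Output>>(d: &D, p: &P, raw: &[u8]) {
    if let Some(decoded) = d.decode(raw) {
        p.process(decoded);
    }
}

fn main() {
    run_pipe(&LamportsDecoder, &PrintProcessor, &42u64.to_le_bytes());
}
```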

pub fn account_deletions(
    self,
    processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
) -> Self

Adds an account deletion pipe to handle account deletion events.

Account deletion pipes process deletions of accounts, with a Processor to handle the deletion events as they occur.

§Parameters
  • processor: A Processor that processes account deletion events.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .account_deletions(MyAccountDeletionProcessor);

pub fn block_details(
    self,
    processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
) -> Self

Adds a block details pipe to handle block details updates.

Block details pipes process updates related to block metadata, such as slot, block hash, and rewards, with a Processor to handle the updates.

§Parameters
  • processor: A Processor that processes block details updates.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .block_details(MyBlockDetailsProcessor);

pub fn instruction<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
) -> Self

Adds an instruction pipe to process instructions within transactions.

Instruction pipes decode and process individual instructions, enabling specialized handling of various instruction types.

§Parameters
  • decoder: An InstructionDecoder for decoding instructions from transaction data.
  • processor: A Processor that processes decoded instruction data.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .instruction(MyDecoder, MyInstructionProcessor);

pub fn transaction<T, U>(
    self,
    processor: impl Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync + 'static,
    schema: Option<TransactionSchema<T>>,
) -> Self
where
    T: InstructionDecoderCollection + 'static,
    U: DeserializeOwned + Send + Sync + 'static,

Adds a transaction pipe for processing full transaction data.

This method takes a Processor to handle the processed transaction data, along with an optional TransactionSchema used to match and interpret the transaction.

§Parameters
  • processor: A Processor that processes the decoded transaction data.
  • schema: An optional TransactionSchema used to match and interpret transaction data.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));

pub fn metrics(self, metrics: Arc<dyn Metrics>) -> Self

Adds a metrics component to the pipeline for performance tracking.

This component collects and reports on pipeline metrics, providing insights into performance and operational statistics.

§Parameters
  • metrics: An instance of a Metrics implementation, used to gather and report metrics.
§Example
use std::sync::Arc;
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .metrics(Arc::new(LogMetrics::new()));

pub fn metrics_flush_interval(self, interval: u64) -> Self

Sets the interval for flushing metrics data.

This value defines the frequency, in seconds, at which metrics data is flushed from memory. If not set, a default interval is used.

§Parameters
  • interval: The flush interval for metrics, in seconds.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .metrics_flush_interval(60);

pub fn datasource_cancellation_token(
    self,
    cancellation_token: CancellationToken,
) -> Self

Sets the cancellation token for cancelling the datasources on demand.

This token is used to cancel the running datasources on demand. If not set, a default CancellationToken is used.

§Parameters
  • cancellation_token: An instance of CancellationToken.
§Example
use carbon_core::pipeline::PipelineBuilder;
use tokio_util::sync::CancellationToken;

let builder = PipelineBuilder::new()
    .datasource_cancellation_token(CancellationToken::new());
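The cancel-on-demand behavior can be sketched without tokio. This is an illustrative model only: an `Arc<AtomicBool>` standing in for tokio_util's CancellationToken, with a hypothetical datasource loop that polls it.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;

// Illustrative only: mimics CancellationToken semantics with an AtomicBool.
// The real pipeline uses tokio_util::sync::CancellationToken instead.
fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    let token = Arc::clone(&cancelled);

    let datasource = thread::spawn(move || {
        let mut updates = 0u64;
        // The datasource polls the token and stops producing once cancelled.
        while !token.load(Ordering::Relaxed) {
            updates += 1;
            if updates >= 3 {
                break; // keep the sketch finite
            }
        }
        updates
    });

    cancelled.store(true, Ordering::Relaxed); // cancel on demand
    let produced = datasource.join().unwrap();
    assert!(produced <= 3);
}
```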

pub fn channel_buffer_size(self, size: usize) -> Self

Sets the size of the channel buffer for the pipeline.

This value defines the maximum number of updates that can be queued in the pipeline’s channel buffer. If not set, a default size of 10_000 will be used.

§Parameters
  • size: The size of the channel buffer for the pipeline.
§Example
use carbon_core::pipeline::PipelineBuilder;

let builder = PipelineBuilder::new()
    .channel_buffer_size(1000);
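What a bounded channel buffer buys you can be sketched with std's `sync_channel`. This is an illustrative model, not the pipeline's internal channel: the point is that a full buffer applies backpressure, blocking producers until the consumer catches up.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Illustrative only: a bounded channel buffer in miniature. With capacity 2,
// the producer blocks whenever 2 updates are already queued undelivered.
fn main() {
    let (tx, rx) = mpsc::sync_channel::<u64>(2); // buffer size 2

    let producer = thread::spawn(move || {
        for update in 0..5 {
            // Blocks when the buffer is full, applying backpressure.
            tx.send(update).unwrap();
        }
    });

    thread::sleep(Duration::from_millis(50)); // simulate a slow consumer
    let received: Vec<u64> = rx.iter().collect();
    producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
```

A larger buffer smooths out bursts at the cost of memory; a smaller one propagates consumer slowness back to the datasources sooner.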

pub fn build(self) -> CarbonResult<Pipeline>

Builds and returns a Pipeline configured with the specified components.

After configuring the PipelineBuilder with data sources, pipes, and metrics, call this method to create the final Pipeline instance ready for operation.

§Returns

Returns a CarbonResult<Pipeline> containing the configured Pipeline, or an error if any part of the configuration is invalid.

§Example
use std::sync::Arc;

carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor,
    )
    .account(
        TestProgramDecoder,
        TestProgramAccountProcessor,
    )
    .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    .account_deletions(TestProgramAccountDeletionProcessor)
    .channel_buffer_size(1000)
    .build()?;

Ok(())

Trait Implementations§


impl Default for PipelineBuilder


fn default() -> PipelineBuilder

Returns the “default value” for a type.
