pub struct PipelineBuilder {
pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
pub account_pipes: Vec<Box<dyn AccountPipes>>,
pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
pub metrics: MetricsCollection,
pub metrics_flush_interval: Option<u64>,
pub datasource_cancellation_token: Option<CancellationToken>,
pub shutdown_strategy: ShutdownStrategy,
pub channel_buffer_size: usize,
}
A builder for constructing a Pipeline instance with customized data
sources, processing pipes, and metrics.
The PipelineBuilder struct offers a flexible way to assemble a Pipeline
by allowing configuration of its components, such as data sources, account
and transaction pipes, deletion handling, and metrics. Using the builder
pattern, you can add the desired elements incrementally and then finalize
with a call to build.
§Overview
The PipelineBuilder supports the following components:
- Datasources: Sources of data updates, such as account information and transactions.
- Account Pipes: For processing account updates from data sources.
- Account Deletion Pipes: For handling account deletion updates.
- Block Details Pipes: For processing block metadata updates, such as slot, block hash, and rewards.
- Instruction Pipes: For handling instructions associated with transactions.
- Transaction Pipes: For handling full transaction data.
- Metrics: Collects and reports performance data, such as update processing times.
- Metrics Flush Interval: Optional interval defining how often to flush metrics data.
Each component can be added through method chaining, enhancing code readability and maintainability.
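The incremental, chainable configuration described above follows the standard Rust builder pattern. As a rough, self-contained sketch of that pattern (the `MiniBuilder`, `Pipe`, and `MiniPipeline` names are illustrative stand-ins, not carbon_core types):

```rust
// Minimal sketch of the builder pattern used by PipelineBuilder.
trait Pipe {
    fn handle(&self, update: &str) -> String;
}

struct Uppercase;
impl Pipe for Uppercase {
    fn handle(&self, update: &str) -> String {
        update.to_uppercase()
    }
}

#[derive(Default)]
struct MiniBuilder {
    pipes: Vec<Box<dyn Pipe>>,
    channel_buffer_size: Option<usize>,
}

impl MiniBuilder {
    // Each setter consumes `self` and returns it, enabling chaining.
    fn pipe(mut self, p: impl Pipe + 'static) -> Self {
        self.pipes.push(Box::new(p));
        self
    }

    fn channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = Some(size);
        self
    }

    // `build` finalizes the configuration, applying defaults where unset.
    fn build(self) -> MiniPipeline {
        MiniPipeline {
            pipes: self.pipes,
            channel_buffer_size: self.channel_buffer_size.unwrap_or(10_000),
        }
    }
}

struct MiniPipeline {
    pipes: Vec<Box<dyn Pipe>>,
    channel_buffer_size: usize,
}

fn main() {
    let pipeline = MiniBuilder::default()
        .pipe(Uppercase)
        .channel_buffer_size(1_000)
        .build();
    assert_eq!(pipeline.pipes.len(), 1);
    assert_eq!(pipeline.channel_buffer_size, 1_000);
    println!("{}", pipeline.pipes[0].handle("update"));
}
```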
§Example
use std::sync::Arc;
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
);
// ...

§Fields
- datasources: A collection of Datasource objects wrapped in Arc for shared ownership across threads. Each Datasource provides updates to the pipeline.
- account_pipes: A collection of AccountPipes to handle account updates.
- account_deletion_pipes: A collection of AccountDeletionPipes for processing account deletions.
- block_details_pipes: A collection of BlockDetailsPipes for processing block details updates.
- instruction_pipes: A collection of InstructionPipes to process instructions in transactions.
- transaction_pipes: A collection of TransactionPipes to process full transaction data.
- metrics: A MetricsCollection of Metrics implementations for tracking pipeline performance.
- metrics_flush_interval: An optional interval (in seconds) for flushing metrics data. If not set, a default flush interval will be used.
- datasource_cancellation_token: An optional CancellationToken for canceling the datasources on demand. If not set, a default CancellationToken will be used.
- shutdown_strategy: The ShutdownStrategy that determines how the pipeline handles shutdowns.
- channel_buffer_size: The size of the channel buffer for the pipeline. If not set, a default size of 10_000 will be used.
§Returns
After configuring the builder, call build to create a Pipeline instance.
The builder will return a CarbonResult<Pipeline>, which will either
contain the configured pipeline or an error if configuration failed.
§Notes
- The builder pattern allows for method chaining, making it easy to incrementally add components to the Pipeline.
- Ensure that each component matches the data and update types expected by your application.
Fields§
datasources: Vec<Arc<dyn Datasource + Send + Sync>>
account_pipes: Vec<Box<dyn AccountPipes>>
account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>
instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
metrics: MetricsCollection
metrics_flush_interval: Option<u64>
datasource_cancellation_token: Option<CancellationToken>
shutdown_strategy: ShutdownStrategy
channel_buffer_size: usize

Implementations§
impl PipelineBuilder

pub fn new() -> Self
Creates a new PipelineBuilder with empty collections for datasources,
pipes, and metrics.
This method initializes a PipelineBuilder instance, allowing you to
configure each component of a Pipeline before building it. The
builder pattern offers flexibility in adding data sources, account
and transaction handling pipes, deletion processing, and metrics
collection features.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new();

pub fn datasource(self, datasource: impl Datasource + 'static) -> Self
Adds a datasource to the pipeline.
The datasource is responsible for providing updates, such as account and transaction data, to the pipeline. Multiple datasources can be added to handle various types of updates.
§Parameters
datasource: The data source to add, implementing the Datasource trait.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .datasource(MyDatasource::new());

pub fn shutdown_strategy(self, shutdown_strategy: ShutdownStrategy) -> Self
Sets the shutdown strategy for the pipeline.
This method configures how the pipeline should handle shutdowns. The shutdown strategy defines whether the pipeline should terminate immediately or continue processing pending updates after terminating the data sources.
§Parameters
shutdown_strategy: A variant of ShutdownStrategy that determines how the pipeline should handle shutdowns.
§Returns
Returns Self, allowing for method chaining.
§Notes
- Use ShutdownStrategy::Immediate to stop the entire pipeline instantly, including all active processing tasks.
- Use ShutdownStrategy::ProcessPending (the default) to terminate data sources first and allow the pipeline to finish processing any updates that are still pending.
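The difference between the two strategies can be sketched with a toy queue (illustrative only; the `Strategy` enum and `shutdown` function below are stand-ins, not carbon_core's implementation):

```rust
use std::collections::VecDeque;

// Stand-in for the two shutdown strategies.
#[derive(PartialEq)]
enum Strategy {
    Immediate,      // drop everything still queued
    ProcessPending, // drain the queue before stopping
}

// Returns how many queued updates were processed during shutdown.
fn shutdown(mut pending: VecDeque<&str>, strategy: Strategy) -> usize {
    let mut processed = 0;
    if strategy == Strategy::ProcessPending {
        while let Some(_update) = pending.pop_front() {
            processed += 1; // process the remaining update
        }
    }
    // Immediate: the queue is simply dropped.
    processed
}

fn main() {
    let queue: VecDeque<&str> = ["a", "b", "c"].into();
    assert_eq!(shutdown(queue.clone(), Strategy::Immediate), 0);
    assert_eq!(shutdown(queue, Strategy::ProcessPending), 3);
}
```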
pub fn account<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
Adds an account pipe to process account updates.
Account pipes decode and process updates to accounts within the
pipeline. This method requires both an AccountDecoder and a
Processor to handle decoded account data.
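Conceptually, the pipe feeds each raw account update through the decoder and, when decoding succeeds, hands the typed result to the processor. A self-contained sketch of that flow (the `RawAccount`, `Decoder`, and `Handler` types are illustrative stand-ins for carbon_core's `AccountDecoder` and `Processor` traits, whose exact signatures differ):

```rust
// Stand-in for a raw on-chain account update.
struct RawAccount {
    lamports: u64,
    data: Vec<u8>,
}

// Decoder: turn raw account bytes into a typed value, or None if the
// account is not one this decoder understands.
trait Decoder {
    type Output;
    fn decode(&self, raw: &RawAccount) -> Option<Self::Output>;
}

// Handler: act on the decoded value.
trait Handler<T> {
    fn process(&mut self, decoded: T);
}

// Example decoder: read a little-endian u64 counter from the account data.
struct CounterDecoder;
impl Decoder for CounterDecoder {
    type Output = u64;
    fn decode(&self, raw: &RawAccount) -> Option<u64> {
        let bytes: [u8; 8] = raw.data.get(..8)?.try_into().ok()?;
        Some(u64::from_le_bytes(bytes))
    }
}

// Example processor: accumulate the decoded counters.
struct SumHandler {
    total: u64,
}
impl Handler<u64> for SumHandler {
    fn process(&mut self, decoded: u64) {
        self.total += decoded;
    }
}

fn main() {
    let raw = RawAccount { lamports: 1, data: 7u64.to_le_bytes().to_vec() };
    let decoder = CounterDecoder;
    let mut handler = SumHandler { total: 0 };
    // The pipe wires decoder output into the processor.
    if let Some(value) = decoder.decode(&raw) {
        handler.process(value);
    }
    assert_eq!(handler.total, 7);
}
```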
§Parameters
decoder: An AccountDecoder that decodes the account data.
processor: A Processor that processes the decoded account data.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .account(MyAccountDecoder, MyAccountProcessor);

pub fn account_deletions(
    self,
    processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
) -> Self
Adds an account deletion pipe to handle account deletion events.
Account deletion pipes process deletions of accounts, with a Processor
to handle the deletion events as they occur.
§Parameters
processor: A Processor that processes account deletion events.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .account_deletions(MyAccountDeletionProcessor);

pub fn block_details(
    self,
    processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
) -> Self
Adds a block details pipe to handle block details updates.
Block details pipes process updates related to block metadata, such as
slot, block hash, and rewards, with a Processor to handle the updates.
§Parameters
processor: A Processor that processes block details updates.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .block_details(MyBlockDetailsProcessor);

pub fn instruction<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
Adds an instruction pipe to process instructions within transactions.
Instruction pipes decode and process individual instructions, enabling specialized handling of various instruction types.
§Parameters
decoder: An InstructionDecoder for decoding instructions from transaction data.
processor: A Processor that processes decoded instruction data.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .instruction(MyDecoder, MyInstructionProcessor);

pub fn transaction<T, U>(
    self,
    processor: impl Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync + 'static,
    schema: Option<TransactionSchema<T>>,
) -> Self
Adds a transaction pipe for processing full transaction data.
This method requires a Processor to handle the processed transaction data
and accepts an optional transaction schema for matching and decoding.
§Parameters
processor: A Processor that processes the decoded transaction data.
schema: An optional TransactionSchema used to match and interpret transaction data.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));

pub fn metrics(self, metrics: Arc<dyn Metrics>) -> Self
Adds a metrics component to the pipeline for performance tracking.
This component collects and reports on pipeline metrics, providing insights into performance and operational statistics.
§Parameters
metrics: An instance of a Metrics implementation, used to gather and report metrics.
§Example
use std::sync::Arc;
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .metrics(Arc::new(LogMetrics::new()));

pub fn metrics_flush_interval(self, interval: u64) -> Self
Sets the interval for flushing metrics data.
This value defines the frequency, in seconds, at which metrics data is flushed from memory. If not set, a default interval is used.
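One way to picture interval-based flushing (an illustrative sketch using the standard library, not carbon_core's metrics loop; the `Metrics` struct here is hypothetical):

```rust
use std::time::{Duration, Instant};

// Toy metrics accumulator that flushes when the interval has elapsed.
struct Metrics {
    updates_processed: u64,
    last_flush: Instant,
    flush_interval: Duration,
}

impl Metrics {
    fn record(&mut self) {
        self.updates_processed += 1;
        self.maybe_flush();
    }

    fn maybe_flush(&mut self) {
        if self.last_flush.elapsed() >= self.flush_interval {
            // Report and reset the in-memory counters.
            println!("flush: {} updates", self.updates_processed);
            self.updates_processed = 0;
            self.last_flush = Instant::now();
        }
    }
}

fn main() {
    let mut m = Metrics {
        updates_processed: 0,
        last_flush: Instant::now(),
        flush_interval: Duration::from_secs(60),
    };
    m.record();
    // The 60-second interval has not elapsed yet, so nothing was flushed.
    assert_eq!(m.updates_processed, 1);
}
```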
§Parameters
interval: The flush interval for metrics, in seconds.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .metrics_flush_interval(60);

pub fn datasource_cancellation_token(
    self,
    cancellation_token: CancellationToken,
) -> Self
Sets the cancellation token used to cancel the datasources on demand.
If not set, a default CancellationToken is used.
§Parameters
cancellation_token: An instance of CancellationToken.
§Example
use carbon_core::pipeline::PipelineBuilder;
use tokio_util::sync::CancellationToken;
let builder = PipelineBuilder::new()
    .datasource_cancellation_token(CancellationToken::new());

pub fn channel_buffer_size(self, size: usize) -> Self
Sets the size of the channel buffer for the pipeline.
This value defines the maximum number of updates that can be queued in the pipeline’s channel buffer. If not set, a default size of 10_000 will be used.
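The buffer size bounds how many updates can sit in the channel before senders are forced to wait, which is the backpressure a bounded channel provides. A small illustration with the standard library's `sync_channel` (a stand-in for the pipeline's internal channel):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Buffer of 2: a third unreceived send blocks until the receiver drains.
    let (tx, rx) = sync_channel::<u32>(2);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap(); // blocks whenever the buffer is full
        }
        // Dropping `tx` here closes the channel, ending `rx.iter()`.
    });

    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
    println!("received {} updates", received.len());
}
```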
§Parameters
size: The size of the channel buffer for the pipeline.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .channel_buffer_size(1000);

pub fn build(self) -> CarbonResult<Pipeline>
Builds and returns a Pipeline configured with the specified
components.
After configuring the PipelineBuilder with data sources, pipes, and
metrics, call this method to create the final Pipeline instance
ready for operation.
§Returns
Returns a CarbonResult<Pipeline> containing the configured Pipeline,
or an error if any part of the configuration is invalid.
§Example
use std::sync::Arc;
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
)
.account(
TestProgramDecoder,
TestProgramAccountProcessor
)
    .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
.account_deletions(TestProgramAccountDeletionProcessor)
.channel_buffer_size(1000)
.build()?;
Ok(())

Trait Implementations§
impl Default for PipelineBuilder

fn default() -> PipelineBuilder
Auto Trait Implementations§
impl Freeze for PipelineBuilder
impl !RefUnwindSafe for PipelineBuilder
impl Send for PipelineBuilder
impl Sync for PipelineBuilder
impl Unpin for PipelineBuilder
impl !UnwindSafe for PipelineBuilder
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise.