pub struct PipelineBuilder {
pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
pub account_pipes: Vec<Box<dyn AccountPipes>>,
pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
pub metrics: MetricsCollection,
pub metrics_flush_interval: Option<u64>,
pub datasource_cancellation_token: Option<CancellationToken>,
pub shutdown_strategy: ShutdownStrategy,
pub channel_buffer_size: usize,
}
A builder for constructing a Pipeline instance with customized data sources, processing pipes, and metrics.
The PipelineBuilder struct offers a flexible way to assemble a Pipeline by allowing configuration of its components, such as data sources, account and transaction pipes, deletion handling, and metrics. Using the builder pattern, you can add the desired elements incrementally and then finalize with a call to build.
§Overview
The PipelineBuilder supports the following components:
- Datasources: Sources of data updates, such as account information and transactions.
- Account Pipes: For processing account updates from data sources.
- Account Deletion Pipes: For handling account deletion updates.
- Block Details Pipes: For handling block details updates, such as slot, block hash, and rewards.
- Instruction Pipes: For handling instructions associated with transactions.
- Transaction Pipes: For handling full transaction data.
- Metrics: Collects and reports performance data, such as update processing times.
- Metrics Flush Interval: Optional interval defining how often to flush metrics data.
Each component can be added through method chaining, enhancing code readability and maintainability.
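Method chaining works when each builder method takes self by value and returns Self. The following is a minimal sketch of that pattern using a hypothetical stand-in type, SketchBuilder, that mirrors three fields from the struct definition. It is not the carbon_core implementation; the 10_000 default only mirrors the documented channel buffer default.

```rust
// Hypothetical stand-in for illustration only; not the real carbon_core type.
#[derive(Debug, Default)]
struct SketchBuilder {
    datasources: Vec<String>,
    metrics_flush_interval: Option<u64>,
    channel_buffer_size: usize,
}

impl SketchBuilder {
    // Start with empty collections and the documented buffer default.
    fn new() -> Self {
        Self {
            channel_buffer_size: 10_000,
            ..Self::default()
        }
    }

    // Each setter consumes the builder and hands it back, which is
    // what makes `a().b().c()` chains possible.
    fn datasource(mut self, name: &str) -> Self {
        self.datasources.push(name.to_string());
        self
    }

    fn metrics_flush_interval(mut self, secs: u64) -> Self {
        self.metrics_flush_interval = Some(secs);
        self
    }

    fn channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = size;
        self
    }
}
```

Because every setter returns the builder, optional settings that are never called simply keep their initial value (None or the default size).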
§Example
use std::sync::Arc;
carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor
    );
// ...
§Fields
- datasources: A collection of Datasource objects wrapped in Arc for shared ownership across threads. Each Datasource provides updates to the pipeline.
- account_pipes: A collection of AccountPipes to handle account updates.
- account_deletion_pipes: A collection of AccountDeletionPipes for processing account deletions.
- instruction_pipes: A collection of InstructionPipes to process instructions in transactions.
- transaction_pipes: A collection of TransactionPipes to process full transaction data.
- metrics: A collection of Metrics implementations for tracking pipeline performance.
- metrics_flush_interval: An optional interval (in seconds) for flushing metrics data. If not set, a default flush interval will be used.
- datasource_cancellation_token: An optional CancellationToken for cancelling the datasources. If not set, a default CancellationToken will be used.
- channel_buffer_size: The size of the channel buffer for the pipeline. If not set, a default size of 10_000 will be used.
§Returns
After configuring the builder, call build to create a Pipeline instance.
The builder will return a CarbonResult<Pipeline>, which will either contain the configured pipeline or an error if configuration failed.
§Notes
- The builder pattern allows for method chaining, making it easy to incrementally add components to the Pipeline.
- Ensure that each component matches the data and update types expected by your application.
Fields§
§datasources: Vec<Arc<dyn Datasource + Send + Sync>>
§account_pipes: Vec<Box<dyn AccountPipes>>
§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
§block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>
§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
§metrics: MetricsCollection
§metrics_flush_interval: Option<u64>
§datasource_cancellation_token: Option<CancellationToken>
§shutdown_strategy: ShutdownStrategy
§channel_buffer_size: usize
Implementations§
impl PipelineBuilder
pub fn new() -> Self
Creates a new PipelineBuilder with empty collections for datasources, pipes, and metrics.
This method initializes a PipelineBuilder instance, allowing you to configure each component of a Pipeline before building it. The builder pattern offers flexibility in adding data sources, account and transaction handling pipes, deletion processing, and metrics collection features.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new();
pub fn datasource(self, datasource: impl Datasource + 'static) -> Self
Adds a datasource to the pipeline.
The datasource is responsible for providing updates, such as account and transaction data, to the pipeline. Multiple datasources can be added to handle various types of updates.
§Parameters
- datasource: The data source to add, implementing the Datasource trait.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .datasource(MyDatasource::new());
pub fn shutdown_strategy(self, shutdown_strategy: ShutdownStrategy) -> Self
Sets the shutdown strategy for the pipeline.
This method configures how the pipeline should handle shutdowns. The shutdown strategy defines whether the pipeline should terminate immediately or continue processing pending updates after terminating the data sources.
§Parameters
- shutdown_strategy: A variant of ShutdownStrategy that determines how the pipeline should handle shutdowns.
§Returns
Returns Self, allowing for method chaining.
§Notes
- Use ShutdownStrategy::Immediate to stop the entire pipeline instantly, including all active processing tasks.
- Use ShutdownStrategy::ProcessPending (the default) to terminate data sources first and allow the pipeline to finish processing any updates that are still pending.
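The difference between the two strategies can be modelled with a small conceptual sketch. The enum below is a hypothetical stand-in that reuses only the two variant names and the default documented here; the queue of u32 updates and the shut_down function are assumptions made for illustration and say nothing about how carbon_core implements shutdown internally.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in mirroring the two documented variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum SketchShutdownStrategy {
    Immediate,
    // Documented as the default strategy.
    #[default]
    ProcessPending,
}

// Returns the updates that still get processed once shutdown begins.
// `pending` models updates already queued when the data sources stop.
fn shut_down(strategy: SketchShutdownStrategy, mut pending: VecDeque<u32>) -> Vec<u32> {
    let mut processed = Vec::new();
    match strategy {
        // Stop right away: queued updates are dropped unprocessed.
        SketchShutdownStrategy::Immediate => {}
        // Data sources are already stopped; drain what is left in order.
        SketchShutdownStrategy::ProcessPending => {
            while let Some(update) = pending.pop_front() {
                processed.push(update);
            }
        }
    }
    processed
}
```

In this model, Immediate trades completeness for a fast exit, while ProcessPending finishes everything that was already queued before stopping.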
pub fn account<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
Adds an account pipe to process account updates.
Account pipes decode and process updates to accounts within the pipeline. This method requires both an AccountDecoder and a Processor to handle decoded account data.
§Parameters
- decoder: An AccountDecoder that decodes the account data.
- processor: A Processor that processes the decoded account data.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .account(MyAccountDecoder, MyAccountProcessor);
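The pairing of a decoder with a processor can be sketched as follows. The traits SketchDecoder and SketchProcessor, the SketchPipe struct, and the u64 payload are all hypothetical simplifications introduced for this example; the real AccountDecoder and Processor traits have their own signatures, which are not reproduced here. The point is only the flow: raw data is decoded, and the processor runs only when decoding succeeds.

```rust
// Hypothetical, simplified stand-ins; not the carbon_core traits.
trait SketchDecoder {
    type Decoded;
    // Returns None when the raw bytes are not something this decoder understands.
    fn decode(&self, raw: &[u8]) -> Option<Self::Decoded>;
}

trait SketchProcessor {
    type Input;
    fn process(&mut self, input: Self::Input);
}

// A pipe ties one decoder to one processor with a matching type.
struct SketchPipe<D, P> {
    decoder: D,
    processor: P,
}

impl<D, P> SketchPipe<D, P>
where
    D: SketchDecoder,
    P: SketchProcessor<Input = D::Decoded>,
{
    // Returns true if the update was decoded and handed to the processor.
    fn run(&mut self, raw: &[u8]) -> bool {
        match self.decoder.decode(raw) {
            Some(decoded) => {
                self.processor.process(decoded);
                true
            }
            None => false,
        }
    }
}

// Example decoder: accepts exactly 8 little-endian bytes as a u64.
struct U64Decoder;

impl SketchDecoder for U64Decoder {
    type Decoded = u64;
    fn decode(&self, raw: &[u8]) -> Option<u64> {
        if raw.len() != 8 {
            return None;
        }
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(raw);
        Some(u64::from_le_bytes(bytes))
    }
}

// Example processor: keeps a running total of decoded values.
struct SumProcessor {
    total: u64,
}

impl SketchProcessor for SumProcessor {
    type Input = u64;
    fn process(&mut self, input: u64) {
        self.total += input;
    }
}
```

The associated-type bound `Input = D::Decoded` plays the same role as the `AccountType = T` and `InputType = ...<T>` bounds in the signature above: a decoder and processor with mismatched types will not compile together.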
pub fn account_deletions(
    self,
    processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
) -> Self
Adds an account deletion pipe to handle account deletion events.
Account deletion pipes process deletions of accounts, with a Processor to handle the deletion events as they occur.
§Parameters
- processor: A Processor that processes account deletion events.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .account_deletions(MyAccountDeletionProcessor);
pub fn block_details(
    self,
    processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
) -> Self
Adds a block details pipe to handle block details updates.
Block details pipes process updates related to block metadata, such as slot, block hash, and rewards, with a Processor to handle the updates.
§Parameters
- processor: A Processor that processes block details updates.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .block_details(MyBlockDetailsProcessor);
pub fn instruction<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
Adds an instruction pipe to process instructions within transactions.
Instruction pipes decode and process individual instructions, enabling specialized handling of various instruction types.
§Parameters
- decoder: An InstructionDecoder for decoding instructions from transaction data.
- processor: A Processor that processes decoded instruction data.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .instruction(MyDecoder, MyInstructionProcessor);
pub fn transaction<T, U>(
    self,
    processor: impl Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync + 'static,
    schema: Option<TransactionSchema<T>>,
) -> Self
Adds a transaction pipe for processing full transaction data.
This method takes a Processor to handle the decoded transaction data and an optional TransactionSchema used to match and interpret it.
§Parameters
- processor: A Processor that processes the decoded transaction data.
- schema: An optional TransactionSchema used to match and interpret transaction data.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
pub fn metrics(self, metrics: Arc<dyn Metrics>) -> Self
Adds a metrics component to the pipeline for performance tracking.
This component collects and reports on pipeline metrics, providing insights into performance and operational statistics.
§Parameters
- metrics: An instance of a Metrics implementation, used to gather and report metrics.
§Example
use std::sync::Arc;
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .metrics(Arc::new(LogMetrics::new()));
pub fn metrics_flush_interval(self, interval: u64) -> Self
Sets the interval for flushing metrics data.
This value defines the frequency, in seconds, at which metrics data is flushed from memory. If not set, a default interval is used.
§Parameters
- interval: The flush interval for metrics, in seconds.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .metrics_flush_interval(60);
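Since the interval is stored as an Option<u64> of seconds, resolving it into a concrete period is a matter of falling back to a default when it is None. The sketch below shows one way to do that; the function name flush_period and the constant SKETCH_DEFAULT_FLUSH_SECS are hypothetical, and the value 5 is a placeholder because the actual default is not stated in this documentation.

```rust
use std::time::Duration;

// Placeholder default for this sketch only; the real default is not documented here.
const SKETCH_DEFAULT_FLUSH_SECS: u64 = 5;

// Turn the optional builder setting into a concrete flush period.
fn flush_period(metrics_flush_interval: Option<u64>) -> Duration {
    Duration::from_secs(metrics_flush_interval.unwrap_or(SKETCH_DEFAULT_FLUSH_SECS))
}
```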
pub fn datasource_cancellation_token(
    self,
    cancellation_token: CancellationToken,
) -> Self
Sets the cancellation token used to cancel the datasources on demand.
If not set, a default CancellationToken is used.
§Parameters
- cancellation_token: An instance of CancellationToken.
§Example
use carbon_core::pipeline::PipelineBuilder;
use tokio_util::sync::CancellationToken;
let builder = PipelineBuilder::new()
    .datasource_cancellation_token(CancellationToken::new());
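The idea behind passing your own token is that the caller keeps a handle it can use to stop the datasource later. The sketch below models that with the standard library only: SketchToken is a hypothetical stand-in built on an AtomicBool, not tokio_util's CancellationToken, and the worker thread stands in for a datasource loop.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a cancellation token: a shared flag.
#[derive(Clone, Default)]
struct SketchToken(Arc<AtomicBool>);

impl SketchToken {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// Spawns a worker that polls until the token is cancelled, then cancels it
// from the caller's side. Returns true once the worker has exited cleanly.
fn run_datasource_until_cancelled() -> bool {
    let token = SketchToken::default();
    let worker_token = token.clone();

    let worker = thread::spawn(move || {
        // Stand-in for a datasource loop that checks for cancellation.
        while !worker_token.is_cancelled() {
            thread::sleep(Duration::from_millis(1));
        }
        true
    });

    // The caller still holds a clone, so it can cancel on demand.
    token.cancel();
    worker.join().expect("worker thread panicked")
}
```

Cloning the token before handing it over is the key step: both sides observe the same flag, so a cancel on one clone is visible to the other.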
pub fn channel_buffer_size(self, size: usize) -> Self
Sets the size of the channel buffer for the pipeline.
This value defines the maximum number of updates that can be queued in the pipeline’s channel buffer. If not set, a default size of 10_000 will be used.
§Parameters
- size: The size of the channel buffer for the pipeline.
§Example
use carbon_core::pipeline::PipelineBuilder;
let builder = PipelineBuilder::new()
    .channel_buffer_size(1000);
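To see what a buffer limit means in practice, the sketch below uses the standard library's bounded channel, std::sync::mpsc::sync_channel, as a stand-in. This documentation does not say which channel type the pipeline uses, so treat this purely as an illustration of bounded queueing; fill_channel is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Tries to queue `updates` items into a channel holding at most `buffer_size`
// items, with nothing consuming them. Returns (accepted, rejected).
fn fill_channel(buffer_size: usize, updates: usize) -> (usize, usize) {
    // Keep the receiver alive so sends fail with Full rather than Disconnected.
    let (tx, _rx) = sync_channel::<usize>(buffer_size);
    let mut accepted = 0;
    let mut rejected = 0;

    for update in 0..updates {
        match tx.try_send(update) {
            Ok(()) => accepted += 1,
            // The buffer is at capacity; a blocking send would wait here instead.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With a buffer of 3 and 5 updates, the first 3 are queued and the remaining 2 are turned away. A larger buffer absorbs bigger bursts from the datasources at the cost of more memory held in the queue.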
pub fn build(self) -> CarbonResult<Pipeline>
Builds and returns a Pipeline configured with the specified components.
After configuring the PipelineBuilder with data sources, pipes, and metrics, call this method to create the final Pipeline instance ready for operation.
§Returns
Returns a CarbonResult<Pipeline> containing the configured Pipeline, or an error if any part of the configuration is invalid.
§Example
use std::sync::Arc;
carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor
    )
    .account(
        TestProgramDecoder,
        TestProgramAccountProcessor
    )
    .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    .account_deletions(TestProgramAccountDeletionProcessor)
    .channel_buffer_size(1000)
    .build()?;
Ok(())
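Because build returns a Result, callers typically propagate the error with `?` or match on it. The sketch below shows that shape with hypothetical stand-ins (SketchError, SketchResult, SketchPipeline, SketchPipelineBuilder). The rule that a builder without datasources is rejected is an assumption made for this example; which configurations carbon_core actually treats as invalid is not specified here.

```rust
// Hypothetical error and result types standing in for CarbonResult.
#[derive(Debug, PartialEq)]
enum SketchError {
    MissingDatasource,
}

type SketchResult<T> = Result<T, SketchError>;

#[derive(Debug, PartialEq)]
struct SketchPipeline {
    datasources: Vec<String>,
}

struct SketchPipelineBuilder {
    datasources: Vec<String>,
}

impl SketchPipelineBuilder {
    fn new() -> Self {
        Self { datasources: Vec::new() }
    }

    fn datasource(mut self, name: &str) -> Self {
        self.datasources.push(name.to_string());
        self
    }

    // Validation happens once, at the end of the chain.
    // Assumed rule for this sketch: at least one datasource is required.
    fn build(self) -> SketchResult<SketchPipeline> {
        if self.datasources.is_empty() {
            return Err(SketchError::MissingDatasource);
        }
        Ok(SketchPipeline { datasources: self.datasources })
    }
}

// Builds a pipeline from the given names and reports how many sources it has.
fn run(names: &[&str]) -> SketchResult<usize> {
    let mut builder = SketchPipelineBuilder::new();
    for name in names {
        builder = builder.datasource(name);
    }
    // `?` returns early with the error if the configuration is rejected.
    let pipeline = builder.build()?;
    Ok(pipeline.datasources.len())
}
```

Deferring validation to build keeps the individual setters infallible, so the chain stays free of intermediate error handling.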
Trait Implementations§
impl Default for PipelineBuilder
fn default() -> PipelineBuilder
Auto Trait Implementations§
impl Freeze for PipelineBuilder
impl !RefUnwindSafe for PipelineBuilder
impl Send for PipelineBuilder
impl Sync for PipelineBuilder
impl Unpin for PipelineBuilder
impl !UnwindSafe for PipelineBuilder
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.