pub struct PipelineBuilder {
pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
pub account_pipes: Vec<Box<dyn AccountPipes>>,
pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
pub metrics: MetricsCollection,
pub metrics_flush_interval: Option<u64>,
pub shutdown_strategy: ShutdownStrategy,
}Expand description
A builder for constructing a Pipeline instance with customized data sources, processing pipes, and metrics.
The PipelineBuilder struct offers a flexible way to assemble a Pipeline by allowing
configuration of its components, such as data sources, account and transaction pipes,
deletion handling, and metrics. Using the builder pattern, you can add the desired elements
incrementally and then finalize with a call to build.
§Overview
The PipelineBuilder supports the following components:
- Datasources: Sources of data updates, such as account information and transactions.
- Account Pipes: For processing account updates from data sources.
- Account Deletion Pipes: For handling account deletion updates.
- Instruction Pipes: For handling instructions associated with transactions.
- Transaction Pipes: For handling full transaction data.
- Metrics: Collects and reports performance data, such as update processing times.
- Metrics Flush Interval: Optional interval defining how often to flush metrics data.
Each component can be added through method chaining, enhancing code readability and maintainability.
§Example
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
)
// ...§Fields
datasources: A collection ofDatasourceobjects wrapped inArcfor shared ownership across threads. EachDatasourceprovides updates to the pipeline.account_pipes: A collection ofAccountPipesto handle account updates.account_deletion_pipes: A collection ofAccountDeletionPipesfor processing account deletions.instruction_pipes: A collection ofInstructionPipesto process instructions in transactions.transaction_pipes: A collection ofTransactionPipesto process full transaction data.metrics: A vector ofMetricsimplementations for tracking pipeline performance.metrics_flush_interval: An optional interval (in seconds) for flushing metrics data. If not set, a default flush interval will be used.
§Returns
After configuring the builder, call build to create a Pipeline instance.
The builder will return a CarbonResult<Pipeline>, which will either contain the
configured pipeline or an error if configuration failed.
§Notes
- The builder pattern allows for method chaining, making it easy to incrementally
add components to the
Pipeline. - Ensure that each component matches the data and update types expected by your application.
Fields§
§datasources: Vec<Arc<dyn Datasource + Send + Sync>>§account_pipes: Vec<Box<dyn AccountPipes>>§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>§metrics: MetricsCollection§metrics_flush_interval: Option<u64>§shutdown_strategy: ShutdownStrategyImplementations§
Source§impl PipelineBuilder
impl PipelineBuilder
Sourcepub fn new() -> Self
pub fn new() -> Self
Creates a new PipelineBuilder with empty collections for datasources, pipes, and metrics.
This method initializes a PipelineBuilder instance, allowing you to configure each component
of a Pipeline before building it. The builder pattern offers flexibility in adding data
sources, account and transaction handling pipes, deletion processing, and metrics collection
features.
§Example
let builder = PipelineBuilder::new();Sourcepub fn datasource(
self,
datasource: impl Datasource + Send + Sync + 'static,
) -> Self
pub fn datasource( self, datasource: impl Datasource + Send + Sync + 'static, ) -> Self
Adds a datasource to the pipeline.
The datasource is responsible for providing updates, such as account and transaction data, to the pipeline. Multiple datasources can be added to handle various types of updates.
§Parameters
datasource: The data source to add, implementing theDatasourcetrait.
§Example
let builder = PipelineBuilder::new()
.datasource(MyDatasource::new());Sourcepub fn shutdown_strategy(self, shutdown_strategy: ShutdownStrategy) -> Self
pub fn shutdown_strategy(self, shutdown_strategy: ShutdownStrategy) -> Self
Sets the shutdown strategy for the pipeline.
This method configures how the pipeline should handle shutdowns. The shutdown strategy defines whether the pipeline should terminate immediately or continue processing pending updates after terminating the data sources.
§Parameters
shutdown_strategy: A variant ofShutdownStrategythat determines how the pipeline should handle shutdowns.
§Returns
Returns Self, allowing for method chaining.
§Notes
- Use
ShutdownStrategy::Immediateto stop the entire pipeline instantly, including all active processing tasks. - Use
ShutdownStrategy::ProcessPending(the default) to terminate data sources first and allow the pipeline to finish processing any updates that are still pending.
Sourcepub fn account<T: Send + Sync + 'static>(
self,
decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
pub fn account<T: Send + Sync + 'static>( self, decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static, processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static, ) -> Self
Adds an account pipe to process account updates.
Account pipes decode and process updates to accounts within the pipeline. This method
requires both an AccountDecoder and a Processor to handle decoded account data.
§Parameters
decoder: AnAccountDecoderthat decodes the account data.processor: AProcessorthat processes the decoded account data.
§Example
let builder = PipelineBuilder::new()
.account(MyAccountDecoder, MyAccountProcessor);Sourcepub fn account_deletions(
self,
processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
) -> Self
pub fn account_deletions( self, processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static, ) -> Self
Adds an account deletion pipe to handle account deletion events.
Account deletion pipes process deletions of accounts, with a Processor to handle
the deletion events as they occur.
§Parameters
processor: AProcessorthat processes account deletion events.
§Example
let builder = PipelineBuilder::new()
.account_deletions(MyAccountDeletionProcessor);Sourcepub fn instruction<T: Send + Sync + 'static>(
self,
decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
pub fn instruction<T: Send + Sync + 'static>( self, decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static, processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static, ) -> Self
Adds an instruction pipe to process instructions within transactions.
Instruction pipes decode and process individual instructions, enabling specialized handling of various instruction types.
§Parameters
decoder: AnInstructionDecoderfor decoding instructions from transaction data.processor: AProcessorthat processes decoded instruction data.
§Example
let builder = PipelineBuilder::new()
.instruction(MyDecoder, MyInstructionProcessor);Sourcepub fn transaction<T, U>(
self,
processor: impl Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync + 'static,
schema: Option<TransactionSchema<T>>,
) -> Self
pub fn transaction<T, U>( self, processor: impl Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync + 'static, schema: Option<TransactionSchema<T>>, ) -> Self
Adds a transaction pipe for processing full transaction data.
This method requires a transaction schema for decoding and a Processor to handle
the processed transaction data.
§Parameters
schema: ATransactionSchemaused to match and interpret transaction data.processor: AProcessorthat processes the decoded transaction data.
§Example
let builder = PipelineBuilder::new()
.transaction(MY_SCHEMA.clone(), MyTransactionProcessor);Sourcepub fn metrics(self, metrics: Arc<dyn Metrics>) -> Self
pub fn metrics(self, metrics: Arc<dyn Metrics>) -> Self
Adds a metrics component to the pipeline for performance tracking.
This component collects and reports on pipeline metrics, providing insights into performance and operational statistics.
§Parameters
metrics: An instance of aMetricsimplementation, used to gather and report metrics.
§Example
let builder = PipelineBuilder::new()
.metrics(Arc::new(LogMetrics::new()));Sourcepub fn metrics_flush_interval(self, interval: u64) -> Self
pub fn metrics_flush_interval(self, interval: u64) -> Self
Sets the interval for flushing metrics data.
This value defines the frequency, in seconds, at which metrics data is flushed from memory. If not set, a default interval is used.
§Parameters
interval: The flush interval for metrics, in seconds.
§Example
let builder = PipelineBuilder::new()
.metrics_flush_interval(60);Sourcepub fn build(self) -> CarbonResult<Pipeline>
pub fn build(self) -> CarbonResult<Pipeline>
Builds and returns a Pipeline configured with the specified components.
After configuring the PipelineBuilder with data sources, pipes, and metrics,
call this method to create the final Pipeline instance ready for operation.
§Returns
Returns a CarbonResult<Pipeline> containing the configured Pipeline, or an error
if any part of the configuration is invalid.
§Example
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
)
.account(
TestProgramDecoder,
TestProgramAccountProcessor
)
.transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
.account_deletions(TestProgramAccountDeletionProcessor)
.build()?Auto Trait Implementations§
impl Freeze for PipelineBuilder
impl !RefUnwindSafe for PipelineBuilder
impl Send for PipelineBuilder
impl Sync for PipelineBuilder
impl Unpin for PipelineBuilder
impl !UnwindSafe for PipelineBuilder
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more