pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub shutdown_strategy: ShutdownStrategy,
}
A builder for constructing a Pipeline instance with customized data sources, processing pipes, and metrics.
The PipelineBuilder struct offers a flexible way to assemble a Pipeline by allowing configuration of its components, such as data sources, account and transaction pipes, deletion handling, and metrics. Using the builder pattern, you can add the desired elements incrementally and then finalize with a call to build.
§Overview
The PipelineBuilder supports the following components:
- Datasources: Sources of data updates, such as account information and transactions.
- Account Pipes: For processing account updates from data sources.
- Account Deletion Pipes: For handling account deletion updates.
- Instruction Pipes: For handling instructions associated with transactions.
- Transaction Pipes: For handling full transaction data.
- Metrics: Collects and reports performance data, such as update processing times.
- Metrics Flush Interval: Optional interval defining how often to flush metrics data.
Each component is added through a chained method call, so the full pipeline configuration reads as a single expression.
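Chaining works because each builder method takes self by value and returns Self. The following is a minimal sketch of that pattern, not the carbon_core implementation; `SketchBuilder`, `Source`, and `StaticSource` are hypothetical names introduced only for illustration.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a datasource trait.
trait Source {
    fn name(&self) -> String;
}

struct StaticSource(&'static str);

impl Source for StaticSource {
    fn name(&self) -> String {
        self.0.to_string()
    }
}

/// Hypothetical builder with the same overall shape: collections plus options.
#[derive(Default)]
struct SketchBuilder {
    sources: Vec<Arc<dyn Source + Send + Sync>>,
    flush_interval: Option<u64>,
}

impl SketchBuilder {
    fn new() -> Self {
        Self::default()
    }

    // Takes `self` by value and returns it, so calls can be chained.
    fn source(mut self, source: impl Source + Send + Sync + 'static) -> Self {
        self.sources.push(Arc::new(source));
        self
    }

    fn flush_interval(mut self, seconds: u64) -> Self {
        self.flush_interval = Some(seconds);
        self
    }

    fn source_names(&self) -> Vec<String> {
        self.sources.iter().map(|s| s.name()).collect()
    }
}

/// Configures a builder in one chained expression.
fn chained() -> SketchBuilder {
    SketchBuilder::new()
        .source(StaticSource("rpc"))
        .source(StaticSource("websocket"))
        .flush_interval(30)
}
```

Because each call consumes the previous builder, a partially configured value cannot be reused by accident after it has been moved into the next call.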
§Example
carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor
    )
    // ...
§Fields
- datasources: A collection of Datasource objects wrapped in Arc for shared ownership across threads. Each Datasource provides updates to the pipeline.
- account_pipes: A collection of AccountPipes to handle account updates.
- account_deletion_pipes: A collection of AccountDeletionPipes for processing account deletions.
- instruction_pipes: A collection of InstructionPipes to process instructions in transactions.
- transaction_pipes: A collection of TransactionPipes to process full transaction data.
- metrics: A MetricsCollection holding the Metrics implementations used to track pipeline performance.
- metrics_flush_interval: An optional interval (in seconds) for flushing metrics data. If not set, a default flush interval will be used.
- shutdown_strategy: The ShutdownStrategy that determines how the pipeline handles shutdown.
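To illustrate why the datasources are held as Arc<dyn ... + Send + Sync>, here is a minimal sketch of trait objects shared across threads. `Source`, `FixedSource`, and `run_all` are hypothetical names, and the sketch uses plain OS threads rather than whatever runtime the pipeline actually uses.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical datasource-like trait; not the carbon_core `Datasource`.
trait Source {
    fn id(&self) -> u32;
}

struct FixedSource(u32);

impl Source for FixedSource {
    fn id(&self) -> u32 {
        self.0
    }
}

/// Runs every source on its own thread and collects the ids in input order.
fn run_all(sources: &[Arc<dyn Source + Send + Sync>]) -> Vec<u32> {
    let handles: Vec<_> = sources
        .iter()
        .map(|source| {
            // Cloning the Arc bumps a reference count; the source is not copied.
            let source = Arc::clone(source);
            thread::spawn(move || source.id())
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("source thread panicked"))
        .collect()
}

/// Builds a small list of type-erased sources.
fn sample_sources() -> Vec<Arc<dyn Source + Send + Sync>> {
    vec![Arc::new(FixedSource(1)), Arc::new(FixedSource(2))]
}
```

The Send + Sync bounds are what allow each Arc clone to move into a spawned thread; without them the compiler rejects the `thread::spawn` call.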
§Returns
After configuring the builder, call build to create a Pipeline instance.
The builder will return a CarbonResult<Pipeline>, which will either contain the configured pipeline or an error if configuration failed.
§Notes
- The builder pattern allows for method chaining, making it easy to incrementally add components to the Pipeline.
- Ensure that each component matches the data and update types expected by your application.
Fields§
§datasources: Vec<Arc<dyn Datasource + Send + Sync>>
§account_pipes: Vec<Box<dyn AccountPipes>>
§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
§metrics: MetricsCollection
§metrics_flush_interval: Option<u64>
§shutdown_strategy: ShutdownStrategy
Implementations§
impl PipelineBuilder
pub fn new() -> Self
Creates a new PipelineBuilder with empty collections for datasources, pipes, and metrics.
This method initializes a PipelineBuilder instance, allowing you to configure each component of a Pipeline before building it. The builder pattern offers flexibility in adding data sources, account and transaction handling pipes, deletion processing, and metrics collection features.
§Example
let builder = PipelineBuilder::new();
pub fn datasource(self, datasource: impl Datasource + 'static) -> Self
Adds a datasource to the pipeline.
The datasource is responsible for providing updates, such as account and transaction data, to the pipeline. Multiple datasources can be added to handle various types of updates.
§Parameters
- datasource: The data source to add, implementing the Datasource trait.
§Example
let builder = PipelineBuilder::new()
    .datasource(MyDatasource::new());
pub fn shutdown_strategy(self, shutdown_strategy: ShutdownStrategy) -> Self
Sets the shutdown strategy for the pipeline.
This method configures how the pipeline should handle shutdowns. The shutdown strategy defines whether the pipeline should terminate immediately or continue processing pending updates after terminating the data sources.
§Parameters
- shutdown_strategy: A variant of ShutdownStrategy that determines how the pipeline should handle shutdowns.
§Returns
Returns Self, allowing for method chaining.
§Notes
- Use ShutdownStrategy::Immediate to stop the entire pipeline instantly, including all active processing tasks.
- Use ShutdownStrategy::ProcessPending (the default) to terminate data sources first and allow the pipeline to finish processing any updates that are still pending.
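The difference between the two strategies can be sketched with a simple queue simulation. This is only an illustration of the behavior described above; `SketchShutdown` and `shut_down` are hypothetical names, and the real pipeline's shutdown logic is not shown in this documentation.

```rust
use std::collections::VecDeque;

/// Hypothetical mirror of the two strategies described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum SketchShutdown {
    Immediate,
    // Marked as the default, matching the note above.
    #[default]
    ProcessPending,
}

/// Simulates a shutdown request against a queue of pending updates and
/// returns the updates that were still processed afterwards.
fn shut_down(strategy: SketchShutdown, mut pending: VecDeque<u32>) -> Vec<u32> {
    match strategy {
        // Stop at once: whatever is still queued is dropped.
        SketchShutdown::Immediate => Vec::new(),
        // Sources are assumed to be stopped already; drain what is left.
        SketchShutdown::ProcessPending => pending.drain(..).collect(),
    }
}
```

In this model, Immediate trades completeness for a fast exit, while ProcessPending guarantees that every update already received is handled before the pipeline stops.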
pub fn account<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
Adds an account pipe to process account updates.
Account pipes decode and process updates to accounts within the pipeline. This method requires both an AccountDecoder and a Processor to handle decoded account data.
§Parameters
- decoder: An AccountDecoder that decodes the account data.
- processor: A Processor that processes the decoded account data.
§Example
let builder = PipelineBuilder::new()
    .account(MyAccountDecoder, MyAccountProcessor);
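The decoder and processor pairing can be sketched as two traits joined by a shared type, which is the role the generic parameter T plays in the signature above. This is a simplified, synchronous illustration; `SketchDecoder`, `SketchProcessor`, `SketchPipe`, `BalanceDecoder`, and `TotalProcessor` are hypothetical names, and the real AccountDecoder and Processor traits may look different.

```rust
/// Hypothetical decoder: turns raw account bytes into a typed value,
/// or returns None when the bytes are not in the expected layout.
trait SketchDecoder {
    type Output;
    fn decode(&self, raw: &[u8]) -> Option<Self::Output>;
}

/// Hypothetical processor: consumes one decoded value.
trait SketchProcessor {
    type Input;
    fn process(&mut self, input: Self::Input);
}

/// Decodes a little-endian u64 balance from exactly 8 bytes.
struct BalanceDecoder;

impl SketchDecoder for BalanceDecoder {
    type Output = u64;
    fn decode(&self, raw: &[u8]) -> Option<u64> {
        if raw.len() != 8 {
            return None;
        }
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(raw);
        Some(u64::from_le_bytes(bytes))
    }
}

/// Keeps a running total of every balance it sees.
#[derive(Default)]
struct TotalProcessor {
    total: u64,
}

impl SketchProcessor for TotalProcessor {
    type Input = u64;
    fn process(&mut self, input: u64) {
        self.total += input;
    }
}

/// A pipe pairs a decoder with a processor of the matching type.
struct SketchPipe<D, P> {
    decoder: D,
    processor: P,
}

impl<D, P> SketchPipe<D, P>
where
    D: SketchDecoder,
    P: SketchProcessor<Input = D::Output>,
{
    /// Returns true when the update was decoded and handed to the processor.
    fn run(&mut self, raw: &[u8]) -> bool {
        match self.decoder.decode(raw) {
            Some(decoded) => {
                self.processor.process(decoded);
                true
            }
            None => false,
        }
    }
}
```

The `Input = D::Output` bound means a mismatched decoder and processor fail at compile time rather than at runtime.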
pub fn account_deletions(
    self,
    processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
) -> Self
Adds an account deletion pipe to handle account deletion events.
Account deletion pipes process deletions of accounts, with a Processor to handle the deletion events as they occur.
§Parameters
- processor: A Processor that processes account deletion events.
§Example
let builder = PipelineBuilder::new()
    .account_deletions(MyAccountDeletionProcessor);
pub fn instruction<T: Send + Sync + 'static>(
    self,
    decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
    processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
) -> Self
Adds an instruction pipe to process instructions within transactions.
Instruction pipes decode and process individual instructions, enabling specialized handling of various instruction types.
§Parameters
- decoder: An InstructionDecoder for decoding instructions from transaction data.
- processor: A Processor that processes decoded instruction data.
§Example
let builder = PipelineBuilder::new()
    .instruction(MyDecoder, MyInstructionProcessor);
pub fn transaction<T, U>(
    self,
    processor: impl Processor<InputType = TransactionProcessorInputType<T, U>> + Send + Sync + 'static,
    schema: Option<TransactionSchema<T>>,
) -> Self
Adds a transaction pipe for processing full transaction data.
This method takes a Processor to handle the decoded transaction data and an optional TransactionSchema used to match transactions. Pass None to omit the schema.
§Parameters
- processor: A Processor that processes the decoded transaction data.
- schema: An optional TransactionSchema used to match and interpret transaction data.
§Example
let builder = PipelineBuilder::new()
    .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
pub fn metrics(self, metrics: Arc<dyn Metrics>) -> Self
Adds a metrics component to the pipeline for performance tracking.
This component collects and reports on pipeline metrics, providing insights into performance and operational statistics.
§Parameters
metrics
: An instance of a Metrics implementation, used to gather and report metrics.
§Example
let builder = PipelineBuilder::new()
    .metrics(Arc::new(LogMetrics::new()));
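Since metrics can be called more than once, each registered backend presumably receives every recorded value. Below is a minimal sketch of such a fan-out collection; `SketchMetrics`, `CountingMetrics`, and `SketchCollection` are hypothetical names, and the actual Metrics trait and MetricsCollection API are assumptions not confirmed by this page.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical metrics trait; the real `Metrics` trait may differ.
trait SketchMetrics: Send + Sync {
    fn increment(&self, name: &str, by: u64);
}

/// Counts every increment it receives, ignoring the metric name.
#[derive(Default)]
struct CountingMetrics {
    seen: AtomicU64,
}

impl SketchMetrics for CountingMetrics {
    fn increment(&self, _name: &str, by: u64) {
        self.seen.fetch_add(by, Ordering::Relaxed);
    }
}

/// Hypothetical collection that forwards each call to every backend.
#[derive(Default)]
struct SketchCollection {
    backends: Vec<Arc<dyn SketchMetrics>>,
}

impl SketchCollection {
    fn add(mut self, backend: Arc<dyn SketchMetrics>) -> Self {
        self.backends.push(backend);
        self
    }

    fn increment(&self, name: &str, by: u64) {
        for backend in &self.backends {
            backend.increment(name, by);
        }
    }
}

/// Registers two backends, records one metric, and returns what each saw.
fn fan_out() -> (u64, u64) {
    let first = Arc::new(CountingMetrics::default());
    let second = Arc::new(CountingMetrics::default());
    let collection = SketchCollection::default()
        .add(first.clone())
        .add(second.clone());
    collection.increment("updates_processed", 3);
    (
        first.seen.load(Ordering::Relaxed),
        second.seen.load(Ordering::Relaxed),
    )
}
```

Holding the backends behind Arc lets the caller keep its own handle to a backend, for example to read counters in a test, while the collection shares it.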
pub fn metrics_flush_interval(self, interval: u64) -> Self
Sets the interval for flushing metrics data.
This value defines the frequency, in seconds, at which metrics data is flushed from memory. If not set, a default interval is used.
§Parameters
- interval: The flush interval for metrics, in seconds.
§Example
let builder = PipelineBuilder::new()
    .metrics_flush_interval(60);
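One way an Option<u64> of seconds can be resolved into a concrete interval is sketched below. The fallback constant is a made-up value for illustration: this documentation does not state the real default, and `resolve_flush_interval` is a hypothetical helper.

```rust
use std::time::Duration;

/// Hypothetical fallback; the real default used by the pipeline is not
/// stated in this documentation.
const ASSUMED_DEFAULT_FLUSH_SECS: u64 = 5;

/// Resolves an optional interval in seconds into a concrete Duration,
/// falling back to the assumed default when none was configured.
fn resolve_flush_interval(configured: Option<u64>) -> Duration {
    Duration::from_secs(configured.unwrap_or(ASSUMED_DEFAULT_FLUSH_SECS))
}
```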
pub fn build(self) -> CarbonResult<Pipeline>
Builds and returns a Pipeline configured with the specified components.
After configuring the PipelineBuilder with data sources, pipes, and metrics, call this method to create the final Pipeline instance ready for operation.
§Returns
Returns a CarbonResult<Pipeline> containing the configured Pipeline, or an error if any part of the configuration is invalid.
§Example
carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor
    )
    .account(
        TestProgramDecoder,
        TestProgramAccountProcessor
    )
    .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    .account_deletions(TestProgramAccountDeletionProcessor)
    .build()?
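The fallible build step and the trailing `?` can be sketched as follows. The validation rule here (at least one datasource) is an assumption chosen purely for illustration; this page does not say what build actually checks. `ValidatingBuilder`, `SketchPipeline`, and `SketchError` are hypothetical names.

```rust
use std::fmt;

/// Hypothetical error type standing in for the error side of `CarbonResult`.
#[derive(Debug, PartialEq)]
enum SketchError {
    NoDatasource,
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::NoDatasource => write!(f, "no datasource configured"),
        }
    }
}

impl std::error::Error for SketchError {}

#[derive(Debug, PartialEq)]
struct SketchPipeline {
    datasources: Vec<String>,
}

#[derive(Default)]
struct ValidatingBuilder {
    datasources: Vec<String>,
}

impl ValidatingBuilder {
    fn datasource(mut self, name: &str) -> Self {
        self.datasources.push(name.to_string());
        self
    }

    /// Consumes the builder. The emptiness check is an assumed rule,
    /// included only to show where validation would live.
    fn build(self) -> Result<SketchPipeline, SketchError> {
        if self.datasources.is_empty() {
            return Err(SketchError::NoDatasource);
        }
        Ok(SketchPipeline {
            datasources: self.datasources,
        })
    }
}

/// Builds a pipeline from a list of names, forwarding any build error.
fn build_named(names: &[&str]) -> Result<SketchPipeline, SketchError> {
    let mut builder = ValidatingBuilder::default();
    for name in names {
        builder = builder.datasource(name);
    }
    // `?` returns early with the error, as in the `.build()?` call above.
    let pipeline = builder.build()?;
    Ok(pipeline)
}
```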
Trait Implementations§
impl Default for PipelineBuilder
fn default() -> PipelineBuilder
Auto Trait Implementations§
impl Freeze for PipelineBuilder
impl !RefUnwindSafe for PipelineBuilder
impl Send for PipelineBuilder
impl Sync for PipelineBuilder
impl Unpin for PipelineBuilder
impl !UnwindSafe for PipelineBuilder
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.