pub struct Pipeline {
pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
pub account_pipes: Vec<Box<dyn AccountPipes>>,
pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
pub metrics: Arc<MetricsCollection>,
pub metrics_flush_interval: Option<u64>,
pub datasource_cancellation_token: Option<CancellationToken>,
pub shutdown_strategy: ShutdownStrategy,
pub channel_buffer_size: usize,
}
Represents the primary data processing pipeline in the carbon-core framework.
The Pipeline struct orchestrates the flow of data from various sources, processes it through multiple pipes (for accounts, account deletions, block details, instructions, and transactions), and records metrics at each stage. This flexible design allows for customized data processing, handling a variety of update types with minimal boilerplate code.
§Overview
A Pipeline instance includes collections of data sources and processing pipes, enabling users to configure the pipeline to handle diverse types of blockchain-related data. Each pipe is responsible for decoding, processing, and routing specific data types, while the metrics system records relevant statistics.
§Key Concepts
- Datasources: These provide the raw data, such as account updates, transaction details, and account deletions.
- Pipes: Modular units that handle specific data types:
  - AccountPipes for account updates.
  - AccountDeletionPipes for account deletions.
  - BlockDetailsPipes for block details.
  - InstructionPipes for instruction data within transactions.
  - TransactionPipes for entire transaction payloads.
- Metrics: Collect performance data, enabling real-time insights and efficient monitoring.
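The relationship between updates and pipes can be illustrated with a small standard-library sketch. Everything named here (Update, Pipe, the three pipe structs, and route) is a hypothetical stand-in and not the carbon-core API; the sketch only assumes that each pipe reacts to one kind of update and ignores the rest.

```rust
// Hypothetical stand-ins for illustration; these are not the carbon-core types.
#[derive(Debug)]
enum Update {
    Account { pubkey: String },
    AccountDeletion { pubkey: String },
    Transaction { signature: String },
}

/// A pipe inspects an update and returns a log line if it handles that kind.
trait Pipe {
    fn handle(&self, update: &Update) -> Option<String>;
}

struct AccountPipe;
struct AccountDeletionPipe;
struct TransactionPipe;

impl Pipe for AccountPipe {
    fn handle(&self, update: &Update) -> Option<String> {
        match update {
            Update::Account { pubkey } => Some(format!("account updated: {}", pubkey)),
            _ => None,
        }
    }
}

impl Pipe for AccountDeletionPipe {
    fn handle(&self, update: &Update) -> Option<String> {
        match update {
            Update::AccountDeletion { pubkey } => Some(format!("account deleted: {}", pubkey)),
            _ => None,
        }
    }
}

impl Pipe for TransactionPipe {
    fn handle(&self, update: &Update) -> Option<String> {
        match update {
            Update::Transaction { signature } => Some(format!("transaction seen: {}", signature)),
            _ => None,
        }
    }
}

/// Offers every update to every pipe, in order, and collects the log lines
/// produced by the pipes that handled it.
fn route(pipes: &[Box<dyn Pipe>], updates: &[Update]) -> Vec<String> {
    let mut log = Vec::new();
    for update in updates {
        for pipe in pipes {
            if let Some(line) = pipe.handle(update) {
                log.push(line);
            }
        }
    }
    log
}
```

Storing the pipes as Box<dyn Pipe> mirrors the Vec<Box<dyn ...>> fields of the struct: one collection can hold differently typed handlers behind a shared trait.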
§Fields
- datasources: A vector of data sources (Datasource implementations) that provide the data for processing. Each data source must be wrapped in an Arc for safe, concurrent access.
- account_pipes: A vector of AccountPipes, each responsible for handling account updates.
- account_deletion_pipes: A vector of AccountDeletionPipes to handle deletion events.
- block_details_pipes: A vector of BlockDetailsPipes to handle block details.
- instruction_pipes: A vector of InstructionPipes for processing instructions within transactions. These pipes work with nested instructions and are generically defined to support varied instruction types.
- transaction_pipes: A vector of TransactionPipes responsible for processing complete transaction payloads.
- metrics: A MetricsCollection holding the Metrics implementations that record and track performance data. The collection is managed within an Arc to ensure thread safety.
- metrics_flush_interval: An optional interval, in seconds, defining how frequently metrics should be flushed. If None, the default interval is used.
- channel_buffer_size: The size of the channel buffer for the pipeline. If not set, a default size of 10_000 will be used.
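To show what a channel buffer size means in practice, here is a minimal sketch using the standard library's bounded channel. It is not how carbon-core wires its channel: std::sync::mpsc::sync_channel is used as a stand-in, and effective_buffer_size and accepted_before_full are hypothetical helpers. The only value taken from the description above is the 10_000 default.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Default taken from the channel_buffer_size description above.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;

/// Resolves an optional, user-supplied buffer size to the effective one.
/// (Hypothetical helper; models "if not set, use the default".)
fn effective_buffer_size(configured: Option<usize>) -> usize {
    configured.unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE)
}

/// Pushes values into a bounded channel without ever receiving, and returns
/// how many were accepted before the buffer reported that it was full.
fn accepted_before_full(buffer_size: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so that sends fail with Full, not Disconnected.
    let (tx, _rx) = sync_channel::<u64>(buffer_size);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i as u64) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A bounded buffer trades memory for backpressure: once it is full, producers must wait (or, with try_send as here, are told to back off) until the consumer catches up.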
§Example
use std::sync::Arc;

carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor
    )
    .account(
        TestProgramDecoder,
        TestProgramAccountProcessor
    )
    .transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
    .account_deletions(TestProgramAccountDeletionProcessor)
    .channel_buffer_size(1000)
    .build()?
    .run()
    .await?;
§Notes
- Ensure that each data source and pipe implements the required traits, such as Datasource, AccountPipes, and Metrics, as appropriate.
- The pipeline is designed for concurrent operation, utilizing Arc and Box types to handle shared ownership and trait object storage.
- The metrics_flush_interval controls how frequently the pipeline’s metrics are flushed. If None, a default interval (usually 5 seconds) is used.
Fields§
§datasources: Vec<Arc<dyn Datasource + Send + Sync>>
§account_pipes: Vec<Box<dyn AccountPipes>>
§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
§block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>
§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
§metrics: Arc<MetricsCollection>
§metrics_flush_interval: Option<u64>
§datasource_cancellation_token: Option<CancellationToken>
§shutdown_strategy: ShutdownStrategy
§channel_buffer_size: usize
Implementations§
impl Pipeline
pub fn builder() -> PipelineBuilder
Creates a new PipelineBuilder instance for constructing a Pipeline.
The builder method returns a PipelineBuilder that allows you to configure and customize the pipeline components before building the final Pipeline object. This approach provides a flexible and type-safe way to assemble a pipeline by specifying data sources, processing pipes, and metrics.
§Example
use std::sync::Arc;

carbon_core::pipeline::Pipeline::builder()
    .datasource(transaction_crawler)
    .metrics(Arc::new(LogMetrics::new()))
    .metrics(Arc::new(PrometheusMetrics::new()))
    .instruction(
        TestProgramDecoder,
        TestProgramProcessor
    );
// ...
§Returns
Returns a PipelineBuilder instance with empty collections for data sources, pipes, and metrics. You can then configure each component using the builder pattern.
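For readers unfamiliar with the builder pattern, the following is a minimal standard-library sketch of the idea. SketchPipeline and SketchPipelineBuilder are hypothetical types, components are modelled as plain strings, and the rule that build fails without a data source is an assumption of the sketch, not a documented behaviour of PipelineBuilder.

```rust
// Hypothetical types for illustration; not the carbon-core builder.
#[derive(Debug)]
struct SketchPipeline {
    datasources: Vec<String>,
    pipes: Vec<String>,
    metrics: Vec<String>,
}

/// Starts with empty collections, as the Returns section describes.
#[derive(Debug, Default)]
struct SketchPipelineBuilder {
    datasources: Vec<String>,
    pipes: Vec<String>,
    metrics: Vec<String>,
}

impl SketchPipeline {
    fn builder() -> SketchPipelineBuilder {
        SketchPipelineBuilder::default()
    }
}

impl SketchPipelineBuilder {
    // Each method takes `self` by value and returns it, which enables chaining.
    fn datasource(mut self, name: &str) -> Self {
        self.datasources.push(name.to_string());
        self
    }

    fn pipe(mut self, name: &str) -> Self {
        self.pipes.push(name.to_string());
        self
    }

    fn metrics(mut self, name: &str) -> Self {
        self.metrics.push(name.to_string());
        self
    }

    /// Validates the configuration and produces the final value.
    /// Assumption of this sketch: at least one data source is required.
    fn build(self) -> Result<SketchPipeline, String> {
        if self.datasources.is_empty() {
            return Err("at least one datasource is required".to_string());
        }
        Ok(SketchPipeline {
            datasources: self.datasources,
            pipes: self.pipes,
            metrics: self.metrics,
        })
    }
}
```

Because every configuration method consumes and returns the builder, calls can be chained in any order, and validation is deferred to a single build step that returns a Result.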
pub async fn run(&mut self) -> CarbonResult<()>
Runs the Pipeline, processing updates from data sources and handling metrics.
The run method initializes the pipeline’s metrics system and starts listening for updates from the configured data sources. It checks the types of updates provided by each data source to ensure that the required data types are available for processing. The method then enters a loop where it processes each update received from the data sources in turn, logging and updating metrics based on the success or failure of each operation.
§How it Works
- Initializes metrics and sets up an interval for periodic metric flushing.
- Spawns tasks for each data source to continuously consume updates.
- Processes updates according to their type (e.g., Account, Transaction, or AccountDeletion).
- Records performance metrics such as update processing times, and tracks success and failure counts.
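The steps listed above can be modelled with the standard library alone. This is a sketch under stated assumptions, not the carbon-core implementation: OS threads stand in for the spawned tasks, std::sync::mpsc::sync_channel stands in for the pipeline's channel, and UpdateKind, run_sketch, and the metric names are hypothetical. Unlike the real run loop, the sketch ends once every source is exhausted, and it omits periodic metric flushing.

```rust
use std::collections::HashMap;
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical update kinds; payloads are omitted to keep the sketch short.
#[derive(Debug, Clone, Copy)]
enum UpdateKind {
    Account,
    Transaction,
    AccountDeletion,
}

/// Spawns one producer thread per source, consumes all updates through a
/// bounded channel, and returns a count of processed updates per kind.
fn run_sketch(sources: Vec<Vec<UpdateKind>>, buffer: usize) -> HashMap<&'static str, u64> {
    let (tx, rx) = sync_channel::<UpdateKind>(buffer);

    // One thread per data source, each pushing its updates into the channel.
    let mut handles = Vec::new();
    for source in sources {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for update in source {
                // Stop producing if the consumer has gone away.
                if tx.send(update).is_err() {
                    break;
                }
            }
        }));
    }
    // Drop the original sender so the receive loop ends when producers finish.
    drop(tx);

    // Process updates by kind and record a simple counter metric for each.
    let mut metrics: HashMap<&'static str, u64> = HashMap::new();
    for update in rx {
        let key = match update {
            UpdateKind::Account => "account_updates",
            UpdateKind::Transaction => "transaction_updates",
            UpdateKind::AccountDeletion => "account_deletions",
        };
        *metrics.entry(key).or_insert(0) += 1;
    }

    for handle in handles {
        handle.join().expect("datasource thread panicked");
    }
    metrics
}
```

Funnelling every source into one channel keeps processing sequential on the consumer side, which is why the counters need no locking in this sketch.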
§Errors
The method returns an Err variant if:
- Required update types (e.g., AccountUpdate, AccountDeletion, Transaction) are not provided by any data source, causing a mismatch in expected data processing capabilities.
- A data source encounters an error while consuming updates.
- An error occurs during metrics flushing or processing of updates.
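The first error condition, a required update type that no data source provides, can be illustrated with a small set-based check. UpdateType, missing_update_types, and check_update_types are hypothetical names for this sketch; how carbon-core performs the check internally is not shown in this documentation.

```rust
use std::collections::HashSet;

// Hypothetical update types, named after the examples in the list above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum UpdateType {
    AccountUpdate,
    AccountDeletion,
    Transaction,
}

/// Returns the required update types that none of the sources provides,
/// in the order in which they were required.
fn missing_update_types(
    provided_by_sources: &[Vec<UpdateType>],
    required: &[UpdateType],
) -> Vec<UpdateType> {
    // Union of everything any source can deliver.
    let provided: HashSet<UpdateType> =
        provided_by_sources.iter().flatten().copied().collect();
    required
        .iter()
        .copied()
        .filter(|update_type| !provided.contains(update_type))
        .collect()
}

/// Turns the check into a Result, the shape a run-style method would return.
fn check_update_types(
    provided_by_sources: &[Vec<UpdateType>],
    required: &[UpdateType],
) -> Result<(), String> {
    let missing = missing_update_types(provided_by_sources, required);
    if missing.is_empty() {
        Ok(())
    } else {
        Err(format!("no datasource provides: {:?}", missing))
    }
}
```

Checking up front means a misconfigured pipeline fails at startup instead of silently never invoking one of its pipes.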
§Example
use std::sync::Arc;
use carbon_core::pipeline::Pipeline;

let mut pipeline = Pipeline::builder()
    .datasource(MyDatasource::new())
    // Metrics are passed wrapped in an Arc, as in the examples above.
    .metrics(Arc::new(MyMetrics::new()))
    .build()
    .expect("Failed to build pipeline");

// Run the pipeline asynchronously on a spawned Tokio task.
tokio::spawn(async move {
    if let Err(e) = pipeline.run().await {
        eprintln!("Pipeline run error: {:?}", e);
    }
});
§Notes
- This method is asynchronous and should be awaited within a Tokio runtime environment.
- The pipeline monitors metrics and flushes them based on the configured metrics_flush_interval.
- The run method operates in an infinite loop, handling updates until a termination condition occurs.
Auto Trait Implementations§
impl Freeze for Pipeline
impl !RefUnwindSafe for Pipeline
impl Send for Pipeline
impl Sync for Pipeline
impl Unpin for Pipeline
impl !UnwindSafe for Pipeline
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.