pub struct Pipeline {
pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
pub account_pipes: Vec<Box<dyn AccountPipes>>,
pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
pub metrics: Arc<MetricsCollection>,
pub metrics_flush_interval: Option<u64>,
pub datasource_cancellation_token: Option<CancellationToken>,
pub shutdown_strategy: ShutdownStrategy,
pub channel_buffer_size: usize,
}
Represents the primary data processing pipeline in the carbon-core
framework.
The Pipeline struct orchestrates the flow of data from one or more
sources, passes it through the configured pipes (for accounts, account
deletions, block details, instructions, and transactions), and records
metrics at each stage. This design allows for customized data processing,
handling a variety of update types with minimal boilerplate code.
§Overview
A Pipeline instance includes collections of data sources and processing
pipes, enabling users to configure the pipeline to handle diverse types of
blockchain-related data. Each pipe is responsible for decoding, processing,
and routing specific data types, while the metrics system records relevant
statistics.
§Key Concepts
- Datasources: These provide the raw data, such as account updates, transaction details, and account deletions.
- Pipes: Modular units that handle specific data types:
  - AccountPipes for account updates.
  - AccountDeletionPipes for account deletions.
  - InstructionPipes for instruction data within transactions.
  - TransactionPipes for entire transaction payloads.
- Metrics: Collect performance data, enabling real-time insights and efficient monitoring.
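The relationship between updates and pipes can be illustrated with a minimal, standard-library-only sketch. The `Update` enum, the `Pipe` trait, and the `route` function below are hypothetical stand-ins invented for illustration; they are not the carbon-core types, which also decode and process their inputs.

```rust
// Hypothetical, simplified stand-ins; these are NOT the carbon-core types.
#[derive(Debug, Clone, PartialEq)]
enum Update {
    Account { pubkey: String },
    AccountDeletion { pubkey: String },
    Transaction { signature: String },
}

/// A pipe accepts exactly one kind of update and reports whether it did.
trait Pipe {
    fn name(&self) -> &'static str;
    fn handle(&mut self, update: &Update) -> bool;
    fn handled(&self) -> usize;
}

#[derive(Default)]
struct AccountPipe {
    handled: usize,
}

#[derive(Default)]
struct AccountDeletionPipe {
    handled: usize,
}

impl Pipe for AccountPipe {
    fn name(&self) -> &'static str {
        "account"
    }
    fn handle(&mut self, update: &Update) -> bool {
        let accepted = matches!(update, Update::Account { .. });
        if accepted {
            self.handled += 1;
        }
        accepted
    }
    fn handled(&self) -> usize {
        self.handled
    }
}

impl Pipe for AccountDeletionPipe {
    fn name(&self) -> &'static str {
        "account_deletion"
    }
    fn handle(&mut self, update: &Update) -> bool {
        let accepted = matches!(update, Update::AccountDeletion { .. });
        if accepted {
            self.handled += 1;
        }
        accepted
    }
    fn handled(&self) -> usize {
        self.handled
    }
}

/// Offers the update to every pipe and returns the names of the pipes that accepted it.
fn route(pipes: &mut [Box<dyn Pipe>], update: &Update) -> Vec<&'static str> {
    let mut accepted = Vec::new();
    for pipe in pipes.iter_mut() {
        if pipe.handle(update) {
            accepted.push(pipe.name());
        }
    }
    accepted
}
```

Storing the pipes as `Box<dyn Pipe>` mirrors the `Vec<Box<dyn ...>>` fields of the struct: each collection can hold pipes of different concrete types behind a single trait.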
§Fields
- datasources: A vector of data sources (Datasource implementations) that provide the data for processing. Each data source must be wrapped in an Arc for safe, concurrent access.
- account_pipes: A vector of AccountPipes, each responsible for handling account updates.
- account_deletion_pipes: A vector of AccountDeletionPipes to handle deletion events.
- block_details_pipes: A vector of BlockDetailsPipes to handle block details.
- instruction_pipes: A vector of InstructionPipes for processing instructions within transactions. These pipes work with nested instructions and are generically defined to support varied instruction types.
- transaction_pipes: A vector of TransactionPipes responsible for processing complete transaction payloads.
- metrics: A MetricsCollection holding the Metrics implementations that record and track performance data. The collection is managed within an Arc to ensure thread safety.
- metrics_flush_interval: An optional interval, in seconds, defining how frequently metrics should be flushed. If None, the default interval is used.
- channel_buffer_size: The size of the channel buffer for the pipeline. If not set, a default size of 10_000 will be used.
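As a rough sketch of how the two optional settings might resolve to concrete values, the helpers below apply the documented fallbacks. The function and constant names are hypothetical. The 10_000 buffer size comes from the field description above, and the 5 second interval is the "usual" default mentioned in the Notes for this struct, so treat both as assumptions about the actual implementation.

```rust
use std::time::Duration;

// Assumed defaults, taken from this page's prose rather than from the source code.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 10_000;
const DEFAULT_FLUSH_INTERVAL_SECS: u64 = 5;

/// Resolves an optional flush interval (in seconds) to a concrete Duration.
fn effective_flush_interval(configured: Option<u64>) -> Duration {
    Duration::from_secs(configured.unwrap_or(DEFAULT_FLUSH_INTERVAL_SECS))
}

/// Resolves an optional buffer size to the value a bounded channel would be created with.
fn effective_buffer_size(configured: Option<usize>) -> usize {
    configured.unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE)
}
```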
§Example
use std::sync::Arc;
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
)
.account(
TestProgramDecoder,
TestProgramAccountProcessor
)
.transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
.account_deletions(TestProgramAccountDeletionProcessor)
.channel_buffer_size(1000)
.build()?
.run()
.await?;
§Notes
- Ensure that each data source and pipe implements the required traits, such as Datasource, AccountPipes, and Metrics, as appropriate.
- The pipeline is designed for concurrent operation, utilizing Arc and Box types to handle shared ownership and trait object storage.
- The metrics_flush_interval controls how frequently the pipeline's metrics are flushed. If None, a default interval (usually 5 seconds) is used.
Fields§
§datasources: Vec<Arc<dyn Datasource + Send + Sync>>
§account_pipes: Vec<Box<dyn AccountPipes>>
§account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>
§block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>
§instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>
§transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>
§metrics: Arc<MetricsCollection>
§metrics_flush_interval: Option<u64>
§datasource_cancellation_token: Option<CancellationToken>
§shutdown_strategy: ShutdownStrategy
§channel_buffer_size: usize
Implementations§
impl Pipeline
pub fn builder() -> PipelineBuilder
Creates a new PipelineBuilder instance for constructing a Pipeline.
The builder method returns a PipelineBuilder that allows you to
configure and customize the pipeline components before building the
final Pipeline object. This approach provides a flexible and
type-safe way to assemble a pipeline by specifying data sources,
processing pipes, and metrics.
§Example
use std::sync::Arc;
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
);
// ...
§Returns
Returns a PipelineBuilder instance with empty collections for data
sources, pipes, and metrics. You can then configure each component
using the builder pattern.
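The builder pattern described here can be sketched with plain standard-library types. `SketchBuilder` and `SketchPipeline` are hypothetical names, components are represented by strings, and the rule that `build` rejects a pipeline without a data source is an assumption made for illustration; the real PipelineBuilder may validate differently.

```rust
// Hypothetical stand-ins for PipelineBuilder and Pipeline; strings replace real components.
#[derive(Debug, Default)]
struct SketchBuilder {
    datasources: Vec<String>,
    instruction_pipes: Vec<(String, String)>,
    channel_buffer_size: Option<usize>,
}

#[derive(Debug)]
struct SketchPipeline {
    datasources: Vec<String>,
    instruction_pipes: Vec<(String, String)>,
    channel_buffer_size: usize,
}

impl SketchBuilder {
    /// Starts with empty collections, as the real builder is documented to do.
    fn new() -> Self {
        Self::default()
    }

    /// Each setter takes `self` by value and returns it, which enables chaining.
    fn datasource(mut self, name: &str) -> Self {
        self.datasources.push(name.to_string());
        self
    }

    fn instruction(mut self, decoder: &str, processor: &str) -> Self {
        self.instruction_pipes
            .push((decoder.to_string(), processor.to_string()));
        self
    }

    fn channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = Some(size);
        self
    }

    /// Applies defaults and validates. The "at least one datasource" rule is an assumption.
    fn build(self) -> Result<SketchPipeline, String> {
        if self.datasources.is_empty() {
            return Err("no datasource configured".to_string());
        }
        Ok(SketchPipeline {
            datasources: self.datasources,
            instruction_pipes: self.instruction_pipes,
            channel_buffer_size: self.channel_buffer_size.unwrap_or(10_000),
        })
    }
}
```

Consuming `self` in every setter keeps the chain free of mutable borrows, and deferring validation to `build` gives a single place where a misconfigured pipeline can be reported.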
pub async fn run(&mut self) -> CarbonResult<()>
Runs the Pipeline, processing updates from data sources and handling
metrics.
The run method initializes the pipeline’s metrics system and starts
listening for updates from the configured data sources. It checks
the types of updates provided by each data source to ensure that the
required data types are available for processing. The method then
enters a loop where it processes each update received from the data
sources in turn, logging and updating metrics based on the success
or failure of each operation.
§How it Works
- Initializes metrics and sets up an interval for periodic metric flushing.
- Spawns tasks for each data source to continuously consume updates.
- Processes updates according to their type (e.g., Account, Transaction, or AccountDeletion).
- Records performance metrics such as update processing times, and tracks success and failure counts.
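The steps above can be approximated with a small producer/consumer sketch. It swaps in standard-library threads and a bounded `sync_channel` for the asynchronous tasks the real method uses, so that it runs without any dependencies. `SourceUpdate`, `Counters`, `process`, and `run_sketch` are hypothetical names, and the rule that a zero payload counts as a failure exists only to exercise the failure counter.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical update type; the payload is just a number.
#[derive(Debug)]
enum SourceUpdate {
    Account(u64),
    Transaction(u64),
}

#[derive(Debug, Default, PartialEq)]
struct Counters {
    accounts: usize,
    transactions: usize,
    failures: usize,
}

/// Stand-in for real processing: a zero payload is treated as a failure.
fn process(update: &SourceUpdate) -> Result<(), String> {
    match update {
        SourceUpdate::Account(0) | SourceUpdate::Transaction(0) => {
            Err("empty payload".to_string())
        }
        _ => Ok(()),
    }
}

/// Spawns one producer per source, then consumes updates until every producer is done.
fn run_sketch(sources: Vec<Vec<SourceUpdate>>, buffer: usize) -> Counters {
    let (tx, rx) = sync_channel::<SourceUpdate>(buffer);
    let mut handles = Vec::new();
    for source in sources {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            for update in source {
                // A send error means the receiver is gone, so stop producing.
                if tx.send(update).is_err() {
                    break;
                }
            }
        }));
    }
    // Drop the original sender so the receive loop ends once all producers finish.
    drop(tx);

    let mut counters = Counters::default();
    for update in rx {
        match (process(&update), &update) {
            (Err(_), _) => counters.failures += 1,
            (Ok(()), SourceUpdate::Account(_)) => counters.accounts += 1,
            (Ok(()), SourceUpdate::Transaction(_)) => counters.transactions += 1,
        }
    }
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    counters
}
```

The bounded channel plays the role of `channel_buffer_size`: when the consumer falls behind, producers block on `send` instead of queueing updates without limit.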
§Errors
The method returns an Err variant if:
- Required update types (e.g., AccountUpdate, AccountDeletion, Transaction) are not provided by any data source, causing a mismatch in expected data processing capabilities.
- A data source encounters an error while consuming updates.
- An error occurs during metrics flushing or processing of updates.
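The first error condition amounts to a set comparison between the update types the pipes need and the update types the data sources advertise. The sketch below shows one way to express that check. `UpdateKind`, `missing_update_kinds`, and `check_update_kinds` are hypothetical names, and the error is a plain `String` rather than the crate's own error type.

```rust
use std::collections::HashSet;

// Hypothetical enumeration of update types, named after the examples in the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum UpdateKind {
    AccountUpdate,
    AccountDeletion,
    Transaction,
}

/// Returns the required kinds that no data source provides, in the order they were required.
fn missing_update_kinds(
    required: &[UpdateKind],
    provided_by_sources: &[Vec<UpdateKind>],
) -> Vec<UpdateKind> {
    let provided: HashSet<UpdateKind> =
        provided_by_sources.iter().flatten().copied().collect();
    required
        .iter()
        .copied()
        .filter(|kind| !provided.contains(kind))
        .collect()
}

/// Fails when at least one required kind is missing from every data source.
fn check_update_kinds(
    required: &[UpdateKind],
    provided_by_sources: &[Vec<UpdateKind>],
) -> Result<(), String> {
    let missing = missing_update_kinds(required, provided_by_sources);
    if missing.is_empty() {
        Ok(())
    } else {
        Err(format!("no data source provides: {:?}", missing))
    }
}
```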
§Example
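use std::sync::Arc;
use carbon_core::pipeline::Pipeline;
// MyDatasource and MyMetrics stand in for your own Datasource and Metrics implementations.
let mut pipeline = Pipeline::builder()
.datasource(MyDatasource::new())
.metrics(Arc::new(MyMetrics::new()))
.build()
.expect("Failed to build pipeline");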
// Running the pipeline asynchronously
tokio::spawn(async move {
if let Err(e) = pipeline.run().await {
eprintln!("Pipeline run error: {:?}", e);
}
});
§Notes
- This method is asynchronous and should be awaited within a Tokio runtime environment.
- The pipeline monitors metrics and flushes them based on the configured metrics_flush_interval.
- The run method operates in an infinite loop, handling updates until a termination condition occurs.
Auto Trait Implementations§
impl Freeze for Pipeline
impl !RefUnwindSafe for Pipeline
impl Send for Pipeline
impl Sync for Pipeline
impl Unpin for Pipeline
impl !UnwindSafe for Pipeline
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.