pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub shutdown_strategy: ShutdownStrategy,
}
Represents the primary data processing pipeline in the carbon-core framework.
The Pipeline struct is responsible for orchestrating the flow of data from various
sources, processing it through multiple pipes (for accounts, transactions, instructions,
and account deletions), and recording metrics at each stage. This flexible design
allows for customized data processing, handling a variety of update types with minimal
boilerplate code.
§Overview
A Pipeline instance includes collections of data sources and processing pipes, enabling
users to configure the pipeline to handle diverse types of blockchain-related data. Each
pipe is responsible for decoding, processing, and routing specific data types, while the
metrics system records relevant statistics.
§Key Concepts
- Datasources: These provide the raw data, such as account updates, transaction details, and account deletions.
- Pipes: Modular units that handle specific data types:
  - AccountPipes for account updates.
  - AccountDeletionPipes for account deletions.
  - InstructionPipes for instruction data within transactions.
  - TransactionPipes for entire transaction payloads.
- Metrics: Collect performance data, enabling real-time insights and efficient monitoring.
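The pipe architecture above can be sketched in miniature. The following is an illustrative model only: the trait and type names mirror carbon-core's concepts, but the signatures are simplified assumptions, not the crate's real API.

```rust
// Raw updates a datasource can emit (simplified stand-in enum).
enum Update {
    Account(String),
    Transaction(String),
    AccountDeletion(String),
}

// Each pipe category handles one kind of update.
trait AccountPipe {
    fn handle(&self, account: &str) -> Result<(), String>;
}
trait TransactionPipe {
    fn handle(&self, tx: &str) -> Result<(), String>;
}

struct LoggingAccountPipe;
impl AccountPipe for LoggingAccountPipe {
    fn handle(&self, account: &str) -> Result<(), String> {
        println!("account update: {account}");
        Ok(())
    }
}

struct LoggingTransactionPipe;
impl TransactionPipe for LoggingTransactionPipe {
    fn handle(&self, tx: &str) -> Result<(), String> {
        println!("transaction: {tx}");
        Ok(())
    }
}

// The pipeline stores pipes as boxed trait objects and routes each
// update to every pipe of the matching category.
struct MiniPipeline {
    account_pipes: Vec<Box<dyn AccountPipe>>,
    transaction_pipes: Vec<Box<dyn TransactionPipe>>,
}

impl MiniPipeline {
    fn process(&self, update: Update) -> Result<(), String> {
        match update {
            Update::Account(a) => {
                for pipe in &self.account_pipes {
                    pipe.handle(&a)?;
                }
            }
            Update::Transaction(t) => {
                for pipe in &self.transaction_pipes {
                    pipe.handle(&t)?;
                }
            }
            Update::AccountDeletion(a) => println!("deletion: {a}"),
        }
        Ok(())
    }
}

fn main() {
    let pipeline = MiniPipeline {
        account_pipes: vec![Box::new(LoggingAccountPipe)],
        transaction_pipes: vec![Box::new(LoggingTransactionPipe)],
    };
    pipeline.process(Update::Account("acc1".to_string())).unwrap();
    pipeline.process(Update::Transaction("tx1".to_string())).unwrap();
}
```

The key design point carried over from the real struct is that pipes live behind `Box<dyn Trait>`, so heterogeneous processors can share one collection.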
§Fields
- datasources: A vector of data sources (Datasource implementations) that provide the data for processing. Each data source must be wrapped in an Arc for safe, concurrent access.
- account_pipes: A vector of AccountPipes, each responsible for handling account updates.
- account_deletion_pipes: A vector of AccountDeletionPipes to handle deletion events.
- instruction_pipes: A vector of InstructionPipes for processing instructions within transactions. These pipes work with nested instructions and are generically defined to support varied instruction types.
- transaction_pipes: A vector of TransactionPipes responsible for processing complete transaction payloads.
- metrics: A MetricsCollection holding the Metrics implementations used to record and track performance data, wrapped in an Arc to ensure thread safety.
- metrics_flush_interval: An optional interval, in seconds, defining how frequently metrics should be flushed. If None, the default interval is used.
- shutdown_strategy: The ShutdownStrategy controlling how the pipeline terminates when shutdown is requested.
§Example
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
)
.account(
TestProgramDecoder,
TestProgramAccountProcessor
)
.transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
.account_deletions(TestProgramAccountDeletionProcessor)
.build()?
.run()
    .await?;
§Notes
- Ensure that each data source and pipe implements the required traits, such as Datasource, AccountPipes, and Metrics, as appropriate.
- The pipeline is designed for concurrent operation, utilizing Arc and Box types to handle shared ownership and trait object storage.
- The metrics_flush_interval controls how frequently the pipeline's metrics are flushed. If None, a default interval (usually 5 seconds) is used.
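The `Arc<dyn Datasource + Send + Sync>` bound exists so the pipeline can hand the same datasource to several concurrently running workers. The sketch below uses a stand-in `Datasource` trait (not carbon-core's) and plain threads to show the ownership pattern:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for carbon-core's Datasource trait; the Send + Sync supertraits
// are what make sharing across threads legal.
trait Datasource: Send + Sync {
    fn next_update(&self) -> String;
}

struct FixedSource;
impl Datasource for FixedSource {
    fn next_update(&self) -> String {
        "update".to_string()
    }
}

fn main() {
    // One shared, reference-counted trait object.
    let source: Arc<dyn Datasource + Send + Sync> = Arc::new(FixedSource);

    // Each worker clones the Arc (cheap pointer copy) and reads from the
    // same underlying datasource.
    let handles: Vec<_> = (0..3)
        .map(|i| {
            let src = Arc::clone(&source);
            thread::spawn(move || format!("worker {i}: {}", src.next_update()))
        })
        .collect();

    for h in handles {
        println!("{}", h.join().unwrap());
    }
}
```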
Implementations§
impl Pipeline
pub fn builder() -> PipelineBuilder
Creates a new PipelineBuilder instance for constructing a Pipeline.
The builder method returns a PipelineBuilder that allows you to configure
and customize the pipeline components before building the final Pipeline object.
This approach provides a flexible and type-safe way to assemble a pipeline
by specifying data sources, processing pipes, and metrics.
§Example
carbon_core::pipeline::Pipeline::builder()
.datasource(transaction_crawler)
.metrics(Arc::new(LogMetrics::new()))
.metrics(Arc::new(PrometheusMetrics::new()))
.instruction(
TestProgramDecoder,
TestProgramProcessor
)
    // ...
§Returns
Returns a PipelineBuilder instance with empty collections for data sources,
pipes, and metrics. You can then configure each component using the builder pattern.
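The builder pattern described here can be illustrated with a stripped-down version. The names and the validation rule in `build()` are illustrative assumptions, not `PipelineBuilder`'s actual behavior:

```rust
// Builder starts with empty collections and accumulates configuration.
#[derive(Default)]
struct MiniPipelineBuilder {
    datasources: Vec<String>,
    metrics: Vec<String>,
}

struct MiniPipeline {
    datasources: Vec<String>,
    metrics: Vec<String>,
}

impl MiniPipelineBuilder {
    fn datasource(mut self, name: &str) -> Self {
        self.datasources.push(name.to_string());
        self
    }
    fn metrics(mut self, name: &str) -> Self {
        self.metrics.push(name.to_string());
        self
    }
    // build() is the one place that can reject an incomplete configuration,
    // which is why it returns a Result rather than the pipeline directly.
    fn build(self) -> Result<MiniPipeline, String> {
        if self.datasources.is_empty() {
            return Err("at least one datasource is required".to_string());
        }
        Ok(MiniPipeline {
            datasources: self.datasources,
            metrics: self.metrics,
        })
    }
}

fn main() {
    let pipeline = MiniPipelineBuilder::default()
        .datasource("transaction_crawler")
        .metrics("log_metrics")
        .build()
        .expect("valid configuration");
    println!(
        "{} datasource(s), {} metrics sink(s)",
        pipeline.datasources.len(),
        pipeline.metrics.len()
    );
}
```

Consuming `self` in each method is what allows the fluent `.datasource(...).metrics(...).build()` chain seen in the example above.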
pub async fn run(&mut self) -> CarbonResult<()>
Runs the Pipeline, processing updates from data sources and handling metrics.
The run method initializes the pipeline’s metrics system and starts listening for
updates from the configured data sources. It checks the types of updates provided
by each data source to ensure that the required data types are available for
processing. The method then enters a loop where it processes each update received
from the data sources in turn, logging and updating metrics based on the success
or failure of each operation.
§How it Works
- Initializes metrics and sets up an interval for periodic metric flushing.
- Spawns tasks for each data source to continuously consume updates.
- Processes updates according to their type (e.g., Account, Transaction, or AccountDeletion).
- Records performance metrics such as update processing times, and tracks success and failure counts.
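The loop structure above can be modeled synchronously with std channels standing in for the async datasource tasks. This is a sketch of the control flow only; `process` and the `Update` enum are simplified assumptions:

```rust
use std::sync::mpsc;
use std::thread;

enum Update {
    Account(String),
    Transaction(String),
    AccountDeletion(String),
}

// Stand-in for routing an update through the matching pipes.
fn process(update: &Update) -> Result<(), String> {
    match update {
        Update::Account(a) if a.is_empty() => Err("empty account".to_string()),
        _ => Ok(()),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // One producer thread per "datasource", each sending updates into the
    // shared channel.
    let producer = thread::spawn(move || {
        tx.send(Update::Account("acc".to_string())).unwrap();
        tx.send(Update::Transaction("tx".to_string())).unwrap();
        tx.send(Update::AccountDeletion("gone".to_string())).unwrap();
        // Dropping tx closes the channel, which ends the loop below.
    });

    // Main loop: route each update and track success/failure counts,
    // a stand-in for the metrics the real pipeline records.
    let (mut ok, mut failed) = (0u64, 0u64);
    for update in rx {
        match process(&update) {
            Ok(()) => ok += 1,
            Err(e) => {
                failed += 1;
                eprintln!("update failed: {e}");
            }
        }
    }
    producer.join().unwrap();
    println!("processed ok={ok} failed={failed}");
}
```

In the real pipeline the producers are spawned async tasks and the loop also drives the periodic metrics flush.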
§Errors
The method returns an Err variant if:
- Required update types (e.g., AccountUpdate, AccountDeletion, Transaction) are not provided by any data source, causing a mismatch in expected data processing capabilities.
- A data source encounters an error while consuming updates.
- An error occurs during metrics flushing or processing of updates.
§Example
use carbon_core::Pipeline;
let mut pipeline = Pipeline::builder()
.datasource(MyDatasource::new())
.metrics(MyMetrics::new())
.build()
.expect("Failed to build pipeline");
// Running the pipeline asynchronously
tokio::spawn(async move {
if let Err(e) = pipeline.run().await {
eprintln!("Pipeline run error: {:?}", e);
}
});
§Notes
- This method is asynchronous and should be awaited within a Tokio runtime environment.
- The pipeline monitors metrics and flushes them based on the configured metrics_flush_interval.
- The run method operates in an infinite loop, handling updates until a termination condition occurs.
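The interval-based flushing can be sketched with `std::time::Instant`; carbon-core drives this with an async timer, but the elapsed-time logic is the same. `MetricsFlusher` and its methods are hypothetical names for illustration:

```rust
use std::time::{Duration, Instant};

struct MetricsFlusher {
    interval: Duration,
    last_flush: Instant,
    pending: u64,
}

impl MetricsFlusher {
    fn new(interval_secs: Option<u64>) -> Self {
        // If None, fall back to a default interval (the notes above say the
        // default is usually 5 seconds).
        let secs = interval_secs.unwrap_or(5);
        Self {
            interval: Duration::from_secs(secs),
            last_flush: Instant::now(),
            pending: 0,
        }
    }

    fn record(&mut self) {
        self.pending += 1;
    }

    // Called on every loop iteration; flushes only when the configured
    // interval has elapsed since the last flush.
    fn maybe_flush(&mut self) -> bool {
        if self.last_flush.elapsed() >= self.interval {
            println!("flushing {} metric(s)", self.pending);
            self.pending = 0;
            self.last_flush = Instant::now();
            true
        } else {
            false
        }
    }
}

fn main() {
    // A zero-second interval forces an immediate flush for demonstration.
    let mut flusher = MetricsFlusher::new(Some(0));
    flusher.record();
    let flushed = flusher.maybe_flush();
    println!("flushed={flushed}, pending={}", flusher.pending);
}
```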
Auto Trait Implementations§
impl Freeze for Pipeline
impl !RefUnwindSafe for Pipeline
impl Send for Pipeline
impl Sync for Pipeline
impl Unpin for Pipeline
impl !UnwindSafe for Pipeline
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true; otherwise converts self into a Right variant.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true; otherwise converts self into a Right variant.