carbon_core/pipeline.rs
//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A vector of `Metrics` implementations that gather and report
//!   on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.

use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        block_details::{BlockDetailsPipe, BlockDetailsPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, BlockDetails, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline will behave when it receives
/// a shutdown signal. It supports two modes:
///
/// - `Immediate`: Stops the entire pipeline, including all tasks, instantly.
/// - `ProcessPending`: Terminates the data sources, then completes processing
///   of any updates currently pending in the pipeline. This is the default
///   behavior.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Gracefully terminates the data sources and allows the
///   pipeline to finish processing updates that are still in progress or
///   queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
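///
/// # Example
///
/// A minimal sketch of a graceful, programmatic shutdown: the default
/// `ProcessPending` strategy paired with a cancellation token, so any queued
/// updates are drained after the datasources stop. The datasources and pipes
/// are omitted here.
///
/// ```rust
/// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
/// use tokio_util::sync::CancellationToken;
///
/// // Cancelling this token stops the datasources; pending updates still run.
/// let token = CancellationToken::new();
///
/// let builder = PipelineBuilder::new()
///     .shutdown_strategy(ShutdownStrategy::ProcessPending)
///     .datasource_cancellation_token(token.clone());
/// ```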
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// The default size of the channel buffer for the pipeline.
///
/// This constant defines the default number of updates that can be queued in
/// the pipeline's channel buffer. It is used as a fallback value if the
/// `channel_buffer_size` is not explicitly set during pipeline construction.
///
/// The default is 1,000 updates, which offers a reasonable balance between
/// memory usage and throughput for most workloads.
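///
/// For example, a sketch of sizing the buffer as a multiple of this default
/// when bursty datasources are expected (the multiplier is arbitrary and
/// should be tuned per workload):
///
/// ```rust
/// use carbon_core::pipeline::{PipelineBuilder, DEFAULT_CHANNEL_BUFFER_SIZE};
///
/// // Four times the default; once full, the bounded channel applies
/// // backpressure to the datasources.
/// let builder = PipelineBuilder::new()
///     .channel_buffer_size(4 * DEFAULT_CHANNEL_BUFFER_SIZE);
/// ```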
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source must be wrapped in
///   an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `block_details_pipes`: A vector of `BlockDetailsPipes` to handle
///   block details.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A vector of `Metrics` implementations to record and track
///   performance data. Each metrics instance is managed within an `Arc` to
///   ensure thread safety.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval is
///   used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, the default of 1,000 updates (`DEFAULT_CHANNEL_BUFFER_SIZE`) is
///   used.
///
/// ## Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     .account(
///         TestProgramDecoder,
///         TestProgramAccountProcessor
///     )
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .channel_buffer_size(1000)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval (usually 5 seconds) is
///   used.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     );
    /// // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            block_details_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            datasource_cancellation_token: None,
            shutdown_strategy: ShutdownStrategy::default(),
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline's metrics system and starts
    /// listening for updates from the configured data sources. It checks
    /// the types of updates provided by each data source to ensure that the
    /// required data types are available for processing. The method then
    /// enters a loop where it processes each update received from the data
    /// sources in turn, logging and updating metrics based on the success
    /// or failure of each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if:
    /// - Required update types (e.g., `AccountUpdate`, `AccountDeletion`,
    ///   `Transaction`) are not provided by any data source, causing a mismatch
    ///   in expected data processing capabilities.
    /// - A data source encounters an error while consuming updates.
    /// - An error occurs during metrics flushing or processing of updates.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method operates in an infinite loop, handling updates until
    ///   a termination condition occurs.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) =
            tokio::sync::mpsc::channel::<Update>(self.channel_buffer_size);

        let datasource_cancellation_token = self
            .datasource_cancellation_token
            .clone()
            .unwrap_or_default();

        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(datasource);
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        &sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some(update) => {
                            self
                                .metrics.increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self
                                .metrics
                                .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
                                .await?;

                            self
                                .metrics
                                .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self
                                        .metrics.increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self
                                .metrics.increment_counter("updates_processed", 1)
                                .await?;

                            self
                                .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate pipeline
    /// stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, account deletions, or block details. It also
    /// records metrics for processed updates, providing insights into the
    /// processing workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    /// - **Block Details**: Sends block metadata through the
    ///   `block_details_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, `AccountDeletion`, or
    ///   `BlockDetails`, each triggering different processing logic within the
    ///   pipeline.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion) requires
    ///   its own set of pipes, so ensure that appropriate pipes are configured
    ///   based on the data types expected from the data sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update) -> CarbonResult<()> {
        log::trace!("process(self, update: {:?})", update);
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    pipe.run(
                        (account_metadata.clone(), account_update.account.clone()),
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
                let transaction_metadata = Arc::new((*transaction_update).clone().try_into()?);

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        &transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        pipe.run(nested_instruction, self.metrics.clone()).await?;
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    pipe.run(
                        transaction_metadata.clone(),
                        &nested_instructions,
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    pipe.run(account_deletion.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
            Update::BlockDetails(block_details) => {
                for pipe in self.block_details_pipes.iter_mut() {
                    pipe.run(block_details.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("block_details_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     );
/// // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `block_details_pipes`: A collection of `BlockDetailsPipes` to process
///   block details updates.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A vector of `Metrics` implementations for tracking pipeline
///   performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval will be used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   canceling the datasources. If not set, a default `CancellationToken` will
///   be used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, the default of 1,000 updates (`DEFAULT_CHANNEL_BUFFER_SIZE`) is
///   used.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self::default()
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources.push(Arc::new(datasource));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
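    ///
    /// # Example
    ///
    /// A minimal sketch of opting out of the default graceful shutdown:
    ///
    /// ```rust
    /// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
    ///
    /// // Stop all tasks as soon as shutdown begins, dropping queued updates.
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```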
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
            }));
        self
    }

    /// Adds a block details pipe to handle block details updates.
    ///
    /// Block details pipes process updates related to block metadata, such as
    /// slot, block hash, and rewards, with a `Processor` to handle the updates.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes block details updates.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .block_details(MyBlockDetailsProcessor);
    /// ```
    pub fn block_details(
        mut self,
        processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "block_details(self, processor: {:?})",
            stringify!(processor)
        );
        self.block_details_pipes.push(Box::new(BlockDetailsPipe {
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method takes a `Processor` to handle the processed transaction data
    /// and an optional transaction schema used to match and decode it.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, schema: {:?}, processor: {:?})",
            stringify!(schema),
            stringify!(processor)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(schema, processor)));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Sets the cancellation token used to cancel the datasources on demand.
    ///
    /// When this token is cancelled, the pipeline's datasources stop consuming
    /// updates. If not set, a default `CancellationToken` is used.
    ///
    /// # Parameters
    ///
    /// - `cancellation_token`: An instance of `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource_cancellation_token(CancellationToken::new());
    /// ```
    pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        log::trace!(
            "datasource_cancellation_token(self, cancellation_token: {:?})",
            cancellation_token
        );
        self.datasource_cancellation_token = Some(cancellation_token);
        self
    }

    /// Sets the size of the channel buffer for the pipeline.
    ///
    /// This value defines the maximum number of updates that can be queued in
    /// the pipeline's channel buffer. If not set, the default of 1,000 updates
    /// (`DEFAULT_CHANNEL_BUFFER_SIZE`) will be used.
    ///
    /// # Parameters
    ///
    /// - `size`: The size of the channel buffer for the pipeline.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .channel_buffer_size(1000);
    /// ```
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        log::trace!("channel_buffer_size(self, size: {:?})", size);
        self.channel_buffer_size = size;
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     .account(
    ///         TestProgramDecoder,
    ///         TestProgramAccountProcessor
    ///     )
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .channel_buffer_size(1000)
    ///     .build()?;
    ///
    /// Ok(())
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            block_details_pipes: self.block_details_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
            datasource_cancellation_token: self.datasource_cancellation_token,
            channel_buffer_size: self.channel_buffer_size,
        })
    }
}