carbon_core/pipeline.rs

//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A vector of `Metrics` implementations that gather and report
//!   on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//! - **channel_buffer_size**: Size of the internal update channel. Defaults to
//!   1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) if unset.
//!
//! # Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
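//!
//! # Example
//!
//! A minimal sketch of assembling and running a pipeline; `MyDatasource`,
//! `MyDecoder`, `MyAccountProcessor`, and `MyMetrics` stand in for your own
//! implementations of the corresponding traits:
//!
//! ```rust
//! use std::sync::Arc;
//!
//! use carbon_core::pipeline::Pipeline;
//!
//! let mut pipeline = Pipeline::builder()
//!     .datasource(MyDatasource::new())
//!     .account(MyDecoder, MyAccountProcessor)
//!     .metrics(Arc::new(MyMetrics::new()))
//!     .build()?;
//!
//! pipeline.run().await?;
//! ```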

use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline behaves when it receives a
/// shutdown signal.
///
/// # Variants
///
/// - `Immediate`: Stops the entire pipeline, including all tasks, instantly,
///   without processing any remaining updates.
/// - `ProcessPending`: Terminates the data sources, then completes processing
///   of any updates currently pending in the pipeline. This is the default
///   behavior.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
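///
/// # Example
///
/// The default favors draining pending updates before stopping:
///
/// ```rust
/// use carbon_core::pipeline::ShutdownStrategy;
///
/// assert_eq!(ShutdownStrategy::default(), ShutdownStrategy::ProcessPending);
/// ```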
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// The default size of the channel buffer for the pipeline.
///
/// This constant defines the default number of updates that can be queued in
/// the pipeline's channel buffer. It is used as a fallback value if the
/// `channel_buffer_size` is not explicitly set during pipeline construction.
///
/// The default size is 1,000 updates, which provides a reasonable balance
/// between throughput and memory usage.
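///
/// The value can be referenced directly when sizing custom buffers:
///
/// ```rust
/// assert_eq!(carbon_core::pipeline::DEFAULT_CHANNEL_BUFFER_SIZE, 1_000);
/// ```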
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source must be wrapped in
///   an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A `MetricsCollection`, wrapped in an `Arc` for thread safety,
///   holding the `Metrics` implementations that record and track performance
///   data.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval of
///   5 seconds is used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   cancelling the data sources on demand. If `None`, a default token is
///   created when the pipeline runs.
/// - `shutdown_strategy`: A `ShutdownStrategy` value controlling whether the
///   pipeline stops immediately or drains pending updates on shutdown.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline.
///   If not set, the default of 1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) is used.
///
/// ## Example
///
/// ```rust
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     .account(
///         TestProgramDecoder,
///         TestProgramAccountProcessor
///     )
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .channel_buffer_size(1000)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval of 5 seconds is used.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```rust
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            datasource_cancellation_token: None,
            shutdown_strategy: ShutdownStrategy::default(),
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline’s metrics system and starts
    /// listening for updates from the configured data sources. It then enters
    /// a loop where it processes each update received from the data sources in
    /// turn, logging and updating metrics based on the success or failure of
    /// each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if a metrics operation
    /// (initialization, recording, flushing, or shutdown) fails. Errors from
    /// individual updates are logged and counted in the `updates_failed`
    /// metric rather than aborting the loop; datasource errors are logged from
    /// their spawned tasks.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::sync::Arc;
    ///
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method loops until the update channel closes or a `Ctrl+C`
    ///   signal triggers the configured `ShutdownStrategy`.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) =
            tokio::sync::mpsc::channel::<Update>(self.channel_buffer_size);

        let datasource_cancellation_token = self
            .datasource_cancellation_token
            .clone()
            .unwrap_or_default();

        // Spawn one consumer task per datasource; each pushes its updates into
        // the shared channel until it finishes, errors, or is cancelled.
        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(datasource);
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        &sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        // ProcessPending: datasources are cancelled above; the
                        // loop keeps draining the channel until it closes.
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some(update) => {
                            self
                                .metrics.increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self
                                .metrics
                                .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
                                .await?;

                            self
                                .metrics
                                .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self
                                        .metrics.increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self
                                .metrics.increment_counter("updates_processed", 1)
                                .await?;

                            self
                                .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate pipeline
    /// stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, or account deletions. It also records metrics
    /// for processed updates, providing insights into the processing
    /// workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, or `AccountDeletion`, each
    ///   triggering different processing logic within the pipeline.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion) requires
    ///   its own set of pipes, so ensure that appropriate pipes are configured
    ///   based on the data types expected from the data sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update) -> CarbonResult<()> {
        log::trace!("process(self, update: {:?})", update);
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    pipe.run(
                        (account_metadata.clone(), account_update.account.clone()),
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
                let transaction_metadata = &(*transaction_update).clone().try_into()?;

                // Flatten the transaction into instructions with metadata, then
                // rebuild the nesting so inner instructions keep their parents.
                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        pipe.run(nested_instruction, self.metrics.clone()).await?;
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    pipe.run(
                        transaction_metadata.clone(),
                        &nested_instructions,
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    pipe.run(account_deletion.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```rust
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A `MetricsCollection` holding the `Metrics` implementations
///   used for tracking pipeline performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval of 5 seconds is used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   cancelling the data sources on demand. If not set, a default
///   `CancellationToken` is used.
/// - `shutdown_strategy`: Determines how the pipeline shuts down; defaults to
///   `ShutdownStrategy::ProcessPending`.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline.
///   If not set, the default of 1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) is used.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        // Start from the derived defaults, but give the channel buffer its
        // documented non-zero default: a zero-capacity tokio channel panics.
        Self {
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
            ..Self::default()
        }
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources.push(Arc::new(datasource));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
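    ///
    /// # Example
    ///
    /// A minimal sketch of opting into immediate shutdown:
    ///
    /// ```rust
    /// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
    ///
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```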
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
            }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method takes a `Processor` to handle the processed transaction
    /// data, along with an optional transaction schema used to match and
    /// decode the transaction's instructions.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, processor: {:?}, schema: {:?})",
            stringify!(processor),
            stringify!(schema)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(schema, processor)));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Sets the cancellation token for cancelling the data sources on demand.
    ///
    /// Triggering this token stops the running data sources. If not set, a
    /// default `CancellationToken` is created when the pipeline runs.
    ///
    /// # Parameters
    ///
    /// - `cancellation_token`: An instance of `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .datasource_cancellation_token(CancellationToken::new());
    /// ```
    pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        log::trace!(
            "datasource_cancellation_token(self, cancellation_token: {:?})",
            cancellation_token
        );
        self.datasource_cancellation_token = Some(cancellation_token);
        self
    }

    /// Sets the size of the channel buffer for the pipeline.
    ///
    /// This value defines the maximum number of updates that can be queued in
    /// the pipeline's channel buffer. If not set, the default of 1,000
    /// (`DEFAULT_CHANNEL_BUFFER_SIZE`) is used.
    ///
    /// # Parameters
    ///
    /// - `size`: The size of the channel buffer for the pipeline.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .channel_buffer_size(1000);
    /// ```
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        log::trace!("channel_buffer_size(self, size: {:?})", size);
        self.channel_buffer_size = size;
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```rust
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     .account(
    ///         TestProgramDecoder,
    ///         TestProgramAccountProcessor
    ///     )
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .channel_buffer_size(1000)
    ///     .build()?
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
            datasource_cancellation_token: self.datasource_cancellation_token,
            channel_buffer_size: self.channel_buffer_size,
        })
    }
}