carbon_core/pipeline.rs

//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A vector of `Metrics` implementations that gather and report
//!   on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
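//!
//! # Example
//!
//! A minimal sketch of wiring a pipeline together; `MyDatasource`, `MyDecoder`,
//! and `MyProcessor` stand in for your own implementations of the corresponding
//! traits.
//!
//! ```ignore
//! use carbon_core::pipeline::Pipeline;
//!
//! let mut pipeline = Pipeline::builder()
//!     .datasource(MyDatasource::new())
//!     .instruction(MyDecoder, MyProcessor)
//!     .build()?;
//!
//! pipeline.run().await?;
//! ```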

use crate::block_details::{BlockDetailsPipe, BlockDetailsPipes};
use crate::datasource::{BlockDetails, DatasourceId};
use crate::filter::Filter;
use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline behaves when it receives a
/// shutdown signal.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Gracefully terminates the data sources and allows the
///   pipeline to finish processing updates that are still in progress or
///   queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
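///
/// # Example
///
/// A minimal sketch of selecting a strategy on the builder; `my_datasource`
/// stands in for any `Datasource` implementation.
///
/// ```ignore
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let pipeline = Pipeline::builder()
///     .datasource(my_datasource)
///     .shutdown_strategy(ShutdownStrategy::Immediate)
///     .build()?;
/// ```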
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// The default size of the channel buffer for the pipeline.
///
/// This constant defines the default number of updates that can be queued in
/// the pipeline's channel buffer. It is used as a fallback value if the
/// `channel_buffer_size` is not explicitly set during pipeline construction.
///
/// The default size is 1,000 updates, which provides a reasonable balance
/// between memory usage and throughput for most workloads.
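///
/// A minimal sketch of overriding this default on the builder:
///
/// ```ignore
/// use carbon_core::pipeline::{Pipeline, DEFAULT_CHANNEL_BUFFER_SIZE};
///
/// let builder = Pipeline::builder()
///     .channel_buffer_size(DEFAULT_CHANNEL_BUFFER_SIZE * 4);
/// ```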
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source is paired with a
///   unique `DatasourceId` for identification and filtering purposes. Each data
///   source must be wrapped in an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `block_details_pipes`: A vector of `BlockDetailsPipes` to handle
///   block details.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A vector of `Metrics` implementations to record and track
///   performance data. Each metrics instance is managed within an `Arc` to
///   ensure thread safety.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval is
///   used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, a default size of 1_000 will be used.
///
/// ## Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     .account(
///         TestProgramDecoder,
///         TestProgramAccountProcessor
///     )
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .channel_buffer_size(1000)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval (usually 5 seconds) is
///   used.
pub struct Pipeline {
    pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     );
    /// // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            block_details_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            datasource_cancellation_token: None,
            shutdown_strategy: ShutdownStrategy::default(),
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline's metrics system, spawns a
    /// consumer task for each configured data source, and then enters a loop
    /// where it processes each update received from the data sources in turn,
    /// logging and updating metrics based on the success or failure of each
    /// operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if:
    /// - Metrics initialization, flushing, or shutdown fails.
    /// - An error occurs while processing an update.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method loops until the cancellation token is triggered, a
    ///   shutdown signal (Ctrl+C) is received, or all data sources close their
    ///   update channel.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) =
            tokio::sync::mpsc::channel::<(Update, DatasourceId)>(self.channel_buffer_size);

        let datasource_cancellation_token = self
            .datasource_cancellation_token
            .clone()
            .unwrap_or_default();

        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(&datasource.1);
            let datasource_id = datasource.0.clone();
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        datasource_id,
                        sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

        drop(update_sender);

        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

        loop {
            tokio::select! {
                _ = datasource_cancellation_token.cancelled() => {
                    log::trace!("datasource cancellation token cancelled, shutting down.");
                    self.metrics.flush_metrics().await?;
                    self.metrics.shutdown_metrics().await?;
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some((update, datasource_id)) => {
                            self
                                .metrics.increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone(), datasource_id.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self
                                .metrics
                                .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
                                .await?;

                            self
                                .metrics
                                .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self
                                        .metrics.increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self
                                .metrics.increment_counter("updates_processed", 1)
                                .await?;

                            self
                                .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate pipeline
    /// stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, account deletions, or block details. It also
    /// records metrics for processed updates, providing insights into the
    /// processing workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    /// - **Block Details**: Sends block metadata through the
    ///   `block_details_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, `AccountDeletion`, or
    ///   `BlockDetails`, each triggering different processing logic within the
    ///   pipeline.
    /// - `datasource_id`: The ID of the datasource that produced this update.
    ///   This is used by filters to determine whether the update should be
    ///   processed by specific pipes.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion, block
    ///   details) requires its own set of pipes, so ensure that appropriate
    ///   pipes are configured based on the data types expected from the data
    ///   sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    /// - Filters are applied to each pipe before processing, allowing for
    ///   selective update processing based on datasource ID and other criteria.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update, datasource_id: DatasourceId) -> CarbonResult<()> {
        log::trace!(
            "process(self, update: {:?}, datasource_id: {:?})",
            update,
            datasource_id
        );
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                    transaction_signature: account_update.transaction_signature,
                };

                for pipe in self.account_pipes.iter_mut() {
                    if pipe.filters().iter().all(|filter| {
                        filter.filter_account(
                            &datasource_id,
                            &account_metadata,
                            &account_update.account,
                        )
                    }) {
                        pipe.run(
                            (account_metadata.clone(), account_update.account.clone()),
                            self.metrics.clone(),
                        )
                        .await?;
                    }
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
                let transaction_metadata = Arc::new((*transaction_update).clone().try_into()?);

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        &transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        if pipe.filters().iter().all(|filter| {
                            filter.filter_instruction(&datasource_id, nested_instruction)
                        }) {
                            pipe.run(nested_instruction, self.metrics.clone()).await?;
                        }
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    if pipe.filters().iter().all(|filter| {
                        filter.filter_transaction(
                            &datasource_id,
                            &transaction_metadata,
                            &nested_instructions,
                        )
                    }) {
                        pipe.run(
                            transaction_metadata.clone(),
                            &nested_instructions,
                            self.metrics.clone(),
                        )
                        .await?;
                    }
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    if pipe.filters().iter().all(|filter| {
                        filter.filter_account_deletion(&datasource_id, &account_deletion)
                    }) {
                        pipe.run(account_deletion.clone(), self.metrics.clone())
                            .await?;
                    }
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
            Update::BlockDetails(block_details) => {
                for pipe in self.block_details_pipes.iter_mut() {
                    if pipe
                        .filters()
                        .iter()
                        .all(|filter| filter.filter_block_details(&datasource_id, &block_details))
                    {
                        pipe.run(block_details.clone(), self.metrics.clone())
                            .await?;
                    }
                }

                self.metrics
                    .increment_counter("block_details_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     );
/// // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A vector of `Metrics` implementations for tracking pipeline
///   performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval will be used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   cancelling the datasources. If not set, a default `CancellationToken` will
///   be used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, a default size of 1_000 will be used.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self::default()
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources
            .push((DatasourceId::new_unique(), Arc::new(datasource)));
        self
    }

    /// Adds a datasource to the pipeline with a specific ID.
    ///
    /// This method allows you to assign a custom ID to a datasource, which is
    /// useful for filtering updates based on their source. The ID can be used
    /// with filters to selectively process updates from specific datasources.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait
    /// - `id`: The `DatasourceId` to assign to this datasource
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{pipeline::PipelineBuilder, datasource::DatasourceId};
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let builder = PipelineBuilder::new()
    ///     .datasource_with_id(MyDatasource::new(), mainnet_id);
    /// ```
    pub fn datasource_with_id(
        mut self,
        datasource: impl Datasource + 'static,
        id: DatasourceId,
    ) -> Self {
        log::trace!(
            "datasource_with_id(self, id: {:?}, datasource: {:?})",
            id,
            stringify!(datasource)
        );
        self.datasources.push((id, Arc::new(datasource)));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
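    ///
    /// # Example
    ///
    /// A minimal sketch of overriding the default strategy:
    ///
    /// ```ignore
    /// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
    ///
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```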
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters: vec![],
        }));
        self
    }

    /// Adds an account pipe with filters to process account updates selectively.
    ///
    /// This method creates an account pipe that only processes updates that pass
    /// all the specified filters. Filters can be used to selectively process
    /// updates based on criteria such as datasource ID, account properties, or
    /// other custom logic.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data
    /// - `processor`: A `Processor` that processes the decoded account data
    /// - `filters`: A collection of filters that determine which account updates
    ///   should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_with_filters(MyAccountDecoder, MyAccountProcessor, filters);
    /// ```
    pub fn account_with_filters<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "account_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
            stringify!(decoder),
            stringify!(processor),
            stringify!(filters)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters,
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
                filters: vec![],
            }));
        self
    }

    /// Adds an account deletion pipe with filters to handle account deletion
    /// events selectively.
    ///
    /// This method creates an account deletion pipe that only processes deletion
    /// events that pass all the specified filters. Filters can be used to
    /// selectively process deletions based on criteria such as datasource ID or
    /// other custom logic.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events
    /// - `filters`: A collection of filters that determine which account deletion
    ///   events should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions_with_filters(MyAccountDeletionProcessor, filters);
    /// ```
    pub fn account_deletions_with_filters(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "account_deletions_with_filters(self, processor: {:?}, filters: {:?})",
            stringify!(processor),
            stringify!(filters)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
                filters,
            }));
        self
    }

    /// Adds a block details pipe to handle block details updates.
    ///
    /// Block details pipes process updates related to block metadata, such as
    /// slot, block hash, and rewards, with a `Processor` to handle the updates.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes block details updates.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .block_details(MyBlockDetailsProcessor);
    /// ```
    pub fn block_details(
        mut self,
        processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "block_details(self, processor: {:?})",
            stringify!(processor)
        );
        self.block_details_pipes.push(Box::new(BlockDetailsPipe {
            processor: Box::new(processor),
            filters: vec![],
        }));
        self
    }

    /// Adds a block details pipe with filters to handle block details updates
    /// selectively.
    ///
    /// This method creates a block details pipe that only processes updates that
    /// pass all the specified filters. Filters can be used to selectively process
    /// block details updates based on criteria such as datasource ID, block height,
    /// or other custom logic.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes block details updates
    /// - `filters`: A collection of filters that determine which block details
    ///   updates should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .block_details_with_filters(MyBlockDetailsProcessor, filters);
    /// ```
    pub fn block_details_with_filters(
        mut self,
        processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "block_details_with_filters(self, processor: {:?}, filters: {:?})",
            stringify!(processor),
            stringify!(filters)
        );
        self.block_details_pipes.push(Box::new(BlockDetailsPipe {
            processor: Box::new(processor),
            filters,
        }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters: vec![],
        }));
        self
    }

    /// Adds an instruction pipe with filters to process instructions
    /// selectively.
    ///
    /// This method creates an instruction pipe that only processes instructions
    /// that pass all the specified filters. Filters can be used to selectively
    /// process instructions based on criteria such as datasource ID, instruction
    /// type, or other custom logic.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data
    /// - `processor`: A `Processor` that processes decoded instruction data
    /// - `filters`: A collection of filters that determine which instructions
    ///   should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .instruction_with_filters(MyDecoder, MyInstructionProcessor, filters);
    /// ```
    pub fn instruction_with_filters<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "instruction_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
            stringify!(decoder),
            stringify!(processor),
            stringify!(filters)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters,
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method requires a `Processor` to handle the processed transaction
    /// data and an optional transaction schema for decoding.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, schema: {:?}, processor: {:?})",
            stringify!(schema),
            stringify!(processor)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(
                schema,
                processor,
                vec![],
            )));
        self
    }

    /// Adds a transaction pipe with filters for processing full transaction
    /// data selectively.
    ///
    /// This method creates a transaction pipe that only processes transactions
    /// that pass all the specified filters. Filters can be used to selectively
    /// process transactions based on criteria such as datasource ID, transaction
    /// type, or other custom logic.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction data
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data
    /// - `filters`: A collection of filters that determine which transactions
    ///   should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .transaction_with_filters(MyTransactionProcessor, Some(MY_SCHEMA.clone()), filters);
    /// ```
    pub fn transaction_with_filters<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction_with_filters(self, schema: {:?}, processor: {:?}, filters: {:?})",
            stringify!(schema),
            stringify!(processor),
            stringify!(filters)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(
                schema, processor, filters,
            )));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Sets the cancellation token used to stop the datasources on demand.
    ///
    /// This token can be triggered externally to cancel the datasources.
    /// If not set, a default `CancellationToken` is used.
    ///
    /// # Parameters
    ///
    /// - `cancellation_token`: An instance of `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource_cancellation_token(CancellationToken::new());
    /// ```
    pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        log::trace!(
            "datasource_cancellation_token(self, cancellation_token: {:?})",
            cancellation_token
        );
        self.datasource_cancellation_token = Some(cancellation_token);
        self
    }

    /// Sets the size of the channel buffer for the pipeline.
    ///
    /// This value defines the maximum number of updates that can be queued in
    /// the pipeline's channel buffer. If not set, a default size of 1_000
    /// will be used.
    ///
    /// # Parameters
    ///
    /// - `size`: The size of the channel buffer for the pipeline.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .channel_buffer_size(1000);
    /// ```
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        log::trace!("channel_buffer_size(self, size: {:?})", size);
        self.channel_buffer_size = size;
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     .account(
    ///         TestProgramDecoder,
    ///         TestProgramAccountProcessor
    ///     )
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .channel_buffer_size(1000)
    ///     .build()?;
    ///
    /// Ok(())
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            block_details_pipes: self.block_details_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
            datasource_cancellation_token: self.datasource_cancellation_token,
            channel_buffer_size: self.channel_buffer_size,
        })
    }
}