carbon_core/
pipeline.rs

//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **block_details_pipes**: Pipes for processing block details updates.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A collection of `Metrics` implementations that gather and
//!   report on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
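//!
//! # Example
//!
//! A minimal sketch of wiring a pipeline together; `MyDatasource`,
//! `MyAccountDecoder`, `MyAccountProcessor`, and `MyMetrics` are placeholder
//! names standing in for your own `Datasource`, decoder, processor, and
//! `Metrics` implementations:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
//!
//! let mut pipeline = Pipeline::builder()
//!     .datasource(MyDatasource::new())
//!     .account(MyAccountDecoder, MyAccountProcessor)
//!     .metrics(Arc::new(MyMetrics::new()))
//!     .shutdown_strategy(ShutdownStrategy::ProcessPending)
//!     .channel_buffer_size(1_000)
//!     .build()?;
//!
//! pipeline.run().await?;
//! ```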

use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        block_details::{BlockDetailsPipe, BlockDetailsPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, BlockDetails, Datasource, DatasourceId, Update},
        error::CarbonResult,
        filter::Filter,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline will behave when it receives
/// a shutdown signal. It supports two modes:
///
/// - `Immediate`: Stops the entire pipeline, including all tasks, instantly.
/// - `ProcessPending`: Terminates the data sources, then completes processing
///   of any updates currently pending in the pipeline. This is the default
///   behavior.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Gracefully terminates the data sources and allows the
///   pipeline to finish processing updates that are still in progress or
///   queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
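///
/// # Example
///
/// A short sketch of overriding the default strategy through the builder
/// (`MyDatasource` is a placeholder for any `Datasource` implementation):
///
/// ```ignore
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let pipeline = Pipeline::builder()
///     .datasource(MyDatasource::new())
///     .shutdown_strategy(ShutdownStrategy::Immediate)
///     .build()?;
/// ```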
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// The default size of the channel buffer for the pipeline.
///
/// This constant defines the default number of updates that can be queued in
/// the pipeline's channel buffer. It is used as a fallback value if the
/// `channel_buffer_size` is not explicitly set during pipeline construction.
///
/// The default size is 1,000 updates, which provides a reasonable balance
/// between memory usage and update throughput.
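///
/// # Example
///
/// A small sketch of sizing the buffer relative to the default (the multiplier
/// is arbitrary and only for illustration):
///
/// ```rust
/// use carbon_core::pipeline::{PipelineBuilder, DEFAULT_CHANNEL_BUFFER_SIZE};
///
/// let builder = PipelineBuilder::new()
///     .channel_buffer_size(DEFAULT_CHANNEL_BUFFER_SIZE * 2);
/// ```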
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `BlockDetailsPipes` for block details.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source is paired with a
///   unique `DatasourceId` for identification and filtering purposes. Each data
///   source must be wrapped in an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `block_details_pipes`: A vector of `BlockDetailsPipes` to handle
///   block details.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A `MetricsCollection` holding the configured `Metrics`
///   implementations used to record and track performance data. The collection
///   is managed within an `Arc` to ensure thread safety.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval is
///   used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` used to
///   stop the datasources on demand. If `None`, a default token is created
///   when the pipeline runs.
/// - `shutdown_strategy`: The `ShutdownStrategy` that controls whether pending
///   updates are processed when the pipeline shuts down.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, the default of `DEFAULT_CHANNEL_BUFFER_SIZE` (1,000) is used.
///
/// ## Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(TestProgramDecoder, TestProgramProcessor)
///     .account(TestProgramDecoder, TestProgramAccountProcessor)
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .channel_buffer_size(1_000)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval (usually 5 seconds) is
///   used.
pub struct Pipeline {
    pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(TestProgramDecoder, TestProgramProcessor);
    /// // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            block_details_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            datasource_cancellation_token: None,
            shutdown_strategy: ShutdownStrategy::default(),
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline's metrics system and starts
    /// listening for updates from the configured data sources. It checks
    /// the types of updates provided by each data source to ensure that the
    /// required data types are available for processing. The method then
    /// enters a loop where it processes each update received from the data
    /// sources in turn, logging and updating metrics based on the success
    /// or failure of each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if:
    /// - Required update types (e.g., `AccountUpdate`, `AccountDeletion`,
    ///   `Transaction`) are not provided by any data source, causing a mismatch
    ///   in expected data processing capabilities.
    /// - A data source encounters an error while consuming updates.
    /// - An error occurs during metrics flushing or processing of updates.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method operates in an infinite loop, handling updates until
    ///   a termination condition occurs.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_block_details_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.block_details_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
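        // Bounded channel carrying (Update, DatasourceId) pairs from the
        // datasource tasks to the processing loop below; the buffer size
        // provides backpressure when updates arrive faster than they are
        // processed.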
        let (update_sender, mut update_receiver) =
            tokio::sync::mpsc::channel::<(Update, DatasourceId)>(self.channel_buffer_size);

        let datasource_cancellation_token = self
            .datasource_cancellation_token
            .clone()
            .unwrap_or_default();

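        // Spawn one consumer task per datasource; each task feeds updates into
        // the shared channel and receives the cancellation token so it can stop
        // on shutdown.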
        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(&datasource.1);
            let datasource_id = datasource.0.clone();
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        datasource_id,
                        sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

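        // Drop the original sender so the channel closes once every datasource
        // task has finished; `recv()` then returns `None` and the loop below
        // can shut down cleanly.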
        drop(update_sender);

        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

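        // Main event loop: flush metrics on a fixed interval, react to
        // cancellation or Ctrl-C, and route every received update through
        // `process`.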
        loop {
            tokio::select! {
                _ = datasource_cancellation_token.cancelled() => {
                    log::trace!("datasource cancellation token cancelled, shutting down.");
                    self.metrics.flush_metrics().await?;
                    self.metrics.shutdown_metrics().await?;
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some((update, datasource_id)) => {
                            self
                                .metrics.increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone(), datasource_id.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self
                                .metrics
                                .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
                                .await?;

                            self
                                .metrics
                                .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self
                                        .metrics.increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self
                                .metrics.increment_counter("updates_processed", 1)
                                .await?;

                            self
                                .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate pipeline
    /// stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, or account deletions. It also records metrics
    /// for processed updates, providing insights into the processing
    /// workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    /// - **Block Details**: Routes block details updates through the
    ///   `block_details_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, `AccountDeletion`, or
    ///   `BlockDetails`, each triggering different processing logic within the
    ///   pipeline.
    /// - `datasource_id`: The ID of the datasource that produced this update.
    ///   This is used by filters to determine whether the update should be
    ///   processed by specific pipes.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion) requires
    ///   its own set of pipes, so ensure that appropriate pipes are configured
    ///   based on the data types expected from the data sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    /// - Filters are applied to each pipe before processing, allowing for
    ///   selective update processing based on datasource ID and other criteria.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update, datasource_id: DatasourceId) -> CarbonResult<()> {
        log::trace!(
            "process(self, update: {:?}, datasource_id: {:?})",
            update,
            datasource_id
        );
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    if pipe.filters().iter().all(|filter| {
                        filter.filter_account(
                            &datasource_id,
                            &account_metadata,
                            &account_update.account,
                        )
                    }) {
                        pipe.run(
                            (account_metadata.clone(), account_update.account.clone()),
                            self.metrics.clone(),
                        )
                        .await?;
                    }
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
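                // Build the shared transaction metadata once, then flatten the
                // transaction into instructions and nest them (inner
                // instructions under their parents) so both instruction pipes
                // and transaction pipes can consume the same structure.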
                let transaction_metadata = Arc::new((*transaction_update).clone().try_into()?);

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        &transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        if pipe.filters().iter().all(|filter| {
                            filter.filter_instruction(&datasource_id, nested_instruction)
                        }) {
                            pipe.run(nested_instruction, self.metrics.clone()).await?;
                        }
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    if pipe.filters().iter().all(|filter| {
                        filter.filter_transaction(
                            &datasource_id,
                            &transaction_metadata,
                            &nested_instructions,
                        )
                    }) {
                        pipe.run(
                            transaction_metadata.clone(),
                            &nested_instructions,
                            self.metrics.clone(),
                        )
                        .await?;
                    }
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    if pipe.filters().iter().all(|filter| {
                        filter.filter_account_deletion(&datasource_id, &account_deletion)
                    }) {
                        pipe.run(account_deletion.clone(), self.metrics.clone())
                            .await?;
                    }
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
            Update::BlockDetails(block_details) => {
                for pipe in self.block_details_pipes.iter_mut() {
                    if pipe
                        .filters()
                        .iter()
                        .all(|filter| filter.filter_block_details(&datasource_id, &block_details))
                    {
                        pipe.run(block_details.clone(), self.metrics.clone())
                            .await?;
                    }
                }

                self.metrics
                    .increment_counter("block_details_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Block Details Pipes**: For handling block details updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(TestProgramDecoder, TestProgramProcessor);
/// // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `block_details_pipes`: A collection of `BlockDetailsPipes` for processing
///   block details updates.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A collection of `Metrics` implementations for tracking pipeline
///   performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval will be used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   cancelling the datasources. If not set, a default `CancellationToken` will
///   be used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, the default of `DEFAULT_CHANNEL_BUFFER_SIZE` (1,000) will be
///   used.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self::default()
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources
            .push((DatasourceId::new_unique(), Arc::new(datasource)));
        self
    }

    /// Adds a datasource to the pipeline with a specific ID.
    ///
    /// This method allows you to assign a custom ID to a datasource, which is
    /// useful for filtering updates based on their source. The ID can be used
    /// with filters to selectively process updates from specific datasources.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait
    /// - `id`: The `DatasourceId` to assign to this datasource
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{pipeline::PipelineBuilder, datasource::DatasourceId};
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let builder = PipelineBuilder::new()
    ///     .datasource_with_id(MyDatasource::new(), mainnet_id);
    /// ```
    pub fn datasource_with_id(
        mut self,
        datasource: impl Datasource + 'static,
        id: DatasourceId,
    ) -> Self {
        log::trace!(
            "datasource_with_id(self, id: {:?}, datasource: {:?})",
            id,
            stringify!(datasource)
        );
        self.datasources.push((id, Arc::new(datasource)));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
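    ///
    /// # Example
    ///
    /// Overriding the default strategy through the builder:
    ///
    /// ```rust
    /// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
    ///
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```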
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters: vec![],
        }));
        self
    }

    /// Adds an account pipe with filters to process account updates selectively.
    ///
    /// This method creates an account pipe that only processes updates that pass
    /// all the specified filters. Filters can be used to selectively process
    /// updates based on criteria such as datasource ID, account properties, or
    /// other custom logic.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data
    /// - `processor`: A `Processor` that processes the decoded account data
    /// - `filters`: A collection of filters that determine which account updates
    ///   should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_with_filters(MyAccountDecoder, MyAccountProcessor, filters);
    /// ```
    pub fn account_with_filters<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "account_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
            stringify!(decoder),
            stringify!(processor),
            stringify!(filters)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters,
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
                filters: vec![],
            }));
        self
    }

    /// Adds an account deletion pipe with filters to handle account deletion events selectively.
    ///
    /// This method creates an account deletion pipe that only processes deletion
    /// events that pass all the specified filters. Filters can be used to
    /// selectively process deletions based on criteria such as datasource ID or
    /// other custom logic.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events
    /// - `filters`: A collection of filters that determine which account deletion
    ///   events should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions_with_filters(MyAccountDeletionProcessor, filters);
    /// ```
    pub fn account_deletions_with_filters(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "account_deletions_with_filters(self, processor: {:?}, filters: {:?})",
            stringify!(processor),
            stringify!(filters)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
                filters,
            }));
        self
    }

    /// Adds a block details pipe to handle block details updates.
    ///
    /// Block details pipes process updates related to block metadata, such as
    /// slot, block hash, and rewards, with a `Processor` to handle the updates.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes block details updates.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .block_details(MyBlockDetailsProcessor);
    /// ```
    pub fn block_details(
        mut self,
        processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "block_details(self, processor: {:?})",
            stringify!(processor)
        );
        self.block_details_pipes.push(Box::new(BlockDetailsPipe {
            processor: Box::new(processor),
            filters: vec![],
        }));
        self
    }

    /// Adds a block details pipe with filters to handle block details updates selectively.
    ///
    /// This method creates a block details pipe that only processes updates that
    /// pass all the specified filters. Filters can be used to selectively process
    /// block details updates based on criteria such as datasource ID, block height,
    /// or other custom logic.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes block details updates
    /// - `filters`: A collection of filters that determine which block details
    ///   updates should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .block_details_with_filters(MyBlockDetailsProcessor, filters);
    /// ```
    pub fn block_details_with_filters(
        mut self,
        processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "block_details_with_filters(self, processor: {:?}, filters: {:?})",
            stringify!(processor),
            stringify!(filters)
        );
        self.block_details_pipes.push(Box::new(BlockDetailsPipe {
            processor: Box::new(processor),
            filters,
        }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters: vec![],
        }));
        self
    }

    /// Adds an instruction pipe with filters to process instructions selectively.
    ///
    /// This method creates an instruction pipe that only processes instructions
    /// that pass all the specified filters. Filters can be used to selectively
    /// process instructions based on criteria such as datasource ID, instruction
    /// type, or other custom logic.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data
    /// - `processor`: A `Processor` that processes decoded instruction data
    /// - `filters`: A collection of filters that determine which instructions
    ///   should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .instruction_with_filters(MyDecoder, MyInstructionProcessor, filters);
    /// ```
    pub fn instruction_with_filters<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self {
        log::trace!(
            "instruction_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
            stringify!(decoder),
            stringify!(processor),
            stringify!(filters)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
            filters,
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method requires a `Processor` to handle the processed transaction
    /// data and an optional transaction schema used for decoding.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, schema: {:?}, processor: {:?})",
            stringify!(schema),
            stringify!(processor)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(
                schema,
                processor,
                vec![],
            )));
        self
    }

    /// Adds a transaction pipe with filters for processing full transaction data selectively.
    ///
    /// This method creates a transaction pipe that only processes transactions
    /// that pass all the specified filters. Filters can be used to selectively
    /// process transactions based on criteria such as datasource ID, transaction
    /// type, or other custom logic.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction data
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data
    /// - `filters`: A collection of filters that determine which transactions
    ///   should be processed
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filter = DatasourceFilter::new(mainnet_id);
    /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .transaction_with_filters(MyTransactionProcessor, Some(MY_SCHEMA.clone()), filters);
    /// ```
    pub fn transaction_with_filters<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
        filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction_with_filters(self, schema: {:?}, processor: {:?}, filters: {:?})",
            stringify!(schema),
            stringify!(processor),
            stringify!(filters)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(
                schema, processor, filters,
            )));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Sets the cancellation token for cancelling the datasources on demand.
    ///
    /// This token is used to stop the configured datasources on demand.
    /// If not set, a default `CancellationToken` is used.
    ///
    /// # Parameters
    ///
    /// - `cancellation_token`: An instance of `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource_cancellation_token(CancellationToken::new());
    /// ```
    pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        log::trace!(
            "datasource_cancellation_token(self, cancellation_token: {:?})",
            cancellation_token
        );
        self.datasource_cancellation_token = Some(cancellation_token);
        self
    }

    /// Sets the size of the channel buffer for the pipeline.
    ///
    /// This value defines the maximum number of updates that can be queued in
    /// the pipeline's channel buffer. If not set, the default of
    /// `DEFAULT_CHANNEL_BUFFER_SIZE` (1,000) will be used.
    ///
    /// # Parameters
    ///
    /// - `size`: The size of the channel buffer for the pipeline.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .channel_buffer_size(1000);
    /// ```
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        log::trace!("channel_buffer_size(self, size: {:?})", size);
        self.channel_buffer_size = size;
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(TestProgramDecoder, TestProgramProcessor)
    ///     .account(TestProgramDecoder, TestProgramAccountProcessor)
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .channel_buffer_size(1000)
    ///     .build()?;
    ///
    /// Ok(())
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            block_details_pipes: self.block_details_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
            datasource_cancellation_token: self.datasource_cancellation_token,
            // Fall back to the documented default when the buffer size was
            // never set: the derived `Default` leaves it at zero, which would
            // panic when the bounded channel is created in `run`.
            channel_buffer_size: if self.channel_buffer_size == 0 {
                DEFAULT_CHANNEL_BUFFER_SIZE
            } else {
                self.channel_buffer_size
            },
        })
    }
}