carbon_core/pipeline.rs

//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A vector of `Metrics` implementations that gather and report
//!   on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
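//!
//! # Example
//!
//! A minimal sketch of assembling and running a pipeline. The datasource,
//! decoder, and processor names below are placeholders for your own
//! implementations:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! carbon_core::pipeline::Pipeline::builder()
//!     .datasource(my_datasource)
//!     .metrics(Arc::new(LogMetrics::new()))
//!     .instruction(MyProgramDecoder, MyProgramInstructionProcessor)
//!     .account(MyProgramDecoder, MyProgramAccountProcessor)
//!     .build()?
//!     .run()
//!     .await?;
//! ```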

use crate::block_details::{BlockDetailsPipe, BlockDetailsPipes};
use crate::datasource::BlockDetails;
use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline will behave when it receives
/// a shutdown signal. It supports two modes:
///
/// - `Immediate`: Stops the entire pipeline, including all tasks, instantly.
/// - `ProcessPending`: Terminates the data sources, then completes processing
///   of any updates currently pending in the pipeline. This is the default
///   behavior.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Gracefully terminates the data sources and allows the
///   pipeline to finish processing updates that are still in progress or
///   queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
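///
/// # Example
///
/// A minimal sketch of selecting a strategy on the builder (the rest of the
/// pipeline configuration is assumed to happen elsewhere):
///
/// ```ignore
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let builder = Pipeline::builder()
///     .shutdown_strategy(ShutdownStrategy::Immediate);
/// ```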
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// The default size of the channel buffer for the pipeline.
///
/// This constant defines the default number of updates that can be queued in
/// the pipeline's channel buffer. It is used as a fallback value if the
/// `channel_buffer_size` is not explicitly set during pipeline construction.
///
/// The default size is 1,000 updates, which provides a reasonable balance
/// between memory usage and update throughput.
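///
/// A minimal sketch of overriding the default when constructing a pipeline:
///
/// ```ignore
/// use carbon_core::pipeline::Pipeline;
///
/// let builder = Pipeline::builder()
///     .channel_buffer_size(50_000);
/// ```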
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source must be wrapped in
///   an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `block_details_pipes`: A vector of `BlockDetailsPipes` to handle
///   block details.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A vector of `Metrics` implementations to record and track
///   performance data. Each metrics instance is managed within an `Arc` to
///   ensure thread safety.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval is
///   used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, the default of 1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) is used.
///
/// ## Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
/// .datasource(transaction_crawler)
/// .metrics(Arc::new(LogMetrics::new()))
/// .metrics(Arc::new(PrometheusMetrics::new()))
/// .instruction(
///    TestProgramDecoder,
///    TestProgramProcessor
/// )
/// .account(
///     TestProgramDecoder,
///     TestProgramAccountProcessor
/// )
/// .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
/// .account_deletions(TestProgramAccountDeletionProcessor)
/// .channel_buffer_size(1000)
/// .build()?
/// .run()
/// .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval of 5 seconds is
///   used.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    /// .datasource(transaction_crawler)
    /// .metrics(Arc::new(LogMetrics::new()))
    /// .metrics(Arc::new(PrometheusMetrics::new()))
    /// .instruction(
    ///    TestProgramDecoder,
    ///    TestProgramProcessor
    /// );
    /// // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            block_details_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            datasource_cancellation_token: None,
            shutdown_strategy: ShutdownStrategy::default(),
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline’s metrics system and starts
    /// listening for updates from the configured data sources. It then enters
    /// a loop where it processes each update received from the data sources
    /// in turn, logging and updating metrics based on the success or failure
    /// of each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if:
    /// - Metrics initialization, flushing, or shutdown fails.
    /// - An error occurs while processing an update.
    ///
    /// Errors encountered by individual data sources while consuming updates
    /// are logged but do not cause `run` to return an error.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method operates in an infinite loop, handling updates until
    ///   a termination condition occurs.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) =
            tokio::sync::mpsc::channel::<Update>(self.channel_buffer_size);

        let datasource_cancellation_token = self
            .datasource_cancellation_token
            .clone()
            .unwrap_or_default();

        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(datasource);
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        &sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some(update) => {
                            self
                                .metrics.increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self
                                .metrics
                                .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
                                .await?;

                            self
                                .metrics
                                .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self
                                        .metrics.increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self
                                .metrics.increment_counter("updates_processed", 1)
                                .await?;

                            self
                                .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate pipeline
    /// stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, account deletions, or block details. It also
    /// records metrics for processed updates, providing insights into the
    /// processing workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    /// - **Block Details**: Sends block metadata updates through the
    ///   `block_details_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, `AccountDeletion`, or
    ///   `BlockDetails`, each triggering different processing logic within the
    ///   pipeline.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion, block
    ///   details) requires its own set of pipes, so ensure that appropriate
    ///   pipes are configured based on the data types expected from the data
    ///   sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update) -> CarbonResult<()> {
        log::trace!("process(self, update: {:?})", update);
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    pipe.run(
                        (account_metadata.clone(), account_update.account.clone()),
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
                let transaction_metadata = Arc::new((*transaction_update).clone().try_into()?);

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        &transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        pipe.run(nested_instruction, self.metrics.clone()).await?;
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    pipe.run(
                        transaction_metadata.clone(),
                        &nested_instructions,
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    pipe.run(account_deletion.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
            Update::BlockDetails(block_details) => {
                for pipe in self.block_details_pipes.iter_mut() {
                    pipe.run(block_details.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("block_details_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
///
/// carbon_core::pipeline::Pipeline::builder()
/// .datasource(transaction_crawler)
/// .metrics(Arc::new(LogMetrics::new()))
/// .metrics(Arc::new(PrometheusMetrics::new()))
/// .instruction(
///    TestProgramDecoder,
///    TestProgramProcessor
/// );
/// // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A vector of `Metrics` implementations for tracking pipeline
///   performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default interval of 5 seconds is used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   cancelling the datasources on demand. If not set, a default
///   `CancellationToken` will be used.
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
///   not set, the default of 1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) is used.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self::default()
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources.push(Arc::new(datasource));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
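    ///
    /// # Example
    ///
    /// A minimal sketch of setting the strategy (other builder calls omitted):
    ///
    /// ```ignore
    /// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
    ///
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```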
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
            }));
        self
    }

    /// Adds a block details pipe to handle block details updates.
    ///
    /// Block details pipes process updates related to block metadata, such as
    /// slot, block hash, and rewards, with a `Processor` to handle the updates.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes block details updates.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .block_details(MyBlockDetailsProcessor);
    /// ```
    pub fn block_details(
        mut self,
        processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "block_details(self, processor: {:?})",
            stringify!(processor)
        );
        self.block_details_pipes.push(Box::new(BlockDetailsPipe {
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method requires a `Processor` to handle the processed transaction
    /// data and, optionally, a `TransactionSchema` used for decoding.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, schema: {:?}, processor: {:?})",
            stringify!(schema),
            stringify!(processor)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(schema, processor)));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval of 5 seconds is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Sets the cancellation token used to cancel the datasources on demand.
    ///
    /// When the token is cancelled, the datasources stop consuming updates.
    /// If not set, a default `CancellationToken` is used.
    ///
    /// # Parameters
    ///
    /// - `cancellation_token`: An instance of `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource_cancellation_token(CancellationToken::new());
    /// ```
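    ///
    /// A minimal sketch of cancelling the datasources from elsewhere, assuming
    /// the pipeline was built with a clone of this token:
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let builder = carbon_core::pipeline::Pipeline::builder()
    ///     .datasource_cancellation_token(token.clone());
    ///
    /// // Later, e.g. from a shutdown handler:
    /// token.cancel();
    /// ```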
    pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        log::trace!(
            "datasource_cancellation_token(self, cancellation_token: {:?})",
            cancellation_token
        );
        self.datasource_cancellation_token = Some(cancellation_token);
        self
    }

    /// Sets the size of the channel buffer for the pipeline.
    ///
    /// This value defines the maximum number of updates that can be queued in
    /// the pipeline's channel buffer. If not set, the default of 1,000
    /// (`DEFAULT_CHANNEL_BUFFER_SIZE`) is used.
    ///
    /// # Parameters
    ///
    /// - `size`: The size of the channel buffer for the pipeline.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new()
    ///     .channel_buffer_size(1000);
    /// ```
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        log::trace!("channel_buffer_size(self, size: {:?})", size);
        self.channel_buffer_size = size;
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// carbon_core::pipeline::Pipeline::builder()
    /// .datasource(transaction_crawler)
    /// .metrics(Arc::new(LogMetrics::new()))
    /// .metrics(Arc::new(PrometheusMetrics::new()))
    /// .instruction(
    ///    TestProgramDecoder,
    ///    TestProgramProcessor
    /// )
    /// .account(
    ///     TestProgramDecoder,
    ///     TestProgramAccountProcessor
    /// )
    /// .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    /// .account_deletions(TestProgramAccountDeletionProcessor)
    /// .channel_buffer_size(1000)
    /// .build()?;
    ///
    /// Ok(())
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            block_details_pipes: self.block_details_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
            datasource_cancellation_token: self.datasource_cancellation_token,
            channel_buffer_size: self.channel_buffer_size,
        })
    }
}