// carbon_core/pipeline.rs

//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A `MetricsCollection` of `Metrics` implementations that
//!   gather and report performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
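//!
//! # Example
//!
//! A typical configuration, sketched with placeholder datasource, decoder,
//! processor, and metrics types standing in for your own implementations:
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! carbon_core::pipeline::Pipeline::builder()
//!     .datasource(MyDatasource::new())
//!     .account(MyAccountDecoder, MyAccountProcessor)
//!     .metrics(Arc::new(MyMetrics::new()))
//!     .metrics_flush_interval(10)
//!     .build()?
//!     .run()
//!     .await?;
//! ```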

use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline behaves when it receives a
/// shutdown signal.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Gracefully terminates the data sources and allows the
///   pipeline to finish processing updates that are still in progress or
///   queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
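///
/// # Example
///
/// The derived default can be checked directly:
///
/// ```rust
/// use carbon_core::pipeline::ShutdownStrategy;
///
/// assert_eq!(ShutdownStrategy::default(), ShutdownStrategy::ProcessPending);
/// ```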
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source must be wrapped in
///   an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A `MetricsCollection`, wrapped in an `Arc` for thread safety,
///   holding the `Metrics` implementations that record and track performance
///   data.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval is
///   used.
/// - `shutdown_strategy`: The [`ShutdownStrategy`] that governs how the
///   pipeline reacts to a shutdown signal.
///
/// ## Example
///
/// ```rust,ignore
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(TestProgramDecoder, TestProgramProcessor)
///     .account(TestProgramDecoder, TestProgramAccountProcessor)
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval (usually 5 seconds) is
///   used.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub shutdown_strategy: ShutdownStrategy,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(TestProgramDecoder, TestProgramProcessor)
    ///     // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            shutdown_strategy: ShutdownStrategy::default(),
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline's metrics system and starts
    /// listening for updates from the configured data sources. It checks
    /// the types of updates provided by each data source to ensure that the
    /// required data types are available for processing. The method then
    /// enters a loop where it processes each update received from the data
    /// sources in turn, logging and updating metrics based on the success
    /// or failure of each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if:
    /// - Required update types (e.g., `AccountUpdate`, `AccountDeletion`,
    ///   `Transaction`) are not provided by any data source, causing a mismatch
    ///   in expected data processing capabilities.
    /// - A data source encounters an error while consuming updates.
    /// - An error occurs during metrics flushing or processing of updates.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Run the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method operates in an infinite loop, handling updates until
    ///   a termination condition occurs.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) = tokio::sync::mpsc::unbounded_channel::<Update>();

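        // A single cancellation token is shared by every datasource task, so
        // one cancel() call stops all consumers at once.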
        let datasource_cancellation_token = CancellationToken::new();

        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(datasource);
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        &sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

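        // Flush metrics on a fixed cadence, defaulting to every 5 seconds when
        // no `metrics_flush_interval` was configured.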
        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

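        // Main event loop: react to Ctrl+C (shutdown), interval ticks (metric
        // flushes), and updates arriving from the datasources.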
        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some(update) => {
                            self
                                .metrics.increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self
                                .metrics
                                .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
                                .await?;

                            self
                                .metrics
                                .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self
                                        .metrics.increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self
                                .metrics.increment_counter("updates_processed", 1)
                                .await?;

                            self
                                .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate pipeline
    /// stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, or account deletions. It also records metrics
    /// for processed updates, providing insights into the processing
    /// workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, or `AccountDeletion`, each
    ///   triggering different processing logic within the pipeline.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion) requires
    ///   its own set of pipes, so ensure that appropriate pipes are configured
    ///   based on the data types expected from the data sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update) -> CarbonResult<()> {
        log::trace!("process(self, update: {:?})", update);
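        // Dispatch on the update variant: each kind of update is routed through
        // its own set of pipes and counted under its own metric.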
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    pipe.run(
                        (account_metadata.clone(), account_update.account.clone()),
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
                let transaction_metadata = &(*transaction_update).clone().try_into()?;

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        pipe.run(nested_instruction, self.metrics.clone()).await?;
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    pipe.run(
                        transaction_metadata.clone(),
                        &nested_instructions,
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    pipe.run(account_deletion.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```rust,ignore
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(TestProgramDecoder, TestProgramProcessor)
///     // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A `MetricsCollection` holding the registered `Metrics`
///   implementations for tracking pipeline performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval will be used.
/// - `shutdown_strategy`: The [`ShutdownStrategy`] applied when the pipeline
///   receives a shutdown signal.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub shutdown_strategy: ShutdownStrategy,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self::default()
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources.push(Arc::new(datasource));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
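    ///
    /// # Example
    ///
    /// Configuring an immediate shutdown on a fresh builder:
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```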
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
            }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method requires a `Processor` to handle the processed transaction
    /// data and an optional transaction schema for matching and decoding.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, schema: {:?}, processor: {:?})",
            stringify!(schema),
            stringify!(processor)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(schema, processor)));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(TestProgramDecoder, TestProgramProcessor)
    ///     .account(TestProgramDecoder, TestProgramAccountProcessor)
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .build()?
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
        })
    }
}