carbon_core/pipeline.rs
//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A collection of `Metrics` implementations that gather and
//!   report on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//! - **shutdown_strategy**: Determines whether the pipeline stops immediately
//!   on shutdown or finishes processing pending updates first.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
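//!
//! # Example
//!
//! A minimal sketch of wiring up and running a pipeline; `MyDatasource`,
//! `MyAccountDecoder`, and `MyAccountProcessor` are hypothetical stand-ins
//! for your own `Datasource`, `AccountDecoder`, and `Processor`
//! implementations.
//!
//! ```rust,ignore
//! use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
//!
//! #[tokio::main]
//! async fn main() -> carbon_core::error::CarbonResult<()> {
//!     let mut pipeline = Pipeline::builder()
//!         .datasource(MyDatasource::new())
//!         .account(MyAccountDecoder, MyAccountProcessor)
//!         .shutdown_strategy(ShutdownStrategy::ProcessPending)
//!         .metrics_flush_interval(5)
//!         .build()?;
//!
//!     pipeline.run().await
//! }
//! ```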

use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline behaves when it receives a
/// shutdown signal.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Terminates the data sources, then allows the pipeline
///   to finish processing updates that are still in progress or queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
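///
/// # Example
///
/// A minimal sketch, assuming the rest of the pipeline is configured
/// elsewhere: select immediate shutdown instead of the default draining
/// behavior.
///
/// ```rust,ignore
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let builder = Pipeline::builder()
///     // Stop all tasks as soon as a shutdown signal arrives.
///     .shutdown_strategy(ShutdownStrategy::Immediate);
/// ```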
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data from
/// various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics at
/// each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source must be wrapped in
///   an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: An `Arc`-wrapped `MetricsCollection` holding the configured
///   `Metrics` implementations that record and track performance data.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval is
///   used.
/// - `shutdown_strategy`: Determines how the pipeline behaves when it receives
///   a shutdown signal. See [`ShutdownStrategy`].
///
/// ## Example
///
/// ```rust,ignore
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     .account(
///         TestProgramDecoder,
///         TestProgramAccountProcessor
///     )
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits, such
///   as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval (usually 5 seconds) is
///   used.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub shutdown_strategy: ShutdownStrategy,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            shutdown_strategy: ShutdownStrategy::default(),
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline's metrics system and starts
    /// listening for updates from the configured data sources. It then enters
    /// a loop where it processes each update received from the data sources
    /// in turn, logging and updating metrics based on the success or failure
    /// of each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if:
    /// - Metrics initialization, flushing, or shutdown fails.
    /// - An error occurs while processing an update or recording its metrics.
    ///
    /// Errors encountered while a data source consumes updates are logged but
    /// do not abort the pipeline.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method loops over incoming updates until the configured
    ///   shutdown strategy triggers a termination condition.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) = tokio::sync::mpsc::unbounded_channel::<Update>();

        let datasource_cancellation_token = CancellationToken::new();

        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(datasource);
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        &sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

        // Drop the original sender so that `update_receiver.recv()` yields
        // `None` once every datasource task (each holding a clone) has
        // finished; otherwise the `ProcessPending` shutdown path could never
        // observe channel closure and the loop would not terminate.
        drop(update_sender);

        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some(update) => {
                            self.metrics
                                .increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self.metrics
                                .record_histogram(
                                    "updates_process_time_nanoseconds",
                                    time_taken_nanoseconds as f64,
                                )
                                .await?;

                            self.metrics
                                .record_histogram(
                                    "updates_process_time_milliseconds",
                                    time_taken_milliseconds as f64,
                                )
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self.metrics
                                        .increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self.metrics
                                .increment_counter("updates_processed", 1)
                                .await?;

                            self.metrics
                                .update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate
    /// pipeline stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, or account deletions. It also records metrics
    /// for processed updates, providing insights into the processing
    /// workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, or `AccountDeletion`, each
    ///   triggering different processing logic within the pipeline.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion) requires
    ///   its own set of pipes, so ensure that appropriate pipes are configured
    ///   based on the data types expected from the data sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
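    ///
    /// # Example
    ///
    /// A hedged sketch of the dispatch (internal API; `account_update` is a
    /// hypothetical `AccountUpdate` received from a datasource):
    ///
    /// ```rust,ignore
    /// // Inside `run`, each received update is routed like this:
    /// pipeline.process(Update::Account(account_update)).await?;
    /// ```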
    async fn process(&mut self, update: Update) -> CarbonResult<()> {
        log::trace!("process(self, update: {:?})", update);
        match update {
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    pipe.run(
                        (account_metadata.clone(), account_update.account.clone()),
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
            Update::Transaction(transaction_update) => {
                let transaction_metadata = &(*transaction_update).clone().try_into()?;

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        pipe.run(nested_instruction, self.metrics.clone()).await?;
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    pipe.run(
                        transaction_metadata.clone(),
                        &nested_instructions,
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    pipe.run(account_deletion.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```rust,ignore
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to the
///   pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A `MetricsCollection` of `Metrics` implementations for tracking
///   pipeline performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval will be used.
/// - `shutdown_strategy`: The [`ShutdownStrategy`] applied when the pipeline
///   receives a shutdown signal.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline` instance.
/// The builder will return a `CarbonResult<Pipeline>`, which will either
/// contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub shutdown_strategy: ShutdownStrategy,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self::default()
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account and
    /// transaction data, to the pipeline. Multiple datasources can be added
    /// to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources.push(Arc::new(datasource));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
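    ///
    /// # Example
    ///
    /// A short sketch: drain pending updates on shutdown (the default), made
    /// explicit here for clarity.
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::ProcessPending);
    /// ```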
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
            }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method requires a `Processor` to handle the processed transaction
    /// data and accepts an optional transaction schema for matching and
    /// decoding.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, schema: {:?}, processor: {:?})",
            stringify!(schema),
            stringify!(processor)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(schema, processor)));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval is used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     .account(
    ///         TestProgramDecoder,
    ///         TestProgramAccountProcessor
    ///     )
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .build()?
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
        })
    }
}