carbon_core/pipeline.rs
//! Defines the `Pipeline` struct and related components for processing
//! blockchain data updates.
//!
//! The `Pipeline` module is central to the `carbon-core` framework, offering a
//! flexible and extensible data processing architecture that supports various
//! blockchain data types, including account updates, transaction details, and
//! account deletions. The pipeline integrates multiple data sources and
//! processing pipes to handle and transform incoming data, while recording
//! performance metrics for monitoring and analysis.
//!
//! # Overview
//!
//! This module provides the `Pipeline` struct, which orchestrates data flow
//! from multiple sources, processes it through designated pipes, and captures
//! metrics at each stage. The pipeline is highly customizable and can be
//! configured with various components to suit specific data handling
//! requirements.
//!
//! ## Key Components
//!
//! - **Datasources**: Provide raw data updates, which may include account or
//!   transaction details.
//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
//!   and process specific types of data. Account pipes handle account updates,
//!   instruction pipes process instructions within transactions, and
//!   transaction pipes manage complete transaction records.
//! - **Metrics**: Collects data on pipeline performance, such as processing
//!   times and error rates, providing insights into operational efficiency.
//!
//! # Fields and Configuration
//!
//! - **datasources**: A list of `Datasource` objects that act as the sources
//!   for account and transaction data.
//! - **account_pipes**: A collection of pipes for processing account updates.
//! - **account_deletion_pipes**: Pipes responsible for handling account
//!   deletion events.
//! - **instruction_pipes**: Used to process instructions within transactions.
//! - **transaction_pipes**: For handling full transactions.
//! - **metrics**: A vector of `Metrics` implementations that gather and report
//!   on performance data.
//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
//!   Defaults to 5 seconds if unset.
//!
//! ## Notes
//!
//! - Each pipe and data source must implement the appropriate traits
//!   (`Datasource`, `AccountPipes`, `Metrics`, etc.).
//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
//!   wrappers ensuring safe, shared access.
//! - Proper metric collection and flushing are essential for monitoring
//!   pipeline performance, especially in production environments.
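//!
//! # Example
//!
//! A minimal end-to-end sketch (here `MyDatasource` and `MyMetrics` are
//! hypothetical placeholders for your own `Datasource` and `Metrics`
//! implementations):
//!
//! ```rust
//! use std::sync::Arc;
//!
//! use carbon_core::pipeline::Pipeline;
//!
//! #[tokio::main]
//! async fn main() -> carbon_core::error::CarbonResult<()> {
//!     let mut pipeline = Pipeline::builder()
//!         .datasource(MyDatasource::new())
//!         .metrics(Arc::new(MyMetrics::new()))
//!         .metrics_flush_interval(5)
//!         .build()?;
//!
//!     pipeline.run().await
//! }
//! ```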

use {
    crate::{
        account::{
            AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
        },
        account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
        collection::InstructionDecoderCollection,
        datasource::{AccountDeletion, Datasource, Update},
        error::CarbonResult,
        instruction::{
            InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
            InstructionsWithMetadata, NestedInstructions,
        },
        metrics::{Metrics, MetricsCollection},
        processor::Processor,
        schema::TransactionSchema,
        transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
        transformers,
    },
    core::time,
    serde::de::DeserializeOwned,
    std::{convert::TryInto, sync::Arc, time::Instant},
    tokio_util::sync::CancellationToken,
};

/// Defines the shutdown behavior for the pipeline.
///
/// `ShutdownStrategy` determines how the pipeline will behave when it receives
/// a shutdown signal. It supports two modes:
///
/// - `Immediate`: Stops the entire pipeline, including all tasks, instantly.
/// - `ProcessPending`: Terminates the data sources, then completes processing
///   of any updates currently pending in the pipeline. This is the default
///   behavior.
///
/// # Variants
///
/// - `Immediate`: Immediately stops all pipeline activity without processing
///   any remaining updates.
/// - `ProcessPending`: Gracefully terminates the data sources and allows the
///   pipeline to finish processing updates that are still in progress or
///   queued.
///
/// # Notes
///
/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
///   that no updates are lost during shutdown.
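///
/// # Example
///
/// Selecting the immediate strategy through the builder:
///
/// ```rust
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let builder = Pipeline::builder()
///     .shutdown_strategy(ShutdownStrategy::Immediate);
/// ```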
#[derive(Default, PartialEq, Debug)]
pub enum ShutdownStrategy {
    /// Stop the whole pipeline immediately.
    Immediate,
    /// Terminate the datasource(s) and finish processing all pending updates.
    #[default]
    ProcessPending,
}

/// The default size of the channel buffer for the pipeline.
///
/// This constant defines the default number of updates that can be queued in
/// the pipeline's channel buffer. It is used as a fallback value if the
/// `channel_buffer_size` is not explicitly set during pipeline construction.
///
/// The default size is 1,000 updates, which provides a reasonable balance
/// between memory usage and the ability to absorb bursts of updates.
pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;

/// Represents the primary data processing pipeline in the `carbon-core`
/// framework.
///
/// The `Pipeline` struct is responsible for orchestrating the flow of data
/// from various sources, processing it through multiple pipes (for accounts,
/// transactions, instructions, and account deletions), and recording metrics
/// at each stage. This flexible design allows for customized data processing,
/// handling a variety of update types with minimal boilerplate code.
///
/// ## Overview
///
/// A `Pipeline` instance includes collections of data sources and processing
/// pipes, enabling users to configure the pipeline to handle diverse types of
/// blockchain-related data. Each pipe is responsible for decoding, processing,
/// and routing specific data types, while the metrics system records relevant
/// statistics.
///
/// ### Key Concepts
///
/// - **Datasources**: These provide the raw data, such as account updates,
///   transaction details, and account deletions.
/// - **Pipes**: Modular units that handle specific data types:
///   - `AccountPipes` for account updates.
///   - `AccountDeletionPipes` for account deletions.
///   - `InstructionPipes` for instruction data within transactions.
///   - `TransactionPipes` for entire transaction payloads.
/// - **Metrics**: Collect performance data, enabling real-time insights and
///   efficient monitoring.
///
/// ## Fields
///
/// - `datasources`: A vector of data sources (`Datasource` implementations)
///   that provide the data for processing. Each data source must be wrapped in
///   an `Arc` for safe, concurrent access.
/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
///   account updates.
/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
///   deletion events.
/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
///   instructions within transactions. These pipes work with nested
///   instructions and are generically defined to support varied instruction
///   types.
/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
///   processing complete transaction payloads.
/// - `metrics`: A `MetricsCollection` holding the `Metrics` implementations
///   used to record and track performance data. Each metrics instance is
///   managed within an `Arc` to ensure thread safety.
/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
///   frequently metrics should be flushed. If `None`, the default interval of
///   5 seconds is used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` used to
///   cancel the data sources. If `None`, a default token is created when the
///   pipeline runs.
/// - `shutdown_strategy`: Determines how the pipeline responds to a shutdown
///   signal; see [`ShutdownStrategy`].
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline.
///   If not set, the default size of 1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) is
///   used.
///
/// ## Example
///
/// ```rust
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     .account(
///         TestProgramDecoder,
///         TestProgramAccountProcessor
///     )
///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
///     .account_deletions(TestProgramAccountDeletionProcessor)
///     .channel_buffer_size(1000)
///     .build()?
///     .run()
///     .await?;
/// ```
///
/// ## Notes
///
/// - Ensure that each data source and pipe implements the required traits,
///   such as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
///   `Box` types to handle shared ownership and trait object storage.
/// - The `metrics_flush_interval` controls how frequently the pipeline's
///   metrics are flushed. If `None`, a default interval of 5 seconds is used.
pub struct Pipeline {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: Arc<MetricsCollection>,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl Pipeline {
    /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
    ///
    /// The `builder` method returns a `PipelineBuilder` that allows you to
    /// configure and customize the pipeline components before building the
    /// final `Pipeline` object. This approach provides a flexible and
    /// type-safe way to assemble a pipeline by specifying data sources,
    /// processing pipes, and metrics.
    ///
    /// # Example
    ///
    /// ```rust
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     // ...
    /// ```
    ///
    /// # Returns
    ///
    /// Returns a `PipelineBuilder` instance with empty collections for data
    /// sources, pipes, and metrics. You can then configure each component
    /// using the builder pattern.
    pub fn builder() -> PipelineBuilder {
        log::trace!("Pipeline::builder()");
        PipelineBuilder {
            datasources: Vec::new(),
            account_pipes: Vec::new(),
            account_deletion_pipes: Vec::new(),
            instruction_pipes: Vec::new(),
            transaction_pipes: Vec::new(),
            metrics: MetricsCollection::default(),
            metrics_flush_interval: None,
            datasource_cancellation_token: None,
            shutdown_strategy: ShutdownStrategy::default(),
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }

    /// Runs the `Pipeline`, processing updates from data sources and handling
    /// metrics.
    ///
    /// The `run` method initializes the pipeline's metrics system, spawns a
    /// consumer task for each configured data source, and then enters a loop
    /// in which it processes each update received from the data sources in
    /// turn, logging and updating metrics based on the success or failure of
    /// each operation.
    ///
    /// # How it Works
    ///
    /// - Initializes metrics and sets up an interval for periodic metric
    ///   flushing.
    /// - Spawns tasks for each data source to continuously consume updates.
    /// - Processes updates according to their type (e.g., Account, Transaction,
    ///   or AccountDeletion).
    /// - Records performance metrics such as update processing times, and
    ///   tracks success and failure counts.
    ///
    /// # Errors
    ///
    /// The method returns an `Err` variant if a metrics operation fails, such
    /// as metrics initialization, counter or histogram updates, flushing, or
    /// shutdown. Errors encountered while a data source consumes updates, or
    /// while an individual update is processed, are logged and recorded in the
    /// metrics but do not abort the loop.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::sync::Arc;
    ///
    /// use carbon_core::pipeline::Pipeline;
    ///
    /// let mut pipeline = Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .metrics(Arc::new(MyMetrics::new()))
    ///     .build()
    ///     .expect("Failed to build pipeline");
    ///
    /// // Running the pipeline asynchronously
    /// tokio::spawn(async move {
    ///     if let Err(e) = pipeline.run().await {
    ///         eprintln!("Pipeline run error: {:?}", e);
    ///     }
    /// });
    /// ```
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime environment.
    /// - The pipeline monitors metrics and flushes them based on the configured
    ///   `metrics_flush_interval`.
    /// - The `run` method operates in an infinite loop, handling updates until
    ///   a termination condition occurs.
    pub async fn run(&mut self) -> CarbonResult<()> {
        log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
            self.datasources.len(),
            self.metrics.metrics.len(),
            self.account_pipes.len(),
            self.account_deletion_pipes.len(),
            self.instruction_pipes.len(),
            self.transaction_pipes.len(),
        );

        log::trace!("run(self)");

        self.metrics.initialize_metrics().await?;
        let (update_sender, mut update_receiver) =
            tokio::sync::mpsc::channel::<Update>(self.channel_buffer_size);

        let datasource_cancellation_token = self
            .datasource_cancellation_token
            .clone()
            .unwrap_or_default();

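        // Spawn a consumer task per datasource; each task pushes updates into
        // the shared channel until the datasource finishes or the cancellation
        // token fires.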
        for datasource in &self.datasources {
            let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
            let sender_clone = update_sender.clone();
            let datasource_clone = Arc::clone(datasource);
            let metrics_collection = self.metrics.clone();

            tokio::spawn(async move {
                if let Err(e) = datasource_clone
                    .consume(
                        &sender_clone,
                        datasource_cancellation_token_clone,
                        metrics_collection,
                    )
                    .await
                {
                    log::error!("error consuming datasource: {:?}", e);
                }
            });
        }

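        // Flush metrics on a fixed cadence, defaulting to every 5 seconds.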
        let mut interval = tokio::time::interval(time::Duration::from_secs(
            self.metrics_flush_interval.unwrap_or(5),
        ));

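        // Main event loop: handle shutdown signals, periodic metric flushes,
        // and incoming updates until the channel closes or an immediate
        // shutdown is requested.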
        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    log::trace!("received SIGINT, shutting down.");
                    datasource_cancellation_token.cancel();

                    if self.shutdown_strategy == ShutdownStrategy::Immediate {
                        log::info!("shutting down the pipeline immediately.");
                        self.metrics.flush_metrics().await?;
                        self.metrics.shutdown_metrics().await?;
                        break;
                    } else {
                        log::info!("shutting down the pipeline after processing pending updates.");
                    }
                }
                _ = interval.tick() => {
                    self.metrics.flush_metrics().await?;
                }
                update = update_receiver.recv() => {
                    match update {
                        Some(update) => {
                            self.metrics
                                .increment_counter("updates_received", 1)
                                .await?;

                            let start = Instant::now();
                            let process_result = self.process(update.clone()).await;
                            let time_taken_nanoseconds = start.elapsed().as_nanos();
                            let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;

                            self.metrics
                                .record_histogram(
                                    "updates_process_time_nanoseconds",
                                    time_taken_nanoseconds as f64,
                                )
                                .await?;

                            self.metrics
                                .record_histogram(
                                    "updates_process_time_milliseconds",
                                    time_taken_milliseconds as f64,
                                )
                                .await?;

                            match process_result {
                                Ok(_) => {
                                    self.metrics
                                        .increment_counter("updates_successful", 1)
                                        .await?;

                                    log::trace!("processed update")
                                }
                                Err(error) => {
                                    log::error!("error processing update ({:?}): {:?}", update, error);
                                    self.metrics.increment_counter("updates_failed", 1).await?;
                                }
                            };

                            self.metrics
                                .increment_counter("updates_processed", 1)
                                .await?;

                            self.metrics
                                .update_gauge("updates_queued", update_receiver.len() as f64)
                                .await?;
                        }
                        None => {
                            log::info!("update_receiver closed, shutting down.");
                            self.metrics.flush_metrics().await?;
                            self.metrics.shutdown_metrics().await?;
                            break;
                        }
                    }
                }
            }
        }

        log::info!("pipeline shutdown complete.");

        Ok(())
    }

    /// Processes a single update and routes it through the appropriate
    /// pipeline stages.
    ///
    /// The `process` method takes an `Update` and determines its type, then
    /// routes it through the corresponding pipes for handling account
    /// updates, transactions, or account deletions. It also records metrics
    /// for processed updates, providing insights into the processing
    /// workload and performance.
    ///
    /// ## Functionality
    ///
    /// - **Account Updates**: Passes account updates through the
    ///   `account_pipes`. Each pipe processes the account metadata and the
    ///   updated account state.
    /// - **Transaction Updates**: Extracts transaction metadata and
    ///   instructions, nests them if needed, and routes them through
    ///   `instruction_pipes` and `transaction_pipes`.
    /// - **Account Deletions**: Sends account deletion events through the
    ///   `account_deletion_pipes`.
    ///
    /// The method also updates metrics counters for each type of update,
    /// tracking how many updates have been processed in each category.
    ///
    /// # Parameters
    ///
    /// - `update`: An `Update` variant representing the type of data received.
    ///   This can be an `Account`, `Transaction`, or `AccountDeletion`, each
    ///   triggering different processing logic within the pipeline.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
    /// processing or an error if processing fails at any stage.
    ///
    /// # Notes
    ///
    /// - This method is asynchronous and should be awaited within a Tokio
    ///   runtime.
    /// - Each type of update (account, transaction, account deletion) requires
    ///   its own set of pipes, so ensure that appropriate pipes are configured
    ///   based on the data types expected from the data sources.
    /// - Metrics are recorded after each successful processing stage to track
    ///   processing volumes and identify potential bottlenecks in real-time.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the pipes fail during processing, or if an
    /// issue arises while incrementing counters or updating metrics. Handle
    /// errors gracefully to ensure continuous pipeline operation.
    async fn process(&mut self, update: Update) -> CarbonResult<()> {
        log::trace!("process(self, update: {:?})", update);
        match update {
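            // Account updates fan out to every registered account pipe,
            // paired with their slot and pubkey metadata.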
            Update::Account(account_update) => {
                let account_metadata = AccountMetadata {
                    slot: account_update.slot,
                    pubkey: account_update.pubkey,
                };

                for pipe in self.account_pipes.iter_mut() {
                    pipe.run(
                        (account_metadata.clone(), account_update.account.clone()),
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("account_updates_processed", 1)
                    .await?;
            }
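            // Transactions are flattened into instructions with metadata and
            // then re-nested to reflect the invocation hierarchy, before being
            // routed through the instruction and transaction pipes.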
            Update::Transaction(transaction_update) => {
                let transaction_metadata = &(*transaction_update).clone().try_into()?;

                let instructions_with_metadata: InstructionsWithMetadata =
                    transformers::extract_instructions_with_metadata(
                        transaction_metadata,
                        &transaction_update,
                    )?;

                let nested_instructions: NestedInstructions = instructions_with_metadata.into();

                for pipe in self.instruction_pipes.iter_mut() {
                    for nested_instruction in nested_instructions.iter() {
                        pipe.run(nested_instruction, self.metrics.clone()).await?;
                    }
                }

                for pipe in self.transaction_pipes.iter_mut() {
                    pipe.run(
                        transaction_metadata.clone(),
                        &nested_instructions,
                        self.metrics.clone(),
                    )
                    .await?;
                }

                self.metrics
                    .increment_counter("transaction_updates_processed", 1)
                    .await?;
            }
            Update::AccountDeletion(account_deletion) => {
                for pipe in self.account_deletion_pipes.iter_mut() {
                    pipe.run(account_deletion.clone(), self.metrics.clone())
                        .await?;
                }

                self.metrics
                    .increment_counter("account_deletions_processed", 1)
                    .await?;
            }
        };

        Ok(())
    }
}

/// A builder for constructing a `Pipeline` instance with customized data
/// sources, processing pipes, and metrics.
///
/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
/// by allowing configuration of its components, such as data sources, account
/// and transaction pipes, deletion handling, and metrics. Using the builder
/// pattern, you can add the desired elements incrementally and then finalize
/// with a call to `build`.
///
/// ## Overview
///
/// The `PipelineBuilder` supports the following components:
/// - **Datasources**: Sources of data updates, such as account information and
///   transactions.
/// - **Account Pipes**: For processing account updates from data sources.
/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Instruction Pipes**: For handling instructions associated with
///   transactions.
/// - **Transaction Pipes**: For handling full transaction data.
/// - **Metrics**: Collects and reports performance data, such as update
///   processing times.
/// - **Metrics Flush Interval**: Optional interval defining how often to flush
///   metrics data.
///
/// Each component can be added through method chaining, enhancing code
/// readability and maintainability.
///
/// # Example
///
/// ```rust
/// carbon_core::pipeline::Pipeline::builder()
///     .datasource(transaction_crawler)
///     .metrics(Arc::new(LogMetrics::new()))
///     .metrics(Arc::new(PrometheusMetrics::new()))
///     .instruction(
///         TestProgramDecoder,
///         TestProgramProcessor
///     )
///     // ...
/// ```
///
/// # Fields
///
/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
///   shared ownership across threads. Each `Datasource` provides updates to
///   the pipeline.
/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
///   processing account deletions.
/// - `instruction_pipes`: A collection of `InstructionPipes` to process
///   instructions in transactions.
/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
///   transaction data.
/// - `metrics`: A `MetricsCollection` of `Metrics` implementations for
///   tracking pipeline performance.
/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
///   metrics data. If not set, a default flush interval of 5 seconds will be
///   used.
/// - `datasource_cancellation_token`: An optional `CancellationToken` for
///   canceling the data sources. If not set, a default `CancellationToken`
///   will be used.
/// - `shutdown_strategy`: Determines how the pipeline responds to a shutdown
///   signal; see [`ShutdownStrategy`].
/// - `channel_buffer_size`: The size of the channel buffer for the pipeline.
///   If not set, the default size of 1,000 (`DEFAULT_CHANNEL_BUFFER_SIZE`)
///   will be used.
///
/// # Returns
///
/// After configuring the builder, call `build` to create a `Pipeline`
/// instance. The builder will return a `CarbonResult<Pipeline>`, which will
/// either contain the configured pipeline or an error if configuration failed.
///
/// # Notes
///
/// - The builder pattern allows for method chaining, making it easy to
///   incrementally add components to the `Pipeline`.
/// - Ensure that each component matches the data and update types expected by
///   your application.
#[derive(Default)]
pub struct PipelineBuilder {
    pub datasources: Vec<Arc<dyn Datasource + Send + Sync>>,
    pub account_pipes: Vec<Box<dyn AccountPipes>>,
    pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
    pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
    pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
    pub metrics: MetricsCollection,
    pub metrics_flush_interval: Option<u64>,
    pub datasource_cancellation_token: Option<CancellationToken>,
    pub shutdown_strategy: ShutdownStrategy,
    pub channel_buffer_size: usize,
}

impl PipelineBuilder {
    /// Creates a new `PipelineBuilder` with empty collections for datasources,
    /// pipes, and metrics.
    ///
    /// This method initializes a `PipelineBuilder` instance, allowing you to
    /// configure each component of a `Pipeline` before building it. The
    /// builder pattern offers flexibility in adding data sources, account
    /// and transaction handling pipes, deletion processing, and metrics
    /// collection features. The returned builder starts with the default
    /// channel buffer size (`DEFAULT_CHANNEL_BUFFER_SIZE`), since a derived
    /// zero-sized buffer would not be a valid channel capacity.
    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::PipelineBuilder;
    ///
    /// let builder = PipelineBuilder::new();
    /// ```
    pub fn new() -> Self {
        log::trace!("PipelineBuilder::new()");
        Self {
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
            ..Self::default()
        }
    }

    /// Adds a datasource to the pipeline.
    ///
    /// The datasource is responsible for providing updates, such as account
    /// and transaction data, to the pipeline. Multiple datasources can be
    /// added to handle various types of updates.
    ///
    /// # Parameters
    ///
    /// - `datasource`: The data source to add, implementing the `Datasource`
    ///   trait.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .datasource(MyDatasource::new());
    /// ```
    pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
        log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
        self.datasources.push(Arc::new(datasource));
        self
    }

    /// Sets the shutdown strategy for the pipeline.
    ///
    /// This method configures how the pipeline should handle shutdowns. The
    /// shutdown strategy defines whether the pipeline should terminate
    /// immediately or continue processing pending updates after terminating
    /// the data sources.
    ///
    /// # Parameters
    ///
    /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
    ///   how the pipeline should handle shutdowns.
    ///
    /// # Returns
    ///
    /// Returns `Self`, allowing for method chaining.
    ///
    /// # Notes
    ///
    /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
    ///   instantly, including all active processing tasks.
    /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
    ///   sources first and allow the pipeline to finish processing any updates
    ///   that are still pending.
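    ///
    /// # Example
    ///
    /// ```rust
    /// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
    ///
    /// let builder = PipelineBuilder::new()
    ///     .shutdown_strategy(ShutdownStrategy::Immediate);
    /// ```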
    pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
        log::trace!(
            "shutdown_strategy(self, shutdown_strategy: {:?})",
            shutdown_strategy
        );
        self.shutdown_strategy = shutdown_strategy;
        self
    }

    /// Adds an account pipe to process account updates.
    ///
    /// Account pipes decode and process updates to accounts within the
    /// pipeline. This method requires both an `AccountDecoder` and a
    /// `Processor` to handle decoded account data.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `AccountDecoder` that decodes the account data.
    /// - `processor`: A `Processor` that processes the decoded account data.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .account(MyAccountDecoder, MyAccountProcessor);
    /// ```
    pub fn account<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.account_pipes.push(Box::new(AccountPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds an account deletion pipe to handle account deletion events.
    ///
    /// Account deletion pipes process deletions of accounts, with a `Processor`
    /// to handle the deletion events as they occur.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes account deletion events.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .account_deletions(MyAccountDeletionProcessor);
    /// ```
    pub fn account_deletions(
        mut self,
        processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "account_deletions(self, processor: {:?})",
            stringify!(processor)
        );
        self.account_deletion_pipes
            .push(Box::new(AccountDeletionPipe {
                processor: Box::new(processor),
            }));
        self
    }

    /// Adds an instruction pipe to process instructions within transactions.
    ///
    /// Instruction pipes decode and process individual instructions,
    /// enabling specialized handling of various instruction types.
    ///
    /// # Parameters
    ///
    /// - `decoder`: An `InstructionDecoder` for decoding instructions from
    ///   transaction data.
    /// - `processor`: A `Processor` that processes decoded instruction data.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .instruction(MyDecoder, MyInstructionProcessor);
    /// ```
    pub fn instruction<T: Send + Sync + 'static>(
        mut self,
        decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
        processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
    ) -> Self {
        log::trace!(
            "instruction(self, decoder: {:?}, processor: {:?})",
            stringify!(decoder),
            stringify!(processor)
        );
        self.instruction_pipes.push(Box::new(InstructionPipe {
            decoder: Box::new(decoder),
            processor: Box::new(processor),
        }));
        self
    }

    /// Adds a transaction pipe for processing full transaction data.
    ///
    /// This method requires a `Processor` to handle the processed transaction
    /// data and accepts an optional transaction schema used to match and
    /// interpret the transaction's instructions.
    ///
    /// # Parameters
    ///
    /// - `processor`: A `Processor` that processes the decoded transaction
    ///   data.
    /// - `schema`: An optional `TransactionSchema` used to match and interpret
    ///   transaction data.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
    /// ```
    pub fn transaction<T, U>(
        mut self,
        processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
            + Send
            + Sync
            + 'static,
        schema: Option<TransactionSchema<T>>,
    ) -> Self
    where
        T: InstructionDecoderCollection + 'static,
        U: DeserializeOwned + Send + Sync + 'static,
    {
        log::trace!(
            "transaction(self, processor: {:?}, schema: {:?})",
            stringify!(processor),
            stringify!(schema)
        );
        self.transaction_pipes
            .push(Box::new(TransactionPipe::<T, U>::new(schema, processor)));
        self
    }

    /// Adds a metrics component to the pipeline for performance tracking.
    ///
    /// This component collects and reports on pipeline metrics, providing
    /// insights into performance and operational statistics.
    ///
    /// # Parameters
    ///
    /// - `metrics`: An instance of a `Metrics` implementation, used to gather
    ///   and report metrics.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .metrics(Arc::new(LogMetrics::new()));
    /// ```
    pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
        log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
        self.metrics.metrics.push(metrics);
        self
    }

    /// Sets the interval for flushing metrics data.
    ///
    /// This value defines the frequency, in seconds, at which metrics data is
    /// flushed from memory. If not set, a default interval of 5 seconds is
    /// used.
    ///
    /// # Parameters
    ///
    /// - `interval`: The flush interval for metrics, in seconds.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .metrics_flush_interval(60);
    /// ```
    pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
        log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
        self.metrics_flush_interval = Some(interval);
        self
    }

    /// Sets the cancellation token for canceling the data sources on demand.
    ///
    /// This token can be used to cancel the running data sources on demand.
    /// If not set, a default `CancellationToken` is used.
    ///
    /// # Parameters
    ///
    /// - `cancellation_token`: An instance of `CancellationToken`.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .datasource_cancellation_token(CancellationToken::new());
    /// ```
    pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
        log::trace!(
            "datasource_cancellation_token(self, cancellation_token: {:?})",
            cancellation_token
        );
        self.datasource_cancellation_token = Some(cancellation_token);
        self
    }

    /// Sets the size of the channel buffer for the pipeline.
    ///
    /// This value defines the maximum number of updates that can be queued in
    /// the pipeline's channel buffer. If not set, the default size of 1,000
    /// (`DEFAULT_CHANNEL_BUFFER_SIZE`) will be used.
    ///
    /// # Parameters
    ///
    /// - `size`: The size of the channel buffer for the pipeline.
    ///
    /// # Example
    ///
    /// ```rust
    /// let builder = PipelineBuilder::new()
    ///     .channel_buffer_size(1000);
    /// ```
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        log::trace!("channel_buffer_size(self, size: {:?})", size);
        self.channel_buffer_size = size;
        self
    }

    /// Builds and returns a `Pipeline` configured with the specified
    /// components.
    ///
    /// After configuring the `PipelineBuilder` with data sources, pipes, and
    /// metrics, call this method to create the final `Pipeline` instance
    /// ready for operation.
    ///
    /// # Returns
    ///
    /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
    /// or an error if any part of the configuration is invalid.
    ///
    /// # Example
    ///
    /// ```rust
    /// carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(transaction_crawler)
    ///     .metrics(Arc::new(LogMetrics::new()))
    ///     .metrics(Arc::new(PrometheusMetrics::new()))
    ///     .instruction(
    ///         TestProgramDecoder,
    ///         TestProgramProcessor
    ///     )
    ///     .account(
    ///         TestProgramDecoder,
    ///         TestProgramAccountProcessor
    ///     )
    ///     .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
    ///     .account_deletions(TestProgramAccountDeletionProcessor)
    ///     .channel_buffer_size(1000)
    ///     .build()?
    /// ```
    pub fn build(self) -> CarbonResult<Pipeline> {
        log::trace!("build(self)");
        Ok(Pipeline {
            datasources: self.datasources,
            account_pipes: self.account_pipes,
            account_deletion_pipes: self.account_deletion_pipes,
            instruction_pipes: self.instruction_pipes,
            transaction_pipes: self.transaction_pipes,
            shutdown_strategy: self.shutdown_strategy,
            metrics: Arc::new(self.metrics),
            metrics_flush_interval: self.metrics_flush_interval,
            datasource_cancellation_token: self.datasource_cancellation_token,
            channel_buffer_size: self.channel_buffer_size,
        })
    }
}