carbon_core/pipeline.rs
1//! Defines the `Pipeline` struct and related components for processing
2//! blockchain data updates.
3//!
4//! The `Pipeline` module is central to the `carbon-core` framework, offering a
5//! flexible and extensible data processing architecture that supports various
6//! blockchain data types, including account updates, transaction details, and
7//! account deletions. The pipeline integrates multiple data sources and
8//! processing pipes to handle and transform incoming data, while recording
9//! performance metrics for monitoring and analysis.
10//!
11//! # Overview
12//!
13//! This module provides the `Pipeline` struct, which orchestrates data flow
14//! from multiple sources, processes it through designated pipes, and captures
15//! metrics at each stage. The pipeline is highly customizable and can be
16//! configured with various components to suit specific data handling
17//! requirements.
18//!
19//! ## Key Components
20//!
21//! - **Datasources**: Provide raw data updates, which may include account or
22//! transaction details.
23//! - **Account, Block Details, Instruction, and Transaction Pipes**: Modular
24//! units that decode and process specific types of data. Account pipes handle
25//! account updates, instruction pipes process instructions within transactions,
26//! transaction pipes manage complete transaction records, and block details
//! pipes handle block-level metadata.
27//! - **Metrics**: Collects data on pipeline performance, such as processing
28//! times and error rates, providing insights into operational efficiency.
29//!
30//! # Fields and Configuration
31//!
32//! - **datasources**: A list of `Datasource` objects that act as the sources
33//! for account and transaction data.
34//! - **account_pipes**: A collection of pipes for processing account updates.
35//! - **account_deletion_pipes**: Pipes responsible for handling account
36//! deletion events.
//! - **block_details_pipes**: Pipes for handling block details updates (slot,
//! block hash, rewards, and related metadata).
37//! - **instruction_pipes**: Used to process instructions within transactions.
38//! - **transaction_pipes**: For handling full transactions.
39//! - **metrics**: A vector of `Metrics` implementations that gather and report
40//! on performance data.
41//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
42//! Defaults to 5 seconds if unset.
43//!
44//! ## Notes
45//!
46//! - Each pipe and data source must implement the appropriate traits
47//! (`Datasource`, `AccountPipes`, `Metrics`, etc.).
48//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
49//! wrappers ensuring safe, shared access.
50//! - Proper metric collection and flushing are essential for monitoring
51//! pipeline performance, especially in production environments.
52
53use crate::block_details::{BlockDetailsPipe, BlockDetailsPipes};
54use crate::datasource::{BlockDetails, DatasourceId};
55use crate::filter::Filter;
56use {
57 crate::{
58 account::{
59 AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
60 },
61 account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
62 collection::InstructionDecoderCollection,
63 datasource::{AccountDeletion, Datasource, Update},
64 error::CarbonResult,
65 instruction::{
66 InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
67 InstructionsWithMetadata, NestedInstructions,
68 },
69 metrics::{Metrics, MetricsCollection},
70 processor::Processor,
71 schema::TransactionSchema,
72 transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
73 transformers,
74 },
75 core::time,
76 serde::de::DeserializeOwned,
77 std::{convert::TryInto, sync::Arc, time::Instant},
78 tokio_util::sync::CancellationToken,
79};
80
81/// Defines the shutdown behavior for the pipeline.
82///
83/// `ShutdownStrategy` determines how the pipeline will behave when it receives
84/// a shutdown signal. It supports two modes:
85///
86/// - `Immediate`: Stops the entire pipeline, including all tasks, instantly.
87/// - `ProcessPending`: Terminates the data sources, then completes processing
88/// of any updates currently pending in the pipeline. This is the default
89/// behavior.
90///
91/// # Variants
92///
93/// - `Immediate`: Immediately stops all pipeline activity without processing
94/// any remaining updates.
95/// - `ProcessPending`: Gracefully terminates the data sources and allows the
96/// pipeline to finish processing updates that are still in progress or
97/// queued.
98///
99/// # Notes
100///
101/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
102/// that no updates are lost during shutdown.
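///
/// # Example
///
/// A minimal sketch of overriding the default strategy via the builder
/// (`MyDatasource` is an illustrative placeholder):
///
/// ```ignore
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let mut pipeline = Pipeline::builder()
///     .datasource(MyDatasource::new())
///     .shutdown_strategy(ShutdownStrategy::Immediate)
///     .build()?;
/// ```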
103#[derive(Default, PartialEq, Debug)]
104pub enum ShutdownStrategy {
105 /// Stop the whole pipeline immediately.
106 Immediate,
107 /// Terminate the datasource(s) and finish processing all pending updates.
108 #[default]
109 ProcessPending,
110}
111
112/// The default size of the channel buffer for the pipeline.
113///
114/// This constant defines the default number of updates that can be queued in
115/// the pipeline's channel buffer. It is used as a fallback value if the
116/// `channel_buffer_size` is not explicitly set during pipeline construction.
117///
118/// The default size is 1,000 updates, which balances memory usage against throughput.
119pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;
120
121/// Represents the primary data processing pipeline in the `carbon-core`
122/// framework.
123///
124/// The `Pipeline` struct is responsible for orchestrating the flow of data from
125/// various sources, processing it through multiple pipes (for accounts,
126/// transactions, instructions, and account deletions), and recording metrics at
127/// each stage. This flexible design allows for customized data processing,
128/// handling a variety of update types with minimal boilerplate code.
129///
130/// ## Overview
131///
132/// A `Pipeline` instance includes collections of data sources and processing
133/// pipes, enabling users to configure the pipeline to handle diverse types of
134/// blockchain-related data. Each pipe is responsible for decoding, processing,
135/// and routing specific data types, while the metrics system records relevant
136/// statistics.
137///
138/// ### Key Concepts
139///
140/// - **Datasources**: These provide the raw data, such as account updates,
141/// transaction details, and account deletions.
142/// - **Pipes**: Modular units that handle specific data types:
143/// - `AccountPipes` for account updates.
144/// - `AccountDeletionPipes` for account deletions.
145/// - `InstructionPipes` for instruction data within transactions.
146/// - `TransactionPipes` for entire transaction payloads.
/// - `BlockDetailsPipes` for block-level metadata.
147/// - **Metrics**: Collect performance data, enabling real-time insights and
148/// efficient monitoring.
149///
150/// ## Fields
151///
152/// - `datasources`: A vector of data sources (`Datasource` implementations)
153/// that provide the data for processing. Each data source is paired with a
154/// unique `DatasourceId` for identification and filtering purposes. Each data
155/// source must be wrapped in an `Arc` for safe, concurrent access.
156/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
157/// account updates.
158/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
159/// deletion events.
160/// - `block_details_pipes`: A vector of `BlockDetailsPipes` to handle
161/// block details.
162/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
163/// instructions within transactions. These pipes work with nested
164/// instructions and are generically defined to support varied instruction
165/// types.
166/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
167/// processing complete transaction payloads.
168/// - `metrics`: A vector of `Metrics` implementations to record and track
169/// performance data. Each metrics instance is managed within an `Arc` to
170/// ensure thread safety.
171/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
172/// frequently metrics should be flushed. If `None`, the default interval is
173/// used.
174/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
175/// not set, `DEFAULT_CHANNEL_BUFFER_SIZE` (1,000 updates) is used.
176///
177/// ## Example
178///
179/// ```ignore
180/// use std::sync::Arc;
181///
182/// carbon_core::pipeline::Pipeline::builder()
183/// .datasource(transaction_crawler)
184/// .metrics(Arc::new(LogMetrics::new()))
185/// .metrics(Arc::new(PrometheusMetrics::new()))
186/// .instruction(
187/// TestProgramDecoder,
188/// TestProgramProcessor
189/// )
190/// .account(
191/// TestProgramDecoder,
192/// TestProgramAccountProcessor
193/// )
194/// .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
195/// .account_deletions(TestProgramAccountDeletionProcessor)
196/// .channel_buffer_size(1000)
197/// .build()?
198/// .run()
199/// .await?;
200/// ```
201///
202/// ## Notes
203///
204/// - Ensure that each data source and pipe implements the required traits, such
205/// as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
206/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
207/// `Box` types to handle shared ownership and trait object storage.
208/// - The `metrics_flush_interval` controls how frequently the pipeline's
209/// metrics are flushed. If `None`, a default interval of 5 seconds is
210/// used.
211pub struct Pipeline {
212 pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
213 pub account_pipes: Vec<Box<dyn AccountPipes>>,
214 pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
215 pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
216 pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
217 pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
218 pub metrics: Arc<MetricsCollection>,
219 pub metrics_flush_interval: Option<u64>,
220 pub datasource_cancellation_token: Option<CancellationToken>,
221 pub shutdown_strategy: ShutdownStrategy,
222 pub channel_buffer_size: usize,
223}
224
225impl Pipeline {
226 /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
227 ///
228 /// The `builder` method returns a `PipelineBuilder` that allows you to
229 /// configure and customize the pipeline components before building the
230 /// final `Pipeline` object. This approach provides a flexible and
231 /// type-safe way to assemble a pipeline by specifying data sources,
232 /// processing pipes, and metrics.
233 ///
234 /// # Example
235 ///
236 /// ```ignore
237 /// use std::sync::Arc;
238 ///
239 /// carbon_core::pipeline::Pipeline::builder()
240 /// .datasource(transaction_crawler)
241 /// .metrics(Arc::new(LogMetrics::new()))
242 /// .metrics(Arc::new(PrometheusMetrics::new()))
243 /// .instruction(
244 /// TestProgramDecoder,
245 /// TestProgramProcessor
246 /// );
247 /// // ...
248 /// ```
249 ///
250 /// # Returns
251 ///
252 /// Returns a `PipelineBuilder` instance with empty collections for data
253 /// sources, pipes, and metrics. You can then configure each component
254 /// using the builder pattern.
255 pub fn builder() -> PipelineBuilder {
256 log::trace!("Pipeline::builder()");
257 PipelineBuilder {
258 datasources: Vec::new(),
259 account_pipes: Vec::new(),
260 account_deletion_pipes: Vec::new(),
261 block_details_pipes: Vec::new(),
262 instruction_pipes: Vec::new(),
263 transaction_pipes: Vec::new(),
264 metrics: MetricsCollection::default(),
265 metrics_flush_interval: None,
266 datasource_cancellation_token: None,
267 shutdown_strategy: ShutdownStrategy::default(),
268 channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
269 }
270 }
271
272 /// Runs the `Pipeline`, processing updates from data sources and handling
273 /// metrics.
274 ///
275 /// The `run` method initializes the pipeline's metrics system and starts
276/// listening for updates from the configured data sources. Each data source
277/// is spawned on its own task and feeds updates into a bounded channel. The
278/// method then enters a loop where it processes each update received from the
279/// data sources in turn, logging and updating metrics based on the success
280/// or failure of each operation.
282 ///
283 /// # How it Works
284 ///
285 /// - Initializes metrics and sets up an interval for periodic metric
286 /// flushing.
287 /// - Spawns tasks for each data source to continuously consume updates.
288/// - Processes updates according to their type (e.g., Account, Transaction,
289/// AccountDeletion, or BlockDetails).
290 /// - Records performance metrics such as update processing times, and
291 /// tracks success and failure counts.
292 ///
293 /// # Errors
294 ///
295 /// The method returns an `Err` variant if:
296/// - Initializing, flushing, or shutting down the metrics system fails.
297/// - Recording a metric (counter, gauge, or histogram) fails.
298///
299/// Errors from individual data sources or from processing a single update are
300/// logged and counted in metrics, but do not abort the pipeline.
301 ///
302 /// # Example
303 ///
304 /// ```ignore
305 /// use carbon_core::pipeline::Pipeline;
306 ///
307 /// let mut pipeline = Pipeline::builder()
308 /// .datasource(MyDatasource::new())
309 /// .metrics(MyMetrics::new())
310 /// .build()
311 /// .expect("Failed to build pipeline");
312 ///
313 /// // Running the pipeline asynchronously
314 /// tokio::spawn(async move {
315 /// if let Err(e) = pipeline.run().await {
316 /// eprintln!("Pipeline run error: {:?}", e);
317 /// }
318 /// });
319 /// ```
320 ///
321 /// # Notes
322 ///
323 /// - This method is asynchronous and should be awaited within a Tokio
324 /// runtime environment.
325 /// - The pipeline monitors metrics and flushes them based on the configured
326 /// `metrics_flush_interval`.
327 /// - The `run` method operates in an infinite loop, handling updates until
328 /// a termination condition occurs.
329 pub async fn run(&mut self) -> CarbonResult<()> {
330 log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
331 self.datasources.len(),
332 self.metrics.metrics.len(),
333 self.account_pipes.len(),
334 self.account_deletion_pipes.len(),
335 self.instruction_pipes.len(),
336 self.transaction_pipes.len(),
337 );
338
339 log::trace!("run(self)");
340
341 self.metrics.initialize_metrics().await?;
342 let (update_sender, mut update_receiver) =
343 tokio::sync::mpsc::channel::<(Update, DatasourceId)>(self.channel_buffer_size);
344
345 let datasource_cancellation_token = self
346 .datasource_cancellation_token
347 .clone()
348 .unwrap_or_default();
349
350 for datasource in &self.datasources {
351 let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
352 let sender_clone = update_sender.clone();
353 let datasource_clone = Arc::clone(&datasource.1);
354 let datasource_id = datasource.0.clone();
355 let metrics_collection = self.metrics.clone();
356
357 tokio::spawn(async move {
358 if let Err(e) = datasource_clone
359 .consume(
360 datasource_id,
361 sender_clone,
362 datasource_cancellation_token_clone,
363 metrics_collection,
364 )
365 .await
366 {
367 log::error!("error consuming datasource: {:?}", e);
368 }
369 });
370 }
371
372 drop(update_sender);
373
374 let mut interval = tokio::time::interval(time::Duration::from_secs(
375 self.metrics_flush_interval.unwrap_or(5),
376 ));
377
378 loop {
379 tokio::select! {
380 _ = datasource_cancellation_token.cancelled() => {
381 log::trace!("datasource cancellation token cancelled, shutting down.");
382 self.metrics.flush_metrics().await?;
383 self.metrics.shutdown_metrics().await?;
384 break;
385 }
386 _ = tokio::signal::ctrl_c() => {
387 log::trace!("received SIGINT, shutting down.");
388 datasource_cancellation_token.cancel();
389
390 if self.shutdown_strategy == ShutdownStrategy::Immediate {
391 log::info!("shutting down the pipeline immediately.");
392 self.metrics.flush_metrics().await?;
393 self.metrics.shutdown_metrics().await?;
394 break;
395 } else {
396 log::info!("shutting down the pipeline after processing pending updates.");
397 }
398 }
399 _ = interval.tick() => {
400 self.metrics.flush_metrics().await?;
401 }
402 update = update_receiver.recv() => {
403 match update {
404 Some((update, datasource_id)) => {
405 self
406 .metrics.increment_counter("updates_received", 1)
407 .await?;
408
409 let start = Instant::now();
410 let process_result = self.process(update.clone(), datasource_id.clone()).await;
411 let time_taken_nanoseconds = start.elapsed().as_nanos();
412 let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;
413
414 self
415 .metrics
416 .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
417 .await?;
418
419 self
420 .metrics
421 .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
422 .await?;
423
424 match process_result {
425 Ok(_) => {
426 self
427 .metrics.increment_counter("updates_successful", 1)
428 .await?;
429
430 log::trace!("processed update")
431 }
432 Err(error) => {
433 log::error!("error processing update ({:?}): {:?}", update, error);
434 self.metrics.increment_counter("updates_failed", 1).await?;
435 }
436 };
437
438 self
439 .metrics.increment_counter("updates_processed", 1)
440 .await?;
441
442 self
443 .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
444 .await?;
445 }
446 None => {
447 log::info!("update_receiver closed, shutting down.");
448 self.metrics.flush_metrics().await?;
449 self.metrics.shutdown_metrics().await?;
450 break;
451 }
452 }
453 }
454 }
455 }
456
457 log::info!("pipeline shutdown complete.");
458
459 Ok(())
460 }
461
462 /// Processes a single update and routes it through the appropriate pipeline
463 /// stages.
464 ///
465 /// The `process` method takes an `Update` and determines its type, then
466 /// routes it through the corresponding pipes for handling account
467 /// updates, transactions, or account deletions. It also records metrics
468 /// for processed updates, providing insights into the processing
469 /// workload and performance.
470 ///
471 /// ## Functionality
472 ///
473 /// - **Account Updates**: Passes account updates through the
474 /// `account_pipes`. Each pipe processes the account metadata and the
475 /// updated account state.
476 /// - **Transaction Updates**: Extracts transaction metadata and
477 /// instructions, nests them if needed, and routes them through
478 /// `instruction_pipes` and `transaction_pipes`.
479 /// - **Account Deletions**: Sends account deletion events through the
480/// `account_deletion_pipes`.
/// - **Block Details**: Routes block metadata updates through the
/// `block_details_pipes`.
481 ///
482 /// The method also updates metrics counters for each type of update,
483 /// tracking how many updates have been processed in each category.
484 ///
485 /// # Parameters
486 ///
487 /// - `update`: An `Update` variant representing the type of data received.
488/// This can be an `Account`, `Transaction`, `AccountDeletion`, or
489/// `BlockDetails`, each triggering different processing logic within the pipeline.
490 /// - `datasource_id`: The ID of the datasource that produced this update.
491 /// This is used by filters to determine whether the update should be
492 /// processed by specific pipes.
493 ///
494 /// # Returns
495 ///
496 /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
497 /// processing or an error if processing fails at any stage.
498 ///
499 /// # Notes
500 ///
501 /// - This method is asynchronous and should be awaited within a Tokio
502 /// runtime.
503 /// - Each type of update (account, transaction, account deletion) requires
504 /// its own set of pipes, so ensure that appropriate pipes are configured
505 /// based on the data types expected from the data sources.
506 /// - Metrics are recorded after each successful processing stage to track
507 /// processing volumes and identify potential bottlenecks in real-time.
508 /// - Filters are applied to each pipe before processing, allowing for
509 /// selective update processing based on datasource ID and other criteria.
510 ///
511 /// # Errors
512 ///
513 /// Returns an error if any of the pipes fail during processing, or if an
514 /// issue arises while incrementing counters or updating metrics. Handle
515 /// errors gracefully to ensure continuous pipeline operation.
516 async fn process(&mut self, update: Update, datasource_id: DatasourceId) -> CarbonResult<()> {
517 log::trace!(
518 "process(self, update: {:?}, datasource_id: {:?})",
519 update,
520 datasource_id
521 );
522 match update {
523 Update::Account(account_update) => {
524 let account_metadata = AccountMetadata {
525 slot: account_update.slot,
526 pubkey: account_update.pubkey,
527 };
528
529 for pipe in self.account_pipes.iter_mut() {
530 if pipe.filters().iter().all(|filter| {
531 filter.filter_account(
532 &datasource_id,
533 &account_metadata,
534 &account_update.account,
535 )
536 }) {
537 pipe.run(
538 (account_metadata.clone(), account_update.account.clone()),
539 self.metrics.clone(),
540 )
541 .await?;
542 }
543 }
544
545 self.metrics
546 .increment_counter("account_updates_processed", 1)
547 .await?;
548 }
549 Update::Transaction(transaction_update) => {
550 let transaction_metadata = Arc::new((*transaction_update).clone().try_into()?);
551
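 // Extract every instruction (with its metadata) from the transaction,
 // then regroup the flat list into a nested structure so instruction
 // pipes see inner instructions in their parent's context.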
552 let instructions_with_metadata: InstructionsWithMetadata =
553 transformers::extract_instructions_with_metadata(
554 &transaction_metadata,
555 &transaction_update,
556 )?;
557
558 let nested_instructions: NestedInstructions = instructions_with_metadata.into();
559
560 for pipe in self.instruction_pipes.iter_mut() {
561 for nested_instruction in nested_instructions.iter() {
562 if pipe.filters().iter().all(|filter| {
563 filter.filter_instruction(&datasource_id, nested_instruction)
564 }) {
565 pipe.run(nested_instruction, self.metrics.clone()).await?;
566 }
567 }
568 }
569
570 for pipe in self.transaction_pipes.iter_mut() {
571 if pipe.filters().iter().all(|filter| {
572 filter.filter_transaction(
573 &datasource_id,
574 &transaction_metadata,
575 &nested_instructions,
576 )
577 }) {
578 pipe.run(
579 transaction_metadata.clone(),
580 &nested_instructions,
581 self.metrics.clone(),
582 )
583 .await?;
584 }
585 }
586
587 self.metrics
588 .increment_counter("transaction_updates_processed", 1)
589 .await?;
590 }
591 Update::AccountDeletion(account_deletion) => {
592 for pipe in self.account_deletion_pipes.iter_mut() {
593 if pipe.filters().iter().all(|filter| {
594 filter.filter_account_deletion(&datasource_id, &account_deletion)
595 }) {
596 pipe.run(account_deletion.clone(), self.metrics.clone())
597 .await?;
598 }
599 }
600
601 self.metrics
602 .increment_counter("account_deletions_processed", 1)
603 .await?;
604 }
605 Update::BlockDetails(block_details) => {
606 for pipe in self.block_details_pipes.iter_mut() {
607 if pipe
608 .filters()
609 .iter()
610 .all(|filter| filter.filter_block_details(&datasource_id, &block_details))
611 {
612 pipe.run(block_details.clone(), self.metrics.clone())
613 .await?;
614 }
615 }
616
617 self.metrics
618 .increment_counter("block_details_processed", 1)
619 .await?;
620 }
621 };
622
623 Ok(())
624 }
625}
626
627/// A builder for constructing a `Pipeline` instance with customized data
628/// sources, processing pipes, and metrics.
629///
630/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
631/// by allowing configuration of its components, such as data sources, account
632/// and transaction pipes, deletion handling, and metrics. Using the builder
633/// pattern, you can add the desired elements incrementally and then finalize
634/// with a call to `build`.
635///
636/// ## Overview
637///
638/// The `PipelineBuilder` supports the following components:
639/// - **Datasources**: Sources of data updates, such as account information and
640/// transactions.
641/// - **Account Pipes**: For processing account updates from data sources.
642/// - **Account Deletion Pipes**: For handling account deletion updates.
/// - **Block Details Pipes**: For handling block details updates.
643/// - **Instruction Pipes**: For handling instructions associated with
644/// transactions.
645/// - **Transaction Pipes**: For handling full transaction data.
646/// - **Metrics**: Collects and reports performance data, such as update
647/// processing times.
648/// - **Metrics Flush Interval**: Optional interval defining how often to flush
649/// metrics data.
650///
651/// Each component can be added through method chaining, enhancing code
652/// readability and maintainability.
653///
654/// # Example
655///
656/// ```ignore
657/// use std::sync::Arc;
658///
659/// carbon_core::pipeline::Pipeline::builder()
660/// .datasource(transaction_crawler)
661/// .metrics(Arc::new(LogMetrics::new()))
662/// .metrics(Arc::new(PrometheusMetrics::new()))
663/// .instruction(
664/// TestProgramDecoder,
665/// TestProgramProcessor
666/// );
667/// // ...
668/// ```
669///
670/// # Fields
671///
672/// - `datasources`: A collection of `Datasource` objects, each paired with a
673/// `DatasourceId` and wrapped in `Arc` for shared ownership across threads.
674/// Each `Datasource` provides updates to the pipeline.
675/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
676/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
677/// processing account deletions.
/// - `block_details_pipes`: A collection of `BlockDetailsPipes` for processing
/// block details updates.
678/// - `instruction_pipes`: A collection of `InstructionPipes` to process
679/// instructions in transactions.
680/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
681/// transaction data.
682/// - `metrics`: A vector of `Metrics` implementations for tracking pipeline
683/// performance.
684/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
685/// metrics data. If not set, a default flush interval will be used.
686/// - `datasource_cancellation_token`: An optional `CancellationToken` for
687/// cancelling the datasources. If not set, a default `CancellationToken` will be
688/// used.
689/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
690/// not set, `DEFAULT_CHANNEL_BUFFER_SIZE` (1,000 updates) is used.
691///
692/// # Returns
693///
694/// After configuring the builder, call `build` to create a `Pipeline` instance.
695/// The builder will return a `CarbonResult<Pipeline>`, which will either
696/// contain the configured pipeline or an error if configuration failed.
697///
698/// # Notes
699///
700/// - The builder pattern allows for method chaining, making it easy to
701/// incrementally add components to the `Pipeline`.
702/// - Ensure that each component matches the data and update types expected by
703/// your application.
704#[derive(Default)]
705pub struct PipelineBuilder {
706 pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
707 pub account_pipes: Vec<Box<dyn AccountPipes>>,
708 pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
709 pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
710 pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
711 pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
712 pub metrics: MetricsCollection,
713 pub metrics_flush_interval: Option<u64>,
714 pub datasource_cancellation_token: Option<CancellationToken>,
715 pub shutdown_strategy: ShutdownStrategy,
716 pub channel_buffer_size: usize,
717}
718
719impl PipelineBuilder {
720 /// Creates a new `PipelineBuilder` with empty collections for datasources,
721 /// pipes, and metrics.
722 ///
723 /// This method initializes a `PipelineBuilder` instance, allowing you to
724 /// configure each component of a `Pipeline` before building it. The
725 /// builder pattern offers flexibility in adding data sources, account
726 /// and transaction handling pipes, deletion processing, and metrics
727 /// collection features.
728 ///
729 /// # Example
730 ///
731 /// ```rust
732 /// use carbon_core::pipeline::PipelineBuilder;
733 ///
734 /// let builder = PipelineBuilder::new();
735 /// ```
736 pub fn new() -> Self {
737 log::trace!("PipelineBuilder::new()");
738 Self::default()
739 }
740
741 /// Adds a datasource to the pipeline.
742 ///
743 /// The datasource is responsible for providing updates, such as account and
744 /// transaction data, to the pipeline. Multiple datasources can be added
745 /// to handle various types of updates.
746 ///
747 /// # Parameters
748 ///
749 /// - `datasource`: The data source to add, implementing the `Datasource`
750 /// trait.
751 ///
752 /// # Example
753 ///
754 /// ```ignore
755 /// use carbon_core::pipeline::PipelineBuilder;
756 ///
757 /// let builder = PipelineBuilder::new()
758 /// .datasource(MyDatasource::new());
759 /// ```
760 pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
761 log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
762 self.datasources
763 .push((DatasourceId::new_unique(), Arc::new(datasource)));
764 self
765 }
766
767 /// Adds a datasource to the pipeline with a specific ID.
768 ///
769 /// This method allows you to assign a custom ID to a datasource, which is
770 /// useful for filtering updates based on their source. The ID can be used
771 /// with filters to selectively process updates from specific datasources.
772 ///
773 /// # Parameters
774 ///
775 /// - `datasource`: The data source to add, implementing the `Datasource`
776 /// trait
777 /// - `id`: The `DatasourceId` to assign to this datasource
778 ///
779 /// # Example
780 ///
781 /// ```ignore
782 /// use carbon_core::{pipeline::PipelineBuilder, datasource::DatasourceId};
783 ///
784 /// let mainnet_id = DatasourceId::new_named("mainnet");
785 /// let builder = PipelineBuilder::new()
786/// .datasource_with_id(MyDatasource::new(), mainnet_id);
787 /// ```
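    ///
    /// The assigned ID can then be paired with a `DatasourceFilter` so a pipe only
    /// receives updates from that source (a sketch; the decoder and processor
    /// names are illustrative):
    ///
    /// ```ignore
    /// use carbon_core::{
    ///     pipeline::PipelineBuilder,
    ///     datasource::DatasourceId,
    ///     filter::DatasourceFilter,
    /// };
    ///
    /// let mainnet_id = DatasourceId::new_named("mainnet");
    /// let filters = vec![Box::new(DatasourceFilter::new(mainnet_id.clone()))
    ///     as Box<dyn carbon_core::filter::Filter>];
    ///
    /// let builder = PipelineBuilder::new()
    ///     .datasource_with_id(MyDatasource::new(), mainnet_id)
    ///     .account_with_filters(MyAccountDecoder, MyAccountProcessor, filters);
    /// ```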
788 pub fn datasource_with_id(
789 mut self,
790 datasource: impl Datasource + 'static,
791 id: DatasourceId,
792 ) -> Self {
793 log::trace!(
794 "datasource_with_id(self, id: {:?}, datasource: {:?})",
795 id,
796 stringify!(datasource)
797 );
798 self.datasources.push((id, Arc::new(datasource)));
799 self
800 }
801
802 /// Sets the shutdown strategy for the pipeline.
803 ///
804 /// This method configures how the pipeline should handle shutdowns. The
805 /// shutdown strategy defines whether the pipeline should terminate
806 /// immediately or continue processing pending updates after terminating
807 /// the data sources.
808 ///
809 /// # Parameters
810 ///
811 /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
812 /// how the pipeline should handle shutdowns.
813 ///
814 /// # Returns
815 ///
816 /// Returns `Self`, allowing for method chaining.
817 ///
818 /// # Notes
819 ///
820 /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
821 /// instantly, including all active processing tasks.
822 /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
823 /// sources first and allow the pipeline to finish processing any updates
824 /// that are still pending.
825 pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
826 log::trace!(
827 "shutdown_strategy(self, shutdown_strategy: {:?})",
828 shutdown_strategy
829 );
830 self.shutdown_strategy = shutdown_strategy;
831 self
832 }
833
834 /// Adds an account pipe to process account updates.
835 ///
836 /// Account pipes decode and process updates to accounts within the
837 /// pipeline. This method requires both an `AccountDecoder` and a
838 /// `Processor` to handle decoded account data.
839 ///
840 /// # Parameters
841 ///
842 /// - `decoder`: An `AccountDecoder` that decodes the account data.
843 /// - `processor`: A `Processor` that processes the decoded account data.
844 ///
845 /// # Example
846 ///
847 /// ```ignore
848 /// use carbon_core::pipeline::PipelineBuilder;
849 ///
850 /// let builder = PipelineBuilder::new()
851 /// .account(MyAccountDecoder, MyAccountProcessor);
852 /// ```
853 pub fn account<T: Send + Sync + 'static>(
854 mut self,
855 decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
856 processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
857 ) -> Self {
858 log::trace!(
859 "account(self, decoder: {:?}, processor: {:?})",
860 stringify!(decoder),
861 stringify!(processor)
862 );
863 self.account_pipes.push(Box::new(AccountPipe {
864 decoder: Box::new(decoder),
865 processor: Box::new(processor),
866 filters: vec![],
867 }));
868 self
869 }
870
871 /// Adds an account pipe with filters to process account updates selectively.
872 ///
873 /// This method creates an account pipe that only processes updates that pass
874 /// all the specified filters. Filters can be used to selectively process
875 /// updates based on criteria such as datasource ID, account properties, or
876 /// other custom logic.
877 ///
878 /// # Parameters
879 ///
880 /// - `decoder`: An `AccountDecoder` that decodes the account data
881 /// - `processor`: A `Processor` that processes the decoded account data
882 /// - `filters`: A collection of filters that determine which account updates
883 /// should be processed
884 ///
885 /// # Example
886 ///
887 /// ```ignore
888 /// use carbon_core::{
889 /// pipeline::PipelineBuilder,
890 /// datasource::DatasourceId,
891 /// filter::DatasourceFilter,
892 /// };
893 ///
894 /// let mainnet_id = DatasourceId::new_named("mainnet");
895 /// let filter = DatasourceFilter::new(mainnet_id);
896 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
897 ///
898 /// let builder = PipelineBuilder::new()
899 /// .account_with_filters(MyAccountDecoder, MyAccountProcessor, filters);
900 /// ```
901 pub fn account_with_filters<T: Send + Sync + 'static>(
902 mut self,
903 decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
904 processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
905 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
906 ) -> Self {
907 log::trace!(
908 "account_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
909 stringify!(decoder),
910 stringify!(processor),
911 stringify!(filters)
912 );
913 self.account_pipes.push(Box::new(AccountPipe {
914 decoder: Box::new(decoder),
915 processor: Box::new(processor),
916 filters,
917 }));
918 self
919 }
920
921 /// Adds an account deletion pipe to handle account deletion events.
922 ///
923 /// Account deletion pipes process deletions of accounts, with a `Processor`
924 /// to handle the deletion events as they occur.
925 ///
926 /// # Parameters
927 ///
928 /// - `processor`: A `Processor` that processes account deletion events.
929 ///
930 /// # Example
931 ///
932 /// ```ignore
933 /// use carbon_core::pipeline::PipelineBuilder;
934 ///
935 /// let builder = PipelineBuilder::new()
936 /// .account_deletions(MyAccountDeletionProcessor);
937 /// ```
938 pub fn account_deletions(
939 mut self,
940 processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
941 ) -> Self {
942 log::trace!(
943 "account_deletions(self, processor: {:?})",
944 stringify!(processor)
945 );
946 self.account_deletion_pipes
947 .push(Box::new(AccountDeletionPipe {
948 processor: Box::new(processor),
949 filters: vec![],
950 }));
951 self
952 }
953
954 /// Adds an account deletion pipe with filters to handle account deletion events selectively.
955 ///
956 /// This method creates an account deletion pipe that only processes deletion
957 /// events that pass all the specified filters. Filters can be used to
958 /// selectively process deletions based on criteria such as datasource ID or
959 /// other custom logic.
960 ///
961 /// # Parameters
962 ///
963 /// - `processor`: A `Processor` that processes account deletion events
964 /// - `filters`: A collection of filters that determine which account deletion
965 /// events should be processed
966 ///
967 /// # Example
968 ///
969 /// ```ignore
970 /// use carbon_core::{
971 /// pipeline::PipelineBuilder,
972 /// datasource::DatasourceId,
973 /// filter::DatasourceFilter,
974 /// };
975 ///
976 /// let mainnet_id = DatasourceId::new_named("mainnet");
977 /// let filter = DatasourceFilter::new(mainnet_id);
978 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
979 ///
980 /// let builder = PipelineBuilder::new()
981 /// .account_deletions_with_filters(MyAccountDeletionProcessor, filters);
982 /// ```
983 pub fn account_deletions_with_filters(
984 mut self,
985 processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
986 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
987 ) -> Self {
988 log::trace!(
989 "account_deletions_with_filters(self, processor: {:?}, filters: {:?})",
990 stringify!(processor),
991 stringify!(filters)
992 );
993 self.account_deletion_pipes
994 .push(Box::new(AccountDeletionPipe {
995 processor: Box::new(processor),
996 filters,
997 }));
998 self
999 }
1000
1001 /// Adds a block details pipe to handle block details updates.
1002 ///
1003 /// Block details pipes process updates related to block metadata, such as
1004 /// slot, block hash, and rewards, with a `Processor` to handle the updates.
1005 ///
1006 /// # Parameters
1007 ///
1008 /// - `processor`: A `Processor` that processes block details updates.
1009 ///
1010 /// # Example
1011 ///
1012 /// ```ignore
1013 /// use carbon_core::pipeline::PipelineBuilder;
1014 ///
1015 /// let builder = PipelineBuilder::new()
1016 /// .block_details(MyBlockDetailsProcessor);
1017 /// ```
1018 pub fn block_details(
1019 mut self,
1020 processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
1021 ) -> Self {
1022 log::trace!(
1023 "block_details(self, processor: {:?})",
1024 stringify!(processor)
1025 );
1026 self.block_details_pipes.push(Box::new(BlockDetailsPipe {
1027 processor: Box::new(processor),
1028 filters: vec![],
1029 }));
1030 self
1031 }
1032
1033 /// Adds a block details pipe with filters to handle block details updates selectively.
1034 ///
1035 /// This method creates a block details pipe that only processes updates that
1036 /// pass all the specified filters. Filters can be used to selectively process
1037 /// block details updates based on criteria such as datasource ID, block height,
1038 /// or other custom logic.
1039 ///
1040 /// # Parameters
1041 ///
1042 /// - `processor`: A `Processor` that processes block details updates
1043 /// - `filters`: A collection of filters that determine which block details
1044 /// updates should be processed
1045 ///
1046 /// # Example
1047 ///
1048 /// ```ignore
1049 /// use carbon_core::{
1050 /// pipeline::PipelineBuilder,
1051 /// datasource::DatasourceId,
1052 /// filter::DatasourceFilter,
1053 /// };
1054 ///
1055 /// let mainnet_id = DatasourceId::new_named("mainnet");
1056 /// let filter = DatasourceFilter::new(mainnet_id);
1057 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
1058 ///
1059 /// let builder = PipelineBuilder::new()
1060 /// .block_details_with_filters(MyBlockDetailsProcessor, filters);
1061 /// ```
1062 pub fn block_details_with_filters(
1063 mut self,
1064 processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
1065 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
1066 ) -> Self {
1067 log::trace!(
1068 "block_details_with_filters(self, processor: {:?}, filters: {:?})",
1069 stringify!(processor),
1070 stringify!(filters)
1071 );
1072 self.block_details_pipes.push(Box::new(BlockDetailsPipe {
1073 processor: Box::new(processor),
1074 filters,
1075 }));
1076 self
1077 }
1078
1079 /// Adds an instruction pipe to process instructions within transactions.
1080 ///
1081 /// Instruction pipes decode and process individual instructions,
1082 /// enabling specialized handling of various instruction types.
1083 ///
1084 /// # Parameters
1085 ///
1086 /// - `decoder`: An `InstructionDecoder` for decoding instructions from
1087 /// transaction data.
1088 /// - `processor`: A `Processor` that processes decoded instruction data.
1089 ///
1090 /// # Example
1091 ///
1092 /// ```ignore
1093 /// use carbon_core::pipeline::PipelineBuilder;
1094 ///
1095 /// let builder = PipelineBuilder::new()
1096 /// .instruction(MyDecoder, MyInstructionProcessor);
1097 /// ```
1098 pub fn instruction<T: Send + Sync + 'static>(
1099 mut self,
1100 decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
1101 processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
1102 ) -> Self {
1103 log::trace!(
1104 "instruction(self, decoder: {:?}, processor: {:?})",
1105 stringify!(decoder),
1106 stringify!(processor)
1107 );
1108 self.instruction_pipes.push(Box::new(InstructionPipe {
1109 decoder: Box::new(decoder),
1110 processor: Box::new(processor),
1111 filters: vec![],
1112 }));
1113 self
1114 }
1115
1116 /// Adds an instruction pipe with filters to process instructions selectively.
1117 ///
1118 /// This method creates an instruction pipe that only processes instructions
1119 /// that pass all the specified filters. Filters can be used to selectively
1120 /// process instructions based on criteria such as datasource ID, instruction
1121 /// type, or other custom logic.
1122 ///
1123 /// # Parameters
1124 ///
1125 /// - `decoder`: An `InstructionDecoder` for decoding instructions from
1126 /// transaction data
1127 /// - `processor`: A `Processor` that processes decoded instruction data
1128 /// - `filters`: A collection of filters that determine which instructions
1129 /// should be processed
1130 ///
1131 /// # Example
1132 ///
1133 /// ```ignore
1134 /// use carbon_core::{
1135 /// pipeline::PipelineBuilder,
1136 /// datasource::DatasourceId,
1137 /// filter::DatasourceFilter,
1138 /// };
1139 ///
1140 /// let mainnet_id = DatasourceId::new_named("mainnet");
1141 /// let filter = DatasourceFilter::new(mainnet_id);
1142 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
1143 ///
1144 /// let builder = PipelineBuilder::new()
1145 /// .instruction_with_filters(MyDecoder, MyInstructionProcessor, filters);
1146 /// ```
1147 pub fn instruction_with_filters<T: Send + Sync + 'static>(
1148 mut self,
1149 decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
1150 processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
1151 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
1152 ) -> Self {
1153 log::trace!(
1154 "instruction_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
1155 stringify!(decoder),
1156 stringify!(processor),
1157 stringify!(filters)
1158 );
1159 self.instruction_pipes.push(Box::new(InstructionPipe {
1160 decoder: Box::new(decoder),
1161 processor: Box::new(processor),
1162 filters,
1163 }));
1164 self
1165 }
1166
1167 /// Adds a transaction pipe for processing full transaction data.
1168 ///
1169 /// This method requires a transaction schema for decoding and a `Processor`
1170 /// to handle the processed transaction data.
1171 ///
1172 /// # Parameters
1173 ///
1174/// - `processor`: A `Processor` that processes the decoded transaction
1175/// data.
1176/// - `schema`: An optional `TransactionSchema` used to match and interpret
1177/// transaction data.
1178 ///
1179 /// # Example
1180 ///
1181 /// ```ignore
1182 /// use carbon_core::pipeline::PipelineBuilder;
1183 ///
1184 /// let builder = PipelineBuilder::new()
1185/// .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
1186 /// ```
1187 pub fn transaction<T, U>(
1188 mut self,
1189 processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
1190 + Send
1191 + Sync
1192 + 'static,
1193 schema: Option<TransactionSchema<T>>,
1194 ) -> Self
1195 where
1196 T: InstructionDecoderCollection + 'static,
1197 U: DeserializeOwned + Send + Sync + 'static,
1198 {
1199 log::trace!(
1200 "transaction(self, schema: {:?}, processor: {:?})",
1201 stringify!(schema),
1202 stringify!(processor)
1203 );
1204 self.transaction_pipes
1205 .push(Box::new(TransactionPipe::<T, U>::new(
1206 schema,
1207 processor,
1208 vec![],
1209 )));
1210 self
1211 }
1212
1213 /// Adds a transaction pipe with filters for processing full transaction data selectively.
1214 ///
1215 /// This method creates a transaction pipe that only processes transactions
1216 /// that pass all the specified filters. Filters can be used to selectively
1217 /// process transactions based on criteria such as datasource ID, transaction
1218 /// type, or other custom logic.
1219 ///
1220 /// # Parameters
1221 ///
1222 /// - `processor`: A `Processor` that processes the decoded transaction data
1223/// - `schema`: An optional `TransactionSchema` used to match and interpret
1224 /// transaction data
1225 /// - `filters`: A collection of filters that determine which transactions
1226 /// should be processed
1227 ///
1228 /// # Example
1229 ///
1230 /// ```ignore
1231 /// use carbon_core::{
1232 /// pipeline::PipelineBuilder,
1233 /// datasource::DatasourceId,
1234 /// filter::DatasourceFilter,
1235 /// };
1236 ///
1237 /// let mainnet_id = DatasourceId::new_named("mainnet");
1238 /// let filter = DatasourceFilter::new(mainnet_id);
1239 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
1240 ///
1241 /// let builder = PipelineBuilder::new()
1242/// .transaction_with_filters(MyTransactionProcessor, Some(MY_SCHEMA.clone()), filters);
1243 /// ```
1244 pub fn transaction_with_filters<T, U>(
1245 mut self,
1246 processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
1247 + Send
1248 + Sync
1249 + 'static,
1250 schema: Option<TransactionSchema<T>>,
1251 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
1252 ) -> Self
1253 where
1254 T: InstructionDecoderCollection + 'static,
1255 U: DeserializeOwned + Send + Sync + 'static,
1256 {
1257 log::trace!(
1258 "transaction_with_filters(self, schema: {:?}, processor: {:?}, filters: {:?})",
1259 stringify!(schema),
1260 stringify!(processor),
1261 stringify!(filters)
1262 );
1263 self.transaction_pipes
1264 .push(Box::new(TransactionPipe::<T, U>::new(
1265 schema, processor, filters,
1266 )));
1267 self
1268 }
1269
1270 /// Adds a metrics component to the pipeline for performance tracking.
1271 ///
1272 /// This component collects and reports on pipeline metrics, providing
1273 /// insights into performance and operational statistics.
1274 ///
1275 /// # Parameters
1276 ///
1277 /// - `metrics`: An instance of a `Metrics` implementation, used to gather
1278 /// and report metrics.
1279 ///
1280 /// # Example
1281 ///
1282 /// ```ignore
1283 /// use std::sync::Arc;
1284 /// use carbon_core::pipeline::PipelineBuilder;
1285 ///
1286 /// let builder = PipelineBuilder::new()
1287 /// .metrics(Arc::new(LogMetrics::new()));
1288 /// ```
1289 pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
1290 log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
1291 self.metrics.metrics.push(metrics);
1292 self
1293 }
1294
1295 /// Sets the interval for flushing metrics data.
1296 ///
1297 /// This value defines the frequency, in seconds, at which metrics data is
1298/// flushed from memory. If not set, a default interval of 5 seconds is used.
1299 ///
1300 /// # Parameters
1301 ///
1302 /// - `interval`: The flush interval for metrics, in seconds.
1303 ///
1304 /// # Example
1305 ///
1306 /// ```rust
1307 /// use carbon_core::pipeline::PipelineBuilder;
1308 ///
1309 /// let builder = PipelineBuilder::new()
1310 /// .metrics_flush_interval(60);
1311 /// ```
1312 pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
1313 log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
1314 self.metrics_flush_interval = Some(interval);
1315 self
1316 }
1317
1318 /// Sets the cancellation token used to cancel the datasources on demand.
1319 ///
1320 /// Cancelling this token stops the datasources and triggers pipeline shutdown.
1321 /// If not set, a default `CancellationToken` is used.
1322 ///
1323 /// # Parameters
1324 ///
1325 /// - `cancellation_token`: An instance of `CancellationToken`.
1326 ///
1327 /// # Example
1328 ///
1329 /// ```rust
1330 /// use carbon_core::pipeline::PipelineBuilder;
1331 /// use tokio_util::sync::CancellationToken;
1332 ///
1333 /// let builder = PipelineBuilder::new()
1334 /// .datasource_cancellation_token(CancellationToken::new());
1335 /// ```
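    ///
    /// To stop the datasources from another task, keep a clone of the token and
    /// cancel it later (a sketch; `MyDatasource` is an illustrative placeholder):
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// let token = CancellationToken::new();
    /// let mut pipeline = carbon_core::pipeline::Pipeline::builder()
    ///     .datasource(MyDatasource::new())
    ///     .datasource_cancellation_token(token.clone())
    ///     .build()?;
    ///
    /// tokio::spawn(async move {
    ///     // Cancel on some external condition.
    ///     token.cancel();
    /// });
    ///
    /// pipeline.run().await?;
    /// ```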
1336 pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
1337 log::trace!(
1338 "datasource_cancellation_token(self, cancellation_token: {:?})",
1339 cancellation_token
1340 );
1341 self.datasource_cancellation_token = Some(cancellation_token);
1342 self
1343 }
1344
1345 /// Sets the size of the channel buffer for the pipeline.
1346 ///
1347 /// This value defines the maximum number of updates that can be queued in
1348 /// the pipeline's channel buffer. If not set, `DEFAULT_CHANNEL_BUFFER_SIZE`
1349 /// (1,000 updates) will be used.
1350 ///
1351 /// # Parameters
1352 ///
1353 /// - `size`: The size of the channel buffer for the pipeline.
1354 ///
1355 /// # Example
1356 ///
1357 /// ```rust
1358 /// use carbon_core::pipeline::PipelineBuilder;
1359 ///
1360 /// let builder = PipelineBuilder::new()
1361 /// .channel_buffer_size(1000);
1362 /// ```
1363 pub fn channel_buffer_size(mut self, size: usize) -> Self {
1364 log::trace!("channel_buffer_size(self, size: {:?})", size);
1365 self.channel_buffer_size = size;
1366 self
1367 }
1368
1369 /// Builds and returns a `Pipeline` configured with the specified
1370 /// components.
1371 ///
1372 /// After configuring the `PipelineBuilder` with data sources, pipes, and
1373 /// metrics, call this method to create the final `Pipeline` instance
1374 /// ready for operation.
1375 ///
1376 /// # Returns
1377 ///
1378 /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
1379 /// or an error if any part of the configuration is invalid.
1380 ///
1381 /// # Example
1382 ///
1383 /// ```ignore
1384 /// use std::sync::Arc;
1385 ///
1386 /// carbon_core::pipeline::Pipeline::builder()
1387 /// .datasource(transaction_crawler)
1388 /// .metrics(Arc::new(LogMetrics::new()))
1389 /// .metrics(Arc::new(PrometheusMetrics::new()))
1390 /// .instruction(
1391 /// TestProgramDecoder,
1392 /// TestProgramProcessor
1393 /// )
1394 /// .account(
1395 /// TestProgramDecoder,
1396 /// TestProgramAccountProcessor
1397 /// )
1398/// .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
1399 /// .account_deletions(TestProgramAccountDeletionProcessor)
1400 /// .channel_buffer_size(1000)
1401 /// .build()?;
1402 ///
1403 /// Ok(())
1404 /// ```
1405 pub fn build(self) -> CarbonResult<Pipeline> {
1406 log::trace!("build(self)");
1407 Ok(Pipeline {
1408 datasources: self.datasources,
1409 account_pipes: self.account_pipes,
1410 account_deletion_pipes: self.account_deletion_pipes,
1411 block_details_pipes: self.block_details_pipes,
1412 instruction_pipes: self.instruction_pipes,
1413 transaction_pipes: self.transaction_pipes,
1414 shutdown_strategy: self.shutdown_strategy,
1415 metrics: Arc::new(self.metrics),
1416 metrics_flush_interval: self.metrics_flush_interval,
1417 datasource_cancellation_token: self.datasource_cancellation_token,
 // Fall back to the documented default if the buffer size was never set
 // (e.g. when the builder came from `PipelineBuilder::default()`), since
 // `tokio::sync::mpsc::channel` panics on a zero-sized buffer.
 1418 channel_buffer_size: if self.channel_buffer_size == 0 {
 DEFAULT_CHANNEL_BUFFER_SIZE
 } else {
 self.channel_buffer_size
 },
1419 })
1420 }
1421}