carbon_core/pipeline.rs
1//! Defines the `Pipeline` struct and related components for processing
2//! blockchain data updates.
3//!
4//! The `Pipeline` module is central to the `carbon-core` framework, offering a
5//! flexible and extensible data processing architecture that supports various
6//! blockchain data types, including account updates, transaction details, and
7//! account deletions. The pipeline integrates multiple data sources and
8//! processing pipes to handle and transform incoming data, while recording
9//! performance metrics for monitoring and analysis.
10//!
11//! # Overview
12//!
13//! This module provides the `Pipeline` struct, which orchestrates data flow
14//! from multiple sources, processes it through designated pipes, and captures
15//! metrics at each stage. The pipeline is highly customizable and can be
16//! configured with various components to suit specific data handling
17//! requirements.
18//!
19//! ## Key Components
20//!
21//! - **Datasources**: Provide raw data updates, which may include account or
22//! transaction details.
23//! - **Account, Instruction, and Transaction Pipes**: Modular units that decode
24//! and process specific types of data. Account pipes handle account updates,
25//! instruction pipes process instructions within transactions, and
26//! transaction pipes manage complete transaction records.
27//! - **Metrics**: Collects data on pipeline performance, such as processing
28//! times and error rates, providing insights into operational efficiency.
29//!
30//! # Fields and Configuration
31//!
32//! - **datasources**: A list of `Datasource` objects that act as the sources
33//! for account and transaction data.
34//! - **account_pipes**: A collection of pipes for processing account updates.
35//! - **account_deletion_pipes**: Pipes responsible for handling account
36//! deletion events.
//! - **block_details_pipes**: Pipes for handling block details updates, such
//! as slot, block hash, and rewards.
37//! - **instruction_pipes**: Used to process instructions within transactions.
38//! - **transaction_pipes**: For handling full transactions.
39//! - **metrics**: A vector of `Metrics` implementations that gather and report
40//! on performance data.
41//! - **metrics_flush_interval**: Specifies how frequently metrics are flushed.
42//! Defaults to 5 seconds if unset.
43//!
44//! ## Notes
45//!
46//! - Each pipe and data source must implement the appropriate traits
47//! (`Datasource`, `AccountPipes`, `Metrics`, etc.).
48//! - The `Pipeline` is designed for concurrent operation, with `Arc` and `Box`
49//! wrappers ensuring safe, shared access.
50//! - Proper metric collection and flushing are essential for monitoring
51//! pipeline performance, especially in production environments.
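//!
//! # Example
//!
//! A minimal sketch of assembling and running a pipeline; the datasource,
//! decoder, and processor names are illustrative placeholders, not items
//! provided by this crate:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! let mut pipeline = carbon_core::pipeline::Pipeline::builder()
//!     .datasource(my_datasource)
//!     .metrics(Arc::new(LogMetrics::new()))
//!     .instruction(MyProgramDecoder, MyProgramInstructionProcessor)
//!     .build()?;
//!
//! pipeline.run().await?;
//! ```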
52
53use crate::block_details::{BlockDetailsPipe, BlockDetailsPipes};
54use crate::datasource::{BlockDetails, DatasourceId};
55use crate::filter::Filter;
56use {
57 crate::{
58 account::{
59 AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
60 },
61 account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
62 collection::InstructionDecoderCollection,
63 datasource::{AccountDeletion, Datasource, Update},
64 error::CarbonResult,
65 instruction::{
66 InstructionDecoder, InstructionPipe, InstructionPipes, InstructionProcessorInputType,
67 InstructionsWithMetadata, NestedInstructions,
68 },
69 metrics::{Metrics, MetricsCollection},
70 processor::Processor,
71 schema::TransactionSchema,
72 transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
73 transformers,
74 },
75 core::time,
76 serde::de::DeserializeOwned,
77 std::{convert::TryInto, sync::Arc, time::Instant},
78 tokio_util::sync::CancellationToken,
79};
80
81/// Defines the shutdown behavior for the pipeline.
82///
83/// `ShutdownStrategy` determines how the pipeline will behave when it receives
84/// a shutdown signal. It supports the two modes described below.
90///
91/// # Variants
92///
93/// - `Immediate`: Immediately stops all pipeline activity without processing
94/// any remaining updates.
95/// - `ProcessPending`: Gracefully terminates the data sources and allows the
96/// pipeline to finish processing updates that are still in progress or
97/// queued.
98///
99/// # Notes
100///
101/// - `ProcessPending` is the default variant, enabling the pipeline to ensure
102/// that no updates are lost during shutdown.
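///
/// # Example
///
/// A minimal sketch of selecting a strategy on the builder; `my_datasource`
/// is an illustrative placeholder:
///
/// ```ignore
/// use carbon_core::pipeline::{Pipeline, ShutdownStrategy};
///
/// let pipeline = Pipeline::builder()
///     .datasource(my_datasource)
///     .shutdown_strategy(ShutdownStrategy::Immediate)
///     .build()?;
/// ```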
103#[derive(Default, PartialEq, Debug)]
104pub enum ShutdownStrategy {
105 /// Stop the whole pipeline immediately.
106 Immediate,
107 /// Terminate the datasource(s) and finish processing all pending updates.
108 #[default]
109 ProcessPending,
110}
111
112/// The default size of the channel buffer for the pipeline.
113///
114/// This constant defines the default number of updates that can be queued in
115/// the pipeline's channel buffer. It is used as a fallback value if the
116/// `channel_buffer_size` is not explicitly set during pipeline construction.
117///
118/// The default size is 1,000 updates, which provides a reasonable balance
/// between memory usage and throughput for most workloads.
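///
/// # Example
///
/// A small sketch of where this default applies; `my_datasource` is an
/// illustrative placeholder:
///
/// ```ignore
/// // No explicit channel_buffer_size, so DEFAULT_CHANNEL_BUFFER_SIZE is used.
/// let pipeline = carbon_core::pipeline::Pipeline::builder()
///     .datasource(my_datasource)
///     .build()?;
/// ```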
119pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;
120
121/// Represents the primary data processing pipeline in the `carbon-core`
122/// framework.
123///
124/// The `Pipeline` struct is responsible for orchestrating the flow of data from
125/// various sources, processing it through multiple pipes (for accounts,
126/// transactions, instructions, and account deletions), and recording metrics at
127/// each stage. This flexible design allows for customized data processing,
128/// handling a variety of update types with minimal boilerplate code.
129///
130/// ## Overview
131///
132/// A `Pipeline` instance includes collections of data sources and processing
133/// pipes, enabling users to configure the pipeline to handle diverse types of
134/// blockchain-related data. Each pipe is responsible for decoding, processing,
135/// and routing specific data types, while the metrics system records relevant
136/// statistics.
137///
138/// ### Key Concepts
139///
140/// - **Datasources**: These provide the raw data, such as account updates,
141/// transaction details, and account deletions.
142/// - **Pipes**: Modular units that handle specific data types:
143/// - `AccountPipes` for account updates.
144/// - `AccountDeletionPipes` for account deletions.
/// - `BlockDetailsPipes` for block details updates.
145/// - `InstructionPipes` for instruction data within transactions.
146/// - `TransactionPipes` for entire transaction payloads.
147/// - **Metrics**: Collect performance data, enabling real-time insights and
148/// efficient monitoring.
149///
150/// ## Fields
151///
152/// - `datasources`: A vector of data sources (`Datasource` implementations)
153/// that provide the data for processing. Each data source is paired with a
154/// unique `DatasourceId` for identification and filtering purposes. Each data
155/// source must be wrapped in an `Arc` for safe, concurrent access.
156/// - `account_pipes`: A vector of `AccountPipes`, each responsible for handling
157/// account updates.
158/// - `account_deletion_pipes`: A vector of `AccountDeletionPipes` to handle
159/// deletion events.
160/// - `block_details_pipes`: A vector of `BlockDetailsPipes` to handle
161/// block details.
162/// - `instruction_pipes`: A vector of `InstructionPipes` for processing
163/// instructions within transactions. These pipes work with nested
164/// instructions and are generically defined to support varied instruction
165/// types.
166/// - `transaction_pipes`: A vector of `TransactionPipes` responsible for
167/// processing complete transaction payloads.
168/// - `metrics`: A `MetricsCollection` holding the `Metrics` implementations
169/// that record and track performance data. The collection is wrapped in an
170/// `Arc` to ensure thread safety.
171/// - `metrics_flush_interval`: An optional interval, in seconds, defining how
172/// frequently metrics should be flushed. If `None`, the default interval is
173/// used.
174/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
175/// not set, a default size of 1_000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) will be used.
176///
177/// ## Example
178///
179/// ```ignore
180/// use std::sync::Arc;
181///
182/// carbon_core::pipeline::Pipeline::builder()
183/// .datasource(transaction_crawler)
184/// .metrics(Arc::new(LogMetrics::new()))
185/// .metrics(Arc::new(PrometheusMetrics::new()))
186/// .instruction(
187/// TestProgramDecoder,
188/// TestProgramProcessor
189/// )
190/// .account(
191/// TestProgramDecoder,
192/// TestProgramAccountProcessor
193/// )
194/// .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
195/// .account_deletions(TestProgramAccountDeletionProcessor)
196/// .channel_buffer_size(1000)
197/// .build()?
198/// .run()
199/// .await?;
200/// ```
201///
202/// ## Notes
203///
204/// - Ensure that each data source and pipe implements the required traits, such
205/// as `Datasource`, `AccountPipes`, and `Metrics`, as appropriate.
206/// - The pipeline is designed for concurrent operation, utilizing `Arc` and
207/// `Box` types to handle shared ownership and trait object storage.
208/// - The `metrics_flush_interval` controls how frequently the pipeline's
209/// metrics are flushed. If `None`, a default interval of 5 seconds is
210/// used.
211pub struct Pipeline {
212 pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
213 pub account_pipes: Vec<Box<dyn AccountPipes>>,
214 pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
215 pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
216 pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
217 pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
218 pub metrics: Arc<MetricsCollection>,
219 pub metrics_flush_interval: Option<u64>,
220 pub datasource_cancellation_token: Option<CancellationToken>,
221 pub shutdown_strategy: ShutdownStrategy,
222 pub channel_buffer_size: usize,
223}
224
225impl Pipeline {
226 /// Creates a new `PipelineBuilder` instance for constructing a `Pipeline`.
227 ///
228 /// The `builder` method returns a `PipelineBuilder` that allows you to
229 /// configure and customize the pipeline components before building the
230 /// final `Pipeline` object. This approach provides a flexible and
231 /// type-safe way to assemble a pipeline by specifying data sources,
232 /// processing pipes, and metrics.
233 ///
234 /// # Example
235 ///
236 /// ```ignore
237 /// use std::sync::Arc;
238 ///
239 /// carbon_core::pipeline::Pipeline::builder()
240 /// .datasource(transaction_crawler)
241 /// .metrics(Arc::new(LogMetrics::new()))
242 /// .metrics(Arc::new(PrometheusMetrics::new()))
243 /// .instruction(
244 /// TestProgramDecoder,
245 /// TestProgramProcessor
246 /// );
247 /// // ...
248 /// ```
249 ///
250 /// # Returns
251 ///
252 /// Returns a `PipelineBuilder` instance with empty collections for data
253 /// sources, pipes, and metrics. You can then configure each component
254 /// using the builder pattern.
255 pub fn builder() -> PipelineBuilder {
256 log::trace!("Pipeline::builder()");
257 PipelineBuilder {
258 datasources: Vec::new(),
259 account_pipes: Vec::new(),
260 account_deletion_pipes: Vec::new(),
261 block_details_pipes: Vec::new(),
262 instruction_pipes: Vec::new(),
263 transaction_pipes: Vec::new(),
264 metrics: MetricsCollection::default(),
265 metrics_flush_interval: None,
266 datasource_cancellation_token: None,
267 shutdown_strategy: ShutdownStrategy::default(),
268 channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
269 }
270 }
271
272 /// Runs the `Pipeline`, processing updates from data sources and handling
273 /// metrics.
274 ///
275 /// The `run` method initializes the pipeline's metrics system and starts
276 /// listening for updates from the configured data sources. It checks
277 /// the types of updates provided by each data source to ensure that the
278 /// required data types are available for processing. The method then
279 /// enters a loop where it processes each update received from the data
280 /// sources in turn, logging and updating metrics based on the success
281 /// or failure of each operation.
282 ///
283 /// # How it Works
284 ///
285 /// - Initializes metrics and sets up an interval for periodic metric
286 /// flushing.
287 /// - Spawns tasks for each data source to continuously consume updates.
288 /// - Processes updates according to their type (e.g., Account, Transaction,
289/// AccountDeletion, or BlockDetails).
290 /// - Records performance metrics such as update processing times, and
291 /// tracks success and failure counts.
292 ///
293 /// # Errors
294 ///
295 /// The method returns an `Err` variant if:
296 /// - Required update types (e.g., `AccountUpdate`, `AccountDeletion`,
297 /// `Transaction`) are not provided by any data source, causing a mismatch
298 /// in expected data processing capabilities.
299 /// - A data source encounters an error while consuming updates.
300 /// - An error occurs during metrics flushing or processing of updates.
301 ///
302 /// # Example
303 ///
304 /// ```ignore
305/// use std::sync::Arc;
/// use carbon_core::pipeline::Pipeline;
306 ///
307 /// let mut pipeline = Pipeline::builder()
308 /// .datasource(MyDatasource::new())
309/// .metrics(Arc::new(MyMetrics::new()))
310 /// .build()
311 /// .expect("Failed to build pipeline");
312 ///
313 /// // Running the pipeline asynchronously
314 /// tokio::spawn(async move {
315 /// if let Err(e) = pipeline.run().await {
316 /// eprintln!("Pipeline run error: {:?}", e);
317 /// }
318 /// });
319 /// ```
320 ///
321 /// # Notes
322 ///
323 /// - This method is asynchronous and should be awaited within a Tokio
324 /// runtime environment.
325 /// - The pipeline monitors metrics and flushes them based on the configured
326 /// `metrics_flush_interval`.
327 /// - The `run` method operates in an infinite loop, handling updates until
328 /// a termination condition occurs.
329 pub async fn run(&mut self) -> CarbonResult<()> {
330 log::info!("starting pipeline. num_datasources: {}, num_metrics: {}, num_account_pipes: {}, num_account_deletion_pipes: {}, num_instruction_pipes: {}, num_transaction_pipes: {}",
331 self.datasources.len(),
332 self.metrics.metrics.len(),
333 self.account_pipes.len(),
334 self.account_deletion_pipes.len(),
335 self.instruction_pipes.len(),
336 self.transaction_pipes.len(),
337 );
338
339 log::trace!("run(self)");
340
341 self.metrics.initialize_metrics().await?;
342 let (update_sender, mut update_receiver) =
343 tokio::sync::mpsc::channel::<(Update, DatasourceId)>(self.channel_buffer_size);
344
345 let datasource_cancellation_token = self
346 .datasource_cancellation_token
347 .clone()
348 .unwrap_or_default();
349
350 for datasource in &self.datasources {
351 let datasource_cancellation_token_clone = datasource_cancellation_token.clone();
352 let sender_clone = update_sender.clone();
353 let datasource_clone = Arc::clone(&datasource.1);
354 let datasource_id = datasource.0.clone();
355 let metrics_collection = self.metrics.clone();
356
357 tokio::spawn(async move {
358 if let Err(e) = datasource_clone
359 .consume(
360 datasource_id,
361 sender_clone,
362 datasource_cancellation_token_clone,
363 metrics_collection,
364 )
365 .await
366 {
367 log::error!("error consuming datasource: {:?}", e);
368 }
369 });
370 }
371
372 drop(update_sender);
373
374 let mut interval = tokio::time::interval(time::Duration::from_secs(
375 self.metrics_flush_interval.unwrap_or(5),
376 ));
377
378 loop {
379 tokio::select! {
380 _ = datasource_cancellation_token.cancelled() => {
381 log::trace!("datasource cancellation token cancelled, shutting down.");
382 self.metrics.flush_metrics().await?;
383 self.metrics.shutdown_metrics().await?;
384 break;
385 }
386 _ = tokio::signal::ctrl_c() => {
387 log::trace!("received SIGINT, shutting down.");
388 datasource_cancellation_token.cancel();
389
390 if self.shutdown_strategy == ShutdownStrategy::Immediate {
391 log::info!("shutting down the pipeline immediately.");
392 self.metrics.flush_metrics().await?;
393 self.metrics.shutdown_metrics().await?;
394 break;
395 } else {
396 log::info!("shutting down the pipeline after processing pending updates.");
397 }
398 }
399 _ = interval.tick() => {
400 self.metrics.flush_metrics().await?;
401 }
402 update = update_receiver.recv() => {
403 match update {
404 Some((update, datasource_id)) => {
405 self
406 .metrics.increment_counter("updates_received", 1)
407 .await?;
408
409 let start = Instant::now();
410 let process_result = self.process(update.clone(), datasource_id.clone()).await;
411 let time_taken_nanoseconds = start.elapsed().as_nanos();
412 let time_taken_milliseconds = time_taken_nanoseconds / 1_000_000;
413
414 self
415 .metrics
416 .record_histogram("updates_process_time_nanoseconds", time_taken_nanoseconds as f64)
417 .await?;
418
419 self
420 .metrics
421 .record_histogram("updates_process_time_milliseconds", time_taken_milliseconds as f64)
422 .await?;
423
424 match process_result {
425 Ok(_) => {
426 self
427 .metrics.increment_counter("updates_successful", 1)
428 .await?;
429
430 log::trace!("processed update")
431 }
432 Err(error) => {
433 log::error!("error processing update ({:?}): {:?}", update, error);
434 self.metrics.increment_counter("updates_failed", 1).await?;
435 }
436 };
437
438 self
439 .metrics.increment_counter("updates_processed", 1)
440 .await?;
441
442 self
443 .metrics.update_gauge("updates_queued", update_receiver.len() as f64)
444 .await?;
445 }
446 None => {
447 log::info!("update_receiver closed, shutting down.");
448 self.metrics.flush_metrics().await?;
449 self.metrics.shutdown_metrics().await?;
450 break;
451 }
452 }
453 }
454 }
455 }
456
457 log::info!("pipeline shutdown complete.");
458
459 Ok(())
460 }
461
462 /// Processes a single update and routes it through the appropriate pipeline
463 /// stages.
464 ///
465 /// The `process` method takes an `Update` and determines its type, then
466 /// routes it through the corresponding pipes for handling account
467 /// updates, transactions, or account deletions. It also records metrics
468 /// for processed updates, providing insights into the processing
469 /// workload and performance.
470 ///
471 /// ## Functionality
472 ///
473 /// - **Account Updates**: Passes account updates through the
474 /// `account_pipes`. Each pipe processes the account metadata and the
475 /// updated account state.
476 /// - **Transaction Updates**: Extracts transaction metadata and
477 /// instructions, nests them if needed, and routes them through
478 /// `instruction_pipes` and `transaction_pipes`.
479 /// - **Account Deletions**: Sends account deletion events through the
480/// `account_deletion_pipes`.
/// - **Block Details**: Routes block details updates through the
/// `block_details_pipes`.
481 ///
482 /// The method also updates metrics counters for each type of update,
483 /// tracking how many updates have been processed in each category.
484 ///
485 /// # Parameters
486 ///
487 /// - `update`: An `Update` variant representing the type of data received.
488/// This can be an `Account`, `Transaction`, `AccountDeletion`, or
489/// `BlockDetails`, each triggering different processing logic within the pipeline.
490 /// - `datasource_id`: The ID of the datasource that produced this update.
491 /// This is used by filters to determine whether the update should be
492 /// processed by specific pipes.
493 ///
494 /// # Returns
495 ///
496 /// Returns a `CarbonResult<()>`, indicating `Ok(())` on successful
497 /// processing or an error if processing fails at any stage.
498 ///
499 /// # Notes
500 ///
501 /// - This method is asynchronous and should be awaited within a Tokio
502 /// runtime.
503 /// - Each type of update (account, transaction, account deletion) requires
504 /// its own set of pipes, so ensure that appropriate pipes are configured
505 /// based on the data types expected from the data sources.
506 /// - Metrics are recorded after each successful processing stage to track
507 /// processing volumes and identify potential bottlenecks in real-time.
508 /// - Filters are applied to each pipe before processing, allowing for
509 /// selective update processing based on datasource ID and other criteria.
510 ///
511 /// # Errors
512 ///
513 /// Returns an error if any of the pipes fail during processing, or if an
514 /// issue arises while incrementing counters or updating metrics. Handle
515 /// errors gracefully to ensure continuous pipeline operation.
516 async fn process(&mut self, update: Update, datasource_id: DatasourceId) -> CarbonResult<()> {
517 log::trace!(
518 "process(self, update: {:?}, datasource_id: {:?})",
519 update,
520 datasource_id
521 );
522 match update {
523 Update::Account(account_update) => {
524 let account_metadata = AccountMetadata {
525 slot: account_update.slot,
526 pubkey: account_update.pubkey,
527 transaction_signature: account_update.transaction_signature,
528 };
529
530 for pipe in self.account_pipes.iter_mut() {
531 if pipe.filters().iter().all(|filter| {
532 filter.filter_account(
533 &datasource_id,
534 &account_metadata,
535 &account_update.account,
536 )
537 }) {
538 pipe.run(
539 (account_metadata.clone(), account_update.account.clone()),
540 self.metrics.clone(),
541 )
542 .await?;
543 }
544 }
545
546 self.metrics
547 .increment_counter("account_updates_processed", 1)
548 .await?;
549 }
550 Update::Transaction(transaction_update) => {
551 let transaction_metadata = Arc::new((*transaction_update).clone().try_into()?);
552
553 let instructions_with_metadata: InstructionsWithMetadata =
554 transformers::extract_instructions_with_metadata(
555 &transaction_metadata,
556 &transaction_update,
557 )?;
558
559 let nested_instructions: NestedInstructions = instructions_with_metadata.into();
560
561 for pipe in self.instruction_pipes.iter_mut() {
562 for nested_instruction in nested_instructions.iter() {
563 if pipe.filters().iter().all(|filter| {
564 filter.filter_instruction(&datasource_id, nested_instruction)
565 }) {
566 pipe.run(nested_instruction, self.metrics.clone()).await?;
567 }
568 }
569 }
570
571 for pipe in self.transaction_pipes.iter_mut() {
572 if pipe.filters().iter().all(|filter| {
573 filter.filter_transaction(
574 &datasource_id,
575 &transaction_metadata,
576 &nested_instructions,
577 )
578 }) {
579 pipe.run(
580 transaction_metadata.clone(),
581 &nested_instructions,
582 self.metrics.clone(),
583 )
584 .await?;
585 }
586 }
587
588 self.metrics
589 .increment_counter("transaction_updates_processed", 1)
590 .await?;
591 }
592 Update::AccountDeletion(account_deletion) => {
593 for pipe in self.account_deletion_pipes.iter_mut() {
594 if pipe.filters().iter().all(|filter| {
595 filter.filter_account_deletion(&datasource_id, &account_deletion)
596 }) {
597 pipe.run(account_deletion.clone(), self.metrics.clone())
598 .await?;
599 }
600 }
601
602 self.metrics
603 .increment_counter("account_deletions_processed", 1)
604 .await?;
605 }
606 Update::BlockDetails(block_details) => {
607 for pipe in self.block_details_pipes.iter_mut() {
608 if pipe
609 .filters()
610 .iter()
611 .all(|filter| filter.filter_block_details(&datasource_id, &block_details))
612 {
613 pipe.run(block_details.clone(), self.metrics.clone())
614 .await?;
615 }
616 }
617
618 self.metrics
619 .increment_counter("block_details_processed", 1)
620 .await?;
621 }
622 };
623
624 Ok(())
625 }
626}
627
628/// A builder for constructing a `Pipeline` instance with customized data
629/// sources, processing pipes, and metrics.
630///
631/// The `PipelineBuilder` struct offers a flexible way to assemble a `Pipeline`
632/// by allowing configuration of its components, such as data sources, account
633/// and transaction pipes, deletion handling, and metrics. Using the builder
634/// pattern, you can add the desired elements incrementally and then finalize
635/// with a call to `build`.
636///
637/// ## Overview
638///
639/// The `PipelineBuilder` supports the following components:
640/// - **Datasources**: Sources of data updates, such as account information and
641/// transactions.
642/// - **Account Pipes**: For processing account updates from data sources.
643/// - **Account Deletion Pipes**: For handling account deletion updates.
644/// - **Instruction Pipes**: For handling instructions associated with
645/// transactions.
646/// - **Transaction Pipes**: For handling full transaction data.
/// - **Block Details Pipes**: For handling block details updates.
647/// - **Metrics**: Collects and reports performance data, such as update
648/// processing times.
649/// - **Metrics Flush Interval**: Optional interval defining how often to flush
650/// metrics data.
651///
652/// Each component can be added through method chaining, enhancing code
653/// readability and maintainability.
654///
655/// # Example
656///
657/// ```ignore
658/// use std::sync::Arc;
659///
660/// carbon_core::pipeline::Pipeline::builder()
661/// .datasource(transaction_crawler)
662/// .metrics(Arc::new(LogMetrics::new()))
663/// .metrics(Arc::new(PrometheusMetrics::new()))
664/// .instruction(
665/// TestProgramDecoder,
666/// TestProgramProcessor
667/// );
668/// // ...
669/// ```
670///
671/// # Fields
672///
673/// - `datasources`: A collection of `Datasource` objects wrapped in `Arc` for
674/// shared ownership across threads. Each `Datasource` provides updates to the
675/// pipeline.
676/// - `account_pipes`: A collection of `AccountPipes` to handle account updates.
677/// - `account_deletion_pipes`: A collection of `AccountDeletionPipes` for
678/// processing account deletions.
/// - `block_details_pipes`: A collection of `BlockDetailsPipes` for processing
/// block details updates.
679/// - `instruction_pipes`: A collection of `InstructionPipes` to process
680/// instructions in transactions.
681/// - `transaction_pipes`: A collection of `TransactionPipes` to process full
682/// transaction data.
683/// - `metrics`: A vector of `Metrics` implementations for tracking pipeline
684/// performance.
685/// - `metrics_flush_interval`: An optional interval (in seconds) for flushing
686/// metrics data. If not set, a default flush interval will be used.
687/// - `datasource_cancellation_token`: An optional `CancellationToken` for
688/// cancelling the data sources. If not set, a default `CancellationToken` will be
689/// used.
690/// - `channel_buffer_size`: The size of the channel buffer for the pipeline. If
691/// not set, a default size of 1_000 (`DEFAULT_CHANNEL_BUFFER_SIZE`) will be used.
692///
693/// # Returns
694///
695/// After configuring the builder, call `build` to create a `Pipeline` instance.
696/// The builder will return a `CarbonResult<Pipeline>`, which will either
697/// contain the configured pipeline or an error if configuration failed.
698///
699/// # Notes
700///
701/// - The builder pattern allows for method chaining, making it easy to
702/// incrementally add components to the `Pipeline`.
703/// - Ensure that each component matches the data and update types expected by
704/// your application.
705#[derive(Default)]
706pub struct PipelineBuilder {
707 pub datasources: Vec<(DatasourceId, Arc<dyn Datasource + Send + Sync>)>,
708 pub account_pipes: Vec<Box<dyn AccountPipes>>,
709 pub account_deletion_pipes: Vec<Box<dyn AccountDeletionPipes>>,
710 pub block_details_pipes: Vec<Box<dyn BlockDetailsPipes>>,
711 pub instruction_pipes: Vec<Box<dyn for<'a> InstructionPipes<'a>>>,
712 pub transaction_pipes: Vec<Box<dyn for<'a> TransactionPipes<'a>>>,
713 pub metrics: MetricsCollection,
714 pub metrics_flush_interval: Option<u64>,
715 pub datasource_cancellation_token: Option<CancellationToken>,
716 pub shutdown_strategy: ShutdownStrategy,
717 pub channel_buffer_size: usize,
718}
719
720impl PipelineBuilder {
721 /// Creates a new `PipelineBuilder` with empty collections for datasources,
722 /// pipes, and metrics.
723 ///
724 /// This method initializes a `PipelineBuilder` instance, allowing you to
725 /// configure each component of a `Pipeline` before building it. The
726 /// builder pattern offers flexibility in adding data sources, account
727 /// and transaction handling pipes, deletion processing, and metrics
728 /// collection features.
729 ///
730 /// # Example
731 ///
732 /// ```rust
733 /// use carbon_core::pipeline::PipelineBuilder;
734 ///
735 /// let builder = PipelineBuilder::new();
736 /// ```
737 pub fn new() -> Self {
738 log::trace!("PipelineBuilder::new()");
739 Self::default()
740 }
741
742 /// Adds a datasource to the pipeline.
743 ///
744 /// The datasource is responsible for providing updates, such as account and
745 /// transaction data, to the pipeline. Multiple datasources can be added
746 /// to handle various types of updates.
747 ///
748 /// # Parameters
749 ///
750 /// - `datasource`: The data source to add, implementing the `Datasource`
751 /// trait.
752 ///
753 /// # Example
754 ///
755 /// ```ignore
756 /// use carbon_core::pipeline::PipelineBuilder;
757 ///
758 /// let builder = PipelineBuilder::new()
759 /// .datasource(MyDatasource::new());
760 /// ```
761 pub fn datasource(mut self, datasource: impl Datasource + 'static) -> Self {
762 log::trace!("datasource(self, datasource: {:?})", stringify!(datasource));
763 self.datasources
764 .push((DatasourceId::new_unique(), Arc::new(datasource)));
765 self
766 }
767
768 /// Adds a datasource to the pipeline with a specific ID.
769 ///
770 /// This method allows you to assign a custom ID to a datasource, which is
771 /// useful for filtering updates based on their source. The ID can be used
772 /// with filters to selectively process updates from specific datasources.
773 ///
774 /// # Parameters
775 ///
776 /// - `datasource`: The data source to add, implementing the `Datasource`
777 /// trait
778 /// - `id`: The `DatasourceId` to assign to this datasource
779 ///
780 /// # Example
781 ///
782 /// ```ignore
783 /// use carbon_core::{pipeline::PipelineBuilder, datasource::DatasourceId};
784 ///
785 /// let mainnet_id = DatasourceId::new_named("mainnet");
786 /// let builder = PipelineBuilder::new()
787/// .datasource_with_id(MyDatasource::new(), mainnet_id);
788 /// ```
789 pub fn datasource_with_id(
790 mut self,
791 datasource: impl Datasource + 'static,
792 id: DatasourceId,
793 ) -> Self {
794 log::trace!(
795 "datasource_with_id(self, id: {:?}, datasource: {:?})",
796 id,
797 stringify!(datasource)
798 );
799 self.datasources.push((id, Arc::new(datasource)));
800 self
801 }
802
803 /// Sets the shutdown strategy for the pipeline.
804 ///
805 /// This method configures how the pipeline should handle shutdowns. The
806 /// shutdown strategy defines whether the pipeline should terminate
807 /// immediately or continue processing pending updates after terminating
808 /// the data sources.
809 ///
810 /// # Parameters
811 ///
812 /// - `shutdown_strategy`: A variant of [`ShutdownStrategy`] that determines
813 /// how the pipeline should handle shutdowns.
814 ///
815 /// # Returns
816 ///
817 /// Returns `Self`, allowing for method chaining.
818 ///
819 /// # Notes
820 ///
821 /// - Use `ShutdownStrategy::Immediate` to stop the entire pipeline
822 /// instantly, including all active processing tasks.
823 /// - Use `ShutdownStrategy::ProcessPending` (the default) to terminate data
824 /// sources first and allow the pipeline to finish processing any updates
825 /// that are still pending.
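///
/// # Example
///
/// A minimal sketch; `my_datasource` is an illustrative placeholder:
///
/// ```ignore
/// use carbon_core::pipeline::{PipelineBuilder, ShutdownStrategy};
///
/// let builder = PipelineBuilder::new()
///     .datasource(my_datasource)
///     .shutdown_strategy(ShutdownStrategy::ProcessPending);
/// ```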
826 pub fn shutdown_strategy(mut self, shutdown_strategy: ShutdownStrategy) -> Self {
827 log::trace!(
828 "shutdown_strategy(self, shutdown_strategy: {:?})",
829 shutdown_strategy
830 );
831 self.shutdown_strategy = shutdown_strategy;
832 self
833 }
834
835 /// Adds an account pipe to process account updates.
836 ///
837 /// Account pipes decode and process updates to accounts within the
838 /// pipeline. This method requires both an `AccountDecoder` and a
839 /// `Processor` to handle decoded account data.
840 ///
841 /// # Parameters
842 ///
843 /// - `decoder`: An `AccountDecoder` that decodes the account data.
844 /// - `processor`: A `Processor` that processes the decoded account data.
845 ///
846 /// # Example
847 ///
848 /// ```ignore
849 /// use carbon_core::pipeline::PipelineBuilder;
850 ///
851 /// let builder = PipelineBuilder::new()
852 /// .account(MyAccountDecoder, MyAccountProcessor);
853 /// ```
854 pub fn account<T: Send + Sync + 'static>(
855 mut self,
856 decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
857 processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
858 ) -> Self {
859 log::trace!(
860 "account(self, decoder: {:?}, processor: {:?})",
861 stringify!(decoder),
862 stringify!(processor)
863 );
864 self.account_pipes.push(Box::new(AccountPipe {
865 decoder: Box::new(decoder),
866 processor: Box::new(processor),
867 filters: vec![],
868 }));
869 self
870 }
871
872 /// Adds an account pipe with filters to process account updates selectively.
873 ///
874 /// This method creates an account pipe that only processes updates that pass
875 /// all the specified filters. Filters can be used to selectively process
876 /// updates based on criteria such as datasource ID, account properties, or
877 /// other custom logic.
878 ///
879 /// # Parameters
880 ///
881 /// - `decoder`: An `AccountDecoder` that decodes the account data
882 /// - `processor`: A `Processor` that processes the decoded account data
883 /// - `filters`: A collection of filters that determine which account updates
884 /// should be processed
885 ///
886 /// # Example
887 ///
888 /// ```ignore
889 /// use carbon_core::{
890 /// pipeline::PipelineBuilder,
891 /// datasource::DatasourceId,
892 /// filter::DatasourceFilter,
893 /// };
894 ///
895 /// let mainnet_id = DatasourceId::new_named("mainnet");
896 /// let filter = DatasourceFilter::new(mainnet_id);
897 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
898 ///
899 /// let builder = PipelineBuilder::new()
900 /// .account_with_filters(MyAccountDecoder, MyAccountProcessor, filters);
901 /// ```
902 pub fn account_with_filters<T: Send + Sync + 'static>(
903 mut self,
904 decoder: impl for<'a> AccountDecoder<'a, AccountType = T> + Send + Sync + 'static,
905 processor: impl Processor<InputType = AccountProcessorInputType<T>> + Send + Sync + 'static,
906 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
907 ) -> Self {
908 log::trace!(
909 "account_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
910 stringify!(decoder),
911 stringify!(processor),
912 stringify!(filters)
913 );
914 self.account_pipes.push(Box::new(AccountPipe {
915 decoder: Box::new(decoder),
916 processor: Box::new(processor),
917 filters,
918 }));
919 self
920 }
921
922 /// Adds an account deletion pipe to handle account deletion events.
923 ///
924 /// Account deletion pipes process deletions of accounts, with a `Processor`
925 /// to handle the deletion events as they occur.
926 ///
927 /// # Parameters
928 ///
929 /// - `processor`: A `Processor` that processes account deletion events.
930 ///
931 /// # Example
932 ///
933 /// ```ignore
934 /// use carbon_core::pipeline::PipelineBuilder;
935 ///
936 /// let builder = PipelineBuilder::new()
937 /// .account_deletions(MyAccountDeletionProcessor);
938 /// ```
939 pub fn account_deletions(
940 mut self,
941 processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
942 ) -> Self {
943 log::trace!(
944 "account_deletions(self, processor: {:?})",
945 stringify!(processor)
946 );
947 self.account_deletion_pipes
948 .push(Box::new(AccountDeletionPipe {
949 processor: Box::new(processor),
950 filters: vec![],
951 }));
952 self
953 }
954
955 /// Adds an account deletion pipe with filters to handle account deletion events selectively.
956 ///
957 /// This method creates an account deletion pipe that only processes deletion
958 /// events that pass all the specified filters. Filters can be used to
959 /// selectively process deletions based on criteria such as datasource ID or
960 /// other custom logic.
961 ///
962 /// # Parameters
963 ///
964 /// - `processor`: A `Processor` that processes account deletion events
965 /// - `filters`: A collection of filters that determine which account deletion
966 /// events should be processed
967 ///
968 /// # Example
969 ///
970 /// ```ignore
971 /// use carbon_core::{
972 /// pipeline::PipelineBuilder,
973 /// datasource::DatasourceId,
974 /// filter::DatasourceFilter,
975 /// };
976 ///
977 /// let mainnet_id = DatasourceId::new_named("mainnet");
978 /// let filter = DatasourceFilter::new(mainnet_id);
979 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
980 ///
981 /// let builder = PipelineBuilder::new()
982 /// .account_deletions_with_filters(MyAccountDeletionProcessor, filters);
983 /// ```
984 pub fn account_deletions_with_filters(
985 mut self,
986 processor: impl Processor<InputType = AccountDeletion> + Send + Sync + 'static,
987 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
988 ) -> Self {
989 log::trace!(
990 "account_deletions_with_filters(self, processor: {:?}, filters: {:?})",
991 stringify!(processor),
992 stringify!(filters)
993 );
994 self.account_deletion_pipes
995 .push(Box::new(AccountDeletionPipe {
996 processor: Box::new(processor),
997 filters,
998 }));
999 self
1000 }
1001
1002 /// Adds a block details pipe to handle block details updates.
1003 ///
1004 /// Block details pipes process updates related to block metadata, such as
1005 /// slot, block hash, and rewards, with a `Processor` to handle the updates.
1006 ///
1007 /// # Parameters
1008 ///
1009 /// - `processor`: A `Processor` that processes block details updates.
1010 ///
1011 /// # Example
1012 ///
1013 /// ```ignore
1014 /// use carbon_core::pipeline::PipelineBuilder;
1015 ///
1016 /// let builder = PipelineBuilder::new()
1017 /// .block_details(MyBlockDetailsProcessor);
1018 /// ```
1019 pub fn block_details(
1020 mut self,
1021 processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
1022 ) -> Self {
1023 log::trace!(
1024 "block_details(self, processor: {:?})",
1025 stringify!(processor)
1026 );
1027 self.block_details_pipes.push(Box::new(BlockDetailsPipe {
1028 processor: Box::new(processor),
1029 filters: vec![],
1030 }));
1031 self
1032 }
1033
1034 /// Adds a block details pipe with filters to handle block details updates selectively.
1035 ///
1036 /// This method creates a block details pipe that only processes updates that
1037 /// pass all the specified filters. Filters can be used to selectively process
1038 /// block details updates based on criteria such as datasource ID, block height,
1039 /// or other custom logic.
1040 ///
1041 /// # Parameters
1042 ///
1043 /// - `processor`: A `Processor` that processes block details updates
1044 /// - `filters`: A collection of filters that determine which block details
1045 /// updates should be processed
1046 ///
1047 /// # Example
1048 ///
1049 /// ```ignore
1050 /// use carbon_core::{
1051 /// pipeline::PipelineBuilder,
1052 /// datasource::DatasourceId,
1053 /// filter::DatasourceFilter,
1054 /// };
1055 ///
1056 /// let mainnet_id = DatasourceId::new_named("mainnet");
1057 /// let filter = DatasourceFilter::new(mainnet_id);
1058 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
1059 ///
1060 /// let builder = PipelineBuilder::new()
1061 /// .block_details_with_filters(MyBlockDetailsProcessor, filters);
1062 /// ```
1063 pub fn block_details_with_filters(
1064 mut self,
1065 processor: impl Processor<InputType = BlockDetails> + Send + Sync + 'static,
1066 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
1067 ) -> Self {
1068 log::trace!(
1069 "block_details_with_filters(self, processor: {:?}, filters: {:?})",
1070 stringify!(processor),
1071 stringify!(filters)
1072 );
1073 self.block_details_pipes.push(Box::new(BlockDetailsPipe {
1074 processor: Box::new(processor),
1075 filters,
1076 }));
1077 self
1078 }
1079
1080 /// Adds an instruction pipe to process instructions within transactions.
1081 ///
1082 /// Instruction pipes decode and process individual instructions,
1083 /// enabling specialized handling of various instruction types.
1084 ///
1085 /// # Parameters
1086 ///
1087 /// - `decoder`: An `InstructionDecoder` for decoding instructions from
1088 /// transaction data.
1089 /// - `processor`: A `Processor` that processes decoded instruction data.
1090 ///
1091 /// # Example
1092 ///
1093 /// ```ignore
1094 /// use carbon_core::pipeline::PipelineBuilder;
1095 ///
1096 /// let builder = PipelineBuilder::new()
1097 /// .instruction(MyDecoder, MyInstructionProcessor);
1098 /// ```
1099 pub fn instruction<T: Send + Sync + 'static>(
1100 mut self,
1101 decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
1102 processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
1103 ) -> Self {
1104 log::trace!(
1105 "instruction(self, decoder: {:?}, processor: {:?})",
1106 stringify!(decoder),
1107 stringify!(processor)
1108 );
1109 self.instruction_pipes.push(Box::new(InstructionPipe {
1110 decoder: Box::new(decoder),
1111 processor: Box::new(processor),
1112 filters: vec![],
1113 }));
1114 self
1115 }
1116
1117 /// Adds an instruction pipe with filters to process instructions selectively.
1118 ///
1119 /// This method creates an instruction pipe that only processes instructions
1120 /// that pass all the specified filters. Filters can be used to selectively
1121 /// process instructions based on criteria such as datasource ID, instruction
1122 /// type, or other custom logic.
1123 ///
1124 /// # Parameters
1125 ///
1126 /// - `decoder`: An `InstructionDecoder` for decoding instructions from
1127 /// transaction data
1128 /// - `processor`: A `Processor` that processes decoded instruction data
1129 /// - `filters`: A collection of filters that determine which instructions
1130 /// should be processed
1131 ///
1132 /// # Example
1133 ///
1134 /// ```ignore
1135 /// use carbon_core::{
1136 /// pipeline::PipelineBuilder,
1137 /// datasource::DatasourceId,
1138 /// filter::DatasourceFilter,
1139 /// };
1140 ///
1141 /// let mainnet_id = DatasourceId::new_named("mainnet");
1142 /// let filter = DatasourceFilter::new(mainnet_id);
1143 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
1144 ///
1145 /// let builder = PipelineBuilder::new()
1146 /// .instruction_with_filters(MyDecoder, MyInstructionProcessor, filters);
1147 /// ```
1148 pub fn instruction_with_filters<T: Send + Sync + 'static>(
1149 mut self,
1150 decoder: impl for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static,
1151 processor: impl Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static,
1152 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
1153 ) -> Self {
1154 log::trace!(
1155 "instruction_with_filters(self, decoder: {:?}, processor: {:?}, filters: {:?})",
1156 stringify!(decoder),
1157 stringify!(processor),
1158 stringify!(filters)
1159 );
1160 self.instruction_pipes.push(Box::new(InstructionPipe {
1161 decoder: Box::new(decoder),
1162 processor: Box::new(processor),
1163 filters,
1164 }));
1165 self
1166 }
1167
1168 /// Adds a transaction pipe for processing full transaction data.
1169 ///
1170/// This method takes a `Processor` to handle the decoded transaction data and
1171/// an optional `TransactionSchema` used to match and interpret that data.
1172///
1173/// # Parameters
1174///
1175/// - `processor`: A `Processor` that processes the decoded transaction
1176/// data.
1177/// - `schema`: An optional `TransactionSchema` used to match and interpret
1178/// transaction data.
1179 ///
1180 /// # Example
1181 ///
1182 /// ```ignore
1183 /// use carbon_core::pipeline::PipelineBuilder;
1184 ///
1185 /// let builder = PipelineBuilder::new()
1186/// .transaction(MyTransactionProcessor, Some(MY_SCHEMA.clone()));
1187 /// ```
1188 pub fn transaction<T, U>(
1189 mut self,
1190 processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
1191 + Send
1192 + Sync
1193 + 'static,
1194 schema: Option<TransactionSchema<T>>,
1195 ) -> Self
1196 where
1197 T: InstructionDecoderCollection + 'static,
1198 U: DeserializeOwned + Send + Sync + 'static,
1199 {
1200 log::trace!(
1201 "transaction(self, schema: {:?}, processor: {:?})",
1202 stringify!(schema),
1203 stringify!(processor)
1204 );
1205 self.transaction_pipes
1206 .push(Box::new(TransactionPipe::<T, U>::new(
1207 schema,
1208 processor,
1209 vec![],
1210 )));
1211 self
1212 }
1213
1214 /// Adds a transaction pipe with filters for processing full transaction data selectively.
1215 ///
1216 /// This method creates a transaction pipe that only processes transactions
1217 /// that pass all the specified filters. Filters can be used to selectively
1218 /// process transactions based on criteria such as datasource ID, transaction
1219 /// type, or other custom logic.
1220 ///
1221 /// # Parameters
1222 ///
1223 /// - `processor`: A `Processor` that processes the decoded transaction data
1224/// - `schema`: An optional `TransactionSchema` used to match and interpret
1225 /// transaction data
1226 /// - `filters`: A collection of filters that determine which transactions
1227 /// should be processed
1228 ///
1229 /// # Example
1230 ///
1231 /// ```ignore
1232 /// use carbon_core::{
1233 /// pipeline::PipelineBuilder,
1234 /// datasource::DatasourceId,
1235 /// filter::DatasourceFilter,
1236 /// };
1237 ///
1238 /// let mainnet_id = DatasourceId::new_named("mainnet");
1239 /// let filter = DatasourceFilter::new(mainnet_id);
1240 /// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
1241 ///
1242 /// let builder = PipelineBuilder::new()
1243/// .transaction_with_filters(MyTransactionProcessor, Some(MY_SCHEMA.clone()), filters);
1244 /// ```
1245 pub fn transaction_with_filters<T, U>(
1246 mut self,
1247 processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
1248 + Send
1249 + Sync
1250 + 'static,
1251 schema: Option<TransactionSchema<T>>,
1252 filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
1253 ) -> Self
1254 where
1255 T: InstructionDecoderCollection + 'static,
1256 U: DeserializeOwned + Send + Sync + 'static,
1257 {
1258 log::trace!(
1259 "transaction_with_filters(self, schema: {:?}, processor: {:?}, filters: {:?})",
1260 stringify!(schema),
1261 stringify!(processor),
1262 stringify!(filters)
1263 );
1264 self.transaction_pipes
1265 .push(Box::new(TransactionPipe::<T, U>::new(
1266 schema, processor, filters,
1267 )));
1268 self
1269 }
1270
1271 /// Adds a metrics component to the pipeline for performance tracking.
1272 ///
1273 /// This component collects and reports on pipeline metrics, providing
1274 /// insights into performance and operational statistics.
1275 ///
1276 /// # Parameters
1277 ///
1278 /// - `metrics`: An instance of a `Metrics` implementation, used to gather
1279 /// and report metrics.
1280 ///
1281 /// # Example
1282 ///
1283 /// ```ignore
1284 /// use std::sync::Arc;
1285 /// use carbon_core::pipeline::PipelineBuilder;
1286 ///
1287 /// let builder = PipelineBuilder::new()
1288 /// .metrics(Arc::new(LogMetrics::new()));
1289 /// ```
1290 pub fn metrics(mut self, metrics: Arc<dyn Metrics>) -> Self {
1291 log::trace!("metrics(self, metrics: {:?})", stringify!(metrics));
1292 self.metrics.metrics.push(metrics);
1293 self
1294 }
1295
1296 /// Sets the interval for flushing metrics data.
1297 ///
1298 /// This value defines the frequency, in seconds, at which metrics data is
1299 /// flushed from memory. If not set, a default interval is used.
1300 ///
1301 /// # Parameters
1302 ///
1303 /// - `interval`: The flush interval for metrics, in seconds.
1304 ///
1305 /// # Example
1306 ///
1307 /// ```rust
1308 /// use carbon_core::pipeline::PipelineBuilder;
1309 ///
1310 /// let builder = PipelineBuilder::new()
1311 /// .metrics_flush_interval(60);
1312 /// ```
1313 pub fn metrics_flush_interval(mut self, interval: u64) -> Self {
1314 log::trace!("metrics_flush_interval(self, interval: {:?})", interval);
1315 self.metrics_flush_interval = Some(interval);
1316 self
1317 }
1318
1319/// Sets the cancellation token used to cancel the data sources on demand.
1320///
1321/// Cancelling this token signals the data sources to stop producing updates.
1322/// If not set, a default `CancellationToken` is used.
1323 ///
1324 /// # Parameters
1325 ///
1326 /// - `cancellation_token`: An instance of `CancellationToken`.
1327 ///
1328 /// # Example
1329 ///
1330 /// ```rust
1331 /// use carbon_core::pipeline::PipelineBuilder;
1332 /// use tokio_util::sync::CancellationToken;
1333 ///
1334 /// let builder = PipelineBuilder::new()
1335 /// .datasource_cancellation_token(CancellationToken::new());
1336 /// ```
1337 pub fn datasource_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
1338 log::trace!(
1339 "datasource_cancellation_token(self, cancellation_token: {:?})",
1340 cancellation_token
1341 );
1342 self.datasource_cancellation_token = Some(cancellation_token);
1343 self
1344 }
1345
1346 /// Sets the size of the channel buffer for the pipeline.
1347 ///
1348 /// This value defines the maximum number of updates that can be queued in
1349/// the pipeline's channel buffer. If not set, a default size of 1_000
1350/// (`DEFAULT_CHANNEL_BUFFER_SIZE`) will be used.
1351 ///
1352 /// # Parameters
1353 ///
1354 /// - `size`: The size of the channel buffer for the pipeline.
1355 ///
1356 /// # Example
1357 ///
1358 /// ```rust
1359 /// use carbon_core::pipeline::PipelineBuilder;
1360 ///
1361 /// let builder = PipelineBuilder::new()
1362 /// .channel_buffer_size(1000);
1363 /// ```
1364 pub fn channel_buffer_size(mut self, size: usize) -> Self {
1365 log::trace!("channel_buffer_size(self, size: {:?})", size);
1366 self.channel_buffer_size = size;
1367 self
1368 }
1369
1370 /// Builds and returns a `Pipeline` configured with the specified
1371 /// components.
1372 ///
1373 /// After configuring the `PipelineBuilder` with data sources, pipes, and
1374 /// metrics, call this method to create the final `Pipeline` instance
1375 /// ready for operation.
1376 ///
1377 /// # Returns
1378 ///
1379 /// Returns a `CarbonResult<Pipeline>` containing the configured `Pipeline`,
1380 /// or an error if any part of the configuration is invalid.
1381 ///
1382 /// # Example
1383 ///
1384 /// ```ignore
1385 /// use std::sync::Arc;
1386 ///
1387 /// carbon_core::pipeline::Pipeline::builder()
1388 /// .datasource(transaction_crawler)
1389 /// .metrics(Arc::new(LogMetrics::new()))
1390 /// .metrics(Arc::new(PrometheusMetrics::new()))
1391 /// .instruction(
1392 /// TestProgramDecoder,
1393 /// TestProgramProcessor
1394 /// )
1395 /// .account(
1396 /// TestProgramDecoder,
1397 /// TestProgramAccountProcessor
1398 /// )
1399/// .transaction(TestProgramTransactionProcessor, Some(TEST_SCHEMA.clone()))
1400 /// .account_deletions(TestProgramAccountDeletionProcessor)
1401 /// .channel_buffer_size(1000)
1402 /// .build()?;
1403 ///
1404 /// Ok(())
1405 /// ```
1406 pub fn build(self) -> CarbonResult<Pipeline> {
1407 log::trace!("build(self)");
1408 Ok(Pipeline {
1409 datasources: self.datasources,
1410 account_pipes: self.account_pipes,
1411 account_deletion_pipes: self.account_deletion_pipes,
1412 block_details_pipes: self.block_details_pipes,
1413 instruction_pipes: self.instruction_pipes,
1414 transaction_pipes: self.transaction_pipes,
1415 shutdown_strategy: self.shutdown_strategy,
1416 metrics: Arc::new(self.metrics),
1417 metrics_flush_interval: self.metrics_flush_interval,
1418 datasource_cancellation_token: self.datasource_cancellation_token,
1419 channel_buffer_size: self.channel_buffer_size,
1420 })
1421 }
1422}