Module pipeline

Pipeline orchestration for MongoDB change streams to destinations.

The Pipeline is the core orchestrator that connects MongoDB change streams to destinations. It handles:

  • Batching: Accumulate events and flush based on size or timeout
  • Retry Logic: Exponential backoff for failed destination writes
  • State Management: Persist resume tokens after successful writes
  • Back-pressure: Slow down MongoDB reads when the destination cannot keep up
  • Graceful Shutdown: Flush pending batches and save state
  • Observability: Structured logging and metrics
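
As an illustration of the retry behaviour listed above, the following is a minimal sketch of how an exponential backoff delay can be computed. The `backoff_delay` helper, its parameters, and the cap are assumptions made for this example; they are not taken from the crate, whose actual retry schedule may differ (for instance by adding jitter).

```rust
use std::time::Duration;

/// Hypothetical helper, not part of rigatoni_core.
/// Returns the delay before retry number `attempt` (0-based):
/// `base * 2^attempt`, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate the multiplier instead of overflowing for large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With a base of 100 ms and a cap of 5 s, successive retries would wait 100 ms, 200 ms, 400 ms and so on until the cap is reached. Capping the delay keeps a long outage from pushing retries arbitrarily far apart.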

Example

use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_core::stream::ChangeStreamConfig;
use std::time::Duration;

// Configure pipeline
let config = PipelineConfig::builder()
    .mongodb_uri("mongodb://localhost:27017")
    .database("mydb")
    .collections(vec!["users".to_string(), "orders".to_string()])
    .batch_size(100)
    .batch_timeout(Duration::from_secs(5))
    .max_retries(3)
    .build()?;

// Create pipeline with state store and destination
// let pipeline = Pipeline::new(config, store, destination).await?;

// Start processing
// pipeline.start().await?;

// Graceful shutdown
// pipeline.stop().await?;
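
The `batch_size` and `batch_timeout` settings in the example above imply a "flush on size or timeout" rule. The sketch below shows one way such a rule can work. The `Batch` type and its methods are hypothetical and exist only for illustration; the crate's internal batching is not shown in this documentation and may be implemented differently.

```rust
use std::time::{Duration, Instant};

/// Hypothetical batch buffer, not a rigatoni_core type.
struct Batch<T> {
    events: Vec<T>,
    max_size: usize,
    timeout: Duration,
    /// Arrival time of the oldest buffered event, if any.
    started: Option<Instant>,
}

impl<T> Batch<T> {
    fn new(max_size: usize, timeout: Duration) -> Self {
        Self { events: Vec::new(), max_size, timeout, started: None }
    }

    /// Buffers an event. Returns the whole batch once `max_size` is reached.
    fn push(&mut self, event: T, now: Instant) -> Option<Vec<T>> {
        if self.started.is_none() {
            self.started = Some(now);
        }
        self.events.push(event);
        if self.events.len() >= self.max_size {
            Some(self.take())
        } else {
            None
        }
    }

    /// Called periodically. Returns a partial batch if the oldest
    /// buffered event has waited at least `timeout`.
    fn poll_timeout(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.started {
            Some(t) if now.saturating_duration_since(t) >= self.timeout => Some(self.take()),
            _ => None,
        }
    }

    /// Empties the buffer and resets the timer.
    fn take(&mut self) -> Vec<T> {
        self.started = None;
        std::mem::take(&mut self.events)
    }
}
```

Passing `now` in explicitly, rather than calling `Instant::now()` inside the methods, keeps the flush logic deterministic and easy to test.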

Structs

Pipeline
Pipeline orchestrator that connects MongoDB change streams to destinations.
PipelineConfig
Configuration for the pipeline orchestrator.
PipelineConfigBuilder
Builder for PipelineConfig.
PipelineStats
Pipeline statistics.

Enums

ConfigError
Pipeline configuration errors.
PipelineError
Pipeline errors.