Expand description
Pipeline orchestration for MongoDB change streams to destinations.
The Pipeline is the core orchestrator that connects MongoDB change streams
to destinations. It handles:
- Batching: Accumulate events and flush based on size or timeout
- Retry Logic: Exponential backoff for failed destination writes
- State Management: Persist resume tokens after successful writes
- Back-pressure: Slow down MongoDB reads if destination is slow
- Graceful Shutdown: Flush pending batches and save state
- Observability: Structured logging and metrics
§Example
use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
use rigatoni_core::stream::ChangeStreamConfig;
use std::time::Duration;
// Configure pipeline
let config = PipelineConfig::builder()
.mongodb_uri("mongodb://localhost:27017")
.database("mydb")
.collections(vec!["users".to_string(), "orders".to_string()])
.batch_size(100)
.batch_timeout(Duration::from_secs(5))
.max_retries(3)
.build()?;
// Create pipeline with state store and destination
// let pipeline = Pipeline::new(config, store, destination).await?;
// Start processing
// pipeline.start().await?;
// Graceful shutdown
// pipeline.stop().await?;Structs§
- Pipeline
- Pipeline orchestrator that connects MongoDB change streams to destinations.
- Pipeline
Config - Configuration for the pipeline orchestrator.
- Pipeline
Config Builder - Builder for
PipelineConfig. - Pipeline
Stats - Pipeline statistics.
Enums§
- Config
Error - Pipeline configuration errors.
- Pipeline
Error - Pipeline errors.