rigatoni_core/pipeline.rs

// Copyright 2025 Rigatoni Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

//! Pipeline orchestration from MongoDB change streams to destinations.
//!
//! The [`Pipeline`] is the core orchestrator that connects MongoDB change streams
//! to destinations. It handles:
//!
//! - **Batching**: Accumulate events and flush based on size or timeout
//! - **Retry Logic**: Exponential backoff for failed destination writes
//! - **State Management**: Persist resume tokens after successful writes
//! - **Back-pressure**: Slow down MongoDB reads if the destination is slow
//! - **Graceful Shutdown**: Flush pending batches and save state
//! - **Observability**: Structured logging and metrics
//!
//! # Example
//!
//! ```rust,no_run
//! use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
//! use std::time::Duration;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Configure the pipeline
//! let config = PipelineConfig::builder()
//!     .mongodb_uri("mongodb://localhost:27017")
//!     .database("mydb")
//!     .collections(vec!["users".to_string(), "orders".to_string()])
//!     .batch_size(100)
//!     .batch_timeout(Duration::from_secs(5))
//!     .max_retries(3)
//!     .build()?;
//!
//! // Create the pipeline with a state store and destination
//! // let pipeline = Pipeline::new(config, store, destination).await?;
//!
//! // Start processing
//! // pipeline.start().await?;
//!
//! // Graceful shutdown
//! // pipeline.stop().await?;
//! # Ok(())
//! # }
//! ```
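//!
//! # Shutdown Wiring
//!
//! One way to drive a graceful shutdown is to call [`Pipeline::stop`] when the
//! process receives Ctrl-C. This is an illustrative sketch, not the only
//! option; any shutdown trigger works:
//!
//! ```rust,ignore
//! // Sketch: `pipeline` is a started `Pipeline` instance.
//! tokio::signal::ctrl_c().await?;
//! pipeline.stop().await?;
//! ```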

use crate::destination::{Destination, DestinationError};
use crate::event::ChangeEvent;
use crate::metrics;
use crate::state::StateStore;
use crate::stream::{ChangeStreamConfig, ChangeStreamListener};
use futures::StreamExt;
use mongodb::bson::Document;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::{interval, Instant};
use tracing::{debug, error, info, instrument, warn};

/// Configuration for the pipeline orchestrator.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// MongoDB connection URI
    pub mongodb_uri: String,

    /// Database name to watch
    pub database: String,

    /// Collections to watch. Must be non-empty: watching all collections in a
    /// database is not yet implemented, and `start` rejects an empty list.
    pub collections: Vec<String>,

    /// Maximum number of events to batch before flushing
    pub batch_size: usize,

    /// Maximum time to wait before flushing a batch
    pub batch_timeout: Duration,

    /// Maximum number of retry attempts for failed writes
    pub max_retries: usize,

    /// Initial retry delay (doubles with each retry)
    pub retry_delay: Duration,

    /// Maximum retry delay
    pub max_retry_delay: Duration,

    /// Channel buffer size for back-pressure
    pub channel_buffer_size: usize,

    /// Change stream configuration
    pub stream_config: ChangeStreamConfig,
}

impl PipelineConfig {
    /// Creates a new builder for `PipelineConfig`.
    #[must_use]
    pub fn builder() -> PipelineConfigBuilder {
        PipelineConfigBuilder::default()
    }
}

/// Builder for `PipelineConfig`.
#[derive(Debug, Default)]
pub struct PipelineConfigBuilder {
    mongodb_uri: Option<String>,
    database: Option<String>,
    collections: Vec<String>,
    batch_size: usize,
    batch_timeout: Duration,
    max_retries: usize,
    retry_delay: Duration,
    max_retry_delay: Duration,
    channel_buffer_size: usize,
    stream_config: Option<ChangeStreamConfig>,
}

impl PipelineConfigBuilder {
    /// Sets the MongoDB URI.
    #[must_use]
    pub fn mongodb_uri(mut self, uri: impl Into<String>) -> Self {
        self.mongodb_uri = Some(uri.into());
        self
    }

    /// Sets the database name.
    #[must_use]
    pub fn database(mut self, database: impl Into<String>) -> Self {
        self.database = Some(database.into());
        self
    }

    /// Sets the collections to watch.
    #[must_use]
    pub fn collections(mut self, collections: Vec<String>) -> Self {
        self.collections = collections;
        self
    }

    /// Sets the batch size.
    #[must_use]
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the batch timeout.
    #[must_use]
    pub fn batch_timeout(mut self, timeout: Duration) -> Self {
        self.batch_timeout = timeout;
        self
    }

    /// Sets the maximum number of retries.
    #[must_use]
    pub fn max_retries(mut self, retries: usize) -> Self {
        self.max_retries = retries;
        self
    }

    /// Sets the initial retry delay.
    #[must_use]
    pub fn retry_delay(mut self, delay: Duration) -> Self {
        self.retry_delay = delay;
        self
    }

    /// Sets the maximum retry delay.
    #[must_use]
    pub fn max_retry_delay(mut self, delay: Duration) -> Self {
        self.max_retry_delay = delay;
        self
    }

    /// Sets the channel buffer size.
    #[must_use]
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = size;
        self
    }

    /// Sets the change stream configuration.
    #[must_use]
    pub fn stream_config(mut self, config: ChangeStreamConfig) -> Self {
        self.stream_config = Some(config);
        self
    }

    /// Builds the `PipelineConfig`.
    ///
    /// # Errors
    ///
    /// Returns an error if required fields are missing or invalid.
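    ///
    /// # Examples
    ///
    /// The defaulting and validation rules below can be exercised directly:
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Unset numeric fields fall back to their defaults
    /// // (batch_size = 100, batch_timeout = 5s, retry_delay = 100ms).
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017")
    ///     .database("mydb")
    ///     .collections(vec!["users".to_string()])
    ///     .build()?;
    /// assert_eq!(config.batch_size, 100);
    ///
    /// // Out-of-range values are rejected at build time.
    /// assert!(PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017")
    ///     .database("mydb")
    ///     .batch_size(20_000)
    ///     .build()
    ///     .is_err());
    /// # Ok(())
    /// # }
    /// ```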
    pub fn build(self) -> Result<PipelineConfig, ConfigError> {
        let mongodb_uri = self.mongodb_uri.ok_or(ConfigError::MissingMongoUri)?;
        let database = self.database.ok_or(ConfigError::MissingDatabase)?;

        // Validate batch size
        let batch_size = match self.batch_size {
            0 => 100, // Default
            size if size > 10_000 => {
                return Err(ConfigError::InvalidBatchSize {
                    value: size,
                    reason: "batch_size exceeds maximum (10,000)",
                })
            }
            size => size,
        };

        // Validate batch timeout
        let batch_timeout = if self.batch_timeout.is_zero() {
            Duration::from_secs(5) // Default
        } else {
            self.batch_timeout
        };

        // Set retry delays with defaults
        let retry_delay = if self.retry_delay.is_zero() {
            Duration::from_millis(100)
        } else {
            self.retry_delay
        };

        let max_retry_delay = if self.max_retry_delay.is_zero() {
            Duration::from_secs(30)
        } else {
            self.max_retry_delay
        };

        // Cross-field validation: retry_delay must not exceed max_retry_delay
        if retry_delay > max_retry_delay {
            return Err(ConfigError::RetryDelayExceedsMax {
                retry_delay,
                max_retry_delay,
            });
        }

        // Validate channel buffer size
        let channel_buffer_size = match self.channel_buffer_size {
            0 => 1000, // Default
            size if size < 10 => {
                return Err(ConfigError::InvalidChannelBufferSize {
                    value: size,
                    reason: "channel_buffer_size must be at least 10",
                })
            }
            size => size,
        };

        let stream_config = self.stream_config.unwrap_or_else(|| {
            ChangeStreamConfig::builder()
                .build()
                .expect("Default stream config should build")
        });

        Ok(PipelineConfig {
            mongodb_uri,
            database,
            collections: self.collections,
            batch_size,
            batch_timeout,
            max_retries: self.max_retries,
            retry_delay,
            max_retry_delay,
            channel_buffer_size,
            stream_config,
        })
    }
}

/// Pipeline statistics.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total events processed
    pub events_processed: u64,

    /// Total batches written
    pub batches_written: u64,

    /// Total write errors
    pub write_errors: u64,

    /// Total retries
    pub retries: u64,
}

/// Type alias for worker task handles.
type WorkerHandle = JoinHandle<Result<(), PipelineError>>;

/// Pipeline orchestrator that connects MongoDB change streams to destinations.
pub struct Pipeline<S: StateStore, D: Destination> {
    /// Pipeline configuration
    config: PipelineConfig,

    /// State store for resume tokens
    store: Arc<S>,

    /// Destination for events
    destination: Arc<Mutex<D>>,

    /// Shutdown sender (set on start, taken on stop)
    shutdown_tx: Option<broadcast::Sender<()>>,

    /// Worker task handles
    workers: Arc<RwLock<Vec<WorkerHandle>>>,

    /// Pipeline statistics
    stats: Arc<RwLock<PipelineStats>>,

    /// Running flag
    running: Arc<RwLock<bool>>,
}

impl<S: StateStore + Send + Sync + 'static, D: Destination + Send + Sync + 'static> Pipeline<S, D> {
    /// Creates a new pipeline instance.
    ///
    /// # Errors
    ///
    /// Currently infallible; the `Result` is reserved for fallible
    /// initialization (e.g., destination setup) in the future.
    pub async fn new(
        config: PipelineConfig,
        store: S,
        destination: D,
    ) -> Result<Self, PipelineError> {
        info!(
            database = %config.database,
            collections = ?config.collections,
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Creating pipeline"
        );

        Ok(Self {
            config,
            store: Arc::new(store),
            destination: Arc::new(Mutex::new(destination)),
            shutdown_tx: None,
            workers: Arc::new(RwLock::new(Vec::new())),
            stats: Arc::new(RwLock::new(PipelineStats::default())),
            running: Arc::new(RwLock::new(false)),
        })
    }

    /// Starts the pipeline, spawning workers for each collection.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The pipeline is already running
    /// - MongoDB connection fails
    /// - Worker spawn fails
    #[instrument(skip(self), fields(database = %self.config.database))]
    pub async fn start(&mut self) -> Result<(), PipelineError> {
        // Check if already running
        let mut running = self.running.write().await;
        if *running {
            return Err(PipelineError::AlreadyRunning);
        }

        // Early validation: watching all collections is not yet supported
        if self.config.collections.is_empty() {
            return Err(PipelineError::Configuration(
                "Watching all collections is not yet implemented. \
                 Please specify explicit collections in the configuration."
                    .to_string(),
            ));
        }

        info!("Starting pipeline");

        // Create shutdown channel (broadcast so all workers get the signal)
        let (shutdown_tx, _) = broadcast::channel(1);
        self.shutdown_tx = Some(shutdown_tx.clone());

        let collections = self.config.collections.clone();
        let num_collections = collections.len();

        // Spawn worker for each collection
        let mut workers = self.workers.write().await;
        for collection in collections {
            let shutdown_rx = shutdown_tx.subscribe();
            let worker = self
                .spawn_collection_worker(collection, shutdown_rx)
                .await?;

            workers.push(worker);
        }

        *running = true;
        info!(workers = workers.len(), "Pipeline started");

        // Update metrics
        metrics::set_pipeline_status(metrics::PipelineStatus::Running);
        metrics::set_active_collections(num_collections);

        Ok(())
    }

    /// Spawns a worker task for a specific collection.
    async fn spawn_collection_worker(
        &self,
        collection: String,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<WorkerHandle, PipelineError> {
        let config = self.config.clone();
        let store = Arc::clone(&self.store);
        let destination = Arc::clone(&self.destination);
        let stats = Arc::clone(&self.stats);

        let collection_name = if collection.is_empty() {
            "all".to_string()
        } else {
            collection
        };

        let handle = tokio::spawn(async move {
            Self::collection_worker(
                collection_name,
                config,
                store,
                destination,
                stats,
                shutdown_rx,
            )
            .await
        });

        Ok(handle)
    }

    /// Worker task that processes events for a collection.
    #[allow(clippy::too_many_lines)]
    #[instrument(skip(config, store, destination, stats, shutdown_rx), fields(collection = %collection))]
    async fn collection_worker(
        collection: String,
        config: PipelineConfig,
        store: Arc<S>,
        destination: Arc<Mutex<D>>,
        stats: Arc<RwLock<PipelineStats>>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<(), PipelineError> {
        info!("Starting collection worker");

        // Get resume token from state store
        let resume_token = store
            .get_resume_token(&collection)
            .await
            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

        if let Some(ref token) = resume_token {
            info!(?token, "Resuming from saved token");
        }

        // Connect to MongoDB
        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
            .await
            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;

        let db = client.database(&config.database);

        // Get the collection to watch
        let mongo_collection = if !collection.is_empty() && collection != "all" {
            db.collection(&collection)
        } else {
            // Watching an entire database is not yet supported; `start` also
            // rejects empty collection lists before workers are spawned.
            // TODO: Support watching the entire database
            return Err(PipelineError::Configuration(
                "Watching all collections not yet implemented".to_string(),
            ));
        };

        // Create resume token callback that saves to state store
        let store_clone = Arc::clone(&store);
        let collection_clone = collection.clone();
        let resume_token_callback = move |token: Document| {
            let store = Arc::clone(&store_clone);
            let coll = collection_clone.clone();
            Box::pin(async move {
                store
                    .save_resume_token(&coll, &token)
                    .await
                    .map_err(|e| e.to_string())
            }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
        };

        // Create change stream listener
        let mut listener = ChangeStreamListener::new(
            mongo_collection,
            config.stream_config.clone(),
            resume_token_callback,
        )
        .await
        .map_err(|e| PipelineError::ChangeStream(e.to_string()))?;

        // Event batch accumulator
        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
        let mut last_resume_token: Option<Document> = None;

        // Batch timeout interval
        let mut batch_timer = interval(config.batch_timeout);
        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
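        // `Skip` avoids a burst of catch-up ticks after a slow flush: if a
        // tick is missed while flushing, the timer simply waits for the next
        // scheduled tick instead of firing immediately.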

        info!(
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Worker event loop started"
        );

        loop {
            tokio::select! {
                // Check for shutdown signal
                _ = shutdown_rx.recv() => {
                    info!("Received shutdown signal");

                    // Flush pending batch
                    if !batch.is_empty() {
                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
                        if let Err(e) = Self::flush_batch(
                            &collection,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on shutdown");
                        }
                    }

                    info!("Worker shutting down gracefully");
                    break;
                }

                // Batch timeout - flush accumulated events
                _ = batch_timer.tick() => {
                    if !batch.is_empty() {
                        debug!(batch_size = batch.len(), "Batch timeout - flushing");

                        if let Err(e) = Self::flush_batch(
                            &collection,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on timeout");
                            // Continue processing - don't break the loop
                        }
                    }
                }

                // Read next event from change stream
                event_result = listener.next() => {
                    match event_result {
                        Some(Ok(ackable_event)) => {
                            // Extract the change event
                            let event = ackable_event.event.clone();

                            debug!(
                                operation = ?event.operation,
                                collection = %event.namespace.collection,
                                "Received event"
                            );

                            // Store resume token
                            last_resume_token = Some(event.resume_token.clone());

                            // Acknowledge the event; the listener's callback
                            // persists the resume token to the state store
                            ackable_event.ack();

                            // Add to batch (no extra clone; `event` is not used again)
                            batch.push(event);

                            // Update metrics
                            metrics::increment_batch_queue_size(&collection);

                            // Check if batch is full
                            if batch.len() >= config.batch_size {
                                debug!(batch_size = batch.len(), "Batch full - flushing");

                                if let Err(e) = Self::flush_batch(
                                    &collection,
                                    &mut batch,
                                    last_resume_token.as_ref(),
                                    &destination,
                                    &store,
                                    &stats,
                                    &config,
                                )
                                .await
                                {
                                    error!(?e, "Failed to flush full batch");
                                    // Continue processing
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!(?e, "Error reading from change stream");
                            // Back off briefly before polling the stream again
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        None => {
                            // Stream ended - shouldn't happen with MongoDB change streams
                            warn!("Change stream ended unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Flushes a batch of events to the destination with retry logic.
    #[instrument(skip(batch, last_resume_token, destination, store, stats, config), fields(collection = %collection, batch_size = batch.len()))]
    async fn flush_batch(
        collection: &str,
        batch: &mut Vec<ChangeEvent>,
        last_resume_token: Option<&Document>,
        destination: &Arc<Mutex<D>>,
        store: &Arc<S>,
        stats: &Arc<RwLock<PipelineStats>>,
        config: &PipelineConfig,
    ) -> Result<(), PipelineError> {
        if batch.is_empty() {
            return Ok(());
        }

        let batch_size = batch.len();
        let start_time = Instant::now();

        debug!("Flushing batch to destination");

        // Record batch size metric
        metrics::record_batch_size(batch_size, collection);

        // Write to destination with retry
        Self::write_with_retry(batch, destination, config, stats).await?;

        let elapsed = start_time.elapsed();
        info!(
            batch_size,
            elapsed_ms = elapsed.as_millis(),
            "Batch written successfully"
        );

        // Record batch duration metric
        metrics::record_batch_duration(elapsed.as_secs_f64(), collection);

        // Save resume token after successful write
        if let Some(token) = last_resume_token {
            store
                .save_resume_token(collection, token)
                .await
                .map_err(|e| PipelineError::StateStore(e.to_string()))?;

            debug!("Resume token saved");
        }

        // Count processed events (bulk increment by operation type)
        let mut operation_counts = std::collections::HashMap::new();
        for event in batch.iter() {
            *operation_counts.entry(&event.operation).or_insert(0u64) += 1;
        }
        for (operation, count) in operation_counts {
            metrics::increment_events_processed_by(count, collection, operation.as_str());
        }

        // Update statistics
        let mut s = stats.write().await;
        s.events_processed += batch_size as u64;
        s.batches_written += 1;

        // Update queue size metric
        metrics::decrement_batch_queue_size(batch_size, collection);

        // Clear batch
        batch.clear();

        Ok(())
    }

    /// Writes a batch to the destination with exponential backoff retry.
    #[instrument(skip(batch, destination, config, stats), fields(batch_size = batch.len()))]
    async fn write_with_retry(
        batch: &[ChangeEvent],
        destination: &Arc<Mutex<D>>,
        config: &PipelineConfig,
        stats: &Arc<RwLock<PipelineStats>>,
    ) -> Result<(), PipelineError> {
        let mut retry_delay = config.retry_delay;
        let mut attempt = 0;

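        // Attempt loop: the first pass is the initial write; each retry sleeps
        // `retry_delay` and then doubles it, capped at `max_retry_delay`. With
        // the defaults (100ms initial, 30s cap) the waits are 100ms, 200ms,
        // 400ms, ... up to 30s.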
        loop {
            let result = {
                let mut dest = destination.lock().await;
                match dest.write_batch(batch).await {
                    Ok(()) => dest.flush().await,
                    Err(e) => Err(e),
                }
            };

            match result {
                Ok(()) => {
                    if attempt > 0 {
                        info!(attempts = attempt + 1, "Write succeeded after retries");
                    }
                    return Ok(());
                }
                Err(e) => {
                    // Increment write error counter for each failed attempt
                    {
                        let mut s = stats.write().await;
                        s.write_errors += 1;
                    }

                    // Record error type metric
                    let error_category = Self::categorize_error(&e);
                    let destination_type = {
                        let dest = destination.lock().await;
                        dest.metadata().destination_type.clone()
                    };
                    metrics::increment_destination_errors(&destination_type, error_category);

                    attempt += 1;

                    if attempt > config.max_retries {
                        error!(attempts = attempt, ?e, "Write failed after max retries");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    // Check if error is retryable
                    if !Self::is_retryable_error(&e) {
                        error!(?e, "Non-retryable error encountered");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    // Increment retry counter (only for actual retries, not initial attempt)
                    {
                        let mut s = stats.write().await;
                        s.retries += 1;
                    }

                    // Record retry metric
                    metrics::increment_retries(error_category);

                    warn!(
                        attempt,
                        max_retries = config.max_retries,
                        retry_delay_ms = retry_delay.as_millis(),
                        ?e,
                        "Write failed, retrying"
                    );

                    // Wait before retry
                    tokio::time::sleep(retry_delay).await;

                    // Exponential backoff with cap
                    retry_delay = std::cmp::min(retry_delay * 2, config.max_retry_delay);
                }
            }
        }
    }

    /// Checks if a destination error is retryable.
    fn is_retryable_error(error: &DestinationError) -> bool {
        // Heuristic string match on the error message; `categorize_error`
        // below uses the same approach for metrics labels.
        let message = error.to_string();
        message.contains("retryable") || message.contains("timeout")
    }

    /// Categorizes an error for metrics labeling.
    ///
    /// Maps errors to a small set of categories to avoid cardinality explosion.
    fn categorize_error(error: &DestinationError) -> metrics::ErrorCategory {
        let error_str = error.to_string().to_lowercase();

        if error_str.contains("timeout") {
            metrics::ErrorCategory::Timeout
        } else if error_str.contains("connection") || error_str.contains("network") {
            metrics::ErrorCategory::Connection
        } else if error_str.contains("serialization") || error_str.contains("encode") {
            metrics::ErrorCategory::Serialization
        } else if error_str.contains("permission") || error_str.contains("auth") {
            metrics::ErrorCategory::Permission
        } else if error_str.contains("validation") {
            metrics::ErrorCategory::Validation
        } else if error_str.contains("not found") || error_str.contains("404") {
            metrics::ErrorCategory::NotFound
        } else if error_str.contains("rate limit") || error_str.contains("throttle") {
            metrics::ErrorCategory::RateLimit
        } else {
            metrics::ErrorCategory::Unknown
        }
    }

    /// Stops the pipeline gracefully.
    ///
    /// This will:
    /// 1. Send the shutdown signal to all workers
    /// 2. Wait for workers to finish processing
    /// 3. Flush any pending batches
    /// 4. Close the destination connection
    ///
    /// # Errors
    ///
    /// Returns an error if flushing or closing the destination fails.
    /// Worker errors and panics are logged rather than returned.
    #[instrument(skip(self))]
    pub async fn stop(&mut self) -> Result<(), PipelineError> {
        info!("Stopping pipeline");

        let mut running = self.running.write().await;
        if !*running {
            warn!("Pipeline is not running");
            return Ok(());
        }

        // Send shutdown signal (broadcast to all workers)
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }

        // Wait for all workers to finish
        let mut workers = self.workers.write().await;
        for worker in workers.drain(..) {
            match worker.await {
                Ok(Ok(())) => {
                    debug!("Worker stopped successfully");
                }
                Ok(Err(e)) => {
                    error!(?e, "Worker stopped with error");
                }
                Err(e) => {
                    error!(?e, "Worker panicked");
                }
            }
        }

        // Flush and close destination
        let mut dest = self.destination.lock().await;
        dest.flush()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;
        dest.close()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;

        *running = false;

        // Update metrics
        metrics::set_pipeline_status(metrics::PipelineStatus::Stopped);
        metrics::set_active_collections(0);

        // Log final statistics
        let stats = self.stats.read().await;
        info!(
            events_processed = stats.events_processed,
            batches_written = stats.batches_written,
            write_errors = stats.write_errors,
            retries = stats.retries,
            "Pipeline stopped"
        );

        Ok(())
    }

    /// Returns the current pipeline statistics.
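    ///
    /// # Examples
    ///
    /// A sketch of reading the counters (assumes `pipeline` is a running
    /// `Pipeline` instance):
    ///
    /// ```rust,ignore
    /// let stats = pipeline.stats().await;
    /// println!(
    ///     "processed {} events in {} batches ({} write errors, {} retries)",
    ///     stats.events_processed, stats.batches_written,
    ///     stats.write_errors, stats.retries,
    /// );
    /// ```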
    #[must_use]
    pub async fn stats(&self) -> PipelineStats {
        self.stats.read().await.clone()
    }

    /// Checks if the pipeline is currently running.
    #[must_use]
    pub async fn is_running(&self) -> bool {
        *self.running.read().await
    }
}

/// Pipeline configuration errors.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// Missing required MongoDB URI
    #[error("mongodb_uri is required")]
    MissingMongoUri,

    /// Missing required database name
    #[error("database is required")]
    MissingDatabase,

    /// Invalid batch size
    #[error("Invalid batch_size: {value} ({reason})")]
    InvalidBatchSize { value: usize, reason: &'static str },

    /// Invalid batch timeout
    #[error("Invalid batch_timeout: {reason}")]
    InvalidBatchTimeout { reason: &'static str },

    /// Retry delay exceeds maximum
    #[error("retry_delay ({retry_delay:?}) exceeds max_retry_delay ({max_retry_delay:?})")]
    RetryDelayExceedsMax {
        retry_delay: Duration,
        max_retry_delay: Duration,
    },

    /// Invalid channel buffer size
    #[error("Invalid channel_buffer_size: {value} ({reason})")]
    InvalidChannelBufferSize { value: usize, reason: &'static str },
}

/// Pipeline errors.
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
    /// Pipeline is already running
    #[error("Pipeline is already running")]
    AlreadyRunning,

    /// MongoDB connection error
    #[error("MongoDB error: {0}")]
    MongoDB(String),

    /// Change stream error
    #[error("Change stream error: {0}")]
    ChangeStream(String),

    /// Destination error
    #[error("Destination error: {0}")]
    Destination(String),

    /// State store error
    #[error("State store error: {0}")]
    StateStore(String),

    /// Configuration error
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// Other errors
    #[error("Pipeline error: {0}")]
    Other(String),
}