rigatoni_core/
pipeline.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Pipeline orchestration for MongoDB change streams to destinations.
18//!
19//! The [`Pipeline`] is the core orchestrator that connects MongoDB change streams
20//! to destinations. It handles:
21//!
22//! - **Batching**: Accumulate events and flush based on size or timeout
23//! - **Retry Logic**: Exponential backoff for failed destination writes
24//! - **State Management**: Persist resume tokens after successful writes
25//! - **Back-pressure**: Slow down MongoDB reads if destination is slow
26//! - **Graceful Shutdown**: Flush pending batches and save state
27//! - **Observability**: Structured logging and metrics
28//!
29//! # Example
30//!
31//! ```rust,no_run
32//! use rigatoni_core::pipeline::{Pipeline, PipelineConfig};
33//! use rigatoni_core::stream::ChangeStreamConfig;
34//! use std::time::Duration;
35//!
36//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
37//! // Configure pipeline
38//! let config = PipelineConfig::builder()
39//!     .mongodb_uri("mongodb://localhost:27017")
40//!     .database("mydb")
41//!     .collections(vec!["users".to_string(), "orders".to_string()])
42//!     .batch_size(100)
43//!     .batch_timeout(Duration::from_secs(5))
44//!     .max_retries(3)
45//!     .build()?;
46//!
47//! // Create pipeline with state store and destination
48//! // let pipeline = Pipeline::new(config, store, destination).await?;
49//!
50//! // Start processing
51//! // pipeline.start().await?;
52//!
53//! // Graceful shutdown
54//! // pipeline.stop().await?;
55//! # Ok(())
56//! # }
57//! ```
58
59use crate::destination::{Destination, DestinationError};
60use crate::event::ChangeEvent;
61use crate::metrics;
62use crate::state::StateStore;
63use crate::stream::{ChangeStreamConfig, ChangeStreamListener};
64use crate::watch_level::WatchLevel;
65use futures::StreamExt;
66use mongodb::bson::Document;
67use std::future::Future;
68use std::pin::Pin;
69use std::sync::Arc;
70use std::time::Duration;
71use tokio::sync::{broadcast, Mutex, RwLock};
72use tokio::task::JoinHandle;
73use tokio::time::{interval, Instant};
74use tracing::{debug, error, info, instrument, warn};
75
76/// Configuration for distributed locking in multi-instance deployments.
77///
78/// Distributed locking ensures that only one pipeline instance processes
79/// events for a given collection at any time, preventing duplicate processing.
80///
81/// # How It Works
82///
83/// - Each collection is protected by a distributed lock (stored in Redis)
84/// - Only the instance holding the lock processes events for that collection
85/// - If an instance crashes, its locks expire after TTL
86/// - Other instances automatically take over expired locks
87///
88/// # Example
89///
90/// ```rust
91/// use rigatoni_core::pipeline::DistributedLockConfig;
92/// use std::time::Duration;
93///
94/// let lock_config = DistributedLockConfig {
95///     enabled: true,
96///     ttl: Duration::from_secs(30),
97///     refresh_interval: Duration::from_secs(10),
98///     retry_interval: Duration::from_secs(5),
99/// };
100/// ```
#[derive(Debug, Clone)]
pub struct DistributedLockConfig {
    /// Enable distributed locking (requires StateStore with lock support).
    ///
    /// When enabled (default), the pipeline will acquire locks before starting
    /// workers. This is required for multi-instance deployments to prevent
    /// duplicate event processing.
    ///
    /// Disable only for:
    /// - Single-instance deployments
    /// - Development/testing scenarios
    /// - When using a StateStore without lock support
    pub enabled: bool,

    /// Lock time-to-live (expiry time).
    ///
    /// If an instance crashes without releasing its locks, the locks will
    /// expire after this duration, allowing other instances to take over.
    ///
    /// **Tradeoff**:
    /// - Short TTL (10s): Fast failover, but needs frequent refresh
    /// - Long TTL (60s): Less refresh overhead, but slow failover
    ///
    /// **Recommended**: 30 seconds (default)
    pub ttl: Duration,

    /// How often to refresh the lock (heartbeat interval).
    ///
    /// The lock owner must refresh the lock periodically to prevent expiry.
    /// This interval should be significantly less than the TTL.
    ///
    /// **Rule of thumb**: refresh_interval < ttl / 2
    ///
    /// **Recommended**: TTL / 3 (e.g., 10s for 30s TTL) (default)
    ///
    /// Note: `DistributedLockConfigBuilder::build` enforces
    /// `refresh_interval < ttl`.
    pub refresh_interval: Duration,

    /// How long to wait before retrying lock acquisition.
    ///
    /// If a lock is held by another instance, this instance will wait
    /// this duration before attempting to acquire the lock again.
    ///
    /// Note: this field is not validated by the builder; any non-negative
    /// duration is accepted.
    ///
    /// **Recommended**: 5-10 seconds (default: 5s)
    pub retry_interval: Duration,
}
145
146impl Default for DistributedLockConfig {
147    fn default() -> Self {
148        Self {
149            enabled: true, // Enabled by default for safety (prevents duplicates)
150            ttl: Duration::from_secs(30),
151            refresh_interval: Duration::from_secs(10),
152            retry_interval: Duration::from_secs(5),
153        }
154    }
155}
156
157impl DistributedLockConfig {
158    /// Creates a new builder for `DistributedLockConfig`.
159    #[must_use]
160    pub fn builder() -> DistributedLockConfigBuilder {
161        DistributedLockConfigBuilder::default()
162    }
163
164    /// Creates a disabled lock configuration.
165    ///
166    /// Use this for single-instance deployments where locking is not needed.
167    #[must_use]
168    pub fn disabled() -> Self {
169        Self {
170            enabled: false,
171            ..Default::default()
172        }
173    }
174}
175
/// Builder for `DistributedLockConfig`.
///
/// Each field is `None` until explicitly set; `build()` substitutes the
/// documented defaults for any field left unset.
#[derive(Debug, Default)]
pub struct DistributedLockConfigBuilder {
    enabled: Option<bool>,
    ttl: Option<Duration>,
    refresh_interval: Option<Duration>,
    retry_interval: Option<Duration>,
}
184
185impl DistributedLockConfigBuilder {
186    /// Sets whether distributed locking is enabled.
187    #[must_use]
188    pub fn enabled(mut self, enabled: bool) -> Self {
189        self.enabled = Some(enabled);
190        self
191    }
192
193    /// Sets the lock TTL.
194    #[must_use]
195    pub fn ttl(mut self, ttl: Duration) -> Self {
196        self.ttl = Some(ttl);
197        self
198    }
199
200    /// Sets the lock refresh interval.
201    #[must_use]
202    pub fn refresh_interval(mut self, interval: Duration) -> Self {
203        self.refresh_interval = Some(interval);
204        self
205    }
206
207    /// Sets the lock retry interval.
208    #[must_use]
209    pub fn retry_interval(mut self, interval: Duration) -> Self {
210        self.retry_interval = Some(interval);
211        self
212    }
213
214    /// Builds the `DistributedLockConfig`.
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if:
219    /// - `refresh_interval` >= `ttl` (lock would expire before refresh)
220    pub fn build(self) -> Result<DistributedLockConfig, ConfigError> {
221        let ttl = self.ttl.unwrap_or(Duration::from_secs(30));
222        let refresh_interval = self.refresh_interval.unwrap_or(Duration::from_secs(10));
223
224        // Validate: refresh_interval must be less than ttl
225        if refresh_interval >= ttl {
226            return Err(ConfigError::InvalidLockConfig {
227                reason: format!(
228                    "refresh_interval ({:?}) must be less than ttl ({:?})",
229                    refresh_interval, ttl
230                ),
231            });
232        }
233
234        Ok(DistributedLockConfig {
235            enabled: self.enabled.unwrap_or(true),
236            ttl,
237            refresh_interval,
238            retry_interval: self.retry_interval.unwrap_or(Duration::from_secs(5)),
239        })
240    }
241}
242
/// Configuration for the pipeline orchestrator.
///
/// Construct via [`PipelineConfig::builder`], which applies defaults and
/// performs cross-field validation.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// MongoDB connection URI
    pub mongodb_uri: String,

    /// Database name to watch
    pub database: String,

    /// Scope of collections to watch.
    ///
    /// Determines whether to watch specific collections, the entire database,
    /// or all databases in the deployment (cluster-wide).
    ///
    /// Default: [`WatchLevel::Database`] - watches all collections in the database.
    pub watch_level: WatchLevel,

    /// Maximum number of events to batch before flushing
    pub batch_size: usize,

    /// Maximum time to wait before flushing a batch
    pub batch_timeout: Duration,

    /// Maximum number of retry attempts for failed writes
    pub max_retries: usize,

    /// Initial retry delay (doubles with each retry)
    pub retry_delay: Duration,

    /// Maximum retry delay
    pub max_retry_delay: Duration,

    /// Channel buffer size for back-pressure
    pub channel_buffer_size: usize,

    /// Change stream configuration
    pub stream_config: ChangeStreamConfig,

    /// Distributed locking configuration for multi-instance deployments.
    ///
    /// When running multiple pipeline instances watching the same collections,
    /// distributed locking ensures only one instance processes events at a time.
    ///
    /// Default: Enabled with 30s TTL, 10s refresh, 5s retry
    pub distributed_lock: DistributedLockConfig,
}
289
290impl PipelineConfig {
291    /// Creates a new builder for `PipelineConfig`.
292    #[must_use]
293    pub fn builder() -> PipelineConfigBuilder {
294        PipelineConfigBuilder::default()
295    }
296}
297
/// Builder for `PipelineConfig`.
///
/// Numeric and `Duration` fields use zero as an "unset" sentinel (they are
/// `Default`-initialized); `build()` replaces zero values with defaults.
/// `Option` fields are `None` until set.
#[derive(Debug, Default)]
pub struct PipelineConfigBuilder {
    mongodb_uri: Option<String>,
    database: Option<String>,
    watch_level: Option<WatchLevel>,
    batch_size: usize,
    batch_timeout: Duration,
    max_retries: usize,
    retry_delay: Duration,
    max_retry_delay: Duration,
    channel_buffer_size: usize,
    stream_config: Option<ChangeStreamConfig>,
    distributed_lock: Option<DistributedLockConfig>,
}
313
impl PipelineConfigBuilder {
    /// Sets the MongoDB URI.
    #[must_use]
    pub fn mongodb_uri(mut self, uri: impl Into<String>) -> Self {
        self.mongodb_uri = Some(uri.into());
        self
    }

    /// Sets the database name.
    #[must_use]
    pub fn database(mut self, database: impl Into<String>) -> Self {
        self.database = Some(database.into());
        self
    }

    /// Sets the collections to watch.
    ///
    /// # Deprecated
    ///
    /// This method is deprecated in favor of [`watch_collections`](Self::watch_collections).
    /// It will continue to work but may be removed in a future release.
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// // Deprecated usage:
    /// #[allow(deprecated)]
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017")
    ///     .database("mydb")
    ///     .collections(vec!["users".to_string()])
    ///     .build();
    ///
    /// // Recommended usage:
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017")
    ///     .database("mydb")
    ///     .watch_collections(vec!["users".to_string()])
    ///     .build();
    /// ```
    #[must_use]
    #[deprecated(since = "0.2.0", note = "Use watch_collections() instead")]
    pub fn collections(mut self, collections: Vec<String>) -> Self {
        // Identical behavior to watch_collections(); kept for backward
        // compatibility.
        self.watch_level = Some(WatchLevel::Collection(collections));
        self
    }

    /// Watch specific collections only.
    ///
    /// This creates a separate change stream worker for each collection,
    /// enabling parallel processing. Use this when you know exactly which
    /// collections to monitor.
    ///
    /// # Arguments
    ///
    /// * `collections` - List of collection names to watch
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    ///     .database("mydb")
    ///     .watch_collections(vec!["users".to_string(), "orders".to_string()])
    ///     .build();
    /// ```
    #[must_use]
    pub fn watch_collections(mut self, collections: Vec<String>) -> Self {
        self.watch_level = Some(WatchLevel::Collection(collections));
        self
    }

    /// Watch all collections in the database.
    ///
    /// This creates a single change stream that monitors all collections
    /// in the database. New collections are automatically included as
    /// they are created.
    ///
    /// This is the recommended mode for most use cases, especially when:
    /// - Collections are added/removed dynamically
    /// - You want to capture all changes without maintaining a collection list
    /// - The database has fewer than ~50 collections
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    ///     .database("mydb")
    ///     .watch_database()  // Watch all collections
    ///     .build();
    /// ```
    #[must_use]
    pub fn watch_database(mut self) -> Self {
        self.watch_level = Some(WatchLevel::Database);
        self
    }

    /// Watch all databases in the deployment (cluster-wide).
    ///
    /// This creates a single change stream that monitors all databases
    /// and collections in the MongoDB deployment. Use with caution as
    /// this can generate very high event volume.
    ///
    /// # Requirements
    ///
    /// - MongoDB 4.0+
    /// - Appropriate cluster-wide read permissions
    ///
    /// # Use Cases
    ///
    /// - Audit logging across entire cluster
    /// - Multi-tenant setups with database-per-tenant
    /// - Compliance and monitoring
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    ///     .database("mydb")  // Still needed for state storage keys
    ///     .watch_deployment()  // Watch all databases
    ///     .build();
    /// ```
    #[must_use]
    pub fn watch_deployment(mut self) -> Self {
        self.watch_level = Some(WatchLevel::Deployment);
        self
    }

    /// Sets the batch size.
    ///
    /// Zero means "use the default" (100); values above 10,000 are
    /// rejected by `build()`.
    #[must_use]
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the batch timeout.
    ///
    /// A zero duration means "use the default" (5s).
    #[must_use]
    pub fn batch_timeout(mut self, timeout: Duration) -> Self {
        self.batch_timeout = timeout;
        self
    }

    /// Sets the maximum number of retries.
    #[must_use]
    pub fn max_retries(mut self, retries: usize) -> Self {
        self.max_retries = retries;
        self
    }

    /// Sets the initial retry delay.
    ///
    /// A zero duration means "use the default" (100ms).
    #[must_use]
    pub fn retry_delay(mut self, delay: Duration) -> Self {
        self.retry_delay = delay;
        self
    }

    /// Sets the maximum retry delay.
    ///
    /// A zero duration means "use the default" (30s).
    #[must_use]
    pub fn max_retry_delay(mut self, delay: Duration) -> Self {
        self.max_retry_delay = delay;
        self
    }

    /// Sets the channel buffer size.
    ///
    /// Zero means "use the default" (1000); values below 10 are rejected
    /// by `build()`.
    #[must_use]
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = size;
        self
    }

    /// Sets the change stream configuration.
    #[must_use]
    pub fn stream_config(mut self, config: ChangeStreamConfig) -> Self {
        self.stream_config = Some(config);
        self
    }

    /// Sets the distributed locking configuration.
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::{PipelineConfig, DistributedLockConfig};
    /// use std::time::Duration;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017")
    ///     .database("mydb")
    ///     .distributed_lock(DistributedLockConfig {
    ///         enabled: true,
    ///         ttl: Duration::from_secs(30),
    ///         refresh_interval: Duration::from_secs(10),
    ///         retry_interval: Duration::from_secs(5),
    ///     })
    ///     .build()
    ///     .unwrap();
    /// ```
    #[must_use]
    pub fn distributed_lock(mut self, config: DistributedLockConfig) -> Self {
        self.distributed_lock = Some(config);
        self
    }

    /// Disables distributed locking.
    ///
    /// Use this for single-instance deployments where locking is not needed.
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017")
    ///     .database("mydb")
    ///     .disable_distributed_lock()
    ///     .build()
    ///     .unwrap();
    /// ```
    #[must_use]
    pub fn disable_distributed_lock(mut self) -> Self {
        self.distributed_lock = Some(DistributedLockConfig::disabled());
        self
    }

    /// Builds the `PipelineConfig`.
    ///
    /// Fields left unset (or set to zero, for numeric/`Duration` fields)
    /// fall back to defaults: batch_size 100, batch_timeout 5s,
    /// retry_delay 100ms, max_retry_delay 30s, channel_buffer_size 1000.
    ///
    /// # Errors
    ///
    /// Returns an error if required fields are missing or invalid.
    pub fn build(self) -> Result<PipelineConfig, ConfigError> {
        // URI and database are the only hard-required fields.
        let mongodb_uri = self.mongodb_uri.ok_or(ConfigError::MissingMongoUri)?;
        let database = self.database.ok_or(ConfigError::MissingDatabase)?;

        // Use default watch level (Database) if not specified
        let watch_level = self.watch_level.unwrap_or_default();

        // Validate batch size
        let batch_size = match self.batch_size {
            0 => 100, // Default
            size if size > 10_000 => {
                return Err(ConfigError::InvalidBatchSize {
                    value: size,
                    reason: "batch_size exceeds maximum (10,000)",
                })
            }
            size => size,
        };

        // Validate batch timeout
        let batch_timeout = if self.batch_timeout.is_zero() {
            Duration::from_secs(5) // Default
        } else {
            self.batch_timeout
        };

        // Set retry delays with defaults
        let retry_delay = if self.retry_delay.is_zero() {
            Duration::from_millis(100)
        } else {
            self.retry_delay
        };

        let max_retry_delay = if self.max_retry_delay.is_zero() {
            Duration::from_secs(30)
        } else {
            self.max_retry_delay
        };

        // Cross-field validation: retry_delay must not exceed max_retry_delay
        if retry_delay > max_retry_delay {
            return Err(ConfigError::RetryDelayExceedsMax {
                retry_delay,
                max_retry_delay,
            });
        }

        // Validate channel buffer size
        let channel_buffer_size = match self.channel_buffer_size {
            0 => 1000, // Default
            size if size < 10 => {
                return Err(ConfigError::InvalidChannelBufferSize {
                    value: size,
                    reason: "channel_buffer_size must be at least 10",
                })
            }
            size => size,
        };

        let stream_config = self.stream_config.unwrap_or_else(|| {
            ChangeStreamConfig::builder()
                .build()
                .expect("Default stream config should build")
        });

        let distributed_lock = self.distributed_lock.unwrap_or_default();

        Ok(PipelineConfig {
            mongodb_uri,
            database,
            watch_level,
            batch_size,
            batch_timeout,
            // No sentinel/default applied: an unset builder leaves this 0.
            max_retries: self.max_retries,
            retry_delay,
            max_retry_delay,
            channel_buffer_size,
            stream_config,
            distributed_lock,
        })
    }
}
636
/// Pipeline statistics.
///
/// Counters are initialized to zero when the pipeline is created.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total events processed
    pub events_processed: u64,

    /// Total batches written
    pub batches_written: u64,

    /// Total write errors
    pub write_errors: u64,

    /// Total retries
    pub retries: u64,
}
652
/// Type alias for worker task handles.
///
/// Workers can fail, so the join result carries a `PipelineError`.
type WorkerHandle = JoinHandle<Result<(), PipelineError>>;

/// Type alias for lock refresh task handles.
///
/// Refresh tasks return no value (`()`).
type LockRefreshHandle = JoinHandle<()>;
658
/// Pipeline orchestrator that connects MongoDB change streams to destinations.
pub struct Pipeline<S: StateStore, D: Destination> {
    /// Pipeline configuration
    config: PipelineConfig,

    /// State store for resume tokens
    store: Arc<S>,

    /// Destination for events
    destination: Arc<Mutex<D>>,

    /// Shutdown sender (taken when starting)
    shutdown_tx: Option<broadcast::Sender<()>>,

    /// Worker task handles
    workers: Arc<RwLock<Vec<WorkerHandle>>>,

    /// Lock refresh task handles
    lock_refresh_handles: Arc<RwLock<Vec<LockRefreshHandle>>>,

    /// Pipeline statistics
    stats: Arc<RwLock<PipelineStats>>,

    /// Running flag
    running: Arc<RwLock<bool>>,

    /// Unique identifier for this pipeline instance (used for distributed locking).
    ///
    /// Format: `{hostname}-{uuid}`
    owner_id: String,

    /// Lock names currently held by this instance.
    ///
    /// Holds collection names for collection-level watching, or the
    /// sentinel `"__database__"` for database-level watching.
    locked_collections: Arc<RwLock<Vec<String>>>,
}
693
694impl<S: StateStore + Send + Sync + 'static, D: Destination + Send + Sync + 'static> Pipeline<S, D> {
695    /// Creates a new pipeline instance.
696    ///
697    /// # Errors
698    ///
699    /// Returns an error if the destination cannot be initialized.
700    pub async fn new(
701        config: PipelineConfig,
702        store: S,
703        destination: D,
704    ) -> Result<Self, PipelineError> {
705        // Generate unique owner ID for this instance (used for distributed locking)
706        let hostname = hostname::get()
707            .map(|h| h.to_string_lossy().to_string())
708            .unwrap_or_else(|_| "unknown".to_string());
709        let owner_id = format!("{}-{}", hostname, uuid::Uuid::new_v4());
710
711        info!(
712            database = %config.database,
713            watch_level = %config.watch_level,
714            batch_size = config.batch_size,
715            batch_timeout = ?config.batch_timeout,
716            owner_id = %owner_id,
717            distributed_lock_enabled = config.distributed_lock.enabled,
718            "Creating pipeline"
719        );
720
721        Ok(Self {
722            config,
723            store: Arc::new(store),
724            destination: Arc::new(Mutex::new(destination)),
725            shutdown_tx: None,
726            workers: Arc::new(RwLock::new(Vec::new())),
727            lock_refresh_handles: Arc::new(RwLock::new(Vec::new())),
728            stats: Arc::new(RwLock::new(PipelineStats::default())),
729            running: Arc::new(RwLock::new(false)),
730            owner_id,
731            locked_collections: Arc::new(RwLock::new(Vec::new())),
732        })
733    }
734
735    /// Returns the unique owner ID for this pipeline instance.
736    #[must_use]
737    pub fn owner_id(&self) -> &str {
738        &self.owner_id
739    }
740
741    /// Generates a lock key for a given collection.
742    fn lock_key(&self, collection: &str) -> String {
743        format!("rigatoni:lock:{}:{}", self.config.database, collection)
744    }
745
746    /// Generates a lock key for database-level watching.
747    fn database_lock_key(&self) -> String {
748        format!("rigatoni:lock:{}:__database__", self.config.database)
749    }
750
751    /// Generates a lock key for deployment-level watching.
752    fn deployment_lock_key(&self) -> String {
753        "rigatoni:lock:__deployment__".to_string()
754    }
755
756    /// Starts the pipeline, spawning workers based on the configured watch level.
757    ///
758    /// Depending on the [`WatchLevel`], this method will:
759    /// - **Collection**: Spawn one worker per collection (parallel processing)
760    /// - **Database**: Spawn a single worker watching all collections in the database
761    /// - **Deployment**: Spawn a single worker watching all databases in the deployment
762    ///
763    /// When distributed locking is enabled, the pipeline will:
764    /// - Acquire locks before starting workers
765    /// - Start lock refresh background tasks
766    /// - Release locks on shutdown
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if:
771    /// - The pipeline is already running
772    /// - MongoDB connection fails
773    /// - Worker spawn fails
774    /// - Empty collection list is provided with [`WatchLevel::Collection`]
775    #[instrument(skip(self), fields(database = %self.config.database, owner_id = %self.owner_id))]
776    pub async fn start(&mut self) -> Result<(), PipelineError> {
777        // Check if already running
778        let mut running = self.running.write().await;
779        if *running {
780            return Err(PipelineError::AlreadyRunning);
781        }
782
783        info!(
784            watch_level = %self.config.watch_level,
785            distributed_lock_enabled = self.config.distributed_lock.enabled,
786            "Starting pipeline"
787        );
788
789        // Create shutdown channel (broadcast so all workers get the signal)
790        let (shutdown_tx, _) = broadcast::channel(1);
791        self.shutdown_tx = Some(shutdown_tx.clone());
792
793        let mut workers = self.workers.write().await;
794        let mut lock_refresh_handles = self.lock_refresh_handles.write().await;
795        let mut locked_collections = self.locked_collections.write().await;
796        let mut num_workers = 0;
797
798        match &self.config.watch_level {
799            WatchLevel::Collection(collections) => {
800                // Validate that at least one collection is specified
801                if collections.is_empty() {
802                    return Err(PipelineError::Configuration(
803                        "No collections specified. Either provide collection names with \
804                         watch_collections() or use watch_database() to watch all collections."
805                            .to_string(),
806                    ));
807                }
808
809                info!(
810                    collections = ?collections,
811                    "Starting collection-level watching"
812                );
813
814                // Spawn worker for each collection (with optional locking)
815                for collection in collections {
816                    let lock_key = self.lock_key(collection);
817
818                    // Try to acquire lock if distributed locking is enabled
819                    if self.config.distributed_lock.enabled {
820                        let acquired = self
821                            .store
822                            .try_acquire_lock(
823                                &lock_key,
824                                &self.owner_id,
825                                self.config.distributed_lock.ttl,
826                            )
827                            .await
828                            .map_err(|e| PipelineError::StateStore(e.to_string()))?;
829
830                        if !acquired {
831                            info!(
832                                collection = %collection,
833                                "Collection is locked by another instance, skipping"
834                            );
835                            // Record metric
836                            metrics::increment_lock_acquisition_failures(
837                                metrics::LockFailureReason::AlreadyHeld,
838                            );
839                            // Skip this collection - another instance owns it
840                            continue;
841                        }
842
843                        info!(collection = %collection, "Acquired lock for collection");
844                        metrics::increment_lock_acquisitions();
845                        locked_collections.push(collection.clone());
846
847                        // Start lock refresh background task
848                        let refresh_handle = self.start_lock_refresh_task(
849                            lock_key,
850                            collection.clone(),
851                            shutdown_tx.subscribe(),
852                        );
853                        lock_refresh_handles.push(refresh_handle);
854                    }
855
856                    let shutdown_rx = shutdown_tx.subscribe();
857                    let worker = self
858                        .spawn_collection_worker(collection.clone(), shutdown_rx)
859                        .await?;
860
861                    workers.push(worker);
862                    num_workers += 1;
863                }
864
865                // Report only collections we're actually processing
866                metrics::set_active_collections(num_workers);
867            }
868            WatchLevel::Database => {
869                info!(
870                    database = %self.config.database,
871                    "Starting database-level watching"
872                );
873
874                let lock_key = self.database_lock_key();
875
876                // Try to acquire lock if distributed locking is enabled
877                if self.config.distributed_lock.enabled {
878                    let acquired = self
879                        .store
880                        .try_acquire_lock(
881                            &lock_key,
882                            &self.owner_id,
883                            self.config.distributed_lock.ttl,
884                        )
885                        .await
886                        .map_err(|e| PipelineError::StateStore(e.to_string()))?;
887
888                    if !acquired {
889                        info!("Database is locked by another instance, cannot start");
890                        metrics::increment_lock_acquisition_failures(
891                            metrics::LockFailureReason::AlreadyHeld,
892                        );
893                        return Err(PipelineError::Configuration(
894                            "Database is locked by another instance. \
895                             Wait for the lock to expire or use collection-level watching."
896                                .to_string(),
897                        ));
898                    }
899
900                    info!("Acquired lock for database");
901                    metrics::increment_lock_acquisitions();
902                    locked_collections.push("__database__".to_string());
903
904                    // Start lock refresh background task
905                    let refresh_handle = self.start_lock_refresh_task(
906                        lock_key,
907                        "__database__".to_string(),
908                        shutdown_tx.subscribe(),
909                    );
910                    lock_refresh_handles.push(refresh_handle);
911                }
912
913                let shutdown_rx = shutdown_tx.subscribe();
914                let worker = self.spawn_database_worker(shutdown_rx).await?;
915                workers.push(worker);
916                num_workers = 1;
917
918                // For database-level, we report 1 "collection" (the database itself)
919                metrics::set_active_collections(1);
920            }
921            WatchLevel::Deployment => {
922                info!("Starting deployment-level watching (cluster-wide)");
923
924                let lock_key = self.deployment_lock_key();
925
926                // Try to acquire lock if distributed locking is enabled
927                if self.config.distributed_lock.enabled {
928                    let acquired = self
929                        .store
930                        .try_acquire_lock(
931                            &lock_key,
932                            &self.owner_id,
933                            self.config.distributed_lock.ttl,
934                        )
935                        .await
936                        .map_err(|e| PipelineError::StateStore(e.to_string()))?;
937
938                    if !acquired {
939                        info!("Deployment is locked by another instance, cannot start");
940                        metrics::increment_lock_acquisition_failures(
941                            metrics::LockFailureReason::AlreadyHeld,
942                        );
943                        return Err(PipelineError::Configuration(
944                            "Deployment is locked by another instance. \
945                             Wait for the lock to expire or use collection/database-level watching."
946                                .to_string(),
947                        ));
948                    }
949
950                    info!("Acquired lock for deployment");
951                    metrics::increment_lock_acquisitions();
952                    locked_collections.push("__deployment__".to_string());
953
954                    // Start lock refresh background task
955                    let refresh_handle = self.start_lock_refresh_task(
956                        lock_key,
957                        "__deployment__".to_string(),
958                        shutdown_tx.subscribe(),
959                    );
960                    lock_refresh_handles.push(refresh_handle);
961                }
962
963                let shutdown_rx = shutdown_tx.subscribe();
964                let worker = self.spawn_deployment_worker(shutdown_rx).await?;
965                workers.push(worker);
966                num_workers = 1;
967
968                // For deployment-level, we report 1 "collection" (the deployment itself)
969                metrics::set_active_collections(1);
970            }
971        }
972
973        *running = true;
974        info!(
975            workers = num_workers,
976            locks_held = locked_collections.len(),
977            "Pipeline started"
978        );
979
980        // Update metrics
981        metrics::set_pipeline_status(metrics::PipelineStatus::Running);
982        metrics::set_locks_held(locked_collections.len());
983
984        Ok(())
985    }
986
987    /// Starts a background task to refresh a lock periodically.
988    fn start_lock_refresh_task(
989        &self,
990        lock_key: String,
991        collection_name: String,
992        mut shutdown_rx: broadcast::Receiver<()>,
993    ) -> LockRefreshHandle {
994        let store = Arc::clone(&self.store);
995        let owner_id = self.owner_id.clone();
996        let refresh_interval = self.config.distributed_lock.refresh_interval;
997        let ttl = self.config.distributed_lock.ttl;
998
999        tokio::spawn(async move {
1000            let mut interval_timer = interval(refresh_interval);
1001            interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1002
1003            loop {
1004                tokio::select! {
1005                    _ = shutdown_rx.recv() => {
1006                        debug!(
1007                            collection = %collection_name,
1008                            "Lock refresh task received shutdown signal"
1009                        );
1010                        break;
1011                    }
1012                    _ = interval_timer.tick() => {
1013                        match store.refresh_lock(&lock_key, &owner_id, ttl).await {
1014                            Ok(true) => {
1015                                debug!(
1016                                    collection = %collection_name,
1017                                    "Lock refreshed successfully"
1018                                );
1019                                metrics::increment_lock_refreshes();
1020                            }
1021                            Ok(false) => {
1022                                error!(
1023                                    collection = %collection_name,
1024                                    "Lost lock (acquired by another instance or expired)"
1025                                );
1026                                metrics::increment_locks_lost();
1027                                // TODO: Signal worker to stop gracefully
1028                                break;
1029                            }
1030                            Err(e) => {
1031                                warn!(
1032                                    collection = %collection_name,
1033                                    error = %e,
1034                                    "Failed to refresh lock (transient error, will retry)"
1035                                );
1036                                // Continue trying - transient errors shouldn't kill lock
1037                            }
1038                        }
1039                    }
1040                }
1041            }
1042        })
1043    }
1044
1045    /// Spawns a worker task for a specific collection.
1046    async fn spawn_collection_worker(
1047        &self,
1048        collection: String,
1049        shutdown_rx: broadcast::Receiver<()>,
1050    ) -> Result<WorkerHandle, PipelineError> {
1051        let config = self.config.clone();
1052        let store = Arc::clone(&self.store);
1053        let destination = Arc::clone(&self.destination);
1054        let stats = Arc::clone(&self.stats);
1055
1056        let handle = tokio::spawn(async move {
1057            Self::collection_worker(collection, config, store, destination, stats, shutdown_rx)
1058                .await
1059        });
1060
1061        Ok(handle)
1062    }
1063
1064    /// Spawns a worker task for database-level watching.
1065    async fn spawn_database_worker(
1066        &self,
1067        shutdown_rx: broadcast::Receiver<()>,
1068    ) -> Result<WorkerHandle, PipelineError> {
1069        let config = self.config.clone();
1070        let store = Arc::clone(&self.store);
1071        let destination = Arc::clone(&self.destination);
1072        let stats = Arc::clone(&self.stats);
1073
1074        let handle = tokio::spawn(async move {
1075            Self::database_worker(config, store, destination, stats, shutdown_rx).await
1076        });
1077
1078        Ok(handle)
1079    }
1080
1081    /// Spawns a worker task for deployment-level (cluster-wide) watching.
1082    async fn spawn_deployment_worker(
1083        &self,
1084        shutdown_rx: broadcast::Receiver<()>,
1085    ) -> Result<WorkerHandle, PipelineError> {
1086        let config = self.config.clone();
1087        let store = Arc::clone(&self.store);
1088        let destination = Arc::clone(&self.destination);
1089        let stats = Arc::clone(&self.stats);
1090
1091        let handle = tokio::spawn(async move {
1092            Self::deployment_worker(config, store, destination, stats, shutdown_rx).await
1093        });
1094
1095        Ok(handle)
1096    }
1097
1098    /// Worker task that processes events for a specific collection.
1099    #[allow(clippy::too_many_lines)]
1100    #[instrument(skip(config, store, destination, stats, shutdown_rx), fields(collection = %collection))]
1101    async fn collection_worker(
1102        collection: String,
1103        config: PipelineConfig,
1104        store: Arc<S>,
1105        destination: Arc<Mutex<D>>,
1106        stats: Arc<RwLock<PipelineStats>>,
1107        mut shutdown_rx: broadcast::Receiver<()>,
1108    ) -> Result<(), PipelineError> {
1109        info!("Starting collection worker");
1110
1111        // Resume token key for this collection
1112        let resume_token_key = config
1113            .watch_level
1114            .resume_token_key(&config.database, Some(&collection));
1115
1116        // Get resume token from state store
1117        let resume_token = store
1118            .get_resume_token(&resume_token_key)
1119            .await
1120            .map_err(|e| PipelineError::StateStore(e.to_string()))?;
1121
1122        if let Some(ref token) = resume_token {
1123            info!(?token, "Resuming from saved token");
1124        }
1125
1126        // Connect to MongoDB
1127        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
1128            .await
1129            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;
1130
1131        let db = client.database(&config.database);
1132        let mongo_collection = db.collection(&collection);
1133
1134        // Create resume token callback that saves to state store
1135        let store_clone = Arc::clone(&store);
1136        let resume_key = resume_token_key.clone();
1137        let resume_token_callback = move |token: Document| {
1138            let store = Arc::clone(&store_clone);
1139            let key = resume_key.clone();
1140            Box::pin(async move {
1141                store
1142                    .save_resume_token(&key, &token)
1143                    .await
1144                    .map_err(|e| e.to_string())
1145            }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
1146        };
1147
1148        // Create change stream listener
1149        let mut listener = ChangeStreamListener::new(
1150            mongo_collection,
1151            config.stream_config.clone(),
1152            resume_token_callback,
1153        )
1154        .await
1155        .map_err(|e| PipelineError::ChangeStream(e.to_string()))?;
1156
1157        // Event batch accumulator
1158        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
1159        let mut last_resume_token: Option<Document> = None;
1160
1161        // Batch timeout interval
1162        let mut batch_timer = interval(config.batch_timeout);
1163        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1164
1165        info!(
1166            batch_size = config.batch_size,
1167            batch_timeout = ?config.batch_timeout,
1168            "Worker event loop started"
1169        );
1170
1171        loop {
1172            tokio::select! {
1173                // Check for shutdown signal
1174                _ = shutdown_rx.recv() => {
1175                    info!("Received shutdown signal");
1176
1177                    // Flush pending batch
1178                    if !batch.is_empty() {
1179                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
1180                        if let Err(e) = Self::flush_batch(
1181                            &collection,
1182                            &mut batch,
1183                            last_resume_token.as_ref(),
1184                            &destination,
1185                            &store,
1186                            &stats,
1187                            &config,
1188                        )
1189                        .await
1190                        {
1191                            error!(?e, "Failed to flush batch on shutdown");
1192                        }
1193                    }
1194
1195                    info!("Worker shutting down gracefully");
1196                    break;
1197                }
1198
1199                // Batch timeout - flush accumulated events
1200                _ = batch_timer.tick() => {
1201                    if !batch.is_empty() {
1202                        debug!(batch_size = batch.len(), "Batch timeout - flushing");
1203
1204                        if let Err(e) = Self::flush_batch(
1205                            &collection,
1206                            &mut batch,
1207                            last_resume_token.as_ref(),
1208                            &destination,
1209                            &store,
1210                            &stats,
1211                            &config,
1212                        )
1213                        .await
1214                        {
1215                            error!(?e, "Failed to flush batch on timeout");
1216                            // Continue processing - don't break the loop
1217                        }
1218                    }
1219                }
1220
1221                // Read next event from change stream
1222                event_result = listener.next() => {
1223                    match event_result {
1224                        Some(Ok(ackable_event)) => {
1225                            // Extract the change event
1226                            let event = ackable_event.event.clone();
1227
1228                            debug!(
1229                                operation = ?event.operation,
1230                                collection = %event.namespace.collection,
1231                                "Received event"
1232                            );
1233
1234                            // Store resume token
1235                            last_resume_token = Some(event.resume_token.clone());
1236
1237                            // Acknowledge the event (sends resume token to callback)
1238                            ackable_event.ack();
1239
1240                            // Add to batch
1241                            batch.push(event.clone());
1242
1243                            // Update metrics
1244                            metrics::increment_batch_queue_size(&collection);
1245
1246                            // Check if batch is full
1247                            if batch.len() >= config.batch_size {
1248                                debug!(batch_size = batch.len(), "Batch full - flushing");
1249
1250                                if let Err(e) = Self::flush_batch(
1251                                    &collection,
1252                                    &mut batch,
1253                                    last_resume_token.as_ref(),
1254                                    &destination,
1255                                    &store,
1256                                    &stats,
1257                                    &config,
1258                                )
1259                                .await
1260                                {
1261                                    error!(?e, "Failed to flush full batch");
1262                                    // Continue processing
1263                                }
1264                            }
1265                        }
1266                        Some(Err(e)) => {
1267                            error!(?e, "Error reading from change stream");
1268                            // Try to reconnect after a delay
1269                            tokio::time::sleep(Duration::from_secs(1)).await;
1270                        }
1271                        None => {
1272                            // Stream ended - shouldn't happen with MongoDB change streams
1273                            warn!("Change stream ended unexpectedly");
1274                            break;
1275                        }
1276                    }
1277                }
1278            }
1279        }
1280
1281        Ok(())
1282    }
1283
    /// Worker task that processes events for an entire database.
    ///
    /// Uses MongoDB's `db.watch()` API to monitor all collections in the database.
    /// New collections are automatically included as they are created.
    ///
    /// Accumulates events into a batch and flushes via [`Self::flush_batch`]
    /// when the batch fills, the batch timeout fires, or a shutdown signal
    /// arrives (final flush).
    ///
    /// # Errors
    ///
    /// Returns an error if the state store lookup, the MongoDB connection, or
    /// the database change stream cannot be created. Errors inside the event
    /// loop are logged and processing continues.
    // Long by design: the select! event loop keeps tightly-coupled batch state in one place.
    #[allow(clippy::too_many_lines)]
    #[instrument(skip(config, store, destination, stats, shutdown_rx), fields(database = %config.database))]
    async fn database_worker(
        config: PipelineConfig,
        store: Arc<S>,
        destination: Arc<Mutex<D>>,
        stats: Arc<RwLock<PipelineStats>>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<(), PipelineError> {
        info!("Starting database worker");

        // Resume token key for database-level watching (no collection component)
        let resume_token_key = config.watch_level.resume_token_key(&config.database, None);

        // Get resume token from state store
        let resume_token = store
            .get_resume_token(&resume_token_key)
            .await
            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

        if let Some(ref token) = resume_token {
            info!(?token, "Resuming from saved token");
        }

        // Connect to MongoDB
        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
            .await
            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;

        let db = client.database(&config.database);

        // Build change stream options from the pipeline's stream config
        let mut options = mongodb::options::ChangeStreamOptions::default();

        if config.stream_config.full_document_on_update {
            options.full_document = Some(mongodb::options::FullDocumentType::UpdateLookup);
        }

        if config.stream_config.full_document_before_change {
            options.full_document_before_change =
                Some(mongodb::options::FullDocumentBeforeChangeType::WhenAvailable);
        }

        options.batch_size = config.stream_config.batch_size;

        // Set resume token if we have one. The stored Document is round-tripped
        // through BSON bytes into the driver's ResumeToken type; failures at
        // either step silently fall back to starting a fresh stream.
        if let Some(ref token_doc) = resume_token {
            if let Ok(bytes) = bson::to_vec(token_doc) {
                if let Ok(resume_token) =
                    bson::from_slice::<mongodb::change_stream::event::ResumeToken>(&bytes)
                {
                    options.resume_after = Some(resume_token);
                }
            }
        }

        // Create database-level change stream (with the user's aggregation
        // pipeline applied when one is configured)
        let mut stream = if config.stream_config.pipeline.is_empty() {
            db.watch().with_options(options).await
        } else {
            db.watch()
                .pipeline(config.stream_config.pipeline.clone())
                .with_options(options)
                .await
        }
        .map_err(|e| PipelineError::MongoDB(format!("Failed to create database watch: {}", e)))?;

        info!("Database change stream created successfully");

        // Event batch accumulator and the most recent token seen, used by flushes
        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
        let mut last_resume_token: Option<Document> = None;

        // Batch timeout interval; skip missed ticks rather than bursting
        let mut batch_timer = interval(config.batch_timeout);
        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // Label for metrics (database level uses a special name)
        let metrics_label = format!("__db:{}", config.database);

        info!(
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Database worker event loop started"
        );

        loop {
            tokio::select! {
                // Check for shutdown signal
                _ = shutdown_rx.recv() => {
                    info!("Received shutdown signal");

                    // Flush pending batch so no accepted events are dropped
                    if !batch.is_empty() {
                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
                        if let Err(e) = Self::flush_batch(
                            &metrics_label,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on shutdown");
                        }
                    }

                    info!("Database worker shutting down gracefully");
                    break;
                }

                // Batch timeout - flush accumulated events
                _ = batch_timer.tick() => {
                    if !batch.is_empty() {
                        debug!(batch_size = batch.len(), "Batch timeout - flushing");

                        if let Err(e) = Self::flush_batch(
                            &metrics_label,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on timeout");
                            // Continue processing - don't break the loop
                        }
                    }
                }

                // Read next event from database change stream
                event_result = stream.next() => {
                    match event_result {
                        Some(Ok(change_event)) => {
                            // Extract resume token; skip the event if it cannot be serialized
                            let resume_token = match bson::to_document(&change_event.id) {
                                Ok(token) => token,
                                Err(e) => {
                                    error!(?e, "Failed to serialize resume token");
                                    continue;
                                }
                            };

                            // Convert MongoDB event to our ChangeEvent; skip on failure
                            let event = match ChangeEvent::try_from(change_event) {
                                Ok(evt) => evt,
                                Err(e) => {
                                    error!(?e, "Failed to convert change event");
                                    continue;
                                }
                            };

                            debug!(
                                operation = ?event.operation,
                                database = %event.namespace.database,
                                collection = %event.namespace.collection,
                                "Received database event"
                            );

                            // Store resume token
                            last_resume_token = Some(resume_token.clone());

                            // Save resume token to state store.
                            // NOTE(review): this persists the token before the batch is
                            // flushed to the destination — presumably matching the ack
                            // semantics of the collection worker, but verify a crash here
                            // cannot skip unflushed events on resume.
                            if let Err(e) = store.save_resume_token(&resume_token_key, &resume_token).await {
                                warn!(?e, "Failed to save resume token");
                            }

                            // Add to batch
                            batch.push(event);

                            // Update metrics
                            metrics::increment_batch_queue_size(&metrics_label);

                            // Check if batch is full
                            if batch.len() >= config.batch_size {
                                debug!(batch_size = batch.len(), "Batch full - flushing");

                                if let Err(e) = Self::flush_batch(
                                    &metrics_label,
                                    &mut batch,
                                    last_resume_token.as_ref(),
                                    &destination,
                                    &store,
                                    &stats,
                                    &config,
                                )
                                .await
                                {
                                    error!(?e, "Failed to flush full batch");
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!(?e, "Error reading from database change stream");
                            // Try to reconnect after a delay
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        None => {
                            warn!("Database change stream ended unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
1501
1502    /// Worker task that processes events for the entire deployment (cluster-wide).
1503    ///
1504    /// Uses MongoDB's `client.watch()` API to monitor all databases in the deployment.
1505    /// Requires MongoDB 4.0+ and appropriate cluster-wide permissions.
1506    #[allow(clippy::too_many_lines)]
1507    #[instrument(skip(config, store, destination, stats, shutdown_rx))]
1508    async fn deployment_worker(
1509        config: PipelineConfig,
1510        store: Arc<S>,
1511        destination: Arc<Mutex<D>>,
1512        stats: Arc<RwLock<PipelineStats>>,
1513        mut shutdown_rx: broadcast::Receiver<()>,
1514    ) -> Result<(), PipelineError> {
1515        info!("Starting deployment worker (cluster-wide)");
1516
1517        // Resume token key for deployment-level watching
1518        let resume_token_key = config.watch_level.resume_token_key(&config.database, None);
1519
1520        // Get resume token from state store
1521        let resume_token = store
1522            .get_resume_token(&resume_token_key)
1523            .await
1524            .map_err(|e| PipelineError::StateStore(e.to_string()))?;
1525
1526        if let Some(ref token) = resume_token {
1527            info!(?token, "Resuming from saved token");
1528        }
1529
1530        // Connect to MongoDB
1531        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
1532            .await
1533            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;
1534
1535        // Build change stream options
1536        let mut options = mongodb::options::ChangeStreamOptions::default();
1537
1538        if config.stream_config.full_document_on_update {
1539            options.full_document = Some(mongodb::options::FullDocumentType::UpdateLookup);
1540        }
1541
1542        if config.stream_config.full_document_before_change {
1543            options.full_document_before_change =
1544                Some(mongodb::options::FullDocumentBeforeChangeType::WhenAvailable);
1545        }
1546
1547        options.batch_size = config.stream_config.batch_size;
1548
1549        // Set resume token if we have one
1550        if let Some(ref token_doc) = resume_token {
1551            if let Ok(bytes) = bson::to_vec(token_doc) {
1552                if let Ok(resume_token) =
1553                    bson::from_slice::<mongodb::change_stream::event::ResumeToken>(&bytes)
1554                {
1555                    options.resume_after = Some(resume_token);
1556                }
1557            }
1558        }
1559
1560        // Create deployment-level (cluster-wide) change stream
1561        let mut stream = if config.stream_config.pipeline.is_empty() {
1562            client.watch().with_options(options).await
1563        } else {
1564            client
1565                .watch()
1566                .pipeline(config.stream_config.pipeline.clone())
1567                .with_options(options)
1568                .await
1569        }
1570        .map_err(|e| PipelineError::MongoDB(format!("Failed to create deployment watch: {}", e)))?;
1571
1572        info!("Deployment change stream created successfully");
1573
1574        // Event batch accumulator
1575        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
1576        let mut last_resume_token: Option<Document> = None;
1577
1578        // Batch timeout interval
1579        let mut batch_timer = interval(config.batch_timeout);
1580        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1581
1582        // Label for metrics (deployment level uses a special name)
1583        let metrics_label = "__deployment__".to_string();
1584
1585        info!(
1586            batch_size = config.batch_size,
1587            batch_timeout = ?config.batch_timeout,
1588            "Deployment worker event loop started"
1589        );
1590
1591        loop {
1592            tokio::select! {
1593                // Check for shutdown signal
1594                _ = shutdown_rx.recv() => {
1595                    info!("Received shutdown signal");
1596
1597                    // Flush pending batch
1598                    if !batch.is_empty() {
1599                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
1600                        if let Err(e) = Self::flush_batch(
1601                            &metrics_label,
1602                            &mut batch,
1603                            last_resume_token.as_ref(),
1604                            &destination,
1605                            &store,
1606                            &stats,
1607                            &config,
1608                        )
1609                        .await
1610                        {
1611                            error!(?e, "Failed to flush batch on shutdown");
1612                        }
1613                    }
1614
1615                    info!("Deployment worker shutting down gracefully");
1616                    break;
1617                }
1618
1619                // Batch timeout - flush accumulated events
1620                _ = batch_timer.tick() => {
1621                    if !batch.is_empty() {
1622                        debug!(batch_size = batch.len(), "Batch timeout - flushing");
1623
1624                        if let Err(e) = Self::flush_batch(
1625                            &metrics_label,
1626                            &mut batch,
1627                            last_resume_token.as_ref(),
1628                            &destination,
1629                            &store,
1630                            &stats,
1631                            &config,
1632                        )
1633                        .await
1634                        {
1635                            error!(?e, "Failed to flush batch on timeout");
1636                        }
1637                    }
1638                }
1639
1640                // Read next event from deployment change stream
1641                event_result = stream.next() => {
1642                    match event_result {
1643                        Some(Ok(change_event)) => {
1644                            // Extract resume token
1645                            let resume_token = match bson::to_document(&change_event.id) {
1646                                Ok(token) => token,
1647                                Err(e) => {
1648                                    error!(?e, "Failed to serialize resume token");
1649                                    continue;
1650                                }
1651                            };
1652
1653                            // Convert MongoDB event to our ChangeEvent
1654                            let event = match ChangeEvent::try_from(change_event) {
1655                                Ok(evt) => evt,
1656                                Err(e) => {
1657                                    error!(?e, "Failed to convert change event");
1658                                    continue;
1659                                }
1660                            };
1661
1662                            debug!(
1663                                operation = ?event.operation,
1664                                database = %event.namespace.database,
1665                                collection = %event.namespace.collection,
1666                                "Received deployment event"
1667                            );
1668
1669                            // Store resume token
1670                            last_resume_token = Some(resume_token.clone());
1671
1672                            // Save resume token to state store
1673                            if let Err(e) = store.save_resume_token(&resume_token_key, &resume_token).await {
1674                                warn!(?e, "Failed to save resume token");
1675                            }
1676
1677                            // Add to batch
1678                            batch.push(event);
1679
1680                            // Update metrics
1681                            metrics::increment_batch_queue_size(&metrics_label);
1682
1683                            // Check if batch is full
1684                            if batch.len() >= config.batch_size {
1685                                debug!(batch_size = batch.len(), "Batch full - flushing");
1686
1687                                if let Err(e) = Self::flush_batch(
1688                                    &metrics_label,
1689                                    &mut batch,
1690                                    last_resume_token.as_ref(),
1691                                    &destination,
1692                                    &store,
1693                                    &stats,
1694                                    &config,
1695                                )
1696                                .await
1697                                {
1698                                    error!(?e, "Failed to flush full batch");
1699                                }
1700                            }
1701                        }
1702                        Some(Err(e)) => {
1703                            error!(?e, "Error reading from deployment change stream");
1704                            // Try to reconnect after a delay
1705                            tokio::time::sleep(Duration::from_secs(1)).await;
1706                        }
1707                        None => {
1708                            warn!("Deployment change stream ended unexpectedly");
1709                            break;
1710                        }
1711                    }
1712                }
1713            }
1714        }
1715
1716        Ok(())
1717    }
1718
1719    /// Flushes a batch of events to the destination with retry logic.
1720    #[instrument(skip(batch, last_resume_token, destination, store, stats, config), fields(collection = %collection, batch_size = batch.len()))]
1721    async fn flush_batch(
1722        collection: &str,
1723        batch: &mut Vec<ChangeEvent>,
1724        last_resume_token: Option<&Document>,
1725        destination: &Arc<Mutex<D>>,
1726        store: &Arc<S>,
1727        stats: &Arc<RwLock<PipelineStats>>,
1728        config: &PipelineConfig,
1729    ) -> Result<(), PipelineError> {
1730        if batch.is_empty() {
1731            return Ok(());
1732        }
1733
1734        let batch_size = batch.len();
1735        let start_time = Instant::now();
1736
1737        debug!("Flushing batch to destination");
1738
1739        // Record batch size metric
1740        metrics::record_batch_size(batch_size, collection);
1741
1742        // Write to destination with retry
1743        Self::write_with_retry(batch, destination, config, stats).await?;
1744
1745        let elapsed = start_time.elapsed();
1746        info!(
1747            batch_size,
1748            elapsed_ms = elapsed.as_millis(),
1749            "Batch written successfully"
1750        );
1751
1752        // Record batch duration metric
1753        metrics::record_batch_duration(elapsed.as_secs_f64(), collection);
1754
1755        // Save resume token after successful write
1756        if let Some(token) = last_resume_token {
1757            store
1758                .save_resume_token(collection, token)
1759                .await
1760                .map_err(|e| PipelineError::StateStore(e.to_string()))?;
1761
1762            debug!("Resume token saved");
1763        }
1764
1765        // Count processed events (bulk increment by operation type)
1766        let mut operation_counts = std::collections::HashMap::new();
1767        for event in batch.iter() {
1768            *operation_counts.entry(&event.operation).or_insert(0u64) += 1;
1769        }
1770        for (operation, count) in operation_counts {
1771            metrics::increment_events_processed_by(count, collection, operation.as_str());
1772        }
1773
1774        // Update statistics
1775        let mut s = stats.write().await;
1776        s.events_processed += batch_size as u64;
1777        s.batches_written += 1;
1778
1779        // Update queue size metric
1780        metrics::decrement_batch_queue_size(batch_size, collection);
1781
1782        // Clear batch
1783        batch.clear();
1784
1785        Ok(())
1786    }
1787
    /// Writes a batch to the destination with exponential backoff retry.
    ///
    /// Each attempt locks the destination, calls `write_batch`, and then
    /// `flush`. Every failed attempt increments `stats.write_errors` and
    /// emits a destination-error metric; actual retries additionally bump
    /// `stats.retries`. The delay between retries doubles each time, capped
    /// at `config.max_retry_delay`.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::Destination`] once `config.max_retries`
    /// retries are exhausted, or immediately when the error is classified as
    /// non-retryable by [`Self::is_retryable_error`].
    #[instrument(skip(batch, destination, config, stats), fields(batch_size = batch.len()))]
    async fn write_with_retry(
        batch: &[ChangeEvent],
        destination: &Arc<Mutex<D>>,
        config: &PipelineConfig,
        stats: &Arc<RwLock<PipelineStats>>,
    ) -> Result<(), PipelineError> {
        let mut retry_delay = config.retry_delay;
        let mut attempt = 0;

        loop {
            // Scope the destination lock to a single attempt so it is never
            // held while sleeping between retries.
            let result = {
                let mut dest = destination.lock().await;
                match dest.write_batch(batch).await {
                    Ok(()) => dest.flush().await,
                    Err(e) => Err(e),
                }
            };

            match result {
                Ok(()) => {
                    if attempt > 0 {
                        info!(attempts = attempt + 1, "Write succeeded after retries");
                    }
                    return Ok(());
                }
                Err(e) => {
                    // Increment write error counter for each failed attempt
                    // (guard scoped so the lock is released immediately).
                    {
                        let mut s = stats.write().await;
                        s.write_errors += 1;
                    }

                    // Record error type metric; the destination lock is
                    // re-acquired briefly just to read its metadata label.
                    let error_category = Self::categorize_error(&e);
                    let destination_type = {
                        let dest = destination.lock().await;
                        dest.metadata().destination_type.clone()
                    };
                    metrics::increment_destination_errors(&destination_type, error_category);

                    attempt += 1;

                    // `max_retries` bounds the number of RETRIES, so up to
                    // `max_retries + 1` total attempts are made.
                    if attempt > config.max_retries {
                        error!(attempts = attempt, ?e, "Write failed after max retries");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    // Check if error is retryable; bail out early for errors
                    // that retrying cannot fix.
                    if !Self::is_retryable_error(&e) {
                        error!(?e, "Non-retryable error encountered");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    // Increment retry counter (only for actual retries, not initial attempt)
                    {
                        let mut s = stats.write().await;
                        s.retries += 1;
                    }

                    // Record retry metric
                    metrics::increment_retries(error_category);

                    warn!(
                        attempt,
                        max_retries = config.max_retries,
                        retry_delay_ms = retry_delay.as_millis(),
                        ?e,
                        "Write failed, retrying"
                    );

                    // Wait before retry
                    tokio::time::sleep(retry_delay).await;

                    // Exponential backoff with cap
                    retry_delay = std::cmp::min(retry_delay * 2, config.max_retry_delay);
                }
            }
        }
    }
1869
1870    /// Checks if a destination error is retryable.
1871    fn is_retryable_error(error: &DestinationError) -> bool {
1872        // Check if the error indicates it's retryable
1873        // This depends on the DestinationError implementation
1874        error.to_string().contains("retryable") || error.to_string().contains("timeout")
1875    }
1876
1877    /// Categorizes an error for metrics labeling.
1878    ///
1879    /// Maps errors to a small set of categories to avoid cardinality explosion.
1880    fn categorize_error(error: &DestinationError) -> metrics::ErrorCategory {
1881        let error_str = error.to_string().to_lowercase();
1882
1883        if error_str.contains("timeout") {
1884            metrics::ErrorCategory::Timeout
1885        } else if error_str.contains("connection") || error_str.contains("network") {
1886            metrics::ErrorCategory::Connection
1887        } else if error_str.contains("serialization") || error_str.contains("encode") {
1888            metrics::ErrorCategory::Serialization
1889        } else if error_str.contains("permission") || error_str.contains("auth") {
1890            metrics::ErrorCategory::Permission
1891        } else if error_str.contains("validation") {
1892            metrics::ErrorCategory::Validation
1893        } else if error_str.contains("not found") || error_str.contains("404") {
1894            metrics::ErrorCategory::NotFound
1895        } else if error_str.contains("rate limit") || error_str.contains("throttle") {
1896            metrics::ErrorCategory::RateLimit
1897        } else {
1898            metrics::ErrorCategory::Unknown
1899        }
1900    }
1901
    /// Stops the pipeline gracefully.
    ///
    /// This will:
    /// 1. Send shutdown signal to all workers and lock refresh tasks
    /// 2. Wait for workers to finish processing
    /// 3. Flush any pending batches
    /// 4. Release all held locks (for distributed locking)
    /// 5. Close destination connection
    ///
    /// Calling `stop` on a pipeline that is not running is a no-op.
    ///
    /// # Errors
    ///
    /// Returns an error if shutdown fails or workers panic.
    #[instrument(skip(self), fields(owner_id = %self.owner_id))]
    pub async fn stop(&mut self) -> Result<(), PipelineError> {
        info!("Stopping pipeline");

        // The `running` write guard is held for the entire shutdown, so
        // `is_running()` (and any concurrent `stop`) blocks until this
        // sequence completes.
        let mut running = self.running.write().await;
        if !*running {
            warn!("Pipeline is not running");
            return Ok(());
        }

        // Send shutdown signal (broadcast to all workers and lock refresh tasks).
        // `take()` also drops the sender afterwards, closing the channel for
        // any receiver that misses the explicit send.
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }

        // Wait for all workers to finish. Worker errors and panics are logged
        // but deliberately do not abort the rest of the shutdown.
        let mut workers = self.workers.write().await;
        for worker in workers.drain(..) {
            match worker.await {
                Ok(Ok(())) => {
                    debug!("Worker stopped successfully");
                }
                Ok(Err(e)) => {
                    error!(?e, "Worker stopped with error");
                }
                Err(e) => {
                    error!(?e, "Worker panicked");
                }
            }
        }

        // Wait for lock refresh tasks to finish
        let mut lock_refresh_handles = self.lock_refresh_handles.write().await;
        for handle in lock_refresh_handles.drain(..) {
            if let Err(e) = handle.await {
                warn!(?e, "Lock refresh task panicked");
            }
        }

        // Release all held locks. The key depends on the watch level: the
        // sentinel names "__database__" / "__deployment__" map to the
        // database- and deployment-level keys; anything else is a collection.
        if self.config.distributed_lock.enabled {
            let locked_collections = self.locked_collections.read().await;
            for collection in locked_collections.iter() {
                let lock_key = match collection.as_str() {
                    "__database__" => self.database_lock_key(),
                    "__deployment__" => self.deployment_lock_key(),
                    _ => self.lock_key(collection),
                };

                // Release failures are non-fatal: the lock will expire via
                // its TTL even if this call fails.
                match self.store.release_lock(&lock_key, &self.owner_id).await {
                    Ok(true) => {
                        info!(collection = %collection, "Released lock");
                        metrics::increment_locks_released();
                    }
                    Ok(false) => {
                        warn!(
                            collection = %collection,
                            "Lock was not held by this instance (already released or stolen)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            collection = %collection,
                            error = %e,
                            "Failed to release lock (will expire via TTL)"
                        );
                    }
                }
            }
        }

        // Flush and close destination.
        // NOTE(review): if flush/close fails here the error propagates while
        // `running` is still true and `locked_collections` is not cleared —
        // confirm whether a failed stop should leave the pipeline marked as
        // running.
        let mut dest = self.destination.lock().await;
        dest.flush()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;
        dest.close()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;

        *running = false;

        // Clear locked collections
        let mut locked_collections = self.locked_collections.write().await;
        locked_collections.clear();

        // Update metrics
        metrics::set_pipeline_status(metrics::PipelineStatus::Stopped);
        metrics::set_active_collections(0);
        metrics::set_locks_held(0);

        // Log final statistics
        let stats = self.stats.read().await;
        info!(
            events_processed = stats.events_processed,
            batches_written = stats.batches_written,
            write_errors = stats.write_errors,
            retries = stats.retries,
            "Pipeline stopped"
        );

        Ok(())
    }
2017
2018    /// Returns the current pipeline statistics.
2019    #[must_use]
2020    pub async fn stats(&self) -> PipelineStats {
2021        self.stats.read().await.clone()
2022    }
2023
2024    /// Checks if the pipeline is currently running.
2025    #[must_use]
2026    pub async fn is_running(&self) -> bool {
2027        *self.running.read().await
2028    }
2029}
2030
/// Pipeline configuration errors.
///
/// Produced when building a [`PipelineConfig`] with missing required fields
/// or values that fail validation.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// Missing required MongoDB URI
    #[error("mongodb_uri is required")]
    MissingMongoUri,

    /// Missing required database name
    #[error("database is required")]
    MissingDatabase,

    /// Invalid batch size (e.g. exceeds the allowed maximum)
    #[error("Invalid batch_size: {value} ({reason})")]
    InvalidBatchSize { value: usize, reason: &'static str },

    /// Invalid batch timeout
    #[error("Invalid batch_timeout: {reason}")]
    InvalidBatchTimeout { reason: &'static str },

    /// Initial retry delay exceeds the configured maximum retry delay
    #[error("retry_delay ({retry_delay:?}) exceeds max_retry_delay ({max_retry_delay:?})")]
    RetryDelayExceedsMax {
        retry_delay: Duration,
        max_retry_delay: Duration,
    },

    /// Invalid channel buffer size (e.g. below the allowed minimum)
    #[error("Invalid channel_buffer_size: {value} ({reason})")]
    InvalidChannelBufferSize { value: usize, reason: &'static str },

    /// Invalid distributed lock configuration
    #[error("Invalid distributed lock config: {reason}")]
    InvalidLockConfig { reason: String },
}
2065
/// Pipeline errors.
///
/// Runtime failures surfaced by pipeline operations. Most variants wrap the
/// underlying error's display text as a `String`, keeping this type
/// independent of the source error types.
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
    /// Pipeline is already running
    #[error("Pipeline is already running")]
    AlreadyRunning,

    /// MongoDB connection error (e.g. failing to create a change stream)
    #[error("MongoDB error: {0}")]
    MongoDB(String),

    /// Change stream error
    #[error("Change stream error: {0}")]
    ChangeStream(String),

    /// Destination error (failed write, flush, or close)
    #[error("Destination error: {0}")]
    Destination(String),

    /// State store error (e.g. failing to persist a resume token)
    #[error("State store error: {0}")]
    StateStore(String),

    /// Configuration error
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// Other errors
    #[error("Pipeline error: {0}")]
    Other(String),

    /// Lock lost error: the distributed lock for a collection was lost
    /// while the pipeline held it
    #[error("Lock lost for collection '{collection}': {reason}")]
    LockLost { collection: String, reason: String },
}
2101
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_watch_collections_builds_collection_level() {
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_collections(vec!["users".to_string(), "orders".to_string()])
            .build()
            .unwrap();

        // A single match replaces the assert!(matches!(..)) + if-let pair.
        match config.watch_level {
            WatchLevel::Collection(collections) => {
                assert_eq!(collections.len(), 2);
                assert!(collections.contains(&"users".to_string()));
                assert!(collections.contains(&"orders".to_string()));
            }
            _ => panic!("expected collection-level watch"),
        }
    }

    #[test]
    fn test_watch_database_builds_database_level() {
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_database()
            .build()
            .unwrap();

        assert!(matches!(config.watch_level, WatchLevel::Database));
    }

    #[test]
    fn test_watch_deployment_builds_deployment_level() {
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_deployment()
            .build()
            .unwrap();

        assert!(matches!(config.watch_level, WatchLevel::Deployment));
    }

    #[test]
    fn test_default_watch_level_is_database() {
        // No watch_* call: the builder should default to database level.
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .build()
            .unwrap();

        assert!(matches!(config.watch_level, WatchLevel::Database));
    }

    #[test]
    #[allow(deprecated)]
    fn test_deprecated_collections_method_still_works() {
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .collections(vec!["users".to_string()])
            .build()
            .unwrap();

        match config.watch_level {
            WatchLevel::Collection(collections) => {
                assert_eq!(collections.len(), 1);
                assert_eq!(collections[0], "users");
            }
            _ => panic!("expected collection-level watch"),
        }
    }

    #[test]
    fn test_watch_level_can_be_overridden() {
        // The last watch_* call wins: collections first, then database.
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_collections(vec!["users".to_string()])
            .watch_database() // Override to database level
            .build()
            .unwrap();

        assert!(matches!(config.watch_level, WatchLevel::Database));
    }

    #[test]
    fn test_config_builder_defaults() {
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .build()
            .unwrap();

        // Every tunable should come out at its documented default.
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.batch_timeout, Duration::from_secs(5));
        assert_eq!(config.max_retries, 0);
        assert_eq!(config.retry_delay, Duration::from_millis(100));
        assert_eq!(config.max_retry_delay, Duration::from_secs(30));
        assert_eq!(config.channel_buffer_size, 1000);
        assert!(matches!(config.watch_level, WatchLevel::Database));
    }

    #[test]
    fn test_config_builder_validates_batch_size() {
        let result = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .batch_size(20_000) // Too large
            .build();

        match result {
            Err(ConfigError::InvalidBatchSize { .. }) => {}
            _ => panic!("expected InvalidBatchSize error"),
        }
    }

    #[test]
    fn test_config_builder_validates_channel_buffer_size() {
        let result = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .channel_buffer_size(5) // Too small
            .build();

        match result {
            Err(ConfigError::InvalidChannelBufferSize { .. }) => {}
            _ => panic!("expected InvalidChannelBufferSize error"),
        }
    }

    #[test]
    fn test_config_builder_requires_mongodb_uri() {
        let result = PipelineConfig::builder().database("testdb").build();

        match result {
            Err(ConfigError::MissingMongoUri) => {}
            _ => panic!("expected MissingMongoUri error"),
        }
    }

    #[test]
    fn test_config_builder_requires_database() {
        let result = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .build();

        match result {
            Err(ConfigError::MissingDatabase) => {}
            _ => panic!("expected MissingDatabase error"),
        }
    }

    #[test]
    fn test_retry_delay_validation() {
        let result = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .retry_delay(Duration::from_secs(60))
            .max_retry_delay(Duration::from_secs(30)) // Less than retry_delay
            .build();

        match result {
            Err(ConfigError::RetryDelayExceedsMax { .. }) => {}
            _ => panic!("expected RetryDelayExceedsMax error"),
        }
    }
}