1use crate::destination::{Destination, DestinationError};
60use crate::event::ChangeEvent;
61use crate::metrics;
62use crate::state::StateStore;
63use crate::stream::{ChangeStreamConfig, ChangeStreamListener};
64use crate::watch_level::WatchLevel;
65use futures::StreamExt;
66use mongodb::bson::Document;
67use std::future::Future;
68use std::pin::Pin;
69use std::sync::Arc;
70use std::time::Duration;
71use tokio::sync::{broadcast, Mutex, RwLock};
72use tokio::task::JoinHandle;
73use tokio::time::{interval, Instant};
74use tracing::{debug, error, info, instrument, warn};
75
/// Configuration for the distributed lock used to coordinate multiple
/// pipeline instances that watch the same collections, database, or
/// deployment.
#[derive(Debug, Clone)]
pub struct DistributedLockConfig {
    /// Whether distributed locking is enabled at all. When `false`,
    /// `Pipeline::start` skips lock acquisition entirely.
    pub enabled: bool,

    /// How long an acquired lock stays valid before it expires.
    pub ttl: Duration,

    /// How often a held lock is refreshed; must be strictly less than
    /// `ttl` (enforced by the builder's `build()`).
    pub refresh_interval: Duration,

    /// Delay between failed lock-acquisition attempts.
    /// NOTE(review): not referenced anywhere in this chunk — confirm it
    /// is consumed elsewhere in the crate.
    pub retry_interval: Duration,
}
145
146impl Default for DistributedLockConfig {
147 fn default() -> Self {
148 Self {
149 enabled: true, ttl: Duration::from_secs(30),
151 refresh_interval: Duration::from_secs(10),
152 retry_interval: Duration::from_secs(5),
153 }
154 }
155}
156
157impl DistributedLockConfig {
158 #[must_use]
160 pub fn builder() -> DistributedLockConfigBuilder {
161 DistributedLockConfigBuilder::default()
162 }
163
164 #[must_use]
168 pub fn disabled() -> Self {
169 Self {
170 enabled: false,
171 ..Default::default()
172 }
173 }
174}
175
/// Builder for [`DistributedLockConfig`]. Every field is optional;
/// unset fields fall back to the same values as
/// `DistributedLockConfig::default()`.
#[derive(Debug, Default)]
pub struct DistributedLockConfigBuilder {
    enabled: Option<bool>,              // defaults to true when unset
    ttl: Option<Duration>,              // defaults to 30s when unset
    refresh_interval: Option<Duration>, // defaults to 10s when unset
    retry_interval: Option<Duration>,   // defaults to 5s when unset
}
184
185impl DistributedLockConfigBuilder {
186 #[must_use]
188 pub fn enabled(mut self, enabled: bool) -> Self {
189 self.enabled = Some(enabled);
190 self
191 }
192
193 #[must_use]
195 pub fn ttl(mut self, ttl: Duration) -> Self {
196 self.ttl = Some(ttl);
197 self
198 }
199
200 #[must_use]
202 pub fn refresh_interval(mut self, interval: Duration) -> Self {
203 self.refresh_interval = Some(interval);
204 self
205 }
206
207 #[must_use]
209 pub fn retry_interval(mut self, interval: Duration) -> Self {
210 self.retry_interval = Some(interval);
211 self
212 }
213
214 pub fn build(self) -> Result<DistributedLockConfig, ConfigError> {
221 let ttl = self.ttl.unwrap_or(Duration::from_secs(30));
222 let refresh_interval = self.refresh_interval.unwrap_or(Duration::from_secs(10));
223
224 if refresh_interval >= ttl {
226 return Err(ConfigError::InvalidLockConfig {
227 reason: format!(
228 "refresh_interval ({:?}) must be less than ttl ({:?})",
229 refresh_interval, ttl
230 ),
231 });
232 }
233
234 Ok(DistributedLockConfig {
235 enabled: self.enabled.unwrap_or(true),
236 ttl,
237 refresh_interval,
238 retry_interval: self.retry_interval.unwrap_or(Duration::from_secs(5)),
239 })
240 }
241}
242
/// Static configuration for a [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// MongoDB connection string, passed to `Client::with_uri_str`.
    pub mongodb_uri: String,

    /// Database whose changes are watched; also embedded in lock keys
    /// and resume-token keys.
    pub database: String,

    /// Scope of the change stream: specific collections, the whole
    /// database, or the whole deployment.
    pub watch_level: WatchLevel,

    /// A batch is flushed once it holds this many events
    /// (builder default: 100; builder rejects values above 10,000).
    pub batch_size: usize,

    /// Partial batches are flushed on this interval
    /// (builder default: 5s).
    pub batch_timeout: Duration,

    /// Maximum number of destination-write retries.
    /// NOTE(review): consumed by `flush_batch`, which is not fully
    /// visible in this chunk — confirm exact semantics there.
    pub max_retries: usize,

    /// Initial delay between retries (builder default: 100ms).
    pub retry_delay: Duration,

    /// Upper bound on the retry delay (builder default: 30s); the
    /// builder enforces `retry_delay <= max_retry_delay`.
    pub max_retry_delay: Duration,

    /// Buffer size for internal channels (builder default: 1000,
    /// minimum 10). NOTE(review): not referenced in this chunk —
    /// confirm where it is used.
    pub channel_buffer_size: usize,

    /// Options forwarded to the underlying MongoDB change stream
    /// (full-document behavior, pipeline stages, driver batch size).
    pub stream_config: ChangeStreamConfig,

    /// Coordination settings for running multiple pipeline instances
    /// against the same source.
    pub distributed_lock: DistributedLockConfig,
}
289
290impl PipelineConfig {
291 #[must_use]
293 pub fn builder() -> PipelineConfigBuilder {
294 PipelineConfigBuilder::default()
295 }
296}
297
/// Builder for [`PipelineConfig`].
///
/// `mongodb_uri` and `database` are required (`build()` errors without
/// them). The numeric/duration fields use zero as the "unset" sentinel
/// and are replaced with defaults by `build()`.
#[derive(Debug, Default)]
pub struct PipelineConfigBuilder {
    mongodb_uri: Option<String>,        // required
    database: Option<String>,           // required
    watch_level: Option<WatchLevel>,    // defaults to WatchLevel::default()
    batch_size: usize,                  // 0 => 100; max 10,000
    batch_timeout: Duration,            // zero => 5s
    max_retries: usize,                 // passed through unchanged
    retry_delay: Duration,              // zero => 100ms
    max_retry_delay: Duration,          // zero => 30s
    channel_buffer_size: usize,         // 0 => 1000; min 10
    stream_config: Option<ChangeStreamConfig>, // defaults to ChangeStreamConfig::builder().build()
    distributed_lock: Option<DistributedLockConfig>, // defaults to DistributedLockConfig::default()
}
313
314impl PipelineConfigBuilder {
315 #[must_use]
317 pub fn mongodb_uri(mut self, uri: impl Into<String>) -> Self {
318 self.mongodb_uri = Some(uri.into());
319 self
320 }
321
322 #[must_use]
324 pub fn database(mut self, database: impl Into<String>) -> Self {
325 self.database = Some(database.into());
326 self
327 }
328
329 #[must_use]
357 #[deprecated(since = "0.2.0", note = "Use watch_collections() instead")]
358 pub fn collections(mut self, collections: Vec<String>) -> Self {
359 self.watch_level = Some(WatchLevel::Collection(collections));
360 self
361 }
362
363 #[must_use]
385 pub fn watch_collections(mut self, collections: Vec<String>) -> Self {
386 self.watch_level = Some(WatchLevel::Collection(collections));
387 self
388 }
389
390 #[must_use]
413 pub fn watch_database(mut self) -> Self {
414 self.watch_level = Some(WatchLevel::Database);
415 self
416 }
417
418 #[must_use]
447 pub fn watch_deployment(mut self) -> Self {
448 self.watch_level = Some(WatchLevel::Deployment);
449 self
450 }
451
452 #[must_use]
454 pub fn batch_size(mut self, size: usize) -> Self {
455 self.batch_size = size;
456 self
457 }
458
459 #[must_use]
461 pub fn batch_timeout(mut self, timeout: Duration) -> Self {
462 self.batch_timeout = timeout;
463 self
464 }
465
466 #[must_use]
468 pub fn max_retries(mut self, retries: usize) -> Self {
469 self.max_retries = retries;
470 self
471 }
472
473 #[must_use]
475 pub fn retry_delay(mut self, delay: Duration) -> Self {
476 self.retry_delay = delay;
477 self
478 }
479
480 #[must_use]
482 pub fn max_retry_delay(mut self, delay: Duration) -> Self {
483 self.max_retry_delay = delay;
484 self
485 }
486
487 #[must_use]
489 pub fn channel_buffer_size(mut self, size: usize) -> Self {
490 self.channel_buffer_size = size;
491 self
492 }
493
494 #[must_use]
496 pub fn stream_config(mut self, config: ChangeStreamConfig) -> Self {
497 self.stream_config = Some(config);
498 self
499 }
500
501 #[must_use]
522 pub fn distributed_lock(mut self, config: DistributedLockConfig) -> Self {
523 self.distributed_lock = Some(config);
524 self
525 }
526
527 #[must_use]
544 pub fn disable_distributed_lock(mut self) -> Self {
545 self.distributed_lock = Some(DistributedLockConfig::disabled());
546 self
547 }
548
549 pub fn build(self) -> Result<PipelineConfig, ConfigError> {
555 let mongodb_uri = self.mongodb_uri.ok_or(ConfigError::MissingMongoUri)?;
556 let database = self.database.ok_or(ConfigError::MissingDatabase)?;
557
558 let watch_level = self.watch_level.unwrap_or_default();
560
561 let batch_size = match self.batch_size {
563 0 => 100, size if size > 10_000 => {
565 return Err(ConfigError::InvalidBatchSize {
566 value: size,
567 reason: "batch_size exceeds maximum (10,000)",
568 })
569 }
570 size => size,
571 };
572
573 let batch_timeout = if self.batch_timeout.is_zero() {
575 Duration::from_secs(5) } else {
577 self.batch_timeout
578 };
579
580 let retry_delay = if self.retry_delay.is_zero() {
582 Duration::from_millis(100)
583 } else {
584 self.retry_delay
585 };
586
587 let max_retry_delay = if self.max_retry_delay.is_zero() {
588 Duration::from_secs(30)
589 } else {
590 self.max_retry_delay
591 };
592
593 if retry_delay > max_retry_delay {
595 return Err(ConfigError::RetryDelayExceedsMax {
596 retry_delay,
597 max_retry_delay,
598 });
599 }
600
601 let channel_buffer_size = match self.channel_buffer_size {
603 0 => 1000, size if size < 10 => {
605 return Err(ConfigError::InvalidChannelBufferSize {
606 value: size,
607 reason: "channel_buffer_size must be at least 10",
608 })
609 }
610 size => size,
611 };
612
613 let stream_config = self.stream_config.unwrap_or_else(|| {
614 ChangeStreamConfig::builder()
615 .build()
616 .expect("Default stream config should build")
617 });
618
619 let distributed_lock = self.distributed_lock.unwrap_or_default();
620
621 Ok(PipelineConfig {
622 mongodb_uri,
623 database,
624 watch_level,
625 batch_size,
626 batch_timeout,
627 max_retries: self.max_retries,
628 retry_delay,
629 max_retry_delay,
630 channel_buffer_size,
631 stream_config,
632 distributed_lock,
633 })
634 }
635}
636
/// Runtime counters for a pipeline.
/// NOTE(review): these are updated inside `flush_batch`, which is not
/// fully visible in this chunk — the field meanings below follow the
/// names; confirm against the flush implementation.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total change events processed.
    pub events_processed: u64,

    /// Total batches successfully written to the destination.
    pub batches_written: u64,

    /// Total failed destination writes.
    pub write_errors: u64,

    /// Total retry attempts made while writing.
    pub retries: u64,
}
652
/// Handle to a spawned worker task; resolves to the worker's final result.
type WorkerHandle = JoinHandle<Result<(), PipelineError>>;

/// Handle to a background lock-refresh task (no meaningful return value).
type LockRefreshHandle = JoinHandle<()>;
658
/// A change-data-capture pipeline that reads MongoDB change streams and
/// writes batched events to a `Destination`, persisting resume tokens in
/// a `StateStore` and optionally coordinating instances via distributed
/// locks.
pub struct Pipeline<S: StateStore, D: Destination> {
    /// Static configuration (cloned into each worker task).
    config: PipelineConfig,

    /// Shared state store: resume tokens and distributed locks.
    store: Arc<S>,

    /// Destination behind a mutex so concurrent workers serialize writes.
    destination: Arc<Mutex<D>>,

    /// Broadcast sender used to signal shutdown to all spawned tasks;
    /// `None` until `start()` is called.
    shutdown_tx: Option<broadcast::Sender<()>>,

    /// Join handles for the spawned worker tasks.
    workers: Arc<RwLock<Vec<WorkerHandle>>>,

    /// Join handles for the background lock-refresh tasks.
    lock_refresh_handles: Arc<RwLock<Vec<LockRefreshHandle>>>,

    /// Shared runtime counters.
    stats: Arc<RwLock<PipelineStats>>,

    /// Whether the pipeline is currently started.
    running: Arc<RwLock<bool>>,

    /// Lock-owner identity for this instance: "<hostname>-<uuid>".
    owner_id: String,

    /// Names of the locks this instance currently holds; collection
    /// names, or the sentinels "__database__" / "__deployment__".
    locked_collections: Arc<RwLock<Vec<String>>>,
}
693
694impl<S: StateStore + Send + Sync + 'static, D: Destination + Send + Sync + 'static> Pipeline<S, D> {
695 pub async fn new(
701 config: PipelineConfig,
702 store: S,
703 destination: D,
704 ) -> Result<Self, PipelineError> {
705 let hostname = hostname::get()
707 .map(|h| h.to_string_lossy().to_string())
708 .unwrap_or_else(|_| "unknown".to_string());
709 let owner_id = format!("{}-{}", hostname, uuid::Uuid::new_v4());
710
711 info!(
712 database = %config.database,
713 watch_level = %config.watch_level,
714 batch_size = config.batch_size,
715 batch_timeout = ?config.batch_timeout,
716 owner_id = %owner_id,
717 distributed_lock_enabled = config.distributed_lock.enabled,
718 "Creating pipeline"
719 );
720
721 Ok(Self {
722 config,
723 store: Arc::new(store),
724 destination: Arc::new(Mutex::new(destination)),
725 shutdown_tx: None,
726 workers: Arc::new(RwLock::new(Vec::new())),
727 lock_refresh_handles: Arc::new(RwLock::new(Vec::new())),
728 stats: Arc::new(RwLock::new(PipelineStats::default())),
729 running: Arc::new(RwLock::new(false)),
730 owner_id,
731 locked_collections: Arc::new(RwLock::new(Vec::new())),
732 })
733 }
734
735 #[must_use]
737 pub fn owner_id(&self) -> &str {
738 &self.owner_id
739 }
740
741 fn lock_key(&self, collection: &str) -> String {
743 format!("rigatoni:lock:{}:{}", self.config.database, collection)
744 }
745
746 fn database_lock_key(&self) -> String {
748 format!("rigatoni:lock:{}:__database__", self.config.database)
749 }
750
751 fn deployment_lock_key(&self) -> String {
753 "rigatoni:lock:__deployment__".to_string()
754 }
755
    /// Starts the pipeline: acquires distributed locks (when enabled)
    /// and spawns one worker per watched unit plus a lock-refresh task
    /// per lock held.
    ///
    /// # Errors
    ///
    /// - [`PipelineError::AlreadyRunning`] if the pipeline is already started.
    /// - [`PipelineError::Configuration`] for an empty collection list, or
    ///   when the database/deployment lock is held by another instance.
    /// - [`PipelineError::StateStore`] when lock acquisition itself fails.
    #[instrument(skip(self), fields(database = %self.config.database, owner_id = %self.owner_id))]
    pub async fn start(&mut self) -> Result<(), PipelineError> {
        // Hold the `running` write guard for the whole startup so that
        // concurrent start() calls serialize (tokio's RwLock may be held
        // across the awaits below).
        let mut running = self.running.write().await;
        if *running {
            return Err(PipelineError::AlreadyRunning);
        }

        info!(
            watch_level = %self.config.watch_level,
            distributed_lock_enabled = self.config.distributed_lock.enabled,
            "Starting pipeline"
        );

        // Capacity 1 is enough: the channel only carries a shutdown signal,
        // and each task gets its own receiver via subscribe().
        let (shutdown_tx, _) = broadcast::channel(1);
        self.shutdown_tx = Some(shutdown_tx.clone());

        let mut workers = self.workers.write().await;
        let mut lock_refresh_handles = self.lock_refresh_handles.write().await;
        let mut locked_collections = self.locked_collections.write().await;
        let mut num_workers = 0;

        match &self.config.watch_level {
            WatchLevel::Collection(collections) => {
                if collections.is_empty() {
                    return Err(PipelineError::Configuration(
                        "No collections specified. Either provide collection names with \
                        watch_collections() or use watch_database() to watch all collections."
                            .to_string(),
                    ));
                }

                info!(
                    collections = ?collections,
                    "Starting collection-level watching"
                );

                for collection in collections {
                    let lock_key = self.lock_key(collection);

                    if self.config.distributed_lock.enabled {
                        let acquired = self
                            .store
                            .try_acquire_lock(
                                &lock_key,
                                &self.owner_id,
                                self.config.distributed_lock.ttl,
                            )
                            .await
                            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

                        // A held lock is not an error at collection level:
                        // another instance owns this collection, so skip it.
                        // NOTE(review): if every collection is locked
                        // elsewhere, start() still succeeds with zero
                        // workers — confirm that is intended.
                        if !acquired {
                            info!(
                                collection = %collection,
                                "Collection is locked by another instance, skipping"
                            );
                            metrics::increment_lock_acquisition_failures(
                                metrics::LockFailureReason::AlreadyHeld,
                            );
                            continue;
                        }

                        info!(collection = %collection, "Acquired lock for collection");
                        metrics::increment_lock_acquisitions();
                        locked_collections.push(collection.clone());

                        // Keep the lock alive for as long as this worker runs.
                        let refresh_handle = self.start_lock_refresh_task(
                            lock_key,
                            collection.clone(),
                            shutdown_tx.subscribe(),
                        );
                        lock_refresh_handles.push(refresh_handle);
                    }

                    // Reached when the lock was acquired or locking is disabled.
                    let shutdown_rx = shutdown_tx.subscribe();
                    let worker = self
                        .spawn_collection_worker(collection.clone(), shutdown_rx)
                        .await?;

                    workers.push(worker);
                    num_workers += 1;
                }

                metrics::set_active_collections(num_workers);
            }
            WatchLevel::Database => {
                info!(
                    database = %self.config.database,
                    "Starting database-level watching"
                );

                let lock_key = self.database_lock_key();

                if self.config.distributed_lock.enabled {
                    let acquired = self
                        .store
                        .try_acquire_lock(
                            &lock_key,
                            &self.owner_id,
                            self.config.distributed_lock.ttl,
                        )
                        .await
                        .map_err(|e| PipelineError::StateStore(e.to_string()))?;

                    // Unlike collection level, a held database lock is
                    // fatal: there is nothing else for this instance to do.
                    if !acquired {
                        info!("Database is locked by another instance, cannot start");
                        metrics::increment_lock_acquisition_failures(
                            metrics::LockFailureReason::AlreadyHeld,
                        );
                        return Err(PipelineError::Configuration(
                            "Database is locked by another instance. \
                            Wait for the lock to expire or use collection-level watching."
                                .to_string(),
                        ));
                    }

                    info!("Acquired lock for database");
                    metrics::increment_lock_acquisitions();
                    locked_collections.push("__database__".to_string());

                    let refresh_handle = self.start_lock_refresh_task(
                        lock_key,
                        "__database__".to_string(),
                        shutdown_tx.subscribe(),
                    );
                    lock_refresh_handles.push(refresh_handle);
                }

                let shutdown_rx = shutdown_tx.subscribe();
                let worker = self.spawn_database_worker(shutdown_rx).await?;
                workers.push(worker);
                num_workers = 1;

                metrics::set_active_collections(1);
            }
            WatchLevel::Deployment => {
                info!("Starting deployment-level watching (cluster-wide)");

                let lock_key = self.deployment_lock_key();

                if self.config.distributed_lock.enabled {
                    let acquired = self
                        .store
                        .try_acquire_lock(
                            &lock_key,
                            &self.owner_id,
                            self.config.distributed_lock.ttl,
                        )
                        .await
                        .map_err(|e| PipelineError::StateStore(e.to_string()))?;

                    // Same as database level: a held deployment lock is fatal.
                    if !acquired {
                        info!("Deployment is locked by another instance, cannot start");
                        metrics::increment_lock_acquisition_failures(
                            metrics::LockFailureReason::AlreadyHeld,
                        );
                        return Err(PipelineError::Configuration(
                            "Deployment is locked by another instance. \
                            Wait for the lock to expire or use collection/database-level watching."
                                .to_string(),
                        ));
                    }

                    info!("Acquired lock for deployment");
                    metrics::increment_lock_acquisitions();
                    locked_collections.push("__deployment__".to_string());

                    let refresh_handle = self.start_lock_refresh_task(
                        lock_key,
                        "__deployment__".to_string(),
                        shutdown_tx.subscribe(),
                    );
                    lock_refresh_handles.push(refresh_handle);
                }

                let shutdown_rx = shutdown_tx.subscribe();
                let worker = self.spawn_deployment_worker(shutdown_rx).await?;
                workers.push(worker);
                num_workers = 1;

                metrics::set_active_collections(1);
            }
        }

        *running = true;
        info!(
            workers = num_workers,
            locks_held = locked_collections.len(),
            "Pipeline started"
        );

        metrics::set_pipeline_status(metrics::PipelineStatus::Running);
        metrics::set_locks_held(locked_collections.len());

        Ok(())
    }
986
    /// Spawns a background task that periodically refreshes `lock_key`
    /// so the lock does not expire while its worker is running.
    ///
    /// The task exits when a shutdown signal arrives, or permanently if
    /// the lock is lost (refresh returned `false`). Transient refresh
    /// errors are logged and retried on the next tick.
    fn start_lock_refresh_task(
        &self,
        lock_key: String,
        collection_name: String,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> LockRefreshHandle {
        let store = Arc::clone(&self.store);
        let owner_id = self.owner_id.clone();
        let refresh_interval = self.config.distributed_lock.refresh_interval;
        let ttl = self.config.distributed_lock.ttl;

        tokio::spawn(async move {
            let mut interval_timer = interval(refresh_interval);
            // Skip (rather than burst) missed ticks if the task falls behind.
            interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    _ = shutdown_rx.recv() => {
                        debug!(
                            collection = %collection_name,
                            "Lock refresh task received shutdown signal"
                        );
                        break;
                    }
                    _ = interval_timer.tick() => {
                        match store.refresh_lock(&lock_key, &owner_id, ttl).await {
                            Ok(true) => {
                                debug!(
                                    collection = %collection_name,
                                    "Lock refreshed successfully"
                                );
                                metrics::increment_lock_refreshes();
                            }
                            // Lock ownership was lost: stop refreshing.
                            // NOTE(review): the worker holding this lock is
                            // not signalled here and keeps running — confirm
                            // that is the intended behavior on lock loss.
                            Ok(false) => {
                                error!(
                                    collection = %collection_name,
                                    "Lost lock (acquired by another instance or expired)"
                                );
                                metrics::increment_locks_lost();
                                break;
                            }
                            // Transient store error: keep the task alive and
                            // try again on the next tick.
                            Err(e) => {
                                warn!(
                                    collection = %collection_name,
                                    error = %e,
                                    "Failed to refresh lock (transient error, will retry)"
                                );
                            }
                        }
                    }
                }
            }
        })
    }
1044
1045 async fn spawn_collection_worker(
1047 &self,
1048 collection: String,
1049 shutdown_rx: broadcast::Receiver<()>,
1050 ) -> Result<WorkerHandle, PipelineError> {
1051 let config = self.config.clone();
1052 let store = Arc::clone(&self.store);
1053 let destination = Arc::clone(&self.destination);
1054 let stats = Arc::clone(&self.stats);
1055
1056 let handle = tokio::spawn(async move {
1057 Self::collection_worker(collection, config, store, destination, stats, shutdown_rx)
1058 .await
1059 });
1060
1061 Ok(handle)
1062 }
1063
1064 async fn spawn_database_worker(
1066 &self,
1067 shutdown_rx: broadcast::Receiver<()>,
1068 ) -> Result<WorkerHandle, PipelineError> {
1069 let config = self.config.clone();
1070 let store = Arc::clone(&self.store);
1071 let destination = Arc::clone(&self.destination);
1072 let stats = Arc::clone(&self.stats);
1073
1074 let handle = tokio::spawn(async move {
1075 Self::database_worker(config, store, destination, stats, shutdown_rx).await
1076 });
1077
1078 Ok(handle)
1079 }
1080
1081 async fn spawn_deployment_worker(
1083 &self,
1084 shutdown_rx: broadcast::Receiver<()>,
1085 ) -> Result<WorkerHandle, PipelineError> {
1086 let config = self.config.clone();
1087 let store = Arc::clone(&self.store);
1088 let destination = Arc::clone(&self.destination);
1089 let stats = Arc::clone(&self.stats);
1090
1091 let handle = tokio::spawn(async move {
1092 Self::deployment_worker(config, store, destination, stats, shutdown_rx).await
1093 });
1094
1095 Ok(handle)
1096 }
1097
    /// Event loop for a single collection: resumes from the saved token,
    /// listens on a `ChangeStreamListener`, buffers events, and flushes a
    /// batch when it is full, on timeout, and on shutdown.
    ///
    /// # Errors
    ///
    /// Fails if the resume token cannot be loaded, the MongoDB client
    /// cannot be created, or the change stream listener cannot be built.
    #[allow(clippy::too_many_lines)]
    #[instrument(skip(config, store, destination, stats, shutdown_rx), fields(collection = %collection))]
    async fn collection_worker(
        collection: String,
        config: PipelineConfig,
        store: Arc<S>,
        destination: Arc<Mutex<D>>,
        stats: Arc<RwLock<PipelineStats>>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<(), PipelineError> {
        info!("Starting collection worker");

        // Resume from the last persisted position, if any.
        let resume_token_key = config
            .watch_level
            .resume_token_key(&config.database, Some(&collection));

        let resume_token = store
            .get_resume_token(&resume_token_key)
            .await
            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

        if let Some(ref token) = resume_token {
            info!(?token, "Resuming from saved token");
        }

        // Each worker opens its own client connection.
        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
            .await
            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;

        let db = client.database(&config.database);
        let mongo_collection = db.collection(&collection);

        // Callback the listener invokes to persist resume tokens.
        let store_clone = Arc::clone(&store);
        let resume_key = resume_token_key.clone();
        let resume_token_callback = move |token: Document| {
            let store = Arc::clone(&store_clone);
            let key = resume_key.clone();
            Box::pin(async move {
                store
                    .save_resume_token(&key, &token)
                    .await
                    .map_err(|e| e.to_string())
            }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
        };

        let mut listener = ChangeStreamListener::new(
            mongo_collection,
            config.stream_config.clone(),
            resume_token_callback,
        )
        .await
        .map_err(|e| PipelineError::ChangeStream(e.to_string()))?;

        // Event buffer plus the newest token seen, forwarded to flush_batch.
        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
        let mut last_resume_token: Option<Document> = None;

        // Periodic flush so a slow trickle of events still gets written.
        let mut batch_timer = interval(config.batch_timeout);
        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        info!(
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Worker event loop started"
        );

        loop {
            tokio::select! {
                // Graceful shutdown: flush what is buffered, then exit.
                _ = shutdown_rx.recv() => {
                    info!("Received shutdown signal");

                    if !batch.is_empty() {
                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
                        if let Err(e) = Self::flush_batch(
                            &collection,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on shutdown");
                        }
                    }

                    info!("Worker shutting down gracefully");
                    break;
                }

                // Timeout flush for partially filled batches.
                _ = batch_timer.tick() => {
                    if !batch.is_empty() {
                        debug!(batch_size = batch.len(), "Batch timeout - flushing");

                        if let Err(e) = Self::flush_batch(
                            &collection,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on timeout");
                        }
                    }
                }

                event_result = listener.next() => {
                    match event_result {
                        Some(Ok(ackable_event)) => {
                            let event = ackable_event.event.clone();

                            debug!(
                                operation = ?event.operation,
                                collection = %event.namespace.collection,
                                "Received event"
                            );

                            last_resume_token = Some(event.resume_token.clone());

                            // NOTE(review): the event is acked before the
                            // batch is flushed to the destination — if ack
                            // advances the persisted position this is
                            // at-most-once delivery; confirm ack semantics
                            // in ChangeStreamListener.
                            ackable_event.ack();

                            batch.push(event.clone());

                            // Presumably decremented when flush_batch drains
                            // the queue (not visible here) — verify.
                            metrics::increment_batch_queue_size(&collection);

                            // Size-based flush once the batch is full.
                            if batch.len() >= config.batch_size {
                                debug!(batch_size = batch.len(), "Batch full - flushing");

                                if let Err(e) = Self::flush_batch(
                                    &collection,
                                    &mut batch,
                                    last_resume_token.as_ref(),
                                    &destination,
                                    &store,
                                    &stats,
                                    &config,
                                )
                                .await
                                {
                                    error!(?e, "Failed to flush full batch");
                                }
                            }
                        }
                        // Stream error: back off briefly and keep looping;
                        // relies on the listener's own resumability.
                        Some(Err(e)) => {
                            error!(?e, "Error reading from change stream");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        // NOTE(review): a dead stream ends the worker with
                        // Ok(()); nothing visible here restarts it — confirm
                        // a supervisor handles this case.
                        None => {
                            warn!("Change stream ended unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
1283
    /// Event loop for database-level watching: builds a raw driver change
    /// stream over the whole database, buffers converted events, and
    /// flushes batches on size, timeout, and shutdown.
    ///
    /// # Errors
    ///
    /// Fails if the resume token cannot be loaded, the MongoDB client
    /// cannot be created, or the database watch cannot be opened.
    #[allow(clippy::too_many_lines)]
    #[instrument(skip(config, store, destination, stats, shutdown_rx), fields(database = %config.database))]
    async fn database_worker(
        config: PipelineConfig,
        store: Arc<S>,
        destination: Arc<Mutex<D>>,
        stats: Arc<RwLock<PipelineStats>>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<(), PipelineError> {
        info!("Starting database worker");

        // Resume from the last persisted position, if any.
        let resume_token_key = config.watch_level.resume_token_key(&config.database, None);

        let resume_token = store
            .get_resume_token(&resume_token_key)
            .await
            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

        if let Some(ref token) = resume_token {
            info!(?token, "Resuming from saved token");
        }

        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
            .await
            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;

        let db = client.database(&config.database);

        // Translate the shared stream config into driver options.
        let mut options = mongodb::options::ChangeStreamOptions::default();

        if config.stream_config.full_document_on_update {
            options.full_document = Some(mongodb::options::FullDocumentType::UpdateLookup);
        }

        if config.stream_config.full_document_before_change {
            options.full_document_before_change =
                Some(mongodb::options::FullDocumentBeforeChangeType::WhenAvailable);
        }

        options.batch_size = config.stream_config.batch_size;

        // Round-trip the stored Document through BSON bytes to recover
        // the driver's ResumeToken type. Conversion failures are
        // silently ignored: the stream then starts from "now".
        if let Some(ref token_doc) = resume_token {
            if let Ok(bytes) = bson::to_vec(token_doc) {
                if let Ok(resume_token) =
                    bson::from_slice::<mongodb::change_stream::event::ResumeToken>(&bytes)
                {
                    options.resume_after = Some(resume_token);
                }
            }
        }

        let mut stream = if config.stream_config.pipeline.is_empty() {
            db.watch().with_options(options).await
        } else {
            db.watch()
                .pipeline(config.stream_config.pipeline.clone())
                .with_options(options)
                .await
        }
        .map_err(|e| PipelineError::MongoDB(format!("Failed to create database watch: {}", e)))?;

        info!("Database change stream created successfully");

        // Event buffer plus the newest token seen, forwarded to flush_batch.
        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
        let mut last_resume_token: Option<Document> = None;

        // Periodic flush so a slow trickle of events still gets written.
        let mut batch_timer = interval(config.batch_timeout);
        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // Database workers have no collection name; metrics are keyed by
        // a synthetic "__db:<name>" label instead.
        let metrics_label = format!("__db:{}", config.database);

        info!(
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Database worker event loop started"
        );

        loop {
            tokio::select! {
                // Graceful shutdown: flush what is buffered, then exit.
                _ = shutdown_rx.recv() => {
                    info!("Received shutdown signal");

                    if !batch.is_empty() {
                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
                        if let Err(e) = Self::flush_batch(
                            &metrics_label,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on shutdown");
                        }
                    }

                    info!("Database worker shutting down gracefully");
                    break;
                }

                // Timeout flush for partially filled batches.
                _ = batch_timer.tick() => {
                    if !batch.is_empty() {
                        debug!(batch_size = batch.len(), "Batch timeout - flushing");

                        if let Err(e) = Self::flush_batch(
                            &metrics_label,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on timeout");
                        }
                    }
                }

                event_result = stream.next() => {
                    match event_result {
                        Some(Ok(change_event)) => {
                            // Capture the token first; an unserializable
                            // token drops the event entirely.
                            let resume_token = match bson::to_document(&change_event.id) {
                                Ok(token) => token,
                                Err(e) => {
                                    error!(?e, "Failed to serialize resume token");
                                    continue;
                                }
                            };

                            let event = match ChangeEvent::try_from(change_event) {
                                Ok(evt) => evt,
                                Err(e) => {
                                    error!(?e, "Failed to convert change event");
                                    continue;
                                }
                            };

                            debug!(
                                operation = ?event.operation,
                                database = %event.namespace.database,
                                collection = %event.namespace.collection,
                                "Received database event"
                            );

                            last_resume_token = Some(resume_token.clone());

                            // NOTE(review): the token is persisted per event,
                            // BEFORE the batch is flushed — a crash between
                            // save and flush would skip buffered events on
                            // resume (at-most-once). Confirm this is intended.
                            if let Err(e) = store.save_resume_token(&resume_token_key, &resume_token).await {
                                warn!(?e, "Failed to save resume token");
                            }

                            batch.push(event);

                            // Presumably decremented when flush_batch drains
                            // the queue (not visible here) — verify.
                            metrics::increment_batch_queue_size(&metrics_label);

                            // Size-based flush once the batch is full.
                            if batch.len() >= config.batch_size {
                                debug!(batch_size = batch.len(), "Batch full - flushing");

                                if let Err(e) = Self::flush_batch(
                                    &metrics_label,
                                    &mut batch,
                                    last_resume_token.as_ref(),
                                    &destination,
                                    &store,
                                    &stats,
                                    &config,
                                )
                                .await
                                {
                                    error!(?e, "Failed to flush full batch");
                                }
                            }
                        }
                        // Stream error: back off briefly and keep polling;
                        // relies on the driver's built-in resumability.
                        Some(Err(e)) => {
                            error!(?e, "Error reading from database change stream");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        // NOTE(review): a dead stream ends the worker with
                        // Ok(()); nothing visible here restarts it — confirm
                        // a supervisor handles this case.
                        None => {
                            warn!("Database change stream ended unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
1501
    /// Event loop for deployment-level (cluster-wide) watching. Mirrors
    /// `database_worker` except that the change stream is opened on the
    /// client rather than a database, and metrics use the
    /// "__deployment__" label.
    ///
    /// # Errors
    ///
    /// Fails if the resume token cannot be loaded, the MongoDB client
    /// cannot be created, or the deployment watch cannot be opened.
    #[allow(clippy::too_many_lines)]
    #[instrument(skip(config, store, destination, stats, shutdown_rx))]
    async fn deployment_worker(
        config: PipelineConfig,
        store: Arc<S>,
        destination: Arc<Mutex<D>>,
        stats: Arc<RwLock<PipelineStats>>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<(), PipelineError> {
        info!("Starting deployment worker (cluster-wide)");

        // Resume from the last persisted position, if any.
        let resume_token_key = config.watch_level.resume_token_key(&config.database, None);

        let resume_token = store
            .get_resume_token(&resume_token_key)
            .await
            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

        if let Some(ref token) = resume_token {
            info!(?token, "Resuming from saved token");
        }

        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
            .await
            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;

        // Translate the shared stream config into driver options.
        let mut options = mongodb::options::ChangeStreamOptions::default();

        if config.stream_config.full_document_on_update {
            options.full_document = Some(mongodb::options::FullDocumentType::UpdateLookup);
        }

        if config.stream_config.full_document_before_change {
            options.full_document_before_change =
                Some(mongodb::options::FullDocumentBeforeChangeType::WhenAvailable);
        }

        options.batch_size = config.stream_config.batch_size;

        // Round-trip the stored Document through BSON bytes to recover
        // the driver's ResumeToken type. Conversion failures are
        // silently ignored: the stream then starts from "now".
        if let Some(ref token_doc) = resume_token {
            if let Ok(bytes) = bson::to_vec(token_doc) {
                if let Ok(resume_token) =
                    bson::from_slice::<mongodb::change_stream::event::ResumeToken>(&bytes)
                {
                    options.resume_after = Some(resume_token);
                }
            }
        }

        // Watching on the client covers every database in the deployment.
        let mut stream = if config.stream_config.pipeline.is_empty() {
            client.watch().with_options(options).await
        } else {
            client
                .watch()
                .pipeline(config.stream_config.pipeline.clone())
                .with_options(options)
                .await
        }
        .map_err(|e| PipelineError::MongoDB(format!("Failed to create deployment watch: {}", e)))?;

        info!("Deployment change stream created successfully");

        // Event buffer plus the newest token seen, forwarded to flush_batch.
        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
        let mut last_resume_token: Option<Document> = None;

        // Periodic flush so a slow trickle of events still gets written.
        let mut batch_timer = interval(config.batch_timeout);
        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // Deployment workers key their metrics on a fixed sentinel label.
        let metrics_label = "__deployment__".to_string();

        info!(
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Deployment worker event loop started"
        );

        loop {
            tokio::select! {
                // Graceful shutdown: flush what is buffered, then exit.
                _ = shutdown_rx.recv() => {
                    info!("Received shutdown signal");

                    if !batch.is_empty() {
                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
                        if let Err(e) = Self::flush_batch(
                            &metrics_label,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on shutdown");
                        }
                    }

                    info!("Deployment worker shutting down gracefully");
                    break;
                }

                // Timeout flush for partially filled batches.
                _ = batch_timer.tick() => {
                    if !batch.is_empty() {
                        debug!(batch_size = batch.len(), "Batch timeout - flushing");

                        if let Err(e) = Self::flush_batch(
                            &metrics_label,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on timeout");
                        }
                    }
                }

                event_result = stream.next() => {
                    match event_result {
                        Some(Ok(change_event)) => {
                            // Capture the token first; an unserializable
                            // token drops the event entirely.
                            let resume_token = match bson::to_document(&change_event.id) {
                                Ok(token) => token,
                                Err(e) => {
                                    error!(?e, "Failed to serialize resume token");
                                    continue;
                                }
                            };

                            let event = match ChangeEvent::try_from(change_event) {
                                Ok(evt) => evt,
                                Err(e) => {
                                    error!(?e, "Failed to convert change event");
                                    continue;
                                }
                            };

                            debug!(
                                operation = ?event.operation,
                                database = %event.namespace.database,
                                collection = %event.namespace.collection,
                                "Received deployment event"
                            );

                            last_resume_token = Some(resume_token.clone());

                            // NOTE(review): the token is persisted per event,
                            // BEFORE the batch is flushed — a crash between
                            // save and flush would skip buffered events on
                            // resume (at-most-once). Confirm this is intended.
                            if let Err(e) = store.save_resume_token(&resume_token_key, &resume_token).await {
                                warn!(?e, "Failed to save resume token");
                            }

                            batch.push(event);

                            // Presumably decremented when flush_batch drains
                            // the queue (not visible here) — verify.
                            metrics::increment_batch_queue_size(&metrics_label);

                            // Size-based flush once the batch is full.
                            if batch.len() >= config.batch_size {
                                debug!(batch_size = batch.len(), "Batch full - flushing");

                                if let Err(e) = Self::flush_batch(
                                    &metrics_label,
                                    &mut batch,
                                    last_resume_token.as_ref(),
                                    &destination,
                                    &store,
                                    &stats,
                                    &config,
                                )
                                .await
                                {
                                    error!(?e, "Failed to flush full batch");
                                }
                            }
                        }
                        // Stream error: back off briefly and keep polling;
                        // relies on the driver's built-in resumability.
                        Some(Err(e)) => {
                            error!(?e, "Error reading from deployment change stream");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        // NOTE(review): a dead stream ends the worker with
                        // Ok(()); nothing visible here restarts it — confirm
                        // a supervisor handles this case.
                        None => {
                            warn!("Deployment change stream ended unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }
1718
1719 #[instrument(skip(batch, last_resume_token, destination, store, stats, config), fields(collection = %collection, batch_size = batch.len()))]
1721 async fn flush_batch(
1722 collection: &str,
1723 batch: &mut Vec<ChangeEvent>,
1724 last_resume_token: Option<&Document>,
1725 destination: &Arc<Mutex<D>>,
1726 store: &Arc<S>,
1727 stats: &Arc<RwLock<PipelineStats>>,
1728 config: &PipelineConfig,
1729 ) -> Result<(), PipelineError> {
1730 if batch.is_empty() {
1731 return Ok(());
1732 }
1733
1734 let batch_size = batch.len();
1735 let start_time = Instant::now();
1736
1737 debug!("Flushing batch to destination");
1738
1739 metrics::record_batch_size(batch_size, collection);
1741
1742 Self::write_with_retry(batch, destination, config, stats).await?;
1744
1745 let elapsed = start_time.elapsed();
1746 info!(
1747 batch_size,
1748 elapsed_ms = elapsed.as_millis(),
1749 "Batch written successfully"
1750 );
1751
1752 metrics::record_batch_duration(elapsed.as_secs_f64(), collection);
1754
1755 if let Some(token) = last_resume_token {
1757 store
1758 .save_resume_token(collection, token)
1759 .await
1760 .map_err(|e| PipelineError::StateStore(e.to_string()))?;
1761
1762 debug!("Resume token saved");
1763 }
1764
1765 let mut operation_counts = std::collections::HashMap::new();
1767 for event in batch.iter() {
1768 *operation_counts.entry(&event.operation).or_insert(0u64) += 1;
1769 }
1770 for (operation, count) in operation_counts {
1771 metrics::increment_events_processed_by(count, collection, operation.as_str());
1772 }
1773
1774 let mut s = stats.write().await;
1776 s.events_processed += batch_size as u64;
1777 s.batches_written += 1;
1778
1779 metrics::decrement_batch_queue_size(batch_size, collection);
1781
1782 batch.clear();
1784
1785 Ok(())
1786 }
1787
    /// Writes `batch` to the destination (write + flush) with exponential
    /// backoff on failure: the delay starts at `config.retry_delay`, doubles
    /// per retry, and is capped at `config.max_retry_delay`.
    ///
    /// Every failed attempt increments `stats.write_errors` and a per-category
    /// destination-error metric; every retry additionally increments
    /// `stats.retries`. Gives up with `PipelineError::Destination` after
    /// `config.max_retries` retries or on the first error that
    /// `is_retryable_error` rejects.
    #[instrument(skip(batch, destination, config, stats), fields(batch_size = batch.len()))]
    async fn write_with_retry(
        batch: &[ChangeEvent],
        destination: &Arc<Mutex<D>>,
        config: &PipelineConfig,
        stats: &Arc<RwLock<PipelineStats>>,
    ) -> Result<(), PipelineError> {
        let mut retry_delay = config.retry_delay;
        let mut attempt = 0;

        loop {
            // Scope the destination lock so it is NOT held across the
            // backoff sleep below.
            let result = {
                let mut dest = destination.lock().await;
                match dest.write_batch(batch).await {
                    // A write only counts as success once flush also succeeds.
                    Ok(()) => dest.flush().await,
                    Err(e) => Err(e),
                }
            };

            match result {
                Ok(()) => {
                    if attempt > 0 {
                        info!(attempts = attempt + 1, "Write succeeded after retries");
                    }
                    return Ok(());
                }
                Err(e) => {
                    // Record the failure before deciding whether to retry.
                    {
                        let mut s = stats.write().await;
                        s.write_errors += 1;
                    }

                    let error_category = Self::categorize_error(&e);
                    // Briefly re-acquire the lock just to label the metric
                    // with the destination type.
                    let destination_type = {
                        let dest = destination.lock().await;
                        dest.metadata().destination_type.clone()
                    };
                    metrics::increment_destination_errors(&destination_type, error_category);

                    attempt += 1;

                    if attempt > config.max_retries {
                        error!(attempts = attempt, ?e, "Write failed after max retries");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    // NOTE(review): this check runs AFTER the max-retries check,
                    // so a non-retryable error on the final allowed attempt is
                    // reported as "max retries" instead — confirm intended.
                    if !Self::is_retryable_error(&e) {
                        error!(?e, "Non-retryable error encountered");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    {
                        let mut s = stats.write().await;
                        s.retries += 1;
                    }

                    metrics::increment_retries(error_category);

                    warn!(
                        attempt,
                        max_retries = config.max_retries,
                        retry_delay_ms = retry_delay.as_millis(),
                        ?e,
                        "Write failed, retrying"
                    );

                    tokio::time::sleep(retry_delay).await;

                    // Exponential backoff, capped at the configured maximum.
                    retry_delay = std::cmp::min(retry_delay * 2, config.max_retry_delay);
                }
            }
        }
    }
1869
1870 fn is_retryable_error(error: &DestinationError) -> bool {
1872 error.to_string().contains("retryable") || error.to_string().contains("timeout")
1875 }
1876
1877 fn categorize_error(error: &DestinationError) -> metrics::ErrorCategory {
1881 let error_str = error.to_string().to_lowercase();
1882
1883 if error_str.contains("timeout") {
1884 metrics::ErrorCategory::Timeout
1885 } else if error_str.contains("connection") || error_str.contains("network") {
1886 metrics::ErrorCategory::Connection
1887 } else if error_str.contains("serialization") || error_str.contains("encode") {
1888 metrics::ErrorCategory::Serialization
1889 } else if error_str.contains("permission") || error_str.contains("auth") {
1890 metrics::ErrorCategory::Permission
1891 } else if error_str.contains("validation") {
1892 metrics::ErrorCategory::Validation
1893 } else if error_str.contains("not found") || error_str.contains("404") {
1894 metrics::ErrorCategory::NotFound
1895 } else if error_str.contains("rate limit") || error_str.contains("throttle") {
1896 metrics::ErrorCategory::RateLimit
1897 } else {
1898 metrics::ErrorCategory::Unknown
1899 }
1900 }
1901
    /// Gracefully stops the pipeline: signals shutdown, joins worker and
    /// lock-refresh tasks, releases any distributed locks held by this
    /// instance, flushes/closes the destination, and resets status metrics.
    ///
    /// Idempotent: returns `Ok(())` immediately if the pipeline is not
    /// running.
    ///
    /// # Errors
    ///
    /// Returns `PipelineError::Destination` if the destination fails to
    /// flush or close. NOTE(review): on that early return, `running` stays
    /// `true` and `locked_collections` is not cleared — confirm intended.
    #[instrument(skip(self), fields(owner_id = %self.owner_id))]
    pub async fn stop(&mut self) -> Result<(), PipelineError> {
        info!("Stopping pipeline");

        // Hold the write guard for the whole shutdown so concurrent callers
        // (e.g. is_running/start) serialize against it.
        let mut running = self.running.write().await;
        if !*running {
            warn!("Pipeline is not running");
            return Ok(());
        }

        // Dropping/sending on the broadcast channel tells every worker to
        // flush and exit; errors are ignored (workers may already be gone).
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }

        // Join each worker, logging its outcome (clean exit, error, or panic).
        let mut workers = self.workers.write().await;
        for worker in workers.drain(..) {
            match worker.await {
                Ok(Ok(())) => {
                    debug!("Worker stopped successfully");
                }
                Ok(Err(e)) => {
                    error!(?e, "Worker stopped with error");
                }
                Err(e) => {
                    error!(?e, "Worker panicked");
                }
            }
        }

        // Lock-refresh tasks only yield Err on panic.
        let mut lock_refresh_handles = self.lock_refresh_handles.write().await;
        for handle in lock_refresh_handles.drain(..) {
            if let Err(e) = handle.await {
                warn!(?e, "Lock refresh task panicked");
            }
        }

        // Best-effort release of distributed locks; failures are non-fatal
        // because unreleased locks expire via their TTL.
        if self.config.distributed_lock.enabled {
            let locked_collections = self.locked_collections.read().await;
            for collection in locked_collections.iter() {
                // Sentinel names map to the database/deployment-level keys.
                let lock_key = match collection.as_str() {
                    "__database__" => self.database_lock_key(),
                    "__deployment__" => self.deployment_lock_key(),
                    _ => self.lock_key(collection),
                };

                match self.store.release_lock(&lock_key, &self.owner_id).await {
                    Ok(true) => {
                        info!(collection = %collection, "Released lock");
                        metrics::increment_locks_released();
                    }
                    Ok(false) => {
                        warn!(
                            collection = %collection,
                            "Lock was not held by this instance (already released or stolen)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            collection = %collection,
                            error = %e,
                            "Failed to release lock (will expire via TTL)"
                        );
                    }
                }
            }
        }

        // Flush then close the destination; either failure aborts the stop.
        let mut dest = self.destination.lock().await;
        dest.flush()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;
        dest.close()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;

        *running = false;

        let mut locked_collections = self.locked_collections.write().await;
        locked_collections.clear();

        metrics::set_pipeline_status(metrics::PipelineStatus::Stopped);
        metrics::set_active_collections(0);
        metrics::set_locks_held(0);

        // Emit a final summary of lifetime counters.
        let stats = self.stats.read().await;
        info!(
            events_processed = stats.events_processed,
            batches_written = stats.batches_written,
            write_errors = stats.write_errors,
            retries = stats.retries,
            "Pipeline stopped"
        );

        Ok(())
    }
2017
2018 #[must_use]
2020 pub async fn stats(&self) -> PipelineStats {
2021 self.stats.read().await.clone()
2022 }
2023
2024 #[must_use]
2026 pub async fn is_running(&self) -> bool {
2027 *self.running.read().await
2028 }
2029}
2030
/// Validation errors returned when building a `PipelineConfig`.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// No MongoDB connection string was supplied to the builder.
    #[error("mongodb_uri is required")]
    MissingMongoUri,

    /// No database name was supplied to the builder.
    #[error("database is required")]
    MissingDatabase,

    /// The requested `batch_size` was rejected; `reason` explains why.
    #[error("Invalid batch_size: {value} ({reason})")]
    InvalidBatchSize { value: usize, reason: &'static str },

    /// The requested `batch_timeout` was rejected; `reason` explains why.
    #[error("Invalid batch_timeout: {reason}")]
    InvalidBatchTimeout { reason: &'static str },

    /// The initial retry delay exceeds the configured maximum, which would
    /// make the backoff cap meaningless.
    #[error("retry_delay ({retry_delay:?}) exceeds max_retry_delay ({max_retry_delay:?})")]
    RetryDelayExceedsMax {
        retry_delay: Duration,
        max_retry_delay: Duration,
    },

    /// The requested `channel_buffer_size` was rejected; `reason` explains why.
    #[error("Invalid channel_buffer_size: {value} ({reason})")]
    InvalidChannelBufferSize { value: usize, reason: &'static str },

    /// The distributed-lock settings were inconsistent or invalid.
    #[error("Invalid distributed lock config: {reason}")]
    InvalidLockConfig { reason: String },
}
2065
/// Runtime errors produced while starting, running, or stopping the pipeline.
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
    /// The pipeline was asked to start while it was already running.
    #[error("Pipeline is already running")]
    AlreadyRunning,

    /// A MongoDB driver operation failed (message from the driver).
    #[error("MongoDB error: {0}")]
    MongoDB(String),

    /// The change stream itself reported an error.
    #[error("Change stream error: {0}")]
    ChangeStream(String),

    /// Writing to or managing the destination failed.
    #[error("Destination error: {0}")]
    Destination(String),

    /// Persisting or loading state (e.g. resume tokens, locks) failed.
    #[error("State store error: {0}")]
    StateStore(String),

    /// The pipeline configuration was invalid at runtime.
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// Catch-all for errors that fit no other variant.
    #[error("Pipeline error: {0}")]
    Other(String),

    /// The distributed lock for a collection was lost mid-run (e.g. TTL
    /// expiry or takeover by another instance).
    #[error("Lock lost for collection '{collection}': {reason}")]
    LockLost { collection: String, reason: String },
}
2101
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_watch_collections_builds_collection_level() {
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_collections(vec!["users".to_string(), "orders".to_string()])
            .build()
            .unwrap();

        // The watch level must carry exactly the two requested collections.
        match cfg.watch_level {
            WatchLevel::Collection(cols) => {
                assert_eq!(cols.len(), 2);
                assert!(cols.iter().any(|c| c.as_str() == "users"));
                assert!(cols.iter().any(|c| c.as_str() == "orders"));
            }
            _ => panic!("expected WatchLevel::Collection"),
        }
    }

    #[test]
    fn test_watch_database_builds_database_level() {
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_database()
            .build()
            .unwrap();

        match cfg.watch_level {
            WatchLevel::Database => {}
            _ => panic!("expected WatchLevel::Database"),
        }
    }

    #[test]
    fn test_watch_deployment_builds_deployment_level() {
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_deployment()
            .build()
            .unwrap();

        match cfg.watch_level {
            WatchLevel::Deployment => {}
            _ => panic!("expected WatchLevel::Deployment"),
        }
    }

    #[test]
    fn test_default_watch_level_is_database() {
        // No watch_* call at all: the builder should default to Database.
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .build()
            .unwrap();

        match cfg.watch_level {
            WatchLevel::Database => {}
            _ => panic!("expected default WatchLevel::Database"),
        }
    }

    #[test]
    #[allow(deprecated)]
    fn test_deprecated_collections_method_still_works() {
        // The deprecated `collections` setter must still yield Collection level.
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .collections(vec!["users".to_string()])
            .build()
            .unwrap();

        match cfg.watch_level {
            WatchLevel::Collection(cols) => {
                assert_eq!(cols.len(), 1);
                assert_eq!(cols[0], "users");
            }
            _ => panic!("expected WatchLevel::Collection"),
        }
    }

    #[test]
    fn test_watch_level_can_be_overridden() {
        // When several watch_* setters are chained, the last one wins.
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .watch_collections(vec!["users".to_string()])
            .watch_database()
            .build()
            .unwrap();

        match cfg.watch_level {
            WatchLevel::Database => {}
            _ => panic!("expected WatchLevel::Database after override"),
        }
    }

    #[test]
    fn test_config_builder_defaults() {
        let cfg = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .build()
            .unwrap();

        // Documented builder defaults.
        assert_eq!(cfg.batch_size, 100);
        assert_eq!(cfg.batch_timeout, Duration::from_secs(5));
        assert_eq!(cfg.max_retries, 0);
        assert_eq!(cfg.retry_delay, Duration::from_millis(100));
        assert_eq!(cfg.max_retry_delay, Duration::from_secs(30));
        assert_eq!(cfg.channel_buffer_size, 1000);
        match cfg.watch_level {
            WatchLevel::Database => {}
            _ => panic!("expected default WatchLevel::Database"),
        }
    }

    #[test]
    fn test_config_builder_validates_batch_size() {
        let built = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .batch_size(20_000)
            .build();

        match built {
            Err(ConfigError::InvalidBatchSize { .. }) => {}
            Ok(_) => panic!("batch_size of 20_000 should be rejected"),
            Err(_) => panic!("expected ConfigError::InvalidBatchSize"),
        }
    }

    #[test]
    fn test_config_builder_validates_channel_buffer_size() {
        let built = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .channel_buffer_size(5)
            .build();

        match built {
            Err(ConfigError::InvalidChannelBufferSize { .. }) => {}
            Ok(_) => panic!("channel_buffer_size of 5 should be rejected"),
            Err(_) => panic!("expected ConfigError::InvalidChannelBufferSize"),
        }
    }

    #[test]
    fn test_config_builder_requires_mongodb_uri() {
        let built = PipelineConfig::builder().database("testdb").build();

        match built {
            Err(ConfigError::MissingMongoUri) => {}
            Ok(_) => panic!("building without a URI should fail"),
            Err(_) => panic!("expected ConfigError::MissingMongoUri"),
        }
    }

    #[test]
    fn test_config_builder_requires_database() {
        let built = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .build();

        match built {
            Err(ConfigError::MissingDatabase) => {}
            Ok(_) => panic!("building without a database should fail"),
            Err(_) => panic!("expected ConfigError::MissingDatabase"),
        }
    }

    #[test]
    fn test_retry_delay_validation() {
        // retry_delay larger than max_retry_delay must be rejected.
        let built = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("testdb")
            .retry_delay(Duration::from_secs(60))
            .max_retry_delay(Duration::from_secs(30))
            .build();

        match built {
            Err(ConfigError::RetryDelayExceedsMax { .. }) => {}
            Ok(_) => panic!("retry_delay > max_retry_delay should be rejected"),
            Err(_) => panic!("expected ConfigError::RetryDelayExceedsMax"),
        }
    }
}