use crate::destination::{Destination, DestinationError};
use crate::event::ChangeEvent;
use crate::metrics;
use crate::state::StateStore;
use crate::stream::{ChangeStreamConfig, ChangeStreamListener};
use futures::StreamExt;
use mongodb::bson::Document;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::{interval, Instant};
use tracing::{debug, error, info, instrument, warn};

/// Configuration for a [`Pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// MongoDB connection URI.
    pub mongodb_uri: String,

    /// Database to watch.
    pub database: String,

    /// Collections to watch; the pipeline currently requires at least one
    /// explicit collection.
    pub collections: Vec<String>,

    /// Maximum number of events per batch (default 100, maximum 10,000).
    pub batch_size: usize,

    /// Maximum time to wait before flushing a partial batch (default 5s).
    pub batch_timeout: Duration,

    /// Maximum number of write retries before a batch write fails
    /// (default 0, i.e. no retries).
    pub max_retries: usize,

    /// Initial delay between write retries (default 100ms).
    pub retry_delay: Duration,

    /// Upper bound on the exponential backoff delay (default 30s).
    pub max_retry_delay: Duration,

    /// Buffer size for internal channels (default 1000, minimum 10).
    pub channel_buffer_size: usize,

    /// Configuration for the underlying change stream.
    pub stream_config: ChangeStreamConfig,
}

impl PipelineConfig {
    /// Returns a builder for constructing a [`PipelineConfig`].
    #[must_use]
    pub fn builder() -> PipelineConfigBuilder {
        PipelineConfigBuilder::default()
    }
}

/// Builder for [`PipelineConfig`].
#[derive(Debug, Default)]
pub struct PipelineConfigBuilder {
    mongodb_uri: Option<String>,
    database: Option<String>,
    collections: Vec<String>,
    batch_size: usize,
    batch_timeout: Duration,
    max_retries: usize,
    retry_delay: Duration,
    max_retry_delay: Duration,
    channel_buffer_size: usize,
    stream_config: Option<ChangeStreamConfig>,
}

impl PipelineConfigBuilder {
    #[must_use]
    pub fn mongodb_uri(mut self, uri: impl Into<String>) -> Self {
        self.mongodb_uri = Some(uri.into());
        self
    }

    #[must_use]
    pub fn database(mut self, database: impl Into<String>) -> Self {
        self.database = Some(database.into());
        self
    }

    #[must_use]
    pub fn collections(mut self, collections: Vec<String>) -> Self {
        self.collections = collections;
        self
    }

    #[must_use]
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    #[must_use]
    pub fn batch_timeout(mut self, timeout: Duration) -> Self {
        self.batch_timeout = timeout;
        self
    }

    #[must_use]
    pub fn max_retries(mut self, retries: usize) -> Self {
        self.max_retries = retries;
        self
    }

    #[must_use]
    pub fn retry_delay(mut self, delay: Duration) -> Self {
        self.retry_delay = delay;
        self
    }

    #[must_use]
    pub fn max_retry_delay(mut self, delay: Duration) -> Self {
        self.max_retry_delay = delay;
        self
    }

    #[must_use]
    pub fn channel_buffer_size(mut self, size: usize) -> Self {
        self.channel_buffer_size = size;
        self
    }

    #[must_use]
    pub fn stream_config(mut self, config: ChangeStreamConfig) -> Self {
        self.stream_config = Some(config);
        self
    }

    /// Validates the builder's settings, applies defaults for unset fields,
    /// and produces a [`PipelineConfig`].
    ///
    /// # Errors
    ///
    /// Returns a [`ConfigError`] if a required field is missing or a value is
    /// out of range.
    pub fn build(self) -> Result<PipelineConfig, ConfigError> {
        let mongodb_uri = self.mongodb_uri.ok_or(ConfigError::MissingMongoUri)?;
        let database = self.database.ok_or(ConfigError::MissingDatabase)?;

        let batch_size = match self.batch_size {
            0 => 100,
            size if size > 10_000 => {
                return Err(ConfigError::InvalidBatchSize {
                    value: size,
                    reason: "batch_size exceeds maximum (10,000)",
                })
            }
            size => size,
        };

        let batch_timeout = if self.batch_timeout.is_zero() {
            Duration::from_secs(5)
        } else {
            self.batch_timeout
        };

        let retry_delay = if self.retry_delay.is_zero() {
            Duration::from_millis(100)
        } else {
            self.retry_delay
        };

        let max_retry_delay = if self.max_retry_delay.is_zero() {
            Duration::from_secs(30)
        } else {
            self.max_retry_delay
        };

        if retry_delay > max_retry_delay {
            return Err(ConfigError::RetryDelayExceedsMax {
                retry_delay,
                max_retry_delay,
            });
        }

        let channel_buffer_size = match self.channel_buffer_size {
            0 => 1000,
            size if size < 10 => {
                return Err(ConfigError::InvalidChannelBufferSize {
                    value: size,
                    reason: "channel_buffer_size must be at least 10",
                })
            }
            size => size,
        };

        let stream_config = self.stream_config.unwrap_or_else(|| {
            ChangeStreamConfig::builder()
                .build()
                .expect("Default stream config should build")
        });

        Ok(PipelineConfig {
            mongodb_uri,
            database,
            collections: self.collections,
            batch_size,
            batch_timeout,
            max_retries: self.max_retries,
            retry_delay,
            max_retry_delay,
            channel_buffer_size,
            stream_config,
        })
    }
}

/// Counters reporting the pipeline's progress.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total number of change events written to the destination.
    pub events_processed: u64,

    /// Number of batches successfully written.
    pub batches_written: u64,

    /// Number of failed write attempts.
    pub write_errors: u64,

    /// Number of write retries performed.
    pub retries: u64,
}

type WorkerHandle = JoinHandle<Result<(), PipelineError>>;

/// Streams change events from MongoDB collections into a [`Destination`],
/// persisting resume tokens in a [`StateStore`] so a restarted pipeline can
/// pick up where it left off.
pub struct Pipeline<S: StateStore, D: Destination> {
    config: PipelineConfig,

    store: Arc<S>,

    destination: Arc<Mutex<D>>,

    shutdown_tx: Option<broadcast::Sender<()>>,

    workers: Arc<RwLock<Vec<WorkerHandle>>>,

    stats: Arc<RwLock<PipelineStats>>,

    running: Arc<RwLock<bool>>,
}

impl<S: StateStore + Send + Sync + 'static, D: Destination + Send + Sync + 'static> Pipeline<S, D> {
    /// Creates a new pipeline from the given configuration, state store, and
    /// destination. No work is started until [`start`](Self::start) is called.
    pub async fn new(
        config: PipelineConfig,
        store: S,
        destination: D,
    ) -> Result<Self, PipelineError> {
        info!(
            database = %config.database,
            collections = ?config.collections,
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Creating pipeline"
        );

        Ok(Self {
            config,
            store: Arc::new(store),
            destination: Arc::new(Mutex::new(destination)),
            shutdown_tx: None,
            workers: Arc::new(RwLock::new(Vec::new())),
            stats: Arc::new(RwLock::new(PipelineStats::default())),
            running: Arc::new(RwLock::new(false)),
        })
    }

    /// Starts the pipeline, spawning one worker task per configured
    /// collection.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::AlreadyRunning`] if the pipeline has already
    /// been started, or [`PipelineError::Configuration`] if no collections
    /// were specified.
    #[instrument(skip(self), fields(database = %self.config.database))]
    pub async fn start(&mut self) -> Result<(), PipelineError> {
        let mut running = self.running.write().await;
        if *running {
            return Err(PipelineError::AlreadyRunning);
        }

        if self.config.collections.is_empty() {
            return Err(PipelineError::Configuration(
                "Watching all collections is not yet implemented. \
                 Please specify explicit collections in the configuration."
                    .to_string(),
            ));
        }

        info!("Starting pipeline");

        let (shutdown_tx, _) = broadcast::channel(1);
        self.shutdown_tx = Some(shutdown_tx.clone());

        let collections = self.config.collections.clone();
        let num_collections = collections.len();

        let mut workers = self.workers.write().await;
        for collection in collections {
            let shutdown_rx = shutdown_tx.subscribe();
            let worker = self
                .spawn_collection_worker(collection.clone(), shutdown_rx)
                .await?;

            workers.push(worker);
        }

        *running = true;
        info!(workers = workers.len(), "Pipeline started");

        metrics::set_pipeline_status(metrics::PipelineStatus::Running);
        metrics::set_active_collections(num_collections);

        Ok(())
    }

    async fn spawn_collection_worker(
        &self,
        collection: String,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<WorkerHandle, PipelineError> {
        let config = self.config.clone();
        let store = Arc::clone(&self.store);
        let destination = Arc::clone(&self.destination);
        let stats = Arc::clone(&self.stats);

        let collection_name = if collection.is_empty() {
            "all".to_string()
        } else {
            collection
        };

        let handle = tokio::spawn(async move {
            Self::collection_worker(
                collection_name,
                config,
                store,
                destination,
                stats,
                shutdown_rx,
            )
            .await
        });

        Ok(handle)
    }

    #[allow(clippy::too_many_lines)]
    #[instrument(skip(config, store, destination, stats, shutdown_rx), fields(collection = %collection))]
    async fn collection_worker(
        collection: String,
        config: PipelineConfig,
        store: Arc<S>,
        destination: Arc<Mutex<D>>,
        stats: Arc<RwLock<PipelineStats>>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<(), PipelineError> {
        info!("Starting collection worker");

        let resume_token = store
            .get_resume_token(&collection)
            .await
            .map_err(|e| PipelineError::StateStore(e.to_string()))?;

        if let Some(ref token) = resume_token {
            info!(?token, "Resuming from saved token");
        }

        let client = mongodb::Client::with_uri_str(&config.mongodb_uri)
            .await
            .map_err(|e| PipelineError::MongoDB(e.to_string()))?;

        let db = client.database(&config.database);

        let mongo_collection = if !collection.is_empty() && collection != "all" {
            db.collection(&collection)
        } else {
            return Err(PipelineError::Configuration(
                "Watching all collections not yet implemented".to_string(),
            ));
        };

        let store_clone = Arc::clone(&store);
        let collection_clone = collection.clone();
        let resume_token_callback = move |token: Document| {
            let store = Arc::clone(&store_clone);
            let coll = collection_clone.clone();
            Box::pin(async move {
                store
                    .save_resume_token(&coll, &token)
                    .await
                    .map_err(|e| e.to_string())
            }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
        };

        let mut listener = ChangeStreamListener::new(
            mongo_collection,
            config.stream_config.clone(),
            resume_token_callback,
        )
        .await
        .map_err(|e| PipelineError::ChangeStream(e.to_string()))?;

        let mut batch: Vec<ChangeEvent> = Vec::with_capacity(config.batch_size);
        let mut last_resume_token: Option<Document> = None;

        let mut batch_timer = interval(config.batch_timeout);
        batch_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        info!(
            batch_size = config.batch_size,
            batch_timeout = ?config.batch_timeout,
            "Worker event loop started"
        );

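        // The worker multiplexes three sources of work in the loop below:
        //   * a shutdown signal, which flushes any pending batch and exits;
        //   * the batch timer, which flushes a partial batch after
        //     `batch_timeout` so events do not sit in memory indefinitely;
        //   * the change stream itself, where each event is acked, appended
        //     to the batch, and the batch is flushed once it reaches
        //     `batch_size`.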
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    info!("Received shutdown signal");

                    if !batch.is_empty() {
                        info!(batch_size = batch.len(), "Flushing pending batch on shutdown");
                        if let Err(e) = Self::flush_batch(
                            &collection,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on shutdown");
                        }
                    }

                    info!("Worker shutting down gracefully");
                    break;
                }

                _ = batch_timer.tick() => {
                    if !batch.is_empty() {
                        debug!(batch_size = batch.len(), "Batch timeout - flushing");

                        if let Err(e) = Self::flush_batch(
                            &collection,
                            &mut batch,
                            last_resume_token.as_ref(),
                            &destination,
                            &store,
                            &stats,
                            &config,
                        )
                        .await
                        {
                            error!(?e, "Failed to flush batch on timeout");
                        }
                    }
                }

                event_result = listener.next() => {
                    match event_result {
                        Some(Ok(ackable_event)) => {
                            let event = ackable_event.event.clone();

                            debug!(
                                operation = ?event.operation,
                                collection = %event.namespace.collection,
                                "Received event"
                            );

                            last_resume_token = Some(event.resume_token.clone());

                            ackable_event.ack();

                            batch.push(event.clone());

                            metrics::increment_batch_queue_size(&collection);

                            if batch.len() >= config.batch_size {
                                debug!(batch_size = batch.len(), "Batch full - flushing");

                                if let Err(e) = Self::flush_batch(
                                    &collection,
                                    &mut batch,
                                    last_resume_token.as_ref(),
                                    &destination,
                                    &store,
                                    &stats,
                                    &config,
                                )
                                .await
                                {
                                    error!(?e, "Failed to flush full batch");
                                }
                            }
                        }
                        Some(Err(e)) => {
                            error!(?e, "Error reading from change stream");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        None => {
                            warn!("Change stream ended unexpectedly");
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

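    // `flush_batch` below performs its steps in this order: write the batch
    // to the destination (with retries), record batch metrics, persist the
    // most recent resume token, bump per-operation event counters and the
    // shared stats, and finally clear the batch. Within this function the
    // resume token is saved only after the write succeeds; the stream
    // listener may also persist tokens independently through the callback
    // installed in `collection_worker`.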
    #[instrument(skip(batch, last_resume_token, destination, store, stats, config), fields(collection = %collection, batch_size = batch.len()))]
    async fn flush_batch(
        collection: &str,
        batch: &mut Vec<ChangeEvent>,
        last_resume_token: Option<&Document>,
        destination: &Arc<Mutex<D>>,
        store: &Arc<S>,
        stats: &Arc<RwLock<PipelineStats>>,
        config: &PipelineConfig,
    ) -> Result<(), PipelineError> {
        if batch.is_empty() {
            return Ok(());
        }

        let batch_size = batch.len();
        let start_time = Instant::now();

        debug!("Flushing batch to destination");

        metrics::record_batch_size(batch_size, collection);

        Self::write_with_retry(batch, destination, config, stats).await?;

        let elapsed = start_time.elapsed();
        info!(
            batch_size,
            elapsed_ms = elapsed.as_millis(),
            "Batch written successfully"
        );

        metrics::record_batch_duration(elapsed.as_secs_f64(), collection);

        if let Some(token) = last_resume_token {
            store
                .save_resume_token(collection, token)
                .await
                .map_err(|e| PipelineError::StateStore(e.to_string()))?;

            debug!("Resume token saved");
        }

        let mut operation_counts = std::collections::HashMap::new();
        for event in batch.iter() {
            *operation_counts.entry(&event.operation).or_insert(0u64) += 1;
        }
        for (operation, count) in operation_counts {
            metrics::increment_events_processed_by(count, collection, operation.as_str());
        }

        let mut s = stats.write().await;
        s.events_processed += batch_size as u64;
        s.batches_written += 1;

        metrics::decrement_batch_queue_size(batch_size, collection);

        batch.clear();

        Ok(())
    }

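    // Retry timing sketch, assuming the builder defaults applied in
    // `PipelineConfigBuilder::build` (retry_delay = 100ms, max_retry_delay =
    // 30s): successive retryable failures wait 100ms, 200ms, 400ms, 800ms,
    // 1.6s, 3.2s, ... doubling until the delay is capped at 30s. Note that
    // with the builder's default `max_retries` of 0, the first failed attempt
    // is returned as an error immediately, so retries must be enabled
    // explicitly.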
    #[instrument(skip(batch, destination, config, stats), fields(batch_size = batch.len()))]
    async fn write_with_retry(
        batch: &[ChangeEvent],
        destination: &Arc<Mutex<D>>,
        config: &PipelineConfig,
        stats: &Arc<RwLock<PipelineStats>>,
    ) -> Result<(), PipelineError> {
        let mut retry_delay = config.retry_delay;
        let mut attempt = 0;

        loop {
            let result = {
                let mut dest = destination.lock().await;
                match dest.write_batch(batch).await {
                    Ok(()) => dest.flush().await,
                    Err(e) => Err(e),
                }
            };

            match result {
                Ok(()) => {
                    if attempt > 0 {
                        info!(attempts = attempt + 1, "Write succeeded after retries");
                    }
                    return Ok(());
                }
                Err(e) => {
                    {
                        let mut s = stats.write().await;
                        s.write_errors += 1;
                    }

                    let error_category = Self::categorize_error(&e);
                    let destination_type = {
                        let dest = destination.lock().await;
                        dest.metadata().destination_type.clone()
                    };
                    metrics::increment_destination_errors(&destination_type, error_category);

                    attempt += 1;

                    if attempt > config.max_retries {
                        error!(attempts = attempt, ?e, "Write failed after max retries");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    if !Self::is_retryable_error(&e) {
                        error!(?e, "Non-retryable error encountered");
                        return Err(PipelineError::Destination(e.to_string()));
                    }

                    {
                        let mut s = stats.write().await;
                        s.retries += 1;
                    }

                    metrics::increment_retries(error_category);

                    warn!(
                        attempt,
                        max_retries = config.max_retries,
                        retry_delay_ms = retry_delay.as_millis(),
                        ?e,
                        "Write failed, retrying"
                    );

                    tokio::time::sleep(retry_delay).await;

                    retry_delay = std::cmp::min(retry_delay * 2, config.max_retry_delay);
                }
            }
        }
    }

    /// Heuristically decides, from the error message, whether a destination
    /// error is worth retrying.
    fn is_retryable_error(error: &DestinationError) -> bool {
        let message = error.to_string();
        message.contains("retryable") || message.contains("timeout")
    }

    /// Maps a destination error onto a metrics error category based on
    /// keywords in its message.
    fn categorize_error(error: &DestinationError) -> metrics::ErrorCategory {
        let error_str = error.to_string().to_lowercase();

        if error_str.contains("timeout") {
            metrics::ErrorCategory::Timeout
        } else if error_str.contains("connection") || error_str.contains("network") {
            metrics::ErrorCategory::Connection
        } else if error_str.contains("serialization") || error_str.contains("encode") {
            metrics::ErrorCategory::Serialization
        } else if error_str.contains("permission") || error_str.contains("auth") {
            metrics::ErrorCategory::Permission
        } else if error_str.contains("validation") {
            metrics::ErrorCategory::Validation
        } else if error_str.contains("not found") || error_str.contains("404") {
            metrics::ErrorCategory::NotFound
        } else if error_str.contains("rate limit") || error_str.contains("throttle") {
            metrics::ErrorCategory::RateLimit
        } else {
            metrics::ErrorCategory::Unknown
        }
    }

    /// Stops the pipeline: signals all workers to shut down, waits for them
    /// to flush any pending batches, then flushes and closes the destination.
    ///
    /// Calling `stop` on a pipeline that is not running is a no-op.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::Destination`] if the final flush or close of
    /// the destination fails.
    #[instrument(skip(self))]
    pub async fn stop(&mut self) -> Result<(), PipelineError> {
        info!("Stopping pipeline");

        let mut running = self.running.write().await;
        if !*running {
            warn!("Pipeline is not running");
            return Ok(());
        }

        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }

        let mut workers = self.workers.write().await;
        for worker in workers.drain(..) {
            match worker.await {
                Ok(Ok(())) => {
                    debug!("Worker stopped successfully");
                }
                Ok(Err(e)) => {
                    error!(?e, "Worker stopped with error");
                }
                Err(e) => {
                    error!(?e, "Worker panicked");
                }
            }
        }

        let mut dest = self.destination.lock().await;
        dest.flush()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;
        dest.close()
            .await
            .map_err(|e| PipelineError::Destination(e.to_string()))?;

        *running = false;

        metrics::set_pipeline_status(metrics::PipelineStatus::Stopped);
        metrics::set_active_collections(0);

        let stats = self.stats.read().await;
        info!(
            events_processed = stats.events_processed,
            batches_written = stats.batches_written,
            write_errors = stats.write_errors,
            retries = stats.retries,
            "Pipeline stopped"
        );

        Ok(())
    }

    /// Returns a snapshot of the pipeline's statistics.
    #[must_use]
    pub async fn stats(&self) -> PipelineStats {
        self.stats.read().await.clone()
    }

    /// Returns `true` if the pipeline is currently running.
    #[must_use]
    pub async fn is_running(&self) -> bool {
        *self.running.read().await
    }
}
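
// A rough usage sketch (not compiled here): `MyStateStore` and `MyDestination`
// are placeholders for whatever `StateStore` and `Destination` implementations
// the application provides.
//
//     let config = PipelineConfig::builder()
//         .mongodb_uri("mongodb://localhost:27017")
//         .database("app")
//         .collections(vec!["orders".to_string()])
//         .max_retries(5)
//         .build()?;
//
//     let mut pipeline = Pipeline::new(config, MyStateStore::new(), MyDestination::new()).await?;
//     pipeline.start().await?;
//     // ... run until shutdown is requested ...
//     pipeline.stop().await?;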
902
903#[derive(Debug, thiserror::Error)]
905pub enum ConfigError {
906 #[error("mongodb_uri is required")]
908 MissingMongoUri,
909
910 #[error("database is required")]
912 MissingDatabase,
913
914 #[error("Invalid batch_size: {value} ({reason})")]
916 InvalidBatchSize { value: usize, reason: &'static str },
917
918 #[error("Invalid batch_timeout: {reason}")]
920 InvalidBatchTimeout { reason: &'static str },
921
922 #[error("retry_delay ({retry_delay:?}) exceeds max_retry_delay ({max_retry_delay:?})")]
924 RetryDelayExceedsMax {
925 retry_delay: Duration,
926 max_retry_delay: Duration,
927 },
928
929 #[error("Invalid channel_buffer_size: {value} ({reason})")]
931 InvalidChannelBufferSize { value: usize, reason: &'static str },
932}
933
934#[derive(Debug, thiserror::Error)]
936pub enum PipelineError {
937 #[error("Pipeline is already running")]
939 AlreadyRunning,
940
941 #[error("MongoDB error: {0}")]
943 MongoDB(String),
944
945 #[error("Change stream error: {0}")]
947 ChangeStream(String),
948
949 #[error("Destination error: {0}")]
951 Destination(String),
952
953 #[error("State store error: {0}")]
955 StateStore(String),
956
957 #[error("Configuration error: {0}")]
959 Configuration(String),
960
961 #[error("Pipeline error: {0}")]
963 Other(String),
964}
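
// A small, self-contained sketch of how `PipelineConfigBuilder::build` fills
// in defaults and rejects out-of-range values; the module and test names are
// illustrative rather than part of an existing test suite.
#[cfg(test)]
mod config_builder_tests {
    use super::*;

    #[test]
    fn build_applies_defaults_for_unset_fields() {
        let config = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("app")
            .collections(vec!["orders".to_string()])
            .build()
            .expect("minimal config should build");

        // Defaults come from the zero-value handling in `build()`.
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.batch_timeout, Duration::from_secs(5));
        assert_eq!(config.retry_delay, Duration::from_millis(100));
        assert_eq!(config.max_retry_delay, Duration::from_secs(30));
        assert_eq!(config.channel_buffer_size, 1000);
        assert_eq!(config.max_retries, 0);
    }

    #[test]
    fn build_rejects_out_of_range_values() {
        // batch_size above the 10,000 cap is rejected.
        let err = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("app")
            .batch_size(10_001)
            .build()
            .unwrap_err();
        assert!(matches!(err, ConfigError::InvalidBatchSize { value: 10_001, .. }));

        // retry_delay must not exceed max_retry_delay.
        let err = PipelineConfig::builder()
            .mongodb_uri("mongodb://localhost:27017")
            .database("app")
            .retry_delay(Duration::from_secs(60))
            .max_retry_delay(Duration::from_secs(30))
            .build()
            .unwrap_err();
        assert!(matches!(err, ConfigError::RetryDelayExceedsMax { .. }));
    }
}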