rigatoni_core/stream.rs
1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! `MongoDB` Change Stream Wrapper
18//!
19//! This module provides an ergonomic async wrapper around `MongoDB` change streams with:
20//! - Automatic resume token management
21//! - Reconnection with exponential backoff
22//! - Graceful error handling
23//! - Pipeline filtering support
24//! - Pre/post-image configuration
25//!
26//! # Architecture
27//!
28//! The [`ChangeStreamListener`] implements the [`Stream`] trait from `futures`,
29//! making it composable with other async stream utilities. It wraps MongoDB's
30//! native change stream and adds production-ready reliability features.
31//!
32//! ## Resume Token Flow
33//!
34//! ```text
35//! ┌─────────────────┐
36//! │ MongoDB Cluster │
37//! └────────┬────────┘
38//! │ Change Events
39//! ▼
40//! ┌──────────────────────┐
41//! │ ChangeStreamListener │◄─── Resume Token Callback
42//! └──────────────────────┘ (persists to storage)
43//! │
44//! │ ChangeEvent
45//! ▼
46//! ┌─────────────────┐
47//! │ Pipeline Stage │
48//! └─────────────────┘
49//! ```
50//!
51//! ## Reconnection Algorithm
52//!
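//! When a retryable error occurs:
//! 1. Take the resume token of the last *acknowledged* event (see [`AckableEvent::ack`]),
//!    falling back to the configured `resume_token` if nothing has been acknowledged yet
//! 2. Wait with exponential backoff before each attempt: 100ms, 200ms, 400ms, ...
//!    (with the default `initial_backoff_ms`; each delay is randomized by `backoff_jitter`)
//! 3. Cap the delay at `max_backoff_ms`
//! 4. Give up with [`StreamError::MaxReconnectAttemptsExceeded`] after
//!    `max_reconnect_attempts` attempts (0 = retry forever)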
58//!
59//! # Examples
60//!
61//! ## Basic Usage
62//!
63//! ```rust,no_run
64//! use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
65//! use mongodb::{Client, options::ClientOptions};
66//! use futures::StreamExt;
67//!
68//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
69//! // Connect to MongoDB
70//! let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
71//! let client = Client::with_options(client_options)?;
72//! let collection = client.database("mydb").collection("users");
73//!
74//! // Configure change stream
75//! let config = ChangeStreamConfig::builder()
76//! .full_document_before_change() // Enable pre-images
77//! .full_document_update_lookup() // Include full document on updates
78//! .max_reconnect_attempts(10)
79//! .build()?;
80//!
81//! // Create listener with resume token callback
82//! let mut listener = ChangeStreamListener::new(
83//! collection,
84//! config,
85//! |token| {
86//! // Persist resume token to storage
87//! println!("Saving resume token: {:?}", token);
88//! Box::pin(async { Ok(()) })
89//! },
90//! ).await?;
91//!
92//! // Consume events
93//! while let Some(result) = listener.next().await {
94//! match result {
95//! Ok(ackable) => {
96//! let event = ackable.event_ref();
97//! println!("Received: {:?}", event.operation);
98//! ackable.ack(); // Acknowledge after processing
99//! }
100//! Err(e) => {
101//! eprintln!("Stream error: {}", e);
102//! break;
103//! }
104//! }
105//! }
106//! # Ok(())
107//! # }
108//! ```
109//!
110//! ## With Pipeline Filtering
111//!
112//! ```rust,no_run
113//! use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
114//! use bson::doc;
115//! use futures::StreamExt;
116//! # use mongodb::{Client, options::ClientOptions};
117//!
118//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
119//! # let client = Client::with_options(ClientOptions::parse("mongodb://localhost:27017").await?)?;
120//! # let collection = client.database("mydb").collection("users");
121//!
122//! // Only watch insert and update operations
123//! let pipeline = vec![
124//! doc! {
125//! "$match": {
126//! "operationType": { "$in": ["insert", "update"] }
127//! }
128//! }
129//! ];
130//!
131//! let config = ChangeStreamConfig::builder()
132//! .pipeline(pipeline)
133//! .build()?;
134//!
135//! let mut listener = ChangeStreamListener::new(
136//! collection,
137//! config,
138//! |_| Box::pin(async { Ok(()) }),
139//! ).await?;
140//!
141//! while let Some(result) = listener.next().await {
142//! let ackable = result?;
143//! ackable.ack();
144//! // Only insert/update events will be received
145//! }
146//! # Ok(())
147//! # }
148//! ```
149//!
150//! ## Multi-Collection Scenario
151//!
152//! ```rust,no_run
153//! use rigatoni_core::stream::ChangeStreamListener;
154//! use futures::StreamExt;
155//! use tokio::task::JoinSet;
156//! # use mongodb::{Client, options::ClientOptions};
157//! # use rigatoni_core::stream::ChangeStreamConfig;
158//!
159//! # async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
160//! # let client = Client::with_options(ClientOptions::parse("mongodb://localhost:27017").await?)?;
161//! let db = client.database("mydb");
162//!
163//! let mut tasks = JoinSet::new();
164//!
165//! // Watch multiple collections concurrently
166//! for collection_name in &["users", "orders", "products"] {
167//! let collection = db.collection(collection_name);
168//! let config = ChangeStreamConfig::default();
169//!
170//! tasks.spawn(async move {
171//! let mut listener = ChangeStreamListener::new(
172//! collection,
173//! config,
174//! |_| Box::pin(async { Ok(()) }),
175//! ).await?;
176//!
177//! while let Some(result) = listener.next().await {
178//! let ackable = result?;
179//! println!("{}: {:?}", collection_name, ackable.event_ref().operation);
180//! ackable.ack();
181//! }
182//!
183//! Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
184//! });
185//! }
186//!
187//! // Wait for all streams
188//! while let Some(result) = tasks.join_next().await {
189//! result??;
190//! }
191//! # Ok(())
192//! # }
193//! ```
194
195use crate::event::{ChangeEvent, ConversionError};
196use bson::Document;
197use futures::Stream;
198use mongodb::{
199 change_stream::event::{ChangeStreamEvent, ResumeToken},
200 error::{Error as MongoError, ErrorKind as MongoErrorKind},
201 options::ChangeStreamOptions,
202 Collection,
203};
204use std::{
205 future::Future,
206 pin::Pin,
207 sync::{Arc, Mutex},
208 task::{Context, Poll},
209 time::Duration,
210};
211use thiserror::Error;
212use tokio::sync::mpsc;
213use tracing::{debug, error, info, warn};
214
215/// Errors that can occur during change stream operations.
216#[derive(Debug, Error)]
217pub enum StreamError {
218 /// MongoDB connection error (may be retryable)
219 #[error("Connection error: {message}")]
220 Connection {
221 message: String,
222 #[source]
223 source: Option<Box<dyn std::error::Error + Send + Sync>>,
224 /// MongoDB error code
225 code: Option<i32>,
226 /// MongoDB error labels (e.g., "RetryableWriteError")
227 labels: Vec<String>,
228 },
229
230 /// Failed to convert MongoDB event to ChangeEvent
231 #[error("Event conversion failed: {0}")]
232 Conversion(#[from] ConversionError),
233
234 /// Resume token persistence failed
235 #[error("Resume token persistence failed: {0}")]
236 ResumeTokenPersistence(String),
237
238 /// Stream was invalidated (collection dropped/renamed)
239 #[error("Stream invalidated: {reason}")]
240 Invalidated { reason: String },
241
242 /// Maximum reconnection attempts exceeded
243 #[error("Max reconnection attempts ({0}) exceeded")]
244 MaxReconnectAttemptsExceeded(u32),
245
246 /// Resume token is invalid or oplog truncated (error code 286)
247 #[error("Invalid resume token (code {code}): oplog may be truncated")]
248 InvalidResumeToken { code: i32 },
249
250 /// Configuration error
251 #[error("Configuration error: {0}")]
252 Configuration(String),
253}
254
255impl From<MongoError> for StreamError {
256 fn from(err: MongoError) -> Self {
257 Self::from_mongo_error(err)
258 }
259}
260
261impl StreamError {
262 /// Creates a StreamError from a MongoDB error with proper classification.
263 ///
264 /// This method checks error codes and labels for accurate categorization.
265 pub fn from_mongo_error(err: MongoError) -> Self {
266 // Extract error code
267 let code = match err.kind.as_ref() {
268 MongoErrorKind::Command(cmd_err) => Some(cmd_err.code),
269 _ => None,
270 };
271
272 // Check for invalid resume token (code 286: ChangeStreamFatalError)
273 if code == Some(286) {
274 return Self::InvalidResumeToken { code: 286 };
275 }
276
277 // Extract labels from the error
278 // Note: In MongoDB Rust driver, labels() method returns &HashSet<String>
279 let labels: Vec<String> = err.labels().iter().cloned().collect();
280
281 Self::Connection {
282 message: err.to_string(),
283 source: Some(Box::new(err)),
284 code,
285 labels,
286 }
287 }
288
289 /// Returns true if this error is retryable with reconnection.
290 ///
291 /// Uses MongoDB error codes and labels for accurate classification:
292 /// - Error labels: RetryableWriteError, TransientTransactionError, NetworkError
293 /// - Transient error codes: 6, 7, 43, 89, 91, 10107, 11600, 11602, 13435, 13436
294 ///
295 /// Non-retryable errors:
296 /// - Invalid resume token (286)
297 /// - Stream invalidated (collection dropped)
298 /// - Authentication/authorization errors
299 #[must_use]
300 pub fn is_retryable(&self) -> bool {
301 match self {
302 Self::Connection { code, labels, .. } => {
303 // Check error labels first (most reliable)
304 if labels.iter().any(|l| {
305 l == "RetryableWriteError"
306 || l == "TransientTransactionError"
307 || l == "NetworkError"
308 }) {
309 return true;
310 }
311
312 // Check specific transient error codes
313 if let Some(c) = code {
314 matches!(
315 c,
316 // Network errors
317 6 | // HostUnreachable
318 7 | // HostNotFound
319 89 | // NetworkTimeout
320 91 | // ShutdownInProgress
321 // Replication errors (transient during failover)
322 10107 | // NotMaster / NotPrimary
323 11600 | // InterruptedAtShutdown
324 11602 | // InterruptedDueToReplStateChange
325 13435 | // NotMasterNoSlaveOk
326 13436 | // NotMasterOrSecondary / NotPrimaryOrSecondary
327 // Cursor errors (can retry with resume token)
328 43 // CursorNotFound
329 )
330 } else {
331 // No code available, be conservative
332 false
333 }
334 }
335 Self::Invalidated { .. } => false,
336 Self::InvalidResumeToken { .. } => false,
337 Self::MaxReconnectAttemptsExceeded(_) => false,
338 Self::Conversion(_) => false,
339 Self::ResumeTokenPersistence(_) => false,
340 Self::Configuration(_) => false,
341 }
342 }
343
344 /// Returns the error category for metrics/logging.
345 #[must_use]
346 pub fn category(&self) -> &'static str {
347 match self {
348 Self::Connection { .. } => "connection",
349 Self::Conversion(_) => "conversion",
350 Self::ResumeTokenPersistence(_) => "persistence",
351 Self::Invalidated { .. } => "invalidated",
352 Self::MaxReconnectAttemptsExceeded(_) => "max_retries",
353 Self::InvalidResumeToken { .. } => "invalid_token",
354 Self::Configuration(_) => "configuration",
355 }
356 }
357}
358
359/// Event with acknowledgment capability for correct resume token semantics.
360///
361/// This type ensures that resume tokens are persisted AFTER the user successfully
362/// processes the event, providing at-least-once delivery semantics.
363///
364/// # Important
365///
366/// You MUST call `.ack()` after successfully processing the event. If you don't
367/// call `.ack()`, the resume token will not be persisted, and the stream cannot
368/// reliably resume after a crash.
369///
370/// # Examples
371///
372/// ```rust,no_run
373/// use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
374/// use futures::StreamExt;
375/// # use mongodb::{Client, options::ClientOptions};
376///
377/// # async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
378/// # let client = Client::with_options(ClientOptions::parse("mongodb://localhost:27017").await?)?;
379/// # let collection = client.database("mydb").collection("test");
380/// let mut listener = ChangeStreamListener::new(
381/// collection,
382/// ChangeStreamConfig::default(),
383/// |_| Box::pin(async { Ok(()) }),
384/// ).await?;
385///
386/// while let Some(ackable) = listener.next().await {
387/// let ackable = ackable?;
388///
389/// // Access the event
390/// let event = ackable.event_ref();
391/// println!("Processing: {:?}", event.operation);
392///
393/// // Process the event
394/// process_event(event).await?;
395///
396/// // IMPORTANT: Acknowledge after successful processing
397/// ackable.ack();
398/// }
399/// # Ok(())
400/// # }
401/// # async fn process_event(event: &rigatoni_core::event::ChangeEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { Ok(()) }
402/// ```
403#[derive(Debug, Clone)]
404pub struct AckableEvent {
405 /// The change event
406 pub event: ChangeEvent,
407
408 /// Resume token for this event
409 resume_token: Document,
410
411 /// Channel to send ack
412 ack_sender: mpsc::UnboundedSender<Document>,
413
414 /// Shared reference to last resume token (updated on ack)
415 last_resume_token: Arc<Mutex<Option<Document>>>,
416}
417
418impl AckableEvent {
419 /// Acknowledge processing of this event, persisting the resume token.
420 ///
421 /// Call this AFTER successfully processing the event to ensure at-least-once semantics.
422 /// The token will be sent to a background persistence task asynchronously.
423 ///
424 /// This method also updates the internal last_resume_token used for reconnection,
425 /// ensuring that reconnection only resumes from successfully processed events.
426 ///
427 /// If the persistence task has stopped, this will log a warning but not fail.
428 pub fn ack(self) {
429 // Update last resume token for reconnection (only after user confirms processing)
430 if let Ok(mut last_token) = self.last_resume_token.lock() {
431 *last_token = Some(self.resume_token.clone());
432 }
433
434 // Send token for persistence
435 if self.ack_sender.send(self.resume_token).is_err() {
436 warn!("Failed to send ack - persistence task may have stopped");
437 }
438 }
439
440 /// Get a reference to the change event without consuming the `AckableEvent`.
441 ///
442 /// Use this to access the event data while keeping the ability to call `.ack()` later.
443 #[must_use]
444 pub fn event_ref(&self) -> &ChangeEvent {
445 &self.event
446 }
447
448 /// Consume the `AckableEvent` and return the inner `ChangeEvent`.
449 ///
450 /// # Warning
451 ///
452 /// After calling this, you cannot call `.ack()`. Only use this if you don't
453 /// want to persist the resume token.
454 #[must_use]
455 pub fn into_event(self) -> ChangeEvent {
456 self.event
457 }
458}
459
460/// Configuration for change stream behavior.
461///
462/// Use [`ChangeStreamConfigBuilder`] to construct instances:
463///
464/// ```rust
465/// use rigatoni_core::stream::ChangeStreamConfig;
466///
467/// let config = ChangeStreamConfig::builder()
468/// .full_document_update_lookup()
469/// .max_reconnect_attempts(10)
470/// .build();
471/// ```
472#[derive(Debug, Clone)]
473pub struct ChangeStreamConfig {
474 /// MongoDB aggregation pipeline to filter events
475 pub pipeline: Vec<Document>,
476
477 /// Whether to include full document for update operations
478 pub full_document_on_update: bool,
479
480 /// Whether to include document before change (requires MongoDB 6.0+)
481 pub full_document_before_change: bool,
482
483 /// Initial backoff in milliseconds
484 pub initial_backoff_ms: u64,
485
486 /// Maximum backoff in milliseconds
487 pub max_backoff_ms: u64,
488
489 /// Maximum number of reconnection attempts (0 = infinite)
490 pub max_reconnect_attempts: u32,
491
492 /// Batch size for fetching events
493 pub batch_size: Option<u32>,
494
495 /// Resume token to start from (if any)
496 pub resume_token: Option<Document>,
497
498 /// Backoff jitter factor (0.0 to 1.0)
499 /// Default: 0.1 (10% jitter)
500 pub backoff_jitter: f64,
501}
502
503impl Default for ChangeStreamConfig {
504 fn default() -> Self {
505 Self {
506 pipeline: Vec::new(),
507 full_document_on_update: false,
508 full_document_before_change: false,
509 initial_backoff_ms: 100,
510 max_backoff_ms: 30_000, // 30 seconds
511 max_reconnect_attempts: 5,
512 batch_size: None,
513 resume_token: None,
514 backoff_jitter: 0.1, // 10% jitter to prevent thundering herd
515 }
516 }
517}
518
519impl ChangeStreamConfig {
520 /// Creates a new builder for configuring a change stream.
521 #[must_use]
522 pub fn builder() -> ChangeStreamConfigBuilder {
523 ChangeStreamConfigBuilder::default()
524 }
525
526 /// Validates the configuration.
527 ///
528 /// Returns an error if:
529 /// - `initial_backoff_ms` is 0
530 /// - `initial_backoff_ms` > `max_backoff_ms`
531 /// - `backoff_jitter` is not in range [0.0, 1.0]
532 pub fn validate(&self) -> Result<(), StreamError> {
533 if self.initial_backoff_ms == 0 {
534 return Err(StreamError::Configuration(
535 "initial_backoff_ms must be greater than 0".to_string(),
536 ));
537 }
538
539 if self.initial_backoff_ms > self.max_backoff_ms {
540 return Err(StreamError::Configuration(format!(
541 "initial_backoff_ms ({}) must be <= max_backoff_ms ({})",
542 self.initial_backoff_ms, self.max_backoff_ms
543 )));
544 }
545
546 if !(0.0..=1.0).contains(&self.backoff_jitter) {
547 return Err(StreamError::Configuration(format!(
548 "backoff_jitter ({}) must be between 0.0 and 1.0",
549 self.backoff_jitter
550 )));
551 }
552
553 Ok(())
554 }
555
556 /// Calculates backoff duration with jitter for the given attempt.
557 ///
558 /// Uses exponential backoff: `initial_ms * 2^(attempt-1)`, capped at `max_backoff_ms`.
559 /// Adds random jitter to prevent thundering herd: ±(base * jitter_factor / 2).
560 fn calculate_backoff(&self, attempt: u32) -> Duration {
561 // Exponential backoff: initial * 2^(attempt-1)
562 let base_ms = self
563 .initial_backoff_ms
564 .saturating_mul(1_u64 << attempt.saturating_sub(1))
565 .min(self.max_backoff_ms);
566
567 // Add jitter: random value in range [base * (1 - jitter/2), base * (1 + jitter/2)]
568 if self.backoff_jitter > 0.0 {
569 let jitter_range = (base_ms as f64) * self.backoff_jitter;
570 let jitter = (rand::random::<f64>() * jitter_range) - (jitter_range / 2.0);
571 let final_ms = ((base_ms as f64) + jitter).max(0.0) as u64;
572 Duration::from_millis(final_ms)
573 } else {
574 Duration::from_millis(base_ms)
575 }
576 }
577
578 /// Converts this config to MongoDB's `ChangeStreamOptions`.
579 fn to_mongo_options(&self) -> ChangeStreamOptions {
580 let mut options = ChangeStreamOptions::default();
581
582 // Set full document options
583 if self.full_document_on_update {
584 options.full_document = Some(mongodb::options::FullDocumentType::UpdateLookup);
585 }
586
587 if self.full_document_before_change {
588 options.full_document_before_change =
589 Some(mongodb::options::FullDocumentBeforeChangeType::WhenAvailable);
590 }
591
592 // Set batch size
593 options.batch_size = self.batch_size;
594
595 // Set resume token if provided
596 // Note: We store resume tokens as Document for persistence,
597 // but MongoDB expects ResumeToken. We'll deserialize from BSON bytes.
598 if let Some(ref token_doc) = self.resume_token {
599 // Serialize Document to bytes, then deserialize as ResumeToken
600 if let Ok(bytes) = bson::to_vec(token_doc) {
601 if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&bytes) {
602 options.resume_after = Some(resume_token);
603 }
604 }
605 }
606
607 options
608 }
609}
610
611/// Builder for [`ChangeStreamConfig`].
612#[derive(Debug, Default)]
613pub struct ChangeStreamConfigBuilder {
614 pipeline: Vec<Document>,
615 full_document_on_update: bool,
616 full_document_before_change: bool,
617 initial_backoff_ms: Option<u64>,
618 max_backoff_ms: Option<u64>,
619 max_reconnect_attempts: Option<u32>,
620 batch_size: Option<u32>,
621 resume_token: Option<Document>,
622 backoff_jitter: Option<f64>,
623}
624
625impl ChangeStreamConfigBuilder {
626 /// Sets the aggregation pipeline for filtering events.
627 ///
628 /// # Example
629 ///
630 /// ```rust
631 /// use rigatoni_core::stream::ChangeStreamConfig;
632 /// use bson::doc;
633 ///
634 /// let config = ChangeStreamConfig::builder()
635 /// .pipeline(vec![
636 /// doc! { "$match": { "operationType": "insert" } }
637 /// ])
638 /// .build();
639 /// ```
640 #[must_use]
641 pub fn pipeline(mut self, pipeline: Vec<Document>) -> Self {
642 self.pipeline = pipeline;
643 self
644 }
645
646 /// Enables full document retrieval for update operations.
647 ///
648 /// When enabled, MongoDB will include the entire updated document
649 /// in the change event (requires additional lookup).
650 #[must_use]
651 pub fn full_document_update_lookup(mut self) -> Self {
652 self.full_document_on_update = true;
653 self
654 }
655
656 /// Enables pre-image for change events (requires MongoDB 6.0+).
657 ///
658 /// When enabled, the document state before the change will be included.
659 /// Requires pre-image collection to be configured on the collection.
660 #[must_use]
661 pub fn full_document_before_change(mut self) -> Self {
662 self.full_document_before_change = true;
663 self
664 }
665
666 /// Sets the initial backoff duration in milliseconds.
667 ///
668 /// Default: 100ms
669 #[must_use]
670 pub fn initial_backoff_ms(mut self, ms: u64) -> Self {
671 self.initial_backoff_ms = Some(ms);
672 self
673 }
674
675 /// Sets the maximum backoff duration in milliseconds.
676 ///
677 /// Default: 30,000ms (30 seconds)
678 #[must_use]
679 pub fn max_backoff_ms(mut self, ms: u64) -> Self {
680 self.max_backoff_ms = Some(ms);
681 self
682 }
683
684 /// Sets the maximum number of reconnection attempts.
685 ///
686 /// Set to 0 for infinite retries (use with caution).
687 /// Default: 5
688 #[must_use]
689 pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
690 self.max_reconnect_attempts = Some(attempts);
691 self
692 }
693
694 /// Sets the batch size for fetching events from MongoDB.
695 ///
696 /// Larger batches can improve throughput but increase memory usage.
697 #[must_use]
698 pub fn batch_size(mut self, size: u32) -> Self {
699 self.batch_size = Some(size);
700 self
701 }
702
703 /// Sets a resume token to start streaming from.
704 ///
705 /// If provided, the stream will resume from this point instead
706 /// of starting from the current time.
707 #[must_use]
708 pub fn resume_token(mut self, token: Document) -> Self {
709 self.resume_token = Some(token);
710 self
711 }
712
713 /// Sets the backoff jitter factor (0.0 to 1.0).
714 ///
715 /// Jitter adds randomness to backoff delays to prevent thundering herd.
716 /// A value of 0.1 means ±10% randomness.
717 ///
718 /// Default: 0.1
719 #[must_use]
720 pub fn backoff_jitter(mut self, jitter: f64) -> Self {
721 self.backoff_jitter = Some(jitter);
722 self
723 }
724
725 /// Builds the configuration.
726 ///
727 /// # Errors
728 ///
729 /// Returns `StreamError::Configuration` if validation fails:
730 /// - `initial_backoff_ms` must be > 0
731 /// - `initial_backoff_ms` must be <= `max_backoff_ms`
732 /// - `backoff_jitter` must be between 0.0 and 1.0
733 pub fn build(self) -> Result<ChangeStreamConfig, StreamError> {
734 let config = ChangeStreamConfig {
735 pipeline: self.pipeline,
736 full_document_on_update: self.full_document_on_update,
737 full_document_before_change: self.full_document_before_change,
738 initial_backoff_ms: self.initial_backoff_ms.unwrap_or(100),
739 max_backoff_ms: self.max_backoff_ms.unwrap_or(30_000),
740 max_reconnect_attempts: self.max_reconnect_attempts.unwrap_or(5),
741 batch_size: self.batch_size,
742 resume_token: self.resume_token,
743 backoff_jitter: self.backoff_jitter.unwrap_or(0.1),
744 };
745
746 config.validate()?;
747 Ok(config)
748 }
749}
750
751/// Callback for persisting resume tokens.
752///
753/// This callback is invoked after each successfully processed event.
754/// It should persist the token to durable storage (database, file, etc.)
755/// to enable resuming after crashes or restarts.
756///
757/// # Returns
758///
759/// - `Ok(())` if token was successfully persisted
760/// - `Err(String)` if persistence failed (will propagate as StreamError)
761///
762/// # Example
763///
764/// ```rust
765/// use bson::Document;
766/// use std::future::Future;
767/// use std::pin::Pin;
768///
769/// fn save_to_redis(
770/// token: Document,
771/// ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
772/// Box::pin(async move {
773/// // Save to Redis
774/// // redis_client.set("resume_token", token).await
775/// // .map_err(|e| e.to_string())?;
776/// Ok(())
777/// })
778/// }
779/// ```
780pub type ResumeTokenCallback =
781 Box<dyn Fn(Document) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;
782
783/// Type alias for the reconnection future to reduce complexity
784type ReconnectFuture = Pin<
785 Box<
786 dyn Future<
787 Output = Result<
788 mongodb::change_stream::ChangeStream<ChangeStreamEvent<Document>>,
789 StreamError,
790 >,
791 > + Send,
792 >,
793>;
794
795/// State of the change stream listener.
796enum StreamState {
797 /// Stream is active and consuming events
798 Active,
799 /// Stream encountered an error and is reconnecting
800 Reconnecting(ReconnectFuture),
801 /// Stream is closed (terminal state)
802 Closed,
803}
804
805/// A MongoDB change stream listener with automatic reconnection.
806///
807/// Implements the `Stream` trait, yielding `Result<ChangeEvent, StreamError>`.
808///
809/// # Lifecycle
810///
811/// 1. **Active**: Consuming events normally
812/// 2. **Reconnecting**: Connection lost, attempting to reconnect with exponential backoff
813/// 3. **Closed**: Terminal state after fatal error or explicit close
814///
815/// # Thread Safety
816///
817/// `ChangeStreamListener` is `Send` but not `Sync`. Each listener should be
818/// owned by a single task. For multi-collection scenarios, spawn separate
819/// tasks with separate listeners.
820///
821/// # Resume Token Semantics
822///
823/// Resume tokens are only updated when the user calls `.ack()` on an `AckableEvent`.
824/// This ensures at-least-once delivery semantics: if the stream crashes before
825/// the user acknowledges an event, reconnection will resume from before that event.
826pub struct ChangeStreamListener {
827 /// The MongoDB collection being watched
828 collection: Collection<Document>,
829
830 /// Configuration for the change stream
831 config: ChangeStreamConfig,
832
833 /// The underlying MongoDB change stream
834 stream: Option<mongodb::change_stream::ChangeStream<ChangeStreamEvent<Document>>>,
835
836 /// Callback for persisting resume tokens
837 resume_token_callback: ResumeTokenCallback,
838
839 /// Current state of the listener
840 state: StreamState,
841
842 /// Number of reconnection attempts made
843 reconnect_attempts: u32,
844
845 /// Last successfully processed resume token (shared with AckableEvent)
846 last_resume_token: Arc<Mutex<Option<Document>>>,
847
848 /// Channel sender for acking tokens (sends to persistence task)
849 ack_sender: mpsc::UnboundedSender<Document>,
850
851 /// Background task that persists tokens
852 _persistence_task: tokio::task::JoinHandle<()>,
853}
854
855impl ChangeStreamListener {
856 /// Creates a new change stream listener.
857 ///
858 /// # Arguments
859 ///
860 /// * `collection` - MongoDB collection to watch
861 /// * `config` - Change stream configuration
862 /// * `resume_token_callback` - Callback to persist resume tokens
863 ///
864 /// # Errors
865 ///
866 /// Returns `StreamError::Connection` if initial connection fails.
867 ///
868 /// # Examples
869 ///
870 /// ```rust,no_run
871 /// use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
872 /// use mongodb::{Client, options::ClientOptions};
873 ///
874 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
875 /// let client = Client::with_options(
876 /// ClientOptions::parse("mongodb://localhost:27017").await?
877 /// )?;
878 /// let collection = client.database("mydb").collection("users");
879 ///
880 /// let listener = ChangeStreamListener::new(
881 /// collection,
882 /// ChangeStreamConfig::default(),
883 /// |token| Box::pin(async move {
884 /// println!("Token: {:?}", token);
885 /// Ok(())
886 /// }),
887 /// ).await?;
888 /// # Ok(())
889 /// # }
890 /// ```
891 pub async fn new<F>(
892 collection: Collection<Document>,
893 config: ChangeStreamConfig,
894 resume_token_callback: F,
895 ) -> Result<Self, StreamError>
896 where
897 F: Fn(Document) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
898 + Send
899 + Sync
900 + 'static,
901 {
902 info!(
903 "Initializing change stream for {}.{}",
904 collection.namespace().db,
905 collection.namespace().coll
906 );
907
908 let options = config.to_mongo_options();
909 let stream = if config.pipeline.is_empty() {
910 collection.watch().with_options(options).await?
911 } else {
912 collection
913 .watch()
914 .pipeline(config.pipeline.clone())
915 .with_options(options)
916 .await?
917 };
918
919 // Create mpsc channel for token persistence
920 let (ack_sender, mut ack_receiver) = mpsc::unbounded_channel::<Document>();
921
922 // Spawn background task to persist tokens
923 let resume_token_callback_clone = Box::new(resume_token_callback);
924 let persistence_task = tokio::spawn(async move {
925 while let Some(token) = ack_receiver.recv().await {
926 if let Err(e) = resume_token_callback_clone(token).await {
927 warn!("Failed to persist resume token: {}", e);
928 }
929 }
930 debug!("Token persistence task shutting down");
931 });
932
933 Ok(Self {
934 collection,
935 config,
936 stream: Some(stream),
937 resume_token_callback: Box::new(|_| Box::pin(async { Ok(()) })),
938 state: StreamState::Active,
939 reconnect_attempts: 0,
940 last_resume_token: Arc::new(Mutex::new(None)),
941 ack_sender,
942 _persistence_task: persistence_task,
943 })
944 }
945
946 /// Async reconnection function that doesn't borrow self.
947 ///
948 /// This is called from within the reconnection future in poll_next.
949 /// It returns the new stream on success.
950 async fn reconnect_async(
951 collection: Collection<Document>,
952 config: ChangeStreamConfig,
953 last_resume_token: Option<Document>,
954 mut reconnect_attempts: u32,
955 ) -> Result<mongodb::change_stream::ChangeStream<ChangeStreamEvent<Document>>, StreamError>
956 {
957 reconnect_attempts += 1;
958
959 // Check if max attempts exceeded
960 if config.max_reconnect_attempts > 0 && reconnect_attempts > config.max_reconnect_attempts {
961 error!(
962 attempts = reconnect_attempts,
963 "Max reconnection attempts exceeded"
964 );
965 return Err(StreamError::MaxReconnectAttemptsExceeded(
966 config.max_reconnect_attempts,
967 ));
968 }
969
970 // Calculate exponential backoff with jitter
971 let backoff = config.calculate_backoff(reconnect_attempts);
972
973 warn!(
974 attempt = reconnect_attempts,
975 backoff_ms = backoff.as_millis(),
976 "Reconnecting to change stream"
977 );
978
979 // Sleep before retry
980 tokio::time::sleep(backoff).await;
981
982 // Build options with resume token if available
983 let mut options = config.to_mongo_options();
984 if let Some(ref token_doc) = last_resume_token {
985 debug!("Resuming from token: {:?}", token_doc);
986 // Serialize Document to bytes, then deserialize as ResumeToken
987 if let Ok(bytes) = bson::to_vec(token_doc) {
988 if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&bytes) {
989 options.resume_after = Some(resume_token);
990 }
991 }
992 }
993
994 // Attempt to reopen stream
995 let stream = if config.pipeline.is_empty() {
996 collection.watch().with_options(options).await?
997 } else {
998 collection
999 .watch()
1000 .pipeline(config.pipeline.clone())
1001 .with_options(options)
1002 .await?
1003 };
1004
1005 info!(
1006 attempt = reconnect_attempts,
1007 "Successfully reconnected to change stream"
1008 );
1009
1010 Ok(stream)
1011 }
1012
1013 /// Attempts to reconnect the change stream with exponential backoff.
1014 ///
1015 /// This method is called internally when a retryable error occurs.
1016 /// It will:
1017 /// 1. Calculate backoff duration
1018 /// 2. Sleep for backoff period
1019 /// 3. Attempt to reopen stream with last resume token
1020 /// 4. Reset attempts on success
1021 ///
1022 /// # Note
1023 ///
1024 /// This method is now deprecated in favor of reconnect_async.
1025 #[allow(dead_code)]
1026 async fn reconnect(&mut self) -> Result<(), StreamError> {
1027 self.reconnect_attempts += 1;
1028
1029 // Check if max attempts exceeded
1030 if self.config.max_reconnect_attempts > 0
1031 && self.reconnect_attempts > self.config.max_reconnect_attempts
1032 {
1033 error!(
1034 attempts = self.reconnect_attempts,
1035 "Max reconnection attempts exceeded"
1036 );
1037 return Err(StreamError::MaxReconnectAttemptsExceeded(
1038 self.config.max_reconnect_attempts,
1039 ));
1040 }
1041
1042 // Calculate exponential backoff: initial * 2^(attempts-1)
1043 let backoff_ms = self
1044 .config
1045 .initial_backoff_ms
1046 .saturating_mul(2_u64.saturating_pow(self.reconnect_attempts - 1))
1047 .min(self.config.max_backoff_ms);
1048
1049 warn!(
1050 attempt = self.reconnect_attempts,
1051 backoff_ms = backoff_ms,
1052 "Reconnecting to change stream"
1053 );
1054
1055 // Sleep before retry
1056 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1057
1058 // Build options with resume token if available
1059 let mut options = self.config.to_mongo_options();
1060 if let Ok(last_token) = self.last_resume_token.lock() {
1061 if let Some(ref token_doc) = *last_token {
1062 debug!("Resuming from token: {:?}", token_doc);
1063 // Serialize Document to bytes, then deserialize as ResumeToken
1064 if let Ok(bytes) = bson::to_vec(token_doc) {
1065 if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&bytes) {
1066 options.resume_after = Some(resume_token);
1067 }
1068 }
1069 }
1070 }
1071
1072 // Attempt to reopen stream
1073 let stream = if self.config.pipeline.is_empty() {
1074 self.collection.watch().with_options(options).await?
1075 } else {
1076 self.collection
1077 .watch()
1078 .pipeline(self.config.pipeline.clone())
1079 .with_options(options)
1080 .await?
1081 };
1082
1083 info!(
1084 attempt = self.reconnect_attempts,
1085 "Successfully reconnected to change stream"
1086 );
1087
1088 self.stream = Some(stream);
1089 self.state = StreamState::Active;
1090 self.reconnect_attempts = 0; // Reset on success
1091
1092 Ok(())
1093 }
1094
1095 /// Processes a MongoDB change stream event.
1096 ///
1097 /// Converts to ChangeEvent, persists resume token, and handles invalidation.
1098 ///
1099 /// # Note
1100 ///
1101 /// Currently not used in poll_next due to async/await borrowing constraints.
1102 /// The logic is inlined in poll_next instead.
1103 #[allow(dead_code)]
1104 async fn process_event(
1105 &mut self,
1106 event: ChangeStreamEvent<Document>,
1107 ) -> Result<ChangeEvent, StreamError> {
1108 // Extract resume token before conversion
1109 let resume_token = bson::to_document(&event.id)
1110 .map_err(|e| StreamError::ResumeTokenPersistence(e.to_string()))?;
1111
1112 // Convert to ChangeEvent
1113 let change_event = ChangeEvent::try_from(event)?;
1114
1115 // Check for invalidation
1116 if change_event.is_invalidate() {
1117 let reason = format!(
1118 "Collection {}.{} was dropped or renamed",
1119 self.collection.namespace().db,
1120 self.collection.namespace().coll
1121 );
1122 error!("{}", reason);
1123 self.state = StreamState::Closed;
1124 return Err(StreamError::Invalidated { reason });
1125 }
1126
1127 // Persist resume token
1128 (self.resume_token_callback)(resume_token.clone())
1129 .await
1130 .map_err(StreamError::ResumeTokenPersistence)?;
1131
1132 // Update last resume token
1133 if let Ok(mut last_token) = self.last_resume_token.lock() {
1134 *last_token = Some(resume_token);
1135 }
1136
1137 Ok(change_event)
1138 }
1139
1140 /// Closes the change stream.
1141 ///
1142 /// This is a graceful shutdown that closes the underlying MongoDB cursor.
1143 /// After calling this, the stream will return `None` on next poll.
1144 pub async fn close(&mut self) {
1145 info!("Closing change stream");
1146 self.state = StreamState::Closed;
1147 self.stream = None;
1148 }
1149}
1150
1151impl Stream for ChangeStreamListener {
1152 type Item = Result<AckableEvent, StreamError>;
1153
1154 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1155 let this = self.get_mut();
1156
1157 // If closed, return None
1158 if matches!(this.state, StreamState::Closed) {
1159 return Poll::Ready(None);
1160 }
1161
1162 // If reconnecting, poll the reconnection future
1163 if let StreamState::Reconnecting(ref mut reconnect_fut) = this.state {
1164 match reconnect_fut.as_mut().poll(cx) {
1165 Poll::Ready(Ok(new_stream)) => {
1166 // Reconnection succeeded, transition back to Active
1167 info!("Reconnection successful, resuming stream");
1168 this.stream = Some(new_stream);
1169 this.state = StreamState::Active;
1170 this.reconnect_attempts = 0;
1171 // Wake to poll the stream immediately
1172 cx.waker().wake_by_ref();
1173 return Poll::Pending;
1174 }
1175 Poll::Ready(Err(e)) => {
1176 // Reconnection failed permanently
1177 error!("Reconnection failed: {}", e);
1178 this.state = StreamState::Closed;
1179 return Poll::Ready(Some(Err(e)));
1180 }
1181 Poll::Pending => {
1182 // Still reconnecting
1183 return Poll::Pending;
1184 }
1185 }
1186 }
1187
1188 // Poll the underlying stream
1189 if let Some(ref mut stream) = this.stream {
1190 use futures::StreamExt;
1191 let mut stream_pin = Pin::new(stream);
1192 match stream_pin.poll_next_unpin(cx) {
1193 Poll::Ready(Some(Ok(event))) => {
1194 // Extract resume token
1195 let resume_token = match bson::to_document(&event.id) {
1196 Ok(token) => token,
1197 Err(e) => {
1198 return Poll::Ready(Some(Err(StreamError::ResumeTokenPersistence(
1199 e.to_string(),
1200 ))))
1201 }
1202 };
1203
1204 // Convert to ChangeEvent
1205 let change_event = match ChangeEvent::try_from(event) {
1206 Ok(evt) => evt,
1207 Err(e) => return Poll::Ready(Some(Err(StreamError::Conversion(e)))),
1208 };
1209
1210 // Check for invalidation
1211 if change_event.is_invalidate() {
1212 let reason = format!(
1213 "Collection {}.{} was dropped or renamed",
1214 this.collection.namespace().db,
1215 this.collection.namespace().coll
1216 );
1217 error!("{}", reason);
1218 this.state = StreamState::Closed;
1219 return Poll::Ready(Some(Err(StreamError::Invalidated { reason })));
1220 }
1221
1222 // Create AckableEvent - user must call ack() after processing
1223 // Note: last_resume_token is NOT updated here - only when user calls ack()
1224 // This ensures at-least-once semantics
1225 let ackable = AckableEvent {
1226 event: change_event,
1227 resume_token,
1228 ack_sender: this.ack_sender.clone(),
1229 last_resume_token: this.last_resume_token.clone(),
1230 };
1231
1232 Poll::Ready(Some(Ok(ackable)))
1233 }
1234 Poll::Ready(Some(Err(e))) => {
1235 // MongoDB error
1236 let stream_err = StreamError::from_mongo_error(e);
1237
1238 if stream_err.is_retryable() {
1239 warn!("Retryable error occurred: {}", stream_err);
1240
1241 // Create reconnection future
1242 let collection = this.collection.clone();
1243 let config = this.config.clone();
1244 let last_resume_token =
1245 this.last_resume_token.lock().ok().and_then(|t| t.clone());
1246 let reconnect_attempts = this.reconnect_attempts;
1247
1248 let reconnect_fut = Box::pin(async move {
1249 Self::reconnect_async(
1250 collection,
1251 config,
1252 last_resume_token,
1253 reconnect_attempts,
1254 )
1255 .await
1256 });
1257
1258 this.state = StreamState::Reconnecting(reconnect_fut);
1259 // Wake to attempt reconnection on next poll
1260 cx.waker().wake_by_ref();
1261 Poll::Pending
1262 } else {
1263 error!("Fatal error occurred: {}", stream_err);
1264 this.state = StreamState::Closed;
1265 Poll::Ready(Some(Err(stream_err)))
1266 }
1267 }
1268 Poll::Ready(None) => {
1269 // Stream ended (should not happen for change streams)
1270 warn!("Change stream ended unexpectedly");
1271 this.state = StreamState::Closed;
1272 Poll::Ready(None)
1273 }
1274 Poll::Pending => Poll::Pending,
1275 }
1276 } else {
1277 // No stream available
1278 this.state = StreamState::Closed;
1279 Poll::Ready(None)
1280 }
1281 }
1282}
1283
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[test]
    fn test_default_config() {
        let cfg = ChangeStreamConfig::default();

        assert!(cfg.pipeline.is_empty());
        assert!(!cfg.full_document_on_update);
        assert!(!cfg.full_document_before_change);
        assert_eq!(
            (
                cfg.initial_backoff_ms,
                cfg.max_backoff_ms,
                cfg.max_reconnect_attempts
            ),
            (100, 30_000, 5)
        );
    }

    #[test]
    fn test_config_builder() {
        let cfg = ChangeStreamConfig::builder()
            .full_document_update_lookup()
            .full_document_before_change()
            .max_reconnect_attempts(10)
            .initial_backoff_ms(200)
            .max_backoff_ms(60_000)
            .batch_size(100)
            .build()
            .unwrap();

        assert!(cfg.full_document_on_update);
        assert!(cfg.full_document_before_change);
        assert_eq!(
            (
                cfg.max_reconnect_attempts,
                cfg.initial_backoff_ms,
                cfg.max_backoff_ms
            ),
            (10, 200, 60_000)
        );
        assert_eq!(cfg.batch_size, Some(100));
    }

    #[test]
    fn test_config_with_pipeline() {
        use bson::doc;

        let stages = vec![doc! { "$match": { "operationType": "insert" } }];

        let cfg = ChangeStreamConfig::builder()
            .pipeline(stages.clone())
            .build()
            .unwrap();

        // Whole-vector equality covers both the length and the stage contents.
        assert_eq!(cfg.pipeline, stages);
    }

    #[test]
    fn test_stream_error_is_retryable() {
        let cases = [
            // Connection errors with a retryable code (6 = HostUnreachable) retry.
            (
                StreamError::Connection {
                    message: "connection refused".to_string(),
                    source: None,
                    code: Some(6),
                    labels: vec![],
                },
                true,
            ),
            // An invalidated stream cannot be resumed.
            (
                StreamError::Invalidated {
                    reason: "collection dropped".to_string(),
                },
                false,
            ),
            // Running out of attempts is terminal.
            (StreamError::MaxReconnectAttemptsExceeded(5), false),
        ];

        for (err, expected) in cases {
            assert_eq!(err.is_retryable(), expected);
        }
    }

    #[test]
    fn test_stream_error_category() {
        let cases = [
            (
                StreamError::Invalidated {
                    reason: "test".to_string(),
                },
                "invalidated",
            ),
            (StreamError::MaxReconnectAttemptsExceeded(5), "max_retries"),
            (
                StreamError::Configuration("test".to_string()),
                "configuration",
            ),
        ];

        for (err, expected) in cases {
            assert_eq!(err.category(), expected);
        }
    }

    #[tokio::test]
    async fn test_resume_token_callback() {
        use bson::doc;

        let store: Arc<Mutex<Option<Document>>> = Arc::new(Mutex::new(None));

        let callback = {
            let store = Arc::clone(&store);
            move |token: Document| -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
                let store = Arc::clone(&store);
                Box::pin(async move {
                    *store.lock().await = Some(token);
                    Ok(())
                })
            }
        };

        let expected = doc! { "_data": "test123" };
        callback(expected.clone()).await.unwrap();

        let stored = store.lock().await;
        assert_eq!(*stored, Some(expected));
    }

    // No state-transition equality test: `StreamState::Reconnecting` holds a boxed
    // future, which does not implement `PartialEq`.

    #[test]
    fn test_config_resume_token() {
        use bson::doc;

        let token = doc! { "_data": "token123" };

        let cfg = ChangeStreamConfig::builder()
            .resume_token(token.clone())
            .build()
            .unwrap();

        assert_eq!(cfg.resume_token, Some(token));
    }

    // Tests against a real MongoDB deployment belong in tests/integration_test.rs,
    // using testcontainers to start the server.
}