rigatoni_core/
stream.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! `MongoDB` Change Stream Wrapper
18//!
19//! This module provides an ergonomic async wrapper around `MongoDB` change streams with:
20//! - Automatic resume token management
21//! - Reconnection with exponential backoff
22//! - Graceful error handling
23//! - Pipeline filtering support
24//! - Pre/post-image configuration
25//!
26//! # Architecture
27//!
28//! The [`ChangeStreamListener`] implements the [`Stream`] trait from `futures`,
29//! making it composable with other async stream utilities. It wraps MongoDB's
30//! native change stream and adds production-ready reliability features.
31//!
32//! ## Resume Token Flow
33//!
34//! ```text
35//! ┌─────────────────┐
36//! │ MongoDB Cluster │
37//! └────────┬────────┘
38//!          │ Change Events
39//!          ▼
40//! ┌──────────────────────┐
41//! │ ChangeStreamListener │◄─── Resume Token Callback
42//! └──────────────────────┘      (persists to storage)
43//!          │
44//!          │ ChangeEvent
45//!          ▼
46//! ┌─────────────────┐
47//! │ Pipeline Stage  │
48//! └─────────────────┘
49//! ```
50//!
51//! ## Reconnection Algorithm
52//!
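//! When a retryable error occurs (see [`StreamError::is_retryable`]):
//! 1. Take the resume token of the last *acknowledged* event (see [`AckableEvent::ack`])
//! 2. Attempt reconnection with exponential backoff starting at `initial_backoff_ms`
//!    (defaults: 100ms, 200ms, 400ms, ...), randomized by `backoff_jitter`
//! 3. Cap the base delay at `max_backoff_ms`
//! 4. Fail after `max_reconnect_attempts` attempts (0 means retry indefinitely)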
58//!
59//! # Examples
60//!
61//! ## Basic Usage
62//!
63//! ```rust,no_run
64//! use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
65//! use mongodb::{Client, options::ClientOptions};
66//! use futures::StreamExt;
67//!
68//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
69//! // Connect to MongoDB
70//! let client_options = ClientOptions::parse("mongodb://localhost:27017").await?;
71//! let client = Client::with_options(client_options)?;
72//! let collection = client.database("mydb").collection("users");
73//!
74//! // Configure change stream
75//! let config = ChangeStreamConfig::builder()
76//!     .full_document_before_change()  // Enable pre-images
77//!     .full_document_update_lookup()  // Include full document on updates
78//!     .max_reconnect_attempts(10)
79//!     .build()?;
80//!
81//! // Create listener with resume token callback
82//! let mut listener = ChangeStreamListener::new(
83//!     collection,
84//!     config,
85//!     |token| {
86//!         // Persist resume token to storage
87//!         println!("Saving resume token: {:?}", token);
88//!         Box::pin(async { Ok(()) })
89//!     },
90//! ).await?;
91//!
92//! // Consume events
93//! while let Some(result) = listener.next().await {
94//!     match result {
95//!         Ok(ackable) => {
96//!             let event = ackable.event_ref();
97//!             println!("Received: {:?}", event.operation);
98//!             ackable.ack(); // Acknowledge after processing
99//!         }
100//!         Err(e) => {
101//!             eprintln!("Stream error: {}", e);
102//!             break;
103//!         }
104//!     }
105//! }
106//! # Ok(())
107//! # }
108//! ```
109//!
110//! ## With Pipeline Filtering
111//!
112//! ```rust,no_run
113//! use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
114//! use bson::doc;
115//! use futures::StreamExt;
116//! # use mongodb::{Client, options::ClientOptions};
117//!
118//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
119//! # let client = Client::with_options(ClientOptions::parse("mongodb://localhost:27017").await?)?;
120//! # let collection = client.database("mydb").collection("users");
121//!
122//! // Only watch insert and update operations
123//! let pipeline = vec![
124//!     doc! {
125//!         "$match": {
126//!             "operationType": { "$in": ["insert", "update"] }
127//!         }
128//!     }
129//! ];
130//!
131//! let config = ChangeStreamConfig::builder()
132//!     .pipeline(pipeline)
133//!     .build()?;
134//!
135//! let mut listener = ChangeStreamListener::new(
136//!     collection,
137//!     config,
138//!     |_| Box::pin(async { Ok(()) }),
139//! ).await?;
140//!
141//! while let Some(result) = listener.next().await {
142//!     let ackable = result?;
143//!     ackable.ack();
144//!     // Only insert/update events will be received
145//! }
146//! # Ok(())
147//! # }
148//! ```
149//!
150//! ## Multi-Collection Scenario
151//!
152//! ```rust,no_run
153//! use rigatoni_core::stream::ChangeStreamListener;
154//! use futures::StreamExt;
155//! use tokio::task::JoinSet;
156//! # use mongodb::{Client, options::ClientOptions};
157//! # use rigatoni_core::stream::ChangeStreamConfig;
158//!
159//! # async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
160//! # let client = Client::with_options(ClientOptions::parse("mongodb://localhost:27017").await?)?;
161//! let db = client.database("mydb");
162//!
163//! let mut tasks = JoinSet::new();
164//!
165//! // Watch multiple collections concurrently
166//! for collection_name in &["users", "orders", "products"] {
167//!     let collection = db.collection(collection_name);
168//!     let config = ChangeStreamConfig::default();
169//!
170//!     tasks.spawn(async move {
171//!         let mut listener = ChangeStreamListener::new(
172//!             collection,
173//!             config,
174//!             |_| Box::pin(async { Ok(()) }),
175//!         ).await?;
176//!
177//!         while let Some(result) = listener.next().await {
178//!             let ackable = result?;
179//!             println!("{}: {:?}", collection_name, ackable.event_ref().operation);
180//!             ackable.ack();
181//!         }
182//!
183//!         Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
184//!     });
185//! }
186//!
187//! // Wait for all streams
188//! while let Some(result) = tasks.join_next().await {
189//!     result??;
190//! }
191//! # Ok(())
192//! # }
193//! ```
194
195use crate::event::{ChangeEvent, ConversionError};
196use bson::Document;
197use futures::Stream;
198use mongodb::{
199    change_stream::event::{ChangeStreamEvent, ResumeToken},
200    error::{Error as MongoError, ErrorKind as MongoErrorKind},
201    options::ChangeStreamOptions,
202    Collection,
203};
204use std::{
205    future::Future,
206    pin::Pin,
207    sync::{Arc, Mutex},
208    task::{Context, Poll},
209    time::Duration,
210};
211use thiserror::Error;
212use tokio::sync::mpsc;
213use tracing::{debug, error, info, warn};
214
/// Errors that can occur during change stream operations.
///
/// Use [`StreamError::is_retryable`] to decide whether reconnecting is worthwhile
/// and [`StreamError::category`] for a stable label in metrics and logs.
#[derive(Debug, Error)]
pub enum StreamError {
    /// A MongoDB driver error that was not classified as
    /// [`StreamError::InvalidResumeToken`] by [`StreamError::from_mongo_error`].
    ///
    /// Despite the name, this covers every such driver error (not only network
    /// failures); it may or may not be retryable.
    #[error("Connection error: {message}")]
    Connection {
        /// The driver error's `Display` text.
        message: String,
        /// The original driver error, when available.
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
        /// MongoDB error code
        /// (only extracted from server command errors; `None` otherwise)
        code: Option<i32>,
        /// MongoDB error labels (e.g., "RetryableWriteError")
        labels: Vec<String>,
    },

    /// Failed to convert MongoDB event to ChangeEvent
    #[error("Event conversion failed: {0}")]
    Conversion(#[from] ConversionError),

    /// Resume token persistence failed
    ///
    /// NOTE(review): not constructed in the code visible here; the background
    /// persistence task logs callback failures instead of returning this.
    #[error("Resume token persistence failed: {0}")]
    ResumeTokenPersistence(String),

    /// Stream was invalidated (collection dropped/renamed)
    #[error("Stream invalidated: {reason}")]
    Invalidated { reason: String },

    /// Maximum reconnection attempts exceeded; carries the configured limit.
    #[error("Max reconnection attempts ({0}) exceeded")]
    MaxReconnectAttemptsExceeded(u32),

    /// The server reported that the change stream cannot resume from its
    /// resume token (for example error code 286, `ChangeStreamHistoryLost`,
    /// when the oplog no longer contains the token). `code` is the server
    /// error code.
    #[error("Invalid resume token (code {code}): oplog may be truncated")]
    InvalidResumeToken { code: i32 },

    /// Configuration error (returned by `ChangeStreamConfig::validate`)
    #[error("Configuration error: {0}")]
    Configuration(String),
}
254
255impl From<MongoError> for StreamError {
256    fn from(err: MongoError) -> Self {
257        Self::from_mongo_error(err)
258    }
259}
260
261impl StreamError {
262    /// Creates a StreamError from a MongoDB error with proper classification.
263    ///
264    /// This method checks error codes and labels for accurate categorization.
265    pub fn from_mongo_error(err: MongoError) -> Self {
266        // Extract error code
267        let code = match err.kind.as_ref() {
268            MongoErrorKind::Command(cmd_err) => Some(cmd_err.code),
269            _ => None,
270        };
271
272        // Check for invalid resume token (code 286: ChangeStreamFatalError)
273        if code == Some(286) {
274            return Self::InvalidResumeToken { code: 286 };
275        }
276
277        // Extract labels from the error
278        // Note: In MongoDB Rust driver, labels() method returns &HashSet<String>
279        let labels: Vec<String> = err.labels().iter().cloned().collect();
280
281        Self::Connection {
282            message: err.to_string(),
283            source: Some(Box::new(err)),
284            code,
285            labels,
286        }
287    }
288
289    /// Returns true if this error is retryable with reconnection.
290    ///
291    /// Uses MongoDB error codes and labels for accurate classification:
292    /// - Error labels: RetryableWriteError, TransientTransactionError, NetworkError
293    /// - Transient error codes: 6, 7, 43, 89, 91, 10107, 11600, 11602, 13435, 13436
294    ///
295    /// Non-retryable errors:
296    /// - Invalid resume token (286)
297    /// - Stream invalidated (collection dropped)
298    /// - Authentication/authorization errors
299    #[must_use]
300    pub fn is_retryable(&self) -> bool {
301        match self {
302            Self::Connection { code, labels, .. } => {
303                // Check error labels first (most reliable)
304                if labels.iter().any(|l| {
305                    l == "RetryableWriteError"
306                        || l == "TransientTransactionError"
307                        || l == "NetworkError"
308                }) {
309                    return true;
310                }
311
312                // Check specific transient error codes
313                if let Some(c) = code {
314                    matches!(
315                        c,
316                        // Network errors
317                        6 |    // HostUnreachable
318                        7 |    // HostNotFound
319                        89 |   // NetworkTimeout
320                        91 |   // ShutdownInProgress
321                        // Replication errors (transient during failover)
322                        10107 | // NotMaster / NotPrimary
323                        11600 | // InterruptedAtShutdown
324                        11602 | // InterruptedDueToReplStateChange
325                        13435 | // NotMasterNoSlaveOk
326                        13436 | // NotMasterOrSecondary / NotPrimaryOrSecondary
327                        // Cursor errors (can retry with resume token)
328                        43 // CursorNotFound
329                    )
330                } else {
331                    // No code available, be conservative
332                    false
333                }
334            }
335            Self::Invalidated { .. } => false,
336            Self::InvalidResumeToken { .. } => false,
337            Self::MaxReconnectAttemptsExceeded(_) => false,
338            Self::Conversion(_) => false,
339            Self::ResumeTokenPersistence(_) => false,
340            Self::Configuration(_) => false,
341        }
342    }
343
344    /// Returns the error category for metrics/logging.
345    #[must_use]
346    pub fn category(&self) -> &'static str {
347        match self {
348            Self::Connection { .. } => "connection",
349            Self::Conversion(_) => "conversion",
350            Self::ResumeTokenPersistence(_) => "persistence",
351            Self::Invalidated { .. } => "invalidated",
352            Self::MaxReconnectAttemptsExceeded(_) => "max_retries",
353            Self::InvalidResumeToken { .. } => "invalid_token",
354            Self::Configuration(_) => "configuration",
355        }
356    }
357}
358
/// Event with acknowledgment capability for correct resume token semantics.
///
/// This type ensures that resume tokens are persisted AFTER the user successfully
/// processes the event, providing at-least-once delivery semantics.
///
/// # Important
///
/// You MUST call `.ack()` after successfully processing the event. If you don't
/// call `.ack()`, the resume token will not be persisted, and the stream cannot
/// reliably resume after a crash.
///
/// Because the type is `Clone`, every clone can be acknowledged independently;
/// each `.ack()` records and sends the same token again.
///
/// # Examples
///
/// ```rust,no_run
/// use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
/// use futures::StreamExt;
/// # use mongodb::{Client, options::ClientOptions};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// # let client = Client::with_options(ClientOptions::parse("mongodb://localhost:27017").await?)?;
/// # let collection = client.database("mydb").collection("test");
/// let mut listener = ChangeStreamListener::new(
///     collection,
///     ChangeStreamConfig::default(),
///     |_| Box::pin(async { Ok(()) }),
/// ).await?;
///
/// while let Some(ackable) = listener.next().await {
///     let ackable = ackable?;
///
///     // Access the event
///     let event = ackable.event_ref();
///     println!("Processing: {:?}", event.operation);
///
///     // Process the event
///     process_event(event).await?;
///
///     // IMPORTANT: Acknowledge after successful processing
///     ackable.ack();
/// }
/// # Ok(())
/// # }
/// # async fn process_event(event: &rigatoni_core::event::ChangeEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { Ok(()) }
/// ```
#[derive(Debug, Clone)]
pub struct AckableEvent {
    /// The change event
    pub event: ChangeEvent,

    /// Resume token for this event (recorded and sent on `.ack()`)
    resume_token: Document,

    /// Sending half of the unbounded channel drained by the listener's
    /// background persistence task
    ack_sender: mpsc::UnboundedSender<Document>,

    /// Handle shared with the listener holding the last acknowledged resume
    /// token; overwritten on every `.ack()`
    last_resume_token: Arc<Mutex<Option<Document>>>,
}
417
418impl AckableEvent {
419    /// Acknowledge processing of this event, persisting the resume token.
420    ///
421    /// Call this AFTER successfully processing the event to ensure at-least-once semantics.
422    /// The token will be sent to a background persistence task asynchronously.
423    ///
424    /// This method also updates the internal last_resume_token used for reconnection,
425    /// ensuring that reconnection only resumes from successfully processed events.
426    ///
427    /// If the persistence task has stopped, this will log a warning but not fail.
428    pub fn ack(self) {
429        // Update last resume token for reconnection (only after user confirms processing)
430        if let Ok(mut last_token) = self.last_resume_token.lock() {
431            *last_token = Some(self.resume_token.clone());
432        }
433
434        // Send token for persistence
435        if self.ack_sender.send(self.resume_token).is_err() {
436            warn!("Failed to send ack - persistence task may have stopped");
437        }
438    }
439
440    /// Get a reference to the change event without consuming the `AckableEvent`.
441    ///
442    /// Use this to access the event data while keeping the ability to call `.ack()` later.
443    #[must_use]
444    pub fn event_ref(&self) -> &ChangeEvent {
445        &self.event
446    }
447
448    /// Consume the `AckableEvent` and return the inner `ChangeEvent`.
449    ///
450    /// # Warning
451    ///
452    /// After calling this, you cannot call `.ack()`. Only use this if you don't
453    /// want to persist the resume token.
454    #[must_use]
455    pub fn into_event(self) -> ChangeEvent {
456        self.event
457    }
458}
459
/// Configuration for change stream behavior.
///
/// Construct instances with [`ChangeStreamConfigBuilder`], which validates the
/// values. Alternatively, start from [`ChangeStreamConfig::default`]:
///
/// ```rust
/// use rigatoni_core::stream::ChangeStreamConfig;
///
/// let config = ChangeStreamConfig::builder()
///     .full_document_update_lookup()
///     .max_reconnect_attempts(10)
///     .build()
///     .expect("valid configuration");
/// ```
#[derive(Debug, Clone)]
pub struct ChangeStreamConfig {
    /// MongoDB aggregation pipeline to filter events
    /// (default: empty, i.e. no filtering)
    pub pipeline: Vec<Document>,

    /// Whether to include full document for update operations
    /// (`fullDocument: updateLookup`; default: `false`)
    pub full_document_on_update: bool,

    /// Whether to include document before change (requires MongoDB 6.0+).
    /// Requested as `whenAvailable`, so events without a stored pre-image are
    /// still delivered (default: `false`)
    pub full_document_before_change: bool,

    /// Initial backoff in milliseconds (must be > 0; default: 100)
    pub initial_backoff_ms: u64,

    /// Maximum backoff in milliseconds; caps the exponential base delay
    /// before jitter is applied (default: 30,000)
    pub max_backoff_ms: u64,

    /// Maximum number of reconnection attempts (0 = infinite; default: 5)
    pub max_reconnect_attempts: u32,

    /// Batch size for fetching events (default: `None`, the driver/server default)
    pub batch_size: Option<u32>,

    /// Resume token to start from (if any); passed to the driver as `resumeAfter`
    pub resume_token: Option<Document>,

    /// Backoff jitter factor (0.0 to 1.0)
    /// Each delay is moved randomly within ±(delay * jitter / 2), so the
    /// default 0.1 means up to ±5% (a 10% spread)
    pub backoff_jitter: f64,
}
502
503impl Default for ChangeStreamConfig {
504    fn default() -> Self {
505        Self {
506            pipeline: Vec::new(),
507            full_document_on_update: false,
508            full_document_before_change: false,
509            initial_backoff_ms: 100,
510            max_backoff_ms: 30_000, // 30 seconds
511            max_reconnect_attempts: 5,
512            batch_size: None,
513            resume_token: None,
514            backoff_jitter: 0.1, // 10% jitter to prevent thundering herd
515        }
516    }
517}
518
519impl ChangeStreamConfig {
520    /// Creates a new builder for configuring a change stream.
521    #[must_use]
522    pub fn builder() -> ChangeStreamConfigBuilder {
523        ChangeStreamConfigBuilder::default()
524    }
525
526    /// Validates the configuration.
527    ///
528    /// Returns an error if:
529    /// - `initial_backoff_ms` is 0
530    /// - `initial_backoff_ms` > `max_backoff_ms`
531    /// - `backoff_jitter` is not in range [0.0, 1.0]
532    pub fn validate(&self) -> Result<(), StreamError> {
533        if self.initial_backoff_ms == 0 {
534            return Err(StreamError::Configuration(
535                "initial_backoff_ms must be greater than 0".to_string(),
536            ));
537        }
538
539        if self.initial_backoff_ms > self.max_backoff_ms {
540            return Err(StreamError::Configuration(format!(
541                "initial_backoff_ms ({}) must be <= max_backoff_ms ({})",
542                self.initial_backoff_ms, self.max_backoff_ms
543            )));
544        }
545
546        if !(0.0..=1.0).contains(&self.backoff_jitter) {
547            return Err(StreamError::Configuration(format!(
548                "backoff_jitter ({}) must be between 0.0 and 1.0",
549                self.backoff_jitter
550            )));
551        }
552
553        Ok(())
554    }
555
556    /// Calculates backoff duration with jitter for the given attempt.
557    ///
558    /// Uses exponential backoff: `initial_ms * 2^(attempt-1)`, capped at `max_backoff_ms`.
559    /// Adds random jitter to prevent thundering herd: ±(base * jitter_factor / 2).
560    fn calculate_backoff(&self, attempt: u32) -> Duration {
561        // Exponential backoff: initial * 2^(attempt-1)
562        let base_ms = self
563            .initial_backoff_ms
564            .saturating_mul(1_u64 << attempt.saturating_sub(1))
565            .min(self.max_backoff_ms);
566
567        // Add jitter: random value in range [base * (1 - jitter/2), base * (1 + jitter/2)]
568        if self.backoff_jitter > 0.0 {
569            let jitter_range = (base_ms as f64) * self.backoff_jitter;
570            let jitter = (rand::random::<f64>() * jitter_range) - (jitter_range / 2.0);
571            let final_ms = ((base_ms as f64) + jitter).max(0.0) as u64;
572            Duration::from_millis(final_ms)
573        } else {
574            Duration::from_millis(base_ms)
575        }
576    }
577
578    /// Converts this config to MongoDB's `ChangeStreamOptions`.
579    fn to_mongo_options(&self) -> ChangeStreamOptions {
580        let mut options = ChangeStreamOptions::default();
581
582        // Set full document options
583        if self.full_document_on_update {
584            options.full_document = Some(mongodb::options::FullDocumentType::UpdateLookup);
585        }
586
587        if self.full_document_before_change {
588            options.full_document_before_change =
589                Some(mongodb::options::FullDocumentBeforeChangeType::WhenAvailable);
590        }
591
592        // Set batch size
593        options.batch_size = self.batch_size;
594
595        // Set resume token if provided
596        // Note: We store resume tokens as Document for persistence,
597        // but MongoDB expects ResumeToken. We'll deserialize from BSON bytes.
598        if let Some(ref token_doc) = self.resume_token {
599            // Serialize Document to bytes, then deserialize as ResumeToken
600            if let Ok(bytes) = bson::to_vec(token_doc) {
601                if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&bytes) {
602                    options.resume_after = Some(resume_token);
603                }
604            }
605        }
606
607        options
608    }
609}
610
/// Builder for [`ChangeStreamConfig`].
///
/// Unset (`None`) options fall back to the defaults in `build()`. `build()`
/// validates the result.
#[derive(Debug, Default)]
pub struct ChangeStreamConfigBuilder {
    // Aggregation pipeline; empty means no filtering.
    pipeline: Vec<Document>,
    // Set by `full_document_update_lookup()`.
    full_document_on_update: bool,
    // Set by `full_document_before_change()`.
    full_document_before_change: bool,
    // `None` -> default applied in `build()`.
    initial_backoff_ms: Option<u64>,
    // `None` -> default applied in `build()`.
    max_backoff_ms: Option<u64>,
    // `None` -> default applied in `build()`; `Some(0)` means unlimited.
    max_reconnect_attempts: Option<u32>,
    // `None` leaves the batch size to the driver/server.
    batch_size: Option<u32>,
    // `None` starts the stream without a resume point.
    resume_token: Option<Document>,
    // `None` -> default applied in `build()`.
    backoff_jitter: Option<f64>,
}
624
625impl ChangeStreamConfigBuilder {
626    /// Sets the aggregation pipeline for filtering events.
627    ///
628    /// # Example
629    ///
630    /// ```rust
631    /// use rigatoni_core::stream::ChangeStreamConfig;
632    /// use bson::doc;
633    ///
634    /// let config = ChangeStreamConfig::builder()
635    ///     .pipeline(vec![
636    ///         doc! { "$match": { "operationType": "insert" } }
637    ///     ])
638    ///     .build();
639    /// ```
640    #[must_use]
641    pub fn pipeline(mut self, pipeline: Vec<Document>) -> Self {
642        self.pipeline = pipeline;
643        self
644    }
645
646    /// Enables full document retrieval for update operations.
647    ///
648    /// When enabled, MongoDB will include the entire updated document
649    /// in the change event (requires additional lookup).
650    #[must_use]
651    pub fn full_document_update_lookup(mut self) -> Self {
652        self.full_document_on_update = true;
653        self
654    }
655
656    /// Enables pre-image for change events (requires MongoDB 6.0+).
657    ///
658    /// When enabled, the document state before the change will be included.
659    /// Requires pre-image collection to be configured on the collection.
660    #[must_use]
661    pub fn full_document_before_change(mut self) -> Self {
662        self.full_document_before_change = true;
663        self
664    }
665
666    /// Sets the initial backoff duration in milliseconds.
667    ///
668    /// Default: 100ms
669    #[must_use]
670    pub fn initial_backoff_ms(mut self, ms: u64) -> Self {
671        self.initial_backoff_ms = Some(ms);
672        self
673    }
674
675    /// Sets the maximum backoff duration in milliseconds.
676    ///
677    /// Default: 30,000ms (30 seconds)
678    #[must_use]
679    pub fn max_backoff_ms(mut self, ms: u64) -> Self {
680        self.max_backoff_ms = Some(ms);
681        self
682    }
683
684    /// Sets the maximum number of reconnection attempts.
685    ///
686    /// Set to 0 for infinite retries (use with caution).
687    /// Default: 5
688    #[must_use]
689    pub fn max_reconnect_attempts(mut self, attempts: u32) -> Self {
690        self.max_reconnect_attempts = Some(attempts);
691        self
692    }
693
694    /// Sets the batch size for fetching events from MongoDB.
695    ///
696    /// Larger batches can improve throughput but increase memory usage.
697    #[must_use]
698    pub fn batch_size(mut self, size: u32) -> Self {
699        self.batch_size = Some(size);
700        self
701    }
702
703    /// Sets a resume token to start streaming from.
704    ///
705    /// If provided, the stream will resume from this point instead
706    /// of starting from the current time.
707    #[must_use]
708    pub fn resume_token(mut self, token: Document) -> Self {
709        self.resume_token = Some(token);
710        self
711    }
712
713    /// Sets the backoff jitter factor (0.0 to 1.0).
714    ///
715    /// Jitter adds randomness to backoff delays to prevent thundering herd.
716    /// A value of 0.1 means ±10% randomness.
717    ///
718    /// Default: 0.1
719    #[must_use]
720    pub fn backoff_jitter(mut self, jitter: f64) -> Self {
721        self.backoff_jitter = Some(jitter);
722        self
723    }
724
725    /// Builds the configuration.
726    ///
727    /// # Errors
728    ///
729    /// Returns `StreamError::Configuration` if validation fails:
730    /// - `initial_backoff_ms` must be > 0
731    /// - `initial_backoff_ms` must be <= `max_backoff_ms`
732    /// - `backoff_jitter` must be between 0.0 and 1.0
733    pub fn build(self) -> Result<ChangeStreamConfig, StreamError> {
734        let config = ChangeStreamConfig {
735            pipeline: self.pipeline,
736            full_document_on_update: self.full_document_on_update,
737            full_document_before_change: self.full_document_before_change,
738            initial_backoff_ms: self.initial_backoff_ms.unwrap_or(100),
739            max_backoff_ms: self.max_backoff_ms.unwrap_or(30_000),
740            max_reconnect_attempts: self.max_reconnect_attempts.unwrap_or(5),
741            batch_size: self.batch_size,
742            resume_token: self.resume_token,
743            backoff_jitter: self.backoff_jitter.unwrap_or(0.1),
744        };
745
746        config.validate()?;
747        Ok(config)
748    }
749}
750
751/// Callback for persisting resume tokens.
752///
753/// This callback is invoked after each successfully processed event.
754/// It should persist the token to durable storage (database, file, etc.)
755/// to enable resuming after crashes or restarts.
756///
757/// # Returns
758///
759/// - `Ok(())` if token was successfully persisted
760/// - `Err(String)` if persistence failed (will propagate as StreamError)
761///
762/// # Example
763///
764/// ```rust
765/// use bson::Document;
766/// use std::future::Future;
767/// use std::pin::Pin;
768///
769/// fn save_to_redis(
770///     token: Document,
771/// ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
772///     Box::pin(async move {
773///         // Save to Redis
774///         // redis_client.set("resume_token", token).await
775///         //     .map_err(|e| e.to_string())?;
776///         Ok(())
777///     })
778/// }
779/// ```
780pub type ResumeTokenCallback =
781    Box<dyn Fn(Document) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;
782
783/// Type alias for the reconnection future to reduce complexity
784type ReconnectFuture = Pin<
785    Box<
786        dyn Future<
787                Output = Result<
788                    mongodb::change_stream::ChangeStream<ChangeStreamEvent<Document>>,
789                    StreamError,
790                >,
791            > + Send,
792    >,
793>;
794
/// State of the change stream listener.
///
/// `Closed` is terminal; the other two states alternate as connections are
/// lost and re-established.
enum StreamState {
    /// Stream is active and consuming events
    Active,
    /// Stream encountered an error and is reconnecting; holds the in-flight
    /// reconnection future (presumably polled from `poll_next` until it yields
    /// a new driver stream or an error — confirm against the `Stream` impl)
    Reconnecting(ReconnectFuture),
    /// Stream is closed (terminal state)
    Closed,
}
804
/// A MongoDB change stream listener with automatic reconnection.
///
/// Implements the `Stream` trait, yielding `Result<AckableEvent, StreamError>`
/// items. Call [`AckableEvent::ack`] on each event once it has been processed.
///
/// # Lifecycle
///
/// 1. **Active**: Consuming events normally
/// 2. **Reconnecting**: Connection lost, attempting to reconnect with exponential backoff
/// 3. **Closed**: Terminal state after fatal error or explicit close
///
/// # Thread Safety
///
/// `ChangeStreamListener` is `Send` but not `Sync`: the boxed reconnection
/// future it may hold is only `Send`. Each listener should be owned by a single
/// task. For multi-collection scenarios, spawn separate tasks with separate
/// listeners.
///
/// # Resume Token Semantics
///
/// Resume tokens are only updated when the user calls `.ack()` on an `AckableEvent`.
/// This ensures at-least-once delivery semantics: if the stream crashes before
/// the user acknowledges an event, reconnection will resume from before that event.
pub struct ChangeStreamListener {
    /// The MongoDB collection being watched
    collection: Collection<Document>,

    /// Configuration for the change stream
    config: ChangeStreamConfig,

    /// The underlying MongoDB change stream (`Some` after `new`; presumably
    /// `None` while reconnecting — confirm against the `Stream` impl)
    stream: Option<mongodb::change_stream::ChangeStream<ChangeStreamEvent<Document>>>,

    /// Callback for persisting resume tokens
    /// NOTE(review): `new` stores a no-op here; the user-supplied callback is
    /// moved into the background persistence task instead. Confirm whether this
    /// field is still needed.
    resume_token_callback: ResumeTokenCallback,

    /// Current state of the listener
    state: StreamState,

    /// Number of reconnection attempts made
    reconnect_attempts: u32,

    /// Resume token of the most recently acknowledged event, written by
    /// `AckableEvent::ack` through a shared handle; `None` until the first ack
    last_resume_token: Arc<Mutex<Option<Document>>>,

    /// Sending half of the unbounded channel drained by the persistence task
    /// (presumably cloned into each `AckableEvent`)
    ack_sender: mpsc::UnboundedSender<Document>,

    /// Background task that calls the user's callback for every acked token.
    /// Dropping a tokio `JoinHandle` detaches (does not cancel) the task. The
    /// task exits once every sender, including those held by live
    /// `AckableEvent`s, has been dropped.
    _persistence_task: tokio::task::JoinHandle<()>,
}
854
855impl ChangeStreamListener {
856    /// Creates a new change stream listener.
857    ///
858    /// # Arguments
859    ///
860    /// * `collection` - MongoDB collection to watch
861    /// * `config` - Change stream configuration
862    /// * `resume_token_callback` - Callback to persist resume tokens
863    ///
864    /// # Errors
865    ///
866    /// Returns `StreamError::Connection` if initial connection fails.
867    ///
868    /// # Examples
869    ///
870    /// ```rust,no_run
871    /// use rigatoni_core::stream::{ChangeStreamListener, ChangeStreamConfig};
872    /// use mongodb::{Client, options::ClientOptions};
873    ///
874    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
875    /// let client = Client::with_options(
876    ///     ClientOptions::parse("mongodb://localhost:27017").await?
877    /// )?;
878    /// let collection = client.database("mydb").collection("users");
879    ///
880    /// let listener = ChangeStreamListener::new(
881    ///     collection,
882    ///     ChangeStreamConfig::default(),
883    ///     |token| Box::pin(async move {
884    ///         println!("Token: {:?}", token);
885    ///         Ok(())
886    ///     }),
887    /// ).await?;
888    /// # Ok(())
889    /// # }
890    /// ```
891    pub async fn new<F>(
892        collection: Collection<Document>,
893        config: ChangeStreamConfig,
894        resume_token_callback: F,
895    ) -> Result<Self, StreamError>
896    where
897        F: Fn(Document) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
898            + Send
899            + Sync
900            + 'static,
901    {
902        info!(
903            "Initializing change stream for {}.{}",
904            collection.namespace().db,
905            collection.namespace().coll
906        );
907
908        let options = config.to_mongo_options();
909        let stream = if config.pipeline.is_empty() {
910            collection.watch().with_options(options).await?
911        } else {
912            collection
913                .watch()
914                .pipeline(config.pipeline.clone())
915                .with_options(options)
916                .await?
917        };
918
919        // Create mpsc channel for token persistence
920        let (ack_sender, mut ack_receiver) = mpsc::unbounded_channel::<Document>();
921
922        // Spawn background task to persist tokens
923        let resume_token_callback_clone = Box::new(resume_token_callback);
924        let persistence_task = tokio::spawn(async move {
925            while let Some(token) = ack_receiver.recv().await {
926                if let Err(e) = resume_token_callback_clone(token).await {
927                    warn!("Failed to persist resume token: {}", e);
928                }
929            }
930            debug!("Token persistence task shutting down");
931        });
932
933        Ok(Self {
934            collection,
935            config,
936            stream: Some(stream),
937            resume_token_callback: Box::new(|_| Box::pin(async { Ok(()) })),
938            state: StreamState::Active,
939            reconnect_attempts: 0,
940            last_resume_token: Arc::new(Mutex::new(None)),
941            ack_sender,
942            _persistence_task: persistence_task,
943        })
944    }
945
946    /// Async reconnection function that doesn't borrow self.
947    ///
948    /// This is called from within the reconnection future in poll_next.
949    /// It returns the new stream on success.
950    async fn reconnect_async(
951        collection: Collection<Document>,
952        config: ChangeStreamConfig,
953        last_resume_token: Option<Document>,
954        mut reconnect_attempts: u32,
955    ) -> Result<mongodb::change_stream::ChangeStream<ChangeStreamEvent<Document>>, StreamError>
956    {
957        reconnect_attempts += 1;
958
959        // Check if max attempts exceeded
960        if config.max_reconnect_attempts > 0 && reconnect_attempts > config.max_reconnect_attempts {
961            error!(
962                attempts = reconnect_attempts,
963                "Max reconnection attempts exceeded"
964            );
965            return Err(StreamError::MaxReconnectAttemptsExceeded(
966                config.max_reconnect_attempts,
967            ));
968        }
969
970        // Calculate exponential backoff with jitter
971        let backoff = config.calculate_backoff(reconnect_attempts);
972
973        warn!(
974            attempt = reconnect_attempts,
975            backoff_ms = backoff.as_millis(),
976            "Reconnecting to change stream"
977        );
978
979        // Sleep before retry
980        tokio::time::sleep(backoff).await;
981
982        // Build options with resume token if available
983        let mut options = config.to_mongo_options();
984        if let Some(ref token_doc) = last_resume_token {
985            debug!("Resuming from token: {:?}", token_doc);
986            // Serialize Document to bytes, then deserialize as ResumeToken
987            if let Ok(bytes) = bson::to_vec(token_doc) {
988                if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&bytes) {
989                    options.resume_after = Some(resume_token);
990                }
991            }
992        }
993
994        // Attempt to reopen stream
995        let stream = if config.pipeline.is_empty() {
996            collection.watch().with_options(options).await?
997        } else {
998            collection
999                .watch()
1000                .pipeline(config.pipeline.clone())
1001                .with_options(options)
1002                .await?
1003        };
1004
1005        info!(
1006            attempt = reconnect_attempts,
1007            "Successfully reconnected to change stream"
1008        );
1009
1010        Ok(stream)
1011    }
1012
1013    /// Attempts to reconnect the change stream with exponential backoff.
1014    ///
1015    /// This method is called internally when a retryable error occurs.
1016    /// It will:
1017    /// 1. Calculate backoff duration
1018    /// 2. Sleep for backoff period
1019    /// 3. Attempt to reopen stream with last resume token
1020    /// 4. Reset attempts on success
1021    ///
1022    /// # Note
1023    ///
1024    /// This method is now deprecated in favor of reconnect_async.
1025    #[allow(dead_code)]
1026    async fn reconnect(&mut self) -> Result<(), StreamError> {
1027        self.reconnect_attempts += 1;
1028
1029        // Check if max attempts exceeded
1030        if self.config.max_reconnect_attempts > 0
1031            && self.reconnect_attempts > self.config.max_reconnect_attempts
1032        {
1033            error!(
1034                attempts = self.reconnect_attempts,
1035                "Max reconnection attempts exceeded"
1036            );
1037            return Err(StreamError::MaxReconnectAttemptsExceeded(
1038                self.config.max_reconnect_attempts,
1039            ));
1040        }
1041
1042        // Calculate exponential backoff: initial * 2^(attempts-1)
1043        let backoff_ms = self
1044            .config
1045            .initial_backoff_ms
1046            .saturating_mul(2_u64.saturating_pow(self.reconnect_attempts - 1))
1047            .min(self.config.max_backoff_ms);
1048
1049        warn!(
1050            attempt = self.reconnect_attempts,
1051            backoff_ms = backoff_ms,
1052            "Reconnecting to change stream"
1053        );
1054
1055        // Sleep before retry
1056        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1057
1058        // Build options with resume token if available
1059        let mut options = self.config.to_mongo_options();
1060        if let Ok(last_token) = self.last_resume_token.lock() {
1061            if let Some(ref token_doc) = *last_token {
1062                debug!("Resuming from token: {:?}", token_doc);
1063                // Serialize Document to bytes, then deserialize as ResumeToken
1064                if let Ok(bytes) = bson::to_vec(token_doc) {
1065                    if let Ok(resume_token) = bson::from_slice::<ResumeToken>(&bytes) {
1066                        options.resume_after = Some(resume_token);
1067                    }
1068                }
1069            }
1070        }
1071
1072        // Attempt to reopen stream
1073        let stream = if self.config.pipeline.is_empty() {
1074            self.collection.watch().with_options(options).await?
1075        } else {
1076            self.collection
1077                .watch()
1078                .pipeline(self.config.pipeline.clone())
1079                .with_options(options)
1080                .await?
1081        };
1082
1083        info!(
1084            attempt = self.reconnect_attempts,
1085            "Successfully reconnected to change stream"
1086        );
1087
1088        self.stream = Some(stream);
1089        self.state = StreamState::Active;
1090        self.reconnect_attempts = 0; // Reset on success
1091
1092        Ok(())
1093    }
1094
1095    /// Processes a MongoDB change stream event.
1096    ///
1097    /// Converts to ChangeEvent, persists resume token, and handles invalidation.
1098    ///
1099    /// # Note
1100    ///
1101    /// Currently not used in poll_next due to async/await borrowing constraints.
1102    /// The logic is inlined in poll_next instead.
1103    #[allow(dead_code)]
1104    async fn process_event(
1105        &mut self,
1106        event: ChangeStreamEvent<Document>,
1107    ) -> Result<ChangeEvent, StreamError> {
1108        // Extract resume token before conversion
1109        let resume_token = bson::to_document(&event.id)
1110            .map_err(|e| StreamError::ResumeTokenPersistence(e.to_string()))?;
1111
1112        // Convert to ChangeEvent
1113        let change_event = ChangeEvent::try_from(event)?;
1114
1115        // Check for invalidation
1116        if change_event.is_invalidate() {
1117            let reason = format!(
1118                "Collection {}.{} was dropped or renamed",
1119                self.collection.namespace().db,
1120                self.collection.namespace().coll
1121            );
1122            error!("{}", reason);
1123            self.state = StreamState::Closed;
1124            return Err(StreamError::Invalidated { reason });
1125        }
1126
1127        // Persist resume token
1128        (self.resume_token_callback)(resume_token.clone())
1129            .await
1130            .map_err(StreamError::ResumeTokenPersistence)?;
1131
1132        // Update last resume token
1133        if let Ok(mut last_token) = self.last_resume_token.lock() {
1134            *last_token = Some(resume_token);
1135        }
1136
1137        Ok(change_event)
1138    }
1139
1140    /// Closes the change stream.
1141    ///
1142    /// This is a graceful shutdown that closes the underlying MongoDB cursor.
1143    /// After calling this, the stream will return `None` on next poll.
1144    pub async fn close(&mut self) {
1145        info!("Closing change stream");
1146        self.state = StreamState::Closed;
1147        self.stream = None;
1148    }
1149}
1150
1151impl Stream for ChangeStreamListener {
1152    type Item = Result<AckableEvent, StreamError>;
1153
1154    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1155        let this = self.get_mut();
1156
1157        // If closed, return None
1158        if matches!(this.state, StreamState::Closed) {
1159            return Poll::Ready(None);
1160        }
1161
1162        // If reconnecting, poll the reconnection future
1163        if let StreamState::Reconnecting(ref mut reconnect_fut) = this.state {
1164            match reconnect_fut.as_mut().poll(cx) {
1165                Poll::Ready(Ok(new_stream)) => {
1166                    // Reconnection succeeded, transition back to Active
1167                    info!("Reconnection successful, resuming stream");
1168                    this.stream = Some(new_stream);
1169                    this.state = StreamState::Active;
1170                    this.reconnect_attempts = 0;
1171                    // Wake to poll the stream immediately
1172                    cx.waker().wake_by_ref();
1173                    return Poll::Pending;
1174                }
1175                Poll::Ready(Err(e)) => {
1176                    // Reconnection failed permanently
1177                    error!("Reconnection failed: {}", e);
1178                    this.state = StreamState::Closed;
1179                    return Poll::Ready(Some(Err(e)));
1180                }
1181                Poll::Pending => {
1182                    // Still reconnecting
1183                    return Poll::Pending;
1184                }
1185            }
1186        }
1187
1188        // Poll the underlying stream
1189        if let Some(ref mut stream) = this.stream {
1190            use futures::StreamExt;
1191            let mut stream_pin = Pin::new(stream);
1192            match stream_pin.poll_next_unpin(cx) {
1193                Poll::Ready(Some(Ok(event))) => {
1194                    // Extract resume token
1195                    let resume_token = match bson::to_document(&event.id) {
1196                        Ok(token) => token,
1197                        Err(e) => {
1198                            return Poll::Ready(Some(Err(StreamError::ResumeTokenPersistence(
1199                                e.to_string(),
1200                            ))))
1201                        }
1202                    };
1203
1204                    // Convert to ChangeEvent
1205                    let change_event = match ChangeEvent::try_from(event) {
1206                        Ok(evt) => evt,
1207                        Err(e) => return Poll::Ready(Some(Err(StreamError::Conversion(e)))),
1208                    };
1209
1210                    // Check for invalidation
1211                    if change_event.is_invalidate() {
1212                        let reason = format!(
1213                            "Collection {}.{} was dropped or renamed",
1214                            this.collection.namespace().db,
1215                            this.collection.namespace().coll
1216                        );
1217                        error!("{}", reason);
1218                        this.state = StreamState::Closed;
1219                        return Poll::Ready(Some(Err(StreamError::Invalidated { reason })));
1220                    }
1221
1222                    // Create AckableEvent - user must call ack() after processing
1223                    // Note: last_resume_token is NOT updated here - only when user calls ack()
1224                    // This ensures at-least-once semantics
1225                    let ackable = AckableEvent {
1226                        event: change_event,
1227                        resume_token,
1228                        ack_sender: this.ack_sender.clone(),
1229                        last_resume_token: this.last_resume_token.clone(),
1230                    };
1231
1232                    Poll::Ready(Some(Ok(ackable)))
1233                }
1234                Poll::Ready(Some(Err(e))) => {
1235                    // MongoDB error
1236                    let stream_err = StreamError::from_mongo_error(e);
1237
1238                    if stream_err.is_retryable() {
1239                        warn!("Retryable error occurred: {}", stream_err);
1240
1241                        // Create reconnection future
1242                        let collection = this.collection.clone();
1243                        let config = this.config.clone();
1244                        let last_resume_token =
1245                            this.last_resume_token.lock().ok().and_then(|t| t.clone());
1246                        let reconnect_attempts = this.reconnect_attempts;
1247
1248                        let reconnect_fut = Box::pin(async move {
1249                            Self::reconnect_async(
1250                                collection,
1251                                config,
1252                                last_resume_token,
1253                                reconnect_attempts,
1254                            )
1255                            .await
1256                        });
1257
1258                        this.state = StreamState::Reconnecting(reconnect_fut);
1259                        // Wake to attempt reconnection on next poll
1260                        cx.waker().wake_by_ref();
1261                        Poll::Pending
1262                    } else {
1263                        error!("Fatal error occurred: {}", stream_err);
1264                        this.state = StreamState::Closed;
1265                        Poll::Ready(Some(Err(stream_err)))
1266                    }
1267                }
1268                Poll::Ready(None) => {
1269                    // Stream ended (should not happen for change streams)
1270                    warn!("Change stream ended unexpectedly");
1271                    this.state = StreamState::Closed;
1272                    Poll::Ready(None)
1273                }
1274                Poll::Pending => Poll::Pending,
1275            }
1276        } else {
1277            // No stream available
1278            this.state = StreamState::Closed;
1279            Poll::Ready(None)
1280        }
1281    }
1282}
1283
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    // tokio's async Mutex for the callback test. This explicit import takes
    // precedence over any `Mutex` brought in by the glob import above.
    use tokio::sync::Mutex;

    #[test]
    fn test_default_config() {
        let config = ChangeStreamConfig::default();
        assert!(config.pipeline.is_empty());
        assert!(!config.full_document_on_update);
        assert!(!config.full_document_before_change);
        assert_eq!(config.initial_backoff_ms, 100);
        assert_eq!(config.max_backoff_ms, 30_000);
        assert_eq!(config.max_reconnect_attempts, 5);
    }

    #[test]
    fn test_config_builder() {
        let config = ChangeStreamConfig::builder()
            .full_document_update_lookup()
            .full_document_before_change()
            .max_reconnect_attempts(10)
            .initial_backoff_ms(200)
            .max_backoff_ms(60_000)
            .batch_size(100)
            .build()
            .unwrap();

        assert!(config.full_document_on_update);
        assert!(config.full_document_before_change);
        assert_eq!(config.max_reconnect_attempts, 10);
        assert_eq!(config.initial_backoff_ms, 200);
        assert_eq!(config.max_backoff_ms, 60_000);
        assert_eq!(config.batch_size, Some(100));
    }

    #[test]
    fn test_config_with_pipeline() {
        use bson::doc;

        let pipeline = vec![doc! { "$match": { "operationType": "insert" } }];

        let config = ChangeStreamConfig::builder()
            .pipeline(pipeline.clone())
            .build()
            .unwrap();

        assert_eq!(config.pipeline.len(), 1);
        assert_eq!(config.pipeline[0], pipeline[0]);
    }

    #[test]
    fn test_stream_error_is_retryable() {
        // Connection errors are retryable. This one carries error code 6
        // (HostUnreachable) and no error labels.
        let err = StreamError::Connection {
            message: "connection refused".to_string(),
            source: None,
            code: Some(6), // HostUnreachable
            labels: vec![],
        };
        assert!(err.is_retryable());

        // Invalidated is not retryable
        let err = StreamError::Invalidated {
            reason: "collection dropped".to_string(),
        };
        assert!(!err.is_retryable());

        // Max attempts exceeded is not retryable, which is what lets the
        // reconnection loop in `poll_next` terminate.
        let err = StreamError::MaxReconnectAttemptsExceeded(5);
        assert!(!err.is_retryable());
    }

    #[test]
    fn test_stream_error_category() {
        let err = StreamError::Invalidated {
            reason: "test".to_string(),
        };
        assert_eq!(err.category(), "invalidated");

        let err = StreamError::MaxReconnectAttemptsExceeded(5);
        assert_eq!(err.category(), "max_retries");

        let err = StreamError::Configuration("test".to_string());
        assert_eq!(err.category(), "configuration");
    }

    #[tokio::test]
    async fn test_resume_token_callback() {
        use bson::doc;

        let saved_token = Arc::new(Mutex::new(None));
        let saved_token_clone = saved_token.clone();

        let callback = move |token: Document| {
            let saved = saved_token_clone.clone();
            Box::pin(async move {
                *saved.lock().await = Some(token);
                Ok(())
            }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>
        };

        let test_token = doc! { "_data": "test123" };
        callback(test_token.clone()).await.unwrap();

        let saved = saved_token.lock().await;
        assert_eq!(*saved, Some(test_token));
    }

    // Note: test_stream_state_transitions was removed because
    // StreamState::Reconnecting now holds a Future, which doesn't implement PartialEq.

    #[test]
    fn test_config_resume_token() {
        use bson::doc;

        let token = doc! { "_data": "token123" };
        let config = ChangeStreamConfig::builder()
            .resume_token(token.clone())
            .build()
            .unwrap();

        assert_eq!(config.resume_token, Some(token));
    }

    // Note: Integration tests with real MongoDB would go in tests/integration_test.rs
    // and would use testcontainers for a real MongoDB instance.
}