rigatoni_core/
watch_level.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Watch level configuration for MongoDB change streams.
18//!
19//! This module defines the different scopes at which Rigatoni can watch
20//! for changes in MongoDB: collection level, database level, or deployment
21//! (cluster) level.
22//!
23//! # Watch Levels
24//!
25//! ## Collection Level
26//!
27//! Watch specific collections only. Use this when you know exactly which
28//! collections to monitor and want fine-grained control.
29//!
30//! ```rust
31//! use rigatoni_core::watch_level::WatchLevel;
32//!
33//! let level = WatchLevel::Collection(vec!["users".to_string(), "orders".to_string()]);
34//! ```
35//!
36//! ## Database Level
37//!
38//! Watch all collections in a database. New collections are automatically
39//! included as they're created. This is the default and recommended mode
40//! for most use cases.
41//!
42//! ```rust
43//! use rigatoni_core::watch_level::WatchLevel;
44//!
45//! let level = WatchLevel::Database;
46//! ```
47//!
48//! ## Deployment Level
49//!
50//! Watch all databases in the deployment (cluster-wide). Use with caution
51//! as this can generate high event volume. Requires MongoDB 4.0+ and
52//! appropriate permissions.
53//!
54//! ```rust
55//! use rigatoni_core::watch_level::WatchLevel;
56//!
57//! let level = WatchLevel::Deployment;
58//! ```
59//!
60//! # Performance Considerations
61//!
62//! | Level | Streams | Auto-Discovery | Throughput | Recommended For |
63//! |-------|---------|----------------|------------|-----------------|
64//! | Collection | N (one per collection) | No | Highest (parallel) | Large DBs with known collections |
65//! | Database | 1 | Yes | Good | Most use cases (< 50 collections) |
66//! | Deployment | 1 | Yes | Variable | Monitoring/audit use cases |
67
/// Scope at which Rigatoni opens MongoDB change streams.
///
/// A value of this type selects one of three granularities: an explicit
/// list of collections, every collection in one database, or every
/// database in the deployment.
///
/// # Examples
///
/// ```rust
/// use rigatoni_core::watch_level::WatchLevel;
///
/// // Watch specific collections
/// let collections = WatchLevel::Collection(vec!["users".to_string(), "orders".to_string()]);
///
/// // Watch entire database (default)
/// let database = WatchLevel::Database;
///
/// // Watch entire deployment (cluster-wide)
/// let deployment = WatchLevel::Deployment;
///
/// // Default is Database level
/// assert_eq!(WatchLevel::default(), WatchLevel::Database);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchLevel {
    /// Watch only the named collections.
    ///
    /// The payload is the list of collection names. This is the finest
    /// granularity: every listed collection is given its own change
    /// stream, so collections can be processed in parallel.
    ///
    /// **Advantages**:
    /// - Maximum parallelism (one worker per collection)
    /// - Batching/retry settings can differ per collection
    /// - Lower latency for the selected collections
    ///
    /// **Disadvantages**:
    /// - Configuration must be updated whenever a collection is added
    /// - More MongoDB connections
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::watch_level::WatchLevel;
    ///
    /// let level = WatchLevel::Collection(vec![
    ///     "users".to_string(),
    ///     "orders".to_string(),
    ///     "products".to_string(),
    /// ]);
    /// ```
    Collection(Vec<String>),

    /// Watch every collection in the configured database.
    ///
    /// A single change stream is opened through MongoDB's `db.watch()`
    /// API; collections created later are picked up automatically.
    ///
    /// **Advantages**:
    /// - New collections are discovered automatically
    /// - Single change stream (simpler architecture)
    /// - No configuration updates needed
    ///
    /// **Disadvantages**:
    /// - The single stream may become a bottleneck for high-volume databases
    /// - Per-collection settings cannot be applied
    ///
    /// **Requirements**:
    /// - MongoDB replica set
    ///
    /// **Recommended for**: Databases with < 50 collections
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    ///     .database("mydb")
    ///     .watch_database()  // Use Database watch level
    ///     .build();
    /// ```
    Database,

    /// Watch every database in the deployment (cluster-wide).
    ///
    /// A single change stream is opened through MongoDB's `client.watch()`
    /// API. This gives the broadest coverage and is also the most
    /// resource-intensive choice.
    ///
    /// **Advantages**:
    /// - Complete visibility into all changes
    /// - Single stream for the entire cluster
    /// - Useful for audit logging and compliance
    ///
    /// **Disadvantages**:
    /// - High event volume
    /// - Requires cluster-wide permissions
    /// - Not suitable for multi-tenant environments (unless intended)
    ///
    /// **Requirements**:
    /// - MongoDB 4.0+
    /// - Cluster-wide read permissions
    ///
    /// **Recommended for**: Monitoring, audit logging, compliance use cases
    ///
    /// # Example
    ///
    /// ```rust
    /// use rigatoni_core::pipeline::PipelineConfig;
    ///
    /// let config = PipelineConfig::builder()
    ///     .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
    ///     .database("mydb")  // Still needed for state storage keys
    ///     .watch_deployment()  // Watch all databases
    ///     .build();
    /// ```
    Deployment,
}
188
189impl Default for WatchLevel {
190    /// Returns the default watch level: [`WatchLevel::Database`].
191    ///
192    /// Database-level watching is the recommended default as it provides
193    /// automatic collection discovery without the complexity of deployment-level
194    /// watching.
195    fn default() -> Self {
196        Self::Database
197    }
198}
199
200impl WatchLevel {
201    /// Returns `true` if this is collection-level watching.
202    ///
203    /// # Example
204    ///
205    /// ```rust
206    /// use rigatoni_core::watch_level::WatchLevel;
207    ///
208    /// let level = WatchLevel::Collection(vec!["users".to_string()]);
209    /// assert!(level.is_collection());
210    ///
211    /// let level = WatchLevel::Database;
212    /// assert!(!level.is_collection());
213    /// ```
214    #[must_use]
215    pub fn is_collection(&self) -> bool {
216        matches!(self, Self::Collection(_))
217    }
218
219    /// Returns `true` if this is database-level watching.
220    ///
221    /// # Example
222    ///
223    /// ```rust
224    /// use rigatoni_core::watch_level::WatchLevel;
225    ///
226    /// let level = WatchLevel::Database;
227    /// assert!(level.is_database());
228    /// ```
229    #[must_use]
230    pub fn is_database(&self) -> bool {
231        matches!(self, Self::Database)
232    }
233
234    /// Returns `true` if this is deployment-level (cluster-wide) watching.
235    ///
236    /// # Example
237    ///
238    /// ```rust
239    /// use rigatoni_core::watch_level::WatchLevel;
240    ///
241    /// let level = WatchLevel::Deployment;
242    /// assert!(level.is_deployment());
243    /// ```
244    #[must_use]
245    pub fn is_deployment(&self) -> bool {
246        matches!(self, Self::Deployment)
247    }
248
249    /// Returns the collections if this is collection-level watching.
250    ///
251    /// Returns `None` for database or deployment level watching.
252    ///
253    /// # Example
254    ///
255    /// ```rust
256    /// use rigatoni_core::watch_level::WatchLevel;
257    ///
258    /// let level = WatchLevel::Collection(vec!["users".to_string()]);
259    /// assert_eq!(level.collections(), Some(&vec!["users".to_string()]));
260    ///
261    /// let level = WatchLevel::Database;
262    /// assert_eq!(level.collections(), None);
263    /// ```
264    #[must_use]
265    pub fn collections(&self) -> Option<&Vec<String>> {
266        match self {
267            Self::Collection(collections) => Some(collections),
268            _ => None,
269        }
270    }
271
272    /// Returns a human-readable description of the watch level.
273    ///
274    /// # Example
275    ///
276    /// ```rust
277    /// use rigatoni_core::watch_level::WatchLevel;
278    ///
279    /// let level = WatchLevel::Collection(vec!["users".to_string(), "orders".to_string()]);
280    /// assert_eq!(level.description(), "2 collections");
281    ///
282    /// let level = WatchLevel::Database;
283    /// assert_eq!(level.description(), "database");
284    ///
285    /// let level = WatchLevel::Deployment;
286    /// assert_eq!(level.description(), "deployment");
287    /// ```
288    #[must_use]
289    pub fn description(&self) -> String {
290        match self {
291            Self::Collection(collections) => {
292                if collections.is_empty() {
293                    "no collections".to_string()
294                } else if collections.len() == 1 {
295                    format!("1 collection ({})", collections[0])
296                } else {
297                    format!("{} collections", collections.len())
298                }
299            }
300            Self::Database => "database".to_string(),
301            Self::Deployment => "deployment".to_string(),
302        }
303    }
304
305    /// Returns the resume token key prefix for this watch level.
306    ///
307    /// Different watch levels use different resume token keys to avoid
308    /// conflicts when switching between levels.
309    ///
310    /// # Arguments
311    ///
312    /// * `database` - The database name (used for collection and database levels)
313    /// * `collection` - Optional collection name (used for collection level)
314    ///
315    /// # Example
316    ///
317    /// ```rust
318    /// use rigatoni_core::watch_level::WatchLevel;
319    ///
320    /// let level = WatchLevel::Collection(vec!["users".to_string()]);
321    /// assert_eq!(
322    ///     level.resume_token_key("mydb", Some("users")),
323    ///     "resume_token:mydb:users"
324    /// );
325    ///
326    /// let level = WatchLevel::Database;
327    /// assert_eq!(
328    ///     level.resume_token_key("mydb", None),
329    ///     "resume_token:database:mydb"
330    /// );
331    ///
332    /// let level = WatchLevel::Deployment;
333    /// assert_eq!(
334    ///     level.resume_token_key("mydb", None),
335    ///     "resume_token:deployment"
336    /// );
337    /// ```
338    #[must_use]
339    pub fn resume_token_key(&self, database: &str, collection: Option<&str>) -> String {
340        match self {
341            Self::Collection(_) => {
342                if let Some(coll) = collection {
343                    format!("resume_token:{}:{}", database, coll)
344                } else {
345                    format!("resume_token:{}", database)
346                }
347            }
348            Self::Database => {
349                format!("resume_token:database:{}", database)
350            }
351            Self::Deployment => "resume_token:deployment".to_string(),
352        }
353    }
354}
355
356impl std::fmt::Display for WatchLevel {
357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358        match self {
359            Self::Collection(collections) => {
360                if collections.is_empty() {
361                    write!(f, "Collection([])")
362                } else {
363                    write!(f, "Collection({:?})", collections)
364                }
365            }
366            Self::Database => write!(f, "Database"),
367            Self::Deployment => write!(f, "Deployment"),
368        }
369    }
370}