rigatoni_core/watch_level.rs
1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Watch level configuration for MongoDB change streams.
18//!
19//! This module defines the different scopes at which Rigatoni can watch
20//! for changes in MongoDB: collection level, database level, or deployment
21//! (cluster) level.
22//!
23//! # Watch Levels
24//!
25//! ## Collection Level
26//!
27//! Watch specific collections only. Use this when you know exactly which
28//! collections to monitor and want fine-grained control.
29//!
30//! ```rust
31//! use rigatoni_core::watch_level::WatchLevel;
32//!
33//! let level = WatchLevel::Collection(vec!["users".to_string(), "orders".to_string()]);
34//! ```
35//!
36//! ## Database Level
37//!
38//! Watch all collections in a database. New collections are automatically
39//! included as they're created. This is the default and recommended mode
40//! for most use cases.
41//!
42//! ```rust
43//! use rigatoni_core::watch_level::WatchLevel;
44//!
45//! let level = WatchLevel::Database;
46//! ```
47//!
48//! ## Deployment Level
49//!
50//! Watch all databases in the deployment (cluster-wide). Use with caution
51//! as this can generate high event volume. Requires MongoDB 4.0+ and
52//! appropriate permissions.
53//!
54//! ```rust
55//! use rigatoni_core::watch_level::WatchLevel;
56//!
57//! let level = WatchLevel::Deployment;
58//! ```
59//!
60//! # Performance Considerations
61//!
62//! | Level | Streams | Auto-Discovery | Throughput | Recommended For |
63//! |-------|---------|----------------|------------|-----------------|
64//! | Collection | N (one per collection) | No | Highest (parallel) | Large DBs with known collections |
65//! | Database | 1 | Yes | Good | Most use cases (< 50 collections) |
66//! | Deployment | 1 | Yes | Variable | Monitoring/audit use cases |
67
68/// Defines the scope of collections to watch for changes.
69///
70/// This enum controls whether Rigatoni watches specific collections,
71/// an entire database, or all databases in a deployment.
72///
73/// # Examples
74///
75/// ```rust
76/// use rigatoni_core::watch_level::WatchLevel;
77///
78/// // Watch specific collections
79/// let collections = WatchLevel::Collection(vec!["users".to_string(), "orders".to_string()]);
80///
81/// // Watch entire database (default)
82/// let database = WatchLevel::Database;
83///
84/// // Watch entire deployment (cluster-wide)
85/// let deployment = WatchLevel::Deployment;
86///
87/// // Default is Database level
88/// assert_eq!(WatchLevel::default(), WatchLevel::Database);
89/// ```
90#[derive(Debug, Clone, PartialEq, Eq)]
91pub enum WatchLevel {
92 /// Watch specific collections only.
93 ///
94 /// This is the most granular option, allowing you to specify exactly
95 /// which collections to monitor. Each collection gets its own change
96 /// stream, enabling parallel processing.
97 ///
98 /// **Advantages**:
99 /// - Maximum parallelism (one worker per collection)
100 /// - Can apply different batching/retry settings per collection
101 /// - Lower latency for specific collections
102 ///
103 /// **Disadvantages**:
104 /// - Must update configuration when adding new collections
105 /// - More MongoDB connections
106 ///
107 /// # Example
108 ///
109 /// ```rust
110 /// use rigatoni_core::watch_level::WatchLevel;
111 ///
112 /// let level = WatchLevel::Collection(vec![
113 /// "users".to_string(),
114 /// "orders".to_string(),
115 /// "products".to_string(),
116 /// ]);
117 /// ```
118 Collection(Vec<String>),
119
120 /// Watch all collections in the database.
121 ///
122 /// Automatically picks up new collections as they are created.
123 /// Uses MongoDB's `db.watch()` API to create a single change stream
124 /// for the entire database.
125 ///
126 /// **Advantages**:
127 /// - Automatic discovery of new collections
128 /// - Single change stream (simpler architecture)
129 /// - No configuration updates needed
130 ///
131 /// **Disadvantages**:
132 /// - Single stream may become bottleneck for high-volume databases
133 /// - Cannot apply per-collection settings
134 ///
135 /// **Requirements**:
136 /// - MongoDB replica set
137 ///
138 /// **Recommended for**: Databases with < 50 collections
139 ///
140 /// # Example
141 ///
142 /// ```rust
143 /// use rigatoni_core::pipeline::PipelineConfig;
144 ///
145 /// let config = PipelineConfig::builder()
146 /// .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
147 /// .database("mydb")
148 /// .watch_database() // Use Database watch level
149 /// .build();
150 /// ```
151 Database,
152
153 /// Watch all databases in the deployment (cluster-wide).
154 ///
155 /// Uses MongoDB's `client.watch()` API to monitor changes across
156 /// all databases in the deployment. This is the most comprehensive
157 /// option but also the most resource-intensive.
158 ///
159 /// **Advantages**:
160 /// - Complete visibility into all changes
161 /// - Single stream for entire cluster
162 /// - Useful for audit logging and compliance
163 ///
164 /// **Disadvantages**:
165 /// - High event volume
166 /// - Requires cluster-wide permissions
167 /// - Not suitable for multi-tenant environments (unless intended)
168 ///
169 /// **Requirements**:
170 /// - MongoDB 4.0+
171 /// - Cluster-wide read permissions
172 ///
173 /// **Recommended for**: Monitoring, audit logging, compliance use cases
174 ///
175 /// # Example
176 ///
177 /// ```rust
178 /// use rigatoni_core::pipeline::PipelineConfig;
179 ///
180 /// let config = PipelineConfig::builder()
181 /// .mongodb_uri("mongodb://localhost:27017/?replicaSet=rs0")
182 /// .database("mydb") // Still needed for state storage keys
183 /// .watch_deployment() // Watch all databases
184 /// .build();
185 /// ```
186 Deployment,
187}
188
189impl Default for WatchLevel {
190 /// Returns the default watch level: [`WatchLevel::Database`].
191 ///
192 /// Database-level watching is the recommended default as it provides
193 /// automatic collection discovery without the complexity of deployment-level
194 /// watching.
195 fn default() -> Self {
196 Self::Database
197 }
198}
199
200impl WatchLevel {
201 /// Returns `true` if this is collection-level watching.
202 ///
203 /// # Example
204 ///
205 /// ```rust
206 /// use rigatoni_core::watch_level::WatchLevel;
207 ///
208 /// let level = WatchLevel::Collection(vec!["users".to_string()]);
209 /// assert!(level.is_collection());
210 ///
211 /// let level = WatchLevel::Database;
212 /// assert!(!level.is_collection());
213 /// ```
214 #[must_use]
215 pub fn is_collection(&self) -> bool {
216 matches!(self, Self::Collection(_))
217 }
218
219 /// Returns `true` if this is database-level watching.
220 ///
221 /// # Example
222 ///
223 /// ```rust
224 /// use rigatoni_core::watch_level::WatchLevel;
225 ///
226 /// let level = WatchLevel::Database;
227 /// assert!(level.is_database());
228 /// ```
229 #[must_use]
230 pub fn is_database(&self) -> bool {
231 matches!(self, Self::Database)
232 }
233
234 /// Returns `true` if this is deployment-level (cluster-wide) watching.
235 ///
236 /// # Example
237 ///
238 /// ```rust
239 /// use rigatoni_core::watch_level::WatchLevel;
240 ///
241 /// let level = WatchLevel::Deployment;
242 /// assert!(level.is_deployment());
243 /// ```
244 #[must_use]
245 pub fn is_deployment(&self) -> bool {
246 matches!(self, Self::Deployment)
247 }
248
249 /// Returns the collections if this is collection-level watching.
250 ///
251 /// Returns `None` for database or deployment level watching.
252 ///
253 /// # Example
254 ///
255 /// ```rust
256 /// use rigatoni_core::watch_level::WatchLevel;
257 ///
258 /// let level = WatchLevel::Collection(vec!["users".to_string()]);
259 /// assert_eq!(level.collections(), Some(&vec!["users".to_string()]));
260 ///
261 /// let level = WatchLevel::Database;
262 /// assert_eq!(level.collections(), None);
263 /// ```
264 #[must_use]
265 pub fn collections(&self) -> Option<&Vec<String>> {
266 match self {
267 Self::Collection(collections) => Some(collections),
268 _ => None,
269 }
270 }
271
272 /// Returns a human-readable description of the watch level.
273 ///
274 /// # Example
275 ///
276 /// ```rust
277 /// use rigatoni_core::watch_level::WatchLevel;
278 ///
279 /// let level = WatchLevel::Collection(vec!["users".to_string(), "orders".to_string()]);
280 /// assert_eq!(level.description(), "2 collections");
281 ///
282 /// let level = WatchLevel::Database;
283 /// assert_eq!(level.description(), "database");
284 ///
285 /// let level = WatchLevel::Deployment;
286 /// assert_eq!(level.description(), "deployment");
287 /// ```
288 #[must_use]
289 pub fn description(&self) -> String {
290 match self {
291 Self::Collection(collections) => {
292 if collections.is_empty() {
293 "no collections".to_string()
294 } else if collections.len() == 1 {
295 format!("1 collection ({})", collections[0])
296 } else {
297 format!("{} collections", collections.len())
298 }
299 }
300 Self::Database => "database".to_string(),
301 Self::Deployment => "deployment".to_string(),
302 }
303 }
304
305 /// Returns the resume token key prefix for this watch level.
306 ///
307 /// Different watch levels use different resume token keys to avoid
308 /// conflicts when switching between levels.
309 ///
310 /// # Arguments
311 ///
312 /// * `database` - The database name (used for collection and database levels)
313 /// * `collection` - Optional collection name (used for collection level)
314 ///
315 /// # Example
316 ///
317 /// ```rust
318 /// use rigatoni_core::watch_level::WatchLevel;
319 ///
320 /// let level = WatchLevel::Collection(vec!["users".to_string()]);
321 /// assert_eq!(
322 /// level.resume_token_key("mydb", Some("users")),
323 /// "resume_token:mydb:users"
324 /// );
325 ///
326 /// let level = WatchLevel::Database;
327 /// assert_eq!(
328 /// level.resume_token_key("mydb", None),
329 /// "resume_token:database:mydb"
330 /// );
331 ///
332 /// let level = WatchLevel::Deployment;
333 /// assert_eq!(
334 /// level.resume_token_key("mydb", None),
335 /// "resume_token:deployment"
336 /// );
337 /// ```
338 #[must_use]
339 pub fn resume_token_key(&self, database: &str, collection: Option<&str>) -> String {
340 match self {
341 Self::Collection(_) => {
342 if let Some(coll) = collection {
343 format!("resume_token:{}:{}", database, coll)
344 } else {
345 format!("resume_token:{}", database)
346 }
347 }
348 Self::Database => {
349 format!("resume_token:database:{}", database)
350 }
351 Self::Deployment => "resume_token:deployment".to_string(),
352 }
353 }
354}
355
356impl std::fmt::Display for WatchLevel {
357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358 match self {
359 Self::Collection(collections) => {
360 if collections.is_empty() {
361 write!(f, "Collection([])")
362 } else {
363 write!(f, "Collection({:?})", collections)
364 }
365 }
366 Self::Database => write!(f, "Database"),
367 Self::Deployment => write!(f, "Deployment"),
368 }
369 }
370}