rigatoni_core/
event.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! `MongoDB` Change Stream Event Representation
18//!
19//! This module defines the core event types used throughout the Rigatoni data replication pipeline.
20//! Events represent `MongoDB` change stream operations and flow from sources to destinations.
21//!
22//! # Examples
23//!
24//! ```rust
25//! use rigatoni_core::event::{ChangeEvent, OperationType, Namespace};
26//! use bson::{doc, Document};
27//! use chrono::Utc;
28//!
29//! // Create an insert event manually
30//! let event = ChangeEvent {
31//!     operation: OperationType::Insert,
32//!     namespace: Namespace {
33//!         database: "mydb".to_string(),
34//!         collection: "users".to_string(),
35//!     },
36//!     document_key: Some(doc! { "_id": 123 }),
37//!     full_document: Some(doc! {
38//!         "_id": 123,
39//!         "name": "Alice",
40//!         "email": "alice@example.com"
41//!     }),
42//!     update_description: None,
43//!     cluster_time: Utc::now(),
44//!     resume_token: doc! { "_data": "token123" },
45//! };
46//!
47//! // Check operation type
48//! assert!(event.is_insert());
49//! assert_eq!(event.collection_name(), "users");
50//!
51//! // Access document data
52//! if let Some(doc) = &event.full_document {
53//!     println!("Inserted: {:?}", doc);
54//! }
55//! ```
56
57use bson::Document;
58use chrono::{DateTime, Utc};
59use serde::{Deserialize, Serialize};
60use std::fmt;
61
/// Failure raised while turning the `MongoDB` driver's `ChangeStreamEvent`
/// into a [`ChangeEvent`].
#[derive(Debug, Clone)]
pub enum ConversionError {
    /// The driver's resume token could not be represented as a BSON document.
    ///
    /// The payload is a human-readable description of the underlying failure.
    ResumeTokenConversion(String),
}

impl fmt::Display for ConversionError {
    /// Renders the error as `Failed to convert resume token: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the detail text first so every variant shares one write site.
        let detail = match self {
            Self::ResumeTokenConversion(detail) => detail,
        };
        write!(f, "Failed to convert resume token: {detail}")
    }
}

// No underlying source error is exposed; the default trait methods suffice.
impl std::error::Error for ConversionError {}
80
81/// `MongoDB` change stream operation types.
82///
83/// Represents all possible operations that can occur in a `MongoDB` change stream.
84/// Each variant corresponds to a specific database operation.
85///
86/// The `Unknown` variant allows forward compatibility with future `MongoDB` versions
87/// that may introduce new operation types.
88#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
89#[serde(rename_all = "lowercase")]
90#[non_exhaustive]
91pub enum OperationType {
92    /// A document was inserted into a collection
93    Insert,
94
95    /// A document was updated (modified in place)
96    Update,
97
98    /// A document was deleted from a collection
99    Delete,
100
101    /// A document was replaced entirely (all fields changed)
102    Replace,
103
104    /// The change stream was invalidated (collection dropped, renamed, etc.)
105    Invalidate,
106
107    /// A collection was dropped
108    Drop,
109
110    /// A database was dropped
111    #[serde(rename = "dropdatabase")]
112    DropDatabase,
113
114    /// A collection was renamed
115    Rename,
116
117    /// An unknown operation type from a newer `MongoDB` version
118    ///
119    /// Contains the original operation type string for logging and debugging.
120    #[serde(untagged)]
121    Unknown(String),
122}
123
124impl OperationType {
125    /// Returns true if this operation modifies data (insert, update, replace).
126    #[inline]
127    #[must_use]
128    pub const fn is_data_modification(&self) -> bool {
129        matches!(self, Self::Insert | Self::Update | Self::Replace)
130    }
131
132    /// Returns true if this operation removes data (delete, drop, drop database).
133    #[inline]
134    #[must_use]
135    pub const fn is_data_removal(&self) -> bool {
136        matches!(self, Self::Delete | Self::Drop | Self::DropDatabase)
137    }
138
139    /// Returns true if this operation is a DDL operation (drop, rename, drop database).
140    #[inline]
141    #[must_use]
142    pub const fn is_ddl(&self) -> bool {
143        matches!(self, Self::Drop | Self::DropDatabase | Self::Rename)
144    }
145
146    /// Returns true if this is an unknown operation type.
147    ///
148    /// Unknown operation types may appear when using a newer `MongoDB` version
149    /// than this library was designed for.
150    #[inline]
151    #[must_use]
152    pub const fn is_unknown(&self) -> bool {
153        matches!(self, Self::Unknown(_))
154    }
155
156    /// Returns the operation type as a static string for metrics labels.
157    ///
158    /// This is used for consistent metric labeling without allocations.
159    #[must_use]
160    pub fn as_str(&self) -> &str {
161        match self {
162            Self::Insert => "insert",
163            Self::Update => "update",
164            Self::Delete => "delete",
165            Self::Replace => "replace",
166            Self::Invalidate => "invalidate",
167            Self::Drop => "drop",
168            Self::DropDatabase => "dropdatabase",
169            Self::Rename => "rename",
170            Self::Unknown(s) => s.as_str(),
171        }
172    }
173}
174
175/// `MongoDB` namespace (database + collection).
176///
177/// Identifies the specific collection where an operation occurred.
178#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
179pub struct Namespace {
180    /// Database name
181    pub database: String,
182
183    /// Collection name
184    pub collection: String,
185}
186
187impl Namespace {
188    /// Creates a new namespace from database and collection names.
189    pub fn new(database: impl Into<String>, collection: impl Into<String>) -> Self {
190        Self {
191            database: database.into(),
192            collection: collection.into(),
193        }
194    }
195
196    /// Returns the fully qualified namespace as "database.collection".
197    #[must_use]
198    pub fn full_name(&self) -> String {
199        format!("{}.{}", self.database, self.collection)
200    }
201}
202
203/// Update description for partial document updates.
204///
205/// When a document is updated (not replaced), this describes what changed.
206#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
207pub struct UpdateDescription {
208    /// Fields that were added or modified
209    #[serde(rename = "updatedFields")]
210    pub updated_fields: Document,
211
212    /// Fields that were removed from the document
213    #[serde(rename = "removedFields")]
214    pub removed_fields: Vec<String>,
215
216    /// Array modifications (if any)
217    #[serde(rename = "truncatedArrays", skip_serializing_if = "Option::is_none")]
218    pub truncated_arrays: Option<Vec<TruncatedArray>>,
219}
220
221/// Describes modifications to an array field.
222#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
223pub struct TruncatedArray {
224    /// Field path to the array
225    pub field: String,
226
227    /// New size of the array after truncation
228    #[serde(rename = "newSize")]
229    pub new_size: u32,
230}
231
232/// A `MongoDB` change stream event.
233///
234/// This is the primary type that flows through the Rigatoni pipeline.
235/// It represents a single change operation from `MongoDB` change streams.
236///
237/// # Memory Layout
238///
239/// The struct uses owned data for all fields to ensure safe transfer between
240/// async tasks and threads. Fields that may not be present use `Option<T>`.
241///
242/// Approximate memory size: 200-500 bytes depending on document sizes.
243///
244/// # Examples
245///
246/// ```rust
247/// use rigatoni_core::event::{ChangeEvent, OperationType};
248///
249/// fn process_event(event: &ChangeEvent) {
250///     match event.operation {
251///         OperationType::Insert => {
252///             println!("New document in {}", event.collection_name());
253///             if let Some(doc) = &event.full_document {
254///                 println!("Data: {:?}", doc);
255///             }
256///         }
257///         OperationType::Update => {
258///             println!("Document updated in {}", event.collection_name());
259///             if let Some(desc) = &event.update_description {
260///                 let keys: Vec<_> = desc.updated_fields.keys().collect();
261///                 println!("Changed fields: {:?}", keys);
262///             }
263///         }
264///         OperationType::Delete => {
265///             println!("Document deleted from {}", event.collection_name());
266///             println!("Key: {:?}", event.document_key);
267///         }
268///         _ => println!("Other operation: {:?}", event.operation),
269///     }
270/// }
271/// ```
272#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
273pub struct ChangeEvent {
274    /// Type of operation that occurred
275    #[serde(rename = "operationType")]
276    pub operation: OperationType,
277
278    /// Namespace (database + collection) where the operation occurred
279    #[serde(rename = "ns")]
280    pub namespace: Namespace,
281
282    /// Document key (_id and shard key if sharded)
283    ///
284    /// Present for all operations except invalidate.
285    /// For invalidate events, this will be None.
286    #[serde(rename = "documentKey", skip_serializing_if = "Option::is_none")]
287    pub document_key: Option<Document>,
288
289    /// Full document after the operation
290    ///
291    /// Present for: insert (always), replace (always), update (if configured),
292    /// delete (never, unless configured for pre-images).
293    #[serde(rename = "fullDocument", skip_serializing_if = "Option::is_none")]
294    pub full_document: Option<Document>,
295
296    /// Description of what changed in an update operation
297    ///
298    /// Present only for update operations.
299    #[serde(rename = "updateDescription", skip_serializing_if = "Option::is_none")]
300    pub update_description: Option<UpdateDescription>,
301
302    /// Timestamp of the operation in the oplog
303    #[serde(rename = "clusterTime")]
304    pub cluster_time: DateTime<Utc>,
305
306    /// Resume token for this event
307    ///
308    /// Can be used to resume the change stream from this point.
309    #[serde(rename = "_id")]
310    pub resume_token: Document,
311}
312
313impl ChangeEvent {
314    /// Returns true if this is an insert operation.
315    #[inline]
316    #[must_use]
317    pub fn is_insert(&self) -> bool {
318        self.operation == OperationType::Insert
319    }
320
321    /// Returns true if this is an update operation.
322    #[inline]
323    #[must_use]
324    pub fn is_update(&self) -> bool {
325        self.operation == OperationType::Update
326    }
327
328    /// Returns true if this is a delete operation.
329    #[inline]
330    #[must_use]
331    pub fn is_delete(&self) -> bool {
332        self.operation == OperationType::Delete
333    }
334
335    /// Returns true if this is a replace operation.
336    #[inline]
337    #[must_use]
338    pub fn is_replace(&self) -> bool {
339        self.operation == OperationType::Replace
340    }
341
342    /// Returns true if this is an invalidate operation.
343    #[inline]
344    #[must_use]
345    pub fn is_invalidate(&self) -> bool {
346        self.operation == OperationType::Invalidate
347    }
348
349    /// Returns the collection name.
350    #[inline]
351    #[must_use]
352    pub fn collection_name(&self) -> &str {
353        &self.namespace.collection
354    }
355
356    /// Returns the database name.
357    #[inline]
358    #[must_use]
359    pub fn database_name(&self) -> &str {
360        &self.namespace.database
361    }
362
363    /// Returns the fully qualified namespace as "database.collection".
364    #[inline]
365    #[must_use]
366    pub fn full_namespace(&self) -> String {
367        self.namespace.full_name()
368    }
369
370    /// Returns the document ID if present in the document key.
371    ///
372    /// Most `MongoDB` documents have an `_id` field in the document key.
373    /// Returns None if `document_key` is not present (e.g., invalidate events).
374    #[must_use]
375    pub fn document_id(&self) -> Option<&bson::Bson> {
376        self.document_key.as_ref()?.get("_id")
377    }
378
379    /// Returns true if this event has a full document.
380    ///
381    /// Useful for checking if the document data is available.
382    #[inline]
383    #[must_use]
384    pub const fn has_full_document(&self) -> bool {
385        self.full_document.is_some()
386    }
387
388    /// Returns true if this event has update description.
389    ///
390    /// Only present for update operations.
391    #[inline]
392    #[must_use]
393    pub const fn has_update_description(&self) -> bool {
394        self.update_description.is_some()
395    }
396
397    /// Returns the size estimate of this event in bytes.
398    ///
399    /// Useful for batching and memory management.
400    #[must_use]
401    pub fn estimated_size_bytes(&self) -> usize {
402        let mut size = std::mem::size_of::<Self>();
403
404        // Add document sizes (rough estimate)
405        if let Some(doc) = &self.full_document {
406            size += estimate_document_size(doc);
407        }
408
409        if let Some(update_desc) = &self.update_description {
410            size += estimate_document_size(&update_desc.updated_fields);
411            size += update_desc
412                .removed_fields
413                .iter()
414                .map(String::len)
415                .sum::<usize>();
416        }
417
418        if let Some(doc_key) = &self.document_key {
419            size += estimate_document_size(doc_key);
420        }
421        size += estimate_document_size(&self.resume_token);
422
423        size
424    }
425}
426
427/// Estimates the serialized size of a BSON document in bytes.
428fn estimate_document_size(doc: &Document) -> usize {
429    // Simple estimation: each key + value pair ~= 50 bytes average
430    // This is a rough heuristic; actual size varies widely
431    doc.len() * 50
432}
433
434/// Conversion from `MongoDB` driver's `ChangeStreamEvent`.
435///
436/// This enables seamless integration with the official `MongoDB` Rust driver.
437/// Returns an error if the resume token cannot be converted to a BSON document.
438impl TryFrom<mongodb::change_stream::event::ChangeStreamEvent<Document>> for ChangeEvent {
439    type Error = ConversionError;
440
441    fn try_from(
442        event: mongodb::change_stream::event::ChangeStreamEvent<Document>,
443    ) -> Result<Self, Self::Error> {
444        use mongodb::change_stream::event::OperationType as MongoOpType;
445
446        // Convert operation type
447        let operation = match event.operation_type {
448            MongoOpType::Insert => OperationType::Insert,
449            MongoOpType::Update => OperationType::Update,
450            MongoOpType::Delete => OperationType::Delete,
451            MongoOpType::Replace => OperationType::Replace,
452            MongoOpType::Invalidate => OperationType::Invalidate,
453            MongoOpType::Drop => OperationType::Drop,
454            MongoOpType::DropDatabase => OperationType::DropDatabase,
455            MongoOpType::Rename => OperationType::Rename,
456            _ => {
457                // For any unknown operation types, preserve the original type string
458                // This ensures forward compatibility with new MongoDB versions
459                let op_str = format!("{:?}", event.operation_type);
460                eprintln!(
461                    "Warning: Unknown MongoDB operation type encountered: {op_str}. \
462                     This may indicate a newer MongoDB version than supported."
463                );
464                OperationType::Unknown(op_str)
465            }
466        };
467
468        // Convert namespace
469        let namespace = event.ns.map_or_else(
470            || Namespace {
471                database: String::new(),
472                collection: String::new(),
473            },
474            |ns| Namespace {
475                database: ns.db,
476                collection: ns.coll.unwrap_or_default(),
477            },
478        );
479
480        // Convert update description
481        let update_description = event.update_description.map(|ud| UpdateDescription {
482            updated_fields: ud.updated_fields,
483            removed_fields: ud.removed_fields,
484            truncated_arrays: ud.truncated_arrays.map(|arrays| {
485                arrays
486                    .into_iter()
487                    .map(|ta| TruncatedArray {
488                        field: ta.field,
489                        new_size: u32::try_from(ta.new_size).unwrap_or(0),
490                    })
491                    .collect()
492            }),
493        });
494
495        // Convert cluster time to chrono DateTime, preserving increment as nanoseconds
496        // MongoDB Timestamp has both time (seconds) and increment (counter within that second)
497        // We map increment to nanoseconds to preserve ordering of events within the same second
498        let cluster_time = event
499            .cluster_time
500            .map_or_else(|| {
501                eprintln!("Warning: Missing cluster_time in ChangeStreamEvent, using current time");
502                Utc::now()
503            }, |ts| {
504                let seconds = i64::from(ts.time);
505                // Map increment to nanoseconds for sub-second precision
506                // This preserves event ordering within the same second
507                let nanos = ts.increment * 1_000_000; // Scale increment to nanosecond range
508                DateTime::from_timestamp(seconds, nanos)
509                    .unwrap_or_else(|| {
510                        // Log error in production - this should never happen with valid MongoDB data
511                        eprintln!(
512                            "Warning: Invalid MongoDB timestamp (time={}, increment={}), using current time",
513                            ts.time, ts.increment
514                        );
515                        Utc::now()
516                    })
517            });
518
519        // Convert resume token - this is critical for stream resumption
520        let resume_token = bson::to_document(&event.id).map_err(|e| {
521            ConversionError::ResumeTokenConversion(format!(
522                "Failed to serialize resume token to BSON document: {e}"
523            ))
524        })?;
525
526        Ok(Self {
527            operation,
528            namespace,
529            document_key: event.document_key,
530            full_document: event.full_document,
531            update_description,
532            cluster_time,
533            resume_token,
534        })
535    }
536}