rigatoni_core/event.rs
1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! `MongoDB` Change Stream Event Representation
18//!
19//! This module defines the core event types used throughout the Rigatoni data replication pipeline.
20//! Events represent `MongoDB` change stream operations and flow from sources to destinations.
21//!
22//! # Examples
23//!
24//! ```rust
25//! use rigatoni_core::event::{ChangeEvent, OperationType, Namespace};
26//! use bson::{doc, Document};
27//! use chrono::Utc;
28//!
29//! // Create an insert event manually
30//! let event = ChangeEvent {
31//! operation: OperationType::Insert,
32//! namespace: Namespace {
33//! database: "mydb".to_string(),
34//! collection: "users".to_string(),
35//! },
36//! document_key: Some(doc! { "_id": 123 }),
37//! full_document: Some(doc! {
38//! "_id": 123,
39//! "name": "Alice",
40//! "email": "alice@example.com"
41//! }),
42//! update_description: None,
43//! cluster_time: Utc::now(),
44//! resume_token: doc! { "_data": "token123" },
45//! };
46//!
47//! // Check operation type
48//! assert!(event.is_insert());
49//! assert_eq!(event.collection_name(), "users");
50//!
51//! // Access document data
52//! if let Some(doc) = &event.full_document {
53//! println!("Inserted: {:?}", doc);
54//! }
55//! ```
56
57use bson::Document;
58use chrono::{DateTime, Utc};
59use serde::{Deserialize, Serialize};
60use std::fmt;
61
/// Failure raised while turning the `MongoDB` driver's `ChangeStreamEvent` into a
/// [`ChangeEvent`].
#[derive(Debug, Clone)]
pub enum ConversionError {
    /// The event's resume token could not be serialized into a BSON document.
    ///
    /// The payload is a human-readable description of the underlying failure.
    ResumeTokenConversion(String),
}

impl fmt::Display for ConversionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ResumeTokenConversion(detail) => {
                f.write_str("Failed to convert resume token: ")?;
                f.write_str(detail)
            }
        }
    }
}

/// No underlying error is wrapped, so the default `source()` (always `None`) applies.
impl std::error::Error for ConversionError {}
80
81/// `MongoDB` change stream operation types.
82///
83/// Represents all possible operations that can occur in a `MongoDB` change stream.
84/// Each variant corresponds to a specific database operation.
85///
86/// The `Unknown` variant allows forward compatibility with future `MongoDB` versions
87/// that may introduce new operation types.
88#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
89#[serde(rename_all = "lowercase")]
90#[non_exhaustive]
91pub enum OperationType {
92 /// A document was inserted into a collection
93 Insert,
94
95 /// A document was updated (modified in place)
96 Update,
97
98 /// A document was deleted from a collection
99 Delete,
100
101 /// A document was replaced entirely (all fields changed)
102 Replace,
103
104 /// The change stream was invalidated (collection dropped, renamed, etc.)
105 Invalidate,
106
107 /// A collection was dropped
108 Drop,
109
110 /// A database was dropped
111 #[serde(rename = "dropdatabase")]
112 DropDatabase,
113
114 /// A collection was renamed
115 Rename,
116
117 /// An unknown operation type from a newer `MongoDB` version
118 ///
119 /// Contains the original operation type string for logging and debugging.
120 #[serde(untagged)]
121 Unknown(String),
122}
123
124impl OperationType {
125 /// Returns true if this operation modifies data (insert, update, replace).
126 #[inline]
127 #[must_use]
128 pub const fn is_data_modification(&self) -> bool {
129 matches!(self, Self::Insert | Self::Update | Self::Replace)
130 }
131
132 /// Returns true if this operation removes data (delete, drop, drop database).
133 #[inline]
134 #[must_use]
135 pub const fn is_data_removal(&self) -> bool {
136 matches!(self, Self::Delete | Self::Drop | Self::DropDatabase)
137 }
138
139 /// Returns true if this operation is a DDL operation (drop, rename, drop database).
140 #[inline]
141 #[must_use]
142 pub const fn is_ddl(&self) -> bool {
143 matches!(self, Self::Drop | Self::DropDatabase | Self::Rename)
144 }
145
146 /// Returns true if this is an unknown operation type.
147 ///
148 /// Unknown operation types may appear when using a newer `MongoDB` version
149 /// than this library was designed for.
150 #[inline]
151 #[must_use]
152 pub const fn is_unknown(&self) -> bool {
153 matches!(self, Self::Unknown(_))
154 }
155
156 /// Returns the operation type as a static string for metrics labels.
157 ///
158 /// This is used for consistent metric labeling without allocations.
159 #[must_use]
160 pub fn as_str(&self) -> &str {
161 match self {
162 Self::Insert => "insert",
163 Self::Update => "update",
164 Self::Delete => "delete",
165 Self::Replace => "replace",
166 Self::Invalidate => "invalidate",
167 Self::Drop => "drop",
168 Self::DropDatabase => "dropdatabase",
169 Self::Rename => "rename",
170 Self::Unknown(s) => s.as_str(),
171 }
172 }
173}
174
175/// `MongoDB` namespace (database + collection).
176///
177/// Identifies the specific collection where an operation occurred.
178#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
179pub struct Namespace {
180 /// Database name
181 pub database: String,
182
183 /// Collection name
184 pub collection: String,
185}
186
187impl Namespace {
188 /// Creates a new namespace from database and collection names.
189 pub fn new(database: impl Into<String>, collection: impl Into<String>) -> Self {
190 Self {
191 database: database.into(),
192 collection: collection.into(),
193 }
194 }
195
196 /// Returns the fully qualified namespace as "database.collection".
197 #[must_use]
198 pub fn full_name(&self) -> String {
199 format!("{}.{}", self.database, self.collection)
200 }
201}
202
203/// Update description for partial document updates.
204///
205/// When a document is updated (not replaced), this describes what changed.
206#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
207pub struct UpdateDescription {
208 /// Fields that were added or modified
209 #[serde(rename = "updatedFields")]
210 pub updated_fields: Document,
211
212 /// Fields that were removed from the document
213 #[serde(rename = "removedFields")]
214 pub removed_fields: Vec<String>,
215
216 /// Array modifications (if any)
217 #[serde(rename = "truncatedArrays", skip_serializing_if = "Option::is_none")]
218 pub truncated_arrays: Option<Vec<TruncatedArray>>,
219}
220
221/// Describes modifications to an array field.
222#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
223pub struct TruncatedArray {
224 /// Field path to the array
225 pub field: String,
226
227 /// New size of the array after truncation
228 #[serde(rename = "newSize")]
229 pub new_size: u32,
230}
231
232/// A `MongoDB` change stream event.
233///
234/// This is the primary type that flows through the Rigatoni pipeline.
235/// It represents a single change operation from `MongoDB` change streams.
236///
237/// # Memory Layout
238///
239/// The struct uses owned data for all fields to ensure safe transfer between
240/// async tasks and threads. Fields that may not be present use `Option<T>`.
241///
242/// Approximate memory size: 200-500 bytes depending on document sizes.
243///
244/// # Examples
245///
246/// ```rust
247/// use rigatoni_core::event::{ChangeEvent, OperationType};
248///
249/// fn process_event(event: &ChangeEvent) {
250/// match event.operation {
251/// OperationType::Insert => {
252/// println!("New document in {}", event.collection_name());
253/// if let Some(doc) = &event.full_document {
254/// println!("Data: {:?}", doc);
255/// }
256/// }
257/// OperationType::Update => {
258/// println!("Document updated in {}", event.collection_name());
259/// if let Some(desc) = &event.update_description {
260/// let keys: Vec<_> = desc.updated_fields.keys().collect();
261/// println!("Changed fields: {:?}", keys);
262/// }
263/// }
264/// OperationType::Delete => {
265/// println!("Document deleted from {}", event.collection_name());
266/// println!("Key: {:?}", event.document_key);
267/// }
268/// _ => println!("Other operation: {:?}", event.operation),
269/// }
270/// }
271/// ```
272#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
273pub struct ChangeEvent {
274 /// Type of operation that occurred
275 #[serde(rename = "operationType")]
276 pub operation: OperationType,
277
278 /// Namespace (database + collection) where the operation occurred
279 #[serde(rename = "ns")]
280 pub namespace: Namespace,
281
282 /// Document key (_id and shard key if sharded)
283 ///
284 /// Present for all operations except invalidate.
285 /// For invalidate events, this will be None.
286 #[serde(rename = "documentKey", skip_serializing_if = "Option::is_none")]
287 pub document_key: Option<Document>,
288
289 /// Full document after the operation
290 ///
291 /// Present for: insert (always), replace (always), update (if configured),
292 /// delete (never, unless configured for pre-images).
293 #[serde(rename = "fullDocument", skip_serializing_if = "Option::is_none")]
294 pub full_document: Option<Document>,
295
296 /// Description of what changed in an update operation
297 ///
298 /// Present only for update operations.
299 #[serde(rename = "updateDescription", skip_serializing_if = "Option::is_none")]
300 pub update_description: Option<UpdateDescription>,
301
302 /// Timestamp of the operation in the oplog
303 #[serde(rename = "clusterTime")]
304 pub cluster_time: DateTime<Utc>,
305
306 /// Resume token for this event
307 ///
308 /// Can be used to resume the change stream from this point.
309 #[serde(rename = "_id")]
310 pub resume_token: Document,
311}
312
313impl ChangeEvent {
314 /// Returns true if this is an insert operation.
315 #[inline]
316 #[must_use]
317 pub fn is_insert(&self) -> bool {
318 self.operation == OperationType::Insert
319 }
320
321 /// Returns true if this is an update operation.
322 #[inline]
323 #[must_use]
324 pub fn is_update(&self) -> bool {
325 self.operation == OperationType::Update
326 }
327
328 /// Returns true if this is a delete operation.
329 #[inline]
330 #[must_use]
331 pub fn is_delete(&self) -> bool {
332 self.operation == OperationType::Delete
333 }
334
335 /// Returns true if this is a replace operation.
336 #[inline]
337 #[must_use]
338 pub fn is_replace(&self) -> bool {
339 self.operation == OperationType::Replace
340 }
341
342 /// Returns true if this is an invalidate operation.
343 #[inline]
344 #[must_use]
345 pub fn is_invalidate(&self) -> bool {
346 self.operation == OperationType::Invalidate
347 }
348
349 /// Returns the collection name.
350 #[inline]
351 #[must_use]
352 pub fn collection_name(&self) -> &str {
353 &self.namespace.collection
354 }
355
356 /// Returns the database name.
357 #[inline]
358 #[must_use]
359 pub fn database_name(&self) -> &str {
360 &self.namespace.database
361 }
362
363 /// Returns the fully qualified namespace as "database.collection".
364 #[inline]
365 #[must_use]
366 pub fn full_namespace(&self) -> String {
367 self.namespace.full_name()
368 }
369
370 /// Returns the document ID if present in the document key.
371 ///
372 /// Most `MongoDB` documents have an `_id` field in the document key.
373 /// Returns None if `document_key` is not present (e.g., invalidate events).
374 #[must_use]
375 pub fn document_id(&self) -> Option<&bson::Bson> {
376 self.document_key.as_ref()?.get("_id")
377 }
378
379 /// Returns true if this event has a full document.
380 ///
381 /// Useful for checking if the document data is available.
382 #[inline]
383 #[must_use]
384 pub const fn has_full_document(&self) -> bool {
385 self.full_document.is_some()
386 }
387
388 /// Returns true if this event has update description.
389 ///
390 /// Only present for update operations.
391 #[inline]
392 #[must_use]
393 pub const fn has_update_description(&self) -> bool {
394 self.update_description.is_some()
395 }
396
397 /// Returns the size estimate of this event in bytes.
398 ///
399 /// Useful for batching and memory management.
400 #[must_use]
401 pub fn estimated_size_bytes(&self) -> usize {
402 let mut size = std::mem::size_of::<Self>();
403
404 // Add document sizes (rough estimate)
405 if let Some(doc) = &self.full_document {
406 size += estimate_document_size(doc);
407 }
408
409 if let Some(update_desc) = &self.update_description {
410 size += estimate_document_size(&update_desc.updated_fields);
411 size += update_desc
412 .removed_fields
413 .iter()
414 .map(String::len)
415 .sum::<usize>();
416 }
417
418 if let Some(doc_key) = &self.document_key {
419 size += estimate_document_size(doc_key);
420 }
421 size += estimate_document_size(&self.resume_token);
422
423 size
424 }
425}
426
427/// Estimates the serialized size of a BSON document in bytes.
428fn estimate_document_size(doc: &Document) -> usize {
429 // Simple estimation: each key + value pair ~= 50 bytes average
430 // This is a rough heuristic; actual size varies widely
431 doc.len() * 50
432}
433
434/// Conversion from `MongoDB` driver's `ChangeStreamEvent`.
435///
436/// This enables seamless integration with the official `MongoDB` Rust driver.
437/// Returns an error if the resume token cannot be converted to a BSON document.
438impl TryFrom<mongodb::change_stream::event::ChangeStreamEvent<Document>> for ChangeEvent {
439 type Error = ConversionError;
440
441 fn try_from(
442 event: mongodb::change_stream::event::ChangeStreamEvent<Document>,
443 ) -> Result<Self, Self::Error> {
444 use mongodb::change_stream::event::OperationType as MongoOpType;
445
446 // Convert operation type
447 let operation = match event.operation_type {
448 MongoOpType::Insert => OperationType::Insert,
449 MongoOpType::Update => OperationType::Update,
450 MongoOpType::Delete => OperationType::Delete,
451 MongoOpType::Replace => OperationType::Replace,
452 MongoOpType::Invalidate => OperationType::Invalidate,
453 MongoOpType::Drop => OperationType::Drop,
454 MongoOpType::DropDatabase => OperationType::DropDatabase,
455 MongoOpType::Rename => OperationType::Rename,
456 _ => {
457 // For any unknown operation types, preserve the original type string
458 // This ensures forward compatibility with new MongoDB versions
459 let op_str = format!("{:?}", event.operation_type);
460 eprintln!(
461 "Warning: Unknown MongoDB operation type encountered: {op_str}. \
462 This may indicate a newer MongoDB version than supported."
463 );
464 OperationType::Unknown(op_str)
465 }
466 };
467
468 // Convert namespace
469 let namespace = event.ns.map_or_else(
470 || Namespace {
471 database: String::new(),
472 collection: String::new(),
473 },
474 |ns| Namespace {
475 database: ns.db,
476 collection: ns.coll.unwrap_or_default(),
477 },
478 );
479
480 // Convert update description
481 let update_description = event.update_description.map(|ud| UpdateDescription {
482 updated_fields: ud.updated_fields,
483 removed_fields: ud.removed_fields,
484 truncated_arrays: ud.truncated_arrays.map(|arrays| {
485 arrays
486 .into_iter()
487 .map(|ta| TruncatedArray {
488 field: ta.field,
489 new_size: u32::try_from(ta.new_size).unwrap_or(0),
490 })
491 .collect()
492 }),
493 });
494
495 // Convert cluster time to chrono DateTime, preserving increment as nanoseconds
496 // MongoDB Timestamp has both time (seconds) and increment (counter within that second)
497 // We map increment to nanoseconds to preserve ordering of events within the same second
498 let cluster_time = event
499 .cluster_time
500 .map_or_else(|| {
501 eprintln!("Warning: Missing cluster_time in ChangeStreamEvent, using current time");
502 Utc::now()
503 }, |ts| {
504 let seconds = i64::from(ts.time);
505 // Map increment to nanoseconds for sub-second precision
506 // This preserves event ordering within the same second
507 let nanos = ts.increment * 1_000_000; // Scale increment to nanosecond range
508 DateTime::from_timestamp(seconds, nanos)
509 .unwrap_or_else(|| {
510 // Log error in production - this should never happen with valid MongoDB data
511 eprintln!(
512 "Warning: Invalid MongoDB timestamp (time={}, increment={}), using current time",
513 ts.time, ts.increment
514 );
515 Utc::now()
516 })
517 });
518
519 // Convert resume token - this is critical for stream resumption
520 let resume_token = bson::to_document(&event.id).map_err(|e| {
521 ConversionError::ResumeTokenConversion(format!(
522 "Failed to serialize resume token to BSON document: {e}"
523 ))
524 })?;
525
526 Ok(Self {
527 operation,
528 namespace,
529 document_key: event.document_key,
530 full_document: event.full_document,
531 update_description,
532 cluster_time,
533 resume_token,
534 })
535 }
536}