db_library/database/
mongodb.rs

1//! MongoDB Document Listener
2//!
3//! This module provides `MongoDocumentListener`, which listens for changes in specified MongoDB collections.
4//! It utilizes MongoDB's Change Streams to capture and relay events asynchronously.
5//!
6//! # Features
7//! - Monitors specified collections for `INSERT`, `UPDATE`, and `DELETE` operations.
8//! - Uses MongoDB's Change Streams for real-time event detection.
9//! - Filters events based on operation types (`INSERT`, `UPDATE`, `DELETE`).
10//! - Sends notifications through async channels.
11//! - Provides a structured API to start and stop listeners.
12//!
13//! # Dependencies
14//! - `mongodb` for database interaction
15//! - `tokio` for async execution
16//! - `serde` and `serde_json` for JSON serialization
17//! - `tracing` for logging
18
19use std::{collections::HashMap, sync::Arc};
20
21use async_trait::async_trait;
22use mongodb::{
23    bson::Document,
24    change_stream::{
25        event::{ChangeStreamEvent, OperationType},
26        ChangeStream,
27    },
28    options::{ClientOptions, FullDocumentBeforeChangeType, FullDocumentType},
29    Client,
30};
31use once_cell::sync::Lazy;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use tokio::{
35    sync::{
36        mpsc::{self, Receiver, Sender},
37        Mutex, RwLock,
38    },
39    task::JoinHandle,
40};
41use tracing::{error, info};
42
43use crate::config::DBListenerError;
44
45use super::{DBListenerTrait, EventType};
46
47static MONGODB_CLIENT_REGISTERY: Lazy<RwLock<HashMap<String, Arc<Client>>>> =
48    Lazy::new(|| RwLock::new(HashMap::new()));
49
50async fn get_or_create_client(db_url: &str) -> Result<Arc<Client>, DBListenerError> {
51    let mut clients = MONGODB_CLIENT_REGISTERY.write().await;
52
53    if let Some(client) = clients.get(db_url) {
54        Ok(Arc::clone(client))
55    } else {
56        let client_options = ClientOptions::parse(db_url).await;
57
58        if let Err(err) = client_options {
59            error!("Failed to connect to the client: {:?}", err);
60            return Err(DBListenerError::CreationError(format!(
61                "Failed to connect to the client url : {:#?}",
62                err
63            )));
64        }
65
66        let mut client_options = client_options.unwrap();
67
68        client_options.max_pool_size = Some(10);
69
70        let new_client = Client::with_options(client_options);
71
72        if let Err(err) = new_client {
73            error!("Failed to create client : {:#?}", err);
74            return Err(DBListenerError::CreationError(format!(
75                "Failed to create the client : {:#?}",
76                err
77            )));
78        }
79
80        let new_client = Arc::new(new_client.unwrap());
81
82        clients.insert(db_url.to_string(), Arc::clone(&new_client));
83
84        Ok(new_client)
85    }
86}
87
/// Change-stream listener bound to one collection of one MongoDB database.
///
/// Cloning the listener clones the `Sender` and the `Arc`s, so all clones share the same
/// client and the same underlying channel.
#[derive(Debug, Clone)]
pub struct MongoDocumentListener {
    /// Client used for every database operation performed by this listener.
    pub client: Arc<Client>,
    /// Name of the database that contains the watched collection.
    pub database: String,
    /// Name of the collection whose change stream is watched.
    pub collection: String,
    /// Sending half of the notification channel; the spawned listener task sends through a
    /// clone of it.
    pub sender: Sender<Value>,
    /// Receiving half of the notification channel, wrapped so it can be shared.
    pub receiver: Arc<Mutex<Receiver<Value>>>,
    /// Event types that should be forwarded; events of any other type are dropped.
    pub events: Vec<EventType>,
}
97
98impl MongoDocumentListener {
99    pub async fn new(
100        url: &str,
101        database: &str,
102        collection: &str,
103        events: Vec<EventType>,
104    ) -> Result<Self, DBListenerError> {
105        let client = get_or_create_client(url).await?;
106
107        let (sender, receiver) = mpsc::channel::<Value>(100); // Channel with buffer size 100
108
109        let mongo_document_listener = Self {
110            client: Arc::clone(&client),
111            collection: collection.to_string(),
112            database: database.to_string(),
113            events,
114            receiver: Arc::new(Mutex::new(receiver)),
115            sender,
116        };
117
118        mongo_document_listener.verify_members().await?;
119
120        Ok(mongo_document_listener)
121    }
122
123    pub async fn verify_members(&self) -> Result<(), DBListenerError> {
124        let db = self.client.database(&self.database);
125        let collection_names = db.list_collection_names().await.map_err(|err| {
126            error!("Failed to list collections: {:?}", err);
127            DBListenerError::ListenerVerifyError(format!("Error listing collections: {:#?}", err))
128        })?;
129
130        if !collection_names.contains(&self.collection) {
131            return Err(DBListenerError::ListenerVerifyError(format!(
132                "Collection `{}` does not exist in database `{}`",
133                self.collection, self.database
134            )));
135        }
136
137        Ok(())
138    }
139
140    async fn initialize_change_stream(
141        &self,
142    ) -> Result<ChangeStream<ChangeStreamEvent<Document>>, DBListenerError> {
143        let db = self.client.database(&self.database);
144        let collection = db.collection::<Document>(&self.collection);
145
146        collection
147            .watch()
148            .full_document_before_change(FullDocumentBeforeChangeType::WhenAvailable)
149            .full_document(FullDocumentType::UpdateLookup)
150            .await
151            .map_err(|e| {
152                error!("Failed to start MongoDB Change Stream: {:?}", e);
153                DBListenerError::ListenerError(e.to_string())
154            })
155    }
156
157    fn spawn_listener_task(
158        &self,
159        mut change_stream: ChangeStream<ChangeStreamEvent<Document>>,
160    ) -> JoinHandle<()> {
161        let sender_clone = self.sender.clone();
162        let allowed_events = self.events.clone();
163        let collection_name = self.collection.clone();
164
165        // Initialize base notification structure
166        let mongo_notify = MongoNotify {
167            collection: self.collection.clone(),
168            database: self.database.clone(),
169            old_document: None,
170            new_document: None,
171            timestamp: String::new(),
172            operation: None,
173        };
174
175        tokio::spawn(async move {
176            let allowed_events = Arc::new(allowed_events);
177
178            info!(
179                "MongoDB Change Stream listener started for `{}` collection",
180                collection_name
181            );
182
183            while change_stream.is_alive() {
184                let notification = mongo_notify.clone();
185
186                match change_stream.next_if_any().await {
187                    Ok(Some(change)) => {
188                        println!("change stream : {:#?}", change);
189                        if let Some(processed_notification) =
190                            process_change_event(change, Arc::clone(&allowed_events), notification)
191                        {
192                            // Send the processed notification
193                            if let Ok(json_data) = serde_json::to_value(processed_notification) {
194                                if let Err(e) = sender_clone.send(json_data).await {
195                                    error!("Failed to send notification: {:?}", e);
196                                }
197                            } else {
198                                error!("Failed to serialize the mongo notification");
199                            }
200                        }
201                    }
202                    Ok(None) => continue,
203                    Err(e) => {
204                        error!("Failed to get event: {:?}", e);
205                        continue;
206                    }
207                }
208            }
209        })
210    }
211}
212
213fn process_change_event(
214    change: ChangeStreamEvent<Document>,
215    allowed_events: Arc<Vec<EventType>>,
216    mut notification: MongoNotify,
217) -> Option<MongoNotify> {
218    let operation_type = change.operation_type;
219
220    // Check if the operation is allowed
221    let (is_allowed, op_str) = match operation_type {
222        OperationType::Insert => (allowed_events.contains(&EventType::INSERT), "INSERT"),
223        OperationType::Delete => (allowed_events.contains(&EventType::DELETE), "DELETE"),
224        OperationType::Update => (allowed_events.contains(&EventType::UPDATE), "UPDATE"),
225        _ => (false, ""),
226    };
227
228    if !is_allowed {
229        return None;
230    }
231
232    notification.operation = Some(op_str.to_string());
233    notification.timestamp = chrono::Utc::now().to_rfc3339();
234
235    // Handle document changes based on operation type
236
237    if let Some(old_document) = change.full_document_before_change {
238        if let Ok(json_data) = serde_json::to_value(old_document) {
239            notification.old_document = Some(json_data);
240        } else {
241            error!("Failed to serialize old document");
242            return None;
243        }
244    }
245
246    if let Some(new_document) = change.full_document {
247        if let Ok(json_data) = serde_json::to_value(new_document) {
248            notification.new_document = Some(json_data);
249        } else {
250            error!("Failed to serialize new document");
251            return None;
252        }
253    }
254
255    Some(notification)
256}
257
258#[async_trait]
259impl DBListenerTrait for MongoDocumentListener {
260    async fn start(
261        &self,
262    ) -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError> {
263        let change_stream = self.initialize_change_stream().await?;
264
265        let handle = self.spawn_listener_task(change_stream);
266
267        Ok((Arc::clone(&self.receiver), handle))
268    }
269
270    async fn stop(&self) -> Result<(), DBListenerError> {
271        info!("Stopping MongoDB Change Stream listener.");
272        Ok(())
273    }
274}
275
/// Notification payload describing one change-stream event.
///
/// The listener task serializes this struct to JSON before sending it on the channel.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct MongoNotify {
    /// Operation label (`"INSERT"`, `"UPDATE"` or `"DELETE"`); `None` until an event has
    /// been processed into this notification.
    pub operation: Option<String>,
    /// Name of the database the listener is bound to.
    pub database: String,
    /// Name of the collection the listener is bound to.
    pub collection: String,
    /// JSON form of the event's full document, when the event carried one.
    pub new_document: Option<Value>,
    /// JSON form of the event's pre-change document, when the event carried one.
    pub old_document: Option<Value>,
    /// RFC 3339 UTC time at which the event was processed (not the server's event time);
    /// empty until then.
    pub timestamp: String,
}
285
/// Integration tests for the MongoDB listener.
///
/// Every test reads the connection string from the `MONGO_DATABASE_URL` environment variable
/// (optionally loaded from a `.env` file) and talks to that deployment, which is expected to
/// contain a `users` collection in the `SathishLoginPage` database.
#[cfg(test)]
mod tests {
    use std::{env, sync::Arc};
    use tokio::time::{sleep, timeout, Duration};

    use dotenv::dotenv;
    use mongodb::{
        bson::{doc, Document},
        options::ClientOptions,
        Client,
    };

    use crate::{
        database::{
            mongodb::{get_or_create_client, MongoDocumentListener},
            DBListenerTrait,
        },
        EventType,
    };

    /// A listener can be created for an existing database/collection pair.
    #[tokio::test]
    async fn create_new_listener_with_props() {
        dotenv().ok();

        // Get the database URL from the environment variables
        let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");

        let database = "SathishLoginPage".to_string();
        let collection = "users".to_string();

        let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];

        let result =
            MongoDocumentListener::new(&database_url, &database, &collection, events).await;

        // Surface the underlying error in the failure message itself.
        assert!(
            result.is_ok(),
            "Listener failed to connect: {:#?}",
            result.as_ref().err()
        );

        sleep(Duration::from_secs(1)).await;
    }

    /// Creating a listener for a database that does not hold the collection must fail.
    #[tokio::test]
    async fn create_new_listener_with_invalid_props() {
        dotenv().ok();

        // Get the database URL from the environment variables
        let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");

        let database = "InvalidDb".to_string();
        let collection = "users".to_string();

        let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];

        let result =
            MongoDocumentListener::new(&database_url, &database, &collection, events).await;

        assert!(
            result.is_err(),
            "Listener creation should fail for a database without the collection"
        );

        sleep(Duration::from_secs(1)).await;
    }

    /// Two lookups with the same URL must hand back the very same cached client.
    #[tokio::test]
    async fn get_same_client_for_same_url() {
        dotenv().ok();

        // Get the database URL from the environment variables
        let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");

        let client1 = get_or_create_client(&database_url).await.unwrap();

        let client2 = get_or_create_client(&database_url).await.unwrap();

        assert!(
            Arc::ptr_eq(&client1, &client2),
            "Expected the same client instance, but got different ones"
        );

        sleep(Duration::from_secs(1)).await;
    }

    /// End-to-end check: insert, update and delete each produce one notification.
    #[tokio::test]
    #[ignore = "this should be tested alone.. as it requires client connection for the same db url"]
    async fn mongodb_document_listener() {
        dotenv().ok();

        // Get the database URL from the environment variables
        let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");

        let database = "SathishLoginPage".to_string();
        let collection = "users".to_string();

        let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];

        let mongo_document_listener =
            MongoDocumentListener::new(&database_url, &database, &collection, events).await;

        assert!(
            mongo_document_listener.is_ok(),
            "Failed to initialize mongodb listener"
        );

        let mongo_document_listener = mongo_document_listener.unwrap();

        let (rx, handle) = mongo_document_listener.start().await.unwrap();

        let notification_task = tokio::spawn(async move {
            let mut received_events = Vec::new();
            while let Some(payload) = rx.lock().await.recv().await {
                println!("Notification received: {:#?}", payload);
                received_events.push(payload);
                if received_events.len() >= 3 {
                    break; // Stop after receiving all expected events
                }
            }
            received_events
        });

        // Separate client so the writes do not go through the listener's own handle.
        let client_options = ClientOptions::parse(&database_url).await.unwrap();
        let client = Client::with_options(client_options).unwrap();
        let db = client.database(&database);
        let coll = db.collection::<Document>(&collection);

        coll.insert_one(doc! { "user_id": 1, "name": "test_user", "age": 30 })
            .await
            .unwrap();

        sleep(Duration::from_millis(100)).await;

        coll.update_one(doc! { "user_id": 1 }, doc! { "$set": { "age": 35 } })
            .await
            .unwrap();

        sleep(Duration::from_millis(100)).await;

        coll.delete_one(doc! { "user_id": 1 }).await.unwrap();

        // Bound the wait: if an event never arrives the test fails instead of hanging.
        let received_events = timeout(Duration::from_secs(10), notification_task)
            .await
            .expect("Timed out waiting for change stream notifications")
            .unwrap();

        assert_eq!(
            received_events.len(),
            3,
            "Expected 3 events but received {}",
            received_events.len()
        );

        mongo_document_listener.stop().await.unwrap();
        handle.abort();
    }
}