1use std::{collections::HashMap, sync::Arc};
20
21use async_trait::async_trait;
22use mongodb::{
23 bson::Document,
24 change_stream::{
25 event::{ChangeStreamEvent, OperationType},
26 ChangeStream,
27 },
28 options::{ClientOptions, FullDocumentBeforeChangeType, FullDocumentType},
29 Client,
30};
31use once_cell::sync::Lazy;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use tokio::{
35 sync::{
36 mpsc::{self, Receiver, Sender},
37 Mutex, RwLock,
38 },
39 task::JoinHandle,
40};
41use tracing::{error, info};
42
43use crate::config::DBListenerError;
44
45use super::{DBListenerTrait, EventType};
46
47static MONGODB_CLIENT_REGISTERY: Lazy<RwLock<HashMap<String, Arc<Client>>>> =
48 Lazy::new(|| RwLock::new(HashMap::new()));
49
50async fn get_or_create_client(db_url: &str) -> Result<Arc<Client>, DBListenerError> {
51 let mut clients = MONGODB_CLIENT_REGISTERY.write().await;
52
53 if let Some(client) = clients.get(db_url) {
54 Ok(Arc::clone(client))
55 } else {
56 let client_options = ClientOptions::parse(db_url).await;
57
58 if let Err(err) = client_options {
59 error!("Failed to connect to the client: {:?}", err);
60 return Err(DBListenerError::CreationError(format!(
61 "Failed to connect to the client url : {:#?}",
62 err
63 )));
64 }
65
66 let mut client_options = client_options.unwrap();
67
68 client_options.max_pool_size = Some(10);
69
70 let new_client = Client::with_options(client_options);
71
72 if let Err(err) = new_client {
73 error!("Failed to create client : {:#?}", err);
74 return Err(DBListenerError::CreationError(format!(
75 "Failed to create the client : {:#?}",
76 err
77 )));
78 }
79
80 let new_client = Arc::new(new_client.unwrap());
81
82 clients.insert(db_url.to_string(), Arc::clone(&new_client));
83
84 Ok(new_client)
85 }
86}
87
88#[derive(Debug, Clone)]
89pub struct MongoDocumentListener {
90 pub client: Arc<Client>,
91 pub database: String,
92 pub collection: String,
93 pub sender: Sender<Value>,
94 pub receiver: Arc<Mutex<Receiver<Value>>>,
95 pub events: Vec<EventType>,
96}
97
98impl MongoDocumentListener {
99 pub async fn new(
100 url: &str,
101 database: &str,
102 collection: &str,
103 events: Vec<EventType>,
104 ) -> Result<Self, DBListenerError> {
105 let client = get_or_create_client(url).await?;
106
107 let (sender, receiver) = mpsc::channel::<Value>(100); let mongo_document_listener = Self {
110 client: Arc::clone(&client),
111 collection: collection.to_string(),
112 database: database.to_string(),
113 events,
114 receiver: Arc::new(Mutex::new(receiver)),
115 sender,
116 };
117
118 mongo_document_listener.verify_members().await?;
119
120 Ok(mongo_document_listener)
121 }
122
123 pub async fn verify_members(&self) -> Result<(), DBListenerError> {
124 let db = self.client.database(&self.database);
125 let collection_names = db.list_collection_names().await.map_err(|err| {
126 error!("Failed to list collections: {:?}", err);
127 DBListenerError::ListenerVerifyError(format!("Error listing collections: {:#?}", err))
128 })?;
129
130 if !collection_names.contains(&self.collection) {
131 return Err(DBListenerError::ListenerVerifyError(format!(
132 "Collection `{}` does not exist in database `{}`",
133 self.collection, self.database
134 )));
135 }
136
137 Ok(())
138 }
139
140 async fn initialize_change_stream(
141 &self,
142 ) -> Result<ChangeStream<ChangeStreamEvent<Document>>, DBListenerError> {
143 let db = self.client.database(&self.database);
144 let collection = db.collection::<Document>(&self.collection);
145
146 collection
147 .watch()
148 .full_document_before_change(FullDocumentBeforeChangeType::WhenAvailable)
149 .full_document(FullDocumentType::UpdateLookup)
150 .await
151 .map_err(|e| {
152 error!("Failed to start MongoDB Change Stream: {:?}", e);
153 DBListenerError::ListenerError(e.to_string())
154 })
155 }
156
157 fn spawn_listener_task(
158 &self,
159 mut change_stream: ChangeStream<ChangeStreamEvent<Document>>,
160 ) -> JoinHandle<()> {
161 let sender_clone = self.sender.clone();
162 let allowed_events = self.events.clone();
163 let collection_name = self.collection.clone();
164
165 let mongo_notify = MongoNotify {
167 collection: self.collection.clone(),
168 database: self.database.clone(),
169 old_document: None,
170 new_document: None,
171 timestamp: String::new(),
172 operation: None,
173 };
174
175 tokio::spawn(async move {
176 let allowed_events = Arc::new(allowed_events);
177
178 info!(
179 "MongoDB Change Stream listener started for `{}` collection",
180 collection_name
181 );
182
183 while change_stream.is_alive() {
184 let notification = mongo_notify.clone();
185
186 match change_stream.next_if_any().await {
187 Ok(Some(change)) => {
188 println!("change stream : {:#?}", change);
189 if let Some(processed_notification) =
190 process_change_event(change, Arc::clone(&allowed_events), notification)
191 {
192 if let Ok(json_data) = serde_json::to_value(processed_notification) {
194 if let Err(e) = sender_clone.send(json_data).await {
195 error!("Failed to send notification: {:?}", e);
196 }
197 } else {
198 error!("Failed to serialize the mongo notification");
199 }
200 }
201 }
202 Ok(None) => continue,
203 Err(e) => {
204 error!("Failed to get event: {:?}", e);
205 continue;
206 }
207 }
208 }
209 })
210 }
211}
212
213fn process_change_event(
214 change: ChangeStreamEvent<Document>,
215 allowed_events: Arc<Vec<EventType>>,
216 mut notification: MongoNotify,
217) -> Option<MongoNotify> {
218 let operation_type = change.operation_type;
219
220 let (is_allowed, op_str) = match operation_type {
222 OperationType::Insert => (allowed_events.contains(&EventType::INSERT), "INSERT"),
223 OperationType::Delete => (allowed_events.contains(&EventType::DELETE), "DELETE"),
224 OperationType::Update => (allowed_events.contains(&EventType::UPDATE), "UPDATE"),
225 _ => (false, ""),
226 };
227
228 if !is_allowed {
229 return None;
230 }
231
232 notification.operation = Some(op_str.to_string());
233 notification.timestamp = chrono::Utc::now().to_rfc3339();
234
235 if let Some(old_document) = change.full_document_before_change {
238 if let Ok(json_data) = serde_json::to_value(old_document) {
239 notification.old_document = Some(json_data);
240 } else {
241 error!("Failed to serialize old document");
242 return None;
243 }
244 }
245
246 if let Some(new_document) = change.full_document {
247 if let Ok(json_data) = serde_json::to_value(new_document) {
248 notification.new_document = Some(json_data);
249 } else {
250 error!("Failed to serialize new document");
251 return None;
252 }
253 }
254
255 Some(notification)
256}
257
258#[async_trait]
259impl DBListenerTrait for MongoDocumentListener {
260 async fn start(
261 &self,
262 ) -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError> {
263 let change_stream = self.initialize_change_stream().await?;
264
265 let handle = self.spawn_listener_task(change_stream);
266
267 Ok((Arc::clone(&self.receiver), handle))
268 }
269
270 async fn stop(&self) -> Result<(), DBListenerError> {
271 info!("Stopping MongoDB Change Stream listener.");
272 Ok(())
273 }
274}
275
276#[derive(Debug, Deserialize, Serialize, Clone)]
277pub struct MongoNotify {
278 pub operation: Option<String>,
279 pub database: String,
280 pub collection: String,
281 pub new_document: Option<Value>,
282 pub old_document: Option<Value>,
283 pub timestamp: String,
284}
285
286#[cfg(test)]
287mod tests {
288 use std::{env, sync::Arc};
289 use tokio::time::{sleep, Duration};
290
291 use dotenv::dotenv;
292 use mongodb::{
293 bson::{doc, Document},
294 options::ClientOptions,
295 Client,
296 };
297
298 use crate::{
299 database::{
300 mongodb::{get_or_create_client, MongoDocumentListener},
301 DBListenerTrait,
302 },
303 EventType,
304 };
305
306 #[tokio::test]
307 async fn create_new_listener_with_props() {
308 dotenv().ok();
309
310 let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
312
313 let database = "SathishLoginPage".to_string();
314 let collection = "users".to_string();
315
316 let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];
317
318 let result =
319 MongoDocumentListener::new(&database_url, &database, &collection, events).await;
320
321 println!("What is the error : {:#?}", result);
322
323 assert!(result.is_ok(), "Listener failed to connect");
324
325 sleep(Duration::from_secs(1)).await;
326 }
327
328 #[tokio::test]
329 async fn create_new_listener_with_invalid_props() {
330 dotenv().ok();
331
332 let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
334
335 let database = "InvalidDb".to_string();
336 let collection = "users".to_string();
337
338 let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];
339
340 let result =
341 MongoDocumentListener::new(&database_url, &database, &collection, events).await;
342
343 assert!(result.is_err(), "Listener failed to connect");
344
345 sleep(Duration::from_secs(1)).await;
346 }
347
348 #[tokio::test]
349 async fn get_same_client_for_same_url() {
350 dotenv().ok();
351
352 let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
354
355 let client1 = get_or_create_client(&database_url).await.unwrap();
356
357 let client2 = get_or_create_client(&database_url).await.unwrap();
358
359 assert!(
360 Arc::ptr_eq(&client1, &client2),
361 "Expected the same pool instance, but got different ones"
362 );
363
364 sleep(Duration::from_secs(1)).await;
365 }
366
367 #[tokio::test]
368 #[ignore = "this should be tested alone.. as it requires client connection for the same db url"]
369 async fn mongodb_document_listener() {
370 dotenv().ok();
371
372 let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
374
375 let database = "SathishLoginPage".to_string();
376 let collection = "users".to_string();
377
378 let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];
379
380 let mongo_document_listener =
381 MongoDocumentListener::new(&database_url, &database, &collection, events.clone()).await;
382
383 assert!(
384 mongo_document_listener.is_ok(),
385 "Failed to initialize mongodb listener"
386 );
387
388 let mongo_document_listener = mongo_document_listener.unwrap();
389
390 let (rx, handle) = mongo_document_listener.start().await.unwrap();
391
392 let notification_task = tokio::spawn(async move {
393 let mut received_events = Vec::new();
394 while let Some(payload) = rx.lock().await.recv().await {
395 println!("Notification received: {:#?}", payload);
396 received_events.push(payload);
397 if received_events.len() >= 3 {
398 break; }
400 }
401 received_events
402 });
403
404 let client_options = ClientOptions::parse(&database_url).await.unwrap();
405 let client = Client::with_options(client_options).unwrap();
406 let db = client.database(&database);
407 let coll = db.collection::<Document>(&collection);
408
409 coll.insert_one(doc! { "user_id": 1, "name": "test_user", "age": 30 })
410 .await
411 .unwrap();
412
413 sleep(Duration::from_millis(100)).await;
414
415 coll.update_one(doc! { "user_id": 1 }, doc! { "$set": { "age": 35 } })
416 .await
417 .unwrap();
418
419 sleep(Duration::from_millis(100)).await;
420
421 coll.delete_one(doc! { "user_id": 1 }).await.unwrap();
422
423 sleep(Duration::from_millis(100)).await;
424
425 sleep(Duration::from_secs(2)).await; let received_events = notification_task.await.unwrap();
428
429 assert_eq!(
430 received_events.len(),
431 3,
432 "Expected 3 events but received {}",
433 received_events.len()
434 );
435
436 mongo_document_listener.stop().await.unwrap();
437 handle.abort();
438 }
439}