use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use mongodb::{
bson::Document,
change_stream::{
event::{ChangeStreamEvent, OperationType},
ChangeStream,
},
options::{ClientOptions, FullDocumentBeforeChangeType, FullDocumentType},
Client,
};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{
sync::{
mpsc::{self, Receiver, Sender},
Mutex, RwLock,
},
task::JoinHandle,
};
use tracing::{error, info};
use crate::config::DBListenerError;
use super::{DBListenerTrait, EventType};
static MONGODB_CLIENT_REGISTERY: Lazy<RwLock<HashMap<String, Arc<Client>>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
async fn get_or_create_client(db_url: &str) -> Result<Arc<Client>, DBListenerError> {
let mut clients = MONGODB_CLIENT_REGISTERY.write().await;
if let Some(client) = clients.get(db_url) {
Ok(Arc::clone(client))
} else {
let client_options = ClientOptions::parse(db_url).await;
if let Err(err) = client_options {
error!("Failed to connect to the client: {:?}", err);
return Err(DBListenerError::CreationError(format!(
"Failed to connect to the client url : {:#?}",
err
)));
}
let mut client_options = client_options.unwrap();
client_options.max_pool_size = Some(10);
let new_client = Client::with_options(client_options);
if let Err(err) = new_client {
error!("Failed to create client : {:#?}", err);
return Err(DBListenerError::CreationError(format!(
"Failed to create the client : {:#?}",
err
)));
}
let new_client = Arc::new(new_client.unwrap());
clients.insert(db_url.to_string(), Arc::clone(&new_client));
Ok(new_client)
}
}
#[derive(Debug, Clone)]
pub struct MongoDocumentListener {
pub client: Arc<Client>,
pub database: String,
pub collection: String,
pub sender: Sender<Value>,
pub receiver: Arc<Mutex<Receiver<Value>>>,
pub events: Vec<EventType>,
}
impl MongoDocumentListener {
pub async fn new(
url: &str,
database: &str,
collection: &str,
events: Vec<EventType>,
) -> Result<Self, DBListenerError> {
let client = get_or_create_client(url).await?;
let (sender, receiver) = mpsc::channel::<Value>(100);
let mongo_document_listener = Self {
client: Arc::clone(&client),
collection: collection.to_string(),
database: database.to_string(),
events,
receiver: Arc::new(Mutex::new(receiver)),
sender,
};
mongo_document_listener.verify_members().await?;
Ok(mongo_document_listener)
}
pub async fn verify_members(&self) -> Result<(), DBListenerError> {
let db = self.client.database(&self.database);
let collection_names = db.list_collection_names().await.map_err(|err| {
error!("Failed to list collections: {:?}", err);
DBListenerError::ListenerVerifyError(format!("Error listing collections: {:#?}", err))
})?;
if !collection_names.contains(&self.collection) {
return Err(DBListenerError::ListenerVerifyError(format!(
"Collection `{}` does not exist in database `{}`",
self.collection, self.database
)));
}
Ok(())
}
async fn initialize_change_stream(
&self,
) -> Result<ChangeStream<ChangeStreamEvent<Document>>, DBListenerError> {
let db = self.client.database(&self.database);
let collection = db.collection::<Document>(&self.collection);
collection
.watch()
.full_document_before_change(FullDocumentBeforeChangeType::WhenAvailable)
.full_document(FullDocumentType::UpdateLookup)
.await
.map_err(|e| {
error!("Failed to start MongoDB Change Stream: {:?}", e);
DBListenerError::ListenerError(e.to_string())
})
}
fn spawn_listener_task(
&self,
mut change_stream: ChangeStream<ChangeStreamEvent<Document>>,
) -> JoinHandle<()> {
let sender_clone = self.sender.clone();
let allowed_events = self.events.clone();
let collection_name = self.collection.clone();
let mongo_notify = MongoNotify {
collection: self.collection.clone(),
database: self.database.clone(),
old_document: None,
new_document: None,
timestamp: String::new(),
operation: None,
};
tokio::spawn(async move {
let allowed_events = Arc::new(allowed_events);
info!(
"MongoDB Change Stream listener started for `{}` collection",
collection_name
);
while change_stream.is_alive() {
let notification = mongo_notify.clone();
match change_stream.next_if_any().await {
Ok(Some(change)) => {
println!("change stream : {:#?}", change);
if let Some(processed_notification) =
process_change_event(change, Arc::clone(&allowed_events), notification)
{
if let Ok(json_data) = serde_json::to_value(processed_notification) {
if let Err(e) = sender_clone.send(json_data).await {
error!("Failed to send notification: {:?}", e);
}
} else {
error!("Failed to serialize the mongo notification");
}
}
}
Ok(None) => continue,
Err(e) => {
error!("Failed to get event: {:?}", e);
continue;
}
}
}
})
}
}
fn process_change_event(
change: ChangeStreamEvent<Document>,
allowed_events: Arc<Vec<EventType>>,
mut notification: MongoNotify,
) -> Option<MongoNotify> {
let operation_type = change.operation_type;
let (is_allowed, op_str) = match operation_type {
OperationType::Insert => (allowed_events.contains(&EventType::INSERT), "INSERT"),
OperationType::Delete => (allowed_events.contains(&EventType::DELETE), "DELETE"),
OperationType::Update => (allowed_events.contains(&EventType::UPDATE), "UPDATE"),
_ => (false, ""),
};
if !is_allowed {
return None;
}
notification.operation = Some(op_str.to_string());
notification.timestamp = chrono::Utc::now().to_rfc3339();
if let Some(old_document) = change.full_document_before_change {
if let Ok(json_data) = serde_json::to_value(old_document) {
notification.old_document = Some(json_data);
} else {
error!("Failed to serialize old document");
return None;
}
}
if let Some(new_document) = change.full_document {
if let Ok(json_data) = serde_json::to_value(new_document) {
notification.new_document = Some(json_data);
} else {
error!("Failed to serialize new document");
return None;
}
}
Some(notification)
}
#[async_trait]
impl DBListenerTrait for MongoDocumentListener {
async fn start(
&self,
) -> Result<(Arc<Mutex<Receiver<Value>>>, JoinHandle<()>), DBListenerError> {
let change_stream = self.initialize_change_stream().await?;
let handle = self.spawn_listener_task(change_stream);
Ok((Arc::clone(&self.receiver), handle))
}
async fn stop(&self) -> Result<(), DBListenerError> {
info!("Stopping MongoDB Change Stream listener.");
Ok(())
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct MongoNotify {
pub operation: Option<String>,
pub database: String,
pub collection: String,
pub new_document: Option<Value>,
pub old_document: Option<Value>,
pub timestamp: String,
}
#[cfg(test)]
mod tests {
use std::{env, sync::Arc};
use tokio::time::{sleep, Duration};
use dotenv::dotenv;
use mongodb::{
bson::{doc, Document},
options::ClientOptions,
Client,
};
use crate::{
database::{
mongodb::{get_or_create_client, MongoDocumentListener},
DBListenerTrait,
},
EventType,
};
#[tokio::test]
async fn create_new_listener_with_props() {
dotenv().ok();
let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
let database = "SathishLoginPage".to_string();
let collection = "users".to_string();
let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];
let result =
MongoDocumentListener::new(&database_url, &database, &collection, events).await;
println!("What is the error : {:#?}", result);
assert!(result.is_ok(), "Listener failed to connect");
sleep(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn create_new_listener_with_invalid_props() {
dotenv().ok();
let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
let database = "InvalidDb".to_string();
let collection = "users".to_string();
let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];
let result =
MongoDocumentListener::new(&database_url, &database, &collection, events).await;
assert!(result.is_err(), "Listener failed to connect");
sleep(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn get_same_client_for_same_url() {
dotenv().ok();
let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
let client1 = get_or_create_client(&database_url).await.unwrap();
let client2 = get_or_create_client(&database_url).await.unwrap();
assert!(
Arc::ptr_eq(&client1, &client2),
"Expected the same pool instance, but got different ones"
);
sleep(Duration::from_secs(1)).await;
}
#[tokio::test]
#[ignore = "this should be tested alone.. as it requires client connection for the same db url"]
async fn mongodb_document_listener() {
dotenv().ok();
let database_url = env::var("MONGO_DATABASE_URL").expect("MONGO_DATABASE_URL must be set");
let database = "SathishLoginPage".to_string();
let collection = "users".to_string();
let events = vec![EventType::INSERT, EventType::UPDATE, EventType::DELETE];
let mongo_document_listener =
MongoDocumentListener::new(&database_url, &database, &collection, events.clone()).await;
assert!(
mongo_document_listener.is_ok(),
"Failed to initialize mongodb listener"
);
let mongo_document_listener = mongo_document_listener.unwrap();
let (rx, handle) = mongo_document_listener.start().await.unwrap();
let notification_task = tokio::spawn(async move {
let mut received_events = Vec::new();
while let Some(payload) = rx.lock().await.recv().await {
println!("Notification received: {:#?}", payload);
received_events.push(payload);
if received_events.len() >= 3 {
break; }
}
received_events
});
let client_options = ClientOptions::parse(&database_url).await.unwrap();
let client = Client::with_options(client_options).unwrap();
let db = client.database(&database);
let coll = db.collection::<Document>(&collection);
coll.insert_one(doc! { "user_id": 1, "name": "test_user", "age": 30 })
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
coll.update_one(doc! { "user_id": 1 }, doc! { "$set": { "age": 35 } })
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
coll.delete_one(doc! { "user_id": 1 }).await.unwrap();
sleep(Duration::from_millis(100)).await;
sleep(Duration::from_secs(2)).await;
let received_events = notification_task.await.unwrap();
assert_eq!(
received_events.len(),
3,
"Expected 3 events but received {}",
received_events.len()
);
mongo_document_listener.stop().await.unwrap();
handle.abort();
}
}