use async_trait::async_trait;
use futures::TryStreamExt;
use mongodb::{Collection, options::ReturnDocument};
use serde::{Serialize, de::DeserializeOwned};
use std::fmt::Debug;
use crate::error::CoreError;
use crate::domain::database::DatabaseAdapter;
use crate::domain::query::{IntoBsonDocument, IntoDbFilter, QueryBuilder};
use crate::infrastructure::database::bindings::traits::MongoInsert;
/// Database adapter that stores values of type `T` in a single MongoDB
/// collection.
///
/// The adapter only holds the typed [`Collection`] handle it operates on.
#[derive(Debug, Clone)]
pub struct MongodbAdapter<T>
where
    T: Send + Sync + Debug,
{
    /// Typed handle to the collection every operation of this adapter targets.
    collection: Collection<T>,
}
impl<T> MongodbAdapter<T>
where
    T: Send + Sync + Debug,
{
    /// Creates an adapter bound to the collection `name` of the database `cnx`.
    ///
    /// * `client` - MongoDB client the collection handle is obtained from.
    /// * `cnx` - passed to `Client::database`, i.e. used as the database name.
    /// * `name` - passed to `Database::collection`, i.e. the collection name.
    pub fn new(client: &mongodb::Client, cnx: &str, name: &str) -> Self {
        let database = client.database(cnx);
        let collection = database.collection::<T>(name);
        Self { collection }
    }
}
#[async_trait]
impl<T> DatabaseAdapter<T> for MongodbAdapter<T>
where
T: Debug
+ Send
+ Sync
+ Serialize
+ DeserializeOwned
+ 'static
+ MongoInsert
+ IntoBsonDocument,
{
async fn insert(&self, data: T) -> Result<String, CoreError> {
let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
let doc = data.into_bson_document().unwrap();
let operation = collection.insert_one(doc);
let doc = operation.await?;
let id = doc.inserted_id.as_object_id().ok_or({
CoreError::Validation(crate::error::ValidationError::Malformed {
field: crate::error::CredentialField::ObjectId,
})
})?;
Ok(id.to_string())
}
async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
let docs: Vec<mongodb::bson::Document> = data
.into_iter()
.map(|d| d.into_bson_document().unwrap())
.collect();
let operation = collection.insert_many(docs);
let doc = operation.await?;
let ids = doc
.inserted_ids
.into_values()
.map(|v| {
v.as_object_id().map(|id| id.to_string()).ok_or({
CoreError::Validation(crate::error::ValidationError::Malformed {
field: crate::error::CredentialField::ObjectId,
})
})
})
.collect::<Result<Vec<String>, _>>()?;
Ok(ids)
}
async fn upsert(&self, data: T) -> Result<String, CoreError> {
let doc = data.into_bson_document().unwrap();
let filter: mongodb::bson::Document = T::uniques()
.iter()
.filter_map(|&key| {
let clean_key = key.trim_matches('"');
doc.get(clean_key)
.map(|val| (clean_key.to_string(), val.clone()))
})
.collect();
let update = mongodb::bson::doc! { "$set": doc };
let collection = self.collection.clone_with_type::<mongodb::bson::Document>();
let result = collection
.find_one_and_update(filter, update)
.return_document(ReturnDocument::After)
.upsert(true)
.await?
.ok_or(CoreError::Internal(crate::error::InternalError::Database(
"upsert failed".into(),
)))?;
let id = result
.get("_id")
.and_then(|id| id.as_object_id())
.ok_or(CoreError::Internal(crate::error::InternalError::Database(
"missing id".into(),
)))?;
Ok(id.to_string())
}
async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
let mut ids = Vec::with_capacity(data.len());
for item in data {
let id = self.upsert(item).await?;
ids.push(id);
}
Ok(ids)
}
async fn find_all(&self, query: QueryBuilder) -> Result<Vec<T>, CoreError> {
let doc = query.into_mongo_filter();
self.collection
.find(doc)
.await?
.try_collect()
.await
.map_err(Into::into)
}
async fn find_one(&self, query: QueryBuilder) -> Result<Option<T>, CoreError> {
let doc = query.into_mongo_filter();
self.collection.find_one(doc).await.map_err(Into::into)
}
async fn find_one_and_update(
&self,
filter: QueryBuilder,
update: QueryBuilder,
) -> Result<Option<T>, CoreError> {
let filter = filter.into_mongo_filter();
let update = update.into_mongo_update();
self.collection
.find_one_and_update(filter, update)
.return_document(ReturnDocument::After)
.await
.map_err(Into::into)
}
async fn update_many(
&self,
filter: QueryBuilder,
update: QueryBuilder,
) -> Result<(), CoreError> {
let filter = filter.into_mongo_filter();
let update = update.into_mongo_update();
self.collection.update_many(filter, update).await?;
Ok(())
}
async fn delete_one(&self, query: QueryBuilder) -> Result<(), CoreError> {
let filter = query.into_mongo_filter();
self.collection.delete_one(filter).await?;
Ok(())
}
async fn delete_many(&self, query: QueryBuilder) -> Result<(), CoreError> {
let filter = query.into_mongo_filter();
self.collection.delete_many(filter).await?;
Ok(())
}
}