use crate::document::document::BaseDocument;
use crate::document::pageable::{PageableRequest, PageableResponse};
use crate::repository::base_repository::Repository;
use async_trait::async_trait;
use bson::oid::ObjectId;
use bson::{doc, Bson, Document};
use futures::TryStreamExt;
use mongodb::error::Error;
use mongodb::options::{
DeleteOptions, FindOneAndDeleteOptions, FindOneOptions, FindOptions, InsertManyOptions,
};
use mongodb::Collection;
use opentelemetry::trace::{Span, TraceContextExt, Tracer};
use opentelemetry::{global, Context, KeyValue};
use std::fmt::Debug;
use std::str::FromStr;
/// Repository backed by a single MongoDB collection of `T` documents.
pub struct MongoRepository<T>
where
    T: BaseDocument
        + Debug
        + serde::ser::Serialize
        + for<'de> serde::Deserialize<'de>
        + Sync
        + Send,
{
    /// Typed handle to the collection that the `Repository` methods operate on.
    pub collection: Collection<T>,
}
#[async_trait]
impl<
T: BaseDocument
+ Debug
+ serde::ser::Serialize
+ for<'de> serde::Deserialize<'de>
+ Sync
+ Unpin
+ Send,
> Repository<T> for MongoRepository<T>
{
async fn save(&self, entity: T, ctx: Option<&Context>) -> Result<ObjectId, Error> {
if let Some(u_ctx) = ctx {
let mut span = global::tracer("opentelemetry").start_with_context("save", u_ctx);
span.set_attribute(KeyValue::new("method", "save"));
span.set_attribute(KeyValue::new("system", "mongodb"));
u_ctx.with_span(span);
}
let result = match self.collection.insert_one(entity, None).await {
Ok(res) => res,
Err(e) => return Err(Error::custom(e)),
};
match result.inserted_id.as_object_id() {
None => Err(Error::custom("could not get object id saved object..")),
Some(oid) => Ok(oid),
}
}
async fn save_many(
&self,
entities: Vec<T>,
ctx: Option<&Context>,
options: Option<InsertManyOptions>,
) -> Result<Vec<ObjectId>, Error> {
let mut ids: Vec<ObjectId> = vec![];
if let Some(u_ctx) = ctx {
let mut span = global::tracer("opentelemetry").start_with_context("save_many", u_ctx);
span.set_attribute(KeyValue::new("method", "save_many"));
span.set_attribute(KeyValue::new("system", "mongodb"));
u_ctx.with_span(span);
}
let resp = self.collection.insert_many(entities, options).await?;
resp.inserted_ids.into_iter().for_each(|(_, y)| {
if let Some(id) = y.as_object_id() {
ids.push(id)
}
});
Ok(ids)
}
/// Returns one page of documents, newest `_id` first, together with paging metadata.
///
/// At most `request.number_per_page` documents are fetched. When
/// `request.last_item_id` parses as an `ObjectId`, only documents with a smaller
/// `_id` are returned; a missing or unparsable value applies no filter.
///
/// The response carries the total document count of the collection, the size of the
/// returned batch and the hex id of its last document (`None` for an empty batch).
///
/// # Errors
///
/// Returns the driver error if counting or querying fails.
async fn get_all_pageable(
    &self,
    request: PageableRequest,
    ctx: Option<&Context>,
) -> Result<PageableResponse<T>, Error> {
    let total = self.count_documents_in_collection(None).await?;

    let options = FindOptions::builder()
        .sort(doc! {"_id": -1})
        .limit(request.number_per_page)
        .build();

    // Continue below the previous page's last id when one was supplied and is valid.
    let filter = request
        .last_item_id
        .and_then(|raw| ObjectId::from_str(raw.as_str()).ok())
        .map(|cursor_id| doc! { "_id": { "$lt": cursor_id } })
        .unwrap_or_else(|| doc! {});

    let documents = self
        .find_all_by_filter_and_options(filter, Some(options), ctx)
        .await?;

    let no_of_items_in_batch = documents.len();
    let last_item_id = documents
        .last()
        .and_then(|tail| tail.get_id())
        .map(|id| id.to_hex());

    Ok(PageableResponse {
        data: documents,
        number_per_page: request.number_per_page,
        last_item_id,
        total,
        no_of_items_in_batch,
    })
}
/// Counts every document in the collection. `_ctx` is accepted but not used.
///
/// # Errors
///
/// Returns the driver error if the count fails.
async fn count_documents_in_collection(&self, _ctx: Option<&Context>) -> Result<u64, Error> {
    let match_everything = doc! {};
    self.collection
        .count_documents(match_everything, None)
        .await
}
async fn find_by_id(&self, id: &str, ctx: Option<&Context>) -> Result<T, Error> {
match ObjectId::from_str(id) {
Ok(obj_id) => {
let filter = doc! { "_id": obj_id };
self.find_by_filter_and_options(filter, None, ctx).await
}
Err(err) => Err(Error::custom(err)),
}
}
/// Fetches the document whose `_id` equals `id`.
///
/// # Errors
///
/// Returns whatever `find_by_filter_and_options` returns.
async fn find_by_raw_id(&self, id: ObjectId, ctx: Option<&Context>) -> Result<T, Error> {
    self.find_by_filter_and_options(doc! { "_id": id }, None, ctx)
        .await
}
async fn find_by_ids(&self, ids: Vec<String>, ctx: Option<&Context>) -> Result<Vec<T>, Error> {
let mut obj_ids = vec![];
for id in ids {
match ObjectId::from_str(id.as_str()) {
Ok(obj_id) => obj_ids.push(obj_id),
Err(err) => return Err(Error::custom(err)),
}
}
let filter = doc! {
"_id": {
"$in": obj_ids
}
};
self.find_all_by_filter_and_options(filter, None, ctx).await
}
/// Fetches all documents whose `_id` is contained in `ids`.
///
/// # Errors
///
/// Returns whatever `find_all_by_filter_and_options` returns.
async fn find_by_raw_ids(
    &self,
    ids: Vec<ObjectId>,
    ctx: Option<&Context>,
) -> Result<Vec<T>, Error> {
    let id_is_listed = doc! { "_id": { "$in": ids } };
    self.find_all_by_filter_and_options(id_is_listed, None, ctx)
        .await
}
async fn find_by_filter_and_options(
&self,
filter: Document,
options: Option<FindOneOptions>,
ctx: Option<&Context>,
) -> Result<T, Error> {
if let Some(u_ctx) = ctx {
let mut span = global::tracer("opentelemetry")
.start_with_context("find_by_filter_and_options", u_ctx);
span.set_attribute(KeyValue::new("method", "find_by_filter_and_options"));
span.set_attribute(KeyValue::new("system", "mongodb"));
span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
u_ctx.with_span(span);
}
let entity_doc = self.collection.find_one(Some(filter), options).await?;
match entity_doc {
Some(doc) => Ok(doc),
None => Err(Error::custom("Could not find document")),
}
}
/// Returns every document matching `filter`, honouring the optional `options`.
///
/// When `ctx` is provided, a `find_all_by_filter_and_options` span carrying the
/// debug rendering of `filter` is started with that context as parent and is kept
/// open until the cursor has been fully drained.
///
/// # Errors
///
/// Returns the driver error if the query or reading from the cursor fails.
async fn find_all_by_filter_and_options(
    &self,
    filter: Document,
    options: Option<FindOptions>,
    ctx: Option<&Context>,
) -> Result<Vec<T>, Error> {
    // The span has to outlive the query: handing it to a context that is dropped
    // right away would end it before any work has been done.
    let span = ctx.map(|parent| {
        let mut span = global::tracer("opentelemetry")
            .start_with_context("find_all_by_filter_and_options", parent);
        span.set_attribute(KeyValue::new("method", "find_all_by_filter_and_options"));
        span.set_attribute(KeyValue::new("system", "mongodb"));
        span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
        span
    });

    // Draining the cursor is part of the measured work, so it happens before the
    // span is ended.
    let outcome = match self.collection.find(filter, options).await {
        Ok(cursor) => cursor.try_collect::<Vec<T>>().await,
        Err(err) => Err(err),
    };

    if let Some(mut span) = span {
        span.end();
    }

    outcome
}
async fn update(&self, entity: &T, ctx: Option<&Context>) -> Result<ObjectId, Error> {
match entity.get_id() {
None => Err(Error::custom("Invalid object id")),
Some(obj_id) => {
let serialized_entity = match bson::to_bson(&entity) {
Ok(ser_doc) => ser_doc,
Err(e) => return Err(Error::custom(e)),
};
let find_filter = doc! {"_id": obj_id};
let update_filter = doc! {"$set": serialized_entity};
if let Some(u_ctx) = ctx {
let mut span = global::tracer("update").start_with_context("update", u_ctx);
span.set_attribute(KeyValue::new("method", "update"));
span.set_attribute(KeyValue::new("system", "mongodb"));
span.set_attribute(KeyValue::new("query", format!("{find_filter:?}")));
span.set_attribute(KeyValue::new("command", format!("{update_filter:?}")));
u_ctx.with_span(span);
}
self.collection
.update_one(find_filter, update_filter, None)
.await
.map(|_| obj_id)
}
}
}
/// Deletes the document whose `_id` equals `id` by delegating to `delete`.
///
/// # Errors
///
/// Returns whatever `delete` returns.
async fn delete_by_id(&self, id: ObjectId, ctx: Option<&Context>) -> Result<(), Error> {
    let id_matches = doc! { "_id": id };
    self.delete(id_matches, ctx, None).await
}
/// Deletes every document whose `_id` is contained in `ids` by delegating to
/// `delete_many`.
///
/// # Errors
///
/// Returns whatever `delete_many` returns.
async fn delete_by_ids(&self, ids: Vec<ObjectId>, ctx: Option<&Context>) -> Result<(), Error> {
    let id_is_listed = doc! { "_id": { "$in": ids } };
    self.delete_many(id_is_listed, ctx, None).await
}
/// Deletes the first document matching `filter`, honouring the optional `options`.
///
/// When `ctx` is provided, a `delete` span carrying the debug rendering of `filter`
/// is started with that context as parent and is kept open until the delete has
/// completed.
///
/// # Errors
///
/// Returns the driver error if the command fails, or a custom error if no document
/// matched `filter`.
async fn delete(
    &self,
    filter: Document,
    ctx: Option<&Context>,
    options: Option<FindOneAndDeleteOptions>,
) -> Result<(), Error> {
    // The span has to outlive the delete: letting it go out of scope before the
    // driver call would end it before any work has been done.
    let span = ctx.map(|parent| {
        let mut span = global::tracer("opentelemetry").start_with_context("delete", parent);
        span.set_attribute(KeyValue::new("method", "delete"));
        span.set_attribute(KeyValue::new("system", "mongodb"));
        span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
        span
    });

    let outcome = self.collection.find_one_and_delete(filter, options).await;

    if let Some(mut span) = span {
        span.end();
    }

    match outcome? {
        Some(_) => Ok(()),
        None => Err(Error::custom("could not delete document")),
    }
}
/// Deletes every document matching `filter`, honouring the optional `options`.
///
/// The number of deleted documents is not reported; the call succeeds even when
/// nothing matched.
///
/// When `ctx` is provided, a `delete_many` span carrying the debug rendering of
/// `filter` is started with that context as parent and is kept open until the
/// delete has completed.
///
/// # Errors
///
/// Returns the driver error if the command fails.
async fn delete_many(
    &self,
    filter: Document,
    ctx: Option<&Context>,
    options: Option<DeleteOptions>,
) -> Result<(), Error> {
    // The span has to outlive the delete: letting it go out of scope before the
    // driver call would end it before any work has been done.
    let span = ctx.map(|parent| {
        let mut span =
            global::tracer("opentelemetry").start_with_context("delete_many", parent);
        span.set_attribute(KeyValue::new("method", "delete_many"));
        span.set_attribute(KeyValue::new("system", "mongodb"));
        span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
        span
    });

    let outcome = self.collection.delete_many(filter, options).await;

    if let Some(mut span) = span {
        span.end();
    }

    outcome.map(|_| ())
}
}