//! Documentation: MongoDB-backed implementation of the `Repository` trait.
use crate::document::document::BaseDocument;
use crate::document::pageable::{PageableRequest, PageableResponse};
use crate::repository::base_repository::Repository;
use async_trait::async_trait;
use bson::oid::ObjectId;
use bson::{doc, Bson, Document};
use futures::TryStreamExt;
use mongodb::error::Error;
use mongodb::options::{
    DeleteOptions, FindOneAndDeleteOptions, FindOneOptions, FindOptions, InsertManyOptions,
};
use mongodb::Collection;
use opentelemetry::trace::{Span, TraceContextExt, Tracer};
use opentelemetry::{global, Context, KeyValue};
use std::fmt::Debug;
use std::str::FromStr;

/// Default repository implementation backed by a typed MongoDB [`Collection`].
///
/// It should work out of the box for any struct which implements the `BaseDocument` trait.
///
/// # Examples
///
/// ```
/// use bson::oid::ObjectId;
/// use serde::{Deserialize, Serialize};
/// use mongo_data::document::document::BaseDocument;
///
/// #[derive(Deserialize, Serialize, Debug, Clone)]
/// pub struct TestEntity {
///     #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
///     pub id: Option<ObjectId>,
///     pub pid: String,
/// }
///
/// // Implement the BaseDocument trait
/// impl BaseDocument for TestEntity {
///     fn set_id(&mut self, id: Option<ObjectId>) {
///         self.id = id
///     }
///
///     fn get_id(&self) -> Option<ObjectId> {
///         self.id
///     }
/// }
/// ```
pub struct MongoRepository<T>
where
    T: BaseDocument
        + Debug
        + serde::ser::Serialize
        + for<'de> serde::Deserialize<'de>
        + Sync
        + Send,
{
    /// Typed handle to the MongoDB collection every repository method operates on.
    pub collection: Collection<T>,
}


#[async_trait]
impl<
        T: BaseDocument
            + Debug
            + serde::ser::Serialize
            + for<'de> serde::Deserialize<'de>
            + Sync
            + Unpin
            + Send,
    > Repository<T> for MongoRepository<T>
{
    async fn save(&self, entity: T, ctx: Option<&Context>) -> Result<ObjectId, Error> {
        if let Some(u_ctx) = ctx {
            let mut span = global::tracer("opentelemetry").start_with_context("save", u_ctx);

            span.set_attribute(KeyValue::new("method", "save"));
            span.set_attribute(KeyValue::new("system", "mongodb"));

            u_ctx.with_span(span);
        }

        let result = match self.collection.insert_one(entity, None).await {
            Ok(res) => res,
            Err(e) => return Err(Error::custom(e)),
        };

        match result.inserted_id.as_object_id() {
            None => Err(Error::custom("could not get object id saved object..")),
            Some(oid) => Ok(oid),
        }
    }

    /// Inserts all `entities` with a single `insert_many` call and returns their ids.
    ///
    /// The returned ids are ordered by the index of the corresponding entity in
    /// `entities`. Inserted ids that are not `ObjectId`s are skipped.
    ///
    /// When `ctx` is provided, a `save_many` span is started as a child of it and kept
    /// alive until this method returns, so the span covers the insert itself.
    ///
    /// # Errors
    ///
    /// Returns the driver error if `insert_many` fails.
    async fn save_many(
        &self,
        entities: Vec<T>,
        ctx: Option<&Context>,
        options: Option<InsertManyOptions>,
    ) -> Result<Vec<ObjectId>, Error> {
        // Bind the span to a local: dropping it (or a `Context` wrapping it) ends the span,
        // so it must outlive the database call below.
        let _span = ctx.map(|parent_ctx| {
            let mut span =
                global::tracer("opentelemetry").start_with_context("save_many", parent_ctx);
            span.set_attribute(KeyValue::new("method", "save_many"));
            span.set_attribute(KeyValue::new("system", "mongodb"));
            span
        });

        let resp = self.collection.insert_many(entities, options).await?;

        // `inserted_ids` is a map keyed by the entity's position in the input; iterating it
        // directly yields an arbitrary order, so sort by that position first.
        let mut inserted: Vec<_> = resp.inserted_ids.into_iter().collect();
        inserted.sort_unstable_by_key(|(index, _)| *index);

        Ok(inserted
            .into_iter()
            .filter_map(|(_, id)| id.as_object_id())
            .collect())
    }

    /// Returns one page of documents, newest `_id` first.
    ///
    /// At most `request.number_per_page` documents are fetched, sorted by `_id` descending.
    /// If `request.last_item_id` parses as an `ObjectId`, only documents with a smaller
    /// `_id` are returned; a missing or unparsable value applies no filter.
    ///
    /// The response carries the fetched documents, the hex `_id` of the last document in
    /// the batch (if any), the batch size, and `total`, which is the count of all documents
    /// in the collection.
    ///
    /// # Errors
    ///
    /// Returns the driver error if counting or querying fails.
    async fn get_all_pageable(
        &self,
        request: PageableRequest,
        ctx: Option<&Context>,
    ) -> Result<PageableResponse<T>, Error> {
        let total = self.count_documents_in_collection(None).await?;

        let find_options = FindOptions::builder()
            .sort(doc! {"_id": -1})
            .limit(request.number_per_page)
            .build();

        // Only a cursor id that parses successfully narrows the query.
        let page_filter = request
            .last_item_id
            .as_ref()
            .and_then(|raw_id| ObjectId::from_str(raw_id.as_str()).ok())
            .map_or_else(|| doc! {}, |cursor_id| doc! { "_id": { "$lt": cursor_id } });

        let page = self
            .find_all_by_filter_and_options(page_filter, Some(find_options), ctx)
            .await?;

        let no_of_items_in_batch = page.len();
        let last_item_id = page
            .last()
            .and_then(|item| item.get_id())
            .map(|oid| oid.to_hex());

        Ok(PageableResponse {
            data: page,
            number_per_page: request.number_per_page,
            last_item_id,
            total,
            no_of_items_in_batch,
        })
    }

    /// Counts every document in the collection (empty filter, default options).
    ///
    /// `_ctx` is accepted but not used by this implementation.
    async fn count_documents_in_collection(&self, _ctx: Option<&Context>) -> Result<u64, Error> {
        let match_all = doc! {};
        self.collection.count_documents(match_all, None).await
    }

    async fn find_by_id(&self, id: &str, ctx: Option<&Context>) -> Result<T, Error> {
        match ObjectId::from_str(id) {
            Ok(obj_id) => {
                let filter = doc! { "_id":  obj_id };
                self.find_by_filter_and_options(filter, None, ctx).await
            }
            Err(err) => Err(Error::custom(err)),
        }
    }

    /// Looks up a single document whose `_id` equals `id`.
    ///
    /// Delegates to `find_by_filter_and_options` with default options.
    async fn find_by_raw_id(&self, id: ObjectId, ctx: Option<&Context>) -> Result<T, Error> {
        self.find_by_filter_and_options(doc! { "_id": id }, None, ctx)
            .await
    }

    async fn find_by_ids(&self, ids: Vec<String>, ctx: Option<&Context>) -> Result<Vec<T>, Error> {
        let mut obj_ids = vec![];
        for id in ids {
            match ObjectId::from_str(id.as_str()) {
                Ok(obj_id) => obj_ids.push(obj_id),
                Err(err) => return Err(Error::custom(err)),
            }
        }
        let filter = doc! {
            "_id": {
                "$in": obj_ids
            }
        };
        self.find_all_by_filter_and_options(filter, None, ctx).await
    }

    /// Fetches all documents whose `_id` is contained in `ids`.
    ///
    /// Delegates to `find_all_by_filter_and_options` with an `$in` filter and default options.
    async fn find_by_raw_ids(
        &self,
        ids: Vec<ObjectId>,
        ctx: Option<&Context>,
    ) -> Result<Vec<T>, Error> {
        let id_filter = doc! { "_id": { "$in": ids } };
        self.find_all_by_filter_and_options(id_filter, None, ctx)
            .await
    }

    async fn find_by_filter_and_options(
        &self,
        filter: Document,
        options: Option<FindOneOptions>,
        ctx: Option<&Context>,
    ) -> Result<T, Error> {
        if let Some(u_ctx) = ctx {
            let mut span = global::tracer("opentelemetry")
                .start_with_context("find_by_filter_and_options", u_ctx);

            span.set_attribute(KeyValue::new("method", "find_by_filter_and_options"));
            span.set_attribute(KeyValue::new("system", "mongodb"));
            span.set_attribute(KeyValue::new("query", format!("{filter:?}")));

            u_ctx.with_span(span);
        }

        let entity_doc = self.collection.find_one(Some(filter), options).await?;

        match entity_doc {
            Some(doc) => Ok(doc),
            None => Err(Error::custom("Could not find document")),
        }
    }

    /// Returns every document matching `filter`, using the given `find` options.
    ///
    /// When `ctx` is provided, a `find_all_by_filter_and_options` span (tagged with the
    /// debug rendering of `filter`) is started as a child of it and kept alive until this
    /// method returns, so the span covers both the query and draining the cursor.
    ///
    /// # Errors
    ///
    /// Returns the driver error if the query fails or if reading any document from the
    /// cursor fails.
    async fn find_all_by_filter_and_options(
        &self,
        filter: Document,
        options: Option<FindOptions>,
        ctx: Option<&Context>,
    ) -> Result<Vec<T>, Error> {
        // Bind the span to a local: dropping it (or a `Context` wrapping it) ends the span,
        // so it must outlive the database call below.
        let _span = ctx.map(|parent_ctx| {
            let mut span = global::tracer("opentelemetry")
                .start_with_context("find_all_by_filter_and_options", parent_ctx);
            span.set_attribute(KeyValue::new("method", "find_all_by_filter_and_options"));
            span.set_attribute(KeyValue::new("system", "mongodb"));
            span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
            span
        });

        let cursor = self.collection.find(filter, options).await?;

        // Drains the cursor into a Vec, stopping at the first error.
        cursor.try_collect().await
    }

    async fn update(&self, entity: &T, ctx: Option<&Context>) -> Result<ObjectId, Error> {
        match entity.get_id() {
            None => Err(Error::custom("Invalid object id")),
            Some(obj_id) => {
                let serialized_entity = match bson::to_bson(&entity) {
                    Ok(ser_doc) => ser_doc,
                    Err(e) => return Err(Error::custom(e)),
                };

                let find_filter = doc! {"_id": obj_id};
                let update_filter = doc! {"$set": serialized_entity};

                if let Some(u_ctx) = ctx {
                    let mut span = global::tracer("update").start_with_context("update", u_ctx);

                    span.set_attribute(KeyValue::new("method", "update"));
                    span.set_attribute(KeyValue::new("system", "mongodb"));
                    span.set_attribute(KeyValue::new("query", format!("{find_filter:?}")));
                    span.set_attribute(KeyValue::new("command", format!("{update_filter:?}")));

                    u_ctx.with_span(span);
                }

                // Update the document:
                self.collection
                    .update_one(find_filter, update_filter, None)
                    .await
                    .map(|_| obj_id)
            }
        }
    }

    /// Deletes the document whose `_id` equals `id`.
    ///
    /// Delegates to `delete` with default options.
    async fn delete_by_id(&self, id: ObjectId, ctx: Option<&Context>) -> Result<(), Error> {
        self.delete(doc! { "_id": id }, ctx, None).await
    }

    /// Deletes every document whose `_id` is contained in `ids`.
    ///
    /// Delegates to `delete_many` with an `$in` filter and default options.
    async fn delete_by_ids(&self, ids: Vec<ObjectId>, ctx: Option<&Context>) -> Result<(), Error> {
        let id_filter = doc! { "_id": { "$in": ids } };
        self.delete_many(id_filter, ctx, None).await
    }

    async fn delete(
        &self,
        filter: Document,
        ctx: Option<&Context>,
        options: Option<FindOneAndDeleteOptions>,
    ) -> Result<(), Error> {
        if let Some(ctx) = ctx {
            let mut span = global::tracer("opentelemetry").start_with_context("delete", ctx);
            span.set_attribute(KeyValue::new("method", "delete"));
            span.set_attribute(KeyValue::new("system", "mongodb"));
            span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
        }

        match self.collection.find_one_and_delete(filter, options).await? {
            None => Err(Error::custom("could not delete document")),
            Some(_) => Ok(()),
        }
    }

    /// Deletes every document matching `filter`.
    ///
    /// The delete result is discarded, so success is reported regardless of how many
    /// documents were removed.
    ///
    /// When `ctx` is provided, a `delete_many` span (tagged with the debug rendering of
    /// `filter`) is started as a child of it and kept alive until this method returns,
    /// so the span covers the delete itself.
    ///
    /// # Errors
    ///
    /// Returns the driver error if the operation fails.
    async fn delete_many(
        &self,
        filter: Document,
        ctx: Option<&Context>,
        options: Option<DeleteOptions>,
    ) -> Result<(), Error> {
        // Bind the span to a local: dropping it ends the span, so it must outlive the
        // database call below.
        let _span = ctx.map(|parent_ctx| {
            let mut span =
                global::tracer("opentelemetry").start_with_context("delete_many", parent_ctx);
            span.set_attribute(KeyValue::new("method", "delete_many"));
            span.set_attribute(KeyValue::new("system", "mongodb"));
            span.set_attribute(KeyValue::new("query", format!("{filter:?}")));
            span
        });

        self.collection
            .delete_many(filter, options)
            .await
            .map(|_| ())
    }
}