use crate::{CollectionConfig, Model};
use async_trait::async_trait;
use mongodb::bson::oid::ObjectId;
use mongodb::bson::{Document, doc};
#[cfg(feature = "bson-3")]
use mongodb::bson::{deserialize_from_document, serialize_to_bson};
#[cfg(feature = "compat-3-0-0")]
use mongodb::bson::{from_document as deserialize_from_document, to_bson as serialize_to_bson};
use mongodb::error::Result;
use mongodb::options::{CollectionOptions, UpdateOptions};
use serde::Deserialize;
use std::borrow::Borrow;
use std::ops::Deref;
#[derive(Debug)]
pub struct BulkUpdate {
pub query: Document,
pub update: Document,
pub options: Option<UpdateOptions>,
}
#[derive(Debug, Deserialize)]
pub struct BulkUpdateResult {
#[serde(rename = "n")]
pub nb_affected: u64,
#[serde(rename = "nModified")]
pub nb_modified: u64,
#[serde(default)]
pub upserted: Vec<BulkUpdateUpsertResult>,
}
#[derive(Debug, Deserialize)]
pub struct BulkUpdateUpsertResult {
pub index: u64,
#[serde(alias = "_id")]
pub id: ObjectId,
}
#[derive(Debug)]
pub struct Repository<M: Model> {
db: mongodb::Database, coll: mongodb::Collection<M>,
}
impl<M: Model> Deref for Repository<M> {
type Target = mongodb::Collection<M>;
fn deref(&self) -> &mongodb::Collection<M> {
&self.coll
}
}
impl<M: Model> Clone for Repository<M> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
coll: self.coll.clone_with_type(),
}
}
}
impl<M: Model> Repository<M> {
pub fn new(db: mongodb::Database) -> Self {
let coll = M::CollConf::collection_options().map_or_else(
|| db.collection(M::CollConf::collection_name()),
|options| db.collection_with_options(M::CollConf::collection_name(), options),
);
Self { db, coll }
}
pub fn new_with_options(db: mongodb::Database, options: CollectionOptions) -> Self {
let coll = db.collection_with_options(M::CollConf::collection_name(), options);
Self { db, coll }
}
pub fn collection_name(&self) -> &'static str {
M::CollConf::collection_name()
}
pub fn get_underlying(&self) -> mongodb::Collection<M> {
self.coll.clone_with_type()
}
pub fn cast_model<OtherModel>(self) -> Repository<OtherModel>
where
OtherModel: Model<CollConf = M::CollConf>,
{
Repository {
db: self.db,
coll: self.coll.clone_with_type(),
}
}
pub async fn bulk_update<V, U>(&self, updates: V) -> Result<BulkUpdateResult>
where
V: Borrow<Vec<U>> + Send + Sync,
U: Borrow<BulkUpdate> + Send + Sync,
{
self.coll.bulk_update(&self.db, updates).await
}
}
#[async_trait]
pub trait CollectionExt {
async fn bulk_update<V, U>(
&self,
db: &mongodb::Database,
updates: V,
) -> Result<BulkUpdateResult>
where
V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
U: 'async_trait + Send + Sync + Borrow<BulkUpdate>;
}
#[async_trait]
impl<M: Send + Sync> CollectionExt for mongodb::Collection<M> {
async fn bulk_update<V, U>(
&self,
db: &mongodb::Database,
updates: V,
) -> Result<BulkUpdateResult>
where
V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
U: 'async_trait + Send + Sync + Borrow<BulkUpdate>,
{
let updates = updates.borrow();
let mut update_docs = Vec::with_capacity(updates.len());
for u in updates {
let u = u.borrow();
let mut doc = doc! {
"q": &u.query,
"u": &u.update,
"multi": false,
};
if let Some(options) = &u.options {
if let Some(ref upsert) = options.upsert {
doc.insert("upsert", upsert);
}
if let Some(ref collation) = options.collation {
doc.insert("collation", serialize_to_bson(collation)?);
}
if let Some(ref array_filters) = options.array_filters {
doc.insert("arrayFilters", array_filters);
}
if let Some(ref hint) = options.hint {
doc.insert("hint", serialize_to_bson(hint)?);
}
}
update_docs.push(doc);
}
let mut command = doc! {
"update": self.name(),
"updates": update_docs,
};
if let Some(ref write_concern) = self.write_concern() {
command.insert("writeConcern", serialize_to_bson(write_concern)?);
}
let res = db.run_command(command).await?;
Ok(deserialize_from_document(res)?)
}
}