use crate::{CollectionConfig, Model};
use async_trait::async_trait;
use mongodb::bson::oid::ObjectId;
use mongodb::bson::{doc, from_document, to_bson, Document};
use mongodb::error::Result;
use mongodb::options::*;
use serde::Deserialize;
use std::borrow::Borrow;
use std::ops::Deref;
/// A single update statement submitted through [`Repository::bulk_update`] /
/// [`CollectionExt::bulk_update`].
#[derive(Debug)]
pub struct BulkUpdate {
    /// Filter document; sent as the `q` field of the update statement.
    pub query: Document,
    /// Update document; sent as the `u` field of the update statement.
    pub update: Document,
    /// Optional per-statement settings.
    ///
    /// Only `upsert`, `collation`, `array_filters` and `hint` are read when the
    /// statement is built; any other field set on the options is not forwarded.
    pub options: Option<UpdateOptions>,
}
/// Summary deserialized from the server reply to a bulk update command.
///
/// Reply fields that are not listed here are ignored during deserialization.
#[derive(Debug, Deserialize)]
pub struct BulkUpdateResult {
    /// Value of the reply's `n` field.
    #[serde(rename = "n")]
    pub nb_affected: u64,
    /// Value of the reply's `nModified` field.
    #[serde(rename = "nModified")]
    pub nb_modified: u64,
    /// Entries of the reply's `upserted` array; empty when the reply has no
    /// such field.
    #[serde(default)]
    pub upserted: Vec<BulkUpdateUpsertResult>,
}
/// One element of [`BulkUpdateResult::upserted`].
#[derive(Debug, Deserialize)]
pub struct BulkUpdateUpsertResult {
    /// Value of the entry's `index` field (presumably the position of the
    /// originating statement in the submitted list — confirm against the
    /// MongoDB `update` reply format).
    pub index: u64,
    /// Identifier of the upserted document; read from either `id` or `_id`.
    ///
    /// NOTE(review): typed as `ObjectId`, so a reply carrying another `_id`
    /// type would fail to deserialize — confirm this matches the models in use.
    #[serde(alias = "_id")]
    pub id: ObjectId,
}
/// Typed handle on the MongoDB collection associated with the model `M`.
///
/// Keeps both the database handle and the typed collection handle so that
/// database-level commands can be issued for the collection.
#[derive(Debug)]
pub struct Repository<M: Model> {
    /// Database the collection belongs to.
    db: mongodb::Database,
    /// Collection handle typed with the model.
    coll: mongodb::Collection<M>,
}
/// Lets a repository be used wherever a `&mongodb::Collection<M>` is expected,
/// so the collection's own methods are callable directly on the repository.
impl<M: Model> Deref for Repository<M> {
    type Target = mongodb::Collection<M>;

    /// Borrows the wrapped collection handle.
    fn deref(&self) -> &Self::Target {
        &self.coll
    }
}
/// Hand-written `Clone` whose only bound on `M` is `Model`.
impl<M: Model> Clone for Repository<M> {
    /// Clones the database handle and produces a fresh collection handle of the
    /// same model type through `clone_with_type`.
    fn clone(&self) -> Self {
        let db = self.db.clone();
        let coll = self.coll.clone_with_type();
        Self { db, coll }
    }
}
impl<M: Model> Repository<M> {
    /// Creates a repository on `db` for the collection named by
    /// `M::CollConf::collection_name()`.
    ///
    /// When `M::CollConf::collection_options()` yields options they are applied
    /// to the collection handle; otherwise the handle is created without
    /// explicit options.
    pub fn new(db: mongodb::Database) -> Self {
        let coll = match M::CollConf::collection_options() {
            Some(options) => db.collection_with_options(M::CollConf::collection_name(), options),
            None => db.collection(M::CollConf::collection_name()),
        };
        Self { db, coll }
    }

    /// Creates a repository on `db` using the caller-supplied `options`.
    ///
    /// `M::CollConf::collection_options()` is not consulted here.
    pub fn new_with_options(db: mongodb::Database, options: CollectionOptions) -> Self {
        let name = M::CollConf::collection_name();
        Self {
            coll: db.collection_with_options(name, options),
            db,
        }
    }

    /// Returns the collection name declared by the model's configuration.
    pub fn collection_name(&self) -> &'static str {
        M::CollConf::collection_name()
    }

    /// Returns a new handle on the underlying `mongodb::Collection`.
    pub fn get_underlying(&self) -> mongodb::Collection<M> {
        self.coll.clone_with_type()
    }

    /// Converts this repository into one typed with `OtherModel`.
    ///
    /// `OtherModel` must share the collection configuration of `M`. The
    /// database handle is moved over and the collection handle is re-typed
    /// through `clone_with_type`.
    pub fn cast_model<OtherModel>(self) -> Repository<OtherModel>
    where
        OtherModel: Model<CollConf = M::CollConf>,
    {
        let Self { db, coll } = self;
        Repository {
            db,
            coll: coll.clone_with_type(),
        }
    }

    /// Applies several update statements through a single command.
    ///
    /// Delegates to [`CollectionExt::bulk_update`] on the wrapped collection,
    /// passing this repository's database handle.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegated call produces.
    pub async fn bulk_update<V, U>(&self, updates: V) -> Result<BulkUpdateResult>
    where
        M: Send + Sync,
        V: Borrow<Vec<U>> + Send + Sync,
        U: Borrow<BulkUpdate> + Send + Sync,
    {
        // Both sides share the same error type, so the result is returned as is.
        self.coll.bulk_update(&self.db, updates).await
    }
}
/// Extension trait adding a bulk update operation to a collection handle.
#[async_trait]
pub trait CollectionExt {
    /// Applies every statement in `updates` to the collection through `db`.
    ///
    /// `updates` may be an owned or borrowed `Vec`, and its items may be owned
    /// or borrowed [`BulkUpdate`] values.
    ///
    /// # Errors
    ///
    /// Implementations return a `mongodb::error::Error` when the operation
    /// fails.
    async fn bulk_update<V, U>(
        &self,
        db: &mongodb::Database,
        updates: V,
    ) -> Result<BulkUpdateResult>
    where
        V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
        U: 'async_trait + Send + Sync + Borrow<BulkUpdate>;
}
#[async_trait]
impl<M: Send + Sync> CollectionExt for mongodb::Collection<M> {
async fn bulk_update<V, U>(
&self,
db: &mongodb::Database,
updates: V,
) -> Result<BulkUpdateResult>
where
V: 'async_trait + Send + Sync + Borrow<Vec<U>>,
U: 'async_trait + Send + Sync + Borrow<BulkUpdate>,
{
let updates = updates.borrow();
let mut update_docs = Vec::with_capacity(updates.len());
for u in updates {
let u = u.borrow();
let mut doc = doc! {
"q": &u.query,
"u": &u.update,
"multi": false,
};
if let Some(options) = &u.options {
if let Some(ref upsert) = options.upsert {
doc.insert("upsert", upsert);
}
if let Some(ref collation) = options.collation {
doc.insert("collation", to_bson(collation)?);
}
if let Some(ref array_filters) = options.array_filters {
doc.insert("arrayFilters", array_filters);
}
if let Some(ref hint) = options.hint {
doc.insert("hint", to_bson(hint)?);
}
}
update_docs.push(doc);
}
let mut command = doc! {
"update": self.name(),
"updates": update_docs,
};
if let Some(ref write_concern) = self.write_concern() {
command.insert("writeConcern", to_bson(write_concern)?);
}
let res = db.run_command(command, None).await?;
Ok(from_document(res)?)
}
}