use std::collections::HashMap;
use async_trait::async_trait;
use mongodb::bson::oid::ObjectId;
use mongodb::bson::{doc, from_bson, to_bson};
use mongodb::bson::{Bson, Document};
use mongodb::options;
use mongodb::results::DeleteResult;
use mongodb::{Collection, Database};
use serde::{de::DeserializeOwned, Serialize};
use crate::common::IndexModel;
use crate::cursor::ModelCursor;
use crate::error::{Result, WitherError};
/// Name MongoDB automatically assigns to the mandatory `_id` index; it is
/// filtered out of index synchronization since it can never be dropped.
const MONGO_ID_INDEX_NAME: &str = "_id_";
/// Index option fields reported by `listIndexes` which are server-managed and
/// must be ignored when diffing current indexes against model-declared ones.
const MONGO_DIFF_INDEX_BLACKLIST: [&str; 3] = ["v", "ns", "key"];
#[cfg_attr(feature = "docinclude", doc(include = "../docs/model-derive.md"))]
#[cfg_attr(feature = "docinclude", doc(include = "../docs/model-sync.md"))]
#[cfg_attr(feature = "docinclude", doc(include = "../docs/logging.md"))]
#[cfg_attr(feature = "docinclude", doc(include = "../docs/underlying-driver.md"))]
/// The core data modeling trait: binds a serde-(de)serializable type to a MongoDB
/// collection and layers typed CRUD helpers and index synchronization on top of
/// the underlying driver.
#[async_trait]
pub trait Model
where
    Self: Serialize + DeserializeOwned,
{
    /// The name of the collection where this model's data is stored.
    const COLLECTION_NAME: &'static str;

    /// Get the ID for this model instance, if one has been assigned.
    fn id(&self) -> Option<ObjectId>;

    /// Set the ID for this model instance.
    fn set_id(&mut self, id: ObjectId);

    /// The model's read concern; `None` defers to the driver/server default.
    fn read_concern() -> Option<options::ReadConcern> {
        None
    }

    /// The model's write concern; `None` defers to the driver/server default.
    fn write_concern() -> Option<options::WriteConcern> {
        None
    }

    /// The model's server selection criteria; `None` defers to the driver default.
    fn selection_criteria() -> Option<options::SelectionCriteria> {
        None
    }

    /// Get a handle to this model's collection, configured with the model's
    /// read concern, write concern and selection criteria.
    fn collection(db: &Database) -> Collection {
        db.collection_with_options(
            Self::COLLECTION_NAME,
            options::CollectionOptions::builder()
                .selection_criteria(Self::selection_criteria())
                .read_concern(Self::read_concern())
                .write_concern(Self::write_concern())
                .build(),
        )
    }

    /// Find all instances of this model matching the given filter, returning a
    /// cursor which deserializes each document into `Self`.
    async fn find<F, O>(db: &Database, filter: F, options: O) -> Result<ModelCursor<Self>>
    where
        F: Into<Option<Document>> + Send,
        O: Into<Option<options::FindOptions>> + Send,
    {
        Ok(Self::collection(db).find(filter, options).await.map(ModelCursor::new)?)
    }

    /// Find a single instance of this model matching the given filter.
    async fn find_one<F, O>(db: &Database, filter: F, options: O) -> Result<Option<Self>>
    where
        F: Into<Option<Document>> + Send,
        O: Into<Option<options::FindOneOptions>> + Send,
    {
        Ok(Self::collection(db)
            .find_one(filter, options)
            .await?
            .map(Self::instance_from_document)
            .transpose()?)
    }

    /// Atomically find a matching document, delete it, and return it
    /// deserialized as `Self` (or `None` if no document matched).
    async fn find_one_and_delete<O>(db: &Database, filter: Document, options: O) -> Result<Option<Self>>
    where
        O: Into<Option<options::FindOneAndDeleteOptions>> + Send,
    {
        Ok(Self::collection(db)
            .find_one_and_delete(filter, options)
            .await?
            .map(Self::instance_from_document)
            .transpose()?)
    }

    /// Atomically find a matching document and replace it with `replacement`,
    /// returning the document selected by the driver options deserialized as `Self`.
    async fn find_one_and_replace<O>(db: &Database, filter: Document, replacement: Document, options: O) -> Result<Option<Self>>
    where
        O: Into<Option<options::FindOneAndReplaceOptions>> + Send,
    {
        Ok(Self::collection(db)
            .find_one_and_replace(filter, replacement, options)
            .await?
            .map(Self::instance_from_document)
            .transpose()?)
    }

    /// Atomically find a matching document and apply `update` to it, returning
    /// the document selected by the driver options deserialized as `Self`.
    async fn find_one_and_update<U, O>(db: &Database, filter: Document, update: U, options: O) -> Result<Option<Self>>
    where
        U: Into<options::UpdateModifications> + Send,
        O: Into<Option<options::FindOneAndUpdateOptions>> + Send,
    {
        Ok(Self::collection(db)
            .find_one_and_update(filter, update, options)
            .await?
            .map(Self::instance_from_document)
            .transpose()?)
    }

    /// Save the current model instance.
    ///
    /// Performs an upserting `findOneAndReplace` with journaling forced on, so
    /// a missing document in the server response is treated as an error. When
    /// the instance has no ID and no filter is given, a fresh `ObjectId` is
    /// generated client-side; when a custom filter is given, the ID from the
    /// server's response document is written back onto the instance.
    async fn save(&mut self, db: &Database, filter: Option<Document>) -> Result<()> {
        let coll = Self::collection(db);
        let instance_doc = Self::document_from_instance(&self)?;
        // Force journaling so the post-replace document is reliably returned.
        let mut write_concern = Self::write_concern().unwrap_or_default();
        write_concern.journal = Some(true);
        let mut id_needs_update = false;
        let filter = match (self.id(), filter) {
            // Instance already has an ID: target that document directly.
            (Some(id), _) => doc! {"_id": id},
            // No ID and no filter: generate a new ObjectId up front.
            (None, None) => {
                let new_id = ObjectId::new();
                self.set_id(new_id.clone());
                doc! {"_id": new_id}
            }
            // No ID but a caller-supplied filter: the server determines the
            // target document, so capture its ID after the upsert.
            (None, Some(filter)) => {
                id_needs_update = true;
                filter
            }
        };
        let opts = options::FindOneAndReplaceOptions::builder()
            .upsert(Some(true))
            .write_concern(Some(write_concern))
            .return_document(Some(options::ReturnDocument::After))
            .build();
        let updated_doc = coll
            .find_one_and_replace(filter, instance_doc, Some(opts))
            .await?
            .ok_or(WitherError::ServerFailedToReturnUpdatedDoc)?;
        // Propagate the server-assigned ID back onto the instance when needed.
        if id_needs_update {
            let response_id = updated_doc.get_object_id("_id").map_err(|_| WitherError::ServerFailedToReturnObjectId)?;
            self.set_id(response_id.clone());
        };
        Ok(())
    }

    /// Update this model in the database via `findOneAndUpdate`, consuming
    /// `self` and returning the instance produced from the server's response.
    ///
    /// The instance's ID is required and is always merged into the filter
    /// (overwriting any `_id` the caller put there); journaling is forced on
    /// the write concern so the updated document is reliably returned.
    async fn update(self, db: &Database, filter: Option<Document>, update: Document, opts: Option<options::FindOneAndUpdateOptions>) -> Result<Self> {
        let id = self.id().ok_or(WitherError::ModelIdRequiredForOperation)?;
        let filter = match filter {
            Some(mut doc) => {
                doc.insert("_id", id);
                doc
            }
            None => doc! {"_id": id},
        };
        // Ensure `journal: true` on whatever write concern ends up being used,
        // creating options / write concern objects as needed.
        let options = match opts {
            Some(mut options) => {
                options.write_concern = match options.write_concern {
                    Some(mut wc) => {
                        wc.journal = Some(true);
                        Some(wc)
                    }
                    None => {
                        let mut wc = Self::write_concern().unwrap_or_default();
                        wc.journal = Some(true);
                        Some(wc)
                    }
                };
                options
            }
            None => {
                let mut options = options::FindOneAndUpdateOptions::default();
                let mut wc = Self::write_concern().unwrap_or_default();
                wc.journal = Some(true);
                options.write_concern = Some(wc);
                options
            }
        };
        Ok(Self::collection(db)
            .find_one_and_update(filter, update, Some(options))
            .await?
            .map(Self::instance_from_document)
            .transpose()?
            .ok_or(WitherError::ServerFailedToReturnUpdatedDoc)?)
    }

    /// Delete this model instance by its ID; errors if no ID is set.
    async fn delete(&self, db: &Database) -> Result<DeleteResult> {
        let id = self.id().ok_or(WitherError::ModelIdRequiredForOperation)?;
        Ok(Self::collection(db).delete_one(doc! {"_id": id}, None).await?)
    }

    /// Delete all documents matching the given filter.
    async fn delete_many<O>(db: &Database, filter: Document, options: O) -> Result<DeleteResult>
    where
        O: Into<Option<options::DeleteOptions>> + Send,
    {
        Ok(Self::collection(db).delete_many(filter, options).await?)
    }

    /// Deserialize a BSON document into an instance of this model.
    fn instance_from_document(document: Document) -> Result<Self> {
        Ok(from_bson::<Self>(Bson::Document(document))?)
    }

    /// Serialize this model instance into a BSON document; errors if the
    /// instance serializes to a non-document BSON value.
    fn document_from_instance(&self) -> Result<Document> {
        match to_bson(&self)? {
            Bson::Document(doc) => Ok(doc),
            bsn => Err(WitherError::ModelSerToDocument(bsn.element_type())),
        }
    }

    /// The indexes this model declares for its collection; defaults to none.
    fn indexes() -> Vec<IndexModel> {
        vec![]
    }

    /// Synchronize this model's declared indexes with the database: fetch the
    /// collection's current indexes and diff/apply via `sync_model_indexes`.
    async fn sync(db: &Database) -> Result<()> {
        let coll = Self::collection(db);
        let current_indexes = get_current_indexes(&db, &coll).await?;
        sync_model_indexes(db, &coll, Self::indexes(), current_indexes).await?;
        Ok(())
    }

    /// Fetch the collection's current indexes, keyed by generated index name.
    /// Thin wrapper over the module-level `get_current_indexes` helper.
    async fn get_current_indexes(db: &Database) -> Result<HashMap<String, IndexModel>> {
        let coll = Self::collection(db);
        get_current_indexes(db, &coll).await
    }
}
async fn get_current_indexes(db: &Database, coll: &Collection) -> Result<HashMap<String, IndexModel>> {
let list_indexes = match db.run_command(doc! {"listIndexes": coll.name()}, None).await {
Ok(list_indexes) => list_indexes,
Err(err) => match err.kind.as_ref() {
mongodb::error::ErrorKind::CommandError(err) if err.code == 26 => doc! {},
_ => return Err(err.into()),
},
};
Ok(build_index_map(list_indexes))
}
/// Generate an index name from its key document, mirroring MongoDB's default
/// naming scheme of joining `<field>_<value>` pairs with underscores,
/// e.g. `{"a": 1, "b": -1}` -> `"a_1_b_-1"`.
///
/// Bug fixed: the previous implementation only handled `i32` key values and
/// collapsed everything else to `0`. Key values may also appear as `i64`
/// (numeric widening on round-trips through the server) or as strings for
/// special index types (e.g. `"text"`, `"2dsphere"`, `"hashed"`); those now
/// render faithfully instead of all colliding on `_0`. Names generated for
/// plain `i32` directions are unchanged, so existing indexes still match.
fn generate_index_name_from_keys(keys: &Document) -> String {
    let parts: Vec<String> = keys
        .iter()
        .map(|(key, value)| {
            let rendered = value
                .as_i32()
                .map(|v| v.to_string())
                .or_else(|| value.as_i64().map(|v| v.to_string()))
                .or_else(|| value.as_str().map(String::from))
                // Unknown value shapes keep the legacy `0` placeholder for
                // backwards compatibility with previously generated names.
                .unwrap_or_else(|| String::from("0"));
            format!("{}_{}", key, rendered)
        })
        .collect();
    parts.join("_")
}
fn build_index_map(list_index: Document) -> HashMap<String, IndexModel> {
let cursor = match list_index.get("cursor") {
Some(cursor) => cursor,
None => return Default::default(),
};
let doc = match cursor.as_document() {
Some(doc) => doc,
None => return Default::default(),
};
let first_batch = match doc.get_array("firstBatch").ok() {
Some(first_batch) => first_batch,
None => return Default::default(),
};
let index_map = first_batch
.iter()
.filter_map(|bson| bson.as_document().cloned())
.filter(|doc| {
match doc.get_str("name").ok() {
Some(name) if name == MONGO_ID_INDEX_NAME => false, Some(_) => true, None => false, }
})
.fold(HashMap::new(), |mut acc, doc| {
let idx_keys = match doc.get_document("key").ok() {
Some(idx_keys) => idx_keys,
None => return acc,
};
let index_name = generate_index_name_from_keys(idx_keys);
let mut options = Document::new();
doc.iter().for_each(|(b_key, b_value)| {
if !MONGO_DIFF_INDEX_BLACKLIST.contains(&b_key.as_str()) {
options.insert(b_key.to_string(), b_value);
}
});
let model = IndexModel::new(idx_keys.clone(), Some(options));
acc.insert(index_name, model);
acc
});
index_map
}
/// Diff the model's declared indexes against the collection's current indexes,
/// then drop removed/changed indexes and create missing ones via raw
/// `dropIndexes` / `createIndexes` commands.
async fn sync_model_indexes<'a>(
    db: &'a Database, coll: &'a Collection, model_indexes: Vec<IndexModel>, current_indexes_map: HashMap<String, IndexModel>,
) -> Result<()> {
    log::info!("Synchronizing indexes for '{}'.", coll.namespace());
    // Build the map of indexes the model wants, keyed by generated name.
    // Ensure every aspired index carries an explicit "name" option so the
    // options diff against server-reported indexes is apples-to-apples.
    let aspired_indexes_map = model_indexes.iter().fold(HashMap::new(), |mut acc, model| {
        let mut target_model = model.clone();
        let key = generate_index_name_from_keys(&model.keys);
        match &mut target_model.options {
            Some(options) => {
                if options.get_str("name").ok().is_none() {
                    options.insert("name", key.clone());
                }
            }
            None => {
                let options = doc! { "name": key.clone() };
                target_model.options = Some(options);
            }
        }
        acc.insert(key, target_model);
        acc
    });
    // Indexes present on the server but no longer declared on the model.
    let mut indexes_to_drop = current_indexes_map.iter().fold(vec![], |mut acc, (key, _)| {
        if !aspired_indexes_map.contains_key(key) {
            acc.push(key);
        }
        acc
    });
    // Indexes which are new, or whose options differ (handled as drop + recreate).
    let mut indexes_to_create: HashMap<String, IndexModel> = HashMap::new();
    for (aspired_index_name, aspired_index) in aspired_indexes_map.iter() {
        let current_index = match current_indexes_map.get(aspired_index_name) {
            Some(current_index) => current_index,
            None => {
                indexes_to_create.insert(aspired_index_name.clone(), aspired_index.clone());
                continue;
            }
        };
        if aspired_index.options != current_index.options {
            indexes_to_drop.push(aspired_index_name);
            indexes_to_create.insert(aspired_index_name.clone(), aspired_index.clone());
        }
    }
    // NOTE(review): drops are issued using the name generated from the key
    // document; if a server-side index was created with a custom "name" option
    // this may not match the server's actual index name — confirm against usage.
    for index_name in indexes_to_drop {
        let drop_command = doc! {
            "dropIndexes": coll.name(),
            "index": index_name,
        };
        db.run_command(drop_command, None).await?;
    }
    // Flatten pending indexes into the document form `createIndexes` expects:
    // `{key: <key doc>, <option fields...>}`.
    let indexes_to_create = indexes_to_create.into_iter().fold(vec![], |mut acc, (_, index_model)| {
        let mut index_doc = Document::new();
        index_doc.insert("key", index_model.keys);
        if let Some(options) = index_model.options {
            index_doc.extend(options);
        }
        acc.push(index_doc);
        acc
    });
    if !indexes_to_create.is_empty() {
        db.run_command(
            doc! {
                "createIndexes": coll.name(),
                "indexes": indexes_to_create,
            },
            None,
        )
        .await?;
    }
    log::info!("Synchronized indexes for '{}'.", coll.namespace());
    Ok(())
}