use std::collections::HashMap;
use std::error::Error;
use mongodb::{
Bson, Document,
coll::{
Collection,
options::{
CountOptions,
FindOneAndDeleteOptions,
FindOneAndUpdateOptions,
FindOptions,
IndexModel,
IndexOptions,
ReturnDocument,
},
},
common::WriteConcern,
db::{Database, ThreadedDatabase},
error::{
Error::{ArgumentError, DecoderError, DefaultError, OIDError, ResponseError},
Result,
},
oid::ObjectId,
to_bson, from_bson,
};
use serde::{Serialize, Deserialize};
pub const DEFAULT_INDEX: &str = "_id";
pub fn basic_index_options(name: &str, background: bool, unique: Option<bool>, expire_after_seconds: Option<i32>, sparse: Option<bool>) -> IndexOptions {
return IndexOptions{
name: Some(name.to_owned()),
background: Some(background),
unique,
expire_after_seconds,
sparse,
storage_engine: None,
version: None,
default_language: None,
language_override: None,
text_version: None,
weights: None,
sphere_version: None,
bits: None,
max: None,
min: None,
bucket_size: None,
};
}
#[cfg_attr(feature="docinclude", doc(include="../docs/model-derive.md"))]
#[cfg_attr(feature="docinclude", doc(include="../docs/model-sync.md"))]
#[cfg_attr(feature="docinclude", doc(include="../docs/logging.md"))]
#[cfg_attr(feature="docinclude", doc(include="../docs/manually-implementing-model.md"))]
#[cfg_attr(feature="docinclude", doc(include="../docs/underlying-driver.md"))]
pub trait Model<'a> where Self: Serialize + Deserialize<'a> {
const COLLECTION_NAME: &'static str;
fn id(&self) -> Option<ObjectId>;
fn set_id(&mut self, ObjectId);
fn model_write_concern() -> WriteConcern {
WriteConcern{
w: Self::write_concern_w(),
w_timeout: Self::write_concern_w_timeout(),
j: Self::write_concern_j(),
fsync: Self::write_concern_fsync(),
}
}
fn write_concern_w() -> i32 {
1
}
fn write_concern_w_timeout() -> i32 {
0
}
fn write_concern_j() -> bool {
true
}
fn write_concern_fsync() -> bool {
false
}
fn count(db: Database, filter: Option<Document>, options: Option<CountOptions>) -> Result<i64> {
let coll = db.collection(Self::COLLECTION_NAME);
coll.count(filter, options)
}
fn find(db: Database, filter: Option<Document>, options: Option<FindOptions>) -> Result<Vec<Self>> {
let coll = db.collection(Self::COLLECTION_NAME);
let mut cursor = match coll.find(filter, options) {
Ok(cursor) => cursor,
Err(err) => return Err(err),
};
let bson_docs = match cursor.drain_current_batch() {
Ok(docs) => docs,
Err(err) => return Err(err),
};
let mut instances: Vec<Self> = vec![];
for doc in bson_docs {
let inst = Self::instance_from_document(doc)?;
instances.push(inst);
}
Ok(instances)
}
fn delete_many(db: Database, filter: Document) -> Result<()> {
let coll = db.collection(Self::COLLECTION_NAME);
coll.delete_many(filter, Some(Self::model_write_concern()))?;
Ok(())
}
fn find_one(db: Database, filter: Option<Document>, options: Option<FindOptions>) -> Result<Option<Self>> {
let coll = db.collection(Self::COLLECTION_NAME);
let doc_option = match coll.find_one(filter, options) {
Ok(doc_option) => doc_option,
Err(err) => return Err(err),
};
let doc = match doc_option {
Some(doc) => doc,
None => return Ok(None),
};
let instance = Self::instance_from_document(doc)?;
Ok(Some(instance))
}
fn find_one_and_delete(db: Database, filter: Document, options: Option<FindOneAndDeleteOptions>) -> Result<Option<Self>> {
db.collection(Self::COLLECTION_NAME).find_one_and_delete(filter, options)
.and_then(|docopt| match docopt {
Some(doc) => match Self::instance_from_document(doc) {
Ok(model) => Ok(Some(model)),
Err(err) => Err(err),
}
None => Ok(None),
})
}
fn find_one_and_replace(db: Database, filter: Document, replacement: Document, options: Option<FindOneAndUpdateOptions>) -> Result<Option<Self>> {
db.collection(Self::COLLECTION_NAME).find_one_and_replace(filter, replacement, options)
.and_then(|docopt| match docopt {
Some(doc) => match Self::instance_from_document(doc) {
Ok(model) => Ok(Some(model)),
Err(err) => Err(err),
}
None => Ok(None),
})
}
fn find_one_and_update(db: Database, filter: Document, update: Document, options: Option<FindOneAndUpdateOptions>) -> Result<Option<Self>> {
db.collection(Self::COLLECTION_NAME).find_one_and_update(filter, update, options)
.and_then(|docopt| match docopt {
Some(doc) => match Self::instance_from_document(doc) {
Ok(model) => Ok(Some(model)),
Err(err) => Err(err),
}
None => Ok(None),
})
}
fn delete(&self, db: Database) -> Result<()> {
let id = self.id().ok_or(DefaultError("This instance has no ID. Can not be deleted.".to_string()))?;
let coll = db.collection(Self::COLLECTION_NAME);
coll.delete_one(doc!{"_id": id}, Some(Self::model_write_concern()))?;
Ok(())
}
fn save(&mut self, db: Database, filter: Option<Document>) -> Result<()> {
let coll = db.collection(Self::COLLECTION_NAME);
let instance_doc = match to_bson(&self)? {
Bson::Document(doc) => doc,
_ => return Err(DefaultError("Failed to convert struct to a bson document.".to_string())),
};
let mut write_concern = Self::model_write_concern();
write_concern.j = true;
let mut id_needs_update = false;
let _filter = if let Some(id) = self.id() {
doc!{"_id": id}
} else if filter == None {
let new_id = match ObjectId::new() {
Ok(new) => new,
Err(err) => return Err(OIDError(err)),
};
self.set_id(new_id.clone());
doc!{"_id": new_id}
} else {
id_needs_update = true;
filter.unwrap()
};
let opts = FindOneAndUpdateOptions{upsert: Some(true), write_concern: Some(write_concern), return_document: Some(ReturnDocument::After), sort: None, projection: None, max_time_ms: None};
let updated_doc = match coll.find_one_and_replace(_filter, instance_doc, Some(opts))? {
Some(doc) => doc,
None => return Err(ResponseError("Server failed to return the updated document. Update may have failed.".to_owned())),
};
if id_needs_update {
let response_id = match updated_doc.get_object_id("_id") {
Ok(id) => id,
Err(_) => return Err(ResponseError("Server failed to return ObjectId of updated document.".to_owned())),
};
self.set_id(response_id.clone());
};
return Ok(());
}
fn update(self, db: Database, filter: Option<Document>, update: Document, opts: Option<FindOneAndUpdateOptions>) -> Result<Option<Self>> {
let coll = db.collection(Self::COLLECTION_NAME);
let id = match self.id() {
Some(id) => id,
None => {
return Err(ArgumentError("Model must have an ObjectId for this operation.".to_owned()));
}
};
let filter = match filter {
Some(mut doc) => {
doc.insert("_id", id);
doc
}
None => doc!{"_id": id},
};
let options = match opts {
Some(mut options) => {
options.write_concern = match options.write_concern {
Some(mut wc) => {
wc.j = true;
Some(wc)
},
None => {
let mut wc = Self::model_write_concern();
wc.j = true;
Some(wc)
}
};
options
},
None => {
let mut options = FindOneAndUpdateOptions::default();
let mut wc = Self::model_write_concern();
wc.j = true;
options.write_concern = Some(wc);
options
}
};
coll.find_one_and_update(filter, update, Some(options)).and_then(|docopt| match docopt {
Some(doc) => match Self::instance_from_document(doc) {
Ok(model) => Ok(Some(model)),
Err(err) => Err(err),
}
None => Ok(None),
})
}
fn instance_from_document(document: Document) -> Result<Self> {
match from_bson::<Self>(Bson::Document(document)) {
Ok(inst) => Ok(inst),
Err(err) => Err(DecoderError(err)),
}
}
fn indexes() -> Vec<IndexModel> {
vec![]
}
fn sync(db: Database) -> Result<()> {
let coll = db.collection(Self::COLLECTION_NAME);
sync_model_indexes(&coll, Self::indexes())?;
Ok(())
}
}
fn sync_model_indexes<'a>(coll: &'a Collection, indexes: Vec<IndexModel>) -> Result<()> {
info!("Synchronizing indexes for '{}'.", coll.namespace);
let _ = coll.db.create_collection(coll.name().as_str(), None); let mut current_indexes_map: HashMap<String, Document> = HashMap::new();
let indices = coll.list_indexes()
.map_err(|err| DefaultError(format!("Error while fetching current indexes for '{}': {:?}", coll.namespace, err.description())))?
.filter_map(|doc_res| doc_res.ok());
for doc in indices {
let idx_keys = doc.get_document("key")
.map_err(|err| DefaultError(format!("Error extracting 'key' of index document: {:?}", err.description())))?;
let key = idx_keys.keys().fold(String::from(""), |acc, bkey| acc + bkey);
current_indexes_map.insert(key, doc.clone());
}
let mut target_indexes_map: HashMap<String, IndexModel> = HashMap::new();
for model in indexes.iter() {
let key = model.keys.keys().fold(String::from(""), |acc, bkey| acc + bkey);
target_indexes_map.insert(key, model.to_owned());
}
let mut indexes_to_create = vec![];
for (key, index_model) in target_indexes_map.iter() {
if !current_indexes_map.contains_key(key) {
indexes_to_create.push(index_model)
}
}
let mut indexes_to_remove = vec![];
for (key, index_doc) in current_indexes_map {
if &key == DEFAULT_INDEX {
continue
}
if !target_indexes_map.contains_key(&key) {
indexes_to_remove.push(index_doc);
}
}
for model in indexes_to_create {
coll.create_index_model(model.clone())
.map_err(|err| DefaultError(format!("Failed to create index: {}", err.description())))?;
}
for doc in indexes_to_remove {
let index_name = String::from(
doc.get_str("name").map_err(|err| DefaultError(format!("Failed to get index name: {:?}", err.description())))?
);
coll.drop_index_string(index_name)
.map_err(|err| DefaultError(format!("Failed to remove index: {}", err.description())))?;
}
info!("Finished synchronizing indexes for '{}'.", coll.namespace);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_index_options_returns_expected_output() {
let output = basic_index_options("testing", true, None, None, None);
assert!(output.name == Some("testing".to_string()));
assert!(output.background == Some(true));
assert!(output.unique == None);
assert!(output.expire_after_seconds == None);
assert!(output.sparse == None);
assert!(output.storage_engine == None);
assert!(output.version == None);
assert!(output.default_language == None);
assert!(output.language_override == None);
assert!(output.text_version == None);
assert!(output.weights == None);
assert!(output.sphere_version == None);
assert!(output.bits == None);
assert!(output.max == None);
assert!(output.min == None);
assert!(output.bucket_size == None);
}
}