use std::collections::HashMap;
use std::error::Error;
use bson;
use bson::Document;
use bson::oid::ObjectId;
use mongodb;
use mongodb::error::Error::{
DecoderError,
DefaultError,
OIDError,
ResponseError,
};
use mongodb::coll::options::{
FindOneAndUpdateOptions,
FindOptions,
IndexModel,
IndexOptions,
ReturnDocument,
};
use mongodb::common::WriteConcern;
use mongodb::db::{
Database,
ThreadedDatabase,
};
use serde::{
Serialize,
Deserialize,
};
/// Key name of the default `_id` index.
///
/// `Model::sync` compares each existing index's concatenated key names against
/// this value and never schedules a matching index for removal.
pub const DEFAULT_INDEX: &str = "_id";
/// Builds an `IndexOptions` value from the handful of commonly used settings.
///
/// `name` and `background` are always populated (wrapped in `Some`), while
/// `unique`, `expire_after_seconds` and `sparse` are passed through exactly as
/// given. Every remaining option is left as `None`.
pub fn basic_index_options(name: &str, background: bool, unique: Option<bool>, expire_after_seconds: Option<i32>, sparse: Option<bool>) -> IndexOptions {
    let index_name = String::from(name);

    IndexOptions {
        // Options supplied by the caller.
        name: Some(index_name),
        background: Some(background),
        unique: unique,
        expire_after_seconds: expire_after_seconds,
        sparse: sparse,

        // Options this helper never sets.
        storage_engine: None,
        version: None,
        default_language: None,
        language_override: None,
        text_version: None,
        weights: None,
        sphere_version: None,
        bits: None,
        max: None,
        min: None,
        bucket_size: None,
    }
}
pub trait Model<'a> where Self: Serialize + Deserialize<'a> {
const COLLECTION_NAME: &'static str;
fn id(&self) -> Option<ObjectId>;
fn set_id(&mut self, ObjectId);
fn model_write_concern() -> WriteConcern {
return WriteConcern{
w: Self::write_concern_w(),
w_timeout: Self::write_concern_w_timeout(),
j: Self::write_concern_j(),
fsync: Self::write_concern_fsync(),
};
}
fn write_concern_w() -> i32 {
return 1;
}
fn write_concern_w_timeout() -> i32 {
return 0;
}
fn write_concern_j() -> bool {
return true;
}
fn write_concern_fsync() -> bool {
return false;
}
fn find(db: Database, filter: Option<Document>, options: Option<FindOptions>) -> mongodb::error::Result<Vec<Self>> {
let coll = db.collection(Self::COLLECTION_NAME);
let mut cursor = match coll.find(filter, options) {
Ok(cursor) => cursor,
Err(err) => return Err(err),
};
let bson_docs = match cursor.drain_current_batch() {
Ok(docs) => docs,
Err(err) => return Err(err),
};
let mut instances: Vec<Self> = vec![];
for doc in bson_docs {
let inst = Self::instance_from_document(doc)?;
instances.push(inst);
}
Ok(instances)
}
fn find_one(db: Database, filter: Option<Document>, options: Option<FindOptions>) -> mongodb::error::Result<Option<Self>> {
let coll = db.collection(Self::COLLECTION_NAME);
let doc_option = match coll.find_one(filter, options) {
Ok(doc_option) => doc_option,
Err(err) => return Err(err),
};
let doc = match doc_option {
Some(doc) => doc,
None => return Ok(None),
};
let instance = Self::instance_from_document(doc)?;
Ok(Some(instance))
}
fn save(&mut self, db: Database, filter: Option<Document>) -> mongodb::error::Result<()> {
let coll = db.collection(Self::COLLECTION_NAME);
let instance_doc = match bson::to_bson(&self)? {
bson::Bson::Document(doc) => doc,
_ => return Err(DefaultError("Failed to convert struct to a bson document.".to_string())),
};
let mut write_concern = Self::model_write_concern();
write_concern.j = true;
let mut id_needs_update = false;
let _filter = if let Some(id) = self.id() {
doc!{"_id" => id}
} else if filter == None {
let new_id = match ObjectId::new() {
Ok(new) => new,
Err(err) => return Err(OIDError(err)),
};
self.set_id(new_id.clone());
doc!{"_id" => new_id}
} else {
id_needs_update = true;
filter.unwrap()
};
let opts = FindOneAndUpdateOptions{upsert: Some(true), write_concern: Some(write_concern), return_document: Some(ReturnDocument::After), sort: None, projection: None, max_time_ms: None};
let updated_doc = match coll.find_one_and_replace(_filter, instance_doc, Some(opts))? {
Some(doc) => doc,
None => return Err(ResponseError("Server failed to return the updated document. Update may have failed.".to_owned())),
};
if id_needs_update {
let response_id = match updated_doc.get_object_id("_id") {
Ok(id) => id,
Err(_) => return Err(ResponseError("Server failed to return ObjectId of updated document.".to_owned())),
};
self.set_id(response_id.clone());
};
return Ok(());
}
fn instance_from_document(document: bson::Document) -> mongodb::error::Result<Self> {
match bson::from_bson::<Self>(bson::Bson::Document(document)) {
Ok(inst) => Ok(inst),
Err(err) => Err(DecoderError(err)),
}
}
fn indexes() -> Vec<IndexModel> {
return vec![];
}
fn sync(db: Database) {
let coll = db.collection(Self::COLLECTION_NAME);
println!("Synchronizing indexes for collection model: '{}'.", Self::COLLECTION_NAME);
let mut current_indexes_map: HashMap<String, Document> = HashMap::new();
let err_msg = format!("Error while fetching current indexes for '{}'.", Self::COLLECTION_NAME);
if let Ok(cursor) = coll.list_indexes() {
for doc_opt in cursor {
let doc = doc_opt.expect(&err_msg);
let idx_keys = doc.get_document("key").expect("Returned index appears to be malformed.");
let key = idx_keys.keys().fold("".to_owned(), |acc, bkey| acc + bkey);
current_indexes_map.insert(key, doc.clone());
}
}
let mut target_indexes_map: HashMap<String, IndexModel> = HashMap::new();
for model in Self::indexes().iter() {
let key = model.keys.keys().fold("".to_owned(), |acc, bkey| acc + bkey);
target_indexes_map.insert(key, model.to_owned());
}
let mut indexes_to_create = vec![];
for (key, index_model) in target_indexes_map.iter() {
if !current_indexes_map.contains_key(key) {
indexes_to_create.push(index_model)
}
}
let mut indexes_to_remove = vec![];
for (key, index_doc) in current_indexes_map {
if &key == DEFAULT_INDEX {
continue
}
if !target_indexes_map.contains_key(&key) {
indexes_to_remove.push(index_doc);
}
}
for model in indexes_to_create {
println!("Syncing index: {:?}", model); match coll.create_index_model(model.clone()) {
Ok(_) => println!("Index synced: {:?}", model), Err(err) => panic!("Failed to create index: {}", err.description()),
};
}
for doc in indexes_to_remove {
println!("Removing index: {:?}", doc); match coll.drop_index_string(doc.get_str("name").expect("Expected to find index name.").to_owned()) {
Ok(_) => println!("Index removed: {:?}", doc), Err(err) => panic!("Failed to remove index: {}", err.description()),
};
}
println!("Finished synchronizing indexes."); }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `basic_index_options` wraps the name and background flag in `Some`,
    /// passes the optional arguments through, and leaves the rest unset.
    #[test]
    fn basic_index_options_returns_expected_output() {
        let opts = basic_index_options("testing", true, None, None, None);

        // Values that are always populated.
        assert!(opts.name == Some(String::from("testing")));
        assert!(opts.background == Some(true));

        // Optional arguments, all passed as `None` here.
        assert!(opts.unique.is_none());
        assert!(opts.expire_after_seconds.is_none());
        assert!(opts.sparse.is_none());

        // Options the helper never sets.
        assert!(opts.storage_engine.is_none());
        assert!(opts.version.is_none());
        assert!(opts.default_language.is_none());
        assert!(opts.language_override.is_none());
        assert!(opts.text_version.is_none());
        assert!(opts.weights.is_none());
        assert!(opts.sphere_version.is_none());
        assert!(opts.bits.is_none());
        assert!(opts.max.is_none());
        assert!(opts.min.is_none());
        assert!(opts.bucket_size.is_none());
    }
}