mod batch;
pub mod error;
pub mod options;
pub mod results;
use bson::{self, Bson, oid};
use command_type::CommandType;
use self::batch::{Batch, DeleteModel, UpdateModel};
use self::error::{BulkWriteException, WriteException};
use self::options::*;
use self::results::*;
use ThreadedClient;
use common::{ReadPreference, WriteConcern};
use cursor::Cursor;
use db::{Database, ThreadedDatabase};
use Result;
use Error::{ArgumentError, ResponseError,OperationError, BulkWriteError};
use wire_protocol::flags::OpQueryFlags;
use std::collections::{BTreeMap, VecDeque};
use std::iter::FromIterator;
pub struct Collection {
pub db: Database,
pub namespace: String,
read_preference: ReadPreference,
write_concern: WriteConcern,
}
impl Collection {
pub fn new(db: Database, name: &str, create: bool,
read_preference: Option<ReadPreference>,
write_concern: Option<WriteConcern>) -> Collection {
let rp = read_preference.unwrap_or(db.read_preference.to_owned());
let wc = write_concern.unwrap_or(db.write_concern.to_owned());
if create {
let _ = db.create_collection(name, None);
}
Collection {
db: db.clone(),
namespace: format!("{}.{}", db.name, name),
read_preference: rp,
write_concern: wc,
}
}
pub fn get_req_id(&self) -> i32 {
self.db.client.get_req_id()
}
pub fn name(&self) -> String {
match self.namespace.find(".") {
Some(idx) => self.namespace[self.namespace.char_indices()
.nth(idx+1).unwrap().0..].to_owned(),
None => {
let msg = format!("Invalid namespace specified: '{}'.", self.namespace);
panic!(msg);
}
}
}
pub fn drop(&self) -> Result<()> {
self.db.drop_collection(&self.name()[..])
}
pub fn aggregate(&self, pipeline: Vec<bson::Document>,
options: Option<AggregateOptions>) -> Result<Cursor> {
let opts = options.unwrap_or(AggregateOptions::new());
let pipeline_map = pipeline.iter().map(|bdoc| {
Bson::Document(bdoc.to_owned())
}).collect();
let mut spec = bson::Document::new();
let mut cursor = bson::Document::new();
cursor.insert("batchSize", Bson::I32(opts.batch_size));
spec.insert("aggregate", Bson::String(self.name()));
spec.insert("pipeline", Bson::Array(pipeline_map));
spec.insert("cursor", Bson::Document(cursor));
if opts.allow_disk_use {
spec.insert("allowDiskUse", Bson::Boolean(opts.allow_disk_use));
}
let read_pref = opts.read_preference.unwrap_or(self.read_preference.to_owned());
self.db.command_cursor(spec, CommandType::Aggregate, read_pref)
}
pub fn count(&self, filter: Option<bson::Document>,
options: Option<CountOptions>) -> Result<i64> {
let opts = options.unwrap_or(CountOptions::new());
let mut spec = bson::Document::new();
spec.insert("count", Bson::String(self.name()));
spec.insert("skip", Bson::I64(opts.skip as i64));
spec.insert("limit", Bson::I64(opts.limit));
if filter.is_some() {
spec.insert("query", Bson::Document(filter.unwrap()));
}
if opts.hint_doc.is_some() {
spec.insert("hint", Bson::Document(opts.hint_doc.unwrap()));
} else if opts.hint.is_some() {
spec.insert("hint", Bson::String(opts.hint.unwrap()));
}
let read_pref = opts.read_preference.unwrap_or(self.read_preference.to_owned());
let result = try!(self.db.command(spec, CommandType::Count, Some(read_pref)));
match result.get("n") {
Some(&Bson::I32(ref n)) => Ok(*n as i64),
Some(&Bson::I64(ref n)) => Ok(*n),
_ => Err(ResponseError("No count received from server.".to_owned())),
}
}
pub fn distinct(&self, field_name: &str, filter: Option<bson::Document>,
options: Option<DistinctOptions>) -> Result<Vec<Bson>> {
let opts = options.unwrap_or(DistinctOptions::new());
let mut spec = bson::Document::new();
spec.insert("distinct", Bson::String(self.name()));
spec.insert("key", Bson::String(field_name.to_owned()));
if filter.is_some() {
spec.insert("query", Bson::Document(filter.unwrap()));
}
let read_pref = opts.read_preference.unwrap_or(self.read_preference.to_owned());
let result = try!(self.db.command(spec, CommandType::Distinct, Some(read_pref)));
match result.get("values") {
Some(&Bson::Array(ref vals)) => Ok(vals.to_owned()),
_ => Err(ResponseError("No values received from server.".to_owned()))
}
}
pub fn find(&self, filter: Option<bson::Document>,
options: Option<FindOptions>) -> Result<Cursor> {
self.find_with_command_type(filter, options, CommandType::Find)
}
fn find_with_command_type(&self, filter: Option<bson::Document>,
options: Option<FindOptions>,
cmd_type: CommandType) -> Result<Cursor> {
let options = options.unwrap_or(FindOptions::new());
let flags = OpQueryFlags::with_find_options(&options);
let doc = if options.sort.is_some() {
let mut doc = bson::Document::new();
doc.insert("$query".to_owned(),
Bson::Document(filter.unwrap_or(bson::Document::new())));
doc.insert("$orderby".to_owned(),
Bson::Document(options.sort.as_ref().unwrap().clone()));
doc
} else {
filter.unwrap_or(bson::Document::new())
};
let read_pref = options.read_preference.unwrap_or(self.read_preference.to_owned());
Cursor::query(self.db.client.clone(), self.namespace.to_owned(), options.batch_size,
flags, options.skip as i32, options.limit, doc,
options.projection.clone(), cmd_type, false, read_pref)
}
pub fn find_one(&self, filter: Option<bson::Document>,
options: Option<FindOptions>) -> Result<Option<bson::Document>> {
self.find_one_with_command_type(filter, options, CommandType::Find)
}
pub fn find_one_with_command_type(&self, filter: Option<bson::Document>,
options: Option<FindOptions>,
cmd_type: CommandType) -> Result<Option<bson::Document>> {
let options = options.unwrap_or(FindOptions::new());
let mut cursor = try!(self.find_with_command_type(filter, Some(options.with_limit(1)),
cmd_type));
match cursor.next() {
Some(Ok(bson)) => Ok(Some(bson)),
Some(Err(err)) => Err(err),
None => Ok(None)
}
}
fn find_and_modify(&self, cmd: &mut bson::Document,
filter: bson::Document, _max_time_ms: Option<i64>,
projection: Option<bson::Document>,
sort: Option<bson::Document>,
write_concern: Option<WriteConcern>, cmd_type: CommandType)
-> Result<Option<bson::Document>> {
let wc = write_concern.unwrap_or(self.write_concern.clone());
let mut new_cmd = bson::Document::new();
new_cmd.insert("findAndModify", Bson::String(self.name()));
new_cmd.insert("query", Bson::Document(filter));
new_cmd.insert("writeConcern", Bson::Document(wc.to_bson()));
if sort.is_some() {
new_cmd.insert("sort", Bson::Document(sort.unwrap()));
}
if projection.is_some() {
new_cmd.insert("fields", Bson::Document(projection.unwrap()));
}
for (key, val) in cmd.iter() {
new_cmd.insert(key.to_owned(), val.to_owned());
}
let res = try!(self.db.command(new_cmd, cmd_type, None));
try!(WriteException::validate_write_result(res.clone(), wc));
let doc = match res.get("value") {
Some(&Bson::Document(ref nested_doc)) => Some(nested_doc.to_owned()),
_ => None,
};
Ok(doc)
}
fn find_one_and_replace_or_update(&self, filter: bson::Document, update: bson::Document,
after: bool, max_time_ms: Option<i64>,
projection: Option<bson::Document>,
sort: Option<bson::Document>, upsert: bool, write_concern:
Option<WriteConcern>, cmd_type: CommandType) -> Result<Option<bson::Document>> {
let mut cmd = bson::Document::new();
cmd.insert("update", Bson::Document(update));
if after {
cmd.insert("new", Bson::Boolean(true));
}
if upsert {
cmd.insert("upsert", Bson::Boolean(true));
}
self.find_and_modify(&mut cmd, filter, max_time_ms, projection, sort, write_concern,
cmd_type)
}
pub fn find_one_and_delete(&self, filter: bson::Document,
options: Option<FindOneAndDeleteOptions>) -> Result<Option<bson::Document>> {
let opts = options.unwrap_or(FindOneAndDeleteOptions::new());
let mut cmd = bson::Document::new();
cmd.insert("remove", Bson::Boolean(true));
self.find_and_modify(&mut cmd, filter, opts.max_time_ms,
opts.projection, opts.sort, opts.write_concern,
CommandType::FindOneAndDelete)
}
pub fn find_one_and_replace(&self, filter: bson::Document, replacement: bson::Document,
options: Option<FindOneAndUpdateOptions>) -> Result<Option<bson::Document>> {
let opts = options.unwrap_or(FindOneAndUpdateOptions::new());
try!(Collection::validate_replace(&replacement));
self.find_one_and_replace_or_update(filter, replacement, opts.return_document.to_bool(),
opts.max_time_ms, opts.projection, opts.sort,
opts.upsert, opts.write_concern,
CommandType::FindOneAndReplace)
}
pub fn find_one_and_update(&self, filter: bson::Document, update: bson::Document,
options: Option<FindOneAndUpdateOptions>) -> Result<Option<bson::Document>> {
let opts = options.unwrap_or(FindOneAndUpdateOptions::new());
try!(Collection::validate_update(&update));
self.find_one_and_replace_or_update(filter, update, opts.return_document.to_bool(),
opts.max_time_ms, opts.projection, opts.sort,
opts.upsert, opts.write_concern,
CommandType::FindOneAndUpdate)
}
fn get_unordered_batches(requests: Vec<WriteModel>) -> Vec<Batch> {
let mut inserts = vec![];
let mut deletes = vec![];
let mut updates = vec![];
for req in requests {
match req {
WriteModel::InsertOne { document } => inserts.push(document),
WriteModel::DeleteOne { filter } =>
deletes.push(DeleteModel { filter: filter, multi: false }),
WriteModel::DeleteMany { filter } =>
deletes.push(DeleteModel { filter: filter, multi: true }),
WriteModel::ReplaceOne { filter, replacement, upsert } =>
updates.push(UpdateModel { filter: filter,
update: replacement,
upsert: upsert, multi: false,
is_replace: true }),
WriteModel::UpdateOne { filter, update, upsert } =>
updates.push(UpdateModel { filter: filter,
update: update, upsert: upsert,
multi: false, is_replace: false }),
WriteModel::UpdateMany { filter, update, upsert } =>
updates.push(UpdateModel { filter: filter,
update: update, upsert: upsert,
multi: true, is_replace: false }),
}
}
vec![
Batch::Insert(inserts),
Batch::Delete(deletes),
Batch::Update(updates),
]
}
pub fn get_ordered_batches(mut requests: VecDeque<WriteModel>) -> Vec<Batch> {
let first_model = match requests.pop_front() {
Some(model) => model,
None => return vec![]
};
let mut batches = vec![Batch::from(first_model)];
for model in requests {
let last_index = batches.len() - 1;
match batches[last_index].merge_model(model) {
Some(model) => batches.push(Batch::from(model)),
None => ()
}
}
batches
}
fn execute_insert_batch(&self, documents: Vec<bson::Document>,
start_index: i64, ordered: bool,
result: &mut BulkWriteResult,
exception: &mut BulkWriteException) -> bool {
let models = documents.iter().map(|doc| {
WriteModel::InsertOne { document: doc.clone() }
}).collect();
let options = Some(InsertManyOptions::new(ordered, None));
match self.insert_many(documents, options) {
Ok(insert_result) =>
result.process_insert_many_result(insert_result, models,
start_index, exception),
Err(_) => {
exception.add_unproccessed_models(models);
false
}
}
}
fn execute_delete_batch(&self, models: Vec<DeleteModel>, ordered: bool,
result: &mut BulkWriteResult,
exception: &mut BulkWriteException) -> bool {
let original_models = models.iter().map(|model| {
if model.multi {
WriteModel::DeleteMany { filter: model.filter.clone() }
} else {
WriteModel::DeleteOne { filter: model.filter.clone() }
}
}).collect();
match self.bulk_delete(models, ordered, None, CommandType::DeleteMany) {
Ok(bulk_delete_result) =>
result.process_bulk_delete_result(bulk_delete_result,
original_models, exception),
Err(_) => {
exception.add_unproccessed_models(original_models);
false
}
}
}
fn execute_update_batch(&self, models: Vec<UpdateModel>, start_index: i64,
ordered: bool, result: &mut BulkWriteResult,
exception: &mut BulkWriteException) -> bool{
let original_models = models.iter().map(|model| {
if model.multi {
WriteModel::DeleteMany { filter: model.filter.clone() }
} else {
WriteModel::DeleteOne { filter: model.filter.clone() }
}
}).collect();
match self.bulk_update(models, ordered, None, CommandType::UpdateMany) {
Ok(bulk_update_result) =>
result.process_bulk_update_result(bulk_update_result,
original_models, start_index,
exception),
Err(_) => {
exception.add_unproccessed_models(original_models);
false
}
}
}
fn execute_batch(&self, batch: Batch, start_index: i64, ordered: bool,
result: &mut BulkWriteResult,
exception: &mut BulkWriteException) -> bool {
match batch {
Batch::Insert(docs) =>
self.execute_insert_batch(docs, start_index, ordered, result,
exception),
Batch::Delete(models) =>
self.execute_delete_batch(models, ordered, result,
exception),
Batch::Update(models) =>
self.execute_update_batch(models, start_index, ordered, result,
exception),
}
}
pub fn bulk_write(&self, requests: Vec<WriteModel>,
ordered: bool) -> BulkWriteResult {
let batches = if ordered {
Collection::get_ordered_batches(VecDeque::from_iter(requests.into_iter()))
} else {
Collection::get_unordered_batches(requests)
};
let mut result = BulkWriteResult::new();
let mut exception = BulkWriteException::new(vec![], vec![], vec![], None);
let mut start_index = 0;
for batch in batches {
let length = batch.len();
let success = self.execute_batch(batch, start_index, ordered,
&mut result, &mut exception);
if !success && ordered {
break;
}
start_index += length;
}
if exception.unprocessed_requests.is_empty() {
result.bulk_write_exception = Some(exception);
}
result
}
fn insert(&self, docs: Vec<bson::Document>, ordered: bool,
write_concern: Option<WriteConcern>,
cmd_type: CommandType) -> Result<(Vec<Bson>, Option<BulkWriteException>)> {
let wc = write_concern.unwrap_or(self.write_concern.clone());
let mut converted_docs = Vec::new();
let mut ids = Vec::new();
for doc in &docs {
let mut cdoc = doc.to_owned();
match doc.get("_id") {
Some(id) => ids.push(id.clone()),
None => {
let id = Bson::ObjectId(try!(oid::ObjectId::new()));
cdoc.insert("_id", id.clone());
ids.push(id);
}
}
converted_docs.push(Bson::Document(cdoc));
}
let mut cmd = bson::Document::new();
cmd.insert("insert", Bson::String(self.name()));
cmd.insert("documents", Bson::Array(converted_docs));
cmd.insert("ordered", Bson::Boolean(ordered));
cmd.insert("writeConcern", Bson::Document(wc.to_bson()));
let result = try!(self.db.command(cmd, cmd_type, None));
let exception_res = BulkWriteException::validate_bulk_write_result(result.clone(), wc);
let exception = match exception_res {
Ok(()) => None,
Err(BulkWriteError(err)) => Some(err),
Err(e) => return Err(e),
};
Ok((ids, exception))
}
pub fn insert_one(&self, doc: bson::Document,
write_concern: Option<WriteConcern>) -> Result<InsertOneResult> {
let (ids, bulk_exception) = try!(self.insert(vec!(doc), true, write_concern.clone(),
CommandType::InsertOne));
if ids.is_empty() {
return Err(OperationError("No ids returned for insert_one.".to_owned()));
}
let exception = match bulk_exception {
Some(e) => Some(WriteException::with_bulk_exception(e)),
None => None,
};
let id = match exception {
Some(ref exc) => match exc.write_error {
Some(_) => None,
None => Some(ids[0].to_owned()),
},
None => Some(ids[0].to_owned()),
};
Ok(InsertOneResult::new(id, exception))
}
pub fn insert_many(&self, docs: Vec<bson::Document>, options: Option<InsertManyOptions>) -> Result<InsertManyResult> {
let options = options.unwrap_or(InsertManyOptions::new(false, None));
let (ids, exception) = try!(self.insert(docs, options.ordered, options.write_concern,
CommandType::InsertMany));
let mut map = BTreeMap::new();
for i in 0..ids.len() {
map.insert(i as i64, ids.get(i).unwrap().to_owned());
}
if let Some(ref exc) = exception {
for error in &exc.write_errors {
map.remove(&(error.index as i64));
}
}
Ok(InsertManyResult::new(Some(map), exception))
}
fn bulk_delete(&self, models: Vec<DeleteModel>, ordered: bool,
write_concern: Option<WriteConcern>,
cmd_type: CommandType) -> Result<BulkDeleteResult> {
let wc = write_concern.unwrap_or(self.write_concern.clone());
let mut deletes = Vec::new();
for model in models {
let mut delete = bson::Document::new();
delete.insert("q", Bson::Document(model.filter));
let limit = if model.multi { 0 } else { 1 };
delete.insert("limit", Bson::I64(limit));
deletes.push(Bson::Document(delete));
}
let mut cmd = bson::Document::new();
cmd.insert("delete", Bson::String(self.name()));
cmd.insert("deletes", Bson::Array(deletes));
if !ordered {
cmd.insert("ordered", Bson::Boolean(ordered));
}
cmd.insert("writeConcern", Bson::Document(wc.to_bson()));
let result = try!(self.db.command(cmd, cmd_type, None));
let exception_res = BulkWriteException::validate_bulk_write_result(result.clone(), wc);
let exception = match exception_res {
Ok(()) => None,
Err(BulkWriteError(err)) => Some(err),
Err(e) => return Err(e),
};
Ok(BulkDeleteResult::new(result, exception))
}
fn delete(&self, filter: bson::Document, multi: bool,
write_concern: Option<WriteConcern>) -> Result<DeleteResult> {
let cmd_type = if multi {
CommandType::DeleteMany
} else {
CommandType::DeleteOne
};
let result = try!(self.bulk_delete(vec![DeleteModel::new(filter, multi)],
true, write_concern, cmd_type));
Ok(DeleteResult::with_bulk_result(result))
}
pub fn delete_one(&self, filter: bson::Document,
write_concern: Option<WriteConcern>) -> Result<DeleteResult> {
self.delete(filter, false, write_concern)
}
pub fn delete_many(&self, filter: bson::Document,
write_concern: Option<WriteConcern>) -> Result<DeleteResult> {
self.delete(filter, true, write_concern)
}
fn bulk_update(&self, models: Vec<UpdateModel>, ordered: bool,
write_concern: Option<WriteConcern>,
cmd_type: CommandType) -> Result<BulkUpdateResult> {
let wc = write_concern.unwrap_or(self.write_concern.clone());
let mut updates = Vec::new();
for model in models {
let mut update = bson::Document::new();
update.insert("q", Bson::Document(model.filter));
update.insert("u", Bson::Document(model.update));
update.insert("upsert", Bson::Boolean(model.upsert));
if !ordered {
update.insert("ordered", Bson::Boolean(ordered));
}
if model.multi {
update.insert("multi", Bson::Boolean(model.multi));
}
updates.push(Bson::Document(update));
}
let mut cmd = bson::Document::new();
cmd.insert("update", Bson::String(self.name()));
cmd.insert("updates", Bson::Array(updates));
cmd.insert("writeConcern", Bson::Document(wc.to_bson()));
let result = try!(self.db.command(cmd, cmd_type, None));
let exception_res = BulkWriteException::validate_bulk_write_result(result.clone(), wc);
let exception = match exception_res {
Ok(()) => None,
Err(BulkWriteError(err)) => Some(err),
Err(e) => return Err(e),
};
Ok(BulkUpdateResult::new(result, exception))
}
fn update(&self, filter: bson::Document, update: bson::Document,
upsert: bool, multi: bool,
write_concern: Option<WriteConcern>) -> Result<UpdateResult> {
let cmd_type = if multi {
CommandType::UpdateMany
} else {
CommandType::UpdateOne
};
let result = try!(self.bulk_update(vec![UpdateModel::new(filter, update, upsert, multi)],
true, write_concern, cmd_type));
Ok(UpdateResult::with_bulk_result(result))
}
pub fn replace_one(&self, filter: bson::Document, replacement: bson::Document,
options: Option<ReplaceOptions>) -> Result<UpdateResult> {
let options = options.unwrap_or(ReplaceOptions::new(false, None));
let _ = try!(Collection::validate_replace(&replacement));
self.update(filter, replacement, options.upsert, false, options.write_concern)
}
pub fn update_one(&self, filter: bson::Document, update: bson::Document, options: Option<UpdateOptions>) -> Result<UpdateResult> {
let options = options.unwrap_or(UpdateOptions::new(false, None));
let _ = try!(Collection::validate_update(&update));
self.update(filter, update, options.upsert, false, options.write_concern)
}
pub fn update_many(&self, filter: bson::Document, update: bson::Document, options: Option<UpdateOptions>) -> Result<UpdateResult> {
let options = options.unwrap_or(UpdateOptions::new(false, None));
let _ = try!(Collection::validate_update(&update));
self.update(filter, update, options.upsert, true, options.write_concern)
}
fn validate_replace(replacement: &bson::Document) -> Result<()> {
for key in replacement.keys() {
if key.starts_with("$") {
return Err(ArgumentError("Replacement cannot include $ operators.".to_owned()));
}
}
Ok(())
}
fn validate_update(update: &bson::Document) -> Result<()> {
for key in update.keys() {
if !key.starts_with("$") {
return Err(ArgumentError("Update only works with $ operators.".to_owned()));
}
}
Ok(())
}
pub fn create_index(&self, keys: bson::Document, options: Option<IndexOptions>) -> Result<String> {
let model = IndexModel::new(keys, options);
self.create_index_model(model)
}
pub fn create_index_model(&self, model: IndexModel) -> Result<String> {
let result = try!(self.create_indexes(vec!(model)));
Ok(result[0].to_owned())
}
pub fn create_indexes(&self, models: Vec<IndexModel>) -> Result<Vec<String>> {
let mut names = Vec::with_capacity(models.len());
let mut indexes = Vec::with_capacity(models.len());
for model in models {
names.push(try!(model.name()));
indexes.push(Bson::Document(try!(model.to_bson())));
}
let mut cmd = bson::Document::new();
cmd.insert("createIndexes", Bson::String(self.name()));
cmd.insert("indexes", Bson::Array(indexes));
let result = try!(self.db.command(cmd, CommandType::CreateIndexes, None));
match result.get("errmsg") {
Some(&Bson::String(ref msg)) => Err(OperationError(msg.to_owned())),
_ => Ok(names),
}
}
pub fn drop_index(&self, keys: bson::Document, options: Option<IndexOptions>) -> Result<()> {
let model = IndexModel::new(keys, options);
self.drop_index_model(model)
}
pub fn drop_index_string(&self, name: String) -> Result<()> {
let mut opts = IndexOptions::new();
opts.name = Some(name.to_owned());
let model = IndexModel::new(bson::Document::new(), Some(opts));
self.drop_index_model(model)
}
pub fn drop_index_model(&self, model: IndexModel) -> Result<()> {
let mut cmd = bson::Document::new();
cmd.insert("dropIndexes", Bson::String(self.name()));
cmd.insert("index", Bson::String(try!(model.name())));
let result = try!(self.db.command(cmd, CommandType::DropIndexes, None));
match result.get("errmsg") {
Some(&Bson::String(ref msg)) => Err(OperationError(msg.to_owned())),
_ => Ok(()),
}
}
pub fn drop_indexes(&self) -> Result<()> {
let mut opts = IndexOptions::new();
opts.name = Some("*".to_owned());
let model = IndexModel::new(bson::Document::new(), Some(opts));
self.drop_index_model(model)
}
pub fn list_indexes(&self) -> Result<Cursor> {
let mut cmd = bson::Document::new();
cmd.insert("listIndexes", Bson::String(self.name()));
self.db.command_cursor(cmd, CommandType::ListIndexes, self.read_preference.to_owned())
}
}