use std::borrow::Borrow;
use std::marker::PhantomData;
use std::any::TypeId;
use std::cmp::Ordering;
use std::iter::FromIterator;
use std::collections::BTreeMap;
use std::result::Result as StdResult;
use std::hash::{ Hash, Hasher };
use std::fmt::{ Debug, Formatter, Result as FmtResult };
use serde::Deserialize;
use bson::{ Bson, Document, from_bson };
use mongodb::coll::options::{
UpdateOptions,
FindOneAndDeleteOptions,
FindOneAndUpdateOptions,
ReturnDocument,
};
use mongodb::coll::results::UpdateResult;
use typemap::Key;
use crate::{
cursor::Cursor,
doc::Doc,
uid::Uid,
ops::*,
bsn::*,
utils::*,
error::{ Error, ErrorKind::{ MissingId, BsonDecoding }, Result, ResultExt },
};
pub struct Collection<T: Doc> {
inner: mongodb::coll::Collection,
_marker: PhantomData<T>,
}
impl<T: Doc> Collection<T> {
pub fn create_indexes(&self) -> Result<()> {
let indexes = T::indexes();
if indexes.is_empty() {
Ok(())
} else {
self.inner
.create_indexes(indexes)
.map(drop)
.chain(|| format!("can't create indexes on {}", T::NAME))
}
}
pub fn drop(&self) -> Result<()> {
self.inner.drop().map_err(Into::into)
}
pub fn count<Q: Count<T>>(&self, query: Q) -> Result<usize> {
self.inner
.count(query.filter().into(), query.options().into())
.chain(|| format!("error in {}::count({:#?})", T::NAME, query))
.and_then(|n| int_to_usize_with_msg(n, "# of counted documents"))
}
pub fn distinct<Q, C>(&self, query: Q) -> Result<C>
where Q: Distinct<T>,
C: FromIterator<Q::Output>,
{
self.inner
.distinct(Q::FIELD, query.filter().into(), query.options().into())
.chain(|| format!("error in {}::distinct({:#?})", T::NAME, query))
.and_then(|values| {
values
.into_iter()
.map(|b| from_bson(Q::transform(b)?).chain(|| format!(
"can't deserialize {}::{}", T::NAME, Q::FIELD
)))
.collect()
})
}
pub fn aggregate<P: Pipeline<T>>(&self, pipeline: P) -> Result<Cursor<P::Output>> {
self.inner
.aggregate(pipeline.stages(), pipeline.options().into())
.chain(|| format!("error in {}::aggregate({:#?})", T::NAME, pipeline))
.map(|crs| Cursor::from_cursor_and_transform(crs, P::transform))
}
pub fn find_one<Q: Query<T>>(&self, query: Q) -> Result<Option<Q::Output>> {
self.inner
.find_one(query.filter().into(), query.options().into())
.chain(|| format!("error in {}::find_one({:#?})", T::NAME, query))
.and_then(|opt| opt.map_or(Ok(None), |doc| {
let transformed = Q::transform(doc)?;
from_bson(transformed).map_err(From::from)
}))
}
pub fn find_many<Q: Query<T>>(&self, query: Q) -> Result<Cursor<Q::Output>> {
self.inner
.find(query.filter().into(), query.options().into())
.chain(|| format!("error in {}::find_many({:#?})", T::NAME, query))
.map(|crs| Cursor::from_cursor_and_transform(crs, Q::transform))
}
pub fn insert_one(&self, entity: &T) -> Result<Uid<T>> {
let doc = serialize_document(entity)?;
let write_concern = T::insert_options().write_concern;
let message = || format!("error in {}::insert_one()", T::NAME);
self.inner
.insert_one(doc, write_concern)
.chain(&message)
.and_then(|result| {
if let Some(error) = result.write_exception {
Err(Error::with_cause(message(), error))
} else if let Some(id) = result.inserted_id {
from_bson(id).chain(
|| format!("can't deserialize ID for {}", T::NAME)
)
} else {
Err(Error::new(MissingId, message() + ": missing `inserted_id`"))
}
})
}
pub fn insert_many<I>(&self, entities: I) -> Result<BTreeMap<u64, Uid<T>>>
where I: IntoIterator,
I::Item: Borrow<T>,
I::IntoIter: ExactSizeIterator,
T::Id: Clone + Debug,
T: 'static,
{
let values = entities.into_iter();
let n_docs = values.len();
let docs = serialize_documents(values)?;
let options = T::insert_options();
let message = || format!("error in {}::insert_many()", T::NAME);
if n_docs == 0 {
return Ok(BTreeMap::new());
}
self.inner
.insert_many(docs, options.into())
.chain(&message)
.and_then(|result| {
let ids: BTreeMap<_, _> = result.inserted_ids
.unwrap_or_default()
.into_iter()
.map(|(i, id)| {
assert!(i >= 0, "negative index {} for id {}", i, id);
(i as u64, from_bson(id.clone()).map_err(|_| id))
})
.collect();
if let Some(error) = result.bulk_write_exception {
Err(Error::with_cause(message(), error)
.with_context::<InsertManyErrorContext<T>>(ids))
} else if ids.len() == n_docs {
let ids_res: StdResult<BTreeMap<_, _>, _> = ids
.clone()
.into_iter()
.map(|(i, res)| res.map(|id| (i, id)))
.collect();
ids_res.map_err(|_| Error::new(
BsonDecoding,
format!("{}: can't deserialize some IDs", message())
).with_context::<InsertManyErrorContext<T>>(
ids
))
} else {
let msg = format!("{}: {} documents given, but {} IDs returned",
message(), n_docs, ids.len());
Err(Error::new(MissingId, msg)
.with_context::<InsertManyErrorContext<T>>(ids))
}
})
}
pub fn replace_entity(&self, entity: &T) -> Result<UpdateOneResult> where T: Debug {
self.update_entity_internal(entity, false)
.and_then(UpdateOneResult::from_raw)
}
pub fn upsert_entity(&self, entity: &T) -> Result<UpsertOneResult<Uid<T>>> where T: Debug {
self.update_entity_internal(entity, true)
.and_then(UpsertOneResult::from_raw)
}
fn update_entity_internal(&self, entity: &T, upsert: bool) -> Result<UpdateResult>
where T: Debug
{
let mut document = serialize_document(entity)?;
let id = document.remove("_id").ok_or_else(
|| Error::new(MissingId, format!("No `_id` in entity of type {}", T::NAME))
)?;
let filter = doc!{ "_id": id };
let options = UpdateOptions {
upsert: upsert.into(),
write_concern: T::update_options().into(),
};
let message = || format!("error in {}::{}_entity({:#?})",
T::NAME,
if upsert { "upsert" } else { "replace" },
entity);
self.inner
.replace_one(filter, document, options.into())
.chain(&message)
.and_then(|result| {
if let Some(error) = result.write_exception {
Err(Error::with_cause(message(), error))
} else {
Ok(result)
}
})
}
pub fn update_one<U: Update<T>>(&self, update: U) -> Result<UpdateOneResult> {
let filter = update.filter();
let change = update.update();
let options = UpdateOptions {
upsert: Some(false),
write_concern: update.options().into(),
};
let message = || format!("error in {}::update_one({:#?})", T::NAME, update);
self.update_one_internal(filter, change, options, &message)
.and_then(UpdateOneResult::from_raw)
}
pub fn upsert_one<U: Upsert<T>>(&self, upsert: U) -> Result<UpsertOneResult<Uid<T>>> {
let filter = upsert.filter();
let change = upsert.upsert();
let options = UpdateOptions {
upsert: Some(true),
write_concern: upsert.options().into(),
};
let message = || format!("error in {}::upsert_one({:#?})", T::NAME, upsert);
self.update_one_internal(filter, change, options, &message)
.and_then(UpsertOneResult::from_raw)
}
fn update_one_internal<F: Copy + FnOnce() -> String>(
&self,
filter: Document,
change: Document,
options: UpdateOptions,
message: F,
) -> Result<UpdateResult> {
self.inner
.update_one(filter, change, options.into())
.chain(message)
.and_then(|result| {
if let Some(error) = result.write_exception {
Err(Error::with_cause(message(), error))
} else {
Ok(result)
}
})
}
pub fn update_many<U: Update<T>>(&self, update: U) -> Result<UpdateManyResult> {
let filter = update.filter();
let change = update.update();
let options = UpdateOptions {
upsert: Some(false),
write_concern: update.options().into(),
};
let message = || format!("error in {}::update_many({:#?})", T::NAME, update);
self.update_many_internal(filter, change, options, &message)
}
pub fn upsert_many<U: Upsert<T>>(&self, upsert: U) -> Result<UpsertManyResult> {
let filter = upsert.filter();
let change = upsert.upsert();
let options = UpdateOptions {
upsert: Some(true),
write_concern: upsert.options().into(),
};
let message = || format!("error in {}::upsert_many({:#?})", T::NAME, upsert);
self.update_many_internal(filter, change, options, &message)
}
fn update_many_internal<F: Copy + FnOnce() -> String>(
&self,
filter: Document,
change: Document,
options: UpdateOptions,
message: F,
) -> Result<UpdateManyResult> {
self.inner
.update_many(filter, change, options.into())
.chain(message)
.and_then(|result| {
if let Some(error) = result.write_exception {
Err(Error::with_cause(message(), error))
} else {
let num_matched = int_to_usize_with_msg(result.matched_count, "# of matched documents")?;
let num_modified = int_to_usize_with_msg(result.modified_count, "# of modified documents")?;
Ok(UpdateManyResult { num_matched, num_modified })
}
})
}
pub fn delete_entity(&self, entity: &T) -> Result<bool> where T: Debug {
let id = entity.id().ok_or_else(
|| Error::new(MissingId, format!("No `_id` in entity of type {}", T::NAME))
)?;
let id_bson = bson::to_bson(id)?;
self.delete_one(doc!{ "_id": id_bson }).chain(
|| format!("error in {}::delete_entity({:#?})", T::NAME, entity)
)
}
pub fn delete_entities<I>(&self, entities: I) -> Result<usize>
where I: IntoIterator,
I::Item: Borrow<T>,
T: Debug,
{
let ids: Vec<_> = entities
.into_iter()
.map(|item| {
let entity = item.borrow();
let id = entity.id().ok_or_else(|| Error::new(
MissingId,
format!("No `_id` in entity to delete: {:#?}", entity)
))?;
bson::to_bson(id).map_err(From::from)
})
.collect::<Result<_>>()?;
let criterion = doc!{
"_id": {
"$in": ids
}
};
self.delete_many(criterion).chain(
|| format!("error in {}::delete_entities(...)", T::NAME)
)
}
pub fn delete_one<Q: Delete<T>>(&self, query: Q) -> Result<bool> {
let message = || format!("error in {}::delete_one({:#?})", T::NAME, query);
self.inner
.delete_one(query.filter(), query.options().into())
.chain(&message)
.and_then(|result| {
if let Some(error) = result.write_exception {
Err(Error::with_cause(message(), error))
} else {
Ok(result.deleted_count > 0)
}
})
}
pub fn delete_many<Q: Delete<T>>(&self, query: Q) -> Result<usize> {
let message = || format!("error in {}::delete_many({:#?})", T::NAME, query);
self.inner
.delete_many(query.filter(), query.options().into())
.chain(&message)
.and_then(|result| {
if let Some(error) = result.write_exception {
Err(Error::with_cause(message(), error))
} else {
int_to_usize_with_msg(result.deleted_count, "# of deleted documents")
}
})
}
pub fn find_one_and_delete<Q: Query<T>>(&self, query: Q) -> Result<Option<Q::Output>> {
let query_options = query.options();
let find_delete_options = FindOneAndDeleteOptions {
max_time_ms: query_options.max_time_ms,
projection: query_options.projection,
sort: query_options.sort,
write_concern: None,
};
self.inner
.find_one_and_delete(query.filter(), find_delete_options.into())
.chain(|| format!(
"error in {}::find_one_and_delete({:#?})", T::NAME, query
))
.and_then(|opt| match opt {
Some(document) => {
let transformed = Q::transform(document)?;
from_bson(transformed).map_err(From::from)
}
None => Ok(None)
})
}
pub fn find_one_and_replace<Q: Query<T>>(&self, query: Q, replacement: &T) -> Result<Option<Q::Output>>
where T: Debug
{
let query_options = query.options();
let find_replace_options = FindOneAndUpdateOptions {
return_document: Some(ReturnDocument::Before),
max_time_ms: query_options.max_time_ms,
projection: query_options.projection,
sort: query_options.sort,
upsert: Some(false),
..Default::default()
};
let filter = query.filter();
let doc = serialize_document(replacement)?;
self.inner
.find_one_and_replace(filter, doc, find_replace_options.into())
.chain(|| format!(
"error in {}::find_one_and_replace({:#?}, {:#?})",
T::NAME, query, replacement
))
.and_then(|opt| match opt {
Some(document) => {
let transformed = Q::transform(document)?;
from_bson(transformed).map_err(From::from)
}
None => Ok(None)
})
}
pub fn find_one_and_update<U: FindAndUpdate<T>>(&self, update: U) -> Result<Option<U::Output>> {
let filter = update.filter();
let change = update.update();
let options = update.options();
self.inner
.find_one_and_update(filter, change, options.into())
.chain(|| format!(
"error in {}::find_one_and_update({:#?})", T::NAME, update
))
.and_then(|opt| match opt {
Some(document) => {
let transformed = U::transform(document)?;
from_bson(transformed).map_err(From::from)
}
None => Ok(None)
})
}
}
impl<T: Doc> Debug for Collection<T> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "Collection<{}>", T::NAME)
}
}
#[doc(hidden)]
impl<T: Doc> From<mongodb::coll::Collection> for Collection<T> {
fn from(collection: mongodb::coll::Collection) -> Self {
Collection {
inner: collection,
_marker: PhantomData,
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UpdateOneResult {
pub matched: bool,
pub modified: bool,
}
impl UpdateOneResult {
fn from_raw(result: UpdateResult) -> Result<Self> {
if let Some(error) = result.write_exception {
Err(Error::with_cause("couldn't perform single update", error))
} else {
Ok(UpdateOneResult {
matched: result.matched_count > 0,
modified: result.modified_count > 0,
})
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UpsertOneResult<Id> {
pub matched: bool,
pub modified: bool,
pub upserted_id: Option<Id>,
}
impl<Id: for<'a> Deserialize<'a>> UpsertOneResult<Id> {
fn from_raw(result: UpdateResult) -> Result<Self> {
let matched = result.matched_count > 0;
let modified = result.modified_count > 0;
let upserted_id = match result.upserted_id {
Some(bson) => {
let mut doc = bson.try_into_doc()?;
let id_bson = doc.remove("_id").ok_or_else(
|| Error::new(MissingId, "no `_id` found in `WriteResult.upserted`")
)?;
let id = from_bson(id_bson).chain("can't deserialize upserted ID")?;
Some(id)
}
None => None
};
if let Some(error) = result.write_exception {
Err(Error::with_cause("couldn't perform single upsert", error))
} else {
Ok(UpsertOneResult { matched, modified, upserted_id })
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct UpdateManyResult {
pub num_matched: usize,
pub num_modified: usize,
}
pub type UpsertManyResult = UpdateManyResult;
pub struct InsertManyErrorContext<T>(PhantomData<T>);
impl<T> Default for InsertManyErrorContext<T> {
fn default() -> Self {
InsertManyErrorContext(PhantomData)
}
}
impl<T> Clone for InsertManyErrorContext<T> {
fn clone(&self) -> Self {
*self
}
}
impl<T> Copy for InsertManyErrorContext<T> {}
impl<T: Doc> Debug for InsertManyErrorContext<T> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
write!(f, "InsertManyErrorContext<{}>", T::NAME)
}
}
impl<T> PartialEq for InsertManyErrorContext<T> {
fn eq(&self, _other: &Self) -> bool {
true
}
}
impl<T> Eq for InsertManyErrorContext<T> {}
impl<T> PartialOrd for InsertManyErrorContext<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.cmp(other).into()
}
}
impl<T> Ord for InsertManyErrorContext<T> {
fn cmp(&self, _other: &Self) -> Ordering {
Ordering::Equal
}
}
impl<T: 'static> Hash for InsertManyErrorContext<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
TypeId::of::<Self>().hash(state)
}
}
impl<T: Doc + 'static> Key for InsertManyErrorContext<T> {
type Value = BTreeMap<u64, StdResult<Uid<T>, Bson>>;
}