mod batch;
pub mod options;
use std::{fmt, fmt::Debug, sync::Arc};
use futures::StreamExt;
use serde::{
de::{DeserializeOwned, Error},
Deserialize,
Deserializer,
Serialize,
};
use self::options::*;
use crate::{
bson::{doc, ser, to_document, Bson, Document},
bson_util,
concern::{ReadConcern, WriteConcern},
error::{convert_bulk_errors, BulkWriteError, BulkWriteFailure, ErrorKind, Result},
operation::{
Aggregate,
Count,
CountDocuments,
Delete,
Distinct,
DropCollection,
Find,
FindAndModify,
Insert,
Update,
},
results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult},
selection_criteria::SelectionCriteria,
Client,
Cursor,
Database,
};
const MAX_INSERT_DOCS_BYTES: usize = 16 * 1000 * 1000;
#[derive(Debug, Clone)]
pub struct Collection<T = Document>
where
T: Serialize + DeserializeOwned + Unpin + Debug,
{
inner: Arc<CollectionInner>,
_phantom: std::marker::PhantomData<T>,
}
#[derive(Debug)]
struct CollectionInner {
client: Client,
db: Database,
name: String,
selection_criteria: Option<SelectionCriteria>,
read_concern: Option<ReadConcern>,
write_concern: Option<WriteConcern>,
}
impl<T> Collection<T>
where
T: Serialize + DeserializeOwned + Unpin + Debug,
{
pub(crate) fn new(db: Database, name: &str, options: Option<CollectionOptions>) -> Self {
let options = options.unwrap_or_default();
let selection_criteria = options
.selection_criteria
.or_else(|| db.selection_criteria().cloned());
let read_concern = options.read_concern.or_else(|| db.read_concern().cloned());
let write_concern = options
.write_concern
.or_else(|| db.write_concern().cloned());
Self {
inner: Arc::new(CollectionInner {
client: db.client().clone(),
db,
name: name.to_string(),
selection_criteria,
read_concern,
write_concern,
}),
_phantom: Default::default(),
}
}
pub fn clone_with_type<U: Serialize + DeserializeOwned + Unpin + Debug>(
&self,
) -> Collection<U> {
let mut options = CollectionOptions::builder().build();
options.selection_criteria = self.inner.selection_criteria.clone();
options.read_concern = self.inner.read_concern.clone();
options.write_concern = self.inner.write_concern.clone();
Collection::new(self.inner.db.clone(), &self.inner.name, Some(options))
}
fn client(&self) -> &Client {
&self.inner.client
}
pub fn name(&self) -> &str {
&self.inner.name
}
pub fn namespace(&self) -> Namespace {
Namespace {
db: self.inner.db.name().into(),
coll: self.name().into(),
}
}
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.inner.selection_criteria.as_ref()
}
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.inner.read_concern.as_ref()
}
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.inner.write_concern.as_ref()
}
pub async fn drop(&self, options: impl Into<Option<DropCollectionOptions>>) -> Result<()> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let drop = DropCollection::new(self.namespace(), options);
self.client().execute_operation(drop).await
}
pub async fn aggregate(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<AggregateOptions>>,
) -> Result<Cursor> {
let mut options = options.into();
resolve_options!(
self,
options,
[read_concern, write_concern, selection_criteria]
);
let aggregate = Aggregate::new(self.namespace(), pipeline, options);
let client = self.client();
client
.execute_cursor_operation(aggregate)
.await
.map(|(spec, session)| Cursor::new(client.clone(), spec, session))
}
pub async fn estimated_document_count(
&self,
options: impl Into<Option<EstimatedDocumentCountOptions>>,
) -> Result<i64> {
let mut options = options.into();
resolve_options!(self, options, [read_concern, selection_criteria]);
let op = Count::new(self.namespace(), options);
self.client().execute_operation(op).await
}
pub async fn count_documents(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<CountOptions>>,
) -> Result<i64> {
let options = options.into();
let filter = filter.into();
let op = CountDocuments::new(self.namespace(), filter, options);
self.client().execute_operation(op).await
}
pub async fn delete_many(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
) -> Result<DeleteResult> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let delete = Delete::new(self.namespace(), query, None, options);
self.client().execute_operation(delete).await
}
pub async fn delete_one(
&self,
query: Document,
options: impl Into<Option<DeleteOptions>>,
) -> Result<DeleteResult> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let delete = Delete::new(self.namespace(), query, Some(1), options);
self.client().execute_operation(delete).await
}
pub async fn distinct(
&self,
field_name: &str,
filter: impl Into<Option<Document>>,
options: impl Into<Option<DistinctOptions>>,
) -> Result<Vec<Bson>> {
let mut options = options.into();
resolve_options!(self, options, [read_concern, selection_criteria]);
let op = Distinct::new(
self.namespace(),
field_name.to_string(),
filter.into(),
options,
);
self.client().execute_operation(op).await
}
pub async fn find(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOptions>>,
) -> Result<Cursor<T>> {
let find = Find::new(self.namespace(), filter.into(), options.into());
let client = self.client();
client
.execute_cursor_operation(find)
.await
.map(|(result, session)| Cursor::new(client.clone(), result, session))
}
pub async fn find_one(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<FindOneOptions>>,
) -> Result<Option<T>> {
let mut options: FindOptions = options
.into()
.map(Into::into)
.unwrap_or_else(Default::default);
options.limit = Some(-1);
let mut cursor = self.find(filter, Some(options)).await?;
cursor.next().await.transpose()
}
pub async fn find_one_and_delete(
&self,
filter: Document,
options: impl Into<Option<FindOneAndDeleteOptions>>,
) -> Result<Option<T>> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let op = FindAndModify::<T>::with_delete(self.namespace(), filter, options);
self.client().execute_operation(op).await
}
pub async fn find_one_and_replace(
&self,
filter: Document,
replacement: T,
options: impl Into<Option<FindOneAndReplaceOptions>>,
) -> Result<Option<T>> {
let replacement = to_document(&replacement)?;
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let op = FindAndModify::<T>::with_replace(self.namespace(), filter, replacement, options)?;
self.client().execute_operation(op).await
}
pub async fn find_one_and_update(
&self,
filter: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<FindOneAndUpdateOptions>>,
) -> Result<Option<T>> {
let update = update.into();
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let op = FindAndModify::<T>::with_update(self.namespace(), filter, update, options)?;
self.client().execute_operation(op).await
}
pub async fn insert_many(
&self,
docs: impl IntoIterator<Item = T>,
options: impl Into<Option<InsertManyOptions>>,
) -> Result<InsertManyResult> {
let docs: ser::Result<Vec<Document>> = docs
.into_iter()
.map(|doc| bson::to_document(&doc))
.collect();
let mut docs: Vec<Document> = docs?;
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
if docs.is_empty() {
return Err(ErrorKind::ArgumentError {
message: "No documents provided to insert_many".to_string(),
}
.into());
}
let ordered = options.as_ref().and_then(|o| o.ordered).unwrap_or(true);
let mut cumulative_failure: Option<BulkWriteFailure> = None;
let mut cumulative_result: Option<InsertManyResult> = None;
let mut n_attempted = 0;
while !docs.is_empty() {
let mut remaining_docs =
batch::split_off_batch(&mut docs, MAX_INSERT_DOCS_BYTES, bson_util::doc_size_bytes);
std::mem::swap(&mut docs, &mut remaining_docs);
let current_batch = remaining_docs;
let current_batch_size = current_batch.len();
n_attempted += current_batch_size;
let insert = Insert::new(self.namespace(), current_batch, options.clone());
match self.client().execute_operation(insert).await {
Ok(result) => {
if cumulative_failure.is_none() {
let cumulative_result =
cumulative_result.get_or_insert_with(InsertManyResult::new);
for (index, id) in result.inserted_ids {
cumulative_result
.inserted_ids
.insert(index + n_attempted - current_batch_size, id);
}
}
}
Err(e) => match e.kind.as_ref() {
ErrorKind::BulkWriteError(failure) => {
let failure_ref =
cumulative_failure.get_or_insert_with(BulkWriteFailure::new);
if let Some(ref write_errors) = failure.write_errors {
failure_ref
.write_errors
.get_or_insert_with(Default::default)
.extend(write_errors.iter().map(|error| BulkWriteError {
index: error.index + n_attempted - current_batch_size,
..error.clone()
}));
}
if let Some(ref write_concern_error) = failure.write_concern_error {
failure_ref.write_concern_error = Some(write_concern_error.clone());
}
if ordered {
return Err(ErrorKind::BulkWriteError(
cumulative_failure.unwrap_or_else(BulkWriteFailure::new),
)
.into());
}
}
_ => return Err(e),
},
}
}
match cumulative_failure {
Some(failure) => Err(ErrorKind::BulkWriteError(failure).into()),
None => Ok(cumulative_result.unwrap_or_else(InsertManyResult::new)),
}
}
pub async fn insert_one(
&self,
doc: T,
options: impl Into<Option<InsertOneOptions>>,
) -> Result<InsertOneResult> {
let doc = to_document(&doc)?;
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let insert = Insert::new(
self.namespace(),
vec![doc],
options.map(InsertManyOptions::from_insert_one_options),
);
self.client()
.execute_operation(insert)
.await
.map(InsertOneResult::from_insert_many_result)
.map_err(convert_bulk_errors)
}
pub async fn replace_one(
&self,
query: Document,
replacement: T,
options: impl Into<Option<ReplaceOptions>>,
) -> Result<UpdateResult> {
let replacement = to_document(&replacement)?;
bson_util::replacement_document_check(&replacement)?;
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let update = Update::new(
self.namespace(),
query,
UpdateModifications::Document(replacement),
false,
options.map(UpdateOptions::from_replace_options),
);
self.client().execute_operation(update).await
}
pub async fn update_many(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
) -> Result<UpdateResult> {
let update = update.into();
let mut options = options.into();
if let UpdateModifications::Document(ref d) = update {
bson_util::update_document_check(d)?;
};
resolve_options!(self, options, [write_concern]);
let update = Update::new(self.namespace(), query, update, true, options);
self.client().execute_operation(update).await
}
pub async fn update_one(
&self,
query: Document,
update: impl Into<UpdateModifications>,
options: impl Into<Option<UpdateOptions>>,
) -> Result<UpdateResult> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let update = Update::new(self.namespace(), query, update.into(), false, options);
self.client().execute_operation(update).await
}
pub(super) async fn kill_cursor(&self, cursor_id: i64) -> Result<()> {
let ns = self.namespace();
self.client()
.database(ns.db.as_str())
.run_command(
doc! {
"killCursors": ns.coll.as_str(),
"cursors": [cursor_id]
},
None,
)
.await?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct Namespace {
pub db: String,
pub coll: String,
}
impl Namespace {
#[cfg(test)]
pub(crate) fn empty() -> Self {
Self {
db: String::new(),
coll: String::new(),
}
}
}
impl fmt::Display for Namespace {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}.{}", self.db, self.coll)
}
}
impl<'de> Deserialize<'de> for Namespace {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let mut parts = s.split('.');
let db = parts.next();
let coll = parts.collect::<Vec<_>>().join(".");
match (db, coll) {
(Some(db), coll) if !coll.is_empty() => Ok(Self {
db: db.to_string(),
coll,
}),
_ => Err(D::Error::custom("Missing one or more fields in namespace")),
}
}
}