pub mod options;
use std::sync::Arc;
use futures::stream::TryStreamExt;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
cursor::Cursor,
error::{ErrorKind, Result},
operation::{Aggregate, Create, DropDatabase, ListCollections, RunCommand},
options::{
AggregateOptions,
CollectionOptions,
CreateCollectionOptions,
DatabaseOptions,
DropDatabaseOptions,
ListCollectionsOptions,
},
selection_criteria::SelectionCriteria,
Client,
Collection,
Namespace,
};
#[derive(Clone, Debug)]
pub struct Database {
inner: Arc<DatabaseInner>,
}
#[derive(Debug)]
struct DatabaseInner {
client: Client,
name: String,
selection_criteria: Option<SelectionCriteria>,
read_concern: Option<ReadConcern>,
write_concern: Option<WriteConcern>,
}
impl Database {
pub(crate) fn new(client: Client, name: &str, options: Option<DatabaseOptions>) -> Self {
let options = options.unwrap_or_default();
let selection_criteria = options
.selection_criteria
.or_else(|| client.selection_criteria().cloned());
let read_concern = options
.read_concern
.or_else(|| client.read_concern().cloned());
let write_concern = options
.write_concern
.or_else(|| client.write_concern().cloned());
Self {
inner: Arc::new(DatabaseInner {
client,
name: name.to_string(),
selection_criteria,
read_concern,
write_concern,
}),
}
}
pub(crate) fn client(&self) -> &Client {
&self.inner.client
}
pub fn name(&self) -> &str {
&self.inner.name
}
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.inner.selection_criteria.as_ref()
}
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.inner.read_concern.as_ref()
}
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.inner.write_concern.as_ref()
}
pub fn collection(&self, name: &str) -> Collection {
Collection::new(self.clone(), name, None)
}
pub fn collection_with_options(&self, name: &str, options: CollectionOptions) -> Collection {
Collection::new(self.clone(), name, Some(options))
}
pub async fn drop(&self, options: impl Into<Option<DropDatabaseOptions>>) -> Result<()> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let drop_database = DropDatabase::new(self.name().to_string(), options);
self.client().execute_operation(drop_database).await
}
pub async fn list_collections(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListCollectionsOptions>>,
) -> Result<Cursor> {
let list_collections = ListCollections::new(
self.name().to_string(),
filter.into(),
false,
options.into(),
);
self.client()
.execute_cursor_operation(list_collections)
.await
.map(|(spec, session)| Cursor::new(self.client().clone(), spec, session))
}
pub async fn list_collection_names(
&self,
filter: impl Into<Option<Document>>,
) -> Result<Vec<String>> {
let list_collections =
ListCollections::new(self.name().to_string(), filter.into(), true, None);
let cursor = self
.client()
.execute_cursor_operation(list_collections)
.await
.map(|(spec, session)| Cursor::new(self.client().clone(), spec, session))?;
cursor
.and_then(|doc| match doc.get("name").and_then(Bson::as_str) {
Some(name) => futures::future::ok(name.into()),
None => futures::future::err(
ErrorKind::ResponseError {
message: "Expected name field in server response, but there was none."
.to_string(),
}
.into(),
),
})
.try_collect()
.await
}
pub async fn create_collection(
&self,
name: &str,
options: impl Into<Option<CreateCollectionOptions>>,
) -> Result<()> {
let mut options = options.into();
resolve_options!(self, options, [write_concern]);
let create = Create::new(
Namespace {
db: self.name().to_string(),
coll: name.to_string(),
},
options,
);
self.client().execute_operation(create).await
}
pub async fn run_command(
&self,
command: Document,
selection_criteria: impl Into<Option<SelectionCriteria>>,
) -> Result<Document> {
let operation = RunCommand::new(self.name().into(), command, selection_criteria.into())?;
self.client().execute_operation(operation).await
}
pub async fn aggregate(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<AggregateOptions>>,
) -> Result<Cursor> {
let mut options = options.into();
resolve_options!(
self,
options,
[read_concern, write_concern, selection_criteria]
);
let aggregate = Aggregate::new(self.name().to_string(), pipeline, options);
let client = self.client();
client
.execute_cursor_operation(aggregate)
.await
.map(|(spec, session)| Cursor::new(client.clone(), spec, session))
}
}