1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
pub mod session;
use super::{ChangeStream, ClientSession, Database, SessionChangeStream};
use crate::{
bson::Document,
change_stream::{event::ChangeStreamEvent, options::ChangeStreamOptions},
concern::{ReadConcern, WriteConcern},
error::Result,
options::{
ClientOptions,
DatabaseOptions,
ListDatabasesOptions,
SelectionCriteria,
SessionOptions,
},
results::DatabaseSpecification,
runtime,
Client as AsyncClient,
};
/// This is the main entry point for the synchronous API. A `Client` is used to connect to a MongoDB
/// cluster. By default, it will monitor the topology of the cluster, keeping track of any changes,
/// such as servers being added or removed.
///
/// `Client` is a wrapper around the asynchronous [`mongodb::Client`](../struct.Client.html), and it
/// starts up an async-std runtime internally to run that wrapped client on.
///
/// `Client` uses [`std::sync::Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html) internally,
/// so it can safely be shared across threads. For example:
///
/// ```rust
/// # use mongodb::{bson::Document, sync::Client, error::Result};
/// #
/// # fn start_workers() -> Result<()> {
/// let client = Client::with_uri_str("mongodb://example.com")?;
///
/// for i in 0..5 {
/// let client_ref = client.clone();
///
/// std::thread::spawn(move || {
/// let collection = client_ref.database("items").collection::<Document>(&format!("coll{}", i));
///
/// // Do something with the collection
/// });
/// }
/// #
/// # // Technically we should join the threads here, but for the purpose of the example, we'll just
/// # // sleep for a bit.
/// # std::thread::sleep(std::time::Duration::from_secs(3));
/// # Ok(())
/// # }
/// ```
///
/// ## TCP Keepalive
/// TCP keepalive is enabled by default with ``tcp_keepalive_time`` set to 120 seconds. The
/// driver does not set ``tcp_keepalive_intvl``. See the
/// [MongoDB Diagnostics FAQ keepalive section](https://docs.mongodb.com/manual/faq/diagnostics/#does-tcp-keepalive-time-affect-mongodb-deployments)
/// for instructions on setting these values at the system level.
#[derive(Clone, Debug)]
pub struct Client {
/// The wrapped asynchronous client; the synchronous methods on `Client` delegate to it.
async_client: AsyncClient,
}
impl From<AsyncClient> for Client {
fn from(async_client: AsyncClient) -> Self {
Self { async_client }
}
}
impl Client {
/// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
/// MongoDB connection string.
///
/// See the documentation on
/// [`ClientOptions::parse`](../options/struct.ClientOptions.html#method.parse) for more
/// details.
pub fn with_uri_str(uri: impl AsRef<str>) -> Result<Self> {
let async_client = runtime::block_on(AsyncClient::with_uri_str(uri.as_ref()))?;
Ok(Self { async_client })
}
/// Creates a new `Client` connected to the cluster specified by `options`.
pub fn with_options(options: ClientOptions) -> Result<Self> {
let async_client = AsyncClient::with_options(options)?;
Ok(Self { async_client })
}
/// Gets the default selection criteria the `Client` uses for operations..
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.async_client.selection_criteria()
}
/// Gets the default read concern the `Client` uses for operations.
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.async_client.read_concern()
}
/// Gets the default write concern the `Client` uses for operations.
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.async_client.write_concern()
}
/// Gets a handle to a database specified by `name` in the cluster the `Client` is connected to.
/// The `Database` options (e.g. read preference and write concern) will default to those of the
/// `Client`.
///
/// This method does not send or receive anything across the wire to the database, so it can be
/// used repeatedly without incurring any costs from I/O.
pub fn database(&self, name: &str) -> Database {
Database::new(self.async_client.database(name))
}
/// Gets a handle to a database specified by `name` in the cluster the `Client` is connected to.
/// Operations done with this `Database` will use the options specified by `options` by default
/// and will otherwise default to those of the `Client`.
///
/// This method does not send or receive anything across the wire to the database, so it can be
/// used repeatedly without incurring any costs from I/O.
pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
Database::new(self.async_client.database_with_options(name, options))
}
/// Gets a handle to the default database specified in the `ClientOptions` or MongoDB connection
/// string used to construct this `Client`.
///
/// If no default database was specified, `None` will be returned.
pub fn default_database(&self) -> Option<Database> {
self.async_client.default_database().map(Database::new)
}
/// Gets information about each database present in the cluster the Client is connected to.
pub fn list_databases(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<DatabaseSpecification>> {
runtime::block_on(
self.async_client
.list_databases(filter.into(), options.into()),
)
}
/// Gets the names of the databases present in the cluster the Client is connected to.
pub fn list_database_names(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<String>> {
runtime::block_on(
self.async_client
.list_database_names(filter.into(), options.into()),
)
}
/// Starts a new `ClientSession`.
pub fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
runtime::block_on(self.async_client.start_session(options)).map(Into::into)
}
/// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
/// stream does not observe changes from system collections or the "config", "local" or
/// "admin" databases. Note that this method (`watch` on a cluster) is only supported in
/// MongoDB 4.0 or greater.
///
/// See the documentation [here](https://docs.mongodb.com/manual/changeStreams/) on change
/// streams.
///
/// Change streams require either a "majority" read concern or no read
/// concern. Anything else will cause a server error.
///
/// Note that using a `$project` stage to remove any of the `_id` `operationType` or `ns` fields
/// will cause an error. The driver requires these fields to support resumability. For
/// more information on resumability, see the documentation for
/// [`ChangeStream`](change_stream/struct.ChangeStream.html)
///
/// If the pipeline alters the structure of the returned events, the parsed type will need to be
/// changed via [`ChangeStream::with_type`].
pub fn watch(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<ChangeStreamOptions>>,
) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
runtime::block_on(self.async_client.watch(pipeline, options)).map(ChangeStream::new)
}
/// Starts a new [`SessionChangeStream`] that receives events for all changes in the cluster
/// using the provided [`ClientSession`]. See [`Client::watch`] for more information.
pub fn watch_with_session(
&self,
pipeline: impl IntoIterator<Item = Document>,
options: impl Into<Option<ChangeStreamOptions>>,
session: &mut ClientSession,
) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
runtime::block_on(self.async_client.watch_with_session(
pipeline,
options,
&mut session.async_client_session,
))
.map(SessionChangeStream::new)
}
}