pub mod auth;
mod executor;
pub mod options;
pub mod session;
use std::{sync::Arc, time::Duration};
use bson::Bson;
use derivative::Derivative;
use std::time::Instant;
#[cfg(test)]
use crate::options::ServerAddress;
use crate::{
bson::Document,
concern::{ReadConcern, WriteConcern},
db::Database,
error::{ErrorKind, Result},
event::command::CommandEventHandler,
operation::ListDatabases,
options::{
ClientOptions,
DatabaseOptions,
ListDatabasesOptions,
ReadPreference,
SelectionCriteria,
SessionOptions,
},
results::DatabaseSpecification,
sdam::{SelectedServer, SessionSupportStatus, Topology},
ClientSession,
};
pub(crate) use executor::{HELLO_COMMAND_NAMES, REDACTED_COMMANDS};
pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};
/// Fallback server selection timeout (30 seconds).
///
/// `Client::select_server` uses this value when the client's options carry no
/// `server_selection_timeout`.
const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Handle used to obtain databases, list databases, start sessions and select servers.
///
/// The only field is an `Arc<ClientInner>`, so cloning a `Client` produces another handle to
/// the same shared inner state rather than a copy of it.
#[derive(Clone, Debug)]
pub struct Client {
// Shared state; see `ClientInner`.
inner: Arc<ClientInner>,
}
/// State shared by every clone of a `Client`.
///
/// `Debug` is generated through the `derivative` crate's derive macro.
#[derive(Derivative)]
#[derivative(Debug)]
struct ClientInner {
/// Topology used for server selection; it is closed when this value is dropped.
topology: Topology,
/// Options the client was created with.
options: ClientOptions,
/// Pool that server sessions are checked out of and checked back into.
session_pool: ServerSessionPool,
}
/// Closes the topology once the shared client state is dropped.
impl Drop for ClientInner {
    fn drop(&mut self) {
        // The only teardown performed here is delegating to the topology.
        self.topology.close();
    }
}
impl Client {
pub async fn with_uri_str(uri: impl AsRef<str>) -> Result<Self> {
let options = ClientOptions::parse_uri(uri.as_ref(), None).await?;
Client::with_options(options)
}
/// Creates a `Client` from already-constructed options.
///
/// The options are validated first; a topology is then created from a clone of them, and a
/// fresh session pool is set up alongside.
///
/// # Errors
///
/// Returns the error from `ClientOptions::validate` or from `Topology::new` if either fails.
pub fn with_options(options: ClientOptions) -> Result<Self> {
    options.validate()?;

    // The topology receives its own copy; the original options stay with the client.
    let topology = Topology::new(options.clone())?;
    let shared_state = ClientInner {
        topology,
        session_pool: ServerSessionPool::new(),
        options,
    };

    Ok(Self {
        inner: Arc::new(shared_state),
    })
}
/// Invokes `emit` with the configured command event handler.
///
/// When the client's options hold no `command_event_handler`, `emit` is never called.
pub(crate) fn emit_command_event(&self, emit: impl FnOnce(&Arc<dyn CommandEventHandler>)) {
    let configured_handler = self.inner.options.command_event_handler.as_ref();
    if let Some(handler) = configured_handler {
        emit(handler);
    }
}
/// Returns the selection criteria stored in this client's options, or `None` if unset.
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
    let client_options = &self.inner.options;
    client_options.selection_criteria.as_ref()
}
/// Returns the read concern stored in this client's options, or `None` if unset.
pub fn read_concern(&self) -> Option<&ReadConcern> {
    let client_options = &self.inner.options;
    client_options.read_concern.as_ref()
}
/// Returns the write concern stored in this client's options, or `None` if unset.
pub fn write_concern(&self) -> Option<&WriteConcern> {
    let client_options = &self.inner.options;
    client_options.write_concern.as_ref()
}
/// Returns a `Database` handle for `name`, built from a clone of this client and no
/// explicit database options.
pub fn database(&self, name: &str) -> Database {
    let client_handle = self.clone();
    Database::new(client_handle, name, None)
}
/// Returns a `Database` handle for `name`, built from a clone of this client and the
/// supplied `options`.
pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
    let client_handle = self.clone();
    Database::new(client_handle, name, Some(options))
}
async fn list_databases_common(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
session: Option<&mut ClientSession>,
) -> Result<Vec<DatabaseSpecification>> {
let op = ListDatabases::new(filter.into(), false, options.into());
self.execute_operation(op, session).await.and_then(|dbs| {
dbs.into_iter()
.map(|db_spec| bson::from_document(db_spec).map_err(crate::error::Error::from))
.collect()
})
}
/// Lists the databases visible to this client as `DatabaseSpecification` values.
///
/// Forwards `filter` and `options` to `list_databases_common` without an explicit session.
pub async fn list_databases(
    &self,
    filter: impl Into<Option<Document>>,
    options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<DatabaseSpecification>> {
    let listing = self.list_databases_common(filter, options, None);
    listing.await
}
/// Lists the databases visible to this client as `DatabaseSpecification` values, using the
/// provided `session`.
///
/// Forwards `filter`, `options` and the session to `list_databases_common`.
pub async fn list_databases_with_session(
    &self,
    filter: impl Into<Option<Document>>,
    options: impl Into<Option<ListDatabasesOptions>>,
    session: &mut ClientSession,
) -> Result<Vec<DatabaseSpecification>> {
    let listing = self.list_databases_common(filter, options, Some(session));
    listing.await
}
/// Returns the names of the databases visible to this client.
///
/// Builds a `ListDatabases` operation from `filter` and `options` (its middle argument is
/// `true`), executes it without an explicit session, and extracts the string-valued `"name"`
/// field from every returned document.
///
/// # Errors
///
/// Returns the error from executing the operation, or `ErrorKind::InvalidResponse` if a
/// returned document has no `"name"` field holding a string.
pub async fn list_database_names(
    &self,
    filter: impl Into<Option<Document>>,
    options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<String>> {
    let op = ListDatabases::new(filter.into(), true, options.into());
    // Propagate execution failures directly instead of matching just to re-wrap `Err`.
    let databases = self.execute_operation(op, None).await?;

    databases
        .into_iter()
        .map(|doc| {
            let name = doc.get("name").and_then(Bson::as_str).ok_or_else(|| {
                ErrorKind::InvalidResponse {
                    message: "Expected \"name\" field in server response, but it was not found"
                        .to_string(),
                }
            })?;
            Ok(name.to_string())
        })
        .collect()
}
/// Starts a new explicit `ClientSession` with the given `options`.
///
/// The session support status is queried first. Only the `Supported` status leads to a
/// session being created, using the logical session timeout carried by that status.
///
/// # Errors
///
/// Returns the error from querying the session support status, or
/// `ErrorKind::SessionsNotSupported` for any status other than `Supported`.
pub async fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
    let support_status = self.get_session_support_status().await?;

    if let SessionSupportStatus::Supported {
        logical_session_timeout,
    } = support_status
    {
        // `false`: the session is not marked implicit.
        let session = self
            .start_session_with_timeout(logical_session_timeout, options, false)
            .await;
        return Ok(session);
    }

    Err(ErrorKind::SessionsNotSupported.into())
}
/// Returns `session` to the session pool when the topology reports sessions as supported.
///
/// For any other support status the session is not checked in and is simply dropped when
/// this function returns.
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
    let support_status = self.inner.topology.session_support_status().await;

    match support_status {
        SessionSupportStatus::Supported {
            logical_session_timeout,
        } => {
            let pool = &self.inner.session_pool;
            pool.check_in(session, logical_session_timeout).await;
        }
        _ => {}
    }
}
/// Checks a server session out of the pool and wraps it in a `ClientSession`.
///
/// `logical_session_timeout` is passed to the pool's `check_out`; `options` and
/// `is_implicit` are passed through to `ClientSession::new` together with a clone of this
/// client.
pub(crate) async fn start_session_with_timeout(
    &self,
    logical_session_timeout: Option<Duration>,
    options: Option<SessionOptions>,
    is_implicit: bool,
) -> ClientSession {
    let pool = &self.inner.session_pool;
    let server_session = pool.check_out(logical_session_timeout).await;
    ClientSession::new(server_session, self.clone(), options, is_implicit)
}
/// Test-only helper that clears the client's server session pool.
#[cfg(test)]
pub(crate) async fn clear_session_pool(&self) {
    let pool = &self.inner.session_pool;
    pool.clear().await;
}
/// Test-only helper reporting whether the session pool contains a session with the given `id`.
#[cfg(test)]
pub(crate) async fn is_session_checked_in(&self, id: &Document) -> bool {
    let pool = &self.inner.session_pool;
    pool.contains(id).await
}
/// Test-only helper that runs server selection and returns the selected server's address.
///
/// # Errors
///
/// Returns whatever error `select_server` produces.
#[cfg(test)]
pub(crate) async fn test_select_server(
    &self,
    criteria: Option<&SelectionCriteria>,
) -> Result<ServerAddress> {
    let selected = self.select_server(criteria).await?;
    let address = selected.address.clone();
    Ok(address)
}
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<SelectedServer> {
let criteria =
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
let start_time = Instant::now();
let timeout = self
.inner
.options
.server_selection_timeout
.unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
loop {
let mut topology_change_subscriber =
self.inner.topology.subscribe_to_topology_changes();
let selected_server = self
.inner
.topology
.attempt_to_select_server(criteria)
.await?;
if let Some(server) = selected_server {
return Ok(server);
}
self.inner.topology.request_topology_check();
let time_passed = start_time.elapsed();
let time_remaining = timeout
.checked_sub(time_passed)
.unwrap_or_else(|| Duration::from_millis(0));
let change_occurred = topology_change_subscriber
.wait_for_message(time_remaining)
.await;
if !change_occurred {
return Err(ErrorKind::ServerSelection {
message: self
.inner
.topology
.server_selection_timeout_error_message(criteria)
.await,
}
.into());
}
}
}
/// Test-only helper (non-`sync` builds) returning the topology's servers rendered as strings.
///
/// Each entry yielded by the topology's `servers()` is converted with its `Display`
/// implementation.
#[cfg(all(test, not(feature = "sync")))]
pub(crate) async fn get_hosts(&self) -> Vec<String> {
    let servers = self.inner.topology.servers().await;
    servers
        .iter()
        // `to_string()` goes through `Display`, exactly like `format!("{}", ..)`.
        .map(|stream_address| stream_address.to_string())
        .collect()
}
}