pub mod auth;
mod executor;
pub mod options;
pub mod session;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use derivative::Derivative;
#[cfg(test)]
use crate::options::ServerAddress;
use crate::{
bson::Document,
change_stream::{
event::ChangeStreamEvent,
options::ChangeStreamOptions,
session::SessionChangeStream,
ChangeStream,
},
concern::{ReadConcern, WriteConcern},
db::Database,
error::{ErrorKind, Result},
event::command::CommandEventHandler,
operation::{AggregateTarget, ListDatabases},
options::{
ClientOptions,
DatabaseOptions,
ListDatabasesOptions,
ReadPreference,
SelectionCriteria,
SessionOptions,
},
results::DatabaseSpecification,
sdam::{server_selection, SelectedServer, SessionSupportStatus, Topology},
ClientSession,
};
pub(crate) use executor::{HELLO_COMMAND_NAMES, REDACTED_COMMANDS};
pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};
/// Fallback server selection timeout (30 seconds). `Client::select_server` uses this
/// value when `ClientOptions::server_selection_timeout` is `None`.
const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Entry point of this module: a handle through which databases, sessions and change
/// streams are obtained.
///
/// The only field is an `Arc<ClientInner>`, so cloning a `Client` clones the `Arc` and
/// every clone shares the same `ClientInner` (topology, options and session pool).
#[derive(Clone, Debug)]
pub struct Client {
// Shared state; see `ClientInner`.
inner: Arc<ClientInner>,
}
/// State shared by every clone of a `Client` (held behind an `Arc`).
#[derive(Derivative)]
#[derivative(Debug)]
struct ClientInner {
// Built in `Client::with_options` from a clone of the options; consulted for server
// selection and session support status.
topology: Topology,
// The options the client was constructed with; source of the client-level
// selection criteria, read/write concern, default database and event handler.
options: ClientOptions,
// Pool that `ServerSession`s are checked out of and back into.
session_pool: ServerSessionPool,
}
impl Client {
pub async fn with_uri_str(uri: impl AsRef<str>) -> Result<Self> {
let options = ClientOptions::parse_uri(uri.as_ref(), None).await?;
Client::with_options(options)
}
/// Creates a `Client` from already-built `ClientOptions`.
///
/// The options are validated first; a topology is then created from a clone of them
/// and paired with a fresh, empty server session pool.
///
/// # Errors
///
/// Returns an error if `options.validate()` fails or if the topology cannot be
/// constructed.
pub fn with_options(options: ClientOptions) -> Result<Self> {
    options.validate()?;

    // Same construction order as the struct literal it replaces: topology first,
    // then the pool, and only then are the options moved into place.
    let topology = Topology::new(options.clone())?;
    let session_pool = ServerSessionPool::new();

    Ok(Self {
        inner: Arc::new(ClientInner {
            topology,
            options,
            session_pool,
        }),
    })
}
/// Runs `emit` against the command event handler configured in the client options.
///
/// When no handler is configured, `emit` is dropped without being called.
pub(crate) fn emit_command_event(&self, emit: impl FnOnce(&Arc<dyn CommandEventHandler>)) {
    let configured = self.inner.options.command_event_handler.as_ref();
    if let Some(handler) = configured {
        emit(handler);
    }
}
/// Returns the selection criteria stored in this client's options, or `None` if none
/// was configured.
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
    let opts = &self.inner.options;
    opts.selection_criteria.as_ref()
}
/// Returns the read concern stored in this client's options, or `None` if none was
/// configured.
pub fn read_concern(&self) -> Option<&ReadConcern> {
    let opts = &self.inner.options;
    opts.read_concern.as_ref()
}
/// Returns the write concern stored in this client's options, or `None` if none was
/// configured.
pub fn write_concern(&self) -> Option<&WriteConcern> {
    let opts = &self.inner.options;
    opts.write_concern.as_ref()
}
/// Returns a `Database` handle for `name`, built from a clone of this client and no
/// database-specific options.
pub fn database(&self, name: &str) -> Database {
    let client = self.clone();
    Database::new(client, name, None)
}
/// Returns a `Database` handle for `name`, built from a clone of this client and the
/// supplied `options`.
pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
    let client = self.clone();
    Database::new(client, name, Some(options))
}
/// Returns a handle to the default database named in the client options.
///
/// Yields `None` when the options carry no `default_database`; otherwise this is
/// equivalent to calling [`Client::database`] with that name.
pub fn default_database(&self) -> Option<Database> {
    let db_name = self.inner.options.default_database.as_ref()?;
    Some(self.database(db_name))
}
async fn list_databases_common(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
session: Option<&mut ClientSession>,
) -> Result<Vec<DatabaseSpecification>> {
let op = ListDatabases::new(filter.into(), false, options.into());
self.execute_operation(op, session).await.and_then(|dbs| {
dbs.into_iter()
.map(|db_spec| {
bson::from_slice(db_spec.as_bytes()).map_err(crate::error::Error::from)
})
.collect()
})
}
pub async fn list_databases(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<DatabaseSpecification>> {
self.list_databases_common(filter, options, None).await
}
/// Lists the databases matching `filter` using the provided `session`.
///
/// Delegates to `list_databases_common`, passing `session` through.
///
/// # Errors
///
/// Propagates any error from `list_databases_common`.
pub async fn list_databases_with_session(
    &self,
    filter: impl Into<Option<Document>>,
    options: impl Into<Option<ListDatabasesOptions>>,
    session: &mut ClientSession,
) -> Result<Vec<DatabaseSpecification>> {
    let explicit_session = Some(session);
    self.list_databases_common(filter, options, explicit_session)
        .await
}
pub async fn list_database_names(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<String>> {
let op = ListDatabases::new(filter.into(), true, options.into());
match self.execute_operation(op, None).await {
Ok(databases) => databases
.into_iter()
.map(|doc| {
let name = doc
.get_str("name")
.map_err(|_| ErrorKind::InvalidResponse {
message: "Expected \"name\" field in server response, but it was not \
found"
.to_string(),
})?;
Ok(name.to_string())
})
.collect(),
Err(e) => Err(e),
}
}
/// Starts a new explicit (non-implicit) `ClientSession`.
///
/// If `options` are supplied they are validated first. The session is only created
/// when the deployment reports session support; the reported logical session
/// timeout is forwarded to `start_session_with_timeout`.
///
/// # Errors
///
/// Returns an error if the options fail validation, if the session support status
/// cannot be determined, or `ErrorKind::SessionsNotSupported` when the status is
/// anything other than `Supported`.
pub async fn start_session(
    &self,
    options: impl Into<Option<SessionOptions>>,
) -> Result<ClientSession> {
    let options = options.into();
    if let Some(session_options) = options.as_ref() {
        session_options.validate()?;
    }

    let status = self.get_session_support_status().await?;
    if let SessionSupportStatus::Supported {
        logical_session_timeout,
    } = status
    {
        let session = self
            .start_session_with_timeout(logical_session_timeout, options, false)
            .await;
        Ok(session)
    } else {
        Err(ErrorKind::SessionsNotSupported.into())
    }
}
/// Opens a change stream over the whole cluster.
///
/// `all_changes_for_cluster` is forced to `Some(true)` on the (possibly defaulted)
/// options and the aggregation targets the `"admin"` database.
/// NOTE(review): `resolve_options!` presumably fills in `read_concern` and
/// `selection_criteria` from the client's own settings when the caller left them
/// unset — confirm against the macro definition.
///
/// # Errors
///
/// Propagates any error from `execute_watch`.
pub async fn watch(
    &self,
    pipeline: impl IntoIterator<Item = Document>,
    options: impl Into<Option<ChangeStreamOptions>>,
) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
    let mut options = options.into();
    resolve_options!(self, options, [read_concern, selection_criteria]);

    // Materialize default options if the caller passed none, then mark the stream
    // as cluster-wide.
    let stream_options = options.get_or_insert_with(Default::default);
    stream_options.all_changes_for_cluster = Some(true);

    let admin = AggregateTarget::Database("admin".to_string());
    self.execute_watch(pipeline, options, admin, None).await
}
/// Opens a change stream over the whole cluster using the provided `session`.
///
/// `all_changes_for_cluster` is forced to `Some(true)` on the (possibly defaulted)
/// options and the aggregation targets the `"admin"` database.
/// NOTE(review): the two `resolve_*_with_session!` macros presumably reconcile the
/// read concern and selection criteria with the session's state and can fail —
/// confirm against the macro definitions.
///
/// # Errors
///
/// Returns an error if either resolution macro fails, or any error from
/// `execute_watch_with_session`.
pub async fn watch_with_session(
    &self,
    pipeline: impl IntoIterator<Item = Document>,
    options: impl Into<Option<ChangeStreamOptions>>,
    session: &mut ClientSession,
) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
    let mut options = options.into();
    resolve_read_concern_with_session!(self, options, Some(&mut *session))?;
    resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;

    // Materialize default options if the caller passed none, then mark the stream
    // as cluster-wide.
    let stream_options = options.get_or_insert_with(Default::default);
    stream_options.all_changes_for_cluster = Some(true);

    let admin = AggregateTarget::Database("admin".to_string());
    self.execute_watch_with_session(pipeline, options, admin, None, session)
        .await
}
/// Returns `session` to the server session pool.
///
/// The session is only checked in when the topology currently reports session
/// support; otherwise it is simply dropped when this function returns.
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
    let pool = &self.inner.session_pool;
    let status = self.inner.topology.session_support_status();

    if let SessionSupportStatus::Supported {
        logical_session_timeout,
    } = status
    {
        pool.check_in(session, logical_session_timeout).await;
    }
}
/// Builds a `ClientSession` around a server session checked out of the pool.
///
/// `logical_session_timeout` is forwarded to the pool's `check_out`; `options` and
/// `is_implicit` are passed straight through to `ClientSession::new` together with
/// a clone of this client.
pub(crate) async fn start_session_with_timeout(
    &self,
    logical_session_timeout: Option<Duration>,
    options: Option<SessionOptions>,
    is_implicit: bool,
) -> ClientSession {
    let server_session = self
        .inner
        .session_pool
        .check_out(logical_session_timeout)
        .await;
    let client = self.clone();
    ClientSession::new(server_session, client, options, is_implicit)
}
/// Test-only helper: clears the server session pool.
#[cfg(test)]
pub(crate) async fn clear_session_pool(&self) {
    let pool = &self.inner.session_pool;
    pool.clear().await;
}
/// Test-only helper: reports whether the session pool contains a session with the
/// given `id`.
#[cfg(test)]
pub(crate) async fn is_session_checked_in(&self, id: &Document) -> bool {
    let pool = &self.inner.session_pool;
    pool.contains(id).await
}
/// Test-only helper: runs server selection with `criteria` and returns the address
/// of the selected server.
///
/// # Errors
///
/// Propagates any error from `select_server`.
#[cfg(test)]
pub(crate) async fn test_select_server(
    &self,
    criteria: Option<&SelectionCriteria>,
) -> Result<ServerAddress> {
    let selected = self.select_server(criteria).await?;
    let address = selected.address.clone();
    Ok(address)
}
/// Selects a server that satisfies `criteria`, waiting for topology updates until
/// the server selection timeout elapses.
///
/// When `criteria` is `None`, a read preference of `Primary` is used. The timeout
/// is `ClientOptions::server_selection_timeout`, falling back to
/// `DEFAULT_SERVER_SELECTION_TIMEOUT`.
///
/// Each iteration tries to select from the latest observed topology state. If no
/// server is suitable, a topology update is requested and the task waits for a
/// change for at most the time remaining before the deadline.
///
/// # Errors
///
/// Returns any error from `attempt_to_select_server`, or
/// `ErrorKind::ServerSelection` when the deadline passes (or the wait reports no
/// update) without a suitable server being found.
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<SelectedServer> {
    let criteria =
        criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
    let start_time = Instant::now();
    let timeout = self
        .inner
        .options
        .server_selection_timeout
        .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
    let mut watcher = self.inner.topology.watch();
    loop {
        let state = watcher.observe_latest();
        if let Some(server) = server_selection::attempt_to_select_server(
            criteria,
            &state.description,
            &state.servers,
        )? {
            return Ok(server);
        }
        self.inner.topology.request_update();

        // Sample the clock exactly once. Reading `elapsed()` a second time for the
        // subtraction could observe a value >= `timeout` even though the comparison
        // just passed, and `Duration - Duration` panics on underflow.
        let elapsed = start_time.elapsed();
        let change_occurred =
            elapsed < timeout && watcher.wait_for_update(timeout - elapsed).await;
        if !change_occurred {
            return Err(ErrorKind::ServerSelection {
                message: self
                    .inner
                    .topology
                    .server_selection_timeout_error_message(criteria),
            }
            .into());
        }
    }
}
/// Test-only helper: returns the `Display` rendering of every server address in
/// the latest topology state.
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
pub(crate) async fn get_hosts(&self) -> Vec<String> {
    // Keep `watcher` and `state` as named locals so the borrowed state is released
    // before the watcher goes away.
    let watcher = self.inner.topology.watch();
    let state = watcher.peek_latest();

    let mut hosts = Vec::new();
    for stream_address in state.servers.keys() {
        hosts.push(stream_address.to_string());
    }
    hosts
}
/// Test-only helper: awaits the topology's `sync_workers`.
#[cfg(test)]
pub(crate) async fn sync_workers(&self) {
    let topology = &self.inner.topology;
    topology.sync_workers().await;
}
/// Test-only helper: returns a clone of the description held in the latest
/// topology state.
#[cfg(test)]
pub(crate) fn topology_description(&self) -> crate::sdam::TopologyDescription {
    let watcher = self.inner.topology.watch();
    let state = watcher.peek_latest();
    state.description.clone()
}
}