pub mod auth;
mod executor;
pub mod options;
mod session;
use std::{sync::Arc, time::Duration};
use derivative::Derivative;
use time::PreciseTime;
#[cfg(test)]
use crate::options::StreamAddress;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
db::Database,
error::{ErrorKind, Result},
event::command::CommandEventHandler,
operation::ListDatabases,
options::{
ClientOptions,
DatabaseOptions,
ListDatabasesOptions,
ReadPreference,
SelectionCriteria,
},
sdam::{Server, SessionSupportStatus, Topology},
};
pub(crate) use session::{ClientSession, ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};
const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Clone, Debug)]
pub struct Client {
inner: Arc<ClientInner>,
}
#[derive(Derivative)]
#[derivative(Debug)]
struct ClientInner {
topology: Topology,
options: ClientOptions,
session_pool: ServerSessionPool,
}
impl Client {
pub async fn with_uri_str(uri: &str) -> Result<Self> {
let options = ClientOptions::parse(uri).await?;
Client::with_options(options)
}
pub fn with_options(options: ClientOptions) -> Result<Self> {
options.validate()?;
let inner = Arc::new(ClientInner {
topology: Topology::new(options.clone())?,
session_pool: ServerSessionPool::new(),
options,
});
Ok(Self { inner })
}
pub(crate) fn emit_command_event(&self, emit: impl FnOnce(&Arc<dyn CommandEventHandler>)) {
if let Some(ref handler) = self.inner.options.command_event_handler {
emit(handler);
}
}
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.inner.options.selection_criteria.as_ref()
}
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.inner.options.read_concern.as_ref()
}
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.inner.options.write_concern.as_ref()
}
pub fn database(&self, name: &str) -> Database {
Database::new(self.clone(), name, None)
}
pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
Database::new(self.clone(), name, Some(options))
}
pub async fn list_databases(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<Document>> {
let op = ListDatabases::new(filter.into(), false, options.into());
self.execute_operation(op).await
}
pub async fn list_database_names(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<String>> {
let op = ListDatabases::new(filter.into(), true, options.into());
match self.execute_operation(op).await {
Ok(databases) => databases
.into_iter()
.map(|doc| {
let name = doc.get("name").and_then(Bson::as_str).ok_or_else(|| {
ErrorKind::ResponseError {
message: "Expected \"name\" field in server response, but it was not \
found"
.to_string(),
}
})?;
Ok(name.to_string())
})
.collect(),
Err(e) => Err(e),
}
}
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
let session_support_status = self.inner.topology.session_support_status().await;
if let SessionSupportStatus::Supported {
logical_session_timeout,
} = session_support_status
{
self.inner
.session_pool
.check_in(session, logical_session_timeout)
.await;
}
}
pub(crate) async fn start_implicit_session_with_timeout(
&self,
logical_session_timeout: Duration,
) -> ClientSession {
ClientSession::new_implicit(
self.inner
.session_pool
.check_out(logical_session_timeout)
.await,
self.clone(),
)
}
#[cfg(test)]
pub(crate) async fn clear_session_pool(&self) {
self.inner.session_pool.clear().await;
}
#[cfg(test)]
pub(crate) async fn is_session_checked_in(&self, id: &Document) -> bool {
self.inner.session_pool.contains(id).await
}
#[cfg(test)]
pub(crate) async fn test_select_server(
&self,
criteria: Option<&SelectionCriteria>,
) -> Result<StreamAddress> {
let server = self.select_server(criteria).await?;
Ok(server.address.clone())
}
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<Arc<Server>> {
let criteria =
criteria.unwrap_or_else(|| &SelectionCriteria::ReadPreference(ReadPreference::Primary));
let start_time = PreciseTime::now();
let timeout = time::Duration::from_std(
self.inner
.options
.server_selection_timeout
.unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT),
)?;
loop {
let selected_server = self
.inner
.topology
.attempt_to_select_server(criteria)
.await?;
if let Some(server) = selected_server {
return Ok(server);
}
self.inner.topology.request_topology_check();
let time_passed = start_time.to(PreciseTime::now());
let time_remaining = std::cmp::max(time::Duration::zero(), timeout - time_passed);
let message_received = self
.inner
.topology
.wait_for_topology_change(time_remaining.to_std()?)
.await;
if !message_received {
return Err(ErrorKind::ServerSelectionError {
message: self
.inner
.topology
.server_selection_timeout_error_message(&criteria)
.await,
}
.into());
}
}
}
}