pub mod auth;
mod executor;
pub mod options;
use std::{
sync::{Arc, RwLock},
time::Duration,
};
use time::PreciseTime;
use bson::{Bson, Document};
use derivative::Derivative;
use crate::{
concern::{ReadConcern, WriteConcern},
db::Database,
error::{ErrorKind, Result},
event::command::CommandEventHandler,
operation::ListDatabases,
options::{ClientOptions, DatabaseOptions},
sdam::{Server, Topology, TopologyUpdateCondvar},
selection_criteria::{ReadPreference, SelectionCriteria},
};
/// Fallback server selection timeout (30 seconds), used by `Client::select_server` when
/// `ClientOptions::server_selection_timeout` is `None`.
const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Handle to the shared client state.
///
/// The only field is an `Arc<ClientInner>`, so the derived `Clone` copies the pointer and every
/// clone refers to the same `ClientInner`.
#[derive(Clone, Debug)]
pub struct Client {
/// Reference-counted pointer to the state shared by all clones of this handle.
inner: Arc<ClientInner>,
}
/// State shared by every clone of a `Client`.
#[derive(Derivative)]
#[derivative(Debug)]
struct ClientInner {
/// Topology behind a read-write lock; `Client::select_server` read-locks and clones it.
topology: Arc<RwLock<Topology>>,
/// Options the client was constructed with.
options: ClientOptions,
/// Condvar passed to `Topology::new` at construction and waited on by
/// `Client::select_server`; excluded from the derived `Debug` output.
#[derivative(Debug = "ignore")]
condvar: TopologyUpdateCondvar,
}
impl Client {
    /// Creates a `Client` from a connection string.
    ///
    /// The string is parsed with `ClientOptions::parse` and the result is handed to
    /// [`Client::with_options`].
    ///
    /// # Errors
    ///
    /// Returns an error if parsing the URI fails or if `with_options` fails.
    pub fn with_uri_str(uri: &str) -> Result<Self> {
        let options = ClientOptions::parse(uri)?;
        Client::with_options(options)
    }

    /// Creates a `Client` from already-built options.
    ///
    /// A fresh `TopologyUpdateCondvar` is created; a clone of it and a clone of `options` are
    /// passed to `Topology::new`, and the originals are stored alongside the topology.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Topology::new`, if any.
    pub fn with_options(options: ClientOptions) -> Result<Self> {
        let condvar = TopologyUpdateCondvar::new();
        let inner = Arc::new(ClientInner {
            topology: Topology::new(condvar.clone(), options.clone())?,
            condvar,
            options,
        });

        Ok(Self { inner })
    }

    /// Invokes `emit` with the configured command event handler.
    ///
    /// Does nothing when no `command_event_handler` is set in the client options.
    pub(crate) fn emit_command_event(&self, emit: impl FnOnce(&Arc<dyn CommandEventHandler>)) {
        if let Some(ref handler) = self.inner.options.command_event_handler {
            emit(handler);
        }
    }

    /// Returns the selection criteria from the client options, if any was set.
    pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
        self.inner.options.selection_criteria.as_ref()
    }

    /// Returns the read concern from the client options, if any was set.
    pub fn read_concern(&self) -> Option<&ReadConcern> {
        self.inner.options.read_concern.as_ref()
    }

    /// Returns the write concern from the client options, if any was set.
    pub fn write_concern(&self) -> Option<&WriteConcern> {
        self.inner.options.write_concern.as_ref()
    }

    /// Returns a `Database` named `name`, built from a clone of this client and no
    /// database-specific options.
    pub fn database(&self, name: &str) -> Database {
        Database::new(self.clone(), name, None)
    }

    /// Returns a `Database` named `name`, built from a clone of this client and the given
    /// `options`.
    pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
        Database::new(self.clone(), name, Some(options))
    }

    /// Executes a `ListDatabases` operation with the optional `filter` and returns the resulting
    /// documents.
    ///
    /// # Errors
    ///
    /// Returns any error produced while executing the operation.
    pub fn list_databases(&self, filter: impl Into<Option<Document>>) -> Result<Vec<Document>> {
        // NOTE(review): the `false` flag presumably asks for full database documents rather than
        // names only -- confirm against `ListDatabases::new`.
        let op = ListDatabases::new(filter.into(), false);
        self.execute_operation(&op, None)
    }

    /// Executes a `ListDatabases` operation with the optional `filter` and returns the string
    /// value of the `"name"` field of each returned document.
    ///
    /// # Errors
    ///
    /// Returns any error produced while executing the operation, or
    /// `ErrorKind::ResponseError` if a returned document has no string-valued `"name"` field.
    pub fn list_database_names(&self, filter: impl Into<Option<Document>>) -> Result<Vec<String>> {
        // NOTE(review): the `true` flag presumably asks the server for names only -- confirm
        // against `ListDatabases::new`.
        let op = ListDatabases::new(filter.into(), true);

        self.execute_operation(&op, None)?
            .into_iter()
            .map(|doc| {
                let name = doc.get("name").and_then(Bson::as_str).ok_or_else(|| {
                    ErrorKind::ResponseError {
                        message: "Expected \"name\" field in server response, but it was not \
                                  found"
                            .to_string(),
                    }
                })?;
                Ok(name.to_string())
            })
            .collect()
    }

    /// Returns a new handle to the shared, lock-protected topology.
    fn topology(&self) -> Arc<RwLock<Topology>> {
        self.inner.topology.clone()
    }

    /// Selects a server matching `criteria`, waiting up to the server selection timeout.
    ///
    /// When `criteria` is `None`, a primary read preference is used. The timeout is
    /// `server_selection_timeout` from the client options, or
    /// `DEFAULT_SERVER_SELECTION_TIMEOUT` when that is unset.
    ///
    /// Each iteration clones the current topology, tries to select a server from its
    /// description, and on failure requests a topology check and waits on the condvar for the
    /// remaining time before retrying.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::ServerSelectionError` if the topology description reports a compatibility
    ///   error, or if no server was selected before the timeout elapsed.
    /// * Any error returned by the topology description's `select_server`.
    ///
    /// # Panics
    ///
    /// Panics if the topology lock is poisoned.
    fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<Arc<Server>> {
        let criteria =
            criteria.unwrap_or_else(|| &SelectionCriteria::ReadPreference(ReadPreference::Primary));

        let start_time = PreciseTime::now();
        let timeout = self
            .inner
            .options
            .server_selection_timeout
            .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);

        while start_time.to(PreciseTime::now()).to_std().unwrap() < timeout {
            // Work on a snapshot so the read lock is released before selecting and waiting.
            let topology = self.inner.topology.read().unwrap().clone();

            if let Some(error_msg) = topology.description.compatibility_error() {
                return Err(ErrorKind::ServerSelectionError {
                    message: error_msg.into(),
                }
                .into());
            }

            let server = topology
                .description
                .select_server(&criteria)?
                .and_then(|server| topology.servers.get(&server.address));

            if let Some(server) = server {
                return Ok(server.clone());
            }

            topology.request_topology_check();

            // Time has passed since the loop condition was evaluated, so the deadline may
            // already be behind us. A plain `timeout - elapsed` would panic on underflow;
            // instead fall through to the timeout error below.
            let elapsed = start_time.to(PreciseTime::now()).to_std().unwrap();
            match timeout.checked_sub(elapsed) {
                Some(remaining) => {
                    self.inner.condvar.wait_timeout(remaining);
                }
                None => break,
            }
        }

        Err(ErrorKind::ServerSelectionError {
            message: self
                .inner
                .topology
                .read()
                .unwrap()
                .description
                .server_selection_timeout_error_message(&criteria),
        }
        .into())
    }
}