#![cfg_attr(feature = "clippy", feature(plugin))]
#![cfg_attr(feature = "clippy", plugin(clippy))]
#![cfg_attr(feature = "clippy", allow(
doc_markdown,
double_parens,
match_wild_err_arm,
too_many_arguments,
))]
#![cfg_attr(feature = "clippy", warn(
cast_precision_loss,
enum_glob_use,
filter_map,
if_not_else,
invalid_upcast_comparisons,
items_after_statements,
mem_forget,
mut_mut,
mutex_integer,
non_ascii_literal,
nonminimal_bool,
option_map_unwrap_or,
option_map_unwrap_or_else,
print_stdout,
shadow_reuse,
shadow_same,
shadow_unrelated,
similar_names,
unicode_not_nfc,
unseparated_literal_suffix,
used_underscore_binding,
wrong_pub_self_convention,
))]
#[doc(html_root_url = "https://docs.rs/mongodb")]
#[macro_use]
extern crate bitflags;
extern crate bson;
extern crate bufstream;
extern crate byteorder;
extern crate chrono;
extern crate data_encoding;
#[cfg(feature = "ssl")]
extern crate openssl;
extern crate rand;
#[macro_use]
extern crate scan_fmt;
extern crate semver;
extern crate serde;
#[macro_use(Serialize, Deserialize)]
extern crate serde_derive;
extern crate separator;
extern crate textnonce;
extern crate time;
extern crate md5;
extern crate sha1;
extern crate hmac;
extern crate pbkdf2;
extern crate hex;
extern crate trust_dns_resolver;
pub mod db;
pub mod coll;
pub mod common;
pub mod connstring;
pub mod cursor;
pub mod error;
pub mod gridfs;
pub mod pool;
pub mod r2d2_mongo;
pub mod stream;
pub mod topology;
pub mod wire_protocol;
mod apm;
mod auth;
mod command_type;
pub use bson::*;
pub use apm::{CommandStarted, CommandResult};
pub use command_type::CommandType;
pub use error::{Error, ErrorCode, Result};
use std::fmt;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicIsize, Ordering};
use apm::Listener;
use common::{ReadPreference, ReadMode, WriteConcern};
use connstring::{ConnectionString, ConnectionProtocol};
use db::{Database, ThreadedDatabase};
use error::Error::ResponseError;
use pool::PooledStream;
use stream::StreamConnector;
use topology::{Topology, TopologyDescription, TopologyType, DEFAULT_HEARTBEAT_FREQUENCY_MS,
DEFAULT_LOCAL_THRESHOLD_MS, DEFAULT_SERVER_SELECTION_TIMEOUT_MS};
use topology::server::Server;
use std::time::Duration;
pub const DRIVER_NAME: &str = "mongodb-cwal-rs";
pub struct ClientInner {
pub read_preference: ReadPreference,
pub write_concern: WriteConcern,
req_id: Arc<AtomicIsize>,
topology: Topology,
listener: Listener,
log_file: Option<Mutex<File>>,
}
impl fmt::Debug for ClientInner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ClientInner")
.field("read_preference", &self.read_preference)
.field("write_concern", &self.write_concern)
.field("req_id", &self.req_id)
.field("topology", &self.topology)
.field("listener", &"Listener { .. }")
.field("log_file", &self.log_file)
.finish()
}
}
#[derive(Default, Debug, Clone)]
pub struct ClientOptions {
pub pool_size: Option<usize>,
pub idle_connection_timeout: Option<Duration>,
pub log_file: Option<String>,
pub read_preference: Option<ReadPreference>,
pub write_concern: Option<WriteConcern>,
pub heartbeat_frequency_ms: u32,
pub server_selection_timeout_ms: i64,
pub local_threshold_ms: i64,
pub stream_connector: StreamConnector,
}
impl ClientOptions {
pub fn new() -> ClientOptions {
ClientOptions {
pool_size: None,
idle_connection_timeout: None,
log_file: None,
read_preference: None,
write_concern: None,
heartbeat_frequency_ms: DEFAULT_HEARTBEAT_FREQUENCY_MS,
server_selection_timeout_ms: DEFAULT_SERVER_SELECTION_TIMEOUT_MS,
local_threshold_ms: DEFAULT_LOCAL_THRESHOLD_MS,
stream_connector: StreamConnector::default(),
}
}
pub fn with_log_file(file: &str) -> ClientOptions {
let mut options = ClientOptions::new();
options.log_file = Some(String::from(file));
options
}
#[cfg(feature = "ssl")]
pub fn with_ssl(
ca_file: Option<&str>,
certificate_file: &str,
key_file: &str,
verify_peer: bool,
) -> ClientOptions {
let mut options = ClientOptions::new();
options.stream_connector =
StreamConnector::with_ssl(ca_file, certificate_file, key_file, verify_peer);
options
}
#[cfg(feature = "ssl")]
pub fn with_unauthenticated_ssl(ca_file: Option<&str>, verify_peer: bool) -> ClientOptions {
let mut options = ClientOptions::new();
options.stream_connector = StreamConnector::with_unauthenticated_ssl(ca_file, verify_peer);
options
}
}
pub trait ThreadedClient: Sync + Sized {
fn connect(host: &str, port: u16) -> Result<Self>;
fn connect_with_options(host: &str, port: u16, ClientOptions) -> Result<Self>;
fn with_uri(uri: &str) -> Result<Self>;
fn with_uri_and_options(uri: &str, options: ClientOptions) -> Result<Self>;
fn with_config(
config: ConnectionString,
options: Option<ClientOptions>,
description: Option<TopologyDescription>,
) -> Result<Self>;
fn db(&self, db_name: &str) -> Database;
fn db_with_prefs(
&self,
db_name: &str,
read_preference: Option<ReadPreference>,
write_concern: Option<WriteConcern>,
) -> Database;
fn acquire_stream(&self, read_pref: ReadPreference) -> Result<(PooledStream, bool, bool)>;
fn acquire_write_stream(&self) -> Result<PooledStream>;
fn get_req_id(&self) -> i32;
fn database_names(&self) -> Result<Vec<String>>;
fn drop_database(&self, db_name: &str) -> Result<()>;
fn is_master(&self) -> Result<bool>;
fn add_start_hook(&mut self, hook: fn(Client, &CommandStarted)) -> Result<()>;
fn add_completion_hook(&mut self, hook: fn(Client, &CommandResult)) -> Result<()>;
}
pub type Client = Arc<ClientInner>;
impl ThreadedClient for Client {
fn connect(host: &str, port: u16) -> Result<Client> {
let config = ConnectionString::new(host, port);
let mut description = TopologyDescription::new(StreamConnector::Tcp);
description.topology_type = TopologyType::Single;
Client::with_config(config, None, Some(description))
}
fn connect_with_options(host: &str, port: u16, options: ClientOptions) -> Result<Client> {
let config = ConnectionString::new(host, port);
let mut description = TopologyDescription::new(options.stream_connector.clone());
description.topology_type = TopologyType::Single;
Client::with_config(config, Some(options), Some(description))
}
fn with_uri(uri: &str) -> Result<Client> {
let config = connstring::parse(uri)?;
Client::with_config(config, None, None)
}
fn with_uri_and_options(uri: &str, options: ClientOptions) -> Result<Client> {
let config = connstring::parse(uri)?;
Client::with_config(config, Some(options), None)
}
fn with_config(
mut config: ConnectionString,
options: Option<ClientOptions>,
description: Option<TopologyDescription>,
) -> Result<Client> {
let client_options = options.unwrap_or_else(ClientOptions::new);
let rp = client_options.read_preference.unwrap_or_else(|| {
ReadPreference::new(ReadMode::Primary, None)
});
let wc = client_options.write_concern.unwrap_or_else(
WriteConcern::new,
);
let listener = Listener::new();
let file = match client_options.log_file {
Some(string) => {
let _ = listener.add_start_hook(log_command_started);
let _ = listener.add_completion_hook(log_command_completed);
Some(Mutex::new(
OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(&string)?
))
}
None => None,
};
let client = Arc::new(ClientInner {
req_id: Arc::new(AtomicIsize::new(0)),
topology: Topology::new(
config.clone(),
description,
client_options.stream_connector.clone(),
)?,
listener: listener,
read_preference: rp,
write_concern: wc,
log_file: file,
});
{
let top_description = &client.topology.description;
let mut top = top_description.write()?;
top.heartbeat_frequency_ms = client_options.heartbeat_frequency_ms;
top.server_selection_timeout_ms = client_options.server_selection_timeout_ms;
top.local_threshold_ms = client_options.local_threshold_ms;
if let ConnectionProtocol::DNS(dns) = &mut config.hosts {
dns.discover_hosts()?;
}
for host in config.hosts.into_iter() {
let server = Server::new(
client.clone(),
host.clone(),
top_description.clone(),
true,
client_options.stream_connector.clone(),
client_options.pool_size,
client_options.idle_connection_timeout,
);
top.servers.insert(host, server);
}
}
Ok(client)
}
fn db(&self, db_name: &str) -> Database {
Database::open(self.clone(), db_name, None, None)
}
fn db_with_prefs(
&self,
db_name: &str,
read_preference: Option<ReadPreference>,
write_concern: Option<WriteConcern>,
) -> Database {
Database::open(self.clone(), db_name, read_preference, write_concern)
}
fn acquire_stream(
&self,
read_preference: ReadPreference,
) -> Result<(PooledStream, bool, bool)> {
self.topology.acquire_stream(self.clone(), read_preference)
}
fn acquire_write_stream(&self) -> Result<PooledStream> {
self.topology.acquire_write_stream(self.clone())
}
fn get_req_id(&self) -> i32 {
self.req_id.fetch_add(1, Ordering::SeqCst) as i32
}
fn database_names(&self) -> Result<Vec<String>> {
let doc = doc!{ "listDatabases": 1 };
let db = self.db("admin");
let res = db.command(doc, CommandType::ListDatabases, None)?;
if let Some(&Bson::Array(ref batch)) = res.get("databases") {
let map = batch
.iter()
.filter_map(|bdoc| {
if let Bson::Document(ref doc) = *bdoc {
if let Some(&Bson::String(ref name)) = doc.get("name") {
return Some(name.to_owned());
}
}
None
})
.collect();
Ok(map)
} else {
Err(ResponseError(
String::from("Server reply does not contain 'databases'."),
))
}
}
fn drop_database(&self, db_name: &str) -> Result<()> {
self.db(db_name).drop_database()
}
fn is_master(&self) -> Result<bool> {
let doc = doc!{ "isMaster": 1 };
let db = self.db("local");
let res = db.command(doc, CommandType::IsMaster, None)?;
match res.get("ismaster") {
Some(&Bson::Boolean(is_master)) => Ok(is_master),
_ => Err(ResponseError(
String::from("Server reply does not contain 'ismaster'."),
)),
}
}
fn add_start_hook(&mut self, hook: fn(Client, &CommandStarted)) -> Result<()> {
self.listener.add_start_hook(hook)
}
fn add_completion_hook(&mut self, hook: fn(Client, &CommandResult)) -> Result<()> {
self.listener.add_completion_hook(hook)
}
}
fn log_command_started(client: Client, command_started: &CommandStarted) {
let mutex = match client.log_file {
Some(ref mutex) => mutex,
None => return,
};
let mut guard = match mutex.lock() {
Ok(guard) => guard,
Err(_) => return,
};
let _ = writeln!(guard.deref_mut(), "{}", command_started);
}
fn log_command_completed(client: Client, command_result: &CommandResult) {
let mutex = match client.log_file {
Some(ref mutex) => mutex,
None => return,
};
let mut guard = match mutex.lock() {
Ok(guard) => guard,
Err(_) => return,
};
let _ = writeln!(guard.deref_mut(), "{}", command_result);
}