#![cfg_attr(feature = "clippy", feature(plugin))]
#![cfg_attr(feature = "clippy", plugin(clippy))]
#![cfg_attr(feature = "clippy", allow(
doc_markdown,
// allow double_parens for bson/doc macro.
double_parens,
// more explicit than catch-alls.
match_wild_err_arm,
too_many_arguments,
))]
#![cfg_attr(feature = "clippy", warn(
cast_precision_loss,
enum_glob_use,
filter_map,
if_not_else,
invalid_upcast_comparisons,
items_after_statements,
mem_forget,
mut_mut,
mutex_integer,
non_ascii_literal,
nonminimal_bool,
option_map_unwrap_or,
option_map_unwrap_or_else,
print_stdout,
shadow_reuse,
shadow_same,
shadow_unrelated,
similar_names,
unicode_not_nfc,
unseparated_literal_suffix,
used_underscore_binding,
wrong_pub_self_convention,
))]
#[doc(html_root_url = "https://docs.rs/mongodb")]
#[macro_use]
extern crate bitflags;
#[macro_use(bson, doc)]
extern crate bson;
extern crate bufstream;
extern crate byteorder;
extern crate chrono;
extern crate crypto;
extern crate data_encoding;
#[cfg(feature = "ssl")]
extern crate openssl;
extern crate rand;
#[macro_use]
extern crate scan_fmt;
extern crate semver;
extern crate separator;
extern crate textnonce;
extern crate time;
pub mod db;
pub mod coll;
pub mod common;
pub mod connstring;
pub mod cursor;
pub mod error;
pub mod gridfs;
pub mod pool;
pub mod stream;
pub mod topology;
pub mod wire_protocol;
mod apm;
mod auth;
mod command_type;
pub use apm::{CommandStarted, CommandResult};
pub use command_type::CommandType;
pub use error::{Error, ErrorCode, Result};
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicIsize, Ordering, ATOMIC_ISIZE_INIT};
use apm::Listener;
use bson::Bson;
use common::{ReadPreference, ReadMode, WriteConcern};
use connstring::ConnectionString;
use db::{Database, ThreadedDatabase};
use error::Error::ResponseError;
use pool::PooledStream;
use stream::StreamConnector;
use topology::{Topology, TopologyDescription, TopologyType, DEFAULT_HEARTBEAT_FREQUENCY_MS,
DEFAULT_LOCAL_THRESHOLD_MS, DEFAULT_SERVER_SELECTION_TIMEOUT_MS};
use topology::server::Server;
pub struct ClientInner {
pub read_preference: ReadPreference,
pub write_concern: WriteConcern,
req_id: Arc<AtomicIsize>,
topology: Topology,
listener: Listener,
log_file: Option<Mutex<File>>,
}
#[derive(Default)]
pub struct ClientOptions {
pub log_file: Option<String>,
pub read_preference: Option<ReadPreference>,
pub write_concern: Option<WriteConcern>,
pub heartbeat_frequency_ms: u32,
pub server_selection_timeout_ms: i64,
pub local_threshold_ms: i64,
pub stream_connector: StreamConnector,
}
impl ClientOptions {
pub fn new() -> ClientOptions {
ClientOptions {
log_file: None,
read_preference: None,
write_concern: None,
heartbeat_frequency_ms: DEFAULT_HEARTBEAT_FREQUENCY_MS,
server_selection_timeout_ms: DEFAULT_SERVER_SELECTION_TIMEOUT_MS,
local_threshold_ms: DEFAULT_LOCAL_THRESHOLD_MS,
stream_connector: StreamConnector::default(),
}
}
pub fn with_log_file(file: &str) -> ClientOptions {
let mut options = ClientOptions::new();
options.log_file = Some(String::from(file));
options
}
#[cfg(feature = "ssl")]
pub fn with_ssl(
ca_file: &str,
certificate_file: &str,
key_file: &str,
verify_peer: bool,
) -> ClientOptions {
let mut options = ClientOptions::new();
options.stream_connector =
StreamConnector::with_ssl(ca_file, certificate_file, key_file, verify_peer);
options
}
}
pub trait ThreadedClient: Sync + Sized {
fn connect(host: &str, port: u16) -> Result<Self>;
fn connect_with_options(host: &str, port: u16, ClientOptions) -> Result<Self>;
fn with_uri(uri: &str) -> Result<Self>;
fn with_uri_and_options(uri: &str, options: ClientOptions) -> Result<Self>;
fn with_config(
config: ConnectionString,
options: Option<ClientOptions>,
description: Option<TopologyDescription>,
) -> Result<Self>;
fn db(&self, db_name: &str) -> Database;
fn db_with_prefs(
&self,
db_name: &str,
read_preference: Option<ReadPreference>,
write_concern: Option<WriteConcern>,
) -> Database;
fn acquire_stream(&self, read_pref: ReadPreference) -> Result<(PooledStream, bool, bool)>;
fn acquire_write_stream(&self) -> Result<PooledStream>;
fn get_req_id(&self) -> i32;
fn database_names(&self) -> Result<Vec<String>>;
fn drop_database(&self, db_name: &str) -> Result<()>;
fn is_master(&self) -> Result<bool>;
fn add_start_hook(&mut self, hook: fn(Client, &CommandStarted)) -> Result<()>;
fn add_completion_hook(&mut self, hook: fn(Client, &CommandResult)) -> Result<()>;
}
pub type Client = Arc<ClientInner>;
impl ThreadedClient for Client {
fn connect(host: &str, port: u16) -> Result<Client> {
let config = ConnectionString::new(host, port);
let mut description = TopologyDescription::new(StreamConnector::Tcp);
description.topology_type = TopologyType::Single;
Client::with_config(config, None, Some(description))
}
fn connect_with_options(host: &str, port: u16, options: ClientOptions) -> Result<Client> {
let config = ConnectionString::new(host, port);
let mut description = TopologyDescription::new(options.stream_connector.clone());
description.topology_type = TopologyType::Single;
Client::with_config(config, Some(options), Some(description))
}
fn with_uri(uri: &str) -> Result<Client> {
let config = try!(connstring::parse(uri));
Client::with_config(config, None, None)
}
fn with_uri_and_options(uri: &str, options: ClientOptions) -> Result<Client> {
let config = try!(connstring::parse(uri));
Client::with_config(config, Some(options), None)
}
fn with_config(
config: ConnectionString,
options: Option<ClientOptions>,
description: Option<TopologyDescription>,
) -> Result<Client> {
let client_options = options.unwrap_or_else(ClientOptions::new);
let rp = client_options.read_preference.unwrap_or_else(|| {
ReadPreference::new(ReadMode::Primary, None)
});
let wc = client_options.write_concern.unwrap_or_else(
WriteConcern::new,
);
let listener = Listener::new();
let file = match client_options.log_file {
Some(string) => {
let _ = listener.add_start_hook(log_command_started);
let _ = listener.add_completion_hook(log_command_completed);
Some(Mutex::new(try!(
OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(&string)
)))
}
None => None,
};
let client = Arc::new(ClientInner {
req_id: Arc::new(ATOMIC_ISIZE_INIT),
topology: try!(Topology::new(
config.clone(),
description,
client_options.stream_connector.clone(),
)),
listener: listener,
read_preference: rp,
write_concern: wc,
log_file: file,
});
{
let top_description = &client.topology.description;
let mut top = try!(top_description.write());
top.heartbeat_frequency_ms = client_options.heartbeat_frequency_ms;
top.server_selection_timeout_ms = client_options.server_selection_timeout_ms;
top.local_threshold_ms = client_options.local_threshold_ms;
for host in &config.hosts {
let server = Server::new(
client.clone(),
host.clone(),
top_description.clone(),
true,
client_options.stream_connector.clone(),
);
top.servers.insert(host.clone(), server);
}
}
Ok(client)
}
fn db(&self, db_name: &str) -> Database {
Database::open(self.clone(), db_name, None, None)
}
fn db_with_prefs(
&self,
db_name: &str,
read_preference: Option<ReadPreference>,
write_concern: Option<WriteConcern>,
) -> Database {
Database::open(self.clone(), db_name, read_preference, write_concern)
}
fn acquire_stream(
&self,
read_preference: ReadPreference,
) -> Result<(PooledStream, bool, bool)> {
self.topology.acquire_stream(read_preference)
}
fn acquire_write_stream(&self) -> Result<PooledStream> {
self.topology.acquire_write_stream()
}
fn get_req_id(&self) -> i32 {
self.req_id.fetch_add(1, Ordering::SeqCst) as i32
}
fn database_names(&self) -> Result<Vec<String>> {
let mut doc = bson::Document::new();
doc.insert("listDatabases", Bson::I32(1));
let db = self.db("admin");
let res = try!(db.command(doc, CommandType::ListDatabases, None));
if let Some(&Bson::Array(ref batch)) = res.get("databases") {
let map = batch
.iter()
.filter_map(|bdoc| {
if let Bson::Document(ref doc) = *bdoc {
if let Some(&Bson::String(ref name)) = doc.get("name") {
return Some(name.to_owned());
}
}
None
})
.collect();
return Ok(map);
}
Err(ResponseError(
String::from("Server reply does not contain 'databases'."),
))
}
fn drop_database(&self, db_name: &str) -> Result<()> {
let db = self.db(db_name);
try!(db.drop_database());
Ok(())
}
fn is_master(&self) -> Result<bool> {
let mut doc = bson::Document::new();
doc.insert("isMaster", Bson::I32(1));
let db = self.db("local");
let res = try!(db.command(doc, CommandType::IsMaster, None));
match res.get("ismaster") {
Some(&Bson::Boolean(is_master)) => Ok(is_master),
_ => Err(ResponseError(
String::from("Server reply does not contain 'ismaster'."),
)),
}
}
fn add_start_hook(&mut self, hook: fn(Client, &CommandStarted)) -> Result<()> {
self.listener.add_start_hook(hook)
}
fn add_completion_hook(&mut self, hook: fn(Client, &CommandResult)) -> Result<()> {
self.listener.add_completion_hook(hook)
}
}
fn log_command_started(client: Client, command_started: &CommandStarted) {
let mutex = match client.log_file {
Some(ref mutex) => mutex,
None => return,
};
let mut guard = match mutex.lock() {
Ok(guard) => guard,
Err(_) => return,
};
let _ = writeln!(guard.deref_mut(), "{}", command_started);
}
fn log_command_completed(client: Client, command_result: &CommandResult) {
let mutex = match client.log_file {
Some(ref mutex) => mutex,
None => return,
};
let mut guard = match mutex.lock() {
Ok(guard) => guard,
Err(_) => return,
};
let _ = writeln!(guard.deref_mut(), "{}", command_result);
}