pub mod server;
pub mod monitor;
use {Client, Result};
use Error::{self, ArgumentError, OperationError};
use bson::oid;
use common::{ReadPreference, ReadMode};
use connstring::{ConnectionString, Host};
use pool::PooledStream;
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::i64;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use time;
use self::server::{Server, ServerDescription, ServerType};
/// Value `TopologyDescription::new` assigns to `heartbeat_frequency_ms` (milliseconds).
pub const DEFAULT_HEARTBEAT_FREQUENCY_MS: u32 = 10000;
/// Value `TopologyDescription::new` assigns to `local_threshold_ms` (milliseconds).
pub const DEFAULT_LOCAL_THRESHOLD_MS: i64 = 15;
/// Value `TopologyDescription::new` assigns to `server_selection_timeout_ms` (milliseconds).
pub const DEFAULT_SERVER_SELECTION_TIMEOUT_MS: i64 = 30000;
/// Classification of the deployment tracked by a `TopologyDescription`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TopologyType {
/// Chosen when an `Unknown` topology whose only server is the reporting host sees a standalone.
Single,
/// Replica set in which no tracked server currently has type `RSPrimary`.
ReplicaSetNoPrimary,
/// Replica set in which at least one tracked server has type `RSPrimary`.
ReplicaSetWithPrimary,
/// Chosen when an `Unknown` topology receives a description from a mongos.
Sharded,
/// Initial type of a new description, and the result of parsing an unrecognised name.
Unknown,
}
/// Mutable view of the deployment: the known servers plus the settings used to
/// select among them.
#[derive(Clone)]
pub struct TopologyDescription {
/// Current classification of the deployment.
pub topology_type: TopologyType,
/// Replica set name; an empty string means no name has been recorded yet.
pub set_name: String,
/// Every server currently tracked, keyed by its host.
pub servers: HashMap<Host, Server>,
/// Heartbeat frequency in milliseconds. Not read in this file.
// NOTE(review): presumably consumed by the server monitor — confirm.
pub heartbeat_frequency_ms: u32,
/// Width of the latency window, in milliseconds, added to the shortest round trip
/// time by `filter_latency_hosts`.
pub local_threshold_ms: i64,
/// How long, in milliseconds, `Topology` keeps retrying server selection before
/// returning the last error.
pub server_selection_timeout_ms: i64,
// Election id taken from the last primary description that `update_rs_from_primary`
// did not reject as stale.
max_election_id: Option<oid::ObjectId>,
// Initialised to `true`; not otherwise read or written in this file.
compatible: bool,
// Largest set version reported by a primary so far.
max_set_version: Option<i64>,
// Initialised to an empty string; not otherwise read or written in this file.
compat_error: String,
}
/// Handle pairing the connection string a client was configured with and the shared,
/// lock-protected `TopologyDescription`.
#[derive(Clone)]
pub struct Topology {
/// Connection string the topology was created from.
pub config: ConnectionString,
/// Shared description; cloning a `Topology` clones the `Arc`, not the description.
pub description: Arc<RwLock<TopologyDescription>>,
}
/// Parses a topology type from its variant name.
///
/// The names `"Single"`, `"ReplicaSetNoPrimary"`, `"ReplicaSetWithPrimary"` and
/// `"Sharded"` map to the matching variants. Every other input maps to
/// `TopologyType::Unknown`, so this conversion never returns an error.
impl FromStr for TopologyType {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self> {
        let parsed = if s == "Single" {
            TopologyType::Single
        } else if s == "ReplicaSetNoPrimary" {
            TopologyType::ReplicaSetNoPrimary
        } else if s == "ReplicaSetWithPrimary" {
            TopologyType::ReplicaSetWithPrimary
        } else if s == "Sharded" {
            TopologyType::Sharded
        } else {
            TopologyType::Unknown
        };
        Ok(parsed)
    }
}
impl TopologyDescription {
/// Builds an empty description: type `Unknown`, no set name, no servers, the
/// `DEFAULT_*` timing values, and no recorded election id or set version.
pub fn new() -> TopologyDescription {
    let servers = HashMap::new();
    TopologyDescription {
        topology_type: TopologyType::Unknown,
        set_name: String::new(),
        servers: servers,
        heartbeat_frequency_ms: DEFAULT_HEARTBEAT_FREQUENCY_MS,
        local_threshold_ms: DEFAULT_LOCAL_THRESHOLD_MS,
        server_selection_timeout_ms: DEFAULT_SERVER_SELECTION_TIMEOUT_MS,
        max_election_id: None,
        compatible: true,
        max_set_version: None,
        compat_error: String::new(),
    }
}
/// Returns a stream to the lowest-latency server in `servers` that will hand one out,
/// together with that server's type.
///
/// `servers` is sorted in place by ascending round trip time. Hosts that are not
/// tracked, whose description lock cannot be read, or that have no measured round
/// trip time sort last. The scan stops at the first host without a measured time.
///
/// # Errors
///
/// Returns an `OperationError` when no candidate yields a stream.
fn get_nearest_from_vec(&self, servers: &mut Vec<Host>) -> Result<(PooledStream, ServerType)> {
    let rtt_of = |host: &Host| -> i64 {
        self.servers
            .get(host)
            .and_then(|server| server.description.read().ok())
            .and_then(|description| description.round_trip_time)
            .unwrap_or(i64::MAX)
    };
    servers.sort_by(|a, b| rtt_of(a).cmp(&rtt_of(b)));

    for host in servers.iter() {
        let server = match self.servers.get(host) {
            Some(server) => server,
            None => continue,
        };
        let description = match server.description.read() {
            Ok(description) => description,
            Err(_) => continue,
        };
        // Unmeasured hosts sort last, so nothing after this point has a measurement.
        if description.round_trip_time.is_none() {
            break;
        }
        if let Ok(stream) = server.acquire_stream() {
            return Ok((stream, description.server_type));
        }
    }

    Err(OperationError("No servers available for the provided ReadPreference.".to_owned()))
}
/// Returns a stream to a randomly chosen server in `servers`, together with that
/// server's type.
///
/// A host is removed from `servers` whenever it is picked but is not tracked, does
/// not yield a stream, or has an unreadable description; picking then repeats on
/// the remaining hosts.
///
/// # Errors
///
/// Returns an `OperationError` once `servers` has been emptied without success.
fn get_rand_from_vec(&self, servers: &mut Vec<Host>) -> Result<(PooledStream, ServerType)> {
    loop {
        let remaining = servers.len();
        if remaining == 0 {
            return Err(OperationError("No servers available for the provided ReadPreference.".to_owned()));
        }

        let pick = thread_rng().gen_range(0, remaining);
        if let Some(server) = self.servers.get(&servers[pick]) {
            // The stream is acquired before the description is read; if the read
            // fails the stream is dropped and the host is discarded.
            if let Ok(stream) = server.acquire_stream() {
                if let Ok(description) = server.description.read() {
                    return Ok((stream, description.server_type));
                }
            }
        }
        servers.remove(pick);
    }
}
/// Selects a server for a read and returns `(stream, slave_ok, send_read_pref)`.
///
/// Candidates come from `choose_hosts`. They are narrowed by `filter_hosts` unless
/// the topology is `Sharded` or `Single`, then by `filter_latency_hosts`. When no
/// candidate is left and the mode is `SecondaryPreferred`, selection is retried
/// with `PrimaryPreferred`. When no candidate is left otherwise, every tracked
/// server is asked for an update before selection proceeds (and fails).
///
/// The two flags depend on the topology type, the type of the chosen server and
/// the read mode.
///
/// # Errors
///
/// Propagates the error of `get_rand_from_vec` / `get_nearest_from_vec` when no
/// server yields a stream.
pub fn acquire_stream(&self, read_preference: &ReadPreference) -> Result<(PooledStream, bool, bool)> {
    let (mut hosts, rand) = self.choose_hosts(read_preference);

    let skips_tag_filter = self.topology_type == TopologyType::Sharded ||
                           self.topology_type == TopologyType::Single;
    if !skips_tag_filter {
        self.filter_hosts(&mut hosts, read_preference);
    }

    if hosts.is_empty() {
        if read_preference.mode == ReadMode::SecondaryPreferred {
            let mut fallback = read_preference.clone();
            fallback.mode = ReadMode::PrimaryPreferred;
            return self.acquire_stream(&fallback);
        }
        for server in self.servers.values() {
            server.request_update();
        }
    }

    self.filter_latency_hosts(&mut hosts);

    let selection = if rand {
        self.get_rand_from_vec(&mut hosts)
    } else {
        self.get_nearest_from_vec(&mut hosts)
    };
    let (pooled_stream, server_type) = try!(selection);

    // Flags used whenever the chosen server is reached through a mongos.
    let mongos_flags = match read_preference.mode {
        ReadMode::Primary => (false, false),
        ReadMode::SecondaryPreferred => (true, !read_preference.tag_sets.is_empty()),
        ReadMode::Secondary | ReadMode::PrimaryPreferred | ReadMode::Nearest => (true, true),
    };

    let (slave_ok, send_read_pref) = match (self.topology_type, server_type) {
        (TopologyType::Unknown, _) => (false, false),
        (TopologyType::Sharded, _) |
        (TopologyType::Single, ServerType::Mongos) => mongos_flags,
        (TopologyType::Single, _) => (true, false),
        (TopologyType::ReplicaSetWithPrimary, _) |
        (TopologyType::ReplicaSetNoPrimary, _) => {
            (read_preference.mode != ReadMode::Primary, false)
        }
    };

    Ok((pooled_stream, slave_ok, send_read_pref))
}
/// Selects a server for a write and returns a stream to it.
///
/// Candidates come from `choose_write_hosts`. When there are none, every tracked
/// server is asked for an update before selection proceeds (and fails).
///
/// # Errors
///
/// Propagates the error of `get_rand_from_vec` / `get_nearest_from_vec` when no
/// server yields a stream.
pub fn acquire_write_stream(&self) -> Result<PooledStream> {
    let (mut hosts, rand) = self.choose_write_hosts();

    if hosts.is_empty() {
        for server in self.servers.values() {
            server.request_update();
        }
    }

    let selection = if rand {
        self.get_rand_from_vec(&mut hosts)
    } else {
        self.get_nearest_from_vec(&mut hosts)
    };
    let (stream, _) = try!(selection);
    Ok(stream)
}
/// Narrows `hosts` in place according to the tag sets of `read_preference`.
///
/// Nothing happens when the preference has no tag sets. Otherwise the first tag
/// set that is fully matched by at least one host is chosen, and only the hosts
/// matching it are kept. When no tag set matches any host, the primaries are kept
/// if the topology is `ReplicaSetWithPrimary` and the mode is `Primary` or
/// `PrimaryPreferred`; in every other case `hosts` is emptied.
///
/// # Panics
///
/// Panics if a server's description lock is poisoned.
pub fn filter_hosts(&self, hosts: &mut Vec<Host>, read_preference: &ReadPreference) {
    if read_preference.tag_sets.is_empty() {
        return;
    }

    // First tag set, in preference order, that some candidate host satisfies.
    let tag_filter = read_preference.tag_sets.iter().find(|tags| {
        hosts.iter().any(|host| match self.servers.get(host) {
            Some(server) => {
                let description = server.description.read().unwrap();
                tags.iter().all(|(key, val)| {
                    description.tags.get(key).map_or(false, |v| val == v)
                })
            }
            None => false,
        })
    });

    match tag_filter {
        Some(required) => {
            hosts.retain(|host| match self.servers.get(host) {
                Some(server) => {
                    let description = server.description.read().unwrap();
                    required.iter().all(|(key, val)| {
                        description.tags.get(key).map_or(false, |v| val == v)
                    })
                }
                None => false,
            });
        }
        None => {
            let primary_wanted = read_preference.mode == ReadMode::Primary ||
                                 read_preference.mode == ReadMode::PrimaryPreferred;
            if self.topology_type == TopologyType::ReplicaSetWithPrimary && primary_wanted {
                hosts.retain(|host| match self.servers.get(host) {
                    Some(server) => {
                        let description = server.description.read().unwrap();
                        description.server_type == ServerType::RSPrimary
                    }
                    None => false,
                });
            } else {
                hosts.clear();
            }
        }
    }
}
/// Keeps only the hosts whose round trip time lies within `local_threshold_ms` of
/// the fastest candidate.
///
/// Lists with at most one host are left untouched, as are lists in which no host
/// has a measured round trip time. Hosts that are not tracked or whose description
/// lock cannot be read are dropped once a window has been established.
///
/// The upper bound of the window is computed with saturating arithmetic, so a very
/// large `local_threshold_ms` (for example `i64::MAX`) widens the window instead of
/// overflowing.
pub fn filter_latency_hosts(&self, hosts: &mut Vec<Host>) {
    if hosts.len() <= 1 {
        return;
    }

    // Untracked, unreadable and unmeasured hosts all count as "infinitely" slow.
    let shortest_rtt = hosts.iter()
        .map(|host| {
            self.servers
                .get(host)
                .and_then(|server| server.description.read().ok())
                .and_then(|description| description.round_trip_time)
                .unwrap_or(i64::MAX)
        })
        .min()
        .unwrap_or(i64::MAX);

    if shortest_rtt == i64::MAX {
        return;
    }

    let high_rtt = shortest_rtt.saturating_add(self.local_threshold_ms);

    hosts.retain(|host| {
        match self.servers.get(host).and_then(|server| server.description.read().ok()) {
            Some(description) => {
                let rtt = description.round_trip_time.unwrap_or(i64::MAX);
                // The lower bound matters only if a description changed since the
                // minimum was computed.
                shortest_rtt <= rtt && rtt <= high_rtt
            }
            None => false,
        }
    });
}
/// Returns the hosts eligible for a write and whether one should be picked at
/// random (`true`) or by lowest latency (`false`).
///
/// No tracked servers or an `Unknown` topology give an empty list. `Single` and
/// `Sharded` topologies offer every tracked host. Replica sets offer only hosts
/// whose description is readable and has type `RSPrimary`.
pub fn choose_write_hosts(&self) -> (Vec<Host>, bool) {
    if self.servers.is_empty() {
        return (Vec::new(), true);
    }

    match self.topology_type {
        TopologyType::Unknown => (Vec::new(), true),
        TopologyType::Single => (self.servers.keys().cloned().collect(), true),
        TopologyType::Sharded => (self.servers.keys().cloned().collect(), false),
        TopologyType::ReplicaSetNoPrimary | TopologyType::ReplicaSetWithPrimary => {
            let mut primaries = Vec::new();
            for (host, server) in &self.servers {
                if let Ok(description) = server.description.read() {
                    if description.server_type == ServerType::RSPrimary {
                        primaries.push(host.clone());
                    }
                }
            }
            (primaries, true)
        }
    }
}
/// Returns the hosts eligible for a read with `read_preference` and whether one
/// should be picked at random (`true`) or by lowest latency (`false`).
///
/// No tracked servers or an `Unknown` topology give an empty list. `Single` and
/// `Sharded` topologies offer every tracked host. For replica sets only primaries
/// and secondaries are ever offered:
///
/// * `Primary` / `Secondary` — only hosts of that type;
/// * `PrimaryPreferred` / `SecondaryPreferred` — the preferred type, or the other
///   one when the preferred list is empty;
/// * `Nearest` — primaries and secondaries together, picked by latency.
///
/// Arbiters and servers of any other type are never candidates, because they cannot
/// serve reads. Servers whose description lock cannot be read are skipped.
pub fn choose_hosts(&self, read_preference: &ReadPreference) -> (Vec<Host>, bool) {
    if self.servers.is_empty() {
        return (Vec::new(), true);
    }

    match self.topology_type {
        TopologyType::Unknown => (Vec::new(), true),
        TopologyType::Single => (self.servers.keys().cloned().collect(), true),
        TopologyType::Sharded => (self.servers.keys().cloned().collect(), false),
        TopologyType::ReplicaSetNoPrimary | TopologyType::ReplicaSetWithPrimary => {
            let mut primaries = Vec::new();
            let mut secondaries = Vec::new();
            for (host, server) in &self.servers {
                if let Ok(description) = server.description.read() {
                    match description.server_type {
                        ServerType::RSPrimary => primaries.push(host.clone()),
                        ServerType::RSSecondary => secondaries.push(host.clone()),
                        _ => (),
                    }
                }
            }

            match read_preference.mode {
                ReadMode::Primary => (primaries, true),
                ReadMode::PrimaryPreferred => {
                    let servers = if primaries.is_empty() { secondaries } else { primaries };
                    (servers, true)
                }
                ReadMode::Secondary => (secondaries, true),
                ReadMode::SecondaryPreferred => {
                    let servers = if secondaries.is_empty() { primaries } else { secondaries };
                    (servers, true)
                }
                ReadMode::Nearest => {
                    // Only data-bearing members may serve a `Nearest` read.
                    primaries.extend(secondaries);
                    (primaries, false)
                }
            }
        }
    }
}
/// Applies a new `description` for `host` to the topology, passing
/// `run_monitor = false` to any `Server` created for a newly discovered host.
pub fn update_without_monitor(&mut self, host: Host, description: ServerDescription,
                              client: Client, top_arc: Arc<RwLock<TopologyDescription>>) {
    let run_monitor = false;
    self.update_private(host, description, client, top_arc, run_monitor);
}
/// Applies a new `description` for `host` to the topology, passing
/// `run_monitor = true` to any `Server` created for a newly discovered host.
pub fn update(&mut self, host: Host, description: ServerDescription,
              client: Client, top_arc: Arc<RwLock<TopologyDescription>>) {
    let run_monitor = true;
    self.update_private(host, description, client, top_arc, run_monitor);
}
/// Dispatches a new server description to the handler matching the current
/// topology type and the reported server type.
///
/// * `Single` topologies ignore every update.
/// * `Sharded` topologies drop any host that is neither `Unknown` nor `Mongos`.
/// * `Unknown` topologies become `Sharded` on a mongos, defer to
///   `update_unknown_with_standalone` on a standalone, and are otherwise treated
///   like a replica set without a primary.
/// * Replica set topologies drop standalones and mongos hosts, and hand member
///   descriptions to the matching `update_rs_*` helper.
///
/// `run_monitor` is forwarded to the helpers that may create new `Server`s.
fn update_private(&mut self, host: Host, description: ServerDescription,
                  client: Client, top_arc: Arc<RwLock<TopologyDescription>>, run_monitor: bool) {
    let server_type = description.server_type;

    // Arm order matters: `Single` and `Sharded` are exhausted first, so the
    // wildcard topology patterns below only ever see `Unknown` or a replica set.
    match (self.topology_type, server_type) {
        (TopologyType::Single, _) => (),

        (TopologyType::Sharded, ServerType::Unknown) |
        (TopologyType::Sharded, ServerType::Mongos) => (),
        (TopologyType::Sharded, _) => {
            self.servers.remove(&host);
        }

        (TopologyType::Unknown, ServerType::Standalone) => {
            self.update_unknown_with_standalone(host)
        }
        (TopologyType::Unknown, ServerType::Mongos) => {
            self.topology_type = TopologyType::Sharded
        }

        (TopologyType::ReplicaSetNoPrimary, ServerType::Standalone) |
        (TopologyType::ReplicaSetNoPrimary, ServerType::Mongos) |
        (TopologyType::ReplicaSetWithPrimary, ServerType::Standalone) |
        (TopologyType::ReplicaSetWithPrimary, ServerType::Mongos) => {
            self.servers.remove(&host);
            self.check_if_has_primary();
        }

        // Unknown topology or either replica set type.
        (_, ServerType::RSPrimary) => {
            self.update_rs_from_primary(host, description, client, top_arc, run_monitor)
        }

        (TopologyType::ReplicaSetWithPrimary, ServerType::RSSecondary) |
        (TopologyType::ReplicaSetWithPrimary, ServerType::RSArbiter) |
        (TopologyType::ReplicaSetWithPrimary, ServerType::RSOther) => {
            self.update_rs_with_primary_from_member(host, description)
        }

        // Unknown topology or replica set without a primary.
        (_, ServerType::RSSecondary) |
        (_, ServerType::RSArbiter) |
        (_, ServerType::RSOther) => {
            self.update_rs_without_primary(host, description, client, top_arc, run_monitor)
        }

        // Remaining server types.
        (TopologyType::Unknown, _) => (),
        (_, _) => self.check_if_has_primary(),
    }
}
/// Sets the topology type to `ReplicaSetWithPrimary` when any tracked server has
/// type `RSPrimary`, and to `ReplicaSetNoPrimary` otherwise.
///
/// # Panics
///
/// Panics if a server's description lock is poisoned.
fn check_if_has_primary(&mut self) {
    let has_primary = self.servers.values().any(|server| {
        let kind = server.description.read().unwrap().server_type;
        kind == ServerType::RSPrimary
    });

    self.topology_type = if has_primary {
        TopologyType::ReplicaSetWithPrimary
    } else {
        TopologyType::ReplicaSetNoPrimary
    };
}
/// Handles a standalone reported while the topology type is still `Unknown`.
///
/// Untracked hosts are ignored. If `host` is the only tracked server the topology
/// becomes `Single`; otherwise `host` is removed from the topology.
fn update_unknown_with_standalone(&mut self, host: Host) {
    match (self.servers.contains_key(&host), self.servers.len()) {
        (false, _) => (),
        (true, 1) => self.topology_type = TopologyType::Single,
        (true, _) => {
            self.servers.remove(&host);
        }
    }
}
/// Updates the topology from a description reported by a replica set primary.
///
/// Steps, in order:
///
/// 1. Ignore hosts that are not tracked.
/// 2. Adopt the reported set name if none is recorded; if a different name is
///    recorded, drop the host and stop.
/// 3. If the description carries both a set version and an election id and they
///    are older than the recorded maxima, reset the host's stored description to
///    `Unknown` and stop; otherwise record the election id.
/// 4. Raise the recorded maximum set version if the reported one is larger.
/// 5. Reset every other server still marked `RSPrimary` to `Unknown`.
/// 6. Add the hosts, passives and arbiters listed by the primary, and drop tracked
///    servers it does not list.
/// 7. Recompute whether the topology has a primary.
///
/// # Panics
///
/// Panics if a server's description lock is poisoned.
fn update_rs_from_primary(&mut self, host: Host, description: ServerDescription,
                          client: Client, top_arc: Arc<RwLock<TopologyDescription>>, run_monitor: bool) {
    if !self.servers.contains_key(&host) {
        return;
    }

    if self.set_name.is_empty() {
        self.set_name = description.set_name.to_owned();
    } else if self.set_name != description.set_name {
        self.servers.remove(&host);
        self.check_if_has_primary();
        return;
    }

    if let (Some(set_version), Some(election_id)) =
        (description.set_version, description.election_id.as_ref()) {
        let is_stale = match (self.max_set_version, self.max_election_id.as_ref()) {
            (Some(max_version), Some(max_id)) => {
                max_version > set_version ||
                (max_version == set_version && max_id > election_id)
            }
            _ => false,
        };

        if is_stale {
            if let Some(server) = self.servers.get(&host) {
                let mut stale = server.description.write().unwrap();
                stale.server_type = ServerType::Unknown;
                stale.set_name = String::new();
                stale.election_id = None;
            }
            self.check_if_has_primary();
            return;
        }

        self.max_election_id = Some(election_id.clone());
    }

    if let Some(version) = description.set_version {
        if self.max_set_version.map_or(true, |max| version > max) {
            self.max_set_version = Some(version);
        }
    }

    // Only one primary may remain: reset any other server still marked as one.
    for (other_host, server) in &self.servers {
        if *other_host == host {
            continue;
        }
        let mut other = server.description.write().unwrap();
        if other.server_type == ServerType::RSPrimary {
            other.server_type = ServerType::Unknown;
            other.set_name = String::new();
            other.election_id = None;
        }
    }

    self.add_missing_hosts(&description, client, top_arc, run_monitor);

    // Collect first: entries cannot be removed while the map is being iterated.
    let unlisted: Vec<Host> = self.servers
        .keys()
        .filter(|known| {
            let listed = description.hosts.contains(known) ||
                         description.passives.contains(known) ||
                         description.arbiters.contains(known);
            !listed
        })
        .cloned()
        .collect();
    for gone in unlisted {
        self.servers.remove(&gone);
    }

    self.check_if_has_primary();
}
/// Updates the topology from a non-primary replica set member while no primary is
/// known.
///
/// The topology type is set to `ReplicaSetNoPrimary` first. Untracked hosts are
/// then ignored. The reported set name is adopted if none is recorded; a host
/// reporting a different name is dropped. Otherwise the members listed in the
/// description are added, and the host is dropped if the address it reports for
/// itself (`me`) differs from `host`.
fn update_rs_without_primary(&mut self, host: Host, description: ServerDescription,
                             client: Client, top_arc: Arc<RwLock<TopologyDescription>>, run_monitor: bool) {
    self.topology_type = TopologyType::ReplicaSetNoPrimary;

    if !self.servers.contains_key(&host) {
        return;
    }

    let name_known = !self.set_name.is_empty();
    if name_known && self.set_name != description.set_name {
        self.servers.remove(&host);
        self.check_if_has_primary();
        return;
    }
    if !name_known {
        self.set_name = description.set_name.to_owned();
    }

    self.add_missing_hosts(&description, client, top_arc, run_monitor);

    match description.me {
        Some(ref me) if host != *me => {
            self.servers.remove(&host);
            self.check_if_has_primary();
        }
        _ => (),
    }
}
/// Updates the topology from a non-primary replica set member while a primary is
/// known.
///
/// Untracked hosts are ignored. The host is dropped when it reports a set name
/// different from the recorded one, or when the address it reports for itself
/// (`me`) differs from `host`.
///
/// Whether or not the host was dropped, the topology type is then recomputed. This
/// matters because the reporting host, or a host just dropped, may have been the
/// primary; without the check the topology would keep claiming to have one.
fn update_rs_with_primary_from_member(&mut self, host: Host, description: ServerDescription) {
    if !self.servers.contains_key(&host) {
        return;
    }

    let wrong_set = self.set_name != description.set_name;
    let wrong_address = match description.me {
        Some(ref me) => host != *me,
        None => false,
    };
    if wrong_set || wrong_address {
        self.servers.remove(&host);
    }

    self.check_if_has_primary();
}
/// Starts tracking every host, passive and arbiter listed in `description` that is
/// not tracked yet.
///
/// Each new entry is created with `Server::new`, receiving clones of `client` and
/// `top_arc` plus the `run_monitor` flag. Lists are processed in the order hosts,
/// passives, arbiters.
fn add_missing_hosts(&mut self, description: &ServerDescription, client: Client,
                     top_arc: Arc<RwLock<TopologyDescription>>, run_monitor: bool) {
    let advertised = description.hosts.iter()
        .chain(description.passives.iter())
        .chain(description.arbiters.iter());

    for host in advertised {
        if self.servers.contains_key(host) {
            continue;
        }
        let server = Server::new(client.clone(), host.clone(), top_arc.clone(), run_monitor);
        self.servers.insert(host.clone(), server);
    }
}
}
impl Topology {
/// Creates a topology from a connection string and an optional starting
/// description (a fresh `TopologyDescription::new()` is used when `None`).
///
/// A `replicaSet` option in the connection string overrides the description's set
/// name and forces its type to `ReplicaSetNoPrimary`.
///
/// # Errors
///
/// Returns an `ArgumentError` when the description is `Single` but the connection
/// string lists more than one host, or when a set name is present and the
/// resulting type is not `ReplicaSetNoPrimary`.
pub fn new(config: ConnectionString, description: Option<TopologyDescription>) -> Result<Topology> {
    let mut options = match description {
        Some(provided) => provided,
        None => TopologyDescription::new(),
    };

    let multiple_seeds = config.hosts.len() > 1;
    if multiple_seeds && options.topology_type == TopologyType::Single {
        return Err(ArgumentError(
            "TopologyType::Single cannot be used with multiple seeds.".to_owned()));
    }

    if let Some(name) = config.options.as_ref().and_then(|opts| opts.options.get("replicaSet")) {
        options.set_name = name.to_owned();
        options.topology_type = TopologyType::ReplicaSetNoPrimary;
    }

    let has_set_name = !options.set_name.is_empty();
    if has_set_name && options.topology_type != TopologyType::ReplicaSetNoPrimary {
        return Err(ArgumentError(
            "TopologyType must be ReplicaSetNoPrimary if set_name is provided.".to_owned()));
    }

    Ok(Topology {
        config: config,
        description: Arc::new(RwLock::new(options)),
    })
}
/// Repeatedly attempts server selection until it succeeds or the description's
/// `server_selection_timeout_ms` has elapsed.
///
/// With `write == true` a write stream is requested and returned as
/// `(stream, false, false)`. Otherwise a read stream is requested with
/// `read_preference`. Between failed attempts the description lock is released and
/// the thread sleeps for 500 ms.
///
/// # Errors
///
/// Returns the error of the last failed attempt once the timeout is reached, or
/// the error produced when the description lock cannot be read.
///
/// # Panics
///
/// Panics if `write` is `false` and `read_preference` is `None`.
// NOTE(review): elapsed time is derived from `time::get_time()`; if that is the wall
// clock, clock adjustments would distort the timeout — confirm.
fn acquire_stream_private(&self, read_preference: Option<ReadPreference>, write: bool) -> Result<(PooledStream, bool, bool)> {
    let now_ms = || {
        let now = time::get_time();
        now.sec * 1000 + (now.nsec as i64) / 1000000
    };
    let start_ms = now_ms();

    loop {
        {
            // Scoped so the read lock is released before sleeping.
            let description = try!(self.description.read());

            let attempt = if write {
                description.acquire_write_stream().map(|stream| (stream, false, false))
            } else {
                description.acquire_stream(read_preference.as_ref().unwrap())
            };

            let err = match attempt {
                Ok(acquired) => return Ok(acquired),
                Err(err) => err,
            };

            if now_ms() - start_ms >= description.server_selection_timeout_ms {
                return Err(err);
            }
        }

        thread::sleep(Duration::from_millis(500));
    }
}
/// Acquires a stream for a read with `read_preference`, retrying until the server
/// selection timeout; returns the stream and the two flags produced by
/// `TopologyDescription::acquire_stream`.
pub fn acquire_stream(&self, read_preference: ReadPreference) -> Result<(PooledStream, bool, bool)> {
    let preference = Some(read_preference);
    self.acquire_stream_private(preference, false)
}
/// Acquires a stream for a write, retrying until the server selection timeout.
pub fn acquire_write_stream(&self) -> Result<PooledStream> {
    let acquired = try!(self.acquire_stream_private(None, true));
    Ok(acquired.0)
}
}