use {Client, Result};
use Error::{self, OperationError};
use bson::oid;
use connstring::Host;
use pool::{ConnectionPool, PooledStream};
use stream::StreamConnector;
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::Ordering;
use std::thread;
use super::monitor::{IsMasterResult, Monitor};
use super::TopologyDescription;
/// Divisor used by `ServerDescription::update` when folding a new round-trip
/// sample into the stored value: the new sample is weighted by
/// `1 / ROUND_TRIP_DIVISOR` and the previously stored value by
/// `(ROUND_TRIP_DIVISOR - 1) / ROUND_TRIP_DIVISOR`.
pub const ROUND_TRIP_DIVISOR: i64 = 5;
/// Kind of server a `ServerDescription` refers to.
///
/// The variant is chosen by `ServerDescription::update` from the fields of an
/// ismaster reply; the per-variant notes below describe the conditions that
/// method checks, in the order it checks them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ServerType {
/// Reply has an empty `msg`, an empty set name, and is not flagged as a
/// replica set.
Standalone,
/// Reply carries a non-empty `msg`.
Mongos,
/// Reply reports `is_master` together with a non-empty set name.
RSPrimary,
/// Reply reports `is_secondary` with a non-empty set name and is not hidden.
RSSecondary,
/// Reply reports `arbiter_only` with a non-empty set name.
RSArbiter,
/// Reply has a non-empty set name but matched none of the replica-set
/// variants above (for example a hidden secondary).
RSOther,
/// Reply is flagged as a replica set but has an empty set name.
RSGhost,
/// Initial value of a new description, value after `ServerDescription::clear`,
/// and the fallback when parsing an unrecognised name with `FromStr`.
Unknown,
}
/// Snapshot of what is known about a single server.
///
/// Starts out empty (see `ServerDescription::new`) and is refreshed from
/// ismaster replies by `ServerDescription::update`.
#[derive(Clone, Debug)]
pub struct ServerDescription {
/// Classification computed by the most recent successful `update`;
/// `ServerType::Unknown` initially and after `clear`.
pub server_type: ServerType,
/// Error recorded by `set_err`; holds `None` in a freshly created description.
pub err: Arc<Option<Error>>,
/// Smoothed round-trip time; `None` until the first successful `update`
/// and again after `clear`. Unit is whatever the caller of `update`
/// passes in — TODO confirm against the monitor.
pub round_trip_time: Option<i64>,
/// Copied from the ismaster reply; `0` in a new description.
pub min_wire_version: i64,
/// Copied from the ismaster reply; `0` in a new description.
pub max_wire_version: i64,
/// Copied from the ismaster reply's `me` field.
pub me: Option<Host>,
/// Copied from the ismaster reply's `hosts` field.
pub hosts: Vec<Host>,
/// Copied from the ismaster reply's `passives` field.
pub passives: Vec<Host>,
/// Copied from the ismaster reply's `arbiters` field.
pub arbiters: Vec<Host>,
/// Copied from the ismaster reply's `tags` field.
pub tags: BTreeMap<String, String>,
/// Copied from the ismaster reply; empty in a new description and after
/// `clear`. An empty value is treated as "no set name" by `update`.
pub set_name: String,
/// Copied from the ismaster reply; reset to `None` by `clear`.
pub election_id: Option<oid::ObjectId>,
/// Copied from the ismaster reply's `primary` field.
pub primary: Option<Host>,
/// Copied from the ismaster reply's `set_version` field.
pub set_version: Option<i64>,
}
/// Handle to a single server: its address, its shared description, a
/// connection pool and the monitor created for it in `Server::new`.
///
/// All members other than `host` are reference-counted, so cloning a `Server`
/// yields another handle to the same description, pool and monitor.
#[derive(Clone)]
pub struct Server {
/// Address this server was created for.
pub host: Host,
/// Shared description; the same `Arc` is handed to the monitor in
/// `Server::new`.
pub description: Arc<RwLock<ServerDescription>>,
// Connection pool used by `acquire_stream`; also shared with the monitor.
pool: Arc<ConnectionPool>,
// Monitor for this server; its `running` flag is cleared when a `Server`
// is dropped.
monitor: Arc<Monitor>,
}
impl FromStr for ServerType {
    type Err = Error;

    /// Parses a server type from its exact, case-sensitive variant name.
    ///
    /// This never returns an error: any string that is not one of the known
    /// names yields `ServerType::Unknown`.
    fn from_str(s: &str) -> Result<Self> {
        // Names that map to something other than `Unknown`.
        const KNOWN: [(&'static str, ServerType); 7] = [
            ("Standalone", ServerType::Standalone),
            ("Mongos", ServerType::Mongos),
            ("RSPrimary", ServerType::RSPrimary),
            ("RSSecondary", ServerType::RSSecondary),
            ("RSArbiter", ServerType::RSArbiter),
            ("RSOther", ServerType::RSOther),
            ("RSGhost", ServerType::RSGhost),
        ];

        let parsed = KNOWN
            .iter()
            .find(|entry| entry.0 == s)
            .map(|entry| entry.1)
            .unwrap_or(ServerType::Unknown);

        Ok(parsed)
    }
}
impl Default for ServerDescription {
fn default() -> Self {
Self::new()
}
}
impl ServerDescription {
    /// Creates an empty description: type `Unknown`, no error, no round-trip
    /// time, wire versions `0`, and every host/tag/set field empty or `None`.
    pub fn new() -> ServerDescription {
        ServerDescription {
            server_type: ServerType::Unknown,
            err: Arc::new(None),
            round_trip_time: None,
            min_wire_version: 0,
            max_wire_version: 0,
            me: None,
            hosts: Vec::new(),
            passives: Vec::new(),
            arbiters: Vec::new(),
            tags: BTreeMap::new(),
            set_name: String::new(),
            election_id: None,
            primary: None,
            set_version: None,
        }
    }

    /// Refreshes this description from an ismaster reply.
    ///
    /// * `ismaster` - the reply to apply. If its `ok` flag is false, an
    ///   `OperationError` is recorded through `set_err` (which also resets the
    ///   description via `clear`) and nothing else is changed.
    /// * `round_trip_time` - the newly measured round-trip sample. The first
    ///   sample is stored as-is; later samples are blended into the stored
    ///   value with weight `1 / ROUND_TRIP_DIVISOR`.
    ///
    /// NOTE(review): a previously recorded `err` is left in place when the
    /// reply is ok — confirm whether callers rely on that.
    pub fn update(&mut self, ismaster: IsMasterResult, round_trip_time: i64) {
        if !ismaster.ok {
            self.set_err(OperationError(
                String::from("ismaster returned a not-ok response."),
            ));
            return;
        }

        self.min_wire_version = ismaster.min_wire_version;
        self.max_wire_version = ismaster.max_wire_version;
        self.me = ismaster.me;
        self.hosts = ismaster.hosts;
        self.passives = ismaster.passives;
        self.arbiters = ismaster.arbiters;
        self.tags = ismaster.tags;
        self.set_name = ismaster.set_name;
        self.election_id = ismaster.election_id;
        self.primary = ismaster.primary;
        self.set_version = ismaster.set_version;

        self.round_trip_time = Some(match self.round_trip_time {
            // Weighted average: (sample + old * (div - 1)) / div.
            // Weight first and divide once; dividing `old_rtt` before
            // multiplying truncates, which made stored values below the
            // divisor contribute nothing and small averages collapse to 0.
            // Saturating arithmetic keeps pathological inputs from panicking.
            Some(old_rtt) => {
                old_rtt
                    .saturating_mul(ROUND_TRIP_DIVISOR - 1)
                    .saturating_add(round_trip_time) / ROUND_TRIP_DIVISOR
            }
            None => round_trip_time,
        });

        let set_name_empty = self.set_name.is_empty();
        let msg_empty = ismaster.msg.is_empty();

        // Order matters: each branch assumes every earlier one did not match.
        self.server_type = if msg_empty && set_name_empty && !ismaster.is_replica_set {
            ServerType::Standalone
        } else if !msg_empty {
            ServerType::Mongos
        } else if ismaster.is_master && !set_name_empty {
            ServerType::RSPrimary
        } else if ismaster.is_secondary && !set_name_empty && !ismaster.hidden {
            ServerType::RSSecondary
        } else if ismaster.arbiter_only && !set_name_empty {
            ServerType::RSArbiter
        } else if !set_name_empty {
            ServerType::RSOther
        } else if ismaster.is_replica_set {
            ServerType::RSGhost
        } else {
            // Not reachable given the conditions above (an empty msg and set
            // name without the replica-set flag is Standalone); kept as the
            // required final arm.
            ServerType::Unknown
        }
    }

    /// Records `err` on this description and then resets it with `clear`.
    pub fn set_err(&mut self, err: Error) {
        self.err = Arc::new(Some(err));
        self.clear();
    }

    /// Resets the election id, round-trip time, server type (to `Unknown`)
    /// and set name. Other fields, including `err`, are left untouched.
    pub fn clear(&mut self) {
        self.election_id = None;
        self.round_trip_time = None;
        self.server_type = ServerType::Unknown;
        self.set_name = String::new();
    }
}
impl Drop for Server {
    /// Clears the monitor's `running` flag when this handle goes away.
    ///
    /// NOTE(review): `Server` derives `Clone` and the monitor is shared
    /// through an `Arc`, so dropping any one clone clears the flag for all of
    /// them — confirm this is intended.
    fn drop(&mut self) {
        let running = &self.monitor.running;
        running.store(false, Ordering::SeqCst);
    }
}
impl Server {
pub fn new(
client: Client,
host: Host,
top_description: Arc<RwLock<TopologyDescription>>,
run_monitor: bool,
connector: StreamConnector,
) -> Server {
let description = Arc::new(RwLock::new(ServerDescription::new()));
let host_clone = host.clone();
let desc_clone = description.clone();
let pool = Arc::new(ConnectionPool::new(host.clone(), connector.clone()));
let monitor = Arc::new(Monitor::new(
client,
host_clone,
pool.clone(),
top_description,
desc_clone,
connector,
));
if run_monitor {
let monitor_clone = monitor.clone();
thread::spawn(move || { monitor_clone.run(); });
}
Server {
host: host,
pool: pool,
description: description.clone(),
monitor: monitor,
}
}
pub fn acquire_stream(&self, client: Client) -> Result<PooledStream> {
self.pool.acquire_stream(client)
}
pub fn request_update(&self) {
self.monitor.request_update();
}
}