use std::convert::From;
use std::fmt::Debug;
use std::net::{AddrParseError, SocketAddr};
use std::str::{FromStr};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use failure::Error as FailureError;
use futures::future::{ok, loop_fn, Either, Future, Loop};
use itertools::Itertools;
use serde::Deserialize;
use serde_json;
use serde_json::Value as SerdeJsonValue;
use slog::{error, info, debug, o, Drain, Key, Logger, Record, Serializer};
use slog::Result as SlogResult;
use slog::Value as SlogValue;
use tokio_zookeeper::*;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::timer::Delay;
use url::Url;
use cueball::backend::*;
use cueball::resolver::{
BackendAddedMsg,
BackendRemovedMsg,
BackendMsg,
Resolver
};
pub mod util;
/// Interval at which run() sends heartbeat messages to the connection pool.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Delay before retrying after a failed ZooKeeper connection attempt.
const RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Zero delay: reconnect immediately (e.g. after a session-state change).
const RECONNECT_NODELAY: Duration = Duration::from_secs(0);
/// Delay before re-polling when the watched node does not exist yet or was
/// deleted.
const WATCH_LOOP_DELAY: Duration = Duration::from_secs(10);
/// Zero delay: run the next watch-loop iteration immediately.
const WATCH_LOOP_NODELAY: Duration = Duration::from_secs(0);
/// Errors that can occur while processing a ZooKeeper cluster-state value.
#[derive(Clone, Debug, PartialEq)]
enum ResolverError {
    /// The cluster-state node's contents were not parseable as JSON.
    InvalidZkJson,
    /// A required field was present but could not be parsed.
    InvalidZkData(ZkDataField),
    /// A required field was absent from the JSON.
    MissingZkData(ZkDataField),
    /// The connection pool's receiving channel has been closed.
    ConnectionPoolShutdown
}
impl ResolverError {
    /// Returns `true` if the error is unrecoverable and the resolver's
    /// event-processing task should stop rather than keep watching.
    ///
    /// Only `ConnectionPoolShutdown` is fatal: once the pool's channel is
    /// closed there is nobody left to notify. The match is deliberately
    /// exhaustive (no `_` arm) so that adding a new variant forces an
    /// explicit decision here.
    fn should_stop(&self) -> bool {
        match self {
            ResolverError::ConnectionPoolShutdown => true,
            ResolverError::InvalidZkJson
            | ResolverError::InvalidZkData(_)
            | ResolverError::MissingZkData(_) => false,
        }
    }
}
/// The specific piece of cluster-state data that a ResolverError refers to.
#[derive(Clone, Debug, PartialEq)]
enum ZkDataField {
    /// The primary's "ip" field.
    Ip,
    /// The port embedded in the primary's postgres URL.
    Port,
    /// The primary's "pgUrl" field.
    PostgresUrl
}
/// Errors produced when parsing a ZooKeeper connect string.
#[derive(Debug)]
pub enum ZkConnectStringError {
    /// The input string was empty.
    EmptyString,
    /// A comma-separated component was not a valid socket address.
    MalformedAddr
}
impl From<AddrParseError> for ZkConnectStringError {
fn from(_: AddrParseError) -> Self {
ZkConnectStringError::MalformedAddr
}
}
/// An ordered list of ZooKeeper server addresses, as parsed from a
/// comma-separated connect string (e.g. "10.0.0.1:2181,10.0.0.2:2181").
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct ZkConnectString(Vec<SocketAddr>);
impl ZkConnectString {
fn get_addr_at(&self, index: usize) -> Option<&SocketAddr> {
self.0.get(index)
}
}
/// Renders the connect string in ZooKeeper's native format: the addresses
/// joined by commas, with no trailing separator.
///
/// Implementing `Display` rather than `ToString` directly is the idiomatic
/// form: the standard blanket impl (`impl<T: Display> ToString for T`)
/// still provides `to_string()`, so all existing callers are unaffected,
/// and the value becomes usable in `format!`/logging as a bonus.
impl std::fmt::Display for ZkConnectString {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let rendered = self
            .0
            .iter()
            .map(|addr| addr.to_string())
            .collect::<Vec<_>>()
            .join(",");
        write!(f, "{}", rendered)
    }
}
impl FromStr for ZkConnectString {
type Err = ZkConnectStringError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() {
return Err(ZkConnectStringError::EmptyString);
}
let acc: Result<Vec<SocketAddr>, Self::Err> = Ok(vec![]);
s.split(',')
.map(|x| SocketAddr::from_str(x))
.fold(acc, |acc, x| {
match (acc, x) {
(Ok(mut addrs), Ok(addr)) => {
addrs.push(addr);
Ok(addrs)
},
(Err(e), _) => Err(e),
(_, Err(e)) => Err(ZkConnectStringError::from(e))
}
})
.and_then(|x| Ok(ZkConnectString(x)))
}
}
/// Newtype wrapper allowing any `Debug` value to be logged as a slog
/// key-value pair by rendering its `Debug` representation as a string.
struct LogItem<T>(T) where T: Debug;

impl<T: Debug> SlogValue for LogItem<T> {
    fn serialize(&self, _rec: &Record, key: Key,
        serializer: &mut dyn Serializer) -> SlogResult {
        // Format the wrapped value with `{:?}` and emit it under `key`.
        serializer.emit_str(key, &format!("{:?}", self.0))
    }
}
/// What the outer connect loop should do after the watch loop breaks.
enum NextAction {
    /// Reconnect to ZooKeeper after waiting the given duration.
    Reconnect(Duration),
    /// Stop the event-processing task entirely (pool has shut down).
    Stop,
}
/// State threaded through each iteration of the watch loop.
struct WatchLoopState {
    /// Stream of watch events from the ZooKeeper client.
    watcher: Box<dyn futures::stream::Stream
        <Item = WatchedEvent, Error = ()> + Send>,
    /// The event being handled by the current iteration.
    curr_event: WatchedEvent,
    /// How long to wait before fetching the node data again.
    delay: Duration
}
/// A cueball Resolver that tracks the current Manatee primary by watching
/// the cluster-state node in ZooKeeper.
#[derive(Debug)]
pub struct ManateePrimaryResolver {
    /// The ZooKeeper servers to connect to.
    connect_string: ZkConnectString,
    /// Full ZK path of the cluster-state node (base path + "/state").
    cluster_state_path: String,
    /// Key of the last backend reported to the pool, shared with the
    /// spawned event-processing task.
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    /// Set while run() is executing.
    is_running: bool,
    /// Logger for all resolver output.
    log: Logger
}
impl ManateePrimaryResolver {
    /// Creates a new ManateePrimaryResolver.
    ///
    /// # Arguments
    ///
    /// * `connect_string` - the ZooKeeper servers to connect to
    /// * `path` - base ZK path of the Manatee cluster; the resolver
    ///   watches `<path>/state`
    /// * `log` - optional logger; defaults to a slog-stdlog adapter
    pub fn new(
        connect_string: ZkConnectString,
        path: String,
        log: Option<Logger>
    ) -> Self
    {
        let cluster_state_path = [&path, "/state"].concat();
        // Fall back to the stdlog adapter if no logger was supplied.
        let log = log.unwrap_or_else(||
            Logger::root(slog_stdlog::StdLog.fuse(), o!()));
        ManateePrimaryResolver {
            // `connect_string` is owned here, so move it directly; the
            // previous `.clone()` was redundant.
            connect_string,
            cluster_state_path,
            last_backend: Arc::new(Mutex::new(None)),
            is_running: false,
            log
        }
    }
}
/// Parses a raw cluster-state value from ZooKeeper and, if the primary
/// backend changed, notifies the connection pool.
///
/// Sends a `BackendAddedMsg` for the new primary first, then (if there was
/// a previous primary) a `BackendRemovedMsg` for it, matching the order the
/// pool expects. If the parsed backend equals the last one sent, nothing is
/// sent.
///
/// # Arguments
///
/// * `pool_tx` - channel to the connection pool
/// * `new_value` - raw bytes of the cluster-state node (JSON)
/// * `last_backend` - shared record of the last backend key sent
/// * `log` - logger
///
/// # Errors
///
/// * `InvalidZkJson` if the bytes are not valid JSON.
/// * `MissingZkData`/`InvalidZkData` if the primary's ip or pgUrl fields
///   are absent or unparseable.
/// * `ConnectionPoolShutdown` if the pool's channel is closed.
fn process_value(
    pool_tx: &Sender<BackendMsg>,
    new_value: &[u8],
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    log: Logger
) -> Result<(), ResolverError> {
    debug!(log, "process_value() entered");

    // `new_value` is already a byte slice; pass it directly instead of
    // taking an extra reference (`&new_value` was a `&&[u8]`).
    let v: SerdeJsonValue = match serde_json::from_slice(new_value) {
        Ok(v) => v,
        Err(_) => {
            return Err(ResolverError::InvalidZkJson);
        }
    };

    // Extract the primary's IP address.
    let ip = match &v["primary"]["ip"] {
        SerdeJsonValue::String(s) => {
            match BackendAddress::from_str(s) {
                Ok(s) => s,
                Err(_) => {
                    return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
                }
            }
        },
        SerdeJsonValue::Null => {
            return Err(ResolverError::MissingZkData(ZkDataField::Ip));
        },
        _ => {
            return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
        }
    };

    // Extract the port from the primary's postgres URL.
    let port = match &v["primary"]["pgUrl"] {
        SerdeJsonValue::String(s) => {
            match Url::parse(s) {
                Ok(url) => {
                    match url.port() {
                        Some(port) => port,
                        None => {
                            return Err(ResolverError::MissingZkData(
                                ZkDataField::Port));
                        }
                    }
                },
                Err(_) => {
                    return Err(ResolverError::InvalidZkData(
                        ZkDataField::PostgresUrl))
                }
            }
        },
        SerdeJsonValue::Null => {
            return Err(ResolverError::MissingZkData(ZkDataField::PostgresUrl));
        },
        _ => {
            return Err(ResolverError::InvalidZkData(ZkDataField::PostgresUrl));
        }
    };

    let backend = Backend::new(&ip, port);
    let backend_key = srv_key(&backend);
    let mut last_backend = last_backend.lock().unwrap();

    // Compare against the stored key by reference; cloning it just to
    // compare was unnecessary.
    let should_send = match &*last_backend {
        Some(lb) => *lb != backend_key,
        None => true,
    };

    if should_send {
        info!(log, "New backend found; sending to connection pool";
            "backend" => LogItem(backend.clone()));
        if pool_tx.send(BackendMsg::AddedMsg(BackendAddedMsg {
            key: backend_key.clone(),
            backend
        })).is_err() {
            return Err(ResolverError::ConnectionPoolShutdown);
        }
        // Record the new key while retrieving the old one in a single
        // step; Option::replace avoids the clone-then-overwrite dance.
        // As before, last_backend is only updated once the AddedMsg has
        // been sent successfully.
        let old_backend = last_backend.replace(backend_key);
        if let Some(lbc) = old_backend {
            info!(log, "Notifying connection pool of removal of old backend");
            if pool_tx.send(BackendMsg::RemovedMsg(
                BackendRemovedMsg(lbc))).is_err() {
                return Err(ResolverError::ConnectionPoolShutdown);
            }
        }
    } else {
        info!(log, "New backend value does not differ; not sending");
    }
    debug!(log, "process_value() returned successfully");
    Ok(())
}
/// One iteration of the inner watch loop: waits `delay`, fetches the
/// cluster-state node (setting a watch), handles the current event, then
/// blocks on the watcher stream for the next event.
///
/// Resolves to `Loop::Continue(WatchLoopState)` to run another iteration,
/// or `Loop::Break(NextAction)` telling the connect loop to stop or
/// reconnect.
///
/// # Arguments
///
/// * `pool_tx` - channel to the connection pool
/// * `last_backend` - shared record of the last backend key sent
/// * `cluster_state_path` - ZK path of the cluster-state node
/// * `zk` - connected ZooKeeper handle
/// * `loop_state` - watcher stream, current event, and pre-fetch delay
/// * `log` - logger
fn watch_loop(
    pool_tx: Sender<BackendMsg>,
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    cluster_state_path: String,
    zk: ZooKeeper,
    loop_state: WatchLoopState,
    log: Logger
) -> impl Future<Item = Loop<NextAction, WatchLoopState>,
    Error = FailureError> + Send {

    let watcher = loop_state.watcher;
    let curr_event = loop_state.curr_event;
    let delay = loop_state.delay;
    // Attach the loop state to every log line emitted below.
    let log = log.new(o!(
        "curr_event" => LogItem(curr_event.clone()),
        "delay" => LogItem(delay),
        "last_backend" => LogItem(Arc::clone(&last_backend))
    ));

    info!(log, "Getting data");
    // Separate logger clone for the outer or_else, which moves its own.
    let oe_log = log.clone();

    Delay::new(Instant::now() + delay)
        .and_then(move |_| {
            zk
                .watch()
                .get_data(&cluster_state_path)
                .and_then(move |(_, data)| {
                    match curr_event.event_type {
                        // An event type of `None` signals a session-state
                        // change rather than a node change.
                        WatchedEventType::None => {
                            match curr_event.keeper_state {
                                // States that require a reconnect.
                                KeeperState::Disconnected |
                                KeeperState::AuthFailed |
                                KeeperState::Expired => {
                                    error!(log, "Keeper state changed; reconnecting";
                                        "keeper_state" =>
                                        LogItem(curr_event.keeper_state));
                                    return Either::A(ok(Loop::Break(
                                        NextAction::Reconnect(RECONNECT_NODELAY))));
                                },
                                // Healthy states: just log and fall through
                                // to re-arm the watch.
                                KeeperState::SyncConnected |
                                KeeperState::ConnectedReadOnly |
                                KeeperState::SaslAuthenticated => {
                                    info!(log, "Keeper state changed"; "keeper_state" =>
                                        LogItem(curr_event.keeper_state));
                                }
                            }
                        },
                        WatchedEventType::NodeDataChanged => {
                            // The node may not exist yet; poll again after
                            // a delay rather than erroring out.
                            if data.is_none() {
                                info!(log, "ZK data does not exist yet");
                                return Either::A(ok(Loop::Continue(WatchLoopState {
                                    watcher,
                                    curr_event,
                                    delay: WATCH_LOOP_DELAY
                                })));
                            }
                            // Safe: is_none() was checked just above.
                            let data = data.unwrap().0;
                            info!(log, "got data"; "data" => LogItem(data.clone()));
                            match process_value(
                                // NOTE(review): pool_tx is cloned only to
                                // be borrowed here; the clone looks
                                // unnecessary — confirm before removing.
                                &pool_tx.clone(),
                                &data,
                                Arc::clone(&last_backend),
                                log.clone()
                            ) {
                                Ok(_) => {},
                                Err(e) => {
                                    error!(log, ""; "error" => LogItem(e.clone()));
                                    // Only fatal errors stop the task;
                                    // otherwise keep watching.
                                    if e.should_stop() {
                                        return Either::A(ok(Loop::Break(
                                            NextAction::Stop)));
                                    }
                                }
                            }
                        },
                        WatchedEventType::NodeDeleted => {
                            // Node removed; poll again later in case it is
                            // recreated.
                            info!(log, "ZK node deleted");
                            return Either::A(ok(Loop::Continue(WatchLoopState {
                                watcher,
                                curr_event,
                                delay: WATCH_LOOP_DELAY
                            })));
                        },
                        e => panic!("Unexpected event received: {:?}", e)
                    };

                    info!(log, "Watching for change");
                    let oe_log = log.clone();
                    // Block on the watcher stream until the next event
                    // arrives, then loop to process it immediately.
                    Either::B(watcher.into_future()
                        .and_then(move |(event, watcher)| {
                            let loop_next = match event {
                                Some(event) => {
                                    info!(log, "change event received; looping to \
                                        process event"; "event" =>
                                        LogItem(event.clone()));
                                    Loop::Continue(WatchLoopState {
                                        watcher,
                                        curr_event: event,
                                        delay: WATCH_LOOP_NODELAY
                                    })
                                },
                                // Stream exhausted: the session is gone, so
                                // reconnect.
                                None => {
                                    error!(log, "Event stream closed; reconnecting");
                                    Loop::Break(NextAction::Reconnect(
                                        RECONNECT_NODELAY))
                                }
                            };
                            ok(loop_next)
                        })
                        .or_else(move |_| {
                            error!(oe_log, "Error received from event stream");
                            ok(Loop::Break(NextAction::Reconnect(RECONNECT_NODELAY)))
                        }))
                })
                .or_else(move |error| {
                    // get_data failed; reconnect immediately.
                    error!(oe_log, "Error getting data"; "error" => LogItem(error));
                    ok(Loop::Break(NextAction::Reconnect(RECONNECT_NODELAY)))
                })
        })
        // A Delay error indicates a broken timer; nothing sensible to do
        // but panic.
        .map_err(|e| panic!("delay errored; err: {:?}", e))
}
/// One iteration of the outer connect loop: waits `delay`, connects to
/// ZooKeeper, and drives the inner watch loop until it requests a stop or
/// a reconnect.
///
/// Resolves to `Loop::Continue(delay)` to reconnect after `delay`, or
/// `Loop::Break(())` to end the event-processing task.
fn connect_loop(
    pool_tx: Sender<BackendMsg>,
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    connect_string: ZkConnectString,
    cluster_state_path: String,
    delay: Duration,
    log: Logger
) -> impl Future<Item = Loop<(), Duration>,
    Error = ()> + Send {

    let oe_log = log.clone();
    Delay::new(Instant::now() + delay)
        .and_then(move |_| {
            info!(log, "Connecting to ZooKeeper cluster");
            // NOTE(review): only the first address of the connect string
            // is ever used; presumably multi-host failover is handled by
            // retrying here, or is not yet implemented — confirm.
            ZooKeeper::connect(connect_string.get_addr_at(0)
                .expect("connect_string should have at least one IP address"))
                .and_then(move |(zk, default_watcher)| {
                    info!(log, "Connected to ZooKeeper cluster");
                    // Seed the watch loop with a synthetic NodeDataChanged
                    // event so the first iteration fetches the data right
                    // away.
                    loop_fn(WatchLoopState {
                        watcher: Box::new(default_watcher),
                        curr_event: WatchedEvent {
                            event_type: WatchedEventType::NodeDataChanged,
                            keeper_state: KeeperState::SyncConnected,
                            path: "".to_string(),
                        },
                        delay: WATCH_LOOP_NODELAY
                    } , move |loop_state| {
                        // Clone per-iteration state; watch_loop takes
                        // ownership of its arguments.
                        let pool_tx = pool_tx.clone();
                        let last_backend = Arc::clone(&last_backend);
                        let cluster_state_path = cluster_state_path.clone();
                        let zk = zk.clone();
                        let log = log.clone();
                        watch_loop(
                            pool_tx,
                            last_backend,
                            cluster_state_path,
                            zk,
                            loop_state,
                            log
                        )
                    })
                    .and_then(|next_action| {
                        ok(match next_action {
                            NextAction::Stop => Loop::Break(()),
                            // Propagate the requested delay to the next
                            // connect_loop iteration.
                            NextAction::Reconnect(delay) => Loop::Continue(delay)
                        })
                    })
                })
                .or_else(move |error| {
                    // Connection failed; retry after the standard delay.
                    error!(oe_log, "Error connecting to ZooKeeper cluster";
                        "error" => LogItem(error));
                    ok(Loop::Continue(RECONNECT_DELAY))
                })
        })
        // A Delay error indicates a broken timer; nothing sensible to do
        // but panic.
        .map_err(|e| panic!("delay errored; err: {:?}", e))
}
impl Resolver for ManateePrimaryResolver {
    /// Runs the resolver and blocks until the connection pool shuts down.
    ///
    /// Spawns the ZooKeeper connect/watch loops as a task on a dedicated
    /// tokio runtime, then loops on the calling thread sending periodic
    /// heartbeats to the pool over `s`. A failed heartbeat send means the
    /// pool has closed its channel, at which point the runtime is shut
    /// down and run() returns.
    fn run(&mut self, s: Sender<BackendMsg>) {
        debug!(self.log, "run() method entered");
        let mut rt = Runtime::new().unwrap();
        self.is_running = true;

        // Clone everything the spawned task needs: it must own its state
        // for the duration of the runtime.
        let connect_string = self.connect_string.clone();
        let cluster_state_path = self.cluster_state_path.clone();
        let pool_tx = s.clone();
        let last_backend = Arc::clone(&self.last_backend);
        let log = self.log.clone();
        let at_log = self.log.clone();

        info!(self.log, "run(): starting runtime");
        rt.spawn(
            // Outer loop: keep reconnecting to ZooKeeper, with the delay
            // chosen by the previous connect_loop iteration (zero for the
            // first attempt).
            loop_fn(Duration::from_secs(0), move |delay| {
                let pool_tx = pool_tx.clone();
                let last_backend = Arc::clone(&last_backend);
                let connect_string = connect_string.clone();
                let cluster_state_path = cluster_state_path.clone();
                let log = log.clone();
                connect_loop(
                    pool_tx,
                    last_backend,
                    connect_string,
                    cluster_state_path,
                    delay,
                    log
                )
            }).and_then(move |_| {
                info!(at_log, "Event-processing task stopping");
                Ok(())
            })
            .map(|_| ())
        );

        // Heartbeat loop: doubles as a liveness signal to the pool and as
        // shutdown detection — a send error means the pool is gone.
        loop {
            if s.send(BackendMsg::HeartbeatMsg).is_err() {
                info!(self.log, "Connection pool channel closed");
                break;
            }
            thread::sleep(HEARTBEAT_INTERVAL);
        }

        info!(self.log, "Stopping runtime");
        rt.shutdown_now().wait().unwrap();
        info!(self.log, "Runtime stopped successfully");
        self.is_running = false;
        debug!(self.log, "run() returned successfully");
    }
}
#[cfg(test)]
mod test {
    use super::*;

    use std::iter;
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc::{channel, TryRecvError};
    use std::vec::Vec;

    use clap::crate_version;
    use quickcheck::{quickcheck, Arbitrary, Gen};

    use super::util;

    // Generates arbitrary ZkConnectStrings for the quickcheck property
    // below: a list of random socket addresses of Gen-bounded random
    // length (possibly empty).
    impl Arbitrary for ZkConnectString {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let size = usize::arbitrary(g);
            ZkConnectString(
                iter::repeat(())
                    .map(|()| SocketAddr::arbitrary(g))
                    .take(size)
                    .collect()
            )
        }
    }

    quickcheck! {
        // Round-trip property: to_string() followed by from_str() yields
        // the original value; the one exception is the empty connect
        // string, which stringifies to "" and is rejected by from_str.
        fn prop_zk_connect_string_parse(
            connect_string: ZkConnectString
        ) -> bool
        {
            match ZkConnectString::from_str(&connect_string.to_string()) {
                Ok(cs) => cs == connect_string,
                _ => connect_string.to_string() == ""
            }
        }
    }

    // Plain-text logger writing synchronously to stdout, for use in tests.
    fn test_log() -> Logger {
        let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
        Logger::root(
            Mutex::new(slog_term::FullFormat::new(plain).build()).fuse(),
            o!("build-id" => crate_version!()))
    }

    // A test backend: the raw JSON cluster-state document paired with the
    // Backend object that process_value() should derive from it.
    #[derive(Clone)]
    struct BackendData {
        raw: Vec<u8>,
        object: Backend
    }

    impl BackendData {
        // Builds a valid cluster-state document whose primary has the
        // given ip and port (the port is embedded in pgUrl).
        fn new(ip: &str, port: u16) -> Self {
            let raw = format!(r#" {{
                "generation": 1,
                "primary": {{
                    "id": "{ip}:{port}:12345",
                    "ip": "{ip}",
                    "pgUrl": "tcp://postgres@{ip}:{port}/postgres",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://{ip}:12345"
                }},
                "sync": {{
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "10.77.77.21",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                }},
                "async": [],
                "deposed": [
                    {{
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }}
                ],
                "initWal": "0/16522D8"
            }}"#, ip = ip, port = port).as_bytes().to_vec();
            BackendData {
                raw,
                object: Backend::new(&BackendAddress::from_str(ip).unwrap(),
                    port)
            }
        }

        // Raw bytes as they would be read from the ZK node.
        fn raw(&self) -> Vec<u8> {
            self.raw.clone()
        }

        // The BackendKey process_value() would compute for this backend.
        fn key(&self) -> BackendKey {
            srv_key(&self.object)
        }

        // The AddedMsg the pool should receive for this backend.
        fn added_msg(&self) -> BackendAddedMsg {
            BackendAddedMsg {
                key: self.key(),
                backend: self.object.clone()
            }
        }

        // The RemovedMsg the pool should receive for this backend.
        fn removed_msg(&self) -> BackendRemovedMsg {
            BackendRemovedMsg(self.key())
        }
    }

    // Fixture backends covering the ip/port combinations the change-
    // detection tests below need.
    fn backend_ip1_port1() -> BackendData {
        BackendData::new("10.77.77.28", 5432)
    }

    fn backend_ip1_port2() -> BackendData {
        BackendData::new("10.77.77.28", 5431)
    }

    fn backend_ip2_port1() -> BackendData {
        BackendData::new("10.77.77.21", 5432)
    }

    fn backend_ip2_port2() -> BackendData {
        BackendData::new("10.77.77.21", 5431)
    }

    // A value that is not JSON at all.
    fn raw_invalid_json() -> Vec<u8> {
        "foo".as_bytes().to_vec()
    }

    // Valid JSON, but the primary has no "ip" field.
    fn raw_no_ip() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "sync": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // The primary's "ip" is a string but not a parseable address.
    fn raw_invalid_ip() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "ip": "foo",
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "sync": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // The primary's "ip" has the wrong JSON type (boolean).
    fn raw_wrong_type_ip() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "ip": true,
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "sync": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // Valid JSON, but the primary has no "pgUrl" field.
    fn raw_no_pg_url() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "sync": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "ip": "10.77.77.21",
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // The primary's "pgUrl" is a string but not a parseable URL.
    fn raw_invalid_pg_url() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "pgUrl": "foo",
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "sync": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "ip": "10.77.77.21",
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // The primary's "pgUrl" has the wrong JSON type (boolean).
    fn raw_wrong_type_pg_url() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "pgUrl": true,
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "sync": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "ip": "10.77.77.21",
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // The primary's "pgUrl" parses as a URL but carries no port.
    fn raw_no_port_pg_url() -> Vec<u8> {
        r#" {
            "generation": 1,
            "primary": {
                "id": "10.77.77.28:5432:12345",
                "ip": "10.77.77.28",
                "pgUrl": "tcp://postgres@10.77.77.22/postgres",
                "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                "backupUrl": "http://10.77.77.28:12345"
            },
            "sync": {
                "id": "10.77.77.21:5432:12345",
                "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                "ip": "10.77.77.21",
                "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                "backupUrl": "http://10.77.77.21:12345"
            },
            "async": [],
            "deposed": [
                {
                    "id":"10.77.77.22:5432:12345",
                    "ip": "10.77.77.22",
                    "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                    "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                    "backupUrl": "http://10.77.77.22:12345"
                }
            ],
            "initWal": "0/16522D8"
        }
        "#.as_bytes().to_vec()
    }

    // One process_value() test case: the input value, the pre-existing
    // last_backend key, and the expected outcome (error, message count,
    // and which add/remove messages must appear).
    struct ProcessValueFields {
        value: Vec<u8>,
        last_backend: BackendKey,
        expected_error: Option<ResolverError>,
        message_count: u32,
        added_backend: Option<BackendAddedMsg>,
        removed_backend: Option<BackendRemovedMsg>
    }

    // Drives process_value() with the given fields and asserts on its
    // return value and on exactly which messages reach the channel.
    fn run_process_value_fields(input: ProcessValueFields) {
        let (tx, rx) = channel();
        let last_backend = Arc::new(Mutex::new(Some(input.last_backend)));
        let result = process_value(
            &tx.clone(),
            &input.value,
            last_backend,
            test_log());
        match input.expected_error {
            None => assert_eq!(result, Ok(())),
            Some(expected_error) => {
                assert_eq!(result, Err(expected_error))
            }
        }
        // Drain exactly message_count messages; fewer is a test failure.
        let mut received_messages = Vec::new();
        for i in 0..input.message_count {
            let channel_result = rx.try_recv();
            match channel_result {
                Err(e) => panic!("Unexpected error receiving on channel: {:?} \
                    -- Loop iteration: {:?}", e, i),
                Ok(result) => {
                    received_messages.push(result);
                }
            }
        }
        // No extra messages may remain on the channel.
        match rx.try_recv() {
            Err(TryRecvError::Empty) => (),
            _ => panic!("Unexpected message on resolver channel")
        }
        // Verify the expected AddedMsg was among the received messages.
        if let Some(msg) = input.added_backend {
            let msg = BackendMsg::AddedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None => panic!("added_backend not found in received messages"),
                Some(index) => {
                    received_messages.remove(index);
                    ()
                }
            }
        }
        // Verify the expected RemovedMsg was among the received messages.
        if let Some(msg) = input.removed_backend {
            let msg = BackendMsg::RemovedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None =>
                    panic!("removed_backend not found in received messages"),
                Some(index) => {
                    received_messages.remove(index);
                    ()
                }
            }
        }
    }

    // Both ip and port change: expect an add for the new backend and a
    // removal of the old one.
    #[test]
    fn port_ip_change_test() {
        let data_1 = backend_ip1_port1();
        let data_2 = backend_ip2_port2();
        run_process_value_fields(ProcessValueFields{
            value: data_2.raw(),
            last_backend: data_1.key(),
            expected_error: None,
            message_count: 2,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg())
        });
    }

    // Only the ip changes (same port): add + remove expected.
    #[test]
    fn port_change_test() {
        let data_1 = backend_ip1_port1();
        let data_2 = backend_ip2_port1();
        run_process_value_fields(ProcessValueFields{
            value: data_2.raw(),
            last_backend: data_1.key(),
            expected_error: None,
            message_count: 2,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg())
        });
    }

    // Only the port changes (same ip): add + remove expected.
    #[test]
    fn ip_change_test() {
        let data_1 = backend_ip1_port1();
        let data_2 = backend_ip1_port2();
        run_process_value_fields(ProcessValueFields{
            value: data_2.raw(),
            last_backend: data_1.key(),
            expected_error: None,
            message_count: 2,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg())
        });
    }

    // Same backend as last time: no messages should be sent.
    #[test]
    fn no_change_test() {
        let data = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: data.raw(),
            last_backend: data.key(),
            expected_error: None,
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Missing "ip" field -> MissingZkData(Ip), no messages.
    #[test]
    fn no_ip_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_no_ip(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(ZkDataField::Ip)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Boolean "ip" field -> InvalidZkData(Ip), no messages.
    #[test]
    fn wrong_type_ip_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_wrong_type_ip(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(ZkDataField::Ip)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Unparseable "ip" string -> InvalidZkData(Ip), no messages.
    #[test]
    fn invalid_ip_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_invalid_ip(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(ZkDataField::Ip)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Missing "pgUrl" field -> MissingZkData(PostgresUrl), no messages.
    #[test]
    fn no_pg_url_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_no_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(
                ZkDataField::PostgresUrl)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Boolean "pgUrl" field -> InvalidZkData(PostgresUrl), no messages.
    #[test]
    fn wrong_type_pg_url_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_wrong_type_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(
                ZkDataField::PostgresUrl)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Unparseable "pgUrl" string -> InvalidZkData(PostgresUrl), no
    // messages.
    #[test]
    fn invalid_pg_url_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_invalid_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(
                ZkDataField::PostgresUrl)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Port-less "pgUrl" -> MissingZkData(Port), no messages.
    #[test]
    fn no_port_pg_url_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_no_port_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(
                ZkDataField::Port)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    // Non-JSON input -> InvalidZkJson, no messages.
    #[test]
    fn invalid_json_test() {
        let filler = backend_ip1_port1();
        run_process_value_fields(ProcessValueFields{
            value: raw_invalid_json(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkJson),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }
}