cueball-manatee-primary-resolver 0.3.0

An implementation of the cueball Resolver trait specific to the Joyent Manatee project. It queries a ZooKeeper cluster to determine the current PostgreSQL replication primary from a set of PostgreSQL replication peers.
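
A minimal usage sketch (the ZooKeeper addresses and shard path below are placeholders, and in practice the receiving end of the channel is owned by the cueball connection pool rather than consumed by hand):

```rust
use std::str::FromStr;
use std::sync::mpsc::channel;
use std::thread;

use cueball::resolver::{BackendMsg, Resolver};
use cueball_manatee_primary_resolver::{ManateePrimaryResolver, ZkConnectString};

fn main() {
    // Placeholder values: substitute real zookeeper addresses and shard path.
    let connect_string =
        ZkConnectString::from_str("10.77.77.6:2181,10.77.77.7:2181")
            .expect("malformed connect string");
    let shard_path = "/manatee/1.moray.coal.joyent.us".to_string();

    // Passing None for the logger makes the resolver use a default
    // stdlog-backed logger.
    let mut resolver =
        ManateePrimaryResolver::new(connect_string, shard_path, None);

    // run() blocks until the receiving end of the channel is dropped, so it
    // runs on its own thread.
    let (tx, rx) = channel();
    thread::spawn(move || resolver.run(tx));

    for msg in rx.iter() {
        match msg {
            // The payload carries the new primary's address and port.
            BackendMsg::AddedMsg(_added) => println!("new primary reported"),
            BackendMsg::RemovedMsg(_removed) => println!("old primary removed"),
            // Heartbeats and anything else can be ignored here.
            _ => (),
        }
    }
}
```
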
//
// Copyright 2019 Joyent, Inc.
//
// THEORY STATEMENT -- READ THIS FIRST!
//
// This library has just one task: watch a zookeeper node for changes and notify
// users of the library when changes occur. This is accomplished in one big pair
// of nested loops. The code structure looks a little like this:
//
// ManateePrimaryResolver::run()
//   -> Spawns tokio task
//     -> Runs outer loop, which handles connecting/reconnecting to zookeeper.
//        The logic here is in the connect_loop() function.
//        -> Runs inner loop, which handles setting/resetting the zookeeper
//           watch. The logic here is in the watch_loop() function.
//           -> For every change in zookeeper data, calls the process_value()
//              function. This function parses the new zookeeper data and
//              notifies the user of the change in data if necessary.
//
// Note that this file also contains unit tests for process_value().
//

use std::convert::From;
use std::fmt::Debug;
use std::net::{AddrParseError, SocketAddr};
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use failure::Error as FailureError;
use futures::future::{ok, loop_fn, Either, Future, Loop};
use itertools::Itertools;
use serde::Deserialize;
use serde_json;
use serde_json::Value as SerdeJsonValue;
use slog::{error, info, debug, o, Drain, Key, Logger, Record, Serializer};
use slog::Result as SlogResult;
use slog::Value as SlogValue;
use tokio_zookeeper::*;
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::timer::Delay;
use url::Url;

use cueball::backend::*;
use cueball::resolver::{
    BackendAddedMsg,
    BackendRemovedMsg,
    BackendMsg,
    Resolver
};

pub mod util;

//
// The interval at which the resolver should send heartbeats via the
// connection pool channel.
//
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

//
// Delays to be used when reconnecting to the zookeeper cluster
//
const RECONNECT_DELAY: Duration = Duration::from_secs(10);
const RECONNECT_NODELAY: Duration = Duration::from_secs(0);

//
// Delays to be used when re-setting the watch on the zookeeper node
//
const WATCH_LOOP_DELAY: Duration = Duration::from_secs(10);
const WATCH_LOOP_NODELAY: Duration = Duration::from_secs(0);

//
// An error type to be used internally.
//
#[derive(Clone, Debug, PartialEq)]
enum ResolverError {
    InvalidZkJson,
    InvalidZkData(ZkDataField),
    MissingZkData(ZkDataField),
    ConnectionPoolShutdown
}

impl ResolverError {
    ///
    /// Indicates whether the given error should cause the resolver to stop.
    ///
    fn should_stop(&self) -> bool {
        match self {
            ResolverError::ConnectionPoolShutdown => true,
            _ => false
        }
    }
}

#[derive(Clone, Debug, PartialEq)]
enum ZkDataField {
    Ip,
    Port,
    PostgresUrl
}

#[derive(Debug)]
pub enum ZkConnectStringError {
    EmptyString,
    MalformedAddr
}

impl From<AddrParseError> for ZkConnectStringError {
    fn from(_: AddrParseError) -> Self {
        ZkConnectStringError::MalformedAddr
    }
}

///
/// `ZkConnectString` represents a list of zookeeper addresses to connect to.
///
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct ZkConnectString(Vec<SocketAddr>);

impl ZkConnectString {
    ///
    /// Gets a reference to the SocketAddr at the provided index. Returns None
    /// if the index is out of bounds.
    ///
    fn get_addr_at(&self, index: usize) -> Option<&SocketAddr> {
        self.0.get(index)
    }
}

impl ToString for ZkConnectString {
    fn to_string(&self) -> String {
        self
            .0
            .iter()
            .map(|x| x.to_string())
            .intersperse(String::from(","))
            .collect()
    }
}

impl FromStr for ZkConnectString {
    type Err = ZkConnectStringError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            return Err(ZkConnectStringError::EmptyString);
        }
        s.split(',')
            .map(|x| {
                SocketAddr::from_str(x).map_err(ZkConnectStringError::from)
            })
            .collect::<Result<Vec<_>, _>>()
            .map(ZkConnectString)
    }
}

///
/// A serializable type to be used in log entries. Wraps around any type that
/// implements Debug and uses the Debug representation of the type as the
/// serialized output.
///
struct LogItem<T>(T) where T: Debug;

impl<T: Debug> SlogValue for LogItem<T> {
    fn serialize(&self, _rec: &Record, key: Key,
        serializer: &mut dyn Serializer) -> SlogResult {
            serializer.emit_str(key, &format!("{:?}", self.0))
    }
}

// Represents an action to be taken in the event of a connection error.
enum NextAction {
    //
    // The Duration field is the amount of time to wait before
    // reconnecting.
    //
    Reconnect(Duration),
    Stop,
}

//
// Encapsulates the state that one iteration of the watch loop passes
// to the next iteration.
//
struct WatchLoopState {
    watcher: Box<dyn futures::stream::Stream
        <Item = WatchedEvent, Error = ()> + Send>,
    curr_event: WatchedEvent,
    delay: Duration
}

#[derive(Debug)]
pub struct ManateePrimaryResolver {
    ///
    /// The addresses of the Zookeeper cluster the Resolver is connecting to
    ///
    connect_string: ZkConnectString,
    ///
    /// The Zookeeper path for manatee cluster state for the shard. *e.g.*
    /// "/manatee/1.moray.coal.joyent.us/state"
    ///
    cluster_state_path: String,
    ///
    /// The key representation of the last backend sent to the cueball
    /// connection pool. Persists across multiple calls to run().
    ///
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    ///
    /// Indicates whether or not the resolver is running. This is slightly
    /// superfluous (this field is `true` for exactly the duration of each
    /// call to run(), and `false` otherwise), but could be useful if the caller
    /// wants to check if the resolver is running for some reason.
    ///
    is_running: bool,
    ///
    /// The ManateePrimaryResolver's root log
    ///
    log: Logger
}

impl ManateePrimaryResolver {
    ///
    /// Creates a new ManateePrimaryResolver instance.
    ///
    /// # Arguments
    ///
    /// * `connect_string` - a comma-separated list of the zookeeper instances
    ///   in the cluster
    /// * `path` - The path to the root node in zookeeper for the shard we're
    ///   watching
    /// * `log` - An optional logger; if `None`, a default logger backed by
    ///   slog-stdlog is used
    ///
    pub fn new(
        connect_string: ZkConnectString,
        path: String,
        log: Option<Logger>
    ) -> Self
    {
        let cluster_state_path = [&path, "/state"].concat();

        //
        // Use the passed-in logger, or create a default logger if the caller
        // did not pass one in
        //
        let log = log.unwrap_or_else(||
            Logger::root(slog_stdlog::StdLog.fuse(), o!()));

        ManateePrimaryResolver {
            connect_string: connect_string.clone(),
            cluster_state_path,
            last_backend: Arc::new(Mutex::new(None)),
            is_running: false,
            log
        }
    }
}

///
/// Parses the given zookeeper node data into a Backend object, compares it to
/// the last Backend sent to the cueball connection pool, and sends it to the
/// connection pool if the values differ.
///
/// We need to extract two pieces of data from the "primary" json object in the
/// json returned by zookeeper:
/// * The backend's IP address
/// * The backend's port
/// The json object has an "ip" field, but not a port field. However, the port
/// is contained in the "pgUrl" field, so this function extracts it from that.
/// The port could also be extracted from the "id" field, but the pgUrl field is
/// a convenient choice as it can be parsed structurally as a url and the port
/// extracted from there.
///
/// What this all means is: the resolver relies on the "primary.ip" and
/// "primary.pgUrl" fields as an _interface_ to the zookeeper data. This feels a
/// little ad-hoc and should be formalized and documented.
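///
/// For reference, the relevant portion of the zookeeper JSON looks roughly
/// like this (abridged from the test fixtures at the bottom of this file):
///
/// ```text
/// {
///     "primary": {
///         "ip": "10.77.77.28",
///         "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
///         ...
///     },
///     ...
/// }
/// ```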
///
/// # Arguments
///
/// * `pool_tx` - The Sender upon which to send the update message
/// * `new_value` - The raw zookeeper data we've newly retrieved
/// * `last_backend` - The last Backend we sent to the connection pool
/// * `log` - The Logger to be used for logging
///
fn process_value(
    pool_tx: &Sender<BackendMsg>,
    new_value: &[u8],
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    log: Logger
) -> Result<(), ResolverError> {
    debug!(log, "process_value() entered");

    // Parse the bytes into a json object
    let v: SerdeJsonValue = match serde_json::from_slice(&new_value) {
        Ok(v) => v,
        Err(_) => {
            return Err(ResolverError::InvalidZkJson);
        }
    };

    //
    // Parse out the ip. We expect the json fields to exist, and return an error
    // if they don't, or if they are of the wrong type.
    //
    let ip = match &v["primary"]["ip"] {
        SerdeJsonValue::String(s) => {
            match BackendAddress::from_str(s) {
                Ok(s) => s,
                Err(_) => {
                    return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
                }
            }
        },
        SerdeJsonValue::Null => {
            return Err(ResolverError::MissingZkData(ZkDataField::Ip));
        },
        _ => {
            return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
        }
    };

    //
    // Parse out the port. We expect the json fields to exist, and return an
    // error if they don't, or if they are of the wrong type.
    //
    let port = match &v["primary"]["pgUrl"] {
        SerdeJsonValue::String(s) => {
            match Url::parse(s) {
                Ok(url) => {
                    match url.port() {
                        Some(port) => port,
                        None => {
                            return Err(ResolverError::MissingZkData(
                                ZkDataField::Port));
                        }
                    }
                },
                Err(_) => {
                    return Err(ResolverError::InvalidZkData(
                        ZkDataField::PostgresUrl))
                }
            }
        },
        SerdeJsonValue::Null => {
            return Err(ResolverError::MissingZkData(ZkDataField::PostgresUrl));
        },
        _ => {
            return Err(ResolverError::InvalidZkData(ZkDataField::PostgresUrl));
        }
    };

    // Construct a backend and key
    let backend = Backend::new(&ip, port);
    let backend_key = srv_key(&backend);

    // Determine whether we need to send the new backend over
    let mut last_backend = last_backend.lock().unwrap();
    let should_send = match (*last_backend).clone() {
        Some(lb) => lb != backend_key,
        None => true,
    };

    // Send the new backend if necessary
    if should_send {
        info!(log, "New backend found; sending to connection pool";
            "backend" => LogItem(backend.clone()));
        if pool_tx.send(BackendMsg::AddedMsg(BackendAddedMsg {
            key: backend_key.clone(),
            backend
        })).is_err() {
            return Err(ResolverError::ConnectionPoolShutdown);
        }

        let lb_clone = (*last_backend).clone();
        *last_backend = Some(backend_key);

        //
        // Notify the connection pool that the old backend should be
        // removed, if the old backend is not None
        //
        if let Some(lbc) = lb_clone {
            info!(log, "Notifying connection pool of removal of old backend");
            if pool_tx.send(BackendMsg::RemovedMsg(
                BackendRemovedMsg(lbc))).is_err() {
                return Err(ResolverError::ConnectionPoolShutdown);
            }
        }
    } else {
        info!(log, "New backend value does not differ; not sending");
    }
    debug!(log, "process_value() returned successfully");
    Ok(())
}

///
/// This function represents the body of the watch loop. It both sets and
/// handles the watch, and calls process_value() to send new data to the
/// connection pool as the data arrives.
///
/// This function can return from two places: before we've waited for the
/// watch to fire (if we hit an error first), or after we've waited for the
/// watch to fire (on either success or error). These two paths return
/// different Future types, so we wrap the returned values in a future::Either
/// to satisfy the type checker.
///
/// # Arguments
///
/// * `pool_tx` - The Sender upon which to send the update message
/// * `last_backend` - The last Backend we sent to the connection pool
/// * `cluster_state_path` - The path to the cluster state node in zookeeper for
///   the shard we're watching
/// * `zk` - The ZooKeeper client object
/// * `loop_state` - The state to be passed to the next iteration of loop_fn()
/// * `log` - The Logger to be used for logging
///
fn watch_loop(
    pool_tx: Sender<BackendMsg>,
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    cluster_state_path: String,
    zk: ZooKeeper,
    loop_state: WatchLoopState,
    log: Logger
) -> impl Future<Item = Loop<NextAction, WatchLoopState>,
    Error = FailureError> + Send {

    let watcher = loop_state.watcher;
    let curr_event = loop_state.curr_event;
    let delay = loop_state.delay;
    //
    // TODO: keep mutex boilerplate from showing up in the log
    //
    let log = log.new(o!(
        "curr_event" => LogItem(curr_event.clone()),
        "delay" => LogItem(delay),
        "last_backend" => LogItem(Arc::clone(&last_backend))
    ));

    info!(log, "Getting data");
    let oe_log = log.clone();

    //
    // We set the watch here. If the previous iteration of the loop ended
    // because the keeper state changed rather than because the watch fired, the
    // watch will already have been set, so we don't _need_ to set it here. With
    // that said, it does no harm (zookeeper deduplicates watches on the server
    // side), and it may not be worth the effort to optimize for this case,
    // since keeper state changes (and, indeed, changes of any sort) should
    // happen infrequently.
    //
    Delay::new(Instant::now() + delay)
    .and_then(move |_| {
        zk
        .watch()
        .get_data(&cluster_state_path)
        .and_then(move |(_, data)| {
            match curr_event.event_type {
                // Keeper state has changed
                WatchedEventType::None => {
                    match curr_event.keeper_state {
                        //
                        // TODO will these cases ever happen? Because if the
                        // keeper state is "bad", then get_data() will have
                        // failed and we won't be here.
                        //
                        KeeperState::Disconnected |
                        KeeperState::AuthFailed |
                        KeeperState::Expired => {
                            error!(log, "Keeper state changed; reconnecting";
                                "keeper_state" =>
                                LogItem(curr_event.keeper_state));
                            return Either::A(ok(Loop::Break(
                                NextAction::Reconnect(RECONNECT_NODELAY))));
                        },
                        KeeperState::SyncConnected |
                        KeeperState::ConnectedReadOnly |
                        KeeperState::SaslAuthenticated => {
                            info!(log, "Keeper state changed"; "keeper_state" =>
                                LogItem(curr_event.keeper_state));
                        }
                    }
                },
                // The data watch fired
                WatchedEventType::NodeDataChanged => {
                    //
                    // We didn't get the data, which means the node doesn't
                    // exist yet. We should wait a bit and try again. We'll just
                    // use the same event as before.
                    //
                    if data.is_none() {
                        info!(log, "ZK data does not exist yet");
                        return Either::A(ok(Loop::Continue(WatchLoopState {
                            watcher,
                            curr_event,
                            delay: WATCH_LOOP_DELAY
                        })));
                    }
                    //
                    // Discard the Stat from the data, as we don't use it.
                    //
                    let data = data.unwrap().0;
                    info!(log, "got data"; "data" => LogItem(data.clone()));
                    match process_value(
                        &pool_tx.clone(),
                        &data,
                        Arc::clone(&last_backend),
                        log.clone()
                    ) {
                        Ok(_) => {},
                        Err(e) => {
                            error!(log, ""; "error" => LogItem(e.clone()));
                            //
                            // The error is between the client and the
                            // outward-facing channel, not between the client
                            // and the zookeeper connection, so we don't have to
                            // attempt to reconnect here and can continue,
                            // unless the error tells us to stop.
                            //
                            if e.should_stop() {
                                return Either::A(ok(Loop::Break(
                                    NextAction::Stop)));
                            }
                        }
                    }
                },
                WatchedEventType::NodeDeleted => {
                    //
                    // Same behavior as the above case where we didn't get the
                    // data because the node doesn't exist. See comment above.
                    //
                    info!(log, "ZK node deleted");
                    return Either::A(ok(Loop::Continue(WatchLoopState {
                        watcher,
                        curr_event,
                        delay: WATCH_LOOP_DELAY
                    })));
                },
                e => panic!("Unexpected event received: {:?}", e)
            };

            //
            // If we got here, we're waiting for the watch to fire. Before this
            // point, we wrap the return value in Either::A. After this point,
            // we wrap the return value in Either::B. See the comment about
            // "Either" some lines above.
            //
            info!(log, "Watching for change");
            let oe_log = log.clone();
            Either::B(watcher.into_future()
                .and_then(move |(event, watcher)| {
                    let loop_next = match event {
                        Some(event) => {
                            info!(log, "change event received; looping to \
                                process event"; "event" =>
                                LogItem(event.clone()));
                            Loop::Continue(WatchLoopState {
                                watcher,
                                curr_event: event,
                                delay: WATCH_LOOP_NODELAY
                            })
                        },
                        //
                        // If we didn't get a valid event, this means the Stream
                        // got closed, which indicates a connection issue, so we
                        // reconnect.
                        //
                        None => {
                            error!(log, "Event stream closed; reconnecting");
                            Loop::Break(NextAction::Reconnect(
                                RECONNECT_NODELAY))
                        }
                    };
                    ok(loop_next)
                })
                .or_else(move |_| {
                    //
                    // If we get an error from the event Stream, we assume that
                    // something went wrong with the zookeeper connection and
                    // attempt to reconnect.
                    //
                    // The stream's error type is (), so there's no information
                    // to extract from it.
                    //
                    error!(oe_log, "Error received from event stream");
                    ok(Loop::Break(NextAction::Reconnect(RECONNECT_NODELAY)))
                }))
        })
        //
        // If some error occurred getting the data, we assume we should
        // reconnect to the zookeeper server.
        //
        .or_else(move |error| {
            error!(oe_log, "Error getting data"; "error" => LogItem(error));
            ok(Loop::Break(NextAction::Reconnect(RECONNECT_NODELAY)))
        })
    })
    .map_err(|e| panic!("delay errored; err: {:?}", e))
}

fn connect_loop(
    pool_tx: Sender<BackendMsg>,
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    connect_string: ZkConnectString,
    cluster_state_path: String,
    delay: Duration,
    log: Logger
) -> impl Future<Item = Loop<(), Duration>,
    Error = ()> + Send {

    let oe_log = log.clone();
    Delay::new(Instant::now() + delay)
    .and_then(move |_| {
        info!(log, "Connecting to ZooKeeper cluster");
        //
        // We expect() the result of get_addr_at() because we anticipate the
        // connect string having at least one element, and we can't do anything
        // useful if it doesn't.
        //
        ZooKeeper::connect(connect_string.get_addr_at(0)
        .expect("connect_string should have at least one IP address"))
        .and_then(move |(zk, default_watcher)| {
            info!(log, "Connected to ZooKeeper cluster");

            //
            // Main change-watching loop. A new loop iteration means we're
            // setting a new watch (if necessary) and waiting for a result.
            // Breaking from the loop means that we've hit some error and are
            // returning control to the outer loop.
            //
            // Arg: WatchLoopState -- we set curr_event to an artificially
            //     constructed WatchedEvent for the first loop iteration, so the
            //     connection pool will be initialized with the initial primary
            //     as its backend.
            // Loop::Break type: NextAction -- this value is used to instruct
            //     the outer loop (this function) whether to try to reconnect or
            //     terminate.
            //
            loop_fn(WatchLoopState {
                watcher: Box::new(default_watcher),
                curr_event: WatchedEvent {
                    event_type: WatchedEventType::NodeDataChanged,
                    //
                    // This initial artificial keeper_state doesn't necessarily
                    // reflect reality, but that's ok because it's paired with
                    // an artificial NodeDataChanged event, and our handling for
                    // this type of event doesn't involve the keeper_state
                    // field.
                    //
                    keeper_state: KeeperState::SyncConnected,
                    //
                    // We never use `path`, so we might as well set it to an
                    // empty string in our artificially constructed WatchedEvent
                    // object.
                    //
                    path: "".to_string(),
                },
                delay: WATCH_LOOP_NODELAY
            }, move |loop_state| {
                //
                // These fields require a new clone for every loop iteration,
                // but they don't actually change from iteration to iteration,
                // so they're not included as part of loop_state.
                //
                let pool_tx = pool_tx.clone();
                let last_backend = Arc::clone(&last_backend);
                let cluster_state_path = cluster_state_path.clone();
                let zk = zk.clone();
                let log = log.clone();

                watch_loop(
                    pool_tx,
                    last_backend,
                    cluster_state_path,
                    zk,
                    loop_state,
                    log
                )
            })
            .and_then(|next_action| {
                ok(match next_action {
                    NextAction::Stop => Loop::Break(()),
                    //
                    // We reconnect immediately here instead of waiting, because
                    // if we're here it means that we came from the inner loop
                    // and thus we just had a valid connection terminate (as
                    // opposed to the `or_else` block below, where we've just
                    // tried to connect and failed), and thus there's no reason
                    // for us to delay trying to connect again.
                    //
                    NextAction::Reconnect(delay) => Loop::Continue(delay)
                })
            })
        })
        .or_else(move |error| {
            error!(oe_log, "Error connecting to ZooKeeper cluster";
                "error" => LogItem(error));
            ok(Loop::Continue(RECONNECT_DELAY))
        })
    })
    .map_err(|e| panic!("delay errored; err: {:?}", e))
}

impl Resolver for ManateePrimaryResolver {

    //
    // The resolver object is not Sync, so we can assume that only one instance
    // of this function is running at once, because callers will have to control
    // concurrent access.
    //
    // If the connection pool closes the receiving end of the channel, this
    // function may not return right away -- this function will not notice that
    // the pool has disconnected until this function tries to send another
    // heartbeat, at which point this function will return. This means that the
    // time between disconnection and function return is at most the length of
    // HEARTBEAT_INTERVAL. Any change in the meantime will be picked up by the
    // next call to run().
    //
    // Indeed, the heartbeat messages exist solely as a time-boxed method to
    // test whether the connection pool has closed the channel, so we don't leak
    // resolver threads.
    //
    fn run(&mut self, s: Sender<BackendMsg>) {
        debug!(self.log, "run() method entered");

        let mut rt = Runtime::new().unwrap();
        //
        // There's no need to check if the pool is already running and return
        // early, because multiple instances of this function _cannot_ be
        // running concurrently -- see this function's header comment.
        //
        self.is_running = true;

        //
        // Variables moved to tokio runtime thread:
        //
        // * `connect_string` - A comma-separated list of the zookeeper
        //   instances in the cluster
        // * `cluster_state_path` - The path to the cluster state node in
        //   zookeeper for the shard we're watching
        // * `pool_tx` - The Sender that this function should use to communicate
        //   with the cueball connection pool
        // * `last_backend` - The key representation of the last backend sent to
        //   the cueball connection pool. It will be updated by process_value()
        //   if we send a new backend over.
        // * `log` - A clone of the resolver's root log
        // * `at_log` - Another clone, used in the `and_then` portion of the loop
        //
        let connect_string = self.connect_string.clone();
        let cluster_state_path = self.cluster_state_path.clone();
        let pool_tx = s.clone();
        let last_backend = Arc::clone(&self.last_backend);
        let log = self.log.clone();
        let at_log = self.log.clone();

        //
        // Start the event-processing task. This is structured as two nested
        // loops: one to handle the zookeeper connection and one to handle
        // setting the watch. These are handled by the connect_loop() and
        // watch_loop() functions, respectively.
        //
        info!(self.log, "run(): starting runtime");
        rt.spawn(
            //
            // Outer loop. Handles connecting to zookeeper. A new loop iteration
            // means a new zookeeper connection. We break from the loop if we
            // discover that the user has closed the receiving channel, which
            // is their sole means of stopping the client.
            //
            // Arg: Time to wait before attempting to connect. Initially 0s.
            //     Repeated iterations of the loop set a delay before
            //     connecting.
            // Loop::Break type: ()
            //
            loop_fn(Duration::from_secs(0), move |delay| {
                let pool_tx = pool_tx.clone();
                let last_backend = Arc::clone(&last_backend);
                let connect_string = connect_string.clone();
                let cluster_state_path = cluster_state_path.clone();
                let log = log.clone();

                connect_loop(
                    pool_tx,
                    last_backend,
                    connect_string,
                    cluster_state_path,
                    delay,
                    log
                )
            }).and_then(move |_| {
                info!(at_log, "Event-processing task stopping");
                Ok(())
            })
            .map(|_| ())
        );
        loop {
            if s.send(BackendMsg::HeartbeatMsg).is_err() {
                info!(self.log, "Connection pool channel closed");
                break;
            }
            thread::sleep(HEARTBEAT_INTERVAL);
        }
        info!(self.log, "Stopping runtime");
        //
        // We shut down the background watch-looping thread. It may have already
        // exited by itself if it noticed that the connection pool closed its
        // channel, but there's no harm in still calling shutdown_now() in that
        // case.
        //
        rt.shutdown_now().wait().unwrap();
        info!(self.log, "Runtime stopped successfully");
        self.is_running = false;
        debug!(self.log, "run() returned successfully");
    }
}

//
// Unit tests
//
#[cfg(test)]
mod test {
    use super::*;

    use std::iter;
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc::{channel, TryRecvError};
    use std::vec::Vec;

    use clap::crate_version;
    use quickcheck::{quickcheck, Arbitrary, Gen};

    use super::util;

    impl Arbitrary for ZkConnectString {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let size = usize::arbitrary(g);
            ZkConnectString(
                iter::repeat(())
                    .map(|()| SocketAddr::arbitrary(g))
                    .take(size)
                    .collect()
            )
        }
    }

    //
    // Test parsing ZkConnectString from string
    //
    quickcheck! {
        fn prop_zk_connect_string_parse(
            connect_string: ZkConnectString
        ) -> bool
        {
            //
            // We expect an error only if the input string was zero-length
            //
            match ZkConnectString::from_str(&connect_string.to_string()) {
                Ok(cs) => cs == connect_string,
                _ => connect_string.to_string() == ""
            }
        }
    }
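
    //
    // A concrete illustration of the round-trip property checked above. The
    // addresses used here are arbitrary examples.
    //
    #[test]
    fn zk_connect_string_example() {
        let s = "10.77.77.6:2181,10.77.77.7:2181";
        let cs = ZkConnectString::from_str(s)
            .expect("example connect string should parse");
        assert_eq!(cs.to_string(), s);
        assert_eq!(cs.get_addr_at(0).map(|a| a.port()), Some(2181));
        assert!(ZkConnectString::from_str("").is_err());
    }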

    // Below: test process_value()

    fn test_log() -> Logger {
        let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
        Logger::root(
            Mutex::new(slog_term::FullFormat::new(plain).build()).fuse(),
            o!("build-id" => crate_version!()))
    }

    #[derive(Clone)]
    struct BackendData {
        raw: Vec<u8>,
        object: Backend
    }

    impl BackendData {
        //
        // Most of the data here isn't relevant, but real json from zookeeper
        // will include it, so we include it here.
        //
        fn new(ip: &str, port: u16) -> Self {
            let raw = format!(r#" {{
                "generation": 1,
                "primary": {{
                    "id": "{ip}:{port}:12345",
                    "ip": "{ip}",
                    "pgUrl": "tcp://postgres@{ip}:{port}/postgres",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://{ip}:12345"
                }},
                "sync": {{
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "10.77.77.21",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                }},
                "async": [],
                "deposed": [
                    {{
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }}
                ],
                "initWal": "0/16522D8"
            }}"#, ip = ip, port = port).as_bytes().to_vec();

            BackendData {
                raw,
                object: Backend::new(&BackendAddress::from_str(ip).unwrap(),
                    port)
            }
        }

        fn raw(&self) -> Vec<u8> {
            self.raw.clone()
        }

        fn key(&self) -> BackendKey {
            srv_key(&self.object)
        }

        fn added_msg(&self) -> BackendAddedMsg {
            BackendAddedMsg {
                key: self.key(),
                backend: self.object.clone()
            }
        }

        fn removed_msg(&self) -> BackendRemovedMsg {
            BackendRemovedMsg(self.key())
        }
    }

    fn backend_ip1_port1() -> BackendData {
        BackendData::new("10.77.77.28", 5432)
    }

    fn backend_ip1_port2() -> BackendData {
        BackendData::new("10.77.77.28", 5431)
    }

    fn backend_ip2_port1() -> BackendData {
        BackendData::new("10.77.77.21", 5432)
    }

    fn backend_ip2_port2() -> BackendData {
        BackendData::new("10.77.77.21", 5431)
    }

    fn raw_invalid_json() -> Vec<u8> {
        "foo".as_bytes().to_vec()
    }

    fn raw_no_ip() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "sync": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    fn raw_invalid_ip() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "foo",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "sync": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    fn raw_wrong_type_ip() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": true,
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "sync": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "pgUrl": "tcp://postgres@10.77.77.28:5432/postgres",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    fn raw_no_pg_url() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "sync": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "10.77.77.21",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    fn raw_invalid_pg_url() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "pgUrl": "foo",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "sync": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "10.77.77.21",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    fn raw_wrong_type_pg_url() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "pgUrl": true,
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "sync": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "10.77.77.21",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    fn raw_no_port_pg_url() -> Vec<u8> {
        r#" {
                "generation": 1,
                "primary": {
                    "id": "10.77.77.28:5432:12345",
                    "ip": "10.77.77.28",
                    "pgUrl": "tcp://postgres@10.77.77.22/postgres",
                    "zoneId": "f47c4766-1857-4bdc-97f0-c1fd009c955b",
                    "backupUrl": "http://10.77.77.28:12345"
                },
                "sync": {
                    "id": "10.77.77.21:5432:12345",
                    "zoneId": "f8727df9-c639-4152-a861-c77a878ca387",
                    "ip": "10.77.77.21",
                    "pgUrl": "tcp://postgres@10.77.77.21:5432/postgres",
                    "backupUrl": "http://10.77.77.21:12345"
                },
                "async": [],
                "deposed": [
                    {
                        "id":"10.77.77.22:5432:12345",
                        "ip": "10.77.77.22",
                        "pgUrl": "tcp://postgres@10.77.77.22:5432/postgres",
                        "zoneId": "c7a64f9f-4d49-4e6b-831a-68fd6ebf1d3c",
                        "backupUrl": "http://10.77.77.22:12345"
                    }
                ],
                "initWal": "0/16522D8"
            }
        "#.as_bytes().to_vec()
    }

    //
    // Represents a process_value test case, including inputs and expected
    // outputs.
    //
    struct ProcessValueFields {
        value: Vec<u8>,
        last_backend: BackendKey,
        expected_error: Option<ResolverError>,
        message_count: u32,
        added_backend: Option<BackendAddedMsg>,
        removed_backend: Option<BackendRemovedMsg>
    }

    //
    // Run a process_value test case
    //
    fn run_process_value_fields(input: ProcessValueFields) {
        let (tx, rx) = channel();
        let last_backend = Arc::new(Mutex::new(Some(input.last_backend)));

        let result = process_value(
            &tx.clone(),
            &input.value,
            last_backend,
            test_log());
        match input.expected_error {
            None => assert_eq!(result, Ok(())),
            Some(expected_error) => {
                assert_eq!(result, Err(expected_error))
            }
        }

        let mut received_messages = Vec::new();

        // Receive as many messages as we expect
        for i in 0..input.message_count {
            let channel_result = rx.try_recv();
            match channel_result {
                Err(e) => panic!("Unexpected error receiving on channel: {:?} \
                    -- Loop iteration: {:?}", e, i),
                Ok(result) => {
                    received_messages.push(result);
                }
            }
        }

        //
        // Make sure there are not more messages than we expect on the channel.
        // Can't use assert_eq! here because BackendMsg doesn't implement Debug.
        //
        match rx.try_recv() {
            Err(TryRecvError::Empty) => (),
            _ => panic!("Unexpected message on resolver channel")
        }

        // Check that the "added" message was received if applicable
        if let Some(msg) = input.added_backend {
            let msg = BackendMsg::AddedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None => panic!("added_backend not found in received messages"),
                Some(index) => {
                    received_messages.remove(index);
                    ()
                }
            }
        }

        // Check that the "removed" message was received if applicable
        if let Some(msg) = input.removed_backend {
            let msg = BackendMsg::RemovedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None =>
                    panic!("removed_backend not found in received messages"),
                Some(index) => {
                    received_messages.remove(index);
                    ()
                }
            }
        }
    }

    #[test]
    fn port_ip_change_test() {
        let data_1 = backend_ip1_port1();
        let data_2 = backend_ip2_port2();

        run_process_value_fields(ProcessValueFields{
            value: data_2.raw(),
            last_backend: data_1.key(),
            expected_error: None,
            message_count: 2,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg())
        });
    }

    #[test]
    fn port_change_test() {
        let data_1 = backend_ip1_port1();
        let data_2 = backend_ip2_port1();

        run_process_value_fields(ProcessValueFields{
            value: data_2.raw(),
            last_backend: data_1.key(),
            expected_error: None,
            message_count: 2,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg())
        });
    }

    #[test]
    fn ip_change_test() {
        let data_1 = backend_ip1_port1();
        let data_2 = backend_ip1_port2();

        run_process_value_fields(ProcessValueFields{
            value: data_2.raw(),
            last_backend: data_1.key(),
            expected_error: None,
            message_count: 2,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg())
        });
    }

    #[test]
    fn no_change_test() {
        let data = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: data.raw(),
            last_backend: data.key(),
            expected_error: None,
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn no_ip_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_no_ip(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(ZkDataField::Ip)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn wrong_type_ip_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_wrong_type_ip(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(ZkDataField::Ip)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn invalid_ip_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_invalid_ip(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(ZkDataField::Ip)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn no_pg_url_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_no_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(
                ZkDataField::PostgresUrl)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn wrong_type_pg_url_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_wrong_type_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(
                ZkDataField::PostgresUrl)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn invalid_pg_url_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_invalid_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(
                ZkDataField::PostgresUrl)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn no_port_pg_url_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_no_port_pg_url(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(
                ZkDataField::Port)),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }

    #[test]
    fn invalid_json_test() {
        let filler = backend_ip1_port1();

        run_process_value_fields(ProcessValueFields{
            value: raw_invalid_json(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkJson),
            message_count: 0,
            added_backend: None,
            removed_backend: None
        });
    }
}