// cueball_manatee_primary_resolver/lib.rs
//
// Copyright 2020 Joyent, Inc.
//
// THEORY STATEMENT -- READ THIS FIRST!
//
// This library has just one task: watch a zookeeper node for changes and notify
// users of the library when changes occur. This is accomplished in one big pair
// of nested loops. The code structure looks a little like this:
//
// ManateePrimaryResolver::run()
//   -> Spawns tokio task
//     -> Runs outer loop, which handles connecting/reconnecting to zookeeper.
//        The logic here is in the connect_loop() function.
//        -> Runs inner loop, which handles setting/resetting the zookeeper
//           watch. The logic here is in the watch_loop() function.
//           -> For every change in zookeeper data, calls the process_value()
//              function. This function parses the new zookeeper data and
//              notifies the user of the change in data if necessary.
//
// Note that this file also contains unit tests for process_value().
//

23use std::convert::From;
24use std::default::Default;
25use std::fmt::Debug;
26use std::net::{AddrParseError, SocketAddr};
27use std::str::FromStr;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
30use std::sync::{Arc, Mutex};
31use std::thread;
32use std::time::{Duration, Instant};
33
34use backoff::backoff::Backoff;
35use backoff::default::{MAX_INTERVAL_MILLIS, MULTIPLIER};
36use backoff::ExponentialBackoff;
37use clap::{crate_name, crate_version};
38use failure::Error as FailureError;
39use futures::future::{loop_fn, ok, Either, Future, Loop};
40use futures::stream::Stream;
41use itertools::Itertools;
42use lazy_static::lazy_static;
43use rand;
44use rand::Rng;
45use serde::Deserialize;
46use serde_json;
47use serde_json::Value as SerdeJsonValue;
48use slog::Result as SlogResult;
49use slog::Value as SlogValue;
50use slog::{
51    debug, error, info, o, Drain, Key, LevelFilter, Logger, Record, Serializer,
52};
53use tokio::prelude::*;
54use tokio::runtime::Runtime;
55use tokio::timer::timeout::Error as TimeoutError;
56use tokio::timer::Delay;
57use tokio_zookeeper::{
58    KeeperState, WatchedEvent, WatchedEventType, ZooKeeper, ZooKeeperBuilder,
59};
60use url::Url;
61
62use cueball::backend::{self, Backend, BackendAddress, BackendKey};
63use cueball::resolver::{
64    BackendAddedMsg, BackendMsg, BackendRemovedMsg, Resolver,
65};
66
// Submodule declaration; definitions live in common.rs (not shown here).
pub mod common;

//
// The interval at which the resolver should send heartbeats via the
// connection pool channel. Public for use in tests.
//
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

//
// Timeout for zookeeper sessions. Presumably passed to the ZooKeeper client
// builder in connect_loop(), which is below this chunk -- confirm there.
//
const SESSION_TIMEOUT: Duration = Duration::from_secs(5);

//
// Timeout for the initial tcp connect operation to zookeeper.
//
const TCP_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

//
// To be used when we don't want to wait between loop iterations (a zero
// Duration).
//
const NO_DELAY: Duration = Duration::from_secs(0);
89
lazy_static! {
    //
    // The maximum Duration that next_backoff() can return (when using the
    // default backoff parameters). Public for use in tests.
    //
    // Computed as the backoff crate's maximum interval scaled by its
    // multiplier, rounded up to a whole millisecond.
    //
    pub static ref MAX_BACKOFF_INTERVAL: Duration = Duration::from_millis(
        (MAX_INTERVAL_MILLIS as f64 * MULTIPLIER).ceil() as u64,
    );

    //
    // The amount of time that must elapse after successful connection without
    // an error occurring in order for the resolver state to be considered
    // stable, at which point the backoff state is reset.
    //
    // We choose the threshold based on MAX_BACKOFF_INTERVAL because if the
    // threshold were smaller, a given backoff interval could be bigger than the
    // threshold, so we would prematurely reset the backoff before the operation
    // even got a chance to try again. The threshold could be bigger, but what's
    // the point in that?
    //
    // We add a little slack (one second) so the backoff doesn't get
    // reset just before a (possibly failing) reconnect attempt is made.
    //
    static ref BACKOFF_RESET_THRESHOLD: Duration =
        *MAX_BACKOFF_INTERVAL + Duration::from_secs(1);
}
116
//
// An error type to be used internally.
//
#[derive(Clone, Debug, PartialEq)]
enum ResolverError {
    // The zookeeper node data was not parseable as json
    InvalidZkJson,
    // A required field in the zookeeper json was present but malformed
    InvalidZkData(ZkDataField),
    // A required field in the zookeeper json was absent
    MissingZkData(ZkDataField),
    // The cueball connection pool closed its end of the channel
    ConnectionPoolShutdown,
}

impl ResolverError {
    ///
    /// This function provides a means of determining whether or not a given
    /// error should cause the resolver to stop.
    ///
    /// Only ConnectionPoolShutdown is fatal: once the pool has hung up there
    /// is nobody left to notify. The data errors are treated as transient.
    ///
    fn should_stop(&self) -> bool {
        //
        // Match exhaustively (no catch-all arm) so that adding a new variant
        // forces this stop/continue decision to be revisited at compile time.
        //
        match self {
            ResolverError::ConnectionPoolShutdown => true,
            ResolverError::InvalidZkJson
            | ResolverError::InvalidZkData(_)
            | ResolverError::MissingZkData(_) => false,
        }
    }
}

//
// Identifies the specific piece of zookeeper node data that an error
// relates to.
//
#[derive(Clone, Debug, PartialEq)]
enum ZkDataField {
    Ip,
    Port,
    PostgresUrl,
}
147
///
/// Errors that can arise while parsing a `ZkConnectString` from a string.
///
#[derive(Debug)]
pub enum ZkConnectStringError {
    /// The input string was empty
    EmptyString,
    /// A comma-separated component was not a valid socket address
    MalformedAddr,
}

impl From<AddrParseError> for ZkConnectStringError {
    //
    // Every address-parse failure maps to MalformedAddr; the underlying
    // AddrParseError detail is intentionally discarded.
    //
    fn from(_: AddrParseError) -> Self {
        ZkConnectStringError::MalformedAddr
    }
}
159
160///
161/// `ZkConnectString` represents a list of zookeeper addresses to connect to.
162///
163#[derive(Debug, Clone, PartialEq, Deserialize)]
164pub struct ZkConnectString(Vec<SocketAddr>);
165
166impl ZkConnectString {
167    ///
168    /// Gets a reference to the SocketAddr at the provided index. Returns None
169    /// if the index is out of bounds.
170    ///
171    fn get_addr_at(&self, index: usize) -> Option<SocketAddr> {
172        self.0.get(index).cloned()
173    }
174
175    ///
176    /// Returns the number of addresses in the ZkConnectString
177    ///
178    fn len(&self) -> usize {
179        self.0.len()
180    }
181}
182
183impl ToString for ZkConnectString {
184    fn to_string(&self) -> String {
185        self.0
186            .iter()
187            .map(|x| x.to_string())
188            .intersperse(String::from(","))
189            .collect()
190    }
191}
192
193impl FromStr for ZkConnectString {
194    type Err = ZkConnectStringError;
195
196    fn from_str(s: &str) -> Result<Self, Self::Err> {
197        if s.is_empty() {
198            return Err(ZkConnectStringError::EmptyString);
199        }
200        let acc: Result<Vec<SocketAddr>, Self::Err> = Ok(vec![]);
201        s.split(',')
202            .map(|x| SocketAddr::from_str(x))
203            .fold(acc, |acc, x| match (acc, x) {
204                (Ok(mut addrs), Ok(addr)) => {
205                    addrs.push(addr);
206                    Ok(addrs)
207                }
208                (Err(e), _) => Err(e),
209                (_, Err(e)) => Err(ZkConnectStringError::from(e)),
210            })
211            .and_then(|x| Ok(ZkConnectString(x)))
212    }
213}
214
//
// Encapsulates a ZkConnectString with some bookkeeping to keep track of which
// address the resolver should attempt to connect to next, and how many
// connection attempts have failed in a row. Provides methods for getting the
// next address, resetting the number of failed attempts, and checking if
// the resolver should wait before trying to connect again.
//
#[derive(Debug, Clone)]
struct ZkConnectStringState {
    // The list of zookeeper addresses to cycle through
    conn_str: ZkConnectString,
    // Index into `conn_str` of the next address to hand out
    curr_idx: usize,
    // Number of addresses handed out by next_addr() since the last call to
    // reset_attempts() -- i.e. consecutive connection attempts
    conn_attempts: usize,
}
228
229impl ZkConnectStringState {
230    fn new(conn_str: ZkConnectString) -> Self {
231        let mut rng = rand::thread_rng();
232        let idx: usize = rng.gen_range(0, conn_str.len());
233
234        ZkConnectStringState {
235            conn_str,
236            curr_idx: idx,
237            conn_attempts: 0,
238        }
239    }
240
241    fn next_addr(&mut self) -> SocketAddr {
242        let ret = self
243            .conn_str
244            .get_addr_at(self.curr_idx)
245            .expect("connect string access out of bounds");
246        self.curr_idx += 1;
247        self.curr_idx %= self.conn_str.len();
248        self.conn_attempts += 1;
249        ret
250    }
251
252    fn reset_attempts(&mut self) {
253        self.conn_attempts = 0;
254    }
255
256    fn should_wait(&self) -> bool {
257        self.conn_attempts == self.conn_str.len()
258    }
259}
260
///
/// A serializable type to be used in log entries. Wraps around any type that
/// implements Debug and uses the Debug representation of the type as the
/// serialized output.
///
struct LogItem<T>(T)
where
    T: Debug;

impl<T: Debug> SlogValue for LogItem<T> {
    //
    // Format the wrapped value with `{:?}` and emit it as a plain string
    // under `key`.
    //
    fn serialize(
        &self,
        _rec: &Record,
        key: Key,
        serializer: &mut dyn Serializer,
    ) -> SlogResult {
        serializer.emit_str(key, &format!("{:?}", self.0))
    }
}
280
// Represents an action to be taken in the event of a connection error.
enum NextAction {
    //
    // Try again. The Duration field is the amount of time to wait before
    // reconnecting.
    //
    Reconnect(Duration),
    //
    // Stop the loop. NOTE(review): consumed by the connect/watch loop code
    // below this chunk -- presumably chosen when should_stop() is true.
    //
    Stop,
}
289
//
// Encapsulates the state that changes between iterations of watch_loop().
//
struct WatchLoopState {
    // Stream of watch events from zookeeper for the node being watched
    watcher: Box<dyn Stream<Item = WatchedEvent, Error = ()> + Send>,
    // The watch event being handled by the current iteration
    curr_event: WatchedEvent,
    // Time to wait before the next iteration runs
    delay: Duration,
}
298
//
// Encapsulates a backoff object and adds the notion of a time threshold that
// must be reached before the connection is considered stable and the backoff
// state is reset. This threshold is managed automatically using a background
// thread -- users can just call ResolverBackoff::next_backoff() normally.
//
struct ResolverBackoff {
    // The underlying exponential backoff, shared with the stability-tracking
    // thread (which resets it once the connection is deemed stable)
    backoff: Arc<Mutex<ExponentialBackoff>>,
    //
    // Internal channel used for indicating that an error has occurred to the
    // stability-tracking thread
    //
    error_tx: Sender<ResolverBackoffMsg>,
    // Logger for this backoff instance
    log: Logger,
}
314
//
// For use by ResolverBackoff object. Message sent to the stability-tracking
// thread whenever next_backoff() is called.
//
enum ResolverBackoffMsg {
    //
    // The Duration is the duration of the backoff in response to the error.
    //
    ErrorOccurred(Duration),
}
324
//
// For use by ResolverBackoff object. Internal state machine of the
// stability-tracking thread.
//
enum ResolverBackoffState {
    // No recent error; the thread blocks waiting for one
    Stable,
    //
    // An error occurred recently. The Duration is the duration of the backoff
    // in response to the error.
    //
    Unstable(Duration),
}
335
336impl ResolverBackoff {
337    fn new(log: Logger) -> Self {
338        let mut backoff = ExponentialBackoff::default();
339        //
340        // We'd rather the resolver not give up trying to reconnect, so we
341        // set the max_elapsed_time to `None` so next_backoff() always returns
342        // a valid interval.
343        //
344        backoff.max_elapsed_time = None;
345
346        let (error_tx, error_rx) = channel();
347        let backoff = Arc::new(Mutex::new(backoff));
348
349        //
350        // Start the stability-tracking thread.
351        //
352        //
353        // The waiting period for stability starts from the _time of successful
354        // connection_. The time of successful connection, if it occurs at all,
355        // will always be equal to (time of last error + duration of resulting
356        // backoff). Thus, from time of last error, we must wait (duration of
357        // resulting backoff + stability threshold) in order for the connection
358        // to be considered stable. Thus, we pass the backoff duration from
359        // next_backoff to this thread so the thread can figure out how long to
360        // wait for stability.
361        //
362        // Note that this thread doesn't actually _know_ if the connect
363        // operation succeeds, because we only touch the ResolverBackoff
364        // object when an error occurs. However, we can assume that it succeeds
365        // when waiting for stability, because, if the connect operation fails,
366        // this thread will receive another error and restart the wait period.
367        //
368        // If the connect operation takes longer than BACKOFF_RESET_THRESHOLD to
369        // complete and then fails, we'll reset the backoff erroneously. This
370        // situation is highly unlikely, so we'll cross that bridge when we come
371        // to it.
372        //
373        let backoff_clone = Arc::clone(&backoff);
374        let thread_log = log.clone();
375        debug!(log, "spawning stability-tracking thread");
376        thread::spawn(move || {
377            let mut state = ResolverBackoffState::Stable;
378            loop {
379                match state {
380                    ResolverBackoffState::Stable => {
381                        //
382                        // Wait for an error to happen
383                        //
384                        debug!(thread_log, "backoff stable; waiting for error");
385                        match error_rx.recv() {
386                            //
387                            // * zero days since last accident *
388                            //
389                            Ok(ResolverBackoffMsg::ErrorOccurred(
390                                new_backoff,
391                            )) => {
392                                info!(
393                                    thread_log,
394                                    "error received; backoff transitioning to \
395                                     unstable state"
396                                );
397                                state =
398                                    ResolverBackoffState::Unstable(new_backoff);
399                                continue;
400                            }
401                            //
402                            // ResolverBackoff object was dropped, so we exit
403                            // the thread
404                            //
405                            Err(_) => break,
406                        }
407                    }
408                    ResolverBackoffState::Unstable(current_backoff) => {
409                        debug!(
410                            thread_log,
411                            "backoff unstable; waiting for stability"
412                        );
413                        //
414                        // See large comment above for explanation of why we
415                        // wait this long
416                        //
417                        match error_rx.recv_timeout(
418                            current_backoff + *BACKOFF_RESET_THRESHOLD,
419                        ) {
420                            //
421                            // We got another error, so restart the countdown
422                            //
423                            Ok(ResolverBackoffMsg::ErrorOccurred(
424                                new_backoff,
425                            )) => {
426                                debug!(
427                                    thread_log,
428                                    "error received while waiting for \
429                                     stability; restarting wait period"
430                                );
431                                state =
432                                    ResolverBackoffState::Unstable(new_backoff);
433                                continue;
434                            }
435                            //
436                            // Timeout waiting for an error: the stability
437                            // threshold has been reached, so reset the backoff
438                            //
439                            Err(RecvTimeoutError::Timeout) => {
440                                info!(
441                                    thread_log,
442                                    "stability threshold reached; resetting backoff"
443                                );
444                                let mut backoff = backoff_clone.lock().unwrap();
445                                backoff.reset();
446                                state = ResolverBackoffState::Stable
447                            }
448                            //
449                            // ResolverBackoff object was dropped, so we exit
450                            // the thread
451                            //
452                            Err(RecvTimeoutError::Disconnected) => break,
453                        }
454                    }
455                }
456            }
457            debug!(thread_log, "stability-tracking thread exiting");
458        });
459
460        ResolverBackoff {
461            backoff,
462            error_tx,
463            log,
464        }
465    }
466
467    fn next_backoff(&mut self) -> Duration {
468        let mut backoff = self.backoff.lock().unwrap();
469        //
470        // This should never fail because we set max_elapsed_time to `None` in
471        // ResolverBackoff::new().
472        //
473        let next_backoff = backoff.next_backoff().expect(
474            "next_backoff returned
475            None; max_elapsed_time has been reached erroneously",
476        );
477
478        //
479        // Notify the stability-tracking thread that an error has occurred
480        //
481        self.error_tx
482            .send(ResolverBackoffMsg::ErrorOccurred(next_backoff))
483            .expect("Error sending over error_tx");
484
485        debug!(self.log, "retrying with backoff {:?}", next_backoff);
486        next_backoff
487    }
488}
489
//
// For use as argument to next_delay().
//
#[derive(Debug, Clone)]
enum DelayBehavior {
    // Always consult the backoff and wait for the interval it returns
    AlwaysWait,
    // Only back off if the wrapped ZkConnectStringState reports that every
    // address has been tried since the last reset; otherwise don't wait
    CheckConnState(Arc<Mutex<ZkConnectStringState>>),
}
498
499//
500// Helper function: Accepts a ResolverBackoff and a DelayBehavior. Returns a
501// duration to wait accordingly, consulting the provided ZkConnectStringState if
502// applicable.
503//
504// NOTE: also resets the ZkConnectStringState's connection attempts if
505// `behavior` is CheckConnState and it is found that we should wait.
506//
507fn next_delay(
508    backoff: &Arc<Mutex<ResolverBackoff>>,
509    behavior: &DelayBehavior,
510) -> Duration {
511    match behavior {
512        DelayBehavior::AlwaysWait => {
513            let backoff = &mut backoff.lock().unwrap();
514            backoff.next_backoff()
515        }
516        DelayBehavior::CheckConnState(conn_str_state) => {
517            let mut conn_str_state = conn_str_state.lock().unwrap();
518            let should_wait = conn_str_state.should_wait();
519            if should_wait {
520                conn_str_state.reset_attempts();
521                let backoff = &mut backoff.lock().unwrap();
522                backoff.next_backoff()
523            } else {
524                NO_DELAY
525            }
526        }
527    }
528}
529
#[derive(Debug)]
pub struct ManateePrimaryResolver {
    ///
    /// The addresses of the Zookeeper cluster the Resolver is connecting to,
    /// along with associated state
    ///
    conn_str_state: Arc<Mutex<ZkConnectStringState>>,
    ///
    /// The Zookeeper path for manatee cluster state for the shard. *e.g.*
    /// "/manatee/1.moray.coal.joyent.us/state"
    ///
    cluster_state_path: String,
    ///
    /// The key representation of the last backend sent to the cueball
    /// connection pool. Persists across multiple calls to run().
    ///
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    ///
    /// Indicates whether or not the resolver is running. This is slightly
    /// superfluous (this field is `true` for exactly the duration of each
    /// call to run(), and false otherwise), but could be useful if the caller
    /// wants to check if the resolver is running for some reason.
    ///
    /// NOTE(review): this is a plain bool written only from run(); readers on
    /// other threads get no synchronization guarantee -- confirm callers only
    /// read it from the thread that calls run().
    ///
    pub is_running: bool,
    ///
    /// The ManateePrimaryResolver's root log
    ///
    log: Logger,
}
559
560impl ManateePrimaryResolver {
561    ///
562    /// Creates a new ManateePrimaryResolver instance.
563    ///
564    /// # Arguments
565    ///
566    /// * `conn_str` - a comma-separated list of the zookeeper instances
567    ///   in the cluster
568    /// * `path` - The path to the root node in zookeeper for the shard we're
569    ///    watching
570    ///
571    pub fn new(
572        conn_str: ZkConnectString,
573        path: String,
574        log: Option<Logger>,
575    ) -> Self {
576        let cluster_state_path = [&path, "/state"].concat();
577
578        //
579        // Add the log_values to the passed-in logger, or create a new logger if
580        // the caller did not pass one in
581        //
582        let log = log.unwrap_or_else(|| {
583            Logger::root(
584                Mutex::new(LevelFilter::new(
585                    slog_bunyan::with_name(crate_name!(), std::io::stdout())
586                        .build(),
587                    slog::Level::Info,
588                ))
589                .fuse(),
590                o!("build-id" => crate_version!()),
591            )
592        });
593
594        ManateePrimaryResolver {
595            conn_str_state: Arc::new(Mutex::new(ZkConnectStringState::new(
596                conn_str,
597            ))),
598            cluster_state_path,
599            last_backend: Arc::new(Mutex::new(None)),
600            is_running: false,
601            log,
602        }
603    }
604}
605
impl Resolver for ManateePrimaryResolver {
    //
    // The resolver object is not Sync, so we can assume that only one instance
    // of this function is running at once, because callers will have to control
    // concurrent access.
    //
    // If the connection pool closes the receiving end of the channel, this
    // function may not return right away -- this function will not notice that
    // the pool has disconnected until this function tries to send another
    // heartbeat, at which point this function will return. This means that the
    // time between disconnection and function return is at most the length of
    // HEARTBEAT_INTERVAL. Any change in the meantime will be picked up by the
    // next call to run().
    //
    // Indeed, the heartbeat messages exist solely as a time-boxed method to
    // test whether the connection pool has closed the channel, so we don't leak
    // resolver threads.
    //
    fn run(&mut self, s: Sender<BackendMsg>) {
        debug!(self.log, "run() method entered");

        let mut rt = Runtime::new().unwrap();
        //
        // There's no need to check if the pool is already running and return
        // early, because multiple instances of this function _cannot_ be
        // running concurrently -- see this function's header comment.
        //
        self.is_running = true;

        //
        // Separate backoff state for the connect loop and the watch loop,
        // each with its own component-tagged child logger.
        //
        let conn_backoff = Arc::new(Mutex::new(ResolverBackoff::new(
            self.log.new(o!("component" => "conn_backoff")),
        )));
        let watch_backoff = Arc::new(Mutex::new(ResolverBackoff::new(
            self.log.new(o!("component" => "watch_backoff")),
        )));
        //
        // Bundle all the state shared with the event-processing task. Clones
        // of the Arcs alias this resolver's own fields.
        //
        let loop_core = ResolverCore {
            pool_tx: s.clone(),
            last_backend: Arc::clone(&self.last_backend),
            conn_str_state: Arc::clone(&self.conn_str_state),
            cluster_state_path: self.cluster_state_path.clone(),
            conn_backoff,
            watch_backoff,
            log: self.log.clone(),
        };
        let at_log = self.log.clone();

        //
        // Set by the event-processing task when it finishes, so the heartbeat
        // loop below knows to stop.
        //
        let exited = Arc::new(AtomicBool::new(false));
        let exited_clone = Arc::clone(&exited);

        //
        // Start the event-processing task. This is structured as two nested
        // loops: one to handle the zookeeper connection and one to handle
        // setting the watch. These are handled by the connect_loop() and
        // watch_loop() functions, respectively.
        //
        info!(self.log, "run(): starting runtime");
        rt.spawn(
            //
            // Outer loop. Handles connecting to zookeeper. A new loop iteration
            // means a new zookeeper connection. We break from the loop if we
            // discover that the user has closed the receiving channel, which
            // is their sole means of stopping the client.
            //
            // Arg: Time to wait before attempting to connect. Initially 0s.
            //     Repeated iterations of the loop set a delay before
            //     connecting.
            // Loop::Break type: ()
            //
            loop_fn(NO_DELAY, move |delay| {
                loop_core.clone().connect_loop(delay)
            })
            .and_then(move |_| {
                info!(at_log, "Event-processing task stopping");
                exited_clone.store(true, Ordering::Relaxed);
                Ok(())
            })
            .map(|_| ())
            .map_err(|_| {
                unreachable!("connect_loop() should never return an error")
            }),
        );

        //
        // Heartbeat-sending loop. If we break from this loop, the resolver
        // exits.
        //
        loop {
            if exited.load(Ordering::Relaxed) {
                info!(
                    self.log,
                    "event-processing task exited; stopping heartbeats"
                );
                break;
            }
            // A failed send means the pool dropped its receiver; time to stop.
            if s.send(BackendMsg::HeartbeatMsg).is_err() {
                info!(self.log, "Connection pool channel closed");
                break;
            }
            thread::sleep(HEARTBEAT_INTERVAL);
        }

        //
        // We shut down the background watch-looping thread. It may have already
        // exited by itself if it noticed that the connection pool closed its
        // channel, but there's no harm still calling shutdown_now() in that
        // case.
        //
        info!(self.log, "Stopping runtime");
        rt.shutdown_now().wait().unwrap();
        info!(self.log, "Runtime stopped successfully");
        self.is_running = false;
        debug!(self.log, "run() returned successfully");
    }
}
720
721//
722// Returns a mock event that can be used to bootstrap the watch loop. We use
723// a NodeDataChanged event because the watch loop's response to this type of
724// event is to get the data from the node being watched, which is also what
725// we want to do when we start the watch loop anew.
726//
727fn mock_event() -> WatchedEvent {
728    WatchedEvent {
729        event_type: WatchedEventType::NodeDataChanged,
730        //
731        // This artificial keeper_state doesn't necessarily reflect reality, but
732        // that's ok because it's paired with an artificial NodeDataChanged
733        // event, and our handling for this type of event doesn't involve the
734        // keeper_state field.
735        //
736        keeper_state: KeeperState::SyncConnected,
737        //
738        // We never use `path`, so we might as well set it to a clarifying
739        // string in our artificially constructed WatchedEvent object.
740        //
741        path: "MOCK_EVENT_PATH".to_string(),
742    }
743}
744
745//
746// Parses the given zookeeper node data into a Backend object, compares it to
747// the last Backend sent to the cueball connection pool, and sends it to the
748// connection pool if the values differ.
749//
750// We need to extract two pieces of data from the "primary" json object in the
751// json returned by zookeeper:
752// * The backend's IP address
753// * The backend's port
754// The json object has an "ip" field, but not a port field. However, the port
755// is contained in the "pgUrl" field, so this function extracts it from that.
756// The port could also be extracted from the "id" field, but the pgUrl field is
757// a convenient choice as it can be parsed structurally as a url and the port
758// extracted from there.
759//
760// What this all means is: the resolver relies on the "primary.ip" and
761// "primary.pgUrl" fields as an _interface_ to the zookeeper data. This feels a
762// little ad-hoc and should be formalized and documented.
763//
764// # Arguments
765//
766// * `pool_tx` - The Sender upon which to send the update message
767// * `new_value` - The raw zookeeper data we've newly retrieved
768// * `last_backend` - The last Backend we sent to the connection pool
769// * `log` - The Logger to be used for logging
770//
771fn process_value(
772    pool_tx: &Sender<BackendMsg>,
773    new_value: &[u8],
774    last_backend: Arc<Mutex<Option<BackendKey>>>,
775    log: Logger,
776) -> Result<(), ResolverError> {
777    debug!(log, "process_value() entered");
778
779    // Parse the bytes into a json object
780    let v: SerdeJsonValue = match serde_json::from_slice(&new_value) {
781        Ok(v) => v,
782        Err(_) => {
783            return Err(ResolverError::InvalidZkJson);
784        }
785    };
786
787    //
788    // Parse out the ip. We expect the json fields to exist, and return an error
789    // if they don't, or if they are of the wrong type.
790    //
791    let ip = match &v["primary"]["ip"] {
792        SerdeJsonValue::String(s) => match BackendAddress::from_str(s) {
793            Ok(s) => s,
794            Err(_) => {
795                return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
796            }
797        },
798        SerdeJsonValue::Null => {
799            return Err(ResolverError::MissingZkData(ZkDataField::Ip));
800        }
801        _ => {
802            return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
803        }
804    };
805
806    //
807    // Parse out the port. We expect the json fields to exist, and return an
808    // error if they don't, or if they are of the wrong type.
809    //
810    let port = match &v["primary"]["pgUrl"] {
811        SerdeJsonValue::String(s) => match Url::parse(s) {
812            Ok(url) => match url.port() {
813                Some(port) => port,
814                None => {
815                    return Err(ResolverError::MissingZkData(
816                        ZkDataField::Port,
817                    ));
818                }
819            },
820            Err(_) => {
821                return Err(ResolverError::InvalidZkData(
822                    ZkDataField::PostgresUrl,
823                ));
824            }
825        },
826        SerdeJsonValue::Null => {
827            return Err(ResolverError::MissingZkData(ZkDataField::PostgresUrl));
828        }
829        _ => {
830            return Err(ResolverError::InvalidZkData(ZkDataField::PostgresUrl));
831        }
832    };
833
834    // Construct a backend and key
835    let backend = Backend::new(&ip, port);
836    let backend_key = backend::srv_key(&backend);
837
838    // Determine whether we need to send the new backend over
839    let mut last_backend = last_backend.lock().unwrap();
840    let should_send = match (*last_backend).clone() {
841        Some(lb) => lb != backend_key,
842        None => true,
843    };
844
845    // Send the new backend if necessary
846    if should_send {
847        info!(log, "New backend found; sending to connection pool";
848            "backend" => LogItem(backend.clone()));
849        if pool_tx
850            .send(BackendMsg::AddedMsg(BackendAddedMsg {
851                key: backend_key.clone(),
852                backend,
853            }))
854            .is_err()
855        {
856            return Err(ResolverError::ConnectionPoolShutdown);
857        }
858
859        let lb_clone = (*last_backend).clone();
860        *last_backend = Some(backend_key);
861
862        //
863        // Notify the connection pool that the old backend should be
864        // removed, if the old backend is not None
865        //
866        if let Some(lbc) = lb_clone {
867            info!(log, "Notifying connection pool of removal of old backend");
868            if pool_tx
869                .send(BackendMsg::RemovedMsg(BackendRemovedMsg(lbc)))
870                .is_err()
871            {
872                return Err(ResolverError::ConnectionPoolShutdown);
873            }
874        }
875    } else {
876        info!(log, "New backend value does not differ; not sending");
877    }
878    debug!(log, "process_value() returned successfully");
879    Ok(())
880}
881
882//
883// Encapsulates all of the resolver state used in the futures context.
884//
885// Note that this struct's methods consume the instance of the struct they are
886// called upon. The struct should be freely cloned to accommodate this.
887//
888#[derive(Clone)]
889struct ResolverCore {
890    // The Sender that this function should use to communicate with the cueball
891    // connection pool
892    pool_tx: Sender<BackendMsg>,
893    // The key representation of the last backend sent to the cueball connection
894    // pool. It will be updated by process_value() if we send a new backend over
895    last_backend: Arc<Mutex<Option<BackendKey>>>,
896    // The addresses of the Zookeeper cluster the Resolver is connecting to,
897    // along with associated state
898    conn_str_state: Arc<Mutex<ZkConnectStringState>>,
899    // The path to the cluster state node in zookeeper for the shard we're
900    // watching
901    cluster_state_path: String,
902    // The exponential backoff state for the ZooKeeper connection
903    conn_backoff: Arc<Mutex<ResolverBackoff>>,
904    // The exponential backoff state for watching the ZooKeeper node
905    watch_backoff: Arc<Mutex<ResolverBackoff>>,
906    log: Logger,
907}
908
impl ResolverCore {
    //
    // This function represents the body of the connect loop. It handles the
    // ZooKeeper connection, and calls watch_loop() to watch the ZooKeeper node.
    //
    // The returned future is meant to drive loop_fn(): resolving to
    // Loop::Continue(delay) means "try connecting again after `delay`";
    // Loop::Break(()) means the resolver is done.
    //
    // # Arguments
    //
    // * `delay` - The length of time to wait before attempting to connect
    //
    fn connect_loop(
        self,
        delay: Duration,
    ) -> impl Future<Item = Loop<(), Duration>, Error = ()> + Send {
        let log = self.log.clone();
        //
        // "oe_"-prefixed clones are for the or_else() error path below, which
        // is a separate move closure and so can't share captures with the
        // success path.
        //
        let oe_log = log.clone();
        let oe_conn_backoff = Arc::clone(&self.conn_backoff);
        let oe_conn_str_state = Arc::clone(&self.conn_str_state);

        Delay::new(Instant::now() + delay)
            .and_then(move |_| {
                let mut builder = ZooKeeperBuilder::default();
                builder.set_timeout(SESSION_TIMEOUT);
                builder.set_logger(log.new(o!(
                    "component" => "zookeeper"
                )));

                // Get the next address to connect to
                let mut state = self.conn_str_state.lock().unwrap();
                let addr = state.next_addr();
                drop(state);

                info!(log, "Connecting to ZooKeeper"; "addr" => addr);

                //
                // Attempt the connection, bounding the TCP connect with a
                // timeout so a hung server can't stall this loop iteration
                // indefinitely.
                //
                builder
                    .connect(&addr)
                    .timeout(TCP_CONNECT_TIMEOUT)
                    .and_then(move |(zk, default_watcher)| {
                        info!(log, "Connected to ZooKeeper";
                            "addr" => addr);

                        //
                        // We've connected successfully, so reset the
                        // connection attempts
                        //
                        let mut state = self.conn_str_state.lock().unwrap();
                        state.reset_attempts();
                        drop(state);

                        //
                        // Main change-watching loop. A new loop iteration means
                        // we're setting a new watch (if necessary) and waiting
                        // for a result. Breaking from the loop means that we've
                        // hit some error and are returning control to the outer
                        // loop.
                        //
                        // Arg: WatchLoopState -- we set curr_event to an
                        //     artificially constructed WatchedEvent for the
                        //     first loop iteration, so the connection pool will
                        //     be initialized with the initial primary as its
                        //     backend.
                        // Loop::Break type: NextAction -- this value is used to
                        //     instruct the outer loop (this function) whether
                        //     to try to reconnect or terminate.
                        //
                        loop_fn(
                            WatchLoopState {
                                watcher: Box::new(default_watcher),
                                curr_event: mock_event(),
                                delay: NO_DELAY,
                            },
                            move |loop_state| {
                                //
                                // These fields require a new clone for every
                                // loop iteration, but they don't actually
                                // change from iteration to iteration, so
                                // they're not included as part of loop_state.
                                //
                                let loop_core = self.clone();
                                let zk = zk.clone();
                                loop_core
                                    .watch_loop(zk, loop_state)
                                    .map_err(TimeoutError::inner)
                            },
                        )
                        .and_then(move |next_action| {
                            ok(match next_action {
                                NextAction::Stop => Loop::Break(()),
                                //
                                // We reconnect immediately here instead of
                                // waiting, because if we're here it means that
                                // we came from the inner loop and thus we just
                                // had a valid connection terminate (as opposed
                                // to the `or_else` block below, where we've just
                                // tried to connect and failed), and thus
                                // there's no reason for us to delay trying to
                                // connect again.
                                //
                                NextAction::Reconnect(delay) => {
                                    Loop::Continue(delay)
                                }
                            })
                        })
                    })
                    .or_else(move |error| {
                        error!(oe_log, "Error connecting to ZooKeeper cluster";
                    "error" => LogItem(error));
                        ok(Loop::Continue(next_delay(
                            &oe_conn_backoff,
                            &DelayBehavior::CheckConnState(oe_conn_str_state),
                        )))
                    })
            })
            .map_err(|e| panic!("delay errored; err: {:?}", e))
    }

    //
    // This function represents the body of the watch loop. It both sets and
    // handles the watch, and calls process_value() to send new data to the
    // connection pool as the data arrives.
    //
    // This function can return from two states: before we've waited for the
    // watch to fire (if we hit an error before waiting), or after we've waited
    // for the watch to fire (this could be a success or an error). These two
    // states require returning different Future types, so we wrap the returned
    // values in a future::Either to satisfy the type checker.
    //
    // # Arguments
    //
    // * `zk` - The ZooKeeper client object
    // * `loop_state`: The state to be passed to the next iteration of loop_fn()
    //
    fn watch_loop(
        self,
        zk: ZooKeeper,
        loop_state: WatchLoopState,
    ) -> impl Future<Item = Loop<NextAction, WatchLoopState>, Error = FailureError>
           + Send {
        let watcher = loop_state.watcher;
        let curr_event = loop_state.curr_event;
        let delay = loop_state.delay;
        let check_behavior =
            DelayBehavior::CheckConnState(self.conn_str_state.clone());
        //
        // TODO avoid mutex boilerplate from showing up in the log
        //
        let log = self.log.new(o!(
            "curr_event" => LogItem(curr_event.clone()),
            "delay" => LogItem(delay),
            "last_backend" => LogItem(Arc::clone(&self.last_backend))
        ));

        //
        // "oe_" clones for the outermost or_else() (data-fetch failure) path.
        //
        let oe_log = log.clone();
        let oe_conn_backoff = Arc::clone(&self.conn_backoff);
        let oe_check_behavior = check_behavior.clone();

        //
        // Helper function to handle the arcmut and the loop boilerplate
        //
        fn next_delay_action(
            backoff: &Arc<Mutex<ResolverBackoff>>,
            behavior: &DelayBehavior,
        ) -> Loop<NextAction, WatchLoopState> {
            Loop::Break(NextAction::Reconnect(next_delay(backoff, behavior)))
        }

        //
        // We set the watch here. If the previous iteration of the loop ended
        // because the keeper state changed rather than because the watch fired,
        // the watch will already have been set, so we don't _need_ to set it
        // here. With that said, it does no harm (zookeeper deduplicates watches
        // on the server side), and it may not be worth the effort to optimize
        // for this case, since keeper state changes (and, indeed, changes of
        // any sort) should happen infrequently.
        //
        info!(log, "Getting data");
        Delay::new(Instant::now() + delay)
            .and_then(move |_| {
                zk
            .watch()
            .get_data(&self.cluster_state_path)
            .and_then(move |(_, data)| {
                match curr_event.event_type {
                    // Keeper state has changed
                    WatchedEventType::None => {
                        match curr_event.keeper_state {
                            //
                            // TODO will these cases ever happen? Because if the
                            // keeper state is "bad", then get_data() will have
                            // failed and we won't be here.
                            //
                            KeeperState::Disconnected |
                            KeeperState::AuthFailed |
                            KeeperState::Expired => {
                                error!(log, "Keeper state changed; reconnecting";
                                    "keeper_state" =>
                                    LogItem(curr_event.keeper_state));
                                return Either::A(ok(next_delay_action(
                                    &self.conn_backoff,
                                    &check_behavior
                                )));
                            },
                            KeeperState::SyncConnected |
                            KeeperState::ConnectedReadOnly |
                            KeeperState::SaslAuthenticated => {
                                info!(log, "Keeper state changed";
                                    "keeper_state" =>
                                    LogItem(curr_event.keeper_state));
                            }
                        }
                    },
                    // The data watch fired
                    WatchedEventType::NodeDataChanged => {
                        //
                        // We didn't get the data, which means the node doesn't
                        // exist yet. We should wait a bit and try again. We'll
                        // just use the same event as before.
                        //
                        if data.is_none() {
                            info!(log, "ZK data does not exist yet");
                            let delay = next_delay(
                                &self.watch_backoff,
                                &DelayBehavior::AlwaysWait
                            );
                            return Either::A(ok(Loop::Continue(WatchLoopState {
                                watcher,
                                curr_event,
                                delay,
                            })));
                        }
                        //
                        // Discard the Stat from the data, as we don't use it.
                        //
                        let data = data.unwrap().0;
                        info!(log, "got data"; "data" => LogItem(data.clone()));
                        match process_value(
                            &self.pool_tx.clone(),
                            &data,
                            Arc::clone(&self.last_backend),
                            log.clone()
                        ) {
                            Ok(_) => {},
                            Err(e) => {
                                error!(log, ""; "error" => LogItem(e.clone()));
                                //
                                // The error is between the client and the
                                // outward-facing channel, not between the
                                // client and the zookeeper connection, so we
                                // don't have to attempt to reconnect here and
                                // can continue, unless the error tells us to
                                // stop.
                                //
                                if e.should_stop() {
                                    return Either::A(ok(Loop::Break(
                                        NextAction::Stop)));
                                }
                            }
                        }
                    },
                    WatchedEventType::NodeDeleted => {
                        //
                        // The node doesn't exist, but we can't use the existing
                        // event, or we'll just loop on this case forever. We
                        // use the mock event instead.
                        //
                        info!(log, "ZK node deleted");
                        let delay = next_delay(
                            &self.watch_backoff,
                            &DelayBehavior::AlwaysWait
                        );
                        return Either::A(ok(Loop::Continue(WatchLoopState {
                            watcher,
                            curr_event: mock_event(),
                            delay,
                        })));
                    },
                    e => panic!("Unexpected event received: {:?}", e)
                };

                //
                // If we got here, we're waiting for the watch to fire. Before
                // this point, we wrap the return value in Either::A. After this
                // point, we wrap the return value in Either::B. See the comment
                // about "Either" some lines above.
                //
                let oe_log = log.clone();
                let oe_conn_backoff = Arc::clone(&self.conn_backoff);
                let oe_check_behavior = check_behavior.clone();

                info!(log, "Watching for change");
                Either::B(watcher.into_future()
                    .and_then(move |(event, watcher)| {
                        let loop_next = match event {
                            Some(event) => {
                                info!(log, "change event received; looping to \
                                    process event"; "event" =>
                                    LogItem(event.clone()));
                                Loop::Continue(WatchLoopState {
                                    watcher,
                                    curr_event: event,
                                    delay: NO_DELAY
                                })
                            },
                            //
                            // If we didn't get a valid event, this means the
                            // Stream got closed, which indicates a connection
                            // issue, so we reconnect.
                            //
                            None => {
                                error!(log, "Event stream closed; reconnecting");
                                next_delay_action(
                                    &self.conn_backoff,
                                    &check_behavior
                                )
                            }
                        };
                        ok(loop_next)
                    })
                    .or_else(move |_| {
                        //
                        // If we get an error from the event Stream, we assume
                        // that something went wrong with the zookeeper
                        // connection and attempt to reconnect.
                        //
                        // The stream's error type is (), so there's no
                        // information to extract from it.
                        //
                        error!(oe_log, "Error received from event stream");
                        ok(next_delay_action(
                            &oe_conn_backoff,
                            &oe_check_behavior
                        ))
                    }))
            })
            //
            // If some error occurred getting the data, we assume we should
            // reconnect to the zookeeper server.
            //
            .or_else(move |error| {
                error!(oe_log, "Error getting data"; "error" => LogItem(error));
                ok(next_delay_action(
                    &oe_conn_backoff,
                    &oe_check_behavior
                ))
            })
            })
            .map_err(|e| panic!("delay errored; err: {:?}", e))
    }
}
1262
1263//
1264// Unit tests
1265//
1266// The "." path attribute below is so paths within the `test` submodule will be
1267// relative to "src" rather than "src/test", which does not exist. This allows
1268// us to import the "../tests/test_data.rs" module here.
1269//
#[path = "."]
#[cfg(test)]
mod test {
    use super::*;

    use std::iter;
    use std::sync::mpsc::TryRecvError;
    use std::sync::{Arc, Mutex};
    use std::vec::Vec;

    use quickcheck::{quickcheck, Arbitrary, Gen};

    use common::{test_data, util};

    impl Arbitrary for ZkConnectString {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let size = usize::arbitrary(g);
            ZkConnectString(
                iter::repeat(())
                    .map(|()| SocketAddr::arbitrary(g))
                    .take(size)
                    .collect(),
            )
        }
    }

    //
    // Test parsing ZkConnectString from string
    //
    quickcheck! {
        fn prop_zk_connect_string_parse(
            conn_str: ZkConnectString
        ) -> bool
        {
            //
            // We expect an error only if the input string was zero-length
            //
            match ZkConnectString::from_str(&conn_str.to_string()) {
                Ok(cs) => cs == conn_str,
                _ => conn_str.to_string().is_empty()
            }
        }
    }

    // Below: test process_value()

    //
    // Represents a process_value test case, including inputs and expected
    // outputs.
    //
    struct ProcessValueFields {
        // The raw zookeeper node data handed to process_value()
        value: Vec<u8>,
        // The backend key that process_value() believes was sent last
        last_backend: BackendKey,
        // The error we expect process_value() to return, if any
        expected_error: Option<ResolverError>,
        // The "added" message we expect on the channel, if any
        added_backend: Option<BackendAddedMsg>,
        // The "removed" message we expect on the channel, if any
        removed_backend: Option<BackendRemovedMsg>,
    }

    //
    // Run a process_value test case: call process_value() with the given
    // inputs and assert that its return value and the messages it sends to
    // the connection pool channel match the expected outputs exactly.
    //
    fn run_process_value_fields(input: ProcessValueFields) {
        let (tx, rx) = channel();
        let last_backend = Arc::new(Mutex::new(Some(input.last_backend)));

        let result = process_value(
            &tx,
            &input.value,
            last_backend,
            util::log_from_env(util::DEFAULT_LOG_LEVEL).unwrap(),
        );
        match input.expected_error {
            None => assert_eq!(result, Ok(())),
            Some(expected_error) => assert_eq!(result, Err(expected_error)),
        }

        let mut received_messages = Vec::new();

        // One message per expected "added"/"removed" notification
        let expected_message_count = {
            let mut acc = 0;
            if input.added_backend.is_some() {
                acc += 1;
            }
            if input.removed_backend.is_some() {
                acc += 1;
            }
            acc
        };

        // Receive as many messages as we expect
        for i in 0..expected_message_count {
            let channel_result = rx.try_recv();
            match channel_result {
                Err(e) => panic!(
                    "Unexpected error receiving on channel: {:?} \
                     -- Loop iteration: {:?}",
                    e, i
                ),
                Ok(result) => {
                    received_messages.push(result);
                }
            }
        }

        //
        // Make sure there are not more messages than we expect on the channel.
        // Can't use assert_eq! here because BackendMsg doesn't implement Debug.
        //
        match rx.try_recv() {
            Err(TryRecvError::Empty) => (),
            _ => panic!("Unexpected message on resolver channel"),
        }

        // Check that the "added" message was received if applicable
        if let Some(msg) = input.added_backend {
            let msg = BackendMsg::AddedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None => panic!("added_backend not found in received messages"),
                Some(index) => {
                    received_messages.remove(index);
                }
            }
        }

        // Check that the "removed" message was received if applicable
        if let Some(msg) = input.removed_backend {
            let msg = BackendMsg::RemovedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None => {
                    panic!("removed_backend not found in received messages")
                }
                Some(index) => {
                    received_messages.remove(index);
                }
            }
        }
    }

    #[test]
    fn process_value_test_port_ip_change() {
        let data_1 = test_data::backend_ip1_port1();
        let data_2 = test_data::backend_ip2_port2();

        run_process_value_fields(ProcessValueFields {
            value: data_2.raw_vec(),
            last_backend: data_1.key(),
            expected_error: None,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg()),
        });
    }

    #[test]
    fn process_value_test_port_change() {
        let data_1 = test_data::backend_ip1_port1();
        let data_2 = test_data::backend_ip2_port1();

        run_process_value_fields(ProcessValueFields {
            value: data_2.raw_vec(),
            last_backend: data_1.key(),
            expected_error: None,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg()),
        });
    }

    #[test]
    fn process_value_test_ip_change() {
        let data_1 = test_data::backend_ip1_port1();
        let data_2 = test_data::backend_ip1_port2();

        run_process_value_fields(ProcessValueFields {
            value: data_2.raw_vec(),
            last_backend: data_1.key(),
            expected_error: None,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg()),
        });
    }

    #[test]
    fn process_value_test_no_change() {
        let data = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: data.raw_vec(),
            last_backend: data.key(),
            expected_error: None,
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_no_ip() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::no_ip_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(ZkDataField::Ip)),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_wrong_type_ip() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::wrong_type_ip_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(ZkDataField::Ip)),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_invalid_ip() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::invalid_ip_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(ZkDataField::Ip)),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_no_pg_url() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::no_pg_url_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(
                ZkDataField::PostgresUrl,
            )),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_wrong_type_pg_url() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::wrong_type_pg_url_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(
                ZkDataField::PostgresUrl,
            )),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_invalid_pg_url() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::invalid_pg_url_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkData(
                ZkDataField::PostgresUrl,
            )),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_no_port_pg_url() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::no_port_pg_url_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::MissingZkData(
                ZkDataField::Port,
            )),
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_invalid_json() {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: test_data::invalid_json_vec(),
            last_backend: filler.key(),
            expected_error: Some(ResolverError::InvalidZkJson),
            added_backend: None,
            removed_backend: None,
        });
    }
}