cueball_manatee_primary_resolver/lib.rs
1//
2// Copyright 2020 Joyent, Inc.
3//
4// THEORY STATEMENT -- READ THIS FIRST!
5//
6// This library has just one task: watch a zookeeper node for changes and notify
7// users of the library when changes occur. This is accomplished in one big pair
8// of nested loops. The code structure looks a little like this:
9//
10// ManateePrimaryResolver::run()
11// -> Spawns tokio task
12// -> Runs outer loop, which handles connecting/reconnecting to zookeeper.
13// The logic here is in the connect_loop() function.
14// -> Runs inner loop, which handles setting/resetting the zookeeper
15// watch. The logic here is in the watch_loop() function.
16// -> For every change in zookeeper data, calls the process_value()
17// function. This function parses the new zookeeper data and
18// notifies the user of the change in data if necessary.
19//
20// Note that this file also contains unit tests for process_value().
21//
22
23use std::convert::From;
24use std::default::Default;
25use std::fmt::Debug;
26use std::net::{AddrParseError, SocketAddr};
27use std::str::FromStr;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
30use std::sync::{Arc, Mutex};
31use std::thread;
32use std::time::{Duration, Instant};
33
34use backoff::backoff::Backoff;
35use backoff::default::{MAX_INTERVAL_MILLIS, MULTIPLIER};
36use backoff::ExponentialBackoff;
37use clap::{crate_name, crate_version};
38use failure::Error as FailureError;
39use futures::future::{loop_fn, ok, Either, Future, Loop};
40use futures::stream::Stream;
41use itertools::Itertools;
42use lazy_static::lazy_static;
43use rand;
44use rand::Rng;
45use serde::Deserialize;
46use serde_json;
47use serde_json::Value as SerdeJsonValue;
48use slog::Result as SlogResult;
49use slog::Value as SlogValue;
50use slog::{
51 debug, error, info, o, Drain, Key, LevelFilter, Logger, Record, Serializer,
52};
53use tokio::prelude::*;
54use tokio::runtime::Runtime;
55use tokio::timer::timeout::Error as TimeoutError;
56use tokio::timer::Delay;
57use tokio_zookeeper::{
58 KeeperState, WatchedEvent, WatchedEventType, ZooKeeper, ZooKeeperBuilder,
59};
60use url::Url;
61
62use cueball::backend::{self, Backend, BackendAddress, BackendKey};
63use cueball::resolver::{
64 BackendAddedMsg, BackendMsg, BackendRemovedMsg, Resolver,
65};
66
pub mod common;

//
// The interval at which the resolver should send heartbeats via the
// connection pool channel. Public for use in tests.
//
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

//
// Timeout for zookeeper sessions. Passed to ZooKeeperBuilder::set_timeout()
// in connect_loop().
//
const SESSION_TIMEOUT: Duration = Duration::from_secs(5);

//
// Timeout for the initial tcp connect operation to zookeeper.
//
const TCP_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);

//
// A zero-length delay, to be used when we don't want to wait between loop
// iterations.
//
const NO_DELAY: Duration = Duration::from_secs(0);
89
lazy_static! {
    //
    // The maximum Duration that next_backoff() can return (when using the
    // default backoff parameters). Public for use in tests.
    //
    pub static ref MAX_BACKOFF_INTERVAL: Duration = Duration::from_millis(
        (MAX_INTERVAL_MILLIS as f64 * MULTIPLIER).ceil() as u64,
    );

    //
    // The amount of time that must elapse after successful connection without
    // an error occurring in order for the resolver state to be considered
    // stable, at which point the backoff state is reset.
    //
    // We choose the threshold based on MAX_BACKOFF_INTERVAL because if the
    // threshold were smaller, a given backoff interval could be bigger than the
    // threshold, so we would prematurely reset the backoff before the operation
    // even got a chance to try again. The threshold could be bigger, but what's
    // the point in that?
    //
    // We add a little slack (one second) so the backoff doesn't get
    // reset just before a (possibly failing) reconnect attempt is made.
    //
    static ref BACKOFF_RESET_THRESHOLD: Duration =
        *MAX_BACKOFF_INTERVAL + Duration::from_secs(1);
}
116
117//
118// An error type to be used internally.
119//
#[derive(Clone, Debug, PartialEq)]
enum ResolverError {
    // The zookeeper node data could not be parsed as json at all
    InvalidZkJson,
    // A field was present in the json but had an unusable value
    InvalidZkData(ZkDataField),
    // A required field was missing from the json
    MissingZkData(ZkDataField),
    // The connection pool closed its end of the channel
    ConnectionPoolShutdown,
}
127
128impl ResolverError {
129 ///
130 /// This function provides a means of determining whether or not a given
131 /// error should cause the resolver to stop.
132 ///
133 fn should_stop(&self) -> bool {
134 match self {
135 ResolverError::ConnectionPoolShutdown => true,
136 _ => false,
137 }
138 }
139}
140
//
// Identifies which field of the zookeeper node data an error pertains to.
//
#[derive(Clone, Debug, PartialEq)]
enum ZkDataField {
    Ip,
    Port,
    PostgresUrl,
}
147
//
// Errors that can occur when parsing a ZkConnectString from a string.
//
#[derive(Debug)]
pub enum ZkConnectStringError {
    // The input string was empty
    EmptyString,
    // One of the comma-separated entries was not a valid SocketAddr
    MalformedAddr,
}
153
154impl From<AddrParseError> for ZkConnectStringError {
155 fn from(_: AddrParseError) -> Self {
156 ZkConnectStringError::MalformedAddr
157 }
158}
159
///
/// `ZkConnectString` represents a list of zookeeper addresses to connect to.
/// It wraps a plain `Vec<SocketAddr>` and is parsed from a comma-separated
/// string via its `FromStr` impl.
///
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct ZkConnectString(Vec<SocketAddr>);
165
166impl ZkConnectString {
167 ///
168 /// Gets a reference to the SocketAddr at the provided index. Returns None
169 /// if the index is out of bounds.
170 ///
171 fn get_addr_at(&self, index: usize) -> Option<SocketAddr> {
172 self.0.get(index).cloned()
173 }
174
175 ///
176 /// Returns the number of addresses in the ZkConnectString
177 ///
178 fn len(&self) -> usize {
179 self.0.len()
180 }
181}
182
183impl ToString for ZkConnectString {
184 fn to_string(&self) -> String {
185 self.0
186 .iter()
187 .map(|x| x.to_string())
188 .intersperse(String::from(","))
189 .collect()
190 }
191}
192
193impl FromStr for ZkConnectString {
194 type Err = ZkConnectStringError;
195
196 fn from_str(s: &str) -> Result<Self, Self::Err> {
197 if s.is_empty() {
198 return Err(ZkConnectStringError::EmptyString);
199 }
200 let acc: Result<Vec<SocketAddr>, Self::Err> = Ok(vec![]);
201 s.split(',')
202 .map(|x| SocketAddr::from_str(x))
203 .fold(acc, |acc, x| match (acc, x) {
204 (Ok(mut addrs), Ok(addr)) => {
205 addrs.push(addr);
206 Ok(addrs)
207 }
208 (Err(e), _) => Err(e),
209 (_, Err(e)) => Err(ZkConnectStringError::from(e)),
210 })
211 .and_then(|x| Ok(ZkConnectString(x)))
212 }
213}
214
//
// Encapsulates a ZkConnectString with some bookkeeping to keep track of which
// address the resolver should attempt to connect to next, and how many
// connection attempts have failed in a row. Provides methods for getting the
// next address, resetting the number of failed attempts, and checking if
// the resolver should wait before trying to connect again.
//
#[derive(Debug, Clone)]
struct ZkConnectStringState {
    // The list of zookeeper addresses to cycle through
    conn_str: ZkConnectString,
    // Index into `conn_str` of the next address to hand out
    curr_idx: usize,
    // Number of consecutive connection attempts since the last reset
    conn_attempts: usize,
}
228
229impl ZkConnectStringState {
230 fn new(conn_str: ZkConnectString) -> Self {
231 let mut rng = rand::thread_rng();
232 let idx: usize = rng.gen_range(0, conn_str.len());
233
234 ZkConnectStringState {
235 conn_str,
236 curr_idx: idx,
237 conn_attempts: 0,
238 }
239 }
240
241 fn next_addr(&mut self) -> SocketAddr {
242 let ret = self
243 .conn_str
244 .get_addr_at(self.curr_idx)
245 .expect("connect string access out of bounds");
246 self.curr_idx += 1;
247 self.curr_idx %= self.conn_str.len();
248 self.conn_attempts += 1;
249 ret
250 }
251
252 fn reset_attempts(&mut self) {
253 self.conn_attempts = 0;
254 }
255
256 fn should_wait(&self) -> bool {
257 self.conn_attempts == self.conn_str.len()
258 }
259}
260
///
/// A serializable type to be used in log entries. Wraps around any type that
/// implements Debug and uses the Debug representation of the type as the
/// serialized output. See the SlogValue impl below.
///
struct LogItem<T>(T)
where
    T: Debug;
269
270impl<T: Debug> SlogValue for LogItem<T> {
271 fn serialize(
272 &self,
273 _rec: &Record,
274 key: Key,
275 serializer: &mut dyn Serializer,
276 ) -> SlogResult {
277 serializer.emit_str(key, &format!("{:?}", self.0))
278 }
279}
280
//
// Represents an action to be taken in the event of a connection error. The
// inner watch loop breaks with one of these to tell the outer connect loop
// what to do next.
//
enum NextAction {
    //
    // Attempt a new zookeeper connection. The Duration field is the amount
    // of time to wait before reconnecting.
    //
    Reconnect(Duration),
    // Stop the resolver entirely.
    Stop,
}
289
//
// Encapsulates the state that changes between iterations of watch_loop().
//
struct WatchLoopState {
    // Stream of watch events delivered by the zookeeper client
    watcher: Box<dyn Stream<Item = WatchedEvent, Error = ()> + Send>,
    // The event being handled by the current iteration
    curr_event: WatchedEvent,
    // Delay to apply for this iteration (NO_DELAY on the first pass)
    delay: Duration,
}
298
//
// Encapsulates a backoff object and adds the notion of a time threshold that
// must be reached before the connection is considered stable and the backoff
// state is reset. This threshold is managed automatically using a background
// thread -- users can just call ResolverBackoff::next_backoff() normally.
//
struct ResolverBackoff {
    // The underlying backoff state, shared with the stability-tracking
    // thread (which resets it once the threshold elapses)
    backoff: Arc<Mutex<ExponentialBackoff>>,
    //
    // Internal channel used for indicating that an error has occurred to the
    // stability-tracking thread
    //
    error_tx: Sender<ResolverBackoffMsg>,
    log: Logger,
}
314
//
// For use by ResolverBackoff object: messages sent from next_backoff() to
// the stability-tracking thread.
//
enum ResolverBackoffMsg {
    //
    // The Duration is the duration of the backoff in response to the error.
    //
    ErrorOccurred(Duration),
}
324
//
// For use by ResolverBackoff object: the two states of the
// stability-tracking thread's state machine.
//
enum ResolverBackoffState {
    // No recent error; waiting for one to arrive
    Stable,
    //
    // An error occurred; waiting out the stability threshold. The Duration
    // is the duration of the backoff in response to the error.
    //
    Unstable(Duration),
}
335
336impl ResolverBackoff {
337 fn new(log: Logger) -> Self {
338 let mut backoff = ExponentialBackoff::default();
339 //
340 // We'd rather the resolver not give up trying to reconnect, so we
341 // set the max_elapsed_time to `None` so next_backoff() always returns
342 // a valid interval.
343 //
344 backoff.max_elapsed_time = None;
345
346 let (error_tx, error_rx) = channel();
347 let backoff = Arc::new(Mutex::new(backoff));
348
349 //
350 // Start the stability-tracking thread.
351 //
352 //
353 // The waiting period for stability starts from the _time of successful
354 // connection_. The time of successful connection, if it occurs at all,
355 // will always be equal to (time of last error + duration of resulting
356 // backoff). Thus, from time of last error, we must wait (duration of
357 // resulting backoff + stability threshold) in order for the connection
358 // to be considered stable. Thus, we pass the backoff duration from
359 // next_backoff to this thread so the thread can figure out how long to
360 // wait for stability.
361 //
362 // Note that this thread doesn't actually _know_ if the connect
363 // operation succeeds, because we only touch the ResolverBackoff
364 // object when an error occurs. However, we can assume that it succeeds
365 // when waiting for stability, because, if the connect operation fails,
366 // this thread will receive another error and restart the wait period.
367 //
368 // If the connect operation takes longer than BACKOFF_RESET_THRESHOLD to
369 // complete and then fails, we'll reset the backoff erroneously. This
370 // situation is highly unlikely, so we'll cross that bridge when we come
371 // to it.
372 //
373 let backoff_clone = Arc::clone(&backoff);
374 let thread_log = log.clone();
375 debug!(log, "spawning stability-tracking thread");
376 thread::spawn(move || {
377 let mut state = ResolverBackoffState::Stable;
378 loop {
379 match state {
380 ResolverBackoffState::Stable => {
381 //
382 // Wait for an error to happen
383 //
384 debug!(thread_log, "backoff stable; waiting for error");
385 match error_rx.recv() {
386 //
387 // * zero days since last accident *
388 //
389 Ok(ResolverBackoffMsg::ErrorOccurred(
390 new_backoff,
391 )) => {
392 info!(
393 thread_log,
394 "error received; backoff transitioning to \
395 unstable state"
396 );
397 state =
398 ResolverBackoffState::Unstable(new_backoff);
399 continue;
400 }
401 //
402 // ResolverBackoff object was dropped, so we exit
403 // the thread
404 //
405 Err(_) => break,
406 }
407 }
408 ResolverBackoffState::Unstable(current_backoff) => {
409 debug!(
410 thread_log,
411 "backoff unstable; waiting for stability"
412 );
413 //
414 // See large comment above for explanation of why we
415 // wait this long
416 //
417 match error_rx.recv_timeout(
418 current_backoff + *BACKOFF_RESET_THRESHOLD,
419 ) {
420 //
421 // We got another error, so restart the countdown
422 //
423 Ok(ResolverBackoffMsg::ErrorOccurred(
424 new_backoff,
425 )) => {
426 debug!(
427 thread_log,
428 "error received while waiting for \
429 stability; restarting wait period"
430 );
431 state =
432 ResolverBackoffState::Unstable(new_backoff);
433 continue;
434 }
435 //
436 // Timeout waiting for an error: the stability
437 // threshold has been reached, so reset the backoff
438 //
439 Err(RecvTimeoutError::Timeout) => {
440 info!(
441 thread_log,
442 "stability threshold reached; resetting backoff"
443 );
444 let mut backoff = backoff_clone.lock().unwrap();
445 backoff.reset();
446 state = ResolverBackoffState::Stable
447 }
448 //
449 // ResolverBackoff object was dropped, so we exit
450 // the thread
451 //
452 Err(RecvTimeoutError::Disconnected) => break,
453 }
454 }
455 }
456 }
457 debug!(thread_log, "stability-tracking thread exiting");
458 });
459
460 ResolverBackoff {
461 backoff,
462 error_tx,
463 log,
464 }
465 }
466
467 fn next_backoff(&mut self) -> Duration {
468 let mut backoff = self.backoff.lock().unwrap();
469 //
470 // This should never fail because we set max_elapsed_time to `None` in
471 // ResolverBackoff::new().
472 //
473 let next_backoff = backoff.next_backoff().expect(
474 "next_backoff returned
475 None; max_elapsed_time has been reached erroneously",
476 );
477
478 //
479 // Notify the stability-tracking thread that an error has occurred
480 //
481 self.error_tx
482 .send(ResolverBackoffMsg::ErrorOccurred(next_backoff))
483 .expect("Error sending over error_tx");
484
485 debug!(self.log, "retrying with backoff {:?}", next_backoff);
486 next_backoff
487 }
488}
489
//
// For use as argument to next_delay(): controls whether a backoff delay is
// always taken, or only taken once every address in the connect string has
// been tried and failed.
//
#[derive(Debug, Clone)]
enum DelayBehavior {
    // Always consult the backoff for a delay
    AlwaysWait,
    // Only back off when the ZkConnectStringState says we should wait
    CheckConnState(Arc<Mutex<ZkConnectStringState>>),
}
498
499//
500// Helper function: Accepts a ResolverBackoff and a DelayBehavior. Returns a
501// duration to wait accordingly, consulting the provided ZkConnectStringState if
502// applicable.
503//
504// NOTE: also resets the ZkConnectStringState's connection attempts if
505// `behavior` is CheckConnState and it is found that we should wait.
506//
507fn next_delay(
508 backoff: &Arc<Mutex<ResolverBackoff>>,
509 behavior: &DelayBehavior,
510) -> Duration {
511 match behavior {
512 DelayBehavior::AlwaysWait => {
513 let backoff = &mut backoff.lock().unwrap();
514 backoff.next_backoff()
515 }
516 DelayBehavior::CheckConnState(conn_str_state) => {
517 let mut conn_str_state = conn_str_state.lock().unwrap();
518 let should_wait = conn_str_state.should_wait();
519 if should_wait {
520 conn_str_state.reset_attempts();
521 let backoff = &mut backoff.lock().unwrap();
522 backoff.next_backoff()
523 } else {
524 NO_DELAY
525 }
526 }
527 }
528}
529
#[derive(Debug)]
pub struct ManateePrimaryResolver {
    ///
    /// The addresses of the Zookeeper cluster the Resolver is connecting to,
    /// along with associated state
    ///
    conn_str_state: Arc<Mutex<ZkConnectStringState>>,
    ///
    /// The Zookeeper path for manatee cluster state for the shard. *e.g.*
    /// "/manatee/1.moray.coal.joyent.us/state"
    ///
    cluster_state_path: String,
    ///
    /// The key representation of the last backend sent to the cueball
    /// connection pool. Persists across multiple calls to run().
    ///
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    ///
    /// Indicates whether or not the resolver is running. This is slightly
    /// superfluous (this field is `true` for exactly the duration of each
    /// call to run(), and false otherwise), but could be useful if the caller
    /// wants to check if the resolver is running for some reason.
    ///
    pub is_running: bool,
    ///
    /// The ManateePrimaryResolver's root log
    ///
    log: Logger,
}
559
560impl ManateePrimaryResolver {
561 ///
562 /// Creates a new ManateePrimaryResolver instance.
563 ///
564 /// # Arguments
565 ///
566 /// * `conn_str` - a comma-separated list of the zookeeper instances
567 /// in the cluster
568 /// * `path` - The path to the root node in zookeeper for the shard we're
569 /// watching
570 ///
571 pub fn new(
572 conn_str: ZkConnectString,
573 path: String,
574 log: Option<Logger>,
575 ) -> Self {
576 let cluster_state_path = [&path, "/state"].concat();
577
578 //
579 // Add the log_values to the passed-in logger, or create a new logger if
580 // the caller did not pass one in
581 //
582 let log = log.unwrap_or_else(|| {
583 Logger::root(
584 Mutex::new(LevelFilter::new(
585 slog_bunyan::with_name(crate_name!(), std::io::stdout())
586 .build(),
587 slog::Level::Info,
588 ))
589 .fuse(),
590 o!("build-id" => crate_version!()),
591 )
592 });
593
594 ManateePrimaryResolver {
595 conn_str_state: Arc::new(Mutex::new(ZkConnectStringState::new(
596 conn_str,
597 ))),
598 cluster_state_path,
599 last_backend: Arc::new(Mutex::new(None)),
600 is_running: false,
601 log,
602 }
603 }
604}
605
impl Resolver for ManateePrimaryResolver {
    //
    // The resolver object is not Sync, so we can assume that only one instance
    // of this function is running at once, because callers will have to control
    // concurrent access.
    //
    // If the connection pool closes the receiving end of the channel, this
    // function may not return right away -- this function will not notice that
    // the pool has disconnected until this function tries to send another
    // heartbeat, at which point this function will return. This means that the
    // time between disconnection and function return is at most the length of
    // HEARTBEAT_INTERVAL. Any change in the meantime will be picked up by the
    // next call to run().
    //
    // Indeed, the heartbeat messages exist solely as a time-boxed method to
    // test whether the connection pool has closed the channel, so we don't leak
    // resolver threads.
    //
    fn run(&mut self, s: Sender<BackendMsg>) {
        debug!(self.log, "run() method entered");

        // Tokio runtime in which the event-processing task runs
        let mut rt = Runtime::new().unwrap();
        //
        // There's no need to check if the pool is already running and return
        // early, because multiple instances of this function _cannot_ be
        // running concurrently -- see this function's header comment.
        //
        self.is_running = true;

        //
        // Separate backoff state for connection errors and watch errors, each
        // with its own component tag in the log.
        //
        let conn_backoff = Arc::new(Mutex::new(ResolverBackoff::new(
            self.log.new(o!("component" => "conn_backoff")),
        )));
        let watch_backoff = Arc::new(Mutex::new(ResolverBackoff::new(
            self.log.new(o!("component" => "watch_backoff")),
        )));
        // Bundle of shared state cloned into each loop iteration
        let loop_core = ResolverCore {
            pool_tx: s.clone(),
            last_backend: Arc::clone(&self.last_backend),
            conn_str_state: Arc::clone(&self.conn_str_state),
            cluster_state_path: self.cluster_state_path.clone(),
            conn_backoff,
            watch_backoff,
            log: self.log.clone(),
        };
        let at_log = self.log.clone();

        //
        // Flag set by the event-processing task just before it finishes, so
        // the heartbeat loop below knows when to stop.
        //
        let exited = Arc::new(AtomicBool::new(false));
        let exited_clone = Arc::clone(&exited);

        //
        // Start the event-processing task. This is structured as two nested
        // loops: one to handle the zookeeper connection and one to handle
        // setting the watch. These are handled by the connect_loop() and
        // watch_loop() functions, respectively.
        //
        info!(self.log, "run(): starting runtime");
        rt.spawn(
            //
            // Outer loop. Handles connecting to zookeeper. A new loop iteration
            // means a new zookeeper connection. We break from the loop if we
            // discover that the user has closed the receiving channel, which
            // is their sole means of stopping the client.
            //
            // Arg: Time to wait before attempting to connect. Initially 0s.
            //      Repeated iterations of the loop set a delay before
            //      connecting.
            // Loop::Break type: ()
            //
            loop_fn(NO_DELAY, move |delay| {
                loop_core.clone().connect_loop(delay)
            })
            .and_then(move |_| {
                info!(at_log, "Event-processing task stopping");
                exited_clone.store(true, Ordering::Relaxed);
                Ok(())
            })
            .map(|_| ())
            .map_err(|_| {
                unreachable!("connect_loop() should never return an error")
            }),
        );

        //
        // Heartbeat-sending loop. If we break from this loop, the resolver
        // exits.
        //
        loop {
            if exited.load(Ordering::Relaxed) {
                info!(
                    self.log,
                    "event-processing task exited; stopping heartbeats"
                );
                break;
            }
            if s.send(BackendMsg::HeartbeatMsg).is_err() {
                info!(self.log, "Connection pool channel closed");
                break;
            }
            thread::sleep(HEARTBEAT_INTERVAL);
        }

        //
        // We shut down the background watch-looping thread. It may have already
        // exited by itself if it noticed that the connection pool closed its
        // channel, but there's no harm still calling shutdown_now() in that
        // case.
        //
        info!(self.log, "Stopping runtime");
        rt.shutdown_now().wait().unwrap();
        info!(self.log, "Runtime stopped successfully");
        self.is_running = false;
        debug!(self.log, "run() returned successfully");
    }
}
720
721//
722// Returns a mock event that can be used to bootstrap the watch loop. We use
723// a NodeDataChanged event because the watch loop's response to this type of
724// event is to get the data from the node being watched, which is also what
725// we want to do when we start the watch loop anew.
726//
727fn mock_event() -> WatchedEvent {
728 WatchedEvent {
729 event_type: WatchedEventType::NodeDataChanged,
730 //
731 // This artificial keeper_state doesn't necessarily reflect reality, but
732 // that's ok because it's paired with an artificial NodeDataChanged
733 // event, and our handling for this type of event doesn't involve the
734 // keeper_state field.
735 //
736 keeper_state: KeeperState::SyncConnected,
737 //
738 // We never use `path`, so we might as well set it to a clarifying
739 // string in our artificially constructed WatchedEvent object.
740 //
741 path: "MOCK_EVENT_PATH".to_string(),
742 }
743}
744
745//
746// Parses the given zookeeper node data into a Backend object, compares it to
747// the last Backend sent to the cueball connection pool, and sends it to the
748// connection pool if the values differ.
749//
750// We need to extract two pieces of data from the "primary" json object in the
751// json returned by zookeeper:
752// * The backend's IP address
753// * The backend's port
754// The json object has an "ip" field, but not a port field. However, the port
755// is contained in the "pgUrl" field, so this function extracts it from that.
756// The port could also be extracted from the "id" field, but the pgUrl field is
757// a convenient choice as it can be parsed structurally as a url and the port
758// extracted from there.
759//
760// What this all means is: the resolver relies on the "primary.ip" and
761// "primary.pgUrl" fields as an _interface_ to the zookeeper data. This feels a
762// little ad-hoc and should be formalized and documented.
763//
764// # Arguments
765//
766// * `pool_tx` - The Sender upon which to send the update message
767// * `new_value` - The raw zookeeper data we've newly retrieved
768// * `last_backend` - The last Backend we sent to the connection pool
769// * `log` - The Logger to be used for logging
770//
771fn process_value(
772 pool_tx: &Sender<BackendMsg>,
773 new_value: &[u8],
774 last_backend: Arc<Mutex<Option<BackendKey>>>,
775 log: Logger,
776) -> Result<(), ResolverError> {
777 debug!(log, "process_value() entered");
778
779 // Parse the bytes into a json object
780 let v: SerdeJsonValue = match serde_json::from_slice(&new_value) {
781 Ok(v) => v,
782 Err(_) => {
783 return Err(ResolverError::InvalidZkJson);
784 }
785 };
786
787 //
788 // Parse out the ip. We expect the json fields to exist, and return an error
789 // if they don't, or if they are of the wrong type.
790 //
791 let ip = match &v["primary"]["ip"] {
792 SerdeJsonValue::String(s) => match BackendAddress::from_str(s) {
793 Ok(s) => s,
794 Err(_) => {
795 return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
796 }
797 },
798 SerdeJsonValue::Null => {
799 return Err(ResolverError::MissingZkData(ZkDataField::Ip));
800 }
801 _ => {
802 return Err(ResolverError::InvalidZkData(ZkDataField::Ip));
803 }
804 };
805
806 //
807 // Parse out the port. We expect the json fields to exist, and return an
808 // error if they don't, or if they are of the wrong type.
809 //
810 let port = match &v["primary"]["pgUrl"] {
811 SerdeJsonValue::String(s) => match Url::parse(s) {
812 Ok(url) => match url.port() {
813 Some(port) => port,
814 None => {
815 return Err(ResolverError::MissingZkData(
816 ZkDataField::Port,
817 ));
818 }
819 },
820 Err(_) => {
821 return Err(ResolverError::InvalidZkData(
822 ZkDataField::PostgresUrl,
823 ));
824 }
825 },
826 SerdeJsonValue::Null => {
827 return Err(ResolverError::MissingZkData(ZkDataField::PostgresUrl));
828 }
829 _ => {
830 return Err(ResolverError::InvalidZkData(ZkDataField::PostgresUrl));
831 }
832 };
833
834 // Construct a backend and key
835 let backend = Backend::new(&ip, port);
836 let backend_key = backend::srv_key(&backend);
837
838 // Determine whether we need to send the new backend over
839 let mut last_backend = last_backend.lock().unwrap();
840 let should_send = match (*last_backend).clone() {
841 Some(lb) => lb != backend_key,
842 None => true,
843 };
844
845 // Send the new backend if necessary
846 if should_send {
847 info!(log, "New backend found; sending to connection pool";
848 "backend" => LogItem(backend.clone()));
849 if pool_tx
850 .send(BackendMsg::AddedMsg(BackendAddedMsg {
851 key: backend_key.clone(),
852 backend,
853 }))
854 .is_err()
855 {
856 return Err(ResolverError::ConnectionPoolShutdown);
857 }
858
859 let lb_clone = (*last_backend).clone();
860 *last_backend = Some(backend_key);
861
862 //
863 // Notify the connection pool that the old backend should be
864 // removed, if the old backend is not None
865 //
866 if let Some(lbc) = lb_clone {
867 info!(log, "Notifying connection pool of removal of old backend");
868 if pool_tx
869 .send(BackendMsg::RemovedMsg(BackendRemovedMsg(lbc)))
870 .is_err()
871 {
872 return Err(ResolverError::ConnectionPoolShutdown);
873 }
874 }
875 } else {
876 info!(log, "New backend value does not differ; not sending");
877 }
878 debug!(log, "process_value() returned successfully");
879 Ok(())
880}
881
//
// Encapsulates all of the resolver state used in the futures context.
//
// Note that this struct's methods consume the instance of the struct they are
// called upon. The struct should be freely cloned to accommodate this.
//
#[derive(Clone)]
struct ResolverCore {
    // The Sender that this function should use to communicate with the cueball
    // connection pool
    pool_tx: Sender<BackendMsg>,
    // The key representation of the last backend sent to the cueball connection
    // pool. It will be updated by process_value() if we send a new backend over
    last_backend: Arc<Mutex<Option<BackendKey>>>,
    // The addresses of the Zookeeper cluster the Resolver is connecting to,
    // along with associated state
    conn_str_state: Arc<Mutex<ZkConnectStringState>>,
    // The path to the cluster state node in zookeeper for the shard we're
    // watching
    cluster_state_path: String,
    // The exponential backoff state for the ZooKeeper connection
    conn_backoff: Arc<Mutex<ResolverBackoff>>,
    // The exponential backoff state for watching the ZooKeeper node
    watch_backoff: Arc<Mutex<ResolverBackoff>>,
    // Root logger for the loops driven by this core
    log: Logger,
}
908
909impl ResolverCore {
910 //
911 // This function represents the body of the connect loop. It handles the
912 // ZooKeeper connection, and calls watch_loop() to watch the ZooKeeper node.
913 //
914 // # Arguments
915 //
916 // * `delay` - The length of time to wait before attempting to connect
917 //
    fn connect_loop(
        self,
        delay: Duration,
    ) -> impl Future<Item = Loop<(), Duration>, Error = ()> + Send {
        //
        // "oe_" clones are captured by the or_else error handler at the
        // bottom of this chain.
        //
        let log = self.log.clone();
        let oe_log = log.clone();
        let oe_conn_backoff = Arc::clone(&self.conn_backoff);
        let oe_conn_str_state = Arc::clone(&self.conn_str_state);

        Delay::new(Instant::now() + delay)
            .and_then(move |_| {
                let mut builder = ZooKeeperBuilder::default();
                builder.set_timeout(SESSION_TIMEOUT);
                builder.set_logger(log.new(o!(
                    "component" => "zookeeper"
                )));

                // Get the next address to connect to
                let mut state = self.conn_str_state.lock().unwrap();
                let addr = state.next_addr();
                drop(state);

                info!(log, "Connecting to ZooKeeper"; "addr" => addr);

                //
                // next_addr() expect()s the result of get_addr_at() because
                // we anticipate the connect string having at least one
                // element, and we can't do anything useful if it doesn't.
                //
                builder
                    .connect(&addr)
                    .timeout(TCP_CONNECT_TIMEOUT)
                    .and_then(move |(zk, default_watcher)| {
                        info!(log, "Connected to ZooKeeper";
                            "addr" => addr);

                        //
                        // We've connected successfully, so reset the
                        // connection attempts
                        //
                        let mut state = self.conn_str_state.lock().unwrap();
                        state.reset_attempts();
                        drop(state);

                        //
                        // Main change-watching loop. A new loop iteration means
                        // we're setting a new watch (if necessary) and waiting
                        // for a result. Breaking from the loop means that we've
                        // hit some error and are returning control to the outer
                        // loop.
                        //
                        // Arg: WatchLoopState -- we set curr_event to an
                        //      artificially constructed WatchedEvent for the
                        //      first loop iteration, so the connection pool will
                        //      be initialized with the initial primary as its
                        //      backend.
                        // Loop::Break type: NextAction -- this value is used to
                        //      instruct the outer loop (this function) whether
                        //      to try to reconnect or terminate.
                        //
                        loop_fn(
                            WatchLoopState {
                                watcher: Box::new(default_watcher),
                                curr_event: mock_event(),
                                delay: NO_DELAY,
                            },
                            move |loop_state| {
                                //
                                // These fields require a new clone for every
                                // loop iteration, but they don't actually
                                // change from iteration to iteration, so
                                // they're not included as part of loop_state.
                                //
                                let loop_core = self.clone();
                                let zk = zk.clone();
                                loop_core
                                    .watch_loop(zk, loop_state)
                                    .map_err(TimeoutError::inner)
                            },
                        )
                        .and_then(move |next_action| {
                            ok(match next_action {
                                NextAction::Stop => Loop::Break(()),
                                //
                                // We reconnect immediately here instead of
                                // waiting, because if we're here it means that
                                // we came from the inner loop and thus we just
                                // had a valid connection terminate (as opposed
                                // to the `or_else` block below, where we've just
                                // tried to connect and failed), and thus
                                // there's no reason for us to delay trying to
                                // connect again.
                                //
                                NextAction::Reconnect(delay) => {
                                    Loop::Continue(delay)
                                }
                            })
                        })
                    })
                    .or_else(move |error| {
                        //
                        // Connect (or watch) failed: log it and loop again
                        // after whatever delay the backoff policy dictates.
                        //
                        error!(oe_log, "Error connecting to ZooKeeper cluster";
                            "error" => LogItem(error));
                        ok(Loop::Continue(next_delay(
                            &oe_conn_backoff,
                            &DelayBehavior::CheckConnState(oe_conn_str_state),
                        )))
                    })
            })
            .map_err(|e| panic!("delay errored; err: {:?}", e))
    }
1028
1029 //
1030 // This function represents the body of the watch loop. It both sets and
1031 // handles the watch, and calls process_value() to send new data to the
1032 // connection pool as the data arrives.
1033 //
1034 // This function can return from two states: before we've waited for the
1035 // watch to fire (if we hit an error before waiting), or after we've waited
1036 // for the watch to fire (this could be a success or an error). These two
1037 // states require returning different Future types, so we wrap the returned
1038 // values in a future::Either to satisfy the type checker.
1039 //
1040 // # Arguments
1041 //
1042 // * `zk` - The ZooKeeper client object
1043 // * `loop_state`: The state to be passed to the next iteration of loop_fn()
1044 //
    fn watch_loop(
        self,
        zk: ZooKeeper,
        loop_state: WatchLoopState,
    ) -> impl Future<Item = Loop<NextAction, WatchLoopState>, Error = FailureError>
    + Send {
        //
        // Unpack the per-iteration loop state: the watch-event stream, the
        // event that triggered this iteration, and how long to wait before
        // fetching the data again.
        //
        let watcher = loop_state.watcher;
        let curr_event = loop_state.curr_event;
        let delay = loop_state.delay;
        let check_behavior =
            DelayBehavior::CheckConnState(self.conn_str_state.clone());
        //
        // TODO avoid mutex boilerplate from showing up in the log
        //
        let log = self.log.new(o!(
            "curr_event" => LogItem(curr_event.clone()),
            "delay" => LogItem(delay),
            "last_backend" => LogItem(Arc::clone(&self.last_backend))
        ));

        //
        // "oe_"-prefixed clones are moved into the outermost `or_else`
        // closure at the bottom of this function; the un-prefixed originals
        // are moved into the `and_then` closure below, so each closure needs
        // its own copy.
        //
        let oe_log = log.clone();
        let oe_conn_backoff = Arc::clone(&self.conn_backoff);
        let oe_check_behavior = check_behavior.clone();

        //
        // Helper function to handle the arcmut and the loop boilerplate
        //
        fn next_delay_action(
            backoff: &Arc<Mutex<ResolverBackoff>>,
            behavior: &DelayBehavior,
        ) -> Loop<NextAction, WatchLoopState> {
            Loop::Break(NextAction::Reconnect(next_delay(backoff, behavior)))
        }

        //
        // We set the watch here. If the previous iteration of the loop ended
        // because the keeper state changed rather than because the watch fired,
        // the watch will already have been set, so we don't _need_ to set it
        // here. With that said, it does no harm (zookeeper deduplicates watches
        // on the server side), and it may not be worth the effort to optimize
        // for this case, since keeper state changes (and, indeed, changes of
        // any sort) should happen infrequently.
        //
        info!(log, "Getting data");
        // Honor the requested backoff delay before touching zookeeper again.
        Delay::new(Instant::now() + delay)
            .and_then(move |_| {
                zk
                    .watch()
                    .get_data(&self.cluster_state_path)
                    .and_then(move |(_, data)| {
                        match curr_event.event_type {
                            // Keeper state has changed
                            WatchedEventType::None => {
                                match curr_event.keeper_state {
                                    //
                                    // TODO will these cases ever happen? Because if the
                                    // keeper state is "bad", then get_data() will have
                                    // failed and we won't be here.
                                    //
                                    KeeperState::Disconnected |
                                    KeeperState::AuthFailed |
                                    KeeperState::Expired => {
                                        error!(log, "Keeper state changed; reconnecting";
                                            "keeper_state" =>
                                            LogItem(curr_event.keeper_state));
                                        // Break out of the inner loop and ask the
                                        // outer loop to reconnect after a delay.
                                        return Either::A(ok(next_delay_action(
                                            &self.conn_backoff,
                                            &check_behavior
                                        )));
                                    },
                                    KeeperState::SyncConnected |
                                    KeeperState::ConnectedReadOnly |
                                    KeeperState::SaslAuthenticated => {
                                        // Benign state change; fall through and
                                        // keep watching.
                                        info!(log, "Keeper state changed";
                                            "keeper_state" =>
                                            LogItem(curr_event.keeper_state));
                                    }
                                }
                            },
                            // The data watch fired
                            WatchedEventType::NodeDataChanged => {
                                //
                                // We didn't get the data, which means the node doesn't
                                // exist yet. We should wait a bit and try again. We'll
                                // just use the same event as before.
                                //
                                if data.is_none() {
                                    info!(log, "ZK data does not exist yet");
                                    let delay = next_delay(
                                        &self.watch_backoff,
                                        &DelayBehavior::AlwaysWait
                                    );
                                    return Either::A(ok(Loop::Continue(WatchLoopState {
                                        watcher,
                                        curr_event,
                                        delay,
                                    })));
                                }
                                //
                                // Discard the Stat from the data, as we don't use it.
                                //
                                let data = data.unwrap().0;
                                info!(log, "got data"; "data" => LogItem(data.clone()));
                                // Parse the new data and notify the connection
                                // pool if the primary changed.
                                match process_value(
                                    &self.pool_tx.clone(),
                                    &data,
                                    Arc::clone(&self.last_backend),
                                    log.clone()
                                ) {
                                    Ok(_) => {},
                                    Err(e) => {
                                        error!(log, ""; "error" => LogItem(e.clone()));
                                        //
                                        // The error is between the client and the
                                        // outward-facing channel, not between the
                                        // client and the zookeeper connection, so we
                                        // don't have to attempt to reconnect here and
                                        // can continue, unless the error tells us to
                                        // stop.
                                        //
                                        if e.should_stop() {
                                            return Either::A(ok(Loop::Break(
                                                NextAction::Stop)));
                                        }
                                    }
                                }
                            },
                            WatchedEventType::NodeDeleted => {
                                //
                                // The node doesn't exist, but we can't use the existing
                                // event, or we'll just loop on this case forever. We
                                // use the mock event instead.
                                //
                                info!(log, "ZK node deleted");
                                let delay = next_delay(
                                    &self.watch_backoff,
                                    &DelayBehavior::AlwaysWait
                                );
                                return Either::A(ok(Loop::Continue(WatchLoopState {
                                    watcher,
                                    curr_event: mock_event(),
                                    delay,
                                })));
                            },
                            e => panic!("Unexpected event received: {:?}", e)
                        };

                        //
                        // If we got here, we're waiting for the watch to fire. Before
                        // this point, we wrap the return value in Either::A. After this
                        // point, we wrap the return value in Either::B. See the comment
                        // about "Either" some lines above.
                        //
                        // Fresh "oe_" clones for the event stream's `or_else`
                        // arm, since the originals get moved into `and_then`.
                        //
                        let oe_log = log.clone();
                        let oe_conn_backoff = Arc::clone(&self.conn_backoff);
                        let oe_check_behavior = check_behavior.clone();

                        info!(log, "Watching for change");
                        Either::B(watcher.into_future()
                            .and_then(move |(event, watcher)| {
                                let loop_next = match event {
                                    Some(event) => {
                                        info!(log, "change event received; looping to \
                                            process event"; "event" =>
                                            LogItem(event.clone()));
                                        // Loop again with no delay to process
                                        // the event we just received.
                                        Loop::Continue(WatchLoopState {
                                            watcher,
                                            curr_event: event,
                                            delay: NO_DELAY
                                        })
                                    },
                                    //
                                    // If we didn't get a valid event, this means the
                                    // Stream got closed, which indicates a connection
                                    // issue, so we reconnect.
                                    //
                                    None => {
                                        error!(log, "Event stream closed; reconnecting");
                                        next_delay_action(
                                            &self.conn_backoff,
                                            &check_behavior
                                        )
                                    }
                                };
                                ok(loop_next)
                            })
                            .or_else(move |_| {
                                //
                                // If we get an error from the event Stream, we assume
                                // that something went wrong with the zookeeper
                                // connection and attempt to reconnect.
                                //
                                // The stream's error type is (), so there's no
                                // information to extract from it.
                                //
                                error!(oe_log, "Error received from event stream");
                                ok(next_delay_action(
                                    &oe_conn_backoff,
                                    &oe_check_behavior
                                ))
                            }))
                    })
                    //
                    // If some error occurred getting the data, we assume we should
                    // reconnect to the zookeeper server.
                    //
                    .or_else(move |error| {
                        error!(oe_log, "Error getting data"; "error" => LogItem(error));
                        ok(next_delay_action(
                            &oe_conn_backoff,
                            &oe_check_behavior
                        ))
                    })
            })
            // The Delay future's error type is not FailureError; a failed
            // timer is unrecoverable here, so we panic rather than convert.
            .map_err(|e| panic!("delay errored; err: {:?}", e))
    }
1261}
1262
1263//
1264// Unit tests
1265//
1266// The "." path attribute below is so paths within the `test` submodule will be
1267// relative to "src" rather than "src/test", which does not exist. This allows
1268// us to import the "../tests/test_data.rs" module here.
1269//
#[path = "."]
#[cfg(test)]
mod test {
    use super::*;

    use std::iter;
    use std::sync::mpsc::TryRecvError;
    use std::sync::{Arc, Mutex};
    use std::vec::Vec;

    use quickcheck::{quickcheck, Arbitrary, Gen};

    use common::{test_data, util};

    impl Arbitrary for ZkConnectString {
        //
        // Generate a ZkConnectString containing an arbitrary number of
        // arbitrary socket addresses (possibly zero).
        //
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let size = usize::arbitrary(g);
            ZkConnectString(
                iter::repeat(())
                    .map(|()| SocketAddr::arbitrary(g))
                    .take(size)
                    .collect(),
            )
        }
    }

    //
    // Test parsing ZkConnectString from string
    //
    quickcheck! {
        fn prop_zk_connect_string_parse(
            conn_str: ZkConnectString
        ) -> bool
        {
            //
            // We expect an error only if the input string was zero-length
            //
            match ZkConnectString::from_str(&conn_str.to_string()) {
                Ok(cs) => cs == conn_str,
                _ => conn_str.to_string() == ""
            }
        }
    }

    // Below: test process_value()

    //
    // Represents a process_value test case, including inputs and expected
    // outputs.
    //
    struct ProcessValueFields {
        // Raw bytes handed to process_value(), as if read from zookeeper
        value: Vec<u8>,
        // Backend key the resolver currently believes is the primary
        last_backend: BackendKey,
        // Error we expect process_value() to return, if any
        expected_error: Option<ResolverError>,
        // "backend added" message we expect on the channel, if any
        added_backend: Option<BackendAddedMsg>,
        // "backend removed" message we expect on the channel, if any
        removed_backend: Option<BackendRemovedMsg>,
    }

    //
    // Run a process_value test case: invoke process_value() with the given
    // inputs, then verify both its return value and the exact set of
    // messages it sent on the pool channel (no more, no fewer).
    //
    fn run_process_value_fields(input: ProcessValueFields) {
        let (tx, rx) = channel();
        let last_backend = Arc::new(Mutex::new(Some(input.last_backend)));

        let result = process_value(
            &tx.clone(),
            &input.value,
            last_backend,
            util::log_from_env(util::DEFAULT_LOG_LEVEL).unwrap(),
        );
        match input.expected_error {
            None => assert_eq!(result, Ok(())),
            Some(expected_error) => assert_eq!(result, Err(expected_error)),
        }

        // One message expected per anticipated added/removed notification.
        let expected_message_count = input.added_backend.is_some() as usize
            + input.removed_backend.is_some() as usize;

        // Receive as many messages as we expect
        let mut received_messages = Vec::with_capacity(expected_message_count);
        for i in 0..expected_message_count {
            match rx.try_recv() {
                Err(e) => panic!(
                    "Unexpected error receiving on channel: {:?} \
                     -- Loop iteration: {:?}",
                    e, i
                ),
                Ok(result) => received_messages.push(result),
            }
        }

        //
        // Make sure there are not more messages than we expect on the channel.
        // Can't use assert_eq! here because BackendMsg doesn't implement Debug.
        //
        match rx.try_recv() {
            Err(TryRecvError::Empty) => (),
            _ => panic!("Unexpected message on resolver channel"),
        }

        // Check that the "added" message was received if applicable
        if let Some(msg) = input.added_backend {
            let msg = BackendMsg::AddedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None => panic!("added_backend not found in received messages"),
                Some(index) => {
                    received_messages.remove(index);
                }
            }
        }

        // Check that the "removed" message was received if applicable
        if let Some(msg) = input.removed_backend {
            let msg = BackendMsg::RemovedMsg(msg);
            match util::find_msg_match(&received_messages, &msg) {
                None => {
                    panic!("removed_backend not found in received messages")
                }
                Some(index) => {
                    received_messages.remove(index);
                }
            }
        }
    }

    //
    // Run a process_value test case that expects the given error and no
    // channel messages. The last_backend value is filler required to
    // populate the input struct; it is not expected to be removed.
    //
    fn run_process_value_error_case(
        value: Vec<u8>,
        expected_error: ResolverError,
    ) {
        let filler = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value,
            last_backend: filler.key(),
            expected_error: Some(expected_error),
            added_backend: None,
            removed_backend: None,
        });
    }

    // Both IP and port differ from the last-seen backend: expect a
    // removed message for the old backend and an added message for the new.
    #[test]
    fn process_value_test_port_ip_change() {
        let data_1 = test_data::backend_ip1_port1();
        let data_2 = test_data::backend_ip2_port2();

        run_process_value_fields(ProcessValueFields {
            value: data_2.raw_vec(),
            last_backend: data_1.key(),
            expected_error: None,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg()),
        });
    }

    // Only the IP differs (same port): still a remove + add pair.
    #[test]
    fn process_value_test_port_change() {
        let data_1 = test_data::backend_ip1_port1();
        let data_2 = test_data::backend_ip2_port1();

        run_process_value_fields(ProcessValueFields {
            value: data_2.raw_vec(),
            last_backend: data_1.key(),
            expected_error: None,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg()),
        });
    }

    // Only the port differs (same IP): still a remove + add pair.
    #[test]
    fn process_value_test_ip_change() {
        let data_1 = test_data::backend_ip1_port1();
        let data_2 = test_data::backend_ip1_port2();

        run_process_value_fields(ProcessValueFields {
            value: data_2.raw_vec(),
            last_backend: data_1.key(),
            expected_error: None,
            added_backend: Some(data_2.added_msg()),
            removed_backend: Some(data_1.removed_msg()),
        });
    }

    // New data matches the last-seen backend: no messages should be sent.
    #[test]
    fn process_value_test_no_change() {
        let data = test_data::backend_ip1_port1();

        run_process_value_fields(ProcessValueFields {
            value: data.raw_vec(),
            last_backend: data.key(),
            expected_error: None,
            added_backend: None,
            removed_backend: None,
        });
    }

    #[test]
    fn process_value_test_no_ip() {
        run_process_value_error_case(
            test_data::no_ip_vec(),
            ResolverError::MissingZkData(ZkDataField::Ip),
        );
    }

    #[test]
    fn process_value_test_wrong_type_ip() {
        run_process_value_error_case(
            test_data::wrong_type_ip_vec(),
            ResolverError::InvalidZkData(ZkDataField::Ip),
        );
    }

    #[test]
    fn process_value_test_invalid_ip() {
        run_process_value_error_case(
            test_data::invalid_ip_vec(),
            ResolverError::InvalidZkData(ZkDataField::Ip),
        );
    }

    #[test]
    fn process_value_test_no_pg_url() {
        run_process_value_error_case(
            test_data::no_pg_url_vec(),
            ResolverError::MissingZkData(ZkDataField::PostgresUrl),
        );
    }

    #[test]
    fn process_value_test_wrong_type_pg_url() {
        run_process_value_error_case(
            test_data::wrong_type_pg_url_vec(),
            ResolverError::InvalidZkData(ZkDataField::PostgresUrl),
        );
    }

    #[test]
    fn process_value_test_invalid_pg_url() {
        run_process_value_error_case(
            test_data::invalid_pg_url_vec(),
            ResolverError::InvalidZkData(ZkDataField::PostgresUrl),
        );
    }

    #[test]
    fn process_value_test_no_port_pg_url() {
        run_process_value_error_case(
            test_data::no_port_pg_url_vec(),
            ResolverError::MissingZkData(ZkDataField::Port),
        );
    }

    #[test]
    fn process_value_test_invalid_json() {
        run_process_value_error_case(
            test_data::invalid_json_vec(),
            ResolverError::InvalidZkJson,
        );
    }
}