serf_core/
coordinate.rs

1use std::{
2  collections::HashMap,
3  sync::atomic::{AtomicUsize, Ordering},
4  time::Duration,
5};
6
7use byteorder::{ByteOrder, NetworkEndian};
8use memberlist_core::CheapClone;
9use parking_lot::RwLock;
10use rand::Rng;
11use serf_types::Transformable;
12use smallvec::SmallVec;
13
/// Used to convert float seconds to nanoseconds.
const SECONDS_TO_NANOSECONDS: f64 = 1.0e9;
/// Used to decide if two coordinates are on top of each
/// other.
const ZERO_THRESHOLD: f64 = 1.0e-6;

/// The default dimensionality of the coordinate system. Also the inline
/// capacity of the `SmallVec` that stores a coordinate's Euclidean portion.
const DEFAULT_DIMENSIONALITY: usize = 8;

/// The default adjustment window size. Also the inline capacity of the
/// `SmallVec` that stores the adjustment samples.
const DEFAULT_ADJUSTMENT_WINDOW_SIZE: usize = 20;

/// The inline capacity of the per-node `SmallVec` that stores latency filter
/// samples.
const DEFAULT_LATENCY_FILTER_SAMPLES_SIZE: usize = 8;
27
/// Error type for the [`Coordinate`] and [`CoordinateClient`] operations.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum CoordinateError {
  /// Returned when the dimensions of the coordinates are not compatible.
  #[error("dimensions aren't compatible")]
  DimensionalityMismatch,
  /// Returned when the coordinate is invalid, i.e. one of its components is
  /// not a finite number.
  #[error("invalid coordinate")]
  InvalidCoordinate,
  /// Returned when the round trip time is not in a valid range. The payload
  /// is the rejected round trip time.
  #[error("round trip time not in valid range, duration {0:?} is not a value less than 10s")]
  InvalidRTT(Duration),
}
41
/// CoordinateOptions is used to set the parameters of the Vivaldi-based coordinate mapping
/// algorithm.
///
/// The following references are called out at various points in the documentation
/// here:
///
/// [1] Dabek, Frank, et al. "Vivaldi: A decentralized network coordinate system."
///     ACM SIGCOMM Computer Communication Review. Vol. 34. No. 4. ACM, 2004.
/// [2] Ledlie, Jonathan, Paul Gardner, and Margo I. Seltzer. "Network Coordinates
///     in the Wild." NSDI. Vol. 7. 2007.
/// [3] Lee, Sanghwan, et al. "On suitability of Euclidean embedding for
///     host-based network coordinate systems." Networking, IEEE/ACM Transactions
///     on 18.1 (2010): 27-40.
#[viewit::viewit(
  vis_all = "pub(crate)",
  getters(style = "ref", vis_all = "pub"),
  setters(prefix = "with", vis_all = "pub")
)]
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CoordinateOptions {
  /// The dimensionality of the coordinate system. As discussed in [2], more
  /// dimensions improve the accuracy of the estimates up to a point. Per [2]
  /// we chose 8 dimensions plus a non-Euclidean height.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the dimensionality of the coordinate system.")
    ),
    setter(attrs(doc = "Sets the dimensionality of the coordinate system."))
  )]
  dimensionality: usize,

  /// The default error value when a node hasn't yet made
  /// any observations. It also serves as an upper limit on the error value in
  /// case observations cause the error value to increase without bound.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the default error value when a node hasn't yet made any observations.")
    ),
    setter(attrs(
      doc = "Sets the default error value when a node hasn't yet made any observations."
    ))
  )]
  vivaldi_error_max: f64,

  /// A tuning factor that controls the maximum impact an
  /// observation can have on a node's confidence. See [1] for more details.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the maximum impact an observation can have on a node's confidence.")
    ),
    setter(attrs(
      doc = "Sets the maximum impact an observation can have on a node's confidence."
    ))
  )]
  vivaldi_ce: f64,

  /// A tuning factor that controls the maximum impact an
  /// observation can have on a node's coordinate. See [1] for more details.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the maximum impact an observation can have on a node's coordinate.")
    ),
    setter(attrs(
      doc = "Sets the maximum impact an observation can have on a node's coordinate."
    ))
  )]
  vivaldi_cc: f64,

  /// A tuning factor that determines how many samples
  /// we retain to calculate the adjustment factor as discussed in [3]. Setting
  /// this to zero disables this feature.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns how many samples we retain to calculate the adjustment factor.")
    ),
    setter(attrs(doc = "Sets how many samples we retain to calculate the adjustment factor."))
  )]
  adjustment_window_size: usize,

  /// The minimum value of the height parameter. Since this
  /// always must be positive, it will introduce a small amount of error, so
  /// the chosen value should be relatively small compared to "normal"
  /// coordinates.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the minimum value of the height parameter.")
    ),
    setter(attrs(doc = "Sets the minimum value of the height parameter."))
  )]
  height_min: f64,

  /// The maximum number of samples that are retained
  /// per node, in order to compute a median. The intent is to ride out blips
  /// but still keep the delay low, since our time to probe any given node is
  /// pretty infrequent. See [2] for more details.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the maximum number of samples that are retained per node.")
    ),
    setter(attrs(doc = "Sets the maximum number of samples that are retained per node."))
  )]
  latency_filter_size: usize,

  /// A tuning factor that sets how much gravity has an effect
  /// to try to re-center coordinates. See [2] for more details.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns how much gravity has an effect to try to re-center coordinates.")
    ),
    setter(attrs(doc = "Sets how much gravity has an effect to try to re-center coordinates."))
  )]
  gravity_rho: f64,

  /// The metric labels used to identify the metrics for this coordinate
  /// client. Only present when the `metrics` feature is enabled.
  #[cfg(feature = "metrics")]
  #[viewit(
    getter(
      const,
      style = "ref",
      attrs(
        cfg(feature = "metrics"),
        doc = "The metric labels used to identify the metrics for this coordinate client."
      )
    ),
    setter(attrs(
      cfg(feature = "metrics"),
      doc = "Sets the metric labels used to identify the metrics for this coordinate client."
    ))
  )]
  metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
}
189
impl Default for CoordinateOptions {
  /// Returns the same options as [`CoordinateOptions::new`].
  fn default() -> Self {
    Self::new()
  }
}
195
196impl CoordinateOptions {
197  /// Returns a `CoordinateOptions` that has some default values suitable for
198  /// basic testing of the algorithm, but not tuned to any particular type of cluster.
199  #[inline]
200  pub fn new() -> Self {
201    Self {
202      dimensionality: DEFAULT_DIMENSIONALITY,
203      vivaldi_error_max: 1.5,
204      vivaldi_ce: 0.25,
205      vivaldi_cc: 0.25,
206      adjustment_window_size: 20,
207      height_min: 10.0e-6,
208      latency_filter_size: 3,
209      gravity_rho: 150.0,
210      #[cfg(feature = "metrics")]
211      metric_labels: std::sync::Arc::new(memberlist_core::types::MetricLabels::default()),
212    }
213  }
214}
215
/// Used to record events that occur when updating coordinates.
///
/// This is a plain snapshot value; it does not change after it is returned.
#[viewit::viewit(setters(prefix = "with"))]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CoordinateClientStats {
  /// Incremented any time we reset our local coordinate because
  /// our calculations have resulted in an invalid state.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(
        doc = "Returns the number of times we reset our local coordinate because our calculations have resulted in an invalid state."
      )
    ),
    setter(attrs(
      doc = "Sets the number of times we reset our local coordinate because our calculations have resulted in an invalid state."
    ))
  )]
  resets: usize,
}
237
impl Default for CoordinateClientStats {
  /// Returns stats with every counter set to zero.
  #[inline]
  fn default() -> Self {
    Self::new()
  }
}
244
impl CoordinateClientStats {
  /// Creates stats with the reset counter at zero.
  #[inline]
  const fn new() -> Self {
    Self { resets: 0 }
  }
}
251
/// The mutable state of a [`CoordinateClient`], kept behind the client's lock.
struct CoordinateClientInner<I> {
  /// The current estimate of the client's network coordinate.
  coord: Coordinate,

  /// Origin is a coordinate sitting at the origin.
  origin: Coordinate,

  /// Contains the tuning parameters that govern the performance of
  /// the algorithm.
  opts: CoordinateOptions,

  /// The current index into `adjustment_samples`, i.e. the slot that the next
  /// sample overwrites.
  adjustment_index: usize,

  /// Used to store samples for the adjustment calculation.
  adjustment_samples: SmallVec<[f64; DEFAULT_ADJUSTMENT_WINDOW_SIZE]>,

  /// Used to store the last several RTT samples,
  /// keyed by node. The `latency_filter_size` option determines how many
  /// samples we keep, per node.
  latency_filter_samples: HashMap<I, SmallVec<[f64; DEFAULT_LATENCY_FILTER_SAMPLES_SIZE]>>,
}
274
275impl<I> CoordinateClientInner<I>
276where
277  I: CheapClone + Eq + core::hash::Hash,
278{
279  /// Applies a small amount of gravity to pull coordinates towards
280  /// the center of the coordinate system to combat drift. This assumes that the
281  /// mutex is locked already.
282  #[inline]
283  fn update_gravity(&mut self) {
284    let dist = self.origin.distance_to(&self.coord).as_secs();
285    let force = -1.0 * f64::powf((dist as f64) / self.opts.gravity_rho, 2.0);
286    self
287      .coord
288      .apply_force_in_place(self.opts.height_min, force, &self.origin);
289  }
290
291  #[inline]
292  fn latency_filter(&mut self, node: &I, rtt_seconds: f64) -> f64 {
293    let samples = self
294      .latency_filter_samples
295      .entry(node.cheap_clone())
296      .or_insert_with(|| SmallVec::with_capacity(self.opts.latency_filter_size));
297
298    // Add the new sample and trim the list, if needed.
299    samples.push(rtt_seconds);
300    if samples.len() > self.opts.latency_filter_size {
301      samples.remove(0);
302    }
303    // Sort a copy of the samples and return the median.
304    let mut tmp = SmallVec::<[f64; DEFAULT_LATENCY_FILTER_SAMPLES_SIZE]>::from_slice(samples);
305    tmp.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
306    tmp[tmp.len() / 2]
307  }
308
309  /// Updates the Vivaldi portion of the client's coordinate. This
310  /// assumes that the mutex has been locked already.
311  fn update_vivaldi(&mut self, other: &Coordinate, mut rtt_seconds: f64) {
312    const ZERO_THRESHOLD: f64 = 1.0e-6;
313
314    let dist = self.coord.distance_to(other).as_secs_f64();
315    rtt_seconds = rtt_seconds.max(ZERO_THRESHOLD);
316
317    let wrongness = ((dist - rtt_seconds) / rtt_seconds).abs();
318
319    let total_error = (self.coord.error + other.error).max(ZERO_THRESHOLD);
320
321    let weight = self.coord.error / total_error;
322    self.coord.error = ((self.opts.vivaldi_ce * weight * wrongness)
323      + (self.coord.error * (1.0 - self.opts.vivaldi_ce * weight)))
324      .min(self.opts.vivaldi_error_max);
325
326    let force = self.opts.vivaldi_cc * weight * (rtt_seconds - dist);
327    self
328      .coord
329      .apply_force_in_place(self.opts.height_min, force, other);
330  }
331
332  /// Updates the adjustment portion of the client's coordinate, if
333  /// the feature is enabled. This assumes that the mutex has been locked already.
334  fn update_adjustment(&mut self, other: &Coordinate, rtt_seconds: f64) {
335    if self.opts.adjustment_window_size == 0 {
336      return;
337    }
338    // Note that the existing adjustment factors don't figure in to this
339    // calculation so we use the raw distance here.
340    let dist = self.coord.raw_distance_to(other);
341    self.adjustment_samples[self.adjustment_index] = rtt_seconds - dist;
342    self.adjustment_index = (self.adjustment_index + 1) % self.opts.adjustment_window_size;
343
344    self.coord.adjustment =
345      self.adjustment_samples.iter().sum::<f64>() / (2.0 * self.opts.adjustment_window_size as f64);
346  }
347}
348
/// Manages the estimated network coordinate for a given node, and adjusts
/// it as the node observes round trip times and estimated coordinates from other
/// nodes. The core algorithm is based on Vivaldi, see the documentation for
/// [`CoordinateOptions`] for more details.
///
/// `CoordinateClient` is thread-safe.
// TODO: are there any better ways to avoid using a RwLock?
pub struct CoordinateClient<I> {
  /// The coordinate state, guarded by a read-write lock.
  inner: RwLock<CoordinateClientInner<I>>,
  /// Used to record events that occur when updating coordinates. Holds the
  /// number of coordinate resets reported by `stats`.
  stats: AtomicUsize,
}
361
impl<I> Default for CoordinateClient<I> {
  /// Returns the same client as [`CoordinateClient::new`].
  #[inline]
  fn default() -> Self {
    Self::new()
  }
}
368
369impl<I> CoordinateClient<I> {
370  /// Creates a new client.
371  #[inline]
372  pub fn new() -> Self {
373    Self {
374      inner: RwLock::new(CoordinateClientInner {
375        coord: Coordinate::new(),
376        origin: Coordinate::new(),
377        opts: CoordinateOptions::new(),
378        adjustment_index: 0,
379        adjustment_samples: SmallVec::from_slice(&[0.0; DEFAULT_ADJUSTMENT_WINDOW_SIZE]),
380        latency_filter_samples: HashMap::new(),
381      }),
382      stats: AtomicUsize::new(0),
383    }
384  }
385
386  /// Creates a new client with given options.
387  #[inline]
388  pub fn with_options(opts: CoordinateOptions) -> Self {
389    let mut samples = SmallVec::with_capacity(opts.adjustment_window_size);
390    samples.resize(opts.adjustment_window_size, 0.0);
391    Self {
392      inner: RwLock::new(CoordinateClientInner {
393        coord: Coordinate::with_options(opts.clone()),
394        origin: Coordinate::with_options(opts.clone()),
395        opts,
396        adjustment_index: 0,
397        adjustment_samples: samples,
398        latency_filter_samples: HashMap::new(),
399      }),
400      stats: AtomicUsize::new(0),
401    }
402  }
403
404  /// Returns a copy of the coordinate for this client.
405  #[inline]
406  pub fn get_coordinate(&self) -> Coordinate {
407    self.inner.read().coord.clone()
408  }
409
410  /// Forces the client's coordinate to a known state.
411  #[inline]
412  pub fn set_coordinate(&self, coord: Coordinate) -> Result<(), CoordinateError> {
413    let mut l = self.inner.write();
414    Self::check_coordinate(&l.coord, &coord).map(|_| l.coord = coord)
415  }
416
417  /// Returns a copy of stats for the client.
418  #[inline]
419  pub fn stats(&self) -> CoordinateClientStats {
420    CoordinateClientStats {
421      resets: self.stats.load(Ordering::Relaxed),
422    }
423  }
424
425  /// Returns the estimated RTT from the client's coordinate to other, the
426  /// coordinate for another node.
427  #[inline]
428  pub fn distance_to(&self, coord: &Coordinate) -> Duration {
429    self.inner.read().coord.distance_to(coord)
430  }
431
432  /// Returns an error if the coordinate isn't compatible with
433  /// this client, or if the coordinate itself isn't valid. This assumes the mutex
434  /// has been locked already.
435  #[inline]
436  fn check_coordinate(this: &Coordinate, coord: &Coordinate) -> Result<(), CoordinateError> {
437    if !this.is_compatible_with(coord) {
438      return Err(CoordinateError::DimensionalityMismatch);
439    }
440
441    if !coord.is_valid() {
442      return Err(CoordinateError::InvalidCoordinate);
443    }
444
445    Ok(())
446  }
447}
448
449impl<I> CoordinateClient<I>
450where
451  I: CheapClone + Eq + core::hash::Hash,
452{
453  /// Removes any client state for the given node.
454  #[inline]
455  pub fn forget_node(&self, node: &I) {
456    self.inner.write().latency_filter_samples.remove(node);
457  }
458
459  /// Update takes other, a coordinate for another node, and rtt, a round trip
460  /// time observation for a ping to that node, and updates the estimated position of
461  /// the client's coordinate. Returns the updated coordinate.
462  pub fn update(
463    &self,
464    node: &I,
465    other: &Coordinate,
466    rtt: Duration,
467  ) -> Result<Coordinate, CoordinateError> {
468    let mut l = self.inner.write();
469    Self::check_coordinate(&l.coord, other)?;
470
471    // The code down below can handle zero RTTs, which we have seen in
472    // https://github.com/hashicorp/consul/issues/3789, presumably in
473    // environments with coarse-grained monotonic clocks (we are still
474    // trying to pin this down). In any event, this is ok from a code PoV
475    // so we don't need to alert operators with spammy messages. We did
476    // add a counter so this is still observable, though.
477    const MAX_RTT: Duration = Duration::from_secs(10);
478
479    if rtt > MAX_RTT {
480      return Err(CoordinateError::InvalidRTT(rtt));
481    }
482
483    #[cfg(feature = "metrics")]
484    if rtt.is_zero() {
485      metrics::counter!("serf.coordinate.zero-rtt", l.opts.metric_labels.iter()).increment(1);
486    }
487
488    let rtt_seconds = l.latency_filter(node, rtt.as_secs_f64());
489    l.update_vivaldi(other, rtt_seconds);
490    l.update_adjustment(other, rtt_seconds);
491    l.update_gravity();
492
493    if !l.coord.is_valid() {
494      self.stats.fetch_add(1, Ordering::Acquire);
495      l.coord = Coordinate::with_options(l.opts.clone());
496    }
497
498    Ok(l.coord.clone())
499  }
500}
501
/// A specialized structure for holding network coordinates for the
/// Vivaldi-based coordinate mapping algorithm. All values in here are in
/// units of seconds, except for `error`, which is dimensionless.
#[viewit::viewit(getters(style = "move"), setters(prefix = "with"))]
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Coordinate {
  /// The Euclidean portion of the coordinate. This is used along
  /// with the other fields to provide an overall distance estimate. The
  /// units here are seconds.
  #[viewit(
    getter(
      const,
      style = "ref",
      attrs(doc = "Returns the Euclidean portion of the coordinate.")
    ),
    setter(attrs(doc = "Sets the Euclidean portion of the coordinate."))
  )]
  portion: SmallVec<[f64; DEFAULT_DIMENSIONALITY]>,
  /// Reflects the confidence in the given coordinate and is updated
  /// dynamically by the Vivaldi Client. This is dimensionless.
  #[viewit(
    getter(const, attrs(doc = "Returns the confidence in the given coordinate.")),
    setter(attrs(doc = "Sets the confidence in the given coordinate."))
  )]
  error: f64,
  /// A distance offset computed based on a calculation over
  /// observations from all other nodes over a fixed window and is updated
  /// dynamically by the Vivaldi Client. The units here are seconds.
  #[viewit(
    getter(const, attrs(doc = "Returns the distance offset.")),
    setter(attrs(doc = "Sets the distance offset."))
  )]
  adjustment: f64,
  /// A distance offset that accounts for non-Euclidean effects
  /// which model the access links from nodes to the core Internet. The access
  /// links are usually set by bandwidth and congestion, and the core links
  /// usually follow distance based on geography.
  #[viewit(
    getter(
      const,
      attrs(doc = "Returns the distance offset that accounts for non-Euclidean effects.")
    ),
    setter(attrs(doc = "Sets the distance offset that accounts for non-Euclidean effects."))
  )]
  height: f64,
}
549
impl Default for Coordinate {
  /// Returns the same coordinate as [`Coordinate::new`].
  #[inline]
  fn default() -> Self {
    Self::new()
  }
}
556
557impl Coordinate {
558  /// Creates a new coordinate at the origin, using the default options
559  /// to supply key initial values.
560  #[inline]
561  pub fn new() -> Self {
562    Self::with_options(CoordinateOptions::new())
563  }
564
565  /// Creates a new coordinate at the origin, using the given options
566  /// to supply key initial values.
567  #[inline]
568  pub fn with_options(opts: CoordinateOptions) -> Self {
569    let mut vec = SmallVec::with_capacity(opts.dimensionality);
570    vec.resize(opts.dimensionality, 0.0);
571    Self {
572      portion: vec,
573      error: opts.vivaldi_error_max,
574      adjustment: 0.0,
575      height: opts.height_min,
576    }
577  }
578
579  /// Returns true if the coordinate is valid.
580  #[inline]
581  pub fn is_valid(&self) -> bool {
582    self.portion.iter().all(|&f| f.is_finite())
583      && self.error.is_finite()
584      && self.adjustment.is_finite()
585      && self.height.is_finite()
586  }
587
588  /// Returns true if the dimensions of the coordinates are compatible.
589  #[inline]
590  pub fn is_compatible_with(&self, other: &Self) -> bool {
591    self.portion.len() == other.portion.len()
592  }
593
594  /// Returns the result of applying the force from the direction of the
595  /// other coordinate.
596  pub fn apply_force(&self, height_min: f64, force: f64, other: &Self) -> Self {
597    assert!(
598      self.is_compatible_with(other),
599      "coordinate dimensionality does not match"
600    );
601
602    let mut ret = self.clone();
603    let (mut unit, mag) = unit_vector_at(&self.portion, &other.portion);
604    add_in_place(&mut ret.portion, mul_in_place(&mut unit, force));
605    if mag > ZERO_THRESHOLD {
606      ret.height = (ret.height + other.height) * force / mag + ret.height;
607      ret.height = ret.height.max(height_min);
608    }
609    ret
610  }
611
612  /// Apply the result of applying the force from the direction of the
613  /// other coordinate to self.
614  pub fn apply_force_in_place(&mut self, height_min: f64, force: f64, other: &Self) {
615    assert!(
616      self.is_compatible_with(other),
617      "coordinate dimensionality does not match"
618    );
619    let (mut unit, mag) = unit_vector_at(&self.portion, &other.portion);
620    add_in_place(&mut self.portion, mul_in_place(&mut unit, force));
621
622    if mag > ZERO_THRESHOLD {
623      self.height = (self.height + other.height) * force / mag + self.height;
624      self.height = self.height.max(height_min);
625    }
626  }
627
628  /// Returns the distance between this coordinate and the other
629  /// coordinate, including adjustments.
630  pub fn distance_to(&self, other: &Self) -> Duration {
631    assert!(
632      self.is_compatible_with(other),
633      "coordinate dimensionality does not match"
634    );
635
636    let dist = self.raw_distance_to(other);
637    let adjusted_dist = dist + self.adjustment + other.adjustment;
638    let dist = if adjusted_dist > 0.0 {
639      adjusted_dist
640    } else {
641      dist
642    };
643    Duration::from_nanos((dist * SECONDS_TO_NANOSECONDS) as u64)
644  }
645
646  #[inline]
647  fn raw_distance_to(&self, other: &Self) -> f64 {
648    magnitude_in_place(diff_in_place(&self.portion, &other.portion)) + self.height + other.height
649  }
650}
651
/// The error when encoding or decoding a [`Coordinate`].
#[derive(Debug, thiserror::Error)]
pub enum CoordinateTransformError {
  /// Returned when the buffer is too small to encode the coordinate.
  #[error("encode buffer too small")]
  BufferTooSmall,
  /// Returned when there are not enough bytes to decode the coordinate.
  #[error("not enough bytes to decode")]
  NotEnoughBytes,
}
662
663impl Transformable for Coordinate {
664  type Error = CoordinateTransformError;
665
666  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
667    let encoded_len = self.encoded_len();
668    if dst.len() < encoded_len {
669      return Err(Self::Error::BufferTooSmall);
670    }
671
672    let mut offset = 0;
673    NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
674    offset += 4;
675    NetworkEndian::write_f64(&mut dst[offset..], self.error);
676    offset += 8;
677    NetworkEndian::write_f64(&mut dst[offset..], self.adjustment);
678    offset += 8;
679    NetworkEndian::write_f64(&mut dst[offset..], self.height);
680    offset += 8;
681    for f in &self.portion {
682      NetworkEndian::write_f64(&mut dst[offset..], *f);
683      offset += 8;
684    }
685
686    debug_assert_eq!(
687      offset, encoded_len,
688      "expect write {} bytes, but actual write {} bytes",
689      encoded_len, offset
690    );
691
692    Ok(offset)
693  }
694
695  fn encoded_len(&self) -> usize {
696    4 + 8 * self.portion.len() + 8 * 3
697  }
698
699  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
700  where
701    Self: Sized,
702  {
703    let src_len = src.len();
704    if src_len < 4 + 3 * 8 {
705      return Err(Self::Error::NotEnoughBytes);
706    }
707
708    let len = NetworkEndian::read_u32(&src[0..4]) as usize;
709    if src_len < len {
710      return Err(Self::Error::NotEnoughBytes);
711    }
712
713    let mut offset = 4;
714    let error = NetworkEndian::read_f64(&src[offset..]);
715    offset += 8;
716    let adjustment = NetworkEndian::read_f64(&src[offset..]);
717    offset += 8;
718    let height = NetworkEndian::read_f64(&src[offset..]);
719    offset += 8;
720
721    let num_portion = (len - 4 - 3 * 8) / 8;
722    let mut portion = SmallVec::with_capacity(num_portion);
723
724    for _ in 0..num_portion {
725      portion.push(NetworkEndian::read_f64(&src[offset..]));
726      offset += 8;
727    }
728
729    debug_assert_eq!(
730      offset, len,
731      "expect read {} bytes, but actual read {} bytes",
732      len, offset
733    );
734
735    Ok((
736      len,
737      Self {
738        portion,
739        error,
740        adjustment,
741        height,
742      },
743    ))
744  }
745}
746
/// Adds `vec2` onto `vec1` element by element, storing the sums in `vec1`.
/// Only the leading elements present in both slices are touched.
#[inline]
fn add_in_place(vec1: &mut [f64], vec2: &[f64]) {
  vec1
    .iter_mut()
    .zip(vec2)
    .for_each(|(dst, src)| *dst += *src);
}
753
754/// Returns difference between the vec1 and vec2. This assumes the
755/// dimensions have already been checked to be compatible.
756#[inline]
757fn diff(vec1: &[f64], vec2: &[f64]) -> SmallVec<[f64; DEFAULT_DIMENSIONALITY]> {
758  vec1.iter().zip(vec2).map(|(x, y)| x - y).collect()
759}
760
/// Lazily yields the element-wise difference `vec1 - vec2` without
/// allocating. The caller is expected to have checked that the dimensions
/// are compatible.
#[inline]
fn diff_in_place<'a>(vec1: &'a [f64], vec2: &'a [f64]) -> impl Iterator<Item = f64> + 'a {
  let lhs = vec1.iter().copied();
  let rhs = vec2.iter().copied();
  lhs.zip(rhs).map(|(a, b)| a - b)
}
767
/// Scales every element of `vec` by `factor` in place and hands the same
/// slice back so calls can be chained.
#[inline]
fn mul_in_place(vec: &mut [f64], factor: f64) -> &mut [f64] {
  vec.iter_mut().for_each(|x| *x *= factor);
  vec
}
776
/// Computes the Euclidean magnitude (square root of the sum of squares) of
/// the values yielded by `vec`.
#[inline]
fn magnitude_in_place(vec: impl Iterator<Item = f64>) -> f64 {
  let mut sum_of_squares = 0.0;
  for x in vec {
    sum_of_squares += x * x;
  }
  sum_of_squares.sqrt()
}
782
783/// Returns a unit vector pointing at vec1 from vec2. If the two
784/// positions are the same then a random unit vector is returned. We also return
785/// the distance between the points for use in the later height calculation.
786fn unit_vector_at(vec1: &[f64], vec2: &[f64]) -> (SmallVec<[f64; DEFAULT_DIMENSIONALITY]>, f64) {
787  let mut ret = diff(vec1, vec2);
788
789  let mag = magnitude_in_place(ret.iter().copied());
790  if mag > ZERO_THRESHOLD {
791    mul_in_place(&mut ret, mag.recip());
792    return (ret, mag);
793  }
794
795  for x in ret.iter_mut() {
796    *x = rand_f64() - 0.5;
797  }
798
799  let mag = magnitude_in_place(ret.iter().copied());
800  if mag > ZERO_THRESHOLD {
801    mul_in_place(&mut ret, mag.recip());
802    return (ret, 0.0);
803  }
804
805  // And finally just give up and make a unit vector along the first
806  // dimension. This should be exceedingly rare.
807  ret.fill(0.0);
808  ret[0] = 1.0;
809  (ret, 0.0)
810}
811
812fn rand_f64() -> f64 {
813  let mut rng = rand::thread_rng();
814  loop {
815    let f = (rng.gen_range::<u64, _>(0..(1u64 << 63u64)) as f64) / ((1u64 << 63u64) as f64);
816    if f == 1.0 {
817      continue;
818    }
819    return f;
820  }
821}
822
823#[cfg(test)]
824mod tests {
825  use smol_str::SmolStr;
826
827  use super::*;
828
829  fn verify_equal_floats(f1: f64, f2: f64) {
830    if (f1 - f2).abs() > ZERO_THRESHOLD {
831      panic!("Equal assertion fail, {:9.6} != {:9.6}", f1, f2);
832    }
833  }
834
835  fn verify_equal_vectors(vec1: &[f64], vec2: &[f64]) {
836    if vec1.len() != vec2.len() {
837      panic!("Vector length mismatch, {} != {}", vec1.len(), vec2.len());
838    }
839
840    for (v1, v2) in vec1.iter().zip(vec2.iter()) {
841      verify_equal_floats(*v1, *v2);
842    }
843  }
844
845  impl Coordinate {
846    fn random(size: usize) -> Self {
847      let mut portion = SmallVec::with_capacity(size);
848      for _ in 0..size {
849        portion.push(rand_f64());
850      }
851      Self {
852        portion,
853        error: rand_f64(),
854        adjustment: rand_f64(),
855        height: rand_f64(),
856      }
857    }
858  }
859
  // Round-trips random coordinates with 0 to 99 dimensions through `encode`
  // and each decode entry point (slice, sync reader, async reader), checking
  // both the reported length and the decoded value.
  #[tokio::test]
  async fn test_transform_encode_decode() {
    for i in 0..100 {
      let filter = Coordinate::random(i);
      let mut buf = vec![0; filter.encoded_len()];
      let encoded_len = filter.encode(&mut buf).unwrap();
      assert_eq!(encoded_len, filter.encoded_len());

      // Decode from an in-memory slice.
      let (decoded_len, decoded) = Coordinate::decode(&buf).unwrap();
      assert_eq!(decoded_len, encoded_len);
      assert_eq!(decoded, filter);

      // Decode from a blocking reader.
      let (decoded_len, decoded) =
        Coordinate::decode_from_reader(&mut std::io::Cursor::new(&buf)).unwrap();
      assert_eq!(decoded_len, encoded_len);
      assert_eq!(decoded, filter);

      // Decode from an async reader.
      let (decoded_len, decoded) =
        Coordinate::decode_from_async_reader(&mut futures::io::Cursor::new(&buf))
          .await
          .unwrap();
      assert_eq!(decoded_len, encoded_len);
      assert_eq!(decoded, filter);
    }
  }
885
  // Checks that an observation moves the client away from a node that is
  // farther than estimated, and that `set_coordinate` overrides the state.
  #[test]
  fn test_client_update() {
    let cfg = CoordinateOptions::default().with_dimensionality(3);

    let client = CoordinateClient::with_options(cfg.clone());

    // A fresh client starts at the origin.
    let c = client.get_coordinate();
    verify_equal_vectors(&c.portion, [0.0, 0.0, 0.0].as_slice());

    // Place a node right above the client and observe an RTT longer than the
    // client expects, given its distance.
    let mut other = Coordinate::with_options(cfg.clone());
    other.portion[2] = 0.001;

    let rtt = Duration::from_nanos((2.0 * other.portion[2] * 1.0e9) as u64);
    let mut c = client.update(&SmolStr::from("other"), &other, rtt).unwrap();

    // The client should have scooted down to get away from it.
    assert!(c.portion[2] < 0.0);

    // Set the coordinate to a known state.
    c.portion[2] = 99.0;
    client.set_coordinate(c.clone()).unwrap();
    let c = client.get_coordinate();
    verify_equal_floats(c.portion[2], 99.0);
  }
912
  // Checks that out-of-range round trip times are rejected and leave the
  // client's distance estimate untouched.
  #[test]
  fn test_client_invalid_in_ping_values() {
    let cfg = CoordinateOptions::default().with_dimensionality(3);

    let client = CoordinateClient::with_options(cfg.clone());

    // Place another node
    let mut other = Coordinate::with_options(cfg);
    other.portion[2] = 0.001;
    let dist = client.distance_to(&other);

    // Update with a series of invalid ping periods, should return an error and estimated rtt remains unchanged
    let pings = [9223372036854775807f64, -35f64, 11f64];
    for p in pings {
      client
        .update(
          &SmolStr::from("node"),
          &other,
          // Seconds are converted to nanoseconds with wrapping arithmetic and
          // reinterpreted as unsigned, so the overflowing and negative inputs
          // turn into very large durations; all three exceed the 10s limit.
          Duration::from_nanos((p as i64).wrapping_mul(SECONDS_TO_NANOSECONDS as i64) as u64),
        )
        .unwrap_err();

      let dist_new = client.distance_to(&other);
      assert_eq!(dist_new, dist);
    }
  }
939
940  #[test]
941  fn test_client_distance_to() {
942    let cfg = CoordinateOptions::default()
943      .with_dimensionality(3)
944      .with_height_min(0f64);
945
946    let client = CoordinateClient::<SmolStr>::with_options(cfg.clone());
947
948    // Fiddle a raw coordinate to put it a specific number of seconds away.
949    let mut other = Coordinate::with_options(cfg);
950    other.portion[2] = 12.345;
951    let expected = Duration::from_nanos((other.portion[2] * SECONDS_TO_NANOSECONDS) as u64);
952    let dist = client.distance_to(&other);
953    assert_eq!(dist, expected);
954  }
955
956  #[test]
957  fn test_client_latency_filter() {
958    let cfg = CoordinateOptions::default().with_latency_filter_size(3);
959
960    let client = CoordinateClient::with_options(cfg);
961
962    // Make sure we get the median, and that things age properly.
963    verify_equal_floats(
964      client
965        .inner
966        .write()
967        .latency_filter(&SmolStr::from("alice"), 0.201),
968      0.201,
969    );
970    verify_equal_floats(
971      client
972        .inner
973        .write()
974        .latency_filter(&SmolStr::from("alice"), 0.200),
975      0.201,
976    );
977    verify_equal_floats(
978      client
979        .inner
980        .write()
981        .latency_filter(&SmolStr::from("alice"), 0.207),
982      0.201,
983    );
984
985    // This glitch will get median-ed out and never seen by Vivaldi.
986    verify_equal_floats(
987      client
988        .inner
989        .write()
990        .latency_filter(&SmolStr::from("alice"), 1.9),
991      0.207,
992    );
993    verify_equal_floats(
994      client
995        .inner
996        .write()
997        .latency_filter(&SmolStr::from("alice"), 0.203),
998      0.207,
999    );
1000    verify_equal_floats(
1001      client
1002        .inner
1003        .write()
1004        .latency_filter(&SmolStr::from("alice"), 0.199),
1005      0.203,
1006    );
1007    verify_equal_floats(
1008      client
1009        .inner
1010        .write()
1011        .latency_filter(&SmolStr::from("alice"), 0.211),
1012      0.203,
1013    );
1014
1015    // Make sure different nodes are not coupled.
1016    verify_equal_floats(
1017      client
1018        .inner
1019        .write()
1020        .latency_filter(&SmolStr::from("bob"), 0.310),
1021      0.310,
1022    );
1023
1024    // Make sure we don't leak coordinates for nodes that leave.
1025    client.forget_node(&SmolStr::from("alice"));
1026    verify_equal_floats(
1027      client
1028        .inner
1029        .write()
1030        .latency_filter(&SmolStr::from("alice"), 0.888),
1031      0.888,
1032    );
1033  }
1034
1035  #[test]
1036  fn test_client_nan_defense() {
1037    let cfg = CoordinateOptions::default().with_dimensionality(3);
1038
1039    let client = CoordinateClient::with_options(cfg.clone());
1040
1041    // Block a bad coordinate from coming in.
1042    let mut other = Coordinate::with_options(cfg.clone());
1043    other.portion[0] = f64::NAN;
1044    assert!(!other.is_valid());
1045
1046    let rtt = Duration::from_millis(250);
1047    let c = client
1048      .update(&SmolStr::from("node"), &other, rtt)
1049      .unwrap_err();
1050    assert_eq!(c, CoordinateError::InvalidCoordinate);
1051    let c = client.get_coordinate();
1052    assert!(c.is_valid());
1053
1054    // Block setting an incompatible coordinate directly.
1055    other.portion.resize(other.portion.len() * 2, 0.0);
1056    let e = client.set_coordinate(other).unwrap_err();
1057    assert_eq!(e, CoordinateError::DimensionalityMismatch);
1058    let c = client.get_coordinate();
1059    assert!(c.is_valid());
1060
1061    // Poison the internal state and make sure we reset on an update.
1062    client.inner.write().coord.portion[0] = f64::NAN;
1063    let other = Coordinate::with_options(cfg);
1064    let c = client.update(&SmolStr::from("node"), &other, rtt).unwrap();
1065    assert!(c.is_valid());
1066    assert_eq!(client.stats().resets, 1);
1067  }
1068
1069  #[test]
1070  fn test_coordinate_new() {
1071    let opts = CoordinateOptions::default();
1072    let c = Coordinate::with_options(opts.clone());
1073    assert_eq!(opts.dimensionality, c.portion.len());
1074  }
1075
1076  #[test]
1077  fn test_coordinate_is_valid() {
1078    let c = Coordinate::new();
1079    let mut fields = vec![];
1080    for i in 0..c.portion.len() {
1081      fields.push(c.portion[i]);
1082    }
1083    fields.push(c.error);
1084    fields.push(c.adjustment);
1085    fields.push(c.height);
1086
1087    for field in fields.iter_mut() {
1088      assert!(c.is_valid());
1089      *field = f64::NAN;
1090    }
1091  }
1092
1093  #[test]
1094  fn test_coordinate_is_compatible_with() {
1095    let cfg = CoordinateOptions::default().with_dimensionality(3);
1096
1097    let c1 = Coordinate::with_options(cfg.clone());
1098    let c2 = Coordinate::with_options(cfg.clone());
1099    let cfg = cfg.with_dimensionality(2);
1100    let alien = Coordinate::with_options(cfg);
1101
1102    assert!(c1.is_compatible_with(&c2));
1103    assert!(!c1.is_compatible_with(&alien));
1104    assert!(c2.is_compatible_with(&c1));
1105    assert!(!c2.is_compatible_with(&alien));
1106  }
1107
1108  #[test]
1109  #[should_panic(expected = "coordinate dimensionality does not match")]
1110  fn test_coordinate_apply_force() {
1111    let cfg = CoordinateOptions::default()
1112      .with_dimensionality(3)
1113      .with_height_min(0f64);
1114
1115    let origin = Coordinate::with_options(cfg.clone());
1116
1117    // This proves that we normalize, get the direction right, and apply the
1118    // force multiplier correctly.
1119    let mut above = Coordinate::with_options(cfg.clone());
1120    above.portion[0] = 0.0;
1121    above.portion[1] = 0.0;
1122    above.portion[2] = 2.9;
1123    let c = origin.apply_force(cfg.height_min, 5.3, &above);
1124
1125    verify_equal_vectors(&c.portion, [0.0, 0.0, -5.3].as_slice());
1126
1127    // Scoot a point not starting at the origin to make sure there's nothing
1128    // special there.
1129    let mut right = Coordinate::with_options(cfg.clone());
1130    right.portion[0] = 3.4;
1131    right.portion[1] = 0.0;
1132    right.portion[2] = -5.3;
1133    let c = c.apply_force(cfg.height_min, 2.0, &right);
1134    verify_equal_vectors(&c.portion, [-2.0, 0.0, -5.3].as_slice());
1135
1136    // If the points are right on top of each other, then we should end up
1137    // in a random direction, one unit away. This makes sure the unit vector
1138    // build up doesn't divide by zero.
1139    let c = origin.apply_force(cfg.height_min, 1.0, &origin);
1140    verify_equal_floats(origin.distance_to(&c).as_secs_f64(), 1.0);
1141
1142    // Enable a minimum height and make sure that gets factored in properly.
1143    let cfg = cfg.with_height_min(10.0e-6);
1144    let origin = Coordinate::with_options(cfg.clone());
1145    let c = origin.apply_force(cfg.height_min, 5.3, &above);
1146    verify_equal_vectors(&c.portion, [0.0, 0.0, -5.3].as_slice());
1147    verify_equal_floats(c.height, cfg.height_min + 5.3 * cfg.height_min / 2.9);
1148
1149    // Make sure the height minimum is enforced.
1150    let c = origin.apply_force(cfg.height_min, -5.3, &above);
1151    verify_equal_vectors(&c.portion, [0.0, 0.0, 5.3].as_slice());
1152    verify_equal_floats(c.height, cfg.height_min);
1153
1154    // Shenanigans should get called if the dimensions don't match.
1155    let mut bad = c.clone();
1156    bad.portion = SmallVec::from_slice(&vec![0.0; bad.portion.len() + 1]);
1157    c.apply_force(cfg.height_min, 1.0, &bad);
1158  }
1159
1160  #[test]
1161  fn test_coordinate_add() {
1162    let mut vec1 = [1.0, -3.0, 3.0];
1163    let vec2 = [-4.0, 5.0, 6.0];
1164    add_in_place(&mut vec1, &vec2);
1165    verify_equal_vectors(&vec1, [-3.0, 2.0, 9.0].as_slice());
1166
1167    let zero = [0.0; 3];
1168    let mut vec1 = [1.0, -3.0, 3.0];
1169    add_in_place(&mut vec1, &zero);
1170    verify_equal_vectors(&[1.0, -3.0, 3.0], vec1.as_slice());
1171  }
1172
1173  #[test]
1174  fn test_coordinate_diff() {
1175    let vec1 = [1.0, -3.0, 3.0];
1176    let vec2 = [-4.0, 5.0, 6.0];
1177    verify_equal_vectors(diff(&vec1, &vec2).as_slice(), [5.0, -8.0, -3.0].as_slice());
1178
1179    let zero = [0.0; 3];
1180    verify_equal_vectors(diff(&vec1, &zero).as_slice(), vec1.as_slice());
1181  }
1182
1183  #[test]
1184  fn test_coordinate_diff_in_place() {
1185    let vec1 = [1.0, -3.0, 3.0];
1186    let vec2 = [-4.0, 5.0, 6.0];
1187    verify_equal_vectors(
1188      &diff_in_place(&vec1, &vec2).collect::<Vec<_>>(),
1189      [5.0, -8.0, -3.0].as_slice(),
1190    );
1191
1192    let zero = [0.0; 3];
1193    verify_equal_vectors(
1194      &diff_in_place(&vec1, &zero).collect::<Vec<_>>(),
1195      vec1.as_slice(),
1196    );
1197  }
1198
1199  #[test]
1200  fn test_coordinate_magnitude() {
1201    let zero = [0.0; 3];
1202    verify_equal_floats(magnitude_in_place(zero.into_iter()), 0.0);
1203
1204    let vec = [1.0, -2.0, 3.0];
1205    verify_equal_floats(magnitude_in_place(vec.into_iter()), 3.7416573867739413);
1206  }
1207
1208  #[test]
1209  fn test_coordinate_unit_vector_at() {
1210    let vec1 = [1.0, 2.0, 3.0];
1211    let vec2 = [0.5, 0.6, 0.7];
1212    let (u, mag) = unit_vector_at(&vec1, &vec2);
1213    verify_equal_vectors(
1214      &u,
1215      [0.18257418583505536, 0.511207720338155, 0.8398412548412546].as_slice(),
1216    );
1217    verify_equal_floats(magnitude_in_place(u.iter().copied()), 1.0);
1218    let vec1 = [1.0, 2.0, 3.0];
1219    verify_equal_floats(mag, magnitude_in_place(diff(&vec1, &vec2).into_iter()));
1220
1221    // If we give positions that are equal we should get a random unit vector
1222    // returned to us, rather than a divide by zero.
1223    let vec1 = [1.0, 2.0, 3.0];
1224    let (u, mag) = unit_vector_at(&vec1, &vec1);
1225    verify_equal_floats(mag, 0.0);
1226    verify_equal_floats(magnitude_in_place(u.iter().copied()), 1.0);
1227  }
1228}