1use std::{
2 collections::HashMap,
3 sync::atomic::{AtomicUsize, Ordering},
4 time::Duration,
5};
6
7use byteorder::{ByteOrder, NetworkEndian};
8use memberlist_core::CheapClone;
9use parking_lot::RwLock;
10use rand::Rng;
11use serf_types::Transformable;
12use smallvec::SmallVec;
13
/// Factor used to convert a distance expressed in seconds into nanoseconds.
const SECONDS_TO_NANOSECONDS: f64 = 1.0e9;
/// Vector magnitudes at or below this value are treated as zero when
/// normalizing vectors and when updating heights.
const ZERO_THRESHOLD: f64 = 1.0e-6;

/// Inline capacity of the `SmallVec` holding a coordinate's Euclidean portion;
/// also the default value of the `dimensionality` option.
const DEFAULT_DIMENSIONALITY: usize = 8;

/// Inline capacity (and initial length in `CoordinateClient::new`) of the
/// adjustment sample window.
const DEFAULT_ADJUSTMENT_WINDOW_SIZE: usize = 20;

/// Inline capacity of each per-node latency filter sample buffer.
const DEFAULT_LATENCY_FILTER_SAMPLES_SIZE: usize = 8;
27
/// Errors returned by the coordinate client when validating its inputs.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum CoordinateError {
  /// The two coordinates do not have the same number of dimensions.
  #[error("dimensions aren't compatible")]
  DimensionalityMismatch,
  /// The coordinate contains a component that is not finite.
  #[error("invalid coordinate")]
  InvalidCoordinate,
  /// The observed round trip time is larger than the maximum accepted by
  /// `CoordinateClient::update`; carries the rejected duration.
  #[error("round trip time not in valid range, duration {0:?} is not a value less than 10s")]
  InvalidRTT(Duration),
}
41
/// Tunable parameters of the coordinate system used by [`CoordinateClient`]
/// and [`Coordinate`].
#[viewit::viewit(
  vis_all = "pub(crate)",
  getters(style = "ref", vis_all = "pub"),
  setters(prefix = "with", vis_all = "pub")
)]
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CoordinateOptions {
  /// The dimensionality of the coordinate system, i.e. the number of
  /// components in the Euclidean portion of every coordinate.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the dimensionality of the coordinate system.")
    ),
    setter(attrs(doc = "Sets the dimensionality of the coordinate system."))
  )]
  dimensionality: usize,

  /// The default error value when a node hasn't yet made any observations.
  /// The client also clamps its error estimate to this value on every update.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the default error value when a node hasn't yet made any observations.")
    ),
    setter(attrs(
      doc = "Sets the default error value when a node hasn't yet made any observations."
    ))
  )]
  vivaldi_error_max: f64,

  /// The maximum impact an observation can have on a node's confidence.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the maximum impact an observation can have on a node's confidence.")
    ),
    setter(attrs(
      doc = "Sets the maximum impact an observation can have on a node's confidence."
    ))
  )]
  vivaldi_ce: f64,

  /// The maximum impact an observation can have on a node's coordinate.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the maximum impact an observation can have on a node's coordinate.")
    ),
    setter(attrs(
      doc = "Sets the maximum impact an observation can have on a node's coordinate."
    ))
  )]
  vivaldi_cc: f64,

  /// How many samples are retained to calculate the adjustment factor.
  /// A value of zero makes the client skip the adjustment update entirely.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns how many samples we retain to calculate the adjustment factor.")
    ),
    setter(attrs(doc = "Sets how many samples we retain to calculate the adjustment factor."))
  )]
  adjustment_window_size: usize,

  /// The minimum value of the height parameter; heights are clamped to it
  /// whenever a force changes them.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the minimum value of the height parameter.")
    ),
    setter(attrs(doc = "Sets the minimum value of the height parameter."))
  )]
  height_min: f64,

  /// The maximum number of latency samples that are retained per node.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns the maximum number of samples that are retained per node.")
    ),
    setter(attrs(doc = "Sets the maximum number of samples that are retained per node."))
  )]
  latency_filter_size: usize,

  /// How much gravity has an effect to try to re-center coordinates; the
  /// distance to the origin is divided by this value when computing the force.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(doc = "Returns how much gravity has an effect to try to re-center coordinates.")
    ),
    setter(attrs(doc = "Sets how much gravity has an effect to try to re-center coordinates."))
  )]
  gravity_rho: f64,

  /// The metric labels attached to the metrics emitted by the coordinate
  /// client (only present with the `metrics` feature).
  #[cfg(feature = "metrics")]
  #[viewit(
    getter(
      const,
      style = "ref",
      attrs(
        cfg(feature = "metrics"),
        doc = "The metric labels used to identify the metrics for this coordinate client."
      )
    ),
    setter(attrs(
      cfg(feature = "metrics"),
      doc = "Sets the metric labels used to identify the metrics for this coordinate client."
    ))
  )]
  metric_labels: std::sync::Arc<memberlist_core::types::MetricLabels>,
}
189
190impl Default for CoordinateOptions {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl CoordinateOptions {
197 #[inline]
200 pub fn new() -> Self {
201 Self {
202 dimensionality: DEFAULT_DIMENSIONALITY,
203 vivaldi_error_max: 1.5,
204 vivaldi_ce: 0.25,
205 vivaldi_cc: 0.25,
206 adjustment_window_size: 20,
207 height_min: 10.0e-6,
208 latency_filter_size: 3,
209 gravity_rho: 150.0,
210 #[cfg(feature = "metrics")]
211 metric_labels: std::sync::Arc::new(memberlist_core::types::MetricLabels::default()),
212 }
213 }
214}
215
/// Statistics reported by [`CoordinateClient::stats`].
#[viewit::viewit(setters(prefix = "with"))]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CoordinateClientStats {
  /// The number of times the local coordinate was reset because the
  /// calculations resulted in an invalid state.
  #[viewit(
    getter(
      const,
      style = "move",
      attrs(
        doc = "Returns the number of times we reset our local coordinate because our calculations have resulted in an invalid state."
      )
    ),
    setter(attrs(
      doc = "Sets the number of times we reset our local coordinate because our calculations have resulted in an invalid state."
    ))
  )]
  resets: usize,
}
237
238impl Default for CoordinateClientStats {
239 #[inline]
240 fn default() -> Self {
241 Self::new()
242 }
243}
244
245impl CoordinateClientStats {
246 #[inline]
247 const fn new() -> Self {
248 Self { resets: 0 }
249 }
250}
251
/// Mutable state of a [`CoordinateClient`], kept behind the client's lock.
struct CoordinateClientInner<I> {
  /// The current estimate of the client's own coordinate.
  coord: Coordinate,

  /// The coordinate the gravity update pulls `coord` toward.
  origin: Coordinate,

  /// The options the client was created with.
  opts: CoordinateOptions,

  /// Index of the slot in `adjustment_samples` that the next sample overwrites.
  adjustment_index: usize,

  /// Window of recent `rtt - raw distance` samples used to compute the
  /// coordinate's adjustment term.
  adjustment_samples: SmallVec<[f64; DEFAULT_ADJUSTMENT_WINDOW_SIZE]>,

  /// Recent round trip time samples (in seconds) per node, used by the
  /// median latency filter.
  latency_filter_samples: HashMap<I, SmallVec<[f64; DEFAULT_LATENCY_FILTER_SAMPLES_SIZE]>>,
}
274
275impl<I> CoordinateClientInner<I>
276where
277 I: CheapClone + Eq + core::hash::Hash,
278{
279 #[inline]
283 fn update_gravity(&mut self) {
284 let dist = self.origin.distance_to(&self.coord).as_secs();
285 let force = -1.0 * f64::powf((dist as f64) / self.opts.gravity_rho, 2.0);
286 self
287 .coord
288 .apply_force_in_place(self.opts.height_min, force, &self.origin);
289 }
290
291 #[inline]
292 fn latency_filter(&mut self, node: &I, rtt_seconds: f64) -> f64 {
293 let samples = self
294 .latency_filter_samples
295 .entry(node.cheap_clone())
296 .or_insert_with(|| SmallVec::with_capacity(self.opts.latency_filter_size));
297
298 samples.push(rtt_seconds);
300 if samples.len() > self.opts.latency_filter_size {
301 samples.remove(0);
302 }
303 let mut tmp = SmallVec::<[f64; DEFAULT_LATENCY_FILTER_SAMPLES_SIZE]>::from_slice(samples);
305 tmp.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
306 tmp[tmp.len() / 2]
307 }
308
309 fn update_vivaldi(&mut self, other: &Coordinate, mut rtt_seconds: f64) {
312 const ZERO_THRESHOLD: f64 = 1.0e-6;
313
314 let dist = self.coord.distance_to(other).as_secs_f64();
315 rtt_seconds = rtt_seconds.max(ZERO_THRESHOLD);
316
317 let wrongness = ((dist - rtt_seconds) / rtt_seconds).abs();
318
319 let total_error = (self.coord.error + other.error).max(ZERO_THRESHOLD);
320
321 let weight = self.coord.error / total_error;
322 self.coord.error = ((self.opts.vivaldi_ce * weight * wrongness)
323 + (self.coord.error * (1.0 - self.opts.vivaldi_ce * weight)))
324 .min(self.opts.vivaldi_error_max);
325
326 let force = self.opts.vivaldi_cc * weight * (rtt_seconds - dist);
327 self
328 .coord
329 .apply_force_in_place(self.opts.height_min, force, other);
330 }
331
332 fn update_adjustment(&mut self, other: &Coordinate, rtt_seconds: f64) {
335 if self.opts.adjustment_window_size == 0 {
336 return;
337 }
338 let dist = self.coord.raw_distance_to(other);
341 self.adjustment_samples[self.adjustment_index] = rtt_seconds - dist;
342 self.adjustment_index = (self.adjustment_index + 1) % self.opts.adjustment_window_size;
343
344 self.coord.adjustment =
345 self.adjustment_samples.iter().sum::<f64>() / (2.0 * self.opts.adjustment_window_size as f64);
346 }
347}
348
/// Maintains a local network coordinate and updates it from round trip time
/// observations of other nodes.
///
/// The mutable state lives behind an `RwLock`, so all methods take `&self`.
pub struct CoordinateClient<I> {
  /// The coordinate state, guarded by a read-write lock.
  inner: RwLock<CoordinateClientInner<I>>,
  /// Number of times the coordinate was reset after becoming invalid.
  stats: AtomicUsize,
}
361
362impl<I> Default for CoordinateClient<I> {
363 #[inline]
364 fn default() -> Self {
365 Self::new()
366 }
367}
368
369impl<I> CoordinateClient<I> {
370 #[inline]
372 pub fn new() -> Self {
373 Self {
374 inner: RwLock::new(CoordinateClientInner {
375 coord: Coordinate::new(),
376 origin: Coordinate::new(),
377 opts: CoordinateOptions::new(),
378 adjustment_index: 0,
379 adjustment_samples: SmallVec::from_slice(&[0.0; DEFAULT_ADJUSTMENT_WINDOW_SIZE]),
380 latency_filter_samples: HashMap::new(),
381 }),
382 stats: AtomicUsize::new(0),
383 }
384 }
385
386 #[inline]
388 pub fn with_options(opts: CoordinateOptions) -> Self {
389 let mut samples = SmallVec::with_capacity(opts.adjustment_window_size);
390 samples.resize(opts.adjustment_window_size, 0.0);
391 Self {
392 inner: RwLock::new(CoordinateClientInner {
393 coord: Coordinate::with_options(opts.clone()),
394 origin: Coordinate::with_options(opts.clone()),
395 opts,
396 adjustment_index: 0,
397 adjustment_samples: samples,
398 latency_filter_samples: HashMap::new(),
399 }),
400 stats: AtomicUsize::new(0),
401 }
402 }
403
404 #[inline]
406 pub fn get_coordinate(&self) -> Coordinate {
407 self.inner.read().coord.clone()
408 }
409
410 #[inline]
412 pub fn set_coordinate(&self, coord: Coordinate) -> Result<(), CoordinateError> {
413 let mut l = self.inner.write();
414 Self::check_coordinate(&l.coord, &coord).map(|_| l.coord = coord)
415 }
416
417 #[inline]
419 pub fn stats(&self) -> CoordinateClientStats {
420 CoordinateClientStats {
421 resets: self.stats.load(Ordering::Relaxed),
422 }
423 }
424
425 #[inline]
428 pub fn distance_to(&self, coord: &Coordinate) -> Duration {
429 self.inner.read().coord.distance_to(coord)
430 }
431
432 #[inline]
436 fn check_coordinate(this: &Coordinate, coord: &Coordinate) -> Result<(), CoordinateError> {
437 if !this.is_compatible_with(coord) {
438 return Err(CoordinateError::DimensionalityMismatch);
439 }
440
441 if !coord.is_valid() {
442 return Err(CoordinateError::InvalidCoordinate);
443 }
444
445 Ok(())
446 }
447}
448
449impl<I> CoordinateClient<I>
450where
451 I: CheapClone + Eq + core::hash::Hash,
452{
453 #[inline]
455 pub fn forget_node(&self, node: &I) {
456 self.inner.write().latency_filter_samples.remove(node);
457 }
458
459 pub fn update(
463 &self,
464 node: &I,
465 other: &Coordinate,
466 rtt: Duration,
467 ) -> Result<Coordinate, CoordinateError> {
468 let mut l = self.inner.write();
469 Self::check_coordinate(&l.coord, other)?;
470
471 const MAX_RTT: Duration = Duration::from_secs(10);
478
479 if rtt > MAX_RTT {
480 return Err(CoordinateError::InvalidRTT(rtt));
481 }
482
483 #[cfg(feature = "metrics")]
484 if rtt.is_zero() {
485 metrics::counter!("serf.coordinate.zero-rtt", l.opts.metric_labels.iter()).increment(1);
486 }
487
488 let rtt_seconds = l.latency_filter(node, rtt.as_secs_f64());
489 l.update_vivaldi(other, rtt_seconds);
490 l.update_adjustment(other, rtt_seconds);
491 l.update_gravity();
492
493 if !l.coord.is_valid() {
494 self.stats.fetch_add(1, Ordering::Acquire);
495 l.coord = Coordinate::with_options(l.opts.clone());
496 }
497
498 Ok(l.coord.clone())
499 }
500}
501
/// A network coordinate: a Euclidean position plus error, adjustment and
/// height terms, used to estimate round trip times between nodes.
#[viewit::viewit(getters(style = "move"), setters(prefix = "with"))]
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Coordinate {
  /// The Euclidean portion of the coordinate, in seconds.
  #[viewit(
    getter(
      const,
      style = "ref",
      attrs(doc = "Returns the Euclidean portion of the coordinate.")
    ),
    setter(attrs(doc = "Sets the Euclidean portion of the coordinate."))
  )]
  portion: SmallVec<[f64; DEFAULT_DIMENSIONALITY]>,
  /// The confidence in the given coordinate.
  #[viewit(
    getter(const, attrs(doc = "Returns the confidence in the given coordinate.")),
    setter(attrs(doc = "Sets the confidence in the given coordinate."))
  )]
  error: f64,
  /// A distance offset, in seconds, added to the raw distance when
  /// estimating a round trip time.
  #[viewit(
    getter(const, attrs(doc = "Returns the distance offset.")),
    setter(attrs(doc = "Sets the distance offset."))
  )]
  adjustment: f64,
  /// A distance offset, in seconds, that accounts for non-Euclidean effects;
  /// it is part of the raw distance between two coordinates.
  #[viewit(
    getter(
      const,
      attrs(doc = "Returns the distance offset that accounts for non-Euclidean effects.")
    ),
    setter(attrs(doc = "Sets the distance offset that accounts for non-Euclidean effects."))
  )]
  height: f64,
}
549
550impl Default for Coordinate {
551 #[inline]
552 fn default() -> Self {
553 Self::new()
554 }
555}
556
557impl Coordinate {
558 #[inline]
561 pub fn new() -> Self {
562 Self::with_options(CoordinateOptions::new())
563 }
564
565 #[inline]
568 pub fn with_options(opts: CoordinateOptions) -> Self {
569 let mut vec = SmallVec::with_capacity(opts.dimensionality);
570 vec.resize(opts.dimensionality, 0.0);
571 Self {
572 portion: vec,
573 error: opts.vivaldi_error_max,
574 adjustment: 0.0,
575 height: opts.height_min,
576 }
577 }
578
579 #[inline]
581 pub fn is_valid(&self) -> bool {
582 self.portion.iter().all(|&f| f.is_finite())
583 && self.error.is_finite()
584 && self.adjustment.is_finite()
585 && self.height.is_finite()
586 }
587
588 #[inline]
590 pub fn is_compatible_with(&self, other: &Self) -> bool {
591 self.portion.len() == other.portion.len()
592 }
593
594 pub fn apply_force(&self, height_min: f64, force: f64, other: &Self) -> Self {
597 assert!(
598 self.is_compatible_with(other),
599 "coordinate dimensionality does not match"
600 );
601
602 let mut ret = self.clone();
603 let (mut unit, mag) = unit_vector_at(&self.portion, &other.portion);
604 add_in_place(&mut ret.portion, mul_in_place(&mut unit, force));
605 if mag > ZERO_THRESHOLD {
606 ret.height = (ret.height + other.height) * force / mag + ret.height;
607 ret.height = ret.height.max(height_min);
608 }
609 ret
610 }
611
612 pub fn apply_force_in_place(&mut self, height_min: f64, force: f64, other: &Self) {
615 assert!(
616 self.is_compatible_with(other),
617 "coordinate dimensionality does not match"
618 );
619 let (mut unit, mag) = unit_vector_at(&self.portion, &other.portion);
620 add_in_place(&mut self.portion, mul_in_place(&mut unit, force));
621
622 if mag > ZERO_THRESHOLD {
623 self.height = (self.height + other.height) * force / mag + self.height;
624 self.height = self.height.max(height_min);
625 }
626 }
627
628 pub fn distance_to(&self, other: &Self) -> Duration {
631 assert!(
632 self.is_compatible_with(other),
633 "coordinate dimensionality does not match"
634 );
635
636 let dist = self.raw_distance_to(other);
637 let adjusted_dist = dist + self.adjustment + other.adjustment;
638 let dist = if adjusted_dist > 0.0 {
639 adjusted_dist
640 } else {
641 dist
642 };
643 Duration::from_nanos((dist * SECONDS_TO_NANOSECONDS) as u64)
644 }
645
646 #[inline]
647 fn raw_distance_to(&self, other: &Self) -> f64 {
648 magnitude_in_place(diff_in_place(&self.portion, &other.portion)) + self.height + other.height
649 }
650}
651
/// Errors returned when encoding or decoding a [`Coordinate`].
#[derive(Debug, thiserror::Error)]
pub enum CoordinateTransformError {
  /// The destination buffer is shorter than the encoded coordinate.
  #[error("encode buffer too small")]
  BufferTooSmall,
  /// The source buffer does not hold a complete encoded coordinate.
  #[error("not enough bytes to decode")]
  NotEnoughBytes,
}
662
663impl Transformable for Coordinate {
664 type Error = CoordinateTransformError;
665
666 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
667 let encoded_len = self.encoded_len();
668 if dst.len() < encoded_len {
669 return Err(Self::Error::BufferTooSmall);
670 }
671
672 let mut offset = 0;
673 NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
674 offset += 4;
675 NetworkEndian::write_f64(&mut dst[offset..], self.error);
676 offset += 8;
677 NetworkEndian::write_f64(&mut dst[offset..], self.adjustment);
678 offset += 8;
679 NetworkEndian::write_f64(&mut dst[offset..], self.height);
680 offset += 8;
681 for f in &self.portion {
682 NetworkEndian::write_f64(&mut dst[offset..], *f);
683 offset += 8;
684 }
685
686 debug_assert_eq!(
687 offset, encoded_len,
688 "expect write {} bytes, but actual write {} bytes",
689 encoded_len, offset
690 );
691
692 Ok(offset)
693 }
694
695 fn encoded_len(&self) -> usize {
696 4 + 8 * self.portion.len() + 8 * 3
697 }
698
699 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
700 where
701 Self: Sized,
702 {
703 let src_len = src.len();
704 if src_len < 4 + 3 * 8 {
705 return Err(Self::Error::NotEnoughBytes);
706 }
707
708 let len = NetworkEndian::read_u32(&src[0..4]) as usize;
709 if src_len < len {
710 return Err(Self::Error::NotEnoughBytes);
711 }
712
713 let mut offset = 4;
714 let error = NetworkEndian::read_f64(&src[offset..]);
715 offset += 8;
716 let adjustment = NetworkEndian::read_f64(&src[offset..]);
717 offset += 8;
718 let height = NetworkEndian::read_f64(&src[offset..]);
719 offset += 8;
720
721 let num_portion = (len - 4 - 3 * 8) / 8;
722 let mut portion = SmallVec::with_capacity(num_portion);
723
724 for _ in 0..num_portion {
725 portion.push(NetworkEndian::read_f64(&src[offset..]));
726 offset += 8;
727 }
728
729 debug_assert_eq!(
730 offset, len,
731 "expect read {} bytes, but actual read {} bytes",
732 len, offset
733 );
734
735 Ok((
736 len,
737 Self {
738 portion,
739 error,
740 adjustment,
741 height,
742 },
743 ))
744 }
745}
746
/// Adds `vec2` to `vec1` element-wise, storing the result in `vec1`.
///
/// Only the overlapping prefix is modified when the lengths differ.
#[inline]
fn add_in_place(vec1: &mut [f64], vec2: &[f64]) {
  vec1
    .iter_mut()
    .zip(vec2)
    .for_each(|(target, delta)| *target += *delta);
}
753
754#[inline]
757fn diff(vec1: &[f64], vec2: &[f64]) -> SmallVec<[f64; DEFAULT_DIMENSIONALITY]> {
758 vec1.iter().zip(vec2).map(|(x, y)| x - y).collect()
759}
760
/// Returns a lazy iterator over the element-wise difference `vec1 - vec2`,
/// without allocating.
///
/// The iterator stops at the end of the shorter input.
#[inline]
fn diff_in_place<'a>(vec1: &'a [f64], vec2: &'a [f64]) -> impl Iterator<Item = f64> + 'a {
  let pairs = vec1.iter().zip(vec2.iter());
  pairs.map(|(lhs, rhs)| *lhs - *rhs)
}
767
/// Multiplies every element of `vec` by `factor` in place and returns the
/// same slice so calls can be chained.
#[inline]
fn mul_in_place(vec: &mut [f64], factor: f64) -> &mut [f64] {
  vec.iter_mut().for_each(|component| *component *= factor);
  vec
}
776
/// Returns the Euclidean norm of the values produced by `vec`.
#[inline]
fn magnitude_in_place(vec: impl Iterator<Item = f64>) -> f64 {
  let mut sum_of_squares = 0.0_f64;
  for component in vec {
    sum_of_squares += component * component;
  }
  sum_of_squares.sqrt()
}
782
783fn unit_vector_at(vec1: &[f64], vec2: &[f64]) -> (SmallVec<[f64; DEFAULT_DIMENSIONALITY]>, f64) {
787 let mut ret = diff(vec1, vec2);
788
789 let mag = magnitude_in_place(ret.iter().copied());
790 if mag > ZERO_THRESHOLD {
791 mul_in_place(&mut ret, mag.recip());
792 return (ret, mag);
793 }
794
795 for x in ret.iter_mut() {
796 *x = rand_f64() - 0.5;
797 }
798
799 let mag = magnitude_in_place(ret.iter().copied());
800 if mag > ZERO_THRESHOLD {
801 mul_in_place(&mut ret, mag.recip());
802 return (ret, 0.0);
803 }
804
805 ret.fill(0.0);
808 ret[0] = 1.0;
809 (ret, 0.0)
810}
811
812fn rand_f64() -> f64 {
813 let mut rng = rand::thread_rng();
814 loop {
815 let f = (rng.gen_range::<u64, _>(0..(1u64 << 63u64)) as f64) / ((1u64 << 63u64) as f64);
816 if f == 1.0 {
817 continue;
818 }
819 return f;
820 }
821}
822
823#[cfg(test)]
824mod tests {
825 use smol_str::SmolStr;
826
827 use super::*;
828
829 fn verify_equal_floats(f1: f64, f2: f64) {
830 if (f1 - f2).abs() > ZERO_THRESHOLD {
831 panic!("Equal assertion fail, {:9.6} != {:9.6}", f1, f2);
832 }
833 }
834
835 fn verify_equal_vectors(vec1: &[f64], vec2: &[f64]) {
836 if vec1.len() != vec2.len() {
837 panic!("Vector length mismatch, {} != {}", vec1.len(), vec2.len());
838 }
839
840 for (v1, v2) in vec1.iter().zip(vec2.iter()) {
841 verify_equal_floats(*v1, *v2);
842 }
843 }
844
845 impl Coordinate {
846 fn random(size: usize) -> Self {
847 let mut portion = SmallVec::with_capacity(size);
848 for _ in 0..size {
849 portion.push(rand_f64());
850 }
851 Self {
852 portion,
853 error: rand_f64(),
854 adjustment: rand_f64(),
855 height: rand_f64(),
856 }
857 }
858 }
859
860 #[tokio::test]
861 async fn test_transform_encode_decode() {
862 for i in 0..100 {
863 let filter = Coordinate::random(i);
864 let mut buf = vec![0; filter.encoded_len()];
865 let encoded_len = filter.encode(&mut buf).unwrap();
866 assert_eq!(encoded_len, filter.encoded_len());
867
868 let (decoded_len, decoded) = Coordinate::decode(&buf).unwrap();
869 assert_eq!(decoded_len, encoded_len);
870 assert_eq!(decoded, filter);
871
872 let (decoded_len, decoded) =
873 Coordinate::decode_from_reader(&mut std::io::Cursor::new(&buf)).unwrap();
874 assert_eq!(decoded_len, encoded_len);
875 assert_eq!(decoded, filter);
876
877 let (decoded_len, decoded) =
878 Coordinate::decode_from_async_reader(&mut futures::io::Cursor::new(&buf))
879 .await
880 .unwrap();
881 assert_eq!(decoded_len, encoded_len);
882 assert_eq!(decoded, filter);
883 }
884 }
885
886 #[test]
887 fn test_client_update() {
888 let cfg = CoordinateOptions::default().with_dimensionality(3);
889
890 let client = CoordinateClient::with_options(cfg.clone());
891
892 let c = client.get_coordinate();
893 verify_equal_vectors(&c.portion, [0.0, 0.0, 0.0].as_slice());
894
895 let mut other = Coordinate::with_options(cfg.clone());
898 other.portion[2] = 0.001;
899
900 let rtt = Duration::from_nanos((2.0 * other.portion[2] * 1.0e9) as u64);
901 let mut c = client.update(&SmolStr::from("other"), &other, rtt).unwrap();
902
903 assert!(c.portion[2] < 0.0);
905
906 c.portion[2] = 99.0;
908 client.set_coordinate(c.clone()).unwrap();
909 let c = client.get_coordinate();
910 verify_equal_floats(c.portion[2], 99.0);
911 }
912
913 #[test]
914 fn test_client_invalid_in_ping_values() {
915 let cfg = CoordinateOptions::default().with_dimensionality(3);
916
917 let client = CoordinateClient::with_options(cfg.clone());
918
919 let mut other = Coordinate::with_options(cfg);
921 other.portion[2] = 0.001;
922 let dist = client.distance_to(&other);
923
924 let pings = [9223372036854775807f64, -35f64, 11f64];
926 for p in pings {
927 client
928 .update(
929 &SmolStr::from("node"),
930 &other,
931 Duration::from_nanos((p as i64).wrapping_mul(SECONDS_TO_NANOSECONDS as i64) as u64),
932 )
933 .unwrap_err();
934
935 let dist_new = client.distance_to(&other);
936 assert_eq!(dist_new, dist);
937 }
938 }
939
940 #[test]
941 fn test_client_distance_to() {
942 let cfg = CoordinateOptions::default()
943 .with_dimensionality(3)
944 .with_height_min(0f64);
945
946 let client = CoordinateClient::<SmolStr>::with_options(cfg.clone());
947
948 let mut other = Coordinate::with_options(cfg);
950 other.portion[2] = 12.345;
951 let expected = Duration::from_nanos((other.portion[2] * SECONDS_TO_NANOSECONDS) as u64);
952 let dist = client.distance_to(&other);
953 assert_eq!(dist, expected);
954 }
955
956 #[test]
957 fn test_client_latency_filter() {
958 let cfg = CoordinateOptions::default().with_latency_filter_size(3);
959
960 let client = CoordinateClient::with_options(cfg);
961
962 verify_equal_floats(
964 client
965 .inner
966 .write()
967 .latency_filter(&SmolStr::from("alice"), 0.201),
968 0.201,
969 );
970 verify_equal_floats(
971 client
972 .inner
973 .write()
974 .latency_filter(&SmolStr::from("alice"), 0.200),
975 0.201,
976 );
977 verify_equal_floats(
978 client
979 .inner
980 .write()
981 .latency_filter(&SmolStr::from("alice"), 0.207),
982 0.201,
983 );
984
985 verify_equal_floats(
987 client
988 .inner
989 .write()
990 .latency_filter(&SmolStr::from("alice"), 1.9),
991 0.207,
992 );
993 verify_equal_floats(
994 client
995 .inner
996 .write()
997 .latency_filter(&SmolStr::from("alice"), 0.203),
998 0.207,
999 );
1000 verify_equal_floats(
1001 client
1002 .inner
1003 .write()
1004 .latency_filter(&SmolStr::from("alice"), 0.199),
1005 0.203,
1006 );
1007 verify_equal_floats(
1008 client
1009 .inner
1010 .write()
1011 .latency_filter(&SmolStr::from("alice"), 0.211),
1012 0.203,
1013 );
1014
1015 verify_equal_floats(
1017 client
1018 .inner
1019 .write()
1020 .latency_filter(&SmolStr::from("bob"), 0.310),
1021 0.310,
1022 );
1023
1024 client.forget_node(&SmolStr::from("alice"));
1026 verify_equal_floats(
1027 client
1028 .inner
1029 .write()
1030 .latency_filter(&SmolStr::from("alice"), 0.888),
1031 0.888,
1032 );
1033 }
1034
1035 #[test]
1036 fn test_client_nan_defense() {
1037 let cfg = CoordinateOptions::default().with_dimensionality(3);
1038
1039 let client = CoordinateClient::with_options(cfg.clone());
1040
1041 let mut other = Coordinate::with_options(cfg.clone());
1043 other.portion[0] = f64::NAN;
1044 assert!(!other.is_valid());
1045
1046 let rtt = Duration::from_millis(250);
1047 let c = client
1048 .update(&SmolStr::from("node"), &other, rtt)
1049 .unwrap_err();
1050 assert_eq!(c, CoordinateError::InvalidCoordinate);
1051 let c = client.get_coordinate();
1052 assert!(c.is_valid());
1053
1054 other.portion.resize(other.portion.len() * 2, 0.0);
1056 let e = client.set_coordinate(other).unwrap_err();
1057 assert_eq!(e, CoordinateError::DimensionalityMismatch);
1058 let c = client.get_coordinate();
1059 assert!(c.is_valid());
1060
1061 client.inner.write().coord.portion[0] = f64::NAN;
1063 let other = Coordinate::with_options(cfg);
1064 let c = client.update(&SmolStr::from("node"), &other, rtt).unwrap();
1065 assert!(c.is_valid());
1066 assert_eq!(client.stats().resets, 1);
1067 }
1068
1069 #[test]
1070 fn test_coordinate_new() {
1071 let opts = CoordinateOptions::default();
1072 let c = Coordinate::with_options(opts.clone());
1073 assert_eq!(opts.dimensionality, c.portion.len());
1074 }
1075
1076 #[test]
1077 fn test_coordinate_is_valid() {
1078 let c = Coordinate::new();
1079 let mut fields = vec![];
1080 for i in 0..c.portion.len() {
1081 fields.push(c.portion[i]);
1082 }
1083 fields.push(c.error);
1084 fields.push(c.adjustment);
1085 fields.push(c.height);
1086
1087 for field in fields.iter_mut() {
1088 assert!(c.is_valid());
1089 *field = f64::NAN;
1090 }
1091 }
1092
1093 #[test]
1094 fn test_coordinate_is_compatible_with() {
1095 let cfg = CoordinateOptions::default().with_dimensionality(3);
1096
1097 let c1 = Coordinate::with_options(cfg.clone());
1098 let c2 = Coordinate::with_options(cfg.clone());
1099 let cfg = cfg.with_dimensionality(2);
1100 let alien = Coordinate::with_options(cfg);
1101
1102 assert!(c1.is_compatible_with(&c2));
1103 assert!(!c1.is_compatible_with(&alien));
1104 assert!(c2.is_compatible_with(&c1));
1105 assert!(!c2.is_compatible_with(&alien));
1106 }
1107
1108 #[test]
1109 #[should_panic(expected = "coordinate dimensionality does not match")]
1110 fn test_coordinate_apply_force() {
1111 let cfg = CoordinateOptions::default()
1112 .with_dimensionality(3)
1113 .with_height_min(0f64);
1114
1115 let origin = Coordinate::with_options(cfg.clone());
1116
1117 let mut above = Coordinate::with_options(cfg.clone());
1120 above.portion[0] = 0.0;
1121 above.portion[1] = 0.0;
1122 above.portion[2] = 2.9;
1123 let c = origin.apply_force(cfg.height_min, 5.3, &above);
1124
1125 verify_equal_vectors(&c.portion, [0.0, 0.0, -5.3].as_slice());
1126
1127 let mut right = Coordinate::with_options(cfg.clone());
1130 right.portion[0] = 3.4;
1131 right.portion[1] = 0.0;
1132 right.portion[2] = -5.3;
1133 let c = c.apply_force(cfg.height_min, 2.0, &right);
1134 verify_equal_vectors(&c.portion, [-2.0, 0.0, -5.3].as_slice());
1135
1136 let c = origin.apply_force(cfg.height_min, 1.0, &origin);
1140 verify_equal_floats(origin.distance_to(&c).as_secs_f64(), 1.0);
1141
1142 let cfg = cfg.with_height_min(10.0e-6);
1144 let origin = Coordinate::with_options(cfg.clone());
1145 let c = origin.apply_force(cfg.height_min, 5.3, &above);
1146 verify_equal_vectors(&c.portion, [0.0, 0.0, -5.3].as_slice());
1147 verify_equal_floats(c.height, cfg.height_min + 5.3 * cfg.height_min / 2.9);
1148
1149 let c = origin.apply_force(cfg.height_min, -5.3, &above);
1151 verify_equal_vectors(&c.portion, [0.0, 0.0, 5.3].as_slice());
1152 verify_equal_floats(c.height, cfg.height_min);
1153
1154 let mut bad = c.clone();
1156 bad.portion = SmallVec::from_slice(&vec![0.0; bad.portion.len() + 1]);
1157 c.apply_force(cfg.height_min, 1.0, &bad);
1158 }
1159
1160 #[test]
1161 fn test_coordinate_add() {
1162 let mut vec1 = [1.0, -3.0, 3.0];
1163 let vec2 = [-4.0, 5.0, 6.0];
1164 add_in_place(&mut vec1, &vec2);
1165 verify_equal_vectors(&vec1, [-3.0, 2.0, 9.0].as_slice());
1166
1167 let zero = [0.0; 3];
1168 let mut vec1 = [1.0, -3.0, 3.0];
1169 add_in_place(&mut vec1, &zero);
1170 verify_equal_vectors(&[1.0, -3.0, 3.0], vec1.as_slice());
1171 }
1172
1173 #[test]
1174 fn test_coordinate_diff() {
1175 let vec1 = [1.0, -3.0, 3.0];
1176 let vec2 = [-4.0, 5.0, 6.0];
1177 verify_equal_vectors(diff(&vec1, &vec2).as_slice(), [5.0, -8.0, -3.0].as_slice());
1178
1179 let zero = [0.0; 3];
1180 verify_equal_vectors(diff(&vec1, &zero).as_slice(), vec1.as_slice());
1181 }
1182
1183 #[test]
1184 fn test_coordinate_diff_in_place() {
1185 let vec1 = [1.0, -3.0, 3.0];
1186 let vec2 = [-4.0, 5.0, 6.0];
1187 verify_equal_vectors(
1188 &diff_in_place(&vec1, &vec2).collect::<Vec<_>>(),
1189 [5.0, -8.0, -3.0].as_slice(),
1190 );
1191
1192 let zero = [0.0; 3];
1193 verify_equal_vectors(
1194 &diff_in_place(&vec1, &zero).collect::<Vec<_>>(),
1195 vec1.as_slice(),
1196 );
1197 }
1198
1199 #[test]
1200 fn test_coordinate_magnitude() {
1201 let zero = [0.0; 3];
1202 verify_equal_floats(magnitude_in_place(zero.into_iter()), 0.0);
1203
1204 let vec = [1.0, -2.0, 3.0];
1205 verify_equal_floats(magnitude_in_place(vec.into_iter()), 3.7416573867739413);
1206 }
1207
1208 #[test]
1209 fn test_coordinate_unit_vector_at() {
1210 let vec1 = [1.0, 2.0, 3.0];
1211 let vec2 = [0.5, 0.6, 0.7];
1212 let (u, mag) = unit_vector_at(&vec1, &vec2);
1213 verify_equal_vectors(
1214 &u,
1215 [0.18257418583505536, 0.511207720338155, 0.8398412548412546].as_slice(),
1216 );
1217 verify_equal_floats(magnitude_in_place(u.iter().copied()), 1.0);
1218 let vec1 = [1.0, 2.0, 3.0];
1219 verify_equal_floats(mag, magnitude_in_place(diff(&vec1, &vec2).into_iter()));
1220
1221 let vec1 = [1.0, 2.0, 3.0];
1224 let (u, mag) = unit_vector_at(&vec1, &vec1);
1225 verify_equal_floats(mag, 0.0);
1226 verify_equal_floats(magnitude_in_place(u.iter().copied()), 1.0);
1227 }
1228}