1use std::cmp::Ordering;
61use std::io::Write;
62use std::net::SocketAddr;
63
64use crc32fast::Hasher;
65#[cfg(feature = "v2")]
66use i_key_sort::sort::one_key_cmp::OneKeyAndCmpSort;
67
/// Number of ring points each unit of bucket weight contributes under
/// [`Version::V1`]. A `Version::V2` ring carries its own `point_multiple`.
pub const DEFAULT_POINT_MULTIPLE: u32 = 160;
71
/// An upstream address together with its relative weight on the hash ring.
///
/// The derived ordering compares `node` first, then `weight`.
#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub struct Bucket {
    /// Upstream address; its IP and port are hashed to place this bucket's points.
    node: SocketAddr,

    /// Relative weight. The bucket receives `weight * point_multiple` points on
    /// the ring, before points with colliding hashes are removed.
    weight: u32,
}
85
86impl Bucket {
87 pub fn new(node: SocketAddr, weight: u32) -> Self {
95 assert!(weight != 0, "weight must be at least one");
96
97 Bucket { node, weight }
98 }
99}
100
/// A V1 ring point: ring position `hash`, owned by the bucket at index `node`
/// in the continuum's address list.
///
/// Equality compares `hash` only, so it agrees with the hand-written [`Ord`]
/// impl, which orders points by hash alone. The standard library requires
/// `a == b` exactly when `a.cmp(&b)` is `Ordering::Equal`. A derived
/// `PartialEq` that also compared `node` would break that contract for
/// points whose hashes collide.
#[derive(Clone, Debug)]
struct PointV1 {
    node: u32,
    hash: u32,
}

impl PartialEq for PointV1 {
    fn eq(&self, other: &Self) -> bool {
        self.hash == other.hash
    }
}

impl Eq for PointV1 {}
108
109impl Ord for PointV1 {
111 fn cmp(&self, other: &Self) -> Ordering {
112 self.hash.cmp(&other.hash)
113 }
114}
115
116impl PartialOrd for PointV1 {
117 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
118 Some(self.cmp(other))
119 }
120}
121
122impl PointV1 {
123 fn new(node: u32, hash: u32) -> Self {
124 PointV1 { node, hash }
125 }
126}
127
#[cfg(feature = "v2")]
/// Compact V2 ring point stored as 6 raw bytes: bytes `0..4` hold the hash and
/// bytes `4..6` the bucket index, both in native-endian order (see
/// `PointV2::new`).
#[derive(Copy, Clone, Eq, PartialEq)]
#[repr(transparent)]
struct PointV2([u8; 6]);
144
#[cfg(feature = "v2")]
impl PointV2 {
    /// Packs `hash` into bytes `0..4` and `node` into bytes `4..6`, both
    /// native-endian.
    fn new(node: u16, hash: u32) -> Self {
        let [h0, h1, h2, h3] = hash.to_ne_bytes();
        let [n0, n1] = node.to_ne_bytes();
        PointV2([h0, h1, h2, h3, n0, n1])
    }

    /// Ring position of this point (bytes `0..4`).
    fn hash(&self) -> u32 {
        let [h0, h1, h2, h3, ..] = self.0;
        u32::from_ne_bytes([h0, h1, h2, h3])
    }

    /// Index of the owning bucket in the continuum's address list (bytes `4..6`).
    fn node(&self) -> u16 {
        let [.., n0, n1] = self.0;
        u16::from_ne_bytes([n0, n1])
    }
}
166
/// Selects the ring layout a [`Continuum`] is built with.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub enum Version {
    /// Default layout: [`DEFAULT_POINT_MULTIPLE`] points per unit of weight.
    /// Which of several points with the same hash survives deduplication is
    /// decided by `sort_unstable` order.
    #[default]
    V1,
    /// Packed 6-byte points with a caller-chosen number of points per unit of
    /// weight. Hash ties are broken by bucket address before deduplication.
    #[cfg(feature = "v2")]
    V2 { point_multiple: u32 },
}
174
175impl Version {
176 fn point_multiple(&self) -> u32 {
177 match self {
178 Version::V1 => DEFAULT_POINT_MULTIPLE,
179 #[cfg(feature = "v2")]
180 Version::V2 { point_multiple } => *point_multiple,
181 }
182 }
183}
184
/// Growable, not-yet-sorted set of ring points accumulated while a
/// [`Continuum`] is built. It is sorted and deduplicated, then converted into
/// a [`VersionedRing`].
enum RingBuilder {
    /// Points with a `u32` bucket index.
    V1(Vec<PointV1>),
    /// Packed points with a `u16` bucket index.
    #[cfg(feature = "v2")]
    V2(Vec<PointV2>),
}
190
191impl RingBuilder {
192 fn new(version: Version, total_weight: u32) -> Self {
193 match version {
194 Version::V1 => RingBuilder::V1(Vec::with_capacity(
195 (total_weight * DEFAULT_POINT_MULTIPLE) as usize,
196 )),
197 #[cfg(feature = "v2")]
198 Version::V2 { point_multiple } => {
199 RingBuilder::V2(Vec::with_capacity((total_weight * point_multiple) as usize))
200 }
201 }
202 }
203
204 fn push(&mut self, node: u16, hash: u32) {
205 match self {
206 RingBuilder::V1(ring) => {
207 ring.push(PointV1::new(node as u32, hash));
208 }
209 #[cfg(feature = "v2")]
210 RingBuilder::V2(ring) => {
211 ring.push(PointV2::new(node, hash));
212 }
213 }
214 }
215
216 #[allow(unused)]
217 fn sort(&mut self, addresses: &[SocketAddr]) {
218 match self {
219 RingBuilder::V1(ring) => {
220 ring.sort_unstable();
222 ring.dedup_by(|a, b| a.hash == b.hash);
223 }
224 #[cfg(feature = "v2")]
225 RingBuilder::V2(ring) => {
226 ring.sort_by_one_key_then_by(
227 true,
228 |p| p.hash(),
229 |p1, p2| addresses[p1.node() as usize].cmp(&addresses[p2.node() as usize]),
230 );
231
232 ring.dedup_by(|a, b| a.0[0..4] == b.0[0..4]);
234 }
235 }
236 }
237}
238
239impl From<RingBuilder> for VersionedRing {
240 fn from(ring: RingBuilder) -> Self {
241 match ring {
242 RingBuilder::V1(ring) => VersionedRing::V1(ring.into_boxed_slice()),
243 #[cfg(feature = "v2")]
244 RingBuilder::V2(ring) => VersionedRing::V2(ring.into_boxed_slice()),
245 }
246 }
247}
248
/// Frozen ring of points searched at lookup time. [`Continuum`] builds it from
/// a [`RingBuilder`] that has already been sorted by hash and deduplicated.
enum VersionedRing {
    V1(Box<[PointV1]>),
    #[cfg(feature = "v2")]
    V2(Box<[PointV2]>),
}
254
255impl VersionedRing {
256 pub fn node_idx(&self, hash: u32) -> usize {
258 let search_result = match self {
261 VersionedRing::V1(ring) => ring.binary_search_by(|p| p.hash.cmp(&hash)),
262 #[cfg(feature = "v2")]
263 VersionedRing::V2(ring) => ring.binary_search_by(|p| p.hash().cmp(&hash)),
264 };
265
266 match search_result {
267 Ok(i) => i,
268 Err(i) => {
269 if i == self.len() {
272 0
273 } else {
274 i
275 }
276 }
277 }
278 }
279
280 pub fn get(&self, index: usize) -> Option<usize> {
281 match self {
282 VersionedRing::V1(ring) => ring.get(index).map(|p| p.node as usize),
283 #[cfg(feature = "v2")]
284 VersionedRing::V2(ring) => ring.get(index).map(|p| p.node() as usize),
285 }
286 }
287
288 pub fn len(&self) -> usize {
289 match self {
290 VersionedRing::V1(ring) => ring.len(),
291 #[cfg(feature = "v2")]
292 VersionedRing::V2(ring) => ring.len(),
293 }
294 }
295}
296
/// A consistent-hash ring that maps keys to bucket addresses.
pub struct Continuum {
    /// Ring points sorted by hash. Each point stores an index into `addrs`.
    ring: VersionedRing,
    /// Bucket addresses, in the order the buckets were supplied.
    addrs: Box<[SocketAddr]>,
}
305
306impl Continuum {
307 pub fn new(buckets: &[Bucket]) -> Self {
308 Self::new_with_version(buckets, Version::default())
309 }
310
311 pub fn new_with_version(buckets: &[Bucket], version: Version) -> Self {
313 if buckets.is_empty() {
314 return Continuum {
315 ring: VersionedRing::V1(Box::new([])),
316 addrs: Box::new([]),
317 };
318 }
319
320 let total_weight: u32 = buckets.iter().fold(0, |sum, b| sum + b.weight);
322 let mut ring = RingBuilder::new(version, total_weight);
323 let mut addrs = Vec::with_capacity(buckets.len());
324
325 for bucket in buckets {
326 let mut hasher = Hasher::new();
327
328 let mut hash_bytes = Vec::with_capacity(39 + 1 + 5);
337 write!(&mut hash_bytes, "{}", bucket.node.ip()).unwrap();
338 write!(&mut hash_bytes, "\0").unwrap();
339 write!(&mut hash_bytes, "{}", bucket.node.port()).unwrap();
340 hasher.update(hash_bytes.as_ref());
341
342 let num_points = bucket.weight * version.point_multiple();
344
345 let mut prev_hash: u32 = 0;
347 addrs.push(bucket.node);
348 let node = addrs.len() - 1;
349 for _ in 0..num_points {
350 let mut hasher = hasher.clone();
351 hasher.update(&prev_hash.to_le_bytes());
352
353 let hash = hasher.finalize();
354 ring.push(node as u16, hash);
355 prev_hash = hash;
356 }
357 }
358
359 let addrs = addrs.into_boxed_slice();
360
361 ring.sort(&addrs);
363
364 Continuum {
365 ring: ring.into(),
366 addrs,
367 }
368 }
369
370 pub fn node_idx(&self, input: &[u8]) -> usize {
372 let hash = crc32fast::hash(input);
373 self.ring.node_idx(hash)
374 }
375
376 pub fn node(&self, hash_key: &[u8]) -> Option<SocketAddr> {
378 self.ring
379 .get(self.node_idx(hash_key)) .map(|n| self.addrs[n])
381 }
382
383 pub fn node_iter(&self, hash_key: &[u8]) -> NodeIterator<'_> {
388 NodeIterator {
389 idx: self.node_idx(hash_key),
390 continuum: self,
391 }
392 }
393
394 pub fn get_addr(&self, idx: &mut usize) -> Option<&SocketAddr> {
395 let point = self.ring.get(*idx);
396 if point.is_some() {
397 *idx = (*idx + 1) % self.ring.len();
399 }
400 point.map(|n| &self.addrs[n])
401 }
402}
403
/// Iterator returned by [`Continuum::node_iter`]. It walks the ring from the
/// key's position, wrapping around at the end.
pub struct NodeIterator<'a> {
    /// Next ring index to yield.
    idx: usize,
    /// Continuum being walked.
    continuum: &'a Continuum,
}
409
410impl<'a> Iterator for NodeIterator<'a> {
411 type Item = &'a SocketAddr;
412
413 fn next(&mut self) -> Option<Self::Item> {
414 self.continuum.get_addr(&mut self.idx)
415 }
416}
417
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::path::Path;

    use super::{Bucket, Continuum};

    // Test helper: parses "ip:port" text into a SocketAddr, panicking on bad input.
    fn get_sockaddr(ip: &str) -> SocketAddr {
        ip.parse().unwrap()
    }

    // Growing the ring from 10 to 11 hosts must leave keys "a" and "b" on the
    // same hosts.
    #[test]
    fn consistency_after_adding_host() {
        fn assert_hosts(c: &Continuum) {
            assert_eq!(c.node(b"a"), Some(get_sockaddr("127.0.0.10:6443")));
            assert_eq!(c.node(b"b"), Some(get_sockaddr("127.0.0.5:6443")));
        }

        // Hosts 127.0.0.1 through 127.0.0.10.
        let buckets: Vec<_> = (1..11)
            .map(|u| Bucket::new(get_sockaddr(&format!("127.0.0.{u}:6443")), 1))
            .collect();
        let c = Continuum::new(&buckets);
        assert_hosts(&c);

        // The same hosts plus 127.0.0.11.
        let buckets: Vec<_> = (1..12)
            .map(|u| Bucket::new(get_sockaddr(&format!("127.0.0.{u}:6443")), 1))
            .collect();

        let c = Continuum::new(&buckets);
        assert_hosts(&c);
    }

    // Pins the host chosen for several paths with two equally weighted upstreams.
    // Per the test name, the expected values come from an nginx sample.
    #[test]
    fn matches_nginx_sample() {
        let upstream_hosts = ["127.0.0.1:7777", "127.0.0.1:7778"];
        let upstream_hosts = upstream_hosts.iter().map(|i| get_sockaddr(i));

        let mut buckets = Vec::new();
        for upstream in upstream_hosts {
            buckets.push(Bucket::new(upstream, 1));
        }

        let c = Continuum::new(&buckets);

        assert_eq!(c.node(b"/some/path"), Some(get_sockaddr("127.0.0.1:7778")));
        assert_eq!(
            c.node(b"/some/longer/path"),
            Some(get_sockaddr("127.0.0.1:7777"))
        );
        assert_eq!(
            c.node(b"/sad/zaidoon"),
            Some(get_sockaddr("127.0.0.1:7778"))
        );
        assert_eq!(c.node(b"/g"), Some(get_sockaddr("127.0.0.1:7777")));
        assert_eq!(
            c.node(b"/pingora/team/is/cool/and/this/is/a/long/uri"),
            Some(get_sockaddr("127.0.0.1:7778"))
        );
        assert_eq!(
            c.node(b"/i/am/not/confident/in/this/code"),
            Some(get_sockaddr("127.0.0.1:7777"))
        );
    }

    // Checks every (uri, upstream) row of test-data/sample-nginx-upstream.csv
    // (no header row) against a continuum of nine upstreams with weight 100.
    #[test]
    fn matches_nginx_sample_data() {
        let upstream_hosts = [
            "10.0.0.1:443",
            "10.0.0.2:443",
            "10.0.0.3:443",
            "10.0.0.4:443",
            "10.0.0.5:443",
            "10.0.0.6:443",
            "10.0.0.7:443",
            "10.0.0.8:443",
            "10.0.0.9:443",
        ];
        let upstream_hosts = upstream_hosts.iter().map(|i| get_sockaddr(i));

        let mut buckets = Vec::new();
        for upstream in upstream_hosts {
            buckets.push(Bucket::new(upstream, 100));
        }

        let c = Continuum::new(&buckets);

        let path = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("test-data")
            .join("sample-nginx-upstream.csv");

        let mut rdr = csv::ReaderBuilder::new()
            .has_headers(false)
            .from_path(path)
            .unwrap();

        for pair in rdr.records() {
            let pair = pair.unwrap();
            // Column 0: request URI; column 1: expected upstream address.
            let uri = pair.get(0).unwrap();
            let upstream = pair.get(1).unwrap();

            let got = c.node(uri.as_bytes()).unwrap();
            assert_eq!(got, get_sockaddr(upstream));
        }
    }

    // The iterator visits successive ring points, so the same host can appear
    // several times in a row.
    #[test]
    fn node_iter() {
        let upstream_hosts = ["127.0.0.1:7777", "127.0.0.1:7778", "127.0.0.1:7779"];
        let upstream_hosts = upstream_hosts.iter().map(|i| get_sockaddr(i));

        let mut buckets = Vec::new();
        for upstream in upstream_hosts {
            buckets.push(Bucket::new(upstream, 1));
        }

        let c = Continuum::new(&buckets);
        let mut iter = c.node_iter(b"doghash");
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));

        // Remove 127.0.0.1:7778 and walk the ring again from the same key.
        let upstream_hosts = ["127.0.0.1:7777", "127.0.0.1:7779"];
        let upstream_hosts = upstream_hosts.iter().map(|i| get_sockaddr(i));

        let mut buckets = Vec::new();
        for upstream in upstream_hosts {
            buckets.push(Bucket::new(upstream, 1));
        }

        let c = Continuum::new(&buckets);
        let mut iter = c.node_iter(b"doghash");
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("127.0.0.1:7779")));

        // With a single bucket, one full lap of the ring returns the cursor to
        // its starting index.
        let c = Continuum::new(&[Bucket::new(get_sockaddr("127.0.0.1:7777"), 1)]);
        let mut iter = c.node_iter(b"doghash");

        let start_idx = iter.idx;
        for _ in 0..c.ring.len() {
            assert!(iter.next().is_some());
        }
        assert_eq!(start_idx, iter.idx);
    }

    // An empty continuum has no node for any key, and its iterator keeps
    // returning None.
    #[test]
    fn test_empty() {
        let c = Continuum::new(&[]);
        assert!(c.node(b"doghash").is_none());

        let mut iter = c.node_iter(b"doghash");
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());
        assert!(iter.next().is_none());
    }

    // IPv6 upstreams are hashed and iterated like IPv4 ones.
    #[test]
    fn test_ipv6_ring() {
        let upstream_hosts = ["[::1]:7777", "[::1]:7778", "[::1]:7779"];
        let upstream_hosts = upstream_hosts.iter().map(|i| get_sockaddr(i));

        let mut buckets = Vec::new();
        for upstream in upstream_hosts {
            buckets.push(Bucket::new(upstream, 1));
        }

        let c = Continuum::new(&buckets);
        let mut iter = c.node_iter(b"doghash");
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7778")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7777")));
        assert_eq!(iter.next(), Some(&get_sockaddr("[::1]:7779")));
    }
}