1use std::collections::{BTreeMap, HashMap};
15use std::fmt;
16
/// 64-bit FNV-1a offset basis: the hash state before any byte is consumed.
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
/// 64-bit FNV prime, used as the per-byte multiplier.
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// Hashes `data` with 64-bit FNV-1a.
///
/// For every byte the running state is XOR-ed with the byte and then multiplied
/// by [`FNV_PRIME`] using wrapping arithmetic. An empty slice hashes to
/// [`FNV_OFFSET_BASIS`].
///
/// NOTE(review): the final step for the last byte is a multiply by a prime close
/// to 2^40. Two inputs that differ only in their last byte therefore hash to
/// values that differ by `k * FNV_PRIME` with `|k| <= 255`, which leaves the
/// high bits nearly unchanged. Callers that need well-spread high bits should
/// apply an extra mixing step on top.
fn fnv1a_64(data: &[u8]) -> u64 {
    data.iter().fold(FNV_OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
30
/// Identifier of a physical cache node.
///
/// Equality, ordering and hashing all follow the wrapped `u64`, so ids can key
/// both ordered and hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NodeId(pub u64);

impl fmt::Display for NodeId {
    /// Renders the id as `node:<n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let NodeId(raw) = *self;
        write!(f, "node:{}", raw)
    }
}
45
46#[derive(Debug, Clone)]
55pub struct ConsistentHash {
56 ring: BTreeMap<u64, NodeId>,
58 virtual_nodes_per_node: u32,
60}
61
62impl ConsistentHash {
63 pub fn new(virtual_nodes: u32) -> Self {
65 Self {
66 ring: BTreeMap::new(),
67 virtual_nodes_per_node: virtual_nodes.max(1),
68 }
69 }
70
71 pub fn add_node(&mut self, node_id: NodeId) {
74 for i in 0..self.virtual_nodes_per_node {
75 let label = format!("{node_id}_{i}");
76 let pos = fnv1a_64(label.as_bytes());
77 self.ring.insert(pos, node_id);
78 }
79 }
80
81 pub fn remove_node(&mut self, node_id: NodeId) {
83 let to_remove: Vec<u64> = self
85 .ring
86 .iter()
87 .filter_map(|(&pos, &nid)| if nid == node_id { Some(pos) } else { None })
88 .collect();
89 for pos in to_remove {
90 self.ring.remove(&pos);
91 }
92 }
93
94 pub fn get_node(&self, key: &[u8]) -> Option<NodeId> {
99 if self.ring.is_empty() {
100 return None;
101 }
102 let pos = fnv1a_64(key);
103 self.ring
105 .range(pos..)
106 .next()
107 .or_else(|| self.ring.iter().next())
108 .map(|(_, &nid)| nid)
109 }
110
111 pub fn get_n_nodes(&self, key: &[u8], n: usize) -> Vec<NodeId> {
117 if self.ring.is_empty() || n == 0 {
118 return Vec::new();
119 }
120 let pos = fnv1a_64(key);
121
122 let after = self.ring.range(pos..).map(|(_, nid)| *nid);
124 let before = self.ring.range(..pos).map(|(_, nid)| *nid);
125 let full_circle = after.chain(before);
126
127 let mut seen: Vec<NodeId> = Vec::with_capacity(n);
128 for node in full_circle {
129 if !seen.contains(&node) {
130 seen.push(node);
131 if seen.len() == n {
132 break;
133 }
134 }
135 }
136 seen
137 }
138
139 pub fn virtual_node_count(&self) -> usize {
141 self.ring.len()
142 }
143
144 pub fn real_node_count(&self) -> usize {
146 let mut nodes: Vec<NodeId> = self.ring.values().copied().collect();
147 nodes.sort_unstable();
148 nodes.dedup();
149 nodes.len()
150 }
151}
152
153#[derive(Debug, Clone)]
158pub struct DistributedCacheClient {
159 pub local_node: NodeId,
161 pub ring: ConsistentHash,
164}
165
166impl DistributedCacheClient {
167 pub fn new(local_node: NodeId, ring: ConsistentHash) -> Self {
169 Self { local_node, ring }
170 }
171
172 pub fn route_key(&self, key: &[u8]) -> NodeId {
177 self.ring.get_node(key).unwrap_or(self.local_node)
178 }
179
180 pub fn is_local_key(&self, key: &[u8]) -> bool {
183 self.route_key(key) == self.local_node
184 }
185}
186
/// Quorum sizes: how many replica responses a read or a write needs to succeed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationFactor {
    /// Minimum number of responses for a read to count as successful.
    pub reads: u8,
    /// Minimum number of acknowledgements for a write to count as successful.
    pub writes: u8,
}

impl ReplicationFactor {
    /// Builds a factor from explicit read and write quorum sizes.
    pub fn new(reads: u8, writes: u8) -> Self {
        ReplicationFactor { reads, writes }
    }

    /// Returns `true` once `responses` reaches the read quorum.
    pub fn is_quorum_read_met(&self, responses: u8) -> bool {
        self.reads <= responses
    }

    /// Returns `true` once `responses` reaches the write quorum.
    pub fn is_quorum_write_met(&self, responses: u8) -> bool {
        self.writes <= responses
    }

    /// Preset with read quorum 2 and write quorum 2 (the name suggests three replicas).
    pub fn rf3() -> Self {
        Self::new(2, 2)
    }

    /// Preset with read quorum 3 and write quorum 3 (the strictest choice for three replicas).
    pub fn rf3_strong() -> Self {
        Self::new(3, 3)
    }
}

impl Default for ReplicationFactor {
    /// Same as [`ReplicationFactor::rf3`].
    fn default() -> Self {
        Self::rf3()
    }
}
240
241#[derive(Debug)]
249pub struct CacheCoordinator {
250 pub clients: HashMap<NodeId, DistributedCacheClient>,
252 pub replication: ReplicationFactor,
254}
255
256impl CacheCoordinator {
257 pub fn new(replication: ReplicationFactor) -> Self {
259 Self {
260 clients: HashMap::new(),
261 replication,
262 }
263 }
264
265 pub fn add_client(&mut self, client: DistributedCacheClient) {
267 self.clients.insert(client.local_node, client);
268 }
269
270 pub fn remove_client(&mut self, node_id: NodeId) {
272 self.clients.remove(&node_id);
273 }
274
275 pub fn primary_node_for(&self, key: &[u8]) -> Option<NodeId> {
280 self.clients.values().next().map(|c| c.route_key(key))
281 }
282
283 pub fn replica_nodes_for(&self, key: &[u8], n: usize) -> Vec<NodeId> {
286 self.clients
287 .values()
288 .next()
289 .map(|c| c.ring.get_n_nodes(key, n))
290 .unwrap_or_default()
291 }
292
293 pub fn can_write_quorum(&self, key: &[u8], available_nodes: &[NodeId]) -> bool {
296 let replicas = self.replica_nodes_for(key, self.replication.writes as usize);
297 let ack_count = replicas
298 .iter()
299 .filter(|nid| available_nodes.contains(nid))
300 .count() as u8;
301 self.replication.is_quorum_write_met(ack_count)
302 }
303
304 pub fn can_read_quorum(&self, key: &[u8], available_nodes: &[NodeId]) -> bool {
307 let replicas = self.replica_nodes_for(key, self.replication.reads as usize);
308 let response_count = replicas
309 .iter()
310 .filter(|nid| available_nodes.contains(nid))
311 .count() as u8;
312 self.replication.is_quorum_read_met(response_count)
313 }
314
315 pub fn node_count(&self) -> usize {
317 self.clients.len()
318 }
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a ring with `vn` virtual nodes for each id in `ids`.
    fn make_ring_with_nodes(vn: u32, ids: &[u64]) -> ConsistentHash {
        let mut ring = ConsistentHash::new(vn);
        for &id in ids {
            ring.add_node(NodeId(id));
        }
        ring
    }

    #[test]
    fn test_node_id_display() {
        let nid = NodeId(42);
        assert_eq!(format!("{nid}"), "node:42");
    }

    #[test]
    fn test_empty_ring_get_node() {
        let ring = ConsistentHash::new(10);
        assert!(ring.get_node(b"any_key").is_none());
    }

    #[test]
    fn test_single_node_routing() {
        let ring = make_ring_with_nodes(20, &[1]);
        for key in [b"a".as_ref(), b"hello", b"oximedia"] {
            assert_eq!(ring.get_node(key), Some(NodeId(1)));
        }
    }

    #[test]
    fn test_two_nodes_split_keyspace() {
        let ring = make_ring_with_nodes(150, &[1, 2]);
        let mut counts = [0usize; 2];
        for i in 0u32..1000 {
            let key = i.to_le_bytes();
            match ring.get_node(&key) {
                Some(NodeId(1)) => counts[0] += 1,
                Some(NodeId(2)) => counts[1] += 1,
                _ => {}
            }
        }
        assert!(counts[0] > 100, "node 1 got too few keys: {}", counts[0]);
        assert!(counts[1] > 100, "node 2 got too few keys: {}", counts[1]);
    }

    #[test]
    fn test_virtual_node_count() {
        let ring = make_ring_with_nodes(50, &[1, 2, 3]);
        assert_eq!(ring.virtual_node_count(), 150);
    }

    #[test]
    fn test_real_node_count() {
        let ring = make_ring_with_nodes(20, &[10, 20, 30, 40]);
        assert_eq!(ring.real_node_count(), 4);
    }

    #[test]
    fn test_remove_node() {
        let mut ring = make_ring_with_nodes(10, &[1, 2]);
        ring.remove_node(NodeId(1));
        assert_eq!(ring.real_node_count(), 1);
        assert_eq!(ring.virtual_node_count(), 10);
        for i in 0u32..50 {
            assert_eq!(ring.get_node(&i.to_le_bytes()), Some(NodeId(2)));
        }
    }

    /// Re-adding a node must leave the ring exactly as it was after the first add.
    #[test]
    fn test_add_node_twice_does_not_double_positions() {
        let mut ring = ConsistentHash::new(10);
        ring.add_node(NodeId(7));
        let after_first = ring.virtual_node_count();
        ring.add_node(NodeId(7));
        assert!(after_first <= 10);
        assert_eq!(
            ring.virtual_node_count(),
            after_first,
            "re-adding a node changed the number of ring positions"
        );
        assert_eq!(ring.real_node_count(), 1);
    }

    #[test]
    fn test_get_n_nodes_distinct() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let nodes = ring.get_n_nodes(b"replicated_key", 3);
        assert_eq!(nodes.len(), 3);
        let unique: std::collections::HashSet<_> = nodes.iter().cloned().collect();
        assert_eq!(unique.len(), 3);
    }

    #[test]
    fn test_get_n_nodes_exceeds_real_count() {
        let ring = make_ring_with_nodes(50, &[1, 2]);
        let nodes = ring.get_n_nodes(b"key", 10);
        assert_eq!(nodes.len(), 2);
    }

    #[test]
    fn test_get_n_nodes_zero() {
        let ring = make_ring_with_nodes(50, &[1, 2, 3]);
        assert!(ring.get_n_nodes(b"key", 0).is_empty());
    }

    #[test]
    fn test_get_n_nodes_empty_ring() {
        let ring = ConsistentHash::new(10);
        assert!(ring.get_n_nodes(b"key", 3).is_empty());
    }

    /// The first replica must be the primary owner returned by `get_node`.
    #[test]
    fn test_get_n_nodes_first_matches_get_node() {
        let ring = make_ring_with_nodes(64, &[1, 2, 3, 4]);
        let keys: [&[u8]; 4] = [b"alpha", b"beta", b"gamma", b"delta"];
        for key in keys {
            let replicas = ring.get_n_nodes(key, 2);
            assert_eq!(replicas.first().copied(), ring.get_node(key));
        }
    }

    #[test]
    fn test_consistent_routing() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3, 4, 5]);
        for key in [b"video_001".as_ref(), b"audio_002", b"manifest"] {
            let first = ring.get_node(key);
            for _ in 0..10 {
                assert_eq!(ring.get_node(key), first, "routing is not deterministic");
            }
        }
    }

    #[test]
    fn test_distributed_cache_client_route() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let client = DistributedCacheClient::new(NodeId(1), ring);
        let routed = client.route_key(b"some_key");
        assert!(routed.0 >= 1 && routed.0 <= 3);
    }

    #[test]
    fn test_is_local_key_single_node() {
        let mut ring = ConsistentHash::new(50);
        ring.add_node(NodeId(99));
        let client = DistributedCacheClient::new(NodeId(99), ring);
        assert!(client.is_local_key(b"anything"));
    }

    /// With an empty ring, routing falls back to the client's own node.
    #[test]
    fn test_route_key_empty_ring_falls_back_to_local() {
        let client = DistributedCacheClient::new(NodeId(5), ConsistentHash::new(8));
        assert_eq!(client.route_key(b"orphan"), NodeId(5));
        assert!(client.is_local_key(b"orphan"));
    }

    #[test]
    fn test_replication_factor_read_quorum() {
        let rf = ReplicationFactor::new(2, 2);
        assert!(!rf.is_quorum_read_met(1));
        assert!(rf.is_quorum_read_met(2));
        assert!(rf.is_quorum_read_met(3));
    }

    #[test]
    fn test_replication_factor_write_quorum() {
        let rf = ReplicationFactor::new(2, 3);
        assert!(!rf.is_quorum_write_met(2));
        assert!(rf.is_quorum_write_met(3));
    }

    #[test]
    fn test_rf3_defaults() {
        let rf = ReplicationFactor::rf3();
        assert_eq!(rf.reads, 2);
        assert_eq!(rf.writes, 2);
    }

    #[test]
    fn test_rf3_strong_and_default() {
        assert_eq!(ReplicationFactor::rf3_strong(), ReplicationFactor::new(3, 3));
        assert_eq!(ReplicationFactor::default(), ReplicationFactor::rf3());
    }

    #[test]
    fn test_cache_coordinator_node_count() {
        let mut coord = CacheCoordinator::new(ReplicationFactor::rf3());
        let ring = make_ring_with_nodes(50, &[1, 2, 3]);
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        assert_eq!(coord.node_count(), 3);
        coord.remove_client(NodeId(2));
        assert_eq!(coord.node_count(), 2);
    }

    #[test]
    fn test_can_write_quorum_all_nodes_up() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::new(2, 2));
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        let all_nodes = vec![NodeId(1), NodeId(2), NodeId(3)];
        assert!(coord.can_write_quorum(b"key", &all_nodes));
    }

    #[test]
    fn test_can_write_quorum_insufficient() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::new(2, 3));
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        let partial = vec![NodeId(1)];
        assert!(!coord.can_write_quorum(b"key", &partial));
    }

    #[test]
    fn test_can_read_quorum() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::new(2, 2));
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        let all_nodes = [NodeId(1), NodeId(2), NodeId(3)];
        assert!(coord.can_read_quorum(b"key", &all_nodes));
        assert!(!coord.can_read_quorum(b"key", &[]));
    }

    #[test]
    fn test_primary_node_for() {
        let ring = make_ring_with_nodes(100, &[5, 6, 7]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::default());
        coord.add_client(DistributedCacheClient::new(NodeId(5), ring));
        let primary = coord.primary_node_for(b"video_segment");
        assert!(matches!(primary, Some(NodeId(5..=7))));
    }

    #[test]
    fn test_primary_node_for_empty() {
        let coord = CacheCoordinator::new(ReplicationFactor::default());
        assert!(coord.primary_node_for(b"key").is_none());
    }

    #[test]
    fn test_routing_consistency_after_removal() {
        let mut ring = make_ring_with_nodes(100, &[1, 2, 3, 4, 5]);
        let key = b"stable_key";
        let before = ring.get_node(key);
        let positions_before = ring.virtual_node_count();
        // Node 99 was never added, so removing it must not change anything.
        ring.remove_node(NodeId(99));
        let after = ring.get_node(key);
        assert_eq!(before, after, "routing changed when removing absent node");
        assert_eq!(ring.virtual_node_count(), positions_before);
        assert_eq!(ring.real_node_count(), 5);
    }

    #[test]
    fn test_uniform_distribution_three_nodes() {
        let ring = make_ring_with_nodes(200, &[1, 2, 3]);
        let mut counts: HashMap<u64, usize> = HashMap::new();
        for i in 0u32..3000 {
            let key = format!("key_{i}");
            if let Some(nid) = ring.get_node(key.as_bytes()) {
                *counts.entry(nid.0).or_insert(0) += 1;
            }
        }
        for (node, count) in &counts {
            assert!(
                *count > 300 && *count < 2400,
                "node {node} has unbalanced load: {count} / 3000"
            );
        }
    }
}