Skip to main content

oximedia_cache/
distributed_cache.rs

1//! Distributed cache coordination primitives.
2//!
3//! This module provides the building blocks for routing cache operations across
4//! a cluster of nodes:
5//!
6//! - [`NodeId`] — a lightweight opaque node identifier.
7//! - [`ConsistentHash`] — a virtual-node consistent-hash ring for stable key
8//!   routing as nodes join and leave.
9//! - [`DistributedCacheClient`] — per-node view with a routing helper.
10//! - [`ReplicationFactor`] — quorum read/write logic.
11//! - [`CacheCoordinator`] — cluster-level coordinator that ties it all
12//!   together.
13
14use std::collections::{BTreeMap, HashMap};
15use std::fmt;
16
17// ── FNV-1a (same scheme as bloom_filter but local to avoid cross-module dep) ──
18
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

/// 64-bit FNV-1a hash of `data`.
///
/// Starting from the FNV offset basis, every byte is XOR-ed into the state
/// *before* the multiplication by the FNV prime (the "1a" ordering). The
/// multiplication wraps modulo 2^64.
fn fnv1a_64(data: &[u8]) -> u64 {
    data.iter().fold(FNV_OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
30
31// ── NodeId ────────────────────────────────────────────────────────────────────
32
/// Opaque, `Copy`-able identifier of a node in the cache cluster.
///
/// The derived `Eq`/`Hash`/`Ord` impls let it serve as a key in both hash
/// and ordered maps; `Display` renders it as `node:<id>` for format strings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct NodeId(pub u64);

impl fmt::Display for NodeId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let NodeId(raw) = *self;
        f.write_fmt(format_args!("node:{raw}"))
    }
}
45
46// ── ConsistentHash ────────────────────────────────────────────────────────────
47
48/// Virtual-node consistent-hash ring.
49///
50/// Each real node is mapped to `virtual_nodes_per_node` positions on a
51/// `u64` hash ring using FNV-1a hashes of the string `"<node_id>_<i>"`.
52/// Key routing finds the first virtual-node position ≥ `hash(key)` on the
53/// ring (wrapping around).
54#[derive(Debug, Clone)]
55pub struct ConsistentHash {
56    /// `ring_position → NodeId` sorted map representing the virtual-node ring.
57    ring: BTreeMap<u64, NodeId>,
58    /// How many virtual nodes each real node occupies.
59    virtual_nodes_per_node: u32,
60}
61
62impl ConsistentHash {
63    /// Create an empty ring with the given number of virtual nodes per real node.
64    pub fn new(virtual_nodes: u32) -> Self {
65        Self {
66            ring: BTreeMap::new(),
67            virtual_nodes_per_node: virtual_nodes.max(1),
68        }
69    }
70
71    /// Add `node_id` to the ring by inserting `virtual_nodes_per_node` hash
72    /// positions derived from `"<node_id>_<i>"` for `i` in `0..virtual_nodes`.
73    pub fn add_node(&mut self, node_id: NodeId) {
74        for i in 0..self.virtual_nodes_per_node {
75            let label = format!("{node_id}_{i}");
76            let pos = fnv1a_64(label.as_bytes());
77            self.ring.insert(pos, node_id);
78        }
79    }
80
81    /// Remove all virtual nodes associated with `node_id`.
82    pub fn remove_node(&mut self, node_id: NodeId) {
83        // Collect positions to remove first to avoid borrow conflicts.
84        let to_remove: Vec<u64> = self
85            .ring
86            .iter()
87            .filter_map(|(&pos, &nid)| if nid == node_id { Some(pos) } else { None })
88            .collect();
89        for pos in to_remove {
90            self.ring.remove(&pos);
91        }
92    }
93
94    /// Route `key` to the first node whose ring position is ≥ `hash(key)`,
95    /// wrapping around to the lowest position if needed.
96    ///
97    /// Returns `None` when the ring is empty.
98    pub fn get_node(&self, key: &[u8]) -> Option<NodeId> {
99        if self.ring.is_empty() {
100            return None;
101        }
102        let pos = fnv1a_64(key);
103        // Try to find the first entry ≥ pos (successor).
104        self.ring
105            .range(pos..)
106            .next()
107            .or_else(|| self.ring.iter().next())
108            .map(|(_, &nid)| nid)
109    }
110
111    /// Return up to `n` distinct successor nodes for `key` (for replication).
112    ///
113    /// Starts at the primary successor and walks the ring clockwise, collecting
114    /// distinct `NodeId`s until `n` unique nodes are found or the ring is
115    /// exhausted.
116    pub fn get_n_nodes(&self, key: &[u8], n: usize) -> Vec<NodeId> {
117        if self.ring.is_empty() || n == 0 {
118            return Vec::new();
119        }
120        let pos = fnv1a_64(key);
121
122        // Build an iterator that walks the ring starting at `pos`, wrapping.
123        let after = self.ring.range(pos..).map(|(_, nid)| *nid);
124        let before = self.ring.range(..pos).map(|(_, nid)| *nid);
125        let full_circle = after.chain(before);
126
127        let mut seen: Vec<NodeId> = Vec::with_capacity(n);
128        for node in full_circle {
129            if !seen.contains(&node) {
130                seen.push(node);
131                if seen.len() == n {
132                    break;
133                }
134            }
135        }
136        seen
137    }
138
139    /// Return the number of virtual nodes currently in the ring.
140    pub fn virtual_node_count(&self) -> usize {
141        self.ring.len()
142    }
143
144    /// Return the number of distinct real nodes in the ring.
145    pub fn real_node_count(&self) -> usize {
146        let mut nodes: Vec<NodeId> = self.ring.values().copied().collect();
147        nodes.sort_unstable();
148        nodes.dedup();
149        nodes.len()
150    }
151}
152
153// ── DistributedCacheClient ────────────────────────────────────────────────────
154
155/// Per-node client that wraps a [`ConsistentHash`] ring and provides key
156/// routing from the perspective of `local_node`.
157#[derive(Debug, Clone)]
158pub struct DistributedCacheClient {
159    /// The node this client represents.
160    pub local_node: NodeId,
161    /// Shared ring (each client holds its own copy for isolation in this
162    /// in-process model; in a real system this would be a shared reference).
163    pub ring: ConsistentHash,
164}
165
166impl DistributedCacheClient {
167    /// Create a new client for `local_node` with the given ring.
168    pub fn new(local_node: NodeId, ring: ConsistentHash) -> Self {
169        Self { local_node, ring }
170    }
171
172    /// Determine which node should own `key`.
173    ///
174    /// Returns `local_node` when the ring is empty (degenerate single-node
175    /// mode).
176    pub fn route_key(&self, key: &[u8]) -> NodeId {
177        self.ring.get_node(key).unwrap_or(self.local_node)
178    }
179
180    /// Return `true` if `key` maps to `local_node` (i.e. this node is the
181    /// primary owner).
182    pub fn is_local_key(&self, key: &[u8]) -> bool {
183        self.route_key(key) == self.local_node
184    }
185}
186
187// ── ReplicationFactor ─────────────────────────────────────────────────────────
188
/// Quorum-based replication configuration.
///
/// A write quorum requires acknowledgement from at least `writes` nodes; a
/// read quorum requires responses from at least `reads` nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationFactor {
    /// Number of read responses required to satisfy a quorum.
    pub reads: u8,
    /// Number of write acknowledgements required to satisfy a quorum.
    pub writes: u8,
}

impl ReplicationFactor {
    /// Construct a new `ReplicationFactor`.
    pub fn new(reads: u8, writes: u8) -> Self {
        Self { reads, writes }
    }

    /// Return `true` if `responses` meets or exceeds the read quorum.
    pub fn is_quorum_read_met(&self, responses: u8) -> bool {
        responses >= self.reads
    }

    /// Return `true` if `responses` meets or exceeds the write quorum.
    pub fn is_quorum_write_met(&self, responses: u8) -> bool {
        responses >= self.writes
    }

    /// Return `true` if read and write quorums must overlap when each key is
    /// stored on `replicas` nodes, i.e. `R + W > N`.
    ///
    /// When quorums overlap, every successful read contacts at least one
    /// replica that acknowledged the latest successful write. The sum is
    /// computed in `u16` so `R + W` cannot overflow.
    pub fn is_strongly_consistent(&self, replicas: u8) -> bool {
        u16::from(self.reads) + u16::from(self.writes) > u16::from(replicas)
    }

    /// Convenience constructor for a standard RF-3 cluster (R=2, W=2).
    ///
    /// For N=3 replicas, R+W = 4 > 3, so read and write quorums already
    /// overlap (see [`Self::is_strongly_consistent`]).
    pub fn rf3() -> Self {
        Self {
            reads: 2,
            writes: 2,
        }
    }

    /// Convenience constructor for a read-all/write-all RF-3 cluster
    /// (R=3, W=3).
    ///
    /// [`Self::rf3`] already satisfies R+W > N for N=3. This preset goes
    /// further and requires all three replicas for every read and write, so
    /// quorum checks fail whenever fewer than three replicas respond.
    pub fn rf3_strong() -> Self {
        Self {
            reads: 3,
            writes: 3,
        }
    }
}

impl Default for ReplicationFactor {
    /// Defaults to [`ReplicationFactor::rf3`] (R=2, W=2).
    fn default() -> Self {
        Self::rf3()
    }
}
240
241// ── CacheCoordinator ──────────────────────────────────────────────────────────
242
243/// Cluster-level coordinator.
244///
245/// Tracks all node clients and the replication policy for the cluster.  In a
246/// real distributed system the coordinator would issue RPCs; here it simulates
247/// the routing and quorum decisions.
248#[derive(Debug)]
249pub struct CacheCoordinator {
250    /// Map from `NodeId` to per-node client.
251    pub clients: HashMap<NodeId, DistributedCacheClient>,
252    /// Cluster-wide replication factor.
253    pub replication: ReplicationFactor,
254}
255
256impl CacheCoordinator {
257    /// Create a new `CacheCoordinator` with the given replication factor.
258    pub fn new(replication: ReplicationFactor) -> Self {
259        Self {
260            clients: HashMap::new(),
261            replication,
262        }
263    }
264
265    /// Register a `DistributedCacheClient` for its `local_node`.
266    pub fn add_client(&mut self, client: DistributedCacheClient) {
267        self.clients.insert(client.local_node, client);
268    }
269
270    /// Remove the client (and node) identified by `node_id`.
271    pub fn remove_client(&mut self, node_id: NodeId) {
272        self.clients.remove(&node_id);
273    }
274
275    /// Determine the primary owner of `key` according to the first registered
276    /// client's ring.
277    ///
278    /// Returns `None` when no clients are registered.
279    pub fn primary_node_for(&self, key: &[u8]) -> Option<NodeId> {
280        self.clients.values().next().map(|c| c.route_key(key))
281    }
282
283    /// Return up to `n` replica nodes for `key` according to the first
284    /// registered client's ring.
285    pub fn replica_nodes_for(&self, key: &[u8], n: usize) -> Vec<NodeId> {
286        self.clients
287            .values()
288            .next()
289            .map(|c| c.ring.get_n_nodes(key, n))
290            .unwrap_or_default()
291    }
292
293    /// Simulate a write operation: determine the owner nodes for `key` and
294    /// check whether a quorum can be formed from `available_nodes`.
295    pub fn can_write_quorum(&self, key: &[u8], available_nodes: &[NodeId]) -> bool {
296        let replicas = self.replica_nodes_for(key, self.replication.writes as usize);
297        let ack_count = replicas
298            .iter()
299            .filter(|nid| available_nodes.contains(nid))
300            .count() as u8;
301        self.replication.is_quorum_write_met(ack_count)
302    }
303
304    /// Simulate a read operation: determine the owner nodes for `key` and
305    /// check whether a quorum can be formed from `available_nodes`.
306    pub fn can_read_quorum(&self, key: &[u8], available_nodes: &[NodeId]) -> bool {
307        let replicas = self.replica_nodes_for(key, self.replication.reads as usize);
308        let response_count = replicas
309            .iter()
310            .filter(|nid| available_nodes.contains(nid))
311            .count() as u8;
312        self.replication.is_quorum_read_met(response_count)
313    }
314
315    /// Return the number of registered clients/nodes.
316    pub fn node_count(&self) -> usize {
317        self.clients.len()
318    }
319}
320
321// ── Tests ─────────────────────────────────────────────────────────────────────
322
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a ring with `vn` virtual nodes per real node for each id in `ids`.
    fn make_ring_with_nodes(vn: u32, ids: &[u64]) -> ConsistentHash {
        let mut ring = ConsistentHash::new(vn);
        for &id in ids {
            ring.add_node(NodeId(id));
        }
        ring
    }

    // 1. NodeId display
    #[test]
    fn test_node_id_display() {
        let nid = NodeId(42);
        assert_eq!(format!("{nid}"), "node:42");
    }

    // 2. Empty ring returns None
    #[test]
    fn test_empty_ring_get_node() {
        let ring = ConsistentHash::new(10);
        assert!(ring.get_node(b"any_key").is_none());
    }

    // 3. Single node always wins
    #[test]
    fn test_single_node_routing() {
        let ring = make_ring_with_nodes(20, &[1]);
        for key in [b"a".as_ref(), b"hello", b"oximedia"] {
            assert_eq!(ring.get_node(key), Some(NodeId(1)));
        }
    }

    // 4. Adding two nodes splits the keyspace
    #[test]
    fn test_two_nodes_split_keyspace() {
        let ring = make_ring_with_nodes(150, &[1, 2]);
        let mut counts = [0usize; 2];
        for i in 0u32..1000 {
            let key = i.to_le_bytes();
            match ring.get_node(&key) {
                Some(NodeId(1)) => counts[0] += 1,
                Some(NodeId(2)) => counts[1] += 1,
                _ => {}
            }
        }
        // Each node should get a non-trivial fraction.
        assert!(counts[0] > 100, "node 1 got too few keys: {}", counts[0]);
        assert!(counts[1] > 100, "node 2 got too few keys: {}", counts[1]);
    }

    // 5. virtual_node_count matches expected
    #[test]
    fn test_virtual_node_count() {
        let ring = make_ring_with_nodes(50, &[1, 2, 3]);
        assert_eq!(ring.virtual_node_count(), 150);
    }

    // 6. real_node_count
    #[test]
    fn test_real_node_count() {
        let ring = make_ring_with_nodes(20, &[10, 20, 30, 40]);
        assert_eq!(ring.real_node_count(), 4);
    }

    // 7. remove_node shrinks the ring
    #[test]
    fn test_remove_node() {
        let mut ring = make_ring_with_nodes(10, &[1, 2]);
        ring.remove_node(NodeId(1));
        assert_eq!(ring.real_node_count(), 1);
        assert_eq!(ring.virtual_node_count(), 10);
        for i in 0u32..50 {
            assert_eq!(ring.get_node(&i.to_le_bytes()), Some(NodeId(2)));
        }
    }

    // 8. Stability: re-adding the same node does not double-add
    #[test]
    fn test_add_node_twice_does_not_double_positions() {
        let mut ring = ConsistentHash::new(10);
        ring.add_node(NodeId(7));
        let once = ring.virtual_node_count();
        // The second add hashes the same labels to the same positions, which
        // the BTreeMap keys deduplicate.
        ring.add_node(NodeId(7));
        assert_eq!(ring.virtual_node_count(), once);
        assert!(once <= 10);
    }

    // 9. get_n_nodes returns distinct nodes
    #[test]
    fn test_get_n_nodes_distinct() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let nodes = ring.get_n_nodes(b"replicated_key", 3);
        assert_eq!(nodes.len(), 3);
        let unique: std::collections::HashSet<_> = nodes.iter().copied().collect();
        assert_eq!(unique.len(), 3);
    }

    // 10. get_n_nodes with n > real nodes returns all real nodes
    #[test]
    fn test_get_n_nodes_exceeds_real_count() {
        let ring = make_ring_with_nodes(50, &[1, 2]);
        let nodes = ring.get_n_nodes(b"key", 10);
        // Only 2 real nodes exist.
        assert_eq!(nodes.len(), 2);
    }

    // 11. get_n_nodes with n=0 returns empty
    #[test]
    fn test_get_n_nodes_zero() {
        let ring = make_ring_with_nodes(50, &[1, 2, 3]);
        assert!(ring.get_n_nodes(b"key", 0).is_empty());
    }

    // 12. get_n_nodes on empty ring returns empty
    #[test]
    fn test_get_n_nodes_empty_ring() {
        let ring = ConsistentHash::new(10);
        assert!(ring.get_n_nodes(b"key", 3).is_empty());
    }

    // 13. Consistent routing: same key always maps to same node
    #[test]
    fn test_consistent_routing() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3, 4, 5]);
        for key in [b"video_001".as_ref(), b"audio_002", b"manifest"] {
            let first = ring.get_node(key);
            for _ in 0..10 {
                assert_eq!(ring.get_node(key), first, "routing is not deterministic");
            }
        }
    }

    // 14. DistributedCacheClient::route_key
    #[test]
    fn test_distributed_cache_client_route() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let client = DistributedCacheClient::new(NodeId(1), ring);
        // Must return a valid node, not panic.
        let routed = client.route_key(b"some_key");
        assert!(routed.0 >= 1 && routed.0 <= 3);
    }

    // 15. is_local_key when ring has only local node
    #[test]
    fn test_is_local_key_single_node() {
        let mut ring = ConsistentHash::new(50);
        ring.add_node(NodeId(99));
        let client = DistributedCacheClient::new(NodeId(99), ring);
        assert!(client.is_local_key(b"anything"));
    }

    // 16. ReplicationFactor quorum read
    #[test]
    fn test_replication_factor_read_quorum() {
        let rf = ReplicationFactor::new(2, 2);
        assert!(!rf.is_quorum_read_met(1));
        assert!(rf.is_quorum_read_met(2));
        assert!(rf.is_quorum_read_met(3));
    }

    // 17. ReplicationFactor quorum write
    #[test]
    fn test_replication_factor_write_quorum() {
        let rf = ReplicationFactor::new(2, 3);
        assert!(!rf.is_quorum_write_met(2));
        assert!(rf.is_quorum_write_met(3));
    }

    // 18. rf3 default quorum
    #[test]
    fn test_rf3_defaults() {
        let rf = ReplicationFactor::rf3();
        assert_eq!(rf.reads, 2);
        assert_eq!(rf.writes, 2);
    }

    // 19. CacheCoordinator add / remove clients
    #[test]
    fn test_cache_coordinator_node_count() {
        let mut coord = CacheCoordinator::new(ReplicationFactor::rf3());
        let ring = make_ring_with_nodes(50, &[1, 2, 3]);
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        assert_eq!(coord.node_count(), 3);
        coord.remove_client(NodeId(2));
        assert_eq!(coord.node_count(), 2);
    }

    // 20. CacheCoordinator can_write_quorum all available
    #[test]
    fn test_can_write_quorum_all_nodes_up() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::new(2, 2));
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        let all_nodes = vec![NodeId(1), NodeId(2), NodeId(3)];
        assert!(coord.can_write_quorum(b"key", &all_nodes));
    }

    // 21. CacheCoordinator can_write_quorum insufficient nodes
    #[test]
    fn test_can_write_quorum_insufficient() {
        let ring = make_ring_with_nodes(100, &[1, 2, 3]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::new(2, 3));
        for id in 1..=3u64 {
            coord.add_client(DistributedCacheClient::new(NodeId(id), ring.clone()));
        }
        // Only one node available, but W=3 acknowledgements are required.
        let partial = vec![NodeId(1)];
        assert!(!coord.can_write_quorum(b"key", &partial));
    }

    // 22. primary_node_for returns Some when nodes are registered
    #[test]
    fn test_primary_node_for() {
        let ring = make_ring_with_nodes(100, &[5, 6, 7]);
        let mut coord = CacheCoordinator::new(ReplicationFactor::default());
        coord.add_client(DistributedCacheClient::new(NodeId(5), ring));
        let primary = coord.primary_node_for(b"video_segment");
        assert!(primary.is_some());
    }

    // 23. primary_node_for returns None when no clients
    #[test]
    fn test_primary_node_for_empty() {
        let coord = CacheCoordinator::new(ReplicationFactor::default());
        assert!(coord.primary_node_for(b"key").is_none());
    }

    // 24. Removing a node that was never added does not change routing
    #[test]
    fn test_routing_consistency_after_removal() {
        let mut ring = make_ring_with_nodes(100, &[1, 2, 3, 4, 5]);
        let key = b"stable_key";
        let before = ring.get_node(key);
        ring.remove_node(NodeId(99)); // remove a node that was never added
        let after = ring.get_node(key);
        assert_eq!(before, after, "routing changed when removing absent node");
    }

    // 25. Keyspace distribution is roughly uniform across 3 nodes
    #[test]
    fn test_uniform_distribution_three_nodes() {
        let ring = make_ring_with_nodes(200, &[1, 2, 3]);
        let mut counts: HashMap<u64, usize> = HashMap::new();
        for i in 0u32..3000 {
            let key = format!("key_{i}");
            if let Some(nid) = ring.get_node(key.as_bytes()) {
                *counts.entry(nid.0).or_default() += 1;
            }
        }
        // Without this check, a node that received zero keys would never be
        // visited by the loop below and the test would still pass.
        assert_eq!(counts.len(), 3, "not every node received keys: {counts:?}");
        // Each node should receive between 10% and 80% of keys.
        for (node, count) in &counts {
            assert!(
                *count > 300 && *count < 2400,
                "node {node} has unbalanced load: {count} / 3000"
            );
        }
    }

    // 26. Removing a real node only remaps the keys that node owned
    #[test]
    fn test_remove_node_only_remaps_its_own_keys() {
        let full = make_ring_with_nodes(100, &[1, 2, 3, 4, 5]);
        let mut reduced = full.clone();
        reduced.remove_node(NodeId(3));
        for i in 0u32..500 {
            let key = format!("asset_{i}");
            let before = full.get_node(key.as_bytes());
            let after = reduced.get_node(key.as_bytes());
            assert_ne!(after, Some(NodeId(3)), "removed node still owns {key}");
            if before != Some(NodeId(3)) {
                assert_eq!(before, after, "key {key} moved although its owner stayed");
            }
        }
    }
}