allsource_core/domain/value_objects/
partition_key.rs1use crate::error::{AllSourceError, Result};
2use serde::{Deserialize, Serialize};
3use std::{
4 fmt,
5 hash::{Hash, Hasher},
6};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18pub struct PartitionKey {
19 partition_id: u32,
20 partition_count: u32,
21}
22
23impl PartitionKey {
24 pub const DEFAULT_PARTITION_COUNT: u32 = 32;
26
27 pub fn from_entity_id(entity_id: &str) -> Self {
32 Self::from_entity_id_with_count(entity_id, Self::DEFAULT_PARTITION_COUNT)
33 }
34
35 pub fn from_entity_id_with_count(entity_id: &str, partition_count: u32) -> Self {
37 let mut hasher = std::collections::hash_map::DefaultHasher::new();
38 entity_id.hash(&mut hasher);
39 let hash = hasher.finish();
40 let partition_id = (hash % u64::from(partition_count)) as u32;
41
42 Self {
43 partition_id,
44 partition_count,
45 }
46 }
47
48 pub fn from_partition_id(partition_id: u32, partition_count: u32) -> Result<Self> {
50 if partition_id >= partition_count {
51 return Err(AllSourceError::InvalidInput(format!(
52 "Partition ID {partition_id} exceeds partition count {partition_count}"
53 )));
54 }
55
56 Ok(Self {
57 partition_id,
58 partition_count,
59 })
60 }
61
62 pub fn partition_id(&self) -> u32 {
64 self.partition_id
65 }
66
67 pub fn partition_count(&self) -> u32 {
69 self.partition_count
70 }
71
72 pub fn belongs_to_node(&self, node_id: u32, total_nodes: u32) -> bool {
74 self.partition_id % total_nodes == node_id
75 }
76}
77
78impl fmt::Display for PartitionKey {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 write!(
81 f,
82 "partition-{}/{}",
83 self.partition_id, self.partition_count
84 )
85 }
86}
87
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same entity ID twice must produce equal keys.
    #[test]
    fn test_consistent_hashing() {
        let first = PartitionKey::from_entity_id("user-123");
        let second = PartitionKey::from_entity_id("user-123");

        assert_eq!(first, second, "Same entity must always map to same partition");
    }

    /// A key built with the default count stays inside the default range.
    #[test]
    fn test_partition_range() {
        let id = PartitionKey::from_entity_id("test").partition_id();
        assert!(id < PartitionKey::DEFAULT_PARTITION_COUNT);
    }

    /// 1000 distinct entity IDs should land in every default partition
    /// more than 10 and fewer than 60 times.
    #[test]
    fn test_distribution() {
        let mut partition_counts = vec![0; PartitionKey::DEFAULT_PARTITION_COUNT as usize];

        for key in (0..1000).map(|i| PartitionKey::from_entity_id(&format!("entity-{i}"))) {
            partition_counts[key.partition_id() as usize] += 1;
        }

        for (idx, &count) in partition_counts.iter().enumerate() {
            assert!(count > 10, "Partition {idx} too few events: {count}");
            assert!(count < 60, "Partition {idx} too many events: {count}");
        }
    }

    /// With 4 nodes, partition 0 belongs to node 0 and partition 5 to node 1.
    #[test]
    fn test_node_assignment() {
        // (partition id, node expected to own it when there are 4 nodes)
        for (partition, owner) in [(0, 0), (5, 1)] {
            let key = PartitionKey::from_partition_id(partition, 32).unwrap();
            assert!(key.belongs_to_node(owner, 4));
        }
    }

    /// An ID equal to the partition count is rejected.
    #[test]
    fn test_invalid_partition_id() {
        assert!(PartitionKey::from_partition_id(32, 32).is_err());
    }

    /// `Display` renders `partition-<id>/<count>`.
    #[test]
    fn test_display() {
        let rendered = PartitionKey::from_partition_id(5, 32).unwrap().to_string();
        assert_eq!(rendered, "partition-5/32");
    }
}