allsource_core/domain/value_objects/
partition_key.rs1use crate::error::{AllSourceError, Result};
2use serde::{Deserialize, Serialize};
3use std::fmt;
4use std::hash::{Hash, Hasher};
5
/// Identifies one partition out of a fixed number of partitions.
///
/// The value pairs a partition index with the partition count it was computed
/// against. Both constructors in this file (`from_entity_id_with_count` and
/// `from_partition_id`) only produce values where
/// `partition_id < partition_count`.
///
/// NOTE(review): the derived `Deserialize` impl fills the fields directly and
/// does not go through those constructors, so deserialized values are not
/// re-checked against that bound — confirm whether callers rely on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PartitionKey {
    /// Index of the partition, in `0..partition_count`.
    partition_id: u32,
    /// Total number of partitions the index was computed against.
    partition_count: u32,
}
20
21impl PartitionKey {
22 pub const DEFAULT_PARTITION_COUNT: u32 = 32;
24
25 pub fn from_entity_id(entity_id: &str) -> Self {
30 Self::from_entity_id_with_count(entity_id, Self::DEFAULT_PARTITION_COUNT)
31 }
32
33 pub fn from_entity_id_with_count(entity_id: &str, partition_count: u32) -> Self {
35 let mut hasher = std::collections::hash_map::DefaultHasher::new();
36 entity_id.hash(&mut hasher);
37 let hash = hasher.finish();
38 let partition_id = (hash % partition_count as u64) as u32;
39
40 Self {
41 partition_id,
42 partition_count,
43 }
44 }
45
46 pub fn from_partition_id(partition_id: u32, partition_count: u32) -> Result<Self> {
48 if partition_id >= partition_count {
49 return Err(AllSourceError::InvalidInput(format!(
50 "Partition ID {} exceeds partition count {}",
51 partition_id, partition_count
52 )));
53 }
54
55 Ok(Self {
56 partition_id,
57 partition_count,
58 })
59 }
60
61 pub fn partition_id(&self) -> u32 {
63 self.partition_id
64 }
65
66 pub fn partition_count(&self) -> u32 {
68 self.partition_count
69 }
70
71 pub fn belongs_to_node(&self, node_id: u32, total_nodes: u32) -> bool {
73 self.partition_id % total_nodes == node_id
74 }
75}
76
77impl fmt::Display for PartitionKey {
78 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79 write!(
80 f,
81 "partition-{}/{}",
82 self.partition_id, self.partition_count
83 )
84 }
85}
86
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same entity ID twice must produce equal keys.
    #[test]
    fn test_consistent_hashing() {
        let first = PartitionKey::from_entity_id("user-123");
        let second = PartitionKey::from_entity_id("user-123");

        assert_eq!(first, second, "Same entity must always map to same partition");
    }

    /// A hashed key never lands outside the default partition range.
    #[test]
    fn test_partition_range() {
        let partition_id = PartitionKey::from_entity_id("test").partition_id();
        assert!(partition_id < PartitionKey::DEFAULT_PARTITION_COUNT);
    }

    /// 1000 distinct entity IDs should spread roughly evenly over the default
    /// partitions (the bounds below are loose on purpose).
    #[test]
    fn test_distribution() {
        let mut per_partition = vec![0; PartitionKey::DEFAULT_PARTITION_COUNT as usize];

        let keys = (0..1000).map(|i| PartitionKey::from_entity_id(&format!("entity-{}", i)));
        for key in keys {
            per_partition[key.partition_id() as usize] += 1;
        }

        for (idx, &count) in per_partition.iter().enumerate() {
            assert!(count > 10, "Partition {} too few events: {}", idx, count);
            assert!(count < 60, "Partition {} too many events: {}", idx, count);
        }
    }

    /// Node ownership follows `partition_id % total_nodes`.
    #[test]
    fn test_node_assignment() {
        // 0 % 4 == 0
        let on_first_node = PartitionKey::from_partition_id(0, 32).unwrap();
        assert!(on_first_node.belongs_to_node(0, 4));

        // 5 % 4 == 1
        let on_second_node = PartitionKey::from_partition_id(5, 32).unwrap();
        assert!(on_second_node.belongs_to_node(1, 4));
    }

    /// A partition ID equal to the partition count is rejected.
    #[test]
    fn test_invalid_partition_id() {
        assert!(PartitionKey::from_partition_id(32, 32).is_err());
    }

    /// `Display` renders `partition-<id>/<count>`.
    #[test]
    fn test_display() {
        let rendered = PartitionKey::from_partition_id(5, 32).unwrap().to_string();
        assert_eq!(rendered, "partition-5/32");
    }
}