allsource_core/domain/value_objects/
partition_key.rs1use crate::error::{AllSourceError, Result};
2use serde::{Deserialize, Serialize};
3use std::{
4 fmt,
5 hash::{Hash, Hasher},
6};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18pub struct PartitionKey {
19 partition_id: u32,
20 partition_count: u32,
21}
22
23impl PartitionKey {
24 pub const DEFAULT_PARTITION_COUNT: u32 = 32;
26
27 pub fn from_entity_id(entity_id: &str) -> Self {
32 Self::from_entity_id_with_count(entity_id, Self::DEFAULT_PARTITION_COUNT)
33 }
34
35 pub fn from_entity_id_with_count(entity_id: &str, partition_count: u32) -> Self {
37 let mut hasher = std::collections::hash_map::DefaultHasher::new();
38 entity_id.hash(&mut hasher);
39 let hash = hasher.finish();
40 let partition_id = (hash % partition_count as u64) as u32;
41
42 Self {
43 partition_id,
44 partition_count,
45 }
46 }
47
48 pub fn from_partition_id(partition_id: u32, partition_count: u32) -> Result<Self> {
50 if partition_id >= partition_count {
51 return Err(AllSourceError::InvalidInput(format!(
52 "Partition ID {} exceeds partition count {}",
53 partition_id, partition_count
54 )));
55 }
56
57 Ok(Self {
58 partition_id,
59 partition_count,
60 })
61 }
62
63 pub fn partition_id(&self) -> u32 {
65 self.partition_id
66 }
67
68 pub fn partition_count(&self) -> u32 {
70 self.partition_count
71 }
72
73 pub fn belongs_to_node(&self, node_id: u32, total_nodes: u32) -> bool {
75 self.partition_id % total_nodes == node_id
76 }
77}
78
79impl fmt::Display for PartitionKey {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 write!(
82 f,
83 "partition-{}/{}",
84 self.partition_id, self.partition_count
85 )
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consistent_hashing() {
        let entity_id = "user-123";
        let (first, second) = (
            PartitionKey::from_entity_id(entity_id),
            PartitionKey::from_entity_id(entity_id),
        );

        assert_eq!(first, second, "Same entity must always map to same partition");
    }

    #[test]
    fn test_partition_range() {
        let id = PartitionKey::from_entity_id("test").partition_id();
        assert!(id < PartitionKey::DEFAULT_PARTITION_COUNT);
    }

    #[test]
    fn test_distribution() {
        // 1000 entities over 32 partitions averages about 31 per partition. The bounds
        // below reject a badly skewed hash.
        let mut buckets = vec![0usize; PartitionKey::DEFAULT_PARTITION_COUNT as usize];
        let keys = (0..1000).map(|i| PartitionKey::from_entity_id(&format!("entity-{}", i)));
        for key in keys {
            buckets[key.partition_id() as usize] += 1;
        }

        for (partition, &count) in buckets.iter().enumerate() {
            assert!(count > 10, "Partition {} too few events: {}", partition, count);
            assert!(count < 60, "Partition {} too many events: {}", partition, count);
        }
    }

    #[test]
    fn test_node_assignment() {
        // With 4 nodes, partition p is owned by node p % 4.
        let first = PartitionKey::from_partition_id(0, 32).unwrap();
        assert!(first.belongs_to_node(0, 4));

        let sixth = PartitionKey::from_partition_id(5, 32).unwrap();
        assert!(sixth.belongs_to_node(1, 4));
    }

    #[test]
    fn test_invalid_partition_id() {
        assert!(PartitionKey::from_partition_id(32, 32).is_err());
    }

    #[test]
    fn test_display() {
        let rendered = PartitionKey::from_partition_id(5, 32).unwrap().to_string();
        assert_eq!(rendered, "partition-5/32");
    }
}