allsource_core/domain/value_objects/
partition_key.rs

1use crate::error::{AllSourceError, Result};
2use serde::{Deserialize, Serialize};
3use std::fmt;
4use std::hash::{Hash, Hasher};
5
6/// Partition key for distributing events across fixed partitions
7///
8/// SierraDB uses 32 fixed partitions for single-node, 1024+ for clusters.
9/// We start with 32 for single-node deployment, ready for clustering.
10///
11/// # Invariants
12/// - Partition count is fixed at construction (default: 32)
13/// - Partition ID is always in range [0, partition_count)
14/// - Same entity always maps to same partition (consistent hashing)
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub struct PartitionKey {
17    partition_id: u32,
18    partition_count: u32,
19}
20
21impl PartitionKey {
22    /// Default partition count (SierraDB uses 32 for single-node)
23    pub const DEFAULT_PARTITION_COUNT: u32 = 32;
24
25    /// Create a partition key from an entity ID
26    ///
27    /// Uses consistent hashing to ensure same entity always maps to same partition.
28    /// This is critical for ordering guarantees within a partition.
29    pub fn from_entity_id(entity_id: &str) -> Self {
30        Self::from_entity_id_with_count(entity_id, Self::DEFAULT_PARTITION_COUNT)
31    }
32
33    /// Create a partition key with custom partition count
34    pub fn from_entity_id_with_count(entity_id: &str, partition_count: u32) -> Self {
35        let mut hasher = std::collections::hash_map::DefaultHasher::new();
36        entity_id.hash(&mut hasher);
37        let hash = hasher.finish();
38        let partition_id = (hash % partition_count as u64) as u32;
39
40        Self {
41            partition_id,
42            partition_count,
43        }
44    }
45
46    /// Create from explicit partition ID (for reconstruction)
47    pub fn from_partition_id(partition_id: u32, partition_count: u32) -> Result<Self> {
48        if partition_id >= partition_count {
49            return Err(AllSourceError::InvalidInput(format!(
50                "Partition ID {} exceeds partition count {}",
51                partition_id, partition_count
52            )));
53        }
54
55        Ok(Self {
56            partition_id,
57            partition_count,
58        })
59    }
60
61    /// Get partition ID
62    pub fn partition_id(&self) -> u32 {
63        self.partition_id
64    }
65
66    /// Get partition count
67    pub fn partition_count(&self) -> u32 {
68        self.partition_count
69    }
70
71    /// Check if this partition belongs to a specific node (for clustering)
72    pub fn belongs_to_node(&self, node_id: u32, total_nodes: u32) -> bool {
73        self.partition_id % total_nodes == node_id
74    }
75}
76
77impl fmt::Display for PartitionKey {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        write!(
80            f,
81            "partition-{}/{}",
82            self.partition_id, self.partition_count
83        )
84    }
85}
86
87#[cfg(test)]
88mod tests {
89    use super::*;
90
91    #[test]
92    fn test_consistent_hashing() {
93        let entity_id = "user-123";
94        let key1 = PartitionKey::from_entity_id(entity_id);
95        let key2 = PartitionKey::from_entity_id(entity_id);
96
97        assert_eq!(key1, key2, "Same entity must always map to same partition");
98    }
99
100    #[test]
101    fn test_partition_range() {
102        let key = PartitionKey::from_entity_id("test");
103        assert!(key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
104    }
105
106    #[test]
107    fn test_distribution() {
108        let mut partition_counts = vec![0; PartitionKey::DEFAULT_PARTITION_COUNT as usize];
109
110        for i in 0..1000 {
111            let entity_id = format!("entity-{}", i);
112            let key = PartitionKey::from_entity_id(&entity_id);
113            partition_counts[key.partition_id() as usize] += 1;
114        }
115
116        // Check reasonable distribution (no partition should be empty or overloaded)
117        for (idx, &count) in partition_counts.iter().enumerate() {
118            assert!(count > 10, "Partition {} too few events: {}", idx, count);
119            assert!(count < 60, "Partition {} too many events: {}", idx, count);
120        }
121    }
122
123    #[test]
124    fn test_node_assignment() {
125        let key = PartitionKey::from_partition_id(0, 32).unwrap();
126        assert!(key.belongs_to_node(0, 4)); // 0 % 4 = 0
127
128        let key = PartitionKey::from_partition_id(5, 32).unwrap();
129        assert!(key.belongs_to_node(1, 4)); // 5 % 4 = 1
130    }
131
132    #[test]
133    fn test_invalid_partition_id() {
134        let result = PartitionKey::from_partition_id(32, 32);
135        assert!(result.is_err());
136    }
137
138    #[test]
139    fn test_display() {
140        let key = PartitionKey::from_partition_id(5, 32).unwrap();
141        assert_eq!(key.to_string(), "partition-5/32");
142    }
143}