Skip to main content

allsource_core/domain/value_objects/
partition_key.rs

1use crate::error::{AllSourceError, Result};
2use serde::{Deserialize, Serialize};
3use std::{
4    fmt,
5    hash::{Hash, Hasher},
6};
7
8/// Partition key for distributing events across fixed partitions
9///
10/// SierraDB uses 32 fixed partitions for single-node, 1024+ for clusters.
11/// We start with 32 for single-node deployment, ready for clustering.
12///
13/// # Invariants
14/// - Partition count is fixed at construction (default: 32)
15/// - Partition ID is always in range [0, partition_count)
16/// - Same entity always maps to same partition (consistent hashing)
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18pub struct PartitionKey {
19    partition_id: u32,
20    partition_count: u32,
21}
22
23impl PartitionKey {
24    /// Default partition count (SierraDB uses 32 for single-node)
25    pub const DEFAULT_PARTITION_COUNT: u32 = 32;
26
27    /// Create a partition key from an entity ID
28    ///
29    /// Uses consistent hashing to ensure same entity always maps to same partition.
30    /// This is critical for ordering guarantees within a partition.
31    pub fn from_entity_id(entity_id: &str) -> Self {
32        Self::from_entity_id_with_count(entity_id, Self::DEFAULT_PARTITION_COUNT)
33    }
34
35    /// Create a partition key with custom partition count
36    pub fn from_entity_id_with_count(entity_id: &str, partition_count: u32) -> Self {
37        let mut hasher = std::collections::hash_map::DefaultHasher::new();
38        entity_id.hash(&mut hasher);
39        let hash = hasher.finish();
40        let partition_id = (hash % u64::from(partition_count)) as u32;
41
42        Self {
43            partition_id,
44            partition_count,
45        }
46    }
47
48    /// Create from explicit partition ID (for reconstruction)
49    pub fn from_partition_id(partition_id: u32, partition_count: u32) -> Result<Self> {
50        if partition_id >= partition_count {
51            return Err(AllSourceError::InvalidInput(format!(
52                "Partition ID {partition_id} exceeds partition count {partition_count}"
53            )));
54        }
55
56        Ok(Self {
57            partition_id,
58            partition_count,
59        })
60    }
61
62    /// Get partition ID
63    pub fn partition_id(&self) -> u32 {
64        self.partition_id
65    }
66
67    /// Get partition count
68    pub fn partition_count(&self) -> u32 {
69        self.partition_count
70    }
71
72    /// Check if this partition belongs to a specific node (for clustering)
73    pub fn belongs_to_node(&self, node_id: u32, total_nodes: u32) -> bool {
74        self.partition_id % total_nodes == node_id
75    }
76}
77
78impl fmt::Display for PartitionKey {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        write!(
81            f,
82            "partition-{}/{}",
83            self.partition_id, self.partition_count
84        )
85    }
86}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91
92    #[test]
93    fn test_consistent_hashing() {
94        let entity_id = "user-123";
95        let key1 = PartitionKey::from_entity_id(entity_id);
96        let key2 = PartitionKey::from_entity_id(entity_id);
97
98        assert_eq!(key1, key2, "Same entity must always map to same partition");
99    }
100
101    #[test]
102    fn test_partition_range() {
103        let key = PartitionKey::from_entity_id("test");
104        assert!(key.partition_id() < PartitionKey::DEFAULT_PARTITION_COUNT);
105    }
106
107    #[test]
108    fn test_distribution() {
109        let mut partition_counts = vec![0; PartitionKey::DEFAULT_PARTITION_COUNT as usize];
110
111        for i in 0..1000 {
112            let entity_id = format!("entity-{i}");
113            let key = PartitionKey::from_entity_id(&entity_id);
114            partition_counts[key.partition_id() as usize] += 1;
115        }
116
117        // Check reasonable distribution (no partition should be empty or overloaded)
118        for (idx, &count) in partition_counts.iter().enumerate() {
119            assert!(count > 10, "Partition {idx} too few events: {count}");
120            assert!(count < 60, "Partition {idx} too many events: {count}");
121        }
122    }
123
124    #[test]
125    fn test_node_assignment() {
126        let key = PartitionKey::from_partition_id(0, 32).unwrap();
127        assert!(key.belongs_to_node(0, 4)); // 0 % 4 = 0
128
129        let key = PartitionKey::from_partition_id(5, 32).unwrap();
130        assert!(key.belongs_to_node(1, 4)); // 5 % 4 = 1
131    }
132
133    #[test]
134    fn test_invalid_partition_id() {
135        let result = PartitionKey::from_partition_id(32, 32);
136        assert!(result.is_err());
137    }
138
139    #[test]
140    fn test_display() {
141        let key = PartitionKey::from_partition_id(5, 32).unwrap();
142        assert_eq!(key.to_string(), "partition-5/32");
143    }
144}