Skip to main content

allsource_core/domain/value_objects/
partition_key.rs

1use crate::error::{AllSourceError, Result};
2use serde::{Deserialize, Serialize};
3use std::{
4    fmt,
5    hash::{Hash, Hasher},
6};
7
8/// Partition key for distributing events across fixed partitions
9///
10/// SierraDB uses 32 fixed partitions for single-node, 1024+ for clusters.
11/// We start with 32 for single-node deployment, ready for clustering.
12///
13/// # Invariants
14/// - Partition count is fixed at construction (default: 32)
15/// - Partition ID is always in range [0, partition_count)
16/// - Same entity always maps to same partition (consistent hashing)
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18pub struct PartitionKey {
19    partition_id: u32,
20    partition_count: u32,
21}
22
23impl PartitionKey {
24    /// Default partition count (SierraDB uses 32 for single-node)
25    pub const DEFAULT_PARTITION_COUNT: u32 = 32;
26
27    /// Create a partition key from an entity ID
28    ///
29    /// Uses consistent hashing to ensure same entity always maps to same partition.
30    /// This is critical for ordering guarantees within a partition.
31    pub fn from_entity_id(entity_id: &str) -> Self {
32        Self::from_entity_id_with_count(entity_id, Self::DEFAULT_PARTITION_COUNT)
33    }
34
35    /// Create a partition key with custom partition count
36    pub fn from_entity_id_with_count(entity_id: &str, partition_count: u32) -> Self {
37        let mut hasher = std::collections::hash_map::DefaultHasher::new();
38        entity_id.hash(&mut hasher);
39        let hash = hasher.finish();
40        let partition_id = (hash % partition_count as u64) as u32;
41
42        Self {
43            partition_id,
44            partition_count,
45        }
46    }
47
48    /// Create from explicit partition ID (for reconstruction)
49    pub fn from_partition_id(partition_id: u32, partition_count: u32) -> Result<Self> {
50        if partition_id >= partition_count {
51            return Err(AllSourceError::InvalidInput(format!(
52                "Partition ID {} exceeds partition count {}",
53                partition_id, partition_count
54            )));
55        }
56
57        Ok(Self {
58            partition_id,
59            partition_count,
60        })
61    }
62
63    /// Get partition ID
64    pub fn partition_id(&self) -> u32 {
65        self.partition_id
66    }
67
68    /// Get partition count
69    pub fn partition_count(&self) -> u32 {
70        self.partition_count
71    }
72
73    /// Check if this partition belongs to a specific node (for clustering)
74    pub fn belongs_to_node(&self, node_id: u32, total_nodes: u32) -> bool {
75        self.partition_id % total_nodes == node_id
76    }
77}
78
79impl fmt::Display for PartitionKey {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        write!(
82            f,
83            "partition-{}/{}",
84            self.partition_id, self.partition_count
85        )
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same entity ID twice must land on the same partition.
    #[test]
    fn test_consistent_hashing() {
        let first = PartitionKey::from_entity_id("user-123");
        let second = PartitionKey::from_entity_id("user-123");

        assert_eq!(
            first, second,
            "Same entity must always map to same partition"
        );
    }

    /// A key built with the default count stays below that count.
    #[test]
    fn test_partition_range() {
        let partition_id = PartitionKey::from_entity_id("test").partition_id();
        assert!(partition_id < PartitionKey::DEFAULT_PARTITION_COUNT);
    }

    /// 1000 sequential entity IDs should spread across all default partitions.
    #[test]
    fn test_distribution() {
        let bucket_total = PartitionKey::DEFAULT_PARTITION_COUNT as usize;
        let mut histogram = vec![0; bucket_total];

        (0..1000)
            .map(|n| PartitionKey::from_entity_id(&format!("entity-{}", n)))
            .for_each(|key| histogram[key.partition_id() as usize] += 1);

        // Check reasonable distribution (no partition should be empty or overloaded)
        for (idx, count) in histogram.into_iter().enumerate() {
            assert!(count > 10, "Partition {} too few events: {}", idx, count);
            assert!(count < 60, "Partition {} too many events: {}", idx, count);
        }
    }

    /// Partitions are assigned to nodes by `partition_id % total_nodes`.
    #[test]
    fn test_node_assignment() {
        // (partition ID, owning node) with 4 nodes: 0 % 4 = 0, 5 % 4 = 1
        for &(partition_id, owner) in &[(0, 0), (5, 1)] {
            let key = PartitionKey::from_partition_id(partition_id, 32).unwrap();
            assert!(key.belongs_to_node(owner, 4));
        }
    }

    /// A partition ID equal to the partition count is rejected.
    #[test]
    fn test_invalid_partition_id() {
        assert!(PartitionKey::from_partition_id(32, 32).is_err());
    }

    /// `Display` renders as `partition-<id>/<count>`.
    #[test]
    fn test_display() {
        let rendered = PartitionKey::from_partition_id(5, 32).unwrap().to_string();
        assert_eq!(rendered, "partition-5/32");
    }
}