1use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{info, warn};
11
12use crate::raft::OxirsNodeId;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum PartitionStrategy {
17 ConsistentHashing,
19 RangeBased,
21 Hybrid,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct PartitionConfig {
28 pub strategy: PartitionStrategy,
30 pub virtual_nodes_per_node: usize,
32 pub replication_factor: usize,
34 pub enable_auto_rebalancing: bool,
36 pub rebalancing_threshold: f64,
38 pub max_keys_per_partition: usize,
40}
41
42impl Default for PartitionConfig {
43 fn default() -> Self {
44 Self {
45 strategy: PartitionStrategy::ConsistentHashing,
46 virtual_nodes_per_node: 150,
47 replication_factor: 3,
48 enable_auto_rebalancing: true,
49 rebalancing_threshold: 0.15,
50 max_keys_per_partition: 100_000,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct VirtualNode {
58 pub id: u64,
60 pub physical_node: OxirsNodeId,
62 pub hash_position: u64,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct RangePartition {
69 pub id: usize,
71 pub start_key: String,
73 pub end_key: String,
75 pub node_id: OxirsNodeId,
77 pub key_count: usize,
79 pub size_bytes: usize,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct PartitionAssignment {
86 pub primary_node: OxirsNodeId,
88 pub replica_nodes: Vec<OxirsNodeId>,
90 pub weight: f64,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct PartitioningStats {
97 pub total_partitions: usize,
99 pub total_virtual_nodes: usize,
101 pub avg_keys_per_partition: f64,
103 pub key_distribution_stddev: f64,
105 pub rebalancing_ops: usize,
107 pub last_rebalancing: Option<std::time::SystemTime>,
109}
110
111impl Default for PartitioningStats {
112 fn default() -> Self {
113 Self {
114 total_partitions: 0,
115 total_virtual_nodes: 0,
116 avg_keys_per_partition: 0.0,
117 key_distribution_stddev: 0.0,
118 rebalancing_ops: 0,
119 last_rebalancing: None,
120 }
121 }
122}
123
124pub struct AdvancedPartitioning {
126 config: PartitionConfig,
127 hash_ring: Arc<RwLock<Vec<VirtualNode>>>,
129 range_partitions: Arc<RwLock<Vec<RangePartition>>>,
131 node_partitions: Arc<RwLock<BTreeMap<OxirsNodeId, Vec<usize>>>>,
133 stats: Arc<RwLock<PartitioningStats>>,
135 active_nodes: Arc<RwLock<Vec<OxirsNodeId>>>,
137}
138
139impl AdvancedPartitioning {
140 pub fn new(config: PartitionConfig) -> Self {
142 Self {
143 config,
144 hash_ring: Arc::new(RwLock::new(Vec::new())),
145 range_partitions: Arc::new(RwLock::new(Vec::new())),
146 node_partitions: Arc::new(RwLock::new(BTreeMap::new())),
147 stats: Arc::new(RwLock::new(PartitioningStats::default())),
148 active_nodes: Arc::new(RwLock::new(Vec::new())),
149 }
150 }
151
152 pub async fn register_node(&self, node_id: OxirsNodeId) {
154 let mut active_nodes = self.active_nodes.write().await;
155 if !active_nodes.contains(&node_id) {
156 active_nodes.push(node_id);
157 info!("Registered node {} for partitioning", node_id);
158 }
159 drop(active_nodes);
160
161 match self.config.strategy {
162 PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
163 self.add_virtual_nodes(node_id).await;
164 }
165 PartitionStrategy::RangeBased => {
166 self.rebalance_ranges().await;
167 }
168 }
169 }
170
171 pub async fn unregister_node(&self, node_id: OxirsNodeId) {
173 let mut active_nodes = self.active_nodes.write().await;
174 active_nodes.retain(|&id| id != node_id);
175 drop(active_nodes);
176
177 match self.config.strategy {
178 PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
179 self.remove_virtual_nodes(node_id).await;
180 }
181 PartitionStrategy::RangeBased => {
182 self.rebalance_ranges().await;
183 }
184 }
185
186 info!("Unregistered node {} from partitioning", node_id);
187 }
188
189 async fn add_virtual_nodes(&self, node_id: OxirsNodeId) {
191 let mut hash_ring = self.hash_ring.write().await;
192
193 for i in 0..self.config.virtual_nodes_per_node {
194 let vnode_id = (node_id << 32) | (i as u64);
195 let hash_position = Self::hash_virtual_node(vnode_id);
196
197 let vnode = VirtualNode {
198 id: vnode_id,
199 physical_node: node_id,
200 hash_position,
201 };
202
203 hash_ring.push(vnode);
204 }
205
206 hash_ring.sort_by_key(|vnode| vnode.hash_position);
208
209 let mut stats = self.stats.write().await;
211 stats.total_virtual_nodes = hash_ring.len();
212
213 info!(
214 "Added {} virtual nodes for physical node {}",
215 self.config.virtual_nodes_per_node, node_id
216 );
217 }
218
219 async fn remove_virtual_nodes(&self, node_id: OxirsNodeId) {
221 let mut hash_ring = self.hash_ring.write().await;
222 hash_ring.retain(|vnode| vnode.physical_node != node_id);
223
224 let mut stats = self.stats.write().await;
225 stats.total_virtual_nodes = hash_ring.len();
226
227 info!("Removed virtual nodes for physical node {}", node_id);
228 }
229
230 fn hash_virtual_node(vnode_id: u64) -> u64 {
232 let mut hash: u64 = 0xcbf29ce484222325;
234 let bytes = vnode_id.to_le_bytes();
235
236 for byte in bytes {
237 hash ^= byte as u64;
238 hash = hash.wrapping_mul(0x100000001b3);
239 }
240
241 hash
242 }
243
244 pub fn hash_key(key: &str) -> u64 {
246 let mut hash: u64 = 0xcbf29ce484222325;
247
248 for byte in key.bytes() {
249 hash ^= byte as u64;
250 hash = hash.wrapping_mul(0x100000001b3);
251 }
252
253 hash
254 }
255
256 pub async fn get_partition_assignment(&self, key: &str) -> Option<PartitionAssignment> {
258 match self.config.strategy {
259 PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
260 self.get_consistent_hash_assignment(key).await
261 }
262 PartitionStrategy::RangeBased => self.get_range_based_assignment(key).await,
263 }
264 }
265
266 async fn get_consistent_hash_assignment(&self, key: &str) -> Option<PartitionAssignment> {
268 let hash_ring = self.hash_ring.read().await;
269
270 if hash_ring.is_empty() {
271 return None;
272 }
273
274 let key_hash = Self::hash_key(key);
275
276 let pos = match hash_ring.binary_search_by_key(&key_hash, |vnode| vnode.hash_position) {
278 Ok(idx) => idx,
279 Err(idx) => {
280 if idx >= hash_ring.len() {
281 0 } else {
283 idx
284 }
285 }
286 };
287
288 let primary_node = hash_ring[pos].physical_node;
290
291 let mut replica_nodes = Vec::new();
293 let mut seen = std::collections::HashSet::new();
294 seen.insert(primary_node);
295
296 let mut current_pos = (pos + 1) % hash_ring.len();
297 while replica_nodes.len() < self.config.replication_factor - 1
298 && seen.len() < hash_ring.len()
299 {
300 let physical_node = hash_ring[current_pos].physical_node;
301 if !seen.contains(&physical_node) {
302 replica_nodes.push(physical_node);
303 seen.insert(physical_node);
304 }
305 current_pos = (current_pos + 1) % hash_ring.len();
306 }
307
308 Some(PartitionAssignment {
309 primary_node,
310 replica_nodes,
311 weight: 1.0 / hash_ring.len() as f64,
312 })
313 }
314
315 async fn get_range_based_assignment(&self, key: &str) -> Option<PartitionAssignment> {
317 let range_partitions = self.range_partitions.read().await;
318
319 for partition in range_partitions.iter() {
321 if key >= partition.start_key.as_str() && key < partition.end_key.as_str() {
322 let active_nodes = self.active_nodes.read().await;
324 let mut replica_nodes = Vec::new();
325
326 for node_id in active_nodes.iter() {
327 if *node_id != partition.node_id
328 && replica_nodes.len() < self.config.replication_factor - 1
329 {
330 replica_nodes.push(*node_id);
331 }
332 }
333
334 return Some(PartitionAssignment {
335 primary_node: partition.node_id,
336 replica_nodes,
337 weight: partition.key_count as f64 / self.config.max_keys_per_partition as f64,
338 });
339 }
340 }
341
342 None
343 }
344
345 async fn rebalance_ranges(&self) {
347 let active_nodes = self.active_nodes.read().await;
348
349 if active_nodes.is_empty() {
350 return;
351 }
352
353 let mut range_partitions = self.range_partitions.write().await;
354
355 if range_partitions.is_empty() || range_partitions.len() != active_nodes.len() {
357 range_partitions.clear(); let nodes_count = active_nodes.len();
360 for (i, &node_id) in active_nodes.iter().enumerate() {
361 let partition = RangePartition {
362 id: i,
363 start_key: if i == 0 {
364 String::new()
365 } else {
366 format!("partition_{}", i)
367 },
368 end_key: if i == nodes_count - 1 {
369 String::from("\u{10ffff}") } else {
371 format!("partition_{}", i + 1)
372 },
373 node_id,
374 key_count: 0,
375 size_bytes: 0,
376 };
377 range_partitions.push(partition);
378 }
379
380 info!(
381 "Created {} range partitions for {} nodes",
382 nodes_count, nodes_count
383 );
384 } else {
385 if !self.config.enable_auto_rebalancing {
387 return;
388 }
389
390 let avg_keys = range_partitions.iter().map(|p| p.key_count).sum::<usize>() as f64
391 / range_partitions.len() as f64;
392
393 let mut needs_rebalancing = false;
394 for partition in range_partitions.iter() {
395 let deviation = (partition.key_count as f64 - avg_keys).abs() / avg_keys.max(1.0);
396 if deviation > self.config.rebalancing_threshold {
397 needs_rebalancing = true;
398 break;
399 }
400 }
401
402 if needs_rebalancing {
403 let nodes_count = active_nodes.len();
405 for (i, partition) in range_partitions.iter_mut().enumerate() {
406 partition.node_id = active_nodes[i % nodes_count];
407 }
408
409 let mut stats = self.stats.write().await;
410 stats.rebalancing_ops += 1;
411 stats.last_rebalancing = Some(std::time::SystemTime::now());
412
413 info!("Rebalanced {} range partitions", range_partitions.len());
414 }
415 }
416
417 let mut stats = self.stats.write().await;
419 stats.total_partitions = range_partitions.len();
420 }
421
422 pub async fn update_partition_stats(&self, key: &str, size_delta: isize) {
424 match self.config.strategy {
425 PartitionStrategy::RangeBased | PartitionStrategy::Hybrid => {
426 let mut range_partitions = self.range_partitions.write().await;
427
428 for partition in range_partitions.iter_mut() {
429 if key >= partition.start_key.as_str() && key < partition.end_key.as_str() {
430 if size_delta > 0 {
431 partition.key_count += 1;
432 partition.size_bytes += size_delta as usize;
433 } else if size_delta < 0 && partition.key_count > 0 {
434 partition.key_count -= 1;
435 partition.size_bytes =
436 partition.size_bytes.saturating_sub((-size_delta) as usize);
437 }
438 break;
439 }
440 }
441 }
442 _ => {}
443 }
444 }
445
446 pub async fn get_node_partitions(&self, node_id: OxirsNodeId) -> Vec<usize> {
448 let node_partitions = self.node_partitions.read().await;
449 node_partitions.get(&node_id).cloned().unwrap_or_default()
450 }
451
452 pub async fn get_stats(&self) -> PartitioningStats {
454 let mut stats = self.stats.read().await.clone();
455
456 match self.config.strategy {
457 PartitionStrategy::RangeBased | PartitionStrategy::Hybrid => {
458 let range_partitions = self.range_partitions.read().await;
459 if !range_partitions.is_empty() {
460 let total_keys: usize = range_partitions.iter().map(|p| p.key_count).sum();
461 stats.avg_keys_per_partition =
462 total_keys as f64 / range_partitions.len() as f64;
463
464 let variance: f64 = range_partitions
466 .iter()
467 .map(|p| {
468 let diff = p.key_count as f64 - stats.avg_keys_per_partition;
469 diff * diff
470 })
471 .sum::<f64>()
472 / range_partitions.len() as f64;
473
474 stats.key_distribution_stddev = variance.sqrt();
475 }
476 }
477 _ => {}
478 }
479
480 stats
481 }
482
483 pub async fn get_virtual_nodes(&self) -> Vec<VirtualNode> {
485 self.hash_ring.read().await.clone()
486 }
487
488 pub async fn get_range_partitions(&self) -> Vec<RangePartition> {
490 self.range_partitions.read().await.clone()
491 }
492
493 pub async fn check_rebalancing_needed(&self) -> bool {
495 if !self.config.enable_auto_rebalancing {
496 return false;
497 }
498
499 let stats = self.get_stats().await;
500
501 if stats.avg_keys_per_partition == 0.0 {
502 return false;
503 }
504
505 stats.key_distribution_stddev / stats.avg_keys_per_partition
506 > self.config.rebalancing_threshold
507 }
508
509 pub async fn perform_rebalancing(&self) {
511 if !self.check_rebalancing_needed().await {
512 return;
513 }
514
515 match self.config.strategy {
516 PartitionStrategy::RangeBased | PartitionStrategy::Hybrid => {
517 self.rebalance_ranges().await;
518 }
519 PartitionStrategy::ConsistentHashing => {
520 warn!("Consistent hashing rebalancing triggered, but not needed");
522 }
523 }
524 }
525
526 pub async fn clear(&self) {
528 self.hash_ring.write().await.clear();
529 self.range_partitions.write().await.clear();
530 self.node_partitions.write().await.clear();
531 self.active_nodes.write().await.clear();
532 *self.stats.write().await = PartitioningStats::default();
533 }
534
535 pub async fn get_load_distribution(&self) -> BTreeMap<OxirsNodeId, f64> {
537 let mut distribution = BTreeMap::new();
538
539 match self.config.strategy {
540 PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
541 let hash_ring = self.hash_ring.read().await;
542 let total_vnodes = hash_ring.len() as f64;
543
544 for vnode in hash_ring.iter() {
545 *distribution.entry(vnode.physical_node).or_insert(0.0) += 1.0 / total_vnodes;
546 }
547 }
548 PartitionStrategy::RangeBased => {
549 let range_partitions = self.range_partitions.read().await;
550 let total_keys: usize = range_partitions.iter().map(|p| p.key_count).sum();
551
552 if total_keys > 0 {
553 for partition in range_partitions.iter() {
554 *distribution.entry(partition.node_id).or_insert(0.0) +=
555 partition.key_count as f64 / total_keys as f64;
556 }
557 }
558 }
559 }
560
561 distribution
562 }
563}
564
#[cfg(test)]
mod tests {
    use super::*;

    /// Partitioner using consistent hashing with the given number of virtual
    /// nodes per physical node; everything else is left at its default.
    fn ring_partitioner(virtual_nodes_per_node: usize) -> AdvancedPartitioning {
        AdvancedPartitioning::new(PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node,
            ..Default::default()
        })
    }

    /// Partitioner using range-based placement with default settings.
    fn range_partitioner() -> AdvancedPartitioning {
        AdvancedPartitioning::new(PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            ..Default::default()
        })
    }

    /// A fresh partitioner reports no partitions and no virtual nodes.
    #[tokio::test]
    async fn test_partitioning_creation() {
        let partitioner = AdvancedPartitioning::new(PartitionConfig::default());

        let snapshot = partitioner.get_stats().await;
        assert_eq!(snapshot.total_partitions, 0);
        assert_eq!(snapshot.total_virtual_nodes, 0);
    }

    /// Registering one node creates exactly the configured virtual nodes.
    #[tokio::test]
    async fn test_register_node_consistent_hashing() {
        let partitioner = ring_partitioner(10);

        partitioner.register_node(1).await;

        assert_eq!(partitioner.get_stats().await.total_virtual_nodes, 10);
        assert_eq!(partitioner.get_virtual_nodes().await.len(), 10);
    }

    /// Virtual nodes accumulate across distinct physical nodes.
    #[tokio::test]
    async fn test_register_multiple_nodes() {
        let partitioner = ring_partitioner(5);

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;
        partitioner.register_node(3).await;

        assert_eq!(partitioner.get_stats().await.total_virtual_nodes, 15);
    }

    /// Unregistering a node removes only that node's virtual nodes.
    #[tokio::test]
    async fn test_unregister_node() {
        let partitioner = ring_partitioner(10);

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;
        partitioner.unregister_node(1).await;

        assert_eq!(partitioner.get_stats().await.total_virtual_nodes, 10);
    }

    /// With three nodes and a replication factor of three, a key gets a
    /// primary and two replicas.
    #[tokio::test]
    async fn test_consistent_hash_assignment() {
        let partitioner = AdvancedPartitioning::new(PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 50,
            replication_factor: 3,
            ..Default::default()
        });

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;
        partitioner.register_node(3).await;

        let resolved = partitioner.get_partition_assignment("test_key").await;
        assert!(resolved.is_some());

        let resolved = resolved.unwrap();
        assert_eq!(resolved.replica_nodes.len(), 2);
    }

    /// Range-based placement creates one range per registered node.
    #[tokio::test]
    async fn test_range_based_partitioning() {
        let partitioner = range_partitioner();

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;

        assert_eq!(partitioner.get_range_partitions().await.len(), 2);
    }

    /// A range lookup yields at most `replication_factor - 1` replicas.
    #[tokio::test]
    async fn test_range_assignment() {
        let partitioner = AdvancedPartitioning::new(PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            replication_factor: 2,
            ..Default::default()
        });

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;

        let resolved = partitioner.get_partition_assignment("test_key").await;
        assert!(resolved.is_some());

        let resolved = resolved.unwrap();
        assert!(resolved.replica_nodes.len() <= 1);
    }

    /// Recording a positive size delta raises the average key count.
    #[tokio::test]
    async fn test_update_partition_stats() {
        let partitioner = range_partitioner();

        partitioner.register_node(1).await;
        partitioner.update_partition_stats("test_key", 100).await;

        let snapshot = partitioner.get_stats().await;
        assert!(snapshot.avg_keys_per_partition > 0.0);
    }

    /// Two nodes with equal virtual-node counts share the ring about evenly.
    #[tokio::test]
    async fn test_load_distribution() {
        let partitioner = ring_partitioner(100);

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;

        let shares = partitioner.get_load_distribution().await;
        assert_eq!(shares.len(), 2);

        for share in shares.values() {
            assert!(*share > 0.4 && *share < 0.6);
        }
    }

    /// Hashing is deterministic and separates different keys.
    #[tokio::test]
    async fn test_hash_key_deterministic() {
        let first = AdvancedPartitioning::hash_key("test_key");
        let second = AdvancedPartitioning::hash_key("test_key");
        assert_eq!(first, second);

        let other = AdvancedPartitioning::hash_key("different_key");
        assert_ne!(first, other);
    }

    /// Without any recorded keys no rebalancing is requested.
    #[tokio::test]
    async fn test_rebalancing_needed() {
        let partitioner = AdvancedPartitioning::new(PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            enable_auto_rebalancing: true,
            rebalancing_threshold: 0.1,
            ..Default::default()
        });

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;

        assert!(!partitioner.check_rebalancing_needed().await);
    }

    /// `clear` resets the counters.
    #[tokio::test]
    async fn test_clear() {
        let partitioner = AdvancedPartitioning::new(PartitionConfig::default());

        partitioner.register_node(1).await;
        partitioner.register_node(2).await;
        partitioner.clear().await;

        let snapshot = partitioner.get_stats().await;
        assert_eq!(snapshot.total_virtual_nodes, 0);
        assert_eq!(snapshot.total_partitions, 0);
    }

    /// The ring is kept in non-decreasing hash order.
    #[tokio::test]
    async fn test_virtual_node_ring_sorted() {
        let partitioner = ring_partitioner(20);

        partitioner.register_node(1).await;

        let ring = partitioner.get_virtual_nodes().await;
        for pair in ring.windows(2) {
            assert!(pair[1].hash_position >= pair[0].hash_position);
        }
    }
}