oxirs_cluster/advanced_partitioning.rs

//! # Advanced Partitioning Strategies
//!
//! Provides data partitioning algorithms for distributed RDF storage, including
//! consistent hashing, range-based, and hybrid partitioning with automatic rebalancing.
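//!
//! ## Example
//!
//! A minimal usage sketch (assumes this module is exposed as
//! `oxirs_cluster::advanced_partitioning` and that node IDs are plain integers):
//!
//! ```ignore
//! use oxirs_cluster::advanced_partitioning::{AdvancedPartitioning, PartitionConfig};
//!
//! // Inside an async context:
//! let partitioning = AdvancedPartitioning::new(PartitionConfig::default());
//! partitioning.register_node(1).await;
//! partitioning.register_node(2).await;
//!
//! if let Some(assignment) = partitioning.get_partition_assignment("some_key").await {
//!     println!("primary node: {}", assignment.primary_node);
//! }
//! ```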

use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, warn};

use crate::raft::OxirsNodeId;

/// Partitioning strategy
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionStrategy {
    /// Consistent hashing with virtual nodes
    ConsistentHashing,
    /// Range-based partitioning
    RangeBased,
    /// Hybrid: consistent hashing for key placement, range tracking for statistics and rebalancing
    Hybrid,
}

/// Partition configuration
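///
/// # Example
///
/// A sketch of overriding selected defaults (the field values here are illustrative):
///
/// ```ignore
/// let config = PartitionConfig {
///     strategy: PartitionStrategy::RangeBased,
///     replication_factor: 2,
///     ..Default::default()
/// };
/// ```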
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionConfig {
    /// Partitioning strategy
    pub strategy: PartitionStrategy,
    /// Number of virtual nodes per physical node (for consistent hashing)
    pub virtual_nodes_per_node: usize,
    /// Replication factor
    pub replication_factor: usize,
    /// Enable automatic rebalancing
    pub enable_auto_rebalancing: bool,
    /// Rebalancing threshold (0.0-1.0)
    pub rebalancing_threshold: f64,
    /// Maximum keys per partition (for range-based)
    pub max_keys_per_partition: usize,
}

impl Default for PartitionConfig {
    fn default() -> Self {
        Self {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 150,
            replication_factor: 3,
            enable_auto_rebalancing: true,
            rebalancing_threshold: 0.15,
            max_keys_per_partition: 100_000,
        }
    }
}

/// Virtual node in consistent hashing ring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VirtualNode {
    /// Virtual node ID
    pub id: u64,
    /// Physical node ID
    pub physical_node: OxirsNodeId,
    /// Hash position on the ring
    pub hash_position: u64,
}

/// Range partition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RangePartition {
    /// Partition ID
    pub id: usize,
    /// Start key (inclusive)
    pub start_key: String,
    /// End key (exclusive)
    pub end_key: String,
    /// Assigned node
    pub node_id: OxirsNodeId,
    /// Number of keys in this partition
    pub key_count: usize,
    /// Estimated size in bytes
    pub size_bytes: usize,
}

/// Partition assignment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionAssignment {
    /// Primary node
    pub primary_node: OxirsNodeId,
    /// Replica nodes
    pub replica_nodes: Vec<OxirsNodeId>,
    /// Partition weight (0.0-1.0)
    pub weight: f64,
}

/// Partitioning statistics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PartitioningStats {
    /// Total partitions
    pub total_partitions: usize,
    /// Total virtual nodes (consistent hashing only)
    pub total_virtual_nodes: usize,
    /// Average keys per partition
    pub avg_keys_per_partition: f64,
    /// Standard deviation of key distribution
    pub key_distribution_stddev: f64,
    /// Rebalancing operations performed
    pub rebalancing_ops: usize,
    /// Last rebalancing timestamp
    pub last_rebalancing: Option<std::time::SystemTime>,
}

/// Advanced partitioning manager
pub struct AdvancedPartitioning {
    config: PartitionConfig,
    /// Consistent hashing ring (sorted by hash position)
    hash_ring: Arc<RwLock<Vec<VirtualNode>>>,
    /// Range partitions
    range_partitions: Arc<RwLock<Vec<RangePartition>>>,
    /// Node to partition mapping
    node_partitions: Arc<RwLock<BTreeMap<OxirsNodeId, Vec<usize>>>>,
    /// Partition statistics
    stats: Arc<RwLock<PartitioningStats>>,
    /// Active nodes
    active_nodes: Arc<RwLock<Vec<OxirsNodeId>>>,
}

impl AdvancedPartitioning {
    /// Create a new partitioning manager
    pub fn new(config: PartitionConfig) -> Self {
        Self {
            config,
            hash_ring: Arc::new(RwLock::new(Vec::new())),
            range_partitions: Arc::new(RwLock::new(Vec::new())),
            node_partitions: Arc::new(RwLock::new(BTreeMap::new())),
            stats: Arc::new(RwLock::new(PartitioningStats::default())),
            active_nodes: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Register a node in the partitioning system
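    ///
    /// Registration adds virtual nodes (consistent hashing / hybrid) or triggers a
    /// range rebalance (range-based). A minimal sketch (node IDs are illustrative):
    ///
    /// ```ignore
    /// partitioning.register_node(1).await;
    /// partitioning.register_node(2).await;
    /// ```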
    pub async fn register_node(&self, node_id: OxirsNodeId) {
        let mut active_nodes = self.active_nodes.write().await;
        if !active_nodes.contains(&node_id) {
            active_nodes.push(node_id);
            info!("Registered node {} for partitioning", node_id);
        }
        drop(active_nodes);

        match self.config.strategy {
            PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
                self.add_virtual_nodes(node_id).await;
            }
            PartitionStrategy::RangeBased => {
                self.rebalance_ranges().await;
            }
        }
    }

    /// Remove a node from the partitioning system
    pub async fn unregister_node(&self, node_id: OxirsNodeId) {
        let mut active_nodes = self.active_nodes.write().await;
        active_nodes.retain(|&id| id != node_id);
        drop(active_nodes);

        match self.config.strategy {
            PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
                self.remove_virtual_nodes(node_id).await;
            }
            PartitionStrategy::RangeBased => {
                self.rebalance_ranges().await;
            }
        }

        info!("Unregistered node {} from partitioning", node_id);
    }

    /// Add virtual nodes for a physical node (consistent hashing)
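    ///
    /// Virtual-node IDs pack the physical node ID into the upper 32 bits and the
    /// per-node index into the lower 32 bits; this assumes physical node IDs fit
    /// in 32 bits.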
    async fn add_virtual_nodes(&self, node_id: OxirsNodeId) {
        let mut hash_ring = self.hash_ring.write().await;

        for i in 0..self.config.virtual_nodes_per_node {
            let vnode_id = (node_id << 32) | (i as u64);
            let hash_position = Self::hash_virtual_node(vnode_id);

            let vnode = VirtualNode {
                id: vnode_id,
                physical_node: node_id,
                hash_position,
            };

            hash_ring.push(vnode);
        }

        // Sort ring by hash position
        hash_ring.sort_by_key(|vnode| vnode.hash_position);

        // Update stats
        let mut stats = self.stats.write().await;
        stats.total_virtual_nodes = hash_ring.len();

        info!(
            "Added {} virtual nodes for physical node {}",
            self.config.virtual_nodes_per_node, node_id
        );
    }

    /// Remove virtual nodes for a physical node
    async fn remove_virtual_nodes(&self, node_id: OxirsNodeId) {
        let mut hash_ring = self.hash_ring.write().await;
        hash_ring.retain(|vnode| vnode.physical_node != node_id);

        let mut stats = self.stats.write().await;
        stats.total_virtual_nodes = hash_ring.len();

        info!("Removed virtual nodes for physical node {}", node_id);
    }

    /// Hash a virtual node ID
    fn hash_virtual_node(vnode_id: u64) -> u64 {
        // Use FNV-1a hash for speed
        let mut hash: u64 = 0xcbf29ce484222325;
        let bytes = vnode_id.to_le_bytes();

        for byte in bytes {
            hash ^= byte as u64;
            hash = hash.wrapping_mul(0x100000001b3);
        }

        hash
    }

    /// Hash a key for consistent hashing
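    ///
    /// The hash is deterministic, so the same key always maps to the same ring
    /// position:
    ///
    /// ```ignore
    /// assert_eq!(
    ///     AdvancedPartitioning::hash_key("subject"),
    ///     AdvancedPartitioning::hash_key("subject"),
    /// );
    /// ```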
    pub fn hash_key(key: &str) -> u64 {
        let mut hash: u64 = 0xcbf29ce484222325;

        for byte in key.bytes() {
            hash ^= byte as u64;
            hash = hash.wrapping_mul(0x100000001b3);
        }

        hash
    }

    /// Get the partition assignment for a key, dispatched by the configured strategy
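    ///
    /// Returns `None` when no nodes have been registered (empty hash ring or no
    /// range partitions). A minimal sketch (the key is illustrative):
    ///
    /// ```ignore
    /// match partitioning.get_partition_assignment("some_key").await {
    ///     Some(assignment) => println!(
    ///         "primary: {}, replicas: {:?}",
    ///         assignment.primary_node, assignment.replica_nodes
    ///     ),
    ///     None => println!("no nodes registered"),
    /// }
    /// ```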
    pub async fn get_partition_assignment(&self, key: &str) -> Option<PartitionAssignment> {
        match self.config.strategy {
            PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
                self.get_consistent_hash_assignment(key).await
            }
            PartitionStrategy::RangeBased => self.get_range_based_assignment(key).await,
        }
    }

    /// Get assignment using consistent hashing
    async fn get_consistent_hash_assignment(&self, key: &str) -> Option<PartitionAssignment> {
        let hash_ring = self.hash_ring.read().await;

        if hash_ring.is_empty() {
            return None;
        }

        let key_hash = Self::hash_key(key);

        // Binary search for the first virtual node >= key_hash
        let pos = match hash_ring.binary_search_by_key(&key_hash, |vnode| vnode.hash_position) {
            Ok(idx) => idx,
            Err(idx) => {
                if idx >= hash_ring.len() {
                    0 // Wrap around
                } else {
                    idx
                }
            }
        };

        // Get primary node
        let primary_node = hash_ring[pos].physical_node;

        // Get replica nodes (next N-1 distinct physical nodes on the ring)
        let mut replica_nodes = Vec::new();
        let mut seen = std::collections::HashSet::new();
        seen.insert(primary_node);

        let mut current_pos = (pos + 1) % hash_ring.len();
        while replica_nodes.len() < self.config.replication_factor.saturating_sub(1)
            && seen.len() < hash_ring.len()
        {
            let physical_node = hash_ring[current_pos].physical_node;
            if !seen.contains(&physical_node) {
                replica_nodes.push(physical_node);
                seen.insert(physical_node);
            }
            current_pos = (current_pos + 1) % hash_ring.len();
        }

        Some(PartitionAssignment {
            primary_node,
            replica_nodes,
            weight: 1.0 / hash_ring.len() as f64,
        })
    }

    /// Get assignment using range-based partitioning
    async fn get_range_based_assignment(&self, key: &str) -> Option<PartitionAssignment> {
        let range_partitions = self.range_partitions.read().await;

        // Linear scan for the partition whose [start_key, end_key) range contains this key
        for partition in range_partitions.iter() {
            if key >= partition.start_key.as_str() && key < partition.end_key.as_str() {
                // For range-based, replicas are the next N-1 other active nodes
                let active_nodes = self.active_nodes.read().await;
                let mut replica_nodes = Vec::new();

                for node_id in active_nodes.iter() {
                    if *node_id != partition.node_id
                        && replica_nodes.len() < self.config.replication_factor.saturating_sub(1)
                    {
                        replica_nodes.push(*node_id);
                    }
                }

                return Some(PartitionAssignment {
                    primary_node: partition.node_id,
                    replica_nodes,
                    weight: partition.key_count as f64 / self.config.max_keys_per_partition as f64,
                });
            }
        }

        None
    }

    /// Rebalance range partitions
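    ///
    /// Two phases: (1) if the partition count no longer matches the node count,
    /// the key space is re-split into one contiguous range per node; (2) otherwise,
    /// if any partition's key count deviates from the mean by more than the
    /// configured threshold, partitions are redistributed round-robin across nodes.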
    async fn rebalance_ranges(&self) {
        let active_nodes = self.active_nodes.read().await;

        if active_nodes.is_empty() {
            return;
        }

        let mut range_partitions = self.range_partitions.write().await;

        // If no partitions exist or the partition count no longer matches the node count, (re)create them
        if range_partitions.is_empty() || range_partitions.len() != active_nodes.len() {
            range_partitions.clear(); // Clear old partitions if resizing

            let nodes_count = active_nodes.len();
            for (i, &node_id) in active_nodes.iter().enumerate() {
                let partition = RangePartition {
                    id: i,
                    start_key: if i == 0 {
                        String::new()
                    } else {
                        format!("partition_{}", i)
                    },
                    end_key: if i == nodes_count - 1 {
                        String::from("\u{10ffff}") // Maximum Unicode scalar value
                    } else {
                        format!("partition_{}", i + 1)
                    },
                    node_id,
                    key_count: 0,
                    size_bytes: 0,
                };
                range_partitions.push(partition);
            }

            info!(
                "Created {} range partitions for {} nodes",
                nodes_count, nodes_count
            );
        } else {
            // Check if rebalancing is needed
            if !self.config.enable_auto_rebalancing {
                return;
            }

            let avg_keys = range_partitions.iter().map(|p| p.key_count).sum::<usize>() as f64
                / range_partitions.len() as f64;

            let mut needs_rebalancing = false;
            for partition in range_partitions.iter() {
                let deviation = (partition.key_count as f64 - avg_keys).abs() / avg_keys.max(1.0);
                if deviation > self.config.rebalancing_threshold {
                    needs_rebalancing = true;
                    break;
                }
            }

            if needs_rebalancing {
                // Simple rebalancing: redistribute partitions round-robin among nodes
                let nodes_count = active_nodes.len();
                for (i, partition) in range_partitions.iter_mut().enumerate() {
                    partition.node_id = active_nodes[i % nodes_count];
                }

                let mut stats = self.stats.write().await;
                stats.rebalancing_ops += 1;
                stats.last_rebalancing = Some(std::time::SystemTime::now());

                info!("Rebalanced {} range partitions", range_partitions.len());
            }
        }

        // Update stats
        let mut stats = self.stats.write().await;
        stats.total_partitions = range_partitions.len();
    }

    /// Update partition statistics (call after data operations)
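    ///
    /// Each call records a single key: a positive `size_delta` counts one inserted
    /// key of that size, a negative one counts one removed key. A minimal sketch
    /// (the key and size are illustrative):
    ///
    /// ```ignore
    /// // One key of ~256 bytes was inserted
    /// partitioning.update_partition_stats("triple:s:p:o", 256).await;
    /// // Later, the same key was removed
    /// partitioning.update_partition_stats("triple:s:p:o", -256).await;
    /// ```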
    pub async fn update_partition_stats(&self, key: &str, size_delta: isize) {
        match self.config.strategy {
            PartitionStrategy::RangeBased | PartitionStrategy::Hybrid => {
                let mut range_partitions = self.range_partitions.write().await;

                for partition in range_partitions.iter_mut() {
                    if key >= partition.start_key.as_str() && key < partition.end_key.as_str() {
                        if size_delta > 0 {
                            partition.key_count += 1;
                            partition.size_bytes += size_delta as usize;
                        } else if size_delta < 0 && partition.key_count > 0 {
                            partition.key_count -= 1;
                            partition.size_bytes =
                                partition.size_bytes.saturating_sub(size_delta.unsigned_abs());
                        }
                        break;
                    }
                }
            }
            _ => {}
        }
    }

    /// Get all partition assignments for a node
    pub async fn get_node_partitions(&self, node_id: OxirsNodeId) -> Vec<usize> {
        let node_partitions = self.node_partitions.read().await;
        node_partitions.get(&node_id).cloned().unwrap_or_default()
    }

    /// Get partition statistics
    pub async fn get_stats(&self) -> PartitioningStats {
        let mut stats = self.stats.read().await.clone();

        match self.config.strategy {
            PartitionStrategy::RangeBased | PartitionStrategy::Hybrid => {
                let range_partitions = self.range_partitions.read().await;
                if !range_partitions.is_empty() {
                    let total_keys: usize = range_partitions.iter().map(|p| p.key_count).sum();
                    stats.avg_keys_per_partition =
                        total_keys as f64 / range_partitions.len() as f64;

                    // Calculate standard deviation
                    let variance: f64 = range_partitions
                        .iter()
                        .map(|p| {
                            let diff = p.key_count as f64 - stats.avg_keys_per_partition;
                            diff * diff
                        })
                        .sum::<f64>()
                        / range_partitions.len() as f64;

                    stats.key_distribution_stddev = variance.sqrt();
                }
            }
            _ => {}
        }

        stats
    }

    /// Get all virtual nodes (for debugging/monitoring)
    pub async fn get_virtual_nodes(&self) -> Vec<VirtualNode> {
        self.hash_ring.read().await.clone()
    }

    /// Get all range partitions
    pub async fn get_range_partitions(&self) -> Vec<RangePartition> {
        self.range_partitions.read().await.clone()
    }

    /// Check if rebalancing is needed
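    ///
    /// Compares the coefficient of variation (stddev / mean) of keys per partition
    /// against the configured threshold; returns `false` when auto-rebalancing is
    /// disabled or no keys have been recorded.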
    pub async fn check_rebalancing_needed(&self) -> bool {
        if !self.config.enable_auto_rebalancing {
            return false;
        }

        let stats = self.get_stats().await;

        if stats.avg_keys_per_partition == 0.0 {
            return false;
        }

        stats.key_distribution_stddev / stats.avg_keys_per_partition
            > self.config.rebalancing_threshold
    }

    /// Perform rebalancing if needed
    pub async fn perform_rebalancing(&self) {
        if !self.check_rebalancing_needed().await {
            return;
        }

        match self.config.strategy {
            PartitionStrategy::RangeBased | PartitionStrategy::Hybrid => {
                self.rebalance_ranges().await;
            }
            PartitionStrategy::ConsistentHashing => {
                // Consistent hashing rebalances implicitly via virtual nodes
                warn!("Rebalancing requested, but consistent hashing rebalances implicitly via virtual nodes");
            }
        }
    }

    /// Clear all data
    pub async fn clear(&self) {
        self.hash_ring.write().await.clear();
        self.range_partitions.write().await.clear();
        self.node_partitions.write().await.clear();
        self.active_nodes.write().await.clear();
        *self.stats.write().await = PartitioningStats::default();
    }

    /// Get load distribution (fraction of data per node, 0.0-1.0)
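    ///
    /// A minimal sketch (node IDs are illustrative); fractions sum to roughly 1.0
    /// when data is present:
    ///
    /// ```ignore
    /// let distribution = partitioning.get_load_distribution().await;
    /// for (node, fraction) in &distribution {
    ///     println!("node {} holds ~{:.1}% of the data", node, fraction * 100.0);
    /// }
    /// ```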
    pub async fn get_load_distribution(&self) -> BTreeMap<OxirsNodeId, f64> {
        let mut distribution = BTreeMap::new();

        match self.config.strategy {
            PartitionStrategy::ConsistentHashing | PartitionStrategy::Hybrid => {
                let hash_ring = self.hash_ring.read().await;
                let total_vnodes = hash_ring.len() as f64;

                for vnode in hash_ring.iter() {
                    *distribution.entry(vnode.physical_node).or_insert(0.0) += 1.0 / total_vnodes;
                }
            }
            PartitionStrategy::RangeBased => {
                let range_partitions = self.range_partitions.read().await;
                let total_keys: usize = range_partitions.iter().map(|p| p.key_count).sum();

                if total_keys > 0 {
                    for partition in range_partitions.iter() {
                        *distribution.entry(partition.node_id).or_insert(0.0) +=
                            partition.key_count as f64 / total_keys as f64;
                    }
                }
            }
        }

        distribution
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_partitioning_creation() {
        let config = PartitionConfig::default();
        let partitioning = AdvancedPartitioning::new(config);

        let stats = partitioning.get_stats().await;
        assert_eq!(stats.total_partitions, 0);
        assert_eq!(stats.total_virtual_nodes, 0);
    }

    #[tokio::test]
    async fn test_register_node_consistent_hashing() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 10,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;

        let stats = partitioning.get_stats().await;
        assert_eq!(stats.total_virtual_nodes, 10);

        let vnodes = partitioning.get_virtual_nodes().await;
        assert_eq!(vnodes.len(), 10);
    }

    #[tokio::test]
    async fn test_register_multiple_nodes() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 5,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;
        partitioning.register_node(3).await;

        let stats = partitioning.get_stats().await;
        assert_eq!(stats.total_virtual_nodes, 15);
    }

    #[tokio::test]
    async fn test_unregister_node() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 10,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;

        partitioning.unregister_node(1).await;

        let stats = partitioning.get_stats().await;
        assert_eq!(stats.total_virtual_nodes, 10);
    }

    #[tokio::test]
    async fn test_consistent_hash_assignment() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 50,
            replication_factor: 3,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;
        partitioning.register_node(3).await;

        let assignment = partitioning.get_partition_assignment("test_key").await;
        assert!(assignment.is_some());

        let assignment = assignment.unwrap();
        assert_eq!(assignment.replica_nodes.len(), 2); // replication_factor - 1
    }

    #[tokio::test]
    async fn test_range_based_partitioning() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;

        let partitions = partitioning.get_range_partitions().await;
        assert_eq!(partitions.len(), 2);
    }

    #[tokio::test]
    async fn test_range_assignment() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            replication_factor: 2,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;

        let assignment = partitioning.get_partition_assignment("test_key").await;
        assert!(assignment.is_some());

        let assignment = assignment.unwrap();
        assert!(assignment.replica_nodes.len() <= 1);
    }

    #[tokio::test]
    async fn test_update_partition_stats() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;

        partitioning.update_partition_stats("test_key", 100).await;

        let stats = partitioning.get_stats().await;
        assert!(stats.avg_keys_per_partition > 0.0);
    }

    #[tokio::test]
    async fn test_load_distribution() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 100,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;

        let distribution = partitioning.get_load_distribution().await;
        assert_eq!(distribution.len(), 2);

        // Each node should have roughly 50% of virtual nodes
        for (_, load) in distribution.iter() {
            assert!(*load > 0.4 && *load < 0.6);
        }
    }

    #[tokio::test]
    async fn test_hash_key_deterministic() {
        let hash1 = AdvancedPartitioning::hash_key("test_key");
        let hash2 = AdvancedPartitioning::hash_key("test_key");
        assert_eq!(hash1, hash2);

        let hash3 = AdvancedPartitioning::hash_key("different_key");
        assert_ne!(hash1, hash3);
    }

    #[tokio::test]
    async fn test_rebalancing_needed() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::RangeBased,
            enable_auto_rebalancing: true,
            rebalancing_threshold: 0.1,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;

        // Initially balanced
        let needed = partitioning.check_rebalancing_needed().await;
        assert!(!needed);
    }

    #[tokio::test]
    async fn test_clear() {
        let config = PartitionConfig::default();
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;
        partitioning.register_node(2).await;

        partitioning.clear().await;

        let stats = partitioning.get_stats().await;
        assert_eq!(stats.total_virtual_nodes, 0);
        assert_eq!(stats.total_partitions, 0);
    }

    #[tokio::test]
    async fn test_virtual_node_ring_sorted() {
        let config = PartitionConfig {
            strategy: PartitionStrategy::ConsistentHashing,
            virtual_nodes_per_node: 20,
            ..Default::default()
        };
        let partitioning = AdvancedPartitioning::new(config);

        partitioning.register_node(1).await;

        let vnodes = partitioning.get_virtual_nodes().await;

        // Verify sorted order
        for i in 1..vnodes.len() {
            assert!(vnodes[i].hash_position >= vnodes[i - 1].hash_position);
        }
    }
}