scirs2_metrics/optimization/distributed/sharding/
mod.rs

//! Data sharding and distribution management
//!
//! This module provides comprehensive data sharding capabilities:
//! - Consistent hashing for shard distribution
//! - Dynamic resharding and rebalancing
//! - Shard migration and replication
//! - Data locality optimization
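//!
//! A minimal usage sketch (marked `ignore` since it is illustrative only; the
//! import path is assumed from this file's location in the crate):
//!
//! ```ignore
//! use scirs2_metrics::optimization::distributed::sharding::{
//!     HashFunction, ShardManager, ShardingConfig, ShardingStrategy,
//! };
//!
//! let config = ShardingConfig {
//!     strategy: ShardingStrategy::Hash,
//!     shard_count: 4,
//!     replication_factor: 2,
//!     hash_function: HashFunction::Murmur3,
//!     virtual_nodes: 256,
//!     dynamic_resharding: true,
//!     migration_threshold: 0.8,
//! };
//! let mut manager = ShardManager::new(config);
//! manager
//!     .initialize(vec!["node1".to_string(), "node2".to_string()])
//!     .unwrap();
//! let node = manager.get_node_for_key("metric:cpu_usage").unwrap();
//! ```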

use crate::error::{MetricsError, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};

pub use super::config::{HashFunction, ShardingConfig, ShardingStrategy};

/// Data shard representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataShard {
    /// Shard ID
    pub id: String,
    /// Shard range
    pub range: DataRange,
    /// Primary node for this shard
    pub primary_node: String,
    /// Replica nodes
    pub replicas: Vec<String>,
    /// Data size (bytes)
    pub size_bytes: u64,
    /// Number of keys in shard
    pub key_count: usize,
    /// Last access time
    pub last_access: SystemTime,
    /// Shard status
    pub status: ShardStatus,
    /// Migration info (if being migrated)
    pub migration: Option<ShardMigration>,
}

/// Shard status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ShardStatus {
    /// Shard is active and serving requests
    Active,
    /// Shard is being migrated
    Migrating,
    /// Shard is being split
    Splitting,
    /// Shard is being merged
    Merging,
    /// Shard is inactive/offline
    Inactive,
    /// Shard is in error state
    Error(String),
}

/// Data range for sharding
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataRange {
    /// Hash range (start_hash, end_hash)
    Hash { start: u64, end: u64 },
    /// Key range (start_key, end_key)
    Key { start: String, end: String },
    /// Numeric range (start, end)
    Numeric { start: f64, end: f64 },
    /// Time range
    Time { start: SystemTime, end: SystemTime },
    /// Geographic range
    Geographic {
        lat_min: f64,
        lat_max: f64,
        lon_min: f64,
        lon_max: f64,
    },
    /// Custom range
    Custom {
        range_type: String,
        range_data: Vec<u8>,
    },
}

impl DataRange {
    /// Check if a key falls within this range
    pub fn contains_key(&self, key: &str) -> bool {
        match self {
            DataRange::Hash { start, end } => {
                let hash = self.hash_key(key);
                hash >= *start && hash <= *end
            }
            DataRange::Key { start, end } => key >= start.as_str() && key <= end.as_str(),
            DataRange::Numeric { start, end } => {
                if let Ok(num) = key.parse::<f64>() {
                    num >= *start && num <= *end
                } else {
                    false
                }
            }
            DataRange::Time { start, end } => {
                // Attempt to parse the key as a Unix timestamp in seconds
                if let Ok(secs) = key.parse::<u64>() {
                    if let Some(timestamp) =
                        SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(secs))
                    {
                        timestamp >= *start && timestamp <= *end
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
            DataRange::Geographic { .. } => {
                // Would need to parse geographic coordinates from the key;
                // not yet implemented, so no key matches
                false
            }
            DataRange::Custom { .. } => {
                // Custom matching logic would be implemented here
                false
            }
        }
    }

    /// Hash a key (simplified: always uses `DefaultHasher`, regardless of the
    /// configured hash function)
    fn hash_key(&self, key: &str) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish()
    }

    /// Check if this range overlaps with another
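    ///
    /// Ranges of the same variant overlap when their bounds intersect; ranges of
    /// different variants never overlap. For example, `Hash { start: 0, end: 10 }`
    /// overlaps `Hash { start: 5, end: 20 }` but not `Hash { start: 11, end: 20 }`.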
    pub fn overlaps_with(&self, other: &DataRange) -> bool {
        match (self, other) {
            (DataRange::Hash { start: s1, end: e1 }, DataRange::Hash { start: s2, end: e2 }) => {
                s1 <= e2 && s2 <= e1
            }
            (DataRange::Key { start: s1, end: e1 }, DataRange::Key { start: s2, end: e2 }) => {
                s1 <= e2 && s2 <= e1
            }
            (
                DataRange::Numeric { start: s1, end: e1 },
                DataRange::Numeric { start: s2, end: e2 },
            ) => s1 <= e2 && s2 <= e1,
            (DataRange::Time { start: s1, end: e1 }, DataRange::Time { start: s2, end: e2 }) => {
                s1 <= e2 && s2 <= e1
            }
            _ => false, // Different range types don't overlap
        }
    }

    /// Split this range into two ranges
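    ///
    /// Hash, key, numeric, and time ranges split at (roughly) their midpoint;
    /// geographic and custom ranges cannot be split and return an error.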
    pub fn split(&self) -> Result<(DataRange, DataRange)> {
        match self {
            DataRange::Hash { start, end } => {
                let mid = start + (end - start) / 2;
                Ok((
                    DataRange::Hash {
                        start: *start,
                        end: mid,
                    },
                    DataRange::Hash {
                        start: mid + 1,
                        end: *end,
                    },
                ))
            }
            DataRange::Key { start, end } => {
                // Simple string-based split (could be improved)
                let mid = format!("{}_{}", start, end);
                Ok((
                    DataRange::Key {
                        start: start.clone(),
                        end: mid.clone(),
                    },
                    DataRange::Key {
                        start: mid,
                        end: end.clone(),
                    },
                ))
            }
            DataRange::Numeric { start, end } => {
                let mid = start + (end - start) / 2.0;
                Ok((
                    DataRange::Numeric {
                        start: *start,
                        end: mid,
                    },
                    DataRange::Numeric {
                        start: mid,
                        end: *end,
                    },
                ))
            }
            DataRange::Time { start, end } => {
                let duration = end
                    .duration_since(*start)
                    .map_err(|_| MetricsError::ShardingError("Invalid time range".to_string()))?;
                let mid_duration = duration / 2;
                let mid = *start + mid_duration;
                Ok((
                    DataRange::Time {
                        start: *start,
                        end: mid,
                    },
                    DataRange::Time {
                        start: mid,
                        end: *end,
                    },
                ))
            }
            _ => Err(MetricsError::ShardingError(
                "Cannot split this range type".to_string(),
            )),
        }
    }
}

/// Shard migration information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardMigration {
    /// Migration ID
    pub id: String,
    /// Source node
    pub source_node: String,
    /// Target node
    pub target_node: String,
    /// Migration progress (0.0 - 1.0)
    pub progress: f64,
    /// Started time
    pub started_at: SystemTime,
    /// Estimated completion time
    pub estimated_completion: Option<SystemTime>,
    /// Migration status
    pub status: MigrationStatus,
    /// Error message (if failed)
    pub error: Option<String>,
}

/// Migration status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MigrationStatus {
    /// Migration is planned but not started
    Planned,
    /// Migration is in progress
    InProgress,
    /// Migration completed successfully
    Completed,
    /// Migration failed
    Failed,
    /// Migration was cancelled
    Cancelled,
}

/// Shard manager for handling sharding operations
#[derive(Debug)]
pub struct ShardManager {
    /// Sharding configuration
    config: ShardingConfig,
    /// Current shards
    shards: Arc<RwLock<HashMap<String, DataShard>>>,
    /// Node assignments
    node_assignments: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Consistent hash ring (for consistent hashing)
    hash_ring: Arc<RwLock<BTreeMap<u64, String>>>,
    /// Active migrations
    migrations: Arc<RwLock<HashMap<String, ShardMigration>>>,
    /// Statistics
    stats: ShardingStats,
}

impl ShardManager {
    /// Create a new shard manager
    pub fn new(config: ShardingConfig) -> Self {
        Self {
            config,
            shards: Arc::new(RwLock::new(HashMap::new())),
            node_assignments: Arc::new(RwLock::new(HashMap::new())),
            hash_ring: Arc::new(RwLock::new(BTreeMap::new())),
            migrations: Arc::new(RwLock::new(HashMap::new())),
            stats: ShardingStats::default(),
        }
    }

    /// Initialize sharding with available nodes
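    ///
    /// Only the `ConsistentHash`, `Hash`, and `Range` strategies are currently
    /// implemented; any other strategy returns a `ShardingError`.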
    pub fn initialize(&mut self, nodes: Vec<String>) -> Result<()> {
        match self.config.strategy {
            ShardingStrategy::ConsistentHash => {
                self.initialize_consistent_hash(nodes)?;
            }
            ShardingStrategy::Hash => {
                self.initialize_hash_sharding(nodes)?;
            }
            ShardingStrategy::Range => {
                self.initialize_range_sharding(nodes)?;
            }
            _ => {
                return Err(MetricsError::ShardingError(
                    "Sharding strategy not implemented".to_string(),
                ));
            }
        }

        Ok(())
    }

    /// Initialize consistent hash ring
    fn initialize_consistent_hash(&mut self, nodes: Vec<String>) -> Result<()> {
        let mut hash_ring = self.hash_ring.write().expect("hash ring lock poisoned");
        let mut shards = self.shards.write().expect("shards lock poisoned");

        hash_ring.clear();
        shards.clear();

        // Add virtual nodes to the hash ring
        for node in &nodes {
            for i in 0..self.config.virtual_nodes {
                let virtual_node_key = format!("{}:{}", node, i);
                let hash = self.hash_string(&virtual_node_key);
                hash_ring.insert(hash, node.clone());
            }
        }

        // Create shards based on hash ring
        let mut prev_hash = 0u64;
        let ring_keys: Vec<u64> = hash_ring.keys().cloned().collect();

        for (i, &hash) in ring_keys.iter().enumerate() {
            let shard_id = format!("shard_{}", i);
            let node = hash_ring
                .get(&hash)
                .expect("hash was just inserted")
                .clone();

            let shard = DataShard {
                id: shard_id.clone(),
                range: DataRange::Hash {
                    start: prev_hash,
                    end: hash,
                },
                primary_node: node.clone(),
                replicas: self.select_replicas(&node, &nodes),
                size_bytes: 0,
                key_count: 0,
                last_access: SystemTime::now(),
                status: ShardStatus::Active,
                migration: None,
            };

            shards.insert(shard_id, shard);
            prev_hash = hash + 1;
        }

        // Cover the wrap-around segment above the highest ring position so that
        // every possible hash value maps to a shard; by consistent-hashing
        // convention these keys belong to the first node on the ring.
        if let Some(&last_hash) = ring_keys.last() {
            if last_hash < u64::MAX {
                let node = hash_ring
                    .values()
                    .next()
                    .expect("ring is non-empty")
                    .clone();
                let shard_id = format!("shard_{}", ring_keys.len());
                let shard = DataShard {
                    id: shard_id.clone(),
                    range: DataRange::Hash {
                        start: last_hash + 1,
                        end: u64::MAX,
                    },
                    primary_node: node.clone(),
                    replicas: self.select_replicas(&node, &nodes),
                    size_bytes: 0,
                    key_count: 0,
                    last_access: SystemTime::now(),
                    status: ShardStatus::Active,
                    migration: None,
                };
                shards.insert(shard_id, shard);
            }
        }

        Ok(())
    }

    /// Initialize hash-based sharding
    fn initialize_hash_sharding(&mut self, nodes: Vec<String>) -> Result<()> {
        if nodes.is_empty() || self.config.shard_count == 0 {
            return Err(MetricsError::ShardingError(
                "Hash sharding requires at least one node and one shard".to_string(),
            ));
        }

        let mut shards = self.shards.write().expect("shards lock poisoned");
        shards.clear();

        let hash_range_size = u64::MAX / self.config.shard_count as u64;

        for i in 0..self.config.shard_count {
            let shard_id = format!("shard_{}", i);
            let start_hash = i as u64 * hash_range_size;
            let end_hash = if i == self.config.shard_count - 1 {
                u64::MAX
            } else {
                (i + 1) as u64 * hash_range_size - 1
            };

            let node = &nodes[i % nodes.len()];

            let shard = DataShard {
                id: shard_id.clone(),
                range: DataRange::Hash {
                    start: start_hash,
                    end: end_hash,
                },
                primary_node: node.clone(),
                replicas: self.select_replicas(node, &nodes),
                size_bytes: 0,
                key_count: 0,
                last_access: SystemTime::now(),
                status: ShardStatus::Active,
                migration: None,
            };

            shards.insert(shard_id, shard);
        }

        Ok(())
    }

    /// Initialize range-based sharding
    fn initialize_range_sharding(&mut self, nodes: Vec<String>) -> Result<()> {
        if nodes.is_empty() || self.config.shard_count == 0 {
            return Err(MetricsError::ShardingError(
                "Range sharding requires at least one node and one shard".to_string(),
            ));
        }

        let mut shards = self.shards.write().expect("shards lock poisoned");
        shards.clear();

        // For range sharding, we'll use key-based ranges.
        // This is a simplified implementation: shard i covers the zero-padded
        // key range [i * 1000, (i + 1) * 1000 - 1].
        for i in 0..self.config.shard_count {
            let shard_id = format!("shard_{}", i);
            let start_key = format!("{:04}", i * 1000);
            let end_key = format!("{:04}", (i + 1) * 1000 - 1);

            let node = &nodes[i % nodes.len()];

            let shard = DataShard {
                id: shard_id.clone(),
                range: DataRange::Key {
                    start: start_key,
                    end: end_key,
                },
                primary_node: node.clone(),
                replicas: self.select_replicas(node, &nodes),
                size_bytes: 0,
                key_count: 0,
                last_access: SystemTime::now(),
                status: ShardStatus::Active,
                migration: None,
            };

            shards.insert(shard_id, shard);
        }

        Ok(())
    }

    /// Select replica nodes for a primary node
    fn select_replicas(&self, primary: &str, all_nodes: &[String]) -> Vec<String> {
        // Take the first `replication_factor - 1` nodes other than the primary;
        // `saturating_sub` guards against underflow when the factor is zero.
        all_nodes
            .iter()
            .filter(|node| node.as_str() != primary)
            .take(self.config.replication_factor.saturating_sub(1))
            .cloned()
            .collect()
    }

    /// Find the shard for a given key
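    ///
    /// This is a linear scan over all shards, so it runs in O(n) in the number
    /// of shards; each shard's range is checked in turn.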
    pub fn find_shard(&self, key: &str) -> Result<String> {
        let shards = self.shards.read().expect("shards lock poisoned");

        for shard in shards.values() {
            if shard.range.contains_key(key) {
                return Ok(shard.id.clone());
            }
        }

        Err(MetricsError::ShardingError(format!(
            "No shard found for key '{}'",
            key
        )))
    }

    /// Get the node responsible for a key
    pub fn get_node_for_key(&self, key: &str) -> Result<String> {
        match self.config.strategy {
            ShardingStrategy::ConsistentHash => self.get_node_consistent_hash(key),
            _ => {
                let shard_id = self.find_shard(key)?;
                let shards = self.shards.read().expect("shards lock poisoned");
                if let Some(shard) = shards.get(&shard_id) {
                    Ok(shard.primary_node.clone())
                } else {
                    Err(MetricsError::ShardingError("Shard not found".to_string()))
                }
            }
        }
    }

    /// Get node using consistent hashing
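    ///
    /// The key is hashed onto the ring and walked clockwise: the first virtual
    /// node at or after the key's hash owns it, wrapping around to the lowest
    /// ring position when the key hashes past the last virtual node.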
    fn get_node_consistent_hash(&self, key: &str) -> Result<String> {
        let hash_ring = self.hash_ring.read().expect("hash ring lock poisoned");
        if hash_ring.is_empty() {
            return Err(MetricsError::ShardingError(
                "Hash ring is empty".to_string(),
            ));
        }

        let key_hash = self.hash_string(key);

        // Find the first virtual node with hash >= key_hash
        if let Some((_, node)) = hash_ring.range(key_hash..).next() {
            return Ok(node.clone());
        }

        // Wrap around to the first node on the ring
        if let Some((_, node)) = hash_ring.iter().next() {
            Ok(node.clone())
        } else {
            Err(MetricsError::ShardingError(
                "No nodes in hash ring".to_string(),
            ))
        }
    }

    /// Hash a string using the configured hash function
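    ///
    /// Note: `Murmur3` and `XxHash` currently fall back to the standard
    /// library's `DefaultHasher`; only `Crc32` has a distinct implementation.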
    fn hash_string(&self, s: &str) -> u64 {
        match self.config.hash_function {
            HashFunction::Murmur3 | HashFunction::XxHash => {
                // Simplified hash using DefaultHasher
                use std::collections::hash_map::DefaultHasher;
                let mut hasher = DefaultHasher::new();
                s.hash(&mut hasher);
                hasher.finish()
            }
            HashFunction::Crc32 => {
                // Bitwise CRC32 using the reflected polynomial 0xEDB88320
                let mut crc = 0xFFFFFFFFu32;
                for byte in s.bytes() {
                    crc ^= byte as u32;
                    for _ in 0..8 {
                        if crc & 1 != 0 {
                            crc = (crc >> 1) ^ 0xEDB88320;
                        } else {
                            crc >>= 1;
                        }
                    }
                }
                (crc ^ 0xFFFFFFFF) as u64
            }
            _ => {
                // Default to the standard hasher
                use std::collections::hash_map::DefaultHasher;
                let mut hasher = DefaultHasher::new();
                s.hash(&mut hasher);
                hasher.finish()
            }
        }
    }

    /// Add a new node to the cluster
    pub fn add_node(&mut self, node_id: String) -> Result<()> {
        match self.config.strategy {
            ShardingStrategy::ConsistentHash => self.add_node_consistent_hash(node_id),
            _ => {
                // For other strategies, we might need to rebalance shards
                self.rebalance_shards_with_new_node(node_id)
            }
        }
    }

    /// Add node to consistent hash ring
    fn add_node_consistent_hash(&mut self, node_id: String) -> Result<()> {
        {
            let mut hash_ring = self.hash_ring.write().expect("hash ring lock poisoned");

            // Add virtual nodes for the new node
            for i in 0..self.config.virtual_nodes {
                let virtual_node_key = format!("{}:{}", node_id, i);
                let hash = self.hash_string(&virtual_node_key);
                hash_ring.insert(hash, node_id.clone());
            }
        } // hash_ring lock is dropped here

        // Trigger shard migration for rebalancing (currently a stub; see
        // `trigger_rebalancing`)
        self.trigger_rebalancing()?;

        Ok(())
    }

    /// Remove a node from the cluster
    pub fn remove_node(&mut self, node_id: &str) -> Result<()> {
        match self.config.strategy {
            ShardingStrategy::ConsistentHash => self.remove_node_consistent_hash(node_id),
            _ => self.migrate_shards_from_node(node_id),
        }
    }

    /// Remove node from consistent hash ring
    fn remove_node_consistent_hash(&mut self, node_id: &str) -> Result<()> {
        {
            let mut hash_ring = self.hash_ring.write().expect("hash ring lock poisoned");

            // Remove all virtual nodes for this node
            hash_ring.retain(|_, node| node != node_id);
        } // hash_ring lock is dropped here

        // Migrate shards off the removed node
        self.migrate_shards_from_node(node_id)?;

        Ok(())
    }

    /// Rebalance shards with a new node
    fn rebalance_shards_with_new_node(&mut self, _node_id: String) -> Result<()> {
        // TODO: Implement shard rebalancing logic
        self.trigger_rebalancing()
    }

    /// Migrate shards away from a node being removed
    fn migrate_shards_from_node(&mut self, node_id: &str) -> Result<()> {
        let shards = self.shards.read().expect("shards lock poisoned");
        let affected_shards: Vec<_> = shards
            .values()
            .filter(|shard| shard.primary_node == node_id)
            .map(|shard| shard.id.clone())
            .collect();
        drop(shards);

        for shard_id in affected_shards {
            self.migrate_shard(&shard_id, None)?;
        }

        Ok(())
    }

    /// Trigger cluster rebalancing
    fn trigger_rebalancing(&mut self) -> Result<()> {
        // TODO: Implement rebalancing logic
        // This would analyze current shard distribution and trigger migrations
        // to achieve better balance
        Ok(())
    }

    /// Migrate a shard to a different node
    pub fn migrate_shard(&mut self, shard_id: &str, target_node: Option<String>) -> Result<String> {
        let migration_id = {
            let mut shards = self.shards.write().expect("shards lock poisoned");
            let mut migrations = self.migrations.write().expect("migrations lock poisoned");

            let shard = shards
                .get_mut(shard_id)
                .ok_or_else(|| MetricsError::ShardingError("Shard not found".to_string()))?;

            if shard.status == ShardStatus::Migrating {
                return Err(MetricsError::ShardingError(
                    "Shard is already being migrated".to_string(),
                ));
            }

            // Select target node if not provided
            let target = target_node.unwrap_or_else(|| {
                // Simple selection: pick the first replica or a default node
                shard
                    .replicas
                    .first()
                    .cloned()
                    .unwrap_or_else(|| "default_node".to_string())
            });

            let migration_id = format!(
                "migration_{}_{}",
                shard_id,
                SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("system clock is before the Unix epoch")
                    .as_millis()
            );

            let migration = ShardMigration {
                id: migration_id.clone(),
                source_node: shard.primary_node.clone(),
                target_node: target.clone(),
                progress: 0.0,
                started_at: SystemTime::now(),
                estimated_completion: None,
                status: MigrationStatus::Planned,
                error: None,
            };

            shard.status = ShardStatus::Migrating;
            shard.migration = Some(migration.clone());
            migrations.insert(migration_id.clone(), migration);

            migration_id
        }; // Drop locks here before calling start_migration

        // Kick off the (currently stubbed) migration process
        self.start_migration(&migration_id)?;

        Ok(migration_id)
    }

    /// Start a migration process
    fn start_migration(&mut self, migration_id: &str) -> Result<()> {
        let mut migrations = self.migrations.write().expect("migrations lock poisoned");

        if let Some(migration) = migrations.get_mut(migration_id) {
            migration.status = MigrationStatus::InProgress;
            // TODO: Implement actual migration logic
            // This would involve copying data from source to target
        }

        Ok(())
    }

    /// Complete a migration
    pub fn complete_migration(&mut self, migration_id: &str) -> Result<()> {
        // Acquire the locks in the same order as `migrate_shard` (shards first,
        // then migrations) to avoid lock-order inversion.
        let mut shards = self.shards.write().expect("shards lock poisoned");
        let mut migrations = self.migrations.write().expect("migrations lock poisoned");

        let migration = migrations
            .get_mut(migration_id)
            .ok_or_else(|| MetricsError::ShardingError("Migration not found".to_string()))?;

        migration.status = MigrationStatus::Completed;
        migration.progress = 1.0;

        // Find and update the shard
        for shard in shards.values_mut() {
            if let Some(ref shard_migration) = shard.migration {
                if shard_migration.id == migration_id {
                    shard.primary_node = migration.target_node.clone();
                    shard.status = ShardStatus::Active;
                    shard.migration = None;
                    break;
                }
            }
        }

        Ok(())
    }

    /// Get sharding statistics
    pub fn get_stats(&self) -> ShardingStats {
        let shards = self.shards.read().expect("shards lock poisoned");
        let migrations = self.migrations.read().expect("migrations lock poisoned");

        let total_shards = shards.len();
        let active_migrations = migrations
            .values()
            .filter(|m| m.status == MigrationStatus::InProgress)
            .count();

        let total_size: u64 = shards.values().map(|s| s.size_bytes).sum();
        let total_keys: usize = shards.values().map(|s| s.key_count).sum();

        ShardingStats {
            total_shards,
            active_migrations,
            total_size_bytes: total_size,
            total_keys,
            replication_factor: self.config.replication_factor,
            last_rebalance: SystemTime::now(), // Simplified: not tracked yet
        }
    }

    /// List all shards
    pub fn list_shards(&self) -> Vec<DataShard> {
        let shards = self.shards.read().expect("shards lock poisoned");
        shards.values().cloned().collect()
    }

    /// Get shard by ID
    pub fn get_shard(&self, shard_id: &str) -> Option<DataShard> {
        let shards = self.shards.read().expect("shards lock poisoned");
        shards.get(shard_id).cloned()
    }

    /// Update shard statistics
    pub fn update_shard_stats(
        &mut self,
        shard_id: &str,
        size_bytes: u64,
        key_count: usize,
    ) -> Result<()> {
        let mut shards = self.shards.write().expect("shards lock poisoned");

        if let Some(shard) = shards.get_mut(shard_id) {
            shard.size_bytes = size_bytes;
            shard.key_count = key_count;
            shard.last_access = SystemTime::now();
            Ok(())
        } else {
            Err(MetricsError::ShardingError("Shard not found".to_string()))
        }
    }
}

/// Sharding statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardingStats {
    /// Total number of shards
    pub total_shards: usize,
    /// Number of active migrations
    pub active_migrations: usize,
    /// Total data size across all shards
    pub total_size_bytes: u64,
    /// Total number of keys
    pub total_keys: usize,
    /// Replication factor
    pub replication_factor: usize,
    /// Last rebalance time
    pub last_rebalance: SystemTime,
}

impl Default for ShardingStats {
    fn default() -> Self {
        Self {
            total_shards: 0,
            active_migrations: 0,
            total_size_bytes: 0,
            total_keys: 0,
            replication_factor: 1,
            last_rebalance: SystemTime::now(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_data_range_contains_key() {
        // A full-width hash range must contain every key, whatever it hashes to
        let hash_range = DataRange::Hash {
            start: 0,
            end: u64::MAX,
        };
        assert!(hash_range.contains_key("test"));

        let key_range = DataRange::Key {
            start: "a".to_string(),
            end: "z".to_string(),
        };
        assert!(key_range.contains_key("m"));
        assert!(!key_range.contains_key("z1"));

        let numeric_range = DataRange::Numeric {
            start: 10.0,
            end: 20.0,
        };
        assert!(numeric_range.contains_key("15"));
        assert!(!numeric_range.contains_key("25"));
    }

    #[test]
    fn test_data_range_split() {
        let hash_range = DataRange::Hash {
            start: 1000,
            end: 2000,
        };
        let (left, right) = hash_range.split().expect("hash range should split");

        if let (DataRange::Hash { start: s1, end: e1 }, DataRange::Hash { start: s2, end: e2 }) =
            (left, right)
        {
            assert_eq!(s1, 1000);
            assert_eq!(e2, 2000);
            assert_eq!(e1 + 1, s2);
        } else {
            panic!("Unexpected range types after split");
        }
    }
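
    // Sketch of the overlap semantics documented on `overlaps_with`: same-variant
    // ranges overlap when their bounds intersect; different variants never do.
    #[test]
    fn test_data_range_overlaps() {
        let a = DataRange::Hash { start: 0, end: 100 };
        let b = DataRange::Hash { start: 50, end: 150 };
        let c = DataRange::Hash {
            start: 200,
            end: 300,
        };
        assert!(a.overlaps_with(&b));
        assert!(!a.overlaps_with(&c));

        let k = DataRange::Key {
            start: "a".to_string(),
            end: "z".to_string(),
        };
        assert!(!a.overlaps_with(&k));
    }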

    #[test]
    fn test_shard_manager_creation() {
        let config = ShardingConfig::default();
        let manager = ShardManager::new(config);
        assert_eq!(manager.list_shards().len(), 0);
    }
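
    // Sketch: per-shard statistics should roll up into the aggregate stats.
    // Uses the same explicit config style as the other tests.
    #[test]
    fn test_update_shard_stats() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::Hash,
            shard_count: 2,
            replication_factor: 1,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        manager
            .initialize(vec!["node1".to_string(), "node2".to_string()])
            .expect("initialization failed");

        let shard_id = manager.list_shards()[0].id.clone();
        manager
            .update_shard_stats(&shard_id, 1024, 10)
            .expect("stats update failed");

        let stats = manager.get_stats();
        assert_eq!(stats.total_size_bytes, 1024);
        assert_eq!(stats.total_keys, 10);
    }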

    #[test]
    fn test_shard_manager_initialization() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::Hash,
            shard_count: 4,
            replication_factor: 2,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec![
            "node1".to_string(),
            "node2".to_string(),
            "node3".to_string(),
        ];

        manager.initialize(nodes).expect("initialization failed");
        assert_eq!(manager.list_shards().len(), 4);
    }

    #[test]
    fn test_find_shard() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::Hash,
            shard_count: 2,
            replication_factor: 1,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];

        manager.initialize(nodes).expect("initialization failed");

        // Test that we can find a shard for any key
        let shard_id = manager.find_shard("test_key");
        assert!(shard_id.is_ok());
    }
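
    // Sketch: with hash sharding, every key should route to one of the
    // configured nodes via the shard table.
    #[test]
    fn test_get_node_for_key() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::Hash,
            shard_count: 2,
            replication_factor: 1,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];
        manager
            .initialize(nodes.clone())
            .expect("initialization failed");

        let node = manager
            .get_node_for_key("test_key")
            .expect("routing failed");
        assert!(nodes.contains(&node));
    }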

    #[test]
    fn test_shard_migration() {
        let config = ShardingConfig::default();
        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];

        manager.initialize(nodes).expect("initialization failed");
        let shards = manager.list_shards();

        if let Some(shard) = shards.first() {
            let migration_id = manager.migrate_shard(&shard.id, Some("node2".to_string()));
            assert!(migration_id.is_ok());
        }
    }

    #[test]
    fn test_consistent_hash_node_operations() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::ConsistentHash,
            shard_count: 4,
            replication_factor: 2,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 4, // Small number for testing
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];

        manager.initialize(nodes).expect("initialization failed");

        // Test adding a node
        manager
            .add_node("node3".to_string())
            .expect("adding a node failed");

        // Test removing a node
        manager.remove_node("node1").expect("removing a node failed");
    }
}
947}