1use crate::error::{MetricsError, Result};
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, HashMap, HashSet};
12use std::hash::{Hash, Hasher};
13use std::net::SocketAddr;
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant, SystemTime};
16
17pub use crate::optimization::distributed::config::{
18 HashFunction, ShardingConfig, ShardingStrategy,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct DataShard {
24 pub id: String,
26 pub range: DataRange,
28 pub primary_node: String,
30 pub replicas: Vec<String>,
32 pub size_bytes: u64,
34 pub key_count: usize,
36 pub last_access: SystemTime,
38 pub status: ShardStatus,
40 pub migration: Option<ShardMigration>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
46pub enum ShardStatus {
47 Active,
49 Migrating,
51 Splitting,
53 Merging,
55 Inactive,
57 Error(String),
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub enum DataRange {
64 Hash { start: u64, end: u64 },
66 Key { start: String, end: String },
68 Numeric { start: f64, end: f64 },
70 Time { start: SystemTime, end: SystemTime },
72 Geographic {
74 lat_min: f64,
75 lat_max: f64,
76 lon_min: f64,
77 lon_max: f64,
78 },
79 Custom {
81 range_type: String,
82 range_data: Vec<u8>,
83 },
84}
85
86impl DataRange {
87 pub fn contains_key(&self, key: &str) -> bool {
89 match self {
90 DataRange::Hash { start, end } => {
91 let hash = self.hash_key(key);
92 hash >= *start && hash <= *end
93 }
94 DataRange::Key { start, end } => key >= start.as_str() && key <= end.as_str(),
95 DataRange::Numeric { start, end } => {
96 if let Ok(num) = key.parse::<f64>() {
97 num >= *start && num <= *end
98 } else {
99 false
100 }
101 }
102 DataRange::Time { start, end } => {
103 if let Ok(timestamp_str) = key.parse::<u64>() {
105 if let Some(timestamp) =
106 SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp_str))
107 {
108 timestamp >= *start && timestamp <= *end
109 } else {
110 false
111 }
112 } else {
113 false
114 }
115 }
116 DataRange::Geographic { .. } => {
117 false
120 }
121 DataRange::Custom { .. } => {
122 false
124 }
125 }
126 }
127
128 fn hash_key(&self, key: &str) -> u64 {
130 use std::collections::hash_map::DefaultHasher;
131 let mut hasher = DefaultHasher::new();
132 key.hash(&mut hasher);
133 hasher.finish()
134 }
135
136 pub fn overlaps_with(&self, other: &DataRange) -> bool {
138 match (self, other) {
139 (DataRange::Hash { start: s1, end: e1 }, DataRange::Hash { start: s2, end: e2 }) => {
140 s1 <= e2 && s2 <= e1
141 }
142 (DataRange::Key { start: s1, end: e1 }, DataRange::Key { start: s2, end: e2 }) => {
143 s1 <= e2 && s2 <= e1
144 }
145 (
146 DataRange::Numeric { start: s1, end: e1 },
147 DataRange::Numeric { start: s2, end: e2 },
148 ) => s1 <= e2 && s2 <= e1,
149 (DataRange::Time { start: s1, end: e1 }, DataRange::Time { start: s2, end: e2 }) => {
150 s1 <= e2 && s2 <= e1
151 }
152 _ => false, }
154 }
155
156 pub fn split(&self) -> Result<(DataRange, DataRange)> {
158 match self {
159 DataRange::Hash { start, end } => {
160 let mid = start + (end - start) / 2;
161 Ok((
162 DataRange::Hash {
163 start: *start,
164 end: mid,
165 },
166 DataRange::Hash {
167 start: mid + 1,
168 end: *end,
169 },
170 ))
171 }
172 DataRange::Key { start, end } => {
173 let mid = format!("{}_{}", start, end);
175 Ok((
176 DataRange::Key {
177 start: start.clone(),
178 end: mid.clone(),
179 },
180 DataRange::Key {
181 start: mid,
182 end: end.clone(),
183 },
184 ))
185 }
186 DataRange::Numeric { start, end } => {
187 let mid = start + (end - start) / 2.0;
188 Ok((
189 DataRange::Numeric {
190 start: *start,
191 end: mid,
192 },
193 DataRange::Numeric {
194 start: mid,
195 end: *end,
196 },
197 ))
198 }
199 DataRange::Time { start, end } => {
200 let duration = end
201 .duration_since(*start)
202 .map_err(|_| MetricsError::ShardingError("Invalid time range".to_string()))?;
203 let mid_duration = duration / 2;
204 let mid = *start + mid_duration;
205 Ok((
206 DataRange::Time {
207 start: *start,
208 end: mid,
209 },
210 DataRange::Time {
211 start: mid,
212 end: *end,
213 },
214 ))
215 }
216 _ => Err(MetricsError::ShardingError(
217 "Cannot split this range type".to_string(),
218 )),
219 }
220 }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct ShardMigration {
226 pub id: String,
228 pub source_node: String,
230 pub target_node: String,
232 pub progress: f64,
234 pub started_at: SystemTime,
236 pub estimated_completion: Option<SystemTime>,
238 pub status: MigrationStatus,
240 pub error: Option<String>,
242}
243
244#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
246pub enum MigrationStatus {
247 Planned,
249 InProgress,
251 Completed,
253 Failed,
255 Cancelled,
257}
258
259#[derive(Debug)]
261pub struct ShardManager {
262 config: ShardingConfig,
264 shards: Arc<RwLock<HashMap<String, DataShard>>>,
266 node_assignments: Arc<RwLock<HashMap<String, Vec<String>>>>,
268 hash_ring: Arc<RwLock<BTreeMap<u64, String>>>,
270 migrations: Arc<RwLock<HashMap<String, ShardMigration>>>,
272 stats: ShardingStats,
274}
275
276impl ShardManager {
277 pub fn new(config: ShardingConfig) -> Self {
279 Self {
280 config,
281 shards: Arc::new(RwLock::new(HashMap::new())),
282 node_assignments: Arc::new(RwLock::new(HashMap::new())),
283 hash_ring: Arc::new(RwLock::new(BTreeMap::new())),
284 migrations: Arc::new(RwLock::new(HashMap::new())),
285 stats: ShardingStats::default(),
286 }
287 }
288
289 pub fn initialize(&mut self, nodes: Vec<String>) -> Result<()> {
291 match self.config.strategy {
292 ShardingStrategy::ConsistentHash => {
293 self.initialize_consistent_hash(nodes)?;
294 }
295 ShardingStrategy::Hash => {
296 self.initialize_hash_sharding(nodes)?;
297 }
298 ShardingStrategy::Range => {
299 self.initialize_range_sharding(nodes)?;
300 }
301 _ => {
302 return Err(MetricsError::ShardingError(
303 "Sharding strategy not implemented".to_string(),
304 ));
305 }
306 }
307
308 Ok(())
309 }
310
311 fn initialize_consistent_hash(&mut self, nodes: Vec<String>) -> Result<()> {
313 let mut hash_ring = self.hash_ring.write().unwrap();
314 let mut shards = self.shards.write().unwrap();
315
316 hash_ring.clear();
317 shards.clear();
318
319 for node in &nodes {
321 for i in 0..self.config.virtual_nodes {
322 let virtual_node_key = format!("{}:{}", node, i);
323 let hash = self.hash_string(&virtual_node_key);
324 hash_ring.insert(hash, node.clone());
325 }
326 }
327
328 let mut prev_hash = 0u64;
330 let ring_keys: Vec<u64> = hash_ring.keys().cloned().collect();
331
332 for (i, &hash) in ring_keys.iter().enumerate() {
333 let shard_id = format!("shard_{}", i);
334 let node = hash_ring.get(&hash).unwrap().clone();
335
336 let shard = DataShard {
337 id: shard_id.clone(),
338 range: DataRange::Hash {
339 start: prev_hash,
340 end: hash,
341 },
342 primary_node: node.clone(),
343 replicas: self.select_replicas(&node, &nodes),
344 size_bytes: 0,
345 key_count: 0,
346 last_access: SystemTime::now(),
347 status: ShardStatus::Active,
348 migration: None,
349 };
350
351 shards.insert(shard_id, shard);
352 prev_hash = hash + 1;
353 }
354
355 Ok(())
356 }
357
358 fn initialize_hash_sharding(&mut self, nodes: Vec<String>) -> Result<()> {
360 let mut shards = self.shards.write().unwrap();
361 shards.clear();
362
363 let hash_range_size = u64::MAX / self.config.shard_count as u64;
364
365 for i in 0..self.config.shard_count {
366 let shard_id = format!("shard_{}", i);
367 let start_hash = i as u64 * hash_range_size;
368 let end_hash = if i == self.config.shard_count - 1 {
369 u64::MAX
370 } else {
371 (i + 1) as u64 * hash_range_size - 1
372 };
373
374 let node = &nodes[i % nodes.len()];
375
376 let shard = DataShard {
377 id: shard_id.clone(),
378 range: DataRange::Hash {
379 start: start_hash,
380 end: end_hash,
381 },
382 primary_node: node.clone(),
383 replicas: self.select_replicas(node, &nodes),
384 size_bytes: 0,
385 key_count: 0,
386 last_access: SystemTime::now(),
387 status: ShardStatus::Active,
388 migration: None,
389 };
390
391 shards.insert(shard_id, shard);
392 }
393
394 Ok(())
395 }
396
397 fn initialize_range_sharding(&mut self, nodes: Vec<String>) -> Result<()> {
399 let mut shards = self.shards.write().unwrap();
400 shards.clear();
401
402 for i in 0..self.config.shard_count {
405 let shard_id = format!("shard_{}", i);
406 let start_key = format!("{:04}", i * 1000);
407 let end_key = format!("{:04}", (i + 1) * 1000 - 1);
408
409 let node = &nodes[i % nodes.len()];
410
411 let shard = DataShard {
412 id: shard_id.clone(),
413 range: DataRange::Key {
414 start: start_key,
415 end: end_key,
416 },
417 primary_node: node.clone(),
418 replicas: self.select_replicas(node, &nodes),
419 size_bytes: 0,
420 key_count: 0,
421 last_access: SystemTime::now(),
422 status: ShardStatus::Active,
423 migration: None,
424 };
425
426 shards.insert(shard_id, shard);
427 }
428
429 Ok(())
430 }
431
432 fn select_replicas(&self, primary: &str, all_nodes: &[String]) -> Vec<String> {
434 let mut replicas = Vec::new();
435 let mut count = 0;
436
437 for node in all_nodes {
438 if node != primary && count < self.config.replication_factor - 1 {
439 replicas.push(node.clone());
440 count += 1;
441 }
442 }
443
444 replicas
445 }
446
447 pub fn find_shard(&self, key: &str) -> Result<String> {
449 let shards = self.shards.read().unwrap();
450
451 for shard in shards.values() {
452 if shard.range.contains_key(key) {
453 return Ok(shard.id.clone());
454 }
455 }
456
457 Err(MetricsError::ShardingError(
458 "No shard found for key".to_string(),
459 ))
460 }
461
462 pub fn get_node_for_key(&self, key: &str) -> Result<String> {
464 match self.config.strategy {
465 ShardingStrategy::ConsistentHash => self.get_node_consistent_hash(key),
466 _ => {
467 let shard_id = self.find_shard(key)?;
468 let shards = self.shards.read().unwrap();
469 if let Some(shard) = shards.get(&shard_id) {
470 Ok(shard.primary_node.clone())
471 } else {
472 Err(MetricsError::ShardingError("Shard not found".to_string()))
473 }
474 }
475 }
476 }
477
478 fn get_node_consistent_hash(&self, key: &str) -> Result<String> {
480 let hash_ring = self.hash_ring.read().unwrap();
481 if hash_ring.is_empty() {
482 return Err(MetricsError::ShardingError(
483 "Hash ring is empty".to_string(),
484 ));
485 }
486
487 let key_hash = self.hash_string(key);
488
489 for (&node_hash, node) in hash_ring.range(key_hash..) {
491 if node_hash >= key_hash {
492 return Ok(node.clone());
493 }
494 }
495
496 if let Some((_, node)) = hash_ring.iter().next() {
498 Ok(node.clone())
499 } else {
500 Err(MetricsError::ShardingError(
501 "No nodes in hash ring".to_string(),
502 ))
503 }
504 }
505
506 fn hash_string(&self, s: &str) -> u64 {
508 match self.config.hash_function {
509 HashFunction::Murmur3 | HashFunction::XxHash => {
510 use std::collections::hash_map::DefaultHasher;
512 let mut hasher = DefaultHasher::new();
513 s.hash(&mut hasher);
514 hasher.finish()
515 }
516 HashFunction::Crc32 => {
517 let mut crc = 0xFFFFFFFFu32;
519 for byte in s.bytes() {
520 crc ^= byte as u32;
521 for _ in 0..8 {
522 if crc & 1 != 0 {
523 crc = (crc >> 1) ^ 0xEDB88320;
524 } else {
525 crc >>= 1;
526 }
527 }
528 }
529 (crc ^ 0xFFFFFFFF) as u64
530 }
531 _ => {
532 use std::collections::hash_map::DefaultHasher;
534 let mut hasher = DefaultHasher::new();
535 s.hash(&mut hasher);
536 hasher.finish()
537 }
538 }
539 }
540
541 pub fn add_node(&mut self, node_id: String) -> Result<()> {
543 match self.config.strategy {
544 ShardingStrategy::ConsistentHash => self.add_node_consistent_hash(node_id),
545 _ => {
546 self.rebalance_shards_with_new_node(node_id)
548 }
549 }
550 }
551
552 fn add_node_consistent_hash(&mut self, node_id: String) -> Result<()> {
554 {
555 let mut hash_ring = self.hash_ring.write().unwrap();
556
557 for i in 0..self.config.virtual_nodes {
559 let virtual_node_key = format!("{}:{}", node_id, i);
560 let hash = self.hash_string(&virtual_node_key);
561 hash_ring.insert(hash, node_id.clone());
562 }
563 } self.trigger_rebalancing()?;
567
568 Ok(())
569 }
570
571 pub fn remove_node(&mut self, node_id: &str) -> Result<()> {
573 match self.config.strategy {
574 ShardingStrategy::ConsistentHash => self.remove_node_consistent_hash(node_id),
575 _ => self.migrate_shards_from_node(node_id),
576 }
577 }
578
579 fn remove_node_consistent_hash(&mut self, node_id: &str) -> Result<()> {
581 {
582 let mut hash_ring = self.hash_ring.write().unwrap();
583
584 hash_ring.retain(|_, node| node != node_id);
586 } self.migrate_shards_from_node(node_id)?;
590
591 Ok(())
592 }
593
594 fn rebalance_shards_with_new_node(&mut self, _node_id: String) -> Result<()> {
596 self.trigger_rebalancing()
598 }
599
600 fn migrate_shards_from_node(&mut self, node_id: &str) -> Result<()> {
602 let shards = self.shards.read().unwrap();
603 let affected_shards: Vec<_> = shards
604 .values()
605 .filter(|shard| shard.primary_node == node_id)
606 .map(|shard| shard.id.clone())
607 .collect();
608 drop(shards);
609
610 for shard_id in affected_shards {
611 self.migrate_shard(&shard_id, None)?;
612 }
613
614 Ok(())
615 }
616
617 fn trigger_rebalancing(&mut self) -> Result<()> {
619 Ok(())
623 }
624
625 pub fn migrate_shard(&mut self, shard_id: &str, target_node: Option<String>) -> Result<String> {
627 let migration_id = {
628 let mut shards = self.shards.write().unwrap();
629 let mut migrations = self.migrations.write().unwrap();
630
631 let shard = shards
632 .get_mut(shard_id)
633 .ok_or_else(|| MetricsError::ShardingError("Shard not found".to_string()))?;
634
635 if shard.status == ShardStatus::Migrating {
636 return Err(MetricsError::ShardingError(
637 "Shard is already being migrated".to_string(),
638 ));
639 }
640
641 let target = target_node.unwrap_or_else(|| {
643 shard
645 .replicas
646 .first()
647 .cloned()
648 .unwrap_or_else(|| "default_node".to_string())
649 });
650
651 let migration_id = format!(
652 "migration_{}_{}",
653 shard_id,
654 SystemTime::now()
655 .duration_since(std::time::UNIX_EPOCH)
656 .unwrap()
657 .as_millis()
658 );
659
660 let migration = ShardMigration {
661 id: migration_id.clone(),
662 source_node: shard.primary_node.clone(),
663 target_node: target.clone(),
664 progress: 0.0,
665 started_at: SystemTime::now(),
666 estimated_completion: None,
667 status: MigrationStatus::Planned,
668 error: None,
669 };
670
671 shard.status = ShardStatus::Migrating;
672 shard.migration = Some(migration.clone());
673 migrations.insert(migration_id.clone(), migration);
674
675 migration_id.clone()
676 }; self.start_migration(&migration_id)?;
680
681 Ok(migration_id)
682 }
683
684 fn start_migration(&mut self, migration_id: &str) -> Result<()> {
686 let mut migrations = self.migrations.write().unwrap();
687
688 if let Some(migration) = migrations.get_mut(migration_id) {
689 migration.status = MigrationStatus::InProgress;
690 }
693
694 Ok(())
695 }
696
697 pub fn complete_migration(&mut self, migration_id: &str) -> Result<()> {
699 let mut migrations = self.migrations.write().unwrap();
700 let mut shards = self.shards.write().unwrap();
701
702 let migration = migrations
703 .get_mut(migration_id)
704 .ok_or_else(|| MetricsError::ShardingError("Migration not found".to_string()))?;
705
706 migration.status = MigrationStatus::Completed;
707 migration.progress = 1.0;
708
709 for shard in shards.values_mut() {
711 if let Some(ref shard_migration) = shard.migration {
712 if shard_migration.id == migration_id {
713 shard.primary_node = migration.target_node.clone();
714 shard.status = ShardStatus::Active;
715 shard.migration = None;
716 break;
717 }
718 }
719 }
720
721 Ok(())
722 }
723
724 pub fn get_stats(&self) -> ShardingStats {
726 let shards = self.shards.read().unwrap();
727 let migrations = self.migrations.read().unwrap();
728
729 let total_shards = shards.len();
730 let active_migrations = migrations
731 .values()
732 .filter(|m| m.status == MigrationStatus::InProgress)
733 .count();
734
735 let total_size: u64 = shards.values().map(|s| s.size_bytes).sum();
736 let total_keys: usize = shards.values().map(|s| s.key_count).sum();
737
738 ShardingStats {
739 total_shards,
740 active_migrations,
741 total_size_bytes: total_size,
742 total_keys,
743 replication_factor: self.config.replication_factor,
744 last_rebalance: SystemTime::now(), }
746 }
747
748 pub fn list_shards(&self) -> Vec<DataShard> {
750 let shards = self.shards.read().unwrap();
751 shards.values().cloned().collect()
752 }
753
754 pub fn get_shard(&self, shard_id: &str) -> Option<DataShard> {
756 let shards = self.shards.read().unwrap();
757 shards.get(shard_id).cloned()
758 }
759
760 pub fn update_shard_stats(
762 &mut self,
763 shard_id: &str,
764 size_bytes: u64,
765 key_count: usize,
766 ) -> Result<()> {
767 let mut shards = self.shards.write().unwrap();
768
769 if let Some(shard) = shards.get_mut(shard_id) {
770 shard.size_bytes = size_bytes;
771 shard.key_count = key_count;
772 shard.last_access = SystemTime::now();
773 Ok(())
774 } else {
775 Err(MetricsError::ShardingError("Shard not found".to_string()))
776 }
777 }
778}
779
780#[derive(Debug, Clone, Serialize, Deserialize)]
782pub struct ShardingStats {
783 pub total_shards: usize,
785 pub active_migrations: usize,
787 pub total_size_bytes: u64,
789 pub total_keys: usize,
791 pub replication_factor: usize,
793 pub last_rebalance: SystemTime,
795}
796
797impl Default for ShardingStats {
798 fn default() -> Self {
799 Self {
800 total_shards: 0,
801 active_migrations: 0,
802 total_size_bytes: 0,
803 total_keys: 0,
804 replication_factor: 1,
805 last_rebalance: SystemTime::now(),
806 }
807 }
808}
809
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, SystemTime};

    /// Builds a fully specified config; only the fields the tests vary are parameters.
    fn config_with(
        strategy: ShardingStrategy,
        shard_count: usize,
        replication_factor: usize,
    ) -> ShardingConfig {
        ShardingConfig {
            strategy,
            shard_count,
            replication_factor,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        }
    }

    /// Converts string literals into owned node ids.
    fn node_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn test_data_range_contains_key() {
        // A range spanning the whole u64 space contains every key, whatever its hash.
        let full_hash_range = DataRange::Hash {
            start: 0,
            end: u64::MAX,
        };
        assert!(full_hash_range.contains_key("test"));
        assert!(full_hash_range.contains_key(""));

        let key_range = DataRange::Key {
            start: "a".to_string(),
            end: "z".to_string(),
        };
        assert!(key_range.contains_key("m"));
        assert!(key_range.contains_key("a"));
        assert!(key_range.contains_key("z"));
        assert!(!key_range.contains_key("z1"));

        let numeric_range = DataRange::Numeric {
            start: 10.0,
            end: 20.0,
        };
        assert!(numeric_range.contains_key("15"));
        assert!(!numeric_range.contains_key("25"));
        assert!(!numeric_range.contains_key("not a number"));

        let time_range = DataRange::Time {
            start: SystemTime::UNIX_EPOCH + Duration::from_secs(100),
            end: SystemTime::UNIX_EPOCH + Duration::from_secs(200),
        };
        assert!(time_range.contains_key("150"));
        assert!(!time_range.contains_key("250"));
        assert!(!time_range.contains_key("soon"));
    }

    #[test]
    fn test_data_range_split() {
        let hash_range = DataRange::Hash {
            start: 1000,
            end: 2000,
        };
        match hash_range.split().unwrap() {
            (DataRange::Hash { start: s1, end: e1 }, DataRange::Hash { start: s2, end: e2 }) => {
                assert_eq!(s1, 1000);
                assert_eq!(e1, 1500);
                assert_eq!(s2, 1501);
                assert_eq!(e2, 2000);
            }
            _ => panic!("Unexpected range types after split"),
        }

        let numeric_range = DataRange::Numeric {
            start: 10.0,
            end: 20.0,
        };
        match numeric_range.split().unwrap() {
            (
                DataRange::Numeric { start: s1, end: e1 },
                DataRange::Numeric { start: s2, end: e2 },
            ) => {
                assert_eq!((s1, e1), (10.0, 15.0));
                assert_eq!((s2, e2), (15.0, 20.0));
            }
            _ => panic!("Unexpected range types after split"),
        }

        let geographic = DataRange::Geographic {
            lat_min: 0.0,
            lat_max: 1.0,
            lon_min: 0.0,
            lon_max: 1.0,
        };
        assert!(geographic.split().is_err());
    }

    #[test]
    fn test_data_range_overlaps() {
        let low = DataRange::Hash { start: 0, end: 10 };
        let touching = DataRange::Hash { start: 10, end: 20 };
        let disjoint = DataRange::Hash { start: 11, end: 20 };
        assert!(low.overlaps_with(&touching));
        assert!(!low.overlaps_with(&disjoint));

        // Ranges of different kinds are never considered overlapping.
        let keys = DataRange::Key {
            start: "a".to_string(),
            end: "z".to_string(),
        };
        assert!(!low.overlaps_with(&keys));
    }

    #[test]
    fn test_shard_manager_creation() {
        let manager = ShardManager::new(ShardingConfig::default());
        assert_eq!(manager.list_shards().len(), 0);
    }

    #[test]
    fn test_shard_manager_initialization() {
        let mut manager = ShardManager::new(config_with(ShardingStrategy::Hash, 4, 2));
        manager
            .initialize(node_names(&["node1", "node2", "node3"]))
            .unwrap();
        assert_eq!(manager.list_shards().len(), 4);
    }

    #[test]
    fn test_replicas_exclude_primary() {
        let mut manager = ShardManager::new(config_with(ShardingStrategy::Hash, 4, 2));
        manager
            .initialize(node_names(&["node1", "node2", "node3"]))
            .unwrap();

        for shard in manager.list_shards() {
            assert_eq!(shard.replicas.len(), 1);
            assert!(!shard.replicas.contains(&shard.primary_node));
        }
    }

    #[test]
    fn test_find_shard() {
        let mut manager = ShardManager::new(config_with(ShardingStrategy::Hash, 2, 1));
        manager.initialize(node_names(&["node1", "node2"])).unwrap();

        let shard_id = manager.find_shard("test_key").unwrap();
        assert!(manager.get_shard(&shard_id).is_some());
    }

    #[test]
    fn test_get_node_for_key_hash_strategy() {
        let nodes = node_names(&["node1", "node2", "node3"]);
        let mut manager = ShardManager::new(config_with(ShardingStrategy::Hash, 4, 2));
        manager.initialize(nodes.clone()).unwrap();

        for key in ["alpha", "beta", "gamma"].iter() {
            let node = manager.get_node_for_key(key).unwrap();
            assert!(nodes.contains(&node));
        }
    }

    #[test]
    fn test_shard_migration() {
        let mut manager = ShardManager::new(ShardingConfig::default());
        manager.initialize(node_names(&["node1", "node2"])).unwrap();

        if let Some(shard) = manager.list_shards().first() {
            let migration_id = manager.migrate_shard(&shard.id, Some("node2".to_string()));
            assert!(migration_id.is_ok());
        }
    }

    #[test]
    fn test_migration_lifecycle() {
        let mut manager = ShardManager::new(config_with(ShardingStrategy::Hash, 2, 2));
        manager.initialize(node_names(&["node1", "node2"])).unwrap();
        assert_eq!(manager.get_shard("shard_0").unwrap().primary_node, "node1");

        let migration_id = manager
            .migrate_shard("shard_0", Some("node2".to_string()))
            .unwrap();
        assert_eq!(
            manager.get_shard("shard_0").unwrap().status,
            ShardStatus::Migrating
        );
        assert_eq!(manager.get_stats().active_migrations, 1);

        // A shard that is already migrating cannot be migrated again.
        assert!(manager.migrate_shard("shard_0", None).is_err());
        assert!(manager.migrate_shard("missing", None).is_err());
        assert!(manager.complete_migration("missing").is_err());

        manager.complete_migration(&migration_id).unwrap();
        let moved = manager.get_shard("shard_0").unwrap();
        assert_eq!(moved.primary_node, "node2");
        assert_eq!(moved.status, ShardStatus::Active);
        assert!(moved.migration.is_none());
        assert_eq!(manager.get_stats().active_migrations, 0);
    }

    #[test]
    fn test_update_shard_stats_and_totals() {
        let mut manager = ShardManager::new(config_with(ShardingStrategy::Hash, 4, 2));
        manager.initialize(node_names(&["node1", "node2"])).unwrap();

        manager.update_shard_stats("shard_0", 100, 10).unwrap();
        manager.update_shard_stats("shard_1", 50, 5).unwrap();
        assert!(manager.update_shard_stats("missing", 1, 1).is_err());

        let stats = manager.get_stats();
        assert_eq!(stats.total_shards, 4);
        assert_eq!(stats.total_size_bytes, 150);
        assert_eq!(stats.total_keys, 15);
        assert_eq!(stats.replication_factor, 2);
        assert_eq!(stats.active_migrations, 0);
    }

    #[test]
    fn test_consistent_hash_node_operations() {
        let config = ShardingConfig {
            virtual_nodes: 4,
            ..config_with(ShardingStrategy::ConsistentHash, 4, 2)
        };
        let mut manager = ShardManager::new(config);
        manager.initialize(node_names(&["node1", "node2"])).unwrap();

        manager.add_node("node3".to_string()).unwrap();
        manager.remove_node("node1").unwrap();

        // Once node1's ring points are gone, no key may be routed to it.
        let node = manager.get_node_for_key("some_key").unwrap();
        assert_ne!(node, "node1");
    }
}