1use crate::error::{RaftError, RaftResult};
7use crate::types::NodeId;
8use amaters_core::Key;
9use std::collections::BTreeMap;
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12
13pub type ShardId = u64;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum ShardState {
19 Active,
21 Splitting,
23 Merging,
25 Transferring,
27 Offline,
29}
30
31impl ShardState {
32 pub fn can_read(&self) -> bool {
34 matches!(
35 self,
36 ShardState::Active | ShardState::Splitting | ShardState::Transferring
37 )
38 }
39
40 pub fn can_write(&self) -> bool {
42 matches!(self, ShardState::Active)
43 }
44
45 pub fn as_str(&self) -> &'static str {
47 match self {
48 ShardState::Active => "Active",
49 ShardState::Splitting => "Splitting",
50 ShardState::Merging => "Merging",
51 ShardState::Transferring => "Transferring",
52 ShardState::Offline => "Offline",
53 }
54 }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct KeyRange {
60 pub start: Key,
62 pub end: Key,
64}
65
66impl KeyRange {
67 pub fn new(start: Key, end: Key) -> RaftResult<Self> {
69 if start >= end {
70 return Err(RaftError::ConfigError {
71 message: format!("Invalid key range: start {:?} >= end {:?}", start, end),
72 });
73 }
74 Ok(Self { start, end })
75 }
76
77 pub fn contains(&self, key: &Key) -> bool {
79 key >= &self.start && key < &self.end
80 }
81
82 pub fn overlaps(&self, other: &KeyRange) -> bool {
84 self.start < other.end && other.start < self.end
85 }
86
87 pub fn midpoint(&self) -> Key {
95 let start_bytes = self.start.as_bytes();
96 let end_bytes = self.end.as_bytes();
97 let max_len = start_bytes.len().max(end_bytes.len());
98
99 let get_byte = |v: &[u8], i: usize| -> u16 { v.get(i).copied().unwrap_or(0) as u16 };
100
101 let mut sum: Vec<u16> = (0..max_len)
103 .map(|i| get_byte(start_bytes, i) + get_byte(end_bytes, i))
104 .collect();
105 let mut carry: u16 = 0;
106 for b in sum.iter_mut().rev() {
107 let v = *b + carry;
108 *b = v & 0xFF;
109 carry = v >> 8; }
111 let mut mid: Vec<u8> = Vec::with_capacity(max_len);
115 let mut half_carry = carry; for b in &sum {
117 let val = half_carry * 256 + b;
118 mid.push((val / 2) as u8);
119 half_carry = val % 2;
120 }
121 Key::from_slice(&mid)
129 }
130
131 pub fn full() -> Self {
133 Self {
134 start: Key::from_slice(&[0u8]),
135 end: Key::from_slice(&[0xFFu8; 32]),
136 }
137 }
138}
139
140#[derive(Debug, Clone)]
142pub struct ShardMetadata {
143 pub id: ShardId,
145 pub range: KeyRange,
147 pub state: ShardState,
149 pub node_id: NodeId,
151 pub replicas: Vec<NodeId>,
153 pub estimated_keys: u64,
155 pub estimated_size_bytes: u64,
157 pub last_updated: SystemTime,
159 pub created_at: SystemTime,
161 pub version: u64,
163}
164
165impl ShardMetadata {
166 pub fn new(id: ShardId, range: KeyRange, node_id: NodeId) -> Self {
168 let now = SystemTime::now();
169 Self {
170 id,
171 range,
172 state: ShardState::Active,
173 node_id,
174 replicas: Vec::new(),
175 estimated_keys: 0,
176 estimated_size_bytes: 0,
177 last_updated: now,
178 created_at: now,
179 version: 1,
180 }
181 }
182
183 pub fn set_state(&mut self, state: ShardState) {
185 self.state = state;
186 self.last_updated = SystemTime::now();
187 self.version += 1;
188 }
189
190 pub fn update_stats(&mut self, estimated_keys: u64, estimated_size_bytes: u64) {
192 self.estimated_keys = estimated_keys;
193 self.estimated_size_bytes = estimated_size_bytes;
194 self.last_updated = SystemTime::now();
195 self.version += 1;
196 }
197
198 pub fn add_replica(&mut self, node_id: NodeId) -> RaftResult<()> {
200 if self.replicas.contains(&node_id) {
201 return Err(RaftError::ConfigError {
202 message: format!("Replica {} already exists for shard {}", node_id, self.id),
203 });
204 }
205 self.replicas.push(node_id);
206 self.last_updated = SystemTime::now();
207 self.version += 1;
208 Ok(())
209 }
210
211 pub fn remove_replica(&mut self, node_id: NodeId) -> RaftResult<()> {
213 let initial_len = self.replicas.len();
214 self.replicas.retain(|&id| id != node_id);
215 if self.replicas.len() == initial_len {
216 return Err(RaftError::ConfigError {
217 message: format!("Replica {} not found for shard {}", node_id, self.id),
218 });
219 }
220 self.last_updated = SystemTime::now();
221 self.version += 1;
222 Ok(())
223 }
224
225 pub fn is_hot(&self, key_threshold: u64, size_threshold: u64) -> bool {
227 self.estimated_keys > key_threshold || self.estimated_size_bytes > size_threshold
228 }
229
230 pub fn is_cold(&self, key_threshold: u64, size_threshold: u64) -> bool {
232 self.estimated_keys < key_threshold && self.estimated_size_bytes < size_threshold
233 }
234
235 pub fn is_stale(&self, max_age: Duration) -> bool {
237 self.last_updated
238 .elapsed()
239 .map(|elapsed| elapsed > max_age)
240 .unwrap_or(false)
241 }
242}
243
244#[derive(Debug, Clone)]
246pub struct ShardSplit {
247 pub source_shard_id: ShardId,
249 pub left_shard_id: ShardId,
251 pub right_shard_id: ShardId,
253 pub split_key: Key,
255 pub initiated_at: SystemTime,
257}
258
259impl ShardSplit {
260 pub fn new(
262 source_shard_id: ShardId,
263 left_shard_id: ShardId,
264 right_shard_id: ShardId,
265 split_key: Key,
266 ) -> Self {
267 Self {
268 source_shard_id,
269 left_shard_id,
270 right_shard_id,
271 split_key,
272 initiated_at: SystemTime::now(),
273 }
274 }
275
276 pub fn create_shards(
278 &self,
279 source: &ShardMetadata,
280 ) -> RaftResult<(ShardMetadata, ShardMetadata)> {
281 let left_range = KeyRange::new(source.range.start.clone(), self.split_key.clone())?;
283 let mut left_shard = ShardMetadata::new(self.left_shard_id, left_range, source.node_id);
284 left_shard.replicas = source.replicas.clone();
285
286 let right_range = KeyRange::new(self.split_key.clone(), source.range.end.clone())?;
288 let mut right_shard = ShardMetadata::new(self.right_shard_id, right_range, source.node_id);
289 right_shard.replicas = source.replicas.clone();
290
291 left_shard.estimated_keys = source.estimated_keys / 2;
293 left_shard.estimated_size_bytes = source.estimated_size_bytes / 2;
294 right_shard.estimated_keys = source.estimated_keys / 2;
295 right_shard.estimated_size_bytes = source.estimated_size_bytes / 2;
296
297 Ok((left_shard, right_shard))
298 }
299}
300
301#[derive(Debug, Clone)]
303pub struct ShardMerge {
304 pub left_shard_id: ShardId,
306 pub right_shard_id: ShardId,
308 pub target_shard_id: ShardId,
310 pub initiated_at: SystemTime,
312}
313
314impl ShardMerge {
315 pub fn new(left_shard_id: ShardId, right_shard_id: ShardId, target_shard_id: ShardId) -> Self {
317 Self {
318 left_shard_id,
319 right_shard_id,
320 target_shard_id,
321 initiated_at: SystemTime::now(),
322 }
323 }
324
325 pub fn validate(&self, left: &ShardMetadata, right: &ShardMetadata) -> RaftResult<()> {
327 if left.range.end != right.range.start {
329 return Err(RaftError::ConfigError {
330 message: format!(
331 "Shards {} and {} are not adjacent (left.end={:?}, right.start={:?})",
332 left.id, right.id, left.range.end, right.range.start
333 ),
334 });
335 }
336
337 if left.node_id != right.node_id {
339 return Err(RaftError::ConfigError {
340 message: format!(
341 "Shards {} and {} are on different nodes ({} vs {})",
342 left.id, right.id, left.node_id, right.node_id
343 ),
344 });
345 }
346
347 Ok(())
348 }
349
350 pub fn create_merged_shard(
352 &self,
353 left: &ShardMetadata,
354 right: &ShardMetadata,
355 ) -> RaftResult<ShardMetadata> {
356 self.validate(left, right)?;
357
358 let merged_range = KeyRange::new(left.range.start.clone(), right.range.end.clone())?;
359
360 let mut merged = ShardMetadata::new(self.target_shard_id, merged_range, left.node_id);
361
362 merged.estimated_keys = left.estimated_keys + right.estimated_keys;
364 merged.estimated_size_bytes = left.estimated_size_bytes + right.estimated_size_bytes;
365
366 merged.replicas = left.replicas.clone();
368
369 Ok(merged)
370 }
371}
372
373#[derive(Debug, Clone)]
375pub struct ShardTransfer {
376 pub shard_id: ShardId,
378 pub from_node: NodeId,
380 pub to_node: NodeId,
382 pub progress: f64,
384 pub initiated_at: SystemTime,
386 pub estimated_completion: Option<SystemTime>,
388}
389
390impl ShardTransfer {
391 pub fn new(shard_id: ShardId, from_node: NodeId, to_node: NodeId) -> Self {
393 Self {
394 shard_id,
395 from_node,
396 to_node,
397 progress: 0.0,
398 initiated_at: SystemTime::now(),
399 estimated_completion: None,
400 }
401 }
402
403 pub fn update_progress(&mut self, progress: f64) {
405 self.progress = progress.clamp(0.0, 1.0);
406
407 if progress > 0.0 && progress < 1.0 {
409 if let Ok(elapsed) = self.initiated_at.elapsed() {
410 let total_time = elapsed.as_secs_f64() / progress;
411 let remaining_time = total_time * (1.0 - progress);
412 self.estimated_completion =
413 Some(SystemTime::now() + Duration::from_secs_f64(remaining_time));
414 }
415 }
416 }
417
418 pub fn is_complete(&self) -> bool {
420 self.progress >= 1.0
421 }
422}
423
424#[derive(Debug, Clone)]
426pub struct ShardRegistry {
427 shards: Arc<parking_lot::RwLock<BTreeMap<ShardId, ShardMetadata>>>,
429 next_shard_id: Arc<parking_lot::Mutex<ShardId>>,
431}
432
433impl ShardRegistry {
434 pub fn new() -> Self {
436 Self {
437 shards: Arc::new(parking_lot::RwLock::new(BTreeMap::new())),
438 next_shard_id: Arc::new(parking_lot::Mutex::new(1)),
439 }
440 }
441
442 pub fn allocate_shard_id(&self) -> ShardId {
444 let mut next_id = self.next_shard_id.lock();
445 let id = *next_id;
446 *next_id += 1;
447 id
448 }
449
450 pub fn register(&self, shard: ShardMetadata) -> RaftResult<()> {
452 let mut shards = self.shards.write();
453
454 for existing in shards.values() {
456 if existing.range.overlaps(&shard.range) {
457 return Err(RaftError::ConfigError {
458 message: format!(
459 "Shard {} range overlaps with existing shard {} range",
460 shard.id, existing.id
461 ),
462 });
463 }
464 }
465
466 shards.insert(shard.id, shard);
467 Ok(())
468 }
469
470 pub fn get(&self, shard_id: ShardId) -> Option<ShardMetadata> {
472 self.shards.read().get(&shard_id).cloned()
473 }
474
475 pub fn update(&self, shard: ShardMetadata) -> RaftResult<()> {
477 let mut shards = self.shards.write();
478 shards.insert(shard.id, shard);
479 Ok(())
480 }
481
482 pub fn remove(&self, shard_id: ShardId) -> RaftResult<()> {
484 let mut shards = self.shards.write();
485 shards
486 .remove(&shard_id)
487 .ok_or_else(|| RaftError::ConfigError {
488 message: format!("Shard {} not found", shard_id),
489 })?;
490 Ok(())
491 }
492
493 pub fn get_all(&self) -> Vec<ShardMetadata> {
495 self.shards.read().values().cloned().collect()
496 }
497
498 pub fn get_by_node(&self, node_id: NodeId) -> Vec<ShardMetadata> {
500 self.shards
501 .read()
502 .values()
503 .filter(|shard| shard.node_id == node_id)
504 .cloned()
505 .collect()
506 }
507
508 pub fn find_shard_for_key(&self, key: &Key) -> Option<ShardMetadata> {
510 self.shards
511 .read()
512 .values()
513 .find(|shard| shard.range.contains(key))
514 .cloned()
515 }
516
517 pub fn count(&self) -> usize {
519 self.shards.read().len()
520 }
521
522 pub fn execute_split(&self, split: &ShardSplit) -> RaftResult<()> {
524 let mut shards = self.shards.write();
525 let parent = shards
526 .get(&split.source_shard_id)
527 .ok_or_else(|| RaftError::Other {
528 message: format!("execute_split: shard {} not found", split.source_shard_id),
529 })?
530 .clone();
531 if parent.state != ShardState::Active {
532 return Err(RaftError::Other {
533 message: format!(
534 "execute_split: shard {} is not Active (state={})",
535 split.source_shard_id,
536 parent.state.as_str()
537 ),
538 });
539 }
540 let (left, right) = split.create_shards(&parent)?;
541 shards.insert(left.id, left);
542 shards.insert(right.id, right);
543 shards.remove(&split.source_shard_id);
544 Ok(())
545 }
546
547 pub fn execute_merge(&self, merge: &ShardMerge) -> RaftResult<()> {
549 let mut shards = self.shards.write();
550 let left = shards
551 .get(&merge.left_shard_id)
552 .ok_or_else(|| RaftError::Other {
553 message: format!(
554 "execute_merge: left shard {} not found",
555 merge.left_shard_id
556 ),
557 })?
558 .clone();
559 let right = shards
560 .get(&merge.right_shard_id)
561 .ok_or_else(|| RaftError::Other {
562 message: format!(
563 "execute_merge: right shard {} not found",
564 merge.right_shard_id
565 ),
566 })?
567 .clone();
568 if left.state != ShardState::Active {
569 return Err(RaftError::Other {
570 message: format!(
571 "execute_merge: left shard {} is not Active (state={})",
572 merge.left_shard_id,
573 left.state.as_str()
574 ),
575 });
576 }
577 if right.state != ShardState::Active {
578 return Err(RaftError::Other {
579 message: format!(
580 "execute_merge: right shard {} is not Active (state={})",
581 merge.right_shard_id,
582 right.state.as_str()
583 ),
584 });
585 }
586 let merged = merge.create_merged_shard(&left, &right)?;
588 shards.remove(&merge.left_shard_id);
589 shards.remove(&merge.right_shard_id);
590 shards.insert(merged.id, merged);
591 Ok(())
592 }
593
594 pub fn execute_transfer(&self, transfer: &ShardTransfer) -> RaftResult<()> {
600 let new_target_id = self.allocate_shard_id();
603
604 let mut shards = self.shards.write();
605 let source = shards
606 .get(&transfer.shard_id)
607 .ok_or_else(|| RaftError::Other {
608 message: format!("execute_transfer: shard {} not found", transfer.shard_id),
609 })?
610 .clone();
611 if source.state != ShardState::Active {
612 return Err(RaftError::Other {
613 message: format!(
614 "execute_transfer: shard {} is not Active (state={})",
615 transfer.shard_id,
616 source.state.as_str()
617 ),
618 });
619 }
620 let target_shard =
622 ShardMetadata::new(new_target_id, source.range.clone(), transfer.to_node);
623 let mut source_transferring = source;
625 source_transferring.set_state(ShardState::Transferring);
626 shards.insert(source_transferring.id, source_transferring);
628 shards.insert(new_target_id, target_shard);
629 Ok(())
630 }
631}
632
633impl Default for ShardRegistry {
634 fn default() -> Self {
635 Self::new()
636 }
637}
638
639#[cfg(test)]
640mod prop_tests {
641 use super::*;
642 use proptest::prelude::*;
643
644 fn arb_key_str(min: usize, max: usize) -> impl Strategy<Value = String> {
646 prop::collection::vec(b'a'..=b'z', min..=max)
647 .prop_map(|v| String::from_utf8(v).expect("valid utf-8"))
648 }
649
650 proptest! {
651 #[test]
652 fn prop_key_range_contains_consistent(
653 start in arb_key_str(1, 8),
654 mid in arb_key_str(1, 8),
655 end in arb_key_str(1, 8),
656 ) {
657 prop_assume!(start < end);
659 let range = match KeyRange::new(Key::from_str(&start), Key::from_str(&end)) {
660 Ok(r) => r,
661 Err(_) => return Ok(()),
662 };
663 let mid_key = Key::from_str(&mid);
664 let expected = mid >= start && mid < end;
666 prop_assert_eq!(
667 range.contains(&mid_key),
668 expected,
669 "contains({:?}) in [{:?}, {:?}) should be {}",
670 mid,
671 start,
672 end,
673 expected
674 );
675 }
676
677 #[test]
678 fn prop_key_range_midpoint_is_between_bounds(
679 start in arb_key_str(1, 5),
680 end in arb_key_str(6, 12),
681 ) {
682 prop_assume!(start < end);
685 let range = match KeyRange::new(Key::from_str(&start), Key::from_str(&end)) {
686 Ok(r) => r,
687 Err(_) => return Ok(()),
688 };
689 let mid = range.midpoint();
690 prop_assert!(
692 mid >= range.start,
693 "midpoint {:?} must be >= start {:?}",
694 mid,
695 range.start
696 );
697 prop_assert!(
698 mid < range.end,
699 "midpoint {:?} must be < end {:?}",
700 mid,
701 range.end
702 );
703 }
704
705 #[test]
706 fn prop_key_range_split_no_overlap(
707 start in arb_key_str(1, 4),
708 end in arb_key_str(8, 12),
709 ) {
710 prop_assume!(start < end);
712 let range = match KeyRange::new(Key::from_str(&start), Key::from_str(&end)) {
713 Ok(r) => r,
714 Err(_) => return Ok(()),
715 };
716 let mid = range.midpoint();
717 let (left, right) = match (
718 KeyRange::new(range.start.clone(), mid.clone()),
719 KeyRange::new(mid.clone(), range.end.clone()),
720 ) {
721 (Ok(l), Ok(r)) => (l, r),
722 _ => return Ok(()), };
724 let test_key = Key::from_str(&start);
726 let in_left = left.contains(&test_key);
727 let in_right = right.contains(&test_key);
728 prop_assert!(
729 !(in_left && in_right),
730 "start key {:?} must not be in both halves after split",
731 test_key
732 );
733 }
734
735 #[test]
736 fn prop_shard_registry_count_matches_unique_registrations(
737 raw_ids in prop::collection::vec(1u64..=20u64, 1..=8)
738 ) {
739 let registry = ShardRegistry::new();
741 let mut distinct_ids: Vec<u64> = raw_ids.clone();
744 distinct_ids.sort_unstable();
745 distinct_ids.dedup();
746
747 let mut registered = 0usize;
749 for (slot, _id) in distinct_ids.iter().enumerate() {
750 let start_byte = slot as u8;
751 if start_byte == u8::MAX {
753 break;
754 }
755 let range = match KeyRange::new(
756 Key::from_slice(&[start_byte]),
757 Key::from_slice(&[start_byte + 1]),
758 ) {
759 Ok(r) => r,
760 Err(_) => continue,
761 };
762 let shard_id = registry.allocate_shard_id();
763 let shard = ShardMetadata::new(shard_id, range, 1);
764 if registry.register(shard).is_ok() {
765 registered += 1;
766 }
767 }
768
769 let count = registry.count();
770 prop_assert_eq!(
771 count,
772 registered,
773 "registry.count() must equal number of successfully registered shards"
774 );
775 }
776
777 #[test]
778 fn prop_shard_registry_find_key_correctness(
779 start_byte in 0u8..100u8,
780 end_byte in 101u8..=200u8,
781 query_byte in 0u8..=255u8,
782 ) {
783 let registry = ShardRegistry::new();
785 let range = match KeyRange::new(
786 Key::from_slice(&[start_byte]),
787 Key::from_slice(&[end_byte]),
788 ) {
789 Ok(r) => r,
790 Err(_) => return Ok(()),
791 };
792 let shard_id = registry.allocate_shard_id();
793 let shard = ShardMetadata::new(shard_id, range, 1);
794 registry.register(shard).expect("register shard");
795
796 let query = Key::from_slice(&[query_byte]);
797 let found = registry.find_shard_for_key(&query);
798 let in_range = query_byte >= start_byte && query_byte < end_byte;
799 prop_assert_eq!(
800 found.is_some(),
801 in_range,
802 "find_shard_for_key({}) in [{}, {}) should be {}",
803 query_byte,
804 start_byte,
805 end_byte,
806 in_range
807 );
808 }
809 }
810}
811
812#[cfg(test)]
813mod tests {
814 use super::*;
815
816 #[test]
817 fn test_shard_state() {
818 assert!(ShardState::Active.can_read());
819 assert!(ShardState::Active.can_write());
820 assert!(ShardState::Splitting.can_read());
821 assert!(!ShardState::Splitting.can_write());
822 assert!(!ShardState::Offline.can_read());
823 assert!(!ShardState::Offline.can_write());
824 }
825
826 #[test]
827 fn test_key_range_contains() -> RaftResult<()> {
828 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
829
830 assert!(range.contains(&Key::from_str("m")));
831 assert!(range.contains(&Key::from_str("a")));
832 assert!(!range.contains(&Key::from_str("z")));
833 assert!(range.contains(&Key::from_str("aa"))); assert!(!range.contains(&Key::from_str("{"))); Ok(())
837 }
838
839 #[test]
840 fn test_key_range_overlaps() -> RaftResult<()> {
841 let range1 = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
842 let range2 = KeyRange::new(Key::from_str("g"), Key::from_str("z"))?;
843 let range3 = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;
844
845 assert!(range1.overlaps(&range2));
846 assert!(range2.overlaps(&range1));
847 assert!(!range1.overlaps(&range3));
848
849 Ok(())
850 }
851
852 #[test]
853 fn test_key_range_midpoint() -> RaftResult<()> {
854 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
855
856 let mid = range.midpoint();
857 assert!(mid > range.start);
858 assert!(mid < range.end);
859
860 Ok(())
861 }
862
863 #[test]
864 fn test_shard_metadata_creation() {
865 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z")).expect("valid range");
866 let shard = ShardMetadata::new(1, range, 100);
867
868 assert_eq!(shard.id, 1);
869 assert_eq!(shard.node_id, 100);
870 assert_eq!(shard.state, ShardState::Active);
871 assert_eq!(shard.version, 1);
872 }
873
874 #[test]
875 fn test_shard_metadata_update_stats() {
876 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z")).expect("valid range");
877 let mut shard = ShardMetadata::new(1, range, 100);
878
879 let initial_version = shard.version;
880 shard.update_stats(1000, 50000);
881
882 assert_eq!(shard.estimated_keys, 1000);
883 assert_eq!(shard.estimated_size_bytes, 50000);
884 assert_eq!(shard.version, initial_version + 1);
885 }
886
887 #[test]
888 fn test_shard_metadata_replicas() -> RaftResult<()> {
889 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
890 let mut shard = ShardMetadata::new(1, range, 100);
891
892 shard.add_replica(101)?;
893 shard.add_replica(102)?;
894 assert_eq!(shard.replicas.len(), 2);
895
896 assert!(shard.add_replica(101).is_err());
897
898 shard.remove_replica(101)?;
899 assert_eq!(shard.replicas.len(), 1);
900 assert!(shard.replicas.contains(&102));
901
902 Ok(())
903 }
904
905 #[test]
906 fn test_shard_split() -> RaftResult<()> {
907 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
908 let mut source = ShardMetadata::new(1, range, 100);
909 source.update_stats(1000, 100000);
910
911 let split = ShardSplit::new(1, 2, 3, Key::from_str("m"));
912 let (left, right) = split.create_shards(&source)?;
913
914 assert_eq!(left.id, 2);
915 assert_eq!(right.id, 3);
916 assert_eq!(left.range.end, Key::from_str("m"));
917 assert_eq!(right.range.start, Key::from_str("m"));
918 assert_eq!(left.estimated_keys, 500);
919 assert_eq!(right.estimated_keys, 500);
920
921 Ok(())
922 }
923
924 #[test]
925 fn test_shard_merge() -> RaftResult<()> {
926 let left_range = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
927 let right_range = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;
928
929 let mut left = ShardMetadata::new(1, left_range, 100);
930 let mut right = ShardMetadata::new(2, right_range, 100);
931
932 left.update_stats(500, 50000);
933 right.update_stats(500, 50000);
934
935 let merge = ShardMerge::new(1, 2, 3);
936 let merged = merge.create_merged_shard(&left, &right)?;
937
938 assert_eq!(merged.id, 3);
939 assert_eq!(merged.range.start, Key::from_str("a"));
940 assert_eq!(merged.range.end, Key::from_str("z"));
941 assert_eq!(merged.estimated_keys, 1000);
942 assert_eq!(merged.estimated_size_bytes, 100000);
943
944 Ok(())
945 }
946
947 #[test]
948 fn test_shard_transfer() {
949 let mut transfer = ShardTransfer::new(1, 100, 101);
950 assert_eq!(transfer.progress, 0.0);
951 assert!(!transfer.is_complete());
952
953 transfer.update_progress(0.5);
954 assert_eq!(transfer.progress, 0.5);
955 assert!(!transfer.is_complete());
956
957 transfer.update_progress(1.0);
958 assert!(transfer.is_complete());
959 }
960
961 #[test]
962 fn test_shard_registry() -> RaftResult<()> {
963 let registry = ShardRegistry::new();
964
965 let id1 = registry.allocate_shard_id();
966 let id2 = registry.allocate_shard_id();
967 assert_ne!(id1, id2);
968
969 let range1 = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
970 let shard1 = ShardMetadata::new(id1, range1, 100);
971 registry.register(shard1.clone())?;
972
973 let retrieved = registry.get(id1);
974 assert!(retrieved.is_some());
975 assert_eq!(
976 retrieved
977 .expect("Shard should be retrieved from registry")
978 .id,
979 id1
980 );
981
982 let found = registry.find_shard_for_key(&Key::from_str("g"));
983 assert!(found.is_some());
984 assert_eq!(found.expect("Shard should be found for key").id, id1);
985
986 assert_eq!(registry.count(), 1);
987
988 Ok(())
989 }
990
991 #[test]
992 fn test_shard_registry_overlapping_ranges() -> RaftResult<()> {
993 let registry = ShardRegistry::new();
994
995 let range1 = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
996 let shard1 = ShardMetadata::new(1, range1, 100);
997 registry.register(shard1)?;
998
999 let range2 = KeyRange::new(Key::from_str("g"), Key::from_str("z"))?;
1000 let shard2 = ShardMetadata::new(2, range2, 100);
1001 let result = registry.register(shard2);
1002
1003 assert!(result.is_err());
1004
1005 Ok(())
1006 }
1007
1008 #[test]
1009 fn test_hot_cold_shards() {
1010 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z")).expect("valid range");
1011 let mut shard = ShardMetadata::new(1, range, 100);
1012
1013 shard.update_stats(1000, 50000);
1014 assert!(shard.is_hot(500, 25000));
1015 assert!(!shard.is_cold(500, 25000));
1016
1017 shard.update_stats(100, 5000);
1018 assert!(!shard.is_hot(500, 25000));
1019 assert!(shard.is_cold(500, 25000));
1020 }
1021
1022 #[test]
1023 fn test_execute_split() -> RaftResult<()> {
1024 let registry = ShardRegistry::new();
1025
1026 let src_id = registry.allocate_shard_id();
1028 let left_id = registry.allocate_shard_id();
1029 let right_id = registry.allocate_shard_id();
1030
1031 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
1032 let mut source = ShardMetadata::new(src_id, range, 1);
1033 source.update_stats(1000, 100_000);
1034 registry.register(source)?;
1035
1036 let split = ShardSplit::new(src_id, left_id, right_id, Key::from_str("m"));
1037 registry.execute_split(&split)?;
1038
1039 assert!(
1041 registry.get(src_id).is_none(),
1042 "source shard must be removed after split"
1043 );
1044
1045 let left = registry.get(left_id).expect("left child shard must exist");
1047 let right = registry
1048 .get(right_id)
1049 .expect("right child shard must exist");
1050 assert_eq!(left.state, ShardState::Active);
1051 assert_eq!(right.state, ShardState::Active);
1052 assert_eq!(left.range.start, Key::from_str("a"));
1053 assert_eq!(left.range.end, Key::from_str("m"));
1054 assert_eq!(right.range.start, Key::from_str("m"));
1055 assert_eq!(right.range.end, Key::from_str("z"));
1056 assert_eq!(registry.count(), 2);
1057
1058 Ok(())
1059 }
1060
1061 #[test]
1062 fn test_execute_split_non_active_fails() -> RaftResult<()> {
1063 let registry = ShardRegistry::new();
1064
1065 let src_id = registry.allocate_shard_id();
1066 let left_id = registry.allocate_shard_id();
1067 let right_id = registry.allocate_shard_id();
1068
1069 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
1070 let mut source = ShardMetadata::new(src_id, range, 1);
1071 source.set_state(ShardState::Offline);
1072 registry.register(source)?;
1073
1074 let split = ShardSplit::new(src_id, left_id, right_id, Key::from_str("m"));
1075 let result = registry.execute_split(&split);
1076 assert!(result.is_err(), "split of non-Active shard must fail");
1077
1078 Ok(())
1079 }
1080
1081 #[test]
1082 fn test_execute_merge() -> RaftResult<()> {
1083 let registry = ShardRegistry::new();
1084
1085 let left_id = registry.allocate_shard_id();
1086 let right_id = registry.allocate_shard_id();
1087 let merged_id = registry.allocate_shard_id();
1088
1089 let left_range = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
1090 let right_range = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;
1091
1092 let mut left = ShardMetadata::new(left_id, left_range, 1);
1093 left.update_stats(500, 50_000);
1094 let mut right = ShardMetadata::new(right_id, right_range, 1);
1095 right.update_stats(500, 50_000);
1096
1097 registry.register(left)?;
1098 registry.register(right)?;
1099
1100 let merge = ShardMerge::new(left_id, right_id, merged_id);
1101 registry.execute_merge(&merge)?;
1102
1103 assert!(
1105 registry.get(left_id).is_none(),
1106 "left source must be removed after merge"
1107 );
1108 assert!(
1109 registry.get(right_id).is_none(),
1110 "right source must be removed after merge"
1111 );
1112
1113 let merged = registry.get(merged_id).expect("merged shard must exist");
1115 assert_eq!(merged.state, ShardState::Active);
1116 assert_eq!(merged.range.start, Key::from_str("a"));
1117 assert_eq!(merged.range.end, Key::from_str("z"));
1118 assert_eq!(merged.estimated_keys, 1000);
1119 assert_eq!(merged.estimated_size_bytes, 100_000);
1120 assert_eq!(registry.count(), 1);
1121
1122 Ok(())
1123 }
1124
1125 #[test]
1126 fn test_execute_merge_non_active_fails() -> RaftResult<()> {
1127 let registry = ShardRegistry::new();
1128
1129 let left_id = registry.allocate_shard_id();
1130 let right_id = registry.allocate_shard_id();
1131 let merged_id = registry.allocate_shard_id();
1132
1133 let left_range = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
1134 let right_range = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;
1135
1136 let left = ShardMetadata::new(left_id, left_range, 1);
1137 let mut right = ShardMetadata::new(right_id, right_range, 1);
1138 right.set_state(ShardState::Merging);
1139
1140 registry.register(left)?;
1141 registry.register(right)?;
1142
1143 let merge = ShardMerge::new(left_id, right_id, merged_id);
1144 let result = registry.execute_merge(&merge);
1145 assert!(result.is_err(), "merge with non-Active shard must fail");
1146
1147 Ok(())
1148 }
1149
1150 #[test]
1151 fn test_execute_transfer() -> RaftResult<()> {
1152 let registry = ShardRegistry::new();
1153
1154 let src_id = registry.allocate_shard_id();
1155 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
1156 let source = ShardMetadata::new(src_id, range, 1);
1157 registry.register(source)?;
1158
1159 let transfer = ShardTransfer::new(src_id, 1, 2);
1160 registry.execute_transfer(&transfer)?;
1161
1162 let updated_source = registry.get(src_id).expect("source shard must still exist");
1164 assert_eq!(
1165 updated_source.state,
1166 ShardState::Transferring,
1167 "source shard must be Transferring after transfer initiation"
1168 );
1169 assert_eq!(
1170 updated_source.node_id, 1,
1171 "source node_id must be unchanged"
1172 );
1173
1174 let all_shards = registry.get_all();
1176 assert_eq!(
1177 all_shards.len(),
1178 2,
1179 "registry must have exactly two shards (source + target)"
1180 );
1181
1182 let target_shard = all_shards
1183 .iter()
1184 .find(|s| s.id != src_id)
1185 .expect("target shard must exist");
1186 assert_eq!(target_shard.state, ShardState::Active);
1187 assert_eq!(target_shard.node_id, 2);
1188 assert_eq!(target_shard.range.start, Key::from_str("a"));
1189 assert_eq!(target_shard.range.end, Key::from_str("z"));
1190
1191 Ok(())
1192 }
1193
1194 #[test]
1195 fn test_execute_transfer_non_active_fails() -> RaftResult<()> {
1196 let registry = ShardRegistry::new();
1197
1198 let src_id = registry.allocate_shard_id();
1199 let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
1200 let mut source = ShardMetadata::new(src_id, range, 1);
1201 source.set_state(ShardState::Transferring);
1202 registry.register(source)?;
1203
1204 let transfer = ShardTransfer::new(src_id, 1, 2);
1205 let result = registry.execute_transfer(&transfer);
1206 assert!(result.is_err(), "transfer of non-Active shard must fail");
1207
1208 Ok(())
1209 }
1210}