Skip to main content

amaters_cluster/
shard.rs

1//! Shard metadata and operations
2//!
3//! This module defines the core types for distributed sharding in AmateRS.
4//! It handles shard metadata, split/merge operations, and data migration.
5
6use crate::error::{RaftError, RaftResult};
7use crate::types::NodeId;
8use amaters_core::Key;
9use std::collections::BTreeMap;
10use std::sync::Arc;
11use std::time::{Duration, SystemTime};
12
/// Unique identifier for a shard.
///
/// IDs produced by [`ShardRegistry::allocate_shard_id`] start at 1 and increase
/// monotonically; [`ShardRegistry::register`] also accepts caller-chosen IDs.
pub type ShardId = u64;
15
/// Lifecycle state of a shard.
///
/// The state decides which requests a shard may serve; see
/// [`ShardState::can_read`] and [`ShardState::can_write`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShardState {
    /// Shard is active and serving requests
    Active,
    /// Shard is being split into two shards
    Splitting,
    /// Shard is being merged with another shard
    Merging,
    /// Shard is being transferred to another node
    Transferring,
    /// Shard is offline (node failure or maintenance)
    Offline,
}

impl ShardState {
    /// Returns `true` when reads may be served in this state.
    ///
    /// Reads stay available while the shard is `Active`, `Splitting` or
    /// `Transferring`; they are refused while `Merging` or `Offline`.
    pub fn can_read(&self) -> bool {
        match *self {
            Self::Active | Self::Splitting | Self::Transferring => true,
            Self::Merging | Self::Offline => false,
        }
    }

    /// Returns `true` when writes may be served, which is only the case for `Active`.
    pub fn can_write(&self) -> bool {
        *self == Self::Active
    }

    /// Human-readable name of the state (identical to the variant name).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "Active",
            Self::Splitting => "Splitting",
            Self::Merging => "Merging",
            Self::Transferring => "Transferring",
            Self::Offline => "Offline",
        }
    }
}
56
57/// Key range for a shard
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct KeyRange {
60    /// Start key (inclusive)
61    pub start: Key,
62    /// End key (exclusive)
63    pub end: Key,
64}
65
66impl KeyRange {
67    /// Create a new key range
68    pub fn new(start: Key, end: Key) -> RaftResult<Self> {
69        if start >= end {
70            return Err(RaftError::ConfigError {
71                message: format!("Invalid key range: start {:?} >= end {:?}", start, end),
72            });
73        }
74        Ok(Self { start, end })
75    }
76
77    /// Check if a key is within this range
78    pub fn contains(&self, key: &Key) -> bool {
79        key >= &self.start && key < &self.end
80    }
81
82    /// Check if this range overlaps with another range
83    pub fn overlaps(&self, other: &KeyRange) -> bool {
84        self.start < other.end && other.start < self.end
85    }
86
87    /// Calculate the midpoint key for splitting.
88    ///
89    /// Returns a key M such that `self.start <= M < self.end`.
90    ///
91    /// Both keys are padded to the same length with trailing zero bytes and
92    /// treated as big-endian integers, so keys of different lengths are handled
93    /// correctly (e.g. midpoint("y", "yyyyyy") yields [0x79, 0x3C, …], not "z").
94    pub fn midpoint(&self) -> Key {
95        let start_bytes = self.start.as_bytes();
96        let end_bytes = self.end.as_bytes();
97        let max_len = start_bytes.len().max(end_bytes.len());
98
99        let get_byte = |v: &[u8], i: usize| -> u16 { v.get(i).copied().unwrap_or(0) as u16 };
100
101        // Step 1: byte-wise sum with right-to-left carry propagation.
102        let mut sum: Vec<u16> = (0..max_len)
103            .map(|i| get_byte(start_bytes, i) + get_byte(end_bytes, i))
104            .collect();
105        let mut carry: u16 = 0;
106        for b in sum.iter_mut().rev() {
107            let v = *b + carry;
108            *b = v & 0xFF;
109            carry = v >> 8; // 0 or 1
110        }
111        // carry: 0 or 1 — overflow bit above position 0
112
113        // Step 2: divide (carry || sum) by 2 via left-to-right bit-shift.
114        let mut mid: Vec<u8> = Vec::with_capacity(max_len);
115        let mut half_carry = carry; // the leading overflow bit
116        for b in &sum {
117            let val = half_carry * 256 + b;
118            mid.push((val / 2) as u8);
119            half_carry = val % 2;
120        }
121        // Discard the trailing remainder bit — it truncates (floor), keeping mid < end.
122
123        // Note: we intentionally do NOT strip trailing zeros here. Stripping could
124        // make `mid` lexicographically shorter than `start`, e.g. when both keys
125        // begin with 0x00 bytes. The extra trailing zeros are harmless — they just
126        // extend the resulting key to the same length as the longer of the two inputs.
127
128        Key::from_slice(&mid)
129    }
130
131    /// Create a range that covers all possible keys
132    pub fn full() -> Self {
133        Self {
134            start: Key::from_slice(&[0u8]),
135            end: Key::from_slice(&[0xFFu8; 32]),
136        }
137    }
138}
139
140/// Shard metadata tracking
141#[derive(Debug, Clone)]
142pub struct ShardMetadata {
143    /// Unique shard identifier
144    pub id: ShardId,
145    /// Key range this shard is responsible for
146    pub range: KeyRange,
147    /// Current state of the shard
148    pub state: ShardState,
149    /// Node ID where this shard is located
150    pub node_id: NodeId,
151    /// Replica node IDs (for fault tolerance)
152    pub replicas: Vec<NodeId>,
153    /// Estimated number of keys in this shard
154    pub estimated_keys: u64,
155    /// Estimated size in bytes
156    pub estimated_size_bytes: u64,
157    /// Last update timestamp
158    pub last_updated: SystemTime,
159    /// Creation timestamp
160    pub created_at: SystemTime,
161    /// Version number for optimistic concurrency control
162    pub version: u64,
163}
164
165impl ShardMetadata {
166    /// Create new shard metadata
167    pub fn new(id: ShardId, range: KeyRange, node_id: NodeId) -> Self {
168        let now = SystemTime::now();
169        Self {
170            id,
171            range,
172            state: ShardState::Active,
173            node_id,
174            replicas: Vec::new(),
175            estimated_keys: 0,
176            estimated_size_bytes: 0,
177            last_updated: now,
178            created_at: now,
179            version: 1,
180        }
181    }
182
183    /// Update shard state
184    pub fn set_state(&mut self, state: ShardState) {
185        self.state = state;
186        self.last_updated = SystemTime::now();
187        self.version += 1;
188    }
189
190    /// Update shard statistics
191    pub fn update_stats(&mut self, estimated_keys: u64, estimated_size_bytes: u64) {
192        self.estimated_keys = estimated_keys;
193        self.estimated_size_bytes = estimated_size_bytes;
194        self.last_updated = SystemTime::now();
195        self.version += 1;
196    }
197
198    /// Add a replica
199    pub fn add_replica(&mut self, node_id: NodeId) -> RaftResult<()> {
200        if self.replicas.contains(&node_id) {
201            return Err(RaftError::ConfigError {
202                message: format!("Replica {} already exists for shard {}", node_id, self.id),
203            });
204        }
205        self.replicas.push(node_id);
206        self.last_updated = SystemTime::now();
207        self.version += 1;
208        Ok(())
209    }
210
211    /// Remove a replica
212    pub fn remove_replica(&mut self, node_id: NodeId) -> RaftResult<()> {
213        let initial_len = self.replicas.len();
214        self.replicas.retain(|&id| id != node_id);
215        if self.replicas.len() == initial_len {
216            return Err(RaftError::ConfigError {
217                message: format!("Replica {} not found for shard {}", node_id, self.id),
218            });
219        }
220        self.last_updated = SystemTime::now();
221        self.version += 1;
222        Ok(())
223    }
224
225    /// Check if this shard is hot (exceeds threshold)
226    pub fn is_hot(&self, key_threshold: u64, size_threshold: u64) -> bool {
227        self.estimated_keys > key_threshold || self.estimated_size_bytes > size_threshold
228    }
229
230    /// Check if this shard is cold (below threshold)
231    pub fn is_cold(&self, key_threshold: u64, size_threshold: u64) -> bool {
232        self.estimated_keys < key_threshold && self.estimated_size_bytes < size_threshold
233    }
234
235    /// Check if the shard metadata is stale
236    pub fn is_stale(&self, max_age: Duration) -> bool {
237        self.last_updated
238            .elapsed()
239            .map(|elapsed| elapsed > max_age)
240            .unwrap_or(false)
241    }
242}
243
244/// Shard split operation descriptor
245#[derive(Debug, Clone)]
246pub struct ShardSplit {
247    /// Source shard ID
248    pub source_shard_id: ShardId,
249    /// First new shard ID (left range)
250    pub left_shard_id: ShardId,
251    /// Second new shard ID (right range)
252    pub right_shard_id: ShardId,
253    /// Split point key
254    pub split_key: Key,
255    /// Timestamp when split was initiated
256    pub initiated_at: SystemTime,
257}
258
259impl ShardSplit {
260    /// Create a new shard split descriptor
261    pub fn new(
262        source_shard_id: ShardId,
263        left_shard_id: ShardId,
264        right_shard_id: ShardId,
265        split_key: Key,
266    ) -> Self {
267        Self {
268            source_shard_id,
269            left_shard_id,
270            right_shard_id,
271            split_key,
272            initiated_at: SystemTime::now(),
273        }
274    }
275
276    /// Create left and right shard metadata from source
277    pub fn create_shards(
278        &self,
279        source: &ShardMetadata,
280    ) -> RaftResult<(ShardMetadata, ShardMetadata)> {
281        // Create left shard (start to split_key)
282        let left_range = KeyRange::new(source.range.start.clone(), self.split_key.clone())?;
283        let mut left_shard = ShardMetadata::new(self.left_shard_id, left_range, source.node_id);
284        left_shard.replicas = source.replicas.clone();
285
286        // Create right shard (split_key to end)
287        let right_range = KeyRange::new(self.split_key.clone(), source.range.end.clone())?;
288        let mut right_shard = ShardMetadata::new(self.right_shard_id, right_range, source.node_id);
289        right_shard.replicas = source.replicas.clone();
290
291        // Estimate stats (simple equal split assumption)
292        left_shard.estimated_keys = source.estimated_keys / 2;
293        left_shard.estimated_size_bytes = source.estimated_size_bytes / 2;
294        right_shard.estimated_keys = source.estimated_keys / 2;
295        right_shard.estimated_size_bytes = source.estimated_size_bytes / 2;
296
297        Ok((left_shard, right_shard))
298    }
299}
300
301/// Shard merge operation descriptor
302#[derive(Debug, Clone)]
303pub struct ShardMerge {
304    /// First source shard ID (should have lower key range)
305    pub left_shard_id: ShardId,
306    /// Second source shard ID (should have higher key range)
307    pub right_shard_id: ShardId,
308    /// Target merged shard ID
309    pub target_shard_id: ShardId,
310    /// Timestamp when merge was initiated
311    pub initiated_at: SystemTime,
312}
313
314impl ShardMerge {
315    /// Create a new shard merge descriptor
316    pub fn new(left_shard_id: ShardId, right_shard_id: ShardId, target_shard_id: ShardId) -> Self {
317        Self {
318            left_shard_id,
319            right_shard_id,
320            target_shard_id,
321            initiated_at: SystemTime::now(),
322        }
323    }
324
325    /// Validate that two shards can be merged
326    pub fn validate(&self, left: &ShardMetadata, right: &ShardMetadata) -> RaftResult<()> {
327        // Check that key ranges are adjacent
328        if left.range.end != right.range.start {
329            return Err(RaftError::ConfigError {
330                message: format!(
331                    "Shards {} and {} are not adjacent (left.end={:?}, right.start={:?})",
332                    left.id, right.id, left.range.end, right.range.start
333                ),
334            });
335        }
336
337        // Check that shards are on the same node
338        if left.node_id != right.node_id {
339            return Err(RaftError::ConfigError {
340                message: format!(
341                    "Shards {} and {} are on different nodes ({} vs {})",
342                    left.id, right.id, left.node_id, right.node_id
343                ),
344            });
345        }
346
347        Ok(())
348    }
349
350    /// Create merged shard metadata
351    pub fn create_merged_shard(
352        &self,
353        left: &ShardMetadata,
354        right: &ShardMetadata,
355    ) -> RaftResult<ShardMetadata> {
356        self.validate(left, right)?;
357
358        let merged_range = KeyRange::new(left.range.start.clone(), right.range.end.clone())?;
359
360        let mut merged = ShardMetadata::new(self.target_shard_id, merged_range, left.node_id);
361
362        // Combine statistics
363        merged.estimated_keys = left.estimated_keys + right.estimated_keys;
364        merged.estimated_size_bytes = left.estimated_size_bytes + right.estimated_size_bytes;
365
366        // Use replicas from left shard (should be same as right)
367        merged.replicas = left.replicas.clone();
368
369        Ok(merged)
370    }
371}
372
373/// Shard transfer operation descriptor
374#[derive(Debug, Clone)]
375pub struct ShardTransfer {
376    /// Shard ID being transferred
377    pub shard_id: ShardId,
378    /// Source node ID
379    pub from_node: NodeId,
380    /// Destination node ID
381    pub to_node: NodeId,
382    /// Transfer progress (0.0 to 1.0)
383    pub progress: f64,
384    /// Timestamp when transfer was initiated
385    pub initiated_at: SystemTime,
386    /// Estimated completion time
387    pub estimated_completion: Option<SystemTime>,
388}
389
390impl ShardTransfer {
391    /// Create a new shard transfer descriptor
392    pub fn new(shard_id: ShardId, from_node: NodeId, to_node: NodeId) -> Self {
393        Self {
394            shard_id,
395            from_node,
396            to_node,
397            progress: 0.0,
398            initiated_at: SystemTime::now(),
399            estimated_completion: None,
400        }
401    }
402
403    /// Update transfer progress
404    pub fn update_progress(&mut self, progress: f64) {
405        self.progress = progress.clamp(0.0, 1.0);
406
407        // Estimate completion time based on progress
408        if progress > 0.0 && progress < 1.0 {
409            if let Ok(elapsed) = self.initiated_at.elapsed() {
410                let total_time = elapsed.as_secs_f64() / progress;
411                let remaining_time = total_time * (1.0 - progress);
412                self.estimated_completion =
413                    Some(SystemTime::now() + Duration::from_secs_f64(remaining_time));
414            }
415        }
416    }
417
418    /// Check if transfer is complete
419    pub fn is_complete(&self) -> bool {
420        self.progress >= 1.0
421    }
422}
423
424/// Shard registry for tracking all shards in the cluster
425#[derive(Debug, Clone)]
426pub struct ShardRegistry {
427    /// Map from shard ID to shard metadata
428    shards: Arc<parking_lot::RwLock<BTreeMap<ShardId, ShardMetadata>>>,
429    /// Next available shard ID
430    next_shard_id: Arc<parking_lot::Mutex<ShardId>>,
431}
432
433impl ShardRegistry {
434    /// Create a new shard registry
435    pub fn new() -> Self {
436        Self {
437            shards: Arc::new(parking_lot::RwLock::new(BTreeMap::new())),
438            next_shard_id: Arc::new(parking_lot::Mutex::new(1)),
439        }
440    }
441
442    /// Allocate a new shard ID
443    pub fn allocate_shard_id(&self) -> ShardId {
444        let mut next_id = self.next_shard_id.lock();
445        let id = *next_id;
446        *next_id += 1;
447        id
448    }
449
450    /// Register a new shard
451    pub fn register(&self, shard: ShardMetadata) -> RaftResult<()> {
452        let mut shards = self.shards.write();
453
454        // Check for overlapping ranges
455        for existing in shards.values() {
456            if existing.range.overlaps(&shard.range) {
457                return Err(RaftError::ConfigError {
458                    message: format!(
459                        "Shard {} range overlaps with existing shard {} range",
460                        shard.id, existing.id
461                    ),
462                });
463            }
464        }
465
466        shards.insert(shard.id, shard);
467        Ok(())
468    }
469
470    /// Get shard metadata by ID
471    pub fn get(&self, shard_id: ShardId) -> Option<ShardMetadata> {
472        self.shards.read().get(&shard_id).cloned()
473    }
474
475    /// Update shard metadata
476    pub fn update(&self, shard: ShardMetadata) -> RaftResult<()> {
477        let mut shards = self.shards.write();
478        shards.insert(shard.id, shard);
479        Ok(())
480    }
481
482    /// Remove a shard
483    pub fn remove(&self, shard_id: ShardId) -> RaftResult<()> {
484        let mut shards = self.shards.write();
485        shards
486            .remove(&shard_id)
487            .ok_or_else(|| RaftError::ConfigError {
488                message: format!("Shard {} not found", shard_id),
489            })?;
490        Ok(())
491    }
492
493    /// Get all shards
494    pub fn get_all(&self) -> Vec<ShardMetadata> {
495        self.shards.read().values().cloned().collect()
496    }
497
498    /// Get shards on a specific node
499    pub fn get_by_node(&self, node_id: NodeId) -> Vec<ShardMetadata> {
500        self.shards
501            .read()
502            .values()
503            .filter(|shard| shard.node_id == node_id)
504            .cloned()
505            .collect()
506    }
507
508    /// Find shard responsible for a key
509    pub fn find_shard_for_key(&self, key: &Key) -> Option<ShardMetadata> {
510        self.shards
511            .read()
512            .values()
513            .find(|shard| shard.range.contains(key))
514            .cloned()
515    }
516
517    /// Get total number of shards
518    pub fn count(&self) -> usize {
519        self.shards.read().len()
520    }
521
522    /// Atomic split: parent Active→removed, two children inserted as Active.
523    pub fn execute_split(&self, split: &ShardSplit) -> RaftResult<()> {
524        let mut shards = self.shards.write();
525        let parent = shards
526            .get(&split.source_shard_id)
527            .ok_or_else(|| RaftError::Other {
528                message: format!("execute_split: shard {} not found", split.source_shard_id),
529            })?
530            .clone();
531        if parent.state != ShardState::Active {
532            return Err(RaftError::Other {
533                message: format!(
534                    "execute_split: shard {} is not Active (state={})",
535                    split.source_shard_id,
536                    parent.state.as_str()
537                ),
538            });
539        }
540        let (left, right) = split.create_shards(&parent)?;
541        shards.insert(left.id, left);
542        shards.insert(right.id, right);
543        shards.remove(&split.source_shard_id);
544        Ok(())
545    }
546
547    /// Atomic merge: both sources validated as Active, merged shard inserted, sources removed.
548    pub fn execute_merge(&self, merge: &ShardMerge) -> RaftResult<()> {
549        let mut shards = self.shards.write();
550        let left = shards
551            .get(&merge.left_shard_id)
552            .ok_or_else(|| RaftError::Other {
553                message: format!(
554                    "execute_merge: left shard {} not found",
555                    merge.left_shard_id
556                ),
557            })?
558            .clone();
559        let right = shards
560            .get(&merge.right_shard_id)
561            .ok_or_else(|| RaftError::Other {
562                message: format!(
563                    "execute_merge: right shard {} not found",
564                    merge.right_shard_id
565                ),
566            })?
567            .clone();
568        if left.state != ShardState::Active {
569            return Err(RaftError::Other {
570                message: format!(
571                    "execute_merge: left shard {} is not Active (state={})",
572                    merge.left_shard_id,
573                    left.state.as_str()
574                ),
575            });
576        }
577        if right.state != ShardState::Active {
578            return Err(RaftError::Other {
579                message: format!(
580                    "execute_merge: right shard {} is not Active (state={})",
581                    merge.right_shard_id,
582                    right.state.as_str()
583                ),
584            });
585        }
586        // create_merged_shard validates adjacency using Active metadata.
587        let merged = merge.create_merged_shard(&left, &right)?;
588        shards.remove(&merge.left_shard_id);
589        shards.remove(&merge.right_shard_id);
590        shards.insert(merged.id, merged);
591        Ok(())
592    }
593
594    /// Atomic transfer: source shard set to Transferring state; a new shard record
595    /// with a freshly allocated ID is inserted for the target node as Active.
596    ///
597    /// Lock ordering: `next_shard_id` Mutex is acquired first (briefly), then
598    /// `shards` RwLock — consistent order prevents deadlock.
599    pub fn execute_transfer(&self, transfer: &ShardTransfer) -> RaftResult<()> {
600        // Allocate the new shard ID before acquiring the shards write-lock
601        // so we never hold both locks simultaneously.
602        let new_target_id = self.allocate_shard_id();
603
604        let mut shards = self.shards.write();
605        let source = shards
606            .get(&transfer.shard_id)
607            .ok_or_else(|| RaftError::Other {
608                message: format!("execute_transfer: shard {} not found", transfer.shard_id),
609            })?
610            .clone();
611        if source.state != ShardState::Active {
612            return Err(RaftError::Other {
613                message: format!(
614                    "execute_transfer: shard {} is not Active (state={})",
615                    transfer.shard_id,
616                    source.state.as_str()
617                ),
618            });
619        }
620        // Build target metadata on the destination node (same range, new ID, Active state).
621        let target_shard =
622            ShardMetadata::new(new_target_id, source.range.clone(), transfer.to_node);
623        // Transition source to Transferring.
624        let mut source_transferring = source;
625        source_transferring.set_state(ShardState::Transferring);
626        // Apply both changes atomically under the single write-lock.
627        shards.insert(source_transferring.id, source_transferring);
628        shards.insert(new_target_id, target_shard);
629        Ok(())
630    }
631}
632
633impl Default for ShardRegistry {
634    fn default() -> Self {
635        Self::new()
636    }
637}
638
#[cfg(test)]
mod prop_tests {
    use super::*;
    use proptest::prelude::*;

    /// Strategy: lowercase ASCII string, length [min, max].
    fn arb_key_str(min: usize, max: usize) -> impl Strategy<Value = String> {
        prop::collection::vec(b'a'..=b'z', min..=max)
            .prop_map(|v| String::from_utf8(v).expect("valid utf-8"))
    }

    proptest! {
        #[test]
        fn prop_key_range_contains_consistent(
            start in arb_key_str(1, 8),
            mid in arb_key_str(1, 8),
            end in arb_key_str(1, 8),
        ) {
            // Only test when start < end (valid range).
            prop_assume!(start < end);
            let range = match KeyRange::new(Key::from_str(&start), Key::from_str(&end)) {
                Ok(r) => r,
                Err(_) => return Ok(()),
            };
            let mid_key = Key::from_str(&mid);
            // [start, end) — inclusive start, exclusive end. `String` ordering is
            // byte-wise lexicographic, which `KeyRange::contains` should mirror.
            let expected = mid >= start && mid < end;
            prop_assert_eq!(
                range.contains(&mid_key),
                expected,
                "contains({:?}) in [{:?}, {:?}) should be {}",
                mid,
                start,
                end,
                expected
            );
        }

        #[test]
        fn prop_key_range_midpoint_is_between_bounds(
            start in arb_key_str(1, 5),
            end in arb_key_str(6, 12),
        ) {
            // Property: start <= midpoint < end. The midpoint may equal `start`
            // when the range has no room at the padded key length.
            // The length ranges differ, but string order is lexicographic rather
            // than by length, so `prop_assume!` still discards a sizeable share
            // of the generated pairs.
            prop_assume!(start < end);
            let range = match KeyRange::new(Key::from_str(&start), Key::from_str(&end)) {
                Ok(r) => r,
                Err(_) => return Ok(()),
            };
            let mid = range.midpoint();
            prop_assert!(
                mid >= range.start,
                "midpoint {:?} must be >= start {:?}",
                mid,
                range.start
            );
            prop_assert!(
                mid < range.end,
                "midpoint {:?} must be < end {:?}",
                mid,
                range.end
            );
        }

        #[test]
        fn prop_key_range_split_no_overlap(
            start in arb_key_str(1, 4),
            end in arb_key_str(8, 12),
        ) {
            // Property: after a split at the midpoint, no key is in both halves.
            prop_assume!(start < end);
            let range = match KeyRange::new(Key::from_str(&start), Key::from_str(&end)) {
                Ok(r) => r,
                Err(_) => return Ok(()),
            };
            let mid = range.midpoint();
            let (left, right) = match (
                KeyRange::new(range.start.clone(), mid.clone()),
                KeyRange::new(mid.clone(), range.end.clone()),
            ) {
                (Ok(l), Ok(r)) => (l, r),
                _ => return Ok(()), // degenerate midpoint (mid == start) — skip
            };
            // The split key belongs to the right half only: [start, mid) | [mid, end).
            prop_assert!(
                !left.contains(&mid),
                "split key {:?} must not be in the left half",
                mid
            );
            prop_assert!(
                right.contains(&mid),
                "split key {:?} must be in the right half",
                mid
            );
            // Spot-check: start key is in left, never in right.
            let test_key = Key::from_str(&start);
            let in_left = left.contains(&test_key);
            let in_right = right.contains(&test_key);
            prop_assert!(
                !(in_left && in_right),
                "start key {:?} must not be in both halves after split",
                test_key
            );
        }

        #[test]
        fn prop_shard_registry_count_matches_unique_registrations(
            raw_ids in prop::collection::vec(1u64..=20u64, 1..=8)
        ) {
            // Property: registry count == number of shards successfully registered,
            // and every shard with a disjoint range and fresh ID registers.
            let registry = ShardRegistry::new();
            // Each distinct id claims its own 1-byte slice [slot, slot + 1) of the
            // keyspace, so no two ranges overlap.
            let mut distinct_ids: Vec<u64> = raw_ids;
            distinct_ids.sort_unstable();
            distinct_ids.dedup();

            let mut registered = 0usize;
            for (slot, _id) in distinct_ids.iter().enumerate() {
                let start_byte = slot as u8;
                // Stop before overflow
                if start_byte == u8::MAX {
                    break;
                }
                let range = match KeyRange::new(
                    Key::from_slice(&[start_byte]),
                    Key::from_slice(&[start_byte + 1]),
                ) {
                    Ok(r) => r,
                    Err(_) => continue,
                };
                let shard_id = registry.allocate_shard_id();
                let shard = ShardMetadata::new(shard_id, range, 1);
                if registry.register(shard).is_ok() {
                    registered += 1;
                }
            }

            let count = registry.count();
            prop_assert_eq!(
                count,
                registered,
                "registry.count() must equal number of successfully registered shards"
            );
            prop_assert_eq!(
                registered,
                distinct_ids.len(),
                "every shard with a disjoint range and fresh id must register"
            );
        }

        #[test]
        fn prop_shard_registry_find_key_correctness(
            start_byte in 0u8..100u8,
            end_byte in 101u8..=200u8,
            query_byte in 0u8..=255u8,
        ) {
            // Property: find_shard_for_key returns Some iff query is in [start, end).
            let registry = ShardRegistry::new();
            let range = match KeyRange::new(
                Key::from_slice(&[start_byte]),
                Key::from_slice(&[end_byte]),
            ) {
                Ok(r) => r,
                Err(_) => return Ok(()),
            };
            let shard_id = registry.allocate_shard_id();
            let shard = ShardMetadata::new(shard_id, range, 1);
            registry.register(shard).expect("register shard");

            let query = Key::from_slice(&[query_byte]);
            let found = registry.find_shard_for_key(&query);
            let in_range = query_byte >= start_byte && query_byte < end_byte;
            prop_assert_eq!(
                found.is_some(),
                in_range,
                "find_shard_for_key({}) in [{}, {}) should be {}",
                query_byte,
                start_byte,
                end_byte,
                in_range
            );
        }
    }
}
811
#[cfg(test)]
mod tests {
    //! Unit tests for shard state predicates, key ranges, shard metadata,
    //! split/merge/transfer planning and the shard registry.

    use super::*;

    /// Every `ShardState` variant reports the expected read/write permission
    /// and display name.
    #[test]
    fn test_shard_state() {
        // (state, can_read, can_write, as_str)
        let cases = [
            (ShardState::Active, true, true, "Active"),
            (ShardState::Splitting, true, false, "Splitting"),
            (ShardState::Merging, false, false, "Merging"),
            (ShardState::Transferring, true, false, "Transferring"),
            (ShardState::Offline, false, false, "Offline"),
        ];

        for (state, readable, writable, name) in cases {
            assert_eq!(state.can_read(), readable, "can_read for {:?}", state);
            assert_eq!(state.can_write(), writable, "can_write for {:?}", state);
            assert_eq!(state.as_str(), name, "as_str for {:?}", state);
        }
    }

    /// A key range includes its start key and excludes its end key.
    #[test]
    fn test_key_range_contains() -> RaftResult<()> {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;

        assert!(range.contains(&Key::from_str("m")));
        assert!(range.contains(&Key::from_str("a")));
        assert!(!range.contains(&Key::from_str("z")));
        assert!(range.contains(&Key::from_str("aa"))); // "aa" > "a" and "aa" < "z" lexicographically
        assert!(!range.contains(&Key::from_str("{"))); // "{" has ASCII 123 > 'z' (122), outside range

        Ok(())
    }

    /// Overlapping ranges are detected symmetrically; ranges that only touch
    /// at a boundary ("a".."m" and "m".."z") do not overlap.
    #[test]
    fn test_key_range_overlaps() -> RaftResult<()> {
        let range1 = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
        let range2 = KeyRange::new(Key::from_str("g"), Key::from_str("z"))?;
        let range3 = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;

        assert!(range1.overlaps(&range2));
        assert!(range2.overlaps(&range1));
        assert!(!range1.overlaps(&range3));

        Ok(())
    }

    /// The midpoint lies strictly between the range's start and end keys.
    #[test]
    fn test_key_range_midpoint() -> RaftResult<()> {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;

        let mid = range.midpoint();
        assert!(mid > range.start);
        assert!(mid < range.end);

        Ok(())
    }

    /// A freshly created shard is Active, at version 1, on the given node.
    #[test]
    fn test_shard_metadata_creation() {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z")).expect("valid range");
        let shard = ShardMetadata::new(1, range, 100);

        assert_eq!(shard.id, 1);
        assert_eq!(shard.node_id, 100);
        assert_eq!(shard.state, ShardState::Active);
        assert_eq!(shard.version, 1);
    }

    /// Updating stats stores the new estimates and bumps the version by one.
    #[test]
    fn test_shard_metadata_update_stats() {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z")).expect("valid range");
        let mut shard = ShardMetadata::new(1, range, 100);

        let initial_version = shard.version;
        shard.update_stats(1000, 50000);

        assert_eq!(shard.estimated_keys, 1000);
        assert_eq!(shard.estimated_size_bytes, 50000);
        assert_eq!(shard.version, initial_version + 1);
    }

    /// Replicas can be added and removed; adding a duplicate replica fails.
    #[test]
    fn test_shard_metadata_replicas() -> RaftResult<()> {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
        let mut shard = ShardMetadata::new(1, range, 100);

        shard.add_replica(101)?;
        shard.add_replica(102)?;
        assert_eq!(shard.replicas.len(), 2);

        assert!(shard.add_replica(101).is_err());

        shard.remove_replica(101)?;
        assert_eq!(shard.replicas.len(), 1);
        assert!(shard.replicas.contains(&102));

        Ok(())
    }

    /// Splitting at "m" yields children meeting at "m". For this input, each
    /// child gets half of the source's estimated keys.
    #[test]
    fn test_shard_split() -> RaftResult<()> {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
        let mut source = ShardMetadata::new(1, range, 100);
        source.update_stats(1000, 100000);

        let split = ShardSplit::new(1, 2, 3, Key::from_str("m"));
        let (left, right) = split.create_shards(&source)?;

        assert_eq!(left.id, 2);
        assert_eq!(right.id, 3);
        assert_eq!(left.range.end, Key::from_str("m"));
        assert_eq!(right.range.start, Key::from_str("m"));
        assert_eq!(left.estimated_keys, 500);
        assert_eq!(right.estimated_keys, 500);

        Ok(())
    }

    /// Merging adjacent shards spans both ranges and sums their estimates.
    #[test]
    fn test_shard_merge() -> RaftResult<()> {
        let left_range = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
        let right_range = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;

        let mut left = ShardMetadata::new(1, left_range, 100);
        let mut right = ShardMetadata::new(2, right_range, 100);

        left.update_stats(500, 50000);
        right.update_stats(500, 50000);

        let merge = ShardMerge::new(1, 2, 3);
        let merged = merge.create_merged_shard(&left, &right)?;

        assert_eq!(merged.id, 3);
        assert_eq!(merged.range.start, Key::from_str("a"));
        assert_eq!(merged.range.end, Key::from_str("z"));
        assert_eq!(merged.estimated_keys, 1000);
        assert_eq!(merged.estimated_size_bytes, 100000);

        Ok(())
    }

    /// Transfer progress starts at 0.0 and the transfer is complete at 1.0.
    #[test]
    fn test_shard_transfer() {
        let mut transfer = ShardTransfer::new(1, 100, 101);
        assert_eq!(transfer.progress, 0.0);
        assert!(!transfer.is_complete());

        transfer.update_progress(0.5);
        assert_eq!(transfer.progress, 0.5);
        assert!(!transfer.is_complete());

        transfer.update_progress(1.0);
        assert!(transfer.is_complete());
    }

    /// The registry allocates distinct IDs and supports lookup by ID and by
    /// key once a shard is registered.
    #[test]
    fn test_shard_registry() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let id1 = registry.allocate_shard_id();
        let id2 = registry.allocate_shard_id();
        assert_ne!(id1, id2);

        let range1 = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
        let shard1 = ShardMetadata::new(id1, range1, 100);
        registry.register(shard1.clone())?;

        let retrieved = registry
            .get(id1)
            .expect("Shard should be retrieved from registry");
        assert_eq!(retrieved.id, id1);

        let found = registry
            .find_shard_for_key(&Key::from_str("g"))
            .expect("Shard should be found for key");
        assert_eq!(found.id, id1);

        assert_eq!(registry.count(), 1);

        Ok(())
    }

    /// Registering a shard whose range overlaps an existing shard is rejected.
    #[test]
    fn test_shard_registry_overlapping_ranges() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let range1 = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
        let shard1 = ShardMetadata::new(1, range1, 100);
        registry.register(shard1)?;

        let range2 = KeyRange::new(Key::from_str("g"), Key::from_str("z"))?;
        let shard2 = ShardMetadata::new(2, range2, 100);
        let result = registry.register(shard2);

        assert!(result.is_err());

        Ok(())
    }

    /// Hot/cold classification follows the supplied key and size thresholds.
    #[test]
    fn test_hot_cold_shards() {
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z")).expect("valid range");
        let mut shard = ShardMetadata::new(1, range, 100);

        shard.update_stats(1000, 50000);
        assert!(shard.is_hot(500, 25000));
        assert!(!shard.is_cold(500, 25000));

        shard.update_stats(100, 5000);
        assert!(!shard.is_hot(500, 25000));
        assert!(shard.is_cold(500, 25000));
    }

    /// Executing a split replaces the source with two Active children.
    #[test]
    fn test_execute_split() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        // Register source shard covering "a".."z"
        let src_id = registry.allocate_shard_id();
        let left_id = registry.allocate_shard_id();
        let right_id = registry.allocate_shard_id();

        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
        let mut source = ShardMetadata::new(src_id, range, 1);
        source.update_stats(1000, 100_000);
        registry.register(source)?;

        let split = ShardSplit::new(src_id, left_id, right_id, Key::from_str("m"));
        registry.execute_split(&split)?;

        // Source shard must be gone.
        assert!(
            registry.get(src_id).is_none(),
            "source shard must be removed after split"
        );

        // Both children must exist and be Active.
        let left = registry.get(left_id).expect("left child shard must exist");
        let right = registry
            .get(right_id)
            .expect("right child shard must exist");
        assert_eq!(left.state, ShardState::Active);
        assert_eq!(right.state, ShardState::Active);
        assert_eq!(left.range.start, Key::from_str("a"));
        assert_eq!(left.range.end, Key::from_str("m"));
        assert_eq!(right.range.start, Key::from_str("m"));
        assert_eq!(right.range.end, Key::from_str("z"));
        assert_eq!(registry.count(), 2);

        Ok(())
    }

    /// Splitting a shard that is not Active is rejected.
    #[test]
    fn test_execute_split_non_active_fails() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let src_id = registry.allocate_shard_id();
        let left_id = registry.allocate_shard_id();
        let right_id = registry.allocate_shard_id();

        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
        let mut source = ShardMetadata::new(src_id, range, 1);
        source.set_state(ShardState::Offline);
        registry.register(source)?;

        let split = ShardSplit::new(src_id, left_id, right_id, Key::from_str("m"));
        let result = registry.execute_split(&split);
        assert!(result.is_err(), "split of non-Active shard must fail");

        Ok(())
    }

    /// Executing a merge replaces both sources with one Active shard that
    /// spans their ranges and sums their statistics.
    #[test]
    fn test_execute_merge() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let left_id = registry.allocate_shard_id();
        let right_id = registry.allocate_shard_id();
        let merged_id = registry.allocate_shard_id();

        let left_range = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
        let right_range = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;

        let mut left = ShardMetadata::new(left_id, left_range, 1);
        left.update_stats(500, 50_000);
        let mut right = ShardMetadata::new(right_id, right_range, 1);
        right.update_stats(500, 50_000);

        registry.register(left)?;
        registry.register(right)?;

        let merge = ShardMerge::new(left_id, right_id, merged_id);
        registry.execute_merge(&merge)?;

        // Both sources must be removed.
        assert!(
            registry.get(left_id).is_none(),
            "left source must be removed after merge"
        );
        assert!(
            registry.get(right_id).is_none(),
            "right source must be removed after merge"
        );

        // Merged shard must exist as Active with combined range and stats.
        let merged = registry.get(merged_id).expect("merged shard must exist");
        assert_eq!(merged.state, ShardState::Active);
        assert_eq!(merged.range.start, Key::from_str("a"));
        assert_eq!(merged.range.end, Key::from_str("z"));
        assert_eq!(merged.estimated_keys, 1000);
        assert_eq!(merged.estimated_size_bytes, 100_000);
        assert_eq!(registry.count(), 1);

        Ok(())
    }

    /// Merging is rejected when either participant is not Active.
    #[test]
    fn test_execute_merge_non_active_fails() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let left_id = registry.allocate_shard_id();
        let right_id = registry.allocate_shard_id();
        let merged_id = registry.allocate_shard_id();

        let left_range = KeyRange::new(Key::from_str("a"), Key::from_str("m"))?;
        let right_range = KeyRange::new(Key::from_str("m"), Key::from_str("z"))?;

        let left = ShardMetadata::new(left_id, left_range, 1);
        let mut right = ShardMetadata::new(right_id, right_range, 1);
        right.set_state(ShardState::Merging);

        registry.register(left)?;
        registry.register(right)?;

        let merge = ShardMerge::new(left_id, right_id, merged_id);
        let result = registry.execute_merge(&merge);
        assert!(result.is_err(), "merge with non-Active shard must fail");

        Ok(())
    }

    /// Initiating a transfer keeps the source record (now Transferring, same
    /// node) and inserts an Active record for the target node.
    #[test]
    fn test_execute_transfer() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let src_id = registry.allocate_shard_id();
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
        let source = ShardMetadata::new(src_id, range, 1);
        registry.register(source)?;

        let transfer = ShardTransfer::new(src_id, 1, 2);
        registry.execute_transfer(&transfer)?;

        // Source shard must now be in Transferring state.
        let updated_source = registry.get(src_id).expect("source shard must still exist");
        assert_eq!(
            updated_source.state,
            ShardState::Transferring,
            "source shard must be Transferring after transfer initiation"
        );
        assert_eq!(
            updated_source.node_id, 1,
            "source node_id must be unchanged"
        );

        // A new shard record for the target node must have been inserted.
        let all_shards = registry.get_all();
        assert_eq!(
            all_shards.len(),
            2,
            "registry must have exactly two shards (source + target)"
        );

        let target_shard = all_shards
            .iter()
            .find(|s| s.id != src_id)
            .expect("target shard must exist");
        assert_eq!(target_shard.state, ShardState::Active);
        assert_eq!(target_shard.node_id, 2);
        assert_eq!(target_shard.range.start, Key::from_str("a"));
        assert_eq!(target_shard.range.end, Key::from_str("z"));

        Ok(())
    }

    /// Transferring a shard that is not Active is rejected.
    #[test]
    fn test_execute_transfer_non_active_fails() -> RaftResult<()> {
        let registry = ShardRegistry::new();

        let src_id = registry.allocate_shard_id();
        let range = KeyRange::new(Key::from_str("a"), Key::from_str("z"))?;
        let mut source = ShardMetadata::new(src_id, range, 1);
        source.set_state(ShardState::Transferring);
        registry.register(source)?;

        let transfer = ShardTransfer::new(src_id, 1, 2);
        let result = registry.execute_transfer(&transfer);
        assert!(result.is_err(), "transfer of non-Active shard must fail");

        Ok(())
    }
}