// aegis_replication/shard.rs
//! Aegis Shard Management
//!
//! Shard lifecycle and state management.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team

use crate::node::NodeId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};

14// =============================================================================
15// Shard ID
16// =============================================================================
17
18/// Unique identifier for a shard.
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct ShardId(pub u32);
21
22impl ShardId {
23    pub fn new(id: u32) -> Self {
24        Self(id)
25    }
26
27    pub fn as_u32(&self) -> u32 {
28        self.0
29    }
30}
31
32impl std::fmt::Display for ShardId {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        write!(f, "shard_{}", self.0)
35    }
36}
37
// =============================================================================
// Shard State
// =============================================================================

/// Lifecycle state of a shard.
///
/// New shards start in `Creating` (the `Default` variant). Only `Active`
/// shards accept writes, while `Active`, `Migrating`, and `Splitting` shards
/// remain readable — see `Shard::is_writable` / `Shard::is_readable`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ShardState {
    /// Shard is being created.
    #[default]
    Creating,
    /// Shard is active and serving requests.
    Active,
    /// Shard is being migrated to another node.
    Migrating,
    /// Shard is being split into smaller shards.
    Splitting,
    /// Shard is being merged with another shard.
    Merging,
    /// Shard is inactive (not serving requests).
    Inactive,
    /// Shard is being deleted.
    Deleting,
}

// =============================================================================
// Shard
// =============================================================================

/// A data shard in the distributed system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Shard {
    /// Unique identifier of this shard.
    pub id: ShardId,
    /// Current lifecycle state.
    pub state: ShardState,
    /// Node that owns this shard.
    pub primary_node: NodeId,
    /// Nodes holding replicas (`add_replica` never admits the primary here).
    pub replica_nodes: Vec<NodeId>,
    /// Inclusive lower bound of the key-hash range, if range-partitioned.
    pub key_range_start: Option<u64>,
    /// Exclusive upper bound of the key-hash range, if range-partitioned
    /// (see `contains_key`: `start <= key < end`).
    pub key_range_end: Option<u64>,
    /// Creation time, milliseconds since the Unix epoch.
    pub created_at: u64,
    /// Last-modification time, milliseconds since the Unix epoch.
    pub updated_at: u64,
    /// Data size in bytes, maintained via `update_stats`.
    pub size_bytes: u64,
    /// Row count, maintained via `update_stats`.
    pub row_count: u64,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
}

82impl Shard {
83    /// Create a new shard.
84    pub fn new(id: ShardId, primary_node: NodeId) -> Self {
85        let now = current_timestamp();
86        Self {
87            id,
88            state: ShardState::Creating,
89            primary_node,
90            replica_nodes: Vec::new(),
91            key_range_start: None,
92            key_range_end: None,
93            created_at: now,
94            updated_at: now,
95            size_bytes: 0,
96            row_count: 0,
97            metadata: HashMap::new(),
98        }
99    }
100
101    /// Create a shard with a key range.
102    pub fn with_range(id: ShardId, primary_node: NodeId, start: u64, end: u64) -> Self {
103        let mut shard = Self::new(id, primary_node);
104        shard.key_range_start = Some(start);
105        shard.key_range_end = Some(end);
106        shard
107    }
108
109    /// Add a replica node.
110    pub fn add_replica(&mut self, node: NodeId) {
111        if !self.replica_nodes.contains(&node) && node != self.primary_node {
112            self.replica_nodes.push(node);
113            self.updated_at = current_timestamp();
114        }
115    }
116
117    /// Remove a replica node.
118    pub fn remove_replica(&mut self, node: &NodeId) {
119        self.replica_nodes.retain(|n| n != node);
120        self.updated_at = current_timestamp();
121    }
122
123    /// Set the primary node.
124    pub fn set_primary(&mut self, node: NodeId) {
125        self.primary_node = node;
126        self.updated_at = current_timestamp();
127    }
128
129    /// Set the shard state.
130    pub fn set_state(&mut self, state: ShardState) {
131        self.state = state;
132        self.updated_at = current_timestamp();
133    }
134
135    /// Check if a key is in this shard's range.
136    pub fn contains_key(&self, key_hash: u64) -> bool {
137        match (self.key_range_start, self.key_range_end) {
138            (Some(start), Some(end)) => key_hash >= start && key_hash < end,
139            _ => true, // No range defined, accepts all keys
140        }
141    }
142
143    /// Check if the shard is active.
144    pub fn is_active(&self) -> bool {
145        self.state == ShardState::Active
146    }
147
148    /// Check if the shard is available for reads.
149    pub fn is_readable(&self) -> bool {
150        matches!(
151            self.state,
152            ShardState::Active | ShardState::Migrating | ShardState::Splitting
153        )
154    }
155
156    /// Check if the shard is available for writes.
157    pub fn is_writable(&self) -> bool {
158        self.state == ShardState::Active
159    }
160
161    /// Get all nodes (primary + replicas).
162    pub fn all_nodes(&self) -> Vec<&NodeId> {
163        let mut nodes = vec![&self.primary_node];
164        nodes.extend(self.replica_nodes.iter());
165        nodes
166    }
167
168    /// Get the replication factor (1 + replica count).
169    pub fn replication_factor(&self) -> usize {
170        1 + self.replica_nodes.len()
171    }
172
173    /// Update size statistics.
174    pub fn update_stats(&mut self, size_bytes: u64, row_count: u64) {
175        self.size_bytes = size_bytes;
176        self.row_count = row_count;
177        self.updated_at = current_timestamp();
178    }
179}
180
// =============================================================================
// Shard Manager
// =============================================================================

/// Manages shards across the cluster.
///
/// Interior mutability via `RwLock` allows a shared `ShardManager` to be
/// queried and mutated through `&self`.
pub struct ShardManager {
    /// All known shards, keyed by id.
    shards: RwLock<HashMap<ShardId, Shard>>,
    /// Reverse index: shard ids hosted on each node (as primary or replica).
    node_shards: RwLock<HashMap<NodeId, Vec<ShardId>>>,
    /// Number of logical shards the key space is divided into.
    num_shards: u32,
    /// Target copies per shard (primary + replicas).
    replication_factor: usize,
}

193impl ShardManager {
194    /// Create a new shard manager.
195    pub fn new(num_shards: u32, replication_factor: usize) -> Self {
196        Self {
197            shards: RwLock::new(HashMap::new()),
198            node_shards: RwLock::new(HashMap::new()),
199            num_shards,
200            replication_factor,
201        }
202    }
203
204    /// Get the number of shards.
205    pub fn num_shards(&self) -> u32 {
206        self.num_shards
207    }
208
209    /// Get the replication factor.
210    pub fn replication_factor(&self) -> usize {
211        self.replication_factor
212    }
213
214    /// Create a shard.
215    pub fn create_shard(&self, id: ShardId, primary_node: NodeId) -> Shard {
216        let shard = Shard::new(id.clone(), primary_node.clone());
217
218        let mut shards = self
219            .shards
220            .write()
221            .expect("shard manager shards lock poisoned");
222        shards.insert(id.clone(), shard.clone());
223
224        let mut node_shards = self
225            .node_shards
226            .write()
227            .expect("shard manager node_shards lock poisoned");
228        node_shards.entry(primary_node).or_default().push(id);
229
230        shard
231    }
232
233    /// Create a shard with a key range.
234    pub fn create_shard_with_range(
235        &self,
236        id: ShardId,
237        primary_node: NodeId,
238        start: u64,
239        end: u64,
240    ) -> Shard {
241        let shard = Shard::with_range(id.clone(), primary_node.clone(), start, end);
242
243        let mut shards = self
244            .shards
245            .write()
246            .expect("shard manager shards lock poisoned");
247        shards.insert(id.clone(), shard.clone());
248
249        let mut node_shards = self
250            .node_shards
251            .write()
252            .expect("shard manager node_shards lock poisoned");
253        node_shards.entry(primary_node).or_default().push(id);
254
255        shard
256    }
257
258    /// Get a shard by ID.
259    pub fn get_shard(&self, id: &ShardId) -> Option<Shard> {
260        self.shards
261            .read()
262            .expect("shard manager shards lock poisoned")
263            .get(id)
264            .cloned()
265    }
266
267    /// Get all shards.
268    pub fn get_all_shards(&self) -> Vec<Shard> {
269        self.shards
270            .read()
271            .expect("shard manager shards lock poisoned")
272            .values()
273            .cloned()
274            .collect()
275    }
276
277    /// Get shards for a node.
278    pub fn get_node_shards(&self, node_id: &NodeId) -> Vec<ShardId> {
279        self.node_shards
280            .read()
281            .expect("shard manager node_shards lock poisoned")
282            .get(node_id)
283            .cloned()
284            .unwrap_or_default()
285    }
286
287    /// Update shard state.
288    pub fn update_shard_state(&self, id: &ShardId, state: ShardState) -> bool {
289        let mut shards = self
290            .shards
291            .write()
292            .expect("shard manager shards lock poisoned");
293        if let Some(shard) = shards.get_mut(id) {
294            shard.set_state(state);
295            true
296        } else {
297            false
298        }
299    }
300
301    /// Add a replica to a shard.
302    pub fn add_replica(&self, shard_id: &ShardId, node_id: NodeId) -> bool {
303        let mut shards = self
304            .shards
305            .write()
306            .expect("shard manager shards lock poisoned");
307        if let Some(shard) = shards.get_mut(shard_id) {
308            shard.add_replica(node_id.clone());
309
310            let mut node_shards = self
311                .node_shards
312                .write()
313                .expect("shard manager node_shards lock poisoned");
314            node_shards
315                .entry(node_id)
316                .or_default()
317                .push(shard_id.clone());
318
319            true
320        } else {
321            false
322        }
323    }
324
325    /// Remove a shard.
326    pub fn remove_shard(&self, id: &ShardId) -> Option<Shard> {
327        let mut shards = self
328            .shards
329            .write()
330            .expect("shard manager shards lock poisoned");
331        let shard = shards.remove(id)?;
332
333        let mut node_shards = self
334            .node_shards
335            .write()
336            .expect("shard manager node_shards lock poisoned");
337
338        // Remove from primary
339        if let Some(shards) = node_shards.get_mut(&shard.primary_node) {
340            shards.retain(|s| s != id);
341        }
342
343        // Remove from replicas
344        for replica in &shard.replica_nodes {
345            if let Some(shards) = node_shards.get_mut(replica) {
346                shards.retain(|s| s != id);
347            }
348        }
349
350        Some(shard)
351    }
352
353    /// Get the shard for a key hash.
354    pub fn get_shard_for_key(&self, key_hash: u64) -> Option<Shard> {
355        let shards = self
356            .shards
357            .read()
358            .expect("shard manager shards lock poisoned");
359
360        // Simple modulo-based shard selection
361        let shard_id = ShardId::new((key_hash % self.num_shards as u64) as u32);
362        shards.get(&shard_id).cloned()
363    }
364
365    /// Get active shards.
366    pub fn get_active_shards(&self) -> Vec<Shard> {
367        self.shards
368            .read()
369            .expect("shard manager shards lock poisoned")
370            .values()
371            .filter(|s| s.is_active())
372            .cloned()
373            .collect()
374    }
375
376    /// Get shard count.
377    pub fn shard_count(&self) -> usize {
378        self.shards
379            .read()
380            .expect("shard manager shards lock poisoned")
381            .len()
382    }
383
384    /// Initialize shards for a set of nodes.
385    pub fn initialize_shards(&self, nodes: &[NodeId]) {
386        if nodes.is_empty() {
387            return;
388        }
389
390        let range_size = u64::MAX / self.num_shards as u64;
391
392        for i in 0..self.num_shards {
393            let shard_id = ShardId::new(i);
394            let primary_idx = i as usize % nodes.len();
395            let primary_node = nodes[primary_idx].clone();
396
397            let start = i as u64 * range_size;
398            let end = if i == self.num_shards - 1 {
399                u64::MAX
400            } else {
401                (i as u64 + 1) * range_size
402            };
403
404            let _shard = self.create_shard_with_range(shard_id.clone(), primary_node, start, end);
405
406            // Add replicas
407            for r in 1..self.replication_factor.min(nodes.len()) {
408                let replica_idx = (primary_idx + r) % nodes.len();
409                self.add_replica(&shard_id, nodes[replica_idx].clone());
410            }
411
412            // Activate the shard
413            self.update_shard_state(&shard_id, ShardState::Active);
414        }
415    }
416
417    /// Get shard statistics.
418    pub fn stats(&self) -> ShardManagerStats {
419        let shards = self
420            .shards
421            .read()
422            .expect("shard manager shards lock poisoned");
423
424        let active = shards.values().filter(|s| s.is_active()).count();
425        let migrating = shards
426            .values()
427            .filter(|s| s.state == ShardState::Migrating)
428            .count();
429        let total_size: u64 = shards.values().map(|s| s.size_bytes).sum();
430        let total_rows: u64 = shards.values().map(|s| s.row_count).sum();
431
432        ShardManagerStats {
433            total_shards: shards.len(),
434            active_shards: active,
435            migrating_shards: migrating,
436            total_size_bytes: total_size,
437            total_row_count: total_rows,
438        }
439    }
440}
441
// =============================================================================
// Shard Manager Statistics
// =============================================================================

/// Point-in-time statistics produced by `ShardManager::stats`.
#[derive(Debug, Clone)]
pub struct ShardManagerStats {
    /// Number of registered shards.
    pub total_shards: usize,
    /// Shards in the `Active` state.
    pub active_shards: usize,
    /// Shards in the `Migrating` state.
    pub migrating_shards: usize,
    /// Sum of `size_bytes` across all shards.
    pub total_size_bytes: u64,
    /// Sum of `row_count` across all shards.
    pub total_row_count: u64,
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_shard_id() {
        let shard_id = ShardId::new(5);
        assert_eq!(shard_id.as_u32(), 5);
        assert_eq!(shard_id.to_string(), "shard_5");
    }

    #[test]
    fn test_shard_creation() {
        let fresh = Shard::new(ShardId::new(0), NodeId::new("node1"));

        assert_eq!(fresh.id.as_u32(), 0);
        assert_eq!(fresh.primary_node.as_str(), "node1");
        assert_eq!(fresh.state, ShardState::Creating);
        assert!(fresh.replica_nodes.is_empty());
    }

    #[test]
    fn test_shard_with_range() {
        let ranged = Shard::with_range(ShardId::new(0), NodeId::new("node1"), 0, 1000);

        // The range is half-open: the end bound is excluded.
        assert!(ranged.contains_key(500));
        assert!(!ranged.contains_key(1000));
    }

    #[test]
    fn test_shard_replicas() {
        let mut replicated = Shard::new(ShardId::new(0), NodeId::new("node1"));

        replicated.add_replica(NodeId::new("node2"));
        replicated.add_replica(NodeId::new("node3"));

        assert_eq!(replicated.replica_nodes.len(), 2);
        assert_eq!(replicated.replication_factor(), 3);
        assert_eq!(replicated.all_nodes().len(), 3);
    }

    #[test]
    fn test_shard_state() {
        let mut tracked = Shard::new(ShardId::new(0), NodeId::new("node1"));

        assert!(!tracked.is_active());

        tracked.set_state(ShardState::Active);
        assert!(tracked.is_active());
        assert!(tracked.is_readable());
        assert!(tracked.is_writable());

        // A migrating shard stays readable but stops accepting writes.
        tracked.set_state(ShardState::Migrating);
        assert!(tracked.is_readable());
        assert!(!tracked.is_writable());
    }

    #[test]
    fn test_shard_manager_creation() {
        let mgr = ShardManager::new(16, 3);

        assert_eq!(mgr.num_shards(), 16);
        assert_eq!(mgr.replication_factor(), 3);
    }

    #[test]
    fn test_shard_manager_create_shard() {
        let mgr = ShardManager::new(16, 3);

        mgr.create_shard(ShardId::new(0), NodeId::new("node1"));

        assert_eq!(mgr.shard_count(), 1);

        let fetched = mgr.get_shard(&ShardId::new(0)).unwrap();
        assert_eq!(fetched.primary_node.as_str(), "node1");
    }

    #[test]
    fn test_shard_manager_initialize() {
        let mgr = ShardManager::new(8, 2);
        let cluster = vec![
            NodeId::new("node1"),
            NodeId::new("node2"),
            NodeId::new("node3"),
        ];

        mgr.initialize_shards(&cluster);

        assert_eq!(mgr.shard_count(), 8);
        assert_eq!(mgr.get_active_shards().len(), 8);

        // Every node should have been assigned at least one shard.
        for node in &cluster {
            assert!(!mgr.get_node_shards(node).is_empty());
        }
    }

    #[test]
    fn test_shard_manager_stats() {
        let mgr = ShardManager::new(4, 2);
        let cluster = vec![NodeId::new("node1"), NodeId::new("node2")];

        mgr.initialize_shards(&cluster);

        let snapshot = mgr.stats();
        assert_eq!(snapshot.total_shards, 4);
        assert_eq!(snapshot.active_shards, 4);
    }
}