1use crate::node::NodeId;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::RwLock;
12use std::time::{SystemTime, UNIX_EPOCH};
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct ShardId(pub u32);
21
22impl ShardId {
23 pub fn new(id: u32) -> Self {
24 Self(id)
25 }
26
27 pub fn as_u32(&self) -> u32 {
28 self.0
29 }
30}
31
32impl std::fmt::Display for ShardId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "shard_{}", self.0)
35 }
36}
37
/// Lifecycle state of a [`Shard`].
///
/// The readable/writable notes below restate what `Shard::is_readable` and
/// `Shard::is_writable` check; the broader meaning of each state is inferred
/// from its name only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ShardState {
    /// Initial state; assigned by `Shard::new` and used as the `Default`.
    #[default]
    Creating,
    /// The only state reported as active and writable; also readable.
    Active,
    /// Readable but not writable.
    Migrating,
    /// Readable but not writable.
    Splitting,
    /// Neither readable nor writable.
    Merging,
    /// Neither readable nor writable.
    Inactive,
    /// Neither readable nor writable.
    Deleting,
}
61
/// A single shard: its placement (primary and replicas), optional key-hash
/// range, lifecycle state and size statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Shard {
    /// Identifier of this shard.
    pub id: ShardId,
    /// Current lifecycle state.
    pub state: ShardState,
    /// Node holding the primary copy.
    pub primary_node: NodeId,
    /// Nodes holding replica copies; `Shard::add_replica` refuses duplicates
    /// and the current primary.
    pub replica_nodes: Vec<NodeId>,
    /// Inclusive lower bound of the key-hash range, if a range is assigned.
    pub key_range_start: Option<u64>,
    /// Upper bound of the key-hash range, if a range is assigned; see
    /// `Shard::contains_key` for how it is interpreted.
    pub key_range_end: Option<u64>,
    /// Creation time in milliseconds since the Unix epoch.
    pub created_at: u64,
    /// Time of the last recorded modification, in milliseconds since the Unix epoch.
    pub updated_at: u64,
    /// Size in bytes as last reported through `Shard::update_stats`.
    pub size_bytes: u64,
    /// Row count as last reported through `Shard::update_stats`.
    pub row_count: u64,
    /// Free-form string key/value pairs; not interpreted by the code in this file section.
    pub metadata: HashMap<String, String>,
}
81
82impl Shard {
83 pub fn new(id: ShardId, primary_node: NodeId) -> Self {
85 let now = current_timestamp();
86 Self {
87 id,
88 state: ShardState::Creating,
89 primary_node,
90 replica_nodes: Vec::new(),
91 key_range_start: None,
92 key_range_end: None,
93 created_at: now,
94 updated_at: now,
95 size_bytes: 0,
96 row_count: 0,
97 metadata: HashMap::new(),
98 }
99 }
100
101 pub fn with_range(id: ShardId, primary_node: NodeId, start: u64, end: u64) -> Self {
103 let mut shard = Self::new(id, primary_node);
104 shard.key_range_start = Some(start);
105 shard.key_range_end = Some(end);
106 shard
107 }
108
109 pub fn add_replica(&mut self, node: NodeId) {
111 if !self.replica_nodes.contains(&node) && node != self.primary_node {
112 self.replica_nodes.push(node);
113 self.updated_at = current_timestamp();
114 }
115 }
116
117 pub fn remove_replica(&mut self, node: &NodeId) {
119 self.replica_nodes.retain(|n| n != node);
120 self.updated_at = current_timestamp();
121 }
122
123 pub fn set_primary(&mut self, node: NodeId) {
125 self.primary_node = node;
126 self.updated_at = current_timestamp();
127 }
128
129 pub fn set_state(&mut self, state: ShardState) {
131 self.state = state;
132 self.updated_at = current_timestamp();
133 }
134
135 pub fn contains_key(&self, key_hash: u64) -> bool {
137 match (self.key_range_start, self.key_range_end) {
138 (Some(start), Some(end)) => key_hash >= start && key_hash < end,
139 _ => true, }
141 }
142
143 pub fn is_active(&self) -> bool {
145 self.state == ShardState::Active
146 }
147
148 pub fn is_readable(&self) -> bool {
150 matches!(
151 self.state,
152 ShardState::Active | ShardState::Migrating | ShardState::Splitting
153 )
154 }
155
156 pub fn is_writable(&self) -> bool {
158 self.state == ShardState::Active
159 }
160
161 pub fn all_nodes(&self) -> Vec<&NodeId> {
163 let mut nodes = vec![&self.primary_node];
164 nodes.extend(self.replica_nodes.iter());
165 nodes
166 }
167
168 pub fn replication_factor(&self) -> usize {
170 1 + self.replica_nodes.len()
171 }
172
173 pub fn update_stats(&mut self, size_bytes: u64, row_count: u64) {
175 self.size_bytes = size_bytes;
176 self.row_count = row_count;
177 self.updated_at = current_timestamp();
178 }
179}
180
/// Registry of shards together with a reverse index from node to the ids of
/// the shards recorded for that node.
///
/// Each map sits behind its own `RwLock`, so the manager is used through
/// `&self`. Methods that need both locks take `shards` before `node_shards`.
pub struct ShardManager {
    /// Every known shard, keyed by its id.
    shards: RwLock<HashMap<ShardId, Shard>>,
    /// For each node, the ids of shards recorded for it as primary or replica.
    node_shards: RwLock<HashMap<NodeId, Vec<ShardId>>>,
    /// Configured shard count: the modulus in `get_shard_for_key` and the
    /// number of shards created by `initialize_shards`.
    num_shards: u32,
    /// Target number of copies per shard (primary included) used by
    /// `initialize_shards`, capped there by the number of nodes supplied.
    replication_factor: usize,
}
192
193impl ShardManager {
194 pub fn new(num_shards: u32, replication_factor: usize) -> Self {
196 Self {
197 shards: RwLock::new(HashMap::new()),
198 node_shards: RwLock::new(HashMap::new()),
199 num_shards,
200 replication_factor,
201 }
202 }
203
204 pub fn num_shards(&self) -> u32 {
206 self.num_shards
207 }
208
209 pub fn replication_factor(&self) -> usize {
211 self.replication_factor
212 }
213
214 pub fn create_shard(&self, id: ShardId, primary_node: NodeId) -> Shard {
216 let shard = Shard::new(id.clone(), primary_node.clone());
217
218 let mut shards = self
219 .shards
220 .write()
221 .expect("shard manager shards lock poisoned");
222 shards.insert(id.clone(), shard.clone());
223
224 let mut node_shards = self
225 .node_shards
226 .write()
227 .expect("shard manager node_shards lock poisoned");
228 node_shards.entry(primary_node).or_default().push(id);
229
230 shard
231 }
232
233 pub fn create_shard_with_range(
235 &self,
236 id: ShardId,
237 primary_node: NodeId,
238 start: u64,
239 end: u64,
240 ) -> Shard {
241 let shard = Shard::with_range(id.clone(), primary_node.clone(), start, end);
242
243 let mut shards = self
244 .shards
245 .write()
246 .expect("shard manager shards lock poisoned");
247 shards.insert(id.clone(), shard.clone());
248
249 let mut node_shards = self
250 .node_shards
251 .write()
252 .expect("shard manager node_shards lock poisoned");
253 node_shards.entry(primary_node).or_default().push(id);
254
255 shard
256 }
257
258 pub fn get_shard(&self, id: &ShardId) -> Option<Shard> {
260 self.shards
261 .read()
262 .expect("shard manager shards lock poisoned")
263 .get(id)
264 .cloned()
265 }
266
267 pub fn get_all_shards(&self) -> Vec<Shard> {
269 self.shards
270 .read()
271 .expect("shard manager shards lock poisoned")
272 .values()
273 .cloned()
274 .collect()
275 }
276
277 pub fn get_node_shards(&self, node_id: &NodeId) -> Vec<ShardId> {
279 self.node_shards
280 .read()
281 .expect("shard manager node_shards lock poisoned")
282 .get(node_id)
283 .cloned()
284 .unwrap_or_default()
285 }
286
287 pub fn update_shard_state(&self, id: &ShardId, state: ShardState) -> bool {
289 let mut shards = self
290 .shards
291 .write()
292 .expect("shard manager shards lock poisoned");
293 if let Some(shard) = shards.get_mut(id) {
294 shard.set_state(state);
295 true
296 } else {
297 false
298 }
299 }
300
301 pub fn add_replica(&self, shard_id: &ShardId, node_id: NodeId) -> bool {
303 let mut shards = self
304 .shards
305 .write()
306 .expect("shard manager shards lock poisoned");
307 if let Some(shard) = shards.get_mut(shard_id) {
308 shard.add_replica(node_id.clone());
309
310 let mut node_shards = self
311 .node_shards
312 .write()
313 .expect("shard manager node_shards lock poisoned");
314 node_shards
315 .entry(node_id)
316 .or_default()
317 .push(shard_id.clone());
318
319 true
320 } else {
321 false
322 }
323 }
324
325 pub fn remove_shard(&self, id: &ShardId) -> Option<Shard> {
327 let mut shards = self
328 .shards
329 .write()
330 .expect("shard manager shards lock poisoned");
331 let shard = shards.remove(id)?;
332
333 let mut node_shards = self
334 .node_shards
335 .write()
336 .expect("shard manager node_shards lock poisoned");
337
338 if let Some(shards) = node_shards.get_mut(&shard.primary_node) {
340 shards.retain(|s| s != id);
341 }
342
343 for replica in &shard.replica_nodes {
345 if let Some(shards) = node_shards.get_mut(replica) {
346 shards.retain(|s| s != id);
347 }
348 }
349
350 Some(shard)
351 }
352
353 pub fn get_shard_for_key(&self, key_hash: u64) -> Option<Shard> {
355 let shards = self
356 .shards
357 .read()
358 .expect("shard manager shards lock poisoned");
359
360 let shard_id = ShardId::new((key_hash % self.num_shards as u64) as u32);
362 shards.get(&shard_id).cloned()
363 }
364
365 pub fn get_active_shards(&self) -> Vec<Shard> {
367 self.shards
368 .read()
369 .expect("shard manager shards lock poisoned")
370 .values()
371 .filter(|s| s.is_active())
372 .cloned()
373 .collect()
374 }
375
376 pub fn shard_count(&self) -> usize {
378 self.shards
379 .read()
380 .expect("shard manager shards lock poisoned")
381 .len()
382 }
383
384 pub fn initialize_shards(&self, nodes: &[NodeId]) {
386 if nodes.is_empty() {
387 return;
388 }
389
390 let range_size = u64::MAX / self.num_shards as u64;
391
392 for i in 0..self.num_shards {
393 let shard_id = ShardId::new(i);
394 let primary_idx = i as usize % nodes.len();
395 let primary_node = nodes[primary_idx].clone();
396
397 let start = i as u64 * range_size;
398 let end = if i == self.num_shards - 1 {
399 u64::MAX
400 } else {
401 (i as u64 + 1) * range_size
402 };
403
404 let _shard = self.create_shard_with_range(shard_id.clone(), primary_node, start, end);
405
406 for r in 1..self.replication_factor.min(nodes.len()) {
408 let replica_idx = (primary_idx + r) % nodes.len();
409 self.add_replica(&shard_id, nodes[replica_idx].clone());
410 }
411
412 self.update_shard_state(&shard_id, ShardState::Active);
414 }
415 }
416
417 pub fn stats(&self) -> ShardManagerStats {
419 let shards = self
420 .shards
421 .read()
422 .expect("shard manager shards lock poisoned");
423
424 let active = shards.values().filter(|s| s.is_active()).count();
425 let migrating = shards
426 .values()
427 .filter(|s| s.state == ShardState::Migrating)
428 .count();
429 let total_size: u64 = shards.values().map(|s| s.size_bytes).sum();
430 let total_rows: u64 = shards.values().map(|s| s.row_count).sum();
431
432 ShardManagerStats {
433 total_shards: shards.len(),
434 active_shards: active,
435 migrating_shards: migrating,
436 total_size_bytes: total_size,
437 total_row_count: total_rows,
438 }
439 }
440}
441
/// Point-in-time aggregate counters over all shards known to a shard manager.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (e.g. with
/// `assert_eq!`) and `Default` so an all-zero snapshot is available.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ShardManagerStats {
    /// Number of registered shards, in any state.
    pub total_shards: usize,
    /// Number of shards in the `Active` state.
    pub active_shards: usize,
    /// Number of shards in the `Migrating` state.
    pub migrating_shards: usize,
    /// Sum of `size_bytes` over all shards.
    pub total_size_bytes: u64,
    /// Sum of `row_count` over all shards.
    pub total_row_count: u64,
}
455
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// `ShardId` round-trips its raw value and renders as `shard_<n>`.
    #[test]
    fn test_shard_id() {
        let shard_id = ShardId::new(5);

        assert_eq!(shard_id.as_u32(), 5);
        assert_eq!(shard_id.to_string(), "shard_5");
    }

    /// A fresh shard starts in `Creating` with the given primary and no replicas.
    #[test]
    fn test_shard_creation() {
        let fresh = Shard::new(ShardId::new(0), NodeId::new("node1"));

        assert_eq!(fresh.id.as_u32(), 0);
        assert_eq!(fresh.primary_node.as_str(), "node1");
        assert_eq!(fresh.state, ShardState::Creating);
        assert!(fresh.replica_nodes.is_empty());
    }

    /// The key range includes interior keys and excludes its end bound.
    #[test]
    fn test_shard_with_range() {
        let ranged = Shard::with_range(ShardId::new(0), NodeId::new("node1"), 0, 1000);

        assert!(ranged.contains_key(500));
        assert!(!ranged.contains_key(1000));
    }

    /// Adding two distinct replicas yields three copies in total.
    #[test]
    fn test_shard_replicas() {
        let mut replicated = Shard::new(ShardId::new(0), NodeId::new("node1"));

        for name in ["node2", "node3"] {
            replicated.add_replica(NodeId::new(name));
        }

        assert_eq!(replicated.replica_nodes.len(), 2);
        assert_eq!(replicated.replication_factor(), 3);
        assert_eq!(replicated.all_nodes().len(), 3);
    }

    /// State predicates follow the shard's lifecycle state.
    #[test]
    fn test_shard_state() {
        let mut subject = Shard::new(ShardId::new(0), NodeId::new("node1"));
        assert!(!subject.is_active());

        subject.set_state(ShardState::Active);
        assert!(subject.is_active());
        assert!(subject.is_readable());
        assert!(subject.is_writable());

        subject.set_state(ShardState::Migrating);
        assert!(subject.is_readable());
        assert!(!subject.is_writable());
    }

    /// The manager reports the configuration it was built with.
    #[test]
    fn test_shard_manager_creation() {
        let registry = ShardManager::new(16, 3);

        assert_eq!(registry.num_shards(), 16);
        assert_eq!(registry.replication_factor(), 3);
    }

    /// A created shard is counted and can be fetched back by id.
    #[test]
    fn test_shard_manager_create_shard() {
        let registry = ShardManager::new(16, 3);
        registry.create_shard(ShardId::new(0), NodeId::new("node1"));

        assert_eq!(registry.shard_count(), 1);

        let fetched = registry.get_shard(&ShardId::new(0)).unwrap();
        assert_eq!(fetched.primary_node.as_str(), "node1");
    }

    /// Initialization creates every shard, activates it, and uses every node.
    #[test]
    fn test_shard_manager_initialize() {
        let registry = ShardManager::new(8, 2);
        let cluster = vec![
            NodeId::new("node1"),
            NodeId::new("node2"),
            NodeId::new("node3"),
        ];

        registry.initialize_shards(&cluster);

        assert_eq!(registry.shard_count(), 8);
        assert_eq!(registry.get_active_shards().len(), 8);

        for name in ["node1", "node2", "node3"] {
            let hosted = registry.get_node_shards(&NodeId::new(name));
            assert!(!hosted.is_empty());
        }
    }

    /// Stats reflect the shards created by initialization.
    #[test]
    fn test_shard_manager_stats() {
        let registry = ShardManager::new(4, 2);
        let cluster = vec![NodeId::new("node1"), NodeId::new("node2")];

        registry.initialize_shards(&cluster);

        let snapshot = registry.stats();
        assert_eq!(snapshot.total_shards, 4);
        assert_eq!(snapshot.active_shards, 4);
    }
}