1use std::collections::{HashMap, HashSet};
29use std::net::SocketAddr;
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33
34use crate::NodeId;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct MigrationId(pub u64);
39
40impl MigrationId {
41 pub fn new(slot: u16) -> Self {
48 use rand::Rng;
49 use std::time::{SystemTime, UNIX_EPOCH};
50 let ts = SystemTime::now()
51 .duration_since(UNIX_EPOCH)
52 .unwrap_or_default()
53 .as_nanos() as u64;
54 let noise: u16 = rand::rng().random();
55 Self(ts ^ (slot as u64) ^ ((noise as u64) << 48))
56 }
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61pub enum MigrationState {
62 Importing,
64 Migrating,
66 Streaming,
68 Finalizing,
70 Complete,
72}
73
74impl std::fmt::Display for MigrationState {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 match self {
77 Self::Importing => write!(f, "importing"),
78 Self::Migrating => write!(f, "migrating"),
79 Self::Streaming => write!(f, "streaming"),
80 Self::Finalizing => write!(f, "finalizing"),
81 Self::Complete => write!(f, "complete"),
82 }
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct Migration {
89 pub id: MigrationId,
91 pub slot: u16,
93 pub source: NodeId,
95 pub target: NodeId,
97 pub state: MigrationState,
99 #[serde(skip)]
101 pub started_at: Option<Instant>,
102 pub keys_migrated: u64,
104 pub keys_total: Option<u64>,
106}
107
108impl Migration {
109 pub fn new_importing(slot: u16, source: NodeId, target: NodeId) -> Self {
111 Self {
112 id: MigrationId::new(slot),
113 slot,
114 source,
115 target,
116 state: MigrationState::Importing,
117 started_at: Some(Instant::now()),
118 keys_migrated: 0,
119 keys_total: None,
120 }
121 }
122
123 pub fn new_migrating(slot: u16, source: NodeId, target: NodeId) -> Self {
125 Self {
126 id: MigrationId::new(slot),
127 slot,
128 source,
129 target,
130 state: MigrationState::Migrating,
131 started_at: Some(Instant::now()),
132 keys_migrated: 0,
133 keys_total: None,
134 }
135 }
136
137 pub fn involves(&self, node: &NodeId) -> bool {
139 self.source == *node || self.target == *node
140 }
141
142 pub fn progress(&self) -> Option<u8> {
144 self.keys_total.map(|total| {
145 if total == 0 {
146 100
147 } else {
148 ((self.keys_migrated as f64 / total as f64) * 100.0).min(100.0) as u8
149 }
150 })
151 }
152
153 pub fn start_streaming(&mut self, total_keys: u64) {
155 self.state = MigrationState::Streaming;
156 self.keys_total = Some(total_keys);
157 }
158
159 pub fn record_migrated(&mut self, count: u64) {
161 self.keys_migrated += count;
162 }
163
164 pub fn start_finalizing(&mut self) {
166 self.state = MigrationState::Finalizing;
167 }
168
169 pub fn complete(&mut self) {
171 self.state = MigrationState::Complete;
172 }
173}
174
175#[derive(Debug, Default)]
177pub struct MigrationManager {
178 outgoing: HashMap<u16, Migration>,
180 incoming: HashMap<u16, Migration>,
182 pending_keys: HashMap<u16, HashSet<Vec<u8>>>,
184}
185
186impl MigrationManager {
187 pub fn new() -> Self {
189 Self::default()
190 }
191
192 pub fn is_migrating(&self, slot: u16) -> bool {
194 self.outgoing.contains_key(&slot)
195 }
196
197 pub fn is_importing(&self, slot: u16) -> bool {
199 self.incoming.contains_key(&slot)
200 }
201
202 pub fn get_outgoing(&self, slot: u16) -> Option<&Migration> {
204 self.outgoing.get(&slot)
205 }
206
207 pub fn get_incoming(&self, slot: u16) -> Option<&Migration> {
209 self.incoming.get(&slot)
210 }
211
212 pub fn start_import(
216 &mut self,
217 slot: u16,
218 source: NodeId,
219 local_id: NodeId,
220 ) -> Result<&Migration, MigrationError> {
221 if self.outgoing.contains_key(&slot) {
222 return Err(MigrationError::SlotAlreadyMigrating { slot });
223 }
224 if self.incoming.contains_key(&slot) {
225 return Err(MigrationError::SlotAlreadyImporting { slot });
226 }
227
228 let migration = Migration::new_importing(slot, source, local_id);
229 self.incoming.insert(slot, migration);
230 self.pending_keys.insert(slot, HashSet::new());
231 self.incoming
232 .get(&slot)
233 .ok_or(MigrationError::NoMigrationInProgress { slot })
234 }
235
236 pub fn start_migrate(
240 &mut self,
241 slot: u16,
242 local_id: NodeId,
243 target: NodeId,
244 ) -> Result<&Migration, MigrationError> {
245 if self.outgoing.contains_key(&slot) {
246 return Err(MigrationError::SlotAlreadyMigrating { slot });
247 }
248 if self.incoming.contains_key(&slot) {
249 return Err(MigrationError::SlotAlreadyImporting { slot });
250 }
251
252 let migration = Migration::new_migrating(slot, local_id, target);
253 self.outgoing.insert(slot, migration);
254 self.pending_keys.insert(slot, HashSet::new());
255 self.outgoing
256 .get(&slot)
257 .ok_or(MigrationError::NoMigrationInProgress { slot })
258 }
259
260 pub fn key_migrated(&mut self, slot: u16, key: Vec<u8>) {
262 if let Some(keys) = self.pending_keys.get_mut(&slot) {
263 keys.insert(key);
264 }
265 if let Some(migration) = self.outgoing.get_mut(&slot) {
266 migration.record_migrated(1);
267 }
268 }
269
270 pub fn is_key_migrated(&self, slot: u16, key: &[u8]) -> bool {
272 self.pending_keys
273 .get(&slot)
274 .is_some_and(|keys| keys.contains(key))
275 }
276
277 pub fn complete_migration(&mut self, slot: u16) -> Option<Migration> {
279 self.pending_keys.remove(&slot);
280 self.outgoing
282 .remove(&slot)
283 .or_else(|| self.incoming.remove(&slot))
284 .map(|mut m| {
285 m.complete();
286 m
287 })
288 }
289
290 pub fn abort_migration(&mut self, slot: u16) -> Option<Migration> {
292 self.pending_keys.remove(&slot);
293 self.outgoing
294 .remove(&slot)
295 .or_else(|| self.incoming.remove(&slot))
296 }
297
298 pub fn outgoing_migrations(&self) -> impl Iterator<Item = &Migration> {
300 self.outgoing.values()
301 }
302
303 pub fn incoming_migrations(&self) -> impl Iterator<Item = &Migration> {
305 self.incoming.values()
306 }
307
308 pub fn active_count(&self) -> usize {
310 self.outgoing.len() + self.incoming.len()
311 }
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct MigrationBatch {
317 pub slot: u16,
319 pub entries: Vec<MigrationEntry>,
321 pub is_final: bool,
323 pub sequence: u64,
325}
326
327#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct MigrationEntry {
330 pub key: Vec<u8>,
332 pub value: Vec<u8>,
334 pub ttl_ms: u64,
336}
337
338impl MigrationBatch {
339 pub fn new(slot: u16, sequence: u64) -> Self {
341 Self {
342 slot,
343 entries: Vec::new(),
344 is_final: false,
345 sequence,
346 }
347 }
348
349 pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: u64) {
351 self.entries.push(MigrationEntry { key, value, ttl_ms });
352 }
353
354 pub fn mark_final(&mut self) {
356 self.is_final = true;
357 }
358
359 pub fn len(&self) -> usize {
361 self.entries.len()
362 }
363
364 pub fn is_empty(&self) -> bool {
366 self.entries.is_empty()
367 }
368
369 pub fn size_bytes(&self) -> usize {
371 self.entries
372 .iter()
373 .map(|e| e.key.len() + e.value.len() + 8)
374 .sum()
375 }
376}
377
378#[derive(Debug, Clone, thiserror::Error)]
380pub enum MigrationError {
381 #[error("slot {slot} is already migrating")]
383 SlotAlreadyMigrating { slot: u16 },
384
385 #[error("slot {slot} is already importing")]
387 SlotAlreadyImporting { slot: u16 },
388
389 #[error("no migration in progress for slot {slot}")]
391 NoMigrationInProgress { slot: u16 },
392
393 #[error("cannot reach migration target {addr}: {reason}")]
395 TargetUnreachable { addr: SocketAddr, reason: String },
396
397 #[error("migration for slot {slot} was aborted")]
399 Aborted { slot: u16 },
400
401 #[error("invalid state transition from {from} to {to}")]
403 InvalidStateTransition {
404 from: MigrationState,
405 to: MigrationState,
406 },
407
408 #[error("migration timeout after {elapsed:?}")]
410 Timeout { elapsed: Duration },
411}
412
413#[derive(Debug, Clone, PartialEq, Eq)]
415pub enum MigrationRedirect {
416 None,
418 Moved { slot: u16, addr: SocketAddr },
420 Ask { slot: u16, addr: SocketAddr },
422}
423
424impl MigrationRedirect {
425 pub fn to_error_string(&self) -> Option<String> {
427 match self {
428 Self::None => None,
429 Self::Moved { slot, addr } => Some(format!("MOVED {} {}", slot, addr)),
430 Self::Ask { slot, addr } => Some(format!("ASK {} {}", slot, addr)),
431 }
432 }
433}
434
435#[derive(Debug, Clone)]
437pub struct MigrationConfig {
438 pub batch_size: usize,
440 pub batch_bytes: usize,
442 pub key_timeout: Duration,
444 pub migration_timeout: Duration,
446 pub batch_delay: Duration,
448}
449
450impl Default for MigrationConfig {
451 fn default() -> Self {
452 Self {
453 batch_size: 100,
454 batch_bytes: 1024 * 1024, key_timeout: Duration::from_secs(5),
456 migration_timeout: Duration::from_secs(3600), batch_delay: Duration::from_millis(10),
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use super::*;
465
466 fn node_id() -> NodeId {
467 NodeId::new()
468 }
469
470 #[test]
471 fn migration_new_importing() {
472 let source = node_id();
473 let target = node_id();
474 let m = Migration::new_importing(100, source, target);
475
476 assert_eq!(m.slot, 100);
477 assert_eq!(m.source, source);
478 assert_eq!(m.target, target);
479 assert_eq!(m.state, MigrationState::Importing);
480 assert_eq!(m.keys_migrated, 0);
481 }
482
483 #[test]
484 fn migration_new_migrating() {
485 let source = node_id();
486 let target = node_id();
487 let m = Migration::new_migrating(100, source, target);
488
489 assert_eq!(m.state, MigrationState::Migrating);
490 }
491
492 #[test]
493 fn migration_involves() {
494 let source = node_id();
495 let target = node_id();
496 let other = node_id();
497 let m = Migration::new_importing(100, source, target);
498
499 assert!(m.involves(&source));
500 assert!(m.involves(&target));
501 assert!(!m.involves(&other));
502 }
503
504 #[test]
505 fn migration_progress() {
506 let mut m = Migration::new_migrating(100, node_id(), node_id());
507
508 assert_eq!(m.progress(), None);
510
511 m.start_streaming(100);
513 assert_eq!(m.progress(), Some(0));
514
515 m.record_migrated(50);
516 assert_eq!(m.progress(), Some(50));
517
518 m.record_migrated(50);
519 assert_eq!(m.progress(), Some(100));
520 }
521
522 #[test]
523 fn migration_state_transitions() {
524 let mut m = Migration::new_migrating(100, node_id(), node_id());
525
526 assert_eq!(m.state, MigrationState::Migrating);
527
528 m.start_streaming(50);
529 assert_eq!(m.state, MigrationState::Streaming);
530
531 m.start_finalizing();
532 assert_eq!(m.state, MigrationState::Finalizing);
533
534 m.complete();
535 assert_eq!(m.state, MigrationState::Complete);
536 }
537
538 #[test]
539 fn manager_start_import() {
540 let mut manager = MigrationManager::new();
541 let source = node_id();
542 let local = node_id();
543
544 let result = manager.start_import(100, source, local);
545 assert!(result.is_ok());
546 assert!(manager.is_importing(100));
547 assert!(!manager.is_migrating(100));
548 }
549
550 #[test]
551 fn manager_start_migrate() {
552 let mut manager = MigrationManager::new();
553 let local = node_id();
554 let target = node_id();
555
556 let result = manager.start_migrate(100, local, target);
557 assert!(result.is_ok());
558 assert!(manager.is_migrating(100));
559 assert!(!manager.is_importing(100));
560 }
561
562 #[test]
563 fn manager_double_migration_error() {
564 let mut manager = MigrationManager::new();
565 let local = node_id();
566 let target = node_id();
567
568 manager.start_migrate(100, local, target).unwrap();
569
570 let result = manager.start_migrate(100, local, node_id());
572 assert!(matches!(
573 result,
574 Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
575 ));
576
577 let result = manager.start_import(100, node_id(), local);
579 assert!(matches!(
580 result,
581 Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
582 ));
583 }
584
585 #[test]
586 fn manager_key_tracking() {
587 let mut manager = MigrationManager::new();
588 let local = node_id();
589 let target = node_id();
590
591 manager.start_migrate(100, local, target).unwrap();
592
593 assert!(!manager.is_key_migrated(100, b"key1"));
594
595 manager.key_migrated(100, b"key1".to_vec());
596 assert!(manager.is_key_migrated(100, b"key1"));
597 assert!(!manager.is_key_migrated(100, b"key2"));
598 }
599
600 #[test]
601 fn manager_complete_migration() {
602 let mut manager = MigrationManager::new();
603 let local = node_id();
604 let target = node_id();
605
606 manager.start_migrate(100, local, target).unwrap();
607 manager.key_migrated(100, b"key1".to_vec());
608
609 let completed = manager.complete_migration(100);
610 assert!(completed.is_some());
611 assert_eq!(completed.unwrap().state, MigrationState::Complete);
612
613 assert!(!manager.is_migrating(100));
615 assert!(!manager.is_key_migrated(100, b"key1"));
616 }
617
618 #[test]
619 fn manager_abort_migration() {
620 let mut manager = MigrationManager::new();
621 let local = node_id();
622 let target = node_id();
623
624 manager.start_migrate(100, local, target).unwrap();
625
626 let aborted = manager.abort_migration(100);
627 assert!(aborted.is_some());
628
629 assert!(!manager.is_migrating(100));
630 }
631
632 #[test]
633 fn batch_operations() {
634 let mut batch = MigrationBatch::new(100, 1);
635
636 assert!(batch.is_empty());
637 assert_eq!(batch.len(), 0);
638
639 batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
640 batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);
641
642 assert!(!batch.is_empty());
643 assert_eq!(batch.len(), 2);
644 assert!(!batch.is_final);
645
646 batch.mark_final();
647 assert!(batch.is_final);
648 }
649
650 #[test]
651 fn redirect_formatting() {
652 let moved = MigrationRedirect::Moved {
653 slot: 100,
654 addr: "127.0.0.1:6379".parse().unwrap(),
655 };
656 assert_eq!(
657 moved.to_error_string(),
658 Some("MOVED 100 127.0.0.1:6379".to_string())
659 );
660
661 let ask = MigrationRedirect::Ask {
662 slot: 200,
663 addr: "127.0.0.1:6380".parse().unwrap(),
664 };
665 assert_eq!(
666 ask.to_error_string(),
667 Some("ASK 200 127.0.0.1:6380".to_string())
668 );
669
670 assert_eq!(MigrationRedirect::None.to_error_string(), None);
671 }
672
673 #[test]
674 fn migration_state_display() {
675 assert_eq!(MigrationState::Importing.to_string(), "importing");
676 assert_eq!(MigrationState::Migrating.to_string(), "migrating");
677 assert_eq!(MigrationState::Streaming.to_string(), "streaming");
678 assert_eq!(MigrationState::Finalizing.to_string(), "finalizing");
679 assert_eq!(MigrationState::Complete.to_string(), "complete");
680 }
681
682 #[test]
683 fn config_defaults() {
684 let config = MigrationConfig::default();
685 assert_eq!(config.batch_size, 100);
686 assert_eq!(config.batch_bytes, 1024 * 1024);
687 assert_eq!(config.key_timeout, Duration::from_secs(5));
688 }
689}