1use std::collections::{HashMap, HashSet};
29use std::net::SocketAddr;
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33
34use crate::NodeId;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct MigrationId(pub u64);
39
40impl MigrationId {
41 pub fn new(slot: u16) -> Self {
43 use std::time::{SystemTime, UNIX_EPOCH};
44 let ts = SystemTime::now()
45 .duration_since(UNIX_EPOCH)
46 .unwrap_or_default()
47 .as_nanos() as u64;
48 Self(ts ^ (slot as u64))
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55pub enum MigrationState {
56 Importing,
58 Migrating,
60 Streaming,
62 Finalizing,
64 Complete,
66}
67
68impl std::fmt::Display for MigrationState {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 Self::Importing => write!(f, "importing"),
72 Self::Migrating => write!(f, "migrating"),
73 Self::Streaming => write!(f, "streaming"),
74 Self::Finalizing => write!(f, "finalizing"),
75 Self::Complete => write!(f, "complete"),
76 }
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct Migration {
83 pub id: MigrationId,
85 pub slot: u16,
87 pub source: NodeId,
89 pub target: NodeId,
91 pub state: MigrationState,
93 #[serde(skip)]
95 pub started_at: Option<Instant>,
96 pub keys_migrated: u64,
98 pub keys_total: Option<u64>,
100}
101
102impl Migration {
103 pub fn new_importing(slot: u16, source: NodeId, target: NodeId) -> Self {
105 Self {
106 id: MigrationId::new(slot),
107 slot,
108 source,
109 target,
110 state: MigrationState::Importing,
111 started_at: Some(Instant::now()),
112 keys_migrated: 0,
113 keys_total: None,
114 }
115 }
116
117 pub fn new_migrating(slot: u16, source: NodeId, target: NodeId) -> Self {
119 Self {
120 id: MigrationId::new(slot),
121 slot,
122 source,
123 target,
124 state: MigrationState::Migrating,
125 started_at: Some(Instant::now()),
126 keys_migrated: 0,
127 keys_total: None,
128 }
129 }
130
131 pub fn involves(&self, node: &NodeId) -> bool {
133 self.source == *node || self.target == *node
134 }
135
136 pub fn progress(&self) -> Option<u8> {
138 self.keys_total.map(|total| {
139 if total == 0 {
140 100
141 } else {
142 ((self.keys_migrated * 100) / total).min(100) as u8
143 }
144 })
145 }
146
147 pub fn start_streaming(&mut self, total_keys: u64) {
149 self.state = MigrationState::Streaming;
150 self.keys_total = Some(total_keys);
151 }
152
153 pub fn record_migrated(&mut self, count: u64) {
155 self.keys_migrated += count;
156 }
157
158 pub fn start_finalizing(&mut self) {
160 self.state = MigrationState::Finalizing;
161 }
162
163 pub fn complete(&mut self) {
165 self.state = MigrationState::Complete;
166 }
167}
168
169#[derive(Debug, Default)]
171pub struct MigrationManager {
172 outgoing: HashMap<u16, Migration>,
174 incoming: HashMap<u16, Migration>,
176 pending_keys: HashMap<u16, HashSet<Vec<u8>>>,
178}
179
180impl MigrationManager {
181 pub fn new() -> Self {
183 Self::default()
184 }
185
186 pub fn is_migrating(&self, slot: u16) -> bool {
188 self.outgoing.contains_key(&slot)
189 }
190
191 pub fn is_importing(&self, slot: u16) -> bool {
193 self.incoming.contains_key(&slot)
194 }
195
196 pub fn get_outgoing(&self, slot: u16) -> Option<&Migration> {
198 self.outgoing.get(&slot)
199 }
200
201 pub fn get_incoming(&self, slot: u16) -> Option<&Migration> {
203 self.incoming.get(&slot)
204 }
205
206 pub fn start_import(
210 &mut self,
211 slot: u16,
212 source: NodeId,
213 local_id: NodeId,
214 ) -> Result<&Migration, MigrationError> {
215 if self.outgoing.contains_key(&slot) {
216 return Err(MigrationError::SlotAlreadyMigrating { slot });
217 }
218 if self.incoming.contains_key(&slot) {
219 return Err(MigrationError::SlotAlreadyImporting { slot });
220 }
221
222 let migration = Migration::new_importing(slot, source, local_id);
223 self.incoming.insert(slot, migration);
224 self.pending_keys.insert(slot, HashSet::new());
225 self.incoming
226 .get(&slot)
227 .ok_or(MigrationError::NoMigrationInProgress { slot })
228 }
229
230 pub fn start_migrate(
234 &mut self,
235 slot: u16,
236 local_id: NodeId,
237 target: NodeId,
238 ) -> Result<&Migration, MigrationError> {
239 if self.outgoing.contains_key(&slot) {
240 return Err(MigrationError::SlotAlreadyMigrating { slot });
241 }
242 if self.incoming.contains_key(&slot) {
243 return Err(MigrationError::SlotAlreadyImporting { slot });
244 }
245
246 let migration = Migration::new_migrating(slot, local_id, target);
247 self.outgoing.insert(slot, migration);
248 self.pending_keys.insert(slot, HashSet::new());
249 self.outgoing
250 .get(&slot)
251 .ok_or(MigrationError::NoMigrationInProgress { slot })
252 }
253
254 pub fn key_migrated(&mut self, slot: u16, key: Vec<u8>) {
256 if let Some(keys) = self.pending_keys.get_mut(&slot) {
257 keys.insert(key);
258 }
259 if let Some(migration) = self.outgoing.get_mut(&slot) {
260 migration.record_migrated(1);
261 }
262 }
263
264 pub fn is_key_migrated(&self, slot: u16, key: &[u8]) -> bool {
266 self.pending_keys
267 .get(&slot)
268 .is_some_and(|keys| keys.contains(key))
269 }
270
271 pub fn complete_migration(&mut self, slot: u16) -> Option<Migration> {
273 self.pending_keys.remove(&slot);
274 self.outgoing
276 .remove(&slot)
277 .or_else(|| self.incoming.remove(&slot))
278 .map(|mut m| {
279 m.complete();
280 m
281 })
282 }
283
284 pub fn abort_migration(&mut self, slot: u16) -> Option<Migration> {
286 self.pending_keys.remove(&slot);
287 self.outgoing
288 .remove(&slot)
289 .or_else(|| self.incoming.remove(&slot))
290 }
291
292 pub fn outgoing_migrations(&self) -> impl Iterator<Item = &Migration> {
294 self.outgoing.values()
295 }
296
297 pub fn incoming_migrations(&self) -> impl Iterator<Item = &Migration> {
299 self.incoming.values()
300 }
301
302 pub fn active_count(&self) -> usize {
304 self.outgoing.len() + self.incoming.len()
305 }
306}
307
308#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct MigrationBatch {
311 pub slot: u16,
313 pub entries: Vec<MigrationEntry>,
315 pub is_final: bool,
317 pub sequence: u64,
319}
320
321#[derive(Debug, Clone, Serialize, Deserialize)]
323pub struct MigrationEntry {
324 pub key: Vec<u8>,
326 pub value: Vec<u8>,
328 pub ttl_ms: u64,
330}
331
332impl MigrationBatch {
333 pub fn new(slot: u16, sequence: u64) -> Self {
335 Self {
336 slot,
337 entries: Vec::new(),
338 is_final: false,
339 sequence,
340 }
341 }
342
343 pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: u64) {
345 self.entries.push(MigrationEntry { key, value, ttl_ms });
346 }
347
348 pub fn mark_final(&mut self) {
350 self.is_final = true;
351 }
352
353 pub fn len(&self) -> usize {
355 self.entries.len()
356 }
357
358 pub fn is_empty(&self) -> bool {
360 self.entries.is_empty()
361 }
362
363 pub fn size_bytes(&self) -> usize {
365 self.entries
366 .iter()
367 .map(|e| e.key.len() + e.value.len() + 8)
368 .sum()
369 }
370}
371
372#[derive(Debug, Clone, thiserror::Error)]
374pub enum MigrationError {
375 #[error("slot {slot} is already migrating")]
377 SlotAlreadyMigrating { slot: u16 },
378
379 #[error("slot {slot} is already importing")]
381 SlotAlreadyImporting { slot: u16 },
382
383 #[error("no migration in progress for slot {slot}")]
385 NoMigrationInProgress { slot: u16 },
386
387 #[error("cannot reach migration target {addr}: {reason}")]
389 TargetUnreachable { addr: SocketAddr, reason: String },
390
391 #[error("migration for slot {slot} was aborted")]
393 Aborted { slot: u16 },
394
395 #[error("invalid state transition from {from} to {to}")]
397 InvalidStateTransition {
398 from: MigrationState,
399 to: MigrationState,
400 },
401
402 #[error("migration timeout after {elapsed:?}")]
404 Timeout { elapsed: Duration },
405}
406
407#[derive(Debug, Clone, PartialEq, Eq)]
409pub enum MigrationRedirect {
410 None,
412 Moved { slot: u16, addr: SocketAddr },
414 Ask { slot: u16, addr: SocketAddr },
416}
417
418impl MigrationRedirect {
419 pub fn to_error_string(&self) -> Option<String> {
421 match self {
422 Self::None => None,
423 Self::Moved { slot, addr } => Some(format!("MOVED {} {}", slot, addr)),
424 Self::Ask { slot, addr } => Some(format!("ASK {} {}", slot, addr)),
425 }
426 }
427}
428
429#[derive(Debug, Clone)]
431pub struct MigrationConfig {
432 pub batch_size: usize,
434 pub batch_bytes: usize,
436 pub key_timeout: Duration,
438 pub migration_timeout: Duration,
440 pub batch_delay: Duration,
442}
443
444impl Default for MigrationConfig {
445 fn default() -> Self {
446 Self {
447 batch_size: 100,
448 batch_bytes: 1024 * 1024, key_timeout: Duration::from_secs(5),
450 migration_timeout: Duration::from_secs(3600), batch_delay: Duration::from_millis(10),
452 }
453 }
454}
455
456#[cfg(test)]
457mod tests {
458 use super::*;
459
460 fn node_id() -> NodeId {
461 NodeId::new()
462 }
463
464 #[test]
465 fn migration_new_importing() {
466 let source = node_id();
467 let target = node_id();
468 let m = Migration::new_importing(100, source, target);
469
470 assert_eq!(m.slot, 100);
471 assert_eq!(m.source, source);
472 assert_eq!(m.target, target);
473 assert_eq!(m.state, MigrationState::Importing);
474 assert_eq!(m.keys_migrated, 0);
475 }
476
477 #[test]
478 fn migration_new_migrating() {
479 let source = node_id();
480 let target = node_id();
481 let m = Migration::new_migrating(100, source, target);
482
483 assert_eq!(m.state, MigrationState::Migrating);
484 }
485
486 #[test]
487 fn migration_involves() {
488 let source = node_id();
489 let target = node_id();
490 let other = node_id();
491 let m = Migration::new_importing(100, source, target);
492
493 assert!(m.involves(&source));
494 assert!(m.involves(&target));
495 assert!(!m.involves(&other));
496 }
497
498 #[test]
499 fn migration_progress() {
500 let mut m = Migration::new_migrating(100, node_id(), node_id());
501
502 assert_eq!(m.progress(), None);
504
505 m.start_streaming(100);
507 assert_eq!(m.progress(), Some(0));
508
509 m.record_migrated(50);
510 assert_eq!(m.progress(), Some(50));
511
512 m.record_migrated(50);
513 assert_eq!(m.progress(), Some(100));
514 }
515
516 #[test]
517 fn migration_state_transitions() {
518 let mut m = Migration::new_migrating(100, node_id(), node_id());
519
520 assert_eq!(m.state, MigrationState::Migrating);
521
522 m.start_streaming(50);
523 assert_eq!(m.state, MigrationState::Streaming);
524
525 m.start_finalizing();
526 assert_eq!(m.state, MigrationState::Finalizing);
527
528 m.complete();
529 assert_eq!(m.state, MigrationState::Complete);
530 }
531
532 #[test]
533 fn manager_start_import() {
534 let mut manager = MigrationManager::new();
535 let source = node_id();
536 let local = node_id();
537
538 let result = manager.start_import(100, source, local);
539 assert!(result.is_ok());
540 assert!(manager.is_importing(100));
541 assert!(!manager.is_migrating(100));
542 }
543
544 #[test]
545 fn manager_start_migrate() {
546 let mut manager = MigrationManager::new();
547 let local = node_id();
548 let target = node_id();
549
550 let result = manager.start_migrate(100, local, target);
551 assert!(result.is_ok());
552 assert!(manager.is_migrating(100));
553 assert!(!manager.is_importing(100));
554 }
555
556 #[test]
557 fn manager_double_migration_error() {
558 let mut manager = MigrationManager::new();
559 let local = node_id();
560 let target = node_id();
561
562 manager.start_migrate(100, local, target).unwrap();
563
564 let result = manager.start_migrate(100, local, node_id());
566 assert!(matches!(
567 result,
568 Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
569 ));
570
571 let result = manager.start_import(100, node_id(), local);
573 assert!(matches!(
574 result,
575 Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
576 ));
577 }
578
579 #[test]
580 fn manager_key_tracking() {
581 let mut manager = MigrationManager::new();
582 let local = node_id();
583 let target = node_id();
584
585 manager.start_migrate(100, local, target).unwrap();
586
587 assert!(!manager.is_key_migrated(100, b"key1"));
588
589 manager.key_migrated(100, b"key1".to_vec());
590 assert!(manager.is_key_migrated(100, b"key1"));
591 assert!(!manager.is_key_migrated(100, b"key2"));
592 }
593
594 #[test]
595 fn manager_complete_migration() {
596 let mut manager = MigrationManager::new();
597 let local = node_id();
598 let target = node_id();
599
600 manager.start_migrate(100, local, target).unwrap();
601 manager.key_migrated(100, b"key1".to_vec());
602
603 let completed = manager.complete_migration(100);
604 assert!(completed.is_some());
605 assert_eq!(completed.unwrap().state, MigrationState::Complete);
606
607 assert!(!manager.is_migrating(100));
609 assert!(!manager.is_key_migrated(100, b"key1"));
610 }
611
612 #[test]
613 fn manager_abort_migration() {
614 let mut manager = MigrationManager::new();
615 let local = node_id();
616 let target = node_id();
617
618 manager.start_migrate(100, local, target).unwrap();
619
620 let aborted = manager.abort_migration(100);
621 assert!(aborted.is_some());
622
623 assert!(!manager.is_migrating(100));
624 }
625
626 #[test]
627 fn batch_operations() {
628 let mut batch = MigrationBatch::new(100, 1);
629
630 assert!(batch.is_empty());
631 assert_eq!(batch.len(), 0);
632
633 batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
634 batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);
635
636 assert!(!batch.is_empty());
637 assert_eq!(batch.len(), 2);
638 assert!(!batch.is_final);
639
640 batch.mark_final();
641 assert!(batch.is_final);
642 }
643
644 #[test]
645 fn redirect_formatting() {
646 let moved = MigrationRedirect::Moved {
647 slot: 100,
648 addr: "127.0.0.1:6379".parse().unwrap(),
649 };
650 assert_eq!(
651 moved.to_error_string(),
652 Some("MOVED 100 127.0.0.1:6379".to_string())
653 );
654
655 let ask = MigrationRedirect::Ask {
656 slot: 200,
657 addr: "127.0.0.1:6380".parse().unwrap(),
658 };
659 assert_eq!(
660 ask.to_error_string(),
661 Some("ASK 200 127.0.0.1:6380".to_string())
662 );
663
664 assert_eq!(MigrationRedirect::None.to_error_string(), None);
665 }
666
667 #[test]
668 fn migration_state_display() {
669 assert_eq!(MigrationState::Importing.to_string(), "importing");
670 assert_eq!(MigrationState::Migrating.to_string(), "migrating");
671 assert_eq!(MigrationState::Streaming.to_string(), "streaming");
672 assert_eq!(MigrationState::Finalizing.to_string(), "finalizing");
673 assert_eq!(MigrationState::Complete.to_string(), "complete");
674 }
675
676 #[test]
677 fn config_defaults() {
678 let config = MigrationConfig::default();
679 assert_eq!(config.batch_size, 100);
680 assert_eq!(config.batch_bytes, 1024 * 1024);
681 assert_eq!(config.key_timeout, Duration::from_secs(5));
682 }
683}