1use std::collections::{HashMap, HashSet};
29use std::net::SocketAddr;
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33
34use crate::NodeId;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct MigrationId(pub u64);
39
40impl MigrationId {
41 pub fn new(slot: u16) -> Self {
43 use std::time::{SystemTime, UNIX_EPOCH};
44 let ts = SystemTime::now()
45 .duration_since(UNIX_EPOCH)
46 .unwrap_or_default()
47 .as_nanos() as u64;
48 Self(ts ^ (slot as u64))
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55pub enum MigrationState {
56 Importing,
58 Migrating,
60 Streaming,
62 Finalizing,
64 Complete,
66}
67
68impl std::fmt::Display for MigrationState {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 Self::Importing => write!(f, "importing"),
72 Self::Migrating => write!(f, "migrating"),
73 Self::Streaming => write!(f, "streaming"),
74 Self::Finalizing => write!(f, "finalizing"),
75 Self::Complete => write!(f, "complete"),
76 }
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct Migration {
83 pub id: MigrationId,
85 pub slot: u16,
87 pub source: NodeId,
89 pub target: NodeId,
91 pub state: MigrationState,
93 #[serde(skip)]
95 pub started_at: Option<Instant>,
96 pub keys_migrated: u64,
98 pub keys_total: Option<u64>,
100}
101
102impl Migration {
103 pub fn new_importing(slot: u16, source: NodeId, target: NodeId) -> Self {
105 Self {
106 id: MigrationId::new(slot),
107 slot,
108 source,
109 target,
110 state: MigrationState::Importing,
111 started_at: Some(Instant::now()),
112 keys_migrated: 0,
113 keys_total: None,
114 }
115 }
116
117 pub fn new_migrating(slot: u16, source: NodeId, target: NodeId) -> Self {
119 Self {
120 id: MigrationId::new(slot),
121 slot,
122 source,
123 target,
124 state: MigrationState::Migrating,
125 started_at: Some(Instant::now()),
126 keys_migrated: 0,
127 keys_total: None,
128 }
129 }
130
131 pub fn involves(&self, node: &NodeId) -> bool {
133 self.source == *node || self.target == *node
134 }
135
136 pub fn progress(&self) -> Option<u8> {
138 self.keys_total.map(|total| {
139 if total == 0 {
140 100
141 } else {
142 ((self.keys_migrated * 100) / total).min(100) as u8
143 }
144 })
145 }
146
147 pub fn start_streaming(&mut self, total_keys: u64) {
149 self.state = MigrationState::Streaming;
150 self.keys_total = Some(total_keys);
151 }
152
153 pub fn record_migrated(&mut self, count: u64) {
155 self.keys_migrated += count;
156 }
157
158 pub fn start_finalizing(&mut self) {
160 self.state = MigrationState::Finalizing;
161 }
162
163 pub fn complete(&mut self) {
165 self.state = MigrationState::Complete;
166 }
167}
168
169#[derive(Debug, Default)]
171pub struct MigrationManager {
172 outgoing: HashMap<u16, Migration>,
174 incoming: HashMap<u16, Migration>,
176 pending_keys: HashMap<u16, HashSet<Vec<u8>>>,
178}
179
180impl MigrationManager {
181 pub fn new() -> Self {
183 Self::default()
184 }
185
186 pub fn is_migrating(&self, slot: u16) -> bool {
188 self.outgoing.contains_key(&slot)
189 }
190
191 pub fn is_importing(&self, slot: u16) -> bool {
193 self.incoming.contains_key(&slot)
194 }
195
196 pub fn get_outgoing(&self, slot: u16) -> Option<&Migration> {
198 self.outgoing.get(&slot)
199 }
200
201 pub fn get_incoming(&self, slot: u16) -> Option<&Migration> {
203 self.incoming.get(&slot)
204 }
205
206 pub fn start_import(
210 &mut self,
211 slot: u16,
212 source: NodeId,
213 local_id: NodeId,
214 ) -> Result<&Migration, MigrationError> {
215 if self.outgoing.contains_key(&slot) {
216 return Err(MigrationError::SlotAlreadyMigrating { slot });
217 }
218 if self.incoming.contains_key(&slot) {
219 return Err(MigrationError::SlotAlreadyImporting { slot });
220 }
221
222 let migration = Migration::new_importing(slot, source, local_id);
223 self.incoming.insert(slot, migration);
224 self.pending_keys.insert(slot, HashSet::new());
225 Ok(self.incoming.get(&slot).unwrap())
226 }
227
228 pub fn start_migrate(
232 &mut self,
233 slot: u16,
234 local_id: NodeId,
235 target: NodeId,
236 ) -> Result<&Migration, MigrationError> {
237 if self.outgoing.contains_key(&slot) {
238 return Err(MigrationError::SlotAlreadyMigrating { slot });
239 }
240 if self.incoming.contains_key(&slot) {
241 return Err(MigrationError::SlotAlreadyImporting { slot });
242 }
243
244 let migration = Migration::new_migrating(slot, local_id, target);
245 self.outgoing.insert(slot, migration);
246 self.pending_keys.insert(slot, HashSet::new());
247 Ok(self.outgoing.get(&slot).unwrap())
248 }
249
250 pub fn key_migrated(&mut self, slot: u16, key: Vec<u8>) {
252 if let Some(keys) = self.pending_keys.get_mut(&slot) {
253 keys.insert(key);
254 }
255 if let Some(migration) = self.outgoing.get_mut(&slot) {
256 migration.record_migrated(1);
257 }
258 }
259
260 pub fn is_key_migrated(&self, slot: u16, key: &[u8]) -> bool {
262 self.pending_keys
263 .get(&slot)
264 .is_some_and(|keys| keys.contains(key))
265 }
266
267 pub fn complete_migration(&mut self, slot: u16) -> Option<Migration> {
269 self.pending_keys.remove(&slot);
270 self.outgoing
272 .remove(&slot)
273 .or_else(|| self.incoming.remove(&slot))
274 .map(|mut m| {
275 m.complete();
276 m
277 })
278 }
279
280 pub fn abort_migration(&mut self, slot: u16) -> Option<Migration> {
282 self.pending_keys.remove(&slot);
283 self.outgoing
284 .remove(&slot)
285 .or_else(|| self.incoming.remove(&slot))
286 }
287
288 pub fn outgoing_migrations(&self) -> impl Iterator<Item = &Migration> {
290 self.outgoing.values()
291 }
292
293 pub fn incoming_migrations(&self) -> impl Iterator<Item = &Migration> {
295 self.incoming.values()
296 }
297
298 pub fn active_count(&self) -> usize {
300 self.outgoing.len() + self.incoming.len()
301 }
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct MigrationBatch {
307 pub slot: u16,
309 pub entries: Vec<MigrationEntry>,
311 pub is_final: bool,
313 pub sequence: u64,
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct MigrationEntry {
320 pub key: Vec<u8>,
322 pub value: Vec<u8>,
324 pub ttl_ms: u64,
326}
327
328impl MigrationBatch {
329 pub fn new(slot: u16, sequence: u64) -> Self {
331 Self {
332 slot,
333 entries: Vec::new(),
334 is_final: false,
335 sequence,
336 }
337 }
338
339 pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: u64) {
341 self.entries.push(MigrationEntry { key, value, ttl_ms });
342 }
343
344 pub fn mark_final(&mut self) {
346 self.is_final = true;
347 }
348
349 pub fn len(&self) -> usize {
351 self.entries.len()
352 }
353
354 pub fn is_empty(&self) -> bool {
356 self.entries.is_empty()
357 }
358
359 pub fn size_bytes(&self) -> usize {
361 self.entries
362 .iter()
363 .map(|e| e.key.len() + e.value.len() + 8)
364 .sum()
365 }
366}
367
368#[derive(Debug, Clone, thiserror::Error)]
370pub enum MigrationError {
371 #[error("slot {slot} is already migrating")]
373 SlotAlreadyMigrating { slot: u16 },
374
375 #[error("slot {slot} is already importing")]
377 SlotAlreadyImporting { slot: u16 },
378
379 #[error("no migration in progress for slot {slot}")]
381 NoMigrationInProgress { slot: u16 },
382
383 #[error("cannot reach migration target {addr}: {reason}")]
385 TargetUnreachable { addr: SocketAddr, reason: String },
386
387 #[error("migration for slot {slot} was aborted")]
389 Aborted { slot: u16 },
390
391 #[error("invalid state transition from {from} to {to}")]
393 InvalidStateTransition {
394 from: MigrationState,
395 to: MigrationState,
396 },
397
398 #[error("migration timeout after {elapsed:?}")]
400 Timeout { elapsed: Duration },
401}
402
/// Redirect instruction for a request that targets a given slot.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MigrationRedirect {
    /// No redirect is needed.
    None,
    /// Rendered as `MOVED <slot> <addr>`.
    Moved { slot: u16, addr: SocketAddr },
    /// Rendered as `ASK <slot> <addr>`.
    Ask { slot: u16, addr: SocketAddr },
}

impl MigrationRedirect {
    /// Formats the redirect as `"<VERB> <slot> <addr>"`.
    ///
    /// Returns `None` for [`MigrationRedirect::None`], since there is nothing
    /// to report in that case.
    pub fn to_error_string(&self) -> Option<String> {
        let (verb, slot, addr) = match self {
            Self::None => return None,
            Self::Moved { slot, addr } => ("MOVED", slot, addr),
            Self::Ask { slot, addr } => ("ASK", slot, addr),
        };
        Some(format!("{} {} {}", verb, slot, addr))
    }
}
424
/// Tunables for slot migration.
// NOTE(review): the code that consumes these settings is not in this file;
// the per-field descriptions below are inferred from the names and should be
// confirmed against it.
#[derive(Clone, Debug)]
pub struct MigrationConfig {
    /// Presumably the maximum number of entries per batch. Default: 100.
    pub batch_size: usize,
    /// Presumably the maximum batch payload in bytes. Default: 1 MiB.
    pub batch_bytes: usize,
    /// Presumably the timeout for a single key. Default: 5 seconds.
    pub key_timeout: Duration,
    /// Presumably the timeout for a whole migration. Default: 1 hour.
    pub migration_timeout: Duration,
    /// Presumably the pause between batches. Default: 10 milliseconds.
    pub batch_delay: Duration,
}

impl Default for MigrationConfig {
    /// Returns the stock settings: 100 entries, 1 MiB, 5 s, 1 h and 10 ms.
    fn default() -> Self {
        const KIB: usize = 1024;
        const SECS_PER_HOUR: u64 = 60 * 60;

        MigrationConfig {
            batch_size: 100,
            batch_bytes: KIB * KIB,
            key_timeout: Duration::from_secs(5),
            migration_timeout: Duration::from_secs(SECS_PER_HOUR),
            batch_delay: Duration::from_millis(10),
        }
    }
}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Slot used by tests that just need some slot; the value is arbitrary.
    const SLOT: u16 = 100;

    /// Returns a fresh node id to use as a migration endpoint.
    fn node_id() -> NodeId {
        NodeId::new()
    }

    /// An importing record keeps its endpoints and starts with zero keys moved.
    #[test]
    fn migration_new_importing() {
        let (source, target) = (node_id(), node_id());
        let migration = Migration::new_importing(SLOT, source, target);

        assert_eq!(migration.slot, SLOT);
        assert_eq!(migration.source, source);
        assert_eq!(migration.target, target);
        assert_eq!(migration.state, MigrationState::Importing);
        assert_eq!(migration.keys_migrated, 0);
    }

    /// A migrating record starts in the `Migrating` state.
    #[test]
    fn migration_new_migrating() {
        let (source, target) = (node_id(), node_id());
        let migration = Migration::new_migrating(SLOT, source, target);

        assert_eq!(migration.state, MigrationState::Migrating);
    }

    /// `involves` matches both endpoints and rejects an unrelated node.
    #[test]
    fn migration_involves() {
        let (source, target, bystander) = (node_id(), node_id(), node_id());
        let migration = Migration::new_importing(SLOT, source, target);

        assert!(migration.involves(&source));
        assert!(migration.involves(&target));
        assert!(!migration.involves(&bystander));
    }

    /// Progress is unknown before streaming and then follows the key counter.
    #[test]
    fn migration_progress() {
        let mut migration = Migration::new_migrating(SLOT, node_id(), node_id());
        assert_eq!(migration.progress(), None);

        migration.start_streaming(100);
        assert_eq!(migration.progress(), Some(0));

        migration.record_migrated(50);
        assert_eq!(migration.progress(), Some(50));

        migration.record_migrated(50);
        assert_eq!(migration.progress(), Some(100));
    }

    /// Each transition helper sets the state it is named after.
    #[test]
    fn migration_state_transitions() {
        let mut migration = Migration::new_migrating(SLOT, node_id(), node_id());
        assert_eq!(migration.state, MigrationState::Migrating);

        migration.start_streaming(50);
        assert_eq!(migration.state, MigrationState::Streaming);

        migration.start_finalizing();
        assert_eq!(migration.state, MigrationState::Finalizing);

        migration.complete();
        assert_eq!(migration.state, MigrationState::Complete);
    }

    /// Starting an import registers the slot as importing only.
    #[test]
    fn manager_start_import() {
        let mut manager = MigrationManager::new();
        let (source, local) = (node_id(), node_id());

        assert!(manager.start_import(SLOT, source, local).is_ok());
        assert!(manager.is_importing(SLOT));
        assert!(!manager.is_migrating(SLOT));
    }

    /// Starting a migrate registers the slot as migrating only.
    #[test]
    fn manager_start_migrate() {
        let mut manager = MigrationManager::new();
        let (local, target) = (node_id(), node_id());

        assert!(manager.start_migrate(SLOT, local, target).is_ok());
        assert!(manager.is_migrating(SLOT));
        assert!(!manager.is_importing(SLOT));
    }

    /// A slot that is already migrating rejects both a second migrate and an import.
    #[test]
    fn manager_double_migration_error() {
        let mut manager = MigrationManager::new();
        let (local, target) = (node_id(), node_id());

        manager.start_migrate(SLOT, local, target).unwrap();

        let second_migrate = manager.start_migrate(SLOT, local, node_id());
        assert!(matches!(
            second_migrate,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));

        let import_attempt = manager.start_import(SLOT, node_id(), local);
        assert!(matches!(
            import_attempt,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));
    }

    /// Only keys that were reported count as migrated.
    #[test]
    fn manager_key_tracking() {
        let mut manager = MigrationManager::new();
        let (local, target) = (node_id(), node_id());
        manager.start_migrate(SLOT, local, target).unwrap();

        assert!(!manager.is_key_migrated(SLOT, b"key1"));

        manager.key_migrated(SLOT, b"key1".to_vec());

        assert!(manager.is_key_migrated(SLOT, b"key1"));
        assert!(!manager.is_key_migrated(SLOT, b"key2"));
    }

    /// Completing returns the record as `Complete` and forgets slot and keys.
    #[test]
    fn manager_complete_migration() {
        let mut manager = MigrationManager::new();
        let (local, target) = (node_id(), node_id());
        manager.start_migrate(SLOT, local, target).unwrap();
        manager.key_migrated(SLOT, b"key1".to_vec());

        let finished = manager.complete_migration(SLOT);
        assert!(finished.is_some());
        assert_eq!(finished.unwrap().state, MigrationState::Complete);

        assert!(!manager.is_migrating(SLOT));
        assert!(!manager.is_key_migrated(SLOT, b"key1"));
    }

    /// Aborting returns the record and unregisters the slot.
    #[test]
    fn manager_abort_migration() {
        let mut manager = MigrationManager::new();
        let (local, target) = (node_id(), node_id());
        manager.start_migrate(SLOT, local, target).unwrap();

        let dropped = manager.abort_migration(SLOT);
        assert!(dropped.is_some());

        assert!(!manager.is_migrating(SLOT));
    }

    /// A batch starts empty and non-final, grows on `add_entry`, and can be
    /// marked final.
    #[test]
    fn batch_operations() {
        let mut batch = MigrationBatch::new(SLOT, 1);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);

        batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
        batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);
        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 2);
        assert!(!batch.is_final);

        batch.mark_final();
        assert!(batch.is_final);
    }

    /// Redirects render as `MOVED`/`ASK` lines; `None` renders as nothing.
    #[test]
    fn redirect_formatting() {
        let moved = MigrationRedirect::Moved {
            slot: 100,
            addr: "127.0.0.1:6379".parse().unwrap(),
        };
        let expected_moved = Some("MOVED 100 127.0.0.1:6379".to_string());
        assert_eq!(moved.to_error_string(), expected_moved);

        let ask = MigrationRedirect::Ask {
            slot: 200,
            addr: "127.0.0.1:6380".parse().unwrap(),
        };
        let expected_ask = Some("ASK 200 127.0.0.1:6380".to_string());
        assert_eq!(ask.to_error_string(), expected_ask);

        assert_eq!(MigrationRedirect::None.to_error_string(), None);
    }

    /// Every state displays as its lowercase name.
    #[test]
    fn migration_state_display() {
        let expectations = [
            (MigrationState::Importing, "importing"),
            (MigrationState::Migrating, "migrating"),
            (MigrationState::Streaming, "streaming"),
            (MigrationState::Finalizing, "finalizing"),
            (MigrationState::Complete, "complete"),
        ];
        for (state, rendered) in expectations {
            assert_eq!(state.to_string(), rendered);
        }
    }

    /// Spot-checks the default configuration values.
    #[test]
    fn config_defaults() {
        let defaults = MigrationConfig::default();

        assert_eq!(defaults.batch_size, 100);
        assert_eq!(defaults.batch_bytes, 1024 * 1024);
        assert_eq!(defaults.key_timeout, Duration::from_secs(5));
    }
}