Skip to main content

ember_cluster/
migration.rs

//! Live slot migration for cluster resharding.
//!
//! This module implements Redis-compatible slot migration, which moves slots
//! between nodes without downtime. The protocol follows these steps:
//!
//! 1. The target marks the slot as IMPORTING from the source.
//! 2. The source marks the slot as MIGRATING to the target.
//! 3. Keys are streamed in batches from the source to the target.
//! 4. During migration, reads go to the source and writes get an ASK redirect
//!    to the target.
//! 5. Once the final batch completes, ownership transfers via Raft.
//!
//! # Example
//!
//! ```text
//! // On target node:
//! CLUSTER SETSLOT 100 IMPORTING <source-node-id>
//!
//! // On source node:
//! CLUSTER SETSLOT 100 MIGRATING <target-node-id>
//!
//! // Migrate keys (repeated for each key in slot; destination db 0, 5000 ms timeout):
//! MIGRATE <target-host> <target-port> <key> 0 5000
//!
//! // Finalize on both nodes:
//! CLUSTER SETSLOT 100 NODE <target-node-id>
//! ```
27
28use std::collections::{HashMap, HashSet};
29use std::net::SocketAddr;
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33
34use crate::NodeId;
35
36/// Unique identifier for a migration operation.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct MigrationId(pub u64);
39
40impl MigrationId {
41    /// Generate a new migration ID from timestamp, slot, and random bits.
42    ///
43    /// The ID combines nanosecond timestamp with the slot number and 16 bits
44    /// of randomness. The random component prevents collisions when the
45    /// system clock is unavailable (`unwrap_or_default` returns zero) or
46    /// when two migrations for the same slot start within the same nanosecond.
47    pub fn new(slot: u16) -> Self {
48        use rand::Rng;
49        use std::time::{SystemTime, UNIX_EPOCH};
50        let ts = SystemTime::now()
51            .duration_since(UNIX_EPOCH)
52            .unwrap_or_default()
53            .as_nanos() as u64;
54        let noise: u16 = rand::rng().random();
55        Self(ts ^ (slot as u64) ^ ((noise as u64) << 48))
56    }
57}
58
59/// Current state of a slot migration.
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61pub enum MigrationState {
62    /// Target has marked slot as importing, waiting for source.
63    Importing,
64    /// Source has marked slot as migrating, ready to stream keys.
65    Migrating,
66    /// Keys are being transferred in batches.
67    Streaming,
68    /// Final batch being sent, brief pause on writes.
69    Finalizing,
70    /// Migration complete, ownership transferred.
71    Complete,
72}
73
74impl std::fmt::Display for MigrationState {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        match self {
77            Self::Importing => write!(f, "importing"),
78            Self::Migrating => write!(f, "migrating"),
79            Self::Streaming => write!(f, "streaming"),
80            Self::Finalizing => write!(f, "finalizing"),
81            Self::Complete => write!(f, "complete"),
82        }
83    }
84}
85
86/// A single slot migration operation.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct Migration {
89    /// Unique migration identifier.
90    pub id: MigrationId,
91    /// The slot being migrated.
92    pub slot: u16,
93    /// Node currently owning the slot.
94    pub source: NodeId,
95    /// Node receiving the slot.
96    pub target: NodeId,
97    /// Current migration state.
98    pub state: MigrationState,
99    /// When migration started.
100    #[serde(skip)]
101    pub started_at: Option<Instant>,
102    /// Number of keys migrated so far.
103    pub keys_migrated: u64,
104    /// Total keys to migrate (if known).
105    pub keys_total: Option<u64>,
106}
107
108impl Migration {
109    /// Create a new migration in the importing state (target perspective).
110    pub fn new_importing(slot: u16, source: NodeId, target: NodeId) -> Self {
111        Self {
112            id: MigrationId::new(slot),
113            slot,
114            source,
115            target,
116            state: MigrationState::Importing,
117            started_at: Some(Instant::now()),
118            keys_migrated: 0,
119            keys_total: None,
120        }
121    }
122
123    /// Create a new migration in the migrating state (source perspective).
124    pub fn new_migrating(slot: u16, source: NodeId, target: NodeId) -> Self {
125        Self {
126            id: MigrationId::new(slot),
127            slot,
128            source,
129            target,
130            state: MigrationState::Migrating,
131            started_at: Some(Instant::now()),
132            keys_migrated: 0,
133            keys_total: None,
134        }
135    }
136
137    /// Check if this migration involves a specific node.
138    pub fn involves(&self, node: &NodeId) -> bool {
139        self.source == *node || self.target == *node
140    }
141
142    /// Get progress as a percentage (0-100).
143    pub fn progress(&self) -> Option<u8> {
144        self.keys_total.map(|total| {
145            if total == 0 {
146                100
147            } else {
148                ((self.keys_migrated as f64 / total as f64) * 100.0).min(100.0) as u8
149            }
150        })
151    }
152
153    /// Transition to streaming state.
154    pub fn start_streaming(&mut self, total_keys: u64) {
155        self.state = MigrationState::Streaming;
156        self.keys_total = Some(total_keys);
157    }
158
159    /// Record migrated keys.
160    pub fn record_migrated(&mut self, count: u64) {
161        self.keys_migrated += count;
162    }
163
164    /// Transition to finalizing state.
165    pub fn start_finalizing(&mut self) {
166        self.state = MigrationState::Finalizing;
167    }
168
169    /// Mark migration as complete.
170    pub fn complete(&mut self) {
171        self.state = MigrationState::Complete;
172    }
173}
174
175/// Tracks all active migrations for a node.
176#[derive(Debug, Default)]
177pub struct MigrationManager {
178    /// Migrations where this node is the source (slot is migrating out).
179    outgoing: HashMap<u16, Migration>,
180    /// Migrations where this node is the target (slot is importing in).
181    incoming: HashMap<u16, Migration>,
182    /// Keys that have been migrated but not yet confirmed.
183    pending_keys: HashMap<u16, HashSet<Vec<u8>>>,
184}
185
186impl MigrationManager {
187    /// Create a new migration manager.
188    pub fn new() -> Self {
189        Self::default()
190    }
191
192    /// Check if a slot is currently migrating out.
193    pub fn is_migrating(&self, slot: u16) -> bool {
194        self.outgoing.contains_key(&slot)
195    }
196
197    /// Check if a slot is currently being imported.
198    pub fn is_importing(&self, slot: u16) -> bool {
199        self.incoming.contains_key(&slot)
200    }
201
202    /// Get migration info for a slot that's migrating out.
203    pub fn get_outgoing(&self, slot: u16) -> Option<&Migration> {
204        self.outgoing.get(&slot)
205    }
206
207    /// Get migration info for a slot that's being imported.
208    pub fn get_incoming(&self, slot: u16) -> Option<&Migration> {
209        self.incoming.get(&slot)
210    }
211
212    /// Start importing a slot from another node.
213    ///
214    /// Returns error if slot is already involved in a migration.
215    pub fn start_import(
216        &mut self,
217        slot: u16,
218        source: NodeId,
219        local_id: NodeId,
220    ) -> Result<&Migration, MigrationError> {
221        if self.outgoing.contains_key(&slot) {
222            return Err(MigrationError::SlotAlreadyMigrating { slot });
223        }
224        if self.incoming.contains_key(&slot) {
225            return Err(MigrationError::SlotAlreadyImporting { slot });
226        }
227
228        let migration = Migration::new_importing(slot, source, local_id);
229        self.incoming.insert(slot, migration);
230        self.pending_keys.insert(slot, HashSet::new());
231        self.incoming
232            .get(&slot)
233            .ok_or(MigrationError::NoMigrationInProgress { slot })
234    }
235
236    /// Start migrating a slot to another node.
237    ///
238    /// Returns error if slot is already involved in a migration.
239    pub fn start_migrate(
240        &mut self,
241        slot: u16,
242        local_id: NodeId,
243        target: NodeId,
244    ) -> Result<&Migration, MigrationError> {
245        if self.outgoing.contains_key(&slot) {
246            return Err(MigrationError::SlotAlreadyMigrating { slot });
247        }
248        if self.incoming.contains_key(&slot) {
249            return Err(MigrationError::SlotAlreadyImporting { slot });
250        }
251
252        let migration = Migration::new_migrating(slot, local_id, target);
253        self.outgoing.insert(slot, migration);
254        self.pending_keys.insert(slot, HashSet::new());
255        self.outgoing
256            .get(&slot)
257            .ok_or(MigrationError::NoMigrationInProgress { slot })
258    }
259
260    /// Record that a key has been migrated.
261    pub fn key_migrated(&mut self, slot: u16, key: Vec<u8>) {
262        if let Some(keys) = self.pending_keys.get_mut(&slot) {
263            keys.insert(key);
264        }
265        if let Some(migration) = self.outgoing.get_mut(&slot) {
266            migration.record_migrated(1);
267        }
268    }
269
270    /// Check if a specific key has been migrated.
271    pub fn is_key_migrated(&self, slot: u16, key: &[u8]) -> bool {
272        self.pending_keys
273            .get(&slot)
274            .is_some_and(|keys| keys.contains(key))
275    }
276
277    /// Complete a migration and clean up state.
278    pub fn complete_migration(&mut self, slot: u16) -> Option<Migration> {
279        self.pending_keys.remove(&slot);
280        // Try outgoing first, then incoming
281        self.outgoing
282            .remove(&slot)
283            .or_else(|| self.incoming.remove(&slot))
284            .map(|mut m| {
285                m.complete();
286                m
287            })
288    }
289
290    /// Abort a migration and clean up state.
291    pub fn abort_migration(&mut self, slot: u16) -> Option<Migration> {
292        self.pending_keys.remove(&slot);
293        self.outgoing
294            .remove(&slot)
295            .or_else(|| self.incoming.remove(&slot))
296    }
297
298    /// Get all active outgoing migrations.
299    pub fn outgoing_migrations(&self) -> impl Iterator<Item = &Migration> {
300        self.outgoing.values()
301    }
302
303    /// Get all active incoming migrations.
304    pub fn incoming_migrations(&self) -> impl Iterator<Item = &Migration> {
305        self.incoming.values()
306    }
307
308    /// Get total number of active migrations.
309    pub fn active_count(&self) -> usize {
310        self.outgoing.len() + self.incoming.len()
311    }
312}
313
314/// Represents a batch of keys to migrate.
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct MigrationBatch {
317    /// The slot being migrated.
318    pub slot: u16,
319    /// Keys and their values in this batch.
320    pub entries: Vec<MigrationEntry>,
321    /// Whether this is the final batch.
322    pub is_final: bool,
323    /// Sequence number for ordering.
324    pub sequence: u64,
325}
326
327/// A single key-value entry being migrated.
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct MigrationEntry {
330    /// The key being migrated.
331    pub key: Vec<u8>,
332    /// Serialized value data.
333    pub value: Vec<u8>,
334    /// TTL remaining in milliseconds (0 = no expiry).
335    pub ttl_ms: u64,
336}
337
338impl MigrationBatch {
339    /// Create a new migration batch.
340    pub fn new(slot: u16, sequence: u64) -> Self {
341        Self {
342            slot,
343            entries: Vec::new(),
344            is_final: false,
345            sequence,
346        }
347    }
348
349    /// Add an entry to the batch.
350    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: u64) {
351        self.entries.push(MigrationEntry { key, value, ttl_ms });
352    }
353
354    /// Mark this as the final batch.
355    pub fn mark_final(&mut self) {
356        self.is_final = true;
357    }
358
359    /// Get the number of entries in this batch.
360    pub fn len(&self) -> usize {
361        self.entries.len()
362    }
363
364    /// Check if the batch is empty.
365    pub fn is_empty(&self) -> bool {
366        self.entries.is_empty()
367    }
368
369    /// Estimate the size of this batch in bytes.
370    pub fn size_bytes(&self) -> usize {
371        self.entries
372            .iter()
373            .map(|e| e.key.len() + e.value.len() + 8)
374            .sum()
375    }
376}
377
378/// Errors that can occur during migration.
379#[derive(Debug, Clone, thiserror::Error)]
380pub enum MigrationError {
381    /// Slot is already being migrated out.
382    #[error("slot {slot} is already migrating")]
383    SlotAlreadyMigrating { slot: u16 },
384
385    /// Slot is already being imported.
386    #[error("slot {slot} is already importing")]
387    SlotAlreadyImporting { slot: u16 },
388
389    /// No migration in progress for this slot.
390    #[error("no migration in progress for slot {slot}")]
391    NoMigrationInProgress { slot: u16 },
392
393    /// Migration target is unreachable.
394    #[error("cannot reach migration target {addr}: {reason}")]
395    TargetUnreachable { addr: SocketAddr, reason: String },
396
397    /// Migration was aborted.
398    #[error("migration for slot {slot} was aborted")]
399    Aborted { slot: u16 },
400
401    /// Invalid migration state transition.
402    #[error("invalid state transition from {from} to {to}")]
403    InvalidStateTransition {
404        from: MigrationState,
405        to: MigrationState,
406    },
407
408    /// Timeout during migration.
409    #[error("migration timeout after {elapsed:?}")]
410    Timeout { elapsed: Duration },
411}
412
/// Outcome of deciding whether a command must be redirected because of slot
/// ownership or an in-progress migration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationRedirect {
    /// Serve the command locally.
    None,
    /// Reply with `MOVED`: the slot has permanently moved to `addr`.
    Moved { slot: u16, addr: SocketAddr },
    /// Reply with `ASK`: the slot is temporarily served at `addr`.
    Ask { slot: u16, addr: SocketAddr },
}

impl MigrationRedirect {
    /// Renders the redirect as the text of a RESP error reply
    /// (`"<KIND> <slot> <addr>"`), or `None` when no redirect applies.
    pub fn to_error_string(&self) -> Option<String> {
        let text = match *self {
            Self::None => return None,
            Self::Moved { slot, addr } => format!("MOVED {} {}", slot, addr),
            Self::Ask { slot, addr } => format!("ASK {} {}", slot, addr),
        };
        Some(text)
    }
}
434
/// Tunables controlling how a slot migration is paced and bounded.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Upper bound on keys sent in one batch.
    pub batch_size: usize,
    /// Upper bound on bytes sent in one batch.
    pub batch_bytes: usize,
    /// Time allowed for migrating a single key.
    pub key_timeout: Duration,
    /// Time allowed for the whole migration.
    pub migration_timeout: Duration,
    /// Pause between consecutive batches, to avoid overwhelming the network.
    pub batch_delay: Duration,
}

impl Default for MigrationConfig {
    /// 100 keys or 1 MiB per batch, 5 s per key, 1 h overall, and 10 ms
    /// between batches.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        const ONE_HOUR_SECS: u64 = 60 * 60;

        Self {
            batch_size: 100,
            batch_bytes: ONE_MIB,
            key_timeout: Duration::from_secs(5),
            migration_timeout: Duration::from_secs(ONE_HOUR_SECS),
            batch_delay: Duration::from_millis(10),
        }
    }
}
461
#[cfg(test)]
mod tests {
    use super::*;

    fn node_id() -> NodeId {
        NodeId::new()
    }

    #[test]
    fn migration_new_importing() {
        let source = node_id();
        let target = node_id();
        let m = Migration::new_importing(100, source, target);

        assert_eq!(m.slot, 100);
        assert_eq!(m.source, source);
        assert_eq!(m.target, target);
        assert_eq!(m.state, MigrationState::Importing);
        assert_eq!(m.keys_migrated, 0);
    }

    #[test]
    fn migration_new_migrating() {
        let source = node_id();
        let target = node_id();
        let m = Migration::new_migrating(100, source, target);

        assert_eq!(m.state, MigrationState::Migrating);
    }

    #[test]
    fn migration_new_starts_clean() {
        let m = Migration::new_importing(7, node_id(), node_id());

        assert!(m.started_at.is_some());
        assert_eq!(m.keys_total, None);
        assert_eq!(m.progress(), None);
    }

    #[test]
    fn migration_involves() {
        let source = node_id();
        let target = node_id();
        let other = node_id();
        let m = Migration::new_importing(100, source, target);

        assert!(m.involves(&source));
        assert!(m.involves(&target));
        assert!(!m.involves(&other));
    }

    #[test]
    fn migration_progress() {
        let mut m = Migration::new_migrating(100, node_id(), node_id());

        // No total set
        assert_eq!(m.progress(), None);

        // Set total and migrate some
        m.start_streaming(100);
        assert_eq!(m.progress(), Some(0));

        m.record_migrated(50);
        assert_eq!(m.progress(), Some(50));

        m.record_migrated(50);
        assert_eq!(m.progress(), Some(100));
    }

    #[test]
    fn migration_progress_zero_total_is_complete() {
        let mut m = Migration::new_migrating(100, node_id(), node_id());
        m.start_streaming(0);

        assert_eq!(m.progress(), Some(100));
    }

    #[test]
    fn migration_progress_clamps_overshoot() {
        let mut m = Migration::new_migrating(100, node_id(), node_id());
        m.start_streaming(10);
        m.record_migrated(25);

        assert_eq!(m.progress(), Some(100));
    }

    #[test]
    fn migration_state_transitions() {
        let mut m = Migration::new_migrating(100, node_id(), node_id());

        assert_eq!(m.state, MigrationState::Migrating);

        m.start_streaming(50);
        assert_eq!(m.state, MigrationState::Streaming);

        m.start_finalizing();
        assert_eq!(m.state, MigrationState::Finalizing);

        m.complete();
        assert_eq!(m.state, MigrationState::Complete);
    }

    #[test]
    fn manager_start_import() {
        let mut manager = MigrationManager::new();
        let source = node_id();
        let local = node_id();

        let result = manager.start_import(100, source, local);
        assert!(result.is_ok());
        assert!(manager.is_importing(100));
        assert!(!manager.is_migrating(100));
    }

    #[test]
    fn manager_start_migrate() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        let result = manager.start_migrate(100, local, target);
        assert!(result.is_ok());
        assert!(manager.is_migrating(100));
        assert!(!manager.is_importing(100));
    }

    #[test]
    fn manager_double_migration_error() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();

        // Can't migrate same slot again
        let result = manager.start_migrate(100, local, node_id());
        assert!(matches!(
            result,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));

        // Can't import a slot that's migrating
        let result = manager.start_import(100, node_id(), local);
        assert!(matches!(
            result,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));
    }

    #[test]
    fn manager_import_blocks_further_migrations() {
        let mut manager = MigrationManager::new();
        let local = node_id();

        manager.start_import(5, node_id(), local).unwrap();

        // Can't migrate a slot that's importing
        let result = manager.start_migrate(5, local, node_id());
        assert!(matches!(
            result,
            Err(MigrationError::SlotAlreadyImporting { slot: 5 })
        ));

        // Can't import the same slot twice
        let result = manager.start_import(5, node_id(), local);
        assert!(matches!(
            result,
            Err(MigrationError::SlotAlreadyImporting { slot: 5 })
        ));
    }

    #[test]
    fn manager_key_tracking() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();

        assert!(!manager.is_key_migrated(100, b"key1"));

        manager.key_migrated(100, b"key1".to_vec());
        assert!(manager.is_key_migrated(100, b"key1"));
        assert!(!manager.is_key_migrated(100, b"key2"));
    }

    #[test]
    fn manager_key_migrated_counts_outgoing() {
        let mut manager = MigrationManager::new();
        manager.start_migrate(100, node_id(), node_id()).unwrap();

        manager.key_migrated(100, b"a".to_vec());
        manager.key_migrated(100, b"b".to_vec());

        assert_eq!(manager.get_outgoing(100).map(|m| m.keys_migrated), Some(2));
    }

    #[test]
    fn manager_key_migrated_ignores_unknown_slot() {
        let mut manager = MigrationManager::new();

        manager.key_migrated(42, b"orphan".to_vec());

        assert!(!manager.is_key_migrated(42, b"orphan"));
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn manager_complete_migration() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();
        manager.key_migrated(100, b"key1".to_vec());

        let completed = manager.complete_migration(100);
        assert!(completed.is_some());
        assert_eq!(completed.unwrap().state, MigrationState::Complete);

        // State should be cleaned up
        assert!(!manager.is_migrating(100));
        assert!(!manager.is_key_migrated(100, b"key1"));
    }

    #[test]
    fn manager_complete_incoming_migration() {
        let mut manager = MigrationManager::new();
        manager.start_import(7, node_id(), node_id()).unwrap();

        let completed = manager
            .complete_migration(7)
            .expect("import was registered above");
        assert_eq!(completed.slot, 7);
        assert_eq!(completed.state, MigrationState::Complete);

        assert!(!manager.is_importing(7));
        assert_eq!(manager.active_count(), 0);
    }

    #[test]
    fn manager_abort_migration() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();

        let aborted = manager.abort_migration(100);
        assert!(aborted.is_some());

        assert!(!manager.is_migrating(100));
    }

    #[test]
    fn manager_abort_keeps_state_and_clears_keys() {
        let mut manager = MigrationManager::new();
        manager.start_migrate(100, node_id(), node_id()).unwrap();
        manager.key_migrated(100, b"key1".to_vec());

        let aborted = manager
            .abort_migration(100)
            .expect("migration was registered above");
        assert_eq!(aborted.state, MigrationState::Migrating);
        assert!(!manager.is_key_migrated(100, b"key1"));
    }

    #[test]
    fn manager_unknown_slot_complete_and_abort() {
        let mut manager = MigrationManager::new();

        assert!(manager.complete_migration(1).is_none());
        assert!(manager.abort_migration(1).is_none());
    }

    #[test]
    fn manager_tracks_both_directions() {
        let mut manager = MigrationManager::new();
        let local = node_id();

        manager.start_migrate(1, local, node_id()).unwrap();
        manager.start_import(2, node_id(), local).unwrap();

        assert_eq!(manager.active_count(), 2);
        assert_eq!(manager.outgoing_migrations().count(), 1);
        assert_eq!(manager.incoming_migrations().count(), 1);
        assert_eq!(manager.get_incoming(2).map(|m| m.target), Some(local));
        assert_eq!(manager.get_outgoing(1).map(|m| m.source), Some(local));
    }

    #[test]
    fn batch_operations() {
        let mut batch = MigrationBatch::new(100, 1);

        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);

        batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
        batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);

        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 2);
        assert!(!batch.is_final);

        batch.mark_final();
        assert!(batch.is_final);
    }

    #[test]
    fn batch_size_bytes_counts_keys_values_and_ttl() {
        let mut batch = MigrationBatch::new(100, 1);
        assert_eq!(batch.size_bytes(), 0);

        batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
        batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);

        // Each entry: 4 key bytes + 6 value bytes + 8 TTL bytes.
        assert_eq!(batch.size_bytes(), 36);
    }

    #[test]
    fn redirect_formatting() {
        let moved = MigrationRedirect::Moved {
            slot: 100,
            addr: "127.0.0.1:6379".parse().unwrap(),
        };
        assert_eq!(
            moved.to_error_string(),
            Some("MOVED 100 127.0.0.1:6379".to_string())
        );

        let ask = MigrationRedirect::Ask {
            slot: 200,
            addr: "127.0.0.1:6380".parse().unwrap(),
        };
        assert_eq!(
            ask.to_error_string(),
            Some("ASK 200 127.0.0.1:6380".to_string())
        );

        assert_eq!(MigrationRedirect::None.to_error_string(), None);
    }

    #[test]
    fn migration_state_display() {
        assert_eq!(MigrationState::Importing.to_string(), "importing");
        assert_eq!(MigrationState::Migrating.to_string(), "migrating");
        assert_eq!(MigrationState::Streaming.to_string(), "streaming");
        assert_eq!(MigrationState::Finalizing.to_string(), "finalizing");
        assert_eq!(MigrationState::Complete.to_string(), "complete");
    }

    #[test]
    fn config_defaults() {
        let config = MigrationConfig::default();
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.batch_bytes, 1024 * 1024);
        assert_eq!(config.key_timeout, Duration::from_secs(5));
        assert_eq!(config.migration_timeout, Duration::from_secs(3600));
        assert_eq!(config.batch_delay, Duration::from_millis(10));
    }
}