Skip to main content

ember_cluster/
migration.rs

1//! Live slot migration for cluster resharding.
2//!
3//! This module implements Redis-compatible slot migration that allows moving
4//! slots between nodes without downtime. The protocol follows these steps:
5//!
6//! 1. Target marks slot as IMPORTING from source
7//! 2. Source marks slot as MIGRATING to target
8//! 3. Keys are streamed in batches from source to target
9//! 4. During migration, reads go to source, writes get ASK redirect to target
10//! 5. Final batch completes, ownership transfers via Raft
11//!
12//! # Example
13//!
14//! ```ignore
15//! // On target node:
16//! CLUSTER SETSLOT 100 IMPORTING <source-node-id>
17//!
18//! // On source node:
19//! CLUSTER SETSLOT 100 MIGRATING <target-node-id>
20//!
21//! // Migrate keys (repeated for each key in slot):
22//! MIGRATE <target-host> <target-port> <key> 0 5000
23//!
24//! // Finalize on both nodes:
25//! CLUSTER SETSLOT 100 NODE <target-node-id>
26//! ```
27
28use std::collections::{HashMap, HashSet};
29use std::net::SocketAddr;
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33
34use crate::NodeId;
35
36/// Unique identifier for a migration operation.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct MigrationId(pub u64);
39
40impl MigrationId {
41    /// Generate a new migration ID from timestamp and slot.
42    pub fn new(slot: u16) -> Self {
43        use std::time::{SystemTime, UNIX_EPOCH};
44        let ts = SystemTime::now()
45            .duration_since(UNIX_EPOCH)
46            .unwrap_or_default()
47            .as_nanos() as u64;
48        // Combine timestamp with slot for uniqueness
49        Self(ts ^ (slot as u64))
50    }
51}
52
53/// Current state of a slot migration.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55pub enum MigrationState {
56    /// Target has marked slot as importing, waiting for source.
57    Importing,
58    /// Source has marked slot as migrating, ready to stream keys.
59    Migrating,
60    /// Keys are being transferred in batches.
61    Streaming,
62    /// Final batch being sent, brief pause on writes.
63    Finalizing,
64    /// Migration complete, ownership transferred.
65    Complete,
66}
67
68impl std::fmt::Display for MigrationState {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            Self::Importing => write!(f, "importing"),
72            Self::Migrating => write!(f, "migrating"),
73            Self::Streaming => write!(f, "streaming"),
74            Self::Finalizing => write!(f, "finalizing"),
75            Self::Complete => write!(f, "complete"),
76        }
77    }
78}
79
80/// A single slot migration operation.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct Migration {
83    /// Unique migration identifier.
84    pub id: MigrationId,
85    /// The slot being migrated.
86    pub slot: u16,
87    /// Node currently owning the slot.
88    pub source: NodeId,
89    /// Node receiving the slot.
90    pub target: NodeId,
91    /// Current migration state.
92    pub state: MigrationState,
93    /// When migration started.
94    #[serde(skip)]
95    pub started_at: Option<Instant>,
96    /// Number of keys migrated so far.
97    pub keys_migrated: u64,
98    /// Total keys to migrate (if known).
99    pub keys_total: Option<u64>,
100}
101
102impl Migration {
103    /// Create a new migration in the importing state (target perspective).
104    pub fn new_importing(slot: u16, source: NodeId, target: NodeId) -> Self {
105        Self {
106            id: MigrationId::new(slot),
107            slot,
108            source,
109            target,
110            state: MigrationState::Importing,
111            started_at: Some(Instant::now()),
112            keys_migrated: 0,
113            keys_total: None,
114        }
115    }
116
117    /// Create a new migration in the migrating state (source perspective).
118    pub fn new_migrating(slot: u16, source: NodeId, target: NodeId) -> Self {
119        Self {
120            id: MigrationId::new(slot),
121            slot,
122            source,
123            target,
124            state: MigrationState::Migrating,
125            started_at: Some(Instant::now()),
126            keys_migrated: 0,
127            keys_total: None,
128        }
129    }
130
131    /// Check if this migration involves a specific node.
132    pub fn involves(&self, node: &NodeId) -> bool {
133        self.source == *node || self.target == *node
134    }
135
136    /// Get progress as a percentage (0-100).
137    pub fn progress(&self) -> Option<u8> {
138        self.keys_total.map(|total| {
139            if total == 0 {
140                100
141            } else {
142                ((self.keys_migrated * 100) / total).min(100) as u8
143            }
144        })
145    }
146
147    /// Transition to streaming state.
148    pub fn start_streaming(&mut self, total_keys: u64) {
149        self.state = MigrationState::Streaming;
150        self.keys_total = Some(total_keys);
151    }
152
153    /// Record migrated keys.
154    pub fn record_migrated(&mut self, count: u64) {
155        self.keys_migrated += count;
156    }
157
158    /// Transition to finalizing state.
159    pub fn start_finalizing(&mut self) {
160        self.state = MigrationState::Finalizing;
161    }
162
163    /// Mark migration as complete.
164    pub fn complete(&mut self) {
165        self.state = MigrationState::Complete;
166    }
167}
168
169/// Tracks all active migrations for a node.
170#[derive(Debug, Default)]
171pub struct MigrationManager {
172    /// Migrations where this node is the source (slot is migrating out).
173    outgoing: HashMap<u16, Migration>,
174    /// Migrations where this node is the target (slot is importing in).
175    incoming: HashMap<u16, Migration>,
176    /// Keys that have been migrated but not yet confirmed.
177    pending_keys: HashMap<u16, HashSet<Vec<u8>>>,
178}
179
180impl MigrationManager {
181    /// Create a new migration manager.
182    pub fn new() -> Self {
183        Self::default()
184    }
185
186    /// Check if a slot is currently migrating out.
187    pub fn is_migrating(&self, slot: u16) -> bool {
188        self.outgoing.contains_key(&slot)
189    }
190
191    /// Check if a slot is currently being imported.
192    pub fn is_importing(&self, slot: u16) -> bool {
193        self.incoming.contains_key(&slot)
194    }
195
196    /// Get migration info for a slot that's migrating out.
197    pub fn get_outgoing(&self, slot: u16) -> Option<&Migration> {
198        self.outgoing.get(&slot)
199    }
200
201    /// Get migration info for a slot that's being imported.
202    pub fn get_incoming(&self, slot: u16) -> Option<&Migration> {
203        self.incoming.get(&slot)
204    }
205
206    /// Start importing a slot from another node.
207    ///
208    /// Returns error if slot is already involved in a migration.
209    pub fn start_import(
210        &mut self,
211        slot: u16,
212        source: NodeId,
213        local_id: NodeId,
214    ) -> Result<&Migration, MigrationError> {
215        if self.outgoing.contains_key(&slot) {
216            return Err(MigrationError::SlotAlreadyMigrating { slot });
217        }
218        if self.incoming.contains_key(&slot) {
219            return Err(MigrationError::SlotAlreadyImporting { slot });
220        }
221
222        let migration = Migration::new_importing(slot, source, local_id);
223        self.incoming.insert(slot, migration);
224        self.pending_keys.insert(slot, HashSet::new());
225        Ok(self.incoming.get(&slot).unwrap())
226    }
227
228    /// Start migrating a slot to another node.
229    ///
230    /// Returns error if slot is already involved in a migration.
231    pub fn start_migrate(
232        &mut self,
233        slot: u16,
234        local_id: NodeId,
235        target: NodeId,
236    ) -> Result<&Migration, MigrationError> {
237        if self.outgoing.contains_key(&slot) {
238            return Err(MigrationError::SlotAlreadyMigrating { slot });
239        }
240        if self.incoming.contains_key(&slot) {
241            return Err(MigrationError::SlotAlreadyImporting { slot });
242        }
243
244        let migration = Migration::new_migrating(slot, local_id, target);
245        self.outgoing.insert(slot, migration);
246        self.pending_keys.insert(slot, HashSet::new());
247        Ok(self.outgoing.get(&slot).unwrap())
248    }
249
250    /// Record that a key has been migrated.
251    pub fn key_migrated(&mut self, slot: u16, key: Vec<u8>) {
252        if let Some(keys) = self.pending_keys.get_mut(&slot) {
253            keys.insert(key);
254        }
255        if let Some(migration) = self.outgoing.get_mut(&slot) {
256            migration.record_migrated(1);
257        }
258    }
259
260    /// Check if a specific key has been migrated.
261    pub fn is_key_migrated(&self, slot: u16, key: &[u8]) -> bool {
262        self.pending_keys
263            .get(&slot)
264            .is_some_and(|keys| keys.contains(key))
265    }
266
267    /// Complete a migration and clean up state.
268    pub fn complete_migration(&mut self, slot: u16) -> Option<Migration> {
269        self.pending_keys.remove(&slot);
270        // Try outgoing first, then incoming
271        self.outgoing
272            .remove(&slot)
273            .or_else(|| self.incoming.remove(&slot))
274            .map(|mut m| {
275                m.complete();
276                m
277            })
278    }
279
280    /// Abort a migration and clean up state.
281    pub fn abort_migration(&mut self, slot: u16) -> Option<Migration> {
282        self.pending_keys.remove(&slot);
283        self.outgoing
284            .remove(&slot)
285            .or_else(|| self.incoming.remove(&slot))
286    }
287
288    /// Get all active outgoing migrations.
289    pub fn outgoing_migrations(&self) -> impl Iterator<Item = &Migration> {
290        self.outgoing.values()
291    }
292
293    /// Get all active incoming migrations.
294    pub fn incoming_migrations(&self) -> impl Iterator<Item = &Migration> {
295        self.incoming.values()
296    }
297
298    /// Get total number of active migrations.
299    pub fn active_count(&self) -> usize {
300        self.outgoing.len() + self.incoming.len()
301    }
302}
303
304/// Represents a batch of keys to migrate.
305#[derive(Debug, Clone, Serialize, Deserialize)]
306pub struct MigrationBatch {
307    /// The slot being migrated.
308    pub slot: u16,
309    /// Keys and their values in this batch.
310    pub entries: Vec<MigrationEntry>,
311    /// Whether this is the final batch.
312    pub is_final: bool,
313    /// Sequence number for ordering.
314    pub sequence: u64,
315}
316
317/// A single key-value entry being migrated.
318#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct MigrationEntry {
320    /// The key being migrated.
321    pub key: Vec<u8>,
322    /// Serialized value data.
323    pub value: Vec<u8>,
324    /// TTL remaining in milliseconds (0 = no expiry).
325    pub ttl_ms: u64,
326}
327
328impl MigrationBatch {
329    /// Create a new migration batch.
330    pub fn new(slot: u16, sequence: u64) -> Self {
331        Self {
332            slot,
333            entries: Vec::new(),
334            is_final: false,
335            sequence,
336        }
337    }
338
339    /// Add an entry to the batch.
340    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: u64) {
341        self.entries.push(MigrationEntry { key, value, ttl_ms });
342    }
343
344    /// Mark this as the final batch.
345    pub fn mark_final(&mut self) {
346        self.is_final = true;
347    }
348
349    /// Get the number of entries in this batch.
350    pub fn len(&self) -> usize {
351        self.entries.len()
352    }
353
354    /// Check if the batch is empty.
355    pub fn is_empty(&self) -> bool {
356        self.entries.is_empty()
357    }
358
359    /// Estimate the size of this batch in bytes.
360    pub fn size_bytes(&self) -> usize {
361        self.entries
362            .iter()
363            .map(|e| e.key.len() + e.value.len() + 8)
364            .sum()
365    }
366}
367
/// Errors that can occur during migration.
///
/// The `#[error(...)]` attribute on each variant supplies its display message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum MigrationError {
    /// Slot is already being migrated out.
    ///
    /// `slot` is the slot that was requested.
    #[error("slot {slot} is already migrating")]
    SlotAlreadyMigrating { slot: u16 },

    /// Slot is already being imported.
    ///
    /// `slot` is the slot that was requested.
    #[error("slot {slot} is already importing")]
    SlotAlreadyImporting { slot: u16 },

    /// No migration in progress for this slot.
    ///
    /// `slot` is the slot that was looked up.
    #[error("no migration in progress for slot {slot}")]
    NoMigrationInProgress { slot: u16 },

    /// Migration target is unreachable.
    ///
    /// `addr` is the target's socket address; `reason` is a free-form
    /// description included verbatim in the message.
    #[error("cannot reach migration target {addr}: {reason}")]
    TargetUnreachable { addr: SocketAddr, reason: String },

    /// Migration was aborted.
    ///
    /// `slot` is the slot whose migration was aborted.
    #[error("migration for slot {slot} was aborted")]
    Aborted { slot: u16 },

    /// Invalid migration state transition.
    ///
    /// `from` and `to` are rendered with `MigrationState`'s `Display` impl.
    #[error("invalid state transition from {from} to {to}")]
    InvalidStateTransition {
        from: MigrationState,
        to: MigrationState,
    },

    /// Timeout during migration.
    ///
    /// `elapsed` is rendered with `Duration`'s `Debug` formatting.
    #[error("migration timeout after {elapsed:?}")]
    Timeout { elapsed: Duration },
}
402
/// Outcome of deciding whether a command must be redirected while a slot is migrating.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationRedirect {
    /// Serve the command locally; no redirect.
    None,
    /// Reply with a MOVED redirect (the slot has permanently moved).
    Moved { slot: u16, addr: SocketAddr },
    /// Reply with an ASK redirect (the slot is temporarily served elsewhere).
    Ask { slot: u16, addr: SocketAddr },
}

impl MigrationRedirect {
    /// Render the redirect as a RESP error string.
    ///
    /// Yields `"<KIND> <slot> <addr>"` for `Moved` / `Ask`, and `None` when
    /// no redirect is required.
    pub fn to_error_string(&self) -> Option<String> {
        let (kind, slot, addr) = match self {
            Self::None => return None,
            Self::Moved { slot, addr } => ("MOVED", slot, addr),
            Self::Ask { slot, addr } => ("ASK", slot, addr),
        };
        Some(format!("{} {} {}", kind, slot, addr))
    }
}
424
/// Tunables controlling how a migration is carried out.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Upper bound on the number of keys in one batch.
    pub batch_size: usize,
    /// Upper bound on the size of one batch, in bytes.
    pub batch_bytes: usize,
    /// Time allowed for migrating a single key.
    pub key_timeout: Duration,
    /// Time allowed for the migration as a whole.
    pub migration_timeout: Duration,
    /// Pause inserted between batches to avoid overwhelming the network.
    pub batch_delay: Duration,
}

impl Default for MigrationConfig {
    /// Defaults: 100 keys or 1 MiB per batch, 5 s per key, 1 h overall,
    /// 10 ms between batches.
    fn default() -> Self {
        const ONE_MIB: usize = 1024 * 1024;
        const SECS_PER_HOUR: u64 = 60 * 60;

        let key_timeout = Duration::from_secs(5);
        let migration_timeout = Duration::from_secs(SECS_PER_HOUR);
        let batch_delay = Duration::from_millis(10);

        Self {
            batch_size: 100,
            batch_bytes: ONE_MIB,
            key_timeout,
            migration_timeout,
            batch_delay,
        }
    }
}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for `NodeId::new()`.
    fn node_id() -> NodeId {
        NodeId::new()
    }

    /// `new_importing` populates every field and starts in `Importing`.
    #[test]
    fn migration_new_importing() {
        let src = node_id();
        let dst = node_id();
        let migration = Migration::new_importing(100, src, dst);

        assert_eq!(migration.slot, 100);
        assert_eq!(migration.source, src);
        assert_eq!(migration.target, dst);
        assert_eq!(migration.state, MigrationState::Importing);
        assert_eq!(migration.keys_migrated, 0);
    }

    /// `new_migrating` starts in `Migrating`.
    #[test]
    fn migration_new_migrating() {
        let src = node_id();
        let dst = node_id();
        let migration = Migration::new_migrating(100, src, dst);

        assert_eq!(migration.state, MigrationState::Migrating);
    }

    /// `involves` is true for source and target only.
    #[test]
    fn migration_involves() {
        let src = node_id();
        let dst = node_id();
        let bystander = node_id();
        let migration = Migration::new_importing(100, src, dst);

        assert!(migration.involves(&src));
        assert!(migration.involves(&dst));
        assert!(!migration.involves(&bystander));
    }

    /// Progress is `None` until a total is known, then tracks the counter.
    #[test]
    fn migration_progress() {
        let mut migration = Migration::new_migrating(100, node_id(), node_id());

        // Total not announced yet.
        assert_eq!(migration.progress(), None);

        // Announce the total, then move keys in two halves.
        migration.start_streaming(100);
        assert_eq!(migration.progress(), Some(0));

        migration.record_migrated(50);
        assert_eq!(migration.progress(), Some(50));

        migration.record_migrated(50);
        assert_eq!(migration.progress(), Some(100));
    }

    /// Walk the source-side state machine from `Migrating` to `Complete`.
    #[test]
    fn migration_state_transitions() {
        let mut migration = Migration::new_migrating(100, node_id(), node_id());

        assert_eq!(migration.state, MigrationState::Migrating);

        migration.start_streaming(50);
        assert_eq!(migration.state, MigrationState::Streaming);

        migration.start_finalizing();
        assert_eq!(migration.state, MigrationState::Finalizing);

        migration.complete();
        assert_eq!(migration.state, MigrationState::Complete);
    }

    /// A successful import registers the slot as importing only.
    #[test]
    fn manager_start_import() {
        let mut mgr = MigrationManager::new();
        let src = node_id();
        let me = node_id();

        let outcome = mgr.start_import(100, src, me);
        assert!(outcome.is_ok());
        assert!(mgr.is_importing(100));
        assert!(!mgr.is_migrating(100));
    }

    /// A successful migrate registers the slot as migrating only.
    #[test]
    fn manager_start_migrate() {
        let mut mgr = MigrationManager::new();
        let me = node_id();
        let dst = node_id();

        let outcome = mgr.start_migrate(100, me, dst);
        assert!(outcome.is_ok());
        assert!(mgr.is_migrating(100));
        assert!(!mgr.is_importing(100));
    }

    /// A slot that is already migrating rejects both a second migrate and an import.
    #[test]
    fn manager_double_migration_error() {
        let mut mgr = MigrationManager::new();
        let me = node_id();
        let dst = node_id();

        mgr.start_migrate(100, me, dst).unwrap();

        // Second migrate of the same slot is refused.
        let second_migrate = mgr.start_migrate(100, me, node_id());
        assert!(matches!(
            second_migrate,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));

        // Importing a slot that is migrating out is refused too.
        let import_attempt = mgr.start_import(100, node_id(), me);
        assert!(matches!(
            import_attempt,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));
    }

    /// Only keys explicitly reported are considered migrated.
    #[test]
    fn manager_key_tracking() {
        let mut mgr = MigrationManager::new();
        let me = node_id();
        let dst = node_id();

        mgr.start_migrate(100, me, dst).unwrap();

        assert!(!mgr.is_key_migrated(100, b"key1"));

        mgr.key_migrated(100, b"key1".to_vec());
        assert!(mgr.is_key_migrated(100, b"key1"));
        assert!(!mgr.is_key_migrated(100, b"key2"));
    }

    /// Completing returns the record in `Complete` and clears all tracking.
    #[test]
    fn manager_complete_migration() {
        let mut mgr = MigrationManager::new();
        let me = node_id();
        let dst = node_id();

        mgr.start_migrate(100, me, dst).unwrap();
        mgr.key_migrated(100, b"key1".to_vec());

        let finished = mgr.complete_migration(100);
        assert!(finished.is_some());
        assert_eq!(finished.unwrap().state, MigrationState::Complete);

        // Nothing about the slot should remain.
        assert!(!mgr.is_migrating(100));
        assert!(!mgr.is_key_migrated(100, b"key1"));
    }

    /// Aborting returns the record and deregisters the slot.
    #[test]
    fn manager_abort_migration() {
        let mut mgr = MigrationManager::new();
        let me = node_id();
        let dst = node_id();

        mgr.start_migrate(100, me, dst).unwrap();

        let cancelled = mgr.abort_migration(100);
        assert!(cancelled.is_some());

        assert!(!mgr.is_migrating(100));
    }

    /// Entries accumulate and the final flag is only set on request.
    #[test]
    fn batch_operations() {
        let mut batch = MigrationBatch::new(100, 1);

        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);

        batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
        batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);

        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 2);
        assert!(!batch.is_final);

        batch.mark_final();
        assert!(batch.is_final);
    }

    /// MOVED / ASK render as `<KIND> <slot> <addr>`; `None` renders nothing.
    #[test]
    fn redirect_formatting() {
        let moved = MigrationRedirect::Moved {
            slot: 100,
            addr: "127.0.0.1:6379".parse().unwrap(),
        };
        let moved_text = moved.to_error_string();
        assert_eq!(moved_text, Some("MOVED 100 127.0.0.1:6379".to_string()));

        let ask = MigrationRedirect::Ask {
            slot: 200,
            addr: "127.0.0.1:6380".parse().unwrap(),
        };
        let ask_text = ask.to_error_string();
        assert_eq!(ask_text, Some("ASK 200 127.0.0.1:6380".to_string()));

        assert_eq!(MigrationRedirect::None.to_error_string(), None);
    }

    /// Every state displays as its lowercase name.
    #[test]
    fn migration_state_display() {
        assert_eq!(MigrationState::Importing.to_string(), "importing");
        assert_eq!(MigrationState::Migrating.to_string(), "migrating");
        assert_eq!(MigrationState::Streaming.to_string(), "streaming");
        assert_eq!(MigrationState::Finalizing.to_string(), "finalizing");
        assert_eq!(MigrationState::Complete.to_string(), "complete");
    }

    /// Spot-check the default configuration values.
    #[test]
    fn config_defaults() {
        let defaults = MigrationConfig::default();
        assert_eq!(defaults.batch_size, 100);
        assert_eq!(defaults.batch_bytes, 1024 * 1024);
        assert_eq!(defaults.key_timeout, Duration::from_secs(5));
    }
}
675        assert_eq!(config.batch_size, 100);
676        assert_eq!(config.batch_bytes, 1024 * 1024);
677        assert_eq!(config.key_timeout, Duration::from_secs(5));
678    }
679}