Skip to main content

ember_cluster/
migration.rs

//! Live slot migration for cluster resharding.
//!
//! This module implements Redis-compatible slot migration, which moves
//! slots between nodes without downtime. The protocol follows these steps:
//!
//! 1. Target marks slot as IMPORTING from source
//! 2. Source marks slot as MIGRATING to target
//! 3. Keys are streamed in batches from source to target
//! 4. During migration, reads go to source, writes get ASK redirect to target
//! 5. The final batch is sent and slot ownership is transferred via Raft
//!
//! # Example
//!
//! ```text
//! // On target node:
//! CLUSTER SETSLOT 100 IMPORTING <source-node-id>
//!
//! // On source node:
//! CLUSTER SETSLOT 100 MIGRATING <target-node-id>
//!
//! // Migrate keys (repeated for each key in slot; `0` is the destination
//! // database, `5000` the timeout in milliseconds):
//! MIGRATE <target-host> <target-port> <key> 0 5000
//!
//! // Finalize on both nodes:
//! CLUSTER SETSLOT 100 NODE <target-node-id>
//! ```
27
28use std::collections::{HashMap, HashSet};
29use std::net::SocketAddr;
30use std::time::{Duration, Instant};
31
32use serde::{Deserialize, Serialize};
33
34use crate::NodeId;
35
36/// Unique identifier for a migration operation.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub struct MigrationId(pub u64);
39
40impl MigrationId {
41    /// Generate a new migration ID from timestamp and slot.
42    pub fn new(slot: u16) -> Self {
43        use std::time::{SystemTime, UNIX_EPOCH};
44        let ts = SystemTime::now()
45            .duration_since(UNIX_EPOCH)
46            .unwrap_or_default()
47            .as_nanos() as u64;
48        // Combine timestamp with slot for uniqueness
49        Self(ts ^ (slot as u64))
50    }
51}
52
53/// Current state of a slot migration.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55pub enum MigrationState {
56    /// Target has marked slot as importing, waiting for source.
57    Importing,
58    /// Source has marked slot as migrating, ready to stream keys.
59    Migrating,
60    /// Keys are being transferred in batches.
61    Streaming,
62    /// Final batch being sent, brief pause on writes.
63    Finalizing,
64    /// Migration complete, ownership transferred.
65    Complete,
66}
67
68impl std::fmt::Display for MigrationState {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            Self::Importing => write!(f, "importing"),
72            Self::Migrating => write!(f, "migrating"),
73            Self::Streaming => write!(f, "streaming"),
74            Self::Finalizing => write!(f, "finalizing"),
75            Self::Complete => write!(f, "complete"),
76        }
77    }
78}
79
/// A single slot migration operation.
///
/// The same type describes the migration from either side: the target node
/// creates it with [`Migration::new_importing`], the source node with
/// [`Migration::new_migrating`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Migration {
    /// Unique migration identifier.
    pub id: MigrationId,
    /// The slot being migrated.
    pub slot: u16,
    /// Node currently owning the slot.
    pub source: NodeId,
    /// Node receiving the slot.
    pub target: NodeId,
    /// Current migration state.
    pub state: MigrationState,
    /// When migration started.
    ///
    /// `Instant` is process-local, so this field is skipped by serde and
    /// comes back as `None` after deserialization.
    #[serde(skip)]
    pub started_at: Option<Instant>,
    /// Number of keys migrated so far.
    pub keys_migrated: u64,
    /// Total keys to migrate (if known).
    ///
    /// `None` until the total is supplied via [`Migration::start_streaming`].
    pub keys_total: Option<u64>,
}
101
102impl Migration {
103    /// Create a new migration in the importing state (target perspective).
104    pub fn new_importing(slot: u16, source: NodeId, target: NodeId) -> Self {
105        Self {
106            id: MigrationId::new(slot),
107            slot,
108            source,
109            target,
110            state: MigrationState::Importing,
111            started_at: Some(Instant::now()),
112            keys_migrated: 0,
113            keys_total: None,
114        }
115    }
116
117    /// Create a new migration in the migrating state (source perspective).
118    pub fn new_migrating(slot: u16, source: NodeId, target: NodeId) -> Self {
119        Self {
120            id: MigrationId::new(slot),
121            slot,
122            source,
123            target,
124            state: MigrationState::Migrating,
125            started_at: Some(Instant::now()),
126            keys_migrated: 0,
127            keys_total: None,
128        }
129    }
130
131    /// Check if this migration involves a specific node.
132    pub fn involves(&self, node: &NodeId) -> bool {
133        self.source == *node || self.target == *node
134    }
135
136    /// Get progress as a percentage (0-100).
137    pub fn progress(&self) -> Option<u8> {
138        self.keys_total.map(|total| {
139            if total == 0 {
140                100
141            } else {
142                ((self.keys_migrated * 100) / total).min(100) as u8
143            }
144        })
145    }
146
147    /// Transition to streaming state.
148    pub fn start_streaming(&mut self, total_keys: u64) {
149        self.state = MigrationState::Streaming;
150        self.keys_total = Some(total_keys);
151    }
152
153    /// Record migrated keys.
154    pub fn record_migrated(&mut self, count: u64) {
155        self.keys_migrated += count;
156    }
157
158    /// Transition to finalizing state.
159    pub fn start_finalizing(&mut self) {
160        self.state = MigrationState::Finalizing;
161    }
162
163    /// Mark migration as complete.
164    pub fn complete(&mut self) {
165        self.state = MigrationState::Complete;
166    }
167}
168
/// Tracks all active migrations for a node.
///
/// A slot is present in at most one of `outgoing` / `incoming` at a time.
/// Starting an import or migration is rejected if the slot already appears
/// in either map.
#[derive(Debug, Default)]
pub struct MigrationManager {
    /// Migrations where this node is the source (slot is migrating out), keyed by slot.
    outgoing: HashMap<u16, Migration>,
    /// Migrations where this node is the target (slot is importing in), keyed by slot.
    incoming: HashMap<u16, Migration>,
    /// Keys that have been migrated but not yet confirmed, keyed by slot.
    ///
    /// An entry is created when a migration starts and removed when it is
    /// completed or aborted.
    pending_keys: HashMap<u16, HashSet<Vec<u8>>>,
}
179
180impl MigrationManager {
181    /// Create a new migration manager.
182    pub fn new() -> Self {
183        Self::default()
184    }
185
186    /// Check if a slot is currently migrating out.
187    pub fn is_migrating(&self, slot: u16) -> bool {
188        self.outgoing.contains_key(&slot)
189    }
190
191    /// Check if a slot is currently being imported.
192    pub fn is_importing(&self, slot: u16) -> bool {
193        self.incoming.contains_key(&slot)
194    }
195
196    /// Get migration info for a slot that's migrating out.
197    pub fn get_outgoing(&self, slot: u16) -> Option<&Migration> {
198        self.outgoing.get(&slot)
199    }
200
201    /// Get migration info for a slot that's being imported.
202    pub fn get_incoming(&self, slot: u16) -> Option<&Migration> {
203        self.incoming.get(&slot)
204    }
205
206    /// Start importing a slot from another node.
207    ///
208    /// Returns error if slot is already involved in a migration.
209    pub fn start_import(
210        &mut self,
211        slot: u16,
212        source: NodeId,
213        local_id: NodeId,
214    ) -> Result<&Migration, MigrationError> {
215        if self.outgoing.contains_key(&slot) {
216            return Err(MigrationError::SlotAlreadyMigrating { slot });
217        }
218        if self.incoming.contains_key(&slot) {
219            return Err(MigrationError::SlotAlreadyImporting { slot });
220        }
221
222        let migration = Migration::new_importing(slot, source, local_id);
223        self.incoming.insert(slot, migration);
224        self.pending_keys.insert(slot, HashSet::new());
225        self.incoming
226            .get(&slot)
227            .ok_or(MigrationError::NoMigrationInProgress { slot })
228    }
229
230    /// Start migrating a slot to another node.
231    ///
232    /// Returns error if slot is already involved in a migration.
233    pub fn start_migrate(
234        &mut self,
235        slot: u16,
236        local_id: NodeId,
237        target: NodeId,
238    ) -> Result<&Migration, MigrationError> {
239        if self.outgoing.contains_key(&slot) {
240            return Err(MigrationError::SlotAlreadyMigrating { slot });
241        }
242        if self.incoming.contains_key(&slot) {
243            return Err(MigrationError::SlotAlreadyImporting { slot });
244        }
245
246        let migration = Migration::new_migrating(slot, local_id, target);
247        self.outgoing.insert(slot, migration);
248        self.pending_keys.insert(slot, HashSet::new());
249        self.outgoing
250            .get(&slot)
251            .ok_or(MigrationError::NoMigrationInProgress { slot })
252    }
253
254    /// Record that a key has been migrated.
255    pub fn key_migrated(&mut self, slot: u16, key: Vec<u8>) {
256        if let Some(keys) = self.pending_keys.get_mut(&slot) {
257            keys.insert(key);
258        }
259        if let Some(migration) = self.outgoing.get_mut(&slot) {
260            migration.record_migrated(1);
261        }
262    }
263
264    /// Check if a specific key has been migrated.
265    pub fn is_key_migrated(&self, slot: u16, key: &[u8]) -> bool {
266        self.pending_keys
267            .get(&slot)
268            .is_some_and(|keys| keys.contains(key))
269    }
270
271    /// Complete a migration and clean up state.
272    pub fn complete_migration(&mut self, slot: u16) -> Option<Migration> {
273        self.pending_keys.remove(&slot);
274        // Try outgoing first, then incoming
275        self.outgoing
276            .remove(&slot)
277            .or_else(|| self.incoming.remove(&slot))
278            .map(|mut m| {
279                m.complete();
280                m
281            })
282    }
283
284    /// Abort a migration and clean up state.
285    pub fn abort_migration(&mut self, slot: u16) -> Option<Migration> {
286        self.pending_keys.remove(&slot);
287        self.outgoing
288            .remove(&slot)
289            .or_else(|| self.incoming.remove(&slot))
290    }
291
292    /// Get all active outgoing migrations.
293    pub fn outgoing_migrations(&self) -> impl Iterator<Item = &Migration> {
294        self.outgoing.values()
295    }
296
297    /// Get all active incoming migrations.
298    pub fn incoming_migrations(&self) -> impl Iterator<Item = &Migration> {
299        self.incoming.values()
300    }
301
302    /// Get total number of active migrations.
303    pub fn active_count(&self) -> usize {
304        self.outgoing.len() + self.incoming.len()
305    }
306}
307
308/// Represents a batch of keys to migrate.
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct MigrationBatch {
311    /// The slot being migrated.
312    pub slot: u16,
313    /// Keys and their values in this batch.
314    pub entries: Vec<MigrationEntry>,
315    /// Whether this is the final batch.
316    pub is_final: bool,
317    /// Sequence number for ordering.
318    pub sequence: u64,
319}
320
321/// A single key-value entry being migrated.
322#[derive(Debug, Clone, Serialize, Deserialize)]
323pub struct MigrationEntry {
324    /// The key being migrated.
325    pub key: Vec<u8>,
326    /// Serialized value data.
327    pub value: Vec<u8>,
328    /// TTL remaining in milliseconds (0 = no expiry).
329    pub ttl_ms: u64,
330}
331
332impl MigrationBatch {
333    /// Create a new migration batch.
334    pub fn new(slot: u16, sequence: u64) -> Self {
335        Self {
336            slot,
337            entries: Vec::new(),
338            is_final: false,
339            sequence,
340        }
341    }
342
343    /// Add an entry to the batch.
344    pub fn add_entry(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: u64) {
345        self.entries.push(MigrationEntry { key, value, ttl_ms });
346    }
347
348    /// Mark this as the final batch.
349    pub fn mark_final(&mut self) {
350        self.is_final = true;
351    }
352
353    /// Get the number of entries in this batch.
354    pub fn len(&self) -> usize {
355        self.entries.len()
356    }
357
358    /// Check if the batch is empty.
359    pub fn is_empty(&self) -> bool {
360        self.entries.is_empty()
361    }
362
363    /// Estimate the size of this batch in bytes.
364    pub fn size_bytes(&self) -> usize {
365        self.entries
366            .iter()
367            .map(|e| e.key.len() + e.value.len() + 8)
368            .sum()
369    }
370}
371
/// Errors that can occur during migration.
///
/// `Display` and `std::error::Error` are derived with `thiserror`; the
/// `#[error(...)]` attribute on each variant is its display message.
#[derive(Debug, Clone, thiserror::Error)]
pub enum MigrationError {
    /// Slot is already being migrated out of this node.
    #[error("slot {slot} is already migrating")]
    SlotAlreadyMigrating { slot: u16 },

    /// Slot is already being imported into this node.
    #[error("slot {slot} is already importing")]
    SlotAlreadyImporting { slot: u16 },

    /// No migration in progress for this slot.
    #[error("no migration in progress for slot {slot}")]
    NoMigrationInProgress { slot: u16 },

    /// Migration target is unreachable at `addr`; `reason` carries the detail.
    #[error("cannot reach migration target {addr}: {reason}")]
    TargetUnreachable { addr: SocketAddr, reason: String },

    /// Migration was aborted.
    #[error("migration for slot {slot} was aborted")]
    Aborted { slot: u16 },

    /// Invalid migration state transition from `from` to `to`.
    ///
    /// Both states are rendered with their `Display` form (e.g. `streaming`).
    #[error("invalid state transition from {from} to {to}")]
    InvalidStateTransition {
        from: MigrationState,
        to: MigrationState,
    },

    /// Timeout during migration.
    ///
    /// `elapsed` is rendered with its `Debug` form (e.g. `5s`).
    #[error("migration timeout after {elapsed:?}")]
    Timeout { elapsed: Duration },
}
406
/// Outcome of checking whether a command must be redirected during migration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationRedirect {
    /// No redirect needed; handle the command locally.
    None,
    /// Send a MOVED redirect (slot permanently moved).
    Moved { slot: u16, addr: SocketAddr },
    /// Send an ASK redirect (slot temporarily served by another node).
    Ask { slot: u16, addr: SocketAddr },
}

impl MigrationRedirect {
    /// Render the redirect as a RESP error payload, e.g. `MOVED 3999 127.0.0.1:6381`.
    ///
    /// Returns `None` for [`MigrationRedirect::None`], which has nothing to report.
    pub fn to_error_string(&self) -> Option<String> {
        let (verb, slot, addr) = match *self {
            Self::None => return None,
            Self::Moved { slot, addr } => ("MOVED", slot, addr),
            Self::Ask { slot, addr } => ("ASK", slot, addr),
        };
        Some(format!("{verb} {slot} {addr}"))
    }
}
428
/// Tunables that pace and bound a slot migration.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Maximum keys per batch.
    pub batch_size: usize,
    /// Maximum batch size in bytes.
    pub batch_bytes: usize,
    /// Timeout for migrating an individual key.
    pub key_timeout: Duration,
    /// Timeout for the entire migration.
    pub migration_timeout: Duration,
    /// Pause between batches so the network is not overwhelmed.
    pub batch_delay: Duration,
}

impl Default for MigrationConfig {
    /// 100 keys / 1 MiB per batch, 5 s per key, 1 h overall, 10 ms between batches.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        const ONE_HOUR_SECS: u64 = 60 * 60;

        let batch_size = 100;
        let batch_bytes = ONE_MIB;
        let key_timeout = Duration::from_secs(5);
        let migration_timeout = Duration::from_secs(ONE_HOUR_SECS);
        let batch_delay = Duration::from_millis(10);

        Self {
            batch_size,
            batch_bytes,
            key_timeout,
            migration_timeout,
            batch_delay,
        }
    }
}
455
#[cfg(test)]
mod tests {
    //! Unit tests for migration state, the per-node migration manager,
    //! batching helpers, redirect formatting, and config defaults.
    use super::*;

    /// Shorthand for a new node id.
    // NOTE(review): `migration_involves` assumes each call yields a distinct id.
    fn node_id() -> NodeId {
        NodeId::new()
    }

    // Importing constructor records slot/endpoints and starts with zero keys.
    #[test]
    fn migration_new_importing() {
        let source = node_id();
        let target = node_id();
        let m = Migration::new_importing(100, source, target);

        assert_eq!(m.slot, 100);
        assert_eq!(m.source, source);
        assert_eq!(m.target, target);
        assert_eq!(m.state, MigrationState::Importing);
        assert_eq!(m.keys_migrated, 0);
    }

    // Migrating constructor starts in the Migrating state.
    #[test]
    fn migration_new_migrating() {
        let source = node_id();
        let target = node_id();
        let m = Migration::new_migrating(100, source, target);

        assert_eq!(m.state, MigrationState::Migrating);
    }

    // `involves` matches source or target, nothing else.
    #[test]
    fn migration_involves() {
        let source = node_id();
        let target = node_id();
        let other = node_id();
        let m = Migration::new_importing(100, source, target);

        assert!(m.involves(&source));
        assert!(m.involves(&target));
        assert!(!m.involves(&other));
    }

    // Progress is None until a total is known, then a 0-100 percentage.
    #[test]
    fn migration_progress() {
        let mut m = Migration::new_migrating(100, node_id(), node_id());

        // No total set
        assert_eq!(m.progress(), None);

        // Set total and migrate some
        m.start_streaming(100);
        assert_eq!(m.progress(), Some(0));

        m.record_migrated(50);
        assert_eq!(m.progress(), Some(50));

        m.record_migrated(50);
        assert_eq!(m.progress(), Some(100));
    }

    // Walks the source-side lifecycle: Migrating -> Streaming -> Finalizing -> Complete.
    #[test]
    fn migration_state_transitions() {
        let mut m = Migration::new_migrating(100, node_id(), node_id());

        assert_eq!(m.state, MigrationState::Migrating);

        m.start_streaming(50);
        assert_eq!(m.state, MigrationState::Streaming);

        m.start_finalizing();
        assert_eq!(m.state, MigrationState::Finalizing);

        m.complete();
        assert_eq!(m.state, MigrationState::Complete);
    }

    // Starting an import registers the slot as incoming only.
    #[test]
    fn manager_start_import() {
        let mut manager = MigrationManager::new();
        let source = node_id();
        let local = node_id();

        let result = manager.start_import(100, source, local);
        assert!(result.is_ok());
        assert!(manager.is_importing(100));
        assert!(!manager.is_migrating(100));
    }

    // Starting a migration registers the slot as outgoing only.
    #[test]
    fn manager_start_migrate() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        let result = manager.start_migrate(100, local, target);
        assert!(result.is_ok());
        assert!(manager.is_migrating(100));
        assert!(!manager.is_importing(100));
    }

    // A slot can only take part in one migration at a time, in either direction.
    #[test]
    fn manager_double_migration_error() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();

        // Can't migrate same slot again
        let result = manager.start_migrate(100, local, node_id());
        assert!(matches!(
            result,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));

        // Can't import a slot that's migrating
        let result = manager.start_import(100, node_id(), local);
        assert!(matches!(
            result,
            Err(MigrationError::SlotAlreadyMigrating { slot: 100 })
        ));
    }

    // Migrated keys are tracked per slot and per key.
    #[test]
    fn manager_key_tracking() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();

        assert!(!manager.is_key_migrated(100, b"key1"));

        manager.key_migrated(100, b"key1".to_vec());
        assert!(manager.is_key_migrated(100, b"key1"));
        assert!(!manager.is_key_migrated(100, b"key2"));
    }

    // Completing returns the migration in the Complete state and drops its key set.
    #[test]
    fn manager_complete_migration() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();
        manager.key_migrated(100, b"key1".to_vec());

        let completed = manager.complete_migration(100);
        assert!(completed.is_some());
        assert_eq!(completed.unwrap().state, MigrationState::Complete);

        // State should be cleaned up
        assert!(!manager.is_migrating(100));
        assert!(!manager.is_key_migrated(100, b"key1"));
    }

    // Aborting returns the migration and removes it from the manager.
    #[test]
    fn manager_abort_migration() {
        let mut manager = MigrationManager::new();
        let local = node_id();
        let target = node_id();

        manager.start_migrate(100, local, target).unwrap();

        let aborted = manager.abort_migration(100);
        assert!(aborted.is_some());

        assert!(!manager.is_migrating(100));
    }

    // Batch length/emptiness tracking and the final flag.
    #[test]
    fn batch_operations() {
        let mut batch = MigrationBatch::new(100, 1);

        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);

        batch.add_entry(b"key1".to_vec(), b"value1".to_vec(), 0);
        batch.add_entry(b"key2".to_vec(), b"value2".to_vec(), 5000);

        assert!(!batch.is_empty());
        assert_eq!(batch.len(), 2);
        assert!(!batch.is_final);

        batch.mark_final();
        assert!(batch.is_final);
    }

    // MOVED / ASK render as "<VERB> <slot> <addr>"; None renders nothing.
    #[test]
    fn redirect_formatting() {
        let moved = MigrationRedirect::Moved {
            slot: 100,
            addr: "127.0.0.1:6379".parse().unwrap(),
        };
        assert_eq!(
            moved.to_error_string(),
            Some("MOVED 100 127.0.0.1:6379".to_string())
        );

        let ask = MigrationRedirect::Ask {
            slot: 200,
            addr: "127.0.0.1:6380".parse().unwrap(),
        };
        assert_eq!(
            ask.to_error_string(),
            Some("ASK 200 127.0.0.1:6380".to_string())
        );

        assert_eq!(MigrationRedirect::None.to_error_string(), None);
    }

    // Display uses lowercase state names.
    #[test]
    fn migration_state_display() {
        assert_eq!(MigrationState::Importing.to_string(), "importing");
        assert_eq!(MigrationState::Migrating.to_string(), "migrating");
        assert_eq!(MigrationState::Streaming.to_string(), "streaming");
        assert_eq!(MigrationState::Finalizing.to_string(), "finalizing");
        assert_eq!(MigrationState::Complete.to_string(), "complete");
    }

    // Pins a subset of the default configuration values.
    #[test]
    fn config_defaults() {
        let config = MigrationConfig::default();
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.batch_bytes, 1024 * 1024);
        assert_eq!(config.key_timeout, Duration::from_secs(5));
    }
}