Skip to main content

kevy_scope/
migration.rs

1//! Per-scope migration state machine for the Q3=(a) quiesce-window
2//! `MOVE-SCOPE` design (RFC `## Q3 resolution`). Tracks two
3//! concurrent maps:
4//!
5//! - **MIGRATING** — the writer (`from`) has stopped accepting
6//!   writes for the prefix and is shipping its slice to the target
7//!   (`to`). Writes during this window answer `-QUIESCED <prefix>
8//!   migrating to <host:port>`.
9//! - **MIGRATED** — the writer (`from`) has committed the slice
10//!   to `to`. Writes here now answer `-MISDIRECTED writer is
11//!   <to-host:port>` (no quiesce — the migration is done; the
12//!   client should follow). The MIGRATED table is per-node local
13//!   state; other nodes pick up the new writer via static config
14//!   restart (v1.21 MVP — no gossip, per the anti-scope).
15//!
//! This module is pure data guarded by two `Mutex`es, one per map.
//! The server cement layer plugs the start / commit / abort transitions
//! into the `MOVE-SCOPE` / `MOVE-SCOPE-INGEST` command handlers.
19
20use std::collections::HashMap;
21use std::sync::Mutex;
22
/// One migration record, either in flight (MIGRATING) or committed
/// (MIGRATED). Carries enough metadata so the server cement can encode
/// `-QUIESCED <prefix> migrating to <host:port>` (or the `-MISDIRECTED`
/// reply once committed) without re-resolving the target.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationState {
    /// Source writer node id (the node quiescing / giving up the slice).
    pub from: String,
    /// Target writer node id (the node receiving the slice).
    pub to: String,
}

/// Per-scope migration tracker.
///
/// Holds two maps keyed by scope prefix: in-flight migrations
/// (MIGRATING) and committed ones (MIGRATED). Insert order doesn't
/// matter. Exact-prefix lookups are hash lookups. Longest-prefix
/// matching scans the whole map, which is acceptable because the
/// prefix set is expected to stay at tens of entries even in the
/// largest cluster.
///
/// Lock ordering: any operation that needs both maps at once must lock
/// `migrating` before `migrated`, so two such operations cannot
/// deadlock each other.
#[derive(Debug, Default)]
pub struct MigrationTable {
    /// In-flight migrations; writes under these prefixes get `-QUIESCED`.
    migrating: Mutex<HashMap<Vec<u8>, MigrationState>>,
    /// Committed migrations; writes under these prefixes get `-MISDIRECTED`.
    migrated: Mutex<HashMap<Vec<u8>, MigrationState>>,
}
42
/// Why [`MigrationTable::start`] refused.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationError {
    /// A migration for this prefix is already in flight (it is in the
    /// MIGRATING map). Accepting a second `start` would clobber the
    /// recorded `from` / `to` of the first.
    AlreadyMigrating,
    /// The prefix has already been committed (it is in the MIGRATED
    /// map). `MigrationTable::abort` only clears in-flight entries, so
    /// aborting does not make the prefix startable again. The caller
    /// should treat the request as a no-op or report the existing
    /// migration.
    AlreadyMigrated,
}

impl std::fmt::Display for MigrationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            Self::AlreadyMigrating => "migration for this prefix is already in flight",
            Self::AlreadyMigrated => "prefix has already been migrated",
        };
        f.write_str(msg)
    }
}

/// Marks `MigrationError` as a standard error so it can be boxed as
/// `dyn std::error::Error` and propagated with `?`.
impl std::error::Error for MigrationError {}
65
66impl MigrationTable {
67    /// Empty table — the v1.21 default (no migrations in flight).
68    #[must_use]
69    pub fn new() -> Self {
70        Self::default()
71    }
72
73    /// Start a migration for `prefix`. Returns `Err` if the prefix
74    /// is already in either map. The caller (server cement) is
75    /// expected to hold a higher-level lock per `MOVE-SCOPE`
76    /// invocation so concurrent operator commands don't race.
77    pub fn start(
78        &self,
79        prefix: Vec<u8>,
80        from: String,
81        to: String,
82    ) -> Result<(), MigrationError> {
83        let mut mig = self
84            .migrating
85            .lock()
86            .unwrap_or_else(std::sync::PoisonError::into_inner);
87        if mig.contains_key(&prefix) {
88            return Err(MigrationError::AlreadyMigrating);
89        }
90        let done = self
91            .migrated
92            .lock()
93            .unwrap_or_else(std::sync::PoisonError::into_inner);
94        if done.contains_key(&prefix) {
95            return Err(MigrationError::AlreadyMigrated);
96        }
97        drop(done);
98        mig.insert(prefix, MigrationState { from, to });
99        Ok(())
100    }
101
102    /// Commit an in-flight migration: move the entry from MIGRATING
103    /// to MIGRATED. Returns the state that was committed, or `None`
104    /// when there's no in-flight migration for `prefix` (idempotent).
105    pub fn commit(&self, prefix: &[u8]) -> Option<MigrationState> {
106        let mut mig = self
107            .migrating
108            .lock()
109            .unwrap_or_else(std::sync::PoisonError::into_inner);
110        let entry = mig.remove(prefix)?;
111        drop(mig);
112        let mut done = self
113            .migrated
114            .lock()
115            .unwrap_or_else(std::sync::PoisonError::into_inner);
116        done.insert(prefix.to_vec(), entry.clone());
117        Some(entry)
118    }
119
120    /// Abort an in-flight migration without committing. Drops the
121    /// entry from MIGRATING; the MIGRATED map is untouched (an
122    /// aborted migration never moved data). Returns the state that
123    /// was aborted, or `None` when there was nothing to abort.
124    pub fn abort(&self, prefix: &[u8]) -> Option<MigrationState> {
125        let mut mig = self
126            .migrating
127            .lock()
128            .unwrap_or_else(std::sync::PoisonError::into_inner);
129        mig.remove(prefix)
130    }
131
132    /// Look up the in-flight migration for `prefix`. Returns the
133    /// state by value (cheap clone — two short Strings) so the
134    /// caller doesn't hold the mutex while encoding the
135    /// `-QUIESCED` reply.
136    #[must_use]
137    pub fn lookup_migrating(&self, prefix: &[u8]) -> Option<MigrationState> {
138        self.migrating
139            .lock()
140            .unwrap_or_else(std::sync::PoisonError::into_inner)
141            .get(prefix)
142            .cloned()
143    }
144
145    /// Same shape as [`Self::lookup_migrating`], but for committed
146    /// migrations (post-MOVE-SCOPE-INGEST).
147    #[must_use]
148    pub fn lookup_migrated(&self, prefix: &[u8]) -> Option<MigrationState> {
149        self.migrated
150            .lock()
151            .unwrap_or_else(std::sync::PoisonError::into_inner)
152            .get(prefix)
153            .cloned()
154    }
155
156    /// Longest-prefix-match lookup for in-flight migrations. Used by
157    /// `scope_integration::route_write` to decide whether to encode
158    /// `-QUIESCED` before falling through to the static
159    /// `OwnershipTable::route`.
160    #[must_use]
161    pub fn match_migrating(&self, key: &[u8]) -> Option<MigrationState> {
162        let g = self
163            .migrating
164            .lock()
165            .unwrap_or_else(std::sync::PoisonError::into_inner);
166        // The expected prefix count is small (operator-declared per-
167        // cluster); a linear scan is fine. Choose the longest match
168        // so nested prefixes route deterministically.
169        let mut best: Option<(&Vec<u8>, &MigrationState)> = None;
170        for (p, st) in g.iter() {
171            if key.starts_with(p)
172                && best.is_none_or(|(prev, _)| p.len() > prev.len())
173            {
174                best = Some((p, st));
175            }
176        }
177        best.map(|(_, st)| st.clone())
178    }
179
180    /// As [`Self::match_migrating`], for the MIGRATED map.
181    #[must_use]
182    pub fn match_migrated(&self, key: &[u8]) -> Option<MigrationState> {
183        let g = self
184            .migrated
185            .lock()
186            .unwrap_or_else(std::sync::PoisonError::into_inner);
187        let mut best: Option<(&Vec<u8>, &MigrationState)> = None;
188        for (p, st) in g.iter() {
189            if key.starts_with(p)
190                && best.is_none_or(|(prev, _)| p.len() > prev.len())
191            {
192                best = Some((p, st));
193            }
194        }
195        best.map(|(_, st)| st.clone())
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn start_then_lookup() {
        let t = MigrationTable::new();
        t.start(b"app:billing:".to_vec(), "A".into(), "B".into())
            .unwrap();
        let st = t.lookup_migrating(b"app:billing:").unwrap();
        assert_eq!(st.from, "A");
        assert_eq!(st.to, "B");
    }

    #[test]
    fn start_double_errs_already_migrating() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        let err = t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap_err();
        assert_eq!(err, MigrationError::AlreadyMigrating);
    }

    #[test]
    fn start_after_commit_errs_already_migrated() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        t.commit(b"p:").unwrap();
        let err = t.start(b"p:".to_vec(), "B".into(), "C".into()).unwrap_err();
        assert_eq!(err, MigrationError::AlreadyMigrated);
        // A rejected start must not leave an in-flight entry behind.
        assert!(t.lookup_migrating(b"p:").is_none());
    }

    #[test]
    fn commit_moves_to_migrated() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        let committed = t.commit(b"p:").unwrap();
        assert_eq!(committed.to, "B");
        assert!(t.lookup_migrating(b"p:").is_none());
        assert_eq!(t.lookup_migrated(b"p:").map(|s| s.to), Some("B".into()));
    }

    #[test]
    fn commit_and_abort_without_in_flight_entry_return_none() {
        let t = MigrationTable::new();
        assert!(t.commit(b"p:").is_none());
        assert!(t.abort(b"p:").is_none());
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        t.commit(b"p:").unwrap();
        // A second commit is a no-op, and abort never touches MIGRATED.
        assert!(t.commit(b"p:").is_none());
        assert!(t.abort(b"p:").is_none());
        assert!(t.lookup_migrated(b"p:").is_some());
    }

    #[test]
    fn abort_drops_migrating() {
        let t = MigrationTable::new();
        t.start(b"p:".to_vec(), "A".into(), "B".into()).unwrap();
        t.abort(b"p:").unwrap();
        assert!(t.lookup_migrating(b"p:").is_none());
        assert!(t.lookup_migrated(b"p:").is_none());
    }

    #[test]
    fn match_migrating_longest_prefix_wins() {
        let t = MigrationTable::new();
        t.start(b"app:".to_vec(), "A".into(), "B".into()).unwrap();
        t.start(b"app:billing:".to_vec(), "B".into(), "C".into())
            .unwrap();
        // Note: kevy-scope's OwnershipTable rejects overlapping
        // prefixes at startup, but the MigrationTable is operator-
        // driven runtime state; it has to handle whatever the
        // operator types. Longest match keeps the routing
        // deterministic.
        let st = t.match_migrating(b"app:billing:x").unwrap();
        assert_eq!(st.from, "B"); // longest prefix wins
    }

    #[test]
    fn match_migrated_longest_prefix_wins_and_misses() {
        let t = MigrationTable::new();
        t.start(b"app:".to_vec(), "A".into(), "B".into()).unwrap();
        t.start(b"app:billing:".to_vec(), "B".into(), "C".into())
            .unwrap();
        t.commit(b"app:").unwrap();
        t.commit(b"app:billing:").unwrap();
        assert_eq!(t.match_migrated(b"app:billing:x").unwrap().to, "C");
        assert_eq!(t.match_migrated(b"app:other").unwrap().to, "B");
        assert!(t.match_migrated(b"other:").is_none());
        assert!(t.match_migrating(b"app:billing:x").is_none());
    }

    #[test]
    fn error_display_strings() {
        assert_eq!(
            MigrationError::AlreadyMigrating.to_string(),
            "migration for this prefix is already in flight"
        );
        assert_eq!(
            MigrationError::AlreadyMigrated.to_string(),
            "prefix has already been migrated"
        );
    }
}
254}