1use std::time::Instant;
2
3use tracing::{info, warn};
4
/// Lifecycle phase of a single virtual-shard migration.
///
/// Transitions are driven by the methods on `MigrationState`
/// (`start_base_copy` → `start_wal_catchup` → `start_cutover` →
/// `complete`, with `fail` reachable from any point).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationPhase {
    /// No migration in progress (initial state).
    Idle,
    /// Bulk copy of shard data from source to target.
    BaseCopy {
        /// Bytes copied so far; updated via `update_base_copy`.
        bytes_transferred: u64,
        /// Expected total size of the base copy, in bytes.
        bytes_total: u64,
    },
    /// Target applies WAL to close the LSN gap with the source.
    WalCatchUp {
        /// LSN at which catch-up began on the target.
        start_lsn: u64,
        /// Latest LSN applied on the target; updated via `update_wal_catchup`.
        current_lsn: u64,
        /// Latest LSN observed on the source.
        source_lsn: u64,
    },
    /// Writes are paused while ownership flips to the target.
    AtomicCutOver {
        /// Estimated pause passed to `start_cutover`, in microseconds.
        pause_start_us: u64,
    },
    /// Migration finished successfully.
    Completed {
        /// Measured write-pause duration during cut-over, in microseconds.
        pause_duration_us: u64,
    },
    /// Migration aborted; the source remains authoritative.
    Failed { reason: String },
}
45
/// State machine tracking one virtual-shard migration between two
/// groups/nodes, including the write-pause budget enforced at cut-over.
#[derive(Debug, Clone)]
pub struct MigrationState {
    // Virtual shard being migrated.
    pub vshard_id: u16,
    // Identifier of the group the shard is leaving.
    pub source_group: u64,
    // Identifier of the group the shard is moving to.
    pub target_group: u64,
    // Node currently serving the shard on the source side.
    pub source_node: u64,
    // Node receiving the shard on the target side.
    pub target_node: u64,
    // Current phase of the migration state machine.
    pub phase: MigrationPhase,
    // Maximum tolerated write pause during cut-over, in microseconds;
    // `start_cutover` refuses estimates above this.
    pub write_pause_budget_us: u64,
    // Set by `start_base_copy`; `None` until the migration actually starts.
    pub started_at: Option<Instant>,
}
66
67impl MigrationState {
68 pub fn new(
69 vshard_id: u16,
70 source_group: u64,
71 target_group: u64,
72 source_node: u64,
73 target_node: u64,
74 write_pause_budget_us: u64,
75 ) -> Self {
76 Self {
77 vshard_id,
78 source_group,
79 target_group,
80 source_node,
81 target_node,
82 phase: MigrationPhase::Idle,
83 write_pause_budget_us,
84 started_at: None,
85 }
86 }
87
88 pub fn start_base_copy(&mut self, bytes_total: u64) {
90 self.started_at = Some(Instant::now());
91 self.phase = MigrationPhase::BaseCopy {
92 bytes_transferred: 0,
93 bytes_total,
94 };
95 info!(
96 vshard = self.vshard_id,
97 source = self.source_node,
98 target = self.target_node,
99 bytes_total,
100 "starting base copy"
101 );
102 }
103
104 pub fn update_base_copy(&mut self, bytes_transferred: u64) {
106 if let MigrationPhase::BaseCopy { bytes_total, .. } = self.phase {
107 self.phase = MigrationPhase::BaseCopy {
108 bytes_transferred,
109 bytes_total,
110 };
111 }
112 }
113
114 pub fn start_wal_catchup(&mut self, start_lsn: u64, source_lsn: u64) {
116 self.phase = MigrationPhase::WalCatchUp {
117 start_lsn,
118 current_lsn: start_lsn,
119 source_lsn,
120 };
121 info!(
122 vshard = self.vshard_id,
123 start_lsn,
124 source_lsn,
125 lag = source_lsn - start_lsn,
126 "starting wal catch-up"
127 );
128 }
129
130 pub fn update_wal_catchup(&mut self, current_lsn: u64, source_lsn: u64) {
132 if let MigrationPhase::WalCatchUp { start_lsn, .. } = self.phase {
133 self.phase = MigrationPhase::WalCatchUp {
134 start_lsn,
135 current_lsn,
136 source_lsn,
137 };
138 }
139 }
140
141 pub fn is_catchup_ready(&self) -> bool {
143 if let MigrationPhase::WalCatchUp {
144 current_lsn,
145 source_lsn,
146 ..
147 } = self.phase
148 {
149 source_lsn.saturating_sub(current_lsn) <= 10
151 } else {
152 false
153 }
154 }
155
156 pub fn start_cutover(&mut self, estimated_pause_us: u64) -> crate::Result<()> {
160 if estimated_pause_us > self.write_pause_budget_us {
161 warn!(
162 vshard = self.vshard_id,
163 estimated_us = estimated_pause_us,
164 budget_us = self.write_pause_budget_us,
165 "refusing cut-over: pause exceeds budget"
166 );
167 return Err(crate::ClusterError::MigrationPauseBudgetExceeded {
168 estimated_us: estimated_pause_us,
169 budget_us: self.write_pause_budget_us,
170 });
171 }
172
173 self.phase = MigrationPhase::AtomicCutOver {
174 pause_start_us: estimated_pause_us,
175 };
176 info!(
177 vshard = self.vshard_id,
178 estimated_pause_us, "starting atomic cut-over"
179 );
180 Ok(())
181 }
182
183 pub fn complete(&mut self, actual_pause_us: u64) {
185 self.phase = MigrationPhase::Completed {
186 pause_duration_us: actual_pause_us,
187 };
188 info!(
189 vshard = self.vshard_id,
190 pause_us = actual_pause_us,
191 "migration completed"
192 );
193 }
194
195 pub fn fail(&mut self, reason: String) {
197 warn!(
198 vshard = self.vshard_id,
199 reason = %reason,
200 "migration failed, source remains authoritative"
201 );
202 self.phase = MigrationPhase::Failed { reason };
203 }
204
205 pub fn is_active(&self) -> bool {
206 !matches!(
207 self.phase,
208 MigrationPhase::Idle | MigrationPhase::Completed { .. } | MigrationPhase::Failed { .. }
209 )
210 }
211
212 pub fn elapsed(&self) -> Option<std::time::Duration> {
214 self.started_at.map(|s| s.elapsed())
215 }
216
217 pub fn vshard_id(&self) -> u16 {
218 self.vshard_id
219 }
220
221 pub fn phase(&self) -> &MigrationPhase {
222 &self.phase
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared fixture: vshard 42 moving from group 0/node 10 to
    /// group 1/node 20, with a 1000 us write-pause budget.
    fn test_migration() -> MigrationState {
        MigrationState::new(42, 0, 1, 10, 20, 1000)
    }

    #[test]
    fn full_lifecycle() {
        let mut state = test_migration();
        assert!(!state.is_active());

        // Base copy: progress updates are reflected in the phase.
        state.start_base_copy(1_000_000);
        assert!(state.is_active());
        state.update_base_copy(500_000);
        match &state.phase {
            MigrationPhase::BaseCopy {
                bytes_transferred, ..
            } => assert_eq!(*bytes_transferred, 500_000),
            _ => panic!("expected BaseCopy"),
        }

        // WAL catch-up: lag 100 is not ready, lag 5 is.
        state.start_wal_catchup(100, 200);
        assert!(!state.is_catchup_ready());
        state.update_wal_catchup(195, 200);
        assert!(state.is_catchup_ready());

        // Cut-over within the 1000 us budget is admitted.
        state.start_cutover(500).unwrap();
        assert!(matches!(state.phase, MigrationPhase::AtomicCutOver { .. }));

        // Completion deactivates the migration and records the pause.
        state.complete(450);
        assert!(!state.is_active());
        match &state.phase {
            MigrationPhase::Completed {
                pause_duration_us, ..
            } => assert_eq!(*pause_duration_us, 450),
            _ => {}
        }
    }

    #[test]
    fn pause_budget_exceeded() {
        let mut state = test_migration();
        state.start_base_copy(100);
        state.start_wal_catchup(0, 5);
        state.update_wal_catchup(5, 5);

        // A 2000 us estimate blows the 1000 us budget.
        let err = state.start_cutover(2000).unwrap_err();
        assert!(matches!(
            err,
            crate::ClusterError::MigrationPauseBudgetExceeded { .. }
        ));
    }

    #[test]
    fn failure_recovery() {
        let mut state = test_migration();
        state.start_base_copy(100);
        state.fail(String::from("network partition"));
        assert!(!state.is_active());
        assert!(matches!(state.phase, MigrationPhase::Failed { .. }));
    }

    #[test]
    fn catchup_threshold() {
        let mut state = test_migration();
        state.start_base_copy(100);
        state.start_wal_catchup(0, 100);

        // Lag 90: not ready yet.
        state.update_wal_catchup(10, 100);
        assert!(!state.is_catchup_ready());

        // Lag 5: within the readiness threshold.
        state.update_wal_catchup(95, 100);
        assert!(state.is_catchup_ready());
    }
}