1use std::time::Instant;
4
5use tracing::{info, warn};
6
/// Phase of a single vshard migration, advanced strictly forward by
/// `MigrationState` (Idle → BaseCopy → WalCatchUp → AtomicCutOver →
/// Completed), with `Failed` reachable from any active phase.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationPhase {
    /// No migration in progress.
    Idle,
    /// Bulk transfer of the shard's base data to the target node.
    BaseCopy {
        /// Bytes copied so far.
        bytes_transferred: u64,
        /// Total bytes expected for the base copy.
        bytes_total: u64,
    },
    /// Replaying WAL entries produced during/after the base copy until the
    /// target is close enough to the source to cut over.
    WalCatchUp {
        /// LSN at which catch-up began.
        start_lsn: u64,
        /// Latest LSN applied on the target.
        current_lsn: u64,
        /// Latest LSN observed on the source (the moving goal).
        source_lsn: u64,
    },
    /// Writes are paused while ownership atomically switches to the target.
    AtomicCutOver {
        /// Estimated pause length in microseconds (value passed to
        /// `start_cutover`; see NOTE there about the field name).
        pause_start_us: u64,
    },
    /// Migration finished; target is authoritative.
    Completed {
        /// Measured write-pause duration in microseconds.
        pause_duration_us: u64,
    },
    /// Migration aborted; the source node remains authoritative.
    Failed { reason: String },
}
47
/// Tracks the progress of migrating one vshard between replication groups,
/// enforcing the write-pause budget before cut-over is allowed.
#[derive(Debug, Clone)]
pub struct MigrationState {
    /// Identifier of the vshard being migrated.
    pub vshard_id: u32,
    /// Replication group currently owning the vshard.
    pub source_group: u64,
    /// Replication group the vshard is moving to.
    pub target_group: u64,
    /// Node id serving the vshard today.
    pub source_node: u64,
    /// Node id that will serve the vshard after cut-over.
    pub target_node: u64,
    /// Current phase of the migration state machine.
    pub phase: MigrationPhase,
    /// Maximum tolerated write pause (µs); cut-over estimates above this
    /// are rejected by `start_cutover`.
    pub write_pause_budget_us: u64,
    /// Set when the base copy starts; `None` while `Idle`.
    pub started_at: Option<Instant>,
}
68
69impl MigrationState {
70 pub fn new(
71 vshard_id: u32,
72 source_group: u64,
73 target_group: u64,
74 source_node: u64,
75 target_node: u64,
76 write_pause_budget_us: u64,
77 ) -> Self {
78 Self {
79 vshard_id,
80 source_group,
81 target_group,
82 source_node,
83 target_node,
84 phase: MigrationPhase::Idle,
85 write_pause_budget_us,
86 started_at: None,
87 }
88 }
89
90 pub fn start_base_copy(&mut self, bytes_total: u64) {
92 self.started_at = Some(Instant::now());
93 self.phase = MigrationPhase::BaseCopy {
94 bytes_transferred: 0,
95 bytes_total,
96 };
97 info!(
98 vshard = self.vshard_id,
99 source = self.source_node,
100 target = self.target_node,
101 bytes_total,
102 "starting base copy"
103 );
104 }
105
106 pub fn update_base_copy(&mut self, bytes_transferred: u64) {
108 if let MigrationPhase::BaseCopy { bytes_total, .. } = self.phase {
109 self.phase = MigrationPhase::BaseCopy {
110 bytes_transferred,
111 bytes_total,
112 };
113 }
114 }
115
116 pub fn start_wal_catchup(&mut self, start_lsn: u64, source_lsn: u64) {
118 self.phase = MigrationPhase::WalCatchUp {
119 start_lsn,
120 current_lsn: start_lsn,
121 source_lsn,
122 };
123 info!(
124 vshard = self.vshard_id,
125 start_lsn,
126 source_lsn,
127 lag = source_lsn - start_lsn,
128 "starting wal catch-up"
129 );
130 }
131
132 pub fn update_wal_catchup(&mut self, current_lsn: u64, source_lsn: u64) {
134 if let MigrationPhase::WalCatchUp { start_lsn, .. } = self.phase {
135 self.phase = MigrationPhase::WalCatchUp {
136 start_lsn,
137 current_lsn,
138 source_lsn,
139 };
140 }
141 }
142
143 pub fn is_catchup_ready(&self) -> bool {
145 if let MigrationPhase::WalCatchUp {
146 current_lsn,
147 source_lsn,
148 ..
149 } = self.phase
150 {
151 source_lsn.saturating_sub(current_lsn) <= 10
153 } else {
154 false
155 }
156 }
157
158 pub fn start_cutover(&mut self, estimated_pause_us: u64) -> crate::Result<()> {
162 if estimated_pause_us > self.write_pause_budget_us {
163 warn!(
164 vshard = self.vshard_id,
165 estimated_us = estimated_pause_us,
166 budget_us = self.write_pause_budget_us,
167 "refusing cut-over: pause exceeds budget"
168 );
169 return Err(crate::ClusterError::MigrationPauseBudgetExceeded {
170 estimated_us: estimated_pause_us,
171 budget_us: self.write_pause_budget_us,
172 });
173 }
174
175 self.phase = MigrationPhase::AtomicCutOver {
176 pause_start_us: estimated_pause_us,
177 };
178 info!(
179 vshard = self.vshard_id,
180 estimated_pause_us, "starting atomic cut-over"
181 );
182 Ok(())
183 }
184
185 pub fn complete(&mut self, actual_pause_us: u64) {
187 self.phase = MigrationPhase::Completed {
188 pause_duration_us: actual_pause_us,
189 };
190 info!(
191 vshard = self.vshard_id,
192 pause_us = actual_pause_us,
193 "migration completed"
194 );
195 }
196
197 pub fn fail(&mut self, reason: String) {
199 warn!(
200 vshard = self.vshard_id,
201 reason = %reason,
202 "migration failed, source remains authoritative"
203 );
204 self.phase = MigrationPhase::Failed { reason };
205 }
206
207 pub fn is_active(&self) -> bool {
208 !matches!(
209 self.phase,
210 MigrationPhase::Idle | MigrationPhase::Completed { .. } | MigrationPhase::Failed { .. }
211 )
212 }
213
214 pub fn elapsed(&self) -> Option<std::time::Duration> {
216 self.started_at.map(|s| s.elapsed())
217 }
218
219 pub fn vshard_id(&self) -> u32 {
220 self.vshard_id
221 }
222
223 pub fn phase(&self) -> &MigrationPhase {
224 &self.phase
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Canonical fixture: vshard 42 moving from group 0 (node 10) to
    /// group 1 (node 20) with a 1000 µs write-pause budget.
    fn test_migration() -> MigrationState {
        MigrationState::new(42, 0, 1, 10, 20, 1000)
    }

    /// Drives the happy path through every phase in order and checks the
    /// per-phase payloads along the way.
    #[test]
    fn full_lifecycle() {
        let mut m = test_migration();
        assert!(!m.is_active());

        m.start_base_copy(1_000_000);
        assert!(m.is_active());
        m.update_base_copy(500_000);
        if let MigrationPhase::BaseCopy {
            bytes_transferred, ..
        } = m.phase
        {
            assert_eq!(bytes_transferred, 500_000);
        } else {
            panic!("expected BaseCopy");
        }

        m.start_wal_catchup(100, 200);
        assert!(!m.is_catchup_ready());
        m.update_wal_catchup(195, 200);
        assert!(m.is_catchup_ready());

        m.start_cutover(500).unwrap();
        assert!(matches!(m.phase, MigrationPhase::AtomicCutOver { .. }));

        m.complete(450);
        assert!(!m.is_active());
        if let MigrationPhase::Completed {
            pause_duration_us, ..
        } = m.phase
        {
            assert_eq!(pause_duration_us, 450);
        } else {
            // Previously missing: without this arm the assertion above was
            // silently skipped if the phase was wrong.
            panic!("expected Completed");
        }
    }

    /// Cut-over must be refused when the estimated pause exceeds the budget.
    #[test]
    fn pause_budget_exceeded() {
        let mut m = test_migration();
        m.start_base_copy(100);
        m.start_wal_catchup(0, 5);
        m.update_wal_catchup(5, 5);

        let err = m.start_cutover(2000).unwrap_err();
        assert!(matches!(
            err,
            crate::ClusterError::MigrationPauseBudgetExceeded { .. }
        ));
    }

    /// `fail` is terminal and deactivates the migration.
    #[test]
    fn failure_recovery() {
        let mut m = test_migration();
        m.start_base_copy(100);
        m.fail("network partition".into());
        assert!(!m.is_active());
        assert!(matches!(m.phase, MigrationPhase::Failed { .. }));
    }

    /// Readiness flips exactly when the remaining lag drops to the
    /// threshold (lag 90 → not ready; lag 5 → ready).
    #[test]
    fn catchup_threshold() {
        let mut m = test_migration();
        m.start_base_copy(100);
        m.start_wal_catchup(0, 100);

        m.update_wal_catchup(10, 100);
        assert!(!m.is_catchup_ready());

        m.update_wal_catchup(95, 100);
        assert!(m.is_catchup_ready());
    }
}