1use std::time::Instant;
4
5use super::types::{MigrationError, MigrationPlan, MigrationProgress, MigrationState, SkipReason};
6
7#[derive(Debug)]
33pub struct MigrationController {
34 pub(super) plan: MigrationPlan,
35 pub(super) state: MigrationState,
36 started_at: Option<Instant>,
37 error_count: usize,
38 skip_reasons: Vec<SkipReason>,
39}
40
41impl MigrationController {
42 pub fn new(plan: MigrationPlan) -> Self {
44 Self {
45 plan,
46 state: MigrationState::Planned,
47 started_at: None,
48 error_count: 0,
49 skip_reasons: Vec::new(),
50 }
51 }
52
53 pub fn start(&mut self) -> Result<(), MigrationError> {
55 match &self.state {
56 MigrationState::Planned => {
57 self.state = MigrationState::InProgress {
58 processed: 0,
59 total: self.plan.total_embeddings,
60 skipped: 0,
61 };
62 self.started_at = Some(Instant::now());
63 Ok(())
64 }
65 other => Err(MigrationError::InvalidTransition {
66 from: format!("{other:?}"),
67 to: "InProgress".to_string(),
68 }),
69 }
70 }
71
72 pub fn record_progress(&mut self, newly_processed: usize) -> Result<(), MigrationError> {
74 match &self.state {
75 MigrationState::InProgress {
76 processed,
77 total,
78 skipped,
79 } => {
80 let new_processed = processed + newly_processed;
81 let effective_total = total.saturating_sub(*skipped);
82 if new_processed >= effective_total {
83 let duration = self
84 .started_at
85 .map(|s| s.elapsed().as_secs_f64())
86 .unwrap_or(0.0);
87 self.state = MigrationState::Completed {
88 processed: new_processed,
89 skipped: *skipped,
90 duration_secs: duration,
91 };
92 } else {
93 self.state = MigrationState::InProgress {
94 processed: new_processed,
95 total: *total,
96 skipped: *skipped,
97 };
98 }
99 Ok(())
100 }
101 other => Err(MigrationError::InvalidTransition {
102 from: format!("{other:?}"),
103 to: "InProgress (progress)".to_string(),
104 }),
105 }
106 }
107
108 pub fn record_error(&mut self) {
110 self.error_count += 1;
111 }
112
113 pub fn record_skip(&mut self, reason: SkipReason) -> Result<(), MigrationError> {
136 match &self.state {
137 MigrationState::InProgress {
138 processed,
139 total,
140 skipped,
141 } => {
142 self.skip_reasons.push(reason);
143 self.state = MigrationState::InProgress {
144 processed: *processed,
145 total: *total,
146 skipped: skipped + 1,
147 };
148 Ok(())
149 }
150 other => Err(MigrationError::InvalidTransition {
151 from: format!("{other:?}"),
152 to: "InProgress (skip)".to_string(),
153 }),
154 }
155 }
156
157 #[inline]
159 pub fn skip_reasons(&self) -> &[SkipReason] {
160 &self.skip_reasons
161 }
162
163 pub fn effective_coverage(&self) -> f64 {
165 self.state.effective_coverage()
166 }
167
168 pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), MigrationError> {
170 match &self.state {
171 MigrationState::InProgress {
172 processed,
173 total,
174 skipped,
175 } => {
176 self.state = MigrationState::Paused {
177 processed: *processed,
178 total: *total,
179 skipped: *skipped,
180 reason: reason.into(),
181 };
182 Ok(())
183 }
184 other => Err(MigrationError::InvalidTransition {
185 from: format!("{other:?}"),
186 to: "Paused".to_string(),
187 }),
188 }
189 }
190
191 pub fn resume(&mut self) -> Result<(), MigrationError> {
193 match &self.state {
194 MigrationState::Paused {
195 processed,
196 total,
197 skipped,
198 ..
199 }
200 | MigrationState::Failed {
201 processed,
202 total,
203 skipped,
204 ..
205 } => {
206 self.state = MigrationState::InProgress {
207 processed: *processed,
208 total: *total,
209 skipped: *skipped,
210 };
211 if self.started_at.is_none() {
212 self.started_at = Some(Instant::now());
213 }
214 Ok(())
215 }
216 other => Err(MigrationError::InvalidTransition {
217 from: format!("{other:?}"),
218 to: "InProgress (resume)".to_string(),
219 }),
220 }
221 }
222
223 pub fn fail(&mut self, error: impl Into<String>) -> Result<(), MigrationError> {
225 match &self.state {
226 MigrationState::InProgress {
227 processed,
228 total,
229 skipped,
230 } => {
231 self.state = MigrationState::Failed {
232 processed: *processed,
233 total: *total,
234 skipped: *skipped,
235 error: error.into(),
236 };
237 Ok(())
238 }
239 other => Err(MigrationError::InvalidTransition {
240 from: format!("{other:?}"),
241 to: "Failed".to_string(),
242 }),
243 }
244 }
245
246 pub fn cancel(&mut self) -> Result<(), MigrationError> {
248 if self.state.is_terminal() {
249 return Err(MigrationError::InvalidTransition {
250 from: format!("{:?}", self.state),
251 to: "Cancelled".to_string(),
252 });
253 }
254 let (processed, total, skipped) = match &self.state {
255 MigrationState::Planned => (0, self.plan.total_embeddings, 0),
256 MigrationState::InProgress {
257 processed,
258 total,
259 skipped,
260 } => (*processed, *total, *skipped),
261 MigrationState::Paused {
262 processed,
263 total,
264 skipped,
265 ..
266 } => (*processed, *total, *skipped),
267 MigrationState::Failed {
268 processed,
269 total,
270 skipped,
271 ..
272 } => (*processed, *total, *skipped),
273 _ => unreachable!(),
274 };
275 self.state = MigrationState::Cancelled {
276 processed,
277 total,
278 skipped,
279 };
280 Ok(())
281 }
282
283 pub fn progress(&self) -> MigrationProgress {
285 let throughput = match (&self.state, self.started_at) {
286 (MigrationState::InProgress { processed, .. }, Some(start)) => {
287 let elapsed = start.elapsed().as_secs_f64();
288 if elapsed > 0.0 {
289 *processed as f64 / elapsed
290 } else {
291 0.0
292 }
293 }
294 _ => 0.0,
295 };
296
297 let eta_secs = match &self.state {
298 MigrationState::InProgress {
299 processed,
300 total,
301 skipped,
302 } if throughput > 0.0 => {
303 let effective_total = total.saturating_sub(*skipped);
304 let remaining = effective_total.saturating_sub(*processed);
305 Some(remaining as f64 / throughput)
306 }
307 _ => None,
308 };
309
310 MigrationProgress {
311 migration_id: self.plan.id.clone(),
312 state: self.state.clone(),
313 skipped: self.state.skipped(),
314 effective_total: self.state.effective_total(),
315 effective_coverage: self.state.effective_coverage(),
316 throughput,
317 eta_secs,
318 error_count: self.error_count,
319 }
320 }
321
322 #[inline]
324 pub fn state(&self) -> &MigrationState {
325 &self.state
326 }
327
328 #[inline]
330 pub fn plan(&self) -> &MigrationPlan {
331 &self.plan
332 }
333}