Skip to main content

lattice_embed/migration/
controller.rs

1//! Migration controller: state machine executor.
2
3use std::time::Instant;
4
5use super::types::{MigrationError, MigrationPlan, MigrationProgress, MigrationState, SkipReason};
6
7/// Manages the state machine for a single migration.
8///
9/// # Example
10///
11/// ```rust
12/// use lattice_embed::migration::{MigrationController, MigrationPlan};
13/// use lattice_embed::EmbeddingModel;
14///
15/// let plan = MigrationPlan {
16///     id: "mig-001".to_string(),
17///     source_model: EmbeddingModel::BgeSmallEnV15,
18///     target_model: EmbeddingModel::BgeBaseEnV15,
19///     total_embeddings: 100,
20///     batch_size: 50,
21///     created_at: "2026-01-27T00:00:00Z".to_string(),
22/// };
23///
24/// let mut ctrl = MigrationController::new(plan);
25/// ctrl.start().unwrap();
26/// ctrl.record_progress(50).unwrap();
27///
28/// let report = ctrl.progress();
29/// assert!(report.state.is_active());
30/// assert_eq!(report.state.processed(), 50);
31/// ```
32#[derive(Debug)]
33pub struct MigrationController {
34    pub(super) plan: MigrationPlan,
35    pub(super) state: MigrationState,
36    started_at: Option<Instant>,
37    error_count: usize,
38    skip_reasons: Vec<SkipReason>,
39}
40
41impl MigrationController {
42    /// Create a new migration controller from a plan.
43    pub fn new(plan: MigrationPlan) -> Self {
44        Self {
45            plan,
46            state: MigrationState::Planned,
47            started_at: None,
48            error_count: 0,
49            skip_reasons: Vec::new(),
50        }
51    }
52
53    /// Start the migration (`Planned` -> `InProgress`).
54    pub fn start(&mut self) -> Result<(), MigrationError> {
55        match &self.state {
56            MigrationState::Planned => {
57                self.state = MigrationState::InProgress {
58                    processed: 0,
59                    total: self.plan.total_embeddings,
60                    skipped: 0,
61                };
62                self.started_at = Some(Instant::now());
63                Ok(())
64            }
65            other => Err(MigrationError::InvalidTransition {
66                from: format!("{other:?}"),
67                to: "InProgress".to_string(),
68            }),
69        }
70    }
71
72    /// Record that `newly_processed` embeddings were completed.
73    pub fn record_progress(&mut self, newly_processed: usize) -> Result<(), MigrationError> {
74        match &self.state {
75            MigrationState::InProgress {
76                processed,
77                total,
78                skipped,
79            } => {
80                let new_processed = processed + newly_processed;
81                let effective_total = total.saturating_sub(*skipped);
82                if new_processed >= effective_total {
83                    let duration = self
84                        .started_at
85                        .map(|s| s.elapsed().as_secs_f64())
86                        .unwrap_or(0.0);
87                    self.state = MigrationState::Completed {
88                        processed: new_processed,
89                        skipped: *skipped,
90                        duration_secs: duration,
91                    };
92                } else {
93                    self.state = MigrationState::InProgress {
94                        processed: new_processed,
95                        total: *total,
96                        skipped: *skipped,
97                    };
98                }
99                Ok(())
100            }
101            other => Err(MigrationError::InvalidTransition {
102                from: format!("{other:?}"),
103                to: "InProgress (progress)".to_string(),
104            }),
105        }
106    }
107
108    /// Record a non-fatal error during processing.
109    pub fn record_error(&mut self) {
110        self.error_count += 1;
111    }
112
113    /// Record an item that will be permanently skipped.
114    ///
115    /// # Example
116    ///
117    /// ```rust
118    /// use lattice_embed::migration::{MigrationController, MigrationPlan, SkipReason};
119    /// use lattice_embed::EmbeddingModel;
120    ///
121    /// let plan = MigrationPlan {
122    ///     id: "mig-001".to_string(),
123    ///     source_model: EmbeddingModel::BgeSmallEnV15,
124    ///     target_model: EmbeddingModel::BgeBaseEnV15,
125    ///     total_embeddings: 100,
126    ///     batch_size: 50,
127    ///     created_at: "2026-01-27T00:00:00Z".to_string(),
128    /// };
129    ///
130    /// let mut ctrl = MigrationController::new(plan);
131    /// ctrl.start().unwrap();
132    /// ctrl.record_skip(SkipReason::ContentTooLarge { size: 50000, max: 8192 }).unwrap();
133    /// assert_eq!(ctrl.state().skipped(), 1);
134    /// ```
135    pub fn record_skip(&mut self, reason: SkipReason) -> Result<(), MigrationError> {
136        match &self.state {
137            MigrationState::InProgress {
138                processed,
139                total,
140                skipped,
141            } => {
142                self.skip_reasons.push(reason);
143                self.state = MigrationState::InProgress {
144                    processed: *processed,
145                    total: *total,
146                    skipped: skipped + 1,
147                };
148                Ok(())
149            }
150            other => Err(MigrationError::InvalidTransition {
151                from: format!("{other:?}"),
152                to: "InProgress (skip)".to_string(),
153            }),
154        }
155    }
156
157    /// Returns the list of reasons why entries were skipped during migration.
158    #[inline]
159    pub fn skip_reasons(&self) -> &[SkipReason] {
160        &self.skip_reasons
161    }
162
163    /// Returns the effective coverage fraction (0.0–1.0) of the migration.
164    pub fn effective_coverage(&self) -> f64 {
165        self.state.effective_coverage()
166    }
167
168    /// Pause the migration (`InProgress` -> `Paused`).
169    pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), MigrationError> {
170        match &self.state {
171            MigrationState::InProgress {
172                processed,
173                total,
174                skipped,
175            } => {
176                self.state = MigrationState::Paused {
177                    processed: *processed,
178                    total: *total,
179                    skipped: *skipped,
180                    reason: reason.into(),
181                };
182                Ok(())
183            }
184            other => Err(MigrationError::InvalidTransition {
185                from: format!("{other:?}"),
186                to: "Paused".to_string(),
187            }),
188        }
189    }
190
191    /// Resume the migration (`Paused`/`Failed` -> `InProgress`).
192    pub fn resume(&mut self) -> Result<(), MigrationError> {
193        match &self.state {
194            MigrationState::Paused {
195                processed,
196                total,
197                skipped,
198                ..
199            }
200            | MigrationState::Failed {
201                processed,
202                total,
203                skipped,
204                ..
205            } => {
206                self.state = MigrationState::InProgress {
207                    processed: *processed,
208                    total: *total,
209                    skipped: *skipped,
210                };
211                if self.started_at.is_none() {
212                    self.started_at = Some(Instant::now());
213                }
214                Ok(())
215            }
216            other => Err(MigrationError::InvalidTransition {
217                from: format!("{other:?}"),
218                to: "InProgress (resume)".to_string(),
219            }),
220        }
221    }
222
223    /// Fail the migration (`InProgress` -> `Failed`).
224    pub fn fail(&mut self, error: impl Into<String>) -> Result<(), MigrationError> {
225        match &self.state {
226            MigrationState::InProgress {
227                processed,
228                total,
229                skipped,
230            } => {
231                self.state = MigrationState::Failed {
232                    processed: *processed,
233                    total: *total,
234                    skipped: *skipped,
235                    error: error.into(),
236                };
237                Ok(())
238            }
239            other => Err(MigrationError::InvalidTransition {
240                from: format!("{other:?}"),
241                to: "Failed".to_string(),
242            }),
243        }
244    }
245
246    /// Cancel the migration (any non-terminal state -> `Cancelled`).
247    pub fn cancel(&mut self) -> Result<(), MigrationError> {
248        if self.state.is_terminal() {
249            return Err(MigrationError::InvalidTransition {
250                from: format!("{:?}", self.state),
251                to: "Cancelled".to_string(),
252            });
253        }
254        let (processed, total, skipped) = match &self.state {
255            MigrationState::Planned => (0, self.plan.total_embeddings, 0),
256            MigrationState::InProgress {
257                processed,
258                total,
259                skipped,
260            } => (*processed, *total, *skipped),
261            MigrationState::Paused {
262                processed,
263                total,
264                skipped,
265                ..
266            } => (*processed, *total, *skipped),
267            MigrationState::Failed {
268                processed,
269                total,
270                skipped,
271                ..
272            } => (*processed, *total, *skipped),
273            _ => unreachable!(),
274        };
275        self.state = MigrationState::Cancelled {
276            processed,
277            total,
278            skipped,
279        };
280        Ok(())
281    }
282
283    /// Get a snapshot of current progress.
284    pub fn progress(&self) -> MigrationProgress {
285        let throughput = match (&self.state, self.started_at) {
286            (MigrationState::InProgress { processed, .. }, Some(start)) => {
287                let elapsed = start.elapsed().as_secs_f64();
288                if elapsed > 0.0 {
289                    *processed as f64 / elapsed
290                } else {
291                    0.0
292                }
293            }
294            _ => 0.0,
295        };
296
297        let eta_secs = match &self.state {
298            MigrationState::InProgress {
299                processed,
300                total,
301                skipped,
302            } if throughput > 0.0 => {
303                let effective_total = total.saturating_sub(*skipped);
304                let remaining = effective_total.saturating_sub(*processed);
305                Some(remaining as f64 / throughput)
306            }
307            _ => None,
308        };
309
310        MigrationProgress {
311            migration_id: self.plan.id.clone(),
312            state: self.state.clone(),
313            skipped: self.state.skipped(),
314            effective_total: self.state.effective_total(),
315            effective_coverage: self.state.effective_coverage(),
316            throughput,
317            eta_secs,
318            error_count: self.error_count,
319        }
320    }
321
322    /// Returns the current migration state.
323    #[inline]
324    pub fn state(&self) -> &MigrationState {
325        &self.state
326    }
327
328    /// Returns the migration plan.
329    #[inline]
330    pub fn plan(&self) -> &MigrationPlan {
331        &self.plan
332    }
333}