use std::time::Instant;
use super::types::{MigrationError, MigrationPlan, MigrationProgress, MigrationState, SkipReason};
#[derive(Debug)]
pub struct MigrationController {
pub(super) plan: MigrationPlan,
pub(super) state: MigrationState,
started_at: Option<Instant>,
error_count: usize,
skip_reasons: Vec<SkipReason>,
}
impl MigrationController {
pub fn new(plan: MigrationPlan) -> Self {
Self {
plan,
state: MigrationState::Planned,
started_at: None,
error_count: 0,
skip_reasons: Vec::new(),
}
}
pub fn start(&mut self) -> Result<(), MigrationError> {
match &self.state {
MigrationState::Planned => {
self.state = MigrationState::InProgress {
processed: 0,
total: self.plan.total_embeddings,
skipped: 0,
};
self.started_at = Some(Instant::now());
Ok(())
}
other => Err(MigrationError::InvalidTransition {
from: format!("{other:?}"),
to: "InProgress".to_string(),
}),
}
}
pub fn record_progress(&mut self, newly_processed: usize) -> Result<(), MigrationError> {
match &self.state {
MigrationState::InProgress {
processed,
total,
skipped,
} => {
let new_processed = processed + newly_processed;
let effective_total = total.saturating_sub(*skipped);
if new_processed >= effective_total {
let duration = self
.started_at
.map(|s| s.elapsed().as_secs_f64())
.unwrap_or(0.0);
self.state = MigrationState::Completed {
processed: new_processed,
skipped: *skipped,
duration_secs: duration,
};
} else {
self.state = MigrationState::InProgress {
processed: new_processed,
total: *total,
skipped: *skipped,
};
}
Ok(())
}
other => Err(MigrationError::InvalidTransition {
from: format!("{other:?}"),
to: "InProgress (progress)".to_string(),
}),
}
}
pub fn record_error(&mut self) {
self.error_count += 1;
}
pub fn record_skip(&mut self, reason: SkipReason) -> Result<(), MigrationError> {
match &self.state {
MigrationState::InProgress {
processed,
total,
skipped,
} => {
self.skip_reasons.push(reason);
self.state = MigrationState::InProgress {
processed: *processed,
total: *total,
skipped: skipped + 1,
};
Ok(())
}
other => Err(MigrationError::InvalidTransition {
from: format!("{other:?}"),
to: "InProgress (skip)".to_string(),
}),
}
}
#[inline]
pub fn skip_reasons(&self) -> &[SkipReason] {
&self.skip_reasons
}
pub fn effective_coverage(&self) -> f64 {
self.state.effective_coverage()
}
pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), MigrationError> {
match &self.state {
MigrationState::InProgress {
processed,
total,
skipped,
} => {
self.state = MigrationState::Paused {
processed: *processed,
total: *total,
skipped: *skipped,
reason: reason.into(),
};
Ok(())
}
other => Err(MigrationError::InvalidTransition {
from: format!("{other:?}"),
to: "Paused".to_string(),
}),
}
}
pub fn resume(&mut self) -> Result<(), MigrationError> {
match &self.state {
MigrationState::Paused {
processed,
total,
skipped,
..
}
| MigrationState::Failed {
processed,
total,
skipped,
..
} => {
self.state = MigrationState::InProgress {
processed: *processed,
total: *total,
skipped: *skipped,
};
if self.started_at.is_none() {
self.started_at = Some(Instant::now());
}
Ok(())
}
other => Err(MigrationError::InvalidTransition {
from: format!("{other:?}"),
to: "InProgress (resume)".to_string(),
}),
}
}
pub fn fail(&mut self, error: impl Into<String>) -> Result<(), MigrationError> {
match &self.state {
MigrationState::InProgress {
processed,
total,
skipped,
} => {
self.state = MigrationState::Failed {
processed: *processed,
total: *total,
skipped: *skipped,
error: error.into(),
};
Ok(())
}
other => Err(MigrationError::InvalidTransition {
from: format!("{other:?}"),
to: "Failed".to_string(),
}),
}
}
pub fn cancel(&mut self) -> Result<(), MigrationError> {
if self.state.is_terminal() {
return Err(MigrationError::InvalidTransition {
from: format!("{:?}", self.state),
to: "Cancelled".to_string(),
});
}
let (processed, total, skipped) = match &self.state {
MigrationState::Planned => (0, self.plan.total_embeddings, 0),
MigrationState::InProgress {
processed,
total,
skipped,
} => (*processed, *total, *skipped),
MigrationState::Paused {
processed,
total,
skipped,
..
} => (*processed, *total, *skipped),
MigrationState::Failed {
processed,
total,
skipped,
..
} => (*processed, *total, *skipped),
_ => unreachable!(),
};
self.state = MigrationState::Cancelled {
processed,
total,
skipped,
};
Ok(())
}
pub fn progress(&self) -> MigrationProgress {
let throughput = match (&self.state, self.started_at) {
(MigrationState::InProgress { processed, .. }, Some(start)) => {
let elapsed = start.elapsed().as_secs_f64();
if elapsed > 0.0 {
*processed as f64 / elapsed
} else {
0.0
}
}
_ => 0.0,
};
let eta_secs = match &self.state {
MigrationState::InProgress {
processed,
total,
skipped,
} if throughput > 0.0 => {
let effective_total = total.saturating_sub(*skipped);
let remaining = effective_total.saturating_sub(*processed);
Some(remaining as f64 / throughput)
}
_ => None,
};
MigrationProgress {
migration_id: self.plan.id.clone(),
state: self.state.clone(),
skipped: self.state.skipped(),
effective_total: self.state.effective_total(),
effective_coverage: self.state.effective_coverage(),
throughput,
eta_secs,
error_count: self.error_count,
}
}
#[inline]
pub fn state(&self) -> &MigrationState {
&self.state
}
#[inline]
pub fn plan(&self) -> &MigrationPlan {
&self.plan
}
}