use std::time::{Duration, Instant};
use crate::migration::{
MigrationController, MigrationError, MigrationPlan, MigrationProgress, MigrationState,
};
use crate::model::EmbeddingModel;
use super::types::{BackfillConfig, EmbeddingRoute, EmbeddingRoutingConfig, RoutingPhase};
#[derive(Debug)]
pub struct BackfillCoordinator {
pub(super) config: BackfillConfig,
pub(super) controller: MigrationController,
backfilled_count: usize,
pub(super) cutover_at: Option<Instant>,
rollback_window: Duration,
}
impl BackfillCoordinator {
pub fn new(plan: MigrationPlan, config: BackfillConfig) -> Self {
let rollback_window = Duration::from_secs(config.rollback_window_secs);
Self {
config,
controller: MigrationController::new(plan),
backfilled_count: 0,
cutover_at: None,
rollback_window,
}
}
pub fn with_defaults(plan: MigrationPlan) -> Self {
Self::new(plan, BackfillConfig::default())
}
pub fn start(&mut self) -> Result<(), MigrationError> {
self.controller.start()
}
pub fn route_request(&self, is_new_document: bool) -> EmbeddingRoute {
match self.controller.state() {
MigrationState::Planned => EmbeddingRoute::Legacy,
MigrationState::InProgress { .. } => {
if is_new_document && self.config.dual_write {
EmbeddingRoute::DualWrite
} else {
EmbeddingRoute::Legacy
}
}
MigrationState::Paused { .. } => EmbeddingRoute::Legacy,
MigrationState::Completed { .. } => EmbeddingRoute::Target,
MigrationState::Failed { .. } => EmbeddingRoute::Legacy,
MigrationState::Cancelled { .. } => EmbeddingRoute::Legacy,
}
}
pub fn route_query(&self) -> EmbeddingRoute {
match self.controller.state() {
MigrationState::Completed { .. } => EmbeddingRoute::Target,
MigrationState::InProgress {
processed, total, ..
} => {
let progress = if *total == 0 {
1.0
} else {
*processed as f64 / *total as f64
};
if progress >= self.config.target_query_threshold {
EmbeddingRoute::Target
} else {
EmbeddingRoute::Legacy
}
}
_ => EmbeddingRoute::Legacy,
}
}
pub fn record_batch(&mut self, count: usize) -> Result<(), MigrationError> {
let was_in_progress = matches!(self.controller.state(), MigrationState::InProgress { .. });
self.backfilled_count += count;
self.controller.record_progress(count)?;
if was_in_progress && matches!(self.controller.state(), MigrationState::Completed { .. }) {
self.cutover_at = Some(Instant::now());
}
Ok(())
}
pub fn record_error(&mut self) {
self.controller.record_error();
}
pub fn pause(&mut self, reason: impl Into<String>) -> Result<(), MigrationError> {
self.controller.pause(reason)
}
pub fn resume(&mut self) -> Result<(), MigrationError> {
self.controller.resume()
}
pub fn cancel(&mut self) -> Result<(), MigrationError> {
self.controller.cancel()
}
#[inline]
pub fn state(&self) -> &MigrationState {
self.controller.state()
}
#[inline]
pub fn progress(&self) -> MigrationProgress {
self.controller.progress()
}
#[inline]
pub fn source_model(&self) -> EmbeddingModel {
self.controller.plan().source_model
}
#[inline]
pub fn target_model(&self) -> EmbeddingModel {
self.controller.plan().target_model
}
#[inline]
pub fn in_rollback_window(&self) -> bool {
self.cutover_at
.map(|t| t.elapsed() < self.rollback_window)
.unwrap_or(false)
}
pub fn routing_config(&self) -> EmbeddingRoutingConfig {
match self.controller.state() {
MigrationState::Planned => EmbeddingRoutingConfig {
query_model: self.source_model(),
write_models: vec![self.source_model()],
phase: RoutingPhase::Stable,
migration_id: None,
},
MigrationState::InProgress { .. } => {
let write_models = if self.config.dual_write {
vec![self.source_model(), self.target_model()]
} else {
vec![self.source_model()]
};
EmbeddingRoutingConfig {
query_model: self.source_model(), write_models,
phase: RoutingPhase::Migrating,
migration_id: Some(self.controller.plan().id.clone()),
}
}
MigrationState::Completed { .. } => {
if self.in_rollback_window() {
EmbeddingRoutingConfig {
query_model: self.target_model(), write_models: vec![self.source_model(), self.target_model()], phase: RoutingPhase::RollbackWindow,
migration_id: Some(self.controller.plan().id.clone()),
}
} else {
EmbeddingRoutingConfig {
query_model: self.target_model(),
write_models: vec![self.target_model()],
phase: RoutingPhase::Stable,
migration_id: None,
}
}
}
_ => EmbeddingRoutingConfig {
query_model: self.source_model(),
write_models: vec![self.source_model()],
phase: RoutingPhase::Stable,
migration_id: None,
},
}
}
#[inline]
pub fn config(&self) -> &BackfillConfig {
&self.config
}
#[inline]
pub fn backfilled_count(&self) -> usize {
self.backfilled_count
}
pub fn next_batch_size(&self) -> usize {
match self.controller.state() {
MigrationState::InProgress {
processed, total, ..
} => {
let remaining = total.saturating_sub(*processed);
remaining.min(self.config.batch_size)
}
_ => 0,
}
}
}