use std::sync::Arc;
use reifydb_core::{
interface::catalog::migration::{Migration, MigrationAction},
internal_error,
value::column::columns::Columns,
};
use reifydb_rql::{compiler::CompilationResult, nodes::MigrateNode};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::value::Value;
use crate::{
Result,
vm::{services::Services, vm::Vm},
};
/// Executes a `MIGRATE` plan: applies every pending migration, in name order,
/// and records a [`MigrationAction::Applied`] event for each one.
///
/// A migration counts as pending when it has no recorded event, or when the
/// first event found for it is not `Applied`. If `plan.target` is set, the
/// pending list is cut off right after the migration whose name equals the
/// target (the target itself is included).
///
/// Each migration body is compiled and every compiled unit is run on `vm`
/// starting from instruction pointer `0`. The caller's `vm.ip` is put back
/// after every unit, including when a unit fails.
///
/// # Returns
///
/// A single-row [`Columns`] with one column, `migrations_applied`, holding the
/// number of migrations that were applied.
///
/// # Errors
///
/// Returns an internal error when `tx` is neither an admin nor a test
/// transaction, and propagates any error from the catalog, the compiler or
/// the VM.
pub(crate) fn execute_migrate(
    vm: &mut Vm,
    services: &Arc<Services>,
    tx: &mut Transaction<'_>,
    plan: MigrateNode,
) -> Result<Columns> {
    let txn = match tx {
        Transaction::Admin(txn) => txn,
        Transaction::Test(t) => &mut *t.inner,
        _ => {
            return Err(internal_error!("MIGRATE requires an admin transaction"));
        }
    };

    // Migrations are applied in ascending name order.
    let mut migrations = services.catalog.list_migrations(&mut Transaction::Admin(&mut *txn))?;
    migrations.sort_by(|a, b| a.name.cmp(&b.name));

    let events = services.catalog.list_migration_events(&mut Transaction::Admin(&mut *txn))?;

    // NOTE(review): `find` yields the FIRST event for a migration in whatever
    // order `list_migration_events` returns them; this is only the most recent
    // event if that list is newest-first. Confirm against the catalog.
    let pending: Vec<Migration> = migrations
        .into_iter()
        .filter(|m| match events.iter().find(|e| e.migration_id == m.id) {
            Some(event) => event.action != MigrationAction::Applied,
            None => true,
        })
        .collect();

    // NOTE(review): when a target is given but matches no pending migration,
    // the loop never breaks and every pending migration is selected. Confirm
    // that this is the intended behaviour.
    let to_apply: Vec<Migration> = if let Some(target) = &plan.target {
        let mut selected = Vec::new();
        for m in pending {
            // Compare before pushing so the migration can be moved, not cloned.
            let reached_target = m.name == *target;
            selected.push(m);
            if reached_target {
                break;
            }
        }
        selected
    } else {
        pending
    };

    let applied_count = to_apply.len();

    for migration in &to_apply {
        let compiled = services.compiler.compile(&mut Transaction::Admin(&mut *txn), &migration.body)?;

        // Every compiled unit runs from instruction 0; the caller's instruction
        // pointer is restored after each unit so that an early return through
        // `?` never leaves `vm.ip` pointing into migration code.
        let saved_ip = vm.ip;
        // Output of the migration's statements is collected here and dropped.
        let mut migration_result = Vec::new();

        match compiled {
            CompilationResult::Ready(compiled_list) => {
                for compiled_unit in compiled_list.iter() {
                    vm.ip = 0;
                    let outcome = vm.run(
                        services,
                        &mut Transaction::Admin(&mut *txn),
                        &compiled_unit.instructions,
                        &mut migration_result,
                    );
                    vm.ip = saved_ip;
                    outcome?;
                }
            }
            CompilationResult::Incremental(mut state) => {
                while let Some(compiled_unit) = services
                    .compiler
                    .compile_next(&mut Transaction::Admin(&mut *txn), &mut state)?
                {
                    vm.ip = 0;
                    let outcome = vm.run(
                        services,
                        &mut Transaction::Admin(&mut *txn),
                        &compiled_unit.instructions,
                        &mut migration_result,
                    );
                    vm.ip = saved_ip;
                    outcome?;
                }
            }
        }

        services.catalog.create_migration_event(txn, migration, MigrationAction::Applied)?;
    }

    // `as u32` truncates; the count would have to exceed `u32::MAX` for that
    // to lose information.
    Ok(Columns::single_row([("migrations_applied", Value::Uint4(applied_count as u32))]))
}