#[macro_use]
extern crate log;
#[macro_use]
extern crate tetsy_macros;
extern crate tetsy_kvdb;
extern crate tetsy_kvdb_rocksdb;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{fs, io, error};
use tetsy_kvdb::DBTransaction;
use tetsy_kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
/// Wraps any boxable error value (for example a `&str` message) in an
/// `io::Error` of kind `io::ErrorKind::Other`.
fn other_io_err<E>(e: E) -> io::Error
where
    E: Into<Box<dyn error::Error + Send + Sync>>,
{
    let payload: Box<dyn error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::Other, payload)
}
/// Settings shared by every migration run by a `Manager`.
#[derive(Clone)]
pub struct Config {
/// Number of buffered key/value pairs at which a `Batch` writes itself to the
/// destination database.
pub batch_size: usize,
/// Compaction profile placed into the `DatabaseConfig` used to open the databases
/// during `Manager::execute`.
pub compaction_profile: CompactionProfile,
}
impl Default for Config {
fn default() -> Self {
Config {
batch_size: 1024,
compaction_profile: Default::default(),
}
}
}
/// Buffer of key/value pairs destined for a single column of a destination database.
pub struct Batch {
// Pending entries; inserting an already-buffered key replaces its value.
inner: BTreeMap<Vec<u8>, Vec<u8>>,
// Number of buffered entries that triggers a commit from `insert`.
batch_size: usize,
// Column passed to every `put` when the buffer is committed.
column: u32,
}
impl Batch {
    /// Creates an empty batch for `column`, taking its flush threshold from
    /// `config.batch_size`.
    pub fn new(config: &Config, column: u32) -> Self {
        Batch {
            inner: BTreeMap::new(),
            batch_size: config.batch_size,
            column,
        }
    }

    /// Buffers a key/value pair and writes the whole buffer to `dest` once it holds
    /// at least `batch_size` entries.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while committing to `dest`.
    pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> io::Result<()> {
        self.inner.insert(key, value);
        // `>=` instead of `==`: with a `batch_size` of 0 an equality check can never
        // succeed (the buffer holds at least one entry here), which would keep every
        // entry in memory until the final `commit`. For `batch_size >= 1` the buffer
        // grows by at most one entry per call and is emptied on reaching the threshold,
        // so both comparisons behave the same.
        if self.inner.len() >= self.batch_size {
            self.commit(dest)?;
        }
        Ok(())
    }

    /// Writes all buffered pairs to `dest` in a single transaction and empties the
    /// buffer. Does nothing when the buffer is already empty.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the database write. The buffer has already been
    /// cleared at that point.
    pub fn commit(&mut self, dest: &mut Database) -> io::Result<()> {
        if self.inner.is_empty() {
            return Ok(());
        }

        let mut transaction = DBTransaction::new();
        for (key, value) in &self.inner {
            transaction.put(self.column, key, value);
        }

        self.inner.clear();
        dest.write(transaction)
    }
}
/// A single database migration step that can be registered with a `Manager`.
pub trait Migration {
/// Number of columns the database has before this migration runs.
/// Defaults to `columns()`. `Manager::execute` uses the value of the first
/// applicable migration to open the existing database.
fn pre_columns(&self) -> u32 { self.columns() }
/// Number of columns the database has after this migration.
fn columns(&self) -> u32;
/// Whether the migration rewrites existing data (the default).
/// When this returns `false`, `Manager::execute` only adds or removes columns on
/// the current database and never calls `migrate`.
fn alters_existing(&self) -> bool { true }
/// Version this migration brings the database to; `Manager` uses it to order and
/// select migrations.
fn version(&self) -> u32;
/// Migrates column `col` of `source` into `destination`.
/// `Manager::execute` calls this once for each column index of the source database.
fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: u32) -> io::Result<()>;
}
/// A migration that transforms the entries of one column pair by pair.
/// Every implementor automatically gets a `Migration` implementation.
pub trait SimpleMigration {
/// Number of columns the database has after this migration.
fn columns(&self) -> u32;
/// Version this migration brings the database to.
fn version(&self) -> u32;
/// Index of the column whose entries are passed through `simple_migrate`;
/// entries of every other column are copied unchanged.
fn migrated_column_index(&self) -> u32;
/// Transforms one key/value pair of the migrated column.
/// Returning `None` leaves the pair out of the destination database.
fn simple_migrate(&mut self, key: Vec<u8>, value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)>;
}
impl<T: SimpleMigration> Migration for T {
    /// Forwards to `SimpleMigration::columns`.
    fn columns(&self) -> u32 {
        SimpleMigration::columns(self)
    }

    /// Simple migrations always rewrite data into a fresh database.
    fn alters_existing(&self) -> bool {
        true
    }

    /// Forwards to `SimpleMigration::version`.
    fn version(&self) -> u32 {
        SimpleMigration::version(self)
    }

    /// Copies column `col` from `source` into `dest`.
    ///
    /// Entries of the column named by `migrated_column_index` are passed through
    /// `simple_migrate` (and skipped when it returns `None`); entries of any other
    /// column are copied unchanged.
    fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: u32) -> io::Result<()> {
        let is_target_column = SimpleMigration::migrated_column_index(self) == col;
        let mut pending = Batch::new(config, col);

        for (key, value) in source.iter(col) {
            let (raw_key, raw_value) = (key.into_vec(), value.into_vec());
            let entry = if is_target_column {
                self.simple_migrate(raw_key, raw_value)
            } else {
                Some((raw_key, raw_value))
            };
            if let Some((new_key, new_value)) = entry {
                pending.insert(new_key, new_value, dest)?;
            }
        }

        // Write out whatever is left below the batch threshold.
        pending.commit(dest)
    }
}
/// A migration that only changes the number of columns, without rewriting any data.
pub struct ChangeColumns {
/// Number of columns before the migration.
pub pre_columns: u32,
/// Number of columns after the migration.
pub post_columns: u32,
/// Version the database has after the migration.
pub version: u32,
}
impl Migration for ChangeColumns {
    /// Column count expected before the change.
    fn pre_columns(&self) -> u32 {
        self.pre_columns
    }

    /// Column count after the change.
    fn columns(&self) -> u32 {
        self.post_columns
    }

    /// No existing data is touched; only the column count changes.
    fn alters_existing(&self) -> bool {
        false
    }

    /// Version reached once the change has been applied.
    fn version(&self) -> u32 {
        self.version
    }

    /// Nothing to copy: always succeeds without reading or writing either database.
    fn migrate(&mut self, _source: Arc<Database>, _config: &Config, _dest: &mut Database, _col: u32) -> io::Result<()> {
        Ok(())
    }
}
/// Returns the parent of `path`, or `path` itself when it has no parent
/// (for example a root or an empty path).
fn database_path(path: &Path) -> PathBuf {
    match path.parent() {
        Some(parent) => parent.to_path_buf(),
        None => path.to_path_buf(),
    }
}
/// Selects one of the two temporary migration directories.
enum TempIndex {
    One,
    Two,
}

impl TempIndex {
    /// Switches to the other temporary directory.
    fn swap(&mut self) {
        *self = match *self {
            TempIndex::One => TempIndex::Two,
            TempIndex::Two => TempIndex::One,
        };
    }

    /// Path of the selected temporary directory inside `db_root`.
    fn path(&self, db_root: &Path) -> PathBuf {
        let dir_name = match *self {
            TempIndex::One => "temp_migration_1",
            TempIndex::Two => "temp_migration_2",
        };
        db_root.join(dir_name)
    }
}
/// Holds an ordered list of migrations and runs the ones a database still needs.
pub struct Manager {
// Settings handed to every migration and used to open the databases.
config: Config,
// Registered migrations; `add_migration` keeps their versions strictly increasing.
migrations: Vec<Box<dyn Migration>>,
}
impl Manager {
    /// Creates a manager that uses `config` and has no migrations registered yet.
    pub fn new(config: Config) -> Self {
        Manager {
            migrations: Vec::new(),
            config,
        }
    }

    /// Registers `migration` after all previously added ones.
    ///
    /// # Errors
    ///
    /// Fails with an `io::ErrorKind::Other` error when the migration's version is not
    /// strictly greater than that of the most recently added migration.
    pub fn add_migration<T: 'static>(&mut self, migration: T) -> io::Result<()> where T: Migration {
        let outdated = self
            .migrations
            .last()
            .map_or(false, |newest| migration.version() <= newest.version());
        if outdated {
            return Err(other_io_err("Cannot add migration."));
        }
        self.migrations.push(Box::new(migration));
        Ok(())
    }

    /// Runs every registered migration whose version is greater than `version`
    /// against the database at `old_path`.
    ///
    /// Returns the path of the database holding the final result: `old_path` when no
    /// executed migration rewrote data, otherwise the `temp_migration_1` or
    /// `temp_migration_2` directory located next to `old_path`.
    ///
    /// # Errors
    ///
    /// Fails when no migration applies to `version`, when a database path is not
    /// valid UTF-8, or when opening, altering or migrating a database fails.
    pub fn execute(&mut self, old_path: &Path, version: u32) -> io::Result<PathBuf> {
        // Cloned up front: the list returned by `migrations_from` keeps `self` mutably
        // borrowed for the rest of this function.
        let config = self.config.clone();
        let migrations = self.migrations_from(version);
        trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
        if migrations.is_empty() {
            return Err(other_io_err("Migration impossible"));
        }

        let columns = migrations.first().expect("checked empty above; qed").pre_columns();
        trace!(target: "migration", "Expecting database to contain {} columns", columns);
        let mut db_config = DatabaseConfig {
            max_open_files: 64,
            compaction: config.compaction_profile,
            columns,
            ..Default::default()
        };

        let db_root = database_path(old_path);
        let mut scratch = TempIndex::One;
        let mut result_path = old_path.to_path_buf();

        // The first migration reads from the existing database.
        let old_path_str = old_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
        let mut source_db = Arc::new(Database::open(&db_config, old_path_str)?);

        for migration in migrations {
            trace!(target: "migration", "starting migration to version {}", migration.version());
            // Remember how many columns the source has before switching the
            // configuration over to the layout this migration produces.
            let current_columns = db_config.columns;
            db_config.columns = migration.columns();

            if migration.alters_existing() {
                // Data is rewritten: copy column by column into a fresh temporary database.
                result_path = scratch.path(&db_root);
                let target_path_str = result_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
                let mut target_db = Database::open(&db_config, target_path_str)?;

                for col in 0..current_columns {
                    migration.migrate(Arc::clone(&source_db), &config, &mut target_db, col)?;
                }

                // The freshly written database is the source of the next migration,
                // which will write into the other temporary directory.
                source_db = Arc::new(target_db);
                scratch.swap();

                // Best effort: clear the directory the next rewrite will target.
                let _ = fs::remove_dir_all(scratch.path(&db_root));
            } else {
                // Only the column count changes, so adjust the current database in place.
                let goal_columns = migration.columns();
                while source_db.num_columns() < goal_columns {
                    source_db.add_column().map_err(other_io_err)?;
                }
                while source_db.num_columns() > goal_columns {
                    source_db.remove_last_column().map_err(other_io_err)?;
                }
            }
        }

        Ok(result_path)
    }

    /// Returns `true` when `version` is lower than the version of the newest
    /// registered migration; `false` when no migration is registered.
    pub fn is_needed(&self, version: u32) -> bool {
        self.migrations
            .last()
            .map_or(false, |newest| version < newest.version())
    }

    /// Collects, in registration order, the migrations whose version is greater
    /// than `version`.
    fn migrations_from(&mut self, version: u32) -> Vec<&mut Box<dyn Migration>> {
        let mut selected = Vec::new();
        for migration in self.migrations.iter_mut() {
            if migration.version() > version {
                selected.push(migration);
            }
        }
        selected
    }
}
/// Tick counter that reports progress once every `max` ticks.
pub struct Progress {
    // Ticks counted since the last report.
    current: usize,
    // Number of ticks between two reports.
    max: usize,
}

impl Default for Progress {
    /// Starts at zero and reports every 100 000 ticks.
    fn default() -> Self {
        let (current, max) = (0, 100_000);
        Progress { current, max }
    }
}
impl Progress {
    /// Counts one tick; when the count reaches `max` it is reset to zero and a `.`
    /// is emitted through the `flush!` macro.
    pub fn tick(&mut self) {
        let ticks = self.current + 1;
        if ticks == self.max {
            self.current = 0;
            flush!(".");
        } else {
            self.current = ticks;
        }
    }
}