migration_rocksdb/
lib.rs

1// Copyright 2015-2020 Parity Technologies (UK) Ltd.
2// This file is part of Tetsy Vapory.
3
4// Tetsy Vapory is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Tetsy Vapory is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Tetsy Vapory.  If not, see <http://www.gnu.org/licenses/>.
16
17//! DB Migration module.
18
19#[macro_use]
20extern crate log;
21#[macro_use]
22extern crate tetsy_macros;
23
24extern crate tetsy_kvdb;
25extern crate tetsy_kvdb_rocksdb;
26
27use std::collections::BTreeMap;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::{fs, io, error};
31
32use tetsy_kvdb::DBTransaction;
33use tetsy_kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
34
/// Wrap any error convertible into a boxed `std::error::Error` as an `io::Error`
/// of kind `Other`.
fn other_io_err<E>(e: E) -> io::Error
where
	E: Into<Box<dyn error::Error + Send + Sync>>,
{
	let kind = io::ErrorKind::Other;
	io::Error::new(kind, e)
}
38
39/// Migration config.
40#[derive(Clone)]
41pub struct Config {
42	/// Defines how many elements should be migrated at once.
43	pub batch_size: usize,
44	/// Database compaction profile.
45	pub compaction_profile: CompactionProfile,
46}
47
48impl Default for Config {
49	fn default() -> Self {
50		Config {
51			batch_size: 1024,
52			compaction_profile: Default::default(),
53		}
54	}
55}
56
/// Key-value pairs buffered for a single column and written to a database in bulk.
///
/// Pending entries are kept ordered by key until they are committed.
pub struct Batch {
	/// Pending entries, ordered by key.
	inner: BTreeMap<Vec<u8>, Vec<u8>>,
	/// Number of pending entries at which the batch commits itself.
	batch_size: usize,
	/// Destination column for every entry in this batch.
	column: u32,
}
63
64impl Batch {
65	/// Make a new batch with the given config.
66	pub fn new(config: &Config, column: u32) -> Self {
67		Batch {
68			inner: BTreeMap::new(),
69			batch_size: config.batch_size,
70			column,
71		}
72	}
73
74	/// Insert a value into the batch, committing if necessary.
75	pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> io::Result<()> {
76		self.inner.insert(key, value);
77		if self.inner.len() == self.batch_size {
78			self.commit(dest)?;
79		}
80		Ok(())
81	}
82
83	/// Commit all the items in the batch to the given database.
84	pub fn commit(&mut self, dest: &mut Database) -> io::Result<()> {
85		if self.inner.is_empty() { return Ok(()) }
86
87		let mut transaction = DBTransaction::new();
88
89		for keypair in &self.inner {
90			transaction.put(self.column, &keypair.0, &keypair.1);
91		}
92
93		self.inner.clear();
94		dest.write(transaction)
95	}
96}
97
98/// A generalized migration from the given db to a destination db.
99pub trait Migration {
100	/// Number of columns in the database before the migration.
101	fn pre_columns(&self) -> u32 { self.columns() }
102	/// Number of columns in database after the migration.
103	fn columns(&self) -> u32;
104	/// Whether this migration alters any existing columns.
105	/// if not, then column families will simply be added and `migrate` will never be called.
106	fn alters_existing(&self) -> bool { true }
107	/// Version of the database after the migration.
108	fn version(&self) -> u32;
109	/// Migrate a source to a destination.
110	fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: u32) -> io::Result<()>;
111}
112
/// A migration that rewrites the key-value pairs of exactly one column.
///
/// Every `SimpleMigration` is also a [`Migration`] through a blanket implementation,
/// which copies all other columns unchanged.
pub trait SimpleMigration {
	/// How many columns the database has once this migration has run.
	fn columns(&self) -> u32;
	/// The database version reached after this migration.
	fn version(&self) -> u32;
	/// Index of the single column whose entries are passed through `simple_migrate`.
	fn migrated_column_index(&self) -> u32;
	/// Transform one entry of the migrated column.
	///
	/// Return `Some((key, value))` to write the (possibly rewritten) entry into the new
	/// database, or `None` to drop it.
	fn simple_migrate(&mut self, key: Vec<u8>, value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)>;
}
125
126impl<T: SimpleMigration> Migration for T {
127	fn columns(&self) -> u32 { SimpleMigration::columns(self) }
128
129	fn alters_existing(&self) -> bool { true }
130
131	fn version(&self) -> u32 { SimpleMigration::version(self) }
132
133	fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: u32) -> io::Result<()> {
134		let migration_needed = col == SimpleMigration::migrated_column_index(self);
135		let mut batch = Batch::new(config, col);
136
137		for (key, value) in source.iter(col) {
138			if migration_needed {
139				if let Some((key, value)) = self.simple_migrate(key.into_vec(), value.into_vec()) {
140					batch.insert(key, value, dest)?;
141				}
142			} else {
143				batch.insert(key.into_vec(), value.into_vec(), dest)?;
144			}
145		}
146
147		batch.commit(dest)
148	}
149}
150
/// A migration that only changes how many column families the database has,
/// without rewriting any data.
pub struct ChangeColumns {
	/// Column count before the migration runs.
	pub pre_columns: u32,
	/// Column count after the migration has run.
	pub post_columns: u32,
	/// Database version reached after the migration.
	pub version: u32,
}
160
161impl Migration for ChangeColumns {
162	fn pre_columns(&self) -> u32 { self.pre_columns }
163	fn columns(&self) -> u32 { self.post_columns }
164	fn alters_existing(&self) -> bool { false }
165	fn version(&self) -> u32 { self.version }
166	fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: u32) -> io::Result<()> {
167		Ok(())
168	}
169}
170
/// Return the directory that contains the database at `path`, i.e. its parent.
///
/// If `path` has no parent (for example a filesystem root), `path` itself is returned.
fn database_path(path: &Path) -> PathBuf {
	match path.parent() {
		Some(parent) => parent.to_path_buf(),
		None => path.to_path_buf(),
	}
}
177
/// Which of the two scratch directories the next data-altering migration writes into.
///
/// Successive migrations alternate between `temp_migration_1` and `temp_migration_2`.
enum TempIndex {
	One,
	Two,
}

impl TempIndex {
	/// Flip to the other scratch directory.
	fn swap(&mut self) {
		*self = match *self {
			TempIndex::One => TempIndex::Two,
			TempIndex::Two => TempIndex::One,
		};
	}

	/// Path of this scratch database inside `db_root`, the directory holding all databases.
	fn path(&self, db_root: &Path) -> PathBuf {
		let dir_name = match *self {
			TempIndex::One => "temp_migration_1",
			TempIndex::Two => "temp_migration_2",
		};
		db_root.join(dir_name)
	}
}
203
204/// Manages database migration.
205pub struct Manager {
206	config: Config,
207	migrations: Vec<Box<dyn Migration>>,
208}
209
210impl Manager {
211	/// Creates new migration manager with given configuration.
212	pub fn new(config: Config) -> Self {
213		Manager {
214			config,
215			migrations: vec![],
216		}
217	}
218
219	/// Adds new migration rules.
220	pub fn add_migration<T: 'static>(&mut self, migration: T) -> io::Result<()> where T: Migration {
221		let is_new = match self.migrations.last() {
222			Some(last) => migration.version() > last.version(),
223			None => true,
224		};
225
226		match is_new {
227			true => Ok(self.migrations.push(Box::new(migration))),
228			false => Err(other_io_err("Cannot add migration.")),
229		}
230	}
231
232	/// Performs migration in order, starting with a source path, migrating between two temporary databases,
233	/// and producing a path where the final migration lives.
234	pub fn execute(&mut self, old_path: &Path, version: u32) -> io::Result<PathBuf> {
235		let config = self.config.clone();
236		let migrations = self.migrations_from(version);
237		trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
238		if migrations.is_empty() {
239			return Err(other_io_err("Migration impossible"));
240		};
241
242		let columns = migrations.first().expect("checked empty above; qed").pre_columns();
243		trace!(target: "migration", "Expecting database to contain {} columns", columns);
244		let mut db_config = DatabaseConfig {
245			max_open_files: 64,
246			compaction: config.compaction_profile,
247			columns,
248			..Default::default()
249		};
250
251		let db_root = database_path(old_path);
252		let mut temp_idx = TempIndex::One;
253		let mut temp_path = old_path.to_path_buf();
254
255		// start with the old db.
256		let old_path_str = old_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
257		let mut cur_db = Arc::new(Database::open(&db_config, old_path_str)?);
258
259		for migration in migrations {
260			trace!(target: "migration", "starting migration to version {}", migration.version());
261			// Change number of columns in new db
262			let current_columns = db_config.columns;
263			db_config.columns = migration.columns();
264
265			// slow migrations: alter existing data.
266			if migration.alters_existing() {
267				temp_path = temp_idx.path(&db_root);
268
269				// open the target temporary database.
270				let temp_path_str = temp_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
271				let mut new_db = Database::open(&db_config, temp_path_str)?;
272
273				for col in 0..current_columns {
274					migration.migrate(cur_db.clone(), &config, &mut new_db, col)?
275				}
276
277				// next iteration, we will migrate from this db into the other temp.
278				cur_db = Arc::new(new_db);
279				temp_idx.swap();
280
281				// remove the other temporary migration database.
282				let _ = fs::remove_dir_all(temp_idx.path(&db_root));
283			} else {
284				// migrations which simply add or remove column families.
285				// we can do this in-place.
286				let goal_columns = migration.columns();
287				while cur_db.num_columns() < goal_columns {
288					cur_db.add_column().map_err(other_io_err)?;
289				}
290
291				while cur_db.num_columns() > goal_columns {
292					cur_db.remove_last_column().map_err(other_io_err)?;
293				}
294			}
295		}
296		Ok(temp_path)
297	}
298
299	/// Returns true if migration is needed.
300	pub fn is_needed(&self, version: u32) -> bool {
301		match self.migrations.last() {
302			Some(last) => version < last.version(),
303			None => false,
304		}
305	}
306
307	/// Find all needed migrations.
308	fn migrations_from(&mut self, version: u32) -> Vec<&mut Box<dyn Migration>> {
309		self.migrations.iter_mut().filter(|m| m.version() > version).collect()
310	}
311}
312
/// A minimal progress meter that prints a dot every `max` ticks.
pub struct Progress {
	/// Ticks counted since the last dot was printed.
	current: usize,
	/// Number of ticks between dots.
	max: usize,
}

impl Default for Progress {
	/// A fresh meter that prints a dot every 100 000 ticks.
	fn default() -> Self {
		Self { current: 0, max: 100_000 }
	}
}
327
328impl Progress {
329	/// Tick progress meter.
330	pub fn tick(&mut self) {
331		self.current += 1;
332		if self.current == self.max {
333			self.current = 0;
334			flush!(".");
335		}
336	}
337}