1#[macro_use]
20extern crate log;
21#[macro_use]
22extern crate tetsy_macros;
23
24extern crate tetsy_kvdb;
25extern crate tetsy_kvdb_rocksdb;
26
27use std::collections::BTreeMap;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::{fs, io, error};
31
32use tetsy_kvdb::DBTransaction;
33use tetsy_kvdb_rocksdb::{CompactionProfile, Database, DatabaseConfig};
34
35fn other_io_err<E>(e: E) -> io::Error where E: Into<Box<dyn error::Error + Send + Sync>> {
36 io::Error::new(io::ErrorKind::Other, e)
37}
38
39#[derive(Clone)]
41pub struct Config {
42 pub batch_size: usize,
44 pub compaction_profile: CompactionProfile,
46}
47
48impl Default for Config {
49 fn default() -> Self {
50 Config {
51 batch_size: 1024,
52 compaction_profile: Default::default(),
53 }
54 }
55}
56
57pub struct Batch {
59 inner: BTreeMap<Vec<u8>, Vec<u8>>,
60 batch_size: usize,
61 column: u32,
62}
63
64impl Batch {
65 pub fn new(config: &Config, column: u32) -> Self {
67 Batch {
68 inner: BTreeMap::new(),
69 batch_size: config.batch_size,
70 column,
71 }
72 }
73
74 pub fn insert(&mut self, key: Vec<u8>, value: Vec<u8>, dest: &mut Database) -> io::Result<()> {
76 self.inner.insert(key, value);
77 if self.inner.len() == self.batch_size {
78 self.commit(dest)?;
79 }
80 Ok(())
81 }
82
83 pub fn commit(&mut self, dest: &mut Database) -> io::Result<()> {
85 if self.inner.is_empty() { return Ok(()) }
86
87 let mut transaction = DBTransaction::new();
88
89 for keypair in &self.inner {
90 transaction.put(self.column, &keypair.0, &keypair.1);
91 }
92
93 self.inner.clear();
94 dest.write(transaction)
95 }
96}
97
98pub trait Migration {
100 fn pre_columns(&self) -> u32 { self.columns() }
102 fn columns(&self) -> u32;
104 fn alters_existing(&self) -> bool { true }
107 fn version(&self) -> u32;
109 fn migrate(&mut self, source: Arc<Database>, config: &Config, destination: &mut Database, col: u32) -> io::Result<()>;
111}
112
113pub trait SimpleMigration {
115 fn columns(&self) -> u32;
117 fn version(&self) -> u32;
119 fn migrated_column_index(&self) -> u32;
121 fn simple_migrate(&mut self, key: Vec<u8>, value: Vec<u8>) -> Option<(Vec<u8>, Vec<u8>)>;
124}
125
126impl<T: SimpleMigration> Migration for T {
127 fn columns(&self) -> u32 { SimpleMigration::columns(self) }
128
129 fn alters_existing(&self) -> bool { true }
130
131 fn version(&self) -> u32 { SimpleMigration::version(self) }
132
133 fn migrate(&mut self, source: Arc<Database>, config: &Config, dest: &mut Database, col: u32) -> io::Result<()> {
134 let migration_needed = col == SimpleMigration::migrated_column_index(self);
135 let mut batch = Batch::new(config, col);
136
137 for (key, value) in source.iter(col) {
138 if migration_needed {
139 if let Some((key, value)) = self.simple_migrate(key.into_vec(), value.into_vec()) {
140 batch.insert(key, value, dest)?;
141 }
142 } else {
143 batch.insert(key.into_vec(), value.into_vec(), dest)?;
144 }
145 }
146
147 batch.commit(dest)
148 }
149}
150
151pub struct ChangeColumns {
153 pub pre_columns: u32,
155 pub post_columns: u32,
157 pub version: u32,
159}
160
161impl Migration for ChangeColumns {
162 fn pre_columns(&self) -> u32 { self.pre_columns }
163 fn columns(&self) -> u32 { self.post_columns }
164 fn alters_existing(&self) -> bool { false }
165 fn version(&self) -> u32 { self.version }
166 fn migrate(&mut self, _: Arc<Database>, _: &Config, _: &mut Database, _: u32) -> io::Result<()> {
167 Ok(())
168 }
169}
170
171fn database_path(path: &Path) -> PathBuf {
173 let mut temp_path = path.to_owned();
174 temp_path.pop();
175 temp_path
176}
177
178enum TempIndex {
179 One,
180 Two,
181}
182
183impl TempIndex {
184 fn swap(&mut self) {
185 match *self {
186 TempIndex::One => *self = TempIndex::Two,
187 TempIndex::Two => *self = TempIndex::One,
188 }
189 }
190
191 fn path(&self, db_root: &Path) -> PathBuf {
193 let mut buf = db_root.to_owned();
194
195 match *self {
196 TempIndex::One => buf.push("temp_migration_1"),
197 TempIndex::Two => buf.push("temp_migration_2"),
198 };
199
200 buf
201 }
202}
203
204pub struct Manager {
206 config: Config,
207 migrations: Vec<Box<dyn Migration>>,
208}
209
210impl Manager {
211 pub fn new(config: Config) -> Self {
213 Manager {
214 config,
215 migrations: vec![],
216 }
217 }
218
219 pub fn add_migration<T: 'static>(&mut self, migration: T) -> io::Result<()> where T: Migration {
221 let is_new = match self.migrations.last() {
222 Some(last) => migration.version() > last.version(),
223 None => true,
224 };
225
226 match is_new {
227 true => Ok(self.migrations.push(Box::new(migration))),
228 false => Err(other_io_err("Cannot add migration.")),
229 }
230 }
231
232 pub fn execute(&mut self, old_path: &Path, version: u32) -> io::Result<PathBuf> {
235 let config = self.config.clone();
236 let migrations = self.migrations_from(version);
237 trace!(target: "migration", "Total migrations to execute for version {}: {}", version, migrations.len());
238 if migrations.is_empty() {
239 return Err(other_io_err("Migration impossible"));
240 };
241
242 let columns = migrations.first().expect("checked empty above; qed").pre_columns();
243 trace!(target: "migration", "Expecting database to contain {} columns", columns);
244 let mut db_config = DatabaseConfig {
245 max_open_files: 64,
246 compaction: config.compaction_profile,
247 columns,
248 ..Default::default()
249 };
250
251 let db_root = database_path(old_path);
252 let mut temp_idx = TempIndex::One;
253 let mut temp_path = old_path.to_path_buf();
254
255 let old_path_str = old_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
257 let mut cur_db = Arc::new(Database::open(&db_config, old_path_str)?);
258
259 for migration in migrations {
260 trace!(target: "migration", "starting migration to version {}", migration.version());
261 let current_columns = db_config.columns;
263 db_config.columns = migration.columns();
264
265 if migration.alters_existing() {
267 temp_path = temp_idx.path(&db_root);
268
269 let temp_path_str = temp_path.to_str().ok_or_else(|| other_io_err("Migration impossible."))?;
271 let mut new_db = Database::open(&db_config, temp_path_str)?;
272
273 for col in 0..current_columns {
274 migration.migrate(cur_db.clone(), &config, &mut new_db, col)?
275 }
276
277 cur_db = Arc::new(new_db);
279 temp_idx.swap();
280
281 let _ = fs::remove_dir_all(temp_idx.path(&db_root));
283 } else {
284 let goal_columns = migration.columns();
287 while cur_db.num_columns() < goal_columns {
288 cur_db.add_column().map_err(other_io_err)?;
289 }
290
291 while cur_db.num_columns() > goal_columns {
292 cur_db.remove_last_column().map_err(other_io_err)?;
293 }
294 }
295 }
296 Ok(temp_path)
297 }
298
299 pub fn is_needed(&self, version: u32) -> bool {
301 match self.migrations.last() {
302 Some(last) => version < last.version(),
303 None => false,
304 }
305 }
306
307 fn migrations_from(&mut self, version: u32) -> Vec<&mut Box<dyn Migration>> {
309 self.migrations.iter_mut().filter(|m| m.version() > version).collect()
310 }
311}
312
313pub struct Progress {
315 current: usize,
316 max: usize,
317}
318
319impl Default for Progress {
320 fn default() -> Self {
321 Progress {
322 current: 0,
323 max: 100_000,
324 }
325 }
326}
327
328impl Progress {
329 pub fn tick(&mut self) {
331 self.current += 1;
332 if self.current == self.max {
333 self.current = 0;
334 flush!(".");
335 }
336 }
337}