ckb_db_migration/
lib.rs

//! Database schema migration framework for CKB's RocksDB-based storage.
//!
//! Migrations are registered in a `Migrations` collection and applied either in the
//! foreground or, for migrations that support it, in a background worker thread.
use ckb_channel::Receiver;
use ckb_channel::select;
use ckb_channel::unbounded;
use ckb_db::{ReadOnlyDB, RocksDB};
use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
use console::Term;
pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::thread;
use std::thread::JoinHandle;

/// Shutdown flag for background migration.
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::new();

#[cfg(test)]
mod tests;

fn internal_error(reason: String) -> Error {
    InternalErrorKind::Database.other(reason).into()
}

/// A registry of database schema migrations, ordered by version.
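///
/// A minimal usage sketch (not compiled as a doctest); `SomeMigration` and the database
/// handle `db` are illustrative placeholders:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let mut migrations = Migrations::new();
/// migrations.add_migration(Arc::new(DefaultMigration::new("20210609195048")));
/// migrations.add_migration(Arc::new(SomeMigration::default()));
///
/// // Run all pending migrations in the foreground.
/// let db = migrations.migrate(db, false)?;
/// ```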
#[derive(Default)]
pub struct Migrations {
    migrations: BTreeMap<String, Arc<dyn Migration>>,
}

/// Commands sent to the background migration worker.
#[derive(PartialEq, Eq, Debug)]
enum Command {
    Start,
    Stop,
}

type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;
struct MigrationWorker {
    tasks: Arc<Mutex<MigrationTasks>>,
    db: RocksDB,
    inbox: Receiver<Command>,
}

impl MigrationWorker {
    pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
        Self { tasks, db, inbox }
    }

    pub fn start(self) -> JoinHandle<()> {
        thread::spawn(move || {
            if let Ok(Command::Start) = self.inbox.recv() {
                let mut idx = 0;
                let migrations_count = self.tasks.lock().unwrap().len() as u64;
                let mpb = Arc::new(MultiProgress::new());

                while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
                    select! {
                        recv(self.inbox) -> msg => {
                            if let Ok(Command::Stop) = msg {
                                eprintln!("stop to run migrate in background: {}", name);
                                break;
                            }
                        }
                        default => {
                            eprintln!("start to run migrate in background: {}", name);
                            let mpbc = Arc::clone(&mpb);
                            idx += 1;
                            let pb = move |count: u64| -> ProgressBar {
                                let pb = mpbc.add(ProgressBar::new(count));
                                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
                                pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
                                pb
                            };
                            if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
                                db.put_default(MIGRATION_VERSION_KEY, task.version())
                                    .map_err(|err| {
                                        internal_error(format!("failed to migrate the database: {err}"))
                                    })
                                    .unwrap();
                            }
                        }
                    }
                }
            }
        })
    }
}

impl Migrations {
    /// Creates an empty set of migrations.
    pub fn new() -> Self {
        Migrations {
            migrations: BTreeMap::new(),
        }
    }

    /// Registers a migration, keyed by its version string.
    pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
        self.migrations
            .insert(migration.version().to_string(), migration);
    }

    /// Check if the database's version matches the version expected by the executable binary.
    ///
    /// Returns
    /// - `Less`: The database version is older than the version expected by the executable binary.
    ///   A migration is required.
    /// - `Equal`: The database version matches the version expected by the executable binary.
    /// - `Greater`: The database version is newer than the version expected by the executable binary.
    ///   The executable binary must be upgraded.
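    ///
    /// A sketch of how a caller might react to the result (the `migrations` and read-only
    /// database handle `db` are illustrative):
    ///
    /// ```ignore
    /// use std::cmp::Ordering;
    ///
    /// match migrations.check(&db, true) {
    ///     Ordering::Less => { /* run pending migrations */ }
    ///     Ordering::Equal => { /* database is up to date */ }
    ///     Ordering::Greater => { /* ask the user to upgrade ckb */ }
    /// }
    /// ```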
    pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // if version is none, but db is not empty
                // patch 220464f
                if self.is_non_empty_rdb(db) {
                    return Ordering::Less;
                } else {
                    return Ordering::Equal;
                }
            }
        };
        debug!("Current database version [{}]", db_version);

        let migrations = self
            .migrations
            .values()
            .filter(|m| include_background || !m.run_in_background());

        let latest_version = migrations
            .last()
            .unwrap_or_else(|| panic!("should have at least one version"))
            .version();
        debug!("Latest database version [{}]", latest_version);

        db_version.as_str().cmp(latest_version)
    }

    /// Check if the migrations will consume a lot of time.
    pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // if version is none, but db is not empty
                // patch 220464f
                return self.is_non_empty_rdb(db);
            }
        };

        let migrations = self
            .migrations
            .values()
            .filter(|m| include_background || !m.run_in_background());

        migrations
            .skip_while(|m| m.version() <= db_version.as_str())
            .any(|m| m.expensive())
    }

    /// Check if all the pending migrations will be executed in the background.
    pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // if version is none, but db is not empty
                // patch 220464f
                return self.is_non_empty_rdb(db);
            }
        };

        self.migrations
            .values()
            .skip_while(|m| m.version() <= db_version.as_str())
            .all(|m| m.run_in_background())
    }

    fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
            if v.is_some() {
                return true;
            }
        }
        false
    }

    fn is_non_empty_db(&self, db: &RocksDB) -> bool {
        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
            if v.is_some() {
                return true;
            }
        }
        false
    }

    fn run_migrate(&self, mut db: RocksDB, v: &str) -> Result<RocksDB, Error> {
        let mpb = Arc::new(MultiProgress::new());
        let migrations: BTreeMap<_, _> = self
            .migrations
            .iter()
            .filter(|(mv, _)| mv.as_str() > v)
            .collect();
        let migrations_count = migrations.len();
        for (idx, (_, m)) in migrations.iter().enumerate() {
            let mpbc = Arc::clone(&mpb);
            let pb = move |count: u64| -> ProgressBar {
                let pb = mpbc.add(ProgressBar::new(count));
                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
                pb.set_prefix(format!("[{}/{}]", idx + 1, migrations_count));
                pb
            };
            db = m.migrate(db, Arc::new(pb))?;
            db.put_default(MIGRATION_VERSION_KEY, m.version())
                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
        }
        mpb.join_and_clear().expect("MultiProgress join");
        Ok(db)
    }

    fn run_migrate_async(&self, db: RocksDB, v: &str) {
        let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
            .migrations
            .iter()
            .filter(|(mv, _)| mv.as_str() > v)
            .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
            .collect::<VecDeque<_>>();

        let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
        let tasks = Arc::new(Mutex::new(migrations));
        let (tx, rx) = unbounded();
        let worker = MigrationWorker::new(tasks, db, rx);

        let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
        let clone = v.to_string();
        let tx_clone = tx.clone();
        let notifier = thread::spawn(move || {
            let _ = exit_signal.recv();
            let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
            let _ = tx_clone.send(Command::Stop);
            info!("set shutdown flag to true: {:?} version: {}", res, clone);
        });
        register_thread("migration-notifier", notifier);

        let handler = worker.start();
        if all_can_resume {
            info!("register thread: migration ....");
            register_thread("migration", handler);
        }
        tx.send(Command::Start).expect("send start command");
    }

    fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
        let raw = db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .map_err(|err| {
                internal_error(format!("failed to get the version of database: {err}"))
            })?;

        Ok(raw.map(|version_bytes| {
            String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
        }))
    }

    /// Initialize the database version to the latest migration version if it is not set yet.
    pub fn init_db_version(&self, db: &RocksDB) -> Result<(), Error> {
        let db_version = self.get_migration_version(db)?;
        if db_version.is_none() {
            if let Some(m) = self.migrations.values().last() {
                info!("Init database version {}", m.version());
                db.put_default(MIGRATION_VERSION_KEY, m.version())
                    .map_err(|err| {
                        internal_error(format!("failed to migrate the database: {err}"))
                    })?;
            }
        }
        Ok(())
    }

    /// Run all migrations with a version greater than the current database version.
    ///
    /// If `run_in_background` is true, the pending migrations are handed off to a
    /// background worker thread and the database is returned immediately; otherwise
    /// the migrations are applied before this function returns.
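    ///
    /// A usage sketch (not compiled as a doctest); opening the database is elided:
    ///
    /// ```ignore
    /// // Apply every pending migration synchronously before the node starts.
    /// let db = migrations.migrate(db, false)?;
    ///
    /// // Or let migrations that support it run in a background thread.
    /// let db = migrations.migrate(db, true)?;
    /// ```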
    pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
        let db_version = self.get_migration_version(&db)?;
        match db_version {
            Some(ref v) => {
                info!("Current database version {}", v);
                self.check_migration_downgrade(v)?;
                let db = if !run_in_background {
                    self.run_migrate(db, v.as_str())?
                } else {
                    self.run_migrate_async(db.clone(), v.as_str());
                    db
                };
                Ok(db)
            }
            None => {
                // if version is none, but db is not empty
                // patch 220464f
                if self.is_non_empty_db(&db) {
                    return self.patch_220464f(db);
                }
                Ok(db)
            }
        }
    }

    fn patch_220464f(&self, db: RocksDB) -> Result<RocksDB, Error> {
        const V: &str = "20210609195048"; // AddExtraDataHash - 1
        self.run_migrate(db, V)
    }

    fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
        if let Some(m) = self.migrations.values().last() {
            if m.version() < cur_version {
                error!(
                    "Database downgrade detected. \
                    The database schema version is newer than the `ckb` schema version, \
                    please upgrade `ckb` to the latest version"
                );
                return Err(internal_error(
                    "Database downgrade is not supported".to_string(),
                ));
            }
        }
        Ok(())
    }
}

/// A single database schema migration step.
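///
/// A minimal sketch of a custom migration (illustrative only, not compiled as a
/// doctest); the type name, version string, and column rewriting are placeholders:
///
/// ```ignore
/// struct AddNewIndex;
///
/// impl Migration for AddNewIndex {
///     fn migrate(
///         &self,
///         db: RocksDB,
///         _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
///     ) -> Result<RocksDB, Error> {
///         // ... rewrite the affected columns here ...
///         Ok(db)
///     }
///
///     fn version(&self) -> &str {
///         "20240101000000"
///     }
/// }
/// ```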
pub trait Migration: Send + Sync {
    /// Apply this migration to the database and return the migrated database.
    fn migrate(
        &self,
        _db: RocksDB,
        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
    ) -> Result<RocksDB, Error>;

    /// Returns the migration version; implementors should use the `date +'%Y%m%d%H%M%S'` timestamp format.
    fn version(&self) -> &str;


    /// Whether performing this migration will take a long time.
    ///
    /// Override this function to return `false` for migrations that execute very quickly.
    fn expensive(&self) -> bool {
        true
    }

    /// Whether this migration will be executed in the background.
    fn run_in_background(&self) -> bool {
        false
    }

    /// Check if the background migration should be stopped.
    ///
    /// If a migration needs to implement recovery logic, it should check this flag periodically,
    /// store the migration progress when exiting, and resume from that progress when restarting.
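    ///
    /// A sketch of how a resumable migration might poll this flag inside its `migrate`
    /// implementation (`work_items`, `process`, and `save_progress` are illustrative
    /// helpers, not part of this crate):
    ///
    /// ```ignore
    /// for chunk in work_items.chunks(10_000) {
    ///     process(chunk);
    ///     if self.stop_background() {
    ///         save_progress(&db)?;
    ///         return Ok(db);
    ///     }
    /// }
    /// ```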
    fn stop_background(&self) -> bool {
        *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
    }

    /// Check if the background migration can be resumed.
    ///
    /// If a migration can be resumed, it should implement the recovery logic in its `migrate`
    /// function, and the `MigrationWorker` will register the migration's handle with
    /// `register_thread`, so that the main thread can wait for the background migration to
    /// store its progress and exit.
    ///
    /// Otherwise, the migration will be restarted from the beginning.
    fn can_resume(&self) -> bool {
        false
    }
}

/// A no-op migration that only records a new database schema version.
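///
/// A usage sketch; the version string is taken from elsewhere in this crate and is
/// only illustrative here:
///
/// ```ignore
/// let migration = DefaultMigration::new("20210609195048");
/// assert_eq!(migration.version(), "20210609195048");
/// assert!(!migration.expensive());
/// ```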
pub struct DefaultMigration {
    version: String,
}

impl DefaultMigration {
    /// Creates a new `DefaultMigration` with the given version.
    pub fn new(version: &str) -> Self {
        Self {
            version: version.to_string(),
        }
    }
}

impl Migration for DefaultMigration {
    fn migrate(
        &self,
        db: RocksDB,
        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
    ) -> Result<RocksDB, Error> {
        Ok(db)
    }

    fn version(&self) -> &str {
        &self.version
    }

    fn expensive(&self) -> bool {
        false
    }
}