ckb_db_migration/
lib.rs

1//! Database migration framework.
2//!
3//! This crate provides a migration framework for managing database schema changes
4//! and data transformations across different versions of CKB.
5use ckb_channel::Receiver;
6use ckb_channel::select;
7use ckb_channel::unbounded;
8use ckb_db::{ReadOnlyDB, RocksDB};
9use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
10use ckb_error::{Error, InternalErrorKind};
11use ckb_logger::{debug, error, info};
12use ckb_stop_handler::register_thread;
13use console::Term;
14pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
15use std::cmp::Ordering;
16use std::collections::BTreeMap;
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::sync::Mutex;
20use std::sync::OnceLock;
21use std::thread;
22use std::thread::JoinHandle;
23
/// Process-wide flag asking background migrations to stop.
///
/// It starts out unset and is set to `true` once the exit signal is received;
/// migrations observe it through [`Migration::stop_background`].
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::<bool>::new();
26
// Unit tests for this crate live in the `tests` submodule and are compiled
// only for test builds.
#[cfg(test)]
mod tests;
29
30fn internal_error(reason: String) -> Error {
31    InternalErrorKind::Database.other(reason).into()
32}
33
34/// Collection of database migrations.
35#[derive(Default)]
36pub struct Migrations {
37    migrations: BTreeMap<String, Arc<dyn Migration>>,
38}
39
/// Control messages delivered to a [`MigrationWorker`] through its inbox.
#[derive(Debug, PartialEq, Eq)]
enum Command {
    /// Begin processing the queued migration tasks.
    Start,
    /// Stop before running the next queued task.
    Stop,
}
46
47type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;
48struct MigrationWorker {
49    tasks: Arc<Mutex<MigrationTasks>>,
50    db: RocksDB,
51    inbox: Receiver<Command>,
52}
53
54impl MigrationWorker {
55    pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
56        Self { tasks, db, inbox }
57    }
58
59    pub fn start(self) -> JoinHandle<()> {
60        thread::spawn(move || {
61            if let Ok(Command::Start) = self.inbox.recv() {
62                let mut idx = 0;
63                let migrations_count = self.tasks.lock().unwrap().len() as u64;
64                let mpb = Arc::new(MultiProgress::new());
65
66                while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
67                    select! {
68                        recv(self.inbox) -> msg => {
69                            if let Ok(Command::Stop) = msg {
70                                eprintln!("stop to run migrate in background: {}", name);
71                                break;
72                            }
73                        }
74                        default => {
75                            eprintln!("start to run migrate in background: {}", name);
76                            let mpbc = Arc::clone(&mpb);
77                            idx += 1;
78                            let pb = move |count: u64| -> ProgressBar {
79                                let pb = mpbc.add(ProgressBar::new(count));
80                                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), 20));
81                                pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
82                                pb
83                            };
84                            if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
85                                db.put_default(MIGRATION_VERSION_KEY, task.version())
86                                .map_err(|err| {
87                                    internal_error(format!("failed to migrate the database: {err}"))
88                                })
89                                .unwrap();
90                            }
91                        }
92                    }
93                }
94            }
95        })
96    }
97}
98
99impl Migrations {
100    /// Creates a new empty migrations collection.
101    pub fn new() -> Self {
102        Migrations {
103            migrations: BTreeMap::new(),
104        }
105    }
106
107    /// Adds a migration to the collection.
108    pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
109        self.migrations
110            .insert(migration.version().to_string(), migration);
111    }
112
113    /// Check if database's version is matched with the executable binary version.
114    ///
115    /// Returns
116    /// - Less: The database version is less than the matched version of the executable binary.
117    ///   Requires migration.
118    /// - Equal: The database version is matched with the executable binary version.
119    /// - Greater: The database version is greater than the matched version of the executable binary.
120    ///   Requires upgrade the executable binary.
121    pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
122        let db_version = match db
123            .get_pinned_default(MIGRATION_VERSION_KEY)
124            .expect("get the version of database")
125        {
126            Some(version_bytes) => {
127                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
128            }
129            None => {
130                // if version is none, but db is not empty
131                // patch 220464f
132                if self.is_non_empty_rdb(db) {
133                    return Ordering::Less;
134                } else {
135                    return Ordering::Equal;
136                }
137            }
138        };
139        debug!("Current database version [{}]", db_version);
140
141        let migrations = self
142            .migrations
143            .values()
144            .filter(|m| include_background || !m.run_in_background());
145
146        let latest_version = migrations
147            .last()
148            .unwrap_or_else(|| panic!("should have at least one version"))
149            .version();
150        debug!("Latest database version [{}]", latest_version);
151
152        db_version.as_str().cmp(latest_version)
153    }
154
155    /// Check if the migrations will consume a lot of time.
156    pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
157        let db_version = match db
158            .get_pinned_default(MIGRATION_VERSION_KEY)
159            .expect("get the version of database")
160        {
161            Some(version_bytes) => {
162                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
163            }
164            None => {
165                // if version is none, but db is not empty
166                // patch 220464f
167                return self.is_non_empty_rdb(db);
168            }
169        };
170
171        let migrations = self
172            .migrations
173            .values()
174            .filter(|m| include_background || !m.run_in_background());
175
176        migrations
177            .skip_while(|m| m.version() <= db_version.as_str())
178            .any(|m| m.expensive())
179    }
180
181    /// Check if all the pending migrations will be executed in background.
182    pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
183        let db_version = match db
184            .get_pinned_default(MIGRATION_VERSION_KEY)
185            .expect("get the version of database")
186        {
187            Some(version_bytes) => {
188                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
189            }
190            None => {
191                // if version is none, but db is not empty
192                // patch 220464f
193                return self.is_non_empty_rdb(db);
194            }
195        };
196
197        self.migrations
198            .values()
199            .skip_while(|m| m.version() <= db_version.as_str())
200            .all(|m| m.run_in_background())
201    }
202
203    fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
204        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
205            if v.is_some() {
206                return true;
207            }
208        }
209        false
210    }
211
212    fn is_non_empty_db(&self, db: &RocksDB) -> bool {
213        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
214            if v.is_some() {
215                return true;
216            }
217        }
218        false
219    }
220
221    fn run_migrate(&self, mut db: RocksDB, v: &str) -> Result<RocksDB, Error> {
222        let mpb = Arc::new(MultiProgress::new());
223        let migrations: BTreeMap<_, _> = self
224            .migrations
225            .iter()
226            .filter(|(mv, _)| mv.as_str() > v)
227            .collect();
228        let migrations_count = migrations.len();
229        for (idx, (_, m)) in migrations.iter().enumerate() {
230            let mpbc = Arc::clone(&mpb);
231            let pb = move |count: u64| -> ProgressBar {
232                let pb = mpbc.add(ProgressBar::new(count));
233                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), 20));
234                pb.set_prefix(format!("[{}/{}]", idx + 1, migrations_count));
235                pb
236            };
237            db = m.migrate(db, Arc::new(pb))?;
238            db.put_default(MIGRATION_VERSION_KEY, m.version())
239                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
240        }
241        Ok(db)
242    }
243
244    fn run_migrate_async(&self, db: RocksDB, v: &str) {
245        let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
246            .migrations
247            .iter()
248            .filter(|(mv, _)| mv.as_str() > v)
249            .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
250            .collect::<VecDeque<_>>();
251
252        let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
253        let tasks = Arc::new(Mutex::new(migrations));
254        let (tx, rx) = unbounded();
255        let worker = MigrationWorker::new(tasks, db, rx);
256
257        let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
258        let clone = v.to_string();
259        let tx_clone = tx.clone();
260        let notifier = thread::spawn(move || {
261            let _ = exit_signal.recv();
262            let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
263            let _ = tx_clone.send(Command::Stop);
264            info!("set shutdown flag to true: {:?} version: {}", res, clone);
265        });
266        register_thread("migration-notifier", notifier);
267
268        let handler = worker.start();
269        if all_can_resume {
270            info!("register thread: migration ....");
271            register_thread("migration", handler);
272        }
273        tx.send(Command::Start).expect("send start command");
274    }
275
276    fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
277        let raw = db
278            .get_pinned_default(MIGRATION_VERSION_KEY)
279            .map_err(|err| {
280                internal_error(format!("failed to get the version of database: {err}"))
281            })?;
282
283        Ok(raw.map(|version_bytes| {
284            String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
285        }))
286    }
287
288    /// Initial db version
289    pub fn init_db_version(&self, db: &RocksDB) -> Result<(), Error> {
290        let db_version = self.get_migration_version(db)?;
291        if db_version.is_none() {
292            if let Some(m) = self.migrations.values().last() {
293                info!("Init database version {}", m.version());
294                db.put_default(MIGRATION_VERSION_KEY, m.version())
295                    .map_err(|err| {
296                        internal_error(format!("failed to migrate the database: {err}"))
297                    })?;
298            }
299        }
300        Ok(())
301    }
302
303    /// Runs pending migrations on the database.
304    ///
305    /// If `run_in_background` is true, long-running migrations will be executed asynchronously.
306    pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
307        let db_version = self.get_migration_version(&db)?;
308        match db_version {
309            Some(ref v) => {
310                info!("Current database version {}", v);
311                self.check_migration_downgrade(v)?;
312                let db = if !run_in_background {
313                    self.run_migrate(db, v.as_str())?
314                } else {
315                    self.run_migrate_async(db.clone(), v.as_str());
316                    db
317                };
318                Ok(db)
319            }
320            None => {
321                // if version is none, but db is not empty
322                // patch 220464f
323                if self.is_non_empty_db(&db) {
324                    return self.patch_220464f(db);
325                }
326                Ok(db)
327            }
328        }
329    }
330
331    fn patch_220464f(&self, db: RocksDB) -> Result<RocksDB, Error> {
332        const V: &str = "20210609195048"; // AddExtraDataHash - 1
333        self.run_migrate(db, V)
334    }
335
336    fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
337        if let Some(m) = self.migrations.values().last() {
338            if m.version() < cur_version {
339                error!(
340                    "Database downgrade detected. \
341                    The database schema version is newer than `ckb` schema version,\
342                    please upgrade `ckb` to the latest version"
343                );
344                return Err(internal_error(
345                    "Database downgrade is not supported".to_string(),
346                ));
347            }
348        }
349        Ok(())
350    }
351}
352
353/// Trait for database migrations.
354pub trait Migration: Send + Sync {
355    /// Executes the migration on the database.
356    ///
357    /// The `pb` parameter is a progress bar factory for tracking migration progress.
358    fn migrate(
359        &self,
360        _db: RocksDB,
361        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
362    ) -> Result<RocksDB, Error>;
363
364    /// returns migration version, use `date +'%Y%m%d%H%M%S'` timestamp format
365    fn version(&self) -> &str;
366
367    /// Will cost a lot of time to perform this migration operation.
368    ///
369    /// Override this function for `Migrations` which could be executed very fast.
370    fn expensive(&self) -> bool {
371        true
372    }
373
374    /// Will this migration be executed in background.
375    fn run_in_background(&self) -> bool {
376        false
377    }
378
379    /// Check if the background migration should be stopped.
380    /// If a migration need to implement the recovery logic, it should check this flag periodically,
381    /// store the migration progress when exiting and recover from the current progress when restarting.
382    fn stop_background(&self) -> bool {
383        *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
384    }
385
386    /// Check if the background migration can be resumed.
387    ///
388    /// If a migration can be resumed, it should implement the recovery logic in `migrate` function.
389    /// and the `MigirateWorker` will add the migration's handler with `register_thread`, so that then
390    /// main thread can wait for the background migration to store the progress and exit.
391    ///
392    /// Otherwise, the migration will be restarted from the beginning.
393    ///
394    fn can_resume(&self) -> bool {
395        false
396    }
397}
398
/// A no-op migration identified only by its version.
///
/// It leaves the database untouched. Running it only causes the framework to
/// record its version.
pub struct DefaultMigration {
    version: String,
}

impl DefaultMigration {
    /// Builds a no-op migration whose version is `version`.
    pub fn new(version: &str) -> Self {
        let version = String::from(version);
        Self { version }
    }
}
412
413impl Migration for DefaultMigration {
414    fn migrate(
415        &self,
416        db: RocksDB,
417        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
418    ) -> Result<RocksDB, Error> {
419        Ok(db)
420    }
421
422    fn version(&self) -> &str {
423        &self.version
424    }
425
426    fn expensive(&self) -> bool {
427        false
428    }
429}