Skip to main content

ckb_db_migration/
lib.rs

1//! Database migration framework.
2//!
3//! This crate provides a migration framework for managing database schema changes
4//! and data transformations across different versions of CKB.
5use ckb_channel::Receiver;
6use ckb_channel::select;
7use ckb_channel::unbounded;
8use ckb_db::{ReadOnlyDB, RocksDB};
9use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
10use ckb_error::{Error, InternalErrorKind};
11use ckb_logger::{debug, error, info};
12use ckb_stop_handler::register_thread;
13pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
14use std::cmp::Ordering;
15use std::collections::BTreeMap;
16use std::collections::VecDeque;
17use std::sync::Arc;
18use std::sync::Mutex;
19use std::sync::OnceLock;
20use std::thread;
21use std::thread::JoinHandle;
22
/// Shutdown flag for background migration.
///
/// Set to `true` by the migration notifier thread once the process exit signal is received.
/// Migrations read it through [`Migration::stop_background`]. Being a `OnceLock`, it cannot be
/// reset once set.
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::new();

#[cfg(test)]
mod tests;
28
29fn internal_error(reason: String) -> Error {
30    InternalErrorKind::Database.other(reason).into()
31}
32
/// Collection of database migrations.
#[derive(Default)]
pub struct Migrations {
    /// Registered migrations keyed by their version string. The `BTreeMap` keeps them in
    /// ascending version order, which the "pending" and "latest version" lookups rely on.
    migrations: BTreeMap<String, Arc<dyn Migration>>,
}
38
/// Commands sent to a [`MigrationWorker`] through its inbox channel.
#[derive(PartialEq, Eq, Debug)]
enum Command {
    /// Begin running the queued migrations.
    Start,
    /// Stop instead of starting the next queued migration.
    Stop,
}
45
/// Queue of pending migrations as `(version, migration)` pairs.
type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;
/// Runs queued migrations on a dedicated thread (see `MigrationWorker::start`).
struct MigrationWorker {
    /// Migrations still to be run, popped from the front one at a time.
    tasks: Arc<Mutex<MigrationTasks>>,
    /// Database handle passed to every migration.
    db: RocksDB,
    /// Receives `Start`/`Stop` commands.
    inbox: Receiver<Command>,
}
52
53impl MigrationWorker {
54    pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
55        Self { tasks, db, inbox }
56    }
57
58    pub fn start(self) -> JoinHandle<()> {
59        thread::spawn(move || {
60            if let Ok(Command::Start) = self.inbox.recv() {
61                let mut idx = 0;
62                let migrations_count = self.tasks.lock().unwrap().len() as u64;
63                let mpb = Arc::new(MultiProgress::new());
64
65                while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
66                    select! {
67                        recv(self.inbox) -> msg => {
68                            if let Ok(Command::Stop) = msg {
69                                eprintln!("stop to run migrate in background: {}", name);
70                                break;
71                            }
72                        }
73                        default => {
74                            eprintln!("start to run migrate in background: {}", name);
75                            let mpbc = Arc::clone(&mpb);
76                            idx += 1;
77                            let pb = move |count: u64| -> ProgressBar {
78                                let pb = mpbc.add(ProgressBar::new(count));
79                                pb.set_draw_target(ProgressDrawTarget::stderr());
80                                pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
81                                pb
82                            };
83                            if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
84                                db.put_default(MIGRATION_VERSION_KEY, task.version())
85                                .map_err(|err| {
86                                    internal_error(format!("failed to migrate the database: {err}"))
87                                })
88                                .unwrap();
89                            }
90                        }
91                    }
92                }
93            }
94        })
95    }
96}
97
98impl Migrations {
99    /// Creates a new empty migrations collection.
100    pub fn new() -> Self {
101        Migrations {
102            migrations: BTreeMap::new(),
103        }
104    }
105
106    /// Adds a migration to the collection.
107    pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
108        self.migrations
109            .insert(migration.version().to_string(), migration);
110    }
111
112    /// Check if database's version is matched with the executable binary version.
113    ///
114    /// Returns
115    /// - Less: The database version is less than the matched version of the executable binary.
116    ///   Requires migration.
117    /// - Equal: The database version is matched with the executable binary version.
118    /// - Greater: The database version is greater than the matched version of the executable binary.
119    ///   Requires upgrade the executable binary.
120    pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
121        let db_version = match db
122            .get_pinned_default(MIGRATION_VERSION_KEY)
123            .expect("get the version of database")
124        {
125            Some(version_bytes) => {
126                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
127            }
128            None => {
129                // if version is none, but db is not empty
130                // patch 220464f
131                if self.is_non_empty_rdb(db) {
132                    return Ordering::Less;
133                } else {
134                    return Ordering::Equal;
135                }
136            }
137        };
138        debug!("Current database version [{}]", db_version);
139
140        let mut migrations = self
141            .migrations
142            .values()
143            .filter(|m| include_background || !m.run_in_background());
144
145        let latest_version = migrations
146            .next_back()
147            .unwrap_or_else(|| panic!("should have at least one version"))
148            .version();
149        debug!("Latest database version [{}]", latest_version);
150
151        db_version.as_str().cmp(latest_version)
152    }
153
154    /// Check if the migrations will consume a lot of time.
155    pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
156        let db_version = match db
157            .get_pinned_default(MIGRATION_VERSION_KEY)
158            .expect("get the version of database")
159        {
160            Some(version_bytes) => {
161                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
162            }
163            None => {
164                // if version is none, but db is not empty
165                // patch 220464f
166                return self.is_non_empty_rdb(db);
167            }
168        };
169
170        let migrations = self
171            .migrations
172            .values()
173            .filter(|m| include_background || !m.run_in_background());
174
175        migrations
176            .skip_while(|m| m.version() <= db_version.as_str())
177            .any(|m| m.expensive())
178    }
179
180    /// Check if all the pending migrations will be executed in background.
181    pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
182        let db_version = match db
183            .get_pinned_default(MIGRATION_VERSION_KEY)
184            .expect("get the version of database")
185        {
186            Some(version_bytes) => {
187                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
188            }
189            None => {
190                // if version is none, but db is not empty
191                // patch 220464f
192                return self.is_non_empty_rdb(db);
193            }
194        };
195
196        self.migrations
197            .values()
198            .skip_while(|m| m.version() <= db_version.as_str())
199            .all(|m| m.run_in_background())
200    }
201
202    fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
203        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY)
204            && v.is_some()
205        {
206            return true;
207        }
208        false
209    }
210
211    fn is_non_empty_db(&self, db: &RocksDB) -> bool {
212        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY)
213            && v.is_some()
214        {
215            return true;
216        }
217        false
218    }
219
220    fn run_migrate(&self, mut db: RocksDB, v: &str) -> Result<RocksDB, Error> {
221        let mpb = Arc::new(MultiProgress::new());
222        let migrations: BTreeMap<_, _> = self
223            .migrations
224            .iter()
225            .filter(|(mv, _)| mv.as_str() > v)
226            .collect();
227        let migrations_count = migrations.len();
228        for (idx, (_, m)) in migrations.iter().enumerate() {
229            let mpbc = Arc::clone(&mpb);
230            let pb = move |count: u64| -> ProgressBar {
231                let pb = mpbc.add(ProgressBar::new(count));
232                pb.set_draw_target(ProgressDrawTarget::stderr());
233                pb.set_prefix(format!("[{}/{}]", idx + 1, migrations_count));
234                pb
235            };
236            db = m.migrate(db, Arc::new(pb))?;
237            db.put_default(MIGRATION_VERSION_KEY, m.version())
238                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
239        }
240        Ok(db)
241    }
242
243    fn run_migrate_async(&self, db: RocksDB, v: &str) {
244        let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
245            .migrations
246            .iter()
247            .filter(|(mv, _)| mv.as_str() > v)
248            .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
249            .collect::<VecDeque<_>>();
250
251        let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
252        let tasks = Arc::new(Mutex::new(migrations));
253        let (tx, rx) = unbounded();
254        let worker = MigrationWorker::new(tasks, db, rx);
255
256        let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
257        let clone = v.to_string();
258        let tx_clone = tx.clone();
259        let notifier = thread::spawn(move || {
260            let _ = exit_signal.recv();
261            let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
262            let _ = tx_clone.send(Command::Stop);
263            info!("set shutdown flag to true: {:?} version: {}", res, clone);
264        });
265        register_thread("migration-notifier", notifier);
266
267        let handler = worker.start();
268        if all_can_resume {
269            info!("register thread: migration ....");
270            register_thread("migration", handler);
271        }
272        tx.send(Command::Start).expect("send start command");
273    }
274
275    fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
276        let raw = db
277            .get_pinned_default(MIGRATION_VERSION_KEY)
278            .map_err(|err| {
279                internal_error(format!("failed to get the version of database: {err}"))
280            })?;
281
282        Ok(raw.map(|version_bytes| {
283            String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
284        }))
285    }
286
287    /// Initial db version
288    pub fn init_db_version(&self, db: &RocksDB) -> Result<(), Error> {
289        let db_version = self.get_migration_version(db)?;
290        if db_version.is_none()
291            && let Some(m) = self.migrations.values().last()
292        {
293            info!("Init database version {}", m.version());
294            db.put_default(MIGRATION_VERSION_KEY, m.version())
295                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
296        }
297        Ok(())
298    }
299
300    /// Runs pending migrations on the database.
301    ///
302    /// If `run_in_background` is true, long-running migrations will be executed asynchronously.
303    pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
304        let db_version = self.get_migration_version(&db)?;
305        match db_version {
306            Some(ref v) => {
307                info!("Current database version {}", v);
308                self.check_migration_downgrade(v)?;
309                let db = if !run_in_background {
310                    self.run_migrate(db, v.as_str())?
311                } else {
312                    self.run_migrate_async(db.clone(), v.as_str());
313                    db
314                };
315                Ok(db)
316            }
317            None => {
318                // if version is none, but db is not empty
319                // patch 220464f
320                if self.is_non_empty_db(&db) {
321                    return self.patch_220464f(db);
322                }
323                Ok(db)
324            }
325        }
326    }
327
328    fn patch_220464f(&self, db: RocksDB) -> Result<RocksDB, Error> {
329        const V: &str = "20210609195048"; // AddExtraDataHash - 1
330        self.run_migrate(db, V)
331    }
332
333    fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
334        if let Some(m) = self.migrations.values().last()
335            && m.version() < cur_version
336        {
337            error!(
338                "Database downgrade detected. \
339                    The database schema version is newer than `ckb` schema version,\
340                    please upgrade `ckb` to the latest version"
341            );
342            return Err(internal_error(
343                "Database downgrade is not supported".to_string(),
344            ));
345        }
346        Ok(())
347    }
348}
349
350/// Trait for database migrations.
351pub trait Migration: Send + Sync {
352    /// Executes the migration on the database.
353    ///
354    /// The `pb` parameter is a progress bar factory for tracking migration progress.
355    fn migrate(
356        &self,
357        _db: RocksDB,
358        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
359    ) -> Result<RocksDB, Error>;
360
361    /// returns migration version, use `date +'%Y%m%d%H%M%S'` timestamp format
362    fn version(&self) -> &str;
363
364    /// Will cost a lot of time to perform this migration operation.
365    ///
366    /// Override this function for `Migrations` which could be executed very fast.
367    fn expensive(&self) -> bool {
368        true
369    }
370
371    /// Will this migration be executed in background.
372    fn run_in_background(&self) -> bool {
373        false
374    }
375
376    /// Check if the background migration should be stopped.
377    /// If a migration need to implement the recovery logic, it should check this flag periodically,
378    /// store the migration progress when exiting and recover from the current progress when restarting.
379    fn stop_background(&self) -> bool {
380        *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
381    }
382
383    /// Check if the background migration can be resumed.
384    ///
385    /// If a migration can be resumed, it should implement the recovery logic in `migrate` function.
386    /// and the `MigirateWorker` will add the migration's handler with `register_thread`, so that then
387    /// main thread can wait for the background migration to store the progress and exit.
388    ///
389    /// Otherwise, the migration will be restarted from the beginning.
390    ///
391    fn can_resume(&self) -> bool {
392        false
393    }
394}
395
/// A migration that does nothing: its `migrate` hands the database back unchanged.
pub struct DefaultMigration {
    version: String,
}

impl DefaultMigration {
    /// Builds a no-op migration identified by `version`.
    pub fn new(version: &str) -> Self {
        let version = String::from(version);
        DefaultMigration { version }
    }
}
409
410impl Migration for DefaultMigration {
411    fn migrate(
412        &self,
413        db: RocksDB,
414        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
415    ) -> Result<RocksDB, Error> {
416        Ok(db)
417    }
418
419    fn version(&self) -> &str {
420        &self.version
421    }
422
423    fn expensive(&self) -> bool {
424        false
425    }
426}