use ckb_channel::Receiver;
use ckb_channel::select;
use ckb_channel::unbounded;
use ckb_db::{ReadOnlyDB, RocksDB};
use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
use console::Term;
pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::thread;
use std::thread::JoinHandle;

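/// Global flag telling background migration tasks to shut down; it is set once
/// when the process receives its exit signal.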
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::new();

#[cfg(test)]
mod tests;

fn internal_error(reason: String) -> Error {
    InternalErrorKind::Database.other(reason).into()
}

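/// An ordered collection of migrations, keyed by version string.
///
/// A minimal usage sketch (not compiled here; it assumes `db` is an already opened
/// `RocksDB` handle and the version string is only a placeholder):
///
/// ```ignore
/// let mut migrations = Migrations::new();
/// migrations.add_migration(Arc::new(DefaultMigration::new("20210101000000")));
/// // Run every pending migration in the foreground.
/// let db = migrations.migrate(db, false)?;
/// ```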
#[derive(Default)]
pub struct Migrations {
    migrations: BTreeMap<String, Arc<dyn Migration>>,
}

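/// Commands used to drive a `MigrationWorker` from another thread.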
#[derive(PartialEq, Eq, Debug)]
enum Command {
    Start,
    Stop,
}

type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;

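/// Worker that pops queued migrations and runs them on a dedicated thread,
/// reacting to `Command`s received through `inbox`.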
struct MigrationWorker {
    tasks: Arc<Mutex<MigrationTasks>>,
    db: RocksDB,
    inbox: Receiver<Command>,
}

impl MigrationWorker {
    pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
        Self { tasks, db, inbox }
    }

    pub fn start(self) -> JoinHandle<()> {
        thread::spawn(move || {
            if let Ok(Command::Start) = self.inbox.recv() {
                let mut idx = 0;
                let migrations_count = self.tasks.lock().unwrap().len() as u64;
                let mpb = Arc::new(MultiProgress::new());

                while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
                    select! {
                        recv(self.inbox) -> msg => {
                            if let Ok(Command::Stop) = msg {
                                eprintln!("stop running migration in background: {}", name);
                                break;
                            }
                        }
                        default => {
                            eprintln!("start running migration in background: {}", name);
                            let mpbc = Arc::clone(&mpb);
                            idx += 1;
                            // Progress-bar factory handed to the migration task.
                            let pb = move |count: u64| -> ProgressBar {
                                let pb = mpbc.add(ProgressBar::new(count));
                                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), 20));
                                pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
                                pb
                            };
                            if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
                                db.put_default(MIGRATION_VERSION_KEY, task.version())
                                    .map_err(|err| {
                                        internal_error(format!("failed to migrate the database: {err}"))
                                    })
                                    .unwrap();
                            }
                        }
                    }
                }
            }
        })
    }
}

impl Migrations {
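    /// Creates an empty migration collection.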
    pub fn new() -> Self {
        Migrations {
            migrations: BTreeMap::new(),
        }
    }

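    /// Registers a migration under its version string.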
    pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
        self.migrations
            .insert(migration.version().to_string(), migration);
    }

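    /// Compares the version recorded in the database with the latest migration version.
    ///
    /// Returns `Ordering::Less` when migrations are pending, `Ordering::Equal` when the
    /// database is up to date (or still empty), and `Ordering::Greater` when the database
    /// was written by a newer `ckb`. Background-only migrations are ignored unless
    /// `include_background` is set.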
    pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // A non-empty database without a version record predates the
                // version key, so every migration is still pending.
                if self.is_non_empty_rdb(db) {
                    return Ordering::Less;
                } else {
                    return Ordering::Equal;
                }
            }
        };
        debug!("Current database version [{}]", db_version);

        let migrations = self
            .migrations
            .values()
            .filter(|m| include_background || !m.run_in_background());

        let latest_version = migrations
            .last()
            .unwrap_or_else(|| panic!("should have at least one version"))
            .version();
        debug!("Latest database version [{}]", latest_version);

        db_version.as_str().cmp(latest_version)
    }

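    /// Returns true if any pending migration is expensive (time-consuming). An
    /// unversioned, non-empty database is treated as requiring migration.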
    pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                return self.is_non_empty_rdb(db);
            }
        };

        let migrations = self
            .migrations
            .values()
            .filter(|m| include_background || !m.run_in_background());

        migrations
            .skip_while(|m| m.version() <= db_version.as_str())
            .any(|m| m.expensive())
    }

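    /// Returns true if every pending migration can run on the background worker.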
    pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                return self.is_non_empty_rdb(db);
            }
        };

        self.migrations
            .values()
            .skip_while(|m| m.version() <= db_version.as_str())
            .all(|m| m.run_in_background())
    }

    fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
            if v.is_some() {
                return true;
            }
        }
        false
    }

    fn is_non_empty_db(&self, db: &RocksDB) -> bool {
        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
            if v.is_some() {
                return true;
            }
        }
        false
    }

    fn run_migrate(&self, mut db: RocksDB, v: &str) -> Result<RocksDB, Error> {
        let mpb = Arc::new(MultiProgress::new());
        let migrations: BTreeMap<_, _> = self
            .migrations
            .iter()
            .filter(|(mv, _)| mv.as_str() > v)
            .collect();
        let migrations_count = migrations.len();
        for (idx, (_, m)) in migrations.iter().enumerate() {
            let mpbc = Arc::clone(&mpb);
            let pb = move |count: u64| -> ProgressBar {
                let pb = mpbc.add(ProgressBar::new(count));
                pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), 20));
                pb.set_prefix(format!("[{}/{}]", idx + 1, migrations_count));
                pb
            };
            db = m.migrate(db, Arc::new(pb))?;
            db.put_default(MIGRATION_VERSION_KEY, m.version())
                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
        }
        Ok(db)
    }

    fn run_migrate_async(&self, db: RocksDB, v: &str) {
        let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
            .migrations
            .iter()
            .filter(|(mv, _)| mv.as_str() > v)
            .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
            .collect::<VecDeque<_>>();

        let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
        let tasks = Arc::new(Mutex::new(migrations));
        let (tx, rx) = unbounded();
        let worker = MigrationWorker::new(tasks, db, rx);

        // Forward the process-wide exit signal to the worker as a `Stop` command
        // and raise the global shutdown flag for running tasks.
        let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
        let clone = v.to_string();
        let tx_clone = tx.clone();
        let notifier = thread::spawn(move || {
            let _ = exit_signal.recv();
            let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
            let _ = tx_clone.send(Command::Stop);
            info!("set shutdown flag to true: {:?} version: {}", res, clone);
        });
        register_thread("migration-notifier", notifier);

        let handler = worker.start();
        if all_can_resume {
            info!("register thread: migration ....");
            register_thread("migration", handler);
        }
        tx.send(Command::Start).expect("send start command");
    }

    fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
        let raw = db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .map_err(|err| {
                internal_error(format!("failed to get the version of database: {err}"))
            })?;

        Ok(raw.map(|version_bytes| {
            String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
        }))
    }

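    /// Records the latest migration version in a database that has no version entry yet.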
    pub fn init_db_version(&self, db: &RocksDB) -> Result<(), Error> {
        let db_version = self.get_migration_version(db)?;
        if db_version.is_none() {
            if let Some(m) = self.migrations.values().last() {
                info!("Init database version {}", m.version());
                db.put_default(MIGRATION_VERSION_KEY, m.version())
                    .map_err(|err| {
                        internal_error(format!("failed to migrate the database: {err}"))
                    })?;
            }
        }
        Ok(())
    }

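    /// Runs all pending migrations against `db`.
    ///
    /// With `run_in_background` set, the pending migrations are queued on a background
    /// worker and the handle is returned immediately; otherwise they run to completion
    /// on the current thread before the handle is returned.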
    pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
        let db_version = self.get_migration_version(&db)?;
        match db_version {
            Some(ref v) => {
                info!("Current database version {}", v);
                self.check_migration_downgrade(v)?;
                let db = if !run_in_background {
                    self.run_migrate(db, v.as_str())?
                } else {
                    self.run_migrate_async(db.clone(), v.as_str());
                    db
                };
                Ok(db)
            }
            None => {
                // A non-empty database without a version record was created by an
                // old ckb release; patch it onto the versioned migration path.
                if self.is_non_empty_db(&db) {
                    return self.patch_220464f(db);
                }
                Ok(db)
            }
        }
    }

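    /// Treats a non-empty database without a version record as being at this fixed
    /// version and runs every later migration against it.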
    fn patch_220464f(&self, db: RocksDB) -> Result<RocksDB, Error> {
        const V: &str = "20210609195048";
        self.run_migrate(db, V)
    }

    fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
        if let Some(m) = self.migrations.values().last() {
            if m.version() < cur_version {
                error!(
                    "Database downgrade detected. \
                    The database schema version is newer than the `ckb` schema version, \
                    please upgrade `ckb` to the latest version"
                );
                return Err(internal_error(
                    "Database downgrade is not supported".to_string(),
                ));
            }
        }
        Ok(())
    }
}

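/// A single database migration step that upgrades the schema to its `version`.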
pub trait Migration: Send + Sync {
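    /// Rewrites the database content and returns the migrated handle; `_pb` builds a
    /// progress bar for a task with the given total step count.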
    fn migrate(
        &self,
        _db: RocksDB,
        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
    ) -> Result<RocksDB, Error>;

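    /// The version this migration upgrades the database to.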
    fn version(&self) -> &str;

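    /// Whether this migration is expensive (time-consuming); defaults to `true`.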
    fn expensive(&self) -> bool {
        true
    }

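    /// Whether this migration may be run by the background worker; defaults to `false`.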
    fn run_in_background(&self) -> bool {
        false
    }

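    /// Returns `true` once the process-wide shutdown flag is set; long-running background
    /// migrations should poll this and exit early.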
    fn stop_background(&self) -> bool {
        *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
    }

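    /// Whether this migration can be interrupted and resumed later; defaults to `false`.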
    fn can_resume(&self) -> bool {
        false
    }
}

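/// A no-op migration that only advances the recorded database version.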
pub struct DefaultMigration {
    version: String,
}

impl DefaultMigration {
    pub fn new(version: &str) -> Self {
        Self {
            version: version.to_string(),
        }
    }
}

impl Migration for DefaultMigration {
    fn migrate(
        &self,
        db: RocksDB,
        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
    ) -> Result<RocksDB, Error> {
        Ok(db)
    }

    fn version(&self) -> &str {
        &self.version
    }

    fn expensive(&self) -> bool {
        false
    }
}