1use ckb_channel::Receiver;
3use ckb_channel::select;
4use ckb_channel::unbounded;
5use ckb_db::{ReadOnlyDB, RocksDB};
6use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
7use ckb_error::{Error, InternalErrorKind};
8use ckb_logger::{debug, error, info};
9use ckb_stop_handler::register_thread;
10use console::Term;
11pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
12use std::cmp::Ordering;
13use std::collections::BTreeMap;
14use std::collections::VecDeque;
15use std::sync::Arc;
16use std::sync::Mutex;
17use std::sync::OnceLock;
18use std::thread;
19use std::thread::JoinHandle;
20
21pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::new();
23
24#[cfg(test)]
25mod tests;
26
27fn internal_error(reason: String) -> Error {
28 InternalErrorKind::Database.other(reason).into()
29}
30
31#[derive(Default)]
33pub struct Migrations {
34 migrations: BTreeMap<String, Arc<dyn Migration>>,
35}
36
37#[derive(PartialEq, Eq, Debug)]
39enum Command {
40 Start,
41 Stop,
42}
43
44type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;
45struct MigrationWorker {
46 tasks: Arc<Mutex<MigrationTasks>>,
47 db: RocksDB,
48 inbox: Receiver<Command>,
49}
50
51impl MigrationWorker {
52 pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
53 Self { tasks, db, inbox }
54 }
55
56 pub fn start(self) -> JoinHandle<()> {
57 thread::spawn(move || {
58 if let Ok(Command::Start) = self.inbox.recv() {
59 let mut idx = 0;
60 let migrations_count = self.tasks.lock().unwrap().len() as u64;
61 let mpb = Arc::new(MultiProgress::new());
62
63 while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
64 select! {
65 recv(self.inbox) -> msg => {
66 if let Ok(Command::Stop) = msg {
67 eprintln!("stop to run migrate in background: {}", name);
68 break;
69 }
70 }
71 default => {
72 eprintln!("start to run migrate in background: {}", name);
73 let mpbc = Arc::clone(&mpb);
74 idx += 1;
75 let pb = move |count: u64| -> ProgressBar {
76 let pb = mpbc.add(ProgressBar::new(count));
77 pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
78 pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
79 pb
80 };
81 if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
82 db.put_default(MIGRATION_VERSION_KEY, task.version())
83 .map_err(|err| {
84 internal_error(format!("failed to migrate the database: {err}"))
85 })
86 .unwrap();
87 }
88 }
89 }
90 }
91 }
92 })
93 }
94}
95
96impl Migrations {
97 pub fn new() -> Self {
99 Migrations {
100 migrations: BTreeMap::new(),
101 }
102 }
103
104 pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
106 self.migrations
107 .insert(migration.version().to_string(), migration);
108 }
109
110 pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
119 let db_version = match db
120 .get_pinned_default(MIGRATION_VERSION_KEY)
121 .expect("get the version of database")
122 {
123 Some(version_bytes) => {
124 String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
125 }
126 None => {
127 if self.is_non_empty_rdb(db) {
130 return Ordering::Less;
131 } else {
132 return Ordering::Equal;
133 }
134 }
135 };
136 debug!("Current database version [{}]", db_version);
137
138 let migrations = self
139 .migrations
140 .values()
141 .filter(|m| include_background || !m.run_in_background());
142
143 let latest_version = migrations
144 .last()
145 .unwrap_or_else(|| panic!("should have at least one version"))
146 .version();
147 debug!("Latest database version [{}]", latest_version);
148
149 db_version.as_str().cmp(latest_version)
150 }
151
152 pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
154 let db_version = match db
155 .get_pinned_default(MIGRATION_VERSION_KEY)
156 .expect("get the version of database")
157 {
158 Some(version_bytes) => {
159 String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
160 }
161 None => {
162 return self.is_non_empty_rdb(db);
165 }
166 };
167
168 let migrations = self
169 .migrations
170 .values()
171 .filter(|m| include_background || !m.run_in_background());
172
173 migrations
174 .skip_while(|m| m.version() <= db_version.as_str())
175 .any(|m| m.expensive())
176 }
177
178 pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
180 let db_version = match db
181 .get_pinned_default(MIGRATION_VERSION_KEY)
182 .expect("get the version of database")
183 {
184 Some(version_bytes) => {
185 String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
186 }
187 None => {
188 return self.is_non_empty_rdb(db);
191 }
192 };
193
194 self.migrations
195 .values()
196 .skip_while(|m| m.version() <= db_version.as_str())
197 .all(|m| m.run_in_background())
198 }
199
200 fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
201 if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
202 if v.is_some() {
203 return true;
204 }
205 }
206 false
207 }
208
209 fn is_non_empty_db(&self, db: &RocksDB) -> bool {
210 if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
211 if v.is_some() {
212 return true;
213 }
214 }
215 false
216 }
217
218 fn run_migrate(&self, mut db: RocksDB, v: &str) -> Result<RocksDB, Error> {
219 let mpb = Arc::new(MultiProgress::new());
220 let migrations: BTreeMap<_, _> = self
221 .migrations
222 .iter()
223 .filter(|(mv, _)| mv.as_str() > v)
224 .collect();
225 let migrations_count = migrations.len();
226 for (idx, (_, m)) in migrations.iter().enumerate() {
227 let mpbc = Arc::clone(&mpb);
228 let pb = move |count: u64| -> ProgressBar {
229 let pb = mpbc.add(ProgressBar::new(count));
230 pb.set_draw_target(ProgressDrawTarget::term(Term::stdout(), None));
231 pb.set_prefix(format!("[{}/{}]", idx + 1, migrations_count));
232 pb
233 };
234 db = m.migrate(db, Arc::new(pb))?;
235 db.put_default(MIGRATION_VERSION_KEY, m.version())
236 .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
237 }
238 mpb.join_and_clear().expect("MultiProgress join");
239 Ok(db)
240 }
241
242 fn run_migrate_async(&self, db: RocksDB, v: &str) {
243 let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
244 .migrations
245 .iter()
246 .filter(|(mv, _)| mv.as_str() > v)
247 .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
248 .collect::<VecDeque<_>>();
249
250 let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
251 let tasks = Arc::new(Mutex::new(migrations));
252 let (tx, rx) = unbounded();
253 let worker = MigrationWorker::new(tasks, db, rx);
254
255 let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
256 let clone = v.to_string();
257 let tx_clone = tx.clone();
258 let notifier = thread::spawn(move || {
259 let _ = exit_signal.recv();
260 let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
261 let _ = tx_clone.send(Command::Stop);
262 info!("set shutdown flag to true: {:?} version: {}", res, clone);
263 });
264 register_thread("migration-notifier", notifier);
265
266 let handler = worker.start();
267 if all_can_resume {
268 info!("register thread: migration ....");
269 register_thread("migration", handler);
270 }
271 tx.send(Command::Start).expect("send start command");
272 }
273
274 fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
275 let raw = db
276 .get_pinned_default(MIGRATION_VERSION_KEY)
277 .map_err(|err| {
278 internal_error(format!("failed to get the version of database: {err}"))
279 })?;
280
281 Ok(raw.map(|version_bytes| {
282 String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
283 }))
284 }
285
286 pub fn init_db_version(&self, db: &RocksDB) -> Result<(), Error> {
288 let db_version = self.get_migration_version(db)?;
289 if db_version.is_none() {
290 if let Some(m) = self.migrations.values().last() {
291 info!("Init database version {}", m.version());
292 db.put_default(MIGRATION_VERSION_KEY, m.version())
293 .map_err(|err| {
294 internal_error(format!("failed to migrate the database: {err}"))
295 })?;
296 }
297 }
298 Ok(())
299 }
300
301 pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
303 let db_version = self.get_migration_version(&db)?;
304 match db_version {
305 Some(ref v) => {
306 info!("Current database version {}", v);
307 self.check_migration_downgrade(v)?;
308 let db = if !run_in_background {
309 self.run_migrate(db, v.as_str())?
310 } else {
311 self.run_migrate_async(db.clone(), v.as_str());
312 db
313 };
314 Ok(db)
315 }
316 None => {
317 if self.is_non_empty_db(&db) {
320 return self.patch_220464f(db);
321 }
322 Ok(db)
323 }
324 }
325 }
326
327 fn patch_220464f(&self, db: RocksDB) -> Result<RocksDB, Error> {
328 const V: &str = "20210609195048"; self.run_migrate(db, V)
330 }
331
332 fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
333 if let Some(m) = self.migrations.values().last() {
334 if m.version() < cur_version {
335 error!(
336 "Database downgrade detected. \
337 The database schema version is newer than `ckb` schema version,\
338 please upgrade `ckb` to the latest version"
339 );
340 return Err(internal_error(
341 "Database downgrade is not supported".to_string(),
342 ));
343 }
344 }
345 Ok(())
346 }
347}
348
349pub trait Migration: Send + Sync {
351 fn migrate(
353 &self,
354 _db: RocksDB,
355 _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
356 ) -> Result<RocksDB, Error>;
357
358 fn version(&self) -> &str;
360
361 fn expensive(&self) -> bool {
365 true
366 }
367
368 fn run_in_background(&self) -> bool {
370 false
371 }
372
373 fn stop_background(&self) -> bool {
377 *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
378 }
379
380 fn can_resume(&self) -> bool {
389 false
390 }
391}
392
393pub struct DefaultMigration {
395 version: String,
396}
397
398impl DefaultMigration {
399 pub fn new(version: &str) -> Self {
401 Self {
402 version: version.to_string(),
403 }
404 }
405}
406
407impl Migration for DefaultMigration {
408 fn migrate(
409 &self,
410 db: RocksDB,
411 _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
412 ) -> Result<RocksDB, Error> {
413 Ok(db)
414 }
415
416 fn version(&self) -> &str {
417 &self.version
418 }
419
420 fn expensive(&self) -> bool {
421 false
422 }
423}