1use ckb_channel::Receiver;
6use ckb_channel::select;
7use ckb_channel::unbounded;
8use ckb_db::{ReadOnlyDB, RocksDB};
9use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
10use ckb_error::{Error, InternalErrorKind};
11use ckb_logger::{debug, error, info};
12use ckb_stop_handler::register_thread;
13pub use indicatif::{HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
14use std::cmp::Ordering;
15use std::collections::BTreeMap;
16use std::collections::VecDeque;
17use std::sync::Arc;
18use std::sync::Mutex;
19use std::sync::OnceLock;
20use std::thread;
21use std::thread::JoinHandle;
22
/// One-shot shutdown flag for background migrations: set to `true` when the
/// node is exiting so long-running migrations can poll it (via
/// `Migration::stop_background`) and stop early.
pub static SHUTDOWN_BACKGROUND_MIGRATION: OnceLock<bool> = OnceLock::new();

#[cfg(test)]
mod tests;
28
29fn internal_error(reason: String) -> Error {
30 InternalErrorKind::Database.other(reason).into()
31}
32
/// Registry of all known migrations for this binary.
#[derive(Default)]
pub struct Migrations {
    // Version string -> migration; BTreeMap keeps entries sorted by version,
    // which is the order migrations must run in.
    migrations: BTreeMap<String, Arc<dyn Migration>>,
}
38
/// Control messages sent to the background `MigrationWorker` thread.
#[derive(PartialEq, Eq, Debug)]
enum Command {
    /// Begin draining the migration task queue.
    Start,
    /// Abort; the worker stops between tasks.
    Stop,
}
45
/// Ordered queue of `(version, migration)` pairs still to be run.
type MigrationTasks = VecDeque<(String, Arc<dyn Migration>)>;

/// Runs queued migrations on a dedicated thread, controlled via a channel.
struct MigrationWorker {
    // Shared queue; the worker pops tasks from the front, in version order.
    tasks: Arc<Mutex<MigrationTasks>>,
    db: RocksDB,
    // Receives `Command::Start` to begin and `Command::Stop` to abort.
    inbox: Receiver<Command>,
}
52
53impl MigrationWorker {
54 pub fn new(tasks: Arc<Mutex<MigrationTasks>>, db: RocksDB, inbox: Receiver<Command>) -> Self {
55 Self { tasks, db, inbox }
56 }
57
58 pub fn start(self) -> JoinHandle<()> {
59 thread::spawn(move || {
60 if let Ok(Command::Start) = self.inbox.recv() {
61 let mut idx = 0;
62 let migrations_count = self.tasks.lock().unwrap().len() as u64;
63 let mpb = Arc::new(MultiProgress::new());
64
65 while let Some((name, task)) = self.tasks.lock().unwrap().pop_front() {
66 select! {
67 recv(self.inbox) -> msg => {
68 if let Ok(Command::Stop) = msg {
69 eprintln!("stop to run migrate in background: {}", name);
70 break;
71 }
72 }
73 default => {
74 eprintln!("start to run migrate in background: {}", name);
75 let mpbc = Arc::clone(&mpb);
76 idx += 1;
77 let pb = move |count: u64| -> ProgressBar {
78 let pb = mpbc.add(ProgressBar::new(count));
79 pb.set_draw_target(ProgressDrawTarget::stderr());
80 pb.set_prefix(format!("[{}/{}]", idx, migrations_count));
81 pb
82 };
83 if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
84 db.put_default(MIGRATION_VERSION_KEY, task.version())
85 .map_err(|err| {
86 internal_error(format!("failed to migrate the database: {err}"))
87 })
88 .unwrap();
89 }
90 }
91 }
92 }
93 }
94 })
95 }
96}
97
impl Migrations {
    /// Creates an empty migration registry.
    pub fn new() -> Self {
        Migrations {
            migrations: BTreeMap::new(),
        }
    }

    /// Registers a migration; it is keyed (and therefore ordered) by its
    /// version string.
    pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
        self.migrations
            .insert(migration.version().to_string(), migration);
    }

    /// Compares the on-disk schema version with the latest registered version.
    ///
    /// Returns `Ordering::Less` when migrations still need to run,
    /// `Ordering::Equal` when the database is up to date, and
    /// `Ordering::Greater` when the database was written by a newer `ckb`.
    /// When `include_background` is `false`, background-capable migrations
    /// are excluded from the comparison.
    ///
    /// # Panics
    ///
    /// Panics if no migration survives the `include_background` filter.
    pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // No version record: a database that already holds chain data
                // predates version tracking and must be migrated; an empty
                // database is simply new.
                if self.is_non_empty_rdb(db) {
                    return Ordering::Less;
                } else {
                    return Ordering::Equal;
                }
            }
        };
        debug!("Current database version [{}]", db_version);

        let mut migrations = self
            .migrations
            .values()
            .filter(|m| include_background || !m.run_in_background());

        // BTreeMap iterates in ascending key order, so the back of the
        // iterator is the highest registered version.
        let latest_version = migrations
            .next_back()
            .unwrap_or_else(|| panic!("should have at least one version"))
            .version();
        debug!("Latest database version [{}]", latest_version);

        db_version.as_str().cmp(latest_version)
    }

    /// Returns `true` if any migration still pending for this database
    /// reports itself as expensive (see `Migration::expensive`).
    pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // Unversioned but non-empty: everything would need to run,
                // which is treated as expensive.
                return self.is_non_empty_rdb(db);
            }
        };

        let migrations = self
            .migrations
            .values()
            .filter(|m| include_background || !m.run_in_background());

        // Versions compare lexicographically; skip the ones already applied.
        migrations
            .skip_while(|m| m.version() <= db_version.as_str())
            .any(|m| m.expensive())
    }

    /// Returns `true` if every migration still pending for this database can
    /// run in a background thread.
    pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
        let db_version = match db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .expect("get the version of database")
        {
            Some(version_bytes) => {
                String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
            }
            None => {
                // NOTE(review): for an unversioned non-empty database this
                // returns `true` regardless of the individual migrations'
                // `run_in_background` flags — confirm that is intended.
                return self.is_non_empty_rdb(db);
            }
        };

        self.migrations
            .values()
            .skip_while(|m| m.version() <= db_version.as_str())
            .all(|m| m.run_in_background())
    }

    /// True when the (read-only) database already contains chain data,
    /// detected by the presence of the tip-header meta key.
    fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY)
            && v.is_some()
        {
            return true;
        }
        false
    }

    /// Same check as `is_non_empty_rdb`, for a writable database handle.
    fn is_non_empty_db(&self, db: &RocksDB) -> bool {
        if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY)
            && v.is_some()
        {
            return true;
        }
        false
    }

    /// Runs every migration with a version greater than `v`, in order, on the
    /// current thread, recording each version in the database as it completes
    /// so an interrupted run can resume from the last finished step.
    fn run_migrate(&self, mut db: RocksDB, v: &str) -> Result<RocksDB, Error> {
        let mpb = Arc::new(MultiProgress::new());
        let migrations: BTreeMap<_, _> = self
            .migrations
            .iter()
            .filter(|(mv, _)| mv.as_str() > v)
            .collect();
        let migrations_count = migrations.len();
        for (idx, (_, m)) in migrations.iter().enumerate() {
            let mpbc = Arc::clone(&mpb);
            // Progress-bar factory handed to the migration; bars share one
            // MultiProgress and are prefixed "[idx/total]".
            let pb = move |count: u64| -> ProgressBar {
                let pb = mpbc.add(ProgressBar::new(count));
                pb.set_draw_target(ProgressDrawTarget::stderr());
                pb.set_prefix(format!("[{}/{}]", idx + 1, migrations_count));
                pb
            };
            db = m.migrate(db, Arc::new(pb))?;
            db.put_default(MIGRATION_VERSION_KEY, m.version())
                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
        }
        Ok(db)
    }

    /// Queues every migration with a version greater than `v` onto a
    /// background `MigrationWorker` thread and returns immediately.
    ///
    /// Also spawns a notifier thread that, on node shutdown, sets
    /// `SHUTDOWN_BACKGROUND_MIGRATION` and sends `Command::Stop` to the
    /// worker. The worker thread is only registered for join-on-shutdown
    /// when all queued migrations can resume after an interruption.
    fn run_migrate_async(&self, db: RocksDB, v: &str) {
        let migrations: VecDeque<(String, Arc<dyn Migration>)> = self
            .migrations
            .iter()
            .filter(|(mv, _)| mv.as_str() > v)
            .map(|(mv, m)| (mv.to_string(), Arc::clone(m)))
            .collect::<VecDeque<_>>();

        let all_can_resume = migrations.iter().all(|(_, m)| m.can_resume());
        let tasks = Arc::new(Mutex::new(migrations));
        let (tx, rx) = unbounded();
        let worker = MigrationWorker::new(tasks, db, rx);

        let exit_signal = ckb_stop_handler::new_crossbeam_exit_rx();
        let clone = v.to_string();
        let tx_clone = tx.clone();
        let notifier = thread::spawn(move || {
            let _ = exit_signal.recv();
            let res = SHUTDOWN_BACKGROUND_MIGRATION.set(true);
            let _ = tx_clone.send(Command::Stop);
            info!("set shutdown flag to true: {:?} version: {}", res, clone);
        });
        register_thread("migration-notifier", notifier);

        let handler = worker.start();
        if all_can_resume {
            info!("register thread: migration ....");
            register_thread("migration", handler);
        }
        tx.send(Command::Start).expect("send start command");
    }

    /// Reads the schema version recorded in the database, if any.
    fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
        let raw = db
            .get_pinned_default(MIGRATION_VERSION_KEY)
            .map_err(|err| {
                internal_error(format!("failed to get the version of database: {err}"))
            })?;

        Ok(raw.map(|version_bytes| {
            String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
        }))
    }

    /// Stamps a freshly created database with the latest known version so no
    /// migrations run against it later. No-op if a version already exists.
    pub fn init_db_version(&self, db: &RocksDB) -> Result<(), Error> {
        let db_version = self.get_migration_version(db)?;
        if db_version.is_none()
            && let Some(m) = self.migrations.values().last()
        {
            info!("Init database version {}", m.version());
            db.put_default(MIGRATION_VERSION_KEY, m.version())
                .map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
        }
        Ok(())
    }

    /// Brings the database up to the latest schema version.
    ///
    /// With `run_in_background == false` migrations run synchronously and the
    /// fully migrated database is returned. With `run_in_background == true`
    /// the work is queued on a worker thread and the (still-migrating)
    /// database handle is returned immediately.
    ///
    /// # Errors
    ///
    /// Fails on a detected downgrade or when a migration step fails.
    pub fn migrate(&self, db: RocksDB, run_in_background: bool) -> Result<RocksDB, Error> {
        let db_version = self.get_migration_version(&db)?;
        match db_version {
            Some(ref v) => {
                info!("Current database version {}", v);
                self.check_migration_downgrade(v)?;
                let db = if !run_in_background {
                    self.run_migrate(db, v.as_str())?
                } else {
                    self.run_migrate_async(db.clone(), v.as_str());
                    db
                };
                Ok(db)
            }
            None => {
                // Non-empty but unversioned databases predate version
                // tracking; migrate them from a fixed baseline.
                if self.is_non_empty_db(&db) {
                    return self.patch_220464f(db);
                }
                Ok(db)
            }
        }
    }

    /// Migrates a pre-versioning database by treating it as being at a fixed
    /// baseline version and running every later migration.
    /// NOTE(review): the constant presumably corresponds to the schema at
    /// commit `220464f` — confirm against project history.
    fn patch_220464f(&self, db: RocksDB) -> Result<RocksDB, Error> {
        const V: &str = "20210609195048"; self.run_migrate(db, V)
    }

    /// Rejects databases whose recorded version is newer than any migration
    /// this binary knows about (i.e. a `ckb` downgrade).
    fn check_migration_downgrade(&self, cur_version: &str) -> Result<(), Error> {
        if let Some(m) = self.migrations.values().last()
            && m.version() < cur_version
        {
            error!(
                "Database downgrade detected. \
                The database schema version is newer than `ckb` schema version,\
                please upgrade `ckb` to the latest version"
            );
            return Err(internal_error(
                "Database downgrade is not supported".to_string(),
            ));
        }
        Ok(())
    }
}
349
/// A single database schema migration step.
pub trait Migration: Send + Sync {
    /// Applies this migration to `_db` and returns the migrated database.
    ///
    /// `_pb` is a progress-bar factory: given a total item count it returns a
    /// `ProgressBar` the migration can use to report progress.
    fn migrate(
        &self,
        _db: RocksDB,
        _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
    ) -> Result<RocksDB, Error>;

    /// The version this migration brings the database to. Versions are
    /// compared lexicographically and determine migration order.
    fn version(&self) -> &str;

    /// Whether this migration is expected to be costly. Defaults to `true`;
    /// cheap migrations should override this to return `false`.
    fn expensive(&self) -> bool {
        true
    }

    /// Whether this migration may run on a background thread while the node
    /// starts. Defaults to `false` (must run synchronously).
    fn run_in_background(&self) -> bool {
        false
    }

    /// Whether shutdown has been requested. Background migrations should
    /// poll this and stop early when it returns `true`. Reads the global
    /// `SHUTDOWN_BACKGROUND_MIGRATION` flag; unset means "keep running".
    fn stop_background(&self) -> bool {
        *SHUTDOWN_BACKGROUND_MIGRATION.get().unwrap_or(&false)
    }

    /// Whether this migration can resume after being interrupted.
    /// Defaults to `false`.
    fn can_resume(&self) -> bool {
        false
    }
}
395
/// A no-op migration: performs no data changes and exists only so the
/// recorded schema version advances to `version`.
pub struct DefaultMigration {
    // The version string this migration advances the database to.
    version: String,
}
400
401impl DefaultMigration {
402 pub fn new(version: &str) -> Self {
404 Self {
405 version: version.to_string(),
406 }
407 }
408}
409
410impl Migration for DefaultMigration {
411 fn migrate(
412 &self,
413 db: RocksDB,
414 _pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
415 ) -> Result<RocksDB, Error> {
416 Ok(db)
417 }
418
419 fn version(&self) -> &str {
420 &self.version
421 }
422
423 fn expensive(&self) -> bool {
424 false
425 }
426}